1use chie_shared::BandwidthProof;
7use serde::{Deserialize, Serialize};
8use std::collections::VecDeque;
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use tokio::sync::RwLock;
12use tracing::{debug, error, info, warn};
13
14#[derive(Debug, Clone)]
16pub struct ProofSubmitConfig {
17 pub coordinator_url: String,
19 pub max_retries: u32,
21 pub initial_delay: Duration,
23 pub max_delay: Duration,
25 pub backoff_multiplier: f64,
27 pub timeout: Duration,
29 pub max_queue_size: usize,
31 pub persist_queue: bool,
33}
34
35impl Default for ProofSubmitConfig {
36 fn default() -> Self {
37 Self {
38 coordinator_url: "http://localhost:3000".to_string(),
39 max_retries: 5,
40 initial_delay: Duration::from_secs(1),
41 max_delay: Duration::from_secs(60),
42 backoff_multiplier: 2.0,
43 timeout: Duration::from_secs(30),
44 max_queue_size: 1000,
45 persist_queue: true,
46 }
47 }
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct SubmitResult {
53 pub accepted: bool,
55 pub proof_id: Option<uuid::Uuid>,
57 pub reward: Option<u64>,
59 pub error: Option<String>,
61}
62
63#[derive(Debug, Clone, Copy, PartialEq, Eq)]
65pub enum SubmitState {
66 Pending,
68 Submitting,
70 Submitted,
72 Failed,
74}
75
76#[derive(Debug, Clone)]
78pub struct QueuedProof {
79 pub proof: BandwidthProof,
81 pub state: SubmitState,
83 pub attempts: u32,
85 pub last_attempt: Option<Instant>,
87 pub last_error: Option<String>,
89 pub next_retry: Option<Instant>,
91 pub queued_at: Instant,
93}
94
95impl QueuedProof {
96 fn new(proof: BandwidthProof) -> Self {
97 Self {
98 proof,
99 state: SubmitState::Pending,
100 attempts: 0,
101 last_attempt: None,
102 last_error: None,
103 next_retry: None,
104 queued_at: Instant::now(),
105 }
106 }
107}
108
109#[derive(Debug, Clone, Default)]
111pub struct SubmitStats {
112 pub total_submitted: u64,
114 pub total_failed: u64,
116 pub total_retries: u64,
118 pub avg_submit_time_ms: f64,
120 pub queue_size: usize,
122 pub pending_retry: usize,
124}
125
126pub struct ProofSubmitter {
128 config: ProofSubmitConfig,
129 client: reqwest::Client,
130 queue: Arc<RwLock<VecDeque<QueuedProof>>>,
131 stats: Arc<RwLock<SubmitStats>>,
132 running: Arc<RwLock<bool>>,
133}
134
135impl ProofSubmitter {
136 #[must_use]
138 #[inline]
139 pub fn new(config: ProofSubmitConfig) -> Self {
140 let client = reqwest::Client::builder()
141 .timeout(config.timeout)
142 .build()
143 .expect("Failed to create HTTP client");
144
145 Self {
146 config,
147 client,
148 queue: Arc::new(RwLock::new(VecDeque::new())),
149 stats: Arc::new(RwLock::new(SubmitStats::default())),
150 running: Arc::new(RwLock::new(false)),
151 }
152 }
153
154 pub async fn queue_proof(&self, proof: BandwidthProof) -> Result<(), ProofSubmitError> {
156 let mut queue = self.queue.write().await;
157
158 if queue.len() >= self.config.max_queue_size {
159 return Err(ProofSubmitError::QueueFull);
160 }
161
162 queue.push_back(QueuedProof::new(proof));
163
164 let mut stats = self.stats.write().await;
165 stats.queue_size = queue.len();
166
167 debug!("Queued proof for submission, queue size: {}", queue.len());
168 Ok(())
169 }
170
171 pub async fn submit_now(
173 &self,
174 proof: &BandwidthProof,
175 ) -> Result<SubmitResult, ProofSubmitError> {
176 self.do_submit(proof).await
177 }
178
179 pub async fn submit_with_retry(
181 &self,
182 proof: BandwidthProof,
183 ) -> Result<SubmitResult, ProofSubmitError> {
184 let mut attempts = 0;
185 let mut delay = self.config.initial_delay;
186 let mut last_error = None;
187
188 while attempts < self.config.max_retries {
189 attempts += 1;
190
191 match self.do_submit(&proof).await {
192 Ok(result) => {
193 if result.accepted {
194 let mut stats = self.stats.write().await;
195 stats.total_submitted += 1;
196 if attempts > 1 {
197 stats.total_retries += attempts as u64 - 1;
198 }
199 return Ok(result);
200 } else {
201 return Ok(result);
203 }
204 }
205 Err(e) if e.is_retryable() => {
206 last_error = Some(e);
207 warn!(
208 "Proof submission attempt {} failed, retrying in {:?}",
209 attempts, delay
210 );
211 tokio::time::sleep(delay).await;
212
213 delay = Duration::from_secs_f64(
215 (delay.as_secs_f64() * self.config.backoff_multiplier)
216 .min(self.config.max_delay.as_secs_f64()),
217 );
218 }
219 Err(e) => {
220 return Err(e);
222 }
223 }
224 }
225
226 let mut stats = self.stats.write().await;
227 stats.total_failed += 1;
228 stats.total_retries += attempts as u64;
229
230 Err(last_error.unwrap_or(ProofSubmitError::MaxRetriesExceeded))
231 }
232
233 pub async fn start_worker(&self) {
235 let mut running = self.running.write().await;
236 if *running {
237 return;
238 }
239 *running = true;
240 drop(running);
241
242 info!("Starting proof submission worker");
243
244 loop {
245 {
246 let running = self.running.read().await;
247 if !*running {
248 break;
249 }
250 }
251
252 if let Err(e) = self.process_queue().await {
254 error!("Queue processing error: {}", e);
255 }
256
257 tokio::time::sleep(Duration::from_millis(100)).await;
259 }
260
261 info!("Proof submission worker stopped");
262 }
263
264 pub async fn stop_worker(&self) {
266 let mut running = self.running.write().await;
267 *running = false;
268 }
269
270 async fn process_queue(&self) -> Result<(), ProofSubmitError> {
272 let mut queue = self.queue.write().await;
273
274 if queue.is_empty() {
275 return Ok(());
276 }
277
278 let now = Instant::now();
279
280 let ready_idx = queue.iter().position(|p| {
282 p.state == SubmitState::Pending
283 || (p.state == SubmitState::Submitting && p.next_retry.is_none_or(|t| now >= t))
284 });
285
286 if let Some(idx) = ready_idx {
287 let mut queued = queue.remove(idx).unwrap();
288 queued.state = SubmitState::Submitting;
289 queued.last_attempt = Some(now);
290 queued.attempts += 1;
291
292 drop(queue);
294
295 let start = Instant::now();
296 let result = self.do_submit(&queued.proof).await;
297 let submit_time = start.elapsed().as_millis() as f64;
298
299 let mut queue = self.queue.write().await;
300 let mut stats = self.stats.write().await;
301
302 let n = stats.total_submitted + stats.total_failed;
304 stats.avg_submit_time_ms =
305 (stats.avg_submit_time_ms * n as f64 + submit_time) / (n + 1) as f64;
306
307 match result {
308 Ok(res) if res.accepted => {
309 queued.state = SubmitState::Submitted;
310 stats.total_submitted += 1;
311 debug!("Proof submitted successfully: {:?}", res.proof_id);
312 }
314 Ok(res) => {
315 queued.state = SubmitState::Failed;
317 queued.last_error = res.error.clone();
318 stats.total_failed += 1;
319 warn!("Proof rejected by coordinator: {:?}", res.error);
320 }
321 Err(e) if e.is_retryable() && queued.attempts < self.config.max_retries => {
322 let delay = self.calculate_delay(queued.attempts);
324 queued.next_retry = Some(Instant::now() + delay);
325 queued.state = SubmitState::Pending;
326 queued.last_error = Some(e.to_string());
327 stats.total_retries += 1;
328 stats.pending_retry += 1;
329 queue.push_back(queued);
330 warn!("Proof submission failed, scheduled retry in {:?}", delay);
331 }
332 Err(e) => {
333 queued.state = SubmitState::Failed;
335 queued.last_error = Some(e.to_string());
336 stats.total_failed += 1;
337 error!("Proof submission failed permanently: {}", e);
338 }
339 }
340
341 stats.queue_size = queue.len();
342 }
343
344 Ok(())
345 }
346
347 #[must_use]
349 #[inline]
350 fn calculate_delay(&self, attempts: u32) -> Duration {
351 let delay_secs = self.config.initial_delay.as_secs_f64()
352 * self.config.backoff_multiplier.powi(attempts as i32 - 1);
353 Duration::from_secs_f64(delay_secs.min(self.config.max_delay.as_secs_f64()))
354 }
355
356 async fn do_submit(&self, proof: &BandwidthProof) -> Result<SubmitResult, ProofSubmitError> {
358 let url = format!("{}/api/proofs/submit", self.config.coordinator_url);
359
360 let response = self
361 .client
362 .post(&url)
363 .json(proof)
364 .send()
365 .await
366 .map_err(ProofSubmitError::Http)?;
367
368 let status = response.status();
369
370 if status.is_success() {
371 let result: SubmitResult = response.json().await.map_err(ProofSubmitError::Http)?;
372 Ok(result)
373 } else if status.is_server_error() {
374 let error_text = response.text().await.unwrap_or_default();
376 Err(ProofSubmitError::ServerError {
377 status: status.as_u16(),
378 message: error_text,
379 })
380 } else {
381 let error_text = response.text().await.unwrap_or_default();
383 Err(ProofSubmitError::ClientError {
384 status: status.as_u16(),
385 message: error_text,
386 })
387 }
388 }
389
390 #[must_use]
392 #[inline]
393 pub async fn stats(&self) -> SubmitStats {
394 self.stats.read().await.clone()
395 }
396
397 #[must_use]
399 #[inline]
400 pub async fn queue_size(&self) -> usize {
401 self.queue.read().await.len()
402 }
403
404 pub async fn clear_queue(&self) {
406 let mut queue = self.queue.write().await;
407 let count = queue.len();
408 queue.clear();
409
410 let mut stats = self.stats.write().await;
411 stats.queue_size = 0;
412 stats.pending_retry = 0;
413
414 info!("Cleared {} proofs from submission queue", count);
415 }
416
417 #[must_use]
419 pub async fn drain_failed(&self) -> Vec<QueuedProof> {
420 let mut queue = self.queue.write().await;
421 let failed: Vec<_> = queue
422 .iter()
423 .filter(|p| p.state == SubmitState::Failed)
424 .cloned()
425 .collect();
426
427 queue.retain(|p| p.state != SubmitState::Failed);
428
429 let mut stats = self.stats.write().await;
430 stats.queue_size = queue.len();
431
432 failed
433 }
434}
435
436#[derive(Debug)]
438pub enum ProofSubmitError {
439 Http(reqwest::Error),
441 ServerError { status: u16, message: String },
443 ClientError { status: u16, message: String },
445 QueueFull,
447 MaxRetriesExceeded,
449 Serialization(serde_json::Error),
451}
452
453impl ProofSubmitError {
454 #[must_use]
456 #[inline]
457 pub fn is_retryable(&self) -> bool {
458 match self {
459 Self::Http(e) => e.is_timeout() || e.is_connect(),
460 Self::ServerError { .. } => true,
461 Self::ClientError { .. } => false,
462 Self::QueueFull => false,
463 Self::MaxRetriesExceeded => false,
464 Self::Serialization(_) => false,
465 }
466 }
467}
468
469impl std::fmt::Display for ProofSubmitError {
470 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
471 match self {
472 Self::Http(e) => write!(f, "HTTP error: {}", e),
473 Self::ServerError { status, message } => {
474 write!(f, "Server error {}: {}", status, message)
475 }
476 Self::ClientError { status, message } => {
477 write!(f, "Client error {}: {}", status, message)
478 }
479 Self::QueueFull => write!(f, "Submission queue is full"),
480 Self::MaxRetriesExceeded => write!(f, "Maximum retries exceeded"),
481 Self::Serialization(e) => write!(f, "Serialization error: {}", e),
482 }
483 }
484}
485
486impl std::error::Error for ProofSubmitError {
487 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
488 match self {
489 Self::Http(e) => Some(e),
490 Self::Serialization(e) => Some(e),
491 _ => None,
492 }
493 }
494}
495
496#[cfg(test)]
497mod tests {
498 use super::*;
499
500 #[test]
501 fn test_config_default() {
502 let config = ProofSubmitConfig::default();
503 assert_eq!(config.max_retries, 5);
504 assert_eq!(config.initial_delay, Duration::from_secs(1));
505 }
506
507 #[test]
508 fn test_delay_calculation() {
509 let config = ProofSubmitConfig::default();
510 let submitter = ProofSubmitter::new(config);
511
512 let delay1 = submitter.calculate_delay(1);
513 let delay2 = submitter.calculate_delay(2);
514 let delay3 = submitter.calculate_delay(3);
515
516 assert_eq!(delay1, Duration::from_secs(1));
517 assert_eq!(delay2, Duration::from_secs(2));
518 assert_eq!(delay3, Duration::from_secs(4));
519 }
520
521 #[test]
522 fn test_error_retryable() {
523 assert!(
524 ProofSubmitError::ServerError {
525 status: 500,
526 message: "Internal error".to_string()
527 }
528 .is_retryable()
529 );
530
531 assert!(
532 !ProofSubmitError::ClientError {
533 status: 400,
534 message: "Bad request".to_string()
535 }
536 .is_retryable()
537 );
538
539 assert!(!ProofSubmitError::QueueFull.is_retryable());
540 }
541
542 #[tokio::test]
543 async fn test_queue_proof() {
544 let config = ProofSubmitConfig::default();
545 let submitter = ProofSubmitter::new(config);
546
547 let proof = BandwidthProof {
549 session_id: uuid::Uuid::new_v4(),
550 content_cid: "QmTest".to_string(),
551 chunk_index: 0,
552 bytes_transferred: 1024,
553 provider_peer_id: "provider".to_string(),
554 requester_peer_id: "requester".to_string(),
555 provider_public_key: vec![0u8; 32],
556 requester_public_key: vec![0u8; 32],
557 provider_signature: vec![0u8; 64],
558 requester_signature: vec![0u8; 64],
559 challenge_nonce: vec![0u8; 32],
560 chunk_hash: vec![0u8; 32],
561 start_timestamp_ms: 0,
562 end_timestamp_ms: 100,
563 latency_ms: 100,
564 };
565
566 submitter.queue_proof(proof).await.unwrap();
567 assert_eq!(submitter.queue_size().await, 1);
568 }
569}