chie_core/
proof_submit.rs

1//! Proof submission with retry logic.
2//!
3//! Handles submitting bandwidth proofs to the coordinator with
4//! automatic retry on transient failures.
5
6use chie_shared::BandwidthProof;
7use serde::{Deserialize, Serialize};
8use std::collections::VecDeque;
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use tokio::sync::RwLock;
12use tracing::{debug, error, info, warn};
13
/// Configuration for [`ProofSubmitter`].
#[derive(Debug, Clone)]
pub struct ProofSubmitConfig {
    /// Base URL of the coordinator; proofs are POSTed to
    /// `{coordinator_url}/api/proofs/submit`.
    pub coordinator_url: String,
    /// Maximum number of submission attempts per proof, *including* the
    /// first one. Despite the name, this is not a count of extra retries:
    /// a value of 1 means "try once, never retry".
    pub max_retries: u32,
    /// Backoff delay applied before the first retry.
    pub initial_delay: Duration,
    /// Upper bound on any single backoff delay.
    pub max_delay: Duration,
    /// Factor by which the backoff delay grows after each failed attempt.
    pub backoff_multiplier: f64,
    /// Per-request HTTP timeout handed to the HTTP client.
    pub timeout: Duration,
    /// Maximum number of entries the submission queue accepts.
    pub max_queue_size: usize,
    /// Whether to persist the queue to disk.
    ///
    /// NOTE(review): this flag is not read anywhere in this module — confirm
    /// whether persistence is implemented elsewhere.
    pub persist_queue: bool,
}
34
35impl Default for ProofSubmitConfig {
36    fn default() -> Self {
37        Self {
38            coordinator_url: "http://localhost:3000".to_string(),
39            max_retries: 5,
40            initial_delay: Duration::from_secs(1),
41            max_delay: Duration::from_secs(60),
42            backoff_multiplier: 2.0,
43            timeout: Duration::from_secs(30),
44            max_queue_size: 1000,
45            persist_queue: true,
46        }
47    }
48}
49
50/// Submission result from coordinator.
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct SubmitResult {
53    /// Whether submission was accepted.
54    pub accepted: bool,
55    /// Proof ID assigned by coordinator.
56    pub proof_id: Option<uuid::Uuid>,
57    /// Reward amount if calculated.
58    pub reward: Option<u64>,
59    /// Error message if rejected.
60    pub error: Option<String>,
61}
62
/// Lifecycle state of a proof in the submission pipeline.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SubmitState {
    /// Queued and waiting for the worker to pick it up.
    Pending,
    /// An HTTP submission for this proof is in flight.
    Submitting,
    /// The coordinator accepted the proof.
    Submitted,
    /// Gave up: rejected, a non-retryable error, or attempts exhausted.
    Failed,
}
75
76/// A queued proof submission.
77#[derive(Debug, Clone)]
78pub struct QueuedProof {
79    /// The proof to submit.
80    pub proof: BandwidthProof,
81    /// Current state.
82    pub state: SubmitState,
83    /// Number of attempts made.
84    pub attempts: u32,
85    /// Last attempt time.
86    pub last_attempt: Option<Instant>,
87    /// Last error message.
88    pub last_error: Option<String>,
89    /// Next retry time.
90    pub next_retry: Option<Instant>,
91    /// When the proof was queued.
92    pub queued_at: Instant,
93}
94
95impl QueuedProof {
96    fn new(proof: BandwidthProof) -> Self {
97        Self {
98            proof,
99            state: SubmitState::Pending,
100            attempts: 0,
101            last_attempt: None,
102            last_error: None,
103            next_retry: None,
104            queued_at: Instant::now(),
105        }
106    }
107}
108
/// Counters and gauges describing the submitter's activity.
#[derive(Clone, Debug, Default)]
pub struct SubmitStats {
    /// Number of proofs the coordinator accepted.
    pub total_submitted: u64,
    /// Number of proofs given up on.
    pub total_failed: u64,
    /// Number of retry attempts made.
    pub total_retries: u64,
    /// Running mean of submission round-trip time, in milliseconds.
    pub avg_submit_time_ms: f64,
    /// Queue length, as last recorded by the submitter.
    pub queue_size: usize,
    /// Proofs waiting to be retried, as last recorded by the submitter.
    pub pending_retry: usize,
}
125
126/// Proof submitter with retry logic.
127pub struct ProofSubmitter {
128    config: ProofSubmitConfig,
129    client: reqwest::Client,
130    queue: Arc<RwLock<VecDeque<QueuedProof>>>,
131    stats: Arc<RwLock<SubmitStats>>,
132    running: Arc<RwLock<bool>>,
133}
134
135impl ProofSubmitter {
136    /// Create a new proof submitter.
137    #[must_use]
138    #[inline]
139    pub fn new(config: ProofSubmitConfig) -> Self {
140        let client = reqwest::Client::builder()
141            .timeout(config.timeout)
142            .build()
143            .expect("Failed to create HTTP client");
144
145        Self {
146            config,
147            client,
148            queue: Arc::new(RwLock::new(VecDeque::new())),
149            stats: Arc::new(RwLock::new(SubmitStats::default())),
150            running: Arc::new(RwLock::new(false)),
151        }
152    }
153
154    /// Queue a proof for submission.
155    pub async fn queue_proof(&self, proof: BandwidthProof) -> Result<(), ProofSubmitError> {
156        let mut queue = self.queue.write().await;
157
158        if queue.len() >= self.config.max_queue_size {
159            return Err(ProofSubmitError::QueueFull);
160        }
161
162        queue.push_back(QueuedProof::new(proof));
163
164        let mut stats = self.stats.write().await;
165        stats.queue_size = queue.len();
166
167        debug!("Queued proof for submission, queue size: {}", queue.len());
168        Ok(())
169    }
170
171    /// Submit a proof immediately (bypassing queue).
172    pub async fn submit_now(
173        &self,
174        proof: &BandwidthProof,
175    ) -> Result<SubmitResult, ProofSubmitError> {
176        self.do_submit(proof).await
177    }
178
179    /// Submit a proof with retry logic.
180    pub async fn submit_with_retry(
181        &self,
182        proof: BandwidthProof,
183    ) -> Result<SubmitResult, ProofSubmitError> {
184        let mut attempts = 0;
185        let mut delay = self.config.initial_delay;
186        let mut last_error = None;
187
188        while attempts < self.config.max_retries {
189            attempts += 1;
190
191            match self.do_submit(&proof).await {
192                Ok(result) => {
193                    if result.accepted {
194                        let mut stats = self.stats.write().await;
195                        stats.total_submitted += 1;
196                        if attempts > 1 {
197                            stats.total_retries += attempts as u64 - 1;
198                        }
199                        return Ok(result);
200                    } else {
201                        // Rejected by coordinator - don't retry
202                        return Ok(result);
203                    }
204                }
205                Err(e) if e.is_retryable() => {
206                    last_error = Some(e);
207                    warn!(
208                        "Proof submission attempt {} failed, retrying in {:?}",
209                        attempts, delay
210                    );
211                    tokio::time::sleep(delay).await;
212
213                    // Exponential backoff
214                    delay = Duration::from_secs_f64(
215                        (delay.as_secs_f64() * self.config.backoff_multiplier)
216                            .min(self.config.max_delay.as_secs_f64()),
217                    );
218                }
219                Err(e) => {
220                    // Non-retryable error
221                    return Err(e);
222                }
223            }
224        }
225
226        let mut stats = self.stats.write().await;
227        stats.total_failed += 1;
228        stats.total_retries += attempts as u64;
229
230        Err(last_error.unwrap_or(ProofSubmitError::MaxRetriesExceeded))
231    }
232
233    /// Start the background submission worker.
234    pub async fn start_worker(&self) {
235        let mut running = self.running.write().await;
236        if *running {
237            return;
238        }
239        *running = true;
240        drop(running);
241
242        info!("Starting proof submission worker");
243
244        loop {
245            {
246                let running = self.running.read().await;
247                if !*running {
248                    break;
249                }
250            }
251
252            // Process the queue
253            if let Err(e) = self.process_queue().await {
254                error!("Queue processing error: {}", e);
255            }
256
257            // Small delay between processing cycles
258            tokio::time::sleep(Duration::from_millis(100)).await;
259        }
260
261        info!("Proof submission worker stopped");
262    }
263
264    /// Stop the background worker.
265    pub async fn stop_worker(&self) {
266        let mut running = self.running.write().await;
267        *running = false;
268    }
269
270    /// Process queued proofs.
271    async fn process_queue(&self) -> Result<(), ProofSubmitError> {
272        let mut queue = self.queue.write().await;
273
274        if queue.is_empty() {
275            return Ok(());
276        }
277
278        let now = Instant::now();
279
280        // Find next proof ready for processing
281        let ready_idx = queue.iter().position(|p| {
282            p.state == SubmitState::Pending
283                || (p.state == SubmitState::Submitting && p.next_retry.is_none_or(|t| now >= t))
284        });
285
286        if let Some(idx) = ready_idx {
287            let mut queued = queue.remove(idx).unwrap();
288            queued.state = SubmitState::Submitting;
289            queued.last_attempt = Some(now);
290            queued.attempts += 1;
291
292            // Release lock while submitting
293            drop(queue);
294
295            let start = Instant::now();
296            let result = self.do_submit(&queued.proof).await;
297            let submit_time = start.elapsed().as_millis() as f64;
298
299            let mut queue = self.queue.write().await;
300            let mut stats = self.stats.write().await;
301
302            // Update average submit time
303            let n = stats.total_submitted + stats.total_failed;
304            stats.avg_submit_time_ms =
305                (stats.avg_submit_time_ms * n as f64 + submit_time) / (n + 1) as f64;
306
307            match result {
308                Ok(res) if res.accepted => {
309                    queued.state = SubmitState::Submitted;
310                    stats.total_submitted += 1;
311                    debug!("Proof submitted successfully: {:?}", res.proof_id);
312                    // Don't re-add to queue - it's done
313                }
314                Ok(res) => {
315                    // Rejected - don't retry
316                    queued.state = SubmitState::Failed;
317                    queued.last_error = res.error.clone();
318                    stats.total_failed += 1;
319                    warn!("Proof rejected by coordinator: {:?}", res.error);
320                }
321                Err(e) if e.is_retryable() && queued.attempts < self.config.max_retries => {
322                    // Retryable error - schedule retry
323                    let delay = self.calculate_delay(queued.attempts);
324                    queued.next_retry = Some(Instant::now() + delay);
325                    queued.state = SubmitState::Pending;
326                    queued.last_error = Some(e.to_string());
327                    stats.total_retries += 1;
328                    stats.pending_retry += 1;
329                    queue.push_back(queued);
330                    warn!("Proof submission failed, scheduled retry in {:?}", delay);
331                }
332                Err(e) => {
333                    // Non-retryable or max retries exceeded
334                    queued.state = SubmitState::Failed;
335                    queued.last_error = Some(e.to_string());
336                    stats.total_failed += 1;
337                    error!("Proof submission failed permanently: {}", e);
338                }
339            }
340
341            stats.queue_size = queue.len();
342        }
343
344        Ok(())
345    }
346
347    /// Calculate delay for retry attempt.
348    #[must_use]
349    #[inline]
350    fn calculate_delay(&self, attempts: u32) -> Duration {
351        let delay_secs = self.config.initial_delay.as_secs_f64()
352            * self.config.backoff_multiplier.powi(attempts as i32 - 1);
353        Duration::from_secs_f64(delay_secs.min(self.config.max_delay.as_secs_f64()))
354    }
355
356    /// Perform the actual HTTP submission.
357    async fn do_submit(&self, proof: &BandwidthProof) -> Result<SubmitResult, ProofSubmitError> {
358        let url = format!("{}/api/proofs/submit", self.config.coordinator_url);
359
360        let response = self
361            .client
362            .post(&url)
363            .json(proof)
364            .send()
365            .await
366            .map_err(ProofSubmitError::Http)?;
367
368        let status = response.status();
369
370        if status.is_success() {
371            let result: SubmitResult = response.json().await.map_err(ProofSubmitError::Http)?;
372            Ok(result)
373        } else if status.is_server_error() {
374            // Server error - retryable
375            let error_text = response.text().await.unwrap_or_default();
376            Err(ProofSubmitError::ServerError {
377                status: status.as_u16(),
378                message: error_text,
379            })
380        } else {
381            // Client error - not retryable
382            let error_text = response.text().await.unwrap_or_default();
383            Err(ProofSubmitError::ClientError {
384                status: status.as_u16(),
385                message: error_text,
386            })
387        }
388    }
389
390    /// Get current statistics.
391    #[must_use]
392    #[inline]
393    pub async fn stats(&self) -> SubmitStats {
394        self.stats.read().await.clone()
395    }
396
397    /// Get queue size.
398    #[must_use]
399    #[inline]
400    pub async fn queue_size(&self) -> usize {
401        self.queue.read().await.len()
402    }
403
404    /// Clear the queue (drops all pending proofs).
405    pub async fn clear_queue(&self) {
406        let mut queue = self.queue.write().await;
407        let count = queue.len();
408        queue.clear();
409
410        let mut stats = self.stats.write().await;
411        stats.queue_size = 0;
412        stats.pending_retry = 0;
413
414        info!("Cleared {} proofs from submission queue", count);
415    }
416
417    /// Drain and return all failed proofs.
418    #[must_use]
419    pub async fn drain_failed(&self) -> Vec<QueuedProof> {
420        let mut queue = self.queue.write().await;
421        let failed: Vec<_> = queue
422            .iter()
423            .filter(|p| p.state == SubmitState::Failed)
424            .cloned()
425            .collect();
426
427        queue.retain(|p| p.state != SubmitState::Failed);
428
429        let mut stats = self.stats.write().await;
430        stats.queue_size = queue.len();
431
432        failed
433    }
434}
435
436/// Proof submission error.
437#[derive(Debug)]
438pub enum ProofSubmitError {
439    /// HTTP request failed.
440    Http(reqwest::Error),
441    /// Server error (5xx).
442    ServerError { status: u16, message: String },
443    /// Client error (4xx).
444    ClientError { status: u16, message: String },
445    /// Queue is full.
446    QueueFull,
447    /// Maximum retries exceeded.
448    MaxRetriesExceeded,
449    /// Serialization error.
450    Serialization(serde_json::Error),
451}
452
453impl ProofSubmitError {
454    /// Check if this error is retryable.
455    #[must_use]
456    #[inline]
457    pub fn is_retryable(&self) -> bool {
458        match self {
459            Self::Http(e) => e.is_timeout() || e.is_connect(),
460            Self::ServerError { .. } => true,
461            Self::ClientError { .. } => false,
462            Self::QueueFull => false,
463            Self::MaxRetriesExceeded => false,
464            Self::Serialization(_) => false,
465        }
466    }
467}
468
469impl std::fmt::Display for ProofSubmitError {
470    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
471        match self {
472            Self::Http(e) => write!(f, "HTTP error: {}", e),
473            Self::ServerError { status, message } => {
474                write!(f, "Server error {}: {}", status, message)
475            }
476            Self::ClientError { status, message } => {
477                write!(f, "Client error {}: {}", status, message)
478            }
479            Self::QueueFull => write!(f, "Submission queue is full"),
480            Self::MaxRetriesExceeded => write!(f, "Maximum retries exceeded"),
481            Self::Serialization(e) => write!(f, "Serialization error: {}", e),
482        }
483    }
484}
485
486impl std::error::Error for ProofSubmitError {
487    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
488        match self {
489            Self::Http(e) => Some(e),
490            Self::Serialization(e) => Some(e),
491            _ => None,
492        }
493    }
494}
495
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a structurally complete proof filled with dummy key and
    /// signature bytes.
    fn dummy_proof() -> BandwidthProof {
        BandwidthProof {
            session_id: uuid::Uuid::new_v4(),
            content_cid: "QmTest".to_string(),
            chunk_index: 0,
            bytes_transferred: 1024,
            provider_peer_id: "provider".to_string(),
            requester_peer_id: "requester".to_string(),
            provider_public_key: vec![0u8; 32],
            requester_public_key: vec![0u8; 32],
            provider_signature: vec![0u8; 64],
            requester_signature: vec![0u8; 64],
            challenge_nonce: vec![0u8; 32],
            chunk_hash: vec![0u8; 32],
            start_timestamp_ms: 0,
            end_timestamp_ms: 100,
            latency_ms: 100,
        }
    }

    #[test]
    fn test_config_default() {
        let config = ProofSubmitConfig::default();
        assert_eq!(config.max_retries, 5);
        assert_eq!(config.initial_delay, Duration::from_secs(1));
    }

    #[test]
    fn test_delay_calculation() {
        let submitter = ProofSubmitter::new(ProofSubmitConfig::default());

        assert_eq!(submitter.calculate_delay(1), Duration::from_secs(1));
        assert_eq!(submitter.calculate_delay(2), Duration::from_secs(2));
        assert_eq!(submitter.calculate_delay(3), Duration::from_secs(4));
    }

    #[test]
    fn test_delay_is_capped_at_max_delay() {
        let submitter = ProofSubmitter::new(ProofSubmitConfig::default());
        // 1 s * 2^9 = 512 s, well above the 60 s default cap.
        assert_eq!(submitter.calculate_delay(10), Duration::from_secs(60));
    }

    #[test]
    fn test_error_retryable() {
        assert!(ProofSubmitError::ServerError {
            status: 500,
            message: "Internal error".to_string()
        }
        .is_retryable());

        assert!(!ProofSubmitError::ClientError {
            status: 400,
            message: "Bad request".to_string()
        }
        .is_retryable());

        assert!(!ProofSubmitError::QueueFull.is_retryable());
        assert!(!ProofSubmitError::MaxRetriesExceeded.is_retryable());
    }

    #[test]
    fn test_error_display() {
        assert_eq!(
            ProofSubmitError::QueueFull.to_string(),
            "Submission queue is full"
        );
        assert_eq!(
            ProofSubmitError::ServerError {
                status: 503,
                message: "down".to_string()
            }
            .to_string(),
            "Server error 503: down"
        );
    }

    #[tokio::test]
    async fn test_queue_proof() {
        let submitter = ProofSubmitter::new(ProofSubmitConfig::default());

        submitter.queue_proof(dummy_proof()).await.unwrap();
        assert_eq!(submitter.queue_size().await, 1);
        assert_eq!(submitter.stats().await.queue_size, 1);
    }

    #[tokio::test]
    async fn test_queue_full() {
        let config = ProofSubmitConfig {
            max_queue_size: 1,
            ..Default::default()
        };
        let submitter = ProofSubmitter::new(config);

        submitter.queue_proof(dummy_proof()).await.unwrap();
        assert!(matches!(
            submitter.queue_proof(dummy_proof()).await,
            Err(ProofSubmitError::QueueFull)
        ));
        assert_eq!(submitter.queue_size().await, 1);
    }

    #[tokio::test]
    async fn test_clear_queue() {
        let submitter = ProofSubmitter::new(ProofSubmitConfig::default());
        submitter.queue_proof(dummy_proof()).await.unwrap();
        submitter.queue_proof(dummy_proof()).await.unwrap();

        submitter.clear_queue().await;

        assert_eq!(submitter.queue_size().await, 0);
        let stats = submitter.stats().await;
        assert_eq!(stats.queue_size, 0);
        assert_eq!(stats.pending_retry, 0);
    }
}