peat_mesh/storage/sync_errors.rs

//! Sync error handling and retry logic
//!
//! This module provides production-grade error handling for Automerge document
//! synchronization with exponential backoff, circuit breaker patterns, and
//! comprehensive error tracking.
//!
//! # FFI-Friendly Design
//!
//! All types are designed to be FFI-safe:
//! - Error codes are represented as `#[repr(C)]` enums
//! - Statistics use simple numeric types
//! - No complex Rust-specific types in public API
//!
//! # Architecture
//!
//! ```text
//! ┌─────────────────┐
//! │ SyncCoordinator │
//! └────────┬────────┘
//!          │
//!          ▼
//! ┌─────────────────┐      ┌──────────────┐
//! │  ErrorHandler   │─────▶│ RetryPolicy  │
//! └────────┬────────┘      └──────────────┘
//!          │
//!          ▼
//! ┌─────────────────┐
//! │ HealthMonitor   │
//! └─────────────────┘
//! ```
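//!
//! # Usage Sketch
//!
//! A minimal sketch of the intended call pattern; `peer_id` and `err` are
//! placeholders supplied by the surrounding sync loop:
//!
//! ```ignore
//! let handler = SyncErrorHandler::new();
//!
//! // On a failed sync, ask the handler whether (and when) to retry.
//! if let Ok(Some(delay)) = handler.handle_error(&peer_id, err) {
//!     tokio::time::sleep(delay).await;
//! }
//!
//! // On success, feed the circuit breaker so it can close again.
//! handler.record_success(&peer_id);
//! ```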

use iroh::EndpointId;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime};
use thiserror::Error;

/// Sync-specific error types
///
/// FFI-friendly representation of sync errors with clear categorization.
#[derive(Error, Debug, Clone, PartialEq, Eq)]
#[repr(C)]
pub enum SyncError {
    /// Network transport error (connection lost, timeout, etc.)
    #[error("Network error: {0}")]
    Network(String),

    /// Automerge document error (corrupted document, invalid operation)
    #[error("Document error: {0}")]
    Document(String),

    /// Peer not found or disconnected
    #[error("Peer not found: {0}")]
    PeerNotFound(String),

    /// Message encoding/decoding error
    #[error("Protocol error: {0}")]
    Protocol(String),

    /// Sync state inconsistency
    #[error("State error: {0}")]
    State(String),

    /// Resource exhaustion (memory, file descriptors, etc.)
    #[error("Resource exhaustion: {0}")]
    ResourceExhaustion(String),

    /// Circuit breaker open (too many failures)
    #[error("Circuit breaker open for peer")]
    CircuitBreakerOpen,

    /// Bandwidth quota exhausted for the given QoS class
    #[error("Bandwidth exhausted for class {0}")]
    BandwidthExhausted(String),
}

/// Error severity levels for prioritization
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[repr(C)]
pub enum ErrorSeverity {
    /// Transient error, safe to retry immediately
    Transient = 1,
    /// Recoverable error, needs exponential backoff
    Recoverable = 2,
    /// Severe error, may require circuit breaker
    Severe = 3,
    /// Fatal error, cannot retry
    Fatal = 4,
}

impl SyncError {
    /// Determine the severity of this error
    pub fn severity(&self) -> ErrorSeverity {
        match self {
            SyncError::Network(_) => ErrorSeverity::Recoverable,
            SyncError::PeerNotFound(_) => ErrorSeverity::Transient,
            SyncError::Protocol(_) => ErrorSeverity::Severe,
            SyncError::Document(_) => ErrorSeverity::Fatal,
            SyncError::State(_) => ErrorSeverity::Severe,
            SyncError::ResourceExhaustion(_) => ErrorSeverity::Severe,
            SyncError::CircuitBreakerOpen => ErrorSeverity::Transient,
            SyncError::BandwidthExhausted(_) => ErrorSeverity::Transient,
        }
    }

    /// Check if this error is retryable
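    ///
    /// A quick illustration (hypothetical error messages):
    ///
    /// ```ignore
    /// assert!(SyncError::Network("connection reset".into()).is_retryable());
    /// assert!(!SyncError::Document("corrupted heads".into()).is_retryable());
    /// ```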
    pub fn is_retryable(&self) -> bool {
        matches!(
            self.severity(),
            ErrorSeverity::Transient | ErrorSeverity::Recoverable
        )
    }
}

/// Retry policy configuration
///
/// Implements exponential backoff with jitter to prevent thundering herd.
#[derive(Debug, Clone)]
pub struct RetryPolicy {
    /// Initial retry delay
    pub initial_delay: Duration,
    /// Maximum retry delay (caps exponential growth)
    pub max_delay: Duration,
    /// Maximum number of retry attempts
    pub max_attempts: u32,
    /// Exponential backoff base (typically 2.0)
    pub backoff_multiplier: f64,
    /// Jitter factor (0.0 to 1.0) to randomize delays
    pub jitter_factor: f64,
}

impl Default for RetryPolicy {
    fn default() -> Self {
        Self {
            initial_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(30),
            max_attempts: 5,
            backoff_multiplier: 2.0,
            jitter_factor: 0.1,
        }
    }
}

impl RetryPolicy {
    /// Create a new retry policy with custom settings
    pub fn new(
        initial_delay: Duration,
        max_delay: Duration,
        max_attempts: u32,
        backoff_multiplier: f64,
    ) -> Self {
        Self {
            initial_delay,
            max_delay,
            max_attempts,
            backoff_multiplier,
            jitter_factor: 0.1,
        }
    }

    /// Create a policy for transient errors (aggressive retries)
    pub fn transient() -> Self {
        Self {
            initial_delay: Duration::from_millis(50),
            max_delay: Duration::from_secs(5),
            max_attempts: 10,
            backoff_multiplier: 1.5,
            jitter_factor: 0.1,
        }
    }

    /// Create a policy for severe errors (conservative retries)
    pub fn severe() -> Self {
        Self {
            initial_delay: Duration::from_secs(1),
            max_delay: Duration::from_secs(60),
            max_attempts: 3,
            backoff_multiplier: 3.0,
            jitter_factor: 0.2,
        }
    }

    /// Calculate delay for a given attempt number
    ///
    /// Uses exponential backoff with jitter:
    /// `delay = min(initial_delay * backoff_multiplier^(attempt - 1), max_delay) + jitter`
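    ///
    /// With the default policy (100 ms initial delay, multiplier 2.0) and
    /// jitter disabled for clarity, the progression looks like this:
    ///
    /// ```ignore
    /// let mut policy = RetryPolicy::default();
    /// policy.jitter_factor = 0.0;
    /// assert_eq!(policy.delay_for_attempt(1), Duration::from_millis(100));
    /// assert_eq!(policy.delay_for_attempt(2), Duration::from_millis(200));
    /// assert_eq!(policy.delay_for_attempt(3), Duration::from_millis(400));
    /// ```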
    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
        if attempt == 0 {
            return Duration::ZERO;
        }

        // Calculate base delay: initial_delay * backoff_multiplier^(attempt - 1)
        let base_delay_ms = self.initial_delay.as_millis() as f64
            * self.backoff_multiplier.powi(attempt as i32 - 1);

        // Cap at max_delay
        let capped_delay_ms = base_delay_ms.min(self.max_delay.as_millis() as f64);

        // Add jitter: random value between 0 and jitter_factor * capped_delay
        let jitter = if self.jitter_factor > 0.0 {
            use rand::Rng;
            let mut rng = rand::rng();
            rng.random::<f64>() * self.jitter_factor * capped_delay_ms
        } else {
            0.0
        };

        Duration::from_millis((capped_delay_ms + jitter) as u64)
    }

    /// Check if we should retry based on attempt count
    pub fn should_retry(&self, attempt: u32) -> bool {
        attempt < self.max_attempts
    }
}

/// Circuit breaker state for a peer
///
/// Implements the circuit breaker pattern to prevent cascading failures.
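///
/// Transitions, as driven by [`SyncErrorHandler`] in this module:
///
/// ```text
/// Closed   --(consecutive failures >= failure_threshold)--> Open
/// Open     --(open_timeout elapsed)------------------------> HalfOpen
/// HalfOpen --(successes >= success_threshold)--------------> Closed
/// HalfOpen --(any failure)---------------------------------> Open
/// ```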
#[derive(Debug, Clone, PartialEq, Eq)]
#[repr(C)]
pub enum CircuitState {
    /// Circuit closed, operating normally
    Closed,
    /// Circuit open, rejecting requests
    Open,
    /// Circuit half-open, testing if peer recovered
    HalfOpen,
}

/// Circuit breaker configuration
#[derive(Debug, Clone)]
pub struct CircuitBreakerConfig {
    /// Number of failures to trigger circuit open
    pub failure_threshold: u32,
    /// Time window for counting failures (currently unused: the handler
    /// tracks consecutive failures rather than failures per window)
    pub failure_window: Duration,
    /// How long to keep circuit open before trying half-open
    pub open_timeout: Duration,
    /// Number of successful requests to close from half-open
    pub success_threshold: u32,
}

impl Default for CircuitBreakerConfig {
    fn default() -> Self {
        Self::from_env()
    }
}

impl CircuitBreakerConfig {
    /// Create config from environment variables with sensible defaults.
    ///
    /// Environment variables:
    /// - `CIRCUIT_FAILURE_THRESHOLD`: Number of failures to trigger open (default: 5)
    /// - `CIRCUIT_FAILURE_WINDOW_SECS`: Time window for counting failures (default: 5)
    /// - `CIRCUIT_OPEN_TIMEOUT_SECS`: How long circuit stays open (default: 5)
    /// - `CIRCUIT_SUCCESS_THRESHOLD`: Successes needed to close from half-open (default: 2)
    ///
    /// Lab environments (single machine): use defaults or lower values (1-5 s).
    /// Production environments: consider higher values (10-30 s) for network variability.
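    ///
    /// For example, loosening the breaker for a flaky WAN link (a sketch; the
    /// variables must be set before the handler is constructed):
    ///
    /// ```ignore
    /// std::env::set_var("CIRCUIT_FAILURE_THRESHOLD", "10");
    /// std::env::set_var("CIRCUIT_OPEN_TIMEOUT_SECS", "30");
    /// let config = CircuitBreakerConfig::from_env();
    /// assert_eq!(config.failure_threshold, 10);
    /// ```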
    pub fn from_env() -> Self {
        let failure_threshold = std::env::var("CIRCUIT_FAILURE_THRESHOLD")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(5);

        let failure_window_secs = std::env::var("CIRCUIT_FAILURE_WINDOW_SECS")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(5);

        let open_timeout_secs = std::env::var("CIRCUIT_OPEN_TIMEOUT_SECS")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(5);

        let success_threshold = std::env::var("CIRCUIT_SUCCESS_THRESHOLD")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(2);

        Self {
            failure_threshold,
            failure_window: Duration::from_secs(failure_window_secs),
            open_timeout: Duration::from_secs(open_timeout_secs),
            success_threshold,
        }
    }
}

/// Per-peer error tracking and health monitoring
#[derive(Debug, Clone)]
pub struct PeerHealthTracker {
    /// Current circuit breaker state
    pub circuit_state: CircuitState,
    /// Number of consecutive failures
    pub consecutive_failures: u32,
    /// Number of consecutive successes (for half-open state)
    pub consecutive_successes: u32,
    /// Timestamp of last failure
    pub last_failure_time: Option<SystemTime>,
    /// Timestamp when circuit was opened
    pub circuit_opened_at: Option<SystemTime>,
    /// Total failure count (all time)
    pub total_failures: u64,
    /// Last error encountered
    pub last_error: Option<SyncError>,
    /// Current retry attempt
    pub retry_attempt: u32,
}

impl Default for PeerHealthTracker {
    fn default() -> Self {
        Self {
            circuit_state: CircuitState::Closed,
            consecutive_failures: 0,
            consecutive_successes: 0,
            last_failure_time: None,
            circuit_opened_at: None,
            total_failures: 0,
            last_error: None,
            retry_attempt: 0,
        }
    }
}

impl PeerHealthTracker {
    /// Record a successful sync operation
    pub fn record_success(&mut self) {
        self.retry_attempt = 0;
        self.consecutive_failures = 0;
        self.consecutive_successes += 1;
    }

    /// Record a failed sync operation
    pub fn record_failure(&mut self, error: SyncError) {
        self.retry_attempt += 1;
        self.consecutive_failures += 1;
        self.consecutive_successes = 0;
        self.total_failures += 1;
        self.last_failure_time = Some(SystemTime::now());
        self.last_error = Some(error);
    }

    /// Reset the tracker to default state
    pub fn reset(&mut self) {
        *self = Self::default();
    }
}

/// Sync error handler with retry logic and circuit breaker
///
/// Provides centralized error handling for all sync operations with:
/// - Exponential backoff retry
/// - Circuit breaker per peer
/// - Health monitoring
/// - FFI-friendly statistics
pub struct SyncErrorHandler {
    /// Retry policy configuration
    retry_policy: RetryPolicy,
    /// Circuit breaker configuration
    circuit_config: CircuitBreakerConfig,
    /// Per-peer health tracking
    peer_health: Arc<RwLock<HashMap<EndpointId, PeerHealthTracker>>>,
}

impl SyncErrorHandler {
    /// Create a new error handler with default policies
    pub fn new() -> Self {
        Self {
            retry_policy: RetryPolicy::default(),
            circuit_config: CircuitBreakerConfig::default(),
            peer_health: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Create an error handler with custom policies
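    ///
    /// For instance, a more patient setup for lossy links (a sketch using
    /// this module's own constructors):
    ///
    /// ```ignore
    /// let handler = SyncErrorHandler::with_policies(
    ///     RetryPolicy::severe(),
    ///     CircuitBreakerConfig { failure_threshold: 10, ..Default::default() },
    /// );
    /// ```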
    pub fn with_policies(retry_policy: RetryPolicy, circuit_config: CircuitBreakerConfig) -> Self {
        Self {
            retry_policy,
            circuit_config,
            peer_health: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Handle a sync error for a peer
    ///
    /// Returns `Ok(Some(delay))` if the operation should be retried after `delay`,
    /// `Ok(None)` if it should not be retried, or `Err(SyncError::CircuitBreakerOpen)`
    /// if the circuit breaker rejects the peer.
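    ///
    /// A sketch of the intended caller loop; `sync_once` stands in for the
    /// real sync driver:
    ///
    /// ```ignore
    /// while let Err(e) = sync_once(&peer_id).await {
    ///     match handler.handle_error(&peer_id, e) {
    ///         Ok(Some(delay)) => tokio::time::sleep(delay).await,
    ///         Ok(None) | Err(_) => break, // give up, or wait out the breaker
    ///     }
    /// }
    /// ```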
    pub fn handle_error(
        &self,
        peer_id: &EndpointId,
        error: SyncError,
    ) -> Result<Option<Duration>, SyncError> {
        let mut health_map = self.peer_health.write().unwrap_or_else(|e| e.into_inner());
        let health = health_map.entry(*peer_id).or_default();

        // Record the failure
        health.record_failure(error.clone());

        // Check circuit breaker state
        match health.circuit_state {
            CircuitState::Open => {
                // Check if we should try half-open
                if let Some(opened_at) = health.circuit_opened_at {
                    if SystemTime::now()
                        .duration_since(opened_at)
                        .unwrap_or(Duration::ZERO)
                        >= self.circuit_config.open_timeout
                    {
                        tracing::info!(
                            "Circuit breaker for peer {:?} transitioning to half-open",
                            peer_id
                        );
                        health.circuit_state = CircuitState::HalfOpen;
                        health.retry_attempt = 0;
                        return Ok(Some(Duration::ZERO));
                    }
                }
                return Err(SyncError::CircuitBreakerOpen);
            }
            CircuitState::HalfOpen => {
                // In half-open, any failure reopens the circuit
                // (record_failure above already bumped consecutive_failures)
                if health.consecutive_failures >= 1 {
                    tracing::warn!("Circuit breaker for peer {:?} reopening", peer_id);
                    health.circuit_state = CircuitState::Open;
                    health.circuit_opened_at = Some(SystemTime::now());
                    return Err(SyncError::CircuitBreakerOpen);
                }
            }
            CircuitState::Closed => {
                // Check if we should open circuit
                if health.consecutive_failures >= self.circuit_config.failure_threshold {
                    tracing::warn!(
                        "Opening circuit breaker for peer {:?} after {} failures",
                        peer_id,
                        health.consecutive_failures
                    );
                    health.circuit_state = CircuitState::Open;
                    health.circuit_opened_at = Some(SystemTime::now());
                    return Err(SyncError::CircuitBreakerOpen);
                }
            }
        }

        // Determine if error is retryable
        if !error.is_retryable() {
            tracing::error!("Non-retryable sync error for peer {:?}: {}", peer_id, error);
            return Ok(None);
        }

        // Pick a retry policy based on severity
        let policy = match error.severity() {
            ErrorSeverity::Transient => RetryPolicy::transient(),
            ErrorSeverity::Severe => RetryPolicy::severe(),
            _ => self.retry_policy.clone(),
        };

        if !policy.should_retry(health.retry_attempt) {
            tracing::warn!(
                "Max retry attempts ({}) exceeded for peer {:?}",
                policy.max_attempts,
                peer_id
            );
            return Ok(None);
        }

        // Calculate retry delay
        let delay = policy.delay_for_attempt(health.retry_attempt);
        tracing::debug!(
            "Will retry sync with peer {:?} after {:?} (attempt {})",
            peer_id,
            delay,
            health.retry_attempt
        );

        Ok(Some(delay))
    }

    /// Record a successful sync operation
    pub fn record_success(&self, peer_id: &EndpointId) {
        let mut health_map = self.peer_health.write().unwrap_or_else(|e| e.into_inner());
        let health = health_map.entry(*peer_id).or_default();

        health.record_success();

        // Handle half-open -> closed transition
        if health.circuit_state == CircuitState::HalfOpen
            && health.consecutive_successes >= self.circuit_config.success_threshold
        {
            tracing::info!("Closing circuit breaker for peer {:?}", peer_id);
            health.circuit_state = CircuitState::Closed;
            health.circuit_opened_at = None;
        }
    }

    /// Get health status for a peer
    pub fn peer_health(&self, peer_id: &EndpointId) -> Option<PeerHealthTracker> {
        self.peer_health
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .get(peer_id)
            .cloned()
    }

    /// Get health status for all peers
    pub fn all_peer_health(&self) -> HashMap<EndpointId, PeerHealthTracker> {
        self.peer_health
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .clone()
    }

    /// Reset health tracking for a peer
    pub fn reset_peer(&self, peer_id: &EndpointId) {
        let mut health_map = self.peer_health.write().unwrap_or_else(|e| e.into_inner());
        if let Some(health) = health_map.get_mut(peer_id) {
            health.reset();
        }
    }

    /// Check if circuit breaker is open for a peer
    pub fn is_circuit_open(&self, peer_id: &EndpointId) -> bool {
        self.peer_health
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .get(peer_id)
            .map(|h| h.circuit_state == CircuitState::Open)
            .unwrap_or(false)
    }
}

impl Default for SyncErrorHandler {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_retry_policy_delay_calculation() {
        let policy = RetryPolicy {
            initial_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(10),
            max_attempts: 5,
            backoff_multiplier: 2.0,
            jitter_factor: 0.0, // Disable jitter for predictable tests
        };

        // Attempt 0 should have zero delay
        assert_eq!(policy.delay_for_attempt(0), Duration::ZERO);

        // Attempt 1: 100ms
        assert_eq!(policy.delay_for_attempt(1), Duration::from_millis(100));

        // Attempt 2: 100ms * 2 = 200ms
        assert_eq!(policy.delay_for_attempt(2), Duration::from_millis(200));

        // Attempt 3: 100ms * 4 = 400ms
        assert_eq!(policy.delay_for_attempt(3), Duration::from_millis(400));

        // Should respect max_delay
        let long_policy = RetryPolicy {
            initial_delay: Duration::from_secs(1),
            max_delay: Duration::from_secs(5),
            max_attempts: 10,
            backoff_multiplier: 10.0,
            jitter_factor: 0.0,
        };
        let delay = long_policy.delay_for_attempt(5);
        assert!(delay <= Duration::from_secs(5));
    }

    #[test]
    fn test_retry_policy_should_retry() {
        let policy = RetryPolicy {
            max_attempts: 3,
            ..Default::default()
        };

        assert!(policy.should_retry(0));
        assert!(policy.should_retry(1));
        assert!(policy.should_retry(2));
        assert!(!policy.should_retry(3));
        assert!(!policy.should_retry(4));
    }

    #[test]
    fn test_sync_error_severity() {
        assert_eq!(
            SyncError::Network("test".to_string()).severity(),
            ErrorSeverity::Recoverable
        );
        assert_eq!(
            SyncError::PeerNotFound("test".to_string()).severity(),
            ErrorSeverity::Transient
        );
        assert_eq!(
            SyncError::Document("test".to_string()).severity(),
            ErrorSeverity::Fatal
        );
        assert_eq!(
            SyncError::Protocol("test".to_string()).severity(),
            ErrorSeverity::Severe
        );
    }

    #[test]
    fn test_sync_error_retryable() {
        assert!(SyncError::Network("test".to_string()).is_retryable());
        assert!(SyncError::PeerNotFound("test".to_string()).is_retryable());
        assert!(!SyncError::Document("test".to_string()).is_retryable());
    }

    #[test]
    fn test_peer_health_tracker() {
        let mut tracker = PeerHealthTracker::default();

        // Record success
        tracker.record_success();
        assert_eq!(tracker.consecutive_successes, 1);
        assert_eq!(tracker.consecutive_failures, 0);
        assert_eq!(tracker.retry_attempt, 0);

        // Record failure
        tracker.record_failure(SyncError::Network("timeout".to_string()));
        assert_eq!(tracker.consecutive_successes, 0);
        assert_eq!(tracker.consecutive_failures, 1);
        assert_eq!(tracker.retry_attempt, 1);
        assert_eq!(tracker.total_failures, 1);

        // Record another failure
        tracker.record_failure(SyncError::Network("timeout".to_string()));
        assert_eq!(tracker.consecutive_failures, 2);
        assert_eq!(tracker.retry_attempt, 2);
        assert_eq!(tracker.total_failures, 2);

        // Reset
        tracker.reset();
        assert_eq!(tracker.consecutive_failures, 0);
        assert_eq!(tracker.total_failures, 0);
    }

    #[tokio::test]
    async fn test_circuit_breaker_opens_after_threshold() {
        let handler = SyncErrorHandler::with_policies(
            RetryPolicy::default(),
            CircuitBreakerConfig {
                failure_threshold: 3,
                ..Default::default()
            },
        );

        // Create a test EndpointId using SecretKey generation
        use iroh::SecretKey;
        let mut rng = rand::rng();
        let peer_id = SecretKey::generate(&mut rng).public();

        // First 2 failures should allow retry (consecutive_failures will be 1, then 2)
        for i in 0..2 {
            let result = handler.handle_error(&peer_id, SyncError::Network("test".to_string()));
            assert!(result.is_ok(), "Attempt {} should succeed", i);
        }

        // 3rd failure should open circuit (consecutive_failures reaches 3)
        let result = handler.handle_error(&peer_id, SyncError::Network("test".to_string()));
        assert!(
            matches!(result, Err(SyncError::CircuitBreakerOpen)),
            "Circuit should open on 3rd failure"
        );

        // Verify circuit is open
        assert!(handler.is_circuit_open(&peer_id));
    }

    #[tokio::test]
    async fn test_circuit_breaker_half_open_transition() {
        let handler = SyncErrorHandler::with_policies(
            RetryPolicy::default(),
            CircuitBreakerConfig {
                failure_threshold: 2,
                open_timeout: Duration::from_millis(100),
                success_threshold: 2,
                ..Default::default()
            },
        );

        // Create a test EndpointId using SecretKey generation
        use iroh::SecretKey;
        let mut rng = rand::rng();
        let peer_id = SecretKey::generate(&mut rng).public();

        // Trigger circuit open
        handler
            .handle_error(&peer_id, SyncError::Network("test".to_string()))
            .ok();
        handler
            .handle_error(&peer_id, SyncError::Network("test".to_string()))
            .ok();
        handler
            .handle_error(&peer_id, SyncError::Network("test".to_string()))
            .ok();

        assert!(handler.is_circuit_open(&peer_id));

        // Wait for open timeout
        tokio::time::sleep(Duration::from_millis(150)).await;

        // Next error should transition to half-open
        let result = handler.handle_error(&peer_id, SyncError::Network("test".to_string()));
        assert!(result.is_ok());

        let health = handler.peer_health(&peer_id).unwrap();
        assert_eq!(health.circuit_state, CircuitState::HalfOpen);

        // Record successes to close circuit
        handler.record_success(&peer_id);
        handler.record_success(&peer_id);

        let health = handler.peer_health(&peer_id).unwrap();
        assert_eq!(health.circuit_state, CircuitState::Closed);
    }
}
725}