use iroh::EndpointId;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime};
use thiserror::Error;

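/// Errors that can occur while synchronizing documents with a peer.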
#[derive(Error, Debug, Clone, PartialEq, Eq)]
#[repr(C)]
pub enum SyncError {
    #[error("Network error: {0}")]
    Network(String),

    #[error("Document error: {0}")]
    Document(String),

    #[error("Peer not found: {0}")]
    PeerNotFound(String),

    #[error("Protocol error: {0}")]
    Protocol(String),

    #[error("State error: {0}")]
    State(String),

    #[error("Resource exhaustion: {0}")]
    ResourceExhaustion(String),

    #[error("Circuit breaker open for peer")]
    CircuitBreakerOpen,

    #[error("Bandwidth exhausted for class {0}")]
    BandwidthExhausted(String),
}

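/// Severity classes, ordered from least to most serious. Severity selects the
/// retry policy and determines whether an error is retryable at all.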
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[repr(C)]
pub enum ErrorSeverity {
    Transient = 1,
    Recoverable = 2,
    Severe = 3,
    Fatal = 4,
}

impl SyncError {
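    /// Classifies this error; `SyncErrorHandler` uses the result to choose a
    /// retry policy.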
    pub fn severity(&self) -> ErrorSeverity {
        match self {
            SyncError::Network(_) => ErrorSeverity::Recoverable,
            SyncError::PeerNotFound(_) => ErrorSeverity::Transient,
            SyncError::Protocol(_) => ErrorSeverity::Severe,
            SyncError::Document(_) => ErrorSeverity::Fatal,
            SyncError::State(_) => ErrorSeverity::Severe,
            SyncError::ResourceExhaustion(_) => ErrorSeverity::Severe,
            SyncError::CircuitBreakerOpen => ErrorSeverity::Transient,
            SyncError::BandwidthExhausted(_) => ErrorSeverity::Transient,
        }
    }

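    /// Returns `true` for transient and recoverable errors, which are worth
    /// retrying; severe and fatal errors are not.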
    pub fn is_retryable(&self) -> bool {
        matches!(
            self.severity(),
            ErrorSeverity::Transient | ErrorSeverity::Recoverable
        )
    }
}

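/// Exponential backoff configuration: the delay starts at `initial_delay`,
/// grows by `backoff_multiplier` per attempt up to `max_delay`, and a random
/// fraction of the delay (up to `jitter_factor`) is added to avoid
/// synchronized retries.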
#[derive(Debug, Clone)]
pub struct RetryPolicy {
    pub initial_delay: Duration,
    pub max_delay: Duration,
    pub max_attempts: u32,
    pub backoff_multiplier: f64,
    pub jitter_factor: f64,
}

impl Default for RetryPolicy {
    fn default() -> Self {
        Self {
            initial_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(30),
            max_attempts: 5,
            backoff_multiplier: 2.0,
            jitter_factor: 0.1,
        }
    }
}

impl RetryPolicy {
    pub fn new(
        initial_delay: Duration,
        max_delay: Duration,
        max_attempts: u32,
        backoff_multiplier: f64,
    ) -> Self {
        Self {
            initial_delay,
            max_delay,
            max_attempts,
            backoff_multiplier,
            jitter_factor: 0.1,
        }
    }

    pub fn transient() -> Self {
        Self {
            initial_delay: Duration::from_millis(50),
            max_delay: Duration::from_secs(5),
            max_attempts: 10,
            backoff_multiplier: 1.5,
            jitter_factor: 0.1,
        }
    }

    pub fn severe() -> Self {
        Self {
            initial_delay: Duration::from_secs(1),
            max_delay: Duration::from_secs(60),
            max_attempts: 3,
            backoff_multiplier: 3.0,
            jitter_factor: 0.2,
        }
    }

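    /// Returns the backoff delay for the given attempt (attempt 0 is
    /// immediate). The base delay `initial_delay * backoff_multiplier^(n - 1)`
    /// is capped at `max_delay`; jitter is added after capping, so the final
    /// delay may slightly exceed `max_delay`.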
    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
        if attempt == 0 {
            return Duration::ZERO;
        }

        let base_delay_ms = self.initial_delay.as_millis() as f64
            * self.backoff_multiplier.powi(attempt as i32 - 1);

        let capped_delay_ms = base_delay_ms.min(self.max_delay.as_millis() as f64);

        let jitter = if self.jitter_factor > 0.0 {
            use rand::Rng;
            let mut rng = rand::rng();
            rng.random::<f64>() * self.jitter_factor * capped_delay_ms
        } else {
            0.0
        };

        Duration::from_millis((capped_delay_ms + jitter) as u64)
    }

    pub fn should_retry(&self, attempt: u32) -> bool {
        attempt < self.max_attempts
    }
}

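/// Circuit breaker states: `Closed` passes traffic through, `Open` rejects it
/// until the open timeout elapses, and `HalfOpen` admits a probe request to
/// test whether the peer has recovered.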
#[derive(Debug, Clone, PartialEq, Eq)]
#[repr(C)]
pub enum CircuitState {
    Closed,
    Open,
    HalfOpen,
}

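/// Circuit breaker tuning: `failure_threshold` consecutive failures open the
/// circuit, which stays open for `open_timeout` before probing, and
/// `success_threshold` consecutive half-open successes close it again.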
#[derive(Debug, Clone)]
pub struct CircuitBreakerConfig {
    pub failure_threshold: u32,
    pub failure_window: Duration,
    pub open_timeout: Duration,
    pub success_threshold: u32,
}

impl Default for CircuitBreakerConfig {
    fn default() -> Self {
        Self::from_env()
    }
}

impl CircuitBreakerConfig {
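    /// Reads the configuration from environment variables, falling back to
    /// defaults: `CIRCUIT_FAILURE_THRESHOLD` (5), `CIRCUIT_FAILURE_WINDOW_SECS`
    /// (5), `CIRCUIT_OPEN_TIMEOUT_SECS` (5), and `CIRCUIT_SUCCESS_THRESHOLD` (2).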
    pub fn from_env() -> Self {
        let failure_threshold = std::env::var("CIRCUIT_FAILURE_THRESHOLD")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(5);

        let failure_window_secs = std::env::var("CIRCUIT_FAILURE_WINDOW_SECS")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(5);

        let open_timeout_secs = std::env::var("CIRCUIT_OPEN_TIMEOUT_SECS")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(5);

        let success_threshold = std::env::var("CIRCUIT_SUCCESS_THRESHOLD")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(2);

        Self {
            failure_threshold,
            failure_window: Duration::from_secs(failure_window_secs),
            open_timeout: Duration::from_secs(open_timeout_secs),
            success_threshold,
        }
    }
}

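/// Rolling health record for a single peer: circuit breaker state plus the
/// failure and success counters that drive retry and circuit decisions.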
#[derive(Debug, Clone)]
pub struct PeerHealthTracker {
    pub circuit_state: CircuitState,
    pub consecutive_failures: u32,
    pub consecutive_successes: u32,
    pub last_failure_time: Option<SystemTime>,
    pub circuit_opened_at: Option<SystemTime>,
    pub total_failures: u64,
    pub last_error: Option<SyncError>,
    pub retry_attempt: u32,
}

impl Default for PeerHealthTracker {
    fn default() -> Self {
        Self {
            circuit_state: CircuitState::Closed,
            consecutive_failures: 0,
            consecutive_successes: 0,
            last_failure_time: None,
            circuit_opened_at: None,
            total_failures: 0,
            last_error: None,
            retry_attempt: 0,
        }
    }
}

impl PeerHealthTracker {
    pub fn record_success(&mut self) {
        self.retry_attempt = 0;
        self.consecutive_failures = 0;
        self.consecutive_successes += 1;
    }

    pub fn record_failure(&mut self, error: SyncError) {
        self.retry_attempt += 1;
        self.consecutive_failures += 1;
        self.consecutive_successes = 0;
        self.total_failures += 1;
        self.last_failure_time = Some(SystemTime::now());
        self.last_error = Some(error);
    }

    pub fn reset(&mut self) {
        *self = Self::default();
    }
}

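/// Per-peer error handling: retry policies chosen by error severity, combined
/// with a circuit breaker that trips after repeated failures.
///
/// A minimal driver-loop sketch (illustrative only; `sync_once` and `peer`
/// stand in for the caller's own sync routine and peer id):
///
/// ```ignore
/// let handler = SyncErrorHandler::new();
/// loop {
///     match sync_once(&peer).await {
///         Ok(()) => {
///             handler.record_success(&peer);
///             break;
///         }
///         Err(e) => match handler.handle_error(&peer, e) {
///             Ok(Some(delay)) => tokio::time::sleep(delay).await, // retry after backoff
///             Ok(None) => break,  // non-retryable or attempts exhausted
///             Err(_) => break,    // circuit breaker open; skip this peer for now
///         },
///     }
/// }
/// ```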
pub struct SyncErrorHandler {
    retry_policy: RetryPolicy,
    circuit_config: CircuitBreakerConfig,
    peer_health: Arc<RwLock<HashMap<EndpointId, PeerHealthTracker>>>,
}

impl SyncErrorHandler {
    pub fn new() -> Self {
        Self {
            retry_policy: RetryPolicy::default(),
            circuit_config: CircuitBreakerConfig::default(),
            peer_health: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    pub fn with_policies(retry_policy: RetryPolicy, circuit_config: CircuitBreakerConfig) -> Self {
        Self {
            retry_policy,
            circuit_config,
            peer_health: Arc::new(RwLock::new(HashMap::new())),
        }
    }

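    /// Records `error` against `peer_id` and decides the next step.
    ///
    /// Returns `Ok(Some(delay))` when the caller should retry after `delay`,
    /// `Ok(None)` when the error is non-retryable or attempts are exhausted,
    /// and `Err(SyncError::CircuitBreakerOpen)` when the peer's circuit is
    /// open (or was just opened by this failure).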
    pub fn handle_error(
        &self,
        peer_id: &EndpointId,
        error: SyncError,
    ) -> Result<Option<Duration>, SyncError> {
        let mut health_map = self.peer_health.write().unwrap_or_else(|e| e.into_inner());
        let health = health_map.entry(*peer_id).or_default();

        health.record_failure(error.clone());

        match health.circuit_state {
            CircuitState::Open => {
                if let Some(opened_at) = health.circuit_opened_at {
                    if SystemTime::now()
                        .duration_since(opened_at)
                        .unwrap_or(Duration::ZERO)
                        >= self.circuit_config.open_timeout
                    {
                        tracing::info!(
                            "Circuit breaker for peer {:?} transitioning to half-open",
                            peer_id
                        );
                        health.circuit_state = CircuitState::HalfOpen;
                        health.retry_attempt = 0;
                        // Open timeout has elapsed: let a single probe through immediately.
                        return Ok(Some(Duration::ZERO));
                    }
                }
                return Err(SyncError::CircuitBreakerOpen);
            }
            CircuitState::HalfOpen => {
                // `record_failure` above guarantees `consecutive_failures >= 1`,
                // so any failure while half-open reopens the circuit.
                if health.consecutive_failures >= 1 {
                    tracing::warn!("Circuit breaker for peer {:?} reopening", peer_id);
                    health.circuit_state = CircuitState::Open;
                    health.circuit_opened_at = Some(SystemTime::now());
                    return Err(SyncError::CircuitBreakerOpen);
                }
            }
            CircuitState::Closed => {
                if health.consecutive_failures >= self.circuit_config.failure_threshold {
                    tracing::warn!(
                        "Opening circuit breaker for peer {:?} after {} failures",
                        peer_id,
                        health.consecutive_failures
                    );
                    health.circuit_state = CircuitState::Open;
                    health.circuit_opened_at = Some(SystemTime::now());
                    return Err(SyncError::CircuitBreakerOpen);
                }
            }
        }

        if !error.is_retryable() {
            tracing::error!("Non-retryable sync error for peer {:?}: {}", peer_id, error);
            return Ok(None);
        }

        // Pick a policy by severity: transient errors retry quickly and often,
        // severe errors back off hard.
        let policy = match error.severity() {
            ErrorSeverity::Transient => RetryPolicy::transient(),
            ErrorSeverity::Severe => RetryPolicy::severe(),
            _ => self.retry_policy.clone(),
        };

        if !policy.should_retry(health.retry_attempt) {
            tracing::warn!(
                "Max retry attempts ({}) exceeded for peer {:?}",
                policy.max_attempts,
                peer_id
            );
            return Ok(None);
        }

        let delay = policy.delay_for_attempt(health.retry_attempt);
        tracing::debug!(
            "Will retry sync with peer {:?} after {:?} (attempt {})",
            peer_id,
            delay,
            health.retry_attempt
        );

        Ok(Some(delay))
    }

    pub fn record_success(&self, peer_id: &EndpointId) {
        let mut health_map = self.peer_health.write().unwrap_or_else(|e| e.into_inner());
        let health = health_map.entry(*peer_id).or_default();

        health.record_success();

        if health.circuit_state == CircuitState::HalfOpen
            && health.consecutive_successes >= self.circuit_config.success_threshold
        {
            tracing::info!("Closing circuit breaker for peer {:?}", peer_id);
            health.circuit_state = CircuitState::Closed;
            health.circuit_opened_at = None;
        }
    }

    pub fn peer_health(&self, peer_id: &EndpointId) -> Option<PeerHealthTracker> {
        self.peer_health
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .get(peer_id)
            .cloned()
    }

    pub fn all_peer_health(&self) -> HashMap<EndpointId, PeerHealthTracker> {
        self.peer_health
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .clone()
    }

    pub fn reset_peer(&self, peer_id: &EndpointId) {
        let mut health_map = self.peer_health.write().unwrap_or_else(|e| e.into_inner());
        if let Some(health) = health_map.get_mut(peer_id) {
            health.reset();
        }
    }

    pub fn is_circuit_open(&self, peer_id: &EndpointId) -> bool {
        self.peer_health
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .get(peer_id)
            .map(|h| h.circuit_state == CircuitState::Open)
            .unwrap_or(false)
    }
}

impl Default for SyncErrorHandler {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_retry_policy_delay_calculation() {
        let policy = RetryPolicy {
            initial_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(10),
            max_attempts: 5,
            backoff_multiplier: 2.0,
            jitter_factor: 0.0,
        };

        // Attempt 0 is immediate.
        assert_eq!(policy.delay_for_attempt(0), Duration::ZERO);

        // Attempt 1 uses the initial delay.
        assert_eq!(policy.delay_for_attempt(1), Duration::from_millis(100));

        // Each further attempt doubles the delay.
        assert_eq!(policy.delay_for_attempt(2), Duration::from_millis(200));

        assert_eq!(policy.delay_for_attempt(3), Duration::from_millis(400));

        // Even with an aggressive multiplier, the delay is capped at max_delay.
        let long_policy = RetryPolicy {
            initial_delay: Duration::from_secs(1),
            max_delay: Duration::from_secs(5),
            max_attempts: 10,
            backoff_multiplier: 10.0,
            jitter_factor: 0.0,
        };
        let delay = long_policy.delay_for_attempt(5);
        assert!(delay <= Duration::from_secs(5));
    }

    #[test]
    fn test_retry_policy_should_retry() {
        let policy = RetryPolicy {
            max_attempts: 3,
            ..Default::default()
        };

        assert!(policy.should_retry(0));
        assert!(policy.should_retry(1));
        assert!(policy.should_retry(2));
        assert!(!policy.should_retry(3));
        assert!(!policy.should_retry(4));
    }

    #[test]
    fn test_sync_error_severity() {
        assert_eq!(
            SyncError::Network("test".to_string()).severity(),
            ErrorSeverity::Recoverable
        );
        assert_eq!(
            SyncError::PeerNotFound("test".to_string()).severity(),
            ErrorSeverity::Transient
        );
        assert_eq!(
            SyncError::Document("test".to_string()).severity(),
            ErrorSeverity::Fatal
        );
        assert_eq!(
            SyncError::Protocol("test".to_string()).severity(),
            ErrorSeverity::Severe
        );
    }

    #[test]
    fn test_sync_error_retryable() {
        assert!(SyncError::Network("test".to_string()).is_retryable());
        assert!(SyncError::PeerNotFound("test".to_string()).is_retryable());
        assert!(!SyncError::Document("test".to_string()).is_retryable());
    }

    #[test]
    fn test_peer_health_tracker() {
        let mut tracker = PeerHealthTracker::default();

        tracker.record_success();
        assert_eq!(tracker.consecutive_successes, 1);
        assert_eq!(tracker.consecutive_failures, 0);
        assert_eq!(tracker.retry_attempt, 0);

        tracker.record_failure(SyncError::Network("timeout".to_string()));
        assert_eq!(tracker.consecutive_successes, 0);
        assert_eq!(tracker.consecutive_failures, 1);
        assert_eq!(tracker.retry_attempt, 1);
        assert_eq!(tracker.total_failures, 1);

        tracker.record_failure(SyncError::Network("timeout".to_string()));
        assert_eq!(tracker.consecutive_failures, 2);
        assert_eq!(tracker.retry_attempt, 2);
        assert_eq!(tracker.total_failures, 2);

        tracker.reset();
        assert_eq!(tracker.consecutive_failures, 0);
        assert_eq!(tracker.total_failures, 0);
    }

    #[tokio::test]
    async fn test_circuit_breaker_opens_after_threshold() {
        let handler = SyncErrorHandler::with_policies(
            RetryPolicy::default(),
            CircuitBreakerConfig {
                failure_threshold: 3,
                ..Default::default()
            },
        );

        use iroh::SecretKey;
        let mut rng = rand::rng();
        let peer_id = SecretKey::generate(&mut rng).public();

        // The first two failures stay below the threshold and yield retry delays.
        for i in 0..2 {
            let result = handler.handle_error(&peer_id, SyncError::Network("test".to_string()));
            assert!(result.is_ok(), "Attempt {} should succeed", i);
        }

        let result = handler.handle_error(&peer_id, SyncError::Network("test".to_string()));
        assert!(
            matches!(result, Err(SyncError::CircuitBreakerOpen)),
            "Circuit should open on 3rd failure"
        );

        assert!(handler.is_circuit_open(&peer_id));
    }

    #[tokio::test]
    async fn test_circuit_breaker_half_open_transition() {
        let handler = SyncErrorHandler::with_policies(
            RetryPolicy::default(),
            CircuitBreakerConfig {
                failure_threshold: 2,
                open_timeout: Duration::from_millis(100),
                success_threshold: 2,
                ..Default::default()
            },
        );

        use iroh::SecretKey;
        let mut rng = rand::rng();
        let peer_id = SecretKey::generate(&mut rng).public();

        // Trip the circuit breaker.
        handler
            .handle_error(&peer_id, SyncError::Network("test".to_string()))
            .ok();
        handler
            .handle_error(&peer_id, SyncError::Network("test".to_string()))
            .ok();
        handler
            .handle_error(&peer_id, SyncError::Network("test".to_string()))
            .ok();

        assert!(handler.is_circuit_open(&peer_id));

        // Wait for the open timeout to elapse.
        tokio::time::sleep(Duration::from_millis(150)).await;

        // The next error moves the circuit to half-open and allows a probe.
        let result = handler.handle_error(&peer_id, SyncError::Network("test".to_string()));
        assert!(result.is_ok());

        let health = handler.peer_health(&peer_id).unwrap();
        assert_eq!(health.circuit_state, CircuitState::HalfOpen);

        // Two consecutive successes close the circuit again.
        handler.record_success(&peer_id);
        handler.record_success(&peer_id);

        let health = handler.peer_health(&peer_id).unwrap();
        assert_eq!(health.circuit_state, CircuitState::Closed);
    }
}