use iroh::EndpointId;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime};
use thiserror::Error;
/// Error type for the sync subsystem. The severity mapping in `impl SyncError`
/// drives the retry and circuit-breaker decisions in `SyncErrorHandler`.
///
/// NOTE(review): `#[repr(C)]` on a data-carrying Rust enum is unusual —
/// confirm it is actually required (e.g. for FFI) before relying on the layout.
#[derive(Error, Debug, Clone, PartialEq, Eq)]
#[repr(C)]
pub enum SyncError {
    /// Transport/connection failure (classified `Recoverable`).
    #[error("Network error: {0}")]
    Network(String),
    /// Document-level failure (classified `Fatal`; never retried).
    #[error("Document error: {0}")]
    Document(String),
    /// The requested peer is unknown (classified `Transient`).
    #[error("Peer not found: {0}")]
    PeerNotFound(String),
    /// Wire-protocol violation (classified `Severe`).
    #[error("Protocol error: {0}")]
    Protocol(String),
    /// Internal state inconsistency (classified `Severe`).
    #[error("State error: {0}")]
    State(String),
    /// A resource limit was hit (classified `Severe`).
    #[error("Resource exhaustion: {0}")]
    ResourceExhaustion(String),
    /// The peer's circuit breaker is open; calls are being rejected
    /// (classified `Transient`).
    #[error("Circuit breaker open for peer")]
    CircuitBreakerOpen,
    /// No bandwidth budget left for the named traffic class
    /// (classified `Transient`).
    #[error("Bandwidth exhausted for class {0}")]
    BandwidthExhausted(String),
}
/// Severity classes ordered from least (`Transient`) to most (`Fatal`) serious.
/// The explicit discriminants back the derived `Ord`; `SyncError::is_retryable`
/// treats only `Transient` and `Recoverable` as retryable.
///
/// NOTE(review): `#[repr(C)]` here looks unnecessary unless the discriminants
/// cross an FFI boundary — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[repr(C)]
pub enum ErrorSeverity {
    /// Expected intermittent failure; retried promptly.
    Transient = 1,
    /// Failure expected to resolve with backoff; retried.
    Recoverable = 2,
    /// Serious failure; not retried (see `is_retryable`).
    Severe = 3,
    /// Unrecoverable failure; not retried.
    Fatal = 4,
}
impl SyncError {
pub fn severity(&self) -> ErrorSeverity {
match self {
SyncError::Network(_) => ErrorSeverity::Recoverable,
SyncError::PeerNotFound(_) => ErrorSeverity::Transient,
SyncError::Protocol(_) => ErrorSeverity::Severe,
SyncError::Document(_) => ErrorSeverity::Fatal,
SyncError::State(_) => ErrorSeverity::Severe,
SyncError::ResourceExhaustion(_) => ErrorSeverity::Severe,
SyncError::CircuitBreakerOpen => ErrorSeverity::Transient,
SyncError::BandwidthExhausted(_) => ErrorSeverity::Transient,
}
}
pub fn is_retryable(&self) -> bool {
matches!(
self.severity(),
ErrorSeverity::Transient | ErrorSeverity::Recoverable
)
}
}
/// Exponential-backoff retry schedule.
#[derive(Debug, Clone)]
pub struct RetryPolicy {
    /// Delay before the first retry (attempt 1).
    pub initial_delay: Duration,
    /// Upper bound applied to the computed backoff delay.
    pub max_delay: Duration,
    /// Retrying stops once this many attempts have been made.
    pub max_attempts: u32,
    /// Growth factor: delay = initial_delay * multiplier^(attempt - 1).
    pub backoff_multiplier: f64,
    /// Fraction of the delay added as random jitter (0.0 disables jitter).
    pub jitter_factor: f64,
}
impl Default for RetryPolicy {
fn default() -> Self {
Self {
initial_delay: Duration::from_millis(100),
max_delay: Duration::from_secs(30),
max_attempts: 5,
backoff_multiplier: 2.0,
jitter_factor: 0.1,
}
}
}
impl RetryPolicy {
    /// Builds a policy with an explicit schedule; jitter defaults to 10%.
    pub fn new(
        initial_delay: Duration,
        max_delay: Duration,
        max_attempts: u32,
        backoff_multiplier: f64,
    ) -> Self {
        Self {
            initial_delay,
            max_delay,
            max_attempts,
            backoff_multiplier,
            jitter_factor: 0.1,
        }
    }

    /// Preset for transient faults: fast first retry, gentle growth, many attempts.
    pub fn transient() -> Self {
        Self {
            initial_delay: Duration::from_millis(50),
            max_delay: Duration::from_secs(5),
            max_attempts: 10,
            backoff_multiplier: 1.5,
            jitter_factor: 0.1,
        }
    }

    /// Preset for severe faults: slow first retry, steep growth, few attempts.
    pub fn severe() -> Self {
        Self {
            initial_delay: Duration::from_secs(1),
            max_delay: Duration::from_secs(60),
            max_attempts: 3,
            backoff_multiplier: 3.0,
            jitter_factor: 0.2,
        }
    }

    /// Backoff delay for the given attempt number.
    ///
    /// Attempt 0 retries immediately. Otherwise the delay is
    /// `initial_delay * backoff_multiplier^(attempt - 1)`, with up to
    /// `jitter_factor` proportional random jitter added, and the final result
    /// clamped to `max_delay`. (Previously jitter was applied *after* the cap,
    /// so the returned delay could exceed `max_delay` by up to
    /// `jitter_factor * max_delay`.)
    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
        if attempt == 0 {
            return Duration::ZERO;
        }
        // Saturate the exponent so pathologically large attempt counts cannot
        // wrap when narrowed to i32 (previously `attempt as i32 - 1`).
        let exponent = attempt.saturating_sub(1).min(i32::MAX as u32) as i32;
        let base_delay_ms =
            self.initial_delay.as_millis() as f64 * self.backoff_multiplier.powi(exponent);
        let max_ms = self.max_delay.as_millis() as f64;
        let capped_delay_ms = base_delay_ms.min(max_ms);
        let jitter = if self.jitter_factor > 0.0 {
            use rand::Rng;
            let mut rng = rand::rng();
            rng.random::<f64>() * self.jitter_factor * capped_delay_ms
        } else {
            0.0
        };
        // Honor the max_delay contract even after jitter is applied.
        Duration::from_millis((capped_delay_ms + jitter).min(max_ms) as u64)
    }

    /// True while `attempt` is still below the configured maximum.
    pub fn should_retry(&self, attempt: u32) -> bool {
        attempt < self.max_attempts
    }
}
/// Per-peer circuit-breaker state.
///
/// `Closed`: normal operation. `Open`: calls are rejected until `open_timeout`
/// elapses, after which the breaker probes in `HalfOpen`. `HalfOpen`: closes
/// again after `success_threshold` consecutive successes, reopens on failure.
///
/// NOTE(review): `#[repr(C)]` looks unnecessary here unless shared over FFI.
#[derive(Debug, Clone, PartialEq, Eq)]
#[repr(C)]
pub enum CircuitState {
    Closed,
    Open,
    HalfOpen,
}
/// Tuning knobs for the per-peer circuit breaker
/// (see `from_env` for the environment variables and defaults).
#[derive(Debug, Clone)]
pub struct CircuitBreakerConfig {
    /// Consecutive failures that trip the breaker open.
    pub failure_threshold: u32,
    /// Time window within which failures are intended to count toward
    /// `failure_threshold`.
    pub failure_window: Duration,
    /// How long the breaker stays open before a half-open probe is allowed.
    pub open_timeout: Duration,
    /// Consecutive successes (while half-open) needed to close the breaker.
    pub success_threshold: u32,
}
impl Default for CircuitBreakerConfig {
    /// NOTE: `Default` reads process environment variables (see `from_env`),
    /// so the "default" config can differ between deployments and even between
    /// calls if the environment changes.
    fn default() -> Self {
        Self::from_env()
    }
}
impl CircuitBreakerConfig {
    /// Reads the breaker configuration from environment variables, falling
    /// back to a built-in default when a variable is unset or unparsable:
    /// - `CIRCUIT_FAILURE_THRESHOLD` (default 5)
    /// - `CIRCUIT_FAILURE_WINDOW_SECS` (default 5)
    /// - `CIRCUIT_OPEN_TIMEOUT_SECS` (default 5)
    /// - `CIRCUIT_SUCCESS_THRESHOLD` (default 2)
    pub fn from_env() -> Self {
        // Shared lookup: previously this parse chain was copy-pasted four times.
        // Unset or unparsable values silently fall back to `default`.
        fn env_or<T: std::str::FromStr>(key: &str, default: T) -> T {
            std::env::var(key)
                .ok()
                .and_then(|v| v.parse().ok())
                .unwrap_or(default)
        }
        Self {
            failure_threshold: env_or("CIRCUIT_FAILURE_THRESHOLD", 5),
            failure_window: Duration::from_secs(env_or("CIRCUIT_FAILURE_WINDOW_SECS", 5)),
            open_timeout: Duration::from_secs(env_or("CIRCUIT_OPEN_TIMEOUT_SECS", 5)),
            success_threshold: env_or("CIRCUIT_SUCCESS_THRESHOLD", 2),
        }
    }
}
/// Per-peer health and circuit-breaker bookkeeping maintained by
/// `SyncErrorHandler`.
#[derive(Debug, Clone)]
pub struct PeerHealthTracker {
    /// Current breaker state for this peer.
    pub circuit_state: CircuitState,
    /// Failures recorded since the last success.
    pub consecutive_failures: u32,
    /// Successes recorded since the last failure.
    pub consecutive_successes: u32,
    /// When the most recent failure was recorded.
    pub last_failure_time: Option<SystemTime>,
    /// When the breaker last transitioned to `Open`.
    pub circuit_opened_at: Option<SystemTime>,
    /// Lifetime failure count; only cleared by `reset`.
    pub total_failures: u64,
    /// The most recent error recorded for this peer.
    pub last_error: Option<SyncError>,
    /// Attempt counter indexing into the backoff schedule; cleared on success.
    pub retry_attempt: u32,
}
impl Default for PeerHealthTracker {
fn default() -> Self {
Self {
circuit_state: CircuitState::Closed,
consecutive_failures: 0,
consecutive_successes: 0,
last_failure_time: None,
circuit_opened_at: None,
total_failures: 0,
last_error: None,
retry_attempt: 0,
}
}
}
impl PeerHealthTracker {
pub fn record_success(&mut self) {
self.retry_attempt = 0;
self.consecutive_failures = 0;
self.consecutive_successes += 1;
}
pub fn record_failure(&mut self, error: SyncError) {
self.retry_attempt += 1;
self.consecutive_failures += 1;
self.consecutive_successes = 0;
self.total_failures += 1;
self.last_failure_time = Some(SystemTime::now());
self.last_error = Some(error);
}
pub fn reset(&mut self) {
*self = Self::default();
}
}
/// Central sync error handler: picks a severity-matched retry schedule and
/// maintains a per-peer circuit breaker over a shared health map.
pub struct SyncErrorHandler {
    /// Fallback policy (Transient/Severe severities use built-in presets).
    retry_policy: RetryPolicy,
    /// Circuit-breaker thresholds and timeouts.
    circuit_config: CircuitBreakerConfig,
    /// Shared, lock-protected per-peer health state.
    peer_health: Arc<RwLock<HashMap<EndpointId, PeerHealthTracker>>>,
}
impl SyncErrorHandler {
    /// Creates a handler with default retry policy and circuit-breaker config.
    ///
    /// Note: `CircuitBreakerConfig::default()` reads environment variables.
    pub fn new() -> Self {
        Self::with_policies(RetryPolicy::default(), CircuitBreakerConfig::default())
    }

    /// Creates a handler with explicit policies.
    pub fn with_policies(retry_policy: RetryPolicy, circuit_config: CircuitBreakerConfig) -> Self {
        Self {
            retry_policy,
            circuit_config,
            peer_health: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Records a sync failure for `peer_id` and decides the next step.
    ///
    /// Returns `Ok(Some(delay))` to retry after `delay`, `Ok(None)` to give up
    /// (non-retryable error or retry budget exhausted), or
    /// `Err(SyncError::CircuitBreakerOpen)` while the peer's circuit is open.
    pub fn handle_error(
        &self,
        peer_id: &EndpointId,
        error: SyncError,
    ) -> Result<Option<Duration>, SyncError> {
        // Recover the map even if the lock was poisoned by a panicking writer.
        let mut health_map = self.peer_health.write().unwrap_or_else(|e| e.into_inner());
        let health = health_map.entry(*peer_id).or_default();
        // Failures only count toward opening the breaker while they occur
        // within `failure_window` of each other; an older streak starts over.
        // (Previously `failure_window` was configured but never consulted, so
        // widely-spaced failures could accumulate forever.)
        if health.circuit_state == CircuitState::Closed {
            if let Some(last_failure) = health.last_failure_time {
                let elapsed = SystemTime::now()
                    .duration_since(last_failure)
                    .unwrap_or(Duration::ZERO);
                if elapsed > self.circuit_config.failure_window {
                    health.consecutive_failures = 0;
                }
            }
        }
        health.record_failure(error.clone());
        match health.circuit_state {
            CircuitState::Open => {
                // After `open_timeout`, allow one probe through in half-open state.
                if let Some(opened_at) = health.circuit_opened_at {
                    if SystemTime::now()
                        .duration_since(opened_at)
                        .unwrap_or(Duration::ZERO)
                        >= self.circuit_config.open_timeout
                    {
                        tracing::info!(
                            "Circuit breaker for peer {:?} transitioning to half-open",
                            peer_id
                        );
                        health.circuit_state = CircuitState::HalfOpen;
                        health.retry_attempt = 0;
                        return Ok(Some(Duration::ZERO));
                    }
                }
                return Err(SyncError::CircuitBreakerOpen);
            }
            CircuitState::HalfOpen => {
                // Any failure during the half-open probe reopens the circuit
                // (record_failure above guarantees consecutive_failures >= 1).
                if health.consecutive_failures >= 1 {
                    tracing::warn!("Circuit breaker for peer {:?} reopening", peer_id);
                    health.circuit_state = CircuitState::Open;
                    health.circuit_opened_at = Some(SystemTime::now());
                    return Err(SyncError::CircuitBreakerOpen);
                }
            }
            CircuitState::Closed => {
                if health.consecutive_failures >= self.circuit_config.failure_threshold {
                    tracing::warn!(
                        "Opening circuit breaker for peer {:?} after {} failures",
                        peer_id,
                        health.consecutive_failures
                    );
                    health.circuit_state = CircuitState::Open;
                    health.circuit_opened_at = Some(SystemTime::now());
                    return Err(SyncError::CircuitBreakerOpen);
                }
            }
        }
        if !error.is_retryable() {
            tracing::error!("Non-retryable sync error for peer {:?}: {}", peer_id, error);
            return Ok(None);
        }
        // Pick a retry schedule matched to the error's severity.
        // NOTE(review): the `Severe` arm is currently unreachable — `Severe`
        // errors fail `is_retryable` above. Kept for safety should the
        // retryability rule ever widen; confirm the intended behavior.
        let policy = match error.severity() {
            ErrorSeverity::Transient => RetryPolicy::transient(),
            ErrorSeverity::Severe => RetryPolicy::severe(),
            _ => self.retry_policy.clone(),
        };
        if !policy.should_retry(health.retry_attempt) {
            tracing::warn!(
                "Max retry attempts ({}) exceeded for peer {:?}",
                policy.max_attempts,
                peer_id
            );
            return Ok(None);
        }
        let delay = policy.delay_for_attempt(health.retry_attempt);
        tracing::debug!(
            "Will retry sync with peer {:?} after {:?} (attempt {})",
            peer_id,
            delay,
            health.retry_attempt
        );
        Ok(Some(delay))
    }

    /// Records a successful sync; closes a half-open circuit once
    /// `success_threshold` consecutive successes have been seen.
    pub fn record_success(&self, peer_id: &EndpointId) {
        let mut health_map = self.peer_health.write().unwrap_or_else(|e| e.into_inner());
        let health = health_map.entry(*peer_id).or_default();
        health.record_success();
        if health.circuit_state == CircuitState::HalfOpen
            && health.consecutive_successes >= self.circuit_config.success_threshold
        {
            tracing::info!("Closing circuit breaker for peer {:?}", peer_id);
            health.circuit_state = CircuitState::Closed;
            health.circuit_opened_at = None;
        }
    }

    /// Snapshot of one peer's health, if that peer has been seen.
    pub fn peer_health(&self, peer_id: &EndpointId) -> Option<PeerHealthTracker> {
        self.peer_health
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .get(peer_id)
            .cloned()
    }

    /// Snapshot of every tracked peer's health.
    pub fn all_peer_health(&self) -> HashMap<EndpointId, PeerHealthTracker> {
        self.peer_health
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .clone()
    }

    /// Clears all recorded state for a peer (its breaker returns to closed).
    pub fn reset_peer(&self, peer_id: &EndpointId) {
        let mut health_map = self.peer_health.write().unwrap_or_else(|e| e.into_inner());
        if let Some(health) = health_map.get_mut(peer_id) {
            health.reset();
        }
    }

    /// True while the peer's circuit is open; unknown peers report `false`.
    /// (Was a bare `.unwrap()` on the lock — panicked on poison, unlike every
    /// other accessor in this impl.)
    pub fn is_circuit_open(&self, peer_id: &EndpointId) -> bool {
        self.peer_health
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .get(peer_id)
            .map(|h| h.circuit_state == CircuitState::Open)
            .unwrap_or(false)
    }
}
impl Default for SyncErrorHandler {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Zero jitter makes the exponential schedule deterministic and exactly
    /// checkable.
    #[test]
    fn test_retry_policy_delay_calculation() {
        let policy = RetryPolicy {
            initial_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(10),
            max_attempts: 5,
            backoff_multiplier: 2.0,
            jitter_factor: 0.0,
        };
        // Attempt 0 retries immediately; each later attempt doubles the delay.
        assert_eq!(policy.delay_for_attempt(0), Duration::ZERO);
        assert_eq!(policy.delay_for_attempt(1), Duration::from_millis(100));
        assert_eq!(policy.delay_for_attempt(2), Duration::from_millis(200));
        assert_eq!(policy.delay_for_attempt(3), Duration::from_millis(400));
        // Even with an aggressive multiplier the delay must respect max_delay.
        let long_policy = RetryPolicy {
            initial_delay: Duration::from_secs(1),
            max_delay: Duration::from_secs(5),
            max_attempts: 10,
            backoff_multiplier: 10.0,
            jitter_factor: 0.0,
        };
        let delay = long_policy.delay_for_attempt(5);
        assert!(delay <= Duration::from_secs(5));
    }

    /// `should_retry` is strict: exactly `max_attempts` tries are permitted.
    #[test]
    fn test_retry_policy_should_retry() {
        let policy = RetryPolicy {
            max_attempts: 3,
            ..Default::default()
        };
        assert!(policy.should_retry(0));
        assert!(policy.should_retry(1));
        assert!(policy.should_retry(2));
        assert!(!policy.should_retry(3));
        assert!(!policy.should_retry(4));
    }

    /// Spot-checks of the variant-to-severity mapping.
    #[test]
    fn test_sync_error_severity() {
        assert_eq!(
            SyncError::Network("test".to_string()).severity(),
            ErrorSeverity::Recoverable
        );
        assert_eq!(
            SyncError::PeerNotFound("test".to_string()).severity(),
            ErrorSeverity::Transient
        );
        assert_eq!(
            SyncError::Document("test".to_string()).severity(),
            ErrorSeverity::Fatal
        );
        assert_eq!(
            SyncError::Protocol("test".to_string()).severity(),
            ErrorSeverity::Severe
        );
    }

    /// Only Transient and Recoverable severities are retryable.
    #[test]
    fn test_sync_error_retryable() {
        assert!(SyncError::Network("test".to_string()).is_retryable());
        assert!(SyncError::PeerNotFound("test".to_string()).is_retryable());
        assert!(!SyncError::Document("test".to_string()).is_retryable());
    }

    /// Success and failure streaks reset each other, while `total_failures`
    /// only grows until an explicit `reset`.
    #[test]
    fn test_peer_health_tracker() {
        let mut tracker = PeerHealthTracker::default();
        tracker.record_success();
        assert_eq!(tracker.consecutive_successes, 1);
        assert_eq!(tracker.consecutive_failures, 0);
        assert_eq!(tracker.retry_attempt, 0);
        tracker.record_failure(SyncError::Network("timeout".to_string()));
        assert_eq!(tracker.consecutive_successes, 0);
        assert_eq!(tracker.consecutive_failures, 1);
        assert_eq!(tracker.retry_attempt, 1);
        assert_eq!(tracker.total_failures, 1);
        tracker.record_failure(SyncError::Network("timeout".to_string()));
        assert_eq!(tracker.consecutive_failures, 2);
        assert_eq!(tracker.retry_attempt, 2);
        assert_eq!(tracker.total_failures, 2);
        tracker.reset();
        assert_eq!(tracker.consecutive_failures, 0);
        assert_eq!(tracker.total_failures, 0);
    }

    /// The circuit stays closed until `failure_threshold` consecutive
    /// failures, then rejects with `CircuitBreakerOpen`.
    #[tokio::test]
    async fn test_circuit_breaker_opens_after_threshold() {
        let handler = SyncErrorHandler::with_policies(
            RetryPolicy::default(),
            CircuitBreakerConfig {
                failure_threshold: 3,
                ..Default::default()
            },
        );
        use iroh::SecretKey;
        let mut rng = rand::rng();
        let peer_id = SecretKey::generate(&mut rng).public();
        // First two failures are below the threshold of 3 and stay Ok.
        for i in 0..2 {
            let result = handler.handle_error(&peer_id, SyncError::Network("test".to_string()));
            assert!(result.is_ok(), "Attempt {} should succeed", i);
        }
        let result = handler.handle_error(&peer_id, SyncError::Network("test".to_string()));
        assert!(
            matches!(result, Err(SyncError::CircuitBreakerOpen)),
            "Circuit should open on 3rd failure"
        );
        assert!(handler.is_circuit_open(&peer_id));
    }

    /// Full breaker lifecycle: Open → (open_timeout elapses) → HalfOpen →
    /// (success_threshold successes) → Closed.
    #[tokio::test]
    async fn test_circuit_breaker_half_open_transition() {
        let handler = SyncErrorHandler::with_policies(
            RetryPolicy::default(),
            CircuitBreakerConfig {
                failure_threshold: 2,
                open_timeout: Duration::from_millis(100),
                success_threshold: 2,
                ..Default::default()
            },
        );
        use iroh::SecretKey;
        let mut rng = rand::rng();
        let peer_id = SecretKey::generate(&mut rng).public();
        // Trip the breaker (threshold 2); the third call hits it while open.
        handler
            .handle_error(&peer_id, SyncError::Network("test".to_string()))
            .ok();
        handler
            .handle_error(&peer_id, SyncError::Network("test".to_string()))
            .ok();
        handler
            .handle_error(&peer_id, SyncError::Network("test".to_string()))
            .ok();
        assert!(handler.is_circuit_open(&peer_id));
        // Wait past open_timeout so the next error triggers the half-open probe.
        tokio::time::sleep(Duration::from_millis(150)).await;
        let result = handler.handle_error(&peer_id, SyncError::Network("test".to_string()));
        assert!(result.is_ok());
        let health = handler.peer_health(&peer_id).unwrap();
        assert_eq!(health.circuit_state, CircuitState::HalfOpen);
        // Two consecutive successes (success_threshold) close the breaker.
        handler.record_success(&peer_id);
        handler.record_success(&peer_id);
        let health = handler.peer_health(&peer_id).unwrap();
        assert_eq!(health.circuit_state, CircuitState::Closed);
    }
}