use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::RwLock;
use std::time::{Duration, Instant};
use std::collections::HashMap;
#[cfg(feature = "distributed")]
use dashmap::DashMap;
/// Common interface for components that gate calls based on observed call outcomes.
///
/// Implementors are `Send + Sync`, so one instance can be shared between threads.
pub trait FaultTolerance: Send + Sync {
    /// Returns `true` if a new call may be attempted now.
    fn is_allowed(&self) -> bool;
    /// Records that a call completed successfully.
    fn record_success(&self);
    /// Records that a call failed with the given error description.
    fn record_failure(&self, error: &str);
    /// Resets the component's gating state.
    fn reset(&self);
    /// Returns a snapshot of the accumulated statistics.
    fn get_stats(&self) -> FaultToleranceStats;
}

/// Counters and timestamps describing the outcomes seen by a fault-tolerance component.
#[derive(Debug, Clone, Default)]
pub struct FaultToleranceStats {
    /// Total attempts reported by the producing component.
    pub total_attempts: usize,
    /// Number of successful outcomes.
    pub successes: usize,
    /// Number of failed outcomes.
    pub failures: usize,
    /// Number of retries reported by the producing component.
    pub retries: usize,
    /// Number of times a circuit was tripped open.
    pub circuit_breaks: usize,
    /// Time of the most recent success, if any.
    pub last_success: Option<Instant>,
    /// Time of the most recent failure, if any.
    pub last_failure: Option<Instant>,
}

impl FaultToleranceStats {
    /// Creates empty statistics: all counters zero, no timestamps.
    pub fn new() -> Self {
        Self::default()
    }

    /// Number of outcomes (successes + failures) the rates are computed over.
    fn outcomes(&self) -> usize {
        self.successes.saturating_add(self.failures)
    }

    /// Fraction of outcomes that were successes, in `[0.0, 1.0]`.
    ///
    /// Returns `0.0` when nothing has been recorded yet.
    pub fn success_rate(&self) -> f64 {
        match self.outcomes() {
            0 => 0.0,
            total => self.successes as f64 / total as f64,
        }
    }

    /// Fraction of outcomes that were failures, in `[0.0, 1.0]`.
    ///
    /// Returns `0.0` when nothing has been recorded yet, so an idle component does not report
    /// a 100% failure rate.
    pub fn failure_rate(&self) -> f64 {
        match self.outcomes() {
            0 => 0.0,
            total => self.failures as f64 / total as f64,
        }
    }
}
/// Configuration for retrying a fallible operation (used by `execute_with_retry`).
#[derive(Debug, Clone)]
pub struct RetryPolicy {
    /// Number of retries allowed after the initial attempt.
    pub max_retries: usize,
    /// Delay before the first retry; doubled per attempt when `exponential_backoff` is set.
    pub base_delay: Duration,
    /// Cap on the exponential-backoff delay (jitter is applied after the cap).
    pub max_delay: Duration,
    /// Whether the delay doubles with each attempt.
    pub exponential_backoff: bool,
    /// Width of the jitter window as a fraction of the delay; `<= 0.0` disables jitter.
    pub jitter_factor: f64,
    /// Substrings matched case-insensitively against error messages to decide retryability.
    pub retryable_errors: Vec<String>,
}

impl Default for RetryPolicy {
    /// 3 retries, 100 ms base delay doubling up to 10 s, 10% jitter, and the patterns
    /// `"timeout"`, `"connection"`, `"temporary"`.
    fn default() -> Self {
        Self {
            max_retries: 3,
            base_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(10),
            exponential_backoff: true,
            jitter_factor: 0.1,
            retryable_errors: vec![
                "timeout".to_string(),
                "connection".to_string(),
                "temporary".to_string(),
            ],
        }
    }
}

impl RetryPolicy {
    /// Starts a builder; options left unset fall back to `RetryPolicy::default()`.
    pub fn builder() -> RetryPolicyBuilder {
        RetryPolicyBuilder::default()
    }

    /// Returns how long to wait before retry number `attempt` (0-based).
    ///
    /// With exponential backoff this is `min(base_delay * 2^attempt, max_delay)`, computed
    /// without overflow for any `attempt`. Otherwise it is `base_delay`. Jitter is applied
    /// afterwards when `jitter_factor > 0`.
    pub fn next_delay(&self, attempt: usize) -> Duration {
        let delay = if self.exponential_backoff {
            self.backoff_delay(attempt)
        } else {
            self.base_delay
        };
        if self.jitter_factor > 0.0 {
            self.add_jitter(delay)
        } else {
            delay
        }
    }

    /// `min(base_delay * 2^attempt, max_delay)` at full `Duration` precision.
    fn backoff_delay(&self, attempt: usize) -> Duration {
        let mut delay = self.base_delay;
        // Doubling stops once the cap is reached (or the delay is zero), so the loop runs
        // fewer than 100 times even for huge `attempt`, and nothing can overflow.
        for _ in 0..attempt {
            if delay >= self.max_delay || delay.is_zero() {
                break;
            }
            delay = delay.checked_mul(2).unwrap_or(self.max_delay);
        }
        delay.min(self.max_delay)
    }

    /// Returns `true` if `error` contains any configured pattern, ignoring case on both sides.
    pub fn is_retryable(&self, error: &str) -> bool {
        let error = error.to_lowercase();
        self.retryable_errors
            .iter()
            .any(|pattern| error.contains(pattern.to_lowercase().as_str()))
    }

    /// Shifts `delay` by a pseudo-random offset in roughly `±(jitter_factor * delay) / 2`,
    /// never going below zero.
    ///
    /// The offset is derived from the current wall-clock time in nanoseconds. This is cheap
    /// and good enough to de-synchronise retriers, but it is not a real RNG.
    fn add_jitter(&self, delay: Duration) -> Duration {
        let base = delay.as_nanos();
        // Float-to-int `as` saturates, so an enormous factor yields u128::MAX rather than UB.
        let range = (base as f64 * self.jitter_factor) as u128;
        if range == 0 {
            return delay;
        }
        let offset = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or(Duration::ZERO)
            .as_nanos()
            % range.saturating_add(1);
        let jittered = base.saturating_add(offset).saturating_sub(range / 2);
        Duration::from_nanos(u64::try_from(jittered).unwrap_or(u64::MAX))
    }
}

/// Builder for [`RetryPolicy`]; every option is optional.
#[derive(Debug, Default)]
pub struct RetryPolicyBuilder {
    max_retries: Option<usize>,
    base_delay: Option<Duration>,
    max_delay: Option<Duration>,
    exponential_backoff: Option<bool>,
    jitter_factor: Option<f64>,
    retryable_errors: Option<Vec<String>>,
}

impl RetryPolicyBuilder {
    /// Sets the number of retries after the initial attempt.
    pub fn with_max_retries(self, max_retries: usize) -> Self {
        Self {
            max_retries: Some(max_retries),
            ..self
        }
    }

    /// Sets the base (first-retry) delay.
    pub fn with_delay(self, delay: Duration) -> Self {
        Self {
            base_delay: Some(delay),
            ..self
        }
    }

    /// Sets the cap for exponential backoff.
    pub fn with_max_delay(self, max_delay: Duration) -> Self {
        Self {
            max_delay: Some(max_delay),
            ..self
        }
    }

    /// Enables or disables exponential backoff.
    pub fn with_exponential_backoff(self, enabled: bool) -> Self {
        Self {
            exponential_backoff: Some(enabled),
            ..self
        }
    }

    /// Sets the jitter factor (`0.0` disables jitter).
    pub fn with_jitter_factor(self, factor: f64) -> Self {
        Self {
            jitter_factor: Some(factor),
            ..self
        }
    }

    /// Adds a retryable error pattern.
    ///
    /// The first call replaces the default pattern list; later calls append to it.
    pub fn with_retryable_error(mut self, error: impl Into<String>) -> Self {
        self.retryable_errors
            .get_or_insert_with(Vec::new)
            .push(error.into());
        self
    }

    /// Builds the policy, taking unset options from `RetryPolicy::default()`.
    pub fn build(self) -> RetryPolicy {
        let defaults = RetryPolicy::default();
        RetryPolicy {
            max_retries: self.max_retries.unwrap_or(defaults.max_retries),
            base_delay: self.base_delay.unwrap_or(defaults.base_delay),
            max_delay: self.max_delay.unwrap_or(defaults.max_delay),
            exponential_backoff: self
                .exponential_backoff
                .unwrap_or(defaults.exponential_backoff),
            jitter_factor: self.jitter_factor.unwrap_or(defaults.jitter_factor),
            retryable_errors: self.retryable_errors.unwrap_or(defaults.retryable_errors),
        }
    }
}
/// Lifecycle state of a `CircuitBreaker`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
    /// Calls are allowed; consecutive failures count towards `failure_threshold`.
    Closed,
    /// Calls are rejected until `timeout` has passed since the most recent recorded failure.
    Open,
    /// Trial calls are allowed; `success_threshold` successes close the circuit and any
    /// failure re-opens it.
    HalfOpen,
}

/// Circuit breaker.
///
/// After `failure_threshold` consecutive failures, `is_allowed` returns `false` until `timeout`
/// has elapsed since the last failure. A half-open trial period then decides whether to close
/// the circuit again.
///
/// Locks are always taken in the order `state` → `last_failure_time` → `stats`, so concurrent
/// callers cannot deadlock.
pub struct CircuitBreaker {
    failure_threshold: AtomicUsize,
    success_threshold: AtomicUsize,
    timeout: Duration,
    state: RwLock<CircuitState>,
    /// Consecutive failures observed while `Closed` (modified under the `state` write lock).
    failure_count: AtomicUsize,
    /// Successes observed in the current `HalfOpen` period (modified under the `state` lock).
    success_count: AtomicUsize,
    last_failure_time: RwLock<Option<Instant>>,
    stats: RwLock<FaultToleranceStats>,
}

impl CircuitBreaker {
    /// Returns a builder (defaults: failure threshold 5, success threshold 2, timeout 30 s).
    pub fn builder() -> CircuitBreakerBuilder {
        CircuitBreakerBuilder::default()
    }

    /// Returns `true` if a call may proceed.
    ///
    /// While `Open`, the first caller after `timeout` has elapsed since the last failure moves
    /// the breaker to `HalfOpen`, resets the trial success count, and is let through.
    pub fn is_allowed(&self) -> bool {
        match self.state() {
            CircuitState::Closed | CircuitState::HalfOpen => true,
            CircuitState::Open => {
                // Cheap check without the write lock; most calls while open stop here.
                if !self.open_period_elapsed() {
                    return false;
                }
                let mut state = self.state.write().unwrap_or_else(|e| e.into_inner());
                if *state != CircuitState::Open {
                    // Another thread already moved the breaker out of `Open`.
                    return true;
                }
                // Re-check under the state lock: a failure recorded after the cheap check
                // restarts the open period.
                if !self.open_period_elapsed() {
                    return false;
                }
                *state = CircuitState::HalfOpen;
                self.success_count.store(0, Ordering::Relaxed);
                true
            }
        }
    }

    /// `true` if a failure has been recorded and at least `timeout` has passed since it.
    fn open_period_elapsed(&self) -> bool {
        // Copy the timestamp out so its guard is released at the end of this statement.
        let last_failure = *self
            .last_failure_time
            .read()
            .unwrap_or_else(|e| e.into_inner());
        matches!(last_failure, Some(at) if at.elapsed() >= self.timeout)
    }

    /// Records a successful call.
    ///
    /// In `Closed` this clears the consecutive-failure count. In `HalfOpen` it counts towards
    /// `success_threshold` and closes the breaker once the threshold is reached.
    pub fn record_success(&self) {
        let mut state = self.state.write().unwrap_or_else(|e| e.into_inner());
        match *state {
            CircuitState::Closed => self.failure_count.store(0, Ordering::Relaxed),
            CircuitState::HalfOpen => {
                let successes = self.success_count.fetch_add(1, Ordering::Relaxed) + 1;
                if successes >= self.success_threshold.load(Ordering::Relaxed) {
                    *state = CircuitState::Closed;
                    self.failure_count.store(0, Ordering::Relaxed);
                }
            }
            CircuitState::Open => {}
        }
        let mut stats = self.stats.write().unwrap_or_else(|e| e.into_inner());
        stats.total_attempts += 1;
        stats.successes += 1;
        stats.last_success = Some(Instant::now());
    }

    /// Records a failed call.
    ///
    /// The failure always updates the last-failure time and the statistics. It trips the
    /// breaker to `Open` when the consecutive-failure threshold is reached in `Closed`, or on
    /// any failure in `HalfOpen`.
    pub fn record_failure(&self, _error: &str) {
        let now = Instant::now();
        let mut state = self.state.write().unwrap_or_else(|e| e.into_inner());
        *self
            .last_failure_time
            .write()
            .unwrap_or_else(|e| e.into_inner()) = Some(now);
        let trips = match *state {
            CircuitState::Closed => {
                let failures = self.failure_count.fetch_add(1, Ordering::Relaxed) + 1;
                failures >= self.failure_threshold.load(Ordering::Relaxed)
            }
            // A failed trial call re-opens the circuit immediately.
            CircuitState::HalfOpen => true,
            // Already open: the new timestamp restarts the open period.
            CircuitState::Open => false,
        };
        if trips {
            *state = CircuitState::Open;
        }
        let mut stats = self.stats.write().unwrap_or_else(|e| e.into_inner());
        stats.total_attempts += 1;
        stats.failures += 1;
        stats.last_failure = Some(now);
        if trips {
            stats.circuit_breaks += 1;
        }
    }

    /// Returns the breaker to `Closed` and clears its counters and last-failure time.
    /// Accumulated statistics are kept.
    pub fn reset(&self) {
        let mut state = self.state.write().unwrap_or_else(|e| e.into_inner());
        *state = CircuitState::Closed;
        self.failure_count.store(0, Ordering::Relaxed);
        self.success_count.store(0, Ordering::Relaxed);
        *self
            .last_failure_time
            .write()
            .unwrap_or_else(|e| e.into_inner()) = None;
    }

    /// Returns the current state.
    pub fn state(&self) -> CircuitState {
        *self.state.read().unwrap_or_else(|e| e.into_inner())
    }

    /// Returns a snapshot of the statistics.
    pub fn get_stats(&self) -> FaultToleranceStats {
        self.stats.read().unwrap_or_else(|e| e.into_inner()).clone()
    }
}

/// Delegates to the inherent methods. `Type::method` paths resolve to inherent methods first,
/// so these calls do not recurse.
impl FaultTolerance for CircuitBreaker {
    fn is_allowed(&self) -> bool {
        CircuitBreaker::is_allowed(self)
    }

    fn record_success(&self) {
        CircuitBreaker::record_success(self)
    }

    fn record_failure(&self, error: &str) {
        CircuitBreaker::record_failure(self, error)
    }

    fn reset(&self) {
        CircuitBreaker::reset(self)
    }

    fn get_stats(&self) -> FaultToleranceStats {
        CircuitBreaker::get_stats(self)
    }
}

/// Builder for `CircuitBreaker`.
#[derive(Debug, Default)]
pub struct CircuitBreakerBuilder {
    failure_threshold: Option<usize>,
    success_threshold: Option<usize>,
    timeout: Option<Duration>,
}

impl CircuitBreakerBuilder {
    /// Consecutive failures in `Closed` that trip the breaker (default 5).
    pub fn with_failure_threshold(mut self, threshold: usize) -> Self {
        self.failure_threshold = Some(threshold);
        self
    }

    /// Successes in `HalfOpen` needed to close the breaker (default 2).
    pub fn with_success_threshold(mut self, threshold: usize) -> Self {
        self.success_threshold = Some(threshold);
        self
    }

    /// Time after the last failure before an `Open` breaker allows a trial call (default 30 s).
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = Some(timeout);
        self
    }

    /// Builds a `Closed` breaker with empty statistics.
    pub fn build(self) -> CircuitBreaker {
        CircuitBreaker {
            failure_threshold: AtomicUsize::new(self.failure_threshold.unwrap_or(5)),
            success_threshold: AtomicUsize::new(self.success_threshold.unwrap_or(2)),
            timeout: self.timeout.unwrap_or(Duration::from_secs(30)),
            state: RwLock::new(CircuitState::Closed),
            failure_count: AtomicUsize::new(0),
            success_count: AtomicUsize::new(0),
            last_failure_time: RwLock::new(None),
            stats: RwLock::new(FaultToleranceStats::new()),
        }
    }
}
/// Tracks the health of registered nodes from check results reported by callers.
///
/// `interval` and `timeout` are stored configuration only; nothing in this type schedules or
/// times out checks itself.
pub struct HealthChecker {
    interval: Duration,
    timeout: Duration,
    #[cfg(feature = "distributed")]
    node_health: DashMap<usize, NodeHealth>,
    #[cfg(not(feature = "distributed"))]
    node_health: RwLock<HashMap<usize, NodeHealth>>,
    /// Flag toggled by `set_enabled`; the recording methods do not consult it.
    enabled: AtomicBool,
}

/// Health record of a single node.
#[derive(Debug, Clone)]
pub struct NodeHealth {
    pub node_id: usize,
    /// Becomes `false` after 3 consecutive failures; restored by the next success.
    pub healthy: bool,
    pub consecutive_failures: usize,
    /// Time of the most recent check, whatever its outcome.
    pub last_check: Option<Instant>,
    /// Time of the most recent successful check.
    pub last_success: Option<Instant>,
    /// Score in `[0, 100]`: +10 per success, -20 per failure.
    pub health_score: f64,
}

impl NodeHealth {
    /// Consecutive failures after which the node is marked unhealthy.
    const UNHEALTHY_AFTER: usize = 3;
    const MAX_SCORE: f64 = 100.0;
    const SUCCESS_BONUS: f64 = 10.0;
    const FAILURE_PENALTY: f64 = 20.0;

    /// Creates a healthy record with a full score and no checks yet.
    pub fn new(node_id: usize) -> Self {
        Self {
            node_id,
            healthy: true,
            consecutive_failures: 0,
            last_check: None,
            last_success: None,
            health_score: Self::MAX_SCORE,
        }
    }

    /// Records a successful check: marks the node healthy and raises its score.
    pub fn record_success(&mut self) {
        // One clock read, so `last_check` and `last_success` describe the same event.
        let now = Some(Instant::now());
        self.healthy = true;
        self.consecutive_failures = 0;
        self.last_check = now;
        self.last_success = now;
        self.health_score = (self.health_score + Self::SUCCESS_BONUS).min(Self::MAX_SCORE);
    }

    /// Records a failed check: lowers the score and marks the node unhealthy after
    /// `UNHEALTHY_AFTER` consecutive failures.
    pub fn record_failure(&mut self) {
        self.consecutive_failures += 1;
        self.last_check = Some(Instant::now());
        if self.consecutive_failures >= Self::UNHEALTHY_AFTER {
            self.healthy = false;
        }
        self.health_score = (self.health_score - Self::FAILURE_PENALTY).max(0.0);
    }
}

impl Default for HealthChecker {
    fn default() -> Self {
        Self::new()
    }
}

impl HealthChecker {
    /// Creates an enabled checker with a 5 s interval, a 2 s timeout and no registered nodes.
    pub fn new() -> Self {
        Self {
            interval: Duration::from_secs(5),
            timeout: Duration::from_secs(2),
            // Both backing stores (`DashMap` / `RwLock<HashMap>`) start empty via `Default`.
            node_health: Default::default(),
            enabled: AtomicBool::new(true),
        }
    }

    /// Sets the configured check interval.
    pub fn with_interval(mut self, interval: Duration) -> Self {
        self.interval = interval;
        self
    }

    /// Sets the configured check timeout.
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = timeout;
        self
    }

    /// Sets the enabled flag.
    pub fn set_enabled(&self, enabled: bool) {
        self.enabled.store(enabled, Ordering::Relaxed);
    }

    /// Returns the enabled flag.
    pub fn is_enabled(&self) -> bool {
        self.enabled.load(Ordering::Relaxed)
    }

    /// Registers `node_id` with a fresh record, replacing any existing record.
    #[cfg(feature = "distributed")]
    pub fn register_node(&self, node_id: usize) {
        self.node_health.insert(node_id, NodeHealth::new(node_id));
    }

    /// Registers `node_id` with a fresh record, replacing any existing record.
    #[cfg(not(feature = "distributed"))]
    pub fn register_node(&self, node_id: usize) {
        let mut health = self.node_health.write().unwrap_or_else(|e| e.into_inner());
        health.insert(node_id, NodeHealth::new(node_id));
    }

    /// Records a successful check; ignored for unregistered nodes.
    #[cfg(feature = "distributed")]
    pub fn record_node_success(&self, node_id: usize) {
        if let Some(mut node) = self.node_health.get_mut(&node_id) {
            node.record_success();
        }
    }

    /// Records a successful check; ignored for unregistered nodes.
    #[cfg(not(feature = "distributed"))]
    pub fn record_node_success(&self, node_id: usize) {
        let mut health = self.node_health.write().unwrap_or_else(|e| e.into_inner());
        if let Some(node) = health.get_mut(&node_id) {
            node.record_success();
        }
    }

    /// Records a failed check; ignored for unregistered nodes.
    #[cfg(feature = "distributed")]
    pub fn record_node_failure(&self, node_id: usize) {
        if let Some(mut node) = self.node_health.get_mut(&node_id) {
            node.record_failure();
        }
    }

    /// Records a failed check; ignored for unregistered nodes.
    #[cfg(not(feature = "distributed"))]
    pub fn record_node_failure(&self, node_id: usize) {
        let mut health = self.node_health.write().unwrap_or_else(|e| e.into_inner());
        if let Some(node) = health.get_mut(&node_id) {
            node.record_failure();
        }
    }

    /// Returns `true` if `node_id` is registered and healthy.
    #[cfg(feature = "distributed")]
    pub fn is_node_healthy(&self, node_id: usize) -> bool {
        self.node_health
            .get(&node_id)
            .map(|node| node.healthy)
            .unwrap_or(false)
    }

    /// Returns `true` if `node_id` is registered and healthy.
    #[cfg(not(feature = "distributed"))]
    pub fn is_node_healthy(&self, node_id: usize) -> bool {
        let health = self.node_health.read().unwrap_or_else(|e| e.into_inner());
        health.get(&node_id).map(|n| n.healthy).unwrap_or(false)
    }

    /// Returns a snapshot of every node's record.
    #[cfg(feature = "distributed")]
    pub fn get_all_health(&self) -> HashMap<usize, NodeHealth> {
        self.node_health
            .iter()
            .map(|entry| (*entry.key(), entry.value().clone()))
            .collect()
    }

    /// Returns a snapshot of every node's record.
    #[cfg(not(feature = "distributed"))]
    pub fn get_all_health(&self) -> HashMap<usize, NodeHealth> {
        self.node_health
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .clone()
    }

    /// Returns the ids of all healthy nodes, in unspecified order.
    #[cfg(feature = "distributed")]
    pub fn get_healthy_nodes(&self) -> Vec<usize> {
        self.node_health
            .iter()
            .filter(|entry| entry.value().healthy)
            .map(|entry| *entry.key())
            .collect()
    }

    /// Returns the ids of all healthy nodes, in unspecified order.
    #[cfg(not(feature = "distributed"))]
    pub fn get_healthy_nodes(&self) -> Vec<usize> {
        let health = self.node_health.read().unwrap_or_else(|e| e.into_inner());
        health
            .iter()
            .filter(|(_, n)| n.healthy)
            .map(|(id, _)| *id)
            .collect()
    }

    /// Configured check interval.
    pub fn interval(&self) -> Duration {
        self.interval
    }

    /// Configured check timeout.
    pub fn timeout(&self) -> Duration {
        self.timeout
    }
}
/// Decides whether nodes have failed, based on how recently they responded and on
/// caller-reported suspicion.
///
/// A node counts as failed if it was marked failed less than `failure_timeout` ago, or if its
/// last recorded response is older than `failure_timeout`. `suspect_node` reports whether the
/// suspicion threshold was reached, but it does not mark the node failed by itself.
///
/// In the non-distributed build, each table is a `Vec` indexed by node id and grown on demand
/// under its write lock. Locks are always taken in the order `suspicion_counts` →
/// `last_response` → `failed_nodes`, so concurrent callers cannot deadlock.
pub struct FailureDetector {
    suspicion_threshold: AtomicUsize,
    #[cfg(feature = "distributed")]
    suspicion_counts: DashMap<usize, usize>,
    #[cfg(not(feature = "distributed"))]
    suspicion_counts: RwLock<Vec<Option<usize>>>,
    #[cfg(feature = "distributed")]
    last_response: DashMap<usize, Instant>,
    #[cfg(not(feature = "distributed"))]
    last_response: RwLock<Vec<Option<Instant>>>,
    #[cfg(feature = "distributed")]
    failed_nodes: DashMap<usize, Instant>,
    #[cfg(not(feature = "distributed"))]
    failed_nodes: RwLock<Vec<Option<Instant>>>,
    failure_timeout: Duration,
}

impl Default for FailureDetector {
    fn default() -> Self {
        Self::new()
    }
}

impl FailureDetector {
    const DEFAULT_SUSPICION_THRESHOLD: usize = 3;
    const DEFAULT_FAILURE_TIMEOUT: Duration = Duration::from_secs(60);
    /// Initial and minimum grown table length in the non-distributed build.
    #[cfg(not(feature = "distributed"))]
    const DEFAULT_CAPACITY: usize = 256;

    /// Creates a detector with suspicion threshold 3 and a 60 s failure timeout.
    #[cfg(feature = "distributed")]
    pub fn new() -> Self {
        Self {
            suspicion_threshold: AtomicUsize::new(Self::DEFAULT_SUSPICION_THRESHOLD),
            suspicion_counts: DashMap::new(),
            last_response: DashMap::new(),
            failed_nodes: DashMap::new(),
            failure_timeout: Self::DEFAULT_FAILURE_TIMEOUT,
        }
    }

    /// Like `new`, with maps pre-sized for `capacity` nodes.
    #[cfg(feature = "distributed")]
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            suspicion_threshold: AtomicUsize::new(Self::DEFAULT_SUSPICION_THRESHOLD),
            suspicion_counts: DashMap::with_capacity(capacity),
            last_response: DashMap::with_capacity(capacity),
            failed_nodes: DashMap::with_capacity(capacity),
            failure_timeout: Self::DEFAULT_FAILURE_TIMEOUT,
        }
    }

    /// Creates a detector with suspicion threshold 3, a 60 s failure timeout and tables
    /// pre-sized for 256 node ids.
    #[cfg(not(feature = "distributed"))]
    pub fn new() -> Self {
        Self::with_capacity(Self::DEFAULT_CAPACITY)
    }

    /// Like `new`, with tables pre-sized for node ids `< capacity`. Larger ids grow the
    /// tables on demand.
    #[cfg(not(feature = "distributed"))]
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            suspicion_threshold: AtomicUsize::new(Self::DEFAULT_SUSPICION_THRESHOLD),
            suspicion_counts: RwLock::new(vec![None; capacity]),
            last_response: RwLock::new(vec![None; capacity]),
            failed_nodes: RwLock::new(vec![None; capacity]),
            failure_timeout: Self::DEFAULT_FAILURE_TIMEOUT,
        }
    }

    /// Sets how many suspicions make `suspect_node` return `true`.
    pub fn with_suspicion_threshold(self, threshold: usize) -> Self {
        self.suspicion_threshold.store(threshold, Ordering::Relaxed);
        self
    }

    /// Sets both the maximum response age and how long a failure mark stays valid.
    pub fn with_failure_timeout(mut self, timeout: Duration) -> Self {
        self.failure_timeout = timeout;
        self
    }

    /// `true` if `failed_at` is a failure mark younger than `failure_timeout`.
    fn failure_is_fresh(&self, failed_at: Option<Instant>) -> bool {
        matches!(failed_at, Some(at) if at.elapsed() < self.failure_timeout)
    }

    /// `true` if `last_seen` is a response older than `failure_timeout`.
    fn response_is_stale(&self, last_seen: Option<Instant>) -> bool {
        matches!(last_seen, Some(at) if at.elapsed() > self.failure_timeout)
    }

    /// Returns the slot for `node_id`, growing the table if needed.
    ///
    /// Growth happens under the caller's write guard, in the same critical section as the
    /// access, and tables only ever grow, so the index is always in bounds.
    #[cfg(not(feature = "distributed"))]
    fn slot_mut<T: Clone>(slots: &mut Vec<Option<T>>, node_id: usize) -> &mut Option<T> {
        if node_id >= slots.len() {
            let new_len = node_id
                .saturating_add(1)
                .saturating_mul(2)
                .max(Self::DEFAULT_CAPACITY);
            slots.resize(new_len, None);
        }
        &mut slots[node_id]
    }

    /// Reads the slot for `node_id`, treating ids past the end of the table as empty.
    #[cfg(not(feature = "distributed"))]
    fn read_slot<T: Copy>(slots: &[Option<T>], node_id: usize) -> Option<T> {
        slots.get(node_id).copied().flatten()
    }

    /// Records a response: timestamps it, clears suspicion, and clears any failure mark.
    #[cfg(feature = "distributed")]
    pub fn record_response(&self, node_id: usize) {
        self.suspicion_counts.insert(node_id, 0);
        self.last_response.insert(node_id, Instant::now());
        self.failed_nodes.remove(&node_id);
    }

    /// Records a response: timestamps it, clears suspicion, and clears any failure mark.
    #[cfg(not(feature = "distributed"))]
    pub fn record_response(&self, node_id: usize) {
        let mut suspicion_counts = self.suspicion_counts.write().unwrap_or_else(|e| e.into_inner());
        let mut last_response = self.last_response.write().unwrap_or_else(|e| e.into_inner());
        let mut failed_nodes = self.failed_nodes.write().unwrap_or_else(|e| e.into_inner());
        *Self::slot_mut(&mut *suspicion_counts, node_id) = Some(0);
        *Self::slot_mut(&mut *last_response, node_id) = Some(Instant::now());
        *Self::slot_mut(&mut *failed_nodes, node_id) = None;
    }

    /// Increments the node's suspicion count and returns `true` once it reaches the threshold.
    #[cfg(feature = "distributed")]
    pub fn suspect_node(&self, node_id: usize) -> bool {
        let mut count = self.suspicion_counts.entry(node_id).or_insert(0);
        *count += 1;
        *count >= self.suspicion_threshold.load(Ordering::Relaxed)
    }

    /// Increments the node's suspicion count and returns `true` once it reaches the threshold.
    #[cfg(not(feature = "distributed"))]
    pub fn suspect_node(&self, node_id: usize) -> bool {
        let mut suspicion_counts = self.suspicion_counts.write().unwrap_or_else(|e| e.into_inner());
        let count = Self::slot_mut(&mut *suspicion_counts, node_id).get_or_insert(0);
        *count += 1;
        *count >= self.suspicion_threshold.load(Ordering::Relaxed)
    }

    /// Returns `true` if the node is considered failed. A stale response (re)marks the node
    /// as failed with the current time.
    #[cfg(feature = "distributed")]
    pub fn is_failed(&self, node_id: usize) -> bool {
        // Copy values out so no map guard is held across the insert below.
        let failed_at = self.failed_nodes.get(&node_id).map(|entry| *entry);
        if self.failure_is_fresh(failed_at) {
            return true;
        }
        let last_seen = self.last_response.get(&node_id).map(|entry| *entry);
        if self.response_is_stale(last_seen) {
            self.failed_nodes.insert(node_id, Instant::now());
            return true;
        }
        false
    }

    /// Returns `true` if the node is considered failed. A stale response (re)marks the node
    /// as failed with the current time.
    #[cfg(not(feature = "distributed"))]
    pub fn is_failed(&self, node_id: usize) -> bool {
        let (failed_at, last_seen) = {
            let last_response = self.last_response.read().unwrap_or_else(|e| e.into_inner());
            let failed_nodes = self.failed_nodes.read().unwrap_or_else(|e| e.into_inner());
            (
                Self::read_slot(failed_nodes.as_slice(), node_id),
                Self::read_slot(last_response.as_slice(), node_id),
            )
        };
        if self.failure_is_fresh(failed_at) {
            return true;
        }
        if !self.response_is_stale(last_seen) {
            return false;
        }
        let last_response = self.last_response.read().unwrap_or_else(|e| e.into_inner());
        let mut failed_nodes = self.failed_nodes.write().unwrap_or_else(|e| e.into_inner());
        // Re-check under the write lock: a response may have arrived after the snapshot above.
        if !self.response_is_stale(Self::read_slot(last_response.as_slice(), node_id)) {
            return false;
        }
        if let Some(slot) = failed_nodes.get_mut(node_id) {
            *slot = Some(Instant::now());
        }
        true
    }

    /// Returns every node that currently carries a failure mark, fresh or not.
    #[cfg(feature = "distributed")]
    pub fn get_failed_nodes(&self) -> Vec<usize> {
        self.failed_nodes.iter().map(|kv| *kv.key()).collect()
    }

    /// Returns every node that currently carries a failure mark, fresh or not, in id order.
    #[cfg(not(feature = "distributed"))]
    pub fn get_failed_nodes(&self) -> Vec<usize> {
        let failed_nodes = self.failed_nodes.read().unwrap_or_else(|e| e.into_inner());
        failed_nodes
            .iter()
            .enumerate()
            .filter_map(|(i, opt)| opt.map(|_| i))
            .collect()
    }

    /// Forgets everything known about `node_id`.
    #[cfg(feature = "distributed")]
    pub fn remove_failed_node(&self, node_id: usize) {
        self.failed_nodes.remove(&node_id);
        self.suspicion_counts.remove(&node_id);
        self.last_response.remove(&node_id);
    }

    /// Forgets everything known about `node_id`; unknown ids are ignored.
    #[cfg(not(feature = "distributed"))]
    pub fn remove_failed_node(&self, node_id: usize) {
        let mut suspicion_counts = self.suspicion_counts.write().unwrap_or_else(|e| e.into_inner());
        let mut last_response = self.last_response.write().unwrap_or_else(|e| e.into_inner());
        let mut failed_nodes = self.failed_nodes.write().unwrap_or_else(|e| e.into_inner());
        if let Some(slot) = suspicion_counts.get_mut(node_id) {
            *slot = None;
        }
        if let Some(slot) = last_response.get_mut(node_id) {
            *slot = None;
        }
        if let Some(slot) = failed_nodes.get_mut(node_id) {
            *slot = None;
        }
    }
}
/// Strategy for restoring a node after a failure.
pub trait RecoveryStrategy: Send + Sync {
    /// Attempts to recover `node_id`, given caller-supplied `data`.
    fn recover(&self, node_id: usize, data: &[u8]) -> Result<(), String>;
    /// Returns `true` if `node_id` should be recovered.
    fn needs_recovery(&self, node_id: usize) -> bool;
    /// Returns recovery progress for `node_id`. Presumably a fraction in `[0.0, 1.0]`;
    /// confirm with implementors.
    fn get_recovery_progress(&self, node_id: usize) -> f64;
}

/// Checkpoint bookkeeping for node recovery.
///
/// NOTE(review): no checkpoint data is read or written here. `create_checkpoint` only builds a
/// file name and records the time, and `load_latest_checkpoint` returns an empty buffer.
pub struct CheckpointRecovery {
    /// Directory prefix used in generated checkpoint file names.
    checkpoint_dir: String,
    /// Maximum age of a node's latest checkpoint before `needs_recovery` reports `true`.
    checkpoint_interval: Duration,
    /// Time of the most recent `create_checkpoint` call for each node.
    last_checkpoint: RwLock<HashMap<usize, Instant>>,
}

impl CheckpointRecovery {
    /// Creates a helper that names checkpoints under `checkpoint_dir`, with a 60 s interval.
    pub fn new(checkpoint_dir: impl Into<String>) -> Self {
        Self {
            checkpoint_dir: checkpoint_dir.into(),
            checkpoint_interval: Duration::from_secs(60),
            last_checkpoint: RwLock::new(HashMap::new()),
        }
    }

    /// Sets the checkpoint interval.
    pub fn with_interval(mut self, interval: Duration) -> Self {
        self.checkpoint_interval = interval;
        self
    }

    /// Registers a checkpoint for `node_id` and returns its file name,
    /// `"{checkpoint_dir}/checkpoint_{node_id}_{unix_seconds}.bin"`.
    ///
    /// # Errors
    /// Currently always returns `Ok`.
    pub fn create_checkpoint(&self, node_id: usize, _data: &[u8]) -> Result<String, String> {
        use std::time::SystemTime;
        let timestamp = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap_or(Duration::ZERO)
            .as_secs();
        let filename = format!(
            "{}/checkpoint_{}_{}.bin",
            self.checkpoint_dir, node_id, timestamp
        );
        // Overwrite rather than `or_insert`, so `needs_recovery` compares against the
        // latest checkpoint and not the first one ever taken.
        self.last_checkpoint
            .write()
            .unwrap_or_else(|e| e.into_inner())
            .insert(node_id, Instant::now());
        Ok(filename)
    }

    /// Returns the latest checkpoint data for the node (currently always empty).
    pub fn load_latest_checkpoint(&self, _node_id: usize) -> Result<Vec<u8>, String> {
        Ok(vec![])
    }
}

impl RecoveryStrategy for CheckpointRecovery {
    /// Loads the node's latest checkpoint, propagating any load error.
    fn recover(&self, node_id: usize, _data: &[u8]) -> Result<(), String> {
        let _checkpoint = self.load_latest_checkpoint(node_id)?;
        Ok(())
    }

    /// `true` if the node has never been checkpointed, or if its latest checkpoint is older
    /// than `checkpoint_interval`.
    fn needs_recovery(&self, node_id: usize) -> bool {
        let last_checkpoint = self
            .last_checkpoint
            .read()
            .unwrap_or_else(|e| e.into_inner());
        if let Some(&last_time) = last_checkpoint.get(&node_id) {
            last_time.elapsed() > self.checkpoint_interval
        } else {
            true
        }
    }

    /// Always reports completion (`1.0`).
    fn get_recovery_progress(&self, _node_id: usize) -> f64 {
        1.0
    }
}
/// Log severity, ordered from least (`Debug`) to most (`Error`) severe.
///
/// Declaration order matters: `DistributedLogger` compares levels by their `usize`
/// discriminant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum LogLevel {
    Debug,
    Info,
    Warn,
    Error,
}

impl fmt::Display for LogLevel {
    /// Writes the upper-case level name, e.g. `WARN`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            LogLevel::Debug => "DEBUG",
            LogLevel::Info => "INFO",
            LogLevel::Warn => "WARN",
            LogLevel::Error => "ERROR",
        };
        f.write_str(label)
    }
}

/// A single log record.
#[derive(Debug, Clone)]
pub struct LogEntry {
    /// Creation time (monotonic clock).
    pub timestamp: Instant,
    pub level: LogLevel,
    /// Free-form source/category label.
    pub target: String,
    pub message: String,
    /// Optional id of the node the entry relates to.
    pub node_id: Option<usize>,
}

impl LogEntry {
    /// Creates an entry timestamped now, with no node id.
    pub fn new(level: LogLevel, target: impl Into<String>, message: impl Into<String>) -> Self {
        Self {
            timestamp: Instant::now(),
            level,
            target: target.into(),
            message: message.into(),
            node_id: None,
        }
    }

    /// Attaches a node id.
    pub fn with_node_id(mut self, node_id: usize) -> Self {
        self.node_id = Some(node_id);
        self
    }
}

/// In-memory log buffer.
///
/// Keeps at most `max_entries` of the most recent entries whose level is at least the
/// configured minimum.
pub struct DistributedLogger {
    /// Minimum kept level, stored as its `usize` discriminant.
    min_level: AtomicUsize,
    /// Retained entries, oldest first. A ring buffer, so evicting the oldest entry is O(1).
    entries: RwLock<std::collections::VecDeque<LogEntry>>,
    max_entries: usize,
}

impl Default for DistributedLogger {
    fn default() -> Self {
        Self::new()
    }
}

impl DistributedLogger {
    /// Creates a logger keeping `Info` and above, capped at 10,000 entries.
    pub fn new() -> Self {
        Self {
            min_level: AtomicUsize::new(LogLevel::Info as usize),
            entries: RwLock::new(std::collections::VecDeque::new()),
            max_entries: 10000,
        }
    }

    /// Sets the minimum level that is kept.
    pub fn with_min_level(self, level: LogLevel) -> Self {
        self.min_level.store(level as usize, Ordering::Relaxed);
        self
    }

    /// Sets the maximum number of retained entries.
    pub fn with_max_entries(mut self, max: usize) -> Self {
        self.max_entries = max;
        self
    }

    /// Stores `entry` if its level is at least the minimum, evicting the oldest entries
    /// beyond `max_entries`.
    pub fn log(&self, entry: LogEntry) {
        if (entry.level as usize) < self.min_level.load(Ordering::Relaxed) {
            return;
        }
        let mut entries = self.entries.write().unwrap_or_else(|e| e.into_inner());
        entries.push_back(entry);
        while entries.len() > self.max_entries {
            entries.pop_front();
        }
    }

    /// Logs `message` at `Debug` level.
    pub fn debug(&self, target: impl Into<String>, message: impl Into<String>) {
        self.log(LogEntry::new(LogLevel::Debug, target, message));
    }

    /// Logs `message` at `Info` level.
    pub fn info(&self, target: impl Into<String>, message: impl Into<String>) {
        self.log(LogEntry::new(LogLevel::Info, target, message));
    }

    /// Logs `message` at `Warn` level.
    pub fn warn(&self, target: impl Into<String>, message: impl Into<String>) {
        self.log(LogEntry::new(LogLevel::Warn, target, message));
    }

    /// Logs `message` at `Error` level.
    pub fn error(&self, target: impl Into<String>, message: impl Into<String>) {
        self.log(LogEntry::new(LogLevel::Error, target, message));
    }

    /// Returns retained entries, oldest first. With `Some(level)`, only entries at or above
    /// `level` are returned.
    pub fn get_entries(&self, level: Option<LogLevel>) -> Vec<LogEntry> {
        let entries = self.entries.read().unwrap_or_else(|e| e.into_inner());
        match level {
            Some(min_level) => entries
                .iter()
                .filter(|e| e.level >= min_level)
                .cloned()
                .collect(),
            None => entries.iter().cloned().collect(),
        }
    }

    /// Removes all retained entries.
    pub fn clear(&self) {
        self.entries
            .write()
            .unwrap_or_else(|e| e.into_inner())
            .clear();
    }
}
/// Runs `operation`, retrying retryable failures according to `policy`.
///
/// The operation runs at most `policy.max_retries + 1` times. Between attempts the current
/// thread sleeps for `policy.next_delay(attempt)`. If `logger` is given, each failure is
/// logged at `Warn` level under target `"retry"`.
///
/// # Errors
/// A failure that `policy.is_retryable` rejects is returned unchanged, immediately. If the
/// final attempt fails, returns `"Max retries exceeded. Last error: <last error>"`.
pub fn execute_with_retry<T, F>(
    policy: &RetryPolicy,
    mut operation: F,
    logger: Option<&DistributedLogger>,
) -> Result<T, String>
where
    F: FnMut() -> Result<T, String>,
{
    let mut last_error = String::new();
    for attempt in 0..=policy.max_retries {
        let failure = match operation() {
            Ok(value) => return Ok(value),
            Err(failure) => failure,
        };
        if let Some(log) = logger {
            log.warn(
                "retry",
                format!("Attempt {} failed: {}", attempt + 1, failure),
            );
        }
        if !policy.is_retryable(&failure) {
            return Err(failure);
        }
        // No pause after the final attempt.
        if attempt < policy.max_retries {
            std::thread::sleep(policy.next_delay(attempt));
        }
        last_error = failure;
    }
    Err(format!("Max retries exceeded. Last error: {}", last_error))
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    /// Builder options are carried into the built policy.
    #[test]
    fn test_retry_policy_builder() {
        let built = RetryPolicy::builder()
            .with_max_retries(5)
            .with_delay(Duration::from_millis(50))
            .with_max_delay(Duration::from_secs(5))
            .with_exponential_backoff(true)
            .with_jitter_factor(0.2)
            .with_retryable_error("timeout")
            .with_retryable_error("connection")
            .build();

        assert_eq!(built.max_retries, 5);
        assert_eq!(built.base_delay, Duration::from_millis(50));
        assert_eq!(built.max_delay, Duration::from_secs(5));
        assert!(built.exponential_backoff);
        assert!(built.is_retryable("connection timeout"));
    }

    /// Without jitter, exponential backoff doubles the base delay per attempt.
    #[test]
    fn test_retry_policy_delay() {
        let policy = RetryPolicy::builder()
            .with_delay(Duration::from_millis(100))
            .with_max_delay(Duration::from_secs(1))
            .with_exponential_backoff(true)
            .with_jitter_factor(0.0)
            .build();

        let expected: [(usize, u128); 3] = [(0, 100), (1, 200), (2, 400)];
        for &(attempt, millis) in expected.iter() {
            assert_eq!(policy.next_delay(attempt).as_millis(), millis);
        }
    }

    /// The breaker opens at the failure threshold and `reset` closes it again.
    #[test]
    fn test_circuit_breaker() {
        let breaker = CircuitBreaker::builder()
            .with_failure_threshold(3)
            .with_success_threshold(2)
            .with_timeout(Duration::from_millis(100))
            .build();
        assert_eq!(breaker.state(), CircuitState::Closed);
        assert!(breaker.is_allowed());

        for message in ["error1", "error2"].iter() {
            breaker.record_failure(message);
        }
        assert_eq!(breaker.state(), CircuitState::Closed);

        breaker.record_failure("error3");
        assert_eq!(breaker.state(), CircuitState::Open);
        assert!(!breaker.is_allowed());

        breaker.reset();
        assert_eq!(breaker.state(), CircuitState::Closed);
    }

    /// After the timeout the breaker goes half-open and closes after enough successes.
    #[test]
    fn test_circuit_breaker_half_open() {
        let breaker = CircuitBreaker::builder()
            .with_failure_threshold(2)
            .with_success_threshold(2)
            .with_timeout(Duration::from_millis(50))
            .build();

        breaker.record_failure("error1");
        breaker.record_failure("error2");
        assert_eq!(breaker.state(), CircuitState::Open);

        std::thread::sleep(Duration::from_millis(60));
        assert!(breaker.is_allowed());
        assert_eq!(breaker.state(), CircuitState::HalfOpen);

        breaker.record_success();
        assert_eq!(breaker.state(), CircuitState::HalfOpen);
        breaker.record_success();
        assert_eq!(breaker.state(), CircuitState::Closed);
    }

    /// Three consecutive failures mark only the affected node unhealthy.
    #[test]
    fn test_health_checker() {
        let checker = HealthChecker::new()
            .with_interval(Duration::from_secs(5))
            .with_timeout(Duration::from_secs(2));
        for node in 1..=2 {
            checker.register_node(node);
        }
        assert!(checker.is_node_healthy(1));
        assert!(checker.is_node_healthy(2));

        for _ in 0..3 {
            checker.record_node_failure(1);
        }
        assert!(!checker.is_node_healthy(1));
        assert!(checker.is_node_healthy(2));
    }

    /// Suspicion reaches the threshold, and silent nodes fail after the timeout.
    #[test]
    fn test_failure_detector() {
        let detector = FailureDetector::new()
            .with_suspicion_threshold(2)
            .with_failure_timeout(Duration::from_millis(50));
        for node in 1..=2 {
            detector.record_response(node);
        }
        assert!(!detector.is_failed(1));
        assert!(!detector.is_failed(2));

        assert!(!detector.suspect_node(2), "first suspicion is below the threshold");
        assert!(detector.suspect_node(2), "second suspicion reaches the threshold");

        std::thread::sleep(Duration::from_millis(60));
        assert!(detector.is_failed(1));
        assert!(detector.is_failed(2));
    }

    /// Entries below the minimum level are dropped; the level filter is inclusive.
    #[test]
    fn test_distributed_logger() {
        let logger = DistributedLogger::new()
            .with_min_level(LogLevel::Info)
            .with_max_entries(100);

        logger.debug("test", "debug message");
        logger.info("test", "info message");
        logger.warn("test", "warn message");
        logger.error("test", "error message");

        assert_eq!(logger.get_entries(None).len(), 3);
        assert_eq!(logger.get_entries(Some(LogLevel::Info)).len(), 3);
        assert_eq!(logger.get_entries(Some(LogLevel::Error)).len(), 1);
    }

    /// A succeeding operation runs exactly once.
    #[test]
    fn test_execute_with_retry_success() {
        let policy = RetryPolicy::builder()
            .with_max_retries(3)
            .with_delay(Duration::from_millis(10))
            .build();
        let logger = DistributedLogger::new();
        let calls = Arc::new(AtomicUsize::new(0));
        let calls_in_op = Arc::clone(&calls);

        let outcome = execute_with_retry(
            &policy,
            move || {
                calls_in_op.fetch_add(1, Ordering::SeqCst);
                Ok::<_, String>("success".to_string())
            },
            Some(&logger),
        );

        assert_eq!(outcome, Ok("success".to_string()));
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    /// A persistently failing retryable operation runs once plus `max_retries` times.
    #[test]
    fn test_execute_with_retry_failure() {
        let policy = RetryPolicy::builder()
            .with_max_retries(3)
            .with_delay(Duration::from_millis(10))
            .with_retryable_error("timeout")
            .build();
        let logger = DistributedLogger::new();
        let calls = Arc::new(AtomicUsize::new(0));
        let calls_in_op = Arc::clone(&calls);

        let outcome = execute_with_retry(
            &policy,
            move || {
                calls_in_op.fetch_add(1, Ordering::SeqCst);
                Err::<String, _>("timeout".to_string())
            },
            Some(&logger),
        );

        assert!(outcome.is_err());
        assert_eq!(calls.load(Ordering::SeqCst), 4);
    }

    /// Rates are computed from the success/failure counters.
    #[test]
    fn test_fault_tolerance_stats() {
        let stats = FaultToleranceStats {
            successes: 8,
            failures: 2,
            ..FaultToleranceStats::new()
        };
        assert!((stats.success_rate() - 0.8).abs() < 1e-10);
        assert!((stats.failure_rate() - 0.2).abs() < 1e-10);
    }

    /// Three failures make a node unhealthy; one success restores it.
    #[test]
    fn test_node_health() {
        let mut node = NodeHealth::new(1);
        assert!(node.healthy);
        assert_eq!(node.health_score, 100.0);
        assert_eq!(node.consecutive_failures, 0);

        for _ in 0..3 {
            node.record_failure();
        }
        assert!(!node.healthy);
        assert_eq!(node.consecutive_failures, 3);
        assert!(node.health_score < 100.0);

        node.record_success();
        assert!(node.healthy);
        assert_eq!(node.consecutive_failures, 0);
    }
}