oxirs_stream/fault_tolerance/mod.rs

//! # Advanced Fault Tolerance for Stream Processing
//!
//! Provides production-grade fault tolerance primitives for stream processing:
//! health monitoring, bulkhead isolation, configurable retry policies, and
//! worker supervision.
//!
//! ## Components
//!
//! - [`StreamHealthMonitor`]: Monitors stream health with configurable thresholds
//! - [`BulkheadIsolator`]: Isolates stream failures using the bulkhead pattern
//! - [`StreamRetryPolicy`]: Configurable retry with exponential backoff
//! - [`StreamSupervisor`]: Supervises stream workers and restarts on failure
//!
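//! ## Example
//!
//! A minimal usage sketch of the retry policy (assuming this module is
//! reachable as `oxirs_stream::fault_tolerance`):
//!
//! ```no_run
//! use oxirs_stream::fault_tolerance::StreamRetryPolicy;
//!
//! let policy = StreamRetryPolicy::default();
//! let value = policy
//!     .retry("load-config", || Ok::<_, std::io::Error>(42))
//!     .expect("operation failed after all retries");
//! assert_eq!(value, 42);
//! ```
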
pub mod checkpoint_recovery;
pub use checkpoint_recovery::*;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing::{debug, info, warn};

// ─── Error Types ─────────────────────────────────────────────────────────────

/// Errors in fault tolerance operations
#[derive(Error, Debug, Clone)]
pub enum FaultToleranceError {
    #[error("Bulkhead full: compartment {compartment} has reached capacity {capacity}")]
    BulkheadFull {
        compartment: String,
        capacity: usize,
    },

    #[error("Max retries exceeded: {attempts} attempts for operation {operation}")]
    MaxRetriesExceeded { attempts: u32, operation: String },

    #[error("Worker {worker_id} failed to restart after {attempts} attempts")]
    SupervisorRestartFailed { worker_id: String, attempts: u32 },

    #[error("Health check failed: metric {metric} value {value} exceeds threshold {threshold}")]
    HealthCheckFailed {
        metric: String,
        value: f64,
        threshold: f64,
    },

    #[error("Operation timeout after {elapsed_ms}ms (limit {timeout_ms}ms)")]
    OperationTimeout { elapsed_ms: u64, timeout_ms: u64 },
}

/// Result type for fault tolerance operations
pub type FaultResult<T> = Result<T, FaultToleranceError>;

// ─── Stream Health Monitor ────────────────────────────────────────────────────

/// A threshold rule that can trigger a health alert
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthThreshold {
    /// Name of the metric (e.g. "error_rate", "latency_p99_ms")
    pub metric_name: String,
    /// Value at or above which health is considered degraded
    pub warn_threshold: f64,
    /// Value at or above which health is considered critical
    pub critical_threshold: f64,
}

/// Severity of a health alert
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum HealthAlertSeverity {
    /// Metric crossed the warning threshold
    Warning,
    /// Metric crossed the critical threshold
    Critical,
    /// Metric has recovered below warning threshold
    Recovered,
}

/// A health alert emitted by the monitor
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthAlert {
    /// Metric that triggered the alert
    pub metric_name: String,
    /// Current metric value
    pub current_value: f64,
    /// Applicable threshold that was crossed
    pub threshold: f64,
    /// Alert severity
    pub severity: HealthAlertSeverity,
    /// When the alert was raised
    pub raised_at: SystemTime,
}

/// Overall health status of the stream
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum StreamHealthStatus {
    /// All metrics are within normal thresholds
    Healthy,
    /// One or more metrics have crossed warning thresholds
    Degraded,
    /// One or more metrics have crossed critical thresholds
    Critical,
    /// Health data is too old to be reliable
    Unknown,
}

/// A snapshot of all health metrics
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthSnapshot {
    /// Current health status
    pub status: StreamHealthStatus,
    /// All current metric values
    pub metrics: HashMap<String, f64>,
    /// Active alerts
    pub active_alerts: Vec<HealthAlert>,
    /// Timestamp of this snapshot
    pub snapshot_time: SystemTime,
}

/// Configuration for the stream health monitor
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthMonitorConfig {
    /// Health thresholds per metric
    pub thresholds: Vec<HealthThreshold>,
    /// Maximum age of a metric before it is considered stale
    pub metric_staleness: Duration,
    /// How often to evaluate thresholds
    pub check_interval: Duration,
}

impl Default for HealthMonitorConfig {
    fn default() -> Self {
        Self {
            thresholds: vec![
                HealthThreshold {
                    metric_name: "error_rate".to_string(),
                    warn_threshold: 0.01,
                    critical_threshold: 0.05,
                },
                HealthThreshold {
                    metric_name: "latency_p99_ms".to_string(),
                    warn_threshold: 100.0,
                    critical_threshold: 500.0,
                },
                HealthThreshold {
                    metric_name: "backpressure_ratio".to_string(),
                    warn_threshold: 0.5,
                    critical_threshold: 0.9,
                },
            ],
            metric_staleness: Duration::from_secs(60),
            check_interval: Duration::from_secs(5),
        }
    }
}

/// Monitors stream health with configurable thresholds.
///
/// Accepts metric updates from the stream pipeline and evaluates them against
/// configured thresholds to produce health alerts.
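///
/// # Example
///
/// A minimal sketch (assuming the module is reachable as
/// `oxirs_stream::fault_tolerance`):
///
/// ```no_run
/// use oxirs_stream::fault_tolerance::{
///     HealthMonitorConfig, StreamHealthMonitor, StreamHealthStatus,
/// };
///
/// let monitor = StreamHealthMonitor::new(HealthMonitorConfig::default());
/// // A 2% error rate crosses the default 1% warn threshold.
/// let alerts = monitor.record_metric("error_rate", 0.02);
/// assert_eq!(alerts.len(), 1);
/// assert_eq!(monitor.snapshot().status, StreamHealthStatus::Degraded);
/// ```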
pub struct StreamHealthMonitor {
    config: HealthMonitorConfig,
    /// Current metric values with timestamps
    metrics: Arc<RwLock<HashMap<String, (f64, Instant)>>>,
    /// Currently active alerts
    active_alerts: Arc<RwLock<Vec<HealthAlert>>>,
    /// Alert history (capped to last 1000 alerts)
    alert_history: Arc<RwLock<Vec<HealthAlert>>>,
    /// Total alerts raised since creation
    total_alerts_raised: Arc<RwLock<u64>>,
}

impl StreamHealthMonitor {
    /// Creates a new health monitor with the given configuration.
    pub fn new(config: HealthMonitorConfig) -> Self {
        Self {
            config,
            metrics: Arc::new(RwLock::new(HashMap::new())),
            active_alerts: Arc::new(RwLock::new(Vec::new())),
            alert_history: Arc::new(RwLock::new(Vec::new())),
            total_alerts_raised: Arc::new(RwLock::new(0)),
        }
    }

    /// Records a new metric value and evaluates thresholds.
    ///
    /// Returns any alerts raised by this update, including a `Recovered`
    /// alert when a previously alerting metric drops back below its warning
    /// threshold.
    pub fn record_metric(&self, metric_name: &str, value: f64) -> Vec<HealthAlert> {
        self.metrics
            .write()
            .insert(metric_name.to_string(), (value, Instant::now()));
        self.evaluate_thresholds(metric_name, value)
    }

    /// Returns the current health snapshot.
    pub fn snapshot(&self) -> HealthSnapshot {
        let metrics = self.metrics.read();
        let now = Instant::now();
        let stale_limit = self.config.metric_staleness;

        // Check for stale metrics
        let all_fresh = metrics
            .values()
            .all(|(_, ts)| now.duration_since(*ts) < stale_limit);

        let metric_values: HashMap<String, f64> =
            metrics.iter().map(|(k, (v, _))| (k.clone(), *v)).collect();

        let active_alerts = self.active_alerts.read().clone();

        let status = if !all_fresh || metric_values.is_empty() {
            StreamHealthStatus::Unknown
        } else if active_alerts
            .iter()
            .any(|a| a.severity == HealthAlertSeverity::Critical)
        {
            StreamHealthStatus::Critical
        } else if active_alerts
            .iter()
            .any(|a| a.severity == HealthAlertSeverity::Warning)
        {
            StreamHealthStatus::Degraded
        } else {
            StreamHealthStatus::Healthy
        };

        HealthSnapshot {
            status,
            metrics: metric_values,
            active_alerts,
            snapshot_time: SystemTime::now(),
        }
    }

    /// Returns current metric value for a named metric, if present.
    pub fn current_metric(&self, name: &str) -> Option<f64> {
        self.metrics.read().get(name).map(|(v, _)| *v)
    }

    /// Returns the total number of alerts ever raised.
    pub fn total_alerts_raised(&self) -> u64 {
        *self.total_alerts_raised.read()
    }

    fn evaluate_thresholds(&self, metric_name: &str, value: f64) -> Vec<HealthAlert> {
        let mut new_alerts = Vec::new();

        for threshold in &self.config.thresholds {
            if threshold.metric_name != metric_name {
                continue;
            }
            let severity = if value >= threshold.critical_threshold {
                Some(HealthAlertSeverity::Critical)
            } else if value >= threshold.warn_threshold {
                Some(HealthAlertSeverity::Warning)
            } else {
                // Recovered: clear any active alert for this metric and emit a
                // `Recovered` alert so callers can observe the transition.
                let mut active = self.active_alerts.write();
                let had_alert = active.iter().any(|a| a.metric_name == metric_name);
                active.retain(|a| a.metric_name != metric_name);
                drop(active);
                if had_alert {
                    new_alerts.push(HealthAlert {
                        metric_name: metric_name.to_string(),
                        current_value: value,
                        threshold: threshold.warn_threshold,
                        severity: HealthAlertSeverity::Recovered,
                        raised_at: SystemTime::now(),
                    });
                }
                None
            };

            if let Some(sev) = severity {
                let threshold_val = if sev == HealthAlertSeverity::Critical {
                    threshold.critical_threshold
                } else {
                    threshold.warn_threshold
                };
                let alert = HealthAlert {
                    metric_name: metric_name.to_string(),
                    current_value: value,
                    threshold: threshold_val,
                    severity: sev,
                    raised_at: SystemTime::now(),
                };
                // Upsert the active alert for this metric
                let mut active = self.active_alerts.write();
                active.retain(|a| a.metric_name != metric_name);
                active.push(alert.clone());
                drop(active);

                // Append to history, capped at the most recent 1000 alerts
                let mut history = self.alert_history.write();
                if history.len() >= 1000 {
                    history.remove(0);
                }
                history.push(alert.clone());
                drop(history);

                *self.total_alerts_raised.write() += 1;
                new_alerts.push(alert);
                debug!("Health alert raised for metric {}: {}", metric_name, value);
            }
        }
        new_alerts
    }
}

// ─── Bulkhead Isolator ────────────────────────────────────────────────────────

/// Statistics for a single bulkhead compartment
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompartmentStats {
    /// Compartment identifier
    pub compartment_id: String,
    /// Maximum concurrent operations
    pub capacity: usize,
    /// Currently active operations
    pub active: usize,
    /// Total operations rejected due to full compartment
    pub rejected: u64,
    /// Total operations accepted
    pub accepted: u64,
}

/// Configuration for the bulkhead isolator
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BulkheadConfig {
    /// Capacity per named compartment
    pub compartment_capacities: HashMap<String, usize>,
    /// Default capacity for compartments not explicitly listed
    pub default_capacity: usize,
}

impl Default for BulkheadConfig {
    fn default() -> Self {
        let mut compartments = HashMap::new();
        compartments.insert("critical".to_string(), 100);
        compartments.insert("standard".to_string(), 50);
        compartments.insert("background".to_string(), 20);
        Self {
            compartment_capacities: compartments,
            default_capacity: 30,
        }
    }
}

/// A permit acquired from the bulkhead; releases on drop.
pub struct BulkheadPermit {
    compartment_id: String,
    active_counter: Arc<RwLock<usize>>,
}

impl Drop for BulkheadPermit {
    fn drop(&mut self) {
        let mut active = self.active_counter.write();
        if *active > 0 {
            *active -= 1;
        }
        debug!(
            "Bulkhead permit released for compartment {}",
            self.compartment_id
        );
    }
}

/// Internal state for a single compartment
struct Compartment {
    capacity: usize,
    active: Arc<RwLock<usize>>,
    rejected: Arc<RwLock<u64>>,
    accepted: Arc<RwLock<u64>>,
}

/// Isolates stream failures using the bulkhead pattern.
///
/// Divides the system into isolated compartments with independent concurrency
/// limits; a failure or overload in one compartment does not affect others.
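///
/// # Example
///
/// A minimal sketch (crate path assumed as above):
///
/// ```no_run
/// use oxirs_stream::fault_tolerance::{BulkheadConfig, BulkheadIsolator};
///
/// let isolator = BulkheadIsolator::new(BulkheadConfig::default());
/// // Take one of the default "critical" compartment's 100 slots.
/// let permit = isolator.acquire("critical").expect("compartment has capacity");
/// // ... perform the guarded work ...
/// drop(permit); // slot is released when the permit is dropped
/// ```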
pub struct BulkheadIsolator {
    compartments: Arc<RwLock<HashMap<String, Compartment>>>,
    default_capacity: usize,
}

impl BulkheadIsolator {
    /// Creates a new bulkhead isolator with the given configuration.
    pub fn new(config: BulkheadConfig) -> Self {
        let mut compartments = HashMap::new();
        for (id, capacity) in &config.compartment_capacities {
            compartments.insert(
                id.clone(),
                Compartment {
                    capacity: *capacity,
                    active: Arc::new(RwLock::new(0)),
                    rejected: Arc::new(RwLock::new(0)),
                    accepted: Arc::new(RwLock::new(0)),
                },
            );
        }
        Self {
            compartments: Arc::new(RwLock::new(compartments)),
            default_capacity: config.default_capacity,
        }
    }

    /// Attempts to acquire a permit for the named compartment.
    ///
    /// Returns `Ok(BulkheadPermit)` if capacity is available; `Err` if full.
    pub fn acquire(&self, compartment_id: &str) -> FaultResult<BulkheadPermit> {
        let mut compartments = self.compartments.write();
        // Auto-create compartment with default capacity if unknown
        let compartment = compartments
            .entry(compartment_id.to_string())
            .or_insert_with(|| Compartment {
                capacity: self.default_capacity,
                active: Arc::new(RwLock::new(0)),
                rejected: Arc::new(RwLock::new(0)),
                accepted: Arc::new(RwLock::new(0)),
            });

        let current = *compartment.active.read();
        if current >= compartment.capacity {
            *compartment.rejected.write() += 1;
            return Err(FaultToleranceError::BulkheadFull {
                compartment: compartment_id.to_string(),
                capacity: compartment.capacity,
            });
        }
        *compartment.active.write() += 1;
        *compartment.accepted.write() += 1;
        debug!(
            "Bulkhead permit acquired for compartment {} ({}/{})",
            compartment_id,
            current + 1,
            compartment.capacity
        );

        Ok(BulkheadPermit {
            compartment_id: compartment_id.to_string(),
            active_counter: Arc::clone(&compartment.active),
        })
    }

    /// Returns statistics for all compartments.
    pub fn stats(&self) -> Vec<CompartmentStats> {
        self.compartments
            .read()
            .iter()
            .map(|(id, c)| CompartmentStats {
                compartment_id: id.clone(),
                capacity: c.capacity,
                active: *c.active.read(),
                rejected: *c.rejected.read(),
                accepted: *c.accepted.read(),
            })
            .collect()
    }

    /// Returns statistics for a specific compartment.
    pub fn compartment_stats(&self, compartment_id: &str) -> Option<CompartmentStats> {
        self.compartments
            .read()
            .get(compartment_id)
            .map(|c| CompartmentStats {
                compartment_id: compartment_id.to_string(),
                capacity: c.capacity,
                active: *c.active.read(),
                rejected: *c.rejected.read(),
                accepted: *c.accepted.read(),
            })
    }
}

// ─── Stream Retry Policy ──────────────────────────────────────────────────────

/// Configures when and how many times an operation should be retried
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamRetryPolicy {
    /// Maximum number of retry attempts (0 = no retries)
    pub max_attempts: u32,
    /// Initial delay before first retry
    pub initial_delay: Duration,
    /// Multiplier applied to delay after each retry (exponential backoff)
    pub backoff_multiplier: f64,
    /// Maximum delay cap (prevents unbounded backoff)
    pub max_delay: Duration,
    /// Whether to add random jitter (fraction of delay) to prevent thundering herd
    pub jitter: bool,
}

impl Default for StreamRetryPolicy {
    fn default() -> Self {
        Self {
            max_attempts: 3,
            initial_delay: Duration::from_millis(100),
            backoff_multiplier: 2.0,
            max_delay: Duration::from_secs(30),
            jitter: true,
        }
    }
}

impl StreamRetryPolicy {
    /// Returns the delay before the nth retry attempt (0-indexed).
    ///
    /// Incorporates exponential backoff and an optional pseudo-random jitter
    /// derived deterministically from the attempt number (no rand crate needed).
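    ///
    /// With jitter disabled, the schedule is purely exponential. For the
    /// default policy (100ms initial delay, 2.0 multiplier) the first three
    /// delays are 100ms, 200ms, and 400ms (crate path assumed as above):
    ///
    /// ```no_run
    /// # use std::time::Duration;
    /// # use oxirs_stream::fault_tolerance::StreamRetryPolicy;
    /// let policy = StreamRetryPolicy { jitter: false, ..Default::default() };
    /// assert_eq!(policy.delay_for_attempt(0), Duration::from_millis(100));
    /// assert_eq!(policy.delay_for_attempt(1), Duration::from_millis(200));
    /// assert_eq!(policy.delay_for_attempt(2), Duration::from_millis(400));
    /// ```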
    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
        let factor = self.backoff_multiplier.powi(attempt as i32);
        let base_ms = self.initial_delay.as_millis() as f64 * factor;
        let capped_ms = base_ms.min(self.max_delay.as_millis() as f64);

        let jitter_ms = if self.jitter {
            // Deterministic jitter: 0..25% of capped_ms using linear congruential
            let pseudo = ((attempt as u64)
                .wrapping_mul(6364136223846793005)
                .wrapping_add(1))
                % 1000;
            let ratio = pseudo as f64 / 4000.0; // 0..0.25
            capped_ms * ratio
        } else {
            0.0
        };

        Duration::from_millis((capped_ms + jitter_ms) as u64)
    }

    /// Executes a fallible synchronous closure with retries.
    ///
    /// Uses `std::thread::sleep` for backoff delays. For async contexts see
    /// [`StreamRetryPolicy::retry_async`].
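    ///
    /// A minimal sketch (crate path assumed as above):
    ///
    /// ```no_run
    /// # use oxirs_stream::fault_tolerance::StreamRetryPolicy;
    /// let policy = StreamRetryPolicy::default();
    /// let mut calls = 0;
    /// let result = policy.retry("flaky-op", || {
    ///     calls += 1;
    ///     if calls < 2 { Err("transient") } else { Ok(calls) }
    /// });
    /// assert_eq!(result.unwrap(), 2);
    /// ```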
    pub fn retry<F, T, E>(&self, operation_name: &str, mut f: F) -> FaultResult<T>
    where
        F: FnMut() -> Result<T, E>,
        E: std::fmt::Debug,
    {
        for attempt in 0..=self.max_attempts {
            match f() {
                Ok(result) => {
                    if attempt > 0 {
                        info!(
                            "Operation {} succeeded after {} retries",
                            operation_name, attempt
                        );
                    }
                    return Ok(result);
                }
                Err(err) => {
                    if attempt >= self.max_attempts {
                        warn!(
                            "Operation {} failed after {} attempts: {:?}",
                            operation_name,
                            attempt + 1,
                            err
                        );
                        return Err(FaultToleranceError::MaxRetriesExceeded {
                            attempts: attempt + 1,
                            operation: operation_name.to_string(),
                        });
                    }
                    let delay = self.delay_for_attempt(attempt);
                    debug!(
                        "Operation {} attempt {} failed, retrying in {:?}",
                        operation_name,
                        attempt + 1,
                        delay
                    );
                    std::thread::sleep(delay);
                }
            }
        }
        // Unreachable but satisfies the type checker
        Err(FaultToleranceError::MaxRetriesExceeded {
            attempts: self.max_attempts + 1,
            operation: operation_name.to_string(),
        })
    }

    /// Executes a fallible async closure with retries, using
    /// `tokio::time::sleep` for backoff delays.
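    ///
    /// A minimal sketch (crate path assumed as above; requires a tokio runtime):
    ///
    /// ```no_run
    /// # use oxirs_stream::fault_tolerance::StreamRetryPolicy;
    /// # async fn demo() {
    /// let policy = StreamRetryPolicy::default();
    /// let result = policy
    ///     .retry_async("fetch", || async { Ok::<_, std::io::Error>("payload") })
    ///     .await;
    /// assert!(result.is_ok());
    /// # }
    /// ```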
    pub async fn retry_async<F, Fut, T, E>(&self, operation_name: &str, mut f: F) -> FaultResult<T>
    where
        F: FnMut() -> Fut,
        Fut: std::future::Future<Output = Result<T, E>>,
        E: std::fmt::Debug,
    {
        for attempt in 0..=self.max_attempts {
            match f().await {
                Ok(result) => {
                    if attempt > 0 {
                        info!(
                            "Async operation {} succeeded after {} retries",
                            operation_name, attempt
                        );
                    }
                    return Ok(result);
                }
                Err(err) => {
                    if attempt >= self.max_attempts {
                        warn!(
                            "Async operation {} failed after {} attempts: {:?}",
                            operation_name,
                            attempt + 1,
                            err
                        );
                        return Err(FaultToleranceError::MaxRetriesExceeded {
                            attempts: attempt + 1,
                            operation: operation_name.to_string(),
                        });
                    }
                    let delay = self.delay_for_attempt(attempt);
                    debug!(
                        "Async operation {} attempt {} failed, retrying in {:?}",
                        operation_name,
                        attempt + 1,
                        delay
                    );
                    tokio::time::sleep(delay).await;
                }
            }
        }
        Err(FaultToleranceError::MaxRetriesExceeded {
            attempts: self.max_attempts + 1,
            operation: operation_name.to_string(),
        })
    }
}

// ─── Stream Supervisor ────────────────────────────────────────────────────────

/// The status of a supervised worker
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum WorkerStatus {
    /// Worker is running normally
    Running,
    /// Worker has failed and is waiting for restart
    Failed,
    /// Worker is being restarted
    Restarting,
    /// Worker has been stopped intentionally
    Stopped,
    /// Worker has exceeded max restart attempts and is permanently failed
    Exhausted,
}

/// A record of a worker restart
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RestartRecord {
    /// Worker ID
    pub worker_id: String,
    /// Restart attempt number
    pub attempt: u32,
    /// Reason for restart
    pub reason: String,
    /// When the restart was attempted
    pub restarted_at: SystemTime,
    /// Whether the restart succeeded
    pub success: bool,
}

/// Internal worker state tracked by the supervisor
#[derive(Debug, Clone)]
struct WorkerState {
    worker_id: String,
    status: WorkerStatus,
    restart_count: u32,
    max_restarts: u32,
    last_failure: Option<SystemTime>,
    last_restart: Option<SystemTime>,
}

/// Configuration for the stream supervisor
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SupervisorConfig {
    /// Maximum number of restart attempts per worker before giving up
    pub max_restarts: u32,
    /// Restart backoff policy
    pub restart_policy: StreamRetryPolicy,
    /// Whether to propagate failure to sibling workers (one-for-all strategy)
    pub one_for_all: bool,
}

impl Default for SupervisorConfig {
    fn default() -> Self {
        Self {
            max_restarts: 5,
            restart_policy: StreamRetryPolicy {
                max_attempts: 5,
                initial_delay: Duration::from_millis(500),
                backoff_multiplier: 2.0,
                max_delay: Duration::from_secs(60),
                jitter: true,
            },
            one_for_all: false,
        }
    }
}

/// Statistics for the stream supervisor
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SupervisorStats {
    /// Total workers registered
    pub total_workers: usize,
    /// Workers currently running
    pub running_workers: usize,
    /// Workers permanently failed
    pub exhausted_workers: usize,
    /// Total restart events
    pub total_restarts: u64,
    /// Total restart records
    pub restart_history_len: usize,
}

/// Supervises stream workers and restarts them on failure.
///
/// Tracks worker health, enforces restart limits, and optionally propagates
/// failures to sibling workers (one-for-all supervision strategy).
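///
/// # Example
///
/// A minimal sketch (crate path assumed as above):
///
/// ```no_run
/// use oxirs_stream::fault_tolerance::{StreamSupervisor, SupervisorConfig, WorkerStatus};
///
/// let supervisor = StreamSupervisor::new(SupervisorConfig::default());
/// supervisor.register_worker("worker-1");
/// let status = supervisor.report_failure("worker-1", "heartbeat lost").unwrap();
/// assert_eq!(status, WorkerStatus::Restarting);
/// supervisor.acknowledge_restart("worker-1").unwrap();
/// ```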
pub struct StreamSupervisor {
    config: SupervisorConfig,
    workers: Arc<RwLock<HashMap<String, WorkerState>>>,
    restart_history: Arc<RwLock<Vec<RestartRecord>>>,
    total_restarts: Arc<RwLock<u64>>,
}

impl StreamSupervisor {
    /// Creates a new supervisor with the given configuration.
    pub fn new(config: SupervisorConfig) -> Self {
        Self {
            config,
            workers: Arc::new(RwLock::new(HashMap::new())),
            restart_history: Arc::new(RwLock::new(Vec::new())),
            total_restarts: Arc::new(RwLock::new(0)),
        }
    }

    /// Registers a worker with the supervisor.
    pub fn register_worker(&self, worker_id: impl Into<String>) {
        let id = worker_id.into();
        self.workers.write().insert(
            id.clone(),
            WorkerState {
                worker_id: id,
                status: WorkerStatus::Running,
                restart_count: 0,
                max_restarts: self.config.max_restarts,
                last_failure: None,
                last_restart: None,
            },
        );
    }

    /// Notifies the supervisor that a worker has failed.
    ///
    /// The supervisor will attempt to restart the worker unless the restart
    /// limit has been reached.
    ///
    /// Returns the new worker status.
    pub fn report_failure(&self, worker_id: &str, reason: &str) -> FaultResult<WorkerStatus> {
        let new_status = {
            let mut workers = self.workers.write();
            let worker = workers.get_mut(worker_id).ok_or_else(|| {
                FaultToleranceError::SupervisorRestartFailed {
                    worker_id: worker_id.to_string(),
                    attempts: 0,
                }
            })?;

            worker.last_failure = Some(SystemTime::now());

            if worker.restart_count >= worker.max_restarts {
                worker.status = WorkerStatus::Exhausted;
                warn!(
                    "Worker {} permanently failed after {} restarts",
                    worker_id, worker.restart_count
                );
                WorkerStatus::Exhausted
            } else {
                worker.status = WorkerStatus::Restarting;
                worker.restart_count += 1;
                worker.last_restart = Some(SystemTime::now());
                WorkerStatus::Restarting
            }
        };

        // Record restart attempt
        let attempt = self
            .workers
            .read()
            .get(worker_id)
            .map(|w| w.restart_count)
            .unwrap_or(0);
        let record = RestartRecord {
            worker_id: worker_id.to_string(),
            attempt,
            reason: reason.to_string(),
            restarted_at: SystemTime::now(),
            success: new_status == WorkerStatus::Restarting,
        };
        let mut history = self.restart_history.write();
        if history.len() >= 10_000 {
            history.remove(0);
        }
        history.push(record);
        drop(history);

        if new_status == WorkerStatus::Restarting {
            *self.total_restarts.write() += 1;
            info!("Restarting worker {} (attempt {})", worker_id, attempt);

            // Under the one-for-all strategy, mark running siblings for
            // restart as well. A single write lock covers the whole sweep.
            if self.config.one_for_all {
                let mut workers = self.workers.write();
                let sibling_ids: Vec<String> = workers
                    .keys()
                    .filter(|k| k.as_str() != worker_id)
                    .cloned()
                    .collect();
                for sibling_id in sibling_ids {
                    if let Some(sibling) = workers.get_mut(&sibling_id) {
                        if sibling.status == WorkerStatus::Running {
                            sibling.status = WorkerStatus::Restarting;
                            sibling.restart_count += 1;
                            sibling.last_restart = Some(SystemTime::now());
                        }
                    }
                }
            }
        }

        Ok(new_status)
    }

    /// Acknowledges that a worker has successfully restarted.
    pub fn acknowledge_restart(&self, worker_id: &str) -> FaultResult<()> {
        let mut workers = self.workers.write();
        let worker = workers.get_mut(worker_id).ok_or_else(|| {
            FaultToleranceError::SupervisorRestartFailed {
                worker_id: worker_id.to_string(),
                attempts: 0,
            }
        })?;
        worker.status = WorkerStatus::Running;
        info!("Worker {} successfully restarted", worker_id);
        Ok(())
    }

    /// Stops a worker intentionally.
    pub fn stop_worker(&self, worker_id: &str) -> FaultResult<()> {
        let mut workers = self.workers.write();
        let worker = workers.get_mut(worker_id).ok_or_else(|| {
            FaultToleranceError::SupervisorRestartFailed {
                worker_id: worker_id.to_string(),
                attempts: 0,
            }
        })?;
        worker.status = WorkerStatus::Stopped;
        info!("Worker {} stopped", worker_id);
        Ok(())
    }

    /// Returns the current status of a worker.
    pub fn worker_status(&self, worker_id: &str) -> Option<WorkerStatus> {
        self.workers.read().get(worker_id).map(|w| w.status.clone())
    }

    /// Returns all workers whose status matches the given status.
    pub fn workers_with_status(&self, status: &WorkerStatus) -> Vec<String> {
        self.workers
            .read()
            .values()
            .filter(|w| &w.status == status)
            .map(|w| w.worker_id.clone())
            .collect()
    }

    /// Returns supervisor statistics.
    pub fn stats(&self) -> SupervisorStats {
        let workers = self.workers.read();
        let running_workers = workers
            .values()
            .filter(|w| w.status == WorkerStatus::Running)
            .count();
        let exhausted_workers = workers
            .values()
            .filter(|w| w.status == WorkerStatus::Exhausted)
            .count();
        SupervisorStats {
            total_workers: workers.len(),
            running_workers,
            exhausted_workers,
            total_restarts: *self.total_restarts.read(),
            restart_history_len: self.restart_history.read().len(),
        }
    }

    /// Returns the full restart history.
    pub fn restart_history(&self) -> Vec<RestartRecord> {
        self.restart_history.read().clone()
    }
}

// ─── Tests ───────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;

    // ── StreamHealthMonitor tests ────────────────────────────────────────────

    #[test]
    fn test_health_monitor_healthy_state() {
        let config = HealthMonitorConfig::default();
        let monitor = StreamHealthMonitor::new(config);
        monitor.record_metric("error_rate", 0.001);
        monitor.record_metric("latency_p99_ms", 50.0);
        monitor.record_metric("backpressure_ratio", 0.1);

        let snap = monitor.snapshot();
        assert_eq!(snap.status, StreamHealthStatus::Healthy);
        assert!(snap.active_alerts.is_empty());
    }

    #[test]
    fn test_health_monitor_warning_alert() {
        let config = HealthMonitorConfig::default();
        let monitor = StreamHealthMonitor::new(config);

        let alerts = monitor.record_metric("error_rate", 0.02);
        assert_eq!(alerts.len(), 1);
        assert_eq!(alerts[0].severity, HealthAlertSeverity::Warning);

        let snap = monitor.snapshot();
        assert_eq!(snap.status, StreamHealthStatus::Degraded);
    }

    #[test]
    fn test_health_monitor_critical_alert() {
        let config = HealthMonitorConfig::default();
        let monitor = StreamHealthMonitor::new(config);

        let alerts = monitor.record_metric("error_rate", 0.10);
        assert_eq!(alerts.len(), 1);
        assert_eq!(alerts[0].severity, HealthAlertSeverity::Critical);

        let snap = monitor.snapshot();
        assert_eq!(snap.status, StreamHealthStatus::Critical);
    }

    #[test]
    fn test_health_monitor_recovery() {
        let config = HealthMonitorConfig::default();
        let monitor = StreamHealthMonitor::new(config);

        monitor.record_metric("error_rate", 0.10); // critical
        let snap = monitor.snapshot();
        assert_eq!(snap.status, StreamHealthStatus::Critical);

        monitor.record_metric("error_rate", 0.001); // recovered
        let snap = monitor.snapshot();
        assert!(snap.active_alerts.is_empty());
    }

    #[test]
    fn test_health_monitor_total_alerts_count() {
        let config = HealthMonitorConfig::default();
        let monitor = StreamHealthMonitor::new(config);
        monitor.record_metric("error_rate", 0.02);
        monitor.record_metric("latency_p99_ms", 200.0);
        assert_eq!(monitor.total_alerts_raised(), 2);
    }

    // ── BulkheadIsolator tests ───────────────────────────────────────────────

    #[test]
    fn test_bulkhead_acquire_and_release() {
        let mut config = BulkheadConfig::default();
        config.compartment_capacities.insert("test".to_string(), 2);
        let isolator = BulkheadIsolator::new(config);

        let p1 = isolator
            .acquire("test")
            .expect("first permit should succeed");
        let p2 = isolator
            .acquire("test")
            .expect("second permit should succeed");

        let result = isolator.acquire("test");
        assert!(
            matches!(result, Err(FaultToleranceError::BulkheadFull { .. })),
            "third permit should be rejected"
        );

        let stats = isolator
            .compartment_stats("test")
            .expect("stats should exist");
        assert_eq!(stats.active, 2);
        assert_eq!(stats.rejected, 1);

        drop(p1);
        drop(p2);

        let stats = isolator
            .compartment_stats("test")
            .expect("stats should exist");
        assert_eq!(stats.active, 0);
    }

    #[test]
    fn test_bulkhead_auto_creates_compartment() {
        let config = BulkheadConfig {
            compartment_capacities: HashMap::new(),
            default_capacity: 5,
        };
        let isolator = BulkheadIsolator::new(config);
        let permit = isolator
            .acquire("new-compartment")
            .expect("should succeed with default capacity");
        drop(permit);
    }

    #[test]
    fn test_bulkhead_different_compartments_isolated() {
        let mut config = BulkheadConfig::default();
        config.compartment_capacities.insert("a".to_string(), 1);
        config.compartment_capacities.insert("b".to_string(), 1);
        let isolator = BulkheadIsolator::new(config);

        let _pa = isolator.acquire("a").expect("a should succeed");
        // a is full
        let result_a = isolator.acquire("a");
        assert!(matches!(
            result_a,
            Err(FaultToleranceError::BulkheadFull { .. })
        ));

        // b should still be available
        let _pb = isolator.acquire("b").expect("b should be independent");
    }

    // ── StreamRetryPolicy tests ──────────────────────────────────────────────

    #[test]
    fn test_retry_policy_delay_increases() {
        let policy = StreamRetryPolicy {
            max_attempts: 5,
            initial_delay: Duration::from_millis(100),
            backoff_multiplier: 2.0,
            max_delay: Duration::from_secs(60),
            jitter: false,
        };
        let d0 = policy.delay_for_attempt(0);
        let d1 = policy.delay_for_attempt(1);
        let d2 = policy.delay_for_attempt(2);
        assert!(d0 < d1, "delay should increase");
        assert!(d1 < d2, "delay should increase");
    }

    #[test]
    fn test_retry_policy_max_delay_cap() {
        let policy = StreamRetryPolicy {
            max_attempts: 10,
            initial_delay: Duration::from_millis(100),
            backoff_multiplier: 10.0,
            max_delay: Duration::from_millis(500),
            jitter: false,
        };
        let d = policy.delay_for_attempt(5);
        assert!(
            d <= Duration::from_millis(500) + Duration::from_millis(10),
            "delay should not exceed max"
        );
    }

    #[test]
    fn test_retry_succeeds_on_first_attempt() {
        let policy = StreamRetryPolicy {
            max_attempts: 3,
            initial_delay: Duration::from_millis(1),
            backoff_multiplier: 2.0,
            max_delay: Duration::from_secs(1),
            jitter: false,
        };
        let result: FaultResult<i32> = policy.retry("test-op", || Ok::<i32, &str>(42));
        assert!(matches!(result, Ok(42)));
    }

    #[test]
    fn test_retry_exhausts_attempts() {
        let policy = StreamRetryPolicy {
            max_attempts: 2,
            initial_delay: Duration::from_millis(1),
            backoff_multiplier: 1.0,
            max_delay: Duration::from_millis(5),
            jitter: false,
        };
        let mut calls = 0u32;
        let result: FaultResult<i32> = policy.retry("always-fail", || {
            calls += 1;
            Err::<i32, &str>("always fails")
        });
        assert!(matches!(
            result,
            Err(FaultToleranceError::MaxRetriesExceeded { .. })
        ));
        // Called max_attempts + 1 = 3 times
        assert_eq!(calls, 3);
    }

    #[test]
    fn test_retry_succeeds_after_failures() {
        let policy = StreamRetryPolicy {
            max_attempts: 5,
            initial_delay: Duration::from_millis(1),
            backoff_multiplier: 1.0,
            max_delay: Duration::from_millis(10),
            jitter: false,
        };
        let mut calls = 0u32;
        let result: FaultResult<i32> = policy.retry("eventually-succeeds", || {
            calls += 1;
            if calls < 3 {
                Err::<i32, &str>("not yet")
            } else {
                Ok(99)
            }
        });
        assert!(matches!(result, Ok(99)));
        assert_eq!(calls, 3);
    }

    #[tokio::test]
    async fn test_retry_async_succeeds() {
        let policy = StreamRetryPolicy {
            max_attempts: 3,
            initial_delay: Duration::from_millis(1),
            backoff_multiplier: 1.0,
            max_delay: Duration::from_millis(5),
            jitter: false,
        };
        let calls = Arc::new(RwLock::new(0u32));
        let calls_clone = Arc::clone(&calls);
        let result: FaultResult<i32> = policy
            .retry_async("async-op", move || {
                let c = Arc::clone(&calls_clone);
                async move {
                    let mut lock = c.write();
                    *lock += 1;
                    let v = *lock;
                    drop(lock);
                    if v < 2 {
                        Err::<i32, &str>("not ready")
                    } else {
                        Ok(7)
                    }
                }
            })
            .await;
        assert!(matches!(result, Ok(7)));
    }

    // ── StreamSupervisor tests ───────────────────────────────────────────────

    #[test]
    fn test_supervisor_register_and_failure_restart() {
        let config = SupervisorConfig::default();
        let supervisor = StreamSupervisor::new(config);
        supervisor.register_worker("worker-1");

        let status = supervisor
            .report_failure("worker-1", "connection lost")
            .expect("should handle failure");
        assert_eq!(status, WorkerStatus::Restarting);

        supervisor
            .acknowledge_restart("worker-1")
            .expect("ack should succeed");
        assert_eq!(
            supervisor.worker_status("worker-1"),
            Some(WorkerStatus::Running)
        );
    }

    #[test]
    fn test_supervisor_exhausted_after_max_restarts() {
        let config = SupervisorConfig {
            max_restarts: 2,
            ..Default::default()
        };
        let supervisor = StreamSupervisor::new(config);
        supervisor.register_worker("worker-x");

        for _ in 0..2 {
            let status = supervisor
                .report_failure("worker-x", "crash")
                .expect("failure should be handled");
            if status == WorkerStatus::Restarting {
                supervisor.acknowledge_restart("worker-x").ok();
            }
        }

        let final_status = supervisor
            .report_failure("worker-x", "final crash")
            .expect("final failure should be handled");
        assert_eq!(final_status, WorkerStatus::Exhausted);

        let stats = supervisor.stats();
        assert_eq!(stats.exhausted_workers, 1);
    }

    #[test]
    fn test_supervisor_stop_worker() {
        let config = SupervisorConfig::default();
        let supervisor = StreamSupervisor::new(config);
        supervisor.register_worker("w1");
        supervisor.stop_worker("w1").expect("stop should succeed");
        assert_eq!(supervisor.worker_status("w1"), Some(WorkerStatus::Stopped));
    }

    #[test]
    fn test_supervisor_one_for_all() {
        let config = SupervisorConfig {
            max_restarts: 5,
            one_for_all: true,
            ..Default::default()
        };
        let supervisor = StreamSupervisor::new(config);
        supervisor.register_worker("w1");
        supervisor.register_worker("w2");
        supervisor.register_worker("w3");

        supervisor
            .report_failure("w1", "cascade test")
            .expect("failure should be handled");

        // w2 and w3 should also be restarting due to one-for-all
        let restarting = supervisor.workers_with_status(&WorkerStatus::Restarting);
        // w1 plus both siblings should be restarting
        assert_eq!(
            restarting.len(),
            3,
            "siblings should also restart: {:?}",
            restarting
        );
    }

    #[test]
    fn test_supervisor_restart_history() {
        let config = SupervisorConfig::default();
        let supervisor = StreamSupervisor::new(config);
        supervisor.register_worker("wh");

        supervisor.report_failure("wh", "reason-1").ok();
        supervisor.acknowledge_restart("wh").ok();
        supervisor.report_failure("wh", "reason-2").ok();

        let history = supervisor.restart_history();
        assert!(history.len() >= 2);
        assert_eq!(history[0].reason, "reason-1");
    }

    #[test]
    fn test_supervisor_stats() {
        let config = SupervisorConfig::default();
        let supervisor = StreamSupervisor::new(config);
        supervisor.register_worker("s1");
        supervisor.register_worker("s2");

        supervisor.report_failure("s1", "err").ok();
        supervisor.acknowledge_restart("s1").ok();

        let stats = supervisor.stats();
        assert_eq!(stats.total_workers, 2);
        assert_eq!(stats.running_workers, 2); // s1 restarted, s2 never failed
        assert_eq!(stats.total_restarts, 1);
    }
}