//! Fault-tolerance building blocks for stream processing: health monitoring
//! with threshold alerts, bulkhead isolation, retry policies with exponential
//! backoff, and worker supervision with restart budgets.

pub mod checkpoint_recovery;

pub use checkpoint_recovery::*;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing::{debug, info, warn};
#[derive(Error, Debug, Clone)]
pub enum FaultToleranceError {
    #[error("Bulkhead full: compartment {compartment} has reached capacity {capacity}")]
    BulkheadFull {
        compartment: String,
        capacity: usize,
    },

    #[error("Max retries exceeded: {attempts} attempts for operation {operation}")]
    MaxRetriesExceeded { attempts: u32, operation: String },

    #[error("Worker {worker_id} failed to restart after {attempts} attempts")]
    SupervisorRestartFailed { worker_id: String, attempts: u32 },

    #[error("Health check failed: metric {metric} value {value} exceeds threshold {threshold}")]
    HealthCheckFailed {
        metric: String,
        value: f64,
        threshold: f64,
    },

    #[error("Operation timeout after {elapsed_ms}ms (limit {timeout_ms}ms)")]
    OperationTimeout { elapsed_ms: u64, timeout_ms: u64 },
}

pub type FaultResult<T> = Result<T, FaultToleranceError>;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthThreshold {
    pub metric_name: String,
    pub warn_threshold: f64,
    pub critical_threshold: f64,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum HealthAlertSeverity {
    Warning,
    Critical,
    Recovered,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthAlert {
    pub metric_name: String,
    pub current_value: f64,
    pub threshold: f64,
    pub severity: HealthAlertSeverity,
    pub raised_at: SystemTime,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum StreamHealthStatus {
    Healthy,
    Degraded,
    Critical,
    Unknown,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthSnapshot {
    pub status: StreamHealthStatus,
    pub metrics: HashMap<String, f64>,
    pub active_alerts: Vec<HealthAlert>,
    pub snapshot_time: SystemTime,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthMonitorConfig {
    pub thresholds: Vec<HealthThreshold>,
    pub metric_staleness: Duration,
    pub check_interval: Duration,
}

impl Default for HealthMonitorConfig {
    fn default() -> Self {
        Self {
            thresholds: vec![
                HealthThreshold {
                    metric_name: "error_rate".to_string(),
                    warn_threshold: 0.01,
                    critical_threshold: 0.05,
                },
                HealthThreshold {
                    metric_name: "latency_p99_ms".to_string(),
                    warn_threshold: 100.0,
                    critical_threshold: 500.0,
                },
                HealthThreshold {
                    metric_name: "backpressure_ratio".to_string(),
                    warn_threshold: 0.5,
                    critical_threshold: 0.9,
                },
            ],
            metric_staleness: Duration::from_secs(60),
            check_interval: Duration::from_secs(5),
        }
    }
}

pub struct StreamHealthMonitor {
    config: HealthMonitorConfig,
    metrics: Arc<RwLock<HashMap<String, (f64, Instant)>>>,
    active_alerts: Arc<RwLock<Vec<HealthAlert>>>,
    alert_history: Arc<RwLock<Vec<HealthAlert>>>,
    total_alerts_raised: Arc<RwLock<u64>>,
}

impl StreamHealthMonitor {
    pub fn new(config: HealthMonitorConfig) -> Self {
        Self {
            config,
            metrics: Arc::new(RwLock::new(HashMap::new())),
            active_alerts: Arc::new(RwLock::new(Vec::new())),
            alert_history: Arc::new(RwLock::new(Vec::new())),
            total_alerts_raised: Arc::new(RwLock::new(0)),
        }
    }

    pub fn record_metric(&self, metric_name: &str, value: f64) -> Vec<HealthAlert> {
        self.metrics
            .write()
            .insert(metric_name.to_string(), (value, Instant::now()));
        self.evaluate_thresholds(metric_name, value)
    }

    pub fn snapshot(&self) -> HealthSnapshot {
        let metrics = self.metrics.read();
        let now = Instant::now();
        let stale_limit = self.config.metric_staleness;

        let all_fresh = metrics
            .values()
            .all(|(_, ts)| now.duration_since(*ts) < stale_limit);

        let metric_values: HashMap<String, f64> =
            metrics.iter().map(|(k, (v, _))| (k.clone(), *v)).collect();

        let active_alerts = self.active_alerts.read().clone();

        let status = if !all_fresh || metric_values.is_empty() {
            StreamHealthStatus::Unknown
        } else if active_alerts
            .iter()
            .any(|a| a.severity == HealthAlertSeverity::Critical)
        {
            StreamHealthStatus::Critical
        } else if active_alerts
            .iter()
            .any(|a| a.severity == HealthAlertSeverity::Warning)
        {
            StreamHealthStatus::Degraded
        } else {
            StreamHealthStatus::Healthy
        };

        HealthSnapshot {
            status,
            metrics: metric_values,
            active_alerts,
            snapshot_time: SystemTime::now(),
        }
    }

    pub fn current_metric(&self, name: &str) -> Option<f64> {
        self.metrics.read().get(name).map(|(v, _)| *v)
    }

    pub fn total_alerts_raised(&self) -> u64 {
        *self.total_alerts_raised.read()
    }

    fn evaluate_thresholds(&self, metric_name: &str, value: f64) -> Vec<HealthAlert> {
        let mut new_alerts = Vec::new();

        for threshold in &self.config.thresholds {
            if threshold.metric_name != metric_name {
                continue;
            }
            let severity = if value >= threshold.critical_threshold {
                Some(HealthAlertSeverity::Critical)
            } else if value >= threshold.warn_threshold {
                Some(HealthAlertSeverity::Warning)
            } else {
                // Back below the warning threshold: clear any active alert for this metric.
                let mut active = self.active_alerts.write();
                active.retain(|a| a.metric_name != metric_name);
                None
            };

            if let Some(sev) = severity {
                let threshold_val = if sev == HealthAlertSeverity::Critical {
                    threshold.critical_threshold
                } else {
                    threshold.warn_threshold
                };
                let alert = HealthAlert {
                    metric_name: metric_name.to_string(),
                    current_value: value,
                    threshold: threshold_val,
                    severity: sev,
                    raised_at: SystemTime::now(),
                };
                // Replace any existing alert for this metric with the latest one.
                let mut active = self.active_alerts.write();
                active.retain(|a| a.metric_name != metric_name);
                active.push(alert.clone());
                drop(active);

                let mut history = self.alert_history.write();
                if history.len() >= 1000 {
                    history.remove(0);
                }
                history.push(alert.clone());

                *self.total_alerts_raised.write() += 1;
                new_alerts.push(alert);
                debug!("Health alert raised for metric {}: {}", metric_name, value);
            }
        }
        new_alerts
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompartmentStats {
    pub compartment_id: String,
    pub capacity: usize,
    pub active: usize,
    pub rejected: u64,
    pub accepted: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BulkheadConfig {
    pub compartment_capacities: HashMap<String, usize>,
    pub default_capacity: usize,
}

impl Default for BulkheadConfig {
    fn default() -> Self {
        let mut compartments = HashMap::new();
        compartments.insert("critical".to_string(), 100);
        compartments.insert("standard".to_string(), 50);
        compartments.insert("background".to_string(), 20);
        Self {
            compartment_capacities: compartments,
            default_capacity: 30,
        }
    }
}

pub struct BulkheadPermit {
    compartment_id: String,
    active_counter: Arc<RwLock<usize>>,
}

impl Drop for BulkheadPermit {
    fn drop(&mut self) {
        let mut active = self.active_counter.write();
        if *active > 0 {
            *active -= 1;
        }
        debug!(
            "Bulkhead permit released for compartment {}",
            self.compartment_id
        );
    }
}

struct Compartment {
    capacity: usize,
    active: Arc<RwLock<usize>>,
    rejected: Arc<RwLock<u64>>,
    accepted: Arc<RwLock<u64>>,
}

pub struct BulkheadIsolator {
    compartments: Arc<RwLock<HashMap<String, Compartment>>>,
    default_capacity: usize,
}

impl BulkheadIsolator {
    pub fn new(config: BulkheadConfig) -> Self {
        let mut compartments = HashMap::new();
        for (id, capacity) in &config.compartment_capacities {
            compartments.insert(
                id.clone(),
                Compartment {
                    capacity: *capacity,
                    active: Arc::new(RwLock::new(0)),
                    rejected: Arc::new(RwLock::new(0)),
                    accepted: Arc::new(RwLock::new(0)),
                },
            );
        }
        Self {
            compartments: Arc::new(RwLock::new(compartments)),
            default_capacity: config.default_capacity,
        }
    }

    pub fn acquire(&self, compartment_id: &str) -> FaultResult<BulkheadPermit> {
        let mut compartments = self.compartments.write();
        let compartment = compartments
            .entry(compartment_id.to_string())
            .or_insert_with(|| Compartment {
                capacity: self.default_capacity,
                active: Arc::new(RwLock::new(0)),
                rejected: Arc::new(RwLock::new(0)),
                accepted: Arc::new(RwLock::new(0)),
            });

        let current = *compartment.active.read();
        if current >= compartment.capacity {
            *compartment.rejected.write() += 1;
            return Err(FaultToleranceError::BulkheadFull {
                compartment: compartment_id.to_string(),
                capacity: compartment.capacity,
            });
        }
        *compartment.active.write() += 1;
        *compartment.accepted.write() += 1;
        debug!(
            "Bulkhead permit acquired for compartment {} ({}/{})",
            compartment_id,
            current + 1,
            compartment.capacity
        );

        Ok(BulkheadPermit {
            compartment_id: compartment_id.to_string(),
            active_counter: Arc::clone(&compartment.active),
        })
    }

    pub fn stats(&self) -> Vec<CompartmentStats> {
        self.compartments
            .read()
            .iter()
            .map(|(id, c)| CompartmentStats {
                compartment_id: id.clone(),
                capacity: c.capacity,
                active: *c.active.read(),
                rejected: *c.rejected.read(),
                accepted: *c.accepted.read(),
            })
            .collect()
    }

    pub fn compartment_stats(&self, compartment_id: &str) -> Option<CompartmentStats> {
        self.compartments
            .read()
            .get(compartment_id)
            .map(|c| CompartmentStats {
                compartment_id: compartment_id.to_string(),
                capacity: c.capacity,
                active: *c.active.read(),
                rejected: *c.rejected.read(),
                accepted: *c.accepted.read(),
            })
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamRetryPolicy {
    pub max_attempts: u32,
    pub initial_delay: Duration,
    pub backoff_multiplier: f64,
    pub max_delay: Duration,
    pub jitter: bool,
}

impl Default for StreamRetryPolicy {
    fn default() -> Self {
        Self {
            max_attempts: 3,
            initial_delay: Duration::from_millis(100),
            backoff_multiplier: 2.0,
            max_delay: Duration::from_secs(30),
            jitter: true,
        }
    }
}

impl StreamRetryPolicy {
    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
        let factor = self.backoff_multiplier.powi(attempt as i32);
        let base_ms = self.initial_delay.as_millis() as f64 * factor;
        let capped_ms = base_ms.min(self.max_delay.as_millis() as f64);

        let jitter_ms = if self.jitter {
            // Deterministic pseudo-random jitter derived from the attempt number
            // (LCG-style multiply), bounded to under 25% of the capped delay.
            let pseudo = ((attempt as u64)
                .wrapping_mul(6364136223846793005)
                .wrapping_add(1))
                % 1000;
            let ratio = pseudo as f64 / 4000.0;
            capped_ms * ratio
        } else {
            0.0
        };

        Duration::from_millis((capped_ms + jitter_ms) as u64)
    }

    pub fn retry<F, T, E>(&self, operation_name: &str, mut f: F) -> FaultResult<T>
    where
        F: FnMut() -> Result<T, E>,
        E: std::fmt::Debug,
    {
        for attempt in 0..=self.max_attempts {
            match f() {
                Ok(result) => {
                    if attempt > 0 {
                        info!(
                            "Operation {} succeeded after {} retries",
                            operation_name, attempt
                        );
                    }
                    return Ok(result);
                }
                Err(err) => {
                    if attempt >= self.max_attempts {
                        warn!(
                            "Operation {} failed after {} attempts: {:?}",
                            operation_name,
                            attempt + 1,
                            err
                        );
                        return Err(FaultToleranceError::MaxRetriesExceeded {
                            attempts: attempt + 1,
                            operation: operation_name.to_string(),
                        });
                    }
                    let delay = self.delay_for_attempt(attempt);
                    debug!(
                        "Operation {} attempt {} failed, retrying in {:?}",
                        operation_name,
                        attempt + 1,
                        delay
                    );
                    std::thread::sleep(delay);
                }
            }
        }
        Err(FaultToleranceError::MaxRetriesExceeded {
            attempts: self.max_attempts + 1,
            operation: operation_name.to_string(),
        })
    }

    pub async fn retry_async<F, Fut, T, E>(&self, operation_name: &str, mut f: F) -> FaultResult<T>
    where
        F: FnMut() -> Fut,
        Fut: std::future::Future<Output = Result<T, E>>,
        E: std::fmt::Debug,
    {
        for attempt in 0..=self.max_attempts {
            match f().await {
                Ok(result) => {
                    if attempt > 0 {
                        info!(
                            "Async operation {} succeeded after {} retries",
                            operation_name, attempt
                        );
                    }
                    return Ok(result);
                }
                Err(err) => {
                    if attempt >= self.max_attempts {
                        warn!(
                            "Async operation {} failed after {} attempts: {:?}",
                            operation_name,
                            attempt + 1,
                            err
                        );
                        return Err(FaultToleranceError::MaxRetriesExceeded {
                            attempts: attempt + 1,
                            operation: operation_name.to_string(),
                        });
                    }
                    let delay = self.delay_for_attempt(attempt);
                    debug!(
                        "Async operation {} attempt {} failed, retrying in {:?}",
                        operation_name,
                        attempt + 1,
                        delay
                    );
                    tokio::time::sleep(delay).await;
                }
            }
        }
        Err(FaultToleranceError::MaxRetriesExceeded {
            attempts: self.max_attempts + 1,
            operation: operation_name.to_string(),
        })
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum WorkerStatus {
    Running,
    Failed,
    Restarting,
    Stopped,
    Exhausted,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RestartRecord {
    pub worker_id: String,
    pub attempt: u32,
    pub reason: String,
    pub restarted_at: SystemTime,
    pub success: bool,
}

#[derive(Debug, Clone)]
struct WorkerState {
    worker_id: String,
    status: WorkerStatus,
    restart_count: u32,
    max_restarts: u32,
    last_failure: Option<SystemTime>,
    last_restart: Option<SystemTime>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SupervisorConfig {
    pub max_restarts: u32,
    pub restart_policy: StreamRetryPolicy,
    pub one_for_all: bool,
}

impl Default for SupervisorConfig {
    fn default() -> Self {
        Self {
            max_restarts: 5,
            restart_policy: StreamRetryPolicy {
                max_attempts: 5,
                initial_delay: Duration::from_millis(500),
                backoff_multiplier: 2.0,
                max_delay: Duration::from_secs(60),
                jitter: true,
            },
            one_for_all: false,
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SupervisorStats {
    pub total_workers: usize,
    pub running_workers: usize,
    pub exhausted_workers: usize,
    pub total_restarts: u64,
    pub restart_history_len: usize,
}

pub struct StreamSupervisor {
    config: SupervisorConfig,
    workers: Arc<RwLock<HashMap<String, WorkerState>>>,
    restart_history: Arc<RwLock<Vec<RestartRecord>>>,
    total_restarts: Arc<RwLock<u64>>,
}

impl StreamSupervisor {
    pub fn new(config: SupervisorConfig) -> Self {
        Self {
            config,
            workers: Arc::new(RwLock::new(HashMap::new())),
            restart_history: Arc::new(RwLock::new(Vec::new())),
            total_restarts: Arc::new(RwLock::new(0)),
        }
    }

    pub fn register_worker(&self, worker_id: impl Into<String>) {
        let id = worker_id.into();
        self.workers.write().insert(
            id.clone(),
            WorkerState {
                worker_id: id,
                status: WorkerStatus::Running,
                restart_count: 0,
                max_restarts: self.config.max_restarts,
                last_failure: None,
                last_restart: None,
            },
        );
    }

    pub fn report_failure(&self, worker_id: &str, reason: &str) -> FaultResult<WorkerStatus> {
        let new_status = {
            let mut workers = self.workers.write();
            let worker = workers.get_mut(worker_id).ok_or_else(|| {
                FaultToleranceError::SupervisorRestartFailed {
                    worker_id: worker_id.to_string(),
                    attempts: 0,
                }
            })?;

            worker.last_failure = Some(SystemTime::now());

            if worker.restart_count >= worker.max_restarts {
                worker.status = WorkerStatus::Exhausted;
                warn!(
                    "Worker {} permanently failed after {} restarts",
                    worker_id, worker.restart_count
                );
                WorkerStatus::Exhausted
            } else {
                worker.status = WorkerStatus::Restarting;
                worker.restart_count += 1;
                worker.last_restart = Some(SystemTime::now());
                WorkerStatus::Restarting
            }
        };

        let attempt = self
            .workers
            .read()
            .get(worker_id)
            .map(|w| w.restart_count)
            .unwrap_or(0);
        let record = RestartRecord {
            worker_id: worker_id.to_string(),
            attempt,
            reason: reason.to_string(),
            restarted_at: SystemTime::now(),
            success: new_status == WorkerStatus::Restarting,
        };
        let mut history = self.restart_history.write();
        if history.len() >= 10_000 {
            history.remove(0);
        }
        history.push(record);

        if new_status == WorkerStatus::Restarting {
            *self.total_restarts.write() += 1;
            info!("Restarting worker {} (attempt {})", worker_id, attempt);

            if self.config.one_for_all {
                let siblings: Vec<String> = self
                    .workers
                    .read()
                    .keys()
                    .filter(|k| k.as_str() != worker_id)
                    .cloned()
                    .collect();
                for sibling_id in siblings {
                    let mut workers = self.workers.write();
                    if let Some(sibling) = workers.get_mut(&sibling_id) {
                        if sibling.status == WorkerStatus::Running {
                            sibling.status = WorkerStatus::Restarting;
                            sibling.restart_count += 1;
                            sibling.last_restart = Some(SystemTime::now());
                        }
                    }
                }
            }
        }

        Ok(new_status)
    }

    pub fn acknowledge_restart(&self, worker_id: &str) -> FaultResult<()> {
        let mut workers = self.workers.write();
        let worker = workers.get_mut(worker_id).ok_or_else(|| {
            FaultToleranceError::SupervisorRestartFailed {
                worker_id: worker_id.to_string(),
                attempts: 0,
            }
        })?;
        worker.status = WorkerStatus::Running;
        info!("Worker {} successfully restarted", worker_id);
        Ok(())
    }

    pub fn stop_worker(&self, worker_id: &str) -> FaultResult<()> {
        let mut workers = self.workers.write();
        let worker = workers.get_mut(worker_id).ok_or_else(|| {
            FaultToleranceError::SupervisorRestartFailed {
                worker_id: worker_id.to_string(),
                attempts: 0,
            }
        })?;
        worker.status = WorkerStatus::Stopped;
        info!("Worker {} stopped", worker_id);
        Ok(())
    }

    pub fn worker_status(&self, worker_id: &str) -> Option<WorkerStatus> {
        self.workers.read().get(worker_id).map(|w| w.status.clone())
    }

    pub fn workers_with_status(&self, status: &WorkerStatus) -> Vec<String> {
        self.workers
            .read()
            .values()
            .filter(|w| &w.status == status)
            .map(|w| w.worker_id.clone())
            .collect()
    }

    pub fn stats(&self) -> SupervisorStats {
        let workers = self.workers.read();
        let running_workers = workers
            .values()
            .filter(|w| w.status == WorkerStatus::Running)
            .count();
        let exhausted_workers = workers
            .values()
            .filter(|w| w.status == WorkerStatus::Exhausted)
            .count();
        SupervisorStats {
            total_workers: workers.len(),
            running_workers,
            exhausted_workers,
            total_restarts: *self.total_restarts.read(),
            restart_history_len: self.restart_history.read().len(),
        }
    }

    pub fn restart_history(&self) -> Vec<RestartRecord> {
        self.restart_history.read().clone()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_health_monitor_healthy_state() {
        let config = HealthMonitorConfig::default();
        let monitor = StreamHealthMonitor::new(config);
        monitor.record_metric("error_rate", 0.001);
        monitor.record_metric("latency_p99_ms", 50.0);
        monitor.record_metric("backpressure_ratio", 0.1);

        let snap = monitor.snapshot();
        assert_eq!(snap.status, StreamHealthStatus::Healthy);
        assert!(snap.active_alerts.is_empty());
    }

    #[test]
    fn test_health_monitor_warning_alert() {
        let config = HealthMonitorConfig::default();
        let monitor = StreamHealthMonitor::new(config);

        let alerts = monitor.record_metric("error_rate", 0.02);
        assert_eq!(alerts.len(), 1);
        assert_eq!(alerts[0].severity, HealthAlertSeverity::Warning);

        let snap = monitor.snapshot();
        assert_eq!(snap.status, StreamHealthStatus::Degraded);
    }

    #[test]
    fn test_health_monitor_critical_alert() {
        let config = HealthMonitorConfig::default();
        let monitor = StreamHealthMonitor::new(config);

        let alerts = monitor.record_metric("error_rate", 0.10);
        assert_eq!(alerts.len(), 1);
        assert_eq!(alerts[0].severity, HealthAlertSeverity::Critical);

        let snap = monitor.snapshot();
        assert_eq!(snap.status, StreamHealthStatus::Critical);
    }

    #[test]
    fn test_health_monitor_recovery() {
        let config = HealthMonitorConfig::default();
        let monitor = StreamHealthMonitor::new(config);

        monitor.record_metric("error_rate", 0.10);
        let snap = monitor.snapshot();
        assert_eq!(snap.status, StreamHealthStatus::Critical);

        monitor.record_metric("error_rate", 0.001);
        let snap = monitor.snapshot();
        assert!(snap.active_alerts.is_empty());
    }
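
    // Additional illustrative test (not part of the original suite): re-recording
    // a metric above a higher threshold should replace the existing active alert
    // with the escalated one rather than accumulating duplicates, while the
    // total-alerts counter still increments per raised alert.
    #[test]
    fn test_health_monitor_alert_escalation_replaces_active() {
        let monitor = StreamHealthMonitor::new(HealthMonitorConfig::default());
        monitor.record_metric("error_rate", 0.02); // warning
        monitor.record_metric("error_rate", 0.10); // escalates to critical

        let snap = monitor.snapshot();
        assert_eq!(snap.status, StreamHealthStatus::Critical);
        assert_eq!(snap.active_alerts.len(), 1);
        assert_eq!(snap.active_alerts[0].severity, HealthAlertSeverity::Critical);
        assert_eq!(monitor.total_alerts_raised(), 2);
    }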

    #[test]
    fn test_health_monitor_total_alerts_count() {
        let config = HealthMonitorConfig::default();
        let monitor = StreamHealthMonitor::new(config);
        monitor.record_metric("error_rate", 0.02);
        monitor.record_metric("latency_p99_ms", 200.0);
        assert_eq!(monitor.total_alerts_raised(), 2);
    }
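
    // Additional illustrative test: `current_metric` should echo the most recent
    // recorded value and return `None` for metrics that were never recorded.
    #[test]
    fn test_health_monitor_current_metric_lookup() {
        let monitor = StreamHealthMonitor::new(HealthMonitorConfig::default());
        monitor.record_metric("error_rate", 0.003);
        assert_eq!(monitor.current_metric("error_rate"), Some(0.003));
        assert_eq!(monitor.current_metric("unknown_metric"), None);
    }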
962
963 #[test]
966 fn test_bulkhead_acquire_and_release() {
967 let mut config = BulkheadConfig::default();
968 config.compartment_capacities.insert("test".to_string(), 2);
969 let isolator = BulkheadIsolator::new(config);
970
971 let p1 = isolator
972 .acquire("test")
973 .expect("first permit should succeed");
974 let p2 = isolator
975 .acquire("test")
976 .expect("second permit should succeed");
977
978 let result = isolator.acquire("test");
979 assert!(
980 matches!(result, Err(FaultToleranceError::BulkheadFull { .. })),
981 "third permit should be rejected"
982 );
983
984 let stats = isolator
985 .compartment_stats("test")
986 .expect("stats should exist");
987 assert_eq!(stats.active, 2);
988 assert_eq!(stats.rejected, 1);
989
990 drop(p1);
991 drop(p2);
992
993 let stats = isolator
994 .compartment_stats("test")
995 .expect("stats should exist");
996 assert_eq!(stats.active, 0);
997 }
998
999 #[test]
1000 fn test_bulkhead_auto_creates_compartment() {
1001 let config = BulkheadConfig {
1002 compartment_capacities: HashMap::new(),
1003 default_capacity: 5,
1004 };
1005 let isolator = BulkheadIsolator::new(config);
1006 let permit = isolator
1007 .acquire("new-compartment")
1008 .expect("should succeed with default capacity");
1009 drop(permit);
1010 }
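
    // Additional illustrative test: the aggregate `stats()` view should list every
    // configured compartment along with its active/accepted/rejected counters.
    #[test]
    fn test_bulkhead_stats_lists_all_compartments() {
        let mut config = BulkheadConfig::default();
        config.compartment_capacities.insert("extra".to_string(), 3);
        let isolator = BulkheadIsolator::new(config);

        let _permit = isolator.acquire("extra").expect("permit should succeed");
        let stats = isolator.stats();
        // Default config defines critical/standard/background, plus "extra" above.
        assert_eq!(stats.len(), 4);
        let extra = stats
            .iter()
            .find(|s| s.compartment_id == "extra")
            .expect("extra compartment should be listed");
        assert_eq!(extra.active, 1);
        assert_eq!(extra.accepted, 1);
        assert_eq!(extra.rejected, 0);
    }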

    #[test]
    fn test_bulkhead_different_compartments_isolated() {
        let mut config = BulkheadConfig::default();
        config.compartment_capacities.insert("a".to_string(), 1);
        config.compartment_capacities.insert("b".to_string(), 1);
        let isolator = BulkheadIsolator::new(config);

        let _pa = isolator.acquire("a").expect("a should succeed");
        let result_a = isolator.acquire("a");
        assert!(matches!(
            result_a,
            Err(FaultToleranceError::BulkheadFull { .. })
        ));

        let _pb = isolator.acquire("b").expect("b should be independent");
    }

    #[test]
    fn test_retry_policy_delay_increases() {
        let policy = StreamRetryPolicy {
            max_attempts: 5,
            initial_delay: Duration::from_millis(100),
            backoff_multiplier: 2.0,
            max_delay: Duration::from_secs(60),
            jitter: false,
        };
        let d0 = policy.delay_for_attempt(0);
        let d1 = policy.delay_for_attempt(1);
        let d2 = policy.delay_for_attempt(2);
        assert!(d0 < d1, "delay should increase");
        assert!(d1 < d2, "delay should increase");
    }

    #[test]
    fn test_retry_policy_max_delay_cap() {
        let policy = StreamRetryPolicy {
            max_attempts: 10,
            initial_delay: Duration::from_millis(100),
            backoff_multiplier: 10.0,
            max_delay: Duration::from_millis(500),
            jitter: false,
        };
        let d = policy.delay_for_attempt(5);
        assert!(
            d <= Duration::from_millis(500) + Duration::from_millis(10),
            "delay should not exceed max"
        );
    }
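
    // Additional illustrative test: with jitter enabled the computed delay should
    // never fall below the un-jittered base and, given the bounded pseudo-random
    // ratio used in `delay_for_attempt` (always under 0.25), should stay within
    // roughly 25% above it.
    #[test]
    fn test_retry_policy_jitter_bounds() {
        let base_policy = StreamRetryPolicy {
            max_attempts: 5,
            initial_delay: Duration::from_millis(100),
            backoff_multiplier: 2.0,
            max_delay: Duration::from_secs(60),
            jitter: false,
        };
        let jittered_policy = StreamRetryPolicy {
            jitter: true,
            ..base_policy.clone()
        };
        for attempt in 0..5 {
            let base = base_policy.delay_for_attempt(attempt);
            let jittered = jittered_policy.delay_for_attempt(attempt);
            assert!(jittered >= base, "jitter should never shorten the delay");
            assert!(
                jittered <= base + base / 4 + Duration::from_millis(1),
                "jitter should stay within 25% of the base delay"
            );
        }
    }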

    #[test]
    fn test_retry_succeeds_on_first_attempt() {
        let policy = StreamRetryPolicy {
            max_attempts: 3,
            initial_delay: Duration::from_millis(1),
            backoff_multiplier: 2.0,
            max_delay: Duration::from_secs(1),
            jitter: false,
        };
        let result: FaultResult<i32> = policy.retry("test-op", || Ok::<i32, &str>(42));
        assert!(matches!(result, Ok(42)));
    }

    #[test]
    fn test_retry_exhausts_attempts() {
        let policy = StreamRetryPolicy {
            max_attempts: 2,
            initial_delay: Duration::from_millis(1),
            backoff_multiplier: 1.0,
            max_delay: Duration::from_millis(5),
            jitter: false,
        };
        let mut calls = 0u32;
        let result: FaultResult<i32> = policy.retry("always-fail", || {
            calls += 1;
            Err::<i32, &str>("always fails")
        });
        assert!(matches!(
            result,
            Err(FaultToleranceError::MaxRetriesExceeded { .. })
        ));
        assert_eq!(calls, 3);
    }

    #[test]
    fn test_retry_succeeds_after_failures() {
        let policy = StreamRetryPolicy {
            max_attempts: 5,
            initial_delay: Duration::from_millis(1),
            backoff_multiplier: 1.0,
            max_delay: Duration::from_millis(10),
            jitter: false,
        };
        let mut calls = 0u32;
        let result: FaultResult<i32> = policy.retry("eventually-succeeds", || {
            calls += 1;
            if calls < 3 {
                Err::<i32, &str>("not yet")
            } else {
                Ok(99)
            }
        });
        assert!(matches!(result, Ok(99)));
        assert_eq!(calls, 3);
    }

    #[tokio::test]
    async fn test_retry_async_succeeds() {
        let policy = StreamRetryPolicy {
            max_attempts: 3,
            initial_delay: Duration::from_millis(1),
            backoff_multiplier: 1.0,
            max_delay: Duration::from_millis(5),
            jitter: false,
        };
        let calls = Arc::new(RwLock::new(0u32));
        let calls_clone = Arc::clone(&calls);
        let result: FaultResult<i32> = policy
            .retry_async("async-op", move || {
                let c = Arc::clone(&calls_clone);
                async move {
                    let mut lock = c.write();
                    *lock += 1;
                    let v = *lock;
                    drop(lock);
                    if v < 2 {
                        Err::<i32, &str>("not ready")
                    } else {
                        Ok(7)
                    }
                }
            })
            .await;
        assert!(matches!(result, Ok(7)));
    }
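
    // Additional illustrative test: the async retry path should give up with
    // `MaxRetriesExceeded` once every attempt (max_attempts + 1 calls) has failed.
    #[tokio::test]
    async fn test_retry_async_exhausts_attempts() {
        let policy = StreamRetryPolicy {
            max_attempts: 2,
            initial_delay: Duration::from_millis(1),
            backoff_multiplier: 1.0,
            max_delay: Duration::from_millis(5),
            jitter: false,
        };
        let result: FaultResult<i32> = policy
            .retry_async("always-fail-async", || async { Err::<i32, &str>("nope") })
            .await;
        assert!(matches!(
            result,
            Err(FaultToleranceError::MaxRetriesExceeded { attempts: 3, .. })
        ));
    }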

    #[test]
    fn test_supervisor_register_and_failure_restart() {
        let config = SupervisorConfig::default();
        let supervisor = StreamSupervisor::new(config);
        supervisor.register_worker("worker-1");

        let status = supervisor
            .report_failure("worker-1", "connection lost")
            .expect("should handle failure");
        assert_eq!(status, WorkerStatus::Restarting);

        supervisor
            .acknowledge_restart("worker-1")
            .expect("ack should succeed");
        assert_eq!(
            supervisor.worker_status("worker-1"),
            Some(WorkerStatus::Running)
        );
    }

    #[test]
    fn test_supervisor_exhausted_after_max_restarts() {
        let config = SupervisorConfig {
            max_restarts: 2,
            ..Default::default()
        };
        let supervisor = StreamSupervisor::new(config);
        supervisor.register_worker("worker-x");

        for _ in 0..2 {
            let status = supervisor
                .report_failure("worker-x", "crash")
                .expect("failure should be handled");
            if status == WorkerStatus::Restarting {
                supervisor.acknowledge_restart("worker-x").ok();
            }
        }

        let final_status = supervisor
            .report_failure("worker-x", "final crash")
            .expect("final failure should be handled");
        assert_eq!(final_status, WorkerStatus::Exhausted);

        let stats = supervisor.stats();
        assert_eq!(stats.exhausted_workers, 1);
    }

    #[test]
    fn test_supervisor_stop_worker() {
        let config = SupervisorConfig::default();
        let supervisor = StreamSupervisor::new(config);
        supervisor.register_worker("w1");
        supervisor.stop_worker("w1").expect("stop should succeed");
        assert_eq!(supervisor.worker_status("w1"), Some(WorkerStatus::Stopped));
    }
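
    // Additional illustrative test: reporting a failure for a worker that was never
    // registered should surface a `SupervisorRestartFailed` error rather than panicking.
    #[test]
    fn test_supervisor_unknown_worker_failure() {
        let supervisor = StreamSupervisor::new(SupervisorConfig::default());
        let result = supervisor.report_failure("ghost-worker", "never registered");
        assert!(matches!(
            result,
            Err(FaultToleranceError::SupervisorRestartFailed { .. })
        ));
    }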

    #[test]
    fn test_supervisor_one_for_all() {
        let config = SupervisorConfig {
            max_restarts: 5,
            one_for_all: true,
            ..Default::default()
        };
        let supervisor = StreamSupervisor::new(config);
        supervisor.register_worker("w1");
        supervisor.register_worker("w2");
        supervisor.register_worker("w3");

        supervisor
            .report_failure("w1", "cascade test")
            .expect("failure should be handled");

        let restarting = supervisor.workers_with_status(&WorkerStatus::Restarting);
        assert!(
            restarting.len() >= 2,
            "siblings should also restart: {:?}",
            restarting
        );
    }

    #[test]
    fn test_supervisor_restart_history() {
        let config = SupervisorConfig::default();
        let supervisor = StreamSupervisor::new(config);
        supervisor.register_worker("wh");

        supervisor.report_failure("wh", "reason-1").ok();
        supervisor.acknowledge_restart("wh").ok();
        supervisor.report_failure("wh", "reason-2").ok();

        let history = supervisor.restart_history();
        assert!(history.len() >= 2);
        assert_eq!(history[0].reason, "reason-1");
    }

    #[test]
    fn test_supervisor_stats() {
        let config = SupervisorConfig::default();
        let supervisor = StreamSupervisor::new(config);
        supervisor.register_worker("s1");
        supervisor.register_worker("s2");

        supervisor.report_failure("s1", "err").ok();
        supervisor.acknowledge_restart("s1").ok();

        let stats = supervisor.stats();
        assert_eq!(stats.total_workers, 2);
        assert_eq!(stats.running_workers, 2);
        assert_eq!(stats.total_restarts, 1);
    }
}