1use serde::{Deserialize, Serialize};
24use std::collections::{HashMap, VecDeque};
25use std::sync::RwLock;
26
/// Upper bound on distinct pipeline IDs tracked at once; events for a
/// not-yet-tracked pipeline are rejected once this many are present.
const MAX_TRACKED_PIPELINES: usize = 10_000;

/// Upper bound on events retained per pipeline; the oldest event is dropped
/// to make room for a new one.
const MAX_EVENTS_PER_PIPELINE: usize = 10_000;

/// Upper bound on concurrently tracked call chains; new chains are denied
/// once this many are present.
const MAX_TRACKED_CHAINS: usize = 50_000;

/// Maximum accepted pipeline ID length, in bytes (`str::len`).
const MAX_PIPELINE_ID_LEN: usize = 512;

/// Maximum accepted chain ID length, in bytes (`str::len`).
const MAX_CHAIN_ID_LEN: usize = 512;

/// Hard ceiling that a configured `max_chain_depth` may not exceed.
const ABSOLUTE_MAX_CHAIN_DEPTH: u32 = 100;
48
/// Configuration for the cascading-failure circuit breaker.
///
/// Unknown fields are rejected during deserialization, and every field falls
/// back to its `default_*` function when omitted. Call
/// [`CascadingConfig::validate`] to check the value ranges.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct CascadingConfig {
    /// Master switch. When `false`, the chain and pipeline tracking methods
    /// return early without validating input or recording anything.
    #[serde(default = "default_enabled")]
    pub enabled: bool,

    /// Maximum depth a single call chain may reach before further entries are
    /// denied. Validation requires a value in `[1, 100]`.
    #[serde(default = "default_max_chain_depth")]
    pub max_chain_depth: u32,

    /// Error rate, as a fraction in `[0.0, 1.0]`, at or above which a
    /// pipeline's circuit is broken.
    #[serde(default = "default_error_rate_threshold")]
    pub error_rate_threshold: f64,

    /// Length in seconds of the sliding window over which pipeline events are
    /// counted. Validation requires a value in `[1, 86400]`.
    #[serde(default = "default_window_secs")]
    pub window_secs: u64,

    /// Minimum number of recorded events a pipeline needs before its error
    /// rate can break the circuit. Validation requires `[1, 100000]`.
    #[serde(default = "default_min_window_events")]
    pub min_window_events: u32,

    /// Seconds a broken circuit keeps rejecting requests before
    /// `check_pipeline` lets traffic through again. Validation requires a
    /// value in `[1, 86400]`.
    #[serde(default = "default_break_duration_secs")]
    pub break_duration_secs: u64,
}
90
/// Serde default for `enabled`: the breaker is active unless turned off.
fn default_enabled() -> bool {
    const ENABLED: bool = true;
    ENABLED
}

/// Serde default for `max_chain_depth`.
fn default_max_chain_depth() -> u32 {
    const DEPTH: u32 = 10;
    DEPTH
}

/// Serde default for `error_rate_threshold` (a fraction, i.e. 50 %).
fn default_error_rate_threshold() -> f64 {
    const THRESHOLD: f64 = 0.5;
    THRESHOLD
}

/// Serde default for `window_secs` (five minutes).
fn default_window_secs() -> u64 {
    const WINDOW_SECS: u64 = 300;
    WINDOW_SECS
}

/// Serde default for `min_window_events`.
fn default_min_window_events() -> u32 {
    const MIN_EVENTS: u32 = 10;
    MIN_EVENTS
}

/// Serde default for `break_duration_secs` (one minute).
fn default_break_duration_secs() -> u64 {
    const BREAK_SECS: u64 = 60;
    BREAK_SECS
}
109
110impl Default for CascadingConfig {
111 fn default() -> Self {
112 Self {
113 enabled: default_enabled(),
114 max_chain_depth: default_max_chain_depth(),
115 error_rate_threshold: default_error_rate_threshold(),
116 window_secs: default_window_secs(),
117 min_window_events: default_min_window_events(),
118 break_duration_secs: default_break_duration_secs(),
119 }
120 }
121}
122
/// Errors returned by the cascading breaker and its configuration.
#[derive(Debug, Clone, PartialEq)]
pub enum CascadingError {
    /// A configuration value is outside its allowed range; the payload
    /// describes which field and the offending value.
    InvalidConfig(String),
    /// An internal `RwLock` was poisoned; the payload names the lock.
    LockPoisoned(String),
    /// A pipeline or chain ID failed length or character validation.
    InvalidInput(String),
    /// Entering a chain would exceed the allowed depth. This variant is also
    /// used when the chain or pipeline tracker is at capacity.
    ChainDepthExceeded { current: u32, max: u32 },
    /// The pipeline's circuit is broken and requests are being rejected.
    PipelineBroken {
        /// ID of the broken pipeline.
        pipeline_id: String,
        /// Observed error rate. `check_pipeline` fills this in as a
        /// percentage (fraction × 100); it is not printed by `Display`.
        error_rate: f64,
    },
}
144
145impl std::fmt::Display for CascadingError {
146 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147 match self {
148 CascadingError::InvalidConfig(msg) => write!(f, "invalid cascading config: {msg}"),
149 CascadingError::LockPoisoned(msg) => {
150 write!(f, "cascading breaker lock poisoned (fail-closed): {msg}")
151 }
152 CascadingError::InvalidInput(msg) => {
153 write!(f, "cascading breaker input validation failed: {msg}")
154 }
155 CascadingError::ChainDepthExceeded { current, max } => {
156 write!(
157 f,
158 "tool call chain depth {current} exceeds maximum {max} (OWASP ASI08)"
159 )
160 }
161 CascadingError::PipelineBroken {
162 pipeline_id,
163 error_rate: _,
164 } => {
165 write!(
170 f,
171 "Pipeline '{}' circuit is broken — requests blocked until recovery",
172 pipeline_id
173 )
174 }
175 }
176 }
177}
178
// Marker impl so `CascadingError` can be used wherever `std::error::Error`
// is expected; the trait's provided methods are used unchanged.
impl std::error::Error for CascadingError {}
180
181impl CascadingConfig {
182 pub fn validate(&self) -> Result<(), CascadingError> {
184 if self.max_chain_depth == 0 || self.max_chain_depth > ABSOLUTE_MAX_CHAIN_DEPTH {
185 return Err(CascadingError::InvalidConfig(format!(
186 "max_chain_depth must be in [1, {}], got {}",
187 ABSOLUTE_MAX_CHAIN_DEPTH, self.max_chain_depth
188 )));
189 }
190 if !self.error_rate_threshold.is_finite()
192 || self.error_rate_threshold < 0.0
193 || self.error_rate_threshold > 1.0
194 {
195 return Err(CascadingError::InvalidConfig(format!(
196 "error_rate_threshold must be in [0.0, 1.0], got {}",
197 self.error_rate_threshold
198 )));
199 }
200 const MAX_WINDOW_SECS: u64 = 86_400; const MAX_BREAK_DURATION_SECS: u64 = 86_400; if self.window_secs == 0 || self.window_secs > MAX_WINDOW_SECS {
206 return Err(CascadingError::InvalidConfig(format!(
207 "window_secs must be in [1, {MAX_WINDOW_SECS}], got {}",
208 self.window_secs
209 )));
210 }
211 if self.break_duration_secs == 0 || self.break_duration_secs > MAX_BREAK_DURATION_SECS {
212 return Err(CascadingError::InvalidConfig(format!(
213 "break_duration_secs must be in [1, {MAX_BREAK_DURATION_SECS}], got {}",
214 self.break_duration_secs
215 )));
216 }
217 const MAX_MIN_WINDOW_EVENTS: u32 = 100_000;
222 if self.min_window_events == 0 {
223 return Err(CascadingError::InvalidConfig(
224 "min_window_events must be >= 1, got 0".to_string(),
225 ));
226 }
227 if self.min_window_events > MAX_MIN_WINDOW_EVENTS {
228 return Err(CascadingError::InvalidConfig(format!(
229 "min_window_events must be <= {}, got {}",
230 MAX_MIN_WINDOW_EVENTS, self.min_window_events
231 )));
232 }
233 Ok(())
234 }
235}
236
/// One recorded pipeline outcome.
#[derive(Debug, Clone, Copy)]
struct PipelineEvent {
    /// When the event was recorded, in seconds since the Unix epoch.
    timestamp: u64,
    /// `true` for a failure, `false` for a success.
    is_error: bool,
}

/// Per-pipeline bookkeeping for the circuit breaker.
#[derive(Debug)]
struct PipelineState {
    /// Recorded events, oldest at the front.
    events: VecDeque<PipelineEvent>,
    /// Whether the circuit is currently flagged as broken.
    is_broken: bool,
    /// Timestamp (seconds) of the most recent break, if any.
    broken_at: Option<u64>,
    /// Number of times this pipeline's circuit has been broken.
    break_count: u32,
}

impl PipelineState {
    /// Creates the state for a pipeline with no history: no events, circuit
    /// closed, never broken.
    fn new() -> Self {
        let events = VecDeque::new();
        Self {
            events,
            broken_at: None,
            is_broken: false,
            break_count: 0,
        }
    }
}
271
/// Tracking record for one in-flight call chain.
#[derive(Debug, Clone)]
struct CallChain {
    /// Current nesting depth; starts at 1 when the chain is first entered.
    depth: u32,
    // Stored when the chain is created but not read anywhere in this file,
    // hence the lint suppression.
    #[allow(dead_code)]
    pipeline_id: String,
    // Creation time in seconds since the Unix epoch; stored but not read in
    // this file, hence the lint suppression.
    #[allow(dead_code)]
    started_at: u64,
}
284
/// Circuit breaker that limits call-chain depth and blocks pipelines whose
/// recent error rate is too high.
pub struct CascadingBreaker {
    /// Settings; validated once in [`CascadingBreaker::new`].
    config: CascadingConfig,
    /// Per-pipeline event history and break state, keyed by pipeline ID.
    pipelines: RwLock<HashMap<String, PipelineState>>,
    /// Active call chains, keyed by chain ID.
    chains: RwLock<HashMap<String, CallChain>>,
}
299
300impl std::fmt::Debug for CascadingBreaker {
301 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
302 f.debug_struct("CascadingBreaker")
303 .field("config", &self.config)
304 .field("pipelines", &"<locked>")
305 .field("chains", &"<locked>")
306 .finish()
307 }
308}
309
310impl CascadingBreaker {
311 pub fn new(config: CascadingConfig) -> Result<Self, CascadingError> {
313 config.validate()?;
314 Ok(Self {
315 config,
316 pipelines: RwLock::new(HashMap::new()),
317 chains: RwLock::new(HashMap::new()),
318 })
319 }
320
    /// Returns the configured `enabled` flag.
    pub fn is_enabled(&self) -> bool {
        self.config.enabled
    }
325
    /// Returns the configured maximum call-chain depth.
    pub fn max_chain_depth(&self) -> u32 {
        self.config.max_chain_depth
    }
330
331 fn validate_pipeline_id(pipeline_id: &str) -> Result<(), CascadingError> {
336 if pipeline_id.is_empty() || pipeline_id.len() > MAX_PIPELINE_ID_LEN {
337 return Err(CascadingError::InvalidInput(format!(
338 "pipeline_id length {} out of range [1, {}]",
339 pipeline_id.len(),
340 MAX_PIPELINE_ID_LEN
341 )));
342 }
343 if vellaveto_types::has_dangerous_chars(pipeline_id) {
344 return Err(CascadingError::InvalidInput(
345 "pipeline_id contains control or Unicode format characters".to_string(),
346 ));
347 }
348 Ok(())
349 }
350
351 fn validate_chain_id(chain_id: &str) -> Result<(), CascadingError> {
352 if chain_id.is_empty() || chain_id.len() > MAX_CHAIN_ID_LEN {
353 return Err(CascadingError::InvalidInput(format!(
354 "chain_id length {} out of range [1, {}]",
355 chain_id.len(),
356 MAX_CHAIN_ID_LEN
357 )));
358 }
359 if vellaveto_types::has_dangerous_chars(chain_id) {
360 return Err(CascadingError::InvalidInput(
361 "chain_id contains control or Unicode format characters".to_string(),
362 ));
363 }
364 Ok(())
365 }
366
367 pub fn enter_chain(&self, chain_id: &str, pipeline_id: &str) -> Result<u32, CascadingError> {
379 if !self.config.enabled {
380 return Ok(0);
381 }
382 Self::validate_chain_id(chain_id)?;
383 Self::validate_pipeline_id(pipeline_id)?;
384
385 let mut chains = self
386 .chains
387 .write()
388 .map_err(|_| CascadingError::LockPoisoned("chains write lock".to_string()))?;
389
390 if let Some(chain) = chains.get_mut(chain_id) {
391 let new_depth = chain.depth.saturating_add(1);
392 if new_depth > self.config.max_chain_depth {
393 metrics::counter!(
394 "vellaveto_cascading_depth_exceeded_total",
395 "pipeline" => pipeline_id.to_string()
396 )
397 .increment(1);
398
399 tracing::warn!(
400 chain_id = %chain_id,
401 pipeline_id = %pipeline_id,
402 current_depth = %new_depth,
403 max_depth = %self.config.max_chain_depth,
404 "Tool call chain depth exceeded (OWASP ASI08)"
405 );
406
407 return Err(CascadingError::ChainDepthExceeded {
408 current: new_depth,
409 max: self.config.max_chain_depth,
410 });
411 }
412 chain.depth = new_depth;
413 Ok(new_depth)
414 } else {
415 if chains.len() >= MAX_TRACKED_CHAINS {
417 tracing::warn!(
418 max = MAX_TRACKED_CHAINS,
419 "Cascading chain tracker at capacity, denying new chain (fail-closed)"
420 );
421 return Err(CascadingError::ChainDepthExceeded {
422 current: 1,
423 max: 0, });
425 }
426
427 let now = Self::now_secs();
428 chains.insert(
429 chain_id.to_string(),
430 CallChain {
431 depth: 1,
432 pipeline_id: pipeline_id.to_string(),
433 started_at: now,
434 },
435 );
436 Ok(1)
437 }
438 }
439
440 pub fn exit_chain(&self, chain_id: &str) -> Result<u32, CascadingError> {
444 if !self.config.enabled {
445 return Ok(0);
446 }
447 Self::validate_chain_id(chain_id)?;
448
449 let mut chains = self
450 .chains
451 .write()
452 .map_err(|_| CascadingError::LockPoisoned("chains write lock".to_string()))?;
453
454 if let Some(chain) = chains.get_mut(chain_id) {
455 if chain.depth <= 1 {
456 chains.remove(chain_id);
457 return Ok(0);
458 }
459 chain.depth = chain.depth.saturating_sub(1);
460 Ok(chain.depth)
461 } else {
462 Ok(0)
463 }
464 }
465
466 pub fn chain_depth(&self, chain_id: &str) -> Result<u32, CascadingError> {
468 if !self.config.enabled {
469 return Ok(0);
470 }
471 Self::validate_chain_id(chain_id)?;
472
473 let chains = self
474 .chains
475 .read()
476 .map_err(|_| CascadingError::LockPoisoned("chains read lock".to_string()))?;
477
478 Ok(chains.get(chain_id).map(|c| c.depth).unwrap_or(0))
479 }
480
481 #[must_use = "pipeline break results must not be discarded"]
490 pub fn check_pipeline(&self, pipeline_id: &str) -> Result<(), CascadingError> {
491 if !self.config.enabled {
492 return Ok(());
493 }
494 Self::validate_pipeline_id(pipeline_id)?;
495
496 let pipelines = self
497 .pipelines
498 .read()
499 .map_err(|_| CascadingError::LockPoisoned("pipelines read lock".to_string()))?;
500
501 if let Some(state) = pipelines.get(pipeline_id) {
502 if state.is_broken {
503 let now = Self::now_secs();
504 if let Some(broken_at) = state.broken_at {
505 if now >= broken_at.saturating_add(self.config.break_duration_secs) {
506 return Ok(());
508 }
509 }
510
511 let error_rate = self.compute_error_rate_inner(state, now);
512 return Err(CascadingError::PipelineBroken {
513 pipeline_id: pipeline_id.to_string(),
514 error_rate: error_rate * 100.0,
515 });
516 }
517 }
518
519 Ok(())
520 }
521
522 pub fn record_pipeline_success(&self, pipeline_id: &str) -> Result<(), CascadingError> {
524 if !self.config.enabled {
525 return Ok(());
526 }
527 Self::validate_pipeline_id(pipeline_id)?;
528 self.record_pipeline_event(pipeline_id, false)
529 }
530
531 pub fn record_pipeline_error(&self, pipeline_id: &str) -> Result<bool, CascadingError> {
534 if !self.config.enabled {
535 return Ok(false);
536 }
537 Self::validate_pipeline_id(pipeline_id)?;
538 self.record_pipeline_event(pipeline_id, true)?;
539
540 let mut pipelines = self.pipelines.write().map_err(|_| {
542 CascadingError::LockPoisoned("pipelines write lock for break check".to_string())
543 })?;
544
545 if let Some(state) = pipelines.get_mut(pipeline_id) {
546 if !state.is_broken {
547 let now = Self::now_secs();
548 let error_rate = self.compute_error_rate_inner(state, now);
549 let total_events = state.events.len();
550
551 if total_events >= self.config.min_window_events as usize
552 && error_rate >= self.config.error_rate_threshold
553 {
554 state.is_broken = true;
555 state.broken_at = Some(now);
556 state.break_count = state.break_count.saturating_add(1);
557
558 metrics::counter!(
559 "vellaveto_cascading_pipeline_breaks_total",
560 "pipeline" => pipeline_id.to_string()
561 )
562 .increment(1);
563
564 tracing::warn!(
565 pipeline_id = %pipeline_id,
566 error_rate = %format!("{:.1}%", error_rate * 100.0),
567 break_count = %state.break_count,
568 "Pipeline circuit broken due to high error rate (OWASP ASI08)"
569 );
570
571 return Ok(true);
572 }
573 }
574 }
575
576 Ok(false)
577 }
578
579 fn record_pipeline_event(
581 &self,
582 pipeline_id: &str,
583 is_error: bool,
584 ) -> Result<(), CascadingError> {
585 let mut pipelines = self
586 .pipelines
587 .write()
588 .map_err(|_| CascadingError::LockPoisoned("pipelines write lock".to_string()))?;
589
590 if !pipelines.contains_key(pipeline_id) && pipelines.len() >= MAX_TRACKED_PIPELINES {
594 tracing::warn!(
595 max = MAX_TRACKED_PIPELINES,
596 "Cascading pipeline tracker at capacity — denying new pipeline"
597 );
598 return Err(CascadingError::ChainDepthExceeded {
599 current: u32::try_from(pipelines.len()).unwrap_or(u32::MAX),
600 max: u32::try_from(MAX_TRACKED_PIPELINES).unwrap_or(u32::MAX),
601 });
602 }
603
604 let now = Self::now_secs();
605 let state = pipelines
606 .entry(pipeline_id.to_string())
607 .or_insert_with(PipelineState::new);
608
609 let cutoff = now.saturating_sub(self.config.window_secs);
611 while let Some(front) = state.events.front() {
612 if front.timestamp < cutoff {
613 state.events.pop_front();
614 } else {
615 break;
616 }
617 }
618
619 if state.events.len() >= MAX_EVENTS_PER_PIPELINE {
621 state.events.pop_front();
622 }
623
624 state.events.push_back(PipelineEvent {
625 timestamp: now,
626 is_error,
627 });
628
629 if state.is_broken {
631 if let Some(broken_at) = state.broken_at {
632 if now >= broken_at.saturating_add(self.config.break_duration_secs) {
633 let error_rate = self.compute_error_rate_inner(state, now);
635 if error_rate < self.config.error_rate_threshold {
636 state.is_broken = false;
637 state.broken_at = None;
638
639 metrics::counter!(
640 "vellaveto_cascading_pipeline_recoveries_total",
641 "pipeline" => pipeline_id.to_string()
642 )
643 .increment(1);
644
645 tracing::info!(
646 pipeline_id = %pipeline_id,
647 error_rate = %format!("{:.1}%", error_rate * 100.0),
648 "Pipeline circuit recovered"
649 );
650 }
651 }
652 }
653 }
654
655 Ok(())
656 }
657
658 fn compute_error_rate_inner(&self, state: &PipelineState, now: u64) -> f64 {
663 if state.events.is_empty() {
664 return 0.0;
665 }
666 let cutoff = now.saturating_sub(self.config.window_secs);
667
668 let mut total = 0u64;
669 let mut errors = 0u64;
670 for event in &state.events {
671 if event.timestamp >= cutoff {
672 total = total.saturating_add(1);
673 if event.is_error {
674 errors = errors.saturating_add(1);
675 }
676 }
677 }
678
679 if total == 0 {
680 return 0.0;
681 }
682
683 let rate = errors as f64 / total as f64;
684 if !rate.is_finite() {
685 return 1.0; }
687 rate
688 }
689
690 pub fn pipeline_error_rate(&self, pipeline_id: &str) -> Result<f64, CascadingError> {
692 if !self.config.enabled {
693 return Ok(0.0);
694 }
695 Self::validate_pipeline_id(pipeline_id)?;
696
697 let pipelines = self
698 .pipelines
699 .read()
700 .map_err(|_| CascadingError::LockPoisoned("pipelines read lock".to_string()))?;
701
702 if let Some(state) = pipelines.get(pipeline_id) {
703 Ok(self.compute_error_rate_inner(state, Self::now_secs()))
704 } else {
705 Ok(0.0)
706 }
707 }
708
709 pub fn is_pipeline_broken(&self, pipeline_id: &str) -> Result<bool, CascadingError> {
711 if !self.config.enabled {
712 return Ok(false);
713 }
714 Self::validate_pipeline_id(pipeline_id)?;
715
716 let pipelines = self
717 .pipelines
718 .read()
719 .map_err(|_| CascadingError::LockPoisoned("pipelines read lock".to_string()))?;
720
721 Ok(pipelines
722 .get(pipeline_id)
723 .map(|s| s.is_broken)
724 .unwrap_or(false))
725 }
726
727 pub fn pipeline_summary(&self) -> Result<CascadingSummary, CascadingError> {
729 let pipelines = self
730 .pipelines
731 .read()
732 .map_err(|_| CascadingError::LockPoisoned("pipelines read lock".to_string()))?;
733 let chains = self
734 .chains
735 .read()
736 .map_err(|_| CascadingError::LockPoisoned("chains read lock".to_string()))?;
737
738 let mut healthy = 0usize;
739 let mut broken = 0usize;
740 for state in pipelines.values() {
741 if state.is_broken {
742 broken = broken.saturating_add(1);
743 } else {
744 healthy = healthy.saturating_add(1);
745 }
746 }
747
748 Ok(CascadingSummary {
749 total_pipelines: pipelines.len(),
750 healthy_pipelines: healthy,
751 broken_pipelines: broken,
752 active_chains: chains.len(),
753 max_chain_depth: self.config.max_chain_depth,
754 })
755 }
756
757 fn now_secs() -> u64 {
762 std::time::SystemTime::now()
763 .duration_since(std::time::UNIX_EPOCH)
764 .map(|d| d.as_secs())
765 .unwrap_or_else(|e| {
766 tracing::warn!(error = %e, "SystemTime before UNIX_EPOCH — using 1");
770 1
771 })
772 }
773}
774
/// Snapshot of breaker state as produced by `CascadingBreaker::pipeline_summary`.
///
/// Unknown fields are rejected during deserialization.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct CascadingSummary {
    /// Number of pipelines currently tracked.
    pub total_pipelines: usize,
    /// Tracked pipelines whose circuit is not flagged broken.
    pub healthy_pipelines: usize,
    /// Tracked pipelines whose circuit is flagged broken.
    pub broken_pipelines: usize,
    /// Number of call chains currently tracked.
    pub active_chains: usize,
    /// The configured maximum call-chain depth.
    pub max_chain_depth: u32,
}
785
#[cfg(test)]
mod tests {
    use super::*;

    /// Configuration with every field at its default.
    fn default_config() -> CascadingConfig {
        CascadingConfig::default()
    }

    /// Breaker built from the default configuration.
    fn make_breaker() -> CascadingBreaker {
        CascadingBreaker::new(default_config()).unwrap()
    }

    // ---- Configuration validation ----

    #[test]
    fn test_config_validate_default_ok() {
        assert!(CascadingConfig::default().validate().is_ok());
    }

    #[test]
    fn test_config_validate_zero_depth_rejected() {
        let mut cfg = default_config();
        cfg.max_chain_depth = 0;
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_config_validate_excessive_depth_rejected() {
        let mut cfg = default_config();
        cfg.max_chain_depth = ABSOLUTE_MAX_CHAIN_DEPTH + 1;
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_config_validate_nan_error_rate_rejected() {
        let mut cfg = default_config();
        cfg.error_rate_threshold = f64::NAN;
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_config_validate_negative_error_rate_rejected() {
        let mut cfg = default_config();
        cfg.error_rate_threshold = -0.1;
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_config_validate_above_one_error_rate_rejected() {
        let mut cfg = default_config();
        cfg.error_rate_threshold = 1.1;
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_config_validate_zero_window_rejected() {
        let mut cfg = default_config();
        cfg.window_secs = 0;
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_config_validate_zero_break_duration_rejected() {
        let mut cfg = default_config();
        cfg.break_duration_secs = 0;
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_r245_config_validate_zero_min_window_events_rejected() {
        let mut cfg = default_config();
        cfg.min_window_events = 0;
        let err = cfg.validate().unwrap_err();
        let msg = format!("{err:?}");
        assert!(msg.contains("min_window_events must be >= 1"));
    }

    #[test]
    fn test_r245_config_validate_one_min_window_events_accepted() {
        let mut cfg = default_config();
        cfg.min_window_events = 1;
        assert!(cfg.validate().is_ok());
    }

    // ---- Call-chain depth tracking ----

    #[test]
    fn test_enter_chain_starts_at_depth_one() {
        let breaker = make_breaker();
        let depth = breaker.enter_chain("chain-1", "pipeline-1").unwrap();
        assert_eq!(depth, 1);
    }

    #[test]
    fn test_enter_chain_increments_depth() {
        let breaker = make_breaker();
        assert_eq!(breaker.enter_chain("chain-1", "pipe-1").unwrap(), 1);
        assert_eq!(breaker.enter_chain("chain-1", "pipe-1").unwrap(), 2);
        assert_eq!(breaker.enter_chain("chain-1", "pipe-1").unwrap(), 3);
    }

    #[test]
    fn test_enter_chain_depth_exceeded_denied() {
        let mut cfg = default_config();
        cfg.max_chain_depth = 3;
        let breaker = CascadingBreaker::new(cfg).unwrap();

        assert_eq!(breaker.enter_chain("c1", "p1").unwrap(), 1);
        assert_eq!(breaker.enter_chain("c1", "p1").unwrap(), 2);
        assert_eq!(breaker.enter_chain("c1", "p1").unwrap(), 3);

        // The fourth entry would exceed the limit of three.
        let result = breaker.enter_chain("c1", "p1");
        assert!(result.is_err());
        match result.err().unwrap() {
            CascadingError::ChainDepthExceeded { current, max } => {
                assert_eq!(current, 4);
                assert_eq!(max, 3);
            }
            other => panic!("Expected ChainDepthExceeded, got {other:?}"),
        }
    }

    #[test]
    fn test_exit_chain_decrements_depth() {
        let breaker = make_breaker();
        breaker.enter_chain("c1", "p1").unwrap();
        breaker.enter_chain("c1", "p1").unwrap();
        breaker.enter_chain("c1", "p1").unwrap();

        assert_eq!(breaker.exit_chain("c1").unwrap(), 2);
        assert_eq!(breaker.exit_chain("c1").unwrap(), 1);
        assert_eq!(breaker.exit_chain("c1").unwrap(), 0);
    }

    #[test]
    fn test_exit_chain_removes_at_zero() {
        let breaker = make_breaker();
        breaker.enter_chain("c1", "p1").unwrap();
        breaker.exit_chain("c1").unwrap();
        assert_eq!(breaker.chain_depth("c1").unwrap(), 0);
    }

    #[test]
    fn test_exit_chain_nonexistent_returns_zero() {
        let breaker = make_breaker();
        assert_eq!(breaker.exit_chain("nonexistent").unwrap(), 0);
    }

    #[test]
    fn test_chain_depth_nonexistent_returns_zero() {
        let breaker = make_breaker();
        assert_eq!(breaker.chain_depth("nonexistent").unwrap(), 0);
    }

    #[test]
    fn test_enter_chain_disabled_returns_zero() {
        let mut cfg = default_config();
        cfg.enabled = false;
        let breaker = CascadingBreaker::new(cfg).unwrap();
        assert_eq!(breaker.enter_chain("c1", "p1").unwrap(), 0);
    }

    // ---- Pipeline circuit breaking ----

    #[test]
    fn test_check_pipeline_healthy_ok() {
        let breaker = make_breaker();
        assert!(breaker.check_pipeline("pipe-1").is_ok());
    }

    #[test]
    fn test_record_pipeline_mostly_success_no_break() {
        let breaker = make_breaker();
        for _ in 0..20 {
            // Four successes per error keeps the rate at 20 %, below the
            // default 50 % threshold.
            breaker.record_pipeline_success("pipe-1").unwrap();
            breaker.record_pipeline_success("pipe-1").unwrap();
            breaker.record_pipeline_success("pipe-1").unwrap();
            breaker.record_pipeline_success("pipe-1").unwrap();
            assert!(!breaker.record_pipeline_error("pipe-1").unwrap());
        }
        assert!(breaker.check_pipeline("pipe-1").is_ok());
    }

    #[test]
    fn test_record_pipeline_error_breaks_circuit() {
        let mut cfg = default_config();
        cfg.error_rate_threshold = 0.5;
        cfg.min_window_events = 4;
        let breaker = CascadingBreaker::new(cfg).unwrap();

        for i in 0..5 {
            let broke = breaker.record_pipeline_error("pipe-1").unwrap();
            // From the fourth error on, enough events exist to trip.
            if i >= 3 && broke {
                assert!(breaker.is_pipeline_broken("pipe-1").unwrap());
                return;
            }
        }
        assert!(
            breaker.is_pipeline_broken("pipe-1").unwrap(),
            "Pipeline should be broken after 5 consecutive errors"
        );
    }

    #[test]
    fn test_pipeline_error_rate_computed_correctly() {
        let mut cfg = default_config();
        cfg.min_window_events = 2;
        let breaker = CascadingBreaker::new(cfg).unwrap();

        breaker.record_pipeline_success("pipe-1").unwrap();
        breaker.record_pipeline_success("pipe-1").unwrap();
        breaker.record_pipeline_error("pipe-1").unwrap();
        breaker.record_pipeline_error("pipe-1").unwrap();

        let rate = breaker.pipeline_error_rate("pipe-1").unwrap();
        assert!(
            (rate - 0.5).abs() < 0.01,
            "Error rate should be ~0.5, got {rate}"
        );
    }

    #[test]
    fn test_pipeline_error_rate_nonexistent_returns_zero() {
        let breaker = make_breaker();
        assert_eq!(breaker.pipeline_error_rate("nonexistent").unwrap(), 0.0);
    }

    #[test]
    fn test_check_pipeline_disabled_ok() {
        let mut cfg = default_config();
        cfg.enabled = false;
        let breaker = CascadingBreaker::new(cfg).unwrap();
        assert!(breaker.check_pipeline("pipe-1").is_ok());
    }

    #[test]
    fn test_is_pipeline_broken_nonexistent_false() {
        let breaker = make_breaker();
        assert!(!breaker.is_pipeline_broken("nonexistent").unwrap());
    }

    // ---- Summary ----

    #[test]
    fn test_pipeline_summary_empty() {
        let breaker = make_breaker();
        let summary = breaker.pipeline_summary().unwrap();
        assert_eq!(summary.total_pipelines, 0);
        assert_eq!(summary.healthy_pipelines, 0);
        assert_eq!(summary.broken_pipelines, 0);
        assert_eq!(summary.active_chains, 0);
        assert_eq!(summary.max_chain_depth, 10);
    }

    #[test]
    fn test_pipeline_summary_with_data() {
        let mut cfg = default_config();
        cfg.error_rate_threshold = 0.5;
        cfg.min_window_events = 2;
        let breaker = CascadingBreaker::new(cfg).unwrap();

        // A healthy pipeline.
        breaker.record_pipeline_success("pipe-1").unwrap();
        breaker.record_pipeline_success("pipe-1").unwrap();

        // A pipeline that only ever fails.
        breaker.record_pipeline_error("pipe-2").unwrap();
        breaker.record_pipeline_error("pipe-2").unwrap();
        breaker.record_pipeline_error("pipe-2").unwrap();

        // One active chain.
        breaker.enter_chain("chain-1", "pipe-1").unwrap();

        let summary = breaker.pipeline_summary().unwrap();
        assert_eq!(summary.total_pipelines, 2);
        assert_eq!(summary.active_chains, 1);
        assert!(summary.broken_pipelines >= 1);
    }

    // ---- Input validation ----

    #[test]
    fn test_validate_pipeline_id_empty_rejected() {
        let breaker = make_breaker();
        assert!(breaker.check_pipeline("").is_err());
    }

    #[test]
    fn test_validate_pipeline_id_too_long_rejected() {
        let breaker = make_breaker();
        let long_id = "p".repeat(MAX_PIPELINE_ID_LEN + 1);
        assert!(breaker.check_pipeline(&long_id).is_err());
    }

    #[test]
    fn test_validate_pipeline_id_control_chars_rejected() {
        let breaker = make_breaker();
        assert!(breaker.check_pipeline("pipe\0line").is_err());
    }

    #[test]
    fn test_validate_chain_id_empty_rejected() {
        let breaker = make_breaker();
        assert!(breaker.enter_chain("", "pipe-1").is_err());
    }

    // ---- Serialization ----

    #[test]
    fn test_config_serialization_roundtrip() {
        let cfg = CascadingConfig::default();
        let json = serde_json::to_string(&cfg).unwrap();
        let parsed: CascadingConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.max_chain_depth, cfg.max_chain_depth);
        assert_eq!(parsed.error_rate_threshold, cfg.error_rate_threshold);
    }

    #[test]
    fn test_config_deny_unknown_fields() {
        let json = r#"{"enabled": true, "bogus": 42}"#;
        let result: Result<CascadingConfig, _> = serde_json::from_str(json);
        assert!(
            result.is_err(),
            "deny_unknown_fields should reject unknown fields"
        );
    }

    #[test]
    fn test_summary_serialization_roundtrip() {
        let summary = CascadingSummary {
            total_pipelines: 5,
            healthy_pipelines: 3,
            broken_pipelines: 2,
            active_chains: 10,
            max_chain_depth: 10,
        };
        let json = serde_json::to_string(&summary).unwrap();
        let parsed: CascadingSummary = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.total_pipelines, 5);
        assert_eq!(parsed.broken_pipelines, 2);
    }

    #[test]
    fn test_summary_deny_unknown_fields() {
        let json = r#"{"total_pipelines":5,"healthy_pipelines":3,"broken_pipelines":2,"active_chains":10,"max_chain_depth":10,"extra":"bad"}"#;
        let result: Result<CascadingSummary, _> = serde_json::from_str(json);
        assert!(
            result.is_err(),
            "deny_unknown_fields should reject unknown fields"
        );
    }

    // ---- Regression tests ----

    #[test]
    fn test_r229_pipeline_capacity_returns_error_not_ok() {
        let breaker = make_breaker();

        // Fill the tracker up to its cap.
        for i in 0..MAX_TRACKED_PIPELINES {
            breaker
                .record_pipeline_success(&format!("pipe-{i}"))
                .unwrap();
        }

        let cap = u32::try_from(MAX_TRACKED_PIPELINES).unwrap();
        let expected = CascadingError::ChainDepthExceeded {
            current: cap,
            max: cap,
        };

        // A pipeline that is not yet tracked must be rejected, not silently
        // accepted.
        assert_eq!(
            breaker.record_pipeline_success("overflow"),
            Err(expected.clone())
        );
        assert_eq!(
            breaker.record_pipeline_error("overflow"),
            Err(expected.clone())
        );

        // Pipelines that are already tracked keep working.
        assert!(breaker.record_pipeline_success("pipe-0").is_ok());
        assert_eq!(
            breaker.pipeline_summary().unwrap().total_pipelines,
            MAX_TRACKED_PIPELINES
        );

        let msg = format!("{expected:?}");
        assert!(msg.contains("ChainDepthExceeded"));
        assert!(msg.contains("10000"));
    }

    #[test]
    fn test_r245_pipeline_broken_display_redacts_error_rate() {
        let err = CascadingError::PipelineBroken {
            pipeline_id: "pipe-1".to_string(),
            error_rate: 87.5,
        };

        let msg = format!("{err}");
        assert!(msg.contains("pipe-1"));
        assert!(!msg.contains("87.5"));
        assert!(!msg.contains('%'));
    }
}