Skip to main content

vellaveto_engine/
cascading.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4//
5// Copyright 2026 Paolo Vella
6// SPDX-License-Identifier: MPL-2.0
7
8//! Cascading failure circuit breakers for multi-hop tool call chains (Phase 62).
9//!
10//! Addresses OWASP ASI08 by enforcing:
11//! - **Chain depth limits**: Maximum tool call depth in multi-hop pipelines.
12//! - **Per-pipeline error rate tracking**: Automatic circuit breaking when a
13//!   pipeline's error rate exceeds a configurable threshold.
14//! - **Pipeline isolation**: Errors in one pipeline do not affect others.
15//!
16//! # Design
17//!
18//! - **Deterministic**: Sliding window error rate, no ML.
19//! - **Bounded memory**: `MAX_*` constants on all collections.
20//! - **Fail-closed**: Lock poisoning and capacity exhaustion deny requests.
21//! - **Observable**: Metrics and structured tracing for all state changes.
22
23use serde::{Deserialize, Serialize};
24use std::collections::{HashMap, VecDeque};
25use std::sync::RwLock;
26
27// ═══════════════════════════════════════════════════
28// CONSTANTS
29// ═══════════════════════════════════════════════════
30
// Collection capacity limits — every tracked map/queue is bounded.

/// Upper bound on the number of distinct pipelines tracked at the same time.
const MAX_TRACKED_PIPELINES: usize = 10_000;
/// Upper bound on the number of events buffered for a single pipeline.
const MAX_EVENTS_PER_PIPELINE: usize = 10_000;
/// Upper bound on the number of in-flight call chains tracked at the same time.
const MAX_TRACKED_CHAINS: usize = 50_000;

// Identifier length limits, measured in bytes (`str::len`).

/// Longest accepted pipeline identifier, in bytes.
const MAX_PIPELINE_ID_LEN: usize = 512;
/// Longest accepted chain identifier, in bytes.
const MAX_CHAIN_ID_LEN: usize = 512;

/// Hard ceiling for the configurable `max_chain_depth`; no configuration may exceed it.
const ABSOLUTE_MAX_CHAIN_DEPTH: u32 = 100;
48
49// ═══════════════════════════════════════════════════
50// CONFIGURATION
51// ═══════════════════════════════════════════════════
52
53/// Configuration for cascading failure circuit breakers.
54#[derive(Debug, Clone, Serialize, Deserialize)]
55#[serde(deny_unknown_fields)]
56pub struct CascadingConfig {
57    /// Whether cascading failure protection is enabled.
58    /// Default: true
59    #[serde(default = "default_enabled")]
60    pub enabled: bool,
61
62    /// Maximum allowed chain depth for multi-hop tool calls.
63    /// Requests exceeding this depth are denied.
64    /// Default: 10
65    #[serde(default = "default_max_chain_depth")]
66    pub max_chain_depth: u32,
67
68    /// Error rate threshold (0.0–1.0) that triggers pipeline circuit breaking.
69    /// When the error rate within the window exceeds this, the pipeline is broken.
70    /// Default: 0.5
71    #[serde(default = "default_error_rate_threshold")]
72    pub error_rate_threshold: f64,
73
74    /// Sliding window size in seconds for error rate calculation.
75    /// Default: 300 (5 minutes)
76    #[serde(default = "default_window_secs")]
77    pub window_secs: u64,
78
79    /// Minimum number of events in the window before error rate is actionable.
80    /// Prevents false positives from small sample sizes.
81    /// Default: 10
82    #[serde(default = "default_min_window_events")]
83    pub min_window_events: u32,
84
85    /// Duration in seconds a pipeline stays broken before allowing probes.
86    /// Default: 60
87    #[serde(default = "default_break_duration_secs")]
88    pub break_duration_secs: u64,
89}
90
/// Serde default for `enabled`: protection is on unless explicitly disabled.
fn default_enabled() -> bool {
    true
}

/// Serde default for `max_chain_depth`.
fn default_max_chain_depth() -> u32 {
    10_u32
}

/// Serde default for `error_rate_threshold`: break at a 50% error rate.
fn default_error_rate_threshold() -> f64 {
    0.5_f64
}

/// Serde default for `window_secs`: a five-minute sliding window.
fn default_window_secs() -> u64 {
    5 * 60
}

/// Serde default for `min_window_events`.
fn default_min_window_events() -> u32 {
    10_u32
}

/// Serde default for `break_duration_secs`: one minute.
fn default_break_duration_secs() -> u64 {
    60_u64
}
109
110impl Default for CascadingConfig {
111    fn default() -> Self {
112        Self {
113            enabled: default_enabled(),
114            max_chain_depth: default_max_chain_depth(),
115            error_rate_threshold: default_error_rate_threshold(),
116            window_secs: default_window_secs(),
117            min_window_events: default_min_window_events(),
118            break_duration_secs: default_break_duration_secs(),
119        }
120    }
121}
122
123// ═══════════════════════════════════════════════════
124// ERRORS
125// ═══════════════════════════════════════════════════
126
/// Failure modes reported by the cascading failure breaker.
#[derive(Debug, Clone, PartialEq)]
pub enum CascadingError {
    /// Configuration validation failed.
    InvalidConfig(String),
    /// A lock was poisoned; the operation is denied (fail-closed).
    LockPoisoned(String),
    /// An identifier failed input validation.
    InvalidInput(String),
    /// The chain depth limit was exceeded. The breaker also reports tracker
    /// capacity exhaustion through this variant.
    ChainDepthExceeded { current: u32, max: u32 },
    /// The pipeline's circuit is broken.
    PipelineBroken {
        pipeline_id: String,
        /// Internal health metric; intentionally left out of `Display`.
        error_rate: f64,
    },
}

impl std::fmt::Display for CascadingError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidConfig(detail) => write!(f, "invalid cascading config: {detail}"),
            Self::LockPoisoned(detail) => write!(
                f,
                "cascading breaker lock poisoned (fail-closed): {detail}"
            ),
            Self::InvalidInput(detail) => write!(
                f,
                "cascading breaker input validation failed: {detail}"
            ),
            Self::ChainDepthExceeded { current, max } => write!(
                f,
                "tool call chain depth {current} exceeds maximum {max} (OWASP ASI08)"
            ),
            // SECURITY (R245-ENG-12): Do not expose error_rate in Display.
            // The exact percentage leaks internal pipeline health metrics to
            // callers, which may be untrusted. The field is retained in the
            // enum for internal logging/metrics.
            Self::PipelineBroken { pipeline_id, .. } => write!(
                f,
                "Pipeline '{pipeline_id}' circuit is broken — requests blocked until recovery"
            ),
        }
    }
}

impl std::error::Error for CascadingError {}
180
181impl CascadingConfig {
182    /// Validate configuration values.
183    pub fn validate(&self) -> Result<(), CascadingError> {
184        if self.max_chain_depth == 0 || self.max_chain_depth > ABSOLUTE_MAX_CHAIN_DEPTH {
185            return Err(CascadingError::InvalidConfig(format!(
186                "max_chain_depth must be in [1, {}], got {}",
187                ABSOLUTE_MAX_CHAIN_DEPTH, self.max_chain_depth
188            )));
189        }
190        // SECURITY (Trap 4): Validate f64 for NaN/Infinity.
191        if !self.error_rate_threshold.is_finite()
192            || self.error_rate_threshold < 0.0
193            || self.error_rate_threshold > 1.0
194        {
195            return Err(CascadingError::InvalidConfig(format!(
196                "error_rate_threshold must be in [0.0, 1.0], got {}",
197                self.error_rate_threshold
198            )));
199        }
200        // SECURITY (R240-ENG-2): Upper-bound time windows to prevent unbounded memory
201        // growth in sliding window trackers and permanent denial-of-service from
202        // unreachable break durations. Consistent with CollusionConfig bounds.
203        const MAX_WINDOW_SECS: u64 = 86_400; // 24 hours
204        const MAX_BREAK_DURATION_SECS: u64 = 86_400; // 24 hours
205        if self.window_secs == 0 || self.window_secs > MAX_WINDOW_SECS {
206            return Err(CascadingError::InvalidConfig(format!(
207                "window_secs must be in [1, {MAX_WINDOW_SECS}], got {}",
208                self.window_secs
209            )));
210        }
211        if self.break_duration_secs == 0 || self.break_duration_secs > MAX_BREAK_DURATION_SECS {
212            return Err(CascadingError::InvalidConfig(format!(
213                "break_duration_secs must be in [1, {MAX_BREAK_DURATION_SECS}], got {}",
214                self.break_duration_secs
215            )));
216        }
217        // SECURITY (R229-ENG-9 + R245-ENG-2): Bound min_window_events to [1, 100_000].
218        // Zero would make the circuit breaker trigger on the first failure regardless
219        // of sample size, causing spurious tripping. Upper bound prevents disabling
220        // circuit breakers by setting an unreachably high minimum.
221        const MAX_MIN_WINDOW_EVENTS: u32 = 100_000;
222        if self.min_window_events == 0 {
223            return Err(CascadingError::InvalidConfig(
224                "min_window_events must be >= 1, got 0".to_string(),
225            ));
226        }
227        if self.min_window_events > MAX_MIN_WINDOW_EVENTS {
228            return Err(CascadingError::InvalidConfig(format!(
229                "min_window_events must be <= {}, got {}",
230                MAX_MIN_WINDOW_EVENTS, self.min_window_events
231            )));
232        }
233        Ok(())
234    }
235}
236
237// ═══════════════════════════════════════════════════
238// INTERNAL STATE
239// ═══════════════════════════════════════════════════
240
/// One observed pipeline outcome, stamped with the second it was recorded.
#[derive(Debug, Clone, Copy)]
struct PipelineEvent {
    timestamp: u64,
    is_error: bool,
}

/// Error-tracking state kept for a single pipeline.
///
/// `Default` yields the initial state: no events, circuit closed, never broken.
#[derive(Debug, Default)]
struct PipelineState {
    /// Recorded events, oldest at the front.
    events: VecDeque<PipelineEvent>,
    /// `true` while the circuit is broken.
    is_broken: bool,
    /// When the circuit was broken; `None` while it is closed.
    broken_at: Option<u64>,
    /// How many times this pipeline's circuit has been broken.
    break_count: u32,
}

impl PipelineState {
    /// Fresh state for a pipeline seen for the first time.
    fn new() -> Self {
        Self::default()
    }
}
271
/// Bookkeeping for one in-flight tool call chain.
#[derive(Debug, Clone)]
struct CallChain {
    /// Number of nested tool calls currently open on this chain.
    depth: u32,
    /// Pipeline supplied when the chain was first entered. Kept for
    /// observability; it surfaces only through the derived `Debug` output.
    #[allow(dead_code)] // read only by the derived `Debug` impl, which the lint ignores
    pipeline_id: String,
    /// Seconds since the Unix epoch at which the chain was first entered.
    ///
    /// NOTE(review): presumably intended for stale-chain eviction, but no
    /// code in this module reads it yet. Chains whose callers never call
    /// `exit_chain` therefore persist until the tracker hits its capacity.
    #[allow(dead_code)] // not read yet; retained for future staleness eviction
    started_at: u64,
}
284
285// ═══════════════════════════════════════════════════
286// MANAGER
287// ═══════════════════════════════════════════════════
288
289/// Cascading failure circuit breaker manager.
290///
291/// Thread-safe via `RwLock`. All security-critical paths are fail-closed.
292pub struct CascadingBreaker {
293    config: CascadingConfig,
294    /// Per-pipeline error tracking.
295    pipelines: RwLock<HashMap<String, PipelineState>>,
296    /// In-flight call chains indexed by chain ID.
297    chains: RwLock<HashMap<String, CallChain>>,
298}
299
300impl std::fmt::Debug for CascadingBreaker {
301    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
302        f.debug_struct("CascadingBreaker")
303            .field("config", &self.config)
304            .field("pipelines", &"<locked>")
305            .field("chains", &"<locked>")
306            .finish()
307    }
308}
309
310impl CascadingBreaker {
311    /// Create a new cascading failure breaker with validated configuration.
312    pub fn new(config: CascadingConfig) -> Result<Self, CascadingError> {
313        config.validate()?;
314        Ok(Self {
315            config,
316            pipelines: RwLock::new(HashMap::new()),
317            chains: RwLock::new(HashMap::new()),
318        })
319    }
320
321    /// Check if the breaker is enabled.
322    pub fn is_enabled(&self) -> bool {
323        self.config.enabled
324    }
325
326    /// Get the configured maximum chain depth.
327    pub fn max_chain_depth(&self) -> u32 {
328        self.config.max_chain_depth
329    }
330
331    // ═══════════════════════════════════════════════
332    // INPUT VALIDATION
333    // ═══════════════════════════════════════════════
334
335    fn validate_pipeline_id(pipeline_id: &str) -> Result<(), CascadingError> {
336        if pipeline_id.is_empty() || pipeline_id.len() > MAX_PIPELINE_ID_LEN {
337            return Err(CascadingError::InvalidInput(format!(
338                "pipeline_id length {} out of range [1, {}]",
339                pipeline_id.len(),
340                MAX_PIPELINE_ID_LEN
341            )));
342        }
343        if vellaveto_types::has_dangerous_chars(pipeline_id) {
344            return Err(CascadingError::InvalidInput(
345                "pipeline_id contains control or Unicode format characters".to_string(),
346            ));
347        }
348        Ok(())
349    }
350
351    fn validate_chain_id(chain_id: &str) -> Result<(), CascadingError> {
352        if chain_id.is_empty() || chain_id.len() > MAX_CHAIN_ID_LEN {
353            return Err(CascadingError::InvalidInput(format!(
354                "chain_id length {} out of range [1, {}]",
355                chain_id.len(),
356                MAX_CHAIN_ID_LEN
357            )));
358        }
359        if vellaveto_types::has_dangerous_chars(chain_id) {
360            return Err(CascadingError::InvalidInput(
361                "chain_id contains control or Unicode format characters".to_string(),
362            ));
363        }
364        Ok(())
365    }
366
367    // ═══════════════════════════════════════════════
368    // CHAIN DEPTH TRACKING
369    // ═══════════════════════════════════════════════
370
371    /// Begin or extend a tool call chain. Returns the current depth.
372    ///
373    /// If the chain does not exist, it is created at depth 1.
374    /// If the chain already exists, the depth is incremented.
375    ///
376    /// Returns `Err(ChainDepthExceeded)` if the new depth would exceed
377    /// `max_chain_depth`.
378    pub fn enter_chain(&self, chain_id: &str, pipeline_id: &str) -> Result<u32, CascadingError> {
379        if !self.config.enabled {
380            return Ok(0);
381        }
382        Self::validate_chain_id(chain_id)?;
383        Self::validate_pipeline_id(pipeline_id)?;
384
385        let mut chains = self
386            .chains
387            .write()
388            .map_err(|_| CascadingError::LockPoisoned("chains write lock".to_string()))?;
389
390        if let Some(chain) = chains.get_mut(chain_id) {
391            let new_depth = chain.depth.saturating_add(1);
392            if new_depth > self.config.max_chain_depth {
393                metrics::counter!(
394                    "vellaveto_cascading_depth_exceeded_total",
395                    "pipeline" => pipeline_id.to_string()
396                )
397                .increment(1);
398
399                tracing::warn!(
400                    chain_id = %chain_id,
401                    pipeline_id = %pipeline_id,
402                    current_depth = %new_depth,
403                    max_depth = %self.config.max_chain_depth,
404                    "Tool call chain depth exceeded (OWASP ASI08)"
405                );
406
407                return Err(CascadingError::ChainDepthExceeded {
408                    current: new_depth,
409                    max: self.config.max_chain_depth,
410                });
411            }
412            chain.depth = new_depth;
413            Ok(new_depth)
414        } else {
415            // New chain. Check capacity.
416            if chains.len() >= MAX_TRACKED_CHAINS {
417                tracing::warn!(
418                    max = MAX_TRACKED_CHAINS,
419                    "Cascading chain tracker at capacity, denying new chain (fail-closed)"
420                );
421                return Err(CascadingError::ChainDepthExceeded {
422                    current: 1,
423                    max: 0, // Capacity exceeded, not depth exceeded
424                });
425            }
426
427            let now = Self::now_secs();
428            chains.insert(
429                chain_id.to_string(),
430                CallChain {
431                    depth: 1,
432                    pipeline_id: pipeline_id.to_string(),
433                    started_at: now,
434                },
435            );
436            Ok(1)
437        }
438    }
439
440    /// Exit a chain level (decrement depth). Call when a tool call completes.
441    ///
442    /// Removes the chain entirely if depth reaches 0.
443    pub fn exit_chain(&self, chain_id: &str) -> Result<u32, CascadingError> {
444        if !self.config.enabled {
445            return Ok(0);
446        }
447        Self::validate_chain_id(chain_id)?;
448
449        let mut chains = self
450            .chains
451            .write()
452            .map_err(|_| CascadingError::LockPoisoned("chains write lock".to_string()))?;
453
454        if let Some(chain) = chains.get_mut(chain_id) {
455            if chain.depth <= 1 {
456                chains.remove(chain_id);
457                return Ok(0);
458            }
459            chain.depth = chain.depth.saturating_sub(1);
460            Ok(chain.depth)
461        } else {
462            Ok(0)
463        }
464    }
465
466    /// Get the current depth of a chain. Returns 0 if the chain doesn't exist.
467    pub fn chain_depth(&self, chain_id: &str) -> Result<u32, CascadingError> {
468        if !self.config.enabled {
469            return Ok(0);
470        }
471        Self::validate_chain_id(chain_id)?;
472
473        let chains = self
474            .chains
475            .read()
476            .map_err(|_| CascadingError::LockPoisoned("chains read lock".to_string()))?;
477
478        Ok(chains.get(chain_id).map(|c| c.depth).unwrap_or(0))
479    }
480
481    // ═══════════════════════════════════════════════
482    // PIPELINE ERROR RATE TRACKING
483    // ═══════════════════════════════════════════════
484
485    /// Check if a pipeline is available (not broken).
486    ///
487    /// Returns `Ok(())` if the pipeline is healthy or if the break duration
488    /// has elapsed (probe allowed). Returns `Err(PipelineBroken)` if broken.
489    #[must_use = "pipeline break results must not be discarded"]
490    pub fn check_pipeline(&self, pipeline_id: &str) -> Result<(), CascadingError> {
491        if !self.config.enabled {
492            return Ok(());
493        }
494        Self::validate_pipeline_id(pipeline_id)?;
495
496        let pipelines = self
497            .pipelines
498            .read()
499            .map_err(|_| CascadingError::LockPoisoned("pipelines read lock".to_string()))?;
500
501        if let Some(state) = pipelines.get(pipeline_id) {
502            if state.is_broken {
503                let now = Self::now_secs();
504                if let Some(broken_at) = state.broken_at {
505                    if now >= broken_at.saturating_add(self.config.break_duration_secs) {
506                        // Break duration elapsed — allow probe.
507                        return Ok(());
508                    }
509                }
510
511                let error_rate = self.compute_error_rate_inner(state, now);
512                return Err(CascadingError::PipelineBroken {
513                    pipeline_id: pipeline_id.to_string(),
514                    error_rate: error_rate * 100.0,
515                });
516            }
517        }
518
519        Ok(())
520    }
521
522    /// Record a successful event in a pipeline.
523    pub fn record_pipeline_success(&self, pipeline_id: &str) -> Result<(), CascadingError> {
524        if !self.config.enabled {
525            return Ok(());
526        }
527        Self::validate_pipeline_id(pipeline_id)?;
528        self.record_pipeline_event(pipeline_id, false)
529    }
530
531    /// Record an error event in a pipeline. Returns `true` if the pipeline
532    /// circuit was broken as a result.
533    pub fn record_pipeline_error(&self, pipeline_id: &str) -> Result<bool, CascadingError> {
534        if !self.config.enabled {
535            return Ok(false);
536        }
537        Self::validate_pipeline_id(pipeline_id)?;
538        self.record_pipeline_event(pipeline_id, true)?;
539
540        // Check if we should break the circuit.
541        let mut pipelines = self.pipelines.write().map_err(|_| {
542            CascadingError::LockPoisoned("pipelines write lock for break check".to_string())
543        })?;
544
545        if let Some(state) = pipelines.get_mut(pipeline_id) {
546            if !state.is_broken {
547                let now = Self::now_secs();
548                let error_rate = self.compute_error_rate_inner(state, now);
549                let total_events = state.events.len();
550
551                if total_events >= self.config.min_window_events as usize
552                    && error_rate >= self.config.error_rate_threshold
553                {
554                    state.is_broken = true;
555                    state.broken_at = Some(now);
556                    state.break_count = state.break_count.saturating_add(1);
557
558                    metrics::counter!(
559                        "vellaveto_cascading_pipeline_breaks_total",
560                        "pipeline" => pipeline_id.to_string()
561                    )
562                    .increment(1);
563
564                    tracing::warn!(
565                        pipeline_id = %pipeline_id,
566                        error_rate = %format!("{:.1}%", error_rate * 100.0),
567                        break_count = %state.break_count,
568                        "Pipeline circuit broken due to high error rate (OWASP ASI08)"
569                    );
570
571                    return Ok(true);
572                }
573            }
574        }
575
576        Ok(false)
577    }
578
579    /// Record a pipeline event (success or error).
580    fn record_pipeline_event(
581        &self,
582        pipeline_id: &str,
583        is_error: bool,
584    ) -> Result<(), CascadingError> {
585        let mut pipelines = self
586            .pipelines
587            .write()
588            .map_err(|_| CascadingError::LockPoisoned("pipelines write lock".to_string()))?;
589
590        // SECURITY (R229-ENG-1): Fail-closed on capacity exhaustion.
591        // Previously returned Ok(()) which silently dropped events, allowing
592        // an attacker to fill with dummy pipeline IDs then operate undetected.
593        if !pipelines.contains_key(pipeline_id) && pipelines.len() >= MAX_TRACKED_PIPELINES {
594            tracing::warn!(
595                max = MAX_TRACKED_PIPELINES,
596                "Cascading pipeline tracker at capacity — denying new pipeline"
597            );
598            return Err(CascadingError::ChainDepthExceeded {
599                current: u32::try_from(pipelines.len()).unwrap_or(u32::MAX),
600                max: u32::try_from(MAX_TRACKED_PIPELINES).unwrap_or(u32::MAX),
601            });
602        }
603
604        let now = Self::now_secs();
605        let state = pipelines
606            .entry(pipeline_id.to_string())
607            .or_insert_with(PipelineState::new);
608
609        // Evict events outside the window.
610        let cutoff = now.saturating_sub(self.config.window_secs);
611        while let Some(front) = state.events.front() {
612            if front.timestamp < cutoff {
613                state.events.pop_front();
614            } else {
615                break;
616            }
617        }
618
619        // Bound events per pipeline.
620        if state.events.len() >= MAX_EVENTS_PER_PIPELINE {
621            state.events.pop_front();
622        }
623
624        state.events.push_back(PipelineEvent {
625            timestamp: now,
626            is_error,
627        });
628
629        // If the pipeline was broken and break_duration has elapsed, reset.
630        if state.is_broken {
631            if let Some(broken_at) = state.broken_at {
632                if now >= broken_at.saturating_add(self.config.break_duration_secs) {
633                    // Check if error rate has recovered.
634                    let error_rate = self.compute_error_rate_inner(state, now);
635                    if error_rate < self.config.error_rate_threshold {
636                        state.is_broken = false;
637                        state.broken_at = None;
638
639                        metrics::counter!(
640                            "vellaveto_cascading_pipeline_recoveries_total",
641                            "pipeline" => pipeline_id.to_string()
642                        )
643                        .increment(1);
644
645                        tracing::info!(
646                            pipeline_id = %pipeline_id,
647                            error_rate = %format!("{:.1}%", error_rate * 100.0),
648                            "Pipeline circuit recovered"
649                        );
650                    }
651                }
652            }
653        }
654
655        Ok(())
656    }
657
658    /// Compute the error rate for a pipeline state using the given timestamp.
659    /// SECURITY (R245-ENG-9): Accepts `now` parameter to ensure consistent
660    /// time reference with the caller, preventing TOCTOU between event
661    /// recording and error rate computation.
662    fn compute_error_rate_inner(&self, state: &PipelineState, now: u64) -> f64 {
663        if state.events.is_empty() {
664            return 0.0;
665        }
666        let cutoff = now.saturating_sub(self.config.window_secs);
667
668        let mut total = 0u64;
669        let mut errors = 0u64;
670        for event in &state.events {
671            if event.timestamp >= cutoff {
672                total = total.saturating_add(1);
673                if event.is_error {
674                    errors = errors.saturating_add(1);
675                }
676            }
677        }
678
679        if total == 0 {
680            return 0.0;
681        }
682
683        let rate = errors as f64 / total as f64;
684        if !rate.is_finite() {
685            return 1.0; // Fail-closed
686        }
687        rate
688    }
689
690    /// Get the current error rate for a pipeline (0.0–1.0).
691    pub fn pipeline_error_rate(&self, pipeline_id: &str) -> Result<f64, CascadingError> {
692        if !self.config.enabled {
693            return Ok(0.0);
694        }
695        Self::validate_pipeline_id(pipeline_id)?;
696
697        let pipelines = self
698            .pipelines
699            .read()
700            .map_err(|_| CascadingError::LockPoisoned("pipelines read lock".to_string()))?;
701
702        if let Some(state) = pipelines.get(pipeline_id) {
703            Ok(self.compute_error_rate_inner(state, Self::now_secs()))
704        } else {
705            Ok(0.0)
706        }
707    }
708
709    /// Check if a pipeline's circuit is currently broken.
710    pub fn is_pipeline_broken(&self, pipeline_id: &str) -> Result<bool, CascadingError> {
711        if !self.config.enabled {
712            return Ok(false);
713        }
714        Self::validate_pipeline_id(pipeline_id)?;
715
716        let pipelines = self
717            .pipelines
718            .read()
719            .map_err(|_| CascadingError::LockPoisoned("pipelines read lock".to_string()))?;
720
721        Ok(pipelines
722            .get(pipeline_id)
723            .map(|s| s.is_broken)
724            .unwrap_or(false))
725    }
726
727    /// Get summary statistics for all pipelines.
728    pub fn pipeline_summary(&self) -> Result<CascadingSummary, CascadingError> {
729        let pipelines = self
730            .pipelines
731            .read()
732            .map_err(|_| CascadingError::LockPoisoned("pipelines read lock".to_string()))?;
733        let chains = self
734            .chains
735            .read()
736            .map_err(|_| CascadingError::LockPoisoned("chains read lock".to_string()))?;
737
738        let mut healthy = 0usize;
739        let mut broken = 0usize;
740        for state in pipelines.values() {
741            if state.is_broken {
742                broken = broken.saturating_add(1);
743            } else {
744                healthy = healthy.saturating_add(1);
745            }
746        }
747
748        Ok(CascadingSummary {
749            total_pipelines: pipelines.len(),
750            healthy_pipelines: healthy,
751            broken_pipelines: broken,
752            active_chains: chains.len(),
753            max_chain_depth: self.config.max_chain_depth,
754        })
755    }
756
757    // ═══════════════════════════════════════════════
758    // UTILITY
759    // ═══════════════════════════════════════════════
760
761    fn now_secs() -> u64 {
762        std::time::SystemTime::now()
763            .duration_since(std::time::UNIX_EPOCH)
764            .map(|d| d.as_secs())
765            .unwrap_or_else(|e| {
766                // SECURITY (R245-ENG-1): Return 1, not 0, on pre-epoch clock —
767                // consistent with collusion.rs. A 0 value could cause division-by-zero
768                // or off-by-one in time-window arithmetic.
769                tracing::warn!(error = %e, "SystemTime before UNIX_EPOCH — using 1");
770                1
771            })
772    }
773}
774
/// Snapshot of cascading breaker state, as returned by
/// `CascadingBreaker::pipeline_summary`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct CascadingSummary {
    /// Number of pipelines currently tracked.
    pub total_pipelines: usize,
    /// Tracked pipelines whose circuit is not broken.
    pub healthy_pipelines: usize,
    /// Tracked pipelines whose circuit is broken. This includes pipelines
    /// whose break duration has elapsed but which have not yet recovered.
    pub broken_pipelines: usize,
    /// Number of in-flight call chains.
    pub active_chains: usize,
    /// Configured maximum chain depth.
    pub max_chain_depth: u32,
}
785
786// ═══════════════════════════════════════════════════
787// TESTS
788// ═══════════════════════════════════════════════════
789
#[cfg(test)]
mod tests {
    //! Unit tests for configuration validation, chain depth tracking,
    //! per-pipeline error-rate circuit breaking, pipeline isolation, input
    //! validation, and serde round-trips of the public types.

    use super::*;

    /// Returns the default configuration; tests mutate their own copy.
    fn default_config() -> CascadingConfig {
        CascadingConfig::default()
    }

    /// Builds a breaker from the default configuration.
    fn make_breaker() -> CascadingBreaker {
        CascadingBreaker::new(default_config()).unwrap()
    }

    /// Builds a breaker that trips quickly: a 50% error-rate threshold and a
    /// two-event minimum window, so a short run of errors breaks a pipeline.
    fn make_sensitive_breaker() -> CascadingBreaker {
        let mut cfg = default_config();
        cfg.error_rate_threshold = 0.5;
        cfg.min_window_events = 2;
        CascadingBreaker::new(cfg).unwrap()
    }

    // ────────────────────────────────────────────────
    // Config validation
    // ────────────────────────────────────────────────

    #[test]
    fn test_config_validate_default_ok() {
        assert!(CascadingConfig::default().validate().is_ok());
    }

    #[test]
    fn test_config_validate_zero_depth_rejected() {
        let mut cfg = default_config();
        cfg.max_chain_depth = 0;
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_config_validate_excessive_depth_rejected() {
        let mut cfg = default_config();
        cfg.max_chain_depth = ABSOLUTE_MAX_CHAIN_DEPTH + 1;
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_config_validate_nan_error_rate_rejected() {
        let mut cfg = default_config();
        cfg.error_rate_threshold = f64::NAN;
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_config_validate_negative_error_rate_rejected() {
        let mut cfg = default_config();
        cfg.error_rate_threshold = -0.1;
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_config_validate_above_one_error_rate_rejected() {
        let mut cfg = default_config();
        cfg.error_rate_threshold = 1.1;
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_config_validate_zero_window_rejected() {
        let mut cfg = default_config();
        cfg.window_secs = 0;
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_config_validate_zero_break_duration_rejected() {
        let mut cfg = default_config();
        cfg.break_duration_secs = 0;
        assert!(cfg.validate().is_err());
    }

    // ── R245 regression tests ─────────────────────────────────────────

    #[test]
    fn test_r245_config_validate_zero_min_window_events_rejected() {
        let mut cfg = default_config();
        cfg.min_window_events = 0;
        let err = cfg.validate().unwrap_err();
        let msg = format!("{err:?}");
        assert!(msg.contains("min_window_events must be >= 1"));
    }

    #[test]
    fn test_r245_config_validate_one_min_window_events_accepted() {
        let mut cfg = default_config();
        cfg.min_window_events = 1;
        assert!(cfg.validate().is_ok());
    }

    // ────────────────────────────────────────────────
    // Chain depth tracking
    // ────────────────────────────────────────────────

    #[test]
    fn test_enter_chain_starts_at_depth_one() {
        let breaker = make_breaker();
        let depth = breaker.enter_chain("chain-1", "pipeline-1").unwrap();
        assert_eq!(depth, 1);
    }

    #[test]
    fn test_enter_chain_increments_depth() {
        let breaker = make_breaker();
        assert_eq!(breaker.enter_chain("chain-1", "pipe-1").unwrap(), 1);
        assert_eq!(breaker.enter_chain("chain-1", "pipe-1").unwrap(), 2);
        assert_eq!(breaker.enter_chain("chain-1", "pipe-1").unwrap(), 3);
    }

    #[test]
    fn test_enter_chain_depth_exceeded_denied() {
        let mut cfg = default_config();
        cfg.max_chain_depth = 3;
        let breaker = CascadingBreaker::new(cfg).unwrap();

        assert_eq!(breaker.enter_chain("c1", "p1").unwrap(), 1);
        assert_eq!(breaker.enter_chain("c1", "p1").unwrap(), 2);
        assert_eq!(breaker.enter_chain("c1", "p1").unwrap(), 3);

        // The fourth call exceeds the limit and reports the attempted depth.
        match breaker.enter_chain("c1", "p1") {
            Err(CascadingError::ChainDepthExceeded { current, max }) => {
                assert_eq!(current, 4);
                assert_eq!(max, 3);
            }
            other => panic!("Expected ChainDepthExceeded, got {other:?}"),
        }
    }

    #[test]
    fn test_exit_chain_decrements_depth() {
        let breaker = make_breaker();
        for _ in 0..3 {
            breaker.enter_chain("c1", "p1").unwrap();
        }

        assert_eq!(breaker.exit_chain("c1").unwrap(), 2);
        assert_eq!(breaker.exit_chain("c1").unwrap(), 1);
        assert_eq!(breaker.exit_chain("c1").unwrap(), 0);
    }

    #[test]
    fn test_exit_chain_removes_at_zero() {
        let breaker = make_breaker();
        breaker.enter_chain("c1", "p1").unwrap();
        breaker.exit_chain("c1").unwrap();
        assert_eq!(breaker.chain_depth("c1").unwrap(), 0);
    }

    #[test]
    fn test_exit_chain_nonexistent_returns_zero() {
        let breaker = make_breaker();
        assert_eq!(breaker.exit_chain("nonexistent").unwrap(), 0);
    }

    #[test]
    fn test_chain_depth_nonexistent_returns_zero() {
        let breaker = make_breaker();
        assert_eq!(breaker.chain_depth("nonexistent").unwrap(), 0);
    }

    #[test]
    fn test_enter_chain_disabled_returns_zero() {
        let mut cfg = default_config();
        cfg.enabled = false;
        let breaker = CascadingBreaker::new(cfg).unwrap();
        assert_eq!(breaker.enter_chain("c1", "p1").unwrap(), 0);
    }

    // ────────────────────────────────────────────────
    // Pipeline error rate tracking
    // ────────────────────────────────────────────────

    #[test]
    fn test_check_pipeline_healthy_ok() {
        let breaker = make_breaker();
        assert!(breaker.check_pipeline("pipe-1").is_ok());
    }

    #[test]
    fn test_record_pipeline_mostly_success_no_break() {
        let mut cfg = default_config();
        // Pin the threshold so the test does not silently depend on the default.
        cfg.error_rate_threshold = 0.5;
        let breaker = CascadingBreaker::new(cfg).unwrap();

        // Four successes per error keeps the window at a steady 20% error
        // rate, which never reaches the 50% threshold.
        for _ in 0..20 {
            for _ in 0..4 {
                breaker.record_pipeline_success("pipe-1").unwrap();
            }
            assert!(!breaker.record_pipeline_error("pipe-1").unwrap());
        }
        assert!(breaker.check_pipeline("pipe-1").is_ok());
    }

    #[test]
    fn test_record_pipeline_error_breaks_circuit() {
        let mut cfg = default_config();
        cfg.error_rate_threshold = 0.5;
        cfg.min_window_events = 4;
        let breaker = CascadingBreaker::new(cfg).unwrap();

        // A 100% error rate exceeds the 50% threshold, but the circuit must not
        // trip before `min_window_events` (4) events are in the window.
        let mut tripped_at = None;
        for i in 0..5 {
            if breaker.record_pipeline_error("pipe-1").unwrap() {
                tripped_at = Some(i);
                break;
            }
        }

        let tripped_at =
            tripped_at.expect("Pipeline should break within 5 consecutive errors");
        assert!(
            tripped_at >= 3,
            "Circuit tripped after only {} event(s); min_window_events is 4",
            tripped_at + 1
        );
        assert!(breaker.is_pipeline_broken("pipe-1").unwrap());
    }

    #[test]
    fn test_pipeline_error_rate_computed_correctly() {
        let mut cfg = default_config();
        cfg.min_window_events = 2;
        let breaker = CascadingBreaker::new(cfg).unwrap();

        breaker.record_pipeline_success("pipe-1").unwrap();
        breaker.record_pipeline_success("pipe-1").unwrap();
        breaker.record_pipeline_error("pipe-1").unwrap();
        breaker.record_pipeline_error("pipe-1").unwrap();

        let rate = breaker.pipeline_error_rate("pipe-1").unwrap();
        assert!(
            (rate - 0.5).abs() < 0.01,
            "Error rate should be ~0.5, got {rate}"
        );
    }

    #[test]
    fn test_pipeline_error_rate_nonexistent_returns_zero() {
        let breaker = make_breaker();
        assert_eq!(breaker.pipeline_error_rate("nonexistent").unwrap(), 0.0);
    }

    #[test]
    fn test_check_pipeline_disabled_ok() {
        let mut cfg = default_config();
        cfg.enabled = false;
        let breaker = CascadingBreaker::new(cfg).unwrap();
        assert!(breaker.check_pipeline("pipe-1").is_ok());
    }

    #[test]
    fn test_is_pipeline_broken_nonexistent_false() {
        let breaker = make_breaker();
        assert!(!breaker.is_pipeline_broken("nonexistent").unwrap());
    }

    #[test]
    fn test_pipeline_isolation_errors_do_not_leak() {
        // Breaking one pipeline must not affect the state of another.
        let breaker = make_sensitive_breaker();

        breaker.record_pipeline_success("pipe-good").unwrap();
        for _ in 0..3 {
            breaker.record_pipeline_error("pipe-bad").unwrap();
        }

        assert!(breaker.is_pipeline_broken("pipe-bad").unwrap());
        assert!(breaker.check_pipeline("pipe-bad").is_err());
        assert!(!breaker.is_pipeline_broken("pipe-good").unwrap());
        assert!(breaker.check_pipeline("pipe-good").is_ok());
    }

    // ────────────────────────────────────────────────
    // Summary
    // ────────────────────────────────────────────────

    #[test]
    fn test_pipeline_summary_empty() {
        let breaker = make_breaker();
        let summary = breaker.pipeline_summary().unwrap();
        assert_eq!(summary.total_pipelines, 0);
        assert_eq!(summary.healthy_pipelines, 0);
        assert_eq!(summary.broken_pipelines, 0);
        assert_eq!(summary.active_chains, 0);
        assert_eq!(summary.max_chain_depth, 10);
    }

    #[test]
    fn test_pipeline_summary_with_data() {
        let breaker = make_sensitive_breaker();

        // Healthy pipeline.
        breaker.record_pipeline_success("pipe-1").unwrap();
        breaker.record_pipeline_success("pipe-1").unwrap();

        // Broken pipeline (100% errors, above the 2-event minimum).
        for _ in 0..3 {
            breaker.record_pipeline_error("pipe-2").unwrap();
        }

        // Active chain.
        breaker.enter_chain("chain-1", "pipe-1").unwrap();

        let summary = breaker.pipeline_summary().unwrap();
        assert_eq!(summary.total_pipelines, 2);
        assert_eq!(summary.active_chains, 1);
        // Exactly pipe-2 is broken; pipe-1 stays healthy.
        assert_eq!(summary.broken_pipelines, 1);
        assert_eq!(summary.healthy_pipelines, 1);
    }

    // ────────────────────────────────────────────────
    // Input validation
    // ────────────────────────────────────────────────

    #[test]
    fn test_validate_pipeline_id_empty_rejected() {
        let breaker = make_breaker();
        assert!(breaker.check_pipeline("").is_err());
    }

    #[test]
    fn test_validate_pipeline_id_too_long_rejected() {
        let breaker = make_breaker();
        let long_id = "p".repeat(MAX_PIPELINE_ID_LEN + 1);
        assert!(breaker.check_pipeline(&long_id).is_err());
    }

    #[test]
    fn test_validate_pipeline_id_control_chars_rejected() {
        let breaker = make_breaker();
        assert!(breaker.check_pipeline("pipe\0line").is_err());
    }

    #[test]
    fn test_validate_chain_id_empty_rejected() {
        let breaker = make_breaker();
        assert!(breaker.enter_chain("", "pipe-1").is_err());
    }

    #[test]
    fn test_validate_chain_id_too_long_rejected() {
        let breaker = make_breaker();
        let long_id = "c".repeat(MAX_CHAIN_ID_LEN + 1);
        assert!(breaker.enter_chain(&long_id, "pipe-1").is_err());
    }

    #[test]
    fn test_validate_chain_id_control_chars_rejected() {
        let breaker = make_breaker();
        assert!(breaker.enter_chain("chain\0id", "pipe-1").is_err());
    }

    // ────────────────────────────────────────────────
    // Serialization
    // ────────────────────────────────────────────────

    #[test]
    fn test_config_serialization_roundtrip() {
        let cfg = CascadingConfig::default();
        let json = serde_json::to_string(&cfg).unwrap();
        let parsed: CascadingConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.max_chain_depth, cfg.max_chain_depth);
        assert_eq!(parsed.error_rate_threshold, cfg.error_rate_threshold);
    }

    #[test]
    fn test_config_deny_unknown_fields() {
        let json = r#"{"enabled": true, "bogus": 42}"#;
        let result: Result<CascadingConfig, _> = serde_json::from_str(json);
        assert!(
            result.is_err(),
            "deny_unknown_fields should reject unknown fields"
        );
    }

    #[test]
    fn test_summary_serialization_roundtrip() {
        let summary = CascadingSummary {
            total_pipelines: 5,
            healthy_pipelines: 3,
            broken_pipelines: 2,
            active_chains: 10,
            max_chain_depth: 10,
        };
        let json = serde_json::to_string(&summary).unwrap();
        let parsed: CascadingSummary = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.total_pipelines, 5);
        assert_eq!(parsed.broken_pipelines, 2);
    }

    #[test]
    fn test_summary_deny_unknown_fields() {
        let json = r#"{"total_pipelines":5,"healthy_pipelines":3,"broken_pipelines":2,"active_chains":10,"max_chain_depth":10,"extra":"bad"}"#;
        let result: Result<CascadingSummary, _> = serde_json::from_str(json);
        assert!(
            result.is_err(),
            "deny_unknown_fields should reject unknown fields"
        );
    }

    // ── R229 regression tests ───────────────────────────────────────────

    #[test]
    fn test_r229_pipeline_capacity_returns_error_not_ok() {
        // R229-ENG-1: once the tracker holds MAX_TRACKED_PIPELINES pipelines,
        // recording an event for a *new* pipeline must fail closed with an
        // error instead of returning Ok(()) and silently dropping the event.
        let breaker = make_breaker();
        for i in 0..MAX_TRACKED_PIPELINES {
            breaker
                .record_pipeline_success(&format!("pipe-{i}"))
                .unwrap();
        }

        assert!(
            breaker.record_pipeline_success("pipe-overflow").is_err(),
            "A new pipeline beyond capacity must be rejected, not silently dropped"
        );
        // Pipelines that are already tracked keep accepting events at capacity.
        assert!(breaker.record_pipeline_success("pipe-0").is_ok());
    }

    #[test]
    fn test_r245_pipeline_broken_display_redacts_error_rate() {
        let err = CascadingError::PipelineBroken {
            pipeline_id: "pipe-1".to_string(),
            error_rate: 87.5,
        };

        let msg = format!("{err}");
        assert!(msg.contains("pipe-1"));
        assert!(!msg.contains("87.5"));
        assert!(!msg.contains('%'));
    }
}