Skip to main content

llm_agent_runtime/
orchestrator.rs

1//! # Module: Orchestrator
2//!
3//! ## Responsibility
4//! Provides a composable LLM pipeline with circuit breaking, retry, deduplication,
5//! and backpressure. Mirrors the public API of `tokio-prompt-orchestrator`.
6//!
7//! ## Guarantees
8//! - Thread-safe: all types wrap state in `Arc<Mutex<_>>` or atomics
9//! - Circuit breaker opens after `threshold` consecutive failures
10//! - RetryPolicy delays grow exponentially and are capped at [`MAX_RETRY_DELAY`]
11//! - Deduplicator is deterministic and non-blocking
12//! - BackpressureGuard never exceeds declared hard capacity
13//! - Non-panicking: all operations return `Result`
14//!
15//! ## NOT Responsible For
16//! - Cross-node circuit breakers (single-process only, unless a distributed backend is provided)
17//! - Persistent deduplication (in-memory, bounded TTL)
18//! - Distributed backpressure
19//!
20//! ## Composing the Primitives
21//!
22//! The four primitives are designed to be layered. A typical production setup:
23//!
24//! ```text
25//! request
26//!   │
27//!   ▼
28//! BackpressureGuard  ← shed if too many in-flight requests
29//!   │
30//!   ▼
31//! Deduplicator       ← return cached result for duplicate keys
32//!   │
33//!   ▼
34//! CircuitBreaker     ← fast-fail if the downstream is unhealthy
35//!   │
36//!   ▼
37//! RetryPolicy        ← retry transient failures with exponential backoff
38//!   │
39//!   ▼
40//! Pipeline           ← transform request/response through named stages
41//! ```
42
43use crate::error::AgentRuntimeError;
44use crate::util::timed_lock;
45use std::collections::HashMap;
46use std::sync::{Arc, Mutex};
47use std::time::{Duration, Instant};
48
49/// Maximum delay between retries — caps exponential growth.
50pub const MAX_RETRY_DELAY: Duration = Duration::from_secs(60);
51
52// ── RetryPolicy ───────────────────────────────────────────────────────────────
53
54/// Retry mode: exponential backoff or constant interval.
55#[derive(Debug, Clone, PartialEq, Eq)]
56pub enum RetryKind {
57    /// Delay doubles each attempt: `base_delay * 2^(attempt-1)`.
58    Exponential,
59    /// Delay is fixed at `base_delay` for every attempt.
60    Constant,
61}
62
63/// Configurable retry policy with exponential backoff or constant interval.
64#[derive(Debug, Clone)]
65pub struct RetryPolicy {
66    /// Maximum number of attempts (including the first).
67    pub max_attempts: u32,
68    /// Base delay for the first retry.
69    pub base_delay: Duration,
70    /// Whether to use exponential or constant delay.
71    pub kind: RetryKind,
72}
73
74impl RetryPolicy {
75    /// Create an exponential retry policy.
76    ///
77    /// # Arguments
78    /// * `max_attempts` — total attempt budget (must be ≥ 1)
79    /// * `base_ms` — base delay in milliseconds for attempt 1
80    ///
81    /// # Returns
82    /// - `Ok(RetryPolicy)` — on success
83    /// - `Err(AgentRuntimeError::Orchestration)` — if `max_attempts == 0`
84    pub fn exponential(max_attempts: u32, base_ms: u64) -> Result<Self, AgentRuntimeError> {
85        if max_attempts == 0 {
86            return Err(AgentRuntimeError::Orchestration(
87                "max_attempts must be >= 1".into(),
88            ));
89        }
90        if base_ms == 0 {
91            return Err(AgentRuntimeError::Orchestration(
92                "base_ms must be >= 1 to avoid zero-delay busy-loop retries".into(),
93            ));
94        }
95        Ok(Self {
96            max_attempts,
97            base_delay: Duration::from_millis(base_ms),
98            kind: RetryKind::Exponential,
99        })
100    }
101
102    /// Create a constant (fixed-interval) retry policy.
103    ///
104    /// Every retry waits exactly `delay_ms` milliseconds regardless of attempt
105    /// number, unlike [`exponential`] which doubles the delay each time.
106    ///
107    /// [`exponential`]: RetryPolicy::exponential
108    ///
109    /// # Returns
110    /// - `Ok(RetryPolicy)` — on success
111    /// - `Err(AgentRuntimeError::Orchestration)` — if `max_attempts == 0` or `delay_ms == 0`
112    pub fn constant(max_attempts: u32, delay_ms: u64) -> Result<Self, AgentRuntimeError> {
113        if max_attempts == 0 {
114            return Err(AgentRuntimeError::Orchestration(
115                "max_attempts must be >= 1".into(),
116            ));
117        }
118        if delay_ms == 0 {
119            return Err(AgentRuntimeError::Orchestration(
120                "delay_ms must be >= 1 to avoid busy-loop retries".into(),
121            ));
122        }
123        Ok(Self {
124            max_attempts,
125            base_delay: Duration::from_millis(delay_ms),
126            kind: RetryKind::Constant,
127        })
128    }
129
130    /// Create a no-retry policy (single attempt, no delay).
131    ///
132    /// Useful for one-shot operations or when the caller manages retry logic externally.
133    pub fn none() -> Self {
134        Self {
135            max_attempts: 1,
136            base_delay: Duration::ZERO,
137            kind: RetryKind::Constant,
138        }
139    }
140
141    /// Return `true` if this policy makes at most one attempt with no delay.
142    ///
143    /// Equivalent to `max_attempts == 1 && base_delay == Duration::ZERO`.
144    pub fn is_none(&self) -> bool {
145        self.max_attempts == 1 && self.base_delay == Duration::ZERO
146    }
147
148    /// Return a copy of this policy with `max_attempts` changed.
149    ///
150    /// # Errors
151    /// Returns `Err` if `n == 0`.
152    pub fn with_max_attempts(mut self, n: u32) -> Result<Self, AgentRuntimeError> {
153        if n == 0 {
154            return Err(AgentRuntimeError::Orchestration(
155                "max_attempts must be >= 1".into(),
156            ));
157        }
158        self.max_attempts = n;
159        Ok(self)
160    }
161
162    /// Return the configured maximum number of attempts.
163    pub fn max_attempts(&self) -> u32 {
164        self.max_attempts
165    }
166
167    /// Return `true` if this policy performs no retries (max_attempts ≤ 1).
168    ///
169    /// Useful for short-circuiting retry logic in hot paths.
170    pub fn is_no_retry(&self) -> bool {
171        self.max_attempts <= 1
172    }
173
174    /// Return `true` if this policy allows at least one retry (max_attempts > 1).
175    ///
176    /// Complement of [`is_no_retry`].
177    ///
178    /// [`is_no_retry`]: RetryPolicy::is_no_retry
179    pub fn will_retry_at_all(&self) -> bool {
180        self.max_attempts > 1
181    }
182
183    /// Return `true` if this policy uses exponential back-off between retries.
184    pub fn is_exponential(&self) -> bool {
185        matches!(self.kind, RetryKind::Exponential)
186    }
187
188    /// Return `true` if this policy uses a constant (fixed-interval) delay between retries.
189    pub fn is_constant(&self) -> bool {
190        matches!(self.kind, RetryKind::Constant)
191    }
192
193    /// Return the configured base delay in milliseconds.
194    ///
195    /// For constant policies this equals every per-retry delay.  For
196    /// exponential policies this is the delay before the first retry.
197    pub fn base_delay_ms(&self) -> u64 {
198        self.base_delay.as_millis() as u64
199    }
200
201    /// Return the delay before the first retry in milliseconds.
202    ///
203    /// Alias for `base_delay_ms`; the name communicates intent more clearly at
204    /// call sites that only care about the first-retry delay.
205    pub fn first_delay_ms(&self) -> u64 {
206        self.base_delay_ms()
207    }
208
209    /// Return `true` if `attempt` is the last allowed attempt for this policy.
210    ///
211    /// `attempt` is 1-indexed: `attempt == max_attempts` means no more retries.
212    pub fn is_last_attempt(&self, attempt: u32) -> bool {
213        attempt >= self.max_attempts
214    }
215
216    /// Return the sum of all per-attempt delays across all attempts, in milliseconds.
217    ///
218    /// For exponential policies each attempt's delay is capped at
219    /// [`MAX_RETRY_DELAY`].  For constant policies every attempt uses
220    /// `base_delay_ms`.
221    pub fn max_total_delay_ms(&self) -> u64 {
222        (1..=self.max_attempts)
223            .map(|attempt| self.delay_for(attempt).as_millis() as u64)
224            .sum()
225    }
226
227    /// Return the sum of delays for the first `n` attempts, in milliseconds.
228    ///
229    /// If `n > max_attempts`, only `max_attempts` delays are summed.
230    pub fn delay_sum_ms(&self, n: u32) -> u64 {
231        let limit = n.min(self.max_attempts);
232        (1..=limit)
233            .map(|attempt| self.delay_for(attempt).as_millis() as u64)
234            .sum()
235    }
236
237    /// Return the average delay per attempt in milliseconds.
238    ///
239    /// Returns `0` for policies with no delay (e.g. `RetryPolicy::none()`).
240    pub fn avg_delay_ms(&self) -> u64 {
241        if self.max_attempts == 0 {
242            return 0;
243        }
244        self.max_total_delay_ms() / self.max_attempts as u64
245    }
246
247    /// Return the effective backoff factor per attempt.
248    ///
249    /// Returns `2.0` for exponential policies and `1.0` for constant policies.
250    pub fn backoff_factor(&self) -> f64 {
251        match self.kind {
252            RetryKind::Exponential => 2.0,
253            RetryKind::Constant => 1.0,
254        }
255    }
256
257    /// Return a copy of this policy with the base delay changed to `ms` milliseconds.
258    ///
259    /// # Errors
260    /// Returns `Err` if `ms == 0`.
261    pub fn with_base_delay_ms(mut self, ms: u64) -> Result<Self, AgentRuntimeError> {
262        if ms == 0 {
263            return Err(AgentRuntimeError::Orchestration(
264                "base_delay_ms must be >= 1 to avoid busy-loop retries".into(),
265            ));
266        }
267        self.base_delay = Duration::from_millis(ms);
268        Ok(self)
269    }
270
271    /// Return the delay for `attempt` in whole milliseconds.
272    ///
273    /// Convenience wrapper around [`delay_for`] for use in logging and metrics
274    /// where a `u64` is easier to handle than a `Duration`.
275    ///
276    /// [`delay_for`]: RetryPolicy::delay_for
277    pub fn delay_ms_for(&self, attempt: u32) -> u64 {
278        self.delay_for(attempt).as_millis() as u64
279    }
280
281    /// Return the total maximum delay in milliseconds across all retry attempts.
282    ///
283    /// Sums `delay_for(attempt)` for every attempt from 1 to `max_attempts`.
284    /// Useful for estimating worst-case latency budgets.
285    pub fn total_max_delay_ms(&self) -> u64 {
286        (1..=self.max_attempts)
287            .map(|a| self.delay_for(a).as_millis() as u64)
288            .sum()
289    }
290
291    /// Return the number of attempts still available after `attempt` have been made.
292    ///
293    /// Returns `0` once the budget is exhausted (`attempt >= max_attempts`).
294    pub fn attempts_remaining(&self, attempt: u32) -> u32 {
295        self.max_attempts.saturating_sub(attempt)
296    }
297
298    /// Return the fraction of the retry budget consumed after `attempt` attempts.
299    ///
300    /// Computed as `attempt / max_attempts`, clamped to `[0.0, 1.0]`.
301    /// Returns `1.0` when `max_attempts` is zero (budget fully consumed by
302    /// definition).
303    ///
304    /// Useful for surfacing "how far through the retry budget are we" in
305    /// dashboards and progress logs.
306    pub fn attempts_budget_used(&self, attempt: u32) -> f64 {
307        if self.max_attempts == 0 {
308            return 1.0;
309        }
310        (attempt as f64 / self.max_attempts as f64).min(1.0)
311    }
312
313    /// Return the maximum delay for any single attempt in milliseconds.
314    ///
315    /// For `Exponential` policies this is the delay for the last attempt
316    /// (which may be capped by `MAX_RETRY_DELAY`).  For `Constant` policies it
317    /// equals the base delay.  Returns `0` for a `none()` policy.
318    pub fn max_delay_ms(&self) -> u64 {
319        if self.max_attempts == 0 {
320            return 0;
321        }
322        self.delay_ms_for(self.max_attempts)
323    }
324
325    /// Return `true` if another attempt is permitted after `attempt` failures.
326    ///
327    /// `attempt` is the number of attempts already made (0-based: `0` means
328    /// no attempt has been made yet).  Returns `false` once the budget is
329    /// exhausted (i.e. `attempt >= max_attempts`).
330    pub fn can_retry(&self, attempt: u32) -> bool {
331        attempt < self.max_attempts
332    }
333
334    /// Compute the delay before the given attempt number (1-based).
335    ///
336    /// - [`RetryKind::Exponential`]: `base_delay * 2^(attempt-1)`, capped at `MAX_RETRY_DELAY`.
337    /// - [`RetryKind::Constant`]: always returns `base_delay`.
338    pub fn delay_for(&self, attempt: u32) -> Duration {
339        match self.kind {
340            RetryKind::Constant => self.base_delay.min(MAX_RETRY_DELAY),
341            RetryKind::Exponential => {
342                let exp = attempt.saturating_sub(1);
343                let multiplier = 1u64.checked_shl(exp.min(63)).unwrap_or(u64::MAX);
344                let millis = self
345                    .base_delay
346                    .as_millis()
347                    .saturating_mul(multiplier as u128);
348                let raw = Duration::from_millis(millis.min(u64::MAX as u128) as u64);
349                raw.min(MAX_RETRY_DELAY)
350            }
351        }
352    }
353
354    /// Return `true` if the policy has a finite retry limit.
355    ///
356    /// A policy with `max_attempts < u32::MAX` is considered bounded.
357    /// In practice all policies created via the public constructors are bounded.
358    pub fn is_bounded(&self) -> bool {
359        self.max_attempts < u32::MAX
360    }
361
362    /// Return the remaining wait budget in milliseconds after `attempts_done`
363    /// have been completed.
364    ///
365    /// Computed as `max_total_delay_ms().saturating_sub(delay_sum_ms(attempts_done))`.
366    /// Returns `0` when `attempts_done` equals or exceeds `max_attempts`.
367    pub fn remaining_wait_budget_ms(&self, attempts_done: u32) -> u64 {
368        self.max_total_delay_ms().saturating_sub(self.delay_sum_ms(attempts_done))
369    }
370
371    /// Return the maximum delay a single retry attempt can incur, in
372    /// milliseconds.
373    ///
374    /// For exponential policies this is `delay_for(max_attempts)` which equals
375    /// `base * 2^(max_attempts-1)` capped at the global `MAX_RETRY_DELAY`.
376    /// For constant policies this equals the configured base delay.
377    pub fn max_single_delay_ms(&self) -> u64 {
378        self.delay_for(self.max_attempts).as_millis() as u64
379    }
380
381    /// Return `true` if this policy allows at least `n` retry attempts
382    /// (i.e. `max_attempts > n`).
383    ///
384    /// Useful for validating that a policy can tolerate a minimum number of
385    /// consecutive failures before giving up.
386    pub fn covers_n_failures(&self, n: u32) -> bool {
387        self.max_attempts > n
388    }
389}
390
391impl std::fmt::Display for RetryPolicy {
392    /// Render as `"Exponential(n×, base=Xms)"` or `"Constant(n×, delay=Xms)"`.
393    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
394        match self.kind {
395            RetryKind::Exponential => write!(
396                f,
397                "Exponential({}×, base={}ms)",
398                self.max_attempts,
399                self.base_delay.as_millis()
400            ),
401            RetryKind::Constant => write!(
402                f,
403                "Constant({}×, delay={}ms)",
404                self.max_attempts,
405                self.base_delay.as_millis()
406            ),
407        }
408    }
409}
410
411// ── CircuitBreaker ────────────────────────────────────────────────────────────
412
413/// Tracks failure rates and opens when the threshold is exceeded.
414///
415/// States: `Closed` (normal) → `Open` (fast-fail) → `HalfOpen` (probe).
416///
417/// Note: `PartialEq` is implemented manually because the `Open` variant
418/// contains `std::time::Instant` which does not implement `Eq`. The manual
419/// implementation compares only the variant discriminant, not the timestamp.
420#[derive(Debug, Clone)]
421pub enum CircuitState {
422    /// Circuit is operating normally; requests pass through.
423    Closed,
424    /// Circuit has tripped; requests are fast-failed without calling the operation.
425    Open {
426        /// The instant at which the circuit was opened.
427        opened_at: Instant,
428    },
429    /// Recovery probe period; the next request will be attempted to test recovery.
430    HalfOpen,
431}
432
433impl PartialEq for CircuitState {
434    fn eq(&self, other: &Self) -> bool {
435        match (self, other) {
436            (CircuitState::Closed, CircuitState::Closed) => true,
437            (CircuitState::Open { .. }, CircuitState::Open { .. }) => true,
438            (CircuitState::HalfOpen, CircuitState::HalfOpen) => true,
439            _ => false,
440        }
441    }
442}
443
444impl Eq for CircuitState {}
445
446impl std::fmt::Display for CircuitState {
447    /// Render as `"Closed"`, `"Open"`, or `"HalfOpen"`.
448    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
449        match self {
450            CircuitState::Closed => write!(f, "Closed"),
451            CircuitState::Open { .. } => write!(f, "Open"),
452            CircuitState::HalfOpen => write!(f, "HalfOpen"),
453        }
454    }
455}
456
457/// Backend for circuit breaker state storage.
458///
459/// Implement this trait to share circuit breaker state across processes
460/// (e.g., via Redis). The in-process default is `InMemoryCircuitBreakerBackend`.
461///
462/// Note: Methods are synchronous to avoid pulling in `async-trait`. A
463/// distributed backend (e.g., Redis) can internally spawn a Tokio runtime.
464pub trait CircuitBreakerBackend: Send + Sync {
465    /// Increment the consecutive failure count for `service` and return the new count.
466    fn increment_failures(&self, service: &str) -> u32;
467    /// Reset the consecutive failure count for `service` to zero.
468    fn reset_failures(&self, service: &str);
469    /// Return the current consecutive failure count for `service`.
470    fn get_failures(&self, service: &str) -> u32;
471    /// Record the instant at which the circuit was opened for `service`.
472    fn set_open_at(&self, service: &str, at: std::time::Instant);
473    /// Clear the open-at timestamp, effectively moving the circuit to Closed or HalfOpen.
474    fn clear_open_at(&self, service: &str);
475    /// Return the instant at which the circuit was opened, or `None` if it is not open.
476    fn get_open_at(&self, service: &str) -> Option<std::time::Instant>;
477}
478
479// ── InMemoryCircuitBreakerBackend ─────────────────────────────────────────────
480
481/// In-process circuit breaker backend backed by a `Mutex<HashMap>`.
482///
483/// Each service name gets its own independent failure counter and open-at
484/// timestamp.  Multiple `CircuitBreaker` instances that share the same
485/// backend (via [`CircuitBreaker::with_backend`]) will correctly track
486/// failures per service rather than sharing a single counter.
487pub struct InMemoryCircuitBreakerBackend {
488    inner: Arc<Mutex<HashMap<String, InMemoryServiceState>>>,
489}
490
491#[derive(Default)]
492struct InMemoryServiceState {
493    consecutive_failures: u32,
494    open_at: Option<std::time::Instant>,
495}
496
497impl InMemoryCircuitBreakerBackend {
498    /// Create a new in-memory backend with all counters at zero.
499    pub fn new() -> Self {
500        Self {
501            inner: Arc::new(Mutex::new(HashMap::new())),
502        }
503    }
504}
505
506impl Default for InMemoryCircuitBreakerBackend {
507    fn default() -> Self {
508        Self::new()
509    }
510}
511
512impl CircuitBreakerBackend for InMemoryCircuitBreakerBackend {
513    fn increment_failures(&self, service: &str) -> u32 {
514        let mut map = timed_lock(
515            &self.inner,
516            "InMemoryCircuitBreakerBackend::increment_failures",
517        );
518        let state = map.entry(service.to_owned()).or_default();
519        state.consecutive_failures += 1;
520        state.consecutive_failures
521    }
522
523    fn reset_failures(&self, service: &str) {
524        let mut map = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::reset_failures");
525        if let Some(state) = map.get_mut(service) {
526            state.consecutive_failures = 0;
527        }
528    }
529
530    fn get_failures(&self, service: &str) -> u32 {
531        let map = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::get_failures");
532        map.get(service).map_or(0, |s| s.consecutive_failures)
533    }
534
535    fn set_open_at(&self, service: &str, at: std::time::Instant) {
536        let mut map = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::set_open_at");
537        map.entry(service.to_owned()).or_default().open_at = Some(at);
538    }
539
540    fn clear_open_at(&self, service: &str) {
541        let mut map = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::clear_open_at");
542        if let Some(state) = map.get_mut(service) {
543            state.open_at = None;
544        }
545    }
546
547    fn get_open_at(&self, service: &str) -> Option<std::time::Instant> {
548        let map = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::get_open_at");
549        map.get(service).and_then(|s| s.open_at)
550    }
551}
552
553// ── CircuitBreaker ────────────────────────────────────────────────────────────
554
555/// Circuit breaker guarding a fallible operation.
556///
557/// ## Guarantees
558/// - Opens after `threshold` consecutive failures
559/// - Transitions to `HalfOpen` after `recovery_window` has elapsed
560/// - Closes on the first successful probe in `HalfOpen`
561#[derive(Clone)]
562pub struct CircuitBreaker {
563    threshold: u32,
564    recovery_window: Duration,
565    service: String,
566    backend: Arc<dyn CircuitBreakerBackend>,
567}
568
569impl std::fmt::Debug for CircuitBreaker {
570    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
571        f.debug_struct("CircuitBreaker")
572            .field("threshold", &self.threshold)
573            .field("recovery_window", &self.recovery_window)
574            .field("service", &self.service)
575            .finish()
576    }
577}
578
579impl CircuitBreaker {
580    /// Create a new circuit breaker backed by an in-memory backend.
581    ///
582    /// # Arguments
583    /// * `service` — name used in error messages and logs
584    /// * `threshold` — consecutive failures before opening
585    /// * `recovery_window` — how long to stay open before probing
586    pub fn new(
587        service: impl Into<String>,
588        threshold: u32,
589        recovery_window: Duration,
590    ) -> Result<Self, AgentRuntimeError> {
591        if threshold == 0 {
592            return Err(AgentRuntimeError::Orchestration(
593                "circuit breaker threshold must be >= 1".into(),
594            ));
595        }
596        let service = service.into();
597        Ok(Self {
598            threshold,
599            recovery_window,
600            service,
601            backend: Arc::new(InMemoryCircuitBreakerBackend::new()),
602        })
603    }
604
605    /// Replace the default in-memory backend with a custom one.
606    ///
607    /// Useful for sharing circuit breaker state across processes.
608    pub fn with_backend(mut self, backend: Arc<dyn CircuitBreakerBackend>) -> Self {
609        self.backend = backend;
610        self
611    }
612
613    /// Attempt to call `f`, respecting the circuit breaker state.
614    ///
615    /// # Errors
616    /// - `AgentRuntimeError::CircuitOpen` — the breaker is in the `Open` state
617    ///   and the recovery window has not yet elapsed
618    /// - `AgentRuntimeError::Orchestration` — `f` returned an error; the error
619    ///   message is the `Display` of the inner error. This call may open the
620    ///   breaker if it pushes the consecutive failure count above `threshold`.
621    #[tracing::instrument(skip(self, f))]
622    pub fn call<T, E, F>(&self, f: F) -> Result<T, AgentRuntimeError>
623    where
624        F: FnOnce() -> Result<T, E>,
625        E: std::fmt::Display,
626    {
627        // Determine effective state, potentially transitioning Open → HalfOpen.
628        let effective_state = match self.backend.get_open_at(&self.service) {
629            Some(opened_at) => {
630                if opened_at.elapsed() >= self.recovery_window {
631                    // Clear open_at to signal HalfOpen; failures remain.
632                    self.backend.clear_open_at(&self.service);
633                    tracing::info!("circuit moved to half-open for {}", self.service);
634                    CircuitState::HalfOpen
635                } else {
636                    CircuitState::Open { opened_at }
637                }
638            }
639            None => {
640                // Either Closed or HalfOpen (after a prior transition).
641                // We distinguish by checking whether failures >= threshold
642                // but no open_at is set — that means we are in HalfOpen.
643                let failures = self.backend.get_failures(&self.service);
644                if failures >= self.threshold {
645                    CircuitState::HalfOpen
646                } else {
647                    CircuitState::Closed
648                }
649            }
650        };
651
652        tracing::debug!("circuit state: {:?}", effective_state);
653
654        match effective_state {
655            CircuitState::Open { .. } => {
656                return Err(AgentRuntimeError::CircuitOpen {
657                    service: self.service.clone(),
658                });
659            }
660            CircuitState::Closed | CircuitState::HalfOpen => {}
661        }
662
663        // Execute the operation.
664        match f() {
665            Ok(val) => {
666                self.backend.reset_failures(&self.service);
667                self.backend.clear_open_at(&self.service);
668                tracing::info!("circuit closed for {}", self.service);
669                Ok(val)
670            }
671            Err(e) => {
672                let failures = self.backend.increment_failures(&self.service);
673                if failures >= self.threshold {
674                    let now = Instant::now();
675                    self.backend.set_open_at(&self.service, now);
676                    tracing::info!("circuit opened for {}", self.service);
677                }
678                Err(AgentRuntimeError::Orchestration(e.to_string()))
679            }
680        }
681    }
682
683    /// Return the current circuit state.
684    pub fn state(&self) -> Result<CircuitState, AgentRuntimeError> {
685        let state = match self.backend.get_open_at(&self.service) {
686            Some(opened_at) => {
687                if opened_at.elapsed() >= self.recovery_window {
688                    // Would transition to HalfOpen on next call; report HalfOpen.
689                    let failures = self.backend.get_failures(&self.service);
690                    if failures >= self.threshold {
691                        CircuitState::HalfOpen
692                    } else {
693                        CircuitState::Closed
694                    }
695                } else {
696                    CircuitState::Open { opened_at }
697                }
698            }
699            None => {
700                let failures = self.backend.get_failures(&self.service);
701                if failures >= self.threshold {
702                    CircuitState::HalfOpen
703                } else {
704                    CircuitState::Closed
705                }
706            }
707        };
708        Ok(state)
709    }
710
711    /// Return the consecutive failure count.
712    pub fn failure_count(&self) -> Result<u32, AgentRuntimeError> {
713        Ok(self.backend.get_failures(&self.service))
714    }
715
716    /// Record a successful call, resetting the consecutive failure counter.
717    ///
718    /// Call this when a protected operation succeeds so the circuit can
719    /// transition back to `Closed` after a `HalfOpen` probe.
720    pub fn record_success(&self) {
721        self.backend.reset_failures(&self.service);
722        self.backend.clear_open_at(&self.service);
723    }
724
725    /// Record a failed call, incrementing the consecutive failure counter.
726    ///
727    /// Opens the circuit when the failure count reaches `threshold`.
728    pub fn record_failure(&self) {
729        let failures = self.backend.increment_failures(&self.service);
730        if failures >= self.threshold {
731            self.backend.set_open_at(&self.service, Instant::now());
732            tracing::info!("circuit opened for {} (manual record)", self.service);
733        }
734    }
735
736    /// Return the service name this circuit breaker is protecting.
737    pub fn service_name(&self) -> &str {
738        &self.service
739    }
740
741    /// Return `true` if the circuit is currently `Closed` (healthy).
742    pub fn is_closed(&self) -> bool {
743        matches!(self.state(), Ok(CircuitState::Closed))
744    }
745
746    /// Return `true` if the circuit is currently `Open` (fast-failing).
747    pub fn is_open(&self) -> bool {
748        matches!(self.state(), Ok(CircuitState::Open { .. }))
749    }
750
751    /// Return `true` if the circuit is currently `HalfOpen` (probing).
752    pub fn is_half_open(&self) -> bool {
753        matches!(self.state(), Ok(CircuitState::HalfOpen))
754    }
755
756    /// Return `true` if the circuit is in a state that allows calls to proceed.
757    ///
758    /// Calls are allowed in both `Closed` and `HalfOpen` states; only `Open`
759    /// fast-fails.
760    pub fn is_healthy(&self) -> bool {
761        !self.is_open()
762    }
763
764    /// Return the configured consecutive-failure threshold.
765    ///
766    /// The circuit opens when `failure_count()` reaches this value.
767    pub fn threshold(&self) -> u32 {
768        self.threshold
769    }
770
771    /// Return how many more failures can be recorded before the circuit opens.
772    ///
773    /// Returns `0` when the circuit is already open or at the threshold.
774    /// Useful for alerting logic that needs to know how close the system is
775    /// to being cut off.
776    pub fn failure_headroom(&self) -> u32 {
777        let failures = self.backend.get_failures(&self.service);
778        self.threshold.saturating_sub(failures)
779    }
780
781    /// Return the current failure count as a ratio of the threshold.
782    ///
783    /// Returns a value in `[0.0, 1.0]` where `1.0` (or greater) means the
784    /// circuit will open (or is already open).  Returns `0.0` when the
785    /// threshold is zero to avoid division by zero.
786    pub fn failure_rate(&self) -> f64 {
787        if self.threshold == 0 {
788            return 0.0;
789        }
790        let failures = self.backend.get_failures(&self.service);
791        failures as f64 / self.threshold as f64
792    }
793
794    /// Return `true` when `failure_count()` has reached the configured threshold.
795    ///
796    /// The circuit opens immediately when this returns `true` on the next
797    /// `record_failure` call.
798    pub fn is_at_threshold(&self) -> bool {
799        let failures = self.backend.get_failures(&self.service);
800        failures >= self.threshold
801    }
802
803    /// Return the number of additional failures needed to open the circuit.
804    ///
805    /// Returns `0` when the circuit is already at or beyond threshold.
806    pub fn failures_until_open(&self) -> u32 {
807        let failures = self.backend.get_failures(&self.service);
808        self.threshold.saturating_sub(failures)
809    }
810
811    /// Return the configured recovery window duration.
812    ///
813    /// After the circuit has been `Open` for this long, it transitions to
814    /// `HalfOpen` and allows the next call through as a recovery probe.
815    pub fn recovery_window(&self) -> std::time::Duration {
816        self.recovery_window
817    }
818
819    /// Force the circuit back to `Closed` state, resetting all failure counters.
820    ///
821    /// Useful for tests and manual operator recovery.  Under normal operation
822    /// the circuit closes automatically after a successful `HalfOpen` probe.
823    pub fn reset(&self) {
824        self.backend.reset_failures(&self.service);
825        self.backend.clear_open_at(&self.service);
826        tracing::info!("circuit manually reset to Closed for {}", self.service);
827    }
828
829    /// Return a human-readable one-line summary of the circuit breaker state.
830    ///
831    /// Format: `"service='<name>' state=<State> failures=<n>/<threshold>"`.
832    ///
833    /// # Errors
834    /// Propagates any error returned by [`state`] or [`failure_count`].
835    ///
836    /// [`state`]: CircuitBreaker::state
837    /// [`failure_count`]: CircuitBreaker::failure_count
838    pub fn describe(&self) -> Result<String, AgentRuntimeError> {
839        let state = self.state()?;
840        let failures = self.failure_count()?;
841        Ok(format!(
842            "service='{}' state={} failures={}/{}",
843            self.service, state, failures, self.threshold
844        ))
845    }
846
847    /// Execute an async fallible operation under the circuit breaker using an
848    /// [`AsyncCircuitBreakerBackend`].
849    ///
850    /// This is the async counterpart of [`call`] and is intended for backends
851    /// that perform genuine async I/O (e.g. Redis, etcd, distributed stores).
852    /// The in-process default can be used via [`InMemoryCircuitBreakerBackend`]
853    /// which trivially implements `AsyncCircuitBreakerBackend`.
854    ///
855    /// [`call`]: CircuitBreaker::call
856    #[tracing::instrument(skip(self, backend, f))]
857    pub async fn async_call<T, E, F, Fut>(
858        &self,
859        backend: &dyn AsyncCircuitBreakerBackend,
860        f: F,
861    ) -> Result<T, AgentRuntimeError>
862    where
863        F: FnOnce() -> Fut,
864        Fut: std::future::Future<Output = Result<T, E>>,
865        E: std::fmt::Display,
866    {
867        // Determine effective state via async backend.
868        let effective_state = match backend.get_open_at(&self.service).await {
869            Some(opened_at) => {
870                if opened_at.elapsed() >= self.recovery_window {
871                    backend.clear_open_at(&self.service).await;
872                    tracing::info!("circuit async moved to half-open for {}", self.service);
873                    CircuitState::HalfOpen
874                } else {
875                    CircuitState::Open { opened_at }
876                }
877            }
878            None => {
879                let failures = backend.get_failures(&self.service).await;
880                if failures >= self.threshold {
881                    CircuitState::HalfOpen
882                } else {
883                    CircuitState::Closed
884                }
885            }
886        };
887
888        if let CircuitState::Open { .. } = effective_state {
889            return Err(AgentRuntimeError::CircuitOpen {
890                service: self.service.clone(),
891            });
892        }
893
894        match f().await {
895            Ok(val) => {
896                backend.reset_failures(&self.service).await;
897                backend.clear_open_at(&self.service).await;
898                Ok(val)
899            }
900            Err(e) => {
901                let failures = backend.increment_failures(&self.service).await;
902                if failures >= self.threshold {
903                    backend
904                        .set_open_at(&self.service, Instant::now())
905                        .await;
906                    tracing::info!("circuit async opened for {}", self.service);
907                }
908                Err(AgentRuntimeError::Orchestration(e.to_string()))
909            }
910        }
911    }
912}
913
914// ── AsyncCircuitBreakerBackend ────────────────────────────────────────────────
915
916/// Async counterpart of [`CircuitBreakerBackend`] for distributed backends.
917///
918/// Implement this trait for backends that require genuine async I/O — e.g. Redis,
919/// etcd, or any network-based store — so they don't need to embed their own
920/// blocking runtime.
921///
922/// [`InMemoryCircuitBreakerBackend`] implements this trait with trivially-async
923/// wrappers for use in testing and single-process deployments.
924#[async_trait::async_trait]
925pub trait AsyncCircuitBreakerBackend: Send + Sync {
926    /// Increment the consecutive failure count and return the new count.
927    async fn increment_failures(&self, service: &str) -> u32;
928    /// Reset the consecutive failure count to zero.
929    async fn reset_failures(&self, service: &str);
930    /// Return the current consecutive failure count.
931    async fn get_failures(&self, service: &str) -> u32;
932    /// Record the instant at which the circuit was opened.
933    async fn set_open_at(&self, service: &str, at: Instant);
934    /// Clear the open-at timestamp.
935    async fn clear_open_at(&self, service: &str);
936    /// Return the instant at which the circuit was opened, or `None`.
937    async fn get_open_at(&self, service: &str) -> Option<Instant>;
938}
939
940#[async_trait::async_trait]
941impl AsyncCircuitBreakerBackend for InMemoryCircuitBreakerBackend {
942    async fn increment_failures(&self, service: &str) -> u32 {
943        <Self as CircuitBreakerBackend>::increment_failures(self, service)
944    }
945    async fn reset_failures(&self, service: &str) {
946        <Self as CircuitBreakerBackend>::reset_failures(self, service);
947    }
948    async fn get_failures(&self, service: &str) -> u32 {
949        <Self as CircuitBreakerBackend>::get_failures(self, service)
950    }
951    async fn set_open_at(&self, service: &str, at: Instant) {
952        <Self as CircuitBreakerBackend>::set_open_at(self, service, at);
953    }
954    async fn clear_open_at(&self, service: &str) {
955        <Self as CircuitBreakerBackend>::clear_open_at(self, service);
956    }
957    async fn get_open_at(&self, service: &str) -> Option<Instant> {
958        <Self as CircuitBreakerBackend>::get_open_at(self, service)
959    }
960}
961
962// ── DeduplicationResult ───────────────────────────────────────────────────────
963
964/// Result of a deduplication check.
965#[derive(Debug, Clone, PartialEq)]
966pub enum DeduplicationResult {
967    /// This is a new, unseen request.
968    New,
969    /// A cached result exists for this key.
970    Cached(String),
971    /// A matching request is currently in-flight.
972    InProgress,
973}
974
975/// Deduplicates requests by key within a TTL window.
976///
977/// ## Guarantees
978/// - Deterministic: same key always maps to the same result
979/// - Thread-safe via `Arc<Mutex<_>>`
980/// - Entries expire after `ttl`
981/// - Optional `max_entries` cap bounds memory independently of TTL
982#[derive(Debug, Clone)]
983pub struct Deduplicator {
984    ttl: Duration,
985    /// Optional hard cap on cached entries. When exceeded the oldest entry is
986    /// evicted before inserting the new one, bounding memory growth even when
987    /// all keys are unique and none have expired yet.
988    max_entries: Option<usize>,
989    inner: Arc<Mutex<DeduplicatorInner>>,
990}
991
992#[derive(Debug)]
993struct DeduplicatorInner {
994    cache: HashMap<String, (String, Instant)>, // key → (result, inserted_at)
995    in_flight: HashMap<String, Instant>,       // key → started_at
996    /// Insertion-ordered keys for O(1) FIFO eviction when `max_entries` is set.
997    cache_order: std::collections::VecDeque<String>,
998    /// Tracks calls since the last full expiry scan. Full scans run every
999    /// `EXPIRY_INTERVAL` calls; per-key inline checks maintain correctness
1000    /// between scans.
1001    call_count: u64,
1002}
1003
1004impl Deduplicator {
1005    /// Create a new deduplicator with the given TTL.
1006    pub fn new(ttl: Duration) -> Self {
1007        Self {
1008            ttl,
1009            max_entries: None,
1010            inner: Arc::new(Mutex::new(DeduplicatorInner {
1011                cache: HashMap::new(),
1012                in_flight: HashMap::new(),
1013                cache_order: std::collections::VecDeque::new(),
1014                call_count: 0,
1015            })),
1016        }
1017    }
1018
1019    /// Set a hard cap on the number of cached (completed) entries.
1020    ///
1021    /// When the cache is full the oldest entry (by insertion time) is evicted
1022    /// before the new entry is stored.  This bounds memory growth for workloads
1023    /// where all request keys are unique and the TTL has not yet expired.
1024    ///
1025    /// # Returns
1026    /// - `Err(AgentRuntimeError::Orchestration)` if `max == 0`
1027    pub fn with_max_entries(mut self, max: usize) -> Result<Self, AgentRuntimeError> {
1028        if max == 0 {
1029            return Err(AgentRuntimeError::Orchestration(
1030                "Deduplicator max_entries must be >= 1".into(),
1031            ));
1032        }
1033        self.max_entries = Some(max);
1034        Ok(self)
1035    }
1036
1037    /// Check whether `key` is new, cached, or in-flight.
1038    ///
1039    /// Marks the key as in-flight if it is new.
1040    pub fn check_and_register(&self, key: &str) -> Result<DeduplicationResult, AgentRuntimeError> {
1041        let mut inner = timed_lock(&self.inner, "Deduplicator::check_and_register");
1042
1043        let now = Instant::now();
1044
1045        // Lazy expiry: full O(n) retain scan runs only every EXPIRY_INTERVAL
1046        // calls, amortising the cost. Per-key inline checks below keep
1047        // correctness between scans.
1048        const EXPIRY_INTERVAL: u64 = 64;
1049        inner.call_count = inner.call_count.wrapping_add(1);
1050        if inner.call_count % EXPIRY_INTERVAL == 0 {
1051            let ttl = self.ttl;
1052            inner.cache.retain(|_, (_, ts)| now.duration_since(*ts) < ttl);
1053            inner
1054                .in_flight
1055                .retain(|_, ts| now.duration_since(*ts) < ttl);
1056        }
1057
1058        // Inline expiry check for this specific key.
1059        match inner.cache.get(key) {
1060            Some((result, ts)) if now.duration_since(*ts) < self.ttl => {
1061                return Ok(DeduplicationResult::Cached(result.clone()));
1062            }
1063            Some(_) => {
1064                inner.cache.remove(key); // entry is expired
1065            }
1066            None => {}
1067        }
1068        match inner.in_flight.get(key) {
1069            Some(ts) if now.duration_since(*ts) < self.ttl => {
1070                return Ok(DeduplicationResult::InProgress);
1071            }
1072            Some(_) => {
1073                inner.in_flight.remove(key); // in-flight entry is expired
1074            }
1075            None => {}
1076        }
1077
1078        inner.in_flight.insert(key.to_owned(), now);
1079        Ok(DeduplicationResult::New)
1080    }
1081
1082    /// Check deduplication state for a key with a per-call TTL override.
1083    ///
1084    /// Marks the key as in-flight if it is new. Ignores the stored TTL and uses
1085    /// `ttl` instead for expiry checks.
1086    pub fn check(&self, key: &str, ttl: std::time::Duration) -> Result<DeduplicationResult, AgentRuntimeError> {
1087        let mut inner = timed_lock(&self.inner, "Deduplicator::check");
1088        let now = Instant::now();
1089
1090        // Lazy expiry: full scan every EXPIRY_INTERVAL calls.
1091        const EXPIRY_INTERVAL: u64 = 64;
1092        inner.call_count = inner.call_count.wrapping_add(1);
1093        if inner.call_count % EXPIRY_INTERVAL == 0 {
1094            inner.cache.retain(|_, (_, ts)| now.duration_since(*ts) < ttl);
1095            inner.in_flight.retain(|_, ts| now.duration_since(*ts) < ttl);
1096        }
1097
1098        match inner.cache.get(key) {
1099            Some((result, ts)) if now.duration_since(*ts) < ttl => {
1100                return Ok(DeduplicationResult::Cached(result.clone()));
1101            }
1102            Some(_) => {
1103                inner.cache.remove(key);
1104            }
1105            None => {}
1106        }
1107        match inner.in_flight.get(key) {
1108            Some(ts) if now.duration_since(*ts) < ttl => {
1109                return Ok(DeduplicationResult::InProgress);
1110            }
1111            Some(_) => {
1112                inner.in_flight.remove(key);
1113            }
1114            None => {}
1115        }
1116
1117        inner.in_flight.insert(key.to_owned(), now);
1118        Ok(DeduplicationResult::New)
1119    }
1120
1121    /// Check deduplication state for multiple keys at once.
1122    ///
1123    /// Returns results in the same order as `requests`.
1124    /// Each entry is `(key, ttl)` — same signature as `check`.
1125    ///
1126    /// Acquires the internal mutex **once** for the entire batch, avoiding the
1127    /// per-key lock overhead of calling `check` in a loop.
1128    pub fn dedup_many(
1129        &self,
1130        requests: &[(&str, std::time::Duration)],
1131    ) -> Result<Vec<DeduplicationResult>, AgentRuntimeError> {
1132        if requests.is_empty() {
1133            return Ok(Vec::new());
1134        }
1135        let mut inner = timed_lock(&self.inner, "Deduplicator::dedup_many");
1136        let now = std::time::Instant::now();
1137        let mut results = Vec::with_capacity(requests.len());
1138
1139        for &(key, ttl) in requests {
1140            // Expire stale entries using this request's TTL.
1141            inner.cache.retain(|_, (_, ts)| now.duration_since(*ts) < ttl);
1142            inner.in_flight.retain(|_, ts| now.duration_since(*ts) < ttl);
1143
1144            let result = if let Some((cached_result, _)) = inner.cache.get(key) {
1145                DeduplicationResult::Cached(cached_result.clone())
1146            } else if inner.in_flight.contains_key(key) {
1147                DeduplicationResult::InProgress
1148            } else {
1149                inner.in_flight.insert(key.to_owned(), now);
1150                DeduplicationResult::New
1151            };
1152            results.push(result);
1153        }
1154
1155        Ok(results)
1156    }
1157
1158    /// Complete a request: move from in-flight to cached with the given result.
1159    ///
1160    /// If `max_entries` is configured and the cache is full, the oldest cached
1161    /// entry (by insertion time) is evicted before the new one is stored.
1162    pub fn complete(&self, key: &str, result: impl Into<String>) -> Result<(), AgentRuntimeError> {
1163        let mut inner = timed_lock(&self.inner, "Deduplicator::complete");
1164        inner.in_flight.remove(key);
1165
1166        // Enforce max_entries cap: evict via insertion-ordered VecDeque (O(1) amortised).
1167        // Ghost entries (already expired by a prior `retain`) are skipped by looping.
1168        if let Some(max) = self.max_entries {
1169            while inner.cache.len() >= max {
1170                match inner.cache_order.pop_front() {
1171                    Some(oldest_key) => {
1172                        inner.cache.remove(&oldest_key);
1173                    }
1174                    None => break,
1175                }
1176            }
1177        }
1178
1179        let owned_key = key.to_owned();
1180        inner.cache_order.push_back(owned_key.clone());
1181        inner.cache.insert(owned_key, (result.into(), Instant::now()));
1182        Ok(())
1183    }
1184
1185    /// Remove a key from in-flight tracking without caching a result.
1186    ///
1187    /// Call this when an in-flight operation fails so that subsequent callers
1188    /// are not permanently blocked by a stuck `InProgress` entry for the full TTL.
1189    pub fn fail(&self, key: &str) -> Result<(), AgentRuntimeError> {
1190        let mut inner = timed_lock(&self.inner, "Deduplicator::fail");
1191        inner.in_flight.remove(key);
1192        Ok(())
1193    }
1194
1195    /// Return the number of keys currently in-flight (not yet completed or failed).
1196    pub fn in_flight_count(&self) -> Result<usize, AgentRuntimeError> {
1197        let inner = timed_lock(&self.inner, "Deduplicator::in_flight_count");
1198        Ok(inner.in_flight.len())
1199    }
1200
1201    /// Return a snapshot of all keys currently in-flight.
1202    pub fn in_flight_keys(&self) -> Result<Vec<String>, AgentRuntimeError> {
1203        let inner = timed_lock(&self.inner, "Deduplicator::in_flight_keys");
1204        Ok(inner.in_flight.keys().cloned().collect())
1205    }
1206
1207    /// Return the number of keys currently in the completed result cache.
1208    ///
1209    /// Note: expired entries are only removed lazily on the next `check*` call.
1210    pub fn cached_count(&self) -> Result<usize, AgentRuntimeError> {
1211        let inner = timed_lock(&self.inner, "Deduplicator::cached_count");
1212        Ok(inner.cache.len())
1213    }
1214
1215    /// Return a snapshot of all keys that have cached results.
1216    ///
1217    /// Expired entries are included (they are removed lazily).  Use
1218    /// [`purge_expired`] first for a clean list of live keys.
1219    ///
1220    /// [`purge_expired`]: Deduplicator::purge_expired
1221    pub fn cached_keys(&self) -> Result<Vec<String>, AgentRuntimeError> {
1222        let inner = timed_lock(&self.inner, "Deduplicator::cached_keys");
1223        Ok(inner.cache.keys().cloned().collect())
1224    }
1225
1226    /// Return the configured time-to-live for cached results.
1227    pub fn ttl(&self) -> Duration {
1228        self.ttl
1229    }
1230
1231    /// Return the configured maximum number of cached entries, if any.
1232    ///
1233    /// Returns `None` if no cap was set via [`with_max_entries`].
1234    ///
1235    /// [`with_max_entries`]: Deduplicator::with_max_entries
1236    pub fn max_entries(&self) -> Option<usize> {
1237        self.max_entries
1238    }
1239
1240    /// Return `true` if there are no in-flight requests.
1241    pub fn is_idle(&self) -> Result<bool, AgentRuntimeError> {
1242        let inner = timed_lock(&self.inner, "Deduplicator::is_idle");
1243        Ok(inner.in_flight.is_empty())
1244    }
1245
1246    /// Return the total number of items tracked by the deduplicator
1247    /// (in-flight + cached results, regardless of TTL expiry).
1248    pub fn total_count(&self) -> Result<usize, AgentRuntimeError> {
1249        let inner = timed_lock(&self.inner, "Deduplicator::total_count");
1250        Ok(inner.in_flight.len() + inner.cache.len())
1251    }
1252
1253    /// Return `true` if `key` is currently in-flight or has a cached result.
1254    ///
1255    /// Unlike [`check_and_register`] this is a read-only inspection — it does
1256    /// not register the key or consume a deduplication slot.
1257    ///
1258    /// [`check_and_register`]: Deduplicator::check_and_register
1259    pub fn contains(&self, key: &str) -> Result<bool, AgentRuntimeError> {
1260        let inner = timed_lock(&self.inner, "Deduplicator::contains");
1261        Ok(inner.in_flight.contains_key(key) || inner.cache.contains_key(key))
1262    }
1263
1264    /// Return the cached result for `key` if one exists and has not expired.
1265    ///
1266    /// Returns `None` when the key is not in the cache (either not yet
1267    /// completed or already expired).  Does not modify any state.
1268    pub fn get_result(&self, key: &str) -> Result<Option<String>, AgentRuntimeError> {
1269        let inner = timed_lock(&self.inner, "Deduplicator::get_result");
1270        let ttl = self.ttl;
1271        let now = std::time::Instant::now();
1272        Ok(inner.cache.get(key).and_then(|(result, inserted_at)| {
1273            if now.duration_since(*inserted_at) <= ttl {
1274                Some(result.clone())
1275            } else {
1276                None
1277            }
1278        }))
1279    }
1280
1281    /// Remove all in-flight entries and cached results.
1282    ///
1283    /// Useful for test teardown or hard resets.
1284    pub fn clear(&self) -> Result<(), AgentRuntimeError> {
1285        let mut inner = timed_lock(&self.inner, "Deduplicator::clear");
1286        inner.cache.clear();
1287        inner.in_flight.clear();
1288        inner.cache_order.clear();
1289        Ok(())
1290    }
1291
1292    /// Eagerly evict all cache entries whose TTL has elapsed.
1293    ///
1294    /// Under normal operation expired entries are removed lazily on the next
1295    /// `check*` call.  Call `purge_expired` for deterministic memory reclamation
1296    /// (e.g. before a `cached_count` snapshot or in a maintenance loop).
1297    ///
1298    /// Returns the number of entries that were removed.
1299    pub fn purge_expired(&self) -> Result<usize, AgentRuntimeError> {
1300        let mut inner = timed_lock(&self.inner, "Deduplicator::purge_expired");
1301        let ttl = self.ttl;
1302        let now = std::time::Instant::now();
1303        let before = inner.cache.len();
1304        inner.cache.retain(|_, (_, inserted_at)| {
1305            now.duration_since(*inserted_at) <= ttl
1306        });
1307        let removed = before - inner.cache.len();
1308        // Rebuild cache_order to drop ghost entries (keys purged from cache but
1309        // still referenced in the VecDeque).
1310        if removed > 0 {
1311            let live_keys: std::collections::HashSet<String> =
1312                inner.cache.keys().cloned().collect();
1313            inner.cache_order.retain(|k| live_keys.contains(k));
1314        }
1315        Ok(removed)
1316    }
1317
1318    /// Remove the oldest cached result entry (FIFO order).
1319    ///
1320    /// Returns `true` if an entry was removed, `false` if the cache was empty.
1321    pub fn evict_oldest(&self) -> Result<bool, AgentRuntimeError> {
1322        let mut inner = timed_lock(&self.inner, "Deduplicator::evict_oldest");
1323        while let Some(key) = inner.cache_order.pop_front() {
1324            if inner.cache.remove(&key).is_some() {
1325                return Ok(true);
1326            }
1327        }
1328        Ok(false)
1329    }
1330}
1331
1332// ── BackpressureGuard ─────────────────────────────────────────────────────────
1333
1334/// Tracks in-flight work count and enforces a capacity limit.
1335///
1336/// ## Guarantees
1337/// - Thread-safe via `Arc<Mutex<_>>`
1338/// - `try_acquire` is non-blocking
1339/// - `release` decrements the counter; no-op if counter is already 0
1340/// - Optional soft limit emits a warning when depth reaches the threshold
1341#[derive(Debug, Clone)]
1342pub struct BackpressureGuard {
1343    capacity: usize,
1344    soft_capacity: Option<usize>,
1345    inner: Arc<Mutex<usize>>,
1346}
1347
1348impl BackpressureGuard {
1349    /// Create a new guard with the given capacity.
1350    ///
1351    /// # Returns
1352    /// - `Ok(BackpressureGuard)` — on success
1353    /// - `Err(AgentRuntimeError::Orchestration)` — if `capacity == 0`
1354    pub fn new(capacity: usize) -> Result<Self, AgentRuntimeError> {
1355        if capacity == 0 {
1356            return Err(AgentRuntimeError::Orchestration(
1357                "BackpressureGuard capacity must be > 0".into(),
1358            ));
1359        }
1360        Ok(Self {
1361            capacity,
1362            soft_capacity: None,
1363            inner: Arc::new(Mutex::new(0)),
1364        })
1365    }
1366
1367    /// Set a soft capacity threshold. When depth reaches this level, a warning
1368    /// is logged but the request is still accepted (up to hard capacity).
1369    pub fn with_soft_limit(mut self, soft: usize) -> Result<Self, AgentRuntimeError> {
1370        if soft >= self.capacity {
1371            return Err(AgentRuntimeError::Orchestration(
1372                "soft_capacity must be less than hard capacity".into(),
1373            ));
1374        }
1375        self.soft_capacity = Some(soft);
1376        Ok(self)
1377    }
1378
1379    /// Try to acquire a slot.
1380    ///
1381    /// Emits a warning when the soft limit is reached (if configured), but
1382    /// still accepts the request until hard capacity is exceeded.
1383    ///
1384    /// # Returns
1385    /// - `Ok(())` — slot acquired
1386    /// - `Err(AgentRuntimeError::BackpressureShed)` — hard capacity exceeded
1387    pub fn try_acquire(&self) -> Result<(), AgentRuntimeError> {
1388        let mut depth = timed_lock(&self.inner, "BackpressureGuard::try_acquire");
1389        if *depth >= self.capacity {
1390            return Err(AgentRuntimeError::BackpressureShed {
1391                depth: *depth,
1392                capacity: self.capacity,
1393            });
1394        }
1395        *depth += 1;
1396        if let Some(soft) = self.soft_capacity {
1397            if *depth >= soft {
1398                tracing::warn!(
1399                    depth = *depth,
1400                    soft_capacity = soft,
1401                    hard_capacity = self.capacity,
1402                    "backpressure approaching hard limit"
1403                );
1404            }
1405        }
1406        Ok(())
1407    }
1408
1409    /// Release a previously acquired slot.
1410    pub fn release(&self) -> Result<(), AgentRuntimeError> {
1411        let mut depth = timed_lock(&self.inner, "BackpressureGuard::release");
1412        *depth = depth.saturating_sub(1);
1413        Ok(())
1414    }
1415
1416    /// Reset the current depth to zero.
1417    ///
1418    /// Useful in tests or after a controlled shutdown when all in-flight
1419    /// requests have been cancelled and the guard should start fresh.
1420    pub fn reset(&self) {
1421        let mut depth = timed_lock(&self.inner, "BackpressureGuard::reset");
1422        *depth = 0;
1423    }
1424
1425    /// Return `true` if the guard is at or over its hard capacity.
1426    pub fn is_full(&self) -> Result<bool, AgentRuntimeError> {
1427        Ok(self.depth()? >= self.capacity)
1428    }
1429
1430    /// Return `true` if no slots are currently in use.
1431    pub fn is_empty(&self) -> Result<bool, AgentRuntimeError> {
1432        Ok(self.depth()? == 0)
1433    }
1434
1435    /// Return the number of additional request slots available before the hard cap.
1436    pub fn available_capacity(&self) -> Result<usize, AgentRuntimeError> {
1437        Ok(self.capacity.saturating_sub(self.depth()?))
1438    }
1439
1440    /// Return the hard capacity (maximum concurrent slots) configured for this guard.
1441    pub fn hard_capacity(&self) -> usize {
1442        self.capacity
1443    }
1444
1445    /// Return the soft capacity limit if one was configured, or `None`.
1446    pub fn soft_limit(&self) -> Option<usize> {
1447        self.soft_capacity
1448    }
1449
1450    /// Return `true` if a soft capacity limit has been configured.
1451    ///
1452    /// Equivalent to `self.soft_limit().is_some()` but more readable at call
1453    /// sites that only need a boolean check.
1454    pub fn is_soft_limited(&self) -> bool {
1455        self.soft_capacity.is_some()
1456    }
1457
1458    /// Return the current depth.
1459    pub fn depth(&self) -> Result<usize, AgentRuntimeError> {
1460        let depth = timed_lock(&self.inner, "BackpressureGuard::depth");
1461        Ok(*depth)
1462    }
1463
1464    /// Return the current depth as a percentage of the hard capacity.
1465    ///
1466    /// Returns a value in `[0.0, 100.0]`.  When `depth > capacity` (which
1467    /// cannot happen in normal operation) the result is clamped to `100.0`.
1468    pub fn percent_full(&self) -> Result<f64, AgentRuntimeError> {
1469        let depth = self.depth()?;
1470        Ok((depth as f64 / self.capacity as f64 * 100.0).min(100.0))
1471    }
1472
1473    /// Return the ratio of current depth to soft capacity as a value in `[0.0, ∞)`.
1474    ///
1475    /// Returns `0.0` if no soft limit has been configured.
1476    /// Values above `1.0` mean the soft limit has been exceeded.
1477    pub fn soft_depth_ratio(&self) -> f32 {
1478        match self.soft_capacity {
1479            None => 0.0,
1480            Some(soft) => {
1481                let depth = timed_lock(&self.inner, "BackpressureGuard::soft_depth_ratio");
1482                *depth as f32 / soft as f32
1483            }
1484        }
1485    }
1486
1487    /// Return the fraction of the hard capacity currently in use: `depth / capacity`.
1488    ///
1489    /// Returns `0.0` when no slots are in use, `1.0` when fully saturated.
1490    pub fn utilization_ratio(&self) -> Result<f32, AgentRuntimeError> {
1491        if self.capacity == 0 {
1492            return Ok(0.0);
1493        }
1494        let depth = self.depth()?;
1495        Ok(depth as f32 / self.capacity as f32)
1496    }
1497
1498    /// Return the number of additional slots that can be acquired before hitting
1499    /// the hard capacity limit.
1500    ///
1501    /// Returns `0` when the guard is full.
1502    pub fn remaining_capacity(&self) -> Result<usize, AgentRuntimeError> {
1503        let depth = self.depth()?;
1504        Ok(self.capacity.saturating_sub(depth))
1505    }
1506
1507    /// Force the in-flight depth counter to zero.
1508    ///
1509    /// Useful for test teardown or hard resets where acquired slots will never
1510    /// be released normally (e.g., after a test panics before calling `release`).
1511    pub fn reset_depth(&self) -> Result<(), AgentRuntimeError> {
1512        let mut depth = timed_lock(&self.inner, "BackpressureGuard::reset_depth");
1513        *depth = 0;
1514        Ok(())
1515    }
1516
1517    /// Return the fraction of capacity that is still available, in `[0.0, 1.0]`.
1518    ///
1519    /// `1.0` means completely empty; `0.0` means at full capacity.
1520    pub fn headroom_ratio(&self) -> Result<f64, AgentRuntimeError> {
1521        Ok(self.available_capacity()? as f64 / self.capacity as f64)
1522    }
1523
1524    /// Return the number of currently held (acquired) slots.
1525    ///
1526    /// Equivalent to `capacity - available_capacity()`.
1527    pub fn acquired_count(&self) -> Result<usize, AgentRuntimeError> {
1528        Ok(self.capacity - self.available_capacity()?)
1529    }
1530
1531    /// Return `true` if the current depth exceeds the configured soft limit.
1532    ///
1533    /// Returns `false` if no soft limit is set.
1534    pub fn over_soft_limit(&self) -> Result<bool, AgentRuntimeError> {
1535        let soft = match self.soft_limit() {
1536            Some(s) => s,
1537            None => return Ok(false),
1538        };
1539        Ok(self.depth()? > soft)
1540    }
1541}
1542
1543// ── Pipeline ──────────────────────────────────────────────────────────────────
1544
1545/// Result of executing a pipeline, including per-stage timing.
1546#[derive(Debug)]
1547pub struct PipelineResult {
1548    /// Final output value after all stages.
1549    pub output: String,
1550    /// Per-stage timing: `(stage_name, duration_ms)` in execution order.
1551    pub stage_timings: Vec<(String, u64)>,
1552}
1553
1554impl PipelineResult {
1555    /// Return the total wall-clock time across all stages in milliseconds.
1556    pub fn total_duration_ms(&self) -> u64 {
1557        self.stage_timings.iter().map(|(_, ms)| ms).sum()
1558    }
1559
1560    /// Return the number of stages that recorded a timing entry.
1561    ///
1562    /// Normally this equals the pipeline's stage count, but may be less if
1563    /// the pipeline short-circuited after an error.
1564    pub fn stage_count(&self) -> usize {
1565        self.stage_timings.len()
1566    }
1567
1568    /// Return the name and duration of the slowest stage.
1569    ///
1570    /// Returns `None` if no stages ran.
1571    pub fn slowest_stage(&self) -> Option<(&str, u64)> {
1572        self.stage_timings
1573            .iter()
1574            .max_by_key(|(_, ms)| ms)
1575            .map(|(name, ms)| (name.as_str(), *ms))
1576    }
1577
1578    /// Return the name and duration of the fastest stage.
1579    ///
1580    /// Returns `None` if no stages ran.
1581    pub fn fastest_stage(&self) -> Option<(&str, u64)> {
1582        self.stage_timings
1583            .iter()
1584            .min_by_key(|(_, ms)| ms)
1585            .map(|(name, ms)| (name.as_str(), *ms))
1586    }
1587
1588    /// Return `true` if no stage timings were recorded (pipeline ran zero stages).
1589    pub fn is_empty(&self) -> bool {
1590        self.stage_timings.is_empty()
1591    }
1592}
1593
1594/// A single named stage in the pipeline.
1595pub struct Stage {
1596    /// Human-readable name used in log output and error messages.
1597    pub name: String,
1598    /// The transform function; receives the current string and returns the transformed string.
1599    pub handler: Box<dyn Fn(String) -> Result<String, AgentRuntimeError> + Send + Sync>,
1600}
1601
1602impl std::fmt::Debug for Stage {
1603    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1604        f.debug_struct("Stage").field("name", &self.name).finish()
1605    }
1606}
1607
1608/// Error handler callback type for pipeline stage failures.
1609type StageErrorHandler = Box<dyn Fn(&str, &str) -> String + Send + Sync>;
1610
1611/// A composable pipeline that passes a string through a sequence of named stages.
1612///
1613/// ## Guarantees
1614/// - Stages execute in insertion order
1615/// - First stage failure short-circuits remaining stages (unless an error handler is set)
1616/// - Non-panicking
1617pub struct Pipeline {
1618    stages: Vec<Stage>,
1619    error_handler: Option<StageErrorHandler>,
1620}
1621
1622impl std::fmt::Debug for Pipeline {
1623    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1624        f.debug_struct("Pipeline")
1625            .field("stages", &self.stages)
1626            .field("has_error_handler", &self.error_handler.is_some())
1627            .finish()
1628    }
1629}
1630
1631impl Pipeline {
1632    /// Create a new empty pipeline.
1633    pub fn new() -> Self {
1634        Self { stages: Vec::new(), error_handler: None }
1635    }
1636
1637    /// Attach a recovery callback for stage failures.
1638    ///
1639    /// When a stage fails, `handler(stage_name, error_message)` is called.
1640    /// The returned string becomes the input to the next stage.
1641    /// If no handler is set, stage failures propagate as errors.
1642    pub fn with_error_handler(
1643        mut self,
1644        handler: impl Fn(&str, &str) -> String + Send + Sync + 'static,
1645    ) -> Self {
1646        self.error_handler = Some(Box::new(handler));
1647        self
1648    }
1649
1650    /// Append a stage to the pipeline.
1651    pub fn add_stage(
1652        mut self,
1653        name: impl Into<String>,
1654        handler: impl Fn(String) -> Result<String, AgentRuntimeError> + Send + Sync + 'static,
1655    ) -> Self {
1656        self.stages.push(Stage {
1657            name: name.into(),
1658            handler: Box::new(handler),
1659        });
1660        self
1661    }
1662
1663    /// Insert a stage at the **front** of the pipeline (index 0).
1664    ///
1665    /// All existing stages are shifted to higher indices.  The pipeline's
1666    /// stage names remain unique only if the caller ensures uniqueness.
1667    pub fn prepend_stage(
1668        mut self,
1669        name: impl Into<String>,
1670        handler: impl Fn(String) -> Result<String, AgentRuntimeError> + Send + Sync + 'static,
1671    ) -> Self {
1672        self.stages.insert(0, Stage {
1673            name: name.into(),
1674            handler: Box::new(handler),
1675        });
1676        self
1677    }
1678
1679    /// Return `true` if the pipeline has no stages.
1680    pub fn is_empty(&self) -> bool {
1681        self.stages.is_empty()
1682    }
1683
1684    /// Return `true` if a stage error handler has been configured via
1685    /// [`with_error_handler`].
1686    ///
1687    /// [`with_error_handler`]: Pipeline::with_error_handler
1688    pub fn has_error_handler(&self) -> bool {
1689        self.error_handler.is_some()
1690    }
1691
1692    /// Return the number of stages in the pipeline.
1693    pub fn stage_count(&self) -> usize {
1694        self.stages.len()
1695    }
1696
1697    /// Return `true` if a stage with the given name is registered.
1698    pub fn has_stage(&self, name: &str) -> bool {
1699        self.stages.iter().any(|s| s.name == name)
1700    }
1701
1702    /// Return the names of all stages in execution order.
1703    pub fn stage_names(&self) -> Vec<&str> {
1704        self.stages.iter().map(|s| s.name.as_str()).collect()
1705    }
1706
1707    /// Return the names of all stages as owned `String`s.
1708    ///
1709    /// Unlike [`stage_names`] this does not borrow `self`, making it easier to
1710    /// use the result after `self` is moved or mutated.
1711    ///
1712    /// [`stage_names`]: Pipeline::stage_names
1713    pub fn stage_names_owned(&self) -> Vec<String> {
1714        self.stages.iter().map(|s| s.name.clone()).collect()
1715    }
1716
1717    /// Return the name of the stage at zero-based `index`, or `None` if out of bounds.
1718    pub fn get_stage_name_at(&self, index: usize) -> Option<&str> {
1719        self.stages.get(index).map(|s| s.name.as_str())
1720    }
1721
1722    /// Return the zero-based index of the first stage with the given name.
1723    ///
1724    /// Returns `None` if no stage with that name exists.
1725    pub fn stage_index(&self, name: &str) -> Option<usize> {
1726        self.stages.iter().position(|s| s.name == name)
1727    }
1728
1729    /// Return the name of the first stage in the pipeline, or `None` if empty.
1730    pub fn first_stage_name(&self) -> Option<&str> {
1731        self.stages.first().map(|s| s.name.as_str())
1732    }
1733
1734    /// Return the name of the last stage in the pipeline, or `None` if empty.
1735    pub fn last_stage_name(&self) -> Option<&str> {
1736        self.stages.last().map(|s| s.name.as_str())
1737    }
1738
1739    /// Remove the first stage whose name equals `name`.
1740    ///
1741    /// Returns `true` if a stage was found and removed, `false` if no stage
1742    /// with that name was registered.
1743    pub fn remove_stage(&mut self, name: &str) -> bool {
1744        if let Some(pos) = self.stages.iter().position(|s| s.name == name) {
1745            self.stages.remove(pos);
1746            true
1747        } else {
1748            false
1749        }
1750    }
1751
1752    /// Rename the first stage whose name equals `old_name` to `new_name`.
1753    ///
1754    /// Returns `true` if a stage was found and renamed, `false` if no stage
1755    /// with `old_name` exists.
1756    pub fn rename_stage(&mut self, old_name: &str, new_name: impl Into<String>) -> bool {
1757        if let Some(stage) = self.stages.iter_mut().find(|s| s.name == old_name) {
1758            stage.name = new_name.into();
1759            true
1760        } else {
1761            false
1762        }
1763    }
1764
1765    /// Remove all stages from the pipeline.
1766    ///
1767    /// The error handler (if any) is preserved; only the stage list is cleared.
1768    pub fn clear(&mut self) {
1769        self.stages.clear();
1770    }
1771
1772    /// Return the number of stages whose name contains `keyword` (case-insensitive).
1773    pub fn count_stages_matching(&self, keyword: &str) -> usize {
1774        let kw = keyword.to_ascii_lowercase();
1775        self.stages
1776            .iter()
1777            .filter(|s| s.name.to_ascii_lowercase().contains(&kw))
1778            .count()
1779    }
1780
1781    /// Swap the positions of two stages by name.
1782    ///
1783    /// Returns `true` if both stages were found and swapped.  Returns `false`
1784    /// if either name is not present in the pipeline (no state change).
1785    pub fn swap_stages(&mut self, a: &str, b: &str) -> bool {
1786        let idx_a = self.stages.iter().position(|s| s.name == a);
1787        let idx_b = self.stages.iter().position(|s| s.name == b);
1788        match (idx_a, idx_b) {
1789            (Some(i), Some(j)) => {
1790                self.stages.swap(i, j);
1791                true
1792            }
1793            _ => false,
1794        }
1795    }
1796
1797    /// Execute the pipeline, passing `input` through each stage in order.
1798    #[tracing::instrument(skip(self))]
1799    pub fn run(&self, input: String) -> Result<String, AgentRuntimeError> {
1800        let mut current = input;
1801        for stage in &self.stages {
1802            tracing::debug!(stage = %stage.name, "running pipeline stage");
1803            match (stage.handler)(current) {
1804                Ok(out) => current = out,
1805                Err(e) => {
1806                    tracing::error!(stage = %stage.name, error = %e, "pipeline stage failed");
1807                    if let Some(ref handler) = self.error_handler {
1808                        current = handler(&stage.name, &e.to_string());
1809                    } else {
1810                        return Err(e);
1811                    }
1812                }
1813            }
1814        }
1815        Ok(current)
1816    }
1817
1818    /// Execute the pipeline with per-stage timing.
1819    ///
1820    /// Returns a [`PipelineResult`] whose `stage_timings` contains
1821    /// `(stage_name, duration_ms)` pairs in execution order.
1822    pub fn execute_timed(&self, input: String) -> Result<PipelineResult, AgentRuntimeError> {
1823        let mut current = input;
1824        let mut stage_timings = Vec::new();
1825        for stage in &self.stages {
1826            let start = std::time::Instant::now();
1827            tracing::debug!(stage = %stage.name, "running timed pipeline stage");
1828            match (stage.handler)(current) {
1829                Ok(out) => current = out,
1830                Err(e) => {
1831                    tracing::error!(stage = %stage.name, error = %e, "timed pipeline stage failed");
1832                    if let Some(ref handler) = self.error_handler {
1833                        current = handler(&stage.name, &e.to_string());
1834                    } else {
1835                        return Err(e);
1836                    }
1837                }
1838            }
1839            let duration_ms = start.elapsed().as_millis() as u64;
1840            stage_timings.push((stage.name.clone(), duration_ms));
1841        }
1842        Ok(PipelineResult {
1843            output: current,
1844            stage_timings,
1845        })
1846    }
1847
1848    /// Return a human-readable description of this pipeline.
1849    ///
1850    /// Format: `"Pipeline[{n} stage(s): stage1 → stage2 → ...]"`.
1851    /// Returns `"Pipeline[empty]"` when no stages have been added.
1852    ///
1853    /// Intended for logging and debugging — not a stable serialization format.
1854    pub fn description(&self) -> String {
1855        if self.stages.is_empty() {
1856            return "Pipeline[empty]".to_owned();
1857        }
1858        let names = self
1859            .stages
1860            .iter()
1861            .map(|s| s.name.as_str())
1862            .collect::<Vec<_>>()
1863            .join(" → ");
1864        let n = self.stages.len();
1865        let plural = if n == 1 { "stage" } else { "stages" };
1866        format!("Pipeline[{n} {plural}: {names}]")
1867    }
1868
1869    /// Return `true` if every stage in the pipeline has a unique name.
1870    ///
1871    /// Duplicate stage names can lead to ambiguous lookups with `stage_index`
1872    /// and `has_stage`.  Use this predicate to assert pipeline integrity
1873    /// during construction.
1874    pub fn has_unique_stage_names(&self) -> bool {
1875        let mut seen = std::collections::HashSet::new();
1876        self.stages.iter().all(|s| seen.insert(s.name.as_str()))
1877    }
1878
1879    /// Return stage names sorted in ascending lexicographic order.
1880    ///
1881    /// Unlike [`stage_names`], which returns names in insertion order, this
1882    /// always produces a stable sort regardless of the order stages were added.
1883    ///
1884    /// [`stage_names`]: Pipeline::stage_names
1885    pub fn stage_names_sorted(&self) -> Vec<&str> {
1886        let mut names: Vec<&str> = self.stages.iter().map(|s| s.name.as_str()).collect();
1887        names.sort_unstable();
1888        names
1889    }
1890
1891    /// Return the name of the stage with the most bytes, or `None` for an empty pipeline.
1892    ///
1893    /// When multiple stages share the maximum byte length, the first one in
1894    /// insertion order is returned.
1895    pub fn longest_stage_name(&self) -> Option<&str> {
1896        self.stages
1897            .iter()
1898            .max_by_key(|s| s.name.len())
1899            .map(|s| s.name.as_str())
1900    }
1901
1902    /// Return the name of the stage with the fewest bytes, or `None` for an empty pipeline.
1903    ///
1904    /// When multiple stages share the minimum byte length, the first one in
1905    /// insertion order is returned.
1906    pub fn shortest_stage_name(&self) -> Option<&str> {
1907        self.stages
1908            .iter()
1909            .min_by_key(|s| s.name.len())
1910            .map(|s| s.name.as_str())
1911    }
1912
1913    /// Return the byte lengths of all stage names in order.
1914    ///
1915    /// Returns an empty `Vec` for an empty pipeline.
1916    pub fn stage_name_lengths(&self) -> Vec<usize> {
1917        self.stages.iter().map(|s| s.name.len()).collect()
1918    }
1919
1920    /// Return the average byte length of stage names.
1921    ///
1922    /// Returns `0.0` for an empty pipeline.
1923    pub fn avg_stage_name_length(&self) -> f64 {
1924        if self.stages.is_empty() {
1925            return 0.0;
1926        }
1927        let total: usize = self.stages.iter().map(|s| s.name.len()).sum();
1928        total as f64 / self.stages.len() as f64
1929    }
1930
1931    /// Return the names of stages whose name contains `substring` (case-sensitive).
1932    ///
1933    /// Returns an empty `Vec` if no stage names match.
1934    pub fn stages_containing(&self, substring: &str) -> Vec<&str> {
1935        self.stages
1936            .iter()
1937            .filter(|s| s.name.contains(substring))
1938            .map(|s| s.name.as_str())
1939            .collect()
1940    }
1941
1942    /// Return `true` if the stage named `name` is the first stage in the pipeline.
1943    ///
1944    /// Returns `false` if the pipeline is empty or the stage is not present.
1945    pub fn stage_is_first(&self, name: &str) -> bool {
1946        self.stages.first().map_or(false, |s| s.name == name)
1947    }
1948
1949    /// Return `true` if the stage named `name` is the last stage in the pipeline.
1950    ///
1951    /// Returns `false` if the pipeline is empty or the stage is not present.
1952    pub fn stage_is_last(&self, name: &str) -> bool {
1953        self.stages.last().map_or(false, |s| s.name == name)
1954    }
1955
1956    /// Return the total byte length of all stage names combined.
1957    ///
1958    /// Returns `0` for an empty pipeline.
1959    pub fn total_stage_name_bytes(&self) -> usize {
1960        self.stages.iter().map(|s| s.name.len()).sum()
1961    }
1962
1963    /// Return the names of all stages that appear before `name` in the pipeline.
1964    ///
1965    /// Returns an empty `Vec` if `name` is not present, is the first stage,
1966    /// or the pipeline is empty.
1967    pub fn stages_before(&self, name: &str) -> Vec<&str> {
1968        let pos = self.stages.iter().position(|s| s.name == name);
1969        match pos {
1970            None | Some(0) => Vec::new(),
1971            Some(idx) => self.stages[..idx].iter().map(|s| s.name.as_str()).collect(),
1972        }
1973    }
1974
1975    /// Return the names of all stages that appear after `name` in the pipeline.
1976    ///
1977    /// Returns an empty `Vec` if `name` is not present, is the last stage,
1978    /// or the pipeline is empty.
1979    pub fn stages_after(&self, name: &str) -> Vec<&str> {
1980        let pos = self.stages.iter().position(|s| s.name == name);
1981        match pos {
1982            None => Vec::new(),
1983            Some(idx) if idx + 1 >= self.stages.len() => Vec::new(),
1984            Some(idx) => self.stages[idx + 1..].iter().map(|s| s.name.as_str()).collect(),
1985        }
1986    }
1987
1988    /// Return all consecutive stage name pairs `(from, to)` in pipeline order.
1989    ///
1990    /// For a pipeline with stages `[a, b, c]` this returns `[("a", "b"), ("b", "c")]`.
1991    /// Returns an empty `Vec` for pipelines with fewer than two stages.
1992    pub fn stage_pairs(&self) -> Vec<(&str, &str)> {
1993        self.stages
1994            .windows(2)
1995            .map(|w| (w[0].name.as_str(), w[1].name.as_str()))
1996            .collect()
1997    }
1998
1999    /// Return the number of stages whose name byte length is strictly greater
2000    /// than `min_len`.
2001    ///
2002    /// Returns `0` for an empty pipeline or when no stage name exceeds
2003    /// `min_len` bytes.
2004    pub fn stage_count_above_name_len(&self, min_len: usize) -> usize {
2005        self.stages.iter().filter(|s| s.name.len() > min_len).count()
2006    }
2007
2008    /// Return the number of stages whose name is strictly shorter than
2009    /// `max_len` bytes.
2010    ///
2011    /// Complement of [`stage_count_above_name_len`].
2012    ///
2013    /// [`stage_count_above_name_len`]: Pipeline::stage_count_above_name_len
2014    pub fn stage_count_below_name_len(&self, max_len: usize) -> usize {
2015        self.stages.iter().filter(|s| s.name.len() < max_len).count()
2016    }
2017
2018    /// Return the name of the stage at position `idx`, or `None` if `idx` is
2019    /// out of bounds.
2020    ///
2021    /// Indices are zero-based from the start of the pipeline.
2022    pub fn stage_at(&self, idx: usize) -> Option<&str> {
2023        self.stages.get(idx).map(|s| s.name.as_str())
2024    }
2025
2026    /// Return stage names in reverse pipeline order.
2027    ///
2028    /// For a pipeline `[a, b, c]` this returns `["c", "b", "a"]`.
2029    /// Returns an empty `Vec` for an empty pipeline.
2030    pub fn stages_reversed(&self) -> Vec<&str> {
2031        self.stages.iter().rev().map(|s| s.name.as_str()).collect()
2032    }
2033
2034    /// Return `true` if the pipeline has no stages.
2035    ///
2036    /// Equivalent to `stage_count() == 0`.
2037    pub fn pipeline_is_empty(&self) -> bool {
2038        self.stages.is_empty()
2039    }
2040
2041    /// Return sorted, deduplicated stage names.
2042    ///
2043    /// Stage names are unique by construction, so this is equivalent to
2044    /// `stage_names` sorted alphabetically.  Useful for set-membership checks
2045    /// without knowing insertion order.
2046    pub fn unique_stage_names(&self) -> Vec<&str> {
2047        let mut names: Vec<&str> = self.stages.iter().map(|s| s.name.as_str()).collect();
2048        names.sort_unstable();
2049        names
2050    }
2051
2052    /// Return all stage names whose name starts with `prefix`.
2053    ///
2054    /// Returned names preserve pipeline order.  Returns an empty `Vec` when
2055    /// no stage name has the given prefix or the pipeline is empty.
2056    pub fn stage_names_with_prefix<'a>(&'a self, prefix: &str) -> Vec<&'a str> {
2057        self.stages
2058            .iter()
2059            .filter(|s| s.name.starts_with(prefix))
2060            .map(|s| s.name.as_str())
2061            .collect()
2062    }
2063
2064    /// Return `true` if any stage name starts with `prefix`.
2065    ///
2066    /// A convenience predicate over [`stage_names_with_prefix`] that avoids
2067    /// allocating a `Vec` when only existence is needed.
2068    ///
2069    /// [`stage_names_with_prefix`]: Pipeline::stage_names_with_prefix
2070    pub fn contains_stage_with_prefix(&self, prefix: &str) -> bool {
2071        self.stages.iter().any(|s| s.name.starts_with(prefix))
2072    }
2073
2074    /// Return the names of all stages whose name ends with `suffix`.
2075    ///
2076    /// Complementary to [`stage_names_with_prefix`]; useful for filtering
2077    /// stages by a common naming convention (e.g. `"_validate"`).
2078    /// Returns an empty `Vec` when no stage matches or the pipeline is empty.
2079    ///
2080    /// [`stage_names_with_prefix`]: Pipeline::stage_names_with_prefix
2081    pub fn stages_with_suffix<'a>(&'a self, suffix: &str) -> Vec<&'a str> {
2082        self.stages
2083            .iter()
2084            .filter(|s| s.name.ends_with(suffix))
2085            .map(|s| s.name.as_str())
2086            .collect()
2087    }
2088
2089    /// Return `true` if any stage name contains `substr` as a substring.
2090    ///
2091    /// A quick existence check that avoids allocating a full `Vec`.
2092    /// Returns `false` for an empty pipeline.
2093    pub fn has_stage_with_name_containing(&self, substr: &str) -> bool {
2094        self.stages.iter().any(|s| s.name.contains(substr))
2095    }
2096
2097    /// Return the names of all stages whose name contains `substr`.
2098    ///
2099    /// Complements [`has_stage_with_name_containing`] by returning the full
2100    /// list rather than just a boolean.  Returns an empty `Vec` when no stage
2101    /// matches or the pipeline is empty.
2102    ///
2103    /// [`has_stage_with_name_containing`]: Pipeline::has_stage_with_name_containing
2104    pub fn stage_names_containing<'a>(&'a self, substr: &str) -> Vec<&'a str> {
2105        self.stages
2106            .iter()
2107            .filter(|s| s.name.contains(substr))
2108            .map(|s| s.name.as_str())
2109            .collect()
2110    }
2111
2112    /// Return the total number of bytes across all stage name strings.
2113    ///
2114    /// Useful for estimating the overhead of storing pipeline metadata.
2115    /// Returns `0` for an empty pipeline.
2116    pub fn stage_name_bytes_total(&self) -> usize {
2117        self.stages.iter().map(|s| s.name.len()).sum()
2118    }
2119
2120    /// Return the number of stages whose name byte length exceeds `min_bytes`.
2121    ///
2122    /// Useful for identifying long stage names that may indicate over-verbose
2123    /// naming conventions.  Returns `0` for an empty pipeline.
2124    pub fn stage_count_above_name_bytes(&self, min_bytes: usize) -> usize {
2125        self.stages.iter().filter(|s| s.name.len() > min_bytes).count()
2126    }
2127
2128    /// Return a reference to the stage at 0-based `index`, or `None` if out of range.
2129    pub fn stage_at_index(&self, index: usize) -> Option<&Stage> {
2130        self.stages.get(index)
2131    }
2132
2133    /// Return the position of the stage named `name` counted from the end of
2134    /// the pipeline (0 = last stage, 1 = second-to-last, …).
2135    ///
2136    /// Returns `None` if no stage with that name exists.
2137    pub fn stage_position_from_end(&self, name: &str) -> Option<usize> {
2138        let pos = self.stages.iter().position(|s| s.name == name)?;
2139        Some(self.stages.len() - 1 - pos)
2140    }
2141
2142    /// Return `true` if every name in `names` corresponds to an existing stage.
2143    ///
2144    /// Returns `true` for an empty `names` slice (vacuously true).
2145    pub fn contains_all_stages(&self, names: &[&str]) -> bool {
2146        names.iter().all(|&n| self.stages.iter().any(|s| s.name == n))
2147    }
2148
2149    /// Return the stage name at position `n` from the **end** of the pipeline
2150    /// (0-indexed, so `0` is the last stage).
2151    ///
2152    /// Returns `None` when `n` is out of bounds or the pipeline is empty.
2153    pub fn stage_name_from_end(&self, n: usize) -> Option<&str> {
2154        let len = self.stages.len();
2155        if n >= len {
2156            return None;
2157        }
2158        Some(self.stages[len - 1 - n].name.as_str())
2159    }
2160
2161    /// Return all stage names as an owned `Vec<String>`.
2162    ///
2163    /// Unlike [`unique_stage_names`] this preserves order and includes
2164    /// duplicates.
2165    ///
2166    /// [`unique_stage_names`]: Pipeline::unique_stage_names
2167    pub fn all_stage_names(&self) -> Vec<String> {
2168        self.stages.iter().map(|s| s.name.clone()).collect()
2169    }
2170
2171    /// Return `true` if the pipeline contains exactly `n` stages.
2172    pub fn has_exactly_n_stages(&self, n: usize) -> bool {
2173        self.stages.len() == n
2174    }
2175
2176    /// Return the 0-based index of the first stage whose name matches `name`,
2177    /// or `None` if no such stage exists.
2178    pub fn stage_index_of(&self, name: &str) -> Option<usize> {
2179        self.stages.iter().position(|s| s.name == name)
2180    }
2181
2182    /// Return `true` if the pipeline has no stages.
2183    ///
2184    /// Equivalent to `stage_count() == 0`.
2185    pub fn has_no_stages(&self) -> bool {
2186        self.stages.is_empty()
2187    }
2188
2189    /// Return the byte length of the longest stage name in the pipeline.
2190    ///
2191    /// Returns `0` for an empty pipeline.
2192    pub fn longest_stage_name_len(&self) -> usize {
2193        self.stages.iter().map(|s| s.name.len()).max().unwrap_or(0)
2194    }
2195
2196    /// Join all stage names with `sep` and return the resulting string.
2197    ///
2198    /// Returns an empty string for an empty pipeline.
2199    pub fn stage_names_joined(&self, sep: &str) -> String {
2200        self.stages
2201            .iter()
2202            .map(|s| s.name.as_str())
2203            .collect::<Vec<_>>()
2204            .join(sep)
2205    }
2206
2207    /// Return the count of stages whose name contains `substr`.
2208    ///
2209    /// Returns `0` for an empty pipeline or when no stage name matches.
2210    pub fn stage_count_with_name_containing(&self, substr: &str) -> usize {
2211        self.stages.iter().filter(|s| s.name.contains(substr)).count()
2212    }
2213
2214    /// Return `true` if a stage exists at zero-based index `idx`.
2215    pub fn has_stage_at_index(&self, idx: usize) -> bool {
2216        idx < self.stages.len()
2217    }
2218
2219    /// Return `true` if **all** stage names start with `prefix`.
2220    ///
2221    /// Returns `true` for an empty pipeline (vacuously true) and for an empty
2222    /// `prefix` string (all strings start with "").
2223    pub fn all_stage_names_start_with(&self, prefix: &str) -> bool {
2224        self.stages.iter().all(|s| s.name.starts_with(prefix))
2225    }
2226
2227    /// Return `true` if any stage in the pipeline has exactly the given
2228    /// `name`.
2229    ///
2230    /// Unlike [`Pipeline::has_stage_with_name_containing`] this checks for an
2231    /// exact match, and unlike [`Pipeline::contains_all_stages`] it accepts a
2232    /// single name without requiring slice syntax.
2233    pub fn any_stage_has_name(&self, name: &str) -> bool {
2234        self.stages.iter().any(|s| s.name == name)
2235    }
2236
2237    /// Return the name of the stage at `idx`, or `None` if the index is
2238    /// out of bounds.
2239    ///
2240    /// Convenience wrapper around [`stage_at_index`] that returns `Option<&str>`
2241    /// directly instead of `Option<&Stage>`, avoiding the need to project
2242    /// through the `Stage` struct at the call site.
2243    ///
2244    /// [`stage_at_index`]: Pipeline::stage_at_index
2245    pub fn stage_name_at(&self, idx: usize) -> Option<&str> {
2246        self.stages.get(idx).map(|s| s.name.as_str())
2247    }
2248
2249    /// Return `true` if every stage name contains `substr` as a substring.
2250    ///
2251    /// Returns `true` vacuously for an empty pipeline (no stages to violate
2252    /// the condition).
2253    pub fn all_stage_names_contain(&self, substr: &str) -> bool {
2254        self.stages.iter().all(|s| s.name.contains(substr))
2255    }
2256
2257}
2258
2259impl Default for Pipeline {
2260    fn default() -> Self {
2261        Self::new()
2262    }
2263}
2264
2265// ── Tests ─────────────────────────────────────────────────────────────────────
2266
2267#[cfg(test)]
2268mod tests {
2269    use super::*;
2270
2271    // ── RetryPolicy ───────────────────────────────────────────────────────────
2272
2273    #[test]
2274    fn test_retry_policy_rejects_zero_attempts() {
2275        assert!(RetryPolicy::exponential(0, 100).is_err());
2276    }
2277
2278    #[test]
2279    fn test_retry_policy_delay_attempt_1_equals_base() {
2280        let p = RetryPolicy::exponential(3, 100).unwrap();
2281        assert_eq!(p.delay_for(1), Duration::from_millis(100));
2282    }
2283
2284    #[test]
2285    fn test_retry_policy_delay_doubles_each_attempt() {
2286        let p = RetryPolicy::exponential(5, 100).unwrap();
2287        assert_eq!(p.delay_for(2), Duration::from_millis(200));
2288        assert_eq!(p.delay_for(3), Duration::from_millis(400));
2289        assert_eq!(p.delay_for(4), Duration::from_millis(800));
2290    }
2291
2292    #[test]
2293    fn test_retry_policy_delay_capped_at_max() {
2294        let p = RetryPolicy::exponential(10, 10_000).unwrap();
2295        assert_eq!(p.delay_for(10), MAX_RETRY_DELAY);
2296    }
2297
2298    #[test]
2299    fn test_retry_policy_delay_never_exceeds_max_for_any_attempt() {
2300        let p = RetryPolicy::exponential(10, 1000).unwrap();
2301        for attempt in 1..=10 {
2302            assert!(p.delay_for(attempt) <= MAX_RETRY_DELAY);
2303        }
2304    }
2305
2306    // ── Round 26: first_delay_ms ──────────────────────────────────────────────
2307
2308    #[test]
2309    fn test_retry_policy_first_delay_ms_equals_base_delay() {
2310        let p = RetryPolicy::exponential(3, 200).unwrap();
2311        assert_eq!(p.first_delay_ms(), p.base_delay_ms());
2312    }
2313
2314    #[test]
2315    fn test_retry_policy_first_delay_ms_constant_policy() {
2316        let p = RetryPolicy::constant(4, 150).unwrap();
2317        assert_eq!(p.first_delay_ms(), 150);
2318    }
2319
2320    // ── CircuitBreaker ────────────────────────────────────────────────────────
2321
2322    #[test]
2323    fn test_circuit_breaker_rejects_zero_threshold() {
2324        assert!(CircuitBreaker::new("svc", 0, Duration::from_secs(1)).is_err());
2325    }
2326
2327    #[test]
2328    fn test_circuit_breaker_starts_closed() {
2329        let cb = CircuitBreaker::new("svc", 3, Duration::from_secs(60)).unwrap();
2330        assert_eq!(cb.state().unwrap(), CircuitState::Closed);
2331    }
2332
2333    #[test]
2334    fn test_circuit_breaker_success_keeps_closed() {
2335        let cb = CircuitBreaker::new("svc", 3, Duration::from_secs(60)).unwrap();
2336        let result: Result<i32, AgentRuntimeError> = cb.call(|| Ok::<i32, AgentRuntimeError>(42));
2337        assert!(result.is_ok());
2338        assert_eq!(cb.state().unwrap(), CircuitState::Closed);
2339    }
2340
2341    #[test]
2342    fn test_circuit_breaker_opens_after_threshold_failures() {
2343        let cb = CircuitBreaker::new("svc", 3, Duration::from_secs(60)).unwrap();
2344        for _ in 0..3 {
2345            let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("oops".to_string()));
2346        }
2347        assert!(matches!(cb.state().unwrap(), CircuitState::Open { .. }));
2348    }
2349
2350    #[test]
2351    fn test_circuit_breaker_open_fast_fails() {
2352        let cb = CircuitBreaker::new("svc", 1, Duration::from_secs(3600)).unwrap();
2353        let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("fail".to_string()));
2354        let result: Result<(), AgentRuntimeError> = cb.call(|| Ok::<(), AgentRuntimeError>(()));
2355        assert!(matches!(result, Err(AgentRuntimeError::CircuitOpen { .. })));
2356    }
2357
2358    #[test]
2359    fn test_circuit_breaker_success_resets_failure_count() {
2360        let cb = CircuitBreaker::new("svc", 5, Duration::from_secs(60)).unwrap();
2361        let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("fail".to_string()));
2362        let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("fail".to_string()));
2363        let _: Result<i32, AgentRuntimeError> = cb.call(|| Ok::<i32, AgentRuntimeError>(1));
2364        assert_eq!(cb.failure_count().unwrap(), 0);
2365    }
2366
2367    #[test]
2368    fn test_circuit_breaker_half_open_on_recovery() {
2369        // Use a zero recovery window to immediately go half-open
2370        let cb = CircuitBreaker::new("svc", 1, Duration::ZERO).unwrap();
2371        let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("fail".to_string()));
2372        // After recovery window, next call should probe (half-open → closed on success)
2373        let result: Result<i32, AgentRuntimeError> = cb.call(|| Ok::<i32, AgentRuntimeError>(99));
2374        assert_eq!(result.unwrap_or(0), 99);
2375        assert_eq!(cb.state().unwrap(), CircuitState::Closed);
2376    }
2377
2378    #[test]
2379    fn test_circuit_breaker_with_custom_backend_uses_backend_state() {
2380        // Build a custom backend and share it between two circuit breakers
2381        // to verify that state is read from and written to the backend.
2382        let shared_backend: Arc<dyn CircuitBreakerBackend> =
2383            Arc::new(InMemoryCircuitBreakerBackend::new());
2384
2385        let cb1 = CircuitBreaker::new("svc", 2, Duration::from_secs(60))
2386            .unwrap()
2387            .with_backend(Arc::clone(&shared_backend));
2388
2389        let cb2 = CircuitBreaker::new("svc", 2, Duration::from_secs(60))
2390            .unwrap()
2391            .with_backend(Arc::clone(&shared_backend));
2392
2393        // Trigger one failure via cb1
2394        let _: Result<(), AgentRuntimeError> = cb1.call(|| Err::<(), _>("fail".to_string()));
2395
2396        // cb2 should observe the failure recorded by cb1
2397        assert_eq!(cb2.failure_count().unwrap(), 1);
2398
2399        // Trigger the second failure to open the circuit via cb1
2400        let _: Result<(), AgentRuntimeError> = cb1.call(|| Err::<(), _>("fail again".to_string()));
2401
2402        // cb2 should now see the circuit as open
2403        assert!(matches!(cb2.state().unwrap(), CircuitState::Open { .. }));
2404    }
2405
2406    #[test]
2407    fn test_in_memory_backend_increments_and_resets() {
2408        use super::CircuitBreakerBackend as CB;
2409        let backend = InMemoryCircuitBreakerBackend::new();
2410
2411        assert_eq!(CB::get_failures(&backend, "svc"), 0);
2412
2413        let count = CB::increment_failures(&backend, "svc");
2414        assert_eq!(count, 1);
2415
2416        let count = CB::increment_failures(&backend, "svc");
2417        assert_eq!(count, 2);
2418
2419        CB::reset_failures(&backend, "svc");
2420        assert_eq!(CB::get_failures(&backend, "svc"), 0);
2421
2422        // open_at round-trip
2423        assert!(CB::get_open_at(&backend, "svc").is_none());
2424        let now = Instant::now();
2425        CB::set_open_at(&backend, "svc", now);
2426        assert!(CB::get_open_at(&backend, "svc").is_some());
2427        CB::clear_open_at(&backend, "svc");
2428        assert!(CB::get_open_at(&backend, "svc").is_none());
2429    }
2430
2431    // ── Deduplicator ──────────────────────────────────────────────────────────
2432
2433    #[test]
2434    fn test_deduplicator_new_key_is_new() {
2435        let d = Deduplicator::new(Duration::from_secs(60));
2436        let r = d.check_and_register("key-1").unwrap();
2437        assert_eq!(r, DeduplicationResult::New);
2438    }
2439
2440    #[test]
2441    fn test_deduplicator_second_check_is_in_progress() {
2442        let d = Deduplicator::new(Duration::from_secs(60));
2443        d.check_and_register("key-1").unwrap();
2444        let r = d.check_and_register("key-1").unwrap();
2445        assert_eq!(r, DeduplicationResult::InProgress);
2446    }
2447
2448    #[test]
2449    fn test_deduplicator_complete_makes_cached() {
2450        let d = Deduplicator::new(Duration::from_secs(60));
2451        d.check_and_register("key-1").unwrap();
2452        d.complete("key-1", "result-value").unwrap();
2453        let r = d.check_and_register("key-1").unwrap();
2454        assert_eq!(r, DeduplicationResult::Cached("result-value".into()));
2455    }
2456
2457    #[test]
2458    fn test_deduplicator_different_keys_are_independent() {
2459        let d = Deduplicator::new(Duration::from_secs(60));
2460        d.check_and_register("key-a").unwrap();
2461        let r = d.check_and_register("key-b").unwrap();
2462        assert_eq!(r, DeduplicationResult::New);
2463    }
2464
2465    #[test]
2466    fn test_deduplicator_expired_entry_is_new() {
2467        let d = Deduplicator::new(Duration::ZERO); // instant TTL
2468        d.check_and_register("key-1").unwrap();
2469        d.complete("key-1", "old").unwrap();
2470        // Immediately expired — should be New again
2471        let r = d.check_and_register("key-1").unwrap();
2472        assert_eq!(r, DeduplicationResult::New);
2473    }
2474
2475    // ── BackpressureGuard ─────────────────────────────────────────────────────
2476
2477    #[test]
2478    fn test_backpressure_guard_rejects_zero_capacity() {
2479        assert!(BackpressureGuard::new(0).is_err());
2480    }
2481
2482    #[test]
2483    fn test_backpressure_guard_acquire_within_capacity() {
2484        let g = BackpressureGuard::new(5).unwrap();
2485        assert!(g.try_acquire().is_ok());
2486        assert_eq!(g.depth().unwrap(), 1);
2487    }
2488
2489    #[test]
2490    fn test_backpressure_guard_sheds_when_full() {
2491        let g = BackpressureGuard::new(2).unwrap();
2492        g.try_acquire().unwrap();
2493        g.try_acquire().unwrap();
2494        let result = g.try_acquire();
2495        assert!(matches!(
2496            result,
2497            Err(AgentRuntimeError::BackpressureShed { .. })
2498        ));
2499    }
2500
2501    #[test]
2502    fn test_backpressure_guard_release_decrements_depth() {
2503        let g = BackpressureGuard::new(3).unwrap();
2504        g.try_acquire().unwrap();
2505        g.try_acquire().unwrap();
2506        g.release().unwrap();
2507        assert_eq!(g.depth().unwrap(), 1);
2508    }
2509
2510    #[test]
2511    fn test_backpressure_guard_release_on_empty_is_noop() {
2512        let g = BackpressureGuard::new(3).unwrap();
2513        g.release().unwrap(); // Should not fail
2514        assert_eq!(g.depth().unwrap(), 0);
2515    }
2516
2517    // ── Pipeline ──────────────────────────────────────────────────────────────
2518
2519    #[test]
2520    fn test_pipeline_runs_stages_in_order() {
2521        let p = Pipeline::new()
2522            .add_stage("upper", |s| Ok(s.to_uppercase()))
2523            .add_stage("append", |s| Ok(format!("{s}!")));
2524        let result = p.run("hello".into()).unwrap();
2525        assert_eq!(result, "HELLO!");
2526    }
2527
2528    #[test]
2529    fn test_pipeline_empty_pipeline_returns_input() {
2530        let p = Pipeline::new();
2531        assert_eq!(p.run("test".into()).unwrap(), "test");
2532    }
2533
2534    #[test]
2535    fn test_pipeline_stage_failure_short_circuits() {
2536        let p = Pipeline::new()
2537            .add_stage("fail", |_| {
2538                Err(AgentRuntimeError::Orchestration("boom".into()))
2539            })
2540            .add_stage("never", |s| Ok(s));
2541        assert!(p.run("input".into()).is_err());
2542    }
2543
2544    #[test]
2545    fn test_pipeline_stage_count() {
2546        let p = Pipeline::new()
2547            .add_stage("s1", |s| Ok(s))
2548            .add_stage("s2", |s| Ok(s));
2549        assert_eq!(p.stage_count(), 2);
2550    }
2551
2552    #[test]
2553    fn test_pipeline_execute_timed_captures_stage_durations() {
2554        let p = Pipeline::new()
2555            .add_stage("s1", |s| Ok(format!("{s}1")))
2556            .add_stage("s2", |s| Ok(format!("{s}2")));
2557        let result = p.execute_timed("x".to_string()).unwrap();
2558        assert_eq!(result.output, "x12");
2559        assert_eq!(result.stage_timings.len(), 2);
2560        assert_eq!(result.stage_timings[0].0, "s1");
2561        assert_eq!(result.stage_timings[1].0, "s2");
2562    }
2563
2564    // ── Item 13: BackpressureGuard soft limit ──────────────────────────────────
2565
2566    #[test]
2567    fn test_backpressure_soft_limit_rejects_invalid_config() {
2568        // soft >= capacity must be rejected
2569        let g = BackpressureGuard::new(5).unwrap();
2570        assert!(g.with_soft_limit(5).is_err());
2571        let g = BackpressureGuard::new(5).unwrap();
2572        assert!(g.with_soft_limit(6).is_err());
2573    }
2574
2575    #[test]
2576    fn test_backpressure_soft_limit_accepts_requests_below_soft() {
2577        let g = BackpressureGuard::new(5)
2578            .unwrap()
2579            .with_soft_limit(2)
2580            .unwrap();
2581        // Both acquires below soft limit should succeed
2582        assert!(g.try_acquire().is_ok());
2583        assert!(g.try_acquire().is_ok());
2584        assert_eq!(g.depth().unwrap(), 2);
2585    }
2586
2587    #[test]
2588    fn test_backpressure_with_soft_limit_still_sheds_at_hard_capacity() {
2589        let g = BackpressureGuard::new(3)
2590            .unwrap()
2591            .with_soft_limit(2)
2592            .unwrap();
2593        g.try_acquire().unwrap();
2594        g.try_acquire().unwrap();
2595        g.try_acquire().unwrap(); // reaches hard limit
2596        let result = g.try_acquire();
2597        assert!(matches!(
2598            result,
2599            Err(AgentRuntimeError::BackpressureShed { .. })
2600        ));
2601    }
2602
2603    // ── #4/#31 BackpressureGuard::hard_capacity ───────────────────────────────
2604
2605    #[test]
2606    fn test_backpressure_hard_capacity_matches_new() {
2607        let g = BackpressureGuard::new(7).unwrap();
2608        assert_eq!(g.hard_capacity(), 7);
2609    }
2610
2611    // ── #10 Pipeline::with_error_handler ──────────────────────────────────────
2612
2613    #[test]
2614    fn test_pipeline_error_handler_recovers_from_stage_failure() {
2615        let p = Pipeline::new()
2616            .add_stage("fail_stage", |_| {
2617                Err(AgentRuntimeError::Orchestration("oops".into()))
2618            })
2619            .add_stage("append", |s| Ok(format!("{s}-recovered")))
2620            .with_error_handler(|stage_name, _err| format!("recovered_from_{stage_name}"));
2621        let result = p.run("input".to_string()).unwrap();
2622        assert_eq!(result, "recovered_from_fail_stage-recovered");
2623    }
2624
2625    // ── #11/#32 CircuitState PartialEq/Eq ────────────────────────────────────
2626
2627    #[test]
2628    fn test_circuit_state_eq() {
2629        assert_eq!(CircuitState::Closed, CircuitState::Closed);
2630        assert_eq!(CircuitState::HalfOpen, CircuitState::HalfOpen);
2631        assert_eq!(
2632            CircuitState::Open { opened_at: std::time::Instant::now() },
2633            CircuitState::Open { opened_at: std::time::Instant::now() }
2634        );
2635        assert_ne!(CircuitState::Closed, CircuitState::HalfOpen);
2636        assert_ne!(CircuitState::Closed, CircuitState::Open { opened_at: std::time::Instant::now() });
2637    }
2638
2639    // ── #18 Deduplicator::dedup_many ──────────────────────────────────────────
2640
2641    #[test]
2642    fn test_dedup_many_independent_keys() {
2643        let d = Deduplicator::new(Duration::from_secs(60));
2644        let ttl = Duration::from_secs(60);
2645        let results = d.dedup_many(&[("key-a", ttl), ("key-b", ttl), ("key-c", ttl)]).unwrap();
2646        assert_eq!(results.len(), 3);
2647        assert!(results.iter().all(|r| matches!(r, DeduplicationResult::New)));
2648    }
2649
2650    // ── Task 11: Concurrent CircuitBreaker state transition tests ─────────────
2651
2652    #[test]
2653    fn test_concurrent_circuit_breaker_opens_under_concurrent_failures() {
2654        use std::sync::Arc;
2655        use std::thread;
2656
2657        let cb = Arc::new(
2658            CircuitBreaker::new("svc", 5, Duration::from_secs(60)).unwrap(),
2659        );
2660        let n_threads = 8;
2661        let failures_per_thread = 2;
2662
2663        let mut handles = Vec::new();
2664        for _ in 0..n_threads {
2665            let cb = Arc::clone(&cb);
2666            handles.push(thread::spawn(move || {
2667                for _ in 0..failures_per_thread {
2668                    let _ = cb.call(|| Err::<(), &str>("fail"));
2669                }
2670            }));
2671        }
2672        for h in handles {
2673            h.join().unwrap();
2674        }
2675
2676        // After n_threads * failures_per_thread = 16 failures with threshold=5,
2677        // the circuit must be Open.
2678        let state = cb.state().unwrap();
2679        assert!(
2680            matches!(state, CircuitState::Open { .. }),
2681            "circuit should be open after many concurrent failures; got: {state:?}"
2682        );
2683    }
2684
2685    #[test]
2686    fn test_per_service_tracking_is_independent() {
2687        let backend = Arc::new(InMemoryCircuitBreakerBackend::new());
2688
2689        let cb_a = CircuitBreaker::new("service-a", 3, Duration::from_secs(60))
2690            .unwrap()
2691            .with_backend(Arc::clone(&backend) as Arc<dyn CircuitBreakerBackend>);
2692        let cb_b = CircuitBreaker::new("service-b", 3, Duration::from_secs(60))
2693            .unwrap()
2694            .with_backend(Arc::clone(&backend) as Arc<dyn CircuitBreakerBackend>);
2695
2696        // Fail service-a 3 times → opens
2697        for _ in 0..3 {
2698            let _ = cb_a.call(|| Err::<(), &str>("fail"));
2699        }
2700
2701        // service-b should still be Closed
2702        let state_b = cb_b.state().unwrap();
2703        assert_eq!(
2704            state_b,
2705            CircuitState::Closed,
2706            "service-b should be unaffected by service-a failures"
2707        );
2708
2709        // service-a should be Open
2710        let state_a = cb_a.state().unwrap();
2711        assert!(
2712            matches!(state_a, CircuitState::Open { .. }),
2713            "service-a should be open"
2714        );
2715    }
2716
2717    // ── Item 14: timed_lock concurrency correctness ───────────────────────────
2718
2719    #[test]
2720    fn test_backpressure_concurrent_acquires_are_consistent() {
2721        use std::sync::Arc;
2722        use std::thread;
2723
2724        let g = Arc::new(BackpressureGuard::new(100).unwrap());
2725        let mut handles = Vec::new();
2726
2727        for _ in 0..10 {
2728            let g_clone = Arc::clone(&g);
2729            handles.push(thread::spawn(move || {
2730                g_clone.try_acquire().ok();
2731            }));
2732        }
2733
2734        for h in handles {
2735            h.join().unwrap();
2736        }
2737
2738        // All 10 threads acquired a slot; depth must be exactly 10
2739        assert_eq!(g.depth().unwrap(), 10);
2740    }
2741
2742    // ── New API tests (Rounds 4-8) ────────────────────────────────────────────
2743
2744    #[test]
2745    fn test_retry_policy_constant_has_fixed_delay() {
2746        let p = RetryPolicy::constant(3, 100).unwrap();
2747        assert_eq!(p.delay_for(1), Duration::from_millis(100));
2748        assert_eq!(p.delay_for(2), Duration::from_millis(100));
2749        assert_eq!(p.delay_for(10), Duration::from_millis(100));
2750    }
2751
2752    #[test]
2753    fn test_retry_policy_exponential_doubles() {
2754        let p = RetryPolicy::exponential(5, 10).unwrap();
2755        assert_eq!(p.delay_for(1), Duration::from_millis(10));
2756        assert_eq!(p.delay_for(2), Duration::from_millis(20));
2757        assert_eq!(p.delay_for(3), Duration::from_millis(40));
2758    }
2759
2760    #[test]
2761    fn test_retry_policy_with_max_attempts() {
2762        let p = RetryPolicy::constant(3, 50).unwrap();
2763        let p2 = p.with_max_attempts(7).unwrap();
2764        assert_eq!(p2.max_attempts, 7);
2765        assert!(RetryPolicy::constant(1, 50).unwrap().with_max_attempts(0).is_err());
2766    }
2767
2768    #[test]
2769    fn test_circuit_breaker_reset_returns_to_closed() {
2770        let cb = CircuitBreaker::new("svc", 2, Duration::from_secs(60)).unwrap();
2771        cb.record_failure();
2772        cb.record_failure(); // should open
2773        assert_ne!(cb.state().unwrap(), CircuitState::Closed);
2774        cb.reset();
2775        assert_eq!(cb.state().unwrap(), CircuitState::Closed);
2776        assert_eq!(cb.failure_count().unwrap(), 0);
2777    }
2778
2779    #[test]
2780    fn test_deduplicator_clear_resets_all_state() {
2781        let d = Deduplicator::new(Duration::from_secs(60));
2782        d.check_and_register("k1").unwrap();
2783        d.check_and_register("k2").unwrap();
2784        d.complete("k1", "r1").unwrap();
2785        assert_eq!(d.in_flight_count().unwrap(), 1);
2786        assert_eq!(d.cached_count().unwrap(), 1);
2787        d.clear().unwrap();
2788        assert_eq!(d.in_flight_count().unwrap(), 0);
2789        assert_eq!(d.cached_count().unwrap(), 0);
2790    }
2791
2792    #[test]
2793    fn test_deduplicator_purge_expired_removes_stale() {
2794        let d = Deduplicator::new(Duration::from_millis(1));
2795        d.check_and_register("x").unwrap();
2796        d.complete("x", "result").unwrap();
2797        std::thread::sleep(Duration::from_millis(5));
2798        let removed = d.purge_expired().unwrap();
2799        assert_eq!(removed, 1);
2800        assert_eq!(d.cached_count().unwrap(), 0);
2801    }
2802
2803    #[test]
2804    fn test_backpressure_utilization_ratio() {
2805        let g = BackpressureGuard::new(4).unwrap();
2806        g.try_acquire().unwrap();
2807        g.try_acquire().unwrap();
2808        let ratio = g.utilization_ratio().unwrap();
2809        assert!((ratio - 0.5).abs() < 1e-5);
2810    }
2811
2812    #[test]
2813    fn test_pipeline_stage_count_and_names() {
2814        let p = Pipeline::new()
2815            .add_stage("first", |s| Ok(s + "1"))
2816            .add_stage("second", |s| Ok(s + "2"));
2817        assert_eq!(p.stage_count(), 2);
2818        assert_eq!(p.stage_names(), vec!["first", "second"]);
2819    }
2820
2821    #[test]
2822    fn test_pipeline_is_empty_true_for_new() {
2823        let p = Pipeline::new();
2824        assert!(p.is_empty());
2825    }
2826
2827    #[test]
2828    fn test_pipeline_is_empty_false_after_add_stage() {
2829        let p = Pipeline::new().add_stage("s", |s: String| Ok(s));
2830        assert!(!p.is_empty());
2831    }
2832
2833    #[test]
2834    fn test_circuit_breaker_service_name() {
2835        let cb = CircuitBreaker::new("my-service", 3, Duration::from_secs(1)).unwrap();
2836        assert_eq!(cb.service_name(), "my-service");
2837    }
2838
2839    #[test]
2840    fn test_retry_policy_none_has_max_one_attempt() {
2841        let p = RetryPolicy::none();
2842        assert_eq!(p.max_attempts, 1);
2843        assert_eq!(p.delay_for(0), Duration::ZERO);
2844    }
2845
2846    #[test]
2847    fn test_backpressure_is_full_false_when_empty() {
2848        let g = BackpressureGuard::new(5).unwrap();
2849        assert!(!g.is_full().unwrap());
2850    }
2851
2852    #[test]
2853    fn test_backpressure_is_full_true_when_at_capacity() {
2854        let g = BackpressureGuard::new(2).unwrap();
2855        g.try_acquire().unwrap();
2856        g.try_acquire().unwrap();
2857        assert!(g.is_full().unwrap());
2858    }
2859
2860    #[test]
2861    fn test_deduplicator_ttl_returns_configured_value() {
2862        let d = Deduplicator::new(Duration::from_secs(42));
2863        assert_eq!(d.ttl(), Duration::from_secs(42));
2864    }
2865
2866    #[test]
2867    fn test_circuit_breaker_is_closed_initially() {
2868        let cb = CircuitBreaker::new("svc", 3, Duration::from_secs(1)).unwrap();
2869        assert!(cb.is_closed());
2870        assert!(!cb.is_open());
2871        assert!(!cb.is_half_open());
2872    }
2873
2874    #[test]
2875    fn test_circuit_breaker_is_open_after_threshold_failures() {
2876        let cb = CircuitBreaker::new("svc", 2, Duration::from_secs(60)).unwrap();
2877        cb.record_failure();
2878        cb.record_failure();
2879        assert!(cb.is_open());
2880        assert!(!cb.is_closed());
2881    }
2882
2883    #[test]
2884    fn test_retry_policy_total_max_delay_constant() {
2885        // constant 100ms × 3 attempts = 300ms total
2886        let p = RetryPolicy::constant(3, 100).unwrap();
2887        assert_eq!(p.total_max_delay_ms(), 300);
2888    }
2889
2890    #[test]
2891    fn test_retry_policy_total_max_delay_none_is_zero() {
2892        let p = RetryPolicy::none();
2893        assert_eq!(p.total_max_delay_ms(), 0);
2894    }
2895
2896    #[test]
2897    fn test_retry_policy_is_none_true_for_none() {
2898        let p = RetryPolicy::none();
2899        assert!(p.is_none());
2900    }
2901
2902    #[test]
2903    fn test_retry_policy_is_none_false_for_exponential() {
2904        let p = RetryPolicy::exponential(3, 10).unwrap();
2905        assert!(!p.is_none());
2906    }
2907
2908    #[test]
2909    fn test_pipeline_has_error_handler_false_by_default() {
2910        let p = Pipeline::new().add_stage("s", |s: String| Ok(s));
2911        assert!(!p.has_error_handler());
2912    }
2913
2914    #[test]
2915    fn test_pipeline_has_error_handler_true_after_set() {
2916        let p = Pipeline::new()
2917            .with_error_handler(|_stage, _err| "recovered".to_string());
2918        assert!(p.has_error_handler());
2919    }
2920
2921    #[test]
2922    fn test_backpressure_reset_clears_depth() {
2923        let g = BackpressureGuard::new(5).unwrap();
2924        g.try_acquire().unwrap();
2925        g.try_acquire().unwrap();
2926        assert_eq!(g.depth().unwrap(), 2);
2927        g.reset();
2928        assert_eq!(g.depth().unwrap(), 0);
2929    }
2930
2931    #[test]
2932    fn test_deduplicator_in_flight_keys_returns_started_keys() {
2933        let d = Deduplicator::new(Duration::from_secs(60));
2934        d.check("key-a", Duration::from_secs(60)).unwrap();
2935        d.check("key-b", Duration::from_secs(60)).unwrap();
2936        let mut keys = d.in_flight_keys().unwrap();
2937        keys.sort();
2938        assert_eq!(keys, vec!["key-a", "key-b"]);
2939    }
2940
2941    // ── Round 3: new methods ──────────────────────────────────────────────────
2942
2943    #[test]
2944    fn test_retry_policy_with_base_delay_ms_changes_delay() {
2945        let p = RetryPolicy::exponential(3, 100)
2946            .unwrap()
2947            .with_base_delay_ms(200)
2948            .unwrap();
2949        assert_eq!(p.delay_for(1), Duration::from_millis(200));
2950    }
2951
2952    #[test]
2953    fn test_retry_policy_with_base_delay_ms_rejects_zero() {
2954        let p = RetryPolicy::exponential(3, 100).unwrap();
2955        assert!(p.with_base_delay_ms(0).is_err());
2956    }
2957
2958    #[test]
2959    fn test_backpressure_reset_depth_clears_counter() {
2960        let guard = BackpressureGuard::new(5).unwrap();
2961        guard.try_acquire().unwrap();
2962        guard.try_acquire().unwrap();
2963        assert_eq!(guard.depth().unwrap(), 2);
2964        guard.reset_depth().unwrap();
2965        assert_eq!(guard.depth().unwrap(), 0);
2966    }
2967
2968    #[test]
2969    fn test_pipeline_remove_stage_returns_true_if_found() {
2970        let mut p = Pipeline::new()
2971            .add_stage("a", |s| Ok(s))
2972            .add_stage("b", |s| Ok(s));
2973        assert!(p.remove_stage("a"));
2974        assert_eq!(p.stage_count(), 1);
2975        assert_eq!(p.stage_names(), vec!["b"]);
2976    }
2977
2978    #[test]
2979    fn test_pipeline_remove_stage_returns_false_if_missing() {
2980        let mut p = Pipeline::new().add_stage("x", |s| Ok(s));
2981        assert!(!p.remove_stage("nope"));
2982        assert_eq!(p.stage_count(), 1);
2983    }
2984
2985    #[test]
2986    fn test_pipeline_clear_removes_all_stages() {
2987        let mut p = Pipeline::new()
2988            .add_stage("a", |s| Ok(s))
2989            .add_stage("b", |s| Ok(s));
2990        p.clear();
2991        assert!(p.is_empty());
2992    }
2993
2994    // ── Round 4: CircuitBreaker accessors / Pipeline::get_stage_name_at ──────
2995
2996    #[test]
2997    fn test_circuit_breaker_threshold_accessor() {
2998        let cb = CircuitBreaker::new("svc", 5, Duration::from_secs(30)).unwrap();
2999        assert_eq!(cb.threshold(), 5);
3000    }
3001
3002    #[test]
3003    fn test_circuit_breaker_recovery_window_accessor() {
3004        let window = Duration::from_secs(45);
3005        let cb = CircuitBreaker::new("svc", 3, window).unwrap();
3006        assert_eq!(cb.recovery_window(), window);
3007    }
3008
3009    #[test]
3010    fn test_pipeline_get_stage_name_at_returns_correct_names() {
3011        let p = Pipeline::new()
3012            .add_stage("first", |s| Ok(s))
3013            .add_stage("second", |s| Ok(s));
3014        assert_eq!(p.get_stage_name_at(0), Some("first"));
3015        assert_eq!(p.get_stage_name_at(1), Some("second"));
3016        assert_eq!(p.get_stage_name_at(2), None);
3017    }
3018
3019    // ── Round 16: can_retry ───────────────────────────────────────────────────
3020
3021    #[test]
3022    fn test_retry_policy_can_retry_within_budget() {
3023        let p = RetryPolicy::exponential(3, 100).unwrap();
3024        assert!(p.can_retry(0));
3025        assert!(p.can_retry(1));
3026        assert!(p.can_retry(2));
3027    }
3028
3029    #[test]
3030    fn test_retry_policy_can_retry_false_when_exhausted() {
3031        let p = RetryPolicy::exponential(3, 100).unwrap();
3032        assert!(!p.can_retry(3));
3033        assert!(!p.can_retry(99));
3034    }
3035
3036    #[test]
3037    fn test_retry_policy_none_only_allows_first_attempt() {
3038        let p = RetryPolicy::none();
3039        assert!(p.can_retry(0));
3040        assert!(!p.can_retry(1));
3041    }
3042
3043    // ── Round 5: RetryPolicy::max_attempts / Pipeline::stage_names_owned ─────
3044
3045    #[test]
3046    fn test_retry_policy_max_attempts_accessor() {
3047        let p = RetryPolicy::exponential(7, 100).unwrap();
3048        assert_eq!(p.max_attempts(), 7);
3049    }
3050
3051    #[test]
3052    fn test_pipeline_stage_names_owned_returns_strings() {
3053        let p = Pipeline::new()
3054            .add_stage("alpha", |s| Ok(s))
3055            .add_stage("beta", |s| Ok(s));
3056        let owned = p.stage_names_owned();
3057        assert_eq!(owned, vec!["alpha".to_string(), "beta".to_string()]);
3058    }
3059
3060    #[test]
3061    fn test_pipeline_stage_names_owned_empty_when_no_stages() {
3062        let p = Pipeline::new();
3063        assert!(p.stage_names_owned().is_empty());
3064    }
3065
3066    // ── Round 17: attempts_remaining ─────────────────────────────────────────
3067
3068    #[test]
3069    fn test_attempts_remaining_full_at_zero() {
3070        let p = RetryPolicy::exponential(4, 100).unwrap();
3071        assert_eq!(p.attempts_remaining(0), 4);
3072    }
3073
3074    #[test]
3075    fn test_attempts_remaining_decrements_correctly() {
3076        let p = RetryPolicy::exponential(4, 100).unwrap();
3077        assert_eq!(p.attempts_remaining(2), 2);
3078        assert_eq!(p.attempts_remaining(4), 0);
3079    }
3080
3081    #[test]
3082    fn test_attempts_remaining_zero_when_exhausted() {
3083        let p = RetryPolicy::exponential(3, 100).unwrap();
3084        assert_eq!(p.attempts_remaining(10), 0);
3085    }
3086
3087    // ── Round 18: untested circuit-breaker, deduplicator, backpressure methods
3088
3089    #[test]
3090    fn test_retry_policy_max_attempts_getter() {
3091        let p = RetryPolicy::exponential(7, 50).unwrap();
3092        assert_eq!(p.max_attempts(), 7);
3093    }
3094
3095    #[test]
3096    fn test_circuit_breaker_failure_count_increments() {
3097        let cb = CircuitBreaker::new("svc2", 3, std::time::Duration::from_secs(60)).unwrap();
3098        cb.record_failure();
3099        cb.record_failure();
3100        assert_eq!(cb.failure_count().unwrap(), 2);
3101    }
3102
3103    #[test]
3104    fn test_circuit_breaker_record_success_resets_failures() {
3105        let cb = CircuitBreaker::new("svc3", 5, std::time::Duration::from_secs(60)).unwrap();
3106        cb.record_failure();
3107        cb.record_failure();
3108        cb.record_success();
3109        assert_eq!(cb.failure_count().unwrap(), 0);
3110        assert!(cb.is_closed());
3111    }
3112
3113    #[test]
3114    fn test_circuit_breaker_threshold_and_recovery_window() {
3115        let cb = CircuitBreaker::new("svc4", 3, std::time::Duration::from_secs(30)).unwrap();
3116        assert_eq!(cb.threshold(), 3);
3117        assert_eq!(cb.recovery_window(), std::time::Duration::from_secs(30));
3118    }
3119
3120    #[test]
3121    fn test_circuit_breaker_reset_clears_state() {
3122        let cb = CircuitBreaker::new("svc5", 2, std::time::Duration::from_secs(60)).unwrap();
3123        cb.record_failure();
3124        cb.record_failure(); // should open circuit
3125        assert!(cb.is_open());
3126        cb.reset();
3127        assert!(cb.is_closed());
3128        assert_eq!(cb.failure_count().unwrap(), 0);
3129    }
3130
3131    #[test]
3132    fn test_deduplicator_cached_count_after_complete() {
3133        let d = Deduplicator::new(Duration::from_secs(60));
3134        d.check("key1", Duration::from_secs(60)).unwrap();
3135        d.complete("key1", "result").unwrap();
3136        assert_eq!(d.cached_count().unwrap(), 1);
3137    }
3138
3139    #[test]
3140    fn test_deduplicator_ttl_matches_configured() {
3141        let d = Deduplicator::new(Duration::from_secs(42));
3142        assert_eq!(d.ttl(), Duration::from_secs(42));
3143    }
3144
3145    #[test]
3146    fn test_deduplicator_purge_expired_removes_stale_entries() {
3147        let d = Deduplicator::new(Duration::ZERO); // instant TTL
3148        d.check("stale", Duration::ZERO).unwrap();
3149        d.complete("stale", "val").unwrap();
3150        // Sleep briefly to ensure the entry expires
3151        std::thread::sleep(std::time::Duration::from_millis(1));
3152        let removed = d.purge_expired().unwrap();
3153        assert!(removed >= 1);
3154    }
3155
3156    #[test]
3157    fn test_backpressure_remaining_capacity() {
3158        let g = BackpressureGuard::new(5).unwrap();
3159        g.try_acquire().unwrap();
3160        assert_eq!(g.remaining_capacity().unwrap(), 4);
3161    }
3162
3163    #[test]
3164    fn test_backpressure_soft_depth_ratio_without_soft_limit() {
3165        let g = BackpressureGuard::new(5).unwrap();
3166        assert_eq!(g.soft_depth_ratio(), 0.0);
3167    }
3168
3169    #[test]
3170    fn test_backpressure_soft_depth_ratio_with_soft_limit() {
3171        let g = BackpressureGuard::new(10).unwrap()
3172            .with_soft_limit(4).unwrap();
3173        g.try_acquire().unwrap();
3174        g.try_acquire().unwrap();
3175        let ratio = g.soft_depth_ratio();
3176        assert!((ratio - 0.5).abs() < 1e-6);
3177    }
3178
3179    // ── Round 7: delay_ms_for / soft_limit / has_stage ───────────────────────
3180
3181    #[test]
3182    fn test_retry_delay_ms_for_matches_delay_for() {
3183        let p = RetryPolicy::exponential(5, 100).unwrap();
3184        assert_eq!(p.delay_ms_for(1), p.delay_for(1).as_millis() as u64);
3185        assert_eq!(p.delay_ms_for(3), p.delay_for(3).as_millis() as u64);
3186    }
3187
3188    #[test]
3189    fn test_backpressure_soft_limit_returns_configured_value() {
3190        let g = BackpressureGuard::new(10).unwrap()
3191            .with_soft_limit(5).unwrap();
3192        assert_eq!(g.soft_limit(), Some(5));
3193    }
3194
3195    #[test]
3196    fn test_backpressure_soft_limit_none_when_not_set() {
3197        let g = BackpressureGuard::new(10).unwrap();
3198        assert_eq!(g.soft_limit(), None);
3199    }
3200
3201    #[test]
3202    fn test_pipeline_has_stage_returns_true_when_present() {
3203        let p = Pipeline::new().add_stage("step1", |s| Ok(s));
3204        assert!(p.has_stage("step1"));
3205        assert!(!p.has_stage("step2"));
3206    }
3207
3208    #[test]
3209    fn test_pipeline_has_stage_false_for_empty_pipeline() {
3210        let p = Pipeline::new();
3211        assert!(!p.has_stage("anything"));
3212    }
3213
3214    // ── Round 20: Deduplicator::max_entries / RetryPolicy::delay_for ─────────
3215
3216    #[test]
3217    fn test_deduplicator_max_entries_none_by_default() {
3218        let d = Deduplicator::new(Duration::from_secs(60));
3219        assert_eq!(d.max_entries(), None);
3220    }
3221
3222    #[test]
3223    fn test_deduplicator_max_entries_set_via_builder() {
3224        let d = Deduplicator::new(Duration::from_secs(60))
3225            .with_max_entries(50)
3226            .unwrap();
3227        assert_eq!(d.max_entries(), Some(50));
3228    }
3229
3230    #[test]
3231    fn test_retry_policy_delay_for_exponential_grows() {
3232        let p = RetryPolicy::exponential(5, 100).unwrap();
3233        // delay_for uses saturating_sub(1): attempt 1 => multiplier 1, attempt 2 => multiplier 2
3234        let d1 = p.delay_for(1);
3235        let d2 = p.delay_for(2);
3236        assert!(d2 > d1, "exponential delay should grow: attempt 2 > attempt 1");
3237    }
3238
3239    #[test]
3240    fn test_retry_policy_delay_for_constant_stays_same() {
3241        let p = RetryPolicy::constant(5, 200).unwrap();
3242        assert_eq!(p.delay_for(0), p.delay_for(1));
3243        assert_eq!(p.delay_for(1), p.delay_for(3));
3244    }
3245
3246    // ── Round 9: is_no_retry ──────────────────────────────────────────────────
3247
3248    #[test]
3249    fn test_is_no_retry_true_for_none_policy() {
3250        let p = RetryPolicy::none();
3251        assert!(p.is_no_retry());
3252    }
3253
3254    #[test]
3255    fn test_is_no_retry_false_for_exponential_policy() {
3256        let p = RetryPolicy::exponential(3, 50).unwrap();
3257        assert!(!p.is_no_retry());
3258    }
3259
3260    #[test]
3261    fn test_is_no_retry_false_for_constant_policy_with_multiple_attempts() {
3262        let p = RetryPolicy::constant(2, 100).unwrap();
3263        assert!(!p.is_no_retry());
3264    }
3265
3266    // ── Round 10: is_exponential ──────────────────────────────────────────────
3267
3268    #[test]
3269    fn test_is_exponential_true_for_exponential_policy() {
3270        let p = RetryPolicy::exponential(3, 50).unwrap();
3271        assert!(p.is_exponential());
3272    }
3273
3274    #[test]
3275    fn test_is_exponential_false_for_constant_policy() {
3276        let p = RetryPolicy::constant(3, 50).unwrap();
3277        assert!(!p.is_exponential());
3278    }
3279
3280    #[test]
3281    fn test_is_exponential_false_for_none_policy() {
3282        let p = RetryPolicy::none();
3283        assert!(!p.is_exponential());
3284    }
3285
3286    // ── Round 11: BackpressureGuard::is_soft_limited ──────────────────────────
3287
3288    #[test]
3289    fn test_is_soft_limited_false_without_soft_limit() {
3290        let g = BackpressureGuard::new(10).unwrap();
3291        assert!(!g.is_soft_limited());
3292    }
3293
3294    #[test]
3295    fn test_is_soft_limited_true_when_soft_limit_set() {
3296        let g = BackpressureGuard::new(10)
3297            .unwrap()
3298            .with_soft_limit(5)
3299            .unwrap();
3300        assert!(g.is_soft_limited());
3301    }
3302
3303    // ── Round 12: RetryPolicy::base_delay_ms, BackpressureGuard::percent_full ─
3304
3305    #[test]
3306    fn test_retry_policy_base_delay_ms_exponential() {
3307        let p = RetryPolicy::exponential(3, 250).unwrap();
3308        assert_eq!(p.base_delay_ms(), 250);
3309    }
3310
3311    #[test]
3312    fn test_retry_policy_base_delay_ms_constant() {
3313        let p = RetryPolicy::constant(5, 100).unwrap();
3314        assert_eq!(p.base_delay_ms(), 100);
3315    }
3316
3317    #[test]
3318    fn test_retry_policy_base_delay_ms_none_is_zero() {
3319        let p = RetryPolicy::none();
3320        assert_eq!(p.base_delay_ms(), 0);
3321    }
3322
3323    #[test]
3324    fn test_backpressure_percent_full_zero_when_empty() {
3325        let g = BackpressureGuard::new(100).unwrap();
3326        let pct = g.percent_full().unwrap();
3327        assert!((pct - 0.0).abs() < 1e-9);
3328    }
3329
3330    #[test]
3331    fn test_backpressure_percent_full_capped_at_100() {
3332        let g = BackpressureGuard::new(10).unwrap();
3333        // Fill all slots via try_acquire
3334        for _ in 0..10 {
3335            g.try_acquire().unwrap();
3336        }
3337        let pct = g.percent_full().unwrap();
3338        assert!((pct - 100.0).abs() < 1e-9);
3339    }
3340
3341    // ── Round 27: get_result, rename_stage, failure_rate ─────────────────────
3342
3343    #[test]
3344    fn test_deduplicator_get_result_returns_cached_value() {
3345        let d = Deduplicator::new(std::time::Duration::from_secs(60));
3346        d.check_and_register("req-1").unwrap();
3347        d.complete("req-1", "the answer").unwrap();
3348        let result = d.get_result("req-1").unwrap();
3349        assert_eq!(result, Some("the answer".to_string()));
3350    }
3351
3352    #[test]
3353    fn test_deduplicator_get_result_missing_key_returns_none() {
3354        let d = Deduplicator::new(std::time::Duration::from_secs(60));
3355        assert_eq!(d.get_result("ghost").unwrap(), None);
3356    }
3357
3358    #[test]
3359    fn test_pipeline_rename_stage_succeeds() {
3360        let mut p = Pipeline::new().add_stage("old-name", |s: String| Ok(s));
3361        let renamed = p.rename_stage("old-name", "new-name");
3362        assert!(renamed);
3363        assert!(p.has_stage("new-name"));
3364        assert!(!p.has_stage("old-name"));
3365    }
3366
3367    #[test]
3368    fn test_pipeline_rename_stage_missing_returns_false() {
3369        let mut p = Pipeline::new();
3370        assert!(!p.rename_stage("nonexistent", "anything"));
3371    }
3372
3373    #[test]
3374    fn test_circuit_breaker_failure_rate_zero_initially() {
3375        let cb = CircuitBreaker::new("svc", 5, std::time::Duration::from_secs(10)).unwrap();
3376        assert!((cb.failure_rate() - 0.0).abs() < 1e-9);
3377    }
3378
3379    #[test]
3380    fn test_circuit_breaker_failure_rate_increases_with_failures() {
3381        let cb = CircuitBreaker::new("svc-fr", 4, std::time::Duration::from_secs(10)).unwrap();
3382        cb.record_failure();
3383        cb.record_failure();
3384        // 2 failures / threshold 4 = 0.5
3385        assert!((cb.failure_rate() - 0.5).abs() < 1e-9);
3386    }
3387
3388    // ── Round 14: Pipeline::prepend_stage ────────────────────────────────────
3389
3390    #[test]
3391    fn test_prepend_stage_inserts_at_front() {
3392        let p = Pipeline::new()
3393            .add_stage("second", |s| Ok(s))
3394            .prepend_stage("first", |s| Ok(s));
3395        let names = p.stage_names_owned();
3396        assert_eq!(names[0], "first");
3397        assert_eq!(names[1], "second");
3398    }
3399
3400    #[test]
3401    fn test_prepend_stage_executes_before_existing_stages() {
3402        let p = Pipeline::new()
3403            .add_stage("append", |s| Ok(format!("{s}_appended")))
3404            .prepend_stage("prefix", |s| Ok(format!("pre_{s}")));
3405        let result = p.run("input".to_string()).unwrap();
3406        assert_eq!(result, "pre_input_appended");
3407    }
3408
3409    #[test]
3410    fn test_prepend_stage_on_empty_pipeline() {
3411        let p = Pipeline::new().prepend_stage("only", |s| Ok(s.to_uppercase()));
3412        let result = p.run("hello".to_string()).unwrap();
3413        assert_eq!(result, "HELLO");
3414    }
3415
3416    // ── Round 21: CircuitBreaker::is_at_threshold, BackpressureGuard::headroom_ratio ──
3417
3418    #[test]
3419    fn test_circuit_breaker_is_at_threshold_false_initially() {
3420        let cb = CircuitBreaker::new("svc", 3, std::time::Duration::from_secs(10)).unwrap();
3421        assert!(!cb.is_at_threshold());
3422    }
3423
3424    #[test]
3425    fn test_circuit_breaker_is_at_threshold_true_when_failures_reach_threshold() {
3426        let cb = CircuitBreaker::new("svc-t", 2, std::time::Duration::from_secs(10)).unwrap();
3427        cb.record_failure();
3428        assert!(!cb.is_at_threshold());
3429        cb.record_failure();
3430        assert!(cb.is_at_threshold());
3431    }
3432
3433    #[test]
3434    fn test_backpressure_headroom_ratio_one_when_empty() {
3435        let g = BackpressureGuard::new(10).unwrap();
3436        let ratio = g.headroom_ratio().unwrap();
3437        assert!((ratio - 1.0).abs() < 1e-9);
3438    }
3439
3440    #[test]
3441    fn test_backpressure_headroom_ratio_decreases_on_acquire() {
3442        let g = BackpressureGuard::new(4).unwrap();
3443        g.try_acquire().unwrap(); // 1/4 used → headroom = 3/4
3444        let ratio = g.headroom_ratio().unwrap();
3445        assert!((ratio - 0.75).abs() < 1e-9);
3446    }
3447
3448    // ── Round 17: Pipeline first/last/stage_index, BackpressureGuard is_empty/available ──
3449
3450    #[test]
3451    fn test_pipeline_first_stage_name_returns_first() {
3452        let p = Pipeline::new()
3453            .add_stage("alpha", |s| Ok(s))
3454            .add_stage("beta", |s| Ok(s));
3455        assert_eq!(p.first_stage_name(), Some("alpha"));
3456    }
3457
3458    #[test]
3459    fn test_pipeline_first_stage_name_none_when_empty() {
3460        let p = Pipeline::new();
3461        assert!(p.first_stage_name().is_none());
3462    }
3463
3464    #[test]
3465    fn test_pipeline_last_stage_name_returns_last() {
3466        let p = Pipeline::new()
3467            .add_stage("alpha", |s| Ok(s))
3468            .add_stage("omega", |s| Ok(s));
3469        assert_eq!(p.last_stage_name(), Some("omega"));
3470    }
3471
3472    #[test]
3473    fn test_pipeline_stage_index_returns_correct_position() {
3474        let p = Pipeline::new()
3475            .add_stage("first", |s| Ok(s))
3476            .add_stage("second", |s| Ok(s))
3477            .add_stage("third", |s| Ok(s));
3478        assert_eq!(p.stage_index("first"), Some(0));
3479        assert_eq!(p.stage_index("second"), Some(1));
3480        assert_eq!(p.stage_index("third"), Some(2));
3481        assert_eq!(p.stage_index("missing"), None);
3482    }
3483
3484    #[test]
3485    fn test_backpressure_is_empty_true_when_no_slots_acquired() {
3486        let g = BackpressureGuard::new(10).unwrap();
3487        assert!(g.is_empty().unwrap());
3488    }
3489
3490    #[test]
3491    fn test_backpressure_is_empty_false_after_acquire() {
3492        let g = BackpressureGuard::new(10).unwrap();
3493        g.try_acquire().unwrap();
3494        assert!(!g.is_empty().unwrap());
3495    }
3496
3497    #[test]
3498    fn test_backpressure_available_capacity_decrements_on_acquire() {
3499        let g = BackpressureGuard::new(5).unwrap();
3500        assert_eq!(g.available_capacity().unwrap(), 5);
3501        g.try_acquire().unwrap();
3502        assert_eq!(g.available_capacity().unwrap(), 4);
3503    }
3504
3505    // ── Round 16: Deduplicator::evict_oldest ─────────────────────────────────
3506
3507    #[test]
3508    fn test_evict_oldest_removes_first_cached_entry() {
3509        let d = Deduplicator::new(std::time::Duration::from_secs(60));
3510        // Register and complete two entries to put them in cache
3511        d.check_and_register("alpha").unwrap();
3512        d.check_and_register("beta").unwrap();
3513        d.complete("alpha", "result_a").unwrap();
3514        d.complete("beta", "result_b").unwrap();
3515        // Evict the oldest (alpha)
3516        let removed = d.evict_oldest().unwrap();
3517        assert!(removed);
3518        assert!(d.get_result("alpha").unwrap().is_none());
3519        assert!(d.get_result("beta").unwrap().is_some());
3520    }
3521
3522    #[test]
3523    fn test_evict_oldest_returns_false_when_empty() {
3524        let d = Deduplicator::new(std::time::Duration::from_secs(60));
3525        assert!(!d.evict_oldest().unwrap());
3526    }
3527
3528    // ── Round 17: CircuitBreaker::is_at_threshold three-failure variant ──────
3529
3530    #[test]
3531    fn test_circuit_breaker_is_at_threshold_true_after_three_failures() {
3532        let cb = CircuitBreaker::new("svc-3", 3, std::time::Duration::from_secs(60)).unwrap();
3533        cb.record_failure();
3534        cb.record_failure();
3535        cb.record_failure();
3536        assert!(cb.is_at_threshold());
3537    }
3538
3539    // ── Round 22: CircuitBreaker::failures_until_open ─────────────────────────
3540
3541    #[test]
3542    fn test_failures_until_open_equals_threshold_initially() {
3543        let cb = CircuitBreaker::new("svc-fuo", 5, std::time::Duration::from_secs(60)).unwrap();
3544        assert_eq!(cb.failures_until_open(), 5);
3545    }
3546
3547    #[test]
3548    fn test_failures_until_open_decrements_with_each_failure() {
3549        let cb = CircuitBreaker::new("svc-fuo2", 4, std::time::Duration::from_secs(60)).unwrap();
3550        cb.record_failure();
3551        assert_eq!(cb.failures_until_open(), 3);
3552        cb.record_failure();
3553        assert_eq!(cb.failures_until_open(), 2);
3554    }
3555
3556    #[test]
3557    fn test_failures_until_open_zero_when_at_threshold() {
3558        let cb = CircuitBreaker::new("svc-fuo3", 2, std::time::Duration::from_secs(60)).unwrap();
3559        cb.record_failure();
3560        cb.record_failure();
3561        assert_eq!(cb.failures_until_open(), 0);
3562    }
3563
3564    // ── Round 29: Deduplicator::cached_keys ──────────────────────────────────
3565
3566    #[test]
3567    fn test_deduplicator_cached_keys_empty_initially() {
3568        let d = Deduplicator::new(Duration::from_secs(60));
3569        assert!(d.cached_keys().unwrap().is_empty());
3570    }
3571
3572    #[test]
3573    fn test_deduplicator_cached_keys_contains_completed_key() {
3574        let d = Deduplicator::new(Duration::from_secs(60));
3575        d.check_and_register("ck-key").unwrap();
3576        d.complete("ck-key", "result").unwrap();
3577        let keys = d.cached_keys().unwrap();
3578        assert!(keys.contains(&"ck-key".to_string()));
3579    }
3580
3581    #[test]
3582    fn test_deduplicator_cached_keys_excludes_in_flight() {
3583        let d = Deduplicator::new(Duration::from_secs(60));
3584        d.check_and_register("pending-key").unwrap();
3585        // In-flight keys live in a separate map, not in cache
3586        assert!(!d.cached_keys().unwrap().contains(&"pending-key".to_string()));
3587    }
3588
3589    #[test]
3590    fn test_deduplicator_cached_keys_multiple_entries() {
3591        let d = Deduplicator::new(Duration::from_secs(60));
3592        for k in ["alpha", "beta", "gamma"] {
3593            d.check_and_register(k).unwrap();
3594            d.complete(k, "v").unwrap();
3595        }
3596        let keys = d.cached_keys().unwrap();
3597        assert_eq!(keys.len(), 3);
3598    }
3599
3600    // ── Round 30: RetryPolicy::is_constant, total_max_delay_ms ───────────────
3601
3602    #[test]
3603    fn test_retry_policy_is_constant_true_for_constant() {
3604        let p = RetryPolicy::constant(3, 100).unwrap();
3605        assert!(p.is_constant());
3606        assert!(!p.is_exponential());
3607    }
3608
3609    #[test]
3610    fn test_retry_policy_is_constant_false_for_exponential() {
3611        let p = RetryPolicy::exponential(3, 100).unwrap();
3612        assert!(!p.is_constant());
3613    }
3614
3615    #[test]
3616    fn test_retry_policy_total_max_delay_ms_constant() {
3617        // constant(3, 100) → delays [100, 100, 100] = 300
3618        let p = RetryPolicy::constant(3, 100).unwrap();
3619        assert_eq!(p.total_max_delay_ms(), 300);
3620    }
3621
3622    #[test]
3623    fn test_retry_policy_total_max_delay_ms_exponential() {
3624        // exponential(3, 100) → delays [100, 200, 400] (capped at MAX)
3625        let p = RetryPolicy::exponential(3, 100).unwrap();
3626        let total = p.total_max_delay_ms();
3627        assert!(total >= 300); // at minimum 100+100+100
3628    }
3629
3630    // ── Round 30: CircuitBreaker::is_half_open, is_healthy ───────────────────
3631
3632    #[test]
3633    fn test_circuit_breaker_is_healthy_true_when_closed() {
3634        let cb = CircuitBreaker::new("svc-ih1", 3, Duration::from_secs(60)).unwrap();
3635        assert!(cb.is_healthy());
3636    }
3637
3638    #[test]
3639    fn test_circuit_breaker_is_healthy_false_when_open() {
3640        let cb = CircuitBreaker::new("svc-ih2", 1, Duration::from_secs(60)).unwrap();
3641        let _: Result<(), _> = cb.call(|| Err::<(), _>("fail".to_string()));
3642        assert!(!cb.is_healthy());
3643    }
3644
3645    #[test]
3646    fn test_circuit_breaker_is_half_open_after_zero_recovery() {
3647        let cb = CircuitBreaker::new("svc-ho1", 1, Duration::ZERO).unwrap();
3648        let _: Result<(), _> = cb.call(|| Err::<(), _>("fail".to_string()));
3649        // With zero recovery window the circuit immediately enters HalfOpen
3650        assert!(cb.is_half_open() || cb.is_healthy()); // HalfOpen or recovered
3651    }
3652
3653    // ── Round 30: Deduplicator::is_idle ──────────────────────────────────────
3654
3655    #[test]
3656    fn test_deduplicator_is_idle_true_when_empty() {
3657        let d = Deduplicator::new(Duration::from_secs(60));
3658        assert!(d.is_idle().unwrap());
3659    }
3660
3661    #[test]
3662    fn test_deduplicator_is_idle_false_when_in_flight() {
3663        let d = Deduplicator::new(Duration::from_secs(60));
3664        d.check_and_register("req-x").unwrap();
3665        assert!(!d.is_idle().unwrap());
3666    }
3667
3668    #[test]
3669    fn test_deduplicator_is_idle_true_after_complete() {
3670        let d = Deduplicator::new(Duration::from_secs(60));
3671        d.check_and_register("req-y").unwrap();
3672        d.complete("req-y", "done").unwrap();
3673        assert!(d.is_idle().unwrap());
3674    }
3675
3676    // ── Round 26: in_flight_count ─────────────────────────────────────────────
3677
3678    #[test]
3679    fn test_deduplicator_in_flight_count_zero_initially() {
3680        let d = Deduplicator::new(Duration::from_secs(60));
3681        assert_eq!(d.in_flight_count().unwrap(), 0);
3682    }
3683
3684    #[test]
3685    fn test_deduplicator_in_flight_count_increments_on_register() {
3686        let d = Deduplicator::new(Duration::from_secs(60));
3687        d.check_and_register("k1").unwrap();
3688        d.check_and_register("k2").unwrap();
3689        assert_eq!(d.in_flight_count().unwrap(), 2);
3690    }
3691
3692    #[test]
3693    fn test_deduplicator_in_flight_count_decrements_after_complete() {
3694        let d = Deduplicator::new(Duration::from_secs(60));
3695        d.check_and_register("k1").unwrap();
3696        d.complete("k1", "result").unwrap();
3697        assert_eq!(d.in_flight_count().unwrap(), 0);
3698    }
3699
3700    // ── Round 27: total_count, acquired_count, swap_stages, will_retry_at_all
3701
3702    #[test]
3703    fn test_deduplicator_total_count_sums_in_flight_and_cached() {
3704        let d = Deduplicator::new(Duration::from_secs(60));
3705        d.check_and_register("k1").unwrap(); // in-flight
3706        d.check_and_register("k2").unwrap(); // in-flight
3707        d.complete("k1", "done").unwrap();   // moves to cache
3708        // 1 in-flight + 1 cached = 2
3709        assert_eq!(d.total_count().unwrap(), 2);
3710    }
3711
3712    #[test]
3713    fn test_deduplicator_total_count_zero_when_empty() {
3714        let d = Deduplicator::new(Duration::from_secs(60));
3715        assert_eq!(d.total_count().unwrap(), 0);
3716    }
3717
3718    #[test]
3719    fn test_backpressure_acquired_count_zero_initially() {
3720        let g = BackpressureGuard::new(5).unwrap();
3721        assert_eq!(g.acquired_count().unwrap(), 0);
3722    }
3723
3724    #[test]
3725    fn test_backpressure_acquired_count_increments_on_acquire() {
3726        let g = BackpressureGuard::new(5).unwrap();
3727        g.try_acquire().unwrap();
3728        g.try_acquire().unwrap();
3729        assert_eq!(g.acquired_count().unwrap(), 2);
3730    }
3731
3732    #[test]
3733    fn test_pipeline_swap_stages_swaps_positions() {
3734        let mut p = Pipeline::new()
3735            .add_stage("a", |s| Ok(s + "A"))
3736            .add_stage("b", |s| Ok(s + "B"));
3737        let swapped = p.swap_stages("a", "b");
3738        assert!(swapped);
3739        assert_eq!(p.first_stage_name().unwrap(), "b");
3740        assert_eq!(p.last_stage_name().unwrap(), "a");
3741    }
3742
3743    #[test]
3744    fn test_pipeline_swap_stages_returns_false_for_unknown_stage() {
3745        let mut p = Pipeline::new().add_stage("a", |s| Ok(s));
3746        assert!(!p.swap_stages("a", "missing"));
3747    }
3748
3749    #[test]
3750    fn test_retry_policy_will_retry_at_all_false_for_none() {
3751        let p = RetryPolicy::none();
3752        assert!(!p.will_retry_at_all());
3753    }
3754
3755    #[test]
3756    fn test_retry_policy_will_retry_at_all_true_for_exponential() {
3757        let p = RetryPolicy::exponential(3, 100).unwrap();
3758        assert!(p.will_retry_at_all());
3759    }
3760
3761    // ── Round 31: Deduplicator::fail ─────────────────────────────────────────
3762
3763    #[test]
3764    fn test_deduplicator_fail_removes_in_flight_key() {
3765        let d = Deduplicator::new(Duration::from_secs(60));
3766        d.check_and_register("failing-req").unwrap();
3767        assert!(!d.is_idle().unwrap());
3768        d.fail("failing-req").unwrap();
3769        assert!(d.is_idle().unwrap());
3770    }
3771
3772    #[test]
3773    fn test_deduplicator_fail_on_unknown_key_is_noop() {
3774        let d = Deduplicator::new(Duration::from_secs(60));
3775        assert!(d.fail("nonexistent").is_ok());
3776    }
3777
3778    #[test]
3779    fn test_deduplicator_fail_allows_reregistration() {
3780        let d = Deduplicator::new(Duration::from_secs(60));
3781        d.check_and_register("retry-key").unwrap();
3782        d.fail("retry-key").unwrap();
3783        let result = d.check_and_register("retry-key").unwrap();
3784        assert_eq!(result, DeduplicationResult::New);
3785    }
3786
3787    // ── Round 27: max_total_delay_ms ──────────────────────────────────────────
3788
3789    #[test]
3790    fn test_retry_policy_max_total_delay_ms_constant_policy() {
3791        let p = RetryPolicy::constant(3, 100).unwrap();
3792        // 3 attempts × 100ms each = 300ms
3793        assert_eq!(p.max_total_delay_ms(), 300);
3794    }
3795
3796    #[test]
3797    fn test_retry_policy_max_total_delay_ms_single_attempt() {
3798        let p = RetryPolicy::constant(1, 50).unwrap();
3799        assert_eq!(p.max_total_delay_ms(), 50);
3800    }
3801
3802    // ── Round 28: is_last_attempt ─────────────────────────────────────────────
3803
3804    #[test]
3805    fn test_retry_policy_is_last_attempt_true_at_max() {
3806        let p = RetryPolicy::exponential(3, 100).unwrap();
3807        assert!(p.is_last_attempt(3));
3808    }
3809
3810    #[test]
3811    fn test_retry_policy_is_last_attempt_false_before_max() {
3812        let p = RetryPolicy::exponential(3, 100).unwrap();
3813        assert!(!p.is_last_attempt(2));
3814    }
3815
3816    #[test]
3817    fn test_retry_policy_is_last_attempt_true_beyond_max() {
3818        let p = RetryPolicy::exponential(3, 100).unwrap();
3819        assert!(p.is_last_attempt(4));
3820    }
3821
3822    // ── Round 29: delay_sum_ms ────────────────────────────────────────────────
3823
3824    #[test]
3825    fn test_retry_policy_delay_sum_ms_constant_two_attempts() {
3826        let p = RetryPolicy::constant(5, 100).unwrap();
3827        assert_eq!(p.delay_sum_ms(2), 200);
3828    }
3829
3830    #[test]
3831    fn test_retry_policy_delay_sum_ms_capped_at_max_attempts() {
3832        let p = RetryPolicy::constant(2, 50).unwrap();
3833        // n=10 but max_attempts=2 → only 2 delays summed = 100
3834        assert_eq!(p.delay_sum_ms(10), 100);
3835    }
3836
3837    // ── Round 30: avg_delay_ms ────────────────────────────────────────────────
3838
3839    #[test]
3840    fn test_retry_policy_avg_delay_ms_constant() {
3841        let p = RetryPolicy::constant(4, 100).unwrap();
3842        // every attempt = 100ms → average = 100ms
3843        assert_eq!(p.avg_delay_ms(), 100);
3844    }
3845
3846    #[test]
3847    fn test_retry_policy_avg_delay_ms_single_attempt_policy() {
3848        // RetryPolicy::none() has 1 attempt and ZERO delay
3849        let p = RetryPolicy::none();
3850        assert_eq!(p.avg_delay_ms(), 0);
3851    }
3852
3853    #[test]
3854    fn test_backoff_factor_exponential_returns_two() {
3855        let p = RetryPolicy::exponential(3, 100).unwrap();
3856        assert!((p.backoff_factor() - 2.0).abs() < 1e-9);
3857    }
3858
3859    #[test]
3860    fn test_backoff_factor_constant_returns_one() {
3861        let p = RetryPolicy::constant(3, 100).unwrap();
3862        assert!((p.backoff_factor() - 1.0).abs() < 1e-9);
3863    }
3864
3865    #[test]
3866    fn test_pipeline_count_stages_matching_counts_by_keyword() {
3867        let p = Pipeline::new()
3868            .add_stage("normalize-text", |s| Ok(s))
3869            .add_stage("text-trim", |s| Ok(s))
3870            .add_stage("embed", |s| Ok(s));
3871        assert_eq!(p.count_stages_matching("text"), 2);
3872        assert_eq!(p.count_stages_matching("embed"), 1);
3873        assert_eq!(p.count_stages_matching("missing"), 0);
3874    }
3875
3876    #[test]
3877    fn test_pipeline_count_stages_matching_case_insensitive() {
3878        let p = Pipeline::new().add_stage("TEXT-CLEAN", |s| Ok(s));
3879        assert_eq!(p.count_stages_matching("text"), 1);
3880    }
3881
3882    #[test]
3883    fn test_backpressure_guard_over_soft_limit_true_when_exceeded() {
3884        let guard = BackpressureGuard::new(10)
3885            .unwrap()
3886            .with_soft_limit(1)
3887            .unwrap();
3888        guard.try_acquire().unwrap();
3889        guard.try_acquire().unwrap();
3890        assert!(guard.over_soft_limit().unwrap());
3891    }
3892
3893    #[test]
3894    fn test_backpressure_guard_over_soft_limit_false_when_no_soft_limit() {
3895        let guard = BackpressureGuard::new(10).unwrap();
3896        guard.try_acquire().unwrap();
3897        assert!(!guard.over_soft_limit().unwrap());
3898    }
3899
3900    // ── Round 41: Pipeline::description, has_unique_stage_names ──────────────
3901
3902    #[test]
3903    fn test_pipeline_description_empty() {
3904        let p = Pipeline::new();
3905        assert_eq!(p.description(), "Pipeline[empty]");
3906    }
3907
3908    #[test]
3909    fn test_pipeline_description_single_stage() {
3910        let p = Pipeline::new().add_stage("trim", |s: String| Ok(s.trim().to_owned()));
3911        assert_eq!(p.description(), "Pipeline[1 stage: trim]");
3912    }
3913
3914    #[test]
3915    fn test_pipeline_description_multiple_stages() {
3916        let p = Pipeline::new()
3917            .add_stage("a", |s: String| Ok(s))
3918            .add_stage("b", |s: String| Ok(s))
3919            .add_stage("c", |s: String| Ok(s));
3920        let desc = p.description();
3921        assert!(desc.contains("3 stages"));
3922        assert!(desc.contains("a → b → c"));
3923    }
3924
3925    #[test]
3926    fn test_pipeline_has_unique_stage_names_true_when_all_unique() {
3927        let p = Pipeline::new()
3928            .add_stage("x", |s: String| Ok(s))
3929            .add_stage("y", |s: String| Ok(s));
3930        assert!(p.has_unique_stage_names());
3931    }
3932
3933    #[test]
3934    fn test_pipeline_has_unique_stage_names_false_when_duplicate() {
3935        let p = Pipeline::new()
3936            .add_stage("dup", |s: String| Ok(s))
3937            .add_stage("dup", |s: String| Ok(s));
3938        assert!(!p.has_unique_stage_names());
3939    }
3940
3941    #[test]
3942    fn test_pipeline_has_unique_stage_names_true_for_empty_pipeline() {
3943        let p = Pipeline::new();
3944        assert!(p.has_unique_stage_names());
3945    }
3946
3947    // ── Round 42 ──────────────────────────────────────────────────────────────
3948
3949    #[test]
3950    fn test_pipeline_stage_name_lengths_returns_byte_lengths_in_order() {
3951        let p = Pipeline::new()
3952            .add_stage("ab", |s: String| Ok(s))
3953            .add_stage("cdef", |s: String| Ok(s));
3954        assert_eq!(p.stage_name_lengths(), vec![2, 4]);
3955    }
3956
3957    #[test]
3958    fn test_pipeline_stage_name_lengths_empty_pipeline_returns_empty() {
3959        let p = Pipeline::new();
3960        assert!(p.stage_name_lengths().is_empty());
3961    }
3962
3963    #[test]
3964    fn test_pipeline_avg_stage_name_length_computed_correctly() {
3965        let p = Pipeline::new()
3966            .add_stage("ab", |s: String| Ok(s))   // 2
3967            .add_stage("abcd", |s: String| Ok(s)); // 4
3968        assert!((p.avg_stage_name_length() - 3.0).abs() < 1e-9);
3969    }
3970
3971    #[test]
3972    fn test_pipeline_avg_stage_name_length_zero_for_empty() {
3973        assert_eq!(Pipeline::new().avg_stage_name_length(), 0.0);
3974    }
3975
3976    #[test]
3977    fn test_pipeline_stages_containing_returns_matching_names() {
3978        let p = Pipeline::new()
3979            .add_stage("tokenize", |s: String| Ok(s))
3980            .add_stage("encode", |s: String| Ok(s))
3981            .add_stage("token-validate", |s: String| Ok(s));
3982        let result = p.stages_containing("token");
3983        assert_eq!(result.len(), 2);
3984        assert!(result.contains(&"tokenize"));
3985        assert!(result.contains(&"token-validate"));
3986    }
3987
3988    #[test]
3989    fn test_pipeline_stages_containing_returns_empty_when_no_match() {
3990        let p = Pipeline::new().add_stage("process", |s: String| Ok(s));
3991        assert!(p.stages_containing("xyz").is_empty());
3992    }
3993
3994    #[test]
3995    fn test_pipeline_stage_is_first_returns_true_for_first_stage() {
3996        let p = Pipeline::new()
3997            .add_stage("first", |s: String| Ok(s))
3998            .add_stage("second", |s: String| Ok(s));
3999        assert!(p.stage_is_first("first"));
4000        assert!(!p.stage_is_first("second"));
4001    }
4002
4003    #[test]
4004    fn test_pipeline_stage_is_last_returns_true_for_last_stage() {
4005        let p = Pipeline::new()
4006            .add_stage("first", |s: String| Ok(s))
4007            .add_stage("last", |s: String| Ok(s));
4008        assert!(p.stage_is_last("last"));
4009        assert!(!p.stage_is_last("first"));
4010    }
4011
4012    // ── Round 41: stage_names_sorted, longest/shortest_stage_name ─────────────
4013
4014    #[test]
4015    fn test_stage_names_sorted_returns_alphabetical_order() {
4016        let p = Pipeline::new()
4017            .add_stage("zebra", |s: String| Ok(s))
4018            .add_stage("alpha", |s: String| Ok(s))
4019            .add_stage("mango", |s: String| Ok(s));
4020        assert_eq!(p.stage_names_sorted(), vec!["alpha", "mango", "zebra"]);
4021    }
4022
4023    #[test]
4024    fn test_stage_names_sorted_empty_pipeline_returns_empty() {
4025        let p = Pipeline::new();
4026        assert!(p.stage_names_sorted().is_empty());
4027    }
4028
4029    #[test]
4030    fn test_longest_stage_name_returns_longest() {
4031        let p = Pipeline::new()
4032            .add_stage("ab", |s: String| Ok(s))
4033            .add_stage("abcde", |s: String| Ok(s))
4034            .add_stage("abc", |s: String| Ok(s));
4035        assert_eq!(p.longest_stage_name(), Some("abcde"));
4036    }
4037
4038    #[test]
4039    fn test_longest_stage_name_empty_pipeline_returns_none() {
4040        let p = Pipeline::new();
4041        assert_eq!(p.longest_stage_name(), None);
4042    }
4043
4044    #[test]
4045    fn test_shortest_stage_name_returns_shortest() {
4046        let p = Pipeline::new()
4047            .add_stage("ab", |s: String| Ok(s))
4048            .add_stage("abcde", |s: String| Ok(s))
4049            .add_stage("a", |s: String| Ok(s));
4050        assert_eq!(p.shortest_stage_name(), Some("a"));
4051    }
4052
4053    #[test]
4054    fn test_shortest_stage_name_empty_pipeline_returns_none() {
4055        let p = Pipeline::new();
4056        assert_eq!(p.shortest_stage_name(), None);
4057    }
4058
4059    // ── Round 42: Display impls ───────────────────────────────────────────────
4060
4061    #[test]
4062    fn test_circuit_state_display_closed() {
4063        let s = CircuitState::Closed;
4064        assert_eq!(s.to_string(), "Closed");
4065    }
4066
4067    #[test]
4068    fn test_circuit_state_display_open() {
4069        let s = CircuitState::Open { opened_at: std::time::Instant::now() };
4070        assert_eq!(s.to_string(), "Open");
4071    }
4072
4073    #[test]
4074    fn test_circuit_state_display_half_open() {
4075        let s = CircuitState::HalfOpen;
4076        assert_eq!(s.to_string(), "HalfOpen");
4077    }
4078
4079    #[test]
4080    fn test_retry_policy_display_exponential() {
4081        let p = RetryPolicy::exponential(3, 100).unwrap();
4082        let s = p.to_string();
4083        assert!(s.contains("Exponential"));
4084        assert!(s.contains('3'));
4085        assert!(s.contains("100ms"));
4086    }
4087
4088    #[test]
4089    fn test_retry_policy_display_constant() {
4090        let p = RetryPolicy::constant(5, 50).unwrap();
4091        let s = p.to_string();
4092        assert!(s.contains("Constant"));
4093        assert!(s.contains('5'));
4094        assert!(s.contains("50ms"));
4095    }
4096
4097    // ── Round 43 ──────────────────────────────────────────────────────────────
4098
4099    #[test]
4100    fn test_pipeline_total_stage_name_bytes_sums_correctly() {
4101        let p = Pipeline::new()
4102            .add_stage("ab", |s: String| Ok(s))   // 2
4103            .add_stage("xyz", |s: String| Ok(s));  // 3
4104        assert_eq!(p.total_stage_name_bytes(), 5);
4105    }
4106
4107    #[test]
4108    fn test_pipeline_total_stage_name_bytes_zero_for_empty() {
4109        assert_eq!(Pipeline::new().total_stage_name_bytes(), 0);
4110    }
4111
4112    #[test]
4113    fn test_pipeline_stages_before_returns_preceding_names() {
4114        let p = Pipeline::new()
4115            .add_stage("a", |s: String| Ok(s))
4116            .add_stage("b", |s: String| Ok(s))
4117            .add_stage("c", |s: String| Ok(s));
4118        assert_eq!(p.stages_before("c"), vec!["a", "b"]);
4119        assert!(p.stages_before("a").is_empty());
4120    }
4121
4122    #[test]
4123    fn test_pipeline_stages_before_returns_empty_for_unknown_stage() {
4124        let p = Pipeline::new().add_stage("a", |s: String| Ok(s));
4125        assert!(p.stages_before("missing").is_empty());
4126    }
4127
4128    // ── Round 43: stages_after, stage_position_from_end, contains_all_stages ──
4129
4130    #[test]
4131    fn test_stages_after_returns_stages_following_name() {
4132        let p = Pipeline::new()
4133            .add_stage("a", |s: String| Ok(s))
4134            .add_stage("b", |s: String| Ok(s))
4135            .add_stage("c", |s: String| Ok(s));
4136        assert_eq!(p.stages_after("a"), vec!["b", "c"]);
4137    }
4138
4139    #[test]
4140    fn test_stages_after_last_stage_returns_empty() {
4141        let p = Pipeline::new()
4142            .add_stage("a", |s: String| Ok(s))
4143            .add_stage("b", |s: String| Ok(s));
4144        assert!(p.stages_after("b").is_empty());
4145    }
4146
4147    #[test]
4148    fn test_stages_after_unknown_name_returns_empty() {
4149        let p = Pipeline::new().add_stage("a", |s: String| Ok(s));
4150        assert!(p.stages_after("missing").is_empty());
4151    }
4152
4153    #[test]
4154    fn test_stage_position_from_end_last_is_zero() {
4155        let p = Pipeline::new()
4156            .add_stage("x", |s: String| Ok(s))
4157            .add_stage("y", |s: String| Ok(s))
4158            .add_stage("z", |s: String| Ok(s));
4159        assert_eq!(p.stage_position_from_end("z"), Some(0));
4160        assert_eq!(p.stage_position_from_end("x"), Some(2));
4161    }
4162
4163    #[test]
4164    fn test_stage_position_from_end_unknown_returns_none() {
4165        let p = Pipeline::new().add_stage("a", |s: String| Ok(s));
4166        assert_eq!(p.stage_position_from_end("missing"), None);
4167    }
4168
4169    #[test]
4170    fn test_contains_all_stages_true_when_all_present() {
4171        let p = Pipeline::new()
4172            .add_stage("a", |s: String| Ok(s))
4173            .add_stage("b", |s: String| Ok(s));
4174        assert!(p.contains_all_stages(&["a", "b"]));
4175    }
4176
4177    #[test]
4178    fn test_contains_all_stages_false_when_one_missing() {
4179        let p = Pipeline::new().add_stage("a", |s: String| Ok(s));
4180        assert!(!p.contains_all_stages(&["a", "b"]));
4181    }
4182
4183    #[test]
4184    fn test_contains_all_stages_true_for_empty_names() {
4185        let p = Pipeline::new();
4186        assert!(p.contains_all_stages(&[]));
4187    }
4188
4189    // ── Round 44: stage_count_above_name_len, stage_pairs ─────────────────────
4190
4191    #[test]
4192    fn test_stage_count_above_name_len_counts_longer_names() {
4193        let p = Pipeline::new()
4194            .add_stage("ab", |s: String| Ok(s))
4195            .add_stage("abcde", |s: String| Ok(s))
4196            .add_stage("xyz", |s: String| Ok(s));
4197        assert_eq!(p.stage_count_above_name_len(2), 2); // "abcde" (5) and "xyz" (3)
4198    }
4199
4200    #[test]
4201    fn test_stage_count_above_name_len_zero_when_none_exceed() {
4202        let p = Pipeline::new()
4203            .add_stage("a", |s: String| Ok(s))
4204            .add_stage("b", |s: String| Ok(s));
4205        assert_eq!(p.stage_count_above_name_len(5), 0);
4206    }
4207
4208    #[test]
4209    fn test_stage_pairs_returns_consecutive_pairs() {
4210        let p = Pipeline::new()
4211            .add_stage("a", |s: String| Ok(s))
4212            .add_stage("b", |s: String| Ok(s))
4213            .add_stage("c", |s: String| Ok(s));
4214        assert_eq!(p.stage_pairs(), vec![("a", "b"), ("b", "c")]);
4215    }
4216
4217    #[test]
4218    fn test_stage_pairs_empty_for_single_stage_pipeline() {
4219        let p = Pipeline::new().add_stage("only", |s: String| Ok(s));
4220        assert!(p.stage_pairs().is_empty());
4221    }
4222
4223    // ── Round 44: CircuitBreaker::describe, RetryPolicy::attempts_budget_used ──
4224
4225    #[test]
4226    fn test_circuit_breaker_describe_contains_service_name() {
4227        let cb = CircuitBreaker::new("my-service", 3, std::time::Duration::from_secs(30)).unwrap();
4228        let desc = cb.describe().unwrap();
4229        assert!(desc.contains("my-service"));
4230    }
4231
4232    #[test]
4233    fn test_circuit_breaker_describe_shows_closed_state_initially() {
4234        let cb = CircuitBreaker::new("svc", 5, std::time::Duration::from_secs(10)).unwrap();
4235        let desc = cb.describe().unwrap();
4236        assert!(desc.contains("Closed"));
4237    }
4238
4239    #[test]
4240    fn test_circuit_breaker_describe_shows_failure_counts() {
4241        let cb = CircuitBreaker::new("svc", 5, std::time::Duration::from_secs(10)).unwrap();
4242        cb.record_failure();
4243        cb.record_failure();
4244        let desc = cb.describe().unwrap();
4245        assert!(desc.contains("2/5"));
4246    }
4247
4248    #[test]
4249    fn test_retry_policy_attempts_budget_used_zero_at_start() {
4250        let p = RetryPolicy::exponential(4, 10).unwrap();
4251        assert_eq!(p.attempts_budget_used(0), 0.0);
4252    }
4253
4254    #[test]
4255    fn test_retry_policy_attempts_budget_used_one_when_exhausted() {
4256        let p = RetryPolicy::exponential(4, 10).unwrap();
4257        assert_eq!(p.attempts_budget_used(4), 1.0);
4258    }
4259
4260    #[test]
4261    fn test_retry_policy_attempts_budget_used_clamped_to_one() {
4262        let p = RetryPolicy::exponential(4, 10).unwrap();
4263        assert_eq!(p.attempts_budget_used(10), 1.0);
4264    }
4265
4266    #[test]
4267    fn test_retry_policy_attempts_budget_used_half_way() {
4268        let p = RetryPolicy::constant(4, 10).unwrap();
4269        assert!((p.attempts_budget_used(2) - 0.5).abs() < 1e-9);
4270    }
4271
4272    #[test]
4273    fn test_retry_policy_attempts_budget_used_fully_used_for_none_policy_after_one_attempt() {
4274        // RetryPolicy::none() has max_attempts=1; after 1 attempt the budget is fully consumed.
4275        let p = RetryPolicy::none();
4276        assert_eq!(p.attempts_budget_used(1), 1.0);
4277        assert_eq!(p.attempts_budget_used(0), 0.0);
4278    }
4279
4280    // ── Round 45: stage_at, stages_reversed ───────────────────────────────────
4281
4282    #[test]
4283    fn test_stage_at_returns_name_at_index() {
4284        let p = Pipeline::new()
4285            .add_stage("first", |s: String| Ok(s))
4286            .add_stage("second", |s: String| Ok(s))
4287            .add_stage("third", |s: String| Ok(s));
4288        assert_eq!(p.stage_at(0), Some("first"));
4289        assert_eq!(p.stage_at(2), Some("third"));
4290        assert_eq!(p.stage_at(3), None);
4291    }
4292
4293    #[test]
4294    fn test_stage_at_returns_none_for_empty_pipeline() {
4295        let p = Pipeline::new();
4296        assert_eq!(p.stage_at(0), None);
4297    }
4298
4299    #[test]
4300    fn test_stages_reversed_returns_names_in_reverse_order() {
4301        let p = Pipeline::new()
4302            .add_stage("a", |s: String| Ok(s))
4303            .add_stage("b", |s: String| Ok(s))
4304            .add_stage("c", |s: String| Ok(s));
4305        assert_eq!(p.stages_reversed(), vec!["c", "b", "a"]);
4306    }
4307
4308    #[test]
4309    fn test_stages_reversed_empty_for_empty_pipeline() {
4310        let p = Pipeline::new();
4311        assert!(p.stages_reversed().is_empty());
4312    }
4313
4314    // ── Round 46: pipeline_is_empty ────────────────────────────────────────────
4315
4316    #[test]
4317    fn test_pipeline_is_empty_true_for_empty_pipeline() {
4318        let p = Pipeline::new();
4319        assert!(p.pipeline_is_empty());
4320    }
4321
4322    #[test]
4323    fn test_pipeline_is_empty_false_after_adding_stage() {
4324        let p = Pipeline::new().add_stage("a", |s: String| Ok(s));
4325        assert!(!p.pipeline_is_empty());
4326    }
4327
4328    // ── Round 45: unique_stage_names ──────────────────────────────────────────
4329
4330    #[test]
4331    fn test_unique_stage_names_returns_sorted_names() {
4332        let p = Pipeline::new()
4333            .add_stage("charlie", |s: String| Ok(s))
4334            .add_stage("alpha", |s: String| Ok(s))
4335            .add_stage("bravo", |s: String| Ok(s));
4336        assert_eq!(p.unique_stage_names(), vec!["alpha", "bravo", "charlie"]);
4337    }
4338
4339    #[test]
4340    fn test_unique_stage_names_empty_for_empty_pipeline() {
4341        let p = Pipeline::new();
4342        assert!(p.unique_stage_names().is_empty());
4343    }
4344
4345    // ── Round 47: stage_names_with_prefix ─────────────────────────────────────
4346
4347    #[test]
4348    fn test_stage_names_with_prefix_returns_matching_stages() {
4349        let p = Pipeline::new()
4350            .add_stage("validate_input", |s: String| Ok(s))
4351            .add_stage("transform_data", |s: String| Ok(s))
4352            .add_stage("validate_output", |s: String| Ok(s));
4353        let names = p.stage_names_with_prefix("validate");
4354        assert_eq!(names, vec!["validate_input", "validate_output"]);
4355    }
4356
4357    #[test]
4358    fn test_stage_names_with_prefix_empty_when_no_match() {
4359        let p = Pipeline::new().add_stage("transform", |s: String| Ok(s));
4360        assert!(p.stage_names_with_prefix("validate").is_empty());
4361    }
4362
4363    // ── Round 48: stages_with_suffix ───────────────────────────────────────────
4364
4365    #[test]
4366    fn test_stages_with_suffix_returns_matching_stages() {
4367        let p = Pipeline::new()
4368            .add_stage("input_validate", |s: String| Ok(s))
4369            .add_stage("transform_data", |s: String| Ok(s))
4370            .add_stage("output_validate", |s: String| Ok(s));
4371        let names = p.stages_with_suffix("validate");
4372        assert_eq!(names, vec!["input_validate", "output_validate"]);
4373    }
4374
4375    #[test]
4376    fn test_stages_with_suffix_empty_when_no_match() {
4377        let p = Pipeline::new().add_stage("transform", |s: String| Ok(s));
4378        assert!(p.stages_with_suffix("validate").is_empty());
4379    }
4380
4381    // ── Round 49: has_stage_with_name_containing, stage_name_bytes_total ───────
4382
4383    #[test]
4384    fn test_has_stage_with_name_containing_true_when_match_exists() {
4385        let p = Pipeline::new()
4386            .add_stage("transform_input", |s: String| Ok(s))
4387            .add_stage("write_output", |s: String| Ok(s));
4388        assert!(p.has_stage_with_name_containing("transform"));
4389    }
4390
4391    #[test]
4392    fn test_has_stage_with_name_containing_false_when_no_match() {
4393        let p = Pipeline::new().add_stage("write", |s: String| Ok(s));
4394        assert!(!p.has_stage_with_name_containing("transform"));
4395    }
4396
4397    #[test]
4398    fn test_stage_name_bytes_total_sums_name_lengths() {
4399        let p = Pipeline::new()
4400            .add_stage("ab", |s: String| Ok(s))
4401            .add_stage("cde", |s: String| Ok(s));
4402        assert_eq!(p.stage_name_bytes_total(), 5);
4403    }
4404
4405    #[test]
4406    fn test_stage_name_bytes_total_zero_for_empty_pipeline() {
4407        let p = Pipeline::new();
4408        assert_eq!(p.stage_name_bytes_total(), 0);
4409    }
4410
4411    // ── Round 47: failure_headroom ────────────────────────────────────────────
4412
4413    #[test]
4414    fn test_failure_headroom_full_when_no_failures_recorded() {
4415        let cb = CircuitBreaker::new("svc-r47", 3, std::time::Duration::from_secs(10)).unwrap();
4416        assert_eq!(cb.failure_headroom(), 3);
4417    }
4418
4419    #[test]
4420    fn test_failure_headroom_decreases_with_each_failure() {
4421        let cb = CircuitBreaker::new("svc-r47b", 3, std::time::Duration::from_secs(10)).unwrap();
4422        cb.record_failure();
4423        assert_eq!(cb.failure_headroom(), 2);
4424        cb.record_failure();
4425        assert_eq!(cb.failure_headroom(), 1);
4426    }
4427
4428    #[test]
4429    fn test_failure_headroom_zero_when_at_or_above_threshold() {
4430        let cb = CircuitBreaker::new("svc-r47c", 2, std::time::Duration::from_secs(10)).unwrap();
4431        cb.record_failure();
4432        cb.record_failure();
4433        assert_eq!(cb.failure_headroom(), 0);
4434    }
4435
4436    // ── Round 49: stage_count_below_name_len ──────────────────────────────────
4437
4438    #[test]
4439    fn test_stage_count_below_name_len_counts_short_names() {
4440        let p = Pipeline::new()
4441            .add_stage("ab", |s: String| Ok(s))      // len=2
4442            .add_stage("abcde", |s: String| Ok(s))   // len=5
4443            .add_stage("xyz", |s: String| Ok(s));     // len=3
4444        // strictly less than 4: "ab" (2) and "xyz" (3)
4445        assert_eq!(p.stage_count_below_name_len(4), 2);
4446    }
4447
4448    #[test]
4449    fn test_stage_count_below_name_len_zero_for_empty_pipeline() {
4450        let p = Pipeline::new();
4451        assert_eq!(p.stage_count_below_name_len(10), 0);
4452    }
4453
4454    // ── Round 50: stage_count_above_name_bytes ────────────────────────────────
4455
4456    #[test]
4457    fn test_stage_count_above_name_bytes_counts_long_names() {
4458        let p = Pipeline::new()
4459            .add_stage("ab", |s: String| Ok(s))
4460            .add_stage("a_very_long_name", |s: String| Ok(s));
4461        assert_eq!(p.stage_count_above_name_bytes(3), 1);
4462    }
4463
4464    #[test]
4465    fn test_stage_count_above_name_bytes_zero_for_empty_pipeline() {
4466        let p = Pipeline::new();
4467        assert_eq!(p.stage_count_above_name_bytes(0), 0);
4468    }
4469
4470    // ── Round 47: contains_stage_with_prefix ──────────────────────────────────
4471
4472    #[test]
4473    fn test_contains_stage_with_prefix_true_when_present() {
4474        let p = Pipeline::new()
4475            .add_stage("validate_input", |s: String| Ok(s))
4476            .add_stage("transform_data", |s: String| Ok(s));
4477        assert!(p.contains_stage_with_prefix("validate"));
4478    }
4479
4480    #[test]
4481    fn test_contains_stage_with_prefix_false_when_absent() {
4482        let p = Pipeline::new().add_stage("stage_a", |s: String| Ok(s));
4483        assert!(!p.contains_stage_with_prefix("missing"));
4484    }
4485
4486    #[test]
4487    fn test_contains_stage_with_prefix_false_for_empty_pipeline() {
4488        let p = Pipeline::new();
4489        assert!(!p.contains_stage_with_prefix("any"));
4490    }
4491
4492    // ── Round 50: is_bounded, remaining_wait_budget_ms ────────────────────────
4493
4494    #[test]
4495    fn test_retry_policy_is_bounded_true_for_normal_policy() {
4496        let p = RetryPolicy::constant(3, 100).unwrap();
4497        assert!(p.is_bounded());
4498    }
4499
4500    #[test]
4501    fn test_retry_policy_is_bounded_true_for_none_policy() {
4502        let p = RetryPolicy::none();
4503        assert!(p.is_bounded());
4504    }
4505
4506    #[test]
4507    fn test_retry_policy_remaining_wait_budget_full_at_zero_attempts() {
4508        let p = RetryPolicy::constant(3, 100).unwrap();
4509        // total budget = 3 * 100 = 300, no attempts done → remaining = 300
4510        assert_eq!(p.remaining_wait_budget_ms(0), 300);
4511    }
4512
4513    #[test]
4514    fn test_retry_policy_remaining_wait_budget_decreases_with_attempts() {
4515        let p = RetryPolicy::constant(3, 100).unwrap();
4516        // After 1 attempt: delay_sum_ms(1) = 100, remaining = 300 - 100 = 200
4517        assert_eq!(p.remaining_wait_budget_ms(1), 200);
4518    }
4519
4520    // ── Round 51: stage_names_containing ──────────────────────────────────────
4521
4522    #[test]
4523    fn test_stage_names_containing_returns_all_matching_stages() {
4524        let p = Pipeline::new()
4525            .add_stage("pre_process", |s: String| Ok(s))
4526            .add_stage("post_process", |s: String| Ok(s))
4527            .add_stage("transform", |s: String| Ok(s));
4528        let names = p.stage_names_containing("process");
4529        assert_eq!(names, vec!["pre_process", "post_process"]);
4530    }
4531
4532    #[test]
4533    fn test_stage_names_containing_empty_when_no_match() {
4534        let p = Pipeline::new().add_stage("transform", |s: String| Ok(s));
4535        assert!(p.stage_names_containing("process").is_empty());
4536    }
4537
4538    // ── Round 52 ──────────────────────────────────────────────────────────────
4539
4540    #[test]
4541    fn test_stage_name_from_end_zero_returns_last_stage() {
4542        let p = Pipeline::new()
4543            .add_stage("first", |s: String| Ok(s))
4544            .add_stage("second", |s: String| Ok(s))
4545            .add_stage("third", |s: String| Ok(s));
4546        assert_eq!(p.stage_name_from_end(0), Some("third"));
4547    }
4548
4549    #[test]
4550    fn test_stage_name_from_end_one_returns_second_to_last() {
4551        let p = Pipeline::new()
4552            .add_stage("first", |s: String| Ok(s))
4553            .add_stage("second", |s: String| Ok(s))
4554            .add_stage("third", |s: String| Ok(s));
4555        assert_eq!(p.stage_name_from_end(1), Some("second"));
4556    }
4557
4558    #[test]
4559    fn test_stage_name_from_end_out_of_bounds_returns_none() {
4560        let p = Pipeline::new().add_stage("only", |s: String| Ok(s));
4561        assert_eq!(p.stage_name_from_end(1), None);
4562    }
4563
4564    #[test]
4565    fn test_stage_name_from_end_none_for_empty_pipeline() {
4566        let p = Pipeline::new();
4567        assert_eq!(p.stage_name_from_end(0), None);
4568    }
4569
4570    // ── Round 52: stage_at_index ───────────────────────────────────────────────
4571
4572    #[test]
4573    fn test_stage_at_index_returns_correct_stage() {
4574        let p = Pipeline::new()
4575            .add_stage("alpha", |s: String| Ok(s))
4576            .add_stage("beta", |s: String| Ok(s));
4577        assert_eq!(p.stage_at_index(0).map(|s| s.name.as_str()), Some("alpha"));
4578        assert_eq!(p.stage_at_index(1).map(|s| s.name.as_str()), Some("beta"));
4579    }
4580
4581    #[test]
4582    fn test_stage_at_index_none_for_out_of_bounds() {
4583        let p = Pipeline::new().add_stage("only", |s: String| Ok(s));
4584        assert!(p.stage_at_index(5).is_none());
4585    }
4586
4587    #[test]
4588    fn test_stage_at_index_none_for_empty_pipeline() {
4589        let p = Pipeline::new();
4590        assert!(p.stage_at_index(0).is_none());
4591    }
4592
4593    // ── Round 53 ──────────────────────────────────────────────────────────────
4594
4595    #[test]
4596    fn test_max_single_delay_ms_constant_policy() {
4597        let p = RetryPolicy::constant(3, 100).unwrap();
4598        assert_eq!(p.max_single_delay_ms(), 100);
4599    }
4600
4601    #[test]
4602    fn test_max_single_delay_ms_exponential_grows_with_attempts() {
4603        let p = RetryPolicy::exponential(3, 50).unwrap();
4604        // attempt 3: 50 * 2^(3-1) = 50 * 4 = 200
4605        assert_eq!(p.max_single_delay_ms(), 200);
4606    }
4607
4608    // ── Round 48 ──────────────────────────────────────────────────────────────
4609
4610    #[test]
4611    fn test_all_stage_names_returns_all_in_order() {
4612        let p = Pipeline::new()
4613            .add_stage("alpha", |s: String| Ok(s))
4614            .add_stage("beta", |s: String| Ok(s))
4615            .add_stage("gamma", |s: String| Ok(s));
4616        assert_eq!(p.all_stage_names(), vec!["alpha", "beta", "gamma"]);
4617    }
4618
4619    #[test]
4620    fn test_all_stage_names_empty_for_empty_pipeline() {
4621        let p = Pipeline::new();
4622        assert!(p.all_stage_names().is_empty());
4623    }
4624
4625    #[test]
4626    fn test_all_stage_names_preserves_duplicates() {
4627        let p = Pipeline::new()
4628            .add_stage("a", |s: String| Ok(s))
4629            .add_stage("a", |s: String| Ok(s));
4630        assert_eq!(p.all_stage_names(), vec!["a", "a"]);
4631    }
4632
4633    #[test]
4634    fn test_has_exactly_n_stages_true() {
4635        let p = Pipeline::new()
4636            .add_stage("x", |s: String| Ok(s))
4637            .add_stage("y", |s: String| Ok(s));
4638        assert!(p.has_exactly_n_stages(2));
4639    }
4640
4641    #[test]
4642    fn test_has_exactly_n_stages_false_when_different() {
4643        let p = Pipeline::new().add_stage("x", |s: String| Ok(s));
4644        assert!(!p.has_exactly_n_stages(3));
4645    }
4646
4647    #[test]
4648    fn test_has_exactly_n_stages_true_for_empty() {
4649        let p = Pipeline::new();
4650        assert!(p.has_exactly_n_stages(0));
4651    }
4652
4653    // ── Round 49 ──────────────────────────────────────────────────────────────
4654
4655    #[test]
4656    fn test_stage_index_of_returns_correct_index() {
4657        let p = Pipeline::new()
4658            .add_stage("first", |s: String| Ok(s))
4659            .add_stage("second", |s: String| Ok(s))
4660            .add_stage("third", |s: String| Ok(s));
4661        assert_eq!(p.stage_index_of("second"), Some(1));
4662    }
4663
4664    #[test]
4665    fn test_stage_index_of_returns_none_when_absent() {
4666        let p = Pipeline::new().add_stage("alpha", |s: String| Ok(s));
4667        assert_eq!(p.stage_index_of("beta"), None);
4668    }
4669
4670    #[test]
4671    fn test_stage_index_of_returns_first_match_for_duplicates() {
4672        let p = Pipeline::new()
4673            .add_stage("dup", |s: String| Ok(s))
4674            .add_stage("dup", |s: String| Ok(s));
4675        assert_eq!(p.stage_index_of("dup"), Some(0));
4676    }
4677
4678    // ── Round 54 ──────────────────────────────────────────────────────────────
4679
4680    #[test]
4681    fn test_all_stage_names_start_with_true_when_all_match() {
4682        let p = Pipeline::new()
4683            .add_stage("api_v1", |s: String| Ok(s))
4684            .add_stage("api_v2", |s: String| Ok(s));
4685        assert!(p.all_stage_names_start_with("api_"));
4686    }
4687
4688    #[test]
4689    fn test_all_stage_names_start_with_false_when_one_differs() {
4690        let p = Pipeline::new()
4691            .add_stage("api_v1", |s: String| Ok(s))
4692            .add_stage("transform", |s: String| Ok(s));
4693        assert!(!p.all_stage_names_start_with("api_"));
4694    }
4695
4696    #[test]
4697    fn test_all_stage_names_start_with_true_for_empty_pipeline() {
4698        let p = Pipeline::new();
4699        assert!(p.all_stage_names_start_with("anything"));
4700    }
4701
4702    // ── Round 51 ──────────────────────────────────────────────────────────────
4703
4704    #[test]
4705    fn test_has_no_stages_true_for_empty_pipeline() {
4706        let p = Pipeline::new();
4707        assert!(p.has_no_stages());
4708    }
4709
4710    #[test]
4711    fn test_has_no_stages_false_after_adding_stage() {
4712        let p = Pipeline::new().add_stage("s", |s: String| Ok(s));
4713        assert!(!p.has_no_stages());
4714    }
4715
4716    // ── Round 59: longest_stage_name_len ─────────────────────────────────────
4717
4718    #[test]
4719    fn test_longest_stage_name_len_returns_max() {
4720        let p = Pipeline::new()
4721            .add_stage("short", |s: String| Ok(s))
4722            .add_stage("much-longer-name", |s: String| Ok(s));
4723        assert_eq!(p.longest_stage_name_len(), "much-longer-name".len());
4724    }
4725
4726    #[test]
4727    fn test_longest_stage_name_len_zero_for_empty_pipeline() {
4728        let p = Pipeline::new();
4729        assert_eq!(p.longest_stage_name_len(), 0);
4730    }
4731
4732    // ── Round 60: stage_names_joined ─────────────────────────────────────────
4733
4734    #[test]
4735    fn test_stage_names_joined_correct() {
4736        let p = Pipeline::new()
4737            .add_stage("alpha", |s: String| Ok(s))
4738            .add_stage("beta", |s: String| Ok(s));
4739        assert_eq!(p.stage_names_joined(", "), "alpha, beta");
4740    }
4741
4742    #[test]
4743    fn test_stage_names_joined_empty_for_empty_pipeline() {
4744        let p = Pipeline::new();
4745        assert_eq!(p.stage_names_joined("|"), "");
4746    }
4747
4748    // ── Round 62: stage_count_with_name_containing ────────────────────────────
4749
4750    #[test]
4751    fn test_stage_count_with_name_containing_correct() {
4752        let p = Pipeline::new()
4753            .add_stage("preprocess_input", |s: String| Ok(s))
4754            .add_stage("process_data", |s: String| Ok(s))
4755            .add_stage("postprocess_output", |s: String| Ok(s));
4756        assert_eq!(p.stage_count_with_name_containing("process"), 3);
4757        assert_eq!(p.stage_count_with_name_containing("pre"), 1);
4758    }
4759
4760    #[test]
4761    fn test_stage_count_with_name_containing_zero_when_none_match() {
4762        let p = Pipeline::new().add_stage("alpha", |s: String| Ok(s));
4763        assert_eq!(p.stage_count_with_name_containing("beta"), 0);
4764    }
4765
4766    // ── Round 63: has_stage_at_index ──────────────────────────────────────────
4767
4768    #[test]
4769    fn test_has_stage_at_index_true_for_valid_index() {
4770        let p = Pipeline::new()
4771            .add_stage("first", |s: String| Ok(s))
4772            .add_stage("second", |s: String| Ok(s));
4773        assert!(p.has_stage_at_index(0));
4774        assert!(p.has_stage_at_index(1));
4775    }
4776
4777    #[test]
4778    fn test_has_stage_at_index_false_for_out_of_bounds() {
4779        let p = Pipeline::new().add_stage("only", |s: String| Ok(s));
4780        assert!(!p.has_stage_at_index(1));
4781    }
4782
4783    #[test]
4784    fn test_has_stage_at_index_false_for_empty_pipeline() {
4785        let p = Pipeline::new();
4786        assert!(!p.has_stage_at_index(0));
4787    }
4788
4789    // ── Round 57: any_stage_has_name ─────────────────────────────────────────
4790
4791    #[test]
4792    fn test_any_stage_has_name_true_for_existing_stage() {
4793        let p = Pipeline::new()
4794            .add_stage("alpha", |s: String| Ok(s))
4795            .add_stage("beta", |s: String| Ok(s));
4796        assert!(p.any_stage_has_name("alpha"));
4797        assert!(p.any_stage_has_name("beta"));
4798    }
4799
4800    #[test]
4801    fn test_any_stage_has_name_false_for_missing_stage() {
4802        let p = Pipeline::new().add_stage("alpha", |s: String| Ok(s));
4803        assert!(!p.any_stage_has_name("gamma"));
4804    }
4805
4806    #[test]
4807    fn test_any_stage_has_name_false_for_empty_pipeline() {
4808        let p = Pipeline::new();
4809        assert!(!p.any_stage_has_name("anything"));
4810    }
4811
4812    // ── Round 58: covers_n_failures ───────────────────────────────────────────
4813
4814    #[test]
4815    fn test_covers_n_failures_true_when_max_attempts_exceeds_n() {
4816        let policy = RetryPolicy::exponential(5, 100).unwrap();
4817        assert!(policy.covers_n_failures(4));
4818        assert!(policy.covers_n_failures(0));
4819    }
4820
4821    #[test]
4822    fn test_covers_n_failures_false_when_max_attempts_equals_n() {
4823        let policy = RetryPolicy::exponential(3, 100).unwrap();
4824        assert!(!policy.covers_n_failures(3));
4825    }
4826
4827    #[test]
4828    fn test_covers_n_failures_false_for_no_retry_policy() {
4829        let policy = RetryPolicy::none();
4830        // none() has max_attempts == 1, so covers_n_failures(1) is false
4831        assert!(!policy.covers_n_failures(1));
4832    }
4833
4834    // ── Round 59: last_stage_name ─────────────────────────────────────────────
4835
4836    #[test]
4837    fn test_last_stage_name_returns_last_added() {
4838        let p = Pipeline::new()
4839            .add_stage("first", |s: String| Ok(s))
4840            .add_stage("last", |s: String| Ok(s));
4841        assert_eq!(p.last_stage_name(), Some("last"));
4842    }
4843
4844    #[test]
4845    fn test_last_stage_name_none_for_empty_pipeline() {
4846        let p = Pipeline::new();
4847        assert!(p.last_stage_name().is_none());
4848    }
4849
4850    // ── Round 62: stage_name_at ───────────────────────────────────────────────
4851
4852    #[test]
4853    fn test_stage_name_at_returns_correct_name() {
4854        let p = Pipeline::new()
4855            .add_stage("alpha", |s: String| Ok(s))
4856            .add_stage("beta", |s: String| Ok(s));
4857        assert_eq!(p.stage_name_at(0), Some("alpha"));
4858        assert_eq!(p.stage_name_at(1), Some("beta"));
4859    }
4860
4861    #[test]
4862    fn test_stage_name_at_none_for_out_of_bounds() {
4863        let p = Pipeline::new().add_stage("only", |s: String| Ok(s));
4864        assert!(p.stage_name_at(1).is_none());
4865    }
4866
4867    #[test]
4868    fn test_stage_name_at_none_for_empty_pipeline() {
4869        let p = Pipeline::new();
4870        assert!(p.stage_name_at(0).is_none());
4871    }
4872
4873    // ── Round 63: all_stage_names_contain ─────────────────────────────────────
4874
4875    #[test]
4876    fn test_all_stage_names_contain_true_when_all_match() {
4877        let p = Pipeline::new()
4878            .add_stage("step_alpha", |s: String| Ok(s))
4879            .add_stage("step_beta", |s: String| Ok(s));
4880        assert!(p.all_stage_names_contain("step_"));
4881    }
4882
4883    #[test]
4884    fn test_all_stage_names_contain_false_when_one_does_not_match() {
4885        let p = Pipeline::new()
4886            .add_stage("step_alpha", |s: String| Ok(s))
4887            .add_stage("gamma", |s: String| Ok(s));
4888        assert!(!p.all_stage_names_contain("step_"));
4889    }
4890
4891    #[test]
4892    fn test_all_stage_names_contain_true_for_empty_pipeline() {
4893        let p = Pipeline::new();
4894        assert!(p.all_stage_names_contain("anything"));
4895    }
4896}