// llm_agent_runtime/orchestrator.rs

//! # Module: Orchestrator
//!
//! ## Responsibility
//! Provides a composable LLM pipeline with circuit breaking, retry, deduplication,
//! and backpressure. Mirrors the public API of `tokio-prompt-orchestrator`.
//!
//! ## Guarantees
//! - Thread-safe: all types wrap state in `Arc<Mutex<_>>` or atomics
//! - Circuit breaker opens after `threshold` consecutive failures
//! - RetryPolicy delays grow exponentially and are capped at [`MAX_RETRY_DELAY`]
//! - Deduplicator is deterministic and non-blocking
//! - BackpressureGuard never exceeds declared hard capacity
//! - Non-panicking: all operations return `Result`
//!
//! ## NOT Responsible For
//! - Cross-node circuit breakers (single-process only, unless a distributed backend is provided)
//! - Persistent deduplication (in-memory, bounded TTL)
//! - Distributed backpressure
//!
//! ## Composing the Primitives
//!
//! The four primitives are designed to be layered. A typical production setup:
//!
//! ```text
//! request
//!   │
//!   ▼
//! BackpressureGuard  ← shed if too many in-flight requests
//!   │
//!   ▼
//! Deduplicator       ← return cached result for duplicate keys
//!   │
//!   ▼
//! CircuitBreaker     ← fast-fail if the downstream is unhealthy
//!   │
//!   ▼
//! RetryPolicy        ← retry transient failures with exponential backoff
//!   │
//!   ▼
//! Pipeline           ← transform request/response through named stages
//! ```

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use crate::error::AgentRuntimeError;
use crate::util::timed_lock;

/// Hard upper bound on the delay between retries.
///
/// Exponential backoff growth is clamped to this value so that a large
/// attempt count can never produce a multi-hour sleep.
pub const MAX_RETRY_DELAY: Duration = Duration::from_secs(60);

// ── RetryPolicy ───────────────────────────────────────────────────────────────

/// Retry mode: exponential backoff or constant interval.
///
/// Fieldless two-variant enum, so it derives `Copy` (no reason to force
/// callers to clone) and `Hash` (usable as a map key in metrics/config).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RetryKind {
    /// Delay doubles each attempt: `base_delay * 2^(attempt-1)`.
    Exponential,
    /// Delay is fixed at `base_delay` for every attempt.
    Constant,
}
62
63/// Configurable retry policy with exponential backoff or constant interval.
64#[derive(Debug, Clone)]
65pub struct RetryPolicy {
66    /// Maximum number of attempts (including the first).
67    pub max_attempts: u32,
68    /// Base delay for the first retry.
69    pub base_delay: Duration,
70    /// Whether to use exponential or constant delay.
71    pub kind: RetryKind,
72}
73
74impl RetryPolicy {
75    /// Create an exponential retry policy.
76    ///
77    /// # Arguments
78    /// * `max_attempts` — total attempt budget (must be ≥ 1)
79    /// * `base_ms` — base delay in milliseconds for attempt 1
80    ///
81    /// # Returns
82    /// - `Ok(RetryPolicy)` — on success
83    /// - `Err(AgentRuntimeError::Orchestration)` — if `max_attempts == 0`
84    pub fn exponential(max_attempts: u32, base_ms: u64) -> Result<Self, AgentRuntimeError> {
85        if max_attempts == 0 {
86            return Err(AgentRuntimeError::Orchestration(
87                "max_attempts must be >= 1".into(),
88            ));
89        }
90        if base_ms == 0 {
91            return Err(AgentRuntimeError::Orchestration(
92                "base_ms must be >= 1 to avoid zero-delay busy-loop retries".into(),
93            ));
94        }
95        Ok(Self {
96            max_attempts,
97            base_delay: Duration::from_millis(base_ms),
98            kind: RetryKind::Exponential,
99        })
100    }
101
102    /// Create a constant (fixed-interval) retry policy.
103    ///
104    /// Every retry waits exactly `delay_ms` milliseconds regardless of attempt
105    /// number, unlike [`exponential`] which doubles the delay each time.
106    ///
107    /// [`exponential`]: RetryPolicy::exponential
108    ///
109    /// # Returns
110    /// - `Ok(RetryPolicy)` — on success
111    /// - `Err(AgentRuntimeError::Orchestration)` — if `max_attempts == 0` or `delay_ms == 0`
112    pub fn constant(max_attempts: u32, delay_ms: u64) -> Result<Self, AgentRuntimeError> {
113        if max_attempts == 0 {
114            return Err(AgentRuntimeError::Orchestration(
115                "max_attempts must be >= 1".into(),
116            ));
117        }
118        if delay_ms == 0 {
119            return Err(AgentRuntimeError::Orchestration(
120                "delay_ms must be >= 1 to avoid busy-loop retries".into(),
121            ));
122        }
123        Ok(Self {
124            max_attempts,
125            base_delay: Duration::from_millis(delay_ms),
126            kind: RetryKind::Constant,
127        })
128    }
129
130    /// Create a no-retry policy (single attempt, no delay).
131    ///
132    /// Useful for one-shot operations or when the caller manages retry logic externally.
133    pub fn none() -> Self {
134        Self {
135            max_attempts: 1,
136            base_delay: Duration::ZERO,
137            kind: RetryKind::Constant,
138        }
139    }
140
141    /// Return `true` if this policy makes at most one attempt with no delay.
142    ///
143    /// Equivalent to `max_attempts == 1 && base_delay == Duration::ZERO`.
144    pub fn is_none(&self) -> bool {
145        self.max_attempts == 1 && self.base_delay == Duration::ZERO
146    }
147
148    /// Return a copy of this policy with `max_attempts` changed.
149    ///
150    /// # Errors
151    /// Returns `Err` if `n == 0`.
152    pub fn with_max_attempts(mut self, n: u32) -> Result<Self, AgentRuntimeError> {
153        if n == 0 {
154            return Err(AgentRuntimeError::Orchestration(
155                "max_attempts must be >= 1".into(),
156            ));
157        }
158        self.max_attempts = n;
159        Ok(self)
160    }
161
162    /// Return the configured maximum number of attempts.
163    pub fn max_attempts(&self) -> u32 {
164        self.max_attempts
165    }
166
167    /// Return `true` if this policy performs no retries (max_attempts ≤ 1).
168    ///
169    /// Useful for short-circuiting retry logic in hot paths.
170    pub fn is_no_retry(&self) -> bool {
171        self.max_attempts <= 1
172    }
173
174    /// Return `true` if this policy allows at least one retry (max_attempts > 1).
175    ///
176    /// Complement of [`is_no_retry`].
177    ///
178    /// [`is_no_retry`]: RetryPolicy::is_no_retry
179    pub fn will_retry_at_all(&self) -> bool {
180        self.max_attempts > 1
181    }
182
183    /// Return `true` if this policy uses exponential back-off between retries.
184    pub fn is_exponential(&self) -> bool {
185        matches!(self.kind, RetryKind::Exponential)
186    }
187
188    /// Return `true` if this policy uses a constant (fixed-interval) delay between retries.
189    pub fn is_constant(&self) -> bool {
190        matches!(self.kind, RetryKind::Constant)
191    }
192
193    /// Return the configured base delay in milliseconds.
194    ///
195    /// For constant policies this equals every per-retry delay.  For
196    /// exponential policies this is the delay before the first retry.
197    pub fn base_delay_ms(&self) -> u64 {
198        self.base_delay.as_millis() as u64
199    }
200
201    /// Return the delay before the first retry in milliseconds.
202    ///
203    /// Alias for `base_delay_ms`; the name communicates intent more clearly at
204    /// call sites that only care about the first-retry delay.
205    pub fn first_delay_ms(&self) -> u64 {
206        self.base_delay_ms()
207    }
208
209    /// Return `true` if `attempt` is the last allowed attempt for this policy.
210    ///
211    /// `attempt` is 1-indexed: `attempt == max_attempts` means no more retries.
212    pub fn is_last_attempt(&self, attempt: u32) -> bool {
213        attempt >= self.max_attempts
214    }
215
216    /// Return the sum of all per-attempt delays across all attempts, in milliseconds.
217    ///
218    /// For exponential policies each attempt's delay is capped at
219    /// [`MAX_RETRY_DELAY`].  For constant policies every attempt uses
220    /// `base_delay_ms`.
221    pub fn max_total_delay_ms(&self) -> u64 {
222        (1..=self.max_attempts)
223            .map(|attempt| self.delay_for(attempt).as_millis() as u64)
224            .sum()
225    }
226
227    /// Return the sum of delays for the first `n` attempts, in milliseconds.
228    ///
229    /// If `n > max_attempts`, only `max_attempts` delays are summed.
230    pub fn delay_sum_ms(&self, n: u32) -> u64 {
231        let limit = n.min(self.max_attempts);
232        (1..=limit)
233            .map(|attempt| self.delay_for(attempt).as_millis() as u64)
234            .sum()
235    }
236
237    /// Return the average delay per attempt in milliseconds.
238    ///
239    /// Returns `0` for policies with no delay (e.g. `RetryPolicy::none()`).
240    pub fn avg_delay_ms(&self) -> u64 {
241        if self.max_attempts == 0 {
242            return 0;
243        }
244        self.max_total_delay_ms() / self.max_attempts as u64
245    }
246
247    /// Return the effective backoff factor per attempt.
248    ///
249    /// Returns `2.0` for exponential policies and `1.0` for constant policies.
250    pub fn backoff_factor(&self) -> f64 {
251        match self.kind {
252            RetryKind::Exponential => 2.0,
253            RetryKind::Constant => 1.0,
254        }
255    }
256
257    /// Return a copy of this policy with the base delay changed to `ms` milliseconds.
258    ///
259    /// # Errors
260    /// Returns `Err` if `ms == 0`.
261    pub fn with_base_delay_ms(mut self, ms: u64) -> Result<Self, AgentRuntimeError> {
262        if ms == 0 {
263            return Err(AgentRuntimeError::Orchestration(
264                "base_delay_ms must be >= 1 to avoid busy-loop retries".into(),
265            ));
266        }
267        self.base_delay = Duration::from_millis(ms);
268        Ok(self)
269    }
270
271    /// Return the delay for `attempt` in whole milliseconds.
272    ///
273    /// Convenience wrapper around [`delay_for`] for use in logging and metrics
274    /// where a `u64` is easier to handle than a `Duration`.
275    ///
276    /// [`delay_for`]: RetryPolicy::delay_for
277    pub fn delay_ms_for(&self, attempt: u32) -> u64 {
278        self.delay_for(attempt).as_millis() as u64
279    }
280
281    /// Return the total maximum delay in milliseconds across all retry attempts.
282    ///
283    /// Sums `delay_for(attempt)` for every attempt from 1 to `max_attempts`.
284    /// Useful for estimating worst-case latency budgets.
285    pub fn total_max_delay_ms(&self) -> u64 {
286        (1..=self.max_attempts)
287            .map(|a| self.delay_for(a).as_millis() as u64)
288            .sum()
289    }
290
291    /// Return the number of attempts still available after `attempt` have been made.
292    ///
293    /// Returns `0` once the budget is exhausted (`attempt >= max_attempts`).
294    pub fn attempts_remaining(&self, attempt: u32) -> u32 {
295        self.max_attempts.saturating_sub(attempt)
296    }
297
298    /// Return `true` if another attempt is permitted after `attempt` failures.
299    ///
300    /// `attempt` is the number of attempts already made (0-based: `0` means
301    /// no attempt has been made yet).  Returns `false` once the budget is
302    /// exhausted (i.e. `attempt >= max_attempts`).
303    pub fn can_retry(&self, attempt: u32) -> bool {
304        attempt < self.max_attempts
305    }
306
307    /// Compute the delay before the given attempt number (1-based).
308    ///
309    /// - [`RetryKind::Exponential`]: `base_delay * 2^(attempt-1)`, capped at `MAX_RETRY_DELAY`.
310    /// - [`RetryKind::Constant`]: always returns `base_delay`.
311    pub fn delay_for(&self, attempt: u32) -> Duration {
312        match self.kind {
313            RetryKind::Constant => self.base_delay.min(MAX_RETRY_DELAY),
314            RetryKind::Exponential => {
315                let exp = attempt.saturating_sub(1);
316                let multiplier = 1u64.checked_shl(exp.min(63)).unwrap_or(u64::MAX);
317                let millis = self
318                    .base_delay
319                    .as_millis()
320                    .saturating_mul(multiplier as u128);
321                let raw = Duration::from_millis(millis.min(u64::MAX as u128) as u64);
322                raw.min(MAX_RETRY_DELAY)
323            }
324        }
325    }
326}

// ── CircuitBreaker ────────────────────────────────────────────────────────────

/// Circuit breaker state: `Closed` (normal) → `Open` (fast-fail) → `HalfOpen` (probe).
///
/// Tracks failure rates and opens when the threshold is exceeded.
///
/// Note: `PartialEq` is implemented manually because the `Open` variant
/// contains `std::time::Instant` which does not implement `Eq`. The manual
/// implementation compares only the variant discriminant, not the timestamp.
#[derive(Debug, Clone)]
pub enum CircuitState {
    /// Circuit is operating normally; requests pass through.
    Closed,
    /// Circuit has tripped; requests are fast-failed without calling the operation.
    Open {
        /// The instant at which the circuit was opened.
        opened_at: Instant,
    },
    /// Recovery probe period; the next request will be attempted to test recovery.
    HalfOpen,
}
349
350impl PartialEq for CircuitState {
351    fn eq(&self, other: &Self) -> bool {
352        match (self, other) {
353            (CircuitState::Closed, CircuitState::Closed) => true,
354            (CircuitState::Open { .. }, CircuitState::Open { .. }) => true,
355            (CircuitState::HalfOpen, CircuitState::HalfOpen) => true,
356            _ => false,
357        }
358    }
359}
360
361impl Eq for CircuitState {}

/// Storage backend for circuit breaker state.
///
/// Implement this trait to share circuit breaker state across processes
/// (e.g., via Redis). The in-process default is `InMemoryCircuitBreakerBackend`.
///
/// Note: Methods are synchronous to avoid pulling in `async-trait`. A
/// distributed backend (e.g., Redis) can internally spawn a Tokio runtime.
pub trait CircuitBreakerBackend: Send + Sync {
    /// Increment the consecutive failure count for `service` and return the new count.
    fn increment_failures(&self, service: &str) -> u32;
    /// Reset the consecutive failure count for `service` to zero.
    fn reset_failures(&self, service: &str);
    /// Return the current consecutive failure count for `service`.
    fn get_failures(&self, service: &str) -> u32;
    /// Record the instant at which the circuit was opened for `service`.
    fn set_open_at(&self, service: &str, at: std::time::Instant);
    /// Clear the open-at timestamp, effectively moving the circuit to Closed or HalfOpen.
    fn clear_open_at(&self, service: &str);
    /// Return the instant at which the circuit was opened, or `None` if it is not open.
    fn get_open_at(&self, service: &str) -> Option<std::time::Instant>;
}
384
385// ── InMemoryCircuitBreakerBackend ─────────────────────────────────────────────
386
387/// In-process circuit breaker backend backed by a `Mutex<HashMap>`.
388///
389/// Each service name gets its own independent failure counter and open-at
390/// timestamp.  Multiple `CircuitBreaker` instances that share the same
391/// backend (via [`CircuitBreaker::with_backend`]) will correctly track
392/// failures per service rather than sharing a single counter.
393pub struct InMemoryCircuitBreakerBackend {
394    inner: Arc<Mutex<HashMap<String, InMemoryServiceState>>>,
395}
396
/// Per-service state: `Default` gives a zero counter and no open timestamp.
#[derive(Default)]
struct InMemoryServiceState {
    // Consecutive (not total) failures; reset to 0 on any success.
    consecutive_failures: u32,
    // `Some` while the circuit is open; `None` when Closed or HalfOpen.
    open_at: Option<std::time::Instant>,
}

403impl InMemoryCircuitBreakerBackend {
404    /// Create a new in-memory backend with all counters at zero.
405    pub fn new() -> Self {
406        Self {
407            inner: Arc::new(Mutex::new(HashMap::new())),
408        }
409    }
410}
411
412impl Default for InMemoryCircuitBreakerBackend {
413    fn default() -> Self {
414        Self::new()
415    }
416}
417
418impl CircuitBreakerBackend for InMemoryCircuitBreakerBackend {
419    fn increment_failures(&self, service: &str) -> u32 {
420        let mut map = timed_lock(
421            &self.inner,
422            "InMemoryCircuitBreakerBackend::increment_failures",
423        );
424        let state = map.entry(service.to_owned()).or_default();
425        state.consecutive_failures += 1;
426        state.consecutive_failures
427    }
428
429    fn reset_failures(&self, service: &str) {
430        let mut map = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::reset_failures");
431        if let Some(state) = map.get_mut(service) {
432            state.consecutive_failures = 0;
433        }
434    }
435
436    fn get_failures(&self, service: &str) -> u32 {
437        let map = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::get_failures");
438        map.get(service).map_or(0, |s| s.consecutive_failures)
439    }
440
441    fn set_open_at(&self, service: &str, at: std::time::Instant) {
442        let mut map = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::set_open_at");
443        map.entry(service.to_owned()).or_default().open_at = Some(at);
444    }
445
446    fn clear_open_at(&self, service: &str) {
447        let mut map = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::clear_open_at");
448        if let Some(state) = map.get_mut(service) {
449            state.open_at = None;
450        }
451    }
452
453    fn get_open_at(&self, service: &str) -> Option<std::time::Instant> {
454        let map = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::get_open_at");
455        map.get(service).and_then(|s| s.open_at)
456    }
457}
458
459// ── CircuitBreaker ────────────────────────────────────────────────────────────
460
461/// Circuit breaker guarding a fallible operation.
462///
463/// ## Guarantees
464/// - Opens after `threshold` consecutive failures
465/// - Transitions to `HalfOpen` after `recovery_window` has elapsed
466/// - Closes on the first successful probe in `HalfOpen`
467#[derive(Clone)]
468pub struct CircuitBreaker {
469    threshold: u32,
470    recovery_window: Duration,
471    service: String,
472    backend: Arc<dyn CircuitBreakerBackend>,
473}
474
475impl std::fmt::Debug for CircuitBreaker {
476    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
477        f.debug_struct("CircuitBreaker")
478            .field("threshold", &self.threshold)
479            .field("recovery_window", &self.recovery_window)
480            .field("service", &self.service)
481            .finish()
482    }
483}
484
485impl CircuitBreaker {
486    /// Create a new circuit breaker backed by an in-memory backend.
487    ///
488    /// # Arguments
489    /// * `service` — name used in error messages and logs
490    /// * `threshold` — consecutive failures before opening
491    /// * `recovery_window` — how long to stay open before probing
492    pub fn new(
493        service: impl Into<String>,
494        threshold: u32,
495        recovery_window: Duration,
496    ) -> Result<Self, AgentRuntimeError> {
497        if threshold == 0 {
498            return Err(AgentRuntimeError::Orchestration(
499                "circuit breaker threshold must be >= 1".into(),
500            ));
501        }
502        let service = service.into();
503        Ok(Self {
504            threshold,
505            recovery_window,
506            service,
507            backend: Arc::new(InMemoryCircuitBreakerBackend::new()),
508        })
509    }
510
511    /// Replace the default in-memory backend with a custom one.
512    ///
513    /// Useful for sharing circuit breaker state across processes.
514    pub fn with_backend(mut self, backend: Arc<dyn CircuitBreakerBackend>) -> Self {
515        self.backend = backend;
516        self
517    }
518
519    /// Attempt to call `f`, respecting the circuit breaker state.
520    ///
521    /// # Errors
522    /// - `AgentRuntimeError::CircuitOpen` — the breaker is in the `Open` state
523    ///   and the recovery window has not yet elapsed
524    /// - `AgentRuntimeError::Orchestration` — `f` returned an error; the error
525    ///   message is the `Display` of the inner error. This call may open the
526    ///   breaker if it pushes the consecutive failure count above `threshold`.
527    #[tracing::instrument(skip(self, f))]
528    pub fn call<T, E, F>(&self, f: F) -> Result<T, AgentRuntimeError>
529    where
530        F: FnOnce() -> Result<T, E>,
531        E: std::fmt::Display,
532    {
533        // Determine effective state, potentially transitioning Open → HalfOpen.
534        let effective_state = match self.backend.get_open_at(&self.service) {
535            Some(opened_at) => {
536                if opened_at.elapsed() >= self.recovery_window {
537                    // Clear open_at to signal HalfOpen; failures remain.
538                    self.backend.clear_open_at(&self.service);
539                    tracing::info!("circuit moved to half-open for {}", self.service);
540                    CircuitState::HalfOpen
541                } else {
542                    CircuitState::Open { opened_at }
543                }
544            }
545            None => {
546                // Either Closed or HalfOpen (after a prior transition).
547                // We distinguish by checking whether failures >= threshold
548                // but no open_at is set — that means we are in HalfOpen.
549                let failures = self.backend.get_failures(&self.service);
550                if failures >= self.threshold {
551                    CircuitState::HalfOpen
552                } else {
553                    CircuitState::Closed
554                }
555            }
556        };
557
558        tracing::debug!("circuit state: {:?}", effective_state);
559
560        match effective_state {
561            CircuitState::Open { .. } => {
562                return Err(AgentRuntimeError::CircuitOpen {
563                    service: self.service.clone(),
564                });
565            }
566            CircuitState::Closed | CircuitState::HalfOpen => {}
567        }
568
569        // Execute the operation.
570        match f() {
571            Ok(val) => {
572                self.backend.reset_failures(&self.service);
573                self.backend.clear_open_at(&self.service);
574                tracing::info!("circuit closed for {}", self.service);
575                Ok(val)
576            }
577            Err(e) => {
578                let failures = self.backend.increment_failures(&self.service);
579                if failures >= self.threshold {
580                    let now = Instant::now();
581                    self.backend.set_open_at(&self.service, now);
582                    tracing::info!("circuit opened for {}", self.service);
583                }
584                Err(AgentRuntimeError::Orchestration(e.to_string()))
585            }
586        }
587    }
588
589    /// Return the current circuit state.
590    pub fn state(&self) -> Result<CircuitState, AgentRuntimeError> {
591        let state = match self.backend.get_open_at(&self.service) {
592            Some(opened_at) => {
593                if opened_at.elapsed() >= self.recovery_window {
594                    // Would transition to HalfOpen on next call; report HalfOpen.
595                    let failures = self.backend.get_failures(&self.service);
596                    if failures >= self.threshold {
597                        CircuitState::HalfOpen
598                    } else {
599                        CircuitState::Closed
600                    }
601                } else {
602                    CircuitState::Open { opened_at }
603                }
604            }
605            None => {
606                let failures = self.backend.get_failures(&self.service);
607                if failures >= self.threshold {
608                    CircuitState::HalfOpen
609                } else {
610                    CircuitState::Closed
611                }
612            }
613        };
614        Ok(state)
615    }
616
617    /// Return the consecutive failure count.
618    pub fn failure_count(&self) -> Result<u32, AgentRuntimeError> {
619        Ok(self.backend.get_failures(&self.service))
620    }
621
622    /// Record a successful call, resetting the consecutive failure counter.
623    ///
624    /// Call this when a protected operation succeeds so the circuit can
625    /// transition back to `Closed` after a `HalfOpen` probe.
626    pub fn record_success(&self) {
627        self.backend.reset_failures(&self.service);
628        self.backend.clear_open_at(&self.service);
629    }
630
631    /// Record a failed call, incrementing the consecutive failure counter.
632    ///
633    /// Opens the circuit when the failure count reaches `threshold`.
634    pub fn record_failure(&self) {
635        let failures = self.backend.increment_failures(&self.service);
636        if failures >= self.threshold {
637            self.backend.set_open_at(&self.service, Instant::now());
638            tracing::info!("circuit opened for {} (manual record)", self.service);
639        }
640    }
641
642    /// Return the service name this circuit breaker is protecting.
643    pub fn service_name(&self) -> &str {
644        &self.service
645    }
646
647    /// Return `true` if the circuit is currently `Closed` (healthy).
648    pub fn is_closed(&self) -> bool {
649        matches!(self.state(), Ok(CircuitState::Closed))
650    }
651
652    /// Return `true` if the circuit is currently `Open` (fast-failing).
653    pub fn is_open(&self) -> bool {
654        matches!(self.state(), Ok(CircuitState::Open { .. }))
655    }
656
657    /// Return `true` if the circuit is currently `HalfOpen` (probing).
658    pub fn is_half_open(&self) -> bool {
659        matches!(self.state(), Ok(CircuitState::HalfOpen))
660    }
661
662    /// Return `true` if the circuit is in a state that allows calls to proceed.
663    ///
664    /// Calls are allowed in both `Closed` and `HalfOpen` states; only `Open`
665    /// fast-fails.
666    pub fn is_healthy(&self) -> bool {
667        !self.is_open()
668    }
669
670    /// Return the configured consecutive-failure threshold.
671    ///
672    /// The circuit opens when `failure_count()` reaches this value.
673    pub fn threshold(&self) -> u32 {
674        self.threshold
675    }
676
677    /// Return the current failure count as a ratio of the threshold.
678    ///
679    /// Returns a value in `[0.0, 1.0]` where `1.0` (or greater) means the
680    /// circuit will open (or is already open).  Returns `0.0` when the
681    /// threshold is zero to avoid division by zero.
682    pub fn failure_rate(&self) -> f64 {
683        if self.threshold == 0 {
684            return 0.0;
685        }
686        let failures = self.backend.get_failures(&self.service);
687        failures as f64 / self.threshold as f64
688    }
689
690    /// Return `true` when `failure_count()` has reached the configured threshold.
691    ///
692    /// The circuit opens immediately when this returns `true` on the next
693    /// `record_failure` call.
694    pub fn is_at_threshold(&self) -> bool {
695        let failures = self.backend.get_failures(&self.service);
696        failures >= self.threshold
697    }
698
699    /// Return the number of additional failures needed to open the circuit.
700    ///
701    /// Returns `0` when the circuit is already at or beyond threshold.
702    pub fn failures_until_open(&self) -> u32 {
703        let failures = self.backend.get_failures(&self.service);
704        self.threshold.saturating_sub(failures)
705    }
706
707    /// Return the configured recovery window duration.
708    ///
709    /// After the circuit has been `Open` for this long, it transitions to
710    /// `HalfOpen` and allows the next call through as a recovery probe.
711    pub fn recovery_window(&self) -> std::time::Duration {
712        self.recovery_window
713    }
714
715    /// Force the circuit back to `Closed` state, resetting all failure counters.
716    ///
717    /// Useful for tests and manual operator recovery.  Under normal operation
718    /// the circuit closes automatically after a successful `HalfOpen` probe.
719    pub fn reset(&self) {
720        self.backend.reset_failures(&self.service);
721        self.backend.clear_open_at(&self.service);
722        tracing::info!("circuit manually reset to Closed for {}", self.service);
723    }
724
725    /// Execute an async fallible operation under the circuit breaker using an
726    /// [`AsyncCircuitBreakerBackend`].
727    ///
728    /// This is the async counterpart of [`call`] and is intended for backends
729    /// that perform genuine async I/O (e.g. Redis, etcd, distributed stores).
730    /// The in-process default can be used via [`InMemoryCircuitBreakerBackend`]
731    /// which trivially implements `AsyncCircuitBreakerBackend`.
732    ///
733    /// [`call`]: CircuitBreaker::call
734    #[tracing::instrument(skip(self, backend, f))]
735    pub async fn async_call<T, E, F, Fut>(
736        &self,
737        backend: &dyn AsyncCircuitBreakerBackend,
738        f: F,
739    ) -> Result<T, AgentRuntimeError>
740    where
741        F: FnOnce() -> Fut,
742        Fut: std::future::Future<Output = Result<T, E>>,
743        E: std::fmt::Display,
744    {
745        // Determine effective state via async backend.
746        let effective_state = match backend.get_open_at(&self.service).await {
747            Some(opened_at) => {
748                if opened_at.elapsed() >= self.recovery_window {
749                    backend.clear_open_at(&self.service).await;
750                    tracing::info!("circuit async moved to half-open for {}", self.service);
751                    CircuitState::HalfOpen
752                } else {
753                    CircuitState::Open { opened_at }
754                }
755            }
756            None => {
757                let failures = backend.get_failures(&self.service).await;
758                if failures >= self.threshold {
759                    CircuitState::HalfOpen
760                } else {
761                    CircuitState::Closed
762                }
763            }
764        };
765
766        if let CircuitState::Open { .. } = effective_state {
767            return Err(AgentRuntimeError::CircuitOpen {
768                service: self.service.clone(),
769            });
770        }
771
772        match f().await {
773            Ok(val) => {
774                backend.reset_failures(&self.service).await;
775                backend.clear_open_at(&self.service).await;
776                Ok(val)
777            }
778            Err(e) => {
779                let failures = backend.increment_failures(&self.service).await;
780                if failures >= self.threshold {
781                    backend
782                        .set_open_at(&self.service, Instant::now())
783                        .await;
784                    tracing::info!("circuit async opened for {}", self.service);
785                }
786                Err(AgentRuntimeError::Orchestration(e.to_string()))
787            }
788        }
789    }
790}
791
792// ── AsyncCircuitBreakerBackend ────────────────────────────────────────────────
793
/// Async counterpart of [`CircuitBreakerBackend`] for distributed backends.
///
/// Implement this trait for backends that require genuine async I/O — e.g. Redis,
/// etcd, or any network-based store — so they don't need to embed their own
/// blocking runtime.
///
/// [`InMemoryCircuitBreakerBackend`] implements this trait with trivially-async
/// wrappers for use in testing and single-process deployments.
///
/// NOTE(review): the open-at timestamp is a `std::time::Instant`, which is
/// monotonic and process-local (it cannot be serialized or compared across
/// machines). A genuinely distributed backend will need to translate it
/// to/from a wall-clock or epoch representation — confirm before implementing
/// against a network store.
#[async_trait::async_trait]
pub trait AsyncCircuitBreakerBackend: Send + Sync {
    /// Increment the consecutive failure count and return the new count.
    async fn increment_failures(&self, service: &str) -> u32;
    /// Reset the consecutive failure count to zero.
    async fn reset_failures(&self, service: &str);
    /// Return the current consecutive failure count.
    async fn get_failures(&self, service: &str) -> u32;
    /// Record the instant at which the circuit was opened.
    async fn set_open_at(&self, service: &str, at: Instant);
    /// Clear the open-at timestamp.
    async fn clear_open_at(&self, service: &str);
    /// Return the instant at which the circuit was opened, or `None`.
    async fn get_open_at(&self, service: &str) -> Option<Instant>;
}
817
818#[async_trait::async_trait]
819impl AsyncCircuitBreakerBackend for InMemoryCircuitBreakerBackend {
820    async fn increment_failures(&self, service: &str) -> u32 {
821        <Self as CircuitBreakerBackend>::increment_failures(self, service)
822    }
823    async fn reset_failures(&self, service: &str) {
824        <Self as CircuitBreakerBackend>::reset_failures(self, service);
825    }
826    async fn get_failures(&self, service: &str) -> u32 {
827        <Self as CircuitBreakerBackend>::get_failures(self, service)
828    }
829    async fn set_open_at(&self, service: &str, at: Instant) {
830        <Self as CircuitBreakerBackend>::set_open_at(self, service, at);
831    }
832    async fn clear_open_at(&self, service: &str) {
833        <Self as CircuitBreakerBackend>::clear_open_at(self, service);
834    }
835    async fn get_open_at(&self, service: &str) -> Option<Instant> {
836        <Self as CircuitBreakerBackend>::get_open_at(self, service)
837    }
838}
839
840// ── DeduplicationResult ───────────────────────────────────────────────────────
841
/// Result of a deduplication check.
///
/// `Eq` is derived alongside `PartialEq` because every payload (`String`) is
/// itself `Eq`; deriving only `PartialEq` needlessly weakens the contract
/// (clippy: `derive_partial_eq_without_eq`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DeduplicationResult {
    /// This is a new, unseen request; the checker registers it as in-flight.
    New,
    /// A cached result exists for this key.
    Cached(String),
    /// A matching request is currently in-flight.
    InProgress,
}
852
/// Deduplicates requests by key within a TTL window.
///
/// ## Guarantees
/// - Deterministic: same key always maps to the same result
/// - Thread-safe via `Arc<Mutex<_>>`
/// - Entries expire after `ttl`
/// - Optional `max_entries` cap bounds memory independently of TTL
#[derive(Debug, Clone)]
pub struct Deduplicator {
    /// Time-to-live applied to both cached results and in-flight
    /// registrations (the `check*` paths expire both with this window).
    ttl: Duration,
    /// Optional hard cap on cached entries. When exceeded the oldest entry is
    /// evicted before inserting the new one, bounding memory growth even when
    /// all keys are unique and none have expired yet.
    max_entries: Option<usize>,
    /// Shared mutable state. The derived `Clone` is shallow (only the `Arc`
    /// is cloned), so all clones observe the same cache and in-flight set.
    inner: Arc<Mutex<DeduplicatorInner>>,
}
869
/// Mutable state behind the [`Deduplicator`] mutex.
#[derive(Debug)]
struct DeduplicatorInner {
    /// Completed results.
    cache: HashMap<String, (String, Instant)>, // key → (result, inserted_at)
    /// Requests currently being processed.
    in_flight: HashMap<String, Instant>,       // key → started_at
    /// Insertion-ordered keys for O(1) FIFO eviction when `max_entries` is set.
    /// May contain "ghost" keys whose cache entry has already expired; readers
    /// must tolerate a popped key that is no longer present in `cache`.
    cache_order: std::collections::VecDeque<String>,
    /// Tracks calls since the last full expiry scan. Full scans run every
    /// `EXPIRY_INTERVAL` calls; per-key inline checks maintain correctness
    /// between scans.
    call_count: u64,
}
881
882impl Deduplicator {
883    /// Create a new deduplicator with the given TTL.
884    pub fn new(ttl: Duration) -> Self {
885        Self {
886            ttl,
887            max_entries: None,
888            inner: Arc::new(Mutex::new(DeduplicatorInner {
889                cache: HashMap::new(),
890                in_flight: HashMap::new(),
891                cache_order: std::collections::VecDeque::new(),
892                call_count: 0,
893            })),
894        }
895    }
896
897    /// Set a hard cap on the number of cached (completed) entries.
898    ///
899    /// When the cache is full the oldest entry (by insertion time) is evicted
900    /// before the new entry is stored.  This bounds memory growth for workloads
901    /// where all request keys are unique and the TTL has not yet expired.
902    ///
903    /// # Returns
904    /// - `Err(AgentRuntimeError::Orchestration)` if `max == 0`
905    pub fn with_max_entries(mut self, max: usize) -> Result<Self, AgentRuntimeError> {
906        if max == 0 {
907            return Err(AgentRuntimeError::Orchestration(
908                "Deduplicator max_entries must be >= 1".into(),
909            ));
910        }
911        self.max_entries = Some(max);
912        Ok(self)
913    }
914
915    /// Check whether `key` is new, cached, or in-flight.
916    ///
917    /// Marks the key as in-flight if it is new.
918    pub fn check_and_register(&self, key: &str) -> Result<DeduplicationResult, AgentRuntimeError> {
919        let mut inner = timed_lock(&self.inner, "Deduplicator::check_and_register");
920
921        let now = Instant::now();
922
923        // Lazy expiry: full O(n) retain scan runs only every EXPIRY_INTERVAL
924        // calls, amortising the cost. Per-key inline checks below keep
925        // correctness between scans.
926        const EXPIRY_INTERVAL: u64 = 64;
927        inner.call_count = inner.call_count.wrapping_add(1);
928        if inner.call_count % EXPIRY_INTERVAL == 0 {
929            let ttl = self.ttl;
930            inner.cache.retain(|_, (_, ts)| now.duration_since(*ts) < ttl);
931            inner
932                .in_flight
933                .retain(|_, ts| now.duration_since(*ts) < ttl);
934        }
935
936        // Inline expiry check for this specific key.
937        match inner.cache.get(key) {
938            Some((result, ts)) if now.duration_since(*ts) < self.ttl => {
939                return Ok(DeduplicationResult::Cached(result.clone()));
940            }
941            Some(_) => {
942                inner.cache.remove(key); // entry is expired
943            }
944            None => {}
945        }
946        match inner.in_flight.get(key) {
947            Some(ts) if now.duration_since(*ts) < self.ttl => {
948                return Ok(DeduplicationResult::InProgress);
949            }
950            Some(_) => {
951                inner.in_flight.remove(key); // in-flight entry is expired
952            }
953            None => {}
954        }
955
956        inner.in_flight.insert(key.to_owned(), now);
957        Ok(DeduplicationResult::New)
958    }
959
960    /// Check deduplication state for a key with a per-call TTL override.
961    ///
962    /// Marks the key as in-flight if it is new. Ignores the stored TTL and uses
963    /// `ttl` instead for expiry checks.
964    pub fn check(&self, key: &str, ttl: std::time::Duration) -> Result<DeduplicationResult, AgentRuntimeError> {
965        let mut inner = timed_lock(&self.inner, "Deduplicator::check");
966        let now = Instant::now();
967
968        // Lazy expiry: full scan every EXPIRY_INTERVAL calls.
969        const EXPIRY_INTERVAL: u64 = 64;
970        inner.call_count = inner.call_count.wrapping_add(1);
971        if inner.call_count % EXPIRY_INTERVAL == 0 {
972            inner.cache.retain(|_, (_, ts)| now.duration_since(*ts) < ttl);
973            inner.in_flight.retain(|_, ts| now.duration_since(*ts) < ttl);
974        }
975
976        match inner.cache.get(key) {
977            Some((result, ts)) if now.duration_since(*ts) < ttl => {
978                return Ok(DeduplicationResult::Cached(result.clone()));
979            }
980            Some(_) => {
981                inner.cache.remove(key);
982            }
983            None => {}
984        }
985        match inner.in_flight.get(key) {
986            Some(ts) if now.duration_since(*ts) < ttl => {
987                return Ok(DeduplicationResult::InProgress);
988            }
989            Some(_) => {
990                inner.in_flight.remove(key);
991            }
992            None => {}
993        }
994
995        inner.in_flight.insert(key.to_owned(), now);
996        Ok(DeduplicationResult::New)
997    }
998
999    /// Check deduplication state for multiple keys at once.
1000    ///
1001    /// Returns results in the same order as `requests`.
1002    /// Each entry is `(key, ttl)` — same signature as `check`.
1003    ///
1004    /// Acquires the internal mutex **once** for the entire batch, avoiding the
1005    /// per-key lock overhead of calling `check` in a loop.
1006    pub fn dedup_many(
1007        &self,
1008        requests: &[(&str, std::time::Duration)],
1009    ) -> Result<Vec<DeduplicationResult>, AgentRuntimeError> {
1010        if requests.is_empty() {
1011            return Ok(Vec::new());
1012        }
1013        let mut inner = timed_lock(&self.inner, "Deduplicator::dedup_many");
1014        let now = std::time::Instant::now();
1015        let mut results = Vec::with_capacity(requests.len());
1016
1017        for &(key, ttl) in requests {
1018            // Expire stale entries using this request's TTL.
1019            inner.cache.retain(|_, (_, ts)| now.duration_since(*ts) < ttl);
1020            inner.in_flight.retain(|_, ts| now.duration_since(*ts) < ttl);
1021
1022            let result = if let Some((cached_result, _)) = inner.cache.get(key) {
1023                DeduplicationResult::Cached(cached_result.clone())
1024            } else if inner.in_flight.contains_key(key) {
1025                DeduplicationResult::InProgress
1026            } else {
1027                inner.in_flight.insert(key.to_owned(), now);
1028                DeduplicationResult::New
1029            };
1030            results.push(result);
1031        }
1032
1033        Ok(results)
1034    }
1035
1036    /// Complete a request: move from in-flight to cached with the given result.
1037    ///
1038    /// If `max_entries` is configured and the cache is full, the oldest cached
1039    /// entry (by insertion time) is evicted before the new one is stored.
1040    pub fn complete(&self, key: &str, result: impl Into<String>) -> Result<(), AgentRuntimeError> {
1041        let mut inner = timed_lock(&self.inner, "Deduplicator::complete");
1042        inner.in_flight.remove(key);
1043
1044        // Enforce max_entries cap: evict via insertion-ordered VecDeque (O(1) amortised).
1045        // Ghost entries (already expired by a prior `retain`) are skipped by looping.
1046        if let Some(max) = self.max_entries {
1047            while inner.cache.len() >= max {
1048                match inner.cache_order.pop_front() {
1049                    Some(oldest_key) => {
1050                        inner.cache.remove(&oldest_key);
1051                    }
1052                    None => break,
1053                }
1054            }
1055        }
1056
1057        let owned_key = key.to_owned();
1058        inner.cache_order.push_back(owned_key.clone());
1059        inner.cache.insert(owned_key, (result.into(), Instant::now()));
1060        Ok(())
1061    }
1062
1063    /// Remove a key from in-flight tracking without caching a result.
1064    ///
1065    /// Call this when an in-flight operation fails so that subsequent callers
1066    /// are not permanently blocked by a stuck `InProgress` entry for the full TTL.
1067    pub fn fail(&self, key: &str) -> Result<(), AgentRuntimeError> {
1068        let mut inner = timed_lock(&self.inner, "Deduplicator::fail");
1069        inner.in_flight.remove(key);
1070        Ok(())
1071    }
1072
1073    /// Return the number of keys currently in-flight (not yet completed or failed).
1074    pub fn in_flight_count(&self) -> Result<usize, AgentRuntimeError> {
1075        let inner = timed_lock(&self.inner, "Deduplicator::in_flight_count");
1076        Ok(inner.in_flight.len())
1077    }
1078
1079    /// Return a snapshot of all keys currently in-flight.
1080    pub fn in_flight_keys(&self) -> Result<Vec<String>, AgentRuntimeError> {
1081        let inner = timed_lock(&self.inner, "Deduplicator::in_flight_keys");
1082        Ok(inner.in_flight.keys().cloned().collect())
1083    }
1084
1085    /// Return the number of keys currently in the completed result cache.
1086    ///
1087    /// Note: expired entries are only removed lazily on the next `check*` call.
1088    pub fn cached_count(&self) -> Result<usize, AgentRuntimeError> {
1089        let inner = timed_lock(&self.inner, "Deduplicator::cached_count");
1090        Ok(inner.cache.len())
1091    }
1092
1093    /// Return a snapshot of all keys that have cached results.
1094    ///
1095    /// Expired entries are included (they are removed lazily).  Use
1096    /// [`purge_expired`] first for a clean list of live keys.
1097    ///
1098    /// [`purge_expired`]: Deduplicator::purge_expired
1099    pub fn cached_keys(&self) -> Result<Vec<String>, AgentRuntimeError> {
1100        let inner = timed_lock(&self.inner, "Deduplicator::cached_keys");
1101        Ok(inner.cache.keys().cloned().collect())
1102    }
1103
1104    /// Return the configured time-to-live for cached results.
1105    pub fn ttl(&self) -> Duration {
1106        self.ttl
1107    }
1108
1109    /// Return the configured maximum number of cached entries, if any.
1110    ///
1111    /// Returns `None` if no cap was set via [`with_max_entries`].
1112    ///
1113    /// [`with_max_entries`]: Deduplicator::with_max_entries
1114    pub fn max_entries(&self) -> Option<usize> {
1115        self.max_entries
1116    }
1117
1118    /// Return `true` if there are no in-flight requests.
1119    pub fn is_idle(&self) -> Result<bool, AgentRuntimeError> {
1120        let inner = timed_lock(&self.inner, "Deduplicator::is_idle");
1121        Ok(inner.in_flight.is_empty())
1122    }
1123
1124    /// Return the total number of items tracked by the deduplicator
1125    /// (in-flight + cached results, regardless of TTL expiry).
1126    pub fn total_count(&self) -> Result<usize, AgentRuntimeError> {
1127        let inner = timed_lock(&self.inner, "Deduplicator::total_count");
1128        Ok(inner.in_flight.len() + inner.cache.len())
1129    }
1130
1131    /// Return `true` if `key` is currently in-flight or has a cached result.
1132    ///
1133    /// Unlike [`check_and_register`] this is a read-only inspection — it does
1134    /// not register the key or consume a deduplication slot.
1135    ///
1136    /// [`check_and_register`]: Deduplicator::check_and_register
1137    pub fn contains(&self, key: &str) -> Result<bool, AgentRuntimeError> {
1138        let inner = timed_lock(&self.inner, "Deduplicator::contains");
1139        Ok(inner.in_flight.contains_key(key) || inner.cache.contains_key(key))
1140    }
1141
1142    /// Return the cached result for `key` if one exists and has not expired.
1143    ///
1144    /// Returns `None` when the key is not in the cache (either not yet
1145    /// completed or already expired).  Does not modify any state.
1146    pub fn get_result(&self, key: &str) -> Result<Option<String>, AgentRuntimeError> {
1147        let inner = timed_lock(&self.inner, "Deduplicator::get_result");
1148        let ttl = self.ttl;
1149        let now = std::time::Instant::now();
1150        Ok(inner.cache.get(key).and_then(|(result, inserted_at)| {
1151            if now.duration_since(*inserted_at) <= ttl {
1152                Some(result.clone())
1153            } else {
1154                None
1155            }
1156        }))
1157    }
1158
1159    /// Remove all in-flight entries and cached results.
1160    ///
1161    /// Useful for test teardown or hard resets.
1162    pub fn clear(&self) -> Result<(), AgentRuntimeError> {
1163        let mut inner = timed_lock(&self.inner, "Deduplicator::clear");
1164        inner.cache.clear();
1165        inner.in_flight.clear();
1166        inner.cache_order.clear();
1167        Ok(())
1168    }
1169
1170    /// Eagerly evict all cache entries whose TTL has elapsed.
1171    ///
1172    /// Under normal operation expired entries are removed lazily on the next
1173    /// `check*` call.  Call `purge_expired` for deterministic memory reclamation
1174    /// (e.g. before a `cached_count` snapshot or in a maintenance loop).
1175    ///
1176    /// Returns the number of entries that were removed.
1177    pub fn purge_expired(&self) -> Result<usize, AgentRuntimeError> {
1178        let mut inner = timed_lock(&self.inner, "Deduplicator::purge_expired");
1179        let ttl = self.ttl;
1180        let now = std::time::Instant::now();
1181        let before = inner.cache.len();
1182        inner.cache.retain(|_, (_, inserted_at)| {
1183            now.duration_since(*inserted_at) <= ttl
1184        });
1185        let removed = before - inner.cache.len();
1186        // Rebuild cache_order to drop ghost entries (keys purged from cache but
1187        // still referenced in the VecDeque).
1188        if removed > 0 {
1189            let live_keys: std::collections::HashSet<String> =
1190                inner.cache.keys().cloned().collect();
1191            inner.cache_order.retain(|k| live_keys.contains(k));
1192        }
1193        Ok(removed)
1194    }
1195
1196    /// Remove the oldest cached result entry (FIFO order).
1197    ///
1198    /// Returns `true` if an entry was removed, `false` if the cache was empty.
1199    pub fn evict_oldest(&self) -> Result<bool, AgentRuntimeError> {
1200        let mut inner = timed_lock(&self.inner, "Deduplicator::evict_oldest");
1201        while let Some(key) = inner.cache_order.pop_front() {
1202            if inner.cache.remove(&key).is_some() {
1203                return Ok(true);
1204            }
1205        }
1206        Ok(false)
1207    }
1208}
1209
1210// ── BackpressureGuard ─────────────────────────────────────────────────────────
1211
/// Tracks in-flight work count and enforces a capacity limit.
///
/// ## Guarantees
/// - Thread-safe via `Arc<Mutex<_>>`; clones share the same depth counter
/// - `try_acquire` is non-blocking
/// - `release` decrements the counter; no-op if counter is already 0
/// - Optional soft limit emits a warning when depth reaches the threshold
#[derive(Debug, Clone)]
pub struct BackpressureGuard {
    /// Hard limit on concurrent slots; `new` rejects 0.
    capacity: usize,
    /// Optional warning threshold; `with_soft_limit` requires it to be
    /// strictly less than `capacity`.
    soft_capacity: Option<usize>,
    /// Current in-flight depth, shared across clones via the `Arc`.
    inner: Arc<Mutex<usize>>,
}
1225
1226impl BackpressureGuard {
1227    /// Create a new guard with the given capacity.
1228    ///
1229    /// # Returns
1230    /// - `Ok(BackpressureGuard)` — on success
1231    /// - `Err(AgentRuntimeError::Orchestration)` — if `capacity == 0`
1232    pub fn new(capacity: usize) -> Result<Self, AgentRuntimeError> {
1233        if capacity == 0 {
1234            return Err(AgentRuntimeError::Orchestration(
1235                "BackpressureGuard capacity must be > 0".into(),
1236            ));
1237        }
1238        Ok(Self {
1239            capacity,
1240            soft_capacity: None,
1241            inner: Arc::new(Mutex::new(0)),
1242        })
1243    }
1244
1245    /// Set a soft capacity threshold. When depth reaches this level, a warning
1246    /// is logged but the request is still accepted (up to hard capacity).
1247    pub fn with_soft_limit(mut self, soft: usize) -> Result<Self, AgentRuntimeError> {
1248        if soft >= self.capacity {
1249            return Err(AgentRuntimeError::Orchestration(
1250                "soft_capacity must be less than hard capacity".into(),
1251            ));
1252        }
1253        self.soft_capacity = Some(soft);
1254        Ok(self)
1255    }
1256
1257    /// Try to acquire a slot.
1258    ///
1259    /// Emits a warning when the soft limit is reached (if configured), but
1260    /// still accepts the request until hard capacity is exceeded.
1261    ///
1262    /// # Returns
1263    /// - `Ok(())` — slot acquired
1264    /// - `Err(AgentRuntimeError::BackpressureShed)` — hard capacity exceeded
1265    pub fn try_acquire(&self) -> Result<(), AgentRuntimeError> {
1266        let mut depth = timed_lock(&self.inner, "BackpressureGuard::try_acquire");
1267        if *depth >= self.capacity {
1268            return Err(AgentRuntimeError::BackpressureShed {
1269                depth: *depth,
1270                capacity: self.capacity,
1271            });
1272        }
1273        *depth += 1;
1274        if let Some(soft) = self.soft_capacity {
1275            if *depth >= soft {
1276                tracing::warn!(
1277                    depth = *depth,
1278                    soft_capacity = soft,
1279                    hard_capacity = self.capacity,
1280                    "backpressure approaching hard limit"
1281                );
1282            }
1283        }
1284        Ok(())
1285    }
1286
1287    /// Release a previously acquired slot.
1288    pub fn release(&self) -> Result<(), AgentRuntimeError> {
1289        let mut depth = timed_lock(&self.inner, "BackpressureGuard::release");
1290        *depth = depth.saturating_sub(1);
1291        Ok(())
1292    }
1293
1294    /// Reset the current depth to zero.
1295    ///
1296    /// Useful in tests or after a controlled shutdown when all in-flight
1297    /// requests have been cancelled and the guard should start fresh.
1298    pub fn reset(&self) {
1299        let mut depth = timed_lock(&self.inner, "BackpressureGuard::reset");
1300        *depth = 0;
1301    }
1302
1303    /// Return `true` if the guard is at or over its hard capacity.
1304    pub fn is_full(&self) -> Result<bool, AgentRuntimeError> {
1305        Ok(self.depth()? >= self.capacity)
1306    }
1307
1308    /// Return `true` if no slots are currently in use.
1309    pub fn is_empty(&self) -> Result<bool, AgentRuntimeError> {
1310        Ok(self.depth()? == 0)
1311    }
1312
1313    /// Return the number of additional request slots available before the hard cap.
1314    pub fn available_capacity(&self) -> Result<usize, AgentRuntimeError> {
1315        Ok(self.capacity.saturating_sub(self.depth()?))
1316    }
1317
1318    /// Return the hard capacity (maximum concurrent slots) configured for this guard.
1319    pub fn hard_capacity(&self) -> usize {
1320        self.capacity
1321    }
1322
1323    /// Return the soft capacity limit if one was configured, or `None`.
1324    pub fn soft_limit(&self) -> Option<usize> {
1325        self.soft_capacity
1326    }
1327
1328    /// Return `true` if a soft capacity limit has been configured.
1329    ///
1330    /// Equivalent to `self.soft_limit().is_some()` but more readable at call
1331    /// sites that only need a boolean check.
1332    pub fn is_soft_limited(&self) -> bool {
1333        self.soft_capacity.is_some()
1334    }
1335
1336    /// Return the current depth.
1337    pub fn depth(&self) -> Result<usize, AgentRuntimeError> {
1338        let depth = timed_lock(&self.inner, "BackpressureGuard::depth");
1339        Ok(*depth)
1340    }
1341
1342    /// Return the current depth as a percentage of the hard capacity.
1343    ///
1344    /// Returns a value in `[0.0, 100.0]`.  When `depth > capacity` (which
1345    /// cannot happen in normal operation) the result is clamped to `100.0`.
1346    pub fn percent_full(&self) -> Result<f64, AgentRuntimeError> {
1347        let depth = self.depth()?;
1348        Ok((depth as f64 / self.capacity as f64 * 100.0).min(100.0))
1349    }
1350
1351    /// Return the ratio of current depth to soft capacity as a value in `[0.0, ∞)`.
1352    ///
1353    /// Returns `0.0` if no soft limit has been configured.
1354    /// Values above `1.0` mean the soft limit has been exceeded.
1355    pub fn soft_depth_ratio(&self) -> f32 {
1356        match self.soft_capacity {
1357            None => 0.0,
1358            Some(soft) => {
1359                let depth = timed_lock(&self.inner, "BackpressureGuard::soft_depth_ratio");
1360                *depth as f32 / soft as f32
1361            }
1362        }
1363    }
1364
1365    /// Return the fraction of the hard capacity currently in use: `depth / capacity`.
1366    ///
1367    /// Returns `0.0` when no slots are in use, `1.0` when fully saturated.
1368    pub fn utilization_ratio(&self) -> Result<f32, AgentRuntimeError> {
1369        if self.capacity == 0 {
1370            return Ok(0.0);
1371        }
1372        let depth = self.depth()?;
1373        Ok(depth as f32 / self.capacity as f32)
1374    }
1375
1376    /// Return the number of additional slots that can be acquired before hitting
1377    /// the hard capacity limit.
1378    ///
1379    /// Returns `0` when the guard is full.
1380    pub fn remaining_capacity(&self) -> Result<usize, AgentRuntimeError> {
1381        let depth = self.depth()?;
1382        Ok(self.capacity.saturating_sub(depth))
1383    }
1384
1385    /// Force the in-flight depth counter to zero.
1386    ///
1387    /// Useful for test teardown or hard resets where acquired slots will never
1388    /// be released normally (e.g., after a test panics before calling `release`).
1389    pub fn reset_depth(&self) -> Result<(), AgentRuntimeError> {
1390        let mut depth = timed_lock(&self.inner, "BackpressureGuard::reset_depth");
1391        *depth = 0;
1392        Ok(())
1393    }
1394
1395    /// Return the fraction of capacity that is still available, in `[0.0, 1.0]`.
1396    ///
1397    /// `1.0` means completely empty; `0.0` means at full capacity.
1398    pub fn headroom_ratio(&self) -> Result<f64, AgentRuntimeError> {
1399        Ok(self.available_capacity()? as f64 / self.capacity as f64)
1400    }
1401
1402    /// Return the number of currently held (acquired) slots.
1403    ///
1404    /// Equivalent to `capacity - available_capacity()`.
1405    pub fn acquired_count(&self) -> Result<usize, AgentRuntimeError> {
1406        Ok(self.capacity - self.available_capacity()?)
1407    }
1408
1409    /// Return `true` if the current depth exceeds the configured soft limit.
1410    ///
1411    /// Returns `false` if no soft limit is set.
1412    pub fn over_soft_limit(&self) -> Result<bool, AgentRuntimeError> {
1413        let soft = match self.soft_limit() {
1414            Some(s) => s,
1415            None => return Ok(false),
1416        };
1417        Ok(self.depth()? > soft)
1418    }
1419}
1420
1421// ── Pipeline ──────────────────────────────────────────────────────────────────
1422
/// Result of executing a pipeline, including per-stage timing.
#[derive(Debug)]
pub struct PipelineResult {
    /// Final output value after all stages.
    pub output: String,
    /// Per-stage timing: `(stage_name, duration_ms)` in execution order.
    // NOTE(review): unit presumed milliseconds from the field name; the
    // executing code is outside this chunk — confirm there before relying on it.
    pub stage_timings: Vec<(String, u64)>,
}
1431
/// A single named stage in the pipeline.
pub struct Stage {
    /// Human-readable name used in log output and error messages.
    pub name: String,
    /// The transform function; receives the current string and returns the transformed string.
    /// Boxed so heterogeneous closures can share one stage list; `Send + Sync`
    /// so pipelines can cross thread boundaries.
    pub handler: Box<dyn Fn(String) -> Result<String, AgentRuntimeError> + Send + Sync>,
}
1439
/// Manual `Debug`: the handler is an opaque boxed closure with no `Debug`
/// impl, so only the stage name is rendered.
impl std::fmt::Debug for Stage {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Stage").field("name", &self.name).finish()
    }
}
1445
/// Error handler callback type for pipeline stage failures:
/// `(stage_name, error_message) -> replacement_output`, where the returned
/// string is fed to the next stage (see `Pipeline::with_error_handler`).
type StageErrorHandler = Box<dyn Fn(&str, &str) -> String + Send + Sync>;
1448
/// A composable pipeline that passes a string through a sequence of named stages.
///
/// ## Guarantees
/// - Stages execute in insertion order
/// - First stage failure short-circuits remaining stages (unless an error handler is set)
/// - Non-panicking
pub struct Pipeline {
    /// Stages in execution order; names are not required to be unique.
    stages: Vec<Stage>,
    /// Optional recovery callback invoked on stage failure; see
    /// [`Pipeline::with_error_handler`].
    error_handler: Option<StageErrorHandler>,
}
1459
/// Manual `Debug`: the error handler is an opaque boxed closure, so only its
/// presence is reported alongside the stage list.
impl std::fmt::Debug for Pipeline {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Pipeline")
            .field("stages", &self.stages)
            .field("has_error_handler", &self.error_handler.is_some())
            .finish()
    }
}
1468
1469impl Pipeline {
1470    /// Create a new empty pipeline.
1471    pub fn new() -> Self {
1472        Self { stages: Vec::new(), error_handler: None }
1473    }
1474
1475    /// Attach a recovery callback for stage failures.
1476    ///
1477    /// When a stage fails, `handler(stage_name, error_message)` is called.
1478    /// The returned string becomes the input to the next stage.
1479    /// If no handler is set, stage failures propagate as errors.
1480    pub fn with_error_handler(
1481        mut self,
1482        handler: impl Fn(&str, &str) -> String + Send + Sync + 'static,
1483    ) -> Self {
1484        self.error_handler = Some(Box::new(handler));
1485        self
1486    }
1487
1488    /// Append a stage to the pipeline.
1489    pub fn add_stage(
1490        mut self,
1491        name: impl Into<String>,
1492        handler: impl Fn(String) -> Result<String, AgentRuntimeError> + Send + Sync + 'static,
1493    ) -> Self {
1494        self.stages.push(Stage {
1495            name: name.into(),
1496            handler: Box::new(handler),
1497        });
1498        self
1499    }
1500
1501    /// Insert a stage at the **front** of the pipeline (index 0).
1502    ///
1503    /// All existing stages are shifted to higher indices.  The pipeline's
1504    /// stage names remain unique only if the caller ensures uniqueness.
1505    pub fn prepend_stage(
1506        mut self,
1507        name: impl Into<String>,
1508        handler: impl Fn(String) -> Result<String, AgentRuntimeError> + Send + Sync + 'static,
1509    ) -> Self {
1510        self.stages.insert(0, Stage {
1511            name: name.into(),
1512            handler: Box::new(handler),
1513        });
1514        self
1515    }
1516
1517    /// Return `true` if the pipeline has no stages.
1518    pub fn is_empty(&self) -> bool {
1519        self.stages.is_empty()
1520    }
1521
1522    /// Return `true` if a stage error handler has been configured via
1523    /// [`with_error_handler`].
1524    ///
1525    /// [`with_error_handler`]: Pipeline::with_error_handler
1526    pub fn has_error_handler(&self) -> bool {
1527        self.error_handler.is_some()
1528    }
1529
    /// Return the number of stages in the pipeline.
    ///
    /// Counts every registered stage, including duplicates by name.
    pub fn stage_count(&self) -> usize {
        self.stages.len()
    }
1534
1535    /// Return `true` if a stage with the given name is registered.
1536    pub fn has_stage(&self, name: &str) -> bool {
1537        self.stages.iter().any(|s| s.name == name)
1538    }
1539
1540    /// Return the names of all stages in execution order.
1541    pub fn stage_names(&self) -> Vec<&str> {
1542        self.stages.iter().map(|s| s.name.as_str()).collect()
1543    }
1544
1545    /// Return the names of all stages as owned `String`s.
1546    ///
1547    /// Unlike [`stage_names`] this does not borrow `self`, making it easier to
1548    /// use the result after `self` is moved or mutated.
1549    ///
1550    /// [`stage_names`]: Pipeline::stage_names
1551    pub fn stage_names_owned(&self) -> Vec<String> {
1552        self.stages.iter().map(|s| s.name.clone()).collect()
1553    }
1554
1555    /// Return the name of the stage at zero-based `index`, or `None` if out of bounds.
1556    pub fn get_stage_name_at(&self, index: usize) -> Option<&str> {
1557        self.stages.get(index).map(|s| s.name.as_str())
1558    }
1559
1560    /// Return the zero-based index of the first stage with the given name.
1561    ///
1562    /// Returns `None` if no stage with that name exists.
1563    pub fn stage_index(&self, name: &str) -> Option<usize> {
1564        self.stages.iter().position(|s| s.name == name)
1565    }
1566
1567    /// Return the name of the first stage in the pipeline, or `None` if empty.
1568    pub fn first_stage_name(&self) -> Option<&str> {
1569        self.stages.first().map(|s| s.name.as_str())
1570    }
1571
1572    /// Return the name of the last stage in the pipeline, or `None` if empty.
1573    pub fn last_stage_name(&self) -> Option<&str> {
1574        self.stages.last().map(|s| s.name.as_str())
1575    }
1576
1577    /// Remove the first stage whose name equals `name`.
1578    ///
1579    /// Returns `true` if a stage was found and removed, `false` if no stage
1580    /// with that name was registered.
1581    pub fn remove_stage(&mut self, name: &str) -> bool {
1582        if let Some(pos) = self.stages.iter().position(|s| s.name == name) {
1583            self.stages.remove(pos);
1584            true
1585        } else {
1586            false
1587        }
1588    }
1589
1590    /// Rename the first stage whose name equals `old_name` to `new_name`.
1591    ///
1592    /// Returns `true` if a stage was found and renamed, `false` if no stage
1593    /// with `old_name` exists.
1594    pub fn rename_stage(&mut self, old_name: &str, new_name: impl Into<String>) -> bool {
1595        if let Some(stage) = self.stages.iter_mut().find(|s| s.name == old_name) {
1596            stage.name = new_name.into();
1597            true
1598        } else {
1599            false
1600        }
1601    }
1602
1603    /// Remove all stages from the pipeline.
1604    ///
1605    /// The error handler (if any) is preserved; only the stage list is cleared.
1606    pub fn clear(&mut self) {
1607        self.stages.clear();
1608    }
1609
1610    /// Return the number of stages whose name contains `keyword` (case-insensitive).
1611    pub fn count_stages_matching(&self, keyword: &str) -> usize {
1612        let kw = keyword.to_ascii_lowercase();
1613        self.stages
1614            .iter()
1615            .filter(|s| s.name.to_ascii_lowercase().contains(&kw))
1616            .count()
1617    }
1618
1619    /// Swap the positions of two stages by name.
1620    ///
1621    /// Returns `true` if both stages were found and swapped.  Returns `false`
1622    /// if either name is not present in the pipeline (no state change).
1623    pub fn swap_stages(&mut self, a: &str, b: &str) -> bool {
1624        let idx_a = self.stages.iter().position(|s| s.name == a);
1625        let idx_b = self.stages.iter().position(|s| s.name == b);
1626        match (idx_a, idx_b) {
1627            (Some(i), Some(j)) => {
1628                self.stages.swap(i, j);
1629                true
1630            }
1631            _ => false,
1632        }
1633    }
1634
1635    /// Execute the pipeline, passing `input` through each stage in order.
1636    #[tracing::instrument(skip(self))]
1637    pub fn run(&self, input: String) -> Result<String, AgentRuntimeError> {
1638        let mut current = input;
1639        for stage in &self.stages {
1640            tracing::debug!(stage = %stage.name, "running pipeline stage");
1641            match (stage.handler)(current) {
1642                Ok(out) => current = out,
1643                Err(e) => {
1644                    tracing::error!(stage = %stage.name, error = %e, "pipeline stage failed");
1645                    if let Some(ref handler) = self.error_handler {
1646                        current = handler(&stage.name, &e.to_string());
1647                    } else {
1648                        return Err(e);
1649                    }
1650                }
1651            }
1652        }
1653        Ok(current)
1654    }
1655
1656    /// Execute the pipeline with per-stage timing.
1657    ///
1658    /// Returns a [`PipelineResult`] whose `stage_timings` contains
1659    /// `(stage_name, duration_ms)` pairs in execution order.
1660    pub fn execute_timed(&self, input: String) -> Result<PipelineResult, AgentRuntimeError> {
1661        let mut current = input;
1662        let mut stage_timings = Vec::new();
1663        for stage in &self.stages {
1664            let start = std::time::Instant::now();
1665            tracing::debug!(stage = %stage.name, "running timed pipeline stage");
1666            match (stage.handler)(current) {
1667                Ok(out) => current = out,
1668                Err(e) => {
1669                    tracing::error!(stage = %stage.name, error = %e, "timed pipeline stage failed");
1670                    if let Some(ref handler) = self.error_handler {
1671                        current = handler(&stage.name, &e.to_string());
1672                    } else {
1673                        return Err(e);
1674                    }
1675                }
1676            }
1677            let duration_ms = start.elapsed().as_millis() as u64;
1678            stage_timings.push((stage.name.clone(), duration_ms));
1679        }
1680        Ok(PipelineResult {
1681            output: current,
1682            stage_timings,
1683        })
1684    }
1685
1686}
1687
1688impl Default for Pipeline {
1689    fn default() -> Self {
1690        Self::new()
1691    }
1692}
1693
1694// ── Tests ─────────────────────────────────────────────────────────────────────
1695
1696#[cfg(test)]
1697mod tests {
1698    use super::*;
1699
1700    // ── RetryPolicy ───────────────────────────────────────────────────────────
1701
    // Construction validation plus the delay-growth invariants promised in
    // the module docs: delays double per attempt and never exceed MAX_RETRY_DELAY.
    #[test]
    fn test_retry_policy_rejects_zero_attempts() {
        assert!(RetryPolicy::exponential(0, 100).is_err());
    }

    #[test]
    fn test_retry_policy_delay_attempt_1_equals_base() {
        // The first retry waits exactly the base delay.
        let p = RetryPolicy::exponential(3, 100).unwrap();
        assert_eq!(p.delay_for(1), Duration::from_millis(100));
    }

    #[test]
    fn test_retry_policy_delay_doubles_each_attempt() {
        let p = RetryPolicy::exponential(5, 100).unwrap();
        assert_eq!(p.delay_for(2), Duration::from_millis(200));
        assert_eq!(p.delay_for(3), Duration::from_millis(400));
        assert_eq!(p.delay_for(4), Duration::from_millis(800));
    }

    #[test]
    fn test_retry_policy_delay_capped_at_max() {
        // 10s base doubled 9 times would vastly exceed 60s — must clamp.
        let p = RetryPolicy::exponential(10, 10_000).unwrap();
        assert_eq!(p.delay_for(10), MAX_RETRY_DELAY);
    }

    #[test]
    fn test_retry_policy_delay_never_exceeds_max_for_any_attempt() {
        let p = RetryPolicy::exponential(10, 1000).unwrap();
        for attempt in 1..=10 {
            assert!(p.delay_for(attempt) <= MAX_RETRY_DELAY);
        }
    }
1734
1735    // ── Round 26: first_delay_ms ──────────────────────────────────────────────
1736
    // first_delay_ms is the delay before the very first retry; for both
    // policy modes it should coincide with the configured base delay.
    #[test]
    fn test_retry_policy_first_delay_ms_equals_base_delay() {
        let p = RetryPolicy::exponential(3, 200).unwrap();
        assert_eq!(p.first_delay_ms(), p.base_delay_ms());
    }

    #[test]
    fn test_retry_policy_first_delay_ms_constant_policy() {
        let p = RetryPolicy::constant(4, 150).unwrap();
        assert_eq!(p.first_delay_ms(), 150);
    }
1748
1749    // ── CircuitBreaker ────────────────────────────────────────────────────────
1750
    // State-machine tests: Closed → Open after `threshold` consecutive
    // failures, fast-fail while Open, HalfOpen probe after the recovery
    // window, and success resetting the failure count.
    #[test]
    fn test_circuit_breaker_rejects_zero_threshold() {
        assert!(CircuitBreaker::new("svc", 0, Duration::from_secs(1)).is_err());
    }

    #[test]
    fn test_circuit_breaker_starts_closed() {
        let cb = CircuitBreaker::new("svc", 3, Duration::from_secs(60)).unwrap();
        assert_eq!(cb.state().unwrap(), CircuitState::Closed);
    }

    #[test]
    fn test_circuit_breaker_success_keeps_closed() {
        let cb = CircuitBreaker::new("svc", 3, Duration::from_secs(60)).unwrap();
        let result: Result<i32, AgentRuntimeError> = cb.call(|| Ok::<i32, AgentRuntimeError>(42));
        assert!(result.is_ok());
        assert_eq!(cb.state().unwrap(), CircuitState::Closed);
    }

    #[test]
    fn test_circuit_breaker_opens_after_threshold_failures() {
        // Exactly `threshold` (3) consecutive failures must open the circuit.
        let cb = CircuitBreaker::new("svc", 3, Duration::from_secs(60)).unwrap();
        for _ in 0..3 {
            let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("oops".to_string()));
        }
        assert!(matches!(cb.state().unwrap(), CircuitState::Open { .. }));
    }

    #[test]
    fn test_circuit_breaker_open_fast_fails() {
        // While Open (recovery window far in the future) the closure must not
        // run; call() returns CircuitOpen immediately.
        let cb = CircuitBreaker::new("svc", 1, Duration::from_secs(3600)).unwrap();
        let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("fail".to_string()));
        let result: Result<(), AgentRuntimeError> = cb.call(|| Ok::<(), AgentRuntimeError>(()));
        assert!(matches!(result, Err(AgentRuntimeError::CircuitOpen { .. })));
    }

    #[test]
    fn test_circuit_breaker_success_resets_failure_count() {
        // "Consecutive" failures: one success clears the running count.
        let cb = CircuitBreaker::new("svc", 5, Duration::from_secs(60)).unwrap();
        let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("fail".to_string()));
        let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("fail".to_string()));
        let _: Result<i32, AgentRuntimeError> = cb.call(|| Ok::<i32, AgentRuntimeError>(1));
        assert_eq!(cb.failure_count().unwrap(), 0);
    }

    #[test]
    fn test_circuit_breaker_half_open_on_recovery() {
        // Use a zero recovery window to immediately go half-open
        let cb = CircuitBreaker::new("svc", 1, Duration::ZERO).unwrap();
        let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("fail".to_string()));
        // After recovery window, next call should probe (half-open → closed on success)
        let result: Result<i32, AgentRuntimeError> = cb.call(|| Ok::<i32, AgentRuntimeError>(99));
        assert_eq!(result.unwrap_or(0), 99);
        assert_eq!(cb.state().unwrap(), CircuitState::Closed);
    }

    #[test]
    fn test_circuit_breaker_with_custom_backend_uses_backend_state() {
        // Build a custom backend and share it between two circuit breakers
        // to verify that state is read from and written to the backend.
        let shared_backend: Arc<dyn CircuitBreakerBackend> =
            Arc::new(InMemoryCircuitBreakerBackend::new());

        let cb1 = CircuitBreaker::new("svc", 2, Duration::from_secs(60))
            .unwrap()
            .with_backend(Arc::clone(&shared_backend));

        let cb2 = CircuitBreaker::new("svc", 2, Duration::from_secs(60))
            .unwrap()
            .with_backend(Arc::clone(&shared_backend));

        // Trigger one failure via cb1
        let _: Result<(), AgentRuntimeError> = cb1.call(|| Err::<(), _>("fail".to_string()));

        // cb2 should observe the failure recorded by cb1
        assert_eq!(cb2.failure_count().unwrap(), 1);

        // Trigger the second failure to open the circuit via cb1
        let _: Result<(), AgentRuntimeError> = cb1.call(|| Err::<(), _>("fail again".to_string()));

        // cb2 should now see the circuit as open
        assert!(matches!(cb2.state().unwrap(), CircuitState::Open { .. }));
    }

    #[test]
    fn test_in_memory_backend_increments_and_resets() {
        // Exercise the backend trait directly: failure counter round-trip
        // plus the open_at timestamp set/get/clear cycle.
        use super::CircuitBreakerBackend as CB;
        let backend = InMemoryCircuitBreakerBackend::new();

        assert_eq!(CB::get_failures(&backend, "svc"), 0);

        let count = CB::increment_failures(&backend, "svc");
        assert_eq!(count, 1);

        let count = CB::increment_failures(&backend, "svc");
        assert_eq!(count, 2);

        CB::reset_failures(&backend, "svc");
        assert_eq!(CB::get_failures(&backend, "svc"), 0);

        // open_at round-trip
        assert!(CB::get_open_at(&backend, "svc").is_none());
        let now = Instant::now();
        CB::set_open_at(&backend, "svc", now);
        assert!(CB::get_open_at(&backend, "svc").is_some());
        CB::clear_open_at(&backend, "svc");
        assert!(CB::get_open_at(&backend, "svc").is_none());
    }
1859
1860    // ── Deduplicator ──────────────────────────────────────────────────────────
1861
    // Deduplicator lifecycle: New on first sight, InProgress while in flight,
    // Cached after complete(), independent keys, and TTL expiry back to New.
    #[test]
    fn test_deduplicator_new_key_is_new() {
        let d = Deduplicator::new(Duration::from_secs(60));
        let r = d.check_and_register("key-1").unwrap();
        assert_eq!(r, DeduplicationResult::New);
    }

    #[test]
    fn test_deduplicator_second_check_is_in_progress() {
        let d = Deduplicator::new(Duration::from_secs(60));
        d.check_and_register("key-1").unwrap();
        let r = d.check_and_register("key-1").unwrap();
        assert_eq!(r, DeduplicationResult::InProgress);
    }

    #[test]
    fn test_deduplicator_complete_makes_cached() {
        let d = Deduplicator::new(Duration::from_secs(60));
        d.check_and_register("key-1").unwrap();
        d.complete("key-1", "result-value").unwrap();
        let r = d.check_and_register("key-1").unwrap();
        assert_eq!(r, DeduplicationResult::Cached("result-value".into()));
    }

    #[test]
    fn test_deduplicator_different_keys_are_independent() {
        let d = Deduplicator::new(Duration::from_secs(60));
        d.check_and_register("key-a").unwrap();
        let r = d.check_and_register("key-b").unwrap();
        assert_eq!(r, DeduplicationResult::New);
    }

    #[test]
    fn test_deduplicator_expired_entry_is_new() {
        let d = Deduplicator::new(Duration::ZERO); // instant TTL
        d.check_and_register("key-1").unwrap();
        d.complete("key-1", "old").unwrap();
        // Immediately expired — should be New again
        let r = d.check_and_register("key-1").unwrap();
        assert_eq!(r, DeduplicationResult::New);
    }
1903
1904    // ── BackpressureGuard ─────────────────────────────────────────────────────
1905
    // BackpressureGuard basics: capacity validation, acquire/release depth
    // accounting, shedding at capacity, and release-on-empty being a no-op.
    #[test]
    fn test_backpressure_guard_rejects_zero_capacity() {
        assert!(BackpressureGuard::new(0).is_err());
    }

    #[test]
    fn test_backpressure_guard_acquire_within_capacity() {
        let g = BackpressureGuard::new(5).unwrap();
        assert!(g.try_acquire().is_ok());
        assert_eq!(g.depth().unwrap(), 1);
    }

    #[test]
    fn test_backpressure_guard_sheds_when_full() {
        // The third acquire against capacity 2 must be shed, not queued.
        let g = BackpressureGuard::new(2).unwrap();
        g.try_acquire().unwrap();
        g.try_acquire().unwrap();
        let result = g.try_acquire();
        assert!(matches!(
            result,
            Err(AgentRuntimeError::BackpressureShed { .. })
        ));
    }

    #[test]
    fn test_backpressure_guard_release_decrements_depth() {
        let g = BackpressureGuard::new(3).unwrap();
        g.try_acquire().unwrap();
        g.try_acquire().unwrap();
        g.release().unwrap();
        assert_eq!(g.depth().unwrap(), 1);
    }

    #[test]
    fn test_backpressure_guard_release_on_empty_is_noop() {
        let g = BackpressureGuard::new(3).unwrap();
        g.release().unwrap(); // Should not fail
        assert_eq!(g.depth().unwrap(), 0);
    }
1945
1946    // ── Pipeline ──────────────────────────────────────────────────────────────
1947
    // Pipeline execution: stage ordering, empty-pipeline identity,
    // short-circuit on failure (no error handler), and timed execution.
    #[test]
    fn test_pipeline_runs_stages_in_order() {
        let p = Pipeline::new()
            .add_stage("upper", |s| Ok(s.to_uppercase()))
            .add_stage("append", |s| Ok(format!("{s}!")));
        let result = p.run("hello".into()).unwrap();
        assert_eq!(result, "HELLO!");
    }

    #[test]
    fn test_pipeline_empty_pipeline_returns_input() {
        let p = Pipeline::new();
        assert_eq!(p.run("test".into()).unwrap(), "test");
    }

    #[test]
    fn test_pipeline_stage_failure_short_circuits() {
        // Without an error handler the second stage must never run.
        let p = Pipeline::new()
            .add_stage("fail", |_| {
                Err(AgentRuntimeError::Orchestration("boom".into()))
            })
            .add_stage("never", |s| Ok(s));
        assert!(p.run("input".into()).is_err());
    }

    #[test]
    fn test_pipeline_stage_count() {
        let p = Pipeline::new()
            .add_stage("s1", |s| Ok(s))
            .add_stage("s2", |s| Ok(s));
        assert_eq!(p.stage_count(), 2);
    }

    #[test]
    fn test_pipeline_execute_timed_captures_stage_durations() {
        // Timings come back as (name, ms) pairs in execution order.
        let p = Pipeline::new()
            .add_stage("s1", |s| Ok(format!("{s}1")))
            .add_stage("s2", |s| Ok(format!("{s}2")));
        let result = p.execute_timed("x".to_string()).unwrap();
        assert_eq!(result.output, "x12");
        assert_eq!(result.stage_timings.len(), 2);
        assert_eq!(result.stage_timings[0].0, "s1");
        assert_eq!(result.stage_timings[1].0, "s2");
    }
1992
1993    // ── Item 13: BackpressureGuard soft limit ──────────────────────────────────
1994
    // Soft-limit configuration: the soft limit must be strictly below the
    // hard capacity, and the hard capacity still sheds regardless.
    #[test]
    fn test_backpressure_soft_limit_rejects_invalid_config() {
        // soft >= capacity must be rejected
        let g = BackpressureGuard::new(5).unwrap();
        assert!(g.with_soft_limit(5).is_err());
        let g = BackpressureGuard::new(5).unwrap();
        assert!(g.with_soft_limit(6).is_err());
    }

    #[test]
    fn test_backpressure_soft_limit_accepts_requests_below_soft() {
        let g = BackpressureGuard::new(5)
            .unwrap()
            .with_soft_limit(2)
            .unwrap();
        // Both acquires below soft limit should succeed
        assert!(g.try_acquire().is_ok());
        assert!(g.try_acquire().is_ok());
        assert_eq!(g.depth().unwrap(), 2);
    }

    #[test]
    fn test_backpressure_with_soft_limit_still_sheds_at_hard_capacity() {
        let g = BackpressureGuard::new(3)
            .unwrap()
            .with_soft_limit(2)
            .unwrap();
        g.try_acquire().unwrap();
        g.try_acquire().unwrap();
        g.try_acquire().unwrap(); // reaches hard limit
        let result = g.try_acquire();
        assert!(matches!(
            result,
            Err(AgentRuntimeError::BackpressureShed { .. })
        ));
    }
2031
2032    // ── #4/#31 BackpressureGuard::hard_capacity ───────────────────────────────
2033
    // hard_capacity() must echo the capacity passed to new().
    #[test]
    fn test_backpressure_hard_capacity_matches_new() {
        let g = BackpressureGuard::new(7).unwrap();
        assert_eq!(g.hard_capacity(), 7);
    }
2039
2040    // ── #10 Pipeline::with_error_handler ──────────────────────────────────────
2041
    // With an error handler installed, a failing stage's output is replaced
    // by the handler's return value and later stages still run.
    #[test]
    fn test_pipeline_error_handler_recovers_from_stage_failure() {
        let p = Pipeline::new()
            .add_stage("fail_stage", |_| {
                Err(AgentRuntimeError::Orchestration("oops".into()))
            })
            .add_stage("append", |s| Ok(format!("{s}-recovered")))
            .with_error_handler(|stage_name, _err| format!("recovered_from_{stage_name}"));
        let result = p.run("input".to_string()).unwrap();
        assert_eq!(result, "recovered_from_fail_stage-recovered");
    }
2053
2054    // ── #11/#32 CircuitState PartialEq/Eq ────────────────────────────────────
2055
    // NOTE(review): two Open states built from *different* Instants compare
    // equal here, which implies PartialEq ignores `opened_at` — confirm that
    // variant-only equality is the intended contract.
    #[test]
    fn test_circuit_state_eq() {
        assert_eq!(CircuitState::Closed, CircuitState::Closed);
        assert_eq!(CircuitState::HalfOpen, CircuitState::HalfOpen);
        assert_eq!(
            CircuitState::Open { opened_at: std::time::Instant::now() },
            CircuitState::Open { opened_at: std::time::Instant::now() }
        );
        assert_ne!(CircuitState::Closed, CircuitState::HalfOpen);
        assert_ne!(CircuitState::Closed, CircuitState::Open { opened_at: std::time::Instant::now() });
    }
2067
2068    // ── #18 Deduplicator::dedup_many ──────────────────────────────────────────
2069
    // Batch registration: each distinct key in a dedup_many call is treated
    // independently and reported as New on first sight.
    #[test]
    fn test_dedup_many_independent_keys() {
        let d = Deduplicator::new(Duration::from_secs(60));
        let ttl = Duration::from_secs(60);
        let results = d.dedup_many(&[("key-a", ttl), ("key-b", ttl), ("key-c", ttl)]).unwrap();
        assert_eq!(results.len(), 3);
        assert!(results.iter().all(|r| matches!(r, DeduplicationResult::New)));
    }
2078
2079    // ── Task 11: Concurrent CircuitBreaker state transition tests ─────────────
2080
    // Concurrency: the breaker must open correctly under racing failures,
    // and a shared backend must keep per-service state independent.
    #[test]
    fn test_concurrent_circuit_breaker_opens_under_concurrent_failures() {
        use std::sync::Arc;
        use std::thread;

        let cb = Arc::new(
            CircuitBreaker::new("svc", 5, Duration::from_secs(60)).unwrap(),
        );
        let n_threads = 8;
        let failures_per_thread = 2;

        let mut handles = Vec::new();
        for _ in 0..n_threads {
            let cb = Arc::clone(&cb);
            handles.push(thread::spawn(move || {
                for _ in 0..failures_per_thread {
                    let _ = cb.call(|| Err::<(), &str>("fail"));
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }

        // After n_threads * failures_per_thread = 16 failures with threshold=5,
        // the circuit must be Open.
        let state = cb.state().unwrap();
        assert!(
            matches!(state, CircuitState::Open { .. }),
            "circuit should be open after many concurrent failures; got: {state:?}"
        );
    }

    #[test]
    fn test_per_service_tracking_is_independent() {
        let backend = Arc::new(InMemoryCircuitBreakerBackend::new());

        let cb_a = CircuitBreaker::new("service-a", 3, Duration::from_secs(60))
            .unwrap()
            .with_backend(Arc::clone(&backend) as Arc<dyn CircuitBreakerBackend>);
        let cb_b = CircuitBreaker::new("service-b", 3, Duration::from_secs(60))
            .unwrap()
            .with_backend(Arc::clone(&backend) as Arc<dyn CircuitBreakerBackend>);

        // Fail service-a 3 times → opens
        for _ in 0..3 {
            let _ = cb_a.call(|| Err::<(), &str>("fail"));
        }

        // service-b should still be Closed
        let state_b = cb_b.state().unwrap();
        assert_eq!(
            state_b,
            CircuitState::Closed,
            "service-b should be unaffected by service-a failures"
        );

        // service-a should be Open
        let state_a = cb_a.state().unwrap();
        assert!(
            matches!(state_a, CircuitState::Open { .. }),
            "service-a should be open"
        );
    }
2145
2146    // ── Item 14: timed_lock concurrency correctness ───────────────────────────
2147
    // Depth accounting must not lose increments under concurrent acquires
    // (guards the timed_lock-based critical section).
    #[test]
    fn test_backpressure_concurrent_acquires_are_consistent() {
        use std::sync::Arc;
        use std::thread;

        let g = Arc::new(BackpressureGuard::new(100).unwrap());
        let mut handles = Vec::new();

        for _ in 0..10 {
            let g_clone = Arc::clone(&g);
            handles.push(thread::spawn(move || {
                g_clone.try_acquire().ok();
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        // All 10 threads acquired a slot; depth must be exactly 10
        assert_eq!(g.depth().unwrap(), 10);
    }
2170
2171    // ── New API tests (Rounds 4-8) ────────────────────────────────────────────
2172
    // Accessor / builder coverage added in rounds 4-8: RetryPolicy modes and
    // builders, CircuitBreaker manual record/reset and state predicates,
    // Deduplicator maintenance, and BackpressureGuard/Pipeline introspection.
    #[test]
    fn test_retry_policy_constant_has_fixed_delay() {
        let p = RetryPolicy::constant(3, 100).unwrap();
        assert_eq!(p.delay_for(1), Duration::from_millis(100));
        assert_eq!(p.delay_for(2), Duration::from_millis(100));
        assert_eq!(p.delay_for(10), Duration::from_millis(100));
    }

    #[test]
    fn test_retry_policy_exponential_doubles() {
        let p = RetryPolicy::exponential(5, 10).unwrap();
        assert_eq!(p.delay_for(1), Duration::from_millis(10));
        assert_eq!(p.delay_for(2), Duration::from_millis(20));
        assert_eq!(p.delay_for(3), Duration::from_millis(40));
    }

    #[test]
    fn test_retry_policy_with_max_attempts() {
        // Builder returns an updated copy; zero attempts stays invalid.
        let p = RetryPolicy::constant(3, 50).unwrap();
        let p2 = p.with_max_attempts(7).unwrap();
        assert_eq!(p2.max_attempts, 7);
        assert!(RetryPolicy::constant(1, 50).unwrap().with_max_attempts(0).is_err());
    }

    #[test]
    fn test_circuit_breaker_reset_returns_to_closed() {
        let cb = CircuitBreaker::new("svc", 2, Duration::from_secs(60)).unwrap();
        cb.record_failure();
        cb.record_failure(); // should open
        assert_ne!(cb.state().unwrap(), CircuitState::Closed);
        cb.reset();
        assert_eq!(cb.state().unwrap(), CircuitState::Closed);
        assert_eq!(cb.failure_count().unwrap(), 0);
    }

    #[test]
    fn test_deduplicator_clear_resets_all_state() {
        // One key completed (cached), one still in flight; clear() wipes both.
        let d = Deduplicator::new(Duration::from_secs(60));
        d.check_and_register("k1").unwrap();
        d.check_and_register("k2").unwrap();
        d.complete("k1", "r1").unwrap();
        assert_eq!(d.in_flight_count().unwrap(), 1);
        assert_eq!(d.cached_count().unwrap(), 1);
        d.clear().unwrap();
        assert_eq!(d.in_flight_count().unwrap(), 0);
        assert_eq!(d.cached_count().unwrap(), 0);
    }

    #[test]
    fn test_deduplicator_purge_expired_removes_stale() {
        // Sleep past the 1ms TTL so the cached entry is definitely stale.
        let d = Deduplicator::new(Duration::from_millis(1));
        d.check_and_register("x").unwrap();
        d.complete("x", "result").unwrap();
        std::thread::sleep(Duration::from_millis(5));
        let removed = d.purge_expired().unwrap();
        assert_eq!(removed, 1);
        assert_eq!(d.cached_count().unwrap(), 0);
    }

    #[test]
    fn test_backpressure_utilization_ratio() {
        // 2 of 4 slots in use → 0.5 (compared with a float tolerance).
        let g = BackpressureGuard::new(4).unwrap();
        g.try_acquire().unwrap();
        g.try_acquire().unwrap();
        let ratio = g.utilization_ratio().unwrap();
        assert!((ratio - 0.5).abs() < 1e-5);
    }

    #[test]
    fn test_pipeline_stage_count_and_names() {
        let p = Pipeline::new()
            .add_stage("first", |s| Ok(s + "1"))
            .add_stage("second", |s| Ok(s + "2"));
        assert_eq!(p.stage_count(), 2);
        assert_eq!(p.stage_names(), vec!["first", "second"]);
    }

    #[test]
    fn test_pipeline_is_empty_true_for_new() {
        let p = Pipeline::new();
        assert!(p.is_empty());
    }

    #[test]
    fn test_pipeline_is_empty_false_after_add_stage() {
        let p = Pipeline::new().add_stage("s", |s: String| Ok(s));
        assert!(!p.is_empty());
    }

    #[test]
    fn test_circuit_breaker_service_name() {
        let cb = CircuitBreaker::new("my-service", 3, Duration::from_secs(1)).unwrap();
        assert_eq!(cb.service_name(), "my-service");
    }

    #[test]
    fn test_retry_policy_none_has_max_one_attempt() {
        let p = RetryPolicy::none();
        assert_eq!(p.max_attempts, 1);
        assert_eq!(p.delay_for(0), Duration::ZERO);
    }

    #[test]
    fn test_backpressure_is_full_false_when_empty() {
        let g = BackpressureGuard::new(5).unwrap();
        assert!(!g.is_full().unwrap());
    }

    #[test]
    fn test_backpressure_is_full_true_when_at_capacity() {
        let g = BackpressureGuard::new(2).unwrap();
        g.try_acquire().unwrap();
        g.try_acquire().unwrap();
        assert!(g.is_full().unwrap());
    }

    #[test]
    fn test_deduplicator_ttl_returns_configured_value() {
        let d = Deduplicator::new(Duration::from_secs(42));
        assert_eq!(d.ttl(), Duration::from_secs(42));
    }

    #[test]
    fn test_circuit_breaker_is_closed_initially() {
        // The three state predicates must be mutually exclusive.
        let cb = CircuitBreaker::new("svc", 3, Duration::from_secs(1)).unwrap();
        assert!(cb.is_closed());
        assert!(!cb.is_open());
        assert!(!cb.is_half_open());
    }

    #[test]
    fn test_circuit_breaker_is_open_after_threshold_failures() {
        let cb = CircuitBreaker::new("svc", 2, Duration::from_secs(60)).unwrap();
        cb.record_failure();
        cb.record_failure();
        assert!(cb.is_open());
        assert!(!cb.is_closed());
    }

    #[test]
    fn test_retry_policy_total_max_delay_constant() {
        // constant 100ms × 3 attempts = 300ms total
        let p = RetryPolicy::constant(3, 100).unwrap();
        assert_eq!(p.total_max_delay_ms(), 300);
    }

    #[test]
    fn test_retry_policy_total_max_delay_none_is_zero() {
        let p = RetryPolicy::none();
        assert_eq!(p.total_max_delay_ms(), 0);
    }

    #[test]
    fn test_retry_policy_is_none_true_for_none() {
        let p = RetryPolicy::none();
        assert!(p.is_none());
    }

    #[test]
    fn test_retry_policy_is_none_false_for_exponential() {
        let p = RetryPolicy::exponential(3, 10).unwrap();
        assert!(!p.is_none());
    }

    #[test]
    fn test_pipeline_has_error_handler_false_by_default() {
        let p = Pipeline::new().add_stage("s", |s: String| Ok(s));
        assert!(!p.has_error_handler());
    }

    #[test]
    fn test_pipeline_has_error_handler_true_after_set() {
        let p = Pipeline::new()
            .with_error_handler(|_stage, _err| "recovered".to_string());
        assert!(p.has_error_handler());
    }

    #[test]
    fn test_backpressure_reset_clears_depth() {
        let g = BackpressureGuard::new(5).unwrap();
        g.try_acquire().unwrap();
        g.try_acquire().unwrap();
        assert_eq!(g.depth().unwrap(), 2);
        g.reset();
        assert_eq!(g.depth().unwrap(), 0);
    }

    #[test]
    fn test_deduplicator_in_flight_keys_returns_started_keys() {
        // Sorted before comparing because key order is not guaranteed.
        let d = Deduplicator::new(Duration::from_secs(60));
        d.check("key-a", Duration::from_secs(60)).unwrap();
        d.check("key-b", Duration::from_secs(60)).unwrap();
        let mut keys = d.in_flight_keys().unwrap();
        keys.sort();
        assert_eq!(keys, vec!["key-a", "key-b"]);
    }
2369
2370    // ── Round 3: new methods ──────────────────────────────────────────────────
2371
    // Round 3 additions: RetryPolicy::with_base_delay_ms builder,
    // BackpressureGuard::reset_depth, and Pipeline::remove_stage/clear.
    #[test]
    fn test_retry_policy_with_base_delay_ms_changes_delay() {
        let p = RetryPolicy::exponential(3, 100)
            .unwrap()
            .with_base_delay_ms(200)
            .unwrap();
        assert_eq!(p.delay_for(1), Duration::from_millis(200));
    }

    #[test]
    fn test_retry_policy_with_base_delay_ms_rejects_zero() {
        let p = RetryPolicy::exponential(3, 100).unwrap();
        assert!(p.with_base_delay_ms(0).is_err());
    }

    #[test]
    fn test_backpressure_reset_depth_clears_counter() {
        let guard = BackpressureGuard::new(5).unwrap();
        guard.try_acquire().unwrap();
        guard.try_acquire().unwrap();
        assert_eq!(guard.depth().unwrap(), 2);
        guard.reset_depth().unwrap();
        assert_eq!(guard.depth().unwrap(), 0);
    }

    #[test]
    fn test_pipeline_remove_stage_returns_true_if_found() {
        let mut p = Pipeline::new()
            .add_stage("a", |s| Ok(s))
            .add_stage("b", |s| Ok(s));
        assert!(p.remove_stage("a"));
        assert_eq!(p.stage_count(), 1);
        assert_eq!(p.stage_names(), vec!["b"]);
    }

    #[test]
    fn test_pipeline_remove_stage_returns_false_if_missing() {
        // A miss must leave the existing stage list untouched.
        let mut p = Pipeline::new().add_stage("x", |s| Ok(s));
        assert!(!p.remove_stage("nope"));
        assert_eq!(p.stage_count(), 1);
    }

    #[test]
    fn test_pipeline_clear_removes_all_stages() {
        let mut p = Pipeline::new()
            .add_stage("a", |s| Ok(s))
            .add_stage("b", |s| Ok(s));
        p.clear();
        assert!(p.is_empty());
    }
2422
2423    // ── Round 4: CircuitBreaker accessors / Pipeline::get_stage_name_at ──────
2424
2425    #[test]
2426    fn test_circuit_breaker_threshold_accessor() {
2427        let cb = CircuitBreaker::new("svc", 5, Duration::from_secs(30)).unwrap();
2428        assert_eq!(cb.threshold(), 5);
2429    }
2430
2431    #[test]
2432    fn test_circuit_breaker_recovery_window_accessor() {
2433        let window = Duration::from_secs(45);
2434        let cb = CircuitBreaker::new("svc", 3, window).unwrap();
2435        assert_eq!(cb.recovery_window(), window);
2436    }
2437
2438    #[test]
2439    fn test_pipeline_get_stage_name_at_returns_correct_names() {
2440        let p = Pipeline::new()
2441            .add_stage("first", |s| Ok(s))
2442            .add_stage("second", |s| Ok(s));
2443        assert_eq!(p.get_stage_name_at(0), Some("first"));
2444        assert_eq!(p.get_stage_name_at(1), Some("second"));
2445        assert_eq!(p.get_stage_name_at(2), None);
2446    }
2447
2448    // ── Round 16: can_retry ───────────────────────────────────────────────────
2449
2450    #[test]
2451    fn test_retry_policy_can_retry_within_budget() {
2452        let p = RetryPolicy::exponential(3, 100).unwrap();
2453        assert!(p.can_retry(0));
2454        assert!(p.can_retry(1));
2455        assert!(p.can_retry(2));
2456    }
2457
2458    #[test]
2459    fn test_retry_policy_can_retry_false_when_exhausted() {
2460        let p = RetryPolicy::exponential(3, 100).unwrap();
2461        assert!(!p.can_retry(3));
2462        assert!(!p.can_retry(99));
2463    }
2464
2465    #[test]
2466    fn test_retry_policy_none_only_allows_first_attempt() {
2467        let p = RetryPolicy::none();
2468        assert!(p.can_retry(0));
2469        assert!(!p.can_retry(1));
2470    }
2471
2472    // ── Round 5: RetryPolicy::max_attempts / Pipeline::stage_names_owned ─────
2473
2474    #[test]
2475    fn test_retry_policy_max_attempts_accessor() {
2476        let p = RetryPolicy::exponential(7, 100).unwrap();
2477        assert_eq!(p.max_attempts(), 7);
2478    }
2479
2480    #[test]
2481    fn test_pipeline_stage_names_owned_returns_strings() {
2482        let p = Pipeline::new()
2483            .add_stage("alpha", |s| Ok(s))
2484            .add_stage("beta", |s| Ok(s));
2485        let owned = p.stage_names_owned();
2486        assert_eq!(owned, vec!["alpha".to_string(), "beta".to_string()]);
2487    }
2488
2489    #[test]
2490    fn test_pipeline_stage_names_owned_empty_when_no_stages() {
2491        let p = Pipeline::new();
2492        assert!(p.stage_names_owned().is_empty());
2493    }
2494
2495    // ── Round 17: attempts_remaining ─────────────────────────────────────────
2496
2497    #[test]
2498    fn test_attempts_remaining_full_at_zero() {
2499        let p = RetryPolicy::exponential(4, 100).unwrap();
2500        assert_eq!(p.attempts_remaining(0), 4);
2501    }
2502
2503    #[test]
2504    fn test_attempts_remaining_decrements_correctly() {
2505        let p = RetryPolicy::exponential(4, 100).unwrap();
2506        assert_eq!(p.attempts_remaining(2), 2);
2507        assert_eq!(p.attempts_remaining(4), 0);
2508    }
2509
2510    #[test]
2511    fn test_attempts_remaining_zero_when_exhausted() {
2512        let p = RetryPolicy::exponential(3, 100).unwrap();
2513        assert_eq!(p.attempts_remaining(10), 0);
2514    }
2515
2516    // ── Round 18: untested circuit-breaker, deduplicator, backpressure methods
2517
2518    #[test]
2519    fn test_retry_policy_max_attempts_getter() {
2520        let p = RetryPolicy::exponential(7, 50).unwrap();
2521        assert_eq!(p.max_attempts(), 7);
2522    }
2523
2524    #[test]
2525    fn test_circuit_breaker_failure_count_increments() {
2526        let cb = CircuitBreaker::new("svc2", 3, std::time::Duration::from_secs(60)).unwrap();
2527        cb.record_failure();
2528        cb.record_failure();
2529        assert_eq!(cb.failure_count().unwrap(), 2);
2530    }
2531
2532    #[test]
2533    fn test_circuit_breaker_record_success_resets_failures() {
2534        let cb = CircuitBreaker::new("svc3", 5, std::time::Duration::from_secs(60)).unwrap();
2535        cb.record_failure();
2536        cb.record_failure();
2537        cb.record_success();
2538        assert_eq!(cb.failure_count().unwrap(), 0);
2539        assert!(cb.is_closed());
2540    }
2541
2542    #[test]
2543    fn test_circuit_breaker_threshold_and_recovery_window() {
2544        let cb = CircuitBreaker::new("svc4", 3, std::time::Duration::from_secs(30)).unwrap();
2545        assert_eq!(cb.threshold(), 3);
2546        assert_eq!(cb.recovery_window(), std::time::Duration::from_secs(30));
2547    }
2548
2549    #[test]
2550    fn test_circuit_breaker_reset_clears_state() {
2551        let cb = CircuitBreaker::new("svc5", 2, std::time::Duration::from_secs(60)).unwrap();
2552        cb.record_failure();
2553        cb.record_failure(); // should open circuit
2554        assert!(cb.is_open());
2555        cb.reset();
2556        assert!(cb.is_closed());
2557        assert_eq!(cb.failure_count().unwrap(), 0);
2558    }
2559
    #[test]
    fn test_deduplicator_cached_count_after_complete() {
        let d = Deduplicator::new(Duration::from_secs(60));
        d.check("key1", Duration::from_secs(60)).unwrap();
        // complete() moves the in-flight key into the result cache.
        d.complete("key1", "result").unwrap();
        assert_eq!(d.cached_count().unwrap(), 1);
    }

    #[test]
    fn test_deduplicator_ttl_matches_configured() {
        // The ttl() accessor echoes the constructor argument verbatim.
        let d = Deduplicator::new(Duration::from_secs(42));
        assert_eq!(d.ttl(), Duration::from_secs(42));
    }

    #[test]
    fn test_deduplicator_purge_expired_removes_stale_entries() {
        let d = Deduplicator::new(Duration::ZERO); // instant TTL
        d.check("stale", Duration::ZERO).unwrap();
        d.complete("stale", "val").unwrap();
        // Sleep briefly to ensure the entry expires
        std::thread::sleep(std::time::Duration::from_millis(1));
        // purge_expired() reports how many entries it dropped; at least the
        // zero-TTL entry above must be gone (>= rather than == keeps the test
        // robust if other expired state is swept in the same pass).
        let removed = d.purge_expired().unwrap();
        assert!(removed >= 1);
    }
2584
2585    #[test]
2586    fn test_backpressure_remaining_capacity() {
2587        let g = BackpressureGuard::new(5).unwrap();
2588        g.try_acquire().unwrap();
2589        assert_eq!(g.remaining_capacity().unwrap(), 4);
2590    }
2591
2592    #[test]
2593    fn test_backpressure_soft_depth_ratio_without_soft_limit() {
2594        let g = BackpressureGuard::new(5).unwrap();
2595        assert_eq!(g.soft_depth_ratio(), 0.0);
2596    }
2597
2598    #[test]
2599    fn test_backpressure_soft_depth_ratio_with_soft_limit() {
2600        let g = BackpressureGuard::new(10).unwrap()
2601            .with_soft_limit(4).unwrap();
2602        g.try_acquire().unwrap();
2603        g.try_acquire().unwrap();
2604        let ratio = g.soft_depth_ratio();
2605        assert!((ratio - 0.5).abs() < 1e-6);
2606    }
2607
2608    // ── Round 7: delay_ms_for / soft_limit / has_stage ───────────────────────
2609
2610    #[test]
2611    fn test_retry_delay_ms_for_matches_delay_for() {
2612        let p = RetryPolicy::exponential(5, 100).unwrap();
2613        assert_eq!(p.delay_ms_for(1), p.delay_for(1).as_millis() as u64);
2614        assert_eq!(p.delay_ms_for(3), p.delay_for(3).as_millis() as u64);
2615    }
2616
2617    #[test]
2618    fn test_backpressure_soft_limit_returns_configured_value() {
2619        let g = BackpressureGuard::new(10).unwrap()
2620            .with_soft_limit(5).unwrap();
2621        assert_eq!(g.soft_limit(), Some(5));
2622    }
2623
2624    #[test]
2625    fn test_backpressure_soft_limit_none_when_not_set() {
2626        let g = BackpressureGuard::new(10).unwrap();
2627        assert_eq!(g.soft_limit(), None);
2628    }
2629
2630    #[test]
2631    fn test_pipeline_has_stage_returns_true_when_present() {
2632        let p = Pipeline::new().add_stage("step1", |s| Ok(s));
2633        assert!(p.has_stage("step1"));
2634        assert!(!p.has_stage("step2"));
2635    }
2636
2637    #[test]
2638    fn test_pipeline_has_stage_false_for_empty_pipeline() {
2639        let p = Pipeline::new();
2640        assert!(!p.has_stage("anything"));
2641    }
2642
2643    // ── Round 20: Deduplicator::max_entries / RetryPolicy::delay_for ─────────
2644
2645    #[test]
2646    fn test_deduplicator_max_entries_none_by_default() {
2647        let d = Deduplicator::new(Duration::from_secs(60));
2648        assert_eq!(d.max_entries(), None);
2649    }
2650
2651    #[test]
2652    fn test_deduplicator_max_entries_set_via_builder() {
2653        let d = Deduplicator::new(Duration::from_secs(60))
2654            .with_max_entries(50)
2655            .unwrap();
2656        assert_eq!(d.max_entries(), Some(50));
2657    }
2658
2659    #[test]
2660    fn test_retry_policy_delay_for_exponential_grows() {
2661        let p = RetryPolicy::exponential(5, 100).unwrap();
2662        // delay_for uses saturating_sub(1): attempt 1 => multiplier 1, attempt 2 => multiplier 2
2663        let d1 = p.delay_for(1);
2664        let d2 = p.delay_for(2);
2665        assert!(d2 > d1, "exponential delay should grow: attempt 2 > attempt 1");
2666    }
2667
2668    #[test]
2669    fn test_retry_policy_delay_for_constant_stays_same() {
2670        let p = RetryPolicy::constant(5, 200).unwrap();
2671        assert_eq!(p.delay_for(0), p.delay_for(1));
2672        assert_eq!(p.delay_for(1), p.delay_for(3));
2673    }
2674
2675    // ── Round 9: is_no_retry ──────────────────────────────────────────────────
2676
2677    #[test]
2678    fn test_is_no_retry_true_for_none_policy() {
2679        let p = RetryPolicy::none();
2680        assert!(p.is_no_retry());
2681    }
2682
2683    #[test]
2684    fn test_is_no_retry_false_for_exponential_policy() {
2685        let p = RetryPolicy::exponential(3, 50).unwrap();
2686        assert!(!p.is_no_retry());
2687    }
2688
2689    #[test]
2690    fn test_is_no_retry_false_for_constant_policy_with_multiple_attempts() {
2691        let p = RetryPolicy::constant(2, 100).unwrap();
2692        assert!(!p.is_no_retry());
2693    }
2694
2695    // ── Round 10: is_exponential ──────────────────────────────────────────────
2696
2697    #[test]
2698    fn test_is_exponential_true_for_exponential_policy() {
2699        let p = RetryPolicy::exponential(3, 50).unwrap();
2700        assert!(p.is_exponential());
2701    }
2702
2703    #[test]
2704    fn test_is_exponential_false_for_constant_policy() {
2705        let p = RetryPolicy::constant(3, 50).unwrap();
2706        assert!(!p.is_exponential());
2707    }
2708
2709    #[test]
2710    fn test_is_exponential_false_for_none_policy() {
2711        let p = RetryPolicy::none();
2712        assert!(!p.is_exponential());
2713    }
2714
2715    // ── Round 11: BackpressureGuard::is_soft_limited ──────────────────────────
2716
2717    #[test]
2718    fn test_is_soft_limited_false_without_soft_limit() {
2719        let g = BackpressureGuard::new(10).unwrap();
2720        assert!(!g.is_soft_limited());
2721    }
2722
2723    #[test]
2724    fn test_is_soft_limited_true_when_soft_limit_set() {
2725        let g = BackpressureGuard::new(10)
2726            .unwrap()
2727            .with_soft_limit(5)
2728            .unwrap();
2729        assert!(g.is_soft_limited());
2730    }
2731
2732    // ── Round 12: RetryPolicy::base_delay_ms, BackpressureGuard::percent_full ─
2733
2734    #[test]
2735    fn test_retry_policy_base_delay_ms_exponential() {
2736        let p = RetryPolicy::exponential(3, 250).unwrap();
2737        assert_eq!(p.base_delay_ms(), 250);
2738    }
2739
2740    #[test]
2741    fn test_retry_policy_base_delay_ms_constant() {
2742        let p = RetryPolicy::constant(5, 100).unwrap();
2743        assert_eq!(p.base_delay_ms(), 100);
2744    }
2745
2746    #[test]
2747    fn test_retry_policy_base_delay_ms_none_is_zero() {
2748        let p = RetryPolicy::none();
2749        assert_eq!(p.base_delay_ms(), 0);
2750    }
2751
2752    #[test]
2753    fn test_backpressure_percent_full_zero_when_empty() {
2754        let g = BackpressureGuard::new(100).unwrap();
2755        let pct = g.percent_full().unwrap();
2756        assert!((pct - 0.0).abs() < 1e-9);
2757    }
2758
2759    #[test]
2760    fn test_backpressure_percent_full_capped_at_100() {
2761        let g = BackpressureGuard::new(10).unwrap();
2762        // Fill all slots via try_acquire
2763        for _ in 0..10 {
2764            g.try_acquire().unwrap();
2765        }
2766        let pct = g.percent_full().unwrap();
2767        assert!((pct - 100.0).abs() < 1e-9);
2768    }
2769
2770    // ── Round 27: get_result, rename_stage, failure_rate ─────────────────────
2771
2772    #[test]
2773    fn test_deduplicator_get_result_returns_cached_value() {
2774        let d = Deduplicator::new(std::time::Duration::from_secs(60));
2775        d.check_and_register("req-1").unwrap();
2776        d.complete("req-1", "the answer").unwrap();
2777        let result = d.get_result("req-1").unwrap();
2778        assert_eq!(result, Some("the answer".to_string()));
2779    }
2780
2781    #[test]
2782    fn test_deduplicator_get_result_missing_key_returns_none() {
2783        let d = Deduplicator::new(std::time::Duration::from_secs(60));
2784        assert_eq!(d.get_result("ghost").unwrap(), None);
2785    }
2786
2787    #[test]
2788    fn test_pipeline_rename_stage_succeeds() {
2789        let mut p = Pipeline::new().add_stage("old-name", |s: String| Ok(s));
2790        let renamed = p.rename_stage("old-name", "new-name");
2791        assert!(renamed);
2792        assert!(p.has_stage("new-name"));
2793        assert!(!p.has_stage("old-name"));
2794    }
2795
2796    #[test]
2797    fn test_pipeline_rename_stage_missing_returns_false() {
2798        let mut p = Pipeline::new();
2799        assert!(!p.rename_stage("nonexistent", "anything"));
2800    }
2801
2802    #[test]
2803    fn test_circuit_breaker_failure_rate_zero_initially() {
2804        let cb = CircuitBreaker::new("svc", 5, std::time::Duration::from_secs(10)).unwrap();
2805        assert!((cb.failure_rate() - 0.0).abs() < 1e-9);
2806    }
2807
2808    #[test]
2809    fn test_circuit_breaker_failure_rate_increases_with_failures() {
2810        let cb = CircuitBreaker::new("svc-fr", 4, std::time::Duration::from_secs(10)).unwrap();
2811        cb.record_failure();
2812        cb.record_failure();
2813        // 2 failures / threshold 4 = 0.5
2814        assert!((cb.failure_rate() - 0.5).abs() < 1e-9);
2815    }
2816
2817    // ── Round 14: Pipeline::prepend_stage ────────────────────────────────────
2818
2819    #[test]
2820    fn test_prepend_stage_inserts_at_front() {
2821        let p = Pipeline::new()
2822            .add_stage("second", |s| Ok(s))
2823            .prepend_stage("first", |s| Ok(s));
2824        let names = p.stage_names_owned();
2825        assert_eq!(names[0], "first");
2826        assert_eq!(names[1], "second");
2827    }
2828
2829    #[test]
2830    fn test_prepend_stage_executes_before_existing_stages() {
2831        let p = Pipeline::new()
2832            .add_stage("append", |s| Ok(format!("{s}_appended")))
2833            .prepend_stage("prefix", |s| Ok(format!("pre_{s}")));
2834        let result = p.run("input".to_string()).unwrap();
2835        assert_eq!(result, "pre_input_appended");
2836    }
2837
2838    #[test]
2839    fn test_prepend_stage_on_empty_pipeline() {
2840        let p = Pipeline::new().prepend_stage("only", |s| Ok(s.to_uppercase()));
2841        let result = p.run("hello".to_string()).unwrap();
2842        assert_eq!(result, "HELLO");
2843    }
2844
2845    // ── Round 21: CircuitBreaker::is_at_threshold, BackpressureGuard::headroom_ratio ──
2846
2847    #[test]
2848    fn test_circuit_breaker_is_at_threshold_false_initially() {
2849        let cb = CircuitBreaker::new("svc", 3, std::time::Duration::from_secs(10)).unwrap();
2850        assert!(!cb.is_at_threshold());
2851    }
2852
2853    #[test]
2854    fn test_circuit_breaker_is_at_threshold_true_when_failures_reach_threshold() {
2855        let cb = CircuitBreaker::new("svc-t", 2, std::time::Duration::from_secs(10)).unwrap();
2856        cb.record_failure();
2857        assert!(!cb.is_at_threshold());
2858        cb.record_failure();
2859        assert!(cb.is_at_threshold());
2860    }
2861
2862    #[test]
2863    fn test_backpressure_headroom_ratio_one_when_empty() {
2864        let g = BackpressureGuard::new(10).unwrap();
2865        let ratio = g.headroom_ratio().unwrap();
2866        assert!((ratio - 1.0).abs() < 1e-9);
2867    }
2868
2869    #[test]
2870    fn test_backpressure_headroom_ratio_decreases_on_acquire() {
2871        let g = BackpressureGuard::new(4).unwrap();
2872        g.try_acquire().unwrap(); // 1/4 used → headroom = 3/4
2873        let ratio = g.headroom_ratio().unwrap();
2874        assert!((ratio - 0.75).abs() < 1e-9);
2875    }
2876
2877    // ── Round 17: Pipeline first/last/stage_index, BackpressureGuard is_empty/available ──
2878
2879    #[test]
2880    fn test_pipeline_first_stage_name_returns_first() {
2881        let p = Pipeline::new()
2882            .add_stage("alpha", |s| Ok(s))
2883            .add_stage("beta", |s| Ok(s));
2884        assert_eq!(p.first_stage_name(), Some("alpha"));
2885    }
2886
2887    #[test]
2888    fn test_pipeline_first_stage_name_none_when_empty() {
2889        let p = Pipeline::new();
2890        assert!(p.first_stage_name().is_none());
2891    }
2892
2893    #[test]
2894    fn test_pipeline_last_stage_name_returns_last() {
2895        let p = Pipeline::new()
2896            .add_stage("alpha", |s| Ok(s))
2897            .add_stage("omega", |s| Ok(s));
2898        assert_eq!(p.last_stage_name(), Some("omega"));
2899    }
2900
2901    #[test]
2902    fn test_pipeline_stage_index_returns_correct_position() {
2903        let p = Pipeline::new()
2904            .add_stage("first", |s| Ok(s))
2905            .add_stage("second", |s| Ok(s))
2906            .add_stage("third", |s| Ok(s));
2907        assert_eq!(p.stage_index("first"), Some(0));
2908        assert_eq!(p.stage_index("second"), Some(1));
2909        assert_eq!(p.stage_index("third"), Some(2));
2910        assert_eq!(p.stage_index("missing"), None);
2911    }
2912
2913    #[test]
2914    fn test_backpressure_is_empty_true_when_no_slots_acquired() {
2915        let g = BackpressureGuard::new(10).unwrap();
2916        assert!(g.is_empty().unwrap());
2917    }
2918
2919    #[test]
2920    fn test_backpressure_is_empty_false_after_acquire() {
2921        let g = BackpressureGuard::new(10).unwrap();
2922        g.try_acquire().unwrap();
2923        assert!(!g.is_empty().unwrap());
2924    }
2925
2926    #[test]
2927    fn test_backpressure_available_capacity_decrements_on_acquire() {
2928        let g = BackpressureGuard::new(5).unwrap();
2929        assert_eq!(g.available_capacity().unwrap(), 5);
2930        g.try_acquire().unwrap();
2931        assert_eq!(g.available_capacity().unwrap(), 4);
2932    }
2933
2934    // ── Round 16: Deduplicator::evict_oldest ─────────────────────────────────
2935
2936    #[test]
2937    fn test_evict_oldest_removes_first_cached_entry() {
2938        let d = Deduplicator::new(std::time::Duration::from_secs(60));
2939        // Register and complete two entries to put them in cache
2940        d.check_and_register("alpha").unwrap();
2941        d.check_and_register("beta").unwrap();
2942        d.complete("alpha", "result_a").unwrap();
2943        d.complete("beta", "result_b").unwrap();
2944        // Evict the oldest (alpha)
2945        let removed = d.evict_oldest().unwrap();
2946        assert!(removed);
2947        assert!(d.get_result("alpha").unwrap().is_none());
2948        assert!(d.get_result("beta").unwrap().is_some());
2949    }
2950
2951    #[test]
2952    fn test_evict_oldest_returns_false_when_empty() {
2953        let d = Deduplicator::new(std::time::Duration::from_secs(60));
2954        assert!(!d.evict_oldest().unwrap());
2955    }
2956
2957    // ── Round 17: CircuitBreaker::is_at_threshold three-failure variant ──────
2958
2959    #[test]
2960    fn test_circuit_breaker_is_at_threshold_true_after_three_failures() {
2961        let cb = CircuitBreaker::new("svc-3", 3, std::time::Duration::from_secs(60)).unwrap();
2962        cb.record_failure();
2963        cb.record_failure();
2964        cb.record_failure();
2965        assert!(cb.is_at_threshold());
2966    }
2967
2968    // ── Round 22: CircuitBreaker::failures_until_open ─────────────────────────
2969
2970    #[test]
2971    fn test_failures_until_open_equals_threshold_initially() {
2972        let cb = CircuitBreaker::new("svc-fuo", 5, std::time::Duration::from_secs(60)).unwrap();
2973        assert_eq!(cb.failures_until_open(), 5);
2974    }
2975
2976    #[test]
2977    fn test_failures_until_open_decrements_with_each_failure() {
2978        let cb = CircuitBreaker::new("svc-fuo2", 4, std::time::Duration::from_secs(60)).unwrap();
2979        cb.record_failure();
2980        assert_eq!(cb.failures_until_open(), 3);
2981        cb.record_failure();
2982        assert_eq!(cb.failures_until_open(), 2);
2983    }
2984
2985    #[test]
2986    fn test_failures_until_open_zero_when_at_threshold() {
2987        let cb = CircuitBreaker::new("svc-fuo3", 2, std::time::Duration::from_secs(60)).unwrap();
2988        cb.record_failure();
2989        cb.record_failure();
2990        assert_eq!(cb.failures_until_open(), 0);
2991    }
2992
2993    // ── Round 29: Deduplicator::cached_keys ──────────────────────────────────
2994
2995    #[test]
2996    fn test_deduplicator_cached_keys_empty_initially() {
2997        let d = Deduplicator::new(Duration::from_secs(60));
2998        assert!(d.cached_keys().unwrap().is_empty());
2999    }
3000
3001    #[test]
3002    fn test_deduplicator_cached_keys_contains_completed_key() {
3003        let d = Deduplicator::new(Duration::from_secs(60));
3004        d.check_and_register("ck-key").unwrap();
3005        d.complete("ck-key", "result").unwrap();
3006        let keys = d.cached_keys().unwrap();
3007        assert!(keys.contains(&"ck-key".to_string()));
3008    }
3009
3010    #[test]
3011    fn test_deduplicator_cached_keys_excludes_in_flight() {
3012        let d = Deduplicator::new(Duration::from_secs(60));
3013        d.check_and_register("pending-key").unwrap();
3014        // In-flight keys live in a separate map, not in cache
3015        assert!(!d.cached_keys().unwrap().contains(&"pending-key".to_string()));
3016    }
3017
3018    #[test]
3019    fn test_deduplicator_cached_keys_multiple_entries() {
3020        let d = Deduplicator::new(Duration::from_secs(60));
3021        for k in ["alpha", "beta", "gamma"] {
3022            d.check_and_register(k).unwrap();
3023            d.complete(k, "v").unwrap();
3024        }
3025        let keys = d.cached_keys().unwrap();
3026        assert_eq!(keys.len(), 3);
3027    }
3028
3029    // ── Round 30: RetryPolicy::is_constant, total_max_delay_ms ───────────────
3030
3031    #[test]
3032    fn test_retry_policy_is_constant_true_for_constant() {
3033        let p = RetryPolicy::constant(3, 100).unwrap();
3034        assert!(p.is_constant());
3035        assert!(!p.is_exponential());
3036    }
3037
3038    #[test]
3039    fn test_retry_policy_is_constant_false_for_exponential() {
3040        let p = RetryPolicy::exponential(3, 100).unwrap();
3041        assert!(!p.is_constant());
3042    }
3043
3044    #[test]
3045    fn test_retry_policy_total_max_delay_ms_constant() {
3046        // constant(3, 100) → delays [100, 100, 100] = 300
3047        let p = RetryPolicy::constant(3, 100).unwrap();
3048        assert_eq!(p.total_max_delay_ms(), 300);
3049    }
3050
3051    #[test]
3052    fn test_retry_policy_total_max_delay_ms_exponential() {
3053        // exponential(3, 100) → delays [100, 200, 400] (capped at MAX)
3054        let p = RetryPolicy::exponential(3, 100).unwrap();
3055        let total = p.total_max_delay_ms();
3056        assert!(total >= 300); // at minimum 100+100+100
3057    }
3058
    // ── Round 30: CircuitBreaker::is_half_open, is_healthy ───────────────────

    #[test]
    fn test_circuit_breaker_is_healthy_true_when_closed() {
        // A fresh breaker starts closed, which counts as healthy.
        let cb = CircuitBreaker::new("svc-ih1", 3, Duration::from_secs(60)).unwrap();
        assert!(cb.is_healthy());
    }

    #[test]
    fn test_circuit_breaker_is_healthy_false_when_open() {
        // Threshold of 1: a single failed call opens the circuit.
        let cb = CircuitBreaker::new("svc-ih2", 1, Duration::from_secs(60)).unwrap();
        let _: Result<(), _> = cb.call(|| Err::<(), _>("fail".to_string()));
        assert!(!cb.is_healthy());
    }

    #[test]
    fn test_circuit_breaker_is_half_open_after_zero_recovery() {
        let cb = CircuitBreaker::new("svc-ho1", 1, Duration::ZERO).unwrap();
        let _: Result<(), _> = cb.call(|| Err::<(), _>("fail".to_string()));
        // With zero recovery window the circuit immediately enters HalfOpen
        // NOTE(review): the disjunction keeps this test robust because the
        // transition is time-based and may already have advanced past HalfOpen.
        assert!(cb.is_half_open() || cb.is_healthy()); // HalfOpen or recovered
    }
3081
3082    // ── Round 30: Deduplicator::is_idle ──────────────────────────────────────
3083
3084    #[test]
3085    fn test_deduplicator_is_idle_true_when_empty() {
3086        let d = Deduplicator::new(Duration::from_secs(60));
3087        assert!(d.is_idle().unwrap());
3088    }
3089
3090    #[test]
3091    fn test_deduplicator_is_idle_false_when_in_flight() {
3092        let d = Deduplicator::new(Duration::from_secs(60));
3093        d.check_and_register("req-x").unwrap();
3094        assert!(!d.is_idle().unwrap());
3095    }
3096
3097    #[test]
3098    fn test_deduplicator_is_idle_true_after_complete() {
3099        let d = Deduplicator::new(Duration::from_secs(60));
3100        d.check_and_register("req-y").unwrap();
3101        d.complete("req-y", "done").unwrap();
3102        assert!(d.is_idle().unwrap());
3103    }
3104
3105    // ── Round 26: in_flight_count ─────────────────────────────────────────────
3106
3107    #[test]
3108    fn test_deduplicator_in_flight_count_zero_initially() {
3109        let d = Deduplicator::new(Duration::from_secs(60));
3110        assert_eq!(d.in_flight_count().unwrap(), 0);
3111    }
3112
3113    #[test]
3114    fn test_deduplicator_in_flight_count_increments_on_register() {
3115        let d = Deduplicator::new(Duration::from_secs(60));
3116        d.check_and_register("k1").unwrap();
3117        d.check_and_register("k2").unwrap();
3118        assert_eq!(d.in_flight_count().unwrap(), 2);
3119    }
3120
3121    #[test]
3122    fn test_deduplicator_in_flight_count_decrements_after_complete() {
3123        let d = Deduplicator::new(Duration::from_secs(60));
3124        d.check_and_register("k1").unwrap();
3125        d.complete("k1", "result").unwrap();
3126        assert_eq!(d.in_flight_count().unwrap(), 0);
3127    }
3128
3129    // ── Round 27: total_count, acquired_count, swap_stages, will_retry_at_all
3130
3131    #[test]
3132    fn test_deduplicator_total_count_sums_in_flight_and_cached() {
3133        let d = Deduplicator::new(Duration::from_secs(60));
3134        d.check_and_register("k1").unwrap(); // in-flight
3135        d.check_and_register("k2").unwrap(); // in-flight
3136        d.complete("k1", "done").unwrap();   // moves to cache
3137        // 1 in-flight + 1 cached = 2
3138        assert_eq!(d.total_count().unwrap(), 2);
3139    }
3140
3141    #[test]
3142    fn test_deduplicator_total_count_zero_when_empty() {
3143        let d = Deduplicator::new(Duration::from_secs(60));
3144        assert_eq!(d.total_count().unwrap(), 0);
3145    }
3146
3147    #[test]
3148    fn test_backpressure_acquired_count_zero_initially() {
3149        let g = BackpressureGuard::new(5).unwrap();
3150        assert_eq!(g.acquired_count().unwrap(), 0);
3151    }
3152
3153    #[test]
3154    fn test_backpressure_acquired_count_increments_on_acquire() {
3155        let g = BackpressureGuard::new(5).unwrap();
3156        g.try_acquire().unwrap();
3157        g.try_acquire().unwrap();
3158        assert_eq!(g.acquired_count().unwrap(), 2);
3159    }
3160
3161    #[test]
3162    fn test_pipeline_swap_stages_swaps_positions() {
3163        let mut p = Pipeline::new()
3164            .add_stage("a", |s| Ok(s + "A"))
3165            .add_stage("b", |s| Ok(s + "B"));
3166        let swapped = p.swap_stages("a", "b");
3167        assert!(swapped);
3168        assert_eq!(p.first_stage_name().unwrap(), "b");
3169        assert_eq!(p.last_stage_name().unwrap(), "a");
3170    }
3171
3172    #[test]
3173    fn test_pipeline_swap_stages_returns_false_for_unknown_stage() {
3174        let mut p = Pipeline::new().add_stage("a", |s| Ok(s));
3175        assert!(!p.swap_stages("a", "missing"));
3176    }
3177
3178    #[test]
3179    fn test_retry_policy_will_retry_at_all_false_for_none() {
3180        let p = RetryPolicy::none();
3181        assert!(!p.will_retry_at_all());
3182    }
3183
3184    #[test]
3185    fn test_retry_policy_will_retry_at_all_true_for_exponential() {
3186        let p = RetryPolicy::exponential(3, 100).unwrap();
3187        assert!(p.will_retry_at_all());
3188    }
3189
3190    // ── Round 31: Deduplicator::fail ─────────────────────────────────────────
3191
3192    #[test]
3193    fn test_deduplicator_fail_removes_in_flight_key() {
3194        let d = Deduplicator::new(Duration::from_secs(60));
3195        d.check_and_register("failing-req").unwrap();
3196        assert!(!d.is_idle().unwrap());
3197        d.fail("failing-req").unwrap();
3198        assert!(d.is_idle().unwrap());
3199    }
3200
3201    #[test]
3202    fn test_deduplicator_fail_on_unknown_key_is_noop() {
3203        let d = Deduplicator::new(Duration::from_secs(60));
3204        assert!(d.fail("nonexistent").is_ok());
3205    }
3206
3207    #[test]
3208    fn test_deduplicator_fail_allows_reregistration() {
3209        let d = Deduplicator::new(Duration::from_secs(60));
3210        d.check_and_register("retry-key").unwrap();
3211        d.fail("retry-key").unwrap();
3212        let result = d.check_and_register("retry-key").unwrap();
3213        assert_eq!(result, DeduplicationResult::New);
3214    }
3215
3216    // ── Round 27: max_total_delay_ms ──────────────────────────────────────────
3217
3218    #[test]
3219    fn test_retry_policy_max_total_delay_ms_constant_policy() {
3220        let p = RetryPolicy::constant(3, 100).unwrap();
3221        // 3 attempts × 100ms each = 300ms
3222        assert_eq!(p.max_total_delay_ms(), 300);
3223    }
3224
3225    #[test]
3226    fn test_retry_policy_max_total_delay_ms_single_attempt() {
3227        let p = RetryPolicy::constant(1, 50).unwrap();
3228        assert_eq!(p.max_total_delay_ms(), 50);
3229    }
3230
3231    // ── Round 28: is_last_attempt ─────────────────────────────────────────────
3232
3233    #[test]
3234    fn test_retry_policy_is_last_attempt_true_at_max() {
3235        let p = RetryPolicy::exponential(3, 100).unwrap();
3236        assert!(p.is_last_attempt(3));
3237    }
3238
3239    #[test]
3240    fn test_retry_policy_is_last_attempt_false_before_max() {
3241        let p = RetryPolicy::exponential(3, 100).unwrap();
3242        assert!(!p.is_last_attempt(2));
3243    }
3244
3245    #[test]
3246    fn test_retry_policy_is_last_attempt_true_beyond_max() {
3247        let p = RetryPolicy::exponential(3, 100).unwrap();
3248        assert!(p.is_last_attempt(4));
3249    }
3250
3251    // ── Round 29: delay_sum_ms ────────────────────────────────────────────────
3252
3253    #[test]
3254    fn test_retry_policy_delay_sum_ms_constant_two_attempts() {
3255        let p = RetryPolicy::constant(5, 100).unwrap();
3256        assert_eq!(p.delay_sum_ms(2), 200);
3257    }
3258
3259    #[test]
3260    fn test_retry_policy_delay_sum_ms_capped_at_max_attempts() {
3261        let p = RetryPolicy::constant(2, 50).unwrap();
3262        // n=10 but max_attempts=2 → only 2 delays summed = 100
3263        assert_eq!(p.delay_sum_ms(10), 100);
3264    }
3265
3266    // ── Round 30: avg_delay_ms ────────────────────────────────────────────────
3267
3268    #[test]
3269    fn test_retry_policy_avg_delay_ms_constant() {
3270        let p = RetryPolicy::constant(4, 100).unwrap();
3271        // every attempt = 100ms → average = 100ms
3272        assert_eq!(p.avg_delay_ms(), 100);
3273    }
3274
3275    #[test]
3276    fn test_retry_policy_avg_delay_ms_single_attempt_policy() {
3277        // RetryPolicy::none() has 1 attempt and ZERO delay
3278        let p = RetryPolicy::none();
3279        assert_eq!(p.avg_delay_ms(), 0);
3280    }
3281
3282    #[test]
3283    fn test_backoff_factor_exponential_returns_two() {
3284        let p = RetryPolicy::exponential(3, 100).unwrap();
3285        assert!((p.backoff_factor() - 2.0).abs() < 1e-9);
3286    }
3287
3288    #[test]
3289    fn test_backoff_factor_constant_returns_one() {
3290        let p = RetryPolicy::constant(3, 100).unwrap();
3291        assert!((p.backoff_factor() - 1.0).abs() < 1e-9);
3292    }
3293
3294    #[test]
3295    fn test_pipeline_count_stages_matching_counts_by_keyword() {
3296        let p = Pipeline::new()
3297            .add_stage("normalize-text", |s| Ok(s))
3298            .add_stage("text-trim", |s| Ok(s))
3299            .add_stage("embed", |s| Ok(s));
3300        assert_eq!(p.count_stages_matching("text"), 2);
3301        assert_eq!(p.count_stages_matching("embed"), 1);
3302        assert_eq!(p.count_stages_matching("missing"), 0);
3303    }
3304
3305    #[test]
3306    fn test_pipeline_count_stages_matching_case_insensitive() {
3307        let p = Pipeline::new().add_stage("TEXT-CLEAN", |s| Ok(s));
3308        assert_eq!(p.count_stages_matching("text"), 1);
3309    }
3310
3311    #[test]
3312    fn test_backpressure_guard_over_soft_limit_true_when_exceeded() {
3313        let guard = BackpressureGuard::new(10)
3314            .unwrap()
3315            .with_soft_limit(1)
3316            .unwrap();
3317        guard.try_acquire().unwrap();
3318        guard.try_acquire().unwrap();
3319        assert!(guard.over_soft_limit().unwrap());
3320    }
3321
3322    #[test]
3323    fn test_backpressure_guard_over_soft_limit_false_when_no_soft_limit() {
3324        let guard = BackpressureGuard::new(10).unwrap();
3325        guard.try_acquire().unwrap();
3326        assert!(!guard.over_soft_limit().unwrap());
3327    }
3328}