use crate::error::AgentRuntimeError;
use crate::util::timed_lock;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
pub const MAX_RETRY_DELAY: Duration = Duration::from_secs(60);
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RetryKind {
Exponential,
Constant,
}
#[derive(Debug, Clone)]
pub struct RetryPolicy {
pub max_attempts: u32,
pub base_delay: Duration,
pub kind: RetryKind,
}
impl RetryPolicy {
pub fn exponential(max_attempts: u32, base_ms: u64) -> Result<Self, AgentRuntimeError> {
if max_attempts == 0 {
return Err(AgentRuntimeError::Orchestration(
"max_attempts must be >= 1".into(),
));
}
if base_ms == 0 {
return Err(AgentRuntimeError::Orchestration(
"base_ms must be >= 1 to avoid zero-delay busy-loop retries".into(),
));
}
Ok(Self {
max_attempts,
base_delay: Duration::from_millis(base_ms),
kind: RetryKind::Exponential,
})
}
pub fn constant(max_attempts: u32, delay_ms: u64) -> Result<Self, AgentRuntimeError> {
if max_attempts == 0 {
return Err(AgentRuntimeError::Orchestration(
"max_attempts must be >= 1".into(),
));
}
if delay_ms == 0 {
return Err(AgentRuntimeError::Orchestration(
"delay_ms must be >= 1 to avoid busy-loop retries".into(),
));
}
Ok(Self {
max_attempts,
base_delay: Duration::from_millis(delay_ms),
kind: RetryKind::Constant,
})
}
pub fn none() -> Self {
Self {
max_attempts: 1,
base_delay: Duration::ZERO,
kind: RetryKind::Constant,
}
}
pub fn is_none(&self) -> bool {
self.max_attempts == 1 && self.base_delay == Duration::ZERO
}
pub fn with_max_attempts(mut self, n: u32) -> Result<Self, AgentRuntimeError> {
if n == 0 {
return Err(AgentRuntimeError::Orchestration(
"max_attempts must be >= 1".into(),
));
}
self.max_attempts = n;
Ok(self)
}
pub fn max_attempts(&self) -> u32 {
self.max_attempts
}
pub fn is_no_retry(&self) -> bool {
self.max_attempts <= 1
}
pub fn will_retry_at_all(&self) -> bool {
self.max_attempts > 1
}
pub fn is_exponential(&self) -> bool {
matches!(self.kind, RetryKind::Exponential)
}
pub fn is_constant(&self) -> bool {
matches!(self.kind, RetryKind::Constant)
}
pub fn base_delay_ms(&self) -> u64 {
self.base_delay.as_millis() as u64
}
pub fn first_delay_ms(&self) -> u64 {
self.base_delay_ms()
}
pub fn is_last_attempt(&self, attempt: u32) -> bool {
attempt >= self.max_attempts
}
pub fn max_total_delay_ms(&self) -> u64 {
(1..=self.max_attempts)
.map(|attempt| self.delay_for(attempt).as_millis() as u64)
.sum()
}
pub fn delay_sum_ms(&self, n: u32) -> u64 {
let limit = n.min(self.max_attempts);
(1..=limit)
.map(|attempt| self.delay_for(attempt).as_millis() as u64)
.sum()
}
pub fn avg_delay_ms(&self) -> u64 {
if self.max_attempts == 0 {
return 0;
}
self.max_total_delay_ms() / self.max_attempts as u64
}
pub fn backoff_factor(&self) -> f64 {
match self.kind {
RetryKind::Exponential => 2.0,
RetryKind::Constant => 1.0,
}
}
pub fn with_base_delay_ms(mut self, ms: u64) -> Result<Self, AgentRuntimeError> {
if ms == 0 {
return Err(AgentRuntimeError::Orchestration(
"base_delay_ms must be >= 1 to avoid busy-loop retries".into(),
));
}
self.base_delay = Duration::from_millis(ms);
Ok(self)
}
pub fn delay_ms_for(&self, attempt: u32) -> u64 {
self.delay_for(attempt).as_millis() as u64
}
pub fn total_max_delay_ms(&self) -> u64 {
(1..=self.max_attempts)
.map(|a| self.delay_for(a).as_millis() as u64)
.sum()
}
pub fn attempts_remaining(&self, attempt: u32) -> u32 {
self.max_attempts.saturating_sub(attempt)
}
pub fn attempts_budget_used(&self, attempt: u32) -> f64 {
if self.max_attempts == 0 {
return 1.0;
}
(attempt as f64 / self.max_attempts as f64).min(1.0)
}
pub fn max_delay_ms(&self) -> u64 {
if self.max_attempts == 0 {
return 0;
}
self.delay_ms_for(self.max_attempts)
}
pub fn can_retry(&self, attempt: u32) -> bool {
attempt < self.max_attempts
}
pub fn delay_for(&self, attempt: u32) -> Duration {
match self.kind {
RetryKind::Constant => self.base_delay.min(MAX_RETRY_DELAY),
RetryKind::Exponential => {
let exp = attempt.saturating_sub(1);
let multiplier = 1u64.checked_shl(exp.min(63)).unwrap_or(u64::MAX);
let millis = self
.base_delay
.as_millis()
.saturating_mul(multiplier as u128);
let raw = Duration::from_millis(millis.min(u64::MAX as u128) as u64);
raw.min(MAX_RETRY_DELAY)
}
}
}
pub fn is_bounded(&self) -> bool {
self.max_attempts < u32::MAX
}
pub fn remaining_wait_budget_ms(&self, attempts_done: u32) -> u64 {
self.max_total_delay_ms().saturating_sub(self.delay_sum_ms(attempts_done))
}
pub fn max_single_delay_ms(&self) -> u64 {
self.delay_for(self.max_attempts).as_millis() as u64
}
pub fn covers_n_failures(&self, n: u32) -> bool {
self.max_attempts > n
}
}
impl std::fmt::Display for RetryPolicy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.kind {
RetryKind::Exponential => write!(
f,
"Exponential({}×, base={}ms)",
self.max_attempts,
self.base_delay.as_millis()
),
RetryKind::Constant => write!(
f,
"Constant({}×, delay={}ms)",
self.max_attempts,
self.base_delay.as_millis()
),
}
}
}
#[derive(Debug, Clone)]
pub enum CircuitState {
Closed,
Open {
opened_at: Instant,
},
HalfOpen,
}
impl PartialEq for CircuitState {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(CircuitState::Closed, CircuitState::Closed) => true,
(CircuitState::Open { .. }, CircuitState::Open { .. }) => true,
(CircuitState::HalfOpen, CircuitState::HalfOpen) => true,
_ => false,
}
}
}
impl Eq for CircuitState {}
impl std::fmt::Display for CircuitState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
CircuitState::Closed => write!(f, "Closed"),
CircuitState::Open { .. } => write!(f, "Open"),
CircuitState::HalfOpen => write!(f, "HalfOpen"),
}
}
}
pub trait CircuitBreakerBackend: Send + Sync {
fn increment_failures(&self, service: &str) -> u32;
fn reset_failures(&self, service: &str);
fn get_failures(&self, service: &str) -> u32;
fn set_open_at(&self, service: &str, at: std::time::Instant);
fn clear_open_at(&self, service: &str);
fn get_open_at(&self, service: &str) -> Option<std::time::Instant>;
}
pub struct InMemoryCircuitBreakerBackend {
inner: Arc<Mutex<HashMap<String, InMemoryServiceState>>>,
}
#[derive(Default)]
struct InMemoryServiceState {
consecutive_failures: u32,
open_at: Option<std::time::Instant>,
}
impl InMemoryCircuitBreakerBackend {
pub fn new() -> Self {
Self {
inner: Arc::new(Mutex::new(HashMap::new())),
}
}
}
impl Default for InMemoryCircuitBreakerBackend {
fn default() -> Self {
Self::new()
}
}
impl CircuitBreakerBackend for InMemoryCircuitBreakerBackend {
fn increment_failures(&self, service: &str) -> u32 {
let mut map = timed_lock(
&self.inner,
"InMemoryCircuitBreakerBackend::increment_failures",
);
let state = map.entry(service.to_owned()).or_default();
state.consecutive_failures += 1;
state.consecutive_failures
}
fn reset_failures(&self, service: &str) {
let mut map = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::reset_failures");
if let Some(state) = map.get_mut(service) {
state.consecutive_failures = 0;
}
}
fn get_failures(&self, service: &str) -> u32 {
let map = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::get_failures");
map.get(service).map_or(0, |s| s.consecutive_failures)
}
fn set_open_at(&self, service: &str, at: std::time::Instant) {
let mut map = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::set_open_at");
map.entry(service.to_owned()).or_default().open_at = Some(at);
}
fn clear_open_at(&self, service: &str) {
let mut map = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::clear_open_at");
if let Some(state) = map.get_mut(service) {
state.open_at = None;
}
}
fn get_open_at(&self, service: &str) -> Option<std::time::Instant> {
let map = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::get_open_at");
map.get(service).and_then(|s| s.open_at)
}
}
#[derive(Clone)]
pub struct CircuitBreaker {
threshold: u32,
recovery_window: Duration,
service: String,
backend: Arc<dyn CircuitBreakerBackend>,
}
impl std::fmt::Debug for CircuitBreaker {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CircuitBreaker")
.field("threshold", &self.threshold)
.field("recovery_window", &self.recovery_window)
.field("service", &self.service)
.finish()
}
}
impl CircuitBreaker {
pub fn new(
service: impl Into<String>,
threshold: u32,
recovery_window: Duration,
) -> Result<Self, AgentRuntimeError> {
if threshold == 0 {
return Err(AgentRuntimeError::Orchestration(
"circuit breaker threshold must be >= 1".into(),
));
}
let service = service.into();
Ok(Self {
threshold,
recovery_window,
service,
backend: Arc::new(InMemoryCircuitBreakerBackend::new()),
})
}
pub fn with_backend(mut self, backend: Arc<dyn CircuitBreakerBackend>) -> Self {
self.backend = backend;
self
}
#[tracing::instrument(skip(self, f))]
pub fn call<T, E, F>(&self, f: F) -> Result<T, AgentRuntimeError>
where
F: FnOnce() -> Result<T, E>,
E: std::fmt::Display,
{
let effective_state = match self.backend.get_open_at(&self.service) {
Some(opened_at) => {
if opened_at.elapsed() >= self.recovery_window {
self.backend.clear_open_at(&self.service);
tracing::info!("circuit moved to half-open for {}", self.service);
CircuitState::HalfOpen
} else {
CircuitState::Open { opened_at }
}
}
None => {
let failures = self.backend.get_failures(&self.service);
if failures >= self.threshold {
CircuitState::HalfOpen
} else {
CircuitState::Closed
}
}
};
tracing::debug!("circuit state: {:?}", effective_state);
match effective_state {
CircuitState::Open { .. } => {
return Err(AgentRuntimeError::CircuitOpen {
service: self.service.clone(),
});
}
CircuitState::Closed | CircuitState::HalfOpen => {}
}
match f() {
Ok(val) => {
self.backend.reset_failures(&self.service);
self.backend.clear_open_at(&self.service);
tracing::info!("circuit closed for {}", self.service);
Ok(val)
}
Err(e) => {
let failures = self.backend.increment_failures(&self.service);
if failures >= self.threshold {
let now = Instant::now();
self.backend.set_open_at(&self.service, now);
tracing::info!("circuit opened for {}", self.service);
}
Err(AgentRuntimeError::Orchestration(e.to_string()))
}
}
}
pub fn state(&self) -> Result<CircuitState, AgentRuntimeError> {
let state = match self.backend.get_open_at(&self.service) {
Some(opened_at) => {
if opened_at.elapsed() >= self.recovery_window {
let failures = self.backend.get_failures(&self.service);
if failures >= self.threshold {
CircuitState::HalfOpen
} else {
CircuitState::Closed
}
} else {
CircuitState::Open { opened_at }
}
}
None => {
let failures = self.backend.get_failures(&self.service);
if failures >= self.threshold {
CircuitState::HalfOpen
} else {
CircuitState::Closed
}
}
};
Ok(state)
}
pub fn failure_count(&self) -> Result<u32, AgentRuntimeError> {
Ok(self.backend.get_failures(&self.service))
}
pub fn record_success(&self) {
self.backend.reset_failures(&self.service);
self.backend.clear_open_at(&self.service);
}
pub fn record_failure(&self) {
let failures = self.backend.increment_failures(&self.service);
if failures >= self.threshold {
self.backend.set_open_at(&self.service, Instant::now());
tracing::info!("circuit opened for {} (manual record)", self.service);
}
}
pub fn service_name(&self) -> &str {
&self.service
}
pub fn is_closed(&self) -> bool {
matches!(self.state(), Ok(CircuitState::Closed))
}
pub fn is_open(&self) -> bool {
matches!(self.state(), Ok(CircuitState::Open { .. }))
}
pub fn is_half_open(&self) -> bool {
matches!(self.state(), Ok(CircuitState::HalfOpen))
}
pub fn is_healthy(&self) -> bool {
!self.is_open()
}
pub fn threshold(&self) -> u32 {
self.threshold
}
pub fn failure_headroom(&self) -> u32 {
let failures = self.backend.get_failures(&self.service);
self.threshold.saturating_sub(failures)
}
pub fn failure_rate(&self) -> f64 {
if self.threshold == 0 {
return 0.0;
}
let failures = self.backend.get_failures(&self.service);
failures as f64 / self.threshold as f64
}
pub fn is_at_threshold(&self) -> bool {
let failures = self.backend.get_failures(&self.service);
failures >= self.threshold
}
pub fn failures_until_open(&self) -> u32 {
let failures = self.backend.get_failures(&self.service);
self.threshold.saturating_sub(failures)
}
pub fn recovery_window(&self) -> std::time::Duration {
self.recovery_window
}
pub fn reset(&self) {
self.backend.reset_failures(&self.service);
self.backend.clear_open_at(&self.service);
tracing::info!("circuit manually reset to Closed for {}", self.service);
}
pub fn describe(&self) -> Result<String, AgentRuntimeError> {
let state = self.state()?;
let failures = self.failure_count()?;
Ok(format!(
"service='{}' state={} failures={}/{}",
self.service, state, failures, self.threshold
))
}
#[tracing::instrument(skip(self, backend, f))]
pub async fn async_call<T, E, F, Fut>(
&self,
backend: &dyn AsyncCircuitBreakerBackend,
f: F,
) -> Result<T, AgentRuntimeError>
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = Result<T, E>>,
E: std::fmt::Display,
{
let effective_state = match backend.get_open_at(&self.service).await {
Some(opened_at) => {
if opened_at.elapsed() >= self.recovery_window {
backend.clear_open_at(&self.service).await;
tracing::info!("circuit async moved to half-open for {}", self.service);
CircuitState::HalfOpen
} else {
CircuitState::Open { opened_at }
}
}
None => {
let failures = backend.get_failures(&self.service).await;
if failures >= self.threshold {
CircuitState::HalfOpen
} else {
CircuitState::Closed
}
}
};
if let CircuitState::Open { .. } = effective_state {
return Err(AgentRuntimeError::CircuitOpen {
service: self.service.clone(),
});
}
match f().await {
Ok(val) => {
backend.reset_failures(&self.service).await;
backend.clear_open_at(&self.service).await;
Ok(val)
}
Err(e) => {
let failures = backend.increment_failures(&self.service).await;
if failures >= self.threshold {
backend
.set_open_at(&self.service, Instant::now())
.await;
tracing::info!("circuit async opened for {}", self.service);
}
Err(AgentRuntimeError::Orchestration(e.to_string()))
}
}
}
}
#[async_trait::async_trait]
pub trait AsyncCircuitBreakerBackend: Send + Sync {
async fn increment_failures(&self, service: &str) -> u32;
async fn reset_failures(&self, service: &str);
async fn get_failures(&self, service: &str) -> u32;
async fn set_open_at(&self, service: &str, at: Instant);
async fn clear_open_at(&self, service: &str);
async fn get_open_at(&self, service: &str) -> Option<Instant>;
}
#[async_trait::async_trait]
impl AsyncCircuitBreakerBackend for InMemoryCircuitBreakerBackend {
async fn increment_failures(&self, service: &str) -> u32 {
<Self as CircuitBreakerBackend>::increment_failures(self, service)
}
async fn reset_failures(&self, service: &str) {
<Self as CircuitBreakerBackend>::reset_failures(self, service);
}
async fn get_failures(&self, service: &str) -> u32 {
<Self as CircuitBreakerBackend>::get_failures(self, service)
}
async fn set_open_at(&self, service: &str, at: Instant) {
<Self as CircuitBreakerBackend>::set_open_at(self, service, at);
}
async fn clear_open_at(&self, service: &str) {
<Self as CircuitBreakerBackend>::clear_open_at(self, service);
}
async fn get_open_at(&self, service: &str) -> Option<Instant> {
<Self as CircuitBreakerBackend>::get_open_at(self, service)
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum DeduplicationResult {
New,
Cached(String),
InProgress,
}
#[derive(Debug, Clone)]
pub struct Deduplicator {
ttl: Duration,
max_entries: Option<usize>,
inner: Arc<Mutex<DeduplicatorInner>>,
}
#[derive(Debug)]
struct DeduplicatorInner {
cache: HashMap<String, (String, Instant)>, in_flight: HashMap<String, Instant>, cache_order: std::collections::VecDeque<String>,
call_count: u64,
}
impl Deduplicator {
pub fn new(ttl: Duration) -> Self {
Self {
ttl,
max_entries: None,
inner: Arc::new(Mutex::new(DeduplicatorInner {
cache: HashMap::new(),
in_flight: HashMap::new(),
cache_order: std::collections::VecDeque::new(),
call_count: 0,
})),
}
}
pub fn with_max_entries(mut self, max: usize) -> Result<Self, AgentRuntimeError> {
if max == 0 {
return Err(AgentRuntimeError::Orchestration(
"Deduplicator max_entries must be >= 1".into(),
));
}
self.max_entries = Some(max);
Ok(self)
}
pub fn check_and_register(&self, key: &str) -> Result<DeduplicationResult, AgentRuntimeError> {
let mut inner = timed_lock(&self.inner, "Deduplicator::check_and_register");
let now = Instant::now();
const EXPIRY_INTERVAL: u64 = 64;
inner.call_count = inner.call_count.wrapping_add(1);
if inner.call_count % EXPIRY_INTERVAL == 0 {
let ttl = self.ttl;
inner.cache.retain(|_, (_, ts)| now.duration_since(*ts) < ttl);
inner
.in_flight
.retain(|_, ts| now.duration_since(*ts) < ttl);
}
match inner.cache.get(key) {
Some((result, ts)) if now.duration_since(*ts) < self.ttl => {
return Ok(DeduplicationResult::Cached(result.clone()));
}
Some(_) => {
inner.cache.remove(key); }
None => {}
}
match inner.in_flight.get(key) {
Some(ts) if now.duration_since(*ts) < self.ttl => {
return Ok(DeduplicationResult::InProgress);
}
Some(_) => {
inner.in_flight.remove(key); }
None => {}
}
inner.in_flight.insert(key.to_owned(), now);
Ok(DeduplicationResult::New)
}
pub fn check(&self, key: &str, ttl: std::time::Duration) -> Result<DeduplicationResult, AgentRuntimeError> {
let mut inner = timed_lock(&self.inner, "Deduplicator::check");
let now = Instant::now();
const EXPIRY_INTERVAL: u64 = 64;
inner.call_count = inner.call_count.wrapping_add(1);
if inner.call_count % EXPIRY_INTERVAL == 0 {
inner.cache.retain(|_, (_, ts)| now.duration_since(*ts) < ttl);
inner.in_flight.retain(|_, ts| now.duration_since(*ts) < ttl);
}
match inner.cache.get(key) {
Some((result, ts)) if now.duration_since(*ts) < ttl => {
return Ok(DeduplicationResult::Cached(result.clone()));
}
Some(_) => {
inner.cache.remove(key);
}
None => {}
}
match inner.in_flight.get(key) {
Some(ts) if now.duration_since(*ts) < ttl => {
return Ok(DeduplicationResult::InProgress);
}
Some(_) => {
inner.in_flight.remove(key);
}
None => {}
}
inner.in_flight.insert(key.to_owned(), now);
Ok(DeduplicationResult::New)
}
pub fn dedup_many(
&self,
requests: &[(&str, std::time::Duration)],
) -> Result<Vec<DeduplicationResult>, AgentRuntimeError> {
if requests.is_empty() {
return Ok(Vec::new());
}
let mut inner = timed_lock(&self.inner, "Deduplicator::dedup_many");
let now = std::time::Instant::now();
let mut results = Vec::with_capacity(requests.len());
for &(key, ttl) in requests {
inner.cache.retain(|_, (_, ts)| now.duration_since(*ts) < ttl);
inner.in_flight.retain(|_, ts| now.duration_since(*ts) < ttl);
let result = if let Some((cached_result, _)) = inner.cache.get(key) {
DeduplicationResult::Cached(cached_result.clone())
} else if inner.in_flight.contains_key(key) {
DeduplicationResult::InProgress
} else {
inner.in_flight.insert(key.to_owned(), now);
DeduplicationResult::New
};
results.push(result);
}
Ok(results)
}
pub fn complete(&self, key: &str, result: impl Into<String>) -> Result<(), AgentRuntimeError> {
let mut inner = timed_lock(&self.inner, "Deduplicator::complete");
inner.in_flight.remove(key);
if let Some(max) = self.max_entries {
while inner.cache.len() >= max {
match inner.cache_order.pop_front() {
Some(oldest_key) => {
inner.cache.remove(&oldest_key);
}
None => break,
}
}
}
let owned_key = key.to_owned();
inner.cache_order.push_back(owned_key.clone());
inner.cache.insert(owned_key, (result.into(), Instant::now()));
Ok(())
}
pub fn fail(&self, key: &str) -> Result<(), AgentRuntimeError> {
let mut inner = timed_lock(&self.inner, "Deduplicator::fail");
inner.in_flight.remove(key);
Ok(())
}
pub fn in_flight_count(&self) -> Result<usize, AgentRuntimeError> {
let inner = timed_lock(&self.inner, "Deduplicator::in_flight_count");
Ok(inner.in_flight.len())
}
pub fn in_flight_keys(&self) -> Result<Vec<String>, AgentRuntimeError> {
let inner = timed_lock(&self.inner, "Deduplicator::in_flight_keys");
Ok(inner.in_flight.keys().cloned().collect())
}
pub fn cached_count(&self) -> Result<usize, AgentRuntimeError> {
let inner = timed_lock(&self.inner, "Deduplicator::cached_count");
Ok(inner.cache.len())
}
pub fn cached_keys(&self) -> Result<Vec<String>, AgentRuntimeError> {
let inner = timed_lock(&self.inner, "Deduplicator::cached_keys");
Ok(inner.cache.keys().cloned().collect())
}
pub fn ttl(&self) -> Duration {
self.ttl
}
pub fn max_entries(&self) -> Option<usize> {
self.max_entries
}
pub fn is_idle(&self) -> Result<bool, AgentRuntimeError> {
let inner = timed_lock(&self.inner, "Deduplicator::is_idle");
Ok(inner.in_flight.is_empty())
}
pub fn total_count(&self) -> Result<usize, AgentRuntimeError> {
let inner = timed_lock(&self.inner, "Deduplicator::total_count");
Ok(inner.in_flight.len() + inner.cache.len())
}
pub fn contains(&self, key: &str) -> Result<bool, AgentRuntimeError> {
let inner = timed_lock(&self.inner, "Deduplicator::contains");
Ok(inner.in_flight.contains_key(key) || inner.cache.contains_key(key))
}
pub fn get_result(&self, key: &str) -> Result<Option<String>, AgentRuntimeError> {
let inner = timed_lock(&self.inner, "Deduplicator::get_result");
let ttl = self.ttl;
let now = std::time::Instant::now();
Ok(inner.cache.get(key).and_then(|(result, inserted_at)| {
if now.duration_since(*inserted_at) <= ttl {
Some(result.clone())
} else {
None
}
}))
}
pub fn clear(&self) -> Result<(), AgentRuntimeError> {
let mut inner = timed_lock(&self.inner, "Deduplicator::clear");
inner.cache.clear();
inner.in_flight.clear();
inner.cache_order.clear();
Ok(())
}
pub fn purge_expired(&self) -> Result<usize, AgentRuntimeError> {
let mut inner = timed_lock(&self.inner, "Deduplicator::purge_expired");
let ttl = self.ttl;
let now = std::time::Instant::now();
let before = inner.cache.len();
inner.cache.retain(|_, (_, inserted_at)| {
now.duration_since(*inserted_at) <= ttl
});
let removed = before - inner.cache.len();
if removed > 0 {
let live_keys: std::collections::HashSet<String> =
inner.cache.keys().cloned().collect();
inner.cache_order.retain(|k| live_keys.contains(k));
}
Ok(removed)
}
pub fn evict_oldest(&self) -> Result<bool, AgentRuntimeError> {
let mut inner = timed_lock(&self.inner, "Deduplicator::evict_oldest");
while let Some(key) = inner.cache_order.pop_front() {
if inner.cache.remove(&key).is_some() {
return Ok(true);
}
}
Ok(false)
}
}
#[derive(Debug, Clone)]
pub struct BackpressureGuard {
capacity: usize,
soft_capacity: Option<usize>,
inner: Arc<Mutex<usize>>,
}
impl BackpressureGuard {
pub fn new(capacity: usize) -> Result<Self, AgentRuntimeError> {
if capacity == 0 {
return Err(AgentRuntimeError::Orchestration(
"BackpressureGuard capacity must be > 0".into(),
));
}
Ok(Self {
capacity,
soft_capacity: None,
inner: Arc::new(Mutex::new(0)),
})
}
pub fn with_soft_limit(mut self, soft: usize) -> Result<Self, AgentRuntimeError> {
if soft >= self.capacity {
return Err(AgentRuntimeError::Orchestration(
"soft_capacity must be less than hard capacity".into(),
));
}
self.soft_capacity = Some(soft);
Ok(self)
}
pub fn try_acquire(&self) -> Result<(), AgentRuntimeError> {
let mut depth = timed_lock(&self.inner, "BackpressureGuard::try_acquire");
if *depth >= self.capacity {
return Err(AgentRuntimeError::BackpressureShed {
depth: *depth,
capacity: self.capacity,
});
}
*depth += 1;
if let Some(soft) = self.soft_capacity {
if *depth >= soft {
tracing::warn!(
depth = *depth,
soft_capacity = soft,
hard_capacity = self.capacity,
"backpressure approaching hard limit"
);
}
}
Ok(())
}
pub fn release(&self) -> Result<(), AgentRuntimeError> {
let mut depth = timed_lock(&self.inner, "BackpressureGuard::release");
*depth = depth.saturating_sub(1);
Ok(())
}
pub fn reset(&self) {
let mut depth = timed_lock(&self.inner, "BackpressureGuard::reset");
*depth = 0;
}
pub fn is_full(&self) -> Result<bool, AgentRuntimeError> {
Ok(self.depth()? >= self.capacity)
}
pub fn is_empty(&self) -> Result<bool, AgentRuntimeError> {
Ok(self.depth()? == 0)
}
pub fn available_capacity(&self) -> Result<usize, AgentRuntimeError> {
Ok(self.capacity.saturating_sub(self.depth()?))
}
pub fn hard_capacity(&self) -> usize {
self.capacity
}
pub fn soft_limit(&self) -> Option<usize> {
self.soft_capacity
}
pub fn is_soft_limited(&self) -> bool {
self.soft_capacity.is_some()
}
pub fn depth(&self) -> Result<usize, AgentRuntimeError> {
let depth = timed_lock(&self.inner, "BackpressureGuard::depth");
Ok(*depth)
}
pub fn percent_full(&self) -> Result<f64, AgentRuntimeError> {
let depth = self.depth()?;
Ok((depth as f64 / self.capacity as f64 * 100.0).min(100.0))
}
pub fn soft_depth_ratio(&self) -> f32 {
match self.soft_capacity {
None => 0.0,
Some(soft) => {
let depth = timed_lock(&self.inner, "BackpressureGuard::soft_depth_ratio");
*depth as f32 / soft as f32
}
}
}
pub fn utilization_ratio(&self) -> Result<f32, AgentRuntimeError> {
if self.capacity == 0 {
return Ok(0.0);
}
let depth = self.depth()?;
Ok(depth as f32 / self.capacity as f32)
}
pub fn remaining_capacity(&self) -> Result<usize, AgentRuntimeError> {
let depth = self.depth()?;
Ok(self.capacity.saturating_sub(depth))
}
pub fn reset_depth(&self) -> Result<(), AgentRuntimeError> {
let mut depth = timed_lock(&self.inner, "BackpressureGuard::reset_depth");
*depth = 0;
Ok(())
}
pub fn headroom_ratio(&self) -> Result<f64, AgentRuntimeError> {
Ok(self.available_capacity()? as f64 / self.capacity as f64)
}
pub fn acquired_count(&self) -> Result<usize, AgentRuntimeError> {
Ok(self.capacity - self.available_capacity()?)
}
pub fn over_soft_limit(&self) -> Result<bool, AgentRuntimeError> {
let soft = match self.soft_limit() {
Some(s) => s,
None => return Ok(false),
};
Ok(self.depth()? > soft)
}
}
#[derive(Debug)]
pub struct PipelineResult {
pub output: String,
pub stage_timings: Vec<(String, u64)>,
}
impl PipelineResult {
pub fn total_duration_ms(&self) -> u64 {
self.stage_timings.iter().map(|(_, ms)| ms).sum()
}
pub fn stage_count(&self) -> usize {
self.stage_timings.len()
}
pub fn slowest_stage(&self) -> Option<(&str, u64)> {
self.stage_timings
.iter()
.max_by_key(|(_, ms)| ms)
.map(|(name, ms)| (name.as_str(), *ms))
}
pub fn fastest_stage(&self) -> Option<(&str, u64)> {
self.stage_timings
.iter()
.min_by_key(|(_, ms)| ms)
.map(|(name, ms)| (name.as_str(), *ms))
}
pub fn is_empty(&self) -> bool {
self.stage_timings.is_empty()
}
}
pub struct Stage {
pub name: String,
pub handler: Box<dyn Fn(String) -> Result<String, AgentRuntimeError> + Send + Sync>,
}
impl std::fmt::Debug for Stage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Stage").field("name", &self.name).finish()
}
}
type StageErrorHandler = Box<dyn Fn(&str, &str) -> String + Send + Sync>;
pub struct Pipeline {
stages: Vec<Stage>,
error_handler: Option<StageErrorHandler>,
}
impl std::fmt::Debug for Pipeline {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Pipeline")
.field("stages", &self.stages)
.field("has_error_handler", &self.error_handler.is_some())
.finish()
}
}
impl Pipeline {
pub fn new() -> Self {
Self { stages: Vec::new(), error_handler: None }
}
pub fn with_error_handler(
mut self,
handler: impl Fn(&str, &str) -> String + Send + Sync + 'static,
) -> Self {
self.error_handler = Some(Box::new(handler));
self
}
pub fn add_stage(
mut self,
name: impl Into<String>,
handler: impl Fn(String) -> Result<String, AgentRuntimeError> + Send + Sync + 'static,
) -> Self {
self.stages.push(Stage {
name: name.into(),
handler: Box::new(handler),
});
self
}
pub fn prepend_stage(
mut self,
name: impl Into<String>,
handler: impl Fn(String) -> Result<String, AgentRuntimeError> + Send + Sync + 'static,
) -> Self {
self.stages.insert(0, Stage {
name: name.into(),
handler: Box::new(handler),
});
self
}
pub fn is_empty(&self) -> bool {
self.stages.is_empty()
}
pub fn has_error_handler(&self) -> bool {
self.error_handler.is_some()
}
pub fn stage_count(&self) -> usize {
self.stages.len()
}
pub fn has_stage(&self, name: &str) -> bool {
self.stages.iter().any(|s| s.name == name)
}
pub fn stage_names(&self) -> Vec<&str> {
self.stages.iter().map(|s| s.name.as_str()).collect()
}
pub fn stage_names_owned(&self) -> Vec<String> {
self.stages.iter().map(|s| s.name.clone()).collect()
}
pub fn get_stage_name_at(&self, index: usize) -> Option<&str> {
self.stages.get(index).map(|s| s.name.as_str())
}
pub fn stage_index(&self, name: &str) -> Option<usize> {
self.stages.iter().position(|s| s.name == name)
}
pub fn first_stage_name(&self) -> Option<&str> {
self.stages.first().map(|s| s.name.as_str())
}
pub fn last_stage_name(&self) -> Option<&str> {
self.stages.last().map(|s| s.name.as_str())
}
pub fn remove_stage(&mut self, name: &str) -> bool {
if let Some(pos) = self.stages.iter().position(|s| s.name == name) {
self.stages.remove(pos);
true
} else {
false
}
}
pub fn rename_stage(&mut self, old_name: &str, new_name: impl Into<String>) -> bool {
if let Some(stage) = self.stages.iter_mut().find(|s| s.name == old_name) {
stage.name = new_name.into();
true
} else {
false
}
}
pub fn clear(&mut self) {
self.stages.clear();
}
pub fn count_stages_matching(&self, keyword: &str) -> usize {
let kw = keyword.to_ascii_lowercase();
self.stages
.iter()
.filter(|s| s.name.to_ascii_lowercase().contains(&kw))
.count()
}
pub fn swap_stages(&mut self, a: &str, b: &str) -> bool {
let idx_a = self.stages.iter().position(|s| s.name == a);
let idx_b = self.stages.iter().position(|s| s.name == b);
match (idx_a, idx_b) {
(Some(i), Some(j)) => {
self.stages.swap(i, j);
true
}
_ => false,
}
}
#[tracing::instrument(skip(self))]
pub fn run(&self, input: String) -> Result<String, AgentRuntimeError> {
let mut current = input;
for stage in &self.stages {
tracing::debug!(stage = %stage.name, "running pipeline stage");
match (stage.handler)(current) {
Ok(out) => current = out,
Err(e) => {
tracing::error!(stage = %stage.name, error = %e, "pipeline stage failed");
if let Some(ref handler) = self.error_handler {
current = handler(&stage.name, &e.to_string());
} else {
return Err(e);
}
}
}
}
Ok(current)
}
pub fn execute_timed(&self, input: String) -> Result<PipelineResult, AgentRuntimeError> {
let mut current = input;
let mut stage_timings = Vec::new();
for stage in &self.stages {
let start = std::time::Instant::now();
tracing::debug!(stage = %stage.name, "running timed pipeline stage");
match (stage.handler)(current) {
Ok(out) => current = out,
Err(e) => {
tracing::error!(stage = %stage.name, error = %e, "timed pipeline stage failed");
if let Some(ref handler) = self.error_handler {
current = handler(&stage.name, &e.to_string());
} else {
return Err(e);
}
}
}
let duration_ms = start.elapsed().as_millis() as u64;
stage_timings.push((stage.name.clone(), duration_ms));
}
Ok(PipelineResult {
output: current,
stage_timings,
})
}
pub fn description(&self) -> String {
if self.stages.is_empty() {
return "Pipeline[empty]".to_owned();
}
let names = self
.stages
.iter()
.map(|s| s.name.as_str())
.collect::<Vec<_>>()
.join(" → ");
let n = self.stages.len();
let plural = if n == 1 { "stage" } else { "stages" };
format!("Pipeline[{n} {plural}: {names}]")
}
pub fn has_unique_stage_names(&self) -> bool {
let mut seen = std::collections::HashSet::new();
self.stages.iter().all(|s| seen.insert(s.name.as_str()))
}
pub fn stage_names_sorted(&self) -> Vec<&str> {
let mut names: Vec<&str> = self.stages.iter().map(|s| s.name.as_str()).collect();
names.sort_unstable();
names
}
pub fn longest_stage_name(&self) -> Option<&str> {
self.stages
.iter()
.max_by_key(|s| s.name.len())
.map(|s| s.name.as_str())
}
pub fn shortest_stage_name(&self) -> Option<&str> {
self.stages
.iter()
.min_by_key(|s| s.name.len())
.map(|s| s.name.as_str())
}
pub fn stage_name_lengths(&self) -> Vec<usize> {
self.stages.iter().map(|s| s.name.len()).collect()
}
pub fn avg_stage_name_length(&self) -> f64 {
if self.stages.is_empty() {
return 0.0;
}
let total: usize = self.stages.iter().map(|s| s.name.len()).sum();
total as f64 / self.stages.len() as f64
}
pub fn stages_containing(&self, substring: &str) -> Vec<&str> {
self.stages
.iter()
.filter(|s| s.name.contains(substring))
.map(|s| s.name.as_str())
.collect()
}
pub fn stage_is_first(&self, name: &str) -> bool {
self.stages.first().map_or(false, |s| s.name == name)
}
pub fn stage_is_last(&self, name: &str) -> bool {
self.stages.last().map_or(false, |s| s.name == name)
}
pub fn total_stage_name_bytes(&self) -> usize {
self.stages.iter().map(|s| s.name.len()).sum()
}
pub fn stages_before(&self, name: &str) -> Vec<&str> {
let pos = self.stages.iter().position(|s| s.name == name);
match pos {
None | Some(0) => Vec::new(),
Some(idx) => self.stages[..idx].iter().map(|s| s.name.as_str()).collect(),
}
}
pub fn stages_after(&self, name: &str) -> Vec<&str> {
let pos = self.stages.iter().position(|s| s.name == name);
match pos {
None => Vec::new(),
Some(idx) if idx + 1 >= self.stages.len() => Vec::new(),
Some(idx) => self.stages[idx + 1..].iter().map(|s| s.name.as_str()).collect(),
}
}
pub fn stage_pairs(&self) -> Vec<(&str, &str)> {
self.stages
.windows(2)
.map(|w| (w[0].name.as_str(), w[1].name.as_str()))
.collect()
}
pub fn stage_count_above_name_len(&self, min_len: usize) -> usize {
self.stages.iter().filter(|s| s.name.len() > min_len).count()
}
pub fn stage_count_below_name_len(&self, max_len: usize) -> usize {
self.stages.iter().filter(|s| s.name.len() < max_len).count()
}
pub fn stage_at(&self, idx: usize) -> Option<&str> {
self.stages.get(idx).map(|s| s.name.as_str())
}
pub fn stages_reversed(&self) -> Vec<&str> {
self.stages.iter().rev().map(|s| s.name.as_str()).collect()
}
pub fn pipeline_is_empty(&self) -> bool {
self.stages.is_empty()
}
pub fn unique_stage_names(&self) -> Vec<&str> {
let mut names: Vec<&str> = self.stages.iter().map(|s| s.name.as_str()).collect();
names.sort_unstable();
names
}
pub fn stage_names_with_prefix<'a>(&'a self, prefix: &str) -> Vec<&'a str> {
self.stages
.iter()
.filter(|s| s.name.starts_with(prefix))
.map(|s| s.name.as_str())
.collect()
}
pub fn contains_stage_with_prefix(&self, prefix: &str) -> bool {
self.stages.iter().any(|s| s.name.starts_with(prefix))
}
pub fn stages_with_suffix<'a>(&'a self, suffix: &str) -> Vec<&'a str> {
self.stages
.iter()
.filter(|s| s.name.ends_with(suffix))
.map(|s| s.name.as_str())
.collect()
}
pub fn has_stage_with_name_containing(&self, substr: &str) -> bool {
self.stages.iter().any(|s| s.name.contains(substr))
}
pub fn stage_names_containing<'a>(&'a self, substr: &str) -> Vec<&'a str> {
self.stages
.iter()
.filter(|s| s.name.contains(substr))
.map(|s| s.name.as_str())
.collect()
}
pub fn stage_name_bytes_total(&self) -> usize {
self.stages.iter().map(|s| s.name.len()).sum()
}
pub fn stage_count_above_name_bytes(&self, min_bytes: usize) -> usize {
self.stages.iter().filter(|s| s.name.len() > min_bytes).count()
}
pub fn stage_at_index(&self, index: usize) -> Option<&Stage> {
self.stages.get(index)
}
pub fn stage_position_from_end(&self, name: &str) -> Option<usize> {
let pos = self.stages.iter().position(|s| s.name == name)?;
Some(self.stages.len() - 1 - pos)
}
pub fn contains_all_stages(&self, names: &[&str]) -> bool {
names.iter().all(|&n| self.stages.iter().any(|s| s.name == n))
}
pub fn stage_name_from_end(&self, n: usize) -> Option<&str> {
let len = self.stages.len();
if n >= len {
return None;
}
Some(self.stages[len - 1 - n].name.as_str())
}
pub fn all_stage_names(&self) -> Vec<String> {
self.stages.iter().map(|s| s.name.clone()).collect()
}
pub fn has_exactly_n_stages(&self, n: usize) -> bool {
self.stages.len() == n
}
pub fn stage_index_of(&self, name: &str) -> Option<usize> {
self.stages.iter().position(|s| s.name == name)
}
pub fn has_no_stages(&self) -> bool {
self.stages.is_empty()
}
pub fn longest_stage_name_len(&self) -> usize {
self.stages.iter().map(|s| s.name.len()).max().unwrap_or(0)
}
pub fn stage_names_joined(&self, sep: &str) -> String {
self.stages
.iter()
.map(|s| s.name.as_str())
.collect::<Vec<_>>()
.join(sep)
}
pub fn stage_count_with_name_containing(&self, substr: &str) -> usize {
self.stages.iter().filter(|s| s.name.contains(substr)).count()
}
pub fn has_stage_at_index(&self, idx: usize) -> bool {
idx < self.stages.len()
}
pub fn all_stage_names_start_with(&self, prefix: &str) -> bool {
self.stages.iter().all(|s| s.name.starts_with(prefix))
}
pub fn any_stage_has_name(&self, name: &str) -> bool {
self.stages.iter().any(|s| s.name == name)
}
pub fn stage_name_at(&self, idx: usize) -> Option<&str> {
self.stages.get(idx).map(|s| s.name.as_str())
}
pub fn all_stage_names_contain(&self, substr: &str) -> bool {
self.stages.iter().all(|s| s.name.contains(substr))
}
}
impl Default for Pipeline {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_retry_policy_rejects_zero_attempts() {
assert!(RetryPolicy::exponential(0, 100).is_err());
}
#[test]
fn test_retry_policy_delay_attempt_1_equals_base() {
let p = RetryPolicy::exponential(3, 100).unwrap();
assert_eq!(p.delay_for(1), Duration::from_millis(100));
}
#[test]
fn test_retry_policy_delay_doubles_each_attempt() {
let p = RetryPolicy::exponential(5, 100).unwrap();
assert_eq!(p.delay_for(2), Duration::from_millis(200));
assert_eq!(p.delay_for(3), Duration::from_millis(400));
assert_eq!(p.delay_for(4), Duration::from_millis(800));
}
#[test]
fn test_retry_policy_delay_capped_at_max() {
let p = RetryPolicy::exponential(10, 10_000).unwrap();
assert_eq!(p.delay_for(10), MAX_RETRY_DELAY);
}
#[test]
fn test_retry_policy_delay_never_exceeds_max_for_any_attempt() {
let p = RetryPolicy::exponential(10, 1000).unwrap();
for attempt in 1..=10 {
assert!(p.delay_for(attempt) <= MAX_RETRY_DELAY);
}
}
#[test]
fn test_retry_policy_first_delay_ms_equals_base_delay() {
let p = RetryPolicy::exponential(3, 200).unwrap();
assert_eq!(p.first_delay_ms(), p.base_delay_ms());
}
#[test]
fn test_retry_policy_first_delay_ms_constant_policy() {
let p = RetryPolicy::constant(4, 150).unwrap();
assert_eq!(p.first_delay_ms(), 150);
}
#[test]
fn test_circuit_breaker_rejects_zero_threshold() {
assert!(CircuitBreaker::new("svc", 0, Duration::from_secs(1)).is_err());
}
#[test]
fn test_circuit_breaker_starts_closed() {
let cb = CircuitBreaker::new("svc", 3, Duration::from_secs(60)).unwrap();
assert_eq!(cb.state().unwrap(), CircuitState::Closed);
}
#[test]
fn test_circuit_breaker_success_keeps_closed() {
let cb = CircuitBreaker::new("svc", 3, Duration::from_secs(60)).unwrap();
let result: Result<i32, AgentRuntimeError> = cb.call(|| Ok::<i32, AgentRuntimeError>(42));
assert!(result.is_ok());
assert_eq!(cb.state().unwrap(), CircuitState::Closed);
}
#[test]
fn test_circuit_breaker_opens_after_threshold_failures() {
let cb = CircuitBreaker::new("svc", 3, Duration::from_secs(60)).unwrap();
for _ in 0..3 {
let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("oops".to_string()));
}
assert!(matches!(cb.state().unwrap(), CircuitState::Open { .. }));
}
#[test]
fn test_circuit_breaker_open_fast_fails() {
let cb = CircuitBreaker::new("svc", 1, Duration::from_secs(3600)).unwrap();
let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("fail".to_string()));
let result: Result<(), AgentRuntimeError> = cb.call(|| Ok::<(), AgentRuntimeError>(()));
assert!(matches!(result, Err(AgentRuntimeError::CircuitOpen { .. })));
}
#[test]
fn test_circuit_breaker_success_resets_failure_count() {
let cb = CircuitBreaker::new("svc", 5, Duration::from_secs(60)).unwrap();
let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("fail".to_string()));
let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("fail".to_string()));
let _: Result<i32, AgentRuntimeError> = cb.call(|| Ok::<i32, AgentRuntimeError>(1));
assert_eq!(cb.failure_count().unwrap(), 0);
}
#[test]
fn test_circuit_breaker_half_open_on_recovery() {
let cb = CircuitBreaker::new("svc", 1, Duration::ZERO).unwrap();
let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("fail".to_string()));
let result: Result<i32, AgentRuntimeError> = cb.call(|| Ok::<i32, AgentRuntimeError>(99));
assert_eq!(result.unwrap_or(0), 99);
assert_eq!(cb.state().unwrap(), CircuitState::Closed);
}
#[test]
fn test_circuit_breaker_with_custom_backend_uses_backend_state() {
let shared_backend: Arc<dyn CircuitBreakerBackend> =
Arc::new(InMemoryCircuitBreakerBackend::new());
let cb1 = CircuitBreaker::new("svc", 2, Duration::from_secs(60))
.unwrap()
.with_backend(Arc::clone(&shared_backend));
let cb2 = CircuitBreaker::new("svc", 2, Duration::from_secs(60))
.unwrap()
.with_backend(Arc::clone(&shared_backend));
let _: Result<(), AgentRuntimeError> = cb1.call(|| Err::<(), _>("fail".to_string()));
assert_eq!(cb2.failure_count().unwrap(), 1);
let _: Result<(), AgentRuntimeError> = cb1.call(|| Err::<(), _>("fail again".to_string()));
assert!(matches!(cb2.state().unwrap(), CircuitState::Open { .. }));
}
#[test]
fn test_in_memory_backend_increments_and_resets() {
use super::CircuitBreakerBackend as CB;
let backend = InMemoryCircuitBreakerBackend::new();
assert_eq!(CB::get_failures(&backend, "svc"), 0);
let count = CB::increment_failures(&backend, "svc");
assert_eq!(count, 1);
let count = CB::increment_failures(&backend, "svc");
assert_eq!(count, 2);
CB::reset_failures(&backend, "svc");
assert_eq!(CB::get_failures(&backend, "svc"), 0);
assert!(CB::get_open_at(&backend, "svc").is_none());
let now = Instant::now();
CB::set_open_at(&backend, "svc", now);
assert!(CB::get_open_at(&backend, "svc").is_some());
CB::clear_open_at(&backend, "svc");
assert!(CB::get_open_at(&backend, "svc").is_none());
}
#[test]
fn test_deduplicator_new_key_is_new() {
let d = Deduplicator::new(Duration::from_secs(60));
let r = d.check_and_register("key-1").unwrap();
assert_eq!(r, DeduplicationResult::New);
}
#[test]
fn test_deduplicator_second_check_is_in_progress() {
let d = Deduplicator::new(Duration::from_secs(60));
d.check_and_register("key-1").unwrap();
let r = d.check_and_register("key-1").unwrap();
assert_eq!(r, DeduplicationResult::InProgress);
}
#[test]
fn test_deduplicator_complete_makes_cached() {
let d = Deduplicator::new(Duration::from_secs(60));
d.check_and_register("key-1").unwrap();
d.complete("key-1", "result-value").unwrap();
let r = d.check_and_register("key-1").unwrap();
assert_eq!(r, DeduplicationResult::Cached("result-value".into()));
}
#[test]
fn test_deduplicator_different_keys_are_independent() {
let d = Deduplicator::new(Duration::from_secs(60));
d.check_and_register("key-a").unwrap();
let r = d.check_and_register("key-b").unwrap();
assert_eq!(r, DeduplicationResult::New);
}
#[test]
fn test_deduplicator_expired_entry_is_new() {
let d = Deduplicator::new(Duration::ZERO); d.check_and_register("key-1").unwrap();
d.complete("key-1", "old").unwrap();
let r = d.check_and_register("key-1").unwrap();
assert_eq!(r, DeduplicationResult::New);
}
#[test]
fn test_backpressure_guard_rejects_zero_capacity() {
assert!(BackpressureGuard::new(0).is_err());
}
#[test]
fn test_backpressure_guard_acquire_within_capacity() {
let g = BackpressureGuard::new(5).unwrap();
assert!(g.try_acquire().is_ok());
assert_eq!(g.depth().unwrap(), 1);
}
#[test]
fn test_backpressure_guard_sheds_when_full() {
let g = BackpressureGuard::new(2).unwrap();
g.try_acquire().unwrap();
g.try_acquire().unwrap();
let result = g.try_acquire();
assert!(matches!(
result,
Err(AgentRuntimeError::BackpressureShed { .. })
));
}
#[test]
fn test_backpressure_guard_release_decrements_depth() {
let g = BackpressureGuard::new(3).unwrap();
g.try_acquire().unwrap();
g.try_acquire().unwrap();
g.release().unwrap();
assert_eq!(g.depth().unwrap(), 1);
}
#[test]
fn test_backpressure_guard_release_on_empty_is_noop() {
let g = BackpressureGuard::new(3).unwrap();
g.release().unwrap(); assert_eq!(g.depth().unwrap(), 0);
}
#[test]
fn test_pipeline_runs_stages_in_order() {
let p = Pipeline::new()
.add_stage("upper", |s| Ok(s.to_uppercase()))
.add_stage("append", |s| Ok(format!("{s}!")));
let result = p.run("hello".into()).unwrap();
assert_eq!(result, "HELLO!");
}
#[test]
fn test_pipeline_empty_pipeline_returns_input() {
let p = Pipeline::new();
assert_eq!(p.run("test".into()).unwrap(), "test");
}
#[test]
fn test_pipeline_stage_failure_short_circuits() {
let p = Pipeline::new()
.add_stage("fail", |_| {
Err(AgentRuntimeError::Orchestration("boom".into()))
})
.add_stage("never", |s| Ok(s));
assert!(p.run("input".into()).is_err());
}
#[test]
fn test_pipeline_stage_count() {
let p = Pipeline::new()
.add_stage("s1", |s| Ok(s))
.add_stage("s2", |s| Ok(s));
assert_eq!(p.stage_count(), 2);
}
#[test]
fn test_pipeline_execute_timed_captures_stage_durations() {
let p = Pipeline::new()
.add_stage("s1", |s| Ok(format!("{s}1")))
.add_stage("s2", |s| Ok(format!("{s}2")));
let result = p.execute_timed("x".to_string()).unwrap();
assert_eq!(result.output, "x12");
assert_eq!(result.stage_timings.len(), 2);
assert_eq!(result.stage_timings[0].0, "s1");
assert_eq!(result.stage_timings[1].0, "s2");
}
#[test]
fn test_backpressure_soft_limit_rejects_invalid_config() {
let g = BackpressureGuard::new(5).unwrap();
assert!(g.with_soft_limit(5).is_err());
let g = BackpressureGuard::new(5).unwrap();
assert!(g.with_soft_limit(6).is_err());
}
#[test]
fn test_backpressure_soft_limit_accepts_requests_below_soft() {
let g = BackpressureGuard::new(5)
.unwrap()
.with_soft_limit(2)
.unwrap();
assert!(g.try_acquire().is_ok());
assert!(g.try_acquire().is_ok());
assert_eq!(g.depth().unwrap(), 2);
}
#[test]
fn test_backpressure_with_soft_limit_still_sheds_at_hard_capacity() {
let g = BackpressureGuard::new(3)
.unwrap()
.with_soft_limit(2)
.unwrap();
g.try_acquire().unwrap();
g.try_acquire().unwrap();
g.try_acquire().unwrap(); let result = g.try_acquire();
assert!(matches!(
result,
Err(AgentRuntimeError::BackpressureShed { .. })
));
}
#[test]
fn test_backpressure_hard_capacity_matches_new() {
let g = BackpressureGuard::new(7).unwrap();
assert_eq!(g.hard_capacity(), 7);
}
#[test]
fn test_pipeline_error_handler_recovers_from_stage_failure() {
let p = Pipeline::new()
.add_stage("fail_stage", |_| {
Err(AgentRuntimeError::Orchestration("oops".into()))
})
.add_stage("append", |s| Ok(format!("{s}-recovered")))
.with_error_handler(|stage_name, _err| format!("recovered_from_{stage_name}"));
let result = p.run("input".to_string()).unwrap();
assert_eq!(result, "recovered_from_fail_stage-recovered");
}
#[test]
fn test_circuit_state_eq() {
assert_eq!(CircuitState::Closed, CircuitState::Closed);
assert_eq!(CircuitState::HalfOpen, CircuitState::HalfOpen);
assert_eq!(
CircuitState::Open { opened_at: std::time::Instant::now() },
CircuitState::Open { opened_at: std::time::Instant::now() }
);
assert_ne!(CircuitState::Closed, CircuitState::HalfOpen);
assert_ne!(CircuitState::Closed, CircuitState::Open { opened_at: std::time::Instant::now() });
}
#[test]
fn test_dedup_many_independent_keys() {
let d = Deduplicator::new(Duration::from_secs(60));
let ttl = Duration::from_secs(60);
let results = d.dedup_many(&[("key-a", ttl), ("key-b", ttl), ("key-c", ttl)]).unwrap();
assert_eq!(results.len(), 3);
assert!(results.iter().all(|r| matches!(r, DeduplicationResult::New)));
}
#[test]
fn test_concurrent_circuit_breaker_opens_under_concurrent_failures() {
use std::sync::Arc;
use std::thread;
let cb = Arc::new(
CircuitBreaker::new("svc", 5, Duration::from_secs(60)).unwrap(),
);
let n_threads = 8;
let failures_per_thread = 2;
let mut handles = Vec::new();
for _ in 0..n_threads {
let cb = Arc::clone(&cb);
handles.push(thread::spawn(move || {
for _ in 0..failures_per_thread {
let _ = cb.call(|| Err::<(), &str>("fail"));
}
}));
}
for h in handles {
h.join().unwrap();
}
let state = cb.state().unwrap();
assert!(
matches!(state, CircuitState::Open { .. }),
"circuit should be open after many concurrent failures; got: {state:?}"
);
}
#[test]
fn test_per_service_tracking_is_independent() {
let backend = Arc::new(InMemoryCircuitBreakerBackend::new());
let cb_a = CircuitBreaker::new("service-a", 3, Duration::from_secs(60))
.unwrap()
.with_backend(Arc::clone(&backend) as Arc<dyn CircuitBreakerBackend>);
let cb_b = CircuitBreaker::new("service-b", 3, Duration::from_secs(60))
.unwrap()
.with_backend(Arc::clone(&backend) as Arc<dyn CircuitBreakerBackend>);
for _ in 0..3 {
let _ = cb_a.call(|| Err::<(), &str>("fail"));
}
let state_b = cb_b.state().unwrap();
assert_eq!(
state_b,
CircuitState::Closed,
"service-b should be unaffected by service-a failures"
);
let state_a = cb_a.state().unwrap();
assert!(
matches!(state_a, CircuitState::Open { .. }),
"service-a should be open"
);
}
#[test]
fn test_backpressure_concurrent_acquires_are_consistent() {
use std::sync::Arc;
use std::thread;
let g = Arc::new(BackpressureGuard::new(100).unwrap());
let mut handles = Vec::new();
for _ in 0..10 {
let g_clone = Arc::clone(&g);
handles.push(thread::spawn(move || {
g_clone.try_acquire().ok();
}));
}
for h in handles {
h.join().unwrap();
}
assert_eq!(g.depth().unwrap(), 10);
}
#[test]
fn test_retry_policy_constant_has_fixed_delay() {
let p = RetryPolicy::constant(3, 100).unwrap();
assert_eq!(p.delay_for(1), Duration::from_millis(100));
assert_eq!(p.delay_for(2), Duration::from_millis(100));
assert_eq!(p.delay_for(10), Duration::from_millis(100));
}
#[test]
fn test_retry_policy_exponential_doubles() {
let p = RetryPolicy::exponential(5, 10).unwrap();
assert_eq!(p.delay_for(1), Duration::from_millis(10));
assert_eq!(p.delay_for(2), Duration::from_millis(20));
assert_eq!(p.delay_for(3), Duration::from_millis(40));
}
#[test]
fn test_retry_policy_with_max_attempts() {
let p = RetryPolicy::constant(3, 50).unwrap();
let p2 = p.with_max_attempts(7).unwrap();
assert_eq!(p2.max_attempts, 7);
assert!(RetryPolicy::constant(1, 50).unwrap().with_max_attempts(0).is_err());
}
#[test]
fn test_circuit_breaker_reset_returns_to_closed() {
let cb = CircuitBreaker::new("svc", 2, Duration::from_secs(60)).unwrap();
cb.record_failure();
cb.record_failure(); assert_ne!(cb.state().unwrap(), CircuitState::Closed);
cb.reset();
assert_eq!(cb.state().unwrap(), CircuitState::Closed);
assert_eq!(cb.failure_count().unwrap(), 0);
}
#[test]
fn test_deduplicator_clear_resets_all_state() {
let d = Deduplicator::new(Duration::from_secs(60));
d.check_and_register("k1").unwrap();
d.check_and_register("k2").unwrap();
d.complete("k1", "r1").unwrap();
assert_eq!(d.in_flight_count().unwrap(), 1);
assert_eq!(d.cached_count().unwrap(), 1);
d.clear().unwrap();
assert_eq!(d.in_flight_count().unwrap(), 0);
assert_eq!(d.cached_count().unwrap(), 0);
}
#[test]
fn test_deduplicator_purge_expired_removes_stale() {
let d = Deduplicator::new(Duration::from_millis(1));
d.check_and_register("x").unwrap();
d.complete("x", "result").unwrap();
std::thread::sleep(Duration::from_millis(5));
let removed = d.purge_expired().unwrap();
assert_eq!(removed, 1);
assert_eq!(d.cached_count().unwrap(), 0);
}
#[test]
fn test_backpressure_utilization_ratio() {
let g = BackpressureGuard::new(4).unwrap();
g.try_acquire().unwrap();
g.try_acquire().unwrap();
let ratio = g.utilization_ratio().unwrap();
assert!((ratio - 0.5).abs() < 1e-5);
}
#[test]
fn test_pipeline_stage_count_and_names() {
let p = Pipeline::new()
.add_stage("first", |s| Ok(s + "1"))
.add_stage("second", |s| Ok(s + "2"));
assert_eq!(p.stage_count(), 2);
assert_eq!(p.stage_names(), vec!["first", "second"]);
}
#[test]
fn test_pipeline_is_empty_true_for_new() {
let p = Pipeline::new();
assert!(p.is_empty());
}
#[test]
fn test_pipeline_is_empty_false_after_add_stage() {
let p = Pipeline::new().add_stage("s", |s: String| Ok(s));
assert!(!p.is_empty());
}
#[test]
fn test_circuit_breaker_service_name() {
let cb = CircuitBreaker::new("my-service", 3, Duration::from_secs(1)).unwrap();
assert_eq!(cb.service_name(), "my-service");
}
#[test]
fn test_retry_policy_none_has_max_one_attempt() {
let p = RetryPolicy::none();
assert_eq!(p.max_attempts, 1);
assert_eq!(p.delay_for(0), Duration::ZERO);
}
#[test]
fn test_backpressure_is_full_false_when_empty() {
let g = BackpressureGuard::new(5).unwrap();
assert!(!g.is_full().unwrap());
}
#[test]
fn test_backpressure_is_full_true_when_at_capacity() {
let g = BackpressureGuard::new(2).unwrap();
g.try_acquire().unwrap();
g.try_acquire().unwrap();
assert!(g.is_full().unwrap());
}
#[test]
fn test_deduplicator_ttl_returns_configured_value() {
let d = Deduplicator::new(Duration::from_secs(42));
assert_eq!(d.ttl(), Duration::from_secs(42));
}
#[test]
fn test_circuit_breaker_is_closed_initially() {
let cb = CircuitBreaker::new("svc", 3, Duration::from_secs(1)).unwrap();
assert!(cb.is_closed());
assert!(!cb.is_open());
assert!(!cb.is_half_open());
}
#[test]
fn test_circuit_breaker_is_open_after_threshold_failures() {
let cb = CircuitBreaker::new("svc", 2, Duration::from_secs(60)).unwrap();
cb.record_failure();
cb.record_failure();
assert!(cb.is_open());
assert!(!cb.is_closed());
}
#[test]
fn test_retry_policy_total_max_delay_constant() {
let p = RetryPolicy::constant(3, 100).unwrap();
assert_eq!(p.total_max_delay_ms(), 300);
}
#[test]
fn test_retry_policy_total_max_delay_none_is_zero() {
let p = RetryPolicy::none();
assert_eq!(p.total_max_delay_ms(), 0);
}
#[test]
fn test_retry_policy_is_none_true_for_none() {
let p = RetryPolicy::none();
assert!(p.is_none());
}
#[test]
fn test_retry_policy_is_none_false_for_exponential() {
let p = RetryPolicy::exponential(3, 10).unwrap();
assert!(!p.is_none());
}
#[test]
fn test_pipeline_has_error_handler_false_by_default() {
let p = Pipeline::new().add_stage("s", |s: String| Ok(s));
assert!(!p.has_error_handler());
}
#[test]
fn test_pipeline_has_error_handler_true_after_set() {
let p = Pipeline::new()
.with_error_handler(|_stage, _err| "recovered".to_string());
assert!(p.has_error_handler());
}
#[test]
fn test_backpressure_reset_clears_depth() {
let g = BackpressureGuard::new(5).unwrap();
g.try_acquire().unwrap();
g.try_acquire().unwrap();
assert_eq!(g.depth().unwrap(), 2);
g.reset();
assert_eq!(g.depth().unwrap(), 0);
}
#[test]
fn test_deduplicator_in_flight_keys_returns_started_keys() {
let d = Deduplicator::new(Duration::from_secs(60));
d.check("key-a", Duration::from_secs(60)).unwrap();
d.check("key-b", Duration::from_secs(60)).unwrap();
let mut keys = d.in_flight_keys().unwrap();
keys.sort();
assert_eq!(keys, vec!["key-a", "key-b"]);
}
#[test]
fn test_retry_policy_with_base_delay_ms_changes_delay() {
let p = RetryPolicy::exponential(3, 100)
.unwrap()
.with_base_delay_ms(200)
.unwrap();
assert_eq!(p.delay_for(1), Duration::from_millis(200));
}
#[test]
fn test_retry_policy_with_base_delay_ms_rejects_zero() {
let p = RetryPolicy::exponential(3, 100).unwrap();
assert!(p.with_base_delay_ms(0).is_err());
}
#[test]
fn test_backpressure_reset_depth_clears_counter() {
let guard = BackpressureGuard::new(5).unwrap();
guard.try_acquire().unwrap();
guard.try_acquire().unwrap();
assert_eq!(guard.depth().unwrap(), 2);
guard.reset_depth().unwrap();
assert_eq!(guard.depth().unwrap(), 0);
}
#[test]
fn test_pipeline_remove_stage_returns_true_if_found() {
let mut p = Pipeline::new()
.add_stage("a", |s| Ok(s))
.add_stage("b", |s| Ok(s));
assert!(p.remove_stage("a"));
assert_eq!(p.stage_count(), 1);
assert_eq!(p.stage_names(), vec!["b"]);
}
#[test]
fn test_pipeline_remove_stage_returns_false_if_missing() {
let mut p = Pipeline::new().add_stage("x", |s| Ok(s));
assert!(!p.remove_stage("nope"));
assert_eq!(p.stage_count(), 1);
}
#[test]
fn test_pipeline_clear_removes_all_stages() {
let mut p = Pipeline::new()
.add_stage("a", |s| Ok(s))
.add_stage("b", |s| Ok(s));
p.clear();
assert!(p.is_empty());
}
#[test]
fn test_circuit_breaker_threshold_accessor() {
let cb = CircuitBreaker::new("svc", 5, Duration::from_secs(30)).unwrap();
assert_eq!(cb.threshold(), 5);
}
#[test]
fn test_circuit_breaker_recovery_window_accessor() {
let window = Duration::from_secs(45);
let cb = CircuitBreaker::new("svc", 3, window).unwrap();
assert_eq!(cb.recovery_window(), window);
}
#[test]
fn test_pipeline_get_stage_name_at_returns_correct_names() {
let p = Pipeline::new()
.add_stage("first", |s| Ok(s))
.add_stage("second", |s| Ok(s));
assert_eq!(p.get_stage_name_at(0), Some("first"));
assert_eq!(p.get_stage_name_at(1), Some("second"));
assert_eq!(p.get_stage_name_at(2), None);
}
#[test]
fn test_retry_policy_can_retry_within_budget() {
let p = RetryPolicy::exponential(3, 100).unwrap();
assert!(p.can_retry(0));
assert!(p.can_retry(1));
assert!(p.can_retry(2));
}
#[test]
fn test_retry_policy_can_retry_false_when_exhausted() {
let p = RetryPolicy::exponential(3, 100).unwrap();
assert!(!p.can_retry(3));
assert!(!p.can_retry(99));
}
#[test]
fn test_retry_policy_none_only_allows_first_attempt() {
let p = RetryPolicy::none();
assert!(p.can_retry(0));
assert!(!p.can_retry(1));
}
#[test]
fn test_retry_policy_max_attempts_accessor() {
let p = RetryPolicy::exponential(7, 100).unwrap();
assert_eq!(p.max_attempts(), 7);
}
#[test]
fn test_pipeline_stage_names_owned_returns_strings() {
let p = Pipeline::new()
.add_stage("alpha", |s| Ok(s))
.add_stage("beta", |s| Ok(s));
let owned = p.stage_names_owned();
assert_eq!(owned, vec!["alpha".to_string(), "beta".to_string()]);
}
#[test]
fn test_pipeline_stage_names_owned_empty_when_no_stages() {
let p = Pipeline::new();
assert!(p.stage_names_owned().is_empty());
}
#[test]
fn test_attempts_remaining_full_at_zero() {
let p = RetryPolicy::exponential(4, 100).unwrap();
assert_eq!(p.attempts_remaining(0), 4);
}
#[test]
fn test_attempts_remaining_decrements_correctly() {
let p = RetryPolicy::exponential(4, 100).unwrap();
assert_eq!(p.attempts_remaining(2), 2);
assert_eq!(p.attempts_remaining(4), 0);
}
#[test]
fn test_attempts_remaining_zero_when_exhausted() {
let p = RetryPolicy::exponential(3, 100).unwrap();
assert_eq!(p.attempts_remaining(10), 0);
}
#[test]
fn test_retry_policy_max_attempts_getter() {
let p = RetryPolicy::exponential(7, 50).unwrap();
assert_eq!(p.max_attempts(), 7);
}
#[test]
fn test_circuit_breaker_failure_count_increments() {
let cb = CircuitBreaker::new("svc2", 3, std::time::Duration::from_secs(60)).unwrap();
cb.record_failure();
cb.record_failure();
assert_eq!(cb.failure_count().unwrap(), 2);
}
#[test]
fn test_circuit_breaker_record_success_resets_failures() {
let cb = CircuitBreaker::new("svc3", 5, std::time::Duration::from_secs(60)).unwrap();
cb.record_failure();
cb.record_failure();
cb.record_success();
assert_eq!(cb.failure_count().unwrap(), 0);
assert!(cb.is_closed());
}
#[test]
fn test_circuit_breaker_threshold_and_recovery_window() {
let cb = CircuitBreaker::new("svc4", 3, std::time::Duration::from_secs(30)).unwrap();
assert_eq!(cb.threshold(), 3);
assert_eq!(cb.recovery_window(), std::time::Duration::from_secs(30));
}
#[test]
fn test_circuit_breaker_reset_clears_state() {
let cb = CircuitBreaker::new("svc5", 2, std::time::Duration::from_secs(60)).unwrap();
cb.record_failure();
cb.record_failure(); assert!(cb.is_open());
cb.reset();
assert!(cb.is_closed());
assert_eq!(cb.failure_count().unwrap(), 0);
}
#[test]
fn test_deduplicator_cached_count_after_complete() {
let d = Deduplicator::new(Duration::from_secs(60));
d.check("key1", Duration::from_secs(60)).unwrap();
d.complete("key1", "result").unwrap();
assert_eq!(d.cached_count().unwrap(), 1);
}
#[test]
fn test_deduplicator_ttl_matches_configured() {
let d = Deduplicator::new(Duration::from_secs(42));
assert_eq!(d.ttl(), Duration::from_secs(42));
}
#[test]
fn test_deduplicator_purge_expired_removes_stale_entries() {
let d = Deduplicator::new(Duration::ZERO); d.check("stale", Duration::ZERO).unwrap();
d.complete("stale", "val").unwrap();
std::thread::sleep(std::time::Duration::from_millis(1));
let removed = d.purge_expired().unwrap();
assert!(removed >= 1);
}
#[test]
fn test_backpressure_remaining_capacity() {
let g = BackpressureGuard::new(5).unwrap();
g.try_acquire().unwrap();
assert_eq!(g.remaining_capacity().unwrap(), 4);
}
#[test]
fn test_backpressure_soft_depth_ratio_without_soft_limit() {
let g = BackpressureGuard::new(5).unwrap();
assert_eq!(g.soft_depth_ratio(), 0.0);
}
#[test]
fn test_backpressure_soft_depth_ratio_with_soft_limit() {
let g = BackpressureGuard::new(10).unwrap()
.with_soft_limit(4).unwrap();
g.try_acquire().unwrap();
g.try_acquire().unwrap();
let ratio = g.soft_depth_ratio();
assert!((ratio - 0.5).abs() < 1e-6);
}
#[test]
fn test_retry_delay_ms_for_matches_delay_for() {
let p = RetryPolicy::exponential(5, 100).unwrap();
assert_eq!(p.delay_ms_for(1), p.delay_for(1).as_millis() as u64);
assert_eq!(p.delay_ms_for(3), p.delay_for(3).as_millis() as u64);
}
#[test]
fn test_backpressure_soft_limit_returns_configured_value() {
let g = BackpressureGuard::new(10).unwrap()
.with_soft_limit(5).unwrap();
assert_eq!(g.soft_limit(), Some(5));
}
#[test]
fn test_backpressure_soft_limit_none_when_not_set() {
let g = BackpressureGuard::new(10).unwrap();
assert_eq!(g.soft_limit(), None);
}
#[test]
fn test_pipeline_has_stage_returns_true_when_present() {
let p = Pipeline::new().add_stage("step1", |s| Ok(s));
assert!(p.has_stage("step1"));
assert!(!p.has_stage("step2"));
}
#[test]
fn test_pipeline_has_stage_false_for_empty_pipeline() {
let p = Pipeline::new();
assert!(!p.has_stage("anything"));
}
#[test]
fn test_deduplicator_max_entries_none_by_default() {
let d = Deduplicator::new(Duration::from_secs(60));
assert_eq!(d.max_entries(), None);
}
#[test]
fn test_deduplicator_max_entries_set_via_builder() {
let d = Deduplicator::new(Duration::from_secs(60))
.with_max_entries(50)
.unwrap();
assert_eq!(d.max_entries(), Some(50));
}
#[test]
fn test_retry_policy_delay_for_exponential_grows() {
let p = RetryPolicy::exponential(5, 100).unwrap();
let d1 = p.delay_for(1);
let d2 = p.delay_for(2);
assert!(d2 > d1, "exponential delay should grow: attempt 2 > attempt 1");
}
#[test]
fn test_retry_policy_delay_for_constant_stays_same() {
let p = RetryPolicy::constant(5, 200).unwrap();
assert_eq!(p.delay_for(0), p.delay_for(1));
assert_eq!(p.delay_for(1), p.delay_for(3));
}
#[test]
fn test_is_no_retry_true_for_none_policy() {
let p = RetryPolicy::none();
assert!(p.is_no_retry());
}
#[test]
fn test_is_no_retry_false_for_exponential_policy() {
let p = RetryPolicy::exponential(3, 50).unwrap();
assert!(!p.is_no_retry());
}
#[test]
fn test_is_no_retry_false_for_constant_policy_with_multiple_attempts() {
let p = RetryPolicy::constant(2, 100).unwrap();
assert!(!p.is_no_retry());
}
#[test]
fn test_is_exponential_true_for_exponential_policy() {
let p = RetryPolicy::exponential(3, 50).unwrap();
assert!(p.is_exponential());
}
#[test]
fn test_is_exponential_false_for_constant_policy() {
let p = RetryPolicy::constant(3, 50).unwrap();
assert!(!p.is_exponential());
}
#[test]
fn test_is_exponential_false_for_none_policy() {
let p = RetryPolicy::none();
assert!(!p.is_exponential());
}
#[test]
fn test_is_soft_limited_false_without_soft_limit() {
let g = BackpressureGuard::new(10).unwrap();
assert!(!g.is_soft_limited());
}
#[test]
fn test_is_soft_limited_true_when_soft_limit_set() {
let g = BackpressureGuard::new(10)
.unwrap()
.with_soft_limit(5)
.unwrap();
assert!(g.is_soft_limited());
}
#[test]
fn test_retry_policy_base_delay_ms_exponential() {
let p = RetryPolicy::exponential(3, 250).unwrap();
assert_eq!(p.base_delay_ms(), 250);
}
#[test]
fn test_retry_policy_base_delay_ms_constant() {
let p = RetryPolicy::constant(5, 100).unwrap();
assert_eq!(p.base_delay_ms(), 100);
}
#[test]
fn test_retry_policy_base_delay_ms_none_is_zero() {
let p = RetryPolicy::none();
assert_eq!(p.base_delay_ms(), 0);
}
#[test]
fn test_backpressure_percent_full_zero_when_empty() {
let g = BackpressureGuard::new(100).unwrap();
let pct = g.percent_full().unwrap();
assert!((pct - 0.0).abs() < 1e-9);
}
#[test]
fn test_backpressure_percent_full_capped_at_100() {
let g = BackpressureGuard::new(10).unwrap();
for _ in 0..10 {
g.try_acquire().unwrap();
}
let pct = g.percent_full().unwrap();
assert!((pct - 100.0).abs() < 1e-9);
}
#[test]
fn test_deduplicator_get_result_returns_cached_value() {
let d = Deduplicator::new(std::time::Duration::from_secs(60));
d.check_and_register("req-1").unwrap();
d.complete("req-1", "the answer").unwrap();
let result = d.get_result("req-1").unwrap();
assert_eq!(result, Some("the answer".to_string()));
}
#[test]
fn test_deduplicator_get_result_missing_key_returns_none() {
let d = Deduplicator::new(std::time::Duration::from_secs(60));
assert_eq!(d.get_result("ghost").unwrap(), None);
}
#[test]
fn test_pipeline_rename_stage_succeeds() {
let mut p = Pipeline::new().add_stage("old-name", |s: String| Ok(s));
let renamed = p.rename_stage("old-name", "new-name");
assert!(renamed);
assert!(p.has_stage("new-name"));
assert!(!p.has_stage("old-name"));
}
#[test]
fn test_pipeline_rename_stage_missing_returns_false() {
let mut p = Pipeline::new();
assert!(!p.rename_stage("nonexistent", "anything"));
}
#[test]
fn test_circuit_breaker_failure_rate_zero_initially() {
let cb = CircuitBreaker::new("svc", 5, std::time::Duration::from_secs(10)).unwrap();
assert!((cb.failure_rate() - 0.0).abs() < 1e-9);
}
#[test]
fn test_circuit_breaker_failure_rate_increases_with_failures() {
let cb = CircuitBreaker::new("svc-fr", 4, std::time::Duration::from_secs(10)).unwrap();
cb.record_failure();
cb.record_failure();
assert!((cb.failure_rate() - 0.5).abs() < 1e-9);
}
#[test]
fn test_prepend_stage_inserts_at_front() {
let p = Pipeline::new()
.add_stage("second", |s| Ok(s))
.prepend_stage("first", |s| Ok(s));
let names = p.stage_names_owned();
assert_eq!(names[0], "first");
assert_eq!(names[1], "second");
}
#[test]
fn test_prepend_stage_executes_before_existing_stages() {
let p = Pipeline::new()
.add_stage("append", |s| Ok(format!("{s}_appended")))
.prepend_stage("prefix", |s| Ok(format!("pre_{s}")));
let result = p.run("input".to_string()).unwrap();
assert_eq!(result, "pre_input_appended");
}
#[test]
fn test_prepend_stage_on_empty_pipeline() {
let p = Pipeline::new().prepend_stage("only", |s| Ok(s.to_uppercase()));
let result = p.run("hello".to_string()).unwrap();
assert_eq!(result, "HELLO");
}
#[test]
fn test_circuit_breaker_is_at_threshold_false_initially() {
let cb = CircuitBreaker::new("svc", 3, std::time::Duration::from_secs(10)).unwrap();
assert!(!cb.is_at_threshold());
}
#[test]
fn test_circuit_breaker_is_at_threshold_true_when_failures_reach_threshold() {
let cb = CircuitBreaker::new("svc-t", 2, std::time::Duration::from_secs(10)).unwrap();
cb.record_failure();
assert!(!cb.is_at_threshold());
cb.record_failure();
assert!(cb.is_at_threshold());
}
#[test]
fn test_backpressure_headroom_ratio_one_when_empty() {
let g = BackpressureGuard::new(10).unwrap();
let ratio = g.headroom_ratio().unwrap();
assert!((ratio - 1.0).abs() < 1e-9);
}
#[test]
fn test_backpressure_headroom_ratio_decreases_on_acquire() {
let g = BackpressureGuard::new(4).unwrap();
g.try_acquire().unwrap(); let ratio = g.headroom_ratio().unwrap();
assert!((ratio - 0.75).abs() < 1e-9);
}
#[test]
fn test_pipeline_first_stage_name_returns_first() {
let p = Pipeline::new()
.add_stage("alpha", |s| Ok(s))
.add_stage("beta", |s| Ok(s));
assert_eq!(p.first_stage_name(), Some("alpha"));
}
#[test]
fn test_pipeline_first_stage_name_none_when_empty() {
let p = Pipeline::new();
assert!(p.first_stage_name().is_none());
}
#[test]
fn test_pipeline_last_stage_name_returns_last() {
let p = Pipeline::new()
.add_stage("alpha", |s| Ok(s))
.add_stage("omega", |s| Ok(s));
assert_eq!(p.last_stage_name(), Some("omega"));
}
#[test]
fn test_pipeline_stage_index_returns_correct_position() {
let p = Pipeline::new()
.add_stage("first", |s| Ok(s))
.add_stage("second", |s| Ok(s))
.add_stage("third", |s| Ok(s));
assert_eq!(p.stage_index("first"), Some(0));
assert_eq!(p.stage_index("second"), Some(1));
assert_eq!(p.stage_index("third"), Some(2));
assert_eq!(p.stage_index("missing"), None);
}
#[test]
fn test_backpressure_is_empty_true_when_no_slots_acquired() {
let g = BackpressureGuard::new(10).unwrap();
assert!(g.is_empty().unwrap());
}
#[test]
fn test_backpressure_is_empty_false_after_acquire() {
let g = BackpressureGuard::new(10).unwrap();
g.try_acquire().unwrap();
assert!(!g.is_empty().unwrap());
}
#[test]
fn test_backpressure_available_capacity_decrements_on_acquire() {
let g = BackpressureGuard::new(5).unwrap();
assert_eq!(g.available_capacity().unwrap(), 5);
g.try_acquire().unwrap();
assert_eq!(g.available_capacity().unwrap(), 4);
}
#[test]
fn test_evict_oldest_removes_first_cached_entry() {
let d = Deduplicator::new(std::time::Duration::from_secs(60));
d.check_and_register("alpha").unwrap();
d.check_and_register("beta").unwrap();
d.complete("alpha", "result_a").unwrap();
d.complete("beta", "result_b").unwrap();
let removed = d.evict_oldest().unwrap();
assert!(removed);
assert!(d.get_result("alpha").unwrap().is_none());
assert!(d.get_result("beta").unwrap().is_some());
}
#[test]
fn test_evict_oldest_returns_false_when_empty() {
let d = Deduplicator::new(std::time::Duration::from_secs(60));
assert!(!d.evict_oldest().unwrap());
}
#[test]
fn test_circuit_breaker_is_at_threshold_true_after_three_failures() {
let cb = CircuitBreaker::new("svc-3", 3, std::time::Duration::from_secs(60)).unwrap();
cb.record_failure();
cb.record_failure();
cb.record_failure();
assert!(cb.is_at_threshold());
}
#[test]
fn test_failures_until_open_equals_threshold_initially() {
let cb = CircuitBreaker::new("svc-fuo", 5, std::time::Duration::from_secs(60)).unwrap();
assert_eq!(cb.failures_until_open(), 5);
}
#[test]
fn test_failures_until_open_decrements_with_each_failure() {
let cb = CircuitBreaker::new("svc-fuo2", 4, std::time::Duration::from_secs(60)).unwrap();
cb.record_failure();
assert_eq!(cb.failures_until_open(), 3);
cb.record_failure();
assert_eq!(cb.failures_until_open(), 2);
}
#[test]
fn test_failures_until_open_zero_when_at_threshold() {
let cb = CircuitBreaker::new("svc-fuo3", 2, std::time::Duration::from_secs(60)).unwrap();
cb.record_failure();
cb.record_failure();
assert_eq!(cb.failures_until_open(), 0);
}
#[test]
fn test_deduplicator_cached_keys_empty_initially() {
let d = Deduplicator::new(Duration::from_secs(60));
assert!(d.cached_keys().unwrap().is_empty());
}
#[test]
fn test_deduplicator_cached_keys_contains_completed_key() {
let d = Deduplicator::new(Duration::from_secs(60));
d.check_and_register("ck-key").unwrap();
d.complete("ck-key", "result").unwrap();
let keys = d.cached_keys().unwrap();
assert!(keys.contains(&"ck-key".to_string()));
}
#[test]
fn test_deduplicator_cached_keys_excludes_in_flight() {
let d = Deduplicator::new(Duration::from_secs(60));
d.check_and_register("pending-key").unwrap();
assert!(!d.cached_keys().unwrap().contains(&"pending-key".to_string()));
}
#[test]
fn test_deduplicator_cached_keys_multiple_entries() {
let d = Deduplicator::new(Duration::from_secs(60));
for k in ["alpha", "beta", "gamma"] {
d.check_and_register(k).unwrap();
d.complete(k, "v").unwrap();
}
let keys = d.cached_keys().unwrap();
assert_eq!(keys.len(), 3);
}
#[test]
fn test_retry_policy_is_constant_true_for_constant() {
let p = RetryPolicy::constant(3, 100).unwrap();
assert!(p.is_constant());
assert!(!p.is_exponential());
}
#[test]
fn test_retry_policy_is_constant_false_for_exponential() {
let p = RetryPolicy::exponential(3, 100).unwrap();
assert!(!p.is_constant());
}
#[test]
fn test_retry_policy_total_max_delay_ms_constant() {
let p = RetryPolicy::constant(3, 100).unwrap();
assert_eq!(p.total_max_delay_ms(), 300);
}
#[test]
fn test_retry_policy_total_max_delay_ms_exponential() {
let p = RetryPolicy::exponential(3, 100).unwrap();
let total = p.total_max_delay_ms();
assert!(total >= 300); }
#[test]
fn test_circuit_breaker_is_healthy_true_when_closed() {
let cb = CircuitBreaker::new("svc-ih1", 3, Duration::from_secs(60)).unwrap();
assert!(cb.is_healthy());
}
#[test]
fn test_circuit_breaker_is_healthy_false_when_open() {
let cb = CircuitBreaker::new("svc-ih2", 1, Duration::from_secs(60)).unwrap();
let _: Result<(), _> = cb.call(|| Err::<(), _>("fail".to_string()));
assert!(!cb.is_healthy());
}
#[test]
fn test_circuit_breaker_is_half_open_after_zero_recovery() {
let cb = CircuitBreaker::new("svc-ho1", 1, Duration::ZERO).unwrap();
let _: Result<(), _> = cb.call(|| Err::<(), _>("fail".to_string()));
assert!(cb.is_half_open() || cb.is_healthy()); }
#[test]
fn test_deduplicator_is_idle_true_when_empty() {
let d = Deduplicator::new(Duration::from_secs(60));
assert!(d.is_idle().unwrap());
}
#[test]
fn test_deduplicator_is_idle_false_when_in_flight() {
let d = Deduplicator::new(Duration::from_secs(60));
d.check_and_register("req-x").unwrap();
assert!(!d.is_idle().unwrap());
}
#[test]
fn test_deduplicator_is_idle_true_after_complete() {
let d = Deduplicator::new(Duration::from_secs(60));
d.check_and_register("req-y").unwrap();
d.complete("req-y", "done").unwrap();
assert!(d.is_idle().unwrap());
}
#[test]
fn test_deduplicator_in_flight_count_zero_initially() {
let d = Deduplicator::new(Duration::from_secs(60));
assert_eq!(d.in_flight_count().unwrap(), 0);
}
#[test]
fn test_deduplicator_in_flight_count_increments_on_register() {
let d = Deduplicator::new(Duration::from_secs(60));
d.check_and_register("k1").unwrap();
d.check_and_register("k2").unwrap();
assert_eq!(d.in_flight_count().unwrap(), 2);
}
#[test]
fn test_deduplicator_in_flight_count_decrements_after_complete() {
let d = Deduplicator::new(Duration::from_secs(60));
d.check_and_register("k1").unwrap();
d.complete("k1", "result").unwrap();
assert_eq!(d.in_flight_count().unwrap(), 0);
}
#[test]
fn test_deduplicator_total_count_sums_in_flight_and_cached() {
let d = Deduplicator::new(Duration::from_secs(60));
d.check_and_register("k1").unwrap(); d.check_and_register("k2").unwrap(); d.complete("k1", "done").unwrap(); assert_eq!(d.total_count().unwrap(), 2);
}
#[test]
fn test_deduplicator_total_count_zero_when_empty() {
let d = Deduplicator::new(Duration::from_secs(60));
assert_eq!(d.total_count().unwrap(), 0);
}
#[test]
fn test_backpressure_acquired_count_zero_initially() {
let g = BackpressureGuard::new(5).unwrap();
assert_eq!(g.acquired_count().unwrap(), 0);
}
#[test]
fn test_backpressure_acquired_count_increments_on_acquire() {
let g = BackpressureGuard::new(5).unwrap();
g.try_acquire().unwrap();
g.try_acquire().unwrap();
assert_eq!(g.acquired_count().unwrap(), 2);
}
#[test]
fn test_pipeline_swap_stages_swaps_positions() {
let mut p = Pipeline::new()
.add_stage("a", |s| Ok(s + "A"))
.add_stage("b", |s| Ok(s + "B"));
let swapped = p.swap_stages("a", "b");
assert!(swapped);
assert_eq!(p.first_stage_name().unwrap(), "b");
assert_eq!(p.last_stage_name().unwrap(), "a");
}
#[test]
fn test_pipeline_swap_stages_returns_false_for_unknown_stage() {
let mut p = Pipeline::new().add_stage("a", |s| Ok(s));
assert!(!p.swap_stages("a", "missing"));
}
#[test]
fn test_retry_policy_will_retry_at_all_false_for_none() {
let p = RetryPolicy::none();
assert!(!p.will_retry_at_all());
}
#[test]
fn test_retry_policy_will_retry_at_all_true_for_exponential() {
let p = RetryPolicy::exponential(3, 100).unwrap();
assert!(p.will_retry_at_all());
}
#[test]
fn test_deduplicator_fail_removes_in_flight_key() {
let d = Deduplicator::new(Duration::from_secs(60));
d.check_and_register("failing-req").unwrap();
assert!(!d.is_idle().unwrap());
d.fail("failing-req").unwrap();
assert!(d.is_idle().unwrap());
}
#[test]
fn test_deduplicator_fail_on_unknown_key_is_noop() {
let d = Deduplicator::new(Duration::from_secs(60));
assert!(d.fail("nonexistent").is_ok());
}
#[test]
fn test_deduplicator_fail_allows_reregistration() {
let d = Deduplicator::new(Duration::from_secs(60));
d.check_and_register("retry-key").unwrap();
d.fail("retry-key").unwrap();
let result = d.check_and_register("retry-key").unwrap();
assert_eq!(result, DeduplicationResult::New);
}
#[test]
fn test_retry_policy_max_total_delay_ms_constant_policy() {
let p = RetryPolicy::constant(3, 100).unwrap();
assert_eq!(p.max_total_delay_ms(), 300);
}
#[test]
fn test_retry_policy_max_total_delay_ms_single_attempt() {
let p = RetryPolicy::constant(1, 50).unwrap();
assert_eq!(p.max_total_delay_ms(), 50);
}
#[test]
fn test_retry_policy_is_last_attempt_true_at_max() {
let p = RetryPolicy::exponential(3, 100).unwrap();
assert!(p.is_last_attempt(3));
}
#[test]
fn test_retry_policy_is_last_attempt_false_before_max() {
let p = RetryPolicy::exponential(3, 100).unwrap();
assert!(!p.is_last_attempt(2));
}
#[test]
fn test_retry_policy_is_last_attempt_true_beyond_max() {
let p = RetryPolicy::exponential(3, 100).unwrap();
assert!(p.is_last_attempt(4));
}
#[test]
fn test_retry_policy_delay_sum_ms_constant_two_attempts() {
let p = RetryPolicy::constant(5, 100).unwrap();
assert_eq!(p.delay_sum_ms(2), 200);
}
#[test]
fn test_retry_policy_delay_sum_ms_capped_at_max_attempts() {
let p = RetryPolicy::constant(2, 50).unwrap();
assert_eq!(p.delay_sum_ms(10), 100);
}
#[test]
fn test_retry_policy_avg_delay_ms_constant() {
let p = RetryPolicy::constant(4, 100).unwrap();
assert_eq!(p.avg_delay_ms(), 100);
}
#[test]
fn test_retry_policy_avg_delay_ms_single_attempt_policy() {
let p = RetryPolicy::none();
assert_eq!(p.avg_delay_ms(), 0);
}
#[test]
fn test_backoff_factor_exponential_returns_two() {
let p = RetryPolicy::exponential(3, 100).unwrap();
assert!((p.backoff_factor() - 2.0).abs() < 1e-9);
}
#[test]
fn test_backoff_factor_constant_returns_one() {
let p = RetryPolicy::constant(3, 100).unwrap();
assert!((p.backoff_factor() - 1.0).abs() < 1e-9);
}
#[test]
fn test_pipeline_count_stages_matching_counts_by_keyword() {
let p = Pipeline::new()
.add_stage("normalize-text", |s| Ok(s))
.add_stage("text-trim", |s| Ok(s))
.add_stage("embed", |s| Ok(s));
assert_eq!(p.count_stages_matching("text"), 2);
assert_eq!(p.count_stages_matching("embed"), 1);
assert_eq!(p.count_stages_matching("missing"), 0);
}
#[test]
fn test_pipeline_count_stages_matching_case_insensitive() {
let p = Pipeline::new().add_stage("TEXT-CLEAN", |s| Ok(s));
assert_eq!(p.count_stages_matching("text"), 1);
}
#[test]
fn test_backpressure_guard_over_soft_limit_true_when_exceeded() {
let guard = BackpressureGuard::new(10)
.unwrap()
.with_soft_limit(1)
.unwrap();
guard.try_acquire().unwrap();
guard.try_acquire().unwrap();
assert!(guard.over_soft_limit().unwrap());
}
#[test]
fn test_backpressure_guard_over_soft_limit_false_when_no_soft_limit() {
let guard = BackpressureGuard::new(10).unwrap();
guard.try_acquire().unwrap();
assert!(!guard.over_soft_limit().unwrap());
}
#[test]
fn test_pipeline_description_empty() {
let p = Pipeline::new();
assert_eq!(p.description(), "Pipeline[empty]");
}
#[test]
fn test_pipeline_description_single_stage() {
let p = Pipeline::new().add_stage("trim", |s: String| Ok(s.trim().to_owned()));
assert_eq!(p.description(), "Pipeline[1 stage: trim]");
}
#[test]
fn test_pipeline_description_multiple_stages() {
let p = Pipeline::new()
.add_stage("a", |s: String| Ok(s))
.add_stage("b", |s: String| Ok(s))
.add_stage("c", |s: String| Ok(s));
let desc = p.description();
assert!(desc.contains("3 stages"));
assert!(desc.contains("a → b → c"));
}
#[test]
fn test_pipeline_has_unique_stage_names_true_when_all_unique() {
let p = Pipeline::new()
.add_stage("x", |s: String| Ok(s))
.add_stage("y", |s: String| Ok(s));
assert!(p.has_unique_stage_names());
}
#[test]
fn test_pipeline_has_unique_stage_names_false_when_duplicate() {
let p = Pipeline::new()
.add_stage("dup", |s: String| Ok(s))
.add_stage("dup", |s: String| Ok(s));
assert!(!p.has_unique_stage_names());
}
#[test]
fn test_pipeline_has_unique_stage_names_true_for_empty_pipeline() {
let p = Pipeline::new();
assert!(p.has_unique_stage_names());
}
#[test]
fn test_pipeline_stage_name_lengths_returns_byte_lengths_in_order() {
let p = Pipeline::new()
.add_stage("ab", |s: String| Ok(s))
.add_stage("cdef", |s: String| Ok(s));
assert_eq!(p.stage_name_lengths(), vec![2, 4]);
}
#[test]
fn test_pipeline_stage_name_lengths_empty_pipeline_returns_empty() {
let p = Pipeline::new();
assert!(p.stage_name_lengths().is_empty());
}
#[test]
fn test_pipeline_avg_stage_name_length_computed_correctly() {
let p = Pipeline::new()
.add_stage("ab", |s: String| Ok(s)) .add_stage("abcd", |s: String| Ok(s)); assert!((p.avg_stage_name_length() - 3.0).abs() < 1e-9);
}
#[test]
fn test_pipeline_avg_stage_name_length_zero_for_empty() {
assert_eq!(Pipeline::new().avg_stage_name_length(), 0.0);
}
#[test]
fn test_pipeline_stages_containing_returns_matching_names() {
let p = Pipeline::new()
.add_stage("tokenize", |s: String| Ok(s))
.add_stage("encode", |s: String| Ok(s))
.add_stage("token-validate", |s: String| Ok(s));
let result = p.stages_containing("token");
assert_eq!(result.len(), 2);
assert!(result.contains(&"tokenize"));
assert!(result.contains(&"token-validate"));
}
#[test]
fn test_pipeline_stages_containing_returns_empty_when_no_match() {
let p = Pipeline::new().add_stage("process", |s: String| Ok(s));
assert!(p.stages_containing("xyz").is_empty());
}
#[test]
fn test_pipeline_stage_is_first_returns_true_for_first_stage() {
let p = Pipeline::new()
.add_stage("first", |s: String| Ok(s))
.add_stage("second", |s: String| Ok(s));
assert!(p.stage_is_first("first"));
assert!(!p.stage_is_first("second"));
}
#[test]
fn test_pipeline_stage_is_last_returns_true_for_last_stage() {
let p = Pipeline::new()
.add_stage("first", |s: String| Ok(s))
.add_stage("last", |s: String| Ok(s));
assert!(p.stage_is_last("last"));
assert!(!p.stage_is_last("first"));
}
#[test]
fn test_stage_names_sorted_returns_alphabetical_order() {
let p = Pipeline::new()
.add_stage("zebra", |s: String| Ok(s))
.add_stage("alpha", |s: String| Ok(s))
.add_stage("mango", |s: String| Ok(s));
assert_eq!(p.stage_names_sorted(), vec!["alpha", "mango", "zebra"]);
}
#[test]
fn test_stage_names_sorted_empty_pipeline_returns_empty() {
let p = Pipeline::new();
assert!(p.stage_names_sorted().is_empty());
}
#[test]
fn test_longest_stage_name_returns_longest() {
let p = Pipeline::new()
.add_stage("ab", |s: String| Ok(s))
.add_stage("abcde", |s: String| Ok(s))
.add_stage("abc", |s: String| Ok(s));
assert_eq!(p.longest_stage_name(), Some("abcde"));
}
#[test]
fn test_longest_stage_name_empty_pipeline_returns_none() {
let p = Pipeline::new();
assert_eq!(p.longest_stage_name(), None);
}
#[test]
fn test_shortest_stage_name_returns_shortest() {
let p = Pipeline::new()
.add_stage("ab", |s: String| Ok(s))
.add_stage("abcde", |s: String| Ok(s))
.add_stage("a", |s: String| Ok(s));
assert_eq!(p.shortest_stage_name(), Some("a"));
}
#[test]
fn test_shortest_stage_name_empty_pipeline_returns_none() {
let p = Pipeline::new();
assert_eq!(p.shortest_stage_name(), None);
}
#[test]
fn test_circuit_state_display_closed() {
let s = CircuitState::Closed;
assert_eq!(s.to_string(), "Closed");
}
#[test]
fn test_circuit_state_display_open() {
let s = CircuitState::Open { opened_at: std::time::Instant::now() };
assert_eq!(s.to_string(), "Open");
}
#[test]
fn test_circuit_state_display_half_open() {
let s = CircuitState::HalfOpen;
assert_eq!(s.to_string(), "HalfOpen");
}
#[test]
fn test_retry_policy_display_exponential() {
let p = RetryPolicy::exponential(3, 100).unwrap();
let s = p.to_string();
assert!(s.contains("Exponential"));
assert!(s.contains('3'));
assert!(s.contains("100ms"));
}
#[test]
fn test_retry_policy_display_constant() {
let p = RetryPolicy::constant(5, 50).unwrap();
let s = p.to_string();
assert!(s.contains("Constant"));
assert!(s.contains('5'));
assert!(s.contains("50ms"));
}
#[test]
fn test_pipeline_total_stage_name_bytes_sums_correctly() {
let p = Pipeline::new()
.add_stage("ab", |s: String| Ok(s)) .add_stage("xyz", |s: String| Ok(s)); assert_eq!(p.total_stage_name_bytes(), 5);
}
#[test]
fn test_pipeline_total_stage_name_bytes_zero_for_empty() {
assert_eq!(Pipeline::new().total_stage_name_bytes(), 0);
}
#[test]
fn test_pipeline_stages_before_returns_preceding_names() {
let p = Pipeline::new()
.add_stage("a", |s: String| Ok(s))
.add_stage("b", |s: String| Ok(s))
.add_stage("c", |s: String| Ok(s));
assert_eq!(p.stages_before("c"), vec!["a", "b"]);
assert!(p.stages_before("a").is_empty());
}
#[test]
fn test_pipeline_stages_before_returns_empty_for_unknown_stage() {
let p = Pipeline::new().add_stage("a", |s: String| Ok(s));
assert!(p.stages_before("missing").is_empty());
}
#[test]
fn test_stages_after_returns_stages_following_name() {
let p = Pipeline::new()
.add_stage("a", |s: String| Ok(s))
.add_stage("b", |s: String| Ok(s))
.add_stage("c", |s: String| Ok(s));
assert_eq!(p.stages_after("a"), vec!["b", "c"]);
}
#[test]
fn test_stages_after_last_stage_returns_empty() {
let p = Pipeline::new()
.add_stage("a", |s: String| Ok(s))
.add_stage("b", |s: String| Ok(s));
assert!(p.stages_after("b").is_empty());
}
#[test]
fn test_stages_after_unknown_name_returns_empty() {
let p = Pipeline::new().add_stage("a", |s: String| Ok(s));
assert!(p.stages_after("missing").is_empty());
}
#[test]
fn test_stage_position_from_end_last_is_zero() {
let p = Pipeline::new()
.add_stage("x", |s: String| Ok(s))
.add_stage("y", |s: String| Ok(s))
.add_stage("z", |s: String| Ok(s));
assert_eq!(p.stage_position_from_end("z"), Some(0));
assert_eq!(p.stage_position_from_end("x"), Some(2));
}
#[test]
fn test_stage_position_from_end_unknown_returns_none() {
let p = Pipeline::new().add_stage("a", |s: String| Ok(s));
assert_eq!(p.stage_position_from_end("missing"), None);
}
#[test]
fn test_contains_all_stages_true_when_all_present() {
let p = Pipeline::new()
.add_stage("a", |s: String| Ok(s))
.add_stage("b", |s: String| Ok(s));
assert!(p.contains_all_stages(&["a", "b"]));
}
#[test]
fn test_contains_all_stages_false_when_one_missing() {
let p = Pipeline::new().add_stage("a", |s: String| Ok(s));
assert!(!p.contains_all_stages(&["a", "b"]));
}
#[test]
fn test_contains_all_stages_true_for_empty_names() {
let p = Pipeline::new();
assert!(p.contains_all_stages(&[]));
}
#[test]
fn test_stage_count_above_name_len_counts_longer_names() {
let p = Pipeline::new()
.add_stage("ab", |s: String| Ok(s))
.add_stage("abcde", |s: String| Ok(s))
.add_stage("xyz", |s: String| Ok(s));
assert_eq!(p.stage_count_above_name_len(2), 2); }
#[test]
fn test_stage_count_above_name_len_zero_when_none_exceed() {
let p = Pipeline::new()
.add_stage("a", |s: String| Ok(s))
.add_stage("b", |s: String| Ok(s));
assert_eq!(p.stage_count_above_name_len(5), 0);
}
#[test]
fn test_stage_pairs_returns_consecutive_pairs() {
let p = Pipeline::new()
.add_stage("a", |s: String| Ok(s))
.add_stage("b", |s: String| Ok(s))
.add_stage("c", |s: String| Ok(s));
assert_eq!(p.stage_pairs(), vec![("a", "b"), ("b", "c")]);
}
#[test]
fn test_stage_pairs_empty_for_single_stage_pipeline() {
let p = Pipeline::new().add_stage("only", |s: String| Ok(s));
assert!(p.stage_pairs().is_empty());
}
#[test]
fn test_circuit_breaker_describe_contains_service_name() {
let cb = CircuitBreaker::new("my-service", 3, std::time::Duration::from_secs(30)).unwrap();
let desc = cb.describe().unwrap();
assert!(desc.contains("my-service"));
}
#[test]
fn test_circuit_breaker_describe_shows_closed_state_initially() {
let cb = CircuitBreaker::new("svc", 5, std::time::Duration::from_secs(10)).unwrap();
let desc = cb.describe().unwrap();
assert!(desc.contains("Closed"));
}
#[test]
fn test_circuit_breaker_describe_shows_failure_counts() {
let cb = CircuitBreaker::new("svc", 5, std::time::Duration::from_secs(10)).unwrap();
cb.record_failure();
cb.record_failure();
let desc = cb.describe().unwrap();
assert!(desc.contains("2/5"));
}
#[test]
fn test_retry_policy_attempts_budget_used_zero_at_start() {
let p = RetryPolicy::exponential(4, 10).unwrap();
assert_eq!(p.attempts_budget_used(0), 0.0);
}
#[test]
fn test_retry_policy_attempts_budget_used_one_when_exhausted() {
let p = RetryPolicy::exponential(4, 10).unwrap();
assert_eq!(p.attempts_budget_used(4), 1.0);
}
#[test]
fn test_retry_policy_attempts_budget_used_clamped_to_one() {
let p = RetryPolicy::exponential(4, 10).unwrap();
assert_eq!(p.attempts_budget_used(10), 1.0);
}
#[test]
fn test_retry_policy_attempts_budget_used_half_way() {
let p = RetryPolicy::constant(4, 10).unwrap();
assert!((p.attempts_budget_used(2) - 0.5).abs() < 1e-9);
}
#[test]
fn test_retry_policy_attempts_budget_used_fully_used_for_none_policy_after_one_attempt() {
let p = RetryPolicy::none();
assert_eq!(p.attempts_budget_used(1), 1.0);
assert_eq!(p.attempts_budget_used(0), 0.0);
}
#[test]
fn test_stage_at_returns_name_at_index() {
let p = Pipeline::new()
.add_stage("first", |s: String| Ok(s))
.add_stage("second", |s: String| Ok(s))
.add_stage("third", |s: String| Ok(s));
assert_eq!(p.stage_at(0), Some("first"));
assert_eq!(p.stage_at(2), Some("third"));
assert_eq!(p.stage_at(3), None);
}
#[test]
fn test_stage_at_returns_none_for_empty_pipeline() {
let p = Pipeline::new();
assert_eq!(p.stage_at(0), None);
}
#[test]
fn test_stages_reversed_returns_names_in_reverse_order() {
let p = Pipeline::new()
.add_stage("a", |s: String| Ok(s))
.add_stage("b", |s: String| Ok(s))
.add_stage("c", |s: String| Ok(s));
assert_eq!(p.stages_reversed(), vec!["c", "b", "a"]);
}
#[test]
fn test_stages_reversed_empty_for_empty_pipeline() {
let p = Pipeline::new();
assert!(p.stages_reversed().is_empty());
}
#[test]
fn test_pipeline_is_empty_true_for_empty_pipeline() {
let p = Pipeline::new();
assert!(p.pipeline_is_empty());
}
#[test]
fn test_pipeline_is_empty_false_after_adding_stage() {
let p = Pipeline::new().add_stage("a", |s: String| Ok(s));
assert!(!p.pipeline_is_empty());
}
#[test]
fn test_unique_stage_names_returns_sorted_names() {
let p = Pipeline::new()
.add_stage("charlie", |s: String| Ok(s))
.add_stage("alpha", |s: String| Ok(s))
.add_stage("bravo", |s: String| Ok(s));
assert_eq!(p.unique_stage_names(), vec!["alpha", "bravo", "charlie"]);
}
#[test]
fn test_unique_stage_names_empty_for_empty_pipeline() {
let p = Pipeline::new();
assert!(p.unique_stage_names().is_empty());
}
#[test]
fn test_stage_names_with_prefix_returns_matching_stages() {
let p = Pipeline::new()
.add_stage("validate_input", |s: String| Ok(s))
.add_stage("transform_data", |s: String| Ok(s))
.add_stage("validate_output", |s: String| Ok(s));
let names = p.stage_names_with_prefix("validate");
assert_eq!(names, vec!["validate_input", "validate_output"]);
}
#[test]
fn test_stage_names_with_prefix_empty_when_no_match() {
let p = Pipeline::new().add_stage("transform", |s: String| Ok(s));
assert!(p.stage_names_with_prefix("validate").is_empty());
}
#[test]
fn test_stages_with_suffix_returns_matching_stages() {
let p = Pipeline::new()
.add_stage("input_validate", |s: String| Ok(s))
.add_stage("transform_data", |s: String| Ok(s))
.add_stage("output_validate", |s: String| Ok(s));
let names = p.stages_with_suffix("validate");
assert_eq!(names, vec!["input_validate", "output_validate"]);
}
#[test]
fn test_stages_with_suffix_empty_when_no_match() {
let p = Pipeline::new().add_stage("transform", |s: String| Ok(s));
assert!(p.stages_with_suffix("validate").is_empty());
}
#[test]
fn test_has_stage_with_name_containing_true_when_match_exists() {
let p = Pipeline::new()
.add_stage("transform_input", |s: String| Ok(s))
.add_stage("write_output", |s: String| Ok(s));
assert!(p.has_stage_with_name_containing("transform"));
}
#[test]
fn test_has_stage_with_name_containing_false_when_no_match() {
let p = Pipeline::new().add_stage("write", |s: String| Ok(s));
assert!(!p.has_stage_with_name_containing("transform"));
}
#[test]
fn test_stage_name_bytes_total_sums_name_lengths() {
let p = Pipeline::new()
.add_stage("ab", |s: String| Ok(s))
.add_stage("cde", |s: String| Ok(s));
assert_eq!(p.stage_name_bytes_total(), 5);
}
#[test]
fn test_stage_name_bytes_total_zero_for_empty_pipeline() {
let p = Pipeline::new();
assert_eq!(p.stage_name_bytes_total(), 0);
}
#[test]
fn test_failure_headroom_full_when_no_failures_recorded() {
let cb = CircuitBreaker::new("svc-r47", 3, std::time::Duration::from_secs(10)).unwrap();
assert_eq!(cb.failure_headroom(), 3);
}
#[test]
fn test_failure_headroom_decreases_with_each_failure() {
let cb = CircuitBreaker::new("svc-r47b", 3, std::time::Duration::from_secs(10)).unwrap();
cb.record_failure();
assert_eq!(cb.failure_headroom(), 2);
cb.record_failure();
assert_eq!(cb.failure_headroom(), 1);
}
#[test]
fn test_failure_headroom_zero_when_at_or_above_threshold() {
let cb = CircuitBreaker::new("svc-r47c", 2, std::time::Duration::from_secs(10)).unwrap();
cb.record_failure();
cb.record_failure();
assert_eq!(cb.failure_headroom(), 0);
}
#[test]
fn test_stage_count_below_name_len_counts_short_names() {
let p = Pipeline::new()
.add_stage("ab", |s: String| Ok(s)) .add_stage("abcde", |s: String| Ok(s)) .add_stage("xyz", |s: String| Ok(s)); assert_eq!(p.stage_count_below_name_len(4), 2);
}
#[test]
fn test_stage_count_below_name_len_zero_for_empty_pipeline() {
let p = Pipeline::new();
assert_eq!(p.stage_count_below_name_len(10), 0);
}
#[test]
fn test_stage_count_above_name_bytes_counts_long_names() {
let p = Pipeline::new()
.add_stage("ab", |s: String| Ok(s))
.add_stage("a_very_long_name", |s: String| Ok(s));
assert_eq!(p.stage_count_above_name_bytes(3), 1);
}
#[test]
fn test_stage_count_above_name_bytes_zero_for_empty_pipeline() {
let p = Pipeline::new();
assert_eq!(p.stage_count_above_name_bytes(0), 0);
}
#[test]
fn test_contains_stage_with_prefix_true_when_present() {
let p = Pipeline::new()
.add_stage("validate_input", |s: String| Ok(s))
.add_stage("transform_data", |s: String| Ok(s));
assert!(p.contains_stage_with_prefix("validate"));
}
#[test]
fn test_contains_stage_with_prefix_false_when_absent() {
let p = Pipeline::new().add_stage("stage_a", |s: String| Ok(s));
assert!(!p.contains_stage_with_prefix("missing"));
}
#[test]
fn test_contains_stage_with_prefix_false_for_empty_pipeline() {
let p = Pipeline::new();
assert!(!p.contains_stage_with_prefix("any"));
}
#[test]
fn test_retry_policy_is_bounded_true_for_normal_policy() {
let p = RetryPolicy::constant(3, 100).unwrap();
assert!(p.is_bounded());
}
#[test]
fn test_retry_policy_is_bounded_true_for_none_policy() {
let p = RetryPolicy::none();
assert!(p.is_bounded());
}
#[test]
fn test_retry_policy_remaining_wait_budget_full_at_zero_attempts() {
let p = RetryPolicy::constant(3, 100).unwrap();
assert_eq!(p.remaining_wait_budget_ms(0), 300);
}
#[test]
fn test_retry_policy_remaining_wait_budget_decreases_with_attempts() {
let p = RetryPolicy::constant(3, 100).unwrap();
assert_eq!(p.remaining_wait_budget_ms(1), 200);
}
#[test]
fn test_stage_names_containing_returns_all_matching_stages() {
let p = Pipeline::new()
.add_stage("pre_process", |s: String| Ok(s))
.add_stage("post_process", |s: String| Ok(s))
.add_stage("transform", |s: String| Ok(s));
let names = p.stage_names_containing("process");
assert_eq!(names, vec!["pre_process", "post_process"]);
}
#[test]
fn test_stage_names_containing_empty_when_no_match() {
let p = Pipeline::new().add_stage("transform", |s: String| Ok(s));
assert!(p.stage_names_containing("process").is_empty());
}
#[test]
fn test_stage_name_from_end_zero_returns_last_stage() {
let p = Pipeline::new()
.add_stage("first", |s: String| Ok(s))
.add_stage("second", |s: String| Ok(s))
.add_stage("third", |s: String| Ok(s));
assert_eq!(p.stage_name_from_end(0), Some("third"));
}
#[test]
fn test_stage_name_from_end_one_returns_second_to_last() {
let p = Pipeline::new()
.add_stage("first", |s: String| Ok(s))
.add_stage("second", |s: String| Ok(s))
.add_stage("third", |s: String| Ok(s));
assert_eq!(p.stage_name_from_end(1), Some("second"));
}
#[test]
fn test_stage_name_from_end_out_of_bounds_returns_none() {
let p = Pipeline::new().add_stage("only", |s: String| Ok(s));
assert_eq!(p.stage_name_from_end(1), None);
}
#[test]
fn test_stage_name_from_end_none_for_empty_pipeline() {
let p = Pipeline::new();
assert_eq!(p.stage_name_from_end(0), None);
}
#[test]
fn test_stage_at_index_returns_correct_stage() {
let p = Pipeline::new()
.add_stage("alpha", |s: String| Ok(s))
.add_stage("beta", |s: String| Ok(s));
assert_eq!(p.stage_at_index(0).map(|s| s.name.as_str()), Some("alpha"));
assert_eq!(p.stage_at_index(1).map(|s| s.name.as_str()), Some("beta"));
}
#[test]
fn test_stage_at_index_none_for_out_of_bounds() {
let p = Pipeline::new().add_stage("only", |s: String| Ok(s));
assert!(p.stage_at_index(5).is_none());
}
#[test]
fn test_stage_at_index_none_for_empty_pipeline() {
let p = Pipeline::new();
assert!(p.stage_at_index(0).is_none());
}
#[test]
fn test_max_single_delay_ms_constant_policy() {
let p = RetryPolicy::constant(3, 100).unwrap();
assert_eq!(p.max_single_delay_ms(), 100);
}
#[test]
fn test_max_single_delay_ms_exponential_grows_with_attempts() {
let p = RetryPolicy::exponential(3, 50).unwrap();
assert_eq!(p.max_single_delay_ms(), 200);
}
#[test]
fn test_all_stage_names_returns_all_in_order() {
let p = Pipeline::new()
.add_stage("alpha", |s: String| Ok(s))
.add_stage("beta", |s: String| Ok(s))
.add_stage("gamma", |s: String| Ok(s));
assert_eq!(p.all_stage_names(), vec!["alpha", "beta", "gamma"]);
}
#[test]
fn test_all_stage_names_empty_for_empty_pipeline() {
let p = Pipeline::new();
assert!(p.all_stage_names().is_empty());
}
#[test]
fn test_all_stage_names_preserves_duplicates() {
let p = Pipeline::new()
.add_stage("a", |s: String| Ok(s))
.add_stage("a", |s: String| Ok(s));
assert_eq!(p.all_stage_names(), vec!["a", "a"]);
}
#[test]
fn test_has_exactly_n_stages_true() {
let p = Pipeline::new()
.add_stage("x", |s: String| Ok(s))
.add_stage("y", |s: String| Ok(s));
assert!(p.has_exactly_n_stages(2));
}
#[test]
fn test_has_exactly_n_stages_false_when_different() {
let p = Pipeline::new().add_stage("x", |s: String| Ok(s));
assert!(!p.has_exactly_n_stages(3));
}
#[test]
fn test_has_exactly_n_stages_true_for_empty() {
let p = Pipeline::new();
assert!(p.has_exactly_n_stages(0));
}
#[test]
fn test_stage_index_of_returns_correct_index() {
let p = Pipeline::new()
.add_stage("first", |s: String| Ok(s))
.add_stage("second", |s: String| Ok(s))
.add_stage("third", |s: String| Ok(s));
assert_eq!(p.stage_index_of("second"), Some(1));
}
#[test]
fn test_stage_index_of_returns_none_when_absent() {
let p = Pipeline::new().add_stage("alpha", |s: String| Ok(s));
assert_eq!(p.stage_index_of("beta"), None);
}
#[test]
fn test_stage_index_of_returns_first_match_for_duplicates() {
let p = Pipeline::new()
.add_stage("dup", |s: String| Ok(s))
.add_stage("dup", |s: String| Ok(s));
assert_eq!(p.stage_index_of("dup"), Some(0));
}
#[test]
fn test_all_stage_names_start_with_true_when_all_match() {
let p = Pipeline::new()
.add_stage("api_v1", |s: String| Ok(s))
.add_stage("api_v2", |s: String| Ok(s));
assert!(p.all_stage_names_start_with("api_"));
}
#[test]
fn test_all_stage_names_start_with_false_when_one_differs() {
let p = Pipeline::new()
.add_stage("api_v1", |s: String| Ok(s))
.add_stage("transform", |s: String| Ok(s));
assert!(!p.all_stage_names_start_with("api_"));
}
#[test]
fn test_all_stage_names_start_with_true_for_empty_pipeline() {
let p = Pipeline::new();
assert!(p.all_stage_names_start_with("anything"));
}
#[test]
fn test_has_no_stages_true_for_empty_pipeline() {
let p = Pipeline::new();
assert!(p.has_no_stages());
}
#[test]
fn test_has_no_stages_false_after_adding_stage() {
let p = Pipeline::new().add_stage("s", |s: String| Ok(s));
assert!(!p.has_no_stages());
}
#[test]
fn test_longest_stage_name_len_returns_max() {
let p = Pipeline::new()
.add_stage("short", |s: String| Ok(s))
.add_stage("much-longer-name", |s: String| Ok(s));
assert_eq!(p.longest_stage_name_len(), "much-longer-name".len());
}
#[test]
fn test_longest_stage_name_len_zero_for_empty_pipeline() {
let p = Pipeline::new();
assert_eq!(p.longest_stage_name_len(), 0);
}
#[test]
fn test_stage_names_joined_correct() {
let p = Pipeline::new()
.add_stage("alpha", |s: String| Ok(s))
.add_stage("beta", |s: String| Ok(s));
assert_eq!(p.stage_names_joined(", "), "alpha, beta");
}
#[test]
fn test_stage_names_joined_empty_for_empty_pipeline() {
let p = Pipeline::new();
assert_eq!(p.stage_names_joined("|"), "");
}
#[test]
fn test_stage_count_with_name_containing_correct() {
let p = Pipeline::new()
.add_stage("preprocess_input", |s: String| Ok(s))
.add_stage("process_data", |s: String| Ok(s))
.add_stage("postprocess_output", |s: String| Ok(s));
assert_eq!(p.stage_count_with_name_containing("process"), 3);
assert_eq!(p.stage_count_with_name_containing("pre"), 1);
}
#[test]
fn test_stage_count_with_name_containing_zero_when_none_match() {
let p = Pipeline::new().add_stage("alpha", |s: String| Ok(s));
assert_eq!(p.stage_count_with_name_containing("beta"), 0);
}
#[test]
fn test_has_stage_at_index_true_for_valid_index() {
let p = Pipeline::new()
.add_stage("first", |s: String| Ok(s))
.add_stage("second", |s: String| Ok(s));
assert!(p.has_stage_at_index(0));
assert!(p.has_stage_at_index(1));
}
#[test]
fn test_has_stage_at_index_false_for_out_of_bounds() {
let p = Pipeline::new().add_stage("only", |s: String| Ok(s));
assert!(!p.has_stage_at_index(1));
}
#[test]
fn test_has_stage_at_index_false_for_empty_pipeline() {
let p = Pipeline::new();
assert!(!p.has_stage_at_index(0));
}
#[test]
fn test_any_stage_has_name_true_for_existing_stage() {
let p = Pipeline::new()
.add_stage("alpha", |s: String| Ok(s))
.add_stage("beta", |s: String| Ok(s));
assert!(p.any_stage_has_name("alpha"));
assert!(p.any_stage_has_name("beta"));
}
#[test]
fn test_any_stage_has_name_false_for_missing_stage() {
let p = Pipeline::new().add_stage("alpha", |s: String| Ok(s));
assert!(!p.any_stage_has_name("gamma"));
}
#[test]
fn test_any_stage_has_name_false_for_empty_pipeline() {
let p = Pipeline::new();
assert!(!p.any_stage_has_name("anything"));
}
#[test]
fn test_covers_n_failures_true_when_max_attempts_exceeds_n() {
let policy = RetryPolicy::exponential(5, 100).unwrap();
assert!(policy.covers_n_failures(4));
assert!(policy.covers_n_failures(0));
}
#[test]
fn test_covers_n_failures_false_when_max_attempts_equals_n() {
let policy = RetryPolicy::exponential(3, 100).unwrap();
assert!(!policy.covers_n_failures(3));
}
#[test]
fn test_covers_n_failures_false_for_no_retry_policy() {
let policy = RetryPolicy::none();
assert!(!policy.covers_n_failures(1));
}
#[test]
fn test_last_stage_name_returns_last_added() {
let p = Pipeline::new()
.add_stage("first", |s: String| Ok(s))
.add_stage("last", |s: String| Ok(s));
assert_eq!(p.last_stage_name(), Some("last"));
}
#[test]
fn test_last_stage_name_none_for_empty_pipeline() {
let p = Pipeline::new();
assert!(p.last_stage_name().is_none());
}
#[test]
fn test_stage_name_at_returns_correct_name() {
let p = Pipeline::new()
.add_stage("alpha", |s: String| Ok(s))
.add_stage("beta", |s: String| Ok(s));
assert_eq!(p.stage_name_at(0), Some("alpha"));
assert_eq!(p.stage_name_at(1), Some("beta"));
}
#[test]
fn test_stage_name_at_none_for_out_of_bounds() {
let p = Pipeline::new().add_stage("only", |s: String| Ok(s));
assert!(p.stage_name_at(1).is_none());
}
#[test]
fn test_stage_name_at_none_for_empty_pipeline() {
let p = Pipeline::new();
assert!(p.stage_name_at(0).is_none());
}
#[test]
fn test_all_stage_names_contain_true_when_all_match() {
let p = Pipeline::new()
.add_stage("step_alpha", |s: String| Ok(s))
.add_stage("step_beta", |s: String| Ok(s));
assert!(p.all_stage_names_contain("step_"));
}
#[test]
fn test_all_stage_names_contain_false_when_one_does_not_match() {
let p = Pipeline::new()
.add_stage("step_alpha", |s: String| Ok(s))
.add_stage("gamma", |s: String| Ok(s));
assert!(!p.all_stage_names_contain("step_"));
}
#[test]
fn test_all_stage_names_contain_true_for_empty_pipeline() {
let p = Pipeline::new();
assert!(p.all_stage_names_contain("anything"));
}
}