1use crate::error::AgentRuntimeError;
44use crate::util::timed_lock;
45use std::collections::HashMap;
46use std::sync::{Arc, Mutex};
47use std::time::{Duration, Instant};
48
49pub const MAX_RETRY_DELAY: Duration = Duration::from_secs(60);
51
52#[derive(Debug, Clone, PartialEq, Eq)]
56pub enum RetryKind {
57 Exponential,
59 Constant,
61}
62
63#[derive(Debug, Clone)]
65pub struct RetryPolicy {
66 pub max_attempts: u32,
68 pub base_delay: Duration,
70 pub kind: RetryKind,
72}
73
74impl RetryPolicy {
75 pub fn exponential(max_attempts: u32, base_ms: u64) -> Result<Self, AgentRuntimeError> {
85 if max_attempts == 0 {
86 return Err(AgentRuntimeError::Orchestration(
87 "max_attempts must be >= 1".into(),
88 ));
89 }
90 if base_ms == 0 {
91 return Err(AgentRuntimeError::Orchestration(
92 "base_ms must be >= 1 to avoid zero-delay busy-loop retries".into(),
93 ));
94 }
95 Ok(Self {
96 max_attempts,
97 base_delay: Duration::from_millis(base_ms),
98 kind: RetryKind::Exponential,
99 })
100 }
101
102 pub fn constant(max_attempts: u32, delay_ms: u64) -> Result<Self, AgentRuntimeError> {
113 if max_attempts == 0 {
114 return Err(AgentRuntimeError::Orchestration(
115 "max_attempts must be >= 1".into(),
116 ));
117 }
118 if delay_ms == 0 {
119 return Err(AgentRuntimeError::Orchestration(
120 "delay_ms must be >= 1 to avoid busy-loop retries".into(),
121 ));
122 }
123 Ok(Self {
124 max_attempts,
125 base_delay: Duration::from_millis(delay_ms),
126 kind: RetryKind::Constant,
127 })
128 }
129
130 pub fn none() -> Self {
134 Self {
135 max_attempts: 1,
136 base_delay: Duration::ZERO,
137 kind: RetryKind::Constant,
138 }
139 }
140
141 pub fn is_none(&self) -> bool {
145 self.max_attempts == 1 && self.base_delay == Duration::ZERO
146 }
147
148 pub fn with_max_attempts(mut self, n: u32) -> Result<Self, AgentRuntimeError> {
153 if n == 0 {
154 return Err(AgentRuntimeError::Orchestration(
155 "max_attempts must be >= 1".into(),
156 ));
157 }
158 self.max_attempts = n;
159 Ok(self)
160 }
161
162 pub fn max_attempts(&self) -> u32 {
164 self.max_attempts
165 }
166
167 pub fn is_no_retry(&self) -> bool {
171 self.max_attempts <= 1
172 }
173
174 pub fn will_retry_at_all(&self) -> bool {
180 self.max_attempts > 1
181 }
182
183 pub fn is_exponential(&self) -> bool {
185 matches!(self.kind, RetryKind::Exponential)
186 }
187
188 pub fn is_constant(&self) -> bool {
190 matches!(self.kind, RetryKind::Constant)
191 }
192
193 pub fn base_delay_ms(&self) -> u64 {
198 self.base_delay.as_millis() as u64
199 }
200
201 pub fn first_delay_ms(&self) -> u64 {
206 self.base_delay_ms()
207 }
208
209 pub fn is_last_attempt(&self, attempt: u32) -> bool {
213 attempt >= self.max_attempts
214 }
215
216 pub fn max_total_delay_ms(&self) -> u64 {
222 (1..=self.max_attempts)
223 .map(|attempt| self.delay_for(attempt).as_millis() as u64)
224 .sum()
225 }
226
227 pub fn delay_sum_ms(&self, n: u32) -> u64 {
231 let limit = n.min(self.max_attempts);
232 (1..=limit)
233 .map(|attempt| self.delay_for(attempt).as_millis() as u64)
234 .sum()
235 }
236
237 pub fn avg_delay_ms(&self) -> u64 {
241 if self.max_attempts == 0 {
242 return 0;
243 }
244 self.max_total_delay_ms() / self.max_attempts as u64
245 }
246
247 pub fn backoff_factor(&self) -> f64 {
251 match self.kind {
252 RetryKind::Exponential => 2.0,
253 RetryKind::Constant => 1.0,
254 }
255 }
256
257 pub fn with_base_delay_ms(mut self, ms: u64) -> Result<Self, AgentRuntimeError> {
262 if ms == 0 {
263 return Err(AgentRuntimeError::Orchestration(
264 "base_delay_ms must be >= 1 to avoid busy-loop retries".into(),
265 ));
266 }
267 self.base_delay = Duration::from_millis(ms);
268 Ok(self)
269 }
270
271 pub fn delay_ms_for(&self, attempt: u32) -> u64 {
278 self.delay_for(attempt).as_millis() as u64
279 }
280
281 pub fn total_max_delay_ms(&self) -> u64 {
286 (1..=self.max_attempts)
287 .map(|a| self.delay_for(a).as_millis() as u64)
288 .sum()
289 }
290
291 pub fn attempts_remaining(&self, attempt: u32) -> u32 {
295 self.max_attempts.saturating_sub(attempt)
296 }
297
298 pub fn attempts_budget_used(&self, attempt: u32) -> f64 {
307 if self.max_attempts == 0 {
308 return 1.0;
309 }
310 (attempt as f64 / self.max_attempts as f64).min(1.0)
311 }
312
313 pub fn max_delay_ms(&self) -> u64 {
319 if self.max_attempts == 0 {
320 return 0;
321 }
322 self.delay_ms_for(self.max_attempts)
323 }
324
325 pub fn can_retry(&self, attempt: u32) -> bool {
331 attempt < self.max_attempts
332 }
333
334 pub fn delay_for(&self, attempt: u32) -> Duration {
339 match self.kind {
340 RetryKind::Constant => self.base_delay.min(MAX_RETRY_DELAY),
341 RetryKind::Exponential => {
342 let exp = attempt.saturating_sub(1);
343 let multiplier = 1u64.checked_shl(exp.min(63)).unwrap_or(u64::MAX);
344 let millis = self
345 .base_delay
346 .as_millis()
347 .saturating_mul(multiplier as u128);
348 let raw = Duration::from_millis(millis.min(u64::MAX as u128) as u64);
349 raw.min(MAX_RETRY_DELAY)
350 }
351 }
352 }
353
354 pub fn is_bounded(&self) -> bool {
359 self.max_attempts < u32::MAX
360 }
361
362 pub fn remaining_wait_budget_ms(&self, attempts_done: u32) -> u64 {
368 self.max_total_delay_ms().saturating_sub(self.delay_sum_ms(attempts_done))
369 }
370
371 pub fn max_single_delay_ms(&self) -> u64 {
378 self.delay_for(self.max_attempts).as_millis() as u64
379 }
380
381 pub fn covers_n_failures(&self, n: u32) -> bool {
387 self.max_attempts > n
388 }
389}
390
391impl std::fmt::Display for RetryPolicy {
392 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
394 match self.kind {
395 RetryKind::Exponential => write!(
396 f,
397 "Exponential({}×, base={}ms)",
398 self.max_attempts,
399 self.base_delay.as_millis()
400 ),
401 RetryKind::Constant => write!(
402 f,
403 "Constant({}×, delay={}ms)",
404 self.max_attempts,
405 self.base_delay.as_millis()
406 ),
407 }
408 }
409}
410
411#[derive(Debug, Clone)]
421pub enum CircuitState {
422 Closed,
424 Open {
426 opened_at: Instant,
428 },
429 HalfOpen,
431}
432
433impl PartialEq for CircuitState {
434 fn eq(&self, other: &Self) -> bool {
435 match (self, other) {
436 (CircuitState::Closed, CircuitState::Closed) => true,
437 (CircuitState::Open { .. }, CircuitState::Open { .. }) => true,
438 (CircuitState::HalfOpen, CircuitState::HalfOpen) => true,
439 _ => false,
440 }
441 }
442}
443
444impl Eq for CircuitState {}
445
446impl std::fmt::Display for CircuitState {
447 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
449 match self {
450 CircuitState::Closed => write!(f, "Closed"),
451 CircuitState::Open { .. } => write!(f, "Open"),
452 CircuitState::HalfOpen => write!(f, "HalfOpen"),
453 }
454 }
455}
456
457pub trait CircuitBreakerBackend: Send + Sync {
465 fn increment_failures(&self, service: &str) -> u32;
467 fn reset_failures(&self, service: &str);
469 fn get_failures(&self, service: &str) -> u32;
471 fn set_open_at(&self, service: &str, at: std::time::Instant);
473 fn clear_open_at(&self, service: &str);
475 fn get_open_at(&self, service: &str) -> Option<std::time::Instant>;
477}
478
479pub struct InMemoryCircuitBreakerBackend {
488 inner: Arc<Mutex<HashMap<String, InMemoryServiceState>>>,
489}
490
491#[derive(Default)]
492struct InMemoryServiceState {
493 consecutive_failures: u32,
494 open_at: Option<std::time::Instant>,
495}
496
497impl InMemoryCircuitBreakerBackend {
498 pub fn new() -> Self {
500 Self {
501 inner: Arc::new(Mutex::new(HashMap::new())),
502 }
503 }
504}
505
506impl Default for InMemoryCircuitBreakerBackend {
507 fn default() -> Self {
508 Self::new()
509 }
510}
511
512impl CircuitBreakerBackend for InMemoryCircuitBreakerBackend {
513 fn increment_failures(&self, service: &str) -> u32 {
514 let mut map = timed_lock(
515 &self.inner,
516 "InMemoryCircuitBreakerBackend::increment_failures",
517 );
518 let state = map.entry(service.to_owned()).or_default();
519 state.consecutive_failures += 1;
520 state.consecutive_failures
521 }
522
523 fn reset_failures(&self, service: &str) {
524 let mut map = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::reset_failures");
525 if let Some(state) = map.get_mut(service) {
526 state.consecutive_failures = 0;
527 }
528 }
529
530 fn get_failures(&self, service: &str) -> u32 {
531 let map = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::get_failures");
532 map.get(service).map_or(0, |s| s.consecutive_failures)
533 }
534
535 fn set_open_at(&self, service: &str, at: std::time::Instant) {
536 let mut map = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::set_open_at");
537 map.entry(service.to_owned()).or_default().open_at = Some(at);
538 }
539
540 fn clear_open_at(&self, service: &str) {
541 let mut map = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::clear_open_at");
542 if let Some(state) = map.get_mut(service) {
543 state.open_at = None;
544 }
545 }
546
547 fn get_open_at(&self, service: &str) -> Option<std::time::Instant> {
548 let map = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::get_open_at");
549 map.get(service).and_then(|s| s.open_at)
550 }
551}
552
553#[derive(Clone)]
562pub struct CircuitBreaker {
563 threshold: u32,
564 recovery_window: Duration,
565 service: String,
566 backend: Arc<dyn CircuitBreakerBackend>,
567}
568
569impl std::fmt::Debug for CircuitBreaker {
570 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
571 f.debug_struct("CircuitBreaker")
572 .field("threshold", &self.threshold)
573 .field("recovery_window", &self.recovery_window)
574 .field("service", &self.service)
575 .finish()
576 }
577}
578
579impl CircuitBreaker {
580 pub fn new(
587 service: impl Into<String>,
588 threshold: u32,
589 recovery_window: Duration,
590 ) -> Result<Self, AgentRuntimeError> {
591 if threshold == 0 {
592 return Err(AgentRuntimeError::Orchestration(
593 "circuit breaker threshold must be >= 1".into(),
594 ));
595 }
596 let service = service.into();
597 Ok(Self {
598 threshold,
599 recovery_window,
600 service,
601 backend: Arc::new(InMemoryCircuitBreakerBackend::new()),
602 })
603 }
604
605 pub fn with_backend(mut self, backend: Arc<dyn CircuitBreakerBackend>) -> Self {
609 self.backend = backend;
610 self
611 }
612
613 #[tracing::instrument(skip(self, f))]
622 pub fn call<T, E, F>(&self, f: F) -> Result<T, AgentRuntimeError>
623 where
624 F: FnOnce() -> Result<T, E>,
625 E: std::fmt::Display,
626 {
627 let effective_state = match self.backend.get_open_at(&self.service) {
629 Some(opened_at) => {
630 if opened_at.elapsed() >= self.recovery_window {
631 self.backend.clear_open_at(&self.service);
633 tracing::info!("circuit moved to half-open for {}", self.service);
634 CircuitState::HalfOpen
635 } else {
636 CircuitState::Open { opened_at }
637 }
638 }
639 None => {
640 let failures = self.backend.get_failures(&self.service);
644 if failures >= self.threshold {
645 CircuitState::HalfOpen
646 } else {
647 CircuitState::Closed
648 }
649 }
650 };
651
652 tracing::debug!("circuit state: {:?}", effective_state);
653
654 match effective_state {
655 CircuitState::Open { .. } => {
656 return Err(AgentRuntimeError::CircuitOpen {
657 service: self.service.clone(),
658 });
659 }
660 CircuitState::Closed | CircuitState::HalfOpen => {}
661 }
662
663 match f() {
665 Ok(val) => {
666 self.backend.reset_failures(&self.service);
667 self.backend.clear_open_at(&self.service);
668 tracing::info!("circuit closed for {}", self.service);
669 Ok(val)
670 }
671 Err(e) => {
672 let failures = self.backend.increment_failures(&self.service);
673 if failures >= self.threshold {
674 let now = Instant::now();
675 self.backend.set_open_at(&self.service, now);
676 tracing::info!("circuit opened for {}", self.service);
677 }
678 Err(AgentRuntimeError::Orchestration(e.to_string()))
679 }
680 }
681 }
682
683 pub fn state(&self) -> Result<CircuitState, AgentRuntimeError> {
685 let state = match self.backend.get_open_at(&self.service) {
686 Some(opened_at) => {
687 if opened_at.elapsed() >= self.recovery_window {
688 let failures = self.backend.get_failures(&self.service);
690 if failures >= self.threshold {
691 CircuitState::HalfOpen
692 } else {
693 CircuitState::Closed
694 }
695 } else {
696 CircuitState::Open { opened_at }
697 }
698 }
699 None => {
700 let failures = self.backend.get_failures(&self.service);
701 if failures >= self.threshold {
702 CircuitState::HalfOpen
703 } else {
704 CircuitState::Closed
705 }
706 }
707 };
708 Ok(state)
709 }
710
711 pub fn failure_count(&self) -> Result<u32, AgentRuntimeError> {
713 Ok(self.backend.get_failures(&self.service))
714 }
715
716 pub fn record_success(&self) {
721 self.backend.reset_failures(&self.service);
722 self.backend.clear_open_at(&self.service);
723 }
724
725 pub fn record_failure(&self) {
729 let failures = self.backend.increment_failures(&self.service);
730 if failures >= self.threshold {
731 self.backend.set_open_at(&self.service, Instant::now());
732 tracing::info!("circuit opened for {} (manual record)", self.service);
733 }
734 }
735
736 pub fn service_name(&self) -> &str {
738 &self.service
739 }
740
741 pub fn is_closed(&self) -> bool {
743 matches!(self.state(), Ok(CircuitState::Closed))
744 }
745
746 pub fn is_open(&self) -> bool {
748 matches!(self.state(), Ok(CircuitState::Open { .. }))
749 }
750
751 pub fn is_half_open(&self) -> bool {
753 matches!(self.state(), Ok(CircuitState::HalfOpen))
754 }
755
756 pub fn is_healthy(&self) -> bool {
761 !self.is_open()
762 }
763
764 pub fn threshold(&self) -> u32 {
768 self.threshold
769 }
770
771 pub fn failure_headroom(&self) -> u32 {
777 let failures = self.backend.get_failures(&self.service);
778 self.threshold.saturating_sub(failures)
779 }
780
781 pub fn failure_rate(&self) -> f64 {
787 if self.threshold == 0 {
788 return 0.0;
789 }
790 let failures = self.backend.get_failures(&self.service);
791 failures as f64 / self.threshold as f64
792 }
793
794 pub fn is_at_threshold(&self) -> bool {
799 let failures = self.backend.get_failures(&self.service);
800 failures >= self.threshold
801 }
802
803 pub fn failures_until_open(&self) -> u32 {
807 let failures = self.backend.get_failures(&self.service);
808 self.threshold.saturating_sub(failures)
809 }
810
811 pub fn recovery_window(&self) -> std::time::Duration {
816 self.recovery_window
817 }
818
819 pub fn reset(&self) {
824 self.backend.reset_failures(&self.service);
825 self.backend.clear_open_at(&self.service);
826 tracing::info!("circuit manually reset to Closed for {}", self.service);
827 }
828
829 pub fn describe(&self) -> Result<String, AgentRuntimeError> {
839 let state = self.state()?;
840 let failures = self.failure_count()?;
841 Ok(format!(
842 "service='{}' state={} failures={}/{}",
843 self.service, state, failures, self.threshold
844 ))
845 }
846
847 #[tracing::instrument(skip(self, backend, f))]
857 pub async fn async_call<T, E, F, Fut>(
858 &self,
859 backend: &dyn AsyncCircuitBreakerBackend,
860 f: F,
861 ) -> Result<T, AgentRuntimeError>
862 where
863 F: FnOnce() -> Fut,
864 Fut: std::future::Future<Output = Result<T, E>>,
865 E: std::fmt::Display,
866 {
867 let effective_state = match backend.get_open_at(&self.service).await {
869 Some(opened_at) => {
870 if opened_at.elapsed() >= self.recovery_window {
871 backend.clear_open_at(&self.service).await;
872 tracing::info!("circuit async moved to half-open for {}", self.service);
873 CircuitState::HalfOpen
874 } else {
875 CircuitState::Open { opened_at }
876 }
877 }
878 None => {
879 let failures = backend.get_failures(&self.service).await;
880 if failures >= self.threshold {
881 CircuitState::HalfOpen
882 } else {
883 CircuitState::Closed
884 }
885 }
886 };
887
888 if let CircuitState::Open { .. } = effective_state {
889 return Err(AgentRuntimeError::CircuitOpen {
890 service: self.service.clone(),
891 });
892 }
893
894 match f().await {
895 Ok(val) => {
896 backend.reset_failures(&self.service).await;
897 backend.clear_open_at(&self.service).await;
898 Ok(val)
899 }
900 Err(e) => {
901 let failures = backend.increment_failures(&self.service).await;
902 if failures >= self.threshold {
903 backend
904 .set_open_at(&self.service, Instant::now())
905 .await;
906 tracing::info!("circuit async opened for {}", self.service);
907 }
908 Err(AgentRuntimeError::Orchestration(e.to_string()))
909 }
910 }
911 }
912}
913
914#[async_trait::async_trait]
925pub trait AsyncCircuitBreakerBackend: Send + Sync {
926 async fn increment_failures(&self, service: &str) -> u32;
928 async fn reset_failures(&self, service: &str);
930 async fn get_failures(&self, service: &str) -> u32;
932 async fn set_open_at(&self, service: &str, at: Instant);
934 async fn clear_open_at(&self, service: &str);
936 async fn get_open_at(&self, service: &str) -> Option<Instant>;
938}
939
940#[async_trait::async_trait]
941impl AsyncCircuitBreakerBackend for InMemoryCircuitBreakerBackend {
942 async fn increment_failures(&self, service: &str) -> u32 {
943 <Self as CircuitBreakerBackend>::increment_failures(self, service)
944 }
945 async fn reset_failures(&self, service: &str) {
946 <Self as CircuitBreakerBackend>::reset_failures(self, service);
947 }
948 async fn get_failures(&self, service: &str) -> u32 {
949 <Self as CircuitBreakerBackend>::get_failures(self, service)
950 }
951 async fn set_open_at(&self, service: &str, at: Instant) {
952 <Self as CircuitBreakerBackend>::set_open_at(self, service, at);
953 }
954 async fn clear_open_at(&self, service: &str) {
955 <Self as CircuitBreakerBackend>::clear_open_at(self, service);
956 }
957 async fn get_open_at(&self, service: &str) -> Option<Instant> {
958 <Self as CircuitBreakerBackend>::get_open_at(self, service)
959 }
960}
961
962#[derive(Debug, Clone, PartialEq)]
966pub enum DeduplicationResult {
967 New,
969 Cached(String),
971 InProgress,
973}
974
975#[derive(Debug, Clone)]
983pub struct Deduplicator {
984 ttl: Duration,
985 max_entries: Option<usize>,
989 inner: Arc<Mutex<DeduplicatorInner>>,
990}
991
992#[derive(Debug)]
993struct DeduplicatorInner {
994 cache: HashMap<String, (String, Instant)>, in_flight: HashMap<String, Instant>, cache_order: std::collections::VecDeque<String>,
998 call_count: u64,
1002}
1003
1004impl Deduplicator {
1005 pub fn new(ttl: Duration) -> Self {
1007 Self {
1008 ttl,
1009 max_entries: None,
1010 inner: Arc::new(Mutex::new(DeduplicatorInner {
1011 cache: HashMap::new(),
1012 in_flight: HashMap::new(),
1013 cache_order: std::collections::VecDeque::new(),
1014 call_count: 0,
1015 })),
1016 }
1017 }
1018
1019 pub fn with_max_entries(mut self, max: usize) -> Result<Self, AgentRuntimeError> {
1028 if max == 0 {
1029 return Err(AgentRuntimeError::Orchestration(
1030 "Deduplicator max_entries must be >= 1".into(),
1031 ));
1032 }
1033 self.max_entries = Some(max);
1034 Ok(self)
1035 }
1036
1037 pub fn check_and_register(&self, key: &str) -> Result<DeduplicationResult, AgentRuntimeError> {
1041 let mut inner = timed_lock(&self.inner, "Deduplicator::check_and_register");
1042
1043 let now = Instant::now();
1044
1045 const EXPIRY_INTERVAL: u64 = 64;
1049 inner.call_count = inner.call_count.wrapping_add(1);
1050 if inner.call_count % EXPIRY_INTERVAL == 0 {
1051 let ttl = self.ttl;
1052 inner.cache.retain(|_, (_, ts)| now.duration_since(*ts) < ttl);
1053 inner
1054 .in_flight
1055 .retain(|_, ts| now.duration_since(*ts) < ttl);
1056 }
1057
1058 match inner.cache.get(key) {
1060 Some((result, ts)) if now.duration_since(*ts) < self.ttl => {
1061 return Ok(DeduplicationResult::Cached(result.clone()));
1062 }
1063 Some(_) => {
1064 inner.cache.remove(key); }
1066 None => {}
1067 }
1068 match inner.in_flight.get(key) {
1069 Some(ts) if now.duration_since(*ts) < self.ttl => {
1070 return Ok(DeduplicationResult::InProgress);
1071 }
1072 Some(_) => {
1073 inner.in_flight.remove(key); }
1075 None => {}
1076 }
1077
1078 inner.in_flight.insert(key.to_owned(), now);
1079 Ok(DeduplicationResult::New)
1080 }
1081
1082 pub fn check(&self, key: &str, ttl: std::time::Duration) -> Result<DeduplicationResult, AgentRuntimeError> {
1087 let mut inner = timed_lock(&self.inner, "Deduplicator::check");
1088 let now = Instant::now();
1089
1090 const EXPIRY_INTERVAL: u64 = 64;
1092 inner.call_count = inner.call_count.wrapping_add(1);
1093 if inner.call_count % EXPIRY_INTERVAL == 0 {
1094 inner.cache.retain(|_, (_, ts)| now.duration_since(*ts) < ttl);
1095 inner.in_flight.retain(|_, ts| now.duration_since(*ts) < ttl);
1096 }
1097
1098 match inner.cache.get(key) {
1099 Some((result, ts)) if now.duration_since(*ts) < ttl => {
1100 return Ok(DeduplicationResult::Cached(result.clone()));
1101 }
1102 Some(_) => {
1103 inner.cache.remove(key);
1104 }
1105 None => {}
1106 }
1107 match inner.in_flight.get(key) {
1108 Some(ts) if now.duration_since(*ts) < ttl => {
1109 return Ok(DeduplicationResult::InProgress);
1110 }
1111 Some(_) => {
1112 inner.in_flight.remove(key);
1113 }
1114 None => {}
1115 }
1116
1117 inner.in_flight.insert(key.to_owned(), now);
1118 Ok(DeduplicationResult::New)
1119 }
1120
1121 pub fn dedup_many(
1129 &self,
1130 requests: &[(&str, std::time::Duration)],
1131 ) -> Result<Vec<DeduplicationResult>, AgentRuntimeError> {
1132 if requests.is_empty() {
1133 return Ok(Vec::new());
1134 }
1135 let mut inner = timed_lock(&self.inner, "Deduplicator::dedup_many");
1136 let now = std::time::Instant::now();
1137 let mut results = Vec::with_capacity(requests.len());
1138
1139 for &(key, ttl) in requests {
1140 inner.cache.retain(|_, (_, ts)| now.duration_since(*ts) < ttl);
1142 inner.in_flight.retain(|_, ts| now.duration_since(*ts) < ttl);
1143
1144 let result = if let Some((cached_result, _)) = inner.cache.get(key) {
1145 DeduplicationResult::Cached(cached_result.clone())
1146 } else if inner.in_flight.contains_key(key) {
1147 DeduplicationResult::InProgress
1148 } else {
1149 inner.in_flight.insert(key.to_owned(), now);
1150 DeduplicationResult::New
1151 };
1152 results.push(result);
1153 }
1154
1155 Ok(results)
1156 }
1157
1158 pub fn complete(&self, key: &str, result: impl Into<String>) -> Result<(), AgentRuntimeError> {
1163 let mut inner = timed_lock(&self.inner, "Deduplicator::complete");
1164 inner.in_flight.remove(key);
1165
1166 if let Some(max) = self.max_entries {
1169 while inner.cache.len() >= max {
1170 match inner.cache_order.pop_front() {
1171 Some(oldest_key) => {
1172 inner.cache.remove(&oldest_key);
1173 }
1174 None => break,
1175 }
1176 }
1177 }
1178
1179 let owned_key = key.to_owned();
1180 inner.cache_order.push_back(owned_key.clone());
1181 inner.cache.insert(owned_key, (result.into(), Instant::now()));
1182 Ok(())
1183 }
1184
1185 pub fn fail(&self, key: &str) -> Result<(), AgentRuntimeError> {
1190 let mut inner = timed_lock(&self.inner, "Deduplicator::fail");
1191 inner.in_flight.remove(key);
1192 Ok(())
1193 }
1194
1195 pub fn in_flight_count(&self) -> Result<usize, AgentRuntimeError> {
1197 let inner = timed_lock(&self.inner, "Deduplicator::in_flight_count");
1198 Ok(inner.in_flight.len())
1199 }
1200
1201 pub fn in_flight_keys(&self) -> Result<Vec<String>, AgentRuntimeError> {
1203 let inner = timed_lock(&self.inner, "Deduplicator::in_flight_keys");
1204 Ok(inner.in_flight.keys().cloned().collect())
1205 }
1206
1207 pub fn cached_count(&self) -> Result<usize, AgentRuntimeError> {
1211 let inner = timed_lock(&self.inner, "Deduplicator::cached_count");
1212 Ok(inner.cache.len())
1213 }
1214
1215 pub fn cached_keys(&self) -> Result<Vec<String>, AgentRuntimeError> {
1222 let inner = timed_lock(&self.inner, "Deduplicator::cached_keys");
1223 Ok(inner.cache.keys().cloned().collect())
1224 }
1225
1226 pub fn ttl(&self) -> Duration {
1228 self.ttl
1229 }
1230
1231 pub fn max_entries(&self) -> Option<usize> {
1237 self.max_entries
1238 }
1239
1240 pub fn is_idle(&self) -> Result<bool, AgentRuntimeError> {
1242 let inner = timed_lock(&self.inner, "Deduplicator::is_idle");
1243 Ok(inner.in_flight.is_empty())
1244 }
1245
1246 pub fn total_count(&self) -> Result<usize, AgentRuntimeError> {
1249 let inner = timed_lock(&self.inner, "Deduplicator::total_count");
1250 Ok(inner.in_flight.len() + inner.cache.len())
1251 }
1252
1253 pub fn contains(&self, key: &str) -> Result<bool, AgentRuntimeError> {
1260 let inner = timed_lock(&self.inner, "Deduplicator::contains");
1261 Ok(inner.in_flight.contains_key(key) || inner.cache.contains_key(key))
1262 }
1263
1264 pub fn get_result(&self, key: &str) -> Result<Option<String>, AgentRuntimeError> {
1269 let inner = timed_lock(&self.inner, "Deduplicator::get_result");
1270 let ttl = self.ttl;
1271 let now = std::time::Instant::now();
1272 Ok(inner.cache.get(key).and_then(|(result, inserted_at)| {
1273 if now.duration_since(*inserted_at) <= ttl {
1274 Some(result.clone())
1275 } else {
1276 None
1277 }
1278 }))
1279 }
1280
1281 pub fn clear(&self) -> Result<(), AgentRuntimeError> {
1285 let mut inner = timed_lock(&self.inner, "Deduplicator::clear");
1286 inner.cache.clear();
1287 inner.in_flight.clear();
1288 inner.cache_order.clear();
1289 Ok(())
1290 }
1291
1292 pub fn purge_expired(&self) -> Result<usize, AgentRuntimeError> {
1300 let mut inner = timed_lock(&self.inner, "Deduplicator::purge_expired");
1301 let ttl = self.ttl;
1302 let now = std::time::Instant::now();
1303 let before = inner.cache.len();
1304 inner.cache.retain(|_, (_, inserted_at)| {
1305 now.duration_since(*inserted_at) <= ttl
1306 });
1307 let removed = before - inner.cache.len();
1308 if removed > 0 {
1311 let live_keys: std::collections::HashSet<String> =
1312 inner.cache.keys().cloned().collect();
1313 inner.cache_order.retain(|k| live_keys.contains(k));
1314 }
1315 Ok(removed)
1316 }
1317
1318 pub fn evict_oldest(&self) -> Result<bool, AgentRuntimeError> {
1322 let mut inner = timed_lock(&self.inner, "Deduplicator::evict_oldest");
1323 while let Some(key) = inner.cache_order.pop_front() {
1324 if inner.cache.remove(&key).is_some() {
1325 return Ok(true);
1326 }
1327 }
1328 Ok(false)
1329 }
1330}
1331
1332#[derive(Debug, Clone)]
1342pub struct BackpressureGuard {
1343 capacity: usize,
1344 soft_capacity: Option<usize>,
1345 inner: Arc<Mutex<usize>>,
1346}
1347
1348impl BackpressureGuard {
1349 pub fn new(capacity: usize) -> Result<Self, AgentRuntimeError> {
1355 if capacity == 0 {
1356 return Err(AgentRuntimeError::Orchestration(
1357 "BackpressureGuard capacity must be > 0".into(),
1358 ));
1359 }
1360 Ok(Self {
1361 capacity,
1362 soft_capacity: None,
1363 inner: Arc::new(Mutex::new(0)),
1364 })
1365 }
1366
1367 pub fn with_soft_limit(mut self, soft: usize) -> Result<Self, AgentRuntimeError> {
1370 if soft >= self.capacity {
1371 return Err(AgentRuntimeError::Orchestration(
1372 "soft_capacity must be less than hard capacity".into(),
1373 ));
1374 }
1375 self.soft_capacity = Some(soft);
1376 Ok(self)
1377 }
1378
1379 pub fn try_acquire(&self) -> Result<(), AgentRuntimeError> {
1388 let mut depth = timed_lock(&self.inner, "BackpressureGuard::try_acquire");
1389 if *depth >= self.capacity {
1390 return Err(AgentRuntimeError::BackpressureShed {
1391 depth: *depth,
1392 capacity: self.capacity,
1393 });
1394 }
1395 *depth += 1;
1396 if let Some(soft) = self.soft_capacity {
1397 if *depth >= soft {
1398 tracing::warn!(
1399 depth = *depth,
1400 soft_capacity = soft,
1401 hard_capacity = self.capacity,
1402 "backpressure approaching hard limit"
1403 );
1404 }
1405 }
1406 Ok(())
1407 }
1408
1409 pub fn release(&self) -> Result<(), AgentRuntimeError> {
1411 let mut depth = timed_lock(&self.inner, "BackpressureGuard::release");
1412 *depth = depth.saturating_sub(1);
1413 Ok(())
1414 }
1415
1416 pub fn reset(&self) {
1421 let mut depth = timed_lock(&self.inner, "BackpressureGuard::reset");
1422 *depth = 0;
1423 }
1424
1425 pub fn is_full(&self) -> Result<bool, AgentRuntimeError> {
1427 Ok(self.depth()? >= self.capacity)
1428 }
1429
1430 pub fn is_empty(&self) -> Result<bool, AgentRuntimeError> {
1432 Ok(self.depth()? == 0)
1433 }
1434
1435 pub fn available_capacity(&self) -> Result<usize, AgentRuntimeError> {
1437 Ok(self.capacity.saturating_sub(self.depth()?))
1438 }
1439
1440 pub fn hard_capacity(&self) -> usize {
1442 self.capacity
1443 }
1444
1445 pub fn soft_limit(&self) -> Option<usize> {
1447 self.soft_capacity
1448 }
1449
1450 pub fn is_soft_limited(&self) -> bool {
1455 self.soft_capacity.is_some()
1456 }
1457
1458 pub fn depth(&self) -> Result<usize, AgentRuntimeError> {
1460 let depth = timed_lock(&self.inner, "BackpressureGuard::depth");
1461 Ok(*depth)
1462 }
1463
1464 pub fn percent_full(&self) -> Result<f64, AgentRuntimeError> {
1469 let depth = self.depth()?;
1470 Ok((depth as f64 / self.capacity as f64 * 100.0).min(100.0))
1471 }
1472
1473 pub fn soft_depth_ratio(&self) -> f32 {
1478 match self.soft_capacity {
1479 None => 0.0,
1480 Some(soft) => {
1481 let depth = timed_lock(&self.inner, "BackpressureGuard::soft_depth_ratio");
1482 *depth as f32 / soft as f32
1483 }
1484 }
1485 }
1486
1487 pub fn utilization_ratio(&self) -> Result<f32, AgentRuntimeError> {
1491 if self.capacity == 0 {
1492 return Ok(0.0);
1493 }
1494 let depth = self.depth()?;
1495 Ok(depth as f32 / self.capacity as f32)
1496 }
1497
1498 pub fn remaining_capacity(&self) -> Result<usize, AgentRuntimeError> {
1503 let depth = self.depth()?;
1504 Ok(self.capacity.saturating_sub(depth))
1505 }
1506
1507 pub fn reset_depth(&self) -> Result<(), AgentRuntimeError> {
1512 let mut depth = timed_lock(&self.inner, "BackpressureGuard::reset_depth");
1513 *depth = 0;
1514 Ok(())
1515 }
1516
1517 pub fn headroom_ratio(&self) -> Result<f64, AgentRuntimeError> {
1521 Ok(self.available_capacity()? as f64 / self.capacity as f64)
1522 }
1523
1524 pub fn acquired_count(&self) -> Result<usize, AgentRuntimeError> {
1528 Ok(self.capacity - self.available_capacity()?)
1529 }
1530
1531 pub fn over_soft_limit(&self) -> Result<bool, AgentRuntimeError> {
1535 let soft = match self.soft_limit() {
1536 Some(s) => s,
1537 None => return Ok(false),
1538 };
1539 Ok(self.depth()? > soft)
1540 }
1541}
1542
1543#[derive(Debug)]
1547pub struct PipelineResult {
1548 pub output: String,
1550 pub stage_timings: Vec<(String, u64)>,
1552}
1553
1554impl PipelineResult {
1555 pub fn total_duration_ms(&self) -> u64 {
1557 self.stage_timings.iter().map(|(_, ms)| ms).sum()
1558 }
1559
1560 pub fn stage_count(&self) -> usize {
1565 self.stage_timings.len()
1566 }
1567
1568 pub fn slowest_stage(&self) -> Option<(&str, u64)> {
1572 self.stage_timings
1573 .iter()
1574 .max_by_key(|(_, ms)| ms)
1575 .map(|(name, ms)| (name.as_str(), *ms))
1576 }
1577
1578 pub fn fastest_stage(&self) -> Option<(&str, u64)> {
1582 self.stage_timings
1583 .iter()
1584 .min_by_key(|(_, ms)| ms)
1585 .map(|(name, ms)| (name.as_str(), *ms))
1586 }
1587
1588 pub fn is_empty(&self) -> bool {
1590 self.stage_timings.is_empty()
1591 }
1592}
1593
1594pub struct Stage {
1596 pub name: String,
1598 pub handler: Box<dyn Fn(String) -> Result<String, AgentRuntimeError> + Send + Sync>,
1600}
1601
1602impl std::fmt::Debug for Stage {
1603 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1604 f.debug_struct("Stage").field("name", &self.name).finish()
1605 }
1606}
1607
1608type StageErrorHandler = Box<dyn Fn(&str, &str) -> String + Send + Sync>;
1610
1611pub struct Pipeline {
1618 stages: Vec<Stage>,
1619 error_handler: Option<StageErrorHandler>,
1620}
1621
1622impl std::fmt::Debug for Pipeline {
1623 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1624 f.debug_struct("Pipeline")
1625 .field("stages", &self.stages)
1626 .field("has_error_handler", &self.error_handler.is_some())
1627 .finish()
1628 }
1629}
1630
1631impl Pipeline {
1632 pub fn new() -> Self {
1634 Self { stages: Vec::new(), error_handler: None }
1635 }
1636
1637 pub fn with_error_handler(
1643 mut self,
1644 handler: impl Fn(&str, &str) -> String + Send + Sync + 'static,
1645 ) -> Self {
1646 self.error_handler = Some(Box::new(handler));
1647 self
1648 }
1649
1650 pub fn add_stage(
1652 mut self,
1653 name: impl Into<String>,
1654 handler: impl Fn(String) -> Result<String, AgentRuntimeError> + Send + Sync + 'static,
1655 ) -> Self {
1656 self.stages.push(Stage {
1657 name: name.into(),
1658 handler: Box::new(handler),
1659 });
1660 self
1661 }
1662
1663 pub fn prepend_stage(
1668 mut self,
1669 name: impl Into<String>,
1670 handler: impl Fn(String) -> Result<String, AgentRuntimeError> + Send + Sync + 'static,
1671 ) -> Self {
1672 self.stages.insert(0, Stage {
1673 name: name.into(),
1674 handler: Box::new(handler),
1675 });
1676 self
1677 }
1678
1679 pub fn is_empty(&self) -> bool {
1681 self.stages.is_empty()
1682 }
1683
1684 pub fn has_error_handler(&self) -> bool {
1689 self.error_handler.is_some()
1690 }
1691
1692 pub fn stage_count(&self) -> usize {
1694 self.stages.len()
1695 }
1696
1697 pub fn has_stage(&self, name: &str) -> bool {
1699 self.stages.iter().any(|s| s.name == name)
1700 }
1701
1702 pub fn stage_names(&self) -> Vec<&str> {
1704 self.stages.iter().map(|s| s.name.as_str()).collect()
1705 }
1706
1707 pub fn stage_names_owned(&self) -> Vec<String> {
1714 self.stages.iter().map(|s| s.name.clone()).collect()
1715 }
1716
1717 pub fn get_stage_name_at(&self, index: usize) -> Option<&str> {
1719 self.stages.get(index).map(|s| s.name.as_str())
1720 }
1721
1722 pub fn stage_index(&self, name: &str) -> Option<usize> {
1726 self.stages.iter().position(|s| s.name == name)
1727 }
1728
1729 pub fn first_stage_name(&self) -> Option<&str> {
1731 self.stages.first().map(|s| s.name.as_str())
1732 }
1733
1734 pub fn last_stage_name(&self) -> Option<&str> {
1736 self.stages.last().map(|s| s.name.as_str())
1737 }
1738
1739 pub fn remove_stage(&mut self, name: &str) -> bool {
1744 if let Some(pos) = self.stages.iter().position(|s| s.name == name) {
1745 self.stages.remove(pos);
1746 true
1747 } else {
1748 false
1749 }
1750 }
1751
1752 pub fn rename_stage(&mut self, old_name: &str, new_name: impl Into<String>) -> bool {
1757 if let Some(stage) = self.stages.iter_mut().find(|s| s.name == old_name) {
1758 stage.name = new_name.into();
1759 true
1760 } else {
1761 false
1762 }
1763 }
1764
1765 pub fn clear(&mut self) {
1769 self.stages.clear();
1770 }
1771
1772 pub fn count_stages_matching(&self, keyword: &str) -> usize {
1774 let kw = keyword.to_ascii_lowercase();
1775 self.stages
1776 .iter()
1777 .filter(|s| s.name.to_ascii_lowercase().contains(&kw))
1778 .count()
1779 }
1780
1781 pub fn swap_stages(&mut self, a: &str, b: &str) -> bool {
1786 let idx_a = self.stages.iter().position(|s| s.name == a);
1787 let idx_b = self.stages.iter().position(|s| s.name == b);
1788 match (idx_a, idx_b) {
1789 (Some(i), Some(j)) => {
1790 self.stages.swap(i, j);
1791 true
1792 }
1793 _ => false,
1794 }
1795 }
1796
1797 #[tracing::instrument(skip(self))]
1799 pub fn run(&self, input: String) -> Result<String, AgentRuntimeError> {
1800 let mut current = input;
1801 for stage in &self.stages {
1802 tracing::debug!(stage = %stage.name, "running pipeline stage");
1803 match (stage.handler)(current) {
1804 Ok(out) => current = out,
1805 Err(e) => {
1806 tracing::error!(stage = %stage.name, error = %e, "pipeline stage failed");
1807 if let Some(ref handler) = self.error_handler {
1808 current = handler(&stage.name, &e.to_string());
1809 } else {
1810 return Err(e);
1811 }
1812 }
1813 }
1814 }
1815 Ok(current)
1816 }
1817
1818 pub fn execute_timed(&self, input: String) -> Result<PipelineResult, AgentRuntimeError> {
1823 let mut current = input;
1824 let mut stage_timings = Vec::new();
1825 for stage in &self.stages {
1826 let start = std::time::Instant::now();
1827 tracing::debug!(stage = %stage.name, "running timed pipeline stage");
1828 match (stage.handler)(current) {
1829 Ok(out) => current = out,
1830 Err(e) => {
1831 tracing::error!(stage = %stage.name, error = %e, "timed pipeline stage failed");
1832 if let Some(ref handler) = self.error_handler {
1833 current = handler(&stage.name, &e.to_string());
1834 } else {
1835 return Err(e);
1836 }
1837 }
1838 }
1839 let duration_ms = start.elapsed().as_millis() as u64;
1840 stage_timings.push((stage.name.clone(), duration_ms));
1841 }
1842 Ok(PipelineResult {
1843 output: current,
1844 stage_timings,
1845 })
1846 }
1847
1848 pub fn description(&self) -> String {
1855 if self.stages.is_empty() {
1856 return "Pipeline[empty]".to_owned();
1857 }
1858 let names = self
1859 .stages
1860 .iter()
1861 .map(|s| s.name.as_str())
1862 .collect::<Vec<_>>()
1863 .join(" → ");
1864 let n = self.stages.len();
1865 let plural = if n == 1 { "stage" } else { "stages" };
1866 format!("Pipeline[{n} {plural}: {names}]")
1867 }
1868
1869 pub fn has_unique_stage_names(&self) -> bool {
1875 let mut seen = std::collections::HashSet::new();
1876 self.stages.iter().all(|s| seen.insert(s.name.as_str()))
1877 }
1878
1879 pub fn stage_names_sorted(&self) -> Vec<&str> {
1886 let mut names: Vec<&str> = self.stages.iter().map(|s| s.name.as_str()).collect();
1887 names.sort_unstable();
1888 names
1889 }
1890
1891 pub fn longest_stage_name(&self) -> Option<&str> {
1896 self.stages
1897 .iter()
1898 .max_by_key(|s| s.name.len())
1899 .map(|s| s.name.as_str())
1900 }
1901
1902 pub fn shortest_stage_name(&self) -> Option<&str> {
1907 self.stages
1908 .iter()
1909 .min_by_key(|s| s.name.len())
1910 .map(|s| s.name.as_str())
1911 }
1912
1913 pub fn stage_name_lengths(&self) -> Vec<usize> {
1917 self.stages.iter().map(|s| s.name.len()).collect()
1918 }
1919
1920 pub fn avg_stage_name_length(&self) -> f64 {
1924 if self.stages.is_empty() {
1925 return 0.0;
1926 }
1927 let total: usize = self.stages.iter().map(|s| s.name.len()).sum();
1928 total as f64 / self.stages.len() as f64
1929 }
1930
1931 pub fn stages_containing(&self, substring: &str) -> Vec<&str> {
1935 self.stages
1936 .iter()
1937 .filter(|s| s.name.contains(substring))
1938 .map(|s| s.name.as_str())
1939 .collect()
1940 }
1941
1942 pub fn stage_is_first(&self, name: &str) -> bool {
1946 self.stages.first().map_or(false, |s| s.name == name)
1947 }
1948
1949 pub fn stage_is_last(&self, name: &str) -> bool {
1953 self.stages.last().map_or(false, |s| s.name == name)
1954 }
1955
1956 pub fn total_stage_name_bytes(&self) -> usize {
1960 self.stages.iter().map(|s| s.name.len()).sum()
1961 }
1962
1963 pub fn stages_before(&self, name: &str) -> Vec<&str> {
1968 let pos = self.stages.iter().position(|s| s.name == name);
1969 match pos {
1970 None | Some(0) => Vec::new(),
1971 Some(idx) => self.stages[..idx].iter().map(|s| s.name.as_str()).collect(),
1972 }
1973 }
1974
1975 pub fn stages_after(&self, name: &str) -> Vec<&str> {
1980 let pos = self.stages.iter().position(|s| s.name == name);
1981 match pos {
1982 None => Vec::new(),
1983 Some(idx) if idx + 1 >= self.stages.len() => Vec::new(),
1984 Some(idx) => self.stages[idx + 1..].iter().map(|s| s.name.as_str()).collect(),
1985 }
1986 }
1987
1988 pub fn stage_pairs(&self) -> Vec<(&str, &str)> {
1993 self.stages
1994 .windows(2)
1995 .map(|w| (w[0].name.as_str(), w[1].name.as_str()))
1996 .collect()
1997 }
1998
1999 pub fn stage_count_above_name_len(&self, min_len: usize) -> usize {
2005 self.stages.iter().filter(|s| s.name.len() > min_len).count()
2006 }
2007
2008 pub fn stage_count_below_name_len(&self, max_len: usize) -> usize {
2015 self.stages.iter().filter(|s| s.name.len() < max_len).count()
2016 }
2017
2018 pub fn stage_at(&self, idx: usize) -> Option<&str> {
2023 self.stages.get(idx).map(|s| s.name.as_str())
2024 }
2025
2026 pub fn stages_reversed(&self) -> Vec<&str> {
2031 self.stages.iter().rev().map(|s| s.name.as_str()).collect()
2032 }
2033
2034 pub fn pipeline_is_empty(&self) -> bool {
2038 self.stages.is_empty()
2039 }
2040
2041 pub fn unique_stage_names(&self) -> Vec<&str> {
2047 let mut names: Vec<&str> = self.stages.iter().map(|s| s.name.as_str()).collect();
2048 names.sort_unstable();
2049 names
2050 }
2051
2052 pub fn stage_names_with_prefix<'a>(&'a self, prefix: &str) -> Vec<&'a str> {
2057 self.stages
2058 .iter()
2059 .filter(|s| s.name.starts_with(prefix))
2060 .map(|s| s.name.as_str())
2061 .collect()
2062 }
2063
2064 pub fn contains_stage_with_prefix(&self, prefix: &str) -> bool {
2071 self.stages.iter().any(|s| s.name.starts_with(prefix))
2072 }
2073
2074 pub fn stages_with_suffix<'a>(&'a self, suffix: &str) -> Vec<&'a str> {
2082 self.stages
2083 .iter()
2084 .filter(|s| s.name.ends_with(suffix))
2085 .map(|s| s.name.as_str())
2086 .collect()
2087 }
2088
2089 pub fn has_stage_with_name_containing(&self, substr: &str) -> bool {
2094 self.stages.iter().any(|s| s.name.contains(substr))
2095 }
2096
2097 pub fn stage_names_containing<'a>(&'a self, substr: &str) -> Vec<&'a str> {
2105 self.stages
2106 .iter()
2107 .filter(|s| s.name.contains(substr))
2108 .map(|s| s.name.as_str())
2109 .collect()
2110 }
2111
2112 pub fn stage_name_bytes_total(&self) -> usize {
2117 self.stages.iter().map(|s| s.name.len()).sum()
2118 }
2119
2120 pub fn stage_count_above_name_bytes(&self, min_bytes: usize) -> usize {
2125 self.stages.iter().filter(|s| s.name.len() > min_bytes).count()
2126 }
2127
2128 pub fn stage_at_index(&self, index: usize) -> Option<&Stage> {
2130 self.stages.get(index)
2131 }
2132
2133 pub fn stage_position_from_end(&self, name: &str) -> Option<usize> {
2138 let pos = self.stages.iter().position(|s| s.name == name)?;
2139 Some(self.stages.len() - 1 - pos)
2140 }
2141
2142 pub fn contains_all_stages(&self, names: &[&str]) -> bool {
2146 names.iter().all(|&n| self.stages.iter().any(|s| s.name == n))
2147 }
2148
2149 pub fn stage_name_from_end(&self, n: usize) -> Option<&str> {
2154 let len = self.stages.len();
2155 if n >= len {
2156 return None;
2157 }
2158 Some(self.stages[len - 1 - n].name.as_str())
2159 }
2160
2161 pub fn all_stage_names(&self) -> Vec<String> {
2168 self.stages.iter().map(|s| s.name.clone()).collect()
2169 }
2170
2171 pub fn has_exactly_n_stages(&self, n: usize) -> bool {
2173 self.stages.len() == n
2174 }
2175
2176 pub fn stage_index_of(&self, name: &str) -> Option<usize> {
2179 self.stages.iter().position(|s| s.name == name)
2180 }
2181
2182 pub fn has_no_stages(&self) -> bool {
2186 self.stages.is_empty()
2187 }
2188
2189 pub fn longest_stage_name_len(&self) -> usize {
2193 self.stages.iter().map(|s| s.name.len()).max().unwrap_or(0)
2194 }
2195
2196 pub fn stage_names_joined(&self, sep: &str) -> String {
2200 self.stages
2201 .iter()
2202 .map(|s| s.name.as_str())
2203 .collect::<Vec<_>>()
2204 .join(sep)
2205 }
2206
2207 pub fn stage_count_with_name_containing(&self, substr: &str) -> usize {
2211 self.stages.iter().filter(|s| s.name.contains(substr)).count()
2212 }
2213
2214 pub fn has_stage_at_index(&self, idx: usize) -> bool {
2216 idx < self.stages.len()
2217 }
2218
2219 pub fn all_stage_names_start_with(&self, prefix: &str) -> bool {
2224 self.stages.iter().all(|s| s.name.starts_with(prefix))
2225 }
2226
2227 pub fn any_stage_has_name(&self, name: &str) -> bool {
2234 self.stages.iter().any(|s| s.name == name)
2235 }
2236
2237 pub fn stage_name_at(&self, idx: usize) -> Option<&str> {
2246 self.stages.get(idx).map(|s| s.name.as_str())
2247 }
2248
2249 pub fn all_stage_names_contain(&self, substr: &str) -> bool {
2254 self.stages.iter().all(|s| s.name.contains(substr))
2255 }
2256
2257}
2258
2259impl Default for Pipeline {
2260 fn default() -> Self {
2261 Self::new()
2262 }
2263}
2264
2265#[cfg(test)]
2268mod tests {
2269 use super::*;
2270
2271 #[test]
2274 fn test_retry_policy_rejects_zero_attempts() {
2275 assert!(RetryPolicy::exponential(0, 100).is_err());
2276 }
2277
2278 #[test]
2279 fn test_retry_policy_delay_attempt_1_equals_base() {
2280 let p = RetryPolicy::exponential(3, 100).unwrap();
2281 assert_eq!(p.delay_for(1), Duration::from_millis(100));
2282 }
2283
2284 #[test]
2285 fn test_retry_policy_delay_doubles_each_attempt() {
2286 let p = RetryPolicy::exponential(5, 100).unwrap();
2287 assert_eq!(p.delay_for(2), Duration::from_millis(200));
2288 assert_eq!(p.delay_for(3), Duration::from_millis(400));
2289 assert_eq!(p.delay_for(4), Duration::from_millis(800));
2290 }
2291
2292 #[test]
2293 fn test_retry_policy_delay_capped_at_max() {
2294 let p = RetryPolicy::exponential(10, 10_000).unwrap();
2295 assert_eq!(p.delay_for(10), MAX_RETRY_DELAY);
2296 }
2297
2298 #[test]
2299 fn test_retry_policy_delay_never_exceeds_max_for_any_attempt() {
2300 let p = RetryPolicy::exponential(10, 1000).unwrap();
2301 for attempt in 1..=10 {
2302 assert!(p.delay_for(attempt) <= MAX_RETRY_DELAY);
2303 }
2304 }
2305
2306 #[test]
2309 fn test_retry_policy_first_delay_ms_equals_base_delay() {
2310 let p = RetryPolicy::exponential(3, 200).unwrap();
2311 assert_eq!(p.first_delay_ms(), p.base_delay_ms());
2312 }
2313
2314 #[test]
2315 fn test_retry_policy_first_delay_ms_constant_policy() {
2316 let p = RetryPolicy::constant(4, 150).unwrap();
2317 assert_eq!(p.first_delay_ms(), 150);
2318 }
2319
2320 #[test]
2323 fn test_circuit_breaker_rejects_zero_threshold() {
2324 assert!(CircuitBreaker::new("svc", 0, Duration::from_secs(1)).is_err());
2325 }
2326
2327 #[test]
2328 fn test_circuit_breaker_starts_closed() {
2329 let cb = CircuitBreaker::new("svc", 3, Duration::from_secs(60)).unwrap();
2330 assert_eq!(cb.state().unwrap(), CircuitState::Closed);
2331 }
2332
2333 #[test]
2334 fn test_circuit_breaker_success_keeps_closed() {
2335 let cb = CircuitBreaker::new("svc", 3, Duration::from_secs(60)).unwrap();
2336 let result: Result<i32, AgentRuntimeError> = cb.call(|| Ok::<i32, AgentRuntimeError>(42));
2337 assert!(result.is_ok());
2338 assert_eq!(cb.state().unwrap(), CircuitState::Closed);
2339 }
2340
2341 #[test]
2342 fn test_circuit_breaker_opens_after_threshold_failures() {
2343 let cb = CircuitBreaker::new("svc", 3, Duration::from_secs(60)).unwrap();
2344 for _ in 0..3 {
2345 let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("oops".to_string()));
2346 }
2347 assert!(matches!(cb.state().unwrap(), CircuitState::Open { .. }));
2348 }
2349
2350 #[test]
2351 fn test_circuit_breaker_open_fast_fails() {
2352 let cb = CircuitBreaker::new("svc", 1, Duration::from_secs(3600)).unwrap();
2353 let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("fail".to_string()));
2354 let result: Result<(), AgentRuntimeError> = cb.call(|| Ok::<(), AgentRuntimeError>(()));
2355 assert!(matches!(result, Err(AgentRuntimeError::CircuitOpen { .. })));
2356 }
2357
2358 #[test]
2359 fn test_circuit_breaker_success_resets_failure_count() {
2360 let cb = CircuitBreaker::new("svc", 5, Duration::from_secs(60)).unwrap();
2361 let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("fail".to_string()));
2362 let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("fail".to_string()));
2363 let _: Result<i32, AgentRuntimeError> = cb.call(|| Ok::<i32, AgentRuntimeError>(1));
2364 assert_eq!(cb.failure_count().unwrap(), 0);
2365 }
2366
2367 #[test]
2368 fn test_circuit_breaker_half_open_on_recovery() {
2369 let cb = CircuitBreaker::new("svc", 1, Duration::ZERO).unwrap();
2371 let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("fail".to_string()));
2372 let result: Result<i32, AgentRuntimeError> = cb.call(|| Ok::<i32, AgentRuntimeError>(99));
2374 assert_eq!(result.unwrap_or(0), 99);
2375 assert_eq!(cb.state().unwrap(), CircuitState::Closed);
2376 }
2377
2378 #[test]
2379 fn test_circuit_breaker_with_custom_backend_uses_backend_state() {
2380 let shared_backend: Arc<dyn CircuitBreakerBackend> =
2383 Arc::new(InMemoryCircuitBreakerBackend::new());
2384
2385 let cb1 = CircuitBreaker::new("svc", 2, Duration::from_secs(60))
2386 .unwrap()
2387 .with_backend(Arc::clone(&shared_backend));
2388
2389 let cb2 = CircuitBreaker::new("svc", 2, Duration::from_secs(60))
2390 .unwrap()
2391 .with_backend(Arc::clone(&shared_backend));
2392
2393 let _: Result<(), AgentRuntimeError> = cb1.call(|| Err::<(), _>("fail".to_string()));
2395
2396 assert_eq!(cb2.failure_count().unwrap(), 1);
2398
2399 let _: Result<(), AgentRuntimeError> = cb1.call(|| Err::<(), _>("fail again".to_string()));
2401
2402 assert!(matches!(cb2.state().unwrap(), CircuitState::Open { .. }));
2404 }
2405
2406 #[test]
2407 fn test_in_memory_backend_increments_and_resets() {
2408 use super::CircuitBreakerBackend as CB;
2409 let backend = InMemoryCircuitBreakerBackend::new();
2410
2411 assert_eq!(CB::get_failures(&backend, "svc"), 0);
2412
2413 let count = CB::increment_failures(&backend, "svc");
2414 assert_eq!(count, 1);
2415
2416 let count = CB::increment_failures(&backend, "svc");
2417 assert_eq!(count, 2);
2418
2419 CB::reset_failures(&backend, "svc");
2420 assert_eq!(CB::get_failures(&backend, "svc"), 0);
2421
2422 assert!(CB::get_open_at(&backend, "svc").is_none());
2424 let now = Instant::now();
2425 CB::set_open_at(&backend, "svc", now);
2426 assert!(CB::get_open_at(&backend, "svc").is_some());
2427 CB::clear_open_at(&backend, "svc");
2428 assert!(CB::get_open_at(&backend, "svc").is_none());
2429 }
2430
2431 #[test]
2434 fn test_deduplicator_new_key_is_new() {
2435 let d = Deduplicator::new(Duration::from_secs(60));
2436 let r = d.check_and_register("key-1").unwrap();
2437 assert_eq!(r, DeduplicationResult::New);
2438 }
2439
2440 #[test]
2441 fn test_deduplicator_second_check_is_in_progress() {
2442 let d = Deduplicator::new(Duration::from_secs(60));
2443 d.check_and_register("key-1").unwrap();
2444 let r = d.check_and_register("key-1").unwrap();
2445 assert_eq!(r, DeduplicationResult::InProgress);
2446 }
2447
2448 #[test]
2449 fn test_deduplicator_complete_makes_cached() {
2450 let d = Deduplicator::new(Duration::from_secs(60));
2451 d.check_and_register("key-1").unwrap();
2452 d.complete("key-1", "result-value").unwrap();
2453 let r = d.check_and_register("key-1").unwrap();
2454 assert_eq!(r, DeduplicationResult::Cached("result-value".into()));
2455 }
2456
2457 #[test]
2458 fn test_deduplicator_different_keys_are_independent() {
2459 let d = Deduplicator::new(Duration::from_secs(60));
2460 d.check_and_register("key-a").unwrap();
2461 let r = d.check_and_register("key-b").unwrap();
2462 assert_eq!(r, DeduplicationResult::New);
2463 }
2464
2465 #[test]
2466 fn test_deduplicator_expired_entry_is_new() {
2467 let d = Deduplicator::new(Duration::ZERO); d.check_and_register("key-1").unwrap();
2469 d.complete("key-1", "old").unwrap();
2470 let r = d.check_and_register("key-1").unwrap();
2472 assert_eq!(r, DeduplicationResult::New);
2473 }
2474
2475 #[test]
2478 fn test_backpressure_guard_rejects_zero_capacity() {
2479 assert!(BackpressureGuard::new(0).is_err());
2480 }
2481
2482 #[test]
2483 fn test_backpressure_guard_acquire_within_capacity() {
2484 let g = BackpressureGuard::new(5).unwrap();
2485 assert!(g.try_acquire().is_ok());
2486 assert_eq!(g.depth().unwrap(), 1);
2487 }
2488
2489 #[test]
2490 fn test_backpressure_guard_sheds_when_full() {
2491 let g = BackpressureGuard::new(2).unwrap();
2492 g.try_acquire().unwrap();
2493 g.try_acquire().unwrap();
2494 let result = g.try_acquire();
2495 assert!(matches!(
2496 result,
2497 Err(AgentRuntimeError::BackpressureShed { .. })
2498 ));
2499 }
2500
2501 #[test]
2502 fn test_backpressure_guard_release_decrements_depth() {
2503 let g = BackpressureGuard::new(3).unwrap();
2504 g.try_acquire().unwrap();
2505 g.try_acquire().unwrap();
2506 g.release().unwrap();
2507 assert_eq!(g.depth().unwrap(), 1);
2508 }
2509
2510 #[test]
2511 fn test_backpressure_guard_release_on_empty_is_noop() {
2512 let g = BackpressureGuard::new(3).unwrap();
2513 g.release().unwrap(); assert_eq!(g.depth().unwrap(), 0);
2515 }
2516
2517 #[test]
2520 fn test_pipeline_runs_stages_in_order() {
2521 let p = Pipeline::new()
2522 .add_stage("upper", |s| Ok(s.to_uppercase()))
2523 .add_stage("append", |s| Ok(format!("{s}!")));
2524 let result = p.run("hello".into()).unwrap();
2525 assert_eq!(result, "HELLO!");
2526 }
2527
2528 #[test]
2529 fn test_pipeline_empty_pipeline_returns_input() {
2530 let p = Pipeline::new();
2531 assert_eq!(p.run("test".into()).unwrap(), "test");
2532 }
2533
2534 #[test]
2535 fn test_pipeline_stage_failure_short_circuits() {
2536 let p = Pipeline::new()
2537 .add_stage("fail", |_| {
2538 Err(AgentRuntimeError::Orchestration("boom".into()))
2539 })
2540 .add_stage("never", |s| Ok(s));
2541 assert!(p.run("input".into()).is_err());
2542 }
2543
2544 #[test]
2545 fn test_pipeline_stage_count() {
2546 let p = Pipeline::new()
2547 .add_stage("s1", |s| Ok(s))
2548 .add_stage("s2", |s| Ok(s));
2549 assert_eq!(p.stage_count(), 2);
2550 }
2551
2552 #[test]
2553 fn test_pipeline_execute_timed_captures_stage_durations() {
2554 let p = Pipeline::new()
2555 .add_stage("s1", |s| Ok(format!("{s}1")))
2556 .add_stage("s2", |s| Ok(format!("{s}2")));
2557 let result = p.execute_timed("x".to_string()).unwrap();
2558 assert_eq!(result.output, "x12");
2559 assert_eq!(result.stage_timings.len(), 2);
2560 assert_eq!(result.stage_timings[0].0, "s1");
2561 assert_eq!(result.stage_timings[1].0, "s2");
2562 }
2563
2564 #[test]
2567 fn test_backpressure_soft_limit_rejects_invalid_config() {
2568 let g = BackpressureGuard::new(5).unwrap();
2570 assert!(g.with_soft_limit(5).is_err());
2571 let g = BackpressureGuard::new(5).unwrap();
2572 assert!(g.with_soft_limit(6).is_err());
2573 }
2574
2575 #[test]
2576 fn test_backpressure_soft_limit_accepts_requests_below_soft() {
2577 let g = BackpressureGuard::new(5)
2578 .unwrap()
2579 .with_soft_limit(2)
2580 .unwrap();
2581 assert!(g.try_acquire().is_ok());
2583 assert!(g.try_acquire().is_ok());
2584 assert_eq!(g.depth().unwrap(), 2);
2585 }
2586
2587 #[test]
2588 fn test_backpressure_with_soft_limit_still_sheds_at_hard_capacity() {
2589 let g = BackpressureGuard::new(3)
2590 .unwrap()
2591 .with_soft_limit(2)
2592 .unwrap();
2593 g.try_acquire().unwrap();
2594 g.try_acquire().unwrap();
2595 g.try_acquire().unwrap(); let result = g.try_acquire();
2597 assert!(matches!(
2598 result,
2599 Err(AgentRuntimeError::BackpressureShed { .. })
2600 ));
2601 }
2602
2603 #[test]
2606 fn test_backpressure_hard_capacity_matches_new() {
2607 let g = BackpressureGuard::new(7).unwrap();
2608 assert_eq!(g.hard_capacity(), 7);
2609 }
2610
2611 #[test]
2614 fn test_pipeline_error_handler_recovers_from_stage_failure() {
2615 let p = Pipeline::new()
2616 .add_stage("fail_stage", |_| {
2617 Err(AgentRuntimeError::Orchestration("oops".into()))
2618 })
2619 .add_stage("append", |s| Ok(format!("{s}-recovered")))
2620 .with_error_handler(|stage_name, _err| format!("recovered_from_{stage_name}"));
2621 let result = p.run("input".to_string()).unwrap();
2622 assert_eq!(result, "recovered_from_fail_stage-recovered");
2623 }
2624
2625 #[test]
2628 fn test_circuit_state_eq() {
2629 assert_eq!(CircuitState::Closed, CircuitState::Closed);
2630 assert_eq!(CircuitState::HalfOpen, CircuitState::HalfOpen);
2631 assert_eq!(
2632 CircuitState::Open { opened_at: std::time::Instant::now() },
2633 CircuitState::Open { opened_at: std::time::Instant::now() }
2634 );
2635 assert_ne!(CircuitState::Closed, CircuitState::HalfOpen);
2636 assert_ne!(CircuitState::Closed, CircuitState::Open { opened_at: std::time::Instant::now() });
2637 }
2638
2639 #[test]
2642 fn test_dedup_many_independent_keys() {
2643 let d = Deduplicator::new(Duration::from_secs(60));
2644 let ttl = Duration::from_secs(60);
2645 let results = d.dedup_many(&[("key-a", ttl), ("key-b", ttl), ("key-c", ttl)]).unwrap();
2646 assert_eq!(results.len(), 3);
2647 assert!(results.iter().all(|r| matches!(r, DeduplicationResult::New)));
2648 }
2649
2650 #[test]
2653 fn test_concurrent_circuit_breaker_opens_under_concurrent_failures() {
2654 use std::sync::Arc;
2655 use std::thread;
2656
2657 let cb = Arc::new(
2658 CircuitBreaker::new("svc", 5, Duration::from_secs(60)).unwrap(),
2659 );
2660 let n_threads = 8;
2661 let failures_per_thread = 2;
2662
2663 let mut handles = Vec::new();
2664 for _ in 0..n_threads {
2665 let cb = Arc::clone(&cb);
2666 handles.push(thread::spawn(move || {
2667 for _ in 0..failures_per_thread {
2668 let _ = cb.call(|| Err::<(), &str>("fail"));
2669 }
2670 }));
2671 }
2672 for h in handles {
2673 h.join().unwrap();
2674 }
2675
2676 let state = cb.state().unwrap();
2679 assert!(
2680 matches!(state, CircuitState::Open { .. }),
2681 "circuit should be open after many concurrent failures; got: {state:?}"
2682 );
2683 }
2684
2685 #[test]
2686 fn test_per_service_tracking_is_independent() {
2687 let backend = Arc::new(InMemoryCircuitBreakerBackend::new());
2688
2689 let cb_a = CircuitBreaker::new("service-a", 3, Duration::from_secs(60))
2690 .unwrap()
2691 .with_backend(Arc::clone(&backend) as Arc<dyn CircuitBreakerBackend>);
2692 let cb_b = CircuitBreaker::new("service-b", 3, Duration::from_secs(60))
2693 .unwrap()
2694 .with_backend(Arc::clone(&backend) as Arc<dyn CircuitBreakerBackend>);
2695
2696 for _ in 0..3 {
2698 let _ = cb_a.call(|| Err::<(), &str>("fail"));
2699 }
2700
2701 let state_b = cb_b.state().unwrap();
2703 assert_eq!(
2704 state_b,
2705 CircuitState::Closed,
2706 "service-b should be unaffected by service-a failures"
2707 );
2708
2709 let state_a = cb_a.state().unwrap();
2711 assert!(
2712 matches!(state_a, CircuitState::Open { .. }),
2713 "service-a should be open"
2714 );
2715 }
2716
2717 #[test]
2720 fn test_backpressure_concurrent_acquires_are_consistent() {
2721 use std::sync::Arc;
2722 use std::thread;
2723
2724 let g = Arc::new(BackpressureGuard::new(100).unwrap());
2725 let mut handles = Vec::new();
2726
2727 for _ in 0..10 {
2728 let g_clone = Arc::clone(&g);
2729 handles.push(thread::spawn(move || {
2730 g_clone.try_acquire().ok();
2731 }));
2732 }
2733
2734 for h in handles {
2735 h.join().unwrap();
2736 }
2737
2738 assert_eq!(g.depth().unwrap(), 10);
2740 }
2741
2742 #[test]
2745 fn test_retry_policy_constant_has_fixed_delay() {
2746 let p = RetryPolicy::constant(3, 100).unwrap();
2747 assert_eq!(p.delay_for(1), Duration::from_millis(100));
2748 assert_eq!(p.delay_for(2), Duration::from_millis(100));
2749 assert_eq!(p.delay_for(10), Duration::from_millis(100));
2750 }
2751
2752 #[test]
2753 fn test_retry_policy_exponential_doubles() {
2754 let p = RetryPolicy::exponential(5, 10).unwrap();
2755 assert_eq!(p.delay_for(1), Duration::from_millis(10));
2756 assert_eq!(p.delay_for(2), Duration::from_millis(20));
2757 assert_eq!(p.delay_for(3), Duration::from_millis(40));
2758 }
2759
2760 #[test]
2761 fn test_retry_policy_with_max_attempts() {
2762 let p = RetryPolicy::constant(3, 50).unwrap();
2763 let p2 = p.with_max_attempts(7).unwrap();
2764 assert_eq!(p2.max_attempts, 7);
2765 assert!(RetryPolicy::constant(1, 50).unwrap().with_max_attempts(0).is_err());
2766 }
2767
2768 #[test]
2769 fn test_circuit_breaker_reset_returns_to_closed() {
2770 let cb = CircuitBreaker::new("svc", 2, Duration::from_secs(60)).unwrap();
2771 cb.record_failure();
2772 cb.record_failure(); assert_ne!(cb.state().unwrap(), CircuitState::Closed);
2774 cb.reset();
2775 assert_eq!(cb.state().unwrap(), CircuitState::Closed);
2776 assert_eq!(cb.failure_count().unwrap(), 0);
2777 }
2778
2779 #[test]
2780 fn test_deduplicator_clear_resets_all_state() {
2781 let d = Deduplicator::new(Duration::from_secs(60));
2782 d.check_and_register("k1").unwrap();
2783 d.check_and_register("k2").unwrap();
2784 d.complete("k1", "r1").unwrap();
2785 assert_eq!(d.in_flight_count().unwrap(), 1);
2786 assert_eq!(d.cached_count().unwrap(), 1);
2787 d.clear().unwrap();
2788 assert_eq!(d.in_flight_count().unwrap(), 0);
2789 assert_eq!(d.cached_count().unwrap(), 0);
2790 }
2791
2792 #[test]
2793 fn test_deduplicator_purge_expired_removes_stale() {
2794 let d = Deduplicator::new(Duration::from_millis(1));
2795 d.check_and_register("x").unwrap();
2796 d.complete("x", "result").unwrap();
2797 std::thread::sleep(Duration::from_millis(5));
2798 let removed = d.purge_expired().unwrap();
2799 assert_eq!(removed, 1);
2800 assert_eq!(d.cached_count().unwrap(), 0);
2801 }
2802
2803 #[test]
2804 fn test_backpressure_utilization_ratio() {
2805 let g = BackpressureGuard::new(4).unwrap();
2806 g.try_acquire().unwrap();
2807 g.try_acquire().unwrap();
2808 let ratio = g.utilization_ratio().unwrap();
2809 assert!((ratio - 0.5).abs() < 1e-5);
2810 }
2811
2812 #[test]
2813 fn test_pipeline_stage_count_and_names() {
2814 let p = Pipeline::new()
2815 .add_stage("first", |s| Ok(s + "1"))
2816 .add_stage("second", |s| Ok(s + "2"));
2817 assert_eq!(p.stage_count(), 2);
2818 assert_eq!(p.stage_names(), vec!["first", "second"]);
2819 }
2820
2821 #[test]
2822 fn test_pipeline_is_empty_true_for_new() {
2823 let p = Pipeline::new();
2824 assert!(p.is_empty());
2825 }
2826
2827 #[test]
2828 fn test_pipeline_is_empty_false_after_add_stage() {
2829 let p = Pipeline::new().add_stage("s", |s: String| Ok(s));
2830 assert!(!p.is_empty());
2831 }
2832
2833 #[test]
2834 fn test_circuit_breaker_service_name() {
2835 let cb = CircuitBreaker::new("my-service", 3, Duration::from_secs(1)).unwrap();
2836 assert_eq!(cb.service_name(), "my-service");
2837 }
2838
2839 #[test]
2840 fn test_retry_policy_none_has_max_one_attempt() {
2841 let p = RetryPolicy::none();
2842 assert_eq!(p.max_attempts, 1);
2843 assert_eq!(p.delay_for(0), Duration::ZERO);
2844 }
2845
2846 #[test]
2847 fn test_backpressure_is_full_false_when_empty() {
2848 let g = BackpressureGuard::new(5).unwrap();
2849 assert!(!g.is_full().unwrap());
2850 }
2851
2852 #[test]
2853 fn test_backpressure_is_full_true_when_at_capacity() {
2854 let g = BackpressureGuard::new(2).unwrap();
2855 g.try_acquire().unwrap();
2856 g.try_acquire().unwrap();
2857 assert!(g.is_full().unwrap());
2858 }
2859
2860 #[test]
2861 fn test_deduplicator_ttl_returns_configured_value() {
2862 let d = Deduplicator::new(Duration::from_secs(42));
2863 assert_eq!(d.ttl(), Duration::from_secs(42));
2864 }
2865
2866 #[test]
2867 fn test_circuit_breaker_is_closed_initially() {
2868 let cb = CircuitBreaker::new("svc", 3, Duration::from_secs(1)).unwrap();
2869 assert!(cb.is_closed());
2870 assert!(!cb.is_open());
2871 assert!(!cb.is_half_open());
2872 }
2873
2874 #[test]
2875 fn test_circuit_breaker_is_open_after_threshold_failures() {
2876 let cb = CircuitBreaker::new("svc", 2, Duration::from_secs(60)).unwrap();
2877 cb.record_failure();
2878 cb.record_failure();
2879 assert!(cb.is_open());
2880 assert!(!cb.is_closed());
2881 }
2882
2883 #[test]
2884 fn test_retry_policy_total_max_delay_constant() {
2885 let p = RetryPolicy::constant(3, 100).unwrap();
2887 assert_eq!(p.total_max_delay_ms(), 300);
2888 }
2889
2890 #[test]
2891 fn test_retry_policy_total_max_delay_none_is_zero() {
2892 let p = RetryPolicy::none();
2893 assert_eq!(p.total_max_delay_ms(), 0);
2894 }
2895
2896 #[test]
2897 fn test_retry_policy_is_none_true_for_none() {
2898 let p = RetryPolicy::none();
2899 assert!(p.is_none());
2900 }
2901
2902 #[test]
2903 fn test_retry_policy_is_none_false_for_exponential() {
2904 let p = RetryPolicy::exponential(3, 10).unwrap();
2905 assert!(!p.is_none());
2906 }
2907
2908 #[test]
2909 fn test_pipeline_has_error_handler_false_by_default() {
2910 let p = Pipeline::new().add_stage("s", |s: String| Ok(s));
2911 assert!(!p.has_error_handler());
2912 }
2913
2914 #[test]
2915 fn test_pipeline_has_error_handler_true_after_set() {
2916 let p = Pipeline::new()
2917 .with_error_handler(|_stage, _err| "recovered".to_string());
2918 assert!(p.has_error_handler());
2919 }
2920
2921 #[test]
2922 fn test_backpressure_reset_clears_depth() {
2923 let g = BackpressureGuard::new(5).unwrap();
2924 g.try_acquire().unwrap();
2925 g.try_acquire().unwrap();
2926 assert_eq!(g.depth().unwrap(), 2);
2927 g.reset();
2928 assert_eq!(g.depth().unwrap(), 0);
2929 }
2930
2931 #[test]
2932 fn test_deduplicator_in_flight_keys_returns_started_keys() {
2933 let d = Deduplicator::new(Duration::from_secs(60));
2934 d.check("key-a", Duration::from_secs(60)).unwrap();
2935 d.check("key-b", Duration::from_secs(60)).unwrap();
2936 let mut keys = d.in_flight_keys().unwrap();
2937 keys.sort();
2938 assert_eq!(keys, vec!["key-a", "key-b"]);
2939 }
2940
2941 #[test]
2944 fn test_retry_policy_with_base_delay_ms_changes_delay() {
2945 let p = RetryPolicy::exponential(3, 100)
2946 .unwrap()
2947 .with_base_delay_ms(200)
2948 .unwrap();
2949 assert_eq!(p.delay_for(1), Duration::from_millis(200));
2950 }
2951
2952 #[test]
2953 fn test_retry_policy_with_base_delay_ms_rejects_zero() {
2954 let p = RetryPolicy::exponential(3, 100).unwrap();
2955 assert!(p.with_base_delay_ms(0).is_err());
2956 }
2957
2958 #[test]
2959 fn test_backpressure_reset_depth_clears_counter() {
2960 let guard = BackpressureGuard::new(5).unwrap();
2961 guard.try_acquire().unwrap();
2962 guard.try_acquire().unwrap();
2963 assert_eq!(guard.depth().unwrap(), 2);
2964 guard.reset_depth().unwrap();
2965 assert_eq!(guard.depth().unwrap(), 0);
2966 }
2967
2968 #[test]
2969 fn test_pipeline_remove_stage_returns_true_if_found() {
2970 let mut p = Pipeline::new()
2971 .add_stage("a", |s| Ok(s))
2972 .add_stage("b", |s| Ok(s));
2973 assert!(p.remove_stage("a"));
2974 assert_eq!(p.stage_count(), 1);
2975 assert_eq!(p.stage_names(), vec!["b"]);
2976 }
2977
2978 #[test]
2979 fn test_pipeline_remove_stage_returns_false_if_missing() {
2980 let mut p = Pipeline::new().add_stage("x", |s| Ok(s));
2981 assert!(!p.remove_stage("nope"));
2982 assert_eq!(p.stage_count(), 1);
2983 }
2984
2985 #[test]
2986 fn test_pipeline_clear_removes_all_stages() {
2987 let mut p = Pipeline::new()
2988 .add_stage("a", |s| Ok(s))
2989 .add_stage("b", |s| Ok(s));
2990 p.clear();
2991 assert!(p.is_empty());
2992 }
2993
2994 #[test]
2997 fn test_circuit_breaker_threshold_accessor() {
2998 let cb = CircuitBreaker::new("svc", 5, Duration::from_secs(30)).unwrap();
2999 assert_eq!(cb.threshold(), 5);
3000 }
3001
3002 #[test]
3003 fn test_circuit_breaker_recovery_window_accessor() {
3004 let window = Duration::from_secs(45);
3005 let cb = CircuitBreaker::new("svc", 3, window).unwrap();
3006 assert_eq!(cb.recovery_window(), window);
3007 }
3008
3009 #[test]
3010 fn test_pipeline_get_stage_name_at_returns_correct_names() {
3011 let p = Pipeline::new()
3012 .add_stage("first", |s| Ok(s))
3013 .add_stage("second", |s| Ok(s));
3014 assert_eq!(p.get_stage_name_at(0), Some("first"));
3015 assert_eq!(p.get_stage_name_at(1), Some("second"));
3016 assert_eq!(p.get_stage_name_at(2), None);
3017 }
3018
3019 #[test]
3022 fn test_retry_policy_can_retry_within_budget() {
3023 let p = RetryPolicy::exponential(3, 100).unwrap();
3024 assert!(p.can_retry(0));
3025 assert!(p.can_retry(1));
3026 assert!(p.can_retry(2));
3027 }
3028
3029 #[test]
3030 fn test_retry_policy_can_retry_false_when_exhausted() {
3031 let p = RetryPolicy::exponential(3, 100).unwrap();
3032 assert!(!p.can_retry(3));
3033 assert!(!p.can_retry(99));
3034 }
3035
3036 #[test]
3037 fn test_retry_policy_none_only_allows_first_attempt() {
3038 let p = RetryPolicy::none();
3039 assert!(p.can_retry(0));
3040 assert!(!p.can_retry(1));
3041 }
3042
3043 #[test]
3046 fn test_retry_policy_max_attempts_accessor() {
3047 let p = RetryPolicy::exponential(7, 100).unwrap();
3048 assert_eq!(p.max_attempts(), 7);
3049 }
3050
3051 #[test]
3052 fn test_pipeline_stage_names_owned_returns_strings() {
3053 let p = Pipeline::new()
3054 .add_stage("alpha", |s| Ok(s))
3055 .add_stage("beta", |s| Ok(s));
3056 let owned = p.stage_names_owned();
3057 assert_eq!(owned, vec!["alpha".to_string(), "beta".to_string()]);
3058 }
3059
3060 #[test]
3061 fn test_pipeline_stage_names_owned_empty_when_no_stages() {
3062 let p = Pipeline::new();
3063 assert!(p.stage_names_owned().is_empty());
3064 }
3065
3066 #[test]
3069 fn test_attempts_remaining_full_at_zero() {
3070 let p = RetryPolicy::exponential(4, 100).unwrap();
3071 assert_eq!(p.attempts_remaining(0), 4);
3072 }
3073
3074 #[test]
3075 fn test_attempts_remaining_decrements_correctly() {
3076 let p = RetryPolicy::exponential(4, 100).unwrap();
3077 assert_eq!(p.attempts_remaining(2), 2);
3078 assert_eq!(p.attempts_remaining(4), 0);
3079 }
3080
3081 #[test]
3082 fn test_attempts_remaining_zero_when_exhausted() {
3083 let p = RetryPolicy::exponential(3, 100).unwrap();
3084 assert_eq!(p.attempts_remaining(10), 0);
3085 }
3086
3087 #[test]
3090 fn test_retry_policy_max_attempts_getter() {
3091 let p = RetryPolicy::exponential(7, 50).unwrap();
3092 assert_eq!(p.max_attempts(), 7);
3093 }
3094
3095 #[test]
3096 fn test_circuit_breaker_failure_count_increments() {
3097 let cb = CircuitBreaker::new("svc2", 3, std::time::Duration::from_secs(60)).unwrap();
3098 cb.record_failure();
3099 cb.record_failure();
3100 assert_eq!(cb.failure_count().unwrap(), 2);
3101 }
3102
3103 #[test]
3104 fn test_circuit_breaker_record_success_resets_failures() {
3105 let cb = CircuitBreaker::new("svc3", 5, std::time::Duration::from_secs(60)).unwrap();
3106 cb.record_failure();
3107 cb.record_failure();
3108 cb.record_success();
3109 assert_eq!(cb.failure_count().unwrap(), 0);
3110 assert!(cb.is_closed());
3111 }
3112
3113 #[test]
3114 fn test_circuit_breaker_threshold_and_recovery_window() {
3115 let cb = CircuitBreaker::new("svc4", 3, std::time::Duration::from_secs(30)).unwrap();
3116 assert_eq!(cb.threshold(), 3);
3117 assert_eq!(cb.recovery_window(), std::time::Duration::from_secs(30));
3118 }
3119
3120 #[test]
3121 fn test_circuit_breaker_reset_clears_state() {
3122 let cb = CircuitBreaker::new("svc5", 2, std::time::Duration::from_secs(60)).unwrap();
3123 cb.record_failure();
3124 cb.record_failure(); assert!(cb.is_open());
3126 cb.reset();
3127 assert!(cb.is_closed());
3128 assert_eq!(cb.failure_count().unwrap(), 0);
3129 }
3130
3131 #[test]
3132 fn test_deduplicator_cached_count_after_complete() {
3133 let d = Deduplicator::new(Duration::from_secs(60));
3134 d.check("key1", Duration::from_secs(60)).unwrap();
3135 d.complete("key1", "result").unwrap();
3136 assert_eq!(d.cached_count().unwrap(), 1);
3137 }
3138
3139 #[test]
3140 fn test_deduplicator_ttl_matches_configured() {
3141 let d = Deduplicator::new(Duration::from_secs(42));
3142 assert_eq!(d.ttl(), Duration::from_secs(42));
3143 }
3144
3145 #[test]
3146 fn test_deduplicator_purge_expired_removes_stale_entries() {
3147 let d = Deduplicator::new(Duration::ZERO); d.check("stale", Duration::ZERO).unwrap();
3149 d.complete("stale", "val").unwrap();
3150 std::thread::sleep(std::time::Duration::from_millis(1));
3152 let removed = d.purge_expired().unwrap();
3153 assert!(removed >= 1);
3154 }
3155
3156 #[test]
3157 fn test_backpressure_remaining_capacity() {
3158 let g = BackpressureGuard::new(5).unwrap();
3159 g.try_acquire().unwrap();
3160 assert_eq!(g.remaining_capacity().unwrap(), 4);
3161 }
3162
3163 #[test]
3164 fn test_backpressure_soft_depth_ratio_without_soft_limit() {
3165 let g = BackpressureGuard::new(5).unwrap();
3166 assert_eq!(g.soft_depth_ratio(), 0.0);
3167 }
3168
3169 #[test]
3170 fn test_backpressure_soft_depth_ratio_with_soft_limit() {
3171 let g = BackpressureGuard::new(10).unwrap()
3172 .with_soft_limit(4).unwrap();
3173 g.try_acquire().unwrap();
3174 g.try_acquire().unwrap();
3175 let ratio = g.soft_depth_ratio();
3176 assert!((ratio - 0.5).abs() < 1e-6);
3177 }
3178
3179 #[test]
3182 fn test_retry_delay_ms_for_matches_delay_for() {
3183 let p = RetryPolicy::exponential(5, 100).unwrap();
3184 assert_eq!(p.delay_ms_for(1), p.delay_for(1).as_millis() as u64);
3185 assert_eq!(p.delay_ms_for(3), p.delay_for(3).as_millis() as u64);
3186 }
3187
3188 #[test]
3189 fn test_backpressure_soft_limit_returns_configured_value() {
3190 let g = BackpressureGuard::new(10).unwrap()
3191 .with_soft_limit(5).unwrap();
3192 assert_eq!(g.soft_limit(), Some(5));
3193 }
3194
3195 #[test]
3196 fn test_backpressure_soft_limit_none_when_not_set() {
3197 let g = BackpressureGuard::new(10).unwrap();
3198 assert_eq!(g.soft_limit(), None);
3199 }
3200
3201 #[test]
3202 fn test_pipeline_has_stage_returns_true_when_present() {
3203 let p = Pipeline::new().add_stage("step1", |s| Ok(s));
3204 assert!(p.has_stage("step1"));
3205 assert!(!p.has_stage("step2"));
3206 }
3207
3208 #[test]
3209 fn test_pipeline_has_stage_false_for_empty_pipeline() {
3210 let p = Pipeline::new();
3211 assert!(!p.has_stage("anything"));
3212 }
3213
3214 #[test]
3217 fn test_deduplicator_max_entries_none_by_default() {
3218 let d = Deduplicator::new(Duration::from_secs(60));
3219 assert_eq!(d.max_entries(), None);
3220 }
3221
3222 #[test]
3223 fn test_deduplicator_max_entries_set_via_builder() {
3224 let d = Deduplicator::new(Duration::from_secs(60))
3225 .with_max_entries(50)
3226 .unwrap();
3227 assert_eq!(d.max_entries(), Some(50));
3228 }
3229
3230 #[test]
3231 fn test_retry_policy_delay_for_exponential_grows() {
3232 let p = RetryPolicy::exponential(5, 100).unwrap();
3233 let d1 = p.delay_for(1);
3235 let d2 = p.delay_for(2);
3236 assert!(d2 > d1, "exponential delay should grow: attempt 2 > attempt 1");
3237 }
3238
3239 #[test]
3240 fn test_retry_policy_delay_for_constant_stays_same() {
3241 let p = RetryPolicy::constant(5, 200).unwrap();
3242 assert_eq!(p.delay_for(0), p.delay_for(1));
3243 assert_eq!(p.delay_for(1), p.delay_for(3));
3244 }
3245
3246 #[test]
3249 fn test_is_no_retry_true_for_none_policy() {
3250 let p = RetryPolicy::none();
3251 assert!(p.is_no_retry());
3252 }
3253
3254 #[test]
3255 fn test_is_no_retry_false_for_exponential_policy() {
3256 let p = RetryPolicy::exponential(3, 50).unwrap();
3257 assert!(!p.is_no_retry());
3258 }
3259
3260 #[test]
3261 fn test_is_no_retry_false_for_constant_policy_with_multiple_attempts() {
3262 let p = RetryPolicy::constant(2, 100).unwrap();
3263 assert!(!p.is_no_retry());
3264 }
3265
3266 #[test]
3269 fn test_is_exponential_true_for_exponential_policy() {
3270 let p = RetryPolicy::exponential(3, 50).unwrap();
3271 assert!(p.is_exponential());
3272 }
3273
3274 #[test]
3275 fn test_is_exponential_false_for_constant_policy() {
3276 let p = RetryPolicy::constant(3, 50).unwrap();
3277 assert!(!p.is_exponential());
3278 }
3279
3280 #[test]
3281 fn test_is_exponential_false_for_none_policy() {
3282 let p = RetryPolicy::none();
3283 assert!(!p.is_exponential());
3284 }
3285
3286 #[test]
3289 fn test_is_soft_limited_false_without_soft_limit() {
3290 let g = BackpressureGuard::new(10).unwrap();
3291 assert!(!g.is_soft_limited());
3292 }
3293
3294 #[test]
3295 fn test_is_soft_limited_true_when_soft_limit_set() {
3296 let g = BackpressureGuard::new(10)
3297 .unwrap()
3298 .with_soft_limit(5)
3299 .unwrap();
3300 assert!(g.is_soft_limited());
3301 }
3302
3303 #[test]
3306 fn test_retry_policy_base_delay_ms_exponential() {
3307 let p = RetryPolicy::exponential(3, 250).unwrap();
3308 assert_eq!(p.base_delay_ms(), 250);
3309 }
3310
3311 #[test]
3312 fn test_retry_policy_base_delay_ms_constant() {
3313 let p = RetryPolicy::constant(5, 100).unwrap();
3314 assert_eq!(p.base_delay_ms(), 100);
3315 }
3316
3317 #[test]
3318 fn test_retry_policy_base_delay_ms_none_is_zero() {
3319 let p = RetryPolicy::none();
3320 assert_eq!(p.base_delay_ms(), 0);
3321 }
3322
3323 #[test]
3324 fn test_backpressure_percent_full_zero_when_empty() {
3325 let g = BackpressureGuard::new(100).unwrap();
3326 let pct = g.percent_full().unwrap();
3327 assert!((pct - 0.0).abs() < 1e-9);
3328 }
3329
3330 #[test]
3331 fn test_backpressure_percent_full_capped_at_100() {
3332 let g = BackpressureGuard::new(10).unwrap();
3333 for _ in 0..10 {
3335 g.try_acquire().unwrap();
3336 }
3337 let pct = g.percent_full().unwrap();
3338 assert!((pct - 100.0).abs() < 1e-9);
3339 }
3340
3341 #[test]
3344 fn test_deduplicator_get_result_returns_cached_value() {
3345 let d = Deduplicator::new(std::time::Duration::from_secs(60));
3346 d.check_and_register("req-1").unwrap();
3347 d.complete("req-1", "the answer").unwrap();
3348 let result = d.get_result("req-1").unwrap();
3349 assert_eq!(result, Some("the answer".to_string()));
3350 }
3351
3352 #[test]
3353 fn test_deduplicator_get_result_missing_key_returns_none() {
3354 let d = Deduplicator::new(std::time::Duration::from_secs(60));
3355 assert_eq!(d.get_result("ghost").unwrap(), None);
3356 }
3357
3358 #[test]
3359 fn test_pipeline_rename_stage_succeeds() {
3360 let mut p = Pipeline::new().add_stage("old-name", |s: String| Ok(s));
3361 let renamed = p.rename_stage("old-name", "new-name");
3362 assert!(renamed);
3363 assert!(p.has_stage("new-name"));
3364 assert!(!p.has_stage("old-name"));
3365 }
3366
3367 #[test]
3368 fn test_pipeline_rename_stage_missing_returns_false() {
3369 let mut p = Pipeline::new();
3370 assert!(!p.rename_stage("nonexistent", "anything"));
3371 }
3372
3373 #[test]
3374 fn test_circuit_breaker_failure_rate_zero_initially() {
3375 let cb = CircuitBreaker::new("svc", 5, std::time::Duration::from_secs(10)).unwrap();
3376 assert!((cb.failure_rate() - 0.0).abs() < 1e-9);
3377 }
3378
3379 #[test]
3380 fn test_circuit_breaker_failure_rate_increases_with_failures() {
3381 let cb = CircuitBreaker::new("svc-fr", 4, std::time::Duration::from_secs(10)).unwrap();
3382 cb.record_failure();
3383 cb.record_failure();
3384 assert!((cb.failure_rate() - 0.5).abs() < 1e-9);
3386 }
3387
3388 #[test]
3391 fn test_prepend_stage_inserts_at_front() {
3392 let p = Pipeline::new()
3393 .add_stage("second", |s| Ok(s))
3394 .prepend_stage("first", |s| Ok(s));
3395 let names = p.stage_names_owned();
3396 assert_eq!(names[0], "first");
3397 assert_eq!(names[1], "second");
3398 }
3399
3400 #[test]
3401 fn test_prepend_stage_executes_before_existing_stages() {
3402 let p = Pipeline::new()
3403 .add_stage("append", |s| Ok(format!("{s}_appended")))
3404 .prepend_stage("prefix", |s| Ok(format!("pre_{s}")));
3405 let result = p.run("input".to_string()).unwrap();
3406 assert_eq!(result, "pre_input_appended");
3407 }
3408
3409 #[test]
3410 fn test_prepend_stage_on_empty_pipeline() {
3411 let p = Pipeline::new().prepend_stage("only", |s| Ok(s.to_uppercase()));
3412 let result = p.run("hello".to_string()).unwrap();
3413 assert_eq!(result, "HELLO");
3414 }
3415
3416 #[test]
3419 fn test_circuit_breaker_is_at_threshold_false_initially() {
3420 let cb = CircuitBreaker::new("svc", 3, std::time::Duration::from_secs(10)).unwrap();
3421 assert!(!cb.is_at_threshold());
3422 }
3423
3424 #[test]
3425 fn test_circuit_breaker_is_at_threshold_true_when_failures_reach_threshold() {
3426 let cb = CircuitBreaker::new("svc-t", 2, std::time::Duration::from_secs(10)).unwrap();
3427 cb.record_failure();
3428 assert!(!cb.is_at_threshold());
3429 cb.record_failure();
3430 assert!(cb.is_at_threshold());
3431 }
3432
3433 #[test]
3434 fn test_backpressure_headroom_ratio_one_when_empty() {
3435 let g = BackpressureGuard::new(10).unwrap();
3436 let ratio = g.headroom_ratio().unwrap();
3437 assert!((ratio - 1.0).abs() < 1e-9);
3438 }
3439
3440 #[test]
3441 fn test_backpressure_headroom_ratio_decreases_on_acquire() {
3442 let g = BackpressureGuard::new(4).unwrap();
3443 g.try_acquire().unwrap(); let ratio = g.headroom_ratio().unwrap();
3445 assert!((ratio - 0.75).abs() < 1e-9);
3446 }
3447
3448 #[test]
3451 fn test_pipeline_first_stage_name_returns_first() {
3452 let p = Pipeline::new()
3453 .add_stage("alpha", |s| Ok(s))
3454 .add_stage("beta", |s| Ok(s));
3455 assert_eq!(p.first_stage_name(), Some("alpha"));
3456 }
3457
3458 #[test]
3459 fn test_pipeline_first_stage_name_none_when_empty() {
3460 let p = Pipeline::new();
3461 assert!(p.first_stage_name().is_none());
3462 }
3463
3464 #[test]
3465 fn test_pipeline_last_stage_name_returns_last() {
3466 let p = Pipeline::new()
3467 .add_stage("alpha", |s| Ok(s))
3468 .add_stage("omega", |s| Ok(s));
3469 assert_eq!(p.last_stage_name(), Some("omega"));
3470 }
3471
3472 #[test]
3473 fn test_pipeline_stage_index_returns_correct_position() {
3474 let p = Pipeline::new()
3475 .add_stage("first", |s| Ok(s))
3476 .add_stage("second", |s| Ok(s))
3477 .add_stage("third", |s| Ok(s));
3478 assert_eq!(p.stage_index("first"), Some(0));
3479 assert_eq!(p.stage_index("second"), Some(1));
3480 assert_eq!(p.stage_index("third"), Some(2));
3481 assert_eq!(p.stage_index("missing"), None);
3482 }
3483
3484 #[test]
3485 fn test_backpressure_is_empty_true_when_no_slots_acquired() {
3486 let g = BackpressureGuard::new(10).unwrap();
3487 assert!(g.is_empty().unwrap());
3488 }
3489
3490 #[test]
3491 fn test_backpressure_is_empty_false_after_acquire() {
3492 let g = BackpressureGuard::new(10).unwrap();
3493 g.try_acquire().unwrap();
3494 assert!(!g.is_empty().unwrap());
3495 }
3496
3497 #[test]
3498 fn test_backpressure_available_capacity_decrements_on_acquire() {
3499 let g = BackpressureGuard::new(5).unwrap();
3500 assert_eq!(g.available_capacity().unwrap(), 5);
3501 g.try_acquire().unwrap();
3502 assert_eq!(g.available_capacity().unwrap(), 4);
3503 }
3504
3505 #[test]
3508 fn test_evict_oldest_removes_first_cached_entry() {
3509 let d = Deduplicator::new(std::time::Duration::from_secs(60));
3510 d.check_and_register("alpha").unwrap();
3512 d.check_and_register("beta").unwrap();
3513 d.complete("alpha", "result_a").unwrap();
3514 d.complete("beta", "result_b").unwrap();
3515 let removed = d.evict_oldest().unwrap();
3517 assert!(removed);
3518 assert!(d.get_result("alpha").unwrap().is_none());
3519 assert!(d.get_result("beta").unwrap().is_some());
3520 }
3521
3522 #[test]
3523 fn test_evict_oldest_returns_false_when_empty() {
3524 let d = Deduplicator::new(std::time::Duration::from_secs(60));
3525 assert!(!d.evict_oldest().unwrap());
3526 }
3527
3528 #[test]
3531 fn test_circuit_breaker_is_at_threshold_true_after_three_failures() {
3532 let cb = CircuitBreaker::new("svc-3", 3, std::time::Duration::from_secs(60)).unwrap();
3533 cb.record_failure();
3534 cb.record_failure();
3535 cb.record_failure();
3536 assert!(cb.is_at_threshold());
3537 }
3538
3539 #[test]
3542 fn test_failures_until_open_equals_threshold_initially() {
3543 let cb = CircuitBreaker::new("svc-fuo", 5, std::time::Duration::from_secs(60)).unwrap();
3544 assert_eq!(cb.failures_until_open(), 5);
3545 }
3546
3547 #[test]
3548 fn test_failures_until_open_decrements_with_each_failure() {
3549 let cb = CircuitBreaker::new("svc-fuo2", 4, std::time::Duration::from_secs(60)).unwrap();
3550 cb.record_failure();
3551 assert_eq!(cb.failures_until_open(), 3);
3552 cb.record_failure();
3553 assert_eq!(cb.failures_until_open(), 2);
3554 }
3555
3556 #[test]
3557 fn test_failures_until_open_zero_when_at_threshold() {
3558 let cb = CircuitBreaker::new("svc-fuo3", 2, std::time::Duration::from_secs(60)).unwrap();
3559 cb.record_failure();
3560 cb.record_failure();
3561 assert_eq!(cb.failures_until_open(), 0);
3562 }
3563
3564 #[test]
3567 fn test_deduplicator_cached_keys_empty_initially() {
3568 let d = Deduplicator::new(Duration::from_secs(60));
3569 assert!(d.cached_keys().unwrap().is_empty());
3570 }
3571
3572 #[test]
3573 fn test_deduplicator_cached_keys_contains_completed_key() {
3574 let d = Deduplicator::new(Duration::from_secs(60));
3575 d.check_and_register("ck-key").unwrap();
3576 d.complete("ck-key", "result").unwrap();
3577 let keys = d.cached_keys().unwrap();
3578 assert!(keys.contains(&"ck-key".to_string()));
3579 }
3580
3581 #[test]
3582 fn test_deduplicator_cached_keys_excludes_in_flight() {
3583 let d = Deduplicator::new(Duration::from_secs(60));
3584 d.check_and_register("pending-key").unwrap();
3585 assert!(!d.cached_keys().unwrap().contains(&"pending-key".to_string()));
3587 }
3588
3589 #[test]
3590 fn test_deduplicator_cached_keys_multiple_entries() {
3591 let d = Deduplicator::new(Duration::from_secs(60));
3592 for k in ["alpha", "beta", "gamma"] {
3593 d.check_and_register(k).unwrap();
3594 d.complete(k, "v").unwrap();
3595 }
3596 let keys = d.cached_keys().unwrap();
3597 assert_eq!(keys.len(), 3);
3598 }
3599
3600 #[test]
3603 fn test_retry_policy_is_constant_true_for_constant() {
3604 let p = RetryPolicy::constant(3, 100).unwrap();
3605 assert!(p.is_constant());
3606 assert!(!p.is_exponential());
3607 }
3608
3609 #[test]
3610 fn test_retry_policy_is_constant_false_for_exponential() {
3611 let p = RetryPolicy::exponential(3, 100).unwrap();
3612 assert!(!p.is_constant());
3613 }
3614
3615 #[test]
3616 fn test_retry_policy_total_max_delay_ms_constant() {
3617 let p = RetryPolicy::constant(3, 100).unwrap();
3619 assert_eq!(p.total_max_delay_ms(), 300);
3620 }
3621
3622 #[test]
3623 fn test_retry_policy_total_max_delay_ms_exponential() {
3624 let p = RetryPolicy::exponential(3, 100).unwrap();
3626 let total = p.total_max_delay_ms();
3627 assert!(total >= 300); }
3629
3630 #[test]
3633 fn test_circuit_breaker_is_healthy_true_when_closed() {
3634 let cb = CircuitBreaker::new("svc-ih1", 3, Duration::from_secs(60)).unwrap();
3635 assert!(cb.is_healthy());
3636 }
3637
3638 #[test]
3639 fn test_circuit_breaker_is_healthy_false_when_open() {
3640 let cb = CircuitBreaker::new("svc-ih2", 1, Duration::from_secs(60)).unwrap();
3641 let _: Result<(), _> = cb.call(|| Err::<(), _>("fail".to_string()));
3642 assert!(!cb.is_healthy());
3643 }
3644
3645 #[test]
3646 fn test_circuit_breaker_is_half_open_after_zero_recovery() {
3647 let cb = CircuitBreaker::new("svc-ho1", 1, Duration::ZERO).unwrap();
3648 let _: Result<(), _> = cb.call(|| Err::<(), _>("fail".to_string()));
3649 assert!(cb.is_half_open() || cb.is_healthy()); }
3652
3653 #[test]
3656 fn test_deduplicator_is_idle_true_when_empty() {
3657 let d = Deduplicator::new(Duration::from_secs(60));
3658 assert!(d.is_idle().unwrap());
3659 }
3660
3661 #[test]
3662 fn test_deduplicator_is_idle_false_when_in_flight() {
3663 let d = Deduplicator::new(Duration::from_secs(60));
3664 d.check_and_register("req-x").unwrap();
3665 assert!(!d.is_idle().unwrap());
3666 }
3667
3668 #[test]
3669 fn test_deduplicator_is_idle_true_after_complete() {
3670 let d = Deduplicator::new(Duration::from_secs(60));
3671 d.check_and_register("req-y").unwrap();
3672 d.complete("req-y", "done").unwrap();
3673 assert!(d.is_idle().unwrap());
3674 }
3675
3676 #[test]
3679 fn test_deduplicator_in_flight_count_zero_initially() {
3680 let d = Deduplicator::new(Duration::from_secs(60));
3681 assert_eq!(d.in_flight_count().unwrap(), 0);
3682 }
3683
3684 #[test]
3685 fn test_deduplicator_in_flight_count_increments_on_register() {
3686 let d = Deduplicator::new(Duration::from_secs(60));
3687 d.check_and_register("k1").unwrap();
3688 d.check_and_register("k2").unwrap();
3689 assert_eq!(d.in_flight_count().unwrap(), 2);
3690 }
3691
3692 #[test]
3693 fn test_deduplicator_in_flight_count_decrements_after_complete() {
3694 let d = Deduplicator::new(Duration::from_secs(60));
3695 d.check_and_register("k1").unwrap();
3696 d.complete("k1", "result").unwrap();
3697 assert_eq!(d.in_flight_count().unwrap(), 0);
3698 }
3699
3700 #[test]
3703 fn test_deduplicator_total_count_sums_in_flight_and_cached() {
3704 let d = Deduplicator::new(Duration::from_secs(60));
3705 d.check_and_register("k1").unwrap(); d.check_and_register("k2").unwrap(); d.complete("k1", "done").unwrap(); assert_eq!(d.total_count().unwrap(), 2);
3710 }
3711
3712 #[test]
3713 fn test_deduplicator_total_count_zero_when_empty() {
3714 let d = Deduplicator::new(Duration::from_secs(60));
3715 assert_eq!(d.total_count().unwrap(), 0);
3716 }
3717
3718 #[test]
3719 fn test_backpressure_acquired_count_zero_initially() {
3720 let g = BackpressureGuard::new(5).unwrap();
3721 assert_eq!(g.acquired_count().unwrap(), 0);
3722 }
3723
3724 #[test]
3725 fn test_backpressure_acquired_count_increments_on_acquire() {
3726 let g = BackpressureGuard::new(5).unwrap();
3727 g.try_acquire().unwrap();
3728 g.try_acquire().unwrap();
3729 assert_eq!(g.acquired_count().unwrap(), 2);
3730 }
3731
3732 #[test]
3733 fn test_pipeline_swap_stages_swaps_positions() {
3734 let mut p = Pipeline::new()
3735 .add_stage("a", |s| Ok(s + "A"))
3736 .add_stage("b", |s| Ok(s + "B"));
3737 let swapped = p.swap_stages("a", "b");
3738 assert!(swapped);
3739 assert_eq!(p.first_stage_name().unwrap(), "b");
3740 assert_eq!(p.last_stage_name().unwrap(), "a");
3741 }
3742
3743 #[test]
3744 fn test_pipeline_swap_stages_returns_false_for_unknown_stage() {
3745 let mut p = Pipeline::new().add_stage("a", |s| Ok(s));
3746 assert!(!p.swap_stages("a", "missing"));
3747 }
3748
3749 #[test]
3750 fn test_retry_policy_will_retry_at_all_false_for_none() {
3751 let p = RetryPolicy::none();
3752 assert!(!p.will_retry_at_all());
3753 }
3754
3755 #[test]
3756 fn test_retry_policy_will_retry_at_all_true_for_exponential() {
3757 let p = RetryPolicy::exponential(3, 100).unwrap();
3758 assert!(p.will_retry_at_all());
3759 }
3760
3761 #[test]
3764 fn test_deduplicator_fail_removes_in_flight_key() {
3765 let d = Deduplicator::new(Duration::from_secs(60));
3766 d.check_and_register("failing-req").unwrap();
3767 assert!(!d.is_idle().unwrap());
3768 d.fail("failing-req").unwrap();
3769 assert!(d.is_idle().unwrap());
3770 }
3771
3772 #[test]
3773 fn test_deduplicator_fail_on_unknown_key_is_noop() {
3774 let d = Deduplicator::new(Duration::from_secs(60));
3775 assert!(d.fail("nonexistent").is_ok());
3776 }
3777
3778 #[test]
3779 fn test_deduplicator_fail_allows_reregistration() {
3780 let d = Deduplicator::new(Duration::from_secs(60));
3781 d.check_and_register("retry-key").unwrap();
3782 d.fail("retry-key").unwrap();
3783 let result = d.check_and_register("retry-key").unwrap();
3784 assert_eq!(result, DeduplicationResult::New);
3785 }
3786
3787 #[test]
3790 fn test_retry_policy_max_total_delay_ms_constant_policy() {
3791 let p = RetryPolicy::constant(3, 100).unwrap();
3792 assert_eq!(p.max_total_delay_ms(), 300);
3794 }
3795
3796 #[test]
3797 fn test_retry_policy_max_total_delay_ms_single_attempt() {
3798 let p = RetryPolicy::constant(1, 50).unwrap();
3799 assert_eq!(p.max_total_delay_ms(), 50);
3800 }
3801
3802 #[test]
3805 fn test_retry_policy_is_last_attempt_true_at_max() {
3806 let p = RetryPolicy::exponential(3, 100).unwrap();
3807 assert!(p.is_last_attempt(3));
3808 }
3809
3810 #[test]
3811 fn test_retry_policy_is_last_attempt_false_before_max() {
3812 let p = RetryPolicy::exponential(3, 100).unwrap();
3813 assert!(!p.is_last_attempt(2));
3814 }
3815
3816 #[test]
3817 fn test_retry_policy_is_last_attempt_true_beyond_max() {
3818 let p = RetryPolicy::exponential(3, 100).unwrap();
3819 assert!(p.is_last_attempt(4));
3820 }
3821
3822 #[test]
3825 fn test_retry_policy_delay_sum_ms_constant_two_attempts() {
3826 let p = RetryPolicy::constant(5, 100).unwrap();
3827 assert_eq!(p.delay_sum_ms(2), 200);
3828 }
3829
3830 #[test]
3831 fn test_retry_policy_delay_sum_ms_capped_at_max_attempts() {
3832 let p = RetryPolicy::constant(2, 50).unwrap();
3833 assert_eq!(p.delay_sum_ms(10), 100);
3835 }
3836
3837 #[test]
3840 fn test_retry_policy_avg_delay_ms_constant() {
3841 let p = RetryPolicy::constant(4, 100).unwrap();
3842 assert_eq!(p.avg_delay_ms(), 100);
3844 }
3845
3846 #[test]
3847 fn test_retry_policy_avg_delay_ms_single_attempt_policy() {
3848 let p = RetryPolicy::none();
3850 assert_eq!(p.avg_delay_ms(), 0);
3851 }
3852
3853 #[test]
3854 fn test_backoff_factor_exponential_returns_two() {
3855 let p = RetryPolicy::exponential(3, 100).unwrap();
3856 assert!((p.backoff_factor() - 2.0).abs() < 1e-9);
3857 }
3858
3859 #[test]
3860 fn test_backoff_factor_constant_returns_one() {
3861 let p = RetryPolicy::constant(3, 100).unwrap();
3862 assert!((p.backoff_factor() - 1.0).abs() < 1e-9);
3863 }
3864
3865 #[test]
3866 fn test_pipeline_count_stages_matching_counts_by_keyword() {
3867 let p = Pipeline::new()
3868 .add_stage("normalize-text", |s| Ok(s))
3869 .add_stage("text-trim", |s| Ok(s))
3870 .add_stage("embed", |s| Ok(s));
3871 assert_eq!(p.count_stages_matching("text"), 2);
3872 assert_eq!(p.count_stages_matching("embed"), 1);
3873 assert_eq!(p.count_stages_matching("missing"), 0);
3874 }
3875
3876 #[test]
3877 fn test_pipeline_count_stages_matching_case_insensitive() {
3878 let p = Pipeline::new().add_stage("TEXT-CLEAN", |s| Ok(s));
3879 assert_eq!(p.count_stages_matching("text"), 1);
3880 }
3881
3882 #[test]
3883 fn test_backpressure_guard_over_soft_limit_true_when_exceeded() {
3884 let guard = BackpressureGuard::new(10)
3885 .unwrap()
3886 .with_soft_limit(1)
3887 .unwrap();
3888 guard.try_acquire().unwrap();
3889 guard.try_acquire().unwrap();
3890 assert!(guard.over_soft_limit().unwrap());
3891 }
3892
3893 #[test]
3894 fn test_backpressure_guard_over_soft_limit_false_when_no_soft_limit() {
3895 let guard = BackpressureGuard::new(10).unwrap();
3896 guard.try_acquire().unwrap();
3897 assert!(!guard.over_soft_limit().unwrap());
3898 }
3899
3900 #[test]
3903 fn test_pipeline_description_empty() {
3904 let p = Pipeline::new();
3905 assert_eq!(p.description(), "Pipeline[empty]");
3906 }
3907
3908 #[test]
3909 fn test_pipeline_description_single_stage() {
3910 let p = Pipeline::new().add_stage("trim", |s: String| Ok(s.trim().to_owned()));
3911 assert_eq!(p.description(), "Pipeline[1 stage: trim]");
3912 }
3913
3914 #[test]
3915 fn test_pipeline_description_multiple_stages() {
3916 let p = Pipeline::new()
3917 .add_stage("a", |s: String| Ok(s))
3918 .add_stage("b", |s: String| Ok(s))
3919 .add_stage("c", |s: String| Ok(s));
3920 let desc = p.description();
3921 assert!(desc.contains("3 stages"));
3922 assert!(desc.contains("a → b → c"));
3923 }
3924
3925 #[test]
3926 fn test_pipeline_has_unique_stage_names_true_when_all_unique() {
3927 let p = Pipeline::new()
3928 .add_stage("x", |s: String| Ok(s))
3929 .add_stage("y", |s: String| Ok(s));
3930 assert!(p.has_unique_stage_names());
3931 }
3932
3933 #[test]
3934 fn test_pipeline_has_unique_stage_names_false_when_duplicate() {
3935 let p = Pipeline::new()
3936 .add_stage("dup", |s: String| Ok(s))
3937 .add_stage("dup", |s: String| Ok(s));
3938 assert!(!p.has_unique_stage_names());
3939 }
3940
3941 #[test]
3942 fn test_pipeline_has_unique_stage_names_true_for_empty_pipeline() {
3943 let p = Pipeline::new();
3944 assert!(p.has_unique_stage_names());
3945 }
3946
3947 #[test]
3950 fn test_pipeline_stage_name_lengths_returns_byte_lengths_in_order() {
3951 let p = Pipeline::new()
3952 .add_stage("ab", |s: String| Ok(s))
3953 .add_stage("cdef", |s: String| Ok(s));
3954 assert_eq!(p.stage_name_lengths(), vec![2, 4]);
3955 }
3956
3957 #[test]
3958 fn test_pipeline_stage_name_lengths_empty_pipeline_returns_empty() {
3959 let p = Pipeline::new();
3960 assert!(p.stage_name_lengths().is_empty());
3961 }
3962
3963 #[test]
3964 fn test_pipeline_avg_stage_name_length_computed_correctly() {
3965 let p = Pipeline::new()
3966 .add_stage("ab", |s: String| Ok(s)) .add_stage("abcd", |s: String| Ok(s)); assert!((p.avg_stage_name_length() - 3.0).abs() < 1e-9);
3969 }
3970
3971 #[test]
3972 fn test_pipeline_avg_stage_name_length_zero_for_empty() {
3973 assert_eq!(Pipeline::new().avg_stage_name_length(), 0.0);
3974 }
3975
3976 #[test]
3977 fn test_pipeline_stages_containing_returns_matching_names() {
3978 let p = Pipeline::new()
3979 .add_stage("tokenize", |s: String| Ok(s))
3980 .add_stage("encode", |s: String| Ok(s))
3981 .add_stage("token-validate", |s: String| Ok(s));
3982 let result = p.stages_containing("token");
3983 assert_eq!(result.len(), 2);
3984 assert!(result.contains(&"tokenize"));
3985 assert!(result.contains(&"token-validate"));
3986 }
3987
3988 #[test]
3989 fn test_pipeline_stages_containing_returns_empty_when_no_match() {
3990 let p = Pipeline::new().add_stage("process", |s: String| Ok(s));
3991 assert!(p.stages_containing("xyz").is_empty());
3992 }
3993
3994 #[test]
3995 fn test_pipeline_stage_is_first_returns_true_for_first_stage() {
3996 let p = Pipeline::new()
3997 .add_stage("first", |s: String| Ok(s))
3998 .add_stage("second", |s: String| Ok(s));
3999 assert!(p.stage_is_first("first"));
4000 assert!(!p.stage_is_first("second"));
4001 }
4002
4003 #[test]
4004 fn test_pipeline_stage_is_last_returns_true_for_last_stage() {
4005 let p = Pipeline::new()
4006 .add_stage("first", |s: String| Ok(s))
4007 .add_stage("last", |s: String| Ok(s));
4008 assert!(p.stage_is_last("last"));
4009 assert!(!p.stage_is_last("first"));
4010 }
4011
4012 #[test]
4015 fn test_stage_names_sorted_returns_alphabetical_order() {
4016 let p = Pipeline::new()
4017 .add_stage("zebra", |s: String| Ok(s))
4018 .add_stage("alpha", |s: String| Ok(s))
4019 .add_stage("mango", |s: String| Ok(s));
4020 assert_eq!(p.stage_names_sorted(), vec!["alpha", "mango", "zebra"]);
4021 }
4022
4023 #[test]
4024 fn test_stage_names_sorted_empty_pipeline_returns_empty() {
4025 let p = Pipeline::new();
4026 assert!(p.stage_names_sorted().is_empty());
4027 }
4028
4029 #[test]
4030 fn test_longest_stage_name_returns_longest() {
4031 let p = Pipeline::new()
4032 .add_stage("ab", |s: String| Ok(s))
4033 .add_stage("abcde", |s: String| Ok(s))
4034 .add_stage("abc", |s: String| Ok(s));
4035 assert_eq!(p.longest_stage_name(), Some("abcde"));
4036 }
4037
4038 #[test]
4039 fn test_longest_stage_name_empty_pipeline_returns_none() {
4040 let p = Pipeline::new();
4041 assert_eq!(p.longest_stage_name(), None);
4042 }
4043
4044 #[test]
4045 fn test_shortest_stage_name_returns_shortest() {
4046 let p = Pipeline::new()
4047 .add_stage("ab", |s: String| Ok(s))
4048 .add_stage("abcde", |s: String| Ok(s))
4049 .add_stage("a", |s: String| Ok(s));
4050 assert_eq!(p.shortest_stage_name(), Some("a"));
4051 }
4052
4053 #[test]
4054 fn test_shortest_stage_name_empty_pipeline_returns_none() {
4055 let p = Pipeline::new();
4056 assert_eq!(p.shortest_stage_name(), None);
4057 }
4058
4059 #[test]
4062 fn test_circuit_state_display_closed() {
4063 let s = CircuitState::Closed;
4064 assert_eq!(s.to_string(), "Closed");
4065 }
4066
4067 #[test]
4068 fn test_circuit_state_display_open() {
4069 let s = CircuitState::Open { opened_at: std::time::Instant::now() };
4070 assert_eq!(s.to_string(), "Open");
4071 }
4072
4073 #[test]
4074 fn test_circuit_state_display_half_open() {
4075 let s = CircuitState::HalfOpen;
4076 assert_eq!(s.to_string(), "HalfOpen");
4077 }
4078
4079 #[test]
4080 fn test_retry_policy_display_exponential() {
4081 let p = RetryPolicy::exponential(3, 100).unwrap();
4082 let s = p.to_string();
4083 assert!(s.contains("Exponential"));
4084 assert!(s.contains('3'));
4085 assert!(s.contains("100ms"));
4086 }
4087
4088 #[test]
4089 fn test_retry_policy_display_constant() {
4090 let p = RetryPolicy::constant(5, 50).unwrap();
4091 let s = p.to_string();
4092 assert!(s.contains("Constant"));
4093 assert!(s.contains('5'));
4094 assert!(s.contains("50ms"));
4095 }
4096
4097 #[test]
4100 fn test_pipeline_total_stage_name_bytes_sums_correctly() {
4101 let p = Pipeline::new()
4102 .add_stage("ab", |s: String| Ok(s)) .add_stage("xyz", |s: String| Ok(s)); assert_eq!(p.total_stage_name_bytes(), 5);
4105 }
4106
4107 #[test]
4108 fn test_pipeline_total_stage_name_bytes_zero_for_empty() {
4109 assert_eq!(Pipeline::new().total_stage_name_bytes(), 0);
4110 }
4111
4112 #[test]
4113 fn test_pipeline_stages_before_returns_preceding_names() {
4114 let p = Pipeline::new()
4115 .add_stage("a", |s: String| Ok(s))
4116 .add_stage("b", |s: String| Ok(s))
4117 .add_stage("c", |s: String| Ok(s));
4118 assert_eq!(p.stages_before("c"), vec!["a", "b"]);
4119 assert!(p.stages_before("a").is_empty());
4120 }
4121
4122 #[test]
4123 fn test_pipeline_stages_before_returns_empty_for_unknown_stage() {
4124 let p = Pipeline::new().add_stage("a", |s: String| Ok(s));
4125 assert!(p.stages_before("missing").is_empty());
4126 }
4127
4128 #[test]
4131 fn test_stages_after_returns_stages_following_name() {
4132 let p = Pipeline::new()
4133 .add_stage("a", |s: String| Ok(s))
4134 .add_stage("b", |s: String| Ok(s))
4135 .add_stage("c", |s: String| Ok(s));
4136 assert_eq!(p.stages_after("a"), vec!["b", "c"]);
4137 }
4138
4139 #[test]
4140 fn test_stages_after_last_stage_returns_empty() {
4141 let p = Pipeline::new()
4142 .add_stage("a", |s: String| Ok(s))
4143 .add_stage("b", |s: String| Ok(s));
4144 assert!(p.stages_after("b").is_empty());
4145 }
4146
4147 #[test]
4148 fn test_stages_after_unknown_name_returns_empty() {
4149 let p = Pipeline::new().add_stage("a", |s: String| Ok(s));
4150 assert!(p.stages_after("missing").is_empty());
4151 }
4152
4153 #[test]
4154 fn test_stage_position_from_end_last_is_zero() {
4155 let p = Pipeline::new()
4156 .add_stage("x", |s: String| Ok(s))
4157 .add_stage("y", |s: String| Ok(s))
4158 .add_stage("z", |s: String| Ok(s));
4159 assert_eq!(p.stage_position_from_end("z"), Some(0));
4160 assert_eq!(p.stage_position_from_end("x"), Some(2));
4161 }
4162
4163 #[test]
4164 fn test_stage_position_from_end_unknown_returns_none() {
4165 let p = Pipeline::new().add_stage("a", |s: String| Ok(s));
4166 assert_eq!(p.stage_position_from_end("missing"), None);
4167 }
4168
4169 #[test]
4170 fn test_contains_all_stages_true_when_all_present() {
4171 let p = Pipeline::new()
4172 .add_stage("a", |s: String| Ok(s))
4173 .add_stage("b", |s: String| Ok(s));
4174 assert!(p.contains_all_stages(&["a", "b"]));
4175 }
4176
4177 #[test]
4178 fn test_contains_all_stages_false_when_one_missing() {
4179 let p = Pipeline::new().add_stage("a", |s: String| Ok(s));
4180 assert!(!p.contains_all_stages(&["a", "b"]));
4181 }
4182
4183 #[test]
4184 fn test_contains_all_stages_true_for_empty_names() {
4185 let p = Pipeline::new();
4186 assert!(p.contains_all_stages(&[]));
4187 }
4188
4189 #[test]
4192 fn test_stage_count_above_name_len_counts_longer_names() {
4193 let p = Pipeline::new()
4194 .add_stage("ab", |s: String| Ok(s))
4195 .add_stage("abcde", |s: String| Ok(s))
4196 .add_stage("xyz", |s: String| Ok(s));
4197 assert_eq!(p.stage_count_above_name_len(2), 2); }
4199
4200 #[test]
4201 fn test_stage_count_above_name_len_zero_when_none_exceed() {
4202 let p = Pipeline::new()
4203 .add_stage("a", |s: String| Ok(s))
4204 .add_stage("b", |s: String| Ok(s));
4205 assert_eq!(p.stage_count_above_name_len(5), 0);
4206 }
4207
4208 #[test]
4209 fn test_stage_pairs_returns_consecutive_pairs() {
4210 let p = Pipeline::new()
4211 .add_stage("a", |s: String| Ok(s))
4212 .add_stage("b", |s: String| Ok(s))
4213 .add_stage("c", |s: String| Ok(s));
4214 assert_eq!(p.stage_pairs(), vec![("a", "b"), ("b", "c")]);
4215 }
4216
4217 #[test]
4218 fn test_stage_pairs_empty_for_single_stage_pipeline() {
4219 let p = Pipeline::new().add_stage("only", |s: String| Ok(s));
4220 assert!(p.stage_pairs().is_empty());
4221 }
4222
4223 #[test]
4226 fn test_circuit_breaker_describe_contains_service_name() {
4227 let cb = CircuitBreaker::new("my-service", 3, std::time::Duration::from_secs(30)).unwrap();
4228 let desc = cb.describe().unwrap();
4229 assert!(desc.contains("my-service"));
4230 }
4231
4232 #[test]
4233 fn test_circuit_breaker_describe_shows_closed_state_initially() {
4234 let cb = CircuitBreaker::new("svc", 5, std::time::Duration::from_secs(10)).unwrap();
4235 let desc = cb.describe().unwrap();
4236 assert!(desc.contains("Closed"));
4237 }
4238
4239 #[test]
4240 fn test_circuit_breaker_describe_shows_failure_counts() {
4241 let cb = CircuitBreaker::new("svc", 5, std::time::Duration::from_secs(10)).unwrap();
4242 cb.record_failure();
4243 cb.record_failure();
4244 let desc = cb.describe().unwrap();
4245 assert!(desc.contains("2/5"));
4246 }
4247
4248 #[test]
4249 fn test_retry_policy_attempts_budget_used_zero_at_start() {
4250 let p = RetryPolicy::exponential(4, 10).unwrap();
4251 assert_eq!(p.attempts_budget_used(0), 0.0);
4252 }
4253
4254 #[test]
4255 fn test_retry_policy_attempts_budget_used_one_when_exhausted() {
4256 let p = RetryPolicy::exponential(4, 10).unwrap();
4257 assert_eq!(p.attempts_budget_used(4), 1.0);
4258 }
4259
4260 #[test]
4261 fn test_retry_policy_attempts_budget_used_clamped_to_one() {
4262 let p = RetryPolicy::exponential(4, 10).unwrap();
4263 assert_eq!(p.attempts_budget_used(10), 1.0);
4264 }
4265
4266 #[test]
4267 fn test_retry_policy_attempts_budget_used_half_way() {
4268 let p = RetryPolicy::constant(4, 10).unwrap();
4269 assert!((p.attempts_budget_used(2) - 0.5).abs() < 1e-9);
4270 }
4271
4272 #[test]
4273 fn test_retry_policy_attempts_budget_used_fully_used_for_none_policy_after_one_attempt() {
4274 let p = RetryPolicy::none();
4276 assert_eq!(p.attempts_budget_used(1), 1.0);
4277 assert_eq!(p.attempts_budget_used(0), 0.0);
4278 }
4279
4280 #[test]
4283 fn test_stage_at_returns_name_at_index() {
4284 let p = Pipeline::new()
4285 .add_stage("first", |s: String| Ok(s))
4286 .add_stage("second", |s: String| Ok(s))
4287 .add_stage("third", |s: String| Ok(s));
4288 assert_eq!(p.stage_at(0), Some("first"));
4289 assert_eq!(p.stage_at(2), Some("third"));
4290 assert_eq!(p.stage_at(3), None);
4291 }
4292
4293 #[test]
4294 fn test_stage_at_returns_none_for_empty_pipeline() {
4295 let p = Pipeline::new();
4296 assert_eq!(p.stage_at(0), None);
4297 }
4298
4299 #[test]
4300 fn test_stages_reversed_returns_names_in_reverse_order() {
4301 let p = Pipeline::new()
4302 .add_stage("a", |s: String| Ok(s))
4303 .add_stage("b", |s: String| Ok(s))
4304 .add_stage("c", |s: String| Ok(s));
4305 assert_eq!(p.stages_reversed(), vec!["c", "b", "a"]);
4306 }
4307
4308 #[test]
4309 fn test_stages_reversed_empty_for_empty_pipeline() {
4310 let p = Pipeline::new();
4311 assert!(p.stages_reversed().is_empty());
4312 }
4313
4314 #[test]
4317 fn test_pipeline_is_empty_true_for_empty_pipeline() {
4318 let p = Pipeline::new();
4319 assert!(p.pipeline_is_empty());
4320 }
4321
4322 #[test]
4323 fn test_pipeline_is_empty_false_after_adding_stage() {
4324 let p = Pipeline::new().add_stage("a", |s: String| Ok(s));
4325 assert!(!p.pipeline_is_empty());
4326 }
4327
4328 #[test]
4331 fn test_unique_stage_names_returns_sorted_names() {
4332 let p = Pipeline::new()
4333 .add_stage("charlie", |s: String| Ok(s))
4334 .add_stage("alpha", |s: String| Ok(s))
4335 .add_stage("bravo", |s: String| Ok(s));
4336 assert_eq!(p.unique_stage_names(), vec!["alpha", "bravo", "charlie"]);
4337 }
4338
4339 #[test]
4340 fn test_unique_stage_names_empty_for_empty_pipeline() {
4341 let p = Pipeline::new();
4342 assert!(p.unique_stage_names().is_empty());
4343 }
4344
4345 #[test]
4348 fn test_stage_names_with_prefix_returns_matching_stages() {
4349 let p = Pipeline::new()
4350 .add_stage("validate_input", |s: String| Ok(s))
4351 .add_stage("transform_data", |s: String| Ok(s))
4352 .add_stage("validate_output", |s: String| Ok(s));
4353 let names = p.stage_names_with_prefix("validate");
4354 assert_eq!(names, vec!["validate_input", "validate_output"]);
4355 }
4356
4357 #[test]
4358 fn test_stage_names_with_prefix_empty_when_no_match() {
4359 let p = Pipeline::new().add_stage("transform", |s: String| Ok(s));
4360 assert!(p.stage_names_with_prefix("validate").is_empty());
4361 }
4362
4363 #[test]
4366 fn test_stages_with_suffix_returns_matching_stages() {
4367 let p = Pipeline::new()
4368 .add_stage("input_validate", |s: String| Ok(s))
4369 .add_stage("transform_data", |s: String| Ok(s))
4370 .add_stage("output_validate", |s: String| Ok(s));
4371 let names = p.stages_with_suffix("validate");
4372 assert_eq!(names, vec!["input_validate", "output_validate"]);
4373 }
4374
4375 #[test]
4376 fn test_stages_with_suffix_empty_when_no_match() {
4377 let p = Pipeline::new().add_stage("transform", |s: String| Ok(s));
4378 assert!(p.stages_with_suffix("validate").is_empty());
4379 }
4380
4381 #[test]
4384 fn test_has_stage_with_name_containing_true_when_match_exists() {
4385 let p = Pipeline::new()
4386 .add_stage("transform_input", |s: String| Ok(s))
4387 .add_stage("write_output", |s: String| Ok(s));
4388 assert!(p.has_stage_with_name_containing("transform"));
4389 }
4390
4391 #[test]
4392 fn test_has_stage_with_name_containing_false_when_no_match() {
4393 let p = Pipeline::new().add_stage("write", |s: String| Ok(s));
4394 assert!(!p.has_stage_with_name_containing("transform"));
4395 }
4396
4397 #[test]
4398 fn test_stage_name_bytes_total_sums_name_lengths() {
4399 let p = Pipeline::new()
4400 .add_stage("ab", |s: String| Ok(s))
4401 .add_stage("cde", |s: String| Ok(s));
4402 assert_eq!(p.stage_name_bytes_total(), 5);
4403 }
4404
4405 #[test]
4406 fn test_stage_name_bytes_total_zero_for_empty_pipeline() {
4407 let p = Pipeline::new();
4408 assert_eq!(p.stage_name_bytes_total(), 0);
4409 }
4410
4411 #[test]
4414 fn test_failure_headroom_full_when_no_failures_recorded() {
4415 let cb = CircuitBreaker::new("svc-r47", 3, std::time::Duration::from_secs(10)).unwrap();
4416 assert_eq!(cb.failure_headroom(), 3);
4417 }
4418
4419 #[test]
4420 fn test_failure_headroom_decreases_with_each_failure() {
4421 let cb = CircuitBreaker::new("svc-r47b", 3, std::time::Duration::from_secs(10)).unwrap();
4422 cb.record_failure();
4423 assert_eq!(cb.failure_headroom(), 2);
4424 cb.record_failure();
4425 assert_eq!(cb.failure_headroom(), 1);
4426 }
4427
4428 #[test]
4429 fn test_failure_headroom_zero_when_at_or_above_threshold() {
4430 let cb = CircuitBreaker::new("svc-r47c", 2, std::time::Duration::from_secs(10)).unwrap();
4431 cb.record_failure();
4432 cb.record_failure();
4433 assert_eq!(cb.failure_headroom(), 0);
4434 }
4435
4436 #[test]
4439 fn test_stage_count_below_name_len_counts_short_names() {
4440 let p = Pipeline::new()
4441 .add_stage("ab", |s: String| Ok(s)) .add_stage("abcde", |s: String| Ok(s)) .add_stage("xyz", |s: String| Ok(s)); assert_eq!(p.stage_count_below_name_len(4), 2);
4446 }
4447
4448 #[test]
4449 fn test_stage_count_below_name_len_zero_for_empty_pipeline() {
4450 let p = Pipeline::new();
4451 assert_eq!(p.stage_count_below_name_len(10), 0);
4452 }
4453
4454 #[test]
4457 fn test_stage_count_above_name_bytes_counts_long_names() {
4458 let p = Pipeline::new()
4459 .add_stage("ab", |s: String| Ok(s))
4460 .add_stage("a_very_long_name", |s: String| Ok(s));
4461 assert_eq!(p.stage_count_above_name_bytes(3), 1);
4462 }
4463
4464 #[test]
4465 fn test_stage_count_above_name_bytes_zero_for_empty_pipeline() {
4466 let p = Pipeline::new();
4467 assert_eq!(p.stage_count_above_name_bytes(0), 0);
4468 }
4469
4470 #[test]
4473 fn test_contains_stage_with_prefix_true_when_present() {
4474 let p = Pipeline::new()
4475 .add_stage("validate_input", |s: String| Ok(s))
4476 .add_stage("transform_data", |s: String| Ok(s));
4477 assert!(p.contains_stage_with_prefix("validate"));
4478 }
4479
4480 #[test]
4481 fn test_contains_stage_with_prefix_false_when_absent() {
4482 let p = Pipeline::new().add_stage("stage_a", |s: String| Ok(s));
4483 assert!(!p.contains_stage_with_prefix("missing"));
4484 }
4485
4486 #[test]
4487 fn test_contains_stage_with_prefix_false_for_empty_pipeline() {
4488 let p = Pipeline::new();
4489 assert!(!p.contains_stage_with_prefix("any"));
4490 }
4491
4492 #[test]
4495 fn test_retry_policy_is_bounded_true_for_normal_policy() {
4496 let p = RetryPolicy::constant(3, 100).unwrap();
4497 assert!(p.is_bounded());
4498 }
4499
4500 #[test]
4501 fn test_retry_policy_is_bounded_true_for_none_policy() {
4502 let p = RetryPolicy::none();
4503 assert!(p.is_bounded());
4504 }
4505
4506 #[test]
4507 fn test_retry_policy_remaining_wait_budget_full_at_zero_attempts() {
4508 let p = RetryPolicy::constant(3, 100).unwrap();
4509 assert_eq!(p.remaining_wait_budget_ms(0), 300);
4511 }
4512
4513 #[test]
4514 fn test_retry_policy_remaining_wait_budget_decreases_with_attempts() {
4515 let p = RetryPolicy::constant(3, 100).unwrap();
4516 assert_eq!(p.remaining_wait_budget_ms(1), 200);
4518 }
4519
4520 #[test]
4523 fn test_stage_names_containing_returns_all_matching_stages() {
4524 let p = Pipeline::new()
4525 .add_stage("pre_process", |s: String| Ok(s))
4526 .add_stage("post_process", |s: String| Ok(s))
4527 .add_stage("transform", |s: String| Ok(s));
4528 let names = p.stage_names_containing("process");
4529 assert_eq!(names, vec!["pre_process", "post_process"]);
4530 }
4531
4532 #[test]
4533 fn test_stage_names_containing_empty_when_no_match() {
4534 let p = Pipeline::new().add_stage("transform", |s: String| Ok(s));
4535 assert!(p.stage_names_containing("process").is_empty());
4536 }
4537
4538 #[test]
4541 fn test_stage_name_from_end_zero_returns_last_stage() {
4542 let p = Pipeline::new()
4543 .add_stage("first", |s: String| Ok(s))
4544 .add_stage("second", |s: String| Ok(s))
4545 .add_stage("third", |s: String| Ok(s));
4546 assert_eq!(p.stage_name_from_end(0), Some("third"));
4547 }
4548
4549 #[test]
4550 fn test_stage_name_from_end_one_returns_second_to_last() {
4551 let p = Pipeline::new()
4552 .add_stage("first", |s: String| Ok(s))
4553 .add_stage("second", |s: String| Ok(s))
4554 .add_stage("third", |s: String| Ok(s));
4555 assert_eq!(p.stage_name_from_end(1), Some("second"));
4556 }
4557
4558 #[test]
4559 fn test_stage_name_from_end_out_of_bounds_returns_none() {
4560 let p = Pipeline::new().add_stage("only", |s: String| Ok(s));
4561 assert_eq!(p.stage_name_from_end(1), None);
4562 }
4563
4564 #[test]
4565 fn test_stage_name_from_end_none_for_empty_pipeline() {
4566 let p = Pipeline::new();
4567 assert_eq!(p.stage_name_from_end(0), None);
4568 }
4569
4570 #[test]
4573 fn test_stage_at_index_returns_correct_stage() {
4574 let p = Pipeline::new()
4575 .add_stage("alpha", |s: String| Ok(s))
4576 .add_stage("beta", |s: String| Ok(s));
4577 assert_eq!(p.stage_at_index(0).map(|s| s.name.as_str()), Some("alpha"));
4578 assert_eq!(p.stage_at_index(1).map(|s| s.name.as_str()), Some("beta"));
4579 }
4580
4581 #[test]
4582 fn test_stage_at_index_none_for_out_of_bounds() {
4583 let p = Pipeline::new().add_stage("only", |s: String| Ok(s));
4584 assert!(p.stage_at_index(5).is_none());
4585 }
4586
4587 #[test]
4588 fn test_stage_at_index_none_for_empty_pipeline() {
4589 let p = Pipeline::new();
4590 assert!(p.stage_at_index(0).is_none());
4591 }
4592
4593 #[test]
4596 fn test_max_single_delay_ms_constant_policy() {
4597 let p = RetryPolicy::constant(3, 100).unwrap();
4598 assert_eq!(p.max_single_delay_ms(), 100);
4599 }
4600
4601 #[test]
4602 fn test_max_single_delay_ms_exponential_grows_with_attempts() {
4603 let p = RetryPolicy::exponential(3, 50).unwrap();
4604 assert_eq!(p.max_single_delay_ms(), 200);
4606 }
4607
4608 #[test]
4611 fn test_all_stage_names_returns_all_in_order() {
4612 let p = Pipeline::new()
4613 .add_stage("alpha", |s: String| Ok(s))
4614 .add_stage("beta", |s: String| Ok(s))
4615 .add_stage("gamma", |s: String| Ok(s));
4616 assert_eq!(p.all_stage_names(), vec!["alpha", "beta", "gamma"]);
4617 }
4618
4619 #[test]
4620 fn test_all_stage_names_empty_for_empty_pipeline() {
4621 let p = Pipeline::new();
4622 assert!(p.all_stage_names().is_empty());
4623 }
4624
4625 #[test]
4626 fn test_all_stage_names_preserves_duplicates() {
4627 let p = Pipeline::new()
4628 .add_stage("a", |s: String| Ok(s))
4629 .add_stage("a", |s: String| Ok(s));
4630 assert_eq!(p.all_stage_names(), vec!["a", "a"]);
4631 }
4632
4633 #[test]
4634 fn test_has_exactly_n_stages_true() {
4635 let p = Pipeline::new()
4636 .add_stage("x", |s: String| Ok(s))
4637 .add_stage("y", |s: String| Ok(s));
4638 assert!(p.has_exactly_n_stages(2));
4639 }
4640
4641 #[test]
4642 fn test_has_exactly_n_stages_false_when_different() {
4643 let p = Pipeline::new().add_stage("x", |s: String| Ok(s));
4644 assert!(!p.has_exactly_n_stages(3));
4645 }
4646
4647 #[test]
4648 fn test_has_exactly_n_stages_true_for_empty() {
4649 let p = Pipeline::new();
4650 assert!(p.has_exactly_n_stages(0));
4651 }
4652
4653 #[test]
4656 fn test_stage_index_of_returns_correct_index() {
4657 let p = Pipeline::new()
4658 .add_stage("first", |s: String| Ok(s))
4659 .add_stage("second", |s: String| Ok(s))
4660 .add_stage("third", |s: String| Ok(s));
4661 assert_eq!(p.stage_index_of("second"), Some(1));
4662 }
4663
4664 #[test]
4665 fn test_stage_index_of_returns_none_when_absent() {
4666 let p = Pipeline::new().add_stage("alpha", |s: String| Ok(s));
4667 assert_eq!(p.stage_index_of("beta"), None);
4668 }
4669
4670 #[test]
4671 fn test_stage_index_of_returns_first_match_for_duplicates() {
4672 let p = Pipeline::new()
4673 .add_stage("dup", |s: String| Ok(s))
4674 .add_stage("dup", |s: String| Ok(s));
4675 assert_eq!(p.stage_index_of("dup"), Some(0));
4676 }
4677
4678 #[test]
4681 fn test_all_stage_names_start_with_true_when_all_match() {
4682 let p = Pipeline::new()
4683 .add_stage("api_v1", |s: String| Ok(s))
4684 .add_stage("api_v2", |s: String| Ok(s));
4685 assert!(p.all_stage_names_start_with("api_"));
4686 }
4687
4688 #[test]
4689 fn test_all_stage_names_start_with_false_when_one_differs() {
4690 let p = Pipeline::new()
4691 .add_stage("api_v1", |s: String| Ok(s))
4692 .add_stage("transform", |s: String| Ok(s));
4693 assert!(!p.all_stage_names_start_with("api_"));
4694 }
4695
4696 #[test]
4697 fn test_all_stage_names_start_with_true_for_empty_pipeline() {
4698 let p = Pipeline::new();
4699 assert!(p.all_stage_names_start_with("anything"));
4700 }
4701
4702 #[test]
4705 fn test_has_no_stages_true_for_empty_pipeline() {
4706 let p = Pipeline::new();
4707 assert!(p.has_no_stages());
4708 }
4709
4710 #[test]
4711 fn test_has_no_stages_false_after_adding_stage() {
4712 let p = Pipeline::new().add_stage("s", |s: String| Ok(s));
4713 assert!(!p.has_no_stages());
4714 }
4715
4716 #[test]
4719 fn test_longest_stage_name_len_returns_max() {
4720 let p = Pipeline::new()
4721 .add_stage("short", |s: String| Ok(s))
4722 .add_stage("much-longer-name", |s: String| Ok(s));
4723 assert_eq!(p.longest_stage_name_len(), "much-longer-name".len());
4724 }
4725
4726 #[test]
4727 fn test_longest_stage_name_len_zero_for_empty_pipeline() {
4728 let p = Pipeline::new();
4729 assert_eq!(p.longest_stage_name_len(), 0);
4730 }
4731
4732 #[test]
4735 fn test_stage_names_joined_correct() {
4736 let p = Pipeline::new()
4737 .add_stage("alpha", |s: String| Ok(s))
4738 .add_stage("beta", |s: String| Ok(s));
4739 assert_eq!(p.stage_names_joined(", "), "alpha, beta");
4740 }
4741
4742 #[test]
4743 fn test_stage_names_joined_empty_for_empty_pipeline() {
4744 let p = Pipeline::new();
4745 assert_eq!(p.stage_names_joined("|"), "");
4746 }
4747
4748 #[test]
4751 fn test_stage_count_with_name_containing_correct() {
4752 let p = Pipeline::new()
4753 .add_stage("preprocess_input", |s: String| Ok(s))
4754 .add_stage("process_data", |s: String| Ok(s))
4755 .add_stage("postprocess_output", |s: String| Ok(s));
4756 assert_eq!(p.stage_count_with_name_containing("process"), 3);
4757 assert_eq!(p.stage_count_with_name_containing("pre"), 1);
4758 }
4759
4760 #[test]
4761 fn test_stage_count_with_name_containing_zero_when_none_match() {
4762 let p = Pipeline::new().add_stage("alpha", |s: String| Ok(s));
4763 assert_eq!(p.stage_count_with_name_containing("beta"), 0);
4764 }
4765
4766 #[test]
4769 fn test_has_stage_at_index_true_for_valid_index() {
4770 let p = Pipeline::new()
4771 .add_stage("first", |s: String| Ok(s))
4772 .add_stage("second", |s: String| Ok(s));
4773 assert!(p.has_stage_at_index(0));
4774 assert!(p.has_stage_at_index(1));
4775 }
4776
4777 #[test]
4778 fn test_has_stage_at_index_false_for_out_of_bounds() {
4779 let p = Pipeline::new().add_stage("only", |s: String| Ok(s));
4780 assert!(!p.has_stage_at_index(1));
4781 }
4782
4783 #[test]
4784 fn test_has_stage_at_index_false_for_empty_pipeline() {
4785 let p = Pipeline::new();
4786 assert!(!p.has_stage_at_index(0));
4787 }
4788
4789 #[test]
4792 fn test_any_stage_has_name_true_for_existing_stage() {
4793 let p = Pipeline::new()
4794 .add_stage("alpha", |s: String| Ok(s))
4795 .add_stage("beta", |s: String| Ok(s));
4796 assert!(p.any_stage_has_name("alpha"));
4797 assert!(p.any_stage_has_name("beta"));
4798 }
4799
4800 #[test]
4801 fn test_any_stage_has_name_false_for_missing_stage() {
4802 let p = Pipeline::new().add_stage("alpha", |s: String| Ok(s));
4803 assert!(!p.any_stage_has_name("gamma"));
4804 }
4805
4806 #[test]
4807 fn test_any_stage_has_name_false_for_empty_pipeline() {
4808 let p = Pipeline::new();
4809 assert!(!p.any_stage_has_name("anything"));
4810 }
4811
4812 #[test]
4815 fn test_covers_n_failures_true_when_max_attempts_exceeds_n() {
4816 let policy = RetryPolicy::exponential(5, 100).unwrap();
4817 assert!(policy.covers_n_failures(4));
4818 assert!(policy.covers_n_failures(0));
4819 }
4820
4821 #[test]
4822 fn test_covers_n_failures_false_when_max_attempts_equals_n() {
4823 let policy = RetryPolicy::exponential(3, 100).unwrap();
4824 assert!(!policy.covers_n_failures(3));
4825 }
4826
4827 #[test]
4828 fn test_covers_n_failures_false_for_no_retry_policy() {
4829 let policy = RetryPolicy::none();
4830 assert!(!policy.covers_n_failures(1));
4832 }
4833
4834 #[test]
4837 fn test_last_stage_name_returns_last_added() {
4838 let p = Pipeline::new()
4839 .add_stage("first", |s: String| Ok(s))
4840 .add_stage("last", |s: String| Ok(s));
4841 assert_eq!(p.last_stage_name(), Some("last"));
4842 }
4843
4844 #[test]
4845 fn test_last_stage_name_none_for_empty_pipeline() {
4846 let p = Pipeline::new();
4847 assert!(p.last_stage_name().is_none());
4848 }
4849
4850 #[test]
4853 fn test_stage_name_at_returns_correct_name() {
4854 let p = Pipeline::new()
4855 .add_stage("alpha", |s: String| Ok(s))
4856 .add_stage("beta", |s: String| Ok(s));
4857 assert_eq!(p.stage_name_at(0), Some("alpha"));
4858 assert_eq!(p.stage_name_at(1), Some("beta"));
4859 }
4860
4861 #[test]
4862 fn test_stage_name_at_none_for_out_of_bounds() {
4863 let p = Pipeline::new().add_stage("only", |s: String| Ok(s));
4864 assert!(p.stage_name_at(1).is_none());
4865 }
4866
4867 #[test]
4868 fn test_stage_name_at_none_for_empty_pipeline() {
4869 let p = Pipeline::new();
4870 assert!(p.stage_name_at(0).is_none());
4871 }
4872
4873 #[test]
4876 fn test_all_stage_names_contain_true_when_all_match() {
4877 let p = Pipeline::new()
4878 .add_stage("step_alpha", |s: String| Ok(s))
4879 .add_stage("step_beta", |s: String| Ok(s));
4880 assert!(p.all_stage_names_contain("step_"));
4881 }
4882
4883 #[test]
4884 fn test_all_stage_names_contain_false_when_one_does_not_match() {
4885 let p = Pipeline::new()
4886 .add_stage("step_alpha", |s: String| Ok(s))
4887 .add_stage("gamma", |s: String| Ok(s));
4888 assert!(!p.all_stage_names_contain("step_"));
4889 }
4890
4891 #[test]
4892 fn test_all_stage_names_contain_true_for_empty_pipeline() {
4893 let p = Pipeline::new();
4894 assert!(p.all_stage_names_contain("anything"));
4895 }
4896}