1use crate::error::AgentRuntimeError;
44use crate::util::timed_lock;
45use std::collections::HashMap;
46use std::sync::{Arc, Mutex};
47use std::time::{Duration, Instant};
48
49pub const MAX_RETRY_DELAY: Duration = Duration::from_secs(60);
51
52#[derive(Debug, Clone, PartialEq, Eq)]
56pub enum RetryKind {
57 Exponential,
59 Constant,
61}
62
63#[derive(Debug, Clone)]
65pub struct RetryPolicy {
66 pub max_attempts: u32,
68 pub base_delay: Duration,
70 pub kind: RetryKind,
72}
73
74impl RetryPolicy {
75 pub fn exponential(max_attempts: u32, base_ms: u64) -> Result<Self, AgentRuntimeError> {
85 if max_attempts == 0 {
86 return Err(AgentRuntimeError::Orchestration(
87 "max_attempts must be >= 1".into(),
88 ));
89 }
90 if base_ms == 0 {
91 return Err(AgentRuntimeError::Orchestration(
92 "base_ms must be >= 1 to avoid zero-delay busy-loop retries".into(),
93 ));
94 }
95 Ok(Self {
96 max_attempts,
97 base_delay: Duration::from_millis(base_ms),
98 kind: RetryKind::Exponential,
99 })
100 }
101
102 pub fn constant(max_attempts: u32, delay_ms: u64) -> Result<Self, AgentRuntimeError> {
113 if max_attempts == 0 {
114 return Err(AgentRuntimeError::Orchestration(
115 "max_attempts must be >= 1".into(),
116 ));
117 }
118 if delay_ms == 0 {
119 return Err(AgentRuntimeError::Orchestration(
120 "delay_ms must be >= 1 to avoid busy-loop retries".into(),
121 ));
122 }
123 Ok(Self {
124 max_attempts,
125 base_delay: Duration::from_millis(delay_ms),
126 kind: RetryKind::Constant,
127 })
128 }
129
130 pub fn none() -> Self {
134 Self {
135 max_attempts: 1,
136 base_delay: Duration::ZERO,
137 kind: RetryKind::Constant,
138 }
139 }
140
141 pub fn is_none(&self) -> bool {
145 self.max_attempts == 1 && self.base_delay == Duration::ZERO
146 }
147
148 pub fn with_max_attempts(mut self, n: u32) -> Result<Self, AgentRuntimeError> {
153 if n == 0 {
154 return Err(AgentRuntimeError::Orchestration(
155 "max_attempts must be >= 1".into(),
156 ));
157 }
158 self.max_attempts = n;
159 Ok(self)
160 }
161
162 pub fn max_attempts(&self) -> u32 {
164 self.max_attempts
165 }
166
167 pub fn is_no_retry(&self) -> bool {
171 self.max_attempts <= 1
172 }
173
174 pub fn will_retry_at_all(&self) -> bool {
180 self.max_attempts > 1
181 }
182
183 pub fn is_exponential(&self) -> bool {
185 matches!(self.kind, RetryKind::Exponential)
186 }
187
188 pub fn is_constant(&self) -> bool {
190 matches!(self.kind, RetryKind::Constant)
191 }
192
193 pub fn base_delay_ms(&self) -> u64 {
198 self.base_delay.as_millis() as u64
199 }
200
201 pub fn first_delay_ms(&self) -> u64 {
206 self.base_delay_ms()
207 }
208
209 pub fn is_last_attempt(&self, attempt: u32) -> bool {
213 attempt >= self.max_attempts
214 }
215
216 pub fn max_total_delay_ms(&self) -> u64 {
222 (1..=self.max_attempts)
223 .map(|attempt| self.delay_for(attempt).as_millis() as u64)
224 .sum()
225 }
226
227 pub fn delay_sum_ms(&self, n: u32) -> u64 {
231 let limit = n.min(self.max_attempts);
232 (1..=limit)
233 .map(|attempt| self.delay_for(attempt).as_millis() as u64)
234 .sum()
235 }
236
237 pub fn avg_delay_ms(&self) -> u64 {
241 if self.max_attempts == 0 {
242 return 0;
243 }
244 self.max_total_delay_ms() / self.max_attempts as u64
245 }
246
247 pub fn backoff_factor(&self) -> f64 {
251 match self.kind {
252 RetryKind::Exponential => 2.0,
253 RetryKind::Constant => 1.0,
254 }
255 }
256
257 pub fn with_base_delay_ms(mut self, ms: u64) -> Result<Self, AgentRuntimeError> {
262 if ms == 0 {
263 return Err(AgentRuntimeError::Orchestration(
264 "base_delay_ms must be >= 1 to avoid busy-loop retries".into(),
265 ));
266 }
267 self.base_delay = Duration::from_millis(ms);
268 Ok(self)
269 }
270
271 pub fn delay_ms_for(&self, attempt: u32) -> u64 {
278 self.delay_for(attempt).as_millis() as u64
279 }
280
281 pub fn total_max_delay_ms(&self) -> u64 {
286 (1..=self.max_attempts)
287 .map(|a| self.delay_for(a).as_millis() as u64)
288 .sum()
289 }
290
291 pub fn attempts_remaining(&self, attempt: u32) -> u32 {
295 self.max_attempts.saturating_sub(attempt)
296 }
297
298 pub fn can_retry(&self, attempt: u32) -> bool {
304 attempt < self.max_attempts
305 }
306
307 pub fn delay_for(&self, attempt: u32) -> Duration {
312 match self.kind {
313 RetryKind::Constant => self.base_delay.min(MAX_RETRY_DELAY),
314 RetryKind::Exponential => {
315 let exp = attempt.saturating_sub(1);
316 let multiplier = 1u64.checked_shl(exp.min(63)).unwrap_or(u64::MAX);
317 let millis = self
318 .base_delay
319 .as_millis()
320 .saturating_mul(multiplier as u128);
321 let raw = Duration::from_millis(millis.min(u64::MAX as u128) as u64);
322 raw.min(MAX_RETRY_DELAY)
323 }
324 }
325 }
326}
327
328#[derive(Debug, Clone)]
338pub enum CircuitState {
339 Closed,
341 Open {
343 opened_at: Instant,
345 },
346 HalfOpen,
348}
349
350impl PartialEq for CircuitState {
351 fn eq(&self, other: &Self) -> bool {
352 match (self, other) {
353 (CircuitState::Closed, CircuitState::Closed) => true,
354 (CircuitState::Open { .. }, CircuitState::Open { .. }) => true,
355 (CircuitState::HalfOpen, CircuitState::HalfOpen) => true,
356 _ => false,
357 }
358 }
359}
360
361impl Eq for CircuitState {}
362
363pub trait CircuitBreakerBackend: Send + Sync {
371 fn increment_failures(&self, service: &str) -> u32;
373 fn reset_failures(&self, service: &str);
375 fn get_failures(&self, service: &str) -> u32;
377 fn set_open_at(&self, service: &str, at: std::time::Instant);
379 fn clear_open_at(&self, service: &str);
381 fn get_open_at(&self, service: &str) -> Option<std::time::Instant>;
383}
384
385pub struct InMemoryCircuitBreakerBackend {
394 inner: Arc<Mutex<HashMap<String, InMemoryServiceState>>>,
395}
396
397#[derive(Default)]
398struct InMemoryServiceState {
399 consecutive_failures: u32,
400 open_at: Option<std::time::Instant>,
401}
402
403impl InMemoryCircuitBreakerBackend {
404 pub fn new() -> Self {
406 Self {
407 inner: Arc::new(Mutex::new(HashMap::new())),
408 }
409 }
410}
411
412impl Default for InMemoryCircuitBreakerBackend {
413 fn default() -> Self {
414 Self::new()
415 }
416}
417
418impl CircuitBreakerBackend for InMemoryCircuitBreakerBackend {
419 fn increment_failures(&self, service: &str) -> u32 {
420 let mut map = timed_lock(
421 &self.inner,
422 "InMemoryCircuitBreakerBackend::increment_failures",
423 );
424 let state = map.entry(service.to_owned()).or_default();
425 state.consecutive_failures += 1;
426 state.consecutive_failures
427 }
428
429 fn reset_failures(&self, service: &str) {
430 let mut map = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::reset_failures");
431 if let Some(state) = map.get_mut(service) {
432 state.consecutive_failures = 0;
433 }
434 }
435
436 fn get_failures(&self, service: &str) -> u32 {
437 let map = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::get_failures");
438 map.get(service).map_or(0, |s| s.consecutive_failures)
439 }
440
441 fn set_open_at(&self, service: &str, at: std::time::Instant) {
442 let mut map = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::set_open_at");
443 map.entry(service.to_owned()).or_default().open_at = Some(at);
444 }
445
446 fn clear_open_at(&self, service: &str) {
447 let mut map = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::clear_open_at");
448 if let Some(state) = map.get_mut(service) {
449 state.open_at = None;
450 }
451 }
452
453 fn get_open_at(&self, service: &str) -> Option<std::time::Instant> {
454 let map = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::get_open_at");
455 map.get(service).and_then(|s| s.open_at)
456 }
457}
458
459#[derive(Clone)]
468pub struct CircuitBreaker {
469 threshold: u32,
470 recovery_window: Duration,
471 service: String,
472 backend: Arc<dyn CircuitBreakerBackend>,
473}
474
475impl std::fmt::Debug for CircuitBreaker {
476 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
477 f.debug_struct("CircuitBreaker")
478 .field("threshold", &self.threshold)
479 .field("recovery_window", &self.recovery_window)
480 .field("service", &self.service)
481 .finish()
482 }
483}
484
485impl CircuitBreaker {
486 pub fn new(
493 service: impl Into<String>,
494 threshold: u32,
495 recovery_window: Duration,
496 ) -> Result<Self, AgentRuntimeError> {
497 if threshold == 0 {
498 return Err(AgentRuntimeError::Orchestration(
499 "circuit breaker threshold must be >= 1".into(),
500 ));
501 }
502 let service = service.into();
503 Ok(Self {
504 threshold,
505 recovery_window,
506 service,
507 backend: Arc::new(InMemoryCircuitBreakerBackend::new()),
508 })
509 }
510
511 pub fn with_backend(mut self, backend: Arc<dyn CircuitBreakerBackend>) -> Self {
515 self.backend = backend;
516 self
517 }
518
519 #[tracing::instrument(skip(self, f))]
528 pub fn call<T, E, F>(&self, f: F) -> Result<T, AgentRuntimeError>
529 where
530 F: FnOnce() -> Result<T, E>,
531 E: std::fmt::Display,
532 {
533 let effective_state = match self.backend.get_open_at(&self.service) {
535 Some(opened_at) => {
536 if opened_at.elapsed() >= self.recovery_window {
537 self.backend.clear_open_at(&self.service);
539 tracing::info!("circuit moved to half-open for {}", self.service);
540 CircuitState::HalfOpen
541 } else {
542 CircuitState::Open { opened_at }
543 }
544 }
545 None => {
546 let failures = self.backend.get_failures(&self.service);
550 if failures >= self.threshold {
551 CircuitState::HalfOpen
552 } else {
553 CircuitState::Closed
554 }
555 }
556 };
557
558 tracing::debug!("circuit state: {:?}", effective_state);
559
560 match effective_state {
561 CircuitState::Open { .. } => {
562 return Err(AgentRuntimeError::CircuitOpen {
563 service: self.service.clone(),
564 });
565 }
566 CircuitState::Closed | CircuitState::HalfOpen => {}
567 }
568
569 match f() {
571 Ok(val) => {
572 self.backend.reset_failures(&self.service);
573 self.backend.clear_open_at(&self.service);
574 tracing::info!("circuit closed for {}", self.service);
575 Ok(val)
576 }
577 Err(e) => {
578 let failures = self.backend.increment_failures(&self.service);
579 if failures >= self.threshold {
580 let now = Instant::now();
581 self.backend.set_open_at(&self.service, now);
582 tracing::info!("circuit opened for {}", self.service);
583 }
584 Err(AgentRuntimeError::Orchestration(e.to_string()))
585 }
586 }
587 }
588
589 pub fn state(&self) -> Result<CircuitState, AgentRuntimeError> {
591 let state = match self.backend.get_open_at(&self.service) {
592 Some(opened_at) => {
593 if opened_at.elapsed() >= self.recovery_window {
594 let failures = self.backend.get_failures(&self.service);
596 if failures >= self.threshold {
597 CircuitState::HalfOpen
598 } else {
599 CircuitState::Closed
600 }
601 } else {
602 CircuitState::Open { opened_at }
603 }
604 }
605 None => {
606 let failures = self.backend.get_failures(&self.service);
607 if failures >= self.threshold {
608 CircuitState::HalfOpen
609 } else {
610 CircuitState::Closed
611 }
612 }
613 };
614 Ok(state)
615 }
616
617 pub fn failure_count(&self) -> Result<u32, AgentRuntimeError> {
619 Ok(self.backend.get_failures(&self.service))
620 }
621
622 pub fn record_success(&self) {
627 self.backend.reset_failures(&self.service);
628 self.backend.clear_open_at(&self.service);
629 }
630
631 pub fn record_failure(&self) {
635 let failures = self.backend.increment_failures(&self.service);
636 if failures >= self.threshold {
637 self.backend.set_open_at(&self.service, Instant::now());
638 tracing::info!("circuit opened for {} (manual record)", self.service);
639 }
640 }
641
642 pub fn service_name(&self) -> &str {
644 &self.service
645 }
646
647 pub fn is_closed(&self) -> bool {
649 matches!(self.state(), Ok(CircuitState::Closed))
650 }
651
652 pub fn is_open(&self) -> bool {
654 matches!(self.state(), Ok(CircuitState::Open { .. }))
655 }
656
657 pub fn is_half_open(&self) -> bool {
659 matches!(self.state(), Ok(CircuitState::HalfOpen))
660 }
661
662 pub fn is_healthy(&self) -> bool {
667 !self.is_open()
668 }
669
670 pub fn threshold(&self) -> u32 {
674 self.threshold
675 }
676
677 pub fn failure_rate(&self) -> f64 {
683 if self.threshold == 0 {
684 return 0.0;
685 }
686 let failures = self.backend.get_failures(&self.service);
687 failures as f64 / self.threshold as f64
688 }
689
690 pub fn is_at_threshold(&self) -> bool {
695 let failures = self.backend.get_failures(&self.service);
696 failures >= self.threshold
697 }
698
699 pub fn failures_until_open(&self) -> u32 {
703 let failures = self.backend.get_failures(&self.service);
704 self.threshold.saturating_sub(failures)
705 }
706
707 pub fn recovery_window(&self) -> std::time::Duration {
712 self.recovery_window
713 }
714
715 pub fn reset(&self) {
720 self.backend.reset_failures(&self.service);
721 self.backend.clear_open_at(&self.service);
722 tracing::info!("circuit manually reset to Closed for {}", self.service);
723 }
724
725 #[tracing::instrument(skip(self, backend, f))]
735 pub async fn async_call<T, E, F, Fut>(
736 &self,
737 backend: &dyn AsyncCircuitBreakerBackend,
738 f: F,
739 ) -> Result<T, AgentRuntimeError>
740 where
741 F: FnOnce() -> Fut,
742 Fut: std::future::Future<Output = Result<T, E>>,
743 E: std::fmt::Display,
744 {
745 let effective_state = match backend.get_open_at(&self.service).await {
747 Some(opened_at) => {
748 if opened_at.elapsed() >= self.recovery_window {
749 backend.clear_open_at(&self.service).await;
750 tracing::info!("circuit async moved to half-open for {}", self.service);
751 CircuitState::HalfOpen
752 } else {
753 CircuitState::Open { opened_at }
754 }
755 }
756 None => {
757 let failures = backend.get_failures(&self.service).await;
758 if failures >= self.threshold {
759 CircuitState::HalfOpen
760 } else {
761 CircuitState::Closed
762 }
763 }
764 };
765
766 if let CircuitState::Open { .. } = effective_state {
767 return Err(AgentRuntimeError::CircuitOpen {
768 service: self.service.clone(),
769 });
770 }
771
772 match f().await {
773 Ok(val) => {
774 backend.reset_failures(&self.service).await;
775 backend.clear_open_at(&self.service).await;
776 Ok(val)
777 }
778 Err(e) => {
779 let failures = backend.increment_failures(&self.service).await;
780 if failures >= self.threshold {
781 backend
782 .set_open_at(&self.service, Instant::now())
783 .await;
784 tracing::info!("circuit async opened for {}", self.service);
785 }
786 Err(AgentRuntimeError::Orchestration(e.to_string()))
787 }
788 }
789 }
790}
791
792#[async_trait::async_trait]
803pub trait AsyncCircuitBreakerBackend: Send + Sync {
804 async fn increment_failures(&self, service: &str) -> u32;
806 async fn reset_failures(&self, service: &str);
808 async fn get_failures(&self, service: &str) -> u32;
810 async fn set_open_at(&self, service: &str, at: Instant);
812 async fn clear_open_at(&self, service: &str);
814 async fn get_open_at(&self, service: &str) -> Option<Instant>;
816}
817
818#[async_trait::async_trait]
819impl AsyncCircuitBreakerBackend for InMemoryCircuitBreakerBackend {
820 async fn increment_failures(&self, service: &str) -> u32 {
821 <Self as CircuitBreakerBackend>::increment_failures(self, service)
822 }
823 async fn reset_failures(&self, service: &str) {
824 <Self as CircuitBreakerBackend>::reset_failures(self, service);
825 }
826 async fn get_failures(&self, service: &str) -> u32 {
827 <Self as CircuitBreakerBackend>::get_failures(self, service)
828 }
829 async fn set_open_at(&self, service: &str, at: Instant) {
830 <Self as CircuitBreakerBackend>::set_open_at(self, service, at);
831 }
832 async fn clear_open_at(&self, service: &str) {
833 <Self as CircuitBreakerBackend>::clear_open_at(self, service);
834 }
835 async fn get_open_at(&self, service: &str) -> Option<Instant> {
836 <Self as CircuitBreakerBackend>::get_open_at(self, service)
837 }
838}
839
840#[derive(Debug, Clone, PartialEq)]
844pub enum DeduplicationResult {
845 New,
847 Cached(String),
849 InProgress,
851}
852
853#[derive(Debug, Clone)]
861pub struct Deduplicator {
862 ttl: Duration,
863 max_entries: Option<usize>,
867 inner: Arc<Mutex<DeduplicatorInner>>,
868}
869
870#[derive(Debug)]
871struct DeduplicatorInner {
872 cache: HashMap<String, (String, Instant)>, in_flight: HashMap<String, Instant>, cache_order: std::collections::VecDeque<String>,
876 call_count: u64,
880}
881
882impl Deduplicator {
883 pub fn new(ttl: Duration) -> Self {
885 Self {
886 ttl,
887 max_entries: None,
888 inner: Arc::new(Mutex::new(DeduplicatorInner {
889 cache: HashMap::new(),
890 in_flight: HashMap::new(),
891 cache_order: std::collections::VecDeque::new(),
892 call_count: 0,
893 })),
894 }
895 }
896
897 pub fn with_max_entries(mut self, max: usize) -> Result<Self, AgentRuntimeError> {
906 if max == 0 {
907 return Err(AgentRuntimeError::Orchestration(
908 "Deduplicator max_entries must be >= 1".into(),
909 ));
910 }
911 self.max_entries = Some(max);
912 Ok(self)
913 }
914
915 pub fn check_and_register(&self, key: &str) -> Result<DeduplicationResult, AgentRuntimeError> {
919 let mut inner = timed_lock(&self.inner, "Deduplicator::check_and_register");
920
921 let now = Instant::now();
922
923 const EXPIRY_INTERVAL: u64 = 64;
927 inner.call_count = inner.call_count.wrapping_add(1);
928 if inner.call_count % EXPIRY_INTERVAL == 0 {
929 let ttl = self.ttl;
930 inner.cache.retain(|_, (_, ts)| now.duration_since(*ts) < ttl);
931 inner
932 .in_flight
933 .retain(|_, ts| now.duration_since(*ts) < ttl);
934 }
935
936 match inner.cache.get(key) {
938 Some((result, ts)) if now.duration_since(*ts) < self.ttl => {
939 return Ok(DeduplicationResult::Cached(result.clone()));
940 }
941 Some(_) => {
942 inner.cache.remove(key); }
944 None => {}
945 }
946 match inner.in_flight.get(key) {
947 Some(ts) if now.duration_since(*ts) < self.ttl => {
948 return Ok(DeduplicationResult::InProgress);
949 }
950 Some(_) => {
951 inner.in_flight.remove(key); }
953 None => {}
954 }
955
956 inner.in_flight.insert(key.to_owned(), now);
957 Ok(DeduplicationResult::New)
958 }
959
960 pub fn check(&self, key: &str, ttl: std::time::Duration) -> Result<DeduplicationResult, AgentRuntimeError> {
965 let mut inner = timed_lock(&self.inner, "Deduplicator::check");
966 let now = Instant::now();
967
968 const EXPIRY_INTERVAL: u64 = 64;
970 inner.call_count = inner.call_count.wrapping_add(1);
971 if inner.call_count % EXPIRY_INTERVAL == 0 {
972 inner.cache.retain(|_, (_, ts)| now.duration_since(*ts) < ttl);
973 inner.in_flight.retain(|_, ts| now.duration_since(*ts) < ttl);
974 }
975
976 match inner.cache.get(key) {
977 Some((result, ts)) if now.duration_since(*ts) < ttl => {
978 return Ok(DeduplicationResult::Cached(result.clone()));
979 }
980 Some(_) => {
981 inner.cache.remove(key);
982 }
983 None => {}
984 }
985 match inner.in_flight.get(key) {
986 Some(ts) if now.duration_since(*ts) < ttl => {
987 return Ok(DeduplicationResult::InProgress);
988 }
989 Some(_) => {
990 inner.in_flight.remove(key);
991 }
992 None => {}
993 }
994
995 inner.in_flight.insert(key.to_owned(), now);
996 Ok(DeduplicationResult::New)
997 }
998
999 pub fn dedup_many(
1007 &self,
1008 requests: &[(&str, std::time::Duration)],
1009 ) -> Result<Vec<DeduplicationResult>, AgentRuntimeError> {
1010 if requests.is_empty() {
1011 return Ok(Vec::new());
1012 }
1013 let mut inner = timed_lock(&self.inner, "Deduplicator::dedup_many");
1014 let now = std::time::Instant::now();
1015 let mut results = Vec::with_capacity(requests.len());
1016
1017 for &(key, ttl) in requests {
1018 inner.cache.retain(|_, (_, ts)| now.duration_since(*ts) < ttl);
1020 inner.in_flight.retain(|_, ts| now.duration_since(*ts) < ttl);
1021
1022 let result = if let Some((cached_result, _)) = inner.cache.get(key) {
1023 DeduplicationResult::Cached(cached_result.clone())
1024 } else if inner.in_flight.contains_key(key) {
1025 DeduplicationResult::InProgress
1026 } else {
1027 inner.in_flight.insert(key.to_owned(), now);
1028 DeduplicationResult::New
1029 };
1030 results.push(result);
1031 }
1032
1033 Ok(results)
1034 }
1035
1036 pub fn complete(&self, key: &str, result: impl Into<String>) -> Result<(), AgentRuntimeError> {
1041 let mut inner = timed_lock(&self.inner, "Deduplicator::complete");
1042 inner.in_flight.remove(key);
1043
1044 if let Some(max) = self.max_entries {
1047 while inner.cache.len() >= max {
1048 match inner.cache_order.pop_front() {
1049 Some(oldest_key) => {
1050 inner.cache.remove(&oldest_key);
1051 }
1052 None => break,
1053 }
1054 }
1055 }
1056
1057 let owned_key = key.to_owned();
1058 inner.cache_order.push_back(owned_key.clone());
1059 inner.cache.insert(owned_key, (result.into(), Instant::now()));
1060 Ok(())
1061 }
1062
1063 pub fn fail(&self, key: &str) -> Result<(), AgentRuntimeError> {
1068 let mut inner = timed_lock(&self.inner, "Deduplicator::fail");
1069 inner.in_flight.remove(key);
1070 Ok(())
1071 }
1072
1073 pub fn in_flight_count(&self) -> Result<usize, AgentRuntimeError> {
1075 let inner = timed_lock(&self.inner, "Deduplicator::in_flight_count");
1076 Ok(inner.in_flight.len())
1077 }
1078
1079 pub fn in_flight_keys(&self) -> Result<Vec<String>, AgentRuntimeError> {
1081 let inner = timed_lock(&self.inner, "Deduplicator::in_flight_keys");
1082 Ok(inner.in_flight.keys().cloned().collect())
1083 }
1084
1085 pub fn cached_count(&self) -> Result<usize, AgentRuntimeError> {
1089 let inner = timed_lock(&self.inner, "Deduplicator::cached_count");
1090 Ok(inner.cache.len())
1091 }
1092
1093 pub fn cached_keys(&self) -> Result<Vec<String>, AgentRuntimeError> {
1100 let inner = timed_lock(&self.inner, "Deduplicator::cached_keys");
1101 Ok(inner.cache.keys().cloned().collect())
1102 }
1103
1104 pub fn ttl(&self) -> Duration {
1106 self.ttl
1107 }
1108
1109 pub fn max_entries(&self) -> Option<usize> {
1115 self.max_entries
1116 }
1117
1118 pub fn is_idle(&self) -> Result<bool, AgentRuntimeError> {
1120 let inner = timed_lock(&self.inner, "Deduplicator::is_idle");
1121 Ok(inner.in_flight.is_empty())
1122 }
1123
1124 pub fn total_count(&self) -> Result<usize, AgentRuntimeError> {
1127 let inner = timed_lock(&self.inner, "Deduplicator::total_count");
1128 Ok(inner.in_flight.len() + inner.cache.len())
1129 }
1130
1131 pub fn contains(&self, key: &str) -> Result<bool, AgentRuntimeError> {
1138 let inner = timed_lock(&self.inner, "Deduplicator::contains");
1139 Ok(inner.in_flight.contains_key(key) || inner.cache.contains_key(key))
1140 }
1141
1142 pub fn get_result(&self, key: &str) -> Result<Option<String>, AgentRuntimeError> {
1147 let inner = timed_lock(&self.inner, "Deduplicator::get_result");
1148 let ttl = self.ttl;
1149 let now = std::time::Instant::now();
1150 Ok(inner.cache.get(key).and_then(|(result, inserted_at)| {
1151 if now.duration_since(*inserted_at) <= ttl {
1152 Some(result.clone())
1153 } else {
1154 None
1155 }
1156 }))
1157 }
1158
1159 pub fn clear(&self) -> Result<(), AgentRuntimeError> {
1163 let mut inner = timed_lock(&self.inner, "Deduplicator::clear");
1164 inner.cache.clear();
1165 inner.in_flight.clear();
1166 inner.cache_order.clear();
1167 Ok(())
1168 }
1169
1170 pub fn purge_expired(&self) -> Result<usize, AgentRuntimeError> {
1178 let mut inner = timed_lock(&self.inner, "Deduplicator::purge_expired");
1179 let ttl = self.ttl;
1180 let now = std::time::Instant::now();
1181 let before = inner.cache.len();
1182 inner.cache.retain(|_, (_, inserted_at)| {
1183 now.duration_since(*inserted_at) <= ttl
1184 });
1185 let removed = before - inner.cache.len();
1186 if removed > 0 {
1189 let live_keys: std::collections::HashSet<String> =
1190 inner.cache.keys().cloned().collect();
1191 inner.cache_order.retain(|k| live_keys.contains(k));
1192 }
1193 Ok(removed)
1194 }
1195
1196 pub fn evict_oldest(&self) -> Result<bool, AgentRuntimeError> {
1200 let mut inner = timed_lock(&self.inner, "Deduplicator::evict_oldest");
1201 while let Some(key) = inner.cache_order.pop_front() {
1202 if inner.cache.remove(&key).is_some() {
1203 return Ok(true);
1204 }
1205 }
1206 Ok(false)
1207 }
1208}
1209
1210#[derive(Debug, Clone)]
1220pub struct BackpressureGuard {
1221 capacity: usize,
1222 soft_capacity: Option<usize>,
1223 inner: Arc<Mutex<usize>>,
1224}
1225
1226impl BackpressureGuard {
1227 pub fn new(capacity: usize) -> Result<Self, AgentRuntimeError> {
1233 if capacity == 0 {
1234 return Err(AgentRuntimeError::Orchestration(
1235 "BackpressureGuard capacity must be > 0".into(),
1236 ));
1237 }
1238 Ok(Self {
1239 capacity,
1240 soft_capacity: None,
1241 inner: Arc::new(Mutex::new(0)),
1242 })
1243 }
1244
1245 pub fn with_soft_limit(mut self, soft: usize) -> Result<Self, AgentRuntimeError> {
1248 if soft >= self.capacity {
1249 return Err(AgentRuntimeError::Orchestration(
1250 "soft_capacity must be less than hard capacity".into(),
1251 ));
1252 }
1253 self.soft_capacity = Some(soft);
1254 Ok(self)
1255 }
1256
1257 pub fn try_acquire(&self) -> Result<(), AgentRuntimeError> {
1266 let mut depth = timed_lock(&self.inner, "BackpressureGuard::try_acquire");
1267 if *depth >= self.capacity {
1268 return Err(AgentRuntimeError::BackpressureShed {
1269 depth: *depth,
1270 capacity: self.capacity,
1271 });
1272 }
1273 *depth += 1;
1274 if let Some(soft) = self.soft_capacity {
1275 if *depth >= soft {
1276 tracing::warn!(
1277 depth = *depth,
1278 soft_capacity = soft,
1279 hard_capacity = self.capacity,
1280 "backpressure approaching hard limit"
1281 );
1282 }
1283 }
1284 Ok(())
1285 }
1286
1287 pub fn release(&self) -> Result<(), AgentRuntimeError> {
1289 let mut depth = timed_lock(&self.inner, "BackpressureGuard::release");
1290 *depth = depth.saturating_sub(1);
1291 Ok(())
1292 }
1293
1294 pub fn reset(&self) {
1299 let mut depth = timed_lock(&self.inner, "BackpressureGuard::reset");
1300 *depth = 0;
1301 }
1302
1303 pub fn is_full(&self) -> Result<bool, AgentRuntimeError> {
1305 Ok(self.depth()? >= self.capacity)
1306 }
1307
1308 pub fn is_empty(&self) -> Result<bool, AgentRuntimeError> {
1310 Ok(self.depth()? == 0)
1311 }
1312
1313 pub fn available_capacity(&self) -> Result<usize, AgentRuntimeError> {
1315 Ok(self.capacity.saturating_sub(self.depth()?))
1316 }
1317
1318 pub fn hard_capacity(&self) -> usize {
1320 self.capacity
1321 }
1322
1323 pub fn soft_limit(&self) -> Option<usize> {
1325 self.soft_capacity
1326 }
1327
1328 pub fn is_soft_limited(&self) -> bool {
1333 self.soft_capacity.is_some()
1334 }
1335
1336 pub fn depth(&self) -> Result<usize, AgentRuntimeError> {
1338 let depth = timed_lock(&self.inner, "BackpressureGuard::depth");
1339 Ok(*depth)
1340 }
1341
1342 pub fn percent_full(&self) -> Result<f64, AgentRuntimeError> {
1347 let depth = self.depth()?;
1348 Ok((depth as f64 / self.capacity as f64 * 100.0).min(100.0))
1349 }
1350
1351 pub fn soft_depth_ratio(&self) -> f32 {
1356 match self.soft_capacity {
1357 None => 0.0,
1358 Some(soft) => {
1359 let depth = timed_lock(&self.inner, "BackpressureGuard::soft_depth_ratio");
1360 *depth as f32 / soft as f32
1361 }
1362 }
1363 }
1364
1365 pub fn utilization_ratio(&self) -> Result<f32, AgentRuntimeError> {
1369 if self.capacity == 0 {
1370 return Ok(0.0);
1371 }
1372 let depth = self.depth()?;
1373 Ok(depth as f32 / self.capacity as f32)
1374 }
1375
1376 pub fn remaining_capacity(&self) -> Result<usize, AgentRuntimeError> {
1381 let depth = self.depth()?;
1382 Ok(self.capacity.saturating_sub(depth))
1383 }
1384
1385 pub fn reset_depth(&self) -> Result<(), AgentRuntimeError> {
1390 let mut depth = timed_lock(&self.inner, "BackpressureGuard::reset_depth");
1391 *depth = 0;
1392 Ok(())
1393 }
1394
1395 pub fn headroom_ratio(&self) -> Result<f64, AgentRuntimeError> {
1399 Ok(self.available_capacity()? as f64 / self.capacity as f64)
1400 }
1401
1402 pub fn acquired_count(&self) -> Result<usize, AgentRuntimeError> {
1406 Ok(self.capacity - self.available_capacity()?)
1407 }
1408
1409 pub fn over_soft_limit(&self) -> Result<bool, AgentRuntimeError> {
1413 let soft = match self.soft_limit() {
1414 Some(s) => s,
1415 None => return Ok(false),
1416 };
1417 Ok(self.depth()? > soft)
1418 }
1419}
1420
1421#[derive(Debug)]
1425pub struct PipelineResult {
1426 pub output: String,
1428 pub stage_timings: Vec<(String, u64)>,
1430}
1431
1432pub struct Stage {
1434 pub name: String,
1436 pub handler: Box<dyn Fn(String) -> Result<String, AgentRuntimeError> + Send + Sync>,
1438}
1439
1440impl std::fmt::Debug for Stage {
1441 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1442 f.debug_struct("Stage").field("name", &self.name).finish()
1443 }
1444}
1445
1446type StageErrorHandler = Box<dyn Fn(&str, &str) -> String + Send + Sync>;
1448
1449pub struct Pipeline {
1456 stages: Vec<Stage>,
1457 error_handler: Option<StageErrorHandler>,
1458}
1459
1460impl std::fmt::Debug for Pipeline {
1461 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1462 f.debug_struct("Pipeline")
1463 .field("stages", &self.stages)
1464 .field("has_error_handler", &self.error_handler.is_some())
1465 .finish()
1466 }
1467}
1468
1469impl Pipeline {
1470 pub fn new() -> Self {
1472 Self { stages: Vec::new(), error_handler: None }
1473 }
1474
1475 pub fn with_error_handler(
1481 mut self,
1482 handler: impl Fn(&str, &str) -> String + Send + Sync + 'static,
1483 ) -> Self {
1484 self.error_handler = Some(Box::new(handler));
1485 self
1486 }
1487
1488 pub fn add_stage(
1490 mut self,
1491 name: impl Into<String>,
1492 handler: impl Fn(String) -> Result<String, AgentRuntimeError> + Send + Sync + 'static,
1493 ) -> Self {
1494 self.stages.push(Stage {
1495 name: name.into(),
1496 handler: Box::new(handler),
1497 });
1498 self
1499 }
1500
1501 pub fn prepend_stage(
1506 mut self,
1507 name: impl Into<String>,
1508 handler: impl Fn(String) -> Result<String, AgentRuntimeError> + Send + Sync + 'static,
1509 ) -> Self {
1510 self.stages.insert(0, Stage {
1511 name: name.into(),
1512 handler: Box::new(handler),
1513 });
1514 self
1515 }
1516
1517 pub fn is_empty(&self) -> bool {
1519 self.stages.is_empty()
1520 }
1521
1522 pub fn has_error_handler(&self) -> bool {
1527 self.error_handler.is_some()
1528 }
1529
1530 pub fn stage_count(&self) -> usize {
1532 self.stages.len()
1533 }
1534
1535 pub fn has_stage(&self, name: &str) -> bool {
1537 self.stages.iter().any(|s| s.name == name)
1538 }
1539
1540 pub fn stage_names(&self) -> Vec<&str> {
1542 self.stages.iter().map(|s| s.name.as_str()).collect()
1543 }
1544
1545 pub fn stage_names_owned(&self) -> Vec<String> {
1552 self.stages.iter().map(|s| s.name.clone()).collect()
1553 }
1554
1555 pub fn get_stage_name_at(&self, index: usize) -> Option<&str> {
1557 self.stages.get(index).map(|s| s.name.as_str())
1558 }
1559
1560 pub fn stage_index(&self, name: &str) -> Option<usize> {
1564 self.stages.iter().position(|s| s.name == name)
1565 }
1566
1567 pub fn first_stage_name(&self) -> Option<&str> {
1569 self.stages.first().map(|s| s.name.as_str())
1570 }
1571
1572 pub fn last_stage_name(&self) -> Option<&str> {
1574 self.stages.last().map(|s| s.name.as_str())
1575 }
1576
1577 pub fn remove_stage(&mut self, name: &str) -> bool {
1582 if let Some(pos) = self.stages.iter().position(|s| s.name == name) {
1583 self.stages.remove(pos);
1584 true
1585 } else {
1586 false
1587 }
1588 }
1589
1590 pub fn rename_stage(&mut self, old_name: &str, new_name: impl Into<String>) -> bool {
1595 if let Some(stage) = self.stages.iter_mut().find(|s| s.name == old_name) {
1596 stage.name = new_name.into();
1597 true
1598 } else {
1599 false
1600 }
1601 }
1602
1603 pub fn clear(&mut self) {
1607 self.stages.clear();
1608 }
1609
1610 pub fn count_stages_matching(&self, keyword: &str) -> usize {
1612 let kw = keyword.to_ascii_lowercase();
1613 self.stages
1614 .iter()
1615 .filter(|s| s.name.to_ascii_lowercase().contains(&kw))
1616 .count()
1617 }
1618
1619 pub fn swap_stages(&mut self, a: &str, b: &str) -> bool {
1624 let idx_a = self.stages.iter().position(|s| s.name == a);
1625 let idx_b = self.stages.iter().position(|s| s.name == b);
1626 match (idx_a, idx_b) {
1627 (Some(i), Some(j)) => {
1628 self.stages.swap(i, j);
1629 true
1630 }
1631 _ => false,
1632 }
1633 }
1634
1635 #[tracing::instrument(skip(self))]
1637 pub fn run(&self, input: String) -> Result<String, AgentRuntimeError> {
1638 let mut current = input;
1639 for stage in &self.stages {
1640 tracing::debug!(stage = %stage.name, "running pipeline stage");
1641 match (stage.handler)(current) {
1642 Ok(out) => current = out,
1643 Err(e) => {
1644 tracing::error!(stage = %stage.name, error = %e, "pipeline stage failed");
1645 if let Some(ref handler) = self.error_handler {
1646 current = handler(&stage.name, &e.to_string());
1647 } else {
1648 return Err(e);
1649 }
1650 }
1651 }
1652 }
1653 Ok(current)
1654 }
1655
1656 pub fn execute_timed(&self, input: String) -> Result<PipelineResult, AgentRuntimeError> {
1661 let mut current = input;
1662 let mut stage_timings = Vec::new();
1663 for stage in &self.stages {
1664 let start = std::time::Instant::now();
1665 tracing::debug!(stage = %stage.name, "running timed pipeline stage");
1666 match (stage.handler)(current) {
1667 Ok(out) => current = out,
1668 Err(e) => {
1669 tracing::error!(stage = %stage.name, error = %e, "timed pipeline stage failed");
1670 if let Some(ref handler) = self.error_handler {
1671 current = handler(&stage.name, &e.to_string());
1672 } else {
1673 return Err(e);
1674 }
1675 }
1676 }
1677 let duration_ms = start.elapsed().as_millis() as u64;
1678 stage_timings.push((stage.name.clone(), duration_ms));
1679 }
1680 Ok(PipelineResult {
1681 output: current,
1682 stage_timings,
1683 })
1684 }
1685
1686}
1687
1688impl Default for Pipeline {
1689 fn default() -> Self {
1690 Self::new()
1691 }
1692}
1693
1694#[cfg(test)]
1697mod tests {
1698 use super::*;
1699
1700 #[test]
1703 fn test_retry_policy_rejects_zero_attempts() {
1704 assert!(RetryPolicy::exponential(0, 100).is_err());
1705 }
1706
1707 #[test]
1708 fn test_retry_policy_delay_attempt_1_equals_base() {
1709 let p = RetryPolicy::exponential(3, 100).unwrap();
1710 assert_eq!(p.delay_for(1), Duration::from_millis(100));
1711 }
1712
1713 #[test]
1714 fn test_retry_policy_delay_doubles_each_attempt() {
1715 let p = RetryPolicy::exponential(5, 100).unwrap();
1716 assert_eq!(p.delay_for(2), Duration::from_millis(200));
1717 assert_eq!(p.delay_for(3), Duration::from_millis(400));
1718 assert_eq!(p.delay_for(4), Duration::from_millis(800));
1719 }
1720
1721 #[test]
1722 fn test_retry_policy_delay_capped_at_max() {
1723 let p = RetryPolicy::exponential(10, 10_000).unwrap();
1724 assert_eq!(p.delay_for(10), MAX_RETRY_DELAY);
1725 }
1726
1727 #[test]
1728 fn test_retry_policy_delay_never_exceeds_max_for_any_attempt() {
1729 let p = RetryPolicy::exponential(10, 1000).unwrap();
1730 for attempt in 1..=10 {
1731 assert!(p.delay_for(attempt) <= MAX_RETRY_DELAY);
1732 }
1733 }
1734
1735 #[test]
1738 fn test_retry_policy_first_delay_ms_equals_base_delay() {
1739 let p = RetryPolicy::exponential(3, 200).unwrap();
1740 assert_eq!(p.first_delay_ms(), p.base_delay_ms());
1741 }
1742
1743 #[test]
1744 fn test_retry_policy_first_delay_ms_constant_policy() {
1745 let p = RetryPolicy::constant(4, 150).unwrap();
1746 assert_eq!(p.first_delay_ms(), 150);
1747 }
1748
1749 #[test]
1752 fn test_circuit_breaker_rejects_zero_threshold() {
1753 assert!(CircuitBreaker::new("svc", 0, Duration::from_secs(1)).is_err());
1754 }
1755
1756 #[test]
1757 fn test_circuit_breaker_starts_closed() {
1758 let cb = CircuitBreaker::new("svc", 3, Duration::from_secs(60)).unwrap();
1759 assert_eq!(cb.state().unwrap(), CircuitState::Closed);
1760 }
1761
1762 #[test]
1763 fn test_circuit_breaker_success_keeps_closed() {
1764 let cb = CircuitBreaker::new("svc", 3, Duration::from_secs(60)).unwrap();
1765 let result: Result<i32, AgentRuntimeError> = cb.call(|| Ok::<i32, AgentRuntimeError>(42));
1766 assert!(result.is_ok());
1767 assert_eq!(cb.state().unwrap(), CircuitState::Closed);
1768 }
1769
1770 #[test]
1771 fn test_circuit_breaker_opens_after_threshold_failures() {
1772 let cb = CircuitBreaker::new("svc", 3, Duration::from_secs(60)).unwrap();
1773 for _ in 0..3 {
1774 let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("oops".to_string()));
1775 }
1776 assert!(matches!(cb.state().unwrap(), CircuitState::Open { .. }));
1777 }
1778
1779 #[test]
1780 fn test_circuit_breaker_open_fast_fails() {
1781 let cb = CircuitBreaker::new("svc", 1, Duration::from_secs(3600)).unwrap();
1782 let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("fail".to_string()));
1783 let result: Result<(), AgentRuntimeError> = cb.call(|| Ok::<(), AgentRuntimeError>(()));
1784 assert!(matches!(result, Err(AgentRuntimeError::CircuitOpen { .. })));
1785 }
1786
1787 #[test]
1788 fn test_circuit_breaker_success_resets_failure_count() {
1789 let cb = CircuitBreaker::new("svc", 5, Duration::from_secs(60)).unwrap();
1790 let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("fail".to_string()));
1791 let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("fail".to_string()));
1792 let _: Result<i32, AgentRuntimeError> = cb.call(|| Ok::<i32, AgentRuntimeError>(1));
1793 assert_eq!(cb.failure_count().unwrap(), 0);
1794 }
1795
1796 #[test]
1797 fn test_circuit_breaker_half_open_on_recovery() {
1798 let cb = CircuitBreaker::new("svc", 1, Duration::ZERO).unwrap();
1800 let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("fail".to_string()));
1801 let result: Result<i32, AgentRuntimeError> = cb.call(|| Ok::<i32, AgentRuntimeError>(99));
1803 assert_eq!(result.unwrap_or(0), 99);
1804 assert_eq!(cb.state().unwrap(), CircuitState::Closed);
1805 }
1806
1807 #[test]
1808 fn test_circuit_breaker_with_custom_backend_uses_backend_state() {
1809 let shared_backend: Arc<dyn CircuitBreakerBackend> =
1812 Arc::new(InMemoryCircuitBreakerBackend::new());
1813
1814 let cb1 = CircuitBreaker::new("svc", 2, Duration::from_secs(60))
1815 .unwrap()
1816 .with_backend(Arc::clone(&shared_backend));
1817
1818 let cb2 = CircuitBreaker::new("svc", 2, Duration::from_secs(60))
1819 .unwrap()
1820 .with_backend(Arc::clone(&shared_backend));
1821
1822 let _: Result<(), AgentRuntimeError> = cb1.call(|| Err::<(), _>("fail".to_string()));
1824
1825 assert_eq!(cb2.failure_count().unwrap(), 1);
1827
1828 let _: Result<(), AgentRuntimeError> = cb1.call(|| Err::<(), _>("fail again".to_string()));
1830
1831 assert!(matches!(cb2.state().unwrap(), CircuitState::Open { .. }));
1833 }
1834
1835 #[test]
1836 fn test_in_memory_backend_increments_and_resets() {
1837 use super::CircuitBreakerBackend as CB;
1838 let backend = InMemoryCircuitBreakerBackend::new();
1839
1840 assert_eq!(CB::get_failures(&backend, "svc"), 0);
1841
1842 let count = CB::increment_failures(&backend, "svc");
1843 assert_eq!(count, 1);
1844
1845 let count = CB::increment_failures(&backend, "svc");
1846 assert_eq!(count, 2);
1847
1848 CB::reset_failures(&backend, "svc");
1849 assert_eq!(CB::get_failures(&backend, "svc"), 0);
1850
1851 assert!(CB::get_open_at(&backend, "svc").is_none());
1853 let now = Instant::now();
1854 CB::set_open_at(&backend, "svc", now);
1855 assert!(CB::get_open_at(&backend, "svc").is_some());
1856 CB::clear_open_at(&backend, "svc");
1857 assert!(CB::get_open_at(&backend, "svc").is_none());
1858 }
1859
1860 #[test]
1863 fn test_deduplicator_new_key_is_new() {
1864 let d = Deduplicator::new(Duration::from_secs(60));
1865 let r = d.check_and_register("key-1").unwrap();
1866 assert_eq!(r, DeduplicationResult::New);
1867 }
1868
1869 #[test]
1870 fn test_deduplicator_second_check_is_in_progress() {
1871 let d = Deduplicator::new(Duration::from_secs(60));
1872 d.check_and_register("key-1").unwrap();
1873 let r = d.check_and_register("key-1").unwrap();
1874 assert_eq!(r, DeduplicationResult::InProgress);
1875 }
1876
1877 #[test]
1878 fn test_deduplicator_complete_makes_cached() {
1879 let d = Deduplicator::new(Duration::from_secs(60));
1880 d.check_and_register("key-1").unwrap();
1881 d.complete("key-1", "result-value").unwrap();
1882 let r = d.check_and_register("key-1").unwrap();
1883 assert_eq!(r, DeduplicationResult::Cached("result-value".into()));
1884 }
1885
1886 #[test]
1887 fn test_deduplicator_different_keys_are_independent() {
1888 let d = Deduplicator::new(Duration::from_secs(60));
1889 d.check_and_register("key-a").unwrap();
1890 let r = d.check_and_register("key-b").unwrap();
1891 assert_eq!(r, DeduplicationResult::New);
1892 }
1893
1894 #[test]
1895 fn test_deduplicator_expired_entry_is_new() {
1896 let d = Deduplicator::new(Duration::ZERO); d.check_and_register("key-1").unwrap();
1898 d.complete("key-1", "old").unwrap();
1899 let r = d.check_and_register("key-1").unwrap();
1901 assert_eq!(r, DeduplicationResult::New);
1902 }
1903
1904 #[test]
1907 fn test_backpressure_guard_rejects_zero_capacity() {
1908 assert!(BackpressureGuard::new(0).is_err());
1909 }
1910
1911 #[test]
1912 fn test_backpressure_guard_acquire_within_capacity() {
1913 let g = BackpressureGuard::new(5).unwrap();
1914 assert!(g.try_acquire().is_ok());
1915 assert_eq!(g.depth().unwrap(), 1);
1916 }
1917
1918 #[test]
1919 fn test_backpressure_guard_sheds_when_full() {
1920 let g = BackpressureGuard::new(2).unwrap();
1921 g.try_acquire().unwrap();
1922 g.try_acquire().unwrap();
1923 let result = g.try_acquire();
1924 assert!(matches!(
1925 result,
1926 Err(AgentRuntimeError::BackpressureShed { .. })
1927 ));
1928 }
1929
1930 #[test]
1931 fn test_backpressure_guard_release_decrements_depth() {
1932 let g = BackpressureGuard::new(3).unwrap();
1933 g.try_acquire().unwrap();
1934 g.try_acquire().unwrap();
1935 g.release().unwrap();
1936 assert_eq!(g.depth().unwrap(), 1);
1937 }
1938
1939 #[test]
1940 fn test_backpressure_guard_release_on_empty_is_noop() {
1941 let g = BackpressureGuard::new(3).unwrap();
1942 g.release().unwrap(); assert_eq!(g.depth().unwrap(), 0);
1944 }
1945
1946 #[test]
1949 fn test_pipeline_runs_stages_in_order() {
1950 let p = Pipeline::new()
1951 .add_stage("upper", |s| Ok(s.to_uppercase()))
1952 .add_stage("append", |s| Ok(format!("{s}!")));
1953 let result = p.run("hello".into()).unwrap();
1954 assert_eq!(result, "HELLO!");
1955 }
1956
1957 #[test]
1958 fn test_pipeline_empty_pipeline_returns_input() {
1959 let p = Pipeline::new();
1960 assert_eq!(p.run("test".into()).unwrap(), "test");
1961 }
1962
1963 #[test]
1964 fn test_pipeline_stage_failure_short_circuits() {
1965 let p = Pipeline::new()
1966 .add_stage("fail", |_| {
1967 Err(AgentRuntimeError::Orchestration("boom".into()))
1968 })
1969 .add_stage("never", |s| Ok(s));
1970 assert!(p.run("input".into()).is_err());
1971 }
1972
1973 #[test]
1974 fn test_pipeline_stage_count() {
1975 let p = Pipeline::new()
1976 .add_stage("s1", |s| Ok(s))
1977 .add_stage("s2", |s| Ok(s));
1978 assert_eq!(p.stage_count(), 2);
1979 }
1980
1981 #[test]
1982 fn test_pipeline_execute_timed_captures_stage_durations() {
1983 let p = Pipeline::new()
1984 .add_stage("s1", |s| Ok(format!("{s}1")))
1985 .add_stage("s2", |s| Ok(format!("{s}2")));
1986 let result = p.execute_timed("x".to_string()).unwrap();
1987 assert_eq!(result.output, "x12");
1988 assert_eq!(result.stage_timings.len(), 2);
1989 assert_eq!(result.stage_timings[0].0, "s1");
1990 assert_eq!(result.stage_timings[1].0, "s2");
1991 }
1992
1993 #[test]
1996 fn test_backpressure_soft_limit_rejects_invalid_config() {
1997 let g = BackpressureGuard::new(5).unwrap();
1999 assert!(g.with_soft_limit(5).is_err());
2000 let g = BackpressureGuard::new(5).unwrap();
2001 assert!(g.with_soft_limit(6).is_err());
2002 }
2003
2004 #[test]
2005 fn test_backpressure_soft_limit_accepts_requests_below_soft() {
2006 let g = BackpressureGuard::new(5)
2007 .unwrap()
2008 .with_soft_limit(2)
2009 .unwrap();
2010 assert!(g.try_acquire().is_ok());
2012 assert!(g.try_acquire().is_ok());
2013 assert_eq!(g.depth().unwrap(), 2);
2014 }
2015
2016 #[test]
2017 fn test_backpressure_with_soft_limit_still_sheds_at_hard_capacity() {
2018 let g = BackpressureGuard::new(3)
2019 .unwrap()
2020 .with_soft_limit(2)
2021 .unwrap();
2022 g.try_acquire().unwrap();
2023 g.try_acquire().unwrap();
2024 g.try_acquire().unwrap(); let result = g.try_acquire();
2026 assert!(matches!(
2027 result,
2028 Err(AgentRuntimeError::BackpressureShed { .. })
2029 ));
2030 }
2031
2032 #[test]
2035 fn test_backpressure_hard_capacity_matches_new() {
2036 let g = BackpressureGuard::new(7).unwrap();
2037 assert_eq!(g.hard_capacity(), 7);
2038 }
2039
2040 #[test]
2043 fn test_pipeline_error_handler_recovers_from_stage_failure() {
2044 let p = Pipeline::new()
2045 .add_stage("fail_stage", |_| {
2046 Err(AgentRuntimeError::Orchestration("oops".into()))
2047 })
2048 .add_stage("append", |s| Ok(format!("{s}-recovered")))
2049 .with_error_handler(|stage_name, _err| format!("recovered_from_{stage_name}"));
2050 let result = p.run("input".to_string()).unwrap();
2051 assert_eq!(result, "recovered_from_fail_stage-recovered");
2052 }
2053
2054 #[test]
2057 fn test_circuit_state_eq() {
2058 assert_eq!(CircuitState::Closed, CircuitState::Closed);
2059 assert_eq!(CircuitState::HalfOpen, CircuitState::HalfOpen);
2060 assert_eq!(
2061 CircuitState::Open { opened_at: std::time::Instant::now() },
2062 CircuitState::Open { opened_at: std::time::Instant::now() }
2063 );
2064 assert_ne!(CircuitState::Closed, CircuitState::HalfOpen);
2065 assert_ne!(CircuitState::Closed, CircuitState::Open { opened_at: std::time::Instant::now() });
2066 }
2067
2068 #[test]
2071 fn test_dedup_many_independent_keys() {
2072 let d = Deduplicator::new(Duration::from_secs(60));
2073 let ttl = Duration::from_secs(60);
2074 let results = d.dedup_many(&[("key-a", ttl), ("key-b", ttl), ("key-c", ttl)]).unwrap();
2075 assert_eq!(results.len(), 3);
2076 assert!(results.iter().all(|r| matches!(r, DeduplicationResult::New)));
2077 }
2078
2079 #[test]
2082 fn test_concurrent_circuit_breaker_opens_under_concurrent_failures() {
2083 use std::sync::Arc;
2084 use std::thread;
2085
2086 let cb = Arc::new(
2087 CircuitBreaker::new("svc", 5, Duration::from_secs(60)).unwrap(),
2088 );
2089 let n_threads = 8;
2090 let failures_per_thread = 2;
2091
2092 let mut handles = Vec::new();
2093 for _ in 0..n_threads {
2094 let cb = Arc::clone(&cb);
2095 handles.push(thread::spawn(move || {
2096 for _ in 0..failures_per_thread {
2097 let _ = cb.call(|| Err::<(), &str>("fail"));
2098 }
2099 }));
2100 }
2101 for h in handles {
2102 h.join().unwrap();
2103 }
2104
2105 let state = cb.state().unwrap();
2108 assert!(
2109 matches!(state, CircuitState::Open { .. }),
2110 "circuit should be open after many concurrent failures; got: {state:?}"
2111 );
2112 }
2113
2114 #[test]
2115 fn test_per_service_tracking_is_independent() {
2116 let backend = Arc::new(InMemoryCircuitBreakerBackend::new());
2117
2118 let cb_a = CircuitBreaker::new("service-a", 3, Duration::from_secs(60))
2119 .unwrap()
2120 .with_backend(Arc::clone(&backend) as Arc<dyn CircuitBreakerBackend>);
2121 let cb_b = CircuitBreaker::new("service-b", 3, Duration::from_secs(60))
2122 .unwrap()
2123 .with_backend(Arc::clone(&backend) as Arc<dyn CircuitBreakerBackend>);
2124
2125 for _ in 0..3 {
2127 let _ = cb_a.call(|| Err::<(), &str>("fail"));
2128 }
2129
2130 let state_b = cb_b.state().unwrap();
2132 assert_eq!(
2133 state_b,
2134 CircuitState::Closed,
2135 "service-b should be unaffected by service-a failures"
2136 );
2137
2138 let state_a = cb_a.state().unwrap();
2140 assert!(
2141 matches!(state_a, CircuitState::Open { .. }),
2142 "service-a should be open"
2143 );
2144 }
2145
2146 #[test]
2149 fn test_backpressure_concurrent_acquires_are_consistent() {
2150 use std::sync::Arc;
2151 use std::thread;
2152
2153 let g = Arc::new(BackpressureGuard::new(100).unwrap());
2154 let mut handles = Vec::new();
2155
2156 for _ in 0..10 {
2157 let g_clone = Arc::clone(&g);
2158 handles.push(thread::spawn(move || {
2159 g_clone.try_acquire().ok();
2160 }));
2161 }
2162
2163 for h in handles {
2164 h.join().unwrap();
2165 }
2166
2167 assert_eq!(g.depth().unwrap(), 10);
2169 }
2170
2171 #[test]
2174 fn test_retry_policy_constant_has_fixed_delay() {
2175 let p = RetryPolicy::constant(3, 100).unwrap();
2176 assert_eq!(p.delay_for(1), Duration::from_millis(100));
2177 assert_eq!(p.delay_for(2), Duration::from_millis(100));
2178 assert_eq!(p.delay_for(10), Duration::from_millis(100));
2179 }
2180
2181 #[test]
2182 fn test_retry_policy_exponential_doubles() {
2183 let p = RetryPolicy::exponential(5, 10).unwrap();
2184 assert_eq!(p.delay_for(1), Duration::from_millis(10));
2185 assert_eq!(p.delay_for(2), Duration::from_millis(20));
2186 assert_eq!(p.delay_for(3), Duration::from_millis(40));
2187 }
2188
2189 #[test]
2190 fn test_retry_policy_with_max_attempts() {
2191 let p = RetryPolicy::constant(3, 50).unwrap();
2192 let p2 = p.with_max_attempts(7).unwrap();
2193 assert_eq!(p2.max_attempts, 7);
2194 assert!(RetryPolicy::constant(1, 50).unwrap().with_max_attempts(0).is_err());
2195 }
2196
2197 #[test]
2198 fn test_circuit_breaker_reset_returns_to_closed() {
2199 let cb = CircuitBreaker::new("svc", 2, Duration::from_secs(60)).unwrap();
2200 cb.record_failure();
2201 cb.record_failure(); assert_ne!(cb.state().unwrap(), CircuitState::Closed);
2203 cb.reset();
2204 assert_eq!(cb.state().unwrap(), CircuitState::Closed);
2205 assert_eq!(cb.failure_count().unwrap(), 0);
2206 }
2207
2208 #[test]
2209 fn test_deduplicator_clear_resets_all_state() {
2210 let d = Deduplicator::new(Duration::from_secs(60));
2211 d.check_and_register("k1").unwrap();
2212 d.check_and_register("k2").unwrap();
2213 d.complete("k1", "r1").unwrap();
2214 assert_eq!(d.in_flight_count().unwrap(), 1);
2215 assert_eq!(d.cached_count().unwrap(), 1);
2216 d.clear().unwrap();
2217 assert_eq!(d.in_flight_count().unwrap(), 0);
2218 assert_eq!(d.cached_count().unwrap(), 0);
2219 }
2220
2221 #[test]
2222 fn test_deduplicator_purge_expired_removes_stale() {
2223 let d = Deduplicator::new(Duration::from_millis(1));
2224 d.check_and_register("x").unwrap();
2225 d.complete("x", "result").unwrap();
2226 std::thread::sleep(Duration::from_millis(5));
2227 let removed = d.purge_expired().unwrap();
2228 assert_eq!(removed, 1);
2229 assert_eq!(d.cached_count().unwrap(), 0);
2230 }
2231
2232 #[test]
2233 fn test_backpressure_utilization_ratio() {
2234 let g = BackpressureGuard::new(4).unwrap();
2235 g.try_acquire().unwrap();
2236 g.try_acquire().unwrap();
2237 let ratio = g.utilization_ratio().unwrap();
2238 assert!((ratio - 0.5).abs() < 1e-5);
2239 }
2240
2241 #[test]
2242 fn test_pipeline_stage_count_and_names() {
2243 let p = Pipeline::new()
2244 .add_stage("first", |s| Ok(s + "1"))
2245 .add_stage("second", |s| Ok(s + "2"));
2246 assert_eq!(p.stage_count(), 2);
2247 assert_eq!(p.stage_names(), vec!["first", "second"]);
2248 }
2249
2250 #[test]
2251 fn test_pipeline_is_empty_true_for_new() {
2252 let p = Pipeline::new();
2253 assert!(p.is_empty());
2254 }
2255
2256 #[test]
2257 fn test_pipeline_is_empty_false_after_add_stage() {
2258 let p = Pipeline::new().add_stage("s", |s: String| Ok(s));
2259 assert!(!p.is_empty());
2260 }
2261
2262 #[test]
2263 fn test_circuit_breaker_service_name() {
2264 let cb = CircuitBreaker::new("my-service", 3, Duration::from_secs(1)).unwrap();
2265 assert_eq!(cb.service_name(), "my-service");
2266 }
2267
2268 #[test]
2269 fn test_retry_policy_none_has_max_one_attempt() {
2270 let p = RetryPolicy::none();
2271 assert_eq!(p.max_attempts, 1);
2272 assert_eq!(p.delay_for(0), Duration::ZERO);
2273 }
2274
2275 #[test]
2276 fn test_backpressure_is_full_false_when_empty() {
2277 let g = BackpressureGuard::new(5).unwrap();
2278 assert!(!g.is_full().unwrap());
2279 }
2280
2281 #[test]
2282 fn test_backpressure_is_full_true_when_at_capacity() {
2283 let g = BackpressureGuard::new(2).unwrap();
2284 g.try_acquire().unwrap();
2285 g.try_acquire().unwrap();
2286 assert!(g.is_full().unwrap());
2287 }
2288
2289 #[test]
2290 fn test_deduplicator_ttl_returns_configured_value() {
2291 let d = Deduplicator::new(Duration::from_secs(42));
2292 assert_eq!(d.ttl(), Duration::from_secs(42));
2293 }
2294
2295 #[test]
2296 fn test_circuit_breaker_is_closed_initially() {
2297 let cb = CircuitBreaker::new("svc", 3, Duration::from_secs(1)).unwrap();
2298 assert!(cb.is_closed());
2299 assert!(!cb.is_open());
2300 assert!(!cb.is_half_open());
2301 }
2302
2303 #[test]
2304 fn test_circuit_breaker_is_open_after_threshold_failures() {
2305 let cb = CircuitBreaker::new("svc", 2, Duration::from_secs(60)).unwrap();
2306 cb.record_failure();
2307 cb.record_failure();
2308 assert!(cb.is_open());
2309 assert!(!cb.is_closed());
2310 }
2311
2312 #[test]
2313 fn test_retry_policy_total_max_delay_constant() {
2314 let p = RetryPolicy::constant(3, 100).unwrap();
2316 assert_eq!(p.total_max_delay_ms(), 300);
2317 }
2318
2319 #[test]
2320 fn test_retry_policy_total_max_delay_none_is_zero() {
2321 let p = RetryPolicy::none();
2322 assert_eq!(p.total_max_delay_ms(), 0);
2323 }
2324
2325 #[test]
2326 fn test_retry_policy_is_none_true_for_none() {
2327 let p = RetryPolicy::none();
2328 assert!(p.is_none());
2329 }
2330
2331 #[test]
2332 fn test_retry_policy_is_none_false_for_exponential() {
2333 let p = RetryPolicy::exponential(3, 10).unwrap();
2334 assert!(!p.is_none());
2335 }
2336
2337 #[test]
2338 fn test_pipeline_has_error_handler_false_by_default() {
2339 let p = Pipeline::new().add_stage("s", |s: String| Ok(s));
2340 assert!(!p.has_error_handler());
2341 }
2342
2343 #[test]
2344 fn test_pipeline_has_error_handler_true_after_set() {
2345 let p = Pipeline::new()
2346 .with_error_handler(|_stage, _err| "recovered".to_string());
2347 assert!(p.has_error_handler());
2348 }
2349
2350 #[test]
2351 fn test_backpressure_reset_clears_depth() {
2352 let g = BackpressureGuard::new(5).unwrap();
2353 g.try_acquire().unwrap();
2354 g.try_acquire().unwrap();
2355 assert_eq!(g.depth().unwrap(), 2);
2356 g.reset();
2357 assert_eq!(g.depth().unwrap(), 0);
2358 }
2359
2360 #[test]
2361 fn test_deduplicator_in_flight_keys_returns_started_keys() {
2362 let d = Deduplicator::new(Duration::from_secs(60));
2363 d.check("key-a", Duration::from_secs(60)).unwrap();
2364 d.check("key-b", Duration::from_secs(60)).unwrap();
2365 let mut keys = d.in_flight_keys().unwrap();
2366 keys.sort();
2367 assert_eq!(keys, vec!["key-a", "key-b"]);
2368 }
2369
2370 #[test]
2373 fn test_retry_policy_with_base_delay_ms_changes_delay() {
2374 let p = RetryPolicy::exponential(3, 100)
2375 .unwrap()
2376 .with_base_delay_ms(200)
2377 .unwrap();
2378 assert_eq!(p.delay_for(1), Duration::from_millis(200));
2379 }
2380
2381 #[test]
2382 fn test_retry_policy_with_base_delay_ms_rejects_zero() {
2383 let p = RetryPolicy::exponential(3, 100).unwrap();
2384 assert!(p.with_base_delay_ms(0).is_err());
2385 }
2386
2387 #[test]
2388 fn test_backpressure_reset_depth_clears_counter() {
2389 let guard = BackpressureGuard::new(5).unwrap();
2390 guard.try_acquire().unwrap();
2391 guard.try_acquire().unwrap();
2392 assert_eq!(guard.depth().unwrap(), 2);
2393 guard.reset_depth().unwrap();
2394 assert_eq!(guard.depth().unwrap(), 0);
2395 }
2396
2397 #[test]
2398 fn test_pipeline_remove_stage_returns_true_if_found() {
2399 let mut p = Pipeline::new()
2400 .add_stage("a", |s| Ok(s))
2401 .add_stage("b", |s| Ok(s));
2402 assert!(p.remove_stage("a"));
2403 assert_eq!(p.stage_count(), 1);
2404 assert_eq!(p.stage_names(), vec!["b"]);
2405 }
2406
2407 #[test]
2408 fn test_pipeline_remove_stage_returns_false_if_missing() {
2409 let mut p = Pipeline::new().add_stage("x", |s| Ok(s));
2410 assert!(!p.remove_stage("nope"));
2411 assert_eq!(p.stage_count(), 1);
2412 }
2413
2414 #[test]
2415 fn test_pipeline_clear_removes_all_stages() {
2416 let mut p = Pipeline::new()
2417 .add_stage("a", |s| Ok(s))
2418 .add_stage("b", |s| Ok(s));
2419 p.clear();
2420 assert!(p.is_empty());
2421 }
2422
2423 #[test]
2426 fn test_circuit_breaker_threshold_accessor() {
2427 let cb = CircuitBreaker::new("svc", 5, Duration::from_secs(30)).unwrap();
2428 assert_eq!(cb.threshold(), 5);
2429 }
2430
2431 #[test]
2432 fn test_circuit_breaker_recovery_window_accessor() {
2433 let window = Duration::from_secs(45);
2434 let cb = CircuitBreaker::new("svc", 3, window).unwrap();
2435 assert_eq!(cb.recovery_window(), window);
2436 }
2437
2438 #[test]
2439 fn test_pipeline_get_stage_name_at_returns_correct_names() {
2440 let p = Pipeline::new()
2441 .add_stage("first", |s| Ok(s))
2442 .add_stage("second", |s| Ok(s));
2443 assert_eq!(p.get_stage_name_at(0), Some("first"));
2444 assert_eq!(p.get_stage_name_at(1), Some("second"));
2445 assert_eq!(p.get_stage_name_at(2), None);
2446 }
2447
2448 #[test]
2451 fn test_retry_policy_can_retry_within_budget() {
2452 let p = RetryPolicy::exponential(3, 100).unwrap();
2453 assert!(p.can_retry(0));
2454 assert!(p.can_retry(1));
2455 assert!(p.can_retry(2));
2456 }
2457
2458 #[test]
2459 fn test_retry_policy_can_retry_false_when_exhausted() {
2460 let p = RetryPolicy::exponential(3, 100).unwrap();
2461 assert!(!p.can_retry(3));
2462 assert!(!p.can_retry(99));
2463 }
2464
2465 #[test]
2466 fn test_retry_policy_none_only_allows_first_attempt() {
2467 let p = RetryPolicy::none();
2468 assert!(p.can_retry(0));
2469 assert!(!p.can_retry(1));
2470 }
2471
2472 #[test]
2475 fn test_retry_policy_max_attempts_accessor() {
2476 let p = RetryPolicy::exponential(7, 100).unwrap();
2477 assert_eq!(p.max_attempts(), 7);
2478 }
2479
2480 #[test]
2481 fn test_pipeline_stage_names_owned_returns_strings() {
2482 let p = Pipeline::new()
2483 .add_stage("alpha", |s| Ok(s))
2484 .add_stage("beta", |s| Ok(s));
2485 let owned = p.stage_names_owned();
2486 assert_eq!(owned, vec!["alpha".to_string(), "beta".to_string()]);
2487 }
2488
2489 #[test]
2490 fn test_pipeline_stage_names_owned_empty_when_no_stages() {
2491 let p = Pipeline::new();
2492 assert!(p.stage_names_owned().is_empty());
2493 }
2494
2495 #[test]
2498 fn test_attempts_remaining_full_at_zero() {
2499 let p = RetryPolicy::exponential(4, 100).unwrap();
2500 assert_eq!(p.attempts_remaining(0), 4);
2501 }
2502
2503 #[test]
2504 fn test_attempts_remaining_decrements_correctly() {
2505 let p = RetryPolicy::exponential(4, 100).unwrap();
2506 assert_eq!(p.attempts_remaining(2), 2);
2507 assert_eq!(p.attempts_remaining(4), 0);
2508 }
2509
2510 #[test]
2511 fn test_attempts_remaining_zero_when_exhausted() {
2512 let p = RetryPolicy::exponential(3, 100).unwrap();
2513 assert_eq!(p.attempts_remaining(10), 0);
2514 }
2515
2516 #[test]
2519 fn test_retry_policy_max_attempts_getter() {
2520 let p = RetryPolicy::exponential(7, 50).unwrap();
2521 assert_eq!(p.max_attempts(), 7);
2522 }
2523
2524 #[test]
2525 fn test_circuit_breaker_failure_count_increments() {
2526 let cb = CircuitBreaker::new("svc2", 3, std::time::Duration::from_secs(60)).unwrap();
2527 cb.record_failure();
2528 cb.record_failure();
2529 assert_eq!(cb.failure_count().unwrap(), 2);
2530 }
2531
2532 #[test]
2533 fn test_circuit_breaker_record_success_resets_failures() {
2534 let cb = CircuitBreaker::new("svc3", 5, std::time::Duration::from_secs(60)).unwrap();
2535 cb.record_failure();
2536 cb.record_failure();
2537 cb.record_success();
2538 assert_eq!(cb.failure_count().unwrap(), 0);
2539 assert!(cb.is_closed());
2540 }
2541
2542 #[test]
2543 fn test_circuit_breaker_threshold_and_recovery_window() {
2544 let cb = CircuitBreaker::new("svc4", 3, std::time::Duration::from_secs(30)).unwrap();
2545 assert_eq!(cb.threshold(), 3);
2546 assert_eq!(cb.recovery_window(), std::time::Duration::from_secs(30));
2547 }
2548
2549 #[test]
2550 fn test_circuit_breaker_reset_clears_state() {
2551 let cb = CircuitBreaker::new("svc5", 2, std::time::Duration::from_secs(60)).unwrap();
2552 cb.record_failure();
2553 cb.record_failure(); assert!(cb.is_open());
2555 cb.reset();
2556 assert!(cb.is_closed());
2557 assert_eq!(cb.failure_count().unwrap(), 0);
2558 }
2559
2560 #[test]
2561 fn test_deduplicator_cached_count_after_complete() {
2562 let d = Deduplicator::new(Duration::from_secs(60));
2563 d.check("key1", Duration::from_secs(60)).unwrap();
2564 d.complete("key1", "result").unwrap();
2565 assert_eq!(d.cached_count().unwrap(), 1);
2566 }
2567
2568 #[test]
2569 fn test_deduplicator_ttl_matches_configured() {
2570 let d = Deduplicator::new(Duration::from_secs(42));
2571 assert_eq!(d.ttl(), Duration::from_secs(42));
2572 }
2573
2574 #[test]
2575 fn test_deduplicator_purge_expired_removes_stale_entries() {
2576 let d = Deduplicator::new(Duration::ZERO); d.check("stale", Duration::ZERO).unwrap();
2578 d.complete("stale", "val").unwrap();
2579 std::thread::sleep(std::time::Duration::from_millis(1));
2581 let removed = d.purge_expired().unwrap();
2582 assert!(removed >= 1);
2583 }
2584
2585 #[test]
2586 fn test_backpressure_remaining_capacity() {
2587 let g = BackpressureGuard::new(5).unwrap();
2588 g.try_acquire().unwrap();
2589 assert_eq!(g.remaining_capacity().unwrap(), 4);
2590 }
2591
2592 #[test]
2593 fn test_backpressure_soft_depth_ratio_without_soft_limit() {
2594 let g = BackpressureGuard::new(5).unwrap();
2595 assert_eq!(g.soft_depth_ratio(), 0.0);
2596 }
2597
2598 #[test]
2599 fn test_backpressure_soft_depth_ratio_with_soft_limit() {
2600 let g = BackpressureGuard::new(10).unwrap()
2601 .with_soft_limit(4).unwrap();
2602 g.try_acquire().unwrap();
2603 g.try_acquire().unwrap();
2604 let ratio = g.soft_depth_ratio();
2605 assert!((ratio - 0.5).abs() < 1e-6);
2606 }
2607
2608 #[test]
2611 fn test_retry_delay_ms_for_matches_delay_for() {
2612 let p = RetryPolicy::exponential(5, 100).unwrap();
2613 assert_eq!(p.delay_ms_for(1), p.delay_for(1).as_millis() as u64);
2614 assert_eq!(p.delay_ms_for(3), p.delay_for(3).as_millis() as u64);
2615 }
2616
2617 #[test]
2618 fn test_backpressure_soft_limit_returns_configured_value() {
2619 let g = BackpressureGuard::new(10).unwrap()
2620 .with_soft_limit(5).unwrap();
2621 assert_eq!(g.soft_limit(), Some(5));
2622 }
2623
2624 #[test]
2625 fn test_backpressure_soft_limit_none_when_not_set() {
2626 let g = BackpressureGuard::new(10).unwrap();
2627 assert_eq!(g.soft_limit(), None);
2628 }
2629
2630 #[test]
2631 fn test_pipeline_has_stage_returns_true_when_present() {
2632 let p = Pipeline::new().add_stage("step1", |s| Ok(s));
2633 assert!(p.has_stage("step1"));
2634 assert!(!p.has_stage("step2"));
2635 }
2636
2637 #[test]
2638 fn test_pipeline_has_stage_false_for_empty_pipeline() {
2639 let p = Pipeline::new();
2640 assert!(!p.has_stage("anything"));
2641 }
2642
2643 #[test]
2646 fn test_deduplicator_max_entries_none_by_default() {
2647 let d = Deduplicator::new(Duration::from_secs(60));
2648 assert_eq!(d.max_entries(), None);
2649 }
2650
2651 #[test]
2652 fn test_deduplicator_max_entries_set_via_builder() {
2653 let d = Deduplicator::new(Duration::from_secs(60))
2654 .with_max_entries(50)
2655 .unwrap();
2656 assert_eq!(d.max_entries(), Some(50));
2657 }
2658
2659 #[test]
2660 fn test_retry_policy_delay_for_exponential_grows() {
2661 let p = RetryPolicy::exponential(5, 100).unwrap();
2662 let d1 = p.delay_for(1);
2664 let d2 = p.delay_for(2);
2665 assert!(d2 > d1, "exponential delay should grow: attempt 2 > attempt 1");
2666 }
2667
2668 #[test]
2669 fn test_retry_policy_delay_for_constant_stays_same() {
2670 let p = RetryPolicy::constant(5, 200).unwrap();
2671 assert_eq!(p.delay_for(0), p.delay_for(1));
2672 assert_eq!(p.delay_for(1), p.delay_for(3));
2673 }
2674
2675 #[test]
2678 fn test_is_no_retry_true_for_none_policy() {
2679 let p = RetryPolicy::none();
2680 assert!(p.is_no_retry());
2681 }
2682
2683 #[test]
2684 fn test_is_no_retry_false_for_exponential_policy() {
2685 let p = RetryPolicy::exponential(3, 50).unwrap();
2686 assert!(!p.is_no_retry());
2687 }
2688
2689 #[test]
2690 fn test_is_no_retry_false_for_constant_policy_with_multiple_attempts() {
2691 let p = RetryPolicy::constant(2, 100).unwrap();
2692 assert!(!p.is_no_retry());
2693 }
2694
2695 #[test]
2698 fn test_is_exponential_true_for_exponential_policy() {
2699 let p = RetryPolicy::exponential(3, 50).unwrap();
2700 assert!(p.is_exponential());
2701 }
2702
2703 #[test]
2704 fn test_is_exponential_false_for_constant_policy() {
2705 let p = RetryPolicy::constant(3, 50).unwrap();
2706 assert!(!p.is_exponential());
2707 }
2708
2709 #[test]
2710 fn test_is_exponential_false_for_none_policy() {
2711 let p = RetryPolicy::none();
2712 assert!(!p.is_exponential());
2713 }
2714
2715 #[test]
2718 fn test_is_soft_limited_false_without_soft_limit() {
2719 let g = BackpressureGuard::new(10).unwrap();
2720 assert!(!g.is_soft_limited());
2721 }
2722
2723 #[test]
2724 fn test_is_soft_limited_true_when_soft_limit_set() {
2725 let g = BackpressureGuard::new(10)
2726 .unwrap()
2727 .with_soft_limit(5)
2728 .unwrap();
2729 assert!(g.is_soft_limited());
2730 }
2731
2732 #[test]
2735 fn test_retry_policy_base_delay_ms_exponential() {
2736 let p = RetryPolicy::exponential(3, 250).unwrap();
2737 assert_eq!(p.base_delay_ms(), 250);
2738 }
2739
2740 #[test]
2741 fn test_retry_policy_base_delay_ms_constant() {
2742 let p = RetryPolicy::constant(5, 100).unwrap();
2743 assert_eq!(p.base_delay_ms(), 100);
2744 }
2745
2746 #[test]
2747 fn test_retry_policy_base_delay_ms_none_is_zero() {
2748 let p = RetryPolicy::none();
2749 assert_eq!(p.base_delay_ms(), 0);
2750 }
2751
2752 #[test]
2753 fn test_backpressure_percent_full_zero_when_empty() {
2754 let g = BackpressureGuard::new(100).unwrap();
2755 let pct = g.percent_full().unwrap();
2756 assert!((pct - 0.0).abs() < 1e-9);
2757 }
2758
2759 #[test]
2760 fn test_backpressure_percent_full_capped_at_100() {
2761 let g = BackpressureGuard::new(10).unwrap();
2762 for _ in 0..10 {
2764 g.try_acquire().unwrap();
2765 }
2766 let pct = g.percent_full().unwrap();
2767 assert!((pct - 100.0).abs() < 1e-9);
2768 }
2769
2770 #[test]
2773 fn test_deduplicator_get_result_returns_cached_value() {
2774 let d = Deduplicator::new(std::time::Duration::from_secs(60));
2775 d.check_and_register("req-1").unwrap();
2776 d.complete("req-1", "the answer").unwrap();
2777 let result = d.get_result("req-1").unwrap();
2778 assert_eq!(result, Some("the answer".to_string()));
2779 }
2780
2781 #[test]
2782 fn test_deduplicator_get_result_missing_key_returns_none() {
2783 let d = Deduplicator::new(std::time::Duration::from_secs(60));
2784 assert_eq!(d.get_result("ghost").unwrap(), None);
2785 }
2786
2787 #[test]
2788 fn test_pipeline_rename_stage_succeeds() {
2789 let mut p = Pipeline::new().add_stage("old-name", |s: String| Ok(s));
2790 let renamed = p.rename_stage("old-name", "new-name");
2791 assert!(renamed);
2792 assert!(p.has_stage("new-name"));
2793 assert!(!p.has_stage("old-name"));
2794 }
2795
2796 #[test]
2797 fn test_pipeline_rename_stage_missing_returns_false() {
2798 let mut p = Pipeline::new();
2799 assert!(!p.rename_stage("nonexistent", "anything"));
2800 }
2801
2802 #[test]
2803 fn test_circuit_breaker_failure_rate_zero_initially() {
2804 let cb = CircuitBreaker::new("svc", 5, std::time::Duration::from_secs(10)).unwrap();
2805 assert!((cb.failure_rate() - 0.0).abs() < 1e-9);
2806 }
2807
2808 #[test]
2809 fn test_circuit_breaker_failure_rate_increases_with_failures() {
2810 let cb = CircuitBreaker::new("svc-fr", 4, std::time::Duration::from_secs(10)).unwrap();
2811 cb.record_failure();
2812 cb.record_failure();
2813 assert!((cb.failure_rate() - 0.5).abs() < 1e-9);
2815 }
2816
2817 #[test]
2820 fn test_prepend_stage_inserts_at_front() {
2821 let p = Pipeline::new()
2822 .add_stage("second", |s| Ok(s))
2823 .prepend_stage("first", |s| Ok(s));
2824 let names = p.stage_names_owned();
2825 assert_eq!(names[0], "first");
2826 assert_eq!(names[1], "second");
2827 }
2828
2829 #[test]
2830 fn test_prepend_stage_executes_before_existing_stages() {
2831 let p = Pipeline::new()
2832 .add_stage("append", |s| Ok(format!("{s}_appended")))
2833 .prepend_stage("prefix", |s| Ok(format!("pre_{s}")));
2834 let result = p.run("input".to_string()).unwrap();
2835 assert_eq!(result, "pre_input_appended");
2836 }
2837
2838 #[test]
2839 fn test_prepend_stage_on_empty_pipeline() {
2840 let p = Pipeline::new().prepend_stage("only", |s| Ok(s.to_uppercase()));
2841 let result = p.run("hello".to_string()).unwrap();
2842 assert_eq!(result, "HELLO");
2843 }
2844
2845 #[test]
2848 fn test_circuit_breaker_is_at_threshold_false_initially() {
2849 let cb = CircuitBreaker::new("svc", 3, std::time::Duration::from_secs(10)).unwrap();
2850 assert!(!cb.is_at_threshold());
2851 }
2852
2853 #[test]
2854 fn test_circuit_breaker_is_at_threshold_true_when_failures_reach_threshold() {
2855 let cb = CircuitBreaker::new("svc-t", 2, std::time::Duration::from_secs(10)).unwrap();
2856 cb.record_failure();
2857 assert!(!cb.is_at_threshold());
2858 cb.record_failure();
2859 assert!(cb.is_at_threshold());
2860 }
2861
2862 #[test]
2863 fn test_backpressure_headroom_ratio_one_when_empty() {
2864 let g = BackpressureGuard::new(10).unwrap();
2865 let ratio = g.headroom_ratio().unwrap();
2866 assert!((ratio - 1.0).abs() < 1e-9);
2867 }
2868
2869 #[test]
2870 fn test_backpressure_headroom_ratio_decreases_on_acquire() {
2871 let g = BackpressureGuard::new(4).unwrap();
2872 g.try_acquire().unwrap(); let ratio = g.headroom_ratio().unwrap();
2874 assert!((ratio - 0.75).abs() < 1e-9);
2875 }
2876
2877 #[test]
2880 fn test_pipeline_first_stage_name_returns_first() {
2881 let p = Pipeline::new()
2882 .add_stage("alpha", |s| Ok(s))
2883 .add_stage("beta", |s| Ok(s));
2884 assert_eq!(p.first_stage_name(), Some("alpha"));
2885 }
2886
2887 #[test]
2888 fn test_pipeline_first_stage_name_none_when_empty() {
2889 let p = Pipeline::new();
2890 assert!(p.first_stage_name().is_none());
2891 }
2892
2893 #[test]
2894 fn test_pipeline_last_stage_name_returns_last() {
2895 let p = Pipeline::new()
2896 .add_stage("alpha", |s| Ok(s))
2897 .add_stage("omega", |s| Ok(s));
2898 assert_eq!(p.last_stage_name(), Some("omega"));
2899 }
2900
2901 #[test]
2902 fn test_pipeline_stage_index_returns_correct_position() {
2903 let p = Pipeline::new()
2904 .add_stage("first", |s| Ok(s))
2905 .add_stage("second", |s| Ok(s))
2906 .add_stage("third", |s| Ok(s));
2907 assert_eq!(p.stage_index("first"), Some(0));
2908 assert_eq!(p.stage_index("second"), Some(1));
2909 assert_eq!(p.stage_index("third"), Some(2));
2910 assert_eq!(p.stage_index("missing"), None);
2911 }
2912
2913 #[test]
2914 fn test_backpressure_is_empty_true_when_no_slots_acquired() {
2915 let g = BackpressureGuard::new(10).unwrap();
2916 assert!(g.is_empty().unwrap());
2917 }
2918
2919 #[test]
2920 fn test_backpressure_is_empty_false_after_acquire() {
2921 let g = BackpressureGuard::new(10).unwrap();
2922 g.try_acquire().unwrap();
2923 assert!(!g.is_empty().unwrap());
2924 }
2925
2926 #[test]
2927 fn test_backpressure_available_capacity_decrements_on_acquire() {
2928 let g = BackpressureGuard::new(5).unwrap();
2929 assert_eq!(g.available_capacity().unwrap(), 5);
2930 g.try_acquire().unwrap();
2931 assert_eq!(g.available_capacity().unwrap(), 4);
2932 }
2933
2934 #[test]
2937 fn test_evict_oldest_removes_first_cached_entry() {
2938 let d = Deduplicator::new(std::time::Duration::from_secs(60));
2939 d.check_and_register("alpha").unwrap();
2941 d.check_and_register("beta").unwrap();
2942 d.complete("alpha", "result_a").unwrap();
2943 d.complete("beta", "result_b").unwrap();
2944 let removed = d.evict_oldest().unwrap();
2946 assert!(removed);
2947 assert!(d.get_result("alpha").unwrap().is_none());
2948 assert!(d.get_result("beta").unwrap().is_some());
2949 }
2950
2951 #[test]
2952 fn test_evict_oldest_returns_false_when_empty() {
2953 let d = Deduplicator::new(std::time::Duration::from_secs(60));
2954 assert!(!d.evict_oldest().unwrap());
2955 }
2956
2957 #[test]
2960 fn test_circuit_breaker_is_at_threshold_true_after_three_failures() {
2961 let cb = CircuitBreaker::new("svc-3", 3, std::time::Duration::from_secs(60)).unwrap();
2962 cb.record_failure();
2963 cb.record_failure();
2964 cb.record_failure();
2965 assert!(cb.is_at_threshold());
2966 }
2967
2968 #[test]
2971 fn test_failures_until_open_equals_threshold_initially() {
2972 let cb = CircuitBreaker::new("svc-fuo", 5, std::time::Duration::from_secs(60)).unwrap();
2973 assert_eq!(cb.failures_until_open(), 5);
2974 }
2975
2976 #[test]
2977 fn test_failures_until_open_decrements_with_each_failure() {
2978 let cb = CircuitBreaker::new("svc-fuo2", 4, std::time::Duration::from_secs(60)).unwrap();
2979 cb.record_failure();
2980 assert_eq!(cb.failures_until_open(), 3);
2981 cb.record_failure();
2982 assert_eq!(cb.failures_until_open(), 2);
2983 }
2984
2985 #[test]
2986 fn test_failures_until_open_zero_when_at_threshold() {
2987 let cb = CircuitBreaker::new("svc-fuo3", 2, std::time::Duration::from_secs(60)).unwrap();
2988 cb.record_failure();
2989 cb.record_failure();
2990 assert_eq!(cb.failures_until_open(), 0);
2991 }
2992
2993 #[test]
2996 fn test_deduplicator_cached_keys_empty_initially() {
2997 let d = Deduplicator::new(Duration::from_secs(60));
2998 assert!(d.cached_keys().unwrap().is_empty());
2999 }
3000
3001 #[test]
3002 fn test_deduplicator_cached_keys_contains_completed_key() {
3003 let d = Deduplicator::new(Duration::from_secs(60));
3004 d.check_and_register("ck-key").unwrap();
3005 d.complete("ck-key", "result").unwrap();
3006 let keys = d.cached_keys().unwrap();
3007 assert!(keys.contains(&"ck-key".to_string()));
3008 }
3009
3010 #[test]
3011 fn test_deduplicator_cached_keys_excludes_in_flight() {
3012 let d = Deduplicator::new(Duration::from_secs(60));
3013 d.check_and_register("pending-key").unwrap();
3014 assert!(!d.cached_keys().unwrap().contains(&"pending-key".to_string()));
3016 }
3017
3018 #[test]
3019 fn test_deduplicator_cached_keys_multiple_entries() {
3020 let d = Deduplicator::new(Duration::from_secs(60));
3021 for k in ["alpha", "beta", "gamma"] {
3022 d.check_and_register(k).unwrap();
3023 d.complete(k, "v").unwrap();
3024 }
3025 let keys = d.cached_keys().unwrap();
3026 assert_eq!(keys.len(), 3);
3027 }
3028
3029 #[test]
3032 fn test_retry_policy_is_constant_true_for_constant() {
3033 let p = RetryPolicy::constant(3, 100).unwrap();
3034 assert!(p.is_constant());
3035 assert!(!p.is_exponential());
3036 }
3037
3038 #[test]
3039 fn test_retry_policy_is_constant_false_for_exponential() {
3040 let p = RetryPolicy::exponential(3, 100).unwrap();
3041 assert!(!p.is_constant());
3042 }
3043
3044 #[test]
3045 fn test_retry_policy_total_max_delay_ms_constant() {
3046 let p = RetryPolicy::constant(3, 100).unwrap();
3048 assert_eq!(p.total_max_delay_ms(), 300);
3049 }
3050
3051 #[test]
3052 fn test_retry_policy_total_max_delay_ms_exponential() {
3053 let p = RetryPolicy::exponential(3, 100).unwrap();
3055 let total = p.total_max_delay_ms();
3056 assert!(total >= 300); }
3058
3059 #[test]
3062 fn test_circuit_breaker_is_healthy_true_when_closed() {
3063 let cb = CircuitBreaker::new("svc-ih1", 3, Duration::from_secs(60)).unwrap();
3064 assert!(cb.is_healthy());
3065 }
3066
3067 #[test]
3068 fn test_circuit_breaker_is_healthy_false_when_open() {
3069 let cb = CircuitBreaker::new("svc-ih2", 1, Duration::from_secs(60)).unwrap();
3070 let _: Result<(), _> = cb.call(|| Err::<(), _>("fail".to_string()));
3071 assert!(!cb.is_healthy());
3072 }
3073
3074 #[test]
3075 fn test_circuit_breaker_is_half_open_after_zero_recovery() {
3076 let cb = CircuitBreaker::new("svc-ho1", 1, Duration::ZERO).unwrap();
3077 let _: Result<(), _> = cb.call(|| Err::<(), _>("fail".to_string()));
3078 assert!(cb.is_half_open() || cb.is_healthy()); }
3081
3082 #[test]
3085 fn test_deduplicator_is_idle_true_when_empty() {
3086 let d = Deduplicator::new(Duration::from_secs(60));
3087 assert!(d.is_idle().unwrap());
3088 }
3089
3090 #[test]
3091 fn test_deduplicator_is_idle_false_when_in_flight() {
3092 let d = Deduplicator::new(Duration::from_secs(60));
3093 d.check_and_register("req-x").unwrap();
3094 assert!(!d.is_idle().unwrap());
3095 }
3096
3097 #[test]
3098 fn test_deduplicator_is_idle_true_after_complete() {
3099 let d = Deduplicator::new(Duration::from_secs(60));
3100 d.check_and_register("req-y").unwrap();
3101 d.complete("req-y", "done").unwrap();
3102 assert!(d.is_idle().unwrap());
3103 }
3104
3105 #[test]
3108 fn test_deduplicator_in_flight_count_zero_initially() {
3109 let d = Deduplicator::new(Duration::from_secs(60));
3110 assert_eq!(d.in_flight_count().unwrap(), 0);
3111 }
3112
3113 #[test]
3114 fn test_deduplicator_in_flight_count_increments_on_register() {
3115 let d = Deduplicator::new(Duration::from_secs(60));
3116 d.check_and_register("k1").unwrap();
3117 d.check_and_register("k2").unwrap();
3118 assert_eq!(d.in_flight_count().unwrap(), 2);
3119 }
3120
3121 #[test]
3122 fn test_deduplicator_in_flight_count_decrements_after_complete() {
3123 let d = Deduplicator::new(Duration::from_secs(60));
3124 d.check_and_register("k1").unwrap();
3125 d.complete("k1", "result").unwrap();
3126 assert_eq!(d.in_flight_count().unwrap(), 0);
3127 }
3128
3129 #[test]
3132 fn test_deduplicator_total_count_sums_in_flight_and_cached() {
3133 let d = Deduplicator::new(Duration::from_secs(60));
3134 d.check_and_register("k1").unwrap(); d.check_and_register("k2").unwrap(); d.complete("k1", "done").unwrap(); assert_eq!(d.total_count().unwrap(), 2);
3139 }
3140
3141 #[test]
3142 fn test_deduplicator_total_count_zero_when_empty() {
3143 let d = Deduplicator::new(Duration::from_secs(60));
3144 assert_eq!(d.total_count().unwrap(), 0);
3145 }
3146
3147 #[test]
3148 fn test_backpressure_acquired_count_zero_initially() {
3149 let g = BackpressureGuard::new(5).unwrap();
3150 assert_eq!(g.acquired_count().unwrap(), 0);
3151 }
3152
3153 #[test]
3154 fn test_backpressure_acquired_count_increments_on_acquire() {
3155 let g = BackpressureGuard::new(5).unwrap();
3156 g.try_acquire().unwrap();
3157 g.try_acquire().unwrap();
3158 assert_eq!(g.acquired_count().unwrap(), 2);
3159 }
3160
3161 #[test]
3162 fn test_pipeline_swap_stages_swaps_positions() {
3163 let mut p = Pipeline::new()
3164 .add_stage("a", |s| Ok(s + "A"))
3165 .add_stage("b", |s| Ok(s + "B"));
3166 let swapped = p.swap_stages("a", "b");
3167 assert!(swapped);
3168 assert_eq!(p.first_stage_name().unwrap(), "b");
3169 assert_eq!(p.last_stage_name().unwrap(), "a");
3170 }
3171
3172 #[test]
3173 fn test_pipeline_swap_stages_returns_false_for_unknown_stage() {
3174 let mut p = Pipeline::new().add_stage("a", |s| Ok(s));
3175 assert!(!p.swap_stages("a", "missing"));
3176 }
3177
3178 #[test]
3179 fn test_retry_policy_will_retry_at_all_false_for_none() {
3180 let p = RetryPolicy::none();
3181 assert!(!p.will_retry_at_all());
3182 }
3183
3184 #[test]
3185 fn test_retry_policy_will_retry_at_all_true_for_exponential() {
3186 let p = RetryPolicy::exponential(3, 100).unwrap();
3187 assert!(p.will_retry_at_all());
3188 }
3189
3190 #[test]
3193 fn test_deduplicator_fail_removes_in_flight_key() {
3194 let d = Deduplicator::new(Duration::from_secs(60));
3195 d.check_and_register("failing-req").unwrap();
3196 assert!(!d.is_idle().unwrap());
3197 d.fail("failing-req").unwrap();
3198 assert!(d.is_idle().unwrap());
3199 }
3200
3201 #[test]
3202 fn test_deduplicator_fail_on_unknown_key_is_noop() {
3203 let d = Deduplicator::new(Duration::from_secs(60));
3204 assert!(d.fail("nonexistent").is_ok());
3205 }
3206
3207 #[test]
3208 fn test_deduplicator_fail_allows_reregistration() {
3209 let d = Deduplicator::new(Duration::from_secs(60));
3210 d.check_and_register("retry-key").unwrap();
3211 d.fail("retry-key").unwrap();
3212 let result = d.check_and_register("retry-key").unwrap();
3213 assert_eq!(result, DeduplicationResult::New);
3214 }
3215
3216 #[test]
3219 fn test_retry_policy_max_total_delay_ms_constant_policy() {
3220 let p = RetryPolicy::constant(3, 100).unwrap();
3221 assert_eq!(p.max_total_delay_ms(), 300);
3223 }
3224
3225 #[test]
3226 fn test_retry_policy_max_total_delay_ms_single_attempt() {
3227 let p = RetryPolicy::constant(1, 50).unwrap();
3228 assert_eq!(p.max_total_delay_ms(), 50);
3229 }
3230
3231 #[test]
3234 fn test_retry_policy_is_last_attempt_true_at_max() {
3235 let p = RetryPolicy::exponential(3, 100).unwrap();
3236 assert!(p.is_last_attempt(3));
3237 }
3238
3239 #[test]
3240 fn test_retry_policy_is_last_attempt_false_before_max() {
3241 let p = RetryPolicy::exponential(3, 100).unwrap();
3242 assert!(!p.is_last_attempt(2));
3243 }
3244
3245 #[test]
3246 fn test_retry_policy_is_last_attempt_true_beyond_max() {
3247 let p = RetryPolicy::exponential(3, 100).unwrap();
3248 assert!(p.is_last_attempt(4));
3249 }
3250
3251 #[test]
3254 fn test_retry_policy_delay_sum_ms_constant_two_attempts() {
3255 let p = RetryPolicy::constant(5, 100).unwrap();
3256 assert_eq!(p.delay_sum_ms(2), 200);
3257 }
3258
3259 #[test]
3260 fn test_retry_policy_delay_sum_ms_capped_at_max_attempts() {
3261 let p = RetryPolicy::constant(2, 50).unwrap();
3262 assert_eq!(p.delay_sum_ms(10), 100);
3264 }
3265
3266 #[test]
3269 fn test_retry_policy_avg_delay_ms_constant() {
3270 let p = RetryPolicy::constant(4, 100).unwrap();
3271 assert_eq!(p.avg_delay_ms(), 100);
3273 }
3274
3275 #[test]
3276 fn test_retry_policy_avg_delay_ms_single_attempt_policy() {
3277 let p = RetryPolicy::none();
3279 assert_eq!(p.avg_delay_ms(), 0);
3280 }
3281
3282 #[test]
3283 fn test_backoff_factor_exponential_returns_two() {
3284 let p = RetryPolicy::exponential(3, 100).unwrap();
3285 assert!((p.backoff_factor() - 2.0).abs() < 1e-9);
3286 }
3287
3288 #[test]
3289 fn test_backoff_factor_constant_returns_one() {
3290 let p = RetryPolicy::constant(3, 100).unwrap();
3291 assert!((p.backoff_factor() - 1.0).abs() < 1e-9);
3292 }
3293
3294 #[test]
3295 fn test_pipeline_count_stages_matching_counts_by_keyword() {
3296 let p = Pipeline::new()
3297 .add_stage("normalize-text", |s| Ok(s))
3298 .add_stage("text-trim", |s| Ok(s))
3299 .add_stage("embed", |s| Ok(s));
3300 assert_eq!(p.count_stages_matching("text"), 2);
3301 assert_eq!(p.count_stages_matching("embed"), 1);
3302 assert_eq!(p.count_stages_matching("missing"), 0);
3303 }
3304
3305 #[test]
3306 fn test_pipeline_count_stages_matching_case_insensitive() {
3307 let p = Pipeline::new().add_stage("TEXT-CLEAN", |s| Ok(s));
3308 assert_eq!(p.count_stages_matching("text"), 1);
3309 }
3310
3311 #[test]
3312 fn test_backpressure_guard_over_soft_limit_true_when_exceeded() {
3313 let guard = BackpressureGuard::new(10)
3314 .unwrap()
3315 .with_soft_limit(1)
3316 .unwrap();
3317 guard.try_acquire().unwrap();
3318 guard.try_acquire().unwrap();
3319 assert!(guard.over_soft_limit().unwrap());
3320 }
3321
3322 #[test]
3323 fn test_backpressure_guard_over_soft_limit_false_when_no_soft_limit() {
3324 let guard = BackpressureGuard::new(10).unwrap();
3325 guard.try_acquire().unwrap();
3326 assert!(!guard.over_soft_limit().unwrap());
3327 }
3328}