1use crate::error::AgentRuntimeError;
21use std::collections::HashMap;
22use std::sync::{Arc, Mutex};
23use std::time::{Duration, Instant};
24
/// Upper bound applied to every delay returned by `RetryPolicy::delay_for`.
pub const MAX_RETRY_DELAY: Duration = Duration::from_secs(60);
27
28#[allow(dead_code)]
35fn recover_lock<'a, T>(
36 result: std::sync::LockResult<std::sync::MutexGuard<'a, T>>,
37 ctx: &str,
38) -> std::sync::MutexGuard<'a, T>
39where
40 T: ?Sized,
41{
42 match result {
43 Ok(guard) => guard,
44 Err(poisoned) => {
45 tracing::warn!("mutex poisoned in {ctx}, recovering inner value");
46 poisoned.into_inner()
47 }
48 }
49}
50
51fn timed_lock<'a, T>(mutex: &'a Mutex<T>, ctx: &str) -> std::sync::MutexGuard<'a, T>
56where
57 T: ?Sized,
58{
59 let start = std::time::Instant::now();
60 let result = mutex.lock();
61 let elapsed = start.elapsed();
62 if elapsed > std::time::Duration::from_millis(5) {
63 tracing::warn!(
64 duration_ms = elapsed.as_millis(),
65 ctx = ctx,
66 "slow mutex acquisition"
67 );
68 }
69 match result {
70 Ok(guard) => guard,
71 Err(poisoned) => {
72 tracing::warn!("mutex poisoned in {ctx}, recovering inner value");
73 poisoned.into_inner()
74 }
75 }
76}
77
78#[derive(Debug, Clone)]
82pub struct RetryPolicy {
83 pub max_attempts: u32,
85 pub base_delay: Duration,
87}
88
89impl RetryPolicy {
90 pub fn exponential(max_attempts: u32, base_ms: u64) -> Result<Self, AgentRuntimeError> {
100 if max_attempts == 0 {
101 return Err(AgentRuntimeError::Orchestration(
102 "max_attempts must be >= 1".into(),
103 ));
104 }
105 Ok(Self {
106 max_attempts,
107 base_delay: Duration::from_millis(base_ms),
108 })
109 }
110
111 pub fn delay_for(&self, attempt: u32) -> Duration {
115 let exp = attempt.saturating_sub(1);
116 let multiplier = 1u64.checked_shl(exp.min(63)).unwrap_or(u64::MAX);
117 let millis = self
118 .base_delay
119 .as_millis()
120 .saturating_mul(multiplier as u128);
121 let raw = Duration::from_millis(millis.min(u64::MAX as u128) as u64);
122 raw.min(MAX_RETRY_DELAY)
123 }
124}
125
/// State of a `CircuitBreaker`.
#[derive(Debug, Clone, PartialEq)]
pub enum CircuitState {
    /// Calls are passed through to the wrapped operation.
    Closed,
    /// Calls are rejected without running the wrapped operation.
    Open {
        /// Instant at which the circuit was opened.
        opened_at: Instant,
    },
    /// The next call is let through as a probe; its outcome closes or
    /// re-opens the circuit.
    HalfOpen,
}
143
/// Storage used by `CircuitBreaker` for its failure counter and open
/// timestamp. Every method receives the name of the service being guarded.
pub trait CircuitBreakerBackend: Send + Sync {
    /// Increments the failure counter for `service` and returns the new value.
    fn increment_failures(&self, service: &str) -> u32;
    /// Resets the failure counter for `service` to zero.
    fn reset_failures(&self, service: &str);
    /// Returns the current failure counter for `service`.
    fn get_failures(&self, service: &str) -> u32;
    /// Records `at` as the instant the circuit for `service` was opened.
    fn set_open_at(&self, service: &str, at: std::time::Instant);
    /// Removes the recorded open instant for `service`.
    fn clear_open_at(&self, service: &str);
    /// Returns the recorded open instant for `service`, if any.
    fn get_open_at(&self, service: &str) -> Option<std::time::Instant>;
}
165
166pub struct InMemoryCircuitBreakerBackend {
170 inner: Arc<Mutex<InMemoryBackendState>>,
171}
172
173struct InMemoryBackendState {
174 consecutive_failures: u32,
175 open_at: Option<std::time::Instant>,
176}
177
178impl InMemoryCircuitBreakerBackend {
179 pub fn new() -> Self {
181 Self {
182 inner: Arc::new(Mutex::new(InMemoryBackendState {
183 consecutive_failures: 0,
184 open_at: None,
185 })),
186 }
187 }
188}
189
190impl Default for InMemoryCircuitBreakerBackend {
191 fn default() -> Self {
192 Self::new()
193 }
194}
195
196impl CircuitBreakerBackend for InMemoryCircuitBreakerBackend {
197 fn increment_failures(&self, _service: &str) -> u32 {
198 let mut state = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::increment_failures");
199 state.consecutive_failures += 1;
200 state.consecutive_failures
201 }
202
203 fn reset_failures(&self, _service: &str) {
204 let mut state = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::reset_failures");
205 state.consecutive_failures = 0;
206 }
207
208 fn get_failures(&self, _service: &str) -> u32 {
209 let state = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::get_failures");
210 state.consecutive_failures
211 }
212
213 fn set_open_at(&self, _service: &str, at: std::time::Instant) {
214 let mut state = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::set_open_at");
215 state.open_at = Some(at);
216 }
217
218 fn clear_open_at(&self, _service: &str) {
219 let mut state = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::clear_open_at");
220 state.open_at = None;
221 }
222
223 fn get_open_at(&self, _service: &str) -> Option<std::time::Instant> {
224 let state = timed_lock(&self.inner, "InMemoryCircuitBreakerBackend::get_open_at");
225 state.open_at
226 }
227}
228
/// Circuit breaker guarding calls to a single named service.
///
/// The failure counter and open timestamp are kept in `backend`. Cloning the
/// breaker clones the `Arc`, so clones operate on the same backend.
#[derive(Clone)]
pub struct CircuitBreaker {
    /// Failure count at which the circuit is opened.
    threshold: u32,
    /// Time the circuit stays open before a probe call is allowed through.
    recovery_window: Duration,
    /// Service name passed to every backend call.
    service: String,
    /// Storage for the failure counter and open timestamp.
    backend: Arc<dyn CircuitBreakerBackend>,
}
244
245impl std::fmt::Debug for CircuitBreaker {
246 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
247 f.debug_struct("CircuitBreaker")
248 .field("threshold", &self.threshold)
249 .field("recovery_window", &self.recovery_window)
250 .field("service", &self.service)
251 .finish()
252 }
253}
254
255impl CircuitBreaker {
256 pub fn new(
263 service: impl Into<String>,
264 threshold: u32,
265 recovery_window: Duration,
266 ) -> Result<Self, AgentRuntimeError> {
267 if threshold == 0 {
268 return Err(AgentRuntimeError::Orchestration(
269 "circuit breaker threshold must be >= 1".into(),
270 ));
271 }
272 let service = service.into();
273 Ok(Self {
274 threshold,
275 recovery_window,
276 service,
277 backend: Arc::new(InMemoryCircuitBreakerBackend::new()),
278 })
279 }
280
281 pub fn with_backend(mut self, backend: Arc<dyn CircuitBreakerBackend>) -> Self {
285 self.backend = backend;
286 self
287 }
288
289 #[allow(dead_code)]
291 fn current_state(&self) -> CircuitState {
292 match self.backend.get_open_at(&self.service) {
293 Some(opened_at) => CircuitState::Open { opened_at },
294 None => {
295 CircuitState::Closed
301 }
302 }
303 }
304
305 #[tracing::instrument(skip(self, f))]
312 pub fn call<T, E, F>(&self, f: F) -> Result<T, AgentRuntimeError>
313 where
314 F: FnOnce() -> Result<T, E>,
315 E: std::fmt::Display,
316 {
317 let effective_state = match self.backend.get_open_at(&self.service) {
319 Some(opened_at) => {
320 if opened_at.elapsed() >= self.recovery_window {
321 self.backend.clear_open_at(&self.service);
323 tracing::info!("circuit moved to half-open for {}", self.service);
324 CircuitState::HalfOpen
325 } else {
326 CircuitState::Open { opened_at }
327 }
328 }
329 None => {
330 let failures = self.backend.get_failures(&self.service);
334 if failures >= self.threshold {
335 CircuitState::HalfOpen
336 } else {
337 CircuitState::Closed
338 }
339 }
340 };
341
342 tracing::debug!("circuit state: {:?}", effective_state);
343
344 match effective_state {
345 CircuitState::Open { .. } => {
346 return Err(AgentRuntimeError::CircuitOpen {
347 service: self.service.clone(),
348 });
349 }
350 CircuitState::Closed | CircuitState::HalfOpen => {}
351 }
352
353 match f() {
355 Ok(val) => {
356 self.backend.reset_failures(&self.service);
357 self.backend.clear_open_at(&self.service);
358 tracing::info!("circuit closed for {}", self.service);
359 Ok(val)
360 }
361 Err(e) => {
362 let failures = self.backend.increment_failures(&self.service);
363 if failures >= self.threshold {
364 let now = Instant::now();
365 self.backend.set_open_at(&self.service, now);
366 tracing::info!("circuit opened for {}", self.service);
367 }
368 Err(AgentRuntimeError::Orchestration(e.to_string()))
369 }
370 }
371 }
372
373 pub fn state(&self) -> Result<CircuitState, AgentRuntimeError> {
375 let state = match self.backend.get_open_at(&self.service) {
376 Some(opened_at) => {
377 if opened_at.elapsed() >= self.recovery_window {
378 let failures = self.backend.get_failures(&self.service);
380 if failures >= self.threshold {
381 CircuitState::HalfOpen
382 } else {
383 CircuitState::Closed
384 }
385 } else {
386 CircuitState::Open { opened_at }
387 }
388 }
389 None => {
390 let failures = self.backend.get_failures(&self.service);
391 if failures >= self.threshold {
392 CircuitState::HalfOpen
393 } else {
394 CircuitState::Closed
395 }
396 }
397 };
398 Ok(state)
399 }
400
401 pub fn failure_count(&self) -> Result<u32, AgentRuntimeError> {
403 Ok(self.backend.get_failures(&self.service))
404 }
405}
406
/// Outcome of `Deduplicator::check_and_register`.
#[derive(Debug, Clone, PartialEq)]
pub enum DeduplicationResult {
    /// The key was not known; it has now been registered as in flight.
    New,
    /// A completed result is cached for the key; the stored value is returned.
    Cached(String),
    /// The key is registered as in flight and has not been completed yet.
    InProgress,
}
419
420#[derive(Debug, Clone)]
427pub struct Deduplicator {
428 ttl: Duration,
429 inner: Arc<Mutex<DeduplicatorInner>>,
430}
431
432#[derive(Debug)]
433struct DeduplicatorInner {
434 cache: HashMap<String, (String, Instant)>, in_flight: HashMap<String, Instant>, }
437
438impl Deduplicator {
439 pub fn new(ttl: Duration) -> Self {
441 Self {
442 ttl,
443 inner: Arc::new(Mutex::new(DeduplicatorInner {
444 cache: HashMap::new(),
445 in_flight: HashMap::new(),
446 })),
447 }
448 }
449
450 pub fn check_and_register(&self, key: &str) -> Result<DeduplicationResult, AgentRuntimeError> {
454 let mut inner = timed_lock(&self.inner, "Deduplicator::check_and_register");
455
456 let now = Instant::now();
457
458 inner
460 .cache
461 .retain(|_, (_, ts)| now.duration_since(*ts) < self.ttl);
462 inner
463 .in_flight
464 .retain(|_, ts| now.duration_since(*ts) < self.ttl);
465
466 if let Some((result, _)) = inner.cache.get(key) {
467 return Ok(DeduplicationResult::Cached(result.clone()));
468 }
469
470 if inner.in_flight.contains_key(key) {
471 return Ok(DeduplicationResult::InProgress);
472 }
473
474 inner.in_flight.insert(key.to_owned(), now);
475 Ok(DeduplicationResult::New)
476 }
477
478 pub fn complete(&self, key: &str, result: impl Into<String>) -> Result<(), AgentRuntimeError> {
480 let mut inner = timed_lock(&self.inner, "Deduplicator::complete");
481 inner.in_flight.remove(key);
482 inner
483 .cache
484 .insert(key.to_owned(), (result.into(), Instant::now()));
485 Ok(())
486 }
487}
488
489#[derive(Debug, Clone)]
499pub struct BackpressureGuard {
500 capacity: usize,
501 soft_capacity: Option<usize>,
502 inner: Arc<Mutex<usize>>,
503}
504
505impl BackpressureGuard {
506 pub fn new(capacity: usize) -> Result<Self, AgentRuntimeError> {
512 if capacity == 0 {
513 return Err(AgentRuntimeError::Orchestration(
514 "BackpressureGuard capacity must be > 0".into(),
515 ));
516 }
517 Ok(Self {
518 capacity,
519 soft_capacity: None,
520 inner: Arc::new(Mutex::new(0)),
521 })
522 }
523
524 pub fn with_soft_limit(mut self, soft: usize) -> Result<Self, AgentRuntimeError> {
527 if soft >= self.capacity {
528 return Err(AgentRuntimeError::Orchestration(
529 "soft_capacity must be less than hard capacity".into(),
530 ));
531 }
532 self.soft_capacity = Some(soft);
533 Ok(self)
534 }
535
536 pub fn try_acquire(&self) -> Result<(), AgentRuntimeError> {
545 let mut depth = timed_lock(&self.inner, "BackpressureGuard::try_acquire");
546 if *depth >= self.capacity {
547 return Err(AgentRuntimeError::BackpressureShed {
548 depth: *depth,
549 capacity: self.capacity,
550 });
551 }
552 *depth += 1;
553 if let Some(soft) = self.soft_capacity {
554 if *depth >= soft {
555 tracing::warn!(
556 depth = *depth,
557 soft_capacity = soft,
558 hard_capacity = self.capacity,
559 "backpressure approaching hard limit"
560 );
561 }
562 }
563 Ok(())
564 }
565
566 pub fn release(&self) -> Result<(), AgentRuntimeError> {
568 let mut depth = timed_lock(&self.inner, "BackpressureGuard::release");
569 *depth = depth.saturating_sub(1);
570 Ok(())
571 }
572
573 pub fn depth(&self) -> Result<usize, AgentRuntimeError> {
575 let depth = timed_lock(&self.inner, "BackpressureGuard::depth");
576 Ok(*depth)
577 }
578
579 pub fn soft_depth_ratio(&self) -> f32 {
584 match self.soft_capacity {
585 None => 0.0,
586 Some(soft) => {
587 let depth = timed_lock(&self.inner, "BackpressureGuard::soft_depth_ratio");
588 *depth as f32 / soft as f32
589 }
590 }
591 }
592}
593
594pub struct Stage {
598 pub name: String,
600 pub handler: Box<dyn Fn(String) -> Result<String, AgentRuntimeError> + Send + Sync>,
602}
603
604impl std::fmt::Debug for Stage {
605 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
606 f.debug_struct("Stage").field("name", &self.name).finish()
607 }
608}
609
610#[derive(Debug)]
617pub struct Pipeline {
618 stages: Vec<Stage>,
619}
620
621impl Pipeline {
622 pub fn new() -> Self {
624 Self { stages: Vec::new() }
625 }
626
627 pub fn add_stage(
629 mut self,
630 name: impl Into<String>,
631 handler: impl Fn(String) -> Result<String, AgentRuntimeError> + Send + Sync + 'static,
632 ) -> Self {
633 self.stages.push(Stage {
634 name: name.into(),
635 handler: Box::new(handler),
636 });
637 self
638 }
639
640 #[tracing::instrument(skip(self))]
642 pub fn run(&self, input: String) -> Result<String, AgentRuntimeError> {
643 let mut current = input;
644 for stage in &self.stages {
645 tracing::debug!(stage = %stage.name, "running pipeline stage");
646 current = (stage.handler)(current).map_err(|e| {
647 tracing::error!(stage = %stage.name, error = %e, "pipeline stage failed");
648 e
649 })?;
650 }
651 Ok(current)
652 }
653
654 pub fn stage_count(&self) -> usize {
656 self.stages.len()
657 }
658}
659
660impl Default for Pipeline {
661 fn default() -> Self {
662 Self::new()
663 }
664}
665
// Unit tests for the retry, circuit-breaker, deduplication, backpressure and
// pipeline primitives defined in this file.
#[cfg(test)]
mod tests {
    use super::*;

    // ── RetryPolicy ─────────────────────────────────────────────────────────

    #[test]
    fn test_retry_policy_rejects_zero_attempts() {
        assert!(RetryPolicy::exponential(0, 100).is_err());
    }

    #[test]
    fn test_retry_policy_delay_attempt_1_equals_base() {
        let p = RetryPolicy::exponential(3, 100).unwrap();
        assert_eq!(p.delay_for(1), Duration::from_millis(100));
    }

    #[test]
    fn test_retry_policy_delay_doubles_each_attempt() {
        let p = RetryPolicy::exponential(5, 100).unwrap();
        assert_eq!(p.delay_for(2), Duration::from_millis(200));
        assert_eq!(p.delay_for(3), Duration::from_millis(400));
        assert_eq!(p.delay_for(4), Duration::from_millis(800));
    }

    #[test]
    fn test_retry_policy_delay_capped_at_max() {
        // 10 s doubled nine times is far beyond the 60 s cap.
        let p = RetryPolicy::exponential(10, 10_000).unwrap();
        assert_eq!(p.delay_for(10), MAX_RETRY_DELAY);
    }

    #[test]
    fn test_retry_policy_delay_never_exceeds_max_for_any_attempt() {
        let p = RetryPolicy::exponential(10, 1000).unwrap();
        for attempt in 1..=10 {
            assert!(p.delay_for(attempt) <= MAX_RETRY_DELAY);
        }
    }

    // ── CircuitBreaker ──────────────────────────────────────────────────────

    #[test]
    fn test_circuit_breaker_rejects_zero_threshold() {
        assert!(CircuitBreaker::new("svc", 0, Duration::from_secs(1)).is_err());
    }

    #[test]
    fn test_circuit_breaker_starts_closed() {
        let cb = CircuitBreaker::new("svc", 3, Duration::from_secs(60)).unwrap();
        assert_eq!(cb.state().unwrap(), CircuitState::Closed);
    }

    #[test]
    fn test_circuit_breaker_success_keeps_closed() {
        let cb = CircuitBreaker::new("svc", 3, Duration::from_secs(60)).unwrap();
        let result: Result<i32, AgentRuntimeError> = cb.call(|| Ok::<i32, AgentRuntimeError>(42));
        assert!(result.is_ok());
        assert_eq!(cb.state().unwrap(), CircuitState::Closed);
    }

    #[test]
    fn test_circuit_breaker_opens_after_threshold_failures() {
        let cb = CircuitBreaker::new("svc", 3, Duration::from_secs(60)).unwrap();
        for _ in 0..3 {
            let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("oops".to_string()));
        }
        assert!(matches!(cb.state().unwrap(), CircuitState::Open { .. }));
    }

    #[test]
    fn test_circuit_breaker_open_fast_fails() {
        // Threshold 1 and a long window: the second call must be rejected
        // without running the closure.
        let cb = CircuitBreaker::new("svc", 1, Duration::from_secs(3600)).unwrap();
        let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("fail".to_string()));
        let result: Result<(), AgentRuntimeError> = cb.call(|| Ok::<(), AgentRuntimeError>(()));
        assert!(matches!(result, Err(AgentRuntimeError::CircuitOpen { .. })));
    }

    #[test]
    fn test_circuit_breaker_success_resets_failure_count() {
        let cb = CircuitBreaker::new("svc", 5, Duration::from_secs(60)).unwrap();
        let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("fail".to_string()));
        let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("fail".to_string()));
        let _: Result<i32, AgentRuntimeError> = cb.call(|| Ok::<i32, AgentRuntimeError>(1));
        assert_eq!(cb.failure_count().unwrap(), 0);
    }

    #[test]
    fn test_circuit_breaker_half_open_on_recovery() {
        // A zero recovery window means the circuit is eligible for a probe
        // immediately after it opens.
        let cb = CircuitBreaker::new("svc", 1, Duration::ZERO).unwrap();
        let _: Result<(), AgentRuntimeError> = cb.call(|| Err::<(), _>("fail".to_string()));
        let result: Result<i32, AgentRuntimeError> = cb.call(|| Ok::<i32, AgentRuntimeError>(99));
        assert_eq!(result.unwrap_or(0), 99);
        assert_eq!(cb.state().unwrap(), CircuitState::Closed);
    }

    #[test]
    fn test_circuit_breaker_with_custom_backend_uses_backend_state() {
        // Two breakers for the same service share one backend instance.
        let shared_backend: Arc<dyn CircuitBreakerBackend> =
            Arc::new(InMemoryCircuitBreakerBackend::new());

        let cb1 = CircuitBreaker::new("svc", 2, Duration::from_secs(60))
            .unwrap()
            .with_backend(Arc::clone(&shared_backend));

        let cb2 = CircuitBreaker::new("svc", 2, Duration::from_secs(60))
            .unwrap()
            .with_backend(Arc::clone(&shared_backend));

        let _: Result<(), AgentRuntimeError> = cb1.call(|| Err::<(), _>("fail".to_string()));

        // The failure recorded through cb1 is visible through cb2.
        assert_eq!(cb2.failure_count().unwrap(), 1);

        let _: Result<(), AgentRuntimeError> = cb1.call(|| Err::<(), _>("fail again".to_string()));

        // The second failure reaches the threshold; cb2 observes the open circuit.
        assert!(matches!(cb2.state().unwrap(), CircuitState::Open { .. }));
    }

    #[test]
    fn test_in_memory_backend_increments_and_resets() {
        let backend = InMemoryCircuitBreakerBackend::new();

        assert_eq!(backend.get_failures("svc"), 0);

        let count = backend.increment_failures("svc");
        assert_eq!(count, 1);

        let count = backend.increment_failures("svc");
        assert_eq!(count, 2);

        backend.reset_failures("svc");
        assert_eq!(backend.get_failures("svc"), 0);

        // Open timestamp: set, read back, clear.
        assert!(backend.get_open_at("svc").is_none());
        let now = Instant::now();
        backend.set_open_at("svc", now);
        assert!(backend.get_open_at("svc").is_some());
        backend.clear_open_at("svc");
        assert!(backend.get_open_at("svc").is_none());
    }

    // ── Deduplicator ────────────────────────────────────────────────────────

    #[test]
    fn test_deduplicator_new_key_is_new() {
        let d = Deduplicator::new(Duration::from_secs(60));
        let r = d.check_and_register("key-1").unwrap();
        assert_eq!(r, DeduplicationResult::New);
    }

    #[test]
    fn test_deduplicator_second_check_is_in_progress() {
        let d = Deduplicator::new(Duration::from_secs(60));
        d.check_and_register("key-1").unwrap();
        let r = d.check_and_register("key-1").unwrap();
        assert_eq!(r, DeduplicationResult::InProgress);
    }

    #[test]
    fn test_deduplicator_complete_makes_cached() {
        let d = Deduplicator::new(Duration::from_secs(60));
        d.check_and_register("key-1").unwrap();
        d.complete("key-1", "result-value").unwrap();
        let r = d.check_and_register("key-1").unwrap();
        assert_eq!(r, DeduplicationResult::Cached("result-value".into()));
    }

    #[test]
    fn test_deduplicator_different_keys_are_independent() {
        let d = Deduplicator::new(Duration::from_secs(60));
        d.check_and_register("key-a").unwrap();
        let r = d.check_and_register("key-b").unwrap();
        assert_eq!(r, DeduplicationResult::New);
    }

    #[test]
    fn test_deduplicator_expired_entry_is_new() {
        // With a zero TTL every entry is already expired at the next check.
        let d = Deduplicator::new(Duration::ZERO);
        d.check_and_register("key-1").unwrap();
        d.complete("key-1", "old").unwrap();
        let r = d.check_and_register("key-1").unwrap();
        assert_eq!(r, DeduplicationResult::New);
    }

    // ── BackpressureGuard ───────────────────────────────────────────────────

    #[test]
    fn test_backpressure_guard_rejects_zero_capacity() {
        assert!(BackpressureGuard::new(0).is_err());
    }

    #[test]
    fn test_backpressure_guard_acquire_within_capacity() {
        let g = BackpressureGuard::new(5).unwrap();
        assert!(g.try_acquire().is_ok());
        assert_eq!(g.depth().unwrap(), 1);
    }

    #[test]
    fn test_backpressure_guard_sheds_when_full() {
        let g = BackpressureGuard::new(2).unwrap();
        g.try_acquire().unwrap();
        g.try_acquire().unwrap();
        let result = g.try_acquire();
        assert!(matches!(
            result,
            Err(AgentRuntimeError::BackpressureShed { .. })
        ));
    }

    #[test]
    fn test_backpressure_guard_release_decrements_depth() {
        let g = BackpressureGuard::new(3).unwrap();
        g.try_acquire().unwrap();
        g.try_acquire().unwrap();
        g.release().unwrap();
        assert_eq!(g.depth().unwrap(), 1);
    }

    #[test]
    fn test_backpressure_guard_release_on_empty_is_noop() {
        let g = BackpressureGuard::new(3).unwrap();
        // Releasing with nothing acquired must not underflow.
        g.release().unwrap();
        assert_eq!(g.depth().unwrap(), 0);
    }

    // ── Pipeline ────────────────────────────────────────────────────────────

    #[test]
    fn test_pipeline_runs_stages_in_order() {
        let p = Pipeline::new()
            .add_stage("upper", |s| Ok(s.to_uppercase()))
            .add_stage("append", |s| Ok(format!("{s}!")));
        let result = p.run("hello".into()).unwrap();
        assert_eq!(result, "HELLO!");
    }

    #[test]
    fn test_pipeline_empty_pipeline_returns_input() {
        let p = Pipeline::new();
        assert_eq!(p.run("test".into()).unwrap(), "test");
    }

    #[test]
    fn test_pipeline_stage_failure_short_circuits() {
        let p = Pipeline::new()
            .add_stage("fail", |_| {
                Err(AgentRuntimeError::Orchestration("boom".into()))
            })
            .add_stage("never", |s| Ok(s));
        assert!(p.run("input".into()).is_err());
    }

    #[test]
    fn test_pipeline_stage_count() {
        let p = Pipeline::new()
            .add_stage("s1", |s| Ok(s))
            .add_stage("s2", |s| Ok(s));
        assert_eq!(p.stage_count(), 2);
    }

    // ── BackpressureGuard: soft limit ───────────────────────────────────────

    #[test]
    fn test_backpressure_soft_limit_rejects_invalid_config() {
        // The soft limit must be strictly below the hard capacity.
        let g = BackpressureGuard::new(5).unwrap();
        assert!(g.with_soft_limit(5).is_err());
        let g = BackpressureGuard::new(5).unwrap();
        assert!(g.with_soft_limit(6).is_err());
    }

    #[test]
    fn test_backpressure_soft_limit_accepts_requests_below_soft() {
        let g = BackpressureGuard::new(5)
            .unwrap()
            .with_soft_limit(2)
            .unwrap();
        assert!(g.try_acquire().is_ok());
        // Reaching the soft limit only logs a warning; the acquire still succeeds.
        assert!(g.try_acquire().is_ok());
        assert_eq!(g.depth().unwrap(), 2);
    }

    #[test]
    fn test_backpressure_with_soft_limit_still_sheds_at_hard_capacity() {
        let g = BackpressureGuard::new(3)
            .unwrap()
            .with_soft_limit(2)
            .unwrap();
        g.try_acquire().unwrap();
        g.try_acquire().unwrap();
        g.try_acquire().unwrap();
        // Depth now equals the hard capacity, so the next acquire is shed.
        let result = g.try_acquire();
        assert!(matches!(
            result,
            Err(AgentRuntimeError::BackpressureShed { .. })
        ));
    }

    // ── BackpressureGuard: concurrency ──────────────────────────────────────

    #[test]
    fn test_backpressure_concurrent_acquires_are_consistent() {
        use std::sync::Arc;
        use std::thread;

        let g = Arc::new(BackpressureGuard::new(100).unwrap());
        let mut handles = Vec::new();

        for _ in 0..10 {
            let g_clone = Arc::clone(&g);
            handles.push(thread::spawn(move || {
                g_clone.try_acquire().ok();
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        // Ten threads each acquired once, well below the capacity of 100.
        assert_eq!(g.depth().unwrap(), 10);
    }
}