1#[cfg(feature = "resilience")]
15use std::collections::HashMap;
16#[cfg(feature = "resilience")]
17use std::sync::Arc;
18#[cfg(feature = "resilience")]
19use std::time::Duration;
20
21use crate::domain::resilience::{ResilienceDomainError, ResiliencePolicy, ResilientOperation};
22#[cfg(feature = "resilience")]
23use crate::domain::resilience::BackoffStrategy;
24#[cfg(feature = "resilience")]
25use crate::resilience::{
26 CircuitBreaker, CircuitBreakerConfig, CircuitBreakerError, RateLimiter, RetryConfig,
27};
28#[cfg(feature = "resilience")]
29use dashmap::DashMap;
30
/// Orchestrates execution of async operations under a [`ResiliencePolicy`]
/// (retry, circuit breaking, rate limiting, timeout, or a combination).
#[async_trait::async_trait]
pub trait ResilienceOrchestrator: Send + Sync {
    /// Runs `operation` under the given `policy`.
    ///
    /// `operation` is a factory producing a fresh future per attempt (needed
    /// so retry policies can re-invoke it). Operation errors are converted
    /// into [`ResilienceOrchestrationError`] via `Into`.
    async fn execute_with_policy<T, F, Fut, E>(
        &self,
        policy: ResiliencePolicy,
        operation: F,
    ) -> Result<T, ResilienceOrchestrationError>
    where
        F: FnMut() -> Fut + Send,
        Fut: std::future::Future<Output = Result<T, E>> + Send,
        E: Into<ResilienceOrchestrationError> + Send;

    /// Convenience wrapper: asks the operation for its own policy, then
    /// delegates to [`Self::execute_with_policy`].
    async fn execute_operation<T, E, Op>(
        &self,
        operation: Op,
    ) -> Result<T, ResilienceOrchestrationError>
    where
        Op: ResilientOperation<T, E> + Send + Sync,
        E: Into<ResilienceOrchestrationError> + Send,
    {
        let policy = operation.resilience_policy();
        self.execute_with_policy(policy, || operation.execute())
            .await
    }

    /// Looks up a named, pre-registered circuit breaker, if any.
    fn get_circuit_breaker(&self, name: &str) -> Option<&CircuitBreaker>;

    /// Looks up a named, pre-registered rate limiter, if any.
    fn get_rate_limiter(&self, name: &str) -> Option<&RateLimiter>;

    /// Returns a snapshot of aggregate execution metrics.
    fn metrics(&self) -> ResilienceMetrics;
}
71
/// Errors surfaced by a [`ResilienceOrchestrator`].
#[derive(thiserror::Error, Debug, PartialEq)]
pub enum ResilienceOrchestrationError {
    /// A resilience-domain failure (retry exhausted, circuit open,
    /// rate limited, timeout, ...), converted automatically via `#[from]`.
    #[error("Domain error: {0}")]
    Domain(#[from] ResilienceDomainError),

    /// A failure in underlying infrastructure, carried as a message string.
    #[error("Infrastructure error: {0}")]
    Infrastructure(String),

    /// The supplied policy was invalid (e.g. nested `Combined` policies).
    #[error("Configuration error: {0}")]
    Configuration(String),

    /// The operation was cancelled before it completed.
    #[error("Operation cancelled")]
    Cancelled,
}
87
// Zero-sized stand-in so `get_circuit_breaker` keeps compiling when the
// `resilience` feature is disabled.
#[cfg(not(feature = "resilience"))]
pub struct CircuitBreaker;

// Zero-sized stand-in so `get_rate_limiter` keeps compiling when the
// `resilience` feature is disabled.
#[cfg(not(feature = "resilience"))]
pub struct RateLimiter;
94
/// Aggregate counters describing everything the orchestrator has executed.
#[derive(Clone, Debug, Default)]
pub struct ResilienceMetrics {
    /// Completed operations (successes + failures).
    pub total_operations: u64,
    /// Operations that returned `Ok`.
    pub successful_operations: u64,
    /// Operations that ultimately returned `Err`.
    pub failed_operations: u64,
    /// Individual retry attempts (not counted in `total_operations`).
    pub retry_attempts: u64,
    /// Failures caused by an open circuit breaker.
    pub circuit_breaker_trips: u64,
    /// Failures caused by rate limiting.
    pub rate_limit_hits: u64,
    /// Failures caused by a timeout policy elapsing.
    pub timeout_count: u64,
}
106
/// Default [`ResilienceOrchestrator`]: holds named, pre-registered components
/// plus lazily created ones keyed by their policy parameters.
#[cfg(feature = "resilience")]
pub struct DefaultResilienceOrchestrator {
    // Named breakers registered up front via `register_circuit_breaker`.
    circuit_breakers: HashMap<String, CircuitBreaker>,
    // Named limiters registered up front via `register_rate_limiter`.
    rate_limiters: HashMap<String, RateLimiter>,
    // Breakers created on demand, keyed by policy parameters so the same
    // policy reuses (and shares state with) the same breaker across calls.
    dynamic_circuit_breakers: DashMap<String, Arc<CircuitBreaker>>,
    // Limiters created on demand, keyed by (rps, burst).
    dynamic_rate_limiters: DashMap<String, Arc<RateLimiter>>,
    // Mutex-guarded counters; the lock is held only for brief increments.
    metrics: parking_lot::Mutex<ResilienceMetrics>,
}
116
#[cfg(feature = "resilience")]
impl DefaultResilienceOrchestrator {
    /// Creates an empty orchestrator with no pre-registered components.
    pub fn new() -> Self {
        Self {
            circuit_breakers: HashMap::new(),
            rate_limiters: HashMap::new(),
            dynamic_circuit_breakers: DashMap::new(),
            dynamic_rate_limiters: DashMap::new(),
            metrics: parking_lot::Mutex::new(ResilienceMetrics::default()),
        }
    }

    /// Creates an orchestrator seeded with named circuit breakers and rate
    /// limiters (retrievable via `get_circuit_breaker` / `get_rate_limiter`).
    pub fn with_components(
        circuit_breakers: HashMap<String, CircuitBreaker>,
        rate_limiters: HashMap<String, RateLimiter>,
    ) -> Self {
        Self {
            circuit_breakers,
            rate_limiters,
            dynamic_circuit_breakers: DashMap::new(),
            dynamic_rate_limiters: DashMap::new(),
            metrics: parking_lot::Mutex::new(ResilienceMetrics::default()),
        }
    }

    /// Registers (or replaces) a named circuit breaker.
    pub fn register_circuit_breaker(&mut self, name: String, circuit_breaker: CircuitBreaker) {
        self.circuit_breakers.insert(name, circuit_breaker);
    }

    /// Registers (or replaces) a named rate limiter.
    pub fn register_rate_limiter(&mut self, name: String, rate_limiter: RateLimiter) {
        self.rate_limiters.insert(name, rate_limiter);
    }

    // Counts one successful operation.
    fn record_success(&self) {
        let mut metrics = self.metrics.lock();
        metrics.total_operations += 1;
        metrics.successful_operations += 1;
    }

    // Counts one failed operation and attributes it to a policy-specific
    // counter where the error kind allows it.
    fn record_failure(&self, error: &ResilienceOrchestrationError) {
        let mut metrics = self.metrics.lock();
        metrics.total_operations += 1;
        metrics.failed_operations += 1;

        match error {
            ResilienceOrchestrationError::Domain(ResilienceDomainError::RetryExhausted {
                ..
            }) => {
                // Intentionally empty: each retry was already counted via
                // `record_retry`, so no extra counter is bumped here.
            }
            ResilienceOrchestrationError::Domain(ResilienceDomainError::CircuitOpen) => {
                metrics.circuit_breaker_trips += 1;
            }
            ResilienceOrchestrationError::Domain(ResilienceDomainError::RateLimited { .. }) => {
                metrics.rate_limit_hits += 1;
            }
            ResilienceOrchestrationError::Domain(ResilienceDomainError::Timeout { .. }) => {
                metrics.timeout_count += 1;
            }
            _ => {}
        }
    }

    // Counts a single retry attempt.
    fn record_retry(&self) {
        let mut metrics = self.metrics.lock();
        metrics.retry_attempts += 1;
    }

    // Returns the breaker shared by every policy with these exact parameters,
    // creating it on first use. Sharing is what lets breaker state persist
    // across separate `execute_with_policy` calls.
    fn get_or_create_circuit_breaker(
        &self,
        failure_threshold: u32,
        recovery_timeout: Duration,
        success_threshold: u32,
    ) -> Arc<CircuitBreaker> {
        let key = format!(
            "cb_{}_{}_{}", failure_threshold, recovery_timeout.as_millis(), success_threshold
        );
        self.dynamic_circuit_breakers
            .entry(key)
            .or_insert_with(|| {
                let config = CircuitBreakerConfig::new(failure_threshold)
                    .with_timeout(recovery_timeout)
                    .with_success_threshold(success_threshold);
                // Every dynamic breaker shares the label "policy"; instances
                // are distinguished by the map key, not by this name.
                Arc::new(CircuitBreaker::new("policy", config))
            })
            .clone()
    }

    // Returns the limiter shared by every policy with this (rps, burst)
    // pair, creating it on first use.
    fn get_or_create_rate_limiter(&self, rps: u32, burst: u32) -> Arc<RateLimiter> {
        let key = format!("rl_{}_{}", rps, burst);
        self.dynamic_rate_limiters
            .entry(key)
            .or_insert_with(|| Arc::new(RateLimiter::new(rps, burst)))
            .clone()
    }

    // Translates a domain `BackoffStrategy` into the infrastructure
    // `RetryConfig`. `max_attempts` counts total tries while `RetryConfig`
    // counts retries, hence the `- 1` (saturating to avoid underflow at 0).
    fn build_retry_config(max_attempts: u32, backoff: &BackoffStrategy) -> RetryConfig {
        let max_retries = max_attempts.saturating_sub(1);
        match backoff {
            // Fixed delay: multiplier 1.0 and zero jitter keep all intervals equal.
            BackoffStrategy::Fixed { delay } => RetryConfig::new(max_retries)
                .with_initial_interval(*delay)
                .with_multiplier(1.0)
                .with_randomization_factor(0.0),
            BackoffStrategy::Exponential {
                initial_delay,
                multiplier,
                max_delay,
                jitter,
            } => {
                let mut config = RetryConfig::new(max_retries)
                    .with_initial_interval(*initial_delay)
                    .with_multiplier(*multiplier);

                if let Some(max) = max_delay {
                    config = config.with_max_interval(*max);
                }

                // 0.5 randomization spreads each interval around its nominal
                // value when jitter is requested; 0.0 keeps it deterministic.
                if *jitter {
                    config = config.with_randomization_factor(0.5);
                } else {
                    config = config.with_randomization_factor(0.0);
                }

                config
            }
            BackoffStrategy::Linear {
                initial_delay,
                increment: _,
                max_delay,
            } => {
                // NOTE(review): `increment` is discarded, so "linear" backoff
                // degenerates into a fixed interval (multiplier 1.0).
                // Presumably `RetryConfig` has no additive mode — confirm,
                // and document the limitation at `BackoffStrategy::Linear`.
                let mut config = RetryConfig::new(max_retries)
                    .with_initial_interval(*initial_delay)
                    .with_multiplier(1.0);

                if let Some(max) = max_delay {
                    config = config.with_max_interval(*max);
                }

                config
            }
        }
    }
}
272
#[cfg(feature = "resilience")]
#[async_trait::async_trait]
impl ResilienceOrchestrator for DefaultResilienceOrchestrator {
    // Dispatches on the policy variant; every path records its outcome in
    // the shared metrics before returning.
    async fn execute_with_policy<T, F, Fut, E>(
        &self,
        policy: ResiliencePolicy,
        mut operation: F,
    ) -> Result<T, ResilienceOrchestrationError>
    where
        F: FnMut() -> Fut + Send,
        Fut: std::future::Future<Output = Result<T, E>> + Send,
        E: Into<ResilienceOrchestrationError> + Send,
    {
        match policy {
            // No protection: run once and just record the outcome.
            ResiliencePolicy::None => {
                let result = operation().await;
                match result {
                    Ok(value) => {
                        self.record_success();
                        Ok(value)
                    }
                    Err(error) => {
                        let orch_error = error.into();
                        self.record_failure(&orch_error);
                        Err(orch_error)
                    }
                }
            }

            // Retry with backoff; `max_attempts` is the total number of tries.
            ResiliencePolicy::Retry {
                max_attempts,
                backoff,
            } => {
                let retry_config = Self::build_retry_config(max_attempts, &backoff);

                let mut attempts = 0u32;
                loop {
                    attempts += 1;
                    match operation().await {
                        Ok(value) => {
                            self.record_success();
                            return Ok(value);
                        }
                        Err(error) => {
                            // Converted and formatted eagerly on every failed
                            // attempt; only the final attempt's message ends
                            // up inside `RetryExhausted`.
                            let msg = format!("{}", error.into());

                            if attempts > retry_config.max_retries {
                                let final_error = ResilienceOrchestrationError::Domain(
                                    ResilienceDomainError::RetryExhausted {
                                        attempts,
                                        last_error: msg,
                                    },
                                );
                                self.record_failure(&final_error);
                                return Err(final_error);
                            }

                            self.record_retry();
                        }
                    }
                    // Only reached after a retryable failure (both other
                    // paths return above): sleep for the backoff interval.
                    let interval = retry_config.calculate_interval(attempts - 1);
                    tokio::time::sleep(interval).await;
                }
            }

            // Circuit breaker: delegate the call to the shared breaker and
            // map its error type back into orchestration errors.
            ResiliencePolicy::CircuitBreaker {
                failure_threshold,
                recovery_timeout,
                success_threshold,
            } => {
                let cb = self.get_or_create_circuit_breaker(
                    failure_threshold,
                    recovery_timeout,
                    success_threshold,
                );

                match cb.call(operation).await {
                    Ok(value) => {
                        self.record_success();
                        Ok(value)
                    }
                    // The breaker rejected the call without running it.
                    Err(CircuitBreakerError::CircuitOpen(_)) => {
                        let orch_error = ResilienceOrchestrationError::Domain(
                            ResilienceDomainError::CircuitOpen,
                        );
                        self.record_failure(&orch_error);
                        Err(orch_error)
                    }
                    // The operation ran and failed; surface its own error.
                    Err(CircuitBreakerError::Inner(error)) => {
                        let orch_error = error.into();
                        self.record_failure(&orch_error);
                        Err(orch_error)
                    }
                }
            }

            // Rate limit: token check first; run only if a permit is free.
            ResiliencePolicy::RateLimit {
                requests_per_second,
                burst_capacity,
            } => {
                let limiter = self.get_or_create_rate_limiter(requests_per_second, burst_capacity);

                if limiter.check().is_ok() {
                    let result = operation().await;
                    match result {
                        Ok(value) => {
                            self.record_success();
                            Ok(value)
                        }
                        Err(error) => {
                            let orch_error = error.into();
                            self.record_failure(&orch_error);
                            Err(orch_error)
                        }
                    }
                } else {
                    // No permit available: fail fast rather than queueing.
                    let orch_error =
                        ResilienceOrchestrationError::Domain(ResilienceDomainError::RateLimited {
                            retry_after: None,
                        });
                    self.record_failure(&orch_error);
                    Err(orch_error)
                }
            }

            // Timeout: race the single attempt against the clock.
            ResiliencePolicy::Timeout { duration } => {
                let result = tokio::time::timeout(duration, operation()).await;
                match result {
                    Ok(Ok(value)) => {
                        self.record_success();
                        Ok(value)
                    }
                    Ok(Err(error)) => {
                        let orch_error = error.into();
                        self.record_failure(&orch_error);
                        Err(orch_error)
                    }
                    // The deadline elapsed before the operation finished.
                    Err(_elapsed) => {
                        let orch_error = ResilienceOrchestrationError::Domain(
                            ResilienceDomainError::Timeout { duration },
                        );
                        self.record_failure(&orch_error);
                        Err(orch_error)
                    }
                }
            }

            // Combined policies: gate-style policies (rate limit, circuit
            // breaker) are checked up front; at most one execution-style
            // policy (retry or timeout) then wraps the actual call.
            ResiliencePolicy::Combined { policies } => {
                if policies.is_empty() {
                    // Empty list degenerates to an unprotected run.
                    return self
                        .execute_with_policy(ResiliencePolicy::None, operation)
                        .await;
                }

                let mut execution_policy = None;

                for policy in policies {
                    match policy {
                        ResiliencePolicy::RateLimit {
                            requests_per_second,
                            burst_capacity,
                        } => {
                            let limiter = self.get_or_create_rate_limiter(
                                requests_per_second,
                                burst_capacity,
                            );
                            if limiter.check().is_err() {
                                let e = ResilienceOrchestrationError::Domain(
                                    ResilienceDomainError::RateLimited { retry_after: None },
                                );
                                self.record_failure(&e);
                                return Err(e);
                            }
                        }
                        ResiliencePolicy::CircuitBreaker {
                            failure_threshold,
                            recovery_timeout,
                            success_threshold,
                        } => {
                            let cb = self.get_or_create_circuit_breaker(
                                failure_threshold,
                                recovery_timeout,
                                success_threshold,
                            );
                            // NOTE(review): gate-only check — the operation's
                            // outcome is never reported back to this breaker,
                            // so a breaker used inside `Combined` can only
                            // trip via standalone CircuitBreaker policies that
                            // share the same parameters. Confirm intended.
                            if cb.check().is_err() {
                                let e = ResilienceOrchestrationError::Domain(
                                    ResilienceDomainError::CircuitOpen,
                                );
                                self.record_failure(&e);
                                return Err(e);
                            }
                        }
                        // Last Retry/Timeout in the list wins; earlier ones
                        // are silently overwritten.
                        p @ (ResiliencePolicy::Retry { .. }
                        | ResiliencePolicy::Timeout { .. }) => {
                            execution_policy = Some(p);
                        }
                        ResiliencePolicy::None => {}
                        ResiliencePolicy::Combined { .. } => {
                            return Err(ResilienceOrchestrationError::Configuration(
                                "Nested Combined policies are not supported".to_string(),
                            ));
                        }
                    }
                }

                // Recurse with the surviving execution policy (or None when
                // the list contained only gate-style policies).
                self.execute_with_policy(
                    execution_policy.unwrap_or(ResiliencePolicy::None),
                    operation,
                )
                .await
            }
        }
    }

    fn get_circuit_breaker(&self, name: &str) -> Option<&CircuitBreaker> {
        self.circuit_breakers.get(name)
    }

    fn get_rate_limiter(&self, name: &str) -> Option<&RateLimiter> {
        self.rate_limiters.get(name)
    }

    fn metrics(&self) -> ResilienceMetrics {
        // Clone under the lock so callers get a consistent snapshot.
        self.metrics.lock().clone()
    }
}
506
#[cfg(feature = "resilience")]
impl Default for DefaultResilienceOrchestrator {
    /// Equivalent to [`DefaultResilienceOrchestrator::new`].
    fn default() -> Self {
        Self::new()
    }
}
513
#[cfg(all(test, feature = "resilience"))]
mod tests {
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    use super::*;
    use crate::domain::resilience::policies;

    // `ResiliencePolicy::None` runs once and records a success.
    #[tokio::test]
    async fn test_no_resilience_policy() {
        let orchestrator = DefaultResilienceOrchestrator::new();

        let result = orchestrator
            .execute_with_policy(ResiliencePolicy::None, || async {
                Ok::<_, ResilienceOrchestrationError>(42)
            })
            .await;

        assert_eq!(result, Ok(42));
        let metrics = orchestrator.metrics();
        assert_eq!(metrics.total_operations, 1);
        assert_eq!(metrics.successful_operations, 1);
    }

    // Retry policy re-invokes the operation: first attempt fails, second
    // succeeds, so exactly two invocations are observed.
    #[tokio::test]
    async fn test_retry_policy_success() {
        let orchestrator = DefaultResilienceOrchestrator::new();
        let attempts = Arc::new(AtomicU32::new(0));
        let attempts_clone = attempts.clone();

        let result = orchestrator
            .execute_with_policy(policies::retry(3), move || {
                let attempts = attempts_clone.clone();
                async move {
                    let count = attempts.fetch_add(1, Ordering::SeqCst) + 1;
                    if count < 2 {
                        Err(ResilienceOrchestrationError::Domain(
                            ResilienceDomainError::Infrastructure {
                                message: "Temporary failure".to_string(),
                            },
                        ))
                    } else {
                        Ok(42)
                    }
                }
            })
            .await;

        assert_eq!(result, Ok(42));
        assert_eq!(attempts.load(Ordering::SeqCst), 2);
    }

    // A healthy circuit breaker passes a successful call straight through.
    #[tokio::test]
    async fn test_circuit_breaker_policy() {
        let orchestrator = DefaultResilienceOrchestrator::new();

        let result1 = orchestrator
            .execute_with_policy(
                ResiliencePolicy::CircuitBreaker {
                    failure_threshold: 2,
                    recovery_timeout: Duration::from_secs(1),
                    success_threshold: 1,
                },
                || async { Ok::<_, ResilienceOrchestrationError>(42) },
            )
            .await;
        assert_eq!(result1, Ok(42));
    }

    // Breaker state is shared across calls with the same policy parameters:
    // two failures trip it, and the next call is rejected without ever
    // invoking the operation (call count stays 0).
    #[tokio::test]
    async fn test_circuit_breaker_trips_after_failures() {
        let orchestrator = DefaultResilienceOrchestrator::new();
        let policy = ResiliencePolicy::CircuitBreaker {
            failure_threshold: 2,
            recovery_timeout: Duration::from_secs(60),
            success_threshold: 1,
        };

        for _ in 0..2 {
            let _ = orchestrator
                .execute_with_policy(policy.clone(), || async {
                    Err::<i32, _>(ResilienceOrchestrationError::Infrastructure(
                        "fail".to_string(),
                    ))
                })
                .await;
        }

        let call_count = Arc::new(AtomicU32::new(0));
        let call_count_clone = call_count.clone();
        let result = orchestrator
            .execute_with_policy(policy.clone(), move || {
                let cc = call_count_clone.clone();
                async move {
                    cc.fetch_add(1, Ordering::SeqCst);
                    Ok::<_, ResilienceOrchestrationError>(42)
                }
            })
            .await;

        assert!(matches!(
            result,
            Err(ResilienceOrchestrationError::Domain(
                ResilienceDomainError::CircuitOpen
            ))
        ));
        assert_eq!(call_count.load(Ordering::SeqCst), 0);

        let metrics = orchestrator.metrics();
        assert!(metrics.circuit_breaker_trips > 0);
    }

    // When the operation itself fails under a closed breaker, the original
    // error (variant and message) is preserved, not wrapped.
    #[tokio::test]
    async fn test_circuit_breaker_inner_error_preserved() {
        let orchestrator = DefaultResilienceOrchestrator::new();

        let result = orchestrator
            .execute_with_policy(
                ResiliencePolicy::CircuitBreaker {
                    failure_threshold: 5,
                    recovery_timeout: Duration::from_secs(60),
                    success_threshold: 1,
                },
                || async {
                    Err::<i32, _>(ResilienceOrchestrationError::Infrastructure(
                        "db connection failed".to_string(),
                    ))
                },
            )
            .await;

        assert!(matches!(
            result,
            Err(ResilienceOrchestrationError::Infrastructure(ref msg)) if msg == "db connection failed"
        ));
    }

    // A first call within the burst budget succeeds.
    #[tokio::test]
    async fn test_rate_limit_policy() {
        let orchestrator = DefaultResilienceOrchestrator::new();

        let result1 = orchestrator
            .execute_with_policy(
                ResiliencePolicy::RateLimit {
                    requests_per_second: 1,
                    burst_capacity: 1,
                },
                || async { Ok::<_, ResilienceOrchestrationError>(42) },
            )
            .await;
        assert_eq!(result1, Ok(42));
    }

    // Limiter state is shared across calls with the same (rps, burst): the
    // second immediate call exhausts the burst of 1 and is rejected.
    #[tokio::test]
    async fn test_rate_limit_persists_across_calls() {
        let orchestrator = DefaultResilienceOrchestrator::new();
        let policy = ResiliencePolicy::RateLimit {
            requests_per_second: 1,
            burst_capacity: 1,
        };

        let result1 = orchestrator
            .execute_with_policy(policy.clone(), || async {
                Ok::<_, ResilienceOrchestrationError>(1)
            })
            .await;
        assert!(result1.is_ok());

        let result2 = orchestrator
            .execute_with_policy(policy, || async {
                Ok::<_, ResilienceOrchestrationError>(2)
            })
            .await;
        assert!(matches!(
            result2,
            Err(ResilienceOrchestrationError::Domain(
                ResilienceDomainError::RateLimited { .. }
            ))
        ));

        let metrics = orchestrator.metrics();
        assert!(metrics.rate_limit_hits > 0);
    }

    // Combined policy: the rate-limit gate passes, then the retry policy
    // drives the operation to success on the second attempt.
    #[tokio::test]
    async fn test_combined_rate_limit_and_retry() {
        let orchestrator = DefaultResilienceOrchestrator::new();
        let attempts = Arc::new(AtomicU32::new(0));
        let attempts_clone = attempts.clone();

        let policy = ResiliencePolicy::Combined {
            policies: vec![
                ResiliencePolicy::RateLimit {
                    requests_per_second: 100,
                    burst_capacity: 10,
                },
                policies::retry(3),
            ],
        };

        let result = orchestrator
            .execute_with_policy(policy, move || {
                let attempts = attempts_clone.clone();
                async move {
                    let count = attempts.fetch_add(1, Ordering::SeqCst) + 1;
                    if count < 2 {
                        Err(ResilienceOrchestrationError::Infrastructure(
                            "temporary".to_string(),
                        ))
                    } else {
                        Ok(42)
                    }
                }
            })
            .await;

        assert_eq!(result, Ok(42));
        assert_eq!(attempts.load(Ordering::SeqCst), 2);
    }

    // An empty Combined list behaves like ResiliencePolicy::None.
    #[tokio::test]
    async fn test_combined_empty_policies() {
        let orchestrator = DefaultResilienceOrchestrator::new();

        let result = orchestrator
            .execute_with_policy(
                ResiliencePolicy::Combined {
                    policies: vec![],
                },
                || async { Ok::<_, ResilienceOrchestrationError>(42) },
            )
            .await;

        assert_eq!(result, Ok(42));
    }

    // A fresh orchestrator starts with all counters at zero.
    #[test]
    fn test_metrics_tracking() {
        let orchestrator = DefaultResilienceOrchestrator::new();
        let metrics = orchestrator.metrics();
        assert_eq!(metrics.total_operations, 0);
        assert_eq!(metrics.successful_operations, 0);
        assert_eq!(metrics.failed_operations, 0);
    }
}
767
// No-op orchestrator used when the `resilience` feature is disabled; it
// keeps the public API shape while all policies become pass-throughs.
#[cfg(not(feature = "resilience"))]
#[derive(Default)]
pub struct DefaultResilienceOrchestrator;
772
773#[cfg(not(feature = "resilience"))]
774impl DefaultResilienceOrchestrator {
775 pub fn new() -> Self {
776 Self
777 }
778}
779
780#[cfg(not(feature = "resilience"))]
781#[async_trait::async_trait]
782impl ResilienceOrchestrator for DefaultResilienceOrchestrator {
783 async fn execute_with_policy<T, F, Fut, E>(
784 &self,
785 _policy: ResiliencePolicy,
786 mut operation: F,
787 ) -> Result<T, ResilienceOrchestrationError>
788 where
789 F: FnMut() -> Fut + Send,
790 Fut: std::future::Future<Output = Result<T, E>> + Send,
791 E: Into<ResilienceOrchestrationError> + Send,
792 {
793 let result = operation().await;
794 match result {
795 Ok(value) => Ok(value),
796 Err(error) => Err(error.into()),
797 }
798 }
799
800 fn get_circuit_breaker(&self, _name: &str) -> Option<&CircuitBreaker> {
801 None
802 }
803
804 fn get_rate_limiter(&self, _name: &str) -> Option<&RateLimiter> {
805 None
806 }
807
808 fn metrics(&self) -> ResilienceMetrics {
809 ResilienceMetrics::default()
810 }
811}