1mod config;
141mod error;
142mod events;
143mod layer;
144
145pub use config::{FallbackConfig, FallbackConfigBuilder};
146pub use error::FallbackError;
147pub use events::FallbackEvent;
148pub use layer::FallbackLayer;
149
150use futures::future::BoxFuture;
151use std::sync::Arc;
152use std::task::{Context, Poll};
153use std::time::Instant;
154use tower::Service;
155
156#[cfg(feature = "metrics")]
157use metrics::{counter, describe_counter};
158
159#[cfg(feature = "metrics")]
160use std::sync::Once;
161
162#[cfg(feature = "metrics")]
163static METRICS_INIT: Once = Once::new();
164
165pub type ValueFn<Res> = Arc<dyn Fn() -> Res + Send + Sync>;
167
168pub type FromErrorFn<Res, E> = Arc<dyn Fn(&E) -> Res + Send + Sync>;
170
171pub type FromRequestErrorFn<Req, Res, E> = Arc<dyn Fn(&Req, &E) -> Res + Send + Sync>;
173
174pub type ServiceFn<Req, Res, E> =
176 Arc<dyn Fn(Req) -> BoxFuture<'static, Result<Res, E>> + Send + Sync>;
177
178pub type ExceptionFn<E> = Arc<dyn Fn(E) -> E + Send + Sync>;
180
181pub enum FallbackStrategy<Req, Res, E> {
183 Value(Res),
185
186 ValueFn(ValueFn<Res>),
188
189 FromError(FromErrorFn<Res, E>),
191
192 FromRequestError(FromRequestErrorFn<Req, Res, E>),
194
195 Service(ServiceFn<Req, Res, E>),
198
199 Exception(ExceptionFn<E>),
201}
202
203impl<Req, Res, E> Clone for FallbackStrategy<Req, Res, E>
204where
205 Res: Clone,
206{
207 fn clone(&self) -> Self {
208 match self {
209 Self::Value(v) => Self::Value(v.clone()),
210 Self::ValueFn(f) => Self::ValueFn(Arc::clone(f)),
211 Self::FromError(f) => Self::FromError(Arc::clone(f)),
212 Self::FromRequestError(f) => Self::FromRequestError(Arc::clone(f)),
213 Self::Service(s) => Self::Service(Arc::clone(s)),
214 Self::Exception(f) => Self::Exception(Arc::clone(f)),
215 }
216 }
217}
218
219pub type HandlePredicate<E> = Arc<dyn Fn(&E) -> bool + Send + Sync>;
221
222pub type HandleResponsePredicate<Res> = Arc<dyn Fn(&Res) -> bool + Send + Sync>;
227
228pub struct Fallback<S, Req, Res, E> {
232 inner: S,
233 config: Arc<FallbackConfig<Req, Res, E>>,
234}
235
236impl<S, Req, Res, E> Fallback<S, Req, Res, E> {
237 pub fn new(inner: S, config: Arc<FallbackConfig<Req, Res, E>>) -> Self {
239 #[cfg(feature = "metrics")]
240 METRICS_INIT.call_once(|| {
241 describe_counter!(
242 "fallback_calls_total",
243 "Total number of fallback operations"
244 );
245 });
246
247 Self { inner, config }
248 }
249}
250
251impl<S, Req, Res, E> Clone for Fallback<S, Req, Res, E>
252where
253 S: Clone,
254 Res: Clone,
255{
256 fn clone(&self) -> Self {
257 Self {
258 inner: self.inner.clone(),
259 config: Arc::clone(&self.config),
260 }
261 }
262}
263
264impl<S, Req, Res, E> Service<Req> for Fallback<S, Req, Res, E>
265where
266 S: Service<Req, Response = Res, Error = E> + Clone + Send + 'static,
267 S::Future: Send + 'static,
268 Req: Clone + Send + Sync + 'static,
269 Res: Clone + Send + Sync + 'static,
270 E: Clone + Send + Sync + 'static,
271{
272 type Response = Res;
273 type Error = FallbackError<E>;
274 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
275
276 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
277 self.inner.poll_ready(cx).map_err(FallbackError::Inner)
278 }
279
280 fn call(&mut self, req: Req) -> Self::Future {
281 let mut service = self.inner.clone();
282 let config = Arc::clone(&self.config);
283 let req_clone = req.clone();
284
285 Box::pin(async move {
286 #[cfg(feature = "tracing")]
287 tracing::debug!(fallback = %config.name, "Calling inner service");
288
289 let result = service.call(req).await;
290
291 match result {
292 Ok(response) => {
293 let should_handle_response = config
295 .handle_response_predicate
296 .as_ref()
297 .map(|p| p(&response))
298 .unwrap_or(false);
299
300 if should_handle_response {
301 #[cfg(feature = "tracing")]
302 tracing::debug!(
303 fallback = %config.name,
304 "Response matches predicate, applying fallback"
305 );
306
307 let event = FallbackEvent::FailedAttempt {
309 pattern_name: config.name.clone(),
310 timestamp: Instant::now(),
311 };
312 config.event_listeners.emit(&event);
313
314 match &config.strategy {
316 FallbackStrategy::Value(v) => {
317 #[cfg(feature = "metrics")]
318 counter!(
319 "fallback_calls_total",
320 "fallback" => config.name.clone(),
321 "result" => "applied",
322 "strategy" => "value"
323 )
324 .increment(1);
325
326 let event = FallbackEvent::Applied {
327 pattern_name: config.name.clone(),
328 timestamp: Instant::now(),
329 strategy: "value",
330 };
331 config.event_listeners.emit(&event);
332
333 return Ok(v.clone());
334 }
335
336 FallbackStrategy::ValueFn(f) => {
337 let fallback_response = f();
338
339 #[cfg(feature = "metrics")]
340 counter!(
341 "fallback_calls_total",
342 "fallback" => config.name.clone(),
343 "result" => "applied",
344 "strategy" => "value_fn"
345 )
346 .increment(1);
347
348 let event = FallbackEvent::Applied {
349 pattern_name: config.name.clone(),
350 timestamp: Instant::now(),
351 strategy: "value_fn",
352 };
353 config.event_listeners.emit(&event);
354
355 return Ok(fallback_response);
356 }
357
358 FallbackStrategy::Service(backup) => {
359 #[cfg(feature = "tracing")]
360 tracing::debug!(fallback = %config.name, "Calling backup service (response predicate)");
361
362 match backup(req_clone).await {
363 Ok(backup_response) => {
364 #[cfg(feature = "metrics")]
365 counter!(
366 "fallback_calls_total",
367 "fallback" => config.name.clone(),
368 "result" => "applied",
369 "strategy" => "service"
370 )
371 .increment(1);
372
373 let event = FallbackEvent::Applied {
374 pattern_name: config.name.clone(),
375 timestamp: Instant::now(),
376 strategy: "service",
377 };
378 config.event_listeners.emit(&event);
379
380 return Ok(backup_response);
381 }
382 Err(backup_error) => {
383 #[cfg(feature = "tracing")]
384 tracing::warn!(
385 fallback = %config.name,
386 "Backup service failed (response predicate)"
387 );
388
389 #[cfg(feature = "metrics")]
390 counter!(
391 "fallback_calls_total",
392 "fallback" => config.name.clone(),
393 "result" => "failed",
394 "strategy" => "service"
395 )
396 .increment(1);
397
398 let event = FallbackEvent::Failed {
399 pattern_name: config.name.clone(),
400 timestamp: Instant::now(),
401 };
402 config.event_listeners.emit(&event);
403
404 return Err(FallbackError::FallbackFailed(backup_error));
405 }
406 }
407 }
408
409 _ => {
412 return Ok(response);
413 }
414 }
415 }
416
417 #[cfg(feature = "tracing")]
418 tracing::debug!(fallback = %config.name, "Inner service succeeded");
419
420 #[cfg(feature = "metrics")]
421 counter!(
422 "fallback_calls_total",
423 "fallback" => config.name.clone(),
424 "result" => "success"
425 )
426 .increment(1);
427
428 let event = FallbackEvent::Success {
429 pattern_name: config.name.clone(),
430 timestamp: Instant::now(),
431 };
432 config.event_listeners.emit(&event);
433
434 Ok(response)
435 }
436 Err(error) => {
437 let should_handle = config
439 .handle_predicate
440 .as_ref()
441 .map(|p| p(&error))
442 .unwrap_or(true);
443
444 if !should_handle {
445 #[cfg(feature = "tracing")]
446 tracing::debug!(
447 fallback = %config.name,
448 "Error does not match predicate, skipping fallback"
449 );
450
451 #[cfg(feature = "metrics")]
452 counter!(
453 "fallback_calls_total",
454 "fallback" => config.name.clone(),
455 "result" => "skipped"
456 )
457 .increment(1);
458
459 let event = FallbackEvent::Skipped {
460 pattern_name: config.name.clone(),
461 timestamp: Instant::now(),
462 };
463 config.event_listeners.emit(&event);
464
465 return Err(FallbackError::Inner(error));
466 }
467
468 #[cfg(feature = "tracing")]
469 tracing::debug!(fallback = %config.name, "Inner service failed, applying fallback");
470
471 let event = FallbackEvent::FailedAttempt {
473 pattern_name: config.name.clone(),
474 timestamp: Instant::now(),
475 };
476 config.event_listeners.emit(&event);
477
478 match &config.strategy {
480 FallbackStrategy::Value(v) => {
481 #[cfg(feature = "metrics")]
482 counter!(
483 "fallback_calls_total",
484 "fallback" => config.name.clone(),
485 "result" => "applied",
486 "strategy" => "value"
487 )
488 .increment(1);
489
490 let event = FallbackEvent::Applied {
491 pattern_name: config.name.clone(),
492 timestamp: Instant::now(),
493 strategy: "value",
494 };
495 config.event_listeners.emit(&event);
496
497 Ok(v.clone())
498 }
499
500 FallbackStrategy::ValueFn(f) => {
501 let response = f();
502
503 #[cfg(feature = "metrics")]
504 counter!(
505 "fallback_calls_total",
506 "fallback" => config.name.clone(),
507 "result" => "applied",
508 "strategy" => "value_fn"
509 )
510 .increment(1);
511
512 let event = FallbackEvent::Applied {
513 pattern_name: config.name.clone(),
514 timestamp: Instant::now(),
515 strategy: "value_fn",
516 };
517 config.event_listeners.emit(&event);
518
519 Ok(response)
520 }
521
522 FallbackStrategy::FromError(f) => {
523 let response = f(&error);
524
525 #[cfg(feature = "metrics")]
526 counter!(
527 "fallback_calls_total",
528 "fallback" => config.name.clone(),
529 "result" => "applied",
530 "strategy" => "from_error"
531 )
532 .increment(1);
533
534 let event = FallbackEvent::Applied {
535 pattern_name: config.name.clone(),
536 timestamp: Instant::now(),
537 strategy: "from_error",
538 };
539 config.event_listeners.emit(&event);
540
541 Ok(response)
542 }
543
544 FallbackStrategy::FromRequestError(f) => {
545 let response = f(&req_clone, &error);
546
547 #[cfg(feature = "metrics")]
548 counter!(
549 "fallback_calls_total",
550 "fallback" => config.name.clone(),
551 "result" => "applied",
552 "strategy" => "from_request_error"
553 )
554 .increment(1);
555
556 let event = FallbackEvent::Applied {
557 pattern_name: config.name.clone(),
558 timestamp: Instant::now(),
559 strategy: "from_request_error",
560 };
561 config.event_listeners.emit(&event);
562
563 Ok(response)
564 }
565
566 FallbackStrategy::Service(backup) => {
567 #[cfg(feature = "tracing")]
568 tracing::debug!(fallback = %config.name, "Calling backup service");
569
570 match backup(req_clone).await {
571 Ok(response) => {
572 #[cfg(feature = "metrics")]
573 counter!(
574 "fallback_calls_total",
575 "fallback" => config.name.clone(),
576 "result" => "applied",
577 "strategy" => "service"
578 )
579 .increment(1);
580
581 let event = FallbackEvent::Applied {
582 pattern_name: config.name.clone(),
583 timestamp: Instant::now(),
584 strategy: "service",
585 };
586 config.event_listeners.emit(&event);
587
588 Ok(response)
589 }
590 Err(backup_error) => {
591 #[cfg(feature = "tracing")]
592 tracing::warn!(
593 fallback = %config.name,
594 "Backup service also failed"
595 );
596
597 #[cfg(feature = "metrics")]
598 counter!(
599 "fallback_calls_total",
600 "fallback" => config.name.clone(),
601 "result" => "failed",
602 "strategy" => "service"
603 )
604 .increment(1);
605
606 let event = FallbackEvent::Failed {
607 pattern_name: config.name.clone(),
608 timestamp: Instant::now(),
609 };
610 config.event_listeners.emit(&event);
611
612 Err(FallbackError::FallbackFailed(backup_error))
613 }
614 }
615 }
616
617 FallbackStrategy::Exception(transform) => {
618 let transformed = transform(error);
619
620 #[cfg(feature = "metrics")]
621 counter!(
622 "fallback_calls_total",
623 "fallback" => config.name.clone(),
624 "result" => "transformed",
625 "strategy" => "exception"
626 )
627 .increment(1);
628
629 let event = FallbackEvent::Applied {
630 pattern_name: config.name.clone(),
631 timestamp: Instant::now(),
632 strategy: "exception",
633 };
634 config.event_listeners.emit(&event);
635
636 Err(FallbackError::Inner(transformed))
637 }
638 }
639 }
640 }
641 })
642 }
643}
644
645#[cfg(test)]
646mod tests {
647 use super::*;
648 use std::sync::atomic::{AtomicUsize, Ordering};
649 use tower::{service_fn, Layer, ServiceExt};
650 use tower_resilience_core::ResilienceEvent;
651
652 #[derive(Debug, Clone)]
653 struct TestError {
654 message: String,
655 retryable: bool,
656 }
657
658 impl TestError {
659 fn new(message: &str) -> Self {
660 Self {
661 message: message.to_string(),
662 retryable: true,
663 }
664 }
665
666 fn non_retryable(message: &str) -> Self {
667 Self {
668 message: message.to_string(),
669 retryable: false,
670 }
671 }
672 }
673
674 #[tokio::test]
675 async fn success_no_fallback() {
676 let service =
677 service_fn(
678 |req: String| async move { Ok::<_, TestError>(format!("response: {}", req)) },
679 );
680
681 let layer = FallbackLayer::<String, String, TestError>::value("fallback".to_string());
682 let mut service = layer.layer(service);
683
684 let response = service
685 .ready()
686 .await
687 .unwrap()
688 .call("test".to_string())
689 .await
690 .unwrap();
691
692 assert_eq!(response, "response: test");
693 }
694
695 #[tokio::test]
696 async fn failure_triggers_value_fallback() {
697 let service =
698 service_fn(|_req: String| async move { Err::<String, _>(TestError::new("failed")) });
699
700 let layer = FallbackLayer::<String, String, TestError>::value("fallback".to_string());
701 let mut service = layer.layer(service);
702
703 let response = service
704 .ready()
705 .await
706 .unwrap()
707 .call("test".to_string())
708 .await
709 .unwrap();
710
711 assert_eq!(response, "fallback");
712 }
713
714 #[tokio::test]
715 async fn failure_triggers_from_error_fallback() {
716 let service = service_fn(|_req: String| async move {
717 Err::<String, _>(TestError::new("something went wrong"))
718 });
719
720 let layer = FallbackLayer::<String, String, TestError>::from_error(|e: &TestError| {
721 format!("Error: {}", e.message)
722 });
723 let mut service = layer.layer(service);
724
725 let response = service
726 .ready()
727 .await
728 .unwrap()
729 .call("test".to_string())
730 .await
731 .unwrap();
732
733 assert_eq!(response, "Error: something went wrong");
734 }
735
736 #[tokio::test]
737 async fn failure_triggers_from_request_error_fallback() {
738 let service =
739 service_fn(|_req: String| async move { Err::<String, _>(TestError::new("failed")) });
740
741 let layer = FallbackLayer::<String, String, TestError>::from_request_error(
742 |req: &String, _e: &TestError| format!("fallback for: {}", req),
743 );
744 let mut service = layer.layer(service);
745
746 let response = service
747 .ready()
748 .await
749 .unwrap()
750 .call("my-request".to_string())
751 .await
752 .unwrap();
753
754 assert_eq!(response, "fallback for: my-request");
755 }
756
757 #[tokio::test]
758 async fn predicate_skips_non_matching_errors() {
759 let service = service_fn(|_req: String| async move {
760 Err::<String, _>(TestError::non_retryable("permanent failure"))
761 });
762
763 let layer = FallbackLayer::builder()
764 .value("fallback".to_string())
765 .handle(|e: &TestError| e.retryable) .build();
767 let mut service = layer.layer(service);
768
769 let result = service
770 .ready()
771 .await
772 .unwrap()
773 .call("test".to_string())
774 .await;
775
776 assert!(matches!(result, Err(FallbackError::Inner(_))));
778 }
779
780 #[tokio::test]
781 async fn backup_service_fallback() {
782 let call_count = Arc::new(AtomicUsize::new(0));
783 let cc = Arc::clone(&call_count);
784
785 let primary = service_fn(move |_req: String| {
786 let cc = Arc::clone(&cc);
787 async move {
788 cc.fetch_add(1, Ordering::SeqCst);
789 Err::<String, _>(TestError::new("primary failed"))
790 }
791 });
792
793 let backup_calls = Arc::new(AtomicUsize::new(0));
794 let bc = Arc::clone(&backup_calls);
795
796 let layer = FallbackLayer::<String, String, TestError>::service(move |req: String| {
797 let bc = Arc::clone(&bc);
798 async move {
799 bc.fetch_add(1, Ordering::SeqCst);
800 Ok::<_, TestError>(format!("backup: {}", req))
801 }
802 });
803 let mut service = layer.layer(primary);
804
805 let response = service
806 .ready()
807 .await
808 .unwrap()
809 .call("test".to_string())
810 .await
811 .unwrap();
812
813 assert_eq!(response, "backup: test");
814 assert_eq!(call_count.load(Ordering::SeqCst), 1);
815 assert_eq!(backup_calls.load(Ordering::SeqCst), 1);
816 }
817
818 #[tokio::test]
819 async fn backup_service_also_fails() {
820 let primary =
821 service_fn(
822 |_req: String| async move { Err::<String, _>(TestError::new("primary failed")) },
823 );
824
825 let layer =
826 FallbackLayer::<String, String, TestError>::service(|_req: String| async move {
827 Err::<String, _>(TestError::new("backup also failed"))
828 });
829 let mut service = layer.layer(primary);
830
831 let result = service
832 .ready()
833 .await
834 .unwrap()
835 .call("test".to_string())
836 .await;
837
838 assert!(matches!(result, Err(FallbackError::FallbackFailed(_))));
839 }
840
841 #[tokio::test]
842 async fn exception_transforms_error() {
843 let service =
844 service_fn(
845 |_req: String| async move { Err::<String, _>(TestError::new("original error")) },
846 );
847
848 let layer = FallbackLayer::<String, String, TestError>::exception(|_e: TestError| {
849 TestError::new("transformed error")
850 });
851 let mut service = layer.layer(service);
852
853 let result = service
854 .ready()
855 .await
856 .unwrap()
857 .call("test".to_string())
858 .await;
859
860 match result {
861 Err(FallbackError::Inner(e)) => {
862 assert_eq!(e.message, "transformed error");
863 }
864 _ => panic!("expected transformed error"),
865 }
866 }
867
868 #[tokio::test]
869 async fn event_listeners_called() {
870 use std::sync::Mutex;
871
872 let events: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
873 let events_clone = Arc::clone(&events);
874
875 let service =
876 service_fn(|_req: String| async move { Err::<String, _>(TestError::new("failed")) });
877
878 let layer = FallbackLayer::builder()
879 .name("test-fallback")
880 .value("fallback".to_string())
881 .on_event(move |event: &FallbackEvent| {
882 events_clone
883 .lock()
884 .unwrap()
885 .push(event.event_type().to_string());
886 })
887 .build();
888 let mut service = layer.layer(service);
889
890 let _ = service
891 .ready()
892 .await
893 .unwrap()
894 .call("test".to_string())
895 .await;
896
897 let recorded = events.lock().unwrap();
898 assert!(recorded.contains(&"failed_attempt".to_string()));
899 assert!(recorded.contains(&"applied".to_string()));
900 }
901}