synaptic_runnables/
fallback.rs1use async_trait::async_trait;
2use synaptic_core::{RunnableConfig, SynapticError};
3
4use crate::runnable::{BoxRunnable, Runnable, RunnableOutputStream};
5
6pub struct RunnableWithFallbacks<I: Send + Clone + 'static, O: Send + 'static> {
9 primary: BoxRunnable<I, O>,
10 fallbacks: Vec<BoxRunnable<I, O>>,
11}
12
13impl<I: Send + Clone + 'static, O: Send + 'static> RunnableWithFallbacks<I, O> {
14 pub fn new(primary: BoxRunnable<I, O>, fallbacks: Vec<BoxRunnable<I, O>>) -> Self {
15 Self { primary, fallbacks }
16 }
17}
18
19#[async_trait]
20impl<I: Send + Clone + 'static, O: Send + 'static> Runnable<I, O> for RunnableWithFallbacks<I, O> {
21 async fn invoke(&self, input: I, config: &RunnableConfig) -> Result<O, SynapticError> {
22 let mut last_error = match self.primary.invoke(input.clone(), config).await {
23 Ok(output) => return Ok(output),
24 Err(e) => e,
25 };
26 for fallback in &self.fallbacks {
27 match fallback.invoke(input.clone(), config).await {
28 Ok(output) => return Ok(output),
29 Err(e) => last_error = e,
30 }
31 }
32 Err(last_error)
33 }
34
35 fn stream<'a>(&'a self, input: I, config: &'a RunnableConfig) -> RunnableOutputStream<'a, O>
37 where
38 I: 'a,
39 {
40 Box::pin(async_stream::stream! {
41 use futures::StreamExt;
42
43 let mut primary_stream = std::pin::pin!(self.primary.stream(input.clone(), config));
45 let mut primary_items = Vec::new();
46 let mut primary_failed = false;
47
48 while let Some(item) = primary_stream.next().await {
49 match item {
50 Ok(val) => primary_items.push(val),
51 Err(_e) => {
52 primary_failed = true;
53 break;
54 }
55 }
56 }
57
58 if !primary_failed {
59 for item in primary_items {
60 yield Ok(item);
61 }
62 return;
63 }
64
65 let mut last_error = None;
67 for fallback in &self.fallbacks {
68 let mut fb_stream = std::pin::pin!(fallback.stream(input.clone(), config));
69 let mut fb_items = Vec::new();
70 let mut fb_failed = false;
71
72 while let Some(item) = fb_stream.next().await {
73 match item {
74 Ok(val) => fb_items.push(val),
75 Err(e) => {
76 fb_failed = true;
77 last_error = Some(e);
78 break;
79 }
80 }
81 }
82
83 if !fb_failed {
84 for item in fb_items {
85 yield Ok(item);
86 }
87 return;
88 }
89 }
90
91 if let Some(e) = last_error {
92 yield Err(e);
93 }
94 })
95 }
96}