Skip to main content

synaptic_runnables/
fallback.rs

1use async_trait::async_trait;
2use synaptic_core::{RunnableConfig, SynapseError};
3
4use crate::runnable::{BoxRunnable, Runnable, RunnableOutputStream};
5
6/// Tries the primary runnable first. If it fails, tries each fallback in order.
7/// Input must be `Clone` so it can be retried on fallbacks.
8pub struct RunnableWithFallbacks<I: Send + Clone + 'static, O: Send + 'static> {
9    primary: BoxRunnable<I, O>,
10    fallbacks: Vec<BoxRunnable<I, O>>,
11}
12
13impl<I: Send + Clone + 'static, O: Send + 'static> RunnableWithFallbacks<I, O> {
14    pub fn new(primary: BoxRunnable<I, O>, fallbacks: Vec<BoxRunnable<I, O>>) -> Self {
15        Self { primary, fallbacks }
16    }
17}
18
19#[async_trait]
20impl<I: Send + Clone + 'static, O: Send + 'static> Runnable<I, O> for RunnableWithFallbacks<I, O> {
21    async fn invoke(&self, input: I, config: &RunnableConfig) -> Result<O, SynapseError> {
22        let mut last_error = match self.primary.invoke(input.clone(), config).await {
23            Ok(output) => return Ok(output),
24            Err(e) => e,
25        };
26        for fallback in &self.fallbacks {
27            match fallback.invoke(input.clone(), config).await {
28                Ok(output) => return Ok(output),
29                Err(e) => last_error = e,
30            }
31        }
32        Err(last_error)
33    }
34
35    /// Stream: try primary stream, fall back on error.
36    fn stream<'a>(&'a self, input: I, config: &'a RunnableConfig) -> RunnableOutputStream<'a, O>
37    where
38        I: 'a,
39    {
40        Box::pin(async_stream::stream! {
41            use futures::StreamExt;
42
43            // Try primary
44            let mut primary_stream = std::pin::pin!(self.primary.stream(input.clone(), config));
45            let mut primary_items = Vec::new();
46            let mut primary_failed = false;
47
48            while let Some(item) = primary_stream.next().await {
49                match item {
50                    Ok(val) => primary_items.push(val),
51                    Err(_e) => {
52                        primary_failed = true;
53                        break;
54                    }
55                }
56            }
57
58            if !primary_failed {
59                for item in primary_items {
60                    yield Ok(item);
61                }
62                return;
63            }
64
65            // Try fallbacks
66            let mut last_error = None;
67            for fallback in &self.fallbacks {
68                let mut fb_stream = std::pin::pin!(fallback.stream(input.clone(), config));
69                let mut fb_items = Vec::new();
70                let mut fb_failed = false;
71
72                while let Some(item) = fb_stream.next().await {
73                    match item {
74                        Ok(val) => fb_items.push(val),
75                        Err(e) => {
76                            fb_failed = true;
77                            last_error = Some(e);
78                            break;
79                        }
80                    }
81                }
82
83                if !fb_failed {
84                    for item in fb_items {
85                        yield Ok(item);
86                    }
87                    return;
88                }
89            }
90
91            if let Some(e) = last_error {
92                yield Err(e);
93            }
94        })
95    }
96}