Skip to main content

synaptic_runnables/
sequence.rs

1use std::ops::BitOr;
2
3use async_trait::async_trait;
4use synaptic_core::{RunnableConfig, SynapticError};
5
6use crate::runnable::{BoxRunnable, Runnable, RunnableOutputStream};
7
8/// Chains two runnables: output of `first` feeds into `second`.
9/// Created automatically via the `|` operator on `BoxRunnable`.
10pub struct RunnableSequence<I, M, O>
11where
12    I: Send + 'static,
13    M: Send + 'static,
14    O: Send + 'static,
15{
16    pub(crate) first: BoxRunnable<I, M>,
17    pub(crate) second: BoxRunnable<M, O>,
18}
19
20#[async_trait]
21impl<I, M, O> Runnable<I, O> for RunnableSequence<I, M, O>
22where
23    I: Send + 'static,
24    M: Send + 'static,
25    O: Send + 'static,
26{
27    async fn invoke(&self, input: I, config: &RunnableConfig) -> Result<O, SynapticError> {
28        let mid = self.first.invoke(input, config).await?;
29        self.second.invoke(mid, config).await
30    }
31
32    /// Stream: invoke first step fully, then stream the second step.
33    /// This matches LangChain behavior where only the final component truly streams.
34    fn stream<'a>(&'a self, input: I, config: &'a RunnableConfig) -> RunnableOutputStream<'a, O>
35    where
36        I: 'a,
37    {
38        Box::pin(async_stream::stream! {
39            match self.first.invoke(input, config).await {
40                Ok(mid) => {
41                    use futures::StreamExt;
42                    let mut s = std::pin::pin!(self.second.stream(mid, config));
43                    while let Some(item) = s.next().await {
44                        yield item;
45                    }
46                }
47                Err(e) => yield Err(e),
48            }
49        })
50    }
51}
52
53/// Pipe operator: `a | b` creates a `RunnableSequence` that runs `a` then `b`.
54impl<I, M, O> BitOr<BoxRunnable<M, O>> for BoxRunnable<I, M>
55where
56    I: Send + 'static,
57    M: Send + 'static,
58    O: Send + 'static,
59{
60    type Output = BoxRunnable<I, O>;
61
62    fn bitor(self, rhs: BoxRunnable<M, O>) -> BoxRunnable<I, O> {
63        BoxRunnable::new(RunnableSequence {
64            first: self,
65            second: rhs,
66        })
67    }
68}