synaptic_runnables/
sequence.rs1use std::ops::BitOr;
2
3use async_trait::async_trait;
4use synaptic_core::{RunnableConfig, SynapticError};
5
6use crate::runnable::{BoxRunnable, Runnable, RunnableOutputStream};
7
8pub struct RunnableSequence<I, M, O>
11where
12 I: Send + 'static,
13 M: Send + 'static,
14 O: Send + 'static,
15{
16 pub(crate) first: BoxRunnable<I, M>,
17 pub(crate) second: BoxRunnable<M, O>,
18}
19
20#[async_trait]
21impl<I, M, O> Runnable<I, O> for RunnableSequence<I, M, O>
22where
23 I: Send + 'static,
24 M: Send + 'static,
25 O: Send + 'static,
26{
27 async fn invoke(&self, input: I, config: &RunnableConfig) -> Result<O, SynapticError> {
28 let mid = self.first.invoke(input, config).await?;
29 self.second.invoke(mid, config).await
30 }
31
32 fn stream<'a>(&'a self, input: I, config: &'a RunnableConfig) -> RunnableOutputStream<'a, O>
35 where
36 I: 'a,
37 {
38 Box::pin(async_stream::stream! {
39 match self.first.invoke(input, config).await {
40 Ok(mid) => {
41 use futures::StreamExt;
42 let mut s = std::pin::pin!(self.second.stream(mid, config));
43 while let Some(item) = s.next().await {
44 yield item;
45 }
46 }
47 Err(e) => yield Err(e),
48 }
49 })
50 }
51}
52
53impl<I, M, O> BitOr<BoxRunnable<M, O>> for BoxRunnable<I, M>
55where
56 I: Send + 'static,
57 M: Send + 'static,
58 O: Send + 'static,
59{
60 type Output = BoxRunnable<I, O>;
61
62 fn bitor(self, rhs: BoxRunnable<M, O>) -> BoxRunnable<I, O> {
63 BoxRunnable::new(RunnableSequence {
64 first: self,
65 second: rhs,
66 })
67 }
68}