Skip to main content

synaptic_runnables/
generator.rs

1use std::pin::Pin;
2
3use async_trait::async_trait;
4use futures::{Stream, StreamExt};
5use synaptic_core::{RunnableConfig, SynapseError};
6
7use crate::runnable::{Runnable, RunnableOutputStream};
8
9type GeneratorFn<I, O> =
10    dyn Fn(I) -> Pin<Box<dyn Stream<Item = Result<O, SynapseError>> + Send>> + Send + Sync;
11
12/// A runnable built from a generator function that yields streaming output.
13///
14/// The generator function receives an input and returns a `Stream` of results.
15/// `invoke()` collects the entire stream into the final item, while `stream()`
16/// returns the generator's output directly for true streaming.
17///
18/// ```ignore
19/// let gen = RunnableGenerator::new(|input: String| {
20///     async_stream::stream! {
21///         for ch in input.chars() {
22///             yield Ok(ch.to_string());
23///         }
24///     }
25/// });
26/// // stream() yields individual characters; invoke() collects them
27/// ```
28pub struct RunnableGenerator<I: Send + 'static, O: Send + 'static> {
29    func: Box<GeneratorFn<I, O>>,
30}
31
32impl<I: Send + 'static, O: Send + 'static> RunnableGenerator<I, O> {
33    pub fn new<F, S>(func: F) -> Self
34    where
35        F: Fn(I) -> S + Send + Sync + 'static,
36        S: Stream<Item = Result<O, SynapseError>> + Send + 'static,
37    {
38        Self {
39            func: Box::new(move |input| Box::pin(func(input))),
40        }
41    }
42}
43
44#[async_trait]
45impl<I: Send + 'static, O: Send + 'static> Runnable<I, Vec<O>> for RunnableGenerator<I, O> {
46    async fn invoke(&self, input: I, _config: &RunnableConfig) -> Result<Vec<O>, SynapseError> {
47        let stream = (self.func)(input);
48        futures::pin_mut!(stream);
49        let mut results = Vec::new();
50        while let Some(item) = stream.next().await {
51            results.push(item?);
52        }
53        Ok(results)
54    }
55
56    fn stream<'a>(
57        &'a self,
58        input: I,
59        _config: &'a RunnableConfig,
60    ) -> RunnableOutputStream<'a, Vec<O>>
61    where
62        I: 'a,
63    {
64        Box::pin(async_stream::stream! {
65            let stream = (self.func)(input);
66            futures::pin_mut!(stream);
67            while let Some(item) = stream.next().await {
68                match item {
69                    Ok(val) => yield Ok(vec![val]),
70                    Err(e) => yield Err(e),
71                }
72            }
73        })
74    }
75}