synaptic_runnables/
generator.rs1use std::pin::Pin;
2
3use async_trait::async_trait;
4use futures::{Stream, StreamExt};
5use synaptic_core::{RunnableConfig, SynapseError};
6
7use crate::runnable::{Runnable, RunnableOutputStream};
8
9type GeneratorFn<I, O> =
10 dyn Fn(I) -> Pin<Box<dyn Stream<Item = Result<O, SynapseError>> + Send>> + Send + Sync;
11
12pub struct RunnableGenerator<I: Send + 'static, O: Send + 'static> {
29 func: Box<GeneratorFn<I, O>>,
30}
31
32impl<I: Send + 'static, O: Send + 'static> RunnableGenerator<I, O> {
33 pub fn new<F, S>(func: F) -> Self
34 where
35 F: Fn(I) -> S + Send + Sync + 'static,
36 S: Stream<Item = Result<O, SynapseError>> + Send + 'static,
37 {
38 Self {
39 func: Box::new(move |input| Box::pin(func(input))),
40 }
41 }
42}
43
44#[async_trait]
45impl<I: Send + 'static, O: Send + 'static> Runnable<I, Vec<O>> for RunnableGenerator<I, O> {
46 async fn invoke(&self, input: I, _config: &RunnableConfig) -> Result<Vec<O>, SynapseError> {
47 let stream = (self.func)(input);
48 futures::pin_mut!(stream);
49 let mut results = Vec::new();
50 while let Some(item) = stream.next().await {
51 results.push(item?);
52 }
53 Ok(results)
54 }
55
56 fn stream<'a>(
57 &'a self,
58 input: I,
59 _config: &'a RunnableConfig,
60 ) -> RunnableOutputStream<'a, Vec<O>>
61 where
62 I: 'a,
63 {
64 Box::pin(async_stream::stream! {
65 let stream = (self.func)(input);
66 futures::pin_mut!(stream);
67 while let Some(item) = stream.next().await {
68 match item {
69 Ok(val) => yield Ok(vec![val]),
70 Err(e) => yield Err(e),
71 }
72 }
73 })
74 }
75}