1use crate::context::Context;
3use crate::error::FloxideError;
4use crate::workflow::Workflow;
5use crate::WorkflowCtx;
6use std::fmt::Debug;
7use std::sync::Arc;
8use tokio::sync::Mutex;
9
10#[derive(Debug, Clone)]
14pub struct Source<C, O> {
15 receiver: Arc<Mutex<tokio::sync::mpsc::Receiver<O>>>,
16 _phantom: std::marker::PhantomData<C>,
17}
18
19impl<C: Context, O> Source<C, O> {
20 pub fn new(rx: tokio::sync::mpsc::Receiver<O>) -> Self {
22 Self {
23 receiver: Arc::new(Mutex::new(rx)),
24 _phantom: std::marker::PhantomData,
25 }
26 }
27}
28
29impl<C, O> Source<C, O>
30where
31 C: Context,
32 O: Send + Sync,
33{
34 pub async fn run<W>(&self, wf: &W, ctx: &WorkflowCtx<C>) -> Result<(), FloxideError>
37 where
38 W: Workflow<C, Input = O>,
39 {
40 let mut rx = self.receiver.lock().await;
41 while let Some(item) = rx.recv().await {
42 wf.run(ctx, item).await?;
43 }
44 Ok(())
45 }
46}
47
48pub fn source<C, O>(capacity: usize) -> (tokio::sync::mpsc::Sender<O>, Source<C, O>)
52where
53 C: Context,
54 O: Send + Sync,
55{
56 let (tx, rx) = tokio::sync::mpsc::channel(capacity);
57 (tx, Source::new(rx))
58}