floxide_core/
source.rs

1//! Abstraction for value-producing (source) nodes: nodes with Input=() that emit a stream of outputs.
2use crate::context::Context;
3use crate::error::FloxideError;
4use crate::workflow::Workflow;
5use crate::WorkflowCtx;
6use std::fmt::Debug;
7use std::sync::Arc;
8use tokio::sync::Mutex;
9
10// -----------------------------------------------------------------------------
11/// A channel source: external code can send values in, and this source
12/// will drive a workflow for each received item until the channel closes.
13#[derive(Debug, Clone)]
14pub struct Source<C, O> {
15    receiver: Arc<Mutex<tokio::sync::mpsc::Receiver<O>>>,
16    _phantom: std::marker::PhantomData<C>,
17}
18
19impl<C: Context, O> Source<C, O> {
20    /// Wrap an existing receiver into a Source
21    pub fn new(rx: tokio::sync::mpsc::Receiver<O>) -> Self {
22        Self {
23            receiver: Arc::new(Mutex::new(rx)),
24            _phantom: std::marker::PhantomData,
25        }
26    }
27}
28
29impl<C, O> Source<C, O>
30where
31    C: Context,
32    O: Send + Sync,
33{
34    /// Drive the provided workflow by pulling items from the channel and
35    /// invoking `wf.run(ctx, item)` for each until the channel closes.
36    pub async fn run<W>(&self, wf: &W, ctx: &WorkflowCtx<C>) -> Result<(), FloxideError>
37    where
38        W: Workflow<C, Input = O>,
39    {
40        let mut rx = self.receiver.lock().await;
41        while let Some(item) = rx.recv().await {
42            wf.run(ctx, item).await?;
43        }
44        Ok(())
45    }
46}
47
48/// Create a channel-backed source node and its sender handle.
49///
50/// `capacity` sets the mpsc buffer size. Returns `(sender, source_node)`.
51pub fn source<C, O>(capacity: usize) -> (tokio::sync::mpsc::Sender<O>, Source<C, O>)
52where
53    C: Context,
54    O: Send + Sync,
55{
56    let (tx, rx) = tokio::sync::mpsc::channel(capacity);
57    (tx, Source::new(rx))
58}