node_flow/flows/
detached.rs

1use std::{fmt::Debug, vec};
2
3use crate::{
4    context::{Fork, SpawnAsync},
5    describe::{Description, Edge, remove_generics_from_name},
6    flows::NodeResult,
7    node::{Node, NodeOutput as NodeOutputStruct},
8};
9
/// A fire-and-forget flow: it launches the wrapped node in the background and
/// hands its own input straight back to the caller.
///
/// Each run clones the wrapped node, [`Fork`]s the context and passes both to a
/// task created through the [`SpawnAsync`] context trait. Whatever that task
/// returns, be it a value or an error, is discarded, so the wrapped node can never
/// change the outcome of the surrounding flow.
///
/// Typical uses are side effects such as logging, analytics or background
/// triggers, where the main execution path must neither wait for nor depend on
/// the result.
///
/// # Type Parameters
/// - `Input`: The type of data **accepted and produced** by this flow.
/// - `Error`: The type of error emitted by this flow.
/// - `Context`: The type of context used during execution.
/// - `NodeType`: The type of the wrapped node (defaults to `()`).
/// - `NodeOutput`: The output type of the wrapped node (defaults to `()`).
/// - `NodeError`: The error type of the wrapped node (defaults to `()`).
///
/// # Examples
/// ```
/// use node_flow::node::{Node, NodeOutput};
/// use node_flow::context::{SpawnAsync, Fork};
/// # use node_flow::context::Task;
/// use node_flow::flows::Detached;
/// use std::future::Future;
///
/// #[derive(Clone)]
/// struct PrintNode;
///
/// struct ExampleCtx;
/// impl Fork for ExampleCtx // ...
/// # { fn fork(&self) -> Self { Self } }
/// impl SpawnAsync for ExampleCtx // ...
/// # {
/// #    fn spawn<F>(fut: F) -> impl Task<F::Output>
/// #     where
/// #         F: Future + Send + 'static,
/// #         F::Output: Send + 'static,
/// #     {
/// #         DummyTask(std::marker::PhantomData)
/// #     }
/// # }
/// # struct DummyTask<T>(std::marker::PhantomData<T>);
/// # impl<T> Future for DummyTask<T> {
/// #     type Output = T;
/// #     fn poll(
/// #         self: std::pin::Pin<&mut Self>,
/// #         _: &mut std::task::Context<'_>
/// #     ) -> std::task::Poll<Self::Output> {
/// #         std::task::Poll::Pending
/// #     }
/// # }
/// # impl<T> Task<T> for DummyTask<T> {
/// #     fn is_finished(&self) -> bool { false }
/// #     fn cancel(self) {}
/// # }
///
/// impl<Ctx: Send> Node<u8, NodeOutput<()>, (), Ctx> for PrintNode {
///     async fn run(&mut self, input: u8, _: &mut Ctx) -> Result<NodeOutput<()>, ()> {
///         println!("Running detached task with input: {input}");
///         Ok(NodeOutput::Ok(()))
///     }
/// }
///
/// # tokio::runtime::Builder::new_current_thread()
/// #     .enable_all()
/// #     .build()
/// #     .unwrap()
/// #     .block_on(async {
/// async fn main() {
///     let mut detached = Detached::<u8, (), _>::new(PrintNode);
///
///     let mut ctx = ExampleCtx;
///     let result = detached.run(7, &mut ctx).await;
///     assert_eq!(result, Ok(NodeOutput::Ok(7)));
/// }
/// # main().await;
/// # });
/// ```
pub struct Detached<Input, Error, Context, NodeType = (), NodeOutput = (), NodeError = ()> {
    // The wrapped node. It is shared behind an `Arc`, so cloning the flow does
    // not clone the node itself.
    node: std::sync::Arc<NodeType>,
    // Marker for the flow-level type parameters; no values of these types are stored.
    #[expect(clippy::type_complexity)]
    _iec: std::marker::PhantomData<fn() -> (Input, Error, Context)>,
    // Marker for the wrapped node's output and error types; nothing is stored.
    _node_oe: std::marker::PhantomData<fn() -> (NodeOutput, NodeError)>,
}
90
91impl<Input, Error, Context> Detached<Input, Error, Context> {
92    /// Creates a new [`Detached`] flow by wrapping the given node.
93    ///
94    /// See also [`Detached`].
95    ///
96    /// # Examples
97    /// ```
98    /// use node_flow::flows::Detached;
99    /// use node_flow::node::{Node, NodeOutput};
100    /// # use node_flow::context::{SpawnAsync, Fork};
101    /// # use node_flow::context::Task;
102    /// # use std::future::Future;
103    ///
104    /// #[derive(Clone)]
105    /// struct BackgroundTask;
106    /// impl<Ctx: Send> Node<(), NodeOutput<()>, (), Ctx> for BackgroundTask // ...
107    /// # {
108    /// #     async fn run(&mut self, _: (), _: &mut Ctx) -> Result<NodeOutput<()>, ()> {
109    /// #         todo!()
110    /// #     }
111    /// # }
112    /// # struct Ctx;
113    /// # impl Fork for Ctx { fn fork(&self) -> Self { Self } }
114    /// # impl SpawnAsync for Ctx {
115    /// #    fn spawn<F>(fut: F) -> impl Task<F::Output>
116    /// #     where
117    /// #         F: Future + Send + 'static,
118    /// #         F::Output: Send + 'static,
119    /// #     {
120    /// #         DummyTask(std::marker::PhantomData)
121    /// #     }
122    /// # }
123    /// # struct DummyTask<T>(std::marker::PhantomData<T>);
124    /// # impl<T> Future for DummyTask<T> // ...
125    /// # {
126    /// #     type Output = T;
127    /// #     fn poll(
128    /// #         self: std::pin::Pin<&mut Self>,
129    /// #         _: &mut std::task::Context<'_>
130    /// #     ) -> std::task::Poll<Self::Output> {
131    /// #         std::task::Poll::Pending
132    /// #     }
133    /// # }
134    /// # impl<T> Task<T> for DummyTask<T> {
135    /// #     fn is_finished(&self) -> bool { false }
136    /// #     fn cancel(self) {}
137    /// # }
138    ///
139    /// let detached = Detached::<(), (), Ctx>::new(BackgroundTask);
140    /// ```
141    #[expect(clippy::type_repetition_in_bounds)]
142    pub fn new<NodeType, NodeOutput, NodeError>(
143        node: NodeType,
144    ) -> Detached<Input, Error, Context, NodeType, NodeOutput, NodeError>
145    where
146        NodeType: Node<Input, NodeOutput, NodeError, Context>,
147        // Trait bounds for better and nicer errors
148        NodeType: Clone + Send,
149        Input: Clone + Send,
150    {
151        Detached {
152            _iec: std::marker::PhantomData,
153            _node_oe: std::marker::PhantomData,
154            node: std::sync::Arc::new(node),
155        }
156    }
157}
158
159impl<Input, Error, Context, NodeType, NodeOutput, NodeError> Debug
160    for Detached<Input, Error, Context, NodeType, NodeOutput, NodeError>
161where
162    NodeType: Debug,
163{
164    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165        f.debug_struct("Detached")
166            .field("node", &self.node)
167            .finish_non_exhaustive()
168    }
169}
170
171impl<Input, Error, Context, NodeType, NodeOutput, NodeError> Clone
172    for Detached<Input, Error, Context, NodeType, NodeOutput, NodeError>
173{
174    fn clone(&self) -> Self {
175        Self {
176            _iec: std::marker::PhantomData,
177            _node_oe: std::marker::PhantomData,
178            node: self.node.clone(),
179        }
180    }
181}
182
183impl<Input, Error, Context, NodeType, NodeOutput, NodeError>
184    Node<Input, NodeOutputStruct<Input>, Error, Context>
185    for Detached<Input, Error, Context, NodeType, NodeOutput, NodeError>
186where
187    NodeType: Node<Input, NodeOutput, NodeError, Context> + Clone + Send + 'static,
188    Context: SpawnAsync + Fork + Send + 'static,
189    Input: Clone + Send + 'static,
190{
191    fn run(
192        &mut self,
193        input: Input,
194        context: &mut Context,
195    ) -> impl Future<Output = NodeResult<Input, Error>> + Send {
196        let _task = Context::spawn({
197            let mut node = self.node.as_ref().clone();
198            let input = input.clone();
199            let mut context = context.fork();
200            async move {
201                let _ = node.run(input, &mut context).await;
202            }
203        });
204        async { Ok(NodeOutputStruct::Ok(input)) }
205    }
206
207    fn describe(&self) -> Description {
208        Description::new_flow(
209            self,
210            vec![self.node.describe()],
211            vec![Edge::passthrough(), Edge::flow_to_node(0)],
212        )
213        .modify_name(remove_generics_from_name)
214    }
215}
216
#[cfg(test)]
mod test {
    use std::time::{Duration, Instant};

    use super::Detached;
    use crate::{
        context::{Fork, test::TokioSpawner},
        node::{Node, NodeOutput},
    };

    // The flow forks its context before spawning, so the test spawner needs `Fork`.
    impl Fork for TokioSpawner {
        fn fork(&self) -> Self {
            Self
        }
    }

    /// Node that sleeps for 20 ms, signals over the channel and then fails.
    #[derive(Clone)]
    pub struct TestNode(tokio::sync::mpsc::Sender<()>);

    impl<I, C> Node<I, (), (), C> for TestNode
    where
        I: Send,
        C: Send,
    {
        async fn run(&mut self, _input: I, _context: &mut C) -> Result<(), ()> {
            let pause = Duration::from_millis(20);
            tokio::time::sleep(pause).await;
            self.0.send(()).await.unwrap();
            // The error must not reach the caller of the detached flow.
            Err(())
        }
    }

    /// The flow must return its input at once while the node finishes later.
    #[tokio::test]
    async fn test_flow() {
        let (sender, mut receiver) = tokio::sync::mpsc::channel(5);
        let mut flow = Detached::<_, (), _>::new(TestNode(sender));
        let mut ctx = TokioSpawner;

        // The 50 ms timeout is created before the flow runs.
        let sleep = tokio::time::sleep(Duration::from_millis(50));
        tokio::pin!(sleep);

        let start = Instant::now();
        let res = flow.run(3u8, &mut ctx).await;
        let flow_took = start.elapsed();

        // Wait for the background node to signal, or fail on timeout.
        tokio::select! {
            _ = receiver.recv() => {}
            _ = &mut sleep => {
                panic!("timeout");
            }
        };
        let node_took = start.elapsed();

        println!("flow_took: {flow_took:?}");
        println!("node_took: {node_took:?}");
        assert_eq!(res, Ok(NodeOutput::Ok(3)));
        assert!(flow_took.as_millis() < 1);
        assert!(node_took.as_millis() > 15);
    }
}