// node_flow/flows/detached.rs

1use std::{fmt::Debug, vec};
2
3use crate::{
4    context::{Fork, SpawnAsync},
5    describe::{Description, Edge, remove_generics_from_name},
6    flows::NodeResult,
7    node::{Node, NodeOutput as NodeOutputStruct},
8};
9
/// `Detached` runs a wrapped node **in the background**, independently of the main flow.
///
/// The wrapped node is cloned and executed in a **detached task** spawned through the
/// [`SpawnAsync`] context trait. Whatever that node returns, be it an output or an error,
/// is discarded.
///
/// The flow itself hands its input back unchanged, which makes it a good fit for
/// side-effect operations such as logging, analytics, or background triggers that must
/// neither block nor influence the main execution path.
///
/// # Type Parameters
/// - `Input`: The type of data **accepted and produced** by this flow.
/// - `Error`: The type of error emitted by this flow.
/// - `Context`: The type of context used during execution.
/// - `NodeType`: The type of the wrapped node.
/// - `NodeOutput`: The output type of the wrapped node.
/// - `NodeError`: The error type of the wrapped node.
///
/// # Examples
/// ```
/// use node_flow::node::{Node, NodeOutput};
/// use node_flow::context::{SpawnAsync, Fork};
/// # use node_flow::context::Task;
/// use node_flow::flows::Detached;
/// use std::future::Future;
///
/// #[derive(Clone)]
/// struct PrintNode;
///
/// struct ExampleCtx;
/// impl Fork for ExampleCtx // ...
/// # { fn fork(&self) -> Self { Self } }
/// impl SpawnAsync for ExampleCtx // ...
/// # {
/// #    fn spawn<F>(fut: F) -> impl Task<F::Output>
/// #     where
/// #         F: Future + Send + 'static,
/// #         F::Output: Send + 'static,
/// #     {
/// #         DummyTask(std::marker::PhantomData)
/// #     }
/// # }
/// # struct DummyTask<T>(std::marker::PhantomData<T>);
/// # impl<T> Future for DummyTask<T> {
/// #     type Output = T;
/// #     fn poll(
/// #         self: std::pin::Pin<&mut Self>,
/// #         _: &mut std::task::Context<'_>
/// #     ) -> std::task::Poll<Self::Output> {
/// #         std::task::Poll::Pending
/// #     }
/// # }
/// # impl<T> Task<T> for DummyTask<T> {
/// #     fn is_finished(&self) -> bool { false }
/// #     fn cancel(self) {}
/// # }
///
/// impl<Ctx: Send> Node<u8, NodeOutput<()>, (), Ctx> for PrintNode {
///     async fn run(&mut self, input: u8, _: &mut Ctx) -> Result<NodeOutput<()>, ()> {
///         println!("Running detached task with input: {input}");
///         Ok(NodeOutput::Ok(()))
///     }
/// }
///
/// # tokio::runtime::Builder::new_current_thread()
/// #     .enable_all()
/// #     .build()
/// #     .unwrap()
/// #     .block_on(async {
/// async fn main() {
///     let mut detached = Detached::<u8, (), _>::new(PrintNode);
///
///     let mut ctx = ExampleCtx;
///     let result = detached.run(7, &mut ctx).await;
///     assert_eq!(result, Ok(NodeOutput::Ok(7)));
/// }
/// # main().await;
/// # });
/// ```
pub struct Detached<Input, Error, Context, NodeType = (), NodeOutput = (), NodeError = ()> {
    /// Zero-sized marker for the `Input`, `Error` and `Context` type parameters,
    /// none of which is stored in a field.
    #[expect(clippy::type_complexity)]
    _iec: std::marker::PhantomData<fn() -> (Input, Error, Context)>,
    /// Zero-sized marker for the wrapped node's `NodeOutput` and `NodeError` types.
    _node_oe: std::marker::PhantomData<fn() -> (NodeOutput, NodeError)>,
    /// The wrapped node.
    node: NodeType,
}
90
91impl<Input, Error, Context> Detached<Input, Error, Context> {
92    /// Creates a new [`Detached`] flow by wrapping the given node.
93    ///
94    /// See also [`Detached`].
95    ///
96    /// # Examples
97    /// ```
98    /// use node_flow::flows::Detached;
99    /// use node_flow::node::{Node, NodeOutput};
100    /// # use node_flow::context::{SpawnAsync, Fork};
101    /// # use node_flow::context::Task;
102    /// # use std::future::Future;
103    ///
104    /// #[derive(Clone)]
105    /// struct BackgroundTask;
106    /// impl<Ctx: Send> Node<(), NodeOutput<()>, (), Ctx> for BackgroundTask // ...
107    /// # {
108    /// #     async fn run(&mut self, _: (), _: &mut Ctx) -> Result<NodeOutput<()>, ()> {
109    /// #         todo!()
110    /// #     }
111    /// # }
112    /// # struct Ctx;
113    /// # impl Fork for Ctx { fn fork(&self) -> Self { Self } }
114    /// # impl SpawnAsync for Ctx {
115    /// #    fn spawn<F>(fut: F) -> impl Task<F::Output>
116    /// #     where
117    /// #         F: Future + Send + 'static,
118    /// #         F::Output: Send + 'static,
119    /// #     {
120    /// #         DummyTask(std::marker::PhantomData)
121    /// #     }
122    /// # }
123    /// # struct DummyTask<T>(std::marker::PhantomData<T>);
124    /// # impl<T> Future for DummyTask<T> // ...
125    /// # {
126    /// #     type Output = T;
127    /// #     fn poll(
128    /// #         self: std::pin::Pin<&mut Self>,
129    /// #         _: &mut std::task::Context<'_>
130    /// #     ) -> std::task::Poll<Self::Output> {
131    /// #         std::task::Poll::Pending
132    /// #     }
133    /// # }
134    /// # impl<T> Task<T> for DummyTask<T> {
135    /// #     fn is_finished(&self) -> bool { false }
136    /// #     fn cancel(self) {}
137    /// # }
138    ///
139    /// let detached = Detached::<(), (), Ctx>::new(BackgroundTask);
140    /// ```
141    #[expect(clippy::type_repetition_in_bounds)]
142    pub fn new<NodeType, NodeOutput, NodeError>(
143        node: NodeType,
144    ) -> Detached<Input, Error, Context, NodeType, NodeOutput, NodeError>
145    where
146        NodeType: Node<Input, NodeOutput, NodeError, Context>,
147        // Trait bounds for better and nicer errors
148        NodeType: Clone + Send,
149        Input: Clone + Send,
150    {
151        Detached {
152            _iec: std::marker::PhantomData,
153            _node_oe: std::marker::PhantomData,
154            node,
155        }
156    }
157}
158
159impl<Input, Error, Context, NodeType, NodeOutput, NodeError> Debug
160    for Detached<Input, Error, Context, NodeType, NodeOutput, NodeError>
161where
162    NodeType: Debug,
163{
164    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165        f.debug_struct("Detached")
166            .field("node", &self.node)
167            .finish_non_exhaustive()
168    }
169}
170
171impl<Input, Error, Context, NodeType, NodeOutput, NodeError> Clone
172    for Detached<Input, Error, Context, NodeType, NodeOutput, NodeError>
173where
174    NodeType: Clone,
175{
176    fn clone(&self) -> Self {
177        Self {
178            _iec: std::marker::PhantomData,
179            _node_oe: std::marker::PhantomData,
180            node: self.node.clone(),
181        }
182    }
183}
184
185impl<Input, Error, Context, NodeType, NodeOutput, NodeError>
186    Node<Input, NodeOutputStruct<Input>, Error, Context>
187    for Detached<Input, Error, Context, NodeType, NodeOutput, NodeError>
188where
189    NodeType: Node<Input, NodeOutput, NodeError, Context> + Clone + Send + 'static,
190    Context: SpawnAsync + Fork + Send + 'static,
191    Input: Clone + Send + 'static,
192{
193    fn run(
194        &mut self,
195        input: Input,
196        context: &mut Context,
197    ) -> impl Future<Output = NodeResult<Input, Error>> + Send {
198        let _task = Context::spawn({
199            let mut node = self.node.clone();
200            let input = input.clone();
201            let mut context = context.fork();
202            async move {
203                let _ = node.run(input, &mut context).await;
204            }
205        });
206        async { Ok(NodeOutputStruct::Ok(input)) }
207    }
208
209    fn describe(&self) -> Description {
210        Description::new_flow(
211            self,
212            vec![self.node.describe()],
213            vec![Edge::passthrough(), Edge::flow_to_node(0)],
214        )
215        .modify_name(remove_generics_from_name)
216    }
217}
218
#[cfg(test)]
mod test {
    use std::time::{Duration, Instant};

    use super::Detached;
    use crate::{
        context::{Fork, test::TokioSpawner},
        node::{Node, NodeOutput},
    };

    // `Detached` requires its context to implement `Fork`.
    impl Fork for TokioSpawner {
        fn fork(&self) -> Self {
            TokioSpawner
        }
    }

    /// Node that sleeps for 20 ms, signals over its channel and then returns an error.
    #[derive(Clone)]
    pub struct TestNode(tokio::sync::mpsc::Sender<()>);

    impl<I, C> Node<I, (), (), C> for TestNode
    where
        I: Send,
        C: Send,
    {
        async fn run(&mut self, _input: I, _context: &mut C) -> Result<(), ()> {
            let signal = &self.0;
            tokio::time::sleep(Duration::from_millis(20)).await;
            signal.send(()).await.unwrap();
            Err(())
        }
    }

    /// The flow must return its input right away while the node finishes later
    /// in the background, and the node's error must not surface.
    #[tokio::test]
    async fn test_flow() {
        let (tx, mut rx) = tokio::sync::mpsc::channel(5);
        let mut ctx = TokioSpawner;
        let mut flow = Detached::<_, (), _>::new(TestNode(tx));

        // The timeout sleep is created before the flow is started.
        let deadline = tokio::time::sleep(Duration::from_millis(50));
        tokio::pin!(deadline);
        let start = Instant::now();
        let res = flow.run(3u8, &mut ctx).await;
        let flow_took = start.elapsed();

        // Wait for the background node to report in, or give up at the deadline.
        tokio::select! {
            _ = rx.recv() => {}
            _ = &mut deadline => {
                panic!("timeout");
            }
        };
        let node_took = start.elapsed();

        println!("flow_took: {flow_took:?}");
        println!("node_took: {node_took:?}");
        assert_eq!(res, Ok(NodeOutput::Ok(3)));
        assert!(flow_took.as_millis() < 1);
        assert!(node_took.as_millis() > 15);
    }
}