node_flow/flows/detached.rs
1use std::{fmt::Debug, vec};
2
3use crate::{
4 context::{Fork, SpawnAsync},
5 describe::{Description, Edge, remove_generics_from_name},
6 flows::NodeResult,
7 node::{Node, NodeOutput as NodeOutputStruct},
8};
9
10/// `Detached` executes a node **asynchronously and independently** of the main flow.
11///
12/// The node is executed in a **detached task** using the [`SpawnAsync`]
13/// context trait and any result or error from the detached node is ignored.
14///
15/// This flow is useful for side-effect operations such as logging, analytics, or background
16/// triggers that should not block or influence the main execution path.
17///
18/// # Type Parameters
19/// - `Input`: The type of data **accepted and produced** by this flow.
20/// - `Error`: The type of error emitted by this flow.
21/// - `Context`: The type of context used during execution.
22///
23/// # Examples
24/// ```
25/// use node_flow::node::{Node, NodeOutput};
26/// use node_flow::context::{SpawnAsync, Fork};
27/// # use node_flow::context::Task;
28/// use node_flow::flows::Detached;
29/// use std::future::Future;
30///
31/// #[derive(Clone)]
32/// struct PrintNode;
33///
34/// struct ExampleCtx;
35/// impl Fork for ExampleCtx // ...
36/// # { fn fork(&self) -> Self { Self } }
37/// impl SpawnAsync for ExampleCtx // ...
38/// # {
39/// # fn spawn<F>(fut: F) -> impl Task<F::Output>
40/// # where
41/// # F: Future + Send + 'static,
42/// # F::Output: Send + 'static,
43/// # {
44/// # DummyTask(std::marker::PhantomData)
45/// # }
46/// # }
47/// # struct DummyTask<T>(std::marker::PhantomData<T>);
48/// # impl<T> Future for DummyTask<T> {
49/// # type Output = T;
50/// # fn poll(
51/// # self: std::pin::Pin<&mut Self>,
52/// # _: &mut std::task::Context<'_>
53/// # ) -> std::task::Poll<Self::Output> {
54/// # std::task::Poll::Pending
55/// # }
56/// # }
57/// # impl<T> Task<T> for DummyTask<T> {
58/// # fn is_finished(&self) -> bool { false }
59/// # fn cancel(self) {}
60/// # }
61///
62/// impl<Ctx: Send> Node<u8, NodeOutput<()>, (), Ctx> for PrintNode {
63/// async fn run(&mut self, input: u8, _: &mut Ctx) -> Result<NodeOutput<()>, ()> {
64/// println!("Running detached task with input: {input}");
65/// Ok(NodeOutput::Ok(()))
66/// }
67/// }
68///
69/// # tokio::runtime::Builder::new_current_thread()
70/// # .enable_all()
71/// # .build()
72/// # .unwrap()
73/// # .block_on(async {
74/// async fn main() {
75/// let mut detached = Detached::<u8, (), _>::new(PrintNode);
76///
77/// let mut ctx = ExampleCtx;
78/// let result = detached.run(7, &mut ctx).await;
79/// assert_eq!(result, Ok(NodeOutput::Ok(7)));
80/// }
81/// # main().await;
82/// # });
83/// ```
84pub struct Detached<Input, Error, Context, NodeType = (), NodeOutput = (), NodeError = ()> {
85 #[expect(clippy::type_complexity)]
86 _iec: std::marker::PhantomData<fn() -> (Input, Error, Context)>,
87 _node_oe: std::marker::PhantomData<fn() -> (NodeOutput, NodeError)>,
88 node: std::sync::Arc<NodeType>,
89}
90
91impl<Input, Error, Context> Detached<Input, Error, Context> {
92 /// Creates a new [`Detached`] flow by wrapping the given node.
93 ///
94 /// See also [`Detached`].
95 ///
96 /// # Examples
97 /// ```
98 /// use node_flow::flows::Detached;
99 /// use node_flow::node::{Node, NodeOutput};
100 /// # use node_flow::context::{SpawnAsync, Fork};
101 /// # use node_flow::context::Task;
102 /// # use std::future::Future;
103 ///
104 /// #[derive(Clone)]
105 /// struct BackgroundTask;
106 /// impl<Ctx: Send> Node<(), NodeOutput<()>, (), Ctx> for BackgroundTask // ...
107 /// # {
108 /// # async fn run(&mut self, _: (), _: &mut Ctx) -> Result<NodeOutput<()>, ()> {
109 /// # todo!()
110 /// # }
111 /// # }
112 /// # struct Ctx;
113 /// # impl Fork for Ctx { fn fork(&self) -> Self { Self } }
114 /// # impl SpawnAsync for Ctx {
115 /// # fn spawn<F>(fut: F) -> impl Task<F::Output>
116 /// # where
117 /// # F: Future + Send + 'static,
118 /// # F::Output: Send + 'static,
119 /// # {
120 /// # DummyTask(std::marker::PhantomData)
121 /// # }
122 /// # }
123 /// # struct DummyTask<T>(std::marker::PhantomData<T>);
124 /// # impl<T> Future for DummyTask<T> // ...
125 /// # {
126 /// # type Output = T;
127 /// # fn poll(
128 /// # self: std::pin::Pin<&mut Self>,
129 /// # _: &mut std::task::Context<'_>
130 /// # ) -> std::task::Poll<Self::Output> {
131 /// # std::task::Poll::Pending
132 /// # }
133 /// # }
134 /// # impl<T> Task<T> for DummyTask<T> {
135 /// # fn is_finished(&self) -> bool { false }
136 /// # fn cancel(self) {}
137 /// # }
138 ///
139 /// let detached = Detached::<(), (), Ctx>::new(BackgroundTask);
140 /// ```
141 #[expect(clippy::type_repetition_in_bounds)]
142 pub fn new<NodeType, NodeOutput, NodeError>(
143 node: NodeType,
144 ) -> Detached<Input, Error, Context, NodeType, NodeOutput, NodeError>
145 where
146 NodeType: Node<Input, NodeOutput, NodeError, Context>,
147 // Trait bounds for better and nicer errors
148 NodeType: Clone + Send,
149 Input: Clone + Send,
150 {
151 Detached {
152 _iec: std::marker::PhantomData,
153 _node_oe: std::marker::PhantomData,
154 node: std::sync::Arc::new(node),
155 }
156 }
157}
158
159impl<Input, Error, Context, NodeType, NodeOutput, NodeError> Debug
160 for Detached<Input, Error, Context, NodeType, NodeOutput, NodeError>
161where
162 NodeType: Debug,
163{
164 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165 f.debug_struct("Detached")
166 .field("node", &self.node)
167 .finish_non_exhaustive()
168 }
169}
170
171impl<Input, Error, Context, NodeType, NodeOutput, NodeError> Clone
172 for Detached<Input, Error, Context, NodeType, NodeOutput, NodeError>
173{
174 fn clone(&self) -> Self {
175 Self {
176 _iec: std::marker::PhantomData,
177 _node_oe: std::marker::PhantomData,
178 node: self.node.clone(),
179 }
180 }
181}
182
183impl<Input, Error, Context, NodeType, NodeOutput, NodeError>
184 Node<Input, NodeOutputStruct<Input>, Error, Context>
185 for Detached<Input, Error, Context, NodeType, NodeOutput, NodeError>
186where
187 NodeType: Node<Input, NodeOutput, NodeError, Context> + Clone + Send + 'static,
188 Context: SpawnAsync + Fork + Send + 'static,
189 Input: Clone + Send + 'static,
190{
191 fn run(
192 &mut self,
193 input: Input,
194 context: &mut Context,
195 ) -> impl Future<Output = NodeResult<Input, Error>> + Send {
196 let _task = Context::spawn({
197 let mut node = self.node.as_ref().clone();
198 let input = input.clone();
199 let mut context = context.fork();
200 async move {
201 let _ = node.run(input, &mut context).await;
202 }
203 });
204 async { Ok(NodeOutputStruct::Ok(input)) }
205 }
206
207 fn describe(&self) -> Description {
208 Description::new_flow(
209 self,
210 vec![self.node.describe()],
211 vec![Edge::passthrough(), Edge::flow_to_node(0)],
212 )
213 .modify_name(remove_generics_from_name)
214 }
215}
216
217#[cfg(test)]
218mod test {
219 use std::time::{Duration, Instant};
220
221 use super::Detached;
222 use crate::{
223 context::{Fork, test::TokioSpawner},
224 node::{Node, NodeOutput},
225 };
226
227 impl Fork for TokioSpawner {
228 fn fork(&self) -> Self {
229 Self
230 }
231 }
232
233 #[derive(Clone)]
234 pub struct TestNode(tokio::sync::mpsc::Sender<()>);
235
236 impl<I, C> Node<I, (), (), C> for TestNode
237 where
238 I: Send,
239 C: Send,
240 {
241 async fn run(&mut self, _input: I, _context: &mut C) -> Result<(), ()> {
242 tokio::time::sleep(Duration::from_millis(20)).await;
243 self.0.send(()).await.unwrap();
244 Err(())
245 }
246 }
247
248 #[tokio::test]
249 async fn test_flow() {
250 let (sender, mut receiver) = tokio::sync::mpsc::channel(5);
251 let mut ctx = TokioSpawner;
252 let mut flow = Detached::<_, (), _>::new(TestNode(sender));
253
254 let sleep = tokio::time::sleep(Duration::from_millis(50));
255 tokio::pin!(sleep);
256 let start = Instant::now();
257 let res = flow.run(3u8, &mut ctx).await;
258 let flow_end = Instant::now();
259
260 tokio::select! {
261 _ = receiver.recv() => {}
262 _ = &mut sleep => {
263 panic!("timeout");
264 }
265 };
266 let end = Instant::now();
267
268 let flow_took = flow_end.duration_since(start);
269 let node_took = end.duration_since(start);
270 println!("flow_took: {flow_took:?}");
271 println!("node_took: {node_took:?}");
272 assert_eq!(res, Ok(NodeOutput::Ok(3)));
273 assert!(flow_took.as_millis() < 1);
274 assert!(node_took.as_millis() > 15);
275 }
276}