// node_flow/context/traits.rs

/// The `Fork` trait is used for creating a new instance of a context from an existing one.
///
/// `Fork` is used in a flow where a context must be sent into branches.
/// For example, when spawning parallel tasks that each require their own context.
///
/// # Examples
/// ```
/// use node_flow::context::Fork;
///
/// #[derive(Clone)]
/// struct ExampleContext {
///     id: usize,
/// }
///
/// impl Fork for ExampleContext {
///     fn fork(&self) -> Self {
///         Self { id: self.id + 1 }
///     }
/// }
///
/// let ctx = ExampleContext { id: 0 };
/// let forked = ctx.fork();
/// assert_eq!(forked.id, 1);
/// ```
pub trait Fork {
    /// Creates a forked instance of the implementor.
    ///
    /// The original is only borrowed, so it stays usable after the call.
    /// How the forked instance relates to `self` (an identical copy, a derived
    /// child as in the example above, ...) is up to the implementor.
    #[must_use]
    fn fork(&self) -> Self;
}

/// The `Update` trait is used for updating the state of a context from another.
///
/// `Update` allows incremental merging or synchronization between two instances of the same type.
///
/// # Examples
/// ```
/// use node_flow::context::Update;
///
/// struct Stats {
///     count: usize,
/// }
///
/// impl Update for Stats {
///     fn update_from(&mut self, other: Self) {
///         self.count += other.count;
///     }
/// }
///
/// let mut a = Stats { count: 5 };
/// let b = Stats { count: 3 };
/// a.update_from(b);
/// assert_eq!(a.count, 8);
/// ```
pub trait Update {
    /// Merges or synchronizes the state of `other` into `self`.
    ///
    /// `other` is taken by value, so it is consumed by the call.
    ///
    /// The specifics depend on the implementor.
    /// For example, it could be implemented as additive merging,
    /// overwriting fields, or some conflict resolution.
    fn update_from(&mut self, other: Self);
}

/// The `Join` trait is used for merging multiple instances of a context into one.
///
/// `Join` is typically used after parallel execution, where several contexts
/// (or partial results) must be combined back into a single instance.
///
/// This trait complements [`Fork`], enabling a *fork-join* lifecycle for contexts.
///
/// # Examples
/// ```
/// use node_flow::context::Join;
///
/// struct Sum(usize);
///
/// impl Join for Sum {
///     fn join(&mut self, others: Box<[Self]>) {
///         for other in others {
///             self.0 += other.0;
///         }
///     }
/// }
///
/// let mut total = Sum(5);
/// total.join(Box::new([Sum(2), Sum(3)]));
/// assert_eq!(total.0, 10);
/// ```
pub trait Join: Sized {
    /// Joins the state of multiple `others` into this instance.
    ///
    /// `others` is an owned boxed slice, so every joined instance is consumed
    /// by the call. The slice may be empty.
    ///
    /// Implementors define how merging should occur.
    /// For example, it could be summation, set unions or aggregation.
    fn join(&mut self, others: Box<[Self]>);
}

/// The `Task` trait represents an asynchronous task.
///
/// `Task` is an abstraction over a specific task in some async runtime like
/// [`tokio::task::JoinHandle<T>`](https://docs.rs/tokio/latest/tokio/task/struct.JoinHandle.html).
///
/// A `Task<T>` is itself a [`Future`] whose output is `T`, so awaiting the
/// task yields the result of the spawned work.
///
/// # Examples
/// ```
/// use node_flow::context::Task;
/// use std::future::Future;
///
/// struct DummyTask;
///
/// impl Future for DummyTask {
///     type Output = u8;
///     fn poll(
///         self: std::pin::Pin<&mut Self>,
///         _: &mut std::task::Context<'_>
///     ) -> std::task::Poll<Self::Output> {
///         std::task::Poll::Ready(5)
///     }
/// }
///
/// impl Task<u8> for DummyTask {
///     fn is_finished(&self) -> bool { true }
///     fn cancel(self) {}
/// }
/// ```
pub trait Task<T>: Future<Output = T> {
    /// Returns `true` if the task has finished.
    fn is_finished(&self) -> bool;
    /// Cancels the task if it is still running.
    ///
    /// The task handle is consumed by this call, so it cannot be awaited afterwards.
    ///
    /// The implementation should attempt to stop or drop any ongoing work.
    /// Be aware that tasks spawned using [`SpawnSync::spawn_blocking`] may or may not be canceled,
    /// because they are not async (it all depends on the implementor).
    fn cancel(self);
}

134/// The `SpawnAsync` trait provides an interface for spawning asynchronous tasks on a runtime or executor.
135///
136/// This trait abstracts over asynchronous task execution environments
137/// (such as Tokio, smol, or a custom thread pool).
138/// It returns a handle implementing the [`Task`] trait.
139///
140/// # Examples
141/// ```
142/// use node_flow::context::{SpawnAsync, Task};
143/// use std::future::Future;
144///
145/// struct MyRuntime;
146/// struct DummyTask<T>(T);
147/// impl<T> Future for DummyTask<T> // ...
148/// # {
149/// #     type Output = T;
150/// #     fn poll(
151/// #         self: std::pin::Pin<&mut Self>,
152/// #         _: &mut std::task::Context<'_>
153/// #     ) -> std::task::Poll<Self::Output> {
154/// #         todo!()
155/// #     }
156/// # }
157/// impl<T> Task<T> for DummyTask<T> // ...
158/// # {
159/// #     fn is_finished(&self) -> bool { todo!() }
160/// #     fn cancel(self) {}
161/// # }
162///
163/// impl SpawnAsync for MyRuntime {
164///     fn spawn<F>(fut: F) -> impl Task<F::Output>
165///     where
166///         F: Future + Send + 'static,
167///         F::Output: Send + 'static,
168///     {
169///         // Example stub (replace with actual runtime call)
170///         DummyTask(todo!())
171///     }
172/// }
173/// ```
174pub trait SpawnAsync {
175    /// Spawns an asynchronous concurrent task.
176    ///
177    /// The task must be `Send + 'static`, as it may execute on another thread.
178    ///
179    /// # Returns
180    /// A task handle implementing [`Task`] trait.
181    fn spawn<F>(fut: F) -> impl Task<F::Output>
182    where
183        F: Future + Send + 'static,
184        F::Output: Send + 'static;
185}
186
187/// The `SpawnSync` trait provides an interface for spawning **blocking** (synchronous) tasks.
188///
189/// This trait is the synchronous version of the [`SpawnAsync`] trait,
190/// allowing potentially long-running CPU-bound or blocking operations to be executed.
191/// It returns a handle implementing the [`Task`] trait.
192///
193/// # Examples
194/// ```
195/// use node_flow::context::{SpawnSync, Task};
196///
197/// struct MyRuntime;
198/// struct DummyTask<T>(T);
199/// impl<T> Future for DummyTask<T> // ...
200/// # {
201/// #     type Output = T;
202/// #     fn poll(
203/// #         self: std::pin::Pin<&mut Self>,
204/// #         _: &mut std::task::Context<'_>
205/// #     ) -> std::task::Poll<Self::Output> {
206/// #         todo!()
207/// #     }
208/// # }
209/// impl<T> Task<T> for DummyTask<T> // ...
210/// # {
211/// #     fn is_finished(&self) -> bool { todo!() }
212/// #     fn cancel(self) {}
213/// # }
214///
215/// impl SpawnSync for MyRuntime {
216///     fn spawn_blocking<F, O>(func: F) -> impl Task<O>
217///     where
218///         F: Fn() -> O + Send + 'static,
219///         O: Send + 'static,
220///     {
221///         // Example stub (replace with actual runtime call)
222///         DummyTask(func())
223///     }
224/// }
225/// ```
226pub trait SpawnSync {
227    /// Spawns a blocking (synchronous) function in a background.
228    ///
229    /// The function `func` is executed on a separate worker thread. The returned
230    /// task can be awaited or canceled, depending on the runtime implementation.
231    ///
232    /// # Type Parameters
233    /// - `F`: A closure or function that produces an output of type `O`.
234    /// - `O`: The output type of the blocking computation.
235    ///
236    /// # Returns
237    /// A task handle implementing [`Task<O>`] trait.
238    fn spawn_blocking<F, O>(func: F) -> impl Task<O>
239    where
240        F: Fn() -> O + Send + 'static,
241        O: Send + 'static;
242}
243
/// Test-only spawners and timing tests for `SpawnAsync`, `SpawnSync` and `Task`.
///
/// Two spawners are defined and re-exported from this module:
/// - `TokioSpawner` hands work to `tokio::spawn` / `tokio::task::spawn_blocking`.
/// - `NoneSpawner` spawns nothing: the work only runs when the returned task is polled.
#[cfg(test)]
pub(crate) mod test {
    use std::time::{Duration, Instant};

    use super::{SpawnAsync, SpawnSync, Task};

    mod tokio_ {
        use super::{SpawnAsync, SpawnSync, Task};
        use std::pin::Pin;

        /// Spawner that delegates to `tokio::spawn` and `tokio::task::spawn_blocking`.
        pub struct TokioSpawner;
        /// Adapter exposing a `tokio::task::JoinHandle` as a `Task`.
        struct TokioTask<T>(tokio::task::JoinHandle<T>);

        impl<T> Future for TokioTask<T> {
            type Output = T;

            fn poll(
                mut self: Pin<&mut Self>,
                cx: &mut std::task::Context<'_>,
            ) -> std::task::Poll<Self::Output> {
                // `JoinHandle<T>` is `Unpin` for every `T`, and therefore so is
                // `TokioTask<T>`; the inner handle can be re-pinned with safe code.
                Pin::new(&mut self.0)
                    .poll(cx)
                    // A `JoinError` means the task panicked or was aborted;
                    // in these tests that is a failure.
                    .map(|joined| joined.expect("spawned task panicked or was cancelled"))
            }
        }

        impl<T> Task<T> for TokioTask<T> {
            fn is_finished(&self) -> bool {
                self.0.is_finished()
            }

            fn cancel(self) {
                self.0.abort();
            }
        }

        impl SpawnAsync for TokioSpawner {
            fn spawn<F>(fut: F) -> impl super::Task<F::Output>
            where
                F: Future + Send + 'static,
                F::Output: Send + 'static,
            {
                TokioTask(tokio::spawn(fut))
            }
        }

        impl SpawnSync for TokioSpawner {
            fn spawn_blocking<F, O>(func: F) -> impl Task<O>
            where
                F: Fn() -> O + Send + 'static,
                O: Send + 'static,
            {
                TokioTask(tokio::task::spawn_blocking(func))
            }
        }
    }

    mod none {
        use super::{SpawnAsync, SpawnSync, Task};
        use futures_util::future::MaybeDone;
        use std::pin::Pin;

        /// Spawner that does not spawn: the future is stored and only makes
        /// progress when the returned task is polled.
        pub struct NoneSpawner;

        /// Task wrapping a not-yet-started future in a `MaybeDone`.
        // The `F: Future` bound is required here because `MaybeDone<F>` itself demands it.
        struct NoneTask<F>(MaybeDone<F>)
        where
            F: Future;

        impl<T, F> Future for NoneTask<F>
        where
            F: Future<Output = T>,
        {
            type Output = T;

            fn poll(
                self: Pin<&mut Self>,
                cx: &mut std::task::Context<'_>,
            ) -> std::task::Poll<Self::Output> {
                // SAFETY: this is a structural pin projection onto the only field.
                // `NoneTask` never moves the `MaybeDone` out of a pinned `self`,
                // has no `Drop` impl, is not `#[repr(packed)]`, and is `Unpin`
                // only when the field is.
                let mut inner = unsafe { self.map_unchecked_mut(|task| &mut task.0) };
                inner.as_mut().poll(cx).map(|()| {
                    inner
                        .take_output()
                        .expect("MaybeDone reported ready, so its output must be present")
                })
            }
        }

        impl<T, F> Task<T> for NoneTask<F>
        where
            F: Future<Output = T>,
        {
            fn is_finished(&self) -> bool {
                matches!(self.0, MaybeDone::Done(_))
            }

            /// Nothing was spawned, so there is nothing to stop; dropping `self` is enough.
            fn cancel(self) {}
        }

        impl SpawnAsync for NoneSpawner {
            fn spawn<F>(fut: F) -> impl super::Task<F::Output>
            where
                F: Future + Send + 'static,
                F::Output: Send + 'static,
            {
                NoneTask(MaybeDone::Future(fut))
            }
        }

        impl SpawnSync for NoneSpawner {
            fn spawn_blocking<F, O>(func: F) -> impl Task<O>
            where
                F: Fn() -> O + Send + 'static,
                O: Send + 'static,
            {
                // `func` runs inline (and blocks) when the task is first polled.
                NoneTask(MaybeDone::Future(async move { func() }))
            }
        }
    }

    pub use none::NoneSpawner;
    pub use tokio_::TokioSpawner;

    /// Spawns 15 tasks through `spawn_fn`, awaits them in spawn order and
    /// reports timing.
    ///
    /// `spawn_fn` receives the delay in milliseconds the task should take.
    ///
    /// Returns `(time_sum, took)`: the sum of all requested delays and the
    /// measured wall-clock time, both in milliseconds.
    async fn test<T>(spawn_fn: impl Fn(u64) -> T) -> (u64, u64)
    where
        T: Task<()>,
    {
        let mut acc = Vec::with_capacity(15);
        let mut time_sum = 0;

        let start = Instant::now();

        for i in 0..15 {
            // Delays cycle through 3..=7 ms.
            let delay = i % 5 + 3;
            time_sum += delay;
            // Pass the delay that was just accounted for (not the loop index),
            // so `time_sum` really is the total amount of requested sleeping.
            acc.push(spawn_fn(delay));
        }
        for f in acc {
            f.await;
        }

        let took =
            u64::try_from(start.elapsed().as_millis()).expect("elapsed milliseconds fit in u64");
        (time_sum, took)
    }

    /// Runs [`test`] with tasks that sleep asynchronously, spawned through `S::spawn`.
    async fn test_async<S: SpawnAsync>() -> (u64, u64) {
        test(|delay| {
            S::spawn(async move {
                tokio::time::sleep(Duration::from_millis(delay)).await;
            })
        })
        .await
    }

    /// Runs [`test`] with tasks that block their thread, spawned through `S::spawn_blocking`.
    async fn test_sync<S: SpawnSync>() -> (u64, u64) {
        test(|delay| {
            S::spawn_blocking(move || {
                std::thread::sleep(Duration::from_millis(delay));
            })
        })
        .await
    }

    /// Tokio tasks overlap, so the wall time must be below the summed delays.
    #[tokio::test]
    async fn test_async_tokio() {
        let (time_sum, took) = test_async::<TokioSpawner>().await;
        println!("time_sum: {time_sum}, took: {took}");
        assert!(time_sum > took);
    }

    /// `NoneSpawner` tasks run one after another, so the wall time is at least the summed delays.
    #[tokio::test]
    async fn test_async_none() {
        let (time_sum, took) = test_async::<NoneSpawner>().await;
        println!("time_sum: {time_sum}, took: {took}");
        assert!(time_sum <= took);
    }

    /// Blocking Tokio tasks overlap, so the wall time must be below the summed delays.
    #[tokio::test]
    async fn test_sync_tokio() {
        let (time_sum, took) = test_sync::<TokioSpawner>().await;
        println!("time_sum: {time_sum}, took: {took}");
        assert!(time_sum > took);
    }

    /// `NoneSpawner` blocking tasks run one after another, so the wall time is at least the summed delays.
    #[tokio::test]
    async fn test_sync_none() {
        let (time_sum, took) = test_sync::<NoneSpawner>().await;
        println!("time_sum: {time_sum}, took: {took}");
        assert!(time_sum <= took);
    }
}