// node_flow/context/traits.rs
/// The `Fork` trait is used for creating a new instance of a context from an existing one.
///
/// `Fork` is used in a flow where a context must be sent into branches.
/// For example, when spawning parallel tasks that each require their own context.
///
/// # Examples
/// ```
/// use node_flow::context::Fork;
///
/// #[derive(Clone)]
/// struct ExampleContext {
///     id: usize,
/// }
///
/// impl Fork for ExampleContext {
///     fn fork(&self) -> Self {
///         Self { id: self.id + 1 }
///     }
/// }
///
/// let ctx = ExampleContext { id: 0 };
/// let forked = ctx.fork();
/// assert_eq!(forked.id, 1);
/// ```
pub trait Fork {
    /// Creates a forked instance of the implementor.
    ///
    /// The original is only borrowed, so it stays usable after the call.
    /// The result is marked `#[must_use]`: forking and then discarding the
    /// new instance is almost certainly a mistake.
    #[must_use]
    fn fork(&self) -> Self;
}

/// The `Update` trait is used for updating the state of a context from another one.
///
/// `Update` allows incremental merging or synchronization between two instances of the same type.
///
/// # Examples
/// ```
/// use node_flow::context::Update;
///
/// struct Stats {
///     count: usize,
/// }
///
/// impl Update for Stats {
///     fn update_from(&mut self, other: Self) {
///         self.count += other.count;
///     }
/// }
///
/// let mut a = Stats { count: 5 };
/// let b = Stats { count: 3 };
/// a.update_from(b);
/// assert_eq!(a.count, 8);
/// ```
pub trait Update {
    /// Merges or synchronizes the state of `other` into `self`.
    ///
    /// `other` is taken by value, so it is consumed by the call.
    ///
    /// The specifics depend on the implementor.
    /// For example, it could be implemented as additive merging,
    /// overwriting fields, or some form of conflict resolution.
    fn update_from(&mut self, other: Self);
}

/// The `Join` trait is used for merging multiple instances of a context into one.
///
/// `Join` is typically used after parallel execution, where several contexts
/// (or partial results) must be combined back into a single instance.
///
/// This trait complements [`Fork`], enabling a *fork-join* lifecycle for contexts.
///
/// Implementors must be `Sized` because the instances to merge are handed over
/// by value inside a boxed slice.
///
/// # Examples
/// ```
/// use node_flow::context::Join;
///
/// struct Sum(usize);
///
/// impl Join for Sum {
///     fn join(&mut self, others: Box<[Self]>) {
///         for other in others {
///             self.0 += other.0;
///         }
///     }
/// }
///
/// let mut total = Sum(5);
/// total.join(Box::new([Sum(2), Sum(3)]));
/// assert_eq!(total.0, 10);
/// ```
pub trait Join: Sized {
    /// Joins the state of multiple `others` into this instance.
    ///
    /// `others` is passed by value, so the joined instances are consumed.
    ///
    /// Implementors define how merging should occur.
    /// For example, it could be summation, set union or aggregation.
    fn join(&mut self, others: Box<[Self]>);
}

/// The `Task` trait represents an asynchronous task.
///
/// `Task` is an abstraction over a specific task in some async runtime like
/// [`tokio::task::JoinHandle<T>`](https://docs.rs/tokio/latest/tokio/task/struct.JoinHandle.html).
///
/// Every `Task<T>` is also a [`Future`] that resolves to `T`, so a task can be
/// awaited directly to obtain its result.
///
/// # Examples
/// ```
/// use node_flow::context::Task;
/// use std::future::Future;
///
/// struct DummyTask;
///
/// impl Future for DummyTask {
///     type Output = u8;
///     fn poll(
///         self: std::pin::Pin<&mut Self>,
///         _: &mut std::task::Context<'_>
///     ) -> std::task::Poll<Self::Output> {
///         std::task::Poll::Ready(5)
///     }
/// }
///
/// impl Task<u8> for DummyTask {
///     fn is_finished(&self) -> bool { true }
///     fn cancel(self) {}
/// }
/// ```
pub trait Task<T>: Future<Output = T> {
    /// Returns `true` if the task has finished.
    fn is_finished(&self) -> bool;

    /// Cancels the task if it is still running.
    ///
    /// The handle is consumed, so the task's output can no longer be awaited afterwards.
    ///
    /// The implementation should attempt to stop or drop any ongoing work.
    /// Be aware that tasks spawned using [`SpawnSync::spawn_blocking`] may or may not be canceled,
    /// because they are not async (it all depends on the implementor).
    fn cancel(self);
}

134/// The `SpawnAsync` trait provides an interface for spawning asynchronous tasks on a runtime or executor.
135///
136/// This trait abstracts over asynchronous task execution environments
137/// (such as Tokio, smol, or a custom thread pool).
138/// It returns a handle implementing the [`Task`] trait.
139///
140/// # Examples
141/// ```
142/// use node_flow::context::{SpawnAsync, Task};
143/// use std::future::Future;
144///
145/// struct MyRuntime;
146/// struct DummyTask<T>(T);
147/// impl<T> Future for DummyTask<T> // ...
148/// # {
149/// # type Output = T;
150/// # fn poll(
151/// # self: std::pin::Pin<&mut Self>,
152/// # _: &mut std::task::Context<'_>
153/// # ) -> std::task::Poll<Self::Output> {
154/// # todo!()
155/// # }
156/// # }
157/// impl<T> Task<T> for DummyTask<T> // ...
158/// # {
159/// # fn is_finished(&self) -> bool { todo!() }
160/// # fn cancel(self) {}
161/// # }
162///
163/// impl SpawnAsync for MyRuntime {
164/// fn spawn<F>(fut: F) -> impl Task<F::Output>
165/// where
166/// F: Future + Send + 'static,
167/// F::Output: Send + 'static,
168/// {
169/// // Example stub (replace with actual runtime call)
170/// DummyTask(todo!())
171/// }
172/// }
173/// ```
174pub trait SpawnAsync {
175 /// Spawns an asynchronous concurrent task.
176 ///
177 /// The task must be `Send + 'static`, as it may execute on another thread.
178 ///
179 /// # Returns
180 /// A task handle implementing [`Task`] trait.
181 fn spawn<F>(fut: F) -> impl Task<F::Output>
182 where
183 F: Future + Send + 'static,
184 F::Output: Send + 'static;
185}
186
187/// The `SpawnSync` trait provides an interface for spawning **blocking** (synchronous) tasks.
188///
189/// This trait is the synchronous version of the [`SpawnAsync`] trait,
190/// allowing potentially long-running CPU-bound or blocking operations to be executed.
191/// It returns a handle implementing the [`Task`] trait.
192///
193/// # Examples
194/// ```
195/// use node_flow::context::{SpawnSync, Task};
196///
197/// struct MyRuntime;
198/// struct DummyTask<T>(T);
199/// impl<T> Future for DummyTask<T> // ...
200/// # {
201/// # type Output = T;
202/// # fn poll(
203/// # self: std::pin::Pin<&mut Self>,
204/// # _: &mut std::task::Context<'_>
205/// # ) -> std::task::Poll<Self::Output> {
206/// # todo!()
207/// # }
208/// # }
209/// impl<T> Task<T> for DummyTask<T> // ...
210/// # {
211/// # fn is_finished(&self) -> bool { todo!() }
212/// # fn cancel(self) {}
213/// # }
214///
215/// impl SpawnSync for MyRuntime {
216/// fn spawn_blocking<F, O>(func: F) -> impl Task<O>
217/// where
218/// F: Fn() -> O + Send + 'static,
219/// O: Send + 'static,
220/// {
221/// // Example stub (replace with actual runtime call)
222/// DummyTask(func())
223/// }
224/// }
225/// ```
226pub trait SpawnSync {
227 /// Spawns a blocking (synchronous) function in a background.
228 ///
229 /// The function `func` is executed on a separate worker thread. The returned
230 /// task can be awaited or canceled, depending on the runtime implementation.
231 ///
232 /// # Type Parameters
233 /// - `F`: A closure or function that produces an output of type `O`.
234 /// - `O`: The output type of the blocking computation.
235 ///
236 /// # Returns
237 /// A task handle implementing [`Task<O>`] trait.
238 fn spawn_blocking<F, O>(func: F) -> impl Task<O>
239 where
240 F: Fn() -> O + Send + 'static,
241 O: Send + 'static;
242}
243
/// Test-only spawner implementations plus timing tests for [`SpawnAsync`] and [`SpawnSync`].
///
/// The module is `pub(crate)` and re-exports `NoneSpawner` and `TokioSpawner`.
#[cfg(test)]
pub(crate) mod test {
    use std::time::{Duration, Instant};

    use super::{SpawnAsync, SpawnSync, Task};

    /// Spawner backed by the tokio runtime.
    mod tokio_ {
        use super::{SpawnAsync, SpawnSync, Task};
        use std::pin::Pin;
        use std::task::{Context, Poll};

        /// Spawns work through `tokio::spawn` / `tokio::task::spawn_blocking`.
        pub struct TokioSpawner;

        /// Thin wrapper around a tokio `JoinHandle` that unwraps the join result.
        struct TokioTask<T>(tokio::task::JoinHandle<T>);

        impl<T> Future for TokioTask<T> {
            type Output = T;

            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
                // `JoinHandle<T>` is `Unpin`, which makes this wrapper `Unpin` as well,
                // so the inner handle can be re-pinned without any `unsafe`.
                Pin::new(&mut self.get_mut().0)
                    .poll(cx)
                    // A join error is treated as a test failure.
                    .map(|joined| joined.expect("tokio task failed to join"))
            }
        }

        impl<T> Task<T> for TokioTask<T> {
            fn is_finished(&self) -> bool {
                self.0.is_finished()
            }

            fn cancel(self) {
                self.0.abort();
            }
        }

        impl SpawnAsync for TokioSpawner {
            fn spawn<F>(fut: F) -> impl Task<F::Output>
            where
                F: Future + Send + 'static,
                F::Output: Send + 'static,
            {
                TokioTask(tokio::spawn(fut))
            }
        }

        impl SpawnSync for TokioSpawner {
            fn spawn_blocking<F, O>(func: F) -> impl Task<O>
            where
                F: Fn() -> O + Send + 'static,
                O: Send + 'static,
            {
                TokioTask(tokio::task::spawn_blocking(func))
            }
        }
    }

    /// Spawner that does not spawn anything.
    mod none {
        use super::{SpawnAsync, SpawnSync, Task};
        use futures_util::future::MaybeDone;
        use std::pin::Pin;
        use std::task::{Context, Poll};

        /// Wraps the given work in a future that only makes progress when the
        /// returned task itself is polled.
        pub struct NoneSpawner;

        /// A "task" that is just the wrapped future, driven by whoever polls it.
        struct NoneTask<F>(MaybeDone<F>)
        where
            F: Future;

        impl<T, F> Future for NoneTask<F>
        where
            F: Future<Output = T>,
        {
            type Output = T;

            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
                // SAFETY: field `0` is treated as structurally pinned. It is only
                // reached through this projection, it is never moved out of `self`,
                // and `NoneTask` has no `Drop` impl that could move it.
                let mut inner = unsafe { self.map_unchecked_mut(|this| &mut this.0) };
                inner.as_mut().poll(cx).map(|()| {
                    inner
                        .take_output()
                        .expect("`MaybeDone` holds its output right after completing")
                })
            }
        }

        impl<T, F> Task<T> for NoneTask<F>
        where
            F: Future<Output = T>,
        {
            fn is_finished(&self) -> bool {
                // `poll` takes the output in the same call that completes the future,
                // so a finished task is observed as `Gone`, not `Done`.
                matches!(self.0, MaybeDone::Done(_) | MaybeDone::Gone)
            }

            // Nothing runs in the background; consuming `self` drops the wrapped future.
            fn cancel(self) {}
        }

        impl SpawnAsync for NoneSpawner {
            fn spawn<F>(fut: F) -> impl Task<F::Output>
            where
                F: Future + Send + 'static,
                F::Output: Send + 'static,
            {
                NoneTask(MaybeDone::Future(fut))
            }
        }

        impl SpawnSync for NoneSpawner {
            fn spawn_blocking<F, O>(func: F) -> impl Task<O>
            where
                F: Fn() -> O + Send + 'static,
                O: Send + 'static,
            {
                NoneTask(MaybeDone::Future(async move { func() }))
            }
        }
    }

    pub use none::NoneSpawner;
    pub use tokio_::TokioSpawner;

    /// Spawns 15 tasks through `spawn_fn` and awaits them in spawn order.
    ///
    /// Each task is given a delay of `i % 5 + 3` milliseconds.
    ///
    /// Returns `(time_sum, took)`: the sum of all requested delays and the
    /// measured wall-clock time, both in milliseconds.
    async fn test<T>(spawn_fn: impl Fn(u64) -> T) -> (u64, u64)
    where
        T: Task<()>,
    {
        let mut tasks = Vec::with_capacity(15);
        let mut time_sum = 0;

        let start = Instant::now();

        for i in 0..15 {
            let delay = i % 5 + 3;
            time_sum += delay;
            // Pass the delay that was just accounted for, so that `time_sum`
            // matches the work that is actually spawned.
            tasks.push(spawn_fn(delay));
        }
        for task in tasks {
            task.await;
        }

        let took = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
        (time_sum, took)
    }

    /// Runs [`test`] with tasks that sleep asynchronously for the given delay.
    async fn test_async<S: SpawnAsync>() -> (u64, u64) {
        test(|delay| {
            S::spawn(async move {
                tokio::time::sleep(Duration::from_millis(delay)).await;
            })
        })
        .await
    }

    /// Runs [`test`] with tasks that block their thread for the given delay.
    async fn test_sync<S: SpawnSync>() -> (u64, u64) {
        test(|delay| {
            S::spawn_blocking(move || {
                std::thread::sleep(Duration::from_millis(delay));
            })
        })
        .await
    }

    /// Tokio-spawned async tasks must overlap: total time is below the summed delays.
    #[tokio::test]
    async fn test_async_tokio() {
        let (time_sum, took) = test_async::<TokioSpawner>().await;
        println!("time_sum: {time_sum}, took: {took}");
        assert!(time_sum > took);
    }

    /// Without spawning, async tasks run one after another: total time is at least the summed delays.
    #[tokio::test]
    async fn test_async_none() {
        let (time_sum, took) = test_async::<NoneSpawner>().await;
        println!("time_sum: {time_sum}, took: {took}");
        assert!(time_sum <= took);
    }

    /// Tokio-spawned blocking tasks must overlap: total time is below the summed delays.
    #[tokio::test]
    async fn test_sync_tokio() {
        let (time_sum, took) = test_sync::<TokioSpawner>().await;
        println!("time_sum: {time_sum}, took: {took}");
        assert!(time_sum > took);
    }

    /// Without spawning, blocking tasks run one after another: total time is at least the summed delays.
    #[tokio::test]
    async fn test_sync_none() {
        let (time_sum, took) = test_sync::<NoneSpawner>().await;
        println!("time_sum: {time_sum}, took: {took}");
        assert!(time_sum <= took);
    }
}