Skip to main content

operese_dagx/
task.rs

1//! Task trait and implementations.
2//!
3//! This module defines the core Task trait that all DAG nodes must implement,
4//! along with convenience functions for creating tasks from closures.
5
6use std::{any::Any, future::Future, marker::PhantomData, slice::Iter, sync::Arc};
7
8/// A unit of async work with typed inputs and outputs.
9///
10/// Use the [`crate::task`] macro to implement this trait. The macro automatically
11/// derives `Input` and `Output` types from your `run()` method signature and generates
12/// extraction logic in `run()`.
13///
14/// **Custom types work automatically!** As long as your output type implements
15/// `Send + Sync + 'static`, the macro handles everything.
16///
17/// **Note:** When implementing this trait manually for a task with a single dependency, the Input type must be (T,) instead of T.
18///
19/// # Input Patterns
20///
21/// - `()`: No dependencies (source task)
22/// - `&T`: Single dependency
23/// - `&A, &B, &C, ...`: Multiple dependencies (up to 8, order matters)
24///
25/// # Output Wrapping
26///
27/// **Task outputs are automatically wrapped in `Arc<T>` internally** for efficient fan-out.
28/// You define `Output = T`, but the framework wraps it in `Arc<T>` before sending to dependents.
29/// This enables O(1) sharing for 1:N dependencies - Arc is cloned (cheap), not your data.
30///
31/// Downstream tasks receive `&T` after ExtractInput extracts from the Arc. This is transparent
32/// to your task code - you work with `T`, the framework handles Arc wrapping/unwrapping.
33pub trait Task<Input>: Send
34where
35    Input: Send + Sync + 'static,
36{
37    type Output: Send + Sync + 'static; // Sync required for Arc-wrapping and cross-thread sharing
38
39    fn run(self, input: TaskInput<Input>) -> impl Future<Output = Self::Output> + Send;
40}
41
42/// Linear type providing a [`Task`] access to its dependencies.
43///
44/// The framework ensures that all dependencies will be present when this task runs, and
45/// can be retrieved with [`TaskInput::next`]. The nth call to `next` consumes this TaskInput instances and returns a
46/// (&T, TaskInput) tuple with the nth dependency specified in the call to [`crate::TaskBuilder::depends_on`] and a
47/// TaskInput with the remaining dependencies.
48///
49/// No dependencies can be retrieved from an empty TaskInput (Input = ()).
50pub struct TaskInput<'inputs, Input: Send> {
51    inputs: Iter<'inputs, Arc<dyn Any + Send + Sync + 'static>>,
52    phantom: PhantomData<Input>,
53}
54
55impl<'inputs, Input: Send> TaskInput<'inputs, Input> {
56    pub(crate) fn new(inputs: Iter<'inputs, Arc<dyn Any + Send + Sync + 'static>>) -> Self {
57        Self {
58            inputs,
59            phantom: PhantomData,
60        }
61    }
62}
63
64macro_rules! impl_task_input {
65    ($First:ident $(, $T:ident)*) => {
66        impl<'inputs, $First: 'static + Send, $($T: 'static + Send),*> TaskInput<'inputs, ($First, $($T,)*)> {
67            /// Get a reference to the next dependency from this TaskInput, and a [`TaskInput`] instance with the remaining dependencies.
68            #[must_use]
69            pub fn next(mut self) -> (&'inputs $First, TaskInput<'inputs, ($($T,)*)>) {
70                let value = self.inputs.next().and_then(|value| value.downcast_ref()).unwrap();
71                let next_inputs = TaskInput {
72                    inputs: self.inputs,
73                    phantom: PhantomData,
74                };
75                (value, next_inputs)
76            }
77        }
78    };
79}
80
81impl_task_input!(A);
82impl_task_input!(A, B);
83impl_task_input!(A, B, C);
84impl_task_input!(A, B, C, D);
85impl_task_input!(A, B, C, D, E);
86impl_task_input!(A, B, C, D, E, F);
87impl_task_input!(A, B, C, D, E, F, G);
88impl_task_input!(A, B, C, D, E, F, G, H);
89
90#[cfg(test)]
91mod tests;