//! Async DAG Task Runner
//!
//! A minimal, type-safe, runtime-agnostic DAG (Directed Acyclic Graph) executor with
//! compile-time dependency validation and parallel execution of independent tasks.
//!
//! # Features
//!
//! - **Compile-time cycle prevention**: The type system makes cycles **impossible**, so no runtime
//!   cycle detection is needed. See [the repository documentation](https://github.com/swaits/dagx/blob/main/docs/CYCLE_PREVENTION.md) for a detailed explanation.
//! - **Compile-time type safety**: Dependencies are validated at compile time through the type
//!   system. The public API is fully type-safe with no runtime type errors. Internal execution
//!   uses type erasure for heterogeneous task storage, but this is never exposed to users.
//! - **Runtime-agnostic**: Works with any async runtime (Tokio, smol, Embassy, etc.) because the
//!   caller supplies the function that spawns tasks
//! - **Optimal execution**: Topological scheduling with maximum safe parallelism
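//!
//! The idea behind cycle prevention and layered scheduling can be illustrated with a small
//! sketch. The names `MiniDag`, `NodeId`, `add`, and `layers` are hypothetical and are not this
//! crate's API or internals. Suppose a node can only depend on ids that already exist when it is
//! added. Then every edge points backwards in insertion order, so a cycle cannot be expressed,
//! and insertion order is already a topological order. Layers then follow from a single pass:
//! a node's layer is one more than the deepest layer among its dependencies.
//!
//! ```rust
//! /// Opaque node id (hypothetical). Its field is private to the defining module,
//! /// so other modules can only obtain one from `MiniDag::add`.
//! #[derive(Clone, Copy, Debug, PartialEq)]
//! struct NodeId(usize);
//!
//! /// Append-only graph: entry `i` lists the dependencies of node `i`.
//! #[derive(Default)]
//! struct MiniDag {
//!     deps: Vec<Vec<NodeId>>,
//! }
//!
//! impl MiniDag {
//!     /// Adds a node whose dependencies must already exist, so no edge can
//!     /// point at the new node itself or at a node added later.
//!     fn add(&mut self, deps: &[NodeId]) -> NodeId {
//!         self.deps.push(deps.to_vec());
//!         NodeId(self.deps.len() - 1)
//!     }
//!
//!     /// Groups nodes into layers: layer = 1 + max(layer of each dependency).
//!     /// Nodes in the same layer never depend on each other.
//!     fn layers(&self) -> Vec<Vec<NodeId>> {
//!         let mut depth = vec![0usize; self.deps.len()];
//!         let mut layers: Vec<Vec<NodeId>> = Vec::new();
//!         // Insertion order is topological, so each dependency's depth is already final.
//!         for (i, deps) in self.deps.iter().enumerate() {
//!             let d = deps.iter().map(|dep| depth[dep.0] + 1).max().unwrap_or(0);
//!             depth[i] = d;
//!             if layers.len() <= d {
//!                 layers.resize(d + 1, Vec::new());
//!             }
//!             layers[d].push(NodeId(i));
//!         }
//!         layers
//!     }
//! }
//! ```
//!
//! For example, two source nodes followed by a node that depends on both yield two layers. The
//! sources share layer 0 and could run concurrently, and the dependent node sits alone in layer 1.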
//!
//! # Quick Start
//!
//! ```no_run
//! use operese_dagx::{task, DagRunner, Task, TaskHandle};
//!
//! // Source task with read-only state (tuple struct)
//! struct Value(i32);
//!
//! #[task]
//! impl Value {
//!     async fn run(&self) -> i32 { self.0 }  // Read-only: use &self
//! }
//!
//! // Stateless task (unit struct)
//! struct Add;
//!
//! #[task]
//! impl Add {
//!     async fn run(a: &i32, b: &i32) -> i32 { a + b }  // No self needed!
//! }
//!
//! # async {
//! let mut dag = DagRunner::new();
//!
//! // Add source tasks
//! let x = dag.add_task(Value(2));
//! let y = dag.add_task(Value(3));
//!
//! // Add task with dependencies
//! let sum = dag.add_task(Add).depends_on((&x, &y));
//!
//! // Execute on Tokio and retrieve results
//! let mut output = dag.run(|fut| async move { tokio::spawn(fut).await.unwrap() }).await.unwrap();
//! assert_eq!(output.get(sum), 5);
//! # };
//! ```
//!
//! # Implementation Notes
//!
//! ## Inline Execution Fast-Path
//!
//! dagx automatically optimizes execution for sequential workloads using an inline fast-path.
//! When a layer contains only a single task (common in deep chains and linear pipelines),
//! that task executes inline rather than being spawned. This eliminates spawning overhead
//! and context switching.
//!
//! **Panic handling**: To keep behavior consistent across runtimes,
//! panics in tasks are caught using `FutureExt::catch_unwind()` and converted to
//! `DagError::TaskPanicked`. This matches async runtimes like Tokio, which report a panic in a
//! spawned task as an error instead of propagating it.
//!
//! **What this means for you**:
//! - Tasks behave identically whether executed inline or spawned
//! - Panics become errors regardless of execution path
//! - Sequential workloads can see 10-100x speedups, because no per-task spawn is needed
//! - The optimization is completely transparent: no code changes are needed
//!
//! **When inline execution happens**:
//! - In any layer that contains exactly one task (`layer.len() == 1`) after topological layering
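//!
//! The following is a rough, synchronous illustration of these two rules. It is a sketch under
//! stated assumptions, not this crate's implementation. The hypothetical `run_layers` runs a
//! one-task layer inline on the current thread and gives every task in a larger layer its own
//! scoped OS thread, with threads standing in for async spawning. `std::panic::catch_unwind`
//! stands in for `FutureExt::catch_unwind()`, and the hypothetical `RunError::TaskPanicked`
//! stands in for `DagError::TaskPanicked`.
//!
//! ```rust
//! use std::panic::{self, AssertUnwindSafe};
//! use std::thread;
//!
//! /// Hypothetical error type mirroring `DagError::TaskPanicked`.
//! #[derive(Debug, PartialEq)]
//! enum RunError {
//!     TaskPanicked,
//! }
//!
//! /// A type-erased unit of work (synchronous stand-in for a task future).
//! type Job = Box<dyn FnOnce() -> i32 + Send>;
//!
//! /// Runs layers in order and collects outputs; any panic becomes `RunError::TaskPanicked`.
//! fn run_layers(layers: Vec<Vec<Job>>) -> Result<Vec<i32>, RunError> {
//!     let mut results = Vec::new();
//!     for mut layer in layers {
//!         if layer.len() == 1 {
//!             // Inline fast-path: run on the current thread, catching panics explicitly.
//!             let job = layer.pop().unwrap();
//!             let out = panic::catch_unwind(AssertUnwindSafe(job))
//!                 .map_err(|_| RunError::TaskPanicked)?;
//!             results.push(out);
//!         } else {
//!             // Spawned path: `join` returns `Err` if the thread panicked.
//!             let outs: Vec<_> = thread::scope(|s| {
//!                 let handles: Vec<_> = layer.into_iter().map(|job| s.spawn(job)).collect();
//!                 handles.into_iter().map(|h| h.join()).collect()
//!             });
//!             for out in outs {
//!                 results.push(out.map_err(|_| RunError::TaskPanicked)?);
//!             }
//!         }
//!     }
//!     Ok(results)
//! }
//! ```
//!
//! A spawned thread's panic already surfaces as an `Err` from `join`. Only the inline path
//! therefore needs an explicit `catch_unwind`, and both paths then report the same error.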
//!
//! ## Dependency Limits
//!
//! dagx supports up to 8 dependencies per task. If you need more than 8 dependencies:
//! 1. Group related inputs into a struct produced by a single upstream task
//! 2. Use intermediate aggregation tasks
//! 3. Consider whether needing more than 8 dependencies indicates a design issue
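//!
//! Limits like this usually arise because a trait is implemented once per tuple size, typically
//! with a declarative macro. The sketch below assumes that approach. `DepSet`, `ARITY`,
//! `impl_dep_set!`, and `arity` are hypothetical names, not this crate's API. The one-element
//! case is the tuple `(A,)`, which mirrors the `Task<(T,)>` form used for single-dependency tasks.
//!
//! ```rust
//! /// Hypothetical trait implemented for every supported dependency tuple.
//! trait DepSet {
//!     /// Number of dependencies in the tuple.
//!     const ARITY: usize;
//! }
//!
//! // One impl per tuple size. A tuple size with no impl fails to compile,
//! // which is how a fixed upper limit arises.
//! macro_rules! impl_dep_set {
//!     ($n:expr; $($name:ident),+) => {
//!         impl<$($name),+> DepSet for ($($name,)+) {
//!             const ARITY: usize = $n;
//!         }
//!     };
//! }
//!
//! impl_dep_set!(1; A);
//! impl_dep_set!(2; A, B);
//! impl_dep_set!(3; A, B, C);
//! impl_dep_set!(4; A, B, C, D);
//! impl_dep_set!(5; A, B, C, D, E);
//! impl_dep_set!(6; A, B, C, D, E, F);
//! impl_dep_set!(7; A, B, C, D, E, F, G);
//! impl_dep_set!(8; A, B, C, D, E, F, G, H);
//!
//! /// Reads a dependency tuple's arity from its type alone.
//! fn arity<D: DepSet>() -> usize {
//!     D::ARITY
//! }
//! ```
//!
//! With these impls, `arity::<(i32, String, bool)>()` returns 3. A nine-element tuple is
//! rejected at compile time because no impl exists for it.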
//!
//! # Core Concepts
//!
//! ## Task
//!
//! A [`Task`] is a unit of async work with typed inputs and outputs. Use the `#[task]` macro
//! (available with the `derive` feature) to generate an implementation.
//!
//! ### Task Patterns
//!
//! dagx supports three task patterns based on state requirements:
//!
//! **1. Stateless** - No state (unit struct, no `self` parameter):
//!
//! ```
//! use operese_dagx::{task, Task};
//!
//! struct Add;
//!
//! #[task]
//! impl Add {
//!     async fn run(a: &i32, b: &i32) -> i32 { a + b }
//! }
//! ```
//!
//! **2. Read-only state** - Immutable access (use `&self`):
//!
//! ```
//! use operese_dagx::{task, Task};
//!
//! struct Scale(i32);
//!
//! #[task]
//! impl Scale {
//!     async fn run(&self, input: &i32) -> i32 {
//!         input * self.0  // Read-only access
//!     }
//! }
//! ```
//!
//! **3. Mutable state** - State modification (use `&mut self`):
//!
//! ```
//! use operese_dagx::{task, Task};
//!
//! struct Counter(i32);
//!
//! #[task]
//! impl Counter {
//!     async fn run(&mut self, input: &i32) -> i32 {
//!         self.0 += input;  // Modifies state
//!         self.0
//!     }
//! }
//! ```
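//!
//! All three patterns can share a single runner, because a runner that owns each task needs only
//! one calling convention. The hypothetical, synchronous `RunOnce` trait below (not this crate's
//! [`Task`] trait) always receives `&mut self`. A stateless task ignores `self`, a read-only task
//! only reads it, and a mutable task updates it. For brevity the inputs are owned tuples here,
//! whereas real task inputs are references. A single input is still a one-element tuple.
//!
//! ```rust
//! /// Hypothetical uniform interface: the runner only ever calls `call(&mut self, ..)`.
//! trait RunOnce<In> {
//!     type Output;
//!     fn call(&mut self, input: In) -> Self::Output;
//! }
//!
//! struct Add; // stateless
//! struct Scale(i32); // read-only state
//! struct Counter(i32); // mutable state
//!
//! impl RunOnce<(i32, i32)> for Add {
//!     type Output = i32;
//!     // Never touches `self`, like `async fn run(a: &i32, b: &i32)`.
//!     fn call(&mut self, (a, b): (i32, i32)) -> i32 {
//!         a + b
//!     }
//! }
//!
//! impl RunOnce<(i32,)> for Scale {
//!     type Output = i32;
//!     // Only reads `self`, like `async fn run(&self, input: &i32)`.
//!     fn call(&mut self, (input,): (i32,)) -> i32 {
//!         input * self.0
//!     }
//! }
//!
//! impl RunOnce<(i32,)> for Counter {
//!     type Output = i32;
//!     // Updates `self`, like `async fn run(&mut self, input: &i32)`.
//!     fn call(&mut self, (input,): (i32,)) -> i32 {
//!         self.0 += input;
//!         self.0
//!     }
//! }
//! ```
//!
//! `Counter` keeps its state between calls. Starting from `Counter(10)`, calling it twice with
//! input `5` yields `15` and then `20`.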
//!
//! ## DagRunner
//!
//! The [`DagRunner`] orchestrates task execution. Add tasks with [`DagRunner::add_task`],
//! wire dependencies with [`TaskBuilder::depends_on`], then run everything with [`DagRunner::run`].
//!
//! ## TaskHandle
//!
//! A [`TaskHandle<T>`] is a typed, opaque reference to a task's output. Use it to:
//! - Wire dependencies between tasks
//! - Retrieve results after execution with [`DagOutput::get`]
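//!
//! Typed handles over type-erased storage can be sketched with `std::any::Any`. In this
//! hypothetical example, `Handle`, `Outputs`, `insert`, and `take` are illustrative names, not
//! this crate's internals. The handle carries its output type as a zero-sized `PhantomData`
//! marker. As a result, the downcast in `take` cannot fail for a handle obtained from `insert`.
//!
//! ```rust
//! use std::any::Any;
//! use std::marker::PhantomData;
//!
//! /// Hypothetical typed handle: a slot index plus a zero-sized type marker.
//! struct Handle<T> {
//!     id: usize,
//!     _ty: PhantomData<T>,
//! }
//!
//! /// Hypothetical output store holding values of different types behind `dyn Any`.
//! #[derive(Default)]
//! struct Outputs {
//!     slots: Vec<Option<Box<dyn Any>>>,
//! }
//!
//! impl Outputs {
//!     /// Stores a value and returns a handle that remembers its type.
//!     fn insert<T: 'static>(&mut self, value: T) -> Handle<T> {
//!         let boxed: Box<dyn Any> = Box::new(value);
//!         self.slots.push(Some(boxed));
//!         Handle { id: self.slots.len() - 1, _ty: PhantomData }
//!     }
//!
//!     /// Moves a value out. Consuming the handle prevents a second `take`, and the
//!     /// handle's `T` matches the stored type, so neither `expect` fires for handles
//!     /// obtained from `insert`.
//!     fn take<T: 'static>(&mut self, handle: Handle<T>) -> T {
//!         let boxed = self.slots[handle.id].take().expect("value already taken");
//!         *boxed.downcast::<T>().expect("handle type matches stored type")
//!     }
//! }
//! ```
//!
//! Taking the handle by value mirrors the `output.get(sum)` calls in the examples.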
//!
//! ## Dependency Patterns
//!
//! dagx supports three dependency patterns:
//!
//! ### No Dependencies (Source Tasks)
//!
//! For a task with no dependencies, `add_task` returns a [`TaskHandle`] directly, and no
//! `depends_on()` call is needed:
//!
//! ```no_run
//! # use operese_dagx::{task, DagRunner, Task};
//! # struct Value(i32);
//! # #[task]
//! # impl Value {
//! #     async fn run(&self) -> i32 { self.0 }
//! # }
//! # async {
//! # let mut dag = DagRunner::new();
//! let source = dag.add_task(Value(42));
//! # };
//! ```
//!
//! ### Single Dependency
//!
//! Tasks with a single dependency receive a reference to that value:
//!
//! ```no_run
//! # use operese_dagx::{task, DagRunner, Task};
//! # struct Value(i32);
//! # #[task]
//! # impl Value {
//! #     async fn run(&self) -> i32 { self.0 }
//! # }
//! struct Double;
//! #[task]
//! impl Double {
//!     async fn run(input: &i32) -> i32 { input * 2 }
//! }
//! # async {
//! # let mut dag = DagRunner::new();
//! # let source = dag.add_task(Value(42));
//! let doubled = dag.add_task(Double).depends_on(&source);
//! # };
//! ```
//!
//! To implement [`Task`] manually for a task with a single dependency of type `T`, implement
//! [`Task<(T,)>`] (a one-element tuple), not [`Task<T>`].
//!
//! ### Multiple Dependencies
//!
//! Tasks with multiple dependencies receive one reference parameter per dependency, in the same
//! order as the handles passed to `depends_on` (order matters!):
//!
//! ```no_run
//! # use operese_dagx::{task, DagRunner, Task};
//! # struct Value(i32);
//! # #[task]
//! # impl Value {
//! #     async fn run(&self) -> i32 { self.0 }
//! # }
//! struct Add;
//! #[task]
//! impl Add {
//!     async fn run(a: &i32, b: &i32) -> i32 { a + b }
//! }
//! # async {
//! # let mut dag = DagRunner::new();
//! # let x = dag.add_task(Value(2));
//! # let y = dag.add_task(Value(3));
//! let sum = dag.add_task(Add).depends_on((&x, &y));
//! # };
//! ```
//!
//! # Examples
//!
//! ## Fan-out Pattern (1 → n)
//!
//! One task produces a value consumed by multiple downstream tasks:
//!
//! ```no_run
//! use operese_dagx::{task, DagRunner, Task};
//!
//! // Source task (tuple struct)
//! struct Value(i32);
//!
//! #[task]
//! impl Value {
//!     async fn run(&self) -> i32 { self.0 }
//! }
//!
//! // Read-only state (tuple struct)
//! struct Add(i32);
//!
//! #[task]
//! impl Add {
//!     async fn run(&self, input: &i32) -> i32 { input + self.0 }
//! }
//!
//! // Read-only state (tuple struct)
//! struct Scale(i32);
//!
//! #[task]
//! impl Scale {
//!     async fn run(&self, input: &i32) -> i32 { input * self.0 }
//! }
//!
//! # async {
//! let mut dag = DagRunner::new();
//!
//! let base = dag.add_task(Value(10));
//! let plus1 = dag.add_task(Add(1)).depends_on(&base);
//! let times2 = dag.add_task(Scale(2)).depends_on(&base);
//!
//! let mut output = dag.run(|fut| async move { tokio::spawn(fut).await.unwrap() }).await.unwrap();
//!
//! assert_eq!(output.get(plus1), 11);
//! assert_eq!(output.get(times2), 20);
//! # };
//! ```
//!
//! ## Fan-in Pattern (m → 1)
//!
//! Multiple tasks produce values consumed by a single downstream task:
//!
//! ```no_run
//! use operese_dagx::{task, DagRunner, Task};
//!
//! // Source task for String (tuple struct)
//! struct Name(String);
//!
//! #[task]
//! impl Name {
//!     async fn run(&self) -> String { self.0.clone() }
//! }
//!
//! // Source task for i32 (tuple struct)
//! struct Age(i32);
//!
//! #[task]
//! impl Age {
//!     async fn run(&self) -> i32 { self.0 }
//! }
//!
//! // Source task for bool (tuple struct)
//! struct Active(bool);
//!
//! #[task]
//! impl Active {
//!     async fn run(&self) -> bool { self.0 }
//! }
//!
//! // Stateless formatter (unit struct)
//! struct FormatUser;
//!
//! #[task]
//! impl FormatUser {
//!     async fn run(n: &String, a: &i32, f: &bool) -> String {
//!         format!("User: {n}, Age: {a}, Active: {f}")
//!     }
//! }
//!
//! # async {
//! let mut dag = DagRunner::new();
//!
//! let name = dag.add_task(Name("Alice".to_string()));
//! let age = dag.add_task(Age(30));
//! let active = dag.add_task(Active(true));
//! let result = dag.add_task(FormatUser).depends_on((&name, &age, &active));
//!
//! let mut output = dag.run(|fut| async move { tokio::spawn(fut).await.unwrap() }).await.unwrap();
//!
//! assert_eq!(output.get(result), "User: Alice, Age: 30, Active: true");
//! # };
//! ```
//!
//! # Runtime Agnostic
//!
//! dagx works with any async runtime. The library has been tested with:
//! - **Tokio** - Widely used async runtime
//! - **smol** - Lightweight async runtime
//!
//! Examples with different runtimes:
//!
//! ```ignore
//! use operese_dagx::DagRunner;
//!
//! // With Tokio
//! #[tokio::main]
//! async fn main() {
//!     let mut dag = DagRunner::new();
//!     // ... build DAG
//!     // Awaiting a Tokio `JoinHandle` yields a `Result` (panics arrive as `JoinError`), so unwrap it.
//!     let result = dag.run(|fut| async move { tokio::spawn(fut).await.unwrap() }).await;
//! }
//!
//! // With smol
//! fn main() {
//!     smol::block_on(async {
//!         let mut dag = DagRunner::new();
//!         // ... build and run DAG
//!         // `smol::spawn` returns a `Task` that resolves directly to the task's output.
//!         let result = dag.run(|fut| smol::spawn(fut)).await;
//!     });
//! }
//! ```
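//!
//! This works because the runner is generic over a caller-supplied spawn function and never
//! names a runtime itself. The sketch below illustrates the pattern with hypothetical names:
//! `run_with`, `BoxFut`, and a tiny thread-parking `block_on` built on `std::task::Wake`. It is
//! not this crate's implementation. For simplicity, it awaits the spawned futures one at a time
//! instead of concurrently.
//!
//! ```rust
//! use std::future::Future;
//! use std::pin::{pin, Pin};
//! use std::sync::Arc;
//! use std::task::{Context, Poll, Wake, Waker};
//! use std::thread::{self, Thread};
//!
//! /// A type-erased task future, as a runner might store it.
//! type BoxFut = Pin<Box<dyn Future<Output = i32> + Send>>;
//!
//! /// Hands each task to `spawn` and sums the outputs. Where and how each future
//! /// runs is decided entirely by the closure the caller passes in.
//! async fn run_with<S, F>(tasks: Vec<BoxFut>, spawn: S) -> i32
//! where
//!     S: Fn(BoxFut) -> F,
//!     F: Future<Output = i32>,
//! {
//!     let mut total = 0;
//!     for task in tasks {
//!         total += spawn(task).await;
//!     }
//!     total
//! }
//!
//! /// Waker that unparks the thread blocked in `block_on`.
//! struct ThreadWaker(Thread);
//!
//! impl Wake for ThreadWaker {
//!     fn wake(self: Arc<Self>) {
//!         self.0.unpark();
//!     }
//! }
//!
//! /// Minimal executor: polls the future, parking the thread while it is pending.
//! fn block_on<T>(fut: impl Future<Output = T>) -> T {
//!     let mut fut = pin!(fut);
//!     let waker: Waker = Arc::new(ThreadWaker(thread::current())).into();
//!     let mut cx = Context::from_waker(&waker);
//!     loop {
//!         match fut.as_mut().poll(&mut cx) {
//!             Poll::Ready(out) => return out,
//!             Poll::Pending => thread::park(),
//!         }
//!     }
//! }
//! ```
//!
//! Passing `|fut| fut` as the spawner runs every task in place on the caller's executor. A
//! Tokio-backed caller could instead pass `|fut| async move { tokio::spawn(fut).await.unwrap() }`,
//! as the examples do.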
//!
//! # Error Handling
//!
//! dagx uses [`DagResult<T>`] (an alias for `Result<T, DagError>`) for operations that
//! can fail:
//!
//! - [`DagRunner::run`] returns `DagResult<DagOutput>` and can fail if tasks panic or if multiple
//!   concurrent runs are attempted
//! - [`DagOutput::get`] returns `DagResult<T>` and can fail if the task hasn't executed or
//!   the handle is invalid
//!
//! ```no_run
//! # use operese_dagx::{task, DagRunner, Task};
//! #
//! # struct Value(i32);
//! # #[task]
//! # impl Value {
//! #     async fn run(&self) -> i32 { self.0 }
//! # }
//! # async {
//! # let mut dag = DagRunner::new();
//! # let node = dag.add_task(Value(42));
//! // Simple approach with .unwrap()
//! let mut output = dag.run(|fut| async move { tokio::spawn(fut).await.unwrap() }).await.unwrap();
//! let result = output.get(node);
//! # let mut dag = DagRunner::new();
//!
//! // Or handle errors explicitly
//! match dag.run(|fut| async move { tokio::spawn(fut).await.unwrap() }).await {
//!     Ok(_) => println!("DAG executed successfully"),
//!     Err(e) => eprintln!("DAG execution failed: {}", e),
//! }
//! # };
//! ```
//!
//! # API Design Principles
//!
//! dagx follows these API design principles:
//!
//! 1. **Type Safety**: Dependencies are validated at compile time via the type-state pattern
//! 2. **Builder Pattern**: Fluent interface with `add_task().depends_on()`
//! 3. **Error Handling**: All fallible operations return `DagResult<T>`
//! 4. **Minimal Surface**: Small, focused API
//!
//! # Performance Characteristics
//!
//! dagx is designed for minimal overhead and optimal parallel execution.
//!
//! See the [repository](https://github.com/swaits/dagx) for up-to-date benchmarks.
//!
//! ## Performance Tips
//!
//! ### Automatic Arc Wrapping (No Manual Arc Needed!)
//!
//! **Task outputs are automatically wrapped in `Arc<T>` internally** for efficient fan-out patterns.
//! Your task returns a plain `T`, and the framework handles the `Arc` wrapping:
//!
//! ```
//! # use operese_dagx::{task, DagRunner, Task};
//! #
//! // ✅ CORRECT: Just output Vec<String>; the framework wraps it in Arc internally
//! struct FetchData;
//! #[task]
//! impl FetchData {
//!     async fn run() -> Vec<String> {
//!         vec!["data".to_string(); 10_000]
//!     }
//! }
//!
//! // Downstream tasks receive &Vec<String> as normal
//! // Behind the scenes: Arc<Vec<String>> is cloned cheaply, then a reference to the inner Vec is extracted
//! struct ProcessData;
//! #[task]
//! impl ProcessData {
//!     async fn run(data: &Vec<String>) -> usize {
//!         data.len()
//!     }
//! }
//!
//! # async {
//! let mut dag = DagRunner::new();
//! let data = dag.add_task(FetchData);
//!
//! // All three tasks get efficient Arc-wrapped sharing automatically
//! let task1 = dag.add_task(ProcessData).depends_on(&data);
//! let task2 = dag.add_task(ProcessData).depends_on(&data);
//! let task3 = dag.add_task(ProcessData).depends_on(&data);
//!
//! let mut output = dag.run(|fut| async move { tokio::spawn(fut).await.unwrap() }).await.unwrap();
//! # };
//! ```
//!
//! **How it works:**
//! - Your task outputs `T` (e.g., `Vec<String>`)
//! - The framework wraps it in `Arc<T>` internally
//! - For fan-out (1→N), the `Arc` is cloned N times, which copies a pointer rather than the data
//! - Each downstream task receives `&T` obtained from its `Arc`
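//!
//! The same sharing scheme can be reproduced with the standard library alone. In this sketch,
//! the `fan_out` and `process` functions are hypothetical, and OS threads stand in for
//! downstream tasks. The producer's value is wrapped in an `Arc` once, and each consumer
//! receives its own `Arc` clone. The consumer function takes `&Vec<String>` through `Deref`.
//!
//! ```rust
//! use std::sync::Arc;
//! use std::thread;
//!
//! /// Hypothetical downstream task body: it only ever sees `&Vec<String>`.
//! fn process(data: &Vec<String>) -> usize {
//!     data.len()
//! }
//!
//! /// Shares one producer output with `consumers` downstream workers.
//! fn fan_out(data: Vec<String>, consumers: usize) -> Vec<usize> {
//!     // Wrap the output once; the `Vec` itself is never copied.
//!     let shared = Arc::new(data);
//!     let handles: Vec<_> = (0..consumers)
//!         .map(|_| {
//!             // Cloning an `Arc` only increments a reference count.
//!             let local = Arc::clone(&shared);
//!             // `&local` derefs from `&Arc<Vec<String>>` to `&Vec<String>`.
//!             thread::spawn(move || process(&local))
//!         })
//!         .collect();
//!     handles.into_iter().map(|h| h.join().unwrap()).collect()
//! }
//! ```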
//!
//! ### Other Tips
//!
//! 1. **Minimize task count**: Combine small operations into larger tasks
//! 2. **Use appropriate granularity**: Don't create tasks for trivial work (< 1µs)
//!
//! In the repository, run `cargo bench` to see comprehensive benchmarks, including basic
//! operations, scaling characteristics, common patterns (fan-out, diamond), and realistic workloads.
//!
//! # Optional Tracing Support
//!
//! dagx provides optional observability through the `tracing` crate, enabled with the `tracing`
//! feature flag.
//!
//! ## Log Levels
//!
//! - **INFO**: DAG execution start/completion
//! - **DEBUG**: Task additions, dependency wiring, layer computation
//! - **TRACE**: Individual task execution (inline vs spawned), detailed execution flow
//! - **ERROR**: Task panics, concurrent execution attempts
//!
//! Control the log level with the `RUST_LOG` environment variable. `tracing` does not read
//! `RUST_LOG` by itself, so your application must install a subscriber that honors it (for
//! example, `tracing-subscriber` with an `EnvFilter`):
//!
//! ```bash
//! RUST_LOG=dagx=info  cargo run    # High-level execution info
//! RUST_LOG=dagx=debug cargo run    # Task and layer details
//! RUST_LOG=dagx=trace cargo run    # All execution details
//! ```

// Allows code inside this crate to refer to the crate by its external name, `operese_dagx`.
extern crate self as operese_dagx;

mod builder;
mod deps;
mod error;
mod node;
mod output;
mod runner;
mod task;

// Public re-exports
pub use builder::{TaskBuilder, TaskHandle};
pub use error::{DagError, DagResult};
pub use output::DagOutput;
pub use runner::DagRunner;
pub use task::{Task, TaskInput};

// Re-export the procedural macro (requires the `derive` feature)
#[cfg(feature = "derive")]
pub use operese_dagx_macros::task;