operese-dagx 0.4.1

A minimal, type-safe, runtime-agnostic async DAG (Directed Acyclic Graph) executor with compile-time cycle prevention and true parallel execution
Documentation
//! Internal node types for DAG execution.
//!
//! Provides type erasure to store tasks with different types in a single collection.
//! The public API remains fully type-safe; type erasure is purely an internal implementation detail.
//!
//! - **TypedNode\<T\>**: Stores a task with full type information
//! - **ExecutableNode**: Trait for executing type-erased tasks

use std::any::Any;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;

use crate::error::DagError;
use crate::task::{Task, TaskInput};

/// Type-erased async execution result (Arc-wrapped for efficient fanout)
type ExecuteFuture =
    Pin<Box<dyn Future<Output = Result<Arc<dyn Any + Send + Sync>, DagError>> + Send>>;

/// Internal trait for executing heterogeneous tasks.
///
/// This trait provides type erasure necessary for storing different task types in a single
/// collection. While the public API (Task, Handle, Node) is fully type-safe, internally we
/// need dynamic dispatch to execute tasks with different input/output types.
///
/// This is the ONLY place where type erasure occurs - tasks are created with full type
/// information and only erased at this trait boundary.
pub(crate) trait ExecutableNode: Send {
    /// Execute the task with type-erased inputs and output
    ///
    /// The TypedNode implementation knows the concrete types and can safely downcast.
    /// Consumes the node and returns the output value.
    fn execute_with_deps(
        self: Box<Self>,
        dependencies: Vec<Arc<dyn Any + Send + Sync>>,
    ) -> ExecuteFuture;
}

/// Fully-typed node storage for a single task.
///
/// Stores a task of type T. Implements ExecutableNode for type erasure,
/// allowing heterogeneous tasks to be stored together.
///
/// # Ownership Model
///
/// The task is owned directly (no Mutex needed) and consumed during execution.
/// Each task executes exactly once, taking ownership and producing an output.
///
/// The output is also returned from execute_with_deps to be passed to dependents and returned as an output
pub(crate) struct TypedNode<Input, T>
where
    Input: Send + Sync + 'static,
    T: Task<Input> + 'static,
{
    pub(crate) task: T,
    phantom: PhantomData<Input>,
}

impl<Input, T> TypedNode<Input, T>
where
    T: Task<Input>,
    Input: Send + Sync + 'static,
{
    pub(crate) fn new(task: T) -> Self {
        Self {
            task,
            phantom: PhantomData,
        }
    }
}

impl<Input, T> ExecutableNode for TypedNode<Input, T>
where
    T: Task<Input>,
    Input: Send + Sync + 'static,
{
    fn execute_with_deps(
        self: Box<Self>,
        dependencies: Vec<Arc<dyn Any + Send + Sync>>,
    ) -> ExecuteFuture {
        Box::pin(async move {
            // Use the task's run method which has inline extraction logic
            // generated by the #[task] macro.
            let output = self.task.run(TaskInput::new(dependencies.iter())).await;

            // Arc Wrapping Strategy:
            // Wrap output in Arc ONCE for efficient fan-out sharing.
            // Downstream tasks receive &T after extraction from Arc.
            let arc_output = Arc::new(output);

            // Return Arc-wrapped output (for storage by runner)
            Ok(arc_output as Arc<dyn Any + Send + Sync>)
        })
    }
}

#[cfg(test)]
mod tests;