dag-executor 0.1.0

A production-ready DAG executor with state management and advanced patterns
Documentation
//! Concurrent task execution with bounded parallelism, retries and timeouts.

use crate::advanced::{CircuitBreaker, RetryPolicy};
use crate::context::Context;
use crate::error::TaskError;
use crate::metrics::MetricsCollector;
use crate::tasks::{Task, TaskOutput};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;
use tokio::task::JoinHandle;

/// The final result of running one task (after any retries).
///
/// One value is produced per spawned task; it is what the `JoinHandle`
/// returned by [`WorkerPool::spawn`] resolves to when the task does not panic.
#[derive(Debug)]
pub struct TaskResult {
    /// The task id.
    pub id: String,
    /// Total attempts made.
    ///
    /// This is `0` when the task was cancelled, or rejected by an open circuit
    /// breaker, before its first attempt started.
    pub attempts: u32,
    /// Success output or the terminal error.
    ///
    /// The error is either the failure of the last attempt (including
    /// [`TaskError::Timeout`]) or one synthesised by the pool itself:
    /// [`TaskError::Cancelled`] or [`TaskError::CircuitOpen`].
    pub outcome: Result<TaskOutput, TaskError>,
}

/// Executes tasks concurrently under a global concurrency limit.
///
/// Each task is run on its own Tokio task but must first acquire a permit from
/// a shared [`Semaphore`], which caps how many run at once. The pool also owns
/// the retry loop, per-attempt timeout, and circuit-breaker gating so the
/// executor's scheduling loop stays simple.
pub struct WorkerPool {
    /// Shared permit source; every spawned task holds one permit from before
    /// its first attempt until it returns its final result.
    semaphore: Arc<Semaphore>,
    /// Policy consulted after each failed attempt to decide whether to retry
    /// and how long to back off first.
    retry: RetryPolicy,
    /// Time limit applied to each individual attempt; `None` means no limit.
    timeout: Option<Duration>,
    /// Sink notified when a task starts running and whenever it is retried.
    metrics: Arc<MetricsCollector>,
}

impl WorkerPool {
    /// Create a pool allowing `concurrency` simultaneous tasks.
    ///
    /// `concurrency` is clamped to `1..=Semaphore::MAX_PERMITS`: zero is raised
    /// to one so the pool always has a slot to hand out, and oversized values
    /// (for example `usize::MAX` used to mean "unbounded") are lowered to the
    /// largest permit count [`Semaphore::new`] accepts instead of panicking.
    ///
    /// `retry` and `timeout` apply to every task spawned on this pool; the
    /// timeout bounds each attempt individually and `None` disables it.
    pub fn new(
        concurrency: usize,
        retry: RetryPolicy,
        timeout: Option<Duration>,
        metrics: Arc<MetricsCollector>,
    ) -> Self {
        let permits = concurrency.clamp(1, Semaphore::MAX_PERMITS);
        WorkerPool {
            semaphore: Arc::new(Semaphore::new(permits)),
            retry,
            timeout,
            metrics,
        }
    }

    /// Permits currently available (i.e. free concurrency slots).
    pub fn available_permits(&self) -> usize {
        self.semaphore.available_permits()
    }

    /// Spawn `task` for execution, returning a handle to its eventual result.
    ///
    /// The spawned Tokio task waits for a concurrency permit, then runs `task`
    /// until it succeeds or can no longer be retried. The resulting
    /// [`TaskResult::outcome`] is:
    ///
    /// * `Ok(output)` from the first attempt that succeeds;
    /// * `Err(TaskError::Cancelled)` if `ctx` is cancelled before an attempt
    ///   starts;
    /// * `Err(TaskError::CircuitOpen)` if `breaker` refuses the request;
    /// * otherwise the error of the last attempt, once that error is not
    ///   retryable, the retry policy is exhausted, or `ctx` has been cancelled.
    ///
    /// `breaker`, if supplied, gates execution: when open, the task fails fast
    /// with [`TaskError::CircuitOpen`] without consuming an attempt. Every
    /// attempt's success or failure is reported back to the breaker.
    ///
    /// Cancellation is only observed between attempts: a running attempt or an
    /// in-progress backoff sleep is not interrupted by it.
    ///
    /// # Panics
    ///
    /// `tokio::spawn` panics when called outside a Tokio runtime. The spawned
    /// task itself panics only if the semaphore has been closed, which this
    /// type never does.
    pub fn spawn(
        &self,
        task: Arc<dyn Task>,
        ctx: Arc<Context>,
        breaker: Option<Arc<CircuitBreaker>>,
    ) -> JoinHandle<TaskResult> {
        // Everything the spawned future needs must be owned by it, so take
        // cheap handles/copies of the pool's configuration up front.
        let semaphore = Arc::clone(&self.semaphore);
        let retry = self.retry;
        let timeout = self.timeout;
        let metrics = Arc::clone(&self.metrics);
        let id = task.id().to_string();

        tokio::spawn(async move {
            // Hold a permit for the entire (possibly multi-attempt) lifetime so
            // concurrency stays bounded even while a task is backing off.
            let _permit = semaphore
                .acquire_owned()
                .await
                .expect("semaphore is never closed");

            // Reported once per task, after it has obtained its slot.
            metrics.task_started();
            let mut attempts = 0u32;

            loop {
                // Re-checked before every attempt so a cancellation requested
                // during a previous attempt or backoff stops further retries.
                if ctx.is_cancelled() {
                    return TaskResult {
                        id,
                        attempts,
                        outcome: Err(TaskError::Cancelled),
                    };
                }

                // Gate each attempt (not just the first) on the breaker; a
                // rejection here leaves `attempts` untouched.
                if let Some(b) = &breaker {
                    if !b.allow_request() {
                        return TaskResult {
                            // Fields are evaluated in order, so clone for the
                            // result first and move the original into the error.
                            id: id.clone(),
                            attempts,
                            outcome: Err(TaskError::CircuitOpen(id)),
                        };
                    }
                }

                attempts += 1;
                let result = run_once(task.clone(), ctx.clone(), timeout).await;

                match result {
                    Ok(output) => {
                        if let Some(b) = &breaker {
                            b.record_success();
                        }
                        return TaskResult {
                            id,
                            attempts,
                            outcome: Ok(output),
                        };
                    }
                    Err(err) => {
                        if let Some(b) = &breaker {
                            b.record_failure();
                        }
                        // Retry only when the error allows it, the policy has
                        // attempts left, and nobody asked us to stop.
                        let retryable = err.is_retryable()
                            && retry.should_retry(attempts)
                            && !ctx.is_cancelled();
                        if !retryable {
                            return TaskResult {
                                id,
                                attempts,
                                outcome: Err(err),
                            };
                        }
                        metrics.retry();
                        let delay = retry.delay_for(attempts);
                        // Skip the timer entirely for zero-length backoff.
                        if !delay.is_zero() {
                            tokio::time::sleep(delay).await;
                        }
                    }
                }
            }
        })
    }
}

/// Execute `task` exactly once against `ctx`.
///
/// With `timeout` set, the wait is abandoned once the limit elapses and
/// [`TaskError::Timeout`] carrying that limit is returned; with `None` the
/// attempt is awaited until the task itself finishes. A result the task
/// produces in time is passed through unchanged.
async fn run_once(
    task: Arc<dyn Task>,
    ctx: Arc<Context>,
    timeout: Option<Duration>,
) -> Result<TaskOutput, TaskError> {
    let attempt = task.execute(ctx);

    let limit = match timeout {
        Some(limit) => limit,
        // No deadline configured: just drive the attempt to completion.
        None => return attempt.await,
    };

    // The outer `Err` means the deadline fired before the task resolved.
    tokio::time::timeout(limit, attempt)
        .await
        .unwrap_or_else(|_elapsed| Err(TaskError::Timeout(limit)))
}