// dag-executor 0.1.0
//
// A production-ready DAG executor with state management and advanced patterns.
// Documentation
//! Lightweight in-process metrics collection: atomic counters plus a
//! `parking_lot`-guarded map of per-task durations.

use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};

/// A snapshot of collected metrics at a point in time.
///
/// All fields are plain values, so a snapshot can be cloned, compared, and
/// inspected without touching the collector it was taken from.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct MetricsSnapshot {
    /// Tasks that started executing.
    pub tasks_started: u64,
    /// Tasks that completed successfully.
    pub tasks_completed: u64,
    /// Tasks that failed terminally.
    pub tasks_failed: u64,
    /// Tasks routed to the dead-letter queue.
    pub tasks_dead_lettered: u64,
    /// Total retry attempts across all tasks.
    pub retries: u64,
    /// Tasks skipped due to failed dependencies.
    pub tasks_skipped: u64,
    /// Per-task execution durations in milliseconds, keyed by task id.
    pub durations_ms: HashMap<String, u64>,
}

impl MetricsSnapshot {
    /// Fraction of finished tasks that completed successfully, in `[0, 1]`.
    ///
    /// A task counts as finished once it is recorded as completed, failed, or
    /// dead-lettered; `tasks_started`, `retries`, and `tasks_skipped` do not
    /// enter the calculation. Returns `1.0` when no task has finished yet.
    pub fn success_rate(&self) -> f64 {
        // Widen before adding: three `u64` counters can exceed `u64::MAX`,
        // which would panic in debug builds and wrap in release builds.
        let finished = u128::from(self.tasks_completed)
            + u128::from(self.tasks_failed)
            + u128::from(self.tasks_dead_lettered);
        if finished == 0 {
            return 1.0;
        }
        self.tasks_completed as f64 / finished as f64
    }

    /// Mean task duration in milliseconds, if any durations were recorded.
    ///
    /// Returns `None` when `durations_ms` is empty.
    pub fn mean_duration_ms(&self) -> Option<f64> {
        if self.durations_ms.is_empty() {
            return None;
        }
        // Accumulate in `u128` so the total cannot overflow regardless of how
        // many entries there are or how large each one is.
        let total: u128 = self
            .durations_ms
            .values()
            .map(|&ms| u128::from(ms))
            .sum();
        Some(total as f64 / self.durations_ms.len() as f64)
    }
}

/// Shared, thread-safe sink for executor metrics.
///
/// Every counter is an [`AtomicU64`] updated without taking a lock. The
/// per-task duration map sits behind a mutex instead, because it is only
/// written once per task completion.
#[derive(Default)]
pub struct MetricsCollector {
    started: AtomicU64,
    completed: AtomicU64,
    failed: AtomicU64,
    dead_lettered: AtomicU64,
    retries: AtomicU64,
    skipped: AtomicU64,
    durations: Mutex<HashMap<String, u64>>,
}

impl MetricsCollector {
    /// Build a collector with all counters at zero and no recorded durations.
    pub fn new() -> Self {
        Default::default()
    }

    /// Count one task as started.
    pub fn task_started(&self) {
        Self::bump(&self.started);
    }

    /// Count one successful completion and store its duration under `id`.
    ///
    /// A later call with the same `id` replaces the previously stored duration.
    pub fn task_completed(&self, id: &str, duration_ms: u64) {
        Self::bump(&self.completed);
        self.durations.lock().insert(id.to_owned(), duration_ms);
    }

    /// Count one terminal failure.
    pub fn task_failed(&self) {
        Self::bump(&self.failed);
    }

    /// Count one dead-letter event.
    pub fn task_dead_lettered(&self) {
        Self::bump(&self.dead_lettered);
    }

    /// Count one retry attempt.
    pub fn retry(&self) {
        Self::bump(&self.retries);
    }

    /// Count one skipped task.
    pub fn task_skipped(&self) {
        Self::bump(&self.skipped);
    }

    /// Copy the current counter values and duration map into a snapshot.
    ///
    /// Counters are read one after another with relaxed ordering, so updates
    /// racing with this call may be reflected in some fields and not others.
    pub fn snapshot(&self) -> MetricsSnapshot {
        let tasks_started = Self::read(&self.started);
        let tasks_completed = Self::read(&self.completed);
        let tasks_failed = Self::read(&self.failed);
        let tasks_dead_lettered = Self::read(&self.dead_lettered);
        let retries = Self::read(&self.retries);
        let tasks_skipped = Self::read(&self.skipped);
        // The map is cloned while the lock is held; the guard is released at
        // the end of this statement.
        let durations_ms = self.durations.lock().clone();

        MetricsSnapshot {
            tasks_started,
            tasks_completed,
            tasks_failed,
            tasks_dead_lettered,
            retries,
            tasks_skipped,
            durations_ms,
        }
    }

    /// Add one to `counter` using relaxed ordering.
    fn bump(counter: &AtomicU64) {
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Read the current value of `counter` using relaxed ordering.
    fn read(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }
}