dag-executor 0.1.0

A production-ready DAG executor with state management and advanced patterns
Documentation
//! Priority-aware scheduling and task-state bookkeeping.

use crate::state::{TaskRecord, TaskState};
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap};

/// A ready task waiting in the priority queue.
struct Prioritized {
    priority: u8,
    /// Insertion order, used to break ties FIFO.
    seq: u64,
    id: String,
}

impl PartialEq for Prioritized {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority && self.seq == other.seq
    }
}
impl Eq for Prioritized {}

impl Ord for Prioritized {
    fn cmp(&self, other: &Self) -> Ordering {
        // Higher priority first; among equal priorities, lower seq (enqueued
        // earlier) wins — so reverse the seq comparison for the max-heap.
        self.priority
            .cmp(&other.priority)
            .then_with(|| other.seq.cmp(&self.seq))
    }
}
impl PartialOrd for Prioritized {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// Tracks the state of every task and serves the next ready task by priority.
///
/// The scheduler owns the authoritative [`TaskRecord`] map. The executor pushes
/// tasks in as they become runnable and pulls the highest-priority one out.
#[derive(Default)]
pub struct Scheduler {
    records: HashMap<String, TaskRecord>,
    ready: BinaryHeap<Prioritized>,
    seq: u64,
}

impl Scheduler {
    /// Create an empty scheduler.
    pub fn new() -> Self {
        Self::default()
    }

    /// Seed the scheduler with previously persisted records (for recovery).
    pub fn with_records(records: HashMap<String, TaskRecord>) -> Self {
        Scheduler {
            records,
            ready: BinaryHeap::new(),
            seq: 0,
        }
    }

    /// Ensure a record exists for `id`, creating a `Pending` one if not.
    pub fn ensure_record(&mut self, id: &str) -> &mut TaskRecord {
        self.records
            .entry(id.to_string())
            .or_insert_with(|| TaskRecord::new(id))
    }

    /// Immutable access to a record.
    pub fn record(&self, id: &str) -> Option<&TaskRecord> {
        self.records.get(id)
    }

    /// The full record map.
    pub fn records(&self) -> &HashMap<String, TaskRecord> {
        &self.records
    }

    /// Mutable access to the full record map.
    pub fn records_mut(&mut self) -> &mut HashMap<String, TaskRecord> {
        &mut self.records
    }

    /// Current state of `id`, if known.
    pub fn state(&self, id: &str) -> Option<TaskState> {
        self.records.get(id).map(|r| r.state)
    }

    /// Apply a state transition, returning whether it was legal/applied.
    pub fn transition(&mut self, id: &str, state: TaskState) -> bool {
        match self.records.get_mut(id) {
            Some(r) => r.transition(state),
            None => false,
        }
    }

    /// Mark `id` ready and enqueue it at `priority`.
    pub fn mark_ready(&mut self, id: &str, priority: u8) {
        let applied = {
            let record = self.ensure_record(id);
            record.transition(TaskState::Ready)
        };
        if applied {
            let seq = self.seq;
            self.seq += 1;
            self.ready.push(Prioritized {
                priority,
                seq,
                id: id.to_string(),
            });
        }
    }

    /// Pop the highest-priority task that is still in the `Ready` state.
    ///
    /// Stale heap entries (whose record has since moved on) are discarded.
    pub fn next_ready(&mut self) -> Option<String> {
        while let Some(entry) = self.ready.pop() {
            if self.state(&entry.id) == Some(TaskState::Ready) {
                return Some(entry.id);
            }
        }
        None
    }

    /// Whether any task is queued ready to run.
    pub fn has_ready(&self) -> bool {
        // May overcount stale entries, but is only used as a cheap hint.
        !self.ready.is_empty()
    }

    /// Whether every known task has reached a terminal state.
    pub fn all_terminal(&self) -> bool {
        self.records.values().all(|r| r.state.is_terminal())
    }

    /// Count records currently in `state`.
    pub fn count_in(&self, state: TaskState) -> usize {
        self.records.values().filter(|r| r.state == state).count()
    }
}