// car-scheduler 0.32.0
//
// Task scheduling and background execution for Common Agent Runtime
//! Scheduler-theoretic analysis of a static execution DAG (arXiv 2604.11378,
//! *From Agent Loops to Structured Graphs: A Scheduler-Theoretic Framework for
//! LLM Agent Execution*).
//!
//! See `docs/proposals/scheduler-graph-analysis.md`. The paper characterizes the
//! Agent Loop as a *single-ready-unit scheduler* and argues for lifting control
//! flow into an explicit static DAG you can inspect. CAR's executor already runs
//! a dependency DAG concurrently, but nothing analyzes a plan's *schedule*
//! first. This module is that pure analysis: critical path, makespan,
//! parallelism width, topological waves, cycle detection, and the serial
//! "Agent-Loop pathology" flag — deterministic, inspectable, zero-inference.

use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet, VecDeque};

/// One schedulable unit: an id, an execution `duration` (any time unit), and the
/// units it depends on (must finish first).
///
/// Only `id` is required when deserializing; the other fields fall back to
/// their `Default` values via `#[serde(default)]`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ScheduleUnit {
    /// Identifier that other units name in their `depends_on` lists.
    pub id: String,
    /// Execution time in an arbitrary, caller-chosen unit. Defaults to 0 when
    /// the field is missing from the serialized form.
    #[serde(default)]
    pub duration: u64,
    /// Ids of the units that must finish before this one can start. Defaults
    /// to an empty list when the field is missing from the serialized form.
    #[serde(default)]
    pub depends_on: Vec<String>,
}

/// A static execution DAG.
///
/// The type itself does not enforce acyclicity; `analyze_schedule` reports a
/// cyclic input through `ScheduleAnalysis::has_cycle`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ScheduleGraph {
    /// The units making up the plan. Defaults to an empty list when the field
    /// is missing from the serialized form.
    #[serde(default)]
    pub units: Vec<ScheduleUnit>,
}

/// The schedule analysis.
///
/// Produced by `analyze_schedule`; every field is derived from the input graph
/// alone.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ScheduleAnalysis {
    /// True if the dependency graph has a cycle — a static plan must be acyclic;
    /// a cycle is an unbounded loop in plan form. Path/level metrics are then
    /// computed over the acyclic prefix only.
    pub has_cycle: bool,
    /// Unit ids on the longest-duration dependency chain, listed from the
    /// first unit to run to the last.
    pub critical_path: Vec<String>,
    /// Total duration of the critical path — the minimum completion time with
    /// unbounded parallelism. 0 for an empty graph.
    pub makespan: u64,
    /// The widest wave of independent units — the available concurrency width.
    /// 0 for an empty graph.
    pub max_parallelism: u64,
    /// Topological waves: `levels[i]` are units that can run together once
    /// everything in earlier waves has finished. Ids within a wave are sorted.
    pub levels: Vec<Vec<String>>,
    /// True when `max_parallelism <= 1` — the plan collapses to one-unit-at-a-
    /// time, the single-ready-unit Agent-Loop pathology the paper critiques.
    /// This is also true for an empty graph.
    pub serial: bool,
}

/// Collects the dependencies of `u` that name a unit present in `ids`, in the
/// order they appear in `u.depends_on`.
///
/// A dependency on an unknown id is silently skipped; checking structural
/// validity is left to other code. The returned string slices borrow from `u`,
/// not from `ids`.
fn valid_deps<'a>(u: &'a ScheduleUnit, ids: &HashSet<&str>) -> Vec<&'a str> {
    let mut known: Vec<&'a str> = Vec::new();
    for dep in &u.depends_on {
        let name: &'a str = dep.as_str();
        if ids.contains(name) {
            known.push(name);
        }
    }
    known
}

/// Analyze a static execution DAG. Deterministic and pure.
///
/// Runs Kahn's algorithm to peel the graph into topological waves, then an
/// earliest-finish pass over those waves to obtain the makespan and the
/// critical path.
///
/// Behavior worth knowing:
/// - Dependencies naming an unknown unit are ignored (see `valid_deps`).
/// - A unit that sits on a cycle (including a self-dependency), or that
///   transitively depends on one, is never scheduled: it sets `has_cycle` and
///   is excluded from `levels`, `max_parallelism`, `makespan` and
///   `critical_path`.
/// - Units sharing an id are treated as a single unit: their dependencies are
///   merged and the last definition's duration is used. Duplicates alone do not
///   make the graph cyclic.
/// - The critical path always runs from a root to the chosen tail, so
///   zero-duration units on the chain are included. Among dependencies with an
///   equal finish time the one listed first in `depends_on` is followed; among
///   tails with an equal finish time the smallest id wins.
/// - Finish times saturate at `u64::MAX` instead of overflowing.
pub fn analyze_schedule(graph: &ScheduleGraph) -> ScheduleAnalysis {
    let ids: HashSet<&str> = graph.units.iter().map(|u| u.id.as_str()).collect();

    // Per-id view of the graph: duration, resolved dependencies, and the
    // reverse edges (who is waiting on whom).
    let mut duration_of: HashMap<&str, u64> = HashMap::with_capacity(ids.len());
    let mut deps_of: HashMap<&str, Vec<&str>> = HashMap::with_capacity(ids.len());
    let mut dependents: HashMap<&str, Vec<&str>> = HashMap::new();
    for u in &graph.units {
        let id = u.id.as_str();
        duration_of.insert(id, u.duration);
        let deps = valid_deps(u, &ids);
        for &d in &deps {
            dependents.entry(d).or_default().push(id);
        }
        deps_of.entry(id).or_default().extend(deps);
    }

    // Number of dependencies each unit is still waiting for.
    let mut remaining: HashMap<&str, usize> =
        deps_of.iter().map(|(&id, deps)| (id, deps.len())).collect();

    // Kahn's algorithm. A unit is enqueued — and only then given a level — at
    // the moment its last dependency is processed. The queue is FIFO, so units
    // are popped in non-decreasing level order and that last dependency is
    // also the deepest one: the unit's level is simply `lvl + 1`.
    let mut queue: VecDeque<(&str, usize)> = remaining
        .iter()
        .filter(|&(_, &left)| left == 0)
        .map(|(&id, _)| (id, 0))
        .collect();
    let mut waves: Vec<Vec<&str>> = Vec::new();
    let mut scheduled = 0usize;
    while let Some((id, lvl)) = queue.pop_front() {
        scheduled += 1;
        while waves.len() <= lvl {
            waves.push(Vec::new());
        }
        waves[lvl].push(id);
        if let Some(children) = dependents.get(id) {
            for &child in children {
                if let Some(left) = remaining.get_mut(child) {
                    *left -= 1;
                    if *left == 0 {
                        queue.push_back((child, lvl + 1));
                    }
                }
            }
        }
    }
    // Anything never scheduled is on a cycle or blocked behind one. Compare
    // against distinct ids so duplicate definitions are not mistaken for a cycle.
    let has_cycle = scheduled < ids.len();

    // Sorting each wave makes the output independent of hash-map iteration order.
    for wave in &mut waves {
        wave.sort_unstable();
    }
    let levels: Vec<Vec<String>> = waves
        .iter()
        .map(|wave| wave.iter().map(|&id| id.to_string()).collect())
        .collect();
    let max_parallelism = levels.iter().map(|w| w.len() as u64).max().unwrap_or(0);

    // Earliest-finish DP, wave by wave, so every dependency is final before
    // the units that wait on it.
    let mut finish: HashMap<&str, u64> = HashMap::with_capacity(scheduled);
    let mut pred: HashMap<&str, &str> = HashMap::new();
    for &id in waves.iter().flatten() {
        // Latest-finishing dependency. Tracked as an `Option` so a dependency
        // that finishes at time 0 still counts as a predecessor.
        let mut latest: Option<(u64, &str)> = None;
        if let Some(deps) = deps_of.get(id) {
            for &d in deps {
                if let Some(&f) = finish.get(d) {
                    if latest.map_or(true, |(best, _)| f > best) {
                        latest = Some((f, d));
                    }
                }
            }
        }
        let start = match latest {
            Some((f, d)) => {
                pred.insert(id, d);
                f
            }
            None => 0,
        };
        let own = duration_of.get(id).copied().unwrap_or(0);
        finish.insert(id, start.saturating_add(own));
    }
    let makespan = finish.values().copied().max().unwrap_or(0);

    // Tail of the critical path: the largest finish time, smallest id on ties.
    let mut tail: Option<(u64, &str)> = None;
    for (&id, &f) in &finish {
        let wins = match tail {
            None => true,
            Some((best, best_id)) => f > best || (f == best && id < best_id),
        };
        if wins {
            tail = Some((f, id));
        }
    }

    // Walk predecessor links back to a root, then flip into execution order.
    let mut critical_path: Vec<String> = Vec::new();
    let mut cur = tail.map(|(_, id)| id);
    while let Some(id) = cur {
        critical_path.push(id.to_string());
        cur = pred.get(id).copied();
    }
    critical_path.reverse();

    ScheduleAnalysis {
        has_cycle,
        critical_path,
        makespan,
        max_parallelism,
        serial: max_parallelism <= 1,
        levels,
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a unit with the given id, duration and dependency ids.
    fn unit(id: &str, duration: u64, deps: &[&str]) -> ScheduleUnit {
        let depends_on: Vec<String> = deps.iter().map(|&dep| dep.to_owned()).collect();
        ScheduleUnit {
            id: id.to_owned(),
            duration,
            depends_on,
        }
    }

    /// Wraps a list of units into a graph.
    fn graph(units: Vec<ScheduleUnit>) -> ScheduleGraph {
        ScheduleGraph { units }
    }

    #[test]
    fn linear_chain_is_serial() {
        // a -> b -> c with durations 1, 2 and 3: one unit per wave.
        let chain = vec![
            unit("a", 1, &[]),
            unit("b", 2, &["a"]),
            unit("c", 3, &["b"]),
        ];
        let analysis = analyze_schedule(&graph(chain));

        assert!(!analysis.has_cycle);
        assert!(analysis.serial);
        assert_eq!(analysis.max_parallelism, 1);
        assert_eq!(analysis.makespan, 6);
        assert_eq!(analysis.critical_path, ["a", "b", "c"]);
        assert_eq!(analysis.levels.len(), 3);
    }

    #[test]
    fn diamond_has_parallelism_two() {
        // a fans out to b (5) and c (2), which join at d; the long arm goes
        // through b, so the critical path is a, b, d.
        let diamond = vec![
            unit("a", 1, &[]),
            unit("b", 5, &["a"]),
            unit("c", 2, &["a"]),
            unit("d", 1, &["b", "c"]),
        ];
        let analysis = analyze_schedule(&graph(diamond));

        assert!(!analysis.has_cycle);
        assert!(!analysis.serial);
        // b and c share a wave.
        assert_eq!(analysis.max_parallelism, 2);
        // a(1) + b(5) + d(1)
        assert_eq!(analysis.makespan, 7);
        assert_eq!(analysis.critical_path, ["a", "b", "d"]);
    }

    #[test]
    fn fully_parallel_fan() {
        // Three units with no dependencies form a single wave of width 3.
        let fan = vec![unit("a", 3, &[]), unit("b", 1, &[]), unit("c", 2, &[])];
        let analysis = analyze_schedule(&graph(fan));

        assert_eq!(analysis.max_parallelism, 3);
        // The makespan is the longest single unit.
        assert_eq!(analysis.makespan, 3);
        assert!(!analysis.serial);
        assert_eq!(analysis.levels.len(), 1);
    }

    #[test]
    fn cycle_is_detected() {
        // a and b wait on each other.
        let looped = vec![unit("a", 1, &["b"]), unit("b", 1, &["a"])];
        let analysis = analyze_schedule(&graph(looped));

        assert!(analysis.has_cycle);
    }

    #[test]
    fn empty_graph_is_trivial() {
        let analysis = analyze_schedule(&graph(Vec::new()));

        assert!(!analysis.has_cycle);
        assert_eq!(analysis.makespan, 0);
        assert_eq!(analysis.max_parallelism, 0);
        // Width <= 1 counts as serial; there is nothing to parallelize.
        assert!(analysis.serial);
        assert!(analysis.critical_path.is_empty());
    }

    #[test]
    fn single_unit_is_serial() {
        let lone = vec![unit("only", 4, &[])];
        let analysis = analyze_schedule(&graph(lone));

        assert!(analysis.serial);
        assert_eq!(analysis.makespan, 4);
        assert_eq!(analysis.critical_path, ["only"]);
    }

    #[test]
    fn dangling_dependency_is_ignored() {
        // 'a' names a unit that does not exist; it is scheduled as if it had
        // no dependency at all.
        let orphan = vec![unit("a", 2, &["ghost"])];
        let analysis = analyze_schedule(&graph(orphan));

        assert!(!analysis.has_cycle);
        assert_eq!(analysis.makespan, 2);
        assert_eq!(analysis.levels.len(), 1);
    }
}