Skip to main content

zagens_tools/
dag_scheduler.rs

1//! Resource-dependency DAG scheduler (kernel-v2 M4).
2//!
3//! Groups tool plans into execution waves. Plans within a wave may run in
4//! parallel when `parallel_eligible`; later waves start only after all deps in
5//! earlier waves complete.
6
7use std::collections::HashSet;
8use std::sync::atomic::{AtomicU64, Ordering};
9
/// Coarse scheduling resource (path-level refinement lives in runtime bridge).
///
/// Two plans touching conflicting resources are ordered into different waves; see
/// `DagPlanView::depends_on` for the exact read/write rules.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ScheduleResource {
    /// Whole-workspace scan reads (grep/glob/list without isolated path).
    ///
    /// A scan is ordered only against `WorkspaceWrite` (in either program order); path-level
    /// writes do not conflict with it.
    WorkspaceScan,
    /// Canonical workspace-relative path (expected to be normalized, lowercase).
    ///
    /// Paths are compared by exact string equality; this module does no normalization, so
    /// callers must canonicalize before constructing this variant.
    Path(String),
    /// Fallback write lock when path cannot be inferred.
    ///
    /// Conflicts with every `Path`, with other `WorkspaceWrite`s, and with workspace scans.
    WorkspaceWrite,
}

/// Lightweight plan view for scheduling (no execution state).
#[derive(Debug, Clone)]
pub struct DagPlanView {
    /// Caller-assigned plan identifier; this is the value reported back in execution waves.
    /// NOTE(review): expected to be unique within a batch — not enforced here.
    pub index: usize,
    /// Whether this plan may run on the parallel executor path.
    pub parallel_eligible: bool,
    /// Resources this plan reads.
    pub reads: HashSet<ScheduleResource>,
    /// Resources this plan writes.
    pub writes: HashSet<ScheduleResource>,
}

impl DagPlanView {
    /// Returns `true` when `self` must run in a strictly later wave than `prior`.
    ///
    /// `prior` is the plan that comes earlier in program order. A dependency exists on any
    /// read-after-write, write-after-write, or write-after-read conflict between resources, plus
    /// the coarse rule that a workspace scan and a workspace-wide write are always ordered,
    /// whichever of the two comes first.
    #[must_use]
    pub fn depends_on(&self, prior: &Self) -> bool {
        // Coarse scan/write ordering is checked in both directions: a scan must see a completed
        // earlier coarse write, and a later coarse write must not run alongside an earlier scan
        // (write-after-read), mirroring how path-level writes wait for earlier path reads.
        resource_conflict(&prior.writes, &self.reads)
            || resource_conflict(&prior.writes, &self.writes)
            || resource_conflict(&prior.reads, &self.writes)
            || coarse_workspace_write_before_scan(&prior.writes, &self.reads)
            || coarse_workspace_write_before_scan(&self.writes, &prior.reads)
    }
}

/// Returns `true` when `reads` contains a workspace scan and `writes` contains a coarse
/// workspace write.
///
/// The check itself is direction-agnostic; the caller decides which plan came first.
fn coarse_workspace_write_before_scan(
    writes: &HashSet<ScheduleResource>,
    reads: &HashSet<ScheduleResource>,
) -> bool {
    reads.contains(&ScheduleResource::WorkspaceScan)
        && writes.contains(&ScheduleResource::WorkspaceWrite)
}

/// Returns `true` if any resource in `left` conflicts with any resource in `right`.
fn resource_conflict(left: &HashSet<ScheduleResource>, right: &HashSet<ScheduleResource>) -> bool {
    left.iter()
        .any(|l| right.iter().any(|r| l.conflicts_with(r)))
}

impl ScheduleResource {
    /// Symmetric resource-level conflict test used for read/write overlap checks.
    ///
    /// Scan-vs-write ordering is handled separately by `coarse_workspace_write_before_scan`,
    /// so a scan never conflicts here. The match is exhaustive on purpose: adding a variant must
    /// force a decision about its conflict rules.
    fn conflicts_with(&self, other: &Self) -> bool {
        match (self, other) {
            (Self::Path(a), Self::Path(b)) => a == b,
            (Self::WorkspaceWrite, Self::WorkspaceWrite | Self::Path(_))
            | (Self::Path(_), Self::WorkspaceWrite) => true,
            (Self::WorkspaceScan, _) | (_, Self::WorkspaceScan) => false,
        }
    }
}
63
64/// Build execution waves as plan index groups (proposal §8.2 DAG batching).
65///
66/// Returns one index per inner vec. Empty input yields empty vec.
67#[must_use]
68pub fn build_execution_waves(plans: &[DagPlanView]) -> Vec<Vec<usize>> {
69    if plans.is_empty() {
70        return Vec::new();
71    }
72    if plans.len() == 1 {
73        return vec![vec![plans[0].index]];
74    }
75
76    let n = plans.len();
77    let mut deps: Vec<HashSet<usize>> = vec![HashSet::new(); n];
78    for (j, plan_j) in plans.iter().enumerate() {
79        for (_i, plan_i) in plans.iter().enumerate().take(j) {
80            if plan_j.depends_on(plan_i) {
81                deps[j].insert(plan_i.index);
82            }
83        }
84    }
85
86    let mut remaining: HashSet<usize> = plans.iter().map(|p| p.index).collect();
87    let mut satisfied: HashSet<usize> = HashSet::new();
88    let mut waves: Vec<Vec<usize>> = Vec::new();
89
90    while !remaining.is_empty() {
91        let mut wave: Vec<usize> = remaining
92            .iter()
93            .copied()
94            .filter(|idx| deps[*idx].iter().all(|dep| satisfied.contains(dep)))
95            .collect();
96        if wave.is_empty() {
97            // Cycle or logic bug — fall back to serial program order for safety.
98            wave = remaining.iter().copied().take(1).collect();
99        }
100        wave.sort_unstable();
101        for idx in &wave {
102            remaining.remove(idx);
103            satisfied.insert(*idx);
104        }
105        waves.push(wave);
106    }
107
108    waves
109}
110
111static SCHEDULER_SHADOW_COMPARISONS: AtomicU64 = AtomicU64::new(0);
112static SCHEDULER_SHADOW_DIFFS: AtomicU64 = AtomicU64::new(0);
113
114/// Shadow-mode counters (`tools.scheduler = "shadow"`).
115#[derive(Debug, Clone, Copy, PartialEq, Eq)]
116pub struct SchedulerShadowStats {
117    pub comparisons: u64,
118    pub diffs: u64,
119}
120
121/// Record a shadow comparison between legacy and DAG execution groups.
122pub fn record_scheduler_shadow_diff(legacy: &[Vec<usize>], dag: &[Vec<usize>]) {
123    SCHEDULER_SHADOW_COMPARISONS.fetch_add(1, Ordering::Relaxed);
124    if legacy != dag {
125        SCHEDULER_SHADOW_DIFFS.fetch_add(1, Ordering::Relaxed);
126        tracing::debug!(?legacy, ?dag, "scheduler shadow diff");
127    }
128}
129
130#[must_use]
131pub fn scheduler_shadow_stats() -> SchedulerShadowStats {
132    SchedulerShadowStats {
133        comparisons: SCHEDULER_SHADOW_COMPARISONS.load(Ordering::Relaxed),
134        diffs: SCHEDULER_SHADOW_DIFFS.load(Ordering::Relaxed),
135    }
136}
137
138/// Whether every plan in a wave may use the parallel executor path.
139#[must_use]
140pub fn wave_parallel_eligible(plans: &[DagPlanView], wave: &[usize]) -> bool {
141    !wave.is_empty()
142        && wave.iter().all(|idx| {
143            plans
144                .iter()
145                .find(|p| p.index == *idx)
146                .is_some_and(|p| p.parallel_eligible)
147        })
148}
149
150#[cfg(test)]
151mod tests {
152    use super::*;
153    use std::collections::VecDeque;
154
155    fn path_read(index: usize, path: &str) -> DagPlanView {
156        DagPlanView {
157            index,
158            parallel_eligible: true,
159            reads: HashSet::from([ScheduleResource::Path(path.to_string())]),
160            writes: HashSet::new(),
161        }
162    }
163
164    fn path_write(index: usize, path: &str) -> DagPlanView {
165        DagPlanView {
166            index,
167            parallel_eligible: false,
168            reads: HashSet::new(),
169            writes: HashSet::from([ScheduleResource::Path(path.to_string())]),
170        }
171    }
172
173    fn workspace_scan(index: usize) -> DagPlanView {
174        DagPlanView {
175            index,
176            parallel_eligible: true,
177            reads: HashSet::from([ScheduleResource::WorkspaceScan]),
178            writes: HashSet::new(),
179        }
180    }
181
182    #[test]
183    fn proposal_example_read_batch_then_write() {
184        // batch = [read A, read B, edit A, grep C] → {read A, read B, grep C} ∥ then edit A
185        let plans = vec![
186            path_read(0, "a"),
187            path_read(1, "b"),
188            path_write(2, "a"),
189            workspace_scan(3),
190        ];
191        let waves = build_execution_waves(&plans);
192        assert_eq!(waves.len(), 2, "expected two waves, got {waves:?}");
193        assert_eq!(waves[0], vec![0, 1, 3]);
194        assert_eq!(waves[1], vec![2]);
195        assert!(wave_parallel_eligible(&plans, &waves[0]));
196        assert!(!wave_parallel_eligible(&plans, &waves[1]));
197    }
198
199    #[test]
200    fn legacy_all_readonly_single_wave() {
201        let plans = vec![path_read(0, "a"), path_read(1, "b"), path_read(2, "c")];
202        let waves = build_execution_waves(&plans);
203        assert_eq!(waves, vec![vec![0, 1, 2]]);
204    }
205
206    #[test]
207    fn write_serializes_on_same_path() {
208        let plans = vec![path_write(0, "x"), path_read(1, "x")];
209        let waves = build_execution_waves(&plans);
210        assert_eq!(waves, vec![vec![0], vec![1]]);
211    }
212
213    #[test]
214    fn independent_paths_parallelize() {
215        let plans = vec![path_write(0, "a"), path_read(1, "b")];
216        let waves = build_execution_waves(&plans);
217        assert_eq!(waves, vec![vec![0, 1]]);
218    }
219
220    #[test]
221    fn empty_batch() {
222        assert!(build_execution_waves(&[]).is_empty());
223    }
224
225    #[test]
226    fn wave_order_is_deterministic() {
227        let plans: Vec<DagPlanView> = (0..4).map(|i| path_read(i, &format!("f{i}"))).collect();
228        let w1 = build_execution_waves(&plans);
229        let w2 = build_execution_waves(&plans);
230        assert_eq!(w1, w2);
231        assert_eq!(w1, vec![vec![0, 1, 2, 3]]);
232    }
233
234    #[test]
235    fn coarse_workspace_write_blocks_scan() {
236        let plans = vec![
237            DagPlanView {
238                index: 0,
239                parallel_eligible: false,
240                reads: HashSet::new(),
241                writes: HashSet::from([ScheduleResource::WorkspaceWrite]),
242            },
243            workspace_scan(1),
244        ];
245        let waves = build_execution_waves(&plans);
246        assert_eq!(waves, vec![vec![0], vec![1]]);
247    }
248
249    #[test]
250    fn deps_queue_matches_manual_topo() {
251        let plans = vec![
252            path_read(0, "src/lib.rs"),
253            path_write(1, "src/lib.rs"),
254            path_read(2, "README.md"),
255        ];
256        let mut q: VecDeque<usize> = build_execution_waves(&plans)
257            .into_iter()
258            .flat_map(|w| w.into_iter())
259            .collect();
260        assert_eq!(q.pop_front(), Some(0));
261        assert_eq!(q.pop_front(), Some(2));
262        assert_eq!(q.pop_front(), Some(1));
263    }
264}