1use std::collections::HashSet;
8use std::sync::atomic::{AtomicU64, Ordering};
9
/// A resource that a scheduled plan declares in its read or write set.
///
/// Equality and hashing are derived, so two `Path` values denote the same
/// resource only when their strings are byte-for-byte equal.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub enum ScheduleResource {
    /// Workspace-wide scan marker (appears in read sets).
    WorkspaceScan,
    /// A single path, identified by its string form.
    Path(String),
    /// Coarse workspace-wide write marker (appears in write sets).
    WorkspaceWrite,
}
20
21#[derive(Debug, Clone)]
23pub struct DagPlanView {
24 pub index: usize,
25 pub parallel_eligible: bool,
26 pub reads: HashSet<ScheduleResource>,
27 pub writes: HashSet<ScheduleResource>,
28}
29
30impl DagPlanView {
31 #[must_use]
32 pub fn depends_on(&self, prior: &Self) -> bool {
33 resource_conflict(&prior.writes, &self.reads)
34 || resource_conflict(&prior.writes, &self.writes)
35 || resource_conflict(&prior.reads, &self.writes)
36 || coarse_workspace_write_before_scan(&prior.writes, &self.reads)
37 }
38}
39
40fn coarse_workspace_write_before_scan(
41 writes: &HashSet<ScheduleResource>,
42 reads: &HashSet<ScheduleResource>,
43) -> bool {
44 reads.contains(&ScheduleResource::WorkspaceScan)
45 && writes.contains(&ScheduleResource::WorkspaceWrite)
46}
47
48fn resource_conflict(left: &HashSet<ScheduleResource>, right: &HashSet<ScheduleResource>) -> bool {
49 left.iter()
50 .any(|l| right.iter().any(|r| l.conflicts_with(r)))
51}
52
53impl ScheduleResource {
54 fn conflicts_with(&self, other: &Self) -> bool {
55 match (self, other) {
56 (Self::Path(a), Self::Path(b)) => a == b,
57 (Self::WorkspaceWrite, Self::WorkspaceWrite) => true,
58 (Self::WorkspaceWrite, Self::Path(_)) | (Self::Path(_), Self::WorkspaceWrite) => true,
59 _ => false,
60 }
61 }
62}
63
64#[must_use]
68pub fn build_execution_waves(plans: &[DagPlanView]) -> Vec<Vec<usize>> {
69 if plans.is_empty() {
70 return Vec::new();
71 }
72 if plans.len() == 1 {
73 return vec![vec![plans[0].index]];
74 }
75
76 let n = plans.len();
77 let mut deps: Vec<HashSet<usize>> = vec![HashSet::new(); n];
78 for (j, plan_j) in plans.iter().enumerate() {
79 for (_i, plan_i) in plans.iter().enumerate().take(j) {
80 if plan_j.depends_on(plan_i) {
81 deps[j].insert(plan_i.index);
82 }
83 }
84 }
85
86 let mut remaining: HashSet<usize> = plans.iter().map(|p| p.index).collect();
87 let mut satisfied: HashSet<usize> = HashSet::new();
88 let mut waves: Vec<Vec<usize>> = Vec::new();
89
90 while !remaining.is_empty() {
91 let mut wave: Vec<usize> = remaining
92 .iter()
93 .copied()
94 .filter(|idx| deps[*idx].iter().all(|dep| satisfied.contains(dep)))
95 .collect();
96 if wave.is_empty() {
97 wave = remaining.iter().copied().take(1).collect();
99 }
100 wave.sort_unstable();
101 for idx in &wave {
102 remaining.remove(idx);
103 satisfied.insert(*idx);
104 }
105 waves.push(wave);
106 }
107
108 waves
109}
110
/// Process-wide count of schedule comparisons recorded by
/// `record_scheduler_shadow_diff`; starts at zero.
static SCHEDULER_SHADOW_COMPARISONS: AtomicU64 = AtomicU64::new(0);
/// Process-wide count of recorded comparisons in which the two schedules
/// differed; starts at zero.
static SCHEDULER_SHADOW_DIFFS: AtomicU64 = AtomicU64::new(0);
113
/// Snapshot of the scheduler shadow-comparison counters.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct SchedulerShadowStats {
    /// Total number of comparisons recorded.
    pub comparisons: u64,
    /// Number of comparisons whose two schedules were not equal.
    pub diffs: u64,
}
120
121pub fn record_scheduler_shadow_diff(legacy: &[Vec<usize>], dag: &[Vec<usize>]) {
123 SCHEDULER_SHADOW_COMPARISONS.fetch_add(1, Ordering::Relaxed);
124 if legacy != dag {
125 SCHEDULER_SHADOW_DIFFS.fetch_add(1, Ordering::Relaxed);
126 tracing::debug!(?legacy, ?dag, "scheduler shadow diff");
127 }
128}
129
130#[must_use]
131pub fn scheduler_shadow_stats() -> SchedulerShadowStats {
132 SchedulerShadowStats {
133 comparisons: SCHEDULER_SHADOW_COMPARISONS.load(Ordering::Relaxed),
134 diffs: SCHEDULER_SHADOW_DIFFS.load(Ordering::Relaxed),
135 }
136}
137
138#[must_use]
140pub fn wave_parallel_eligible(plans: &[DagPlanView], wave: &[usize]) -> bool {
141 !wave.is_empty()
142 && wave.iter().all(|idx| {
143 plans
144 .iter()
145 .find(|p| p.index == *idx)
146 .is_some_and(|p| p.parallel_eligible)
147 })
148}
149
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::VecDeque;

    /// Wraps a path string as a schedulable resource.
    fn path_resource(path: &str) -> ScheduleResource {
        ScheduleResource::Path(path.to_string())
    }

    /// Assembles a plan view from explicit read and write resource lists.
    fn plan(
        index: usize,
        parallel_eligible: bool,
        reads: Vec<ScheduleResource>,
        writes: Vec<ScheduleResource>,
    ) -> DagPlanView {
        DagPlanView {
            index,
            parallel_eligible,
            reads: reads.into_iter().collect(),
            writes: writes.into_iter().collect(),
        }
    }

    /// Parallel-eligible plan that reads a single path.
    fn path_read(index: usize, path: &str) -> DagPlanView {
        plan(index, true, vec![path_resource(path)], Vec::new())
    }

    /// Non-parallel plan that writes a single path.
    fn path_write(index: usize, path: &str) -> DagPlanView {
        plan(index, false, Vec::new(), vec![path_resource(path)])
    }

    /// Parallel-eligible plan that scans the workspace.
    fn workspace_scan(index: usize) -> DagPlanView {
        plan(index, true, vec![ScheduleResource::WorkspaceScan], Vec::new())
    }

    #[test]
    fn proposal_example_read_batch_then_write() {
        let plans = vec![
            path_read(0, "a"),
            path_read(1, "b"),
            path_write(2, "a"),
            workspace_scan(3),
        ];
        let waves = build_execution_waves(&plans);

        assert_eq!(waves.len(), 2, "expected two waves, got {waves:?}");
        let (first, second) = (&waves[0], &waves[1]);
        assert_eq!(*first, vec![0, 1, 3]);
        assert_eq!(*second, vec![2]);
        assert!(wave_parallel_eligible(&plans, first));
        assert!(!wave_parallel_eligible(&plans, second));
    }

    #[test]
    fn legacy_all_readonly_single_wave() {
        let plans = vec![path_read(0, "a"), path_read(1, "b"), path_read(2, "c")];
        assert_eq!(build_execution_waves(&plans), vec![vec![0, 1, 2]]);
    }

    #[test]
    fn write_serializes_on_same_path() {
        let plans = vec![path_write(0, "x"), path_read(1, "x")];
        assert_eq!(build_execution_waves(&plans), vec![vec![0], vec![1]]);
    }

    #[test]
    fn independent_paths_parallelize() {
        let plans = vec![path_write(0, "a"), path_read(1, "b")];
        assert_eq!(build_execution_waves(&plans), vec![vec![0, 1]]);
    }

    #[test]
    fn empty_batch() {
        let waves = build_execution_waves(&[]);
        assert!(waves.is_empty());
    }

    #[test]
    fn wave_order_is_deterministic() {
        let mut plans: Vec<DagPlanView> = Vec::new();
        for i in 0..4 {
            plans.push(path_read(i, &format!("f{i}")));
        }
        let first_run = build_execution_waves(&plans);
        let second_run = build_execution_waves(&plans);
        assert_eq!(first_run, second_run);
        assert_eq!(first_run, vec![vec![0, 1, 2, 3]]);
    }

    #[test]
    fn coarse_workspace_write_blocks_scan() {
        let coarse_write = plan(0, false, Vec::new(), vec![ScheduleResource::WorkspaceWrite]);
        let plans = vec![coarse_write, workspace_scan(1)];
        assert_eq!(build_execution_waves(&plans), vec![vec![0], vec![1]]);
    }

    #[test]
    fn deps_queue_matches_manual_topo() {
        let plans = vec![
            path_read(0, "src/lib.rs"),
            path_write(1, "src/lib.rs"),
            path_read(2, "README.md"),
        ];
        let mut queue: VecDeque<usize> = VecDeque::new();
        for wave in build_execution_waves(&plans) {
            queue.extend(wave);
        }
        // Both reads share wave 0; the write to src/lib.rs follows in wave 1.
        for expected in [0, 2, 1] {
            assert_eq!(queue.pop_front(), Some(expected));
        }
    }
}