use std::collections::HashSet;
use std::sync::atomic::{AtomicU64, Ordering};
/// A resource touched by a planned operation, used to derive ordering edges between
/// operations in a batch.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ScheduleResource {
    /// A scan over the workspace. It never conflicts with other resources in
    /// `conflicts_with`. It is only ordered after an earlier `WorkspaceWrite`, via
    /// `coarse_workspace_write_before_scan`.
    WorkspaceScan,
    /// A single path. Two paths conflict only when their strings are exactly equal
    /// (no normalization is applied).
    Path(String),
    /// A workspace-wide write. It conflicts with every `Path` and with other workspace writes.
    WorkspaceWrite,
}

/// Scheduling view of one planned operation in a batch.
#[derive(Debug, Clone)]
pub struct DagPlanView {
    /// Identifier reported in the execution waves. It does not have to match the plan's
    /// position in the slice given to [`build_execution_waves`].
    pub index: usize,
    /// Whether this operation may run concurrently with the others in its wave.
    pub parallel_eligible: bool,
    /// Resources this operation reads.
    pub reads: HashSet<ScheduleResource>,
    /// Resources this operation writes.
    pub writes: HashSet<ScheduleResource>,
}

impl DagPlanView {
    /// Returns `true` if `self` must not start before `prior` has finished.
    ///
    /// `prior` is an operation that comes earlier in the batch. An edge exists for a
    /// read-after-write, write-after-write or write-after-read conflict (as decided by
    /// `ScheduleResource::conflicts_with`). It also exists when a workspace scan follows a
    /// workspace-wide write.
    ///
    /// NOTE(review): a `WorkspaceScan` in `prior` followed by a `WorkspaceWrite` in `self`
    /// produces no edge. Confirm this is intended.
    #[must_use]
    pub fn depends_on(&self, prior: &Self) -> bool {
        resource_conflict(&prior.writes, &self.reads)
            || resource_conflict(&prior.writes, &self.writes)
            || resource_conflict(&prior.reads, &self.writes)
            || coarse_workspace_write_before_scan(&prior.writes, &self.reads)
    }
}

/// Coarse ordering rule: a workspace scan waits for any earlier workspace-wide write.
///
/// This rule is needed because `conflicts_with` treats `WorkspaceScan` as conflict-free.
fn coarse_workspace_write_before_scan(
    writes: &HashSet<ScheduleResource>,
    reads: &HashSet<ScheduleResource>,
) -> bool {
    reads.contains(&ScheduleResource::WorkspaceScan)
        && writes.contains(&ScheduleResource::WorkspaceWrite)
}

/// Returns `true` if any resource in `left` conflicts with any resource in `right`.
///
/// Every pair is compared.
fn resource_conflict(left: &HashSet<ScheduleResource>, right: &HashSet<ScheduleResource>) -> bool {
    left.iter()
        .any(|l| right.iter().any(|r| l.conflicts_with(r)))
}

impl ScheduleResource {
    /// Returns `true` when the two resources overlap and must not be accessed concurrently.
    ///
    /// The relation is symmetric. The match is exhaustive on purpose: adding a variant must
    /// force an explicit decision, rather than silently being treated as conflict-free.
    fn conflicts_with(&self, other: &Self) -> bool {
        match (self, other) {
            (Self::Path(a), Self::Path(b)) => a == b,
            (Self::WorkspaceWrite, Self::WorkspaceWrite | Self::Path(_))
            | (Self::Path(_), Self::WorkspaceWrite) => true,
            // Scans are ordered only by `coarse_workspace_write_before_scan`.
            (Self::WorkspaceScan, _) | (_, Self::WorkspaceScan) => false,
        }
    }
}

/// Groups `plans` into execution waves.
///
/// Each plan goes into the earliest wave that comes after every earlier plan it
/// [`depends_on`](DagPlanView::depends_on). Waves are returned in execution order. Each wave
/// lists its plans' `index` values in ascending order, and no wave is empty.
///
/// Dependencies are computed by position in `plans`. The `index` values therefore do not
/// need to be `0..plans.len()`.
#[must_use]
pub fn build_execution_waves(plans: &[DagPlanView]) -> Vec<Vec<usize>> {
    // depth[j] is the wave number of plans[j]: one past the deepest wave of any earlier plan
    // it depends on. Edges only point backwards, so a single forward pass is enough and a
    // cycle is impossible.
    let mut depth: Vec<usize> = Vec::with_capacity(plans.len());
    for (j, plan) in plans.iter().enumerate() {
        let plan_depth = plans[..j]
            .iter()
            .zip(&depth)
            .filter(|(prior, _)| plan.depends_on(prior))
            .map(|(_, &prior_depth)| prior_depth + 1)
            .max()
            .unwrap_or(0);
        depth.push(plan_depth);
    }

    // Every depth in 0..=max is occupied. A plan at depth d > 0 has a dependency at d - 1.
    let wave_count = depth.iter().max().map_or(0, |&deepest| deepest + 1);
    let mut waves: Vec<Vec<usize>> = vec![Vec::new(); wave_count];
    for (plan, &plan_depth) in plans.iter().zip(&depth) {
        waves[plan_depth].push(plan.index);
    }
    for wave in &mut waves {
        wave.sort_unstable();
    }
    waves
}
/// Total number of legacy-vs-DAG schedule comparisons recorded.
static SCHEDULER_SHADOW_COMPARISONS: AtomicU64 = AtomicU64::new(0);
/// Number of recorded comparisons in which the two schedules differed.
static SCHEDULER_SHADOW_DIFFS: AtomicU64 = AtomicU64::new(0);

/// Snapshot of the scheduler shadow-mode counters.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SchedulerShadowStats {
    /// Comparisons recorded via [`record_scheduler_shadow_diff`].
    pub comparisons: u64,
    /// How many of those comparisons found differing schedules. In a snapshot returned by
    /// [`scheduler_shadow_stats`] this never exceeds `comparisons`.
    pub diffs: u64,
}

/// Records one comparison between the legacy and DAG schedules.
///
/// When the two wave lists are not equal (element-wise, including order), the comparison
/// is also counted as a diff and a `debug` trace event is emitted with both schedules.
pub fn record_scheduler_shadow_diff(legacy: &[Vec<usize>], dag: &[Vec<usize>]) {
    // Count the comparison before publishing the diff (Release). A reader that observes the
    // diff then also observes its comparison.
    SCHEDULER_SHADOW_COMPARISONS.fetch_add(1, Ordering::Relaxed);
    if legacy != dag {
        SCHEDULER_SHADOW_DIFFS.fetch_add(1, Ordering::Release);
        tracing::debug!(?legacy, ?dag, "scheduler shadow diff");
    }
}

/// Returns a snapshot of the shadow-mode counters.
///
/// The counters are read without a lock, so recordings that happen concurrently may or may
/// not be included. The snapshot always satisfies `diffs <= comparisons`.
#[must_use]
pub fn scheduler_shadow_stats() -> SchedulerShadowStats {
    // Load `diffs` first, with Acquire. This synchronizes with the Release increments in
    // `record_scheduler_shadow_diff`, so the comparison behind every observed diff is
    // already visible when `comparisons` is loaded next.
    let diffs = SCHEDULER_SHADOW_DIFFS.load(Ordering::Acquire);
    let comparisons = SCHEDULER_SHADOW_COMPARISONS.load(Ordering::Relaxed);
    SchedulerShadowStats { comparisons, diffs }
}
/// Returns `true` when every plan in `wave` may run concurrently.
///
/// An empty wave is never eligible. Each entry of `wave` is matched against `plans` by
/// `index`, not by position, using the first match. If any entry has no matching plan, or
/// its plan has `parallel_eligible == false`, the whole wave is ineligible.
#[must_use]
pub fn wave_parallel_eligible(plans: &[DagPlanView], wave: &[usize]) -> bool {
    if wave.is_empty() {
        return false;
    }
    for &wanted in wave {
        let eligible = matches!(
            plans.iter().find(|plan| plan.index == wanted),
            Some(plan) if plan.parallel_eligible
        );
        if !eligible {
            return false;
        }
    }
    true
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::VecDeque;

    /// Builds a plan view from explicit read and write resource lists.
    fn view(
        index: usize,
        parallel_eligible: bool,
        reads: &[ScheduleResource],
        writes: &[ScheduleResource],
    ) -> DagPlanView {
        DagPlanView {
            index,
            parallel_eligible,
            reads: reads.iter().cloned().collect(),
            writes: writes.iter().cloned().collect(),
        }
    }

    /// Shorthand for a single-path resource.
    fn path(p: &str) -> ScheduleResource {
        ScheduleResource::Path(p.to_owned())
    }

    /// Parallel-eligible plan that reads one path.
    fn path_read(index: usize, p: &str) -> DagPlanView {
        view(index, true, &[path(p)], &[])
    }

    /// Serial-only plan that writes one path.
    fn path_write(index: usize, p: &str) -> DagPlanView {
        view(index, false, &[], &[path(p)])
    }

    /// Parallel-eligible plan that scans the whole workspace.
    fn workspace_scan(index: usize) -> DagPlanView {
        view(index, true, &[ScheduleResource::WorkspaceScan], &[])
    }

    #[test]
    fn proposal_example_read_batch_then_write() {
        let plans = [
            path_read(0, "a"),
            path_read(1, "b"),
            path_write(2, "a"),
            workspace_scan(3),
        ];
        let waves = build_execution_waves(&plans);
        assert_eq!(waves.len(), 2, "expected two waves, got {waves:?}");
        let (reads_wave, write_wave) = (&waves[0], &waves[1]);
        assert_eq!(*reads_wave, vec![0, 1, 3]);
        assert_eq!(*write_wave, vec![2]);
        assert!(wave_parallel_eligible(&plans, reads_wave));
        assert!(!wave_parallel_eligible(&plans, write_wave));
    }

    #[test]
    fn legacy_all_readonly_single_wave() {
        let plans: Vec<DagPlanView> = ["a", "b", "c"]
            .iter()
            .enumerate()
            .map(|(i, p)| path_read(i, p))
            .collect();
        assert_eq!(build_execution_waves(&plans), vec![vec![0, 1, 2]]);
    }

    #[test]
    fn write_serializes_on_same_path() {
        let plans = [path_write(0, "x"), path_read(1, "x")];
        assert_eq!(build_execution_waves(&plans), vec![vec![0], vec![1]]);
    }

    #[test]
    fn independent_paths_parallelize() {
        let plans = [path_write(0, "a"), path_read(1, "b")];
        assert_eq!(build_execution_waves(&plans), vec![vec![0, 1]]);
    }

    #[test]
    fn empty_batch() {
        let waves = build_execution_waves(&[]);
        assert!(waves.is_empty());
    }

    #[test]
    fn wave_order_is_deterministic() {
        let plans: Vec<DagPlanView> = (0..4).map(|i| path_read(i, &format!("f{i}"))).collect();
        let first_run = build_execution_waves(&plans);
        let second_run = build_execution_waves(&plans);
        assert_eq!(first_run, second_run);
        assert_eq!(first_run, vec![vec![0, 1, 2, 3]]);
    }

    #[test]
    fn coarse_workspace_write_blocks_scan() {
        let plans = [
            view(0, false, &[], &[ScheduleResource::WorkspaceWrite]),
            workspace_scan(1),
        ];
        assert_eq!(build_execution_waves(&plans), vec![vec![0], vec![1]]);
    }

    #[test]
    fn deps_queue_matches_manual_topo() {
        let plans = [
            path_read(0, "src/lib.rs"),
            path_write(1, "src/lib.rs"),
            path_read(2, "README.md"),
        ];
        let mut order = VecDeque::from(build_execution_waves(&plans).concat());
        for expected in [0, 2, 1] {
            assert_eq!(order.pop_front(), Some(expected));
        }
    }
}