1use std::collections::{BTreeSet, HashMap, HashSet};
17
18use serde::{Deserialize, Serialize};
19
20use crate::certificate::ResidualCertificate;
21use crate::workgraph::{WorkGraphRevision, WorkNode, WorkNodeState};
22
23#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
26pub enum Resource {
27 File(String),
28 Interface(String),
29 Manifest(String),
30 Lockfile(String),
31 Migration(String),
32 TestFixture(String),
33 Toolchain(String),
34 Capability(String),
36 RiskBudget(String),
38 FreshIdAllocator,
40 LedgerRoot,
42}
43
44#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
46pub struct Footprint {
47 pub reads: BTreeSet<Resource>,
48 pub writes: BTreeSet<Resource>,
49}
50
51impl Footprint {
52 pub fn new() -> Self {
53 Self::default()
54 }
55
56 pub fn read(mut self, r: Resource) -> Self {
57 self.reads.insert(r);
58 self
59 }
60
61 pub fn write(mut self, r: Resource) -> Self {
62 self.writes.insert(r);
63 self
64 }
65
66 pub fn commutes_with(&self, other: &Footprint) -> bool {
69 self.writes.is_disjoint(&other.reads)
70 && other.writes.is_disjoint(&self.reads)
71 && self.writes.is_disjoint(&other.writes)
72 }
73
74 pub fn conflicts_with(&self, other: &Footprint) -> bool {
75 !self.commutes_with(other)
76 }
77}
78
79#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
81#[serde(rename_all = "snake_case")]
82pub enum LeaseKind {
83 GraphWrite,
85 Toolchain,
87 Resource,
89}
90
91#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
93pub struct ExecutionLease {
94 pub lease_id: String,
95 pub holder_work_id: String,
96 pub kind: LeaseKind,
97 pub scope: Resource,
98}
99
100#[derive(Debug, Clone, Default)]
102pub struct LeaseTable {
103 held: Vec<ExecutionLease>,
104}
105
106impl LeaseTable {
107 pub fn new() -> Self {
108 Self::default()
109 }
110
111 pub fn is_available(&self, scope: &Resource) -> bool {
113 !self.held.iter().any(|l| &l.scope == scope)
114 }
115
116 pub fn acquire(&mut self, holder: &str, kind: LeaseKind, scope: Resource) -> Option<String> {
118 if !self.is_available(&scope) {
119 return None;
120 }
121 let lease_id = uuid::Uuid::new_v4().to_string();
122 self.held.push(ExecutionLease {
123 lease_id: lease_id.clone(),
124 holder_work_id: holder.to_string(),
125 kind,
126 scope,
127 });
128 Some(lease_id)
129 }
130
131 pub fn release(&mut self, lease_id: &str) {
132 self.held.retain(|l| l.lease_id != lease_id);
133 }
134
135 pub fn held_count(&self) -> usize {
136 self.held.len()
137 }
138}
139
140#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
142#[serde(tag = "action", rename_all = "snake_case")]
143pub enum RepairAction {
144 RetryNode {
145 node_id: String,
146 generation: u32,
147 },
148 ExpandScope {
149 node_id: String,
150 generation: u32,
151 added_paths: Vec<String>,
152 },
153 SplitNode {
154 node_id: String,
155 generation: u32,
156 child_goals: Vec<String>,
157 },
158 InsertInterfaceNode {
159 boundary: String,
160 },
161 AddNode {
162 goal: String,
163 reason: String,
164 },
165 RetireNode {
166 node_id: String,
167 generation: u32,
168 reason: String,
169 },
170 ReplanSubgraph {
171 root: String,
172 affected: Vec<String>,
173 },
174 StopNode {
175 node_id: String,
176 generation: u32,
177 certificate_id: String,
178 },
179}
180
181#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
184#[serde(tag = "effect", rename_all = "snake_case")]
185pub enum SchedulerEffect {
186 CommitNode {
187 node_id: String,
188 generation: u32,
189 },
190 RequeueNode {
191 node_id: String,
192 generation: u32,
193 reason: String,
194 },
195 ApplyGraphRevision {
196 revision_id: String,
197 },
198 SpawnWork {
199 work_id: String,
200 },
201 CancelWork {
202 work_id: String,
203 reason: String,
204 },
205 RequestApproval {
206 proposal_id: String,
207 },
208 StopWithCertificate {
209 certificate_id: String,
210 },
211}
212
213pub fn repair_to_effects(action: &RepairAction) -> Vec<SchedulerEffect> {
217 match action {
218 RepairAction::RetryNode {
219 node_id,
220 generation,
221 } => vec![SchedulerEffect::RequeueNode {
222 node_id: node_id.clone(),
223 generation: *generation,
224 reason: "retry".into(),
225 }],
226 RepairAction::ExpandScope {
227 node_id,
228 generation,
229 ..
230 } => vec![SchedulerEffect::RequeueNode {
231 node_id: node_id.clone(),
232 generation: generation + 1,
233 reason: "scope expanded".into(),
234 }],
235 RepairAction::SplitNode { child_goals, .. } => child_goals
236 .iter()
237 .map(|_| SchedulerEffect::SpawnWork {
238 work_id: uuid::Uuid::new_v4().to_string(),
239 })
240 .chain(std::iter::once(SchedulerEffect::ApplyGraphRevision {
241 revision_id: uuid::Uuid::new_v4().to_string(),
242 }))
243 .collect(),
244 RepairAction::InsertInterfaceNode { .. } | RepairAction::AddNode { .. } => {
245 vec![
246 SchedulerEffect::SpawnWork {
247 work_id: uuid::Uuid::new_v4().to_string(),
248 },
249 SchedulerEffect::ApplyGraphRevision {
250 revision_id: uuid::Uuid::new_v4().to_string(),
251 },
252 ]
253 }
254 RepairAction::RetireNode { .. } | RepairAction::ReplanSubgraph { .. } => {
255 vec![SchedulerEffect::ApplyGraphRevision {
256 revision_id: uuid::Uuid::new_v4().to_string(),
257 }]
258 }
259 RepairAction::StopNode { certificate_id, .. } => {
260 vec![SchedulerEffect::StopWithCertificate {
261 certificate_id: certificate_id.clone(),
262 }]
263 }
264 }
265}
266
267#[derive(Debug, Clone)]
269pub struct RunningTask {
270 pub node_id: String,
271 pub generation: u32,
272 pub footprint: Footprint,
273}
274
275#[derive(Debug)]
277pub struct Scheduler {
278 max_parallel: usize,
279 running: Vec<RunningTask>,
280 pub leases: LeaseTable,
281}
282
283impl Scheduler {
284 pub fn new(max_parallel: usize) -> Self {
285 Self {
286 max_parallel: max_parallel.max(1),
287 running: Vec::new(),
288 leases: LeaseTable::new(),
289 }
290 }
291
292 pub fn running_count(&self) -> usize {
293 self.running.len()
294 }
295
296 pub fn ready_nodes<'a, F>(
301 &self,
302 revision: &'a WorkGraphRevision,
303 footprint_of: F,
304 ) -> Vec<&'a WorkNode>
305 where
306 F: Fn(&WorkNode) -> Footprint,
307 {
308 let accepted: HashSet<&str> = revision
309 .nodes
310 .iter()
311 .filter(|n| n.is_accepted())
312 .map(|n| n.node_id.as_str())
313 .collect();
314
315 let mut ready = Vec::new();
316 let mut occupied: Vec<Footprint> =
319 self.running.iter().map(|t| t.footprint.clone()).collect();
320 let slots = self.max_parallel.saturating_sub(self.running.len());
321
322 for node in &revision.nodes {
323 if ready.len() >= slots {
324 break;
325 }
326 if !matches!(node.state, WorkNodeState::Pending | WorkNodeState::Ready) {
327 continue;
328 }
329 if !node.required_sensors.is_empty() {
331 continue;
332 }
333 let deps = revision.dependencies_of(&node.node_id);
335 if !deps.iter().all(|d| accepted.contains(d)) {
336 continue;
337 }
338 let fp = footprint_of(node);
340 if occupied.iter().any(|o| o.conflicts_with(&fp)) {
341 continue;
342 }
343 occupied.push(fp);
344 ready.push(node);
345 }
346 ready
347 }
348
349 pub fn start(&mut self, node: &WorkNode, footprint: Footprint) {
351 self.running.push(RunningTask {
352 node_id: node.node_id.clone(),
353 generation: node.generation,
354 footprint,
355 });
356 }
357
358 pub fn finish(&mut self, node_id: &str, generation: u32) {
360 self.running
361 .retain(|t| !(t.node_id == node_id && t.generation == generation));
362 }
363}
364
365#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
368#[serde(tag = "outcome", rename_all = "snake_case")]
369pub enum NodeOutcome {
370 Committed { node_id: String, generation: u32 },
371 Stopped { certificate: ResidualCertificate },
372 Escalated { node_id: String, reason: String },
373}
374
375pub fn recovery_is_total(revision: &WorkGraphRevision, outcomes: &[NodeOutcome]) -> bool {
378 let classified: HashMap<&str, usize> = {
379 let mut m: HashMap<&str, usize> = HashMap::new();
380 for o in outcomes {
381 let id = match o {
382 NodeOutcome::Committed { node_id, .. } => node_id.as_str(),
383 NodeOutcome::Stopped { certificate } => certificate.node_id.as_str(),
384 NodeOutcome::Escalated { node_id, .. } => node_id.as_str(),
385 };
386 *m.entry(id).or_default() += 1;
387 }
388 m
389 };
390 revision
391 .nodes
392 .iter()
393 .all(|n| classified.get(n.node_id.as_str()) == Some(&1))
394}
395
#[cfg(test)]
mod tests {
    use super::*;
    use crate::workgraph::{EdgeKind, GraphRevisionReason, NodeClass, WorkEdge};

    /// A fresh `Implement` node whose goal is `"goal {id}"`.
    fn node(id: &str) -> WorkNode {
        WorkNode::new(id, format!("goal {id}"), NodeClass::Implement)
    }

    /// Builds an initial-plan revision (revision 0, no parent).
    fn rev(nodes: Vec<WorkNode>, edges: Vec<WorkEdge>) -> WorkGraphRevision {
        WorkGraphRevision::build(0, None, GraphRevisionReason::InitialPlan, nodes, edges).unwrap()
    }

    /// Footprint writing a file named after the node, so distinct nodes never conflict.
    fn own_file(n: &WorkNode) -> Footprint {
        Footprint::new().write(Resource::File(format!("{}.rs", n.node_id)))
    }

    /// Looks up the node with `id` in `revision`, panicking if it is absent.
    fn find<'a>(revision: &'a WorkGraphRevision, id: &str) -> &'a WorkNode {
        revision
            .nodes
            .iter()
            .find(|n| n.node_id == id)
            .unwrap_or_else(|| panic!("node {id} missing from revision"))
    }

    #[test]
    fn disjoint_footprints_commute() {
        let a = Footprint::new().write(Resource::File("a.rs".into()));
        let b = Footprint::new().write(Resource::File("b.rs".into()));
        assert!(a.commutes_with(&b));
    }

    #[test]
    fn shared_reads_commute() {
        let a = Footprint::new().read(Resource::File("shared.rs".into()));
        let b = Footprint::new().read(Resource::File("shared.rs".into()));
        assert!(a.commutes_with(&b));
    }

    #[test]
    fn write_read_overlap_does_not_commute() {
        let a = Footprint::new().write(Resource::File("shared.rs".into()));
        let b = Footprint::new().read(Resource::File("shared.rs".into()));
        assert!(a.conflicts_with(&b));
        assert!(b.conflicts_with(&a), "conflict must be symmetric");
    }

    #[test]
    fn manifest_writes_serialize() {
        let a = Footprint::new().write(Resource::Manifest("Cargo.toml".into()));
        let b = Footprint::new().write(Resource::Manifest("Cargo.toml".into()));
        assert!(a.conflicts_with(&b));
    }

    #[test]
    fn capability_table_is_a_conflict_resource() {
        let grant = Footprint::new().write(Resource::Capability("write-src".into()));
        let commit = Footprint::new().read(Resource::Capability("write-src".into()));
        assert!(grant.conflicts_with(&commit));
    }

    #[test]
    fn ledger_root_and_fresh_id_serialize() {
        let a = Footprint::new().write(Resource::LedgerRoot);
        let b = Footprint::new().write(Resource::LedgerRoot);
        assert!(a.conflicts_with(&b));
        let c = Footprint::new().write(Resource::FreshIdAllocator);
        let d = Footprint::new().write(Resource::FreshIdAllocator);
        assert!(c.conflicts_with(&d));
    }

    #[test]
    fn independent_nodes_are_ready_in_parallel() {
        let revision = rev(vec![node("a"), node("b")], vec![]);
        let sched = Scheduler::new(4);
        let ready = sched.ready_nodes(&revision, own_file);
        assert_eq!(ready.len(), 2);
    }

    #[test]
    fn zero_parallelism_is_clamped_to_one() {
        let revision = rev(vec![node("a"), node("b")], vec![]);
        let sched = Scheduler::new(0);
        assert_eq!(sched.ready_nodes(&revision, own_file).len(), 1);
    }

    #[test]
    fn dependent_node_waits_for_its_predecessor() {
        let edges = vec![WorkEdge::new("a", "b", EdgeKind::RequiresArtifact)];
        let revision = rev(vec![node("a"), node("b")], edges);
        let sched = Scheduler::new(4);
        let ready = sched.ready_nodes(&revision, own_file);
        assert_eq!(ready.len(), 1);
        assert_eq!(ready[0].node_id, "a");
    }

    #[test]
    fn conflicting_footprints_do_not_dispatch_together() {
        let revision = rev(vec![node("a"), node("b")], vec![]);
        let sched = Scheduler::new(4);
        let fp = |_n: &WorkNode| Footprint::new().write(Resource::Manifest("Cargo.toml".into()));
        let ready = sched.ready_nodes(&revision, fp);
        assert_eq!(ready.len(), 1);
    }

    #[test]
    fn running_tasks_consume_slots_until_finished() {
        let revision = rev(vec![node("a"), node("b"), node("c")], vec![]);
        let mut sched = Scheduler::new(2);
        let a = find(&revision, "a");
        sched.start(a, own_file(a));
        assert_eq!(sched.running_count(), 1);

        let ready = sched.ready_nodes(&revision, own_file);
        assert_eq!(ready.len(), 1, "one slot remains while `a` runs");
        assert_ne!(ready[0].node_id, "a", "a running node is not re-dispatched");

        sched.finish("a", a.generation);
        assert_eq!(sched.running_count(), 0);
        assert_eq!(sched.ready_nodes(&revision, own_file).len(), 2);
    }

    #[test]
    fn inserted_node_becomes_ready_after_revision() {
        let mut nodes = vec![node("a")];
        nodes[0].state = WorkNodeState::Stable;
        let revision = rev(nodes, vec![]);
        let mut next_nodes = revision.nodes.clone();
        next_nodes.push(node("b"));
        let revision2 = WorkGraphRevision::build(
            1,
            Some(revision.revision_id.clone()),
            GraphRevisionReason::LocalRepair,
            next_nodes,
            vec![],
        )
        .unwrap();
        let sched = Scheduler::new(4);
        let ready = sched.ready_nodes(&revision2, own_file);
        assert!(
            ready.iter().any(|n| n.node_id == "b"),
            "inserted node must be ready"
        );
    }

    #[test]
    fn leases_are_exclusive() {
        let mut table = LeaseTable::new();
        let scope = Resource::Toolchain("cargo".into());
        let l1 = table.acquire("w1", LeaseKind::Toolchain, scope.clone());
        assert!(l1.is_some());
        assert!(table
            .acquire("w2", LeaseKind::Toolchain, scope.clone())
            .is_none());
        table.release(&l1.unwrap());
        assert!(table.acquire("w2", LeaseKind::Toolchain, scope).is_some());
    }

    #[test]
    fn leases_on_distinct_scopes_coexist() {
        let mut table = LeaseTable::new();
        let cargo = Resource::Toolchain("cargo".into());
        let rustc = Resource::Toolchain("rustc".into());
        assert!(table.acquire("w1", LeaseKind::Toolchain, cargo).is_some());
        assert!(table.acquire("w2", LeaseKind::Toolchain, rustc).is_some());
        assert_eq!(table.held_count(), 2);
    }

    #[test]
    fn repair_retry_becomes_requeue_effect() {
        let effects = repair_to_effects(&RepairAction::RetryNode {
            node_id: "a".into(),
            generation: 0,
        });
        assert_eq!(
            effects,
            vec![SchedulerEffect::RequeueNode {
                node_id: "a".into(),
                generation: 0,
                reason: "retry".into()
            }]
        );
    }

    #[test]
    fn expand_scope_requeues_at_next_generation() {
        let effects = repair_to_effects(&RepairAction::ExpandScope {
            node_id: "a".into(),
            generation: 3,
            added_paths: vec!["src/lib.rs".into()],
        });
        assert_eq!(
            effects,
            vec![SchedulerEffect::RequeueNode {
                node_id: "a".into(),
                generation: 4,
                reason: "scope expanded".into()
            }]
        );
    }

    #[test]
    fn stop_node_carries_its_certificate() {
        let effects = repair_to_effects(&RepairAction::StopNode {
            node_id: "a".into(),
            generation: 0,
            certificate_id: "cert-1".into(),
        });
        assert_eq!(
            effects,
            vec![SchedulerEffect::StopWithCertificate {
                certificate_id: "cert-1".into()
            }]
        );
    }

    #[test]
    fn split_produces_spawn_and_revision_effects() {
        let effects = repair_to_effects(&RepairAction::SplitNode {
            node_id: "a".into(),
            generation: 0,
            child_goals: vec!["x".into(), "y".into()],
        });
        let spawns = effects
            .iter()
            .filter(|e| matches!(e, SchedulerEffect::SpawnWork { .. }))
            .count();
        let revs = effects
            .iter()
            .filter(|e| matches!(e, SchedulerEffect::ApplyGraphRevision { .. }))
            .count();
        assert_eq!(spawns, 2);
        assert_eq!(revs, 1);
    }

    #[test]
    fn recovery_totality_requires_every_node_classified() {
        let revision = rev(vec![node("a"), node("b")], vec![]);
        let partial = vec![NodeOutcome::Committed {
            node_id: "a".into(),
            generation: 0,
        }];
        assert!(!recovery_is_total(&revision, &partial));
        let complete = vec![
            NodeOutcome::Committed {
                node_id: "a".into(),
                generation: 0,
            },
            NodeOutcome::Escalated {
                node_id: "b".into(),
                reason: "blocked".into(),
            },
        ];
        assert!(recovery_is_total(&revision, &complete));
    }

    #[test]
    fn duplicate_classification_is_not_total() {
        let revision = rev(vec![node("a")], vec![]);
        let committed = NodeOutcome::Committed {
            node_id: "a".into(),
            generation: 0,
        };
        assert!(!recovery_is_total(&revision, &[committed.clone(), committed]));
    }
}