Skip to main content

perspt_sdk/
scheduler.rs

//! Dependency-aware parallel ready-queue scheduler (PSP-8 System 4).
//!
//! The scheduler runs independent work in parallel and serializes only work that
//! conflicts through dependencies, leases, or non-commuting durable effects. Two
//! durable turns commute — and need no relative order — only when neither writes
//! what the other reads and they write no common resource:
//!
//! ```text
//! writes(e) ∩ reads(e') = ∅,  writes(e') ∩ reads(e) = ∅,  and  writes(e) ∩ writes(e') = ∅.
//! ```
//!
//! The conflict footprint includes durable platform state, not only workspace
//! files: the capability table, risk budgets, fresh-identifier allocation, and
//! the ledger root are modeled as read/write resources.
15
16use std::collections::{BTreeSet, HashMap, HashSet};
17
18use serde::{Deserialize, Serialize};
19
20use crate::certificate::ResidualCertificate;
21use crate::workgraph::{WorkGraphRevision, WorkNode, WorkNodeState};
22
/// A durable resource that a turn may read or write. Overlapping footprints
/// force serialization (PSP-8 System 4 / Theorem 8).
///
/// The `String` payloads identify the concrete resource; their format is chosen
/// by callers (the tests in this file use e.g. `File("a.rs")`,
/// `Manifest("Cargo.toml")`, `Toolchain("cargo")`). The derived
/// `PartialOrd`/`Ord` follow variant declaration order, which fixes the
/// iteration order of a `BTreeSet<Resource>` — reordering variants changes it.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum Resource {
    /// A workspace file.
    File(String),
    /// An interface (NOTE(review): identifier format not visible here — confirm).
    Interface(String),
    /// A manifest, e.g. `Cargo.toml`.
    Manifest(String),
    /// A lockfile.
    Lockfile(String),
    /// A migration.
    Migration(String),
    /// A test fixture.
    TestFixture(String),
    /// A toolchain, e.g. `cargo`.
    Toolchain(String),
    /// A specific capability in the capability table `Γ`.
    Capability(String),
    /// A named risk budget.
    RiskBudget(String),
    /// The fresh-identifier allocator.
    FreshIdAllocator,
    /// The ledger root (Merkle head).
    LedgerRoot,
}
43
/// The read/write footprint of a turn: the durable resources it reads and the
/// durable resources it writes. Two footprints conflict when either writes a
/// resource the other reads, or both write a common resource
/// (see `Footprint::commutes_with`).
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct Footprint {
    /// Resources the turn reads.
    pub reads: BTreeSet<Resource>,
    /// Resources the turn writes.
    pub writes: BTreeSet<Resource>,
}
50
51impl Footprint {
52    pub fn new() -> Self {
53        Self::default()
54    }
55
56    pub fn read(mut self, r: Resource) -> Self {
57        self.reads.insert(r);
58        self
59    }
60
61    pub fn write(mut self, r: Resource) -> Self {
62        self.writes.insert(r);
63        self
64    }
65
66    /// Whether this turn commutes with `other`: write/read and write/write
67    /// footprints must be disjoint in both directions.
68    pub fn commutes_with(&self, other: &Footprint) -> bool {
69        self.writes.is_disjoint(&other.reads)
70            && other.writes.is_disjoint(&self.reads)
71            && self.writes.is_disjoint(&other.writes)
72    }
73
74    pub fn conflicts_with(&self, other: &Footprint) -> bool {
75        !self.commutes_with(other)
76    }
77}
78
/// What a lease protects. Serialized with `snake_case` names (e.g. `"graph_write"`).
///
/// Note: `LeaseTable` decides availability by a lease's `scope` alone; the kind
/// is recorded on the lease but not consulted when granting.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LeaseKind {
    /// Serializes all graph revisions against each other.
    GraphWrite,
    /// Package-manager / dependency mutation.
    Toolchain,
    /// Exclusive access to a workspace resource.
    Resource,
}
90
/// A held lease, as recorded in a `LeaseTable`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ExecutionLease {
    /// Lease id; `LeaseTable::acquire` issues a fresh UUID v4 string.
    pub lease_id: String,
    /// Work id of the lease holder.
    pub holder_work_id: String,
    /// What the lease protects.
    pub kind: LeaseKind,
    /// The resource the lease covers; at most one lease per scope is held.
    pub scope: Resource,
}
99
100/// The lease table, tracking exclusive grants.
101#[derive(Debug, Clone, Default)]
102pub struct LeaseTable {
103    held: Vec<ExecutionLease>,
104}
105
106impl LeaseTable {
107    pub fn new() -> Self {
108        Self::default()
109    }
110
111    /// Whether a lease over `scope` is available (not already held by another).
112    pub fn is_available(&self, scope: &Resource) -> bool {
113        !self.held.iter().any(|l| &l.scope == scope)
114    }
115
116    /// Acquire a lease, returning its id, or `None` if it is unavailable.
117    pub fn acquire(&mut self, holder: &str, kind: LeaseKind, scope: Resource) -> Option<String> {
118        if !self.is_available(&scope) {
119            return None;
120        }
121        let lease_id = uuid::Uuid::new_v4().to_string();
122        self.held.push(ExecutionLease {
123            lease_id: lease_id.clone(),
124            holder_work_id: holder.to_string(),
125            kind,
126            scope,
127        });
128        Some(lease_id)
129    }
130
131    pub fn release(&mut self, lease_id: &str) {
132        self.held.retain(|l| l.lease_id != lease_id);
133    }
134
135    pub fn held_count(&self) -> usize {
136        self.held.len()
137    }
138}
139
/// A repair outcome that becomes durable scheduler work (PSP-8 System 4).
///
/// `repair_to_effects` translates each action into `SchedulerEffect`s.
/// Serialized as an internally tagged object: the `"action"` field holds the
/// `snake_case` variant name (e.g. `"retry_node"`) alongside the variant's fields.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "action", rename_all = "snake_case")]
pub enum RepairAction {
    /// Retry `node_id` at `generation`.
    RetryNode {
        node_id: String,
        generation: u32,
    },
    /// Expand the scope of `node_id` (at `generation`) with `added_paths`.
    ExpandScope {
        node_id: String,
        generation: u32,
        added_paths: Vec<String>,
    },
    /// Split `node_id` (at `generation`) into one child per entry of `child_goals`.
    SplitNode {
        node_id: String,
        generation: u32,
        child_goals: Vec<String>,
    },
    /// Insert an interface node for `boundary`.
    InsertInterfaceNode {
        boundary: String,
    },
    /// Add a new node pursuing `goal`, recording `reason`.
    AddNode {
        goal: String,
        reason: String,
    },
    /// Retire `node_id` at `generation`, recording `reason`.
    RetireNode {
        node_id: String,
        generation: u32,
        reason: String,
    },
    /// Replan the subgraph at `root`; `affected` presumably lists the node ids
    /// involved (NOTE(review): inferred from field names — confirm).
    ReplanSubgraph {
        root: String,
        affected: Vec<String>,
    },
    /// Stop `node_id` at `generation`, justified by certificate `certificate_id`.
    StopNode {
        node_id: String,
        generation: u32,
        certificate_id: String,
    },
}
180
/// A durable scheduler command consumed by the ready-queue loop (PSP-8
/// `SchedulerEffect`).
///
/// Serialized as an internally tagged object: the `"effect"` field holds the
/// `snake_case` variant name (e.g. `"requeue_node"`) alongside the variant's fields.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "effect", rename_all = "snake_case")]
pub enum SchedulerEffect {
    /// Commit generation `generation` of `node_id`.
    CommitNode {
        node_id: String,
        generation: u32,
    },
    /// Put `node_id` back on the queue at `generation`, recording `reason`.
    RequeueNode {
        node_id: String,
        generation: u32,
        reason: String,
    },
    /// Apply the graph revision identified by `revision_id`.
    ApplyGraphRevision {
        revision_id: String,
    },
    /// Spawn the work item `work_id`.
    SpawnWork {
        work_id: String,
    },
    /// Cancel the work item `work_id`, recording `reason`.
    CancelWork {
        work_id: String,
        reason: String,
    },
    /// Request approval for proposal `proposal_id`.
    RequestApproval {
        proposal_id: String,
    },
    /// Stop, justified by certificate `certificate_id`.
    StopWithCertificate {
        certificate_id: String,
    },
}
212
213/// Convert a repair action into scheduler effects. A local repair that produces
214/// new executable work SHALL NOT escalate; it returns effects the ready-queue
215/// loop consumes (PSP-8 System 4).
216pub fn repair_to_effects(action: &RepairAction) -> Vec<SchedulerEffect> {
217    match action {
218        RepairAction::RetryNode {
219            node_id,
220            generation,
221        } => vec![SchedulerEffect::RequeueNode {
222            node_id: node_id.clone(),
223            generation: *generation,
224            reason: "retry".into(),
225        }],
226        RepairAction::ExpandScope {
227            node_id,
228            generation,
229            ..
230        } => vec![SchedulerEffect::RequeueNode {
231            node_id: node_id.clone(),
232            generation: generation + 1,
233            reason: "scope expanded".into(),
234        }],
235        RepairAction::SplitNode { child_goals, .. } => child_goals
236            .iter()
237            .map(|_| SchedulerEffect::SpawnWork {
238                work_id: uuid::Uuid::new_v4().to_string(),
239            })
240            .chain(std::iter::once(SchedulerEffect::ApplyGraphRevision {
241                revision_id: uuid::Uuid::new_v4().to_string(),
242            }))
243            .collect(),
244        RepairAction::InsertInterfaceNode { .. } | RepairAction::AddNode { .. } => {
245            vec![
246                SchedulerEffect::SpawnWork {
247                    work_id: uuid::Uuid::new_v4().to_string(),
248                },
249                SchedulerEffect::ApplyGraphRevision {
250                    revision_id: uuid::Uuid::new_v4().to_string(),
251                },
252            ]
253        }
254        RepairAction::RetireNode { .. } | RepairAction::ReplanSubgraph { .. } => {
255            vec![SchedulerEffect::ApplyGraphRevision {
256                revision_id: uuid::Uuid::new_v4().to_string(),
257            }]
258        }
259        RepairAction::StopNode { certificate_id, .. } => {
260            vec![SchedulerEffect::StopWithCertificate {
261                certificate_id: certificate_id.clone(),
262            }]
263        }
264    }
265}
266
/// A node generation currently executing, with the footprint it occupies.
/// Recorded by `Scheduler::start` and removed by `Scheduler::finish`.
#[derive(Debug, Clone)]
pub struct RunningTask {
    /// Id of the running node.
    pub node_id: String,
    /// Generation of the node that is running.
    pub generation: u32,
    /// Footprint held while the task runs; newly dispatched work must commute with it.
    pub footprint: Footprint,
}
274
275/// The mutable parallel scheduler.
276#[derive(Debug)]
277pub struct Scheduler {
278    max_parallel: usize,
279    running: Vec<RunningTask>,
280    pub leases: LeaseTable,
281}
282
283impl Scheduler {
284    pub fn new(max_parallel: usize) -> Self {
285        Self {
286            max_parallel: max_parallel.max(1),
287            running: Vec::new(),
288            leases: LeaseTable::new(),
289        }
290    }
291
292    pub fn running_count(&self) -> usize {
293        self.running.len()
294    }
295
296    /// Compute the ready set: pending nodes whose dependencies are all accepted,
297    /// whose required sensors are resolved, and whose footprints do not conflict
298    /// with currently-running tasks. `footprint_of` supplies each node's
299    /// footprint (domain-derived from output targets and edges).
300    pub fn ready_nodes<'a, F>(
301        &self,
302        revision: &'a WorkGraphRevision,
303        footprint_of: F,
304    ) -> Vec<&'a WorkNode>
305    where
306        F: Fn(&WorkNode) -> Footprint,
307    {
308        let accepted: HashSet<&str> = revision
309            .nodes
310            .iter()
311            .filter(|n| n.is_accepted())
312            .map(|n| n.node_id.as_str())
313            .collect();
314
315        let mut ready = Vec::new();
316        // Footprints occupied by running tasks plus already-selected ready nodes,
317        // so two conflicting ready nodes are not dispatched together.
318        let mut occupied: Vec<Footprint> =
319            self.running.iter().map(|t| t.footprint.clone()).collect();
320        let slots = self.max_parallel.saturating_sub(self.running.len());
321
322        for node in &revision.nodes {
323            if ready.len() >= slots {
324                break;
325            }
326            if !matches!(node.state, WorkNodeState::Pending | WorkNodeState::Ready) {
327                continue;
328            }
329            // Required sensors resolved?
330            if !node.required_sensors.is_empty() {
331                continue;
332            }
333            // All dependencies accepted?
334            let deps = revision.dependencies_of(&node.node_id);
335            if !deps.iter().all(|d| accepted.contains(d)) {
336                continue;
337            }
338            // Footprint free?
339            let fp = footprint_of(node);
340            if occupied.iter().any(|o| o.conflicts_with(&fp)) {
341                continue;
342            }
343            occupied.push(fp);
344            ready.push(node);
345        }
346        ready
347    }
348
349    /// Mark a node as running, occupying its footprint.
350    pub fn start(&mut self, node: &WorkNode, footprint: Footprint) {
351        self.running.push(RunningTask {
352            node_id: node.node_id.clone(),
353            generation: node.generation,
354            footprint,
355        });
356    }
357
358    /// Mark a running node as finished, freeing its footprint.
359    pub fn finish(&mut self, node_id: &str, generation: u32) {
360        self.running
361            .retain(|t| !(t.node_id == node_id && t.generation == generation));
362    }
363}
364
/// A terminal classification for a node generation, ensuring recovery is total:
/// every node ends accepted (committed), certified-stopped, or escalated
/// (PSP-8 System 4).
///
/// Serialized as an internally tagged object on the `"outcome"` field with
/// `snake_case` variant names. `recovery_is_total` checks coverage.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "outcome", rename_all = "snake_case")]
pub enum NodeOutcome {
    /// Generation `generation` of `node_id` was committed.
    Committed { node_id: String, generation: u32 },
    /// The node was stopped; the certificate's `node_id` identifies which node.
    Stopped { certificate: ResidualCertificate },
    /// `node_id` was escalated, recording `reason`.
    Escalated { node_id: String, reason: String },
}
374
375/// Validate that a set of node outcomes covers every node exactly once with a
376/// terminal classification (recovery totality check).
377pub fn recovery_is_total(revision: &WorkGraphRevision, outcomes: &[NodeOutcome]) -> bool {
378    let classified: HashMap<&str, usize> = {
379        let mut m: HashMap<&str, usize> = HashMap::new();
380        for o in outcomes {
381            let id = match o {
382                NodeOutcome::Committed { node_id, .. } => node_id.as_str(),
383                NodeOutcome::Stopped { certificate } => certificate.node_id.as_str(),
384                NodeOutcome::Escalated { node_id, .. } => node_id.as_str(),
385            };
386            *m.entry(id).or_default() += 1;
387        }
388        m
389    };
390    revision
391        .nodes
392        .iter()
393        .all(|n| classified.get(n.node_id.as_str()) == Some(&1))
394}
395
#[cfg(test)]
mod tests {
    //! Unit tests for footprint commutation, ready-set selection, leases,
    //! repair-to-effect translation, and the recovery totality check.

    use super::*;
    use crate::workgraph::{EdgeKind, GraphRevisionReason, NodeClass, WorkEdge};

    /// An `Implement`-class node with id `id` and goal text `"goal {id}"`.
    fn node(id: &str) -> WorkNode {
        WorkNode::new(id, format!("goal {id}"), NodeClass::Implement)
    }

    /// An initial-plan revision with no parent revision, built from `nodes` and
    /// `edges`; panics if `WorkGraphRevision::build` rejects the graph.
    /// NOTE(review): the leading `0` is presumably the revision number — confirm.
    fn rev(nodes: Vec<WorkNode>, edges: Vec<WorkEdge>) -> WorkGraphRevision {
        WorkGraphRevision::build(0, None, GraphRevisionReason::InitialPlan, nodes, edges).unwrap()
    }

    // Writers of different files share nothing, so they commute.
    #[test]
    fn disjoint_footprints_commute() {
        let a = Footprint::new().write(Resource::File("a.rs".into()));
        let b = Footprint::new().write(Resource::File("b.rs".into()));
        assert!(a.commutes_with(&b));
    }

    // Writing a resource that another turn reads forces serialization.
    #[test]
    fn write_read_overlap_does_not_commute() {
        let a = Footprint::new().write(Resource::File("shared.rs".into()));
        let b = Footprint::new().read(Resource::File("shared.rs".into()));
        assert!(a.conflicts_with(&b));
    }

    // Two writers of the same manifest conflict (write/write overlap).
    #[test]
    fn manifest_writes_serialize() {
        let a = Footprint::new().write(Resource::Manifest("Cargo.toml".into()));
        let b = Footprint::new().write(Resource::Manifest("Cargo.toml".into()));
        assert!(a.conflicts_with(&b));
    }

    #[test]
    fn capability_table_is_a_conflict_resource() {
        // A capability grant conflicts with a commit that reads that capability.
        let grant = Footprint::new().write(Resource::Capability("write-src".into()));
        let commit = Footprint::new().read(Resource::Capability("write-src".into()));
        assert!(grant.conflicts_with(&commit));
    }

    // Platform state (ledger root, fresh-id allocator) is modeled as a resource,
    // so concurrent writers of it conflict.
    #[test]
    fn ledger_root_and_fresh_id_serialize() {
        let a = Footprint::new().write(Resource::LedgerRoot);
        let b = Footprint::new().write(Resource::LedgerRoot);
        assert!(a.conflicts_with(&b));
        let c = Footprint::new().write(Resource::FreshIdAllocator);
        let d = Footprint::new().write(Resource::FreshIdAllocator);
        assert!(c.conflicts_with(&d));
    }

    // No edges and per-node output files: both nodes are ready at once.
    #[test]
    fn independent_nodes_are_ready_in_parallel() {
        let nodes = vec![node("a"), node("b")];
        let revision = rev(nodes, vec![]);
        let sched = Scheduler::new(4);
        let fp = |n: &WorkNode| Footprint::new().write(Resource::File(format!("{}.rs", n.node_id)));
        let ready = sched.ready_nodes(&revision, fp);
        assert_eq!(ready.len(), 2);
    }

    // With edge a -> b, `b` waits until `a` is accepted.
    #[test]
    fn dependent_node_waits_for_its_predecessor() {
        let nodes = vec![node("a"), node("b")];
        let edges = vec![WorkEdge::new("a", "b", EdgeKind::RequiresArtifact)];
        let revision = rev(nodes, edges);
        let sched = Scheduler::new(4);
        let fp = |n: &WorkNode| Footprint::new().write(Resource::File(format!("{}.rs", n.node_id)));
        let ready = sched.ready_nodes(&revision, fp);
        // Only `a` is ready; `b` depends on (not-yet-accepted) `a`.
        assert_eq!(ready.len(), 1);
        assert_eq!(ready[0].node_id, "a");
    }

    // Conflicting ready candidates are not selected together within one call.
    #[test]
    fn conflicting_footprints_do_not_dispatch_together() {
        let nodes = vec![node("a"), node("b")];
        let revision = rev(nodes, vec![]);
        let sched = Scheduler::new(4);
        // Both touch the same manifest -> only one is ready at a time.
        let fp = |_n: &WorkNode| Footprint::new().write(Resource::Manifest("Cargo.toml".into()));
        let ready = sched.ready_nodes(&revision, fp);
        assert_eq!(ready.len(), 1);
    }

    // Readiness is computed from the revision passed in, so nodes added by a
    // later revision are picked up.
    #[test]
    fn inserted_node_becomes_ready_after_revision() {
        // The static-snapshot bug: a node inserted by a repair must be executed.
        let mut nodes = vec![node("a")];
        nodes[0].state = WorkNodeState::Stable;
        let revision = rev(nodes, vec![]);
        // Repair inserts node "b".
        let mut nodes2 = revision.nodes.clone();
        nodes2.push(node("b"));
        let revision2 = WorkGraphRevision::build(
            1,
            Some(revision.revision_id.clone()),
            GraphRevisionReason::LocalRepair,
            nodes2,
            vec![],
        )
        .unwrap();
        let sched = Scheduler::new(4);
        let fp = |n: &WorkNode| Footprint::new().write(Resource::File(format!("{}.rs", n.node_id)));
        let ready = sched.ready_nodes(&revision2, fp);
        assert!(
            ready.iter().any(|n| n.node_id == "b"),
            "inserted node must be ready"
        );
    }

    // A held scope cannot be acquired again until its lease is released.
    #[test]
    fn leases_are_exclusive() {
        let mut table = LeaseTable::new();
        let scope = Resource::Toolchain("cargo".into());
        let l1 = table.acquire("w1", LeaseKind::Toolchain, scope.clone());
        assert!(l1.is_some());
        assert!(table
            .acquire("w2", LeaseKind::Toolchain, scope.clone())
            .is_none());
        table.release(&l1.unwrap());
        assert!(table.acquire("w2", LeaseKind::Toolchain, scope).is_some());
    }

    // Retry requeues the same node at the same generation.
    #[test]
    fn repair_retry_becomes_requeue_effect() {
        let effects = repair_to_effects(&RepairAction::RetryNode {
            node_id: "a".into(),
            generation: 0,
        });
        assert_eq!(
            effects,
            vec![SchedulerEffect::RequeueNode {
                node_id: "a".into(),
                generation: 0,
                reason: "retry".into()
            }]
        );
    }

    // Splitting into two goals yields two spawns plus one graph revision.
    #[test]
    fn split_produces_spawn_and_revision_effects() {
        let effects = repair_to_effects(&RepairAction::SplitNode {
            node_id: "a".into(),
            generation: 0,
            child_goals: vec!["x".into(), "y".into()],
        });
        let spawns = effects
            .iter()
            .filter(|e| matches!(e, SchedulerEffect::SpawnWork { .. }))
            .count();
        let revs = effects
            .iter()
            .filter(|e| matches!(e, SchedulerEffect::ApplyGraphRevision { .. }))
            .count();
        assert_eq!(spawns, 2);
        assert_eq!(revs, 1);
    }

    // Totality fails while any node lacks an outcome and holds once all have one.
    #[test]
    fn recovery_totality_requires_every_node_classified() {
        let nodes = vec![node("a"), node("b")];
        let revision = rev(nodes, vec![]);
        let outcomes = vec![NodeOutcome::Committed {
            node_id: "a".into(),
            generation: 0,
        }];
        assert!(!recovery_is_total(&revision, &outcomes));
        let outcomes = vec![
            NodeOutcome::Committed {
                node_id: "a".into(),
                generation: 0,
            },
            NodeOutcome::Escalated {
                node_id: "b".into(),
                reason: "blocked".into(),
            },
        ];
        assert!(recovery_is_total(&revision, &outcomes));
    }
}