Skip to main content

perspt_sdk/
workgraph.rs

1//! Mutable, revisioned work graph (PSP-8 System 4).
2//!
3//! Each graph revision is acyclic, but the session as a whole may add, retire,
4//! split, merge, update, or requeue nodes as verifier evidence changes. Updating
5//! a node creates a new [`WorkNode`] generation rather than mutating it in place;
6//! retired generations remain in the ledger but are no longer executable.
7
8use std::collections::{BTreeSet, HashMap, HashSet};
9
10use serde::{Deserialize, Serialize};
11
12use crate::error::{Result, SdkError};
13use crate::residual::ResidualEventRef;
14
15/// The kind of work a node performs.
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
17#[serde(rename_all = "snake_case")]
18pub enum NodeClass {
19    Explore,
20    Plan,
21    Implement,
22    Verify,
23    Test,
24    Integrate,
25    Repair,
26    Interface,
27}
28
29/// Execution state of a node generation.
30#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
31#[serde(rename_all = "snake_case")]
32pub enum WorkNodeState {
33    /// Waiting on dependencies, sensors, or leases.
34    Pending,
35    /// Eligible for dispatch.
36    Ready,
37    /// Currently executing.
38    Running,
39    /// Accepted (hard pass or descent).
40    Stable,
41    /// Stopped with a residual certificate.
42    Stopped { certificate_id: String },
43    /// Superseded by a newer generation.
44    Retired { reason: String },
45    /// Blocked on a missing or degraded required sensor.
46    BlockedOnSensor { sensor: String },
47}
48
49/// Typed edge semantics (PSP-8 System 4).
50#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
51#[serde(rename_all = "snake_case")]
52pub enum EdgeKind {
53    /// Destination reads a file produced by source.
54    RequiresArtifact,
55    /// Destination relies on a sealed signature/schema/symbol.
56    RequiresInterface,
57    /// Destination test node validates source implementation node.
58    Tests,
59    /// Destination reconciles a cross-node/domain/adapter boundary.
60    Integrates,
61    /// Nodes touch non-commuting durable state and must serialize.
62    ConflictsWith,
63    /// Graph-rewrite lineage edge for audit.
64    DerivedFrom,
65    /// Node cannot execute until a required sensor exists or is downgraded.
66    BlocksOnSensor,
67}
68
69impl EdgeKind {
70    /// Whether this edge imposes an execution-ordering dependency (the source
71    /// must reach a stable state before the destination is ready). `DerivedFrom`
72    /// is audit-only and `ConflictsWith` is handled by footprint serialization,
73    /// not readiness.
74    pub fn is_dependency(self) -> bool {
75        matches!(
76            self,
77            EdgeKind::RequiresArtifact
78                | EdgeKind::RequiresInterface
79                | EdgeKind::Tests
80                | EdgeKind::Integrates
81        )
82    }
83}
84
85/// A directed edge in a graph revision.
86#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
87pub struct WorkEdge {
88    pub src: String,
89    pub dst: String,
90    pub kind: EdgeKind,
91}
92
93impl WorkEdge {
94    pub fn new(src: impl Into<String>, dst: impl Into<String>, kind: EdgeKind) -> Self {
95        Self {
96            src: src.into(),
97            dst: dst.into(),
98            kind,
99        }
100    }
101}
102
103/// An immutable incarnation of a node (PSP-8 `WorkNode`).
104#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
105pub struct WorkNode {
106    pub node_id: String,
107    pub generation: u32,
108    pub goal: String,
109    pub node_class: NodeClass,
110    pub owner_domains: Vec<String>,
111    pub output_targets: Vec<String>,
112    /// Required sensors that gate readiness (mapped from `BlocksOnSensor`).
113    pub required_sensors: Vec<String>,
114    pub state: WorkNodeState,
115}
116
117impl WorkNode {
118    pub fn new(node_id: impl Into<String>, goal: impl Into<String>, node_class: NodeClass) -> Self {
119        Self {
120            node_id: node_id.into(),
121            generation: 0,
122            goal: goal.into(),
123            node_class,
124            owner_domains: Vec::new(),
125            output_targets: Vec::new(),
126            required_sensors: Vec::new(),
127            state: WorkNodeState::Pending,
128        }
129    }
130
131    pub fn with_outputs(mut self, outputs: Vec<String>) -> Self {
132        self.output_targets = outputs;
133        self
134    }
135
136    /// Create the next generation of this node, resetting it to pending.
137    pub fn next_generation(&self) -> Self {
138        Self {
139            generation: self.generation + 1,
140            state: WorkNodeState::Pending,
141            ..self.clone()
142        }
143    }
144
145    pub fn is_terminal(&self) -> bool {
146        matches!(
147            self.state,
148            WorkNodeState::Stable | WorkNodeState::Stopped { .. } | WorkNodeState::Retired { .. }
149        )
150    }
151
152    pub fn is_accepted(&self) -> bool {
153        matches!(self.state, WorkNodeState::Stable)
154    }
155}
156
157/// Why a graph revision was produced.
158#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
159#[serde(rename_all = "snake_case")]
160pub enum GraphRevisionReason {
161    InitialPlan,
162    LocalRepair,
163    ScopeExpansion,
164    UserEdit,
165    Replan,
166}
167
168/// Result of validating a revision before activation.
169#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
170pub struct GraphValidationReport {
171    pub acyclic: bool,
172    /// A topological order over dependency edges, if acyclic.
173    pub topo_order: Vec<String>,
174    pub dangling_edges: Vec<WorkEdge>,
175}
176
177/// A durable version of the work graph (PSP-8 `WorkGraphRevision`).
178#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
179pub struct WorkGraphRevision {
180    pub revision_id: String,
181    pub sequence: u32,
182    pub parent_revision_id: Option<String>,
183    pub reason: GraphRevisionReason,
184    pub nodes: Vec<WorkNode>,
185    pub edges: Vec<WorkEdge>,
186    pub validation: GraphValidationReport,
187    pub evidence: Vec<ResidualEventRef>,
188}
189
190impl WorkGraphRevision {
191    /// Build and validate a revision. Returns an error if it is not acyclic over
192    /// dependency edges; every revision SHALL validate acyclicity before
193    /// activation (PSP-8 System 4).
194    pub fn build(
195        sequence: u32,
196        parent_revision_id: Option<String>,
197        reason: GraphRevisionReason,
198        nodes: Vec<WorkNode>,
199        edges: Vec<WorkEdge>,
200    ) -> Result<Self> {
201        let validation = validate(&nodes, &edges)?;
202        if !validation.acyclic {
203            return Err(SdkError::Domain("graph revision is not acyclic".into()));
204        }
205        if !validation.dangling_edges.is_empty() {
206            return Err(SdkError::Domain(format!(
207                "graph revision has {} dangling edge(s)",
208                validation.dangling_edges.len()
209            )));
210        }
211        Ok(Self {
212            revision_id: uuid::Uuid::new_v4().to_string(),
213            sequence,
214            parent_revision_id,
215            reason,
216            nodes,
217            edges,
218            validation,
219            evidence: Vec::new(),
220        })
221    }
222
223    pub fn node(&self, node_id: &str) -> Option<&WorkNode> {
224        self.nodes.iter().find(|n| n.node_id == node_id)
225    }
226
227    /// Dependency predecessors of a node (sources of dependency edges into it).
228    pub fn dependencies_of(&self, node_id: &str) -> Vec<&str> {
229        self.edges
230            .iter()
231            .filter(|e| e.dst == node_id && e.kind.is_dependency())
232            .map(|e| e.src.as_str())
233            .collect()
234    }
235
236    /// Nodes that conflict with the given node via an explicit `ConflictsWith`
237    /// edge (in addition to footprint-derived conflicts handled by the
238    /// scheduler).
239    pub fn explicit_conflicts_of(&self, node_id: &str) -> Vec<&str> {
240        self.edges
241            .iter()
242            .filter(|e| e.kind == EdgeKind::ConflictsWith && (e.src == node_id || e.dst == node_id))
243            .map(|e| {
244                if e.src == node_id {
245                    e.dst.as_str()
246                } else {
247                    e.src.as_str()
248                }
249            })
250            .collect()
251    }
252}
253
254/// Validate a node/edge set: check that all edge endpoints exist and that the
255/// dependency subgraph is acyclic (Kahn's algorithm).
256pub fn validate(nodes: &[WorkNode], edges: &[WorkEdge]) -> Result<GraphValidationReport> {
257    let ids: HashSet<&str> = nodes.iter().map(|n| n.node_id.as_str()).collect();
258    let dangling: Vec<WorkEdge> = edges
259        .iter()
260        .filter(|e| !ids.contains(e.src.as_str()) || !ids.contains(e.dst.as_str()))
261        .cloned()
262        .collect();
263
264    // Kahn's algorithm over dependency edges only.
265    let mut indegree: HashMap<&str, usize> =
266        nodes.iter().map(|n| (n.node_id.as_str(), 0)).collect();
267    let mut adj: HashMap<&str, Vec<&str>> = HashMap::new();
268    for e in edges.iter().filter(|e| e.kind.is_dependency()) {
269        if !ids.contains(e.src.as_str()) || !ids.contains(e.dst.as_str()) {
270            continue;
271        }
272        adj.entry(e.src.as_str()).or_default().push(e.dst.as_str());
273        *indegree.get_mut(e.dst.as_str()).unwrap() += 1;
274    }
275
276    let mut queue: BTreeSet<&str> = indegree
277        .iter()
278        .filter(|(_, &d)| d == 0)
279        .map(|(&n, _)| n)
280        .collect();
281    let mut topo_order = Vec::new();
282    while let Some(&n) = queue.iter().next() {
283        queue.remove(n);
284        topo_order.push(n.to_string());
285        if let Some(succs) = adj.get(n) {
286            for &s in succs {
287                let d = indegree.get_mut(s).unwrap();
288                *d -= 1;
289                if *d == 0 {
290                    queue.insert(s);
291                }
292            }
293        }
294    }
295
296    let acyclic = topo_order.len() == nodes.len();
297    Ok(GraphValidationReport {
298        acyclic,
299        topo_order: if acyclic { topo_order } else { Vec::new() },
300        dangling_edges: dangling,
301    })
302}
303
304#[cfg(test)]
305mod tests {
306    use super::*;
307
308    fn node(id: &str) -> WorkNode {
309        WorkNode::new(id, format!("goal {id}"), NodeClass::Implement)
310    }
311
312    #[test]
313    fn acyclic_graph_validates_with_topo_order() {
314        let nodes = vec![node("a"), node("b"), node("c")];
315        let edges = vec![
316            WorkEdge::new("a", "b", EdgeKind::RequiresArtifact),
317            WorkEdge::new("b", "c", EdgeKind::RequiresInterface),
318        ];
319        let rev = WorkGraphRevision::build(0, None, GraphRevisionReason::InitialPlan, nodes, edges)
320            .unwrap();
321        assert!(rev.validation.acyclic);
322        assert_eq!(rev.validation.topo_order, vec!["a", "b", "c"]);
323    }
324
325    #[test]
326    fn cyclic_graph_is_rejected() {
327        let nodes = vec![node("a"), node("b")];
328        let edges = vec![
329            WorkEdge::new("a", "b", EdgeKind::RequiresArtifact),
330            WorkEdge::new("b", "a", EdgeKind::RequiresArtifact),
331        ];
332        assert!(
333            WorkGraphRevision::build(0, None, GraphRevisionReason::InitialPlan, nodes, edges)
334                .is_err()
335        );
336    }
337
338    #[test]
339    fn dangling_edge_is_rejected() {
340        let nodes = vec![node("a")];
341        let edges = vec![WorkEdge::new("a", "ghost", EdgeKind::RequiresArtifact)];
342        assert!(
343            WorkGraphRevision::build(0, None, GraphRevisionReason::InitialPlan, nodes, edges)
344                .is_err()
345        );
346    }
347
348    #[test]
349    fn conflicts_with_does_not_create_dependency_cycle() {
350        // ConflictsWith is bidirectional but is not a dependency edge, so a pair
351        // of conflicting nodes is still a valid acyclic revision.
352        let nodes = vec![node("a"), node("b")];
353        let edges = vec![WorkEdge::new("a", "b", EdgeKind::ConflictsWith)];
354        let rev = WorkGraphRevision::build(0, None, GraphRevisionReason::InitialPlan, nodes, edges)
355            .unwrap();
356        assert!(rev.validation.acyclic);
357        assert_eq!(rev.explicit_conflicts_of("a"), vec!["b"]);
358    }
359
360    #[test]
361    fn next_generation_resets_state() {
362        let mut n = node("a");
363        n.state = WorkNodeState::Stable;
364        let g1 = n.next_generation();
365        assert_eq!(g1.generation, 1);
366        assert_eq!(g1.state, WorkNodeState::Pending);
367    }
368
369    #[test]
370    fn dependencies_exclude_audit_and_conflict_edges() {
371        let nodes = vec![node("a"), node("b")];
372        let edges = vec![
373            WorkEdge::new("a", "b", EdgeKind::DerivedFrom),
374            WorkEdge::new("a", "b", EdgeKind::ConflictsWith),
375        ];
376        let rev = WorkGraphRevision::build(0, None, GraphRevisionReason::LocalRepair, nodes, edges)
377            .unwrap();
378        assert!(rev.dependencies_of("b").is_empty());
379    }
380}