Skip to main content

devsper_graph/
actor.rs

1use crate::{
2    event_log::EventLog,
3    mutation::{MutationRequest, MutationResult},
4    snapshot::build_snapshot,
5    validator::MutationValidator,
6};
7use devsper_core::{
8    now_ms, GraphEvent, GraphMutation, GraphSnapshot, Node, NodeId, NodeSpec, NodeStatus, RunId,
9};
10use petgraph::graph::{DiGraph, NodeIndex};
11use std::collections::{HashMap, HashSet};
12use tokio::sync::{mpsc, oneshot};
13use tracing::{debug, info, warn};
14
15/// Configuration for the graph actor
16#[derive(Debug, Clone)]
17pub struct GraphConfig {
18    pub run_id: RunId,
19    pub snapshot_interval: u64,
20    pub max_depth: u32,
21}
22
23impl Default for GraphConfig {
24    fn default() -> Self {
25        Self {
26            run_id: RunId::new(),
27            snapshot_interval: 1000,
28            max_depth: 10,
29        }
30    }
31}
32
33/// Messages the GraphActor processes
34enum ActorMessage {
35    Mutate(MutationRequest),
36    GetReady(oneshot::Sender<Vec<NodeId>>),
37    ClaimNode(NodeId, oneshot::Sender<bool>),
38    CompleteNode(NodeId, serde_json::Value),
39    FailNode(NodeId, String),
40    GetSnapshot(oneshot::Sender<GraphSnapshot>),
41    Shutdown,
42}
43
44/// Handle for interacting with a running GraphActor from other tasks
45#[derive(Clone)]
46pub struct GraphHandle {
47    sender: mpsc::Sender<ActorMessage>,
48}
49
50impl GraphHandle {
51    /// Apply a mutation to the graph. Returns Err if rejected or actor is gone.
52    pub async fn mutate(&self, mutation: GraphMutation) -> Result<(), String> {
53        let (req, rx) = MutationRequest::new(mutation);
54        self.sender
55            .send(ActorMessage::Mutate(req))
56            .await
57            .map_err(|_| "GraphActor has shut down".to_string())?;
58        match rx.await.map_err(|_| "GraphActor dropped response".to_string())? {
59            MutationResult::Applied => Ok(()),
60            MutationResult::Rejected { reason } => Err(reason),
61        }
62    }
63
64    /// Get all currently ready (runnable) node IDs.
65    pub async fn get_ready(&self) -> Vec<NodeId> {
66        let (tx, rx) = oneshot::channel();
67        let _ = self.sender.send(ActorMessage::GetReady(tx)).await;
68        rx.await.unwrap_or_default()
69    }
70
71    /// Claim a node for execution (Pending/Ready → Running).
72    /// Returns true if this caller won the claim race.
73    pub async fn claim(&self, id: NodeId) -> bool {
74        let (tx, rx) = oneshot::channel();
75        let _ = self.sender.send(ActorMessage::ClaimNode(id, tx)).await;
76        rx.await.unwrap_or(false)
77    }
78
79    /// Mark a node as completed with its result value.
80    pub async fn complete(&self, id: NodeId, result: serde_json::Value) {
81        let _ = self.sender.send(ActorMessage::CompleteNode(id, result)).await;
82    }
83
84    /// Mark a node as failed with an error message.
85    pub async fn fail(&self, id: NodeId, error: String) {
86        let _ = self.sender.send(ActorMessage::FailNode(id, error)).await;
87    }
88
89    /// Get a point-in-time snapshot of the graph state.
90    pub async fn snapshot(&self) -> Option<GraphSnapshot> {
91        let (tx, rx) = oneshot::channel();
92        let _ = self.sender.send(ActorMessage::GetSnapshot(tx)).await;
93        rx.await.ok()
94    }
95
96    /// Gracefully shut down the graph actor.
97    pub async fn shutdown(&self) {
98        let _ = self.sender.send(ActorMessage::Shutdown).await;
99    }
100}
101
102/// The graph actor — single writer, owns all graph state.
103/// Run in a dedicated tokio task via `tokio::spawn(actor.run())`.
104pub struct GraphActor {
105    config: GraphConfig,
106    nodes: HashMap<NodeId, Node>,
107    graph: DiGraph<NodeId, ()>,
108    index_map: HashMap<NodeId, NodeIndex>,
109    ready_set: HashSet<NodeId>,
110    event_log: EventLog,
111    validator: MutationValidator,
112    receiver: mpsc::Receiver<ActorMessage>,
113    event_tx: mpsc::Sender<GraphEvent>,
114}
115
116impl GraphActor {
117    /// Create a new GraphActor.
118    /// Returns (actor, handle, event_receiver).
119    pub fn new(config: GraphConfig) -> (Self, GraphHandle, mpsc::Receiver<GraphEvent>) {
120        let (msg_tx, msg_rx) = mpsc::channel(1024);
121        let (event_tx, event_rx) = mpsc::channel(4096);
122
123        let actor = Self {
124            event_log: EventLog::new(config.snapshot_interval),
125            config,
126            nodes: HashMap::new(),
127            graph: DiGraph::new(),
128            index_map: HashMap::new(),
129            ready_set: HashSet::new(),
130            validator: MutationValidator::new(),
131            receiver: msg_rx,
132            event_tx,
133        };
134
135        let handle = GraphHandle { sender: msg_tx };
136        (actor, handle, event_rx)
137    }
138
139    /// Seed the graph with initial nodes before starting the run loop.
140    pub fn add_initial_nodes(&mut self, specs: Vec<NodeSpec>) {
141        for spec in specs {
142            self.add_node_internal(spec);
143        }
144        // Wire up declared edges
145        let pairs: Vec<(NodeId, NodeId)> = self
146            .nodes
147            .values()
148            .flat_map(|n| {
149                n.spec
150                    .depends_on
151                    .iter()
152                    .map(|dep| (dep.clone(), n.spec.id.clone()))
153                    .collect::<Vec<_>>()
154            })
155            .collect();
156        for (from, to) in pairs {
157            if let (Some(&fi), Some(&ti)) =
158                (self.index_map.get(&from), self.index_map.get(&to))
159            {
160                self.graph.add_edge(fi, ti, ());
161            }
162        }
163        self.recompute_ready_set();
164    }
165
166    /// Drive the actor message loop. Call via `tokio::spawn(actor.run())`.
167    pub async fn run(mut self) {
168        info!(run_id = %self.config.run_id, "GraphActor started");
169
170        while let Some(msg) = self.receiver.recv().await {
171            match msg {
172                ActorMessage::Mutate(req) => self.handle_mutate(req).await,
173
174                ActorMessage::GetReady(tx) => {
175                    let ready: Vec<NodeId> = self.ready_set.iter().cloned().collect();
176                    debug!(count = ready.len(), "GetReady");
177                    let _ = tx.send(ready);
178                }
179
180                ActorMessage::ClaimNode(id, tx) => {
181                    let ok = self.handle_claim(&id);
182                    let _ = tx.send(ok);
183                }
184
185                ActorMessage::CompleteNode(id, result) => {
186                    self.handle_complete(id, result).await;
187                }
188
189                ActorMessage::FailNode(id, error) => {
190                    self.handle_fail(id, error).await;
191                }
192
193                ActorMessage::GetSnapshot(tx) => {
194                    let snap = self.build_current_snapshot();
195                    let _ = tx.send(snap);
196                }
197
198                ActorMessage::Shutdown => {
199                    info!(run_id = %self.config.run_id, "GraphActor shutting down");
200                    break;
201                }
202            }
203
204            // Auto-snapshot when interval is reached
205            if self.event_log.should_snapshot() {
206                let snap = self.build_current_snapshot();
207                self.event_log.record_snapshot(snap.clone());
208                self.emit(GraphEvent::SnapshotTaken {
209                    snapshot: snap,
210                    ts: now_ms(),
211                })
212                .await;
213            }
214        }
215    }
216
217    // ── Internal helpers ──────────────────────────────────────────────────────
218
219    fn add_node_internal(&mut self, spec: NodeSpec) -> NodeIndex {
220        let id = spec.id.clone();
221        let idx = self.graph.add_node(id.clone());
222        self.index_map.insert(id.clone(), idx);
223        self.nodes.insert(id, Node::new(spec));
224        idx
225    }
226
227    fn recompute_ready_set(&mut self) {
228        self.ready_set.clear();
229        let ids: Vec<NodeId> = self.nodes.keys().cloned().collect();
230        for id in ids {
231            let node = &self.nodes[&id];
232            if node.status != NodeStatus::Pending {
233                continue;
234            }
235            let all_deps_done = node.spec.depends_on.iter().all(|dep_id| {
236                self.nodes
237                    .get(dep_id)
238                    .map(|d| d.status == NodeStatus::Completed)
239                    .unwrap_or(false)
240            });
241            if all_deps_done {
242                self.ready_set.insert(id);
243            }
244        }
245    }
246
247    fn handle_claim(&mut self, id: &NodeId) -> bool {
248        if !self.ready_set.contains(id) {
249            return false;
250        }
251        if let Some(node) = self.nodes.get_mut(id) {
252            if matches!(node.status, NodeStatus::Pending | NodeStatus::Ready) {
253                node.status = NodeStatus::Running;
254                node.started_at = Some(now_ms());
255                self.ready_set.remove(id);
256                return true;
257            }
258        }
259        false
260    }
261
262    async fn handle_complete(&mut self, id: NodeId, result: serde_json::Value) {
263        if let Some(node) = self.nodes.get_mut(&id) {
264            node.status = NodeStatus::Completed;
265            node.result = Some(result.clone());
266            node.completed_at = Some(now_ms());
267            self.emit(GraphEvent::NodeCompleted {
268                id: id.clone(),
269                result,
270                ts: now_ms(),
271            })
272            .await;
273        }
274        self.recompute_ready_set();
275        if self.is_run_complete() {
276            self.emit(GraphEvent::RunCompleted {
277                run_id: self.config.run_id.clone(),
278                ts: now_ms(),
279            })
280            .await;
281        }
282    }
283
284    async fn handle_fail(&mut self, id: NodeId, error: String) {
285        if let Some(node) = self.nodes.get_mut(&id) {
286            node.status = NodeStatus::Failed;
287            node.error = Some(error.clone());
288            node.completed_at = Some(now_ms());
289        }
290        self.emit(GraphEvent::NodeFailed {
291            id,
292            error,
293            ts: now_ms(),
294        })
295        .await;
296    }
297
298    async fn handle_mutate(&mut self, req: MutationRequest) {
299        match self
300            .validator
301            .validate(&self.graph, &self.index_map, &req.mutation)
302        {
303            Err(reason) => {
304                warn!("Mutation rejected: {reason}");
305                self.emit(GraphEvent::MutationRejected {
306                    reason: reason.clone(),
307                    ts: now_ms(),
308                })
309                .await;
310                let _ = req.response.send(MutationResult::Rejected { reason });
311            }
312            Ok(()) => {
313                self.apply_mutation(req.mutation.clone()).await;
314                self.emit(GraphEvent::MutationApplied {
315                    mutation: req.mutation,
316                    ts: now_ms(),
317                })
318                .await;
319                let _ = req.response.send(MutationResult::Applied);
320                self.recompute_ready_set();
321            }
322        }
323    }
324
325    async fn apply_mutation(&mut self, mutation: GraphMutation) {
326        match mutation {
327            GraphMutation::AddNode { spec } => {
328                let id = spec.id.clone();
329                let deps = spec.depends_on.clone();
330                self.add_node_internal(spec.clone());
331                // Wire declared dependencies
332                for dep_id in &deps {
333                    if let (Some(&di), Some(&ni)) =
334                        (self.index_map.get(dep_id), self.index_map.get(&id))
335                    {
336                        self.graph.add_edge(di, ni, ());
337                        self.emit(GraphEvent::EdgeAdded {
338                            from: dep_id.clone(),
339                            to: id.clone(),
340                            ts: now_ms(),
341                        })
342                        .await;
343                    }
344                }
345                self.emit(GraphEvent::NodeAdded {
346                    id,
347                    spec,
348                    ts: now_ms(),
349                })
350                .await;
351            }
352
353            GraphMutation::AddEdge { from, to } => {
354                if let (Some(&fi), Some(&ti)) =
355                    (self.index_map.get(&from), self.index_map.get(&to))
356                {
357                    self.graph.add_edge(fi, ti, ());
358                    self.emit(GraphEvent::EdgeAdded {
359                        from,
360                        to,
361                        ts: now_ms(),
362                    })
363                    .await;
364                }
365            }
366
367            GraphMutation::RemoveEdge { from, to } => {
368                if let (Some(&fi), Some(&ti)) =
369                    (self.index_map.get(&from), self.index_map.get(&to))
370                {
371                    if let Some(edge) = self.graph.find_edge(fi, ti) {
372                        self.graph.remove_edge(edge);
373                        self.emit(GraphEvent::EdgeRemoved {
374                            from,
375                            to,
376                            ts: now_ms(),
377                        })
378                        .await;
379                    }
380                }
381            }
382
383            GraphMutation::InjectBefore { before, insert } => {
384                let new_id = insert.id.clone();
385                self.add_node_internal(insert.clone());
386                self.emit(GraphEvent::NodeAdded {
387                    id: new_id.clone(),
388                    spec: insert,
389                    ts: now_ms(),
390                })
391                .await;
392                // new_node → before
393                if let (Some(&ni), Some(&bi)) =
394                    (self.index_map.get(&new_id), self.index_map.get(&before))
395                {
396                    self.graph.add_edge(ni, bi, ());
397                    self.emit(GraphEvent::EdgeAdded {
398                        from: new_id,
399                        to: before,
400                        ts: now_ms(),
401                    })
402                    .await;
403                }
404            }
405
406            GraphMutation::PruneSubgraph { root } => {
407                let to_abandon = self.collect_subgraph(&root);
408                for id in to_abandon {
409                    if let Some(node) = self.nodes.get_mut(&id) {
410                        if !node.is_terminal() {
411                            node.status = NodeStatus::Abandoned;
412                            self.ready_set.remove(&id);
413                            self.emit(GraphEvent::NodeAbandoned {
414                                id,
415                                ts: now_ms(),
416                            })
417                            .await;
418                        }
419                    }
420                }
421            }
422
423            GraphMutation::SplitNode { node, into } => {
424                if let Some(n) = self.nodes.get_mut(&node) {
425                    if !n.is_terminal() {
426                        n.status = NodeStatus::Abandoned;
427                        self.ready_set.remove(&node);
428                        self.emit(GraphEvent::NodeAbandoned {
429                            id: node,
430                            ts: now_ms(),
431                        })
432                        .await;
433                    }
434                }
435                for spec in into {
436                    let id = spec.id.clone();
437                    self.add_node_internal(spec.clone());
438                    self.emit(GraphEvent::NodeAdded {
439                        id,
440                        spec,
441                        ts: now_ms(),
442                    })
443                    .await;
444                }
445            }
446
447            GraphMutation::MarkSpeculative { nodes } => {
448                for id in nodes {
449                    if let Some(node) = self.nodes.get_mut(&id) {
450                        if node.status == NodeStatus::Pending {
451                            node.status = NodeStatus::Speculative;
452                            self.ready_set.remove(&id);
453                        }
454                    }
455                }
456            }
457
458            GraphMutation::ConfirmSpeculative { nodes } => {
459                for id in nodes {
460                    if let Some(node) = self.nodes.get_mut(&id) {
461                        if node.status == NodeStatus::Speculative {
462                            node.status = NodeStatus::Pending;
463                        }
464                    }
465                }
466                self.recompute_ready_set();
467            }
468
469            GraphMutation::DiscardSpeculative { nodes } => {
470                for id in nodes {
471                    if let Some(node) = self.nodes.get_mut(&id) {
472                        if node.status == NodeStatus::Speculative {
473                            node.status = NodeStatus::Abandoned;
474                            self.ready_set.remove(&id);
475                            self.emit(GraphEvent::NodeAbandoned {
476                                id,
477                                ts: now_ms(),
478                            })
479                            .await;
480                        }
481                    }
482                }
483            }
484        }
485    }
486
487    /// Collect a node and all its descendants (BFS over outgoing edges).
488    fn collect_subgraph(&self, root: &NodeId) -> Vec<NodeId> {
489        let mut result = Vec::new();
490        let Some(&root_idx) = self.index_map.get(root) else {
491            return result;
492        };
493        let mut stack = vec![root_idx];
494        let mut visited = HashSet::new();
495        while let Some(idx) = stack.pop() {
496            if !visited.insert(idx) {
497                continue;
498            }
499            if let Some(id) = self.graph.node_weight(idx) {
500                result.push(id.clone());
501            }
502            for neighbor in self.graph.neighbors(idx) {
503                stack.push(neighbor);
504            }
505        }
506        result
507    }
508
509    fn build_current_snapshot(&self) -> GraphSnapshot {
510        let edges: Vec<(NodeId, NodeId)> = self
511            .graph
512            .edge_indices()
513            .filter_map(|e| {
514                self.graph.edge_endpoints(e).and_then(|(fi, ti)| {
515                    let from = self.graph.node_weight(fi)?.clone();
516                    let to = self.graph.node_weight(ti)?.clone();
517                    Some((from, to))
518                })
519            })
520            .collect();
521
522        build_snapshot(
523            self.config.run_id.clone(),
524            &self.nodes,
525            edges,
526            self.event_log.len() as u64,
527        )
528    }
529
530    fn is_run_complete(&self) -> bool {
531        !self.nodes.is_empty() && self.nodes.values().all(|n| n.is_terminal())
532    }
533
534    async fn emit(&mut self, event: GraphEvent) {
535        self.event_log.append(event.clone());
536        // Non-blocking: drop events if the consumer is slow
537        let _ = self.event_tx.try_send(event);
538    }
539}
540
541// ── Tests ─────────────────────────────────────────────────────────────────────
542
543#[cfg(test)]
544mod tests {
545    use super::*;
546    use devsper_core::{GraphMutation, NodeSpec};
547
548    fn make_config() -> GraphConfig {
549        GraphConfig {
550            run_id: RunId::new(),
551            snapshot_interval: 100,
552            max_depth: 10,
553        }
554    }
555
556    #[tokio::test]
557    async fn single_task_ready_and_completes() {
558        let (mut actor, handle, _rx) = GraphActor::new(make_config());
559        let spec = NodeSpec::new("test task");
560        let node_id = spec.id.clone();
561        actor.add_initial_nodes(vec![spec]);
562        tokio::spawn(actor.run());
563
564        let ready = handle.get_ready().await;
565        assert!(ready.contains(&node_id));
566
567        assert!(handle.claim(node_id.clone()).await);
568
569        // No longer in ready set after claim
570        let ready2 = handle.get_ready().await;
571        assert!(!ready2.contains(&node_id));
572
573        handle.complete(node_id, serde_json::json!({"ok": true})).await;
574        handle.shutdown().await;
575    }
576
577    #[tokio::test]
578    async fn dependency_ordering_respected() {
579        let (mut actor, handle, _rx) = GraphActor::new(make_config());
580
581        let spec_a = NodeSpec::new("A");
582        let id_a = spec_a.id.clone();
583        let spec_b = NodeSpec::new("B").depends_on(vec![id_a.clone()]);
584        let id_b = spec_b.id.clone();
585
586        actor.add_initial_nodes(vec![spec_a, spec_b]);
587        tokio::spawn(actor.run());
588
589        // Only A ready initially
590        let ready = handle.get_ready().await;
591        assert!(ready.contains(&id_a), "A should be ready");
592        assert!(!ready.contains(&id_b), "B should not be ready yet");
593
594        handle.claim(id_a.clone()).await;
595        handle.complete(id_a, serde_json::json!(null)).await;
596
597        tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
598
599        let ready2 = handle.get_ready().await;
600        assert!(ready2.contains(&id_b), "B should be ready after A completes");
601
602        handle.shutdown().await;
603    }
604
605    #[tokio::test]
606    async fn cycle_mutation_rejected() {
607        let (mut actor, handle, _rx) = GraphActor::new(make_config());
608
609        let spec_a = NodeSpec::new("A");
610        let id_a = spec_a.id.clone();
611        let spec_b = NodeSpec::new("B").depends_on(vec![id_a.clone()]);
612        let id_b = spec_b.id.clone();
613
614        actor.add_initial_nodes(vec![spec_a, spec_b]);
615        tokio::spawn(actor.run());
616
617        // A→B exists; adding B→A creates a cycle
618        let result = handle
619            .mutate(GraphMutation::AddEdge {
620                from: id_b.clone(),
621                to: id_a.clone(),
622            })
623            .await;
624        assert!(result.is_err(), "Cycle should be rejected: {result:?}");
625
626        handle.shutdown().await;
627    }
628
629    #[tokio::test]
630    async fn inject_node_mutation_makes_it_ready() {
631        let (actor, handle, _rx) = GraphActor::new(make_config());
632        tokio::spawn(actor.run());
633
634        let new_spec = NodeSpec::new("injected");
635        let new_id = new_spec.id.clone();
636
637        handle
638            .mutate(GraphMutation::AddNode { spec: new_spec })
639            .await
640            .unwrap();
641
642        tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
643
644        let ready = handle.get_ready().await;
645        assert!(ready.contains(&new_id), "Injected node should be ready");
646
647        handle.shutdown().await;
648    }
649
650    #[tokio::test]
651    async fn speculative_lifecycle() {
652        let (mut actor, handle, _rx) = GraphActor::new(make_config());
653        let spec = NodeSpec::new("speculative");
654        let id = spec.id.clone();
655        actor.add_initial_nodes(vec![spec]);
656        tokio::spawn(actor.run());
657
658        // Mark speculative → not ready
659        handle
660            .mutate(GraphMutation::MarkSpeculative {
661                nodes: vec![id.clone()],
662            })
663            .await
664            .unwrap();
665        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
666        assert!(
667            !handle.get_ready().await.contains(&id),
668            "Speculative should not be ready"
669        );
670
671        // Confirm → ready
672        handle
673            .mutate(GraphMutation::ConfirmSpeculative {
674                nodes: vec![id.clone()],
675            })
676            .await
677            .unwrap();
678        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
679        assert!(
680            handle.get_ready().await.contains(&id),
681            "Confirmed speculative should be ready"
682        );
683
684        handle.shutdown().await;
685    }
686
687    #[tokio::test]
688    async fn snapshot_contains_seeded_nodes() {
689        let (mut actor, handle, _rx) = GraphActor::new(make_config());
690        let spec = NodeSpec::new("seed");
691        let id = spec.id.clone();
692        actor.add_initial_nodes(vec![spec]);
693        tokio::spawn(actor.run());
694
695        let snap = handle.snapshot().await.unwrap();
696        assert!(snap.nodes.contains_key(&id));
697
698        handle.shutdown().await;
699    }
700}