1use crate::{
2 event_log::EventLog,
3 mutation::{MutationRequest, MutationResult},
4 snapshot::build_snapshot,
5 validator::MutationValidator,
6};
7use devsper_core::{
8 now_ms, GraphEvent, GraphMutation, GraphSnapshot, Node, NodeId, NodeSpec, NodeStatus, RunId,
9};
10use petgraph::graph::{DiGraph, NodeIndex};
11use std::collections::{HashMap, HashSet};
12use tokio::sync::{mpsc, oneshot};
13use tracing::{debug, info, warn};
14
15#[derive(Debug, Clone)]
17pub struct GraphConfig {
18 pub run_id: RunId,
19 pub snapshot_interval: u64,
20 pub max_depth: u32,
21}
22
23impl Default for GraphConfig {
24 fn default() -> Self {
25 Self {
26 run_id: RunId::new(),
27 snapshot_interval: 1000,
28 max_depth: 10,
29 }
30 }
31}
32
33enum ActorMessage {
35 Mutate(MutationRequest),
36 GetReady(oneshot::Sender<Vec<NodeId>>),
37 ClaimNode(NodeId, oneshot::Sender<bool>),
38 CompleteNode(NodeId, serde_json::Value),
39 FailNode(NodeId, String),
40 GetSnapshot(oneshot::Sender<GraphSnapshot>),
41 Shutdown,
42}
43
44#[derive(Clone)]
46pub struct GraphHandle {
47 sender: mpsc::Sender<ActorMessage>,
48}
49
50impl GraphHandle {
51 pub async fn mutate(&self, mutation: GraphMutation) -> Result<(), String> {
53 let (req, rx) = MutationRequest::new(mutation);
54 self.sender
55 .send(ActorMessage::Mutate(req))
56 .await
57 .map_err(|_| "GraphActor has shut down".to_string())?;
58 match rx.await.map_err(|_| "GraphActor dropped response".to_string())? {
59 MutationResult::Applied => Ok(()),
60 MutationResult::Rejected { reason } => Err(reason),
61 }
62 }
63
64 pub async fn get_ready(&self) -> Vec<NodeId> {
66 let (tx, rx) = oneshot::channel();
67 let _ = self.sender.send(ActorMessage::GetReady(tx)).await;
68 rx.await.unwrap_or_default()
69 }
70
71 pub async fn claim(&self, id: NodeId) -> bool {
74 let (tx, rx) = oneshot::channel();
75 let _ = self.sender.send(ActorMessage::ClaimNode(id, tx)).await;
76 rx.await.unwrap_or(false)
77 }
78
79 pub async fn complete(&self, id: NodeId, result: serde_json::Value) {
81 let _ = self.sender.send(ActorMessage::CompleteNode(id, result)).await;
82 }
83
84 pub async fn fail(&self, id: NodeId, error: String) {
86 let _ = self.sender.send(ActorMessage::FailNode(id, error)).await;
87 }
88
89 pub async fn snapshot(&self) -> Option<GraphSnapshot> {
91 let (tx, rx) = oneshot::channel();
92 let _ = self.sender.send(ActorMessage::GetSnapshot(tx)).await;
93 rx.await.ok()
94 }
95
96 pub async fn shutdown(&self) {
98 let _ = self.sender.send(ActorMessage::Shutdown).await;
99 }
100}
101
102pub struct GraphActor {
105 config: GraphConfig,
106 nodes: HashMap<NodeId, Node>,
107 graph: DiGraph<NodeId, ()>,
108 index_map: HashMap<NodeId, NodeIndex>,
109 ready_set: HashSet<NodeId>,
110 event_log: EventLog,
111 validator: MutationValidator,
112 receiver: mpsc::Receiver<ActorMessage>,
113 event_tx: mpsc::Sender<GraphEvent>,
114}
115
116impl GraphActor {
117 pub fn new(config: GraphConfig) -> (Self, GraphHandle, mpsc::Receiver<GraphEvent>) {
120 let (msg_tx, msg_rx) = mpsc::channel(1024);
121 let (event_tx, event_rx) = mpsc::channel(4096);
122
123 let actor = Self {
124 event_log: EventLog::new(config.snapshot_interval),
125 config,
126 nodes: HashMap::new(),
127 graph: DiGraph::new(),
128 index_map: HashMap::new(),
129 ready_set: HashSet::new(),
130 validator: MutationValidator::new(),
131 receiver: msg_rx,
132 event_tx,
133 };
134
135 let handle = GraphHandle { sender: msg_tx };
136 (actor, handle, event_rx)
137 }
138
139 pub fn add_initial_nodes(&mut self, specs: Vec<NodeSpec>) {
141 for spec in specs {
142 self.add_node_internal(spec);
143 }
144 let pairs: Vec<(NodeId, NodeId)> = self
146 .nodes
147 .values()
148 .flat_map(|n| {
149 n.spec
150 .depends_on
151 .iter()
152 .map(|dep| (dep.clone(), n.spec.id.clone()))
153 .collect::<Vec<_>>()
154 })
155 .collect();
156 for (from, to) in pairs {
157 if let (Some(&fi), Some(&ti)) =
158 (self.index_map.get(&from), self.index_map.get(&to))
159 {
160 self.graph.add_edge(fi, ti, ());
161 }
162 }
163 self.recompute_ready_set();
164 }
165
166 pub async fn run(mut self) {
168 info!(run_id = %self.config.run_id, "GraphActor started");
169
170 while let Some(msg) = self.receiver.recv().await {
171 match msg {
172 ActorMessage::Mutate(req) => self.handle_mutate(req).await,
173
174 ActorMessage::GetReady(tx) => {
175 let ready: Vec<NodeId> = self.ready_set.iter().cloned().collect();
176 debug!(count = ready.len(), "GetReady");
177 let _ = tx.send(ready);
178 }
179
180 ActorMessage::ClaimNode(id, tx) => {
181 let ok = self.handle_claim(&id);
182 let _ = tx.send(ok);
183 }
184
185 ActorMessage::CompleteNode(id, result) => {
186 self.handle_complete(id, result).await;
187 }
188
189 ActorMessage::FailNode(id, error) => {
190 self.handle_fail(id, error).await;
191 }
192
193 ActorMessage::GetSnapshot(tx) => {
194 let snap = self.build_current_snapshot();
195 let _ = tx.send(snap);
196 }
197
198 ActorMessage::Shutdown => {
199 info!(run_id = %self.config.run_id, "GraphActor shutting down");
200 break;
201 }
202 }
203
204 if self.event_log.should_snapshot() {
206 let snap = self.build_current_snapshot();
207 self.event_log.record_snapshot(snap.clone());
208 self.emit(GraphEvent::SnapshotTaken {
209 snapshot: snap,
210 ts: now_ms(),
211 })
212 .await;
213 }
214 }
215 }
216
217 fn add_node_internal(&mut self, spec: NodeSpec) -> NodeIndex {
220 let id = spec.id.clone();
221 let idx = self.graph.add_node(id.clone());
222 self.index_map.insert(id.clone(), idx);
223 self.nodes.insert(id, Node::new(spec));
224 idx
225 }
226
227 fn recompute_ready_set(&mut self) {
228 self.ready_set.clear();
229 let ids: Vec<NodeId> = self.nodes.keys().cloned().collect();
230 for id in ids {
231 let node = &self.nodes[&id];
232 if node.status != NodeStatus::Pending {
233 continue;
234 }
235 let all_deps_done = node.spec.depends_on.iter().all(|dep_id| {
236 self.nodes
237 .get(dep_id)
238 .map(|d| d.status == NodeStatus::Completed)
239 .unwrap_or(false)
240 });
241 if all_deps_done {
242 self.ready_set.insert(id);
243 }
244 }
245 }
246
247 fn handle_claim(&mut self, id: &NodeId) -> bool {
248 if !self.ready_set.contains(id) {
249 return false;
250 }
251 if let Some(node) = self.nodes.get_mut(id) {
252 if matches!(node.status, NodeStatus::Pending | NodeStatus::Ready) {
253 node.status = NodeStatus::Running;
254 node.started_at = Some(now_ms());
255 self.ready_set.remove(id);
256 return true;
257 }
258 }
259 false
260 }
261
262 async fn handle_complete(&mut self, id: NodeId, result: serde_json::Value) {
263 if let Some(node) = self.nodes.get_mut(&id) {
264 node.status = NodeStatus::Completed;
265 node.result = Some(result.clone());
266 node.completed_at = Some(now_ms());
267 self.emit(GraphEvent::NodeCompleted {
268 id: id.clone(),
269 result,
270 ts: now_ms(),
271 })
272 .await;
273 }
274 self.recompute_ready_set();
275 if self.is_run_complete() {
276 self.emit(GraphEvent::RunCompleted {
277 run_id: self.config.run_id.clone(),
278 ts: now_ms(),
279 })
280 .await;
281 }
282 }
283
284 async fn handle_fail(&mut self, id: NodeId, error: String) {
285 if let Some(node) = self.nodes.get_mut(&id) {
286 node.status = NodeStatus::Failed;
287 node.error = Some(error.clone());
288 node.completed_at = Some(now_ms());
289 }
290 self.emit(GraphEvent::NodeFailed {
291 id,
292 error,
293 ts: now_ms(),
294 })
295 .await;
296 }
297
298 async fn handle_mutate(&mut self, req: MutationRequest) {
299 match self
300 .validator
301 .validate(&self.graph, &self.index_map, &req.mutation)
302 {
303 Err(reason) => {
304 warn!("Mutation rejected: {reason}");
305 self.emit(GraphEvent::MutationRejected {
306 reason: reason.clone(),
307 ts: now_ms(),
308 })
309 .await;
310 let _ = req.response.send(MutationResult::Rejected { reason });
311 }
312 Ok(()) => {
313 self.apply_mutation(req.mutation.clone()).await;
314 self.emit(GraphEvent::MutationApplied {
315 mutation: req.mutation,
316 ts: now_ms(),
317 })
318 .await;
319 let _ = req.response.send(MutationResult::Applied);
320 self.recompute_ready_set();
321 }
322 }
323 }
324
325 async fn apply_mutation(&mut self, mutation: GraphMutation) {
326 match mutation {
327 GraphMutation::AddNode { spec } => {
328 let id = spec.id.clone();
329 let deps = spec.depends_on.clone();
330 self.add_node_internal(spec.clone());
331 for dep_id in &deps {
333 if let (Some(&di), Some(&ni)) =
334 (self.index_map.get(dep_id), self.index_map.get(&id))
335 {
336 self.graph.add_edge(di, ni, ());
337 self.emit(GraphEvent::EdgeAdded {
338 from: dep_id.clone(),
339 to: id.clone(),
340 ts: now_ms(),
341 })
342 .await;
343 }
344 }
345 self.emit(GraphEvent::NodeAdded {
346 id,
347 spec,
348 ts: now_ms(),
349 })
350 .await;
351 }
352
353 GraphMutation::AddEdge { from, to } => {
354 if let (Some(&fi), Some(&ti)) =
355 (self.index_map.get(&from), self.index_map.get(&to))
356 {
357 self.graph.add_edge(fi, ti, ());
358 self.emit(GraphEvent::EdgeAdded {
359 from,
360 to,
361 ts: now_ms(),
362 })
363 .await;
364 }
365 }
366
367 GraphMutation::RemoveEdge { from, to } => {
368 if let (Some(&fi), Some(&ti)) =
369 (self.index_map.get(&from), self.index_map.get(&to))
370 {
371 if let Some(edge) = self.graph.find_edge(fi, ti) {
372 self.graph.remove_edge(edge);
373 self.emit(GraphEvent::EdgeRemoved {
374 from,
375 to,
376 ts: now_ms(),
377 })
378 .await;
379 }
380 }
381 }
382
383 GraphMutation::InjectBefore { before, insert } => {
384 let new_id = insert.id.clone();
385 self.add_node_internal(insert.clone());
386 self.emit(GraphEvent::NodeAdded {
387 id: new_id.clone(),
388 spec: insert,
389 ts: now_ms(),
390 })
391 .await;
392 if let (Some(&ni), Some(&bi)) =
394 (self.index_map.get(&new_id), self.index_map.get(&before))
395 {
396 self.graph.add_edge(ni, bi, ());
397 self.emit(GraphEvent::EdgeAdded {
398 from: new_id,
399 to: before,
400 ts: now_ms(),
401 })
402 .await;
403 }
404 }
405
406 GraphMutation::PruneSubgraph { root } => {
407 let to_abandon = self.collect_subgraph(&root);
408 for id in to_abandon {
409 if let Some(node) = self.nodes.get_mut(&id) {
410 if !node.is_terminal() {
411 node.status = NodeStatus::Abandoned;
412 self.ready_set.remove(&id);
413 self.emit(GraphEvent::NodeAbandoned {
414 id,
415 ts: now_ms(),
416 })
417 .await;
418 }
419 }
420 }
421 }
422
423 GraphMutation::SplitNode { node, into } => {
424 if let Some(n) = self.nodes.get_mut(&node) {
425 if !n.is_terminal() {
426 n.status = NodeStatus::Abandoned;
427 self.ready_set.remove(&node);
428 self.emit(GraphEvent::NodeAbandoned {
429 id: node,
430 ts: now_ms(),
431 })
432 .await;
433 }
434 }
435 for spec in into {
436 let id = spec.id.clone();
437 self.add_node_internal(spec.clone());
438 self.emit(GraphEvent::NodeAdded {
439 id,
440 spec,
441 ts: now_ms(),
442 })
443 .await;
444 }
445 }
446
447 GraphMutation::MarkSpeculative { nodes } => {
448 for id in nodes {
449 if let Some(node) = self.nodes.get_mut(&id) {
450 if node.status == NodeStatus::Pending {
451 node.status = NodeStatus::Speculative;
452 self.ready_set.remove(&id);
453 }
454 }
455 }
456 }
457
458 GraphMutation::ConfirmSpeculative { nodes } => {
459 for id in nodes {
460 if let Some(node) = self.nodes.get_mut(&id) {
461 if node.status == NodeStatus::Speculative {
462 node.status = NodeStatus::Pending;
463 }
464 }
465 }
466 self.recompute_ready_set();
467 }
468
469 GraphMutation::DiscardSpeculative { nodes } => {
470 for id in nodes {
471 if let Some(node) = self.nodes.get_mut(&id) {
472 if node.status == NodeStatus::Speculative {
473 node.status = NodeStatus::Abandoned;
474 self.ready_set.remove(&id);
475 self.emit(GraphEvent::NodeAbandoned {
476 id,
477 ts: now_ms(),
478 })
479 .await;
480 }
481 }
482 }
483 }
484 }
485 }
486
487 fn collect_subgraph(&self, root: &NodeId) -> Vec<NodeId> {
489 let mut result = Vec::new();
490 let Some(&root_idx) = self.index_map.get(root) else {
491 return result;
492 };
493 let mut stack = vec![root_idx];
494 let mut visited = HashSet::new();
495 while let Some(idx) = stack.pop() {
496 if !visited.insert(idx) {
497 continue;
498 }
499 if let Some(id) = self.graph.node_weight(idx) {
500 result.push(id.clone());
501 }
502 for neighbor in self.graph.neighbors(idx) {
503 stack.push(neighbor);
504 }
505 }
506 result
507 }
508
509 fn build_current_snapshot(&self) -> GraphSnapshot {
510 let edges: Vec<(NodeId, NodeId)> = self
511 .graph
512 .edge_indices()
513 .filter_map(|e| {
514 self.graph.edge_endpoints(e).and_then(|(fi, ti)| {
515 let from = self.graph.node_weight(fi)?.clone();
516 let to = self.graph.node_weight(ti)?.clone();
517 Some((from, to))
518 })
519 })
520 .collect();
521
522 build_snapshot(
523 self.config.run_id.clone(),
524 &self.nodes,
525 edges,
526 self.event_log.len() as u64,
527 )
528 }
529
530 fn is_run_complete(&self) -> bool {
531 !self.nodes.is_empty() && self.nodes.values().all(|n| n.is_terminal())
532 }
533
534 async fn emit(&mut self, event: GraphEvent) {
535 self.event_log.append(event.clone());
536 let _ = self.event_tx.try_send(event);
538 }
539}
540
541#[cfg(test)]
544mod tests {
545 use super::*;
546 use devsper_core::{GraphMutation, NodeSpec};
547
548 fn make_config() -> GraphConfig {
549 GraphConfig {
550 run_id: RunId::new(),
551 snapshot_interval: 100,
552 max_depth: 10,
553 }
554 }
555
556 #[tokio::test]
557 async fn single_task_ready_and_completes() {
558 let (mut actor, handle, _rx) = GraphActor::new(make_config());
559 let spec = NodeSpec::new("test task");
560 let node_id = spec.id.clone();
561 actor.add_initial_nodes(vec![spec]);
562 tokio::spawn(actor.run());
563
564 let ready = handle.get_ready().await;
565 assert!(ready.contains(&node_id));
566
567 assert!(handle.claim(node_id.clone()).await);
568
569 let ready2 = handle.get_ready().await;
571 assert!(!ready2.contains(&node_id));
572
573 handle.complete(node_id, serde_json::json!({"ok": true})).await;
574 handle.shutdown().await;
575 }
576
577 #[tokio::test]
578 async fn dependency_ordering_respected() {
579 let (mut actor, handle, _rx) = GraphActor::new(make_config());
580
581 let spec_a = NodeSpec::new("A");
582 let id_a = spec_a.id.clone();
583 let spec_b = NodeSpec::new("B").depends_on(vec![id_a.clone()]);
584 let id_b = spec_b.id.clone();
585
586 actor.add_initial_nodes(vec![spec_a, spec_b]);
587 tokio::spawn(actor.run());
588
589 let ready = handle.get_ready().await;
591 assert!(ready.contains(&id_a), "A should be ready");
592 assert!(!ready.contains(&id_b), "B should not be ready yet");
593
594 handle.claim(id_a.clone()).await;
595 handle.complete(id_a, serde_json::json!(null)).await;
596
597 tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
598
599 let ready2 = handle.get_ready().await;
600 assert!(ready2.contains(&id_b), "B should be ready after A completes");
601
602 handle.shutdown().await;
603 }
604
605 #[tokio::test]
606 async fn cycle_mutation_rejected() {
607 let (mut actor, handle, _rx) = GraphActor::new(make_config());
608
609 let spec_a = NodeSpec::new("A");
610 let id_a = spec_a.id.clone();
611 let spec_b = NodeSpec::new("B").depends_on(vec![id_a.clone()]);
612 let id_b = spec_b.id.clone();
613
614 actor.add_initial_nodes(vec![spec_a, spec_b]);
615 tokio::spawn(actor.run());
616
617 let result = handle
619 .mutate(GraphMutation::AddEdge {
620 from: id_b.clone(),
621 to: id_a.clone(),
622 })
623 .await;
624 assert!(result.is_err(), "Cycle should be rejected: {result:?}");
625
626 handle.shutdown().await;
627 }
628
629 #[tokio::test]
630 async fn inject_node_mutation_makes_it_ready() {
631 let (actor, handle, _rx) = GraphActor::new(make_config());
632 tokio::spawn(actor.run());
633
634 let new_spec = NodeSpec::new("injected");
635 let new_id = new_spec.id.clone();
636
637 handle
638 .mutate(GraphMutation::AddNode { spec: new_spec })
639 .await
640 .unwrap();
641
642 tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
643
644 let ready = handle.get_ready().await;
645 assert!(ready.contains(&new_id), "Injected node should be ready");
646
647 handle.shutdown().await;
648 }
649
650 #[tokio::test]
651 async fn speculative_lifecycle() {
652 let (mut actor, handle, _rx) = GraphActor::new(make_config());
653 let spec = NodeSpec::new("speculative");
654 let id = spec.id.clone();
655 actor.add_initial_nodes(vec![spec]);
656 tokio::spawn(actor.run());
657
658 handle
660 .mutate(GraphMutation::MarkSpeculative {
661 nodes: vec![id.clone()],
662 })
663 .await
664 .unwrap();
665 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
666 assert!(
667 !handle.get_ready().await.contains(&id),
668 "Speculative should not be ready"
669 );
670
671 handle
673 .mutate(GraphMutation::ConfirmSpeculative {
674 nodes: vec![id.clone()],
675 })
676 .await
677 .unwrap();
678 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
679 assert!(
680 handle.get_ready().await.contains(&id),
681 "Confirmed speculative should be ready"
682 );
683
684 handle.shutdown().await;
685 }
686
687 #[tokio::test]
688 async fn snapshot_contains_seeded_nodes() {
689 let (mut actor, handle, _rx) = GraphActor::new(make_config());
690 let spec = NodeSpec::new("seed");
691 let id = spec.id.clone();
692 actor.add_initial_nodes(vec![spec]);
693 tokio::spawn(actor.run());
694
695 let snap = handle.snapshot().await.unwrap();
696 assert!(snap.nodes.contains_key(&id));
697
698 handle.shutdown().await;
699 }
700}