1use crate::time::Instant;
7use crate::{
8 JunctureError, Node, State,
9 checkpoint::{
10 Checkpoint, CheckpointMetadata, CheckpointSource, DeltaCounters, generate_checkpoint_id,
11 },
12 edge::TriggerTable,
13 interrupt::should_interrupt,
14 pregel::{
15 budget::BudgetTracker,
16 context::ExecutionContext,
17 durability::Durability,
18 runner::execute_superstep,
19 scheduler::{
20 FieldVersionTracker, VersionsSeen, apply_writes, compute_next_tasks,
21 schedule_error_handlers,
22 },
23 types::{BubbleUp, LoopStatus, PendingTask, SuperstepResult},
24 },
25 state::FieldsChanged,
26 stream::{DebugEvent, StreamEvent},
27};
28use indexmap::IndexMap;
29use std::collections::{HashMap, HashSet};
30use std::sync::Arc;
31use std::sync::atomic::{AtomicBool, Ordering};
32use tokio::sync::mpsc;
33use tokio_util::sync::CancellationToken;
34
/// Cloneable handle used to ask a running loop to drain.
///
/// Every clone shares the same atomic flag, so a request made through one
/// clone is observed through all of them.
#[derive(Clone, Debug)]
pub struct RunControl {
    /// `true` once a drain has been requested. This type never clears it.
    drain_requested: Arc<AtomicBool>,
}

impl RunControl {
    /// Creates a handle whose drain flag starts out unset.
    #[must_use]
    pub fn new() -> Self {
        let flag = AtomicBool::new(false);
        Self {
            drain_requested: Arc::new(flag),
        }
    }

    /// Raises the drain flag for this handle and all of its clones.
    pub fn request_drain(&self) {
        let flag: &AtomicBool = &self.drain_requested;
        flag.store(true, Ordering::Release);
    }

    /// Reports whether [`RunControl::request_drain`] has been called on this
    /// handle or on any clone of it.
    #[must_use]
    pub fn is_drain_requested(&self) -> bool {
        let flag: &AtomicBool = &self.drain_requested;
        flag.load(Ordering::Acquire)
    }
}

impl Default for RunControl {
    /// Same as [`RunControl::new`].
    fn default() -> Self {
        Self::new()
    }
}
120
121pub struct PregelLoop<S: State> {
126 pub state: S,
128
129 pub nodes: IndexMap<String, Arc<dyn Node<S>>>,
131
132 pub trigger_table: TriggerTable<S>,
134
135 pub field_versions: FieldVersionTracker,
137
138 pub versions_seen: VersionsSeen,
140
141 pub runnable_config: crate::config::RunnableConfig,
143
144 pub cancellation_token: CancellationToken,
146
147 pub stream_tx: Option<mpsc::UnboundedSender<StreamEvent<S>>>,
149
150 pub checkpointer: Option<Arc<dyn crate::checkpoint::CheckpointSaver>>,
152
153 pub step: usize,
155
156 pub status: LoopStatus,
158
159 pub pending_tasks: Vec<PendingTask<S>>,
161
162 previous_superstep_changed_fields: FieldsChanged,
168
169 budget_tracker: Option<Arc<BudgetTracker>>,
171
172 run_control: RunControl,
174
175 run_id: String,
177
178 interrupt_rx: Option<mpsc::UnboundedReceiver<crate::interrupt::InterruptSignal>>,
181
182 pending_interrupts: Vec<crate::interrupt::InterruptSignal>,
184
185 scratchpad: crate::interrupt::Scratchpad,
187
188 interrupt_versions_seen: HashMap<String, u64>,
192
193 superstep_start: Option<Instant>,
197
198 error_handler_map: HashMap<String, String>,
204
205 trigger_to_nodes: crate::pregel::scheduler::TriggerToNodes,
211
212 retry_policies: HashMap<String, crate::graph::RetryPolicy>,
218
219 timeout_policies: HashMap<String, crate::pregel::context::TimeoutPolicy>,
226
227 delta_counters: HashMap<String, DeltaCounters>,
233
234 channels_finished: bool,
244}
245
246impl<S: State> std::fmt::Debug for PregelLoop<S> {
247 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
248 f.debug_struct("PregelLoop")
249 .field("state", &"<state>")
250 .field("nodes", &self.nodes.len())
251 .field("trigger_table", &self.trigger_table)
252 .field("field_versions", &self.field_versions)
253 .field("versions_seen", &self.versions_seen)
254 .field("runnable_config", &self.runnable_config)
255 .field("cancellation_token", &self.cancellation_token)
256 .field("stream_tx", &self.stream_tx.is_some())
257 .field("checkpointer", &self.checkpointer.is_some())
258 .field("step", &self.step)
259 .field("status", &self.status)
260 .field("pending_tasks", &self.pending_tasks)
261 .field(
262 "previous_superstep_changed_fields",
263 &self.previous_superstep_changed_fields,
264 )
265 .field("budget_tracker", &self.budget_tracker.is_some())
266 .field("run_control", &self.run_control)
267 .field("run_id", &self.run_id)
268 .field("interrupt_rx", &self.interrupt_rx.is_some())
269 .field("pending_interrupts", &self.pending_interrupts.len())
270 .field("scratchpad", &self.scratchpad)
271 .field("interrupt_versions_seen", &self.interrupt_versions_seen)
272 .field("superstep_start", &self.superstep_start.is_some())
273 .field("error_handler_map", &self.error_handler_map.len())
274 .field("trigger_to_nodes", &"<cached>")
275 .field(
276 "retry_policies",
277 &self.retry_policies.keys().collect::<Vec<_>>(),
278 )
279 .field(
280 "timeout_policies",
281 &self.timeout_policies.keys().collect::<Vec<_>>(),
282 )
283 .field("delta_counters", &self.delta_counters.len())
284 .field("channels_finished", &self.channels_finished)
285 .finish()
286 }
287}
288
289impl<S: State> PregelLoop<S> {
290 pub fn new(
319 state: S,
320 nodes: IndexMap<String, Arc<dyn Node<S>>>,
321 trigger_table: TriggerTable<S>,
322 config: crate::config::RunnableConfig,
323 num_fields: usize,
324 ) -> Result<Self, JunctureError> {
325 Self::with_error_handlers(
326 state,
327 nodes,
328 trigger_table,
329 config,
330 num_fields,
331 HashMap::new(),
332 )
333 }
334
335 pub fn with_error_handlers(
355 state: S,
356 nodes: IndexMap<String, Arc<dyn Node<S>>>,
357 trigger_table: TriggerTable<S>,
358 config: crate::config::RunnableConfig,
359 num_fields: usize,
360 error_handler_map: HashMap<String, String>,
361 ) -> Result<Self, JunctureError> {
362 let node_names: Vec<String> = nodes.keys().cloned().collect();
363 let field_versions = FieldVersionTracker::new(num_fields);
364 let versions_seen = VersionsSeen::new(&node_names, num_fields);
365 let cancellation_token = CancellationToken::new();
366
367 let pending_tasks = Self::compute_initial_tasks(&trigger_table);
369
370 let trigger_to_nodes =
372 crate::pregel::scheduler::TriggerToNodes::from_trigger_table(&trigger_table);
373
374 let run_id = uuid::Uuid::new_v4().to_string();
376
377 Ok(Self {
378 state,
379 nodes,
380 trigger_table,
381 field_versions,
382 versions_seen,
383 runnable_config: config,
384 cancellation_token,
385 stream_tx: None,
386 checkpointer: None,
387 step: 0,
388 status: LoopStatus::Running,
389 pending_tasks,
390 previous_superstep_changed_fields: FieldsChanged(0),
391 budget_tracker: None,
392 run_control: RunControl::new(),
393 run_id,
394 interrupt_rx: None,
395 pending_interrupts: Vec::new(),
396 scratchpad: crate::interrupt::Scratchpad::new(),
397 interrupt_versions_seen: HashMap::new(),
398 superstep_start: None,
399 error_handler_map,
400 trigger_to_nodes,
401 retry_policies: HashMap::new(),
402 timeout_policies: HashMap::new(),
403 delta_counters: HashMap::new(),
404 channels_finished: false,
405 })
406 }
407
408 pub fn set_stream_sender(&mut self, tx: mpsc::UnboundedSender<StreamEvent<S>>) {
419 self.stream_tx = Some(tx);
420 }
421
422 pub fn set_checkpointer(&mut self, saver: Arc<dyn crate::checkpoint::CheckpointSaver>) {
424 self.checkpointer = Some(saver);
425 }
426
427 pub fn set_budget_tracker(&mut self, tracker: BudgetTracker) {
443 let shared = Arc::new(tracker);
444 self.runnable_config.budget_tracker = Some(Arc::clone(&shared));
445 self.budget_tracker = Some(shared);
446 }
447
448 pub fn set_retry_policies(&mut self, policies: HashMap<String, crate::graph::RetryPolicy>) {
455 self.retry_policies = policies;
456 }
457
458 pub fn set_timeout_policies(
465 &mut self,
466 policies: HashMap<String, crate::pregel::context::TimeoutPolicy>,
467 ) {
468 self.timeout_policies = policies;
469 }
470
471 fn compute_initial_tasks(trigger_table: &TriggerTable<S>) -> Vec<PendingTask<S>> {
473 let mut initial_tasks = Vec::new();
475
476 for (node_name, sources) in &trigger_table.incoming {
477 for source in sources {
478 if let crate::edge::TriggerSource::Edge { from } = source
479 && from == crate::edge::START
480 {
481 initial_tasks.push(PendingTask::pull(
482 uuid::Uuid::new_v4().to_string(),
483 node_name.clone(),
484 ));
485 break;
486 }
487 }
488 }
489
490 initial_tasks
491 }
492
493 #[allow(
513 clippy::too_many_lines,
514 reason = "Function contains multiple termination path checks (recursion limit, cancellation, budget, drain, interrupt) with finish_all_channels() calls on each path. Refactoring would reduce clarity by splitting related checks."
515 )]
516 pub fn tick(&mut self) -> Result<bool, JunctureError> {
517 let span = tracing::info_span!(
519 "juncture.graph.invoke",
520 "juncture.thread.id" = ?std::thread::current().id(),
521 "juncture.step" = self.step,
522 "juncture.recursion.limit" = self.runnable_config.recursion_limit,
523 "juncture.graph.name" = ?self.runnable_config.graph_name,
524 "juncture.run.id" = %self.run_id,
525 );
526 let _enter = span.enter();
527
528 if self.step >= self.runnable_config.recursion_limit {
530 self.status = LoopStatus::OutOfSteps;
531 self.emit_counter("juncture.graph.errors", 1);
532 let result: Result<(), JunctureError> = Err(JunctureError::recursion_limit(
533 self.step,
534 self.runnable_config.recursion_limit,
535 ));
536 self.on_graph_end(&result);
537 self.finish_all_channels();
539 let Err(err) = result else {
542 unreachable!("result was constructed as Err");
543 };
544 return Err(err);
545 }
546
547 if self.cancellation_token.is_cancelled() {
549 self.status = LoopStatus::Cancelled;
550 self.on_graph_end(&Ok(()));
551 self.finish_all_channels();
553 return Ok(false);
554 }
555
556 if let Some(tracker) = &self.budget_tracker
558 && let Some(reason) = tracker.check()
559 {
560 self.status = LoopStatus::BudgetExceeded;
561 self.emit_counter("juncture.graph.errors", 1);
562 let result: Result<(), JunctureError> = Err(JunctureError::execution(format!(
563 "Budget exceeded: {reason}"
564 )));
565 self.on_graph_end(&result);
566 self.finish_all_channels();
568 let Err(err) = result else {
569 unreachable!("result was constructed as Err");
570 };
571 return Err(err);
572 }
573
574 if let Some(ref tracker) = self.budget_tracker {
576 let usage = tracker.current_usage();
577 if let Some(ref budget) = self.runnable_config.budget {
578 if let Some(max_tokens) = budget.max_tokens {
579 self.emit_gauge(
580 "juncture.budget.remaining_tokens",
581 max_tokens.saturating_sub(usage.tokens_used),
582 );
583 }
584 if let Some(max_cost) = budget.max_cost_usd {
585 #[allow(
586 clippy::cast_possible_truncation,
587 clippy::cast_sign_loss,
588 reason = "Gauge values are u64; cost is converted to micro-units (6 decimal places) for precision. Truncation is acceptable for gauge display."
589 )]
590 let remaining_micro_usd =
591 ((max_cost - usage.cost_usd).max(0.0) * 1_000_000.0) as u64;
592 self.emit_gauge("juncture.budget.remaining_cost_usd", remaining_micro_usd);
593 }
594 }
595 }
596
597 if self.pending_tasks.is_empty() {
599 if self.run_control.is_drain_requested() {
601 self.finish_all_channels();
604 self.status = LoopStatus::Done;
605 self.on_graph_end(&Ok(()));
606 return Ok(false);
607 }
608
609 self.finish_all_channels();
614 self.status = LoopStatus::Done;
615 self.on_graph_end(&Ok(()));
616 return Ok(false);
617 }
618
619 if let Some(ref interrupt_before_nodes) = self.runnable_config.interrupt_before {
621 let interrupt_before_set: HashSet<String> =
622 interrupt_before_nodes.iter().cloned().collect();
623
624 let channel_versions: HashMap<String, u64> = self
626 .field_versions
627 .versions()
628 .iter()
629 .enumerate()
630 .map(|(idx, ver)| (format!("field_{idx}"), *ver))
631 .collect();
632
633 if let Some(signals) = should_interrupt(
634 &self.pending_tasks,
635 &interrupt_before_set,
636 &HashSet::new(), &channel_versions,
638 &self.interrupt_versions_seen,
639 ) {
640 self.interrupt_versions_seen = channel_versions;
641 self.pending_interrupts.clone_from(&signals);
642 self.status = LoopStatus::InterruptBefore(signals);
643 self.finish_all_channels();
645 return Ok(false);
646 }
647 }
648
649 Ok(true)
650 }
651
652 fn resolve_state_json(tasks: &mut [PendingTask<S>]) -> Result<(), JunctureError>
657 where
658 S: serde::de::DeserializeOwned,
659 {
660 for task in tasks {
661 if task.state_override.is_none()
662 && let Some(ref json) = task.state_json
663 {
664 let deserialized = serde_json::from_value::<S>(json.clone()).map_err(|e| {
665 JunctureError::execution(format!(
666 "failed to deserialize state_json for task '{}': {e}",
667 task.node_name
668 ))
669 })?;
670 task.state_override = Some(deserialized);
671 }
672 }
673 Ok(())
674 }
675
676 pub async fn execute_superstep(&mut self) -> Result<SuperstepResult<S>, JunctureError>
693 where
694 S: serde::de::DeserializeOwned,
695 S::Update: serde::Serialize,
696 {
697 Self::resolve_state_json(&mut self.pending_tasks)?;
701
702 let arc_state: Arc<S> = Arc::new(std::mem::take(&mut self.state));
708 let node_names: Vec<_> = self
709 .pending_tasks
710 .iter()
711 .map(|t| t.node_name.as_str())
712 .collect();
713 let span = tracing::info_span!(
714 "juncture.superstep",
715 step = self.step,
716 num_tasks = self.pending_tasks.len(),
717 "juncture.step.nodes" = ?node_names,
718 "juncture.step.duration_ms" = tracing::field::Empty,
719 );
720 let _enter = span.enter();
721
722 let start = Instant::now();
724 self.superstep_start = Some(start);
725
726 if let Some(ref tx) = self.stream_tx {
728 let _ = tx.send(StreamEvent::Debug(DebugEvent::SuperstepStart {
729 step: self.step,
730 pending_nodes: node_names
731 .iter()
732 .copied()
733 .map(std::string::ToString::to_string)
734 .collect(),
735 }));
736 }
737
738 let num_tasks = u64::try_from(self.pending_tasks.len()).unwrap_or(u64::MAX);
740 self.emit_counter("juncture.superstep.tasks", num_tasks);
741
742 let (result, interrupt_rx) = execute_superstep(
743 &self.pending_tasks,
744 &arc_state,
745 &self.nodes,
746 &self.runnable_config,
747 &self.cancellation_token,
748 self.checkpointer.as_ref(),
749 &self.pending_interrupts,
750 &self.scratchpad,
751 &self.error_handler_map,
752 &self.retry_policies,
753 &self.timeout_policies,
754 self.step,
755 )
756 .await?;
757
758 self.state = match Arc::try_unwrap(arc_state) {
761 Ok(state) => state,
762 Err(arc) => {
763 tracing::warn!(
764 name: "juncture.state.arc_leak",
765 step = self.step,
766 "Arc refcount > 1 after superstep, falling back to clone"
767 );
768 S::clone(&*arc)
769 }
770 };
771
772 for signal in &self.pending_interrupts {
779 if let Some(ref id) = signal.id {
780 self.scratchpad.mark_interrupt_processed(id);
781 }
782 }
783
784 let duration = start.elapsed().as_millis();
785 tracing::Span::current().record("juncture.step.duration_ms", duration);
786
787 let duration_ms = u64::try_from(duration).unwrap_or(u64::MAX);
791 #[allow(
792 clippy::cast_precision_loss,
793 reason = "millisecond durations fit well within f64 precision for histogram recording"
794 )]
795 let duration_f64 = duration_ms as f64;
796 self.emit_histogram("juncture.superstep.duration_ms", duration_f64);
797
798 tracing::debug!(
800 name: "juncture.superstep.duration_ms",
801 step = self.step,
802 duration_ms = duration,
803 );
804
805 self.interrupt_rx = Some(interrupt_rx);
808
809 Ok(result)
810 }
811
812 #[expect(
829 clippy::too_many_lines,
830 reason = "after_tick orchestrates multiple sequential phases: apply writes, bump versions, consume channels, emit events including stream_data, compute tasks, drain interrupts, check interrupts, finish channels, increment step"
831 )]
832 #[allow(
833 clippy::cognitive_complexity,
834 reason = "after_tick orchestrates multiple sequential phases: apply writes, bump versions, consume channels, emit events including stream_data, compute tasks, drain interrupts, check interrupts, finish channels, increment step"
835 )]
836 pub async fn after_tick(&mut self, result: SuperstepResult<S>) -> Result<(), JunctureError>
837 where
838 S: Clone + serde::Serialize,
839 {
840 let versions_before_apply = self.field_versions.versions().to_vec();
846
847 let total_changed = apply_writes(
853 &mut self.state,
854 &result.task_outputs,
855 &mut self.field_versions,
856 )?;
857
858 self.update_delta_counters(&total_changed);
863
864 let fields_to_consume = self.previous_superstep_changed_fields.clone();
875 self.consume_triggered_channels(&fields_to_consume);
876
877 for task_output in &result.task_outputs {
881 self.versions_seen
882 .mark_consumed(&task_output.node_name, &versions_before_apply);
883 }
884
885 self.state.reset_ephemeral();
887
888 if let Some(ref tx) = self.stream_tx {
890 for task_output in &result.task_outputs {
891 let start_event = StreamEvent::TaskStart {
893 node: task_output.node_name.clone(),
894 task_id: task_output.task_id.clone(),
895 step: self.step,
896 };
897 let _ = tx.send(start_event);
898
899 let end_event = StreamEvent::TaskEnd {
901 node: task_output.node_name.clone(),
902 task_id: task_output.task_id.clone(),
903 step: self.step,
904 duration_ms: u64::try_from(task_output.duration.as_millis())
905 .expect("duration should fit in u64"),
906 };
907 let _ = tx.send(end_event);
908
909 for data in &task_output.command.stream_data {
913 let custom_event = StreamEvent::Custom {
914 node: task_output.node_name.clone(),
915 data: data.clone(),
916 ns: Vec::new(),
917 };
918 let _ = tx.send(custom_event);
919 }
920
921 if let Some(ref update) = task_output.command.update {
923 let updates_event = StreamEvent::Updates {
924 node: task_output.node_name.clone(),
925 update: update.clone(),
926 step: self.step,
927 };
928 let _ = tx.send(updates_event);
929 }
930 }
931
932 let values_event = StreamEvent::Values {
934 state: self.state.clone(),
935 step: self.step,
936 };
937 let _ = tx.send(values_event);
938
939 if let Some(superstep_start) = self.superstep_start {
941 let duration_ms =
942 u64::try_from(superstep_start.elapsed().as_millis()).unwrap_or(u64::MAX);
943 let end_event = StreamEvent::Debug(DebugEvent::SuperstepEnd {
944 step: self.step,
945 duration_ms,
946 });
947 let _ = tx.send(end_event);
948 }
949 }
950
951 self.pending_tasks = compute_next_tasks(
953 &result.task_outputs,
954 &self.trigger_table,
955 &self.trigger_to_nodes,
956 &self.state,
957 )
958 .await?;
959
960 let recovery_tasks =
964 schedule_error_handlers(&result.task_outputs, &self.nodes, &self.error_handler_map);
965 if !recovery_tasks.is_empty() {
966 tracing::debug!(
967 name: "juncture.error_handler.recovery_tasks",
968 step = self.step,
969 count = recovery_tasks.len(),
970 "Scheduling error handler recovery tasks"
971 );
972 self.pending_tasks.extend(recovery_tasks);
973 }
974
975 if let Some(ref tx) = self.stream_tx {
977 let next_node_names: Vec<String> = self
978 .pending_tasks
979 .iter()
980 .map(|t| t.node_name.clone())
981 .collect();
982 for node_name in &next_node_names {
984 let edge_event = StreamEvent::Debug(DebugEvent::EdgeTraversed {
985 from: "superstep".to_string(),
986 to: node_name.clone(),
987 edge_type: "conditional".to_string(),
988 });
989 let _ = tx.send(edge_event);
990 }
991 }
992
993 self.previous_superstep_changed_fields = total_changed;
997
998 self.save_superstep_checkpoint().await;
1003
1004 let mut node_interrupts = Vec::new();
1007 if let Some(mut rx) = self.interrupt_rx.take() {
1008 while let Ok(signal) = rx.try_recv() {
1010 node_interrupts.push(signal);
1011 }
1012 }
1013
1014 if !node_interrupts.is_empty() {
1016 self.pending_interrupts.clone_from(&node_interrupts);
1017 self.status = LoopStatus::InterruptAfter(node_interrupts.clone());
1018
1019 self.emit_interrupt_events(&node_interrupts);
1021
1022 let node = self.interrupt_node_name().to_string();
1024 self.save_interrupt_checkpoint(&node).await;
1025
1026 self.finish_all_channels();
1028 return Ok(());
1029 }
1030
1031 if result.has_bubble_ups() && self.handle_bubble_ups(&result.bubble_ups) {
1033 if self.status.is_interrupted() {
1036 let node = self.interrupt_node_name().to_string();
1037 self.save_interrupt_checkpoint(&node).await;
1038 }
1039 self.finish_all_channels();
1041 return Ok(());
1042 }
1043
1044 if let Some(ref interrupt_after_nodes) = self.runnable_config.interrupt_after {
1046 let interrupt_after_set: HashSet<String> =
1047 interrupt_after_nodes.iter().cloned().collect();
1048
1049 let channel_versions: HashMap<String, u64> = self
1051 .field_versions
1052 .versions()
1053 .iter()
1054 .enumerate()
1055 .map(|(idx, ver)| (format!("field_{idx}"), *ver))
1056 .collect();
1057
1058 if let Some(signals) = should_interrupt(
1059 &self.pending_tasks,
1060 &HashSet::new(), &interrupt_after_set,
1062 &channel_versions,
1063 &self.interrupt_versions_seen,
1064 ) {
1065 self.interrupt_versions_seen = channel_versions;
1066 self.pending_interrupts.clone_from(&signals);
1067 self.status = LoopStatus::InterruptAfter(signals.clone());
1068
1069 self.emit_interrupt_events(&signals);
1071
1072 let node = self.interrupt_node_name().to_string();
1074 self.save_interrupt_checkpoint(&node).await;
1075
1076 self.finish_all_channels();
1078 return Ok(());
1079 }
1080 }
1081
1082 if self.pending_tasks.is_empty() {
1086 self.finish_all_channels();
1087 if self.effective_durability() == Durability::Exit {
1090 self.save_exit_checkpoint().await;
1091 }
1092 }
1093
1094 self.step += 1;
1096
1097 if let Some(ref tracker) = self.budget_tracker {
1099 tracker.report_step();
1100 }
1101
1102 Ok(())
1103 }
1104
1105 fn handle_bubble_ups(&mut self, bubble_ups: &[BubbleUp<S>]) -> bool {
1113 let mut should_stop = false;
1114
1115 for bubble_up in bubble_ups {
1116 match bubble_up {
1117 BubbleUp::Interrupt(graph_interrupt) => {
1118 self.handle_bubble_up_interrupt(graph_interrupt);
1119 should_stop = true;
1120 }
1121 BubbleUp::Drained(drained) => {
1122 self.handle_bubble_up_drained(drained);
1123 should_stop = true;
1124 }
1125 BubbleUp::ParentCommand(cmd) => {
1126 self.handle_bubble_up_parent_command(cmd);
1127 }
1128 }
1129 }
1130
1131 should_stop
1132 }
1133
1134 fn handle_bubble_up_interrupt(
1136 &mut self,
1137 graph_interrupt: &crate::pregel::types::GraphInterrupt,
1138 ) {
1139 tracing::debug!(
1140 step = self.step,
1141 num_signals = graph_interrupt.interrupts.len(),
1142 interrupt_step = graph_interrupt.step,
1143 namespace = ?graph_interrupt.namespace,
1144 "Subgraph interrupt bubbling up to parent"
1145 );
1146
1147 self.pending_interrupts
1148 .clone_from(&graph_interrupt.interrupts);
1149 self.status = LoopStatus::InterruptAfter(graph_interrupt.interrupts.clone());
1150
1151 self.emit_interrupt_events_with_namespace(
1155 &graph_interrupt.interrupts,
1156 &graph_interrupt.namespace,
1157 );
1158 }
1159
1160 fn handle_bubble_up_drained(&mut self, drained: &crate::pregel::types::GraphDrained) {
1162 tracing::debug!(
1163 step = self.step,
1164 reason = %drained.reason,
1165 "Subgraph drained bubbling up to parent"
1166 );
1167
1168 self.status = LoopStatus::Drained;
1169 }
1170
1171 fn handle_bubble_up_parent_command(&mut self, parent_cmd: &crate::command::ParentCommand<S>) {
1173 tracing::debug!(
1174 step = self.step,
1175 source_node = %parent_cmd.source_node,
1176 namespace = %parent_cmd.namespace,
1177 goto = ?parent_cmd.command.goto,
1178 "Subgraph parent command bubbling up"
1179 );
1180
1181 if let Some(ref update) = parent_cmd.command.update {
1182 let changed = self.state.try_apply(update.clone());
1183 match changed {
1184 Ok(changed) => self.field_versions.bump_all(&changed),
1185 Err(err) => {
1186 tracing::warn!(
1187 name: "juncture.subgraph.parent_command.apply_failed",
1188 step = self.step,
1189 source_node = %parent_cmd.source_node,
1190 namespace = %parent_cmd.namespace,
1191 error = %err,
1192 "Failed to apply parent command from subgraph"
1193 );
1194 }
1195 }
1196 }
1197 }
1198
1199 #[must_use]
1207 pub fn into_state(self) -> S {
1208 self.state
1209 }
1210
1211 #[must_use]
1213 pub const fn step(&self) -> usize {
1214 self.step
1215 }
1216
1217 #[must_use]
1219 pub fn run_id(&self) -> &str {
1220 &self.run_id
1221 }
1222
1223 #[must_use]
1225 pub const fn status(&self) -> &LoopStatus {
1226 &self.status
1227 }
1228
1229 #[must_use]
1231 pub fn pending_interrupts(&self) -> &[crate::interrupt::InterruptSignal] {
1232 &self.pending_interrupts
1233 }
1234
1235 #[must_use]
1237 pub const fn scratchpad(&self) -> &crate::interrupt::Scratchpad {
1238 &self.scratchpad
1239 }
1240
1241 pub const fn scratchpad_mut(&mut self) -> &mut crate::interrupt::Scratchpad {
1243 &mut self.scratchpad
1244 }
1245
1246 #[must_use]
1248 pub const fn is_running(&self) -> bool {
1249 matches!(self.status, LoopStatus::Running)
1250 }
1251
1252 #[must_use]
1257 pub fn snapshot_state(&self) -> S
1258 where
1259 S: Clone,
1260 {
1261 self.state.clone()
1262 }
1263
1264 #[must_use]
1283 pub const fn run_control(&self) -> &RunControl {
1284 &self.run_control
1285 }
1286
1287 #[must_use]
1307 #[allow(
1308 clippy::clone_on_copy,
1309 reason = "ExecutionContext requires owned state, not reference"
1310 )]
1311 pub fn as_context(&self) -> ExecutionContext<S>
1312 where
1313 S: Clone,
1314 {
1315 ExecutionContext {
1316 state: self.state.clone(),
1317 field_versions: self.field_versions.clone(),
1318 versions_seen: self.versions_seen.clone(),
1319 pending_writes: vec![],
1320 }
1321 }
1322
1323 #[must_use]
1343 pub fn as_config(&self) -> crate::pregel::context::ExecutionConfig {
1344 crate::pregel::context::ExecutionConfig {
1345 recursion_limit: self.runnable_config.recursion_limit,
1346 interrupt_before: self
1347 .runnable_config
1348 .interrupt_before
1349 .as_ref()
1350 .map_or_else(HashSet::new, |v| v.iter().cloned().collect()),
1351 interrupt_after: self
1352 .runnable_config
1353 .interrupt_after
1354 .as_ref()
1355 .map_or_else(HashSet::new, |v| v.iter().cloned().collect()),
1356 budget: self.runnable_config.budget.clone(),
1357 durability: self.runnable_config.durability.clone().unwrap_or_default(),
1358 retry_policies: std::collections::HashMap::new(),
1359 timeout_policies: std::collections::HashMap::new(),
1360 }
1361 }
1362
1363 #[allow(
1375 clippy::cognitive_complexity,
1376 clippy::too_many_lines,
1377 reason = "durability match arms and checkpoint construction logic are necessarily complex for handling Sync/Async/Exit modes"
1378 )]
1379 async fn save_interrupt_checkpoint(&mut self, node: &str)
1380 where
1381 S: serde::Serialize,
1382 {
1383 let Some(ref checkpointer) = self.checkpointer else {
1384 return;
1385 };
1386
1387 let channel_values = match serde_json::to_value(&self.state) {
1388 Ok(v) => v,
1389 Err(err) => {
1390 tracing::warn!(
1391 name: "juncture.checkpoint.interrupt.serialize_failed",
1392 node = node,
1393 error = %err,
1394 "Failed to serialize state for interrupt checkpoint"
1395 );
1396 return;
1397 }
1398 };
1399
1400 let (channel_versions, new_versions, versions_seen) = self.build_checkpoint_versions();
1401
1402 let checkpoint_id = generate_checkpoint_id();
1403 let cp_id_for_event = checkpoint_id.clone();
1404 let created_at = chrono::Utc::now().to_rfc3339();
1405
1406 let checkpoint = Checkpoint {
1407 id: checkpoint_id,
1408 channel_values,
1409 channel_versions,
1410 versions_seen,
1411 pending_tasks: Vec::new(),
1412 pending_sends: Vec::new(),
1413 pending_interrupts: self.pending_interrupts.clone(),
1414 schema_version: S::schema_version(),
1415 created_at,
1416 v: 1,
1417 new_versions,
1418 counters_since_delta_snapshot: self.build_checkpoint_delta_counters(),
1419 };
1420
1421 let metadata = CheckpointMetadata {
1422 source: CheckpointSource::Interrupt {
1423 node: node.to_string(),
1424 },
1425 step: i64::try_from(self.step).unwrap_or(i64::MAX),
1426 writes: HashMap::new(),
1427 parents: HashMap::new(),
1428 run_id: self.run_id.clone(),
1429 };
1430
1431 let cp_config = self.runnable_config.clone();
1432 let stream_tx_clone = self.stream_tx.clone();
1433 match self.effective_durability() {
1434 Durability::Async => {
1435 let step = self.step;
1436 let node_label = node.to_string();
1437 let checkpointer_arc = Arc::clone(checkpointer);
1438 let metadata_for_event = metadata.clone();
1439 tokio::spawn(async move {
1440 match checkpointer_arc.put(&cp_config, checkpoint, metadata).await {
1441 Ok(_updated_config) => {
1442 tracing::info!(
1443 name: "juncture.checkpoint.put",
1444 checkpoint_step = step,
1445 checkpoint_source = "Interrupt",
1446 "Interrupt checkpoint persisted (async)"
1447 );
1448 if let Some(ref collector) = cp_config.metrics_collector {
1450 collector.inc_counter("juncture.checkpoint.writes", 1);
1451 }
1452 Self::emit_checkpoint_saved_event(
1454 stream_tx_clone.as_ref(),
1455 cp_id_for_event,
1456 metadata_for_event,
1457 step,
1458 );
1459 }
1460 Err(err) => {
1461 tracing::warn!(
1462 name: "juncture.checkpoint.interrupt.save_failed",
1463 node = node_label,
1464 error = %err,
1465 "Failed to save interrupt checkpoint (async)"
1466 );
1467 if let Some(ref collector) = cp_config.metrics_collector {
1469 collector.inc_counter("juncture.checkpoint.errors", 1);
1470 }
1471 }
1472 }
1473 });
1474 self.reset_delta_counters();
1475 }
1476 Durability::Sync | Durability::Exit => {
1477 let metadata_for_event = metadata.clone();
1478 match checkpointer
1479 .put(&self.runnable_config, checkpoint, metadata)
1480 .await
1481 {
1482 Ok(updated_config) => {
1483 self.runnable_config.checkpoint_id = updated_config.checkpoint_id;
1484 self.reset_delta_counters();
1485 self.emit_counter("juncture.checkpoint.writes", 1);
1487 tracing::info!(
1488 name: "juncture.checkpoint.put",
1489 checkpoint_id = %self.runnable_config.checkpoint_id.as_deref().unwrap_or("unknown"),
1490 checkpoint_step = self.step,
1491 checkpoint_source = "Interrupt",
1492 "Interrupt checkpoint persisted"
1493 );
1494 if let Some(ref cp_id) = self.runnable_config.checkpoint_id {
1495 self.on_checkpoint_saved(cp_id, self.step);
1496 Self::emit_checkpoint_saved_event(
1498 self.stream_tx.as_ref(),
1499 cp_id.clone(),
1500 metadata_for_event,
1501 self.step,
1502 );
1503 }
1504 }
1505 Err(err) => {
1506 tracing::warn!(
1507 name: "juncture.checkpoint.interrupt.save_failed",
1508 node = node,
1509 error = %err,
1510 "Failed to save interrupt checkpoint"
1511 );
1512 self.emit_counter("juncture.checkpoint.errors", 1);
1514 }
1515 }
1516 }
1517 }
1518 }
1519
1520 #[allow(
1535 clippy::cognitive_complexity,
1536 clippy::too_many_lines,
1537 reason = "durability match arms and checkpoint construction logic are necessarily complex for handling Sync/Async/Exit modes"
1538 )]
1539 async fn save_superstep_checkpoint(&mut self)
1540 where
1541 S: serde::Serialize,
1542 {
1543 let Some(ref checkpointer) = self.checkpointer else {
1544 return;
1545 };
1546
1547 if self.effective_durability() == Durability::Exit {
1551 return;
1552 }
1553
1554 let needs_full_snapshot = self.should_take_full_snapshot();
1559 tracing::debug!(
1560 name = "juncture.checkpoint.superstep.delta_decision",
1561 step = self.step,
1562 needs_full_snapshot = needs_full_snapshot,
1563 "Delta-channel snapshot frequency evaluation"
1564 );
1565
1566 if !needs_full_snapshot {
1569 tracing::debug!(
1570 name = "juncture.checkpoint.superstep.skipped",
1571 step = self.step,
1572 "Skipped full checkpoint - delta optimization active"
1573 );
1574 return;
1575 }
1576
1577 let channel_values = match serde_json::to_value(&self.state) {
1578 Ok(v) => v,
1579 Err(err) => {
1580 tracing::warn!(
1581 name: "juncture.checkpoint.superstep.serialize_failed",
1582 step = self.step,
1583 error = %err,
1584 "Failed to serialize state for superstep checkpoint"
1585 );
1586 return;
1587 }
1588 };
1589
1590 let (channel_versions, new_versions, versions_seen) = self.build_checkpoint_versions();
1591
1592 let pending_tasks: Vec<crate::checkpoint::CheckpointPendingTask> = self
1595 .pending_tasks
1596 .iter()
1597 .map(|task| crate::checkpoint::CheckpointPendingTask {
1598 id: task.id.clone(),
1599 node: task.node_name.clone(),
1600 triggers: Vec::new(),
1601 state_override: None,
1602 })
1603 .collect();
1604
1605 let checkpoint_id = generate_checkpoint_id();
1606 let cp_id_for_event = checkpoint_id.clone();
1607 let created_at = chrono::Utc::now().to_rfc3339();
1608
1609 let checkpoint = Checkpoint {
1610 id: checkpoint_id,
1611 channel_values,
1612 channel_versions,
1613 versions_seen,
1614 pending_tasks,
1615 pending_sends: Vec::new(),
1616 pending_interrupts: Vec::new(),
1617 schema_version: S::schema_version(),
1618 created_at,
1619 v: 1,
1620 new_versions,
1621 counters_since_delta_snapshot: self.build_checkpoint_delta_counters(),
1622 };
1623
1624 let metadata = CheckpointMetadata {
1625 source: CheckpointSource::Loop,
1626 step: i64::try_from(self.step).unwrap_or(i64::MAX),
1627 writes: HashMap::new(),
1628 parents: HashMap::new(),
1629 run_id: self.run_id.clone(),
1630 };
1631
1632 let cp_config = self.runnable_config.clone();
1633 let stream_tx_clone = self.stream_tx.clone();
1634 match self.effective_durability() {
1635 Durability::Async => {
1636 let step = self.step;
1637 let checkpointer_arc = Arc::clone(checkpointer);
1638 let metadata_for_event = metadata.clone();
1639 tokio::spawn(async move {
1640 match checkpointer_arc.put(&cp_config, checkpoint, metadata).await {
1641 Ok(_updated_config) => {
1642 tracing::info!(
1643 name: "juncture.checkpoint.put",
1644 checkpoint_step = step,
1645 checkpoint_source = "Loop",
1646 "Superstep checkpoint persisted (async)"
1647 );
1648 if let Some(ref collector) = cp_config.metrics_collector {
1650 collector.inc_counter("juncture.checkpoint.writes", 1);
1651 }
1652 Self::emit_checkpoint_saved_event(
1654 stream_tx_clone.as_ref(),
1655 cp_id_for_event,
1656 metadata_for_event,
1657 step,
1658 );
1659 }
1660 Err(err) => {
1661 tracing::warn!(
1662 name: "juncture.checkpoint.superstep.save_failed",
1663 step = step,
1664 error = %err,
1665 "Failed to save superstep checkpoint (async)"
1666 );
1667 if let Some(ref collector) = cp_config.metrics_collector {
1669 collector.inc_counter("juncture.checkpoint.errors", 1);
1670 }
1671 }
1672 }
1673 });
1674 self.reset_delta_counters();
1675 }
1676 Durability::Sync | Durability::Exit => {
1677 let metadata_for_event = metadata.clone();
1678 match checkpointer
1679 .put(&self.runnable_config, checkpoint, metadata)
1680 .await
1681 {
1682 Ok(updated_config) => {
1683 self.runnable_config.checkpoint_id = updated_config.checkpoint_id;
1684 self.reset_delta_counters();
1688 self.emit_counter("juncture.checkpoint.writes", 1);
1690 tracing::info!(
1691 name: "juncture.checkpoint.put",
1692 checkpoint_id = %self.runnable_config.checkpoint_id.as_deref().unwrap_or("unknown"),
1693 checkpoint_step = self.step,
1694 checkpoint_source = "Loop",
1695 "Superstep checkpoint persisted"
1696 );
1697 if let Some(ref cp_id) = self.runnable_config.checkpoint_id {
1698 self.on_checkpoint_saved(cp_id, self.step);
1699 Self::emit_checkpoint_saved_event(
1701 self.stream_tx.as_ref(),
1702 cp_id.clone(),
1703 metadata_for_event,
1704 self.step,
1705 );
1706 }
1707 }
1708 Err(err) => {
1709 tracing::warn!(
1710 name: "juncture.checkpoint.superstep.save_failed",
1711 step = self.step,
1712 error = %err,
1713 "Failed to save superstep checkpoint"
1714 );
1715 self.emit_counter("juncture.checkpoint.errors", 1);
1717 }
1718 }
1719 }
1720 }
1721 }
1722
1723 pub async fn save_pending_interrupt_checkpoint(&mut self)
1742 where
1743 S: serde::Serialize,
1744 {
1745 if !self.status.is_interrupted() || self.checkpointer.is_none() {
1746 return;
1747 }
1748 let node = self.interrupt_node_name().to_string();
1749 self.save_interrupt_checkpoint(&node).await;
1750 }
1751
1752 fn interrupt_node_name(&self) -> &str {
1757 static UNKNOWN: &str = "unknown";
1758 self.pending_interrupts
1759 .first()
1760 .and_then(|s| s.payload.get("node"))
1761 .and_then(|v| v.as_str())
1762 .unwrap_or(UNKNOWN)
1763 }
1764
1765 fn current_ns(&self) -> Vec<String> {
1772 self.runnable_config
1773 .checkpoint_ns
1774 .as_ref()
1775 .map(|ns| {
1776 ns.segments
1777 .iter()
1778 .map(|seg| seg.node_name.clone())
1779 .collect()
1780 })
1781 .unwrap_or_default()
1782 }
1783
1784 fn emit_interrupt_events(&self, signals: &[crate::interrupt::InterruptSignal]) {
1790 self.emit_interrupt_events_with_namespace(signals, &self.current_ns());
1791 }
1792
1793 fn emit_interrupt_events_with_namespace(
1804 &self,
1805 signals: &[crate::interrupt::InterruptSignal],
1806 namespace: &[String],
1807 ) {
1808 let Some(ref tx) = self.stream_tx else {
1809 return;
1810 };
1811
1812 for signal in signals {
1813 let node = signal
1814 .payload
1815 .get("node")
1816 .and_then(|v| v.as_str())
1817 .unwrap_or("unknown");
1818
1819 let tags: &[String] = &[];
1822 if crate::interrupt::is_hidden_node(node, tags) {
1823 continue;
1824 }
1825
1826 let event = StreamEvent::Interrupt {
1827 node: node.to_string(),
1828 payload: signal.payload.clone(),
1829 resumable: true,
1830 ns: namespace.to_vec(),
1831 };
1832 let _ = tx.send(event);
1833 }
1834 }
1835
1836 fn finish_all_channels(&mut self) {
1859 if self.channels_finished {
1864 return;
1865 }
1866
1867 for &field_idx in S::replace_after_finish_field_indices() {
1868 self.state.finish_field(field_idx);
1869 }
1870
1871 self.channels_finished = true;
1872 }
1873
1874 fn consume_triggered_channels(&mut self, changed: &crate::FieldsChanged) {
1886 for field_idx in 0..S::field_count() {
1887 if changed.has_field(field_idx) {
1888 self.state.consume_field(field_idx);
1889 }
1890 }
1891 }
1892
1893 #[must_use]
1899 fn effective_durability(&self) -> Durability {
1900 self.runnable_config
1901 .durability
1902 .clone()
1903 .unwrap_or(Durability::Sync)
1904 }
1905
1906 #[must_use]
1914 #[allow(
1915 clippy::type_complexity,
1916 reason = "return type is a direct mapping of the three version maps required by Checkpoint struct; factoring into a named type adds indirection without benefit"
1917 )]
1918 fn build_checkpoint_versions(
1919 &self,
1920 ) -> (
1921 HashMap<String, u64>,
1922 HashMap<String, u64>,
1923 HashMap<String, HashMap<String, u64>>,
1924 ) {
1925 let channel_versions: HashMap<String, u64> = self
1926 .field_versions
1927 .versions()
1928 .iter()
1929 .enumerate()
1930 .map(|(idx, ver)| (format!("field_{idx}"), *ver))
1931 .collect();
1932
1933 let new_versions = channel_versions.clone();
1934
1935 let versions_seen: HashMap<String, HashMap<String, u64>> = self
1936 .nodes
1937 .keys()
1938 .map(|node_name| {
1939 let versions = self.versions_seen.get_versions(node_name);
1940 let map: HashMap<String, u64> = versions
1941 .iter()
1942 .enumerate()
1943 .map(|(idx, ver)| (format!("field_{idx}"), *ver))
1944 .collect();
1945 (node_name.clone(), map)
1946 })
1947 .collect();
1948
1949 (channel_versions, new_versions, versions_seen)
1950 }
1951
1952 async fn save_exit_checkpoint(&mut self)
1961 where
1962 S: serde::Serialize,
1963 {
1964 let Some(ref checkpointer) = self.checkpointer else {
1965 return;
1966 };
1967
1968 let channel_values = match serde_json::to_value(&self.state) {
1969 Ok(v) => v,
1970 Err(err) => {
1971 tracing::warn!(
1972 name: "juncture.checkpoint.exit.serialize_failed",
1973 step = self.step,
1974 error = %err,
1975 "Failed to serialize state for exit checkpoint"
1976 );
1977 return;
1978 }
1979 };
1980
1981 let (channel_versions, new_versions, versions_seen) = self.build_checkpoint_versions();
1982
1983 let pending_tasks: Vec<crate::checkpoint::CheckpointPendingTask> = self
1984 .pending_tasks
1985 .iter()
1986 .map(|task| crate::checkpoint::CheckpointPendingTask {
1987 id: task.id.clone(),
1988 node: task.node_name.clone(),
1989 triggers: Vec::new(),
1990 state_override: None,
1991 })
1992 .collect();
1993
1994 let checkpoint_id = generate_checkpoint_id();
1995 let created_at = chrono::Utc::now().to_rfc3339();
1996
1997 let checkpoint = Checkpoint {
1998 id: checkpoint_id,
1999 channel_values,
2000 channel_versions,
2001 versions_seen,
2002 pending_tasks,
2003 pending_sends: Vec::new(),
2004 pending_interrupts: Vec::new(),
2005 schema_version: S::schema_version(),
2006 created_at,
2007 v: 1,
2008 new_versions,
2009 counters_since_delta_snapshot: HashMap::new(),
2010 };
2011
2012 let metadata = CheckpointMetadata {
2013 source: CheckpointSource::Loop,
2014 step: i64::try_from(self.step).unwrap_or(i64::MAX),
2015 writes: HashMap::new(),
2016 parents: HashMap::new(),
2017 run_id: self.run_id.clone(),
2018 };
2019
2020 let metadata_for_event = metadata.clone();
2021 match checkpointer
2022 .put(&self.runnable_config, checkpoint, metadata)
2023 .await
2024 {
2025 Ok(updated_config) => {
2026 self.runnable_config.checkpoint_id = updated_config.checkpoint_id;
2027 self.emit_counter("juncture.checkpoint.writes", 1);
2029 tracing::info!(
2030 name: "juncture.checkpoint.put",
2031 checkpoint_id = %self.runnable_config.checkpoint_id.as_deref().unwrap_or("unknown"),
2032 checkpoint_step = self.step,
2033 checkpoint_source = "Loop",
2034 "Exit checkpoint persisted"
2035 );
2036 if let Some(ref cp_id) = self.runnable_config.checkpoint_id {
2037 self.on_checkpoint_saved(cp_id, self.step);
2038 Self::emit_checkpoint_saved_event(
2040 self.stream_tx.as_ref(),
2041 cp_id.clone(),
2042 metadata_for_event,
2043 self.step,
2044 );
2045 }
2046 }
2047 Err(err) => {
2048 tracing::warn!(
2049 name: "juncture.checkpoint.exit.save_failed",
2050 step = self.step,
2051 error = %err,
2052 "Failed to save exit checkpoint"
2053 );
2054 self.emit_counter("juncture.checkpoint.errors", 1);
2056 }
2057 }
2058 }
2059
2060 fn update_delta_counters(&mut self, changed: &crate::FieldsChanged) {
2071 let field_names = S::field_names();
2072 let num_fields = field_names.len().min(self.field_versions.len());
2073
2074 for field_idx in 0..num_fields {
2075 let channel_name = format!("field_{field_idx}");
2076 let entry = self.delta_counters.entry(channel_name).or_default();
2077
2078 entry.supersteps = entry.supersteps.saturating_add(1);
2080
2081 if changed.has_field(field_idx) {
2083 entry.updates = entry.updates.saturating_add(1);
2084 }
2085 }
2086 }
2087
2088 fn build_checkpoint_delta_counters(&self) -> HashMap<String, DeltaCounters> {
2093 self.delta_counters.clone()
2094 }
2095
2096 fn should_take_full_snapshot(&self) -> bool {
2103 let specs = S::delta_channel_specs();
2104 if specs.is_empty() {
2105 return true;
2108 }
2109
2110 for &(field_idx, frequency) in specs {
2111 let channel_name = format!("field_{field_idx}");
2112 if let Some(counters) = self.delta_counters.get(&channel_name)
2113 && counters.exceeds_frequency(frequency)
2114 {
2115 return true;
2116 }
2117 }
2118
2119 false
2120 }
2121
2122 fn reset_delta_counters(&mut self) {
2124 self.delta_counters.clear();
2125 }
2126
2127 #[inline]
2133 fn emit_counter(&self, name: &str, value: u64) {
2134 if let Some(ref collector) = self.runnable_config.metrics_collector {
2135 collector.inc_counter(name, value);
2136 }
2137 }
2138
2139 #[inline]
2141 fn emit_histogram(&self, name: &str, value: f64) {
2142 if let Some(ref collector) = self.runnable_config.metrics_collector {
2143 collector.record_histogram(name, value);
2144 }
2145 }
2146
2147 #[inline]
2149 fn emit_gauge(&self, name: &str, value: u64) {
2150 if let Some(ref collector) = self.runnable_config.metrics_collector {
2151 collector.set_gauge(name, value);
2152 }
2153 }
2154
2155 #[inline]
2162 fn on_graph_end(&self, result: &Result<(), JunctureError>) {
2163 let (total_tokens, cost_usd) = self.budget_tracker.as_ref().map_or((0, 0.0), |tracker| {
2165 let usage = tracker.current_usage();
2166 (usage.tokens_used, usage.cost_usd)
2167 });
2168
2169 let success = result.is_ok();
2170 let span = tracing::info_span!(
2171 "juncture.graph.complete",
2172 total_steps = self.step,
2173 total_tokens = total_tokens,
2174 cost_usd = cost_usd,
2175 success = success,
2176 );
2177 let _enter = span.enter();
2178
2179 tracing::info!("Graph execution completed");
2180
2181 if let Some(ref handler) = self.runnable_config.callback_handler {
2182 handler.on_graph_end(result);
2183 }
2184 }
2185
2186 #[inline]
2188 fn on_checkpoint_saved(&self, checkpoint_id: &str, step: usize) {
2189 if let Some(ref handler) = self.runnable_config.callback_handler {
2190 handler.on_checkpoint_saved(checkpoint_id, step);
2191 }
2192 }
2193
2194 #[inline]
2196 fn emit_checkpoint_saved_event(
2197 stream_tx: Option<&mpsc::UnboundedSender<StreamEvent<S>>>,
2198 checkpoint_id: String,
2199 metadata: CheckpointMetadata,
2200 step: usize,
2201 ) {
2202 if let Some(tx) = stream_tx {
2203 let _ = tx.send(StreamEvent::CheckpointSaved {
2204 checkpoint_id,
2205 metadata,
2206 step,
2207 });
2208 }
2209 }
2210}
2211
2212#[cfg(test)]
2213mod tests {
2214 use super::*;
2215 use crate::state::FieldVersions;
2216 use crate::{
2217 Command,
2218 node::IntoNode,
2219 node::NodeFnCommand,
2220 pregel::types::{TaskOutput, TaskTrigger},
2221 };
2222 use chrono::Utc;
2223
2224 #[test]
2225 fn test_pregel_loop_creation() {
2226 let state = TestState;
2227 let mut nodes = IndexMap::new();
2228 nodes.insert(
2229 "test_node".to_string(),
2230 NodeFnCommand(
2231 |_s: &TestState| -> std::pin::Pin<
2232 Box<
2233 dyn std::future::Future<
2234 Output = Result<crate::Command<TestState>, crate::JunctureError>,
2235 > + Send,
2236 >,
2237 > { Box::pin(async move { Ok(crate::Command::end()) }) },
2238 )
2239 .into_node("test_node"),
2240 );
2241
2242 let trigger_table = TriggerTable::new();
2243 let config = crate::config::RunnableConfig::new();
2244
2245 let result = PregelLoop::new(state, nodes, trigger_table, config, 0);
2246 result.unwrap();
2247 }
2248
2249 #[test]
2250 fn test_field_version_tracker() {
2251 let mut tracker = FieldVersionTracker::new(5);
2252
2253 assert_eq!(tracker.get(0), 0);
2254 assert_eq!(tracker.global_max(), 0);
2255
2256 tracker.bump(0);
2257 assert_eq!(tracker.get(0), 1);
2258 assert_eq!(tracker.global_max(), 1);
2259
2260 tracker.bump(2);
2261 assert_eq!(tracker.get(2), 2);
2262 assert_eq!(tracker.global_max(), 2);
2263 }
2264
2265 #[test]
2266 fn test_versions_seen() {
2267 let node_names = vec!["node_a".to_string(), "node_b".to_string()];
2268 let mut seen = VersionsSeen::new(&node_names, 3);
2269
2270 assert!(!seen.should_activate("node_a", &[0], &[0, 0, 0]));
2271
2272 let current = vec![1, 0, 0];
2273 assert!(seen.should_activate("node_a", &[0], ¤t));
2274
2275 seen.mark_consumed("node_a", ¤t);
2276 assert!(!seen.should_activate("node_a", &[0], ¤t));
2277 }
2278
2279 #[test]
2280 fn test_run_control() {
2281 let rc = RunControl::new();
2282 assert!(!rc.is_drain_requested());
2283
2284 rc.request_drain();
2285 assert!(rc.is_drain_requested());
2286 }
2287
2288 #[test]
2289 fn test_run_control_default() {
2290 let rc = RunControl::default();
2291 assert!(!rc.is_drain_requested());
2292 }
2293
2294 #[test]
2295 fn test_handle_bubble_up_interrupt_sets_status() {
2296 let state = TestState;
2297 let mut nodes = IndexMap::new();
2298 nodes.insert(
2299 "test_node".to_string(),
2300 NodeFnCommand(
2301 |_s: &TestState| -> std::pin::Pin<
2302 Box<
2303 dyn std::future::Future<
2304 Output = Result<crate::Command<TestState>, crate::JunctureError>,
2305 > + Send,
2306 >,
2307 > { Box::pin(async move { Ok(crate::Command::end()) }) },
2308 )
2309 .into_node("test_node"),
2310 );
2311
2312 let trigger_table = TriggerTable::new();
2313 let config = crate::config::RunnableConfig::new();
2314
2315 let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
2316
2317 let signals = vec![crate::interrupt::InterruptSignal {
2318 index: 0,
2319 id: Some("sub-int-0".to_string()),
2320 payload: serde_json::json!({"node": "subgraph_node"}),
2321 timestamp: Utc::now(),
2322 }];
2323 let bubble_ups = vec![BubbleUp::Interrupt(crate::pregel::types::GraphInterrupt {
2324 interrupts: signals,
2325 step: 2,
2326 namespace: vec![],
2327 })];
2328
2329 let should_stop = loop_.handle_bubble_ups(&bubble_ups);
2330
2331 assert!(should_stop);
2332 assert!(loop_.status.is_interrupted());
2333 assert_eq!(loop_.pending_interrupts.len(), 1);
2334 assert_eq!(loop_.pending_interrupts[0].id.as_deref(), Some("sub-int-0"));
2335 }
2336
2337 #[test]
2338 fn test_handle_bubble_up_drained_sets_status() {
2339 let state = TestState;
2340 let mut nodes = IndexMap::new();
2341 nodes.insert(
2342 "test_node".to_string(),
2343 NodeFnCommand(
2344 |_s: &TestState| -> std::pin::Pin<
2345 Box<
2346 dyn std::future::Future<
2347 Output = Result<crate::Command<TestState>, crate::JunctureError>,
2348 > + Send,
2349 >,
2350 > { Box::pin(async move { Ok(crate::Command::end()) }) },
2351 )
2352 .into_node("test_node"),
2353 );
2354
2355 let trigger_table = TriggerTable::new();
2356 let config = crate::config::RunnableConfig::new();
2357
2358 let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
2359
2360 let bubble_ups = vec![BubbleUp::Drained(crate::pregel::types::GraphDrained {
2361 reason: "subgraph completed".to_string(),
2362 })];
2363
2364 let should_stop = loop_.handle_bubble_ups(&bubble_ups);
2365
2366 assert!(should_stop);
2367 assert!(loop_.status.is_terminal());
2368 assert!(matches!(loop_.status, LoopStatus::Drained));
2369 }
2370
2371 #[test]
2372 fn test_handle_bubble_up_parent_command_does_not_stop() {
2373 let state = TestState;
2374 let mut nodes = IndexMap::new();
2375 nodes.insert(
2376 "test_node".to_string(),
2377 NodeFnCommand(
2378 |_s: &TestState| -> std::pin::Pin<
2379 Box<
2380 dyn std::future::Future<
2381 Output = Result<crate::Command<TestState>, crate::JunctureError>,
2382 > + Send,
2383 >,
2384 > { Box::pin(async move { Ok(crate::Command::end()) }) },
2385 )
2386 .into_node("test_node"),
2387 );
2388
2389 let trigger_table = TriggerTable::new();
2390 let config = crate::config::RunnableConfig::new();
2391
2392 let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
2393
2394 let parent_cmd = crate::command::ParentCommand::from_subgraph(
2395 Command::end(),
2396 "test_subgraph_node",
2397 "test_namespace",
2398 );
2399 let bubble_ups = vec![BubbleUp::ParentCommand(parent_cmd)];
2400
2401 let should_stop = loop_.handle_bubble_ups(&bubble_ups);
2402
2403 assert!(!should_stop);
2404 assert!(loop_.status.is_running());
2405 }
2406
2407 #[test]
2408 fn test_handle_bubble_up_empty_does_nothing() {
2409 let state = TestState;
2410 let mut nodes = IndexMap::new();
2411 nodes.insert(
2412 "test_node".to_string(),
2413 NodeFnCommand(
2414 |_s: &TestState| -> std::pin::Pin<
2415 Box<
2416 dyn std::future::Future<
2417 Output = Result<crate::Command<TestState>, crate::JunctureError>,
2418 > + Send,
2419 >,
2420 > { Box::pin(async move { Ok(crate::Command::end()) }) },
2421 )
2422 .into_node("test_node"),
2423 );
2424
2425 let trigger_table = TriggerTable::new();
2426 let config = crate::config::RunnableConfig::new();
2427
2428 let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
2429
2430 let should_stop = loop_.handle_bubble_ups(&[]);
2431
2432 assert!(!should_stop);
2433 assert!(loop_.status.is_running());
2434 }
2435
2436 #[test]
2437 fn test_handle_bubble_up_interrupt_takes_priority_over_drain() {
2438 let state = TestState;
2439 let mut nodes = IndexMap::new();
2440 nodes.insert(
2441 "test_node".to_string(),
2442 NodeFnCommand(
2443 |_s: &TestState| -> std::pin::Pin<
2444 Box<
2445 dyn std::future::Future<
2446 Output = Result<crate::Command<TestState>, crate::JunctureError>,
2447 > + Send,
2448 >,
2449 > { Box::pin(async move { Ok(crate::Command::end()) }) },
2450 )
2451 .into_node("test_node"),
2452 );
2453
2454 let trigger_table = TriggerTable::new();
2455 let config = crate::config::RunnableConfig::new();
2456
2457 let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
2458
2459 let bubble_ups = vec![
2460 BubbleUp::Drained(crate::pregel::types::GraphDrained {
2461 reason: "drained".to_string(),
2462 }),
2463 BubbleUp::Interrupt(crate::pregel::types::GraphInterrupt {
2464 interrupts: vec![crate::interrupt::InterruptSignal {
2465 index: 0,
2466 id: None,
2467 payload: serde_json::Value::Null,
2468 timestamp: Utc::now(),
2469 }],
2470 step: 1,
2471 namespace: vec![],
2472 }),
2473 ];
2474
2475 let should_stop = loop_.handle_bubble_ups(&bubble_ups);
2476
2477 assert!(should_stop);
2478 assert!(loop_.status.is_interrupted());
2480 }
2481
2482 #[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
2483 struct TestState;
2484
2485 impl State for TestState {
2486 type Update = TestUpdate;
2487 type FieldVersions = FieldVersions;
2488
2489 fn apply(&mut self, _: Self::Update) -> crate::FieldsChanged {
2490 crate::FieldsChanged(0)
2491 }
2492
2493 fn reset_ephemeral(&mut self) {}
2494 }
2495
2496 #[derive(Clone, Debug, Default, serde::Serialize)]
2497 struct TestUpdate;
2498
2499 #[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
2503 struct DeltaTestState {
2504 value: i32,
2505 messages: Vec<String>,
2506 }
2507
2508 #[derive(Clone, Debug, Default, serde::Serialize)]
2509 struct DeltaTestUpdate {
2510 value: Option<i32>,
2511 messages: Option<Vec<String>>,
2512 }
2513
2514 impl State for DeltaTestState {
2515 type Update = DeltaTestUpdate;
2516 type FieldVersions = FieldVersions;
2517
2518 fn apply(&mut self, update: Self::Update) -> crate::FieldsChanged {
2519 let mut changed = crate::FieldsChanged(0);
2520 if let Some(v) = update.value {
2521 self.value = v;
2522 changed.set_field(0);
2523 }
2524 if let Some(msgs) = update.messages {
2525 self.messages.extend(msgs);
2526 changed.set_field(1);
2527 }
2528 changed
2529 }
2530
2531 fn reset_ephemeral(&mut self) {}
2532
2533 fn field_names() -> &'static [&'static str] {
2534 &["value", "messages"]
2535 }
2536
2537 fn field_count() -> usize {
2538 2
2539 }
2540
2541 fn delta_channel_specs() -> &'static [(usize, usize)] {
2543 &[(1, 3)]
2544 }
2545 }
2546
2547 struct CapturingCheckpointer {
2549 captured: Arc<std::sync::Mutex<Option<crate::checkpoint::Checkpoint>>>,
2550 }
2551
2552 #[async_trait::async_trait]
2553 impl crate::checkpoint::CheckpointSaver for CapturingCheckpointer {
2554 async fn get_tuple(
2555 &self,
2556 _: &crate::config::RunnableConfig,
2557 ) -> Result<Option<crate::checkpoint::CheckpointTuple>, crate::checkpoint::CheckpointError>
2558 {
2559 Ok(None)
2560 }
2561
2562 async fn list(
2563 &self,
2564 _: &crate::config::RunnableConfig,
2565 _: Option<crate::checkpoint::CheckpointFilter>,
2566 ) -> Result<Vec<crate::checkpoint::CheckpointTuple>, crate::checkpoint::CheckpointError>
2567 {
2568 Ok(Vec::new())
2569 }
2570
2571 async fn put(
2572 &self,
2573 _: &crate::config::RunnableConfig,
2574 checkpoint: crate::checkpoint::Checkpoint,
2575 _metadata: crate::checkpoint::CheckpointMetadata,
2576 ) -> Result<crate::config::RunnableConfig, crate::checkpoint::CheckpointError> {
2577 *self
2578 .captured
2579 .lock()
2580 .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(checkpoint);
2581 let mut cfg = crate::config::RunnableConfig::new();
2582 cfg.checkpoint_id = Some("cp-capture".to_string());
2583 Ok(cfg)
2584 }
2585
2586 async fn put_writes(
2587 &self,
2588 _: &crate::config::RunnableConfig,
2589 _: Vec<crate::checkpoint::PendingWrite>,
2590 _: &str,
2591 ) -> Result<(), crate::checkpoint::CheckpointError> {
2592 Ok(())
2593 }
2594 }
2595
2596 #[tokio::test]
2598 async fn test_delta_counters_increment_on_field_change() {
2599 let state = DeltaTestState {
2600 value: 0,
2601 messages: vec![],
2602 };
2603 let mut nodes = IndexMap::new();
2604 nodes.insert(
2605 "test_node".to_string(),
2606 NodeFnCommand(
2607 |_s: &DeltaTestState| -> std::pin::Pin<
2608 Box<
2609 dyn std::future::Future<
2610 Output = Result<
2611 crate::Command<DeltaTestState>,
2612 crate::JunctureError,
2613 >,
2614 > + Send,
2615 >,
2616 > { Box::pin(async move { Ok(crate::Command::end()) }) },
2617 )
2618 .into_node("test_node"),
2619 );
2620 let trigger_table = TriggerTable::new();
2621 let config = crate::config::RunnableConfig::new();
2622
2623 let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 2).unwrap();
2624
2625 let changed = crate::FieldsChanged(0b11); loop_.update_delta_counters(&changed);
2628
2629 assert_eq!(loop_.delta_counters.len(), 2, "should track both fields");
2630
2631 let field_0 = loop_
2632 .delta_counters
2633 .get("field_0")
2634 .expect("field_0 should exist");
2635 assert_eq!(field_0.updates, 1, "field_0 should have 1 update");
2636 assert_eq!(field_0.supersteps, 1, "field_0 should have 1 superstep");
2637
2638 let field_1 = loop_
2639 .delta_counters
2640 .get("field_1")
2641 .expect("field_1 should exist");
2642 assert_eq!(field_1.updates, 1, "field_1 should have 1 update");
2643 assert_eq!(field_1.supersteps, 1, "field_1 should have 1 superstep");
2644 }
2645
2646 #[tokio::test]
2648 async fn test_delta_counters_increment_unchanged_fields_get_superstep_only() {
2649 let state = DeltaTestState {
2650 value: 0,
2651 messages: vec![],
2652 };
2653 let mut nodes = IndexMap::new();
2654 nodes.insert(
2655 "test_node".to_string(),
2656 NodeFnCommand(
2657 |_s: &DeltaTestState| -> std::pin::Pin<
2658 Box<
2659 dyn std::future::Future<
2660 Output = Result<
2661 crate::Command<DeltaTestState>,
2662 crate::JunctureError,
2663 >,
2664 > + Send,
2665 >,
2666 > { Box::pin(async move { Ok(crate::Command::end()) }) },
2667 )
2668 .into_node("test_node"),
2669 );
2670 let trigger_table = TriggerTable::new();
2671 let config = crate::config::RunnableConfig::new();
2672
2673 let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 2).unwrap();
2674
2675 let changed = crate::FieldsChanged(0b01);
2677 loop_.update_delta_counters(&changed);
2678
2679 let field_0 = loop_
2680 .delta_counters
2681 .get("field_0")
2682 .expect("field_0 should exist");
2683 assert_eq!(field_0.updates, 1, "field_0 should have 1 update");
2684
2685 let field_1 = loop_
2686 .delta_counters
2687 .get("field_1")
2688 .expect("field_1 should exist");
2689 assert_eq!(
2690 field_1.updates, 0,
2691 "field_1 should have 0 updates (not changed)"
2692 );
2693 assert_eq!(
2694 field_1.supersteps, 1,
2695 "field_1 should still have 1 superstep"
2696 );
2697 }
2698
2699 #[tokio::test]
2701 async fn test_delta_counters_accumulate_across_supersteps() {
2702 let state = DeltaTestState {
2703 value: 0,
2704 messages: vec![],
2705 };
2706 let mut nodes = IndexMap::new();
2707 nodes.insert(
2708 "test_node".to_string(),
2709 NodeFnCommand(
2710 |_s: &DeltaTestState| -> std::pin::Pin<
2711 Box<
2712 dyn std::future::Future<
2713 Output = Result<
2714 crate::Command<DeltaTestState>,
2715 crate::JunctureError,
2716 >,
2717 > + Send,
2718 >,
2719 > { Box::pin(async move { Ok(crate::Command::end()) }) },
2720 )
2721 .into_node("test_node"),
2722 );
2723 let trigger_table = TriggerTable::new();
2724 let config = crate::config::RunnableConfig::new();
2725
2726 let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 2).unwrap();
2727
2728 loop_.update_delta_counters(&crate::FieldsChanged(0b01));
2730 loop_.update_delta_counters(&crate::FieldsChanged(0b11));
2732
2733 let field_0 = loop_
2734 .delta_counters
2735 .get("field_0")
2736 .expect("field_0 should exist");
2737 assert_eq!(field_0.updates, 2, "field_0 updated in both supersteps");
2738 assert_eq!(field_0.supersteps, 2, "field_0 has 2 supersteps");
2739
2740 let field_1 = loop_
2741 .delta_counters
2742 .get("field_1")
2743 .expect("field_1 should exist");
2744 assert_eq!(
2745 field_1.updates, 1,
2746 "field_1 updated in only second superstep"
2747 );
2748 assert_eq!(field_1.supersteps, 2, "field_1 has 2 supersteps");
2749 }
2750
2751 #[tokio::test]
2753 async fn test_delta_counters_populated_in_checkpoint_and_reset() {
2754 let state = DeltaTestState {
2755 value: 0,
2756 messages: vec![],
2757 };
2758 let mut nodes = IndexMap::new();
2759 nodes.insert(
2760 "test_node".to_string(),
2761 NodeFnCommand(
2762 |_s: &DeltaTestState| -> std::pin::Pin<
2763 Box<
2764 dyn std::future::Future<
2765 Output = Result<
2766 crate::Command<DeltaTestState>,
2767 crate::JunctureError,
2768 >,
2769 > + Send,
2770 >,
2771 > { Box::pin(async move { Ok(crate::Command::end()) }) },
2772 )
2773 .into_node("test_node"),
2774 );
2775 let trigger_table = TriggerTable::new();
2776 let mut config = crate::config::RunnableConfig::new();
2777 config.thread_id = Some("test-thread".to_string());
2778
2779 let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 2).unwrap();
2780
2781 let captured: Arc<std::sync::Mutex<Option<crate::checkpoint::Checkpoint>>> =
2782 Arc::new(std::sync::Mutex::new(None));
2783 let checkpointer = CapturingCheckpointer {
2784 captured: Arc::clone(&captured),
2785 };
2786 loop_.set_checkpointer(Arc::new(checkpointer));
2787
2788 loop_.delta_counters.insert(
2793 "field_0".to_string(),
2794 DeltaCounters {
2795 updates: 1,
2796 supersteps: 1,
2797 },
2798 );
2799 loop_.delta_counters.insert(
2802 "field_1".to_string(),
2803 DeltaCounters {
2804 updates: 3,
2805 supersteps: 1,
2806 },
2807 );
2808
2809 loop_.pending_tasks = vec![PendingTask::pull(
2812 uuid::Uuid::new_v4().to_string(),
2813 "test_node".to_string(),
2814 )];
2815 let _ = loop_.execute_superstep().await;
2816 let _ = loop_.after_tick(SuperstepResult::empty()).await;
2817
2818 let checkpoint = captured
2820 .lock()
2821 .unwrap_or_else(std::sync::PoisonError::into_inner)
2822 .take()
2823 .expect("checkpoint should have been saved");
2824 assert!(
2825 !checkpoint.counters_since_delta_snapshot.is_empty(),
2826 "counters_since_delta_snapshot should be populated"
2827 );
2828 let field_0 = checkpoint
2829 .counters_since_delta_snapshot
2830 .get("field_0")
2831 .expect("field_0 should be in delta counters");
2832 assert_eq!(
2835 field_0.updates, 1,
2836 "field_0 should have 1 update in checkpoint"
2837 );
2838 assert_eq!(
2839 field_0.supersteps, 2,
2840 "field_0 should have 2 supersteps in checkpoint"
2841 );
2842
2843 let field_1 = checkpoint
2844 .counters_since_delta_snapshot
2845 .get("field_1")
2846 .expect("field_1 should be in delta counters");
2847 assert_eq!(
2848 field_1.updates, 3,
2849 "field_1 should have 3 updates in checkpoint"
2850 );
2851
2852 assert!(
2854 loop_.delta_counters.is_empty(),
2855 "delta counters should be reset after checkpoint save"
2856 );
2857 }
2858
2859 #[test]
2861 fn test_should_take_full_snapshot_no_delta_channels() {
2862 let state = TestState;
2864 let mut nodes = IndexMap::new();
2865 nodes.insert(
2866 "test_node".to_string(),
2867 NodeFnCommand(
2868 |_s: &TestState| -> std::pin::Pin<
2869 Box<
2870 dyn std::future::Future<
2871 Output = Result<crate::Command<TestState>, crate::JunctureError>,
2872 > + Send,
2873 >,
2874 > { Box::pin(async move { Ok(crate::Command::end()) }) },
2875 )
2876 .into_node("test_node"),
2877 );
2878 let trigger_table = TriggerTable::new();
2879 let config = crate::config::RunnableConfig::new();
2880
2881 let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
2882
2883 assert!(
2885 loop_.should_take_full_snapshot(),
2886 "should always take full snapshot with no delta channels"
2887 );
2888
2889 loop_.delta_counters.insert(
2891 "field_0".to_string(),
2892 DeltaCounters {
2893 updates: 100,
2894 supersteps: 50,
2895 },
2896 );
2897 assert!(
2898 loop_.should_take_full_snapshot(),
2899 "still full snapshot when specs are empty (no delta optimization)"
2900 );
2901 }
2902
2903 #[test]
2905 fn test_should_take_full_snapshot_respects_frequency() {
2906 let state = DeltaTestState {
2907 value: 0,
2908 messages: vec![],
2909 };
2910 let mut nodes = IndexMap::new();
2911 nodes.insert(
2912 "test_node".to_string(),
2913 NodeFnCommand(
2914 |_s: &DeltaTestState| -> std::pin::Pin<
2915 Box<
2916 dyn std::future::Future<
2917 Output = Result<
2918 crate::Command<DeltaTestState>,
2919 crate::JunctureError,
2920 >,
2921 > + Send,
2922 >,
2923 > { Box::pin(async move { Ok(crate::Command::end()) }) },
2924 )
2925 .into_node("test_node"),
2926 );
2927 let trigger_table = TriggerTable::new();
2928 let config = crate::config::RunnableConfig::new();
2929
2930 let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 2).unwrap();
2931
2932 loop_.delta_counters.insert(
2934 "field_1".to_string(),
2935 DeltaCounters {
2936 updates: 2,
2937 supersteps: 2,
2938 },
2939 );
2940 assert!(
2941 !loop_.should_take_full_snapshot(),
2942 "should not take full snapshot below frequency threshold"
2943 );
2944
2945 loop_.delta_counters.insert(
2947 "field_1".to_string(),
2948 DeltaCounters {
2949 updates: 3,
2950 supersteps: 3,
2951 },
2952 );
2953 assert!(
2954 loop_.should_take_full_snapshot(),
2955 "should take full snapshot at frequency threshold"
2956 );
2957
2958 loop_.delta_counters.insert(
2960 "field_1".to_string(),
2961 DeltaCounters {
2962 updates: 10,
2963 supersteps: 5,
2964 },
2965 );
2966 assert!(
2967 loop_.should_take_full_snapshot(),
2968 "should take full snapshot above frequency threshold"
2969 );
2970 }
2971
2972 #[test]
2974 fn test_delta_counters_exceeds_frequency() {
2975 let counters = DeltaCounters::new();
2976 assert_eq!(counters.updates, 0);
2977 assert_eq!(counters.supersteps, 0);
2978
2979 assert!(
2981 counters.exceeds_frequency(0),
2982 "frequency 0 always snapshots"
2983 );
2984
2985 let counters = DeltaCounters {
2987 updates: 2,
2988 supersteps: 1,
2989 };
2990 assert!(!counters.exceeds_frequency(3), "2 < 3, not exceeded");
2991
2992 let counters = DeltaCounters {
2994 updates: 3,
2995 supersteps: 1,
2996 };
2997 assert!(counters.exceeds_frequency(3), "3 >= 3, exceeded");
2998
2999 let counters = DeltaCounters {
3001 updates: 10,
3002 supersteps: 1,
3003 };
3004 assert!(counters.exceeds_frequency(3), "10 >= 3, exceeded");
3005 }
3006
3007 #[tokio::test]
3014 async fn test_scratchpad_populated_after_execute_superstep() {
3015 let state = TestState;
3016
3017 let mut nodes = IndexMap::new();
3018 nodes.insert(
3019 "test_node".to_string(),
3020 NodeFnCommand(
3021 |_s: &TestState| -> std::pin::Pin<
3022 Box<
3023 dyn std::future::Future<
3024 Output = Result<crate::Command<TestState>, crate::JunctureError>,
3025 > + Send,
3026 >,
3027 > { Box::pin(async move { Ok(crate::Command::end()) }) },
3028 )
3029 .into_node("test_node"),
3030 );
3031
3032 let trigger_table = TriggerTable::new();
3033 let config = crate::config::RunnableConfig::new();
3034
3035 let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3036
3037 loop_.pending_tasks = vec![PendingTask::pull(
3039 uuid::Uuid::new_v4().to_string(),
3040 "test_node".to_string(),
3041 )];
3042 loop_.pending_interrupts = vec![
3043 crate::interrupt::InterruptSignal {
3044 index: 0,
3045 id: Some("int-alpha".to_string()),
3046 payload: serde_json::Value::Null,
3047 timestamp: Utc::now(),
3048 },
3049 crate::interrupt::InterruptSignal {
3050 index: 1,
3051 id: Some("int-beta".to_string()),
3052 payload: serde_json::Value::Null,
3053 timestamp: Utc::now(),
3054 },
3055 ];
3056
3057 assert!(
3059 !loop_.scratchpad.is_interrupt_processed("int-alpha"),
3060 "scratchpad should be empty before superstep"
3061 );
3062 assert!(
3063 !loop_.scratchpad.is_interrupt_processed("int-beta"),
3064 "scratchpad should be empty before superstep"
3065 );
3066
3067 let result = loop_.execute_superstep().await;
3068 assert!(result.is_ok(), "execute_superstep should succeed");
3069
3070 assert!(
3072 loop_.scratchpad.is_interrupt_processed("int-alpha"),
3073 "int-alpha should be marked as processed after superstep"
3074 );
3075 assert!(
3076 loop_.scratchpad.is_interrupt_processed("int-beta"),
3077 "int-beta should be marked as processed after superstep"
3078 );
3079 assert!(
3080 !loop_.scratchpad.is_interrupt_processed("int-gamma"),
3081 "unrelated interrupt should not be marked as processed"
3082 );
3083 }
3084
3085 #[tokio::test]
3088 async fn test_scratchpad_accumulates_across_supersteps() {
3089 let state = TestState;
3090
3091 let mut nodes = IndexMap::new();
3092 nodes.insert(
3093 "test_node".to_string(),
3094 NodeFnCommand(
3095 |_s: &TestState| -> std::pin::Pin<
3096 Box<
3097 dyn std::future::Future<
3098 Output = Result<crate::Command<TestState>, crate::JunctureError>,
3099 > + Send,
3100 >,
3101 > { Box::pin(async move { Ok(crate::Command::end()) }) },
3102 )
3103 .into_node("test_node"),
3104 );
3105
3106 let trigger_table = TriggerTable::new();
3107 let config = crate::config::RunnableConfig::new();
3108
3109 let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3110
3111 loop_.pending_tasks = vec![PendingTask::pull(
3113 uuid::Uuid::new_v4().to_string(),
3114 "test_node".to_string(),
3115 )];
3116 loop_.pending_interrupts = vec![crate::interrupt::InterruptSignal {
3117 index: 0,
3118 id: Some("int-1".to_string()),
3119 payload: serde_json::Value::Null,
3120 timestamp: Utc::now(),
3121 }];
3122
3123 let _ = loop_.execute_superstep().await;
3124 let _ = loop_.after_tick(SuperstepResult::empty()).await;
3125
3126 loop_.pending_tasks = vec![PendingTask::pull(
3128 uuid::Uuid::new_v4().to_string(),
3129 "test_node".to_string(),
3130 )];
3131 loop_.pending_interrupts = vec![crate::interrupt::InterruptSignal {
3132 index: 0,
3133 id: Some("int-2".to_string()),
3134 payload: serde_json::Value::Null,
3135 timestamp: Utc::now(),
3136 }];
3137
3138 let _ = loop_.execute_superstep().await;
3139
3140 assert!(
3142 loop_.scratchpad.is_interrupt_processed("int-1"),
3143 "int-1 from first superstep should still be tracked"
3144 );
3145 assert!(
3146 loop_.scratchpad.is_interrupt_processed("int-2"),
3147 "int-2 from second superstep should be tracked"
3148 );
3149 }
3150
    /// A checkpointer call recorded by `TrackingCheckpointer` so tests can
    /// assert on what was persisted.
    #[derive(Clone, Debug, PartialEq, Eq)]
    enum ObservedCall {
        /// A `put` call, carrying the `source` and `step` taken from the
        /// checkpoint metadata passed to it.
        Put {
            source: crate::checkpoint::CheckpointSource,
            step: i64,
        },
    }
3161
    /// Test checkpointer that records the calls it receives in a shared,
    /// mutex-guarded list.
    struct TrackingCheckpointer {
        /// Recorded calls; the `Arc` lets a test keep its own handle to the
        /// same list.
        observed: Arc<std::sync::Mutex<Vec<ObservedCall>>>,
    }
3166
3167 #[async_trait::async_trait]
3168 impl crate::checkpoint::CheckpointSaver for TrackingCheckpointer {
3169 async fn get_tuple(
3170 &self,
3171 _: &crate::config::RunnableConfig,
3172 ) -> Result<Option<crate::checkpoint::CheckpointTuple>, crate::checkpoint::CheckpointError>
3173 {
3174 Ok(None)
3175 }
3176
3177 async fn list(
3178 &self,
3179 _: &crate::config::RunnableConfig,
3180 _: Option<crate::checkpoint::CheckpointFilter>,
3181 ) -> Result<Vec<crate::checkpoint::CheckpointTuple>, crate::checkpoint::CheckpointError>
3182 {
3183 Ok(Vec::new())
3184 }
3185
3186 async fn put(
3187 &self,
3188 _: &crate::config::RunnableConfig,
3189 _checkpoint: crate::checkpoint::Checkpoint,
3190 metadata: crate::checkpoint::CheckpointMetadata,
3191 ) -> Result<crate::config::RunnableConfig, crate::checkpoint::CheckpointError> {
3192 self.observed
3193 .lock()
3194 .unwrap_or_else(std::sync::PoisonError::into_inner)
3195 .push(ObservedCall::Put {
3196 source: metadata.source,
3197 step: metadata.step,
3198 });
3199 let mut cfg = crate::config::RunnableConfig::new();
3200 cfg.checkpoint_id = Some("cp-test".to_string());
3201 Ok(cfg)
3202 }
3203
3204 async fn put_writes(
3205 &self,
3206 _: &crate::config::RunnableConfig,
3207 _: Vec<crate::checkpoint::PendingWrite>,
3208 _: &str,
3209 ) -> Result<(), crate::checkpoint::CheckpointError> {
3210 Ok(())
3211 }
3212 }
3213
3214 #[tokio::test]
3217 async fn test_superstep_checkpoint_saved_on_normal_completion() {
3218 let state = TestState;
3219
3220 let mut nodes = IndexMap::new();
3221 nodes.insert(
3222 "test_node".to_string(),
3223 NodeFnCommand(
3224 |_s: &TestState| -> std::pin::Pin<
3225 Box<
3226 dyn std::future::Future<
3227 Output = Result<crate::Command<TestState>, crate::JunctureError>,
3228 > + Send,
3229 >,
3230 > { Box::pin(async move { Ok(crate::Command::end()) }) },
3231 )
3232 .into_node("test_node"),
3233 );
3234
3235 let trigger_table = TriggerTable::new();
3236 let mut config = crate::config::RunnableConfig::new();
3237 config.thread_id = Some("test-thread".to_string());
3238
3239 let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3240
3241 let observed = Arc::new(std::sync::Mutex::new(Vec::new()));
3242 let checkpointer = TrackingCheckpointer {
3243 observed: Arc::clone(&observed),
3244 };
3245 loop_.set_checkpointer(Arc::new(checkpointer));
3246
3247 loop_.pending_tasks = vec![PendingTask::pull(
3249 uuid::Uuid::new_v4().to_string(),
3250 "test_node".to_string(),
3251 )];
3252
3253 let _ = loop_.execute_superstep().await;
3254 let _ = loop_.after_tick(SuperstepResult::empty()).await;
3255
3256 let has_loop_checkpoint = {
3258 let calls = observed
3259 .lock()
3260 .unwrap_or_else(std::sync::PoisonError::into_inner);
3261 calls.iter().any(|c| {
3262 matches!(
3263 c,
3264 ObservedCall::Put {
3265 source: crate::checkpoint::CheckpointSource::Loop,
3266 step: 0,
3267 }
3268 )
3269 })
3270 };
3271 assert!(has_loop_checkpoint, "expected a Loop checkpoint at step 0");
3272 }
3273
3274 #[tokio::test]
3277 async fn test_superstep_checkpoint_step_increments() {
3278 let state = TestState;
3279
3280 let mut nodes = IndexMap::new();
3281 nodes.insert(
3282 "test_node".to_string(),
3283 NodeFnCommand(
3284 |_s: &TestState| -> std::pin::Pin<
3285 Box<
3286 dyn std::future::Future<
3287 Output = Result<crate::Command<TestState>, crate::JunctureError>,
3288 > + Send,
3289 >,
3290 > { Box::pin(async move { Ok(crate::Command::end()) }) },
3291 )
3292 .into_node("test_node"),
3293 );
3294
3295 let trigger_table = TriggerTable::new();
3296 let mut config = crate::config::RunnableConfig::new();
3297 config.thread_id = Some("test-thread".to_string());
3298
3299 let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3300
3301 let observed = Arc::new(std::sync::Mutex::new(Vec::new()));
3302 let checkpointer = TrackingCheckpointer {
3303 observed: Arc::clone(&observed),
3304 };
3305 loop_.set_checkpointer(Arc::new(checkpointer));
3306
3307 loop_.pending_tasks = vec![PendingTask::pull(
3309 uuid::Uuid::new_v4().to_string(),
3310 "test_node".to_string(),
3311 )];
3312 let _ = loop_.execute_superstep().await;
3313 let _ = loop_.after_tick(SuperstepResult::empty()).await;
3314
3315 loop_.pending_tasks = vec![PendingTask::pull(
3317 uuid::Uuid::new_v4().to_string(),
3318 "test_node".to_string(),
3319 )];
3320 let _ = loop_.execute_superstep().await;
3321 let _ = loop_.after_tick(SuperstepResult::empty()).await;
3322
3323 let loop_steps: Vec<i64> = {
3324 let calls = observed
3325 .lock()
3326 .unwrap_or_else(std::sync::PoisonError::into_inner);
3327 calls
3328 .iter()
3329 .filter_map(|c| match c {
3330 ObservedCall::Put {
3331 source: crate::checkpoint::CheckpointSource::Loop,
3332 step,
3333 } => Some(*step),
3334 ObservedCall::Put { .. } => None,
3335 })
3336 .collect()
3337 };
3338
3339 assert_eq!(
3340 loop_steps,
3341 vec![0, 1],
3342 "expected Loop checkpoints at steps 0 and 1, got: {loop_steps:?}"
3343 );
3344 }
3345
3346 #[tokio::test]
3349 async fn test_superstep_checkpoint_noop_without_checkpointer() {
3350 let state = TestState;
3351
3352 let mut nodes = IndexMap::new();
3353 nodes.insert(
3354 "test_node".to_string(),
3355 NodeFnCommand(
3356 |_s: &TestState| -> std::pin::Pin<
3357 Box<
3358 dyn std::future::Future<
3359 Output = Result<crate::Command<TestState>, crate::JunctureError>,
3360 > + Send,
3361 >,
3362 > { Box::pin(async move { Ok(crate::Command::end()) }) },
3363 )
3364 .into_node("test_node"),
3365 );
3366
3367 let trigger_table = TriggerTable::new();
3368 let config = crate::config::RunnableConfig::new();
3369
3370 let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3371 assert!(
3372 loop_.checkpointer.is_none(),
3373 "no checkpointer should be configured by default"
3374 );
3375
3376 loop_.pending_tasks = vec![PendingTask::pull(
3378 uuid::Uuid::new_v4().to_string(),
3379 "test_node".to_string(),
3380 )];
3381
3382 let result = loop_.execute_superstep().await;
3383 assert!(result.is_ok(), "execute_superstep should succeed");
3384
3385 let after_result = loop_.after_tick(SuperstepResult::empty()).await;
3386 assert!(
3387 after_result.is_ok(),
3388 "after_tick should succeed without checkpointer"
3389 );
3390 }
3391
3392 #[test]
3395 fn test_current_ns_empty_when_no_checkpoint_ns() {
3396 let state = TestState;
3397 let mut nodes = IndexMap::new();
3398 nodes.insert(
3399 "test_node".to_string(),
3400 NodeFnCommand(
3401 |_s: &TestState| -> std::pin::Pin<
3402 Box<
3403 dyn std::future::Future<
3404 Output = Result<crate::Command<TestState>, crate::JunctureError>,
3405 > + Send,
3406 >,
3407 > { Box::pin(async move { Ok(crate::Command::end()) }) },
3408 )
3409 .into_node("test_node"),
3410 );
3411 let trigger_table = TriggerTable::new();
3412 let config = crate::config::RunnableConfig::new();
3413
3414 let loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3415 assert!(
3416 loop_.current_ns().is_empty(),
3417 "root-level graph should have empty ns"
3418 );
3419 }
3420
3421 #[test]
3422 fn test_current_ns_extracts_node_names_from_checkpoint_ns() {
3423 let state = TestState;
3424 let mut nodes = IndexMap::new();
3425 nodes.insert(
3426 "test_node".to_string(),
3427 NodeFnCommand(
3428 |_s: &TestState| -> std::pin::Pin<
3429 Box<
3430 dyn std::future::Future<
3431 Output = Result<crate::Command<TestState>, crate::JunctureError>,
3432 > + Send,
3433 >,
3434 > { Box::pin(async move { Ok(crate::Command::end()) }) },
3435 )
3436 .into_node("test_node"),
3437 );
3438 let trigger_table = TriggerTable::new();
3439 let config = crate::config::RunnableConfig::new().with_checkpoint_ns(
3440 crate::checkpoint::CheckpointNamespace::new(vec![
3441 crate::checkpoint::NamespaceSegment::new(
3442 "review".to_string(),
3443 "uuid-1".to_string(),
3444 ),
3445 crate::checkpoint::NamespaceSegment::new(
3446 "detail".to_string(),
3447 "uuid-2".to_string(),
3448 ),
3449 ]),
3450 );
3451
3452 let loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3453 let ns = loop_.current_ns();
3454 assert_eq!(ns, vec!["review", "detail"]);
3455 }
3456
3457 #[test]
3458 fn test_current_ns_single_segment() {
3459 let state = TestState;
3460 let mut nodes = IndexMap::new();
3461 nodes.insert(
3462 "test_node".to_string(),
3463 NodeFnCommand(
3464 |_s: &TestState| -> std::pin::Pin<
3465 Box<
3466 dyn std::future::Future<
3467 Output = Result<crate::Command<TestState>, crate::JunctureError>,
3468 > + Send,
3469 >,
3470 > { Box::pin(async move { Ok(crate::Command::end()) }) },
3471 )
3472 .into_node("test_node"),
3473 );
3474 let trigger_table = TriggerTable::new();
3475 let config = crate::config::RunnableConfig::new().with_checkpoint_ns(
3476 crate::checkpoint::CheckpointNamespace::new(vec![
3477 crate::checkpoint::NamespaceSegment::new(
3478 "agent".to_string(),
3479 "uuid-single".to_string(),
3480 ),
3481 ]),
3482 );
3483
3484 let loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3485 let ns = loop_.current_ns();
3486 assert_eq!(ns, vec!["agent"]);
3487 }
3488
3489 #[test]
3492 fn test_bubble_up_interrupt_emits_ns_from_checkpoint_ns() {
3493 let state = TestState;
3494 let mut nodes = IndexMap::new();
3495 nodes.insert(
3496 "test_node".to_string(),
3497 NodeFnCommand(
3498 |_s: &TestState| -> std::pin::Pin<
3499 Box<
3500 dyn std::future::Future<
3501 Output = Result<crate::Command<TestState>, crate::JunctureError>,
3502 > + Send,
3503 >,
3504 > { Box::pin(async move { Ok(crate::Command::end()) }) },
3505 )
3506 .into_node("test_node"),
3507 );
3508
3509 let trigger_table = TriggerTable::new();
3510 let checkpoint_ns = crate::checkpoint::CheckpointNamespace::new(vec![
3511 crate::checkpoint::NamespaceSegment::new(
3512 "review".to_string(),
3513 "uuid-parent".to_string(),
3514 ),
3515 ]);
3516 let config = crate::config::RunnableConfig::new().with_checkpoint_ns(checkpoint_ns);
3517
3518 let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3519
3520 let (tx, mut rx) = mpsc::unbounded_channel();
3522 loop_.stream_tx = Some(tx);
3523
3524 let signals = vec![crate::interrupt::InterruptSignal {
3525 index: 0,
3526 id: Some("int-ns-0".to_string()),
3527 payload: serde_json::json!({"node": "child_node"}),
3528 timestamp: Utc::now(),
3529 }];
3530 let bubble_ups = vec![BubbleUp::Interrupt(crate::pregel::types::GraphInterrupt {
3531 interrupts: signals,
3532 step: 1,
3533 namespace: vec!["review".to_string()],
3534 })];
3535
3536 let _ = loop_.handle_bubble_ups(&bubble_ups);
3537
3538 let event = rx
3540 .try_recv()
3541 .expect("should have received an interrupt event");
3542 match event {
3543 StreamEvent::Interrupt { ns, .. } => {
3544 assert_eq!(ns, vec!["review"]);
3545 }
3546 other => panic!("expected Interrupt event, got {other:?}"),
3547 }
3548 }
3549
3550 #[test]
3555 fn test_hidden_node_filtered_from_bubble_up_interrupt_stream() {
3556 let state = TestState;
3557 let mut nodes = IndexMap::new();
3558 nodes.insert(
3559 "test_node".to_string(),
3560 NodeFnCommand(
3561 |_s: &TestState| -> std::pin::Pin<
3562 Box<
3563 dyn std::future::Future<
3564 Output = Result<crate::Command<TestState>, crate::JunctureError>,
3565 > + Send,
3566 >,
3567 > { Box::pin(async move { Ok(crate::Command::end()) }) },
3568 )
3569 .into_node("test_node"),
3570 );
3571
3572 let trigger_table = TriggerTable::new();
3573 let config = crate::config::RunnableConfig::new();
3574
3575 let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3576
3577 let (tx, mut rx) = mpsc::unbounded_channel();
3578 loop_.stream_tx = Some(tx);
3579
3580 let signals = vec![
3582 crate::interrupt::InterruptSignal {
3583 index: 0,
3584 id: Some("int-visible".to_string()),
3585 payload: serde_json::json!({"node": "agent"}),
3586 timestamp: Utc::now(),
3587 },
3588 crate::interrupt::InterruptSignal {
3589 index: 1,
3590 id: Some("int-hidden".to_string()),
3591 payload: serde_json::json!({"node": "__route__"}),
3592 timestamp: Utc::now(),
3593 },
3594 crate::interrupt::InterruptSignal {
3595 index: 2,
3596 id: Some("int-also-visible".to_string()),
3597 payload: serde_json::json!({"node": "review"}),
3598 timestamp: Utc::now(),
3599 },
3600 ];
3601 let bubble_ups = vec![BubbleUp::Interrupt(crate::pregel::types::GraphInterrupt {
3602 interrupts: signals,
3603 step: 1,
3604 namespace: vec![],
3605 })];
3606
3607 let _ = loop_.handle_bubble_ups(&bubble_ups);
3608
3609 let mut received_nodes = Vec::new();
3611 while let Ok(event) = rx.try_recv() {
3612 match event {
3613 StreamEvent::Interrupt { node, .. } => received_nodes.push(node),
3614 other => panic!("unexpected event: {other:?}"),
3615 }
3616 }
3617 assert_eq!(
3618 received_nodes,
3619 vec!["agent", "review"],
3620 "hidden node __route__ should be filtered from stream"
3621 );
3622 }
3623
3624 #[test]
3626 fn test_all_hidden_nodes_produce_no_stream_events() {
3627 let state = TestState;
3628 let mut nodes = IndexMap::new();
3629 nodes.insert(
3630 "test_node".to_string(),
3631 NodeFnCommand(
3632 |_s: &TestState| -> std::pin::Pin<
3633 Box<
3634 dyn std::future::Future<
3635 Output = Result<crate::Command<TestState>, crate::JunctureError>,
3636 > + Send,
3637 >,
3638 > { Box::pin(async move { Ok(crate::Command::end()) }) },
3639 )
3640 .into_node("test_node"),
3641 );
3642
3643 let trigger_table = TriggerTable::new();
3644 let config = crate::config::RunnableConfig::new();
3645
3646 let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3647
3648 let (tx, mut rx) = mpsc::unbounded_channel();
3649 loop_.stream_tx = Some(tx);
3650
3651 let signals = vec![
3652 crate::interrupt::InterruptSignal {
3653 index: 0,
3654 id: Some("int-h1".to_string()),
3655 payload: serde_json::json!({"node": "__route__"}),
3656 timestamp: Utc::now(),
3657 },
3658 crate::interrupt::InterruptSignal {
3659 index: 1,
3660 id: Some("int-h2".to_string()),
3661 payload: serde_json::json!({"node": "__handler__"}),
3662 timestamp: Utc::now(),
3663 },
3664 ];
3665 let bubble_ups = vec![BubbleUp::Interrupt(crate::pregel::types::GraphInterrupt {
3666 interrupts: signals,
3667 step: 1,
3668 namespace: vec![],
3669 })];
3670
3671 let _ = loop_.handle_bubble_ups(&bubble_ups);
3672
3673 assert!(
3675 rx.try_recv().is_err(),
3676 "all-hidden signals should produce no stream events"
3677 );
3678 assert_eq!(loop_.pending_interrupts.len(), 2);
3680 }
3681
3682 #[test]
3687 fn test_effective_durability_defaults_to_sync() {
3688 let state = TestState;
3689 let mut nodes = IndexMap::new();
3690 nodes.insert(
3691 "test_node".to_string(),
3692 NodeFnCommand(
3693 |_s: &TestState| -> std::pin::Pin<
3694 Box<
3695 dyn std::future::Future<
3696 Output = Result<crate::Command<TestState>, crate::JunctureError>,
3697 > + Send,
3698 >,
3699 > { Box::pin(async move { Ok(crate::Command::end()) }) },
3700 )
3701 .into_node("test_node"),
3702 );
3703 let trigger_table = TriggerTable::new();
3704 let config = crate::config::RunnableConfig::new();
3705
3706 let loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3707 assert_eq!(
3708 loop_.effective_durability(),
3709 Durability::Sync,
3710 "default durability should be Sync"
3711 );
3712 }
3713
3714 #[tokio::test]
3717 async fn test_durability_exit_skips_superstep_saves_final() {
3718 let state = TestState;
3719
3720 let mut nodes = IndexMap::new();
3721 nodes.insert(
3722 "test_node".to_string(),
3723 NodeFnCommand(
3724 |_s: &TestState| -> std::pin::Pin<
3725 Box<
3726 dyn std::future::Future<
3727 Output = Result<crate::Command<TestState>, crate::JunctureError>,
3728 > + Send,
3729 >,
3730 > { Box::pin(async move { Ok(crate::Command::end()) }) },
3731 )
3732 .into_node("test_node"),
3733 );
3734
3735 let trigger_table = TriggerTable::new();
3736 let mut config = crate::config::RunnableConfig::new();
3737 config.thread_id = Some("test-thread".to_string());
3738 config.durability = Some(Durability::Exit);
3739
3740 let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3741
3742 let observed = Arc::new(std::sync::Mutex::new(Vec::new()));
3743 let checkpointer = TrackingCheckpointer {
3744 observed: Arc::clone(&observed),
3745 };
3746 loop_.set_checkpointer(Arc::new(checkpointer));
3747
3748 loop_.pending_tasks = vec![PendingTask::pull(
3752 uuid::Uuid::new_v4().to_string(),
3753 "test_node".to_string(),
3754 )];
3755 let _ = loop_.execute_superstep().await;
3756 let _ = loop_.after_tick(SuperstepResult::empty()).await;
3757
3758 let calls = observed
3761 .lock()
3762 .unwrap_or_else(std::sync::PoisonError::into_inner)
3763 .clone();
3764 assert_eq!(
3765 calls.len(),
3766 1,
3767 "Exit mode should save exactly one final checkpoint"
3768 );
3769 assert!(
3770 matches!(
3771 &calls[0],
3772 ObservedCall::Put {
3773 source: crate::checkpoint::CheckpointSource::Loop,
3774 step: 0
3775 }
3776 ),
3777 "Final exit checkpoint should have Loop source at step 0"
3778 );
3779 }
3780
3781 #[tokio::test]
3783 async fn test_durability_sync_saves_superstep_checkpoint() {
3784 let state = TestState;
3785
3786 let mut nodes = IndexMap::new();
3787 nodes.insert(
3788 "test_node".to_string(),
3789 NodeFnCommand(
3790 |_s: &TestState| -> std::pin::Pin<
3791 Box<
3792 dyn std::future::Future<
3793 Output = Result<crate::Command<TestState>, crate::JunctureError>,
3794 > + Send,
3795 >,
3796 > { Box::pin(async move { Ok(crate::Command::end()) }) },
3797 )
3798 .into_node("test_node"),
3799 );
3800
3801 let trigger_table = TriggerTable::new();
3802 let mut config = crate::config::RunnableConfig::new();
3803 config.thread_id = Some("test-thread".to_string());
3804 config.durability = Some(Durability::Sync);
3805
3806 let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3807
3808 let observed = Arc::new(std::sync::Mutex::new(Vec::new()));
3809 let checkpointer = TrackingCheckpointer {
3810 observed: Arc::clone(&observed),
3811 };
3812 loop_.set_checkpointer(Arc::new(checkpointer));
3813
3814 loop_.pending_tasks = vec![PendingTask::pull(
3816 uuid::Uuid::new_v4().to_string(),
3817 "test_node".to_string(),
3818 )];
3819 let _ = loop_.execute_superstep().await;
3820 let _ = loop_.after_tick(SuperstepResult::empty()).await;
3821
3822 let has_loop_checkpoint = {
3823 let calls = observed
3824 .lock()
3825 .unwrap_or_else(std::sync::PoisonError::into_inner);
3826 calls.iter().any(|c| {
3827 matches!(
3828 c,
3829 ObservedCall::Put {
3830 source: crate::checkpoint::CheckpointSource::Loop,
3831 step: 0,
3832 }
3833 )
3834 })
3835 };
3836 assert!(
3837 has_loop_checkpoint,
3838 "Sync mode should save a Loop checkpoint at step 0"
3839 );
3840 }
3841
3842 #[tokio::test]
3844 async fn test_durability_exit_saves_interrupt_checkpoint() {
3845 let state = TestState;
3846
3847 let mut nodes = IndexMap::new();
3848 nodes.insert(
3849 "test_node".to_string(),
3850 NodeFnCommand(
3851 |_s: &TestState| -> std::pin::Pin<
3852 Box<
3853 dyn std::future::Future<
3854 Output = Result<crate::Command<TestState>, crate::JunctureError>,
3855 > + Send,
3856 >,
3857 > { Box::pin(async move { Ok(crate::Command::end()) }) },
3858 )
3859 .into_node("test_node"),
3860 );
3861
3862 let trigger_table = TriggerTable::new();
3863 let mut config = crate::config::RunnableConfig::new();
3864 config.thread_id = Some("test-thread".to_string());
3865 config.durability = Some(Durability::Exit);
3866
3867 let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3868
3869 let observed = Arc::new(std::sync::Mutex::new(Vec::new()));
3870 let checkpointer = TrackingCheckpointer {
3871 observed: Arc::clone(&observed),
3872 };
3873 loop_.set_checkpointer(Arc::new(checkpointer));
3874
3875 loop_.pending_interrupts = vec![crate::interrupt::InterruptSignal {
3877 index: 0,
3878 id: Some("int-exit-test".to_string()),
3879 payload: serde_json::json!({"node": "test_node"}),
3880 timestamp: Utc::now(),
3881 }];
3882 loop_.save_interrupt_checkpoint("test_node").await;
3883
3884 let has_interrupt_checkpoint = {
3885 let calls = observed
3886 .lock()
3887 .unwrap_or_else(std::sync::PoisonError::into_inner);
3888 calls.iter().any(|c| {
3889 matches!(
3890 c,
3891 ObservedCall::Put {
3892 source: crate::checkpoint::CheckpointSource::Interrupt { .. },
3893 step: 0,
3894 }
3895 )
3896 })
3897 };
3898 assert!(
3899 has_interrupt_checkpoint,
3900 "Exit mode should still save interrupt checkpoints"
3901 );
3902 }
3903
3904 #[tokio::test]
3910 async fn test_budget_tracker_arc_sharing() {
3911 let state = TestState;
3912 let mut nodes = IndexMap::new();
3913 nodes.insert(
3914 "test_node".to_string(),
3915 NodeFnCommand(
3916 |_s: &TestState| -> std::pin::Pin<
3917 Box<
3918 dyn std::future::Future<
3919 Output = Result<crate::Command<TestState>, crate::JunctureError>,
3920 > + Send,
3921 >,
3922 > { Box::pin(async move { Ok(crate::Command::end()) }) },
3923 )
3924 .into_node("test_node"),
3925 );
3926
3927 let trigger_table = TriggerTable::new();
3928 let budget = crate::pregel::budget::BudgetConfig::new().with_max_tokens(100);
3929 let config = crate::config::RunnableConfig::new().with_budget(budget);
3930
3931 let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3932
3933 let tracker_config = loop_.runnable_config.budget.clone().unwrap();
3935 loop_.set_budget_tracker(BudgetTracker::new(tracker_config));
3936
3937 assert!(loop_.budget_tracker.as_ref().unwrap().check().is_none());
3939
3940 if let Some(ref tracker) = loop_.runnable_config.budget_tracker {
3942 tracker.report_model_call(30, 20); }
3944
3945 let usage = loop_.budget_tracker.as_ref().unwrap().current_usage();
3947 assert_eq!(usage.tokens_used, 50);
3948
3949 assert!(loop_.budget_tracker.as_ref().unwrap().check().is_none());
3951
3952 if let Some(ref tracker) = loop_.runnable_config.budget_tracker {
3954 tracker.report_model_call(40, 30); }
3956
3957 assert!(loop_.budget_tracker.as_ref().unwrap().check().is_some());
3959 assert_eq!(
3960 loop_
3961 .budget_tracker
3962 .as_ref()
3963 .unwrap()
3964 .current_usage()
3965 .tokens_used,
3966 120
3967 );
3968
3969 let _ = loop_.tick().unwrap_err();
3971 assert!(loop_.status.is_terminal());
3972 }
3973
3974 #[tokio::test]
3978 async fn test_budget_tracker_cost_via_config() {
3979 let state = TestState;
3980 let mut nodes = IndexMap::new();
3981 nodes.insert(
3982 "test_node".to_string(),
3983 NodeFnCommand(
3984 |_s: &TestState| -> std::pin::Pin<
3985 Box<
3986 dyn std::future::Future<
3987 Output = Result<crate::Command<TestState>, crate::JunctureError>,
3988 > + Send,
3989 >,
3990 > { Box::pin(async move { Ok(crate::Command::end()) }) },
3991 )
3992 .into_node("test_node"),
3993 );
3994
3995 let trigger_table = TriggerTable::new();
3996 let budget = crate::pregel::budget::BudgetConfig::new().with_max_cost_usd(0.01);
3997 let config = crate::config::RunnableConfig::new().with_budget(budget);
3998
3999 let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
4000
4001 let tracker_config = loop_.runnable_config.budget.clone().unwrap();
4002 loop_.set_budget_tracker(BudgetTracker::new(tracker_config));
4003
4004 if let Some(ref tracker) = loop_.runnable_config.budget_tracker {
4006 tracker.report_cost(0.003);
4007 tracker.report_cost(0.004);
4008 }
4009
4010 let usage = loop_.budget_tracker.as_ref().unwrap().current_usage();
4012 assert!((usage.cost_usd - 0.007).abs() < 0.0001);
4013 assert!(loop_.budget_tracker.as_ref().unwrap().check().is_none());
4014
4015 if let Some(ref tracker) = loop_.runnable_config.budget_tracker {
4017 tracker.report_cost(0.004); }
4019
4020 assert!(loop_.budget_tracker.as_ref().unwrap().check().is_some());
4021
4022 let _ = loop_.tick().unwrap_err();
4024 assert!(loop_.status.is_terminal());
4025 }
4026
4027 #[tokio::test]
4029 async fn test_durability_async_does_not_block() {
4030 let state = TestState;
4031
4032 let mut nodes = IndexMap::new();
4033 nodes.insert(
4034 "test_node".to_string(),
4035 NodeFnCommand(
4036 |_s: &TestState| -> std::pin::Pin<
4037 Box<
4038 dyn std::future::Future<
4039 Output = Result<crate::Command<TestState>, crate::JunctureError>,
4040 > + Send,
4041 >,
4042 > { Box::pin(async move { Ok(crate::Command::end()) }) },
4043 )
4044 .into_node("test_node"),
4045 );
4046
4047 let trigger_table = TriggerTable::new();
4048 let mut config = crate::config::RunnableConfig::new();
4049 config.thread_id = Some("test-thread".to_string());
4050 config.durability = Some(Durability::Async);
4051
4052 let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
4053
4054 let observed = Arc::new(std::sync::Mutex::new(Vec::new()));
4055 let checkpointer = TrackingCheckpointer {
4056 observed: Arc::clone(&observed),
4057 };
4058 loop_.set_checkpointer(Arc::new(checkpointer));
4059
4060 loop_.pending_tasks = vec![PendingTask::pull(
4062 uuid::Uuid::new_v4().to_string(),
4063 "test_node".to_string(),
4064 )];
4065 let _ = loop_.execute_superstep().await;
4066 let _ = loop_.after_tick(SuperstepResult::empty()).await;
4067
4068 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
4071
4072 let has_checkpoint = {
4074 let calls = observed
4075 .lock()
4076 .unwrap_or_else(std::sync::PoisonError::into_inner);
4077 calls.iter().any(|c| {
4078 matches!(
4079 c,
4080 ObservedCall::Put {
4081 source: crate::checkpoint::CheckpointSource::Loop,
4082 step: 0,
4083 }
4084 )
4085 })
4086 };
4087 assert!(
4088 has_checkpoint,
4089 "Async mode should eventually persist the checkpoint via spawned task"
4090 );
4091 }
4092
4093 #[tokio::test]
4098 async fn test_stream_data_emits_custom_events() {
4099 let state = TestState;
4100 let nodes = IndexMap::new();
4101 let trigger_table = TriggerTable::new();
4102 let config = crate::config::RunnableConfig::new();
4103
4104 let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
4105
4106 let (tx, mut rx) = mpsc::unbounded_channel();
4107 loop_.stream_tx = Some(tx);
4108
4109 let result = SuperstepResult {
4111 task_outputs: vec![TaskOutput {
4112 triggered_fields: vec![],
4113 task_id: "task-1".to_string(),
4114 node_name: "test_node".to_string(),
4115 command: Command::end()
4116 .with_stream_data(serde_json::json!({"event": "first"}))
4117 .with_stream_data(serde_json::json!({"event": "second"})),
4118 duration: std::time::Duration::from_millis(1),
4119 trigger: TaskTrigger::Pull,
4120 error: None,
4121 }],
4122 bubble_ups: Vec::new(),
4123 };
4124
4125 let () = loop_.after_tick(result).await.unwrap();
4126
4127 let mut custom_data = Vec::new();
4129 while let Ok(event) = rx.try_recv() {
4130 if let StreamEvent::Custom { node, data, ns } = event {
4131 assert_eq!(node, "test_node");
4132 assert!(ns.is_empty());
4133 custom_data.push(data);
4134 }
4135 }
4136
4137 assert_eq!(custom_data.len(), 2, "should emit two custom events");
4138 assert_eq!(custom_data[0], serde_json::json!({"event": "first"}));
4139 assert_eq!(custom_data[1], serde_json::json!({"event": "second"}));
4140 }
4141
4142 #[tokio::test]
4144 async fn test_stream_data_empty_produces_no_custom_events() {
4145 let state = TestState;
4146 let nodes = IndexMap::new();
4147 let trigger_table = TriggerTable::new();
4148 let config = crate::config::RunnableConfig::new();
4149
4150 let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
4151
4152 let (tx, mut rx) = mpsc::unbounded_channel();
4153 loop_.stream_tx = Some(tx);
4154
4155 let result = SuperstepResult {
4157 task_outputs: vec![TaskOutput {
4158 triggered_fields: vec![],
4159 task_id: "task-1".to_string(),
4160 node_name: "test_node".to_string(),
4161 command: Command::end(),
4162 duration: std::time::Duration::from_millis(1),
4163 trigger: TaskTrigger::Pull,
4164 error: None,
4165 }],
4166 bubble_ups: Vec::new(),
4167 };
4168
4169 let () = loop_.after_tick(result).await.unwrap();
4170
4171 while let Ok(event) = rx.try_recv() {
4173 assert!(
4174 !matches!(event, StreamEvent::Custom { .. }),
4175 "no Custom events expected for empty stream_data"
4176 );
4177 }
4178 }
4179
4180 #[tokio::test]
4182 async fn test_stream_data_multiple_tasks() {
4183 let state = TestState;
4184 let nodes = IndexMap::new();
4185 let trigger_table = TriggerTable::new();
4186 let config = crate::config::RunnableConfig::new();
4187
4188 let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
4189
4190 let (tx, mut rx) = mpsc::unbounded_channel();
4191 loop_.stream_tx = Some(tx);
4192
4193 let result = SuperstepResult {
4195 task_outputs: vec![
4196 TaskOutput {
4197 triggered_fields: vec![],
4198 task_id: "task-1".to_string(),
4199 node_name: "node_a".to_string(),
4200 command: Command::end().with_stream_data(serde_json::json!("from_a")),
4201 duration: std::time::Duration::from_millis(1),
4202 trigger: TaskTrigger::Pull,
4203 error: None,
4204 },
4205 TaskOutput {
4206 triggered_fields: vec![],
4207 task_id: "task-2".to_string(),
4208 node_name: "node_b".to_string(),
4209 command: Command::end(),
4210 duration: std::time::Duration::from_millis(2),
4211 trigger: TaskTrigger::Pull,
4212 error: None,
4213 },
4214 ],
4215 bubble_ups: Vec::new(),
4216 };
4217
4218 let () = loop_.after_tick(result).await.unwrap();
4219
4220 let mut custom_events = Vec::new();
4222 while let Ok(event) = rx.try_recv() {
4223 if let StreamEvent::Custom { node, data, .. } = event {
4224 custom_events.push((node, data));
4225 }
4226 }
4227
4228 assert_eq!(
4229 custom_events.len(),
4230 1,
4231 "only node_a should emit a custom event"
4232 );
4233 assert_eq!(custom_events[0].0, "node_a");
4234 assert_eq!(custom_events[0].1, serde_json::json!("from_a"));
4235 }
4236}
4237
4238