Skip to main content

juncture_core/pregel/
loop_.rs

1//! Main Pregel execution loop
2//!
3//! This module provides the `PregelLoop` struct that orchestrates graph execution
4//! using the Pregel algorithm with version tracking and task scheduling.
5
6use crate::time::Instant;
7use crate::{
8    JunctureError, Node, State,
9    checkpoint::{
10        Checkpoint, CheckpointMetadata, CheckpointSource, DeltaCounters, generate_checkpoint_id,
11    },
12    edge::TriggerTable,
13    interrupt::should_interrupt,
14    pregel::{
15        budget::BudgetTracker,
16        context::ExecutionContext,
17        durability::Durability,
18        runner::execute_superstep,
19        scheduler::{
20            FieldVersionTracker, VersionsSeen, apply_writes, compute_next_tasks,
21            schedule_error_handlers,
22        },
23        types::{BubbleUp, LoopStatus, PendingTask, SuperstepResult},
24    },
25    state::FieldsChanged,
26    stream::{DebugEvent, StreamEvent},
27};
28use indexmap::IndexMap;
29use std::collections::{HashMap, HashSet};
30use std::sync::Arc;
31use std::sync::atomic::{AtomicBool, Ordering};
32use tokio::sync::mpsc;
33use tokio_util::sync::CancellationToken;
34
/// Cooperative "drain" switch for a Pregel run.
///
/// A `RunControl` wraps a single shared boolean. Cloning it yields another
/// handle to the *same* flag, so one clone can be handed to external code that
/// decides when to stop while the loop keeps its own copy and polls it.
/// `PregelLoop::tick()` consults the flag when it finds no pending tasks.
///
/// # Examples
///
/// ```ignore
/// use juncture_core::pregel::loop_::RunControl;
///
/// let run_control = RunControl::new();
///
/// // Hand a clone to another thread that decides when to drain.
/// let remote = run_control.clone();
/// std::thread::spawn(move || {
///     remote.request_drain();
/// });
///
/// // Poll from the owning side.
/// if run_control.is_drain_requested() {
///     // Finish current tasks but don't start new ones
/// }
/// ```
#[derive(Clone, Debug, Default)]
pub struct RunControl {
    /// Shared flag; `false` until [`RunControl::request_drain`] is called on any clone.
    drain_requested: Arc<AtomicBool>,
}

impl RunControl {
    /// Build a control handle whose drain flag starts out cleared.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use juncture_core::pregel::loop_::RunControl;
    ///
    /// let run_control = RunControl::new();
    /// assert!(!run_control.is_drain_requested());
    /// ```
    #[must_use]
    pub fn new() -> Self {
        // `AtomicBool::default()` is `false`, so the derived `Default` is the
        // cleared state.
        Self::default()
    }

    /// Raise the drain flag for this handle and every clone of it.
    ///
    /// Takes `&self` and performs an atomic store, so it may be called from
    /// any thread holding a clone. The flag is never cleared again by this type.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use juncture_core::pregel::loop_::RunControl;
    ///
    /// let run_control = RunControl::new();
    /// run_control.request_drain();
    /// assert!(run_control.is_drain_requested());
    /// ```
    pub fn request_drain(&self) {
        let flag: &AtomicBool = &self.drain_requested;
        flag.store(true, Ordering::Release);
    }

    /// Report whether any handle has requested a drain.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use juncture_core::pregel::loop_::RunControl;
    ///
    /// let run_control = RunControl::new();
    /// assert!(!run_control.is_drain_requested());
    /// ```
    #[must_use]
    pub fn is_drain_requested(&self) -> bool {
        let flag: &AtomicBool = &self.drain_requested;
        flag.load(Ordering::Acquire)
    }
}
120
/// Main Pregel execution loop
///
/// Orchestrates graph execution using the Pregel algorithm, managing
/// task scheduling, version tracking, and execution state.
///
/// A run is driven by calling [`Self::tick`], [`Self::execute_superstep`] and
/// [`Self::after_tick`] in a loop until `tick` returns `Ok(false)` or an error.
pub struct PregelLoop<S: State> {
    /// Current execution state
    ///
    /// Temporarily moved out (and replaced by `S::default()`) while
    /// [`Self::execute_superstep`] is running.
    pub state: S,

    /// Graph nodes, keyed by node name
    pub nodes: IndexMap<String, Arc<dyn Node<S>>>,

    /// Trigger table for routing
    pub trigger_table: TriggerTable<S>,

    /// Field version tracker
    pub field_versions: FieldVersionTracker,

    /// Versions seen by each node
    pub versions_seen: VersionsSeen,

    /// Execution configuration
    pub runnable_config: crate::config::RunnableConfig,

    /// Cancellation token
    ///
    /// A fresh token is created by the constructor; [`Self::tick`] stops the
    /// loop once it is cancelled.
    pub cancellation_token: CancellationToken,

    /// Optional stream event sender
    ///
    /// `None` until [`Self::set_stream_sender`] is called.
    pub stream_tx: Option<mpsc::UnboundedSender<StreamEvent<S>>>,

    /// Optional checkpoint saver for crash recovery
    ///
    /// `None` until [`Self::set_checkpointer`] is called.
    pub checkpointer: Option<Arc<dyn crate::checkpoint::CheckpointSaver>>,

    /// Current step number (starts at 0)
    pub step: usize,

    /// Loop status (starts as [`LoopStatus::Running`])
    pub status: LoopStatus,

    /// Pending tasks for next superstep
    pub pending_tasks: Vec<PendingTask<S>>,

    /// Fields that were changed in the previous superstep and triggered the current one
    ///
    /// This tracks which fields should be consumed after each superstep completes.
    /// The fields are stored from the previous superstep's `apply_writes` result and
    /// consumed in the current superstep's `after_tick`.
    previous_superstep_changed_fields: FieldsChanged,

    /// Optional budget tracker (shared with `RunnableConfig` via Arc)
    ///
    /// `None` until [`Self::set_budget_tracker`] is called.
    budget_tracker: Option<Arc<BudgetTracker>>,

    /// Run control for graceful shutdown
    run_control: RunControl,

    /// Unique ID for this graph execution (a UUID v4 string generated at construction)
    run_id: String,

    /// Interrupt signal receiver from the last superstep
    /// This is stored here so `after_tick` can drain it
    interrupt_rx: Option<mpsc::UnboundedReceiver<crate::interrupt::InterruptSignal>>,

    /// Interrupt signals captured during execution for checkpoint persistence
    pending_interrupts: Vec<crate::interrupt::InterruptSignal>,

    /// Scratchpad for tracking processed interrupts and transient data
    scratchpad: crate::interrupt::Scratchpad,

    /// Channel versions snapshot at the time of the last interrupt.
    /// Used by `should_interrupt` to prevent infinite interrupt loops when no
    /// state actually changed between interrupts.
    interrupt_versions_seen: HashMap<String, u64>,

    /// Superstep start time for duration tracking
    ///
    /// Set at the beginning of [`Self::execute_superstep`], read in [`Self::after_tick`].
    superstep_start: Option<Instant>,

    /// Maps node names to their registered error handler node names.
    ///
    /// Extracted from builder metadata during `PregelLoop` construction. When a
    /// task fails and its node has a handler in this map, the engine creates
    /// a recovery task targeting the handler instead of canceling all tasks.
    error_handler_map: HashMap<String, String>,

    /// Cached reverse mapping from trigger source to target nodes.
    ///
    /// Built once at construction time since the graph topology never changes
    /// during execution. Previously rebuilt every superstep causing O(N^2) total
    /// work for N-node sequential chains.
    trigger_to_nodes: crate::pregel::scheduler::TriggerToNodes,

    /// Per-node retry policies extracted from builder metadata.
    ///
    /// When a node has an entry here, its execution in `execute_superstep` is
    /// wrapped with [`crate::graph::builder::execute_with_retry`] for automatic
    /// retries with exponential backoff and jitter.
    ///
    /// Empty until [`Self::set_retry_policies`] is called.
    retry_policies: HashMap<String, crate::graph::RetryPolicy>,

    /// Per-node timeout policies extracted from builder metadata.
    ///
    /// When a node has an entry here, its execution in `execute_superstep` is
    /// wrapped with `tokio::time::timeout` using the configured `run_timeout`.
    /// The timeout wraps the entire execution (including retry attempts when a
    /// retry policy is also configured).
    ///
    /// Empty until [`Self::set_timeout_policies`] is called.
    timeout_policies: HashMap<String, crate::pregel::context::TimeoutPolicy>,

    /// Per-channel delta counters tracking updates and supersteps since last full snapshot.
    ///
    /// Keys are channel names (e.g. `"field_0"`), values track cumulative write counts.
    /// Populated from [`FieldsChanged`] after each superstep and reset when a full
    /// snapshot checkpoint is saved.
    delta_counters: HashMap<String, DeltaCounters>,

    /// Tracks whether [`finish_all_channels`](Self::finish_all_channels) has been called.
    ///
    /// This flag ensures `finish_all_channels()` is only called once per execution,
    /// even when there are multiple termination paths (interrupt, cancellation, budget,
    /// recursion limit, etc.). Calling it multiple times would be redundant and could
    /// cause unexpected behavior with `LastValueAfterFinishChannel` semantics.
    ///
    /// Set to `true` after the first call to `finish_all_channels()`, preventing
    /// subsequent calls on other termination paths.
    channels_finished: bool,
}
245
246impl<S: State> std::fmt::Debug for PregelLoop<S> {
247    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
248        f.debug_struct("PregelLoop")
249            .field("state", &"<state>")
250            .field("nodes", &self.nodes.len())
251            .field("trigger_table", &self.trigger_table)
252            .field("field_versions", &self.field_versions)
253            .field("versions_seen", &self.versions_seen)
254            .field("runnable_config", &self.runnable_config)
255            .field("cancellation_token", &self.cancellation_token)
256            .field("stream_tx", &self.stream_tx.is_some())
257            .field("checkpointer", &self.checkpointer.is_some())
258            .field("step", &self.step)
259            .field("status", &self.status)
260            .field("pending_tasks", &self.pending_tasks)
261            .field(
262                "previous_superstep_changed_fields",
263                &self.previous_superstep_changed_fields,
264            )
265            .field("budget_tracker", &self.budget_tracker.is_some())
266            .field("run_control", &self.run_control)
267            .field("run_id", &self.run_id)
268            .field("interrupt_rx", &self.interrupt_rx.is_some())
269            .field("pending_interrupts", &self.pending_interrupts.len())
270            .field("scratchpad", &self.scratchpad)
271            .field("interrupt_versions_seen", &self.interrupt_versions_seen)
272            .field("superstep_start", &self.superstep_start.is_some())
273            .field("error_handler_map", &self.error_handler_map.len())
274            .field("trigger_to_nodes", &"<cached>")
275            .field(
276                "retry_policies",
277                &self.retry_policies.keys().collect::<Vec<_>>(),
278            )
279            .field(
280                "timeout_policies",
281                &self.timeout_policies.keys().collect::<Vec<_>>(),
282            )
283            .field("delta_counters", &self.delta_counters.len())
284            .field("channels_finished", &self.channels_finished)
285            .finish()
286    }
287}
288
289impl<S: State> PregelLoop<S> {
290    /// Create a new Pregel loop
291    ///
292    /// # Arguments
293    ///
294    /// * `state` - Initial state
295    /// * `nodes` - Graph nodes
296    /// * `trigger_table` - Trigger table for routing
297    /// * `config` - Execution configuration
298    /// * `num_fields` - Number of fields in the state
299    ///
300    /// # Errors
301    ///
302    /// Returns an error if:
303    /// - The trigger table is invalid
304    ///
305    /// # Examples
306    ///
307    /// ```ignore
308    /// use juncture_core::pregel::loop_::PregelLoop;
309    ///
310    /// let loop = PregelLoop::new(
311    ///     initial_state,
312    ///     nodes,
313    ///     trigger_table,
314    ///     config,
315    ///     5, // number of fields
316    /// )?;
317    /// ```
318    pub fn new(
319        state: S,
320        nodes: IndexMap<String, Arc<dyn Node<S>>>,
321        trigger_table: TriggerTable<S>,
322        config: crate::config::RunnableConfig,
323        num_fields: usize,
324    ) -> Result<Self, JunctureError> {
325        Self::with_error_handlers(
326            state,
327            nodes,
328            trigger_table,
329            config,
330            num_fields,
331            HashMap::new(),
332        )
333    }
334
335    /// Create a new Pregel loop with error handler mappings
336    ///
337    /// Like [`new`](Self::new) but accepts a pre-built error handler map
338    /// extracted from builder metadata. Nodes with entries in this map
339    /// will have their failures routed to the named handler instead of
340    /// canceling the entire superstep.
341    ///
342    /// # Arguments
343    ///
344    /// * `state` - Initial state
345    /// * `nodes` - Graph nodes
346    /// * `trigger_table` - Trigger table for routing
347    /// * `config` - Execution configuration
348    /// * `num_fields` - Number of fields in the state
349    /// * `error_handler_map` - Maps node names to error handler node names
350    ///
351    /// # Errors
352    ///
353    /// Returns an error if the trigger table is invalid.
354    pub fn with_error_handlers(
355        state: S,
356        nodes: IndexMap<String, Arc<dyn Node<S>>>,
357        trigger_table: TriggerTable<S>,
358        config: crate::config::RunnableConfig,
359        num_fields: usize,
360        error_handler_map: HashMap<String, String>,
361    ) -> Result<Self, JunctureError> {
362        let node_names: Vec<String> = nodes.keys().cloned().collect();
363        let field_versions = FieldVersionTracker::new(num_fields);
364        let versions_seen = VersionsSeen::new(&node_names, num_fields);
365        let cancellation_token = CancellationToken::new();
366
367        // Initialize pending tasks from entry point
368        let pending_tasks = Self::compute_initial_tasks(&trigger_table);
369
370        // Build reverse trigger mapping once; topology never changes during execution
371        let trigger_to_nodes =
372            crate::pregel::scheduler::TriggerToNodes::from_trigger_table(&trigger_table);
373
374        // Generate unique run ID for this execution
375        let run_id = uuid::Uuid::new_v4().to_string();
376
377        Ok(Self {
378            state,
379            nodes,
380            trigger_table,
381            field_versions,
382            versions_seen,
383            runnable_config: config,
384            cancellation_token,
385            stream_tx: None,
386            checkpointer: None,
387            step: 0,
388            status: LoopStatus::Running,
389            pending_tasks,
390            previous_superstep_changed_fields: FieldsChanged(0),
391            budget_tracker: None,
392            run_control: RunControl::new(),
393            run_id,
394            interrupt_rx: None,
395            pending_interrupts: Vec::new(),
396            scratchpad: crate::interrupt::Scratchpad::new(),
397            interrupt_versions_seen: HashMap::new(),
398            superstep_start: None,
399            error_handler_map,
400            trigger_to_nodes,
401            retry_policies: HashMap::new(),
402            timeout_policies: HashMap::new(),
403            delta_counters: HashMap::new(),
404            channels_finished: false,
405        })
406    }
407
408    /// Set the stream event sender
409    ///
410    /// # Examples
411    ///
412    /// ```ignore
413    /// use tokio::sync::mpsc;
414    ///
415    /// let (tx, _rx) = mpsc::unbounded_channel();
416    /// loop.set_stream_sender(tx);
417    /// ```
418    pub fn set_stream_sender(&mut self, tx: mpsc::UnboundedSender<StreamEvent<S>>) {
419        self.stream_tx = Some(tx);
420    }
421
422    /// Set the checkpoint saver for crash recovery during supersteps
423    pub fn set_checkpointer(&mut self, saver: Arc<dyn crate::checkpoint::CheckpointSaver>) {
424        self.checkpointer = Some(saver);
425    }
426
427    /// Set the budget tracker
428    ///
429    /// Wraps the tracker in an `Arc` so it can be shared between the
430    /// `PregelLoop` (for budget checking) and the `RunnableConfig` (for
431    /// node-level token reporting). Both share the same underlying
432    /// counters via atomic operations.
433    ///
434    /// # Examples
435    ///
436    /// ```ignore
437    /// use juncture_core::pregel::budget::BudgetTracker;
438    ///
439    /// let budget = BudgetTracker::new(BudgetConfig::new());
440    /// loop.set_budget_tracker(budget);
441    /// ```
442    pub fn set_budget_tracker(&mut self, tracker: BudgetTracker) {
443        let shared = Arc::new(tracker);
444        self.runnable_config.budget_tracker = Some(Arc::clone(&shared));
445        self.budget_tracker = Some(shared);
446    }
447
    /// Set per-node retry policies
    ///
    /// Each entry maps a node name to its [`RetryPolicy`](crate::graph::RetryPolicy).
    /// During superstep execution, nodes with a configured policy are wrapped with
    /// [`crate::graph::builder::execute_with_retry`] for automatic retries
    /// with exponential backoff and jitter.
    ///
    /// The given map replaces the previously stored one wholesale; entries are
    /// not merged.
    pub fn set_retry_policies(&mut self, policies: HashMap<String, crate::graph::RetryPolicy>) {
        self.retry_policies = policies;
    }
457
    /// Set per-node timeout policies
    ///
    /// Each entry maps a node name to its [`TimeoutPolicy`](crate::pregel::context::TimeoutPolicy).
    /// During superstep execution, nodes with a configured policy are wrapped with
    /// `tokio::time::timeout` using the configured `run_timeout`. The timeout wraps
    /// the entire execution including any retry attempts.
    ///
    /// The given map replaces the previously stored one wholesale; entries are
    /// not merged.
    pub fn set_timeout_policies(
        &mut self,
        policies: HashMap<String, crate::pregel::context::TimeoutPolicy>,
    ) {
        self.timeout_policies = policies;
    }
470
471    /// Compute initial tasks from entry point
472    fn compute_initial_tasks(trigger_table: &TriggerTable<S>) -> Vec<PendingTask<S>> {
473        // Find nodes that have incoming edges from START
474        let mut initial_tasks = Vec::new();
475
476        for (node_name, sources) in &trigger_table.incoming {
477            for source in sources {
478                if let crate::edge::TriggerSource::Edge { from } = source
479                    && from == crate::edge::START
480                {
481                    initial_tasks.push(PendingTask::pull(
482                        uuid::Uuid::new_v4().to_string(),
483                        node_name.clone(),
484                    ));
485                    break;
486                }
487            }
488        }
489
490        initial_tasks
491    }
492
    /// Execute one tick of the loop
    ///
    /// Runs the pre-superstep checks in a fixed order: recursion limit,
    /// cancellation, budget, empty task queue, then `interrupt_before`.
    ///
    /// Returns `Ok(true)` if a superstep should be executed next. Returns
    /// `Ok(false)` when the loop has stopped without an error: the cancellation
    /// token was cancelled, no tasks are pending, or an `interrupt_before`
    /// matched. `self.status` records which case applied. Every stopping path
    /// (including the error paths) calls `finish_all_channels()`.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Recursion limit is reached (`self.step >= recursion_limit`)
    /// - Budget limits are exceeded
    ///
    /// Cancellation is *not* reported as an error; it yields `Ok(false)` with
    /// status `LoopStatus::Cancelled`.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// while pregel_loop.tick()? {
    ///     let result = pregel_loop.execute_superstep().await?;
    ///     pregel_loop.after_tick(result).await?;
    /// }
    /// ```
    #[allow(
        clippy::too_many_lines,
        reason = "Function contains multiple termination path checks (recursion limit, cancellation, budget, drain, interrupt) with finish_all_channels() calls on each path. Refactoring would reduce clarity by splitting related checks."
    )]
    pub fn tick(&mut self) -> Result<bool, JunctureError> {
        // Create graph invocation span with proper attribute names.
        // Note: "juncture.thread.id" records the OS thread running this tick.
        let span = tracing::info_span!(
            "juncture.graph.invoke",
            "juncture.thread.id" = ?std::thread::current().id(),
            "juncture.step" = self.step,
            "juncture.recursion.limit" = self.runnable_config.recursion_limit,
            "juncture.graph.name" = ?self.runnable_config.graph_name,
            "juncture.run.id" = %self.run_id,
        );
        let _enter = span.enter();

        // Check recursion limit
        if self.step >= self.runnable_config.recursion_limit {
            self.status = LoopStatus::OutOfSteps;
            self.emit_counter("juncture.graph.errors", 1);
            // The error is wrapped in a Result first because on_graph_end takes
            // the run outcome by reference.
            let result: Result<(), JunctureError> = Err(JunctureError::recursion_limit(
                self.step,
                self.runnable_config.recursion_limit,
            ));
            self.on_graph_end(&result);
            // Finalize channels before returning error
            self.finish_all_channels();
            // Extract the error from the Result for the return value.
            // This is safe because we just constructed it as Err.
            let Err(err) = result else {
                unreachable!("result was constructed as Err");
            };
            return Err(err);
        }

        // Check cancellation. The outcome passed to on_graph_end is Ok(()).
        if self.cancellation_token.is_cancelled() {
            self.status = LoopStatus::Cancelled;
            self.on_graph_end(&Ok(()));
            // Finalize channels before returning
            self.finish_all_channels();
            return Ok(false);
        }

        // Check budget
        if let Some(tracker) = &self.budget_tracker
            && let Some(reason) = tracker.check()
        {
            self.status = LoopStatus::BudgetExceeded;
            self.emit_counter("juncture.graph.errors", 1);
            let result: Result<(), JunctureError> = Err(JunctureError::execution(format!(
                "Budget exceeded: {reason}"
            )));
            self.on_graph_end(&result);
            // Finalize channels before returning error
            self.finish_all_channels();
            let Err(err) = result else {
                unreachable!("result was constructed as Err");
            };
            return Err(err);
        }

        // Emit budget gauges when a collector is configured.
        // Gauges are only emitted for limits that are actually set in
        // runnable_config.budget.
        if let Some(ref tracker) = self.budget_tracker {
            let usage = tracker.current_usage();
            if let Some(ref budget) = self.runnable_config.budget {
                if let Some(max_tokens) = budget.max_tokens {
                    self.emit_gauge(
                        "juncture.budget.remaining_tokens",
                        max_tokens.saturating_sub(usage.tokens_used),
                    );
                }
                if let Some(max_cost) = budget.max_cost_usd {
                    // Remaining cost is clamped at zero and reported in
                    // millionths of a dollar.
                    #[allow(
                        clippy::cast_possible_truncation,
                        clippy::cast_sign_loss,
                        reason = "Gauge values are u64; cost is converted to micro-units (6 decimal places) for precision. Truncation is acceptable for gauge display."
                    )]
                    let remaining_micro_usd =
                        ((max_cost - usage.cost_usd).max(0.0) * 1_000_000.0) as u64;
                    self.emit_gauge("juncture.budget.remaining_cost_usd", remaining_micro_usd);
                }
            }
        }

        // Compute next tasks if pending is empty
        if self.pending_tasks.is_empty() {
            // Check if drain is requested - if so, we're done
            // NOTE(review): this arm and the fall-through below perform the same
            // steps (finish channels, mark Done, notify graph end, return false),
            // so the drain flag does not change the outcome here.
            if self.run_control.is_drain_requested() {
                // Call finish_all_channels() since no more tasks will be scheduled
                // This ensures LastValueAfterFinishChannel values are made available
                self.finish_all_channels();
                self.status = LoopStatus::Done;
                self.on_graph_end(&Ok(()));
                return Ok(false);
            }

            // Try to compute tasks from trigger table
            // This is a no-op in the current implementation since
            // compute_next_tasks requires completed tasks
            // Call finish_all_channels() since compute_next_tasks() would return empty
            self.finish_all_channels();
            self.status = LoopStatus::Done;
            self.on_graph_end(&Ok(()));
            return Ok(false);
        }

        // Check interrupt_before before executing next superstep
        if let Some(ref interrupt_before_nodes) = self.runnable_config.interrupt_before {
            let interrupt_before_set: HashSet<String> =
                interrupt_before_nodes.iter().cloned().collect();

            // Build channel versions map for should_interrupt.
            // Channels are named "field_<index>" after their position in the
            // version tracker.
            let channel_versions: HashMap<String, u64> = self
                .field_versions
                .versions()
                .iter()
                .enumerate()
                .map(|(idx, ver)| (format!("field_{idx}"), *ver))
                .collect();

            if let Some(signals) = should_interrupt(
                &self.pending_tasks,
                &interrupt_before_set,
                &HashSet::new(), // interrupt_after not checked here
                &channel_versions,
                &self.interrupt_versions_seen,
            ) {
                // Remember the versions at which we interrupted and keep a copy
                // of the signals before moving them into the status.
                self.interrupt_versions_seen = channel_versions;
                self.pending_interrupts.clone_from(&signals);
                self.status = LoopStatus::InterruptBefore(signals);
                // Finalize channels before returning
                self.finish_all_channels();
                return Ok(false);
            }
        }

        Ok(true)
    }
651
652    /// Deserialize `state_json` → `state_override` for Send targets.
653    ///
654    /// Send targets carry per-target state as JSON; this converts it to a typed
655    /// override before passing to the runner so each task gets its own state instance.
656    fn resolve_state_json(tasks: &mut [PendingTask<S>]) -> Result<(), JunctureError>
657    where
658        S: serde::de::DeserializeOwned,
659    {
660        for task in tasks {
661            if task.state_override.is_none()
662                && let Some(ref json) = task.state_json
663            {
664                let deserialized = serde_json::from_value::<S>(json.clone()).map_err(|e| {
665                    JunctureError::execution(format!(
666                        "failed to deserialize state_json for task '{}': {e}",
667                        task.node_name
668                    ))
669                })?;
670                task.state_override = Some(deserialized);
671            }
672        }
673        Ok(())
674    }
675
676    /// Execute one superstep
677    ///
678    /// Delegates to [`runner::execute_superstep`] with the current [`step`](Self::step)
679    /// number for observability span attributes.
680    ///
681    /// # Errors
682    ///
683    /// Returns an error if:
684    /// - Task execution fails
685    /// - Cancellation is requested
686    ///
687    /// # Examples
688    ///
689    /// ```ignore
690    /// let result = loop.execute_superstep().await?;
691    /// ```
692    pub async fn execute_superstep(&mut self) -> Result<SuperstepResult<S>, JunctureError>
693    where
694        S: serde::de::DeserializeOwned,
695        S::Update: serde::Serialize,
696    {
697        // Resolve state_json → state_override for Send targets.
698        // Send targets carry per-target state as JSON; deserialize it before
699        // passing to the runner so each task gets its own state instance.
700        Self::resolve_state_json(&mut self.pending_tasks)?;
701
702        // Move state into Arc for zero-copy sharing with spawned tasks.
703        // std::mem::take replaces self.state with Default::default() (free),
704        // avoiding the O(state_size) clone that caused O(N^2) total cost
705        // in scenarios with growing state (e.g., wide_state with append reducer).
706        // After all tasks complete, we recover the state via Arc::try_unwrap.
707        let arc_state: Arc<S> = Arc::new(std::mem::take(&mut self.state));
708        let node_names: Vec<_> = self
709            .pending_tasks
710            .iter()
711            .map(|t| t.node_name.as_str())
712            .collect();
713        let span = tracing::info_span!(
714            "juncture.superstep",
715            step = self.step,
716            num_tasks = self.pending_tasks.len(),
717            "juncture.step.nodes" = ?node_names,
718            "juncture.step.duration_ms" = tracing::field::Empty,
719        );
720        let _enter = span.enter();
721
722        // Store superstep start time for duration tracking in after_tick
723        let start = Instant::now();
724        self.superstep_start = Some(start);
725
726        // Emit SuperstepStart debug event if streaming is configured
727        if let Some(ref tx) = self.stream_tx {
728            let _ = tx.send(StreamEvent::Debug(DebugEvent::SuperstepStart {
729                step: self.step,
730                pending_nodes: node_names
731                    .iter()
732                    .copied()
733                    .map(std::string::ToString::to_string)
734                    .collect(),
735            }));
736        }
737
738        // Emit superstep tasks counter metric
739        let num_tasks = u64::try_from(self.pending_tasks.len()).unwrap_or(u64::MAX);
740        self.emit_counter("juncture.superstep.tasks", num_tasks);
741
742        let (result, interrupt_rx) = execute_superstep(
743            &self.pending_tasks,
744            &arc_state,
745            &self.nodes,
746            &self.runnable_config,
747            &self.cancellation_token,
748            self.checkpointer.as_ref(),
749            &self.pending_interrupts,
750            &self.scratchpad,
751            &self.error_handler_map,
752            &self.retry_policies,
753            &self.timeout_policies,
754            self.step,
755        )
756        .await?;
757
758        // Recover state from Arc. All spawned tasks completed and dropped their
759        // Arc clones, so refcount is 1 and Arc::try_unwrap succeeds without cloning.
760        self.state = match Arc::try_unwrap(arc_state) {
761            Ok(state) => state,
762            Err(arc) => {
763                tracing::warn!(
764                    name: "juncture.state.arc_leak",
765                    step = self.step,
766                    "Arc refcount > 1 after superstep, falling back to clone"
767                );
768                S::clone(&*arc)
769            }
770        };
771
772        // Mark previously pending interrupts as processed in the scratchpad.
773        // This is critical for multi-interrupt scenarios where a node has several
774        // interrupt points and the user resumes with values for only a subset.
775        // On subsequent re-execution, already-handled interrupt positions receive
776        // a Null resume value via match_resume_to_interrupts -> scratchpad.get_null_resume(),
777        // allowing the node to skip past those interrupt points without re-interrupting.
778        for signal in &self.pending_interrupts {
779            if let Some(ref id) = signal.id {
780                self.scratchpad.mark_interrupt_processed(id);
781            }
782        }
783
784        let duration = start.elapsed().as_millis();
785        tracing::Span::current().record("juncture.step.duration_ms", duration);
786
787        // Emit superstep duration histogram metric
788        // duration is u128 from as_millis(), but realistic superstep durations
789        // fit in u64 (millisecond precision, max ~584 million years).
790        let duration_ms = u64::try_from(duration).unwrap_or(u64::MAX);
791        #[allow(
792            clippy::cast_precision_loss,
793            reason = "millisecond durations fit well within f64 precision for histogram recording"
794        )]
795        let duration_f64 = duration_ms as f64;
796        self.emit_histogram("juncture.superstep.duration_ms", duration_f64);
797
798        // Emit superstep duration metric
799        tracing::debug!(
800            name: "juncture.superstep.duration_ms",
801            step = self.step,
802            duration_ms = duration,
803        );
804
805        // Store the interrupt receiver for after_tick to drain
806        // We use Option to allow moving it into after_tick
807        self.interrupt_rx = Some(interrupt_rx);
808
809        Ok(result)
810    }
811
812    /// Process results after a superstep
813    ///
814    /// # Errors
815    ///
816    /// Returns an error if:
817    /// - Task computation fails
818    ///
819    /// # Panics
820    ///
821    /// Panics if a task duration exceeds `u64::MAX` milliseconds (extremely unlikely)
822    ///
823    /// # Examples
824    ///
825    /// ```ignore
826    /// loop.after_tick(result).await?;
827    /// ```
828    #[expect(
829        clippy::too_many_lines,
830        reason = "after_tick orchestrates multiple sequential phases: apply writes, bump versions, consume channels, emit events including stream_data, compute tasks, drain interrupts, check interrupts, finish channels, increment step"
831    )]
832    #[allow(
833        clippy::cognitive_complexity,
834        reason = "after_tick orchestrates multiple sequential phases: apply writes, bump versions, consume channels, emit events including stream_data, compute tasks, drain interrupts, check interrupts, finish channels, increment step"
835    )]
836    pub async fn after_tick(&mut self, result: SuperstepResult<S>) -> Result<(), JunctureError>
837    where
838        S: Clone + serde::Serialize,
839    {
840        // CRITICAL: Capture current field versions BEFORE any mutations (A-001 fix).
841        // versions_seen must record the channel state that existed when tasks
842        // were executing, not after apply_writes() modifies the versions.
843        // This ensures node activation semantics match LangGraph design:
844        // nodes activate based on pre-superstep versions, not post-superstep.
845        let versions_before_apply = self.field_versions.versions().to_vec();
846
847        // Apply writes from completed tasks using path-based deterministic merge order.
848        // apply_writes sorts by trigger type (PULL before PUSH) then by node name / send
849        // index so that concurrent writes to the same field produce a deterministic result
850        // matching LangGraph semantics. It also checks for replace-field conflicts before
851        // applying any writes, so a double-write rejects the entire superstep.
852        let total_changed = apply_writes(
853            &mut self.state,
854            &result.task_outputs,
855            &mut self.field_versions,
856        )?;
857
858        // Increment delta counters for all tracked fields.
859        // DeltaChannel fields get update+superstep increments; other changed
860        // fields get update+superstep increments too, providing a consistent
861        // view of write activity across all channels.
862        self.update_delta_counters(&total_changed);
863
864        // Consume the channels that were triggered by the PREVIOUS superstep's writes.
865        // These are the fields that caused the current superstep's tasks to be scheduled.
866        // We consume these fields after applying the current superstep's writes but before
867        // resetting ephemeral fields, matching LangGraph's consume semantics.
868        //
869        // Note: We consume `previous_superstep_changed_fields` (from the last superstep)
870        // rather than `total_changed` (from this superstep) because:
871        // 1. The current superstep was triggered by writes from the previous superstep
872        // 2. Those triggered fields should be consumed after the current superstep executes
873        // 3. The current superstep's writes will trigger the NEXT superstep and be consumed then
874        let fields_to_consume = self.previous_superstep_changed_fields.clone();
875        self.consume_triggered_channels(&fields_to_consume);
876
877        // Mark versions as consumed using the versions captured BEFORE apply_writes (A-001 fix).
878        // This records the channel state that existed when tasks were executing,
879        // ensuring node activation semantics match the design specification.
880        for task_output in &result.task_outputs {
881            self.versions_seen
882                .mark_consumed(&task_output.node_name, &versions_before_apply);
883        }
884
885        // Reset ephemeral fields
886        self.state.reset_ephemeral();
887
888        // Emit stream events
889        if let Some(ref tx) = self.stream_tx {
890            for task_output in &result.task_outputs {
891                // Emit TaskStart event before TaskEnd (retroactive, but provides task_id info)
892                let start_event = StreamEvent::TaskStart {
893                    node: task_output.node_name.clone(),
894                    task_id: task_output.task_id.clone(),
895                    step: self.step,
896                };
897                let _ = tx.send(start_event);
898
899                // Emit TaskEnd event
900                let end_event = StreamEvent::TaskEnd {
901                    node: task_output.node_name.clone(),
902                    task_id: task_output.task_id.clone(),
903                    step: self.step,
904                    duration_ms: u64::try_from(task_output.duration.as_millis())
905                        .expect("duration should fit in u64"),
906                };
907                let _ = tx.send(end_event);
908
909                // Emit custom stream events from the command's stream_data.
910                // Each entry in stream_data produces one StreamEvent::Custom
911                // tagged with the emitting node name and empty namespace.
912                for data in &task_output.command.stream_data {
913                    let custom_event = StreamEvent::Custom {
914                        node: task_output.node_name.clone(),
915                        data: data.clone(),
916                        ns: Vec::new(),
917                    };
918                    let _ = tx.send(custom_event);
919                }
920
921                // Emit Updates event if the task produced an update
922                if let Some(ref update) = task_output.command.update {
923                    let updates_event = StreamEvent::Updates {
924                        node: task_output.node_name.clone(),
925                        update: update.clone(),
926                        step: self.step,
927                    };
928                    let _ = tx.send(updates_event);
929                }
930            }
931
932            // Emit Values event after all updates applied
933            let values_event = StreamEvent::Values {
934                state: self.state.clone(),
935                step: self.step,
936            };
937            let _ = tx.send(values_event);
938
939            // Emit SuperstepEnd debug event with duration
940            if let Some(superstep_start) = self.superstep_start {
941                let duration_ms =
942                    u64::try_from(superstep_start.elapsed().as_millis()).unwrap_or(u64::MAX);
943                let end_event = StreamEvent::Debug(DebugEvent::SuperstepEnd {
944                    step: self.step,
945                    duration_ms,
946                });
947                let _ = tx.send(end_event);
948            }
949        }
950
951        // Compute next pending tasks (uses cached trigger_to_nodes)
952        self.pending_tasks = compute_next_tasks(
953            &result.task_outputs,
954            &self.trigger_table,
955            &self.trigger_to_nodes,
956            &self.state,
957        )
958        .await?;
959
960        // Schedule error handler recovery tasks for any failed nodes that have
961        // a registered error handler. These tasks run the handler node which
962        // receives the error context and returns a recovery Command.
963        let recovery_tasks =
964            schedule_error_handlers(&result.task_outputs, &self.nodes, &self.error_handler_map);
965        if !recovery_tasks.is_empty() {
966            tracing::debug!(
967                name: "juncture.error_handler.recovery_tasks",
968                step = self.step,
969                count = recovery_tasks.len(),
970                "Scheduling error handler recovery tasks"
971            );
972            self.pending_tasks.extend(recovery_tasks);
973        }
974
975        // Emit EdgeTraversed debug event after computing next tasks
976        if let Some(ref tx) = self.stream_tx {
977            let next_node_names: Vec<String> = self
978                .pending_tasks
979                .iter()
980                .map(|t| t.node_name.clone())
981                .collect();
982            // Emit one EdgeTraversed event per next node
983            for node_name in &next_node_names {
984                let edge_event = StreamEvent::Debug(DebugEvent::EdgeTraversed {
985                    from: "superstep".to_string(),
986                    to: node_name.clone(),
987                    edge_type: "conditional".to_string(),
988                });
989                let _ = tx.send(edge_event);
990            }
991        }
992
993        // Store the current superstep's changed fields for consumption in the next superstep.
994        // The fields changed in THIS superstep will trigger the NEXT superstep's tasks,
995        // and should be consumed after that next superstep completes.
996        self.previous_superstep_changed_fields = total_changed;
997
998        // Save superstep checkpoint after state merge and next-task computation.
999        // This provides crash recovery for normal superstep completion (B-04-002).
1000        // The checkpoint is only saved when no interrupts are pending; interrupt
1001        // paths save their own checkpoint with richer context (pending_interrupts).
1002        self.save_superstep_checkpoint().await;
1003
1004        // Drain interrupt signals from the channel
1005        // These are signals sent by the interrupt!() macro during node execution
1006        let mut node_interrupts = Vec::new();
1007        if let Some(mut rx) = self.interrupt_rx.take() {
1008            // Collect all pending interrupt signals
1009            while let Ok(signal) = rx.try_recv() {
1010                node_interrupts.push(signal);
1011            }
1012        }
1013
1014        // If we received any interrupt signals from nodes, handle them
1015        if !node_interrupts.is_empty() {
1016            self.pending_interrupts.clone_from(&node_interrupts);
1017            self.status = LoopStatus::InterruptAfter(node_interrupts.clone());
1018
1019            // Emit interrupt events to stream (hidden nodes filtered)
1020            self.emit_interrupt_events(&node_interrupts);
1021
1022            // Save checkpoint with Interrupt source for HITL recovery
1023            let node = self.interrupt_node_name().to_string();
1024            self.save_interrupt_checkpoint(&node).await;
1025
1026            // Finalize channels before returning
1027            self.finish_all_channels();
1028            return Ok(());
1029        }
1030
1031        // Handle BubbleUp events from subgraph execution.
1032        if result.has_bubble_ups() && self.handle_bubble_ups(&result.bubble_ups) {
1033            // Save checkpoint with Interrupt source if a BubbleUp interrupt was
1034            // the reason for stopping.
1035            if self.status.is_interrupted() {
1036                let node = self.interrupt_node_name().to_string();
1037                self.save_interrupt_checkpoint(&node).await;
1038            }
1039            // Finalize channels before returning
1040            self.finish_all_channels();
1041            return Ok(());
1042        }
1043
1044        // Check interrupt_after after computing next tasks
1045        if let Some(ref interrupt_after_nodes) = self.runnable_config.interrupt_after {
1046            let interrupt_after_set: HashSet<String> =
1047                interrupt_after_nodes.iter().cloned().collect();
1048
1049            // Build channel versions map for should_interrupt
1050            let channel_versions: HashMap<String, u64> = self
1051                .field_versions
1052                .versions()
1053                .iter()
1054                .enumerate()
1055                .map(|(idx, ver)| (format!("field_{idx}"), *ver))
1056                .collect();
1057
1058            if let Some(signals) = should_interrupt(
1059                &self.pending_tasks,
1060                &HashSet::new(), // interrupt_before not checked here
1061                &interrupt_after_set,
1062                &channel_versions,
1063                &self.interrupt_versions_seen,
1064            ) {
1065                self.interrupt_versions_seen = channel_versions;
1066                self.pending_interrupts.clone_from(&signals);
1067                self.status = LoopStatus::InterruptAfter(signals.clone());
1068
1069                // Emit interrupt events to stream (hidden nodes filtered)
1070                self.emit_interrupt_events(&signals);
1071
1072                // Save checkpoint with Interrupt source for HITL recovery
1073                let node = self.interrupt_node_name().to_string();
1074                self.save_interrupt_checkpoint(&node).await;
1075
1076                // Finalize channels before returning
1077                self.finish_all_channels();
1078                return Ok(());
1079            }
1080        }
1081
1082        // Call finish() on all channels if no more tasks (execution complete)
1083        // This is critical for LastValueAfterFinishChannel which only makes
1084        // its value available after finish() is called.
1085        if self.pending_tasks.is_empty() {
1086            self.finish_all_channels();
1087            // In Exit durability mode, save a checkpoint on graph completion
1088            // so the final state is preserved in durable storage.
1089            if self.effective_durability() == Durability::Exit {
1090                self.save_exit_checkpoint().await;
1091            }
1092        }
1093
1094        // Increment step
1095        self.step += 1;
1096
1097        // Report step to budget tracker
1098        if let Some(ref tracker) = self.budget_tracker {
1099            tracker.report_step();
1100        }
1101
1102        Ok(())
1103    }
1104
1105    /// Process `BubbleUp` events from subgraph execution
1106    ///
1107    /// Handles interrupt propagation, drain propagation, and parent command
1108    /// routing from nested subgraph execution.
1109    ///
1110    /// Returns `true` if the parent loop should stop (interrupt or drain occurred),
1111    /// `false` if execution should continue.
1112    fn handle_bubble_ups(&mut self, bubble_ups: &[BubbleUp<S>]) -> bool {
1113        let mut should_stop = false;
1114
1115        for bubble_up in bubble_ups {
1116            match bubble_up {
1117                BubbleUp::Interrupt(graph_interrupt) => {
1118                    self.handle_bubble_up_interrupt(graph_interrupt);
1119                    should_stop = true;
1120                }
1121                BubbleUp::Drained(drained) => {
1122                    self.handle_bubble_up_drained(drained);
1123                    should_stop = true;
1124                }
1125                BubbleUp::ParentCommand(cmd) => {
1126                    self.handle_bubble_up_parent_command(cmd);
1127                }
1128            }
1129        }
1130
1131        should_stop
1132    }
1133
1134    /// Handle a subgraph interrupt bubbling up to the parent graph
1135    fn handle_bubble_up_interrupt(
1136        &mut self,
1137        graph_interrupt: &crate::pregel::types::GraphInterrupt,
1138    ) {
1139        tracing::debug!(
1140            step = self.step,
1141            num_signals = graph_interrupt.interrupts.len(),
1142            interrupt_step = graph_interrupt.step,
1143            namespace = ?graph_interrupt.namespace,
1144            "Subgraph interrupt bubbling up to parent"
1145        );
1146
1147        self.pending_interrupts
1148            .clone_from(&graph_interrupt.interrupts);
1149        self.status = LoopStatus::InterruptAfter(graph_interrupt.interrupts.clone());
1150
1151        // Emit interrupt events to stream (hidden nodes filtered)
1152        // Use the namespace from the GraphInterrupt to properly attribute
1153        // the interrupt to the subgraph where it originated
1154        self.emit_interrupt_events_with_namespace(
1155            &graph_interrupt.interrupts,
1156            &graph_interrupt.namespace,
1157        );
1158    }
1159
1160    /// Handle a subgraph drain bubbling up to the parent graph
1161    fn handle_bubble_up_drained(&mut self, drained: &crate::pregel::types::GraphDrained) {
1162        tracing::debug!(
1163            step = self.step,
1164            reason = %drained.reason,
1165            "Subgraph drained bubbling up to parent"
1166        );
1167
1168        self.status = LoopStatus::Drained;
1169    }
1170
1171    /// Handle a subgraph parent command bubbling up to the parent graph
1172    fn handle_bubble_up_parent_command(&mut self, parent_cmd: &crate::command::ParentCommand<S>) {
1173        tracing::debug!(
1174            step = self.step,
1175            source_node = %parent_cmd.source_node,
1176            namespace = %parent_cmd.namespace,
1177            goto = ?parent_cmd.command.goto,
1178            "Subgraph parent command bubbling up"
1179        );
1180
1181        if let Some(ref update) = parent_cmd.command.update {
1182            let changed = self.state.try_apply(update.clone());
1183            match changed {
1184                Ok(changed) => self.field_versions.bump_all(&changed),
1185                Err(err) => {
1186                    tracing::warn!(
1187                        name: "juncture.subgraph.parent_command.apply_failed",
1188                        step = self.step,
1189                        source_node = %parent_cmd.source_node,
1190                        namespace = %parent_cmd.namespace,
1191                        error = %err,
1192                        "Failed to apply parent command from subgraph"
1193                    );
1194                }
1195            }
1196        }
1197    }
1198
1199    /// Consume the loop and return the final state
1200    ///
1201    /// # Examples
1202    ///
1203    /// ```ignore
1204    /// let final_state = loop.into_state();
1205    /// ```
1206    #[must_use]
1207    pub fn into_state(self) -> S {
1208        self.state
1209    }
1210
1211    /// Get the current step number
1212    #[must_use]
1213    pub const fn step(&self) -> usize {
1214        self.step
1215    }
1216
1217    /// Get the unique run ID for this execution
1218    #[must_use]
1219    pub fn run_id(&self) -> &str {
1220        &self.run_id
1221    }
1222
1223    /// Get the current status
1224    #[must_use]
1225    pub const fn status(&self) -> &LoopStatus {
1226        &self.status
1227    }
1228
1229    /// Get the pending interrupt signals for checkpoint persistence
1230    #[must_use]
1231    pub fn pending_interrupts(&self) -> &[crate::interrupt::InterruptSignal] {
1232        &self.pending_interrupts
1233    }
1234
1235    /// Get a reference to the scratchpad for interrupt tracking
1236    #[must_use]
1237    pub const fn scratchpad(&self) -> &crate::interrupt::Scratchpad {
1238        &self.scratchpad
1239    }
1240
1241    /// Get a mutable reference to the scratchpad for interrupt tracking
1242    pub const fn scratchpad_mut(&mut self) -> &mut crate::interrupt::Scratchpad {
1243        &mut self.scratchpad
1244    }
1245
1246    /// Check if the loop is still running
1247    #[must_use]
1248    pub const fn is_running(&self) -> bool {
1249        matches!(self.status, LoopStatus::Running)
1250    }
1251
1252    /// Get a clone of the current state without consuming the loop
1253    ///
1254    /// Useful for streaming execution where state snapshots are needed
1255    /// after each superstep without terminating the loop.
1256    #[must_use]
1257    pub fn snapshot_state(&self) -> S
1258    where
1259        S: Clone,
1260    {
1261        self.state.clone()
1262    }
1263
1264    /// Get the run control for graceful shutdown
1265    ///
1266    /// Returns a clone of the run control that can be used to request
1267    /// drain from another thread or context.
1268    ///
1269    /// # Examples
1270    ///
1271    /// ```ignore
1272    /// use juncture_core::pregel::loop_::PregelLoop;
1273    ///
1274    /// let mut loop = PregelLoop::new(...)?;
1275    /// let run_control = loop.run_control();
1276    ///
1277    /// // From another thread
1278    /// std::thread::spawn(move || {
1279    ///     run_control.request_drain();
1280    /// });
1281    /// ```
1282    #[must_use]
1283    pub const fn run_control(&self) -> &RunControl {
1284        &self.run_control
1285    }
1286
1287    /// Get a view of the current execution context
1288    ///
1289    /// Returns an `ExecutionContext` value that provides typed access
1290    /// to the mutable execution state (state, `field_versions`, `versions_seen`).
1291    /// This provides the design-intended separation between mutable context
1292    /// and immutable configuration.
1293    ///
1294    /// Note: Returns a cloned context, not a reference, since `ExecutionContext`
1295    /// is designed to own its data.
1296    ///
1297    /// # Examples
1298    ///
1299    /// ```ignore
1300    /// use juncture_core::pregel::loop_::PregelLoop;
1301    ///
1302    /// let loop = PregelLoop::new(...)?;
1303    /// let context = loop.as_context();
1304    /// let versions = context.field_versions.versions();
1305    /// ```
1306    #[must_use]
1307    #[allow(
1308        clippy::clone_on_copy,
1309        reason = "ExecutionContext requires owned state, not reference"
1310    )]
1311    pub fn as_context(&self) -> ExecutionContext<S>
1312    where
1313        S: Clone,
1314    {
1315        ExecutionContext {
1316            state: self.state.clone(),
1317            field_versions: self.field_versions.clone(),
1318            versions_seen: self.versions_seen.clone(),
1319            pending_writes: vec![],
1320        }
1321    }
1322
1323    /// Get a view of the current execution config
1324    ///
1325    /// Returns an `ExecutionConfig` value that provides typed access
1326    /// to the immutable execution configuration (`recursion_limit`, interrupts, etc.).
1327    /// This provides the design-intended separation between mutable context
1328    /// and immutable configuration.
1329    ///
1330    /// Note: Returns a cloned config, not a reference, since `ExecutionConfig`
1331    /// is designed to own its data.
1332    ///
1333    /// # Examples
1334    ///
1335    /// ```ignore
1336    /// use juncture_core::pregel::loop_::PregelLoop;
1337    ///
1338    /// let loop = PregelLoop::new(...)?;
1339    /// let config = loop.as_config();
1340    /// let limit = config.recursion_limit;
1341    /// ```
1342    #[must_use]
1343    pub fn as_config(&self) -> crate::pregel::context::ExecutionConfig {
1344        crate::pregel::context::ExecutionConfig {
1345            recursion_limit: self.runnable_config.recursion_limit,
1346            interrupt_before: self
1347                .runnable_config
1348                .interrupt_before
1349                .as_ref()
1350                .map_or_else(HashSet::new, |v| v.iter().cloned().collect()),
1351            interrupt_after: self
1352                .runnable_config
1353                .interrupt_after
1354                .as_ref()
1355                .map_or_else(HashSet::new, |v| v.iter().cloned().collect()),
1356            budget: self.runnable_config.budget.clone(),
1357            durability: self.runnable_config.durability.clone().unwrap_or_default(),
1358            retry_policies: std::collections::HashMap::new(),
1359            timeout_policies: std::collections::HashMap::new(),
1360        }
1361    }
1362
1363    /// Save a checkpoint with [`CheckpointSource::Interrupt`] when a checkpointer is configured.
1364    ///
1365    /// Builds a full checkpoint from the current loop state, sets the source to
1366    /// `Interrupt { node }`, and persists it via the checkpointer. Errors are
1367    /// logged but do not propagate -- interrupt checkpointing is best-effort and
1368    /// should not prevent the interrupt from being surfaced to the caller.
1369    ///
1370    /// # Type Parameters
1371    ///
1372    /// Requires `S: serde::Serialize` to serialize the current state into
1373    /// `channel_values` for the checkpoint.
1374    #[allow(
1375        clippy::cognitive_complexity,
1376        clippy::too_many_lines,
1377        reason = "durability match arms and checkpoint construction logic are necessarily complex for handling Sync/Async/Exit modes"
1378    )]
1379    async fn save_interrupt_checkpoint(&mut self, node: &str)
1380    where
1381        S: serde::Serialize,
1382    {
1383        let Some(ref checkpointer) = self.checkpointer else {
1384            return;
1385        };
1386
1387        let channel_values = match serde_json::to_value(&self.state) {
1388            Ok(v) => v,
1389            Err(err) => {
1390                tracing::warn!(
1391                    name: "juncture.checkpoint.interrupt.serialize_failed",
1392                    node = node,
1393                    error = %err,
1394                    "Failed to serialize state for interrupt checkpoint"
1395                );
1396                return;
1397            }
1398        };
1399
1400        let (channel_versions, new_versions, versions_seen) = self.build_checkpoint_versions();
1401
1402        let checkpoint_id = generate_checkpoint_id();
1403        let cp_id_for_event = checkpoint_id.clone();
1404        let created_at = chrono::Utc::now().to_rfc3339();
1405
1406        let checkpoint = Checkpoint {
1407            id: checkpoint_id,
1408            channel_values,
1409            channel_versions,
1410            versions_seen,
1411            pending_tasks: Vec::new(),
1412            pending_sends: Vec::new(),
1413            pending_interrupts: self.pending_interrupts.clone(),
1414            schema_version: S::schema_version(),
1415            created_at,
1416            v: 1,
1417            new_versions,
1418            counters_since_delta_snapshot: self.build_checkpoint_delta_counters(),
1419        };
1420
1421        let metadata = CheckpointMetadata {
1422            source: CheckpointSource::Interrupt {
1423                node: node.to_string(),
1424            },
1425            step: i64::try_from(self.step).unwrap_or(i64::MAX),
1426            writes: HashMap::new(),
1427            parents: HashMap::new(),
1428            run_id: self.run_id.clone(),
1429        };
1430
1431        let cp_config = self.runnable_config.clone();
1432        let stream_tx_clone = self.stream_tx.clone();
1433        match self.effective_durability() {
1434            Durability::Async => {
1435                let step = self.step;
1436                let node_label = node.to_string();
1437                let checkpointer_arc = Arc::clone(checkpointer);
1438                let metadata_for_event = metadata.clone();
1439                tokio::spawn(async move {
1440                    match checkpointer_arc.put(&cp_config, checkpoint, metadata).await {
1441                        Ok(_updated_config) => {
1442                            tracing::info!(
1443                                name: "juncture.checkpoint.put",
1444                                checkpoint_step = step,
1445                                checkpoint_source = "Interrupt",
1446                                "Interrupt checkpoint persisted (async)"
1447                            );
1448                            // Emit checkpoint write metric
1449                            if let Some(ref collector) = cp_config.metrics_collector {
1450                                collector.inc_counter("juncture.checkpoint.writes", 1);
1451                            }
1452                            // Emit CheckpointSaved event to stream
1453                            Self::emit_checkpoint_saved_event(
1454                                stream_tx_clone.as_ref(),
1455                                cp_id_for_event,
1456                                metadata_for_event,
1457                                step,
1458                            );
1459                        }
1460                        Err(err) => {
1461                            tracing::warn!(
1462                                name: "juncture.checkpoint.interrupt.save_failed",
1463                                node = node_label,
1464                                error = %err,
1465                                "Failed to save interrupt checkpoint (async)"
1466                            );
1467                            // Emit checkpoint error metric
1468                            if let Some(ref collector) = cp_config.metrics_collector {
1469                                collector.inc_counter("juncture.checkpoint.errors", 1);
1470                            }
1471                        }
1472                    }
1473                });
1474                self.reset_delta_counters();
1475            }
1476            Durability::Sync | Durability::Exit => {
1477                let metadata_for_event = metadata.clone();
1478                match checkpointer
1479                    .put(&self.runnable_config, checkpoint, metadata)
1480                    .await
1481                {
1482                    Ok(updated_config) => {
1483                        self.runnable_config.checkpoint_id = updated_config.checkpoint_id;
1484                        self.reset_delta_counters();
1485                        // Emit checkpoint write metric
1486                        self.emit_counter("juncture.checkpoint.writes", 1);
1487                        tracing::info!(
1488                            name: "juncture.checkpoint.put",
1489                            checkpoint_id = %self.runnable_config.checkpoint_id.as_deref().unwrap_or("unknown"),
1490                            checkpoint_step = self.step,
1491                            checkpoint_source = "Interrupt",
1492                            "Interrupt checkpoint persisted"
1493                        );
1494                        if let Some(ref cp_id) = self.runnable_config.checkpoint_id {
1495                            self.on_checkpoint_saved(cp_id, self.step);
1496                            // Emit CheckpointSaved event to stream
1497                            Self::emit_checkpoint_saved_event(
1498                                self.stream_tx.as_ref(),
1499                                cp_id.clone(),
1500                                metadata_for_event,
1501                                self.step,
1502                            );
1503                        }
1504                    }
1505                    Err(err) => {
1506                        tracing::warn!(
1507                            name: "juncture.checkpoint.interrupt.save_failed",
1508                            node = node,
1509                            error = %err,
1510                            "Failed to save interrupt checkpoint"
1511                        );
1512                        // Emit checkpoint error metric
1513                        self.emit_counter("juncture.checkpoint.errors", 1);
1514                    }
1515                }
1516            }
1517        }
1518    }
1519
1520    /// Save a checkpoint with [`CheckpointSource::Loop`] after normal superstep completion.
1521    ///
1522    /// This is the second phase of two-phase persistence (B-04-002):
1523    /// - Phase 1: `put_writes()` after each task completes (already in runner)
1524    /// - Phase 2: `put()` after each superstep completes (this method)
1525    ///
1526    /// Called from [`after_tick`](Self::after_tick) after state merge and
1527    /// next-task computation, but before interrupt drain checks. This ensures
1528    /// crash recovery can resume from the last completed superstep rather than
1529    /// replaying from the initial state or last interrupt.
1530    ///
1531    /// No-op if no checkpointer is configured. Errors are logged but do not
1532    /// propagate -- superstep checkpointing is best-effort and must not prevent
1533    /// the graph from continuing execution.
1534    #[allow(
1535        clippy::cognitive_complexity,
1536        clippy::too_many_lines,
1537        reason = "durability match arms and checkpoint construction logic are necessarily complex for handling Sync/Async/Exit modes"
1538    )]
1539    async fn save_superstep_checkpoint(&mut self)
1540    where
1541        S: serde::Serialize,
1542    {
1543        let Some(ref checkpointer) = self.checkpointer else {
1544            return;
1545        };
1546
1547        // In Exit mode, skip normal superstep checkpoints. Only save on
1548        // graph completion or interrupt, where checkpoints are treated as
1549        // the final durable snapshot.
1550        if self.effective_durability() == Durability::Exit {
1551            return;
1552        }
1553
1554        // Evaluate whether delta-channel write counts have exceeded their
1555        // snapshot_frequency thresholds. When no delta channel has crossed
1556        // the threshold, the pending writes from put_writes() are sufficient
1557        // for crash recovery and a full snapshot is unnecessary.
1558        let needs_full_snapshot = self.should_take_full_snapshot();
1559        tracing::debug!(
1560            name = "juncture.checkpoint.superstep.delta_decision",
1561            step = self.step,
1562            needs_full_snapshot = needs_full_snapshot,
1563            "Delta-channel snapshot frequency evaluation"
1564        );
1565
1566        // Skip full checkpoint if not needed - delta writes from put_writes()
1567        // are sufficient for crash recovery.
1568        if !needs_full_snapshot {
1569            tracing::debug!(
1570                name = "juncture.checkpoint.superstep.skipped",
1571                step = self.step,
1572                "Skipped full checkpoint - delta optimization active"
1573            );
1574            return;
1575        }
1576
1577        let channel_values = match serde_json::to_value(&self.state) {
1578            Ok(v) => v,
1579            Err(err) => {
1580                tracing::warn!(
1581                    name: "juncture.checkpoint.superstep.serialize_failed",
1582                    step = self.step,
1583                    error = %err,
1584                    "Failed to serialize state for superstep checkpoint"
1585                );
1586                return;
1587            }
1588        };
1589
1590        let (channel_versions, new_versions, versions_seen) = self.build_checkpoint_versions();
1591
1592        // Serialize pending tasks for crash recovery so the engine knows
1593        // which nodes to execute next after resuming from this checkpoint.
1594        let pending_tasks: Vec<crate::checkpoint::CheckpointPendingTask> = self
1595            .pending_tasks
1596            .iter()
1597            .map(|task| crate::checkpoint::CheckpointPendingTask {
1598                id: task.id.clone(),
1599                node: task.node_name.clone(),
1600                triggers: Vec::new(),
1601                state_override: None,
1602            })
1603            .collect();
1604
1605        let checkpoint_id = generate_checkpoint_id();
1606        let cp_id_for_event = checkpoint_id.clone();
1607        let created_at = chrono::Utc::now().to_rfc3339();
1608
1609        let checkpoint = Checkpoint {
1610            id: checkpoint_id,
1611            channel_values,
1612            channel_versions,
1613            versions_seen,
1614            pending_tasks,
1615            pending_sends: Vec::new(),
1616            pending_interrupts: Vec::new(),
1617            schema_version: S::schema_version(),
1618            created_at,
1619            v: 1,
1620            new_versions,
1621            counters_since_delta_snapshot: self.build_checkpoint_delta_counters(),
1622        };
1623
1624        let metadata = CheckpointMetadata {
1625            source: CheckpointSource::Loop,
1626            step: i64::try_from(self.step).unwrap_or(i64::MAX),
1627            writes: HashMap::new(),
1628            parents: HashMap::new(),
1629            run_id: self.run_id.clone(),
1630        };
1631
1632        let cp_config = self.runnable_config.clone();
1633        let stream_tx_clone = self.stream_tx.clone();
1634        match self.effective_durability() {
1635            Durability::Async => {
1636                let step = self.step;
1637                let checkpointer_arc = Arc::clone(checkpointer);
1638                let metadata_for_event = metadata.clone();
1639                tokio::spawn(async move {
1640                    match checkpointer_arc.put(&cp_config, checkpoint, metadata).await {
1641                        Ok(_updated_config) => {
1642                            tracing::info!(
1643                                name: "juncture.checkpoint.put",
1644                                checkpoint_step = step,
1645                                checkpoint_source = "Loop",
1646                                "Superstep checkpoint persisted (async)"
1647                            );
1648                            // Emit checkpoint write metric
1649                            if let Some(ref collector) = cp_config.metrics_collector {
1650                                collector.inc_counter("juncture.checkpoint.writes", 1);
1651                            }
1652                            // Emit CheckpointSaved event to stream
1653                            Self::emit_checkpoint_saved_event(
1654                                stream_tx_clone.as_ref(),
1655                                cp_id_for_event,
1656                                metadata_for_event,
1657                                step,
1658                            );
1659                        }
1660                        Err(err) => {
1661                            tracing::warn!(
1662                                name: "juncture.checkpoint.superstep.save_failed",
1663                                step = step,
1664                                error = %err,
1665                                "Failed to save superstep checkpoint (async)"
1666                            );
1667                            // Emit checkpoint error metric
1668                            if let Some(ref collector) = cp_config.metrics_collector {
1669                                collector.inc_counter("juncture.checkpoint.errors", 1);
1670                            }
1671                        }
1672                    }
1673                });
1674                self.reset_delta_counters();
1675            }
1676            Durability::Sync | Durability::Exit => {
1677                let metadata_for_event = metadata.clone();
1678                match checkpointer
1679                    .put(&self.runnable_config, checkpoint, metadata)
1680                    .await
1681                {
1682                    Ok(updated_config) => {
1683                        self.runnable_config.checkpoint_id = updated_config.checkpoint_id;
1684                        // Reset delta counters after a successful checkpoint save.
1685                        // The checkpoint now carries the cumulative counters, and a
1686                        // fresh counting window starts for the next checkpoint cycle.
1687                        self.reset_delta_counters();
1688                        // Emit checkpoint write metric
1689                        self.emit_counter("juncture.checkpoint.writes", 1);
1690                        tracing::info!(
1691                            name: "juncture.checkpoint.put",
1692                            checkpoint_id = %self.runnable_config.checkpoint_id.as_deref().unwrap_or("unknown"),
1693                            checkpoint_step = self.step,
1694                            checkpoint_source = "Loop",
1695                            "Superstep checkpoint persisted"
1696                        );
1697                        if let Some(ref cp_id) = self.runnable_config.checkpoint_id {
1698                            self.on_checkpoint_saved(cp_id, self.step);
1699                            // Emit CheckpointSaved event to stream
1700                            Self::emit_checkpoint_saved_event(
1701                                self.stream_tx.as_ref(),
1702                                cp_id.clone(),
1703                                metadata_for_event,
1704                                self.step,
1705                            );
1706                        }
1707                    }
1708                    Err(err) => {
1709                        tracing::warn!(
1710                            name: "juncture.checkpoint.superstep.save_failed",
1711                            step = self.step,
1712                            error = %err,
1713                            "Failed to save superstep checkpoint"
1714                        );
1715                        // Emit checkpoint error metric
1716                        self.emit_counter("juncture.checkpoint.errors", 1);
1717                    }
1718                }
1719            }
1720        }
1721    }
1722
1723    /// Save a pending interrupt checkpoint for `interrupt_before` scenarios.
1724    ///
1725    /// When `tick()` detects an `interrupt_before`, the loop exits immediately
1726    /// (tick is synchronous and cannot call the async checkpointer). The caller
1727    /// should invoke this method after the loop exits when the status is
1728    /// [`LoopStatus::InterruptBefore`].
1729    ///
1730    /// This is a no-op if no checkpointer is configured or if the status is not
1731    /// interrupted.
1732    ///
1733    /// # Type Parameters
1734    ///
1735    /// Requires `S: serde::Serialize` to serialize the current state.
1736    ///
1737    /// # Errors
1738    ///
1739    /// Does not return errors -- checkpoint save failures are logged and the
1740    /// interrupt is still surfaced to the caller.
1741    pub async fn save_pending_interrupt_checkpoint(&mut self)
1742    where
1743        S: serde::Serialize,
1744    {
1745        if !self.status.is_interrupted() || self.checkpointer.is_none() {
1746            return;
1747        }
1748        let node = self.interrupt_node_name().to_string();
1749        self.save_interrupt_checkpoint(&node).await;
1750    }
1751
1752    /// Extract the primary interrupt node name from pending interrupts or loop status.
1753    ///
1754    /// Used for checkpoint source identification. Returns the first interrupt's
1755    /// associated node name, or "unknown" if not available.
1756    fn interrupt_node_name(&self) -> &str {
1757        static UNKNOWN: &str = "unknown";
1758        self.pending_interrupts
1759            .first()
1760            .and_then(|s| s.payload.get("node"))
1761            .and_then(|v| v.as_str())
1762            .unwrap_or(UNKNOWN)
1763    }
1764
1765    /// Convert the current checkpoint namespace into a `Vec<String>` suitable
1766    /// for the `ns` field of [`StreamEvent::Interrupt`].
1767    ///
1768    /// Each [`NamespaceSegment`] contributes only its `node_name`; the
1769    /// invocation UUID is omitted because stream consumers only need the
1770    /// logical nesting path (e.g. `["review", "detail"]`).
1771    fn current_ns(&self) -> Vec<String> {
1772        self.runnable_config
1773            .checkpoint_ns
1774            .as_ref()
1775            .map(|ns| {
1776                ns.segments
1777                    .iter()
1778                    .map(|seg| seg.node_name.clone())
1779                    .collect()
1780            })
1781            .unwrap_or_default()
1782    }
1783
1784    /// Emit interrupt stream events for the given signals, filtering out
1785    /// hidden/internal nodes (names starting and ending with `__`).
1786    ///
1787    /// Hidden nodes represent internal infrastructure (routing, error handling)
1788    /// that should never surface to external stream consumers.
1789    fn emit_interrupt_events(&self, signals: &[crate::interrupt::InterruptSignal]) {
1790        self.emit_interrupt_events_with_namespace(signals, &self.current_ns());
1791    }
1792
1793    /// Emit interrupt stream events with the specified namespace, filtering out
1794    /// hidden/internal nodes (names starting and ending with `__`).
1795    ///
1796    /// This variant is used when handling interrupts that bubbled up from subgraphs,
1797    /// where we need to use the subgraph's namespace rather than the parent's namespace.
1798    ///
1799    /// # Arguments
1800    ///
1801    /// * `signals` - Interrupt signals to emit
1802    /// * `namespace` - Namespace to use for the events (typically from a subgraph)
1803    fn emit_interrupt_events_with_namespace(
1804        &self,
1805        signals: &[crate::interrupt::InterruptSignal],
1806        namespace: &[String],
1807    ) {
1808        let Some(ref tx) = self.stream_tx else {
1809            return;
1810        };
1811
1812        for signal in signals {
1813            let node = signal
1814                .payload
1815                .get("node")
1816                .and_then(|v| v.as_str())
1817                .unwrap_or("unknown");
1818
1819            // Skip hidden/internal nodes from stream emission
1820            // PendingTask doesn't have tags yet, so pass empty slice
1821            let tags: &[String] = &[];
1822            if crate::interrupt::is_hidden_node(node, tags) {
1823                continue;
1824            }
1825
1826            let event = StreamEvent::Interrupt {
1827                node: node.to_string(),
1828                payload: signal.payload.clone(),
1829                resumable: true,
1830                ns: namespace.to_vec(),
1831            };
1832            let _ = tx.send(event);
1833        }
1834    }
1835
1836    /// Finish all channels in the state
1837    ///
1838    /// Called when graph execution completes (no more pending tasks).
1839    /// This allows channels like `LastValueAfterFinishChannel` to finalize
1840    /// their state and make values available to consumers.
1841    ///
1842    /// Only calls `finish_field()` for fields that use the
1843    /// `replace_after_finish` reducer, as indicated by
1844    /// [`State::replace_after_finish_field_indices`]. Other field types
1845    /// have no-op finish semantics.
1846    ///
1847    /// # Examples
1848    ///
1849    /// ```ignore
1850    /// use juncture_core::pregel::loop_::PregelLoop;
1851    ///
1852    /// let mut loop = PregelLoop::new(...)?;
1853    /// // ... execution ...
1854    /// if loop.pending_tasks.is_empty() {
1855    ///     loop.finish_all_channels();
1856    /// }
1857    /// ```
1858    fn finish_all_channels(&mut self) {
1859        // Only finish channels once per execution to avoid duplicate finalization.
1860        // This is called on all termination paths (normal completion, interrupt,
1861        // cancellation, budget exceeded, recursion limit, drain), but we only want
1862        // to execute the actual finish operation once.
1863        if self.channels_finished {
1864            return;
1865        }
1866
1867        for &field_idx in S::replace_after_finish_field_indices() {
1868            self.state.finish_field(field_idx);
1869        }
1870
1871        self.channels_finished = true;
1872    }
1873
1874    /// Consume all channels that were triggered (changed) in the current superstep.
1875    ///
1876    /// Called after `apply_writes()` in `after_tick()` to mark triggered channels
1877    /// as consumed. For ephemeral fields backed by `EphemeralChannel`, this sets
1878    /// the consumed flag. Other channel types (`UntrackedChannel`,
1879    /// `LastValueAfterFinishChannel`, `DeltaChannel`) have no-op consume semantics.
1880    ///
1881    /// Iterates over all field indices and calls `consume_field()` only for fields
1882    /// that changed in the current superstep, as indicated by the `FieldsChanged`
1883    /// bitmask. This matches the design spec where all triggered channels call
1884    /// `consume()` after `apply_writes()`.
1885    fn consume_triggered_channels(&mut self, changed: &crate::FieldsChanged) {
1886        for field_idx in 0..S::field_count() {
1887            if changed.has_field(field_idx) {
1888                self.state.consume_field(field_idx);
1889            }
1890        }
1891    }
1892
1893    // -----------------------------------------------------------------------
1894    // Durability mode helpers (B-03-003)
1895    // -----------------------------------------------------------------------
1896
1897    /// Return the effective durability mode, defaulting to `Sync` when not configured.
1898    #[must_use]
1899    fn effective_durability(&self) -> Durability {
1900        self.runnable_config
1901            .durability
1902            .clone()
1903            .unwrap_or(Durability::Sync)
1904    }
1905
1906    /// Build the channel versions, new versions, and versions seen maps from
1907    /// the current execution state.
1908    ///
1909    /// Returns a tuple of `(channel_versions, new_versions, versions_seen)` for
1910    /// use in checkpoint construction. This refactors duplicate version-building
1911    /// code that appears in both `save_interrupt_checkpoint` and
1912    /// `save_superstep_checkpoint`.
1913    #[must_use]
1914    #[allow(
1915        clippy::type_complexity,
1916        reason = "return type is a direct mapping of the three version maps required by Checkpoint struct; factoring into a named type adds indirection without benefit"
1917    )]
1918    fn build_checkpoint_versions(
1919        &self,
1920    ) -> (
1921        HashMap<String, u64>,
1922        HashMap<String, u64>,
1923        HashMap<String, HashMap<String, u64>>,
1924    ) {
1925        let channel_versions: HashMap<String, u64> = self
1926            .field_versions
1927            .versions()
1928            .iter()
1929            .enumerate()
1930            .map(|(idx, ver)| (format!("field_{idx}"), *ver))
1931            .collect();
1932
1933        let new_versions = channel_versions.clone();
1934
1935        let versions_seen: HashMap<String, HashMap<String, u64>> = self
1936            .nodes
1937            .keys()
1938            .map(|node_name| {
1939                let versions = self.versions_seen.get_versions(node_name);
1940                let map: HashMap<String, u64> = versions
1941                    .iter()
1942                    .enumerate()
1943                    .map(|(idx, ver)| (format!("field_{idx}"), *ver))
1944                    .collect();
1945                (node_name.clone(), map)
1946            })
1947            .collect();
1948
1949        (channel_versions, new_versions, versions_seen)
1950    }
1951
1952    /// Save a final exit checkpoint when running in [`Durability::Exit`] mode.
1953    ///
1954    /// This checkpoint captures the final state after all channels are finished
1955    /// and no more tasks remain. It uses [`CheckpointSource::Loop`] since it
1956    /// represents a normal completion checkpoint, not an interrupt.
1957    ///
1958    /// No-op if no checkpointer is configured. Errors are logged but do not
1959    /// propagate -- exit checkpointing is best-effort.
1960    async fn save_exit_checkpoint(&mut self)
1961    where
1962        S: serde::Serialize,
1963    {
1964        let Some(ref checkpointer) = self.checkpointer else {
1965            return;
1966        };
1967
1968        let channel_values = match serde_json::to_value(&self.state) {
1969            Ok(v) => v,
1970            Err(err) => {
1971                tracing::warn!(
1972                    name: "juncture.checkpoint.exit.serialize_failed",
1973                    step = self.step,
1974                    error = %err,
1975                    "Failed to serialize state for exit checkpoint"
1976                );
1977                return;
1978            }
1979        };
1980
1981        let (channel_versions, new_versions, versions_seen) = self.build_checkpoint_versions();
1982
1983        let pending_tasks: Vec<crate::checkpoint::CheckpointPendingTask> = self
1984            .pending_tasks
1985            .iter()
1986            .map(|task| crate::checkpoint::CheckpointPendingTask {
1987                id: task.id.clone(),
1988                node: task.node_name.clone(),
1989                triggers: Vec::new(),
1990                state_override: None,
1991            })
1992            .collect();
1993
1994        let checkpoint_id = generate_checkpoint_id();
1995        let created_at = chrono::Utc::now().to_rfc3339();
1996
1997        let checkpoint = Checkpoint {
1998            id: checkpoint_id,
1999            channel_values,
2000            channel_versions,
2001            versions_seen,
2002            pending_tasks,
2003            pending_sends: Vec::new(),
2004            pending_interrupts: Vec::new(),
2005            schema_version: S::schema_version(),
2006            created_at,
2007            v: 1,
2008            new_versions,
2009            counters_since_delta_snapshot: HashMap::new(),
2010        };
2011
2012        let metadata = CheckpointMetadata {
2013            source: CheckpointSource::Loop,
2014            step: i64::try_from(self.step).unwrap_or(i64::MAX),
2015            writes: HashMap::new(),
2016            parents: HashMap::new(),
2017            run_id: self.run_id.clone(),
2018        };
2019
2020        let metadata_for_event = metadata.clone();
2021        match checkpointer
2022            .put(&self.runnable_config, checkpoint, metadata)
2023            .await
2024        {
2025            Ok(updated_config) => {
2026                self.runnable_config.checkpoint_id = updated_config.checkpoint_id;
2027                // Emit checkpoint write metric
2028                self.emit_counter("juncture.checkpoint.writes", 1);
2029                tracing::info!(
2030                    name: "juncture.checkpoint.put",
2031                    checkpoint_id = %self.runnable_config.checkpoint_id.as_deref().unwrap_or("unknown"),
2032                    checkpoint_step = self.step,
2033                    checkpoint_source = "Loop",
2034                    "Exit checkpoint persisted"
2035                );
2036                if let Some(ref cp_id) = self.runnable_config.checkpoint_id {
2037                    self.on_checkpoint_saved(cp_id, self.step);
2038                    // Emit CheckpointSaved event to stream
2039                    Self::emit_checkpoint_saved_event(
2040                        self.stream_tx.as_ref(),
2041                        cp_id.clone(),
2042                        metadata_for_event,
2043                        self.step,
2044                    );
2045                }
2046            }
2047            Err(err) => {
2048                tracing::warn!(
2049                    name: "juncture.checkpoint.exit.save_failed",
2050                    step = self.step,
2051                    error = %err,
2052                    "Failed to save exit checkpoint"
2053                );
2054                // Emit checkpoint error metric
2055                self.emit_counter("juncture.checkpoint.errors", 1);
2056            }
2057        }
2058    }
2059
2060    // -----------------------------------------------------------------------
2061    // Delta counter tracking (B-04-001)
2062    // -----------------------------------------------------------------------
2063
2064    /// Increment delta counters after a superstep applies writes.
2065    ///
2066    /// For every field that changed, increment its `updates` counter. For all
2067    /// fields tracked in `delta_counters` (whether or not they changed), increment
2068    /// the `supersteps` counter. This provides a consistent view of write activity
2069    /// that the checkpoint builder consults to decide full-snapshot vs delta.
2070    fn update_delta_counters(&mut self, changed: &crate::FieldsChanged) {
2071        let field_names = S::field_names();
2072        let num_fields = field_names.len().min(self.field_versions.len());
2073
2074        for field_idx in 0..num_fields {
2075            let channel_name = format!("field_{field_idx}");
2076            let entry = self.delta_counters.entry(channel_name).or_default();
2077
2078            // Always increment supersteps for tracked channels
2079            entry.supersteps = entry.supersteps.saturating_add(1);
2080
2081            // Only increment updates for channels that actually changed
2082            if changed.has_field(field_idx) {
2083                entry.updates = entry.updates.saturating_add(1);
2084            }
2085        }
2086    }
2087
2088    /// Build the `counters_since_delta_snapshot` map for checkpoint persistence.
2089    ///
2090    /// Returns a clone of the current delta counters so the checkpoint carries an
2091    /// accurate snapshot of write activity since the last full snapshot.
2092    fn build_checkpoint_delta_counters(&self) -> HashMap<String, DeltaCounters> {
2093        self.delta_counters.clone()
2094    }
2095
2096    /// Decide whether a full snapshot checkpoint should be taken.
2097    ///
2098    /// Checks each `DeltaChannel` field against its configured `snapshot_frequency`.
2099    /// If any field's update count exceeds its frequency, returns `true` to
2100    /// indicate that a full snapshot is needed. Non-DeltaChannel fields are
2101    /// excluded from this decision since they always snapshot fully.
2102    fn should_take_full_snapshot(&self) -> bool {
2103        let specs = S::delta_channel_specs();
2104        if specs.is_empty() {
2105            // No DeltaChannel fields configured -- always take full snapshots
2106            // since there is no delta optimization to apply.
2107            return true;
2108        }
2109
2110        for &(field_idx, frequency) in specs {
2111            let channel_name = format!("field_{field_idx}");
2112            if let Some(counters) = self.delta_counters.get(&channel_name)
2113                && counters.exceeds_frequency(frequency)
2114            {
2115                return true;
2116            }
2117        }
2118
2119        false
2120    }
2121
2122    /// Reset delta counters after a full snapshot checkpoint has been saved.
2123    fn reset_delta_counters(&mut self) {
2124        self.delta_counters.clear();
2125    }
2126
2127    // -----------------------------------------------------------------------
2128    // Metric emission helpers
2129    // -----------------------------------------------------------------------
2130
2131    /// Increment a counter metric if a collector is configured.
2132    #[inline]
2133    fn emit_counter(&self, name: &str, value: u64) {
2134        if let Some(ref collector) = self.runnable_config.metrics_collector {
2135            collector.inc_counter(name, value);
2136        }
2137    }
2138
2139    /// Record a histogram value if a collector is configured.
2140    #[inline]
2141    fn emit_histogram(&self, name: &str, value: f64) {
2142        if let Some(ref collector) = self.runnable_config.metrics_collector {
2143            collector.record_histogram(name, value);
2144        }
2145    }
2146
2147    /// Set a gauge value if a collector is configured.
2148    #[inline]
2149    fn emit_gauge(&self, name: &str, value: u64) {
2150        if let Some(ref collector) = self.runnable_config.metrics_collector {
2151            collector.set_gauge(name, value);
2152        }
2153    }
2154
2155    // -----------------------------------------------------------------------
2156    // Lifecycle callback helpers
2157    // -----------------------------------------------------------------------
2158
2159    /// Invoke graph-end callback if a handler is configured and emit
2160    /// the `juncture.graph.complete` tracing span with execution metrics.
2161    #[inline]
2162    fn on_graph_end(&self, result: &Result<(), JunctureError>) {
2163        // Extract budget metrics for the completion span.
2164        let (total_tokens, cost_usd) = self.budget_tracker.as_ref().map_or((0, 0.0), |tracker| {
2165            let usage = tracker.current_usage();
2166            (usage.tokens_used, usage.cost_usd)
2167        });
2168
2169        let success = result.is_ok();
2170        let span = tracing::info_span!(
2171            "juncture.graph.complete",
2172            total_steps = self.step,
2173            total_tokens = total_tokens,
2174            cost_usd = cost_usd,
2175            success = success,
2176        );
2177        let _enter = span.enter();
2178
2179        tracing::info!("Graph execution completed");
2180
2181        if let Some(ref handler) = self.runnable_config.callback_handler {
2182            handler.on_graph_end(result);
2183        }
2184    }
2185
2186    /// Invoke checkpoint-saved callback if a handler is configured.
2187    #[inline]
2188    fn on_checkpoint_saved(&self, checkpoint_id: &str, step: usize) {
2189        if let Some(ref handler) = self.runnable_config.callback_handler {
2190            handler.on_checkpoint_saved(checkpoint_id, step);
2191        }
2192    }
2193
2194    /// Emit `CheckpointSaved` event to stream if a stream sender is configured.
2195    #[inline]
2196    fn emit_checkpoint_saved_event(
2197        stream_tx: Option<&mpsc::UnboundedSender<StreamEvent<S>>>,
2198        checkpoint_id: String,
2199        metadata: CheckpointMetadata,
2200        step: usize,
2201    ) {
2202        if let Some(tx) = stream_tx {
2203            let _ = tx.send(StreamEvent::CheckpointSaved {
2204                checkpoint_id,
2205                metadata,
2206                step,
2207            });
2208        }
2209    }
2210}
2211
2212#[cfg(test)]
2213mod tests {
2214    use super::*;
2215    use crate::state::FieldVersions;
2216    use crate::{
2217        Command,
2218        node::IntoNode,
2219        node::NodeFnCommand,
2220        pregel::types::{TaskOutput, TaskTrigger},
2221    };
2222    use chrono::Utc;
2223
2224    #[test]
2225    fn test_pregel_loop_creation() {
2226        let state = TestState;
2227        let mut nodes = IndexMap::new();
2228        nodes.insert(
2229            "test_node".to_string(),
2230            NodeFnCommand(
2231                |_s: &TestState| -> std::pin::Pin<
2232                    Box<
2233                        dyn std::future::Future<
2234                                Output = Result<crate::Command<TestState>, crate::JunctureError>,
2235                            > + Send,
2236                    >,
2237                > { Box::pin(async move { Ok(crate::Command::end()) }) },
2238            )
2239            .into_node("test_node"),
2240        );
2241
2242        let trigger_table = TriggerTable::new();
2243        let config = crate::config::RunnableConfig::new();
2244
2245        let result = PregelLoop::new(state, nodes, trigger_table, config, 0);
2246        result.unwrap();
2247    }
2248
2249    #[test]
2250    fn test_field_version_tracker() {
2251        let mut tracker = FieldVersionTracker::new(5);
2252
2253        assert_eq!(tracker.get(0), 0);
2254        assert_eq!(tracker.global_max(), 0);
2255
2256        tracker.bump(0);
2257        assert_eq!(tracker.get(0), 1);
2258        assert_eq!(tracker.global_max(), 1);
2259
2260        tracker.bump(2);
2261        assert_eq!(tracker.get(2), 2);
2262        assert_eq!(tracker.global_max(), 2);
2263    }
2264
2265    #[test]
2266    fn test_versions_seen() {
2267        let node_names = vec!["node_a".to_string(), "node_b".to_string()];
2268        let mut seen = VersionsSeen::new(&node_names, 3);
2269
2270        assert!(!seen.should_activate("node_a", &[0], &[0, 0, 0]));
2271
2272        let current = vec![1, 0, 0];
2273        assert!(seen.should_activate("node_a", &[0], &current));
2274
2275        seen.mark_consumed("node_a", &current);
2276        assert!(!seen.should_activate("node_a", &[0], &current));
2277    }
2278
2279    #[test]
2280    fn test_run_control() {
2281        let rc = RunControl::new();
2282        assert!(!rc.is_drain_requested());
2283
2284        rc.request_drain();
2285        assert!(rc.is_drain_requested());
2286    }
2287
2288    #[test]
2289    fn test_run_control_default() {
2290        let rc = RunControl::default();
2291        assert!(!rc.is_drain_requested());
2292    }
2293
2294    #[test]
2295    fn test_handle_bubble_up_interrupt_sets_status() {
2296        let state = TestState;
2297        let mut nodes = IndexMap::new();
2298        nodes.insert(
2299            "test_node".to_string(),
2300            NodeFnCommand(
2301                |_s: &TestState| -> std::pin::Pin<
2302                    Box<
2303                        dyn std::future::Future<
2304                                Output = Result<crate::Command<TestState>, crate::JunctureError>,
2305                            > + Send,
2306                    >,
2307                > { Box::pin(async move { Ok(crate::Command::end()) }) },
2308            )
2309            .into_node("test_node"),
2310        );
2311
2312        let trigger_table = TriggerTable::new();
2313        let config = crate::config::RunnableConfig::new();
2314
2315        let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
2316
2317        let signals = vec![crate::interrupt::InterruptSignal {
2318            index: 0,
2319            id: Some("sub-int-0".to_string()),
2320            payload: serde_json::json!({"node": "subgraph_node"}),
2321            timestamp: Utc::now(),
2322        }];
2323        let bubble_ups = vec![BubbleUp::Interrupt(crate::pregel::types::GraphInterrupt {
2324            interrupts: signals,
2325            step: 2,
2326            namespace: vec![],
2327        })];
2328
2329        let should_stop = loop_.handle_bubble_ups(&bubble_ups);
2330
2331        assert!(should_stop);
2332        assert!(loop_.status.is_interrupted());
2333        assert_eq!(loop_.pending_interrupts.len(), 1);
2334        assert_eq!(loop_.pending_interrupts[0].id.as_deref(), Some("sub-int-0"));
2335    }
2336
2337    #[test]
2338    fn test_handle_bubble_up_drained_sets_status() {
2339        let state = TestState;
2340        let mut nodes = IndexMap::new();
2341        nodes.insert(
2342            "test_node".to_string(),
2343            NodeFnCommand(
2344                |_s: &TestState| -> std::pin::Pin<
2345                    Box<
2346                        dyn std::future::Future<
2347                                Output = Result<crate::Command<TestState>, crate::JunctureError>,
2348                            > + Send,
2349                    >,
2350                > { Box::pin(async move { Ok(crate::Command::end()) }) },
2351            )
2352            .into_node("test_node"),
2353        );
2354
2355        let trigger_table = TriggerTable::new();
2356        let config = crate::config::RunnableConfig::new();
2357
2358        let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
2359
2360        let bubble_ups = vec![BubbleUp::Drained(crate::pregel::types::GraphDrained {
2361            reason: "subgraph completed".to_string(),
2362        })];
2363
2364        let should_stop = loop_.handle_bubble_ups(&bubble_ups);
2365
2366        assert!(should_stop);
2367        assert!(loop_.status.is_terminal());
2368        assert!(matches!(loop_.status, LoopStatus::Drained));
2369    }
2370
2371    #[test]
2372    fn test_handle_bubble_up_parent_command_does_not_stop() {
2373        let state = TestState;
2374        let mut nodes = IndexMap::new();
2375        nodes.insert(
2376            "test_node".to_string(),
2377            NodeFnCommand(
2378                |_s: &TestState| -> std::pin::Pin<
2379                    Box<
2380                        dyn std::future::Future<
2381                                Output = Result<crate::Command<TestState>, crate::JunctureError>,
2382                            > + Send,
2383                    >,
2384                > { Box::pin(async move { Ok(crate::Command::end()) }) },
2385            )
2386            .into_node("test_node"),
2387        );
2388
2389        let trigger_table = TriggerTable::new();
2390        let config = crate::config::RunnableConfig::new();
2391
2392        let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
2393
2394        let parent_cmd = crate::command::ParentCommand::from_subgraph(
2395            Command::end(),
2396            "test_subgraph_node",
2397            "test_namespace",
2398        );
2399        let bubble_ups = vec![BubbleUp::ParentCommand(parent_cmd)];
2400
2401        let should_stop = loop_.handle_bubble_ups(&bubble_ups);
2402
2403        assert!(!should_stop);
2404        assert!(loop_.status.is_running());
2405    }
2406
2407    #[test]
2408    fn test_handle_bubble_up_empty_does_nothing() {
2409        let state = TestState;
2410        let mut nodes = IndexMap::new();
2411        nodes.insert(
2412            "test_node".to_string(),
2413            NodeFnCommand(
2414                |_s: &TestState| -> std::pin::Pin<
2415                    Box<
2416                        dyn std::future::Future<
2417                                Output = Result<crate::Command<TestState>, crate::JunctureError>,
2418                            > + Send,
2419                    >,
2420                > { Box::pin(async move { Ok(crate::Command::end()) }) },
2421            )
2422            .into_node("test_node"),
2423        );
2424
2425        let trigger_table = TriggerTable::new();
2426        let config = crate::config::RunnableConfig::new();
2427
2428        let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
2429
2430        let should_stop = loop_.handle_bubble_ups(&[]);
2431
2432        assert!(!should_stop);
2433        assert!(loop_.status.is_running());
2434    }
2435
2436    #[test]
2437    fn test_handle_bubble_up_interrupt_takes_priority_over_drain() {
2438        let state = TestState;
2439        let mut nodes = IndexMap::new();
2440        nodes.insert(
2441            "test_node".to_string(),
2442            NodeFnCommand(
2443                |_s: &TestState| -> std::pin::Pin<
2444                    Box<
2445                        dyn std::future::Future<
2446                                Output = Result<crate::Command<TestState>, crate::JunctureError>,
2447                            > + Send,
2448                    >,
2449                > { Box::pin(async move { Ok(crate::Command::end()) }) },
2450            )
2451            .into_node("test_node"),
2452        );
2453
2454        let trigger_table = TriggerTable::new();
2455        let config = crate::config::RunnableConfig::new();
2456
2457        let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
2458
2459        let bubble_ups = vec![
2460            BubbleUp::Drained(crate::pregel::types::GraphDrained {
2461                reason: "drained".to_string(),
2462            }),
2463            BubbleUp::Interrupt(crate::pregel::types::GraphInterrupt {
2464                interrupts: vec![crate::interrupt::InterruptSignal {
2465                    index: 0,
2466                    id: None,
2467                    payload: serde_json::Value::Null,
2468                    timestamp: Utc::now(),
2469                }],
2470                step: 1,
2471                namespace: vec![],
2472            }),
2473        ];
2474
2475        let should_stop = loop_.handle_bubble_ups(&bubble_ups);
2476
2477        assert!(should_stop);
2478        // Interrupt is processed last, so status reflects the interrupt
2479        assert!(loop_.status.is_interrupted());
2480    }
2481
2482    #[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
2483    struct TestState;
2484
2485    impl State for TestState {
2486        type Update = TestUpdate;
2487        type FieldVersions = FieldVersions;
2488
2489        fn apply(&mut self, _: Self::Update) -> crate::FieldsChanged {
2490            crate::FieldsChanged(0)
2491        }
2492
2493        fn reset_ephemeral(&mut self) {}
2494    }
2495
2496    #[derive(Clone, Debug, Default, serde::Serialize)]
2497    struct TestUpdate;
2498
2499    // --- B-04-001: delta counter tests ---
2500
2501    /// Test state with two fields to exercise delta counter tracking.
2502    #[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
2503    struct DeltaTestState {
2504        value: i32,
2505        messages: Vec<String>,
2506    }
2507
2508    #[derive(Clone, Debug, Default, serde::Serialize)]
2509    struct DeltaTestUpdate {
2510        value: Option<i32>,
2511        messages: Option<Vec<String>>,
2512    }
2513
2514    impl State for DeltaTestState {
2515        type Update = DeltaTestUpdate;
2516        type FieldVersions = FieldVersions;
2517
2518        fn apply(&mut self, update: Self::Update) -> crate::FieldsChanged {
2519            let mut changed = crate::FieldsChanged(0);
2520            if let Some(v) = update.value {
2521                self.value = v;
2522                changed.set_field(0);
2523            }
2524            if let Some(msgs) = update.messages {
2525                self.messages.extend(msgs);
2526                changed.set_field(1);
2527            }
2528            changed
2529        }
2530
2531        fn reset_ephemeral(&mut self) {}
2532
2533        fn field_names() -> &'static [&'static str] {
2534            &["value", "messages"]
2535        }
2536
2537        fn field_count() -> usize {
2538            2
2539        }
2540
2541        /// Field 1 (messages) is a `DeltaChannel` with `snapshot_frequency` = 3
2542        fn delta_channel_specs() -> &'static [(usize, usize)] {
2543            &[(1, 3)]
2544        }
2545    }
2546
2547    /// Checkpointer that captures the last saved checkpoint for inspection.
2548    struct CapturingCheckpointer {
2549        captured: Arc<std::sync::Mutex<Option<crate::checkpoint::Checkpoint>>>,
2550    }
2551
2552    #[async_trait::async_trait]
2553    impl crate::checkpoint::CheckpointSaver for CapturingCheckpointer {
2554        async fn get_tuple(
2555            &self,
2556            _: &crate::config::RunnableConfig,
2557        ) -> Result<Option<crate::checkpoint::CheckpointTuple>, crate::checkpoint::CheckpointError>
2558        {
2559            Ok(None)
2560        }
2561
2562        async fn list(
2563            &self,
2564            _: &crate::config::RunnableConfig,
2565            _: Option<crate::checkpoint::CheckpointFilter>,
2566        ) -> Result<Vec<crate::checkpoint::CheckpointTuple>, crate::checkpoint::CheckpointError>
2567        {
2568            Ok(Vec::new())
2569        }
2570
2571        async fn put(
2572            &self,
2573            _: &crate::config::RunnableConfig,
2574            checkpoint: crate::checkpoint::Checkpoint,
2575            _metadata: crate::checkpoint::CheckpointMetadata,
2576        ) -> Result<crate::config::RunnableConfig, crate::checkpoint::CheckpointError> {
2577            *self
2578                .captured
2579                .lock()
2580                .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(checkpoint);
2581            let mut cfg = crate::config::RunnableConfig::new();
2582            cfg.checkpoint_id = Some("cp-capture".to_string());
2583            Ok(cfg)
2584        }
2585
2586        async fn put_writes(
2587            &self,
2588            _: &crate::config::RunnableConfig,
2589            _: Vec<crate::checkpoint::PendingWrite>,
2590            _: &str,
2591        ) -> Result<(), crate::checkpoint::CheckpointError> {
2592            Ok(())
2593        }
2594    }
2595
2596    /// Verify delta counters are incremented when fields change in a superstep.
2597    #[tokio::test]
2598    async fn test_delta_counters_increment_on_field_change() {
2599        let state = DeltaTestState {
2600            value: 0,
2601            messages: vec![],
2602        };
2603        let mut nodes = IndexMap::new();
2604        nodes.insert(
2605            "test_node".to_string(),
2606            NodeFnCommand(
2607                |_s: &DeltaTestState| -> std::pin::Pin<
2608                    Box<
2609                        dyn std::future::Future<
2610                                Output = Result<
2611                                    crate::Command<DeltaTestState>,
2612                                    crate::JunctureError,
2613                                >,
2614                            > + Send,
2615                    >,
2616                > { Box::pin(async move { Ok(crate::Command::end()) }) },
2617            )
2618            .into_node("test_node"),
2619        );
2620        let trigger_table = TriggerTable::new();
2621        let config = crate::config::RunnableConfig::new();
2622
2623        let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 2).unwrap();
2624
2625        // Simulate a superstep where both fields changed
2626        let changed = crate::FieldsChanged(0b11); // both field 0 and 1 changed
2627        loop_.update_delta_counters(&changed);
2628
2629        assert_eq!(loop_.delta_counters.len(), 2, "should track both fields");
2630
2631        let field_0 = loop_
2632            .delta_counters
2633            .get("field_0")
2634            .expect("field_0 should exist");
2635        assert_eq!(field_0.updates, 1, "field_0 should have 1 update");
2636        assert_eq!(field_0.supersteps, 1, "field_0 should have 1 superstep");
2637
2638        let field_1 = loop_
2639            .delta_counters
2640            .get("field_1")
2641            .expect("field_1 should exist");
2642        assert_eq!(field_1.updates, 1, "field_1 should have 1 update");
2643        assert_eq!(field_1.supersteps, 1, "field_1 should have 1 superstep");
2644    }
2645
2646    /// Verify delta counters only increment updates for fields that actually changed.
2647    #[tokio::test]
2648    async fn test_delta_counters_increment_unchanged_fields_get_superstep_only() {
2649        let state = DeltaTestState {
2650            value: 0,
2651            messages: vec![],
2652        };
2653        let mut nodes = IndexMap::new();
2654        nodes.insert(
2655            "test_node".to_string(),
2656            NodeFnCommand(
2657                |_s: &DeltaTestState| -> std::pin::Pin<
2658                    Box<
2659                        dyn std::future::Future<
2660                                Output = Result<
2661                                    crate::Command<DeltaTestState>,
2662                                    crate::JunctureError,
2663                                >,
2664                            > + Send,
2665                    >,
2666                > { Box::pin(async move { Ok(crate::Command::end()) }) },
2667            )
2668            .into_node("test_node"),
2669        );
2670        let trigger_table = TriggerTable::new();
2671        let config = crate::config::RunnableConfig::new();
2672
2673        let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 2).unwrap();
2674
2675        // Only field 0 changed, field 1 did not
2676        let changed = crate::FieldsChanged(0b01);
2677        loop_.update_delta_counters(&changed);
2678
2679        let field_0 = loop_
2680            .delta_counters
2681            .get("field_0")
2682            .expect("field_0 should exist");
2683        assert_eq!(field_0.updates, 1, "field_0 should have 1 update");
2684
2685        let field_1 = loop_
2686            .delta_counters
2687            .get("field_1")
2688            .expect("field_1 should exist");
2689        assert_eq!(
2690            field_1.updates, 0,
2691            "field_1 should have 0 updates (not changed)"
2692        );
2693        assert_eq!(
2694            field_1.supersteps, 1,
2695            "field_1 should still have 1 superstep"
2696        );
2697    }
2698
2699    /// Verify delta counters accumulate across multiple supersteps.
2700    #[tokio::test]
2701    async fn test_delta_counters_accumulate_across_supersteps() {
2702        let state = DeltaTestState {
2703            value: 0,
2704            messages: vec![],
2705        };
2706        let mut nodes = IndexMap::new();
2707        nodes.insert(
2708            "test_node".to_string(),
2709            NodeFnCommand(
2710                |_s: &DeltaTestState| -> std::pin::Pin<
2711                    Box<
2712                        dyn std::future::Future<
2713                                Output = Result<
2714                                    crate::Command<DeltaTestState>,
2715                                    crate::JunctureError,
2716                                >,
2717                            > + Send,
2718                    >,
2719                > { Box::pin(async move { Ok(crate::Command::end()) }) },
2720            )
2721            .into_node("test_node"),
2722        );
2723        let trigger_table = TriggerTable::new();
2724        let config = crate::config::RunnableConfig::new();
2725
2726        let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 2).unwrap();
2727
2728        // First superstep: field 0 changes
2729        loop_.update_delta_counters(&crate::FieldsChanged(0b01));
2730        // Second superstep: both fields change
2731        loop_.update_delta_counters(&crate::FieldsChanged(0b11));
2732
2733        let field_0 = loop_
2734            .delta_counters
2735            .get("field_0")
2736            .expect("field_0 should exist");
2737        assert_eq!(field_0.updates, 2, "field_0 updated in both supersteps");
2738        assert_eq!(field_0.supersteps, 2, "field_0 has 2 supersteps");
2739
2740        let field_1 = loop_
2741            .delta_counters
2742            .get("field_1")
2743            .expect("field_1 should exist");
2744        assert_eq!(
2745            field_1.updates, 1,
2746            "field_1 updated in only second superstep"
2747        );
2748        assert_eq!(field_1.supersteps, 2, "field_1 has 2 supersteps");
2749    }
2750
2751    /// Verify delta counters are populated in checkpoints and reset after save.
2752    #[tokio::test]
2753    async fn test_delta_counters_populated_in_checkpoint_and_reset() {
2754        let state = DeltaTestState {
2755            value: 0,
2756            messages: vec![],
2757        };
2758        let mut nodes = IndexMap::new();
2759        nodes.insert(
2760            "test_node".to_string(),
2761            NodeFnCommand(
2762                |_s: &DeltaTestState| -> std::pin::Pin<
2763                    Box<
2764                        dyn std::future::Future<
2765                                Output = Result<
2766                                    crate::Command<DeltaTestState>,
2767                                    crate::JunctureError,
2768                                >,
2769                            > + Send,
2770                    >,
2771                > { Box::pin(async move { Ok(crate::Command::end()) }) },
2772            )
2773            .into_node("test_node"),
2774        );
2775        let trigger_table = TriggerTable::new();
2776        let mut config = crate::config::RunnableConfig::new();
2777        config.thread_id = Some("test-thread".to_string());
2778
2779        let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 2).unwrap();
2780
2781        let captured: Arc<std::sync::Mutex<Option<crate::checkpoint::Checkpoint>>> =
2782            Arc::new(std::sync::Mutex::new(None));
2783        let checkpointer = CapturingCheckpointer {
2784            captured: Arc::clone(&captured),
2785        };
2786        loop_.set_checkpointer(Arc::new(checkpointer));
2787
2788        // Manually populate delta counters to simulate a prior superstep.
2789        // We do NOT call update_delta_counters here because after_tick will
2790        // call it again, doubling the superstep count. Instead we set the
2791        // counters directly to model a pre-existing counter state.
2792        loop_.delta_counters.insert(
2793            "field_0".to_string(),
2794            DeltaCounters {
2795                updates: 1,
2796                supersteps: 1,
2797            },
2798        );
2799        // Set field_1's updates at the snapshot_frequency threshold (3) so that
2800        // should_take_full_snapshot() returns true and the checkpoint is saved.
2801        loop_.delta_counters.insert(
2802            "field_1".to_string(),
2803            DeltaCounters {
2804                updates: 3,
2805                supersteps: 1,
2806            },
2807        );
2808
2809        // Execute a superstep (empty result -- no writes, but after_tick will
2810        // increment superstep counters for all tracked fields).
2811        loop_.pending_tasks = vec![PendingTask::pull(
2812            uuid::Uuid::new_v4().to_string(),
2813            "test_node".to_string(),
2814        )];
2815        let _ = loop_.execute_superstep().await;
2816        let _ = loop_.after_tick(SuperstepResult::empty()).await;
2817
2818        // Checkpoint should have populated delta counters
2819        let checkpoint = captured
2820            .lock()
2821            .unwrap_or_else(std::sync::PoisonError::into_inner)
2822            .take()
2823            .expect("checkpoint should have been saved");
2824        assert!(
2825            !checkpoint.counters_since_delta_snapshot.is_empty(),
2826            "counters_since_delta_snapshot should be populated"
2827        );
2828        let field_0 = checkpoint
2829            .counters_since_delta_snapshot
2830            .get("field_0")
2831            .expect("field_0 should be in delta counters");
2832        // Pre-existing 1 update + 1 superstep, after_tick adds 0 updates (empty
2833        // result) and 1 superstep via update_delta_counters.
2834        assert_eq!(
2835            field_0.updates, 1,
2836            "field_0 should have 1 update in checkpoint"
2837        );
2838        assert_eq!(
2839            field_0.supersteps, 2,
2840            "field_0 should have 2 supersteps in checkpoint"
2841        );
2842
2843        let field_1 = checkpoint
2844            .counters_since_delta_snapshot
2845            .get("field_1")
2846            .expect("field_1 should be in delta counters");
2847        assert_eq!(
2848            field_1.updates, 3,
2849            "field_1 should have 3 updates in checkpoint"
2850        );
2851
2852        // After checkpoint save, delta counters should be reset
2853        assert!(
2854            loop_.delta_counters.is_empty(),
2855            "delta counters should be reset after checkpoint save"
2856        );
2857    }
2858
2859    /// Verify `should_take_full_snapshot` returns true when no delta channels configured.
2860    #[test]
2861    fn test_should_take_full_snapshot_no_delta_channels() {
2862        // TestState has no delta_channel_specs override (default empty)
2863        let state = TestState;
2864        let mut nodes = IndexMap::new();
2865        nodes.insert(
2866            "test_node".to_string(),
2867            NodeFnCommand(
2868                |_s: &TestState| -> std::pin::Pin<
2869                    Box<
2870                        dyn std::future::Future<
2871                                Output = Result<crate::Command<TestState>, crate::JunctureError>,
2872                            > + Send,
2873                    >,
2874                > { Box::pin(async move { Ok(crate::Command::end()) }) },
2875            )
2876            .into_node("test_node"),
2877        );
2878        let trigger_table = TriggerTable::new();
2879        let config = crate::config::RunnableConfig::new();
2880
2881        let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
2882
2883        // With no delta channels, should always take full snapshot
2884        assert!(
2885            loop_.should_take_full_snapshot(),
2886            "should always take full snapshot with no delta channels"
2887        );
2888
2889        // Even with some counters accumulated
2890        loop_.delta_counters.insert(
2891            "field_0".to_string(),
2892            DeltaCounters {
2893                updates: 100,
2894                supersteps: 50,
2895            },
2896        );
2897        assert!(
2898            loop_.should_take_full_snapshot(),
2899            "still full snapshot when specs are empty (no delta optimization)"
2900        );
2901    }
2902
2903    /// Verify `should_take_full_snapshot` respects `snapshot_frequency` for delta channels.
2904    #[test]
2905    fn test_should_take_full_snapshot_respects_frequency() {
2906        let state = DeltaTestState {
2907            value: 0,
2908            messages: vec![],
2909        };
2910        let mut nodes = IndexMap::new();
2911        nodes.insert(
2912            "test_node".to_string(),
2913            NodeFnCommand(
2914                |_s: &DeltaTestState| -> std::pin::Pin<
2915                    Box<
2916                        dyn std::future::Future<
2917                                Output = Result<
2918                                    crate::Command<DeltaTestState>,
2919                                    crate::JunctureError,
2920                                >,
2921                            > + Send,
2922                    >,
2923                > { Box::pin(async move { Ok(crate::Command::end()) }) },
2924            )
2925            .into_node("test_node"),
2926        );
2927        let trigger_table = TriggerTable::new();
2928        let config = crate::config::RunnableConfig::new();
2929
2930        let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 2).unwrap();
2931
2932        // Below frequency threshold: field 1 has frequency 3, counters at 2
2933        loop_.delta_counters.insert(
2934            "field_1".to_string(),
2935            DeltaCounters {
2936                updates: 2,
2937                supersteps: 2,
2938            },
2939        );
2940        assert!(
2941            !loop_.should_take_full_snapshot(),
2942            "should not take full snapshot below frequency threshold"
2943        );
2944
2945        // At frequency threshold
2946        loop_.delta_counters.insert(
2947            "field_1".to_string(),
2948            DeltaCounters {
2949                updates: 3,
2950                supersteps: 3,
2951            },
2952        );
2953        assert!(
2954            loop_.should_take_full_snapshot(),
2955            "should take full snapshot at frequency threshold"
2956        );
2957
2958        // Above frequency threshold
2959        loop_.delta_counters.insert(
2960            "field_1".to_string(),
2961            DeltaCounters {
2962                updates: 10,
2963                supersteps: 5,
2964            },
2965        );
2966        assert!(
2967            loop_.should_take_full_snapshot(),
2968            "should take full snapshot above frequency threshold"
2969        );
2970    }
2971
2972    /// Verify `DeltaCounters::exceeds_frequency` edge cases.
2973    #[test]
2974    fn test_delta_counters_exceeds_frequency() {
2975        let counters = DeltaCounters::new();
2976        assert_eq!(counters.updates, 0);
2977        assert_eq!(counters.supersteps, 0);
2978
2979        // Frequency 0 means always snapshot
2980        assert!(
2981            counters.exceeds_frequency(0),
2982            "frequency 0 always snapshots"
2983        );
2984
2985        // Below threshold
2986        let counters = DeltaCounters {
2987            updates: 2,
2988            supersteps: 1,
2989        };
2990        assert!(!counters.exceeds_frequency(3), "2 < 3, not exceeded");
2991
2992        // At threshold
2993        let counters = DeltaCounters {
2994            updates: 3,
2995            supersteps: 1,
2996        };
2997        assert!(counters.exceeds_frequency(3), "3 >= 3, exceeded");
2998
2999        // Above threshold
3000        let counters = DeltaCounters {
3001            updates: 10,
3002            supersteps: 1,
3003        };
3004        assert!(counters.exceeds_frequency(3), "10 >= 3, exceeded");
3005    }
3006
3007    /// Verify that the scratchpad is populated with interrupt IDs after
3008    /// `execute_superstep` processes pending interrupts. This is the core
3009    /// fix for review finding B-06-006: the scratchpad must track which
3010    /// interrupts have been processed so that on re-execution, already-
3011    /// handled interrupt points receive null-resume values instead of
3012    /// re-interrupting.
3013    #[tokio::test]
3014    async fn test_scratchpad_populated_after_execute_superstep() {
3015        let state = TestState;
3016
3017        let mut nodes = IndexMap::new();
3018        nodes.insert(
3019            "test_node".to_string(),
3020            NodeFnCommand(
3021                |_s: &TestState| -> std::pin::Pin<
3022                    Box<
3023                        dyn std::future::Future<
3024                                Output = Result<crate::Command<TestState>, crate::JunctureError>,
3025                            > + Send,
3026                    >,
3027                > { Box::pin(async move { Ok(crate::Command::end()) }) },
3028            )
3029            .into_node("test_node"),
3030        );
3031
3032        let trigger_table = TriggerTable::new();
3033        let config = crate::config::RunnableConfig::new();
3034
3035        let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3036
3037        // Simulate pending interrupts from a previous cycle
3038        loop_.pending_tasks = vec![PendingTask::pull(
3039            uuid::Uuid::new_v4().to_string(),
3040            "test_node".to_string(),
3041        )];
3042        loop_.pending_interrupts = vec![
3043            crate::interrupt::InterruptSignal {
3044                index: 0,
3045                id: Some("int-alpha".to_string()),
3046                payload: serde_json::Value::Null,
3047                timestamp: Utc::now(),
3048            },
3049            crate::interrupt::InterruptSignal {
3050                index: 1,
3051                id: Some("int-beta".to_string()),
3052                payload: serde_json::Value::Null,
3053                timestamp: Utc::now(),
3054            },
3055        ];
3056
3057        // Before execute_superstep, scratchpad is empty
3058        assert!(
3059            !loop_.scratchpad.is_interrupt_processed("int-alpha"),
3060            "scratchpad should be empty before superstep"
3061        );
3062        assert!(
3063            !loop_.scratchpad.is_interrupt_processed("int-beta"),
3064            "scratchpad should be empty before superstep"
3065        );
3066
3067        let result = loop_.execute_superstep().await;
3068        assert!(result.is_ok(), "execute_superstep should succeed");
3069
3070        // After execute_superstep, pending interrupts are marked as processed
3071        assert!(
3072            loop_.scratchpad.is_interrupt_processed("int-alpha"),
3073            "int-alpha should be marked as processed after superstep"
3074        );
3075        assert!(
3076            loop_.scratchpad.is_interrupt_processed("int-beta"),
3077            "int-beta should be marked as processed after superstep"
3078        );
3079        assert!(
3080            !loop_.scratchpad.is_interrupt_processed("int-gamma"),
3081            "unrelated interrupt should not be marked as processed"
3082        );
3083    }
3084
3085    /// Verify that the scratchpad accumulates across multiple supersteps,
3086    /// so interrupts from different cycles are all tracked.
3087    #[tokio::test]
3088    async fn test_scratchpad_accumulates_across_supersteps() {
3089        let state = TestState;
3090
3091        let mut nodes = IndexMap::new();
3092        nodes.insert(
3093            "test_node".to_string(),
3094            NodeFnCommand(
3095                |_s: &TestState| -> std::pin::Pin<
3096                    Box<
3097                        dyn std::future::Future<
3098                                Output = Result<crate::Command<TestState>, crate::JunctureError>,
3099                            > + Send,
3100                    >,
3101                > { Box::pin(async move { Ok(crate::Command::end()) }) },
3102            )
3103            .into_node("test_node"),
3104        );
3105
3106        let trigger_table = TriggerTable::new();
3107        let config = crate::config::RunnableConfig::new();
3108
3109        let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3110
3111        // First superstep with interrupt "int-1"
3112        loop_.pending_tasks = vec![PendingTask::pull(
3113            uuid::Uuid::new_v4().to_string(),
3114            "test_node".to_string(),
3115        )];
3116        loop_.pending_interrupts = vec![crate::interrupt::InterruptSignal {
3117            index: 0,
3118            id: Some("int-1".to_string()),
3119            payload: serde_json::Value::Null,
3120            timestamp: Utc::now(),
3121        }];
3122
3123        let _ = loop_.execute_superstep().await;
3124        let _ = loop_.after_tick(SuperstepResult::empty()).await;
3125
3126        // Second superstep with interrupt "int-2"
3127        loop_.pending_tasks = vec![PendingTask::pull(
3128            uuid::Uuid::new_v4().to_string(),
3129            "test_node".to_string(),
3130        )];
3131        loop_.pending_interrupts = vec![crate::interrupt::InterruptSignal {
3132            index: 0,
3133            id: Some("int-2".to_string()),
3134            payload: serde_json::Value::Null,
3135            timestamp: Utc::now(),
3136        }];
3137
3138        let _ = loop_.execute_superstep().await;
3139
3140        // Both interrupt IDs should be tracked
3141        assert!(
3142            loop_.scratchpad.is_interrupt_processed("int-1"),
3143            "int-1 from first superstep should still be tracked"
3144        );
3145        assert!(
3146            loop_.scratchpad.is_interrupt_processed("int-2"),
3147            "int-2 from second superstep should be tracked"
3148        );
3149    }
3150
3151    // --- B-04-002: superstep checkpoint tests ---
3152
3153    /// Observed checkpointer call for test assertions
3154    #[derive(Clone, Debug, PartialEq, Eq)]
3155    enum ObservedCall {
3156        Put {
3157            source: crate::checkpoint::CheckpointSource,
3158            step: i64,
3159        },
3160    }
3161
3162    /// Mock checkpointer that records `put()` calls for test verification
3163    struct TrackingCheckpointer {
3164        observed: Arc<std::sync::Mutex<Vec<ObservedCall>>>,
3165    }
3166
3167    #[async_trait::async_trait]
3168    impl crate::checkpoint::CheckpointSaver for TrackingCheckpointer {
3169        async fn get_tuple(
3170            &self,
3171            _: &crate::config::RunnableConfig,
3172        ) -> Result<Option<crate::checkpoint::CheckpointTuple>, crate::checkpoint::CheckpointError>
3173        {
3174            Ok(None)
3175        }
3176
3177        async fn list(
3178            &self,
3179            _: &crate::config::RunnableConfig,
3180            _: Option<crate::checkpoint::CheckpointFilter>,
3181        ) -> Result<Vec<crate::checkpoint::CheckpointTuple>, crate::checkpoint::CheckpointError>
3182        {
3183            Ok(Vec::new())
3184        }
3185
3186        async fn put(
3187            &self,
3188            _: &crate::config::RunnableConfig,
3189            _checkpoint: crate::checkpoint::Checkpoint,
3190            metadata: crate::checkpoint::CheckpointMetadata,
3191        ) -> Result<crate::config::RunnableConfig, crate::checkpoint::CheckpointError> {
3192            self.observed
3193                .lock()
3194                .unwrap_or_else(std::sync::PoisonError::into_inner)
3195                .push(ObservedCall::Put {
3196                    source: metadata.source,
3197                    step: metadata.step,
3198                });
3199            let mut cfg = crate::config::RunnableConfig::new();
3200            cfg.checkpoint_id = Some("cp-test".to_string());
3201            Ok(cfg)
3202        }
3203
3204        async fn put_writes(
3205            &self,
3206            _: &crate::config::RunnableConfig,
3207            _: Vec<crate::checkpoint::PendingWrite>,
3208            _: &str,
3209        ) -> Result<(), crate::checkpoint::CheckpointError> {
3210            Ok(())
3211        }
3212    }
3213
3214    /// Verify that `after_tick` saves a checkpoint with `CheckpointSource::Loop`
3215    /// after a normal (non-interrupt) superstep completes (B-04-002).
3216    #[tokio::test]
3217    async fn test_superstep_checkpoint_saved_on_normal_completion() {
3218        let state = TestState;
3219
3220        let mut nodes = IndexMap::new();
3221        nodes.insert(
3222            "test_node".to_string(),
3223            NodeFnCommand(
3224                |_s: &TestState| -> std::pin::Pin<
3225                    Box<
3226                        dyn std::future::Future<
3227                                Output = Result<crate::Command<TestState>, crate::JunctureError>,
3228                            > + Send,
3229                    >,
3230                > { Box::pin(async move { Ok(crate::Command::end()) }) },
3231            )
3232            .into_node("test_node"),
3233        );
3234
3235        let trigger_table = TriggerTable::new();
3236        let mut config = crate::config::RunnableConfig::new();
3237        config.thread_id = Some("test-thread".to_string());
3238
3239        let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3240
3241        let observed = Arc::new(std::sync::Mutex::new(Vec::new()));
3242        let checkpointer = TrackingCheckpointer {
3243            observed: Arc::clone(&observed),
3244        };
3245        loop_.set_checkpointer(Arc::new(checkpointer));
3246
3247        // Execute one superstep (no interrupts)
3248        loop_.pending_tasks = vec![PendingTask::pull(
3249            uuid::Uuid::new_v4().to_string(),
3250            "test_node".to_string(),
3251        )];
3252
3253        let _ = loop_.execute_superstep().await;
3254        let _ = loop_.after_tick(SuperstepResult::empty()).await;
3255
3256        // Verify a checkpoint with Loop source was saved
3257        let has_loop_checkpoint = {
3258            let calls = observed
3259                .lock()
3260                .unwrap_or_else(std::sync::PoisonError::into_inner);
3261            calls.iter().any(|c| {
3262                matches!(
3263                    c,
3264                    ObservedCall::Put {
3265                        source: crate::checkpoint::CheckpointSource::Loop,
3266                        step: 0,
3267                    }
3268                )
3269            })
3270        };
3271        assert!(has_loop_checkpoint, "expected a Loop checkpoint at step 0");
3272    }
3273
3274    /// Verify that superstep checkpoint is saved at the correct step number
3275    /// across multiple supersteps (B-04-002).
3276    #[tokio::test]
3277    async fn test_superstep_checkpoint_step_increments() {
3278        let state = TestState;
3279
3280        let mut nodes = IndexMap::new();
3281        nodes.insert(
3282            "test_node".to_string(),
3283            NodeFnCommand(
3284                |_s: &TestState| -> std::pin::Pin<
3285                    Box<
3286                        dyn std::future::Future<
3287                                Output = Result<crate::Command<TestState>, crate::JunctureError>,
3288                            > + Send,
3289                    >,
3290                > { Box::pin(async move { Ok(crate::Command::end()) }) },
3291            )
3292            .into_node("test_node"),
3293        );
3294
3295        let trigger_table = TriggerTable::new();
3296        let mut config = crate::config::RunnableConfig::new();
3297        config.thread_id = Some("test-thread".to_string());
3298
3299        let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3300
3301        let observed = Arc::new(std::sync::Mutex::new(Vec::new()));
3302        let checkpointer = TrackingCheckpointer {
3303            observed: Arc::clone(&observed),
3304        };
3305        loop_.set_checkpointer(Arc::new(checkpointer));
3306
3307        // First superstep at step 0
3308        loop_.pending_tasks = vec![PendingTask::pull(
3309            uuid::Uuid::new_v4().to_string(),
3310            "test_node".to_string(),
3311        )];
3312        let _ = loop_.execute_superstep().await;
3313        let _ = loop_.after_tick(SuperstepResult::empty()).await;
3314
3315        // Second superstep at step 1
3316        loop_.pending_tasks = vec![PendingTask::pull(
3317            uuid::Uuid::new_v4().to_string(),
3318            "test_node".to_string(),
3319        )];
3320        let _ = loop_.execute_superstep().await;
3321        let _ = loop_.after_tick(SuperstepResult::empty()).await;
3322
3323        let loop_steps: Vec<i64> = {
3324            let calls = observed
3325                .lock()
3326                .unwrap_or_else(std::sync::PoisonError::into_inner);
3327            calls
3328                .iter()
3329                .filter_map(|c| match c {
3330                    ObservedCall::Put {
3331                        source: crate::checkpoint::CheckpointSource::Loop,
3332                        step,
3333                    } => Some(*step),
3334                    ObservedCall::Put { .. } => None,
3335                })
3336                .collect()
3337        };
3338
3339        assert_eq!(
3340            loop_steps,
3341            vec![0, 1],
3342            "expected Loop checkpoints at steps 0 and 1, got: {loop_steps:?}"
3343        );
3344    }
3345
3346    /// Verify that NO superstep checkpoint is saved when no checkpointer is configured
3347    /// (B-04-002 -- should be a silent no-op).
3348    #[tokio::test]
3349    async fn test_superstep_checkpoint_noop_without_checkpointer() {
3350        let state = TestState;
3351
3352        let mut nodes = IndexMap::new();
3353        nodes.insert(
3354            "test_node".to_string(),
3355            NodeFnCommand(
3356                |_s: &TestState| -> std::pin::Pin<
3357                    Box<
3358                        dyn std::future::Future<
3359                                Output = Result<crate::Command<TestState>, crate::JunctureError>,
3360                            > + Send,
3361                    >,
3362                > { Box::pin(async move { Ok(crate::Command::end()) }) },
3363            )
3364            .into_node("test_node"),
3365        );
3366
3367        let trigger_table = TriggerTable::new();
3368        let config = crate::config::RunnableConfig::new();
3369
3370        let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3371        assert!(
3372            loop_.checkpointer.is_none(),
3373            "no checkpointer should be configured by default"
3374        );
3375
3376        // Execute one superstep without checkpointer -- should succeed without error
3377        loop_.pending_tasks = vec![PendingTask::pull(
3378            uuid::Uuid::new_v4().to_string(),
3379            "test_node".to_string(),
3380        )];
3381
3382        let result = loop_.execute_superstep().await;
3383        assert!(result.is_ok(), "execute_superstep should succeed");
3384
3385        let after_result = loop_.after_tick(SuperstepResult::empty()).await;
3386        assert!(
3387            after_result.is_ok(),
3388            "after_tick should succeed without checkpointer"
3389        );
3390    }
3391
3392    // --- B-06-003: current_ns tests ---
3393
3394    #[test]
3395    fn test_current_ns_empty_when_no_checkpoint_ns() {
3396        let state = TestState;
3397        let mut nodes = IndexMap::new();
3398        nodes.insert(
3399            "test_node".to_string(),
3400            NodeFnCommand(
3401                |_s: &TestState| -> std::pin::Pin<
3402                    Box<
3403                        dyn std::future::Future<
3404                                Output = Result<crate::Command<TestState>, crate::JunctureError>,
3405                            > + Send,
3406                    >,
3407                > { Box::pin(async move { Ok(crate::Command::end()) }) },
3408            )
3409            .into_node("test_node"),
3410        );
3411        let trigger_table = TriggerTable::new();
3412        let config = crate::config::RunnableConfig::new();
3413
3414        let loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3415        assert!(
3416            loop_.current_ns().is_empty(),
3417            "root-level graph should have empty ns"
3418        );
3419    }
3420
3421    #[test]
3422    fn test_current_ns_extracts_node_names_from_checkpoint_ns() {
3423        let state = TestState;
3424        let mut nodes = IndexMap::new();
3425        nodes.insert(
3426            "test_node".to_string(),
3427            NodeFnCommand(
3428                |_s: &TestState| -> std::pin::Pin<
3429                    Box<
3430                        dyn std::future::Future<
3431                                Output = Result<crate::Command<TestState>, crate::JunctureError>,
3432                            > + Send,
3433                    >,
3434                > { Box::pin(async move { Ok(crate::Command::end()) }) },
3435            )
3436            .into_node("test_node"),
3437        );
3438        let trigger_table = TriggerTable::new();
3439        let config = crate::config::RunnableConfig::new().with_checkpoint_ns(
3440            crate::checkpoint::CheckpointNamespace::new(vec![
3441                crate::checkpoint::NamespaceSegment::new(
3442                    "review".to_string(),
3443                    "uuid-1".to_string(),
3444                ),
3445                crate::checkpoint::NamespaceSegment::new(
3446                    "detail".to_string(),
3447                    "uuid-2".to_string(),
3448                ),
3449            ]),
3450        );
3451
3452        let loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3453        let ns = loop_.current_ns();
3454        assert_eq!(ns, vec!["review", "detail"]);
3455    }
3456
3457    #[test]
3458    fn test_current_ns_single_segment() {
3459        let state = TestState;
3460        let mut nodes = IndexMap::new();
3461        nodes.insert(
3462            "test_node".to_string(),
3463            NodeFnCommand(
3464                |_s: &TestState| -> std::pin::Pin<
3465                    Box<
3466                        dyn std::future::Future<
3467                                Output = Result<crate::Command<TestState>, crate::JunctureError>,
3468                            > + Send,
3469                    >,
3470                > { Box::pin(async move { Ok(crate::Command::end()) }) },
3471            )
3472            .into_node("test_node"),
3473        );
3474        let trigger_table = TriggerTable::new();
3475        let config = crate::config::RunnableConfig::new().with_checkpoint_ns(
3476            crate::checkpoint::CheckpointNamespace::new(vec![
3477                crate::checkpoint::NamespaceSegment::new(
3478                    "agent".to_string(),
3479                    "uuid-single".to_string(),
3480                ),
3481            ]),
3482        );
3483
3484        let loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3485        let ns = loop_.current_ns();
3486        assert_eq!(ns, vec!["agent"]);
3487    }
3488
3489    /// Verify that a bubble-up interrupt emitted to the stream carries the
3490    /// namespace from the execution context (fix for B-06-003).
3491    #[test]
3492    fn test_bubble_up_interrupt_emits_ns_from_checkpoint_ns() {
3493        let state = TestState;
3494        let mut nodes = IndexMap::new();
3495        nodes.insert(
3496            "test_node".to_string(),
3497            NodeFnCommand(
3498                |_s: &TestState| -> std::pin::Pin<
3499                    Box<
3500                        dyn std::future::Future<
3501                                Output = Result<crate::Command<TestState>, crate::JunctureError>,
3502                            > + Send,
3503                    >,
3504                > { Box::pin(async move { Ok(crate::Command::end()) }) },
3505            )
3506            .into_node("test_node"),
3507        );
3508
3509        let trigger_table = TriggerTable::new();
3510        let checkpoint_ns = crate::checkpoint::CheckpointNamespace::new(vec![
3511            crate::checkpoint::NamespaceSegment::new(
3512                "review".to_string(),
3513                "uuid-parent".to_string(),
3514            ),
3515        ]);
3516        let config = crate::config::RunnableConfig::new().with_checkpoint_ns(checkpoint_ns);
3517
3518        let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3519
3520        // Attach a stream receiver to capture emitted events
3521        let (tx, mut rx) = mpsc::unbounded_channel();
3522        loop_.stream_tx = Some(tx);
3523
3524        let signals = vec![crate::interrupt::InterruptSignal {
3525            index: 0,
3526            id: Some("int-ns-0".to_string()),
3527            payload: serde_json::json!({"node": "child_node"}),
3528            timestamp: Utc::now(),
3529        }];
3530        let bubble_ups = vec![BubbleUp::Interrupt(crate::pregel::types::GraphInterrupt {
3531            interrupts: signals,
3532            step: 1,
3533            namespace: vec!["review".to_string()],
3534        })];
3535
3536        let _ = loop_.handle_bubble_ups(&bubble_ups);
3537
3538        // The emitted event should carry the checkpoint namespace
3539        let event = rx
3540            .try_recv()
3541            .expect("should have received an interrupt event");
3542        match event {
3543            StreamEvent::Interrupt { ns, .. } => {
3544                assert_eq!(ns, vec!["review"]);
3545            }
3546            other => panic!("expected Interrupt event, got {other:?}"),
3547        }
3548    }
3549
3550    // --- B-06-005: HIDDEN_TAG stream filtering tests ---
3551
3552    /// Verify that hidden nodes (names starting/ending with `__`) are filtered
3553    /// from bubble-up interrupt stream events.
3554    #[test]
3555    fn test_hidden_node_filtered_from_bubble_up_interrupt_stream() {
3556        let state = TestState;
3557        let mut nodes = IndexMap::new();
3558        nodes.insert(
3559            "test_node".to_string(),
3560            NodeFnCommand(
3561                |_s: &TestState| -> std::pin::Pin<
3562                    Box<
3563                        dyn std::future::Future<
3564                                Output = Result<crate::Command<TestState>, crate::JunctureError>,
3565                            > + Send,
3566                    >,
3567                > { Box::pin(async move { Ok(crate::Command::end()) }) },
3568            )
3569            .into_node("test_node"),
3570        );
3571
3572        let trigger_table = TriggerTable::new();
3573        let config = crate::config::RunnableConfig::new();
3574
3575        let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3576
3577        let (tx, mut rx) = mpsc::unbounded_channel();
3578        loop_.stream_tx = Some(tx);
3579
3580        // Mix of visible and hidden node signals
3581        let signals = vec![
3582            crate::interrupt::InterruptSignal {
3583                index: 0,
3584                id: Some("int-visible".to_string()),
3585                payload: serde_json::json!({"node": "agent"}),
3586                timestamp: Utc::now(),
3587            },
3588            crate::interrupt::InterruptSignal {
3589                index: 1,
3590                id: Some("int-hidden".to_string()),
3591                payload: serde_json::json!({"node": "__route__"}),
3592                timestamp: Utc::now(),
3593            },
3594            crate::interrupt::InterruptSignal {
3595                index: 2,
3596                id: Some("int-also-visible".to_string()),
3597                payload: serde_json::json!({"node": "review"}),
3598                timestamp: Utc::now(),
3599            },
3600        ];
3601        let bubble_ups = vec![BubbleUp::Interrupt(crate::pregel::types::GraphInterrupt {
3602            interrupts: signals,
3603            step: 1,
3604            namespace: vec![],
3605        })];
3606
3607        let _ = loop_.handle_bubble_ups(&bubble_ups);
3608
3609        // Should receive exactly 2 events (agent and review), __route__ filtered
3610        let mut received_nodes = Vec::new();
3611        while let Ok(event) = rx.try_recv() {
3612            match event {
3613                StreamEvent::Interrupt { node, .. } => received_nodes.push(node),
3614                other => panic!("unexpected event: {other:?}"),
3615            }
3616        }
3617        assert_eq!(
3618            received_nodes,
3619            vec!["agent", "review"],
3620            "hidden node __route__ should be filtered from stream"
3621        );
3622    }
3623
3624    /// Verify that all-hidden-node signals produce zero stream events.
3625    #[test]
3626    fn test_all_hidden_nodes_produce_no_stream_events() {
3627        let state = TestState;
3628        let mut nodes = IndexMap::new();
3629        nodes.insert(
3630            "test_node".to_string(),
3631            NodeFnCommand(
3632                |_s: &TestState| -> std::pin::Pin<
3633                    Box<
3634                        dyn std::future::Future<
3635                                Output = Result<crate::Command<TestState>, crate::JunctureError>,
3636                            > + Send,
3637                    >,
3638                > { Box::pin(async move { Ok(crate::Command::end()) }) },
3639            )
3640            .into_node("test_node"),
3641        );
3642
3643        let trigger_table = TriggerTable::new();
3644        let config = crate::config::RunnableConfig::new();
3645
3646        let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3647
3648        let (tx, mut rx) = mpsc::unbounded_channel();
3649        loop_.stream_tx = Some(tx);
3650
3651        let signals = vec![
3652            crate::interrupt::InterruptSignal {
3653                index: 0,
3654                id: Some("int-h1".to_string()),
3655                payload: serde_json::json!({"node": "__route__"}),
3656                timestamp: Utc::now(),
3657            },
3658            crate::interrupt::InterruptSignal {
3659                index: 1,
3660                id: Some("int-h2".to_string()),
3661                payload: serde_json::json!({"node": "__handler__"}),
3662                timestamp: Utc::now(),
3663            },
3664        ];
3665        let bubble_ups = vec![BubbleUp::Interrupt(crate::pregel::types::GraphInterrupt {
3666            interrupts: signals,
3667            step: 1,
3668            namespace: vec![],
3669        })];
3670
3671        let _ = loop_.handle_bubble_ups(&bubble_ups);
3672
3673        // No events should be emitted
3674        assert!(
3675            rx.try_recv().is_err(),
3676            "all-hidden signals should produce no stream events"
3677        );
3678        // But pending_interrupts and status still reflect all signals (internal state)
3679        assert_eq!(loop_.pending_interrupts.len(), 2);
3680    }
3681
3682    // --- B-03-003: Durability mode tests ---
3683
3684    /// Verify that `effective_durability` defaults to `Sync` when no durability
3685    /// is configured in `RunnableConfig`.
3686    #[test]
3687    fn test_effective_durability_defaults_to_sync() {
3688        let state = TestState;
3689        let mut nodes = IndexMap::new();
3690        nodes.insert(
3691            "test_node".to_string(),
3692            NodeFnCommand(
3693                |_s: &TestState| -> std::pin::Pin<
3694                    Box<
3695                        dyn std::future::Future<
3696                                Output = Result<crate::Command<TestState>, crate::JunctureError>,
3697                            > + Send,
3698                    >,
3699                > { Box::pin(async move { Ok(crate::Command::end()) }) },
3700            )
3701            .into_node("test_node"),
3702        );
3703        let trigger_table = TriggerTable::new();
3704        let config = crate::config::RunnableConfig::new();
3705
3706        let loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3707        assert_eq!(
3708            loop_.effective_durability(),
3709            Durability::Sync,
3710            "default durability should be Sync"
3711        );
3712    }
3713
3714    /// Verify that `Durability::Exit` skips superstep checkpoints but saves
3715    /// a final checkpoint on clean completion.
3716    #[tokio::test]
3717    async fn test_durability_exit_skips_superstep_saves_final() {
3718        let state = TestState;
3719
3720        let mut nodes = IndexMap::new();
3721        nodes.insert(
3722            "test_node".to_string(),
3723            NodeFnCommand(
3724                |_s: &TestState| -> std::pin::Pin<
3725                    Box<
3726                        dyn std::future::Future<
3727                                Output = Result<crate::Command<TestState>, crate::JunctureError>,
3728                            > + Send,
3729                    >,
3730                > { Box::pin(async move { Ok(crate::Command::end()) }) },
3731            )
3732            .into_node("test_node"),
3733        );
3734
3735        let trigger_table = TriggerTable::new();
3736        let mut config = crate::config::RunnableConfig::new();
3737        config.thread_id = Some("test-thread".to_string());
3738        config.durability = Some(Durability::Exit);
3739
3740        let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3741
3742        let observed = Arc::new(std::sync::Mutex::new(Vec::new()));
3743        let checkpointer = TrackingCheckpointer {
3744            observed: Arc::clone(&observed),
3745        };
3746        loop_.set_checkpointer(Arc::new(checkpointer));
3747
3748        // Execute one superstep -- no superstep checkpoint should be saved
3749        // in Exit mode; only the final exit checkpoint (when pending_tasks
3750        // is empty) should be persisted.
3751        loop_.pending_tasks = vec![PendingTask::pull(
3752            uuid::Uuid::new_v4().to_string(),
3753            "test_node".to_string(),
3754        )];
3755        let _ = loop_.execute_superstep().await;
3756        let _ = loop_.after_tick(SuperstepResult::empty()).await;
3757
3758        // Exactly one checkpoint should be saved (the final exit checkpoint,
3759        // since compute_next_tasks returns empty for an end() command).
3760        let calls = observed
3761            .lock()
3762            .unwrap_or_else(std::sync::PoisonError::into_inner)
3763            .clone();
3764        assert_eq!(
3765            calls.len(),
3766            1,
3767            "Exit mode should save exactly one final checkpoint"
3768        );
3769        assert!(
3770            matches!(
3771                &calls[0],
3772                ObservedCall::Put {
3773                    source: crate::checkpoint::CheckpointSource::Loop,
3774                    step: 0
3775                }
3776            ),
3777            "Final exit checkpoint should have Loop source at step 0"
3778        );
3779    }
3780
3781    /// Verify that `Durability::Sync` saves a superstep checkpoint (default behavior).
3782    #[tokio::test]
3783    async fn test_durability_sync_saves_superstep_checkpoint() {
3784        let state = TestState;
3785
3786        let mut nodes = IndexMap::new();
3787        nodes.insert(
3788            "test_node".to_string(),
3789            NodeFnCommand(
3790                |_s: &TestState| -> std::pin::Pin<
3791                    Box<
3792                        dyn std::future::Future<
3793                                Output = Result<crate::Command<TestState>, crate::JunctureError>,
3794                            > + Send,
3795                    >,
3796                > { Box::pin(async move { Ok(crate::Command::end()) }) },
3797            )
3798            .into_node("test_node"),
3799        );
3800
3801        let trigger_table = TriggerTable::new();
3802        let mut config = crate::config::RunnableConfig::new();
3803        config.thread_id = Some("test-thread".to_string());
3804        config.durability = Some(Durability::Sync);
3805
3806        let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3807
3808        let observed = Arc::new(std::sync::Mutex::new(Vec::new()));
3809        let checkpointer = TrackingCheckpointer {
3810            observed: Arc::clone(&observed),
3811        };
3812        loop_.set_checkpointer(Arc::new(checkpointer));
3813
3814        // Execute one superstep -- a Loop checkpoint should be saved
3815        loop_.pending_tasks = vec![PendingTask::pull(
3816            uuid::Uuid::new_v4().to_string(),
3817            "test_node".to_string(),
3818        )];
3819        let _ = loop_.execute_superstep().await;
3820        let _ = loop_.after_tick(SuperstepResult::empty()).await;
3821
3822        let has_loop_checkpoint = {
3823            let calls = observed
3824                .lock()
3825                .unwrap_or_else(std::sync::PoisonError::into_inner);
3826            calls.iter().any(|c| {
3827                matches!(
3828                    c,
3829                    ObservedCall::Put {
3830                        source: crate::checkpoint::CheckpointSource::Loop,
3831                        step: 0,
3832                    }
3833                )
3834            })
3835        };
3836        assert!(
3837            has_loop_checkpoint,
3838            "Sync mode should save a Loop checkpoint at step 0"
3839        );
3840    }
3841
3842    /// Verify that `Durability::Exit` still saves interrupt checkpoints.
3843    #[tokio::test]
3844    async fn test_durability_exit_saves_interrupt_checkpoint() {
3845        let state = TestState;
3846
3847        let mut nodes = IndexMap::new();
3848        nodes.insert(
3849            "test_node".to_string(),
3850            NodeFnCommand(
3851                |_s: &TestState| -> std::pin::Pin<
3852                    Box<
3853                        dyn std::future::Future<
3854                                Output = Result<crate::Command<TestState>, crate::JunctureError>,
3855                            > + Send,
3856                    >,
3857                > { Box::pin(async move { Ok(crate::Command::end()) }) },
3858            )
3859            .into_node("test_node"),
3860        );
3861
3862        let trigger_table = TriggerTable::new();
3863        let mut config = crate::config::RunnableConfig::new();
3864        config.thread_id = Some("test-thread".to_string());
3865        config.durability = Some(Durability::Exit);
3866
3867        let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3868
3869        let observed = Arc::new(std::sync::Mutex::new(Vec::new()));
3870        let checkpointer = TrackingCheckpointer {
3871            observed: Arc::clone(&observed),
3872        };
3873        loop_.set_checkpointer(Arc::new(checkpointer));
3874
3875        // Simulate an interrupt scenario
3876        loop_.pending_interrupts = vec![crate::interrupt::InterruptSignal {
3877            index: 0,
3878            id: Some("int-exit-test".to_string()),
3879            payload: serde_json::json!({"node": "test_node"}),
3880            timestamp: Utc::now(),
3881        }];
3882        loop_.save_interrupt_checkpoint("test_node").await;
3883
3884        let has_interrupt_checkpoint = {
3885            let calls = observed
3886                .lock()
3887                .unwrap_or_else(std::sync::PoisonError::into_inner);
3888            calls.iter().any(|c| {
3889                matches!(
3890                    c,
3891                    ObservedCall::Put {
3892                        source: crate::checkpoint::CheckpointSource::Interrupt { .. },
3893                        step: 0,
3894                    }
3895                )
3896            })
3897        };
3898        assert!(
3899            has_interrupt_checkpoint,
3900            "Exit mode should still save interrupt checkpoints"
3901        );
3902    }
3903
3904    // --- B-08-001: Budget tracker Arc sharing tests ---
3905
3906    /// Verify that `BudgetTracker` is shared between `PregelLoop` and `RunnableConfig`
3907    /// via Arc, so tokens reported through `config.budget_tracker()` are visible
3908    /// to the loop's budget check method.
3909    #[tokio::test]
3910    async fn test_budget_tracker_arc_sharing() {
3911        let state = TestState;
3912        let mut nodes = IndexMap::new();
3913        nodes.insert(
3914            "test_node".to_string(),
3915            NodeFnCommand(
3916                |_s: &TestState| -> std::pin::Pin<
3917                    Box<
3918                        dyn std::future::Future<
3919                                Output = Result<crate::Command<TestState>, crate::JunctureError>,
3920                            > + Send,
3921                    >,
3922                > { Box::pin(async move { Ok(crate::Command::end()) }) },
3923            )
3924            .into_node("test_node"),
3925        );
3926
3927        let trigger_table = TriggerTable::new();
3928        let budget = crate::pregel::budget::BudgetConfig::new().with_max_tokens(100);
3929        let config = crate::config::RunnableConfig::new().with_budget(budget);
3930
3931        let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
3932
3933        // Set up the shared budget tracker (normally done in compiled.rs)
3934        let tracker_config = loop_.runnable_config.budget.clone().unwrap();
3935        loop_.set_budget_tracker(BudgetTracker::new(tracker_config));
3936
3937        // Initially, no tokens reported, budget not exceeded
3938        assert!(loop_.budget_tracker.as_ref().unwrap().check().is_none());
3939
3940        // Report tokens via the RunnableConfig's budget_tracker (the node's view)
3941        if let Some(ref tracker) = loop_.runnable_config.budget_tracker {
3942            tracker.report_model_call(30, 20); // 50 total tokens
3943        }
3944
3945        // The loop's budget tracker should reflect the same usage (Arc sharing)
3946        let usage = loop_.budget_tracker.as_ref().unwrap().current_usage();
3947        assert_eq!(usage.tokens_used, 50);
3948
3949        // Budget not exceeded yet
3950        assert!(loop_.budget_tracker.as_ref().unwrap().check().is_none());
3951
3952        // Report more tokens to exceed the limit via the same shared tracker
3953        if let Some(ref tracker) = loop_.runnable_config.budget_tracker {
3954            tracker.report_model_call(40, 30); // 70 more, total 120 > 100
3955        }
3956
3957        // Budget should now be exceeded
3958        assert!(loop_.budget_tracker.as_ref().unwrap().check().is_some());
3959        assert_eq!(
3960            loop_
3961                .budget_tracker
3962                .as_ref()
3963                .unwrap()
3964                .current_usage()
3965                .tokens_used,
3966            120
3967        );
3968
3969        // tick() should detect the exceeded budget and return an error
3970        let _ = loop_.tick().unwrap_err();
3971        assert!(loop_.status.is_terminal());
3972    }
3973
3974    /// Verify that multiple token reports via the `RunnableConfig` path
3975    /// accumulate correctly and pass through budget checks when a
3976    /// cost limit is configured.
3977    #[tokio::test]
3978    async fn test_budget_tracker_cost_via_config() {
3979        let state = TestState;
3980        let mut nodes = IndexMap::new();
3981        nodes.insert(
3982            "test_node".to_string(),
3983            NodeFnCommand(
3984                |_s: &TestState| -> std::pin::Pin<
3985                    Box<
3986                        dyn std::future::Future<
3987                                Output = Result<crate::Command<TestState>, crate::JunctureError>,
3988                            > + Send,
3989                    >,
3990                > { Box::pin(async move { Ok(crate::Command::end()) }) },
3991            )
3992            .into_node("test_node"),
3993        );
3994
3995        let trigger_table = TriggerTable::new();
3996        let budget = crate::pregel::budget::BudgetConfig::new().with_max_cost_usd(0.01);
3997        let config = crate::config::RunnableConfig::new().with_budget(budget);
3998
3999        let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
4000
4001        let tracker_config = loop_.runnable_config.budget.clone().unwrap();
4002        loop_.set_budget_tracker(BudgetTracker::new(tracker_config));
4003
4004        // Report costs via the RunnableConfig (simulating multiple LLM calls)
4005        if let Some(ref tracker) = loop_.runnable_config.budget_tracker {
4006            tracker.report_cost(0.003);
4007            tracker.report_cost(0.004);
4008        }
4009
4010        // Combined cost is below limit
4011        let usage = loop_.budget_tracker.as_ref().unwrap().current_usage();
4012        assert!((usage.cost_usd - 0.007).abs() < 0.0001);
4013        assert!(loop_.budget_tracker.as_ref().unwrap().check().is_none());
4014
4015        // Third call pushes cost over the limit
4016        if let Some(ref tracker) = loop_.runnable_config.budget_tracker {
4017            tracker.report_cost(0.004); // total now 0.011 > 0.01
4018        }
4019
4020        assert!(loop_.budget_tracker.as_ref().unwrap().check().is_some());
4021
4022        // tick() should detect the exceeded budget
4023        let _ = loop_.tick().unwrap_err();
4024        assert!(loop_.status.is_terminal());
4025    }
4026
4027    /// Verify that `Durability::Async` does not block on checkpoint persistence.
4028    #[tokio::test]
4029    async fn test_durability_async_does_not_block() {
4030        let state = TestState;
4031
4032        let mut nodes = IndexMap::new();
4033        nodes.insert(
4034            "test_node".to_string(),
4035            NodeFnCommand(
4036                |_s: &TestState| -> std::pin::Pin<
4037                    Box<
4038                        dyn std::future::Future<
4039                                Output = Result<crate::Command<TestState>, crate::JunctureError>,
4040                            > + Send,
4041                    >,
4042                > { Box::pin(async move { Ok(crate::Command::end()) }) },
4043            )
4044            .into_node("test_node"),
4045        );
4046
4047        let trigger_table = TriggerTable::new();
4048        let mut config = crate::config::RunnableConfig::new();
4049        config.thread_id = Some("test-thread".to_string());
4050        config.durability = Some(Durability::Async);
4051
4052        let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
4053
4054        let observed = Arc::new(std::sync::Mutex::new(Vec::new()));
4055        let checkpointer = TrackingCheckpointer {
4056            observed: Arc::clone(&observed),
4057        };
4058        loop_.set_checkpointer(Arc::new(checkpointer));
4059
4060        // Execute one superstep
4061        loop_.pending_tasks = vec![PendingTask::pull(
4062            uuid::Uuid::new_v4().to_string(),
4063            "test_node".to_string(),
4064        )];
4065        let _ = loop_.execute_superstep().await;
4066        let _ = loop_.after_tick(SuperstepResult::empty()).await;
4067
4068        // In Async mode, the put() is spawned as a background task. Give it
4069        // a brief moment to execute before checking.
4070        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
4071
4072        // The checkpoint should eventually be persisted by the spawned task.
4073        let has_checkpoint = {
4074            let calls = observed
4075                .lock()
4076                .unwrap_or_else(std::sync::PoisonError::into_inner);
4077            calls.iter().any(|c| {
4078                matches!(
4079                    c,
4080                    ObservedCall::Put {
4081                        source: crate::checkpoint::CheckpointSource::Loop,
4082                        step: 0,
4083                    }
4084                )
4085            })
4086        };
4087        assert!(
4088            has_checkpoint,
4089            "Async mode should eventually persist the checkpoint via spawned task"
4090        );
4091    }
4092
4093    // --- B-05-002: Command stream_data tests ---
4094
4095    /// Verify that a task output with `stream_data` produces `StreamEvent::Custom`
4096    /// events during `after_tick`.
4097    #[tokio::test]
4098    async fn test_stream_data_emits_custom_events() {
4099        let state = TestState;
4100        let nodes = IndexMap::new();
4101        let trigger_table = TriggerTable::new();
4102        let config = crate::config::RunnableConfig::new();
4103
4104        let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
4105
4106        let (tx, mut rx) = mpsc::unbounded_channel();
4107        loop_.stream_tx = Some(tx);
4108
4109        // Build a SuperstepResult with a task output that has stream_data
4110        let result = SuperstepResult {
4111            task_outputs: vec![TaskOutput {
4112                triggered_fields: vec![],
4113                task_id: "task-1".to_string(),
4114                node_name: "test_node".to_string(),
4115                command: Command::end()
4116                    .with_stream_data(serde_json::json!({"event": "first"}))
4117                    .with_stream_data(serde_json::json!({"event": "second"})),
4118                duration: std::time::Duration::from_millis(1),
4119                trigger: TaskTrigger::Pull,
4120                error: None,
4121            }],
4122            bubble_ups: Vec::new(),
4123        };
4124
4125        let () = loop_.after_tick(result).await.unwrap();
4126
4127        // Collect Custom events from the stream
4128        let mut custom_data = Vec::new();
4129        while let Ok(event) = rx.try_recv() {
4130            if let StreamEvent::Custom { node, data, ns } = event {
4131                assert_eq!(node, "test_node");
4132                assert!(ns.is_empty());
4133                custom_data.push(data);
4134            }
4135        }
4136
4137        assert_eq!(custom_data.len(), 2, "should emit two custom events");
4138        assert_eq!(custom_data[0], serde_json::json!({"event": "first"}));
4139        assert_eq!(custom_data[1], serde_json::json!({"event": "second"}));
4140    }
4141
4142    /// Verify that a task output without `stream_data` produces no Custom events.
4143    #[tokio::test]
4144    async fn test_stream_data_empty_produces_no_custom_events() {
4145        let state = TestState;
4146        let nodes = IndexMap::new();
4147        let trigger_table = TriggerTable::new();
4148        let config = crate::config::RunnableConfig::new();
4149
4150        let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
4151
4152        let (tx, mut rx) = mpsc::unbounded_channel();
4153        loop_.stream_tx = Some(tx);
4154
4155        // Build a SuperstepResult with a task output that has NO stream_data
4156        let result = SuperstepResult {
4157            task_outputs: vec![TaskOutput {
4158                triggered_fields: vec![],
4159                task_id: "task-1".to_string(),
4160                node_name: "test_node".to_string(),
4161                command: Command::end(),
4162                duration: std::time::Duration::from_millis(1),
4163                trigger: TaskTrigger::Pull,
4164                error: None,
4165            }],
4166            bubble_ups: Vec::new(),
4167        };
4168
4169        let () = loop_.after_tick(result).await.unwrap();
4170
4171        // No Custom events should be emitted
4172        while let Ok(event) = rx.try_recv() {
4173            assert!(
4174                !matches!(event, StreamEvent::Custom { .. }),
4175                "no Custom events expected for empty stream_data"
4176            );
4177        }
4178    }
4179
4180    /// Verify that `stream_data` from multiple task outputs are all emitted.
4181    #[tokio::test]
4182    async fn test_stream_data_multiple_tasks() {
4183        let state = TestState;
4184        let nodes = IndexMap::new();
4185        let trigger_table = TriggerTable::new();
4186        let config = crate::config::RunnableConfig::new();
4187
4188        let mut loop_ = PregelLoop::new(state, nodes, trigger_table, config, 0).unwrap();
4189
4190        let (tx, mut rx) = mpsc::unbounded_channel();
4191        loop_.stream_tx = Some(tx);
4192
4193        // Build a SuperstepResult with two task outputs, one with stream_data
4194        let result = SuperstepResult {
4195            task_outputs: vec![
4196                TaskOutput {
4197                    triggered_fields: vec![],
4198                    task_id: "task-1".to_string(),
4199                    node_name: "node_a".to_string(),
4200                    command: Command::end().with_stream_data(serde_json::json!("from_a")),
4201                    duration: std::time::Duration::from_millis(1),
4202                    trigger: TaskTrigger::Pull,
4203                    error: None,
4204                },
4205                TaskOutput {
4206                    triggered_fields: vec![],
4207                    task_id: "task-2".to_string(),
4208                    node_name: "node_b".to_string(),
4209                    command: Command::end(),
4210                    duration: std::time::Duration::from_millis(2),
4211                    trigger: TaskTrigger::Pull,
4212                    error: None,
4213                },
4214            ],
4215            bubble_ups: Vec::new(),
4216        };
4217
4218        let () = loop_.after_tick(result).await.unwrap();
4219
4220        // Collect Custom events from the stream
4221        let mut custom_events = Vec::new();
4222        while let Ok(event) = rx.try_recv() {
4223            if let StreamEvent::Custom { node, data, .. } = event {
4224                custom_events.push((node, data));
4225            }
4226        }
4227
4228        assert_eq!(
4229            custom_events.len(),
4230            1,
4231            "only node_a should emit a custom event"
4232        );
4233        assert_eq!(custom_events[0].0, "node_a");
4234        assert_eq!(custom_events[0].1, serde_json::json!("from_a"));
4235    }
4236}
4237
4238// Rust guideline compliant 2026-05-22