Skip to main content

juncture_core/
checkpoint.rs

//! Checkpoint persistence types and traits
//!
//! Defines the checkpoint saver trait and all checkpoint-related types.
//!
//! Storage implementations (`MemorySaver`, `SqliteSaver`, etc.) are provided
//! by the `juncture-checkpoint` crate, which implements this trait.
7
8use std::collections::HashMap;
9
10use async_trait::async_trait;
11use serde::{Deserialize, Serialize};
12
13use crate::config::RunnableConfig;
14
/// Separator used between namespace segments in checkpoint namespace strings.
///
/// The pipe character `|` is used instead of colon `:` because `:` already
/// joins the node name and invocation id *inside* each segment
/// (`node_name:invocation_id`). Reusing it between segments would make the
/// string impossible to split unambiguously. (UUID strings themselves use `-`,
/// not `:`, so they do not constrain this choice.)
/// See design doc 04-checkpoint.md, Implementation Note C-04-5.
///
/// NOTE: `CheckpointNamespace::as_str` and `CheckpointNamespace::parse`
/// hard-code the `'|'` character rather than reading this constant; keep the
/// two in sync if the separator ever changes.
pub const CHECKPOINT_NS_SEPARATOR: &str = "|";
21
/// Checkpoint operation errors
///
/// Represents all possible errors that can occur during checkpoint operations.
/// This type is defined in `juncture-core` for use in the `CheckpointSaver` trait.
/// The juncture-checkpoint crate provides a compatible implementation with
/// additional storage-specific errors.
///
/// The boxed variants (`Serialize`, `Deserialize`, `Storage`) expose the
/// wrapped error through [`std::error::Error::source`].
///
/// NOTE(review): those same variants also interpolate the wrapped error into
/// their `Display` text (`{0}`). Reporters that print the whole source chain
/// will therefore show the inner message twice. Dropping `{0}` would fix that,
/// but it changes user-visible messages, so treat it as a deliberate change.
#[derive(Debug, thiserror::Error)]
pub enum CheckpointError {
    /// Serialization failed
    ///
    /// Wraps the underlying encoder error.
    #[error("Serialization failed: {0}")]
    Serialize(#[source] Box<dyn std::error::Error + Send + Sync>),

    /// Deserialization failed
    ///
    /// Wraps the underlying decoder error.
    #[error("Deserialization failed: {0}")]
    Deserialize(#[source] Box<dyn std::error::Error + Send + Sync>),

    /// Checkpoint not found
    ///
    /// Carries the thread and checkpoint identifiers that were looked up.
    #[error("Checkpoint not found: thread={thread_id}, id={checkpoint_id}")]
    NotFound {
        /// Thread identifier
        thread_id: String,
        /// Checkpoint identifier
        checkpoint_id: String,
    },

    /// Storage operation error
    ///
    /// Wraps an error raised by the storage backend.
    #[error("Storage error: {0}")]
    Storage(#[source] Box<dyn std::error::Error + Send + Sync>),

    /// Other checkpoint errors
    ///
    /// Free-form message for failures not covered by the variants above.
    #[error("Checkpoint error: {0}")]
    Other(String),
}
55
56impl From<serde_json::Error> for CheckpointError {
57    fn from(err: serde_json::Error) -> Self {
58        Self::Serialize(Box::new(err))
59    }
60}
61
/// Single namespace segment with node name and invocation UUID
///
/// Represents one level in a hierarchical checkpoint namespace,
/// combining a node name with a unique invocation identifier.
///
/// The textual form is `node_name:invocation_id`. Neither part is validated
/// or escaped. A `node_name` containing `:`, or either part containing `|`,
/// will therefore not survive a round trip through
/// `CheckpointNamespace::parse`.
///
/// # Examples
///
/// ```ignore
/// use juncture_core::checkpoint::NamespaceSegment;
///
/// let segment = NamespaceSegment::new("review".to_string(), "uuid-1234".to_string());
/// assert_eq!(segment.as_str(), "review:uuid-1234");
/// ```
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct NamespaceSegment {
    /// Node name for this segment
    pub node_name: String,

    /// Unique invocation identifier (typically a UUID v4; not validated)
    pub invocation_id: String,
}

impl NamespaceSegment {
    /// Create a new namespace segment
    ///
    /// # Arguments
    ///
    /// * `node_name` - The node name
    /// * `invocation_id` - The unique invocation ID
    #[must_use]
    pub const fn new(node_name: String, invocation_id: String) -> Self {
        Self {
            node_name,
            invocation_id,
        }
    }

    /// Get the segment as a string
    ///
    /// Returns the segment in the format `node_name:invocation_id`.
    /// Despite the `as_` prefix, this allocates a new `String` on every call.
    #[must_use]
    pub fn as_str(&self) -> String {
        // Delegate to `Display` so the textual format is defined in one place.
        self.to_string()
    }
}

impl std::fmt::Display for NamespaceSegment {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Write the parts straight into the formatter; routing through
        // `as_str` would build a throwaway `String` first.
        write!(f, "{}:{}", self.node_name, self.invocation_id)
    }
}
113
114/// Namespace for checkpoint isolation in subgraph execution
115///
116/// Provides hierarchical namespace isolation to prevent checkpoint
117/// collisions when executing nested subgraphs.
118///
119/// The wire format uses a leading `|` per segment with `node_name:invocation_id`
120/// pairs, e.g. `"|review:uuid1|detail:uuid2"`. The root namespace is `""`.
121///
122/// # Examples
123///
124/// ```ignore
125/// use juncture_core::checkpoint::CheckpointNamespace;
126///
127/// let root_ns = CheckpointNamespace::root();
128/// let child_ns = root_ns.child("review", "550e8400-e29b-41d4-a716-446655440000");
129/// let grandchild_ns = child_ns.child("detail", "6ba7b810-9dad-11d1-80b4-00c04fd430c8");
130///
131/// assert_eq!(root_ns.as_str(), "");
132/// assert_eq!(child_ns.as_str(), "|review:550e8400-e29b-41d4-a716-446655440000");
133/// assert_eq!(grandchild_ns.as_str(), "|review:550e8400-e29b-41d4-a716-446655440000|detail:6ba7b810-9dad-11d1-80b4-00c04fd430c8");
134/// ```
135#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
136pub struct CheckpointNamespace {
137    /// Namespace segments forming a hierarchical path
138    pub segments: Vec<NamespaceSegment>,
139}
140
141impl CheckpointNamespace {
142    /// Create a new root namespace (empty path)
143    #[must_use]
144    pub const fn root() -> Self {
145        Self {
146            segments: Vec::new(),
147        }
148    }
149
150    /// Create a namespace from segments
151    #[must_use]
152    pub const fn new(segments: Vec<NamespaceSegment>) -> Self {
153        Self { segments }
154    }
155
156    /// Create a child namespace by appending a new segment
157    ///
158    /// # Arguments
159    ///
160    /// * `node_name` - The node name for this nesting level
161    /// * `invocation_id` - The unique invocation identifier (typically UUID v4)
162    #[must_use]
163    pub fn child(&self, node_name: &str, invocation_id: &str) -> Self {
164        let mut segments = self.segments.clone();
165        segments.push(NamespaceSegment {
166            node_name: node_name.to_string(),
167            invocation_id: invocation_id.to_string(),
168        });
169        Self { segments }
170    }
171
172    /// Get the parent namespace by removing the last segment
173    ///
174    /// Returns `None` if this is already the root namespace.
175    #[must_use]
176    pub fn parent(&self) -> Option<Self> {
177        if self.segments.is_empty() {
178            None
179        } else {
180            let segments = self.segments[..self.segments.len() - 1].to_vec();
181            Some(Self { segments })
182        }
183    }
184
185    /// Check if this is a root namespace
186    #[must_use]
187    pub const fn is_root(&self) -> bool {
188        self.segments.is_empty()
189    }
190
191    /// Convert to string representation using the design-spec wire format
192    ///
193    /// Each segment is prefixed with `|` and formatted as `|node_name:invocation_id`.
194    /// Root produces `""`.
195    #[must_use]
196    pub fn as_str(&self) -> String {
197        self.segments.iter().fold(String::new(), |mut acc, s| {
198            acc.push('|');
199            acc.push_str(&s.node_name);
200            acc.push(':');
201            acc.push_str(&s.invocation_id);
202            acc
203        })
204    }
205
206    /// Convert to string representation (alias for `as_str`)
207    #[allow(
208        clippy::should_implement_trait,
209        clippy::inherent_to_string_shadow_display,
210        reason = "required by design spec 04-027"
211    )]
212    #[must_use]
213    pub fn to_string(&self) -> String {
214        self.as_str()
215    }
216
217    /// Parse from the design-spec wire format `|name:id|name:id`
218    ///
219    /// Empty string produces root. Each segment is split on the first `:`
220    /// to extract `node_name` and `invocation_id`.
221    #[must_use]
222    pub fn parse(s: &str) -> Self {
223        if s.is_empty() {
224            return Self::root();
225        }
226        let trimmed = s.trim_start_matches('|');
227        let segments = trimmed
228            .split('|')
229            .filter_map(|seg| {
230                let (node_name, invocation_id) = seg.split_once(':')?;
231                Some(NamespaceSegment {
232                    node_name: node_name.to_string(),
233                    invocation_id: invocation_id.to_string(),
234                })
235            })
236            .collect();
237        Self { segments }
238    }
239}
240
241impl std::fmt::Display for CheckpointNamespace {
242    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
243        write!(f, "{}", self.as_str())
244    }
245}
246
247impl From<Vec<NamespaceSegment>> for CheckpointNamespace {
248    fn from(segments: Vec<NamespaceSegment>) -> Self {
249        Self::new(segments)
250    }
251}
252
253impl From<&str> for CheckpointNamespace {
254    fn from(s: &str) -> Self {
255        Self::parse(s)
256    }
257}
258
/// Checkpoint persistence interface
///
/// Trait defining operations for saving, loading, and listing checkpoints.
/// This trait is implemented by storage backends in the `juncture-checkpoint` crate.
///
/// The `Send + Sync + 'static` supertraits let a saver be shared between
/// tasks and held as a trait object. `#[async_trait]` keeps the async methods
/// usable through `dyn CheckpointSaver` by boxing their futures.
///
/// # Example
///
/// ```ignore
/// use juncture_core::CheckpointSaver;
/// use juncture_checkpoint::MemorySaver;
///
/// let saver = MemorySaver::new();
/// // Use saver as a CheckpointSaver trait object...
/// ```
#[async_trait]
pub trait CheckpointSaver: Send + Sync + 'static {
    /// Get checkpoint tuple by configuration
    ///
    /// `config` selects the checkpoint to load. An `Ok(None)` result means no
    /// stored checkpoint matched. NOTE(review): which checkpoint is returned
    /// when `config` names no specific checkpoint (presumably the latest one)
    /// is left to each implementation — confirm.
    ///
    /// # Errors
    ///
    /// Returns [`CheckpointError`] if retrieval fails.
    async fn get_tuple(
        &self,
        config: &RunnableConfig,
    ) -> Result<Option<CheckpointTuple>, CheckpointError>;

    /// List checkpoints with optional filtering
    ///
    /// `config` scopes the listing. `filter` narrows the result further;
    /// `None` applies no extra criteria.
    ///
    /// # Errors
    ///
    /// Returns [`CheckpointError`] if listing fails.
    async fn list(
        &self,
        config: &RunnableConfig,
        filter: Option<CheckpointFilter>,
    ) -> Result<Vec<CheckpointTuple>, CheckpointError>;

    /// Save a checkpoint
    ///
    /// Persists `checkpoint` together with its `metadata` at the location
    /// described by `config`. NOTE(review): the returned config presumably
    /// identifies the newly stored checkpoint (e.g. carries its
    /// `checkpoint_id`) — confirm against implementations.
    ///
    /// # Errors
    ///
    /// Returns [`CheckpointError`] if saving fails.
    async fn put(
        &self,
        config: &RunnableConfig,
        checkpoint: Checkpoint,
        metadata: CheckpointMetadata,
    ) -> Result<RunnableConfig, CheckpointError>;

    /// Save incremental writes from a completed task
    ///
    /// `task_id` identifies the task that produced `writes`. These are
    /// presumably the writes later returned in
    /// [`CheckpointTuple::pending_writes`].
    ///
    /// # Errors
    ///
    /// Returns [`CheckpointError`] if saving fails.
    async fn put_writes(
        &self,
        config: &RunnableConfig,
        writes: Vec<PendingWrite>,
        task_id: &str,
    ) -> Result<(), CheckpointError>;
}
320
/// Complete checkpoint state
///
/// Captures the entire state of a graph execution at a specific point in time,
/// including channel values, versions, pending tasks, and metadata.
///
/// Serialization uses the derived serde impls, which emit fields in
/// declaration order. Reordering fields changes the encoded layout, and
/// positional encodings would stop reading previously stored checkpoints.
/// Only `pending_interrupts` may be absent when deserializing
/// (`#[serde(default)]`); every other field is required.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Checkpoint {
    /// Unique checkpoint identifier (UUID v6, time-ordered),
    /// e.g. as produced by [`generate_checkpoint_id`]
    pub id: String,

    /// Serialized channel values (JSON or `MessagePack`)
    ///
    /// Held in memory as a `serde_json::Value`. NOTE(review): any
    /// `MessagePack` encoding presumably happens in the storage backend —
    /// confirm.
    pub channel_values: serde_json::Value,

    /// Version number for each channel
    ///
    /// Keys are channel names, values are monotonically increasing version numbers.
    pub channel_versions: HashMap<String, u64>,

    /// Versions of channels each node has consumed
    ///
    /// Outer key is node name, inner key is channel name, value is version consumed.
    pub versions_seen: HashMap<String, HashMap<String, u64>>,

    /// Tasks pending execution in the next superstep
    pub pending_tasks: Vec<CheckpointPendingTask>,

    /// Pending Send operations awaiting delivery
    pub pending_sends: Vec<SerializedSend>,

    /// Interrupt signals captured when execution was interrupted
    ///
    /// Populated when checkpoint source is `CheckpointSource::Interrupt`.
    /// Used for ID-based resume to match incoming resume values.
    /// Deserializes as an empty list when absent from the input.
    #[serde(default)]
    pub pending_interrupts: Vec<crate::interrupt::InterruptSignal>,

    /// State schema version for migration support
    pub schema_version: u32,

    /// ISO 8601 timestamp of checkpoint creation (stored as a plain string)
    pub created_at: String,

    /// Checkpoint format version
    ///
    /// Used for forward compatibility when Checkpoint structure changes.
    pub v: u32,

    /// Channels updated in this checkpoint
    ///
    /// Keys are channel names, values are the new version numbers.
    pub new_versions: HashMap<String, u64>,

    /// Delta counters since last full snapshot
    ///
    /// Keys are channel names, values track changes since last complete snapshot.
    pub counters_since_delta_snapshot: HashMap<String, DeltaCounters>,
}
377
/// Delta tracking counters for a channel
///
/// Tracks incremental changes since the last complete snapshot,
/// enabling efficient `DeltaChannel` optimization.
///
/// Both counters start at zero, whether built with `Default` or with
/// `DeltaCounters::new`. Field order is part of the serialized layout.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct DeltaCounters {
    /// Number of updates since last snapshot
    pub updates: u64,

    /// Number of supersteps since last snapshot
    pub supersteps: u64,
}
390
391impl DeltaCounters {
392    /// Create a new delta counter with zero values.
393    #[must_use]
394    pub const fn new() -> Self {
395        Self {
396            updates: 0,
397            supersteps: 0,
398        }
399    }
400
401    /// Check if this channel's update count exceeds the given snapshot frequency.
402    ///
403    /// Returns `true` when a full snapshot should be taken instead of a delta.
404    /// A frequency of zero is treated as "always snapshot" (snapshot on every write).
405    #[must_use]
406    pub fn exceeds_frequency(&self, snapshot_frequency: usize) -> bool {
407        if snapshot_frequency == 0 {
408            return true;
409        }
410        usize::try_from(self.updates).unwrap_or(usize::MAX) >= snapshot_frequency
411    }
412}
413
/// Checkpoint metadata
///
/// Provides context about how and when a checkpoint was created.
/// Field order is part of the serialized layout.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CheckpointMetadata {
    /// Source of the checkpoint creation
    pub source: CheckpointSource,

    /// Superstep sequence number (signed; NOTE(review): confirm whether
    /// negative values such as an input step are used)
    pub step: i64,

    /// Summary of writes from each node in this superstep
    ///
    /// Presumably keyed by node name, with the node's writes as a JSON value.
    pub writes: HashMap<String, serde_json::Value>,

    /// Parent checkpoint relationships
    ///
    /// Maps namespace to parent `checkpoint_id`.
    pub parents: HashMap<String, String>,

    /// Unique identifier for this execution run
    pub run_id: String,
}
436
/// Source of checkpoint creation
///
/// Indicates what triggered the checkpoint to be created.
///
/// Uses serde's default externally tagged representation. Unit variants
/// serialize as their name (e.g. `"Loop"`), and `Interrupt` serializes as
/// `{"Interrupt":{"node":...}}`. Renaming a variant therefore changes the
/// stored format. The enum is `#[non_exhaustive]`, so matches outside this
/// crate need a wildcard arm.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum CheckpointSource {
    /// Initial state when graph execution begins
    Input,

    /// End of each superstep loop iteration
    Loop,

    /// External state update via `update_state()`
    Update,

    /// Fork from a historical checkpoint
    Fork,

    /// Interrupt triggered by human-in-the-loop interaction
    Interrupt {
        /// Name of the node associated with the interrupt
        /// (presumably the node that raised it)
        node: String,
    },
}
458
/// Complete checkpoint tuple with all context
///
/// Combines checkpoint data with its metadata, configuration, and pending writes.
/// This is the primary structure returned from checkpoint storage for recovery.
/// Not serializable itself; storage backends persist its parts.
#[derive(Clone, Debug)]
pub struct CheckpointTuple {
    /// Configuration containing `thread_id`, `checkpoint_id`, and `checkpoint_ns`
    pub config: RunnableConfig,

    /// The checkpoint itself
    pub checkpoint: Checkpoint,

    /// Checkpoint metadata
    pub metadata: CheckpointMetadata,

    /// Incremental writes since this checkpoint
    ///
    /// Used for crash recovery: these writes completed after the checkpoint
    /// and before the next checkpoint, so they don't need to be re-executed.
    pub pending_writes: Vec<PendingWrite>,

    /// Parent checkpoint configuration
    ///
    /// Used for time-travel navigation. `None` presumably marks a checkpoint
    /// with no parent (e.g. the first one in a thread).
    pub parent_config: Option<RunnableConfig>,
}
485
/// Pending write from a completed task
///
/// Represents a channel write that completed after checkpoint creation
/// but before the next checkpoint.
/// Field order is part of the serialized layout.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PendingWrite {
    /// ID of the task that produced this write
    pub task_id: String,

    /// Target channel name
    pub channel: String,

    /// Serialized value to write, held as a JSON value
    pub value: serde_json::Value,
}
501
/// Pending task in checkpoint
///
/// Represents a task scheduled for execution in the next superstep.
/// Field order is part of the serialized layout.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CheckpointPendingTask {
    /// Unique task identifier (UUID)
    pub id: String,

    /// Target node name to execute
    pub node: String,

    /// Channels that triggered this task
    pub triggers: Vec<String>,

    /// Optional state override (used in Send API scenarios)
    ///
    /// `None` presumably means the task runs against the regular graph state.
    pub state_override: Option<serde_json::Value>,
}
519
/// Serialized Send operation
///
/// Represents a Send object flowing through the `__pregel_tasks` channel.
/// Field order is part of the serialized layout.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SerializedSend {
    /// Destination node name
    pub node: String,

    /// Serialized state override delivered to `node`, held as a JSON value
    pub state: serde_json::Value,
}
531
/// Delta operation type
///
/// Defines how to apply delta values to a channel.
/// With serde's default representation, variants serialize as their names
/// (`"Append"` / `"Replace"`), so renaming a variant changes the stored format.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum DeltaOp {
    /// Append values to existing channel data
    Append,

    /// Replace entire channel data
    Replace,
}
543
/// Checkpoint listing filter
///
/// Used to query checkpoint history with specific criteria.
///
/// Every criterion is optional; `CheckpointFilter::default()` leaves all of
/// them unset (`None`). NOTE(review): how multiple criteria combine
/// (presumably all must match) is decided by each `CheckpointSaver`
/// implementation.
#[derive(Clone, Debug, Default)]
pub struct CheckpointFilter {
    /// Filter by checkpoint source
    pub source: Option<CheckpointSource>,

    /// Minimum step number (inclusive)
    pub step_gte: Option<i64>,

    /// Maximum step number (inclusive)
    pub step_lte: Option<i64>,

    /// Only checkpoints before this `checkpoint_id`
    pub before: Option<String>,

    /// Only checkpoints after this `checkpoint_id`
    pub after: Option<String>,

    /// Maximum number of checkpoints to return
    pub limit: Option<usize>,
}
567
/// State snapshot at a specific checkpoint
///
/// Represents the deserialized, fully-hydrated execution state at a checkpoint.
/// Unlike [`Checkpoint`], which keeps channel values as raw JSON, `values`
/// here is the typed state `S`.
///
/// # Type Parameters
///
/// * `S` - State type implementing the [`crate::State`] trait
#[derive(Clone, Debug)]
pub struct StateSnapshot<S: crate::State> {
    /// The complete state values
    pub values: S,

    /// Next nodes to execute
    pub next: Vec<String>,

    /// Configuration with `checkpoint_id` for time-travel
    pub config: RunnableConfig,

    /// Checkpoint metadata
    pub metadata: CheckpointMetadata,

    /// ISO 8601 creation timestamp
    pub created_at: String,

    /// Parent checkpoint configuration
    pub parent_config: Option<RunnableConfig>,

    /// Task information for current superstep
    pub tasks: Vec<PregelTaskInfo>,
}
598
/// Pregel task information
///
/// Provides execution status for tasks in a superstep.
/// Field order is part of the serialized layout.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PregelTaskInfo {
    /// Task identifier
    pub id: String,

    /// Node name being executed
    pub node_name: String,

    /// Error message if the task failed (`None` otherwise)
    pub error: Option<String>,

    /// Interrupt values if task was interrupted
    /// (presumably empty when it was not)
    pub interrupts: Vec<serde_json::Value>,
}
616
617/// Generate a new time-ordered checkpoint ID using UUID v6.
618///
619/// UUID v6 reorders the timestamp bits from UUID v1 for lexicographic
620/// sortability, making checkpoint IDs suitable for range queries and
621/// time-ordered iteration without a separate timestamp column.
622///
623/// The node ID is derived from random bytes to ensure uniqueness across
624/// processes without requiring a persistent MAC address.
625///
626/// # Panics
627///
628/// Will not panic under normal circumstances. The uuid crate handles
629/// timestamp generation internally using a shared atomic context.
630#[must_use]
631pub fn generate_checkpoint_id() -> String {
632    // Random 6-byte node ID avoids the need for a persistent IEEE 802
633    // MAC address while still guaranteeing global uniqueness when
634    // combined with the timestamp and monotonic counter.
635    let node_id: [u8; 6] = rand::random();
636    uuid::Uuid::now_v6(&node_id).to_string()
637}
638
// Rust guideline compliant 2026-05-21