juncture_core/checkpoint.rs
1//! Checkpoint persistence types and traits
2//!
3//! Defines the checkpoint saver trait and all checkpoint-related types.
4//!
5//! Storage implementations (`MemorySaver`, `SqliteSaver`, etc.) are provided
6//! by the `juncture-checkpoint` crate, which implements this trait.
7
8use std::collections::HashMap;
9
10use async_trait::async_trait;
11use serde::{Deserialize, Serialize};
12
13use crate::config::RunnableConfig;
14
/// Separator written before every segment of a checkpoint namespace string.
///
/// A namespace is rendered as `|node_name:invocation_id|node_name:invocation_id...`.
/// Inside each segment the colon `:` already separates the node name from its
/// invocation id, so a different character is needed between segments; the
/// pipe `|` fills that role. Canonical (hyphenated) UUID strings contain
/// neither `|` nor `:`.
///
/// See design doc 04-checkpoint.md, Implementation Note C-04-5.
pub const CHECKPOINT_NS_SEPARATOR: &str = "|";
21
22/// Checkpoint operation errors
23///
24/// Represents all possible errors that can occur during checkpoint operations.
25/// This type is defined in `juncture-core` for use in the `CheckpointSaver` trait.
26/// The juncture-checkpoint crate provides a compatible implementation with
27/// additional storage-specific errors.
28#[derive(Debug, thiserror::Error)]
29pub enum CheckpointError {
30 /// Serialization failed
31 #[error("Serialization failed: {0}")]
32 Serialize(#[source] Box<dyn std::error::Error + Send + Sync>),
33
34 /// Deserialization failed
35 #[error("Deserialization failed: {0}")]
36 Deserialize(#[source] Box<dyn std::error::Error + Send + Sync>),
37
38 /// Checkpoint not found
39 #[error("Checkpoint not found: thread={thread_id}, id={checkpoint_id}")]
40 NotFound {
41 /// Thread identifier
42 thread_id: String,
43 /// Checkpoint identifier
44 checkpoint_id: String,
45 },
46
47 /// Storage operation error
48 #[error("Storage error: {0}")]
49 Storage(#[source] Box<dyn std::error::Error + Send + Sync>),
50
51 /// Other checkpoint errors
52 #[error("Checkpoint error: {0}")]
53 Other(String),
54}
55
56impl From<serde_json::Error> for CheckpointError {
57 fn from(err: serde_json::Error) -> Self {
58 Self::Serialize(Box::new(err))
59 }
60}
61
/// One level of a hierarchical checkpoint namespace.
///
/// Pairs the name of the node entered at this level with the identifier of
/// that particular invocation. Two invocations of the same node therefore
/// yield distinct segments.
///
/// # Examples
///
/// ```ignore
/// use juncture_core::checkpoint::NamespaceSegment;
///
/// let segment = NamespaceSegment::new("review".to_string(), "uuid-1234".to_string());
/// assert_eq!(segment.as_str(), "review:uuid-1234");
/// ```
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct NamespaceSegment {
    /// Name of the node this level corresponds to
    pub node_name: String,

    /// Identifier of this specific invocation (documented as a UUID v4)
    pub invocation_id: String,
}
83
84impl NamespaceSegment {
85 /// Create a new namespace segment
86 ///
87 /// # Arguments
88 ///
89 /// * `node_name` - The node name
90 /// * `invocation_id` - The unique invocation ID
91 #[must_use]
92 pub const fn new(node_name: String, invocation_id: String) -> Self {
93 Self {
94 node_name,
95 invocation_id,
96 }
97 }
98
99 /// Get the segment as a string
100 ///
101 /// Returns the segment in the format `node_name:invocation_id`.
102 #[must_use]
103 pub fn as_str(&self) -> String {
104 format!("{}:{}", self.node_name, self.invocation_id)
105 }
106}
107
108impl std::fmt::Display for NamespaceSegment {
109 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110 write!(f, "{}", self.as_str())
111 }
112}
113
114/// Namespace for checkpoint isolation in subgraph execution
115///
116/// Provides hierarchical namespace isolation to prevent checkpoint
117/// collisions when executing nested subgraphs.
118///
119/// The wire format uses a leading `|` per segment with `node_name:invocation_id`
120/// pairs, e.g. `"|review:uuid1|detail:uuid2"`. The root namespace is `""`.
121///
122/// # Examples
123///
124/// ```ignore
125/// use juncture_core::checkpoint::CheckpointNamespace;
126///
127/// let root_ns = CheckpointNamespace::root();
128/// let child_ns = root_ns.child("review", "550e8400-e29b-41d4-a716-446655440000");
129/// let grandchild_ns = child_ns.child("detail", "6ba7b810-9dad-11d1-80b4-00c04fd430c8");
130///
131/// assert_eq!(root_ns.as_str(), "");
132/// assert_eq!(child_ns.as_str(), "|review:550e8400-e29b-41d4-a716-446655440000");
133/// assert_eq!(grandchild_ns.as_str(), "|review:550e8400-e29b-41d4-a716-446655440000|detail:6ba7b810-9dad-11d1-80b4-00c04fd430c8");
134/// ```
135#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
136pub struct CheckpointNamespace {
137 /// Namespace segments forming a hierarchical path
138 pub segments: Vec<NamespaceSegment>,
139}
140
141impl CheckpointNamespace {
142 /// Create a new root namespace (empty path)
143 #[must_use]
144 pub const fn root() -> Self {
145 Self {
146 segments: Vec::new(),
147 }
148 }
149
150 /// Create a namespace from segments
151 #[must_use]
152 pub const fn new(segments: Vec<NamespaceSegment>) -> Self {
153 Self { segments }
154 }
155
156 /// Create a child namespace by appending a new segment
157 ///
158 /// # Arguments
159 ///
160 /// * `node_name` - The node name for this nesting level
161 /// * `invocation_id` - The unique invocation identifier (typically UUID v4)
162 #[must_use]
163 pub fn child(&self, node_name: &str, invocation_id: &str) -> Self {
164 let mut segments = self.segments.clone();
165 segments.push(NamespaceSegment {
166 node_name: node_name.to_string(),
167 invocation_id: invocation_id.to_string(),
168 });
169 Self { segments }
170 }
171
172 /// Get the parent namespace by removing the last segment
173 ///
174 /// Returns `None` if this is already the root namespace.
175 #[must_use]
176 pub fn parent(&self) -> Option<Self> {
177 if self.segments.is_empty() {
178 None
179 } else {
180 let segments = self.segments[..self.segments.len() - 1].to_vec();
181 Some(Self { segments })
182 }
183 }
184
185 /// Check if this is a root namespace
186 #[must_use]
187 pub const fn is_root(&self) -> bool {
188 self.segments.is_empty()
189 }
190
191 /// Convert to string representation using the design-spec wire format
192 ///
193 /// Each segment is prefixed with `|` and formatted as `|node_name:invocation_id`.
194 /// Root produces `""`.
195 #[must_use]
196 pub fn as_str(&self) -> String {
197 self.segments.iter().fold(String::new(), |mut acc, s| {
198 acc.push('|');
199 acc.push_str(&s.node_name);
200 acc.push(':');
201 acc.push_str(&s.invocation_id);
202 acc
203 })
204 }
205
206 /// Convert to string representation (alias for `as_str`)
207 #[allow(
208 clippy::should_implement_trait,
209 clippy::inherent_to_string_shadow_display,
210 reason = "required by design spec 04-027"
211 )]
212 #[must_use]
213 pub fn to_string(&self) -> String {
214 self.as_str()
215 }
216
217 /// Parse from the design-spec wire format `|name:id|name:id`
218 ///
219 /// Empty string produces root. Each segment is split on the first `:`
220 /// to extract `node_name` and `invocation_id`.
221 #[must_use]
222 pub fn parse(s: &str) -> Self {
223 if s.is_empty() {
224 return Self::root();
225 }
226 let trimmed = s.trim_start_matches('|');
227 let segments = trimmed
228 .split('|')
229 .filter_map(|seg| {
230 let (node_name, invocation_id) = seg.split_once(':')?;
231 Some(NamespaceSegment {
232 node_name: node_name.to_string(),
233 invocation_id: invocation_id.to_string(),
234 })
235 })
236 .collect();
237 Self { segments }
238 }
239}
240
241impl std::fmt::Display for CheckpointNamespace {
242 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
243 write!(f, "{}", self.as_str())
244 }
245}
246
247impl From<Vec<NamespaceSegment>> for CheckpointNamespace {
248 fn from(segments: Vec<NamespaceSegment>) -> Self {
249 Self::new(segments)
250 }
251}
252
253impl From<&str> for CheckpointNamespace {
254 fn from(s: &str) -> Self {
255 Self::parse(s)
256 }
257}
258
259/// Checkpoint persistence interface
260///
261/// Trait defining operations for saving, loading, and listing checkpoints.
262/// This trait is implemented by storage backends in the `juncture-checkpoint` crate.
263///
264/// # Example
265///
266/// ```ignore
267/// use juncture_core::CheckpointSaver;
268/// use juncture_checkpoint::MemorySaver;
269///
270/// let saver = MemorySaver::new();
271/// // Use saver as a CheckpointSaver trait object...
272/// ```
273#[async_trait]
274pub trait CheckpointSaver: Send + Sync + 'static {
275 /// Get checkpoint tuple by configuration
276 ///
277 /// # Errors
278 ///
279 /// Returns [`CheckpointError`] if retrieval fails.
280 async fn get_tuple(
281 &self,
282 config: &RunnableConfig,
283 ) -> Result<Option<CheckpointTuple>, CheckpointError>;
284
285 /// List checkpoints with optional filtering
286 ///
287 /// # Errors
288 ///
289 /// Returns [`CheckpointError`] if listing fails.
290 async fn list(
291 &self,
292 config: &RunnableConfig,
293 filter: Option<CheckpointFilter>,
294 ) -> Result<Vec<CheckpointTuple>, CheckpointError>;
295
296 /// Save a checkpoint
297 ///
298 /// # Errors
299 ///
300 /// Returns [`CheckpointError`] if saving fails.
301 async fn put(
302 &self,
303 config: &RunnableConfig,
304 checkpoint: Checkpoint,
305 metadata: CheckpointMetadata,
306 ) -> Result<RunnableConfig, CheckpointError>;
307
308 /// Save incremental writes from a completed task
309 ///
310 /// # Errors
311 ///
312 /// Returns [`CheckpointError`] if saving fails.
313 async fn put_writes(
314 &self,
315 config: &RunnableConfig,
316 writes: Vec<PendingWrite>,
317 task_id: &str,
318 ) -> Result<(), CheckpointError>;
319}
320
321/// Complete checkpoint state
322///
323/// Captures the entire state of a graph execution at a specific point in time,
324/// including channel values, versions, pending tasks, and metadata.
325#[derive(Clone, Debug, Serialize, Deserialize)]
326pub struct Checkpoint {
327 /// Unique checkpoint identifier (UUID v6, time-ordered)
328 pub id: String,
329
330 /// Serialized channel values (JSON or `MessagePack`)
331 pub channel_values: serde_json::Value,
332
333 /// Version number for each channel
334 ///
335 /// Keys are channel names, values are monotonically increasing version numbers.
336 pub channel_versions: HashMap<String, u64>,
337
338 /// Versions of channels each node has consumed
339 ///
340 /// Outer key is node name, inner key is channel name, value is version consumed.
341 pub versions_seen: HashMap<String, HashMap<String, u64>>,
342
343 /// Tasks pending execution in the next superstep
344 pub pending_tasks: Vec<CheckpointPendingTask>,
345
346 /// Pending Send operations awaiting delivery
347 pub pending_sends: Vec<SerializedSend>,
348
349 /// Interrupt signals captured when execution was interrupted
350 ///
351 /// Populated when checkpoint source is `CheckpointSource::Interrupt`.
352 /// Used for ID-based resume to match incoming resume values.
353 #[serde(default)]
354 pub pending_interrupts: Vec<crate::interrupt::InterruptSignal>,
355
356 /// State schema version for migration support
357 pub schema_version: u32,
358
359 /// ISO 8601 timestamp of checkpoint creation
360 pub created_at: String,
361
362 /// Checkpoint format version
363 ///
364 /// Used for forward compatibility when Checkpoint structure changes.
365 pub v: u32,
366
367 /// Channels updated in this checkpoint
368 ///
369 /// Keys are channel names, values are the new version numbers.
370 pub new_versions: HashMap<String, u64>,
371
372 /// Delta counters since last full snapshot
373 ///
374 /// Keys are channel names, values track changes since last complete snapshot.
375 pub counters_since_delta_snapshot: HashMap<String, DeltaCounters>,
376}
377
378/// Delta tracking counters for a channel
379///
380/// Tracks incremental changes since the last complete snapshot,
381/// enabling efficient `DeltaChannel` optimization.
382#[derive(Clone, Debug, Default, Serialize, Deserialize)]
383pub struct DeltaCounters {
384 /// Number of updates since last snapshot
385 pub updates: u64,
386
387 /// Number of supersteps since last snapshot
388 pub supersteps: u64,
389}
390
391impl DeltaCounters {
392 /// Create a new delta counter with zero values.
393 #[must_use]
394 pub const fn new() -> Self {
395 Self {
396 updates: 0,
397 supersteps: 0,
398 }
399 }
400
401 /// Check if this channel's update count exceeds the given snapshot frequency.
402 ///
403 /// Returns `true` when a full snapshot should be taken instead of a delta.
404 /// A frequency of zero is treated as "always snapshot" (snapshot on every write).
405 #[must_use]
406 pub fn exceeds_frequency(&self, snapshot_frequency: usize) -> bool {
407 if snapshot_frequency == 0 {
408 return true;
409 }
410 usize::try_from(self.updates).unwrap_or(usize::MAX) >= snapshot_frequency
411 }
412}
413
414/// Checkpoint metadata
415///
416/// Provides context about how and when a checkpoint was created.
417#[derive(Clone, Debug, Serialize, Deserialize)]
418pub struct CheckpointMetadata {
419 /// Source of the checkpoint creation
420 pub source: CheckpointSource,
421
422 /// Superstep sequence number
423 pub step: i64,
424
425 /// Summary of writes from each node in this superstep
426 pub writes: HashMap<String, serde_json::Value>,
427
428 /// Parent checkpoint relationships
429 ///
430 /// Maps namespace to parent `checkpoint_id`.
431 pub parents: HashMap<String, String>,
432
433 /// Unique identifier for this execution run
434 pub run_id: String,
435}
436
437/// Source of checkpoint creation
438///
439/// Indicates what triggered the checkpoint to be created.
440#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
441#[non_exhaustive]
442pub enum CheckpointSource {
443 /// Initial state when graph execution begins
444 Input,
445
446 /// End of each superstep loop iteration
447 Loop,
448
449 /// External state update via `update_state()`
450 Update,
451
452 /// Fork from a historical checkpoint
453 Fork,
454
455 /// Interrupt triggered by human-in-the-loop interaction
456 Interrupt { node: String },
457}
458
459/// Complete checkpoint tuple with all context
460///
461/// Combines checkpoint data with its metadata, configuration, and pending writes.
462/// This is the primary structure returned from checkpoint storage for recovery.
463#[derive(Clone, Debug)]
464pub struct CheckpointTuple {
465 /// Configuration containing `thread_id`, `checkpoint_id`, and `checkpoint_ns`
466 pub config: RunnableConfig,
467
468 /// The checkpoint itself
469 pub checkpoint: Checkpoint,
470
471 /// Checkpoint metadata
472 pub metadata: CheckpointMetadata,
473
474 /// Incremental writes since this checkpoint
475 ///
476 /// Used for crash recovery: these writes completed after the checkpoint
477 /// and before the next checkpoint, so they don't need to be re-executed.
478 pub pending_writes: Vec<PendingWrite>,
479
480 /// Parent checkpoint configuration
481 ///
482 /// Used for time-travel navigation.
483 pub parent_config: Option<RunnableConfig>,
484}
485
486/// Pending write from a completed task
487///
488/// Represents a channel write that completed after checkpoint creation
489/// but before the next checkpoint.
490#[derive(Clone, Debug, Serialize, Deserialize)]
491pub struct PendingWrite {
492 /// ID of the task that produced this write
493 pub task_id: String,
494
495 /// Target channel name
496 pub channel: String,
497
498 /// Serialized value to write
499 pub value: serde_json::Value,
500}
501
502/// Pending task in checkpoint
503///
504/// Represents a task scheduled for execution in the next superstep.
505#[derive(Clone, Debug, Serialize, Deserialize)]
506pub struct CheckpointPendingTask {
507 /// Unique task identifier (UUID)
508 pub id: String,
509
510 /// Target node name to execute
511 pub node: String,
512
513 /// Channels that triggered this task
514 pub triggers: Vec<String>,
515
516 /// Optional state override (used in Send API scenarios)
517 pub state_override: Option<serde_json::Value>,
518}
519
520/// Serialized Send operation
521///
522/// Represents a Send object flowing through the `__pregel_tasks` channel.
523#[derive(Clone, Debug, Serialize, Deserialize)]
524pub struct SerializedSend {
525 /// Destination node name
526 pub node: String,
527
528 /// Serialized state override
529 pub state: serde_json::Value,
530}
531
532/// Delta operation type
533///
534/// Defines how to apply delta values to a channel.
535#[derive(Clone, Debug, Serialize, Deserialize)]
536pub enum DeltaOp {
537 /// Append values to existing channel data
538 Append,
539
540 /// Replace entire channel data
541 Replace,
542}
543
544/// Checkpoint listing filter
545///
546/// Used to query checkpoint history with specific criteria.
547#[derive(Clone, Debug, Default)]
548pub struct CheckpointFilter {
549 /// Filter by checkpoint source
550 pub source: Option<CheckpointSource>,
551
552 /// Minimum step number (inclusive)
553 pub step_gte: Option<i64>,
554
555 /// Maximum step number (inclusive)
556 pub step_lte: Option<i64>,
557
558 /// Only checkpoints before this `checkpoint_id`
559 pub before: Option<String>,
560
561 /// Only checkpoints after this `checkpoint_id`
562 pub after: Option<String>,
563
564 /// Maximum number of checkpoints to return
565 pub limit: Option<usize>,
566}
567
568/// State snapshot at a specific checkpoint
569///
570/// Represents the deserialized, fully-hydrated execution state at a checkpoint.
571///
572/// # Type Parameters
573///
574/// * `S` - State type implementing the [`crate::State`] trait
575#[derive(Clone, Debug)]
576pub struct StateSnapshot<S: crate::State> {
577 /// The complete state values
578 pub values: S,
579
580 /// Next nodes to execute
581 pub next: Vec<String>,
582
583 /// Configuration with `checkpoint_id` for time-travel
584 pub config: RunnableConfig,
585
586 /// Checkpoint metadata
587 pub metadata: CheckpointMetadata,
588
589 /// ISO 8601 creation timestamp
590 pub created_at: String,
591
592 /// Parent checkpoint configuration
593 pub parent_config: Option<RunnableConfig>,
594
595 /// Task information for current superstep
596 pub tasks: Vec<PregelTaskInfo>,
597}
598
599/// Pregel task information
600///
601/// Provides execution status for tasks in a superstep.
602#[derive(Clone, Debug, Serialize, Deserialize)]
603pub struct PregelTaskInfo {
604 /// Task identifier
605 pub id: String,
606
607 /// Node name being executed
608 pub node_name: String,
609
610 /// Error if task failed
611 pub error: Option<String>,
612
613 /// Interrupt values if task was interrupted
614 pub interrupts: Vec<serde_json::Value>,
615}
616
617/// Generate a new time-ordered checkpoint ID using UUID v6.
618///
619/// UUID v6 reorders the timestamp bits from UUID v1 for lexicographic
620/// sortability, making checkpoint IDs suitable for range queries and
621/// time-ordered iteration without a separate timestamp column.
622///
623/// The node ID is derived from random bytes to ensure uniqueness across
624/// processes without requiring a persistent MAC address.
625///
626/// # Panics
627///
628/// Will not panic under normal circumstances. The uuid crate handles
629/// timestamp generation internally using a shared atomic context.
630#[must_use]
631pub fn generate_checkpoint_id() -> String {
632 // Random 6-byte node ID avoids the need for a persistent IEEE 802
633 // MAC address while still guaranteeing global uniqueness when
634 // combined with the timestamp and monotonic counter.
635 let node_id: [u8; 6] = rand::random();
636 uuid::Uuid::now_v6(&node_id).to_string()
637}
638
639// Rust guideline compliant 2026-05-21