Skip to main content

cognis_graph/checkpoint/
mod.rs

1//! Checkpointing — persist graph state across supersteps for resume,
2//! time-travel, and human-in-the-loop interrupts.
3
4use async_trait::async_trait;
5use serde::{Deserialize, Serialize};
6use uuid::Uuid;
7
8use cognis_core::Result;
9
10use crate::state::GraphState;
11
12mod in_memory;
13pub use in_memory::InMemoryCheckpointer;
14
15pub mod serializer;
16#[cfg(feature = "serializer-cbor")]
17pub use serializer::CborSerializer;
18pub use serializer::{CheckpointSerializer, JsonSerializer};
19
20#[cfg(feature = "sqlite")]
21pub mod sqlite;
22#[cfg(feature = "sqlite")]
23pub use sqlite::SqliteCheckpointer;
24
25#[cfg(feature = "postgres")]
26pub mod postgres;
27#[cfg(feature = "postgres")]
28pub use postgres::PostgresCheckpointer;
29
30/// Snapshot of one active task at an interrupt boundary. Used for
31/// point-of-interrupt resume.
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct ActiveSnapshot {
34    /// The node name to dispatch on resume.
35    pub node_name: String,
36    /// Per-target payload from `Goto::Send`, if any.
37    pub payload: Option<serde_json::Value>,
38}
39
40/// Trait for storing & retrieving graph state at superstep boundaries.
41#[async_trait]
42pub trait Checkpointer<S: GraphState>: Send + Sync {
43    /// Save state at `step` for `run_id`.
44    async fn save(&self, run_id: Uuid, step: u64, state: &S) -> Result<()>;
45
46    /// Load state for `run_id` at `step` (or the latest if `step` is None).
47    async fn load(&self, run_id: Uuid, step: Option<u64>) -> Result<Option<S>>;
48
49    /// List all saved step numbers for `run_id`.
50    async fn list(&self, run_id: Uuid) -> Result<Vec<u64>>;
51
52    /// Save the engine's active task snapshot alongside state. Default
53    /// is no-op — older checkpointers won't persist the active set, and
54    /// `engine::resume` falls back to the start node when `load_active`
55    /// returns empty.
56    async fn save_active(&self, run_id: Uuid, step: u64, active: &[ActiveSnapshot]) -> Result<()> {
57        let _ = (run_id, step, active);
58        Ok(())
59    }
60
61    /// Load the active task snapshot for `(run_id, step)`. Default is empty.
62    async fn load_active(&self, run_id: Uuid, step: u64) -> Result<Vec<ActiveSnapshot>> {
63        let _ = (run_id, step);
64        Ok(Vec::new())
65    }
66}