entelix_graph/checkpoint.rs
1//! Checkpoint primitives — `Checkpoint<S>`, `CheckpointId`, the
2//! addressing tuple [`ThreadKey`], and the [`Checkpointer`] trait.
3//!
4//! A checkpoint records "after running step N at node X with state S,
5//! the graph plans to execute Y next". On crash recovery, a fresh
6//! process calls `CompiledGraph::resume(ctx)` to reconstitute state
7//! and continue from the saved point.
8//!
9//! ## Multi-tenant addressing — `ThreadKey`
10//!
11//! Every persistence operation is keyed by `(tenant_id, thread_id)`.
12//! [`ThreadKey`] encodes that tuple as a single type so impls cannot
13//! "forget" to scope a query — Invariant 11 holds at the type level
14//! rather than relying on each backend to remember to add a `WHERE
15//! tenant_id = ...` clause. `ThreadKey::from_ctx(ctx)` is the
16//! canonical builder; it requires `ctx.thread_id()` to be set
17//! (`ctx.tenant_id()` is always present).
18
19use async_trait::async_trait;
20use serde::{Deserialize, Serialize};
21
22use entelix_core::error::Result;
23use entelix_core::{TenantId, ThreadKey};
24
25/// Stable identifier for a checkpoint. Backed by UUID v7 — time-ordered
26/// and globally unique across processes.
27#[derive(Clone, Debug, Eq, Hash, PartialEq, Serialize, Deserialize)]
28#[serde(transparent)]
29pub struct CheckpointId(uuid::Uuid);
30
31impl CheckpointId {
32 /// Generate a fresh time-ordered id.
33 pub fn new() -> Self {
34 Self(uuid::Uuid::now_v7())
35 }
36
37 /// Reconstruct an id from a `uuid::Uuid` — used by persistence
38 /// backends that read checkpoint rows out of storage.
39 pub const fn from_uuid(uuid: uuid::Uuid) -> Self {
40 Self(uuid)
41 }
42
43 /// Borrow the underlying UUID.
44 pub const fn as_uuid(&self) -> &uuid::Uuid {
45 &self.0
46 }
47
48 /// Render as a hyphenated string.
49 pub fn to_hyphenated_string(&self) -> String {
50 self.0.to_string()
51 }
52}
53
54impl Default for CheckpointId {
55 fn default() -> Self {
56 Self::new()
57 }
58}
59
60/// One snapshot of graph progress for a particular `(tenant_id,
61/// thread_id)`. `next_node = None` indicates the graph terminated
62/// cleanly (a finish point ran or a conditional edge routed to
63/// `END`).
64///
65/// `#[non_exhaustive]` so post-1.0 additions (e.g. trace-context
66/// propagation, schema-version stamping) ship as MINOR. Construct
67/// via [`Checkpoint::new`]; attach the optional parent for
68/// time-travel writes via [`Checkpoint::with_parent`].
69#[derive(Clone, Debug)]
70#[non_exhaustive]
71pub struct Checkpoint<S>
72where
73 S: Clone + Send + Sync + 'static,
74{
75 /// Unique identifier (UUID v7).
76 pub id: CheckpointId,
77 /// Tenant scope this checkpoint belongs to.
78 pub tenant_id: TenantId,
79 /// Conversation thread this checkpoint belongs to.
80 pub thread_id: String,
81 /// Optional parent — used by time-travel writes.
82 pub parent_id: Option<CheckpointId>,
83 /// Monotonic step counter within the thread.
84 pub step: usize,
85 /// State produced by the most recently executed node.
86 pub state: S,
87 /// Node the graph is poised to execute next, or `None` if it has
88 /// terminated.
89 pub next_node: Option<String>,
90 /// When the checkpoint was written.
91 pub timestamp: chrono::DateTime<chrono::Utc>,
92}
93
94impl<S> Checkpoint<S>
95where
96 S: Clone + Send + Sync + 'static,
97{
98 /// Construct a fresh checkpoint addressed by `key`. Generates a
99 /// new [`CheckpointId`] (UUID v7) and stamps `timestamp` with
100 /// the current wall clock. `parent_id` defaults to `None`;
101 /// chain [`Self::with_parent`] for time-travel writes.
102 #[must_use]
103 pub fn new(key: &ThreadKey, step: usize, state: S, next_node: Option<String>) -> Self {
104 Self {
105 id: CheckpointId::new(),
106 tenant_id: key.tenant_id().clone(),
107 thread_id: key.thread_id().to_owned(),
108 parent_id: None,
109 step,
110 state,
111 next_node,
112 timestamp: chrono::Utc::now(),
113 }
114 }
115
116 /// Attach a `parent_id` (time-travel branching). Chain after
117 /// [`Self::new`].
118 #[must_use]
119 pub const fn with_parent(mut self, parent_id: CheckpointId) -> Self {
120 self.parent_id = Some(parent_id);
121 self
122 }
123
124 /// Reconstitute a checkpoint from explicit parts. Used by
125 /// persistence backends rehydrating rows from storage — the
126 /// caller already knows every field's value (id from the row's
127 /// PK, timestamp from the column). Agent code reaches for
128 /// [`Self::new`] instead.
129 #[must_use]
130 #[allow(clippy::too_many_arguments)]
131 pub fn from_parts(
132 id: CheckpointId,
133 key: &ThreadKey,
134 parent_id: Option<CheckpointId>,
135 step: usize,
136 state: S,
137 next_node: Option<String>,
138 timestamp: chrono::DateTime<chrono::Utc>,
139 ) -> Self {
140 Self {
141 id,
142 tenant_id: key.tenant_id().to_owned(),
143 thread_id: key.thread_id().to_owned(),
144 parent_id,
145 step,
146 state,
147 next_node,
148 timestamp,
149 }
150 }
151
152 /// Borrow the addressing tuple this checkpoint belongs to.
153 #[must_use]
154 pub fn key(&self) -> ThreadKey {
155 ThreadKey::new(self.tenant_id.clone(), self.thread_id.clone())
156 }
157}
158
159/// Persistent (or in-memory) store of `Checkpoint<S>`s addressed by
160/// [`ThreadKey`].
161///
162/// Implementors must be `Send + Sync` so a single instance can serve
163/// every concurrent invocation in a multi-pod deployment. The
164/// `&ThreadKey` parameter on every read/write enforces tenant scope
165/// at the type level — Invariant 11.
166///
167/// # `S: Drop` contract
168///
169/// Implementors may evict, replace, or reallocate stored values inside
170/// internal locks. `S::drop` therefore **must not block** — no
171/// `block_on`, no synchronous IO, no lock acquisition. Spawn a
172/// detached task or use a non-blocking sink instead. See
173/// §"Amendment 2026-04-30 — State drop semantics".
174#[async_trait]
175pub trait Checkpointer<S>: Send + Sync + 'static
176where
177 S: Clone + Send + Sync + 'static,
178{
179 /// Persist a checkpoint. The checkpoint's own
180 /// `(tenant_id, thread_id)` fields define its addressing.
181 async fn put(&self, checkpoint: Checkpoint<S>) -> Result<()>;
182
183 /// Load the most recent checkpoint for `key`. Verb-family
184 /// `get` per `.claude/rules/naming.md` — single-item primary-
185 /// key (most-recent) lookup, returns `Option<Checkpoint<S>>`.
186 async fn get_latest(&self, key: &ThreadKey) -> Result<Option<Checkpoint<S>>>;
187
188 /// Look up a specific checkpoint by id within `key`'s scope.
189 /// Verb-family `get` — primary-key lookup.
190 async fn get_by_id(&self, key: &ThreadKey, id: &CheckpointId) -> Result<Option<Checkpoint<S>>>;
191
192 /// Return the thread's checkpoint history, most recent first.
193 /// `limit` caps the result size (`usize::MAX` for "all").
194 async fn list_history(&self, key: &ThreadKey, limit: usize) -> Result<Vec<Checkpoint<S>>>;
195
196 /// Time-travel write: create a fresh checkpoint that branches off
197 /// `parent_id`, replacing only the state. The new checkpoint
198 /// inherits `next_node` from its parent and records `parent_id`
199 /// so history renders branches correctly.
200 ///
201 /// Returns the new id. Returns `Error::InvalidRequest` if the
202 /// parent does not exist for the supplied `key`.
203 async fn update_state(
204 &self,
205 key: &ThreadKey,
206 parent_id: &CheckpointId,
207 new_state: S,
208 ) -> Result<CheckpointId>;
209}