Skip to main content

langgraph_checkpoint_sqlite_rs/
queries.rs

1//! SQL schema and statement constants for the SQLite checkpoint saver.
2//!
3//! SQLite-specific notes:
4//! - Uses `TEXT` for JSON columns (SQLite has no native JSONB; `json_extract()`
5//!   handles filtering).
6//! - Uses `BLOB` for binary payloads (Postgres `BYTEA`).
7//! - Bind parameters use `?` placeholders.
8//! - WAL mode is enabled at connection setup for concurrent reads.
9
10/// SQL migrations for the checkpoint schema. Applied in order; each
11/// migration's index is recorded in `checkpoint_migrations.v`.
12pub const MIGRATIONS: &[&str] = &[
13    // 0: bootstrap migrations table
14    "CREATE TABLE IF NOT EXISTS checkpoint_migrations (v INTEGER PRIMARY KEY);",
15    // 1: checkpoints table
16    "CREATE TABLE IF NOT EXISTS checkpoints (
17        thread_id TEXT NOT NULL,
18        checkpoint_ns TEXT NOT NULL DEFAULT '',
19        checkpoint_id TEXT NOT NULL,
20        parent_checkpoint_id TEXT,
21        type TEXT,
22        checkpoint TEXT NOT NULL,
23        metadata TEXT NOT NULL DEFAULT '{}',
24        PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
25    );",
26    // 2: checkpoint blobs (channel value storage)
27    "CREATE TABLE IF NOT EXISTS checkpoint_blobs (
28        thread_id TEXT NOT NULL,
29        checkpoint_ns TEXT NOT NULL DEFAULT '',
30        channel TEXT NOT NULL,
31        version TEXT NOT NULL,
32        type TEXT NOT NULL,
33        blob BLOB,
34        PRIMARY KEY (thread_id, checkpoint_ns, channel, version)
35    );",
36    // 3: checkpoint writes (pending task outputs)
37    "CREATE TABLE IF NOT EXISTS checkpoint_writes (
38        thread_id TEXT NOT NULL,
39        checkpoint_ns TEXT NOT NULL DEFAULT '',
40        checkpoint_id TEXT NOT NULL,
41        task_id TEXT NOT NULL,
42        task_path TEXT NOT NULL DEFAULT '',
43        idx INTEGER NOT NULL,
44        channel TEXT NOT NULL,
45        type TEXT,
46        blob BLOB,
47        PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx)
48    );",
49    // 4: indexes (CONCURRENTLY is Postgres-only; SQLite uses plain CREATE INDEX)
50    "CREATE INDEX IF NOT EXISTS checkpoints_thread_id_idx ON checkpoints(thread_id);",
51    // 5
52    "CREATE INDEX IF NOT EXISTS checkpoint_blobs_thread_id_idx ON checkpoint_blobs(thread_id);",
53    // 6
54    "CREATE INDEX IF NOT EXISTS checkpoint_writes_thread_id_idx ON checkpoint_writes(thread_id);",
55];
56
57/// Select base columns for a checkpoint row. Channel values and pending
58/// writes are fetched separately (SQLite lacks `array_agg`).
59pub const SELECT_CHECKPOINT_SQL: &str = r#"
60SELECT
61    thread_id,
62    checkpoint_ns,
63    checkpoint_id,
64    parent_checkpoint_id,
65    type,
66    checkpoint,
67    metadata
68FROM checkpoints
69"#;
70
71/// Fetch all blobs (channel values) for a given checkpoint by joining
72/// `checkpoint.channel_versions` with the blobs table.
73///
74/// SQLite stores `version` as TEXT, while `je.value` from `json_each` may
75/// be a JSON number or string depending on how the checkpoint was
76/// produced. The explicit `CAST(... AS TEXT)` normalizes both sides so
77/// integer and string versions compare equal.
78pub const SELECT_BLOBS_SQL: &str = r#"
79SELECT bl.channel, bl.type, bl.blob
80FROM checkpoints cp
81CROSS JOIN json_each(json_extract(cp.checkpoint, '$.channel_versions')) je
82INNER JOIN checkpoint_blobs bl
83    ON bl.thread_id = cp.thread_id
84    AND bl.checkpoint_ns = cp.checkpoint_ns
85    AND bl.channel = je.key
86    AND bl.version = CAST(je.value AS TEXT)
87WHERE cp.thread_id = ?1
88  AND cp.checkpoint_ns = ?2
89  AND cp.checkpoint_id = ?3
90"#;
91
92/// Fetch pending writes for a given checkpoint, ordered by task and idx.
93pub const SELECT_WRITES_SQL: &str = r#"
94SELECT task_id, channel, type, blob, idx, task_path
95FROM checkpoint_writes
96WHERE thread_id = ?1 AND checkpoint_ns = ?2 AND checkpoint_id = ?3
97ORDER BY task_path ASC, task_id ASC, idx ASC
98"#;
99
100/// Upsert checkpoint blobs. Blobs are immutable per (channel, version),
101/// so on conflict we keep the existing row.
102pub const UPSERT_CHECKPOINT_BLOBS_SQL: &str = r#"
103INSERT INTO checkpoint_blobs (thread_id, checkpoint_ns, channel, version, type, blob)
104VALUES (?1, ?2, ?3, ?4, ?5, ?6)
105ON CONFLICT (thread_id, checkpoint_ns, channel, version) DO NOTHING
106"#;
107
108/// Upsert a checkpoint, replacing the JSON payload and metadata if the
109/// (thread, ns, id) tuple already exists.
110pub const UPSERT_CHECKPOINTS_SQL: &str = r#"
111INSERT INTO checkpoints (thread_id, checkpoint_ns, checkpoint_id, parent_checkpoint_id, checkpoint, metadata)
112VALUES (?1, ?2, ?3, ?4, ?5, ?6)
113ON CONFLICT (thread_id, checkpoint_ns, checkpoint_id) DO UPDATE SET
114    checkpoint = excluded.checkpoint,
115    metadata = excluded.metadata
116"#;
117
118/// Upsert checkpoint writes (overwrite on conflict). Used for special
119/// channels like `__error__` whose value may legitimately change.
120pub const UPSERT_CHECKPOINT_WRITES_SQL: &str = r#"
121INSERT INTO checkpoint_writes (thread_id, checkpoint_ns, checkpoint_id, task_id, task_path, idx, channel, type, blob)
122VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
123ON CONFLICT (thread_id, checkpoint_ns, checkpoint_id, task_id, idx) DO UPDATE SET
124    channel = excluded.channel,
125    type = excluded.type,
126    blob = excluded.blob
127"#;
128
129/// Insert checkpoint writes (ignore on conflict). Default for normal
130/// per-task writes which should be append-only.
131pub const INSERT_CHECKPOINT_WRITES_SQL: &str = r#"
132INSERT INTO checkpoint_writes (thread_id, checkpoint_ns, checkpoint_id, task_id, task_path, idx, channel, type, blob)
133VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
134ON CONFLICT (thread_id, checkpoint_ns, checkpoint_id, task_id, idx) DO NOTHING
135"#;