Skip to main content

langgraph_checkpoint_postgres_rs/
queries.rs

/// SQL migrations for the checkpoint schema.
///
/// Each entry is one self-contained SQL statement. The entries are listed in
/// the order they were introduced; presumably the slice index doubles as the
/// migration version recorded in `checkpoint_migrations.v` (confirm against
/// the code that applies them).
///
/// Every statement is written to be re-runnable (`IF NOT EXISTS`, or a change
/// that is harmless when repeated).
///
/// Note for callers: PostgreSQL does not allow `CREATE INDEX CONCURRENTLY`
/// inside a transaction block, so the three index migrations must be executed
/// outside of an explicit transaction.
pub const MIGRATIONS: &[&str] = &[
    // 0: bookkeeping table with a single integer key column `v`.
    "CREATE TABLE IF NOT EXISTS checkpoint_migrations (v INTEGER PRIMARY KEY);",
    // 1: one row per checkpoint, keyed by (thread, namespace, checkpoint id).
    "CREATE TABLE IF NOT EXISTS checkpoints (
        thread_id TEXT NOT NULL,
        checkpoint_ns TEXT NOT NULL DEFAULT '',
        checkpoint_id TEXT NOT NULL,
        parent_checkpoint_id TEXT,
        type TEXT,
        checkpoint JSONB NOT NULL,
        metadata JSONB NOT NULL DEFAULT '{}',
        PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
    );",
    // 2: channel payloads, keyed by (thread, namespace, channel, version).
    "CREATE TABLE IF NOT EXISTS checkpoint_blobs (
        thread_id TEXT NOT NULL,
        checkpoint_ns TEXT NOT NULL DEFAULT '',
        channel TEXT NOT NULL,
        version TEXT NOT NULL,
        type TEXT NOT NULL,
        blob BYTEA,
        PRIMARY KEY (thread_id, checkpoint_ns, channel, version)
    );",
    // 3: per-task writes attached to a checkpoint, keyed down to (task_id, idx).
    "CREATE TABLE IF NOT EXISTS checkpoint_writes (
        thread_id TEXT NOT NULL,
        checkpoint_ns TEXT NOT NULL DEFAULT '',
        checkpoint_id TEXT NOT NULL,
        task_id TEXT NOT NULL,
        idx INTEGER NOT NULL,
        channel TEXT NOT NULL,
        type TEXT,
        blob BYTEA NOT NULL,
        PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx)
    );",
    // 4: `blob` is already nullable in the CREATE TABLE above, so this only
    // has an effect on databases where the column was created NOT NULL.
    "ALTER TABLE checkpoint_blobs ALTER COLUMN blob DROP NOT NULL;",
    // 5: no-op statement; presumably a placeholder that keeps later migration
    // indices stable (TODO confirm).
    "SELECT 1;",
    // 6-8: thread_id lookup indexes (must run outside a transaction block).
    "CREATE INDEX CONCURRENTLY IF NOT EXISTS checkpoints_thread_id_idx ON checkpoints(thread_id);",
    "CREATE INDEX CONCURRENTLY IF NOT EXISTS checkpoint_blobs_thread_id_idx ON checkpoint_blobs(thread_id);",
    "CREATE INDEX CONCURRENTLY IF NOT EXISTS checkpoint_writes_thread_id_idx ON checkpoint_writes(thread_id);",
    // 9: adds `task_path`; existing rows receive the empty-string default.
    "ALTER TABLE checkpoint_writes ADD COLUMN IF NOT EXISTS task_path TEXT NOT NULL DEFAULT '';",
];
41
/// Select a checkpoint with its channel values and pending writes.
///
/// Result columns, in order: `thread_id`, `checkpoint`, `checkpoint_ns`,
/// `checkpoint_id`, `parent_checkpoint_id`, `metadata`, `channel_values`,
/// `pending_writes`.
///
/// * `channel_values` aggregates `[channel, type, blob]` (each as `bytea`) for
///   every `checkpoint_blobs` row in the same thread/namespace whose
///   `(channel, version)` pair appears in `checkpoint -> 'channel_versions'`.
///   It is `NULL` when no blob matches (`array_agg` over zero rows).
/// * `pending_writes` aggregates `[task_id, channel, type, blob]` (each as
///   `bytea`) for the checkpoint's `checkpoint_writes` rows, ordered by
///   `task_id`, then `idx`. It is `NULL` when there are no writes.
///
/// The statement ends right after `FROM checkpoints` and has no `WHERE`,
/// `ORDER BY` or `LIMIT`; presumably callers append their own clauses.
pub const SELECT_SQL: &str = r#"
SELECT
    thread_id,
    checkpoint,
    checkpoint_ns,
    checkpoint_id,
    parent_checkpoint_id,
    metadata,
    (
        SELECT array_agg(array[bl.channel::bytea, bl.type::bytea, bl.blob])
        FROM jsonb_each_text(checkpoint -> 'channel_versions')
        INNER JOIN checkpoint_blobs bl
            ON bl.thread_id = checkpoints.thread_id
            AND bl.checkpoint_ns = checkpoints.checkpoint_ns
            AND bl.channel = jsonb_each_text.key
            AND bl.version = jsonb_each_text.value
    ) AS channel_values,
    (
        SELECT array_agg(array[cw.task_id::text::bytea, cw.channel::bytea, cw.type::bytea, cw.blob] ORDER BY cw.task_id, cw.idx)
        FROM checkpoint_writes cw
        WHERE cw.thread_id = checkpoints.thread_id
            AND cw.checkpoint_ns = checkpoints.checkpoint_ns
            AND cw.checkpoint_id = checkpoints.checkpoint_id
    ) AS pending_writes
FROM checkpoints
"#;
69
/// Insert a checkpoint blob unless one already exists for the same key.
///
/// Despite the `UPSERT` name, a conflict on
/// `(thread_id, checkpoint_ns, channel, version)` is resolved with
/// `DO NOTHING`: the existing row is left untouched.
///
/// Positional parameters: `$1` thread_id, `$2` checkpoint_ns, `$3` channel,
/// `$4` version, `$5` type, `$6` blob.
pub const UPSERT_CHECKPOINT_BLOBS_SQL: &str = r#"
INSERT INTO checkpoint_blobs (thread_id, checkpoint_ns, channel, version, type, blob)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (thread_id, checkpoint_ns, channel, version) DO NOTHING
"#;
76
/// Upsert a checkpoint.
///
/// Inserts a row into `checkpoints`; when a row with the same
/// `(thread_id, checkpoint_ns, checkpoint_id)` already exists, its
/// `checkpoint` and `metadata` columns are overwritten with the new values.
/// `parent_checkpoint_id` is not modified on conflict.
///
/// Positional parameters: `$1` thread_id, `$2` checkpoint_ns,
/// `$3` checkpoint_id, `$4` parent_checkpoint_id, `$5` checkpoint,
/// `$6` metadata.
pub const UPSERT_CHECKPOINTS_SQL: &str = r#"
INSERT INTO checkpoints (thread_id, checkpoint_ns, checkpoint_id, parent_checkpoint_id, checkpoint, metadata)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (thread_id, checkpoint_ns, checkpoint_id)
DO UPDATE SET
    checkpoint = EXCLUDED.checkpoint,
    metadata = EXCLUDED.metadata
"#;
86
/// Upsert checkpoint writes (overwrite on conflict).
///
/// Inserts a row into `checkpoint_writes`; when a row with the same
/// `(thread_id, checkpoint_ns, checkpoint_id, task_id, idx)` already exists,
/// its `channel`, `type` and `blob` are replaced. `task_path` is written on
/// insert only and is not refreshed on conflict.
///
/// Positional parameters: `$1` thread_id, `$2` checkpoint_ns,
/// `$3` checkpoint_id, `$4` task_id, `$5` task_path, `$6` idx, `$7` channel,
/// `$8` type, `$9` blob.
pub const UPSERT_CHECKPOINT_WRITES_SQL: &str = r#"
INSERT INTO checkpoint_writes (thread_id, checkpoint_ns, checkpoint_id, task_id, task_path, idx, channel, type, blob)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (thread_id, checkpoint_ns, checkpoint_id, task_id, idx) DO UPDATE SET
    channel = EXCLUDED.channel,
    type = EXCLUDED.type,
    blob = EXCLUDED.blob
"#;
96
/// Insert checkpoint writes (ignore on conflict).
///
/// Inserts a row into `checkpoint_writes`; when a row with the same
/// `(thread_id, checkpoint_ns, checkpoint_id, task_id, idx)` already exists,
/// the statement does nothing and the stored row is kept as is.
///
/// Positional parameters: `$1` thread_id, `$2` checkpoint_ns,
/// `$3` checkpoint_id, `$4` task_id, `$5` task_path, `$6` idx, `$7` channel,
/// `$8` type, `$9` blob.
pub const INSERT_CHECKPOINT_WRITES_SQL: &str = r#"
INSERT INTO checkpoint_writes (thread_id, checkpoint_ns, checkpoint_id, task_id, task_path, idx, channel, type, blob)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (thread_id, checkpoint_ns, checkpoint_id, task_id, idx) DO NOTHING
"#;
103
/// Select pending sends for migration.
///
/// Returns one row per `checkpoint_id` with a `sends` column: an array of
/// `[type, blob]` pairs (each as `bytea`) taken from `checkpoint_writes` rows
/// on the `'__tasks__'` channel, ordered by `task_path`, `task_id`, `idx`.
///
/// Positional parameters: `$1` thread_id, `$2` array of checkpoint ids
/// (matched with `= ANY($2)`).
///
/// The query does not filter on `checkpoint_ns`, so rows from every namespace
/// of the thread that share a checkpoint id are aggregated together.
pub const SELECT_PENDING_SENDS_SQL: &str = r#"
SELECT
    checkpoint_id,
    array_agg(array[type::bytea, blob] ORDER BY task_path, task_id, idx) AS sends
FROM checkpoint_writes
WHERE thread_id = $1
    AND checkpoint_id = ANY($2)
    AND channel = '__tasks__'
GROUP BY checkpoint_id
"#;