langgraph_checkpoint_postgres_rs/
queries.rs1pub const MIGRATIONS: &[&str] = &[
3 "CREATE TABLE IF NOT EXISTS checkpoint_migrations (v INTEGER PRIMARY KEY);",
4 "CREATE TABLE IF NOT EXISTS checkpoints (
5 thread_id TEXT NOT NULL,
6 checkpoint_ns TEXT NOT NULL DEFAULT '',
7 checkpoint_id TEXT NOT NULL,
8 parent_checkpoint_id TEXT,
9 type TEXT,
10 checkpoint JSONB NOT NULL,
11 metadata JSONB NOT NULL DEFAULT '{}',
12 PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
13 );",
14 "CREATE TABLE IF NOT EXISTS checkpoint_blobs (
15 thread_id TEXT NOT NULL,
16 checkpoint_ns TEXT NOT NULL DEFAULT '',
17 channel TEXT NOT NULL,
18 version TEXT NOT NULL,
19 type TEXT NOT NULL,
20 blob BYTEA,
21 PRIMARY KEY (thread_id, checkpoint_ns, channel, version)
22 );",
23 "CREATE TABLE IF NOT EXISTS checkpoint_writes (
24 thread_id TEXT NOT NULL,
25 checkpoint_ns TEXT NOT NULL DEFAULT '',
26 checkpoint_id TEXT NOT NULL,
27 task_id TEXT NOT NULL,
28 idx INTEGER NOT NULL,
29 channel TEXT NOT NULL,
30 type TEXT,
31 blob BYTEA NOT NULL,
32 PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx)
33 );",
34 "ALTER TABLE checkpoint_blobs ALTER COLUMN blob DROP NOT NULL;",
35 "SELECT 1;",
36 "CREATE INDEX CONCURRENTLY IF NOT EXISTS checkpoints_thread_id_idx ON checkpoints(thread_id);",
37 "CREATE INDEX CONCURRENTLY IF NOT EXISTS checkpoint_blobs_thread_id_idx ON checkpoint_blobs(thread_id);",
38 "CREATE INDEX CONCURRENTLY IF NOT EXISTS checkpoint_writes_thread_id_idx ON checkpoint_writes(thread_id);",
39 "ALTER TABLE checkpoint_writes ADD COLUMN IF NOT EXISTS task_path TEXT NOT NULL DEFAULT '';",
40];
41
42pub const SELECT_SQL: &str = r#"
44SELECT
45 thread_id,
46 checkpoint,
47 checkpoint_ns,
48 checkpoint_id,
49 parent_checkpoint_id,
50 metadata,
51 (
52 SELECT array_agg(array[bl.channel::bytea, bl.type::bytea, bl.blob])
53 FROM jsonb_each_text(checkpoint -> 'channel_versions')
54 INNER JOIN checkpoint_blobs bl
55 ON bl.thread_id = checkpoints.thread_id
56 AND bl.checkpoint_ns = checkpoints.checkpoint_ns
57 AND bl.channel = jsonb_each_text.key
58 AND bl.version = jsonb_each_text.value
59 ) AS channel_values,
60 (
61 SELECT array_agg(array[cw.task_id::text::bytea, cw.channel::bytea, cw.type::bytea, cw.blob] ORDER BY cw.task_id, cw.idx)
62 FROM checkpoint_writes cw
63 WHERE cw.thread_id = checkpoints.thread_id
64 AND cw.checkpoint_ns = checkpoints.checkpoint_ns
65 AND cw.checkpoint_id = checkpoints.checkpoint_id
66 ) AS pending_writes
67FROM checkpoints
68"#;
69
70pub const UPSERT_CHECKPOINT_BLOBS_SQL: &str = r#"
72INSERT INTO checkpoint_blobs (thread_id, checkpoint_ns, channel, version, type, blob)
73VALUES ($1, $2, $3, $4, $5, $6)
74ON CONFLICT (thread_id, checkpoint_ns, channel, version) DO NOTHING
75"#;
76
77pub const UPSERT_CHECKPOINTS_SQL: &str = r#"
79INSERT INTO checkpoints (thread_id, checkpoint_ns, checkpoint_id, parent_checkpoint_id, checkpoint, metadata)
80VALUES ($1, $2, $3, $4, $5, $6)
81ON CONFLICT (thread_id, checkpoint_ns, checkpoint_id)
82DO UPDATE SET
83 checkpoint = EXCLUDED.checkpoint,
84 metadata = EXCLUDED.metadata
85"#;
86
87pub const UPSERT_CHECKPOINT_WRITES_SQL: &str = r#"
89INSERT INTO checkpoint_writes (thread_id, checkpoint_ns, checkpoint_id, task_id, task_path, idx, channel, type, blob)
90VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
91ON CONFLICT (thread_id, checkpoint_ns, checkpoint_id, task_id, idx) DO UPDATE SET
92 channel = EXCLUDED.channel,
93 type = EXCLUDED.type,
94 blob = EXCLUDED.blob
95"#;
96
97pub const INSERT_CHECKPOINT_WRITES_SQL: &str = r#"
99INSERT INTO checkpoint_writes (thread_id, checkpoint_ns, checkpoint_id, task_id, task_path, idx, channel, type, blob)
100VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
101ON CONFLICT (thread_id, checkpoint_ns, checkpoint_id, task_id, idx) DO NOTHING
102"#;
103
104pub const SELECT_PENDING_SENDS_SQL: &str = r#"
106SELECT
107 checkpoint_id,
108 array_agg(array[type::bytea, blob] ORDER BY task_path, task_id, idx) AS sends
109FROM checkpoint_writes
110WHERE thread_id = $1
111 AND checkpoint_id = ANY($2)
112 AND channel = '__tasks__'
113GROUP BY checkpoint_id
114"#;