1use anyhow::{Context, Result};
2use duckdb::Connection;
3
4pub fn run(conn: &Connection) -> Result<()> {
5 conn.execute_batch(
7 r#"
8 CREATE TABLE IF NOT EXISTS schema_migrations (
9 version INTEGER PRIMARY KEY,
10 applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
11 );
12 "#,
13 )
14 .context("creating schema_migrations table")?;
15
16 let current = current_version(conn)?;
17 let mut migrations_applied = false;
18
19 if current < 1 {
20 apply_v1(conn)?;
21 set_version(conn, 1)?;
22 migrations_applied = true;
23 }
24
25 if current < 2 {
26 apply_v2(conn)?;
27 set_version(conn, 2)?;
28 migrations_applied = true;
29 }
30
31 if current < 3 {
32 apply_v3(conn)?;
33 set_version(conn, 3)?;
34 migrations_applied = true;
35 }
36
37 if current < 4 {
38 apply_v4(conn)?;
39 set_version(conn, 4)?;
40 migrations_applied = true;
41 }
42
43 if current < 5 {
44 apply_v5(conn)?;
45 set_version(conn, 5)?;
46 migrations_applied = true;
47 }
48
49 if current < 6 {
50 apply_v6(conn)?;
51 set_version(conn, 6)?;
52 migrations_applied = true;
53 }
54
55 if current < 7 {
56 apply_v7(conn)?;
57 set_version(conn, 7)?;
58 migrations_applied = true;
59 }
60
61 if current < 8 {
62 apply_v8(conn)?;
63 set_version(conn, 8)?;
64 migrations_applied = true;
65 }
66
67 if migrations_applied {
72 conn.execute_batch("FORCE CHECKPOINT;")
73 .context("forcing checkpoint after migrations")?;
74 }
75
76 Ok(())
77}
78
79fn current_version(conn: &Connection) -> Result<i64> {
80 let mut stmt = conn.prepare("SELECT COALESCE(MAX(version), 0) FROM schema_migrations")?;
81 let v: i64 = stmt.query_row([], |row| row.get(0))?;
82 Ok(v)
83}
84
85fn set_version(conn: &Connection, v: i64) -> Result<()> {
86 conn.execute("INSERT INTO schema_migrations (version) VALUES (?)", [v])?;
87 Ok(())
88}
89
90fn apply_v1(conn: &Connection) -> Result<()> {
91 conn.execute_batch(
93 r#"
94 -- Sequences for surrogate keys
95 CREATE SEQUENCE IF NOT EXISTS messages_id_seq START 1;
96 CREATE SEQUENCE IF NOT EXISTS memory_vectors_id_seq START 1;
97 CREATE SEQUENCE IF NOT EXISTS tool_log_id_seq START 1;
98
99 CREATE TABLE IF NOT EXISTS messages (
100 id BIGINT PRIMARY KEY DEFAULT nextval('messages_id_seq'),
101 session_id TEXT NOT NULL,
102 role TEXT NOT NULL,
103 content TEXT NOT NULL,
104 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
105 );
106
107 CREATE TABLE IF NOT EXISTS memory_vectors (
108 id BIGINT PRIMARY KEY DEFAULT nextval('memory_vectors_id_seq'),
109 session_id TEXT NOT NULL,
110 message_id BIGINT,
111 embedding TEXT NOT NULL,
112 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
113 FOREIGN KEY (message_id) REFERENCES messages(id)
114 );
115
116 CREATE TABLE IF NOT EXISTS tool_log (
117 id BIGINT PRIMARY KEY DEFAULT nextval('tool_log_id_seq'),
118 tool_name TEXT NOT NULL,
119 arguments TEXT NOT NULL,
120 result TEXT NOT NULL,
121 success BOOLEAN NOT NULL,
122 error TEXT,
123 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
124 );
125
126 CREATE TABLE IF NOT EXISTS policy_cache (
127 key TEXT PRIMARY KEY,
128 value TEXT NOT NULL,
129 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
130 );
131
132 CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id);
133 CREATE INDEX IF NOT EXISTS idx_memory_vectors_session ON memory_vectors(session_id);
134 "#,
135 )
136 .context("applying v1 schema")?;
137
138 Ok(())
139}
140
141fn apply_v2(conn: &Connection) -> Result<()> {
142 conn.execute_batch(
143 r#"
144 ALTER TABLE tool_log ADD COLUMN session_id TEXT;
145 ALTER TABLE tool_log ADD COLUMN agent TEXT;
146 ALTER TABLE tool_log ADD COLUMN run_id TEXT;
147 UPDATE tool_log SET session_id = COALESCE(session_id, ''), agent = COALESCE(agent, ''), run_id = COALESCE(run_id, '');
148 "#,
149 )
150 .context("applying v2 schema (tool telemetry columns)")
151}
152
153fn apply_v3(conn: &Connection) -> Result<()> {
154 conn.execute_batch(
156 r#"
157 -- Install DuckPGQ extension for graph capabilities
158 -- Note: DuckPGQ requires DuckDB v1.1.3+
159 -- For now, we'll create the tables without the extension
160 -- Users can manually install DuckPGQ when available
161
162 -- Sequences for graph tables
163 CREATE SEQUENCE IF NOT EXISTS graph_nodes_id_seq START 1;
164 CREATE SEQUENCE IF NOT EXISTS graph_edges_id_seq START 1;
165 CREATE SEQUENCE IF NOT EXISTS graph_metadata_id_seq START 1;
166
167 -- Graph nodes table
168 CREATE TABLE IF NOT EXISTS graph_nodes (
169 id BIGINT PRIMARY KEY DEFAULT nextval('graph_nodes_id_seq'),
170 session_id TEXT NOT NULL,
171 node_type TEXT NOT NULL, -- 'entity', 'concept', 'fact', 'message', 'tool_result'
172 label TEXT NOT NULL, -- semantic label (Person, Location, Action, etc.)
173 properties TEXT NOT NULL, -- JSON properties specific to node type
174 embedding_id BIGINT, -- FK to memory_vectors for semantic search
175 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
176 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
177 FOREIGN KEY (embedding_id) REFERENCES memory_vectors(id)
178 );
179
180 -- Graph edges table
181 CREATE TABLE IF NOT EXISTS graph_edges (
182 id BIGINT PRIMARY KEY DEFAULT nextval('graph_edges_id_seq'),
183 session_id TEXT NOT NULL,
184 source_id BIGINT NOT NULL,
185 target_id BIGINT NOT NULL,
186 edge_type TEXT NOT NULL, -- 'RELATES_TO', 'CAUSED_BY', 'PART_OF', 'MENTIONS', etc.
187 predicate TEXT, -- RDF-style predicate for triple store
188 properties TEXT, -- JSON for edge metadata
189 weight REAL DEFAULT 1.0, -- for weighted graphs
190 temporal_start TIMESTAMP, -- for temporal graphs
191 temporal_end TIMESTAMP,
192 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
193 FOREIGN KEY (source_id) REFERENCES graph_nodes(id),
194 FOREIGN KEY (target_id) REFERENCES graph_nodes(id)
195 );
196
197 -- Graph metadata table
198 CREATE TABLE IF NOT EXISTS graph_metadata (
199 id BIGINT PRIMARY KEY DEFAULT nextval('graph_metadata_id_seq'),
200 session_id TEXT NOT NULL,
201 graph_name TEXT NOT NULL,
202 is_created BOOLEAN DEFAULT FALSE, -- Track if DuckPGQ graph object exists
203 schema_version INTEGER DEFAULT 1,
204 config TEXT, -- JSON config for graph-specific settings
205 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
206 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
207 UNIQUE(session_id, graph_name)
208 );
209
210 -- Create indexes for performance
211 CREATE INDEX IF NOT EXISTS idx_graph_nodes_session ON graph_nodes(session_id);
212 CREATE INDEX IF NOT EXISTS idx_graph_nodes_type ON graph_nodes(node_type);
213 CREATE INDEX IF NOT EXISTS idx_graph_nodes_label ON graph_nodes(label);
214 CREATE INDEX IF NOT EXISTS idx_graph_nodes_embedding ON graph_nodes(embedding_id);
215
216 CREATE INDEX IF NOT EXISTS idx_graph_edges_session ON graph_edges(session_id);
217 CREATE INDEX IF NOT EXISTS idx_graph_edges_source ON graph_edges(source_id);
218 CREATE INDEX IF NOT EXISTS idx_graph_edges_target ON graph_edges(target_id);
219 CREATE INDEX IF NOT EXISTS idx_graph_edges_type ON graph_edges(edge_type);
220 CREATE INDEX IF NOT EXISTS idx_graph_edges_temporal ON graph_edges(temporal_start, temporal_end);
221 "#,
222 )
223 .context("applying v3 schema (knowledge graph tables)")
224}
225
226fn apply_v4(conn: &Connection) -> Result<()> {
227 conn.execute_batch(
229 r#"
230 -- Sequence for transcriptions table
231 CREATE SEQUENCE IF NOT EXISTS transcriptions_id_seq START 1;
232
233 -- Transcriptions table
234 CREATE TABLE IF NOT EXISTS transcriptions (
235 id BIGINT PRIMARY KEY DEFAULT nextval('transcriptions_id_seq'),
236 session_id TEXT NOT NULL,
237 chunk_id INTEGER NOT NULL,
238 text TEXT NOT NULL,
239 timestamp TIMESTAMP NOT NULL,
240 embedding_id BIGINT,
241 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
242 FOREIGN KEY (embedding_id) REFERENCES memory_vectors(id)
243 );
244
245 -- Create indexes for performance
246 CREATE INDEX IF NOT EXISTS idx_transcriptions_session ON transcriptions(session_id);
247 CREATE INDEX IF NOT EXISTS idx_transcriptions_session_chunk ON transcriptions(session_id, chunk_id);
248 CREATE INDEX IF NOT EXISTS idx_transcriptions_embedding ON transcriptions(embedding_id);
249 "#,
250 )
251 .context("applying v4 schema (transcriptions table)")
252}
253
254fn apply_v5(conn: &Connection) -> Result<()> {
255 conn.execute_batch(
256 r#"
257 CREATE SEQUENCE IF NOT EXISTS tokenized_files_id_seq START 1;
258
259 CREATE TABLE IF NOT EXISTS tokenized_files (
260 id BIGINT PRIMARY KEY DEFAULT nextval('tokenized_files_id_seq'),
261 session_id TEXT NOT NULL,
262 path TEXT NOT NULL,
263 file_hash TEXT NOT NULL,
264 raw_tokens INTEGER NOT NULL,
265 cleaned_tokens INTEGER NOT NULL,
266 bytes_captured INTEGER NOT NULL,
267 truncated BOOLEAN DEFAULT FALSE,
268 embedding_id BIGINT,
269 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
270 UNIQUE(session_id, path),
271 FOREIGN KEY (embedding_id) REFERENCES memory_vectors(id)
272 );
273
274 CREATE INDEX IF NOT EXISTS idx_tokenized_files_session ON tokenized_files(session_id);
275 CREATE INDEX IF NOT EXISTS idx_tokenized_files_hash ON tokenized_files(file_hash);
276 "#,
277 )
278 .context("applying v5 schema (tokenized file cache)")
279}
280
281fn apply_v6(conn: &Connection) -> Result<()> {
282 conn.execute_batch(
283 r#"
284 CREATE SEQUENCE IF NOT EXISTS mesh_messages_id_seq START 1;
285
286 -- Service registry for mesh instances
287 CREATE TABLE IF NOT EXISTS mesh_registry (
288 instance_id TEXT PRIMARY KEY,
289 hostname TEXT NOT NULL,
290 port INTEGER NOT NULL,
291 capabilities TEXT, -- JSON array of capabilities
292 is_leader BOOLEAN DEFAULT FALSE,
293 last_heartbeat TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
294 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
295 );
296
297 -- Inter-agent messaging
298 CREATE TABLE IF NOT EXISTS mesh_messages (
299 id BIGINT PRIMARY KEY DEFAULT nextval('mesh_messages_id_seq'),
300 source_instance TEXT NOT NULL,
301 target_instance TEXT,
302 message_type TEXT NOT NULL,
303 payload TEXT, -- JSON payload
304 status TEXT DEFAULT 'pending', -- pending, delivered, failed
305 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
306 delivered_at TIMESTAMP,
307 FOREIGN KEY (source_instance) REFERENCES mesh_registry(instance_id),
308 FOREIGN KEY (target_instance) REFERENCES mesh_registry(instance_id)
309 );
310
311 -- Distributed consensus/locking
312 CREATE TABLE IF NOT EXISTS mesh_consensus (
313 resource TEXT PRIMARY KEY,
314 owner_instance TEXT NOT NULL,
315 acquired_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
316 expires_at TIMESTAMP NOT NULL,
317 version INTEGER DEFAULT 1,
318 FOREIGN KEY (owner_instance) REFERENCES mesh_registry(instance_id)
319 );
320
321 -- Indexes for efficient queries
322 CREATE INDEX IF NOT EXISTS idx_mesh_registry_leader ON mesh_registry(is_leader);
323 CREATE INDEX IF NOT EXISTS idx_mesh_messages_target ON mesh_messages(target_instance, status);
324 CREATE INDEX IF NOT EXISTS idx_mesh_messages_created ON mesh_messages(created_at);
325 CREATE INDEX IF NOT EXISTS idx_mesh_consensus_expires ON mesh_consensus(expires_at);
326 "#,
327 )
328 .context("applying v6 schema (mesh networking)")
329}
330
331fn apply_v7(conn: &Connection) -> Result<()> {
332 conn.execute_batch(
334 r#"
335 -- Add sync metadata columns to graph_nodes
336 ALTER TABLE graph_nodes ADD COLUMN vector_clock TEXT DEFAULT '{}';
337 ALTER TABLE graph_nodes ADD COLUMN last_modified_by TEXT;
338 ALTER TABLE graph_nodes ADD COLUMN is_deleted BOOLEAN DEFAULT FALSE;
339 ALTER TABLE graph_nodes ADD COLUMN sync_enabled BOOLEAN DEFAULT FALSE;
340
341 -- Add sync metadata columns to graph_edges
342 ALTER TABLE graph_edges ADD COLUMN vector_clock TEXT DEFAULT '{}';
343 ALTER TABLE graph_edges ADD COLUMN last_modified_by TEXT;
344 ALTER TABLE graph_edges ADD COLUMN is_deleted BOOLEAN DEFAULT FALSE;
345 ALTER TABLE graph_edges ADD COLUMN sync_enabled BOOLEAN DEFAULT FALSE;
346
347 -- Add sync toggle to graph_metadata for graph-level opt-in
348 ALTER TABLE graph_metadata ADD COLUMN sync_enabled BOOLEAN DEFAULT FALSE;
349
350 -- Create sequence for changelog
351 CREATE SEQUENCE IF NOT EXISTS graph_changelog_id_seq START 1;
352
353 -- Change log for incremental sync
354 CREATE TABLE IF NOT EXISTS graph_changelog (
355 id BIGINT PRIMARY KEY DEFAULT nextval('graph_changelog_id_seq'),
356 session_id TEXT NOT NULL,
357 instance_id TEXT NOT NULL,
358 entity_type TEXT NOT NULL, -- 'node' or 'edge'
359 entity_id BIGINT NOT NULL,
360 operation TEXT NOT NULL, -- 'create', 'update', 'delete'
361 vector_clock TEXT NOT NULL, -- JSON map of instance_id -> version
362 data TEXT, -- Full entity JSON snapshot
363 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
364 FOREIGN KEY (instance_id) REFERENCES mesh_registry(instance_id)
365 );
366
367 -- Sync state tracking: per-instance vector clocks for each session/graph
368 CREATE TABLE IF NOT EXISTS graph_sync_state (
369 instance_id TEXT NOT NULL,
370 session_id TEXT NOT NULL,
371 graph_name TEXT NOT NULL,
372 vector_clock TEXT NOT NULL, -- JSON map of instance_id -> version
373 last_sync_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
374 PRIMARY KEY (instance_id, session_id, graph_name),
375 FOREIGN KEY (instance_id) REFERENCES mesh_registry(instance_id)
376 );
377
378 -- Indexes for sync operations
379 CREATE INDEX IF NOT EXISTS idx_graph_nodes_sync ON graph_nodes(sync_enabled, session_id);
380 CREATE INDEX IF NOT EXISTS idx_graph_nodes_deleted ON graph_nodes(is_deleted);
381 CREATE INDEX IF NOT EXISTS idx_graph_nodes_modified ON graph_nodes(last_modified_by);
382
383 CREATE INDEX IF NOT EXISTS idx_graph_edges_sync ON graph_edges(sync_enabled, session_id);
384 CREATE INDEX IF NOT EXISTS idx_graph_edges_deleted ON graph_edges(is_deleted);
385 CREATE INDEX IF NOT EXISTS idx_graph_edges_modified ON graph_edges(last_modified_by);
386
387 CREATE INDEX IF NOT EXISTS idx_graph_changelog_session ON graph_changelog(session_id, created_at);
388 CREATE INDEX IF NOT EXISTS idx_graph_changelog_instance ON graph_changelog(instance_id, created_at);
389 CREATE INDEX IF NOT EXISTS idx_graph_changelog_entity ON graph_changelog(entity_type, entity_id);
390
391 CREATE INDEX IF NOT EXISTS idx_graph_sync_state_session ON graph_sync_state(session_id, graph_name);
392 "#,
393 )
394 .context("applying v7 schema (graph synchronization)")
395}
396
397fn apply_v8(conn: &Connection) -> Result<()> {
398 let column_exists: bool = conn
403 .query_row(
404 "SELECT COUNT(*) > 0 FROM information_schema.columns
405 WHERE table_name = 'mesh_messages' AND column_name = 'message_id'",
406 [],
407 |row| row.get(0),
408 )
409 .unwrap_or(false);
410
411 if !column_exists {
412 conn.execute_batch(
415 r#"
416 -- Drop the old table (messages are transient, this is safe)
417 DROP TABLE IF EXISTS mesh_messages;
418
419 -- Recreate with message_id column
420 CREATE TABLE mesh_messages (
421 id BIGINT PRIMARY KEY DEFAULT nextval('mesh_messages_id_seq'),
422 message_id TEXT UNIQUE NOT NULL,
423 source_instance TEXT NOT NULL,
424 target_instance TEXT,
425 message_type TEXT NOT NULL,
426 payload TEXT,
427 status TEXT DEFAULT 'pending',
428 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
429 delivered_at TIMESTAMP,
430 FOREIGN KEY (source_instance) REFERENCES mesh_registry(instance_id),
431 FOREIGN KEY (target_instance) REFERENCES mesh_registry(instance_id)
432 );
433 "#,
434 )
435 .context("recreating mesh_messages with message_id column")?;
436 }
437
438 conn.execute_batch(
440 r#"
441 CREATE UNIQUE INDEX IF NOT EXISTS idx_mesh_messages_message_id_unique ON mesh_messages(message_id);
442 CREATE INDEX IF NOT EXISTS idx_mesh_messages_status ON mesh_messages(status);
443 "#,
444 )
445 .context("adding indexes to mesh_messages")?;
446
447 Ok(())
448}