// spec_ai_config/persistence/migrations.rs

1use anyhow::{Context, Result};
2use duckdb::Connection;
3
4pub fn run(conn: &Connection) -> Result<()> {
5    // Simple migration system: ensure a schema version table and apply migrations sequentially.
6    conn.execute_batch(
7        r#"
8        CREATE TABLE IF NOT EXISTS schema_migrations (
9            version INTEGER PRIMARY KEY,
10            applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
11        );
12        "#,
13    )
14    .context("creating schema_migrations table")?;
15
16    let current = current_version(conn)?;
17    let mut migrations_applied = false;
18
19    if current < 1 {
20        apply_v1(conn)?;
21        set_version(conn, 1)?;
22        migrations_applied = true;
23    }
24
25    if current < 2 {
26        apply_v2(conn)?;
27        set_version(conn, 2)?;
28        migrations_applied = true;
29    }
30
31    if current < 3 {
32        apply_v3(conn)?;
33        set_version(conn, 3)?;
34        migrations_applied = true;
35    }
36
37    if current < 4 {
38        apply_v4(conn)?;
39        set_version(conn, 4)?;
40        migrations_applied = true;
41    }
42
43    if current < 5 {
44        apply_v5(conn)?;
45        set_version(conn, 5)?;
46        migrations_applied = true;
47    }
48
49    if current < 6 {
50        apply_v6(conn)?;
51        set_version(conn, 6)?;
52        migrations_applied = true;
53    }
54
55    if current < 7 {
56        apply_v7(conn)?;
57        set_version(conn, 7)?;
58        migrations_applied = true;
59    }
60
61    if current < 8 {
62        apply_v8(conn)?;
63        set_version(conn, 8)?;
64        migrations_applied = true;
65    }
66
67    // Force checkpoint after migrations to ensure WAL is merged into the database file.
68    // This prevents ALTER TABLE operations from being stuck in the WAL, which can cause
69    // "no default database set" errors during WAL replay on subsequent startups.
70    // See: https://github.com/duckdb/duckdb/discussions/10200
71    if migrations_applied {
72        conn.execute_batch("FORCE CHECKPOINT;")
73            .context("forcing checkpoint after migrations")?;
74    }
75
76    Ok(())
77}
78
79fn current_version(conn: &Connection) -> Result<i64> {
80    let mut stmt = conn.prepare("SELECT COALESCE(MAX(version), 0) FROM schema_migrations")?;
81    let v: i64 = stmt.query_row([], |row| row.get(0))?;
82    Ok(v)
83}
84
85fn set_version(conn: &Connection, v: i64) -> Result<()> {
86    conn.execute("INSERT INTO schema_migrations (version) VALUES (?)", [v])?;
87    Ok(())
88}
89
90fn apply_v1(conn: &Connection) -> Result<()> {
91    // Core tables per spec: messages, memory_vectors, tool_log, policy_cache
92    conn.execute_batch(
93        r#"
94        -- Sequences for surrogate keys
95        CREATE SEQUENCE IF NOT EXISTS messages_id_seq START 1;
96        CREATE SEQUENCE IF NOT EXISTS memory_vectors_id_seq START 1;
97        CREATE SEQUENCE IF NOT EXISTS tool_log_id_seq START 1;
98
99        CREATE TABLE IF NOT EXISTS messages (
100            id BIGINT PRIMARY KEY DEFAULT nextval('messages_id_seq'),
101            session_id TEXT NOT NULL,
102            role TEXT NOT NULL,
103            content TEXT NOT NULL,
104            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
105        );
106
107        CREATE TABLE IF NOT EXISTS memory_vectors (
108            id BIGINT PRIMARY KEY DEFAULT nextval('memory_vectors_id_seq'),
109            session_id TEXT NOT NULL,
110            message_id BIGINT,
111            embedding TEXT NOT NULL,
112            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
113            FOREIGN KEY (message_id) REFERENCES messages(id)
114        );
115
116        CREATE TABLE IF NOT EXISTS tool_log (
117            id BIGINT PRIMARY KEY DEFAULT nextval('tool_log_id_seq'),
118            tool_name TEXT NOT NULL,
119            arguments TEXT NOT NULL,
120            result TEXT NOT NULL,
121            success BOOLEAN NOT NULL,
122            error TEXT,
123            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
124        );
125
126        CREATE TABLE IF NOT EXISTS policy_cache (
127            key TEXT PRIMARY KEY,
128            value TEXT NOT NULL,
129            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
130        );
131
132        CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id);
133        CREATE INDEX IF NOT EXISTS idx_memory_vectors_session ON memory_vectors(session_id);
134        "#,
135    )
136    .context("applying v1 schema")?;
137
138    Ok(())
139}
140
141fn apply_v2(conn: &Connection) -> Result<()> {
142    conn.execute_batch(
143        r#"
144        ALTER TABLE tool_log ADD COLUMN session_id TEXT;
145        ALTER TABLE tool_log ADD COLUMN agent TEXT;
146        ALTER TABLE tool_log ADD COLUMN run_id TEXT;
147        UPDATE tool_log SET session_id = COALESCE(session_id, ''), agent = COALESCE(agent, ''), run_id = COALESCE(run_id, '');
148        "#,
149    )
150    .context("applying v2 schema (tool telemetry columns)")
151}
152
/// v3: knowledge-graph storage — `graph_nodes`, `graph_edges`, and
/// per-session `graph_metadata`, plus indexes for session, type/label,
/// endpoint, and temporal-range lookups. Properties/config are stored as
/// JSON in TEXT columns; DuckPGQ itself is not installed here.
fn apply_v3(conn: &Connection) -> Result<()> {
    // Knowledge graph tables with DuckPGQ support
    conn.execute_batch(
        r#"
        -- Install DuckPGQ extension for graph capabilities
        -- Note: DuckPGQ requires DuckDB v1.1.3+
        -- For now, we'll create the tables without the extension
        -- Users can manually install DuckPGQ when available

        -- Sequences for graph tables
        CREATE SEQUENCE IF NOT EXISTS graph_nodes_id_seq START 1;
        CREATE SEQUENCE IF NOT EXISTS graph_edges_id_seq START 1;
        CREATE SEQUENCE IF NOT EXISTS graph_metadata_id_seq START 1;

        -- Graph nodes table
        CREATE TABLE IF NOT EXISTS graph_nodes (
            id BIGINT PRIMARY KEY DEFAULT nextval('graph_nodes_id_seq'),
            session_id TEXT NOT NULL,
            node_type TEXT NOT NULL,  -- 'entity', 'concept', 'fact', 'message', 'tool_result'
            label TEXT NOT NULL,       -- semantic label (Person, Location, Action, etc.)
            properties TEXT NOT NULL,  -- JSON properties specific to node type
            embedding_id BIGINT,       -- FK to memory_vectors for semantic search
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (embedding_id) REFERENCES memory_vectors(id)
        );

        -- Graph edges table
        CREATE TABLE IF NOT EXISTS graph_edges (
            id BIGINT PRIMARY KEY DEFAULT nextval('graph_edges_id_seq'),
            session_id TEXT NOT NULL,
            source_id BIGINT NOT NULL,
            target_id BIGINT NOT NULL,
            edge_type TEXT NOT NULL,   -- 'RELATES_TO', 'CAUSED_BY', 'PART_OF', 'MENTIONS', etc.
            predicate TEXT,            -- RDF-style predicate for triple store
            properties TEXT,           -- JSON for edge metadata
            weight REAL DEFAULT 1.0,   -- for weighted graphs
            temporal_start TIMESTAMP,  -- for temporal graphs
            temporal_end TIMESTAMP,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (source_id) REFERENCES graph_nodes(id),
            FOREIGN KEY (target_id) REFERENCES graph_nodes(id)
        );

        -- Graph metadata table
        CREATE TABLE IF NOT EXISTS graph_metadata (
            id BIGINT PRIMARY KEY DEFAULT nextval('graph_metadata_id_seq'),
            session_id TEXT NOT NULL,
            graph_name TEXT NOT NULL,
            is_created BOOLEAN DEFAULT FALSE,  -- Track if DuckPGQ graph object exists
            schema_version INTEGER DEFAULT 1,
            config TEXT,  -- JSON config for graph-specific settings
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(session_id, graph_name)
        );

        -- Create indexes for performance
        CREATE INDEX IF NOT EXISTS idx_graph_nodes_session ON graph_nodes(session_id);
        CREATE INDEX IF NOT EXISTS idx_graph_nodes_type ON graph_nodes(node_type);
        CREATE INDEX IF NOT EXISTS idx_graph_nodes_label ON graph_nodes(label);
        CREATE INDEX IF NOT EXISTS idx_graph_nodes_embedding ON graph_nodes(embedding_id);

        CREATE INDEX IF NOT EXISTS idx_graph_edges_session ON graph_edges(session_id);
        CREATE INDEX IF NOT EXISTS idx_graph_edges_source ON graph_edges(source_id);
        CREATE INDEX IF NOT EXISTS idx_graph_edges_target ON graph_edges(target_id);
        CREATE INDEX IF NOT EXISTS idx_graph_edges_type ON graph_edges(edge_type);
        CREATE INDEX IF NOT EXISTS idx_graph_edges_temporal ON graph_edges(temporal_start, temporal_end);
        "#,
    )
    .context("applying v3 schema (knowledge graph tables)")
}
225
/// v4: `transcriptions` table for per-chunk audio transcription text,
/// keyed by session and chunk, with an optional link into `memory_vectors`
/// and indexes on (session), (session, chunk), and the embedding FK.
fn apply_v4(conn: &Connection) -> Result<()> {
    // Transcriptions table for audio transcription storage
    conn.execute_batch(
        r#"
        -- Sequence for transcriptions table
        CREATE SEQUENCE IF NOT EXISTS transcriptions_id_seq START 1;

        -- Transcriptions table
        CREATE TABLE IF NOT EXISTS transcriptions (
            id BIGINT PRIMARY KEY DEFAULT nextval('transcriptions_id_seq'),
            session_id TEXT NOT NULL,
            chunk_id INTEGER NOT NULL,
            text TEXT NOT NULL,
            timestamp TIMESTAMP NOT NULL,
            embedding_id BIGINT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (embedding_id) REFERENCES memory_vectors(id)
        );

        -- Create indexes for performance
        CREATE INDEX IF NOT EXISTS idx_transcriptions_session ON transcriptions(session_id);
        CREATE INDEX IF NOT EXISTS idx_transcriptions_session_chunk ON transcriptions(session_id, chunk_id);
        CREATE INDEX IF NOT EXISTS idx_transcriptions_embedding ON transcriptions(embedding_id);
        "#,
    )
    .context("applying v4 schema (transcriptions table)")
}
253
/// v5: `tokenized_files` cache — per-(session, path) token statistics with
/// a content hash for change detection and an optional embedding link.
fn apply_v5(conn: &Connection) -> Result<()> {
    conn.execute_batch(
        r#"
        CREATE SEQUENCE IF NOT EXISTS tokenized_files_id_seq START 1;

        CREATE TABLE IF NOT EXISTS tokenized_files (
            id BIGINT PRIMARY KEY DEFAULT nextval('tokenized_files_id_seq'),
            session_id TEXT NOT NULL,
            path TEXT NOT NULL,
            file_hash TEXT NOT NULL,
            raw_tokens INTEGER NOT NULL,
            cleaned_tokens INTEGER NOT NULL,
            bytes_captured INTEGER NOT NULL,
            truncated BOOLEAN DEFAULT FALSE,
            embedding_id BIGINT,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(session_id, path),
            FOREIGN KEY (embedding_id) REFERENCES memory_vectors(id)
        );

        CREATE INDEX IF NOT EXISTS idx_tokenized_files_session ON tokenized_files(session_id);
        CREATE INDEX IF NOT EXISTS idx_tokenized_files_hash ON tokenized_files(file_hash);
        "#,
    )
    .context("applying v5 schema (tokenized file cache)")
}
280
/// v6: mesh networking — instance registry (`mesh_registry`), inter-agent
/// message queue (`mesh_messages`), and a lease-style resource lock table
/// (`mesh_consensus`), plus indexes for leader lookup, per-target delivery,
/// and lease expiry scans.
fn apply_v6(conn: &Connection) -> Result<()> {
    conn.execute_batch(
        r#"
        CREATE SEQUENCE IF NOT EXISTS mesh_messages_id_seq START 1;

        -- Service registry for mesh instances
        CREATE TABLE IF NOT EXISTS mesh_registry (
            instance_id TEXT PRIMARY KEY,
            hostname TEXT NOT NULL,
            port INTEGER NOT NULL,
            capabilities TEXT, -- JSON array of capabilities
            is_leader BOOLEAN DEFAULT FALSE,
            last_heartbeat TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

        -- Inter-agent messaging
        CREATE TABLE IF NOT EXISTS mesh_messages (
            id BIGINT PRIMARY KEY DEFAULT nextval('mesh_messages_id_seq'),
            source_instance TEXT NOT NULL,
            target_instance TEXT,
            message_type TEXT NOT NULL,
            payload TEXT, -- JSON payload
            status TEXT DEFAULT 'pending', -- pending, delivered, failed
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            delivered_at TIMESTAMP,
            FOREIGN KEY (source_instance) REFERENCES mesh_registry(instance_id),
            FOREIGN KEY (target_instance) REFERENCES mesh_registry(instance_id)
        );

        -- Distributed consensus/locking
        CREATE TABLE IF NOT EXISTS mesh_consensus (
            resource TEXT PRIMARY KEY,
            owner_instance TEXT NOT NULL,
            acquired_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            expires_at TIMESTAMP NOT NULL,
            version INTEGER DEFAULT 1,
            FOREIGN KEY (owner_instance) REFERENCES mesh_registry(instance_id)
        );

        -- Indexes for efficient queries
        CREATE INDEX IF NOT EXISTS idx_mesh_registry_leader ON mesh_registry(is_leader);
        CREATE INDEX IF NOT EXISTS idx_mesh_messages_target ON mesh_messages(target_instance, status);
        CREATE INDEX IF NOT EXISTS idx_mesh_messages_created ON mesh_messages(created_at);
        CREATE INDEX IF NOT EXISTS idx_mesh_consensus_expires ON mesh_consensus(expires_at);
        "#,
    )
    .context("applying v6 schema (mesh networking)")
}
330
331fn apply_v7(conn: &Connection) -> Result<()> {
332    // Graph synchronization: Add vector clocks, change tracking, and sync state
333    conn.execute_batch(
334        r#"
335        -- Add sync metadata columns to graph_nodes
336        ALTER TABLE graph_nodes ADD COLUMN vector_clock TEXT DEFAULT '{}';
337        ALTER TABLE graph_nodes ADD COLUMN last_modified_by TEXT;
338        ALTER TABLE graph_nodes ADD COLUMN is_deleted BOOLEAN DEFAULT FALSE;
339        ALTER TABLE graph_nodes ADD COLUMN sync_enabled BOOLEAN DEFAULT FALSE;
340
341        -- Add sync metadata columns to graph_edges
342        ALTER TABLE graph_edges ADD COLUMN vector_clock TEXT DEFAULT '{}';
343        ALTER TABLE graph_edges ADD COLUMN last_modified_by TEXT;
344        ALTER TABLE graph_edges ADD COLUMN is_deleted BOOLEAN DEFAULT FALSE;
345        ALTER TABLE graph_edges ADD COLUMN sync_enabled BOOLEAN DEFAULT FALSE;
346
347        -- Add sync toggle to graph_metadata for graph-level opt-in
348        ALTER TABLE graph_metadata ADD COLUMN sync_enabled BOOLEAN DEFAULT FALSE;
349
350        -- Create sequence for changelog
351        CREATE SEQUENCE IF NOT EXISTS graph_changelog_id_seq START 1;
352
353        -- Change log for incremental sync
354        CREATE TABLE IF NOT EXISTS graph_changelog (
355            id BIGINT PRIMARY KEY DEFAULT nextval('graph_changelog_id_seq'),
356            session_id TEXT NOT NULL,
357            instance_id TEXT NOT NULL,
358            entity_type TEXT NOT NULL,  -- 'node' or 'edge'
359            entity_id BIGINT NOT NULL,
360            operation TEXT NOT NULL,  -- 'create', 'update', 'delete'
361            vector_clock TEXT NOT NULL,  -- JSON map of instance_id -> version
362            data TEXT,  -- Full entity JSON snapshot
363            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
364            FOREIGN KEY (instance_id) REFERENCES mesh_registry(instance_id)
365        );
366
367        -- Sync state tracking: per-instance vector clocks for each session/graph
368        CREATE TABLE IF NOT EXISTS graph_sync_state (
369            instance_id TEXT NOT NULL,
370            session_id TEXT NOT NULL,
371            graph_name TEXT NOT NULL,
372            vector_clock TEXT NOT NULL,  -- JSON map of instance_id -> version
373            last_sync_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
374            PRIMARY KEY (instance_id, session_id, graph_name),
375            FOREIGN KEY (instance_id) REFERENCES mesh_registry(instance_id)
376        );
377
378        -- Indexes for sync operations
379        CREATE INDEX IF NOT EXISTS idx_graph_nodes_sync ON graph_nodes(sync_enabled, session_id);
380        CREATE INDEX IF NOT EXISTS idx_graph_nodes_deleted ON graph_nodes(is_deleted);
381        CREATE INDEX IF NOT EXISTS idx_graph_nodes_modified ON graph_nodes(last_modified_by);
382
383        CREATE INDEX IF NOT EXISTS idx_graph_edges_sync ON graph_edges(sync_enabled, session_id);
384        CREATE INDEX IF NOT EXISTS idx_graph_edges_deleted ON graph_edges(is_deleted);
385        CREATE INDEX IF NOT EXISTS idx_graph_edges_modified ON graph_edges(last_modified_by);
386
387        CREATE INDEX IF NOT EXISTS idx_graph_changelog_session ON graph_changelog(session_id, created_at);
388        CREATE INDEX IF NOT EXISTS idx_graph_changelog_instance ON graph_changelog(instance_id, created_at);
389        CREATE INDEX IF NOT EXISTS idx_graph_changelog_entity ON graph_changelog(entity_type, entity_id);
390
391        CREATE INDEX IF NOT EXISTS idx_graph_sync_state_session ON graph_sync_state(session_id, graph_name);
392        "#,
393    )
394    .context("applying v7 schema (graph synchronization)")
395}
396
/// v8: adds a `message_id` TEXT column (unique) to `mesh_messages` so the
/// same message carries one ID across instances, enabling deduplication
/// and correlation. Because the table has foreign keys, the column is
/// added by dropping and recreating the table rather than via ALTER.
fn apply_v8(conn: &Connection) -> Result<()> {
    // Add message_id column to mesh_messages for UUID v7 tracking
    // This allows consistent message IDs across instances for deduplication and correlation

    // Check if message_id column already exists
    // NOTE(review): unwrap_or(false) treats a failed probe as "column missing",
    // which triggers the drop-and-recreate below; acceptable only because the
    // table's contents are transient (see comment below).
    let column_exists: bool = conn
        .query_row(
            "SELECT COUNT(*) > 0 FROM information_schema.columns
             WHERE table_name = 'mesh_messages' AND column_name = 'message_id'",
            [],
            |row| row.get(0),
        )
        .unwrap_or(false);

    if !column_exists {
        // Since mesh_messages has foreign keys, we need to recreate the table
        // Old messages are transient anyway, so we just delete and recreate
        // (mesh_messages_id_seq already exists from the v6 migration).
        conn.execute_batch(
            r#"
            -- Drop the old table (messages are transient, this is safe)
            DROP TABLE IF EXISTS mesh_messages;

            -- Recreate with message_id column
            CREATE TABLE mesh_messages (
                id BIGINT PRIMARY KEY DEFAULT nextval('mesh_messages_id_seq'),
                message_id TEXT UNIQUE NOT NULL,
                source_instance TEXT NOT NULL,
                target_instance TEXT,
                message_type TEXT NOT NULL,
                payload TEXT,
                status TEXT DEFAULT 'pending',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                delivered_at TIMESTAMP,
                FOREIGN KEY (source_instance) REFERENCES mesh_registry(instance_id),
                FOREIGN KEY (target_instance) REFERENCES mesh_registry(instance_id)
            );
            "#,
        )
        .context("recreating mesh_messages with message_id column")?;
    }

    // Add indexes (IF NOT EXISTS makes this idempotent)
    conn.execute_batch(
        r#"
        CREATE UNIQUE INDEX IF NOT EXISTS idx_mesh_messages_message_id_unique ON mesh_messages(message_id);
        CREATE INDEX IF NOT EXISTS idx_mesh_messages_status ON mesh_messages(status);
        "#,
    )
    .context("adding indexes to mesh_messages")?;

    Ok(())
}