//! mnemo_core/storage/migrations.rs
//!
//! DuckDB schema definitions and migration runner for the mnemo core store.
/// DDL for the core `memories` table and its secondary indexes.
///
/// Timestamps (`created_at`, `updated_at`, …) are stored as VARCHAR rather
/// than a native TIMESTAMP type; embeddings and content hashes are BLOBs.
/// All statements use `IF NOT EXISTS` so the batch is safe to re-run.
pub const CREATE_MEMORIES_TABLE: &str = "
CREATE TABLE IF NOT EXISTS memories (
    id VARCHAR PRIMARY KEY,
    agent_id VARCHAR NOT NULL,
    content TEXT NOT NULL,
    memory_type VARCHAR NOT NULL,
    scope VARCHAR NOT NULL DEFAULT 'private',
    importance FLOAT NOT NULL DEFAULT 0.5,
    tags JSON,
    metadata JSON,
    embedding BLOB,
    content_hash BLOB NOT NULL,
    prev_hash BLOB,
    source_type VARCHAR NOT NULL DEFAULT 'agent',
    source_id VARCHAR,
    consolidation_state VARCHAR NOT NULL DEFAULT 'raw',
    access_count BIGINT NOT NULL DEFAULT 0,
    org_id VARCHAR,
    thread_id VARCHAR,
    created_at VARCHAR NOT NULL,
    updated_at VARCHAR NOT NULL,
    last_accessed_at VARCHAR,
    expires_at VARCHAR,
    deleted_at VARCHAR,
    decay_rate FLOAT,
    created_by VARCHAR,
    version INTEGER NOT NULL DEFAULT 1,
    prev_version_id VARCHAR,
    quarantined BOOLEAN NOT NULL DEFAULT false,
    quarantine_reason VARCHAR,
    decay_function VARCHAR
);
CREATE INDEX IF NOT EXISTS idx_memories_agent_id ON memories(agent_id);
CREATE INDEX IF NOT EXISTS idx_memories_scope ON memories(scope);
CREATE INDEX IF NOT EXISTS idx_memories_memory_type ON memories(memory_type);
CREATE INDEX IF NOT EXISTS idx_memories_org_id ON memories(org_id);
CREATE INDEX IF NOT EXISTS idx_memories_created_at ON memories(created_at);
CREATE INDEX IF NOT EXISTS idx_memories_deleted_at ON memories(deleted_at);
CREATE INDEX IF NOT EXISTS idx_memories_thread_id ON memories(thread_id);
CREATE INDEX IF NOT EXISTS idx_memories_expires_at ON memories(expires_at);
CREATE INDEX IF NOT EXISTS idx_memories_consolidation_state ON memories(consolidation_state);
";
43
/// DDL for per-memory access-control entries, keyed by (principal_type,
/// principal_id). `expires_at` is optional, enabling time-boxed grants.
pub const CREATE_ACLS_TABLE: &str = "
CREATE TABLE IF NOT EXISTS acls (
    id VARCHAR PRIMARY KEY,
    memory_id VARCHAR NOT NULL,
    principal_type VARCHAR NOT NULL,
    principal_id VARCHAR NOT NULL,
    permission VARCHAR NOT NULL,
    granted_by VARCHAR NOT NULL,
    created_at VARCHAR NOT NULL,
    expires_at VARCHAR
);
CREATE INDEX IF NOT EXISTS idx_acls_memory_id ON acls(memory_id);
CREATE INDEX IF NOT EXISTS idx_acls_principal ON acls(principal_type, principal_id);
";
58
/// DDL for weighted, typed edges between memories (`source_id` -> `target_id`).
pub const CREATE_RELATIONS_TABLE: &str = "
CREATE TABLE IF NOT EXISTS relations (
    id VARCHAR PRIMARY KEY,
    source_id VARCHAR NOT NULL,
    target_id VARCHAR NOT NULL,
    relation_type VARCHAR NOT NULL,
    weight FLOAT NOT NULL DEFAULT 1.0,
    metadata JSON,
    created_at VARCHAR NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_relations_source ON relations(source_id);
CREATE INDEX IF NOT EXISTS idx_relations_target ON relations(target_id);
";
72
// NOTE: agent_events is append-only by design. DuckDB lacks trigger support,
// so enforcement is application-level. The PostgreSQL backend uses a
// BEFORE UPDATE OR DELETE trigger (prevent_event_modification) to enforce
// this at the schema level. Application code must never UPDATE or DELETE
// rows from this table.
/// DDL for the append-only agent event log, with hash-chain fields
/// (`content_hash`, `prev_hash`) and optional tracing/cost telemetry.
pub const CREATE_AGENT_EVENTS_TABLE: &str = "
CREATE TABLE IF NOT EXISTS agent_events (
    id VARCHAR PRIMARY KEY,
    agent_id VARCHAR NOT NULL,
    thread_id VARCHAR,
    run_id VARCHAR,
    parent_event_id VARCHAR,
    event_type VARCHAR NOT NULL,
    payload JSON,
    trace_id VARCHAR,
    span_id VARCHAR,
    model VARCHAR,
    tokens_input BIGINT,
    tokens_output BIGINT,
    latency_ms BIGINT,
    cost_usd DOUBLE,
    timestamp VARCHAR NOT NULL,
    logical_clock BIGINT NOT NULL DEFAULT 0,
    content_hash BLOB NOT NULL,
    prev_hash BLOB,
    embedding BLOB
);
CREATE INDEX IF NOT EXISTS idx_events_agent_id ON agent_events(agent_id);
CREATE INDEX IF NOT EXISTS idx_events_thread_id ON agent_events(thread_id);
CREATE INDEX IF NOT EXISTS idx_events_event_type ON agent_events(event_type);
CREATE INDEX IF NOT EXISTS idx_events_timestamp ON agent_events(timestamp);
CREATE INDEX IF NOT EXISTS idx_events_trace_id ON agent_events(trace_id);
CREATE INDEX IF NOT EXISTS idx_events_parent ON agent_events(parent_event_id);
";
107
/// DDL for thread checkpoints. Supports branching via `parent_id` /
/// `branch_name` and stores either a full `state_snapshot` or a `state_diff`.
pub const CREATE_CHECKPOINTS_TABLE: &str = "
CREATE TABLE IF NOT EXISTS checkpoints (
    id VARCHAR PRIMARY KEY,
    thread_id VARCHAR NOT NULL,
    agent_id VARCHAR NOT NULL,
    parent_id VARCHAR,
    branch_name VARCHAR NOT NULL DEFAULT 'main',
    state_snapshot JSON,
    state_diff JSON,
    memory_refs JSON,
    event_cursor VARCHAR,
    label VARCHAR,
    created_at VARCHAR NOT NULL,
    metadata JSON
);
CREATE INDEX IF NOT EXISTS idx_checkpoints_thread_id ON checkpoints(thread_id);
CREATE INDEX IF NOT EXISTS idx_checkpoints_branch ON checkpoints(thread_id, branch_name);
CREATE INDEX IF NOT EXISTS idx_checkpoints_agent ON checkpoints(agent_id);
CREATE INDEX IF NOT EXISTS idx_checkpoints_created_at ON checkpoints(created_at);
";
128
// Sprint 3 ALTER TABLE migrations for upgrading existing databases.
// New databases already have these columns in CREATE TABLE.
// DuckDB doesn't support ADD COLUMN IF NOT EXISTS, so each statement is
// executed individually and its error is ignored when the column already
// exists (see `run_migrations`). These are only needed when upgrading from
// Sprint 2 databases.
pub const SPRINT3_COLUMN_ALTERS: &[&str] = &[
    "ALTER TABLE memories ADD COLUMN decay_rate FLOAT",
    "ALTER TABLE memories ADD COLUMN created_by VARCHAR",
    "ALTER TABLE memories ADD COLUMN version INTEGER DEFAULT 1",
    "ALTER TABLE memories ADD COLUMN prev_version_id VARCHAR",
    "ALTER TABLE memories ADD COLUMN quarantined BOOLEAN DEFAULT false",
    "ALTER TABLE memories ADD COLUMN quarantine_reason VARCHAR",
];
141
// Sprint 4 migrations for event embeddings and custom decay functions.
// Like SPRINT3_COLUMN_ALTERS, failures are ignored when the column exists.
pub const SPRINT4_COLUMN_ALTERS: &[&str] = &[
    "ALTER TABLE agent_events ADD COLUMN embedding BLOB",
    "ALTER TABLE memories ADD COLUMN decay_function VARCHAR",
];
147
/// DDL for permission delegations between agents, including chained
/// delegation (`parent_delegation_id`, `max_depth`/`current_depth`) and
/// soft revocation via `revoked_at`.
pub const CREATE_DELEGATIONS_TABLE: &str = "
CREATE TABLE IF NOT EXISTS delegations (
    id VARCHAR PRIMARY KEY,
    delegator_id VARCHAR NOT NULL,
    delegate_id VARCHAR NOT NULL,
    permission VARCHAR NOT NULL,
    scope_type VARCHAR NOT NULL DEFAULT 'all_memories',
    scope_value JSON,
    max_depth INTEGER NOT NULL DEFAULT 0,
    current_depth INTEGER NOT NULL DEFAULT 0,
    parent_delegation_id VARCHAR,
    created_at VARCHAR NOT NULL,
    expires_at VARCHAR,
    revoked_at VARCHAR
);
CREATE INDEX IF NOT EXISTS idx_delegations_delegator ON delegations(delegator_id);
CREATE INDEX IF NOT EXISTS idx_delegations_delegate ON delegations(delegate_id);
";
166
/// DDL for per-agent aggregate statistics (running averages and counts).
pub const CREATE_AGENT_PROFILES_TABLE: &str = "
CREATE TABLE IF NOT EXISTS agent_profiles (
    agent_id VARCHAR PRIMARY KEY,
    avg_importance DOUBLE NOT NULL DEFAULT 0.5,
    avg_content_length DOUBLE NOT NULL DEFAULT 100,
    total_memories BIGINT NOT NULL DEFAULT 0,
    last_updated VARCHAR NOT NULL
);
";
176
/// DDL for sync watermarks (Sprint 8): a simple key/value table.
pub const CREATE_SYNC_METADATA_TABLE: &str = "
CREATE TABLE IF NOT EXISTS sync_metadata (
    key VARCHAR PRIMARY KEY,
    value VARCHAR NOT NULL,
    updated_at VARCHAR NOT NULL DEFAULT CURRENT_TIMESTAMP
);
";
184
/// Schema version stamp table. One row per database file, populated on
/// first `run_migrations` call. A missing row on an existing database
/// indicates a pre-0.3.1 file and is treated as version 1.
pub const CREATE_MNEMO_META_TABLE: &str = "
CREATE TABLE IF NOT EXISTS mnemo_meta (
    key VARCHAR PRIMARY KEY,
    value VARCHAR NOT NULL,
    updated_at VARCHAR NOT NULL DEFAULT CURRENT_TIMESTAMP
);
";
195
/// Per-agent embedding-space baseline used by the z-score outlier
/// detector (v0.3.3, Task A). `mu` and `cov_diag` are stored as JSON
/// arrays of f32 — DuckDB's native array type isn't a good fit because
/// length varies per embedding model.
pub const CREATE_EMBEDDING_BASELINE_TABLE: &str = "
CREATE TABLE IF NOT EXISTS embedding_baseline (
    agent_id VARCHAR PRIMARY KEY,
    mu JSON NOT NULL,
    cov_diag JSON NOT NULL,
    n BIGINT NOT NULL,
    updated_at VARCHAR NOT NULL
);
";
209
/// Persistence format version this release writes. Bump when the on-disk
/// schema changes in a way that requires a migrator pass.
pub const CURRENT_PERSISTENCE_VERSION: u32 = 4;
213
214pub fn run_migrations(conn: &duckdb::Connection) -> duckdb::Result<()> {
215    conn.execute_batch(CREATE_MEMORIES_TABLE)?;
216    conn.execute_batch(CREATE_ACLS_TABLE)?;
217    conn.execute_batch(CREATE_RELATIONS_TABLE)?;
218    conn.execute_batch(CREATE_AGENT_EVENTS_TABLE)?;
219    conn.execute_batch(CREATE_CHECKPOINTS_TABLE)?;
220    // Sprint 3 column upgrades — silently ignore if columns already exist
221    for alter_sql in SPRINT3_COLUMN_ALTERS {
222        let _ = conn.execute(alter_sql, []);
223    }
224    conn.execute_batch(CREATE_DELEGATIONS_TABLE)?;
225    conn.execute_batch(CREATE_AGENT_PROFILES_TABLE)?;
226    // Sprint 4 column upgrades — silently ignore if columns already exist
227    for alter_sql in SPRINT4_COLUMN_ALTERS {
228        let _ = conn.execute(alter_sql, []);
229    }
230    // Create parent_event_id index if missing
231    let _ = conn.execute(
232        "CREATE INDEX IF NOT EXISTS idx_events_parent ON agent_events(parent_event_id)",
233        [],
234    );
235    // Sprint 8: sync watermarks table
236    conn.execute_batch(CREATE_SYNC_METADATA_TABLE)?;
237    // v0.3.2: persistence-version stamp.
238    conn.execute_batch(CREATE_MNEMO_META_TABLE)?;
239    // v0.3.3: embedding baseline table (z-score outlier detector).
240    conn.execute_batch(CREATE_EMBEDDING_BASELINE_TABLE)?;
241    stamp_persistence_version(conn)?;
242    Ok(())
243}
244
245/// Read the stored persistence version, or `None` if the marker is absent
246/// (i.e. this is a fresh database OR a pre-0.3.2 file).
247pub fn read_persistence_version(conn: &duckdb::Connection) -> duckdb::Result<Option<u32>> {
248    let mut stmt =
249        conn.prepare("SELECT value FROM mnemo_meta WHERE key = 'persistence_version'")?;
250    let mut rows = stmt.query([])?;
251    if let Some(row) = rows.next()? {
252        let raw: String = row.get(0)?;
253        Ok(raw.parse::<u32>().ok())
254    } else {
255        Ok(None)
256    }
257}
258
259/// Write / update the persistence version stamp. Called at the end of
260/// `run_migrations` after every schema operation has succeeded.
261///
262/// * If the stamp is missing, this is either a fresh DB or a pre-0.3.2
263///   file. Either way the post-run schema is the current one, so we
264///   write `CURRENT_PERSISTENCE_VERSION`.
265/// * If the stamp is older than `CURRENT_PERSISTENCE_VERSION`, we've
266///   just run a migrator over a legacy file; update to current.
267/// * If the stamp is already current, no-op.
268fn stamp_persistence_version(conn: &duckdb::Connection) -> duckdb::Result<()> {
269    let existing = read_persistence_version(conn)?;
270    let current = CURRENT_PERSISTENCE_VERSION;
271    if let Some(v) = existing
272        && v == current
273    {
274        return Ok(());
275    }
276    let now = chrono::Utc::now().to_rfc3339();
277    // DuckDB's ON CONFLICT parser is picky with DEFAULT columns; drive the
278    // updated_at value from Rust explicitly instead.
279    conn.execute(
280        "DELETE FROM mnemo_meta WHERE key = 'persistence_version'",
281        [],
282    )?;
283    conn.execute(
284        "INSERT INTO mnemo_meta(key, value, updated_at) VALUES ('persistence_version', ?, ?)",
285        duckdb::params![current.to_string(), now],
286    )?;
287    Ok(())
288}
289
#[cfg(test)]
mod tests {
    use super::*;

    // A fresh in-memory database must end up stamped with the current
    // persistence version after a single migration pass.
    #[test]
    fn test_fresh_db_stamps_current_persistence_version() {
        let conn = duckdb::Connection::open_in_memory().unwrap();
        run_migrations(&conn).unwrap();
        let v = read_persistence_version(&conn).unwrap();
        assert_eq!(v, Some(CURRENT_PERSISTENCE_VERSION));
    }

    /// A "legacy" database (no mnemo_meta row) must get stamped to the
    /// current version the first time run_migrations sees it. Subsequent
    /// passes are no-ops. This mirrors what will happen when a v0.1.1
    /// DuckDB file is opened by a v0.3.2 reader.
    #[test]
    fn test_legacy_db_gets_stamped_on_open() {
        let conn = duckdb::Connection::open_in_memory().unwrap();
        // Simulate a pre-0.3.2 file: create every table EXCEPT mnemo_meta.
        conn.execute_batch(CREATE_MEMORIES_TABLE).unwrap();
        conn.execute_batch(CREATE_ACLS_TABLE).unwrap();
        conn.execute_batch(CREATE_RELATIONS_TABLE).unwrap();
        conn.execute_batch(CREATE_AGENT_EVENTS_TABLE).unwrap();
        conn.execute_batch(CREATE_CHECKPOINTS_TABLE).unwrap();
        conn.execute_batch(CREATE_DELEGATIONS_TABLE).unwrap();
        conn.execute_batch(CREATE_AGENT_PROFILES_TABLE).unwrap();
        conn.execute_batch(CREATE_SYNC_METADATA_TABLE).unwrap();

        // Without mnemo_meta the read either errors (table missing) or
        // reports no stamp; both count as "unstamped".
        assert!(
            read_persistence_version(&conn).is_err()
                || read_persistence_version(&conn).unwrap().is_none(),
            "pre-migration legacy file should have no stamp"
        );

        run_migrations(&conn).unwrap();
        assert_eq!(
            read_persistence_version(&conn).unwrap(),
            Some(CURRENT_PERSISTENCE_VERSION)
        );

        // Second pass is a no-op.
        run_migrations(&conn).unwrap();
        assert_eq!(
            read_persistence_version(&conn).unwrap(),
            Some(CURRENT_PERSISTENCE_VERSION)
        );
    }

    // Smoke test: after migrations, the core tables exist and are empty.
    #[test]
    fn test_migrations_run_on_in_memory_db() {
        let conn = duckdb::Connection::open_in_memory().unwrap();
        run_migrations(&conn).unwrap();
        // Verify tables exist by querying them.
        let mut stmt = conn.prepare("SELECT COUNT(*) FROM memories").unwrap();
        let count: i64 = stmt.query_row([], |row| row.get(0)).unwrap();
        assert_eq!(count, 0);

        let mut stmt = conn.prepare("SELECT COUNT(*) FROM agent_events").unwrap();
        let count: i64 = stmt.query_row([], |row| row.get(0)).unwrap();
        assert_eq!(count, 0);

        let mut stmt = conn.prepare("SELECT COUNT(*) FROM checkpoints").unwrap();
        let count: i64 = stmt.query_row([], |row| row.get(0)).unwrap();
        assert_eq!(count, 0);
    }
}