Skip to main content

cortex_store/
migrate.rs

1//! SQLite migration application and tracking.
2
3use rusqlite::{params, OptionalExtension};
4
5use crate::{Pool, StoreResult};
6
7const MIGRATIONS: &[(&str, &str)] = &[
8    ("001_init", crate::INITIAL_MIGRATION_SQL),
9    (
10        "002_authority_timeline",
11        include_str!("../migrations/002_authority_timeline.sql"),
12    ),
13    (
14        "003_schema_v2_expand",
15        include_str!("../migrations/003_schema_v2_expand.sql"),
16    ),
17    (
18        "004_principle_promotion_policy_record",
19        include_str!("../migrations/004_principle_promotion_policy_record.sql"),
20    ),
21    (
22        "005_outcome_relation_scope",
23        include_str!("../migrations/005_outcome_relation_scope.sql"),
24    ),
25    (
26        "006_fts5_memories",
27        include_str!("../migrations/006_fts5_memories.sql"),
28    ),
29    (
30        "007_embeddings",
31        include_str!("../migrations/007_embeddings.sql"),
32    ),
33    (
34        "008_decay_jobs",
35        include_str!("../migrations/008_decay_jobs.sql"),
36    ),
37    (
38        "009_decay_supersessions",
39        include_str!("../migrations/009_decay_supersessions.sql"),
40    ),
41    (
42        "010_pending_mcp_commit",
43        include_str!("../migrations/010_pending_mcp_commit.sql"),
44    ),
45];
46
47const KNOWN_MIGRATION_NAMES: &[&str] = &[
48    "001_init",
49    "002_authority_timeline",
50    "003_schema_v2_expand",
51    "004_principle_promotion_policy_record",
52    "005_outcome_relation_scope",
53    "006_fts5_memories",
54    "007_embeddings",
55    "008_decay_jobs",
56    "009_decay_supersessions",
57    "010_pending_mcp_commit",
58];
59
60/// Names of migrations known to this binary, in application order.
61#[must_use]
62pub fn known_migration_names() -> &'static [&'static str] {
63    KNOWN_MIGRATION_NAMES
64}
65
66/// Applies migrations that do not already exist in `_migrations`.
67///
68/// Returns the number of migrations applied during this call.
69///
70/// # Idempotency note for `003_schema_v2_expand`
71///
72/// Migration `003_schema_v2_expand` adds nullable v2 columns via `ALTER TABLE`.
73/// SQLite does not support `ADD COLUMN IF NOT EXISTS`. Stores created with an
74/// older binary may already have some or all of these columns (added by
75/// `cortex_store::migrate_v2::apply_expand_backfill_skeleton`). To prevent a
76/// "duplicate column name" error this function special-cases `003_schema_v2_expand`
77/// and guards each `ALTER TABLE ADD COLUMN` with a `PRAGMA table_info` check.
78pub fn apply_pending(pool: &Pool) -> StoreResult<usize> {
79    pool.execute_batch(
80        "PRAGMA foreign_keys = ON;
81         PRAGMA journal_mode = WAL;
82         CREATE TABLE IF NOT EXISTS _migrations (
83             name TEXT PRIMARY KEY,
84             applied_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
85         );",
86    )?;
87
88    let mut applied = 0;
89    for (name, sql) in MIGRATIONS {
90        let existing: Option<String> = pool
91            .query_row(
92                "SELECT name FROM _migrations WHERE name = ?1;",
93                params![name],
94                |row| row.get(0),
95            )
96            .optional()?;
97
98        if existing.is_some() {
99            continue;
100        }
101
102        if *name == "003_schema_v2_expand" {
103            apply_003_schema_v2_expand_guarded(pool)?;
104        } else {
105            pool.execute_batch(sql)?;
106        }
107        pool.execute("INSERT INTO _migrations (name) VALUES (?1);", params![name])?;
108        applied += 1;
109    }
110
111    Ok(applied)
112}
113
114/// Apply `003_schema_v2_expand` with per-column idempotency guards.
115///
116/// `ALTER TABLE … ADD COLUMN` fails with "duplicate column name" if the column
117/// already exists. SQLite has no `ADD COLUMN IF NOT EXISTS` syntax. This
118/// function guards each column addition with a `PRAGMA table_info` check so
119/// that stores where `apply_expand_backfill_skeleton` ran before this migration
120/// was bundled into `apply_pending` are not broken.
121///
122/// The two `CREATE TABLE` statements at the end of the migration use
123/// `CREATE TABLE IF NOT EXISTS` so they are safe to run unconditionally.
124fn apply_003_schema_v2_expand_guarded(pool: &Pool) -> StoreResult<()> {
125    add_column_if_missing(
126        pool,
127        "events",
128        "source_attestation_json",
129        "ALTER TABLE events ADD COLUMN source_attestation_json TEXT NULL \
130         CHECK (source_attestation_json IS NULL OR json_valid(source_attestation_json));",
131    )?;
132    add_column_if_missing(
133        pool,
134        "episodes",
135        "summary_spans_json",
136        "ALTER TABLE episodes ADD COLUMN summary_spans_json TEXT NULL \
137         CHECK (summary_spans_json IS NULL OR json_valid(summary_spans_json));",
138    )?;
139    add_column_if_missing(
140        pool,
141        "memories",
142        "summary_spans_json",
143        "ALTER TABLE memories ADD COLUMN summary_spans_json TEXT NULL \
144         CHECK (summary_spans_json IS NULL OR json_valid(summary_spans_json));",
145    )?;
146    add_column_if_missing(
147        pool,
148        "memories",
149        "cross_session_use_count",
150        "ALTER TABLE memories ADD COLUMN cross_session_use_count INTEGER NULL \
151         CHECK (cross_session_use_count IS NULL OR cross_session_use_count >= 0);",
152    )?;
153    add_column_if_missing(
154        pool,
155        "memories",
156        "first_used_at",
157        "ALTER TABLE memories ADD COLUMN first_used_at TEXT NULL;",
158    )?;
159    add_column_if_missing(
160        pool,
161        "memories",
162        "last_cross_session_use_at",
163        "ALTER TABLE memories ADD COLUMN last_cross_session_use_at TEXT NULL;",
164    )?;
165    add_column_if_missing(
166        pool,
167        "memories",
168        "last_validation_at",
169        "ALTER TABLE memories ADD COLUMN last_validation_at TEXT NULL;",
170    )?;
171    add_column_if_missing(
172        pool,
173        "memories",
174        "validation_epoch",
175        "ALTER TABLE memories ADD COLUMN validation_epoch INTEGER NULL \
176         CHECK (validation_epoch IS NULL OR validation_epoch >= 0);",
177    )?;
178    add_column_if_missing(
179        pool,
180        "memories",
181        "blessed_until",
182        "ALTER TABLE memories ADD COLUMN blessed_until TEXT NULL;",
183    )?;
184    add_column_if_missing(
185        pool,
186        "context_packs",
187        "consumer_advisory_json",
188        "ALTER TABLE context_packs ADD COLUMN consumer_advisory_json TEXT NULL \
189         CHECK (consumer_advisory_json IS NULL OR json_valid(consumer_advisory_json));",
190    )?;
191
192    // CREATE TABLE statements are safe to apply without guards only when the
193    // table genuinely does not exist. Use CREATE TABLE IF NOT EXISTS so that
194    // stores where apply_expand_backfill_skeleton already created these tables
195    // do not error.
196    pool.execute_batch(
197        "CREATE TABLE IF NOT EXISTS memory_session_uses (
198            memory_id TEXT NOT NULL REFERENCES memories(id),
199            session_id TEXT NOT NULL,
200            first_used_at TEXT NOT NULL,
201            last_used_at TEXT NOT NULL,
202            use_count INTEGER NOT NULL CHECK (use_count >= 0),
203            PRIMARY KEY (memory_id, session_id)
204        );
205        CREATE TABLE IF NOT EXISTS outcome_memory_relations (
206            outcome_ref TEXT NOT NULL,
207            memory_id TEXT NOT NULL REFERENCES memories(id),
208            relation TEXT NOT NULL,
209            recorded_at TEXT NOT NULL,
210            source_event_id TEXT NULL REFERENCES events(id),
211            PRIMARY KEY (outcome_ref, memory_id, relation)
212        );",
213    )?;
214
215    Ok(())
216}
217
218fn column_exists(pool: &Pool, table: &str, column: &str) -> StoreResult<bool> {
219    let sql = format!("PRAGMA table_info({table});");
220    let mut stmt = pool.prepare(&sql)?;
221    let columns = stmt.query_map([], |row| row.get::<_, String>(1))?;
222    for found in columns {
223        if found? == column {
224            return Ok(true);
225        }
226    }
227    Ok(false)
228}
229
230fn add_column_if_missing(pool: &Pool, table: &str, column: &str, ddl: &str) -> StoreResult<()> {
231    if column_exists(pool, table, column)? {
232        return Ok(());
233    }
234    pool.execute_batch(ddl)?;
235    Ok(())
236}
237
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn known_migration_names_match_migration_bundle() {
        let bundled_names = MIGRATIONS.iter().map(|(name, _)| *name).collect::<Vec<_>>();

        assert_eq!(known_migration_names(), bundled_names.as_slice());
    }

    /// Reproduces the duplicate-column bug. If a store already has
    /// `source_attestation_json` on `events` (added by
    /// `apply_expand_backfill_skeleton` before `003_schema_v2_expand` was
    /// bundled), `apply_pending` must not error with "duplicate column name".
    #[test]
    fn apply_pending_is_idempotent_when_003_columns_pre_exist() {
        use rusqlite::Connection;

        let pool = Connection::open_in_memory().expect("open in-memory sqlite");

        // Bootstrap the schema manually, as 001_init does.
        pool.execute_batch(crate::INITIAL_MIGRATION_SQL)
            .expect("init schema");

        // Create the tracking table and record only 001_init, so every later
        // migration is still pending.
        pool.execute_batch(
            "CREATE TABLE IF NOT EXISTS _migrations (
                name TEXT PRIMARY KEY,
                applied_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
             );
             INSERT OR IGNORE INTO _migrations (name) VALUES ('001_init');",
        )
        .expect("bootstrap _migrations");

        // Simulate apply_expand_backfill_skeleton: pre-add a column that
        // 003_schema_v2_expand would normally add, without recording 003.
        pool.execute_batch(
            "ALTER TABLE events ADD COLUMN source_attestation_json TEXT NULL \
             CHECK (source_attestation_json IS NULL OR json_valid(source_attestation_json));",
        )
        .expect("pre-add source_attestation_json");

        // apply_pending must not error even though source_attestation_json
        // exists, and must apply everything except the pre-recorded 001_init.
        let applied = apply_pending(&pool)
            .expect("apply_pending must be idempotent when columns pre-exist");
        assert_eq!(
            applied,
            MIGRATIONS.len() - 1,
            "every migration after 001_init must be applied"
        );

        let names: Vec<String> = pool
            .prepare("SELECT name FROM _migrations ORDER BY name;")
            .unwrap()
            .query_map([], |row| row.get(0))
            .unwrap()
            .collect::<Result<_, _>>()
            .unwrap();
        assert!(
            names.contains(&"003_schema_v2_expand".to_string()),
            "003_schema_v2_expand must be recorded after guarded apply"
        );
        // Names are zero-padded, so `ORDER BY name` reproduces bundle order.
        assert_eq!(
            names,
            known_migration_names(),
            "every bundled migration must be recorded"
        );
    }

    /// Running apply_pending a second time on a fully-migrated store must be a
    /// no-op (returns 0 applied).
    #[test]
    fn apply_pending_second_run_is_noop() {
        use rusqlite::Connection;

        let pool = Connection::open_in_memory().expect("open in-memory sqlite");
        let first = apply_pending(&pool).expect("first apply_pending");
        assert_eq!(
            first,
            MIGRATIONS.len(),
            "first apply on a fresh store should apply every migration"
        );
        let second = apply_pending(&pool).expect("second apply_pending");
        assert_eq!(second, 0, "second apply_pending must be a no-op");
    }
}