1use rusqlite::{params, OptionalExtension};
4
5use crate::{Pool, StoreResult};
6
7const MIGRATIONS: &[(&str, &str)] = &[
8 ("001_init", crate::INITIAL_MIGRATION_SQL),
9 (
10 "002_authority_timeline",
11 include_str!("../migrations/002_authority_timeline.sql"),
12 ),
13 (
14 "003_schema_v2_expand",
15 include_str!("../migrations/003_schema_v2_expand.sql"),
16 ),
17 (
18 "004_principle_promotion_policy_record",
19 include_str!("../migrations/004_principle_promotion_policy_record.sql"),
20 ),
21 (
22 "005_outcome_relation_scope",
23 include_str!("../migrations/005_outcome_relation_scope.sql"),
24 ),
25 (
26 "006_fts5_memories",
27 include_str!("../migrations/006_fts5_memories.sql"),
28 ),
29 (
30 "007_embeddings",
31 include_str!("../migrations/007_embeddings.sql"),
32 ),
33 (
34 "008_decay_jobs",
35 include_str!("../migrations/008_decay_jobs.sql"),
36 ),
37 (
38 "009_decay_supersessions",
39 include_str!("../migrations/009_decay_supersessions.sql"),
40 ),
41 (
42 "010_pending_mcp_commit",
43 include_str!("../migrations/010_pending_mcp_commit.sql"),
44 ),
45];
46
47const KNOWN_MIGRATION_NAMES: &[&str] = &[
48 "001_init",
49 "002_authority_timeline",
50 "003_schema_v2_expand",
51 "004_principle_promotion_policy_record",
52 "005_outcome_relation_scope",
53 "006_fts5_memories",
54 "007_embeddings",
55 "008_decay_jobs",
56 "009_decay_supersessions",
57 "010_pending_mcp_commit",
58];
59
60#[must_use]
62pub fn known_migration_names() -> &'static [&'static str] {
63 KNOWN_MIGRATION_NAMES
64}
65
66pub fn apply_pending(pool: &Pool) -> StoreResult<usize> {
79 pool.execute_batch(
80 "PRAGMA foreign_keys = ON;
81 PRAGMA journal_mode = WAL;
82 CREATE TABLE IF NOT EXISTS _migrations (
83 name TEXT PRIMARY KEY,
84 applied_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
85 );",
86 )?;
87
88 let mut applied = 0;
89 for (name, sql) in MIGRATIONS {
90 let existing: Option<String> = pool
91 .query_row(
92 "SELECT name FROM _migrations WHERE name = ?1;",
93 params![name],
94 |row| row.get(0),
95 )
96 .optional()?;
97
98 if existing.is_some() {
99 continue;
100 }
101
102 if *name == "003_schema_v2_expand" {
103 apply_003_schema_v2_expand_guarded(pool)?;
104 } else {
105 pool.execute_batch(sql)?;
106 }
107 pool.execute("INSERT INTO _migrations (name) VALUES (?1);", params![name])?;
108 applied += 1;
109 }
110
111 Ok(applied)
112}
113
114fn apply_003_schema_v2_expand_guarded(pool: &Pool) -> StoreResult<()> {
125 add_column_if_missing(
126 pool,
127 "events",
128 "source_attestation_json",
129 "ALTER TABLE events ADD COLUMN source_attestation_json TEXT NULL \
130 CHECK (source_attestation_json IS NULL OR json_valid(source_attestation_json));",
131 )?;
132 add_column_if_missing(
133 pool,
134 "episodes",
135 "summary_spans_json",
136 "ALTER TABLE episodes ADD COLUMN summary_spans_json TEXT NULL \
137 CHECK (summary_spans_json IS NULL OR json_valid(summary_spans_json));",
138 )?;
139 add_column_if_missing(
140 pool,
141 "memories",
142 "summary_spans_json",
143 "ALTER TABLE memories ADD COLUMN summary_spans_json TEXT NULL \
144 CHECK (summary_spans_json IS NULL OR json_valid(summary_spans_json));",
145 )?;
146 add_column_if_missing(
147 pool,
148 "memories",
149 "cross_session_use_count",
150 "ALTER TABLE memories ADD COLUMN cross_session_use_count INTEGER NULL \
151 CHECK (cross_session_use_count IS NULL OR cross_session_use_count >= 0);",
152 )?;
153 add_column_if_missing(
154 pool,
155 "memories",
156 "first_used_at",
157 "ALTER TABLE memories ADD COLUMN first_used_at TEXT NULL;",
158 )?;
159 add_column_if_missing(
160 pool,
161 "memories",
162 "last_cross_session_use_at",
163 "ALTER TABLE memories ADD COLUMN last_cross_session_use_at TEXT NULL;",
164 )?;
165 add_column_if_missing(
166 pool,
167 "memories",
168 "last_validation_at",
169 "ALTER TABLE memories ADD COLUMN last_validation_at TEXT NULL;",
170 )?;
171 add_column_if_missing(
172 pool,
173 "memories",
174 "validation_epoch",
175 "ALTER TABLE memories ADD COLUMN validation_epoch INTEGER NULL \
176 CHECK (validation_epoch IS NULL OR validation_epoch >= 0);",
177 )?;
178 add_column_if_missing(
179 pool,
180 "memories",
181 "blessed_until",
182 "ALTER TABLE memories ADD COLUMN blessed_until TEXT NULL;",
183 )?;
184 add_column_if_missing(
185 pool,
186 "context_packs",
187 "consumer_advisory_json",
188 "ALTER TABLE context_packs ADD COLUMN consumer_advisory_json TEXT NULL \
189 CHECK (consumer_advisory_json IS NULL OR json_valid(consumer_advisory_json));",
190 )?;
191
192 pool.execute_batch(
197 "CREATE TABLE IF NOT EXISTS memory_session_uses (
198 memory_id TEXT NOT NULL REFERENCES memories(id),
199 session_id TEXT NOT NULL,
200 first_used_at TEXT NOT NULL,
201 last_used_at TEXT NOT NULL,
202 use_count INTEGER NOT NULL CHECK (use_count >= 0),
203 PRIMARY KEY (memory_id, session_id)
204 );
205 CREATE TABLE IF NOT EXISTS outcome_memory_relations (
206 outcome_ref TEXT NOT NULL,
207 memory_id TEXT NOT NULL REFERENCES memories(id),
208 relation TEXT NOT NULL,
209 recorded_at TEXT NOT NULL,
210 source_event_id TEXT NULL REFERENCES events(id),
211 PRIMARY KEY (outcome_ref, memory_id, relation)
212 );",
213 )?;
214
215 Ok(())
216}
217
218fn column_exists(pool: &Pool, table: &str, column: &str) -> StoreResult<bool> {
219 let sql = format!("PRAGMA table_info({table});");
220 let mut stmt = pool.prepare(&sql)?;
221 let columns = stmt.query_map([], |row| row.get::<_, String>(1))?;
222 for found in columns {
223 if found? == column {
224 return Ok(true);
225 }
226 }
227 Ok(false)
228}
229
230fn add_column_if_missing(pool: &Pool, table: &str, column: &str, ddl: &str) -> StoreResult<()> {
231 if column_exists(pool, table, column)? {
232 return Ok(());
233 }
234 pool.execute_batch(ddl)?;
235 Ok(())
236}
237
238#[cfg(test)]
239mod tests {
240 use super::*;
241
242 #[test]
243 fn known_migration_names_match_migration_bundle() {
244 let bundled_names = MIGRATIONS.iter().map(|(name, _)| *name).collect::<Vec<_>>();
245
246 assert_eq!(known_migration_names(), bundled_names.as_slice());
247 }
248
249 #[test]
254 fn apply_pending_is_idempotent_when_003_columns_pre_exist() {
255 use rusqlite::Connection;
256
257 let pool = Connection::open_in_memory().expect("open in-memory sqlite");
258
259 pool.execute_batch(crate::INITIAL_MIGRATION_SQL)
261 .expect("init schema");
262
263 pool.execute_batch(
266 "CREATE TABLE IF NOT EXISTS _migrations (
267 name TEXT PRIMARY KEY,
268 applied_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
269 );
270 INSERT OR IGNORE INTO _migrations (name) VALUES ('001_init');",
271 )
272 .expect("bootstrap _migrations");
273
274 pool.execute_batch(
276 "ALTER TABLE events ADD COLUMN source_attestation_json TEXT NULL \
277 CHECK (source_attestation_json IS NULL OR json_valid(source_attestation_json));",
278 )
279 .expect("pre-add source_attestation_json");
280
281 apply_pending(&pool).expect("apply_pending must be idempotent when columns pre-exist");
283
284 let names: Vec<String> = pool
286 .prepare("SELECT name FROM _migrations ORDER BY name;")
287 .unwrap()
288 .query_map([], |row| row.get(0))
289 .unwrap()
290 .collect::<Result<_, _>>()
291 .unwrap();
292 assert!(
293 names.contains(&"003_schema_v2_expand".to_string()),
294 "003_schema_v2_expand must be recorded after guarded apply"
295 );
296 }
297
298 #[test]
301 fn apply_pending_second_run_is_noop() {
302 use rusqlite::Connection;
303
304 let pool = Connection::open_in_memory().expect("open in-memory sqlite");
305 let first = apply_pending(&pool).expect("first apply_pending");
306 assert!(first > 0, "first apply should apply migrations");
307 let second = apply_pending(&pool).expect("second apply_pending");
308 assert_eq!(second, 0, "second apply_pending must be a no-op");
309 }
310}