Skip to main content

innate_core/
migrate.rs

1//! Schema migration runner — 4.0 → current chain.
2//!
3//! Each step is atomic: BEGIN IMMEDIATE … COMMIT. Any failure rolls back the
4//! entire step and returns an error — no half-migrated state.
5
6use std::path::Path;
7
8use rusqlite::{Connection, OptionalExtension};
9
10use crate::errors::{InnateError, Result};
11
12// Embedded migration SQL ordered from lowest to highest target version.
13const MIGRATIONS: &[(&str, &str, &str)] = &[
14    ("4.0", "4.1", include_str!("migrations/4.0_to_4.1.sql")),
15    ("4.1", "4.2", include_str!("migrations/4.1_to_4.2.sql")),
16    ("4.2", "4.3", include_str!("migrations/4.2_to_4.3.sql")),
17    ("4.3", "4.4", include_str!("migrations/4.3_to_4.4.sql")),
18    ("4.4", "4.5", include_str!("migrations/4.4_to_4.5.sql")),
19    ("4.5", "4.5.1", include_str!("migrations/4.5_to_4.5.1.sql")),
20    (
21        "4.5.1",
22        "4.5.2",
23        include_str!("migrations/4.5.1_to_4.5.2.sql"),
24    ),
25    ("4.5.2", "4.6", include_str!("migrations/4.5.2_to_4.6.sql")),
26    ("4.6", "4.7", include_str!("migrations/4.6_to_4.7.sql")),
27    ("4.7", "4.8", include_str!("migrations/4.7_to_4.8.sql")),
28    ("4.8", "4.9", include_str!("migrations/4.8_to_4.9.sql")),
29    ("4.9", "4.10", include_str!("migrations/4.9_to_4.10.sql")),
30    ("4.10", "4.11", include_str!("migrations/4.10_to_4.11.sql")),
31    ("4.11", "4.12", include_str!("migrations/4.11_to_4.12.sql")),
32    ("4.12", "4.13", include_str!("migrations/4.12_to_4.13.sql")),
33    ("4.13", "4.14", include_str!("migrations/4.13_to_4.14.sql")),
34    ("4.14", "4.15", include_str!("migrations/4.14_to_4.15.sql")),
35    ("4.15", "4.16", include_str!("migrations/4.15_to_4.16.sql")),
36    ("4.16", "4.17", include_str!("migrations/4.16_to_4.17.sql")),
37];
38
39const TARGET: &str = "4.17";
40
41/// Run all pending migrations on `db_path`. Idempotent if already at target.
42/// Returns the list of migration steps executed.
43pub fn run_migrations(db_path: impl AsRef<Path>) -> Result<Vec<String>> {
44    let conn = Connection::open(db_path.as_ref())?;
45    conn.execute_batch(
46        "PRAGMA journal_mode=WAL;
47         PRAGMA foreign_keys=ON;
48         PRAGMA synchronous=NORMAL;",
49    )?;
50
51    let current = schema_version(&conn)?;
52    if current == TARGET {
53        return Ok(vec![]);
54    }
55
56    let mut applied = vec![];
57    let mut ver = current;
58
59    for (from, to, sql) in MIGRATIONS {
60        if ver_tuple(&ver) >= ver_tuple(to) {
61            continue; // already at or beyond this step
62        }
63        if ver_tuple(&ver) < ver_tuple(from) {
64            return Err(InnateError::Other(format!(
65                "Migration gap: database at {ver}, expected {from}→{to}. \
66                 Is the database from an unsupported version?"
67            )));
68        }
69        let copy_last_used = *to == "4.12" && column_exists(&conn, "chunks", "last_used_at")?;
70        // 方案 C:provenance 列在 4.14→4.15 条件添加(ALTER ADD COLUMN 无 IF NOT EXISTS)。
71        let add_provenance =
72            *to == "4.15" && !column_exists(&conn, "confidence_evidence", "provenance")?;
73        // 混合检索:FTS5 词法通道。仅当 chunks 拥有索引列时建/灌(部分 schema
74        // 测试夹具缺这些列 → 跳过,不致命;真实库恒满足)。DDL 自带 IF NOT EXISTS,
75        // 回填 DELETE+INSERT,可重复执行。
76        let add_fts = *to == "4.16"
77            && column_exists(&conn, "chunks", "content")?
78            && column_exists(&conn, "chunks", "trigger_desc")?
79            && column_exists(&conn, "chunks", "skill_name")?;
80        // agent 来源列(4.16→4.17)。ALTER ADD COLUMN 无 IF NOT EXISTS,故按表是否
81        // 存在(借已知列探测)且 agent 列尚缺时条件添加,对部分 schema 夹具不致命。
82        let add_agent_log = *to == "4.17"
83            && column_exists(&conn, "episodic_log", "trace_id")?
84            && !column_exists(&conn, "episodic_log", "agent")?;
85        let add_agent_chunk = *to == "4.17"
86            && column_exists(&conn, "chunks", "content")?
87            && !column_exists(&conn, "chunks", "agent")?;
88        // Run the step atomically.
89        conn.execute_batch("BEGIN IMMEDIATE")?;
90        let r = conn.execute_batch(sql);
91        match r {
92            Ok(()) => {
93                if add_fts {
94                    if let Err(error) =
95                        conn.execute_batch(include_str!("migrations/4.16_fts.sql"))
96                    {
97                        let _ = conn.execute_batch("ROLLBACK");
98                        return Err(error.into());
99                    }
100                }
101                if add_agent_log {
102                    if let Err(error) =
103                        conn.execute_batch("ALTER TABLE episodic_log ADD COLUMN agent TEXT")
104                    {
105                        let _ = conn.execute_batch("ROLLBACK");
106                        return Err(error.into());
107                    }
108                }
109                if add_agent_chunk {
110                    if let Err(error) =
111                        conn.execute_batch("ALTER TABLE chunks ADD COLUMN agent TEXT")
112                    {
113                        let _ = conn.execute_batch("ROLLBACK");
114                        return Err(error.into());
115                    }
116                }
117                if add_provenance {
118                    if let Err(error) = conn.execute_batch(
119                        "ALTER TABLE confidence_evidence
120                         ADD COLUMN provenance TEXT NOT NULL DEFAULT 'observed'",
121                    ) {
122                        let _ = conn.execute_batch("ROLLBACK");
123                        return Err(error.into());
124                    }
125                }
126                if copy_last_used {
127                    if let Err(error) = conn.execute(
128                        "UPDATE chunks
129                             SET last_used_base=CASE
130                               WHEN EXISTS (
131                                 SELECT 1 FROM usage_trace u
132                                 WHERE u.chunk_id=chunks.id AND u.event='used'
133                               ) THEN NULL
134                               ELSE last_used_at
135                             END",
136                        [],
137                    ) {
138                        let _ = conn.execute_batch("ROLLBACK");
139                        return Err(error.into());
140                    }
141                }
142                conn.execute_batch("COMMIT")?;
143                applied.push(format!("{from}→{to}"));
144                ver = to.to_string();
145            }
146            Err(e) => {
147                let _ = conn.execute_batch("ROLLBACK");
148                return Err(InnateError::Other(format!(
149                    "Migration {from}→{to} failed: {e}"
150                )));
151            }
152        }
153    }
154
155    if ver != TARGET {
156        return Err(InnateError::Other(format!(
157            "After all migrations, schema version is {ver}, expected {TARGET}."
158        )));
159    }
160
161    Ok(applied)
162}
163
164fn column_exists(conn: &Connection, table: &str, column: &str) -> Result<bool> {
165    let sql = format!("SELECT COUNT(*) FROM pragma_table_info('{table}') WHERE name=?");
166    Ok(conn.query_row(&sql, [column], |row| row.get::<_, i64>(0))? > 0)
167}
168
169fn schema_version(conn: &Connection) -> Result<String> {
170    let has_meta: bool = conn.query_row(
171        "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='meta'",
172        [],
173        |r| r.get::<_, i64>(0),
174    )? > 0;
175
176    if !has_meta {
177        return Err(InnateError::Other(
178            "Database has no meta table — cannot migrate. \
179             Use `innate` to create a fresh database."
180                .into(),
181        ));
182    }
183
184    let ver: Option<String> = conn
185        .query_row(
186            "SELECT value FROM meta WHERE key='schema_version'",
187            [],
188            |r| r.get(0),
189        )
190        .optional()?;
191
192    ver.ok_or_else(|| InnateError::Other("meta table missing schema_version".into()))
193}
194
195fn ver_tuple(v: &str) -> (u32, u32, u32) {
196    let parts: Vec<u32> = v.split('.').filter_map(|s| s.parse().ok()).collect();
197    (
198        parts.first().copied().unwrap_or(0),
199        parts.get(1).copied().unwrap_or(0),
200        parts.get(2).copied().unwrap_or(0),
201    )
202}