Skip to main content

mcpr_integrations/store/
db.rs

//! SQLite connection management, WAL setup, and schema migrations.
//!
//! This module handles the low-level database lifecycle:
//! - Opening a connection with the right pragmas for performance and safety.
//! - Running schema migrations on first open or version upgrade.
//! - Providing separate read-only connections for the query layer.
//!
//! # WAL mode
//!
//! WAL (Write-Ahead Logging) is enabled on every connection. This gives:
//! - Concurrent readers while the background writer is writing.
//! - Better write throughput for the batch write pattern.
//! - Crash safety — no corruption on unclean shutdown.
//!
//! # Thread safety
//!
//! `rusqlite::Connection` is `Send` but `!Sync`: a connection can be moved to
//! another thread, but it cannot be shared between threads. The writer owns
//! its connection on a dedicated OS thread. Query commands open their own
//! read-only connections.
19
20use rusqlite::Connection;
21use std::path::Path;
22
23use super::schema;
24
25/// Open a SQLite connection with WAL mode and performance pragmas.
26///
27/// This is used by both the background writer (read-write) and the query
28/// engine (read-only). The pragmas are safe for both use cases.
29///
30/// # Pragmas
31///
32/// - `journal_mode = WAL`: enables concurrent reads during writes.
33/// - `synchronous = NORMAL`: safe with WAL, ~3x faster than FULL.
34///   Acceptable durability trade-off for a local request log — at most
35///   one batch (200ms) of events could be lost on OS crash.
36/// - `cache_size = -64000`: 64MB page cache in memory. Improves read
37///   performance for repeated queries (e.g., `--follow` polling).
38/// - `temp_store = MEMORY`: temp tables and indexes in memory, not disk.
39/// - `busy_timeout = 5000`: wait up to 5s for locks instead of failing
40///   immediately. Prevents SQLITE_BUSY errors when CLI queries overlap
41///   with writer flushes.
42pub fn open_connection(path: &Path) -> rusqlite::Result<Connection> {
43    let conn = Connection::open(path)?;
44
45    conn.execute_batch(
46        "PRAGMA journal_mode = WAL;
47         PRAGMA synchronous = NORMAL;
48         PRAGMA cache_size = -64000;
49         PRAGMA temp_store = MEMORY;
50         PRAGMA busy_timeout = 5000;",
51    )?;
52
53    Ok(conn)
54}
55
56/// Run schema migrations to bring the database up to the current version.
57///
58/// Checks the `schema_version` in the `meta` table. If the table doesn't
59/// exist (fresh database), runs the full V1 schema. Future versions will
60/// add incremental migrations (V1 → V2, V2 → V3, etc.).
61///
62/// This is called once on `Store::open()` before spawning the writer.
63pub fn run_migrations(conn: &Connection, mcpr_version: &str) -> rusqlite::Result<()> {
64    let version = get_schema_version(conn);
65
66    if version < 1 {
67        conn.execute_batch(schema::V1_SCHEMA)?;
68        conn.execute_batch(schema::V1_META_SEED)?;
69    }
70
71    if version < 2 {
72        conn.execute_batch(schema::V2_SCHEMA)?;
73    }
74
75    if version < 3 {
76        conn.execute_batch(schema::V3_SCHEMA)?;
77    }
78
79    if version < 4 {
80        conn.execute_batch(schema::V4_SCHEMA)?;
81    }
82
83    if version < 5 {
84        conn.execute_batch(schema::V5_SCHEMA)?;
85    }
86
87    // Always update the mcpr binary version on startup.
88    conn.execute(schema::UPSERT_MCPR_VERSION, rusqlite::params![mcpr_version])?;
89
90    Ok(())
91}
92
93/// Read the current schema version from the `meta` table.
94///
95/// Returns 0 if the `meta` table doesn't exist (fresh database)
96/// or if the `schema_version` key is missing.
97fn get_schema_version(conn: &Connection) -> u32 {
98    // Check if the meta table exists at all.
99    let table_exists: bool = conn
100        .query_row(
101            "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type = 'table' AND name = 'meta'",
102            [],
103            |row| row.get(0),
104        )
105        .unwrap_or(false);
106
107    if !table_exists {
108        return 0;
109    }
110
111    conn.query_row(
112        "SELECT value FROM meta WHERE key = 'schema_version'",
113        [],
114        |row| {
115            let v: String = row.get(0)?;
116            Ok(v.parse::<u32>().unwrap_or(0))
117        },
118    )
119    .unwrap_or(0)
120}
121
#[cfg(test)]
#[allow(non_snake_case)]
mod tests {
    use super::*;

    /// Create a temporary directory and open `test.db` inside it.
    ///
    /// The `TempDir` guard is returned together with the connection; each test
    /// binds it so the directory stays alive for the whole test body.
    fn scratch_db() -> (tempfile::TempDir, Connection) {
        let dir = tempfile::tempdir().unwrap();
        let conn = open_connection(&dir.path().join("test.db")).unwrap();
        (dir, conn)
    }

    /// Like `scratch_db`, but with `run_migrations` already applied.
    fn migrated_db(mcpr_version: &str) -> (tempfile::TempDir, Connection) {
        let (dir, conn) = scratch_db();
        run_migrations(&conn, mcpr_version).unwrap();
        (dir, conn)
    }

    /// Run each schema script in order, panicking on the first failure.
    fn apply_scripts(conn: &Connection, scripts: &[&str]) {
        for script in scripts {
            conn.execute_batch(script).unwrap();
        }
    }

    /// Execute a single parameterless statement, panicking on failure.
    fn exec(conn: &Connection, sql: &str) {
        conn.execute(sql, []).unwrap();
    }

    /// Fetch column 0 of the first result row as text.
    fn text_of(conn: &Connection, sql: &str) -> String {
        conn.query_row(sql, [], |row| row.get(0)).unwrap()
    }

    /// Fetch column 0 of the first result row as an integer.
    fn int_of(conn: &Connection, sql: &str) -> i64 {
        conn.query_row(sql, [], |row| row.get(0)).unwrap()
    }

    /// A fresh database ends up with the core tables, schema version 5 and
    /// the supplied binary version.
    #[test]
    fn run_migrations__fresh_db() {
        let (_dir, conn) = migrated_db("0.3.0-test");

        let core_tables = int_of(
            &conn,
            "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name IN ('requests', 'sessions', 'meta')",
        );
        assert_eq!(core_tables, 3);

        let schema_version = text_of(
            &conn,
            "SELECT value FROM meta WHERE key = 'schema_version'",
        );
        assert_eq!(schema_version, "5");

        let binary_version = text_of(&conn, "SELECT value FROM meta WHERE key = 'mcpr_version'");
        assert_eq!(binary_version, "0.3.0-test");
    }

    /// Running the migrations twice succeeds and the second call's binary
    /// version wins.
    #[test]
    fn run_migrations__idempotent() {
        let (_dir, conn) = migrated_db("0.3.0");
        run_migrations(&conn, "0.3.1").unwrap();

        let binary_version = text_of(&conn, "SELECT value FROM meta WHERE key = 'mcpr_version'");
        assert_eq!(binary_version, "0.3.1");
    }

    /// After migration, `server_schema` and `schema_changes` accept a `proxy`
    /// column, and two proxies may share one upstream URL and method.
    #[test]
    fn run_migrations__v3_adds_proxy() {
        let (_dir, conn) = migrated_db("test");

        exec(
            &conn,
            "INSERT INTO server_schema (proxy, upstream_url, method, payload, captured_at, schema_hash)
             VALUES ('search', 'http://localhost:9000', 'tools/list', '{}', 1000, 'abc')",
        );
        let schema_proxy = text_of(
            &conn,
            "SELECT proxy FROM server_schema WHERE upstream_url = 'http://localhost:9000'",
        );
        assert_eq!(schema_proxy, "search");

        exec(
            &conn,
            "INSERT INTO schema_changes (proxy, upstream_url, method, change_type, detected_at)
             VALUES ('search', 'http://localhost:9000', 'tools/list', 'initial', 1000)",
        );
        let change_proxy = text_of(
            &conn,
            "SELECT proxy FROM schema_changes WHERE upstream_url = 'http://localhost:9000'",
        );
        assert_eq!(change_proxy, "search");

        // Same upstream URL and method, different proxy: must be a second row.
        exec(
            &conn,
            "INSERT INTO server_schema (proxy, upstream_url, method, payload, captured_at, schema_hash)
             VALUES ('email', 'http://localhost:9000', 'tools/list', '{}', 2000, 'def')",
        );
        let rows = int_of(&conn, "SELECT COUNT(*) FROM server_schema");
        assert_eq!(rows, 2);
    }

    /// After migration, `requests` exposes a `latency_us` column that
    /// round-trips an inserted value.
    #[test]
    fn run_migrations__v4_renames_latency() {
        let (_dir, conn) = migrated_db("test");

        exec(
            &conn,
            "INSERT INTO requests (request_id, ts, proxy, method, latency_us, status)
             VALUES ('r1', 1000, 'api', 'tools/call', 142000, 'ok')",
        );

        let latency = int_of(
            &conn,
            "SELECT latency_us FROM requests WHERE request_id = 'r1'",
        );
        assert_eq!(latency, 142_000);
    }

    /// Rows written with `latency_ms` before V4 come out of the V4 script
    /// with the value multiplied by 1000 in `latency_us`.
    #[test]
    fn run_migrations__v4_converts_ms_to_us() {
        let (_dir, conn) = scratch_db();

        // Build a V3-shaped database by hand so pre-V4 rows can be inserted.
        apply_scripts(
            &conn,
            &[
                schema::V1_SCHEMA,
                schema::V1_META_SEED,
                schema::V2_SCHEMA,
                schema::V3_SCHEMA,
            ],
        );

        exec(
            &conn,
            "INSERT INTO requests (request_id, ts, proxy, method, latency_ms, status)
             VALUES ('r1', 1000, 'api', 'tools/call', 42, 'ok')",
        );
        exec(
            &conn,
            "INSERT INTO requests (request_id, ts, proxy, method, latency_ms, status)
             VALUES ('r2', 2000, 'api', 'tools/call', 1500, 'ok')",
        );

        apply_scripts(&conn, &[schema::V4_SCHEMA]);

        let first = int_of(
            &conn,
            "SELECT latency_us FROM requests WHERE request_id = 'r1'",
        );
        assert_eq!(first, 42_000);

        let second = int_of(
            &conn,
            "SELECT latency_us FROM requests WHERE request_id = 'r2'",
        );
        assert_eq!(second, 1_500_000);
    }

    /// The `idx_requests_slow` index definition references `latency_us`
    /// once all migrations have run.
    #[test]
    fn run_migrations__v4_rebuilds_slow_index() {
        let (_dir, conn) = migrated_db("test");

        let index_sql = text_of(
            &conn,
            "SELECT sql FROM sqlite_master WHERE type = 'index' AND name = 'idx_requests_slow'",
        );
        assert!(index_sql.contains("latency_us"));
    }

    /// A `server_schema` row inserted before V3 gets the proxy value
    /// `default` once the V3 script has run.
    #[test]
    fn run_migrations__v3_defaults_proxy() {
        let (_dir, conn) = scratch_db();

        // Stop at V2 so the row below is written without a `proxy` column.
        apply_scripts(
            &conn,
            &[schema::V1_SCHEMA, schema::V1_META_SEED, schema::V2_SCHEMA],
        );

        exec(
            &conn,
            "INSERT INTO server_schema (upstream_url, method, payload, captured_at, schema_hash)
             VALUES ('http://localhost:9000', 'tools/list', '{}', 1000, 'abc')",
        );

        apply_scripts(&conn, &[schema::V3_SCHEMA]);

        let proxy = text_of(
            &conn,
            "SELECT proxy FROM server_schema WHERE upstream_url = 'http://localhost:9000'",
        );
        assert_eq!(proxy, "default");
    }
}
353}