cartog_db/store/lifecycle.rs
1//! Database open/create/migrate lifecycle and connection setup.
2//!
3//! Part of the [`Database`](super::Database) impl, split out of `lib.rs` for navigability.
4
5use super::*;
6
7impl Database {
8 /// Open or create the database at the given path.
9 ///
10 /// `embedding_dim` sets the vector dimension for the sqlite-vec table.
11 /// If the stored dimension differs from the requested one, the vector index
12 /// is cleared and recreated (a re-index via `cartog rag index` is needed).
13 pub fn open(path: impl AsRef<std::path::Path>, embedding_dim: usize) -> DbResult<Self> {
14 register_sqlite_vec();
15 let db_path = path.as_ref();
16 // SQLite::open fails on a missing parent tree, so materialize `.cartog/`.
17 if let Some(parent) = db_path.parent() {
18 if !parent.as_os_str().is_empty() {
19 std::fs::create_dir_all(parent).map_err(|source| DbError::PrepareDir {
20 path: parent.to_path_buf(),
21 source,
22 })?;
23 }
24 }
25 let conn = Connection::open(db_path).map_err(|source| DbError::Open {
26 path: db_path.to_path_buf(),
27 source,
28 })?;
29 conn.execute_batch(&format!(
30 "PRAGMA journal_mode=WAL;
31 PRAGMA busy_timeout={BUSY_TIMEOUT_MS};
32 PRAGMA foreign_keys=ON;
33 PRAGMA synchronous=NORMAL;
34 PRAGMA cache_size=-65536;
35 PRAGMA temp_store=MEMORY;
36 PRAGMA mmap_size=268435456;"
37 ))
38 .map_err(DbError::Pragma)?;
39 conn.execute_batch(SCHEMA).map_err(DbError::Schema)?;
40 conn.execute_batch(RAG_SCHEMA).map_err(DbError::RagSchema)?;
41 backup_before_destructive_migration(&conn, db_path)?;
42 migrate(&conn);
43 // Partial index requires resolution_state (added in migration 3→4),
44 // so create it after migrate() rather than from SCHEMA.
45 conn.execute_batch(
46 "CREATE INDEX IF NOT EXISTS idx_edges_unresolved
47 ON edges(file_path) WHERE resolution_state = 0",
48 )
49 .map_err(DbError::Schema)?;
50 handle_embedding_dimension(&conn, embedding_dim).map_err(DbError::EmbeddingDimension)?;
51 Ok(Self { conn, pinned: None })
52 }
53
54 /// Open an existing on-disk database in **read-write** mode without
55 /// running schema migrations or the embedding-fingerprint reconcile.
56 /// Used by the Phase 5 promoter: a secondary that detected its primary
57 /// died and validated the on-disk schema/fingerprint against its
58 /// pinned snapshot before claiming the slot. Re-running the migration
59 /// would re-trigger the SQLITE_BUSY race that the election was meant
60 /// to prevent.
61 ///
62 /// Verifies that `schema_version` still matches `SCHEMA_VERSION` to
63 /// guard against a race where another writer upgraded the schema
64 /// between the secondary's attach and this promotion. Returns
65 /// [`DbError::SchemaDrift`] in that case so the promoter aborts and
66 /// the MCP process exits cleanly.
67 pub fn open_existing_rw(path: impl AsRef<std::path::Path>) -> DbResult<Self> {
68 register_sqlite_vec();
69 let db_path = path.as_ref();
70 let conn = Connection::open(db_path).map_err(|source| DbError::Open {
71 path: db_path.to_path_buf(),
72 source,
73 })?;
74 conn.execute_batch(&format!(
75 "PRAGMA journal_mode=WAL;
76 PRAGMA busy_timeout={BUSY_TIMEOUT_MS};
77 PRAGMA foreign_keys=ON;
78 PRAGMA synchronous=NORMAL;
79 PRAGMA cache_size=-65536;
80 PRAGMA temp_store=MEMORY;
81 PRAGMA mmap_size=268435456;"
82 ))
83 .map_err(DbError::Pragma)?;
84
85 let stored_schema = read_schema_version(&conn)?;
86 if stored_schema != SCHEMA_VERSION {
87 return Err(DbError::SchemaDrift {
88 expected: SCHEMA_VERSION,
89 stored: stored_schema,
90 });
91 }
92
93 Ok(Self { conn, pinned: None })
94 }
95
96 /// Open an existing on-disk database in **read-only** mode for a
97 /// secondary cartog process (Phase 4 read-only attach). Skips schema
98 /// migrations and the embedding-fingerprint reconcile — the primary
99 /// writer owns those.
100 ///
101 /// Behaviour:
102 /// - Opens with `SQLITE_OPEN_READ_ONLY` so write attempts surface as
103 /// `SQLITE_READONLY` errors at runtime (a defense-in-depth backup
104 /// for the higher-level tool gating).
105 /// - Reads the `metadata` snapshot (schema version + embedding
106 /// fingerprint) and stores it on the returned [`Database`] so the
107 /// promoter (Phase 5) can compare against the on-disk values later.
108 /// - Returns [`DbError::SchemaDrift`] if the stored `schema_version`
109 /// doesn't match this binary's expected version — the primary
110 /// upgraded cartog underneath us and queries can't be trusted.
111 pub fn open_readonly(path: impl AsRef<std::path::Path>) -> DbResult<Self> {
112 use rusqlite::OpenFlags;
113 register_sqlite_vec();
114 let db_path = path.as_ref();
115 let conn = Connection::open_with_flags(
116 db_path,
117 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
118 )
119 .map_err(|source| DbError::Open {
120 path: db_path.to_path_buf(),
121 source,
122 })?;
123 // busy_timeout is still useful: a long read can stall against a
124 // writer mid-checkpoint. WAL keeps readers and writers from
125 // blocking otherwise, but the timeout makes the bound explicit.
126 conn.execute_batch(&format!("PRAGMA busy_timeout={BUSY_TIMEOUT_MS};"))
127 .map_err(DbError::Pragma)?;
128
129 let stored_schema = read_schema_version(&conn)?;
130 if stored_schema != SCHEMA_VERSION {
131 return Err(DbError::SchemaDrift {
132 expected: SCHEMA_VERSION,
133 stored: stored_schema,
134 });
135 }
136
137 let stored_provider: Option<String> = conn
138 .query_row(
139 "SELECT value FROM metadata WHERE key = ?1",
140 params![EMBED_PROVIDER_KEY],
141 |row| row.get(0),
142 )
143 .optional()
144 .map_err(DbError::Sqlite)?;
145 let stored_model: Option<String> = conn
146 .query_row(
147 "SELECT value FROM metadata WHERE key = ?1",
148 params![EMBED_MODEL_KEY],
149 |row| row.get(0),
150 )
151 .optional()
152 .map_err(DbError::Sqlite)?;
153 let stored_dim: Option<usize> = conn
154 .query_row(
155 "SELECT CAST(value AS INTEGER) FROM metadata WHERE key = 'embedding_dimension'",
156 [],
157 |row| row.get::<_, i64>(0).map(|v| v as usize),
158 )
159 .optional()
160 .map_err(DbError::Sqlite)?;
161 // Embedding fingerprint is recorded together (Phase 6b backfill).
162 // If any field is missing the fingerprint is "unknown" — readers
163 // can still serve graph queries, just can't validate against a
164 // promoter swap later.
165 let embedding = match (stored_provider, stored_model, stored_dim) {
166 (Some(provider), Some(model), Some(dimension)) => Some(EmbeddingFingerprint {
167 provider,
168 model,
169 dimension,
170 }),
171 _ => None,
172 };
173
174 Ok(Self {
175 conn,
176 pinned: Some(PinnedAttach {
177 schema_version: stored_schema,
178 embedding,
179 }),
180 })
181 }
182
183 /// Open an in-memory database (for tests and benchmarks).
184 #[doc(hidden)]
185 pub fn open_memory() -> DbResult<Self> {
186 register_sqlite_vec();
187 let conn = Connection::open_in_memory()?;
188 conn.execute_batch("PRAGMA foreign_keys=ON;")
189 .map_err(DbError::Pragma)?;
190 conn.execute_batch(SCHEMA).map_err(DbError::Schema)?;
191 conn.execute_batch(RAG_SCHEMA).map_err(DbError::RagSchema)?;
192 conn.execute_batch(&rag_vec_schema(DEFAULT_EMBEDDING_DIM))
193 .map_err(DbError::RagSchema)?;
194 migrate(&conn);
195 conn.execute_batch(
196 "CREATE INDEX IF NOT EXISTS idx_edges_unresolved
197 ON edges(file_path) WHERE resolution_state = 0",
198 )
199 .map_err(DbError::Schema)?;
200 Ok(Self { conn, pinned: None })
201 }
202
203 /// True when this `Database` was opened via [`Self::open_readonly`].
204 /// MCP tool gating (Phase 4) consults this to refuse the 2 write tools.
205 pub fn is_read_only(&self) -> bool {
206 self.pinned.is_some()
207 }
208
209 /// Snapshot captured at attach time when [`Self::open_readonly`] was
210 /// used. `None` for read-write opens.
211 pub fn pinned_attach(&self) -> Option<&PinnedAttach> {
212 self.pinned.as_ref()
213 }
214
215 /// Cap the number of pages this DB connection can hold.
216 ///
217 /// Intended for tests that need to force a `SQLITE_FULL` error on a
218 /// subsequent write (for example, to verify that a transaction rolls back
219 /// cleanly). Production code should never call this.
220 #[doc(hidden)]
221 pub fn set_max_page_count_for_tests(&self, pages: u32) -> Result<()> {
222 self.conn
223 .execute_batch(&format!("PRAGMA max_page_count = {pages}"))?;
224 Ok(())
225 }
226
227 /// Open a single SQLite transaction that the caller is expected to wrap
228 /// around a multi-step indexing pipeline.
229 ///
230 /// Drop without `commit()` rolls back, so a panic mid-pipeline leaves the
231 /// DB in its prior state.
232 ///
233 /// # Calling conventions inside the transaction
234 ///
235 /// Helpers fall into two categories:
236 ///
237 /// 1. **Batched writers must use the `_in_tx` variant.** Their non-`_in_tx`
238 /// wrapper issues its own `BEGIN` and would error out at runtime
239 /// (`cannot start a transaction within a transaction`). Examples:
240 /// [`Self::insert_symbols_in_tx`], [`Self::delete_symbols_in_tx`],
241 /// [`Self::insert_edges_in_tx`], [`Self::insert_symbol_contents_in_tx`],
242 /// [`Self::clear_file_data_in_tx`], [`Self::remove_file_in_tx`],
243 /// [`Self::resolve_edges_in_tx`], [`Self::resolve_edges_scoped_in_tx`].
244 ///
245 /// 2. **Single-statement helpers can be called directly.** They issue one
246 /// `self.conn.execute(...)` and participate transparently in the active
247 /// transaction. Examples used by `cartog-indexer`'s Phase 3 today:
248 /// [`Self::upsert_file`], [`Self::clear_edges_for_file`],
249 /// [`Self::set_metadata`], [`Self::compute_in_degrees`],
250 /// [`Self::compute_in_degrees_scoped`], [`Self::invalidate_edges_targeting`].
251 /// These are tagged with `// tx-safe: single statement` so the contract
252 /// survives drive-by edits.
253 ///
254 /// # Why `unchecked_transaction` rather than [`rusqlite::Connection::transaction`]
255 ///
256 /// `transaction()` requires `&mut Connection`, which would force every
257 /// caller of `Database` to hold a mutable borrow for the entire pipeline.
258 /// `unchecked_transaction()` works through `&Connection` and produces an
259 /// equivalent [`rusqlite::Transaction`] with the same `DropBehavior::Rollback`
260 /// default — only borrow-check ergonomics differ.
261 ///
262 /// # Errors
263 ///
264 /// Returns an error if SQLite cannot begin a transaction — typically
265 /// because another transaction is already active on this connection.
266 pub fn begin_indexing_tx(&self) -> Result<rusqlite::Transaction<'_>> {
267 Ok(self.conn.unchecked_transaction()?)
268 }
269
270 /// Refresh the query planner's statistics via `PRAGMA optimize`.
271 ///
272 /// SQLite picks join order and index use from `sqlite_stat1`; without it,
273 /// the planner guesses from index shape alone and can mis-plan (the tier-2
274 /// resolution misplan in #110 was one such case). `PRAGMA optimize` runs
275 /// `ANALYZE` only on tables whose row counts have drifted since the last
276 /// analyze, so it is a cheap no-op when nothing changed — unlike a bare
277 /// `ANALYZE`, which would re-scan every index on each call and reintroduce
278 /// a per-index O(repo) cost.
279 ///
280 /// Call AFTER committing the indexing transaction, not inside it: a stats
281 /// rebuild bundled into the big write tx would bloat it. No-op-safe to call
282 /// when nothing was indexed, but the indexer skips it on no-op runs anyway.
283 pub fn optimize(&self) -> Result<()> {
284 self.conn
285 .execute_batch("PRAGMA optimize;")
286 .context("PRAGMA optimize (refresh planner statistics)")?;
287 Ok(())
288 }
289}