Skip to main content

cartog_db/store/
lifecycle.rs

//! Database open/create/migrate lifecycle and connection setup.
//!
//! Part of the [`Database`](super::Database) impl, split out of `lib.rs` for navigability.
4
5use super::*;
6
7impl Database {
8    /// Open or create the database at the given path.
9    ///
10    /// `embedding_dim` sets the vector dimension for the sqlite-vec table.
11    /// If the stored dimension differs from the requested one, the vector index
12    /// is cleared and recreated (a re-index via `cartog rag index` is needed).
13    pub fn open(path: impl AsRef<std::path::Path>, embedding_dim: usize) -> DbResult<Self> {
14        register_sqlite_vec();
15        let db_path = path.as_ref();
16        // SQLite::open fails on a missing parent tree, so materialize `.cartog/`.
17        if let Some(parent) = db_path.parent() {
18            if !parent.as_os_str().is_empty() {
19                std::fs::create_dir_all(parent).map_err(|source| DbError::PrepareDir {
20                    path: parent.to_path_buf(),
21                    source,
22                })?;
23            }
24        }
25        let conn = Connection::open(db_path).map_err(|source| DbError::Open {
26            path: db_path.to_path_buf(),
27            source,
28        })?;
29        conn.execute_batch(&format!(
30            "PRAGMA journal_mode=WAL;
31             PRAGMA busy_timeout={BUSY_TIMEOUT_MS};
32             PRAGMA foreign_keys=ON;
33             PRAGMA synchronous=NORMAL;
34             PRAGMA cache_size=-65536;
35             PRAGMA temp_store=MEMORY;
36             PRAGMA mmap_size=268435456;"
37        ))
38        .map_err(DbError::Pragma)?;
39        conn.execute_batch(SCHEMA).map_err(DbError::Schema)?;
40        conn.execute_batch(RAG_SCHEMA).map_err(DbError::RagSchema)?;
41        backup_before_destructive_migration(&conn, db_path)?;
42        migrate(&conn);
43        // Partial index requires resolution_state (added in migration 3→4),
44        // so create it after migrate() rather than from SCHEMA.
45        conn.execute_batch(
46            "CREATE INDEX IF NOT EXISTS idx_edges_unresolved
47                 ON edges(file_path) WHERE resolution_state = 0",
48        )
49        .map_err(DbError::Schema)?;
50        handle_embedding_dimension(&conn, embedding_dim).map_err(DbError::EmbeddingDimension)?;
51        Ok(Self { conn, pinned: None })
52    }
53
54    /// Open an existing on-disk database in **read-write** mode without
55    /// running schema migrations or the embedding-fingerprint reconcile.
56    /// Used by the Phase 5 promoter: a secondary that detected its primary
57    /// died and validated the on-disk schema/fingerprint against its
58    /// pinned snapshot before claiming the slot. Re-running the migration
59    /// would re-trigger the SQLITE_BUSY race that the election was meant
60    /// to prevent.
61    ///
62    /// Verifies that `schema_version` still matches `SCHEMA_VERSION` to
63    /// guard against a race where another writer upgraded the schema
64    /// between the secondary's attach and this promotion. Returns
65    /// [`DbError::SchemaDrift`] in that case so the promoter aborts and
66    /// the MCP process exits cleanly.
67    pub fn open_existing_rw(path: impl AsRef<std::path::Path>) -> DbResult<Self> {
68        register_sqlite_vec();
69        let db_path = path.as_ref();
70        let conn = Connection::open(db_path).map_err(|source| DbError::Open {
71            path: db_path.to_path_buf(),
72            source,
73        })?;
74        conn.execute_batch(&format!(
75            "PRAGMA journal_mode=WAL;
76             PRAGMA busy_timeout={BUSY_TIMEOUT_MS};
77             PRAGMA foreign_keys=ON;
78             PRAGMA synchronous=NORMAL;
79             PRAGMA cache_size=-65536;
80             PRAGMA temp_store=MEMORY;
81             PRAGMA mmap_size=268435456;"
82        ))
83        .map_err(DbError::Pragma)?;
84
85        let stored_schema = read_schema_version(&conn)?;
86        if stored_schema != SCHEMA_VERSION {
87            return Err(DbError::SchemaDrift {
88                expected: SCHEMA_VERSION,
89                stored: stored_schema,
90            });
91        }
92
93        Ok(Self { conn, pinned: None })
94    }
95
96    /// Open an existing on-disk database in **read-only** mode for a
97    /// secondary cartog process (Phase 4 read-only attach). Skips schema
98    /// migrations and the embedding-fingerprint reconcile — the primary
99    /// writer owns those.
100    ///
101    /// Behaviour:
102    /// - Opens with `SQLITE_OPEN_READ_ONLY` so write attempts surface as
103    ///   `SQLITE_READONLY` errors at runtime (a defense-in-depth backup
104    ///   for the higher-level tool gating).
105    /// - Reads the `metadata` snapshot (schema version + embedding
106    ///   fingerprint) and stores it on the returned [`Database`] so the
107    ///   promoter (Phase 5) can compare against the on-disk values later.
108    /// - Returns [`DbError::SchemaDrift`] if the stored `schema_version`
109    ///   doesn't match this binary's expected version — the primary
110    ///   upgraded cartog underneath us and queries can't be trusted.
111    pub fn open_readonly(path: impl AsRef<std::path::Path>) -> DbResult<Self> {
112        use rusqlite::OpenFlags;
113        register_sqlite_vec();
114        let db_path = path.as_ref();
115        let conn = Connection::open_with_flags(
116            db_path,
117            OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
118        )
119        .map_err(|source| DbError::Open {
120            path: db_path.to_path_buf(),
121            source,
122        })?;
123        // busy_timeout is still useful: a long read can stall against a
124        // writer mid-checkpoint. WAL keeps readers and writers from
125        // blocking otherwise, but the timeout makes the bound explicit.
126        conn.execute_batch(&format!("PRAGMA busy_timeout={BUSY_TIMEOUT_MS};"))
127            .map_err(DbError::Pragma)?;
128
129        let stored_schema = read_schema_version(&conn)?;
130        if stored_schema != SCHEMA_VERSION {
131            return Err(DbError::SchemaDrift {
132                expected: SCHEMA_VERSION,
133                stored: stored_schema,
134            });
135        }
136
137        let stored_provider: Option<String> = conn
138            .query_row(
139                "SELECT value FROM metadata WHERE key = ?1",
140                params![EMBED_PROVIDER_KEY],
141                |row| row.get(0),
142            )
143            .optional()
144            .map_err(DbError::Sqlite)?;
145        let stored_model: Option<String> = conn
146            .query_row(
147                "SELECT value FROM metadata WHERE key = ?1",
148                params![EMBED_MODEL_KEY],
149                |row| row.get(0),
150            )
151            .optional()
152            .map_err(DbError::Sqlite)?;
153        let stored_dim: Option<usize> = conn
154            .query_row(
155                "SELECT CAST(value AS INTEGER) FROM metadata WHERE key = 'embedding_dimension'",
156                [],
157                |row| row.get::<_, i64>(0).map(|v| v as usize),
158            )
159            .optional()
160            .map_err(DbError::Sqlite)?;
161        // Embedding fingerprint is recorded together (Phase 6b backfill).
162        // If any field is missing the fingerprint is "unknown" — readers
163        // can still serve graph queries, just can't validate against a
164        // promoter swap later.
165        let embedding = match (stored_provider, stored_model, stored_dim) {
166            (Some(provider), Some(model), Some(dimension)) => Some(EmbeddingFingerprint {
167                provider,
168                model,
169                dimension,
170            }),
171            _ => None,
172        };
173
174        Ok(Self {
175            conn,
176            pinned: Some(PinnedAttach {
177                schema_version: stored_schema,
178                embedding,
179            }),
180        })
181    }
182
183    /// Open an in-memory database (for tests and benchmarks).
184    #[doc(hidden)]
185    pub fn open_memory() -> DbResult<Self> {
186        register_sqlite_vec();
187        let conn = Connection::open_in_memory()?;
188        conn.execute_batch("PRAGMA foreign_keys=ON;")
189            .map_err(DbError::Pragma)?;
190        conn.execute_batch(SCHEMA).map_err(DbError::Schema)?;
191        conn.execute_batch(RAG_SCHEMA).map_err(DbError::RagSchema)?;
192        conn.execute_batch(&rag_vec_schema(DEFAULT_EMBEDDING_DIM))
193            .map_err(DbError::RagSchema)?;
194        migrate(&conn);
195        conn.execute_batch(
196            "CREATE INDEX IF NOT EXISTS idx_edges_unresolved
197                 ON edges(file_path) WHERE resolution_state = 0",
198        )
199        .map_err(DbError::Schema)?;
200        Ok(Self { conn, pinned: None })
201    }
202
203    /// True when this `Database` was opened via [`Self::open_readonly`].
204    /// MCP tool gating (Phase 4) consults this to refuse the 2 write tools.
205    pub fn is_read_only(&self) -> bool {
206        self.pinned.is_some()
207    }
208
209    /// Snapshot captured at attach time when [`Self::open_readonly`] was
210    /// used. `None` for read-write opens.
211    pub fn pinned_attach(&self) -> Option<&PinnedAttach> {
212        self.pinned.as_ref()
213    }
214
215    /// Cap the number of pages this DB connection can hold.
216    ///
217    /// Intended for tests that need to force a `SQLITE_FULL` error on a
218    /// subsequent write (for example, to verify that a transaction rolls back
219    /// cleanly). Production code should never call this.
220    #[doc(hidden)]
221    pub fn set_max_page_count_for_tests(&self, pages: u32) -> Result<()> {
222        self.conn
223            .execute_batch(&format!("PRAGMA max_page_count = {pages}"))?;
224        Ok(())
225    }
226
227    /// Open a single SQLite transaction that the caller is expected to wrap
228    /// around a multi-step indexing pipeline.
229    ///
230    /// Drop without `commit()` rolls back, so a panic mid-pipeline leaves the
231    /// DB in its prior state.
232    ///
233    /// # Calling conventions inside the transaction
234    ///
235    /// Helpers fall into two categories:
236    ///
237    /// 1. **Batched writers must use the `_in_tx` variant.** Their non-`_in_tx`
238    ///    wrapper issues its own `BEGIN` and would error out at runtime
239    ///    (`cannot start a transaction within a transaction`). Examples:
240    ///    [`Self::insert_symbols_in_tx`], [`Self::delete_symbols_in_tx`],
241    ///    [`Self::insert_edges_in_tx`], [`Self::insert_symbol_contents_in_tx`],
242    ///    [`Self::clear_file_data_in_tx`], [`Self::remove_file_in_tx`],
243    ///    [`Self::resolve_edges_in_tx`], [`Self::resolve_edges_scoped_in_tx`].
244    ///
245    /// 2. **Single-statement helpers can be called directly.** They issue one
246    ///    `self.conn.execute(...)` and participate transparently in the active
247    ///    transaction. Examples used by `cartog-indexer`'s Phase 3 today:
248    ///    [`Self::upsert_file`], [`Self::clear_edges_for_file`],
249    ///    [`Self::set_metadata`], [`Self::compute_in_degrees`],
250    ///    [`Self::compute_in_degrees_scoped`], [`Self::invalidate_edges_targeting`].
251    ///    These are tagged with `// tx-safe: single statement` so the contract
252    ///    survives drive-by edits.
253    ///
254    /// # Why `unchecked_transaction` rather than [`rusqlite::Connection::transaction`]
255    ///
256    /// `transaction()` requires `&mut Connection`, which would force every
257    /// caller of `Database` to hold a mutable borrow for the entire pipeline.
258    /// `unchecked_transaction()` works through `&Connection` and produces an
259    /// equivalent [`rusqlite::Transaction`] with the same `DropBehavior::Rollback`
260    /// default — only borrow-check ergonomics differ.
261    ///
262    /// # Errors
263    ///
264    /// Returns an error if SQLite cannot begin a transaction — typically
265    /// because another transaction is already active on this connection.
266    pub fn begin_indexing_tx(&self) -> Result<rusqlite::Transaction<'_>> {
267        Ok(self.conn.unchecked_transaction()?)
268    }
269
270    /// Refresh the query planner's statistics via `PRAGMA optimize`.
271    ///
272    /// SQLite picks join order and index use from `sqlite_stat1`; without it,
273    /// the planner guesses from index shape alone and can mis-plan (the tier-2
274    /// resolution misplan in #110 was one such case). `PRAGMA optimize` runs
275    /// `ANALYZE` only on tables whose row counts have drifted since the last
276    /// analyze, so it is a cheap no-op when nothing changed — unlike a bare
277    /// `ANALYZE`, which would re-scan every index on each call and reintroduce
278    /// a per-index O(repo) cost.
279    ///
280    /// Call AFTER committing the indexing transaction, not inside it: a stats
281    /// rebuild bundled into the big write tx would bloat it. No-op-safe to call
282    /// when nothing was indexed, but the indexer skips it on no-op runs anyway.
283    pub fn optimize(&self) -> Result<()> {
284        self.conn
285            .execute_batch("PRAGMA optimize;")
286            .context("PRAGMA optimize (refresh planner statistics)")?;
287        Ok(())
288    }
289}