1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
//! Database open/create/migrate lifecycle and connection setup.
//!
//! Part of the [`Database`](super::Database) impl, split out of `lib.rs` for navigability.
use super::*;
impl Database {
/// Opens the database stored at `path`, creating it when it does not exist
/// yet, and brings its schema up to date.
///
/// `embedding_dim` is the vector dimension used for the sqlite-vec table.
/// When the dimension already recorded in the database differs from the
/// requested one, the vector index is cleared and recreated, so a re-index
/// via `cartog rag index` is needed afterwards.
///
/// # Errors
///
/// Each setup stage reports its failure through a dedicated [`DbError`]
/// variant: `PrepareDir`, `Open`, `Pragma`, `Schema`, `RagSchema` or
/// `EmbeddingDimension`. An error returned by
/// `backup_before_destructive_migration` is propagated with `?`.
pub fn open(path: impl AsRef<std::path::Path>, embedding_dim: usize) -> DbResult<Self> {
    register_sqlite_vec();
    let db_path = path.as_ref();

    // SQLite::open fails on a missing parent tree, so materialize `.cartog/`.
    // A bare file name yields an empty parent, which needs no directory.
    let parent_dir = db_path.parent().filter(|dir| !dir.as_os_str().is_empty());
    if let Some(dir) = parent_dir {
        std::fs::create_dir_all(dir).map_err(|source| DbError::PrepareDir {
            path: dir.to_path_buf(),
            source,
        })?;
    }

    let conn = match Connection::open(db_path) {
        Ok(conn) => conn,
        Err(source) => {
            return Err(DbError::Open {
                path: db_path.to_path_buf(),
                source,
            })
        }
    };

    // Connection-level tuning, applied before any schema statement runs.
    let pragmas = format!(
        "PRAGMA journal_mode=WAL;
PRAGMA busy_timeout={BUSY_TIMEOUT_MS};
PRAGMA foreign_keys=ON;
PRAGMA synchronous=NORMAL;
PRAGMA cache_size=-65536;
PRAGMA temp_store=MEMORY;
PRAGMA mmap_size=268435456;"
    );
    conn.execute_batch(&pragmas).map_err(DbError::Pragma)?;

    conn.execute_batch(SCHEMA).map_err(DbError::Schema)?;
    conn.execute_batch(RAG_SCHEMA).map_err(DbError::RagSchema)?;
    backup_before_destructive_migration(&conn, db_path)?;
    migrate(&conn);

    // Partial index requires resolution_state (added in migration 3→4),
    // so create it after migrate() rather than from SCHEMA.
    let unresolved_index_sql = "CREATE INDEX IF NOT EXISTS idx_edges_unresolved
ON edges(file_path) WHERE resolution_state = 0";
    conn.execute_batch(unresolved_index_sql)
        .map_err(DbError::Schema)?;

    handle_embedding_dimension(&conn, embedding_dim).map_err(DbError::EmbeddingDimension)?;
    Ok(Self { pinned: None, conn })
}
/// Open an existing on-disk database in **read-write** mode without
/// running schema migrations or the embedding-fingerprint reconcile.
/// Used by the Phase 5 promoter: a secondary that detected its primary
/// died and validated the on-disk schema/fingerprint against its
/// pinned snapshot before claiming the slot. Re-running the migration
/// would re-trigger the SQLITE_BUSY race that the election was meant
/// to prevent.
///
/// Verifies that `schema_version` still matches `SCHEMA_VERSION` to
/// guard against a race where another writer upgraded the schema
/// between the secondary's attach and this promotion. Returns
/// [`DbError::SchemaDrift`] in that case so the promoter aborts and
/// the MCP process exits cleanly.
pub fn open_existing_rw(path: impl AsRef<std::path::Path>) -> DbResult<Self> {
register_sqlite_vec();
let db_path = path.as_ref();
let conn = Connection::open(db_path).map_err(|source| DbError::Open {
path: db_path.to_path_buf(),
source,
})?;
conn.execute_batch(&format!(
"PRAGMA journal_mode=WAL;
PRAGMA busy_timeout={BUSY_TIMEOUT_MS};
PRAGMA foreign_keys=ON;
PRAGMA synchronous=NORMAL;
PRAGMA cache_size=-65536;
PRAGMA temp_store=MEMORY;
PRAGMA mmap_size=268435456;"
))
.map_err(DbError::Pragma)?;
let stored_schema = read_schema_version(&conn)?;
if stored_schema != SCHEMA_VERSION {
return Err(DbError::SchemaDrift {
expected: SCHEMA_VERSION,
stored: stored_schema,
});
}
Ok(Self { conn, pinned: None })
}
/// Open an existing on-disk database in **read-only** mode for a
/// secondary cartog process (Phase 4 read-only attach). Skips schema
/// migrations and the embedding-fingerprint reconcile — the primary
/// writer owns those.
///
/// Behaviour:
/// - Opens with `SQLITE_OPEN_READ_ONLY` so write attempts surface as
///   `SQLITE_READONLY` errors at runtime (a defense-in-depth backup
///   for the higher-level tool gating).
/// - Reads the `metadata` snapshot (schema version + embedding
///   fingerprint) and stores it on the returned [`Database`] so the
///   promoter (Phase 5) can compare against the on-disk values later.
/// - Returns [`DbError::SchemaDrift`] if the stored `schema_version`
///   doesn't match this binary's expected version — the primary
///   upgraded cartog underneath us and queries can't be trusted.
///
/// # Errors
///
/// - [`DbError::Open`] when the database cannot be opened.
/// - [`DbError::Pragma`] when `busy_timeout` cannot be set.
/// - [`DbError::SchemaDrift`] on a schema version mismatch.
/// - [`DbError::Sqlite`] when a metadata lookup fails, including a
///   stored embedding dimension that does not fit in `usize` (for
///   example a negative value).
/// - Any error produced by `read_schema_version` is propagated.
pub fn open_readonly(path: impl AsRef<std::path::Path>) -> DbResult<Self> {
    use rusqlite::OpenFlags;
    register_sqlite_vec();
    let db_path = path.as_ref();
    let conn = Connection::open_with_flags(
        db_path,
        OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
    )
    .map_err(|source| DbError::Open {
        path: db_path.to_path_buf(),
        source,
    })?;
    // busy_timeout is still useful: a long read can stall against a
    // writer mid-checkpoint. WAL keeps readers and writers from
    // blocking otherwise, but the timeout makes the bound explicit.
    conn.execute_batch(&format!("PRAGMA busy_timeout={BUSY_TIMEOUT_MS};"))
        .map_err(DbError::Pragma)?;
    let stored_schema = read_schema_version(&conn)?;
    if stored_schema != SCHEMA_VERSION {
        return Err(DbError::SchemaDrift {
            expected: SCHEMA_VERSION,
            stored: stored_schema,
        });
    }
    // Each lookup yields `None` when the key is absent (`optional()`),
    // which is what lets a partially recorded fingerprint degrade to
    // "unknown" below instead of failing the attach.
    let stored_provider: Option<String> = conn
        .query_row(
            "SELECT value FROM metadata WHERE key = ?1",
            params![EMBED_PROVIDER_KEY],
            |row| row.get(0),
        )
        .optional()
        .map_err(DbError::Sqlite)?;
    let stored_model: Option<String> = conn
        .query_row(
            "SELECT value FROM metadata WHERE key = ?1",
            params![EMBED_MODEL_KEY],
            |row| row.get(0),
        )
        .optional()
        .map_err(DbError::Sqlite)?;
    let stored_dim: Option<usize> = conn
        .query_row(
            "SELECT CAST(value AS INTEGER) FROM metadata WHERE key = 'embedding_dimension'",
            [],
            // Decode directly as `usize` so rusqlite range-checks the
            // integer. An `as` cast from `i64` would silently wrap a
            // negative value into a huge, bogus dimension.
            |row| row.get::<_, usize>(0),
        )
        .optional()
        .map_err(DbError::Sqlite)?;
    // Embedding fingerprint is recorded together (Phase 6b backfill).
    // If any field is missing the fingerprint is "unknown" — readers
    // can still serve graph queries, just can't validate against a
    // promoter swap later.
    let embedding = match (stored_provider, stored_model, stored_dim) {
        (Some(provider), Some(model), Some(dimension)) => Some(EmbeddingFingerprint {
            provider,
            model,
            dimension,
        }),
        _ => None,
    };
    Ok(Self {
        conn,
        pinned: Some(PinnedAttach {
            schema_version: stored_schema,
            embedding,
        }),
    })
}
/// Builds a fresh in-memory database (for tests and benchmarks).
///
/// Applies `SCHEMA`, `RAG_SCHEMA` and the schema produced by
/// `rag_vec_schema(DEFAULT_EMBEDDING_DIM)`, runs `migrate`, and finally
/// creates the partial index over unresolved edges.
///
/// # Errors
///
/// Returns `DbError::Pragma`, `DbError::Schema` or `DbError::RagSchema`
/// for the stage that failed; a failure to open the in-memory connection
/// is converted with `?`.
#[doc(hidden)]
pub fn open_memory() -> DbResult<Self> {
    register_sqlite_vec();
    let conn = Connection::open_in_memory()?;

    conn.execute_batch("PRAGMA foreign_keys=ON;")
        .map_err(DbError::Pragma)?;

    conn.execute_batch(SCHEMA).map_err(DbError::Schema)?;
    conn.execute_batch(RAG_SCHEMA).map_err(DbError::RagSchema)?;
    let vec_schema = rag_vec_schema(DEFAULT_EMBEDDING_DIM);
    conn.execute_batch(&vec_schema).map_err(DbError::RagSchema)?;

    migrate(&conn);

    // Created after `migrate` because the index predicate references
    // `resolution_state`.
    let unresolved_index_sql = "CREATE INDEX IF NOT EXISTS idx_edges_unresolved
ON edges(file_path) WHERE resolution_state = 0";
    conn.execute_batch(unresolved_index_sql)
        .map_err(DbError::Schema)?;

    Ok(Self { pinned: None, conn })
}
/// Reports whether this `Database` came from [`Self::open_readonly`],
/// which is the only constructor here that records a pinned attach
/// snapshot.
/// MCP tool gating (Phase 4) consults this to refuse the 2 write tools.
pub fn is_read_only(&self) -> bool {
    self.pinned_attach().is_some()
}
/// Snapshot captured at attach time when [`Self::open_readonly`] was
/// used. `None` for read-write opens.
pub fn pinned_attach(&self) -> Option<&PinnedAttach> {
self.pinned.as_ref()
}
/// Limits how many pages this DB connection may hold by issuing
/// `PRAGMA max_page_count`.
///
/// Exists for tests that must provoke a `SQLITE_FULL` error on a later
/// write (for example, to check that a transaction rolls back cleanly).
/// Production code should never call this.
#[doc(hidden)]
pub fn set_max_page_count_for_tests(&self, pages: u32) -> Result<()> {
    let pragma = format!("PRAGMA max_page_count = {pages}");
    self.conn.execute_batch(&pragma)?;
    Ok(())
}
/// Starts the one SQLite transaction that the caller is expected to wrap
/// around a multi-step indexing pipeline.
///
/// Dropping the returned transaction without `commit()` rolls it back, so
/// a panic part-way through the pipeline leaves the DB in its prior state.
///
/// # What may be called while the transaction is open
///
/// * **Batched writers: only the `_in_tx` variant.** The plain wrapper
///   issues its own `BEGIN` and would fail at runtime
///   (`cannot start a transaction within a transaction`). Examples:
///   [`Self::insert_symbols_in_tx`], [`Self::delete_symbols_in_tx`],
///   [`Self::insert_edges_in_tx`], [`Self::insert_symbol_contents_in_tx`],
///   [`Self::clear_file_data_in_tx`], [`Self::remove_file_in_tx`],
///   [`Self::resolve_edges_in_tx`], [`Self::resolve_edges_scoped_in_tx`].
///
/// * **Single-statement helpers: call them directly.** Each issues one
///   `self.conn.execute(...)` and therefore joins the active transaction
///   transparently. Examples used by `cartog-indexer`'s Phase 3 today:
///   [`Self::upsert_file`], [`Self::clear_edges_for_file`],
///   [`Self::set_metadata`], [`Self::compute_in_degrees`],
///   [`Self::compute_in_degrees_scoped`], [`Self::invalidate_edges_targeting`].
///   They carry a `// tx-safe: single statement` tag so the contract
///   survives drive-by edits.
///
/// # Why `unchecked_transaction` rather than [`rusqlite::Connection::transaction`]
///
/// `transaction()` needs `&mut Connection`, which would force every caller
/// of `Database` to hold a mutable borrow for the whole pipeline.
/// `unchecked_transaction()` goes through `&Connection` and yields an
/// equivalent [`rusqlite::Transaction`] with the same
/// `DropBehavior::Rollback` default — only borrow-check ergonomics differ.
///
/// # Errors
///
/// Fails when SQLite cannot begin a transaction — typically because
/// another transaction is already active on this connection.
pub fn begin_indexing_tx(&self) -> Result<rusqlite::Transaction<'_>> {
    let tx = self.conn.unchecked_transaction()?;
    Ok(tx)
}
/// Runs `PRAGMA optimize` to refresh the query planner's statistics.
///
/// SQLite chooses join order and index use from `sqlite_stat1`; without
/// it the planner guesses from index shape alone and can mis-plan (the
/// tier-2 resolution misplan in #110 was one such case). `PRAGMA optimize`
/// runs `ANALYZE` only on tables whose row counts have drifted since the
/// last analyze, which makes it a cheap no-op when nothing changed. A bare
/// `ANALYZE` would instead re-scan every index on each call and
/// reintroduce a per-index O(repo) cost.
///
/// Call this AFTER committing the indexing transaction, not inside it: a
/// stats rebuild bundled into the big write tx would bloat it. It is
/// no-op-safe when nothing was indexed, though the indexer skips it on
/// no-op runs anyway.
///
/// # Errors
///
/// Returns the underlying failure with the context
/// `PRAGMA optimize (refresh planner statistics)` attached.
pub fn optimize(&self) -> Result<()> {
    let outcome = self.conn.execute_batch("PRAGMA optimize;");
    outcome.context("PRAGMA optimize (refresh planner statistics)")?;
    Ok(())
}
}