Skip to main content

cartog_db/store/
crud.rs

//! Metadata, file, symbol, and edge writes (the core CRUD surface).
//!
//! Part of the [`Database`](super::Database) impl, split out of `lib.rs` for navigability.
4
5use super::*;
6
7impl Database {
8    // ── Metadata ──
9
10    /// Retrieve a metadata value by key.
11    pub fn get_metadata(&self, key: &str) -> Result<Option<String>> {
12        self.conn
13            .query_row(
14                "SELECT value FROM metadata WHERE key = ?1",
15                params![key],
16                |row| row.get(0),
17            )
18            .optional()
19            .context("Failed to query metadata")
20    }
21
22    /// Store a metadata key-value pair (upserts on conflict).
23    ///
24    /// tx-safe: single statement — see [`Self::begin_indexing_tx`].
25    pub fn set_metadata(&self, key: &str, value: &str) -> Result<()> {
26        self.conn.execute(
27            "INSERT OR REPLACE INTO metadata (key, value) VALUES (?1, ?2)",
28            params![key, value],
29        )?;
30        Ok(())
31    }
32
33    /// Reconcile the stored embedding fingerprint with the one currently in
34    /// use. Call right after `Database::open` from any code path that owns
35    /// an `EmbeddingProvider` (indexer, watcher, MCP serve, `rag index`).
36    ///
37    /// Behavior:
38    /// - All three fields (provider, model, dimension) match stored → no-op,
39    ///   zero writes.
40    /// - Any field differs → drop `symbol_vec`, clear `symbol_embedding_map`,
41    ///   recreate the vector table at the new dimension, update all three
42    ///   metadata keys. The user must run `cartog rag index` to repopulate.
43    /// - DB has dimension but no provider/model (older cartog versions) →
44    ///   backfill provider+model without wiping. The stored vectors stay
45    ///   valid against whatever stack produced them; we just record the
46    ///   identity going forward.
47    ///
48    /// Writes use `retry_busy` so a concurrent writer on the same DB does
49    /// not crash this caller with `SQLITE_BUSY`.
50    pub fn reconcile_embedding_fingerprint(&self, fp: &EmbeddingFingerprint) -> Result<()> {
51        let stored_provider: Option<String> = self.get_metadata(EMBED_PROVIDER_KEY)?;
52        let stored_model: Option<String> = self.get_metadata(EMBED_MODEL_KEY)?;
53        let stored_dim: Option<usize> = self
54            .conn
55            .query_row(
56                "SELECT CAST(value AS INTEGER) FROM metadata WHERE key = 'embedding_dimension'",
57                [],
58                |row| row.get::<_, i64>(0).map(|v| v as usize),
59            )
60            .optional()
61            .context("Failed to query embedding dimension")?;
62
63        // Full match AND `symbol_vec` actually exists on disk: zero writes.
64        // The dim+table pair is the real invariant; checking metadata
65        // alone misses the case where a previous open crashed mid-
66        // migration and left the DB without `symbol_vec` while metadata
67        // still claims a fingerprint.
68        if stored_provider.as_deref() == Some(fp.provider.as_str())
69            && stored_model.as_deref() == Some(fp.model.as_str())
70            && stored_dim == Some(fp.dimension)
71            && symbol_vec_exists(&self.conn)?
72        {
73            return Ok(());
74        }
75
76        // Backwards-compat: stored has dim from an older cartog (no provider/model
77        // recorded yet). Treat first-time-with-provider as a backfill, not an
78        // invalidation. Embeddings produced by the previous run are still valid
79        // against whatever stack the user had configured then.
80        let dim_matches = stored_dim == Some(fp.dimension);
81        let is_backfill = dim_matches && stored_provider.is_none() && stored_model.is_none();
82
83        if !is_backfill {
84            tracing::warn!(
85                old_provider = ?stored_provider,
86                old_model = ?stored_model,
87                old_dim = ?stored_dim,
88                new_provider = %fp.provider,
89                new_model = %fp.model,
90                new_dim = fp.dimension,
91                "Embedding fingerprint changed — clearing vector index. Run `cartog rag index` to re-embed."
92            );
93        }
94
95        // Wrap the whole sequence in a transaction so a mid-sequence
96        // failure (e.g. busy-retry exhausted on the third metadata write)
97        // rolls back atomically. Otherwise the next open could see
98        // partial state — e.g. provider/model match but dimension stale,
99        // or symbol_vec dropped but metadata still pointing at the old
100        // dim — and either skip migration or silently re-wipe.
101        let schema = rag_vec_schema(fp.dimension);
102        let do_wipe = !is_backfill;
103        retry_busy(|| {
104            let tx = self.conn.unchecked_transaction()?;
105            if do_wipe {
106                tx.execute("DROP TABLE IF EXISTS symbol_vec", [])?;
107                tx.execute("DELETE FROM symbol_embedding_map", [])?;
108            }
109            tx.execute_batch(&schema)?;
110            tx.execute(
111                "INSERT OR REPLACE INTO metadata (key, value) VALUES (?1, ?2)",
112                params![EMBED_PROVIDER_KEY, fp.provider],
113            )?;
114            tx.execute(
115                "INSERT OR REPLACE INTO metadata (key, value) VALUES (?1, ?2)",
116                params![EMBED_MODEL_KEY, fp.model],
117            )?;
118            #[cfg(test)]
119            if RECONCILE_FAIL_AFTER_MODEL
120                .with(|b| b.swap(false, std::sync::atomic::Ordering::SeqCst))
121            {
122                return Err(rusqlite::Error::SqliteFailure(
123                    rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_FULL),
124                    Some("injected mid-sequence failure".into()),
125                ));
126            }
127            tx.execute(
128                "INSERT OR REPLACE INTO metadata (key, value) VALUES ('embedding_dimension', ?1)",
129                params![fp.dimension.to_string()],
130            )?;
131            tx.commit()
132        })
133        .map_err(|e| anyhow::anyhow!("failed to reconcile embedding fingerprint: {e}"))?;
134
135        Ok(())
136    }
137
138    // ── Files ──
139
140    /// Insert or update file metadata.
141    ///
142    /// tx-safe: single statement — see [`Self::begin_indexing_tx`].
143    pub fn upsert_file(&self, file: &FileInfo) -> Result<()> {
144        self.conn.execute(
145            "INSERT OR REPLACE INTO files (path, last_modified, hash, language, num_symbols)
146             VALUES (?1, ?2, ?3, ?4, ?5)",
147            params![
148                file.path,
149                file.last_modified,
150                file.hash,
151                file.language,
152                file.num_symbols,
153            ],
154        )?;
155        Ok(())
156    }
157
158    /// Look up stored metadata for a file.
159    pub fn get_file(&self, path: &str) -> Result<Option<FileInfo>> {
160        self.conn
161            .query_row(
162                "SELECT path, last_modified, hash, language, num_symbols FROM files WHERE path = ?1",
163                params![path],
164                |row| {
165                    Ok(FileInfo {
166                        path: row.get(0)?,
167                        last_modified: row.get(1)?,
168                        hash: row.get(2)?,
169                        language: row.get(3)?,
170                        num_symbols: row.get(4)?,
171                    })
172                },
173            )
174            .optional()
175            .context("Failed to query file")
176    }
177
178    /// Remove edges only for a file (used by Merkle diff which updates symbols surgically).
179    ///
180    /// tx-safe: single statement — see [`Self::begin_indexing_tx`].
181    pub fn clear_edges_for_file(&self, path: &str) -> Result<()> {
182        self.conn
183            .execute("DELETE FROM edges WHERE file_path = ?1", params![path])?;
184        Ok(())
185    }
186
187    /// Remove all symbols, edges, and RAG data for a file (before re-indexing it).
188    pub fn clear_file_data(&self, path: &str) -> Result<()> {
189        let tx = self.conn.unchecked_transaction()?;
190        self.clear_file_data_in_tx(path)?;
191        tx.commit()?;
192        Ok(())
193    }
194
195    /// Like [`Self::clear_file_data`] but assumes the caller already holds an
196    /// open transaction. Used by `cartog-indexer` to wrap the entire Phase 3
197    /// pipeline atomically.
198    pub fn clear_file_data_in_tx(&self, path: &str) -> Result<()> {
199        self.clear_rag_data_for_file(path)?;
200        self.conn
201            .execute("DELETE FROM edges WHERE file_path = ?1", params![path])?;
202        self.conn
203            .execute("DELETE FROM symbols WHERE file_path = ?1", params![path])?;
204        Ok(())
205    }
206
207    /// Remove a file and all its symbols and edges from the index.
208    pub fn remove_file(&self, path: &str) -> Result<()> {
209        let tx = self.conn.unchecked_transaction()?;
210        self.remove_file_in_tx(path)?;
211        tx.commit()?;
212        Ok(())
213    }
214
215    /// Like [`Self::remove_file`] but assumes the caller already holds an
216    /// open transaction.
217    pub fn remove_file_in_tx(&self, path: &str) -> Result<()> {
218        self.clear_file_data_in_tx(path)?;
219        self.conn
220            .execute("DELETE FROM files WHERE path = ?1", params![path])?;
221        Ok(())
222    }
223
224    // ── Symbols ──
225
226    /// Insert or replace a single symbol.
227    #[cfg_attr(not(test), allow(dead_code))]
228    pub fn insert_symbol(&self, sym: &Symbol) -> Result<()> {
229        self.conn
230            .prepare_cached(SQL_INSERT_SYMBOL)?
231            .execute(params![
232                sym.id,
233                sym.name,
234                sym.kind.as_str(),
235                sym.file_path,
236                sym.start_line,
237                sym.end_line,
238                sym.start_byte,
239                sym.end_byte,
240                sym.parent_id,
241                sym.signature,
242                sym.visibility.as_str(),
243                sym.is_async,
244                sym.docstring,
245                sym.content_hash,
246                sym.subtree_hash,
247            ])?;
248        Ok(())
249    }
250
251    /// Insert or replace multiple symbols in a single transaction.
252    pub fn insert_symbols(&self, symbols: &[Symbol]) -> Result<()> {
253        let tx = self.conn.unchecked_transaction()?;
254        self.insert_symbols_in_tx(symbols)?;
255        tx.commit()?;
256        Ok(())
257    }
258
259    /// Like [`Self::insert_symbols`] but assumes the caller already holds an
260    /// open transaction.
261    pub fn insert_symbols_in_tx(&self, symbols: &[Symbol]) -> Result<()> {
262        let mut stmt = self.conn.prepare_cached(SQL_INSERT_SYMBOL)?;
263        for sym in symbols {
264            stmt.execute(params![
265                sym.id,
266                sym.name,
267                sym.kind.as_str(),
268                sym.file_path,
269                sym.start_line,
270                sym.end_line,
271                sym.start_byte,
272                sym.end_byte,
273                sym.parent_id,
274                sym.signature,
275                sym.visibility.as_str(),
276                sym.is_async,
277                sym.docstring,
278                sym.content_hash,
279                sym.subtree_hash,
280            ])?;
281        }
282        Ok(())
283    }
284
285    /// Get stored symbol hashes for a file (for Merkle diff).
286    /// Returns `(id, content_hash, subtree_hash)` tuples.
287    #[allow(clippy::type_complexity)]
288    pub fn get_symbol_hashes_for_file(
289        &self,
290        file_path: &str,
291    ) -> Result<Vec<(String, Option<String>, Option<String>)>> {
292        let mut stmt = self
293            .conn
294            .prepare("SELECT id, content_hash, subtree_hash FROM symbols WHERE file_path = ?1")?;
295        let rows = stmt
296            .query_map(params![file_path], |row| {
297                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
298            })?
299            .collect::<std::result::Result<Vec<_>, _>>()?;
300        Ok(rows)
301    }
302
303    /// Update only the position fields of a symbol (for moved-but-unchanged symbols).
304    pub fn update_symbol_position(
305        &self,
306        id: &str,
307        start_line: u32,
308        end_line: u32,
309        start_byte: u32,
310        end_byte: u32,
311    ) -> Result<()> {
312        self.conn.execute(
313            "UPDATE symbols SET start_line = ?2, end_line = ?3,
314                    start_byte = ?4, end_byte = ?5 WHERE id = ?1",
315            params![id, start_line, end_line, start_byte, end_byte],
316        )?;
317        Ok(())
318    }
319
320    /// Delete multiple symbols and cascade (edges, content, embeddings) in a
321    /// single transaction.
322    pub fn delete_symbols(&self, ids: &[String]) -> Result<()> {
323        if ids.is_empty() {
324            return Ok(());
325        }
326        let tx = self.conn.unchecked_transaction()?;
327        self.delete_symbols_in_tx(ids)?;
328        tx.commit()?;
329        Ok(())
330    }
331
332    /// Like [`Self::delete_symbols`] but assumes the caller already holds an
333    /// open transaction.
334    pub fn delete_symbols_in_tx(&self, ids: &[String]) -> Result<()> {
335        if ids.is_empty() {
336            return Ok(());
337        }
338        let mut del_out = self
339            .conn
340            .prepare_cached("DELETE FROM edges WHERE source_id = ?1")?;
341        // Reset state alongside target_id so the orphaned edge re-enters
342        // unresolved_edges() instead of becoming a (NULL, state=1) zombie.
343        // Clear resolution_source too, else the edge keeps a stale provenance
344        // tag that no longer reflects a real target until/unless it re-resolves.
345        let mut null_in = self.conn.prepare_cached(
346            "UPDATE edges SET target_id = NULL, resolution_state = 0, resolution_source = NULL
347             WHERE target_id = ?1",
348        )?;
349        let mut del_content = self
350            .conn
351            .prepare_cached("DELETE FROM symbol_content WHERE symbol_id = ?1")?;
352        let mut del_sym = self
353            .conn
354            .prepare_cached("DELETE FROM symbols WHERE id = ?1")?;
355        for id in ids {
356            del_out.execute(params![id])?;
357            null_in.execute(params![id])?;
358            // Embedding vec+map delete shared with clear_embeddings_for_symbols_in_tx.
359            self.delete_embedding_rows_for_id_in_tx(id)?;
360            del_content.execute(params![id])?;
361            del_sym.execute(params![id])?;
362        }
363        Ok(())
364    }
365
366    /// Delete a single symbol and cascade to edges, content, and embeddings.
367    pub fn delete_symbol(&self, id: &str) -> Result<()> {
368        let tx = self.conn.unchecked_transaction()?;
369        self.conn
370            .execute("DELETE FROM edges WHERE source_id = ?1", params![id])?;
371        self.conn.execute(
372            "UPDATE edges SET target_id = NULL, resolution_state = 0, resolution_source = NULL
373             WHERE target_id = ?1",
374            params![id],
375        )?;
376        let _ = self.conn.execute(
377            "DELETE FROM symbol_vec WHERE rowid IN \
378             (SELECT id FROM symbol_embedding_map WHERE symbol_id = ?1)",
379            params![id],
380        );
381        let _ = self.conn.execute(
382            "DELETE FROM symbol_embedding_map WHERE symbol_id = ?1",
383            params![id],
384        );
385        let _ = self.conn.execute(
386            "DELETE FROM symbol_content WHERE symbol_id = ?1",
387            params![id],
388        );
389        self.conn
390            .execute("DELETE FROM symbols WHERE id = ?1", params![id])?;
391        tx.commit()?;
392        Ok(())
393    }
394
395    // ── Edges ──
396
397    /// Insert a single edge.
398    #[cfg_attr(not(test), allow(dead_code))]
399    pub fn insert_edge(&self, edge: &Edge) -> Result<()> {
400        self.conn.prepare_cached(SQL_INSERT_EDGE)?.execute(params![
401            edge.source_id,
402            edge.target_name,
403            edge.target_id,
404            edge.kind.as_str(),
405            edge.file_path,
406            edge.line,
407            i64::from(edge.target_id.is_some()),
408            edge.provenance.map(|p| p.as_str()),
409        ])?;
410        Ok(())
411    }
412
413    /// Insert multiple edges in a single transaction.
414    pub fn insert_edges(&self, edges: &[Edge]) -> Result<()> {
415        let tx = self.conn.unchecked_transaction()?;
416        self.insert_edges_in_tx(edges)?;
417        tx.commit()?;
418        Ok(())
419    }
420
421    /// Like [`Self::insert_edges`] but assumes the caller already holds an
422    /// open transaction.
423    pub fn insert_edges_in_tx(&self, edges: &[Edge]) -> Result<()> {
424        let mut stmt = self.conn.prepare_cached(SQL_INSERT_EDGE)?;
425        for edge in edges {
426            stmt.execute(params![
427                edge.source_id,
428                edge.target_name,
429                edge.target_id,
430                edge.kind.as_str(),
431                edge.file_path,
432                edge.line,
433                i64::from(edge.target_id.is_some()),
434                edge.provenance.map(|p| p.as_str()),
435            ])?;
436        }
437        Ok(())
438    }
439}