Skip to main content

normalize_semantic/
store.rs

1//! Embedding storage: read/write embeddings from/to the structural index SQLite.
2//!
3//! Operates through direct `libsql` calls on the same connection as `FileIndex`.
4//! The embeddings table is created lazily on first write.
5//!
6//! When sqlite-vec is available (`vec_embeddings` virtual table exists), inserts
7//! and deletes are mirrored there so that [`ann_search`] can use the ANN index
8//! instead of loading all vectors into memory.
9
10use crate::schema::{
11    CREATE_EMBEDDINGS_IDX_MODEL, CREATE_EMBEDDINGS_IDX_SOURCE, CREATE_EMBEDDINGS_TABLE,
12    DROP_EMBEDDINGS_IDX_MODEL, DROP_EMBEDDINGS_IDX_SOURCE, DROP_EMBEDDINGS_TABLE,
13    DROP_VEC_EMBEDDINGS_TABLE, create_vec_embeddings_ddl,
14};
15use crate::search::StoredEmbedding;
16use crate::vec_ext::VecConnection;
17use libsql::{Connection, params};
18
/// How many nearest-neighbour candidates an ANN query should pull back
/// before staleness re-ranking is applied.
///
/// Over-fetching on purpose: a larger candidate pool gives the re-ranker
/// more to choose from, and callers are free to cut the list down to their
/// own (smaller) `top_k` once re-ranking is done.
pub const ANN_CANDIDATE_COUNT: usize = 50;
24
25/// Ensure the embeddings table and indices exist.
26pub async fn ensure_schema(conn: &Connection) -> Result<(), libsql::Error> {
27    conn.execute(CREATE_EMBEDDINGS_TABLE, ()).await?;
28    conn.execute(CREATE_EMBEDDINGS_IDX_SOURCE, ()).await?;
29    conn.execute(CREATE_EMBEDDINGS_IDX_MODEL, ()).await?;
30    Ok(())
31}
32
33/// Ensure the `vec_embeddings` ANN virtual table exists.
34///
35/// When a `VecConnection` is provided, the DDL is executed on it (where
36/// sqlite-vec is registered).  Otherwise falls back to the main `libsql`
37/// connection (which may not have vec, in which case the CREATE silently
38/// fails).  Returns `true` if the table is available after this call.
39pub async fn ensure_vec_schema(
40    conn: &Connection,
41    dims: usize,
42    vec_conn: Option<&VecConnection>,
43) -> bool {
44    let ddl = create_vec_embeddings_ddl(dims);
45    if let Some(vc) = vec_conn {
46        vc.execute(&ddl).is_ok()
47    } else {
48        conn.execute(&ddl, ()).await.is_ok()
49    }
50}
51
52/// Returns `true` if the `vec_embeddings` virtual table exists and is queryable.
53///
54/// When a `VecConnection` is provided, queries through it; otherwise falls
55/// back to the `libsql` connection.
56pub async fn vec_table_available(conn: &Connection, vec_conn: Option<&VecConnection>) -> bool {
57    if let Some(vc) = vec_conn {
58        vc.execute("SELECT rowid FROM vec_embeddings LIMIT 1")
59            .is_ok()
60    } else {
61        conn.query("SELECT rowid FROM vec_embeddings LIMIT 1", ())
62            .await
63            .is_ok()
64    }
65}
66
67/// Insert or replace one embedding row.
68///
69/// Uses `INSERT OR REPLACE` keyed on the UNIQUE constraint
70/// `(source_type, source_path, source_id)` so re-indexing a symbol replaces the
71/// old vector in a single statement — no SELECT-then-DELETE round-trip.
72///
73/// The corresponding row in `vec_embeddings` is also updated when a
74/// `VecConnection` is provided (best-effort; errors are silently ignored).
75#[allow(clippy::too_many_arguments)]
76pub async fn upsert_embedding(
77    conn: &Connection,
78    source_type: &str,
79    source_path: &str,
80    source_id: Option<i64>,
81    model: &str,
82    last_commit: Option<&str>,
83    staleness: f32,
84    chunk_text: &str,
85    embedding_bytes: &[u8],
86    vec_conn: Option<&VecConnection>,
87) -> Result<(), libsql::Error> {
88    // INSERT OR REPLACE: if a row with the same (source_type, source_path, source_id)
89    // already exists, SQLite deletes it and inserts the new one. This gives us a new
90    // rowid, which we use for the vec_embeddings mirror.
91    conn.execute(
92        "INSERT OR REPLACE INTO embeddings (source_type, source_path, source_id, model, last_commit, staleness, chunk_text, embedding)
93         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
94        params![
95            source_type,
96            source_path,
97            source_id,
98            model,
99            last_commit,
100            staleness as f64,
101            chunk_text,
102            embedding_bytes.to_vec()
103        ],
104    )
105    .await?;
106
107    // Mirror into vec_embeddings for ANN queries (best-effort).
108    // INSERT OR REPLACE handles the case where the old rowid was already present.
109    let new_id: i64 = {
110        let mut rows = conn.query("SELECT last_insert_rowid()", ()).await?;
111        if let Some(row) = rows.next().await? {
112            row.get(0)?
113        } else {
114            return Ok(());
115        }
116    };
117
118    if let Some(vc) = vec_conn {
119        if let Ok(stmt) =
120            vc.prepare("INSERT OR REPLACE INTO vec_embeddings(rowid, embedding) VALUES (?1, ?2)")
121        {
122            stmt.bind_int64(1, new_id);
123            stmt.bind_blob(2, embedding_bytes);
124            let _ = stmt.step();
125        }
126    } else {
127        let _ = conn
128            .execute(
129                "INSERT OR REPLACE INTO vec_embeddings(rowid, embedding) VALUES (?1, ?2)",
130                params![new_id, embedding_bytes.to_vec()],
131            )
132            .await;
133    }
134
135    Ok(())
136}
137
138/// ANN search using the `vec_embeddings` virtual table.
139///
140/// Returns up to `k` candidate rows from `embeddings` ordered by vector
141/// distance (closest first), ready for staleness re-ranking.
142///
143/// When a `VecConnection` is provided, uses it for the vec query (required for
144/// sqlite-vec support).  Returns `None` if `vec_embeddings` is not available.
145/// The caller should fall back to [`load_all_embeddings`] +
146/// [`crate::search::rerank`] in that case.
147pub async fn ann_search(
148    conn: &Connection,
149    model: &str,
150    query_bytes: &[u8],
151    k: usize,
152    vec_conn: Option<&VecConnection>,
153) -> Option<Vec<StoredEmbedding>> {
154    // First verify the virtual table exists.
155    if !vec_table_available(conn, vec_conn).await {
156        return None;
157    }
158
159    // sqlite-vec ANN query: returns rowid + distance for the k nearest vectors.
160    // We JOIN back to `embeddings` to get all metadata in one round-trip.
161    //
162    // Note: `v.k` is a hidden column consumed by the vec0 module as the
163    // result-limit parameter; `v.embedding MATCH ?1` supplies the query vector.
164    let sql = "
165        SELECT e.id, e.source_type, e.source_path, e.source_id,
166               e.staleness, e.chunk_text, e.last_commit, e.embedding
167        FROM vec_embeddings v
168        JOIN embeddings e ON e.id = v.rowid
169        WHERE v.embedding MATCH ?1
170          AND v.k = ?2
171          AND e.model = ?3
172        ORDER BY v.distance
173    ";
174
175    if let Some(vc) = vec_conn {
176        // Use the VecConnection for the ANN query (it has sqlite-vec registered).
177        let stmt = vc.prepare(sql).ok()?;
178        stmt.bind_blob(1, query_bytes);
179        stmt.bind_int64(2, k as i64);
180        stmt.bind_text(3, model);
181
182        let mut result = Vec::new();
183        while stmt.step().ok()? {
184            let id = stmt.column_int64(0);
185            let source_type = stmt.column_text(1).unwrap_or_default();
186            let source_path = stmt.column_text(2).unwrap_or_default();
187            let source_id_val = stmt.column_int64(3);
188            let source_id = if source_id_val != 0 {
189                Some(source_id_val)
190            } else {
191                None
192            };
193            let staleness = stmt.column_double(4) as f32;
194            let chunk_text = stmt.column_text(5).unwrap_or_default();
195            let last_commit = stmt.column_text(6);
196            let blob = stmt.column_blob(7);
197            let vector = crate::search::parse_blob(blob);
198
199            result.push(StoredEmbedding {
200                id,
201                source_type,
202                source_path,
203                source_id,
204                staleness,
205                chunk_text,
206                last_commit,
207                vector,
208            });
209        }
210
211        Some(result)
212    } else {
213        // Fallback: try through libsql connection (may not have vec loaded).
214        let mut rows = conn
215            .query(sql, params![query_bytes.to_vec(), k as i64, model])
216            .await
217            .ok()?;
218
219        let mut result = Vec::new();
220        while let Some(row) = rows.next().await.ok()? {
221            let id: i64 = row.get(0).ok()?;
222            let source_type: String = row.get(1).ok()?;
223            let source_path: String = row.get(2).ok()?;
224            let source_id: Option<i64> = row.get(3).ok()?;
225            let staleness: f64 = row.get(4).ok()?;
226            let chunk_text: String = row.get(5).ok()?;
227            let last_commit: Option<String> = row.get(6).ok()?;
228            let blob: Vec<u8> = row.get(7).ok()?;
229
230            let vector = crate::search::parse_blob(blob);
231
232            result.push(StoredEmbedding {
233                id,
234                source_type,
235                source_path,
236                source_id,
237                staleness: staleness as f32,
238                chunk_text,
239                last_commit,
240                vector,
241            });
242        }
243
244        Some(result)
245    }
246}
247
248/// Load all stored embeddings for a given model name.
249///
250/// Returns all rows so the caller can do brute-force cosine search in memory.
251/// Prefer [`ann_search`] when the sqlite-vec virtual table is available.
252pub async fn load_all_embeddings(
253    conn: &Connection,
254    model: &str,
255) -> Result<Vec<StoredEmbedding>, libsql::Error> {
256    let mut rows = conn
257        .query(
258            "SELECT id, source_type, source_path, source_id, staleness, chunk_text, last_commit, embedding
259             FROM embeddings WHERE model = ?1",
260            params![model],
261        )
262        .await?;
263
264    let mut result = Vec::new();
265    while let Some(row) = rows.next().await? {
266        let id: i64 = row.get(0)?;
267        let source_type: String = row.get(1)?;
268        let source_path: String = row.get(2)?;
269        let source_id: Option<i64> = row.get(3)?;
270        let staleness: f64 = row.get(4)?;
271        let chunk_text: String = row.get(5)?;
272        let last_commit: Option<String> = row.get(6)?;
273        let blob: Vec<u8> = row.get(7)?;
274
275        let vector = crate::search::parse_blob(blob);
276
277        result.push(StoredEmbedding {
278            id,
279            source_type,
280            source_path,
281            source_id,
282            staleness: staleness as f32,
283            chunk_text,
284            last_commit,
285            vector,
286        });
287    }
288
289    Ok(result)
290}
291
292/// Load stored embeddings filtered by a specific source type.
293///
294/// Like [`load_all_embeddings`] but scoped to one `source_type` (e.g. `"context"`).
295/// Used for the brute-force fallback path when ANN search is not available and
296/// the caller wants to restrict results to a single source type.
297pub async fn load_embeddings_for_type(
298    conn: &Connection,
299    model: &str,
300    source_type: &str,
301) -> Result<Vec<StoredEmbedding>, libsql::Error> {
302    let mut rows = conn
303        .query(
304            "SELECT id, source_type, source_path, source_id, staleness, chunk_text, last_commit, embedding
305             FROM embeddings WHERE model = ?1 AND source_type = ?2",
306            params![model, source_type],
307        )
308        .await?;
309
310    let mut result = Vec::new();
311    while let Some(row) = rows.next().await? {
312        let id: i64 = row.get(0)?;
313        let source_type_val: String = row.get(1)?;
314        let source_path: String = row.get(2)?;
315        let source_id: Option<i64> = row.get(3)?;
316        let staleness: f64 = row.get(4)?;
317        let chunk_text: String = row.get(5)?;
318        let last_commit: Option<String> = row.get(6)?;
319        let blob: Vec<u8> = row.get(7)?;
320
321        let vector = crate::search::parse_blob(blob);
322
323        result.push(StoredEmbedding {
324            id,
325            source_type: source_type_val,
326            source_path,
327            source_id,
328            staleness: staleness as f32,
329            chunk_text,
330            last_commit,
331            vector,
332        });
333    }
334
335    Ok(result)
336}
337
338/// Count embeddings stored for a given model.
339pub async fn count_embeddings(conn: &Connection, model: &str) -> Result<i64, libsql::Error> {
340    let mut rows = conn
341        .query(
342            "SELECT COUNT(*) FROM embeddings WHERE model = ?1",
343            params![model],
344        )
345        .await?;
346    if let Some(row) = rows.next().await? {
347        Ok(row.get(0)?)
348    } else {
349        Ok(0)
350    }
351}
352
353/// Delete embeddings for a specific (source_type, source_path) pair, all models.
354/// Used during incremental rebuild when a file changes.
355///
356/// Also removes the corresponding rows from `vec_embeddings` (best-effort)
357/// when a `VecConnection` is provided.
358pub async fn delete_embeddings_for_path(
359    conn: &Connection,
360    source_path: &str,
361    vec_conn: Option<&VecConnection>,
362) -> Result<u64, libsql::Error> {
363    // Collect IDs before deletion so we can clean up vec_embeddings.
364    let mut rows = conn
365        .query(
366            "SELECT id FROM embeddings WHERE source_path = ?1",
367            params![source_path],
368        )
369        .await?;
370    let mut ids: Vec<i64> = Vec::new();
371    while let Some(row) = rows.next().await? {
372        ids.push(row.get(0)?);
373    }
374
375    let affected = conn
376        .execute(
377            "DELETE FROM embeddings WHERE source_path = ?1",
378            params![source_path],
379        )
380        .await?;
381
382    for id in ids {
383        if let Some(vc) = vec_conn {
384            if let Ok(stmt) = vc.prepare("DELETE FROM vec_embeddings WHERE rowid = ?1") {
385                stmt.bind_int64(1, id);
386                let _ = stmt.step();
387            }
388        } else {
389            let _ = conn
390                .execute("DELETE FROM vec_embeddings WHERE rowid = ?1", params![id])
391                .await;
392        }
393    }
394
395    Ok(affected)
396}
397
398/// Drop embedding tables entirely for a full rebuild.
399///
400/// This is much faster than `DELETE FROM` for large tables — it avoids
401/// generating tombstone pages that bloat the database file.  The caller
402/// must call [`ensure_schema`] + [`ensure_vec_schema`] afterwards to
403/// recreate the tables.
404pub async fn drop_embedding_tables(
405    conn: &Connection,
406    vec_conn: Option<&VecConnection>,
407) -> Result<(), libsql::Error> {
408    if let Some(vc) = vec_conn {
409        let _ = vc.execute(DROP_VEC_EMBEDDINGS_TABLE);
410    } else {
411        let _ = conn.execute(DROP_VEC_EMBEDDINGS_TABLE, ()).await;
412    }
413    conn.execute(DROP_EMBEDDINGS_IDX_SOURCE, ()).await?;
414    conn.execute(DROP_EMBEDDINGS_IDX_MODEL, ()).await?;
415    conn.execute(DROP_EMBEDDINGS_TABLE, ()).await?;
416    Ok(())
417}
418
419/// Run `VACUUM` to reclaim space after a full rebuild.
420pub async fn vacuum(conn: &Connection) {
421    let _ = conn.execute("VACUUM", ()).await;
422}
423
424/// Return the set of file paths that have at least one embedding for the given model.
425pub async fn embedded_paths(
426    conn: &Connection,
427    model: &str,
428) -> Result<std::collections::HashSet<String>, libsql::Error> {
429    let mut rows = conn
430        .query(
431            "SELECT DISTINCT source_path FROM embeddings WHERE model = ?1",
432            params![model],
433        )
434        .await?;
435    let mut set = std::collections::HashSet::new();
436    while let Some(row) = rows.next().await? {
437        set.insert(row.get::<String>(0)?);
438    }
439    Ok(set)
440}