1use crate::schema::{
11 CREATE_EMBEDDINGS_IDX_MODEL, CREATE_EMBEDDINGS_IDX_SOURCE, CREATE_EMBEDDINGS_TABLE,
12 DROP_EMBEDDINGS_IDX_MODEL, DROP_EMBEDDINGS_IDX_SOURCE, DROP_EMBEDDINGS_TABLE,
13 DROP_VEC_EMBEDDINGS_TABLE, create_vec_embeddings_ddl,
14};
15use crate::search::StoredEmbedding;
16use crate::vec_ext::VecConnection;
17use libsql::{Connection, params};
18
/// Number of nearest-neighbour candidates to request from the vector index.
///
/// NOTE(review): not referenced in this part of the file; presumably passed as `k` to
/// `ann_search` by callers — confirm.
pub const ANN_CANDIDATE_COUNT: usize = 50;
24
25pub async fn ensure_schema(conn: &Connection) -> Result<(), libsql::Error> {
27 conn.execute(CREATE_EMBEDDINGS_TABLE, ()).await?;
28 conn.execute(CREATE_EMBEDDINGS_IDX_SOURCE, ()).await?;
29 conn.execute(CREATE_EMBEDDINGS_IDX_MODEL, ()).await?;
30 Ok(())
31}
32
33pub async fn ensure_vec_schema(
40 conn: &Connection,
41 dims: usize,
42 vec_conn: Option<&VecConnection>,
43) -> bool {
44 let ddl = create_vec_embeddings_ddl(dims);
45 if let Some(vc) = vec_conn {
46 vc.execute(&ddl).is_ok()
47 } else {
48 conn.execute(&ddl, ()).await.is_ok()
49 }
50}
51
52pub async fn vec_table_available(conn: &Connection, vec_conn: Option<&VecConnection>) -> bool {
57 if let Some(vc) = vec_conn {
58 vc.execute("SELECT rowid FROM vec_embeddings LIMIT 1")
59 .is_ok()
60 } else {
61 conn.query("SELECT rowid FROM vec_embeddings LIMIT 1", ())
62 .await
63 .is_ok()
64 }
65}
66
67#[allow(clippy::too_many_arguments)]
76pub async fn upsert_embedding(
77 conn: &Connection,
78 source_type: &str,
79 source_path: &str,
80 source_id: Option<i64>,
81 model: &str,
82 last_commit: Option<&str>,
83 staleness: f32,
84 chunk_text: &str,
85 embedding_bytes: &[u8],
86 vec_conn: Option<&VecConnection>,
87) -> Result<(), libsql::Error> {
88 conn.execute(
92 "INSERT OR REPLACE INTO embeddings (source_type, source_path, source_id, model, last_commit, staleness, chunk_text, embedding)
93 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
94 params![
95 source_type,
96 source_path,
97 source_id,
98 model,
99 last_commit,
100 staleness as f64,
101 chunk_text,
102 embedding_bytes.to_vec()
103 ],
104 )
105 .await?;
106
107 let new_id: i64 = {
110 let mut rows = conn.query("SELECT last_insert_rowid()", ()).await?;
111 if let Some(row) = rows.next().await? {
112 row.get(0)?
113 } else {
114 return Ok(());
115 }
116 };
117
118 if let Some(vc) = vec_conn {
119 if let Ok(stmt) =
120 vc.prepare("INSERT OR REPLACE INTO vec_embeddings(rowid, embedding) VALUES (?1, ?2)")
121 {
122 stmt.bind_int64(1, new_id);
123 stmt.bind_blob(2, embedding_bytes);
124 let _ = stmt.step();
125 }
126 } else {
127 let _ = conn
128 .execute(
129 "INSERT OR REPLACE INTO vec_embeddings(rowid, embedding) VALUES (?1, ?2)",
130 params![new_id, embedding_bytes.to_vec()],
131 )
132 .await;
133 }
134
135 Ok(())
136}
137
138pub async fn ann_search(
148 conn: &Connection,
149 model: &str,
150 query_bytes: &[u8],
151 k: usize,
152 vec_conn: Option<&VecConnection>,
153) -> Option<Vec<StoredEmbedding>> {
154 if !vec_table_available(conn, vec_conn).await {
156 return None;
157 }
158
159 let sql = "
165 SELECT e.id, e.source_type, e.source_path, e.source_id,
166 e.staleness, e.chunk_text, e.last_commit, e.embedding
167 FROM vec_embeddings v
168 JOIN embeddings e ON e.id = v.rowid
169 WHERE v.embedding MATCH ?1
170 AND v.k = ?2
171 AND e.model = ?3
172 ORDER BY v.distance
173 ";
174
175 if let Some(vc) = vec_conn {
176 let stmt = vc.prepare(sql).ok()?;
178 stmt.bind_blob(1, query_bytes);
179 stmt.bind_int64(2, k as i64);
180 stmt.bind_text(3, model);
181
182 let mut result = Vec::new();
183 while stmt.step().ok()? {
184 let id = stmt.column_int64(0);
185 let source_type = stmt.column_text(1).unwrap_or_default();
186 let source_path = stmt.column_text(2).unwrap_or_default();
187 let source_id_val = stmt.column_int64(3);
188 let source_id = if source_id_val != 0 {
189 Some(source_id_val)
190 } else {
191 None
192 };
193 let staleness = stmt.column_double(4) as f32;
194 let chunk_text = stmt.column_text(5).unwrap_or_default();
195 let last_commit = stmt.column_text(6);
196 let blob = stmt.column_blob(7);
197 let vector = crate::search::parse_blob(blob);
198
199 result.push(StoredEmbedding {
200 id,
201 source_type,
202 source_path,
203 source_id,
204 staleness,
205 chunk_text,
206 last_commit,
207 vector,
208 });
209 }
210
211 Some(result)
212 } else {
213 let mut rows = conn
215 .query(sql, params![query_bytes.to_vec(), k as i64, model])
216 .await
217 .ok()?;
218
219 let mut result = Vec::new();
220 while let Some(row) = rows.next().await.ok()? {
221 let id: i64 = row.get(0).ok()?;
222 let source_type: String = row.get(1).ok()?;
223 let source_path: String = row.get(2).ok()?;
224 let source_id: Option<i64> = row.get(3).ok()?;
225 let staleness: f64 = row.get(4).ok()?;
226 let chunk_text: String = row.get(5).ok()?;
227 let last_commit: Option<String> = row.get(6).ok()?;
228 let blob: Vec<u8> = row.get(7).ok()?;
229
230 let vector = crate::search::parse_blob(blob);
231
232 result.push(StoredEmbedding {
233 id,
234 source_type,
235 source_path,
236 source_id,
237 staleness: staleness as f32,
238 chunk_text,
239 last_commit,
240 vector,
241 });
242 }
243
244 Some(result)
245 }
246}
247
248pub async fn load_all_embeddings(
253 conn: &Connection,
254 model: &str,
255) -> Result<Vec<StoredEmbedding>, libsql::Error> {
256 let mut rows = conn
257 .query(
258 "SELECT id, source_type, source_path, source_id, staleness, chunk_text, last_commit, embedding
259 FROM embeddings WHERE model = ?1",
260 params![model],
261 )
262 .await?;
263
264 let mut result = Vec::new();
265 while let Some(row) = rows.next().await? {
266 let id: i64 = row.get(0)?;
267 let source_type: String = row.get(1)?;
268 let source_path: String = row.get(2)?;
269 let source_id: Option<i64> = row.get(3)?;
270 let staleness: f64 = row.get(4)?;
271 let chunk_text: String = row.get(5)?;
272 let last_commit: Option<String> = row.get(6)?;
273 let blob: Vec<u8> = row.get(7)?;
274
275 let vector = crate::search::parse_blob(blob);
276
277 result.push(StoredEmbedding {
278 id,
279 source_type,
280 source_path,
281 source_id,
282 staleness: staleness as f32,
283 chunk_text,
284 last_commit,
285 vector,
286 });
287 }
288
289 Ok(result)
290}
291
292pub async fn load_embeddings_for_type(
298 conn: &Connection,
299 model: &str,
300 source_type: &str,
301) -> Result<Vec<StoredEmbedding>, libsql::Error> {
302 let mut rows = conn
303 .query(
304 "SELECT id, source_type, source_path, source_id, staleness, chunk_text, last_commit, embedding
305 FROM embeddings WHERE model = ?1 AND source_type = ?2",
306 params![model, source_type],
307 )
308 .await?;
309
310 let mut result = Vec::new();
311 while let Some(row) = rows.next().await? {
312 let id: i64 = row.get(0)?;
313 let source_type_val: String = row.get(1)?;
314 let source_path: String = row.get(2)?;
315 let source_id: Option<i64> = row.get(3)?;
316 let staleness: f64 = row.get(4)?;
317 let chunk_text: String = row.get(5)?;
318 let last_commit: Option<String> = row.get(6)?;
319 let blob: Vec<u8> = row.get(7)?;
320
321 let vector = crate::search::parse_blob(blob);
322
323 result.push(StoredEmbedding {
324 id,
325 source_type: source_type_val,
326 source_path,
327 source_id,
328 staleness: staleness as f32,
329 chunk_text,
330 last_commit,
331 vector,
332 });
333 }
334
335 Ok(result)
336}
337
338pub async fn count_embeddings(conn: &Connection, model: &str) -> Result<i64, libsql::Error> {
340 let mut rows = conn
341 .query(
342 "SELECT COUNT(*) FROM embeddings WHERE model = ?1",
343 params![model],
344 )
345 .await?;
346 if let Some(row) = rows.next().await? {
347 Ok(row.get(0)?)
348 } else {
349 Ok(0)
350 }
351}
352
353pub async fn delete_embeddings_for_path(
359 conn: &Connection,
360 source_path: &str,
361 vec_conn: Option<&VecConnection>,
362) -> Result<u64, libsql::Error> {
363 let mut rows = conn
365 .query(
366 "SELECT id FROM embeddings WHERE source_path = ?1",
367 params![source_path],
368 )
369 .await?;
370 let mut ids: Vec<i64> = Vec::new();
371 while let Some(row) = rows.next().await? {
372 ids.push(row.get(0)?);
373 }
374
375 let affected = conn
376 .execute(
377 "DELETE FROM embeddings WHERE source_path = ?1",
378 params![source_path],
379 )
380 .await?;
381
382 for id in ids {
383 if let Some(vc) = vec_conn {
384 if let Ok(stmt) = vc.prepare("DELETE FROM vec_embeddings WHERE rowid = ?1") {
385 stmt.bind_int64(1, id);
386 let _ = stmt.step();
387 }
388 } else {
389 let _ = conn
390 .execute("DELETE FROM vec_embeddings WHERE rowid = ?1", params![id])
391 .await;
392 }
393 }
394
395 Ok(affected)
396}
397
398pub async fn drop_embedding_tables(
405 conn: &Connection,
406 vec_conn: Option<&VecConnection>,
407) -> Result<(), libsql::Error> {
408 if let Some(vc) = vec_conn {
409 let _ = vc.execute(DROP_VEC_EMBEDDINGS_TABLE);
410 } else {
411 let _ = conn.execute(DROP_VEC_EMBEDDINGS_TABLE, ()).await;
412 }
413 conn.execute(DROP_EMBEDDINGS_IDX_SOURCE, ()).await?;
414 conn.execute(DROP_EMBEDDINGS_IDX_MODEL, ()).await?;
415 conn.execute(DROP_EMBEDDINGS_TABLE, ()).await?;
416 Ok(())
417}
418
419pub async fn vacuum(conn: &Connection) {
421 let _ = conn.execute("VACUUM", ()).await;
422}
423
424pub async fn embedded_paths(
426 conn: &Connection,
427 model: &str,
428) -> Result<std::collections::HashSet<String>, libsql::Error> {
429 let mut rows = conn
430 .query(
431 "SELECT DISTINCT source_path FROM embeddings WHERE model = ?1",
432 params![model],
433 )
434 .await?;
435 let mut set = std::collections::HashSet::new();
436 while let Some(row) = rows.next().await? {
437 set.insert(row.get::<String>(0)?);
438 }
439 Ok(set)
440}