1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3use std::sync::{Arc, Mutex, RwLock};
4
5use crate::error::{CoreError, Result};
6use crate::ingest::chunk::FinalChunkKind;
7use kbolt_types::{DiskUsage, KboltError};
8use rusqlite::types::Value as SqlValue;
9use rusqlite::{params, params_from_iter, Connection, Error, ErrorCode};
10use tantivy::collector::TopDocs;
11use tantivy::query::{BooleanQuery, BoostQuery, Occur, Query, TermQuery};
12use tantivy::schema::{Field, IndexRecordOption, Value, FAST, INDEXED, STORED, TEXT};
13use tantivy::tokenizer::TokenStream;
14use tantivy::{Index, IndexWriter, TantivyDocument, Term};
15use usearch::{IndexOptions, MetricKind, ScalarKind};
16
/// File name of the SQLite metadata database, created inside the cache directory.
const DB_FILE: &str = "meta.sqlite";
/// Space seeded on every startup; it cannot be renamed, and `delete_space`
/// empties it instead of removing it.
const DEFAULT_SPACE_NAME: &str = "default";
/// Directory name for per-space index artifacts.
/// NOTE(review): presumably joined under the cache directory by `space_paths`,
/// which is not shown here — confirm.
const SPACES_DIR: &str = "spaces";
/// Directory name of a space's Tantivy index (assumed per-space; confirm in `space_paths`).
const TANTIVY_DIR_NAME: &str = "tantivy";
/// File name of a space's usearch vector index (assumed per-space; confirm in `space_paths`).
const USEARCH_FILENAME: &str = "vectors.usearch";
22
/// Metadata store (SQLite) plus the search indexes of every loaded space.
pub struct Storage {
    /// The single SQLite connection; every query serializes on this mutex.
    db: Mutex<Connection>,
    /// Root directory that holds the database file and space artifacts.
    cache_dir: PathBuf,
    /// Loaded per-space index handles, keyed by space name.
    spaces: RwLock<HashMap<String, Arc<SpaceIndexes>>>,
}
28
/// One row of the `spaces` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SpaceRow {
    /// Primary key.
    pub id: i64,
    /// Space name; unique across all spaces.
    pub name: String,
    /// Optional free-form description.
    pub description: Option<String>,
    /// Creation timestamp text; the column default is UTC `%Y-%m-%dT%H:%M:%SZ`.
    pub created: String,
}
36
/// One row of the `collections` table; `name` is unique within its space.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollectionRow {
    /// Primary key.
    pub id: i64,
    /// Owning space; rows are removed when the space is deleted.
    pub space_id: i64,
    /// Collection name.
    pub name: String,
    /// Root path of the collection; written to the database as (lossy) text.
    pub path: PathBuf,
    /// Optional free-form description.
    pub description: Option<String>,
    /// Optional extension filter, stored serialized via `serialize_extensions`.
    pub extensions: Option<Vec<String>>,
    /// Creation timestamp text (UTC `%Y-%m-%dT%H:%M:%SZ` when written here).
    pub created: String,
    /// Last-update timestamp text (UTC `%Y-%m-%dT%H:%M:%SZ` when written here).
    pub updated: String,
}
48
/// One row of the `documents` table; `path` is unique within a collection.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DocumentRow {
    /// Primary key.
    pub id: i64,
    /// Owning collection.
    pub collection_id: i64,
    /// Document path (presumably relative to the collection root — confirm).
    pub path: String,
    /// Stored document title.
    pub title: String,
    /// Where `title` came from.
    pub title_source: DocumentTitleSource,
    /// Content hash; also searchable by prefix.
    pub hash: String,
    /// Caller-supplied modification timestamp text.
    pub modified: String,
    /// `false` once deactivated; inactive rows become reapable after a grace period.
    pub active: bool,
    /// Set when the document is first deactivated; cleared on reactivation.
    pub deactivated_at: Option<String>,
    /// Set by `upsert_document`.
    /// NOTE(review): presumably cleared once the full-text index is refreshed.
    pub fts_dirty: bool,
}
62
/// Per-document summary produced by `Storage::list_collection_file_rows`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileListRow {
    /// Document id.
    pub doc_id: i64,
    /// Document path.
    pub path: String,
    /// Document title.
    pub title: String,
    /// Content hash.
    pub hash: String,
    /// Whether the document is active.
    pub active: bool,
    /// Number of chunks stored for the document.
    pub chunk_count: usize,
    /// Number of those chunks with at least one embedding (any model).
    pub embedded_chunk_count: usize,
}
73
/// One row of the `chunks` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkRow {
    /// Primary key.
    pub id: i64,
    /// Owning document.
    pub doc_id: i64,
    /// Position of the chunk within its document; unique per document.
    pub seq: i32,
    /// Start of the chunk in the document text (units not established here —
    /// TODO confirm bytes vs. chars).
    pub offset: usize,
    /// Length of the chunk, in the same units as `offset`.
    pub length: usize,
    /// Optional heading associated with the chunk.
    pub heading: Option<String>,
    /// Chunk kind; persisted via `FinalChunkKind::as_storage_kind`.
    pub kind: FinalChunkKind,
}
84
/// Input row for `Storage::insert_chunks`; the ids are assigned by the database.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkInsert {
    /// Position within the document; must be unique per document.
    pub seq: i32,
    /// Start of the chunk in the document text (units as in `ChunkRow::offset`).
    pub offset: usize,
    /// Length of the chunk.
    pub length: usize,
    /// Optional heading associated with the chunk.
    pub heading: Option<String>,
    /// Chunk kind.
    pub kind: FinalChunkKind,
}
93
/// Location data for a chunk to embed.
/// NOTE(review): producer and consumer are not in this section; the field
/// meanings below are inferred from names — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EmbedRecord {
    /// Chunk id.
    pub chunk_id: i64,
    /// Path of the owning document.
    pub doc_path: String,
    /// Root path of the owning collection.
    pub collection_path: PathBuf,
    /// Name of the owning space.
    pub space_name: String,
    /// Chunk offset within the document text.
    pub offset: usize,
    /// Chunk length.
    pub length: usize,
}
103
/// A document together with its chunks, for refreshing full-text index entries.
/// NOTE(review): presumably built for documents whose `fts_dirty` flag is set;
/// the producer is not shown in this section — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FtsDirtyRecord {
    /// Document id.
    pub doc_id: i64,
    /// Document path.
    pub doc_path: String,
    /// Document title.
    pub doc_title: String,
    /// Origin of `doc_title`.
    pub doc_title_source: DocumentTitleSource,
    /// Document content hash.
    pub doc_hash: String,
    /// Root path of the owning collection.
    pub collection_path: PathBuf,
    /// Name of the owning space.
    pub space_name: String,
    /// The document's chunks.
    pub chunks: Vec<ChunkRow>,
}
115
/// An inactive document whose grace period has expired, as listed by the
/// `list_reapable_documents*` methods.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReapableDocument {
    /// Document id.
    pub doc_id: i64,
    /// Name of the space owning the document.
    pub space_name: String,
    /// Ids of the document's chunks (presumably so index entries can be purged).
    pub chunk_ids: Vec<i64>,
}
122
/// Content of one chunk as written to the Tantivy index.
/// NOTE(review): mapping to `TantivyFields` is inferred from matching names — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TantivyEntry {
    /// Chunk id.
    pub chunk_id: i64,
    /// Owning document id.
    pub doc_id: i64,
    /// Document path.
    pub filepath: String,
    /// Title to index, if any (see `DocumentTitleSource::semantic_title`).
    pub semantic_title: Option<String>,
    /// Chunk heading, if any.
    pub heading: Option<String>,
    /// Chunk text.
    pub body: String,
}
132
133#[derive(Debug, Clone, Copy, PartialEq, Eq)]
134pub enum DocumentTitleSource {
135 Extracted,
136 FilenameFallback,
137}
138
139impl DocumentTitleSource {
140 pub fn as_sql(self) -> &'static str {
141 match self {
142 Self::Extracted => "extracted",
143 Self::FilenameFallback => "filename_fallback",
144 }
145 }
146
147 fn from_sql(raw: &str) -> std::result::Result<Self, KboltError> {
148 match raw {
149 "extracted" => Ok(Self::Extracted),
150 "filename_fallback" => Ok(Self::FilenameFallback),
151 other => Err(KboltError::InvalidInput(format!(
152 "invalid stored document title source: {other}"
153 ))),
154 }
155 }
156
157 pub fn semantic_title(self, title: &str) -> Option<&str> {
158 matches!(self, Self::Extracted)
159 .then_some(title.trim())
160 .filter(|title| !title.is_empty())
161 }
162}
163
/// A lexical (BM25) search hit.
#[derive(Debug, Clone, PartialEq)]
pub struct BM25Hit {
    /// Matching chunk id.
    pub chunk_id: i64,
    /// Relevance score (presumably higher is better — confirm at the producer).
    pub score: f32,
}
169
/// A dense-vector search hit.
#[derive(Debug, Clone, PartialEq)]
pub struct DenseHit {
    /// Matching chunk id.
    pub chunk_id: i64,
    /// Vector distance (presumably lower is closer — confirm against the metric used).
    pub distance: f32,
}
175
/// Result of `Storage::find_space_for_collection`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SpaceResolution {
    /// Exactly one space has a collection with the requested name.
    Found(SpaceRow),
    /// Several spaces match; holds their names in ascending order.
    Ambiguous(Vec<String>),
    /// No space has a collection with that name.
    NotFound,
}
182
/// In-memory index handles for one space, created by `Storage::open_space`.
struct SpaceIndexes {
    /// Tantivy index directory; retained but not read in this section.
    _tantivy_dir: PathBuf,
    /// Location of the usearch index file.
    usearch_path: PathBuf,
    /// Opened Tantivy index.
    tantivy_index: Index,
    /// Tantivy writer; starts as `None` (presumably created on first write).
    tantivy_writer: Mutex<Option<IndexWriter>>,
    /// Opened usearch vector index.
    usearch_index: RwLock<usearch::Index>,
    /// Field handles resolved from the Tantivy schema.
    fields: TantivyFields,
}
191
/// Tantivy field handles, resolved by `tantivy_fields_from_schema`.
#[derive(Debug, Clone, Copy)]
struct TantivyFields {
    /// Chunk id field.
    chunk_id: Field,
    /// Document id field.
    doc_id: Field,
    /// Document path field.
    filepath: Field,
    /// Title field.
    title: Field,
    /// Heading field.
    heading: Field,
    /// Body text field.
    body: Field,
}
201
/// Per-field settings for building a BM25 query.
/// NOTE(review): usage is not visible in this section; descriptions are inferred.
#[derive(Debug, Clone, Copy)]
struct Bm25FieldSpec {
    /// Field to query.
    field: Field,
    /// Score multiplier applied to matches on this field (presumably via `BoostQuery`).
    boost: f32,
    /// Postings detail used when building term queries for this field.
    index_record_option: IndexRecordOption,
}
208
209impl Storage {
210 pub fn new(cache_dir: &Path) -> Result<Self> {
211 std::fs::create_dir_all(cache_dir)?;
212 let db_path = cache_dir.join(DB_FILE);
213 let conn = Connection::open(db_path)?;
214 conn.execute_batch(
215 r#"
216PRAGMA foreign_keys = ON;
217PRAGMA journal_mode = WAL;
218
219CREATE TABLE IF NOT EXISTS spaces (
220 id INTEGER PRIMARY KEY,
221 name TEXT NOT NULL UNIQUE,
222 description TEXT,
223 created TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ','now'))
224);
225
226CREATE TABLE IF NOT EXISTS collections (
227 id INTEGER PRIMARY KEY,
228 space_id INTEGER NOT NULL REFERENCES spaces(id) ON DELETE CASCADE,
229 name TEXT NOT NULL,
230 path TEXT NOT NULL,
231 description TEXT,
232 extensions TEXT,
233 created TEXT NOT NULL,
234 updated TEXT NOT NULL,
235 UNIQUE(space_id, name)
236);
237
238CREATE TABLE IF NOT EXISTS documents (
239 id INTEGER PRIMARY KEY,
240 collection_id INTEGER NOT NULL REFERENCES collections(id) ON DELETE CASCADE,
241 path TEXT NOT NULL,
242 title TEXT NOT NULL,
243 title_source TEXT NOT NULL DEFAULT 'extracted',
244 hash TEXT NOT NULL,
245 modified TEXT NOT NULL,
246 active INTEGER NOT NULL DEFAULT 1,
247 deactivated_at TEXT,
248 fts_dirty INTEGER NOT NULL DEFAULT 0,
249 UNIQUE(collection_id, path)
250);
251CREATE INDEX IF NOT EXISTS idx_documents_collection ON documents(collection_id, active);
252CREATE INDEX IF NOT EXISTS idx_documents_hash ON documents(hash);
253CREATE INDEX IF NOT EXISTS idx_documents_fts_dirty ON documents(fts_dirty) WHERE fts_dirty = 1;
254
255CREATE TABLE IF NOT EXISTS chunks (
256 id INTEGER PRIMARY KEY,
257 doc_id INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
258 seq INTEGER NOT NULL,
259 offset INTEGER NOT NULL,
260 length INTEGER NOT NULL,
261 heading TEXT,
262 kind TEXT NOT NULL DEFAULT 'section',
263 UNIQUE(doc_id, seq)
264);
265CREATE INDEX IF NOT EXISTS idx_chunks_doc ON chunks(doc_id);
266
267CREATE TABLE IF NOT EXISTS embeddings (
268 chunk_id INTEGER NOT NULL REFERENCES chunks(id) ON DELETE CASCADE,
269 model TEXT NOT NULL,
270 embedded_at TEXT NOT NULL,
271 PRIMARY KEY (chunk_id, model)
272);
273"#,
274 )?;
275 ensure_documents_title_source_column(&conn)?;
276
277 conn.execute(
278 "INSERT OR IGNORE INTO spaces (name, description) VALUES (?1, NULL)",
279 params![DEFAULT_SPACE_NAME],
280 )?;
281
282 let mut stmt = conn.prepare("SELECT name FROM spaces ORDER BY name ASC")?;
283 let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
284 let space_names = rows.collect::<std::result::Result<Vec<_>, _>>()?;
285 drop(stmt);
286
287 let storage = Self {
288 db: Mutex::new(conn),
289 cache_dir: cache_dir.to_path_buf(),
290 spaces: RwLock::new(HashMap::new()),
291 };
292
293 for space_name in space_names {
294 storage.open_space(&space_name)?;
295 }
296
297 Ok(storage)
298 }
299
300 pub fn open_space(&self, name: &str) -> Result<()> {
301 let _space = self.get_space(name)?;
302
303 {
304 let spaces = self
305 .spaces
306 .read()
307 .map_err(|_| CoreError::poisoned("spaces"))?;
308 if spaces.contains_key(name) {
309 return Ok(());
310 }
311 }
312
313 let (tantivy_dir, usearch_path) = self.space_paths(name);
314 std::fs::create_dir_all(&tantivy_dir)?;
315 std::fs::OpenOptions::new()
316 .create(true)
317 .append(true)
318 .open(&usearch_path)?;
319 let tantivy_index = open_or_create_tantivy_index(&tantivy_dir)?;
320 let usearch_index = open_or_create_usearch_index(&usearch_path)?;
321 let fields = tantivy_fields_from_schema(&tantivy_index.schema())?;
322
323 let mut spaces = self
324 .spaces
325 .write()
326 .map_err(|_| CoreError::poisoned("spaces"))?;
327 spaces
328 .entry(name.to_string())
329 .or_insert(Arc::new(SpaceIndexes {
330 _tantivy_dir: tantivy_dir,
331 usearch_path,
332 tantivy_index,
333 tantivy_writer: Mutex::new(None),
334 usearch_index: RwLock::new(usearch_index),
335 fields,
336 }));
337
338 Ok(())
339 }
340
341 pub fn close_space(&self, name: &str) -> Result<()> {
342 let _space = self.get_space(name)?;
343 let mut spaces = self
344 .spaces
345 .write()
346 .map_err(|_| CoreError::poisoned("spaces"))?;
347 let _removed = spaces.remove(name);
348 Ok(())
349 }
350
351 pub fn create_space(&self, name: &str, description: Option<&str>) -> Result<i64> {
352 let space_id = {
353 let conn = self
354 .db
355 .lock()
356 .map_err(|_| CoreError::poisoned("database"))?;
357
358 let result = conn.execute(
359 "INSERT INTO spaces (name, description) VALUES (?1, ?2)",
360 params![name, description],
361 );
362
363 match result {
364 Ok(_) => conn.last_insert_rowid(),
365 Err(err) => {
366 return match err {
367 Error::SqliteFailure(sqlite_err, _)
368 if sqlite_err.code == ErrorCode::ConstraintViolation =>
369 {
370 Err(KboltError::SpaceAlreadyExists {
371 name: name.to_string(),
372 }
373 .into())
374 }
375 other => Err(other.into()),
376 };
377 }
378 }
379 };
380
381 if let Err(open_err) = self.open_space(name) {
382 let rollback_result = self
383 .db
384 .lock()
385 .map_err(|_| CoreError::poisoned("database"))?
386 .execute("DELETE FROM spaces WHERE id = ?1", params![space_id]);
387
388 if let Err(rollback_err) = rollback_result {
389 return Err(CoreError::Internal(format!(
390 "failed to provision indexes for space '{name}': {open_err}; rollback failed: {rollback_err}"
391 )));
392 }
393
394 return Err(open_err);
395 }
396
397 Ok(space_id)
398 }
399
400 pub fn get_space(&self, name: &str) -> Result<SpaceRow> {
401 let conn = self
402 .db
403 .lock()
404 .map_err(|_| CoreError::poisoned("database"))?;
405 let mut stmt =
406 conn.prepare("SELECT id, name, description, created FROM spaces WHERE name = ?1")?;
407
408 let row = stmt.query_row(params![name], decode_space_row);
409
410 match row {
411 Ok(space) => Ok(space),
412 Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
413 name: name.to_string(),
414 }
415 .into()),
416 Err(err) => Err(err.into()),
417 }
418 }
419
420 pub fn get_space_by_id(&self, id: i64) -> Result<SpaceRow> {
421 let conn = self
422 .db
423 .lock()
424 .map_err(|_| CoreError::poisoned("database"))?;
425 let mut stmt =
426 conn.prepare("SELECT id, name, description, created FROM spaces WHERE id = ?1")?;
427
428 let row = stmt.query_row(params![id], decode_space_row);
429
430 match row {
431 Ok(space) => Ok(space),
432 Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
433 name: format!("id={id}"),
434 }
435 .into()),
436 Err(err) => Err(err.into()),
437 }
438 }
439
440 pub fn list_spaces(&self) -> Result<Vec<SpaceRow>> {
441 let conn = self
442 .db
443 .lock()
444 .map_err(|_| CoreError::poisoned("database"))?;
445 let mut stmt = conn.prepare(
446 "SELECT id, name, description, created
447 FROM spaces
448 ORDER BY name ASC",
449 )?;
450
451 let rows = stmt.query_map([], decode_space_row)?;
452
453 let spaces = rows.collect::<std::result::Result<Vec<_>, _>>()?;
454 Ok(spaces)
455 }
456
457 pub fn find_space_for_collection(&self, collection: &str) -> Result<SpaceResolution> {
458 let conn = self
459 .db
460 .lock()
461 .map_err(|_| CoreError::poisoned("database"))?;
462 let mut stmt = conn.prepare(
463 "SELECT s.id, s.name, s.description, s.created
464 FROM spaces s
465 JOIN collections c ON c.space_id = s.id
466 WHERE c.name = ?1
467 ORDER BY s.name ASC",
468 )?;
469 let rows = stmt.query_map(params![collection], decode_space_row)?;
470 let matches = rows.collect::<std::result::Result<Vec<_>, _>>()?;
471
472 if matches.is_empty() {
473 return Ok(SpaceResolution::NotFound);
474 }
475
476 if matches.len() == 1 {
477 return Ok(SpaceResolution::Found(matches[0].clone()));
478 }
479
480 let spaces = matches.into_iter().map(|space| space.name).collect();
481 Ok(SpaceResolution::Ambiguous(spaces))
482 }
483
484 pub fn delete_space(&self, name: &str) -> Result<()> {
485 if name == DEFAULT_SPACE_NAME {
486 let conn = self
487 .db
488 .lock()
489 .map_err(|_| CoreError::poisoned("database"))?;
490 conn.execute(
491 "DELETE FROM collections
492 WHERE space_id = (SELECT id FROM spaces WHERE name = ?1)",
493 params![name],
494 )?;
495 drop(conn);
496
497 self.unload_space(name)?;
498 self.remove_space_artifacts(name)?;
499 self.open_space(name)?;
500 return Ok(());
501 }
502
503 let conn = self
504 .db
505 .lock()
506 .map_err(|_| CoreError::poisoned("database"))?;
507 let deleted = conn.execute("DELETE FROM spaces WHERE name = ?1", params![name])?;
508 drop(conn);
509
510 if deleted == 0 {
511 return Err(KboltError::SpaceNotFound {
512 name: name.to_string(),
513 }
514 .into());
515 }
516
517 self.unload_space(name)?;
518 self.remove_space_artifacts(name)?;
519
520 Ok(())
521 }
522
523 pub fn rename_space(&self, old: &str, new: &str) -> Result<()> {
524 if old == DEFAULT_SPACE_NAME {
525 return Err(
526 KboltError::Config("cannot rename reserved space: default".to_string()).into(),
527 );
528 }
529
530 let conn = self
531 .db
532 .lock()
533 .map_err(|_| CoreError::poisoned("database"))?;
534
535 let result = conn.execute(
536 "UPDATE spaces SET name = ?1 WHERE name = ?2",
537 params![new, old],
538 );
539 drop(conn);
540
541 match result {
542 Ok(0) => Err(KboltError::SpaceNotFound {
543 name: old.to_string(),
544 }
545 .into()),
546 Ok(_) => {
547 if let Err(rename_err) = self.rename_space_artifacts(old, new) {
548 let rollback = self
549 .db
550 .lock()
551 .map_err(|_| CoreError::poisoned("database"))?
552 .execute(
553 "UPDATE spaces SET name = ?1 WHERE name = ?2",
554 params![old, new],
555 );
556
557 if let Err(rollback_err) = rollback {
558 return Err(CoreError::Internal(format!(
559 "failed to rename space artifacts from '{old}' to '{new}': {rename_err}; rollback failed: {rollback_err}"
560 )));
561 }
562
563 return Err(rename_err);
564 }
565
566 self.unload_space(old)?;
567 if let Err(open_err) = self.open_space(new) {
568 let _ = self.rename_space_artifacts(new, old);
569 let _ = self
570 .db
571 .lock()
572 .map_err(|_| CoreError::poisoned("database"))?
573 .execute(
574 "UPDATE spaces SET name = ?1 WHERE name = ?2",
575 params![old, new],
576 );
577 let _ = self.open_space(old);
578 return Err(open_err);
579 }
580
581 Ok(())
582 }
583 Err(Error::SqliteFailure(sqlite_err, _))
584 if sqlite_err.code == ErrorCode::ConstraintViolation =>
585 {
586 Err(KboltError::SpaceAlreadyExists {
587 name: new.to_string(),
588 }
589 .into())
590 }
591 Err(err) => Err(err.into()),
592 }
593 }
594
595 pub fn update_space_description(&self, name: &str, description: &str) -> Result<()> {
596 let conn = self
597 .db
598 .lock()
599 .map_err(|_| CoreError::poisoned("database"))?;
600
601 let updated = conn.execute(
602 "UPDATE spaces SET description = ?1 WHERE name = ?2",
603 params![description, name],
604 )?;
605
606 if updated == 0 {
607 return Err(KboltError::SpaceNotFound {
608 name: name.to_string(),
609 }
610 .into());
611 }
612
613 Ok(())
614 }
615
616 pub fn create_collection(
617 &self,
618 space_id: i64,
619 name: &str,
620 path: &Path,
621 description: Option<&str>,
622 extensions: Option<&[String]>,
623 ) -> Result<i64> {
624 let conn = self
625 .db
626 .lock()
627 .map_err(|_| CoreError::poisoned("database"))?;
628
629 let space_name = lookup_space_name(&conn, space_id)?;
630 let extensions_json = serialize_extensions(extensions)?;
631 let result = conn.execute(
632 "INSERT INTO collections (space_id, name, path, description, extensions, created, updated)
633 VALUES (?1, ?2, ?3, ?4, ?5, strftime('%Y-%m-%dT%H:%M:%SZ','now'), strftime('%Y-%m-%dT%H:%M:%SZ','now'))",
634 params![space_id, name, path.to_string_lossy(), description, extensions_json],
635 );
636
637 match result {
638 Ok(_) => Ok(conn.last_insert_rowid()),
639 Err(Error::SqliteFailure(sqlite_err, _))
640 if sqlite_err.code == ErrorCode::ConstraintViolation =>
641 {
642 Err(KboltError::CollectionAlreadyExists {
643 name: name.to_string(),
644 space: space_name,
645 }
646 .into())
647 }
648 Err(err) => Err(err.into()),
649 }
650 }
651
652 pub fn get_collection(&self, space_id: i64, name: &str) -> Result<CollectionRow> {
653 let conn = self
654 .db
655 .lock()
656 .map_err(|_| CoreError::poisoned("database"))?;
657 let mut stmt = conn.prepare(
658 "SELECT id, space_id, name, path, description, extensions, created, updated
659 FROM collections
660 WHERE space_id = ?1 AND name = ?2",
661 )?;
662
663 let result = stmt.query_row(params![space_id, name], decode_collection_row);
664 match result {
665 Ok(row) => Ok(row),
666 Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
667 name: name.to_string(),
668 }
669 .into()),
670 Err(err) => Err(err.into()),
671 }
672 }
673
674 pub fn get_collection_by_id(&self, id: i64) -> Result<CollectionRow> {
675 let conn = self
676 .db
677 .lock()
678 .map_err(|_| CoreError::poisoned("database"))?;
679 let mut stmt = conn.prepare(
680 "SELECT id, space_id, name, path, description, extensions, created, updated
681 FROM collections
682 WHERE id = ?1",
683 )?;
684
685 let result = stmt.query_row(params![id], decode_collection_row);
686 match result {
687 Ok(row) => Ok(row),
688 Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
689 name: format!("id={id}"),
690 }
691 .into()),
692 Err(err) => Err(err.into()),
693 }
694 }
695
696 pub fn list_collections(&self, space_id: Option<i64>) -> Result<Vec<CollectionRow>> {
697 let conn = self
698 .db
699 .lock()
700 .map_err(|_| CoreError::poisoned("database"))?;
701
702 let (sql, params): (&str, Vec<i64>) = match space_id {
703 Some(id) => (
704 "SELECT id, space_id, name, path, description, extensions, created, updated
705 FROM collections
706 WHERE space_id = ?1
707 ORDER BY name ASC",
708 vec![id],
709 ),
710 None => (
711 "SELECT id, space_id, name, path, description, extensions, created, updated
712 FROM collections
713 ORDER BY space_id ASC, name ASC",
714 Vec::new(),
715 ),
716 };
717
718 let mut stmt = conn.prepare(sql)?;
719 let rows = if params.is_empty() {
720 stmt.query_map([], decode_collection_row)?
721 } else {
722 stmt.query_map(params![params[0]], decode_collection_row)?
723 };
724 let collections = rows.collect::<std::result::Result<Vec<_>, _>>()?;
725 Ok(collections)
726 }
727
728 pub fn delete_collection(&self, space_id: i64, name: &str) -> Result<()> {
729 let conn = self
730 .db
731 .lock()
732 .map_err(|_| CoreError::poisoned("database"))?;
733 let _space_name = lookup_space_name(&conn, space_id)?;
734
735 let deleted = conn.execute(
736 "DELETE FROM collections WHERE space_id = ?1 AND name = ?2",
737 params![space_id, name],
738 )?;
739
740 if deleted == 0 {
741 return Err(KboltError::CollectionNotFound {
742 name: name.to_string(),
743 }
744 .into());
745 }
746
747 Ok(())
748 }
749
750 pub fn rename_collection(&self, space_id: i64, old: &str, new: &str) -> Result<()> {
751 let conn = self
752 .db
753 .lock()
754 .map_err(|_| CoreError::poisoned("database"))?;
755 let space_name = lookup_space_name(&conn, space_id)?;
756 let result = conn.execute(
757 "UPDATE collections
758 SET name = ?1, updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
759 WHERE space_id = ?2 AND name = ?3",
760 params![new, space_id, old],
761 );
762
763 match result {
764 Ok(0) => Err(KboltError::CollectionNotFound {
765 name: old.to_string(),
766 }
767 .into()),
768 Ok(_) => Ok(()),
769 Err(Error::SqliteFailure(sqlite_err, _))
770 if sqlite_err.code == ErrorCode::ConstraintViolation =>
771 {
772 Err(KboltError::CollectionAlreadyExists {
773 name: new.to_string(),
774 space: space_name,
775 }
776 .into())
777 }
778 Err(err) => Err(err.into()),
779 }
780 }
781
782 pub fn update_collection_description(
783 &self,
784 space_id: i64,
785 name: &str,
786 desc: &str,
787 ) -> Result<()> {
788 let conn = self
789 .db
790 .lock()
791 .map_err(|_| CoreError::poisoned("database"))?;
792 let _space_name = lookup_space_name(&conn, space_id)?;
793
794 let updated = conn.execute(
795 "UPDATE collections
796 SET description = ?1, updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
797 WHERE space_id = ?2 AND name = ?3",
798 params![desc, space_id, name],
799 )?;
800
801 if updated == 0 {
802 return Err(KboltError::CollectionNotFound {
803 name: name.to_string(),
804 }
805 .into());
806 }
807
808 Ok(())
809 }
810
811 pub fn update_collection_timestamp(&self, collection_id: i64) -> Result<()> {
812 let conn = self
813 .db
814 .lock()
815 .map_err(|_| CoreError::poisoned("database"))?;
816
817 let updated = conn.execute(
818 "UPDATE collections
819 SET updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
820 WHERE id = ?1",
821 params![collection_id],
822 )?;
823
824 if updated == 0 {
825 return Err(KboltError::CollectionNotFound {
826 name: format!("id={collection_id}"),
827 }
828 .into());
829 }
830
831 Ok(())
832 }
833
834 pub fn upsert_document(
835 &self,
836 collection_id: i64,
837 path: &str,
838 title: &str,
839 title_source: DocumentTitleSource,
840 hash: &str,
841 modified: &str,
842 ) -> Result<i64> {
843 let conn = self
844 .db
845 .lock()
846 .map_err(|_| CoreError::poisoned("database"))?;
847 let _collection_name = lookup_collection_name(&conn, collection_id)?;
848
849 conn.execute(
850 "INSERT INTO documents (collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty)
851 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 1, NULL, 1)
852 ON CONFLICT(collection_id, path) DO UPDATE SET
853 title = excluded.title,
854 title_source = excluded.title_source,
855 hash = excluded.hash,
856 modified = excluded.modified,
857 active = 1,
858 deactivated_at = NULL,
859 fts_dirty = 1",
860 params![
861 collection_id,
862 path,
863 title,
864 title_source.as_sql(),
865 hash,
866 modified
867 ],
868 )?;
869
870 let id: i64 = conn.query_row(
871 "SELECT id FROM documents WHERE collection_id = ?1 AND path = ?2",
872 params![collection_id, path],
873 |row| row.get(0),
874 )?;
875 Ok(id)
876 }
877
878 pub fn get_document_by_path(
879 &self,
880 collection_id: i64,
881 path: &str,
882 ) -> Result<Option<DocumentRow>> {
883 let conn = self
884 .db
885 .lock()
886 .map_err(|_| CoreError::poisoned("database"))?;
887 let _collection_name = lookup_collection_name(&conn, collection_id)?;
888 let mut stmt = conn.prepare(
889 "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
890 FROM documents
891 WHERE collection_id = ?1 AND path = ?2",
892 )?;
893
894 let result = stmt.query_row(params![collection_id, path], decode_document_row);
895 match result {
896 Ok(row) => Ok(Some(row)),
897 Err(Error::QueryReturnedNoRows) => Ok(None),
898 Err(err) => Err(err.into()),
899 }
900 }
901
902 pub fn get_document_by_id(&self, id: i64) -> Result<DocumentRow> {
903 let conn = self
904 .db
905 .lock()
906 .map_err(|_| CoreError::poisoned("database"))?;
907 let mut stmt = conn.prepare(
908 "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
909 FROM documents
910 WHERE id = ?1",
911 )?;
912
913 let result = stmt.query_row(params![id], decode_document_row);
914 match result {
915 Ok(row) => Ok(row),
916 Err(Error::QueryReturnedNoRows) => Err(KboltError::DocumentNotFound {
917 path: format!("id={id}"),
918 }
919 .into()),
920 Err(err) => Err(err.into()),
921 }
922 }
923
924 pub fn get_documents_by_ids(&self, ids: &[i64]) -> Result<Vec<DocumentRow>> {
925 if ids.is_empty() {
926 return Ok(Vec::new());
927 }
928
929 let conn = self
930 .db
931 .lock()
932 .map_err(|_| CoreError::poisoned("database"))?;
933
934 let placeholders = vec!["?"; ids.len()].join(", ");
935 let sql = format!(
936 "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
937 FROM documents
938 WHERE id IN ({placeholders})"
939 );
940 let mut stmt = conn.prepare(&sql)?;
941 let rows = stmt.query_map(params_from_iter(ids.iter()), decode_document_row)?;
942 let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
943 Ok(docs)
944 }
945
946 pub fn refresh_document_activity(&self, doc_id: i64, modified: &str) -> Result<()> {
947 let conn = self
948 .db
949 .lock()
950 .map_err(|_| CoreError::poisoned("database"))?;
951
952 let updated = conn.execute(
953 "UPDATE documents
954 SET modified = ?1,
955 active = 1,
956 deactivated_at = NULL
957 WHERE id = ?2",
958 params![modified, doc_id],
959 )?;
960
961 if updated == 0 {
962 return Err(KboltError::DocumentNotFound {
963 path: format!("id={doc_id}"),
964 }
965 .into());
966 }
967
968 Ok(())
969 }
970
971 pub fn list_documents(
972 &self,
973 collection_id: i64,
974 active_only: bool,
975 ) -> Result<Vec<DocumentRow>> {
976 let conn = self
977 .db
978 .lock()
979 .map_err(|_| CoreError::poisoned("database"))?;
980 let _collection_name = lookup_collection_name(&conn, collection_id)?;
981 let active_only = i64::from(active_only);
982 let mut stmt = conn.prepare(
983 "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
984 FROM documents
985 WHERE collection_id = ?1
986 AND (?2 = 0 OR active = 1)
987 ORDER BY path ASC",
988 )?;
989 let rows = stmt.query_map(params![collection_id, active_only], decode_document_row)?;
990 let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
991 Ok(docs)
992 }
993
994 pub fn list_collection_file_rows(
995 &self,
996 collection_id: i64,
997 active_only: bool,
998 ) -> Result<Vec<FileListRow>> {
999 let conn = self
1000 .db
1001 .lock()
1002 .map_err(|_| CoreError::poisoned("database"))?;
1003 let _collection_name = lookup_collection_name(&conn, collection_id)?;
1004 let active_only = i64::from(active_only);
1005 let mut stmt = conn.prepare(
1006 "SELECT d.id, d.path, d.title, d.hash, d.active,
1007 COUNT(DISTINCT c.id) AS chunk_count,
1008 COUNT(DISTINCT e.chunk_id) AS embedded_chunk_count
1009 FROM documents d
1010 LEFT JOIN chunks c ON c.doc_id = d.id
1011 LEFT JOIN embeddings e ON e.chunk_id = c.id
1012 WHERE d.collection_id = ?1
1013 AND (?2 = 0 OR d.active = 1)
1014 GROUP BY d.id, d.path, d.title, d.hash, d.active
1015 ORDER BY d.path ASC",
1016 )?;
1017 let rows = stmt.query_map(params![collection_id, active_only], |row| {
1018 let chunk_count: i64 = row.get(5)?;
1019 let embedded_chunk_count: i64 = row.get(6)?;
1020 Ok(FileListRow {
1021 doc_id: row.get(0)?,
1022 path: row.get(1)?,
1023 title: row.get(2)?,
1024 hash: row.get(3)?,
1025 active: row.get::<_, i64>(4)? != 0,
1026 chunk_count: chunk_count as usize,
1027 embedded_chunk_count: embedded_chunk_count as usize,
1028 })
1029 })?;
1030 let files = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1031 Ok(files)
1032 }
1033
1034 pub fn get_document_by_hash_prefix(&self, prefix: &str) -> Result<Vec<DocumentRow>> {
1035 let conn = self
1036 .db
1037 .lock()
1038 .map_err(|_| CoreError::poisoned("database"))?;
1039
1040 let pattern = format!("{prefix}%");
1041 let mut stmt = conn.prepare(
1042 "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
1043 FROM documents
1044 WHERE hash LIKE ?1
1045 ORDER BY id ASC",
1046 )?;
1047 let rows = stmt.query_map(params![pattern], decode_document_row)?;
1048 let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1049 Ok(docs)
1050 }
1051
1052 pub fn deactivate_document(&self, doc_id: i64) -> Result<()> {
1053 let conn = self
1054 .db
1055 .lock()
1056 .map_err(|_| CoreError::poisoned("database"))?;
1057
1058 let updated = conn.execute(
1059 "UPDATE documents
1060 SET active = 0,
1061 deactivated_at = CASE
1062 WHEN active = 1 THEN strftime('%Y-%m-%dT%H:%M:%SZ','now')
1063 ELSE deactivated_at
1064 END
1065 WHERE id = ?1",
1066 params![doc_id],
1067 )?;
1068
1069 if updated == 0 {
1070 return Err(KboltError::DocumentNotFound {
1071 path: format!("id={doc_id}"),
1072 }
1073 .into());
1074 }
1075
1076 Ok(())
1077 }
1078
1079 pub fn reactivate_document(&self, doc_id: i64) -> Result<()> {
1080 let conn = self
1081 .db
1082 .lock()
1083 .map_err(|_| CoreError::poisoned("database"))?;
1084
1085 let updated = conn.execute(
1086 "UPDATE documents
1087 SET active = 1, deactivated_at = NULL
1088 WHERE id = ?1",
1089 params![doc_id],
1090 )?;
1091
1092 if updated == 0 {
1093 return Err(KboltError::DocumentNotFound {
1094 path: format!("id={doc_id}"),
1095 }
1096 .into());
1097 }
1098
1099 Ok(())
1100 }
1101
1102 pub fn reap_documents(&self, older_than_days: u32) -> Result<Vec<i64>> {
1103 let reaped = self.list_reapable_documents(older_than_days)?;
1104 let doc_ids = reaped.iter().map(|item| item.doc_id).collect::<Vec<_>>();
1105 self.delete_documents(&doc_ids)?;
1106 Ok(doc_ids)
1107 }
1108
1109 pub fn list_reapable_documents(&self, older_than_days: u32) -> Result<Vec<ReapableDocument>> {
1110 self.list_reapable_documents_filtered(older_than_days, "", Vec::new())
1111 }
1112
1113 pub fn list_reapable_documents_in_space(
1114 &self,
1115 older_than_days: u32,
1116 space_id: i64,
1117 ) -> Result<Vec<ReapableDocument>> {
1118 self.list_reapable_documents_filtered(
1119 older_than_days,
1120 " AND c.space_id = ?",
1121 vec![SqlValue::Integer(space_id)],
1122 )
1123 }
1124
1125 pub fn list_reapable_documents_in_collections(
1126 &self,
1127 older_than_days: u32,
1128 collection_ids: &[i64],
1129 ) -> Result<Vec<ReapableDocument>> {
1130 if collection_ids.is_empty() {
1131 return Ok(Vec::new());
1132 }
1133
1134 let placeholders = vec!["?"; collection_ids.len()].join(", ");
1135 let clause = format!(" AND d.collection_id IN ({placeholders})");
1136 let params = collection_ids
1137 .iter()
1138 .map(|id| SqlValue::Integer(*id))
1139 .collect::<Vec<_>>();
1140 self.list_reapable_documents_filtered(older_than_days, &clause, params)
1141 }
1142
1143 fn list_reapable_documents_filtered(
1144 &self,
1145 older_than_days: u32,
1146 scope_clause: &str,
1147 scope_params: Vec<SqlValue>,
1148 ) -> Result<Vec<ReapableDocument>> {
1149 let conn = self
1150 .db
1151 .lock()
1152 .map_err(|_| CoreError::poisoned("database"))?;
1153
1154 let modifier = format!("-{} days", older_than_days);
1155 let sql = format!(
1156 "SELECT d.id, s.name
1157 FROM documents d
1158 JOIN collections c ON c.id = d.collection_id
1159 JOIN spaces s ON s.id = c.space_id
1160 WHERE d.active = 0
1161 AND d.deactivated_at IS NOT NULL
1162 AND d.deactivated_at <= strftime('%Y-%m-%dT%H:%M:%SZ', 'now', ?){scope_clause}
1163 ORDER BY d.id ASC"
1164 );
1165 let mut stmt = conn.prepare(&sql)?;
1166 let mut params = Vec::with_capacity(scope_params.len() + 1);
1167 params.push(SqlValue::Text(modifier));
1168 params.extend(scope_params);
1169 let rows = stmt.query_map(params_from_iter(params.iter()), |row| {
1170 Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1171 })?;
1172 let headers = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1173 drop(stmt);
1174
1175 let mut documents = Vec::with_capacity(headers.len());
1176 for (doc_id, space_name) in headers {
1177 documents.push(ReapableDocument {
1178 doc_id,
1179 space_name,
1180 chunk_ids: load_chunk_ids_for_doc(&conn, doc_id)?,
1181 });
1182 }
1183
1184 Ok(documents)
1185 }
1186
1187 pub fn delete_documents(&self, doc_ids: &[i64]) -> Result<()> {
1188 if doc_ids.is_empty() {
1189 return Ok(());
1190 }
1191
1192 let conn = self
1193 .db
1194 .lock()
1195 .map_err(|_| CoreError::poisoned("database"))?;
1196
1197 let placeholders = vec!["?"; doc_ids.len()].join(", ");
1198 let sql = format!("DELETE FROM documents WHERE id IN ({placeholders})");
1199 conn.execute(&sql, params_from_iter(doc_ids.iter()))?;
1200 Ok(())
1201 }
1202
1203 pub fn insert_chunks(&self, doc_id: i64, chunks: &[ChunkInsert]) -> Result<Vec<i64>> {
1204 if chunks.is_empty() {
1205 return Ok(Vec::new());
1206 }
1207
1208 let conn = self
1209 .db
1210 .lock()
1211 .map_err(|_| CoreError::poisoned("database"))?;
1212 let _doc = lookup_document_id(&conn, doc_id)?;
1213
1214 let tx = conn.unchecked_transaction()?;
1215 let mut stmt = tx.prepare(
1216 "INSERT INTO chunks (doc_id, seq, offset, length, heading, kind)
1217 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1218 )?;
1219
1220 let mut ids = Vec::with_capacity(chunks.len());
1221 for chunk in chunks {
1222 stmt.execute(params![
1223 doc_id,
1224 chunk.seq,
1225 chunk.offset as i64,
1226 chunk.length as i64,
1227 chunk.heading,
1228 chunk.kind.as_storage_kind(),
1229 ])?;
1230 ids.push(tx.last_insert_rowid());
1231 }
1232 drop(stmt);
1233 tx.commit()?;
1234 Ok(ids)
1235 }
1236
1237 pub fn delete_chunks_for_document(&self, doc_id: i64) -> Result<Vec<i64>> {
1238 let conn = self
1239 .db
1240 .lock()
1241 .map_err(|_| CoreError::poisoned("database"))?;
1242 let _doc = lookup_document_id(&conn, doc_id)?;
1243
1244 let mut stmt = conn.prepare("SELECT id FROM chunks WHERE doc_id = ?1 ORDER BY seq ASC")?;
1245 let rows = stmt.query_map(params![doc_id], |row| row.get::<_, i64>(0))?;
1246 let chunk_ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1247
1248 conn.execute("DELETE FROM chunks WHERE doc_id = ?1", params![doc_id])?;
1249 Ok(chunk_ids)
1250 }
1251
1252 pub fn get_chunks_for_document(&self, doc_id: i64) -> Result<Vec<ChunkRow>> {
1253 let conn = self
1254 .db
1255 .lock()
1256 .map_err(|_| CoreError::poisoned("database"))?;
1257 let _doc = lookup_document_id(&conn, doc_id)?;
1258
1259 let mut stmt = conn.prepare(
1260 "SELECT id, doc_id, seq, offset, length, heading, kind
1261 FROM chunks
1262 WHERE doc_id = ?1
1263 ORDER BY seq ASC",
1264 )?;
1265 let rows = stmt.query_map(params![doc_id], decode_chunk_row)?;
1266 let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1267 Ok(chunks)
1268 }
1269
1270 pub fn get_chunks(&self, chunk_ids: &[i64]) -> Result<Vec<ChunkRow>> {
1271 if chunk_ids.is_empty() {
1272 return Ok(Vec::new());
1273 }
1274
1275 let conn = self
1276 .db
1277 .lock()
1278 .map_err(|_| CoreError::poisoned("database"))?;
1279
1280 let placeholders = vec!["?"; chunk_ids.len()].join(", ");
1281 let sql = format!(
1282 "SELECT id, doc_id, seq, offset, length, heading, kind
1283 FROM chunks
1284 WHERE id IN ({placeholders})
1285 ORDER BY id ASC"
1286 );
1287 let mut stmt = conn.prepare(&sql)?;
1288 let rows = stmt.query_map(params_from_iter(chunk_ids.iter()), decode_chunk_row)?;
1289 let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1290 Ok(chunks)
1291 }
1292
1293 pub fn insert_embeddings(&self, entries: &[(i64, &str)]) -> Result<()> {
1294 if entries.is_empty() {
1295 return Ok(());
1296 }
1297
1298 let conn = self
1299 .db
1300 .lock()
1301 .map_err(|_| CoreError::poisoned("database"))?;
1302
1303 let tx = conn.unchecked_transaction()?;
1304 let mut stmt = tx.prepare(
1305 "INSERT INTO embeddings (chunk_id, model, embedded_at)
1306 VALUES (?1, ?2, strftime('%Y-%m-%dT%H:%M:%SZ','now'))
1307 ON CONFLICT(chunk_id, model) DO UPDATE SET
1308 embedded_at = excluded.embedded_at",
1309 )?;
1310
1311 for (chunk_id, model) in entries {
1312 stmt.execute(params![chunk_id, model])?;
1313 }
1314
1315 drop(stmt);
1316 tx.commit()?;
1317 Ok(())
1318 }
1319
1320 pub fn get_unembedded_chunks(
1321 &self,
1322 model: &str,
1323 after_chunk_id: i64,
1324 limit: usize,
1325 ) -> Result<Vec<EmbedRecord>> {
1326 self.get_unembedded_chunks_filtered(model, after_chunk_id, limit, "", Vec::new())
1327 }
1328
1329 pub fn get_unembedded_chunks_in_space(
1330 &self,
1331 model: &str,
1332 space_id: i64,
1333 after_chunk_id: i64,
1334 limit: usize,
1335 ) -> Result<Vec<EmbedRecord>> {
1336 self.get_unembedded_chunks_filtered(
1337 model,
1338 after_chunk_id,
1339 limit,
1340 " AND col.space_id = ?",
1341 vec![SqlValue::Integer(space_id)],
1342 )
1343 }
1344
1345 pub fn get_unembedded_chunks_in_collections(
1346 &self,
1347 model: &str,
1348 collection_ids: &[i64],
1349 after_chunk_id: i64,
1350 limit: usize,
1351 ) -> Result<Vec<EmbedRecord>> {
1352 if collection_ids.is_empty() {
1353 return Ok(Vec::new());
1354 }
1355
1356 let placeholders = vec!["?"; collection_ids.len()].join(", ");
1357 let clause = format!(" AND d.collection_id IN ({placeholders})");
1358 let params = collection_ids
1359 .iter()
1360 .map(|id| SqlValue::Integer(*id))
1361 .collect::<Vec<_>>();
1362 self.get_unembedded_chunks_filtered(model, after_chunk_id, limit, &clause, params)
1363 }
1364
1365 fn get_unembedded_chunks_filtered(
1366 &self,
1367 model: &str,
1368 after_chunk_id: i64,
1369 limit: usize,
1370 scope_clause: &str,
1371 scope_params: Vec<SqlValue>,
1372 ) -> Result<Vec<EmbedRecord>> {
1373 let conn = self
1374 .db
1375 .lock()
1376 .map_err(|_| CoreError::poisoned("database"))?;
1377 let sql_limit = i64::try_from(limit)
1378 .map_err(|_| CoreError::Internal("limit too large for sqlite".to_string()))?;
1379
1380 let sql = format!(
1381 "SELECT c.id, d.path, col.path, s.name, c.offset, c.length
1382 FROM chunks c
1383 JOIN documents d ON d.id = c.doc_id
1384 JOIN collections col ON col.id = d.collection_id
1385 JOIN spaces s ON s.id = col.space_id
1386 LEFT JOIN embeddings e ON e.chunk_id = c.id AND e.model = ?
1387 WHERE d.active = 1 AND e.chunk_id IS NULL AND c.id > ?{scope_clause}
1388 ORDER BY c.id ASC
1389 LIMIT ?"
1390 );
1391 let mut stmt = conn.prepare(&sql)?;
1392 let mut params = Vec::with_capacity(scope_params.len() + 3);
1393 params.push(SqlValue::Text(model.to_string()));
1394 params.push(SqlValue::Integer(after_chunk_id));
1395 params.extend(scope_params);
1396 params.push(SqlValue::Integer(sql_limit));
1397 let rows = stmt.query_map(params_from_iter(params.iter()), |row| {
1398 let offset_value: i64 = row.get(4)?;
1399 let length_value: i64 = row.get(5)?;
1400 Ok(EmbedRecord {
1401 chunk_id: row.get(0)?,
1402 doc_path: row.get(1)?,
1403 collection_path: PathBuf::from(row.get::<_, String>(2)?),
1404 space_name: row.get(3)?,
1405 offset: offset_value as usize,
1406 length: length_value as usize,
1407 })
1408 })?;
1409
1410 let records = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1411 Ok(records)
1412 }
1413
1414 pub fn delete_embeddings_for_model(&self, model: &str) -> Result<usize> {
1415 let conn = self
1416 .db
1417 .lock()
1418 .map_err(|_| CoreError::poisoned("database"))?;
1419
1420 let deleted = conn.execute("DELETE FROM embeddings WHERE model = ?1", params![model])?;
1421 Ok(deleted)
1422 }
1423
1424 pub fn delete_embeddings_for_space(&self, space_id: i64) -> Result<usize> {
1425 let conn = self
1426 .db
1427 .lock()
1428 .map_err(|_| CoreError::poisoned("database"))?;
1429 let _space_name = lookup_space_name(&conn, space_id)?;
1430
1431 let deleted = conn.execute(
1432 "DELETE FROM embeddings
1433 WHERE chunk_id IN (
1434 SELECT c.id
1435 FROM chunks c
1436 JOIN documents d ON d.id = c.doc_id
1437 JOIN collections col ON col.id = d.collection_id
1438 WHERE col.space_id = ?1
1439 )",
1440 params![space_id],
1441 )?;
1442 Ok(deleted)
1443 }
1444
1445 pub fn list_embedding_models_in_space(&self, space_id: i64) -> Result<Vec<String>> {
1446 let conn = self
1447 .db
1448 .lock()
1449 .map_err(|_| CoreError::poisoned("database"))?;
1450 let _space_name = lookup_space_name(&conn, space_id)?;
1451
1452 let mut stmt = conn.prepare(
1453 "SELECT DISTINCT e.model
1454 FROM embeddings e
1455 JOIN chunks c ON c.id = e.chunk_id
1456 JOIN documents d ON d.id = c.doc_id
1457 JOIN collections col ON col.id = d.collection_id
1458 WHERE col.space_id = ?1
1459 ORDER BY e.model ASC",
1460 )?;
1461 let rows = stmt.query_map(params![space_id], |row| row.get::<_, String>(0))?;
1462 let models = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1463 Ok(models)
1464 }
1465
1466 pub fn count_embeddings(&self) -> Result<usize> {
1467 let conn = self
1468 .db
1469 .lock()
1470 .map_err(|_| CoreError::poisoned("database"))?;
1471
1472 let count: i64 = conn.query_row("SELECT COUNT(*) FROM embeddings", [], |row| row.get(0))?;
1473 Ok(count as usize)
1474 }
1475
1476 pub fn index_tantivy(&self, space: &str, entries: &[TantivyEntry]) -> Result<()> {
1477 if entries.is_empty() {
1478 return Ok(());
1479 }
1480
1481 let space_indexes = self.get_space_indexes(space)?;
1482 with_tantivy_writer(&space_indexes, |writer| {
1483 for entry in entries {
1484 let chunk_id = u64::try_from(entry.chunk_id).map_err(|_| {
1485 CoreError::Internal(format!(
1486 "chunk_id must be non-negative for tantivy indexing: {}",
1487 entry.chunk_id
1488 ))
1489 })?;
1490 let doc_id = u64::try_from(entry.doc_id).map_err(|_| {
1491 CoreError::Internal(format!(
1492 "doc_id must be non-negative for tantivy indexing: {}",
1493 entry.doc_id
1494 ))
1495 })?;
1496
1497 let mut doc = TantivyDocument::default();
1498 doc.add_u64(space_indexes.fields.chunk_id, chunk_id);
1499 doc.add_u64(space_indexes.fields.doc_id, doc_id);
1500 doc.add_text(space_indexes.fields.filepath, &entry.filepath);
1501 if let Some(title) = &entry.semantic_title {
1502 doc.add_text(space_indexes.fields.title, title);
1503 }
1504 if let Some(heading) = &entry.heading {
1505 doc.add_text(space_indexes.fields.heading, heading);
1506 }
1507 doc.add_text(space_indexes.fields.body, &entry.body);
1508 writer.add_document(doc)?;
1509 }
1510 Ok(())
1511 })
1512 }
1513
1514 pub fn delete_tantivy(&self, space: &str, chunk_ids: &[i64]) -> Result<()> {
1515 if chunk_ids.is_empty() {
1516 return Ok(());
1517 }
1518
1519 let space_indexes = self.get_space_indexes(space)?;
1520 with_tantivy_writer(&space_indexes, |writer| {
1521 for chunk_id in chunk_ids {
1522 let chunk_key = u64::try_from(*chunk_id).map_err(|_| {
1523 CoreError::Internal(format!(
1524 "chunk_id must be non-negative for tantivy delete: {chunk_id}"
1525 ))
1526 })?;
1527 writer.delete_term(Term::from_field_u64(
1528 space_indexes.fields.chunk_id,
1529 chunk_key,
1530 ));
1531 }
1532
1533 Ok(())
1534 })
1535 }
1536
1537 pub fn delete_tantivy_by_doc(&self, space: &str, doc_id: i64) -> Result<()> {
1538 let space_indexes = self.get_space_indexes(space)?;
1539 with_tantivy_writer(&space_indexes, |writer| {
1540 let doc_key = u64::try_from(doc_id).map_err(|_| {
1541 CoreError::Internal(format!(
1542 "doc_id must be non-negative for tantivy delete-by-doc: {doc_id}"
1543 ))
1544 })?;
1545 writer.delete_term(Term::from_field_u64(space_indexes.fields.doc_id, doc_key));
1546 Ok(())
1547 })
1548 }
1549
1550 pub fn query_bm25(
1551 &self,
1552 space: &str,
1553 query: &str,
1554 fields: &[(&str, f32)],
1555 limit: usize,
1556 ) -> Result<Vec<BM25Hit>> {
1557 if limit == 0 {
1558 return Ok(Vec::new());
1559 }
1560
1561 if fields.is_empty() {
1562 return Err(CoreError::Internal(
1563 "bm25 query requires at least one field".to_string(),
1564 ));
1565 }
1566
1567 let space_indexes = self.get_space_indexes(space)?;
1568
1569 let schema = space_indexes.tantivy_index.schema();
1570 let query_fields = fields
1571 .iter()
1572 .map(|(name, boost)| {
1573 let field = resolve_tantivy_field(space_indexes.fields, name)?;
1574 let field_entry = schema.get_field_entry(field);
1575 let index_record_option = field_entry
1576 .field_type()
1577 .get_index_record_option()
1578 .ok_or_else(|| {
1579 CoreError::Internal(format!(
1580 "bm25 field '{}' is not indexed",
1581 field_entry.name()
1582 ))
1583 })?;
1584 Ok(Bm25FieldSpec {
1585 field,
1586 boost: *boost,
1587 index_record_option,
1588 })
1589 })
1590 .collect::<Result<Vec<_>>>()?;
1591 let Some(parsed_query) =
1592 build_literal_bm25_query(&space_indexes.tantivy_index, &query_fields, query)?
1593 else {
1594 return Ok(Vec::new());
1595 };
1596 let reader = space_indexes.tantivy_index.reader()?;
1597 reader.reload()?;
1598 let searcher = reader.searcher();
1599 let docs = searcher.search(&parsed_query, &TopDocs::with_limit(limit))?;
1600
1601 let mut hits = Vec::with_capacity(docs.len());
1602 for (score, address) in docs {
1603 let doc = searcher.doc::<TantivyDocument>(address)?;
1604 let chunk_id = doc
1605 .get_first(space_indexes.fields.chunk_id)
1606 .and_then(|value| value.as_u64())
1607 .ok_or_else(|| {
1608 CoreError::Internal("tantivy hit missing chunk_id field".to_string())
1609 })?;
1610 hits.push(BM25Hit {
1611 chunk_id: chunk_id as i64,
1612 score,
1613 });
1614 }
1615
1616 Ok(hits)
1617 }
1618
1619 pub fn commit_tantivy(&self, space: &str) -> Result<()> {
1620 let space_indexes = self.get_space_indexes(space)?;
1621 let mut writer = space_indexes
1622 .tantivy_writer
1623 .lock()
1624 .map_err(|_| CoreError::poisoned("tantivy writer"))?;
1625 let Some(writer) = writer.as_mut() else {
1626 return Ok(());
1627 };
1628 writer.commit()?;
1629 Ok(())
1630 }
1631
1632 pub fn insert_usearch(&self, space: &str, key: i64, vector: &[f32]) -> Result<()> {
1633 self.batch_insert_usearch(space, &[(key, vector)])
1634 }
1635
1636 pub fn batch_insert_usearch(&self, space: &str, entries: &[(i64, &[f32])]) -> Result<()> {
1637 if entries.is_empty() {
1638 return Ok(());
1639 }
1640
1641 let space_indexes = self.get_space_indexes(space)?;
1642
1643 let expected_dimensions = entries[0].1.len();
1644 if expected_dimensions == 0 {
1645 return Err(CoreError::Internal(
1646 "cannot insert empty vector into usearch index".to_string(),
1647 ));
1648 }
1649 for (_, vector) in entries {
1650 if vector.len() != expected_dimensions {
1651 return Err(CoreError::Internal(format!(
1652 "vector dimension mismatch in batch insert: expected {expected_dimensions}, got {}",
1653 vector.len()
1654 )));
1655 }
1656 }
1657
1658 let mut index = space_indexes
1659 .usearch_index
1660 .write()
1661 .map_err(|_| CoreError::poisoned("usearch index"))?;
1662 ensure_usearch_dimensions(&mut index, expected_dimensions)?;
1663 let target_capacity = index.size().saturating_add(entries.len());
1664 index.reserve(target_capacity).map_err(|err| {
1665 CoreError::Internal(format!(
1666 "usearch reserve failed for {target_capacity} items: {err}"
1667 ))
1668 })?;
1669
1670 for (key, vector) in entries {
1671 let key = u64::try_from(*key).map_err(|_| {
1672 CoreError::Internal(format!("usearch key must be non-negative: {}", *key))
1673 })?;
1674 index
1675 .add::<f32>(key, vector)
1676 .map_err(|err| CoreError::Internal(format!("usearch add failed: {err}")))?;
1677 }
1678
1679 save_usearch_index(&index, &space_indexes.usearch_path)?;
1680 Ok(())
1681 }
1682
1683 pub fn delete_usearch(&self, space: &str, keys: &[i64]) -> Result<()> {
1684 if keys.is_empty() {
1685 return Ok(());
1686 }
1687
1688 let space_indexes = self.get_space_indexes(space)?;
1689 let index = space_indexes
1690 .usearch_index
1691 .write()
1692 .map_err(|_| CoreError::poisoned("usearch index"))?;
1693
1694 for key in keys {
1695 let key = u64::try_from(*key).map_err(|_| {
1696 CoreError::Internal(format!("usearch key must be non-negative: {}", *key))
1697 })?;
1698 index
1699 .remove(key)
1700 .map_err(|err| CoreError::Internal(format!("usearch remove failed: {err}")))?;
1701 }
1702
1703 save_usearch_index(&index, &space_indexes.usearch_path)?;
1704 Ok(())
1705 }
1706
1707 pub fn query_dense(&self, space: &str, vector: &[f32], limit: usize) -> Result<Vec<DenseHit>> {
1708 if limit == 0 {
1709 return Ok(Vec::new());
1710 }
1711 if vector.is_empty() {
1712 return Err(CoreError::Internal(
1713 "cannot query usearch with empty vector".to_string(),
1714 ));
1715 }
1716
1717 let space_indexes = self.get_space_indexes(space)?;
1718 let index = space_indexes
1719 .usearch_index
1720 .read()
1721 .map_err(|_| CoreError::poisoned("usearch index"))?;
1722
1723 if index.size() == 0 {
1724 return Ok(Vec::new());
1725 }
1726 if vector.len() != index.dimensions() {
1727 return Err(CoreError::Internal(format!(
1728 "query vector dimension mismatch: expected {}, got {}",
1729 index.dimensions(),
1730 vector.len()
1731 )));
1732 }
1733
1734 let matches = index
1735 .search::<f32>(vector, limit)
1736 .map_err(|err| CoreError::Internal(format!("usearch query failed: {err}")))?;
1737 let hits = matches
1738 .keys
1739 .into_iter()
1740 .zip(matches.distances)
1741 .map(|(key, distance)| DenseHit {
1742 chunk_id: key as i64,
1743 distance,
1744 })
1745 .collect();
1746 Ok(hits)
1747 }
1748
1749 pub fn count_usearch(&self, space: &str) -> Result<usize> {
1750 let space_indexes = self.get_space_indexes(space)?;
1751 let index = space_indexes
1752 .usearch_index
1753 .read()
1754 .map_err(|_| CoreError::poisoned("usearch index"))?;
1755 Ok(index.size())
1756 }
1757
1758 pub fn clear_usearch(&self, space: &str) -> Result<()> {
1759 let space_indexes = self.get_space_indexes(space)?;
1760 let index = space_indexes
1761 .usearch_index
1762 .write()
1763 .map_err(|_| CoreError::poisoned("usearch index"))?;
1764 index
1765 .reset()
1766 .map_err(|err| CoreError::Internal(format!("usearch clear failed: {err}")))?;
1767 std::fs::File::create(&space_indexes.usearch_path)?;
1768 Ok(())
1769 }
1770
1771 pub fn get_fts_dirty_documents(&self) -> Result<Vec<FtsDirtyRecord>> {
1772 self.get_fts_dirty_documents_filtered("", Vec::new())
1773 }
1774
1775 pub fn get_fts_dirty_documents_in_space(&self, space_id: i64) -> Result<Vec<FtsDirtyRecord>> {
1776 self.get_fts_dirty_documents_filtered(
1777 " AND c.space_id = ?",
1778 vec![SqlValue::Integer(space_id)],
1779 )
1780 }
1781
1782 pub fn get_fts_dirty_documents_in_collections(
1783 &self,
1784 collection_ids: &[i64],
1785 ) -> Result<Vec<FtsDirtyRecord>> {
1786 if collection_ids.is_empty() {
1787 return Ok(Vec::new());
1788 }
1789
1790 let placeholders = vec!["?"; collection_ids.len()].join(", ");
1791 let clause = format!(" AND d.collection_id IN ({placeholders})");
1792 let params = collection_ids
1793 .iter()
1794 .map(|id| SqlValue::Integer(*id))
1795 .collect::<Vec<_>>();
1796 self.get_fts_dirty_documents_filtered(&clause, params)
1797 }
1798
1799 fn get_fts_dirty_documents_filtered(
1800 &self,
1801 scope_clause: &str,
1802 scope_params: Vec<SqlValue>,
1803 ) -> Result<Vec<FtsDirtyRecord>> {
1804 let conn = self
1805 .db
1806 .lock()
1807 .map_err(|_| CoreError::poisoned("database"))?;
1808
1809 let sql = format!(
1810 "SELECT d.id, d.path, d.title, d.title_source, d.hash, c.path, s.name
1811 FROM documents d
1812 JOIN collections c ON c.id = d.collection_id
1813 JOIN spaces s ON s.id = c.space_id
1814 WHERE d.fts_dirty = 1{scope_clause}
1815 ORDER BY d.id ASC"
1816 );
1817 let mut stmt = conn.prepare(&sql)?;
1818 let rows = stmt.query_map(params_from_iter(scope_params.iter()), |row| {
1819 Ok((
1820 row.get::<_, i64>(0)?,
1821 row.get::<_, String>(1)?,
1822 row.get::<_, String>(2)?,
1823 row.get::<_, String>(3)?,
1824 row.get::<_, String>(4)?,
1825 row.get::<_, String>(5)?,
1826 row.get::<_, String>(6)?,
1827 ))
1828 })?;
1829 let headers = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1830 drop(stmt);
1831
1832 let mut records = Vec::with_capacity(headers.len());
1833 for (
1834 doc_id,
1835 doc_path,
1836 doc_title,
1837 doc_title_source,
1838 doc_hash,
1839 collection_path,
1840 space_name,
1841 ) in headers
1842 {
1843 let chunks = load_chunks_for_doc(&conn, doc_id)?;
1844 records.push(FtsDirtyRecord {
1845 doc_id,
1846 doc_path,
1847 doc_title,
1848 doc_title_source: DocumentTitleSource::from_sql(&doc_title_source)?,
1849 doc_hash,
1850 collection_path: PathBuf::from(collection_path),
1851 space_name,
1852 chunks,
1853 });
1854 }
1855
1856 Ok(records)
1857 }
1858
1859 pub fn batch_clear_fts_dirty(&self, doc_ids: &[i64]) -> Result<()> {
1860 if doc_ids.is_empty() {
1861 return Ok(());
1862 }
1863
1864 let conn = self
1865 .db
1866 .lock()
1867 .map_err(|_| CoreError::poisoned("database"))?;
1868
1869 let placeholders = vec!["?"; doc_ids.len()].join(", ");
1870 let sql = format!("UPDATE documents SET fts_dirty = 0 WHERE id IN ({placeholders})");
1871 conn.execute(&sql, params_from_iter(doc_ids.iter()))?;
1872 Ok(())
1873 }
1874
1875 pub fn count_documents_in_collection(
1876 &self,
1877 collection_id: i64,
1878 active_only: bool,
1879 ) -> Result<usize> {
1880 let conn = self
1881 .db
1882 .lock()
1883 .map_err(|_| CoreError::poisoned("database"))?;
1884 let _collection_name = lookup_collection_name(&conn, collection_id)?;
1885 let active_only = i64::from(active_only);
1886 query_count(
1887 &conn,
1888 "SELECT COUNT(*)
1889 FROM documents
1890 WHERE collection_id = ?1
1891 AND (?2 = 0 OR active = 1)",
1892 params![collection_id, active_only],
1893 )
1894 }
1895
1896 pub fn count_chunks_in_collection(&self, collection_id: i64) -> Result<usize> {
1897 let conn = self
1898 .db
1899 .lock()
1900 .map_err(|_| CoreError::poisoned("database"))?;
1901 let _collection_name = lookup_collection_name(&conn, collection_id)?;
1902
1903 query_count(
1904 &conn,
1905 "SELECT COUNT(*)
1906 FROM chunks c
1907 JOIN documents d ON d.id = c.doc_id
1908 WHERE d.collection_id = ?1",
1909 params![collection_id],
1910 )
1911 }
1912
1913 pub fn count_embedded_chunks_in_collection(&self, collection_id: i64) -> Result<usize> {
1914 let conn = self
1915 .db
1916 .lock()
1917 .map_err(|_| CoreError::poisoned("database"))?;
1918 let _collection_name = lookup_collection_name(&conn, collection_id)?;
1919
1920 query_count(
1921 &conn,
1922 "SELECT COUNT(DISTINCT e.chunk_id)
1923 FROM embeddings e
1924 JOIN chunks c ON c.id = e.chunk_id
1925 JOIN documents d ON d.id = c.doc_id
1926 WHERE d.collection_id = ?1",
1927 params![collection_id],
1928 )
1929 }
1930
1931 pub fn count_documents(&self, space_id: Option<i64>) -> Result<usize> {
1932 let conn = self
1933 .db
1934 .lock()
1935 .map_err(|_| CoreError::poisoned("database"))?;
1936
1937 match space_id {
1938 Some(space_id) => {
1939 let _space_name = lookup_space_name(&conn, space_id)?;
1940 query_count(
1941 &conn,
1942 "SELECT COUNT(*)
1943 FROM documents d
1944 JOIN collections c ON c.id = d.collection_id
1945 WHERE c.space_id = ?1",
1946 params![space_id],
1947 )
1948 }
1949 None => query_count(&conn, "SELECT COUNT(*) FROM documents", []),
1950 }
1951 }
1952
1953 pub fn count_chunks(&self, space_id: Option<i64>) -> Result<usize> {
1954 let conn = self
1955 .db
1956 .lock()
1957 .map_err(|_| CoreError::poisoned("database"))?;
1958
1959 match space_id {
1960 Some(space_id) => {
1961 let _space_name = lookup_space_name(&conn, space_id)?;
1962 query_count(
1963 &conn,
1964 "SELECT COUNT(*)
1965 FROM chunks c
1966 JOIN documents d ON d.id = c.doc_id
1967 JOIN collections col ON col.id = d.collection_id
1968 WHERE col.space_id = ?1",
1969 params![space_id],
1970 )
1971 }
1972 None => query_count(&conn, "SELECT COUNT(*) FROM chunks", []),
1973 }
1974 }
1975
1976 pub fn count_embedded_chunks(&self, space_id: Option<i64>) -> Result<usize> {
1977 let conn = self
1978 .db
1979 .lock()
1980 .map_err(|_| CoreError::poisoned("database"))?;
1981
1982 match space_id {
1983 Some(space_id) => {
1984 let _space_name = lookup_space_name(&conn, space_id)?;
1985 query_count(
1986 &conn,
1987 "SELECT COUNT(DISTINCT e.chunk_id)
1988 FROM embeddings e
1989 JOIN chunks c ON c.id = e.chunk_id
1990 JOIN documents d ON d.id = c.doc_id
1991 JOIN collections col ON col.id = d.collection_id
1992 WHERE col.space_id = ?1",
1993 params![space_id],
1994 )
1995 }
1996 None => query_count(&conn, "SELECT COUNT(DISTINCT chunk_id) FROM embeddings", []),
1997 }
1998 }
1999
2000 pub fn disk_usage(&self) -> Result<DiskUsage> {
2001 let sqlite_bytes = file_size_or_zero(&self.cache_dir.join(DB_FILE))?;
2002
2003 let mut tantivy_bytes = 0_u64;
2004 let mut usearch_bytes = 0_u64;
2005 let spaces_dir = self.cache_dir.join(SPACES_DIR);
2006 if spaces_dir.exists() {
2007 for entry in std::fs::read_dir(&spaces_dir)? {
2008 let space_dir = entry?.path();
2009 if !space_dir.is_dir() {
2010 continue;
2011 }
2012
2013 tantivy_bytes += dir_size_or_zero(&space_dir.join(TANTIVY_DIR_NAME))?;
2014 usearch_bytes += file_size_or_zero(&space_dir.join(USEARCH_FILENAME))?;
2015 }
2016 }
2017
2018 let models_bytes = dir_size_or_zero(&self.cache_dir.join("models"))?;
2019 let total_bytes = sqlite_bytes + tantivy_bytes + usearch_bytes + models_bytes;
2020
2021 Ok(DiskUsage {
2022 sqlite_bytes,
2023 tantivy_bytes,
2024 usearch_bytes,
2025 models_bytes,
2026 total_bytes,
2027 })
2028 }
2029
2030 fn unload_space(&self, name: &str) -> Result<()> {
2031 let mut spaces = self
2032 .spaces
2033 .write()
2034 .map_err(|_| CoreError::poisoned("spaces"))?;
2035 spaces.remove(name);
2036 Ok(())
2037 }
2038
2039 fn remove_space_artifacts(&self, name: &str) -> Result<()> {
2040 let space_root = self.space_root_path(name);
2041 if space_root.exists() {
2042 std::fs::remove_dir_all(space_root)?;
2043 }
2044 Ok(())
2045 }
2046
2047 fn rename_space_artifacts(&self, old: &str, new: &str) -> Result<()> {
2048 let old_root = self.space_root_path(old);
2049 let new_root = self.space_root_path(new);
2050 if !old_root.exists() {
2051 return Ok(());
2052 }
2053
2054 if new_root.exists() {
2055 return Err(CoreError::Internal(format!(
2056 "cannot rename space artifacts: destination already exists: {}",
2057 new_root.display()
2058 )));
2059 }
2060
2061 if let Some(parent) = new_root.parent() {
2062 std::fs::create_dir_all(parent)?;
2063 }
2064 std::fs::rename(old_root, new_root)?;
2065 Ok(())
2066 }
2067
2068 fn space_root_path(&self, name: &str) -> PathBuf {
2069 self.cache_dir.join(SPACES_DIR).join(name)
2070 }
2071
2072 fn space_paths(&self, name: &str) -> (PathBuf, PathBuf) {
2073 let space_root = self.space_root_path(name);
2074 let tantivy_dir = space_root.join(TANTIVY_DIR_NAME);
2075 let usearch_path = space_root.join(USEARCH_FILENAME);
2076 (tantivy_dir, usearch_path)
2077 }
2078
2079 fn get_space_indexes(&self, name: &str) -> Result<Arc<SpaceIndexes>> {
2080 self.open_space(name)?;
2081 let spaces = self
2082 .spaces
2083 .read()
2084 .map_err(|_| CoreError::poisoned("spaces"))?;
2085 spaces.get(name).cloned().ok_or_else(|| {
2086 KboltError::SpaceNotFound {
2087 name: name.to_string(),
2088 }
2089 .into()
2090 })
2091 }
2092}
2093
2094fn lookup_space_name(conn: &Connection, space_id: i64) -> Result<String> {
2095 let result = conn.query_row(
2096 "SELECT name FROM spaces WHERE id = ?1",
2097 params![space_id],
2098 |row| row.get::<_, String>(0),
2099 );
2100 match result {
2101 Ok(name) => Ok(name),
2102 Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
2103 name: format!("id={space_id}"),
2104 }
2105 .into()),
2106 Err(err) => Err(err.into()),
2107 }
2108}
2109
2110fn lookup_collection_name(conn: &Connection, collection_id: i64) -> Result<String> {
2111 let result = conn.query_row(
2112 "SELECT name FROM collections WHERE id = ?1",
2113 params![collection_id],
2114 |row| row.get::<_, String>(0),
2115 );
2116 match result {
2117 Ok(name) => Ok(name),
2118 Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
2119 name: format!("id={collection_id}"),
2120 }
2121 .into()),
2122 Err(err) => Err(err.into()),
2123 }
2124}
2125
2126fn lookup_document_id(conn: &Connection, doc_id: i64) -> Result<i64> {
2127 let result = conn.query_row(
2128 "SELECT id FROM documents WHERE id = ?1",
2129 params![doc_id],
2130 |row| row.get::<_, i64>(0),
2131 );
2132 match result {
2133 Ok(id) => Ok(id),
2134 Err(Error::QueryReturnedNoRows) => Err(KboltError::DocumentNotFound {
2135 path: format!("id={doc_id}"),
2136 }
2137 .into()),
2138 Err(err) => Err(err.into()),
2139 }
2140}
2141
2142fn load_chunks_for_doc(conn: &Connection, doc_id: i64) -> Result<Vec<ChunkRow>> {
2143 let mut stmt = conn.prepare(
2144 "SELECT id, doc_id, seq, offset, length, heading, kind
2145 FROM chunks
2146 WHERE doc_id = ?1
2147 ORDER BY seq ASC",
2148 )?;
2149 let rows = stmt.query_map(params![doc_id], decode_chunk_row)?;
2150 let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2151 Ok(chunks)
2152}
2153
2154fn load_chunk_ids_for_doc(conn: &Connection, doc_id: i64) -> Result<Vec<i64>> {
2155 let mut stmt = conn.prepare("SELECT id FROM chunks WHERE doc_id = ?1 ORDER BY seq ASC")?;
2156 let rows = stmt.query_map(params![doc_id], |row| row.get::<_, i64>(0))?;
2157 let chunk_ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2158 Ok(chunk_ids)
2159}
2160
2161fn query_count<P: rusqlite::Params>(conn: &Connection, sql: &str, params: P) -> Result<usize> {
2162 let count: i64 = conn.query_row(sql, params, |row| row.get(0))?;
2163 Ok(count as usize)
2164}
2165
2166fn file_size_or_zero(path: &Path) -> Result<u64> {
2167 if !path.exists() {
2168 return Ok(0);
2169 }
2170
2171 let metadata = std::fs::metadata(path)?;
2172 if metadata.is_file() {
2173 Ok(metadata.len())
2174 } else {
2175 Ok(0)
2176 }
2177}
2178
2179fn dir_size_or_zero(path: &Path) -> Result<u64> {
2180 if !path.exists() {
2181 return Ok(0);
2182 }
2183
2184 let mut total = 0_u64;
2185 for entry in std::fs::read_dir(path)? {
2186 let entry = entry?;
2187 let child_path = entry.path();
2188 let metadata = std::fs::symlink_metadata(&child_path)?;
2189 if metadata.is_file() {
2190 total += metadata.len();
2191 } else if metadata.is_dir() {
2192 total += dir_size_or_zero(&child_path)?;
2193 }
2194 }
2195
2196 Ok(total)
2197}
2198
2199fn open_or_create_tantivy_index(path: &Path) -> Result<Index> {
2200 let meta_path = path.join("meta.json");
2201 if meta_path.exists() {
2202 return Ok(Index::open_in_dir(path)?);
2203 }
2204
2205 Ok(Index::create_in_dir(path, tantivy_schema())?)
2206}
2207
2208fn with_tantivy_writer<T>(
2209 space_indexes: &SpaceIndexes,
2210 f: impl FnOnce(&mut IndexWriter) -> Result<T>,
2211) -> Result<T> {
2212 let mut writer = space_indexes
2213 .tantivy_writer
2214 .lock()
2215 .map_err(|_| CoreError::poisoned("tantivy writer"))?;
2216
2217 if writer.is_none() {
2218 *writer = Some(space_indexes.tantivy_index.writer(50_000_000)?);
2219 }
2220
2221 let writer = writer
2222 .as_mut()
2223 .ok_or_else(|| CoreError::Internal("failed to initialize tantivy writer".to_string()))?;
2224 f(writer)
2225}
2226
2227fn ensure_documents_title_source_column(conn: &Connection) -> Result<()> {
2228 let mut stmt = conn.prepare("PRAGMA table_info(documents)")?;
2229 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
2230 let columns = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2231 drop(stmt);
2232
2233 if columns.iter().any(|column| column == "title_source") {
2234 return Ok(());
2235 }
2236
2237 conn.execute(
2238 "ALTER TABLE documents ADD COLUMN title_source TEXT NOT NULL DEFAULT 'extracted'",
2239 [],
2240 )?;
2241 Ok(())
2242}
2243
2244fn build_literal_bm25_query(
2245 index: &Index,
2246 fields: &[Bm25FieldSpec],
2247 query: &str,
2248) -> Result<Option<Box<dyn Query>>> {
2249 let mut clauses = Vec::new();
2250 for field in fields {
2251 for token in analyzed_terms_for_field(index, field.field, query)? {
2252 let term_query: Box<dyn Query> = Box::new(TermQuery::new(
2253 Term::from_field_text(field.field, &token),
2254 field.index_record_option,
2255 ));
2256 let query = if (field.boost - 1.0).abs() > f32::EPSILON {
2257 Box::new(BoostQuery::new(term_query, field.boost)) as Box<dyn Query>
2258 } else {
2259 term_query
2260 };
2261 clauses.push((Occur::Should, query));
2262 }
2263 }
2264
2265 if clauses.is_empty() {
2266 Ok(None)
2267 } else {
2268 Ok(Some(Box::new(BooleanQuery::new(clauses))))
2269 }
2270}
2271
2272fn analyzed_terms_for_field(index: &Index, field: Field, query: &str) -> Result<Vec<String>> {
2273 let mut analyzer = index.tokenizer_for_field(field)?;
2274 let mut stream = analyzer.token_stream(query);
2275 let mut terms = Vec::new();
2276 let mut seen = HashSet::new();
2277 while let Some(token) = stream.next() {
2278 if token.text.is_empty() {
2279 continue;
2280 }
2281 let text = token.text.clone();
2282 if seen.insert(text.clone()) {
2283 terms.push(text);
2284 }
2285 }
2286 Ok(terms)
2287}
2288
2289fn new_usearch_index(dimensions: usize) -> Result<usearch::Index> {
2290 let options = IndexOptions {
2291 dimensions,
2292 metric: MetricKind::Cos,
2293 quantization: ScalarKind::F32,
2294 connectivity: 16,
2295 expansion_add: 200,
2296 expansion_search: 100,
2297 ..IndexOptions::default()
2298 };
2299 usearch::Index::new(&options)
2300 .map_err(|err| CoreError::Internal(format!("usearch init failed: {err}")))
2301}
2302
2303fn open_or_create_usearch_index(path: &Path) -> Result<usearch::Index> {
2304 let index = new_usearch_index(256)?;
2305 let file_size = std::fs::metadata(path).map(|meta| meta.len()).unwrap_or(0);
2306 if file_size > 0 {
2307 let path = path
2308 .to_str()
2309 .ok_or_else(|| CoreError::Internal("invalid usearch path encoding".to_string()))?;
2310 index
2311 .load(path)
2312 .map_err(|err| CoreError::Internal(format!("usearch load failed: {err}")))?;
2313 }
2314 Ok(index)
2315}
2316
2317fn ensure_usearch_dimensions(index: &mut usearch::Index, expected_dimensions: usize) -> Result<()> {
2318 if index.size() == 0 && index.dimensions() != expected_dimensions {
2319 *index = new_usearch_index(expected_dimensions)?;
2320 return Ok(());
2321 }
2322
2323 if index.dimensions() != expected_dimensions {
2324 return Err(CoreError::Internal(format!(
2325 "usearch vector dimension mismatch: index expects {}, got {}",
2326 index.dimensions(),
2327 expected_dimensions
2328 )));
2329 }
2330 Ok(())
2331}
2332
2333fn save_usearch_index(index: &usearch::Index, path: &Path) -> Result<()> {
2334 if index.size() == 0 {
2335 std::fs::File::create(path)?;
2336 return Ok(());
2337 }
2338
2339 let path = path
2340 .to_str()
2341 .ok_or_else(|| CoreError::Internal("invalid usearch path encoding".to_string()))?;
2342 index
2343 .save(path)
2344 .map_err(|err| CoreError::Internal(format!("usearch save failed: {err}")))?;
2345 Ok(())
2346}
2347
2348fn tantivy_schema() -> tantivy::schema::Schema {
2349 let mut builder = tantivy::schema::Schema::builder();
2350 builder.add_u64_field("chunk_id", INDEXED | STORED | FAST);
2351 builder.add_u64_field("doc_id", INDEXED | STORED | FAST);
2352 builder.add_text_field("filepath", TEXT | STORED);
2353 builder.add_text_field("title", TEXT | STORED);
2354 builder.add_text_field("heading", TEXT | STORED);
2355 builder.add_text_field("body", TEXT);
2356 builder.build()
2357}
2358
2359fn tantivy_fields_from_schema(schema: &tantivy::schema::Schema) -> Result<TantivyFields> {
2360 Ok(TantivyFields {
2361 chunk_id: schema.get_field("chunk_id").map_err(|_| {
2362 CoreError::Internal("tantivy schema missing field: chunk_id".to_string())
2363 })?,
2364 doc_id: schema
2365 .get_field("doc_id")
2366 .map_err(|_| CoreError::Internal("tantivy schema missing field: doc_id".to_string()))?,
2367 filepath: schema.get_field("filepath").map_err(|_| {
2368 CoreError::Internal("tantivy schema missing field: filepath".to_string())
2369 })?,
2370 title: schema
2371 .get_field("title")
2372 .map_err(|_| CoreError::Internal("tantivy schema missing field: title".to_string()))?,
2373 heading: schema.get_field("heading").map_err(|_| {
2374 CoreError::Internal("tantivy schema missing field: heading".to_string())
2375 })?,
2376 body: schema
2377 .get_field("body")
2378 .map_err(|_| CoreError::Internal("tantivy schema missing field: body".to_string()))?,
2379 })
2380}
2381
2382fn resolve_tantivy_field(fields: TantivyFields, name: &str) -> Result<Field> {
2383 match name {
2384 "chunk_id" => Ok(fields.chunk_id),
2385 "doc_id" => Ok(fields.doc_id),
2386 "filepath" => Ok(fields.filepath),
2387 "title" => Ok(fields.title),
2388 "heading" => Ok(fields.heading),
2389 "body" => Ok(fields.body),
2390 other => Err(CoreError::Internal(format!(
2391 "unsupported tantivy field: {other}"
2392 ))),
2393 }
2394}
2395
2396fn serialize_extensions(extensions: Option<&[String]>) -> Result<Option<String>> {
2397 match extensions {
2398 None => Ok(None),
2399 Some(values) => serde_json::to_string(values).map(Some).map_err(Into::into),
2400 }
2401}
2402
2403fn deserialize_extensions(raw: Option<String>) -> Result<Option<Vec<String>>> {
2404 match raw {
2405 None => Ok(None),
2406 Some(json) => serde_json::from_str::<Vec<String>>(&json)
2407 .map(Some)
2408 .map_err(Into::into),
2409 }
2410}
2411
2412fn decode_space_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SpaceRow> {
2413 Ok(SpaceRow {
2414 id: row.get(0)?,
2415 name: row.get(1)?,
2416 description: row.get(2)?,
2417 created: row.get(3)?,
2418 })
2419}
2420
2421fn decode_collection_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<CollectionRow> {
2422 let raw_extensions: Option<String> = row.get(5)?;
2423 let extensions = deserialize_extensions(raw_extensions).map_err(|err| {
2424 Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(err))
2425 })?;
2426 Ok(CollectionRow {
2427 id: row.get(0)?,
2428 space_id: row.get(1)?,
2429 name: row.get(2)?,
2430 path: PathBuf::from(row.get::<_, String>(3)?),
2431 description: row.get(4)?,
2432 extensions,
2433 created: row.get(6)?,
2434 updated: row.get(7)?,
2435 })
2436}
2437
2438fn decode_document_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<DocumentRow> {
2439 let raw_title_source: String = row.get(4)?;
2440 let title_source = DocumentTitleSource::from_sql(&raw_title_source).map_err(|err| {
2441 Error::FromSqlConversionFailure(4, rusqlite::types::Type::Text, Box::new(err))
2442 })?;
2443 let active_value: i64 = row.get(7)?;
2444 let fts_dirty_value: i64 = row.get(9)?;
2445 Ok(DocumentRow {
2446 id: row.get(0)?,
2447 collection_id: row.get(1)?,
2448 path: row.get(2)?,
2449 title: row.get(3)?,
2450 title_source,
2451 hash: row.get(5)?,
2452 modified: row.get(6)?,
2453 active: active_value != 0,
2454 deactivated_at: row.get(8)?,
2455 fts_dirty: fts_dirty_value != 0,
2456 })
2457}
2458
2459fn decode_chunk_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ChunkRow> {
2460 let offset_value: i64 = row.get(3)?;
2461 let length_value: i64 = row.get(4)?;
2462 let kind_raw: String = row.get(6)?;
2463 let kind = FinalChunkKind::try_from(kind_raw.as_str()).map_err(|err| {
2464 Error::FromSqlConversionFailure(6, rusqlite::types::Type::Text, Box::new(err))
2465 })?;
2466 Ok(ChunkRow {
2467 id: row.get(0)?,
2468 doc_id: row.get(1)?,
2469 seq: row.get(2)?,
2470 offset: offset_value as usize,
2471 length: length_value as usize,
2472 heading: row.get(5)?,
2473 kind,
2474 })
2475}
2476
2477#[cfg(test)]
2478mod tests;