Skip to main content

kbolt_core/storage/
mod.rs

1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3use std::sync::{Arc, Mutex, RwLock};
4
5use crate::error::{CoreError, Result};
6use crate::ingest::chunk::FinalChunkKind;
7use kbolt_types::{DiskUsage, KboltError};
8use rusqlite::types::Value as SqlValue;
9use rusqlite::{params, params_from_iter, Connection, Error, ErrorCode};
10use tantivy::collector::TopDocs;
11use tantivy::query::{BooleanQuery, BoostQuery, Occur, Query, TermQuery};
12use tantivy::schema::{Field, IndexRecordOption, Value, FAST, INDEXED, STORED, TEXT};
13use tantivy::tokenizer::TokenStream;
14use tantivy::{Index, IndexWriter, TantivyDocument, Term};
15use usearch::{IndexOptions, MetricKind, ScalarKind};
16
// --- On-disk layout ---------------------------------------------------------

/// File name of the SQLite metadata database, created inside the cache directory.
const DB_FILE: &str = "meta.sqlite";
/// Name of the reserved space: `Storage::new` always inserts it and
/// `rename_space` refuses to rename it.
const DEFAULT_SPACE_NAME: &str = "default";
/// Directory name for per-space index artifacts (consumed by path helpers not shown here).
const SPACES_DIR: &str = "spaces";
/// Directory name holding a space's Tantivy full-text index.
const TANTIVY_DIR_NAME: &str = "tantivy";
/// File name of a space's USearch vector index.
const USEARCH_FILENAME: &str = "vectors.usearch";
22
23pub struct Storage {
24    db: Mutex<Connection>,
25    cache_dir: PathBuf,
26    spaces: RwLock<HashMap<String, Arc<SpaceIndexes>>>,
27}
28
/// One row of the `spaces` table.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SpaceRow {
    /// Primary key.
    pub id: i64,
    /// Unique space name.
    pub name: String,
    /// Optional free-form description.
    pub description: Option<String>,
    /// Creation timestamp as stored; the schema default writes `%Y-%m-%dT%H:%M:%SZ`.
    pub created: String,
}
36
/// One row of the `collections` table.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CollectionRow {
    /// Primary key.
    pub id: i64,
    /// Owning space (`spaces.id`).
    pub space_id: i64,
    /// Collection name, unique within its space.
    pub name: String,
    /// Root path of the collection as stored.
    pub path: PathBuf,
    /// Optional free-form description.
    pub description: Option<String>,
    /// Optional extension filter; its stored form comes from `serialize_extensions`.
    pub extensions: Option<Vec<String>>,
    /// Creation timestamp as stored.
    pub created: String,
    /// Last-update timestamp as stored.
    pub updated: String,
}
48
49#[derive(Debug, Clone, PartialEq, Eq)]
50pub struct DocumentRow {
51    pub id: i64,
52    pub collection_id: i64,
53    pub path: String,
54    pub title: String,
55    pub title_source: DocumentTitleSource,
56    pub hash: String,
57    pub modified: String,
58    pub active: bool,
59    pub deactivated_at: Option<String>,
60    pub fts_dirty: bool,
61}
62
/// Per-document summary row used when listing a collection's files.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct FileListRow {
    /// Document id.
    pub doc_id: i64,
    /// Document path.
    pub path: String,
    /// Document title.
    pub title: String,
    /// Document content hash.
    pub hash: String,
    /// Whether the document is active.
    pub active: bool,
    /// Number of chunks for the document (presumably from `chunks` — TODO confirm).
    pub chunk_count: usize,
    /// Number of those chunks with embeddings (presumably from `embeddings` — TODO confirm).
    pub embedded_chunk_count: usize,
}
73
74#[derive(Debug, Clone, PartialEq, Eq)]
75pub struct ChunkRow {
76    pub id: i64,
77    pub doc_id: i64,
78    pub seq: i32,
79    pub offset: usize,
80    pub length: usize,
81    pub heading: Option<String>,
82    pub kind: FinalChunkKind,
83}
84
85#[derive(Debug, Clone, PartialEq, Eq)]
86pub struct ChunkInsert {
87    pub seq: i32,
88    pub offset: usize,
89    pub length: usize,
90    pub heading: Option<String>,
91    pub kind: FinalChunkKind,
92}
93
/// A chunk plus the location data needed to read its text, presumably for embedding.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct EmbedRecord {
    /// Chunk id.
    pub chunk_id: i64,
    /// Path of the chunk's document.
    pub doc_path: String,
    /// Root path of the document's collection.
    pub collection_path: PathBuf,
    /// Name of the space the collection belongs to.
    pub space_name: String,
    /// Chunk start within the document.
    pub offset: usize,
    /// Chunk length.
    pub length: usize,
}
103
104#[derive(Debug, Clone, PartialEq, Eq)]
105pub struct FtsDirtyRecord {
106    pub doc_id: i64,
107    pub doc_path: String,
108    pub doc_title: String,
109    pub doc_title_source: DocumentTitleSource,
110    pub doc_hash: String,
111    pub collection_path: PathBuf,
112    pub space_name: String,
113    pub chunks: Vec<ChunkRow>,
114}
115
/// A document selected for removal, with the chunk ids whose index entries go with it.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ReapableDocument {
    /// Document id.
    pub doc_id: i64,
    /// Name of the space owning the document.
    pub space_name: String,
    /// Ids of the document's chunks.
    pub chunk_ids: Vec<i64>,
}
122
/// Field values for one chunk's document in the Tantivy full-text index.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TantivyEntry {
    /// Chunk id.
    pub chunk_id: i64,
    /// Owning document id.
    pub doc_id: i64,
    /// Document path.
    pub filepath: String,
    /// Title to index, if any (see `DocumentTitleSource::semantic_title`).
    pub semantic_title: Option<String>,
    /// Chunk heading, if any.
    pub heading: Option<String>,
    /// Chunk body text.
    pub body: String,
}
132
/// Origin of a document's stored title (`documents.title_source`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DocumentTitleSource {
    /// Title extracted from the document; stored as `"extracted"`.
    Extracted,
    /// File-name-based fallback title; stored as `"filename_fallback"` and
    /// never reported as a semantic title.
    FilenameFallback,
}
138
139impl DocumentTitleSource {
140    pub fn as_sql(self) -> &'static str {
141        match self {
142            Self::Extracted => "extracted",
143            Self::FilenameFallback => "filename_fallback",
144        }
145    }
146
147    fn from_sql(raw: &str) -> std::result::Result<Self, KboltError> {
148        match raw {
149            "extracted" => Ok(Self::Extracted),
150            "filename_fallback" => Ok(Self::FilenameFallback),
151            other => Err(KboltError::InvalidInput(format!(
152                "invalid stored document title source: {other}"
153            ))),
154        }
155    }
156
157    pub fn semantic_title(self, title: &str) -> Option<&str> {
158        matches!(self, Self::Extracted)
159            .then_some(title.trim())
160            .filter(|title| !title.is_empty())
161    }
162}
163
/// A lexical (BM25) search hit.
#[derive(Clone, Debug, PartialEq)]
pub struct BM25Hit {
    /// Matching chunk id.
    pub chunk_id: i64,
    /// Relevance score as returned by the full-text search.
    pub score: f32,
}
169
/// A vector-similarity search hit.
#[derive(Clone, Debug, PartialEq)]
pub struct DenseHit {
    /// Matching chunk id.
    pub chunk_id: i64,
    /// Distance reported by the vector index (metric set where the index is built).
    pub distance: f32,
}
175
176#[derive(Debug, Clone, PartialEq, Eq)]
177pub enum SpaceResolution {
178    Found(SpaceRow),
179    Ambiguous(Vec<String>),
180    NotFound,
181}
182
183struct SpaceIndexes {
184    _tantivy_dir: PathBuf,
185    usearch_path: PathBuf,
186    tantivy_index: Index,
187    tantivy_writer: Mutex<Option<IndexWriter>>,
188    usearch_index: RwLock<usearch::Index>,
189    fields: TantivyFields,
190}
191
192#[derive(Debug, Clone, Copy)]
193struct TantivyFields {
194    chunk_id: Field,
195    doc_id: Field,
196    filepath: Field,
197    title: Field,
198    heading: Field,
199    body: Field,
200}
201
202#[derive(Debug, Clone, Copy)]
203struct Bm25FieldSpec {
204    field: Field,
205    boost: f32,
206    index_record_option: IndexRecordOption,
207}
208
209impl Storage {
210    pub fn new(cache_dir: &Path) -> Result<Self> {
211        std::fs::create_dir_all(cache_dir)?;
212        let db_path = cache_dir.join(DB_FILE);
213        let conn = Connection::open(db_path)?;
214        conn.execute_batch(
215            r#"
216PRAGMA foreign_keys = ON;
217PRAGMA journal_mode = WAL;
218
219CREATE TABLE IF NOT EXISTS spaces (
220    id          INTEGER PRIMARY KEY,
221    name        TEXT NOT NULL UNIQUE,
222    description TEXT,
223    created     TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ','now'))
224);
225
226CREATE TABLE IF NOT EXISTS collections (
227    id          INTEGER PRIMARY KEY,
228    space_id    INTEGER NOT NULL REFERENCES spaces(id) ON DELETE CASCADE,
229    name        TEXT NOT NULL,
230    path        TEXT NOT NULL,
231    description TEXT,
232    extensions  TEXT,
233    created     TEXT NOT NULL,
234    updated     TEXT NOT NULL,
235    UNIQUE(space_id, name)
236);
237
238CREATE TABLE IF NOT EXISTS documents (
239    id              INTEGER PRIMARY KEY,
240    collection_id   INTEGER NOT NULL REFERENCES collections(id) ON DELETE CASCADE,
241    path            TEXT NOT NULL,
242    title           TEXT NOT NULL,
243    title_source    TEXT NOT NULL DEFAULT 'extracted',
244    hash            TEXT NOT NULL,
245    modified        TEXT NOT NULL,
246    active          INTEGER NOT NULL DEFAULT 1,
247    deactivated_at  TEXT,
248    fts_dirty       INTEGER NOT NULL DEFAULT 0,
249    UNIQUE(collection_id, path)
250);
251CREATE INDEX IF NOT EXISTS idx_documents_collection ON documents(collection_id, active);
252CREATE INDEX IF NOT EXISTS idx_documents_hash ON documents(hash);
253CREATE INDEX IF NOT EXISTS idx_documents_fts_dirty ON documents(fts_dirty) WHERE fts_dirty = 1;
254
255CREATE TABLE IF NOT EXISTS chunks (
256    id       INTEGER PRIMARY KEY,
257    doc_id   INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
258    seq      INTEGER NOT NULL,
259    offset   INTEGER NOT NULL,
260    length   INTEGER NOT NULL,
261    heading  TEXT,
262    kind     TEXT NOT NULL DEFAULT 'section',
263    UNIQUE(doc_id, seq)
264);
265CREATE INDEX IF NOT EXISTS idx_chunks_doc ON chunks(doc_id);
266
267CREATE TABLE IF NOT EXISTS embeddings (
268    chunk_id    INTEGER NOT NULL REFERENCES chunks(id) ON DELETE CASCADE,
269    model       TEXT NOT NULL,
270    embedded_at TEXT NOT NULL,
271    PRIMARY KEY (chunk_id, model)
272);
273"#,
274        )?;
275        ensure_documents_title_source_column(&conn)?;
276
277        conn.execute(
278            "INSERT OR IGNORE INTO spaces (name, description) VALUES (?1, NULL)",
279            params![DEFAULT_SPACE_NAME],
280        )?;
281
282        let mut stmt = conn.prepare("SELECT name FROM spaces ORDER BY name ASC")?;
283        let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
284        let space_names = rows.collect::<std::result::Result<Vec<_>, _>>()?;
285        drop(stmt);
286
287        let storage = Self {
288            db: Mutex::new(conn),
289            cache_dir: cache_dir.to_path_buf(),
290            spaces: RwLock::new(HashMap::new()),
291        };
292
293        for space_name in space_names {
294            storage.open_space(&space_name)?;
295        }
296
297        Ok(storage)
298    }
299
300    pub fn open_space(&self, name: &str) -> Result<()> {
301        let _space = self.get_space(name)?;
302
303        {
304            let spaces = self
305                .spaces
306                .read()
307                .map_err(|_| CoreError::poisoned("spaces"))?;
308            if spaces.contains_key(name) {
309                return Ok(());
310            }
311        }
312
313        let (tantivy_dir, usearch_path) = self.space_paths(name);
314        std::fs::create_dir_all(&tantivy_dir)?;
315        std::fs::OpenOptions::new()
316            .create(true)
317            .append(true)
318            .open(&usearch_path)?;
319        let tantivy_index = open_or_create_tantivy_index(&tantivy_dir)?;
320        let usearch_index = open_or_create_usearch_index(&usearch_path)?;
321        let fields = tantivy_fields_from_schema(&tantivy_index.schema())?;
322
323        let mut spaces = self
324            .spaces
325            .write()
326            .map_err(|_| CoreError::poisoned("spaces"))?;
327        spaces
328            .entry(name.to_string())
329            .or_insert(Arc::new(SpaceIndexes {
330                _tantivy_dir: tantivy_dir,
331                usearch_path,
332                tantivy_index,
333                tantivy_writer: Mutex::new(None),
334                usearch_index: RwLock::new(usearch_index),
335                fields,
336            }));
337
338        Ok(())
339    }
340
341    pub fn close_space(&self, name: &str) -> Result<()> {
342        let _space = self.get_space(name)?;
343        let mut spaces = self
344            .spaces
345            .write()
346            .map_err(|_| CoreError::poisoned("spaces"))?;
347        let _removed = spaces.remove(name);
348        Ok(())
349    }
350
351    pub fn create_space(&self, name: &str, description: Option<&str>) -> Result<i64> {
352        let space_id = {
353            let conn = self
354                .db
355                .lock()
356                .map_err(|_| CoreError::poisoned("database"))?;
357
358            let result = conn.execute(
359                "INSERT INTO spaces (name, description) VALUES (?1, ?2)",
360                params![name, description],
361            );
362
363            match result {
364                Ok(_) => conn.last_insert_rowid(),
365                Err(err) => {
366                    return match err {
367                        Error::SqliteFailure(sqlite_err, _)
368                            if sqlite_err.code == ErrorCode::ConstraintViolation =>
369                        {
370                            Err(KboltError::SpaceAlreadyExists {
371                                name: name.to_string(),
372                            }
373                            .into())
374                        }
375                        other => Err(other.into()),
376                    };
377                }
378            }
379        };
380
381        if let Err(open_err) = self.open_space(name) {
382            let rollback_result = self
383                .db
384                .lock()
385                .map_err(|_| CoreError::poisoned("database"))?
386                .execute("DELETE FROM spaces WHERE id = ?1", params![space_id]);
387
388            if let Err(rollback_err) = rollback_result {
389                return Err(CoreError::Internal(format!(
390                    "failed to provision indexes for space '{name}': {open_err}; rollback failed: {rollback_err}"
391                )));
392            }
393
394            return Err(open_err);
395        }
396
397        Ok(space_id)
398    }
399
400    pub fn get_space(&self, name: &str) -> Result<SpaceRow> {
401        let conn = self
402            .db
403            .lock()
404            .map_err(|_| CoreError::poisoned("database"))?;
405        let mut stmt =
406            conn.prepare("SELECT id, name, description, created FROM spaces WHERE name = ?1")?;
407
408        let row = stmt.query_row(params![name], decode_space_row);
409
410        match row {
411            Ok(space) => Ok(space),
412            Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
413                name: name.to_string(),
414            }
415            .into()),
416            Err(err) => Err(err.into()),
417        }
418    }
419
420    pub fn get_space_by_id(&self, id: i64) -> Result<SpaceRow> {
421        let conn = self
422            .db
423            .lock()
424            .map_err(|_| CoreError::poisoned("database"))?;
425        let mut stmt =
426            conn.prepare("SELECT id, name, description, created FROM spaces WHERE id = ?1")?;
427
428        let row = stmt.query_row(params![id], decode_space_row);
429
430        match row {
431            Ok(space) => Ok(space),
432            Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
433                name: format!("id={id}"),
434            }
435            .into()),
436            Err(err) => Err(err.into()),
437        }
438    }
439
440    pub fn list_spaces(&self) -> Result<Vec<SpaceRow>> {
441        let conn = self
442            .db
443            .lock()
444            .map_err(|_| CoreError::poisoned("database"))?;
445        let mut stmt = conn.prepare(
446            "SELECT id, name, description, created
447             FROM spaces
448             ORDER BY name ASC",
449        )?;
450
451        let rows = stmt.query_map([], decode_space_row)?;
452
453        let spaces = rows.collect::<std::result::Result<Vec<_>, _>>()?;
454        Ok(spaces)
455    }
456
457    pub fn find_space_for_collection(&self, collection: &str) -> Result<SpaceResolution> {
458        let conn = self
459            .db
460            .lock()
461            .map_err(|_| CoreError::poisoned("database"))?;
462        let mut stmt = conn.prepare(
463            "SELECT s.id, s.name, s.description, s.created
464             FROM spaces s
465             JOIN collections c ON c.space_id = s.id
466             WHERE c.name = ?1
467             ORDER BY s.name ASC",
468        )?;
469        let rows = stmt.query_map(params![collection], decode_space_row)?;
470        let matches = rows.collect::<std::result::Result<Vec<_>, _>>()?;
471
472        if matches.is_empty() {
473            return Ok(SpaceResolution::NotFound);
474        }
475
476        if matches.len() == 1 {
477            return Ok(SpaceResolution::Found(matches[0].clone()));
478        }
479
480        let spaces = matches.into_iter().map(|space| space.name).collect();
481        Ok(SpaceResolution::Ambiguous(spaces))
482    }
483
484    pub fn delete_space(&self, name: &str) -> Result<()> {
485        if name == DEFAULT_SPACE_NAME {
486            let conn = self
487                .db
488                .lock()
489                .map_err(|_| CoreError::poisoned("database"))?;
490            conn.execute(
491                "DELETE FROM collections
492                 WHERE space_id = (SELECT id FROM spaces WHERE name = ?1)",
493                params![name],
494            )?;
495            drop(conn);
496
497            self.unload_space(name)?;
498            self.remove_space_artifacts(name)?;
499            self.open_space(name)?;
500            return Ok(());
501        }
502
503        let conn = self
504            .db
505            .lock()
506            .map_err(|_| CoreError::poisoned("database"))?;
507        let deleted = conn.execute("DELETE FROM spaces WHERE name = ?1", params![name])?;
508        drop(conn);
509
510        if deleted == 0 {
511            return Err(KboltError::SpaceNotFound {
512                name: name.to_string(),
513            }
514            .into());
515        }
516
517        self.unload_space(name)?;
518        self.remove_space_artifacts(name)?;
519
520        Ok(())
521    }
522
523    pub fn rename_space(&self, old: &str, new: &str) -> Result<()> {
524        if old == DEFAULT_SPACE_NAME {
525            return Err(
526                KboltError::Config("cannot rename reserved space: default".to_string()).into(),
527            );
528        }
529
530        let conn = self
531            .db
532            .lock()
533            .map_err(|_| CoreError::poisoned("database"))?;
534
535        let result = conn.execute(
536            "UPDATE spaces SET name = ?1 WHERE name = ?2",
537            params![new, old],
538        );
539        drop(conn);
540
541        match result {
542            Ok(0) => Err(KboltError::SpaceNotFound {
543                name: old.to_string(),
544            }
545            .into()),
546            Ok(_) => {
547                if let Err(rename_err) = self.rename_space_artifacts(old, new) {
548                    let rollback = self
549                        .db
550                        .lock()
551                        .map_err(|_| CoreError::poisoned("database"))?
552                        .execute(
553                            "UPDATE spaces SET name = ?1 WHERE name = ?2",
554                            params![old, new],
555                        );
556
557                    if let Err(rollback_err) = rollback {
558                        return Err(CoreError::Internal(format!(
559                            "failed to rename space artifacts from '{old}' to '{new}': {rename_err}; rollback failed: {rollback_err}"
560                        )));
561                    }
562
563                    return Err(rename_err);
564                }
565
566                self.unload_space(old)?;
567                if let Err(open_err) = self.open_space(new) {
568                    let _ = self.rename_space_artifacts(new, old);
569                    let _ = self
570                        .db
571                        .lock()
572                        .map_err(|_| CoreError::poisoned("database"))?
573                        .execute(
574                            "UPDATE spaces SET name = ?1 WHERE name = ?2",
575                            params![old, new],
576                        );
577                    let _ = self.open_space(old);
578                    return Err(open_err);
579                }
580
581                Ok(())
582            }
583            Err(Error::SqliteFailure(sqlite_err, _))
584                if sqlite_err.code == ErrorCode::ConstraintViolation =>
585            {
586                Err(KboltError::SpaceAlreadyExists {
587                    name: new.to_string(),
588                }
589                .into())
590            }
591            Err(err) => Err(err.into()),
592        }
593    }
594
595    pub fn update_space_description(&self, name: &str, description: &str) -> Result<()> {
596        let conn = self
597            .db
598            .lock()
599            .map_err(|_| CoreError::poisoned("database"))?;
600
601        let updated = conn.execute(
602            "UPDATE spaces SET description = ?1 WHERE name = ?2",
603            params![description, name],
604        )?;
605
606        if updated == 0 {
607            return Err(KboltError::SpaceNotFound {
608                name: name.to_string(),
609            }
610            .into());
611        }
612
613        Ok(())
614    }
615
616    pub fn create_collection(
617        &self,
618        space_id: i64,
619        name: &str,
620        path: &Path,
621        description: Option<&str>,
622        extensions: Option<&[String]>,
623    ) -> Result<i64> {
624        let conn = self
625            .db
626            .lock()
627            .map_err(|_| CoreError::poisoned("database"))?;
628
629        let space_name = lookup_space_name(&conn, space_id)?;
630        let extensions_json = serialize_extensions(extensions)?;
631        let result = conn.execute(
632            "INSERT INTO collections (space_id, name, path, description, extensions, created, updated)
633             VALUES (?1, ?2, ?3, ?4, ?5, strftime('%Y-%m-%dT%H:%M:%SZ','now'), strftime('%Y-%m-%dT%H:%M:%SZ','now'))",
634            params![space_id, name, path.to_string_lossy(), description, extensions_json],
635        );
636
637        match result {
638            Ok(_) => Ok(conn.last_insert_rowid()),
639            Err(Error::SqliteFailure(sqlite_err, _))
640                if sqlite_err.code == ErrorCode::ConstraintViolation =>
641            {
642                Err(KboltError::CollectionAlreadyExists {
643                    name: name.to_string(),
644                    space: space_name,
645                }
646                .into())
647            }
648            Err(err) => Err(err.into()),
649        }
650    }
651
652    pub fn get_collection(&self, space_id: i64, name: &str) -> Result<CollectionRow> {
653        let conn = self
654            .db
655            .lock()
656            .map_err(|_| CoreError::poisoned("database"))?;
657        let mut stmt = conn.prepare(
658            "SELECT id, space_id, name, path, description, extensions, created, updated
659             FROM collections
660             WHERE space_id = ?1 AND name = ?2",
661        )?;
662
663        let result = stmt.query_row(params![space_id, name], decode_collection_row);
664        match result {
665            Ok(row) => Ok(row),
666            Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
667                name: name.to_string(),
668            }
669            .into()),
670            Err(err) => Err(err.into()),
671        }
672    }
673
674    pub fn get_collection_by_id(&self, id: i64) -> Result<CollectionRow> {
675        let conn = self
676            .db
677            .lock()
678            .map_err(|_| CoreError::poisoned("database"))?;
679        let mut stmt = conn.prepare(
680            "SELECT id, space_id, name, path, description, extensions, created, updated
681             FROM collections
682             WHERE id = ?1",
683        )?;
684
685        let result = stmt.query_row(params![id], decode_collection_row);
686        match result {
687            Ok(row) => Ok(row),
688            Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
689                name: format!("id={id}"),
690            }
691            .into()),
692            Err(err) => Err(err.into()),
693        }
694    }
695
696    pub fn list_collections(&self, space_id: Option<i64>) -> Result<Vec<CollectionRow>> {
697        let conn = self
698            .db
699            .lock()
700            .map_err(|_| CoreError::poisoned("database"))?;
701
702        let (sql, params): (&str, Vec<i64>) = match space_id {
703            Some(id) => (
704                "SELECT id, space_id, name, path, description, extensions, created, updated
705                 FROM collections
706                 WHERE space_id = ?1
707                 ORDER BY name ASC",
708                vec![id],
709            ),
710            None => (
711                "SELECT id, space_id, name, path, description, extensions, created, updated
712                 FROM collections
713                 ORDER BY space_id ASC, name ASC",
714                Vec::new(),
715            ),
716        };
717
718        let mut stmt = conn.prepare(sql)?;
719        let rows = if params.is_empty() {
720            stmt.query_map([], decode_collection_row)?
721        } else {
722            stmt.query_map(params![params[0]], decode_collection_row)?
723        };
724        let collections = rows.collect::<std::result::Result<Vec<_>, _>>()?;
725        Ok(collections)
726    }
727
728    pub fn delete_collection(&self, space_id: i64, name: &str) -> Result<()> {
729        let conn = self
730            .db
731            .lock()
732            .map_err(|_| CoreError::poisoned("database"))?;
733        let _space_name = lookup_space_name(&conn, space_id)?;
734
735        let deleted = conn.execute(
736            "DELETE FROM collections WHERE space_id = ?1 AND name = ?2",
737            params![space_id, name],
738        )?;
739
740        if deleted == 0 {
741            return Err(KboltError::CollectionNotFound {
742                name: name.to_string(),
743            }
744            .into());
745        }
746
747        Ok(())
748    }
749
750    pub fn rename_collection(&self, space_id: i64, old: &str, new: &str) -> Result<()> {
751        let conn = self
752            .db
753            .lock()
754            .map_err(|_| CoreError::poisoned("database"))?;
755        let space_name = lookup_space_name(&conn, space_id)?;
756        let result = conn.execute(
757            "UPDATE collections
758             SET name = ?1, updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
759             WHERE space_id = ?2 AND name = ?3",
760            params![new, space_id, old],
761        );
762
763        match result {
764            Ok(0) => Err(KboltError::CollectionNotFound {
765                name: old.to_string(),
766            }
767            .into()),
768            Ok(_) => Ok(()),
769            Err(Error::SqliteFailure(sqlite_err, _))
770                if sqlite_err.code == ErrorCode::ConstraintViolation =>
771            {
772                Err(KboltError::CollectionAlreadyExists {
773                    name: new.to_string(),
774                    space: space_name,
775                }
776                .into())
777            }
778            Err(err) => Err(err.into()),
779        }
780    }
781
782    pub fn update_collection_description(
783        &self,
784        space_id: i64,
785        name: &str,
786        desc: &str,
787    ) -> Result<()> {
788        let conn = self
789            .db
790            .lock()
791            .map_err(|_| CoreError::poisoned("database"))?;
792        let _space_name = lookup_space_name(&conn, space_id)?;
793
794        let updated = conn.execute(
795            "UPDATE collections
796             SET description = ?1, updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
797             WHERE space_id = ?2 AND name = ?3",
798            params![desc, space_id, name],
799        )?;
800
801        if updated == 0 {
802            return Err(KboltError::CollectionNotFound {
803                name: name.to_string(),
804            }
805            .into());
806        }
807
808        Ok(())
809    }
810
811    pub fn update_collection_timestamp(&self, collection_id: i64) -> Result<()> {
812        let conn = self
813            .db
814            .lock()
815            .map_err(|_| CoreError::poisoned("database"))?;
816
817        let updated = conn.execute(
818            "UPDATE collections
819             SET updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
820             WHERE id = ?1",
821            params![collection_id],
822        )?;
823
824        if updated == 0 {
825            return Err(KboltError::CollectionNotFound {
826                name: format!("id={collection_id}"),
827            }
828            .into());
829        }
830
831        Ok(())
832    }
833
834    pub fn upsert_document(
835        &self,
836        collection_id: i64,
837        path: &str,
838        title: &str,
839        title_source: DocumentTitleSource,
840        hash: &str,
841        modified: &str,
842    ) -> Result<i64> {
843        let conn = self
844            .db
845            .lock()
846            .map_err(|_| CoreError::poisoned("database"))?;
847        let _collection_name = lookup_collection_name(&conn, collection_id)?;
848
849        conn.execute(
850            "INSERT INTO documents (collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty)
851             VALUES (?1, ?2, ?3, ?4, ?5, ?6, 1, NULL, 1)
852             ON CONFLICT(collection_id, path) DO UPDATE SET
853                 title = excluded.title,
854                 title_source = excluded.title_source,
855                 hash = excluded.hash,
856                 modified = excluded.modified,
857                 active = 1,
858                 deactivated_at = NULL,
859                 fts_dirty = 1",
860            params![
861                collection_id,
862                path,
863                title,
864                title_source.as_sql(),
865                hash,
866                modified
867            ],
868        )?;
869
870        let id: i64 = conn.query_row(
871            "SELECT id FROM documents WHERE collection_id = ?1 AND path = ?2",
872            params![collection_id, path],
873            |row| row.get(0),
874        )?;
875        Ok(id)
876    }
877
878    pub fn get_document_by_path(
879        &self,
880        collection_id: i64,
881        path: &str,
882    ) -> Result<Option<DocumentRow>> {
883        let conn = self
884            .db
885            .lock()
886            .map_err(|_| CoreError::poisoned("database"))?;
887        let _collection_name = lookup_collection_name(&conn, collection_id)?;
888        let mut stmt = conn.prepare(
889            "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
890             FROM documents
891             WHERE collection_id = ?1 AND path = ?2",
892        )?;
893
894        let result = stmt.query_row(params![collection_id, path], decode_document_row);
895        match result {
896            Ok(row) => Ok(Some(row)),
897            Err(Error::QueryReturnedNoRows) => Ok(None),
898            Err(err) => Err(err.into()),
899        }
900    }
901
902    pub fn get_document_by_id(&self, id: i64) -> Result<DocumentRow> {
903        let conn = self
904            .db
905            .lock()
906            .map_err(|_| CoreError::poisoned("database"))?;
907        let mut stmt = conn.prepare(
908            "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
909             FROM documents
910             WHERE id = ?1",
911        )?;
912
913        let result = stmt.query_row(params![id], decode_document_row);
914        match result {
915            Ok(row) => Ok(row),
916            Err(Error::QueryReturnedNoRows) => Err(KboltError::DocumentNotFound {
917                path: format!("id={id}"),
918            }
919            .into()),
920            Err(err) => Err(err.into()),
921        }
922    }
923
924    pub fn get_documents_by_ids(&self, ids: &[i64]) -> Result<Vec<DocumentRow>> {
925        if ids.is_empty() {
926            return Ok(Vec::new());
927        }
928
929        let conn = self
930            .db
931            .lock()
932            .map_err(|_| CoreError::poisoned("database"))?;
933
934        let placeholders = vec!["?"; ids.len()].join(", ");
935        let sql = format!(
936            "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
937             FROM documents
938             WHERE id IN ({placeholders})"
939        );
940        let mut stmt = conn.prepare(&sql)?;
941        let rows = stmt.query_map(params_from_iter(ids.iter()), decode_document_row)?;
942        let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
943        Ok(docs)
944    }
945
946    pub fn refresh_document_activity(&self, doc_id: i64, modified: &str) -> Result<()> {
947        let conn = self
948            .db
949            .lock()
950            .map_err(|_| CoreError::poisoned("database"))?;
951
952        let updated = conn.execute(
953            "UPDATE documents
954             SET modified = ?1,
955                 active = 1,
956                 deactivated_at = NULL
957             WHERE id = ?2",
958            params![modified, doc_id],
959        )?;
960
961        if updated == 0 {
962            return Err(KboltError::DocumentNotFound {
963                path: format!("id={doc_id}"),
964            }
965            .into());
966        }
967
968        Ok(())
969    }
970
971    pub fn list_documents(
972        &self,
973        collection_id: i64,
974        active_only: bool,
975    ) -> Result<Vec<DocumentRow>> {
976        let conn = self
977            .db
978            .lock()
979            .map_err(|_| CoreError::poisoned("database"))?;
980        let _collection_name = lookup_collection_name(&conn, collection_id)?;
981        let active_only = i64::from(active_only);
982        let mut stmt = conn.prepare(
983            "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
984             FROM documents
985             WHERE collection_id = ?1
986               AND (?2 = 0 OR active = 1)
987             ORDER BY path ASC",
988        )?;
989        let rows = stmt.query_map(params![collection_id, active_only], decode_document_row)?;
990        let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
991        Ok(docs)
992    }
993
994    pub fn list_collection_file_rows(
995        &self,
996        collection_id: i64,
997        active_only: bool,
998    ) -> Result<Vec<FileListRow>> {
999        let conn = self
1000            .db
1001            .lock()
1002            .map_err(|_| CoreError::poisoned("database"))?;
1003        let _collection_name = lookup_collection_name(&conn, collection_id)?;
1004        let active_only = i64::from(active_only);
1005        let mut stmt = conn.prepare(
1006            "SELECT d.id, d.path, d.title, d.hash, d.active,
1007                    COUNT(DISTINCT c.id) AS chunk_count,
1008                    COUNT(DISTINCT e.chunk_id) AS embedded_chunk_count
1009             FROM documents d
1010             LEFT JOIN chunks c ON c.doc_id = d.id
1011             LEFT JOIN embeddings e ON e.chunk_id = c.id
1012             WHERE d.collection_id = ?1
1013               AND (?2 = 0 OR d.active = 1)
1014             GROUP BY d.id, d.path, d.title, d.hash, d.active
1015             ORDER BY d.path ASC",
1016        )?;
1017        let rows = stmt.query_map(params![collection_id, active_only], |row| {
1018            let chunk_count: i64 = row.get(5)?;
1019            let embedded_chunk_count: i64 = row.get(6)?;
1020            Ok(FileListRow {
1021                doc_id: row.get(0)?,
1022                path: row.get(1)?,
1023                title: row.get(2)?,
1024                hash: row.get(3)?,
1025                active: row.get::<_, i64>(4)? != 0,
1026                chunk_count: chunk_count as usize,
1027                embedded_chunk_count: embedded_chunk_count as usize,
1028            })
1029        })?;
1030        let files = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1031        Ok(files)
1032    }
1033
1034    pub fn get_document_by_hash_prefix(&self, prefix: &str) -> Result<Vec<DocumentRow>> {
1035        let conn = self
1036            .db
1037            .lock()
1038            .map_err(|_| CoreError::poisoned("database"))?;
1039
1040        let pattern = format!("{prefix}%");
1041        let mut stmt = conn.prepare(
1042            "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
1043             FROM documents
1044             WHERE hash LIKE ?1
1045             ORDER BY id ASC",
1046        )?;
1047        let rows = stmt.query_map(params![pattern], decode_document_row)?;
1048        let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1049        Ok(docs)
1050    }
1051
1052    pub fn deactivate_document(&self, doc_id: i64) -> Result<()> {
1053        let conn = self
1054            .db
1055            .lock()
1056            .map_err(|_| CoreError::poisoned("database"))?;
1057
1058        let updated = conn.execute(
1059            "UPDATE documents
1060             SET active = 0,
1061                 deactivated_at = CASE
1062                    WHEN active = 1 THEN strftime('%Y-%m-%dT%H:%M:%SZ','now')
1063                    ELSE deactivated_at
1064                 END
1065             WHERE id = ?1",
1066            params![doc_id],
1067        )?;
1068
1069        if updated == 0 {
1070            return Err(KboltError::DocumentNotFound {
1071                path: format!("id={doc_id}"),
1072            }
1073            .into());
1074        }
1075
1076        Ok(())
1077    }
1078
1079    pub fn reactivate_document(&self, doc_id: i64) -> Result<()> {
1080        let conn = self
1081            .db
1082            .lock()
1083            .map_err(|_| CoreError::poisoned("database"))?;
1084
1085        let updated = conn.execute(
1086            "UPDATE documents
1087             SET active = 1, deactivated_at = NULL
1088             WHERE id = ?1",
1089            params![doc_id],
1090        )?;
1091
1092        if updated == 0 {
1093            return Err(KboltError::DocumentNotFound {
1094                path: format!("id={doc_id}"),
1095            }
1096            .into());
1097        }
1098
1099        Ok(())
1100    }
1101
1102    pub fn reap_documents(&self, older_than_days: u32) -> Result<Vec<i64>> {
1103        let reaped = self.list_reapable_documents(older_than_days)?;
1104        let doc_ids = reaped.iter().map(|item| item.doc_id).collect::<Vec<_>>();
1105        self.delete_documents(&doc_ids)?;
1106        Ok(doc_ids)
1107    }
1108
1109    pub fn list_reapable_documents(&self, older_than_days: u32) -> Result<Vec<ReapableDocument>> {
1110        self.list_reapable_documents_filtered(older_than_days, "", Vec::new())
1111    }
1112
1113    pub fn list_reapable_documents_in_space(
1114        &self,
1115        older_than_days: u32,
1116        space_id: i64,
1117    ) -> Result<Vec<ReapableDocument>> {
1118        self.list_reapable_documents_filtered(
1119            older_than_days,
1120            " AND c.space_id = ?",
1121            vec![SqlValue::Integer(space_id)],
1122        )
1123    }
1124
1125    pub fn list_reapable_documents_in_collections(
1126        &self,
1127        older_than_days: u32,
1128        collection_ids: &[i64],
1129    ) -> Result<Vec<ReapableDocument>> {
1130        if collection_ids.is_empty() {
1131            return Ok(Vec::new());
1132        }
1133
1134        let placeholders = vec!["?"; collection_ids.len()].join(", ");
1135        let clause = format!(" AND d.collection_id IN ({placeholders})");
1136        let params = collection_ids
1137            .iter()
1138            .map(|id| SqlValue::Integer(*id))
1139            .collect::<Vec<_>>();
1140        self.list_reapable_documents_filtered(older_than_days, &clause, params)
1141    }
1142
1143    fn list_reapable_documents_filtered(
1144        &self,
1145        older_than_days: u32,
1146        scope_clause: &str,
1147        scope_params: Vec<SqlValue>,
1148    ) -> Result<Vec<ReapableDocument>> {
1149        let conn = self
1150            .db
1151            .lock()
1152            .map_err(|_| CoreError::poisoned("database"))?;
1153
1154        let modifier = format!("-{} days", older_than_days);
1155        let sql = format!(
1156            "SELECT d.id, s.name
1157             FROM documents d
1158             JOIN collections c ON c.id = d.collection_id
1159             JOIN spaces s ON s.id = c.space_id
1160             WHERE d.active = 0
1161               AND d.deactivated_at IS NOT NULL
1162               AND d.deactivated_at <= strftime('%Y-%m-%dT%H:%M:%SZ', 'now', ?){scope_clause}
1163             ORDER BY d.id ASC"
1164        );
1165        let mut stmt = conn.prepare(&sql)?;
1166        let mut params = Vec::with_capacity(scope_params.len() + 1);
1167        params.push(SqlValue::Text(modifier));
1168        params.extend(scope_params);
1169        let rows = stmt.query_map(params_from_iter(params.iter()), |row| {
1170            Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1171        })?;
1172        let headers = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1173        drop(stmt);
1174
1175        let mut documents = Vec::with_capacity(headers.len());
1176        for (doc_id, space_name) in headers {
1177            documents.push(ReapableDocument {
1178                doc_id,
1179                space_name,
1180                chunk_ids: load_chunk_ids_for_doc(&conn, doc_id)?,
1181            });
1182        }
1183
1184        Ok(documents)
1185    }
1186
1187    pub fn delete_documents(&self, doc_ids: &[i64]) -> Result<()> {
1188        if doc_ids.is_empty() {
1189            return Ok(());
1190        }
1191
1192        let conn = self
1193            .db
1194            .lock()
1195            .map_err(|_| CoreError::poisoned("database"))?;
1196
1197        let placeholders = vec!["?"; doc_ids.len()].join(", ");
1198        let sql = format!("DELETE FROM documents WHERE id IN ({placeholders})");
1199        conn.execute(&sql, params_from_iter(doc_ids.iter()))?;
1200        Ok(())
1201    }
1202
1203    pub fn insert_chunks(&self, doc_id: i64, chunks: &[ChunkInsert]) -> Result<Vec<i64>> {
1204        if chunks.is_empty() {
1205            return Ok(Vec::new());
1206        }
1207
1208        let conn = self
1209            .db
1210            .lock()
1211            .map_err(|_| CoreError::poisoned("database"))?;
1212        let _doc = lookup_document_id(&conn, doc_id)?;
1213
1214        let tx = conn.unchecked_transaction()?;
1215        let mut stmt = tx.prepare(
1216            "INSERT INTO chunks (doc_id, seq, offset, length, heading, kind)
1217             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1218        )?;
1219
1220        let mut ids = Vec::with_capacity(chunks.len());
1221        for chunk in chunks {
1222            stmt.execute(params![
1223                doc_id,
1224                chunk.seq,
1225                chunk.offset as i64,
1226                chunk.length as i64,
1227                chunk.heading,
1228                chunk.kind.as_storage_kind(),
1229            ])?;
1230            ids.push(tx.last_insert_rowid());
1231        }
1232        drop(stmt);
1233        tx.commit()?;
1234        Ok(ids)
1235    }
1236
1237    pub fn delete_chunks_for_document(&self, doc_id: i64) -> Result<Vec<i64>> {
1238        let conn = self
1239            .db
1240            .lock()
1241            .map_err(|_| CoreError::poisoned("database"))?;
1242        let _doc = lookup_document_id(&conn, doc_id)?;
1243
1244        let mut stmt = conn.prepare("SELECT id FROM chunks WHERE doc_id = ?1 ORDER BY seq ASC")?;
1245        let rows = stmt.query_map(params![doc_id], |row| row.get::<_, i64>(0))?;
1246        let chunk_ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1247
1248        conn.execute("DELETE FROM chunks WHERE doc_id = ?1", params![doc_id])?;
1249        Ok(chunk_ids)
1250    }
1251
1252    pub fn get_chunks_for_document(&self, doc_id: i64) -> Result<Vec<ChunkRow>> {
1253        let conn = self
1254            .db
1255            .lock()
1256            .map_err(|_| CoreError::poisoned("database"))?;
1257        let _doc = lookup_document_id(&conn, doc_id)?;
1258
1259        let mut stmt = conn.prepare(
1260            "SELECT id, doc_id, seq, offset, length, heading, kind
1261             FROM chunks
1262             WHERE doc_id = ?1
1263             ORDER BY seq ASC",
1264        )?;
1265        let rows = stmt.query_map(params![doc_id], decode_chunk_row)?;
1266        let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1267        Ok(chunks)
1268    }
1269
1270    pub fn get_chunks(&self, chunk_ids: &[i64]) -> Result<Vec<ChunkRow>> {
1271        if chunk_ids.is_empty() {
1272            return Ok(Vec::new());
1273        }
1274
1275        let conn = self
1276            .db
1277            .lock()
1278            .map_err(|_| CoreError::poisoned("database"))?;
1279
1280        let placeholders = vec!["?"; chunk_ids.len()].join(", ");
1281        let sql = format!(
1282            "SELECT id, doc_id, seq, offset, length, heading, kind
1283             FROM chunks
1284             WHERE id IN ({placeholders})
1285             ORDER BY id ASC"
1286        );
1287        let mut stmt = conn.prepare(&sql)?;
1288        let rows = stmt.query_map(params_from_iter(chunk_ids.iter()), decode_chunk_row)?;
1289        let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1290        Ok(chunks)
1291    }
1292
1293    pub fn insert_embeddings(&self, entries: &[(i64, &str)]) -> Result<()> {
1294        if entries.is_empty() {
1295            return Ok(());
1296        }
1297
1298        let conn = self
1299            .db
1300            .lock()
1301            .map_err(|_| CoreError::poisoned("database"))?;
1302
1303        let tx = conn.unchecked_transaction()?;
1304        let mut stmt = tx.prepare(
1305            "INSERT INTO embeddings (chunk_id, model, embedded_at)
1306             VALUES (?1, ?2, strftime('%Y-%m-%dT%H:%M:%SZ','now'))
1307             ON CONFLICT(chunk_id, model) DO UPDATE SET
1308               embedded_at = excluded.embedded_at",
1309        )?;
1310
1311        for (chunk_id, model) in entries {
1312            stmt.execute(params![chunk_id, model])?;
1313        }
1314
1315        drop(stmt);
1316        tx.commit()?;
1317        Ok(())
1318    }
1319
1320    pub fn get_unembedded_chunks(
1321        &self,
1322        model: &str,
1323        after_chunk_id: i64,
1324        limit: usize,
1325    ) -> Result<Vec<EmbedRecord>> {
1326        self.get_unembedded_chunks_filtered(model, after_chunk_id, limit, "", Vec::new())
1327    }
1328
1329    pub fn get_unembedded_chunks_in_space(
1330        &self,
1331        model: &str,
1332        space_id: i64,
1333        after_chunk_id: i64,
1334        limit: usize,
1335    ) -> Result<Vec<EmbedRecord>> {
1336        self.get_unembedded_chunks_filtered(
1337            model,
1338            after_chunk_id,
1339            limit,
1340            " AND col.space_id = ?",
1341            vec![SqlValue::Integer(space_id)],
1342        )
1343    }
1344
1345    pub fn get_unembedded_chunks_in_collections(
1346        &self,
1347        model: &str,
1348        collection_ids: &[i64],
1349        after_chunk_id: i64,
1350        limit: usize,
1351    ) -> Result<Vec<EmbedRecord>> {
1352        if collection_ids.is_empty() {
1353            return Ok(Vec::new());
1354        }
1355
1356        let placeholders = vec!["?"; collection_ids.len()].join(", ");
1357        let clause = format!(" AND d.collection_id IN ({placeholders})");
1358        let params = collection_ids
1359            .iter()
1360            .map(|id| SqlValue::Integer(*id))
1361            .collect::<Vec<_>>();
1362        self.get_unembedded_chunks_filtered(model, after_chunk_id, limit, &clause, params)
1363    }
1364
1365    fn get_unembedded_chunks_filtered(
1366        &self,
1367        model: &str,
1368        after_chunk_id: i64,
1369        limit: usize,
1370        scope_clause: &str,
1371        scope_params: Vec<SqlValue>,
1372    ) -> Result<Vec<EmbedRecord>> {
1373        let conn = self
1374            .db
1375            .lock()
1376            .map_err(|_| CoreError::poisoned("database"))?;
1377        let sql_limit = i64::try_from(limit)
1378            .map_err(|_| CoreError::Internal("limit too large for sqlite".to_string()))?;
1379
1380        let sql = format!(
1381            "SELECT c.id, d.path, col.path, s.name, c.offset, c.length
1382             FROM chunks c
1383             JOIN documents d ON d.id = c.doc_id
1384             JOIN collections col ON col.id = d.collection_id
1385             JOIN spaces s ON s.id = col.space_id
1386             LEFT JOIN embeddings e ON e.chunk_id = c.id AND e.model = ?
1387             WHERE d.active = 1 AND e.chunk_id IS NULL AND c.id > ?{scope_clause}
1388             ORDER BY c.id ASC
1389             LIMIT ?"
1390        );
1391        let mut stmt = conn.prepare(&sql)?;
1392        let mut params = Vec::with_capacity(scope_params.len() + 3);
1393        params.push(SqlValue::Text(model.to_string()));
1394        params.push(SqlValue::Integer(after_chunk_id));
1395        params.extend(scope_params);
1396        params.push(SqlValue::Integer(sql_limit));
1397        let rows = stmt.query_map(params_from_iter(params.iter()), |row| {
1398            let offset_value: i64 = row.get(4)?;
1399            let length_value: i64 = row.get(5)?;
1400            Ok(EmbedRecord {
1401                chunk_id: row.get(0)?,
1402                doc_path: row.get(1)?,
1403                collection_path: PathBuf::from(row.get::<_, String>(2)?),
1404                space_name: row.get(3)?,
1405                offset: offset_value as usize,
1406                length: length_value as usize,
1407            })
1408        })?;
1409
1410        let records = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1411        Ok(records)
1412    }
1413
1414    pub fn delete_embeddings_for_model(&self, model: &str) -> Result<usize> {
1415        let conn = self
1416            .db
1417            .lock()
1418            .map_err(|_| CoreError::poisoned("database"))?;
1419
1420        let deleted = conn.execute("DELETE FROM embeddings WHERE model = ?1", params![model])?;
1421        Ok(deleted)
1422    }
1423
1424    pub fn delete_embeddings_for_space(&self, space_id: i64) -> Result<usize> {
1425        let conn = self
1426            .db
1427            .lock()
1428            .map_err(|_| CoreError::poisoned("database"))?;
1429        let _space_name = lookup_space_name(&conn, space_id)?;
1430
1431        let deleted = conn.execute(
1432            "DELETE FROM embeddings
1433             WHERE chunk_id IN (
1434                 SELECT c.id
1435                 FROM chunks c
1436                 JOIN documents d ON d.id = c.doc_id
1437                 JOIN collections col ON col.id = d.collection_id
1438                 WHERE col.space_id = ?1
1439             )",
1440            params![space_id],
1441        )?;
1442        Ok(deleted)
1443    }
1444
1445    pub fn list_embedding_models_in_space(&self, space_id: i64) -> Result<Vec<String>> {
1446        let conn = self
1447            .db
1448            .lock()
1449            .map_err(|_| CoreError::poisoned("database"))?;
1450        let _space_name = lookup_space_name(&conn, space_id)?;
1451
1452        let mut stmt = conn.prepare(
1453            "SELECT DISTINCT e.model
1454             FROM embeddings e
1455             JOIN chunks c ON c.id = e.chunk_id
1456             JOIN documents d ON d.id = c.doc_id
1457             JOIN collections col ON col.id = d.collection_id
1458             WHERE col.space_id = ?1
1459             ORDER BY e.model ASC",
1460        )?;
1461        let rows = stmt.query_map(params![space_id], |row| row.get::<_, String>(0))?;
1462        let models = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1463        Ok(models)
1464    }
1465
1466    pub fn count_embeddings(&self) -> Result<usize> {
1467        let conn = self
1468            .db
1469            .lock()
1470            .map_err(|_| CoreError::poisoned("database"))?;
1471
1472        let count: i64 = conn.query_row("SELECT COUNT(*) FROM embeddings", [], |row| row.get(0))?;
1473        Ok(count as usize)
1474    }
1475
1476    pub fn index_tantivy(&self, space: &str, entries: &[TantivyEntry]) -> Result<()> {
1477        if entries.is_empty() {
1478            return Ok(());
1479        }
1480
1481        let space_indexes = self.get_space_indexes(space)?;
1482        with_tantivy_writer(&space_indexes, |writer| {
1483            for entry in entries {
1484                let chunk_id = u64::try_from(entry.chunk_id).map_err(|_| {
1485                    CoreError::Internal(format!(
1486                        "chunk_id must be non-negative for tantivy indexing: {}",
1487                        entry.chunk_id
1488                    ))
1489                })?;
1490                let doc_id = u64::try_from(entry.doc_id).map_err(|_| {
1491                    CoreError::Internal(format!(
1492                        "doc_id must be non-negative for tantivy indexing: {}",
1493                        entry.doc_id
1494                    ))
1495                })?;
1496
1497                let mut doc = TantivyDocument::default();
1498                doc.add_u64(space_indexes.fields.chunk_id, chunk_id);
1499                doc.add_u64(space_indexes.fields.doc_id, doc_id);
1500                doc.add_text(space_indexes.fields.filepath, &entry.filepath);
1501                if let Some(title) = &entry.semantic_title {
1502                    doc.add_text(space_indexes.fields.title, title);
1503                }
1504                if let Some(heading) = &entry.heading {
1505                    doc.add_text(space_indexes.fields.heading, heading);
1506                }
1507                doc.add_text(space_indexes.fields.body, &entry.body);
1508                writer.add_document(doc)?;
1509            }
1510            Ok(())
1511        })
1512    }
1513
1514    pub fn delete_tantivy(&self, space: &str, chunk_ids: &[i64]) -> Result<()> {
1515        if chunk_ids.is_empty() {
1516            return Ok(());
1517        }
1518
1519        let space_indexes = self.get_space_indexes(space)?;
1520        with_tantivy_writer(&space_indexes, |writer| {
1521            for chunk_id in chunk_ids {
1522                let chunk_key = u64::try_from(*chunk_id).map_err(|_| {
1523                    CoreError::Internal(format!(
1524                        "chunk_id must be non-negative for tantivy delete: {chunk_id}"
1525                    ))
1526                })?;
1527                writer.delete_term(Term::from_field_u64(
1528                    space_indexes.fields.chunk_id,
1529                    chunk_key,
1530                ));
1531            }
1532
1533            Ok(())
1534        })
1535    }
1536
1537    pub fn delete_tantivy_by_doc(&self, space: &str, doc_id: i64) -> Result<()> {
1538        let space_indexes = self.get_space_indexes(space)?;
1539        with_tantivy_writer(&space_indexes, |writer| {
1540            let doc_key = u64::try_from(doc_id).map_err(|_| {
1541                CoreError::Internal(format!(
1542                    "doc_id must be non-negative for tantivy delete-by-doc: {doc_id}"
1543                ))
1544            })?;
1545            writer.delete_term(Term::from_field_u64(space_indexes.fields.doc_id, doc_key));
1546            Ok(())
1547        })
1548    }
1549
1550    pub fn query_bm25(
1551        &self,
1552        space: &str,
1553        query: &str,
1554        fields: &[(&str, f32)],
1555        limit: usize,
1556    ) -> Result<Vec<BM25Hit>> {
1557        if limit == 0 {
1558            return Ok(Vec::new());
1559        }
1560
1561        if fields.is_empty() {
1562            return Err(CoreError::Internal(
1563                "bm25 query requires at least one field".to_string(),
1564            ));
1565        }
1566
1567        let space_indexes = self.get_space_indexes(space)?;
1568
1569        let schema = space_indexes.tantivy_index.schema();
1570        let query_fields = fields
1571            .iter()
1572            .map(|(name, boost)| {
1573                let field = resolve_tantivy_field(space_indexes.fields, name)?;
1574                let field_entry = schema.get_field_entry(field);
1575                let index_record_option = field_entry
1576                    .field_type()
1577                    .get_index_record_option()
1578                    .ok_or_else(|| {
1579                        CoreError::Internal(format!(
1580                            "bm25 field '{}' is not indexed",
1581                            field_entry.name()
1582                        ))
1583                    })?;
1584                Ok(Bm25FieldSpec {
1585                    field,
1586                    boost: *boost,
1587                    index_record_option,
1588                })
1589            })
1590            .collect::<Result<Vec<_>>>()?;
1591        let Some(parsed_query) =
1592            build_literal_bm25_query(&space_indexes.tantivy_index, &query_fields, query)?
1593        else {
1594            return Ok(Vec::new());
1595        };
1596        let reader = space_indexes.tantivy_index.reader()?;
1597        reader.reload()?;
1598        let searcher = reader.searcher();
1599        let docs = searcher.search(&parsed_query, &TopDocs::with_limit(limit))?;
1600
1601        let mut hits = Vec::with_capacity(docs.len());
1602        for (score, address) in docs {
1603            let doc = searcher.doc::<TantivyDocument>(address)?;
1604            let chunk_id = doc
1605                .get_first(space_indexes.fields.chunk_id)
1606                .and_then(|value| value.as_u64())
1607                .ok_or_else(|| {
1608                    CoreError::Internal("tantivy hit missing chunk_id field".to_string())
1609                })?;
1610            hits.push(BM25Hit {
1611                chunk_id: chunk_id as i64,
1612                score,
1613            });
1614        }
1615
1616        Ok(hits)
1617    }
1618
1619    pub fn commit_tantivy(&self, space: &str) -> Result<()> {
1620        let space_indexes = self.get_space_indexes(space)?;
1621        let mut writer = space_indexes
1622            .tantivy_writer
1623            .lock()
1624            .map_err(|_| CoreError::poisoned("tantivy writer"))?;
1625        let Some(writer) = writer.as_mut() else {
1626            return Ok(());
1627        };
1628        writer.commit()?;
1629        Ok(())
1630    }
1631
1632    pub fn insert_usearch(&self, space: &str, key: i64, vector: &[f32]) -> Result<()> {
1633        self.batch_insert_usearch(space, &[(key, vector)])
1634    }
1635
1636    pub fn batch_insert_usearch(&self, space: &str, entries: &[(i64, &[f32])]) -> Result<()> {
1637        if entries.is_empty() {
1638            return Ok(());
1639        }
1640
1641        let space_indexes = self.get_space_indexes(space)?;
1642
1643        let expected_dimensions = entries[0].1.len();
1644        if expected_dimensions == 0 {
1645            return Err(CoreError::Internal(
1646                "cannot insert empty vector into usearch index".to_string(),
1647            ));
1648        }
1649        for (_, vector) in entries {
1650            if vector.len() != expected_dimensions {
1651                return Err(CoreError::Internal(format!(
1652                    "vector dimension mismatch in batch insert: expected {expected_dimensions}, got {}",
1653                    vector.len()
1654                )));
1655            }
1656        }
1657
1658        let mut index = space_indexes
1659            .usearch_index
1660            .write()
1661            .map_err(|_| CoreError::poisoned("usearch index"))?;
1662        ensure_usearch_dimensions(&mut index, expected_dimensions)?;
1663        let target_capacity = index.size().saturating_add(entries.len());
1664        index.reserve(target_capacity).map_err(|err| {
1665            CoreError::Internal(format!(
1666                "usearch reserve failed for {target_capacity} items: {err}"
1667            ))
1668        })?;
1669
1670        for (key, vector) in entries {
1671            let key = u64::try_from(*key).map_err(|_| {
1672                CoreError::Internal(format!("usearch key must be non-negative: {}", *key))
1673            })?;
1674            index
1675                .add::<f32>(key, vector)
1676                .map_err(|err| CoreError::Internal(format!("usearch add failed: {err}")))?;
1677        }
1678
1679        save_usearch_index(&index, &space_indexes.usearch_path)?;
1680        Ok(())
1681    }
1682
1683    pub fn delete_usearch(&self, space: &str, keys: &[i64]) -> Result<()> {
1684        if keys.is_empty() {
1685            return Ok(());
1686        }
1687
1688        let space_indexes = self.get_space_indexes(space)?;
1689        let index = space_indexes
1690            .usearch_index
1691            .write()
1692            .map_err(|_| CoreError::poisoned("usearch index"))?;
1693
1694        for key in keys {
1695            let key = u64::try_from(*key).map_err(|_| {
1696                CoreError::Internal(format!("usearch key must be non-negative: {}", *key))
1697            })?;
1698            index
1699                .remove(key)
1700                .map_err(|err| CoreError::Internal(format!("usearch remove failed: {err}")))?;
1701        }
1702
1703        save_usearch_index(&index, &space_indexes.usearch_path)?;
1704        Ok(())
1705    }
1706
1707    pub fn query_dense(&self, space: &str, vector: &[f32], limit: usize) -> Result<Vec<DenseHit>> {
1708        if limit == 0 {
1709            return Ok(Vec::new());
1710        }
1711        if vector.is_empty() {
1712            return Err(CoreError::Internal(
1713                "cannot query usearch with empty vector".to_string(),
1714            ));
1715        }
1716
1717        let space_indexes = self.get_space_indexes(space)?;
1718        let index = space_indexes
1719            .usearch_index
1720            .read()
1721            .map_err(|_| CoreError::poisoned("usearch index"))?;
1722
1723        if index.size() == 0 {
1724            return Ok(Vec::new());
1725        }
1726        if vector.len() != index.dimensions() {
1727            return Err(CoreError::Internal(format!(
1728                "query vector dimension mismatch: expected {}, got {}",
1729                index.dimensions(),
1730                vector.len()
1731            )));
1732        }
1733
1734        let matches = index
1735            .search::<f32>(vector, limit)
1736            .map_err(|err| CoreError::Internal(format!("usearch query failed: {err}")))?;
1737        let hits = matches
1738            .keys
1739            .into_iter()
1740            .zip(matches.distances)
1741            .map(|(key, distance)| DenseHit {
1742                chunk_id: key as i64,
1743                distance,
1744            })
1745            .collect();
1746        Ok(hits)
1747    }
1748
1749    pub fn count_usearch(&self, space: &str) -> Result<usize> {
1750        let space_indexes = self.get_space_indexes(space)?;
1751        let index = space_indexes
1752            .usearch_index
1753            .read()
1754            .map_err(|_| CoreError::poisoned("usearch index"))?;
1755        Ok(index.size())
1756    }
1757
1758    pub fn clear_usearch(&self, space: &str) -> Result<()> {
1759        let space_indexes = self.get_space_indexes(space)?;
1760        let index = space_indexes
1761            .usearch_index
1762            .write()
1763            .map_err(|_| CoreError::poisoned("usearch index"))?;
1764        index
1765            .reset()
1766            .map_err(|err| CoreError::Internal(format!("usearch clear failed: {err}")))?;
1767        std::fs::File::create(&space_indexes.usearch_path)?;
1768        Ok(())
1769    }
1770
1771    pub fn get_fts_dirty_documents(&self) -> Result<Vec<FtsDirtyRecord>> {
1772        self.get_fts_dirty_documents_filtered("", Vec::new())
1773    }
1774
1775    pub fn get_fts_dirty_documents_in_space(&self, space_id: i64) -> Result<Vec<FtsDirtyRecord>> {
1776        self.get_fts_dirty_documents_filtered(
1777            " AND c.space_id = ?",
1778            vec![SqlValue::Integer(space_id)],
1779        )
1780    }
1781
1782    pub fn get_fts_dirty_documents_in_collections(
1783        &self,
1784        collection_ids: &[i64],
1785    ) -> Result<Vec<FtsDirtyRecord>> {
1786        if collection_ids.is_empty() {
1787            return Ok(Vec::new());
1788        }
1789
1790        let placeholders = vec!["?"; collection_ids.len()].join(", ");
1791        let clause = format!(" AND d.collection_id IN ({placeholders})");
1792        let params = collection_ids
1793            .iter()
1794            .map(|id| SqlValue::Integer(*id))
1795            .collect::<Vec<_>>();
1796        self.get_fts_dirty_documents_filtered(&clause, params)
1797    }
1798
1799    fn get_fts_dirty_documents_filtered(
1800        &self,
1801        scope_clause: &str,
1802        scope_params: Vec<SqlValue>,
1803    ) -> Result<Vec<FtsDirtyRecord>> {
1804        let conn = self
1805            .db
1806            .lock()
1807            .map_err(|_| CoreError::poisoned("database"))?;
1808
1809        let sql = format!(
1810            "SELECT d.id, d.path, d.title, d.title_source, d.hash, c.path, s.name
1811             FROM documents d
1812             JOIN collections c ON c.id = d.collection_id
1813             JOIN spaces s ON s.id = c.space_id
1814             WHERE d.fts_dirty = 1{scope_clause}
1815             ORDER BY d.id ASC"
1816        );
1817        let mut stmt = conn.prepare(&sql)?;
1818        let rows = stmt.query_map(params_from_iter(scope_params.iter()), |row| {
1819            Ok((
1820                row.get::<_, i64>(0)?,
1821                row.get::<_, String>(1)?,
1822                row.get::<_, String>(2)?,
1823                row.get::<_, String>(3)?,
1824                row.get::<_, String>(4)?,
1825                row.get::<_, String>(5)?,
1826                row.get::<_, String>(6)?,
1827            ))
1828        })?;
1829        let headers = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1830        drop(stmt);
1831
1832        let mut records = Vec::with_capacity(headers.len());
1833        for (
1834            doc_id,
1835            doc_path,
1836            doc_title,
1837            doc_title_source,
1838            doc_hash,
1839            collection_path,
1840            space_name,
1841        ) in headers
1842        {
1843            let chunks = load_chunks_for_doc(&conn, doc_id)?;
1844            records.push(FtsDirtyRecord {
1845                doc_id,
1846                doc_path,
1847                doc_title,
1848                doc_title_source: DocumentTitleSource::from_sql(&doc_title_source)?,
1849                doc_hash,
1850                collection_path: PathBuf::from(collection_path),
1851                space_name,
1852                chunks,
1853            });
1854        }
1855
1856        Ok(records)
1857    }
1858
1859    pub fn batch_clear_fts_dirty(&self, doc_ids: &[i64]) -> Result<()> {
1860        if doc_ids.is_empty() {
1861            return Ok(());
1862        }
1863
1864        let conn = self
1865            .db
1866            .lock()
1867            .map_err(|_| CoreError::poisoned("database"))?;
1868
1869        let placeholders = vec!["?"; doc_ids.len()].join(", ");
1870        let sql = format!("UPDATE documents SET fts_dirty = 0 WHERE id IN ({placeholders})");
1871        conn.execute(&sql, params_from_iter(doc_ids.iter()))?;
1872        Ok(())
1873    }
1874
1875    pub fn count_documents_in_collection(
1876        &self,
1877        collection_id: i64,
1878        active_only: bool,
1879    ) -> Result<usize> {
1880        let conn = self
1881            .db
1882            .lock()
1883            .map_err(|_| CoreError::poisoned("database"))?;
1884        let _collection_name = lookup_collection_name(&conn, collection_id)?;
1885        let active_only = i64::from(active_only);
1886        query_count(
1887            &conn,
1888            "SELECT COUNT(*)
1889             FROM documents
1890             WHERE collection_id = ?1
1891               AND (?2 = 0 OR active = 1)",
1892            params![collection_id, active_only],
1893        )
1894    }
1895
1896    pub fn count_chunks_in_collection(&self, collection_id: i64) -> Result<usize> {
1897        let conn = self
1898            .db
1899            .lock()
1900            .map_err(|_| CoreError::poisoned("database"))?;
1901        let _collection_name = lookup_collection_name(&conn, collection_id)?;
1902
1903        query_count(
1904            &conn,
1905            "SELECT COUNT(*)
1906             FROM chunks c
1907             JOIN documents d ON d.id = c.doc_id
1908             WHERE d.collection_id = ?1",
1909            params![collection_id],
1910        )
1911    }
1912
1913    pub fn count_embedded_chunks_in_collection(&self, collection_id: i64) -> Result<usize> {
1914        let conn = self
1915            .db
1916            .lock()
1917            .map_err(|_| CoreError::poisoned("database"))?;
1918        let _collection_name = lookup_collection_name(&conn, collection_id)?;
1919
1920        query_count(
1921            &conn,
1922            "SELECT COUNT(DISTINCT e.chunk_id)
1923             FROM embeddings e
1924             JOIN chunks c ON c.id = e.chunk_id
1925             JOIN documents d ON d.id = c.doc_id
1926             WHERE d.collection_id = ?1",
1927            params![collection_id],
1928        )
1929    }
1930
1931    pub fn count_documents(&self, space_id: Option<i64>) -> Result<usize> {
1932        let conn = self
1933            .db
1934            .lock()
1935            .map_err(|_| CoreError::poisoned("database"))?;
1936
1937        match space_id {
1938            Some(space_id) => {
1939                let _space_name = lookup_space_name(&conn, space_id)?;
1940                query_count(
1941                    &conn,
1942                    "SELECT COUNT(*)
1943                     FROM documents d
1944                     JOIN collections c ON c.id = d.collection_id
1945                     WHERE c.space_id = ?1",
1946                    params![space_id],
1947                )
1948            }
1949            None => query_count(&conn, "SELECT COUNT(*) FROM documents", []),
1950        }
1951    }
1952
1953    pub fn count_chunks(&self, space_id: Option<i64>) -> Result<usize> {
1954        let conn = self
1955            .db
1956            .lock()
1957            .map_err(|_| CoreError::poisoned("database"))?;
1958
1959        match space_id {
1960            Some(space_id) => {
1961                let _space_name = lookup_space_name(&conn, space_id)?;
1962                query_count(
1963                    &conn,
1964                    "SELECT COUNT(*)
1965                     FROM chunks c
1966                     JOIN documents d ON d.id = c.doc_id
1967                     JOIN collections col ON col.id = d.collection_id
1968                     WHERE col.space_id = ?1",
1969                    params![space_id],
1970                )
1971            }
1972            None => query_count(&conn, "SELECT COUNT(*) FROM chunks", []),
1973        }
1974    }
1975
1976    pub fn count_embedded_chunks(&self, space_id: Option<i64>) -> Result<usize> {
1977        let conn = self
1978            .db
1979            .lock()
1980            .map_err(|_| CoreError::poisoned("database"))?;
1981
1982        match space_id {
1983            Some(space_id) => {
1984                let _space_name = lookup_space_name(&conn, space_id)?;
1985                query_count(
1986                    &conn,
1987                    "SELECT COUNT(DISTINCT e.chunk_id)
1988                     FROM embeddings e
1989                     JOIN chunks c ON c.id = e.chunk_id
1990                     JOIN documents d ON d.id = c.doc_id
1991                     JOIN collections col ON col.id = d.collection_id
1992                     WHERE col.space_id = ?1",
1993                    params![space_id],
1994                )
1995            }
1996            None => query_count(&conn, "SELECT COUNT(DISTINCT chunk_id) FROM embeddings", []),
1997        }
1998    }
1999
2000    pub fn disk_usage(&self) -> Result<DiskUsage> {
2001        let sqlite_bytes = file_size_or_zero(&self.cache_dir.join(DB_FILE))?;
2002
2003        let mut tantivy_bytes = 0_u64;
2004        let mut usearch_bytes = 0_u64;
2005        let spaces_dir = self.cache_dir.join(SPACES_DIR);
2006        if spaces_dir.exists() {
2007            for entry in std::fs::read_dir(&spaces_dir)? {
2008                let space_dir = entry?.path();
2009                if !space_dir.is_dir() {
2010                    continue;
2011                }
2012
2013                tantivy_bytes += dir_size_or_zero(&space_dir.join(TANTIVY_DIR_NAME))?;
2014                usearch_bytes += file_size_or_zero(&space_dir.join(USEARCH_FILENAME))?;
2015            }
2016        }
2017
2018        let models_bytes = dir_size_or_zero(&self.cache_dir.join("models"))?;
2019        let total_bytes = sqlite_bytes + tantivy_bytes + usearch_bytes + models_bytes;
2020
2021        Ok(DiskUsage {
2022            sqlite_bytes,
2023            tantivy_bytes,
2024            usearch_bytes,
2025            models_bytes,
2026            total_bytes,
2027        })
2028    }
2029
2030    fn unload_space(&self, name: &str) -> Result<()> {
2031        let mut spaces = self
2032            .spaces
2033            .write()
2034            .map_err(|_| CoreError::poisoned("spaces"))?;
2035        spaces.remove(name);
2036        Ok(())
2037    }
2038
2039    fn remove_space_artifacts(&self, name: &str) -> Result<()> {
2040        let space_root = self.space_root_path(name);
2041        if space_root.exists() {
2042            std::fs::remove_dir_all(space_root)?;
2043        }
2044        Ok(())
2045    }
2046
2047    fn rename_space_artifacts(&self, old: &str, new: &str) -> Result<()> {
2048        let old_root = self.space_root_path(old);
2049        let new_root = self.space_root_path(new);
2050        if !old_root.exists() {
2051            return Ok(());
2052        }
2053
2054        if new_root.exists() {
2055            return Err(CoreError::Internal(format!(
2056                "cannot rename space artifacts: destination already exists: {}",
2057                new_root.display()
2058            )));
2059        }
2060
2061        if let Some(parent) = new_root.parent() {
2062            std::fs::create_dir_all(parent)?;
2063        }
2064        std::fs::rename(old_root, new_root)?;
2065        Ok(())
2066    }
2067
2068    fn space_root_path(&self, name: &str) -> PathBuf {
2069        self.cache_dir.join(SPACES_DIR).join(name)
2070    }
2071
2072    fn space_paths(&self, name: &str) -> (PathBuf, PathBuf) {
2073        let space_root = self.space_root_path(name);
2074        let tantivy_dir = space_root.join(TANTIVY_DIR_NAME);
2075        let usearch_path = space_root.join(USEARCH_FILENAME);
2076        (tantivy_dir, usearch_path)
2077    }
2078
2079    fn get_space_indexes(&self, name: &str) -> Result<Arc<SpaceIndexes>> {
2080        self.open_space(name)?;
2081        let spaces = self
2082            .spaces
2083            .read()
2084            .map_err(|_| CoreError::poisoned("spaces"))?;
2085        spaces.get(name).cloned().ok_or_else(|| {
2086            KboltError::SpaceNotFound {
2087                name: name.to_string(),
2088            }
2089            .into()
2090        })
2091    }
2092}
2093
2094fn lookup_space_name(conn: &Connection, space_id: i64) -> Result<String> {
2095    let result = conn.query_row(
2096        "SELECT name FROM spaces WHERE id = ?1",
2097        params![space_id],
2098        |row| row.get::<_, String>(0),
2099    );
2100    match result {
2101        Ok(name) => Ok(name),
2102        Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
2103            name: format!("id={space_id}"),
2104        }
2105        .into()),
2106        Err(err) => Err(err.into()),
2107    }
2108}
2109
2110fn lookup_collection_name(conn: &Connection, collection_id: i64) -> Result<String> {
2111    let result = conn.query_row(
2112        "SELECT name FROM collections WHERE id = ?1",
2113        params![collection_id],
2114        |row| row.get::<_, String>(0),
2115    );
2116    match result {
2117        Ok(name) => Ok(name),
2118        Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
2119            name: format!("id={collection_id}"),
2120        }
2121        .into()),
2122        Err(err) => Err(err.into()),
2123    }
2124}
2125
2126fn lookup_document_id(conn: &Connection, doc_id: i64) -> Result<i64> {
2127    let result = conn.query_row(
2128        "SELECT id FROM documents WHERE id = ?1",
2129        params![doc_id],
2130        |row| row.get::<_, i64>(0),
2131    );
2132    match result {
2133        Ok(id) => Ok(id),
2134        Err(Error::QueryReturnedNoRows) => Err(KboltError::DocumentNotFound {
2135            path: format!("id={doc_id}"),
2136        }
2137        .into()),
2138        Err(err) => Err(err.into()),
2139    }
2140}
2141
2142fn load_chunks_for_doc(conn: &Connection, doc_id: i64) -> Result<Vec<ChunkRow>> {
2143    let mut stmt = conn.prepare(
2144        "SELECT id, doc_id, seq, offset, length, heading, kind
2145         FROM chunks
2146         WHERE doc_id = ?1
2147         ORDER BY seq ASC",
2148    )?;
2149    let rows = stmt.query_map(params![doc_id], decode_chunk_row)?;
2150    let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2151    Ok(chunks)
2152}
2153
2154fn load_chunk_ids_for_doc(conn: &Connection, doc_id: i64) -> Result<Vec<i64>> {
2155    let mut stmt = conn.prepare("SELECT id FROM chunks WHERE doc_id = ?1 ORDER BY seq ASC")?;
2156    let rows = stmt.query_map(params![doc_id], |row| row.get::<_, i64>(0))?;
2157    let chunk_ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2158    Ok(chunk_ids)
2159}
2160
2161fn query_count<P: rusqlite::Params>(conn: &Connection, sql: &str, params: P) -> Result<usize> {
2162    let count: i64 = conn.query_row(sql, params, |row| row.get(0))?;
2163    Ok(count as usize)
2164}
2165
2166fn file_size_or_zero(path: &Path) -> Result<u64> {
2167    if !path.exists() {
2168        return Ok(0);
2169    }
2170
2171    let metadata = std::fs::metadata(path)?;
2172    if metadata.is_file() {
2173        Ok(metadata.len())
2174    } else {
2175        Ok(0)
2176    }
2177}
2178
2179fn dir_size_or_zero(path: &Path) -> Result<u64> {
2180    if !path.exists() {
2181        return Ok(0);
2182    }
2183
2184    let mut total = 0_u64;
2185    for entry in std::fs::read_dir(path)? {
2186        let entry = entry?;
2187        let child_path = entry.path();
2188        let metadata = std::fs::symlink_metadata(&child_path)?;
2189        if metadata.is_file() {
2190            total += metadata.len();
2191        } else if metadata.is_dir() {
2192            total += dir_size_or_zero(&child_path)?;
2193        }
2194    }
2195
2196    Ok(total)
2197}
2198
2199fn open_or_create_tantivy_index(path: &Path) -> Result<Index> {
2200    let meta_path = path.join("meta.json");
2201    if meta_path.exists() {
2202        return Ok(Index::open_in_dir(path)?);
2203    }
2204
2205    Ok(Index::create_in_dir(path, tantivy_schema())?)
2206}
2207
2208fn with_tantivy_writer<T>(
2209    space_indexes: &SpaceIndexes,
2210    f: impl FnOnce(&mut IndexWriter) -> Result<T>,
2211) -> Result<T> {
2212    let mut writer = space_indexes
2213        .tantivy_writer
2214        .lock()
2215        .map_err(|_| CoreError::poisoned("tantivy writer"))?;
2216
2217    if writer.is_none() {
2218        *writer = Some(space_indexes.tantivy_index.writer(50_000_000)?);
2219    }
2220
2221    let writer = writer
2222        .as_mut()
2223        .ok_or_else(|| CoreError::Internal("failed to initialize tantivy writer".to_string()))?;
2224    f(writer)
2225}
2226
2227fn ensure_documents_title_source_column(conn: &Connection) -> Result<()> {
2228    let mut stmt = conn.prepare("PRAGMA table_info(documents)")?;
2229    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
2230    let columns = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2231    drop(stmt);
2232
2233    if columns.iter().any(|column| column == "title_source") {
2234        return Ok(());
2235    }
2236
2237    conn.execute(
2238        "ALTER TABLE documents ADD COLUMN title_source TEXT NOT NULL DEFAULT 'extracted'",
2239        [],
2240    )?;
2241    Ok(())
2242}
2243
2244fn build_literal_bm25_query(
2245    index: &Index,
2246    fields: &[Bm25FieldSpec],
2247    query: &str,
2248) -> Result<Option<Box<dyn Query>>> {
2249    let mut clauses = Vec::new();
2250    for field in fields {
2251        for token in analyzed_terms_for_field(index, field.field, query)? {
2252            let term_query: Box<dyn Query> = Box::new(TermQuery::new(
2253                Term::from_field_text(field.field, &token),
2254                field.index_record_option,
2255            ));
2256            let query = if (field.boost - 1.0).abs() > f32::EPSILON {
2257                Box::new(BoostQuery::new(term_query, field.boost)) as Box<dyn Query>
2258            } else {
2259                term_query
2260            };
2261            clauses.push((Occur::Should, query));
2262        }
2263    }
2264
2265    if clauses.is_empty() {
2266        Ok(None)
2267    } else {
2268        Ok(Some(Box::new(BooleanQuery::new(clauses))))
2269    }
2270}
2271
2272fn analyzed_terms_for_field(index: &Index, field: Field, query: &str) -> Result<Vec<String>> {
2273    let mut analyzer = index.tokenizer_for_field(field)?;
2274    let mut stream = analyzer.token_stream(query);
2275    let mut terms = Vec::new();
2276    let mut seen = HashSet::new();
2277    while let Some(token) = stream.next() {
2278        if token.text.is_empty() {
2279            continue;
2280        }
2281        let text = token.text.clone();
2282        if seen.insert(text.clone()) {
2283            terms.push(text);
2284        }
2285    }
2286    Ok(terms)
2287}
2288
2289fn new_usearch_index(dimensions: usize) -> Result<usearch::Index> {
2290    let options = IndexOptions {
2291        dimensions,
2292        metric: MetricKind::Cos,
2293        quantization: ScalarKind::F32,
2294        connectivity: 16,
2295        expansion_add: 200,
2296        expansion_search: 100,
2297        ..IndexOptions::default()
2298    };
2299    usearch::Index::new(&options)
2300        .map_err(|err| CoreError::Internal(format!("usearch init failed: {err}")))
2301}
2302
2303fn open_or_create_usearch_index(path: &Path) -> Result<usearch::Index> {
2304    let index = new_usearch_index(256)?;
2305    let file_size = std::fs::metadata(path).map(|meta| meta.len()).unwrap_or(0);
2306    if file_size > 0 {
2307        let path = path
2308            .to_str()
2309            .ok_or_else(|| CoreError::Internal("invalid usearch path encoding".to_string()))?;
2310        index
2311            .load(path)
2312            .map_err(|err| CoreError::Internal(format!("usearch load failed: {err}")))?;
2313    }
2314    Ok(index)
2315}
2316
2317fn ensure_usearch_dimensions(index: &mut usearch::Index, expected_dimensions: usize) -> Result<()> {
2318    if index.size() == 0 && index.dimensions() != expected_dimensions {
2319        *index = new_usearch_index(expected_dimensions)?;
2320        return Ok(());
2321    }
2322
2323    if index.dimensions() != expected_dimensions {
2324        return Err(CoreError::Internal(format!(
2325            "usearch vector dimension mismatch: index expects {}, got {}",
2326            index.dimensions(),
2327            expected_dimensions
2328        )));
2329    }
2330    Ok(())
2331}
2332
2333fn save_usearch_index(index: &usearch::Index, path: &Path) -> Result<()> {
2334    if index.size() == 0 {
2335        std::fs::File::create(path)?;
2336        return Ok(());
2337    }
2338
2339    let path = path
2340        .to_str()
2341        .ok_or_else(|| CoreError::Internal("invalid usearch path encoding".to_string()))?;
2342    index
2343        .save(path)
2344        .map_err(|err| CoreError::Internal(format!("usearch save failed: {err}")))?;
2345    Ok(())
2346}
2347
2348fn tantivy_schema() -> tantivy::schema::Schema {
2349    let mut builder = tantivy::schema::Schema::builder();
2350    builder.add_u64_field("chunk_id", INDEXED | STORED | FAST);
2351    builder.add_u64_field("doc_id", INDEXED | STORED | FAST);
2352    builder.add_text_field("filepath", TEXT | STORED);
2353    builder.add_text_field("title", TEXT | STORED);
2354    builder.add_text_field("heading", TEXT | STORED);
2355    builder.add_text_field("body", TEXT);
2356    builder.build()
2357}
2358
2359fn tantivy_fields_from_schema(schema: &tantivy::schema::Schema) -> Result<TantivyFields> {
2360    Ok(TantivyFields {
2361        chunk_id: schema.get_field("chunk_id").map_err(|_| {
2362            CoreError::Internal("tantivy schema missing field: chunk_id".to_string())
2363        })?,
2364        doc_id: schema
2365            .get_field("doc_id")
2366            .map_err(|_| CoreError::Internal("tantivy schema missing field: doc_id".to_string()))?,
2367        filepath: schema.get_field("filepath").map_err(|_| {
2368            CoreError::Internal("tantivy schema missing field: filepath".to_string())
2369        })?,
2370        title: schema
2371            .get_field("title")
2372            .map_err(|_| CoreError::Internal("tantivy schema missing field: title".to_string()))?,
2373        heading: schema.get_field("heading").map_err(|_| {
2374            CoreError::Internal("tantivy schema missing field: heading".to_string())
2375        })?,
2376        body: schema
2377            .get_field("body")
2378            .map_err(|_| CoreError::Internal("tantivy schema missing field: body".to_string()))?,
2379    })
2380}
2381
2382fn resolve_tantivy_field(fields: TantivyFields, name: &str) -> Result<Field> {
2383    match name {
2384        "chunk_id" => Ok(fields.chunk_id),
2385        "doc_id" => Ok(fields.doc_id),
2386        "filepath" => Ok(fields.filepath),
2387        "title" => Ok(fields.title),
2388        "heading" => Ok(fields.heading),
2389        "body" => Ok(fields.body),
2390        other => Err(CoreError::Internal(format!(
2391            "unsupported tantivy field: {other}"
2392        ))),
2393    }
2394}
2395
2396fn serialize_extensions(extensions: Option<&[String]>) -> Result<Option<String>> {
2397    match extensions {
2398        None => Ok(None),
2399        Some(values) => serde_json::to_string(values).map(Some).map_err(Into::into),
2400    }
2401}
2402
2403fn deserialize_extensions(raw: Option<String>) -> Result<Option<Vec<String>>> {
2404    match raw {
2405        None => Ok(None),
2406        Some(json) => serde_json::from_str::<Vec<String>>(&json)
2407            .map(Some)
2408            .map_err(Into::into),
2409    }
2410}
2411
2412fn decode_space_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SpaceRow> {
2413    Ok(SpaceRow {
2414        id: row.get(0)?,
2415        name: row.get(1)?,
2416        description: row.get(2)?,
2417        created: row.get(3)?,
2418    })
2419}
2420
2421fn decode_collection_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<CollectionRow> {
2422    let raw_extensions: Option<String> = row.get(5)?;
2423    let extensions = deserialize_extensions(raw_extensions).map_err(|err| {
2424        Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(err))
2425    })?;
2426    Ok(CollectionRow {
2427        id: row.get(0)?,
2428        space_id: row.get(1)?,
2429        name: row.get(2)?,
2430        path: PathBuf::from(row.get::<_, String>(3)?),
2431        description: row.get(4)?,
2432        extensions,
2433        created: row.get(6)?,
2434        updated: row.get(7)?,
2435    })
2436}
2437
2438fn decode_document_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<DocumentRow> {
2439    let raw_title_source: String = row.get(4)?;
2440    let title_source = DocumentTitleSource::from_sql(&raw_title_source).map_err(|err| {
2441        Error::FromSqlConversionFailure(4, rusqlite::types::Type::Text, Box::new(err))
2442    })?;
2443    let active_value: i64 = row.get(7)?;
2444    let fts_dirty_value: i64 = row.get(9)?;
2445    Ok(DocumentRow {
2446        id: row.get(0)?,
2447        collection_id: row.get(1)?,
2448        path: row.get(2)?,
2449        title: row.get(3)?,
2450        title_source,
2451        hash: row.get(5)?,
2452        modified: row.get(6)?,
2453        active: active_value != 0,
2454        deactivated_at: row.get(8)?,
2455        fts_dirty: fts_dirty_value != 0,
2456    })
2457}
2458
2459fn decode_chunk_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ChunkRow> {
2460    let offset_value: i64 = row.get(3)?;
2461    let length_value: i64 = row.get(4)?;
2462    let kind_raw: String = row.get(6)?;
2463    let kind = FinalChunkKind::try_from(kind_raw.as_str()).map_err(|err| {
2464        Error::FromSqlConversionFailure(6, rusqlite::types::Type::Text, Box::new(err))
2465    })?;
2466    Ok(ChunkRow {
2467        id: row.get(0)?,
2468        doc_id: row.get(1)?,
2469        seq: row.get(2)?,
2470        offset: offset_value as usize,
2471        length: length_value as usize,
2472        heading: row.get(5)?,
2473        kind,
2474    })
2475}
2476
2477#[cfg(test)]
2478mod tests;