Skip to main content

kbolt_core/storage/
mod.rs

1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3use std::sync::{Arc, Mutex, RwLock};
4
5use crate::error::{CoreError, Result};
6use crate::ingest::chunk::FinalChunkKind;
7use kbolt_types::{DiskUsage, KboltError};
8use rusqlite::types::{Type as SqlType, Value as SqlValue};
9use rusqlite::{params, params_from_iter, Connection, Error, ErrorCode};
10use tantivy::collector::TopDocs;
11use tantivy::query::{
12    BooleanQuery, BoostQuery, ConstScoreQuery, Occur, Query, TermQuery, TermSetQuery,
13};
14use tantivy::schema::{Field, IndexRecordOption, FAST, INDEXED, STORED, TEXT};
15use tantivy::tokenizer::TokenStream;
16use tantivy::{Index, IndexReader, IndexWriter, TantivyDocument, Term};
17use usearch::{IndexOptions, MetricKind, ScalarKind};
18
// ---------------------------------------------------------------------------
// Database
// ---------------------------------------------------------------------------

/// File name of the SQLite metadata database; `Storage::new` joins it onto
/// the cache directory.
const DB_FILE: &str = "meta.sqlite";
/// Schema version number.
/// NOTE(review): presumably written/checked by `ensure_schema_version` — confirm.
const SCHEMA_VERSION: i64 = 3;
/// Upper bound on ids per batched query.
/// NOTE(review): presumably keeps `IN (...)` lists under SQLite's bound-parameter limit.
const SQLITE_IN_CLAUSE_BATCH_SIZE: usize = 500;

// ---------------------------------------------------------------------------
// Spaces and their on-disk index artifacts
// ---------------------------------------------------------------------------

/// Reserved space that is created at start-up and cannot be renamed.
const DEFAULT_SPACE_NAME: &str = "default";
/// Directory name for per-space artifacts (presumably under the cache dir).
const SPACES_DIR: &str = "spaces";
/// Directory name of a space's Tantivy index.
const TANTIVY_DIR_NAME: &str = "tantivy";
/// File name of a space's usearch vector index.
const USEARCH_FILENAME: &str = "vectors.usearch";
26
/// Selects when a usearch vector index is persisted after it is modified.
///
/// NOTE(review): semantics inferred from the variant names — presumably
/// `Immediate` saves right away while `Deferred` only records the space in
/// `Storage::dirty_usearch_spaces` for a later flush; confirm at call sites.
#[derive(Copy, Clone)]
enum UsearchSaveMode {
    Immediate,
    Deferred,
}
32
33pub struct Storage {
34    db: Mutex<Connection>,
35    cache_dir: PathBuf,
36    spaces: RwLock<HashMap<String, Arc<SpaceIndexes>>>,
37    dirty_usearch_spaces: Mutex<HashSet<String>>,
38}
39
/// One row of the `spaces` table.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SpaceRow {
    /// Primary key.
    pub id: i64,
    /// Unique space name.
    pub name: String,
    /// Optional free-form description.
    pub description: Option<String>,
    /// Creation timestamp text; the column default is `%Y-%m-%dT%H:%M:%SZ`.
    pub created: String,
}
47
/// One row of the `collections` table.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CollectionRow {
    /// Primary key.
    pub id: i64,
    /// Owning space (`spaces.id`).
    pub space_id: i64,
    /// Collection name, unique within its space.
    pub name: String,
    /// Root directory of the collection on disk.
    pub path: PathBuf,
    /// Optional free-form description.
    pub description: Option<String>,
    /// Optional file-extension filter (stored serialized in the `extensions` column).
    pub extensions: Option<Vec<String>>,
    /// Creation timestamp text.
    pub created: String,
    /// Last-update timestamp text.
    pub updated: String,
}
59
60#[derive(Debug, Clone, PartialEq, Eq)]
61pub struct DocumentRow {
62    pub id: i64,
63    pub collection_id: i64,
64    pub path: String,
65    pub title: String,
66    pub title_source: DocumentTitleSource,
67    pub hash: String,
68    pub modified: String,
69    pub active: bool,
70    pub deactivated_at: Option<String>,
71    pub fts_dirty: bool,
72}
73
/// Summary of one document for file listings, including chunk counters.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct FileListRow {
    /// Document id.
    pub doc_id: i64,
    /// Document path.
    pub path: String,
    /// Document title.
    pub title: String,
    /// Content hash.
    pub hash: String,
    /// Whether the document is active.
    pub active: bool,
    /// Number of chunks of the document.
    pub chunk_count: usize,
    /// Number of those chunks that have embeddings.
    pub embedded_chunk_count: usize,
}
84
85#[derive(Debug, Clone, PartialEq, Eq)]
86pub struct ChunkRow {
87    pub id: i64,
88    pub doc_id: i64,
89    pub seq: i32,
90    pub offset: usize,
91    pub length: usize,
92    pub heading: Option<String>,
93    pub kind: FinalChunkKind,
94    pub retrieval_prefix: Option<String>,
95}
96
97#[derive(Debug, Clone, PartialEq, Eq)]
98pub struct ChunkInsert {
99    pub seq: i32,
100    pub offset: usize,
101    pub length: usize,
102    pub heading: Option<String>,
103    pub kind: FinalChunkKind,
104    pub retrieval_prefix: Option<String>,
105}
106
/// One row of the `document_texts` table (the extracted text of a document).
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct DocumentTextRow {
    /// Owning document; also the table's primary key.
    pub doc_id: i64,
    /// Identifier of the extractor that produced `text`.
    pub extractor_key: String,
    /// Hash of the source the text was extracted from.
    pub source_hash: String,
    /// Hash of `text`.
    pub text_hash: String,
    /// Generation key (column default is the empty string).
    pub generation_key: String,
    /// Extracted text.
    pub text: String,
    /// Creation timestamp text.
    pub created: String,
}
117
118#[derive(Debug, Clone, PartialEq, Eq)]
119pub struct ChunkTextRow {
120    pub chunk: ChunkRow,
121    pub extractor_key: String,
122    pub text: String,
123}
124
125pub struct DocumentGenerationReplace<'a> {
126    pub collection_id: i64,
127    pub path: &'a str,
128    pub title: &'a str,
129    pub title_source: DocumentTitleSource,
130    pub hash: &'a str,
131    pub modified: &'a str,
132    pub extractor_key: &'a str,
133    pub source_hash: &'a str,
134    pub text_hash: &'a str,
135    pub generation_key: &'a str,
136    pub text: &'a str,
137    pub chunks: &'a [ChunkInsert],
138}
139
/// Outcome of replacing a document generation.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct DocumentGenerationReplaceResult {
    /// Id of the (possibly newly created) document.
    pub doc_id: i64,
    /// Ids of the chunks that belonged to the previous generation.
    pub old_chunk_ids: Vec<i64>,
    /// Ids of the chunks of the new generation.
    pub chunk_ids: Vec<i64>,
}
146
147#[derive(Debug, Clone, PartialEq, Eq)]
148pub struct EmbedRecord {
149    pub chunk: ChunkRow,
150    pub doc_path: String,
151    pub collection_path: PathBuf,
152    pub space_name: String,
153}
154
155#[derive(Debug, Clone, PartialEq, Eq)]
156pub struct FtsDirtyRecord {
157    pub doc_id: i64,
158    pub doc_path: String,
159    pub doc_title: String,
160    pub doc_title_source: DocumentTitleSource,
161    pub doc_hash: String,
162    pub collection_path: PathBuf,
163    pub space_name: String,
164    pub chunks: Vec<ChunkRow>,
165}
166
/// A document selected for removal, with its space and chunk ids.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ReapableDocument {
    /// Document id.
    pub doc_id: i64,
    /// Name of the space that owns the document.
    pub space_name: String,
    /// Ids of the document's chunks.
    pub chunk_ids: Vec<i64>,
}
173
/// Values written to the Tantivy index for one chunk.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TantivyEntry {
    /// Chunk id.
    pub chunk_id: i64,
    /// Owning document id.
    pub doc_id: i64,
    /// Path of the document.
    pub filepath: String,
    /// Title to index, if the document has a meaningful one.
    pub semantic_title: Option<String>,
    /// Heading of the chunk, if any.
    pub heading: Option<String>,
    /// Chunk body text.
    pub body: String,
}
183
/// Origin of a document's stored title.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DocumentTitleSource {
    /// The title was extracted from the document itself.
    Extracted,
    /// The title was derived from the file name because none was extracted.
    FilenameFallback,
}
189
190impl DocumentTitleSource {
191    pub fn as_sql(self) -> &'static str {
192        match self {
193            Self::Extracted => "extracted",
194            Self::FilenameFallback => "filename_fallback",
195        }
196    }
197
198    fn from_sql(raw: &str) -> std::result::Result<Self, KboltError> {
199        match raw {
200            "extracted" => Ok(Self::Extracted),
201            "filename_fallback" => Ok(Self::FilenameFallback),
202            other => Err(KboltError::InvalidInput(format!(
203                "invalid stored document title source: {other}"
204            ))),
205        }
206    }
207
208    pub fn semantic_title(self, title: &str) -> Option<&str> {
209        matches!(self, Self::Extracted)
210            .then_some(title.trim())
211            .filter(|title| !title.is_empty())
212    }
213}
214
/// A lexical (BM25) search hit.
#[derive(Clone, Debug, PartialEq)]
pub struct BM25Hit {
    /// Id of the matching chunk.
    pub chunk_id: i64,
    /// Relevance score reported by the full-text index.
    pub score: f32,
}
220
/// A vector (dense) search hit.
#[derive(Clone, Debug, PartialEq)]
pub struct DenseHit {
    /// Id of the matching chunk.
    pub chunk_id: i64,
    /// Distance reported by the vector index.
    pub distance: f32,
}
226
/// Size of a search scope: the documents in it and their total chunk count.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SearchScopeSummary {
    /// Ids of the documents in scope.
    pub document_ids: Vec<i64>,
    /// Number of chunks across those documents.
    pub chunk_count: usize,
}
232
233#[derive(Debug, Clone, PartialEq, Eq)]
234pub enum SpaceResolution {
235    Found(SpaceRow),
236    Ambiguous(Vec<String>),
237    NotFound,
238}
239
240struct SpaceIndexes {
241    _tantivy_dir: PathBuf,
242    usearch_path: PathBuf,
243    tantivy_index: Index,
244    tantivy_reader: IndexReader,
245    tantivy_writer: Mutex<Option<IndexWriter>>,
246    usearch_index: RwLock<usearch::Index>,
247    fields: TantivyFields,
248}
249
250#[derive(Debug, Clone, Copy)]
251struct TantivyFields {
252    chunk_id: Field,
253    doc_id: Field,
254    filepath: Field,
255    title: Field,
256    heading: Field,
257    body: Field,
258}
259
260#[derive(Debug, Clone, Copy)]
261struct Bm25FieldSpec {
262    field: Field,
263    boost: f32,
264    index_record_option: IndexRecordOption,
265}
266
267impl Storage {
268    pub fn new(cache_dir: &Path) -> Result<Self> {
269        std::fs::create_dir_all(cache_dir)?;
270        let db_path = cache_dir.join(DB_FILE);
271        let conn = Connection::open(db_path)?;
272        reject_incompatible_legacy_index(&conn)?;
273        conn.execute_batch(
274            r#"
275PRAGMA foreign_keys = ON;
276PRAGMA journal_mode = WAL;
277
278CREATE TABLE IF NOT EXISTS spaces (
279    id          INTEGER PRIMARY KEY,
280    name        TEXT NOT NULL UNIQUE,
281    description TEXT,
282    created     TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ','now'))
283);
284
285CREATE TABLE IF NOT EXISTS collections (
286    id          INTEGER PRIMARY KEY,
287    space_id    INTEGER NOT NULL REFERENCES spaces(id) ON DELETE CASCADE,
288    name        TEXT NOT NULL,
289    path        TEXT NOT NULL,
290    description TEXT,
291    extensions  TEXT,
292    created     TEXT NOT NULL,
293    updated     TEXT NOT NULL,
294    UNIQUE(space_id, name)
295);
296
297CREATE TABLE IF NOT EXISTS documents (
298    id              INTEGER PRIMARY KEY,
299    collection_id   INTEGER NOT NULL REFERENCES collections(id) ON DELETE CASCADE,
300    path            TEXT NOT NULL,
301    title           TEXT NOT NULL,
302    title_source    TEXT NOT NULL DEFAULT 'extracted',
303    hash            TEXT NOT NULL,
304    modified        TEXT NOT NULL,
305    active          INTEGER NOT NULL DEFAULT 1,
306    deactivated_at  TEXT,
307    fts_dirty       INTEGER NOT NULL DEFAULT 0,
308    UNIQUE(collection_id, path)
309);
310CREATE INDEX IF NOT EXISTS idx_documents_collection ON documents(collection_id, active);
311CREATE INDEX IF NOT EXISTS idx_documents_hash ON documents(hash);
312CREATE INDEX IF NOT EXISTS idx_documents_fts_dirty ON documents(fts_dirty) WHERE fts_dirty = 1;
313
314CREATE TABLE IF NOT EXISTS chunks (
315    id       INTEGER PRIMARY KEY,
316    doc_id   INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
317    seq      INTEGER NOT NULL,
318    offset   INTEGER NOT NULL,
319    length   INTEGER NOT NULL,
320    heading  TEXT,
321    kind     TEXT NOT NULL DEFAULT 'section',
322    retrieval_prefix TEXT,
323    UNIQUE(doc_id, seq)
324);
325CREATE INDEX IF NOT EXISTS idx_chunks_doc ON chunks(doc_id);
326
327CREATE TABLE IF NOT EXISTS embeddings (
328    chunk_id    INTEGER NOT NULL REFERENCES chunks(id) ON DELETE CASCADE,
329    model       TEXT NOT NULL,
330    embedded_at TEXT NOT NULL,
331    PRIMARY KEY (chunk_id, model)
332);
333
334CREATE TABLE IF NOT EXISTS document_texts (
335    doc_id            INTEGER PRIMARY KEY REFERENCES documents(id) ON DELETE CASCADE,
336    extractor_key     TEXT NOT NULL,
337    source_hash       TEXT NOT NULL,
338    text_hash         TEXT NOT NULL,
339    generation_key    TEXT NOT NULL DEFAULT '',
340    text              TEXT NOT NULL,
341    created           TEXT NOT NULL
342);
343"#,
344        )?;
345        ensure_documents_title_source_column(&conn)?;
346        ensure_document_texts_generation_key_column(&conn)?;
347        ensure_chunks_retrieval_prefix_column(&conn)?;
348        ensure_schema_version(&conn)?;
349
350        conn.execute(
351            "INSERT OR IGNORE INTO spaces (name, description) VALUES (?1, NULL)",
352            params![DEFAULT_SPACE_NAME],
353        )?;
354
355        let storage = Self {
356            db: Mutex::new(conn),
357            cache_dir: cache_dir.to_path_buf(),
358            spaces: RwLock::new(HashMap::new()),
359            dirty_usearch_spaces: Mutex::new(HashSet::new()),
360        };
361        storage.open_space(DEFAULT_SPACE_NAME)?;
362
363        Ok(storage)
364    }
365
366    pub fn open_space(&self, name: &str) -> Result<()> {
367        let _space = self.get_space(name)?;
368
369        {
370            let spaces = self
371                .spaces
372                .read()
373                .map_err(|_| CoreError::poisoned("spaces"))?;
374            if spaces.contains_key(name) {
375                return Ok(());
376            }
377        }
378
379        let (tantivy_dir, usearch_path) = self.space_paths(name);
380        std::fs::create_dir_all(&tantivy_dir)?;
381        std::fs::OpenOptions::new()
382            .create(true)
383            .append(true)
384            .open(&usearch_path)?;
385
386        let mut spaces = self
387            .spaces
388            .write()
389            .map_err(|_| CoreError::poisoned("spaces"))?;
390        if spaces.contains_key(name) {
391            return Ok(());
392        }
393
394        let tantivy_index = open_or_create_tantivy_index(&tantivy_dir)?;
395        let tantivy_reader = tantivy_index.reader()?;
396        let usearch_index = open_or_create_usearch_index(&usearch_path)?;
397        let fields = tantivy_fields_from_schema(&tantivy_index.schema())?;
398
399        spaces.insert(
400            name.to_string(),
401            Arc::new(SpaceIndexes {
402                _tantivy_dir: tantivy_dir,
403                usearch_path,
404                tantivy_index,
405                tantivy_reader,
406                tantivy_writer: Mutex::new(None),
407                usearch_index: RwLock::new(usearch_index),
408                fields,
409            }),
410        );
411
412        Ok(())
413    }
414
415    pub fn close_space(&self, name: &str) -> Result<()> {
416        let _space = self.get_space(name)?;
417        let mut spaces = self
418            .spaces
419            .write()
420            .map_err(|_| CoreError::poisoned("spaces"))?;
421        let _removed = spaces.remove(name);
422        Ok(())
423    }
424
425    pub fn create_space(&self, name: &str, description: Option<&str>) -> Result<i64> {
426        let space_id = {
427            let conn = self
428                .db
429                .lock()
430                .map_err(|_| CoreError::poisoned("database"))?;
431
432            let result = conn.execute(
433                "INSERT INTO spaces (name, description) VALUES (?1, ?2)",
434                params![name, description],
435            );
436
437            match result {
438                Ok(_) => conn.last_insert_rowid(),
439                Err(err) => {
440                    return match err {
441                        Error::SqliteFailure(sqlite_err, _)
442                            if sqlite_err.code == ErrorCode::ConstraintViolation =>
443                        {
444                            Err(KboltError::SpaceAlreadyExists {
445                                name: name.to_string(),
446                            }
447                            .into())
448                        }
449                        other => Err(other.into()),
450                    };
451                }
452            }
453        };
454
455        if let Err(open_err) = self.open_space(name) {
456            let rollback_result = self
457                .db
458                .lock()
459                .map_err(|_| CoreError::poisoned("database"))?
460                .execute("DELETE FROM spaces WHERE id = ?1", params![space_id]);
461
462            if let Err(rollback_err) = rollback_result {
463                return Err(CoreError::Internal(format!(
464                    "failed to provision indexes for space '{name}': {open_err}; rollback failed: {rollback_err}"
465                )));
466            }
467
468            return Err(open_err);
469        }
470
471        Ok(space_id)
472    }
473
474    pub fn get_space(&self, name: &str) -> Result<SpaceRow> {
475        let conn = self
476            .db
477            .lock()
478            .map_err(|_| CoreError::poisoned("database"))?;
479        let mut stmt =
480            conn.prepare("SELECT id, name, description, created FROM spaces WHERE name = ?1")?;
481
482        let row = stmt.query_row(params![name], decode_space_row);
483
484        match row {
485            Ok(space) => Ok(space),
486            Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
487                name: name.to_string(),
488            }
489            .into()),
490            Err(err) => Err(err.into()),
491        }
492    }
493
494    pub fn get_space_by_id(&self, id: i64) -> Result<SpaceRow> {
495        let conn = self
496            .db
497            .lock()
498            .map_err(|_| CoreError::poisoned("database"))?;
499        let mut stmt =
500            conn.prepare("SELECT id, name, description, created FROM spaces WHERE id = ?1")?;
501
502        let row = stmt.query_row(params![id], decode_space_row);
503
504        match row {
505            Ok(space) => Ok(space),
506            Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
507                name: format!("id={id}"),
508            }
509            .into()),
510            Err(err) => Err(err.into()),
511        }
512    }
513
514    pub fn list_spaces(&self) -> Result<Vec<SpaceRow>> {
515        let conn = self
516            .db
517            .lock()
518            .map_err(|_| CoreError::poisoned("database"))?;
519        let mut stmt = conn.prepare(
520            "SELECT id, name, description, created
521             FROM spaces
522             ORDER BY name ASC",
523        )?;
524
525        let rows = stmt.query_map([], decode_space_row)?;
526
527        let spaces = rows.collect::<std::result::Result<Vec<_>, _>>()?;
528        Ok(spaces)
529    }
530
531    pub fn find_space_for_collection(&self, collection: &str) -> Result<SpaceResolution> {
532        let conn = self
533            .db
534            .lock()
535            .map_err(|_| CoreError::poisoned("database"))?;
536        let mut stmt = conn.prepare(
537            "SELECT s.id, s.name, s.description, s.created
538             FROM spaces s
539             JOIN collections c ON c.space_id = s.id
540             WHERE c.name = ?1
541             ORDER BY s.name ASC",
542        )?;
543        let rows = stmt.query_map(params![collection], decode_space_row)?;
544        let matches = rows.collect::<std::result::Result<Vec<_>, _>>()?;
545
546        if matches.is_empty() {
547            return Ok(SpaceResolution::NotFound);
548        }
549
550        if matches.len() == 1 {
551            return Ok(SpaceResolution::Found(matches[0].clone()));
552        }
553
554        let spaces = matches.into_iter().map(|space| space.name).collect();
555        Ok(SpaceResolution::Ambiguous(spaces))
556    }
557
558    pub fn delete_space(&self, name: &str) -> Result<()> {
559        if name == DEFAULT_SPACE_NAME {
560            let conn = self
561                .db
562                .lock()
563                .map_err(|_| CoreError::poisoned("database"))?;
564            conn.execute(
565                "DELETE FROM collections
566                 WHERE space_id = (SELECT id FROM spaces WHERE name = ?1)",
567                params![name],
568            )?;
569            drop(conn);
570
571            self.unload_space(name)?;
572            self.remove_space_artifacts(name)?;
573            self.open_space(name)?;
574            return Ok(());
575        }
576
577        let conn = self
578            .db
579            .lock()
580            .map_err(|_| CoreError::poisoned("database"))?;
581        let deleted = conn.execute("DELETE FROM spaces WHERE name = ?1", params![name])?;
582        drop(conn);
583
584        if deleted == 0 {
585            return Err(KboltError::SpaceNotFound {
586                name: name.to_string(),
587            }
588            .into());
589        }
590
591        self.unload_space(name)?;
592        self.remove_space_artifacts(name)?;
593
594        Ok(())
595    }
596
597    pub fn rename_space(&self, old: &str, new: &str) -> Result<()> {
598        if old == DEFAULT_SPACE_NAME {
599            return Err(
600                KboltError::Config("cannot rename reserved space: default".to_string()).into(),
601            );
602        }
603
604        let conn = self
605            .db
606            .lock()
607            .map_err(|_| CoreError::poisoned("database"))?;
608
609        let result = conn.execute(
610            "UPDATE spaces SET name = ?1 WHERE name = ?2",
611            params![new, old],
612        );
613        drop(conn);
614
615        match result {
616            Ok(0) => Err(KboltError::SpaceNotFound {
617                name: old.to_string(),
618            }
619            .into()),
620            Ok(_) => {
621                if let Err(rename_err) = self.rename_space_artifacts(old, new) {
622                    let rollback = self
623                        .db
624                        .lock()
625                        .map_err(|_| CoreError::poisoned("database"))?
626                        .execute(
627                            "UPDATE spaces SET name = ?1 WHERE name = ?2",
628                            params![old, new],
629                        );
630
631                    if let Err(rollback_err) = rollback {
632                        return Err(CoreError::Internal(format!(
633                            "failed to rename space artifacts from '{old}' to '{new}': {rename_err}; rollback failed: {rollback_err}"
634                        )));
635                    }
636
637                    return Err(rename_err);
638                }
639
640                self.unload_space(old)?;
641                if let Err(open_err) = self.open_space(new) {
642                    let _ = self.rename_space_artifacts(new, old);
643                    let _ = self
644                        .db
645                        .lock()
646                        .map_err(|_| CoreError::poisoned("database"))?
647                        .execute(
648                            "UPDATE spaces SET name = ?1 WHERE name = ?2",
649                            params![old, new],
650                        );
651                    let _ = self.open_space(old);
652                    return Err(open_err);
653                }
654
655                Ok(())
656            }
657            Err(Error::SqliteFailure(sqlite_err, _))
658                if sqlite_err.code == ErrorCode::ConstraintViolation =>
659            {
660                Err(KboltError::SpaceAlreadyExists {
661                    name: new.to_string(),
662                }
663                .into())
664            }
665            Err(err) => Err(err.into()),
666        }
667    }
668
669    pub fn update_space_description(&self, name: &str, description: &str) -> Result<()> {
670        let conn = self
671            .db
672            .lock()
673            .map_err(|_| CoreError::poisoned("database"))?;
674
675        let updated = conn.execute(
676            "UPDATE spaces SET description = ?1 WHERE name = ?2",
677            params![description, name],
678        )?;
679
680        if updated == 0 {
681            return Err(KboltError::SpaceNotFound {
682                name: name.to_string(),
683            }
684            .into());
685        }
686
687        Ok(())
688    }
689
690    pub fn create_collection(
691        &self,
692        space_id: i64,
693        name: &str,
694        path: &Path,
695        description: Option<&str>,
696        extensions: Option<&[String]>,
697    ) -> Result<i64> {
698        let conn = self
699            .db
700            .lock()
701            .map_err(|_| CoreError::poisoned("database"))?;
702
703        let space_name = lookup_space_name(&conn, space_id)?;
704        let extensions_json = serialize_extensions(extensions)?;
705        let result = conn.execute(
706            "INSERT INTO collections (space_id, name, path, description, extensions, created, updated)
707             VALUES (?1, ?2, ?3, ?4, ?5, strftime('%Y-%m-%dT%H:%M:%SZ','now'), strftime('%Y-%m-%dT%H:%M:%SZ','now'))",
708            params![space_id, name, path.to_string_lossy(), description, extensions_json],
709        );
710
711        match result {
712            Ok(_) => Ok(conn.last_insert_rowid()),
713            Err(Error::SqliteFailure(sqlite_err, _))
714                if sqlite_err.code == ErrorCode::ConstraintViolation =>
715            {
716                Err(KboltError::CollectionAlreadyExists {
717                    name: name.to_string(),
718                    space: space_name,
719                }
720                .into())
721            }
722            Err(err) => Err(err.into()),
723        }
724    }
725
726    pub fn get_collection(&self, space_id: i64, name: &str) -> Result<CollectionRow> {
727        let conn = self
728            .db
729            .lock()
730            .map_err(|_| CoreError::poisoned("database"))?;
731        let mut stmt = conn.prepare(
732            "SELECT id, space_id, name, path, description, extensions, created, updated
733             FROM collections
734             WHERE space_id = ?1 AND name = ?2",
735        )?;
736
737        let result = stmt.query_row(params![space_id, name], decode_collection_row);
738        match result {
739            Ok(row) => Ok(row),
740            Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
741                name: name.to_string(),
742            }
743            .into()),
744            Err(err) => Err(err.into()),
745        }
746    }
747
748    pub fn get_collection_by_id(&self, id: i64) -> Result<CollectionRow> {
749        let conn = self
750            .db
751            .lock()
752            .map_err(|_| CoreError::poisoned("database"))?;
753        let mut stmt = conn.prepare(
754            "SELECT id, space_id, name, path, description, extensions, created, updated
755             FROM collections
756             WHERE id = ?1",
757        )?;
758
759        let result = stmt.query_row(params![id], decode_collection_row);
760        match result {
761            Ok(row) => Ok(row),
762            Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
763                name: format!("id={id}"),
764            }
765            .into()),
766            Err(err) => Err(err.into()),
767        }
768    }
769
770    pub fn list_collections(&self, space_id: Option<i64>) -> Result<Vec<CollectionRow>> {
771        let conn = self
772            .db
773            .lock()
774            .map_err(|_| CoreError::poisoned("database"))?;
775
776        let (sql, params): (&str, Vec<i64>) = match space_id {
777            Some(id) => (
778                "SELECT id, space_id, name, path, description, extensions, created, updated
779                 FROM collections
780                 WHERE space_id = ?1
781                 ORDER BY name ASC",
782                vec![id],
783            ),
784            None => (
785                "SELECT id, space_id, name, path, description, extensions, created, updated
786                 FROM collections
787                 ORDER BY space_id ASC, name ASC",
788                Vec::new(),
789            ),
790        };
791
792        let mut stmt = conn.prepare(sql)?;
793        let rows = if params.is_empty() {
794            stmt.query_map([], decode_collection_row)?
795        } else {
796            stmt.query_map(params![params[0]], decode_collection_row)?
797        };
798        let collections = rows.collect::<std::result::Result<Vec<_>, _>>()?;
799        Ok(collections)
800    }
801
802    pub fn count_collections_in_space(&self, space_id: i64) -> Result<usize> {
803        let conn = self
804            .db
805            .lock()
806            .map_err(|_| CoreError::poisoned("database"))?;
807        let _space_name = lookup_space_name(&conn, space_id)?;
808
809        query_count(
810            &conn,
811            "SELECT COUNT(*) FROM collections WHERE space_id = ?1",
812            params![space_id],
813        )
814    }
815
816    pub fn delete_collection(&self, space_id: i64, name: &str) -> Result<()> {
817        let conn = self
818            .db
819            .lock()
820            .map_err(|_| CoreError::poisoned("database"))?;
821        let _space_name = lookup_space_name(&conn, space_id)?;
822
823        let deleted = conn.execute(
824            "DELETE FROM collections WHERE space_id = ?1 AND name = ?2",
825            params![space_id, name],
826        )?;
827
828        if deleted == 0 {
829            return Err(KboltError::CollectionNotFound {
830                name: name.to_string(),
831            }
832            .into());
833        }
834
835        Ok(())
836    }
837
838    pub fn rename_collection(&self, space_id: i64, old: &str, new: &str) -> Result<()> {
839        let conn = self
840            .db
841            .lock()
842            .map_err(|_| CoreError::poisoned("database"))?;
843        let space_name = lookup_space_name(&conn, space_id)?;
844        let result = conn.execute(
845            "UPDATE collections
846             SET name = ?1, updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
847             WHERE space_id = ?2 AND name = ?3",
848            params![new, space_id, old],
849        );
850
851        match result {
852            Ok(0) => Err(KboltError::CollectionNotFound {
853                name: old.to_string(),
854            }
855            .into()),
856            Ok(_) => Ok(()),
857            Err(Error::SqliteFailure(sqlite_err, _))
858                if sqlite_err.code == ErrorCode::ConstraintViolation =>
859            {
860                Err(KboltError::CollectionAlreadyExists {
861                    name: new.to_string(),
862                    space: space_name,
863                }
864                .into())
865            }
866            Err(err) => Err(err.into()),
867        }
868    }
869
870    pub fn update_collection_description(
871        &self,
872        space_id: i64,
873        name: &str,
874        desc: &str,
875    ) -> Result<()> {
876        let conn = self
877            .db
878            .lock()
879            .map_err(|_| CoreError::poisoned("database"))?;
880        let _space_name = lookup_space_name(&conn, space_id)?;
881
882        let updated = conn.execute(
883            "UPDATE collections
884             SET description = ?1, updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
885             WHERE space_id = ?2 AND name = ?3",
886            params![desc, space_id, name],
887        )?;
888
889        if updated == 0 {
890            return Err(KboltError::CollectionNotFound {
891                name: name.to_string(),
892            }
893            .into());
894        }
895
896        Ok(())
897    }
898
899    pub fn update_collection_timestamp(&self, collection_id: i64) -> Result<()> {
900        let conn = self
901            .db
902            .lock()
903            .map_err(|_| CoreError::poisoned("database"))?;
904
905        let updated = conn.execute(
906            "UPDATE collections
907             SET updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
908             WHERE id = ?1",
909            params![collection_id],
910        )?;
911
912        if updated == 0 {
913            return Err(KboltError::CollectionNotFound {
914                name: format!("id={collection_id}"),
915            }
916            .into());
917        }
918
919        Ok(())
920    }
921
922    pub fn upsert_document(
923        &self,
924        collection_id: i64,
925        path: &str,
926        title: &str,
927        title_source: DocumentTitleSource,
928        hash: &str,
929        modified: &str,
930    ) -> Result<i64> {
931        let conn = self
932            .db
933            .lock()
934            .map_err(|_| CoreError::poisoned("database"))?;
935        let _collection_name = lookup_collection_name(&conn, collection_id)?;
936
937        let id = conn.query_row(
938            "INSERT INTO documents (collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty)
939             VALUES (?1, ?2, ?3, ?4, ?5, ?6, 1, NULL, 1)
940             ON CONFLICT(collection_id, path) DO UPDATE SET
941                 title = excluded.title,
942                 title_source = excluded.title_source,
943                 hash = excluded.hash,
944                 modified = excluded.modified,
945                 active = 1,
946                 deactivated_at = NULL,
947                 fts_dirty = 1
948             RETURNING id",
949            params![
950                collection_id,
951                path,
952                title,
953                title_source.as_sql(),
954                hash,
955                modified
956            ],
957            |row| row.get(0),
958        )?;
959        Ok(id)
960    }
961
962    pub fn get_document_by_path(
963        &self,
964        collection_id: i64,
965        path: &str,
966    ) -> Result<Option<DocumentRow>> {
967        let conn = self
968            .db
969            .lock()
970            .map_err(|_| CoreError::poisoned("database"))?;
971        let _collection_name = lookup_collection_name(&conn, collection_id)?;
972        let mut stmt = conn.prepare(
973            "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
974             FROM documents
975             WHERE collection_id = ?1 AND path = ?2",
976        )?;
977
978        let result = stmt.query_row(params![collection_id, path], decode_document_row);
979        match result {
980            Ok(row) => Ok(Some(row)),
981            Err(Error::QueryReturnedNoRows) => Ok(None),
982            Err(err) => Err(err.into()),
983        }
984    }
985
986    pub fn get_document_by_id(&self, id: i64) -> Result<DocumentRow> {
987        let conn = self
988            .db
989            .lock()
990            .map_err(|_| CoreError::poisoned("database"))?;
991        let mut stmt = conn.prepare(
992            "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
993             FROM documents
994             WHERE id = ?1",
995        )?;
996
997        let result = stmt.query_row(params![id], decode_document_row);
998        match result {
999            Ok(row) => Ok(row),
1000            Err(Error::QueryReturnedNoRows) => Err(KboltError::DocumentNotFound {
1001                path: format!("id={id}"),
1002            }
1003            .into()),
1004            Err(err) => Err(err.into()),
1005        }
1006    }
1007
1008    pub fn get_documents_by_ids(&self, ids: &[i64]) -> Result<Vec<DocumentRow>> {
1009        if ids.is_empty() {
1010            return Ok(Vec::new());
1011        }
1012
1013        let conn = self
1014            .db
1015            .lock()
1016            .map_err(|_| CoreError::poisoned("database"))?;
1017
1018        let placeholders = vec!["?"; ids.len()].join(", ");
1019        let sql = format!(
1020            "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
1021             FROM documents
1022             WHERE id IN ({placeholders})"
1023        );
1024        let mut stmt = conn.prepare(&sql)?;
1025        let rows = stmt.query_map(params_from_iter(ids.iter()), decode_document_row)?;
1026        let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1027        Ok(docs)
1028    }
1029
1030    pub fn refresh_document_activity(&self, doc_id: i64, modified: &str) -> Result<()> {
1031        let conn = self
1032            .db
1033            .lock()
1034            .map_err(|_| CoreError::poisoned("database"))?;
1035
1036        let updated = conn.execute(
1037            "UPDATE documents
1038             SET modified = ?1,
1039                 active = 1,
1040                 deactivated_at = NULL
1041             WHERE id = ?2",
1042            params![modified, doc_id],
1043        )?;
1044
1045        if updated == 0 {
1046            return Err(KboltError::DocumentNotFound {
1047                path: format!("id={doc_id}"),
1048            }
1049            .into());
1050        }
1051
1052        Ok(())
1053    }
1054
1055    pub fn list_documents(
1056        &self,
1057        collection_id: i64,
1058        active_only: bool,
1059    ) -> Result<Vec<DocumentRow>> {
1060        let conn = self
1061            .db
1062            .lock()
1063            .map_err(|_| CoreError::poisoned("database"))?;
1064        let _collection_name = lookup_collection_name(&conn, collection_id)?;
1065        let active_only = i64::from(active_only);
1066        let mut stmt = conn.prepare(
1067            "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
1068             FROM documents
1069             WHERE collection_id = ?1
1070               AND (?2 = 0 OR active = 1)
1071             ORDER BY path ASC",
1072        )?;
1073        let rows = stmt.query_map(params![collection_id, active_only], decode_document_row)?;
1074        let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1075        Ok(docs)
1076    }
1077
1078    pub fn list_collection_file_rows(
1079        &self,
1080        collection_id: i64,
1081        active_only: bool,
1082    ) -> Result<Vec<FileListRow>> {
1083        let conn = self
1084            .db
1085            .lock()
1086            .map_err(|_| CoreError::poisoned("database"))?;
1087        let _collection_name = lookup_collection_name(&conn, collection_id)?;
1088        let active_only = i64::from(active_only);
1089        let mut stmt = conn.prepare(
1090            "SELECT d.id, d.path, d.title, d.hash, d.active,
1091                    COUNT(DISTINCT c.id) AS chunk_count,
1092                    COUNT(DISTINCT e.chunk_id) AS embedded_chunk_count
1093             FROM documents d
1094             LEFT JOIN chunks c ON c.doc_id = d.id
1095             LEFT JOIN embeddings e ON e.chunk_id = c.id
1096             WHERE d.collection_id = ?1
1097               AND (?2 = 0 OR d.active = 1)
1098             GROUP BY d.id, d.path, d.title, d.hash, d.active
1099             ORDER BY d.path ASC",
1100        )?;
1101        let rows = stmt.query_map(params![collection_id, active_only], |row| {
1102            let chunk_count: i64 = row.get(5)?;
1103            let embedded_chunk_count: i64 = row.get(6)?;
1104            Ok(FileListRow {
1105                doc_id: row.get(0)?,
1106                path: row.get(1)?,
1107                title: row.get(2)?,
1108                hash: row.get(3)?,
1109                active: row.get::<_, i64>(4)? != 0,
1110                chunk_count: chunk_count as usize,
1111                embedded_chunk_count: embedded_chunk_count as usize,
1112            })
1113        })?;
1114        let files = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1115        Ok(files)
1116    }
1117
1118    pub fn get_document_by_hash_prefix(&self, prefix: &str) -> Result<Vec<DocumentRow>> {
1119        let conn = self
1120            .db
1121            .lock()
1122            .map_err(|_| CoreError::poisoned("database"))?;
1123
1124        let pattern = format!("{prefix}%");
1125        let mut stmt = conn.prepare(
1126            "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
1127             FROM documents
1128             WHERE hash LIKE ?1
1129             ORDER BY id ASC",
1130        )?;
1131        let rows = stmt.query_map(params![pattern], decode_document_row)?;
1132        let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1133        Ok(docs)
1134    }
1135
1136    pub fn deactivate_document(&self, doc_id: i64) -> Result<()> {
1137        let conn = self
1138            .db
1139            .lock()
1140            .map_err(|_| CoreError::poisoned("database"))?;
1141
1142        let updated = conn.execute(
1143            "UPDATE documents
1144             SET active = 0,
1145                 deactivated_at = CASE
1146                    WHEN active = 1 THEN strftime('%Y-%m-%dT%H:%M:%SZ','now')
1147                    ELSE deactivated_at
1148                 END
1149             WHERE id = ?1",
1150            params![doc_id],
1151        )?;
1152
1153        if updated == 0 {
1154            return Err(KboltError::DocumentNotFound {
1155                path: format!("id={doc_id}"),
1156            }
1157            .into());
1158        }
1159
1160        Ok(())
1161    }
1162
1163    pub fn reactivate_document(&self, doc_id: i64) -> Result<()> {
1164        let conn = self
1165            .db
1166            .lock()
1167            .map_err(|_| CoreError::poisoned("database"))?;
1168
1169        let updated = conn.execute(
1170            "UPDATE documents
1171             SET active = 1, deactivated_at = NULL
1172             WHERE id = ?1",
1173            params![doc_id],
1174        )?;
1175
1176        if updated == 0 {
1177            return Err(KboltError::DocumentNotFound {
1178                path: format!("id={doc_id}"),
1179            }
1180            .into());
1181        }
1182
1183        Ok(())
1184    }
1185
1186    pub fn reap_documents(&self, older_than_days: u32) -> Result<Vec<i64>> {
1187        let reaped = self.list_reapable_documents(older_than_days)?;
1188        let doc_ids = reaped.iter().map(|item| item.doc_id).collect::<Vec<_>>();
1189        self.delete_documents(&doc_ids)?;
1190        Ok(doc_ids)
1191    }
1192
1193    pub fn list_reapable_documents(&self, older_than_days: u32) -> Result<Vec<ReapableDocument>> {
1194        self.list_reapable_documents_filtered(older_than_days, "", Vec::new())
1195    }
1196
1197    pub fn list_reapable_documents_in_space(
1198        &self,
1199        older_than_days: u32,
1200        space_id: i64,
1201    ) -> Result<Vec<ReapableDocument>> {
1202        self.list_reapable_documents_filtered(
1203            older_than_days,
1204            " AND c.space_id = ?",
1205            vec![SqlValue::Integer(space_id)],
1206        )
1207    }
1208
1209    pub fn list_reapable_documents_in_collections(
1210        &self,
1211        older_than_days: u32,
1212        collection_ids: &[i64],
1213    ) -> Result<Vec<ReapableDocument>> {
1214        if collection_ids.is_empty() {
1215            return Ok(Vec::new());
1216        }
1217
1218        let placeholders = vec!["?"; collection_ids.len()].join(", ");
1219        let clause = format!(" AND d.collection_id IN ({placeholders})");
1220        let params = collection_ids
1221            .iter()
1222            .map(|id| SqlValue::Integer(*id))
1223            .collect::<Vec<_>>();
1224        self.list_reapable_documents_filtered(older_than_days, &clause, params)
1225    }
1226
1227    fn list_reapable_documents_filtered(
1228        &self,
1229        older_than_days: u32,
1230        scope_clause: &str,
1231        scope_params: Vec<SqlValue>,
1232    ) -> Result<Vec<ReapableDocument>> {
1233        let conn = self
1234            .db
1235            .lock()
1236            .map_err(|_| CoreError::poisoned("database"))?;
1237
1238        let modifier = format!("-{} days", older_than_days);
1239        let sql = format!(
1240            "SELECT d.id, s.name
1241             FROM documents d
1242             JOIN collections c ON c.id = d.collection_id
1243             JOIN spaces s ON s.id = c.space_id
1244             WHERE d.active = 0
1245               AND d.deactivated_at IS NOT NULL
1246               AND d.deactivated_at <= strftime('%Y-%m-%dT%H:%M:%SZ', 'now', ?){scope_clause}
1247             ORDER BY d.id ASC"
1248        );
1249        let mut stmt = conn.prepare(&sql)?;
1250        let mut params = Vec::with_capacity(scope_params.len() + 1);
1251        params.push(SqlValue::Text(modifier));
1252        params.extend(scope_params);
1253        let rows = stmt.query_map(params_from_iter(params.iter()), |row| {
1254            Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1255        })?;
1256        let headers = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1257        drop(stmt);
1258
1259        let mut documents = Vec::with_capacity(headers.len());
1260        for (doc_id, space_name) in headers {
1261            documents.push(ReapableDocument {
1262                doc_id,
1263                space_name,
1264                chunk_ids: load_chunk_ids_for_doc(&conn, doc_id)?,
1265            });
1266        }
1267
1268        Ok(documents)
1269    }
1270
1271    pub fn delete_documents(&self, doc_ids: &[i64]) -> Result<()> {
1272        if doc_ids.is_empty() {
1273            return Ok(());
1274        }
1275
1276        let conn = self
1277            .db
1278            .lock()
1279            .map_err(|_| CoreError::poisoned("database"))?;
1280
1281        let placeholders = vec!["?"; doc_ids.len()].join(", ");
1282        let sql = format!("DELETE FROM documents WHERE id IN ({placeholders})");
1283        conn.execute(&sql, params_from_iter(doc_ids.iter()))?;
1284        Ok(())
1285    }
1286
1287    pub fn insert_chunks(&self, doc_id: i64, chunks: &[ChunkInsert]) -> Result<Vec<i64>> {
1288        if chunks.is_empty() {
1289            return Ok(Vec::new());
1290        }
1291
1292        let conn = self
1293            .db
1294            .lock()
1295            .map_err(|_| CoreError::poisoned("database"))?;
1296        let _doc = lookup_document_id(&conn, doc_id)?;
1297
1298        let tx = conn.unchecked_transaction()?;
1299        let mut stmt = tx.prepare(
1300            "INSERT INTO chunks (doc_id, seq, offset, length, heading, kind, retrieval_prefix)
1301             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1302        )?;
1303
1304        let mut ids = Vec::with_capacity(chunks.len());
1305        for chunk in chunks {
1306            stmt.execute(params![
1307                doc_id,
1308                chunk.seq,
1309                chunk.offset as i64,
1310                chunk.length as i64,
1311                chunk.heading,
1312                chunk.kind.as_storage_kind(),
1313                chunk.retrieval_prefix.as_deref(),
1314            ])?;
1315            ids.push(tx.last_insert_rowid());
1316        }
1317        drop(stmt);
1318        tx.commit()?;
1319        Ok(ids)
1320    }
1321
1322    pub fn delete_chunks_for_document(&self, doc_id: i64) -> Result<Vec<i64>> {
1323        let conn = self
1324            .db
1325            .lock()
1326            .map_err(|_| CoreError::poisoned("database"))?;
1327        let _doc = lookup_document_id(&conn, doc_id)?;
1328
1329        let mut stmt = conn.prepare("SELECT id FROM chunks WHERE doc_id = ?1 ORDER BY seq ASC")?;
1330        let rows = stmt.query_map(params![doc_id], |row| row.get::<_, i64>(0))?;
1331        let chunk_ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1332
1333        conn.execute("DELETE FROM chunks WHERE doc_id = ?1", params![doc_id])?;
1334        Ok(chunk_ids)
1335    }
1336
1337    pub fn get_chunks_for_document(&self, doc_id: i64) -> Result<Vec<ChunkRow>> {
1338        let conn = self
1339            .db
1340            .lock()
1341            .map_err(|_| CoreError::poisoned("database"))?;
1342        let _doc = lookup_document_id(&conn, doc_id)?;
1343
1344        let mut stmt = conn.prepare(
1345            "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1346             FROM chunks
1347             WHERE doc_id = ?1
1348             ORDER BY seq ASC",
1349        )?;
1350        let rows = stmt.query_map(params![doc_id], decode_chunk_row)?;
1351        let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1352        Ok(chunks)
1353    }
1354
1355    pub fn get_chunks_for_documents(&self, doc_ids: &[i64]) -> Result<HashMap<i64, Vec<ChunkRow>>> {
1356        if doc_ids.is_empty() {
1357            return Ok(HashMap::new());
1358        }
1359
1360        let mut requested = doc_ids.to_vec();
1361        requested.sort_unstable();
1362        requested.dedup();
1363
1364        let conn = self
1365            .db
1366            .lock()
1367            .map_err(|_| CoreError::poisoned("database"))?;
1368
1369        let placeholders = vec!["?"; requested.len()].join(", ");
1370        let sql = format!(
1371            "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1372             FROM chunks
1373             WHERE doc_id IN ({placeholders})
1374             ORDER BY doc_id ASC, seq ASC"
1375        );
1376        let mut stmt = conn.prepare(&sql)?;
1377        let rows = stmt.query_map(params_from_iter(requested.iter()), decode_chunk_row)?;
1378        let mut chunks_by_doc: HashMap<i64, Vec<ChunkRow>> = HashMap::new();
1379        for doc_id in requested {
1380            chunks_by_doc.insert(doc_id, Vec::new());
1381        }
1382        for row in rows {
1383            let chunk = row?;
1384            chunks_by_doc.entry(chunk.doc_id).or_default().push(chunk);
1385        }
1386
1387        Ok(chunks_by_doc)
1388    }
1389
1390    pub fn get_chunks_for_document_seq_ranges(
1391        &self,
1392        ranges: &[(i64, i32, i32)],
1393    ) -> Result<HashMap<i64, Vec<ChunkRow>>> {
1394        if ranges.is_empty() {
1395            return Ok(HashMap::new());
1396        }
1397
1398        let mut ranges_by_doc: HashMap<i64, Vec<(i32, i32)>> = HashMap::new();
1399        for (doc_id, min_seq, max_seq) in ranges {
1400            if min_seq > max_seq {
1401                return Err(CoreError::Internal(format!(
1402                    "chunk seq range min must be <= max for doc {doc_id}: {min_seq} > {max_seq}"
1403                )));
1404            }
1405
1406            ranges_by_doc
1407                .entry(*doc_id)
1408                .or_default()
1409                .push((*min_seq, *max_seq));
1410        }
1411
1412        for doc_ranges in ranges_by_doc.values_mut() {
1413            doc_ranges.sort_unstable();
1414            let mut merged: Vec<(i32, i32)> = Vec::with_capacity(doc_ranges.len());
1415            for (min_seq, max_seq) in doc_ranges.drain(..) {
1416                if let Some((_, merged_max)) = merged.last_mut() {
1417                    if min_seq <= merged_max.saturating_add(1) {
1418                        *merged_max = (*merged_max).max(max_seq);
1419                        continue;
1420                    }
1421                }
1422                merged.push((min_seq, max_seq));
1423            }
1424            *doc_ranges = merged;
1425        }
1426
1427        let conn = self
1428            .db
1429            .lock()
1430            .map_err(|_| CoreError::poisoned("database"))?;
1431        let mut stmt = conn.prepare(
1432            "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1433             FROM chunks
1434             WHERE doc_id = ?1 AND seq BETWEEN ?2 AND ?3
1435             ORDER BY seq ASC",
1436        )?;
1437
1438        let mut chunks_by_doc: HashMap<i64, Vec<ChunkRow>> = HashMap::new();
1439        for (doc_id, doc_ranges) in ranges_by_doc {
1440            chunks_by_doc.entry(doc_id).or_default();
1441            for (min_seq, max_seq) in doc_ranges {
1442                let rows = stmt.query_map(params![doc_id, min_seq, max_seq], decode_chunk_row)?;
1443                for row in rows {
1444                    chunks_by_doc.entry(doc_id).or_default().push(row?);
1445                }
1446            }
1447        }
1448
1449        Ok(chunks_by_doc)
1450    }
1451
1452    pub fn get_chunks(&self, chunk_ids: &[i64]) -> Result<Vec<ChunkRow>> {
1453        if chunk_ids.is_empty() {
1454            return Ok(Vec::new());
1455        }
1456
1457        let conn = self
1458            .db
1459            .lock()
1460            .map_err(|_| CoreError::poisoned("database"))?;
1461
1462        let placeholders = vec!["?"; chunk_ids.len()].join(", ");
1463        let sql = format!(
1464            "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1465             FROM chunks
1466             WHERE id IN ({placeholders})
1467             ORDER BY id ASC"
1468        );
1469        let mut stmt = conn.prepare(&sql)?;
1470        let rows = stmt.query_map(params_from_iter(chunk_ids.iter()), decode_chunk_row)?;
1471        let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1472        Ok(chunks)
1473    }
1474
1475    pub fn get_chunks_with_documents(
1476        &self,
1477        chunk_ids: &[i64],
1478    ) -> Result<Vec<(ChunkRow, DocumentRow)>> {
1479        if chunk_ids.is_empty() {
1480            return Ok(Vec::new());
1481        }
1482
1483        let conn = self
1484            .db
1485            .lock()
1486            .map_err(|_| CoreError::poisoned("database"))?;
1487
1488        let placeholders = vec!["?"; chunk_ids.len()].join(", ");
1489        let sql = format!(
1490            "SELECT c.id, c.doc_id, c.seq, c.offset, c.length, c.heading, c.kind, c.retrieval_prefix,
1491                    d.id, d.collection_id, d.path, d.title, d.title_source, d.hash, d.modified, d.active, d.deactivated_at, d.fts_dirty
1492             FROM chunks c
1493             JOIN documents d ON d.id = c.doc_id
1494             WHERE c.id IN ({placeholders})
1495             ORDER BY c.id ASC"
1496        );
1497        let mut stmt = conn.prepare(&sql)?;
1498        let rows = stmt.query_map(params_from_iter(chunk_ids.iter()), |row| {
1499            Ok((
1500                decode_chunk_row_at(row, 0)?,
1501                decode_document_row_at(row, 8)?,
1502            ))
1503        })?;
1504        let rows = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1505        Ok(rows)
1506    }
1507
1508    pub fn get_active_search_scope_summary_in_collections(
1509        &self,
1510        collection_ids: &[i64],
1511    ) -> Result<SearchScopeSummary> {
1512        if collection_ids.is_empty() {
1513            return Ok(SearchScopeSummary {
1514                document_ids: Vec::new(),
1515                chunk_count: 0,
1516            });
1517        }
1518
1519        let conn = self
1520            .db
1521            .lock()
1522            .map_err(|_| CoreError::poisoned("database"))?;
1523
1524        let placeholders = vec!["?"; collection_ids.len()].join(", ");
1525        let sql = format!(
1526            "SELECT d.id, COUNT(c.id)
1527             FROM chunks c
1528             JOIN documents d ON d.id = c.doc_id
1529             WHERE d.active = 1
1530               AND d.collection_id IN ({placeholders})
1531             GROUP BY d.id
1532             ORDER BY d.id ASC"
1533        );
1534        let mut stmt = conn.prepare(&sql)?;
1535        let rows = stmt.query_map(params_from_iter(collection_ids.iter()), |row| {
1536            Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
1537        })?;
1538
1539        let mut document_ids = Vec::new();
1540        let mut chunk_count = 0usize;
1541        for row in rows {
1542            let (doc_id, doc_chunk_count) = row?;
1543            let doc_chunk_count = usize::try_from(doc_chunk_count).map_err(|_| {
1544                CoreError::Internal(format!(
1545                    "active chunk count must be non-negative for document {doc_id}"
1546                ))
1547            })?;
1548            document_ids.push(doc_id);
1549            chunk_count = chunk_count.saturating_add(doc_chunk_count);
1550        }
1551
1552        Ok(SearchScopeSummary {
1553            document_ids,
1554            chunk_count,
1555        })
1556    }
1557
1558    pub fn count_active_chunks_in_collections(&self, collection_ids: &[i64]) -> Result<usize> {
1559        if collection_ids.is_empty() {
1560            return Ok(0);
1561        }
1562
1563        let conn = self
1564            .db
1565            .lock()
1566            .map_err(|_| CoreError::poisoned("database"))?;
1567
1568        let placeholders = vec!["?"; collection_ids.len()].join(", ");
1569        let sql = format!(
1570            "SELECT COUNT(*)
1571             FROM chunks c
1572             JOIN documents d ON d.id = c.doc_id
1573             WHERE d.active = 1
1574               AND d.collection_id IN ({placeholders})"
1575        );
1576        query_count(&conn, &sql, params_from_iter(collection_ids.iter()))
1577    }
1578
1579    pub fn has_inactive_documents_in_collections(&self, collection_ids: &[i64]) -> Result<bool> {
1580        if collection_ids.is_empty() {
1581            return Ok(false);
1582        }
1583
1584        let conn = self
1585            .db
1586            .lock()
1587            .map_err(|_| CoreError::poisoned("database"))?;
1588
1589        let placeholders = vec!["?"; collection_ids.len()].join(", ");
1590        let sql = format!(
1591            "SELECT EXISTS(
1592                 SELECT 1
1593                 FROM documents
1594                 WHERE active = 0
1595                   AND collection_id IN ({placeholders})
1596             )"
1597        );
1598        let exists: i64 = conn.query_row(&sql, params_from_iter(collection_ids.iter()), |row| {
1599            row.get(0)
1600        })?;
1601        Ok(exists != 0)
1602    }
1603
1604    pub fn get_active_chunk_ids_in_collections(&self, collection_ids: &[i64]) -> Result<Vec<i64>> {
1605        if collection_ids.is_empty() {
1606            return Ok(Vec::new());
1607        }
1608
1609        let conn = self
1610            .db
1611            .lock()
1612            .map_err(|_| CoreError::poisoned("database"))?;
1613
1614        let placeholders = vec!["?"; collection_ids.len()].join(", ");
1615        let sql = format!(
1616            "SELECT c.id
1617             FROM chunks c
1618             JOIN documents d ON d.id = c.doc_id
1619             WHERE d.active = 1
1620               AND d.collection_id IN ({placeholders})
1621             ORDER BY d.id ASC, c.seq ASC"
1622        );
1623        let mut stmt = conn.prepare(&sql)?;
1624        let rows = stmt.query_map(params_from_iter(collection_ids.iter()), |row| row.get(0))?;
1625        let chunk_ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1626        Ok(chunk_ids)
1627    }
1628
    /// Atomically replaces a document's stored text and chunk set with a new generation.
    ///
    /// Within one SQLite transaction this:
    /// 1. upserts the `documents` row for `(collection_id, path)`, setting title, title
    ///    source, hash and modified, re-activating it and setting `fts_dirty = 1`;
    /// 2. records the ids of the document's current chunks (ordered by `seq`);
    /// 3. upserts the `document_texts` row, with `created` reset to the current UTC time;
    /// 4. deletes all existing chunks of the document; and
    /// 5. inserts `replacement.chunks`, collecting their new row ids in input order.
    ///
    /// Returns the document id, the previous chunk ids and the new chunk ids. The old ids
    /// are presumably used by callers to purge stale entries from other indexes (TODO confirm).
    ///
    /// # Errors
    /// - Any error from `validate_text_span` for a chunk span that does not fit
    ///   `replacement.text`. This is checked before the database lock is taken.
    /// - A poisoned-lock error, or any error from `lookup_collection_name` (used only as a
    ///   collection existence check).
    /// - SQL failures. On any early `?` return the uncommitted transaction is dropped,
    ///   which rolls it back (rusqlite's default drop behaviour).
    pub fn replace_document_generation(
        &self,
        replacement: DocumentGenerationReplace<'_>,
    ) -> Result<DocumentGenerationReplaceResult> {
        // Validate every chunk span up front so no database work happens for bad input.
        for chunk in replacement.chunks {
            validate_text_span(
                replacement.text,
                chunk.offset,
                chunk.length,
                &format!("chunk seq {}", chunk.seq),
            )?;
        }

        let conn = self
            .db
            .lock()
            .map_err(|_| CoreError::poisoned("database"))?;
        // Existence check only; the resolved name is unused.
        let _collection_name = lookup_collection_name(&conn, replacement.collection_id)?;

        // `unchecked_transaction` only needs `&Connection`, which is what the guard derefs to.
        let tx = conn.unchecked_transaction()?;
        // Step 1: upsert the document row and obtain its id (new or existing).
        let doc_id: i64 = tx.query_row(
            "INSERT INTO documents (collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, 1, NULL, 1)
             ON CONFLICT(collection_id, path) DO UPDATE SET
                 title = excluded.title,
                 title_source = excluded.title_source,
                 hash = excluded.hash,
                 modified = excluded.modified,
                 active = 1,
                 deactivated_at = NULL,
                 fts_dirty = 1
             RETURNING id",
            params![
                replacement.collection_id,
                replacement.path,
                replacement.title,
                replacement.title_source.as_sql(),
                replacement.hash,
                replacement.modified
            ],
            |row| row.get(0),
        )?;

        // Step 2: remember the previous generation's chunk ids before they are deleted.
        let old_chunk_ids = {
            let mut stmt =
                tx.prepare("SELECT id FROM chunks WHERE doc_id = ?1 ORDER BY seq ASC")?;
            let rows = stmt.query_map(params![doc_id], |row| row.get::<_, i64>(0))?;
            rows.collect::<std::result::Result<Vec<_>, _>>()?
        };

        // Step 3: store the extracted text for this generation (one row per document).
        tx.execute(
            "INSERT INTO document_texts (doc_id, extractor_key, source_hash, text_hash, generation_key, text, created)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%Y-%m-%dT%H:%M:%SZ','now'))
             ON CONFLICT(doc_id) DO UPDATE SET
                 extractor_key = excluded.extractor_key,
                 source_hash = excluded.source_hash,
                 text_hash = excluded.text_hash,
                 generation_key = excluded.generation_key,
                 text = excluded.text,
                 created = excluded.created",
            params![
                doc_id,
                replacement.extractor_key,
                replacement.source_hash,
                replacement.text_hash,
                replacement.generation_key,
                replacement.text
            ],
        )?;

        // Step 4: drop the old chunk set.
        tx.execute("DELETE FROM chunks WHERE doc_id = ?1", params![doc_id])?;

        // Step 5: insert the new chunk set; ids are collected in `replacement.chunks` order.
        let mut stmt = tx.prepare(
            "INSERT INTO chunks (doc_id, seq, offset, length, heading, kind, retrieval_prefix)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
        )?;
        let mut chunk_ids = Vec::with_capacity(replacement.chunks.len());
        for chunk in replacement.chunks {
            stmt.execute(params![
                doc_id,
                chunk.seq,
                chunk.offset as i64,
                chunk.length as i64,
                chunk.heading,
                chunk.kind.as_storage_kind(),
                chunk.retrieval_prefix.as_deref(),
            ])?;
            chunk_ids.push(tx.last_insert_rowid());
        }
        // The statement borrows `tx`; release it before `commit` consumes the transaction.
        drop(stmt);

        tx.commit()?;
        Ok(DocumentGenerationReplaceResult {
            doc_id,
            old_chunk_ids,
            chunk_ids,
        })
    }
1727
1728    pub fn put_document_text(
1729        &self,
1730        doc_id: i64,
1731        extractor_key: &str,
1732        source_hash: &str,
1733        text_hash: &str,
1734        generation_key: &str,
1735        text: &str,
1736    ) -> Result<()> {
1737        let conn = self
1738            .db
1739            .lock()
1740            .map_err(|_| CoreError::poisoned("database"))?;
1741        let _doc = lookup_document_id(&conn, doc_id)?;
1742
1743        conn.execute(
1744            "INSERT INTO document_texts (doc_id, extractor_key, source_hash, text_hash, generation_key, text, created)
1745             VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%Y-%m-%dT%H:%M:%SZ','now'))
1746             ON CONFLICT(doc_id) DO UPDATE SET
1747                 extractor_key = excluded.extractor_key,
1748                 source_hash = excluded.source_hash,
1749                 text_hash = excluded.text_hash,
1750                 generation_key = excluded.generation_key,
1751                 text = excluded.text,
1752                 created = excluded.created",
1753            params![
1754                doc_id,
1755                extractor_key,
1756                source_hash,
1757                text_hash,
1758                generation_key,
1759                text
1760            ],
1761        )?;
1762        Ok(())
1763    }
1764
1765    pub fn get_document_text(&self, doc_id: i64) -> Result<DocumentTextRow> {
1766        let conn = self
1767            .db
1768            .lock()
1769            .map_err(|_| CoreError::poisoned("database"))?;
1770        let _doc = lookup_document_id(&conn, doc_id)?;
1771
1772        let result = conn.query_row(
1773            "SELECT doc_id, extractor_key, source_hash, text_hash, generation_key, text, created
1774             FROM document_texts
1775             WHERE doc_id = ?1",
1776            params![doc_id],
1777            decode_document_text_row,
1778        );
1779        match result {
1780            Ok(row) => Ok(row),
1781            Err(Error::QueryReturnedNoRows) => Err(missing_document_text_error(doc_id)),
1782            Err(err) => Err(err.into()),
1783        }
1784    }
1785
1786    pub fn get_document_texts(&self, doc_ids: &[i64]) -> Result<HashMap<i64, DocumentTextRow>> {
1787        if doc_ids.is_empty() {
1788            return Ok(HashMap::new());
1789        }
1790
1791        let mut requested = doc_ids.to_vec();
1792        requested.sort_unstable();
1793        requested.dedup();
1794
1795        let text_by_doc = self.get_existing_document_texts(&requested)?;
1796        for doc_id in requested {
1797            if !text_by_doc.contains_key(&doc_id) {
1798                return Err(missing_document_text_error(doc_id));
1799            }
1800        }
1801
1802        Ok(text_by_doc)
1803    }
1804
1805    pub fn get_existing_document_texts(
1806        &self,
1807        doc_ids: &[i64],
1808    ) -> Result<HashMap<i64, DocumentTextRow>> {
1809        if doc_ids.is_empty() {
1810            return Ok(HashMap::new());
1811        }
1812
1813        let mut requested = doc_ids.to_vec();
1814        requested.sort_unstable();
1815        requested.dedup();
1816
1817        let conn = self
1818            .db
1819            .lock()
1820            .map_err(|_| CoreError::poisoned("database"))?;
1821        let mut text_by_doc = HashMap::new();
1822        for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1823            let placeholders = vec!["?"; batch.len()].join(", ");
1824            let sql = format!(
1825                "SELECT doc_id, extractor_key, source_hash, text_hash, generation_key, text, created
1826                 FROM document_texts
1827                 WHERE doc_id IN ({placeholders})
1828                 ORDER BY doc_id ASC"
1829            );
1830            let mut stmt = conn.prepare(&sql)?;
1831            let rows = stmt.query_map(params_from_iter(batch.iter()), decode_document_text_row)?;
1832            for row in rows {
1833                let row = row?;
1834                text_by_doc.insert(row.doc_id, row);
1835            }
1836        }
1837
1838        Ok(text_by_doc)
1839    }
1840
1841    pub fn has_document_text(&self, doc_id: i64) -> Result<bool> {
1842        let conn = self
1843            .db
1844            .lock()
1845            .map_err(|_| CoreError::poisoned("database"))?;
1846        let exists: i64 = conn.query_row(
1847            "SELECT EXISTS(SELECT 1 FROM document_texts WHERE doc_id = ?1)",
1848            params![doc_id],
1849            |row| row.get(0),
1850        )?;
1851        Ok(exists != 0)
1852    }
1853
1854    pub fn has_current_document_text(&self, doc_id: i64, generation_key: &str) -> Result<bool> {
1855        let conn = self
1856            .db
1857            .lock()
1858            .map_err(|_| CoreError::poisoned("database"))?;
1859        let exists: i64 = conn.query_row(
1860            "SELECT EXISTS(
1861                SELECT 1 FROM document_texts
1862                WHERE doc_id = ?1 AND generation_key = ?2
1863            )",
1864            params![doc_id, generation_key],
1865            |row| row.get(0),
1866        )?;
1867        Ok(exists != 0)
1868    }
1869
1870    pub fn get_document_text_generation_keys(
1871        &self,
1872        doc_ids: &[i64],
1873    ) -> Result<HashMap<i64, String>> {
1874        if doc_ids.is_empty() {
1875            return Ok(HashMap::new());
1876        }
1877
1878        let mut requested = doc_ids.to_vec();
1879        requested.sort_unstable();
1880        requested.dedup();
1881
1882        let conn = self
1883            .db
1884            .lock()
1885            .map_err(|_| CoreError::poisoned("database"))?;
1886        let mut generation_by_doc = HashMap::with_capacity(requested.len());
1887        for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1888            let placeholders = vec!["?"; batch.len()].join(", ");
1889            let sql = format!(
1890                "SELECT doc_id, generation_key
1891                 FROM document_texts
1892                 WHERE doc_id IN ({placeholders})"
1893            );
1894            let mut stmt = conn.prepare(&sql)?;
1895            let rows = stmt.query_map(params_from_iter(batch.iter()), |row| {
1896                Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1897            })?;
1898            for row in rows {
1899                let (doc_id, generation_key) = row?;
1900                generation_by_doc.insert(doc_id, generation_key);
1901            }
1902        }
1903
1904        Ok(generation_by_doc)
1905    }
1906
1907    pub fn get_document_text_extractors(&self, doc_ids: &[i64]) -> Result<HashMap<i64, String>> {
1908        if doc_ids.is_empty() {
1909            return Ok(HashMap::new());
1910        }
1911
1912        let mut requested = doc_ids.to_vec();
1913        requested.sort_unstable();
1914        requested.dedup();
1915
1916        let conn = self
1917            .db
1918            .lock()
1919            .map_err(|_| CoreError::poisoned("database"))?;
1920        let mut extractor_by_doc = HashMap::with_capacity(requested.len());
1921        for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1922            let placeholders = vec!["?"; batch.len()].join(", ");
1923            let sql = format!(
1924                "SELECT doc_id, extractor_key
1925                 FROM document_texts
1926                 WHERE doc_id IN ({placeholders})"
1927            );
1928            let mut stmt = conn.prepare(&sql)?;
1929            let rows = stmt.query_map(params_from_iter(batch.iter()), |row| {
1930                Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1931            })?;
1932            for row in rows {
1933                let (doc_id, extractor_key) = row?;
1934                extractor_by_doc.insert(doc_id, extractor_key);
1935            }
1936        }
1937
1938        for doc_id in requested {
1939            if !extractor_by_doc.contains_key(&doc_id) {
1940                return Err(missing_document_text_error(doc_id));
1941            }
1942        }
1943
1944        Ok(extractor_by_doc)
1945    }
1946
1947    pub fn get_canonical_chunk_texts(&self, chunk_ids: &[i64]) -> Result<HashMap<i64, String>> {
1948        if chunk_ids.is_empty() {
1949            return Ok(HashMap::new());
1950        }
1951
1952        let mut requested = chunk_ids.to_vec();
1953        requested.sort_unstable();
1954        requested.dedup();
1955
1956        let conn = self
1957            .db
1958            .lock()
1959            .map_err(|_| CoreError::poisoned("database"))?;
1960        let mut text_by_chunk = HashMap::with_capacity(requested.len());
1961        for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1962            let placeholders = vec!["?"; batch.len()].join(", ");
1963            let sql = format!(
1964                "SELECT c.id,
1965                        c.offset,
1966                        c.length,
1967                        length(CAST(dt.text AS BLOB)),
1968                        substr(CAST(dt.text AS BLOB), c.offset + 1, 1),
1969                        substr(CAST(dt.text AS BLOB), c.offset + c.length + 1, 1),
1970                        substr(CAST(dt.text AS BLOB), c.offset + 1, c.length)
1971                 FROM chunks c
1972                 JOIN document_texts dt ON dt.doc_id = c.doc_id
1973                 WHERE c.id IN ({placeholders})"
1974            );
1975            let mut stmt = conn.prepare(&sql)?;
1976            let rows = stmt.query_map(params_from_iter(batch.iter()), |row| {
1977                Ok((
1978                    row.get::<_, i64>(0)?,
1979                    row.get::<_, i64>(1)?,
1980                    row.get::<_, i64>(2)?,
1981                    row.get::<_, i64>(3)?,
1982                    row.get::<_, Vec<u8>>(4)?,
1983                    row.get::<_, Vec<u8>>(5)?,
1984                    row.get::<_, Vec<u8>>(6)?,
1985                ))
1986            })?;
1987            for row in rows {
1988                let (chunk_id, offset, length, document_len, start_byte, end_byte, bytes) = row?;
1989                let text = canonical_chunk_slice_from_bytes(
1990                    chunk_id,
1991                    offset,
1992                    length,
1993                    document_len,
1994                    start_byte,
1995                    end_byte,
1996                    bytes,
1997                )?;
1998                text_by_chunk.insert(chunk_id, text);
1999            }
2000        }
2001
2002        for chunk_id in requested {
2003            if !text_by_chunk.contains_key(&chunk_id) {
2004                return Err(CoreError::Internal(format!(
2005                    "canonical text missing for chunk {chunk_id}"
2006                )));
2007            }
2008        }
2009
2010        Ok(text_by_chunk)
2011    }
2012
2013    pub fn get_chunk_text(&self, chunk_id: i64) -> Result<ChunkTextRow> {
2014        let conn = self
2015            .db
2016            .lock()
2017            .map_err(|_| CoreError::poisoned("database"))?;
2018        let result = conn.query_row(
2019            "SELECT c.id, c.doc_id, c.seq, c.offset, c.length, c.heading, c.kind, c.retrieval_prefix, dt.extractor_key, dt.text
2020             FROM chunks c
2021             JOIN document_texts dt ON dt.doc_id = c.doc_id
2022             WHERE c.id = ?1",
2023            params![chunk_id],
2024            |row| {
2025                let chunk = decode_chunk_row(row)?;
2026                let extractor_key: String = row.get(8)?;
2027                let document_text: String = row.get(9)?;
2028                Ok((chunk, extractor_key, document_text))
2029            },
2030        );
2031        let (chunk, extractor_key, document_text) = match result {
2032            Ok(row) => row,
2033            Err(Error::QueryReturnedNoRows) => {
2034                let doc_id = lookup_chunk_doc_id(&conn, chunk_id)?;
2035                return Err(missing_document_text_error(doc_id));
2036            }
2037            Err(err) => return Err(err.into()),
2038        };
2039        let text = chunk_text_from_canonical(&document_text, &chunk)?;
2040        Ok(ChunkTextRow {
2041            chunk,
2042            extractor_key,
2043            text,
2044        })
2045    }
2046
2047    pub fn insert_embeddings(&self, entries: &[(i64, &str)]) -> Result<()> {
2048        if entries.is_empty() {
2049            return Ok(());
2050        }
2051
2052        let conn = self
2053            .db
2054            .lock()
2055            .map_err(|_| CoreError::poisoned("database"))?;
2056
2057        let tx = conn.unchecked_transaction()?;
2058        let mut stmt = tx.prepare(
2059            "INSERT INTO embeddings (chunk_id, model, embedded_at)
2060             VALUES (?1, ?2, strftime('%Y-%m-%dT%H:%M:%SZ','now'))
2061             ON CONFLICT(chunk_id, model) DO UPDATE SET
2062               embedded_at = excluded.embedded_at",
2063        )?;
2064
2065        for (chunk_id, model) in entries {
2066            stmt.execute(params![chunk_id, model])?;
2067        }
2068
2069        drop(stmt);
2070        tx.commit()?;
2071        Ok(())
2072    }
2073
2074    pub fn get_unembedded_chunks(
2075        &self,
2076        model: &str,
2077        after_chunk_id: i64,
2078        limit: usize,
2079    ) -> Result<Vec<EmbedRecord>> {
2080        self.get_unembedded_chunks_filtered(model, after_chunk_id, limit, "", Vec::new())
2081    }
2082
2083    pub fn get_unembedded_chunks_in_space(
2084        &self,
2085        model: &str,
2086        space_id: i64,
2087        after_chunk_id: i64,
2088        limit: usize,
2089    ) -> Result<Vec<EmbedRecord>> {
2090        self.get_unembedded_chunks_filtered(
2091            model,
2092            after_chunk_id,
2093            limit,
2094            " AND col.space_id = ?",
2095            vec![SqlValue::Integer(space_id)],
2096        )
2097    }
2098
2099    pub fn get_unembedded_chunks_in_collections(
2100        &self,
2101        model: &str,
2102        collection_ids: &[i64],
2103        after_chunk_id: i64,
2104        limit: usize,
2105    ) -> Result<Vec<EmbedRecord>> {
2106        if collection_ids.is_empty() {
2107            return Ok(Vec::new());
2108        }
2109
2110        let placeholders = vec!["?"; collection_ids.len()].join(", ");
2111        let clause = format!(" AND d.collection_id IN ({placeholders})");
2112        let params = collection_ids
2113            .iter()
2114            .map(|id| SqlValue::Integer(*id))
2115            .collect::<Vec<_>>();
2116        self.get_unembedded_chunks_filtered(model, after_chunk_id, limit, &clause, params)
2117    }
2118
2119    fn get_unembedded_chunks_filtered(
2120        &self,
2121        model: &str,
2122        after_chunk_id: i64,
2123        limit: usize,
2124        scope_clause: &str,
2125        scope_params: Vec<SqlValue>,
2126    ) -> Result<Vec<EmbedRecord>> {
2127        let conn = self
2128            .db
2129            .lock()
2130            .map_err(|_| CoreError::poisoned("database"))?;
2131        let sql_limit = i64::try_from(limit)
2132            .map_err(|_| CoreError::Internal("limit too large for sqlite".to_string()))?;
2133
2134        let sql = format!(
2135            "SELECT c.id, c.doc_id, c.seq, c.offset, c.length, c.heading, c.kind, c.retrieval_prefix,
2136                    d.path, col.path, s.name
2137             FROM chunks c
2138             JOIN documents d ON d.id = c.doc_id
2139             JOIN collections col ON col.id = d.collection_id
2140             JOIN spaces s ON s.id = col.space_id
2141             LEFT JOIN embeddings e ON e.chunk_id = c.id AND e.model = ?
2142             WHERE d.active = 1 AND e.chunk_id IS NULL AND c.id > ?{scope_clause}
2143             ORDER BY c.id ASC
2144             LIMIT ?"
2145        );
2146        let mut stmt = conn.prepare(&sql)?;
2147        let mut params = Vec::with_capacity(scope_params.len() + 3);
2148        params.push(SqlValue::Text(model.to_string()));
2149        params.push(SqlValue::Integer(after_chunk_id));
2150        params.extend(scope_params);
2151        params.push(SqlValue::Integer(sql_limit));
2152        let rows = stmt.query_map(params_from_iter(params.iter()), |row| {
2153            Ok(EmbedRecord {
2154                chunk: decode_chunk_row(row)?,
2155                doc_path: row.get(8)?,
2156                collection_path: PathBuf::from(row.get::<_, String>(9)?),
2157                space_name: row.get(10)?,
2158            })
2159        })?;
2160
2161        let records = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2162        Ok(records)
2163    }
2164
2165    pub fn delete_embeddings_for_model(&self, model: &str) -> Result<usize> {
2166        let conn = self
2167            .db
2168            .lock()
2169            .map_err(|_| CoreError::poisoned("database"))?;
2170
2171        let deleted = conn.execute("DELETE FROM embeddings WHERE model = ?1", params![model])?;
2172        Ok(deleted)
2173    }
2174
2175    pub fn delete_embeddings_for_space(&self, space_id: i64) -> Result<usize> {
2176        let conn = self
2177            .db
2178            .lock()
2179            .map_err(|_| CoreError::poisoned("database"))?;
2180        let _space_name = lookup_space_name(&conn, space_id)?;
2181
2182        let deleted = conn.execute(
2183            "DELETE FROM embeddings
2184             WHERE chunk_id IN (
2185                 SELECT c.id
2186                 FROM chunks c
2187                 JOIN documents d ON d.id = c.doc_id
2188                 JOIN collections col ON col.id = d.collection_id
2189                 WHERE col.space_id = ?1
2190             )",
2191            params![space_id],
2192        )?;
2193        Ok(deleted)
2194    }
2195
2196    pub fn list_embedding_models_in_space(&self, space_id: i64) -> Result<Vec<String>> {
2197        let conn = self
2198            .db
2199            .lock()
2200            .map_err(|_| CoreError::poisoned("database"))?;
2201        let _space_name = lookup_space_name(&conn, space_id)?;
2202
2203        let mut stmt = conn.prepare(
2204            "SELECT DISTINCT e.model
2205             FROM embeddings e
2206             JOIN chunks c ON c.id = e.chunk_id
2207             JOIN documents d ON d.id = c.doc_id
2208             JOIN collections col ON col.id = d.collection_id
2209             WHERE col.space_id = ?1
2210             ORDER BY e.model ASC",
2211        )?;
2212        let rows = stmt.query_map(params![space_id], |row| row.get::<_, String>(0))?;
2213        let models = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2214        Ok(models)
2215    }
2216
2217    pub fn count_embeddings(&self) -> Result<usize> {
2218        let conn = self
2219            .db
2220            .lock()
2221            .map_err(|_| CoreError::poisoned("database"))?;
2222
2223        let count: i64 = conn.query_row("SELECT COUNT(*) FROM embeddings", [], |row| row.get(0))?;
2224        Ok(count as usize)
2225    }
2226
2227    pub fn index_tantivy(&self, space: &str, entries: &[TantivyEntry]) -> Result<()> {
2228        if entries.is_empty() {
2229            return Ok(());
2230        }
2231
2232        let space_indexes = self.get_space_indexes(space)?;
2233        with_tantivy_writer(&space_indexes, |writer| {
2234            for entry in entries {
2235                let chunk_id = u64::try_from(entry.chunk_id).map_err(|_| {
2236                    CoreError::Internal(format!(
2237                        "chunk_id must be non-negative for tantivy indexing: {}",
2238                        entry.chunk_id
2239                    ))
2240                })?;
2241                let doc_id = u64::try_from(entry.doc_id).map_err(|_| {
2242                    CoreError::Internal(format!(
2243                        "doc_id must be non-negative for tantivy indexing: {}",
2244                        entry.doc_id
2245                    ))
2246                })?;
2247
2248                let mut doc = TantivyDocument::default();
2249                doc.add_u64(space_indexes.fields.chunk_id, chunk_id);
2250                doc.add_u64(space_indexes.fields.doc_id, doc_id);
2251                doc.add_text(space_indexes.fields.filepath, &entry.filepath);
2252                if let Some(title) = &entry.semantic_title {
2253                    doc.add_text(space_indexes.fields.title, title);
2254                }
2255                if let Some(heading) = &entry.heading {
2256                    doc.add_text(space_indexes.fields.heading, heading);
2257                }
2258                doc.add_text(space_indexes.fields.body, &entry.body);
2259                writer.add_document(doc)?;
2260            }
2261            Ok(())
2262        })
2263    }
2264
2265    pub fn delete_tantivy(&self, space: &str, chunk_ids: &[i64]) -> Result<()> {
2266        if chunk_ids.is_empty() {
2267            return Ok(());
2268        }
2269
2270        let space_indexes = self.get_space_indexes(space)?;
2271        with_tantivy_writer(&space_indexes, |writer| {
2272            for chunk_id in chunk_ids {
2273                let chunk_key = u64::try_from(*chunk_id).map_err(|_| {
2274                    CoreError::Internal(format!(
2275                        "chunk_id must be non-negative for tantivy delete: {chunk_id}"
2276                    ))
2277                })?;
2278                writer.delete_term(Term::from_field_u64(
2279                    space_indexes.fields.chunk_id,
2280                    chunk_key,
2281                ));
2282            }
2283
2284            Ok(())
2285        })
2286    }
2287
2288    pub fn delete_tantivy_by_doc(&self, space: &str, doc_id: i64) -> Result<()> {
2289        let space_indexes = self.get_space_indexes(space)?;
2290        with_tantivy_writer(&space_indexes, |writer| {
2291            let doc_key = u64::try_from(doc_id).map_err(|_| {
2292                CoreError::Internal(format!(
2293                    "doc_id must be non-negative for tantivy delete-by-doc: {doc_id}"
2294                ))
2295            })?;
2296            writer.delete_term(Term::from_field_u64(space_indexes.fields.doc_id, doc_key));
2297            Ok(())
2298        })
2299    }
2300
2301    pub fn query_bm25(
2302        &self,
2303        space: &str,
2304        query: &str,
2305        fields: &[(&str, f32)],
2306        limit: usize,
2307    ) -> Result<Vec<BM25Hit>> {
2308        self.query_bm25_filtered(space, query, fields, None, limit, true)
2309    }
2310
2311    pub(crate) fn query_bm25_cached(
2312        &self,
2313        space: &str,
2314        query: &str,
2315        fields: &[(&str, f32)],
2316        limit: usize,
2317    ) -> Result<Vec<BM25Hit>> {
2318        self.query_bm25_filtered(space, query, fields, None, limit, false)
2319    }
2320
2321    pub fn query_bm25_in_documents(
2322        &self,
2323        space: &str,
2324        query: &str,
2325        fields: &[(&str, f32)],
2326        document_ids: &[i64],
2327        limit: usize,
2328    ) -> Result<Vec<BM25Hit>> {
2329        if document_ids.is_empty() {
2330            return Ok(Vec::new());
2331        }
2332
2333        self.query_bm25_filtered(space, query, fields, Some(document_ids), limit, true)
2334    }
2335
2336    pub(crate) fn query_bm25_in_documents_cached(
2337        &self,
2338        space: &str,
2339        query: &str,
2340        fields: &[(&str, f32)],
2341        document_ids: &[i64],
2342        limit: usize,
2343    ) -> Result<Vec<BM25Hit>> {
2344        if document_ids.is_empty() {
2345            return Ok(Vec::new());
2346        }
2347
2348        self.query_bm25_filtered(space, query, fields, Some(document_ids), limit, false)
2349    }
2350
2351    fn query_bm25_filtered(
2352        &self,
2353        space: &str,
2354        query: &str,
2355        fields: &[(&str, f32)],
2356        document_ids: Option<&[i64]>,
2357        limit: usize,
2358        reload_reader: bool,
2359    ) -> Result<Vec<BM25Hit>> {
2360        if limit == 0 {
2361            return Ok(Vec::new());
2362        }
2363
2364        if fields.is_empty() {
2365            return Err(CoreError::Internal(
2366                "bm25 query requires at least one field".to_string(),
2367            ));
2368        }
2369
2370        let space_indexes = self.get_space_indexes(space)?;
2371
2372        let schema = space_indexes.tantivy_index.schema();
2373        let query_fields = fields
2374            .iter()
2375            .map(|(name, boost)| {
2376                let field = resolve_tantivy_field(space_indexes.fields, name)?;
2377                let field_entry = schema.get_field_entry(field);
2378                let index_record_option = field_entry
2379                    .field_type()
2380                    .get_index_record_option()
2381                    .ok_or_else(|| {
2382                        CoreError::Internal(format!(
2383                            "bm25 field '{}' is not indexed",
2384                            field_entry.name()
2385                        ))
2386                    })?;
2387                Ok(Bm25FieldSpec {
2388                    field,
2389                    boost: *boost,
2390                    index_record_option,
2391                })
2392            })
2393            .collect::<Result<Vec<_>>>()?;
2394        let Some(parsed_query) =
2395            build_literal_bm25_query(&space_indexes.tantivy_index, &query_fields, query)?
2396        else {
2397            return Ok(Vec::new());
2398        };
2399        let parsed_query = if let Some(document_ids) = document_ids {
2400            Box::new(BooleanQuery::new(vec![
2401                (Occur::Must, parsed_query),
2402                (
2403                    Occur::Must,
2404                    build_doc_id_filter_query(space_indexes.fields.doc_id, document_ids)?,
2405                ),
2406            ])) as Box<dyn Query>
2407        } else {
2408            parsed_query
2409        };
2410        if reload_reader {
2411            space_indexes.tantivy_reader.reload()?;
2412        }
2413        let searcher = space_indexes.tantivy_reader.searcher();
2414        let docs = searcher.search(&parsed_query, &TopDocs::with_limit(limit))?;
2415
2416        let mut hits = Vec::with_capacity(docs.len());
2417        let chunk_id_field_name = schema.get_field_entry(space_indexes.fields.chunk_id).name();
2418        let mut chunk_id_columns = HashMap::new();
2419        for (score, address) in docs {
2420            let chunk_id_column = match chunk_id_columns.entry(address.segment_ord) {
2421                std::collections::hash_map::Entry::Occupied(entry) => entry.into_mut(),
2422                std::collections::hash_map::Entry::Vacant(entry) => {
2423                    let column = searcher
2424                        .segment_reader(address.segment_ord)
2425                        .fast_fields()
2426                        .u64(chunk_id_field_name)?;
2427                    entry.insert(column)
2428                }
2429            };
2430            let chunk_id = chunk_id_column.first(address.doc_id).ok_or_else(|| {
2431                CoreError::Internal("tantivy hit missing chunk_id fast field".to_string())
2432            })?;
2433            let chunk_id = i64::try_from(chunk_id).map_err(|_| {
2434                CoreError::Internal(format!("tantivy chunk_id exceeds i64 range: {chunk_id}"))
2435            })?;
2436            hits.push(BM25Hit { chunk_id, score });
2437        }
2438
2439        Ok(hits)
2440    }
2441
2442    pub(crate) fn reload_tantivy_reader(&self, space: &str) -> Result<()> {
2443        let space_indexes = self.get_space_indexes(space)?;
2444        space_indexes.tantivy_reader.reload()?;
2445        Ok(())
2446    }
2447
2448    pub fn commit_tantivy(&self, space: &str) -> Result<()> {
2449        let space_indexes = self.get_space_indexes(space)?;
2450        let mut writer = space_indexes
2451            .tantivy_writer
2452            .lock()
2453            .map_err(|_| CoreError::poisoned("tantivy writer"))?;
2454        let Some(mut writer) = writer.take() else {
2455            return Ok(());
2456        };
2457        writer.commit()?;
2458        space_indexes.tantivy_reader.reload()?;
2459        Ok(())
2460    }
2461
2462    pub fn insert_usearch(&self, space: &str, key: i64, vector: &[f32]) -> Result<()> {
2463        self.batch_insert_usearch(space, &[(key, vector)])
2464    }
2465
2466    pub fn batch_insert_usearch(&self, space: &str, entries: &[(i64, &[f32])]) -> Result<()> {
2467        self.batch_insert_usearch_with_save_mode(space, entries, UsearchSaveMode::Immediate)
2468    }
2469
2470    pub(crate) fn batch_insert_usearch_deferred(
2471        &self,
2472        space: &str,
2473        entries: &[(i64, &[f32])],
2474    ) -> Result<()> {
2475        self.batch_insert_usearch_with_save_mode(space, entries, UsearchSaveMode::Deferred)
2476    }
2477
2478    fn batch_insert_usearch_with_save_mode(
2479        &self,
2480        space: &str,
2481        entries: &[(i64, &[f32])],
2482        save_mode: UsearchSaveMode,
2483    ) -> Result<()> {
2484        if entries.is_empty() {
2485            return Ok(());
2486        }
2487
2488        let space_indexes = self.get_space_indexes(space)?;
2489
2490        let expected_dimensions = entries[0].1.len();
2491        if expected_dimensions == 0 {
2492            return Err(CoreError::Internal(
2493                "cannot insert empty vector into usearch index".to_string(),
2494            ));
2495        }
2496        for (_, vector) in entries {
2497            if vector.len() != expected_dimensions {
2498                return Err(CoreError::Internal(format!(
2499                    "vector dimension mismatch in batch insert: expected {expected_dimensions}, got {}",
2500                    vector.len()
2501                )));
2502            }
2503        }
2504
2505        let mut index = space_indexes
2506            .usearch_index
2507            .write()
2508            .map_err(|_| CoreError::poisoned("usearch index"))?;
2509        let ensure_started = std::time::Instant::now();
2510        ensure_usearch_dimensions(&mut index, expected_dimensions)?;
2511        crate::profile::record_update_stage("usearch_ensure_dimensions", ensure_started.elapsed());
2512
2513        let target_capacity = index.size().saturating_add(entries.len());
2514        let reserve_started = std::time::Instant::now();
2515        index.reserve(target_capacity).map_err(|err| {
2516            CoreError::Internal(format!(
2517                "usearch reserve failed for {target_capacity} items: {err}"
2518            ))
2519        })?;
2520        crate::profile::record_update_stage("usearch_reserve", reserve_started.elapsed());
2521
2522        if matches!(save_mode, UsearchSaveMode::Deferred) {
2523            self.mark_usearch_dirty(space)?;
2524        }
2525
2526        let add_started = std::time::Instant::now();
2527        for (key, vector) in entries {
2528            let key = u64::try_from(*key).map_err(|_| {
2529                CoreError::Internal(format!("usearch key must be non-negative: {}", *key))
2530            })?;
2531            index
2532                .add::<f32>(key, vector)
2533                .map_err(|err| CoreError::Internal(format!("usearch add failed: {err}")))?;
2534        }
2535        crate::profile::record_update_stage("usearch_add", add_started.elapsed());
2536
2537        if matches!(save_mode, UsearchSaveMode::Immediate) {
2538            let save_started = std::time::Instant::now();
2539            save_usearch_index(&index, &space_indexes.usearch_path)?;
2540            crate::profile::record_update_stage("usearch_save", save_started.elapsed());
2541        }
2542        Ok(())
2543    }
2544
2545    pub fn delete_usearch(&self, space: &str, keys: &[i64]) -> Result<()> {
2546        self.delete_usearch_with_save_mode(space, keys, UsearchSaveMode::Immediate)
2547    }
2548
2549    pub(crate) fn delete_usearch_deferred(&self, space: &str, keys: &[i64]) -> Result<()> {
2550        self.delete_usearch_with_save_mode(space, keys, UsearchSaveMode::Deferred)
2551    }
2552
2553    fn delete_usearch_with_save_mode(
2554        &self,
2555        space: &str,
2556        keys: &[i64],
2557        save_mode: UsearchSaveMode,
2558    ) -> Result<()> {
2559        if keys.is_empty() {
2560            return Ok(());
2561        }
2562
2563        let space_indexes = self.get_space_indexes(space)?;
2564        let index = space_indexes
2565            .usearch_index
2566            .write()
2567            .map_err(|_| CoreError::poisoned("usearch index"))?;
2568
2569        if matches!(save_mode, UsearchSaveMode::Deferred) {
2570            self.mark_usearch_dirty(space)?;
2571        }
2572
2573        let delete_started = std::time::Instant::now();
2574        for key in keys {
2575            let key = u64::try_from(*key).map_err(|_| {
2576                CoreError::Internal(format!("usearch key must be non-negative: {}", *key))
2577            })?;
2578            index
2579                .remove(key)
2580                .map_err(|err| CoreError::Internal(format!("usearch remove failed: {err}")))?;
2581        }
2582        crate::profile::record_update_stage("usearch_delete", delete_started.elapsed());
2583
2584        if matches!(save_mode, UsearchSaveMode::Immediate) {
2585            let save_started = std::time::Instant::now();
2586            save_usearch_index(&index, &space_indexes.usearch_path)?;
2587            crate::profile::record_update_stage("usearch_save", save_started.elapsed());
2588        }
2589        Ok(())
2590    }
2591
2592    pub(crate) fn save_dirty_usearch_indexes(&self) -> Result<()> {
2593        let spaces = {
2594            let mut dirty = self
2595                .dirty_usearch_spaces
2596                .lock()
2597                .map_err(|_| CoreError::poisoned("dirty usearch spaces"))?;
2598            if dirty.is_empty() {
2599                return Ok(());
2600            }
2601
2602            let mut spaces = dirty.drain().collect::<Vec<_>>();
2603            spaces.sort();
2604            spaces
2605        };
2606
2607        for (index, space) in spaces.iter().enumerate() {
2608            if let Err(err) = self.save_usearch_for_space(space) {
2609                let mut dirty = self
2610                    .dirty_usearch_spaces
2611                    .lock()
2612                    .map_err(|_| CoreError::poisoned("dirty usearch spaces"))?;
2613                for unsaved in &spaces[index..] {
2614                    dirty.insert(unsaved.clone());
2615                }
2616                return Err(err);
2617            }
2618            crate::profile::increment_update_count("usearch_saved_spaces", 1);
2619        }
2620
2621        Ok(())
2622    }
2623
2624    fn mark_usearch_dirty(&self, space: &str) -> Result<()> {
2625        self.dirty_usearch_spaces
2626            .lock()
2627            .map_err(|_| CoreError::poisoned("dirty usearch spaces"))?
2628            .insert(space.to_string());
2629        Ok(())
2630    }
2631
2632    fn save_usearch_for_space(&self, space: &str) -> Result<()> {
2633        let space_indexes = self.get_space_indexes(space)?;
2634        let index = space_indexes
2635            .usearch_index
2636            .read()
2637            .map_err(|_| CoreError::poisoned("usearch index"))?;
2638
2639        let save_started = std::time::Instant::now();
2640        save_usearch_index(&index, &space_indexes.usearch_path)?;
2641        crate::profile::record_update_stage("usearch_save", save_started.elapsed());
2642        Ok(())
2643    }
2644
2645    pub fn query_dense(&self, space: &str, vector: &[f32], limit: usize) -> Result<Vec<DenseHit>> {
2646        self.query_dense_filtered(space, vector, None, limit)
2647    }
2648
2649    pub fn query_dense_in_chunks(
2650        &self,
2651        space: &str,
2652        vector: &[f32],
2653        chunk_ids: &[i64],
2654        limit: usize,
2655    ) -> Result<Vec<DenseHit>> {
2656        if chunk_ids.is_empty() {
2657            return Ok(Vec::new());
2658        }
2659
2660        let allowed_keys = chunk_ids
2661            .iter()
2662            .map(|chunk_id| {
2663                u64::try_from(*chunk_id).map_err(|_| {
2664                    CoreError::Internal(format!(
2665                        "chunk_id must be non-negative for usearch query: {chunk_id}"
2666                    ))
2667                })
2668            })
2669            .collect::<Result<HashSet<_>>>()?;
2670        self.query_dense_in_key_set(space, vector, &allowed_keys, limit)
2671    }
2672
2673    pub(crate) fn query_dense_in_key_set(
2674        &self,
2675        space: &str,
2676        vector: &[f32],
2677        allowed_keys: &HashSet<u64>,
2678        limit: usize,
2679    ) -> Result<Vec<DenseHit>> {
2680        if allowed_keys.is_empty() {
2681            return Ok(Vec::new());
2682        }
2683
2684        self.query_dense_filtered(space, vector, Some(allowed_keys), limit)
2685    }
2686
2687    fn query_dense_filtered(
2688        &self,
2689        space: &str,
2690        vector: &[f32],
2691        allowed_keys: Option<&HashSet<u64>>,
2692        limit: usize,
2693    ) -> Result<Vec<DenseHit>> {
2694        if limit == 0 {
2695            return Ok(Vec::new());
2696        }
2697        if vector.is_empty() {
2698            return Err(CoreError::Internal(
2699                "cannot query usearch with empty vector".to_string(),
2700            ));
2701        }
2702
2703        let space_indexes = self.get_space_indexes(space)?;
2704        let index = space_indexes
2705            .usearch_index
2706            .read()
2707            .map_err(|_| CoreError::poisoned("usearch index"))?;
2708
2709        if index.size() == 0 {
2710            return Ok(Vec::new());
2711        }
2712        if vector.len() != index.dimensions() {
2713            return Err(CoreError::Internal(format!(
2714                "query vector dimension mismatch: expected {}, got {}",
2715                index.dimensions(),
2716                vector.len()
2717            )));
2718        }
2719
2720        let matches = if let Some(allowed_keys) = allowed_keys {
2721            index
2722                .filtered_search::<f32, _>(vector, limit, |key| allowed_keys.contains(&key))
2723                .map_err(|err| {
2724                    CoreError::Internal(format!("usearch filtered query failed: {err}"))
2725                })?
2726        } else {
2727            index
2728                .search::<f32>(vector, limit)
2729                .map_err(|err| CoreError::Internal(format!("usearch query failed: {err}")))?
2730        };
2731        let hits = matches
2732            .keys
2733            .into_iter()
2734            .zip(matches.distances)
2735            .map(|(key, distance)| DenseHit {
2736                chunk_id: key as i64,
2737                distance,
2738            })
2739            .collect();
2740        Ok(hits)
2741    }
2742
2743    pub fn count_usearch(&self, space: &str) -> Result<usize> {
2744        let space_indexes = self.get_space_indexes(space)?;
2745        let index = space_indexes
2746            .usearch_index
2747            .read()
2748            .map_err(|_| CoreError::poisoned("usearch index"))?;
2749        Ok(index.size())
2750    }
2751
2752    pub fn clear_usearch(&self, space: &str) -> Result<()> {
2753        let space_indexes = self.get_space_indexes(space)?;
2754        let index = space_indexes
2755            .usearch_index
2756            .write()
2757            .map_err(|_| CoreError::poisoned("usearch index"))?;
2758        let clear_started = std::time::Instant::now();
2759        index
2760            .reset()
2761            .map_err(|err| CoreError::Internal(format!("usearch clear failed: {err}")))?;
2762        std::fs::File::create(&space_indexes.usearch_path)?;
2763        crate::profile::record_update_stage("usearch_clear", clear_started.elapsed());
2764        Ok(())
2765    }
2766
2767    pub fn get_fts_dirty_documents(&self) -> Result<Vec<FtsDirtyRecord>> {
2768        self.get_fts_dirty_documents_filtered("", Vec::new())
2769    }
2770
2771    pub fn get_fts_dirty_documents_in_space(&self, space_id: i64) -> Result<Vec<FtsDirtyRecord>> {
2772        self.get_fts_dirty_documents_filtered(
2773            " AND c.space_id = ?",
2774            vec![SqlValue::Integer(space_id)],
2775        )
2776    }
2777
2778    pub fn get_fts_dirty_documents_in_collections(
2779        &self,
2780        collection_ids: &[i64],
2781    ) -> Result<Vec<FtsDirtyRecord>> {
2782        if collection_ids.is_empty() {
2783            return Ok(Vec::new());
2784        }
2785
2786        let placeholders = vec!["?"; collection_ids.len()].join(", ");
2787        let clause = format!(" AND d.collection_id IN ({placeholders})");
2788        let params = collection_ids
2789            .iter()
2790            .map(|id| SqlValue::Integer(*id))
2791            .collect::<Vec<_>>();
2792        self.get_fts_dirty_documents_filtered(&clause, params)
2793    }
2794
2795    fn get_fts_dirty_documents_filtered(
2796        &self,
2797        scope_clause: &str,
2798        scope_params: Vec<SqlValue>,
2799    ) -> Result<Vec<FtsDirtyRecord>> {
2800        let conn = self
2801            .db
2802            .lock()
2803            .map_err(|_| CoreError::poisoned("database"))?;
2804
2805        let sql = format!(
2806            "SELECT d.id, d.path, d.title, d.title_source, d.hash, c.path, s.name
2807             FROM documents d
2808             JOIN collections c ON c.id = d.collection_id
2809             JOIN spaces s ON s.id = c.space_id
2810             WHERE d.fts_dirty = 1{scope_clause}
2811             ORDER BY d.id ASC"
2812        );
2813        let mut stmt = conn.prepare(&sql)?;
2814        let rows = stmt.query_map(params_from_iter(scope_params.iter()), |row| {
2815            Ok((
2816                row.get::<_, i64>(0)?,
2817                row.get::<_, String>(1)?,
2818                row.get::<_, String>(2)?,
2819                row.get::<_, String>(3)?,
2820                row.get::<_, String>(4)?,
2821                row.get::<_, String>(5)?,
2822                row.get::<_, String>(6)?,
2823            ))
2824        })?;
2825        let headers = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2826        drop(stmt);
2827
2828        let mut records = Vec::with_capacity(headers.len());
2829        for (
2830            doc_id,
2831            doc_path,
2832            doc_title,
2833            doc_title_source,
2834            doc_hash,
2835            collection_path,
2836            space_name,
2837        ) in headers
2838        {
2839            let chunks = load_chunks_for_doc(&conn, doc_id)?;
2840            records.push(FtsDirtyRecord {
2841                doc_id,
2842                doc_path,
2843                doc_title,
2844                doc_title_source: DocumentTitleSource::from_sql(&doc_title_source)?,
2845                doc_hash,
2846                collection_path: PathBuf::from(collection_path),
2847                space_name,
2848                chunks,
2849            });
2850        }
2851
2852        Ok(records)
2853    }
2854
2855    pub fn batch_clear_fts_dirty(&self, doc_ids: &[i64]) -> Result<()> {
2856        if doc_ids.is_empty() {
2857            return Ok(());
2858        }
2859
2860        let conn = self
2861            .db
2862            .lock()
2863            .map_err(|_| CoreError::poisoned("database"))?;
2864
2865        let placeholders = vec!["?"; doc_ids.len()].join(", ");
2866        let sql = format!("UPDATE documents SET fts_dirty = 0 WHERE id IN ({placeholders})");
2867        conn.execute(&sql, params_from_iter(doc_ids.iter()))?;
2868        Ok(())
2869    }
2870
2871    pub fn count_documents_in_collection(
2872        &self,
2873        collection_id: i64,
2874        active_only: bool,
2875    ) -> Result<usize> {
2876        let conn = self
2877            .db
2878            .lock()
2879            .map_err(|_| CoreError::poisoned("database"))?;
2880        let _collection_name = lookup_collection_name(&conn, collection_id)?;
2881        let active_only = i64::from(active_only);
2882        query_count(
2883            &conn,
2884            "SELECT COUNT(*)
2885             FROM documents
2886             WHERE collection_id = ?1
2887               AND (?2 = 0 OR active = 1)",
2888            params![collection_id, active_only],
2889        )
2890    }
2891
2892    pub fn count_chunks_in_collection(&self, collection_id: i64) -> Result<usize> {
2893        let conn = self
2894            .db
2895            .lock()
2896            .map_err(|_| CoreError::poisoned("database"))?;
2897        let _collection_name = lookup_collection_name(&conn, collection_id)?;
2898
2899        query_count(
2900            &conn,
2901            "SELECT COUNT(*)
2902             FROM chunks c
2903             JOIN documents d ON d.id = c.doc_id
2904             WHERE d.collection_id = ?1",
2905            params![collection_id],
2906        )
2907    }
2908
2909    pub fn count_embedded_chunks_in_collection(&self, collection_id: i64) -> Result<usize> {
2910        let conn = self
2911            .db
2912            .lock()
2913            .map_err(|_| CoreError::poisoned("database"))?;
2914        let _collection_name = lookup_collection_name(&conn, collection_id)?;
2915
2916        query_count(
2917            &conn,
2918            "SELECT COUNT(DISTINCT e.chunk_id)
2919             FROM embeddings e
2920             JOIN chunks c ON c.id = e.chunk_id
2921             JOIN documents d ON d.id = c.doc_id
2922             WHERE d.collection_id = ?1",
2923            params![collection_id],
2924        )
2925    }
2926
2927    pub fn count_documents(&self, space_id: Option<i64>) -> Result<usize> {
2928        let conn = self
2929            .db
2930            .lock()
2931            .map_err(|_| CoreError::poisoned("database"))?;
2932
2933        match space_id {
2934            Some(space_id) => {
2935                let _space_name = lookup_space_name(&conn, space_id)?;
2936                query_count(
2937                    &conn,
2938                    "SELECT COUNT(*)
2939                     FROM documents d
2940                     JOIN collections c ON c.id = d.collection_id
2941                     WHERE c.space_id = ?1",
2942                    params![space_id],
2943                )
2944            }
2945            None => query_count(&conn, "SELECT COUNT(*) FROM documents", []),
2946        }
2947    }
2948
2949    pub fn count_chunks(&self, space_id: Option<i64>) -> Result<usize> {
2950        let conn = self
2951            .db
2952            .lock()
2953            .map_err(|_| CoreError::poisoned("database"))?;
2954
2955        match space_id {
2956            Some(space_id) => {
2957                let _space_name = lookup_space_name(&conn, space_id)?;
2958                query_count(
2959                    &conn,
2960                    "SELECT COUNT(*)
2961                     FROM chunks c
2962                     JOIN documents d ON d.id = c.doc_id
2963                     JOIN collections col ON col.id = d.collection_id
2964                     WHERE col.space_id = ?1",
2965                    params![space_id],
2966                )
2967            }
2968            None => query_count(&conn, "SELECT COUNT(*) FROM chunks", []),
2969        }
2970    }
2971
2972    pub fn count_embedded_chunks(&self, space_id: Option<i64>) -> Result<usize> {
2973        let conn = self
2974            .db
2975            .lock()
2976            .map_err(|_| CoreError::poisoned("database"))?;
2977
2978        match space_id {
2979            Some(space_id) => {
2980                let _space_name = lookup_space_name(&conn, space_id)?;
2981                query_count(
2982                    &conn,
2983                    "SELECT COUNT(DISTINCT e.chunk_id)
2984                     FROM embeddings e
2985                     JOIN chunks c ON c.id = e.chunk_id
2986                     JOIN documents d ON d.id = c.doc_id
2987                     JOIN collections col ON col.id = d.collection_id
2988                     WHERE col.space_id = ?1",
2989                    params![space_id],
2990                )
2991            }
2992            None => query_count(&conn, "SELECT COUNT(DISTINCT chunk_id) FROM embeddings", []),
2993        }
2994    }
2995
2996    pub fn disk_usage(&self) -> Result<DiskUsage> {
2997        let sqlite_bytes = file_size_or_zero(&self.cache_dir.join(DB_FILE))?;
2998
2999        let mut tantivy_bytes = 0_u64;
3000        let mut usearch_bytes = 0_u64;
3001        let spaces_dir = self.cache_dir.join(SPACES_DIR);
3002        if spaces_dir.exists() {
3003            for entry in std::fs::read_dir(&spaces_dir)? {
3004                let space_dir = entry?.path();
3005                if !space_dir.is_dir() {
3006                    continue;
3007                }
3008
3009                tantivy_bytes += dir_size_or_zero(&space_dir.join(TANTIVY_DIR_NAME))?;
3010                usearch_bytes += file_size_or_zero(&space_dir.join(USEARCH_FILENAME))?;
3011            }
3012        }
3013
3014        let models_bytes = dir_size_or_zero(&self.cache_dir.join("models"))?;
3015        let total_bytes = sqlite_bytes + tantivy_bytes + usearch_bytes + models_bytes;
3016
3017        Ok(DiskUsage {
3018            sqlite_bytes,
3019            tantivy_bytes,
3020            usearch_bytes,
3021            models_bytes,
3022            total_bytes,
3023        })
3024    }
3025
3026    fn unload_space(&self, name: &str) -> Result<()> {
3027        let mut spaces = self
3028            .spaces
3029            .write()
3030            .map_err(|_| CoreError::poisoned("spaces"))?;
3031        spaces.remove(name);
3032        Ok(())
3033    }
3034
3035    fn remove_space_artifacts(&self, name: &str) -> Result<()> {
3036        let space_root = self.space_root_path(name);
3037        if space_root.exists() {
3038            std::fs::remove_dir_all(space_root)?;
3039        }
3040        Ok(())
3041    }
3042
3043    fn rename_space_artifacts(&self, old: &str, new: &str) -> Result<()> {
3044        let old_root = self.space_root_path(old);
3045        let new_root = self.space_root_path(new);
3046        if !old_root.exists() {
3047            return Ok(());
3048        }
3049
3050        if new_root.exists() {
3051            return Err(CoreError::Internal(format!(
3052                "cannot rename space artifacts: destination already exists: {}",
3053                new_root.display()
3054            )));
3055        }
3056
3057        if let Some(parent) = new_root.parent() {
3058            std::fs::create_dir_all(parent)?;
3059        }
3060        std::fs::rename(old_root, new_root)?;
3061        Ok(())
3062    }
3063
3064    fn space_root_path(&self, name: &str) -> PathBuf {
3065        self.cache_dir.join(SPACES_DIR).join(name)
3066    }
3067
3068    fn space_paths(&self, name: &str) -> (PathBuf, PathBuf) {
3069        let space_root = self.space_root_path(name);
3070        let tantivy_dir = space_root.join(TANTIVY_DIR_NAME);
3071        let usearch_path = space_root.join(USEARCH_FILENAME);
3072        (tantivy_dir, usearch_path)
3073    }
3074
3075    fn get_space_indexes(&self, name: &str) -> Result<Arc<SpaceIndexes>> {
3076        self.open_space(name)?;
3077        let spaces = self
3078            .spaces
3079            .read()
3080            .map_err(|_| CoreError::poisoned("spaces"))?;
3081        spaces.get(name).cloned().ok_or_else(|| {
3082            KboltError::SpaceNotFound {
3083                name: name.to_string(),
3084            }
3085            .into()
3086        })
3087    }
3088}
3089
3090fn lookup_space_name(conn: &Connection, space_id: i64) -> Result<String> {
3091    let result = conn.query_row(
3092        "SELECT name FROM spaces WHERE id = ?1",
3093        params![space_id],
3094        |row| row.get::<_, String>(0),
3095    );
3096    match result {
3097        Ok(name) => Ok(name),
3098        Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
3099            name: format!("id={space_id}"),
3100        }
3101        .into()),
3102        Err(err) => Err(err.into()),
3103    }
3104}
3105
3106fn lookup_collection_name(conn: &Connection, collection_id: i64) -> Result<String> {
3107    let result = conn.query_row(
3108        "SELECT name FROM collections WHERE id = ?1",
3109        params![collection_id],
3110        |row| row.get::<_, String>(0),
3111    );
3112    match result {
3113        Ok(name) => Ok(name),
3114        Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
3115            name: format!("id={collection_id}"),
3116        }
3117        .into()),
3118        Err(err) => Err(err.into()),
3119    }
3120}
3121
3122fn lookup_document_id(conn: &Connection, doc_id: i64) -> Result<i64> {
3123    let result = conn.query_row(
3124        "SELECT id FROM documents WHERE id = ?1",
3125        params![doc_id],
3126        |row| row.get::<_, i64>(0),
3127    );
3128    match result {
3129        Ok(id) => Ok(id),
3130        Err(Error::QueryReturnedNoRows) => Err(KboltError::DocumentNotFound {
3131            path: format!("id={doc_id}"),
3132        }
3133        .into()),
3134        Err(err) => Err(err.into()),
3135    }
3136}
3137
3138fn load_chunks_for_doc(conn: &Connection, doc_id: i64) -> Result<Vec<ChunkRow>> {
3139    let mut stmt = conn.prepare(
3140        "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
3141         FROM chunks
3142         WHERE doc_id = ?1
3143         ORDER BY seq ASC",
3144    )?;
3145    let rows = stmt.query_map(params![doc_id], decode_chunk_row)?;
3146    let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3147    Ok(chunks)
3148}
3149
3150fn load_chunk_ids_for_doc(conn: &Connection, doc_id: i64) -> Result<Vec<i64>> {
3151    let mut stmt = conn.prepare("SELECT id FROM chunks WHERE doc_id = ?1 ORDER BY seq ASC")?;
3152    let rows = stmt.query_map(params![doc_id], |row| row.get::<_, i64>(0))?;
3153    let chunk_ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3154    Ok(chunk_ids)
3155}
3156
3157fn lookup_chunk_doc_id(conn: &Connection, chunk_id: i64) -> Result<i64> {
3158    let result = conn.query_row(
3159        "SELECT doc_id FROM chunks WHERE id = ?1",
3160        params![chunk_id],
3161        |row| row.get::<_, i64>(0),
3162    );
3163    match result {
3164        Ok(doc_id) => Ok(doc_id),
3165        Err(Error::QueryReturnedNoRows) => Err(KboltError::DocumentNotFound {
3166            path: format!("chunk_id={chunk_id}"),
3167        }
3168        .into()),
3169        Err(err) => Err(err.into()),
3170    }
3171}
3172
3173fn query_count<P: rusqlite::Params>(conn: &Connection, sql: &str, params: P) -> Result<usize> {
3174    let count: i64 = conn.query_row(sql, params, |row| row.get(0))?;
3175    Ok(count as usize)
3176}
3177
3178fn file_size_or_zero(path: &Path) -> Result<u64> {
3179    if !path.exists() {
3180        return Ok(0);
3181    }
3182
3183    let metadata = std::fs::metadata(path)?;
3184    if metadata.is_file() {
3185        Ok(metadata.len())
3186    } else {
3187        Ok(0)
3188    }
3189}
3190
3191fn dir_size_or_zero(path: &Path) -> Result<u64> {
3192    if !path.exists() {
3193        return Ok(0);
3194    }
3195
3196    let mut total = 0_u64;
3197    for entry in std::fs::read_dir(path)? {
3198        let entry = entry?;
3199        let child_path = entry.path();
3200        let metadata = std::fs::symlink_metadata(&child_path)?;
3201        if metadata.is_file() {
3202            total += metadata.len();
3203        } else if metadata.is_dir() {
3204            total += dir_size_or_zero(&child_path)?;
3205        }
3206    }
3207
3208    Ok(total)
3209}
3210
3211fn open_or_create_tantivy_index(path: &Path) -> Result<Index> {
3212    let meta_path = path.join("meta.json");
3213    if meta_path.exists() {
3214        return Ok(Index::open_in_dir(path)?);
3215    }
3216
3217    Ok(Index::create_in_dir(path, tantivy_schema())?)
3218}
3219
3220fn with_tantivy_writer<T>(
3221    space_indexes: &SpaceIndexes,
3222    f: impl FnOnce(&mut IndexWriter) -> Result<T>,
3223) -> Result<T> {
3224    let mut writer = space_indexes
3225        .tantivy_writer
3226        .lock()
3227        .map_err(|_| CoreError::poisoned("tantivy writer"))?;
3228
3229    if writer.is_none() {
3230        *writer = Some(space_indexes.tantivy_index.writer(50_000_000)?);
3231    }
3232
3233    let writer = writer
3234        .as_mut()
3235        .ok_or_else(|| CoreError::Internal("failed to initialize tantivy writer".to_string()))?;
3236    f(writer)
3237}
3238
3239fn reject_incompatible_legacy_index(conn: &Connection) -> Result<()> {
3240    if !table_exists(conn, "documents")? || table_exists(conn, "document_texts")? {
3241        return Ok(());
3242    }
3243
3244    let document_count = query_count(conn, "SELECT COUNT(*) FROM documents", [])?;
3245    if document_count == 0 {
3246        return Ok(());
3247    }
3248
3249    Err(KboltError::Config(
3250        "cache index uses an older text-storage format; rebuild the kbolt cache before using this version".to_string(),
3251    )
3252    .into())
3253}
3254
3255fn ensure_schema_version(conn: &Connection) -> Result<()> {
3256    let current: i64 = conn.query_row("PRAGMA user_version", [], |row| row.get(0))?;
3257    if current > SCHEMA_VERSION {
3258        return Err(KboltError::Config(format!(
3259            "cache index schema version {current} is newer than supported version {SCHEMA_VERSION}"
3260        ))
3261        .into());
3262    }
3263
3264    if current < SCHEMA_VERSION {
3265        conn.pragma_update(None, "user_version", SCHEMA_VERSION)?;
3266    }
3267
3268    Ok(())
3269}
3270
3271fn table_exists(conn: &Connection, table: &str) -> Result<bool> {
3272    let exists = conn.query_row(
3273        "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name = ?1",
3274        [table],
3275        |row| row.get::<_, i64>(0),
3276    )?;
3277    Ok(exists != 0)
3278}
3279
3280fn ensure_documents_title_source_column(conn: &Connection) -> Result<()> {
3281    let mut stmt = conn.prepare("PRAGMA table_info(documents)")?;
3282    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3283    let columns = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3284    drop(stmt);
3285
3286    if columns.iter().any(|column| column == "title_source") {
3287        return Ok(());
3288    }
3289
3290    conn.execute(
3291        "ALTER TABLE documents ADD COLUMN title_source TEXT NOT NULL DEFAULT 'extracted'",
3292        [],
3293    )?;
3294    Ok(())
3295}
3296
3297fn ensure_document_texts_generation_key_column(conn: &Connection) -> Result<()> {
3298    let mut stmt = conn.prepare("PRAGMA table_info(document_texts)")?;
3299    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3300    let columns = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3301    drop(stmt);
3302
3303    if columns.iter().any(|column| column == "generation_key") {
3304        return Ok(());
3305    }
3306
3307    conn.execute(
3308        "ALTER TABLE document_texts ADD COLUMN generation_key TEXT NOT NULL DEFAULT ''",
3309        [],
3310    )?;
3311    Ok(())
3312}
3313
3314fn ensure_chunks_retrieval_prefix_column(conn: &Connection) -> Result<()> {
3315    let mut stmt = conn.prepare("PRAGMA table_info(chunks)")?;
3316    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3317    let columns = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3318    drop(stmt);
3319
3320    if columns.iter().any(|column| column == "retrieval_prefix") {
3321        return Ok(());
3322    }
3323
3324    conn.execute("ALTER TABLE chunks ADD COLUMN retrieval_prefix TEXT", [])?;
3325    Ok(())
3326}
3327
3328fn build_literal_bm25_query(
3329    index: &Index,
3330    fields: &[Bm25FieldSpec],
3331    query: &str,
3332) -> Result<Option<Box<dyn Query>>> {
3333    let mut clauses = Vec::new();
3334    for field in fields {
3335        for token in analyzed_terms_for_field(index, field.field, query)? {
3336            let term_query: Box<dyn Query> = Box::new(TermQuery::new(
3337                Term::from_field_text(field.field, &token),
3338                field.index_record_option,
3339            ));
3340            let query = if (field.boost - 1.0).abs() > f32::EPSILON {
3341                Box::new(BoostQuery::new(term_query, field.boost)) as Box<dyn Query>
3342            } else {
3343                term_query
3344            };
3345            clauses.push((Occur::Should, query));
3346        }
3347    }
3348
3349    if clauses.is_empty() {
3350        Ok(None)
3351    } else {
3352        Ok(Some(Box::new(BooleanQuery::new(clauses))))
3353    }
3354}
3355
3356fn build_doc_id_filter_query(field: Field, document_ids: &[i64]) -> Result<Box<dyn Query>> {
3357    let mut terms = Vec::new();
3358    let mut seen = HashSet::new();
3359    for doc_id in document_ids {
3360        let doc_id = u64::try_from(*doc_id).map_err(|_| {
3361            CoreError::Internal(format!(
3362                "doc_id must be non-negative for tantivy query: {doc_id}"
3363            ))
3364        })?;
3365        if seen.insert(doc_id) {
3366            terms.push(Term::from_field_u64(field, doc_id));
3367        }
3368    }
3369
3370    Ok(Box::new(ConstScoreQuery::new(
3371        Box::new(TermSetQuery::new(terms)),
3372        0.0,
3373    )))
3374}
3375
3376fn analyzed_terms_for_field(index: &Index, field: Field, query: &str) -> Result<Vec<String>> {
3377    let mut analyzer = index.tokenizer_for_field(field)?;
3378    let mut stream = analyzer.token_stream(query);
3379    let mut terms = Vec::new();
3380    let mut seen = HashSet::new();
3381    while let Some(token) = stream.next() {
3382        if token.text.is_empty() {
3383            continue;
3384        }
3385        let text = token.text.clone();
3386        if seen.insert(text.clone()) {
3387            terms.push(text);
3388        }
3389    }
3390    Ok(terms)
3391}
3392
3393fn new_usearch_index(dimensions: usize) -> Result<usearch::Index> {
3394    let options = IndexOptions {
3395        dimensions,
3396        metric: MetricKind::Cos,
3397        quantization: ScalarKind::F32,
3398        connectivity: 16,
3399        expansion_add: 200,
3400        expansion_search: 100,
3401        ..IndexOptions::default()
3402    };
3403    usearch::Index::new(&options)
3404        .map_err(|err| CoreError::Internal(format!("usearch init failed: {err}")))
3405}
3406
3407fn open_or_create_usearch_index(path: &Path) -> Result<usearch::Index> {
3408    let index = new_usearch_index(256)?;
3409    let file_size = std::fs::metadata(path).map(|meta| meta.len()).unwrap_or(0);
3410    if file_size > 0 {
3411        let path = path
3412            .to_str()
3413            .ok_or_else(|| CoreError::Internal("invalid usearch path encoding".to_string()))?;
3414        index
3415            .load(path)
3416            .map_err(|err| CoreError::Internal(format!("usearch load failed: {err}")))?;
3417    }
3418    Ok(index)
3419}
3420
3421fn ensure_usearch_dimensions(index: &mut usearch::Index, expected_dimensions: usize) -> Result<()> {
3422    if index.size() == 0 && index.dimensions() != expected_dimensions {
3423        *index = new_usearch_index(expected_dimensions)?;
3424        return Ok(());
3425    }
3426
3427    if index.dimensions() != expected_dimensions {
3428        return Err(CoreError::Internal(format!(
3429            "usearch vector dimension mismatch: index expects {}, got {}",
3430            index.dimensions(),
3431            expected_dimensions
3432        )));
3433    }
3434    Ok(())
3435}
3436
3437fn save_usearch_index(index: &usearch::Index, path: &Path) -> Result<()> {
3438    if index.size() == 0 {
3439        std::fs::File::create(path)?;
3440        return Ok(());
3441    }
3442
3443    let path = path
3444        .to_str()
3445        .ok_or_else(|| CoreError::Internal("invalid usearch path encoding".to_string()))?;
3446    index
3447        .save(path)
3448        .map_err(|err| CoreError::Internal(format!("usearch save failed: {err}")))?;
3449    Ok(())
3450}
3451
3452fn tantivy_schema() -> tantivy::schema::Schema {
3453    let mut builder = tantivy::schema::Schema::builder();
3454    builder.add_u64_field("chunk_id", INDEXED | STORED | FAST);
3455    builder.add_u64_field("doc_id", INDEXED | STORED | FAST);
3456    builder.add_text_field("filepath", TEXT | STORED);
3457    builder.add_text_field("title", TEXT | STORED);
3458    builder.add_text_field("heading", TEXT | STORED);
3459    builder.add_text_field("body", TEXT);
3460    builder.build()
3461}
3462
3463fn tantivy_fields_from_schema(schema: &tantivy::schema::Schema) -> Result<TantivyFields> {
3464    Ok(TantivyFields {
3465        chunk_id: schema.get_field("chunk_id").map_err(|_| {
3466            CoreError::Internal("tantivy schema missing field: chunk_id".to_string())
3467        })?,
3468        doc_id: schema
3469            .get_field("doc_id")
3470            .map_err(|_| CoreError::Internal("tantivy schema missing field: doc_id".to_string()))?,
3471        filepath: schema.get_field("filepath").map_err(|_| {
3472            CoreError::Internal("tantivy schema missing field: filepath".to_string())
3473        })?,
3474        title: schema
3475            .get_field("title")
3476            .map_err(|_| CoreError::Internal("tantivy schema missing field: title".to_string()))?,
3477        heading: schema.get_field("heading").map_err(|_| {
3478            CoreError::Internal("tantivy schema missing field: heading".to_string())
3479        })?,
3480        body: schema
3481            .get_field("body")
3482            .map_err(|_| CoreError::Internal("tantivy schema missing field: body".to_string()))?,
3483    })
3484}
3485
3486fn resolve_tantivy_field(fields: TantivyFields, name: &str) -> Result<Field> {
3487    match name {
3488        "chunk_id" => Ok(fields.chunk_id),
3489        "doc_id" => Ok(fields.doc_id),
3490        "filepath" => Ok(fields.filepath),
3491        "title" => Ok(fields.title),
3492        "heading" => Ok(fields.heading),
3493        "body" => Ok(fields.body),
3494        other => Err(CoreError::Internal(format!(
3495            "unsupported tantivy field: {other}"
3496        ))),
3497    }
3498}
3499
3500fn serialize_extensions(extensions: Option<&[String]>) -> Result<Option<String>> {
3501    match extensions {
3502        None => Ok(None),
3503        Some(values) => serde_json::to_string(values).map(Some).map_err(Into::into),
3504    }
3505}
3506
3507fn deserialize_extensions(raw: Option<String>) -> Result<Option<Vec<String>>> {
3508    match raw {
3509        None => Ok(None),
3510        Some(json) => serde_json::from_str::<Vec<String>>(&json)
3511            .map(Some)
3512            .map_err(Into::into),
3513    }
3514}
3515
3516fn decode_space_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SpaceRow> {
3517    Ok(SpaceRow {
3518        id: row.get(0)?,
3519        name: row.get(1)?,
3520        description: row.get(2)?,
3521        created: row.get(3)?,
3522    })
3523}
3524
3525fn decode_collection_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<CollectionRow> {
3526    let raw_extensions: Option<String> = row.get(5)?;
3527    let extensions = deserialize_extensions(raw_extensions).map_err(|err| {
3528        Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(err))
3529    })?;
3530    Ok(CollectionRow {
3531        id: row.get(0)?,
3532        space_id: row.get(1)?,
3533        name: row.get(2)?,
3534        path: PathBuf::from(row.get::<_, String>(3)?),
3535        description: row.get(4)?,
3536        extensions,
3537        created: row.get(6)?,
3538        updated: row.get(7)?,
3539    })
3540}
3541
3542fn decode_document_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<DocumentRow> {
3543    decode_document_row_at(row, 0)
3544}
3545
3546fn decode_document_row_at(row: &rusqlite::Row<'_>, base: usize) -> rusqlite::Result<DocumentRow> {
3547    let raw_title_source: String = row.get(base + 4)?;
3548    let title_source = DocumentTitleSource::from_sql(&raw_title_source).map_err(|err| {
3549        Error::FromSqlConversionFailure(base + 4, rusqlite::types::Type::Text, Box::new(err))
3550    })?;
3551    let active_value: i64 = row.get(base + 7)?;
3552    let fts_dirty_value: i64 = row.get(base + 9)?;
3553    Ok(DocumentRow {
3554        id: row.get(base)?,
3555        collection_id: row.get(base + 1)?,
3556        path: row.get(base + 2)?,
3557        title: row.get(base + 3)?,
3558        title_source,
3559        hash: row.get(base + 5)?,
3560        modified: row.get(base + 6)?,
3561        active: active_value != 0,
3562        deactivated_at: row.get(base + 8)?,
3563        fts_dirty: fts_dirty_value != 0,
3564    })
3565}
3566
3567fn decode_document_text_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<DocumentTextRow> {
3568    Ok(DocumentTextRow {
3569        doc_id: row.get(0)?,
3570        extractor_key: row.get(1)?,
3571        source_hash: row.get(2)?,
3572        text_hash: row.get(3)?,
3573        generation_key: row.get(4)?,
3574        text: row.get(5)?,
3575        created: row.get(6)?,
3576    })
3577}
3578
3579fn decode_chunk_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ChunkRow> {
3580    decode_chunk_row_at(row, 0)
3581}
3582
3583fn decode_chunk_row_at(row: &rusqlite::Row<'_>, base: usize) -> rusqlite::Result<ChunkRow> {
3584    let offset_value: i64 = row.get(base + 3)?;
3585    let length_value: i64 = row.get(base + 4)?;
3586    let kind_raw: String = row.get(base + 6)?;
3587    let kind = FinalChunkKind::try_from(kind_raw.as_str()).map_err(|err| {
3588        Error::FromSqlConversionFailure(base + 6, rusqlite::types::Type::Text, Box::new(err))
3589    })?;
3590    Ok(ChunkRow {
3591        id: row.get(base)?,
3592        doc_id: row.get(base + 1)?,
3593        seq: row.get(base + 2)?,
3594        offset: decode_non_negative_usize(offset_value, base + 3, "chunks.offset")?,
3595        length: decode_non_negative_usize(length_value, base + 4, "chunks.length")?,
3596        heading: row.get(base + 5)?,
3597        kind,
3598        retrieval_prefix: row.get(base + 7)?,
3599    })
3600}
3601
3602fn decode_non_negative_usize(
3603    value: i64,
3604    column: usize,
3605    name: &'static str,
3606) -> rusqlite::Result<usize> {
3607    if value < 0 {
3608        return Err(Error::FromSqlConversionFailure(
3609            column,
3610            SqlType::Integer,
3611            Box::new(KboltError::Internal(format!("{name} must not be negative"))),
3612        ));
3613    }
3614
3615    Ok(value as usize)
3616}
3617
3618fn canonical_chunk_slice_from_bytes(
3619    chunk_id: i64,
3620    offset_value: i64,
3621    length_value: i64,
3622    document_len_value: i64,
3623    start_byte: Vec<u8>,
3624    end_byte: Vec<u8>,
3625    bytes: Vec<u8>,
3626) -> Result<String> {
3627    if offset_value < 0 {
3628        return Err(CoreError::Internal(format!(
3629            "chunk {chunk_id} offset must not be negative"
3630        )));
3631    }
3632    if length_value < 0 {
3633        return Err(CoreError::Internal(format!(
3634            "chunk {chunk_id} length must not be negative"
3635        )));
3636    }
3637    if document_len_value < 0 {
3638        return Err(CoreError::Internal(format!(
3639            "document text length for chunk {chunk_id} must not be negative"
3640        )));
3641    }
3642
3643    let offset = offset_value as usize;
3644    let length = length_value as usize;
3645    let document_len = document_len_value as usize;
3646    let end = offset.checked_add(length).ok_or_else(|| {
3647        CoreError::Internal(format!("chunk {chunk_id} text span overflows usize"))
3648    })?;
3649    if end > document_len {
3650        return Err(CoreError::Internal(format!(
3651            "chunk {chunk_id} text span {offset}..{end} exceeds document text length {document_len}"
3652        )));
3653    }
3654    if bytes.len() != length {
3655        return Err(CoreError::Internal(format!(
3656            "canonical text slice for chunk {chunk_id} returned {} bytes, expected {length}",
3657            bytes.len()
3658        )));
3659    }
3660    validate_sqlite_utf8_boundary(chunk_id, offset, document_len, &start_byte)?;
3661    validate_sqlite_utf8_boundary(chunk_id, end, document_len, &end_byte)?;
3662
3663    String::from_utf8(bytes).map_err(|err| {
3664        CoreError::Internal(format!(
3665            "stored canonical text slice for chunk {chunk_id} is invalid UTF-8: {err}"
3666        ))
3667    })
3668}
3669
3670fn validate_sqlite_utf8_boundary(
3671    chunk_id: i64,
3672    index: usize,
3673    document_len: usize,
3674    byte_at_index: &[u8],
3675) -> Result<()> {
3676    if index == 0 || index == document_len {
3677        return Ok(());
3678    }
3679    if byte_at_index.len() != 1 {
3680        return Err(CoreError::Internal(format!(
3681            "chunk {chunk_id} text boundary byte at {index} is missing"
3682        )));
3683    }
3684    if (0x80..0xC0).contains(&byte_at_index[0]) {
3685        return Err(CoreError::Internal(format!(
3686            "chunk {chunk_id} text span is not on UTF-8 boundaries"
3687        )));
3688    }
3689
3690    Ok(())
3691}
3692
3693pub(crate) fn chunk_text_from_canonical(document_text: &str, chunk: &ChunkRow) -> Result<String> {
3694    let label = format!("chunk {}", chunk.id);
3695    let end = validate_text_span(document_text, chunk.offset, chunk.length, &label)?;
3696
3697    Ok(document_text[chunk.offset..end].to_string())
3698}
3699
3700fn validate_text_span(
3701    document_text: &str,
3702    offset: usize,
3703    length: usize,
3704    label: &str,
3705) -> Result<usize> {
3706    let end = offset
3707        .checked_add(length)
3708        .ok_or_else(|| CoreError::Internal(format!("{label} text span overflows usize")))?;
3709    if end > document_text.len() {
3710        return Err(CoreError::Internal(format!(
3711            "{label} text span {}..{} exceeds document text length {}",
3712            offset,
3713            end,
3714            document_text.len()
3715        )));
3716    }
3717    if !document_text.is_char_boundary(offset) || !document_text.is_char_boundary(end) {
3718        return Err(CoreError::Internal(format!(
3719            "{label} text span {offset}..{end} is not on UTF-8 boundaries"
3720        )));
3721    }
3722
3723    Ok(end)
3724}
3725
3726pub(crate) fn missing_document_text_error(doc_id: i64) -> CoreError {
3727    KboltError::Internal(format!(
3728        "document {doc_id} is missing persisted canonical text; rebuild the kbolt cache"
3729    ))
3730    .into()
3731}
3732
3733#[cfg(test)]
3734mod tests;