Skip to main content

kbolt_core/storage/
mod.rs

1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3use std::sync::{Arc, Mutex, RwLock};
4
5use crate::error::{CoreError, Result};
6use crate::ingest::chunk::FinalChunkKind;
7use kbolt_types::{DiskUsage, KboltError};
8use rusqlite::types::{Type as SqlType, Value as SqlValue};
9use rusqlite::{params, params_from_iter, Connection, Error, ErrorCode};
10use tantivy::collector::TopDocs;
11use tantivy::query::{
12    BooleanQuery, BoostQuery, ConstScoreQuery, Occur, Query, TermQuery, TermSetQuery,
13};
14use tantivy::schema::{Field, IndexRecordOption, FAST, INDEXED, STORED, TEXT};
15use tantivy::tokenizer::TokenStream;
16use tantivy::{Index, IndexReader, IndexWriter, TantivyDocument, Term};
17use usearch::{IndexOptions, MetricKind, ScalarKind};
18
/// File name of the SQLite metadata database, joined onto the cache directory.
const DB_FILE: &str = "meta.sqlite";
/// Name of the built-in space: it is inserted on startup and cannot be renamed.
const DEFAULT_SPACE_NAME: &str = "default";
// NOTE(review): presumably the sub-directory of the cache dir that holds per-space
// index artifacts — confirm in `space_paths`.
const SPACES_DIR: &str = "spaces";
// NOTE(review): presumably the per-space Tantivy index directory name — confirm in `space_paths`.
const TANTIVY_DIR_NAME: &str = "tantivy";
// NOTE(review): presumably the per-space USearch index file name — confirm in `space_paths`.
const USEARCH_FILENAME: &str = "vectors.usearch";
// NOTE(review): presumably the version checked by `ensure_schema_version` — confirm.
const SCHEMA_VERSION: i64 = 3;
// NOTE(review): presumably the maximum number of ids bound into a single SQL
// `IN (...)` clause — confirm at the use sites.
const SQLITE_IN_CLAUSE_BATCH_SIZE: usize = 500;
26
// NOTE(review): presumably selects whether a USearch index is written to disk right
// away or left for a later flush — no use site is visible here; confirm.
#[derive(Clone, Copy)]
enum UsearchSaveMode {
    Immediate,
    Deferred,
}
32
/// Handle over the on-disk store: the SQLite metadata database plus the per-space
/// search indexes that are currently opened.
pub struct Storage {
    /// SQLite connection; every query locks this mutex first.
    db: Mutex<Connection>,
    /// Root directory given to `Storage::new`; the database file lives directly inside it.
    cache_dir: PathBuf,
    /// Indexes of the spaces that are currently opened, keyed by space name.
    spaces: RwLock<HashMap<String, Arc<SpaceIndexes>>>,
    // NOTE(review): presumably the names of spaces whose USearch index has changes
    // that were not yet saved — confirm at the use sites.
    dirty_usearch_spaces: Mutex<HashSet<String>>,
}
39
/// One row of the `spaces` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SpaceRow {
    /// Primary key.
    pub id: i64,
    /// Unique space name.
    pub name: String,
    /// Optional free-form description.
    pub description: Option<String>,
    /// Creation timestamp as stored (the column defaults to a UTC `%Y-%m-%dT%H:%M:%SZ` string).
    pub created: String,
}
47
/// One row of the `collections` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollectionRow {
    /// Primary key.
    pub id: i64,
    /// Owning space (`spaces.id`).
    pub space_id: i64,
    /// Collection name, unique within its space.
    pub name: String,
    /// Path recorded for the collection.
    pub path: PathBuf,
    /// Optional free-form description.
    pub description: Option<String>,
    // NOTE(review): presumably the file extensions the collection is restricted to,
    // with `None` meaning "no restriction" — confirm against the ingest code.
    pub extensions: Option<Vec<String>>,
    /// Creation timestamp as stored.
    pub created: String,
    /// Last-update timestamp as stored.
    pub updated: String,
}
59
/// One row of the `documents` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DocumentRow {
    /// Primary key.
    pub id: i64,
    /// Owning collection (`collections.id`).
    pub collection_id: i64,
    /// Document path, unique within its collection.
    pub path: String,
    /// Stored title.
    pub title: String,
    /// Where `title` came from.
    pub title_source: DocumentTitleSource,
    /// Stored content hash.
    pub hash: String,
    /// Stored modification timestamp.
    pub modified: String,
    /// Whether the document is active (`upsert_document` always sets this to true).
    pub active: bool,
    /// Deactivation timestamp; cleared whenever the document is upserted.
    pub deactivated_at: Option<String>,
    /// Set by `upsert_document`.
    // NOTE(review): presumably marks the document for full-text re-indexing — confirm.
    pub fts_dirty: bool,
}
73
// NOTE(review): presumably a per-document summary row for file listings, with chunk
// and embedding counts — the producing query is not visible here; confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileListRow {
    pub doc_id: i64,
    pub path: String,
    pub title: String,
    pub hash: String,
    pub active: bool,
    pub chunk_count: usize,
    pub embedded_chunk_count: usize,
}
84
/// One row of the `chunks` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkRow {
    /// Primary key.
    pub id: i64,
    /// Owning document (`documents.id`).
    pub doc_id: i64,
    /// Sequence number, unique within the document.
    pub seq: i32,
    // NOTE(review): `offset`/`length` presumably locate the chunk inside the document
    // text; the unit (bytes vs. chars) is not visible here — confirm.
    pub offset: usize,
    pub length: usize,
    /// Optional heading stored with the chunk.
    pub heading: Option<String>,
    /// Chunk kind (stored as text; the column defaults to `'section'`).
    pub kind: FinalChunkKind,
    /// Optional retrieval prefix stored with the chunk.
    pub retrieval_prefix: Option<String>,
}
96
/// Chunk fields without `id` and `doc_id`; mirrors the remaining fields of `ChunkRow`.
// NOTE(review): presumably the input shape for inserting new chunks — confirm at the
// insert site.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkInsert {
    pub seq: i32,
    pub offset: usize,
    pub length: usize,
    pub heading: Option<String>,
    pub kind: FinalChunkKind,
    pub retrieval_prefix: Option<String>,
}
106
/// One row of the `document_texts` table (at most one per document, keyed by `doc_id`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DocumentTextRow {
    /// Owning document (`documents.id`).
    pub doc_id: i64,
    pub extractor_key: String,
    pub source_hash: String,
    pub text_hash: String,
    /// Generation key (the column defaults to the empty string).
    pub generation_key: String,
    /// Stored document text.
    pub text: String,
    /// Creation timestamp as stored.
    pub created: String,
}
117
/// A chunk row together with an extractor key and a text.
// NOTE(review): `text` is presumably the chunk's slice of the stored document text —
// the producing query is not visible here; confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkTextRow {
    pub chunk: ChunkRow,
    pub extractor_key: String,
    pub text: String,
}
124
/// Borrowed bundle of document fields, document-text fields, and chunks.
// NOTE(review): presumably the input for atomically replacing a document's stored
// text and chunks — the consuming method is not visible here; confirm.
pub struct DocumentGenerationReplace<'a> {
    pub collection_id: i64,
    pub path: &'a str,
    pub title: &'a str,
    pub title_source: DocumentTitleSource,
    pub hash: &'a str,
    pub modified: &'a str,
    pub extractor_key: &'a str,
    pub source_hash: &'a str,
    pub text_hash: &'a str,
    pub generation_key: &'a str,
    pub text: &'a str,
    pub chunks: &'a [ChunkInsert],
}
139
// NOTE(review): presumably the outcome of applying a `DocumentGenerationReplace`:
// the document id, the chunk ids that were replaced, and the newly created chunk ids
// — confirm at the producing method.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DocumentGenerationReplaceResult {
    pub doc_id: i64,
    pub old_chunk_ids: Vec<i64>,
    pub chunk_ids: Vec<i64>,
}
146
/// A chunk together with its document path, collection path, and space name.
// NOTE(review): presumably describes a chunk that still needs an embedding — confirm
// at the producing query.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EmbedRecord {
    pub chunk: ChunkRow,
    pub doc_path: String,
    pub collection_path: PathBuf,
    pub space_name: String,
}
154
/// A document's identifying fields, its location (collection path, space name), and
/// its chunks.
// NOTE(review): presumably one document whose `fts_dirty` flag is set and that needs
// full-text re-indexing — confirm at the producing query.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FtsDirtyRecord {
    pub doc_id: i64,
    pub doc_path: String,
    pub doc_title: String,
    pub doc_title_source: DocumentTitleSource,
    pub doc_hash: String,
    pub collection_path: PathBuf,
    pub space_name: String,
    pub chunks: Vec<ChunkRow>,
}
166
/// A document id with the name of its space and the ids of its chunks.
// NOTE(review): presumably a deactivated document that is eligible for permanent
// removal — confirm at the producing query.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReapableDocument {
    pub doc_id: i64,
    pub space_name: String,
    pub chunk_ids: Vec<i64>,
}
173
/// Per-chunk payload whose fields line up with `TantivyFields`.
// NOTE(review): presumably the data written into the Tantivy index for one chunk —
// confirm at the indexing code.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TantivyEntry {
    pub chunk_id: i64,
    pub doc_id: i64,
    pub filepath: String,
    // NOTE(review): presumably the result of `DocumentTitleSource::semantic_title` — confirm.
    pub semantic_title: Option<String>,
    pub heading: Option<String>,
    pub body: String,
}
183
184#[derive(Debug, Clone, Copy, PartialEq, Eq)]
185pub enum DocumentTitleSource {
186    Extracted,
187    FilenameFallback,
188}
189
190impl DocumentTitleSource {
191    pub fn as_sql(self) -> &'static str {
192        match self {
193            Self::Extracted => "extracted",
194            Self::FilenameFallback => "filename_fallback",
195        }
196    }
197
198    fn from_sql(raw: &str) -> std::result::Result<Self, KboltError> {
199        match raw {
200            "extracted" => Ok(Self::Extracted),
201            "filename_fallback" => Ok(Self::FilenameFallback),
202            other => Err(KboltError::InvalidInput(format!(
203                "invalid stored document title source: {other}"
204            ))),
205        }
206    }
207
208    pub fn semantic_title(self, title: &str) -> Option<&str> {
209        matches!(self, Self::Extracted)
210            .then_some(title.trim())
211            .filter(|title| !title.is_empty())
212    }
213}
214
/// A chunk id paired with a score.
// NOTE(review): presumably one result of a BM25 (Tantivy) search, higher score being
// better — confirm at the search code.
#[derive(Debug, Clone, PartialEq)]
pub struct BM25Hit {
    pub chunk_id: i64,
    pub score: f32,
}
220
/// A chunk id paired with a distance.
// NOTE(review): presumably one result of a vector (USearch) search, smaller distance
// being closer — confirm at the search code.
#[derive(Debug, Clone, PartialEq)]
pub struct DenseHit {
    pub chunk_id: i64,
    pub distance: f32,
}
226
/// A set of document ids together with a chunk count.
// NOTE(review): presumably summarises the documents/chunks a search is restricted to
// — confirm at the producing query.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchScopeSummary {
    pub document_ids: Vec<i64>,
    pub chunk_count: usize,
}
232
/// Outcome of looking up which space contains a collection with a given name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SpaceResolution {
    /// Exactly one space contains a collection with that name.
    Found(SpaceRow),
    /// Several spaces contain a collection with that name; holds their names.
    Ambiguous(Vec<String>),
    /// No space contains a collection with that name.
    NotFound,
}
239
/// The opened search indexes of one space.
struct SpaceIndexes {
    /// Directory of the Tantivy index (kept but unused, hence the underscore).
    _tantivy_dir: PathBuf,
    /// Path of the USearch index file.
    usearch_path: PathBuf,
    /// Opened Tantivy index.
    tantivy_index: Index,
    /// Reader created from `tantivy_index` when the space is opened.
    tantivy_reader: IndexReader,
    /// Tantivy writer slot; starts out as `None` when the space is opened.
    // NOTE(review): presumably created lazily on first write — confirm.
    tantivy_writer: Mutex<Option<IndexWriter>>,
    /// Opened USearch index.
    usearch_index: RwLock<usearch::Index>,
    /// Field handles resolved from the Tantivy schema.
    fields: TantivyFields,
}
249
/// Tantivy field handles, resolved from the index schema when a space is opened.
#[derive(Debug, Clone, Copy)]
struct TantivyFields {
    chunk_id: Field,
    doc_id: Field,
    filepath: Field,
    title: Field,
    heading: Field,
    body: Field,
}
259
/// A Tantivy field with a boost factor and an index record option.
// NOTE(review): presumably describes how one field contributes to a BM25 query —
// confirm at the query-building code.
#[derive(Debug, Clone, Copy)]
struct Bm25FieldSpec {
    field: Field,
    boost: f32,
    index_record_option: IndexRecordOption,
}
266
267impl Storage {
268    pub fn new(cache_dir: &Path) -> Result<Self> {
269        std::fs::create_dir_all(cache_dir)?;
270        let db_path = cache_dir.join(DB_FILE);
271        let conn = Connection::open(db_path)?;
272        reject_incompatible_legacy_index(&conn)?;
273        conn.execute_batch(
274            r#"
275PRAGMA foreign_keys = ON;
276PRAGMA journal_mode = WAL;
277
278CREATE TABLE IF NOT EXISTS spaces (
279    id          INTEGER PRIMARY KEY,
280    name        TEXT NOT NULL UNIQUE,
281    description TEXT,
282    created     TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ','now'))
283);
284
285CREATE TABLE IF NOT EXISTS collections (
286    id          INTEGER PRIMARY KEY,
287    space_id    INTEGER NOT NULL REFERENCES spaces(id) ON DELETE CASCADE,
288    name        TEXT NOT NULL,
289    path        TEXT NOT NULL,
290    description TEXT,
291    extensions  TEXT,
292    created     TEXT NOT NULL,
293    updated     TEXT NOT NULL,
294    UNIQUE(space_id, name)
295);
296
297CREATE TABLE IF NOT EXISTS documents (
298    id              INTEGER PRIMARY KEY,
299    collection_id   INTEGER NOT NULL REFERENCES collections(id) ON DELETE CASCADE,
300    path            TEXT NOT NULL,
301    title           TEXT NOT NULL,
302    title_source    TEXT NOT NULL DEFAULT 'extracted',
303    hash            TEXT NOT NULL,
304    modified        TEXT NOT NULL,
305    active          INTEGER NOT NULL DEFAULT 1,
306    deactivated_at  TEXT,
307    fts_dirty       INTEGER NOT NULL DEFAULT 0,
308    UNIQUE(collection_id, path)
309);
310CREATE INDEX IF NOT EXISTS idx_documents_collection ON documents(collection_id, active);
311CREATE INDEX IF NOT EXISTS idx_documents_hash ON documents(hash);
312CREATE INDEX IF NOT EXISTS idx_documents_fts_dirty ON documents(fts_dirty) WHERE fts_dirty = 1;
313
314CREATE TABLE IF NOT EXISTS chunks (
315    id       INTEGER PRIMARY KEY,
316    doc_id   INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
317    seq      INTEGER NOT NULL,
318    offset   INTEGER NOT NULL,
319    length   INTEGER NOT NULL,
320    heading  TEXT,
321    kind     TEXT NOT NULL DEFAULT 'section',
322    retrieval_prefix TEXT,
323    UNIQUE(doc_id, seq)
324);
325CREATE INDEX IF NOT EXISTS idx_chunks_doc ON chunks(doc_id);
326
327CREATE TABLE IF NOT EXISTS embeddings (
328    chunk_id    INTEGER NOT NULL REFERENCES chunks(id) ON DELETE CASCADE,
329    model       TEXT NOT NULL,
330    embedded_at TEXT NOT NULL,
331    PRIMARY KEY (chunk_id, model)
332);
333
334CREATE TABLE IF NOT EXISTS document_texts (
335    doc_id            INTEGER PRIMARY KEY REFERENCES documents(id) ON DELETE CASCADE,
336    extractor_key     TEXT NOT NULL,
337    source_hash       TEXT NOT NULL,
338    text_hash         TEXT NOT NULL,
339    generation_key    TEXT NOT NULL DEFAULT '',
340    text              TEXT NOT NULL,
341    created           TEXT NOT NULL
342);
343"#,
344        )?;
345        ensure_documents_title_source_column(&conn)?;
346        ensure_document_texts_generation_key_column(&conn)?;
347        ensure_chunks_retrieval_prefix_column(&conn)?;
348        ensure_schema_version(&conn)?;
349
350        conn.execute(
351            "INSERT OR IGNORE INTO spaces (name, description) VALUES (?1, NULL)",
352            params![DEFAULT_SPACE_NAME],
353        )?;
354
355        let storage = Self {
356            db: Mutex::new(conn),
357            cache_dir: cache_dir.to_path_buf(),
358            spaces: RwLock::new(HashMap::new()),
359            dirty_usearch_spaces: Mutex::new(HashSet::new()),
360        };
361        storage.open_space(DEFAULT_SPACE_NAME)?;
362
363        Ok(storage)
364    }
365
366    pub fn open_space(&self, name: &str) -> Result<()> {
367        let _space = self.get_space(name)?;
368
369        {
370            let spaces = self
371                .spaces
372                .read()
373                .map_err(|_| CoreError::poisoned("spaces"))?;
374            if spaces.contains_key(name) {
375                return Ok(());
376            }
377        }
378
379        let (tantivy_dir, usearch_path) = self.space_paths(name);
380        std::fs::create_dir_all(&tantivy_dir)?;
381        std::fs::OpenOptions::new()
382            .create(true)
383            .append(true)
384            .open(&usearch_path)?;
385        let tantivy_index = open_or_create_tantivy_index(&tantivy_dir)?;
386        let tantivy_reader = tantivy_index.reader()?;
387        let usearch_index = open_or_create_usearch_index(&usearch_path)?;
388        let fields = tantivy_fields_from_schema(&tantivy_index.schema())?;
389
390        let mut spaces = self
391            .spaces
392            .write()
393            .map_err(|_| CoreError::poisoned("spaces"))?;
394        spaces
395            .entry(name.to_string())
396            .or_insert(Arc::new(SpaceIndexes {
397                _tantivy_dir: tantivy_dir,
398                usearch_path,
399                tantivy_index,
400                tantivy_reader,
401                tantivy_writer: Mutex::new(None),
402                usearch_index: RwLock::new(usearch_index),
403                fields,
404            }));
405
406        Ok(())
407    }
408
409    pub fn close_space(&self, name: &str) -> Result<()> {
410        let _space = self.get_space(name)?;
411        let mut spaces = self
412            .spaces
413            .write()
414            .map_err(|_| CoreError::poisoned("spaces"))?;
415        let _removed = spaces.remove(name);
416        Ok(())
417    }
418
419    pub fn create_space(&self, name: &str, description: Option<&str>) -> Result<i64> {
420        let space_id = {
421            let conn = self
422                .db
423                .lock()
424                .map_err(|_| CoreError::poisoned("database"))?;
425
426            let result = conn.execute(
427                "INSERT INTO spaces (name, description) VALUES (?1, ?2)",
428                params![name, description],
429            );
430
431            match result {
432                Ok(_) => conn.last_insert_rowid(),
433                Err(err) => {
434                    return match err {
435                        Error::SqliteFailure(sqlite_err, _)
436                            if sqlite_err.code == ErrorCode::ConstraintViolation =>
437                        {
438                            Err(KboltError::SpaceAlreadyExists {
439                                name: name.to_string(),
440                            }
441                            .into())
442                        }
443                        other => Err(other.into()),
444                    };
445                }
446            }
447        };
448
449        if let Err(open_err) = self.open_space(name) {
450            let rollback_result = self
451                .db
452                .lock()
453                .map_err(|_| CoreError::poisoned("database"))?
454                .execute("DELETE FROM spaces WHERE id = ?1", params![space_id]);
455
456            if let Err(rollback_err) = rollback_result {
457                return Err(CoreError::Internal(format!(
458                    "failed to provision indexes for space '{name}': {open_err}; rollback failed: {rollback_err}"
459                )));
460            }
461
462            return Err(open_err);
463        }
464
465        Ok(space_id)
466    }
467
468    pub fn get_space(&self, name: &str) -> Result<SpaceRow> {
469        let conn = self
470            .db
471            .lock()
472            .map_err(|_| CoreError::poisoned("database"))?;
473        let mut stmt =
474            conn.prepare("SELECT id, name, description, created FROM spaces WHERE name = ?1")?;
475
476        let row = stmt.query_row(params![name], decode_space_row);
477
478        match row {
479            Ok(space) => Ok(space),
480            Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
481                name: name.to_string(),
482            }
483            .into()),
484            Err(err) => Err(err.into()),
485        }
486    }
487
488    pub fn get_space_by_id(&self, id: i64) -> Result<SpaceRow> {
489        let conn = self
490            .db
491            .lock()
492            .map_err(|_| CoreError::poisoned("database"))?;
493        let mut stmt =
494            conn.prepare("SELECT id, name, description, created FROM spaces WHERE id = ?1")?;
495
496        let row = stmt.query_row(params![id], decode_space_row);
497
498        match row {
499            Ok(space) => Ok(space),
500            Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
501                name: format!("id={id}"),
502            }
503            .into()),
504            Err(err) => Err(err.into()),
505        }
506    }
507
508    pub fn list_spaces(&self) -> Result<Vec<SpaceRow>> {
509        let conn = self
510            .db
511            .lock()
512            .map_err(|_| CoreError::poisoned("database"))?;
513        let mut stmt = conn.prepare(
514            "SELECT id, name, description, created
515             FROM spaces
516             ORDER BY name ASC",
517        )?;
518
519        let rows = stmt.query_map([], decode_space_row)?;
520
521        let spaces = rows.collect::<std::result::Result<Vec<_>, _>>()?;
522        Ok(spaces)
523    }
524
525    pub fn find_space_for_collection(&self, collection: &str) -> Result<SpaceResolution> {
526        let conn = self
527            .db
528            .lock()
529            .map_err(|_| CoreError::poisoned("database"))?;
530        let mut stmt = conn.prepare(
531            "SELECT s.id, s.name, s.description, s.created
532             FROM spaces s
533             JOIN collections c ON c.space_id = s.id
534             WHERE c.name = ?1
535             ORDER BY s.name ASC",
536        )?;
537        let rows = stmt.query_map(params![collection], decode_space_row)?;
538        let matches = rows.collect::<std::result::Result<Vec<_>, _>>()?;
539
540        if matches.is_empty() {
541            return Ok(SpaceResolution::NotFound);
542        }
543
544        if matches.len() == 1 {
545            return Ok(SpaceResolution::Found(matches[0].clone()));
546        }
547
548        let spaces = matches.into_iter().map(|space| space.name).collect();
549        Ok(SpaceResolution::Ambiguous(spaces))
550    }
551
552    pub fn delete_space(&self, name: &str) -> Result<()> {
553        if name == DEFAULT_SPACE_NAME {
554            let conn = self
555                .db
556                .lock()
557                .map_err(|_| CoreError::poisoned("database"))?;
558            conn.execute(
559                "DELETE FROM collections
560                 WHERE space_id = (SELECT id FROM spaces WHERE name = ?1)",
561                params![name],
562            )?;
563            drop(conn);
564
565            self.unload_space(name)?;
566            self.remove_space_artifacts(name)?;
567            self.open_space(name)?;
568            return Ok(());
569        }
570
571        let conn = self
572            .db
573            .lock()
574            .map_err(|_| CoreError::poisoned("database"))?;
575        let deleted = conn.execute("DELETE FROM spaces WHERE name = ?1", params![name])?;
576        drop(conn);
577
578        if deleted == 0 {
579            return Err(KboltError::SpaceNotFound {
580                name: name.to_string(),
581            }
582            .into());
583        }
584
585        self.unload_space(name)?;
586        self.remove_space_artifacts(name)?;
587
588        Ok(())
589    }
590
591    pub fn rename_space(&self, old: &str, new: &str) -> Result<()> {
592        if old == DEFAULT_SPACE_NAME {
593            return Err(
594                KboltError::Config("cannot rename reserved space: default".to_string()).into(),
595            );
596        }
597
598        let conn = self
599            .db
600            .lock()
601            .map_err(|_| CoreError::poisoned("database"))?;
602
603        let result = conn.execute(
604            "UPDATE spaces SET name = ?1 WHERE name = ?2",
605            params![new, old],
606        );
607        drop(conn);
608
609        match result {
610            Ok(0) => Err(KboltError::SpaceNotFound {
611                name: old.to_string(),
612            }
613            .into()),
614            Ok(_) => {
615                if let Err(rename_err) = self.rename_space_artifacts(old, new) {
616                    let rollback = self
617                        .db
618                        .lock()
619                        .map_err(|_| CoreError::poisoned("database"))?
620                        .execute(
621                            "UPDATE spaces SET name = ?1 WHERE name = ?2",
622                            params![old, new],
623                        );
624
625                    if let Err(rollback_err) = rollback {
626                        return Err(CoreError::Internal(format!(
627                            "failed to rename space artifacts from '{old}' to '{new}': {rename_err}; rollback failed: {rollback_err}"
628                        )));
629                    }
630
631                    return Err(rename_err);
632                }
633
634                self.unload_space(old)?;
635                if let Err(open_err) = self.open_space(new) {
636                    let _ = self.rename_space_artifacts(new, old);
637                    let _ = self
638                        .db
639                        .lock()
640                        .map_err(|_| CoreError::poisoned("database"))?
641                        .execute(
642                            "UPDATE spaces SET name = ?1 WHERE name = ?2",
643                            params![old, new],
644                        );
645                    let _ = self.open_space(old);
646                    return Err(open_err);
647                }
648
649                Ok(())
650            }
651            Err(Error::SqliteFailure(sqlite_err, _))
652                if sqlite_err.code == ErrorCode::ConstraintViolation =>
653            {
654                Err(KboltError::SpaceAlreadyExists {
655                    name: new.to_string(),
656                }
657                .into())
658            }
659            Err(err) => Err(err.into()),
660        }
661    }
662
663    pub fn update_space_description(&self, name: &str, description: &str) -> Result<()> {
664        let conn = self
665            .db
666            .lock()
667            .map_err(|_| CoreError::poisoned("database"))?;
668
669        let updated = conn.execute(
670            "UPDATE spaces SET description = ?1 WHERE name = ?2",
671            params![description, name],
672        )?;
673
674        if updated == 0 {
675            return Err(KboltError::SpaceNotFound {
676                name: name.to_string(),
677            }
678            .into());
679        }
680
681        Ok(())
682    }
683
684    pub fn create_collection(
685        &self,
686        space_id: i64,
687        name: &str,
688        path: &Path,
689        description: Option<&str>,
690        extensions: Option<&[String]>,
691    ) -> Result<i64> {
692        let conn = self
693            .db
694            .lock()
695            .map_err(|_| CoreError::poisoned("database"))?;
696
697        let space_name = lookup_space_name(&conn, space_id)?;
698        let extensions_json = serialize_extensions(extensions)?;
699        let result = conn.execute(
700            "INSERT INTO collections (space_id, name, path, description, extensions, created, updated)
701             VALUES (?1, ?2, ?3, ?4, ?5, strftime('%Y-%m-%dT%H:%M:%SZ','now'), strftime('%Y-%m-%dT%H:%M:%SZ','now'))",
702            params![space_id, name, path.to_string_lossy(), description, extensions_json],
703        );
704
705        match result {
706            Ok(_) => Ok(conn.last_insert_rowid()),
707            Err(Error::SqliteFailure(sqlite_err, _))
708                if sqlite_err.code == ErrorCode::ConstraintViolation =>
709            {
710                Err(KboltError::CollectionAlreadyExists {
711                    name: name.to_string(),
712                    space: space_name,
713                }
714                .into())
715            }
716            Err(err) => Err(err.into()),
717        }
718    }
719
720    pub fn get_collection(&self, space_id: i64, name: &str) -> Result<CollectionRow> {
721        let conn = self
722            .db
723            .lock()
724            .map_err(|_| CoreError::poisoned("database"))?;
725        let mut stmt = conn.prepare(
726            "SELECT id, space_id, name, path, description, extensions, created, updated
727             FROM collections
728             WHERE space_id = ?1 AND name = ?2",
729        )?;
730
731        let result = stmt.query_row(params![space_id, name], decode_collection_row);
732        match result {
733            Ok(row) => Ok(row),
734            Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
735                name: name.to_string(),
736            }
737            .into()),
738            Err(err) => Err(err.into()),
739        }
740    }
741
742    pub fn get_collection_by_id(&self, id: i64) -> Result<CollectionRow> {
743        let conn = self
744            .db
745            .lock()
746            .map_err(|_| CoreError::poisoned("database"))?;
747        let mut stmt = conn.prepare(
748            "SELECT id, space_id, name, path, description, extensions, created, updated
749             FROM collections
750             WHERE id = ?1",
751        )?;
752
753        let result = stmt.query_row(params![id], decode_collection_row);
754        match result {
755            Ok(row) => Ok(row),
756            Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
757                name: format!("id={id}"),
758            }
759            .into()),
760            Err(err) => Err(err.into()),
761        }
762    }
763
764    pub fn list_collections(&self, space_id: Option<i64>) -> Result<Vec<CollectionRow>> {
765        let conn = self
766            .db
767            .lock()
768            .map_err(|_| CoreError::poisoned("database"))?;
769
770        let (sql, params): (&str, Vec<i64>) = match space_id {
771            Some(id) => (
772                "SELECT id, space_id, name, path, description, extensions, created, updated
773                 FROM collections
774                 WHERE space_id = ?1
775                 ORDER BY name ASC",
776                vec![id],
777            ),
778            None => (
779                "SELECT id, space_id, name, path, description, extensions, created, updated
780                 FROM collections
781                 ORDER BY space_id ASC, name ASC",
782                Vec::new(),
783            ),
784        };
785
786        let mut stmt = conn.prepare(sql)?;
787        let rows = if params.is_empty() {
788            stmt.query_map([], decode_collection_row)?
789        } else {
790            stmt.query_map(params![params[0]], decode_collection_row)?
791        };
792        let collections = rows.collect::<std::result::Result<Vec<_>, _>>()?;
793        Ok(collections)
794    }
795
796    pub fn count_collections_in_space(&self, space_id: i64) -> Result<usize> {
797        let conn = self
798            .db
799            .lock()
800            .map_err(|_| CoreError::poisoned("database"))?;
801        let _space_name = lookup_space_name(&conn, space_id)?;
802
803        query_count(
804            &conn,
805            "SELECT COUNT(*) FROM collections WHERE space_id = ?1",
806            params![space_id],
807        )
808    }
809
810    pub fn delete_collection(&self, space_id: i64, name: &str) -> Result<()> {
811        let conn = self
812            .db
813            .lock()
814            .map_err(|_| CoreError::poisoned("database"))?;
815        let _space_name = lookup_space_name(&conn, space_id)?;
816
817        let deleted = conn.execute(
818            "DELETE FROM collections WHERE space_id = ?1 AND name = ?2",
819            params![space_id, name],
820        )?;
821
822        if deleted == 0 {
823            return Err(KboltError::CollectionNotFound {
824                name: name.to_string(),
825            }
826            .into());
827        }
828
829        Ok(())
830    }
831
832    pub fn rename_collection(&self, space_id: i64, old: &str, new: &str) -> Result<()> {
833        let conn = self
834            .db
835            .lock()
836            .map_err(|_| CoreError::poisoned("database"))?;
837        let space_name = lookup_space_name(&conn, space_id)?;
838        let result = conn.execute(
839            "UPDATE collections
840             SET name = ?1, updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
841             WHERE space_id = ?2 AND name = ?3",
842            params![new, space_id, old],
843        );
844
845        match result {
846            Ok(0) => Err(KboltError::CollectionNotFound {
847                name: old.to_string(),
848            }
849            .into()),
850            Ok(_) => Ok(()),
851            Err(Error::SqliteFailure(sqlite_err, _))
852                if sqlite_err.code == ErrorCode::ConstraintViolation =>
853            {
854                Err(KboltError::CollectionAlreadyExists {
855                    name: new.to_string(),
856                    space: space_name,
857                }
858                .into())
859            }
860            Err(err) => Err(err.into()),
861        }
862    }
863
864    pub fn update_collection_description(
865        &self,
866        space_id: i64,
867        name: &str,
868        desc: &str,
869    ) -> Result<()> {
870        let conn = self
871            .db
872            .lock()
873            .map_err(|_| CoreError::poisoned("database"))?;
874        let _space_name = lookup_space_name(&conn, space_id)?;
875
876        let updated = conn.execute(
877            "UPDATE collections
878             SET description = ?1, updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
879             WHERE space_id = ?2 AND name = ?3",
880            params![desc, space_id, name],
881        )?;
882
883        if updated == 0 {
884            return Err(KboltError::CollectionNotFound {
885                name: name.to_string(),
886            }
887            .into());
888        }
889
890        Ok(())
891    }
892
893    pub fn update_collection_timestamp(&self, collection_id: i64) -> Result<()> {
894        let conn = self
895            .db
896            .lock()
897            .map_err(|_| CoreError::poisoned("database"))?;
898
899        let updated = conn.execute(
900            "UPDATE collections
901             SET updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
902             WHERE id = ?1",
903            params![collection_id],
904        )?;
905
906        if updated == 0 {
907            return Err(KboltError::CollectionNotFound {
908                name: format!("id={collection_id}"),
909            }
910            .into());
911        }
912
913        Ok(())
914    }
915
916    pub fn upsert_document(
917        &self,
918        collection_id: i64,
919        path: &str,
920        title: &str,
921        title_source: DocumentTitleSource,
922        hash: &str,
923        modified: &str,
924    ) -> Result<i64> {
925        let conn = self
926            .db
927            .lock()
928            .map_err(|_| CoreError::poisoned("database"))?;
929        let _collection_name = lookup_collection_name(&conn, collection_id)?;
930
931        let id = conn.query_row(
932            "INSERT INTO documents (collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty)
933             VALUES (?1, ?2, ?3, ?4, ?5, ?6, 1, NULL, 1)
934             ON CONFLICT(collection_id, path) DO UPDATE SET
935                 title = excluded.title,
936                 title_source = excluded.title_source,
937                 hash = excluded.hash,
938                 modified = excluded.modified,
939                 active = 1,
940                 deactivated_at = NULL,
941                 fts_dirty = 1
942             RETURNING id",
943            params![
944                collection_id,
945                path,
946                title,
947                title_source.as_sql(),
948                hash,
949                modified
950            ],
951            |row| row.get(0),
952        )?;
953        Ok(id)
954    }
955
956    pub fn get_document_by_path(
957        &self,
958        collection_id: i64,
959        path: &str,
960    ) -> Result<Option<DocumentRow>> {
961        let conn = self
962            .db
963            .lock()
964            .map_err(|_| CoreError::poisoned("database"))?;
965        let _collection_name = lookup_collection_name(&conn, collection_id)?;
966        let mut stmt = conn.prepare(
967            "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
968             FROM documents
969             WHERE collection_id = ?1 AND path = ?2",
970        )?;
971
972        let result = stmt.query_row(params![collection_id, path], decode_document_row);
973        match result {
974            Ok(row) => Ok(Some(row)),
975            Err(Error::QueryReturnedNoRows) => Ok(None),
976            Err(err) => Err(err.into()),
977        }
978    }
979
980    pub fn get_document_by_id(&self, id: i64) -> Result<DocumentRow> {
981        let conn = self
982            .db
983            .lock()
984            .map_err(|_| CoreError::poisoned("database"))?;
985        let mut stmt = conn.prepare(
986            "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
987             FROM documents
988             WHERE id = ?1",
989        )?;
990
991        let result = stmt.query_row(params![id], decode_document_row);
992        match result {
993            Ok(row) => Ok(row),
994            Err(Error::QueryReturnedNoRows) => Err(KboltError::DocumentNotFound {
995                path: format!("id={id}"),
996            }
997            .into()),
998            Err(err) => Err(err.into()),
999        }
1000    }
1001
1002    pub fn get_documents_by_ids(&self, ids: &[i64]) -> Result<Vec<DocumentRow>> {
1003        if ids.is_empty() {
1004            return Ok(Vec::new());
1005        }
1006
1007        let conn = self
1008            .db
1009            .lock()
1010            .map_err(|_| CoreError::poisoned("database"))?;
1011
1012        let placeholders = vec!["?"; ids.len()].join(", ");
1013        let sql = format!(
1014            "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
1015             FROM documents
1016             WHERE id IN ({placeholders})"
1017        );
1018        let mut stmt = conn.prepare(&sql)?;
1019        let rows = stmt.query_map(params_from_iter(ids.iter()), decode_document_row)?;
1020        let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1021        Ok(docs)
1022    }
1023
1024    pub fn refresh_document_activity(&self, doc_id: i64, modified: &str) -> Result<()> {
1025        let conn = self
1026            .db
1027            .lock()
1028            .map_err(|_| CoreError::poisoned("database"))?;
1029
1030        let updated = conn.execute(
1031            "UPDATE documents
1032             SET modified = ?1,
1033                 active = 1,
1034                 deactivated_at = NULL
1035             WHERE id = ?2",
1036            params![modified, doc_id],
1037        )?;
1038
1039        if updated == 0 {
1040            return Err(KboltError::DocumentNotFound {
1041                path: format!("id={doc_id}"),
1042            }
1043            .into());
1044        }
1045
1046        Ok(())
1047    }
1048
1049    pub fn list_documents(
1050        &self,
1051        collection_id: i64,
1052        active_only: bool,
1053    ) -> Result<Vec<DocumentRow>> {
1054        let conn = self
1055            .db
1056            .lock()
1057            .map_err(|_| CoreError::poisoned("database"))?;
1058        let _collection_name = lookup_collection_name(&conn, collection_id)?;
1059        let active_only = i64::from(active_only);
1060        let mut stmt = conn.prepare(
1061            "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
1062             FROM documents
1063             WHERE collection_id = ?1
1064               AND (?2 = 0 OR active = 1)
1065             ORDER BY path ASC",
1066        )?;
1067        let rows = stmt.query_map(params![collection_id, active_only], decode_document_row)?;
1068        let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1069        Ok(docs)
1070    }
1071
1072    pub fn list_collection_file_rows(
1073        &self,
1074        collection_id: i64,
1075        active_only: bool,
1076    ) -> Result<Vec<FileListRow>> {
1077        let conn = self
1078            .db
1079            .lock()
1080            .map_err(|_| CoreError::poisoned("database"))?;
1081        let _collection_name = lookup_collection_name(&conn, collection_id)?;
1082        let active_only = i64::from(active_only);
1083        let mut stmt = conn.prepare(
1084            "SELECT d.id, d.path, d.title, d.hash, d.active,
1085                    COUNT(DISTINCT c.id) AS chunk_count,
1086                    COUNT(DISTINCT e.chunk_id) AS embedded_chunk_count
1087             FROM documents d
1088             LEFT JOIN chunks c ON c.doc_id = d.id
1089             LEFT JOIN embeddings e ON e.chunk_id = c.id
1090             WHERE d.collection_id = ?1
1091               AND (?2 = 0 OR d.active = 1)
1092             GROUP BY d.id, d.path, d.title, d.hash, d.active
1093             ORDER BY d.path ASC",
1094        )?;
1095        let rows = stmt.query_map(params![collection_id, active_only], |row| {
1096            let chunk_count: i64 = row.get(5)?;
1097            let embedded_chunk_count: i64 = row.get(6)?;
1098            Ok(FileListRow {
1099                doc_id: row.get(0)?,
1100                path: row.get(1)?,
1101                title: row.get(2)?,
1102                hash: row.get(3)?,
1103                active: row.get::<_, i64>(4)? != 0,
1104                chunk_count: chunk_count as usize,
1105                embedded_chunk_count: embedded_chunk_count as usize,
1106            })
1107        })?;
1108        let files = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1109        Ok(files)
1110    }
1111
1112    pub fn get_document_by_hash_prefix(&self, prefix: &str) -> Result<Vec<DocumentRow>> {
1113        let conn = self
1114            .db
1115            .lock()
1116            .map_err(|_| CoreError::poisoned("database"))?;
1117
1118        let pattern = format!("{prefix}%");
1119        let mut stmt = conn.prepare(
1120            "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
1121             FROM documents
1122             WHERE hash LIKE ?1
1123             ORDER BY id ASC",
1124        )?;
1125        let rows = stmt.query_map(params![pattern], decode_document_row)?;
1126        let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1127        Ok(docs)
1128    }
1129
1130    pub fn deactivate_document(&self, doc_id: i64) -> Result<()> {
1131        let conn = self
1132            .db
1133            .lock()
1134            .map_err(|_| CoreError::poisoned("database"))?;
1135
1136        let updated = conn.execute(
1137            "UPDATE documents
1138             SET active = 0,
1139                 deactivated_at = CASE
1140                    WHEN active = 1 THEN strftime('%Y-%m-%dT%H:%M:%SZ','now')
1141                    ELSE deactivated_at
1142                 END
1143             WHERE id = ?1",
1144            params![doc_id],
1145        )?;
1146
1147        if updated == 0 {
1148            return Err(KboltError::DocumentNotFound {
1149                path: format!("id={doc_id}"),
1150            }
1151            .into());
1152        }
1153
1154        Ok(())
1155    }
1156
1157    pub fn reactivate_document(&self, doc_id: i64) -> Result<()> {
1158        let conn = self
1159            .db
1160            .lock()
1161            .map_err(|_| CoreError::poisoned("database"))?;
1162
1163        let updated = conn.execute(
1164            "UPDATE documents
1165             SET active = 1, deactivated_at = NULL
1166             WHERE id = ?1",
1167            params![doc_id],
1168        )?;
1169
1170        if updated == 0 {
1171            return Err(KboltError::DocumentNotFound {
1172                path: format!("id={doc_id}"),
1173            }
1174            .into());
1175        }
1176
1177        Ok(())
1178    }
1179
1180    pub fn reap_documents(&self, older_than_days: u32) -> Result<Vec<i64>> {
1181        let reaped = self.list_reapable_documents(older_than_days)?;
1182        let doc_ids = reaped.iter().map(|item| item.doc_id).collect::<Vec<_>>();
1183        self.delete_documents(&doc_ids)?;
1184        Ok(doc_ids)
1185    }
1186
1187    pub fn list_reapable_documents(&self, older_than_days: u32) -> Result<Vec<ReapableDocument>> {
1188        self.list_reapable_documents_filtered(older_than_days, "", Vec::new())
1189    }
1190
1191    pub fn list_reapable_documents_in_space(
1192        &self,
1193        older_than_days: u32,
1194        space_id: i64,
1195    ) -> Result<Vec<ReapableDocument>> {
1196        self.list_reapable_documents_filtered(
1197            older_than_days,
1198            " AND c.space_id = ?",
1199            vec![SqlValue::Integer(space_id)],
1200        )
1201    }
1202
1203    pub fn list_reapable_documents_in_collections(
1204        &self,
1205        older_than_days: u32,
1206        collection_ids: &[i64],
1207    ) -> Result<Vec<ReapableDocument>> {
1208        if collection_ids.is_empty() {
1209            return Ok(Vec::new());
1210        }
1211
1212        let placeholders = vec!["?"; collection_ids.len()].join(", ");
1213        let clause = format!(" AND d.collection_id IN ({placeholders})");
1214        let params = collection_ids
1215            .iter()
1216            .map(|id| SqlValue::Integer(*id))
1217            .collect::<Vec<_>>();
1218        self.list_reapable_documents_filtered(older_than_days, &clause, params)
1219    }
1220
1221    fn list_reapable_documents_filtered(
1222        &self,
1223        older_than_days: u32,
1224        scope_clause: &str,
1225        scope_params: Vec<SqlValue>,
1226    ) -> Result<Vec<ReapableDocument>> {
1227        let conn = self
1228            .db
1229            .lock()
1230            .map_err(|_| CoreError::poisoned("database"))?;
1231
1232        let modifier = format!("-{} days", older_than_days);
1233        let sql = format!(
1234            "SELECT d.id, s.name
1235             FROM documents d
1236             JOIN collections c ON c.id = d.collection_id
1237             JOIN spaces s ON s.id = c.space_id
1238             WHERE d.active = 0
1239               AND d.deactivated_at IS NOT NULL
1240               AND d.deactivated_at <= strftime('%Y-%m-%dT%H:%M:%SZ', 'now', ?){scope_clause}
1241             ORDER BY d.id ASC"
1242        );
1243        let mut stmt = conn.prepare(&sql)?;
1244        let mut params = Vec::with_capacity(scope_params.len() + 1);
1245        params.push(SqlValue::Text(modifier));
1246        params.extend(scope_params);
1247        let rows = stmt.query_map(params_from_iter(params.iter()), |row| {
1248            Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1249        })?;
1250        let headers = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1251        drop(stmt);
1252
1253        let mut documents = Vec::with_capacity(headers.len());
1254        for (doc_id, space_name) in headers {
1255            documents.push(ReapableDocument {
1256                doc_id,
1257                space_name,
1258                chunk_ids: load_chunk_ids_for_doc(&conn, doc_id)?,
1259            });
1260        }
1261
1262        Ok(documents)
1263    }
1264
1265    pub fn delete_documents(&self, doc_ids: &[i64]) -> Result<()> {
1266        if doc_ids.is_empty() {
1267            return Ok(());
1268        }
1269
1270        let conn = self
1271            .db
1272            .lock()
1273            .map_err(|_| CoreError::poisoned("database"))?;
1274
1275        let placeholders = vec!["?"; doc_ids.len()].join(", ");
1276        let sql = format!("DELETE FROM documents WHERE id IN ({placeholders})");
1277        conn.execute(&sql, params_from_iter(doc_ids.iter()))?;
1278        Ok(())
1279    }
1280
1281    pub fn insert_chunks(&self, doc_id: i64, chunks: &[ChunkInsert]) -> Result<Vec<i64>> {
1282        if chunks.is_empty() {
1283            return Ok(Vec::new());
1284        }
1285
1286        let conn = self
1287            .db
1288            .lock()
1289            .map_err(|_| CoreError::poisoned("database"))?;
1290        let _doc = lookup_document_id(&conn, doc_id)?;
1291
1292        let tx = conn.unchecked_transaction()?;
1293        let mut stmt = tx.prepare(
1294            "INSERT INTO chunks (doc_id, seq, offset, length, heading, kind, retrieval_prefix)
1295             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1296        )?;
1297
1298        let mut ids = Vec::with_capacity(chunks.len());
1299        for chunk in chunks {
1300            stmt.execute(params![
1301                doc_id,
1302                chunk.seq,
1303                chunk.offset as i64,
1304                chunk.length as i64,
1305                chunk.heading,
1306                chunk.kind.as_storage_kind(),
1307                chunk.retrieval_prefix.as_deref(),
1308            ])?;
1309            ids.push(tx.last_insert_rowid());
1310        }
1311        drop(stmt);
1312        tx.commit()?;
1313        Ok(ids)
1314    }
1315
1316    pub fn delete_chunks_for_document(&self, doc_id: i64) -> Result<Vec<i64>> {
1317        let conn = self
1318            .db
1319            .lock()
1320            .map_err(|_| CoreError::poisoned("database"))?;
1321        let _doc = lookup_document_id(&conn, doc_id)?;
1322
1323        let mut stmt = conn.prepare("SELECT id FROM chunks WHERE doc_id = ?1 ORDER BY seq ASC")?;
1324        let rows = stmt.query_map(params![doc_id], |row| row.get::<_, i64>(0))?;
1325        let chunk_ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1326
1327        conn.execute("DELETE FROM chunks WHERE doc_id = ?1", params![doc_id])?;
1328        Ok(chunk_ids)
1329    }
1330
1331    pub fn get_chunks_for_document(&self, doc_id: i64) -> Result<Vec<ChunkRow>> {
1332        let conn = self
1333            .db
1334            .lock()
1335            .map_err(|_| CoreError::poisoned("database"))?;
1336        let _doc = lookup_document_id(&conn, doc_id)?;
1337
1338        let mut stmt = conn.prepare(
1339            "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1340             FROM chunks
1341             WHERE doc_id = ?1
1342             ORDER BY seq ASC",
1343        )?;
1344        let rows = stmt.query_map(params![doc_id], decode_chunk_row)?;
1345        let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1346        Ok(chunks)
1347    }
1348
1349    pub fn get_chunks_for_documents(&self, doc_ids: &[i64]) -> Result<HashMap<i64, Vec<ChunkRow>>> {
1350        if doc_ids.is_empty() {
1351            return Ok(HashMap::new());
1352        }
1353
1354        let mut requested = doc_ids.to_vec();
1355        requested.sort_unstable();
1356        requested.dedup();
1357
1358        let conn = self
1359            .db
1360            .lock()
1361            .map_err(|_| CoreError::poisoned("database"))?;
1362
1363        let placeholders = vec!["?"; requested.len()].join(", ");
1364        let sql = format!(
1365            "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1366             FROM chunks
1367             WHERE doc_id IN ({placeholders})
1368             ORDER BY doc_id ASC, seq ASC"
1369        );
1370        let mut stmt = conn.prepare(&sql)?;
1371        let rows = stmt.query_map(params_from_iter(requested.iter()), decode_chunk_row)?;
1372        let mut chunks_by_doc: HashMap<i64, Vec<ChunkRow>> = HashMap::new();
1373        for doc_id in requested {
1374            chunks_by_doc.insert(doc_id, Vec::new());
1375        }
1376        for row in rows {
1377            let chunk = row?;
1378            chunks_by_doc.entry(chunk.doc_id).or_default().push(chunk);
1379        }
1380
1381        Ok(chunks_by_doc)
1382    }
1383
1384    pub fn get_chunks_for_document_seq_ranges(
1385        &self,
1386        ranges: &[(i64, i32, i32)],
1387    ) -> Result<HashMap<i64, Vec<ChunkRow>>> {
1388        if ranges.is_empty() {
1389            return Ok(HashMap::new());
1390        }
1391
1392        let mut ranges_by_doc: HashMap<i64, Vec<(i32, i32)>> = HashMap::new();
1393        for (doc_id, min_seq, max_seq) in ranges {
1394            if min_seq > max_seq {
1395                return Err(CoreError::Internal(format!(
1396                    "chunk seq range min must be <= max for doc {doc_id}: {min_seq} > {max_seq}"
1397                )));
1398            }
1399
1400            ranges_by_doc
1401                .entry(*doc_id)
1402                .or_default()
1403                .push((*min_seq, *max_seq));
1404        }
1405
1406        for doc_ranges in ranges_by_doc.values_mut() {
1407            doc_ranges.sort_unstable();
1408            let mut merged: Vec<(i32, i32)> = Vec::with_capacity(doc_ranges.len());
1409            for (min_seq, max_seq) in doc_ranges.drain(..) {
1410                if let Some((_, merged_max)) = merged.last_mut() {
1411                    if min_seq <= merged_max.saturating_add(1) {
1412                        *merged_max = (*merged_max).max(max_seq);
1413                        continue;
1414                    }
1415                }
1416                merged.push((min_seq, max_seq));
1417            }
1418            *doc_ranges = merged;
1419        }
1420
1421        let conn = self
1422            .db
1423            .lock()
1424            .map_err(|_| CoreError::poisoned("database"))?;
1425        let mut stmt = conn.prepare(
1426            "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1427             FROM chunks
1428             WHERE doc_id = ?1 AND seq BETWEEN ?2 AND ?3
1429             ORDER BY seq ASC",
1430        )?;
1431
1432        let mut chunks_by_doc: HashMap<i64, Vec<ChunkRow>> = HashMap::new();
1433        for (doc_id, doc_ranges) in ranges_by_doc {
1434            chunks_by_doc.entry(doc_id).or_default();
1435            for (min_seq, max_seq) in doc_ranges {
1436                let rows = stmt.query_map(params![doc_id, min_seq, max_seq], decode_chunk_row)?;
1437                for row in rows {
1438                    chunks_by_doc.entry(doc_id).or_default().push(row?);
1439                }
1440            }
1441        }
1442
1443        Ok(chunks_by_doc)
1444    }
1445
1446    pub fn get_chunks(&self, chunk_ids: &[i64]) -> Result<Vec<ChunkRow>> {
1447        if chunk_ids.is_empty() {
1448            return Ok(Vec::new());
1449        }
1450
1451        let conn = self
1452            .db
1453            .lock()
1454            .map_err(|_| CoreError::poisoned("database"))?;
1455
1456        let placeholders = vec!["?"; chunk_ids.len()].join(", ");
1457        let sql = format!(
1458            "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1459             FROM chunks
1460             WHERE id IN ({placeholders})
1461             ORDER BY id ASC"
1462        );
1463        let mut stmt = conn.prepare(&sql)?;
1464        let rows = stmt.query_map(params_from_iter(chunk_ids.iter()), decode_chunk_row)?;
1465        let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1466        Ok(chunks)
1467    }
1468
1469    pub fn get_active_search_scope_summary_in_collections(
1470        &self,
1471        collection_ids: &[i64],
1472    ) -> Result<SearchScopeSummary> {
1473        if collection_ids.is_empty() {
1474            return Ok(SearchScopeSummary {
1475                document_ids: Vec::new(),
1476                chunk_count: 0,
1477            });
1478        }
1479
1480        let conn = self
1481            .db
1482            .lock()
1483            .map_err(|_| CoreError::poisoned("database"))?;
1484
1485        let placeholders = vec!["?"; collection_ids.len()].join(", ");
1486        let sql = format!(
1487            "SELECT d.id, COUNT(c.id)
1488             FROM chunks c
1489             JOIN documents d ON d.id = c.doc_id
1490             WHERE d.active = 1
1491               AND d.collection_id IN ({placeholders})
1492             GROUP BY d.id
1493             ORDER BY d.id ASC"
1494        );
1495        let mut stmt = conn.prepare(&sql)?;
1496        let rows = stmt.query_map(params_from_iter(collection_ids.iter()), |row| {
1497            Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
1498        })?;
1499
1500        let mut document_ids = Vec::new();
1501        let mut chunk_count = 0usize;
1502        for row in rows {
1503            let (doc_id, doc_chunk_count) = row?;
1504            let doc_chunk_count = usize::try_from(doc_chunk_count).map_err(|_| {
1505                CoreError::Internal(format!(
1506                    "active chunk count must be non-negative for document {doc_id}"
1507                ))
1508            })?;
1509            document_ids.push(doc_id);
1510            chunk_count = chunk_count.saturating_add(doc_chunk_count);
1511        }
1512
1513        Ok(SearchScopeSummary {
1514            document_ids,
1515            chunk_count,
1516        })
1517    }
1518
1519    pub fn count_active_chunks_in_collections(&self, collection_ids: &[i64]) -> Result<usize> {
1520        if collection_ids.is_empty() {
1521            return Ok(0);
1522        }
1523
1524        let conn = self
1525            .db
1526            .lock()
1527            .map_err(|_| CoreError::poisoned("database"))?;
1528
1529        let placeholders = vec!["?"; collection_ids.len()].join(", ");
1530        let sql = format!(
1531            "SELECT COUNT(*)
1532             FROM chunks c
1533             JOIN documents d ON d.id = c.doc_id
1534             WHERE d.active = 1
1535               AND d.collection_id IN ({placeholders})"
1536        );
1537        query_count(&conn, &sql, params_from_iter(collection_ids.iter()))
1538    }
1539
1540    pub fn has_inactive_documents_in_collections(&self, collection_ids: &[i64]) -> Result<bool> {
1541        if collection_ids.is_empty() {
1542            return Ok(false);
1543        }
1544
1545        let conn = self
1546            .db
1547            .lock()
1548            .map_err(|_| CoreError::poisoned("database"))?;
1549
1550        let placeholders = vec!["?"; collection_ids.len()].join(", ");
1551        let sql = format!(
1552            "SELECT EXISTS(
1553                 SELECT 1
1554                 FROM documents
1555                 WHERE active = 0
1556                   AND collection_id IN ({placeholders})
1557             )"
1558        );
1559        let exists: i64 = conn.query_row(&sql, params_from_iter(collection_ids.iter()), |row| {
1560            row.get(0)
1561        })?;
1562        Ok(exists != 0)
1563    }
1564
1565    pub fn get_active_chunk_ids_in_collections(&self, collection_ids: &[i64]) -> Result<Vec<i64>> {
1566        if collection_ids.is_empty() {
1567            return Ok(Vec::new());
1568        }
1569
1570        let conn = self
1571            .db
1572            .lock()
1573            .map_err(|_| CoreError::poisoned("database"))?;
1574
1575        let placeholders = vec!["?"; collection_ids.len()].join(", ");
1576        let sql = format!(
1577            "SELECT c.id
1578             FROM chunks c
1579             JOIN documents d ON d.id = c.doc_id
1580             WHERE d.active = 1
1581               AND d.collection_id IN ({placeholders})
1582             ORDER BY d.id ASC, c.seq ASC"
1583        );
1584        let mut stmt = conn.prepare(&sql)?;
1585        let rows = stmt.query_map(params_from_iter(collection_ids.iter()), |row| row.get(0))?;
1586        let chunk_ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1587        Ok(chunk_ids)
1588    }
1589
    /// Atomically replaces a document's stored text and chunks with a new
    /// generation.
    ///
    /// Inside one transaction this:
    /// 1. upserts the `documents` row for `(collection_id, path)`, marking it
    ///    active and `fts_dirty`;
    /// 2. records the ids of the chunks that currently belong to the
    ///    document;
    /// 3. upserts the `document_texts` row for the document;
    /// 4. deletes the document's existing chunks and inserts
    ///    `replacement.chunks`.
    ///
    /// # Returns
    ///
    /// The document id, the ids of the chunks that were replaced (ordered by
    /// `seq`), and the ids of the newly inserted chunks (in the order of
    /// `replacement.chunks`).
    ///
    /// # Errors
    ///
    /// Fails before touching the database if `validate_text_span` rejects any
    /// chunk's `offset`/`length` against `replacement.text`. Also fails if
    /// the database mutex is poisoned, if `lookup_collection_name` fails for
    /// `replacement.collection_id`, or if SQLite reports an error; in the
    /// latter case the transaction is dropped without being committed.
    pub fn replace_document_generation(
        &self,
        replacement: DocumentGenerationReplace<'_>,
    ) -> Result<DocumentGenerationReplaceResult> {
        // Validate every chunk span up front, before the lock is taken and
        // before any row is written.
        for chunk in replacement.chunks {
            validate_text_span(
                replacement.text,
                chunk.offset,
                chunk.length,
                &format!("chunk seq {}", chunk.seq),
            )?;
        }

        let conn = self
            .db
            .lock()
            .map_err(|_| CoreError::poisoned("database"))?;
        // Invoked only so that its error propagates; the name itself is unused.
        let _collection_name = lookup_collection_name(&conn, replacement.collection_id)?;

        let tx = conn.unchecked_transaction()?;
        // Step 1: insert or refresh the document row and learn its id.
        let doc_id: i64 = tx.query_row(
            "INSERT INTO documents (collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, 1, NULL, 1)
             ON CONFLICT(collection_id, path) DO UPDATE SET
                 title = excluded.title,
                 title_source = excluded.title_source,
                 hash = excluded.hash,
                 modified = excluded.modified,
                 active = 1,
                 deactivated_at = NULL,
                 fts_dirty = 1
             RETURNING id",
            params![
                replacement.collection_id,
                replacement.path,
                replacement.title,
                replacement.title_source.as_sql(),
                replacement.hash,
                replacement.modified
            ],
            |row| row.get(0),
        )?;

        // Step 2: remember the outgoing chunk ids; this must happen before
        // the DELETE below removes them.
        let old_chunk_ids = {
            let mut stmt =
                tx.prepare("SELECT id FROM chunks WHERE doc_id = ?1 ORDER BY seq ASC")?;
            let rows = stmt.query_map(params![doc_id], |row| row.get::<_, i64>(0))?;
            rows.collect::<std::result::Result<Vec<_>, _>>()?
        };

        // Step 3: insert or overwrite the stored text; `created` is reset to
        // the current time on both paths.
        tx.execute(
            "INSERT INTO document_texts (doc_id, extractor_key, source_hash, text_hash, generation_key, text, created)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%Y-%m-%dT%H:%M:%SZ','now'))
             ON CONFLICT(doc_id) DO UPDATE SET
                 extractor_key = excluded.extractor_key,
                 source_hash = excluded.source_hash,
                 text_hash = excluded.text_hash,
                 generation_key = excluded.generation_key,
                 text = excluded.text,
                 created = excluded.created",
            params![
                doc_id,
                replacement.extractor_key,
                replacement.source_hash,
                replacement.text_hash,
                replacement.generation_key,
                replacement.text
            ],
        )?;

        // Step 4: swap the chunk set.
        tx.execute("DELETE FROM chunks WHERE doc_id = ?1", params![doc_id])?;

        let mut stmt = tx.prepare(
            "INSERT INTO chunks (doc_id, seq, offset, length, heading, kind, retrieval_prefix)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
        )?;
        let mut chunk_ids = Vec::with_capacity(replacement.chunks.len());
        for chunk in replacement.chunks {
            stmt.execute(params![
                doc_id,
                chunk.seq,
                chunk.offset as i64,
                chunk.length as i64,
                chunk.heading,
                chunk.kind.as_storage_kind(),
                chunk.retrieval_prefix.as_deref(),
            ])?;
            chunk_ids.push(tx.last_insert_rowid());
        }
        // The statement is dropped explicitly before the transaction is
        // committed.
        drop(stmt);

        tx.commit()?;
        Ok(DocumentGenerationReplaceResult {
            doc_id,
            old_chunk_ids,
            chunk_ids,
        })
    }
1688
1689    pub fn put_document_text(
1690        &self,
1691        doc_id: i64,
1692        extractor_key: &str,
1693        source_hash: &str,
1694        text_hash: &str,
1695        generation_key: &str,
1696        text: &str,
1697    ) -> Result<()> {
1698        let conn = self
1699            .db
1700            .lock()
1701            .map_err(|_| CoreError::poisoned("database"))?;
1702        let _doc = lookup_document_id(&conn, doc_id)?;
1703
1704        conn.execute(
1705            "INSERT INTO document_texts (doc_id, extractor_key, source_hash, text_hash, generation_key, text, created)
1706             VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%Y-%m-%dT%H:%M:%SZ','now'))
1707             ON CONFLICT(doc_id) DO UPDATE SET
1708                 extractor_key = excluded.extractor_key,
1709                 source_hash = excluded.source_hash,
1710                 text_hash = excluded.text_hash,
1711                 generation_key = excluded.generation_key,
1712                 text = excluded.text,
1713                 created = excluded.created",
1714            params![
1715                doc_id,
1716                extractor_key,
1717                source_hash,
1718                text_hash,
1719                generation_key,
1720                text
1721            ],
1722        )?;
1723        Ok(())
1724    }
1725
1726    pub fn get_document_text(&self, doc_id: i64) -> Result<DocumentTextRow> {
1727        let conn = self
1728            .db
1729            .lock()
1730            .map_err(|_| CoreError::poisoned("database"))?;
1731        let _doc = lookup_document_id(&conn, doc_id)?;
1732
1733        let result = conn.query_row(
1734            "SELECT doc_id, extractor_key, source_hash, text_hash, generation_key, text, created
1735             FROM document_texts
1736             WHERE doc_id = ?1",
1737            params![doc_id],
1738            decode_document_text_row,
1739        );
1740        match result {
1741            Ok(row) => Ok(row),
1742            Err(Error::QueryReturnedNoRows) => Err(missing_document_text_error(doc_id)),
1743            Err(err) => Err(err.into()),
1744        }
1745    }
1746
1747    pub fn get_document_texts(&self, doc_ids: &[i64]) -> Result<HashMap<i64, DocumentTextRow>> {
1748        if doc_ids.is_empty() {
1749            return Ok(HashMap::new());
1750        }
1751
1752        let mut requested = doc_ids.to_vec();
1753        requested.sort_unstable();
1754        requested.dedup();
1755
1756        let text_by_doc = self.get_existing_document_texts(&requested)?;
1757        for doc_id in requested {
1758            if !text_by_doc.contains_key(&doc_id) {
1759                return Err(missing_document_text_error(doc_id));
1760            }
1761        }
1762
1763        Ok(text_by_doc)
1764    }
1765
1766    pub fn get_existing_document_texts(
1767        &self,
1768        doc_ids: &[i64],
1769    ) -> Result<HashMap<i64, DocumentTextRow>> {
1770        if doc_ids.is_empty() {
1771            return Ok(HashMap::new());
1772        }
1773
1774        let mut requested = doc_ids.to_vec();
1775        requested.sort_unstable();
1776        requested.dedup();
1777
1778        let conn = self
1779            .db
1780            .lock()
1781            .map_err(|_| CoreError::poisoned("database"))?;
1782        let mut text_by_doc = HashMap::new();
1783        for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1784            let placeholders = vec!["?"; batch.len()].join(", ");
1785            let sql = format!(
1786                "SELECT doc_id, extractor_key, source_hash, text_hash, generation_key, text, created
1787                 FROM document_texts
1788                 WHERE doc_id IN ({placeholders})
1789                 ORDER BY doc_id ASC"
1790            );
1791            let mut stmt = conn.prepare(&sql)?;
1792            let rows = stmt.query_map(params_from_iter(batch.iter()), decode_document_text_row)?;
1793            for row in rows {
1794                let row = row?;
1795                text_by_doc.insert(row.doc_id, row);
1796            }
1797        }
1798
1799        Ok(text_by_doc)
1800    }
1801
1802    pub fn has_document_text(&self, doc_id: i64) -> Result<bool> {
1803        let conn = self
1804            .db
1805            .lock()
1806            .map_err(|_| CoreError::poisoned("database"))?;
1807        let exists: i64 = conn.query_row(
1808            "SELECT EXISTS(SELECT 1 FROM document_texts WHERE doc_id = ?1)",
1809            params![doc_id],
1810            |row| row.get(0),
1811        )?;
1812        Ok(exists != 0)
1813    }
1814
1815    pub fn has_current_document_text(&self, doc_id: i64, generation_key: &str) -> Result<bool> {
1816        let conn = self
1817            .db
1818            .lock()
1819            .map_err(|_| CoreError::poisoned("database"))?;
1820        let exists: i64 = conn.query_row(
1821            "SELECT EXISTS(
1822                SELECT 1 FROM document_texts
1823                WHERE doc_id = ?1 AND generation_key = ?2
1824            )",
1825            params![doc_id, generation_key],
1826            |row| row.get(0),
1827        )?;
1828        Ok(exists != 0)
1829    }
1830
1831    pub fn get_document_text_generation_keys(
1832        &self,
1833        doc_ids: &[i64],
1834    ) -> Result<HashMap<i64, String>> {
1835        if doc_ids.is_empty() {
1836            return Ok(HashMap::new());
1837        }
1838
1839        let mut requested = doc_ids.to_vec();
1840        requested.sort_unstable();
1841        requested.dedup();
1842
1843        let conn = self
1844            .db
1845            .lock()
1846            .map_err(|_| CoreError::poisoned("database"))?;
1847        let mut generation_by_doc = HashMap::with_capacity(requested.len());
1848        for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1849            let placeholders = vec!["?"; batch.len()].join(", ");
1850            let sql = format!(
1851                "SELECT doc_id, generation_key
1852                 FROM document_texts
1853                 WHERE doc_id IN ({placeholders})"
1854            );
1855            let mut stmt = conn.prepare(&sql)?;
1856            let rows = stmt.query_map(params_from_iter(batch.iter()), |row| {
1857                Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1858            })?;
1859            for row in rows {
1860                let (doc_id, generation_key) = row?;
1861                generation_by_doc.insert(doc_id, generation_key);
1862            }
1863        }
1864
1865        Ok(generation_by_doc)
1866    }
1867
1868    pub fn get_document_text_extractors(&self, doc_ids: &[i64]) -> Result<HashMap<i64, String>> {
1869        if doc_ids.is_empty() {
1870            return Ok(HashMap::new());
1871        }
1872
1873        let mut requested = doc_ids.to_vec();
1874        requested.sort_unstable();
1875        requested.dedup();
1876
1877        let conn = self
1878            .db
1879            .lock()
1880            .map_err(|_| CoreError::poisoned("database"))?;
1881        let mut extractor_by_doc = HashMap::with_capacity(requested.len());
1882        for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1883            let placeholders = vec!["?"; batch.len()].join(", ");
1884            let sql = format!(
1885                "SELECT doc_id, extractor_key
1886                 FROM document_texts
1887                 WHERE doc_id IN ({placeholders})"
1888            );
1889            let mut stmt = conn.prepare(&sql)?;
1890            let rows = stmt.query_map(params_from_iter(batch.iter()), |row| {
1891                Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1892            })?;
1893            for row in rows {
1894                let (doc_id, extractor_key) = row?;
1895                extractor_by_doc.insert(doc_id, extractor_key);
1896            }
1897        }
1898
1899        for doc_id in requested {
1900            if !extractor_by_doc.contains_key(&doc_id) {
1901                return Err(missing_document_text_error(doc_id));
1902            }
1903        }
1904
1905        Ok(extractor_by_doc)
1906    }
1907
1908    pub fn get_canonical_chunk_texts(&self, chunk_ids: &[i64]) -> Result<HashMap<i64, String>> {
1909        if chunk_ids.is_empty() {
1910            return Ok(HashMap::new());
1911        }
1912
1913        let mut requested = chunk_ids.to_vec();
1914        requested.sort_unstable();
1915        requested.dedup();
1916
1917        let conn = self
1918            .db
1919            .lock()
1920            .map_err(|_| CoreError::poisoned("database"))?;
1921        let mut text_by_chunk = HashMap::with_capacity(requested.len());
1922        for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1923            let placeholders = vec!["?"; batch.len()].join(", ");
1924            let sql = format!(
1925                "SELECT c.id,
1926                        c.offset,
1927                        c.length,
1928                        length(CAST(dt.text AS BLOB)),
1929                        substr(CAST(dt.text AS BLOB), c.offset + 1, 1),
1930                        substr(CAST(dt.text AS BLOB), c.offset + c.length + 1, 1),
1931                        substr(CAST(dt.text AS BLOB), c.offset + 1, c.length)
1932                 FROM chunks c
1933                 JOIN document_texts dt ON dt.doc_id = c.doc_id
1934                 WHERE c.id IN ({placeholders})"
1935            );
1936            let mut stmt = conn.prepare(&sql)?;
1937            let rows = stmt.query_map(params_from_iter(batch.iter()), |row| {
1938                Ok((
1939                    row.get::<_, i64>(0)?,
1940                    row.get::<_, i64>(1)?,
1941                    row.get::<_, i64>(2)?,
1942                    row.get::<_, i64>(3)?,
1943                    row.get::<_, Vec<u8>>(4)?,
1944                    row.get::<_, Vec<u8>>(5)?,
1945                    row.get::<_, Vec<u8>>(6)?,
1946                ))
1947            })?;
1948            for row in rows {
1949                let (chunk_id, offset, length, document_len, start_byte, end_byte, bytes) = row?;
1950                let text = canonical_chunk_slice_from_bytes(
1951                    chunk_id,
1952                    offset,
1953                    length,
1954                    document_len,
1955                    start_byte,
1956                    end_byte,
1957                    bytes,
1958                )?;
1959                text_by_chunk.insert(chunk_id, text);
1960            }
1961        }
1962
1963        for chunk_id in requested {
1964            if !text_by_chunk.contains_key(&chunk_id) {
1965                return Err(CoreError::Internal(format!(
1966                    "canonical text missing for chunk {chunk_id}"
1967                )));
1968            }
1969        }
1970
1971        Ok(text_by_chunk)
1972    }
1973
1974    pub fn get_chunk_text(&self, chunk_id: i64) -> Result<ChunkTextRow> {
1975        let conn = self
1976            .db
1977            .lock()
1978            .map_err(|_| CoreError::poisoned("database"))?;
1979        let result = conn.query_row(
1980            "SELECT c.id, c.doc_id, c.seq, c.offset, c.length, c.heading, c.kind, c.retrieval_prefix, dt.extractor_key, dt.text
1981             FROM chunks c
1982             JOIN document_texts dt ON dt.doc_id = c.doc_id
1983             WHERE c.id = ?1",
1984            params![chunk_id],
1985            |row| {
1986                let chunk = decode_chunk_row(row)?;
1987                let extractor_key: String = row.get(8)?;
1988                let document_text: String = row.get(9)?;
1989                Ok((chunk, extractor_key, document_text))
1990            },
1991        );
1992        let (chunk, extractor_key, document_text) = match result {
1993            Ok(row) => row,
1994            Err(Error::QueryReturnedNoRows) => {
1995                let doc_id = lookup_chunk_doc_id(&conn, chunk_id)?;
1996                return Err(missing_document_text_error(doc_id));
1997            }
1998            Err(err) => return Err(err.into()),
1999        };
2000        let text = chunk_text_from_canonical(&document_text, &chunk)?;
2001        Ok(ChunkTextRow {
2002            chunk,
2003            extractor_key,
2004            text,
2005        })
2006    }
2007
2008    pub fn insert_embeddings(&self, entries: &[(i64, &str)]) -> Result<()> {
2009        if entries.is_empty() {
2010            return Ok(());
2011        }
2012
2013        let conn = self
2014            .db
2015            .lock()
2016            .map_err(|_| CoreError::poisoned("database"))?;
2017
2018        let tx = conn.unchecked_transaction()?;
2019        let mut stmt = tx.prepare(
2020            "INSERT INTO embeddings (chunk_id, model, embedded_at)
2021             VALUES (?1, ?2, strftime('%Y-%m-%dT%H:%M:%SZ','now'))
2022             ON CONFLICT(chunk_id, model) DO UPDATE SET
2023               embedded_at = excluded.embedded_at",
2024        )?;
2025
2026        for (chunk_id, model) in entries {
2027            stmt.execute(params![chunk_id, model])?;
2028        }
2029
2030        drop(stmt);
2031        tx.commit()?;
2032        Ok(())
2033    }
2034
2035    pub fn get_unembedded_chunks(
2036        &self,
2037        model: &str,
2038        after_chunk_id: i64,
2039        limit: usize,
2040    ) -> Result<Vec<EmbedRecord>> {
2041        self.get_unembedded_chunks_filtered(model, after_chunk_id, limit, "", Vec::new())
2042    }
2043
2044    pub fn get_unembedded_chunks_in_space(
2045        &self,
2046        model: &str,
2047        space_id: i64,
2048        after_chunk_id: i64,
2049        limit: usize,
2050    ) -> Result<Vec<EmbedRecord>> {
2051        self.get_unembedded_chunks_filtered(
2052            model,
2053            after_chunk_id,
2054            limit,
2055            " AND col.space_id = ?",
2056            vec![SqlValue::Integer(space_id)],
2057        )
2058    }
2059
2060    pub fn get_unembedded_chunks_in_collections(
2061        &self,
2062        model: &str,
2063        collection_ids: &[i64],
2064        after_chunk_id: i64,
2065        limit: usize,
2066    ) -> Result<Vec<EmbedRecord>> {
2067        if collection_ids.is_empty() {
2068            return Ok(Vec::new());
2069        }
2070
2071        let placeholders = vec!["?"; collection_ids.len()].join(", ");
2072        let clause = format!(" AND d.collection_id IN ({placeholders})");
2073        let params = collection_ids
2074            .iter()
2075            .map(|id| SqlValue::Integer(*id))
2076            .collect::<Vec<_>>();
2077        self.get_unembedded_chunks_filtered(model, after_chunk_id, limit, &clause, params)
2078    }
2079
2080    fn get_unembedded_chunks_filtered(
2081        &self,
2082        model: &str,
2083        after_chunk_id: i64,
2084        limit: usize,
2085        scope_clause: &str,
2086        scope_params: Vec<SqlValue>,
2087    ) -> Result<Vec<EmbedRecord>> {
2088        let conn = self
2089            .db
2090            .lock()
2091            .map_err(|_| CoreError::poisoned("database"))?;
2092        let sql_limit = i64::try_from(limit)
2093            .map_err(|_| CoreError::Internal("limit too large for sqlite".to_string()))?;
2094
2095        let sql = format!(
2096            "SELECT c.id, c.doc_id, c.seq, c.offset, c.length, c.heading, c.kind, c.retrieval_prefix,
2097                    d.path, col.path, s.name
2098             FROM chunks c
2099             JOIN documents d ON d.id = c.doc_id
2100             JOIN collections col ON col.id = d.collection_id
2101             JOIN spaces s ON s.id = col.space_id
2102             LEFT JOIN embeddings e ON e.chunk_id = c.id AND e.model = ?
2103             WHERE d.active = 1 AND e.chunk_id IS NULL AND c.id > ?{scope_clause}
2104             ORDER BY c.id ASC
2105             LIMIT ?"
2106        );
2107        let mut stmt = conn.prepare(&sql)?;
2108        let mut params = Vec::with_capacity(scope_params.len() + 3);
2109        params.push(SqlValue::Text(model.to_string()));
2110        params.push(SqlValue::Integer(after_chunk_id));
2111        params.extend(scope_params);
2112        params.push(SqlValue::Integer(sql_limit));
2113        let rows = stmt.query_map(params_from_iter(params.iter()), |row| {
2114            Ok(EmbedRecord {
2115                chunk: decode_chunk_row(row)?,
2116                doc_path: row.get(8)?,
2117                collection_path: PathBuf::from(row.get::<_, String>(9)?),
2118                space_name: row.get(10)?,
2119            })
2120        })?;
2121
2122        let records = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2123        Ok(records)
2124    }
2125
2126    pub fn delete_embeddings_for_model(&self, model: &str) -> Result<usize> {
2127        let conn = self
2128            .db
2129            .lock()
2130            .map_err(|_| CoreError::poisoned("database"))?;
2131
2132        let deleted = conn.execute("DELETE FROM embeddings WHERE model = ?1", params![model])?;
2133        Ok(deleted)
2134    }
2135
2136    pub fn delete_embeddings_for_space(&self, space_id: i64) -> Result<usize> {
2137        let conn = self
2138            .db
2139            .lock()
2140            .map_err(|_| CoreError::poisoned("database"))?;
2141        let _space_name = lookup_space_name(&conn, space_id)?;
2142
2143        let deleted = conn.execute(
2144            "DELETE FROM embeddings
2145             WHERE chunk_id IN (
2146                 SELECT c.id
2147                 FROM chunks c
2148                 JOIN documents d ON d.id = c.doc_id
2149                 JOIN collections col ON col.id = d.collection_id
2150                 WHERE col.space_id = ?1
2151             )",
2152            params![space_id],
2153        )?;
2154        Ok(deleted)
2155    }
2156
2157    pub fn list_embedding_models_in_space(&self, space_id: i64) -> Result<Vec<String>> {
2158        let conn = self
2159            .db
2160            .lock()
2161            .map_err(|_| CoreError::poisoned("database"))?;
2162        let _space_name = lookup_space_name(&conn, space_id)?;
2163
2164        let mut stmt = conn.prepare(
2165            "SELECT DISTINCT e.model
2166             FROM embeddings e
2167             JOIN chunks c ON c.id = e.chunk_id
2168             JOIN documents d ON d.id = c.doc_id
2169             JOIN collections col ON col.id = d.collection_id
2170             WHERE col.space_id = ?1
2171             ORDER BY e.model ASC",
2172        )?;
2173        let rows = stmt.query_map(params![space_id], |row| row.get::<_, String>(0))?;
2174        let models = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2175        Ok(models)
2176    }
2177
2178    pub fn count_embeddings(&self) -> Result<usize> {
2179        let conn = self
2180            .db
2181            .lock()
2182            .map_err(|_| CoreError::poisoned("database"))?;
2183
2184        let count: i64 = conn.query_row("SELECT COUNT(*) FROM embeddings", [], |row| row.get(0))?;
2185        Ok(count as usize)
2186    }
2187
2188    pub fn index_tantivy(&self, space: &str, entries: &[TantivyEntry]) -> Result<()> {
2189        if entries.is_empty() {
2190            return Ok(());
2191        }
2192
2193        let space_indexes = self.get_space_indexes(space)?;
2194        with_tantivy_writer(&space_indexes, |writer| {
2195            for entry in entries {
2196                let chunk_id = u64::try_from(entry.chunk_id).map_err(|_| {
2197                    CoreError::Internal(format!(
2198                        "chunk_id must be non-negative for tantivy indexing: {}",
2199                        entry.chunk_id
2200                    ))
2201                })?;
2202                let doc_id = u64::try_from(entry.doc_id).map_err(|_| {
2203                    CoreError::Internal(format!(
2204                        "doc_id must be non-negative for tantivy indexing: {}",
2205                        entry.doc_id
2206                    ))
2207                })?;
2208
2209                let mut doc = TantivyDocument::default();
2210                doc.add_u64(space_indexes.fields.chunk_id, chunk_id);
2211                doc.add_u64(space_indexes.fields.doc_id, doc_id);
2212                doc.add_text(space_indexes.fields.filepath, &entry.filepath);
2213                if let Some(title) = &entry.semantic_title {
2214                    doc.add_text(space_indexes.fields.title, title);
2215                }
2216                if let Some(heading) = &entry.heading {
2217                    doc.add_text(space_indexes.fields.heading, heading);
2218                }
2219                doc.add_text(space_indexes.fields.body, &entry.body);
2220                writer.add_document(doc)?;
2221            }
2222            Ok(())
2223        })
2224    }
2225
2226    pub fn delete_tantivy(&self, space: &str, chunk_ids: &[i64]) -> Result<()> {
2227        if chunk_ids.is_empty() {
2228            return Ok(());
2229        }
2230
2231        let space_indexes = self.get_space_indexes(space)?;
2232        with_tantivy_writer(&space_indexes, |writer| {
2233            for chunk_id in chunk_ids {
2234                let chunk_key = u64::try_from(*chunk_id).map_err(|_| {
2235                    CoreError::Internal(format!(
2236                        "chunk_id must be non-negative for tantivy delete: {chunk_id}"
2237                    ))
2238                })?;
2239                writer.delete_term(Term::from_field_u64(
2240                    space_indexes.fields.chunk_id,
2241                    chunk_key,
2242                ));
2243            }
2244
2245            Ok(())
2246        })
2247    }
2248
2249    pub fn delete_tantivy_by_doc(&self, space: &str, doc_id: i64) -> Result<()> {
2250        let space_indexes = self.get_space_indexes(space)?;
2251        with_tantivy_writer(&space_indexes, |writer| {
2252            let doc_key = u64::try_from(doc_id).map_err(|_| {
2253                CoreError::Internal(format!(
2254                    "doc_id must be non-negative for tantivy delete-by-doc: {doc_id}"
2255                ))
2256            })?;
2257            writer.delete_term(Term::from_field_u64(space_indexes.fields.doc_id, doc_key));
2258            Ok(())
2259        })
2260    }
2261
2262    pub fn query_bm25(
2263        &self,
2264        space: &str,
2265        query: &str,
2266        fields: &[(&str, f32)],
2267        limit: usize,
2268    ) -> Result<Vec<BM25Hit>> {
2269        self.query_bm25_filtered(space, query, fields, None, limit, true)
2270    }
2271
2272    pub(crate) fn query_bm25_cached(
2273        &self,
2274        space: &str,
2275        query: &str,
2276        fields: &[(&str, f32)],
2277        limit: usize,
2278    ) -> Result<Vec<BM25Hit>> {
2279        self.query_bm25_filtered(space, query, fields, None, limit, false)
2280    }
2281
2282    pub fn query_bm25_in_documents(
2283        &self,
2284        space: &str,
2285        query: &str,
2286        fields: &[(&str, f32)],
2287        document_ids: &[i64],
2288        limit: usize,
2289    ) -> Result<Vec<BM25Hit>> {
2290        if document_ids.is_empty() {
2291            return Ok(Vec::new());
2292        }
2293
2294        self.query_bm25_filtered(space, query, fields, Some(document_ids), limit, true)
2295    }
2296
2297    pub(crate) fn query_bm25_in_documents_cached(
2298        &self,
2299        space: &str,
2300        query: &str,
2301        fields: &[(&str, f32)],
2302        document_ids: &[i64],
2303        limit: usize,
2304    ) -> Result<Vec<BM25Hit>> {
2305        if document_ids.is_empty() {
2306            return Ok(Vec::new());
2307        }
2308
2309        self.query_bm25_filtered(space, query, fields, Some(document_ids), limit, false)
2310    }
2311
2312    fn query_bm25_filtered(
2313        &self,
2314        space: &str,
2315        query: &str,
2316        fields: &[(&str, f32)],
2317        document_ids: Option<&[i64]>,
2318        limit: usize,
2319        reload_reader: bool,
2320    ) -> Result<Vec<BM25Hit>> {
2321        if limit == 0 {
2322            return Ok(Vec::new());
2323        }
2324
2325        if fields.is_empty() {
2326            return Err(CoreError::Internal(
2327                "bm25 query requires at least one field".to_string(),
2328            ));
2329        }
2330
2331        let space_indexes = self.get_space_indexes(space)?;
2332
2333        let schema = space_indexes.tantivy_index.schema();
2334        let query_fields = fields
2335            .iter()
2336            .map(|(name, boost)| {
2337                let field = resolve_tantivy_field(space_indexes.fields, name)?;
2338                let field_entry = schema.get_field_entry(field);
2339                let index_record_option = field_entry
2340                    .field_type()
2341                    .get_index_record_option()
2342                    .ok_or_else(|| {
2343                        CoreError::Internal(format!(
2344                            "bm25 field '{}' is not indexed",
2345                            field_entry.name()
2346                        ))
2347                    })?;
2348                Ok(Bm25FieldSpec {
2349                    field,
2350                    boost: *boost,
2351                    index_record_option,
2352                })
2353            })
2354            .collect::<Result<Vec<_>>>()?;
2355        let Some(parsed_query) =
2356            build_literal_bm25_query(&space_indexes.tantivy_index, &query_fields, query)?
2357        else {
2358            return Ok(Vec::new());
2359        };
2360        let parsed_query = if let Some(document_ids) = document_ids {
2361            Box::new(BooleanQuery::new(vec![
2362                (Occur::Must, parsed_query),
2363                (
2364                    Occur::Must,
2365                    build_doc_id_filter_query(space_indexes.fields.doc_id, document_ids)?,
2366                ),
2367            ])) as Box<dyn Query>
2368        } else {
2369            parsed_query
2370        };
2371        if reload_reader {
2372            space_indexes.tantivy_reader.reload()?;
2373        }
2374        let searcher = space_indexes.tantivy_reader.searcher();
2375        let docs = searcher.search(&parsed_query, &TopDocs::with_limit(limit))?;
2376
2377        let mut hits = Vec::with_capacity(docs.len());
2378        let chunk_id_field_name = schema.get_field_entry(space_indexes.fields.chunk_id).name();
2379        let mut chunk_id_columns = HashMap::new();
2380        for (score, address) in docs {
2381            let chunk_id_column = match chunk_id_columns.entry(address.segment_ord) {
2382                std::collections::hash_map::Entry::Occupied(entry) => entry.into_mut(),
2383                std::collections::hash_map::Entry::Vacant(entry) => {
2384                    let column = searcher
2385                        .segment_reader(address.segment_ord)
2386                        .fast_fields()
2387                        .u64(chunk_id_field_name)?;
2388                    entry.insert(column)
2389                }
2390            };
2391            let chunk_id = chunk_id_column.first(address.doc_id).ok_or_else(|| {
2392                CoreError::Internal("tantivy hit missing chunk_id fast field".to_string())
2393            })?;
2394            let chunk_id = i64::try_from(chunk_id).map_err(|_| {
2395                CoreError::Internal(format!("tantivy chunk_id exceeds i64 range: {chunk_id}"))
2396            })?;
2397            hits.push(BM25Hit { chunk_id, score });
2398        }
2399
2400        Ok(hits)
2401    }
2402
2403    pub(crate) fn reload_tantivy_reader(&self, space: &str) -> Result<()> {
2404        let space_indexes = self.get_space_indexes(space)?;
2405        space_indexes.tantivy_reader.reload()?;
2406        Ok(())
2407    }
2408
2409    pub fn commit_tantivy(&self, space: &str) -> Result<()> {
2410        let space_indexes = self.get_space_indexes(space)?;
2411        let mut writer = space_indexes
2412            .tantivy_writer
2413            .lock()
2414            .map_err(|_| CoreError::poisoned("tantivy writer"))?;
2415        let Some(mut writer) = writer.take() else {
2416            return Ok(());
2417        };
2418        writer.commit()?;
2419        space_indexes.tantivy_reader.reload()?;
2420        Ok(())
2421    }
2422
2423    pub fn insert_usearch(&self, space: &str, key: i64, vector: &[f32]) -> Result<()> {
2424        self.batch_insert_usearch(space, &[(key, vector)])
2425    }
2426
2427    pub fn batch_insert_usearch(&self, space: &str, entries: &[(i64, &[f32])]) -> Result<()> {
2428        self.batch_insert_usearch_with_save_mode(space, entries, UsearchSaveMode::Immediate)
2429    }
2430
2431    pub(crate) fn batch_insert_usearch_deferred(
2432        &self,
2433        space: &str,
2434        entries: &[(i64, &[f32])],
2435    ) -> Result<()> {
2436        self.batch_insert_usearch_with_save_mode(space, entries, UsearchSaveMode::Deferred)
2437    }
2438
2439    fn batch_insert_usearch_with_save_mode(
2440        &self,
2441        space: &str,
2442        entries: &[(i64, &[f32])],
2443        save_mode: UsearchSaveMode,
2444    ) -> Result<()> {
2445        if entries.is_empty() {
2446            return Ok(());
2447        }
2448
2449        let space_indexes = self.get_space_indexes(space)?;
2450
2451        let expected_dimensions = entries[0].1.len();
2452        if expected_dimensions == 0 {
2453            return Err(CoreError::Internal(
2454                "cannot insert empty vector into usearch index".to_string(),
2455            ));
2456        }
2457        for (_, vector) in entries {
2458            if vector.len() != expected_dimensions {
2459                return Err(CoreError::Internal(format!(
2460                    "vector dimension mismatch in batch insert: expected {expected_dimensions}, got {}",
2461                    vector.len()
2462                )));
2463            }
2464        }
2465
2466        let mut index = space_indexes
2467            .usearch_index
2468            .write()
2469            .map_err(|_| CoreError::poisoned("usearch index"))?;
2470        let ensure_started = std::time::Instant::now();
2471        ensure_usearch_dimensions(&mut index, expected_dimensions)?;
2472        crate::profile::record_update_stage("usearch_ensure_dimensions", ensure_started.elapsed());
2473
2474        let target_capacity = index.size().saturating_add(entries.len());
2475        let reserve_started = std::time::Instant::now();
2476        index.reserve(target_capacity).map_err(|err| {
2477            CoreError::Internal(format!(
2478                "usearch reserve failed for {target_capacity} items: {err}"
2479            ))
2480        })?;
2481        crate::profile::record_update_stage("usearch_reserve", reserve_started.elapsed());
2482
2483        if matches!(save_mode, UsearchSaveMode::Deferred) {
2484            self.mark_usearch_dirty(space)?;
2485        }
2486
2487        let add_started = std::time::Instant::now();
2488        for (key, vector) in entries {
2489            let key = u64::try_from(*key).map_err(|_| {
2490                CoreError::Internal(format!("usearch key must be non-negative: {}", *key))
2491            })?;
2492            index
2493                .add::<f32>(key, vector)
2494                .map_err(|err| CoreError::Internal(format!("usearch add failed: {err}")))?;
2495        }
2496        crate::profile::record_update_stage("usearch_add", add_started.elapsed());
2497
2498        if matches!(save_mode, UsearchSaveMode::Immediate) {
2499            let save_started = std::time::Instant::now();
2500            save_usearch_index(&index, &space_indexes.usearch_path)?;
2501            crate::profile::record_update_stage("usearch_save", save_started.elapsed());
2502        }
2503        Ok(())
2504    }
2505
2506    pub fn delete_usearch(&self, space: &str, keys: &[i64]) -> Result<()> {
2507        self.delete_usearch_with_save_mode(space, keys, UsearchSaveMode::Immediate)
2508    }
2509
2510    pub(crate) fn delete_usearch_deferred(&self, space: &str, keys: &[i64]) -> Result<()> {
2511        self.delete_usearch_with_save_mode(space, keys, UsearchSaveMode::Deferred)
2512    }
2513
2514    fn delete_usearch_with_save_mode(
2515        &self,
2516        space: &str,
2517        keys: &[i64],
2518        save_mode: UsearchSaveMode,
2519    ) -> Result<()> {
2520        if keys.is_empty() {
2521            return Ok(());
2522        }
2523
2524        let space_indexes = self.get_space_indexes(space)?;
2525        let index = space_indexes
2526            .usearch_index
2527            .write()
2528            .map_err(|_| CoreError::poisoned("usearch index"))?;
2529
2530        if matches!(save_mode, UsearchSaveMode::Deferred) {
2531            self.mark_usearch_dirty(space)?;
2532        }
2533
2534        let delete_started = std::time::Instant::now();
2535        for key in keys {
2536            let key = u64::try_from(*key).map_err(|_| {
2537                CoreError::Internal(format!("usearch key must be non-negative: {}", *key))
2538            })?;
2539            index
2540                .remove(key)
2541                .map_err(|err| CoreError::Internal(format!("usearch remove failed: {err}")))?;
2542        }
2543        crate::profile::record_update_stage("usearch_delete", delete_started.elapsed());
2544
2545        if matches!(save_mode, UsearchSaveMode::Immediate) {
2546            let save_started = std::time::Instant::now();
2547            save_usearch_index(&index, &space_indexes.usearch_path)?;
2548            crate::profile::record_update_stage("usearch_save", save_started.elapsed());
2549        }
2550        Ok(())
2551    }
2552
2553    pub(crate) fn save_dirty_usearch_indexes(&self) -> Result<()> {
2554        let spaces = {
2555            let mut dirty = self
2556                .dirty_usearch_spaces
2557                .lock()
2558                .map_err(|_| CoreError::poisoned("dirty usearch spaces"))?;
2559            if dirty.is_empty() {
2560                return Ok(());
2561            }
2562
2563            let mut spaces = dirty.drain().collect::<Vec<_>>();
2564            spaces.sort();
2565            spaces
2566        };
2567
2568        for (index, space) in spaces.iter().enumerate() {
2569            if let Err(err) = self.save_usearch_for_space(space) {
2570                let mut dirty = self
2571                    .dirty_usearch_spaces
2572                    .lock()
2573                    .map_err(|_| CoreError::poisoned("dirty usearch spaces"))?;
2574                for unsaved in &spaces[index..] {
2575                    dirty.insert(unsaved.clone());
2576                }
2577                return Err(err);
2578            }
2579            crate::profile::increment_update_count("usearch_saved_spaces", 1);
2580        }
2581
2582        Ok(())
2583    }
2584
2585    fn mark_usearch_dirty(&self, space: &str) -> Result<()> {
2586        self.dirty_usearch_spaces
2587            .lock()
2588            .map_err(|_| CoreError::poisoned("dirty usearch spaces"))?
2589            .insert(space.to_string());
2590        Ok(())
2591    }
2592
2593    fn save_usearch_for_space(&self, space: &str) -> Result<()> {
2594        let space_indexes = self.get_space_indexes(space)?;
2595        let index = space_indexes
2596            .usearch_index
2597            .read()
2598            .map_err(|_| CoreError::poisoned("usearch index"))?;
2599
2600        let save_started = std::time::Instant::now();
2601        save_usearch_index(&index, &space_indexes.usearch_path)?;
2602        crate::profile::record_update_stage("usearch_save", save_started.elapsed());
2603        Ok(())
2604    }
2605
2606    pub fn query_dense(&self, space: &str, vector: &[f32], limit: usize) -> Result<Vec<DenseHit>> {
2607        self.query_dense_filtered(space, vector, None, limit)
2608    }
2609
2610    pub fn query_dense_in_chunks(
2611        &self,
2612        space: &str,
2613        vector: &[f32],
2614        chunk_ids: &[i64],
2615        limit: usize,
2616    ) -> Result<Vec<DenseHit>> {
2617        if chunk_ids.is_empty() {
2618            return Ok(Vec::new());
2619        }
2620
2621        let allowed_keys = chunk_ids
2622            .iter()
2623            .map(|chunk_id| {
2624                u64::try_from(*chunk_id).map_err(|_| {
2625                    CoreError::Internal(format!(
2626                        "chunk_id must be non-negative for usearch query: {chunk_id}"
2627                    ))
2628                })
2629            })
2630            .collect::<Result<HashSet<_>>>()?;
2631        self.query_dense_in_key_set(space, vector, &allowed_keys, limit)
2632    }
2633
2634    pub(crate) fn query_dense_in_key_set(
2635        &self,
2636        space: &str,
2637        vector: &[f32],
2638        allowed_keys: &HashSet<u64>,
2639        limit: usize,
2640    ) -> Result<Vec<DenseHit>> {
2641        if allowed_keys.is_empty() {
2642            return Ok(Vec::new());
2643        }
2644
2645        self.query_dense_filtered(space, vector, Some(allowed_keys), limit)
2646    }
2647
2648    fn query_dense_filtered(
2649        &self,
2650        space: &str,
2651        vector: &[f32],
2652        allowed_keys: Option<&HashSet<u64>>,
2653        limit: usize,
2654    ) -> Result<Vec<DenseHit>> {
2655        if limit == 0 {
2656            return Ok(Vec::new());
2657        }
2658        if vector.is_empty() {
2659            return Err(CoreError::Internal(
2660                "cannot query usearch with empty vector".to_string(),
2661            ));
2662        }
2663
2664        let space_indexes = self.get_space_indexes(space)?;
2665        let index = space_indexes
2666            .usearch_index
2667            .read()
2668            .map_err(|_| CoreError::poisoned("usearch index"))?;
2669
2670        if index.size() == 0 {
2671            return Ok(Vec::new());
2672        }
2673        if vector.len() != index.dimensions() {
2674            return Err(CoreError::Internal(format!(
2675                "query vector dimension mismatch: expected {}, got {}",
2676                index.dimensions(),
2677                vector.len()
2678            )));
2679        }
2680
2681        let matches = if let Some(allowed_keys) = allowed_keys {
2682            index
2683                .filtered_search::<f32, _>(vector, limit, |key| allowed_keys.contains(&key))
2684                .map_err(|err| {
2685                    CoreError::Internal(format!("usearch filtered query failed: {err}"))
2686                })?
2687        } else {
2688            index
2689                .search::<f32>(vector, limit)
2690                .map_err(|err| CoreError::Internal(format!("usearch query failed: {err}")))?
2691        };
2692        let hits = matches
2693            .keys
2694            .into_iter()
2695            .zip(matches.distances)
2696            .map(|(key, distance)| DenseHit {
2697                chunk_id: key as i64,
2698                distance,
2699            })
2700            .collect();
2701        Ok(hits)
2702    }
2703
2704    pub fn count_usearch(&self, space: &str) -> Result<usize> {
2705        let space_indexes = self.get_space_indexes(space)?;
2706        let index = space_indexes
2707            .usearch_index
2708            .read()
2709            .map_err(|_| CoreError::poisoned("usearch index"))?;
2710        Ok(index.size())
2711    }
2712
2713    pub fn clear_usearch(&self, space: &str) -> Result<()> {
2714        let space_indexes = self.get_space_indexes(space)?;
2715        let index = space_indexes
2716            .usearch_index
2717            .write()
2718            .map_err(|_| CoreError::poisoned("usearch index"))?;
2719        let clear_started = std::time::Instant::now();
2720        index
2721            .reset()
2722            .map_err(|err| CoreError::Internal(format!("usearch clear failed: {err}")))?;
2723        std::fs::File::create(&space_indexes.usearch_path)?;
2724        crate::profile::record_update_stage("usearch_clear", clear_started.elapsed());
2725        Ok(())
2726    }
2727
2728    pub fn get_fts_dirty_documents(&self) -> Result<Vec<FtsDirtyRecord>> {
2729        self.get_fts_dirty_documents_filtered("", Vec::new())
2730    }
2731
2732    pub fn get_fts_dirty_documents_in_space(&self, space_id: i64) -> Result<Vec<FtsDirtyRecord>> {
2733        self.get_fts_dirty_documents_filtered(
2734            " AND c.space_id = ?",
2735            vec![SqlValue::Integer(space_id)],
2736        )
2737    }
2738
2739    pub fn get_fts_dirty_documents_in_collections(
2740        &self,
2741        collection_ids: &[i64],
2742    ) -> Result<Vec<FtsDirtyRecord>> {
2743        if collection_ids.is_empty() {
2744            return Ok(Vec::new());
2745        }
2746
2747        let placeholders = vec!["?"; collection_ids.len()].join(", ");
2748        let clause = format!(" AND d.collection_id IN ({placeholders})");
2749        let params = collection_ids
2750            .iter()
2751            .map(|id| SqlValue::Integer(*id))
2752            .collect::<Vec<_>>();
2753        self.get_fts_dirty_documents_filtered(&clause, params)
2754    }
2755
2756    fn get_fts_dirty_documents_filtered(
2757        &self,
2758        scope_clause: &str,
2759        scope_params: Vec<SqlValue>,
2760    ) -> Result<Vec<FtsDirtyRecord>> {
2761        let conn = self
2762            .db
2763            .lock()
2764            .map_err(|_| CoreError::poisoned("database"))?;
2765
2766        let sql = format!(
2767            "SELECT d.id, d.path, d.title, d.title_source, d.hash, c.path, s.name
2768             FROM documents d
2769             JOIN collections c ON c.id = d.collection_id
2770             JOIN spaces s ON s.id = c.space_id
2771             WHERE d.fts_dirty = 1{scope_clause}
2772             ORDER BY d.id ASC"
2773        );
2774        let mut stmt = conn.prepare(&sql)?;
2775        let rows = stmt.query_map(params_from_iter(scope_params.iter()), |row| {
2776            Ok((
2777                row.get::<_, i64>(0)?,
2778                row.get::<_, String>(1)?,
2779                row.get::<_, String>(2)?,
2780                row.get::<_, String>(3)?,
2781                row.get::<_, String>(4)?,
2782                row.get::<_, String>(5)?,
2783                row.get::<_, String>(6)?,
2784            ))
2785        })?;
2786        let headers = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2787        drop(stmt);
2788
2789        let mut records = Vec::with_capacity(headers.len());
2790        for (
2791            doc_id,
2792            doc_path,
2793            doc_title,
2794            doc_title_source,
2795            doc_hash,
2796            collection_path,
2797            space_name,
2798        ) in headers
2799        {
2800            let chunks = load_chunks_for_doc(&conn, doc_id)?;
2801            records.push(FtsDirtyRecord {
2802                doc_id,
2803                doc_path,
2804                doc_title,
2805                doc_title_source: DocumentTitleSource::from_sql(&doc_title_source)?,
2806                doc_hash,
2807                collection_path: PathBuf::from(collection_path),
2808                space_name,
2809                chunks,
2810            });
2811        }
2812
2813        Ok(records)
2814    }
2815
2816    pub fn batch_clear_fts_dirty(&self, doc_ids: &[i64]) -> Result<()> {
2817        if doc_ids.is_empty() {
2818            return Ok(());
2819        }
2820
2821        let conn = self
2822            .db
2823            .lock()
2824            .map_err(|_| CoreError::poisoned("database"))?;
2825
2826        let placeholders = vec!["?"; doc_ids.len()].join(", ");
2827        let sql = format!("UPDATE documents SET fts_dirty = 0 WHERE id IN ({placeholders})");
2828        conn.execute(&sql, params_from_iter(doc_ids.iter()))?;
2829        Ok(())
2830    }
2831
2832    pub fn count_documents_in_collection(
2833        &self,
2834        collection_id: i64,
2835        active_only: bool,
2836    ) -> Result<usize> {
2837        let conn = self
2838            .db
2839            .lock()
2840            .map_err(|_| CoreError::poisoned("database"))?;
2841        let _collection_name = lookup_collection_name(&conn, collection_id)?;
2842        let active_only = i64::from(active_only);
2843        query_count(
2844            &conn,
2845            "SELECT COUNT(*)
2846             FROM documents
2847             WHERE collection_id = ?1
2848               AND (?2 = 0 OR active = 1)",
2849            params![collection_id, active_only],
2850        )
2851    }
2852
2853    pub fn count_chunks_in_collection(&self, collection_id: i64) -> Result<usize> {
2854        let conn = self
2855            .db
2856            .lock()
2857            .map_err(|_| CoreError::poisoned("database"))?;
2858        let _collection_name = lookup_collection_name(&conn, collection_id)?;
2859
2860        query_count(
2861            &conn,
2862            "SELECT COUNT(*)
2863             FROM chunks c
2864             JOIN documents d ON d.id = c.doc_id
2865             WHERE d.collection_id = ?1",
2866            params![collection_id],
2867        )
2868    }
2869
2870    pub fn count_embedded_chunks_in_collection(&self, collection_id: i64) -> Result<usize> {
2871        let conn = self
2872            .db
2873            .lock()
2874            .map_err(|_| CoreError::poisoned("database"))?;
2875        let _collection_name = lookup_collection_name(&conn, collection_id)?;
2876
2877        query_count(
2878            &conn,
2879            "SELECT COUNT(DISTINCT e.chunk_id)
2880             FROM embeddings e
2881             JOIN chunks c ON c.id = e.chunk_id
2882             JOIN documents d ON d.id = c.doc_id
2883             WHERE d.collection_id = ?1",
2884            params![collection_id],
2885        )
2886    }
2887
2888    pub fn count_documents(&self, space_id: Option<i64>) -> Result<usize> {
2889        let conn = self
2890            .db
2891            .lock()
2892            .map_err(|_| CoreError::poisoned("database"))?;
2893
2894        match space_id {
2895            Some(space_id) => {
2896                let _space_name = lookup_space_name(&conn, space_id)?;
2897                query_count(
2898                    &conn,
2899                    "SELECT COUNT(*)
2900                     FROM documents d
2901                     JOIN collections c ON c.id = d.collection_id
2902                     WHERE c.space_id = ?1",
2903                    params![space_id],
2904                )
2905            }
2906            None => query_count(&conn, "SELECT COUNT(*) FROM documents", []),
2907        }
2908    }
2909
2910    pub fn count_chunks(&self, space_id: Option<i64>) -> Result<usize> {
2911        let conn = self
2912            .db
2913            .lock()
2914            .map_err(|_| CoreError::poisoned("database"))?;
2915
2916        match space_id {
2917            Some(space_id) => {
2918                let _space_name = lookup_space_name(&conn, space_id)?;
2919                query_count(
2920                    &conn,
2921                    "SELECT COUNT(*)
2922                     FROM chunks c
2923                     JOIN documents d ON d.id = c.doc_id
2924                     JOIN collections col ON col.id = d.collection_id
2925                     WHERE col.space_id = ?1",
2926                    params![space_id],
2927                )
2928            }
2929            None => query_count(&conn, "SELECT COUNT(*) FROM chunks", []),
2930        }
2931    }
2932
2933    pub fn count_embedded_chunks(&self, space_id: Option<i64>) -> Result<usize> {
2934        let conn = self
2935            .db
2936            .lock()
2937            .map_err(|_| CoreError::poisoned("database"))?;
2938
2939        match space_id {
2940            Some(space_id) => {
2941                let _space_name = lookup_space_name(&conn, space_id)?;
2942                query_count(
2943                    &conn,
2944                    "SELECT COUNT(DISTINCT e.chunk_id)
2945                     FROM embeddings e
2946                     JOIN chunks c ON c.id = e.chunk_id
2947                     JOIN documents d ON d.id = c.doc_id
2948                     JOIN collections col ON col.id = d.collection_id
2949                     WHERE col.space_id = ?1",
2950                    params![space_id],
2951                )
2952            }
2953            None => query_count(&conn, "SELECT COUNT(DISTINCT chunk_id) FROM embeddings", []),
2954        }
2955    }
2956
2957    pub fn disk_usage(&self) -> Result<DiskUsage> {
2958        let sqlite_bytes = file_size_or_zero(&self.cache_dir.join(DB_FILE))?;
2959
2960        let mut tantivy_bytes = 0_u64;
2961        let mut usearch_bytes = 0_u64;
2962        let spaces_dir = self.cache_dir.join(SPACES_DIR);
2963        if spaces_dir.exists() {
2964            for entry in std::fs::read_dir(&spaces_dir)? {
2965                let space_dir = entry?.path();
2966                if !space_dir.is_dir() {
2967                    continue;
2968                }
2969
2970                tantivy_bytes += dir_size_or_zero(&space_dir.join(TANTIVY_DIR_NAME))?;
2971                usearch_bytes += file_size_or_zero(&space_dir.join(USEARCH_FILENAME))?;
2972            }
2973        }
2974
2975        let models_bytes = dir_size_or_zero(&self.cache_dir.join("models"))?;
2976        let total_bytes = sqlite_bytes + tantivy_bytes + usearch_bytes + models_bytes;
2977
2978        Ok(DiskUsage {
2979            sqlite_bytes,
2980            tantivy_bytes,
2981            usearch_bytes,
2982            models_bytes,
2983            total_bytes,
2984        })
2985    }
2986
2987    fn unload_space(&self, name: &str) -> Result<()> {
2988        let mut spaces = self
2989            .spaces
2990            .write()
2991            .map_err(|_| CoreError::poisoned("spaces"))?;
2992        spaces.remove(name);
2993        Ok(())
2994    }
2995
2996    fn remove_space_artifacts(&self, name: &str) -> Result<()> {
2997        let space_root = self.space_root_path(name);
2998        if space_root.exists() {
2999            std::fs::remove_dir_all(space_root)?;
3000        }
3001        Ok(())
3002    }
3003
3004    fn rename_space_artifacts(&self, old: &str, new: &str) -> Result<()> {
3005        let old_root = self.space_root_path(old);
3006        let new_root = self.space_root_path(new);
3007        if !old_root.exists() {
3008            return Ok(());
3009        }
3010
3011        if new_root.exists() {
3012            return Err(CoreError::Internal(format!(
3013                "cannot rename space artifacts: destination already exists: {}",
3014                new_root.display()
3015            )));
3016        }
3017
3018        if let Some(parent) = new_root.parent() {
3019            std::fs::create_dir_all(parent)?;
3020        }
3021        std::fs::rename(old_root, new_root)?;
3022        Ok(())
3023    }
3024
3025    fn space_root_path(&self, name: &str) -> PathBuf {
3026        self.cache_dir.join(SPACES_DIR).join(name)
3027    }
3028
3029    fn space_paths(&self, name: &str) -> (PathBuf, PathBuf) {
3030        let space_root = self.space_root_path(name);
3031        let tantivy_dir = space_root.join(TANTIVY_DIR_NAME);
3032        let usearch_path = space_root.join(USEARCH_FILENAME);
3033        (tantivy_dir, usearch_path)
3034    }
3035
3036    fn get_space_indexes(&self, name: &str) -> Result<Arc<SpaceIndexes>> {
3037        self.open_space(name)?;
3038        let spaces = self
3039            .spaces
3040            .read()
3041            .map_err(|_| CoreError::poisoned("spaces"))?;
3042        spaces.get(name).cloned().ok_or_else(|| {
3043            KboltError::SpaceNotFound {
3044                name: name.to_string(),
3045            }
3046            .into()
3047        })
3048    }
3049}
3050
3051fn lookup_space_name(conn: &Connection, space_id: i64) -> Result<String> {
3052    let result = conn.query_row(
3053        "SELECT name FROM spaces WHERE id = ?1",
3054        params![space_id],
3055        |row| row.get::<_, String>(0),
3056    );
3057    match result {
3058        Ok(name) => Ok(name),
3059        Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
3060            name: format!("id={space_id}"),
3061        }
3062        .into()),
3063        Err(err) => Err(err.into()),
3064    }
3065}
3066
3067fn lookup_collection_name(conn: &Connection, collection_id: i64) -> Result<String> {
3068    let result = conn.query_row(
3069        "SELECT name FROM collections WHERE id = ?1",
3070        params![collection_id],
3071        |row| row.get::<_, String>(0),
3072    );
3073    match result {
3074        Ok(name) => Ok(name),
3075        Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
3076            name: format!("id={collection_id}"),
3077        }
3078        .into()),
3079        Err(err) => Err(err.into()),
3080    }
3081}
3082
3083fn lookup_document_id(conn: &Connection, doc_id: i64) -> Result<i64> {
3084    let result = conn.query_row(
3085        "SELECT id FROM documents WHERE id = ?1",
3086        params![doc_id],
3087        |row| row.get::<_, i64>(0),
3088    );
3089    match result {
3090        Ok(id) => Ok(id),
3091        Err(Error::QueryReturnedNoRows) => Err(KboltError::DocumentNotFound {
3092            path: format!("id={doc_id}"),
3093        }
3094        .into()),
3095        Err(err) => Err(err.into()),
3096    }
3097}
3098
3099fn load_chunks_for_doc(conn: &Connection, doc_id: i64) -> Result<Vec<ChunkRow>> {
3100    let mut stmt = conn.prepare(
3101        "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
3102         FROM chunks
3103         WHERE doc_id = ?1
3104         ORDER BY seq ASC",
3105    )?;
3106    let rows = stmt.query_map(params![doc_id], decode_chunk_row)?;
3107    let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3108    Ok(chunks)
3109}
3110
3111fn load_chunk_ids_for_doc(conn: &Connection, doc_id: i64) -> Result<Vec<i64>> {
3112    let mut stmt = conn.prepare("SELECT id FROM chunks WHERE doc_id = ?1 ORDER BY seq ASC")?;
3113    let rows = stmt.query_map(params![doc_id], |row| row.get::<_, i64>(0))?;
3114    let chunk_ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3115    Ok(chunk_ids)
3116}
3117
3118fn lookup_chunk_doc_id(conn: &Connection, chunk_id: i64) -> Result<i64> {
3119    let result = conn.query_row(
3120        "SELECT doc_id FROM chunks WHERE id = ?1",
3121        params![chunk_id],
3122        |row| row.get::<_, i64>(0),
3123    );
3124    match result {
3125        Ok(doc_id) => Ok(doc_id),
3126        Err(Error::QueryReturnedNoRows) => Err(KboltError::DocumentNotFound {
3127            path: format!("chunk_id={chunk_id}"),
3128        }
3129        .into()),
3130        Err(err) => Err(err.into()),
3131    }
3132}
3133
3134fn query_count<P: rusqlite::Params>(conn: &Connection, sql: &str, params: P) -> Result<usize> {
3135    let count: i64 = conn.query_row(sql, params, |row| row.get(0))?;
3136    Ok(count as usize)
3137}
3138
3139fn file_size_or_zero(path: &Path) -> Result<u64> {
3140    if !path.exists() {
3141        return Ok(0);
3142    }
3143
3144    let metadata = std::fs::metadata(path)?;
3145    if metadata.is_file() {
3146        Ok(metadata.len())
3147    } else {
3148        Ok(0)
3149    }
3150}
3151
3152fn dir_size_or_zero(path: &Path) -> Result<u64> {
3153    if !path.exists() {
3154        return Ok(0);
3155    }
3156
3157    let mut total = 0_u64;
3158    for entry in std::fs::read_dir(path)? {
3159        let entry = entry?;
3160        let child_path = entry.path();
3161        let metadata = std::fs::symlink_metadata(&child_path)?;
3162        if metadata.is_file() {
3163            total += metadata.len();
3164        } else if metadata.is_dir() {
3165            total += dir_size_or_zero(&child_path)?;
3166        }
3167    }
3168
3169    Ok(total)
3170}
3171
3172fn open_or_create_tantivy_index(path: &Path) -> Result<Index> {
3173    let meta_path = path.join("meta.json");
3174    if meta_path.exists() {
3175        return Ok(Index::open_in_dir(path)?);
3176    }
3177
3178    Ok(Index::create_in_dir(path, tantivy_schema())?)
3179}
3180
3181fn with_tantivy_writer<T>(
3182    space_indexes: &SpaceIndexes,
3183    f: impl FnOnce(&mut IndexWriter) -> Result<T>,
3184) -> Result<T> {
3185    let mut writer = space_indexes
3186        .tantivy_writer
3187        .lock()
3188        .map_err(|_| CoreError::poisoned("tantivy writer"))?;
3189
3190    if writer.is_none() {
3191        *writer = Some(space_indexes.tantivy_index.writer(50_000_000)?);
3192    }
3193
3194    let writer = writer
3195        .as_mut()
3196        .ok_or_else(|| CoreError::Internal("failed to initialize tantivy writer".to_string()))?;
3197    f(writer)
3198}
3199
3200fn reject_incompatible_legacy_index(conn: &Connection) -> Result<()> {
3201    if !table_exists(conn, "documents")? || table_exists(conn, "document_texts")? {
3202        return Ok(());
3203    }
3204
3205    let document_count = query_count(conn, "SELECT COUNT(*) FROM documents", [])?;
3206    if document_count == 0 {
3207        return Ok(());
3208    }
3209
3210    Err(KboltError::Config(
3211        "cache index uses an older text-storage format; rebuild the kbolt cache before using this version".to_string(),
3212    )
3213    .into())
3214}
3215
3216fn ensure_schema_version(conn: &Connection) -> Result<()> {
3217    let current: i64 = conn.query_row("PRAGMA user_version", [], |row| row.get(0))?;
3218    if current > SCHEMA_VERSION {
3219        return Err(KboltError::Config(format!(
3220            "cache index schema version {current} is newer than supported version {SCHEMA_VERSION}"
3221        ))
3222        .into());
3223    }
3224
3225    if current < SCHEMA_VERSION {
3226        conn.pragma_update(None, "user_version", SCHEMA_VERSION)?;
3227    }
3228
3229    Ok(())
3230}
3231
3232fn table_exists(conn: &Connection, table: &str) -> Result<bool> {
3233    let exists = conn.query_row(
3234        "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name = ?1",
3235        [table],
3236        |row| row.get::<_, i64>(0),
3237    )?;
3238    Ok(exists != 0)
3239}
3240
3241fn ensure_documents_title_source_column(conn: &Connection) -> Result<()> {
3242    let mut stmt = conn.prepare("PRAGMA table_info(documents)")?;
3243    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3244    let columns = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3245    drop(stmt);
3246
3247    if columns.iter().any(|column| column == "title_source") {
3248        return Ok(());
3249    }
3250
3251    conn.execute(
3252        "ALTER TABLE documents ADD COLUMN title_source TEXT NOT NULL DEFAULT 'extracted'",
3253        [],
3254    )?;
3255    Ok(())
3256}
3257
3258fn ensure_document_texts_generation_key_column(conn: &Connection) -> Result<()> {
3259    let mut stmt = conn.prepare("PRAGMA table_info(document_texts)")?;
3260    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3261    let columns = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3262    drop(stmt);
3263
3264    if columns.iter().any(|column| column == "generation_key") {
3265        return Ok(());
3266    }
3267
3268    conn.execute(
3269        "ALTER TABLE document_texts ADD COLUMN generation_key TEXT NOT NULL DEFAULT ''",
3270        [],
3271    )?;
3272    Ok(())
3273}
3274
3275fn ensure_chunks_retrieval_prefix_column(conn: &Connection) -> Result<()> {
3276    let mut stmt = conn.prepare("PRAGMA table_info(chunks)")?;
3277    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3278    let columns = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3279    drop(stmt);
3280
3281    if columns.iter().any(|column| column == "retrieval_prefix") {
3282        return Ok(());
3283    }
3284
3285    conn.execute("ALTER TABLE chunks ADD COLUMN retrieval_prefix TEXT", [])?;
3286    Ok(())
3287}
3288
3289fn build_literal_bm25_query(
3290    index: &Index,
3291    fields: &[Bm25FieldSpec],
3292    query: &str,
3293) -> Result<Option<Box<dyn Query>>> {
3294    let mut clauses = Vec::new();
3295    for field in fields {
3296        for token in analyzed_terms_for_field(index, field.field, query)? {
3297            let term_query: Box<dyn Query> = Box::new(TermQuery::new(
3298                Term::from_field_text(field.field, &token),
3299                field.index_record_option,
3300            ));
3301            let query = if (field.boost - 1.0).abs() > f32::EPSILON {
3302                Box::new(BoostQuery::new(term_query, field.boost)) as Box<dyn Query>
3303            } else {
3304                term_query
3305            };
3306            clauses.push((Occur::Should, query));
3307        }
3308    }
3309
3310    if clauses.is_empty() {
3311        Ok(None)
3312    } else {
3313        Ok(Some(Box::new(BooleanQuery::new(clauses))))
3314    }
3315}
3316
3317fn build_doc_id_filter_query(field: Field, document_ids: &[i64]) -> Result<Box<dyn Query>> {
3318    let mut terms = Vec::new();
3319    let mut seen = HashSet::new();
3320    for doc_id in document_ids {
3321        let doc_id = u64::try_from(*doc_id).map_err(|_| {
3322            CoreError::Internal(format!(
3323                "doc_id must be non-negative for tantivy query: {doc_id}"
3324            ))
3325        })?;
3326        if seen.insert(doc_id) {
3327            terms.push(Term::from_field_u64(field, doc_id));
3328        }
3329    }
3330
3331    Ok(Box::new(ConstScoreQuery::new(
3332        Box::new(TermSetQuery::new(terms)),
3333        0.0,
3334    )))
3335}
3336
3337fn analyzed_terms_for_field(index: &Index, field: Field, query: &str) -> Result<Vec<String>> {
3338    let mut analyzer = index.tokenizer_for_field(field)?;
3339    let mut stream = analyzer.token_stream(query);
3340    let mut terms = Vec::new();
3341    let mut seen = HashSet::new();
3342    while let Some(token) = stream.next() {
3343        if token.text.is_empty() {
3344            continue;
3345        }
3346        let text = token.text.clone();
3347        if seen.insert(text.clone()) {
3348            terms.push(text);
3349        }
3350    }
3351    Ok(terms)
3352}
3353
3354fn new_usearch_index(dimensions: usize) -> Result<usearch::Index> {
3355    let options = IndexOptions {
3356        dimensions,
3357        metric: MetricKind::Cos,
3358        quantization: ScalarKind::F32,
3359        connectivity: 16,
3360        expansion_add: 200,
3361        expansion_search: 100,
3362        ..IndexOptions::default()
3363    };
3364    usearch::Index::new(&options)
3365        .map_err(|err| CoreError::Internal(format!("usearch init failed: {err}")))
3366}
3367
3368fn open_or_create_usearch_index(path: &Path) -> Result<usearch::Index> {
3369    let index = new_usearch_index(256)?;
3370    let file_size = std::fs::metadata(path).map(|meta| meta.len()).unwrap_or(0);
3371    if file_size > 0 {
3372        let path = path
3373            .to_str()
3374            .ok_or_else(|| CoreError::Internal("invalid usearch path encoding".to_string()))?;
3375        index
3376            .load(path)
3377            .map_err(|err| CoreError::Internal(format!("usearch load failed: {err}")))?;
3378    }
3379    Ok(index)
3380}
3381
3382fn ensure_usearch_dimensions(index: &mut usearch::Index, expected_dimensions: usize) -> Result<()> {
3383    if index.size() == 0 && index.dimensions() != expected_dimensions {
3384        *index = new_usearch_index(expected_dimensions)?;
3385        return Ok(());
3386    }
3387
3388    if index.dimensions() != expected_dimensions {
3389        return Err(CoreError::Internal(format!(
3390            "usearch vector dimension mismatch: index expects {}, got {}",
3391            index.dimensions(),
3392            expected_dimensions
3393        )));
3394    }
3395    Ok(())
3396}
3397
3398fn save_usearch_index(index: &usearch::Index, path: &Path) -> Result<()> {
3399    if index.size() == 0 {
3400        std::fs::File::create(path)?;
3401        return Ok(());
3402    }
3403
3404    let path = path
3405        .to_str()
3406        .ok_or_else(|| CoreError::Internal("invalid usearch path encoding".to_string()))?;
3407    index
3408        .save(path)
3409        .map_err(|err| CoreError::Internal(format!("usearch save failed: {err}")))?;
3410    Ok(())
3411}
3412
3413fn tantivy_schema() -> tantivy::schema::Schema {
3414    let mut builder = tantivy::schema::Schema::builder();
3415    builder.add_u64_field("chunk_id", INDEXED | STORED | FAST);
3416    builder.add_u64_field("doc_id", INDEXED | STORED | FAST);
3417    builder.add_text_field("filepath", TEXT | STORED);
3418    builder.add_text_field("title", TEXT | STORED);
3419    builder.add_text_field("heading", TEXT | STORED);
3420    builder.add_text_field("body", TEXT);
3421    builder.build()
3422}
3423
3424fn tantivy_fields_from_schema(schema: &tantivy::schema::Schema) -> Result<TantivyFields> {
3425    Ok(TantivyFields {
3426        chunk_id: schema.get_field("chunk_id").map_err(|_| {
3427            CoreError::Internal("tantivy schema missing field: chunk_id".to_string())
3428        })?,
3429        doc_id: schema
3430            .get_field("doc_id")
3431            .map_err(|_| CoreError::Internal("tantivy schema missing field: doc_id".to_string()))?,
3432        filepath: schema.get_field("filepath").map_err(|_| {
3433            CoreError::Internal("tantivy schema missing field: filepath".to_string())
3434        })?,
3435        title: schema
3436            .get_field("title")
3437            .map_err(|_| CoreError::Internal("tantivy schema missing field: title".to_string()))?,
3438        heading: schema.get_field("heading").map_err(|_| {
3439            CoreError::Internal("tantivy schema missing field: heading".to_string())
3440        })?,
3441        body: schema
3442            .get_field("body")
3443            .map_err(|_| CoreError::Internal("tantivy schema missing field: body".to_string()))?,
3444    })
3445}
3446
3447fn resolve_tantivy_field(fields: TantivyFields, name: &str) -> Result<Field> {
3448    match name {
3449        "chunk_id" => Ok(fields.chunk_id),
3450        "doc_id" => Ok(fields.doc_id),
3451        "filepath" => Ok(fields.filepath),
3452        "title" => Ok(fields.title),
3453        "heading" => Ok(fields.heading),
3454        "body" => Ok(fields.body),
3455        other => Err(CoreError::Internal(format!(
3456            "unsupported tantivy field: {other}"
3457        ))),
3458    }
3459}
3460
3461fn serialize_extensions(extensions: Option<&[String]>) -> Result<Option<String>> {
3462    match extensions {
3463        None => Ok(None),
3464        Some(values) => serde_json::to_string(values).map(Some).map_err(Into::into),
3465    }
3466}
3467
3468fn deserialize_extensions(raw: Option<String>) -> Result<Option<Vec<String>>> {
3469    match raw {
3470        None => Ok(None),
3471        Some(json) => serde_json::from_str::<Vec<String>>(&json)
3472            .map(Some)
3473            .map_err(Into::into),
3474    }
3475}
3476
3477fn decode_space_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SpaceRow> {
3478    Ok(SpaceRow {
3479        id: row.get(0)?,
3480        name: row.get(1)?,
3481        description: row.get(2)?,
3482        created: row.get(3)?,
3483    })
3484}
3485
3486fn decode_collection_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<CollectionRow> {
3487    let raw_extensions: Option<String> = row.get(5)?;
3488    let extensions = deserialize_extensions(raw_extensions).map_err(|err| {
3489        Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(err))
3490    })?;
3491    Ok(CollectionRow {
3492        id: row.get(0)?,
3493        space_id: row.get(1)?,
3494        name: row.get(2)?,
3495        path: PathBuf::from(row.get::<_, String>(3)?),
3496        description: row.get(4)?,
3497        extensions,
3498        created: row.get(6)?,
3499        updated: row.get(7)?,
3500    })
3501}
3502
3503fn decode_document_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<DocumentRow> {
3504    let raw_title_source: String = row.get(4)?;
3505    let title_source = DocumentTitleSource::from_sql(&raw_title_source).map_err(|err| {
3506        Error::FromSqlConversionFailure(4, rusqlite::types::Type::Text, Box::new(err))
3507    })?;
3508    let active_value: i64 = row.get(7)?;
3509    let fts_dirty_value: i64 = row.get(9)?;
3510    Ok(DocumentRow {
3511        id: row.get(0)?,
3512        collection_id: row.get(1)?,
3513        path: row.get(2)?,
3514        title: row.get(3)?,
3515        title_source,
3516        hash: row.get(5)?,
3517        modified: row.get(6)?,
3518        active: active_value != 0,
3519        deactivated_at: row.get(8)?,
3520        fts_dirty: fts_dirty_value != 0,
3521    })
3522}
3523
3524fn decode_document_text_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<DocumentTextRow> {
3525    Ok(DocumentTextRow {
3526        doc_id: row.get(0)?,
3527        extractor_key: row.get(1)?,
3528        source_hash: row.get(2)?,
3529        text_hash: row.get(3)?,
3530        generation_key: row.get(4)?,
3531        text: row.get(5)?,
3532        created: row.get(6)?,
3533    })
3534}
3535
3536fn decode_chunk_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ChunkRow> {
3537    let offset_value: i64 = row.get(3)?;
3538    let length_value: i64 = row.get(4)?;
3539    let kind_raw: String = row.get(6)?;
3540    let kind = FinalChunkKind::try_from(kind_raw.as_str()).map_err(|err| {
3541        Error::FromSqlConversionFailure(6, rusqlite::types::Type::Text, Box::new(err))
3542    })?;
3543    Ok(ChunkRow {
3544        id: row.get(0)?,
3545        doc_id: row.get(1)?,
3546        seq: row.get(2)?,
3547        offset: decode_non_negative_usize(offset_value, 3, "chunks.offset")?,
3548        length: decode_non_negative_usize(length_value, 4, "chunks.length")?,
3549        heading: row.get(5)?,
3550        kind,
3551        retrieval_prefix: row.get(7)?,
3552    })
3553}
3554
3555fn decode_non_negative_usize(
3556    value: i64,
3557    column: usize,
3558    name: &'static str,
3559) -> rusqlite::Result<usize> {
3560    if value < 0 {
3561        return Err(Error::FromSqlConversionFailure(
3562            column,
3563            SqlType::Integer,
3564            Box::new(KboltError::Internal(format!("{name} must not be negative"))),
3565        ));
3566    }
3567
3568    Ok(value as usize)
3569}
3570
3571fn canonical_chunk_slice_from_bytes(
3572    chunk_id: i64,
3573    offset_value: i64,
3574    length_value: i64,
3575    document_len_value: i64,
3576    start_byte: Vec<u8>,
3577    end_byte: Vec<u8>,
3578    bytes: Vec<u8>,
3579) -> Result<String> {
3580    if offset_value < 0 {
3581        return Err(CoreError::Internal(format!(
3582            "chunk {chunk_id} offset must not be negative"
3583        )));
3584    }
3585    if length_value < 0 {
3586        return Err(CoreError::Internal(format!(
3587            "chunk {chunk_id} length must not be negative"
3588        )));
3589    }
3590    if document_len_value < 0 {
3591        return Err(CoreError::Internal(format!(
3592            "document text length for chunk {chunk_id} must not be negative"
3593        )));
3594    }
3595
3596    let offset = offset_value as usize;
3597    let length = length_value as usize;
3598    let document_len = document_len_value as usize;
3599    let end = offset.checked_add(length).ok_or_else(|| {
3600        CoreError::Internal(format!("chunk {chunk_id} text span overflows usize"))
3601    })?;
3602    if end > document_len {
3603        return Err(CoreError::Internal(format!(
3604            "chunk {chunk_id} text span {offset}..{end} exceeds document text length {document_len}"
3605        )));
3606    }
3607    if bytes.len() != length {
3608        return Err(CoreError::Internal(format!(
3609            "canonical text slice for chunk {chunk_id} returned {} bytes, expected {length}",
3610            bytes.len()
3611        )));
3612    }
3613    validate_sqlite_utf8_boundary(chunk_id, offset, document_len, &start_byte)?;
3614    validate_sqlite_utf8_boundary(chunk_id, end, document_len, &end_byte)?;
3615
3616    String::from_utf8(bytes).map_err(|err| {
3617        CoreError::Internal(format!(
3618            "stored canonical text slice for chunk {chunk_id} is invalid UTF-8: {err}"
3619        ))
3620    })
3621}
3622
3623fn validate_sqlite_utf8_boundary(
3624    chunk_id: i64,
3625    index: usize,
3626    document_len: usize,
3627    byte_at_index: &[u8],
3628) -> Result<()> {
3629    if index == 0 || index == document_len {
3630        return Ok(());
3631    }
3632    if byte_at_index.len() != 1 {
3633        return Err(CoreError::Internal(format!(
3634            "chunk {chunk_id} text boundary byte at {index} is missing"
3635        )));
3636    }
3637    if (0x80..0xC0).contains(&byte_at_index[0]) {
3638        return Err(CoreError::Internal(format!(
3639            "chunk {chunk_id} text span is not on UTF-8 boundaries"
3640        )));
3641    }
3642
3643    Ok(())
3644}
3645
3646pub(crate) fn chunk_text_from_canonical(document_text: &str, chunk: &ChunkRow) -> Result<String> {
3647    let label = format!("chunk {}", chunk.id);
3648    let end = validate_text_span(document_text, chunk.offset, chunk.length, &label)?;
3649
3650    Ok(document_text[chunk.offset..end].to_string())
3651}
3652
3653fn validate_text_span(
3654    document_text: &str,
3655    offset: usize,
3656    length: usize,
3657    label: &str,
3658) -> Result<usize> {
3659    let end = offset
3660        .checked_add(length)
3661        .ok_or_else(|| CoreError::Internal(format!("{label} text span overflows usize")))?;
3662    if end > document_text.len() {
3663        return Err(CoreError::Internal(format!(
3664            "{label} text span {}..{} exceeds document text length {}",
3665            offset,
3666            end,
3667            document_text.len()
3668        )));
3669    }
3670    if !document_text.is_char_boundary(offset) || !document_text.is_char_boundary(end) {
3671        return Err(CoreError::Internal(format!(
3672            "{label} text span {offset}..{end} is not on UTF-8 boundaries"
3673        )));
3674    }
3675
3676    Ok(end)
3677}
3678
3679pub(crate) fn missing_document_text_error(doc_id: i64) -> CoreError {
3680    KboltError::Internal(format!(
3681        "document {doc_id} is missing persisted canonical text; rebuild the kbolt cache"
3682    ))
3683    .into()
3684}
3685
3686#[cfg(test)]
3687mod tests;