Skip to main content

kbolt_core/storage/
mod.rs

1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3use std::sync::{Arc, Mutex, RwLock};
4
5use crate::error::{CoreError, Result};
6use crate::ingest::chunk::FinalChunkKind;
7use kbolt_types::{DiskUsage, KboltError};
8use rusqlite::types::{Type as SqlType, Value as SqlValue};
9use rusqlite::{params, params_from_iter, Connection, Error, ErrorCode};
10use tantivy::collector::TopDocs;
11use tantivy::query::{
12    BooleanQuery, BoostQuery, ConstScoreQuery, Occur, Query, TermQuery, TermSetQuery,
13};
14use tantivy::schema::{Field, IndexRecordOption, FAST, INDEXED, STORED, TEXT};
15use tantivy::tokenizer::TokenStream;
16use tantivy::{Index, IndexReader, IndexWriter, TantivyDocument, Term};
17use usearch::{IndexOptions, MetricKind, ScalarKind};
18
/// File name of the SQLite metadata database, joined onto the cache directory.
const DB_FILE: &str = "meta.sqlite";
/// Name of the space that is always inserted on startup and may not be renamed.
const DEFAULT_SPACE_NAME: &str = "default";
/// Directory name for per-space artifacts (presumably under the cache dir — confirm in `space_paths`).
const SPACES_DIR: &str = "spaces";
/// Directory name for a space's Tantivy index (presumably inside the space dir — confirm).
const TANTIVY_DIR_NAME: &str = "tantivy";
/// File name for a space's usearch vector index (presumably inside the space dir — confirm).
const USEARCH_FILENAME: &str = "vectors.usearch";
/// Schema version constant; presumably what `ensure_schema_version` checks/stores — confirm.
const SCHEMA_VERSION: i64 = 3;
/// Maximum number of ids to bind into a single SQL `IN (...)` clause.
const SQLITE_IN_CLAUSE_BATCH_SIZE: usize = 500;
26
/// Selects when a usearch index is written out.
///
/// NOTE(review): the consumers are outside this view; presumably `Immediate`
/// saves right away and `Deferred` only marks the space dirty — confirm.
#[derive(Clone, Copy)]
enum UsearchSaveMode {
    /// Save as part of the current operation (assumed).
    Immediate,
    /// Postpone the save to a later flush (assumed).
    Deferred,
}
32
33pub struct Storage {
34    db: Mutex<Connection>,
35    cache_dir: PathBuf,
36    spaces: RwLock<HashMap<String, Arc<SpaceIndexes>>>,
37    dirty_usearch_spaces: Mutex<HashSet<String>>,
38}
39
/// One row of the `spaces` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SpaceRow {
    /// Primary key.
    pub id: i64,
    /// Unique space name.
    pub name: String,
    /// Optional free-form description.
    pub description: Option<String>,
    /// Creation timestamp as stored in the database (text).
    pub created: String,
}
47
/// One row of the `collections` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollectionRow {
    /// Primary key.
    pub id: i64,
    /// Owning space (`spaces.id`).
    pub space_id: i64,
    /// Collection name, unique within its space.
    pub name: String,
    /// Filesystem path recorded for the collection.
    pub path: PathBuf,
    /// Optional free-form description.
    pub description: Option<String>,
    /// Optional extension list (stored serialized in a single text column).
    pub extensions: Option<Vec<String>>,
    /// Creation timestamp (text).
    pub created: String,
    /// Last-update timestamp (text).
    pub updated: String,
}
59
/// One row of the `documents` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DocumentRow {
    /// Primary key.
    pub id: i64,
    /// Owning collection (`collections.id`).
    pub collection_id: i64,
    /// Document path, unique within its collection.
    pub path: String,
    /// Optional title (the column itself is non-null with `''` as default).
    pub title: Option<String>,
    /// Content hash recorded for the document.
    pub hash: String,
    /// Modification timestamp (text).
    pub modified: String,
    /// Whether the document is currently active.
    pub active: bool,
    /// When the document was deactivated, if it has been.
    pub deactivated_at: Option<String>,
    /// Flag mirroring the `fts_dirty` column (set to 1 by upserts).
    pub fts_dirty: bool,
}
72
/// Per-document summary row carrying chunk and embedded-chunk counts.
///
/// NOTE(review): the query that fills this is outside this view.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileListRow {
    /// Document id.
    pub doc_id: i64,
    /// Document path.
    pub path: String,
    /// Optional document title.
    pub title: Option<String>,
    /// Document content hash.
    pub hash: String,
    /// Whether the document is active.
    pub active: bool,
    /// Number of chunks for the document.
    pub chunk_count: usize,
    /// Number of those chunks counted as embedded.
    pub embedded_chunk_count: usize,
}
83
84#[derive(Debug, Clone, PartialEq, Eq)]
85pub struct ChunkRow {
86    pub id: i64,
87    pub doc_id: i64,
88    pub seq: i32,
89    pub offset: usize,
90    pub length: usize,
91    pub heading: Option<String>,
92    pub kind: FinalChunkKind,
93    pub retrieval_prefix: Option<String>,
94}
95
96#[derive(Debug, Clone, PartialEq, Eq)]
97pub struct ChunkInsert {
98    pub seq: i32,
99    pub offset: usize,
100    pub length: usize,
101    pub heading: Option<String>,
102    pub kind: FinalChunkKind,
103    pub retrieval_prefix: Option<String>,
104}
105
/// One row of the `document_texts` table (one text record per document).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DocumentTextRow {
    /// Document this text belongs to; also the table's primary key.
    pub doc_id: i64,
    /// Value of the `extractor_key` column.
    pub extractor_key: String,
    /// Value of the `source_hash` column.
    pub source_hash: String,
    /// Value of the `text_hash` column.
    pub text_hash: String,
    /// Value of the `generation_key` column (defaults to `''` in the schema).
    pub generation_key: String,
    /// The stored text.
    pub text: String,
    /// Creation timestamp (text).
    pub created: String,
}
116
117#[derive(Debug, Clone, PartialEq, Eq)]
118pub struct ChunkTextRow {
119    pub chunk: ChunkRow,
120    pub extractor_key: String,
121    pub text: String,
122}
123
124pub struct DocumentGenerationReplace<'a> {
125    pub collection_id: i64,
126    pub path: &'a str,
127    pub title: Option<&'a str>,
128    pub hash: &'a str,
129    pub modified: &'a str,
130    pub extractor_key: &'a str,
131    pub source_hash: &'a str,
132    pub text_hash: &'a str,
133    pub generation_key: &'a str,
134    pub text: &'a str,
135    pub chunks: &'a [ChunkInsert],
136}
137
/// A document id together with two lists of chunk ids.
///
/// NOTE(review): presumably `old_chunk_ids` are the chunks that were replaced
/// and `chunk_ids` the newly stored ones — confirm against the producer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DocumentGenerationReplaceResult {
    /// Id of the affected document.
    pub doc_id: i64,
    /// Chunk ids from before the replacement (assumed).
    pub old_chunk_ids: Vec<i64>,
    /// Chunk ids after the replacement (assumed).
    pub chunk_ids: Vec<i64>,
}
144
145#[derive(Debug, Clone, PartialEq, Eq)]
146pub struct EmbedRecord {
147    pub chunk: ChunkRow,
148    pub doc_path: String,
149    pub doc_title: Option<String>,
150    pub collection_path: PathBuf,
151    pub space_name: String,
152}
153
154#[derive(Debug, Clone, PartialEq, Eq)]
155pub struct FtsDirtyRecord {
156    pub doc_id: i64,
157    pub doc_path: String,
158    pub doc_title: Option<String>,
159    pub doc_hash: String,
160    pub collection_path: PathBuf,
161    pub space_name: String,
162    pub chunks: Vec<ChunkRow>,
163}
164
/// A document id with its space name and chunk ids.
///
/// NOTE(review): by its name, a document eligible for removal ("reaping");
/// the selection criteria are outside this view.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReapableDocument {
    /// Document id.
    pub doc_id: i64,
    /// Name of the space that holds the document.
    pub space_name: String,
    /// Ids of the document's chunks.
    pub chunk_ids: Vec<i64>,
}
171
/// Per-chunk record whose fields mirror the Tantivy field set
/// (`chunk_id`, `doc_id`, `filepath`, `title`, `heading`, `body`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TantivyEntry {
    /// Chunk id.
    pub chunk_id: i64,
    /// Document id.
    pub doc_id: i64,
    /// File path value.
    pub filepath: String,
    /// Optional title value.
    pub title: Option<String>,
    /// Optional heading value.
    pub heading: Option<String>,
    /// Body text.
    pub body: String,
}
181
/// A chunk id paired with a relevance score.
///
/// NOTE(review): presumably produced by the BM25 (Tantivy) search path — confirm.
#[derive(Debug, Clone, PartialEq)]
pub struct BM25Hit {
    /// Id of the matching chunk.
    pub chunk_id: i64,
    /// Score assigned to the chunk.
    pub score: f32,
}
187
/// A chunk id paired with a distance value.
///
/// NOTE(review): presumably produced by the vector (usearch) search path — confirm.
#[derive(Debug, Clone, PartialEq)]
pub struct DenseHit {
    /// Id of the matching chunk.
    pub chunk_id: i64,
    /// Distance reported for the chunk.
    pub distance: f32,
}
193
/// A list of document ids together with a chunk count.
///
/// NOTE(review): presumably describes the documents/chunks covered by a
/// search scope — confirm against the producer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchScopeSummary {
    /// Ids of the documents in scope.
    pub document_ids: Vec<i64>,
    /// Number of chunks in scope.
    pub chunk_count: usize,
}
199
200#[derive(Debug, Clone, PartialEq, Eq)]
201pub enum SpaceResolution {
202    Found(SpaceRow),
203    Ambiguous(Vec<String>),
204    NotFound,
205}
206
207struct SpaceIndexes {
208    _tantivy_dir: PathBuf,
209    usearch_path: PathBuf,
210    tantivy_index: Index,
211    tantivy_reader: IndexReader,
212    tantivy_writer: Mutex<Option<IndexWriter>>,
213    usearch_index: RwLock<usearch::Index>,
214    fields: TantivyFields,
215}
216
217#[derive(Debug, Clone, Copy)]
218struct TantivyFields {
219    chunk_id: Field,
220    doc_id: Field,
221    filepath: Field,
222    title: Field,
223    heading: Field,
224    body: Field,
225}
226
227#[derive(Debug, Clone, Copy)]
228struct Bm25FieldSpec {
229    field: Field,
230    boost: f32,
231    index_record_option: IndexRecordOption,
232}
233
234impl Storage {
235    pub fn new(cache_dir: &Path) -> Result<Self> {
236        std::fs::create_dir_all(cache_dir)?;
237        let db_path = cache_dir.join(DB_FILE);
238        let conn = Connection::open(db_path)?;
239        reject_incompatible_legacy_index(&conn)?;
240        conn.execute_batch(
241            r#"
242PRAGMA foreign_keys = ON;
243PRAGMA journal_mode = WAL;
244
245CREATE TABLE IF NOT EXISTS spaces (
246    id          INTEGER PRIMARY KEY,
247    name        TEXT NOT NULL UNIQUE,
248    description TEXT,
249    created     TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ','now'))
250);
251
252CREATE TABLE IF NOT EXISTS collections (
253    id          INTEGER PRIMARY KEY,
254    space_id    INTEGER NOT NULL REFERENCES spaces(id) ON DELETE CASCADE,
255    name        TEXT NOT NULL,
256    path        TEXT NOT NULL,
257    description TEXT,
258    extensions  TEXT,
259    created     TEXT NOT NULL,
260    updated     TEXT NOT NULL,
261    UNIQUE(space_id, name)
262);
263
264CREATE TABLE IF NOT EXISTS documents (
265    id              INTEGER PRIMARY KEY,
266    collection_id   INTEGER NOT NULL REFERENCES collections(id) ON DELETE CASCADE,
267    path            TEXT NOT NULL,
268    title           TEXT NOT NULL DEFAULT '',
269    hash            TEXT NOT NULL,
270    modified        TEXT NOT NULL,
271    active          INTEGER NOT NULL DEFAULT 1,
272    deactivated_at  TEXT,
273    fts_dirty       INTEGER NOT NULL DEFAULT 0,
274    UNIQUE(collection_id, path)
275);
276CREATE INDEX IF NOT EXISTS idx_documents_collection ON documents(collection_id, active);
277CREATE INDEX IF NOT EXISTS idx_documents_hash ON documents(hash);
278CREATE INDEX IF NOT EXISTS idx_documents_fts_dirty ON documents(fts_dirty) WHERE fts_dirty = 1;
279
280CREATE TABLE IF NOT EXISTS chunks (
281    id       INTEGER PRIMARY KEY,
282    doc_id   INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
283    seq      INTEGER NOT NULL,
284    offset   INTEGER NOT NULL,
285    length   INTEGER NOT NULL,
286    heading  TEXT,
287    kind     TEXT NOT NULL DEFAULT 'section',
288    retrieval_prefix TEXT,
289    UNIQUE(doc_id, seq)
290);
291CREATE INDEX IF NOT EXISTS idx_chunks_doc ON chunks(doc_id);
292
293CREATE TABLE IF NOT EXISTS embeddings (
294    chunk_id    INTEGER NOT NULL REFERENCES chunks(id) ON DELETE CASCADE,
295    model       TEXT NOT NULL,
296    embedded_at TEXT NOT NULL,
297    PRIMARY KEY (chunk_id, model)
298);
299
300CREATE TABLE IF NOT EXISTS document_texts (
301    doc_id            INTEGER PRIMARY KEY REFERENCES documents(id) ON DELETE CASCADE,
302    extractor_key     TEXT NOT NULL,
303    source_hash       TEXT NOT NULL,
304    text_hash         TEXT NOT NULL,
305    generation_key    TEXT NOT NULL DEFAULT '',
306    text              TEXT NOT NULL,
307    created           TEXT NOT NULL
308);
309"#,
310        )?;
311        ensure_document_texts_generation_key_column(&conn)?;
312        ensure_chunks_retrieval_prefix_column(&conn)?;
313        ensure_schema_version(&conn)?;
314
315        conn.execute(
316            "INSERT OR IGNORE INTO spaces (name, description) VALUES (?1, NULL)",
317            params![DEFAULT_SPACE_NAME],
318        )?;
319
320        let storage = Self {
321            db: Mutex::new(conn),
322            cache_dir: cache_dir.to_path_buf(),
323            spaces: RwLock::new(HashMap::new()),
324            dirty_usearch_spaces: Mutex::new(HashSet::new()),
325        };
326        storage.open_space(DEFAULT_SPACE_NAME)?;
327
328        Ok(storage)
329    }
330
331    pub fn open_space(&self, name: &str) -> Result<()> {
332        let _space = self.get_space(name)?;
333
334        {
335            let spaces = self
336                .spaces
337                .read()
338                .map_err(|_| CoreError::poisoned("spaces"))?;
339            if spaces.contains_key(name) {
340                return Ok(());
341            }
342        }
343
344        let (tantivy_dir, usearch_path) = self.space_paths(name);
345        std::fs::create_dir_all(&tantivy_dir)?;
346        std::fs::OpenOptions::new()
347            .create(true)
348            .append(true)
349            .open(&usearch_path)?;
350
351        let mut spaces = self
352            .spaces
353            .write()
354            .map_err(|_| CoreError::poisoned("spaces"))?;
355        if spaces.contains_key(name) {
356            return Ok(());
357        }
358
359        let tantivy_index = open_or_create_tantivy_index(&tantivy_dir)?;
360        let tantivy_reader = tantivy_index.reader()?;
361        let usearch_index = open_or_create_usearch_index(&usearch_path)?;
362        let fields = tantivy_fields_from_schema(&tantivy_index.schema())?;
363
364        spaces.insert(
365            name.to_string(),
366            Arc::new(SpaceIndexes {
367                _tantivy_dir: tantivy_dir,
368                usearch_path,
369                tantivy_index,
370                tantivy_reader,
371                tantivy_writer: Mutex::new(None),
372                usearch_index: RwLock::new(usearch_index),
373                fields,
374            }),
375        );
376
377        Ok(())
378    }
379
380    pub fn close_space(&self, name: &str) -> Result<()> {
381        let _space = self.get_space(name)?;
382        let mut spaces = self
383            .spaces
384            .write()
385            .map_err(|_| CoreError::poisoned("spaces"))?;
386        let _removed = spaces.remove(name);
387        Ok(())
388    }
389
390    pub fn create_space(&self, name: &str, description: Option<&str>) -> Result<i64> {
391        let space_id = {
392            let conn = self
393                .db
394                .lock()
395                .map_err(|_| CoreError::poisoned("database"))?;
396
397            let result = conn.execute(
398                "INSERT INTO spaces (name, description) VALUES (?1, ?2)",
399                params![name, description],
400            );
401
402            match result {
403                Ok(_) => conn.last_insert_rowid(),
404                Err(err) => {
405                    return match err {
406                        Error::SqliteFailure(sqlite_err, _)
407                            if sqlite_err.code == ErrorCode::ConstraintViolation =>
408                        {
409                            Err(KboltError::SpaceAlreadyExists {
410                                name: name.to_string(),
411                            }
412                            .into())
413                        }
414                        other => Err(other.into()),
415                    };
416                }
417            }
418        };
419
420        if let Err(open_err) = self.open_space(name) {
421            let rollback_result = self
422                .db
423                .lock()
424                .map_err(|_| CoreError::poisoned("database"))?
425                .execute("DELETE FROM spaces WHERE id = ?1", params![space_id]);
426
427            if let Err(rollback_err) = rollback_result {
428                return Err(CoreError::Internal(format!(
429                    "failed to provision indexes for space '{name}': {open_err}; rollback failed: {rollback_err}"
430                )));
431            }
432
433            return Err(open_err);
434        }
435
436        Ok(space_id)
437    }
438
439    pub fn get_space(&self, name: &str) -> Result<SpaceRow> {
440        let conn = self
441            .db
442            .lock()
443            .map_err(|_| CoreError::poisoned("database"))?;
444        let mut stmt =
445            conn.prepare("SELECT id, name, description, created FROM spaces WHERE name = ?1")?;
446
447        let row = stmt.query_row(params![name], decode_space_row);
448
449        match row {
450            Ok(space) => Ok(space),
451            Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
452                name: name.to_string(),
453            }
454            .into()),
455            Err(err) => Err(err.into()),
456        }
457    }
458
459    pub fn get_space_by_id(&self, id: i64) -> Result<SpaceRow> {
460        let conn = self
461            .db
462            .lock()
463            .map_err(|_| CoreError::poisoned("database"))?;
464        let mut stmt =
465            conn.prepare("SELECT id, name, description, created FROM spaces WHERE id = ?1")?;
466
467        let row = stmt.query_row(params![id], decode_space_row);
468
469        match row {
470            Ok(space) => Ok(space),
471            Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
472                name: format!("id={id}"),
473            }
474            .into()),
475            Err(err) => Err(err.into()),
476        }
477    }
478
479    pub fn list_spaces(&self) -> Result<Vec<SpaceRow>> {
480        let conn = self
481            .db
482            .lock()
483            .map_err(|_| CoreError::poisoned("database"))?;
484        let mut stmt = conn.prepare(
485            "SELECT id, name, description, created
486             FROM spaces
487             ORDER BY name ASC",
488        )?;
489
490        let rows = stmt.query_map([], decode_space_row)?;
491
492        let spaces = rows.collect::<std::result::Result<Vec<_>, _>>()?;
493        Ok(spaces)
494    }
495
496    pub fn find_space_for_collection(&self, collection: &str) -> Result<SpaceResolution> {
497        let conn = self
498            .db
499            .lock()
500            .map_err(|_| CoreError::poisoned("database"))?;
501        let mut stmt = conn.prepare(
502            "SELECT s.id, s.name, s.description, s.created
503             FROM spaces s
504             JOIN collections c ON c.space_id = s.id
505             WHERE c.name = ?1
506             ORDER BY s.name ASC",
507        )?;
508        let rows = stmt.query_map(params![collection], decode_space_row)?;
509        let matches = rows.collect::<std::result::Result<Vec<_>, _>>()?;
510
511        if matches.is_empty() {
512            return Ok(SpaceResolution::NotFound);
513        }
514
515        if matches.len() == 1 {
516            return Ok(SpaceResolution::Found(matches[0].clone()));
517        }
518
519        let spaces = matches.into_iter().map(|space| space.name).collect();
520        Ok(SpaceResolution::Ambiguous(spaces))
521    }
522
523    pub fn delete_space(&self, name: &str) -> Result<()> {
524        if name == DEFAULT_SPACE_NAME {
525            let conn = self
526                .db
527                .lock()
528                .map_err(|_| CoreError::poisoned("database"))?;
529            conn.execute(
530                "DELETE FROM collections
531                 WHERE space_id = (SELECT id FROM spaces WHERE name = ?1)",
532                params![name],
533            )?;
534            drop(conn);
535
536            self.unload_space(name)?;
537            self.remove_space_artifacts(name)?;
538            self.open_space(name)?;
539            return Ok(());
540        }
541
542        let conn = self
543            .db
544            .lock()
545            .map_err(|_| CoreError::poisoned("database"))?;
546        let deleted = conn.execute("DELETE FROM spaces WHERE name = ?1", params![name])?;
547        drop(conn);
548
549        if deleted == 0 {
550            return Err(KboltError::SpaceNotFound {
551                name: name.to_string(),
552            }
553            .into());
554        }
555
556        self.unload_space(name)?;
557        self.remove_space_artifacts(name)?;
558
559        Ok(())
560    }
561
562    pub fn rename_space(&self, old: &str, new: &str) -> Result<()> {
563        if old == DEFAULT_SPACE_NAME {
564            return Err(
565                KboltError::Config("cannot rename reserved space: default".to_string()).into(),
566            );
567        }
568
569        let conn = self
570            .db
571            .lock()
572            .map_err(|_| CoreError::poisoned("database"))?;
573
574        let result = conn.execute(
575            "UPDATE spaces SET name = ?1 WHERE name = ?2",
576            params![new, old],
577        );
578        drop(conn);
579
580        match result {
581            Ok(0) => Err(KboltError::SpaceNotFound {
582                name: old.to_string(),
583            }
584            .into()),
585            Ok(_) => {
586                if let Err(rename_err) = self.rename_space_artifacts(old, new) {
587                    let rollback = self
588                        .db
589                        .lock()
590                        .map_err(|_| CoreError::poisoned("database"))?
591                        .execute(
592                            "UPDATE spaces SET name = ?1 WHERE name = ?2",
593                            params![old, new],
594                        );
595
596                    if let Err(rollback_err) = rollback {
597                        return Err(CoreError::Internal(format!(
598                            "failed to rename space artifacts from '{old}' to '{new}': {rename_err}; rollback failed: {rollback_err}"
599                        )));
600                    }
601
602                    return Err(rename_err);
603                }
604
605                self.unload_space(old)?;
606                if let Err(open_err) = self.open_space(new) {
607                    let _ = self.rename_space_artifacts(new, old);
608                    let _ = self
609                        .db
610                        .lock()
611                        .map_err(|_| CoreError::poisoned("database"))?
612                        .execute(
613                            "UPDATE spaces SET name = ?1 WHERE name = ?2",
614                            params![old, new],
615                        );
616                    let _ = self.open_space(old);
617                    return Err(open_err);
618                }
619
620                Ok(())
621            }
622            Err(Error::SqliteFailure(sqlite_err, _))
623                if sqlite_err.code == ErrorCode::ConstraintViolation =>
624            {
625                Err(KboltError::SpaceAlreadyExists {
626                    name: new.to_string(),
627                }
628                .into())
629            }
630            Err(err) => Err(err.into()),
631        }
632    }
633
634    pub fn update_space_description(&self, name: &str, description: &str) -> Result<()> {
635        let conn = self
636            .db
637            .lock()
638            .map_err(|_| CoreError::poisoned("database"))?;
639
640        let updated = conn.execute(
641            "UPDATE spaces SET description = ?1 WHERE name = ?2",
642            params![description, name],
643        )?;
644
645        if updated == 0 {
646            return Err(KboltError::SpaceNotFound {
647                name: name.to_string(),
648            }
649            .into());
650        }
651
652        Ok(())
653    }
654
655    pub fn create_collection(
656        &self,
657        space_id: i64,
658        name: &str,
659        path: &Path,
660        description: Option<&str>,
661        extensions: Option<&[String]>,
662    ) -> Result<i64> {
663        let conn = self
664            .db
665            .lock()
666            .map_err(|_| CoreError::poisoned("database"))?;
667
668        let space_name = lookup_space_name(&conn, space_id)?;
669        let extensions_json = serialize_extensions(extensions)?;
670        let result = conn.execute(
671            "INSERT INTO collections (space_id, name, path, description, extensions, created, updated)
672             VALUES (?1, ?2, ?3, ?4, ?5, strftime('%Y-%m-%dT%H:%M:%SZ','now'), strftime('%Y-%m-%dT%H:%M:%SZ','now'))",
673            params![space_id, name, path.to_string_lossy(), description, extensions_json],
674        );
675
676        match result {
677            Ok(_) => Ok(conn.last_insert_rowid()),
678            Err(Error::SqliteFailure(sqlite_err, _))
679                if sqlite_err.code == ErrorCode::ConstraintViolation =>
680            {
681                Err(KboltError::CollectionAlreadyExists {
682                    name: name.to_string(),
683                    space: space_name,
684                }
685                .into())
686            }
687            Err(err) => Err(err.into()),
688        }
689    }
690
691    pub fn get_collection(&self, space_id: i64, name: &str) -> Result<CollectionRow> {
692        let conn = self
693            .db
694            .lock()
695            .map_err(|_| CoreError::poisoned("database"))?;
696        let mut stmt = conn.prepare(
697            "SELECT id, space_id, name, path, description, extensions, created, updated
698             FROM collections
699             WHERE space_id = ?1 AND name = ?2",
700        )?;
701
702        let result = stmt.query_row(params![space_id, name], decode_collection_row);
703        match result {
704            Ok(row) => Ok(row),
705            Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
706                name: name.to_string(),
707            }
708            .into()),
709            Err(err) => Err(err.into()),
710        }
711    }
712
713    pub fn get_collection_by_id(&self, id: i64) -> Result<CollectionRow> {
714        let conn = self
715            .db
716            .lock()
717            .map_err(|_| CoreError::poisoned("database"))?;
718        let mut stmt = conn.prepare(
719            "SELECT id, space_id, name, path, description, extensions, created, updated
720             FROM collections
721             WHERE id = ?1",
722        )?;
723
724        let result = stmt.query_row(params![id], decode_collection_row);
725        match result {
726            Ok(row) => Ok(row),
727            Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
728                name: format!("id={id}"),
729            }
730            .into()),
731            Err(err) => Err(err.into()),
732        }
733    }
734
735    pub fn list_collections(&self, space_id: Option<i64>) -> Result<Vec<CollectionRow>> {
736        let conn = self
737            .db
738            .lock()
739            .map_err(|_| CoreError::poisoned("database"))?;
740
741        let (sql, params): (&str, Vec<i64>) = match space_id {
742            Some(id) => (
743                "SELECT id, space_id, name, path, description, extensions, created, updated
744                 FROM collections
745                 WHERE space_id = ?1
746                 ORDER BY name ASC",
747                vec![id],
748            ),
749            None => (
750                "SELECT id, space_id, name, path, description, extensions, created, updated
751                 FROM collections
752                 ORDER BY space_id ASC, name ASC",
753                Vec::new(),
754            ),
755        };
756
757        let mut stmt = conn.prepare(sql)?;
758        let rows = if params.is_empty() {
759            stmt.query_map([], decode_collection_row)?
760        } else {
761            stmt.query_map(params![params[0]], decode_collection_row)?
762        };
763        let collections = rows.collect::<std::result::Result<Vec<_>, _>>()?;
764        Ok(collections)
765    }
766
767    pub fn count_collections_in_space(&self, space_id: i64) -> Result<usize> {
768        let conn = self
769            .db
770            .lock()
771            .map_err(|_| CoreError::poisoned("database"))?;
772        let _space_name = lookup_space_name(&conn, space_id)?;
773
774        query_count(
775            &conn,
776            "SELECT COUNT(*) FROM collections WHERE space_id = ?1",
777            params![space_id],
778        )
779    }
780
781    pub fn delete_collection(&self, space_id: i64, name: &str) -> Result<()> {
782        let conn = self
783            .db
784            .lock()
785            .map_err(|_| CoreError::poisoned("database"))?;
786        let _space_name = lookup_space_name(&conn, space_id)?;
787
788        let deleted = conn.execute(
789            "DELETE FROM collections WHERE space_id = ?1 AND name = ?2",
790            params![space_id, name],
791        )?;
792
793        if deleted == 0 {
794            return Err(KboltError::CollectionNotFound {
795                name: name.to_string(),
796            }
797            .into());
798        }
799
800        Ok(())
801    }
802
803    pub fn rename_collection(&self, space_id: i64, old: &str, new: &str) -> Result<()> {
804        let conn = self
805            .db
806            .lock()
807            .map_err(|_| CoreError::poisoned("database"))?;
808        let space_name = lookup_space_name(&conn, space_id)?;
809        let result = conn.execute(
810            "UPDATE collections
811             SET name = ?1, updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
812             WHERE space_id = ?2 AND name = ?3",
813            params![new, space_id, old],
814        );
815
816        match result {
817            Ok(0) => Err(KboltError::CollectionNotFound {
818                name: old.to_string(),
819            }
820            .into()),
821            Ok(_) => Ok(()),
822            Err(Error::SqliteFailure(sqlite_err, _))
823                if sqlite_err.code == ErrorCode::ConstraintViolation =>
824            {
825                Err(KboltError::CollectionAlreadyExists {
826                    name: new.to_string(),
827                    space: space_name,
828                }
829                .into())
830            }
831            Err(err) => Err(err.into()),
832        }
833    }
834
835    pub fn update_collection_description(
836        &self,
837        space_id: i64,
838        name: &str,
839        desc: &str,
840    ) -> Result<()> {
841        let conn = self
842            .db
843            .lock()
844            .map_err(|_| CoreError::poisoned("database"))?;
845        let _space_name = lookup_space_name(&conn, space_id)?;
846
847        let updated = conn.execute(
848            "UPDATE collections
849             SET description = ?1, updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
850             WHERE space_id = ?2 AND name = ?3",
851            params![desc, space_id, name],
852        )?;
853
854        if updated == 0 {
855            return Err(KboltError::CollectionNotFound {
856                name: name.to_string(),
857            }
858            .into());
859        }
860
861        Ok(())
862    }
863
864    pub fn update_collection_timestamp(&self, collection_id: i64) -> Result<()> {
865        let conn = self
866            .db
867            .lock()
868            .map_err(|_| CoreError::poisoned("database"))?;
869
870        let updated = conn.execute(
871            "UPDATE collections
872             SET updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
873             WHERE id = ?1",
874            params![collection_id],
875        )?;
876
877        if updated == 0 {
878            return Err(KboltError::CollectionNotFound {
879                name: format!("id={collection_id}"),
880            }
881            .into());
882        }
883
884        Ok(())
885    }
886
887    pub fn upsert_document(
888        &self,
889        collection_id: i64,
890        path: &str,
891        title: Option<&str>,
892        hash: &str,
893        modified: &str,
894    ) -> Result<i64> {
895        let conn = self
896            .db
897            .lock()
898            .map_err(|_| CoreError::poisoned("database"))?;
899        let _collection_name = lookup_collection_name(&conn, collection_id)?;
900
901        let id = conn.query_row(
902            "INSERT INTO documents (collection_id, path, title, hash, modified, active, deactivated_at, fts_dirty)
903             VALUES (?1, ?2, ?3, ?4, ?5, 1, NULL, 1)
904             ON CONFLICT(collection_id, path) DO UPDATE SET
905                 title = excluded.title,
906                 hash = excluded.hash,
907                 modified = excluded.modified,
908                 active = 1,
909                 deactivated_at = NULL,
910                 fts_dirty = 1
911             RETURNING id",
912            params![
913                collection_id,
914                path,
915                normalized_title(title).unwrap_or(""),
916                hash,
917                modified
918            ],
919            |row| row.get(0),
920        )?;
921        Ok(id)
922    }
923
924    pub fn get_document_by_path(
925        &self,
926        collection_id: i64,
927        path: &str,
928    ) -> Result<Option<DocumentRow>> {
929        let conn = self
930            .db
931            .lock()
932            .map_err(|_| CoreError::poisoned("database"))?;
933        let _collection_name = lookup_collection_name(&conn, collection_id)?;
934        let mut stmt = conn.prepare(
935            "SELECT id, collection_id, path, title, hash, modified, active, deactivated_at, fts_dirty
936             FROM documents
937             WHERE collection_id = ?1 AND path = ?2",
938        )?;
939
940        let result = stmt.query_row(params![collection_id, path], decode_document_row);
941        match result {
942            Ok(row) => Ok(Some(row)),
943            Err(Error::QueryReturnedNoRows) => Ok(None),
944            Err(err) => Err(err.into()),
945        }
946    }
947
948    pub fn get_document_by_id(&self, id: i64) -> Result<DocumentRow> {
949        let conn = self
950            .db
951            .lock()
952            .map_err(|_| CoreError::poisoned("database"))?;
953        let mut stmt = conn.prepare(
954            "SELECT id, collection_id, path, title, hash, modified, active, deactivated_at, fts_dirty
955             FROM documents
956             WHERE id = ?1",
957        )?;
958
959        let result = stmt.query_row(params![id], decode_document_row);
960        match result {
961            Ok(row) => Ok(row),
962            Err(Error::QueryReturnedNoRows) => Err(KboltError::DocumentNotFound {
963                path: format!("id={id}"),
964            }
965            .into()),
966            Err(err) => Err(err.into()),
967        }
968    }
969
970    pub fn get_documents_by_ids(&self, ids: &[i64]) -> Result<Vec<DocumentRow>> {
971        if ids.is_empty() {
972            return Ok(Vec::new());
973        }
974
975        let conn = self
976            .db
977            .lock()
978            .map_err(|_| CoreError::poisoned("database"))?;
979
980        let placeholders = vec!["?"; ids.len()].join(", ");
981        let sql = format!(
982            "SELECT id, collection_id, path, title, hash, modified, active, deactivated_at, fts_dirty
983             FROM documents
984             WHERE id IN ({placeholders})"
985        );
986        let mut stmt = conn.prepare(&sql)?;
987        let rows = stmt.query_map(params_from_iter(ids.iter()), decode_document_row)?;
988        let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
989        Ok(docs)
990    }
991
992    pub fn refresh_document_activity(&self, doc_id: i64, modified: &str) -> Result<()> {
993        let conn = self
994            .db
995            .lock()
996            .map_err(|_| CoreError::poisoned("database"))?;
997
998        let updated = conn.execute(
999            "UPDATE documents
1000             SET modified = ?1,
1001                 active = 1,
1002                 deactivated_at = NULL
1003             WHERE id = ?2",
1004            params![modified, doc_id],
1005        )?;
1006
1007        if updated == 0 {
1008            return Err(KboltError::DocumentNotFound {
1009                path: format!("id={doc_id}"),
1010            }
1011            .into());
1012        }
1013
1014        Ok(())
1015    }
1016
1017    pub fn list_documents(
1018        &self,
1019        collection_id: i64,
1020        active_only: bool,
1021    ) -> Result<Vec<DocumentRow>> {
1022        let conn = self
1023            .db
1024            .lock()
1025            .map_err(|_| CoreError::poisoned("database"))?;
1026        let _collection_name = lookup_collection_name(&conn, collection_id)?;
1027        let active_only = i64::from(active_only);
1028        let mut stmt = conn.prepare(
1029            "SELECT id, collection_id, path, title, hash, modified, active, deactivated_at, fts_dirty
1030             FROM documents
1031             WHERE collection_id = ?1
1032               AND (?2 = 0 OR active = 1)
1033             ORDER BY path ASC",
1034        )?;
1035        let rows = stmt.query_map(params![collection_id, active_only], decode_document_row)?;
1036        let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1037        Ok(docs)
1038    }
1039
1040    pub fn list_collection_file_rows(
1041        &self,
1042        collection_id: i64,
1043        active_only: bool,
1044    ) -> Result<Vec<FileListRow>> {
1045        let conn = self
1046            .db
1047            .lock()
1048            .map_err(|_| CoreError::poisoned("database"))?;
1049        let _collection_name = lookup_collection_name(&conn, collection_id)?;
1050        let active_only = i64::from(active_only);
1051        let mut stmt = conn.prepare(
1052            "SELECT d.id, d.path, d.title, d.hash, d.active,
1053                    COUNT(DISTINCT c.id) AS chunk_count,
1054                    COUNT(DISTINCT e.chunk_id) AS embedded_chunk_count
1055             FROM documents d
1056             LEFT JOIN chunks c ON c.doc_id = d.id
1057             LEFT JOIN embeddings e ON e.chunk_id = c.id
1058             WHERE d.collection_id = ?1
1059               AND (?2 = 0 OR d.active = 1)
1060             GROUP BY d.id, d.path, d.title, d.hash, d.active
1061             ORDER BY d.path ASC",
1062        )?;
1063        let rows = stmt.query_map(params![collection_id, active_only], |row| {
1064            let chunk_count: i64 = row.get(5)?;
1065            let embedded_chunk_count: i64 = row.get(6)?;
1066            Ok(FileListRow {
1067                doc_id: row.get(0)?,
1068                path: row.get(1)?,
1069                title: decode_optional_title(row.get::<_, String>(2)?),
1070                hash: row.get(3)?,
1071                active: row.get::<_, i64>(4)? != 0,
1072                chunk_count: chunk_count as usize,
1073                embedded_chunk_count: embedded_chunk_count as usize,
1074            })
1075        })?;
1076        let files = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1077        Ok(files)
1078    }
1079
1080    pub fn get_document_by_hash_prefix(&self, prefix: &str) -> Result<Vec<DocumentRow>> {
1081        let conn = self
1082            .db
1083            .lock()
1084            .map_err(|_| CoreError::poisoned("database"))?;
1085
1086        let pattern = format!("{prefix}%");
1087        let mut stmt = conn.prepare(
1088            "SELECT id, collection_id, path, title, hash, modified, active, deactivated_at, fts_dirty
1089             FROM documents
1090             WHERE hash LIKE ?1
1091             ORDER BY id ASC",
1092        )?;
1093        let rows = stmt.query_map(params![pattern], decode_document_row)?;
1094        let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1095        Ok(docs)
1096    }
1097
1098    pub fn deactivate_document(&self, doc_id: i64) -> Result<()> {
1099        let conn = self
1100            .db
1101            .lock()
1102            .map_err(|_| CoreError::poisoned("database"))?;
1103
1104        let updated = conn.execute(
1105            "UPDATE documents
1106             SET active = 0,
1107                 deactivated_at = CASE
1108                    WHEN active = 1 THEN strftime('%Y-%m-%dT%H:%M:%SZ','now')
1109                    ELSE deactivated_at
1110                 END
1111             WHERE id = ?1",
1112            params![doc_id],
1113        )?;
1114
1115        if updated == 0 {
1116            return Err(KboltError::DocumentNotFound {
1117                path: format!("id={doc_id}"),
1118            }
1119            .into());
1120        }
1121
1122        Ok(())
1123    }
1124
1125    pub fn reactivate_document(&self, doc_id: i64) -> Result<()> {
1126        let conn = self
1127            .db
1128            .lock()
1129            .map_err(|_| CoreError::poisoned("database"))?;
1130
1131        let updated = conn.execute(
1132            "UPDATE documents
1133             SET active = 1, deactivated_at = NULL
1134             WHERE id = ?1",
1135            params![doc_id],
1136        )?;
1137
1138        if updated == 0 {
1139            return Err(KboltError::DocumentNotFound {
1140                path: format!("id={doc_id}"),
1141            }
1142            .into());
1143        }
1144
1145        Ok(())
1146    }
1147
1148    pub fn reap_documents(&self, older_than_days: u32) -> Result<Vec<i64>> {
1149        let reaped = self.list_reapable_documents(older_than_days)?;
1150        let doc_ids = reaped.iter().map(|item| item.doc_id).collect::<Vec<_>>();
1151        self.delete_documents(&doc_ids)?;
1152        Ok(doc_ids)
1153    }
1154
1155    pub fn list_reapable_documents(&self, older_than_days: u32) -> Result<Vec<ReapableDocument>> {
1156        self.list_reapable_documents_filtered(older_than_days, "", Vec::new())
1157    }
1158
1159    pub fn list_reapable_documents_in_space(
1160        &self,
1161        older_than_days: u32,
1162        space_id: i64,
1163    ) -> Result<Vec<ReapableDocument>> {
1164        self.list_reapable_documents_filtered(
1165            older_than_days,
1166            " AND c.space_id = ?",
1167            vec![SqlValue::Integer(space_id)],
1168        )
1169    }
1170
1171    pub fn list_reapable_documents_in_collections(
1172        &self,
1173        older_than_days: u32,
1174        collection_ids: &[i64],
1175    ) -> Result<Vec<ReapableDocument>> {
1176        if collection_ids.is_empty() {
1177            return Ok(Vec::new());
1178        }
1179
1180        let placeholders = vec!["?"; collection_ids.len()].join(", ");
1181        let clause = format!(" AND d.collection_id IN ({placeholders})");
1182        let params = collection_ids
1183            .iter()
1184            .map(|id| SqlValue::Integer(*id))
1185            .collect::<Vec<_>>();
1186        self.list_reapable_documents_filtered(older_than_days, &clause, params)
1187    }
1188
1189    fn list_reapable_documents_filtered(
1190        &self,
1191        older_than_days: u32,
1192        scope_clause: &str,
1193        scope_params: Vec<SqlValue>,
1194    ) -> Result<Vec<ReapableDocument>> {
1195        let conn = self
1196            .db
1197            .lock()
1198            .map_err(|_| CoreError::poisoned("database"))?;
1199
1200        let modifier = format!("-{} days", older_than_days);
1201        let sql = format!(
1202            "SELECT d.id, s.name
1203             FROM documents d
1204             JOIN collections c ON c.id = d.collection_id
1205             JOIN spaces s ON s.id = c.space_id
1206             WHERE d.active = 0
1207               AND d.deactivated_at IS NOT NULL
1208               AND d.deactivated_at <= strftime('%Y-%m-%dT%H:%M:%SZ', 'now', ?){scope_clause}
1209             ORDER BY d.id ASC"
1210        );
1211        let mut stmt = conn.prepare(&sql)?;
1212        let mut params = Vec::with_capacity(scope_params.len() + 1);
1213        params.push(SqlValue::Text(modifier));
1214        params.extend(scope_params);
1215        let rows = stmt.query_map(params_from_iter(params.iter()), |row| {
1216            Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1217        })?;
1218        let headers = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1219        drop(stmt);
1220
1221        let mut documents = Vec::with_capacity(headers.len());
1222        for (doc_id, space_name) in headers {
1223            documents.push(ReapableDocument {
1224                doc_id,
1225                space_name,
1226                chunk_ids: load_chunk_ids_for_doc(&conn, doc_id)?,
1227            });
1228        }
1229
1230        Ok(documents)
1231    }
1232
1233    pub fn delete_documents(&self, doc_ids: &[i64]) -> Result<()> {
1234        if doc_ids.is_empty() {
1235            return Ok(());
1236        }
1237
1238        let conn = self
1239            .db
1240            .lock()
1241            .map_err(|_| CoreError::poisoned("database"))?;
1242
1243        let placeholders = vec!["?"; doc_ids.len()].join(", ");
1244        let sql = format!("DELETE FROM documents WHERE id IN ({placeholders})");
1245        conn.execute(&sql, params_from_iter(doc_ids.iter()))?;
1246        Ok(())
1247    }
1248
1249    pub fn insert_chunks(&self, doc_id: i64, chunks: &[ChunkInsert]) -> Result<Vec<i64>> {
1250        if chunks.is_empty() {
1251            return Ok(Vec::new());
1252        }
1253
1254        let conn = self
1255            .db
1256            .lock()
1257            .map_err(|_| CoreError::poisoned("database"))?;
1258        let _doc = lookup_document_id(&conn, doc_id)?;
1259
1260        let tx = conn.unchecked_transaction()?;
1261        let mut stmt = tx.prepare(
1262            "INSERT INTO chunks (doc_id, seq, offset, length, heading, kind, retrieval_prefix)
1263             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1264        )?;
1265
1266        let mut ids = Vec::with_capacity(chunks.len());
1267        for chunk in chunks {
1268            stmt.execute(params![
1269                doc_id,
1270                chunk.seq,
1271                chunk.offset as i64,
1272                chunk.length as i64,
1273                chunk.heading,
1274                chunk.kind.as_storage_kind(),
1275                chunk.retrieval_prefix.as_deref(),
1276            ])?;
1277            ids.push(tx.last_insert_rowid());
1278        }
1279        drop(stmt);
1280        tx.commit()?;
1281        Ok(ids)
1282    }
1283
1284    pub fn delete_chunks_for_document(&self, doc_id: i64) -> Result<Vec<i64>> {
1285        let conn = self
1286            .db
1287            .lock()
1288            .map_err(|_| CoreError::poisoned("database"))?;
1289        let _doc = lookup_document_id(&conn, doc_id)?;
1290
1291        let mut stmt = conn.prepare("SELECT id FROM chunks WHERE doc_id = ?1 ORDER BY seq ASC")?;
1292        let rows = stmt.query_map(params![doc_id], |row| row.get::<_, i64>(0))?;
1293        let chunk_ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1294
1295        conn.execute("DELETE FROM chunks WHERE doc_id = ?1", params![doc_id])?;
1296        Ok(chunk_ids)
1297    }
1298
1299    pub fn get_chunks_for_document(&self, doc_id: i64) -> Result<Vec<ChunkRow>> {
1300        let conn = self
1301            .db
1302            .lock()
1303            .map_err(|_| CoreError::poisoned("database"))?;
1304        let _doc = lookup_document_id(&conn, doc_id)?;
1305
1306        let mut stmt = conn.prepare(
1307            "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1308             FROM chunks
1309             WHERE doc_id = ?1
1310             ORDER BY seq ASC",
1311        )?;
1312        let rows = stmt.query_map(params![doc_id], decode_chunk_row)?;
1313        let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1314        Ok(chunks)
1315    }
1316
1317    pub fn get_chunks_for_documents(&self, doc_ids: &[i64]) -> Result<HashMap<i64, Vec<ChunkRow>>> {
1318        if doc_ids.is_empty() {
1319            return Ok(HashMap::new());
1320        }
1321
1322        let mut requested = doc_ids.to_vec();
1323        requested.sort_unstable();
1324        requested.dedup();
1325
1326        let conn = self
1327            .db
1328            .lock()
1329            .map_err(|_| CoreError::poisoned("database"))?;
1330
1331        let placeholders = vec!["?"; requested.len()].join(", ");
1332        let sql = format!(
1333            "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1334             FROM chunks
1335             WHERE doc_id IN ({placeholders})
1336             ORDER BY doc_id ASC, seq ASC"
1337        );
1338        let mut stmt = conn.prepare(&sql)?;
1339        let rows = stmt.query_map(params_from_iter(requested.iter()), decode_chunk_row)?;
1340        let mut chunks_by_doc: HashMap<i64, Vec<ChunkRow>> = HashMap::new();
1341        for doc_id in requested {
1342            chunks_by_doc.insert(doc_id, Vec::new());
1343        }
1344        for row in rows {
1345            let chunk = row?;
1346            chunks_by_doc.entry(chunk.doc_id).or_default().push(chunk);
1347        }
1348
1349        Ok(chunks_by_doc)
1350    }
1351
1352    pub fn get_chunks_for_document_seq_ranges(
1353        &self,
1354        ranges: &[(i64, i32, i32)],
1355    ) -> Result<HashMap<i64, Vec<ChunkRow>>> {
1356        if ranges.is_empty() {
1357            return Ok(HashMap::new());
1358        }
1359
1360        let mut ranges_by_doc: HashMap<i64, Vec<(i32, i32)>> = HashMap::new();
1361        for (doc_id, min_seq, max_seq) in ranges {
1362            if min_seq > max_seq {
1363                return Err(CoreError::Internal(format!(
1364                    "chunk seq range min must be <= max for doc {doc_id}: {min_seq} > {max_seq}"
1365                )));
1366            }
1367
1368            ranges_by_doc
1369                .entry(*doc_id)
1370                .or_default()
1371                .push((*min_seq, *max_seq));
1372        }
1373
1374        for doc_ranges in ranges_by_doc.values_mut() {
1375            doc_ranges.sort_unstable();
1376            let mut merged: Vec<(i32, i32)> = Vec::with_capacity(doc_ranges.len());
1377            for (min_seq, max_seq) in doc_ranges.drain(..) {
1378                if let Some((_, merged_max)) = merged.last_mut() {
1379                    if min_seq <= merged_max.saturating_add(1) {
1380                        *merged_max = (*merged_max).max(max_seq);
1381                        continue;
1382                    }
1383                }
1384                merged.push((min_seq, max_seq));
1385            }
1386            *doc_ranges = merged;
1387        }
1388
1389        let conn = self
1390            .db
1391            .lock()
1392            .map_err(|_| CoreError::poisoned("database"))?;
1393        let mut stmt = conn.prepare(
1394            "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1395             FROM chunks
1396             WHERE doc_id = ?1 AND seq BETWEEN ?2 AND ?3
1397             ORDER BY seq ASC",
1398        )?;
1399
1400        let mut chunks_by_doc: HashMap<i64, Vec<ChunkRow>> = HashMap::new();
1401        for (doc_id, doc_ranges) in ranges_by_doc {
1402            chunks_by_doc.entry(doc_id).or_default();
1403            for (min_seq, max_seq) in doc_ranges {
1404                let rows = stmt.query_map(params![doc_id, min_seq, max_seq], decode_chunk_row)?;
1405                for row in rows {
1406                    chunks_by_doc.entry(doc_id).or_default().push(row?);
1407                }
1408            }
1409        }
1410
1411        Ok(chunks_by_doc)
1412    }
1413
1414    pub fn get_chunks(&self, chunk_ids: &[i64]) -> Result<Vec<ChunkRow>> {
1415        if chunk_ids.is_empty() {
1416            return Ok(Vec::new());
1417        }
1418
1419        let conn = self
1420            .db
1421            .lock()
1422            .map_err(|_| CoreError::poisoned("database"))?;
1423
1424        let placeholders = vec!["?"; chunk_ids.len()].join(", ");
1425        let sql = format!(
1426            "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1427             FROM chunks
1428             WHERE id IN ({placeholders})
1429             ORDER BY id ASC"
1430        );
1431        let mut stmt = conn.prepare(&sql)?;
1432        let rows = stmt.query_map(params_from_iter(chunk_ids.iter()), decode_chunk_row)?;
1433        let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1434        Ok(chunks)
1435    }
1436
1437    pub fn get_chunks_with_documents(
1438        &self,
1439        chunk_ids: &[i64],
1440    ) -> Result<Vec<(ChunkRow, DocumentRow)>> {
1441        if chunk_ids.is_empty() {
1442            return Ok(Vec::new());
1443        }
1444
1445        let conn = self
1446            .db
1447            .lock()
1448            .map_err(|_| CoreError::poisoned("database"))?;
1449
1450        let placeholders = vec!["?"; chunk_ids.len()].join(", ");
1451        let sql = format!(
1452            "SELECT c.id, c.doc_id, c.seq, c.offset, c.length, c.heading, c.kind, c.retrieval_prefix,
1453                    d.id, d.collection_id, d.path, d.title, d.hash, d.modified, d.active, d.deactivated_at, d.fts_dirty
1454             FROM chunks c
1455             JOIN documents d ON d.id = c.doc_id
1456             WHERE c.id IN ({placeholders})
1457             ORDER BY c.id ASC"
1458        );
1459        let mut stmt = conn.prepare(&sql)?;
1460        let rows = stmt.query_map(params_from_iter(chunk_ids.iter()), |row| {
1461            Ok((
1462                decode_chunk_row_at(row, 0)?,
1463                decode_document_row_at(row, 8)?,
1464            ))
1465        })?;
1466        let rows = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1467        Ok(rows)
1468    }
1469
1470    pub fn get_active_search_scope_summary_in_collections(
1471        &self,
1472        collection_ids: &[i64],
1473    ) -> Result<SearchScopeSummary> {
1474        if collection_ids.is_empty() {
1475            return Ok(SearchScopeSummary {
1476                document_ids: Vec::new(),
1477                chunk_count: 0,
1478            });
1479        }
1480
1481        let conn = self
1482            .db
1483            .lock()
1484            .map_err(|_| CoreError::poisoned("database"))?;
1485
1486        let placeholders = vec!["?"; collection_ids.len()].join(", ");
1487        let sql = format!(
1488            "SELECT d.id, COUNT(c.id)
1489             FROM chunks c
1490             JOIN documents d ON d.id = c.doc_id
1491             WHERE d.active = 1
1492               AND d.collection_id IN ({placeholders})
1493             GROUP BY d.id
1494             ORDER BY d.id ASC"
1495        );
1496        let mut stmt = conn.prepare(&sql)?;
1497        let rows = stmt.query_map(params_from_iter(collection_ids.iter()), |row| {
1498            Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
1499        })?;
1500
1501        let mut document_ids = Vec::new();
1502        let mut chunk_count = 0usize;
1503        for row in rows {
1504            let (doc_id, doc_chunk_count) = row?;
1505            let doc_chunk_count = usize::try_from(doc_chunk_count).map_err(|_| {
1506                CoreError::Internal(format!(
1507                    "active chunk count must be non-negative for document {doc_id}"
1508                ))
1509            })?;
1510            document_ids.push(doc_id);
1511            chunk_count = chunk_count.saturating_add(doc_chunk_count);
1512        }
1513
1514        Ok(SearchScopeSummary {
1515            document_ids,
1516            chunk_count,
1517        })
1518    }
1519
1520    pub fn count_active_chunks_in_collections(&self, collection_ids: &[i64]) -> Result<usize> {
1521        if collection_ids.is_empty() {
1522            return Ok(0);
1523        }
1524
1525        let conn = self
1526            .db
1527            .lock()
1528            .map_err(|_| CoreError::poisoned("database"))?;
1529
1530        let placeholders = vec!["?"; collection_ids.len()].join(", ");
1531        let sql = format!(
1532            "SELECT COUNT(*)
1533             FROM chunks c
1534             JOIN documents d ON d.id = c.doc_id
1535             WHERE d.active = 1
1536               AND d.collection_id IN ({placeholders})"
1537        );
1538        query_count(&conn, &sql, params_from_iter(collection_ids.iter()))
1539    }
1540
1541    pub fn has_inactive_documents_in_collections(&self, collection_ids: &[i64]) -> Result<bool> {
1542        if collection_ids.is_empty() {
1543            return Ok(false);
1544        }
1545
1546        let conn = self
1547            .db
1548            .lock()
1549            .map_err(|_| CoreError::poisoned("database"))?;
1550
1551        let placeholders = vec!["?"; collection_ids.len()].join(", ");
1552        let sql = format!(
1553            "SELECT EXISTS(
1554                 SELECT 1
1555                 FROM documents
1556                 WHERE active = 0
1557                   AND collection_id IN ({placeholders})
1558             )"
1559        );
1560        let exists: i64 = conn.query_row(&sql, params_from_iter(collection_ids.iter()), |row| {
1561            row.get(0)
1562        })?;
1563        Ok(exists != 0)
1564    }
1565
1566    pub fn get_active_chunk_ids_in_collections(&self, collection_ids: &[i64]) -> Result<Vec<i64>> {
1567        if collection_ids.is_empty() {
1568            return Ok(Vec::new());
1569        }
1570
1571        let conn = self
1572            .db
1573            .lock()
1574            .map_err(|_| CoreError::poisoned("database"))?;
1575
1576        let placeholders = vec!["?"; collection_ids.len()].join(", ");
1577        let sql = format!(
1578            "SELECT c.id
1579             FROM chunks c
1580             JOIN documents d ON d.id = c.doc_id
1581             WHERE d.active = 1
1582               AND d.collection_id IN ({placeholders})
1583             ORDER BY d.id ASC, c.seq ASC"
1584        );
1585        let mut stmt = conn.prepare(&sql)?;
1586        let rows = stmt.query_map(params_from_iter(collection_ids.iter()), |row| row.get(0))?;
1587        let chunk_ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1588        Ok(chunk_ids)
1589    }
1590
1591    pub fn replace_document_generation(
1592        &self,
1593        replacement: DocumentGenerationReplace<'_>,
1594    ) -> Result<DocumentGenerationReplaceResult> {
1595        for chunk in replacement.chunks {
1596            validate_text_span(
1597                replacement.text,
1598                chunk.offset,
1599                chunk.length,
1600                &format!("chunk seq {}", chunk.seq),
1601            )?;
1602        }
1603
1604        let conn = self
1605            .db
1606            .lock()
1607            .map_err(|_| CoreError::poisoned("database"))?;
1608        let _collection_name = lookup_collection_name(&conn, replacement.collection_id)?;
1609
1610        let tx = conn.unchecked_transaction()?;
1611        let doc_id: i64 = tx.query_row(
1612            "INSERT INTO documents (collection_id, path, title, hash, modified, active, deactivated_at, fts_dirty)
1613             VALUES (?1, ?2, ?3, ?4, ?5, 1, NULL, 1)
1614             ON CONFLICT(collection_id, path) DO UPDATE SET
1615                 title = excluded.title,
1616                 hash = excluded.hash,
1617                 modified = excluded.modified,
1618                 active = 1,
1619                 deactivated_at = NULL,
1620                 fts_dirty = 1
1621             RETURNING id",
1622            params![
1623                replacement.collection_id,
1624                replacement.path,
1625                normalized_title(replacement.title).unwrap_or(""),
1626                replacement.hash,
1627                replacement.modified
1628            ],
1629            |row| row.get(0),
1630        )?;
1631
1632        let old_chunk_ids = {
1633            let mut stmt =
1634                tx.prepare("SELECT id FROM chunks WHERE doc_id = ?1 ORDER BY seq ASC")?;
1635            let rows = stmt.query_map(params![doc_id], |row| row.get::<_, i64>(0))?;
1636            rows.collect::<std::result::Result<Vec<_>, _>>()?
1637        };
1638
1639        tx.execute(
1640            "INSERT INTO document_texts (doc_id, extractor_key, source_hash, text_hash, generation_key, text, created)
1641             VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%Y-%m-%dT%H:%M:%SZ','now'))
1642             ON CONFLICT(doc_id) DO UPDATE SET
1643                 extractor_key = excluded.extractor_key,
1644                 source_hash = excluded.source_hash,
1645                 text_hash = excluded.text_hash,
1646                 generation_key = excluded.generation_key,
1647                 text = excluded.text,
1648                 created = excluded.created",
1649            params![
1650                doc_id,
1651                replacement.extractor_key,
1652                replacement.source_hash,
1653                replacement.text_hash,
1654                replacement.generation_key,
1655                replacement.text
1656            ],
1657        )?;
1658
1659        tx.execute("DELETE FROM chunks WHERE doc_id = ?1", params![doc_id])?;
1660
1661        let mut stmt = tx.prepare(
1662            "INSERT INTO chunks (doc_id, seq, offset, length, heading, kind, retrieval_prefix)
1663             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1664        )?;
1665        let mut chunk_ids = Vec::with_capacity(replacement.chunks.len());
1666        for chunk in replacement.chunks {
1667            stmt.execute(params![
1668                doc_id,
1669                chunk.seq,
1670                chunk.offset as i64,
1671                chunk.length as i64,
1672                chunk.heading,
1673                chunk.kind.as_storage_kind(),
1674                chunk.retrieval_prefix.as_deref(),
1675            ])?;
1676            chunk_ids.push(tx.last_insert_rowid());
1677        }
1678        drop(stmt);
1679
1680        tx.commit()?;
1681        Ok(DocumentGenerationReplaceResult {
1682            doc_id,
1683            old_chunk_ids,
1684            chunk_ids,
1685        })
1686    }
1687
1688    pub fn put_document_text(
1689        &self,
1690        doc_id: i64,
1691        extractor_key: &str,
1692        source_hash: &str,
1693        text_hash: &str,
1694        generation_key: &str,
1695        text: &str,
1696    ) -> Result<()> {
1697        let conn = self
1698            .db
1699            .lock()
1700            .map_err(|_| CoreError::poisoned("database"))?;
1701        let _doc = lookup_document_id(&conn, doc_id)?;
1702
1703        conn.execute(
1704            "INSERT INTO document_texts (doc_id, extractor_key, source_hash, text_hash, generation_key, text, created)
1705             VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%Y-%m-%dT%H:%M:%SZ','now'))
1706             ON CONFLICT(doc_id) DO UPDATE SET
1707                 extractor_key = excluded.extractor_key,
1708                 source_hash = excluded.source_hash,
1709                 text_hash = excluded.text_hash,
1710                 generation_key = excluded.generation_key,
1711                 text = excluded.text,
1712                 created = excluded.created",
1713            params![
1714                doc_id,
1715                extractor_key,
1716                source_hash,
1717                text_hash,
1718                generation_key,
1719                text
1720            ],
1721        )?;
1722        Ok(())
1723    }
1724
1725    pub fn get_document_text(&self, doc_id: i64) -> Result<DocumentTextRow> {
1726        let conn = self
1727            .db
1728            .lock()
1729            .map_err(|_| CoreError::poisoned("database"))?;
1730        let _doc = lookup_document_id(&conn, doc_id)?;
1731
1732        let result = conn.query_row(
1733            "SELECT doc_id, extractor_key, source_hash, text_hash, generation_key, text, created
1734             FROM document_texts
1735             WHERE doc_id = ?1",
1736            params![doc_id],
1737            decode_document_text_row,
1738        );
1739        match result {
1740            Ok(row) => Ok(row),
1741            Err(Error::QueryReturnedNoRows) => Err(missing_document_text_error(doc_id)),
1742            Err(err) => Err(err.into()),
1743        }
1744    }
1745
1746    pub fn get_document_texts(&self, doc_ids: &[i64]) -> Result<HashMap<i64, DocumentTextRow>> {
1747        if doc_ids.is_empty() {
1748            return Ok(HashMap::new());
1749        }
1750
1751        let mut requested = doc_ids.to_vec();
1752        requested.sort_unstable();
1753        requested.dedup();
1754
1755        let text_by_doc = self.get_existing_document_texts(&requested)?;
1756        for doc_id in requested {
1757            if !text_by_doc.contains_key(&doc_id) {
1758                return Err(missing_document_text_error(doc_id));
1759            }
1760        }
1761
1762        Ok(text_by_doc)
1763    }
1764
1765    pub fn get_existing_document_texts(
1766        &self,
1767        doc_ids: &[i64],
1768    ) -> Result<HashMap<i64, DocumentTextRow>> {
1769        if doc_ids.is_empty() {
1770            return Ok(HashMap::new());
1771        }
1772
1773        let mut requested = doc_ids.to_vec();
1774        requested.sort_unstable();
1775        requested.dedup();
1776
1777        let conn = self
1778            .db
1779            .lock()
1780            .map_err(|_| CoreError::poisoned("database"))?;
1781        let mut text_by_doc = HashMap::new();
1782        for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1783            let placeholders = vec!["?"; batch.len()].join(", ");
1784            let sql = format!(
1785                "SELECT doc_id, extractor_key, source_hash, text_hash, generation_key, text, created
1786                 FROM document_texts
1787                 WHERE doc_id IN ({placeholders})
1788                 ORDER BY doc_id ASC"
1789            );
1790            let mut stmt = conn.prepare(&sql)?;
1791            let rows = stmt.query_map(params_from_iter(batch.iter()), decode_document_text_row)?;
1792            for row in rows {
1793                let row = row?;
1794                text_by_doc.insert(row.doc_id, row);
1795            }
1796        }
1797
1798        Ok(text_by_doc)
1799    }
1800
1801    pub fn has_document_text(&self, doc_id: i64) -> Result<bool> {
1802        let conn = self
1803            .db
1804            .lock()
1805            .map_err(|_| CoreError::poisoned("database"))?;
1806        let exists: i64 = conn.query_row(
1807            "SELECT EXISTS(SELECT 1 FROM document_texts WHERE doc_id = ?1)",
1808            params![doc_id],
1809            |row| row.get(0),
1810        )?;
1811        Ok(exists != 0)
1812    }
1813
1814    pub fn has_current_document_text(&self, doc_id: i64, generation_key: &str) -> Result<bool> {
1815        let conn = self
1816            .db
1817            .lock()
1818            .map_err(|_| CoreError::poisoned("database"))?;
1819        let exists: i64 = conn.query_row(
1820            "SELECT EXISTS(
1821                SELECT 1 FROM document_texts
1822                WHERE doc_id = ?1 AND generation_key = ?2
1823            )",
1824            params![doc_id, generation_key],
1825            |row| row.get(0),
1826        )?;
1827        Ok(exists != 0)
1828    }
1829
1830    pub fn get_document_text_generation_keys(
1831        &self,
1832        doc_ids: &[i64],
1833    ) -> Result<HashMap<i64, String>> {
1834        if doc_ids.is_empty() {
1835            return Ok(HashMap::new());
1836        }
1837
1838        let mut requested = doc_ids.to_vec();
1839        requested.sort_unstable();
1840        requested.dedup();
1841
1842        let conn = self
1843            .db
1844            .lock()
1845            .map_err(|_| CoreError::poisoned("database"))?;
1846        let mut generation_by_doc = HashMap::with_capacity(requested.len());
1847        for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1848            let placeholders = vec!["?"; batch.len()].join(", ");
1849            let sql = format!(
1850                "SELECT doc_id, generation_key
1851                 FROM document_texts
1852                 WHERE doc_id IN ({placeholders})"
1853            );
1854            let mut stmt = conn.prepare(&sql)?;
1855            let rows = stmt.query_map(params_from_iter(batch.iter()), |row| {
1856                Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1857            })?;
1858            for row in rows {
1859                let (doc_id, generation_key) = row?;
1860                generation_by_doc.insert(doc_id, generation_key);
1861            }
1862        }
1863
1864        Ok(generation_by_doc)
1865    }
1866
1867    pub fn get_document_text_extractors(&self, doc_ids: &[i64]) -> Result<HashMap<i64, String>> {
1868        if doc_ids.is_empty() {
1869            return Ok(HashMap::new());
1870        }
1871
1872        let mut requested = doc_ids.to_vec();
1873        requested.sort_unstable();
1874        requested.dedup();
1875
1876        let conn = self
1877            .db
1878            .lock()
1879            .map_err(|_| CoreError::poisoned("database"))?;
1880        let mut extractor_by_doc = HashMap::with_capacity(requested.len());
1881        for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1882            let placeholders = vec!["?"; batch.len()].join(", ");
1883            let sql = format!(
1884                "SELECT doc_id, extractor_key
1885                 FROM document_texts
1886                 WHERE doc_id IN ({placeholders})"
1887            );
1888            let mut stmt = conn.prepare(&sql)?;
1889            let rows = stmt.query_map(params_from_iter(batch.iter()), |row| {
1890                Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1891            })?;
1892            for row in rows {
1893                let (doc_id, extractor_key) = row?;
1894                extractor_by_doc.insert(doc_id, extractor_key);
1895            }
1896        }
1897
1898        for doc_id in requested {
1899            if !extractor_by_doc.contains_key(&doc_id) {
1900                return Err(missing_document_text_error(doc_id));
1901            }
1902        }
1903
1904        Ok(extractor_by_doc)
1905    }
1906
1907    pub fn get_canonical_chunk_texts(&self, chunk_ids: &[i64]) -> Result<HashMap<i64, String>> {
1908        if chunk_ids.is_empty() {
1909            return Ok(HashMap::new());
1910        }
1911
1912        let mut requested = chunk_ids.to_vec();
1913        requested.sort_unstable();
1914        requested.dedup();
1915
1916        let conn = self
1917            .db
1918            .lock()
1919            .map_err(|_| CoreError::poisoned("database"))?;
1920        let mut text_by_chunk = HashMap::with_capacity(requested.len());
1921        for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1922            let placeholders = vec!["?"; batch.len()].join(", ");
1923            let sql = format!(
1924                "SELECT c.id,
1925                        c.offset,
1926                        c.length,
1927                        length(CAST(dt.text AS BLOB)),
1928                        substr(CAST(dt.text AS BLOB), c.offset + 1, 1),
1929                        substr(CAST(dt.text AS BLOB), c.offset + c.length + 1, 1),
1930                        substr(CAST(dt.text AS BLOB), c.offset + 1, c.length)
1931                 FROM chunks c
1932                 JOIN document_texts dt ON dt.doc_id = c.doc_id
1933                 WHERE c.id IN ({placeholders})"
1934            );
1935            let mut stmt = conn.prepare(&sql)?;
1936            let rows = stmt.query_map(params_from_iter(batch.iter()), |row| {
1937                Ok((
1938                    row.get::<_, i64>(0)?,
1939                    row.get::<_, i64>(1)?,
1940                    row.get::<_, i64>(2)?,
1941                    row.get::<_, i64>(3)?,
1942                    row.get::<_, Vec<u8>>(4)?,
1943                    row.get::<_, Vec<u8>>(5)?,
1944                    row.get::<_, Vec<u8>>(6)?,
1945                ))
1946            })?;
1947            for row in rows {
1948                let (chunk_id, offset, length, document_len, start_byte, end_byte, bytes) = row?;
1949                let text = canonical_chunk_slice_from_bytes(
1950                    chunk_id,
1951                    offset,
1952                    length,
1953                    document_len,
1954                    start_byte,
1955                    end_byte,
1956                    bytes,
1957                )?;
1958                text_by_chunk.insert(chunk_id, text);
1959            }
1960        }
1961
1962        for chunk_id in requested {
1963            if !text_by_chunk.contains_key(&chunk_id) {
1964                return Err(CoreError::Internal(format!(
1965                    "canonical text missing for chunk {chunk_id}"
1966                )));
1967            }
1968        }
1969
1970        Ok(text_by_chunk)
1971    }
1972
1973    pub fn get_chunk_text(&self, chunk_id: i64) -> Result<ChunkTextRow> {
1974        let conn = self
1975            .db
1976            .lock()
1977            .map_err(|_| CoreError::poisoned("database"))?;
1978        let result = conn.query_row(
1979            "SELECT c.id, c.doc_id, c.seq, c.offset, c.length, c.heading, c.kind, c.retrieval_prefix, dt.extractor_key, dt.text
1980             FROM chunks c
1981             JOIN document_texts dt ON dt.doc_id = c.doc_id
1982             WHERE c.id = ?1",
1983            params![chunk_id],
1984            |row| {
1985                let chunk = decode_chunk_row(row)?;
1986                let extractor_key: String = row.get(8)?;
1987                let document_text: String = row.get(9)?;
1988                Ok((chunk, extractor_key, document_text))
1989            },
1990        );
1991        let (chunk, extractor_key, document_text) = match result {
1992            Ok(row) => row,
1993            Err(Error::QueryReturnedNoRows) => {
1994                let doc_id = lookup_chunk_doc_id(&conn, chunk_id)?;
1995                return Err(missing_document_text_error(doc_id));
1996            }
1997            Err(err) => return Err(err.into()),
1998        };
1999        let text = chunk_text_from_canonical(&document_text, &chunk)?;
2000        Ok(ChunkTextRow {
2001            chunk,
2002            extractor_key,
2003            text,
2004        })
2005    }
2006
2007    pub fn insert_embeddings(&self, entries: &[(i64, &str)]) -> Result<()> {
2008        if entries.is_empty() {
2009            return Ok(());
2010        }
2011
2012        let conn = self
2013            .db
2014            .lock()
2015            .map_err(|_| CoreError::poisoned("database"))?;
2016
2017        let tx = conn.unchecked_transaction()?;
2018        let mut stmt = tx.prepare(
2019            "INSERT INTO embeddings (chunk_id, model, embedded_at)
2020             VALUES (?1, ?2, strftime('%Y-%m-%dT%H:%M:%SZ','now'))
2021             ON CONFLICT(chunk_id, model) DO UPDATE SET
2022               embedded_at = excluded.embedded_at",
2023        )?;
2024
2025        for (chunk_id, model) in entries {
2026            stmt.execute(params![chunk_id, model])?;
2027        }
2028
2029        drop(stmt);
2030        tx.commit()?;
2031        Ok(())
2032    }
2033
2034    pub fn get_unembedded_chunks(
2035        &self,
2036        model: &str,
2037        after_chunk_id: i64,
2038        limit: usize,
2039    ) -> Result<Vec<EmbedRecord>> {
2040        self.get_unembedded_chunks_filtered(model, after_chunk_id, limit, "", Vec::new())
2041    }
2042
2043    pub fn get_unembedded_chunks_in_space(
2044        &self,
2045        model: &str,
2046        space_id: i64,
2047        after_chunk_id: i64,
2048        limit: usize,
2049    ) -> Result<Vec<EmbedRecord>> {
2050        self.get_unembedded_chunks_filtered(
2051            model,
2052            after_chunk_id,
2053            limit,
2054            " AND col.space_id = ?",
2055            vec![SqlValue::Integer(space_id)],
2056        )
2057    }
2058
2059    pub fn get_unembedded_chunks_in_collections(
2060        &self,
2061        model: &str,
2062        collection_ids: &[i64],
2063        after_chunk_id: i64,
2064        limit: usize,
2065    ) -> Result<Vec<EmbedRecord>> {
2066        if collection_ids.is_empty() {
2067            return Ok(Vec::new());
2068        }
2069
2070        let placeholders = vec!["?"; collection_ids.len()].join(", ");
2071        let clause = format!(" AND d.collection_id IN ({placeholders})");
2072        let params = collection_ids
2073            .iter()
2074            .map(|id| SqlValue::Integer(*id))
2075            .collect::<Vec<_>>();
2076        self.get_unembedded_chunks_filtered(model, after_chunk_id, limit, &clause, params)
2077    }
2078
2079    fn get_unembedded_chunks_filtered(
2080        &self,
2081        model: &str,
2082        after_chunk_id: i64,
2083        limit: usize,
2084        scope_clause: &str,
2085        scope_params: Vec<SqlValue>,
2086    ) -> Result<Vec<EmbedRecord>> {
2087        let conn = self
2088            .db
2089            .lock()
2090            .map_err(|_| CoreError::poisoned("database"))?;
2091        let sql_limit = i64::try_from(limit)
2092            .map_err(|_| CoreError::Internal("limit too large for sqlite".to_string()))?;
2093
2094        let sql = format!(
2095            "SELECT c.id, c.doc_id, c.seq, c.offset, c.length, c.heading, c.kind, c.retrieval_prefix,
2096                    d.path, d.title, col.path, s.name
2097             FROM chunks c
2098             JOIN documents d ON d.id = c.doc_id
2099             JOIN collections col ON col.id = d.collection_id
2100             JOIN spaces s ON s.id = col.space_id
2101             LEFT JOIN embeddings e ON e.chunk_id = c.id AND e.model = ?
2102             WHERE d.active = 1 AND e.chunk_id IS NULL AND c.id > ?{scope_clause}
2103             ORDER BY c.id ASC
2104             LIMIT ?"
2105        );
2106        let mut stmt = conn.prepare(&sql)?;
2107        let mut params = Vec::with_capacity(scope_params.len() + 3);
2108        params.push(SqlValue::Text(model.to_string()));
2109        params.push(SqlValue::Integer(after_chunk_id));
2110        params.extend(scope_params);
2111        params.push(SqlValue::Integer(sql_limit));
2112        let rows = stmt.query_map(params_from_iter(params.iter()), |row| {
2113            Ok(EmbedRecord {
2114                chunk: decode_chunk_row(row)?,
2115                doc_path: row.get(8)?,
2116                doc_title: decode_optional_title(row.get::<_, String>(9)?),
2117                collection_path: PathBuf::from(row.get::<_, String>(10)?),
2118                space_name: row.get(11)?,
2119            })
2120        })?;
2121
2122        let records = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2123        Ok(records)
2124    }
2125
2126    pub fn delete_embeddings_for_model(&self, model: &str) -> Result<usize> {
2127        let conn = self
2128            .db
2129            .lock()
2130            .map_err(|_| CoreError::poisoned("database"))?;
2131
2132        let deleted = conn.execute("DELETE FROM embeddings WHERE model = ?1", params![model])?;
2133        Ok(deleted)
2134    }
2135
2136    pub fn delete_embeddings_for_space(&self, space_id: i64) -> Result<usize> {
2137        let conn = self
2138            .db
2139            .lock()
2140            .map_err(|_| CoreError::poisoned("database"))?;
2141        let _space_name = lookup_space_name(&conn, space_id)?;
2142
2143        let deleted = conn.execute(
2144            "DELETE FROM embeddings
2145             WHERE chunk_id IN (
2146                 SELECT c.id
2147                 FROM chunks c
2148                 JOIN documents d ON d.id = c.doc_id
2149                 JOIN collections col ON col.id = d.collection_id
2150                 WHERE col.space_id = ?1
2151             )",
2152            params![space_id],
2153        )?;
2154        Ok(deleted)
2155    }
2156
2157    pub fn list_embedding_models_in_space(&self, space_id: i64) -> Result<Vec<String>> {
2158        let conn = self
2159            .db
2160            .lock()
2161            .map_err(|_| CoreError::poisoned("database"))?;
2162        let _space_name = lookup_space_name(&conn, space_id)?;
2163
2164        let mut stmt = conn.prepare(
2165            "SELECT DISTINCT e.model
2166             FROM embeddings e
2167             JOIN chunks c ON c.id = e.chunk_id
2168             JOIN documents d ON d.id = c.doc_id
2169             JOIN collections col ON col.id = d.collection_id
2170             WHERE col.space_id = ?1
2171             ORDER BY e.model ASC",
2172        )?;
2173        let rows = stmt.query_map(params![space_id], |row| row.get::<_, String>(0))?;
2174        let models = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2175        Ok(models)
2176    }
2177
2178    pub fn count_embeddings(&self) -> Result<usize> {
2179        let conn = self
2180            .db
2181            .lock()
2182            .map_err(|_| CoreError::poisoned("database"))?;
2183
2184        let count: i64 = conn.query_row("SELECT COUNT(*) FROM embeddings", [], |row| row.get(0))?;
2185        Ok(count as usize)
2186    }
2187
2188    pub fn index_tantivy(&self, space: &str, entries: &[TantivyEntry]) -> Result<()> {
2189        if entries.is_empty() {
2190            return Ok(());
2191        }
2192
2193        let space_indexes = self.get_space_indexes(space)?;
2194        with_tantivy_writer(&space_indexes, |writer| {
2195            for entry in entries {
2196                let chunk_id = u64::try_from(entry.chunk_id).map_err(|_| {
2197                    CoreError::Internal(format!(
2198                        "chunk_id must be non-negative for tantivy indexing: {}",
2199                        entry.chunk_id
2200                    ))
2201                })?;
2202                let doc_id = u64::try_from(entry.doc_id).map_err(|_| {
2203                    CoreError::Internal(format!(
2204                        "doc_id must be non-negative for tantivy indexing: {}",
2205                        entry.doc_id
2206                    ))
2207                })?;
2208
2209                let mut doc = TantivyDocument::default();
2210                doc.add_u64(space_indexes.fields.chunk_id, chunk_id);
2211                doc.add_u64(space_indexes.fields.doc_id, doc_id);
2212                doc.add_text(space_indexes.fields.filepath, &entry.filepath);
2213                if let Some(title) = &entry.title {
2214                    doc.add_text(space_indexes.fields.title, title);
2215                }
2216                if let Some(heading) = &entry.heading {
2217                    doc.add_text(space_indexes.fields.heading, heading);
2218                }
2219                doc.add_text(space_indexes.fields.body, &entry.body);
2220                writer.add_document(doc)?;
2221            }
2222            Ok(())
2223        })
2224    }
2225
2226    pub fn delete_tantivy(&self, space: &str, chunk_ids: &[i64]) -> Result<()> {
2227        if chunk_ids.is_empty() {
2228            return Ok(());
2229        }
2230
2231        let space_indexes = self.get_space_indexes(space)?;
2232        with_tantivy_writer(&space_indexes, |writer| {
2233            for chunk_id in chunk_ids {
2234                let chunk_key = u64::try_from(*chunk_id).map_err(|_| {
2235                    CoreError::Internal(format!(
2236                        "chunk_id must be non-negative for tantivy delete: {chunk_id}"
2237                    ))
2238                })?;
2239                writer.delete_term(Term::from_field_u64(
2240                    space_indexes.fields.chunk_id,
2241                    chunk_key,
2242                ));
2243            }
2244
2245            Ok(())
2246        })
2247    }
2248
2249    pub fn delete_tantivy_by_doc(&self, space: &str, doc_id: i64) -> Result<()> {
2250        let space_indexes = self.get_space_indexes(space)?;
2251        with_tantivy_writer(&space_indexes, |writer| {
2252            let doc_key = u64::try_from(doc_id).map_err(|_| {
2253                CoreError::Internal(format!(
2254                    "doc_id must be non-negative for tantivy delete-by-doc: {doc_id}"
2255                ))
2256            })?;
2257            writer.delete_term(Term::from_field_u64(space_indexes.fields.doc_id, doc_key));
2258            Ok(())
2259        })
2260    }
2261
2262    pub fn query_bm25(
2263        &self,
2264        space: &str,
2265        query: &str,
2266        fields: &[(&str, f32)],
2267        limit: usize,
2268    ) -> Result<Vec<BM25Hit>> {
2269        self.query_bm25_filtered(space, query, fields, None, limit, true)
2270    }
2271
2272    pub(crate) fn query_bm25_cached(
2273        &self,
2274        space: &str,
2275        query: &str,
2276        fields: &[(&str, f32)],
2277        limit: usize,
2278    ) -> Result<Vec<BM25Hit>> {
2279        self.query_bm25_filtered(space, query, fields, None, limit, false)
2280    }
2281
2282    pub fn query_bm25_in_documents(
2283        &self,
2284        space: &str,
2285        query: &str,
2286        fields: &[(&str, f32)],
2287        document_ids: &[i64],
2288        limit: usize,
2289    ) -> Result<Vec<BM25Hit>> {
2290        if document_ids.is_empty() {
2291            return Ok(Vec::new());
2292        }
2293
2294        self.query_bm25_filtered(space, query, fields, Some(document_ids), limit, true)
2295    }
2296
2297    pub(crate) fn query_bm25_in_documents_cached(
2298        &self,
2299        space: &str,
2300        query: &str,
2301        fields: &[(&str, f32)],
2302        document_ids: &[i64],
2303        limit: usize,
2304    ) -> Result<Vec<BM25Hit>> {
2305        if document_ids.is_empty() {
2306            return Ok(Vec::new());
2307        }
2308
2309        self.query_bm25_filtered(space, query, fields, Some(document_ids), limit, false)
2310    }
2311
2312    fn query_bm25_filtered(
2313        &self,
2314        space: &str,
2315        query: &str,
2316        fields: &[(&str, f32)],
2317        document_ids: Option<&[i64]>,
2318        limit: usize,
2319        reload_reader: bool,
2320    ) -> Result<Vec<BM25Hit>> {
2321        if limit == 0 {
2322            return Ok(Vec::new());
2323        }
2324
2325        if fields.is_empty() {
2326            return Err(CoreError::Internal(
2327                "bm25 query requires at least one field".to_string(),
2328            ));
2329        }
2330
2331        let space_indexes = self.get_space_indexes(space)?;
2332
2333        let schema = space_indexes.tantivy_index.schema();
2334        let query_fields = fields
2335            .iter()
2336            .map(|(name, boost)| {
2337                let field = resolve_tantivy_field(space_indexes.fields, name)?;
2338                let field_entry = schema.get_field_entry(field);
2339                let index_record_option = field_entry
2340                    .field_type()
2341                    .get_index_record_option()
2342                    .ok_or_else(|| {
2343                        CoreError::Internal(format!(
2344                            "bm25 field '{}' is not indexed",
2345                            field_entry.name()
2346                        ))
2347                    })?;
2348                Ok(Bm25FieldSpec {
2349                    field,
2350                    boost: *boost,
2351                    index_record_option,
2352                })
2353            })
2354            .collect::<Result<Vec<_>>>()?;
2355        let Some(parsed_query) =
2356            build_literal_bm25_query(&space_indexes.tantivy_index, &query_fields, query)?
2357        else {
2358            return Ok(Vec::new());
2359        };
2360        let parsed_query = if let Some(document_ids) = document_ids {
2361            Box::new(BooleanQuery::new(vec![
2362                (Occur::Must, parsed_query),
2363                (
2364                    Occur::Must,
2365                    build_doc_id_filter_query(space_indexes.fields.doc_id, document_ids)?,
2366                ),
2367            ])) as Box<dyn Query>
2368        } else {
2369            parsed_query
2370        };
2371        if reload_reader {
2372            space_indexes.tantivy_reader.reload()?;
2373        }
2374        let searcher = space_indexes.tantivy_reader.searcher();
2375        let docs = searcher.search(&parsed_query, &TopDocs::with_limit(limit))?;
2376
2377        let mut hits = Vec::with_capacity(docs.len());
2378        let chunk_id_field_name = schema.get_field_entry(space_indexes.fields.chunk_id).name();
2379        let mut chunk_id_columns = HashMap::new();
2380        for (score, address) in docs {
2381            let chunk_id_column = match chunk_id_columns.entry(address.segment_ord) {
2382                std::collections::hash_map::Entry::Occupied(entry) => entry.into_mut(),
2383                std::collections::hash_map::Entry::Vacant(entry) => {
2384                    let column = searcher
2385                        .segment_reader(address.segment_ord)
2386                        .fast_fields()
2387                        .u64(chunk_id_field_name)?;
2388                    entry.insert(column)
2389                }
2390            };
2391            let chunk_id = chunk_id_column.first(address.doc_id).ok_or_else(|| {
2392                CoreError::Internal("tantivy hit missing chunk_id fast field".to_string())
2393            })?;
2394            let chunk_id = i64::try_from(chunk_id).map_err(|_| {
2395                CoreError::Internal(format!("tantivy chunk_id exceeds i64 range: {chunk_id}"))
2396            })?;
2397            hits.push(BM25Hit { chunk_id, score });
2398        }
2399
2400        Ok(hits)
2401    }
2402
2403    pub(crate) fn reload_tantivy_reader(&self, space: &str) -> Result<()> {
2404        let space_indexes = self.get_space_indexes(space)?;
2405        space_indexes.tantivy_reader.reload()?;
2406        Ok(())
2407    }
2408
2409    pub fn commit_tantivy(&self, space: &str) -> Result<()> {
2410        let space_indexes = self.get_space_indexes(space)?;
2411        let mut writer = space_indexes
2412            .tantivy_writer
2413            .lock()
2414            .map_err(|_| CoreError::poisoned("tantivy writer"))?;
2415        let Some(mut writer) = writer.take() else {
2416            return Ok(());
2417        };
2418        writer.commit()?;
2419        space_indexes.tantivy_reader.reload()?;
2420        Ok(())
2421    }
2422
2423    pub fn insert_usearch(&self, space: &str, key: i64, vector: &[f32]) -> Result<()> {
2424        self.batch_insert_usearch(space, &[(key, vector)])
2425    }
2426
2427    pub fn batch_insert_usearch(&self, space: &str, entries: &[(i64, &[f32])]) -> Result<()> {
2428        self.batch_insert_usearch_with_save_mode(space, entries, UsearchSaveMode::Immediate)
2429    }
2430
2431    pub(crate) fn batch_insert_usearch_deferred(
2432        &self,
2433        space: &str,
2434        entries: &[(i64, &[f32])],
2435    ) -> Result<()> {
2436        self.batch_insert_usearch_with_save_mode(space, entries, UsearchSaveMode::Deferred)
2437    }
2438
2439    fn batch_insert_usearch_with_save_mode(
2440        &self,
2441        space: &str,
2442        entries: &[(i64, &[f32])],
2443        save_mode: UsearchSaveMode,
2444    ) -> Result<()> {
2445        if entries.is_empty() {
2446            return Ok(());
2447        }
2448
2449        let space_indexes = self.get_space_indexes(space)?;
2450
2451        let expected_dimensions = entries[0].1.len();
2452        if expected_dimensions == 0 {
2453            return Err(CoreError::Internal(
2454                "cannot insert empty vector into usearch index".to_string(),
2455            ));
2456        }
2457        for (_, vector) in entries {
2458            if vector.len() != expected_dimensions {
2459                return Err(CoreError::Internal(format!(
2460                    "vector dimension mismatch in batch insert: expected {expected_dimensions}, got {}",
2461                    vector.len()
2462                )));
2463            }
2464        }
2465
2466        let mut index = space_indexes
2467            .usearch_index
2468            .write()
2469            .map_err(|_| CoreError::poisoned("usearch index"))?;
2470        let ensure_started = std::time::Instant::now();
2471        ensure_usearch_dimensions(&mut index, expected_dimensions)?;
2472        crate::profile::record_update_stage("usearch_ensure_dimensions", ensure_started.elapsed());
2473
2474        let target_capacity = index.size().saturating_add(entries.len());
2475        let reserve_started = std::time::Instant::now();
2476        index.reserve(target_capacity).map_err(|err| {
2477            CoreError::Internal(format!(
2478                "usearch reserve failed for {target_capacity} items: {err}"
2479            ))
2480        })?;
2481        crate::profile::record_update_stage("usearch_reserve", reserve_started.elapsed());
2482
2483        if matches!(save_mode, UsearchSaveMode::Deferred) {
2484            self.mark_usearch_dirty(space)?;
2485        }
2486
2487        let add_started = std::time::Instant::now();
2488        for (key, vector) in entries {
2489            let key = u64::try_from(*key).map_err(|_| {
2490                CoreError::Internal(format!("usearch key must be non-negative: {}", *key))
2491            })?;
2492            index
2493                .add::<f32>(key, vector)
2494                .map_err(|err| CoreError::Internal(format!("usearch add failed: {err}")))?;
2495        }
2496        crate::profile::record_update_stage("usearch_add", add_started.elapsed());
2497
2498        if matches!(save_mode, UsearchSaveMode::Immediate) {
2499            let save_started = std::time::Instant::now();
2500            save_usearch_index(&index, &space_indexes.usearch_path)?;
2501            crate::profile::record_update_stage("usearch_save", save_started.elapsed());
2502        }
2503        Ok(())
2504    }
2505
2506    pub fn delete_usearch(&self, space: &str, keys: &[i64]) -> Result<()> {
2507        self.delete_usearch_with_save_mode(space, keys, UsearchSaveMode::Immediate)
2508    }
2509
2510    pub(crate) fn delete_usearch_deferred(&self, space: &str, keys: &[i64]) -> Result<()> {
2511        self.delete_usearch_with_save_mode(space, keys, UsearchSaveMode::Deferred)
2512    }
2513
2514    fn delete_usearch_with_save_mode(
2515        &self,
2516        space: &str,
2517        keys: &[i64],
2518        save_mode: UsearchSaveMode,
2519    ) -> Result<()> {
2520        if keys.is_empty() {
2521            return Ok(());
2522        }
2523
2524        let space_indexes = self.get_space_indexes(space)?;
2525        let index = space_indexes
2526            .usearch_index
2527            .write()
2528            .map_err(|_| CoreError::poisoned("usearch index"))?;
2529
2530        if matches!(save_mode, UsearchSaveMode::Deferred) {
2531            self.mark_usearch_dirty(space)?;
2532        }
2533
2534        let delete_started = std::time::Instant::now();
2535        for key in keys {
2536            let key = u64::try_from(*key).map_err(|_| {
2537                CoreError::Internal(format!("usearch key must be non-negative: {}", *key))
2538            })?;
2539            index
2540                .remove(key)
2541                .map_err(|err| CoreError::Internal(format!("usearch remove failed: {err}")))?;
2542        }
2543        crate::profile::record_update_stage("usearch_delete", delete_started.elapsed());
2544
2545        if matches!(save_mode, UsearchSaveMode::Immediate) {
2546            let save_started = std::time::Instant::now();
2547            save_usearch_index(&index, &space_indexes.usearch_path)?;
2548            crate::profile::record_update_stage("usearch_save", save_started.elapsed());
2549        }
2550        Ok(())
2551    }
2552
2553    pub(crate) fn save_dirty_usearch_indexes(&self) -> Result<()> {
2554        let spaces = {
2555            let mut dirty = self
2556                .dirty_usearch_spaces
2557                .lock()
2558                .map_err(|_| CoreError::poisoned("dirty usearch spaces"))?;
2559            if dirty.is_empty() {
2560                return Ok(());
2561            }
2562
2563            let mut spaces = dirty.drain().collect::<Vec<_>>();
2564            spaces.sort();
2565            spaces
2566        };
2567
2568        for (index, space) in spaces.iter().enumerate() {
2569            if let Err(err) = self.save_usearch_for_space(space) {
2570                let mut dirty = self
2571                    .dirty_usearch_spaces
2572                    .lock()
2573                    .map_err(|_| CoreError::poisoned("dirty usearch spaces"))?;
2574                for unsaved in &spaces[index..] {
2575                    dirty.insert(unsaved.clone());
2576                }
2577                return Err(err);
2578            }
2579            crate::profile::increment_update_count("usearch_saved_spaces", 1);
2580        }
2581
2582        Ok(())
2583    }
2584
2585    fn mark_usearch_dirty(&self, space: &str) -> Result<()> {
2586        self.dirty_usearch_spaces
2587            .lock()
2588            .map_err(|_| CoreError::poisoned("dirty usearch spaces"))?
2589            .insert(space.to_string());
2590        Ok(())
2591    }
2592
2593    fn save_usearch_for_space(&self, space: &str) -> Result<()> {
2594        let space_indexes = self.get_space_indexes(space)?;
2595        let index = space_indexes
2596            .usearch_index
2597            .read()
2598            .map_err(|_| CoreError::poisoned("usearch index"))?;
2599
2600        let save_started = std::time::Instant::now();
2601        save_usearch_index(&index, &space_indexes.usearch_path)?;
2602        crate::profile::record_update_stage("usearch_save", save_started.elapsed());
2603        Ok(())
2604    }
2605
2606    pub fn query_dense(&self, space: &str, vector: &[f32], limit: usize) -> Result<Vec<DenseHit>> {
2607        self.query_dense_filtered(space, vector, None, limit)
2608    }
2609
2610    pub fn query_dense_in_chunks(
2611        &self,
2612        space: &str,
2613        vector: &[f32],
2614        chunk_ids: &[i64],
2615        limit: usize,
2616    ) -> Result<Vec<DenseHit>> {
2617        if chunk_ids.is_empty() {
2618            return Ok(Vec::new());
2619        }
2620
2621        let allowed_keys = chunk_ids
2622            .iter()
2623            .map(|chunk_id| {
2624                u64::try_from(*chunk_id).map_err(|_| {
2625                    CoreError::Internal(format!(
2626                        "chunk_id must be non-negative for usearch query: {chunk_id}"
2627                    ))
2628                })
2629            })
2630            .collect::<Result<HashSet<_>>>()?;
2631        self.query_dense_in_key_set(space, vector, &allowed_keys, limit)
2632    }
2633
2634    pub(crate) fn query_dense_in_key_set(
2635        &self,
2636        space: &str,
2637        vector: &[f32],
2638        allowed_keys: &HashSet<u64>,
2639        limit: usize,
2640    ) -> Result<Vec<DenseHit>> {
2641        if allowed_keys.is_empty() {
2642            return Ok(Vec::new());
2643        }
2644
2645        self.query_dense_filtered(space, vector, Some(allowed_keys), limit)
2646    }
2647
2648    fn query_dense_filtered(
2649        &self,
2650        space: &str,
2651        vector: &[f32],
2652        allowed_keys: Option<&HashSet<u64>>,
2653        limit: usize,
2654    ) -> Result<Vec<DenseHit>> {
2655        if limit == 0 {
2656            return Ok(Vec::new());
2657        }
2658        if vector.is_empty() {
2659            return Err(CoreError::Internal(
2660                "cannot query usearch with empty vector".to_string(),
2661            ));
2662        }
2663
2664        let space_indexes = self.get_space_indexes(space)?;
2665        let index = space_indexes
2666            .usearch_index
2667            .read()
2668            .map_err(|_| CoreError::poisoned("usearch index"))?;
2669
2670        if index.size() == 0 {
2671            return Ok(Vec::new());
2672        }
2673        if vector.len() != index.dimensions() {
2674            return Err(CoreError::Internal(format!(
2675                "query vector dimension mismatch: expected {}, got {}",
2676                index.dimensions(),
2677                vector.len()
2678            )));
2679        }
2680
2681        let matches = if let Some(allowed_keys) = allowed_keys {
2682            index
2683                .filtered_search::<f32, _>(vector, limit, |key| allowed_keys.contains(&key))
2684                .map_err(|err| {
2685                    CoreError::Internal(format!("usearch filtered query failed: {err}"))
2686                })?
2687        } else {
2688            index
2689                .search::<f32>(vector, limit)
2690                .map_err(|err| CoreError::Internal(format!("usearch query failed: {err}")))?
2691        };
2692        let hits = matches
2693            .keys
2694            .into_iter()
2695            .zip(matches.distances)
2696            .map(|(key, distance)| DenseHit {
2697                chunk_id: key as i64,
2698                distance,
2699            })
2700            .collect();
2701        Ok(hits)
2702    }
2703
2704    pub fn count_usearch(&self, space: &str) -> Result<usize> {
2705        let space_indexes = self.get_space_indexes(space)?;
2706        let index = space_indexes
2707            .usearch_index
2708            .read()
2709            .map_err(|_| CoreError::poisoned("usearch index"))?;
2710        Ok(index.size())
2711    }
2712
2713    pub fn clear_usearch(&self, space: &str) -> Result<()> {
2714        let space_indexes = self.get_space_indexes(space)?;
2715        let index = space_indexes
2716            .usearch_index
2717            .write()
2718            .map_err(|_| CoreError::poisoned("usearch index"))?;
2719        let clear_started = std::time::Instant::now();
2720        index
2721            .reset()
2722            .map_err(|err| CoreError::Internal(format!("usearch clear failed: {err}")))?;
2723        std::fs::File::create(&space_indexes.usearch_path)?;
2724        crate::profile::record_update_stage("usearch_clear", clear_started.elapsed());
2725        Ok(())
2726    }
2727
2728    pub fn get_fts_dirty_documents(&self) -> Result<Vec<FtsDirtyRecord>> {
2729        self.get_fts_dirty_documents_filtered("", Vec::new())
2730    }
2731
2732    pub fn get_fts_dirty_documents_in_space(&self, space_id: i64) -> Result<Vec<FtsDirtyRecord>> {
2733        self.get_fts_dirty_documents_filtered(
2734            " AND c.space_id = ?",
2735            vec![SqlValue::Integer(space_id)],
2736        )
2737    }
2738
2739    pub fn get_fts_dirty_documents_in_collections(
2740        &self,
2741        collection_ids: &[i64],
2742    ) -> Result<Vec<FtsDirtyRecord>> {
2743        if collection_ids.is_empty() {
2744            return Ok(Vec::new());
2745        }
2746
2747        let placeholders = vec!["?"; collection_ids.len()].join(", ");
2748        let clause = format!(" AND d.collection_id IN ({placeholders})");
2749        let params = collection_ids
2750            .iter()
2751            .map(|id| SqlValue::Integer(*id))
2752            .collect::<Vec<_>>();
2753        self.get_fts_dirty_documents_filtered(&clause, params)
2754    }
2755
2756    fn get_fts_dirty_documents_filtered(
2757        &self,
2758        scope_clause: &str,
2759        scope_params: Vec<SqlValue>,
2760    ) -> Result<Vec<FtsDirtyRecord>> {
2761        let conn = self
2762            .db
2763            .lock()
2764            .map_err(|_| CoreError::poisoned("database"))?;
2765
2766        let sql = format!(
2767            "SELECT d.id, d.path, d.title, d.hash, c.path, s.name
2768             FROM documents d
2769             JOIN collections c ON c.id = d.collection_id
2770             JOIN spaces s ON s.id = c.space_id
2771             WHERE d.fts_dirty = 1{scope_clause}
2772             ORDER BY d.id ASC"
2773        );
2774        let mut stmt = conn.prepare(&sql)?;
2775        let rows = stmt.query_map(params_from_iter(scope_params.iter()), |row| {
2776            Ok((
2777                row.get::<_, i64>(0)?,
2778                row.get::<_, String>(1)?,
2779                row.get::<_, String>(2)?,
2780                row.get::<_, String>(3)?,
2781                row.get::<_, String>(4)?,
2782                row.get::<_, String>(5)?,
2783            ))
2784        })?;
2785        let headers = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2786        drop(stmt);
2787
2788        let mut records = Vec::with_capacity(headers.len());
2789        for (doc_id, doc_path, doc_title, doc_hash, collection_path, space_name) in headers {
2790            let chunks = load_chunks_for_doc(&conn, doc_id)?;
2791            records.push(FtsDirtyRecord {
2792                doc_id,
2793                doc_path,
2794                doc_title: decode_optional_title(doc_title),
2795                doc_hash,
2796                collection_path: PathBuf::from(collection_path),
2797                space_name,
2798                chunks,
2799            });
2800        }
2801
2802        Ok(records)
2803    }
2804
2805    pub fn batch_clear_fts_dirty(&self, doc_ids: &[i64]) -> Result<()> {
2806        if doc_ids.is_empty() {
2807            return Ok(());
2808        }
2809
2810        let conn = self
2811            .db
2812            .lock()
2813            .map_err(|_| CoreError::poisoned("database"))?;
2814
2815        let placeholders = vec!["?"; doc_ids.len()].join(", ");
2816        let sql = format!("UPDATE documents SET fts_dirty = 0 WHERE id IN ({placeholders})");
2817        conn.execute(&sql, params_from_iter(doc_ids.iter()))?;
2818        Ok(())
2819    }
2820
2821    pub fn count_documents_in_collection(
2822        &self,
2823        collection_id: i64,
2824        active_only: bool,
2825    ) -> Result<usize> {
2826        let conn = self
2827            .db
2828            .lock()
2829            .map_err(|_| CoreError::poisoned("database"))?;
2830        let _collection_name = lookup_collection_name(&conn, collection_id)?;
2831        let active_only = i64::from(active_only);
2832        query_count(
2833            &conn,
2834            "SELECT COUNT(*)
2835             FROM documents
2836             WHERE collection_id = ?1
2837               AND (?2 = 0 OR active = 1)",
2838            params![collection_id, active_only],
2839        )
2840    }
2841
2842    pub fn count_chunks_in_collection(&self, collection_id: i64) -> Result<usize> {
2843        let conn = self
2844            .db
2845            .lock()
2846            .map_err(|_| CoreError::poisoned("database"))?;
2847        let _collection_name = lookup_collection_name(&conn, collection_id)?;
2848
2849        query_count(
2850            &conn,
2851            "SELECT COUNT(*)
2852             FROM chunks c
2853             JOIN documents d ON d.id = c.doc_id
2854             WHERE d.collection_id = ?1",
2855            params![collection_id],
2856        )
2857    }
2858
2859    pub fn count_embedded_chunks_in_collection(&self, collection_id: i64) -> Result<usize> {
2860        let conn = self
2861            .db
2862            .lock()
2863            .map_err(|_| CoreError::poisoned("database"))?;
2864        let _collection_name = lookup_collection_name(&conn, collection_id)?;
2865
2866        query_count(
2867            &conn,
2868            "SELECT COUNT(DISTINCT e.chunk_id)
2869             FROM embeddings e
2870             JOIN chunks c ON c.id = e.chunk_id
2871             JOIN documents d ON d.id = c.doc_id
2872             WHERE d.collection_id = ?1",
2873            params![collection_id],
2874        )
2875    }
2876
2877    pub fn count_documents(&self, space_id: Option<i64>) -> Result<usize> {
2878        let conn = self
2879            .db
2880            .lock()
2881            .map_err(|_| CoreError::poisoned("database"))?;
2882
2883        match space_id {
2884            Some(space_id) => {
2885                let _space_name = lookup_space_name(&conn, space_id)?;
2886                query_count(
2887                    &conn,
2888                    "SELECT COUNT(*)
2889                     FROM documents d
2890                     JOIN collections c ON c.id = d.collection_id
2891                     WHERE c.space_id = ?1",
2892                    params![space_id],
2893                )
2894            }
2895            None => query_count(&conn, "SELECT COUNT(*) FROM documents", []),
2896        }
2897    }
2898
2899    pub fn count_chunks(&self, space_id: Option<i64>) -> Result<usize> {
2900        let conn = self
2901            .db
2902            .lock()
2903            .map_err(|_| CoreError::poisoned("database"))?;
2904
2905        match space_id {
2906            Some(space_id) => {
2907                let _space_name = lookup_space_name(&conn, space_id)?;
2908                query_count(
2909                    &conn,
2910                    "SELECT COUNT(*)
2911                     FROM chunks c
2912                     JOIN documents d ON d.id = c.doc_id
2913                     JOIN collections col ON col.id = d.collection_id
2914                     WHERE col.space_id = ?1",
2915                    params![space_id],
2916                )
2917            }
2918            None => query_count(&conn, "SELECT COUNT(*) FROM chunks", []),
2919        }
2920    }
2921
2922    pub fn count_embedded_chunks(&self, space_id: Option<i64>) -> Result<usize> {
2923        let conn = self
2924            .db
2925            .lock()
2926            .map_err(|_| CoreError::poisoned("database"))?;
2927
2928        match space_id {
2929            Some(space_id) => {
2930                let _space_name = lookup_space_name(&conn, space_id)?;
2931                query_count(
2932                    &conn,
2933                    "SELECT COUNT(DISTINCT e.chunk_id)
2934                     FROM embeddings e
2935                     JOIN chunks c ON c.id = e.chunk_id
2936                     JOIN documents d ON d.id = c.doc_id
2937                     JOIN collections col ON col.id = d.collection_id
2938                     WHERE col.space_id = ?1",
2939                    params![space_id],
2940                )
2941            }
2942            None => query_count(&conn, "SELECT COUNT(DISTINCT chunk_id) FROM embeddings", []),
2943        }
2944    }
2945
2946    pub fn disk_usage(&self) -> Result<DiskUsage> {
2947        let sqlite_bytes = file_size_or_zero(&self.cache_dir.join(DB_FILE))?;
2948
2949        let mut tantivy_bytes = 0_u64;
2950        let mut usearch_bytes = 0_u64;
2951        let spaces_dir = self.cache_dir.join(SPACES_DIR);
2952        if spaces_dir.exists() {
2953            for entry in std::fs::read_dir(&spaces_dir)? {
2954                let space_dir = entry?.path();
2955                if !space_dir.is_dir() {
2956                    continue;
2957                }
2958
2959                tantivy_bytes += dir_size_or_zero(&space_dir.join(TANTIVY_DIR_NAME))?;
2960                usearch_bytes += file_size_or_zero(&space_dir.join(USEARCH_FILENAME))?;
2961            }
2962        }
2963
2964        let models_bytes = dir_size_or_zero(&self.cache_dir.join("models"))?;
2965        let total_bytes = sqlite_bytes + tantivy_bytes + usearch_bytes + models_bytes;
2966
2967        Ok(DiskUsage {
2968            sqlite_bytes,
2969            tantivy_bytes,
2970            usearch_bytes,
2971            models_bytes,
2972            total_bytes,
2973        })
2974    }
2975
2976    fn unload_space(&self, name: &str) -> Result<()> {
2977        let mut spaces = self
2978            .spaces
2979            .write()
2980            .map_err(|_| CoreError::poisoned("spaces"))?;
2981        spaces.remove(name);
2982        Ok(())
2983    }
2984
2985    fn remove_space_artifacts(&self, name: &str) -> Result<()> {
2986        let space_root = self.space_root_path(name);
2987        if space_root.exists() {
2988            std::fs::remove_dir_all(space_root)?;
2989        }
2990        Ok(())
2991    }
2992
2993    fn rename_space_artifacts(&self, old: &str, new: &str) -> Result<()> {
2994        let old_root = self.space_root_path(old);
2995        let new_root = self.space_root_path(new);
2996        if !old_root.exists() {
2997            return Ok(());
2998        }
2999
3000        if new_root.exists() {
3001            return Err(CoreError::Internal(format!(
3002                "cannot rename space artifacts: destination already exists: {}",
3003                new_root.display()
3004            )));
3005        }
3006
3007        if let Some(parent) = new_root.parent() {
3008            std::fs::create_dir_all(parent)?;
3009        }
3010        std::fs::rename(old_root, new_root)?;
3011        Ok(())
3012    }
3013
3014    fn space_root_path(&self, name: &str) -> PathBuf {
3015        self.cache_dir.join(SPACES_DIR).join(name)
3016    }
3017
3018    fn space_paths(&self, name: &str) -> (PathBuf, PathBuf) {
3019        let space_root = self.space_root_path(name);
3020        let tantivy_dir = space_root.join(TANTIVY_DIR_NAME);
3021        let usearch_path = space_root.join(USEARCH_FILENAME);
3022        (tantivy_dir, usearch_path)
3023    }
3024
3025    fn get_space_indexes(&self, name: &str) -> Result<Arc<SpaceIndexes>> {
3026        self.open_space(name)?;
3027        let spaces = self
3028            .spaces
3029            .read()
3030            .map_err(|_| CoreError::poisoned("spaces"))?;
3031        spaces.get(name).cloned().ok_or_else(|| {
3032            KboltError::SpaceNotFound {
3033                name: name.to_string(),
3034            }
3035            .into()
3036        })
3037    }
3038}
3039
3040fn lookup_space_name(conn: &Connection, space_id: i64) -> Result<String> {
3041    let result = conn.query_row(
3042        "SELECT name FROM spaces WHERE id = ?1",
3043        params![space_id],
3044        |row| row.get::<_, String>(0),
3045    );
3046    match result {
3047        Ok(name) => Ok(name),
3048        Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
3049            name: format!("id={space_id}"),
3050        }
3051        .into()),
3052        Err(err) => Err(err.into()),
3053    }
3054}
3055
3056fn lookup_collection_name(conn: &Connection, collection_id: i64) -> Result<String> {
3057    let result = conn.query_row(
3058        "SELECT name FROM collections WHERE id = ?1",
3059        params![collection_id],
3060        |row| row.get::<_, String>(0),
3061    );
3062    match result {
3063        Ok(name) => Ok(name),
3064        Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
3065            name: format!("id={collection_id}"),
3066        }
3067        .into()),
3068        Err(err) => Err(err.into()),
3069    }
3070}
3071
3072fn lookup_document_id(conn: &Connection, doc_id: i64) -> Result<i64> {
3073    let result = conn.query_row(
3074        "SELECT id FROM documents WHERE id = ?1",
3075        params![doc_id],
3076        |row| row.get::<_, i64>(0),
3077    );
3078    match result {
3079        Ok(id) => Ok(id),
3080        Err(Error::QueryReturnedNoRows) => Err(KboltError::DocumentNotFound {
3081            path: format!("id={doc_id}"),
3082        }
3083        .into()),
3084        Err(err) => Err(err.into()),
3085    }
3086}
3087
3088fn load_chunks_for_doc(conn: &Connection, doc_id: i64) -> Result<Vec<ChunkRow>> {
3089    let mut stmt = conn.prepare(
3090        "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
3091         FROM chunks
3092         WHERE doc_id = ?1
3093         ORDER BY seq ASC",
3094    )?;
3095    let rows = stmt.query_map(params![doc_id], decode_chunk_row)?;
3096    let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3097    Ok(chunks)
3098}
3099
3100fn load_chunk_ids_for_doc(conn: &Connection, doc_id: i64) -> Result<Vec<i64>> {
3101    let mut stmt = conn.prepare("SELECT id FROM chunks WHERE doc_id = ?1 ORDER BY seq ASC")?;
3102    let rows = stmt.query_map(params![doc_id], |row| row.get::<_, i64>(0))?;
3103    let chunk_ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3104    Ok(chunk_ids)
3105}
3106
3107fn lookup_chunk_doc_id(conn: &Connection, chunk_id: i64) -> Result<i64> {
3108    let result = conn.query_row(
3109        "SELECT doc_id FROM chunks WHERE id = ?1",
3110        params![chunk_id],
3111        |row| row.get::<_, i64>(0),
3112    );
3113    match result {
3114        Ok(doc_id) => Ok(doc_id),
3115        Err(Error::QueryReturnedNoRows) => Err(KboltError::DocumentNotFound {
3116            path: format!("chunk_id={chunk_id}"),
3117        }
3118        .into()),
3119        Err(err) => Err(err.into()),
3120    }
3121}
3122
3123fn query_count<P: rusqlite::Params>(conn: &Connection, sql: &str, params: P) -> Result<usize> {
3124    let count: i64 = conn.query_row(sql, params, |row| row.get(0))?;
3125    Ok(count as usize)
3126}
3127
3128fn file_size_or_zero(path: &Path) -> Result<u64> {
3129    if !path.exists() {
3130        return Ok(0);
3131    }
3132
3133    let metadata = std::fs::metadata(path)?;
3134    if metadata.is_file() {
3135        Ok(metadata.len())
3136    } else {
3137        Ok(0)
3138    }
3139}
3140
3141fn dir_size_or_zero(path: &Path) -> Result<u64> {
3142    if !path.exists() {
3143        return Ok(0);
3144    }
3145
3146    let mut total = 0_u64;
3147    for entry in std::fs::read_dir(path)? {
3148        let entry = entry?;
3149        let child_path = entry.path();
3150        let metadata = std::fs::symlink_metadata(&child_path)?;
3151        if metadata.is_file() {
3152            total += metadata.len();
3153        } else if metadata.is_dir() {
3154            total += dir_size_or_zero(&child_path)?;
3155        }
3156    }
3157
3158    Ok(total)
3159}
3160
3161fn open_or_create_tantivy_index(path: &Path) -> Result<Index> {
3162    let meta_path = path.join("meta.json");
3163    if meta_path.exists() {
3164        return Ok(Index::open_in_dir(path)?);
3165    }
3166
3167    Ok(Index::create_in_dir(path, tantivy_schema())?)
3168}
3169
3170fn with_tantivy_writer<T>(
3171    space_indexes: &SpaceIndexes,
3172    f: impl FnOnce(&mut IndexWriter) -> Result<T>,
3173) -> Result<T> {
3174    let mut writer = space_indexes
3175        .tantivy_writer
3176        .lock()
3177        .map_err(|_| CoreError::poisoned("tantivy writer"))?;
3178
3179    if writer.is_none() {
3180        *writer = Some(space_indexes.tantivy_index.writer(50_000_000)?);
3181    }
3182
3183    let writer = writer
3184        .as_mut()
3185        .ok_or_else(|| CoreError::Internal("failed to initialize tantivy writer".to_string()))?;
3186    f(writer)
3187}
3188
3189fn reject_incompatible_legacy_index(conn: &Connection) -> Result<()> {
3190    if !table_exists(conn, "documents")? || table_exists(conn, "document_texts")? {
3191        return Ok(());
3192    }
3193
3194    let document_count = query_count(conn, "SELECT COUNT(*) FROM documents", [])?;
3195    if document_count == 0 {
3196        return Ok(());
3197    }
3198
3199    Err(KboltError::Config(
3200        "cache index uses an older text-storage format; rebuild the kbolt cache before using this version".to_string(),
3201    )
3202    .into())
3203}
3204
3205fn ensure_schema_version(conn: &Connection) -> Result<()> {
3206    let current: i64 = conn.query_row("PRAGMA user_version", [], |row| row.get(0))?;
3207    if current > SCHEMA_VERSION {
3208        return Err(KboltError::Config(format!(
3209            "cache index schema version {current} is newer than supported version {SCHEMA_VERSION}"
3210        ))
3211        .into());
3212    }
3213
3214    if current < SCHEMA_VERSION {
3215        conn.pragma_update(None, "user_version", SCHEMA_VERSION)?;
3216    }
3217
3218    Ok(())
3219}
3220
3221fn table_exists(conn: &Connection, table: &str) -> Result<bool> {
3222    let exists = conn.query_row(
3223        "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name = ?1",
3224        [table],
3225        |row| row.get::<_, i64>(0),
3226    )?;
3227    Ok(exists != 0)
3228}
3229
3230fn ensure_document_texts_generation_key_column(conn: &Connection) -> Result<()> {
3231    let mut stmt = conn.prepare("PRAGMA table_info(document_texts)")?;
3232    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3233    let columns = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3234    drop(stmt);
3235
3236    if columns.iter().any(|column| column == "generation_key") {
3237        return Ok(());
3238    }
3239
3240    conn.execute(
3241        "ALTER TABLE document_texts ADD COLUMN generation_key TEXT NOT NULL DEFAULT ''",
3242        [],
3243    )?;
3244    Ok(())
3245}
3246
3247fn ensure_chunks_retrieval_prefix_column(conn: &Connection) -> Result<()> {
3248    let mut stmt = conn.prepare("PRAGMA table_info(chunks)")?;
3249    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3250    let columns = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3251    drop(stmt);
3252
3253    if columns.iter().any(|column| column == "retrieval_prefix") {
3254        return Ok(());
3255    }
3256
3257    conn.execute("ALTER TABLE chunks ADD COLUMN retrieval_prefix TEXT", [])?;
3258    Ok(())
3259}
3260
3261fn build_literal_bm25_query(
3262    index: &Index,
3263    fields: &[Bm25FieldSpec],
3264    query: &str,
3265) -> Result<Option<Box<dyn Query>>> {
3266    let mut clauses = Vec::new();
3267    for field in fields {
3268        for token in analyzed_terms_for_field(index, field.field, query)? {
3269            let term_query: Box<dyn Query> = Box::new(TermQuery::new(
3270                Term::from_field_text(field.field, &token),
3271                field.index_record_option,
3272            ));
3273            let query = if (field.boost - 1.0).abs() > f32::EPSILON {
3274                Box::new(BoostQuery::new(term_query, field.boost)) as Box<dyn Query>
3275            } else {
3276                term_query
3277            };
3278            clauses.push((Occur::Should, query));
3279        }
3280    }
3281
3282    if clauses.is_empty() {
3283        Ok(None)
3284    } else {
3285        Ok(Some(Box::new(BooleanQuery::new(clauses))))
3286    }
3287}
3288
3289fn build_doc_id_filter_query(field: Field, document_ids: &[i64]) -> Result<Box<dyn Query>> {
3290    let mut terms = Vec::new();
3291    let mut seen = HashSet::new();
3292    for doc_id in document_ids {
3293        let doc_id = u64::try_from(*doc_id).map_err(|_| {
3294            CoreError::Internal(format!(
3295                "doc_id must be non-negative for tantivy query: {doc_id}"
3296            ))
3297        })?;
3298        if seen.insert(doc_id) {
3299            terms.push(Term::from_field_u64(field, doc_id));
3300        }
3301    }
3302
3303    Ok(Box::new(ConstScoreQuery::new(
3304        Box::new(TermSetQuery::new(terms)),
3305        0.0,
3306    )))
3307}
3308
3309fn analyzed_terms_for_field(index: &Index, field: Field, query: &str) -> Result<Vec<String>> {
3310    let mut analyzer = index.tokenizer_for_field(field)?;
3311    let mut stream = analyzer.token_stream(query);
3312    let mut terms = Vec::new();
3313    let mut seen = HashSet::new();
3314    while let Some(token) = stream.next() {
3315        if token.text.is_empty() {
3316            continue;
3317        }
3318        let text = token.text.clone();
3319        if seen.insert(text.clone()) {
3320            terms.push(text);
3321        }
3322    }
3323    Ok(terms)
3324}
3325
3326fn new_usearch_index(dimensions: usize) -> Result<usearch::Index> {
3327    let options = IndexOptions {
3328        dimensions,
3329        metric: MetricKind::Cos,
3330        quantization: ScalarKind::F32,
3331        connectivity: 16,
3332        expansion_add: 200,
3333        expansion_search: 100,
3334        ..IndexOptions::default()
3335    };
3336    usearch::Index::new(&options)
3337        .map_err(|err| CoreError::Internal(format!("usearch init failed: {err}")))
3338}
3339
3340fn open_or_create_usearch_index(path: &Path) -> Result<usearch::Index> {
3341    let index = new_usearch_index(256)?;
3342    let file_size = std::fs::metadata(path).map(|meta| meta.len()).unwrap_or(0);
3343    if file_size > 0 {
3344        let path = path
3345            .to_str()
3346            .ok_or_else(|| CoreError::Internal("invalid usearch path encoding".to_string()))?;
3347        index
3348            .load(path)
3349            .map_err(|err| CoreError::Internal(format!("usearch load failed: {err}")))?;
3350    }
3351    Ok(index)
3352}
3353
3354fn ensure_usearch_dimensions(index: &mut usearch::Index, expected_dimensions: usize) -> Result<()> {
3355    if index.size() == 0 && index.dimensions() != expected_dimensions {
3356        *index = new_usearch_index(expected_dimensions)?;
3357        return Ok(());
3358    }
3359
3360    if index.dimensions() != expected_dimensions {
3361        return Err(CoreError::Internal(format!(
3362            "usearch vector dimension mismatch: index expects {}, got {}",
3363            index.dimensions(),
3364            expected_dimensions
3365        )));
3366    }
3367    Ok(())
3368}
3369
3370fn save_usearch_index(index: &usearch::Index, path: &Path) -> Result<()> {
3371    if index.size() == 0 {
3372        std::fs::File::create(path)?;
3373        return Ok(());
3374    }
3375
3376    let path = path
3377        .to_str()
3378        .ok_or_else(|| CoreError::Internal("invalid usearch path encoding".to_string()))?;
3379    index
3380        .save(path)
3381        .map_err(|err| CoreError::Internal(format!("usearch save failed: {err}")))?;
3382    Ok(())
3383}
3384
3385fn tantivy_schema() -> tantivy::schema::Schema {
3386    let mut builder = tantivy::schema::Schema::builder();
3387    builder.add_u64_field("chunk_id", INDEXED | STORED | FAST);
3388    builder.add_u64_field("doc_id", INDEXED | STORED | FAST);
3389    builder.add_text_field("filepath", TEXT | STORED);
3390    builder.add_text_field("title", TEXT | STORED);
3391    builder.add_text_field("heading", TEXT | STORED);
3392    builder.add_text_field("body", TEXT);
3393    builder.build()
3394}
3395
3396fn tantivy_fields_from_schema(schema: &tantivy::schema::Schema) -> Result<TantivyFields> {
3397    Ok(TantivyFields {
3398        chunk_id: schema.get_field("chunk_id").map_err(|_| {
3399            CoreError::Internal("tantivy schema missing field: chunk_id".to_string())
3400        })?,
3401        doc_id: schema
3402            .get_field("doc_id")
3403            .map_err(|_| CoreError::Internal("tantivy schema missing field: doc_id".to_string()))?,
3404        filepath: schema.get_field("filepath").map_err(|_| {
3405            CoreError::Internal("tantivy schema missing field: filepath".to_string())
3406        })?,
3407        title: schema
3408            .get_field("title")
3409            .map_err(|_| CoreError::Internal("tantivy schema missing field: title".to_string()))?,
3410        heading: schema.get_field("heading").map_err(|_| {
3411            CoreError::Internal("tantivy schema missing field: heading".to_string())
3412        })?,
3413        body: schema
3414            .get_field("body")
3415            .map_err(|_| CoreError::Internal("tantivy schema missing field: body".to_string()))?,
3416    })
3417}
3418
3419fn resolve_tantivy_field(fields: TantivyFields, name: &str) -> Result<Field> {
3420    match name {
3421        "chunk_id" => Ok(fields.chunk_id),
3422        "doc_id" => Ok(fields.doc_id),
3423        "filepath" => Ok(fields.filepath),
3424        "title" => Ok(fields.title),
3425        "heading" => Ok(fields.heading),
3426        "body" => Ok(fields.body),
3427        other => Err(CoreError::Internal(format!(
3428            "unsupported tantivy field: {other}"
3429        ))),
3430    }
3431}
3432
3433fn serialize_extensions(extensions: Option<&[String]>) -> Result<Option<String>> {
3434    match extensions {
3435        None => Ok(None),
3436        Some(values) => serde_json::to_string(values).map(Some).map_err(Into::into),
3437    }
3438}
3439
3440fn deserialize_extensions(raw: Option<String>) -> Result<Option<Vec<String>>> {
3441    match raw {
3442        None => Ok(None),
3443        Some(json) => serde_json::from_str::<Vec<String>>(&json)
3444            .map(Some)
3445            .map_err(Into::into),
3446    }
3447}
3448
3449fn decode_space_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SpaceRow> {
3450    Ok(SpaceRow {
3451        id: row.get(0)?,
3452        name: row.get(1)?,
3453        description: row.get(2)?,
3454        created: row.get(3)?,
3455    })
3456}
3457
3458fn decode_collection_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<CollectionRow> {
3459    let raw_extensions: Option<String> = row.get(5)?;
3460    let extensions = deserialize_extensions(raw_extensions).map_err(|err| {
3461        Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(err))
3462    })?;
3463    Ok(CollectionRow {
3464        id: row.get(0)?,
3465        space_id: row.get(1)?,
3466        name: row.get(2)?,
3467        path: PathBuf::from(row.get::<_, String>(3)?),
3468        description: row.get(4)?,
3469        extensions,
3470        created: row.get(6)?,
3471        updated: row.get(7)?,
3472    })
3473}
3474
3475fn decode_document_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<DocumentRow> {
3476    decode_document_row_at(row, 0)
3477}
3478
3479fn decode_document_row_at(row: &rusqlite::Row<'_>, base: usize) -> rusqlite::Result<DocumentRow> {
3480    let active_value: i64 = row.get(base + 6)?;
3481    let fts_dirty_value: i64 = row.get(base + 8)?;
3482    Ok(DocumentRow {
3483        id: row.get(base)?,
3484        collection_id: row.get(base + 1)?,
3485        path: row.get(base + 2)?,
3486        title: decode_optional_title(row.get::<_, String>(base + 3)?),
3487        hash: row.get(base + 4)?,
3488        modified: row.get(base + 5)?,
3489        active: active_value != 0,
3490        deactivated_at: row.get(base + 7)?,
3491        fts_dirty: fts_dirty_value != 0,
3492    })
3493}
3494
/// Trims surrounding whitespace from `title`; a missing or blank title
/// becomes `None`.
fn normalized_title(title: Option<&str>) -> Option<&str> {
    let trimmed = title?.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed)
    }
}
3498
/// Converts a stored title string into an optional, whitespace-trimmed title.
///
/// Returns `None` for an empty or whitespace-only title. When trimming removes nothing the
/// original `String` is returned as-is (no reallocation); otherwise the trimmed text is copied.
fn decode_optional_title(title: String) -> Option<String> {
    let kept = title.trim();
    let kept_len = kept.len();

    if kept_len == 0 {
        return None;
    }
    if kept_len == title.len() {
        // Nothing was trimmed, so the owned buffer can be handed back untouched.
        return Some(title);
    }

    Some(kept.to_owned())
}
3509
3510fn decode_document_text_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<DocumentTextRow> {
3511    Ok(DocumentTextRow {
3512        doc_id: row.get(0)?,
3513        extractor_key: row.get(1)?,
3514        source_hash: row.get(2)?,
3515        text_hash: row.get(3)?,
3516        generation_key: row.get(4)?,
3517        text: row.get(5)?,
3518        created: row.get(6)?,
3519    })
3520}
3521
3522fn decode_chunk_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ChunkRow> {
3523    decode_chunk_row_at(row, 0)
3524}
3525
3526fn decode_chunk_row_at(row: &rusqlite::Row<'_>, base: usize) -> rusqlite::Result<ChunkRow> {
3527    let offset_value: i64 = row.get(base + 3)?;
3528    let length_value: i64 = row.get(base + 4)?;
3529    let kind_raw: String = row.get(base + 6)?;
3530    let kind = FinalChunkKind::try_from(kind_raw.as_str()).map_err(|err| {
3531        Error::FromSqlConversionFailure(base + 6, rusqlite::types::Type::Text, Box::new(err))
3532    })?;
3533    Ok(ChunkRow {
3534        id: row.get(base)?,
3535        doc_id: row.get(base + 1)?,
3536        seq: row.get(base + 2)?,
3537        offset: decode_non_negative_usize(offset_value, base + 3, "chunks.offset")?,
3538        length: decode_non_negative_usize(length_value, base + 4, "chunks.length")?,
3539        heading: row.get(base + 5)?,
3540        kind,
3541        retrieval_prefix: row.get(base + 7)?,
3542    })
3543}
3544
3545fn decode_non_negative_usize(
3546    value: i64,
3547    column: usize,
3548    name: &'static str,
3549) -> rusqlite::Result<usize> {
3550    if value < 0 {
3551        return Err(Error::FromSqlConversionFailure(
3552            column,
3553            SqlType::Integer,
3554            Box::new(KboltError::Internal(format!("{name} must not be negative"))),
3555        ));
3556    }
3557
3558    Ok(value as usize)
3559}
3560
3561fn canonical_chunk_slice_from_bytes(
3562    chunk_id: i64,
3563    offset_value: i64,
3564    length_value: i64,
3565    document_len_value: i64,
3566    start_byte: Vec<u8>,
3567    end_byte: Vec<u8>,
3568    bytes: Vec<u8>,
3569) -> Result<String> {
3570    if offset_value < 0 {
3571        return Err(CoreError::Internal(format!(
3572            "chunk {chunk_id} offset must not be negative"
3573        )));
3574    }
3575    if length_value < 0 {
3576        return Err(CoreError::Internal(format!(
3577            "chunk {chunk_id} length must not be negative"
3578        )));
3579    }
3580    if document_len_value < 0 {
3581        return Err(CoreError::Internal(format!(
3582            "document text length for chunk {chunk_id} must not be negative"
3583        )));
3584    }
3585
3586    let offset = offset_value as usize;
3587    let length = length_value as usize;
3588    let document_len = document_len_value as usize;
3589    let end = offset.checked_add(length).ok_or_else(|| {
3590        CoreError::Internal(format!("chunk {chunk_id} text span overflows usize"))
3591    })?;
3592    if end > document_len {
3593        return Err(CoreError::Internal(format!(
3594            "chunk {chunk_id} text span {offset}..{end} exceeds document text length {document_len}"
3595        )));
3596    }
3597    if bytes.len() != length {
3598        return Err(CoreError::Internal(format!(
3599            "canonical text slice for chunk {chunk_id} returned {} bytes, expected {length}",
3600            bytes.len()
3601        )));
3602    }
3603    validate_sqlite_utf8_boundary(chunk_id, offset, document_len, &start_byte)?;
3604    validate_sqlite_utf8_boundary(chunk_id, end, document_len, &end_byte)?;
3605
3606    String::from_utf8(bytes).map_err(|err| {
3607        CoreError::Internal(format!(
3608            "stored canonical text slice for chunk {chunk_id} is invalid UTF-8: {err}"
3609        ))
3610    })
3611}
3612
3613fn validate_sqlite_utf8_boundary(
3614    chunk_id: i64,
3615    index: usize,
3616    document_len: usize,
3617    byte_at_index: &[u8],
3618) -> Result<()> {
3619    if index == 0 || index == document_len {
3620        return Ok(());
3621    }
3622    if byte_at_index.len() != 1 {
3623        return Err(CoreError::Internal(format!(
3624            "chunk {chunk_id} text boundary byte at {index} is missing"
3625        )));
3626    }
3627    if (0x80..0xC0).contains(&byte_at_index[0]) {
3628        return Err(CoreError::Internal(format!(
3629            "chunk {chunk_id} text span is not on UTF-8 boundaries"
3630        )));
3631    }
3632
3633    Ok(())
3634}
3635
3636pub(crate) fn chunk_text_from_canonical(document_text: &str, chunk: &ChunkRow) -> Result<String> {
3637    let label = format!("chunk {}", chunk.id);
3638    let end = validate_text_span(document_text, chunk.offset, chunk.length, &label)?;
3639
3640    Ok(document_text[chunk.offset..end].to_string())
3641}
3642
3643fn validate_text_span(
3644    document_text: &str,
3645    offset: usize,
3646    length: usize,
3647    label: &str,
3648) -> Result<usize> {
3649    let end = offset
3650        .checked_add(length)
3651        .ok_or_else(|| CoreError::Internal(format!("{label} text span overflows usize")))?;
3652    if end > document_text.len() {
3653        return Err(CoreError::Internal(format!(
3654            "{label} text span {}..{} exceeds document text length {}",
3655            offset,
3656            end,
3657            document_text.len()
3658        )));
3659    }
3660    if !document_text.is_char_boundary(offset) || !document_text.is_char_boundary(end) {
3661        return Err(CoreError::Internal(format!(
3662            "{label} text span {offset}..{end} is not on UTF-8 boundaries"
3663        )));
3664    }
3665
3666    Ok(end)
3667}
3668
3669pub(crate) fn missing_document_text_error(doc_id: i64) -> CoreError {
3670    KboltError::Internal(format!(
3671        "document {doc_id} is missing persisted canonical text; rebuild the kbolt cache"
3672    ))
3673    .into()
3674}
3675
3676#[cfg(test)]
3677mod tests;