// kbolt_core/storage/mod.rs

1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3use std::sync::{Arc, Mutex, RwLock};
4
5use crate::error::{CoreError, Result};
6use crate::ingest::chunk::FinalChunkKind;
7use kbolt_types::{DiskUsage, KboltError};
8use rusqlite::types::{Type as SqlType, Value as SqlValue};
9use rusqlite::{params, params_from_iter, Connection, Error, ErrorCode};
10use tantivy::collector::TopDocs;
11use tantivy::query::{
12    BooleanQuery, BoostQuery, ConstScoreQuery, Occur, Query, TermQuery, TermSetQuery,
13};
14use tantivy::schema::{Field, IndexRecordOption, FAST, INDEXED, STORED, TEXT};
15use tantivy::tokenizer::TokenStream;
16use tantivy::{Index, IndexReader, IndexWriter, TantivyDocument, Term};
17use usearch::{IndexOptions, MetricKind, ScalarKind};
18
/// File name of the SQLite metadata database, joined onto the cache directory in `Storage::new`.
const DB_FILE: &str = "meta.sqlite";
/// Name of the space that `Storage::new` always inserts and opens; `rename_space` refuses to
/// rename it and `delete_space` resets it instead of removing its row.
const DEFAULT_SPACE_NAME: &str = "default";
/// Directory name for per-space artifacts — presumably a child of the cache directory
/// (TODO confirm against `space_paths`, which is outside this view).
const SPACES_DIR: &str = "spaces";
/// Directory name of a space's Tantivy index — presumably resolved by `space_paths` (TODO confirm).
const TANTIVY_DIR_NAME: &str = "tantivy";
/// File name of a space's usearch vector index — presumably resolved by `space_paths` (TODO confirm).
const USEARCH_FILENAME: &str = "vectors.usearch";
/// Schema version — presumably the value `ensure_schema_version` checks or records (TODO confirm).
const SCHEMA_VERSION: i64 = 3;
/// Intended upper bound on the number of values bound into one SQL `IN (...)` clause.
const SQLITE_IN_CLAUSE_BATCH_SIZE: usize = 500;
26
/// Selects when a usearch index change is persisted.
///
/// NOTE(review): no use site is visible here. Presumably `Immediate` saves right away and
/// `Deferred` only records the space in `Storage::dirty_usearch_spaces` — confirm.
#[derive(Clone, Copy)]
enum UsearchSaveMode {
    Immediate,
    Deferred,
}
32
33pub struct Storage {
34    db: Mutex<Connection>,
35    cache_dir: PathBuf,
36    spaces: RwLock<HashMap<String, Arc<SpaceIndexes>>>,
37    dirty_usearch_spaces: Mutex<HashSet<String>>,
38}
39
/// One row of the `spaces` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SpaceRow {
    /// Primary key.
    pub id: i64,
    /// Unique space name.
    pub name: String,
    /// Optional free-form description (nullable column).
    pub description: Option<String>,
    /// Creation timestamp; the column defaults to `strftime('%Y-%m-%dT%H:%M:%SZ','now')`.
    pub created: String,
}
47
/// One row of the `collections` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollectionRow {
    /// Primary key.
    pub id: i64,
    /// Owning space (`spaces.id`); deleting the space cascades to its collections.
    pub space_id: i64,
    /// Collection name, unique within its space.
    pub name: String,
    /// Collection path (stored as text in the `path` column).
    pub path: PathBuf,
    /// Optional free-form description.
    pub description: Option<String>,
    /// Optional extension list, stored serialized in the `extensions` TEXT column.
    pub extensions: Option<Vec<String>>,
    /// Creation timestamp.
    pub created: String,
    /// Last-update timestamp.
    pub updated: String,
}
59
/// One row of the `documents` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DocumentRow {
    /// Primary key.
    pub id: i64,
    /// Owning collection (`collections.id`).
    pub collection_id: i64,
    /// Document path, unique within its collection.
    pub path: String,
    /// Document title. The column is `NOT NULL DEFAULT ''`; presumably an empty stored title
    /// decodes to `None` (TODO confirm against `decode_document_row`).
    pub title: Option<String>,
    /// Content hash recorded for the document.
    pub hash: String,
    /// Modification timestamp supplied by the caller.
    pub modified: String,
    /// Whether the document is currently active.
    pub active: bool,
    /// When the document was deactivated, if it is inactive.
    pub deactivated_at: Option<String>,
    /// Set by `upsert_document`; presumably means the full-text index still has to be refreshed.
    pub fts_dirty: bool,
}
72
/// Per-document summary row used for file listings.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileListRow {
    /// Document id.
    pub doc_id: i64,
    /// Document path.
    pub path: String,
    /// Document title, if any.
    pub title: Option<String>,
    /// Document content hash.
    pub hash: String,
    /// Whether the document is active.
    pub active: bool,
    /// Number of chunks belonging to the document.
    pub chunk_count: usize,
    /// Number of those chunks that have an embedding.
    pub embedded_chunk_count: usize,
}
83
84#[derive(Debug, Clone, PartialEq, Eq)]
85pub struct ChunkRow {
86    pub id: i64,
87    pub doc_id: i64,
88    pub seq: i32,
89    pub offset: usize,
90    pub length: usize,
91    pub heading: Option<String>,
92    pub kind: FinalChunkKind,
93    pub retrieval_prefix: Option<String>,
94}
95
96#[derive(Debug, Clone, PartialEq, Eq)]
97pub struct ChunkInsert {
98    pub seq: i32,
99    pub offset: usize,
100    pub length: usize,
101    pub heading: Option<String>,
102    pub kind: FinalChunkKind,
103    pub retrieval_prefix: Option<String>,
104}
105
/// One row of the `document_texts` table (at most one per document).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DocumentTextRow {
    /// Owning document; also the table's primary key.
    pub doc_id: i64,
    /// Key identifying the extractor that produced `text`.
    pub extractor_key: String,
    /// Hash of the source the text was extracted from.
    pub source_hash: String,
    /// Hash of the extracted text.
    pub text_hash: String,
    /// Generation key; the column defaults to the empty string.
    pub generation_key: String,
    /// The extracted text.
    pub text: String,
    /// Creation timestamp.
    pub created: String,
}
116
117#[derive(Debug, Clone, PartialEq, Eq)]
118pub struct ChunkTextRow {
119    pub chunk: ChunkRow,
120    pub extractor_key: String,
121    pub text: String,
122}
123
124pub struct DocumentGenerationReplace<'a> {
125    pub collection_id: i64,
126    pub path: &'a str,
127    pub title: Option<&'a str>,
128    pub hash: &'a str,
129    pub modified: &'a str,
130    pub extractor_key: &'a str,
131    pub source_hash: &'a str,
132    pub text_hash: &'a str,
133    pub generation_key: &'a str,
134    pub text: &'a str,
135    pub chunks: &'a [ChunkInsert],
136}
137
/// Outcome of replacing a document generation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DocumentGenerationReplaceResult {
    /// Id of the affected document.
    pub doc_id: i64,
    /// Chunk ids that belonged to the document before the replace.
    pub old_chunk_ids: Vec<i64>,
    /// Chunk ids that belong to the document after the replace.
    pub chunk_ids: Vec<i64>,
}
144
145#[derive(Debug, Clone, PartialEq, Eq)]
146pub struct EmbedRecord {
147    pub chunk: ChunkRow,
148    pub doc_path: String,
149    pub doc_title: Option<String>,
150    pub collection_path: PathBuf,
151    pub space_name: String,
152}
153
154#[derive(Debug, Clone, PartialEq, Eq)]
155pub struct FtsDirtyRecord {
156    pub doc_id: i64,
157    pub doc_path: String,
158    pub doc_title: Option<String>,
159    pub doc_hash: String,
160    pub collection_path: PathBuf,
161    pub space_name: String,
162    pub chunks: Vec<ChunkRow>,
163}
164
/// A document, the space it lives in, and the ids of its chunks.
///
/// NOTE(review): by its name this is a candidate for permanent removal — confirm at the producer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReapableDocument {
    /// Document id.
    pub doc_id: i64,
    /// Name of the space that holds the document.
    pub space_name: String,
    /// Ids of the document's chunks.
    pub chunk_ids: Vec<i64>,
}
171
/// Values for one Tantivy document; the fields mirror `TantivyFields`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TantivyEntry {
    /// Id of the chunk this entry represents.
    pub chunk_id: i64,
    /// Id of the chunk's document.
    pub doc_id: i64,
    /// Path of the chunk's document.
    pub filepath: String,
    /// Document title, if any.
    pub title: Option<String>,
    /// Chunk heading, if any.
    pub heading: Option<String>,
    /// Chunk body text.
    pub body: String,
}
181
/// One lexical search hit: a chunk and its score.
#[derive(Debug, Clone, PartialEq)]
pub struct BM25Hit {
    /// Id of the matching chunk.
    pub chunk_id: i64,
    /// Score assigned to the chunk by the search.
    pub score: f32,
}
187
/// One vector search hit: a chunk and its distance to the query.
#[derive(Debug, Clone, PartialEq)]
pub struct DenseHit {
    /// Id of the matching chunk.
    pub chunk_id: i64,
    /// Distance between the chunk's vector and the query vector.
    pub distance: f32,
}
193
/// Summary of a search scope: the documents it covers and how many chunks they hold.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchScopeSummary {
    /// Ids of the documents in scope.
    pub document_ids: Vec<i64>,
    /// Number of chunks in scope.
    pub chunk_count: usize,
}
199
200#[derive(Debug, Clone, PartialEq, Eq)]
201pub enum SpaceResolution {
202    Found(SpaceRow),
203    Ambiguous(Vec<String>),
204    NotFound,
205}
206
207struct SpaceIndexes {
208    _tantivy_dir: PathBuf,
209    usearch_path: PathBuf,
210    tantivy_index: Index,
211    tantivy_reader: IndexReader,
212    tantivy_writer: Mutex<Option<IndexWriter>>,
213    usearch_index: RwLock<usearch::Index>,
214    fields: TantivyFields,
215}
216
217#[derive(Debug, Clone, Copy)]
218struct TantivyFields {
219    chunk_id: Field,
220    doc_id: Field,
221    filepath: Field,
222    title: Field,
223    heading: Field,
224    body: Field,
225}
226
227#[derive(Debug, Clone, Copy)]
228struct Bm25FieldSpec {
229    field: Field,
230    boost: f32,
231    index_record_option: IndexRecordOption,
232}
233
234impl Storage {
235    pub fn new(cache_dir: &Path) -> Result<Self> {
236        std::fs::create_dir_all(cache_dir)?;
237        let db_path = cache_dir.join(DB_FILE);
238        let conn = Connection::open(db_path)?;
239        reject_incompatible_legacy_index(&conn)?;
240        conn.execute_batch(
241            r#"
242PRAGMA foreign_keys = ON;
243PRAGMA journal_mode = WAL;
244
245CREATE TABLE IF NOT EXISTS spaces (
246    id          INTEGER PRIMARY KEY,
247    name        TEXT NOT NULL UNIQUE,
248    description TEXT,
249    created     TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ','now'))
250);
251
252CREATE TABLE IF NOT EXISTS collections (
253    id          INTEGER PRIMARY KEY,
254    space_id    INTEGER NOT NULL REFERENCES spaces(id) ON DELETE CASCADE,
255    name        TEXT NOT NULL,
256    path        TEXT NOT NULL,
257    description TEXT,
258    extensions  TEXT,
259    created     TEXT NOT NULL,
260    updated     TEXT NOT NULL,
261    UNIQUE(space_id, name)
262);
263
264CREATE TABLE IF NOT EXISTS documents (
265    id              INTEGER PRIMARY KEY,
266    collection_id   INTEGER NOT NULL REFERENCES collections(id) ON DELETE CASCADE,
267    path            TEXT NOT NULL,
268    title           TEXT NOT NULL DEFAULT '',
269    hash            TEXT NOT NULL,
270    modified        TEXT NOT NULL,
271    active          INTEGER NOT NULL DEFAULT 1,
272    deactivated_at  TEXT,
273    fts_dirty       INTEGER NOT NULL DEFAULT 0,
274    UNIQUE(collection_id, path)
275);
276CREATE INDEX IF NOT EXISTS idx_documents_collection ON documents(collection_id, active);
277CREATE INDEX IF NOT EXISTS idx_documents_hash ON documents(hash);
278CREATE INDEX IF NOT EXISTS idx_documents_fts_dirty ON documents(fts_dirty) WHERE fts_dirty = 1;
279
280CREATE TABLE IF NOT EXISTS chunks (
281    id       INTEGER PRIMARY KEY,
282    doc_id   INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
283    seq      INTEGER NOT NULL,
284    offset   INTEGER NOT NULL,
285    length   INTEGER NOT NULL,
286    heading  TEXT,
287    kind     TEXT NOT NULL DEFAULT 'section',
288    retrieval_prefix TEXT,
289    UNIQUE(doc_id, seq)
290);
291CREATE INDEX IF NOT EXISTS idx_chunks_doc ON chunks(doc_id);
292
293CREATE TABLE IF NOT EXISTS embeddings (
294    chunk_id    INTEGER NOT NULL REFERENCES chunks(id) ON DELETE CASCADE,
295    model       TEXT NOT NULL,
296    embedded_at TEXT NOT NULL,
297    PRIMARY KEY (chunk_id, model)
298);
299
300CREATE TABLE IF NOT EXISTS document_texts (
301    doc_id            INTEGER PRIMARY KEY REFERENCES documents(id) ON DELETE CASCADE,
302    extractor_key     TEXT NOT NULL,
303    source_hash       TEXT NOT NULL,
304    text_hash         TEXT NOT NULL,
305    generation_key    TEXT NOT NULL DEFAULT '',
306    text              TEXT NOT NULL,
307    created           TEXT NOT NULL
308);
309"#,
310        )?;
311        ensure_document_texts_generation_key_column(&conn)?;
312        ensure_chunks_retrieval_prefix_column(&conn)?;
313        ensure_schema_version(&conn)?;
314
315        conn.execute(
316            "INSERT OR IGNORE INTO spaces (name, description) VALUES (?1, NULL)",
317            params![DEFAULT_SPACE_NAME],
318        )?;
319
320        let storage = Self {
321            db: Mutex::new(conn),
322            cache_dir: cache_dir.to_path_buf(),
323            spaces: RwLock::new(HashMap::new()),
324            dirty_usearch_spaces: Mutex::new(HashSet::new()),
325        };
326        storage.open_space(DEFAULT_SPACE_NAME)?;
327
328        Ok(storage)
329    }
330
331    pub fn open_space(&self, name: &str) -> Result<()> {
332        let _space = self.get_space(name)?;
333
334        {
335            let spaces = self
336                .spaces
337                .read()
338                .map_err(|_| CoreError::poisoned("spaces"))?;
339            if spaces.contains_key(name) {
340                return Ok(());
341            }
342        }
343
344        let (tantivy_dir, usearch_path) = self.space_paths(name);
345        std::fs::create_dir_all(&tantivy_dir)?;
346        std::fs::OpenOptions::new()
347            .create(true)
348            .append(true)
349            .open(&usearch_path)?;
350
351        let mut spaces = self
352            .spaces
353            .write()
354            .map_err(|_| CoreError::poisoned("spaces"))?;
355        if spaces.contains_key(name) {
356            return Ok(());
357        }
358
359        let tantivy_index = open_or_create_tantivy_index(&tantivy_dir)?;
360        let tantivy_reader = tantivy_index.reader()?;
361        let usearch_index = open_or_create_usearch_index(&usearch_path)?;
362        let fields = tantivy_fields_from_schema(&tantivy_index.schema())?;
363
364        spaces.insert(
365            name.to_string(),
366            Arc::new(SpaceIndexes {
367                _tantivy_dir: tantivy_dir,
368                usearch_path,
369                tantivy_index,
370                tantivy_reader,
371                tantivy_writer: Mutex::new(None),
372                usearch_index: RwLock::new(usearch_index),
373                fields,
374            }),
375        );
376
377        Ok(())
378    }
379
380    pub fn close_space(&self, name: &str) -> Result<()> {
381        let _space = self.get_space(name)?;
382        let mut spaces = self
383            .spaces
384            .write()
385            .map_err(|_| CoreError::poisoned("spaces"))?;
386        let _removed = spaces.remove(name);
387        Ok(())
388    }
389
390    pub fn create_space(&self, name: &str, description: Option<&str>) -> Result<i64> {
391        let space_id = {
392            let conn = self
393                .db
394                .lock()
395                .map_err(|_| CoreError::poisoned("database"))?;
396
397            let result = conn.execute(
398                "INSERT INTO spaces (name, description) VALUES (?1, ?2)",
399                params![name, description],
400            );
401
402            match result {
403                Ok(_) => conn.last_insert_rowid(),
404                Err(err) => {
405                    return match err {
406                        Error::SqliteFailure(sqlite_err, _)
407                            if sqlite_err.code == ErrorCode::ConstraintViolation =>
408                        {
409                            Err(KboltError::SpaceAlreadyExists {
410                                name: name.to_string(),
411                            }
412                            .into())
413                        }
414                        other => Err(other.into()),
415                    };
416                }
417            }
418        };
419
420        if let Err(open_err) = self.open_space(name) {
421            let rollback_result = self
422                .db
423                .lock()
424                .map_err(|_| CoreError::poisoned("database"))?
425                .execute("DELETE FROM spaces WHERE id = ?1", params![space_id]);
426
427            if let Err(rollback_err) = rollback_result {
428                return Err(CoreError::Internal(format!(
429                    "failed to provision indexes for space '{name}': {open_err}; rollback failed: {rollback_err}"
430                )));
431            }
432
433            return Err(open_err);
434        }
435
436        Ok(space_id)
437    }
438
439    pub fn get_space(&self, name: &str) -> Result<SpaceRow> {
440        let conn = self
441            .db
442            .lock()
443            .map_err(|_| CoreError::poisoned("database"))?;
444        let mut stmt =
445            conn.prepare("SELECT id, name, description, created FROM spaces WHERE name = ?1")?;
446
447        let row = stmt.query_row(params![name], decode_space_row);
448
449        match row {
450            Ok(space) => Ok(space),
451            Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
452                name: name.to_string(),
453            }
454            .into()),
455            Err(err) => Err(err.into()),
456        }
457    }
458
459    pub fn get_space_by_id(&self, id: i64) -> Result<SpaceRow> {
460        let conn = self
461            .db
462            .lock()
463            .map_err(|_| CoreError::poisoned("database"))?;
464        let mut stmt =
465            conn.prepare("SELECT id, name, description, created FROM spaces WHERE id = ?1")?;
466
467        let row = stmt.query_row(params![id], decode_space_row);
468
469        match row {
470            Ok(space) => Ok(space),
471            Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
472                name: format!("id={id}"),
473            }
474            .into()),
475            Err(err) => Err(err.into()),
476        }
477    }
478
479    pub fn list_spaces(&self) -> Result<Vec<SpaceRow>> {
480        let conn = self
481            .db
482            .lock()
483            .map_err(|_| CoreError::poisoned("database"))?;
484        let mut stmt = conn.prepare(
485            "SELECT id, name, description, created
486             FROM spaces
487             ORDER BY name ASC",
488        )?;
489
490        let rows = stmt.query_map([], decode_space_row)?;
491
492        let spaces = rows.collect::<std::result::Result<Vec<_>, _>>()?;
493        Ok(spaces)
494    }
495
496    pub fn find_space_for_collection(&self, collection: &str) -> Result<SpaceResolution> {
497        let conn = self
498            .db
499            .lock()
500            .map_err(|_| CoreError::poisoned("database"))?;
501        let mut stmt = conn.prepare(
502            "SELECT s.id, s.name, s.description, s.created
503             FROM spaces s
504             JOIN collections c ON c.space_id = s.id
505             WHERE c.name = ?1
506             ORDER BY s.name ASC",
507        )?;
508        let rows = stmt.query_map(params![collection], decode_space_row)?;
509        let matches = rows.collect::<std::result::Result<Vec<_>, _>>()?;
510
511        if matches.is_empty() {
512            return Ok(SpaceResolution::NotFound);
513        }
514
515        if matches.len() == 1 {
516            return Ok(SpaceResolution::Found(matches[0].clone()));
517        }
518
519        let spaces = matches.into_iter().map(|space| space.name).collect();
520        Ok(SpaceResolution::Ambiguous(spaces))
521    }
522
523    pub fn delete_space(&self, name: &str) -> Result<()> {
524        if name == DEFAULT_SPACE_NAME {
525            let conn = self
526                .db
527                .lock()
528                .map_err(|_| CoreError::poisoned("database"))?;
529            conn.execute(
530                "DELETE FROM collections
531                 WHERE space_id = (SELECT id FROM spaces WHERE name = ?1)",
532                params![name],
533            )?;
534            drop(conn);
535
536            self.unload_space(name)?;
537            self.remove_space_artifacts(name)?;
538            self.open_space(name)?;
539            return Ok(());
540        }
541
542        let conn = self
543            .db
544            .lock()
545            .map_err(|_| CoreError::poisoned("database"))?;
546        let deleted = conn.execute("DELETE FROM spaces WHERE name = ?1", params![name])?;
547        drop(conn);
548
549        if deleted == 0 {
550            return Err(KboltError::SpaceNotFound {
551                name: name.to_string(),
552            }
553            .into());
554        }
555
556        self.unload_space(name)?;
557        self.remove_space_artifacts(name)?;
558
559        Ok(())
560    }
561
562    pub fn rename_space(&self, old: &str, new: &str) -> Result<()> {
563        if old == DEFAULT_SPACE_NAME {
564            return Err(
565                KboltError::Config("cannot rename reserved space: default".to_string()).into(),
566            );
567        }
568
569        let conn = self
570            .db
571            .lock()
572            .map_err(|_| CoreError::poisoned("database"))?;
573
574        let result = conn.execute(
575            "UPDATE spaces SET name = ?1 WHERE name = ?2",
576            params![new, old],
577        );
578        drop(conn);
579
580        match result {
581            Ok(0) => Err(KboltError::SpaceNotFound {
582                name: old.to_string(),
583            }
584            .into()),
585            Ok(_) => {
586                if let Err(rename_err) = self.rename_space_artifacts(old, new) {
587                    let rollback = self
588                        .db
589                        .lock()
590                        .map_err(|_| CoreError::poisoned("database"))?
591                        .execute(
592                            "UPDATE spaces SET name = ?1 WHERE name = ?2",
593                            params![old, new],
594                        );
595
596                    if let Err(rollback_err) = rollback {
597                        return Err(CoreError::Internal(format!(
598                            "failed to rename space artifacts from '{old}' to '{new}': {rename_err}; rollback failed: {rollback_err}"
599                        )));
600                    }
601
602                    return Err(rename_err);
603                }
604
605                self.unload_space(old)?;
606                if let Err(open_err) = self.open_space(new) {
607                    let _ = self.rename_space_artifacts(new, old);
608                    let _ = self
609                        .db
610                        .lock()
611                        .map_err(|_| CoreError::poisoned("database"))?
612                        .execute(
613                            "UPDATE spaces SET name = ?1 WHERE name = ?2",
614                            params![old, new],
615                        );
616                    let _ = self.open_space(old);
617                    return Err(open_err);
618                }
619
620                Ok(())
621            }
622            Err(Error::SqliteFailure(sqlite_err, _))
623                if sqlite_err.code == ErrorCode::ConstraintViolation =>
624            {
625                Err(KboltError::SpaceAlreadyExists {
626                    name: new.to_string(),
627                }
628                .into())
629            }
630            Err(err) => Err(err.into()),
631        }
632    }
633
634    pub fn update_space_description(&self, name: &str, description: &str) -> Result<()> {
635        let conn = self
636            .db
637            .lock()
638            .map_err(|_| CoreError::poisoned("database"))?;
639
640        let updated = conn.execute(
641            "UPDATE spaces SET description = ?1 WHERE name = ?2",
642            params![description, name],
643        )?;
644
645        if updated == 0 {
646            return Err(KboltError::SpaceNotFound {
647                name: name.to_string(),
648            }
649            .into());
650        }
651
652        Ok(())
653    }
654
655    pub fn create_collection(
656        &self,
657        space_id: i64,
658        name: &str,
659        path: &Path,
660        description: Option<&str>,
661        extensions: Option<&[String]>,
662    ) -> Result<i64> {
663        let conn = self
664            .db
665            .lock()
666            .map_err(|_| CoreError::poisoned("database"))?;
667
668        let space_name = lookup_space_name(&conn, space_id)?;
669        let extensions_json = serialize_extensions(extensions)?;
670        let result = conn.execute(
671            "INSERT INTO collections (space_id, name, path, description, extensions, created, updated)
672             VALUES (?1, ?2, ?3, ?4, ?5, strftime('%Y-%m-%dT%H:%M:%SZ','now'), strftime('%Y-%m-%dT%H:%M:%SZ','now'))",
673            params![space_id, name, path.to_string_lossy(), description, extensions_json],
674        );
675
676        match result {
677            Ok(_) => Ok(conn.last_insert_rowid()),
678            Err(Error::SqliteFailure(sqlite_err, _))
679                if sqlite_err.code == ErrorCode::ConstraintViolation =>
680            {
681                Err(KboltError::CollectionAlreadyExists {
682                    name: name.to_string(),
683                    space: space_name,
684                }
685                .into())
686            }
687            Err(err) => Err(err.into()),
688        }
689    }
690
691    pub fn get_collection(&self, space_id: i64, name: &str) -> Result<CollectionRow> {
692        let conn = self
693            .db
694            .lock()
695            .map_err(|_| CoreError::poisoned("database"))?;
696        let mut stmt = conn.prepare(
697            "SELECT id, space_id, name, path, description, extensions, created, updated
698             FROM collections
699             WHERE space_id = ?1 AND name = ?2",
700        )?;
701
702        let result = stmt.query_row(params![space_id, name], decode_collection_row);
703        match result {
704            Ok(row) => Ok(row),
705            Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
706                name: name.to_string(),
707            }
708            .into()),
709            Err(err) => Err(err.into()),
710        }
711    }
712
713    pub fn get_collection_by_id(&self, id: i64) -> Result<CollectionRow> {
714        let conn = self
715            .db
716            .lock()
717            .map_err(|_| CoreError::poisoned("database"))?;
718        let mut stmt = conn.prepare(
719            "SELECT id, space_id, name, path, description, extensions, created, updated
720             FROM collections
721             WHERE id = ?1",
722        )?;
723
724        let result = stmt.query_row(params![id], decode_collection_row);
725        match result {
726            Ok(row) => Ok(row),
727            Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
728                name: format!("id={id}"),
729            }
730            .into()),
731            Err(err) => Err(err.into()),
732        }
733    }
734
735    pub fn list_collections(&self, space_id: Option<i64>) -> Result<Vec<CollectionRow>> {
736        let conn = self
737            .db
738            .lock()
739            .map_err(|_| CoreError::poisoned("database"))?;
740
741        let (sql, params): (&str, Vec<i64>) = match space_id {
742            Some(id) => (
743                "SELECT id, space_id, name, path, description, extensions, created, updated
744                 FROM collections
745                 WHERE space_id = ?1
746                 ORDER BY name ASC",
747                vec![id],
748            ),
749            None => (
750                "SELECT id, space_id, name, path, description, extensions, created, updated
751                 FROM collections
752                 ORDER BY space_id ASC, name ASC",
753                Vec::new(),
754            ),
755        };
756
757        let mut stmt = conn.prepare(sql)?;
758        let rows = if params.is_empty() {
759            stmt.query_map([], decode_collection_row)?
760        } else {
761            stmt.query_map(params![params[0]], decode_collection_row)?
762        };
763        let collections = rows.collect::<std::result::Result<Vec<_>, _>>()?;
764        Ok(collections)
765    }
766
767    pub fn count_collections_in_space(&self, space_id: i64) -> Result<usize> {
768        let conn = self
769            .db
770            .lock()
771            .map_err(|_| CoreError::poisoned("database"))?;
772        let _space_name = lookup_space_name(&conn, space_id)?;
773
774        query_count(
775            &conn,
776            "SELECT COUNT(*) FROM collections WHERE space_id = ?1",
777            params![space_id],
778        )
779    }
780
781    pub fn delete_collection(&self, space_id: i64, name: &str) -> Result<()> {
782        let conn = self
783            .db
784            .lock()
785            .map_err(|_| CoreError::poisoned("database"))?;
786        let _space_name = lookup_space_name(&conn, space_id)?;
787
788        let deleted = conn.execute(
789            "DELETE FROM collections WHERE space_id = ?1 AND name = ?2",
790            params![space_id, name],
791        )?;
792
793        if deleted == 0 {
794            return Err(KboltError::CollectionNotFound {
795                name: name.to_string(),
796            }
797            .into());
798        }
799
800        Ok(())
801    }
802
803    pub fn rename_collection(&self, space_id: i64, old: &str, new: &str) -> Result<()> {
804        let conn = self
805            .db
806            .lock()
807            .map_err(|_| CoreError::poisoned("database"))?;
808        let space_name = lookup_space_name(&conn, space_id)?;
809        let result = conn.execute(
810            "UPDATE collections
811             SET name = ?1, updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
812             WHERE space_id = ?2 AND name = ?3",
813            params![new, space_id, old],
814        );
815
816        match result {
817            Ok(0) => Err(KboltError::CollectionNotFound {
818                name: old.to_string(),
819            }
820            .into()),
821            Ok(_) => Ok(()),
822            Err(Error::SqliteFailure(sqlite_err, _))
823                if sqlite_err.code == ErrorCode::ConstraintViolation =>
824            {
825                Err(KboltError::CollectionAlreadyExists {
826                    name: new.to_string(),
827                    space: space_name,
828                }
829                .into())
830            }
831            Err(err) => Err(err.into()),
832        }
833    }
834
835    pub fn update_collection_description(
836        &self,
837        space_id: i64,
838        name: &str,
839        desc: &str,
840    ) -> Result<()> {
841        let conn = self
842            .db
843            .lock()
844            .map_err(|_| CoreError::poisoned("database"))?;
845        let _space_name = lookup_space_name(&conn, space_id)?;
846
847        let updated = conn.execute(
848            "UPDATE collections
849             SET description = ?1, updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
850             WHERE space_id = ?2 AND name = ?3",
851            params![desc, space_id, name],
852        )?;
853
854        if updated == 0 {
855            return Err(KboltError::CollectionNotFound {
856                name: name.to_string(),
857            }
858            .into());
859        }
860
861        Ok(())
862    }
863
864    pub fn update_collection_timestamp(&self, collection_id: i64) -> Result<()> {
865        let conn = self
866            .db
867            .lock()
868            .map_err(|_| CoreError::poisoned("database"))?;
869
870        let updated = conn.execute(
871            "UPDATE collections
872             SET updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
873             WHERE id = ?1",
874            params![collection_id],
875        )?;
876
877        if updated == 0 {
878            return Err(KboltError::CollectionNotFound {
879                name: format!("id={collection_id}"),
880            }
881            .into());
882        }
883
884        Ok(())
885    }
886
887    pub fn upsert_document(
888        &self,
889        collection_id: i64,
890        path: &str,
891        title: Option<&str>,
892        hash: &str,
893        modified: &str,
894    ) -> Result<i64> {
895        let conn = self
896            .db
897            .lock()
898            .map_err(|_| CoreError::poisoned("database"))?;
899        let _collection_name = lookup_collection_name(&conn, collection_id)?;
900
901        let id = conn.query_row(
902            "INSERT INTO documents (collection_id, path, title, hash, modified, active, deactivated_at, fts_dirty)
903             VALUES (?1, ?2, ?3, ?4, ?5, 1, NULL, 1)
904             ON CONFLICT(collection_id, path) DO UPDATE SET
905                 title = excluded.title,
906                 hash = excluded.hash,
907                 modified = excluded.modified,
908                 active = 1,
909                 deactivated_at = NULL,
910                 fts_dirty = 1
911             RETURNING id",
912            params![
913                collection_id,
914                path,
915                normalized_title(title).unwrap_or(""),
916                hash,
917                modified
918            ],
919            |row| row.get(0),
920        )?;
921        Ok(id)
922    }
923
924    pub fn get_document_by_path(
925        &self,
926        collection_id: i64,
927        path: &str,
928    ) -> Result<Option<DocumentRow>> {
929        let conn = self
930            .db
931            .lock()
932            .map_err(|_| CoreError::poisoned("database"))?;
933        let _collection_name = lookup_collection_name(&conn, collection_id)?;
934        let mut stmt = conn.prepare(
935            "SELECT id, collection_id, path, title, hash, modified, active, deactivated_at, fts_dirty
936             FROM documents
937             WHERE collection_id = ?1 AND path = ?2",
938        )?;
939
940        let result = stmt.query_row(params![collection_id, path], decode_document_row);
941        match result {
942            Ok(row) => Ok(Some(row)),
943            Err(Error::QueryReturnedNoRows) => Ok(None),
944            Err(err) => Err(err.into()),
945        }
946    }
947
948    pub fn get_document_by_id(&self, id: i64) -> Result<DocumentRow> {
949        let conn = self
950            .db
951            .lock()
952            .map_err(|_| CoreError::poisoned("database"))?;
953        let mut stmt = conn.prepare(
954            "SELECT id, collection_id, path, title, hash, modified, active, deactivated_at, fts_dirty
955             FROM documents
956             WHERE id = ?1",
957        )?;
958
959        let result = stmt.query_row(params![id], decode_document_row);
960        match result {
961            Ok(row) => Ok(row),
962            Err(Error::QueryReturnedNoRows) => Err(KboltError::DocumentNotFound {
963                path: format!("id={id}"),
964            }
965            .into()),
966            Err(err) => Err(err.into()),
967        }
968    }
969
970    pub fn get_documents_by_ids(&self, ids: &[i64]) -> Result<Vec<DocumentRow>> {
971        if ids.is_empty() {
972            return Ok(Vec::new());
973        }
974
975        let conn = self
976            .db
977            .lock()
978            .map_err(|_| CoreError::poisoned("database"))?;
979
980        let placeholders = vec!["?"; ids.len()].join(", ");
981        let sql = format!(
982            "SELECT id, collection_id, path, title, hash, modified, active, deactivated_at, fts_dirty
983             FROM documents
984             WHERE id IN ({placeholders})"
985        );
986        let mut stmt = conn.prepare(&sql)?;
987        let rows = stmt.query_map(params_from_iter(ids.iter()), decode_document_row)?;
988        let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
989        Ok(docs)
990    }
991
992    pub fn refresh_document_activity(&self, doc_id: i64, modified: &str) -> Result<()> {
993        let conn = self
994            .db
995            .lock()
996            .map_err(|_| CoreError::poisoned("database"))?;
997
998        let updated = conn.execute(
999            "UPDATE documents
1000             SET modified = ?1,
1001                 active = 1,
1002                 deactivated_at = NULL
1003             WHERE id = ?2",
1004            params![modified, doc_id],
1005        )?;
1006
1007        if updated == 0 {
1008            return Err(KboltError::DocumentNotFound {
1009                path: format!("id={doc_id}"),
1010            }
1011            .into());
1012        }
1013
1014        Ok(())
1015    }
1016
1017    pub fn list_documents(
1018        &self,
1019        collection_id: i64,
1020        active_only: bool,
1021    ) -> Result<Vec<DocumentRow>> {
1022        let conn = self
1023            .db
1024            .lock()
1025            .map_err(|_| CoreError::poisoned("database"))?;
1026        let _collection_name = lookup_collection_name(&conn, collection_id)?;
1027        let active_only = i64::from(active_only);
1028        let mut stmt = conn.prepare(
1029            "SELECT id, collection_id, path, title, hash, modified, active, deactivated_at, fts_dirty
1030             FROM documents
1031             WHERE collection_id = ?1
1032               AND (?2 = 0 OR active = 1)
1033             ORDER BY path ASC",
1034        )?;
1035        let rows = stmt.query_map(params![collection_id, active_only], decode_document_row)?;
1036        let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1037        Ok(docs)
1038    }
1039
1040    pub fn list_collection_file_rows(
1041        &self,
1042        collection_id: i64,
1043        active_only: bool,
1044    ) -> Result<Vec<FileListRow>> {
1045        let conn = self
1046            .db
1047            .lock()
1048            .map_err(|_| CoreError::poisoned("database"))?;
1049        let _collection_name = lookup_collection_name(&conn, collection_id)?;
1050        let active_only = i64::from(active_only);
1051        let mut stmt = conn.prepare(
1052            "SELECT d.id, d.path, d.title, d.hash, d.active,
1053                    COUNT(DISTINCT c.id) AS chunk_count,
1054                    COUNT(DISTINCT e.chunk_id) AS embedded_chunk_count
1055             FROM documents d
1056             LEFT JOIN chunks c ON c.doc_id = d.id
1057             LEFT JOIN embeddings e ON e.chunk_id = c.id
1058             WHERE d.collection_id = ?1
1059               AND (?2 = 0 OR d.active = 1)
1060             GROUP BY d.id, d.path, d.title, d.hash, d.active
1061             ORDER BY d.path ASC",
1062        )?;
1063        let rows = stmt.query_map(params![collection_id, active_only], |row| {
1064            let chunk_count: i64 = row.get(5)?;
1065            let embedded_chunk_count: i64 = row.get(6)?;
1066            Ok(FileListRow {
1067                doc_id: row.get(0)?,
1068                path: row.get(1)?,
1069                title: decode_optional_title(row.get::<_, String>(2)?),
1070                hash: row.get(3)?,
1071                active: row.get::<_, i64>(4)? != 0,
1072                chunk_count: chunk_count as usize,
1073                embedded_chunk_count: embedded_chunk_count as usize,
1074            })
1075        })?;
1076        let files = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1077        Ok(files)
1078    }
1079
1080    pub fn get_document_by_hash_prefix(&self, prefix: &str) -> Result<Vec<DocumentRow>> {
1081        let conn = self
1082            .db
1083            .lock()
1084            .map_err(|_| CoreError::poisoned("database"))?;
1085
1086        let pattern = format!("{prefix}%");
1087        let mut stmt = conn.prepare(
1088            "SELECT id, collection_id, path, title, hash, modified, active, deactivated_at, fts_dirty
1089             FROM documents
1090             WHERE hash LIKE ?1
1091             ORDER BY id ASC",
1092        )?;
1093        let rows = stmt.query_map(params![pattern], decode_document_row)?;
1094        let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1095        Ok(docs)
1096    }
1097
1098    pub fn document_hashes_by_prefix_in_space(
1099        &self,
1100        prefix: &str,
1101        space: &str,
1102    ) -> Result<Vec<String>> {
1103        let conn = self
1104            .db
1105            .lock()
1106            .map_err(|_| CoreError::poisoned("database"))?;
1107
1108        let pattern = format!("{prefix}%");
1109        let mut stmt = conn.prepare(
1110            "SELECT d.hash
1111             FROM documents d
1112             JOIN collections c ON d.collection_id = c.id
1113             JOIN spaces s ON c.space_id = s.id
1114             WHERE s.name = ?1 AND d.hash LIKE ?2
1115             ORDER BY d.id ASC",
1116        )?;
1117        let rows = stmt.query_map(params![space, pattern], |row| row.get(0))?;
1118        let hashes = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1119        Ok(hashes)
1120    }
1121
1122    pub fn deactivate_document(&self, doc_id: i64) -> Result<()> {
1123        let conn = self
1124            .db
1125            .lock()
1126            .map_err(|_| CoreError::poisoned("database"))?;
1127
1128        let updated = conn.execute(
1129            "UPDATE documents
1130             SET active = 0,
1131                 deactivated_at = CASE
1132                    WHEN active = 1 THEN strftime('%Y-%m-%dT%H:%M:%SZ','now')
1133                    ELSE deactivated_at
1134                 END
1135             WHERE id = ?1",
1136            params![doc_id],
1137        )?;
1138
1139        if updated == 0 {
1140            return Err(KboltError::DocumentNotFound {
1141                path: format!("id={doc_id}"),
1142            }
1143            .into());
1144        }
1145
1146        Ok(())
1147    }
1148
1149    pub fn reactivate_document(&self, doc_id: i64) -> Result<()> {
1150        let conn = self
1151            .db
1152            .lock()
1153            .map_err(|_| CoreError::poisoned("database"))?;
1154
1155        let updated = conn.execute(
1156            "UPDATE documents
1157             SET active = 1, deactivated_at = NULL
1158             WHERE id = ?1",
1159            params![doc_id],
1160        )?;
1161
1162        if updated == 0 {
1163            return Err(KboltError::DocumentNotFound {
1164                path: format!("id={doc_id}"),
1165            }
1166            .into());
1167        }
1168
1169        Ok(())
1170    }
1171
1172    pub fn reap_documents(&self, older_than_days: u32) -> Result<Vec<i64>> {
1173        let reaped = self.list_reapable_documents(older_than_days)?;
1174        let doc_ids = reaped.iter().map(|item| item.doc_id).collect::<Vec<_>>();
1175        self.delete_documents(&doc_ids)?;
1176        Ok(doc_ids)
1177    }
1178
1179    pub fn list_reapable_documents(&self, older_than_days: u32) -> Result<Vec<ReapableDocument>> {
1180        self.list_reapable_documents_filtered(older_than_days, "", Vec::new())
1181    }
1182
1183    pub fn list_reapable_documents_in_space(
1184        &self,
1185        older_than_days: u32,
1186        space_id: i64,
1187    ) -> Result<Vec<ReapableDocument>> {
1188        self.list_reapable_documents_filtered(
1189            older_than_days,
1190            " AND c.space_id = ?",
1191            vec![SqlValue::Integer(space_id)],
1192        )
1193    }
1194
1195    pub fn list_reapable_documents_in_collections(
1196        &self,
1197        older_than_days: u32,
1198        collection_ids: &[i64],
1199    ) -> Result<Vec<ReapableDocument>> {
1200        if collection_ids.is_empty() {
1201            return Ok(Vec::new());
1202        }
1203
1204        let placeholders = vec!["?"; collection_ids.len()].join(", ");
1205        let clause = format!(" AND d.collection_id IN ({placeholders})");
1206        let params = collection_ids
1207            .iter()
1208            .map(|id| SqlValue::Integer(*id))
1209            .collect::<Vec<_>>();
1210        self.list_reapable_documents_filtered(older_than_days, &clause, params)
1211    }
1212
1213    fn list_reapable_documents_filtered(
1214        &self,
1215        older_than_days: u32,
1216        scope_clause: &str,
1217        scope_params: Vec<SqlValue>,
1218    ) -> Result<Vec<ReapableDocument>> {
1219        let conn = self
1220            .db
1221            .lock()
1222            .map_err(|_| CoreError::poisoned("database"))?;
1223
1224        let modifier = format!("-{} days", older_than_days);
1225        let sql = format!(
1226            "SELECT d.id, s.name
1227             FROM documents d
1228             JOIN collections c ON c.id = d.collection_id
1229             JOIN spaces s ON s.id = c.space_id
1230             WHERE d.active = 0
1231               AND d.deactivated_at IS NOT NULL
1232               AND d.deactivated_at <= strftime('%Y-%m-%dT%H:%M:%SZ', 'now', ?){scope_clause}
1233             ORDER BY d.id ASC"
1234        );
1235        let mut stmt = conn.prepare(&sql)?;
1236        let mut params = Vec::with_capacity(scope_params.len() + 1);
1237        params.push(SqlValue::Text(modifier));
1238        params.extend(scope_params);
1239        let rows = stmt.query_map(params_from_iter(params.iter()), |row| {
1240            Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1241        })?;
1242        let headers = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1243        drop(stmt);
1244
1245        let mut documents = Vec::with_capacity(headers.len());
1246        for (doc_id, space_name) in headers {
1247            documents.push(ReapableDocument {
1248                doc_id,
1249                space_name,
1250                chunk_ids: load_chunk_ids_for_doc(&conn, doc_id)?,
1251            });
1252        }
1253
1254        Ok(documents)
1255    }
1256
1257    pub fn delete_documents(&self, doc_ids: &[i64]) -> Result<()> {
1258        if doc_ids.is_empty() {
1259            return Ok(());
1260        }
1261
1262        let conn = self
1263            .db
1264            .lock()
1265            .map_err(|_| CoreError::poisoned("database"))?;
1266
1267        let placeholders = vec!["?"; doc_ids.len()].join(", ");
1268        let sql = format!("DELETE FROM documents WHERE id IN ({placeholders})");
1269        conn.execute(&sql, params_from_iter(doc_ids.iter()))?;
1270        Ok(())
1271    }
1272
1273    pub fn insert_chunks(&self, doc_id: i64, chunks: &[ChunkInsert]) -> Result<Vec<i64>> {
1274        if chunks.is_empty() {
1275            return Ok(Vec::new());
1276        }
1277
1278        let conn = self
1279            .db
1280            .lock()
1281            .map_err(|_| CoreError::poisoned("database"))?;
1282        let _doc = lookup_document_id(&conn, doc_id)?;
1283
1284        let tx = conn.unchecked_transaction()?;
1285        let mut stmt = tx.prepare(
1286            "INSERT INTO chunks (doc_id, seq, offset, length, heading, kind, retrieval_prefix)
1287             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1288        )?;
1289
1290        let mut ids = Vec::with_capacity(chunks.len());
1291        for chunk in chunks {
1292            stmt.execute(params![
1293                doc_id,
1294                chunk.seq,
1295                chunk.offset as i64,
1296                chunk.length as i64,
1297                chunk.heading,
1298                chunk.kind.as_storage_kind(),
1299                chunk.retrieval_prefix.as_deref(),
1300            ])?;
1301            ids.push(tx.last_insert_rowid());
1302        }
1303        drop(stmt);
1304        tx.commit()?;
1305        Ok(ids)
1306    }
1307
1308    pub fn delete_chunks_for_document(&self, doc_id: i64) -> Result<Vec<i64>> {
1309        let conn = self
1310            .db
1311            .lock()
1312            .map_err(|_| CoreError::poisoned("database"))?;
1313        let _doc = lookup_document_id(&conn, doc_id)?;
1314
1315        let mut stmt = conn.prepare("SELECT id FROM chunks WHERE doc_id = ?1 ORDER BY seq ASC")?;
1316        let rows = stmt.query_map(params![doc_id], |row| row.get::<_, i64>(0))?;
1317        let chunk_ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1318
1319        conn.execute("DELETE FROM chunks WHERE doc_id = ?1", params![doc_id])?;
1320        Ok(chunk_ids)
1321    }
1322
1323    pub fn get_chunks_for_document(&self, doc_id: i64) -> Result<Vec<ChunkRow>> {
1324        let conn = self
1325            .db
1326            .lock()
1327            .map_err(|_| CoreError::poisoned("database"))?;
1328        let _doc = lookup_document_id(&conn, doc_id)?;
1329
1330        let mut stmt = conn.prepare(
1331            "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1332             FROM chunks
1333             WHERE doc_id = ?1
1334             ORDER BY seq ASC",
1335        )?;
1336        let rows = stmt.query_map(params![doc_id], decode_chunk_row)?;
1337        let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1338        Ok(chunks)
1339    }
1340
1341    pub fn get_chunks_for_documents(&self, doc_ids: &[i64]) -> Result<HashMap<i64, Vec<ChunkRow>>> {
1342        if doc_ids.is_empty() {
1343            return Ok(HashMap::new());
1344        }
1345
1346        let mut requested = doc_ids.to_vec();
1347        requested.sort_unstable();
1348        requested.dedup();
1349
1350        let conn = self
1351            .db
1352            .lock()
1353            .map_err(|_| CoreError::poisoned("database"))?;
1354
1355        let placeholders = vec!["?"; requested.len()].join(", ");
1356        let sql = format!(
1357            "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1358             FROM chunks
1359             WHERE doc_id IN ({placeholders})
1360             ORDER BY doc_id ASC, seq ASC"
1361        );
1362        let mut stmt = conn.prepare(&sql)?;
1363        let rows = stmt.query_map(params_from_iter(requested.iter()), decode_chunk_row)?;
1364        let mut chunks_by_doc: HashMap<i64, Vec<ChunkRow>> = HashMap::new();
1365        for doc_id in requested {
1366            chunks_by_doc.insert(doc_id, Vec::new());
1367        }
1368        for row in rows {
1369            let chunk = row?;
1370            chunks_by_doc.entry(chunk.doc_id).or_default().push(chunk);
1371        }
1372
1373        Ok(chunks_by_doc)
1374    }
1375
1376    pub fn get_chunks_for_document_seq_ranges(
1377        &self,
1378        ranges: &[(i64, i32, i32)],
1379    ) -> Result<HashMap<i64, Vec<ChunkRow>>> {
1380        if ranges.is_empty() {
1381            return Ok(HashMap::new());
1382        }
1383
1384        let mut ranges_by_doc: HashMap<i64, Vec<(i32, i32)>> = HashMap::new();
1385        for (doc_id, min_seq, max_seq) in ranges {
1386            if min_seq > max_seq {
1387                return Err(CoreError::Internal(format!(
1388                    "chunk seq range min must be <= max for doc {doc_id}: {min_seq} > {max_seq}"
1389                )));
1390            }
1391
1392            ranges_by_doc
1393                .entry(*doc_id)
1394                .or_default()
1395                .push((*min_seq, *max_seq));
1396        }
1397
1398        for doc_ranges in ranges_by_doc.values_mut() {
1399            doc_ranges.sort_unstable();
1400            let mut merged: Vec<(i32, i32)> = Vec::with_capacity(doc_ranges.len());
1401            for (min_seq, max_seq) in doc_ranges.drain(..) {
1402                if let Some((_, merged_max)) = merged.last_mut() {
1403                    if min_seq <= merged_max.saturating_add(1) {
1404                        *merged_max = (*merged_max).max(max_seq);
1405                        continue;
1406                    }
1407                }
1408                merged.push((min_seq, max_seq));
1409            }
1410            *doc_ranges = merged;
1411        }
1412
1413        let conn = self
1414            .db
1415            .lock()
1416            .map_err(|_| CoreError::poisoned("database"))?;
1417        let mut stmt = conn.prepare(
1418            "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1419             FROM chunks
1420             WHERE doc_id = ?1 AND seq BETWEEN ?2 AND ?3
1421             ORDER BY seq ASC",
1422        )?;
1423
1424        let mut chunks_by_doc: HashMap<i64, Vec<ChunkRow>> = HashMap::new();
1425        for (doc_id, doc_ranges) in ranges_by_doc {
1426            chunks_by_doc.entry(doc_id).or_default();
1427            for (min_seq, max_seq) in doc_ranges {
1428                let rows = stmt.query_map(params![doc_id, min_seq, max_seq], decode_chunk_row)?;
1429                for row in rows {
1430                    chunks_by_doc.entry(doc_id).or_default().push(row?);
1431                }
1432            }
1433        }
1434
1435        Ok(chunks_by_doc)
1436    }
1437
1438    pub fn get_chunks(&self, chunk_ids: &[i64]) -> Result<Vec<ChunkRow>> {
1439        if chunk_ids.is_empty() {
1440            return Ok(Vec::new());
1441        }
1442
1443        let conn = self
1444            .db
1445            .lock()
1446            .map_err(|_| CoreError::poisoned("database"))?;
1447
1448        let placeholders = vec!["?"; chunk_ids.len()].join(", ");
1449        let sql = format!(
1450            "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1451             FROM chunks
1452             WHERE id IN ({placeholders})
1453             ORDER BY id ASC"
1454        );
1455        let mut stmt = conn.prepare(&sql)?;
1456        let rows = stmt.query_map(params_from_iter(chunk_ids.iter()), decode_chunk_row)?;
1457        let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1458        Ok(chunks)
1459    }
1460
1461    pub fn get_chunks_with_documents(
1462        &self,
1463        chunk_ids: &[i64],
1464    ) -> Result<Vec<(ChunkRow, DocumentRow)>> {
1465        if chunk_ids.is_empty() {
1466            return Ok(Vec::new());
1467        }
1468
1469        let conn = self
1470            .db
1471            .lock()
1472            .map_err(|_| CoreError::poisoned("database"))?;
1473
1474        let placeholders = vec!["?"; chunk_ids.len()].join(", ");
1475        let sql = format!(
1476            "SELECT c.id, c.doc_id, c.seq, c.offset, c.length, c.heading, c.kind, c.retrieval_prefix,
1477                    d.id, d.collection_id, d.path, d.title, d.hash, d.modified, d.active, d.deactivated_at, d.fts_dirty
1478             FROM chunks c
1479             JOIN documents d ON d.id = c.doc_id
1480             WHERE c.id IN ({placeholders})
1481             ORDER BY c.id ASC"
1482        );
1483        let mut stmt = conn.prepare(&sql)?;
1484        let rows = stmt.query_map(params_from_iter(chunk_ids.iter()), |row| {
1485            Ok((
1486                decode_chunk_row_at(row, 0)?,
1487                decode_document_row_at(row, 8)?,
1488            ))
1489        })?;
1490        let rows = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1491        Ok(rows)
1492    }
1493
1494    pub fn get_active_search_scope_summary_in_collections(
1495        &self,
1496        collection_ids: &[i64],
1497    ) -> Result<SearchScopeSummary> {
1498        if collection_ids.is_empty() {
1499            return Ok(SearchScopeSummary {
1500                document_ids: Vec::new(),
1501                chunk_count: 0,
1502            });
1503        }
1504
1505        let conn = self
1506            .db
1507            .lock()
1508            .map_err(|_| CoreError::poisoned("database"))?;
1509
1510        let placeholders = vec!["?"; collection_ids.len()].join(", ");
1511        let sql = format!(
1512            "SELECT d.id, COUNT(c.id)
1513             FROM chunks c
1514             JOIN documents d ON d.id = c.doc_id
1515             WHERE d.active = 1
1516               AND d.collection_id IN ({placeholders})
1517             GROUP BY d.id
1518             ORDER BY d.id ASC"
1519        );
1520        let mut stmt = conn.prepare(&sql)?;
1521        let rows = stmt.query_map(params_from_iter(collection_ids.iter()), |row| {
1522            Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
1523        })?;
1524
1525        let mut document_ids = Vec::new();
1526        let mut chunk_count = 0usize;
1527        for row in rows {
1528            let (doc_id, doc_chunk_count) = row?;
1529            let doc_chunk_count = usize::try_from(doc_chunk_count).map_err(|_| {
1530                CoreError::Internal(format!(
1531                    "active chunk count must be non-negative for document {doc_id}"
1532                ))
1533            })?;
1534            document_ids.push(doc_id);
1535            chunk_count = chunk_count.saturating_add(doc_chunk_count);
1536        }
1537
1538        Ok(SearchScopeSummary {
1539            document_ids,
1540            chunk_count,
1541        })
1542    }
1543
1544    pub fn count_active_chunks_in_collections(&self, collection_ids: &[i64]) -> Result<usize> {
1545        if collection_ids.is_empty() {
1546            return Ok(0);
1547        }
1548
1549        let conn = self
1550            .db
1551            .lock()
1552            .map_err(|_| CoreError::poisoned("database"))?;
1553
1554        let placeholders = vec!["?"; collection_ids.len()].join(", ");
1555        let sql = format!(
1556            "SELECT COUNT(*)
1557             FROM chunks c
1558             JOIN documents d ON d.id = c.doc_id
1559             WHERE d.active = 1
1560               AND d.collection_id IN ({placeholders})"
1561        );
1562        query_count(&conn, &sql, params_from_iter(collection_ids.iter()))
1563    }
1564
1565    pub fn has_inactive_documents_in_collections(&self, collection_ids: &[i64]) -> Result<bool> {
1566        if collection_ids.is_empty() {
1567            return Ok(false);
1568        }
1569
1570        let conn = self
1571            .db
1572            .lock()
1573            .map_err(|_| CoreError::poisoned("database"))?;
1574
1575        let placeholders = vec!["?"; collection_ids.len()].join(", ");
1576        let sql = format!(
1577            "SELECT EXISTS(
1578                 SELECT 1
1579                 FROM documents
1580                 WHERE active = 0
1581                   AND collection_id IN ({placeholders})
1582             )"
1583        );
1584        let exists: i64 = conn.query_row(&sql, params_from_iter(collection_ids.iter()), |row| {
1585            row.get(0)
1586        })?;
1587        Ok(exists != 0)
1588    }
1589
1590    pub fn get_active_chunk_ids_in_collections(&self, collection_ids: &[i64]) -> Result<Vec<i64>> {
1591        if collection_ids.is_empty() {
1592            return Ok(Vec::new());
1593        }
1594
1595        let conn = self
1596            .db
1597            .lock()
1598            .map_err(|_| CoreError::poisoned("database"))?;
1599
1600        let placeholders = vec!["?"; collection_ids.len()].join(", ");
1601        let sql = format!(
1602            "SELECT c.id
1603             FROM chunks c
1604             JOIN documents d ON d.id = c.doc_id
1605             WHERE d.active = 1
1606               AND d.collection_id IN ({placeholders})
1607             ORDER BY d.id ASC, c.seq ASC"
1608        );
1609        let mut stmt = conn.prepare(&sql)?;
1610        let rows = stmt.query_map(params_from_iter(collection_ids.iter()), |row| row.get(0))?;
1611        let chunk_ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1612        Ok(chunk_ids)
1613    }
1614
1615    pub fn replace_document_generation(
1616        &self,
1617        replacement: DocumentGenerationReplace<'_>,
1618    ) -> Result<DocumentGenerationReplaceResult> {
1619        for chunk in replacement.chunks {
1620            validate_text_span(
1621                replacement.text,
1622                chunk.offset,
1623                chunk.length,
1624                &format!("chunk seq {}", chunk.seq),
1625            )?;
1626        }
1627
1628        let conn = self
1629            .db
1630            .lock()
1631            .map_err(|_| CoreError::poisoned("database"))?;
1632        let _collection_name = lookup_collection_name(&conn, replacement.collection_id)?;
1633
1634        let tx = conn.unchecked_transaction()?;
1635        let doc_id: i64 = tx.query_row(
1636            "INSERT INTO documents (collection_id, path, title, hash, modified, active, deactivated_at, fts_dirty)
1637             VALUES (?1, ?2, ?3, ?4, ?5, 1, NULL, 1)
1638             ON CONFLICT(collection_id, path) DO UPDATE SET
1639                 title = excluded.title,
1640                 hash = excluded.hash,
1641                 modified = excluded.modified,
1642                 active = 1,
1643                 deactivated_at = NULL,
1644                 fts_dirty = 1
1645             RETURNING id",
1646            params![
1647                replacement.collection_id,
1648                replacement.path,
1649                normalized_title(replacement.title).unwrap_or(""),
1650                replacement.hash,
1651                replacement.modified
1652            ],
1653            |row| row.get(0),
1654        )?;
1655
1656        let old_chunk_ids = {
1657            let mut stmt =
1658                tx.prepare("SELECT id FROM chunks WHERE doc_id = ?1 ORDER BY seq ASC")?;
1659            let rows = stmt.query_map(params![doc_id], |row| row.get::<_, i64>(0))?;
1660            rows.collect::<std::result::Result<Vec<_>, _>>()?
1661        };
1662
1663        tx.execute(
1664            "INSERT INTO document_texts (doc_id, extractor_key, source_hash, text_hash, generation_key, text, created)
1665             VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%Y-%m-%dT%H:%M:%SZ','now'))
1666             ON CONFLICT(doc_id) DO UPDATE SET
1667                 extractor_key = excluded.extractor_key,
1668                 source_hash = excluded.source_hash,
1669                 text_hash = excluded.text_hash,
1670                 generation_key = excluded.generation_key,
1671                 text = excluded.text,
1672                 created = excluded.created",
1673            params![
1674                doc_id,
1675                replacement.extractor_key,
1676                replacement.source_hash,
1677                replacement.text_hash,
1678                replacement.generation_key,
1679                replacement.text
1680            ],
1681        )?;
1682
1683        tx.execute("DELETE FROM chunks WHERE doc_id = ?1", params![doc_id])?;
1684
1685        let mut stmt = tx.prepare(
1686            "INSERT INTO chunks (doc_id, seq, offset, length, heading, kind, retrieval_prefix)
1687             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1688        )?;
1689        let mut chunk_ids = Vec::with_capacity(replacement.chunks.len());
1690        for chunk in replacement.chunks {
1691            stmt.execute(params![
1692                doc_id,
1693                chunk.seq,
1694                chunk.offset as i64,
1695                chunk.length as i64,
1696                chunk.heading,
1697                chunk.kind.as_storage_kind(),
1698                chunk.retrieval_prefix.as_deref(),
1699            ])?;
1700            chunk_ids.push(tx.last_insert_rowid());
1701        }
1702        drop(stmt);
1703
1704        tx.commit()?;
1705        Ok(DocumentGenerationReplaceResult {
1706            doc_id,
1707            old_chunk_ids,
1708            chunk_ids,
1709        })
1710    }
1711
1712    pub fn put_document_text(
1713        &self,
1714        doc_id: i64,
1715        extractor_key: &str,
1716        source_hash: &str,
1717        text_hash: &str,
1718        generation_key: &str,
1719        text: &str,
1720    ) -> Result<()> {
1721        let conn = self
1722            .db
1723            .lock()
1724            .map_err(|_| CoreError::poisoned("database"))?;
1725        let _doc = lookup_document_id(&conn, doc_id)?;
1726
1727        conn.execute(
1728            "INSERT INTO document_texts (doc_id, extractor_key, source_hash, text_hash, generation_key, text, created)
1729             VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%Y-%m-%dT%H:%M:%SZ','now'))
1730             ON CONFLICT(doc_id) DO UPDATE SET
1731                 extractor_key = excluded.extractor_key,
1732                 source_hash = excluded.source_hash,
1733                 text_hash = excluded.text_hash,
1734                 generation_key = excluded.generation_key,
1735                 text = excluded.text,
1736                 created = excluded.created",
1737            params![
1738                doc_id,
1739                extractor_key,
1740                source_hash,
1741                text_hash,
1742                generation_key,
1743                text
1744            ],
1745        )?;
1746        Ok(())
1747    }
1748
1749    pub fn get_document_text(&self, doc_id: i64) -> Result<DocumentTextRow> {
1750        let conn = self
1751            .db
1752            .lock()
1753            .map_err(|_| CoreError::poisoned("database"))?;
1754        let _doc = lookup_document_id(&conn, doc_id)?;
1755
1756        let result = conn.query_row(
1757            "SELECT doc_id, extractor_key, source_hash, text_hash, generation_key, text, created
1758             FROM document_texts
1759             WHERE doc_id = ?1",
1760            params![doc_id],
1761            decode_document_text_row,
1762        );
1763        match result {
1764            Ok(row) => Ok(row),
1765            Err(Error::QueryReturnedNoRows) => Err(missing_document_text_error(doc_id)),
1766            Err(err) => Err(err.into()),
1767        }
1768    }
1769
1770    pub fn get_document_texts(&self, doc_ids: &[i64]) -> Result<HashMap<i64, DocumentTextRow>> {
1771        if doc_ids.is_empty() {
1772            return Ok(HashMap::new());
1773        }
1774
1775        let mut requested = doc_ids.to_vec();
1776        requested.sort_unstable();
1777        requested.dedup();
1778
1779        let text_by_doc = self.get_existing_document_texts(&requested)?;
1780        for doc_id in requested {
1781            if !text_by_doc.contains_key(&doc_id) {
1782                return Err(missing_document_text_error(doc_id));
1783            }
1784        }
1785
1786        Ok(text_by_doc)
1787    }
1788
1789    pub fn get_existing_document_texts(
1790        &self,
1791        doc_ids: &[i64],
1792    ) -> Result<HashMap<i64, DocumentTextRow>> {
1793        if doc_ids.is_empty() {
1794            return Ok(HashMap::new());
1795        }
1796
1797        let mut requested = doc_ids.to_vec();
1798        requested.sort_unstable();
1799        requested.dedup();
1800
1801        let conn = self
1802            .db
1803            .lock()
1804            .map_err(|_| CoreError::poisoned("database"))?;
1805        let mut text_by_doc = HashMap::new();
1806        for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1807            let placeholders = vec!["?"; batch.len()].join(", ");
1808            let sql = format!(
1809                "SELECT doc_id, extractor_key, source_hash, text_hash, generation_key, text, created
1810                 FROM document_texts
1811                 WHERE doc_id IN ({placeholders})
1812                 ORDER BY doc_id ASC"
1813            );
1814            let mut stmt = conn.prepare(&sql)?;
1815            let rows = stmt.query_map(params_from_iter(batch.iter()), decode_document_text_row)?;
1816            for row in rows {
1817                let row = row?;
1818                text_by_doc.insert(row.doc_id, row);
1819            }
1820        }
1821
1822        Ok(text_by_doc)
1823    }
1824
1825    pub fn has_document_text(&self, doc_id: i64) -> Result<bool> {
1826        let conn = self
1827            .db
1828            .lock()
1829            .map_err(|_| CoreError::poisoned("database"))?;
1830        let exists: i64 = conn.query_row(
1831            "SELECT EXISTS(SELECT 1 FROM document_texts WHERE doc_id = ?1)",
1832            params![doc_id],
1833            |row| row.get(0),
1834        )?;
1835        Ok(exists != 0)
1836    }
1837
1838    pub fn has_current_document_text(&self, doc_id: i64, generation_key: &str) -> Result<bool> {
1839        let conn = self
1840            .db
1841            .lock()
1842            .map_err(|_| CoreError::poisoned("database"))?;
1843        let exists: i64 = conn.query_row(
1844            "SELECT EXISTS(
1845                SELECT 1 FROM document_texts
1846                WHERE doc_id = ?1 AND generation_key = ?2
1847            )",
1848            params![doc_id, generation_key],
1849            |row| row.get(0),
1850        )?;
1851        Ok(exists != 0)
1852    }
1853
1854    pub fn get_document_text_generation_keys(
1855        &self,
1856        doc_ids: &[i64],
1857    ) -> Result<HashMap<i64, String>> {
1858        if doc_ids.is_empty() {
1859            return Ok(HashMap::new());
1860        }
1861
1862        let mut requested = doc_ids.to_vec();
1863        requested.sort_unstable();
1864        requested.dedup();
1865
1866        let conn = self
1867            .db
1868            .lock()
1869            .map_err(|_| CoreError::poisoned("database"))?;
1870        let mut generation_by_doc = HashMap::with_capacity(requested.len());
1871        for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1872            let placeholders = vec!["?"; batch.len()].join(", ");
1873            let sql = format!(
1874                "SELECT doc_id, generation_key
1875                 FROM document_texts
1876                 WHERE doc_id IN ({placeholders})"
1877            );
1878            let mut stmt = conn.prepare(&sql)?;
1879            let rows = stmt.query_map(params_from_iter(batch.iter()), |row| {
1880                Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1881            })?;
1882            for row in rows {
1883                let (doc_id, generation_key) = row?;
1884                generation_by_doc.insert(doc_id, generation_key);
1885            }
1886        }
1887
1888        Ok(generation_by_doc)
1889    }
1890
1891    pub fn get_document_text_extractors(&self, doc_ids: &[i64]) -> Result<HashMap<i64, String>> {
1892        if doc_ids.is_empty() {
1893            return Ok(HashMap::new());
1894        }
1895
1896        let mut requested = doc_ids.to_vec();
1897        requested.sort_unstable();
1898        requested.dedup();
1899
1900        let conn = self
1901            .db
1902            .lock()
1903            .map_err(|_| CoreError::poisoned("database"))?;
1904        let mut extractor_by_doc = HashMap::with_capacity(requested.len());
1905        for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1906            let placeholders = vec!["?"; batch.len()].join(", ");
1907            let sql = format!(
1908                "SELECT doc_id, extractor_key
1909                 FROM document_texts
1910                 WHERE doc_id IN ({placeholders})"
1911            );
1912            let mut stmt = conn.prepare(&sql)?;
1913            let rows = stmt.query_map(params_from_iter(batch.iter()), |row| {
1914                Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1915            })?;
1916            for row in rows {
1917                let (doc_id, extractor_key) = row?;
1918                extractor_by_doc.insert(doc_id, extractor_key);
1919            }
1920        }
1921
1922        for doc_id in requested {
1923            if !extractor_by_doc.contains_key(&doc_id) {
1924                return Err(missing_document_text_error(doc_id));
1925            }
1926        }
1927
1928        Ok(extractor_by_doc)
1929    }
1930
1931    pub fn get_canonical_chunk_texts(&self, chunk_ids: &[i64]) -> Result<HashMap<i64, String>> {
1932        if chunk_ids.is_empty() {
1933            return Ok(HashMap::new());
1934        }
1935
1936        let mut requested = chunk_ids.to_vec();
1937        requested.sort_unstable();
1938        requested.dedup();
1939
1940        let conn = self
1941            .db
1942            .lock()
1943            .map_err(|_| CoreError::poisoned("database"))?;
1944        let mut text_by_chunk = HashMap::with_capacity(requested.len());
1945        for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1946            let placeholders = vec!["?"; batch.len()].join(", ");
1947            let sql = format!(
1948                "SELECT c.id,
1949                        c.offset,
1950                        c.length,
1951                        length(CAST(dt.text AS BLOB)),
1952                        substr(CAST(dt.text AS BLOB), c.offset + 1, 1),
1953                        substr(CAST(dt.text AS BLOB), c.offset + c.length + 1, 1),
1954                        substr(CAST(dt.text AS BLOB), c.offset + 1, c.length)
1955                 FROM chunks c
1956                 JOIN document_texts dt ON dt.doc_id = c.doc_id
1957                 WHERE c.id IN ({placeholders})"
1958            );
1959            let mut stmt = conn.prepare(&sql)?;
1960            let rows = stmt.query_map(params_from_iter(batch.iter()), |row| {
1961                Ok((
1962                    row.get::<_, i64>(0)?,
1963                    row.get::<_, i64>(1)?,
1964                    row.get::<_, i64>(2)?,
1965                    row.get::<_, i64>(3)?,
1966                    row.get::<_, Vec<u8>>(4)?,
1967                    row.get::<_, Vec<u8>>(5)?,
1968                    row.get::<_, Vec<u8>>(6)?,
1969                ))
1970            })?;
1971            for row in rows {
1972                let (chunk_id, offset, length, document_len, start_byte, end_byte, bytes) = row?;
1973                let text = canonical_chunk_slice_from_bytes(
1974                    chunk_id,
1975                    offset,
1976                    length,
1977                    document_len,
1978                    start_byte,
1979                    end_byte,
1980                    bytes,
1981                )?;
1982                text_by_chunk.insert(chunk_id, text);
1983            }
1984        }
1985
1986        for chunk_id in requested {
1987            if !text_by_chunk.contains_key(&chunk_id) {
1988                return Err(CoreError::Internal(format!(
1989                    "canonical text missing for chunk {chunk_id}"
1990                )));
1991            }
1992        }
1993
1994        Ok(text_by_chunk)
1995    }
1996
1997    pub fn get_chunk_text(&self, chunk_id: i64) -> Result<ChunkTextRow> {
1998        let conn = self
1999            .db
2000            .lock()
2001            .map_err(|_| CoreError::poisoned("database"))?;
2002        let result = conn.query_row(
2003            "SELECT c.id, c.doc_id, c.seq, c.offset, c.length, c.heading, c.kind, c.retrieval_prefix, dt.extractor_key, dt.text
2004             FROM chunks c
2005             JOIN document_texts dt ON dt.doc_id = c.doc_id
2006             WHERE c.id = ?1",
2007            params![chunk_id],
2008            |row| {
2009                let chunk = decode_chunk_row(row)?;
2010                let extractor_key: String = row.get(8)?;
2011                let document_text: String = row.get(9)?;
2012                Ok((chunk, extractor_key, document_text))
2013            },
2014        );
2015        let (chunk, extractor_key, document_text) = match result {
2016            Ok(row) => row,
2017            Err(Error::QueryReturnedNoRows) => {
2018                let doc_id = lookup_chunk_doc_id(&conn, chunk_id)?;
2019                return Err(missing_document_text_error(doc_id));
2020            }
2021            Err(err) => return Err(err.into()),
2022        };
2023        let text = chunk_text_from_canonical(&document_text, &chunk)?;
2024        Ok(ChunkTextRow {
2025            chunk,
2026            extractor_key,
2027            text,
2028        })
2029    }
2030
2031    pub fn insert_embeddings(&self, entries: &[(i64, &str)]) -> Result<()> {
2032        if entries.is_empty() {
2033            return Ok(());
2034        }
2035
2036        let conn = self
2037            .db
2038            .lock()
2039            .map_err(|_| CoreError::poisoned("database"))?;
2040
2041        let tx = conn.unchecked_transaction()?;
2042        let mut stmt = tx.prepare(
2043            "INSERT INTO embeddings (chunk_id, model, embedded_at)
2044             VALUES (?1, ?2, strftime('%Y-%m-%dT%H:%M:%SZ','now'))
2045             ON CONFLICT(chunk_id, model) DO UPDATE SET
2046               embedded_at = excluded.embedded_at",
2047        )?;
2048
2049        for (chunk_id, model) in entries {
2050            stmt.execute(params![chunk_id, model])?;
2051        }
2052
2053        drop(stmt);
2054        tx.commit()?;
2055        Ok(())
2056    }
2057
2058    pub fn get_unembedded_chunks(
2059        &self,
2060        model: &str,
2061        after_chunk_id: i64,
2062        limit: usize,
2063    ) -> Result<Vec<EmbedRecord>> {
2064        self.get_unembedded_chunks_filtered(model, after_chunk_id, limit, "", Vec::new())
2065    }
2066
2067    pub fn get_unembedded_chunks_in_space(
2068        &self,
2069        model: &str,
2070        space_id: i64,
2071        after_chunk_id: i64,
2072        limit: usize,
2073    ) -> Result<Vec<EmbedRecord>> {
2074        self.get_unembedded_chunks_filtered(
2075            model,
2076            after_chunk_id,
2077            limit,
2078            " AND col.space_id = ?",
2079            vec![SqlValue::Integer(space_id)],
2080        )
2081    }
2082
2083    pub fn get_unembedded_chunks_in_collections(
2084        &self,
2085        model: &str,
2086        collection_ids: &[i64],
2087        after_chunk_id: i64,
2088        limit: usize,
2089    ) -> Result<Vec<EmbedRecord>> {
2090        if collection_ids.is_empty() {
2091            return Ok(Vec::new());
2092        }
2093
2094        let placeholders = vec!["?"; collection_ids.len()].join(", ");
2095        let clause = format!(" AND d.collection_id IN ({placeholders})");
2096        let params = collection_ids
2097            .iter()
2098            .map(|id| SqlValue::Integer(*id))
2099            .collect::<Vec<_>>();
2100        self.get_unembedded_chunks_filtered(model, after_chunk_id, limit, &clause, params)
2101    }
2102
2103    fn get_unembedded_chunks_filtered(
2104        &self,
2105        model: &str,
2106        after_chunk_id: i64,
2107        limit: usize,
2108        scope_clause: &str,
2109        scope_params: Vec<SqlValue>,
2110    ) -> Result<Vec<EmbedRecord>> {
2111        let conn = self
2112            .db
2113            .lock()
2114            .map_err(|_| CoreError::poisoned("database"))?;
2115        let sql_limit = i64::try_from(limit)
2116            .map_err(|_| CoreError::Internal("limit too large for sqlite".to_string()))?;
2117
2118        let sql = format!(
2119            "SELECT c.id, c.doc_id, c.seq, c.offset, c.length, c.heading, c.kind, c.retrieval_prefix,
2120                    d.path, d.title, col.path, s.name
2121             FROM chunks c
2122             JOIN documents d ON d.id = c.doc_id
2123             JOIN collections col ON col.id = d.collection_id
2124             JOIN spaces s ON s.id = col.space_id
2125             LEFT JOIN embeddings e ON e.chunk_id = c.id AND e.model = ?
2126             WHERE d.active = 1 AND e.chunk_id IS NULL AND c.id > ?{scope_clause}
2127             ORDER BY c.id ASC
2128             LIMIT ?"
2129        );
2130        let mut stmt = conn.prepare(&sql)?;
2131        let mut params = Vec::with_capacity(scope_params.len() + 3);
2132        params.push(SqlValue::Text(model.to_string()));
2133        params.push(SqlValue::Integer(after_chunk_id));
2134        params.extend(scope_params);
2135        params.push(SqlValue::Integer(sql_limit));
2136        let rows = stmt.query_map(params_from_iter(params.iter()), |row| {
2137            Ok(EmbedRecord {
2138                chunk: decode_chunk_row(row)?,
2139                doc_path: row.get(8)?,
2140                doc_title: decode_optional_title(row.get::<_, String>(9)?),
2141                collection_path: PathBuf::from(row.get::<_, String>(10)?),
2142                space_name: row.get(11)?,
2143            })
2144        })?;
2145
2146        let records = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2147        Ok(records)
2148    }
2149
2150    pub fn delete_embeddings_for_model(&self, model: &str) -> Result<usize> {
2151        let conn = self
2152            .db
2153            .lock()
2154            .map_err(|_| CoreError::poisoned("database"))?;
2155
2156        let deleted = conn.execute("DELETE FROM embeddings WHERE model = ?1", params![model])?;
2157        Ok(deleted)
2158    }
2159
2160    pub fn delete_embeddings_for_space(&self, space_id: i64) -> Result<usize> {
2161        let conn = self
2162            .db
2163            .lock()
2164            .map_err(|_| CoreError::poisoned("database"))?;
2165        let _space_name = lookup_space_name(&conn, space_id)?;
2166
2167        let deleted = conn.execute(
2168            "DELETE FROM embeddings
2169             WHERE chunk_id IN (
2170                 SELECT c.id
2171                 FROM chunks c
2172                 JOIN documents d ON d.id = c.doc_id
2173                 JOIN collections col ON col.id = d.collection_id
2174                 WHERE col.space_id = ?1
2175             )",
2176            params![space_id],
2177        )?;
2178        Ok(deleted)
2179    }
2180
2181    pub fn list_embedding_models_in_space(&self, space_id: i64) -> Result<Vec<String>> {
2182        let conn = self
2183            .db
2184            .lock()
2185            .map_err(|_| CoreError::poisoned("database"))?;
2186        let _space_name = lookup_space_name(&conn, space_id)?;
2187
2188        let mut stmt = conn.prepare(
2189            "SELECT DISTINCT e.model
2190             FROM embeddings e
2191             JOIN chunks c ON c.id = e.chunk_id
2192             JOIN documents d ON d.id = c.doc_id
2193             JOIN collections col ON col.id = d.collection_id
2194             WHERE col.space_id = ?1
2195             ORDER BY e.model ASC",
2196        )?;
2197        let rows = stmt.query_map(params![space_id], |row| row.get::<_, String>(0))?;
2198        let models = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2199        Ok(models)
2200    }
2201
2202    pub fn count_embeddings(&self) -> Result<usize> {
2203        let conn = self
2204            .db
2205            .lock()
2206            .map_err(|_| CoreError::poisoned("database"))?;
2207
2208        let count: i64 = conn.query_row("SELECT COUNT(*) FROM embeddings", [], |row| row.get(0))?;
2209        Ok(count as usize)
2210    }
2211
2212    pub fn index_tantivy(&self, space: &str, entries: &[TantivyEntry]) -> Result<()> {
2213        if entries.is_empty() {
2214            return Ok(());
2215        }
2216
2217        let space_indexes = self.get_space_indexes(space)?;
2218        with_tantivy_writer(&space_indexes, |writer| {
2219            for entry in entries {
2220                let chunk_id = u64::try_from(entry.chunk_id).map_err(|_| {
2221                    CoreError::Internal(format!(
2222                        "chunk_id must be non-negative for tantivy indexing: {}",
2223                        entry.chunk_id
2224                    ))
2225                })?;
2226                let doc_id = u64::try_from(entry.doc_id).map_err(|_| {
2227                    CoreError::Internal(format!(
2228                        "doc_id must be non-negative for tantivy indexing: {}",
2229                        entry.doc_id
2230                    ))
2231                })?;
2232
2233                let mut doc = TantivyDocument::default();
2234                doc.add_u64(space_indexes.fields.chunk_id, chunk_id);
2235                doc.add_u64(space_indexes.fields.doc_id, doc_id);
2236                doc.add_text(space_indexes.fields.filepath, &entry.filepath);
2237                if let Some(title) = &entry.title {
2238                    doc.add_text(space_indexes.fields.title, title);
2239                }
2240                if let Some(heading) = &entry.heading {
2241                    doc.add_text(space_indexes.fields.heading, heading);
2242                }
2243                doc.add_text(space_indexes.fields.body, &entry.body);
2244                writer.add_document(doc)?;
2245            }
2246            Ok(())
2247        })
2248    }
2249
2250    pub fn delete_tantivy(&self, space: &str, chunk_ids: &[i64]) -> Result<()> {
2251        if chunk_ids.is_empty() {
2252            return Ok(());
2253        }
2254
2255        let space_indexes = self.get_space_indexes(space)?;
2256        with_tantivy_writer(&space_indexes, |writer| {
2257            for chunk_id in chunk_ids {
2258                let chunk_key = u64::try_from(*chunk_id).map_err(|_| {
2259                    CoreError::Internal(format!(
2260                        "chunk_id must be non-negative for tantivy delete: {chunk_id}"
2261                    ))
2262                })?;
2263                writer.delete_term(Term::from_field_u64(
2264                    space_indexes.fields.chunk_id,
2265                    chunk_key,
2266                ));
2267            }
2268
2269            Ok(())
2270        })
2271    }
2272
2273    pub fn delete_tantivy_by_doc(&self, space: &str, doc_id: i64) -> Result<()> {
2274        let space_indexes = self.get_space_indexes(space)?;
2275        with_tantivy_writer(&space_indexes, |writer| {
2276            let doc_key = u64::try_from(doc_id).map_err(|_| {
2277                CoreError::Internal(format!(
2278                    "doc_id must be non-negative for tantivy delete-by-doc: {doc_id}"
2279                ))
2280            })?;
2281            writer.delete_term(Term::from_field_u64(space_indexes.fields.doc_id, doc_key));
2282            Ok(())
2283        })
2284    }
2285
2286    pub fn query_bm25(
2287        &self,
2288        space: &str,
2289        query: &str,
2290        fields: &[(&str, f32)],
2291        limit: usize,
2292    ) -> Result<Vec<BM25Hit>> {
2293        self.query_bm25_filtered(space, query, fields, None, limit, true)
2294    }
2295
2296    pub(crate) fn query_bm25_cached(
2297        &self,
2298        space: &str,
2299        query: &str,
2300        fields: &[(&str, f32)],
2301        limit: usize,
2302    ) -> Result<Vec<BM25Hit>> {
2303        self.query_bm25_filtered(space, query, fields, None, limit, false)
2304    }
2305
2306    pub fn query_bm25_in_documents(
2307        &self,
2308        space: &str,
2309        query: &str,
2310        fields: &[(&str, f32)],
2311        document_ids: &[i64],
2312        limit: usize,
2313    ) -> Result<Vec<BM25Hit>> {
2314        if document_ids.is_empty() {
2315            return Ok(Vec::new());
2316        }
2317
2318        self.query_bm25_filtered(space, query, fields, Some(document_ids), limit, true)
2319    }
2320
2321    pub(crate) fn query_bm25_in_documents_cached(
2322        &self,
2323        space: &str,
2324        query: &str,
2325        fields: &[(&str, f32)],
2326        document_ids: &[i64],
2327        limit: usize,
2328    ) -> Result<Vec<BM25Hit>> {
2329        if document_ids.is_empty() {
2330            return Ok(Vec::new());
2331        }
2332
2333        self.query_bm25_filtered(space, query, fields, Some(document_ids), limit, false)
2334    }
2335
    /// Shared implementation behind the `query_bm25*` methods.
    ///
    /// * `fields` - `(field name, boost)` pairs to search; must not be empty.
    /// * `document_ids` - when `Some`, hits must also match the doc-id filter built by
    ///   `build_doc_id_filter_query`.
    /// * `limit` - maximum number of hits; `0` returns an empty list immediately.
    /// * `reload_reader` - when `true`, the tantivy reader is reloaded before searching.
    ///
    /// Returns one `BM25Hit` per matching tantivy document, in the order produced by the
    /// `TopDocs` collector. Returns an empty list when `build_literal_bm25_query` produces
    /// no query for the given text.
    fn query_bm25_filtered(
        &self,
        space: &str,
        query: &str,
        fields: &[(&str, f32)],
        document_ids: Option<&[i64]>,
        limit: usize,
        reload_reader: bool,
    ) -> Result<Vec<BM25Hit>> {
        if limit == 0 {
            return Ok(Vec::new());
        }

        if fields.is_empty() {
            return Err(CoreError::Internal(
                "bm25 query requires at least one field".to_string(),
            ));
        }

        let space_indexes = self.get_space_indexes(space)?;

        let schema = space_indexes.tantivy_index.schema();
        // Resolve every requested field name and capture its index record option; a field
        // that reports no index record option is rejected as "not indexed".
        let query_fields = fields
            .iter()
            .map(|(name, boost)| {
                let field = resolve_tantivy_field(space_indexes.fields, name)?;
                let field_entry = schema.get_field_entry(field);
                let index_record_option = field_entry
                    .field_type()
                    .get_index_record_option()
                    .ok_or_else(|| {
                        CoreError::Internal(format!(
                            "bm25 field '{}' is not indexed",
                            field_entry.name()
                        ))
                    })?;
                Ok(Bm25FieldSpec {
                    field,
                    boost: *boost,
                    index_record_option,
                })
            })
            .collect::<Result<Vec<_>>>()?;
        // `None` means no query could be built from the text, so there is nothing to match.
        let Some(parsed_query) =
            build_literal_bm25_query(&space_indexes.tantivy_index, &query_fields, query)?
        else {
            return Ok(Vec::new());
        };
        // With a document filter, both the text query and the doc-id filter must match.
        let parsed_query = if let Some(document_ids) = document_ids {
            Box::new(BooleanQuery::new(vec![
                (Occur::Must, parsed_query),
                (
                    Occur::Must,
                    build_doc_id_filter_query(space_indexes.fields.doc_id, document_ids)?,
                ),
            ])) as Box<dyn Query>
        } else {
            parsed_query
        };
        if reload_reader {
            space_indexes.tantivy_reader.reload()?;
        }
        let searcher = space_indexes.tantivy_reader.searcher();
        let docs = searcher.search(&parsed_query, &TopDocs::with_limit(limit))?;

        let mut hits = Vec::with_capacity(docs.len());
        let chunk_id_field_name = schema.get_field_entry(space_indexes.fields.chunk_id).name();
        // Fast-field columns are opened once per segment and reused for later hits that
        // fall in the same segment.
        let mut chunk_id_columns = HashMap::new();
        for (score, address) in docs {
            let chunk_id_column = match chunk_id_columns.entry(address.segment_ord) {
                std::collections::hash_map::Entry::Occupied(entry) => entry.into_mut(),
                std::collections::hash_map::Entry::Vacant(entry) => {
                    let column = searcher
                        .segment_reader(address.segment_ord)
                        .fast_fields()
                        .u64(chunk_id_field_name)?;
                    entry.insert(column)
                }
            };
            let chunk_id = chunk_id_column.first(address.doc_id).ok_or_else(|| {
                CoreError::Internal("tantivy hit missing chunk_id fast field".to_string())
            })?;
            // The index stores chunk ids as u64; callers work with i64.
            let chunk_id = i64::try_from(chunk_id).map_err(|_| {
                CoreError::Internal(format!("tantivy chunk_id exceeds i64 range: {chunk_id}"))
            })?;
            hits.push(BM25Hit { chunk_id, score });
        }

        Ok(hits)
    }
2426
2427    pub(crate) fn reload_tantivy_reader(&self, space: &str) -> Result<()> {
2428        let space_indexes = self.get_space_indexes(space)?;
2429        space_indexes.tantivy_reader.reload()?;
2430        Ok(())
2431    }
2432
2433    pub fn commit_tantivy(&self, space: &str) -> Result<()> {
2434        let space_indexes = self.get_space_indexes(space)?;
2435        let mut writer = space_indexes
2436            .tantivy_writer
2437            .lock()
2438            .map_err(|_| CoreError::poisoned("tantivy writer"))?;
2439        let Some(mut writer) = writer.take() else {
2440            return Ok(());
2441        };
2442        writer.commit()?;
2443        space_indexes.tantivy_reader.reload()?;
2444        Ok(())
2445    }
2446
2447    pub fn insert_usearch(&self, space: &str, key: i64, vector: &[f32]) -> Result<()> {
2448        self.batch_insert_usearch(space, &[(key, vector)])
2449    }
2450
2451    pub fn batch_insert_usearch(&self, space: &str, entries: &[(i64, &[f32])]) -> Result<()> {
2452        self.batch_insert_usearch_with_save_mode(space, entries, UsearchSaveMode::Immediate)
2453    }
2454
2455    pub(crate) fn batch_insert_usearch_deferred(
2456        &self,
2457        space: &str,
2458        entries: &[(i64, &[f32])],
2459    ) -> Result<()> {
2460        self.batch_insert_usearch_with_save_mode(space, entries, UsearchSaveMode::Deferred)
2461    }
2462
2463    fn batch_insert_usearch_with_save_mode(
2464        &self,
2465        space: &str,
2466        entries: &[(i64, &[f32])],
2467        save_mode: UsearchSaveMode,
2468    ) -> Result<()> {
2469        if entries.is_empty() {
2470            return Ok(());
2471        }
2472
2473        let space_indexes = self.get_space_indexes(space)?;
2474
2475        let expected_dimensions = entries[0].1.len();
2476        if expected_dimensions == 0 {
2477            return Err(CoreError::Internal(
2478                "cannot insert empty vector into usearch index".to_string(),
2479            ));
2480        }
2481        for (_, vector) in entries {
2482            if vector.len() != expected_dimensions {
2483                return Err(CoreError::Internal(format!(
2484                    "vector dimension mismatch in batch insert: expected {expected_dimensions}, got {}",
2485                    vector.len()
2486                )));
2487            }
2488        }
2489
2490        let mut index = space_indexes
2491            .usearch_index
2492            .write()
2493            .map_err(|_| CoreError::poisoned("usearch index"))?;
2494        let ensure_started = std::time::Instant::now();
2495        ensure_usearch_dimensions(&mut index, expected_dimensions)?;
2496        crate::profile::record_update_stage("usearch_ensure_dimensions", ensure_started.elapsed());
2497
2498        let target_capacity = index.size().saturating_add(entries.len());
2499        let reserve_started = std::time::Instant::now();
2500        index.reserve(target_capacity).map_err(|err| {
2501            CoreError::Internal(format!(
2502                "usearch reserve failed for {target_capacity} items: {err}"
2503            ))
2504        })?;
2505        crate::profile::record_update_stage("usearch_reserve", reserve_started.elapsed());
2506
2507        if matches!(save_mode, UsearchSaveMode::Deferred) {
2508            self.mark_usearch_dirty(space)?;
2509        }
2510
2511        let add_started = std::time::Instant::now();
2512        for (key, vector) in entries {
2513            let key = u64::try_from(*key).map_err(|_| {
2514                CoreError::Internal(format!("usearch key must be non-negative: {}", *key))
2515            })?;
2516            index
2517                .add::<f32>(key, vector)
2518                .map_err(|err| CoreError::Internal(format!("usearch add failed: {err}")))?;
2519        }
2520        crate::profile::record_update_stage("usearch_add", add_started.elapsed());
2521
2522        if matches!(save_mode, UsearchSaveMode::Immediate) {
2523            let save_started = std::time::Instant::now();
2524            save_usearch_index(&index, &space_indexes.usearch_path)?;
2525            crate::profile::record_update_stage("usearch_save", save_started.elapsed());
2526        }
2527        Ok(())
2528    }
2529
2530    pub fn delete_usearch(&self, space: &str, keys: &[i64]) -> Result<()> {
2531        self.delete_usearch_with_save_mode(space, keys, UsearchSaveMode::Immediate)
2532    }
2533
2534    pub(crate) fn delete_usearch_deferred(&self, space: &str, keys: &[i64]) -> Result<()> {
2535        self.delete_usearch_with_save_mode(space, keys, UsearchSaveMode::Deferred)
2536    }
2537
2538    fn delete_usearch_with_save_mode(
2539        &self,
2540        space: &str,
2541        keys: &[i64],
2542        save_mode: UsearchSaveMode,
2543    ) -> Result<()> {
2544        if keys.is_empty() {
2545            return Ok(());
2546        }
2547
2548        let space_indexes = self.get_space_indexes(space)?;
2549        let index = space_indexes
2550            .usearch_index
2551            .write()
2552            .map_err(|_| CoreError::poisoned("usearch index"))?;
2553
2554        if matches!(save_mode, UsearchSaveMode::Deferred) {
2555            self.mark_usearch_dirty(space)?;
2556        }
2557
2558        let delete_started = std::time::Instant::now();
2559        for key in keys {
2560            let key = u64::try_from(*key).map_err(|_| {
2561                CoreError::Internal(format!("usearch key must be non-negative: {}", *key))
2562            })?;
2563            index
2564                .remove(key)
2565                .map_err(|err| CoreError::Internal(format!("usearch remove failed: {err}")))?;
2566        }
2567        crate::profile::record_update_stage("usearch_delete", delete_started.elapsed());
2568
2569        if matches!(save_mode, UsearchSaveMode::Immediate) {
2570            let save_started = std::time::Instant::now();
2571            save_usearch_index(&index, &space_indexes.usearch_path)?;
2572            crate::profile::record_update_stage("usearch_save", save_started.elapsed());
2573        }
2574        Ok(())
2575    }
2576
2577    pub(crate) fn save_dirty_usearch_indexes(&self) -> Result<()> {
2578        let spaces = {
2579            let mut dirty = self
2580                .dirty_usearch_spaces
2581                .lock()
2582                .map_err(|_| CoreError::poisoned("dirty usearch spaces"))?;
2583            if dirty.is_empty() {
2584                return Ok(());
2585            }
2586
2587            let mut spaces = dirty.drain().collect::<Vec<_>>();
2588            spaces.sort();
2589            spaces
2590        };
2591
2592        for (index, space) in spaces.iter().enumerate() {
2593            if let Err(err) = self.save_usearch_for_space(space) {
2594                let mut dirty = self
2595                    .dirty_usearch_spaces
2596                    .lock()
2597                    .map_err(|_| CoreError::poisoned("dirty usearch spaces"))?;
2598                for unsaved in &spaces[index..] {
2599                    dirty.insert(unsaved.clone());
2600                }
2601                return Err(err);
2602            }
2603            crate::profile::increment_update_count("usearch_saved_spaces", 1);
2604        }
2605
2606        Ok(())
2607    }
2608
2609    fn mark_usearch_dirty(&self, space: &str) -> Result<()> {
2610        self.dirty_usearch_spaces
2611            .lock()
2612            .map_err(|_| CoreError::poisoned("dirty usearch spaces"))?
2613            .insert(space.to_string());
2614        Ok(())
2615    }
2616
2617    fn save_usearch_for_space(&self, space: &str) -> Result<()> {
2618        let space_indexes = self.get_space_indexes(space)?;
2619        let index = space_indexes
2620            .usearch_index
2621            .read()
2622            .map_err(|_| CoreError::poisoned("usearch index"))?;
2623
2624        let save_started = std::time::Instant::now();
2625        save_usearch_index(&index, &space_indexes.usearch_path)?;
2626        crate::profile::record_update_stage("usearch_save", save_started.elapsed());
2627        Ok(())
2628    }
2629
2630    pub fn query_dense(&self, space: &str, vector: &[f32], limit: usize) -> Result<Vec<DenseHit>> {
2631        self.query_dense_filtered(space, vector, None, limit)
2632    }
2633
2634    pub fn query_dense_in_chunks(
2635        &self,
2636        space: &str,
2637        vector: &[f32],
2638        chunk_ids: &[i64],
2639        limit: usize,
2640    ) -> Result<Vec<DenseHit>> {
2641        if chunk_ids.is_empty() {
2642            return Ok(Vec::new());
2643        }
2644
2645        let allowed_keys = chunk_ids
2646            .iter()
2647            .map(|chunk_id| {
2648                u64::try_from(*chunk_id).map_err(|_| {
2649                    CoreError::Internal(format!(
2650                        "chunk_id must be non-negative for usearch query: {chunk_id}"
2651                    ))
2652                })
2653            })
2654            .collect::<Result<HashSet<_>>>()?;
2655        self.query_dense_in_key_set(space, vector, &allowed_keys, limit)
2656    }
2657
2658    pub(crate) fn query_dense_in_key_set(
2659        &self,
2660        space: &str,
2661        vector: &[f32],
2662        allowed_keys: &HashSet<u64>,
2663        limit: usize,
2664    ) -> Result<Vec<DenseHit>> {
2665        if allowed_keys.is_empty() {
2666            return Ok(Vec::new());
2667        }
2668
2669        self.query_dense_filtered(space, vector, Some(allowed_keys), limit)
2670    }
2671
2672    fn query_dense_filtered(
2673        &self,
2674        space: &str,
2675        vector: &[f32],
2676        allowed_keys: Option<&HashSet<u64>>,
2677        limit: usize,
2678    ) -> Result<Vec<DenseHit>> {
2679        if limit == 0 {
2680            return Ok(Vec::new());
2681        }
2682        if vector.is_empty() {
2683            return Err(CoreError::Internal(
2684                "cannot query usearch with empty vector".to_string(),
2685            ));
2686        }
2687
2688        let space_indexes = self.get_space_indexes(space)?;
2689        let index = space_indexes
2690            .usearch_index
2691            .read()
2692            .map_err(|_| CoreError::poisoned("usearch index"))?;
2693
2694        if index.size() == 0 {
2695            return Ok(Vec::new());
2696        }
2697        if vector.len() != index.dimensions() {
2698            return Err(CoreError::Internal(format!(
2699                "query vector dimension mismatch: expected {}, got {}",
2700                index.dimensions(),
2701                vector.len()
2702            )));
2703        }
2704
2705        let matches = if let Some(allowed_keys) = allowed_keys {
2706            index
2707                .filtered_search::<f32, _>(vector, limit, |key| allowed_keys.contains(&key))
2708                .map_err(|err| {
2709                    CoreError::Internal(format!("usearch filtered query failed: {err}"))
2710                })?
2711        } else {
2712            index
2713                .search::<f32>(vector, limit)
2714                .map_err(|err| CoreError::Internal(format!("usearch query failed: {err}")))?
2715        };
2716        let hits = matches
2717            .keys
2718            .into_iter()
2719            .zip(matches.distances)
2720            .map(|(key, distance)| DenseHit {
2721                chunk_id: key as i64,
2722                distance,
2723            })
2724            .collect();
2725        Ok(hits)
2726    }
2727
2728    pub fn count_usearch(&self, space: &str) -> Result<usize> {
2729        let space_indexes = self.get_space_indexes(space)?;
2730        let index = space_indexes
2731            .usearch_index
2732            .read()
2733            .map_err(|_| CoreError::poisoned("usearch index"))?;
2734        Ok(index.size())
2735    }
2736
2737    pub fn clear_usearch(&self, space: &str) -> Result<()> {
2738        let space_indexes = self.get_space_indexes(space)?;
2739        let index = space_indexes
2740            .usearch_index
2741            .write()
2742            .map_err(|_| CoreError::poisoned("usearch index"))?;
2743        let clear_started = std::time::Instant::now();
2744        index
2745            .reset()
2746            .map_err(|err| CoreError::Internal(format!("usearch clear failed: {err}")))?;
2747        std::fs::File::create(&space_indexes.usearch_path)?;
2748        crate::profile::record_update_stage("usearch_clear", clear_started.elapsed());
2749        Ok(())
2750    }
2751
2752    pub fn get_fts_dirty_documents(&self) -> Result<Vec<FtsDirtyRecord>> {
2753        self.get_fts_dirty_documents_filtered("", Vec::new())
2754    }
2755
2756    pub fn get_fts_dirty_documents_in_space(&self, space_id: i64) -> Result<Vec<FtsDirtyRecord>> {
2757        self.get_fts_dirty_documents_filtered(
2758            " AND c.space_id = ?",
2759            vec![SqlValue::Integer(space_id)],
2760        )
2761    }
2762
2763    pub fn get_fts_dirty_documents_in_collections(
2764        &self,
2765        collection_ids: &[i64],
2766    ) -> Result<Vec<FtsDirtyRecord>> {
2767        if collection_ids.is_empty() {
2768            return Ok(Vec::new());
2769        }
2770
2771        let placeholders = vec!["?"; collection_ids.len()].join(", ");
2772        let clause = format!(" AND d.collection_id IN ({placeholders})");
2773        let params = collection_ids
2774            .iter()
2775            .map(|id| SqlValue::Integer(*id))
2776            .collect::<Vec<_>>();
2777        self.get_fts_dirty_documents_filtered(&clause, params)
2778    }
2779
2780    fn get_fts_dirty_documents_filtered(
2781        &self,
2782        scope_clause: &str,
2783        scope_params: Vec<SqlValue>,
2784    ) -> Result<Vec<FtsDirtyRecord>> {
2785        let conn = self
2786            .db
2787            .lock()
2788            .map_err(|_| CoreError::poisoned("database"))?;
2789
2790        let sql = format!(
2791            "SELECT d.id, d.path, d.title, d.hash, c.path, s.name
2792             FROM documents d
2793             JOIN collections c ON c.id = d.collection_id
2794             JOIN spaces s ON s.id = c.space_id
2795             WHERE d.fts_dirty = 1{scope_clause}
2796             ORDER BY d.id ASC"
2797        );
2798        let mut stmt = conn.prepare(&sql)?;
2799        let rows = stmt.query_map(params_from_iter(scope_params.iter()), |row| {
2800            Ok((
2801                row.get::<_, i64>(0)?,
2802                row.get::<_, String>(1)?,
2803                row.get::<_, String>(2)?,
2804                row.get::<_, String>(3)?,
2805                row.get::<_, String>(4)?,
2806                row.get::<_, String>(5)?,
2807            ))
2808        })?;
2809        let headers = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2810        drop(stmt);
2811
2812        let mut records = Vec::with_capacity(headers.len());
2813        for (doc_id, doc_path, doc_title, doc_hash, collection_path, space_name) in headers {
2814            let chunks = load_chunks_for_doc(&conn, doc_id)?;
2815            records.push(FtsDirtyRecord {
2816                doc_id,
2817                doc_path,
2818                doc_title: decode_optional_title(doc_title),
2819                doc_hash,
2820                collection_path: PathBuf::from(collection_path),
2821                space_name,
2822                chunks,
2823            });
2824        }
2825
2826        Ok(records)
2827    }
2828
2829    pub fn batch_clear_fts_dirty(&self, doc_ids: &[i64]) -> Result<()> {
2830        if doc_ids.is_empty() {
2831            return Ok(());
2832        }
2833
2834        let conn = self
2835            .db
2836            .lock()
2837            .map_err(|_| CoreError::poisoned("database"))?;
2838
2839        let placeholders = vec!["?"; doc_ids.len()].join(", ");
2840        let sql = format!("UPDATE documents SET fts_dirty = 0 WHERE id IN ({placeholders})");
2841        conn.execute(&sql, params_from_iter(doc_ids.iter()))?;
2842        Ok(())
2843    }
2844
2845    pub fn count_documents_in_collection(
2846        &self,
2847        collection_id: i64,
2848        active_only: bool,
2849    ) -> Result<usize> {
2850        let conn = self
2851            .db
2852            .lock()
2853            .map_err(|_| CoreError::poisoned("database"))?;
2854        let _collection_name = lookup_collection_name(&conn, collection_id)?;
2855        let active_only = i64::from(active_only);
2856        query_count(
2857            &conn,
2858            "SELECT COUNT(*)
2859             FROM documents
2860             WHERE collection_id = ?1
2861               AND (?2 = 0 OR active = 1)",
2862            params![collection_id, active_only],
2863        )
2864    }
2865
2866    pub fn count_chunks_in_collection(&self, collection_id: i64) -> Result<usize> {
2867        let conn = self
2868            .db
2869            .lock()
2870            .map_err(|_| CoreError::poisoned("database"))?;
2871        let _collection_name = lookup_collection_name(&conn, collection_id)?;
2872
2873        query_count(
2874            &conn,
2875            "SELECT COUNT(*)
2876             FROM chunks c
2877             JOIN documents d ON d.id = c.doc_id
2878             WHERE d.collection_id = ?1",
2879            params![collection_id],
2880        )
2881    }
2882
2883    pub fn count_embedded_chunks_in_collection(&self, collection_id: i64) -> Result<usize> {
2884        let conn = self
2885            .db
2886            .lock()
2887            .map_err(|_| CoreError::poisoned("database"))?;
2888        let _collection_name = lookup_collection_name(&conn, collection_id)?;
2889
2890        query_count(
2891            &conn,
2892            "SELECT COUNT(DISTINCT e.chunk_id)
2893             FROM embeddings e
2894             JOIN chunks c ON c.id = e.chunk_id
2895             JOIN documents d ON d.id = c.doc_id
2896             WHERE d.collection_id = ?1",
2897            params![collection_id],
2898        )
2899    }
2900
2901    pub fn count_documents(&self, space_id: Option<i64>) -> Result<usize> {
2902        let conn = self
2903            .db
2904            .lock()
2905            .map_err(|_| CoreError::poisoned("database"))?;
2906
2907        match space_id {
2908            Some(space_id) => {
2909                let _space_name = lookup_space_name(&conn, space_id)?;
2910                query_count(
2911                    &conn,
2912                    "SELECT COUNT(*)
2913                     FROM documents d
2914                     JOIN collections c ON c.id = d.collection_id
2915                     WHERE c.space_id = ?1",
2916                    params![space_id],
2917                )
2918            }
2919            None => query_count(&conn, "SELECT COUNT(*) FROM documents", []),
2920        }
2921    }
2922
2923    pub fn count_chunks(&self, space_id: Option<i64>) -> Result<usize> {
2924        let conn = self
2925            .db
2926            .lock()
2927            .map_err(|_| CoreError::poisoned("database"))?;
2928
2929        match space_id {
2930            Some(space_id) => {
2931                let _space_name = lookup_space_name(&conn, space_id)?;
2932                query_count(
2933                    &conn,
2934                    "SELECT COUNT(*)
2935                     FROM chunks c
2936                     JOIN documents d ON d.id = c.doc_id
2937                     JOIN collections col ON col.id = d.collection_id
2938                     WHERE col.space_id = ?1",
2939                    params![space_id],
2940                )
2941            }
2942            None => query_count(&conn, "SELECT COUNT(*) FROM chunks", []),
2943        }
2944    }
2945
2946    pub fn count_embedded_chunks(&self, space_id: Option<i64>) -> Result<usize> {
2947        let conn = self
2948            .db
2949            .lock()
2950            .map_err(|_| CoreError::poisoned("database"))?;
2951
2952        match space_id {
2953            Some(space_id) => {
2954                let _space_name = lookup_space_name(&conn, space_id)?;
2955                query_count(
2956                    &conn,
2957                    "SELECT COUNT(DISTINCT e.chunk_id)
2958                     FROM embeddings e
2959                     JOIN chunks c ON c.id = e.chunk_id
2960                     JOIN documents d ON d.id = c.doc_id
2961                     JOIN collections col ON col.id = d.collection_id
2962                     WHERE col.space_id = ?1",
2963                    params![space_id],
2964                )
2965            }
2966            None => query_count(&conn, "SELECT COUNT(DISTINCT chunk_id) FROM embeddings", []),
2967        }
2968    }
2969
2970    pub fn disk_usage(&self) -> Result<DiskUsage> {
2971        let sqlite_bytes = file_size_or_zero(&self.cache_dir.join(DB_FILE))?;
2972
2973        let mut tantivy_bytes = 0_u64;
2974        let mut usearch_bytes = 0_u64;
2975        let spaces_dir = self.cache_dir.join(SPACES_DIR);
2976        if spaces_dir.exists() {
2977            for entry in std::fs::read_dir(&spaces_dir)? {
2978                let space_dir = entry?.path();
2979                if !space_dir.is_dir() {
2980                    continue;
2981                }
2982
2983                tantivy_bytes += dir_size_or_zero(&space_dir.join(TANTIVY_DIR_NAME))?;
2984                usearch_bytes += file_size_or_zero(&space_dir.join(USEARCH_FILENAME))?;
2985            }
2986        }
2987
2988        let models_bytes = dir_size_or_zero(&self.cache_dir.join("models"))?;
2989        let total_bytes = sqlite_bytes + tantivy_bytes + usearch_bytes + models_bytes;
2990
2991        Ok(DiskUsage {
2992            sqlite_bytes,
2993            tantivy_bytes,
2994            usearch_bytes,
2995            models_bytes,
2996            total_bytes,
2997        })
2998    }
2999
3000    fn unload_space(&self, name: &str) -> Result<()> {
3001        let mut spaces = self
3002            .spaces
3003            .write()
3004            .map_err(|_| CoreError::poisoned("spaces"))?;
3005        spaces.remove(name);
3006        Ok(())
3007    }
3008
3009    fn remove_space_artifacts(&self, name: &str) -> Result<()> {
3010        let space_root = self.space_root_path(name);
3011        if space_root.exists() {
3012            std::fs::remove_dir_all(space_root)?;
3013        }
3014        Ok(())
3015    }
3016
3017    fn rename_space_artifacts(&self, old: &str, new: &str) -> Result<()> {
3018        let old_root = self.space_root_path(old);
3019        let new_root = self.space_root_path(new);
3020        if !old_root.exists() {
3021            return Ok(());
3022        }
3023
3024        if new_root.exists() {
3025            return Err(CoreError::Internal(format!(
3026                "cannot rename space artifacts: destination already exists: {}",
3027                new_root.display()
3028            )));
3029        }
3030
3031        if let Some(parent) = new_root.parent() {
3032            std::fs::create_dir_all(parent)?;
3033        }
3034        std::fs::rename(old_root, new_root)?;
3035        Ok(())
3036    }
3037
3038    fn space_root_path(&self, name: &str) -> PathBuf {
3039        self.cache_dir.join(SPACES_DIR).join(name)
3040    }
3041
3042    fn space_paths(&self, name: &str) -> (PathBuf, PathBuf) {
3043        let space_root = self.space_root_path(name);
3044        let tantivy_dir = space_root.join(TANTIVY_DIR_NAME);
3045        let usearch_path = space_root.join(USEARCH_FILENAME);
3046        (tantivy_dir, usearch_path)
3047    }
3048
3049    fn get_space_indexes(&self, name: &str) -> Result<Arc<SpaceIndexes>> {
3050        self.open_space(name)?;
3051        let spaces = self
3052            .spaces
3053            .read()
3054            .map_err(|_| CoreError::poisoned("spaces"))?;
3055        spaces.get(name).cloned().ok_or_else(|| {
3056            KboltError::SpaceNotFound {
3057                name: name.to_string(),
3058            }
3059            .into()
3060        })
3061    }
3062}
3063
3064fn lookup_space_name(conn: &Connection, space_id: i64) -> Result<String> {
3065    let result = conn.query_row(
3066        "SELECT name FROM spaces WHERE id = ?1",
3067        params![space_id],
3068        |row| row.get::<_, String>(0),
3069    );
3070    match result {
3071        Ok(name) => Ok(name),
3072        Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
3073            name: format!("id={space_id}"),
3074        }
3075        .into()),
3076        Err(err) => Err(err.into()),
3077    }
3078}
3079
3080fn lookup_collection_name(conn: &Connection, collection_id: i64) -> Result<String> {
3081    let result = conn.query_row(
3082        "SELECT name FROM collections WHERE id = ?1",
3083        params![collection_id],
3084        |row| row.get::<_, String>(0),
3085    );
3086    match result {
3087        Ok(name) => Ok(name),
3088        Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
3089            name: format!("id={collection_id}"),
3090        }
3091        .into()),
3092        Err(err) => Err(err.into()),
3093    }
3094}
3095
3096fn lookup_document_id(conn: &Connection, doc_id: i64) -> Result<i64> {
3097    let result = conn.query_row(
3098        "SELECT id FROM documents WHERE id = ?1",
3099        params![doc_id],
3100        |row| row.get::<_, i64>(0),
3101    );
3102    match result {
3103        Ok(id) => Ok(id),
3104        Err(Error::QueryReturnedNoRows) => Err(KboltError::DocumentNotFound {
3105            path: format!("id={doc_id}"),
3106        }
3107        .into()),
3108        Err(err) => Err(err.into()),
3109    }
3110}
3111
3112fn load_chunks_for_doc(conn: &Connection, doc_id: i64) -> Result<Vec<ChunkRow>> {
3113    let mut stmt = conn.prepare(
3114        "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
3115         FROM chunks
3116         WHERE doc_id = ?1
3117         ORDER BY seq ASC",
3118    )?;
3119    let rows = stmt.query_map(params![doc_id], decode_chunk_row)?;
3120    let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3121    Ok(chunks)
3122}
3123
3124fn load_chunk_ids_for_doc(conn: &Connection, doc_id: i64) -> Result<Vec<i64>> {
3125    let mut stmt = conn.prepare("SELECT id FROM chunks WHERE doc_id = ?1 ORDER BY seq ASC")?;
3126    let rows = stmt.query_map(params![doc_id], |row| row.get::<_, i64>(0))?;
3127    let chunk_ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3128    Ok(chunk_ids)
3129}
3130
3131fn lookup_chunk_doc_id(conn: &Connection, chunk_id: i64) -> Result<i64> {
3132    let result = conn.query_row(
3133        "SELECT doc_id FROM chunks WHERE id = ?1",
3134        params![chunk_id],
3135        |row| row.get::<_, i64>(0),
3136    );
3137    match result {
3138        Ok(doc_id) => Ok(doc_id),
3139        Err(Error::QueryReturnedNoRows) => Err(KboltError::DocumentNotFound {
3140            path: format!("chunk_id={chunk_id}"),
3141        }
3142        .into()),
3143        Err(err) => Err(err.into()),
3144    }
3145}
3146
3147fn query_count<P: rusqlite::Params>(conn: &Connection, sql: &str, params: P) -> Result<usize> {
3148    let count: i64 = conn.query_row(sql, params, |row| row.get(0))?;
3149    Ok(count as usize)
3150}
3151
3152fn file_size_or_zero(path: &Path) -> Result<u64> {
3153    if !path.exists() {
3154        return Ok(0);
3155    }
3156
3157    let metadata = std::fs::metadata(path)?;
3158    if metadata.is_file() {
3159        Ok(metadata.len())
3160    } else {
3161        Ok(0)
3162    }
3163}
3164
3165fn dir_size_or_zero(path: &Path) -> Result<u64> {
3166    if !path.exists() {
3167        return Ok(0);
3168    }
3169
3170    let mut total = 0_u64;
3171    for entry in std::fs::read_dir(path)? {
3172        let entry = entry?;
3173        let child_path = entry.path();
3174        let metadata = std::fs::symlink_metadata(&child_path)?;
3175        if metadata.is_file() {
3176            total += metadata.len();
3177        } else if metadata.is_dir() {
3178            total += dir_size_or_zero(&child_path)?;
3179        }
3180    }
3181
3182    Ok(total)
3183}
3184
3185fn open_or_create_tantivy_index(path: &Path) -> Result<Index> {
3186    let meta_path = path.join("meta.json");
3187    if meta_path.exists() {
3188        return Ok(Index::open_in_dir(path)?);
3189    }
3190
3191    Ok(Index::create_in_dir(path, tantivy_schema())?)
3192}
3193
3194fn with_tantivy_writer<T>(
3195    space_indexes: &SpaceIndexes,
3196    f: impl FnOnce(&mut IndexWriter) -> Result<T>,
3197) -> Result<T> {
3198    let mut writer = space_indexes
3199        .tantivy_writer
3200        .lock()
3201        .map_err(|_| CoreError::poisoned("tantivy writer"))?;
3202
3203    if writer.is_none() {
3204        *writer = Some(space_indexes.tantivy_index.writer(50_000_000)?);
3205    }
3206
3207    let writer = writer
3208        .as_mut()
3209        .ok_or_else(|| CoreError::Internal("failed to initialize tantivy writer".to_string()))?;
3210    f(writer)
3211}
3212
3213fn reject_incompatible_legacy_index(conn: &Connection) -> Result<()> {
3214    if !table_exists(conn, "documents")? || table_exists(conn, "document_texts")? {
3215        return Ok(());
3216    }
3217
3218    let document_count = query_count(conn, "SELECT COUNT(*) FROM documents", [])?;
3219    if document_count == 0 {
3220        return Ok(());
3221    }
3222
3223    Err(KboltError::Config(
3224        "cache index uses an older text-storage format; rebuild the kbolt cache before using this version".to_string(),
3225    )
3226    .into())
3227}
3228
3229fn ensure_schema_version(conn: &Connection) -> Result<()> {
3230    let current: i64 = conn.query_row("PRAGMA user_version", [], |row| row.get(0))?;
3231    if current > SCHEMA_VERSION {
3232        return Err(KboltError::Config(format!(
3233            "cache index schema version {current} is newer than supported version {SCHEMA_VERSION}"
3234        ))
3235        .into());
3236    }
3237
3238    if current < SCHEMA_VERSION {
3239        conn.pragma_update(None, "user_version", SCHEMA_VERSION)?;
3240    }
3241
3242    Ok(())
3243}
3244
3245fn table_exists(conn: &Connection, table: &str) -> Result<bool> {
3246    let exists = conn.query_row(
3247        "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name = ?1",
3248        [table],
3249        |row| row.get::<_, i64>(0),
3250    )?;
3251    Ok(exists != 0)
3252}
3253
3254fn ensure_document_texts_generation_key_column(conn: &Connection) -> Result<()> {
3255    let mut stmt = conn.prepare("PRAGMA table_info(document_texts)")?;
3256    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3257    let columns = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3258    drop(stmt);
3259
3260    if columns.iter().any(|column| column == "generation_key") {
3261        return Ok(());
3262    }
3263
3264    conn.execute(
3265        "ALTER TABLE document_texts ADD COLUMN generation_key TEXT NOT NULL DEFAULT ''",
3266        [],
3267    )?;
3268    Ok(())
3269}
3270
3271fn ensure_chunks_retrieval_prefix_column(conn: &Connection) -> Result<()> {
3272    let mut stmt = conn.prepare("PRAGMA table_info(chunks)")?;
3273    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3274    let columns = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3275    drop(stmt);
3276
3277    if columns.iter().any(|column| column == "retrieval_prefix") {
3278        return Ok(());
3279    }
3280
3281    conn.execute("ALTER TABLE chunks ADD COLUMN retrieval_prefix TEXT", [])?;
3282    Ok(())
3283}
3284
3285fn build_literal_bm25_query(
3286    index: &Index,
3287    fields: &[Bm25FieldSpec],
3288    query: &str,
3289) -> Result<Option<Box<dyn Query>>> {
3290    let mut clauses = Vec::new();
3291    for field in fields {
3292        for token in analyzed_terms_for_field(index, field.field, query)? {
3293            let term_query: Box<dyn Query> = Box::new(TermQuery::new(
3294                Term::from_field_text(field.field, &token),
3295                field.index_record_option,
3296            ));
3297            let query = if (field.boost - 1.0).abs() > f32::EPSILON {
3298                Box::new(BoostQuery::new(term_query, field.boost)) as Box<dyn Query>
3299            } else {
3300                term_query
3301            };
3302            clauses.push((Occur::Should, query));
3303        }
3304    }
3305
3306    if clauses.is_empty() {
3307        Ok(None)
3308    } else {
3309        Ok(Some(Box::new(BooleanQuery::new(clauses))))
3310    }
3311}
3312
3313fn build_doc_id_filter_query(field: Field, document_ids: &[i64]) -> Result<Box<dyn Query>> {
3314    let mut terms = Vec::new();
3315    let mut seen = HashSet::new();
3316    for doc_id in document_ids {
3317        let doc_id = u64::try_from(*doc_id).map_err(|_| {
3318            CoreError::Internal(format!(
3319                "doc_id must be non-negative for tantivy query: {doc_id}"
3320            ))
3321        })?;
3322        if seen.insert(doc_id) {
3323            terms.push(Term::from_field_u64(field, doc_id));
3324        }
3325    }
3326
3327    Ok(Box::new(ConstScoreQuery::new(
3328        Box::new(TermSetQuery::new(terms)),
3329        0.0,
3330    )))
3331}
3332
3333fn analyzed_terms_for_field(index: &Index, field: Field, query: &str) -> Result<Vec<String>> {
3334    let mut analyzer = index.tokenizer_for_field(field)?;
3335    let mut stream = analyzer.token_stream(query);
3336    let mut terms = Vec::new();
3337    let mut seen = HashSet::new();
3338    while let Some(token) = stream.next() {
3339        if token.text.is_empty() {
3340            continue;
3341        }
3342        let text = token.text.clone();
3343        if seen.insert(text.clone()) {
3344            terms.push(text);
3345        }
3346    }
3347    Ok(terms)
3348}
3349
3350fn new_usearch_index(dimensions: usize) -> Result<usearch::Index> {
3351    let options = IndexOptions {
3352        dimensions,
3353        metric: MetricKind::Cos,
3354        quantization: ScalarKind::F32,
3355        connectivity: 16,
3356        expansion_add: 200,
3357        expansion_search: 100,
3358        ..IndexOptions::default()
3359    };
3360    usearch::Index::new(&options)
3361        .map_err(|err| CoreError::Internal(format!("usearch init failed: {err}")))
3362}
3363
3364fn open_or_create_usearch_index(path: &Path) -> Result<usearch::Index> {
3365    let index = new_usearch_index(256)?;
3366    let file_size = std::fs::metadata(path).map(|meta| meta.len()).unwrap_or(0);
3367    if file_size > 0 {
3368        let path = path
3369            .to_str()
3370            .ok_or_else(|| CoreError::Internal("invalid usearch path encoding".to_string()))?;
3371        index
3372            .load(path)
3373            .map_err(|err| CoreError::Internal(format!("usearch load failed: {err}")))?;
3374    }
3375    Ok(index)
3376}
3377
3378fn ensure_usearch_dimensions(index: &mut usearch::Index, expected_dimensions: usize) -> Result<()> {
3379    if index.size() == 0 && index.dimensions() != expected_dimensions {
3380        *index = new_usearch_index(expected_dimensions)?;
3381        return Ok(());
3382    }
3383
3384    if index.dimensions() != expected_dimensions {
3385        return Err(CoreError::Internal(format!(
3386            "usearch vector dimension mismatch: index expects {}, got {}",
3387            index.dimensions(),
3388            expected_dimensions
3389        )));
3390    }
3391    Ok(())
3392}
3393
3394fn save_usearch_index(index: &usearch::Index, path: &Path) -> Result<()> {
3395    if index.size() == 0 {
3396        std::fs::File::create(path)?;
3397        return Ok(());
3398    }
3399
3400    let path = path
3401        .to_str()
3402        .ok_or_else(|| CoreError::Internal("invalid usearch path encoding".to_string()))?;
3403    index
3404        .save(path)
3405        .map_err(|err| CoreError::Internal(format!("usearch save failed: {err}")))?;
3406    Ok(())
3407}
3408
3409fn tantivy_schema() -> tantivy::schema::Schema {
3410    let mut builder = tantivy::schema::Schema::builder();
3411    builder.add_u64_field("chunk_id", INDEXED | STORED | FAST);
3412    builder.add_u64_field("doc_id", INDEXED | STORED | FAST);
3413    builder.add_text_field("filepath", TEXT | STORED);
3414    builder.add_text_field("title", TEXT | STORED);
3415    builder.add_text_field("heading", TEXT | STORED);
3416    builder.add_text_field("body", TEXT);
3417    builder.build()
3418}
3419
3420fn tantivy_fields_from_schema(schema: &tantivy::schema::Schema) -> Result<TantivyFields> {
3421    Ok(TantivyFields {
3422        chunk_id: schema.get_field("chunk_id").map_err(|_| {
3423            CoreError::Internal("tantivy schema missing field: chunk_id".to_string())
3424        })?,
3425        doc_id: schema
3426            .get_field("doc_id")
3427            .map_err(|_| CoreError::Internal("tantivy schema missing field: doc_id".to_string()))?,
3428        filepath: schema.get_field("filepath").map_err(|_| {
3429            CoreError::Internal("tantivy schema missing field: filepath".to_string())
3430        })?,
3431        title: schema
3432            .get_field("title")
3433            .map_err(|_| CoreError::Internal("tantivy schema missing field: title".to_string()))?,
3434        heading: schema.get_field("heading").map_err(|_| {
3435            CoreError::Internal("tantivy schema missing field: heading".to_string())
3436        })?,
3437        body: schema
3438            .get_field("body")
3439            .map_err(|_| CoreError::Internal("tantivy schema missing field: body".to_string()))?,
3440    })
3441}
3442
3443fn resolve_tantivy_field(fields: TantivyFields, name: &str) -> Result<Field> {
3444    match name {
3445        "chunk_id" => Ok(fields.chunk_id),
3446        "doc_id" => Ok(fields.doc_id),
3447        "filepath" => Ok(fields.filepath),
3448        "title" => Ok(fields.title),
3449        "heading" => Ok(fields.heading),
3450        "body" => Ok(fields.body),
3451        other => Err(CoreError::Internal(format!(
3452            "unsupported tantivy field: {other}"
3453        ))),
3454    }
3455}
3456
3457fn serialize_extensions(extensions: Option<&[String]>) -> Result<Option<String>> {
3458    match extensions {
3459        None => Ok(None),
3460        Some(values) => serde_json::to_string(values).map(Some).map_err(Into::into),
3461    }
3462}
3463
3464fn deserialize_extensions(raw: Option<String>) -> Result<Option<Vec<String>>> {
3465    match raw {
3466        None => Ok(None),
3467        Some(json) => serde_json::from_str::<Vec<String>>(&json)
3468            .map(Some)
3469            .map_err(Into::into),
3470    }
3471}
3472
3473fn decode_space_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SpaceRow> {
3474    Ok(SpaceRow {
3475        id: row.get(0)?,
3476        name: row.get(1)?,
3477        description: row.get(2)?,
3478        created: row.get(3)?,
3479    })
3480}
3481
3482fn decode_collection_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<CollectionRow> {
3483    let raw_extensions: Option<String> = row.get(5)?;
3484    let extensions = deserialize_extensions(raw_extensions).map_err(|err| {
3485        Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(err))
3486    })?;
3487    Ok(CollectionRow {
3488        id: row.get(0)?,
3489        space_id: row.get(1)?,
3490        name: row.get(2)?,
3491        path: PathBuf::from(row.get::<_, String>(3)?),
3492        description: row.get(4)?,
3493        extensions,
3494        created: row.get(6)?,
3495        updated: row.get(7)?,
3496    })
3497}
3498
3499fn decode_document_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<DocumentRow> {
3500    decode_document_row_at(row, 0)
3501}
3502
3503fn decode_document_row_at(row: &rusqlite::Row<'_>, base: usize) -> rusqlite::Result<DocumentRow> {
3504    let active_value: i64 = row.get(base + 6)?;
3505    let fts_dirty_value: i64 = row.get(base + 8)?;
3506    Ok(DocumentRow {
3507        id: row.get(base)?,
3508        collection_id: row.get(base + 1)?,
3509        path: row.get(base + 2)?,
3510        title: decode_optional_title(row.get::<_, String>(base + 3)?),
3511        hash: row.get(base + 4)?,
3512        modified: row.get(base + 5)?,
3513        active: active_value != 0,
3514        deactivated_at: row.get(base + 7)?,
3515        fts_dirty: fts_dirty_value != 0,
3516    })
3517}
3518
/// Trims surrounding whitespace from an optional title.
///
/// Returns `None` when the input is `None` or when nothing remains after
/// trimming; otherwise returns the trimmed slice of the input.
fn normalized_title(title: Option<&str>) -> Option<&str> {
    match title.map(str::trim) {
        Some(trimmed) if !trimmed.is_empty() => Some(trimmed),
        _ => None,
    }
}
3522
/// Converts a stored title string into an optional, whitespace-trimmed title.
///
/// Returns `None` when the title is empty or whitespace-only. When the title
/// has no surrounding whitespace the original `String` is returned as-is;
/// otherwise a trimmed copy is returned.
fn decode_optional_title(title: String) -> Option<String> {
    let core = title.trim();
    let is_blank = core.is_empty();
    // Equal lengths mean trimming removed nothing, so the owned string can be reused.
    let already_trimmed = core.len() == title.len();

    match (is_blank, already_trimmed) {
        (true, _) => None,
        (false, true) => Some(title),
        (false, false) => Some(core.to_string()),
    }
}
3533
3534fn decode_document_text_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<DocumentTextRow> {
3535    Ok(DocumentTextRow {
3536        doc_id: row.get(0)?,
3537        extractor_key: row.get(1)?,
3538        source_hash: row.get(2)?,
3539        text_hash: row.get(3)?,
3540        generation_key: row.get(4)?,
3541        text: row.get(5)?,
3542        created: row.get(6)?,
3543    })
3544}
3545
3546fn decode_chunk_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ChunkRow> {
3547    decode_chunk_row_at(row, 0)
3548}
3549
3550fn decode_chunk_row_at(row: &rusqlite::Row<'_>, base: usize) -> rusqlite::Result<ChunkRow> {
3551    let offset_value: i64 = row.get(base + 3)?;
3552    let length_value: i64 = row.get(base + 4)?;
3553    let kind_raw: String = row.get(base + 6)?;
3554    let kind = FinalChunkKind::try_from(kind_raw.as_str()).map_err(|err| {
3555        Error::FromSqlConversionFailure(base + 6, rusqlite::types::Type::Text, Box::new(err))
3556    })?;
3557    Ok(ChunkRow {
3558        id: row.get(base)?,
3559        doc_id: row.get(base + 1)?,
3560        seq: row.get(base + 2)?,
3561        offset: decode_non_negative_usize(offset_value, base + 3, "chunks.offset")?,
3562        length: decode_non_negative_usize(length_value, base + 4, "chunks.length")?,
3563        heading: row.get(base + 5)?,
3564        kind,
3565        retrieval_prefix: row.get(base + 7)?,
3566    })
3567}
3568
3569fn decode_non_negative_usize(
3570    value: i64,
3571    column: usize,
3572    name: &'static str,
3573) -> rusqlite::Result<usize> {
3574    if value < 0 {
3575        return Err(Error::FromSqlConversionFailure(
3576            column,
3577            SqlType::Integer,
3578            Box::new(KboltError::Internal(format!("{name} must not be negative"))),
3579        ));
3580    }
3581
3582    Ok(value as usize)
3583}
3584
3585fn canonical_chunk_slice_from_bytes(
3586    chunk_id: i64,
3587    offset_value: i64,
3588    length_value: i64,
3589    document_len_value: i64,
3590    start_byte: Vec<u8>,
3591    end_byte: Vec<u8>,
3592    bytes: Vec<u8>,
3593) -> Result<String> {
3594    if offset_value < 0 {
3595        return Err(CoreError::Internal(format!(
3596            "chunk {chunk_id} offset must not be negative"
3597        )));
3598    }
3599    if length_value < 0 {
3600        return Err(CoreError::Internal(format!(
3601            "chunk {chunk_id} length must not be negative"
3602        )));
3603    }
3604    if document_len_value < 0 {
3605        return Err(CoreError::Internal(format!(
3606            "document text length for chunk {chunk_id} must not be negative"
3607        )));
3608    }
3609
3610    let offset = offset_value as usize;
3611    let length = length_value as usize;
3612    let document_len = document_len_value as usize;
3613    let end = offset.checked_add(length).ok_or_else(|| {
3614        CoreError::Internal(format!("chunk {chunk_id} text span overflows usize"))
3615    })?;
3616    if end > document_len {
3617        return Err(CoreError::Internal(format!(
3618            "chunk {chunk_id} text span {offset}..{end} exceeds document text length {document_len}"
3619        )));
3620    }
3621    if bytes.len() != length {
3622        return Err(CoreError::Internal(format!(
3623            "canonical text slice for chunk {chunk_id} returned {} bytes, expected {length}",
3624            bytes.len()
3625        )));
3626    }
3627    validate_sqlite_utf8_boundary(chunk_id, offset, document_len, &start_byte)?;
3628    validate_sqlite_utf8_boundary(chunk_id, end, document_len, &end_byte)?;
3629
3630    String::from_utf8(bytes).map_err(|err| {
3631        CoreError::Internal(format!(
3632            "stored canonical text slice for chunk {chunk_id} is invalid UTF-8: {err}"
3633        ))
3634    })
3635}
3636
3637fn validate_sqlite_utf8_boundary(
3638    chunk_id: i64,
3639    index: usize,
3640    document_len: usize,
3641    byte_at_index: &[u8],
3642) -> Result<()> {
3643    if index == 0 || index == document_len {
3644        return Ok(());
3645    }
3646    if byte_at_index.len() != 1 {
3647        return Err(CoreError::Internal(format!(
3648            "chunk {chunk_id} text boundary byte at {index} is missing"
3649        )));
3650    }
3651    if (0x80..0xC0).contains(&byte_at_index[0]) {
3652        return Err(CoreError::Internal(format!(
3653            "chunk {chunk_id} text span is not on UTF-8 boundaries"
3654        )));
3655    }
3656
3657    Ok(())
3658}
3659
3660pub(crate) fn chunk_text_from_canonical(document_text: &str, chunk: &ChunkRow) -> Result<String> {
3661    let label = format!("chunk {}", chunk.id);
3662    let end = validate_text_span(document_text, chunk.offset, chunk.length, &label)?;
3663
3664    Ok(document_text[chunk.offset..end].to_string())
3665}
3666
3667fn validate_text_span(
3668    document_text: &str,
3669    offset: usize,
3670    length: usize,
3671    label: &str,
3672) -> Result<usize> {
3673    let end = offset
3674        .checked_add(length)
3675        .ok_or_else(|| CoreError::Internal(format!("{label} text span overflows usize")))?;
3676    if end > document_text.len() {
3677        return Err(CoreError::Internal(format!(
3678            "{label} text span {}..{} exceeds document text length {}",
3679            offset,
3680            end,
3681            document_text.len()
3682        )));
3683    }
3684    if !document_text.is_char_boundary(offset) || !document_text.is_char_boundary(end) {
3685        return Err(CoreError::Internal(format!(
3686            "{label} text span {offset}..{end} is not on UTF-8 boundaries"
3687        )));
3688    }
3689
3690    Ok(end)
3691}
3692
3693pub(crate) fn missing_document_text_error(doc_id: i64) -> CoreError {
3694    KboltError::Internal(format!(
3695        "document {doc_id} is missing persisted canonical text; rebuild the kbolt cache"
3696    ))
3697    .into()
3698}
3699
3700#[cfg(test)]
3701mod tests;