1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3use std::sync::{Arc, Mutex, RwLock};
4
5use crate::error::{CoreError, Result};
6use crate::ingest::chunk::FinalChunkKind;
7use kbolt_types::{DiskUsage, KboltError};
8use rusqlite::types::{Type as SqlType, Value as SqlValue};
9use rusqlite::{params, params_from_iter, Connection, Error, ErrorCode};
10use tantivy::collector::TopDocs;
11use tantivy::query::{
12 BooleanQuery, BoostQuery, ConstScoreQuery, Occur, Query, TermQuery, TermSetQuery,
13};
14use tantivy::schema::{Field, IndexRecordOption, FAST, INDEXED, STORED, TEXT};
15use tantivy::tokenizer::TokenStream;
16use tantivy::{Index, IndexReader, IndexWriter, TantivyDocument, Term};
17use usearch::{IndexOptions, MetricKind, ScalarKind};
18
/// File name of the SQLite metadata database; `Storage::new` joins it onto the cache directory.
const DB_FILE: &str = "meta.sqlite";
/// Name of the space that `Storage::new` seeds and opens. `rename_space` refuses to rename it
/// and `delete_space` only clears its collections and index artifacts.
const DEFAULT_SPACE_NAME: &str = "default";
/// NOTE(review): presumably the directory under the cache dir that holds per-space artifacts;
/// not referenced in this part of the file — confirm against `space_paths`.
const SPACES_DIR: &str = "spaces";
/// NOTE(review): presumably the per-space Tantivy index directory name — confirm against
/// `space_paths`.
const TANTIVY_DIR_NAME: &str = "tantivy";
/// NOTE(review): presumably the per-space usearch index file name — confirm against
/// `space_paths`.
const USEARCH_FILENAME: &str = "vectors.usearch";
/// NOTE(review): presumably the schema version checked by `ensure_schema_version` — confirm.
const SCHEMA_VERSION: i64 = 3;
/// NOTE(review): presumably the maximum number of ids bound into a single SQL `IN (...)`
/// clause — confirm at the use sites.
const SQLITE_IN_CLAUSE_BATCH_SIZE: usize = 500;
26
/// Selects how a usearch index change is persisted.
///
/// NOTE(review): the use sites are outside this part of the file. Presumably `Immediate` saves
/// the index right away and `Deferred` postpones the save — confirm.
#[derive(Clone, Copy)]
enum UsearchSaveMode {
    Immediate,
    Deferred,
}
32
/// Handle over the SQLite metadata database plus the per-space search indexes.
pub struct Storage {
    /// SQLite connection opened by `Storage::new`; every query locks this mutex first.
    db: Mutex<Connection>,
    /// Directory passed to `Storage::new`; the database file lives directly inside it.
    cache_dir: PathBuf,
    /// Indexes of the spaces that have been opened via `open_space`, keyed by space name.
    spaces: RwLock<HashMap<String, Arc<SpaceIndexes>>>,
    /// NOTE(review): presumably names of spaces whose usearch index has unsaved changes;
    /// only initialised (empty) in this part of the file — confirm.
    dirty_usearch_spaces: Mutex<HashSet<String>>,
}
39
/// One row of the `spaces` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SpaceRow {
    /// Primary key.
    pub id: i64,
    /// Unique space name.
    pub name: String,
    /// Optional free-form description (nullable column).
    pub description: Option<String>,
    /// Creation timestamp text; the column defaults to `strftime('%Y-%m-%dT%H:%M:%SZ','now')`.
    pub created: String,
}
47
/// One row of the `collections` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollectionRow {
    /// Primary key.
    pub id: i64,
    /// Owning space (`spaces.id`).
    pub space_id: i64,
    /// Collection name, unique within its space.
    pub name: String,
    /// Collection path; stored as text in the database.
    pub path: PathBuf,
    /// Optional free-form description.
    pub description: Option<String>,
    /// Optional list of extensions, stored in a single nullable text column.
    /// NOTE(review): presumably the file extensions to include — confirm against the ingest code.
    pub extensions: Option<Vec<String>>,
    /// Creation timestamp text.
    pub created: String,
    /// Last-update timestamp text.
    pub updated: String,
}
59
/// One row of the `documents` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DocumentRow {
    /// Primary key.
    pub id: i64,
    /// Owning collection (`collections.id`).
    pub collection_id: i64,
    /// Document path, unique within its collection.
    pub path: String,
    /// Optional title; the column itself is `NOT NULL DEFAULT ''`.
    pub title: Option<String>,
    /// Content hash text.
    pub hash: String,
    /// Modification timestamp text supplied by the caller.
    pub modified: String,
    /// Whether the document is currently active.
    pub active: bool,
    /// Timestamp text recorded when the document was deactivated; `None` while active.
    pub deactivated_at: Option<String>,
    /// Flag set by `upsert_document`.
    /// NOTE(review): presumably marks the document as needing full-text re-indexing — confirm.
    pub fts_dirty: bool,
}
72
/// Per-document summary produced by `Storage::list_collection_file_rows`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileListRow {
    /// `documents.id`.
    pub doc_id: i64,
    /// Document path.
    pub path: String,
    /// Optional document title.
    pub title: Option<String>,
    /// Document hash text.
    pub hash: String,
    /// Whether the document is active.
    pub active: bool,
    /// Number of distinct chunks belonging to the document.
    pub chunk_count: usize,
    /// Number of distinct chunks of the document that have at least one `embeddings` row.
    pub embedded_chunk_count: usize,
}
83
/// One row of the `chunks` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkRow {
    /// Primary key.
    pub id: i64,
    /// Owning document (`documents.id`).
    pub doc_id: i64,
    /// Sequence number, unique within the document.
    pub seq: i32,
    /// NOTE(review): presumably the chunk's start position in the document text; the unit is
    /// not visible here — confirm.
    pub offset: usize,
    /// NOTE(review): presumably the chunk's extent in the same unit as `offset` — confirm.
    pub length: usize,
    /// Optional heading text.
    pub heading: Option<String>,
    /// Chunk kind; the column is text with default `'section'`.
    pub kind: FinalChunkKind,
    /// Optional retrieval prefix text.
    pub retrieval_prefix: Option<String>,
}
95
/// Chunk data supplied by callers when inserting chunks (see `Storage::insert_chunks`);
/// carries the same fields as [`ChunkRow`] minus the database-assigned `id` and the `doc_id`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkInsert {
    /// Sequence number, unique within the document.
    pub seq: i32,
    /// NOTE(review): presumably the chunk's start position in the document text — confirm unit.
    pub offset: usize,
    /// NOTE(review): presumably the chunk's extent in the same unit as `offset` — confirm.
    pub length: usize,
    /// Optional heading text.
    pub heading: Option<String>,
    /// Chunk kind.
    pub kind: FinalChunkKind,
    /// Optional retrieval prefix text.
    pub retrieval_prefix: Option<String>,
}
105
/// One row of the `document_texts` table (at most one per document, keyed by `doc_id`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DocumentTextRow {
    /// Owning document (`documents.id`); also the table's primary key.
    pub doc_id: i64,
    /// NOTE(review): presumably identifies the extractor that produced `text` — confirm.
    pub extractor_key: String,
    /// NOTE(review): presumably the hash of the source file the text was extracted from — confirm.
    pub source_hash: String,
    /// NOTE(review): presumably the hash of `text` — confirm.
    pub text_hash: String,
    /// Generation key; the column defaults to the empty string.
    pub generation_key: String,
    /// The stored text.
    pub text: String,
    /// Creation timestamp text.
    pub created: String,
}
116
/// A chunk together with text and the key of the extractor.
///
/// NOTE(review): the producing query is outside this part of the file; presumably `text` is
/// the owning document's stored text — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkTextRow {
    /// The chunk metadata.
    pub chunk: ChunkRow,
    /// Extractor key associated with `text`.
    pub extractor_key: String,
    /// Text associated with the chunk.
    pub text: String,
}
123
/// Borrowed bundle of document, extracted-text and chunk data.
///
/// NOTE(review): the consuming function is outside this part of the file; judging by the name
/// and by [`DocumentGenerationReplaceResult`], it presumably replaces a document's stored text
/// and chunks in one operation — confirm.
pub struct DocumentGenerationReplace<'a> {
    /// Target collection (`collections.id`).
    pub collection_id: i64,
    /// Document path within the collection.
    pub path: &'a str,
    /// Optional document title.
    pub title: Option<&'a str>,
    /// Document hash text.
    pub hash: &'a str,
    /// Document modification timestamp text.
    pub modified: &'a str,
    /// Extractor key for the text.
    pub extractor_key: &'a str,
    /// Source hash for the text.
    pub source_hash: &'a str,
    /// Text hash for the text.
    pub text_hash: &'a str,
    /// Generation key for the text.
    pub generation_key: &'a str,
    /// The extracted text.
    pub text: &'a str,
    /// Chunks to store for the document.
    pub chunks: &'a [ChunkInsert],
}
137
/// Outcome paired with [`DocumentGenerationReplace`].
///
/// NOTE(review): the producing function is outside this part of the file; field meanings
/// below are inferred from the names — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DocumentGenerationReplaceResult {
    /// Id of the affected document.
    pub doc_id: i64,
    /// Presumably the ids of the chunks that were replaced.
    pub old_chunk_ids: Vec<i64>,
    /// Presumably the ids of the newly stored chunks.
    pub chunk_ids: Vec<i64>,
}
144
/// A chunk together with the document, collection and space it belongs to.
///
/// NOTE(review): the producing query is outside this part of the file; presumably this is the
/// work item handed to the embedding step — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EmbedRecord {
    /// The chunk metadata.
    pub chunk: ChunkRow,
    /// Path of the owning document.
    pub doc_path: String,
    /// Optional title of the owning document.
    pub doc_title: Option<String>,
    /// Path of the owning collection.
    pub collection_path: PathBuf,
    /// Name of the owning space.
    pub space_name: String,
}
153
/// A document, its chunks, and the collection and space it belongs to.
///
/// NOTE(review): the producing query is outside this part of the file; presumably it selects
/// documents whose `fts_dirty` flag is set — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FtsDirtyRecord {
    /// `documents.id`.
    pub doc_id: i64,
    /// Document path.
    pub doc_path: String,
    /// Optional document title.
    pub doc_title: Option<String>,
    /// Document hash text.
    pub doc_hash: String,
    /// Path of the owning collection.
    pub collection_path: PathBuf,
    /// Name of the owning space.
    pub space_name: String,
    /// Chunks of the document.
    pub chunks: Vec<ChunkRow>,
}
164
/// An inactive document old enough to be removed, as returned by the
/// `list_reapable_documents*` methods.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReapableDocument {
    /// `documents.id`.
    pub doc_id: i64,
    /// Name of the space that owns the document's collection.
    pub space_name: String,
    /// Chunk ids loaded for the document via `load_chunk_ids_for_doc`.
    pub chunk_ids: Vec<i64>,
}
171
/// Values for one entry of the full-text index; the field names mirror [`TantivyFields`].
///
/// NOTE(review): the indexing code is outside this part of the file; presumably one entry is
/// written per chunk — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TantivyEntry {
    /// Chunk id.
    pub chunk_id: i64,
    /// Id of the owning document.
    pub doc_id: i64,
    /// File path text.
    pub filepath: String,
    /// Optional title text.
    pub title: Option<String>,
    /// Optional heading text.
    pub heading: Option<String>,
    /// Body text.
    pub body: String,
}
181
/// A scored chunk id.
///
/// NOTE(review): judging by the name this is a BM25 full-text search hit; the search code is
/// outside this part of the file — confirm.
#[derive(Debug, Clone, PartialEq)]
pub struct BM25Hit {
    /// Id of the matching chunk.
    pub chunk_id: i64,
    /// Score of the match.
    pub score: f32,
}
187
/// A chunk id paired with a distance.
///
/// NOTE(review): judging by the name this is a vector (dense) search hit; the search code and
/// the distance metric are outside this part of the file — confirm.
#[derive(Debug, Clone, PartialEq)]
pub struct DenseHit {
    /// Id of the matching chunk.
    pub chunk_id: i64,
    /// Distance reported for the match.
    pub distance: f32,
}
193
/// Document ids plus a chunk count.
///
/// NOTE(review): the producing code is outside this part of the file; presumably it summarises
/// the documents and chunks covered by a search scope — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchScopeSummary {
    /// Ids of the documents in scope.
    pub document_ids: Vec<i64>,
    /// Number of chunks in scope.
    pub chunk_count: usize,
}
199
/// Result of `Storage::find_space_for_collection`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SpaceResolution {
    /// Exactly one space contains a collection with the requested name.
    Found(SpaceRow),
    /// Several spaces contain such a collection; holds their names in ascending order.
    Ambiguous(Vec<String>),
    /// No space contains a collection with the requested name.
    NotFound,
}
206
/// Search indexes of one opened space, built by `Storage::open_space`.
struct SpaceIndexes {
    /// Directory of the Tantivy index, as returned by `space_paths`.
    _tantivy_dir: PathBuf,
    /// Path of the usearch index file, as returned by `space_paths`.
    usearch_path: PathBuf,
    /// Tantivy index opened or created in `_tantivy_dir`.
    tantivy_index: Index,
    /// Reader obtained from `tantivy_index`.
    tantivy_reader: IndexReader,
    /// Optional Tantivy writer; `open_space` initialises it to `None`.
    /// NOTE(review): presumably created lazily on first write — confirm.
    tantivy_writer: Mutex<Option<IndexWriter>>,
    /// usearch index opened or created at `usearch_path`.
    usearch_index: RwLock<usearch::Index>,
    /// Field handles resolved from the Tantivy schema.
    fields: TantivyFields,
}
216
/// Handles of the Tantivy schema fields, resolved once by `tantivy_fields_from_schema` when a
/// space is opened.
#[derive(Debug, Clone, Copy)]
struct TantivyFields {
    chunk_id: Field,
    doc_id: Field,
    filepath: Field,
    title: Field,
    heading: Field,
    body: Field,
}
226
/// A Tantivy field paired with a boost and an index record option.
///
/// NOTE(review): the query-building code is outside this part of the file; presumably one spec
/// describes how a single field contributes to a BM25 query — confirm.
#[derive(Debug, Clone, Copy)]
struct Bm25FieldSpec {
    /// The field.
    field: Field,
    /// Boost value for the field.
    boost: f32,
    /// Index record option used for the field.
    index_record_option: IndexRecordOption,
}
233
234impl Storage {
235 pub fn new(cache_dir: &Path) -> Result<Self> {
236 std::fs::create_dir_all(cache_dir)?;
237 let db_path = cache_dir.join(DB_FILE);
238 let conn = Connection::open(db_path)?;
239 reject_incompatible_legacy_index(&conn)?;
240 conn.execute_batch(
241 r#"
242PRAGMA foreign_keys = ON;
243PRAGMA journal_mode = WAL;
244
245CREATE TABLE IF NOT EXISTS spaces (
246 id INTEGER PRIMARY KEY,
247 name TEXT NOT NULL UNIQUE,
248 description TEXT,
249 created TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ','now'))
250);
251
252CREATE TABLE IF NOT EXISTS collections (
253 id INTEGER PRIMARY KEY,
254 space_id INTEGER NOT NULL REFERENCES spaces(id) ON DELETE CASCADE,
255 name TEXT NOT NULL,
256 path TEXT NOT NULL,
257 description TEXT,
258 extensions TEXT,
259 created TEXT NOT NULL,
260 updated TEXT NOT NULL,
261 UNIQUE(space_id, name)
262);
263
264CREATE TABLE IF NOT EXISTS documents (
265 id INTEGER PRIMARY KEY,
266 collection_id INTEGER NOT NULL REFERENCES collections(id) ON DELETE CASCADE,
267 path TEXT NOT NULL,
268 title TEXT NOT NULL DEFAULT '',
269 hash TEXT NOT NULL,
270 modified TEXT NOT NULL,
271 active INTEGER NOT NULL DEFAULT 1,
272 deactivated_at TEXT,
273 fts_dirty INTEGER NOT NULL DEFAULT 0,
274 UNIQUE(collection_id, path)
275);
276CREATE INDEX IF NOT EXISTS idx_documents_collection ON documents(collection_id, active);
277CREATE INDEX IF NOT EXISTS idx_documents_hash ON documents(hash);
278CREATE INDEX IF NOT EXISTS idx_documents_fts_dirty ON documents(fts_dirty) WHERE fts_dirty = 1;
279
280CREATE TABLE IF NOT EXISTS chunks (
281 id INTEGER PRIMARY KEY,
282 doc_id INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
283 seq INTEGER NOT NULL,
284 offset INTEGER NOT NULL,
285 length INTEGER NOT NULL,
286 heading TEXT,
287 kind TEXT NOT NULL DEFAULT 'section',
288 retrieval_prefix TEXT,
289 UNIQUE(doc_id, seq)
290);
291CREATE INDEX IF NOT EXISTS idx_chunks_doc ON chunks(doc_id);
292
293CREATE TABLE IF NOT EXISTS embeddings (
294 chunk_id INTEGER NOT NULL REFERENCES chunks(id) ON DELETE CASCADE,
295 model TEXT NOT NULL,
296 embedded_at TEXT NOT NULL,
297 PRIMARY KEY (chunk_id, model)
298);
299
300CREATE TABLE IF NOT EXISTS document_texts (
301 doc_id INTEGER PRIMARY KEY REFERENCES documents(id) ON DELETE CASCADE,
302 extractor_key TEXT NOT NULL,
303 source_hash TEXT NOT NULL,
304 text_hash TEXT NOT NULL,
305 generation_key TEXT NOT NULL DEFAULT '',
306 text TEXT NOT NULL,
307 created TEXT NOT NULL
308);
309"#,
310 )?;
311 ensure_document_texts_generation_key_column(&conn)?;
312 ensure_chunks_retrieval_prefix_column(&conn)?;
313 ensure_schema_version(&conn)?;
314
315 conn.execute(
316 "INSERT OR IGNORE INTO spaces (name, description) VALUES (?1, NULL)",
317 params![DEFAULT_SPACE_NAME],
318 )?;
319
320 let storage = Self {
321 db: Mutex::new(conn),
322 cache_dir: cache_dir.to_path_buf(),
323 spaces: RwLock::new(HashMap::new()),
324 dirty_usearch_spaces: Mutex::new(HashSet::new()),
325 };
326 storage.open_space(DEFAULT_SPACE_NAME)?;
327
328 Ok(storage)
329 }
330
331 pub fn open_space(&self, name: &str) -> Result<()> {
332 let _space = self.get_space(name)?;
333
334 {
335 let spaces = self
336 .spaces
337 .read()
338 .map_err(|_| CoreError::poisoned("spaces"))?;
339 if spaces.contains_key(name) {
340 return Ok(());
341 }
342 }
343
344 let (tantivy_dir, usearch_path) = self.space_paths(name);
345 std::fs::create_dir_all(&tantivy_dir)?;
346 std::fs::OpenOptions::new()
347 .create(true)
348 .append(true)
349 .open(&usearch_path)?;
350
351 let mut spaces = self
352 .spaces
353 .write()
354 .map_err(|_| CoreError::poisoned("spaces"))?;
355 if spaces.contains_key(name) {
356 return Ok(());
357 }
358
359 let tantivy_index = open_or_create_tantivy_index(&tantivy_dir)?;
360 let tantivy_reader = tantivy_index.reader()?;
361 let usearch_index = open_or_create_usearch_index(&usearch_path)?;
362 let fields = tantivy_fields_from_schema(&tantivy_index.schema())?;
363
364 spaces.insert(
365 name.to_string(),
366 Arc::new(SpaceIndexes {
367 _tantivy_dir: tantivy_dir,
368 usearch_path,
369 tantivy_index,
370 tantivy_reader,
371 tantivy_writer: Mutex::new(None),
372 usearch_index: RwLock::new(usearch_index),
373 fields,
374 }),
375 );
376
377 Ok(())
378 }
379
380 pub fn close_space(&self, name: &str) -> Result<()> {
381 let _space = self.get_space(name)?;
382 let mut spaces = self
383 .spaces
384 .write()
385 .map_err(|_| CoreError::poisoned("spaces"))?;
386 let _removed = spaces.remove(name);
387 Ok(())
388 }
389
390 pub fn create_space(&self, name: &str, description: Option<&str>) -> Result<i64> {
391 let space_id = {
392 let conn = self
393 .db
394 .lock()
395 .map_err(|_| CoreError::poisoned("database"))?;
396
397 let result = conn.execute(
398 "INSERT INTO spaces (name, description) VALUES (?1, ?2)",
399 params![name, description],
400 );
401
402 match result {
403 Ok(_) => conn.last_insert_rowid(),
404 Err(err) => {
405 return match err {
406 Error::SqliteFailure(sqlite_err, _)
407 if sqlite_err.code == ErrorCode::ConstraintViolation =>
408 {
409 Err(KboltError::SpaceAlreadyExists {
410 name: name.to_string(),
411 }
412 .into())
413 }
414 other => Err(other.into()),
415 };
416 }
417 }
418 };
419
420 if let Err(open_err) = self.open_space(name) {
421 let rollback_result = self
422 .db
423 .lock()
424 .map_err(|_| CoreError::poisoned("database"))?
425 .execute("DELETE FROM spaces WHERE id = ?1", params![space_id]);
426
427 if let Err(rollback_err) = rollback_result {
428 return Err(CoreError::Internal(format!(
429 "failed to provision indexes for space '{name}': {open_err}; rollback failed: {rollback_err}"
430 )));
431 }
432
433 return Err(open_err);
434 }
435
436 Ok(space_id)
437 }
438
439 pub fn get_space(&self, name: &str) -> Result<SpaceRow> {
440 let conn = self
441 .db
442 .lock()
443 .map_err(|_| CoreError::poisoned("database"))?;
444 let mut stmt =
445 conn.prepare("SELECT id, name, description, created FROM spaces WHERE name = ?1")?;
446
447 let row = stmt.query_row(params![name], decode_space_row);
448
449 match row {
450 Ok(space) => Ok(space),
451 Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
452 name: name.to_string(),
453 }
454 .into()),
455 Err(err) => Err(err.into()),
456 }
457 }
458
459 pub fn get_space_by_id(&self, id: i64) -> Result<SpaceRow> {
460 let conn = self
461 .db
462 .lock()
463 .map_err(|_| CoreError::poisoned("database"))?;
464 let mut stmt =
465 conn.prepare("SELECT id, name, description, created FROM spaces WHERE id = ?1")?;
466
467 let row = stmt.query_row(params![id], decode_space_row);
468
469 match row {
470 Ok(space) => Ok(space),
471 Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
472 name: format!("id={id}"),
473 }
474 .into()),
475 Err(err) => Err(err.into()),
476 }
477 }
478
479 pub fn list_spaces(&self) -> Result<Vec<SpaceRow>> {
480 let conn = self
481 .db
482 .lock()
483 .map_err(|_| CoreError::poisoned("database"))?;
484 let mut stmt = conn.prepare(
485 "SELECT id, name, description, created
486 FROM spaces
487 ORDER BY name ASC",
488 )?;
489
490 let rows = stmt.query_map([], decode_space_row)?;
491
492 let spaces = rows.collect::<std::result::Result<Vec<_>, _>>()?;
493 Ok(spaces)
494 }
495
496 pub fn find_space_for_collection(&self, collection: &str) -> Result<SpaceResolution> {
497 let conn = self
498 .db
499 .lock()
500 .map_err(|_| CoreError::poisoned("database"))?;
501 let mut stmt = conn.prepare(
502 "SELECT s.id, s.name, s.description, s.created
503 FROM spaces s
504 JOIN collections c ON c.space_id = s.id
505 WHERE c.name = ?1
506 ORDER BY s.name ASC",
507 )?;
508 let rows = stmt.query_map(params![collection], decode_space_row)?;
509 let matches = rows.collect::<std::result::Result<Vec<_>, _>>()?;
510
511 if matches.is_empty() {
512 return Ok(SpaceResolution::NotFound);
513 }
514
515 if matches.len() == 1 {
516 return Ok(SpaceResolution::Found(matches[0].clone()));
517 }
518
519 let spaces = matches.into_iter().map(|space| space.name).collect();
520 Ok(SpaceResolution::Ambiguous(spaces))
521 }
522
523 pub fn delete_space(&self, name: &str) -> Result<()> {
524 if name == DEFAULT_SPACE_NAME {
525 let conn = self
526 .db
527 .lock()
528 .map_err(|_| CoreError::poisoned("database"))?;
529 conn.execute(
530 "DELETE FROM collections
531 WHERE space_id = (SELECT id FROM spaces WHERE name = ?1)",
532 params![name],
533 )?;
534 drop(conn);
535
536 self.unload_space(name)?;
537 self.remove_space_artifacts(name)?;
538 self.open_space(name)?;
539 return Ok(());
540 }
541
542 let conn = self
543 .db
544 .lock()
545 .map_err(|_| CoreError::poisoned("database"))?;
546 let deleted = conn.execute("DELETE FROM spaces WHERE name = ?1", params![name])?;
547 drop(conn);
548
549 if deleted == 0 {
550 return Err(KboltError::SpaceNotFound {
551 name: name.to_string(),
552 }
553 .into());
554 }
555
556 self.unload_space(name)?;
557 self.remove_space_artifacts(name)?;
558
559 Ok(())
560 }
561
562 pub fn rename_space(&self, old: &str, new: &str) -> Result<()> {
563 if old == DEFAULT_SPACE_NAME {
564 return Err(
565 KboltError::Config("cannot rename reserved space: default".to_string()).into(),
566 );
567 }
568
569 let conn = self
570 .db
571 .lock()
572 .map_err(|_| CoreError::poisoned("database"))?;
573
574 let result = conn.execute(
575 "UPDATE spaces SET name = ?1 WHERE name = ?2",
576 params![new, old],
577 );
578 drop(conn);
579
580 match result {
581 Ok(0) => Err(KboltError::SpaceNotFound {
582 name: old.to_string(),
583 }
584 .into()),
585 Ok(_) => {
586 if let Err(rename_err) = self.rename_space_artifacts(old, new) {
587 let rollback = self
588 .db
589 .lock()
590 .map_err(|_| CoreError::poisoned("database"))?
591 .execute(
592 "UPDATE spaces SET name = ?1 WHERE name = ?2",
593 params![old, new],
594 );
595
596 if let Err(rollback_err) = rollback {
597 return Err(CoreError::Internal(format!(
598 "failed to rename space artifacts from '{old}' to '{new}': {rename_err}; rollback failed: {rollback_err}"
599 )));
600 }
601
602 return Err(rename_err);
603 }
604
605 self.unload_space(old)?;
606 if let Err(open_err) = self.open_space(new) {
607 let _ = self.rename_space_artifacts(new, old);
608 let _ = self
609 .db
610 .lock()
611 .map_err(|_| CoreError::poisoned("database"))?
612 .execute(
613 "UPDATE spaces SET name = ?1 WHERE name = ?2",
614 params![old, new],
615 );
616 let _ = self.open_space(old);
617 return Err(open_err);
618 }
619
620 Ok(())
621 }
622 Err(Error::SqliteFailure(sqlite_err, _))
623 if sqlite_err.code == ErrorCode::ConstraintViolation =>
624 {
625 Err(KboltError::SpaceAlreadyExists {
626 name: new.to_string(),
627 }
628 .into())
629 }
630 Err(err) => Err(err.into()),
631 }
632 }
633
634 pub fn update_space_description(&self, name: &str, description: &str) -> Result<()> {
635 let conn = self
636 .db
637 .lock()
638 .map_err(|_| CoreError::poisoned("database"))?;
639
640 let updated = conn.execute(
641 "UPDATE spaces SET description = ?1 WHERE name = ?2",
642 params![description, name],
643 )?;
644
645 if updated == 0 {
646 return Err(KboltError::SpaceNotFound {
647 name: name.to_string(),
648 }
649 .into());
650 }
651
652 Ok(())
653 }
654
655 pub fn create_collection(
656 &self,
657 space_id: i64,
658 name: &str,
659 path: &Path,
660 description: Option<&str>,
661 extensions: Option<&[String]>,
662 ) -> Result<i64> {
663 let conn = self
664 .db
665 .lock()
666 .map_err(|_| CoreError::poisoned("database"))?;
667
668 let space_name = lookup_space_name(&conn, space_id)?;
669 let extensions_json = serialize_extensions(extensions)?;
670 let result = conn.execute(
671 "INSERT INTO collections (space_id, name, path, description, extensions, created, updated)
672 VALUES (?1, ?2, ?3, ?4, ?5, strftime('%Y-%m-%dT%H:%M:%SZ','now'), strftime('%Y-%m-%dT%H:%M:%SZ','now'))",
673 params![space_id, name, path.to_string_lossy(), description, extensions_json],
674 );
675
676 match result {
677 Ok(_) => Ok(conn.last_insert_rowid()),
678 Err(Error::SqliteFailure(sqlite_err, _))
679 if sqlite_err.code == ErrorCode::ConstraintViolation =>
680 {
681 Err(KboltError::CollectionAlreadyExists {
682 name: name.to_string(),
683 space: space_name,
684 }
685 .into())
686 }
687 Err(err) => Err(err.into()),
688 }
689 }
690
691 pub fn get_collection(&self, space_id: i64, name: &str) -> Result<CollectionRow> {
692 let conn = self
693 .db
694 .lock()
695 .map_err(|_| CoreError::poisoned("database"))?;
696 let mut stmt = conn.prepare(
697 "SELECT id, space_id, name, path, description, extensions, created, updated
698 FROM collections
699 WHERE space_id = ?1 AND name = ?2",
700 )?;
701
702 let result = stmt.query_row(params![space_id, name], decode_collection_row);
703 match result {
704 Ok(row) => Ok(row),
705 Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
706 name: name.to_string(),
707 }
708 .into()),
709 Err(err) => Err(err.into()),
710 }
711 }
712
713 pub fn get_collection_by_id(&self, id: i64) -> Result<CollectionRow> {
714 let conn = self
715 .db
716 .lock()
717 .map_err(|_| CoreError::poisoned("database"))?;
718 let mut stmt = conn.prepare(
719 "SELECT id, space_id, name, path, description, extensions, created, updated
720 FROM collections
721 WHERE id = ?1",
722 )?;
723
724 let result = stmt.query_row(params![id], decode_collection_row);
725 match result {
726 Ok(row) => Ok(row),
727 Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
728 name: format!("id={id}"),
729 }
730 .into()),
731 Err(err) => Err(err.into()),
732 }
733 }
734
735 pub fn list_collections(&self, space_id: Option<i64>) -> Result<Vec<CollectionRow>> {
736 let conn = self
737 .db
738 .lock()
739 .map_err(|_| CoreError::poisoned("database"))?;
740
741 let (sql, params): (&str, Vec<i64>) = match space_id {
742 Some(id) => (
743 "SELECT id, space_id, name, path, description, extensions, created, updated
744 FROM collections
745 WHERE space_id = ?1
746 ORDER BY name ASC",
747 vec![id],
748 ),
749 None => (
750 "SELECT id, space_id, name, path, description, extensions, created, updated
751 FROM collections
752 ORDER BY space_id ASC, name ASC",
753 Vec::new(),
754 ),
755 };
756
757 let mut stmt = conn.prepare(sql)?;
758 let rows = if params.is_empty() {
759 stmt.query_map([], decode_collection_row)?
760 } else {
761 stmt.query_map(params![params[0]], decode_collection_row)?
762 };
763 let collections = rows.collect::<std::result::Result<Vec<_>, _>>()?;
764 Ok(collections)
765 }
766
767 pub fn count_collections_in_space(&self, space_id: i64) -> Result<usize> {
768 let conn = self
769 .db
770 .lock()
771 .map_err(|_| CoreError::poisoned("database"))?;
772 let _space_name = lookup_space_name(&conn, space_id)?;
773
774 query_count(
775 &conn,
776 "SELECT COUNT(*) FROM collections WHERE space_id = ?1",
777 params![space_id],
778 )
779 }
780
781 pub fn delete_collection(&self, space_id: i64, name: &str) -> Result<()> {
782 let conn = self
783 .db
784 .lock()
785 .map_err(|_| CoreError::poisoned("database"))?;
786 let _space_name = lookup_space_name(&conn, space_id)?;
787
788 let deleted = conn.execute(
789 "DELETE FROM collections WHERE space_id = ?1 AND name = ?2",
790 params![space_id, name],
791 )?;
792
793 if deleted == 0 {
794 return Err(KboltError::CollectionNotFound {
795 name: name.to_string(),
796 }
797 .into());
798 }
799
800 Ok(())
801 }
802
803 pub fn rename_collection(&self, space_id: i64, old: &str, new: &str) -> Result<()> {
804 let conn = self
805 .db
806 .lock()
807 .map_err(|_| CoreError::poisoned("database"))?;
808 let space_name = lookup_space_name(&conn, space_id)?;
809 let result = conn.execute(
810 "UPDATE collections
811 SET name = ?1, updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
812 WHERE space_id = ?2 AND name = ?3",
813 params![new, space_id, old],
814 );
815
816 match result {
817 Ok(0) => Err(KboltError::CollectionNotFound {
818 name: old.to_string(),
819 }
820 .into()),
821 Ok(_) => Ok(()),
822 Err(Error::SqliteFailure(sqlite_err, _))
823 if sqlite_err.code == ErrorCode::ConstraintViolation =>
824 {
825 Err(KboltError::CollectionAlreadyExists {
826 name: new.to_string(),
827 space: space_name,
828 }
829 .into())
830 }
831 Err(err) => Err(err.into()),
832 }
833 }
834
835 pub fn update_collection_description(
836 &self,
837 space_id: i64,
838 name: &str,
839 desc: &str,
840 ) -> Result<()> {
841 let conn = self
842 .db
843 .lock()
844 .map_err(|_| CoreError::poisoned("database"))?;
845 let _space_name = lookup_space_name(&conn, space_id)?;
846
847 let updated = conn.execute(
848 "UPDATE collections
849 SET description = ?1, updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
850 WHERE space_id = ?2 AND name = ?3",
851 params![desc, space_id, name],
852 )?;
853
854 if updated == 0 {
855 return Err(KboltError::CollectionNotFound {
856 name: name.to_string(),
857 }
858 .into());
859 }
860
861 Ok(())
862 }
863
864 pub fn update_collection_timestamp(&self, collection_id: i64) -> Result<()> {
865 let conn = self
866 .db
867 .lock()
868 .map_err(|_| CoreError::poisoned("database"))?;
869
870 let updated = conn.execute(
871 "UPDATE collections
872 SET updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
873 WHERE id = ?1",
874 params![collection_id],
875 )?;
876
877 if updated == 0 {
878 return Err(KboltError::CollectionNotFound {
879 name: format!("id={collection_id}"),
880 }
881 .into());
882 }
883
884 Ok(())
885 }
886
887 pub fn upsert_document(
888 &self,
889 collection_id: i64,
890 path: &str,
891 title: Option<&str>,
892 hash: &str,
893 modified: &str,
894 ) -> Result<i64> {
895 let conn = self
896 .db
897 .lock()
898 .map_err(|_| CoreError::poisoned("database"))?;
899 let _collection_name = lookup_collection_name(&conn, collection_id)?;
900
901 let id = conn.query_row(
902 "INSERT INTO documents (collection_id, path, title, hash, modified, active, deactivated_at, fts_dirty)
903 VALUES (?1, ?2, ?3, ?4, ?5, 1, NULL, 1)
904 ON CONFLICT(collection_id, path) DO UPDATE SET
905 title = excluded.title,
906 hash = excluded.hash,
907 modified = excluded.modified,
908 active = 1,
909 deactivated_at = NULL,
910 fts_dirty = 1
911 RETURNING id",
912 params![
913 collection_id,
914 path,
915 normalized_title(title).unwrap_or(""),
916 hash,
917 modified
918 ],
919 |row| row.get(0),
920 )?;
921 Ok(id)
922 }
923
924 pub fn get_document_by_path(
925 &self,
926 collection_id: i64,
927 path: &str,
928 ) -> Result<Option<DocumentRow>> {
929 let conn = self
930 .db
931 .lock()
932 .map_err(|_| CoreError::poisoned("database"))?;
933 let _collection_name = lookup_collection_name(&conn, collection_id)?;
934 let mut stmt = conn.prepare(
935 "SELECT id, collection_id, path, title, hash, modified, active, deactivated_at, fts_dirty
936 FROM documents
937 WHERE collection_id = ?1 AND path = ?2",
938 )?;
939
940 let result = stmt.query_row(params![collection_id, path], decode_document_row);
941 match result {
942 Ok(row) => Ok(Some(row)),
943 Err(Error::QueryReturnedNoRows) => Ok(None),
944 Err(err) => Err(err.into()),
945 }
946 }
947
948 pub fn get_document_by_id(&self, id: i64) -> Result<DocumentRow> {
949 let conn = self
950 .db
951 .lock()
952 .map_err(|_| CoreError::poisoned("database"))?;
953 let mut stmt = conn.prepare(
954 "SELECT id, collection_id, path, title, hash, modified, active, deactivated_at, fts_dirty
955 FROM documents
956 WHERE id = ?1",
957 )?;
958
959 let result = stmt.query_row(params![id], decode_document_row);
960 match result {
961 Ok(row) => Ok(row),
962 Err(Error::QueryReturnedNoRows) => Err(KboltError::DocumentNotFound {
963 path: format!("id={id}"),
964 }
965 .into()),
966 Err(err) => Err(err.into()),
967 }
968 }
969
970 pub fn get_documents_by_ids(&self, ids: &[i64]) -> Result<Vec<DocumentRow>> {
971 if ids.is_empty() {
972 return Ok(Vec::new());
973 }
974
975 let conn = self
976 .db
977 .lock()
978 .map_err(|_| CoreError::poisoned("database"))?;
979
980 let placeholders = vec!["?"; ids.len()].join(", ");
981 let sql = format!(
982 "SELECT id, collection_id, path, title, hash, modified, active, deactivated_at, fts_dirty
983 FROM documents
984 WHERE id IN ({placeholders})"
985 );
986 let mut stmt = conn.prepare(&sql)?;
987 let rows = stmt.query_map(params_from_iter(ids.iter()), decode_document_row)?;
988 let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
989 Ok(docs)
990 }
991
992 pub fn refresh_document_activity(&self, doc_id: i64, modified: &str) -> Result<()> {
993 let conn = self
994 .db
995 .lock()
996 .map_err(|_| CoreError::poisoned("database"))?;
997
998 let updated = conn.execute(
999 "UPDATE documents
1000 SET modified = ?1,
1001 active = 1,
1002 deactivated_at = NULL
1003 WHERE id = ?2",
1004 params![modified, doc_id],
1005 )?;
1006
1007 if updated == 0 {
1008 return Err(KboltError::DocumentNotFound {
1009 path: format!("id={doc_id}"),
1010 }
1011 .into());
1012 }
1013
1014 Ok(())
1015 }
1016
1017 pub fn list_documents(
1018 &self,
1019 collection_id: i64,
1020 active_only: bool,
1021 ) -> Result<Vec<DocumentRow>> {
1022 let conn = self
1023 .db
1024 .lock()
1025 .map_err(|_| CoreError::poisoned("database"))?;
1026 let _collection_name = lookup_collection_name(&conn, collection_id)?;
1027 let active_only = i64::from(active_only);
1028 let mut stmt = conn.prepare(
1029 "SELECT id, collection_id, path, title, hash, modified, active, deactivated_at, fts_dirty
1030 FROM documents
1031 WHERE collection_id = ?1
1032 AND (?2 = 0 OR active = 1)
1033 ORDER BY path ASC",
1034 )?;
1035 let rows = stmt.query_map(params![collection_id, active_only], decode_document_row)?;
1036 let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1037 Ok(docs)
1038 }
1039
1040 pub fn list_collection_file_rows(
1041 &self,
1042 collection_id: i64,
1043 active_only: bool,
1044 ) -> Result<Vec<FileListRow>> {
1045 let conn = self
1046 .db
1047 .lock()
1048 .map_err(|_| CoreError::poisoned("database"))?;
1049 let _collection_name = lookup_collection_name(&conn, collection_id)?;
1050 let active_only = i64::from(active_only);
1051 let mut stmt = conn.prepare(
1052 "SELECT d.id, d.path, d.title, d.hash, d.active,
1053 COUNT(DISTINCT c.id) AS chunk_count,
1054 COUNT(DISTINCT e.chunk_id) AS embedded_chunk_count
1055 FROM documents d
1056 LEFT JOIN chunks c ON c.doc_id = d.id
1057 LEFT JOIN embeddings e ON e.chunk_id = c.id
1058 WHERE d.collection_id = ?1
1059 AND (?2 = 0 OR d.active = 1)
1060 GROUP BY d.id, d.path, d.title, d.hash, d.active
1061 ORDER BY d.path ASC",
1062 )?;
1063 let rows = stmt.query_map(params![collection_id, active_only], |row| {
1064 let chunk_count: i64 = row.get(5)?;
1065 let embedded_chunk_count: i64 = row.get(6)?;
1066 Ok(FileListRow {
1067 doc_id: row.get(0)?,
1068 path: row.get(1)?,
1069 title: decode_optional_title(row.get::<_, String>(2)?),
1070 hash: row.get(3)?,
1071 active: row.get::<_, i64>(4)? != 0,
1072 chunk_count: chunk_count as usize,
1073 embedded_chunk_count: embedded_chunk_count as usize,
1074 })
1075 })?;
1076 let files = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1077 Ok(files)
1078 }
1079
1080 pub fn get_document_by_hash_prefix(&self, prefix: &str) -> Result<Vec<DocumentRow>> {
1081 let conn = self
1082 .db
1083 .lock()
1084 .map_err(|_| CoreError::poisoned("database"))?;
1085
1086 let pattern = format!("{prefix}%");
1087 let mut stmt = conn.prepare(
1088 "SELECT id, collection_id, path, title, hash, modified, active, deactivated_at, fts_dirty
1089 FROM documents
1090 WHERE hash LIKE ?1
1091 ORDER BY id ASC",
1092 )?;
1093 let rows = stmt.query_map(params![pattern], decode_document_row)?;
1094 let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1095 Ok(docs)
1096 }
1097
1098 pub fn deactivate_document(&self, doc_id: i64) -> Result<()> {
1099 let conn = self
1100 .db
1101 .lock()
1102 .map_err(|_| CoreError::poisoned("database"))?;
1103
1104 let updated = conn.execute(
1105 "UPDATE documents
1106 SET active = 0,
1107 deactivated_at = CASE
1108 WHEN active = 1 THEN strftime('%Y-%m-%dT%H:%M:%SZ','now')
1109 ELSE deactivated_at
1110 END
1111 WHERE id = ?1",
1112 params![doc_id],
1113 )?;
1114
1115 if updated == 0 {
1116 return Err(KboltError::DocumentNotFound {
1117 path: format!("id={doc_id}"),
1118 }
1119 .into());
1120 }
1121
1122 Ok(())
1123 }
1124
1125 pub fn reactivate_document(&self, doc_id: i64) -> Result<()> {
1126 let conn = self
1127 .db
1128 .lock()
1129 .map_err(|_| CoreError::poisoned("database"))?;
1130
1131 let updated = conn.execute(
1132 "UPDATE documents
1133 SET active = 1, deactivated_at = NULL
1134 WHERE id = ?1",
1135 params![doc_id],
1136 )?;
1137
1138 if updated == 0 {
1139 return Err(KboltError::DocumentNotFound {
1140 path: format!("id={doc_id}"),
1141 }
1142 .into());
1143 }
1144
1145 Ok(())
1146 }
1147
1148 pub fn reap_documents(&self, older_than_days: u32) -> Result<Vec<i64>> {
1149 let reaped = self.list_reapable_documents(older_than_days)?;
1150 let doc_ids = reaped.iter().map(|item| item.doc_id).collect::<Vec<_>>();
1151 self.delete_documents(&doc_ids)?;
1152 Ok(doc_ids)
1153 }
1154
1155 pub fn list_reapable_documents(&self, older_than_days: u32) -> Result<Vec<ReapableDocument>> {
1156 self.list_reapable_documents_filtered(older_than_days, "", Vec::new())
1157 }
1158
1159 pub fn list_reapable_documents_in_space(
1160 &self,
1161 older_than_days: u32,
1162 space_id: i64,
1163 ) -> Result<Vec<ReapableDocument>> {
1164 self.list_reapable_documents_filtered(
1165 older_than_days,
1166 " AND c.space_id = ?",
1167 vec![SqlValue::Integer(space_id)],
1168 )
1169 }
1170
1171 pub fn list_reapable_documents_in_collections(
1172 &self,
1173 older_than_days: u32,
1174 collection_ids: &[i64],
1175 ) -> Result<Vec<ReapableDocument>> {
1176 if collection_ids.is_empty() {
1177 return Ok(Vec::new());
1178 }
1179
1180 let placeholders = vec!["?"; collection_ids.len()].join(", ");
1181 let clause = format!(" AND d.collection_id IN ({placeholders})");
1182 let params = collection_ids
1183 .iter()
1184 .map(|id| SqlValue::Integer(*id))
1185 .collect::<Vec<_>>();
1186 self.list_reapable_documents_filtered(older_than_days, &clause, params)
1187 }
1188
1189 fn list_reapable_documents_filtered(
1190 &self,
1191 older_than_days: u32,
1192 scope_clause: &str,
1193 scope_params: Vec<SqlValue>,
1194 ) -> Result<Vec<ReapableDocument>> {
1195 let conn = self
1196 .db
1197 .lock()
1198 .map_err(|_| CoreError::poisoned("database"))?;
1199
1200 let modifier = format!("-{} days", older_than_days);
1201 let sql = format!(
1202 "SELECT d.id, s.name
1203 FROM documents d
1204 JOIN collections c ON c.id = d.collection_id
1205 JOIN spaces s ON s.id = c.space_id
1206 WHERE d.active = 0
1207 AND d.deactivated_at IS NOT NULL
1208 AND d.deactivated_at <= strftime('%Y-%m-%dT%H:%M:%SZ', 'now', ?){scope_clause}
1209 ORDER BY d.id ASC"
1210 );
1211 let mut stmt = conn.prepare(&sql)?;
1212 let mut params = Vec::with_capacity(scope_params.len() + 1);
1213 params.push(SqlValue::Text(modifier));
1214 params.extend(scope_params);
1215 let rows = stmt.query_map(params_from_iter(params.iter()), |row| {
1216 Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1217 })?;
1218 let headers = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1219 drop(stmt);
1220
1221 let mut documents = Vec::with_capacity(headers.len());
1222 for (doc_id, space_name) in headers {
1223 documents.push(ReapableDocument {
1224 doc_id,
1225 space_name,
1226 chunk_ids: load_chunk_ids_for_doc(&conn, doc_id)?,
1227 });
1228 }
1229
1230 Ok(documents)
1231 }
1232
1233 pub fn delete_documents(&self, doc_ids: &[i64]) -> Result<()> {
1234 if doc_ids.is_empty() {
1235 return Ok(());
1236 }
1237
1238 let conn = self
1239 .db
1240 .lock()
1241 .map_err(|_| CoreError::poisoned("database"))?;
1242
1243 let placeholders = vec!["?"; doc_ids.len()].join(", ");
1244 let sql = format!("DELETE FROM documents WHERE id IN ({placeholders})");
1245 conn.execute(&sql, params_from_iter(doc_ids.iter()))?;
1246 Ok(())
1247 }
1248
1249 pub fn insert_chunks(&self, doc_id: i64, chunks: &[ChunkInsert]) -> Result<Vec<i64>> {
1250 if chunks.is_empty() {
1251 return Ok(Vec::new());
1252 }
1253
1254 let conn = self
1255 .db
1256 .lock()
1257 .map_err(|_| CoreError::poisoned("database"))?;
1258 let _doc = lookup_document_id(&conn, doc_id)?;
1259
1260 let tx = conn.unchecked_transaction()?;
1261 let mut stmt = tx.prepare(
1262 "INSERT INTO chunks (doc_id, seq, offset, length, heading, kind, retrieval_prefix)
1263 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1264 )?;
1265
1266 let mut ids = Vec::with_capacity(chunks.len());
1267 for chunk in chunks {
1268 stmt.execute(params![
1269 doc_id,
1270 chunk.seq,
1271 chunk.offset as i64,
1272 chunk.length as i64,
1273 chunk.heading,
1274 chunk.kind.as_storage_kind(),
1275 chunk.retrieval_prefix.as_deref(),
1276 ])?;
1277 ids.push(tx.last_insert_rowid());
1278 }
1279 drop(stmt);
1280 tx.commit()?;
1281 Ok(ids)
1282 }
1283
1284 pub fn delete_chunks_for_document(&self, doc_id: i64) -> Result<Vec<i64>> {
1285 let conn = self
1286 .db
1287 .lock()
1288 .map_err(|_| CoreError::poisoned("database"))?;
1289 let _doc = lookup_document_id(&conn, doc_id)?;
1290
1291 let mut stmt = conn.prepare("SELECT id FROM chunks WHERE doc_id = ?1 ORDER BY seq ASC")?;
1292 let rows = stmt.query_map(params![doc_id], |row| row.get::<_, i64>(0))?;
1293 let chunk_ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1294
1295 conn.execute("DELETE FROM chunks WHERE doc_id = ?1", params![doc_id])?;
1296 Ok(chunk_ids)
1297 }
1298
1299 pub fn get_chunks_for_document(&self, doc_id: i64) -> Result<Vec<ChunkRow>> {
1300 let conn = self
1301 .db
1302 .lock()
1303 .map_err(|_| CoreError::poisoned("database"))?;
1304 let _doc = lookup_document_id(&conn, doc_id)?;
1305
1306 let mut stmt = conn.prepare(
1307 "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1308 FROM chunks
1309 WHERE doc_id = ?1
1310 ORDER BY seq ASC",
1311 )?;
1312 let rows = stmt.query_map(params![doc_id], decode_chunk_row)?;
1313 let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1314 Ok(chunks)
1315 }
1316
1317 pub fn get_chunks_for_documents(&self, doc_ids: &[i64]) -> Result<HashMap<i64, Vec<ChunkRow>>> {
1318 if doc_ids.is_empty() {
1319 return Ok(HashMap::new());
1320 }
1321
1322 let mut requested = doc_ids.to_vec();
1323 requested.sort_unstable();
1324 requested.dedup();
1325
1326 let conn = self
1327 .db
1328 .lock()
1329 .map_err(|_| CoreError::poisoned("database"))?;
1330
1331 let placeholders = vec!["?"; requested.len()].join(", ");
1332 let sql = format!(
1333 "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1334 FROM chunks
1335 WHERE doc_id IN ({placeholders})
1336 ORDER BY doc_id ASC, seq ASC"
1337 );
1338 let mut stmt = conn.prepare(&sql)?;
1339 let rows = stmt.query_map(params_from_iter(requested.iter()), decode_chunk_row)?;
1340 let mut chunks_by_doc: HashMap<i64, Vec<ChunkRow>> = HashMap::new();
1341 for doc_id in requested {
1342 chunks_by_doc.insert(doc_id, Vec::new());
1343 }
1344 for row in rows {
1345 let chunk = row?;
1346 chunks_by_doc.entry(chunk.doc_id).or_default().push(chunk);
1347 }
1348
1349 Ok(chunks_by_doc)
1350 }
1351
1352 pub fn get_chunks_for_document_seq_ranges(
1353 &self,
1354 ranges: &[(i64, i32, i32)],
1355 ) -> Result<HashMap<i64, Vec<ChunkRow>>> {
1356 if ranges.is_empty() {
1357 return Ok(HashMap::new());
1358 }
1359
1360 let mut ranges_by_doc: HashMap<i64, Vec<(i32, i32)>> = HashMap::new();
1361 for (doc_id, min_seq, max_seq) in ranges {
1362 if min_seq > max_seq {
1363 return Err(CoreError::Internal(format!(
1364 "chunk seq range min must be <= max for doc {doc_id}: {min_seq} > {max_seq}"
1365 )));
1366 }
1367
1368 ranges_by_doc
1369 .entry(*doc_id)
1370 .or_default()
1371 .push((*min_seq, *max_seq));
1372 }
1373
1374 for doc_ranges in ranges_by_doc.values_mut() {
1375 doc_ranges.sort_unstable();
1376 let mut merged: Vec<(i32, i32)> = Vec::with_capacity(doc_ranges.len());
1377 for (min_seq, max_seq) in doc_ranges.drain(..) {
1378 if let Some((_, merged_max)) = merged.last_mut() {
1379 if min_seq <= merged_max.saturating_add(1) {
1380 *merged_max = (*merged_max).max(max_seq);
1381 continue;
1382 }
1383 }
1384 merged.push((min_seq, max_seq));
1385 }
1386 *doc_ranges = merged;
1387 }
1388
1389 let conn = self
1390 .db
1391 .lock()
1392 .map_err(|_| CoreError::poisoned("database"))?;
1393 let mut stmt = conn.prepare(
1394 "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1395 FROM chunks
1396 WHERE doc_id = ?1 AND seq BETWEEN ?2 AND ?3
1397 ORDER BY seq ASC",
1398 )?;
1399
1400 let mut chunks_by_doc: HashMap<i64, Vec<ChunkRow>> = HashMap::new();
1401 for (doc_id, doc_ranges) in ranges_by_doc {
1402 chunks_by_doc.entry(doc_id).or_default();
1403 for (min_seq, max_seq) in doc_ranges {
1404 let rows = stmt.query_map(params![doc_id, min_seq, max_seq], decode_chunk_row)?;
1405 for row in rows {
1406 chunks_by_doc.entry(doc_id).or_default().push(row?);
1407 }
1408 }
1409 }
1410
1411 Ok(chunks_by_doc)
1412 }
1413
1414 pub fn get_chunks(&self, chunk_ids: &[i64]) -> Result<Vec<ChunkRow>> {
1415 if chunk_ids.is_empty() {
1416 return Ok(Vec::new());
1417 }
1418
1419 let conn = self
1420 .db
1421 .lock()
1422 .map_err(|_| CoreError::poisoned("database"))?;
1423
1424 let placeholders = vec!["?"; chunk_ids.len()].join(", ");
1425 let sql = format!(
1426 "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1427 FROM chunks
1428 WHERE id IN ({placeholders})
1429 ORDER BY id ASC"
1430 );
1431 let mut stmt = conn.prepare(&sql)?;
1432 let rows = stmt.query_map(params_from_iter(chunk_ids.iter()), decode_chunk_row)?;
1433 let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1434 Ok(chunks)
1435 }
1436
1437 pub fn get_chunks_with_documents(
1438 &self,
1439 chunk_ids: &[i64],
1440 ) -> Result<Vec<(ChunkRow, DocumentRow)>> {
1441 if chunk_ids.is_empty() {
1442 return Ok(Vec::new());
1443 }
1444
1445 let conn = self
1446 .db
1447 .lock()
1448 .map_err(|_| CoreError::poisoned("database"))?;
1449
1450 let placeholders = vec!["?"; chunk_ids.len()].join(", ");
1451 let sql = format!(
1452 "SELECT c.id, c.doc_id, c.seq, c.offset, c.length, c.heading, c.kind, c.retrieval_prefix,
1453 d.id, d.collection_id, d.path, d.title, d.hash, d.modified, d.active, d.deactivated_at, d.fts_dirty
1454 FROM chunks c
1455 JOIN documents d ON d.id = c.doc_id
1456 WHERE c.id IN ({placeholders})
1457 ORDER BY c.id ASC"
1458 );
1459 let mut stmt = conn.prepare(&sql)?;
1460 let rows = stmt.query_map(params_from_iter(chunk_ids.iter()), |row| {
1461 Ok((
1462 decode_chunk_row_at(row, 0)?,
1463 decode_document_row_at(row, 8)?,
1464 ))
1465 })?;
1466 let rows = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1467 Ok(rows)
1468 }
1469
1470 pub fn get_active_search_scope_summary_in_collections(
1471 &self,
1472 collection_ids: &[i64],
1473 ) -> Result<SearchScopeSummary> {
1474 if collection_ids.is_empty() {
1475 return Ok(SearchScopeSummary {
1476 document_ids: Vec::new(),
1477 chunk_count: 0,
1478 });
1479 }
1480
1481 let conn = self
1482 .db
1483 .lock()
1484 .map_err(|_| CoreError::poisoned("database"))?;
1485
1486 let placeholders = vec!["?"; collection_ids.len()].join(", ");
1487 let sql = format!(
1488 "SELECT d.id, COUNT(c.id)
1489 FROM chunks c
1490 JOIN documents d ON d.id = c.doc_id
1491 WHERE d.active = 1
1492 AND d.collection_id IN ({placeholders})
1493 GROUP BY d.id
1494 ORDER BY d.id ASC"
1495 );
1496 let mut stmt = conn.prepare(&sql)?;
1497 let rows = stmt.query_map(params_from_iter(collection_ids.iter()), |row| {
1498 Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
1499 })?;
1500
1501 let mut document_ids = Vec::new();
1502 let mut chunk_count = 0usize;
1503 for row in rows {
1504 let (doc_id, doc_chunk_count) = row?;
1505 let doc_chunk_count = usize::try_from(doc_chunk_count).map_err(|_| {
1506 CoreError::Internal(format!(
1507 "active chunk count must be non-negative for document {doc_id}"
1508 ))
1509 })?;
1510 document_ids.push(doc_id);
1511 chunk_count = chunk_count.saturating_add(doc_chunk_count);
1512 }
1513
1514 Ok(SearchScopeSummary {
1515 document_ids,
1516 chunk_count,
1517 })
1518 }
1519
1520 pub fn count_active_chunks_in_collections(&self, collection_ids: &[i64]) -> Result<usize> {
1521 if collection_ids.is_empty() {
1522 return Ok(0);
1523 }
1524
1525 let conn = self
1526 .db
1527 .lock()
1528 .map_err(|_| CoreError::poisoned("database"))?;
1529
1530 let placeholders = vec!["?"; collection_ids.len()].join(", ");
1531 let sql = format!(
1532 "SELECT COUNT(*)
1533 FROM chunks c
1534 JOIN documents d ON d.id = c.doc_id
1535 WHERE d.active = 1
1536 AND d.collection_id IN ({placeholders})"
1537 );
1538 query_count(&conn, &sql, params_from_iter(collection_ids.iter()))
1539 }
1540
1541 pub fn has_inactive_documents_in_collections(&self, collection_ids: &[i64]) -> Result<bool> {
1542 if collection_ids.is_empty() {
1543 return Ok(false);
1544 }
1545
1546 let conn = self
1547 .db
1548 .lock()
1549 .map_err(|_| CoreError::poisoned("database"))?;
1550
1551 let placeholders = vec!["?"; collection_ids.len()].join(", ");
1552 let sql = format!(
1553 "SELECT EXISTS(
1554 SELECT 1
1555 FROM documents
1556 WHERE active = 0
1557 AND collection_id IN ({placeholders})
1558 )"
1559 );
1560 let exists: i64 = conn.query_row(&sql, params_from_iter(collection_ids.iter()), |row| {
1561 row.get(0)
1562 })?;
1563 Ok(exists != 0)
1564 }
1565
1566 pub fn get_active_chunk_ids_in_collections(&self, collection_ids: &[i64]) -> Result<Vec<i64>> {
1567 if collection_ids.is_empty() {
1568 return Ok(Vec::new());
1569 }
1570
1571 let conn = self
1572 .db
1573 .lock()
1574 .map_err(|_| CoreError::poisoned("database"))?;
1575
1576 let placeholders = vec!["?"; collection_ids.len()].join(", ");
1577 let sql = format!(
1578 "SELECT c.id
1579 FROM chunks c
1580 JOIN documents d ON d.id = c.doc_id
1581 WHERE d.active = 1
1582 AND d.collection_id IN ({placeholders})
1583 ORDER BY d.id ASC, c.seq ASC"
1584 );
1585 let mut stmt = conn.prepare(&sql)?;
1586 let rows = stmt.query_map(params_from_iter(collection_ids.iter()), |row| row.get(0))?;
1587 let chunk_ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1588 Ok(chunk_ids)
1589 }
1590
1591 pub fn replace_document_generation(
1592 &self,
1593 replacement: DocumentGenerationReplace<'_>,
1594 ) -> Result<DocumentGenerationReplaceResult> {
1595 for chunk in replacement.chunks {
1596 validate_text_span(
1597 replacement.text,
1598 chunk.offset,
1599 chunk.length,
1600 &format!("chunk seq {}", chunk.seq),
1601 )?;
1602 }
1603
1604 let conn = self
1605 .db
1606 .lock()
1607 .map_err(|_| CoreError::poisoned("database"))?;
1608 let _collection_name = lookup_collection_name(&conn, replacement.collection_id)?;
1609
1610 let tx = conn.unchecked_transaction()?;
1611 let doc_id: i64 = tx.query_row(
1612 "INSERT INTO documents (collection_id, path, title, hash, modified, active, deactivated_at, fts_dirty)
1613 VALUES (?1, ?2, ?3, ?4, ?5, 1, NULL, 1)
1614 ON CONFLICT(collection_id, path) DO UPDATE SET
1615 title = excluded.title,
1616 hash = excluded.hash,
1617 modified = excluded.modified,
1618 active = 1,
1619 deactivated_at = NULL,
1620 fts_dirty = 1
1621 RETURNING id",
1622 params![
1623 replacement.collection_id,
1624 replacement.path,
1625 normalized_title(replacement.title).unwrap_or(""),
1626 replacement.hash,
1627 replacement.modified
1628 ],
1629 |row| row.get(0),
1630 )?;
1631
1632 let old_chunk_ids = {
1633 let mut stmt =
1634 tx.prepare("SELECT id FROM chunks WHERE doc_id = ?1 ORDER BY seq ASC")?;
1635 let rows = stmt.query_map(params![doc_id], |row| row.get::<_, i64>(0))?;
1636 rows.collect::<std::result::Result<Vec<_>, _>>()?
1637 };
1638
1639 tx.execute(
1640 "INSERT INTO document_texts (doc_id, extractor_key, source_hash, text_hash, generation_key, text, created)
1641 VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%Y-%m-%dT%H:%M:%SZ','now'))
1642 ON CONFLICT(doc_id) DO UPDATE SET
1643 extractor_key = excluded.extractor_key,
1644 source_hash = excluded.source_hash,
1645 text_hash = excluded.text_hash,
1646 generation_key = excluded.generation_key,
1647 text = excluded.text,
1648 created = excluded.created",
1649 params![
1650 doc_id,
1651 replacement.extractor_key,
1652 replacement.source_hash,
1653 replacement.text_hash,
1654 replacement.generation_key,
1655 replacement.text
1656 ],
1657 )?;
1658
1659 tx.execute("DELETE FROM chunks WHERE doc_id = ?1", params![doc_id])?;
1660
1661 let mut stmt = tx.prepare(
1662 "INSERT INTO chunks (doc_id, seq, offset, length, heading, kind, retrieval_prefix)
1663 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1664 )?;
1665 let mut chunk_ids = Vec::with_capacity(replacement.chunks.len());
1666 for chunk in replacement.chunks {
1667 stmt.execute(params![
1668 doc_id,
1669 chunk.seq,
1670 chunk.offset as i64,
1671 chunk.length as i64,
1672 chunk.heading,
1673 chunk.kind.as_storage_kind(),
1674 chunk.retrieval_prefix.as_deref(),
1675 ])?;
1676 chunk_ids.push(tx.last_insert_rowid());
1677 }
1678 drop(stmt);
1679
1680 tx.commit()?;
1681 Ok(DocumentGenerationReplaceResult {
1682 doc_id,
1683 old_chunk_ids,
1684 chunk_ids,
1685 })
1686 }
1687
1688 pub fn put_document_text(
1689 &self,
1690 doc_id: i64,
1691 extractor_key: &str,
1692 source_hash: &str,
1693 text_hash: &str,
1694 generation_key: &str,
1695 text: &str,
1696 ) -> Result<()> {
1697 let conn = self
1698 .db
1699 .lock()
1700 .map_err(|_| CoreError::poisoned("database"))?;
1701 let _doc = lookup_document_id(&conn, doc_id)?;
1702
1703 conn.execute(
1704 "INSERT INTO document_texts (doc_id, extractor_key, source_hash, text_hash, generation_key, text, created)
1705 VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%Y-%m-%dT%H:%M:%SZ','now'))
1706 ON CONFLICT(doc_id) DO UPDATE SET
1707 extractor_key = excluded.extractor_key,
1708 source_hash = excluded.source_hash,
1709 text_hash = excluded.text_hash,
1710 generation_key = excluded.generation_key,
1711 text = excluded.text,
1712 created = excluded.created",
1713 params![
1714 doc_id,
1715 extractor_key,
1716 source_hash,
1717 text_hash,
1718 generation_key,
1719 text
1720 ],
1721 )?;
1722 Ok(())
1723 }
1724
1725 pub fn get_document_text(&self, doc_id: i64) -> Result<DocumentTextRow> {
1726 let conn = self
1727 .db
1728 .lock()
1729 .map_err(|_| CoreError::poisoned("database"))?;
1730 let _doc = lookup_document_id(&conn, doc_id)?;
1731
1732 let result = conn.query_row(
1733 "SELECT doc_id, extractor_key, source_hash, text_hash, generation_key, text, created
1734 FROM document_texts
1735 WHERE doc_id = ?1",
1736 params![doc_id],
1737 decode_document_text_row,
1738 );
1739 match result {
1740 Ok(row) => Ok(row),
1741 Err(Error::QueryReturnedNoRows) => Err(missing_document_text_error(doc_id)),
1742 Err(err) => Err(err.into()),
1743 }
1744 }
1745
1746 pub fn get_document_texts(&self, doc_ids: &[i64]) -> Result<HashMap<i64, DocumentTextRow>> {
1747 if doc_ids.is_empty() {
1748 return Ok(HashMap::new());
1749 }
1750
1751 let mut requested = doc_ids.to_vec();
1752 requested.sort_unstable();
1753 requested.dedup();
1754
1755 let text_by_doc = self.get_existing_document_texts(&requested)?;
1756 for doc_id in requested {
1757 if !text_by_doc.contains_key(&doc_id) {
1758 return Err(missing_document_text_error(doc_id));
1759 }
1760 }
1761
1762 Ok(text_by_doc)
1763 }
1764
1765 pub fn get_existing_document_texts(
1766 &self,
1767 doc_ids: &[i64],
1768 ) -> Result<HashMap<i64, DocumentTextRow>> {
1769 if doc_ids.is_empty() {
1770 return Ok(HashMap::new());
1771 }
1772
1773 let mut requested = doc_ids.to_vec();
1774 requested.sort_unstable();
1775 requested.dedup();
1776
1777 let conn = self
1778 .db
1779 .lock()
1780 .map_err(|_| CoreError::poisoned("database"))?;
1781 let mut text_by_doc = HashMap::new();
1782 for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1783 let placeholders = vec!["?"; batch.len()].join(", ");
1784 let sql = format!(
1785 "SELECT doc_id, extractor_key, source_hash, text_hash, generation_key, text, created
1786 FROM document_texts
1787 WHERE doc_id IN ({placeholders})
1788 ORDER BY doc_id ASC"
1789 );
1790 let mut stmt = conn.prepare(&sql)?;
1791 let rows = stmt.query_map(params_from_iter(batch.iter()), decode_document_text_row)?;
1792 for row in rows {
1793 let row = row?;
1794 text_by_doc.insert(row.doc_id, row);
1795 }
1796 }
1797
1798 Ok(text_by_doc)
1799 }
1800
1801 pub fn has_document_text(&self, doc_id: i64) -> Result<bool> {
1802 let conn = self
1803 .db
1804 .lock()
1805 .map_err(|_| CoreError::poisoned("database"))?;
1806 let exists: i64 = conn.query_row(
1807 "SELECT EXISTS(SELECT 1 FROM document_texts WHERE doc_id = ?1)",
1808 params![doc_id],
1809 |row| row.get(0),
1810 )?;
1811 Ok(exists != 0)
1812 }
1813
1814 pub fn has_current_document_text(&self, doc_id: i64, generation_key: &str) -> Result<bool> {
1815 let conn = self
1816 .db
1817 .lock()
1818 .map_err(|_| CoreError::poisoned("database"))?;
1819 let exists: i64 = conn.query_row(
1820 "SELECT EXISTS(
1821 SELECT 1 FROM document_texts
1822 WHERE doc_id = ?1 AND generation_key = ?2
1823 )",
1824 params![doc_id, generation_key],
1825 |row| row.get(0),
1826 )?;
1827 Ok(exists != 0)
1828 }
1829
1830 pub fn get_document_text_generation_keys(
1831 &self,
1832 doc_ids: &[i64],
1833 ) -> Result<HashMap<i64, String>> {
1834 if doc_ids.is_empty() {
1835 return Ok(HashMap::new());
1836 }
1837
1838 let mut requested = doc_ids.to_vec();
1839 requested.sort_unstable();
1840 requested.dedup();
1841
1842 let conn = self
1843 .db
1844 .lock()
1845 .map_err(|_| CoreError::poisoned("database"))?;
1846 let mut generation_by_doc = HashMap::with_capacity(requested.len());
1847 for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1848 let placeholders = vec!["?"; batch.len()].join(", ");
1849 let sql = format!(
1850 "SELECT doc_id, generation_key
1851 FROM document_texts
1852 WHERE doc_id IN ({placeholders})"
1853 );
1854 let mut stmt = conn.prepare(&sql)?;
1855 let rows = stmt.query_map(params_from_iter(batch.iter()), |row| {
1856 Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1857 })?;
1858 for row in rows {
1859 let (doc_id, generation_key) = row?;
1860 generation_by_doc.insert(doc_id, generation_key);
1861 }
1862 }
1863
1864 Ok(generation_by_doc)
1865 }
1866
1867 pub fn get_document_text_extractors(&self, doc_ids: &[i64]) -> Result<HashMap<i64, String>> {
1868 if doc_ids.is_empty() {
1869 return Ok(HashMap::new());
1870 }
1871
1872 let mut requested = doc_ids.to_vec();
1873 requested.sort_unstable();
1874 requested.dedup();
1875
1876 let conn = self
1877 .db
1878 .lock()
1879 .map_err(|_| CoreError::poisoned("database"))?;
1880 let mut extractor_by_doc = HashMap::with_capacity(requested.len());
1881 for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1882 let placeholders = vec!["?"; batch.len()].join(", ");
1883 let sql = format!(
1884 "SELECT doc_id, extractor_key
1885 FROM document_texts
1886 WHERE doc_id IN ({placeholders})"
1887 );
1888 let mut stmt = conn.prepare(&sql)?;
1889 let rows = stmt.query_map(params_from_iter(batch.iter()), |row| {
1890 Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1891 })?;
1892 for row in rows {
1893 let (doc_id, extractor_key) = row?;
1894 extractor_by_doc.insert(doc_id, extractor_key);
1895 }
1896 }
1897
1898 for doc_id in requested {
1899 if !extractor_by_doc.contains_key(&doc_id) {
1900 return Err(missing_document_text_error(doc_id));
1901 }
1902 }
1903
1904 Ok(extractor_by_doc)
1905 }
1906
1907 pub fn get_canonical_chunk_texts(&self, chunk_ids: &[i64]) -> Result<HashMap<i64, String>> {
1908 if chunk_ids.is_empty() {
1909 return Ok(HashMap::new());
1910 }
1911
1912 let mut requested = chunk_ids.to_vec();
1913 requested.sort_unstable();
1914 requested.dedup();
1915
1916 let conn = self
1917 .db
1918 .lock()
1919 .map_err(|_| CoreError::poisoned("database"))?;
1920 let mut text_by_chunk = HashMap::with_capacity(requested.len());
1921 for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1922 let placeholders = vec!["?"; batch.len()].join(", ");
1923 let sql = format!(
1924 "SELECT c.id,
1925 c.offset,
1926 c.length,
1927 length(CAST(dt.text AS BLOB)),
1928 substr(CAST(dt.text AS BLOB), c.offset + 1, 1),
1929 substr(CAST(dt.text AS BLOB), c.offset + c.length + 1, 1),
1930 substr(CAST(dt.text AS BLOB), c.offset + 1, c.length)
1931 FROM chunks c
1932 JOIN document_texts dt ON dt.doc_id = c.doc_id
1933 WHERE c.id IN ({placeholders})"
1934 );
1935 let mut stmt = conn.prepare(&sql)?;
1936 let rows = stmt.query_map(params_from_iter(batch.iter()), |row| {
1937 Ok((
1938 row.get::<_, i64>(0)?,
1939 row.get::<_, i64>(1)?,
1940 row.get::<_, i64>(2)?,
1941 row.get::<_, i64>(3)?,
1942 row.get::<_, Vec<u8>>(4)?,
1943 row.get::<_, Vec<u8>>(5)?,
1944 row.get::<_, Vec<u8>>(6)?,
1945 ))
1946 })?;
1947 for row in rows {
1948 let (chunk_id, offset, length, document_len, start_byte, end_byte, bytes) = row?;
1949 let text = canonical_chunk_slice_from_bytes(
1950 chunk_id,
1951 offset,
1952 length,
1953 document_len,
1954 start_byte,
1955 end_byte,
1956 bytes,
1957 )?;
1958 text_by_chunk.insert(chunk_id, text);
1959 }
1960 }
1961
1962 for chunk_id in requested {
1963 if !text_by_chunk.contains_key(&chunk_id) {
1964 return Err(CoreError::Internal(format!(
1965 "canonical text missing for chunk {chunk_id}"
1966 )));
1967 }
1968 }
1969
1970 Ok(text_by_chunk)
1971 }
1972
1973 pub fn get_chunk_text(&self, chunk_id: i64) -> Result<ChunkTextRow> {
1974 let conn = self
1975 .db
1976 .lock()
1977 .map_err(|_| CoreError::poisoned("database"))?;
1978 let result = conn.query_row(
1979 "SELECT c.id, c.doc_id, c.seq, c.offset, c.length, c.heading, c.kind, c.retrieval_prefix, dt.extractor_key, dt.text
1980 FROM chunks c
1981 JOIN document_texts dt ON dt.doc_id = c.doc_id
1982 WHERE c.id = ?1",
1983 params![chunk_id],
1984 |row| {
1985 let chunk = decode_chunk_row(row)?;
1986 let extractor_key: String = row.get(8)?;
1987 let document_text: String = row.get(9)?;
1988 Ok((chunk, extractor_key, document_text))
1989 },
1990 );
1991 let (chunk, extractor_key, document_text) = match result {
1992 Ok(row) => row,
1993 Err(Error::QueryReturnedNoRows) => {
1994 let doc_id = lookup_chunk_doc_id(&conn, chunk_id)?;
1995 return Err(missing_document_text_error(doc_id));
1996 }
1997 Err(err) => return Err(err.into()),
1998 };
1999 let text = chunk_text_from_canonical(&document_text, &chunk)?;
2000 Ok(ChunkTextRow {
2001 chunk,
2002 extractor_key,
2003 text,
2004 })
2005 }
2006
2007 pub fn insert_embeddings(&self, entries: &[(i64, &str)]) -> Result<()> {
2008 if entries.is_empty() {
2009 return Ok(());
2010 }
2011
2012 let conn = self
2013 .db
2014 .lock()
2015 .map_err(|_| CoreError::poisoned("database"))?;
2016
2017 let tx = conn.unchecked_transaction()?;
2018 let mut stmt = tx.prepare(
2019 "INSERT INTO embeddings (chunk_id, model, embedded_at)
2020 VALUES (?1, ?2, strftime('%Y-%m-%dT%H:%M:%SZ','now'))
2021 ON CONFLICT(chunk_id, model) DO UPDATE SET
2022 embedded_at = excluded.embedded_at",
2023 )?;
2024
2025 for (chunk_id, model) in entries {
2026 stmt.execute(params![chunk_id, model])?;
2027 }
2028
2029 drop(stmt);
2030 tx.commit()?;
2031 Ok(())
2032 }
2033
2034 pub fn get_unembedded_chunks(
2035 &self,
2036 model: &str,
2037 after_chunk_id: i64,
2038 limit: usize,
2039 ) -> Result<Vec<EmbedRecord>> {
2040 self.get_unembedded_chunks_filtered(model, after_chunk_id, limit, "", Vec::new())
2041 }
2042
2043 pub fn get_unembedded_chunks_in_space(
2044 &self,
2045 model: &str,
2046 space_id: i64,
2047 after_chunk_id: i64,
2048 limit: usize,
2049 ) -> Result<Vec<EmbedRecord>> {
2050 self.get_unembedded_chunks_filtered(
2051 model,
2052 after_chunk_id,
2053 limit,
2054 " AND col.space_id = ?",
2055 vec![SqlValue::Integer(space_id)],
2056 )
2057 }
2058
2059 pub fn get_unembedded_chunks_in_collections(
2060 &self,
2061 model: &str,
2062 collection_ids: &[i64],
2063 after_chunk_id: i64,
2064 limit: usize,
2065 ) -> Result<Vec<EmbedRecord>> {
2066 if collection_ids.is_empty() {
2067 return Ok(Vec::new());
2068 }
2069
2070 let placeholders = vec!["?"; collection_ids.len()].join(", ");
2071 let clause = format!(" AND d.collection_id IN ({placeholders})");
2072 let params = collection_ids
2073 .iter()
2074 .map(|id| SqlValue::Integer(*id))
2075 .collect::<Vec<_>>();
2076 self.get_unembedded_chunks_filtered(model, after_chunk_id, limit, &clause, params)
2077 }
2078
2079 fn get_unembedded_chunks_filtered(
2080 &self,
2081 model: &str,
2082 after_chunk_id: i64,
2083 limit: usize,
2084 scope_clause: &str,
2085 scope_params: Vec<SqlValue>,
2086 ) -> Result<Vec<EmbedRecord>> {
2087 let conn = self
2088 .db
2089 .lock()
2090 .map_err(|_| CoreError::poisoned("database"))?;
2091 let sql_limit = i64::try_from(limit)
2092 .map_err(|_| CoreError::Internal("limit too large for sqlite".to_string()))?;
2093
2094 let sql = format!(
2095 "SELECT c.id, c.doc_id, c.seq, c.offset, c.length, c.heading, c.kind, c.retrieval_prefix,
2096 d.path, d.title, col.path, s.name
2097 FROM chunks c
2098 JOIN documents d ON d.id = c.doc_id
2099 JOIN collections col ON col.id = d.collection_id
2100 JOIN spaces s ON s.id = col.space_id
2101 LEFT JOIN embeddings e ON e.chunk_id = c.id AND e.model = ?
2102 WHERE d.active = 1 AND e.chunk_id IS NULL AND c.id > ?{scope_clause}
2103 ORDER BY c.id ASC
2104 LIMIT ?"
2105 );
2106 let mut stmt = conn.prepare(&sql)?;
2107 let mut params = Vec::with_capacity(scope_params.len() + 3);
2108 params.push(SqlValue::Text(model.to_string()));
2109 params.push(SqlValue::Integer(after_chunk_id));
2110 params.extend(scope_params);
2111 params.push(SqlValue::Integer(sql_limit));
2112 let rows = stmt.query_map(params_from_iter(params.iter()), |row| {
2113 Ok(EmbedRecord {
2114 chunk: decode_chunk_row(row)?,
2115 doc_path: row.get(8)?,
2116 doc_title: decode_optional_title(row.get::<_, String>(9)?),
2117 collection_path: PathBuf::from(row.get::<_, String>(10)?),
2118 space_name: row.get(11)?,
2119 })
2120 })?;
2121
2122 let records = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2123 Ok(records)
2124 }
2125
2126 pub fn delete_embeddings_for_model(&self, model: &str) -> Result<usize> {
2127 let conn = self
2128 .db
2129 .lock()
2130 .map_err(|_| CoreError::poisoned("database"))?;
2131
2132 let deleted = conn.execute("DELETE FROM embeddings WHERE model = ?1", params![model])?;
2133 Ok(deleted)
2134 }
2135
2136 pub fn delete_embeddings_for_space(&self, space_id: i64) -> Result<usize> {
2137 let conn = self
2138 .db
2139 .lock()
2140 .map_err(|_| CoreError::poisoned("database"))?;
2141 let _space_name = lookup_space_name(&conn, space_id)?;
2142
2143 let deleted = conn.execute(
2144 "DELETE FROM embeddings
2145 WHERE chunk_id IN (
2146 SELECT c.id
2147 FROM chunks c
2148 JOIN documents d ON d.id = c.doc_id
2149 JOIN collections col ON col.id = d.collection_id
2150 WHERE col.space_id = ?1
2151 )",
2152 params![space_id],
2153 )?;
2154 Ok(deleted)
2155 }
2156
2157 pub fn list_embedding_models_in_space(&self, space_id: i64) -> Result<Vec<String>> {
2158 let conn = self
2159 .db
2160 .lock()
2161 .map_err(|_| CoreError::poisoned("database"))?;
2162 let _space_name = lookup_space_name(&conn, space_id)?;
2163
2164 let mut stmt = conn.prepare(
2165 "SELECT DISTINCT e.model
2166 FROM embeddings e
2167 JOIN chunks c ON c.id = e.chunk_id
2168 JOIN documents d ON d.id = c.doc_id
2169 JOIN collections col ON col.id = d.collection_id
2170 WHERE col.space_id = ?1
2171 ORDER BY e.model ASC",
2172 )?;
2173 let rows = stmt.query_map(params![space_id], |row| row.get::<_, String>(0))?;
2174 let models = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2175 Ok(models)
2176 }
2177
2178 pub fn count_embeddings(&self) -> Result<usize> {
2179 let conn = self
2180 .db
2181 .lock()
2182 .map_err(|_| CoreError::poisoned("database"))?;
2183
2184 let count: i64 = conn.query_row("SELECT COUNT(*) FROM embeddings", [], |row| row.get(0))?;
2185 Ok(count as usize)
2186 }
2187
2188 pub fn index_tantivy(&self, space: &str, entries: &[TantivyEntry]) -> Result<()> {
2189 if entries.is_empty() {
2190 return Ok(());
2191 }
2192
2193 let space_indexes = self.get_space_indexes(space)?;
2194 with_tantivy_writer(&space_indexes, |writer| {
2195 for entry in entries {
2196 let chunk_id = u64::try_from(entry.chunk_id).map_err(|_| {
2197 CoreError::Internal(format!(
2198 "chunk_id must be non-negative for tantivy indexing: {}",
2199 entry.chunk_id
2200 ))
2201 })?;
2202 let doc_id = u64::try_from(entry.doc_id).map_err(|_| {
2203 CoreError::Internal(format!(
2204 "doc_id must be non-negative for tantivy indexing: {}",
2205 entry.doc_id
2206 ))
2207 })?;
2208
2209 let mut doc = TantivyDocument::default();
2210 doc.add_u64(space_indexes.fields.chunk_id, chunk_id);
2211 doc.add_u64(space_indexes.fields.doc_id, doc_id);
2212 doc.add_text(space_indexes.fields.filepath, &entry.filepath);
2213 if let Some(title) = &entry.title {
2214 doc.add_text(space_indexes.fields.title, title);
2215 }
2216 if let Some(heading) = &entry.heading {
2217 doc.add_text(space_indexes.fields.heading, heading);
2218 }
2219 doc.add_text(space_indexes.fields.body, &entry.body);
2220 writer.add_document(doc)?;
2221 }
2222 Ok(())
2223 })
2224 }
2225
2226 pub fn delete_tantivy(&self, space: &str, chunk_ids: &[i64]) -> Result<()> {
2227 if chunk_ids.is_empty() {
2228 return Ok(());
2229 }
2230
2231 let space_indexes = self.get_space_indexes(space)?;
2232 with_tantivy_writer(&space_indexes, |writer| {
2233 for chunk_id in chunk_ids {
2234 let chunk_key = u64::try_from(*chunk_id).map_err(|_| {
2235 CoreError::Internal(format!(
2236 "chunk_id must be non-negative for tantivy delete: {chunk_id}"
2237 ))
2238 })?;
2239 writer.delete_term(Term::from_field_u64(
2240 space_indexes.fields.chunk_id,
2241 chunk_key,
2242 ));
2243 }
2244
2245 Ok(())
2246 })
2247 }
2248
2249 pub fn delete_tantivy_by_doc(&self, space: &str, doc_id: i64) -> Result<()> {
2250 let space_indexes = self.get_space_indexes(space)?;
2251 with_tantivy_writer(&space_indexes, |writer| {
2252 let doc_key = u64::try_from(doc_id).map_err(|_| {
2253 CoreError::Internal(format!(
2254 "doc_id must be non-negative for tantivy delete-by-doc: {doc_id}"
2255 ))
2256 })?;
2257 writer.delete_term(Term::from_field_u64(space_indexes.fields.doc_id, doc_key));
2258 Ok(())
2259 })
2260 }
2261
2262 pub fn query_bm25(
2263 &self,
2264 space: &str,
2265 query: &str,
2266 fields: &[(&str, f32)],
2267 limit: usize,
2268 ) -> Result<Vec<BM25Hit>> {
2269 self.query_bm25_filtered(space, query, fields, None, limit, true)
2270 }
2271
2272 pub(crate) fn query_bm25_cached(
2273 &self,
2274 space: &str,
2275 query: &str,
2276 fields: &[(&str, f32)],
2277 limit: usize,
2278 ) -> Result<Vec<BM25Hit>> {
2279 self.query_bm25_filtered(space, query, fields, None, limit, false)
2280 }
2281
2282 pub fn query_bm25_in_documents(
2283 &self,
2284 space: &str,
2285 query: &str,
2286 fields: &[(&str, f32)],
2287 document_ids: &[i64],
2288 limit: usize,
2289 ) -> Result<Vec<BM25Hit>> {
2290 if document_ids.is_empty() {
2291 return Ok(Vec::new());
2292 }
2293
2294 self.query_bm25_filtered(space, query, fields, Some(document_ids), limit, true)
2295 }
2296
2297 pub(crate) fn query_bm25_in_documents_cached(
2298 &self,
2299 space: &str,
2300 query: &str,
2301 fields: &[(&str, f32)],
2302 document_ids: &[i64],
2303 limit: usize,
2304 ) -> Result<Vec<BM25Hit>> {
2305 if document_ids.is_empty() {
2306 return Ok(Vec::new());
2307 }
2308
2309 self.query_bm25_filtered(space, query, fields, Some(document_ids), limit, false)
2310 }
2311
2312 fn query_bm25_filtered(
2313 &self,
2314 space: &str,
2315 query: &str,
2316 fields: &[(&str, f32)],
2317 document_ids: Option<&[i64]>,
2318 limit: usize,
2319 reload_reader: bool,
2320 ) -> Result<Vec<BM25Hit>> {
2321 if limit == 0 {
2322 return Ok(Vec::new());
2323 }
2324
2325 if fields.is_empty() {
2326 return Err(CoreError::Internal(
2327 "bm25 query requires at least one field".to_string(),
2328 ));
2329 }
2330
2331 let space_indexes = self.get_space_indexes(space)?;
2332
2333 let schema = space_indexes.tantivy_index.schema();
2334 let query_fields = fields
2335 .iter()
2336 .map(|(name, boost)| {
2337 let field = resolve_tantivy_field(space_indexes.fields, name)?;
2338 let field_entry = schema.get_field_entry(field);
2339 let index_record_option = field_entry
2340 .field_type()
2341 .get_index_record_option()
2342 .ok_or_else(|| {
2343 CoreError::Internal(format!(
2344 "bm25 field '{}' is not indexed",
2345 field_entry.name()
2346 ))
2347 })?;
2348 Ok(Bm25FieldSpec {
2349 field,
2350 boost: *boost,
2351 index_record_option,
2352 })
2353 })
2354 .collect::<Result<Vec<_>>>()?;
2355 let Some(parsed_query) =
2356 build_literal_bm25_query(&space_indexes.tantivy_index, &query_fields, query)?
2357 else {
2358 return Ok(Vec::new());
2359 };
2360 let parsed_query = if let Some(document_ids) = document_ids {
2361 Box::new(BooleanQuery::new(vec![
2362 (Occur::Must, parsed_query),
2363 (
2364 Occur::Must,
2365 build_doc_id_filter_query(space_indexes.fields.doc_id, document_ids)?,
2366 ),
2367 ])) as Box<dyn Query>
2368 } else {
2369 parsed_query
2370 };
2371 if reload_reader {
2372 space_indexes.tantivy_reader.reload()?;
2373 }
2374 let searcher = space_indexes.tantivy_reader.searcher();
2375 let docs = searcher.search(&parsed_query, &TopDocs::with_limit(limit))?;
2376
2377 let mut hits = Vec::with_capacity(docs.len());
2378 let chunk_id_field_name = schema.get_field_entry(space_indexes.fields.chunk_id).name();
2379 let mut chunk_id_columns = HashMap::new();
2380 for (score, address) in docs {
2381 let chunk_id_column = match chunk_id_columns.entry(address.segment_ord) {
2382 std::collections::hash_map::Entry::Occupied(entry) => entry.into_mut(),
2383 std::collections::hash_map::Entry::Vacant(entry) => {
2384 let column = searcher
2385 .segment_reader(address.segment_ord)
2386 .fast_fields()
2387 .u64(chunk_id_field_name)?;
2388 entry.insert(column)
2389 }
2390 };
2391 let chunk_id = chunk_id_column.first(address.doc_id).ok_or_else(|| {
2392 CoreError::Internal("tantivy hit missing chunk_id fast field".to_string())
2393 })?;
2394 let chunk_id = i64::try_from(chunk_id).map_err(|_| {
2395 CoreError::Internal(format!("tantivy chunk_id exceeds i64 range: {chunk_id}"))
2396 })?;
2397 hits.push(BM25Hit { chunk_id, score });
2398 }
2399
2400 Ok(hits)
2401 }
2402
2403 pub(crate) fn reload_tantivy_reader(&self, space: &str) -> Result<()> {
2404 let space_indexes = self.get_space_indexes(space)?;
2405 space_indexes.tantivy_reader.reload()?;
2406 Ok(())
2407 }
2408
2409 pub fn commit_tantivy(&self, space: &str) -> Result<()> {
2410 let space_indexes = self.get_space_indexes(space)?;
2411 let mut writer = space_indexes
2412 .tantivy_writer
2413 .lock()
2414 .map_err(|_| CoreError::poisoned("tantivy writer"))?;
2415 let Some(mut writer) = writer.take() else {
2416 return Ok(());
2417 };
2418 writer.commit()?;
2419 space_indexes.tantivy_reader.reload()?;
2420 Ok(())
2421 }
2422
2423 pub fn insert_usearch(&self, space: &str, key: i64, vector: &[f32]) -> Result<()> {
2424 self.batch_insert_usearch(space, &[(key, vector)])
2425 }
2426
2427 pub fn batch_insert_usearch(&self, space: &str, entries: &[(i64, &[f32])]) -> Result<()> {
2428 self.batch_insert_usearch_with_save_mode(space, entries, UsearchSaveMode::Immediate)
2429 }
2430
2431 pub(crate) fn batch_insert_usearch_deferred(
2432 &self,
2433 space: &str,
2434 entries: &[(i64, &[f32])],
2435 ) -> Result<()> {
2436 self.batch_insert_usearch_with_save_mode(space, entries, UsearchSaveMode::Deferred)
2437 }
2438
2439 fn batch_insert_usearch_with_save_mode(
2440 &self,
2441 space: &str,
2442 entries: &[(i64, &[f32])],
2443 save_mode: UsearchSaveMode,
2444 ) -> Result<()> {
2445 if entries.is_empty() {
2446 return Ok(());
2447 }
2448
2449 let space_indexes = self.get_space_indexes(space)?;
2450
2451 let expected_dimensions = entries[0].1.len();
2452 if expected_dimensions == 0 {
2453 return Err(CoreError::Internal(
2454 "cannot insert empty vector into usearch index".to_string(),
2455 ));
2456 }
2457 for (_, vector) in entries {
2458 if vector.len() != expected_dimensions {
2459 return Err(CoreError::Internal(format!(
2460 "vector dimension mismatch in batch insert: expected {expected_dimensions}, got {}",
2461 vector.len()
2462 )));
2463 }
2464 }
2465
2466 let mut index = space_indexes
2467 .usearch_index
2468 .write()
2469 .map_err(|_| CoreError::poisoned("usearch index"))?;
2470 let ensure_started = std::time::Instant::now();
2471 ensure_usearch_dimensions(&mut index, expected_dimensions)?;
2472 crate::profile::record_update_stage("usearch_ensure_dimensions", ensure_started.elapsed());
2473
2474 let target_capacity = index.size().saturating_add(entries.len());
2475 let reserve_started = std::time::Instant::now();
2476 index.reserve(target_capacity).map_err(|err| {
2477 CoreError::Internal(format!(
2478 "usearch reserve failed for {target_capacity} items: {err}"
2479 ))
2480 })?;
2481 crate::profile::record_update_stage("usearch_reserve", reserve_started.elapsed());
2482
2483 if matches!(save_mode, UsearchSaveMode::Deferred) {
2484 self.mark_usearch_dirty(space)?;
2485 }
2486
2487 let add_started = std::time::Instant::now();
2488 for (key, vector) in entries {
2489 let key = u64::try_from(*key).map_err(|_| {
2490 CoreError::Internal(format!("usearch key must be non-negative: {}", *key))
2491 })?;
2492 index
2493 .add::<f32>(key, vector)
2494 .map_err(|err| CoreError::Internal(format!("usearch add failed: {err}")))?;
2495 }
2496 crate::profile::record_update_stage("usearch_add", add_started.elapsed());
2497
2498 if matches!(save_mode, UsearchSaveMode::Immediate) {
2499 let save_started = std::time::Instant::now();
2500 save_usearch_index(&index, &space_indexes.usearch_path)?;
2501 crate::profile::record_update_stage("usearch_save", save_started.elapsed());
2502 }
2503 Ok(())
2504 }
2505
2506 pub fn delete_usearch(&self, space: &str, keys: &[i64]) -> Result<()> {
2507 self.delete_usearch_with_save_mode(space, keys, UsearchSaveMode::Immediate)
2508 }
2509
2510 pub(crate) fn delete_usearch_deferred(&self, space: &str, keys: &[i64]) -> Result<()> {
2511 self.delete_usearch_with_save_mode(space, keys, UsearchSaveMode::Deferred)
2512 }
2513
2514 fn delete_usearch_with_save_mode(
2515 &self,
2516 space: &str,
2517 keys: &[i64],
2518 save_mode: UsearchSaveMode,
2519 ) -> Result<()> {
2520 if keys.is_empty() {
2521 return Ok(());
2522 }
2523
2524 let space_indexes = self.get_space_indexes(space)?;
2525 let index = space_indexes
2526 .usearch_index
2527 .write()
2528 .map_err(|_| CoreError::poisoned("usearch index"))?;
2529
2530 if matches!(save_mode, UsearchSaveMode::Deferred) {
2531 self.mark_usearch_dirty(space)?;
2532 }
2533
2534 let delete_started = std::time::Instant::now();
2535 for key in keys {
2536 let key = u64::try_from(*key).map_err(|_| {
2537 CoreError::Internal(format!("usearch key must be non-negative: {}", *key))
2538 })?;
2539 index
2540 .remove(key)
2541 .map_err(|err| CoreError::Internal(format!("usearch remove failed: {err}")))?;
2542 }
2543 crate::profile::record_update_stage("usearch_delete", delete_started.elapsed());
2544
2545 if matches!(save_mode, UsearchSaveMode::Immediate) {
2546 let save_started = std::time::Instant::now();
2547 save_usearch_index(&index, &space_indexes.usearch_path)?;
2548 crate::profile::record_update_stage("usearch_save", save_started.elapsed());
2549 }
2550 Ok(())
2551 }
2552
2553 pub(crate) fn save_dirty_usearch_indexes(&self) -> Result<()> {
2554 let spaces = {
2555 let mut dirty = self
2556 .dirty_usearch_spaces
2557 .lock()
2558 .map_err(|_| CoreError::poisoned("dirty usearch spaces"))?;
2559 if dirty.is_empty() {
2560 return Ok(());
2561 }
2562
2563 let mut spaces = dirty.drain().collect::<Vec<_>>();
2564 spaces.sort();
2565 spaces
2566 };
2567
2568 for (index, space) in spaces.iter().enumerate() {
2569 if let Err(err) = self.save_usearch_for_space(space) {
2570 let mut dirty = self
2571 .dirty_usearch_spaces
2572 .lock()
2573 .map_err(|_| CoreError::poisoned("dirty usearch spaces"))?;
2574 for unsaved in &spaces[index..] {
2575 dirty.insert(unsaved.clone());
2576 }
2577 return Err(err);
2578 }
2579 crate::profile::increment_update_count("usearch_saved_spaces", 1);
2580 }
2581
2582 Ok(())
2583 }
2584
2585 fn mark_usearch_dirty(&self, space: &str) -> Result<()> {
2586 self.dirty_usearch_spaces
2587 .lock()
2588 .map_err(|_| CoreError::poisoned("dirty usearch spaces"))?
2589 .insert(space.to_string());
2590 Ok(())
2591 }
2592
2593 fn save_usearch_for_space(&self, space: &str) -> Result<()> {
2594 let space_indexes = self.get_space_indexes(space)?;
2595 let index = space_indexes
2596 .usearch_index
2597 .read()
2598 .map_err(|_| CoreError::poisoned("usearch index"))?;
2599
2600 let save_started = std::time::Instant::now();
2601 save_usearch_index(&index, &space_indexes.usearch_path)?;
2602 crate::profile::record_update_stage("usearch_save", save_started.elapsed());
2603 Ok(())
2604 }
2605
2606 pub fn query_dense(&self, space: &str, vector: &[f32], limit: usize) -> Result<Vec<DenseHit>> {
2607 self.query_dense_filtered(space, vector, None, limit)
2608 }
2609
2610 pub fn query_dense_in_chunks(
2611 &self,
2612 space: &str,
2613 vector: &[f32],
2614 chunk_ids: &[i64],
2615 limit: usize,
2616 ) -> Result<Vec<DenseHit>> {
2617 if chunk_ids.is_empty() {
2618 return Ok(Vec::new());
2619 }
2620
2621 let allowed_keys = chunk_ids
2622 .iter()
2623 .map(|chunk_id| {
2624 u64::try_from(*chunk_id).map_err(|_| {
2625 CoreError::Internal(format!(
2626 "chunk_id must be non-negative for usearch query: {chunk_id}"
2627 ))
2628 })
2629 })
2630 .collect::<Result<HashSet<_>>>()?;
2631 self.query_dense_in_key_set(space, vector, &allowed_keys, limit)
2632 }
2633
2634 pub(crate) fn query_dense_in_key_set(
2635 &self,
2636 space: &str,
2637 vector: &[f32],
2638 allowed_keys: &HashSet<u64>,
2639 limit: usize,
2640 ) -> Result<Vec<DenseHit>> {
2641 if allowed_keys.is_empty() {
2642 return Ok(Vec::new());
2643 }
2644
2645 self.query_dense_filtered(space, vector, Some(allowed_keys), limit)
2646 }
2647
2648 fn query_dense_filtered(
2649 &self,
2650 space: &str,
2651 vector: &[f32],
2652 allowed_keys: Option<&HashSet<u64>>,
2653 limit: usize,
2654 ) -> Result<Vec<DenseHit>> {
2655 if limit == 0 {
2656 return Ok(Vec::new());
2657 }
2658 if vector.is_empty() {
2659 return Err(CoreError::Internal(
2660 "cannot query usearch with empty vector".to_string(),
2661 ));
2662 }
2663
2664 let space_indexes = self.get_space_indexes(space)?;
2665 let index = space_indexes
2666 .usearch_index
2667 .read()
2668 .map_err(|_| CoreError::poisoned("usearch index"))?;
2669
2670 if index.size() == 0 {
2671 return Ok(Vec::new());
2672 }
2673 if vector.len() != index.dimensions() {
2674 return Err(CoreError::Internal(format!(
2675 "query vector dimension mismatch: expected {}, got {}",
2676 index.dimensions(),
2677 vector.len()
2678 )));
2679 }
2680
2681 let matches = if let Some(allowed_keys) = allowed_keys {
2682 index
2683 .filtered_search::<f32, _>(vector, limit, |key| allowed_keys.contains(&key))
2684 .map_err(|err| {
2685 CoreError::Internal(format!("usearch filtered query failed: {err}"))
2686 })?
2687 } else {
2688 index
2689 .search::<f32>(vector, limit)
2690 .map_err(|err| CoreError::Internal(format!("usearch query failed: {err}")))?
2691 };
2692 let hits = matches
2693 .keys
2694 .into_iter()
2695 .zip(matches.distances)
2696 .map(|(key, distance)| DenseHit {
2697 chunk_id: key as i64,
2698 distance,
2699 })
2700 .collect();
2701 Ok(hits)
2702 }
2703
2704 pub fn count_usearch(&self, space: &str) -> Result<usize> {
2705 let space_indexes = self.get_space_indexes(space)?;
2706 let index = space_indexes
2707 .usearch_index
2708 .read()
2709 .map_err(|_| CoreError::poisoned("usearch index"))?;
2710 Ok(index.size())
2711 }
2712
2713 pub fn clear_usearch(&self, space: &str) -> Result<()> {
2714 let space_indexes = self.get_space_indexes(space)?;
2715 let index = space_indexes
2716 .usearch_index
2717 .write()
2718 .map_err(|_| CoreError::poisoned("usearch index"))?;
2719 let clear_started = std::time::Instant::now();
2720 index
2721 .reset()
2722 .map_err(|err| CoreError::Internal(format!("usearch clear failed: {err}")))?;
2723 std::fs::File::create(&space_indexes.usearch_path)?;
2724 crate::profile::record_update_stage("usearch_clear", clear_started.elapsed());
2725 Ok(())
2726 }
2727
2728 pub fn get_fts_dirty_documents(&self) -> Result<Vec<FtsDirtyRecord>> {
2729 self.get_fts_dirty_documents_filtered("", Vec::new())
2730 }
2731
2732 pub fn get_fts_dirty_documents_in_space(&self, space_id: i64) -> Result<Vec<FtsDirtyRecord>> {
2733 self.get_fts_dirty_documents_filtered(
2734 " AND c.space_id = ?",
2735 vec![SqlValue::Integer(space_id)],
2736 )
2737 }
2738
2739 pub fn get_fts_dirty_documents_in_collections(
2740 &self,
2741 collection_ids: &[i64],
2742 ) -> Result<Vec<FtsDirtyRecord>> {
2743 if collection_ids.is_empty() {
2744 return Ok(Vec::new());
2745 }
2746
2747 let placeholders = vec!["?"; collection_ids.len()].join(", ");
2748 let clause = format!(" AND d.collection_id IN ({placeholders})");
2749 let params = collection_ids
2750 .iter()
2751 .map(|id| SqlValue::Integer(*id))
2752 .collect::<Vec<_>>();
2753 self.get_fts_dirty_documents_filtered(&clause, params)
2754 }
2755
2756 fn get_fts_dirty_documents_filtered(
2757 &self,
2758 scope_clause: &str,
2759 scope_params: Vec<SqlValue>,
2760 ) -> Result<Vec<FtsDirtyRecord>> {
2761 let conn = self
2762 .db
2763 .lock()
2764 .map_err(|_| CoreError::poisoned("database"))?;
2765
2766 let sql = format!(
2767 "SELECT d.id, d.path, d.title, d.hash, c.path, s.name
2768 FROM documents d
2769 JOIN collections c ON c.id = d.collection_id
2770 JOIN spaces s ON s.id = c.space_id
2771 WHERE d.fts_dirty = 1{scope_clause}
2772 ORDER BY d.id ASC"
2773 );
2774 let mut stmt = conn.prepare(&sql)?;
2775 let rows = stmt.query_map(params_from_iter(scope_params.iter()), |row| {
2776 Ok((
2777 row.get::<_, i64>(0)?,
2778 row.get::<_, String>(1)?,
2779 row.get::<_, String>(2)?,
2780 row.get::<_, String>(3)?,
2781 row.get::<_, String>(4)?,
2782 row.get::<_, String>(5)?,
2783 ))
2784 })?;
2785 let headers = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2786 drop(stmt);
2787
2788 let mut records = Vec::with_capacity(headers.len());
2789 for (doc_id, doc_path, doc_title, doc_hash, collection_path, space_name) in headers {
2790 let chunks = load_chunks_for_doc(&conn, doc_id)?;
2791 records.push(FtsDirtyRecord {
2792 doc_id,
2793 doc_path,
2794 doc_title: decode_optional_title(doc_title),
2795 doc_hash,
2796 collection_path: PathBuf::from(collection_path),
2797 space_name,
2798 chunks,
2799 });
2800 }
2801
2802 Ok(records)
2803 }
2804
2805 pub fn batch_clear_fts_dirty(&self, doc_ids: &[i64]) -> Result<()> {
2806 if doc_ids.is_empty() {
2807 return Ok(());
2808 }
2809
2810 let conn = self
2811 .db
2812 .lock()
2813 .map_err(|_| CoreError::poisoned("database"))?;
2814
2815 let placeholders = vec!["?"; doc_ids.len()].join(", ");
2816 let sql = format!("UPDATE documents SET fts_dirty = 0 WHERE id IN ({placeholders})");
2817 conn.execute(&sql, params_from_iter(doc_ids.iter()))?;
2818 Ok(())
2819 }
2820
2821 pub fn count_documents_in_collection(
2822 &self,
2823 collection_id: i64,
2824 active_only: bool,
2825 ) -> Result<usize> {
2826 let conn = self
2827 .db
2828 .lock()
2829 .map_err(|_| CoreError::poisoned("database"))?;
2830 let _collection_name = lookup_collection_name(&conn, collection_id)?;
2831 let active_only = i64::from(active_only);
2832 query_count(
2833 &conn,
2834 "SELECT COUNT(*)
2835 FROM documents
2836 WHERE collection_id = ?1
2837 AND (?2 = 0 OR active = 1)",
2838 params![collection_id, active_only],
2839 )
2840 }
2841
2842 pub fn count_chunks_in_collection(&self, collection_id: i64) -> Result<usize> {
2843 let conn = self
2844 .db
2845 .lock()
2846 .map_err(|_| CoreError::poisoned("database"))?;
2847 let _collection_name = lookup_collection_name(&conn, collection_id)?;
2848
2849 query_count(
2850 &conn,
2851 "SELECT COUNT(*)
2852 FROM chunks c
2853 JOIN documents d ON d.id = c.doc_id
2854 WHERE d.collection_id = ?1",
2855 params![collection_id],
2856 )
2857 }
2858
2859 pub fn count_embedded_chunks_in_collection(&self, collection_id: i64) -> Result<usize> {
2860 let conn = self
2861 .db
2862 .lock()
2863 .map_err(|_| CoreError::poisoned("database"))?;
2864 let _collection_name = lookup_collection_name(&conn, collection_id)?;
2865
2866 query_count(
2867 &conn,
2868 "SELECT COUNT(DISTINCT e.chunk_id)
2869 FROM embeddings e
2870 JOIN chunks c ON c.id = e.chunk_id
2871 JOIN documents d ON d.id = c.doc_id
2872 WHERE d.collection_id = ?1",
2873 params![collection_id],
2874 )
2875 }
2876
2877 pub fn count_documents(&self, space_id: Option<i64>) -> Result<usize> {
2878 let conn = self
2879 .db
2880 .lock()
2881 .map_err(|_| CoreError::poisoned("database"))?;
2882
2883 match space_id {
2884 Some(space_id) => {
2885 let _space_name = lookup_space_name(&conn, space_id)?;
2886 query_count(
2887 &conn,
2888 "SELECT COUNT(*)
2889 FROM documents d
2890 JOIN collections c ON c.id = d.collection_id
2891 WHERE c.space_id = ?1",
2892 params![space_id],
2893 )
2894 }
2895 None => query_count(&conn, "SELECT COUNT(*) FROM documents", []),
2896 }
2897 }
2898
2899 pub fn count_chunks(&self, space_id: Option<i64>) -> Result<usize> {
2900 let conn = self
2901 .db
2902 .lock()
2903 .map_err(|_| CoreError::poisoned("database"))?;
2904
2905 match space_id {
2906 Some(space_id) => {
2907 let _space_name = lookup_space_name(&conn, space_id)?;
2908 query_count(
2909 &conn,
2910 "SELECT COUNT(*)
2911 FROM chunks c
2912 JOIN documents d ON d.id = c.doc_id
2913 JOIN collections col ON col.id = d.collection_id
2914 WHERE col.space_id = ?1",
2915 params![space_id],
2916 )
2917 }
2918 None => query_count(&conn, "SELECT COUNT(*) FROM chunks", []),
2919 }
2920 }
2921
2922 pub fn count_embedded_chunks(&self, space_id: Option<i64>) -> Result<usize> {
2923 let conn = self
2924 .db
2925 .lock()
2926 .map_err(|_| CoreError::poisoned("database"))?;
2927
2928 match space_id {
2929 Some(space_id) => {
2930 let _space_name = lookup_space_name(&conn, space_id)?;
2931 query_count(
2932 &conn,
2933 "SELECT COUNT(DISTINCT e.chunk_id)
2934 FROM embeddings e
2935 JOIN chunks c ON c.id = e.chunk_id
2936 JOIN documents d ON d.id = c.doc_id
2937 JOIN collections col ON col.id = d.collection_id
2938 WHERE col.space_id = ?1",
2939 params![space_id],
2940 )
2941 }
2942 None => query_count(&conn, "SELECT COUNT(DISTINCT chunk_id) FROM embeddings", []),
2943 }
2944 }
2945
2946 pub fn disk_usage(&self) -> Result<DiskUsage> {
2947 let sqlite_bytes = file_size_or_zero(&self.cache_dir.join(DB_FILE))?;
2948
2949 let mut tantivy_bytes = 0_u64;
2950 let mut usearch_bytes = 0_u64;
2951 let spaces_dir = self.cache_dir.join(SPACES_DIR);
2952 if spaces_dir.exists() {
2953 for entry in std::fs::read_dir(&spaces_dir)? {
2954 let space_dir = entry?.path();
2955 if !space_dir.is_dir() {
2956 continue;
2957 }
2958
2959 tantivy_bytes += dir_size_or_zero(&space_dir.join(TANTIVY_DIR_NAME))?;
2960 usearch_bytes += file_size_or_zero(&space_dir.join(USEARCH_FILENAME))?;
2961 }
2962 }
2963
2964 let models_bytes = dir_size_or_zero(&self.cache_dir.join("models"))?;
2965 let total_bytes = sqlite_bytes + tantivy_bytes + usearch_bytes + models_bytes;
2966
2967 Ok(DiskUsage {
2968 sqlite_bytes,
2969 tantivy_bytes,
2970 usearch_bytes,
2971 models_bytes,
2972 total_bytes,
2973 })
2974 }
2975
2976 fn unload_space(&self, name: &str) -> Result<()> {
2977 let mut spaces = self
2978 .spaces
2979 .write()
2980 .map_err(|_| CoreError::poisoned("spaces"))?;
2981 spaces.remove(name);
2982 Ok(())
2983 }
2984
2985 fn remove_space_artifacts(&self, name: &str) -> Result<()> {
2986 let space_root = self.space_root_path(name);
2987 if space_root.exists() {
2988 std::fs::remove_dir_all(space_root)?;
2989 }
2990 Ok(())
2991 }
2992
2993 fn rename_space_artifacts(&self, old: &str, new: &str) -> Result<()> {
2994 let old_root = self.space_root_path(old);
2995 let new_root = self.space_root_path(new);
2996 if !old_root.exists() {
2997 return Ok(());
2998 }
2999
3000 if new_root.exists() {
3001 return Err(CoreError::Internal(format!(
3002 "cannot rename space artifacts: destination already exists: {}",
3003 new_root.display()
3004 )));
3005 }
3006
3007 if let Some(parent) = new_root.parent() {
3008 std::fs::create_dir_all(parent)?;
3009 }
3010 std::fs::rename(old_root, new_root)?;
3011 Ok(())
3012 }
3013
3014 fn space_root_path(&self, name: &str) -> PathBuf {
3015 self.cache_dir.join(SPACES_DIR).join(name)
3016 }
3017
3018 fn space_paths(&self, name: &str) -> (PathBuf, PathBuf) {
3019 let space_root = self.space_root_path(name);
3020 let tantivy_dir = space_root.join(TANTIVY_DIR_NAME);
3021 let usearch_path = space_root.join(USEARCH_FILENAME);
3022 (tantivy_dir, usearch_path)
3023 }
3024
3025 fn get_space_indexes(&self, name: &str) -> Result<Arc<SpaceIndexes>> {
3026 self.open_space(name)?;
3027 let spaces = self
3028 .spaces
3029 .read()
3030 .map_err(|_| CoreError::poisoned("spaces"))?;
3031 spaces.get(name).cloned().ok_or_else(|| {
3032 KboltError::SpaceNotFound {
3033 name: name.to_string(),
3034 }
3035 .into()
3036 })
3037 }
3038}
3039
3040fn lookup_space_name(conn: &Connection, space_id: i64) -> Result<String> {
3041 let result = conn.query_row(
3042 "SELECT name FROM spaces WHERE id = ?1",
3043 params![space_id],
3044 |row| row.get::<_, String>(0),
3045 );
3046 match result {
3047 Ok(name) => Ok(name),
3048 Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
3049 name: format!("id={space_id}"),
3050 }
3051 .into()),
3052 Err(err) => Err(err.into()),
3053 }
3054}
3055
3056fn lookup_collection_name(conn: &Connection, collection_id: i64) -> Result<String> {
3057 let result = conn.query_row(
3058 "SELECT name FROM collections WHERE id = ?1",
3059 params![collection_id],
3060 |row| row.get::<_, String>(0),
3061 );
3062 match result {
3063 Ok(name) => Ok(name),
3064 Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
3065 name: format!("id={collection_id}"),
3066 }
3067 .into()),
3068 Err(err) => Err(err.into()),
3069 }
3070}
3071
3072fn lookup_document_id(conn: &Connection, doc_id: i64) -> Result<i64> {
3073 let result = conn.query_row(
3074 "SELECT id FROM documents WHERE id = ?1",
3075 params![doc_id],
3076 |row| row.get::<_, i64>(0),
3077 );
3078 match result {
3079 Ok(id) => Ok(id),
3080 Err(Error::QueryReturnedNoRows) => Err(KboltError::DocumentNotFound {
3081 path: format!("id={doc_id}"),
3082 }
3083 .into()),
3084 Err(err) => Err(err.into()),
3085 }
3086}
3087
3088fn load_chunks_for_doc(conn: &Connection, doc_id: i64) -> Result<Vec<ChunkRow>> {
3089 let mut stmt = conn.prepare(
3090 "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
3091 FROM chunks
3092 WHERE doc_id = ?1
3093 ORDER BY seq ASC",
3094 )?;
3095 let rows = stmt.query_map(params![doc_id], decode_chunk_row)?;
3096 let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3097 Ok(chunks)
3098}
3099
3100fn load_chunk_ids_for_doc(conn: &Connection, doc_id: i64) -> Result<Vec<i64>> {
3101 let mut stmt = conn.prepare("SELECT id FROM chunks WHERE doc_id = ?1 ORDER BY seq ASC")?;
3102 let rows = stmt.query_map(params![doc_id], |row| row.get::<_, i64>(0))?;
3103 let chunk_ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3104 Ok(chunk_ids)
3105}
3106
3107fn lookup_chunk_doc_id(conn: &Connection, chunk_id: i64) -> Result<i64> {
3108 let result = conn.query_row(
3109 "SELECT doc_id FROM chunks WHERE id = ?1",
3110 params![chunk_id],
3111 |row| row.get::<_, i64>(0),
3112 );
3113 match result {
3114 Ok(doc_id) => Ok(doc_id),
3115 Err(Error::QueryReturnedNoRows) => Err(KboltError::DocumentNotFound {
3116 path: format!("chunk_id={chunk_id}"),
3117 }
3118 .into()),
3119 Err(err) => Err(err.into()),
3120 }
3121}
3122
3123fn query_count<P: rusqlite::Params>(conn: &Connection, sql: &str, params: P) -> Result<usize> {
3124 let count: i64 = conn.query_row(sql, params, |row| row.get(0))?;
3125 Ok(count as usize)
3126}
3127
3128fn file_size_or_zero(path: &Path) -> Result<u64> {
3129 if !path.exists() {
3130 return Ok(0);
3131 }
3132
3133 let metadata = std::fs::metadata(path)?;
3134 if metadata.is_file() {
3135 Ok(metadata.len())
3136 } else {
3137 Ok(0)
3138 }
3139}
3140
3141fn dir_size_or_zero(path: &Path) -> Result<u64> {
3142 if !path.exists() {
3143 return Ok(0);
3144 }
3145
3146 let mut total = 0_u64;
3147 for entry in std::fs::read_dir(path)? {
3148 let entry = entry?;
3149 let child_path = entry.path();
3150 let metadata = std::fs::symlink_metadata(&child_path)?;
3151 if metadata.is_file() {
3152 total += metadata.len();
3153 } else if metadata.is_dir() {
3154 total += dir_size_or_zero(&child_path)?;
3155 }
3156 }
3157
3158 Ok(total)
3159}
3160
3161fn open_or_create_tantivy_index(path: &Path) -> Result<Index> {
3162 let meta_path = path.join("meta.json");
3163 if meta_path.exists() {
3164 return Ok(Index::open_in_dir(path)?);
3165 }
3166
3167 Ok(Index::create_in_dir(path, tantivy_schema())?)
3168}
3169
3170fn with_tantivy_writer<T>(
3171 space_indexes: &SpaceIndexes,
3172 f: impl FnOnce(&mut IndexWriter) -> Result<T>,
3173) -> Result<T> {
3174 let mut writer = space_indexes
3175 .tantivy_writer
3176 .lock()
3177 .map_err(|_| CoreError::poisoned("tantivy writer"))?;
3178
3179 if writer.is_none() {
3180 *writer = Some(space_indexes.tantivy_index.writer(50_000_000)?);
3181 }
3182
3183 let writer = writer
3184 .as_mut()
3185 .ok_or_else(|| CoreError::Internal("failed to initialize tantivy writer".to_string()))?;
3186 f(writer)
3187}
3188
3189fn reject_incompatible_legacy_index(conn: &Connection) -> Result<()> {
3190 if !table_exists(conn, "documents")? || table_exists(conn, "document_texts")? {
3191 return Ok(());
3192 }
3193
3194 let document_count = query_count(conn, "SELECT COUNT(*) FROM documents", [])?;
3195 if document_count == 0 {
3196 return Ok(());
3197 }
3198
3199 Err(KboltError::Config(
3200 "cache index uses an older text-storage format; rebuild the kbolt cache before using this version".to_string(),
3201 )
3202 .into())
3203}
3204
3205fn ensure_schema_version(conn: &Connection) -> Result<()> {
3206 let current: i64 = conn.query_row("PRAGMA user_version", [], |row| row.get(0))?;
3207 if current > SCHEMA_VERSION {
3208 return Err(KboltError::Config(format!(
3209 "cache index schema version {current} is newer than supported version {SCHEMA_VERSION}"
3210 ))
3211 .into());
3212 }
3213
3214 if current < SCHEMA_VERSION {
3215 conn.pragma_update(None, "user_version", SCHEMA_VERSION)?;
3216 }
3217
3218 Ok(())
3219}
3220
3221fn table_exists(conn: &Connection, table: &str) -> Result<bool> {
3222 let exists = conn.query_row(
3223 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name = ?1",
3224 [table],
3225 |row| row.get::<_, i64>(0),
3226 )?;
3227 Ok(exists != 0)
3228}
3229
3230fn ensure_document_texts_generation_key_column(conn: &Connection) -> Result<()> {
3231 let mut stmt = conn.prepare("PRAGMA table_info(document_texts)")?;
3232 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3233 let columns = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3234 drop(stmt);
3235
3236 if columns.iter().any(|column| column == "generation_key") {
3237 return Ok(());
3238 }
3239
3240 conn.execute(
3241 "ALTER TABLE document_texts ADD COLUMN generation_key TEXT NOT NULL DEFAULT ''",
3242 [],
3243 )?;
3244 Ok(())
3245}
3246
3247fn ensure_chunks_retrieval_prefix_column(conn: &Connection) -> Result<()> {
3248 let mut stmt = conn.prepare("PRAGMA table_info(chunks)")?;
3249 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3250 let columns = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3251 drop(stmt);
3252
3253 if columns.iter().any(|column| column == "retrieval_prefix") {
3254 return Ok(());
3255 }
3256
3257 conn.execute("ALTER TABLE chunks ADD COLUMN retrieval_prefix TEXT", [])?;
3258 Ok(())
3259}
3260
3261fn build_literal_bm25_query(
3262 index: &Index,
3263 fields: &[Bm25FieldSpec],
3264 query: &str,
3265) -> Result<Option<Box<dyn Query>>> {
3266 let mut clauses = Vec::new();
3267 for field in fields {
3268 for token in analyzed_terms_for_field(index, field.field, query)? {
3269 let term_query: Box<dyn Query> = Box::new(TermQuery::new(
3270 Term::from_field_text(field.field, &token),
3271 field.index_record_option,
3272 ));
3273 let query = if (field.boost - 1.0).abs() > f32::EPSILON {
3274 Box::new(BoostQuery::new(term_query, field.boost)) as Box<dyn Query>
3275 } else {
3276 term_query
3277 };
3278 clauses.push((Occur::Should, query));
3279 }
3280 }
3281
3282 if clauses.is_empty() {
3283 Ok(None)
3284 } else {
3285 Ok(Some(Box::new(BooleanQuery::new(clauses))))
3286 }
3287}
3288
3289fn build_doc_id_filter_query(field: Field, document_ids: &[i64]) -> Result<Box<dyn Query>> {
3290 let mut terms = Vec::new();
3291 let mut seen = HashSet::new();
3292 for doc_id in document_ids {
3293 let doc_id = u64::try_from(*doc_id).map_err(|_| {
3294 CoreError::Internal(format!(
3295 "doc_id must be non-negative for tantivy query: {doc_id}"
3296 ))
3297 })?;
3298 if seen.insert(doc_id) {
3299 terms.push(Term::from_field_u64(field, doc_id));
3300 }
3301 }
3302
3303 Ok(Box::new(ConstScoreQuery::new(
3304 Box::new(TermSetQuery::new(terms)),
3305 0.0,
3306 )))
3307}
3308
3309fn analyzed_terms_for_field(index: &Index, field: Field, query: &str) -> Result<Vec<String>> {
3310 let mut analyzer = index.tokenizer_for_field(field)?;
3311 let mut stream = analyzer.token_stream(query);
3312 let mut terms = Vec::new();
3313 let mut seen = HashSet::new();
3314 while let Some(token) = stream.next() {
3315 if token.text.is_empty() {
3316 continue;
3317 }
3318 let text = token.text.clone();
3319 if seen.insert(text.clone()) {
3320 terms.push(text);
3321 }
3322 }
3323 Ok(terms)
3324}
3325
3326fn new_usearch_index(dimensions: usize) -> Result<usearch::Index> {
3327 let options = IndexOptions {
3328 dimensions,
3329 metric: MetricKind::Cos,
3330 quantization: ScalarKind::F32,
3331 connectivity: 16,
3332 expansion_add: 200,
3333 expansion_search: 100,
3334 ..IndexOptions::default()
3335 };
3336 usearch::Index::new(&options)
3337 .map_err(|err| CoreError::Internal(format!("usearch init failed: {err}")))
3338}
3339
3340fn open_or_create_usearch_index(path: &Path) -> Result<usearch::Index> {
3341 let index = new_usearch_index(256)?;
3342 let file_size = std::fs::metadata(path).map(|meta| meta.len()).unwrap_or(0);
3343 if file_size > 0 {
3344 let path = path
3345 .to_str()
3346 .ok_or_else(|| CoreError::Internal("invalid usearch path encoding".to_string()))?;
3347 index
3348 .load(path)
3349 .map_err(|err| CoreError::Internal(format!("usearch load failed: {err}")))?;
3350 }
3351 Ok(index)
3352}
3353
3354fn ensure_usearch_dimensions(index: &mut usearch::Index, expected_dimensions: usize) -> Result<()> {
3355 if index.size() == 0 && index.dimensions() != expected_dimensions {
3356 *index = new_usearch_index(expected_dimensions)?;
3357 return Ok(());
3358 }
3359
3360 if index.dimensions() != expected_dimensions {
3361 return Err(CoreError::Internal(format!(
3362 "usearch vector dimension mismatch: index expects {}, got {}",
3363 index.dimensions(),
3364 expected_dimensions
3365 )));
3366 }
3367 Ok(())
3368}
3369
3370fn save_usearch_index(index: &usearch::Index, path: &Path) -> Result<()> {
3371 if index.size() == 0 {
3372 std::fs::File::create(path)?;
3373 return Ok(());
3374 }
3375
3376 let path = path
3377 .to_str()
3378 .ok_or_else(|| CoreError::Internal("invalid usearch path encoding".to_string()))?;
3379 index
3380 .save(path)
3381 .map_err(|err| CoreError::Internal(format!("usearch save failed: {err}")))?;
3382 Ok(())
3383}
3384
3385fn tantivy_schema() -> tantivy::schema::Schema {
3386 let mut builder = tantivy::schema::Schema::builder();
3387 builder.add_u64_field("chunk_id", INDEXED | STORED | FAST);
3388 builder.add_u64_field("doc_id", INDEXED | STORED | FAST);
3389 builder.add_text_field("filepath", TEXT | STORED);
3390 builder.add_text_field("title", TEXT | STORED);
3391 builder.add_text_field("heading", TEXT | STORED);
3392 builder.add_text_field("body", TEXT);
3393 builder.build()
3394}
3395
3396fn tantivy_fields_from_schema(schema: &tantivy::schema::Schema) -> Result<TantivyFields> {
3397 Ok(TantivyFields {
3398 chunk_id: schema.get_field("chunk_id").map_err(|_| {
3399 CoreError::Internal("tantivy schema missing field: chunk_id".to_string())
3400 })?,
3401 doc_id: schema
3402 .get_field("doc_id")
3403 .map_err(|_| CoreError::Internal("tantivy schema missing field: doc_id".to_string()))?,
3404 filepath: schema.get_field("filepath").map_err(|_| {
3405 CoreError::Internal("tantivy schema missing field: filepath".to_string())
3406 })?,
3407 title: schema
3408 .get_field("title")
3409 .map_err(|_| CoreError::Internal("tantivy schema missing field: title".to_string()))?,
3410 heading: schema.get_field("heading").map_err(|_| {
3411 CoreError::Internal("tantivy schema missing field: heading".to_string())
3412 })?,
3413 body: schema
3414 .get_field("body")
3415 .map_err(|_| CoreError::Internal("tantivy schema missing field: body".to_string()))?,
3416 })
3417}
3418
3419fn resolve_tantivy_field(fields: TantivyFields, name: &str) -> Result<Field> {
3420 match name {
3421 "chunk_id" => Ok(fields.chunk_id),
3422 "doc_id" => Ok(fields.doc_id),
3423 "filepath" => Ok(fields.filepath),
3424 "title" => Ok(fields.title),
3425 "heading" => Ok(fields.heading),
3426 "body" => Ok(fields.body),
3427 other => Err(CoreError::Internal(format!(
3428 "unsupported tantivy field: {other}"
3429 ))),
3430 }
3431}
3432
3433fn serialize_extensions(extensions: Option<&[String]>) -> Result<Option<String>> {
3434 match extensions {
3435 None => Ok(None),
3436 Some(values) => serde_json::to_string(values).map(Some).map_err(Into::into),
3437 }
3438}
3439
3440fn deserialize_extensions(raw: Option<String>) -> Result<Option<Vec<String>>> {
3441 match raw {
3442 None => Ok(None),
3443 Some(json) => serde_json::from_str::<Vec<String>>(&json)
3444 .map(Some)
3445 .map_err(Into::into),
3446 }
3447}
3448
3449fn decode_space_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SpaceRow> {
3450 Ok(SpaceRow {
3451 id: row.get(0)?,
3452 name: row.get(1)?,
3453 description: row.get(2)?,
3454 created: row.get(3)?,
3455 })
3456}
3457
3458fn decode_collection_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<CollectionRow> {
3459 let raw_extensions: Option<String> = row.get(5)?;
3460 let extensions = deserialize_extensions(raw_extensions).map_err(|err| {
3461 Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(err))
3462 })?;
3463 Ok(CollectionRow {
3464 id: row.get(0)?,
3465 space_id: row.get(1)?,
3466 name: row.get(2)?,
3467 path: PathBuf::from(row.get::<_, String>(3)?),
3468 description: row.get(4)?,
3469 extensions,
3470 created: row.get(6)?,
3471 updated: row.get(7)?,
3472 })
3473}
3474
3475fn decode_document_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<DocumentRow> {
3476 decode_document_row_at(row, 0)
3477}
3478
3479fn decode_document_row_at(row: &rusqlite::Row<'_>, base: usize) -> rusqlite::Result<DocumentRow> {
3480 let active_value: i64 = row.get(base + 6)?;
3481 let fts_dirty_value: i64 = row.get(base + 8)?;
3482 Ok(DocumentRow {
3483 id: row.get(base)?,
3484 collection_id: row.get(base + 1)?,
3485 path: row.get(base + 2)?,
3486 title: decode_optional_title(row.get::<_, String>(base + 3)?),
3487 hash: row.get(base + 4)?,
3488 modified: row.get(base + 5)?,
3489 active: active_value != 0,
3490 deactivated_at: row.get(base + 7)?,
3491 fts_dirty: fts_dirty_value != 0,
3492 })
3493}
3494
/// Trims surrounding whitespace from an optional title.
///
/// Returns `None` when no title is given or when the trimmed title is empty.
fn normalized_title(title: Option<&str>) -> Option<&str> {
    let trimmed = title?.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed)
    }
}
3498
/// Turns a stored title string into an optional, trimmed title.
///
/// Returns `None` when the title is empty after trimming. A title that needs
/// no trimming is returned as-is (reusing its allocation); otherwise a trimmed
/// copy is returned.
fn decode_optional_title(title: String) -> Option<String> {
    let trimmed_len = title.trim().len();
    if trimmed_len == 0 {
        return None;
    }
    // Trimming can only shorten the string, so a shorter length means
    // whitespace was removed and a trimmed copy is required.
    if trimmed_len < title.len() {
        return Some(title.trim().to_owned());
    }
    Some(title)
}
3509
3510fn decode_document_text_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<DocumentTextRow> {
3511 Ok(DocumentTextRow {
3512 doc_id: row.get(0)?,
3513 extractor_key: row.get(1)?,
3514 source_hash: row.get(2)?,
3515 text_hash: row.get(3)?,
3516 generation_key: row.get(4)?,
3517 text: row.get(5)?,
3518 created: row.get(6)?,
3519 })
3520}
3521
3522fn decode_chunk_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ChunkRow> {
3523 decode_chunk_row_at(row, 0)
3524}
3525
3526fn decode_chunk_row_at(row: &rusqlite::Row<'_>, base: usize) -> rusqlite::Result<ChunkRow> {
3527 let offset_value: i64 = row.get(base + 3)?;
3528 let length_value: i64 = row.get(base + 4)?;
3529 let kind_raw: String = row.get(base + 6)?;
3530 let kind = FinalChunkKind::try_from(kind_raw.as_str()).map_err(|err| {
3531 Error::FromSqlConversionFailure(base + 6, rusqlite::types::Type::Text, Box::new(err))
3532 })?;
3533 Ok(ChunkRow {
3534 id: row.get(base)?,
3535 doc_id: row.get(base + 1)?,
3536 seq: row.get(base + 2)?,
3537 offset: decode_non_negative_usize(offset_value, base + 3, "chunks.offset")?,
3538 length: decode_non_negative_usize(length_value, base + 4, "chunks.length")?,
3539 heading: row.get(base + 5)?,
3540 kind,
3541 retrieval_prefix: row.get(base + 7)?,
3542 })
3543}
3544
3545fn decode_non_negative_usize(
3546 value: i64,
3547 column: usize,
3548 name: &'static str,
3549) -> rusqlite::Result<usize> {
3550 if value < 0 {
3551 return Err(Error::FromSqlConversionFailure(
3552 column,
3553 SqlType::Integer,
3554 Box::new(KboltError::Internal(format!("{name} must not be negative"))),
3555 ));
3556 }
3557
3558 Ok(value as usize)
3559}
3560
3561fn canonical_chunk_slice_from_bytes(
3562 chunk_id: i64,
3563 offset_value: i64,
3564 length_value: i64,
3565 document_len_value: i64,
3566 start_byte: Vec<u8>,
3567 end_byte: Vec<u8>,
3568 bytes: Vec<u8>,
3569) -> Result<String> {
3570 if offset_value < 0 {
3571 return Err(CoreError::Internal(format!(
3572 "chunk {chunk_id} offset must not be negative"
3573 )));
3574 }
3575 if length_value < 0 {
3576 return Err(CoreError::Internal(format!(
3577 "chunk {chunk_id} length must not be negative"
3578 )));
3579 }
3580 if document_len_value < 0 {
3581 return Err(CoreError::Internal(format!(
3582 "document text length for chunk {chunk_id} must not be negative"
3583 )));
3584 }
3585
3586 let offset = offset_value as usize;
3587 let length = length_value as usize;
3588 let document_len = document_len_value as usize;
3589 let end = offset.checked_add(length).ok_or_else(|| {
3590 CoreError::Internal(format!("chunk {chunk_id} text span overflows usize"))
3591 })?;
3592 if end > document_len {
3593 return Err(CoreError::Internal(format!(
3594 "chunk {chunk_id} text span {offset}..{end} exceeds document text length {document_len}"
3595 )));
3596 }
3597 if bytes.len() != length {
3598 return Err(CoreError::Internal(format!(
3599 "canonical text slice for chunk {chunk_id} returned {} bytes, expected {length}",
3600 bytes.len()
3601 )));
3602 }
3603 validate_sqlite_utf8_boundary(chunk_id, offset, document_len, &start_byte)?;
3604 validate_sqlite_utf8_boundary(chunk_id, end, document_len, &end_byte)?;
3605
3606 String::from_utf8(bytes).map_err(|err| {
3607 CoreError::Internal(format!(
3608 "stored canonical text slice for chunk {chunk_id} is invalid UTF-8: {err}"
3609 ))
3610 })
3611}
3612
3613fn validate_sqlite_utf8_boundary(
3614 chunk_id: i64,
3615 index: usize,
3616 document_len: usize,
3617 byte_at_index: &[u8],
3618) -> Result<()> {
3619 if index == 0 || index == document_len {
3620 return Ok(());
3621 }
3622 if byte_at_index.len() != 1 {
3623 return Err(CoreError::Internal(format!(
3624 "chunk {chunk_id} text boundary byte at {index} is missing"
3625 )));
3626 }
3627 if (0x80..0xC0).contains(&byte_at_index[0]) {
3628 return Err(CoreError::Internal(format!(
3629 "chunk {chunk_id} text span is not on UTF-8 boundaries"
3630 )));
3631 }
3632
3633 Ok(())
3634}
3635
3636pub(crate) fn chunk_text_from_canonical(document_text: &str, chunk: &ChunkRow) -> Result<String> {
3637 let label = format!("chunk {}", chunk.id);
3638 let end = validate_text_span(document_text, chunk.offset, chunk.length, &label)?;
3639
3640 Ok(document_text[chunk.offset..end].to_string())
3641}
3642
3643fn validate_text_span(
3644 document_text: &str,
3645 offset: usize,
3646 length: usize,
3647 label: &str,
3648) -> Result<usize> {
3649 let end = offset
3650 .checked_add(length)
3651 .ok_or_else(|| CoreError::Internal(format!("{label} text span overflows usize")))?;
3652 if end > document_text.len() {
3653 return Err(CoreError::Internal(format!(
3654 "{label} text span {}..{} exceeds document text length {}",
3655 offset,
3656 end,
3657 document_text.len()
3658 )));
3659 }
3660 if !document_text.is_char_boundary(offset) || !document_text.is_char_boundary(end) {
3661 return Err(CoreError::Internal(format!(
3662 "{label} text span {offset}..{end} is not on UTF-8 boundaries"
3663 )));
3664 }
3665
3666 Ok(end)
3667}
3668
3669pub(crate) fn missing_document_text_error(doc_id: i64) -> CoreError {
3670 KboltError::Internal(format!(
3671 "document {doc_id} is missing persisted canonical text; rebuild the kbolt cache"
3672 ))
3673 .into()
3674}
3675
3676#[cfg(test)]
3677mod tests;