1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3use std::sync::{Arc, Mutex, RwLock};
4
5use crate::error::{CoreError, Result};
6use crate::ingest::chunk::FinalChunkKind;
7use kbolt_types::{DiskUsage, KboltError};
8use rusqlite::types::{Type as SqlType, Value as SqlValue};
9use rusqlite::{params, params_from_iter, Connection, Error, ErrorCode};
10use tantivy::collector::TopDocs;
11use tantivy::query::{
12 BooleanQuery, BoostQuery, ConstScoreQuery, Occur, Query, TermQuery, TermSetQuery,
13};
14use tantivy::schema::{Field, IndexRecordOption, FAST, INDEXED, STORED, TEXT};
15use tantivy::tokenizer::TokenStream;
16use tantivy::{Index, IndexReader, IndexWriter, TantivyDocument, Term};
17use usearch::{IndexOptions, MetricKind, ScalarKind};
18
/// File name of the SQLite metadata database created inside the cache directory.
const DB_FILE: &str = "meta.sqlite";
/// Name of the reserved space: it is inserted on startup, cannot be renamed, and
/// `delete_space` only clears it instead of removing it.
const DEFAULT_SPACE_NAME: &str = "default";
/// Directory name for per-space data; presumably used by `space_paths` (not in view) — confirm.
const SPACES_DIR: &str = "spaces";
/// Per-space Tantivy directory name; presumably joined by `space_paths` — confirm.
const TANTIVY_DIR_NAME: &str = "tantivy";
/// Per-space USearch vector file name; presumably joined by `space_paths` — confirm.
const USEARCH_FILENAME: &str = "vectors.usearch";
/// Current metadata schema version; presumably written/checked by `ensure_schema_version` — confirm.
const SCHEMA_VERSION: i64 = 3;
/// Maximum number of `?` placeholders intended for one `IN (...)` clause, presumably
/// chosen to stay below SQLite's bound-parameter limit.
const SQLITE_IN_CLAUSE_BATCH_SIZE: usize = 500;
26
/// How a USearch index change is persisted.
///
/// NOTE(review): presumably `Immediate` saves the vector file right away while
/// `Deferred` only records the space in `Storage::dirty_usearch_spaces` for a later
/// flush — confirm against the code that consumes this enum (not in view).
#[derive(Clone, Copy)]
enum UsearchSaveMode {
    Immediate,
    Deferred,
}
32
/// Metadata and search-index storage rooted at a cache directory.
///
/// Spaces, collections, documents, chunks, embeddings and extracted texts live in
/// one SQLite database; each loaded space additionally has a Tantivy index and a
/// USearch vector index held in memory.
pub struct Storage {
    /// The single SQLite connection; every query is serialized through this mutex.
    db: Mutex<Connection>,
    /// Root directory that holds `meta.sqlite` (and presumably the per-space index
    /// directories built by `space_paths` — confirm).
    cache_dir: PathBuf,
    /// Loaded per-space indexes keyed by space name; filled by `open_space` and
    /// emptied by `close_space`.
    spaces: RwLock<HashMap<String, Arc<SpaceIndexes>>>,
    /// Names of spaces whose USearch index presumably has unsaved changes — confirm
    /// against the code that inserts into this set.
    dirty_usearch_spaces: Mutex<HashSet<String>>,
}
39
/// One row of the `spaces` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SpaceRow {
    /// Primary key.
    pub id: i64,
    /// Space name; unique across all spaces.
    pub name: String,
    /// Optional free-form description.
    pub description: Option<String>,
    /// Creation timestamp, defaulted by SQLite as `%Y-%m-%dT%H:%M:%SZ` (UTC).
    pub created: String,
}

/// One row of the `collections` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollectionRow {
    /// Primary key.
    pub id: i64,
    /// Owning space; the collection is deleted when the space is (ON DELETE CASCADE).
    pub space_id: i64,
    /// Collection name; unique within its space.
    pub name: String,
    /// Root path of the collection. It is stored with `to_string_lossy`, so
    /// non-UTF-8 paths do not round-trip exactly.
    pub path: PathBuf,
    /// Optional free-form description.
    pub description: Option<String>,
    /// Optional extension filter. It is stored serialized in a TEXT column
    /// (presumably JSON, via `serialize_extensions` — confirm).
    pub extensions: Option<Vec<String>>,
    /// Creation timestamp (`%Y-%m-%dT%H:%M:%SZ`).
    pub created: String,
    /// Last-change timestamp. It is bumped on rename, description update and
    /// `update_collection_timestamp`.
    pub updated: String,
}
59
/// One row of the `documents` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DocumentRow {
    /// Primary key.
    pub id: i64,
    /// Owning collection.
    pub collection_id: i64,
    /// Document path; unique within its collection (presumably relative to the
    /// collection root — confirm).
    pub path: String,
    /// Title. A missing title is stored as `''` in SQL and presumably decoded back
    /// to `None` — confirm in `decode_document_row`.
    pub title: Option<String>,
    /// Content hash as supplied by the caller (algorithm not visible here).
    pub hash: String,
    /// Modification stamp supplied by the caller; the format is not enforced here.
    pub modified: String,
    /// `false` once `deactivate_document` has run; reset by reactivation/upsert/refresh.
    pub active: bool,
    /// Timestamp of the transition to inactive, or `None` while active.
    pub deactivated_at: Option<String>,
    /// Set by `upsert_document`; presumably cleared once the full-text index has
    /// been updated — confirm.
    pub fts_dirty: bool,
}

/// Per-document summary produced by `list_collection_file_rows`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileListRow {
    pub doc_id: i64,
    pub path: String,
    pub title: Option<String>,
    pub hash: String,
    pub active: bool,
    /// Number of distinct chunks of the document.
    pub chunk_count: usize,
    /// Number of distinct chunks having at least one `embeddings` row (any model).
    pub embedded_chunk_count: usize,
}
83
/// One row of the `chunks` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkRow {
    /// Primary key.
    pub id: i64,
    /// Owning document; the chunk is deleted with it (ON DELETE CASCADE).
    pub doc_id: i64,
    /// Position of the chunk within its document; unique per document.
    pub seq: i32,
    /// Start of the chunk; presumably an offset into the extracted document text
    /// (units — bytes vs chars — not visible here, confirm).
    pub offset: usize,
    /// Length of the chunk, in the same units as `offset`.
    pub length: usize,
    /// Optional heading associated with the chunk.
    pub heading: Option<String>,
    /// Chunk kind; stored as TEXT, defaulting to `'section'` in the schema.
    pub kind: FinalChunkKind,
    /// Optional text stored in `chunks.retrieval_prefix`.
    pub retrieval_prefix: Option<String>,
}

/// Chunk data to insert; mirrors [`ChunkRow`] without the database-assigned
/// `id` and `doc_id`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkInsert {
    pub seq: i32,
    pub offset: usize,
    pub length: usize,
    pub heading: Option<String>,
    pub kind: FinalChunkKind,
    pub retrieval_prefix: Option<String>,
}
105
/// One row of the `document_texts` table (at most one per document, since
/// `doc_id` is the primary key).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DocumentTextRow {
    pub doc_id: i64,
    /// Identifies the extractor that produced `text`.
    pub extractor_key: String,
    /// Hash of the source the text was extracted from.
    pub source_hash: String,
    /// Hash of `text`.
    pub text_hash: String,
    /// Generation key; defaults to `''` for rows created before the column existed.
    pub generation_key: String,
    /// Extracted document text.
    pub text: String,
    pub created: String,
}

/// A chunk together with its extractor key and text (presumably the chunk's slice
/// of the document text — confirm at the producer).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkTextRow {
    pub chunk: ChunkRow,
    pub extractor_key: String,
    pub text: String,
}

/// Borrowed inputs for replacing a document's stored generation: the document row
/// fields, its extracted text (see [`DocumentTextRow`]) and its chunks. The method
/// consuming this is not in view — confirm the exact write semantics there.
pub struct DocumentGenerationReplace<'a> {
    pub collection_id: i64,
    pub path: &'a str,
    pub title: Option<&'a str>,
    pub hash: &'a str,
    pub modified: &'a str,
    pub extractor_key: &'a str,
    pub source_hash: &'a str,
    pub text_hash: &'a str,
    pub generation_key: &'a str,
    pub text: &'a str,
    pub chunks: &'a [ChunkInsert],
}

/// Result of a generation replacement.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DocumentGenerationReplaceResult {
    pub doc_id: i64,
    /// Presumably the chunk ids that existed before the replacement (e.g. to purge
    /// from the search indexes) — confirm.
    pub old_chunk_ids: Vec<i64>,
    /// Presumably the ids of the newly inserted chunks — confirm ordering.
    pub chunk_ids: Vec<i64>,
}
144
/// A chunk plus the document/collection/space context, presumably used when
/// computing embeddings — confirm at the producer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EmbedRecord {
    pub chunk: ChunkRow,
    pub doc_path: String,
    pub doc_title: Option<String>,
    pub collection_path: PathBuf,
    pub space_name: String,
}

/// A document flagged with `fts_dirty` plus everything presumably needed to
/// rebuild its full-text entries — confirm at the producer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FtsDirtyRecord {
    pub doc_id: i64,
    pub doc_path: String,
    pub doc_title: Option<String>,
    pub doc_hash: String,
    pub collection_path: PathBuf,
    pub space_name: String,
    pub chunks: Vec<ChunkRow>,
}

/// An inactive document old enough to be reaped, as returned by the
/// `list_reapable_documents*` methods.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReapableDocument {
    pub doc_id: i64,
    /// Name of the space that owns the document's collection.
    pub space_name: String,
    /// Ids of the document's chunks, loaded via `load_chunk_ids_for_doc`.
    pub chunk_ids: Vec<i64>,
}
171
/// One chunk's worth of data for the Tantivy full-text index; the fields
/// correspond to [`TantivyFields`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TantivyEntry {
    pub chunk_id: i64,
    pub doc_id: i64,
    pub filepath: String,
    pub title: Option<String>,
    pub heading: Option<String>,
    pub body: String,
}

/// A lexical (BM25) search hit; `score` comes from Tantivy (higher is presumably better).
#[derive(Debug, Clone, PartialEq)]
pub struct BM25Hit {
    pub chunk_id: i64,
    pub score: f32,
}

/// A vector search hit; `distance` comes from USearch (metric not fixed here).
#[derive(Debug, Clone, PartialEq)]
pub struct DenseHit {
    pub chunk_id: i64,
    pub distance: f32,
}

/// Summary of the documents and chunk count covered by a search scope.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchScopeSummary {
    pub document_ids: Vec<i64>,
    pub chunk_count: usize,
}

/// Outcome of `find_space_for_collection`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SpaceResolution {
    /// Exactly one space has a collection with the requested name.
    Found(SpaceRow),
    /// Several spaces do; their names are sorted ascending.
    Ambiguous(Vec<String>),
    /// No space has a collection with that name.
    NotFound,
}
206
/// In-memory search indexes for one loaded space.
struct SpaceIndexes {
    /// Directory of the Tantivy index (kept for reference; not read in view).
    _tantivy_dir: PathBuf,
    /// Path of the USearch vector file.
    usearch_path: PathBuf,
    tantivy_index: Index,
    tantivy_reader: IndexReader,
    /// Starts as `None` in `open_space`; presumably created lazily on first write — confirm.
    tantivy_writer: Mutex<Option<IndexWriter>>,
    usearch_index: RwLock<usearch::Index>,
    /// Field handles resolved from the Tantivy schema.
    fields: TantivyFields,
}

/// Tantivy field handles, resolved by `tantivy_fields_from_schema`.
#[derive(Debug, Clone, Copy)]
struct TantivyFields {
    chunk_id: Field,
    doc_id: Field,
    filepath: Field,
    title: Field,
    heading: Field,
    body: Field,
}

/// One field participating in BM25 querying, with its boost and the index record
/// option to query it with (usage not in view — confirm at the query builder).
#[derive(Debug, Clone, Copy)]
struct Bm25FieldSpec {
    field: Field,
    boost: f32,
    index_record_option: IndexRecordOption,
}
233
234impl Storage {
    /// Opens (creating if necessary) the storage rooted at `cache_dir`.
    ///
    /// The steps run in this order:
    /// 1. Create `cache_dir` and open the SQLite database `DB_FILE` inside it.
    /// 2. Reject incompatible legacy indexes.
    /// 3. Create any missing tables/indexes and run the column and schema-version checks.
    /// 4. Make sure the reserved default space row exists.
    /// 5. Load the default space's on-disk indexes.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - the directory cannot be created;
    /// - the database cannot be opened, checked or migrated;
    /// - the default space's indexes cannot be opened.
    pub fn new(cache_dir: &Path) -> Result<Self> {
        std::fs::create_dir_all(cache_dir)?;
        let db_path = cache_dir.join(DB_FILE);
        let conn = Connection::open(db_path)?;
        // Runs before the CREATE TABLE batch below; presumably so an older,
        // incompatible layout is refused before anything is added to it — confirm.
        reject_incompatible_legacy_index(&conn)?;
        // Idempotent bootstrap: every CREATE uses IF NOT EXISTS. `foreign_keys` must
        // be enabled on this connection for the ON DELETE CASCADE clauses
        // (spaces -> collections -> documents -> chunks/document_texts -> embeddings)
        // to take effect.
        conn.execute_batch(
            r#"
PRAGMA foreign_keys = ON;
PRAGMA journal_mode = WAL;

CREATE TABLE IF NOT EXISTS spaces (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL UNIQUE,
    description TEXT,
    created TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ','now'))
);

CREATE TABLE IF NOT EXISTS collections (
    id INTEGER PRIMARY KEY,
    space_id INTEGER NOT NULL REFERENCES spaces(id) ON DELETE CASCADE,
    name TEXT NOT NULL,
    path TEXT NOT NULL,
    description TEXT,
    extensions TEXT,
    created TEXT NOT NULL,
    updated TEXT NOT NULL,
    UNIQUE(space_id, name)
);

CREATE TABLE IF NOT EXISTS documents (
    id INTEGER PRIMARY KEY,
    collection_id INTEGER NOT NULL REFERENCES collections(id) ON DELETE CASCADE,
    path TEXT NOT NULL,
    title TEXT NOT NULL DEFAULT '',
    hash TEXT NOT NULL,
    modified TEXT NOT NULL,
    active INTEGER NOT NULL DEFAULT 1,
    deactivated_at TEXT,
    fts_dirty INTEGER NOT NULL DEFAULT 0,
    UNIQUE(collection_id, path)
);
CREATE INDEX IF NOT EXISTS idx_documents_collection ON documents(collection_id, active);
CREATE INDEX IF NOT EXISTS idx_documents_hash ON documents(hash);
CREATE INDEX IF NOT EXISTS idx_documents_fts_dirty ON documents(fts_dirty) WHERE fts_dirty = 1;

CREATE TABLE IF NOT EXISTS chunks (
    id INTEGER PRIMARY KEY,
    doc_id INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
    seq INTEGER NOT NULL,
    offset INTEGER NOT NULL,
    length INTEGER NOT NULL,
    heading TEXT,
    kind TEXT NOT NULL DEFAULT 'section',
    retrieval_prefix TEXT,
    UNIQUE(doc_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_chunks_doc ON chunks(doc_id);

CREATE TABLE IF NOT EXISTS embeddings (
    chunk_id INTEGER NOT NULL REFERENCES chunks(id) ON DELETE CASCADE,
    model TEXT NOT NULL,
    embedded_at TEXT NOT NULL,
    PRIMARY KEY (chunk_id, model)
);

CREATE TABLE IF NOT EXISTS document_texts (
    doc_id INTEGER PRIMARY KEY REFERENCES documents(id) ON DELETE CASCADE,
    extractor_key TEXT NOT NULL,
    source_hash TEXT NOT NULL,
    text_hash TEXT NOT NULL,
    generation_key TEXT NOT NULL DEFAULT '',
    text TEXT NOT NULL,
    created TEXT NOT NULL
);
"#,
        )?;
        // CREATE TABLE IF NOT EXISTS never alters an existing table. These helpers
        // presumably add columns missing from older databases and record/verify
        // SCHEMA_VERSION — confirm against their definitions.
        ensure_document_texts_generation_key_column(&conn)?;
        ensure_chunks_retrieval_prefix_column(&conn)?;
        ensure_schema_version(&conn)?;

        // The reserved default space must always exist; a no-op when it already does.
        conn.execute(
            "INSERT OR IGNORE INTO spaces (name, description) VALUES (?1, NULL)",
            params![DEFAULT_SPACE_NAME],
        )?;

        let storage = Self {
            db: Mutex::new(conn),
            cache_dir: cache_dir.to_path_buf(),
            spaces: RwLock::new(HashMap::new()),
            dirty_usearch_spaces: Mutex::new(HashSet::new()),
        };
        storage.open_space(DEFAULT_SPACE_NAME)?;

        Ok(storage)
    }
330
331 pub fn open_space(&self, name: &str) -> Result<()> {
332 let _space = self.get_space(name)?;
333
334 {
335 let spaces = self
336 .spaces
337 .read()
338 .map_err(|_| CoreError::poisoned("spaces"))?;
339 if spaces.contains_key(name) {
340 return Ok(());
341 }
342 }
343
344 let (tantivy_dir, usearch_path) = self.space_paths(name);
345 std::fs::create_dir_all(&tantivy_dir)?;
346 std::fs::OpenOptions::new()
347 .create(true)
348 .append(true)
349 .open(&usearch_path)?;
350
351 let mut spaces = self
352 .spaces
353 .write()
354 .map_err(|_| CoreError::poisoned("spaces"))?;
355 if spaces.contains_key(name) {
356 return Ok(());
357 }
358
359 let tantivy_index = open_or_create_tantivy_index(&tantivy_dir)?;
360 let tantivy_reader = tantivy_index.reader()?;
361 let usearch_index = open_or_create_usearch_index(&usearch_path)?;
362 let fields = tantivy_fields_from_schema(&tantivy_index.schema())?;
363
364 spaces.insert(
365 name.to_string(),
366 Arc::new(SpaceIndexes {
367 _tantivy_dir: tantivy_dir,
368 usearch_path,
369 tantivy_index,
370 tantivy_reader,
371 tantivy_writer: Mutex::new(None),
372 usearch_index: RwLock::new(usearch_index),
373 fields,
374 }),
375 );
376
377 Ok(())
378 }
379
380 pub fn close_space(&self, name: &str) -> Result<()> {
381 let _space = self.get_space(name)?;
382 let mut spaces = self
383 .spaces
384 .write()
385 .map_err(|_| CoreError::poisoned("spaces"))?;
386 let _removed = spaces.remove(name);
387 Ok(())
388 }
389
390 pub fn create_space(&self, name: &str, description: Option<&str>) -> Result<i64> {
391 let space_id = {
392 let conn = self
393 .db
394 .lock()
395 .map_err(|_| CoreError::poisoned("database"))?;
396
397 let result = conn.execute(
398 "INSERT INTO spaces (name, description) VALUES (?1, ?2)",
399 params![name, description],
400 );
401
402 match result {
403 Ok(_) => conn.last_insert_rowid(),
404 Err(err) => {
405 return match err {
406 Error::SqliteFailure(sqlite_err, _)
407 if sqlite_err.code == ErrorCode::ConstraintViolation =>
408 {
409 Err(KboltError::SpaceAlreadyExists {
410 name: name.to_string(),
411 }
412 .into())
413 }
414 other => Err(other.into()),
415 };
416 }
417 }
418 };
419
420 if let Err(open_err) = self.open_space(name) {
421 let rollback_result = self
422 .db
423 .lock()
424 .map_err(|_| CoreError::poisoned("database"))?
425 .execute("DELETE FROM spaces WHERE id = ?1", params![space_id]);
426
427 if let Err(rollback_err) = rollback_result {
428 return Err(CoreError::Internal(format!(
429 "failed to provision indexes for space '{name}': {open_err}; rollback failed: {rollback_err}"
430 )));
431 }
432
433 return Err(open_err);
434 }
435
436 Ok(space_id)
437 }
438
439 pub fn get_space(&self, name: &str) -> Result<SpaceRow> {
440 let conn = self
441 .db
442 .lock()
443 .map_err(|_| CoreError::poisoned("database"))?;
444 let mut stmt =
445 conn.prepare("SELECT id, name, description, created FROM spaces WHERE name = ?1")?;
446
447 let row = stmt.query_row(params![name], decode_space_row);
448
449 match row {
450 Ok(space) => Ok(space),
451 Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
452 name: name.to_string(),
453 }
454 .into()),
455 Err(err) => Err(err.into()),
456 }
457 }
458
459 pub fn get_space_by_id(&self, id: i64) -> Result<SpaceRow> {
460 let conn = self
461 .db
462 .lock()
463 .map_err(|_| CoreError::poisoned("database"))?;
464 let mut stmt =
465 conn.prepare("SELECT id, name, description, created FROM spaces WHERE id = ?1")?;
466
467 let row = stmt.query_row(params![id], decode_space_row);
468
469 match row {
470 Ok(space) => Ok(space),
471 Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
472 name: format!("id={id}"),
473 }
474 .into()),
475 Err(err) => Err(err.into()),
476 }
477 }
478
479 pub fn list_spaces(&self) -> Result<Vec<SpaceRow>> {
480 let conn = self
481 .db
482 .lock()
483 .map_err(|_| CoreError::poisoned("database"))?;
484 let mut stmt = conn.prepare(
485 "SELECT id, name, description, created
486 FROM spaces
487 ORDER BY name ASC",
488 )?;
489
490 let rows = stmt.query_map([], decode_space_row)?;
491
492 let spaces = rows.collect::<std::result::Result<Vec<_>, _>>()?;
493 Ok(spaces)
494 }
495
496 pub fn find_space_for_collection(&self, collection: &str) -> Result<SpaceResolution> {
497 let conn = self
498 .db
499 .lock()
500 .map_err(|_| CoreError::poisoned("database"))?;
501 let mut stmt = conn.prepare(
502 "SELECT s.id, s.name, s.description, s.created
503 FROM spaces s
504 JOIN collections c ON c.space_id = s.id
505 WHERE c.name = ?1
506 ORDER BY s.name ASC",
507 )?;
508 let rows = stmt.query_map(params![collection], decode_space_row)?;
509 let matches = rows.collect::<std::result::Result<Vec<_>, _>>()?;
510
511 if matches.is_empty() {
512 return Ok(SpaceResolution::NotFound);
513 }
514
515 if matches.len() == 1 {
516 return Ok(SpaceResolution::Found(matches[0].clone()));
517 }
518
519 let spaces = matches.into_iter().map(|space| space.name).collect();
520 Ok(SpaceResolution::Ambiguous(spaces))
521 }
522
523 pub fn delete_space(&self, name: &str) -> Result<()> {
524 if name == DEFAULT_SPACE_NAME {
525 let conn = self
526 .db
527 .lock()
528 .map_err(|_| CoreError::poisoned("database"))?;
529 conn.execute(
530 "DELETE FROM collections
531 WHERE space_id = (SELECT id FROM spaces WHERE name = ?1)",
532 params![name],
533 )?;
534 drop(conn);
535
536 self.unload_space(name)?;
537 self.remove_space_artifacts(name)?;
538 self.open_space(name)?;
539 return Ok(());
540 }
541
542 let conn = self
543 .db
544 .lock()
545 .map_err(|_| CoreError::poisoned("database"))?;
546 let deleted = conn.execute("DELETE FROM spaces WHERE name = ?1", params![name])?;
547 drop(conn);
548
549 if deleted == 0 {
550 return Err(KboltError::SpaceNotFound {
551 name: name.to_string(),
552 }
553 .into());
554 }
555
556 self.unload_space(name)?;
557 self.remove_space_artifacts(name)?;
558
559 Ok(())
560 }
561
    /// Renames space `old` to `new`.
    ///
    /// The rename happens in three stages:
    /// 1. Rename the database row.
    /// 2. Rename the space's on-disk artifacts.
    /// 3. Reload the indexes under the new name.
    ///
    /// Failures in stage 2, or in reopening during stage 3, trigger rollback of the
    /// earlier stages. The reserved default space cannot be renamed.
    ///
    /// # Errors
    ///
    /// Returns:
    /// - `KboltError::Config` when `old` is the default space;
    /// - `KboltError::SpaceNotFound` when `old` does not exist;
    /// - `KboltError::SpaceAlreadyExists` on a name conflict;
    /// - `CoreError::Internal` when an artifact rename fails and the database
    ///   rollback also fails;
    /// - any other database, lock or I/O error.
    pub fn rename_space(&self, old: &str, new: &str) -> Result<()> {
        if old == DEFAULT_SPACE_NAME {
            return Err(
                KboltError::Config("cannot rename reserved space: default".to_string()).into(),
            );
        }

        let conn = self
            .db
            .lock()
            .map_err(|_| CoreError::poisoned("database"))?;

        let result = conn.execute(
            "UPDATE spaces SET name = ?1 WHERE name = ?2",
            params![new, old],
        );
        // Release the lock before the artifact work and the nested lock calls below.
        drop(conn);

        match result {
            Ok(0) => Err(KboltError::SpaceNotFound {
                name: old.to_string(),
            }
            .into()),
            Ok(_) => {
                // NOTE(review): artifacts are renamed while `old` is still loaded in
                // `self.spaces`; whether `unload_space` writes anything back to the
                // old paths is not visible here — confirm.
                if let Err(rename_err) = self.rename_space_artifacts(old, new) {
                    // Revert the row so the database matches the unchanged artifacts.
                    let rollback = self
                        .db
                        .lock()
                        .map_err(|_| CoreError::poisoned("database"))?
                        .execute(
                            "UPDATE spaces SET name = ?1 WHERE name = ?2",
                            params![old, new],
                        );

                    if let Err(rollback_err) = rollback {
                        return Err(CoreError::Internal(format!(
                            "failed to rename space artifacts from '{old}' to '{new}': {rename_err}; rollback failed: {rollback_err}"
                        )));
                    }

                    return Err(rename_err);
                }

                // NOTE(review): if this fails, the row and artifacts are already
                // renamed and no rollback is attempted.
                self.unload_space(old)?;
                if let Err(open_err) = self.open_space(new) {
                    // Best-effort rollback of every stage. Errors from the undo steps
                    // are deliberately ignored so the original open error is reported.
                    let _ = self.rename_space_artifacts(new, old);
                    let _ = self
                        .db
                        .lock()
                        .map_err(|_| CoreError::poisoned("database"))?
                        .execute(
                            "UPDATE spaces SET name = ?1 WHERE name = ?2",
                            params![old, new],
                        );
                    let _ = self.open_space(old);
                    return Err(open_err);
                }

                Ok(())
            }
            // The UNIQUE constraint on `spaces.name` rejects an existing target name.
            Err(Error::SqliteFailure(sqlite_err, _))
                if sqlite_err.code == ErrorCode::ConstraintViolation =>
            {
                Err(KboltError::SpaceAlreadyExists {
                    name: new.to_string(),
                }
                .into())
            }
            Err(err) => Err(err.into()),
        }
    }
633
634 pub fn update_space_description(&self, name: &str, description: &str) -> Result<()> {
635 let conn = self
636 .db
637 .lock()
638 .map_err(|_| CoreError::poisoned("database"))?;
639
640 let updated = conn.execute(
641 "UPDATE spaces SET description = ?1 WHERE name = ?2",
642 params![description, name],
643 )?;
644
645 if updated == 0 {
646 return Err(KboltError::SpaceNotFound {
647 name: name.to_string(),
648 }
649 .into());
650 }
651
652 Ok(())
653 }
654
655 pub fn create_collection(
656 &self,
657 space_id: i64,
658 name: &str,
659 path: &Path,
660 description: Option<&str>,
661 extensions: Option<&[String]>,
662 ) -> Result<i64> {
663 let conn = self
664 .db
665 .lock()
666 .map_err(|_| CoreError::poisoned("database"))?;
667
668 let space_name = lookup_space_name(&conn, space_id)?;
669 let extensions_json = serialize_extensions(extensions)?;
670 let result = conn.execute(
671 "INSERT INTO collections (space_id, name, path, description, extensions, created, updated)
672 VALUES (?1, ?2, ?3, ?4, ?5, strftime('%Y-%m-%dT%H:%M:%SZ','now'), strftime('%Y-%m-%dT%H:%M:%SZ','now'))",
673 params![space_id, name, path.to_string_lossy(), description, extensions_json],
674 );
675
676 match result {
677 Ok(_) => Ok(conn.last_insert_rowid()),
678 Err(Error::SqliteFailure(sqlite_err, _))
679 if sqlite_err.code == ErrorCode::ConstraintViolation =>
680 {
681 Err(KboltError::CollectionAlreadyExists {
682 name: name.to_string(),
683 space: space_name,
684 }
685 .into())
686 }
687 Err(err) => Err(err.into()),
688 }
689 }
690
691 pub fn get_collection(&self, space_id: i64, name: &str) -> Result<CollectionRow> {
692 let conn = self
693 .db
694 .lock()
695 .map_err(|_| CoreError::poisoned("database"))?;
696 let mut stmt = conn.prepare(
697 "SELECT id, space_id, name, path, description, extensions, created, updated
698 FROM collections
699 WHERE space_id = ?1 AND name = ?2",
700 )?;
701
702 let result = stmt.query_row(params![space_id, name], decode_collection_row);
703 match result {
704 Ok(row) => Ok(row),
705 Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
706 name: name.to_string(),
707 }
708 .into()),
709 Err(err) => Err(err.into()),
710 }
711 }
712
713 pub fn get_collection_by_id(&self, id: i64) -> Result<CollectionRow> {
714 let conn = self
715 .db
716 .lock()
717 .map_err(|_| CoreError::poisoned("database"))?;
718 let mut stmt = conn.prepare(
719 "SELECT id, space_id, name, path, description, extensions, created, updated
720 FROM collections
721 WHERE id = ?1",
722 )?;
723
724 let result = stmt.query_row(params![id], decode_collection_row);
725 match result {
726 Ok(row) => Ok(row),
727 Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
728 name: format!("id={id}"),
729 }
730 .into()),
731 Err(err) => Err(err.into()),
732 }
733 }
734
735 pub fn list_collections(&self, space_id: Option<i64>) -> Result<Vec<CollectionRow>> {
736 let conn = self
737 .db
738 .lock()
739 .map_err(|_| CoreError::poisoned("database"))?;
740
741 let (sql, params): (&str, Vec<i64>) = match space_id {
742 Some(id) => (
743 "SELECT id, space_id, name, path, description, extensions, created, updated
744 FROM collections
745 WHERE space_id = ?1
746 ORDER BY name ASC",
747 vec![id],
748 ),
749 None => (
750 "SELECT id, space_id, name, path, description, extensions, created, updated
751 FROM collections
752 ORDER BY space_id ASC, name ASC",
753 Vec::new(),
754 ),
755 };
756
757 let mut stmt = conn.prepare(sql)?;
758 let rows = if params.is_empty() {
759 stmt.query_map([], decode_collection_row)?
760 } else {
761 stmt.query_map(params![params[0]], decode_collection_row)?
762 };
763 let collections = rows.collect::<std::result::Result<Vec<_>, _>>()?;
764 Ok(collections)
765 }
766
767 pub fn count_collections_in_space(&self, space_id: i64) -> Result<usize> {
768 let conn = self
769 .db
770 .lock()
771 .map_err(|_| CoreError::poisoned("database"))?;
772 let _space_name = lookup_space_name(&conn, space_id)?;
773
774 query_count(
775 &conn,
776 "SELECT COUNT(*) FROM collections WHERE space_id = ?1",
777 params![space_id],
778 )
779 }
780
781 pub fn delete_collection(&self, space_id: i64, name: &str) -> Result<()> {
782 let conn = self
783 .db
784 .lock()
785 .map_err(|_| CoreError::poisoned("database"))?;
786 let _space_name = lookup_space_name(&conn, space_id)?;
787
788 let deleted = conn.execute(
789 "DELETE FROM collections WHERE space_id = ?1 AND name = ?2",
790 params![space_id, name],
791 )?;
792
793 if deleted == 0 {
794 return Err(KboltError::CollectionNotFound {
795 name: name.to_string(),
796 }
797 .into());
798 }
799
800 Ok(())
801 }
802
803 pub fn rename_collection(&self, space_id: i64, old: &str, new: &str) -> Result<()> {
804 let conn = self
805 .db
806 .lock()
807 .map_err(|_| CoreError::poisoned("database"))?;
808 let space_name = lookup_space_name(&conn, space_id)?;
809 let result = conn.execute(
810 "UPDATE collections
811 SET name = ?1, updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
812 WHERE space_id = ?2 AND name = ?3",
813 params![new, space_id, old],
814 );
815
816 match result {
817 Ok(0) => Err(KboltError::CollectionNotFound {
818 name: old.to_string(),
819 }
820 .into()),
821 Ok(_) => Ok(()),
822 Err(Error::SqliteFailure(sqlite_err, _))
823 if sqlite_err.code == ErrorCode::ConstraintViolation =>
824 {
825 Err(KboltError::CollectionAlreadyExists {
826 name: new.to_string(),
827 space: space_name,
828 }
829 .into())
830 }
831 Err(err) => Err(err.into()),
832 }
833 }
834
835 pub fn update_collection_description(
836 &self,
837 space_id: i64,
838 name: &str,
839 desc: &str,
840 ) -> Result<()> {
841 let conn = self
842 .db
843 .lock()
844 .map_err(|_| CoreError::poisoned("database"))?;
845 let _space_name = lookup_space_name(&conn, space_id)?;
846
847 let updated = conn.execute(
848 "UPDATE collections
849 SET description = ?1, updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
850 WHERE space_id = ?2 AND name = ?3",
851 params![desc, space_id, name],
852 )?;
853
854 if updated == 0 {
855 return Err(KboltError::CollectionNotFound {
856 name: name.to_string(),
857 }
858 .into());
859 }
860
861 Ok(())
862 }
863
864 pub fn update_collection_timestamp(&self, collection_id: i64) -> Result<()> {
865 let conn = self
866 .db
867 .lock()
868 .map_err(|_| CoreError::poisoned("database"))?;
869
870 let updated = conn.execute(
871 "UPDATE collections
872 SET updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
873 WHERE id = ?1",
874 params![collection_id],
875 )?;
876
877 if updated == 0 {
878 return Err(KboltError::CollectionNotFound {
879 name: format!("id={collection_id}"),
880 }
881 .into());
882 }
883
884 Ok(())
885 }
886
887 pub fn upsert_document(
888 &self,
889 collection_id: i64,
890 path: &str,
891 title: Option<&str>,
892 hash: &str,
893 modified: &str,
894 ) -> Result<i64> {
895 let conn = self
896 .db
897 .lock()
898 .map_err(|_| CoreError::poisoned("database"))?;
899 let _collection_name = lookup_collection_name(&conn, collection_id)?;
900
901 let id = conn.query_row(
902 "INSERT INTO documents (collection_id, path, title, hash, modified, active, deactivated_at, fts_dirty)
903 VALUES (?1, ?2, ?3, ?4, ?5, 1, NULL, 1)
904 ON CONFLICT(collection_id, path) DO UPDATE SET
905 title = excluded.title,
906 hash = excluded.hash,
907 modified = excluded.modified,
908 active = 1,
909 deactivated_at = NULL,
910 fts_dirty = 1
911 RETURNING id",
912 params![
913 collection_id,
914 path,
915 normalized_title(title).unwrap_or(""),
916 hash,
917 modified
918 ],
919 |row| row.get(0),
920 )?;
921 Ok(id)
922 }
923
924 pub fn get_document_by_path(
925 &self,
926 collection_id: i64,
927 path: &str,
928 ) -> Result<Option<DocumentRow>> {
929 let conn = self
930 .db
931 .lock()
932 .map_err(|_| CoreError::poisoned("database"))?;
933 let _collection_name = lookup_collection_name(&conn, collection_id)?;
934 let mut stmt = conn.prepare(
935 "SELECT id, collection_id, path, title, hash, modified, active, deactivated_at, fts_dirty
936 FROM documents
937 WHERE collection_id = ?1 AND path = ?2",
938 )?;
939
940 let result = stmt.query_row(params![collection_id, path], decode_document_row);
941 match result {
942 Ok(row) => Ok(Some(row)),
943 Err(Error::QueryReturnedNoRows) => Ok(None),
944 Err(err) => Err(err.into()),
945 }
946 }
947
948 pub fn get_document_by_id(&self, id: i64) -> Result<DocumentRow> {
949 let conn = self
950 .db
951 .lock()
952 .map_err(|_| CoreError::poisoned("database"))?;
953 let mut stmt = conn.prepare(
954 "SELECT id, collection_id, path, title, hash, modified, active, deactivated_at, fts_dirty
955 FROM documents
956 WHERE id = ?1",
957 )?;
958
959 let result = stmt.query_row(params![id], decode_document_row);
960 match result {
961 Ok(row) => Ok(row),
962 Err(Error::QueryReturnedNoRows) => Err(KboltError::DocumentNotFound {
963 path: format!("id={id}"),
964 }
965 .into()),
966 Err(err) => Err(err.into()),
967 }
968 }
969
970 pub fn get_documents_by_ids(&self, ids: &[i64]) -> Result<Vec<DocumentRow>> {
971 if ids.is_empty() {
972 return Ok(Vec::new());
973 }
974
975 let conn = self
976 .db
977 .lock()
978 .map_err(|_| CoreError::poisoned("database"))?;
979
980 let placeholders = vec!["?"; ids.len()].join(", ");
981 let sql = format!(
982 "SELECT id, collection_id, path, title, hash, modified, active, deactivated_at, fts_dirty
983 FROM documents
984 WHERE id IN ({placeholders})"
985 );
986 let mut stmt = conn.prepare(&sql)?;
987 let rows = stmt.query_map(params_from_iter(ids.iter()), decode_document_row)?;
988 let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
989 Ok(docs)
990 }
991
992 pub fn refresh_document_activity(&self, doc_id: i64, modified: &str) -> Result<()> {
993 let conn = self
994 .db
995 .lock()
996 .map_err(|_| CoreError::poisoned("database"))?;
997
998 let updated = conn.execute(
999 "UPDATE documents
1000 SET modified = ?1,
1001 active = 1,
1002 deactivated_at = NULL
1003 WHERE id = ?2",
1004 params![modified, doc_id],
1005 )?;
1006
1007 if updated == 0 {
1008 return Err(KboltError::DocumentNotFound {
1009 path: format!("id={doc_id}"),
1010 }
1011 .into());
1012 }
1013
1014 Ok(())
1015 }
1016
1017 pub fn list_documents(
1018 &self,
1019 collection_id: i64,
1020 active_only: bool,
1021 ) -> Result<Vec<DocumentRow>> {
1022 let conn = self
1023 .db
1024 .lock()
1025 .map_err(|_| CoreError::poisoned("database"))?;
1026 let _collection_name = lookup_collection_name(&conn, collection_id)?;
1027 let active_only = i64::from(active_only);
1028 let mut stmt = conn.prepare(
1029 "SELECT id, collection_id, path, title, hash, modified, active, deactivated_at, fts_dirty
1030 FROM documents
1031 WHERE collection_id = ?1
1032 AND (?2 = 0 OR active = 1)
1033 ORDER BY path ASC",
1034 )?;
1035 let rows = stmt.query_map(params![collection_id, active_only], decode_document_row)?;
1036 let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1037 Ok(docs)
1038 }
1039
1040 pub fn list_collection_file_rows(
1041 &self,
1042 collection_id: i64,
1043 active_only: bool,
1044 ) -> Result<Vec<FileListRow>> {
1045 let conn = self
1046 .db
1047 .lock()
1048 .map_err(|_| CoreError::poisoned("database"))?;
1049 let _collection_name = lookup_collection_name(&conn, collection_id)?;
1050 let active_only = i64::from(active_only);
1051 let mut stmt = conn.prepare(
1052 "SELECT d.id, d.path, d.title, d.hash, d.active,
1053 COUNT(DISTINCT c.id) AS chunk_count,
1054 COUNT(DISTINCT e.chunk_id) AS embedded_chunk_count
1055 FROM documents d
1056 LEFT JOIN chunks c ON c.doc_id = d.id
1057 LEFT JOIN embeddings e ON e.chunk_id = c.id
1058 WHERE d.collection_id = ?1
1059 AND (?2 = 0 OR d.active = 1)
1060 GROUP BY d.id, d.path, d.title, d.hash, d.active
1061 ORDER BY d.path ASC",
1062 )?;
1063 let rows = stmt.query_map(params![collection_id, active_only], |row| {
1064 let chunk_count: i64 = row.get(5)?;
1065 let embedded_chunk_count: i64 = row.get(6)?;
1066 Ok(FileListRow {
1067 doc_id: row.get(0)?,
1068 path: row.get(1)?,
1069 title: decode_optional_title(row.get::<_, String>(2)?),
1070 hash: row.get(3)?,
1071 active: row.get::<_, i64>(4)? != 0,
1072 chunk_count: chunk_count as usize,
1073 embedded_chunk_count: embedded_chunk_count as usize,
1074 })
1075 })?;
1076 let files = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1077 Ok(files)
1078 }
1079
1080 pub fn get_document_by_hash_prefix(&self, prefix: &str) -> Result<Vec<DocumentRow>> {
1081 let conn = self
1082 .db
1083 .lock()
1084 .map_err(|_| CoreError::poisoned("database"))?;
1085
1086 let pattern = format!("{prefix}%");
1087 let mut stmt = conn.prepare(
1088 "SELECT id, collection_id, path, title, hash, modified, active, deactivated_at, fts_dirty
1089 FROM documents
1090 WHERE hash LIKE ?1
1091 ORDER BY id ASC",
1092 )?;
1093 let rows = stmt.query_map(params![pattern], decode_document_row)?;
1094 let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1095 Ok(docs)
1096 }
1097
1098 pub fn document_hashes_by_prefix_in_space(
1099 &self,
1100 prefix: &str,
1101 space: &str,
1102 ) -> Result<Vec<String>> {
1103 let conn = self
1104 .db
1105 .lock()
1106 .map_err(|_| CoreError::poisoned("database"))?;
1107
1108 let pattern = format!("{prefix}%");
1109 let mut stmt = conn.prepare(
1110 "SELECT d.hash
1111 FROM documents d
1112 JOIN collections c ON d.collection_id = c.id
1113 JOIN spaces s ON c.space_id = s.id
1114 WHERE s.name = ?1 AND d.hash LIKE ?2
1115 ORDER BY d.id ASC",
1116 )?;
1117 let rows = stmt.query_map(params![space, pattern], |row| row.get(0))?;
1118 let hashes = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1119 Ok(hashes)
1120 }
1121
1122 pub fn deactivate_document(&self, doc_id: i64) -> Result<()> {
1123 let conn = self
1124 .db
1125 .lock()
1126 .map_err(|_| CoreError::poisoned("database"))?;
1127
1128 let updated = conn.execute(
1129 "UPDATE documents
1130 SET active = 0,
1131 deactivated_at = CASE
1132 WHEN active = 1 THEN strftime('%Y-%m-%dT%H:%M:%SZ','now')
1133 ELSE deactivated_at
1134 END
1135 WHERE id = ?1",
1136 params![doc_id],
1137 )?;
1138
1139 if updated == 0 {
1140 return Err(KboltError::DocumentNotFound {
1141 path: format!("id={doc_id}"),
1142 }
1143 .into());
1144 }
1145
1146 Ok(())
1147 }
1148
1149 pub fn reactivate_document(&self, doc_id: i64) -> Result<()> {
1150 let conn = self
1151 .db
1152 .lock()
1153 .map_err(|_| CoreError::poisoned("database"))?;
1154
1155 let updated = conn.execute(
1156 "UPDATE documents
1157 SET active = 1, deactivated_at = NULL
1158 WHERE id = ?1",
1159 params![doc_id],
1160 )?;
1161
1162 if updated == 0 {
1163 return Err(KboltError::DocumentNotFound {
1164 path: format!("id={doc_id}"),
1165 }
1166 .into());
1167 }
1168
1169 Ok(())
1170 }
1171
1172 pub fn reap_documents(&self, older_than_days: u32) -> Result<Vec<i64>> {
1173 let reaped = self.list_reapable_documents(older_than_days)?;
1174 let doc_ids = reaped.iter().map(|item| item.doc_id).collect::<Vec<_>>();
1175 self.delete_documents(&doc_ids)?;
1176 Ok(doc_ids)
1177 }
1178
1179 pub fn list_reapable_documents(&self, older_than_days: u32) -> Result<Vec<ReapableDocument>> {
1180 self.list_reapable_documents_filtered(older_than_days, "", Vec::new())
1181 }
1182
1183 pub fn list_reapable_documents_in_space(
1184 &self,
1185 older_than_days: u32,
1186 space_id: i64,
1187 ) -> Result<Vec<ReapableDocument>> {
1188 self.list_reapable_documents_filtered(
1189 older_than_days,
1190 " AND c.space_id = ?",
1191 vec![SqlValue::Integer(space_id)],
1192 )
1193 }
1194
1195 pub fn list_reapable_documents_in_collections(
1196 &self,
1197 older_than_days: u32,
1198 collection_ids: &[i64],
1199 ) -> Result<Vec<ReapableDocument>> {
1200 if collection_ids.is_empty() {
1201 return Ok(Vec::new());
1202 }
1203
1204 let placeholders = vec!["?"; collection_ids.len()].join(", ");
1205 let clause = format!(" AND d.collection_id IN ({placeholders})");
1206 let params = collection_ids
1207 .iter()
1208 .map(|id| SqlValue::Integer(*id))
1209 .collect::<Vec<_>>();
1210 self.list_reapable_documents_filtered(older_than_days, &clause, params)
1211 }
1212
1213 fn list_reapable_documents_filtered(
1214 &self,
1215 older_than_days: u32,
1216 scope_clause: &str,
1217 scope_params: Vec<SqlValue>,
1218 ) -> Result<Vec<ReapableDocument>> {
1219 let conn = self
1220 .db
1221 .lock()
1222 .map_err(|_| CoreError::poisoned("database"))?;
1223
1224 let modifier = format!("-{} days", older_than_days);
1225 let sql = format!(
1226 "SELECT d.id, s.name
1227 FROM documents d
1228 JOIN collections c ON c.id = d.collection_id
1229 JOIN spaces s ON s.id = c.space_id
1230 WHERE d.active = 0
1231 AND d.deactivated_at IS NOT NULL
1232 AND d.deactivated_at <= strftime('%Y-%m-%dT%H:%M:%SZ', 'now', ?){scope_clause}
1233 ORDER BY d.id ASC"
1234 );
1235 let mut stmt = conn.prepare(&sql)?;
1236 let mut params = Vec::with_capacity(scope_params.len() + 1);
1237 params.push(SqlValue::Text(modifier));
1238 params.extend(scope_params);
1239 let rows = stmt.query_map(params_from_iter(params.iter()), |row| {
1240 Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1241 })?;
1242 let headers = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1243 drop(stmt);
1244
1245 let mut documents = Vec::with_capacity(headers.len());
1246 for (doc_id, space_name) in headers {
1247 documents.push(ReapableDocument {
1248 doc_id,
1249 space_name,
1250 chunk_ids: load_chunk_ids_for_doc(&conn, doc_id)?,
1251 });
1252 }
1253
1254 Ok(documents)
1255 }
1256
1257 pub fn delete_documents(&self, doc_ids: &[i64]) -> Result<()> {
1258 if doc_ids.is_empty() {
1259 return Ok(());
1260 }
1261
1262 let conn = self
1263 .db
1264 .lock()
1265 .map_err(|_| CoreError::poisoned("database"))?;
1266
1267 let placeholders = vec!["?"; doc_ids.len()].join(", ");
1268 let sql = format!("DELETE FROM documents WHERE id IN ({placeholders})");
1269 conn.execute(&sql, params_from_iter(doc_ids.iter()))?;
1270 Ok(())
1271 }
1272
1273 pub fn insert_chunks(&self, doc_id: i64, chunks: &[ChunkInsert]) -> Result<Vec<i64>> {
1274 if chunks.is_empty() {
1275 return Ok(Vec::new());
1276 }
1277
1278 let conn = self
1279 .db
1280 .lock()
1281 .map_err(|_| CoreError::poisoned("database"))?;
1282 let _doc = lookup_document_id(&conn, doc_id)?;
1283
1284 let tx = conn.unchecked_transaction()?;
1285 let mut stmt = tx.prepare(
1286 "INSERT INTO chunks (doc_id, seq, offset, length, heading, kind, retrieval_prefix)
1287 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1288 )?;
1289
1290 let mut ids = Vec::with_capacity(chunks.len());
1291 for chunk in chunks {
1292 stmt.execute(params![
1293 doc_id,
1294 chunk.seq,
1295 chunk.offset as i64,
1296 chunk.length as i64,
1297 chunk.heading,
1298 chunk.kind.as_storage_kind(),
1299 chunk.retrieval_prefix.as_deref(),
1300 ])?;
1301 ids.push(tx.last_insert_rowid());
1302 }
1303 drop(stmt);
1304 tx.commit()?;
1305 Ok(ids)
1306 }
1307
1308 pub fn delete_chunks_for_document(&self, doc_id: i64) -> Result<Vec<i64>> {
1309 let conn = self
1310 .db
1311 .lock()
1312 .map_err(|_| CoreError::poisoned("database"))?;
1313 let _doc = lookup_document_id(&conn, doc_id)?;
1314
1315 let mut stmt = conn.prepare("SELECT id FROM chunks WHERE doc_id = ?1 ORDER BY seq ASC")?;
1316 let rows = stmt.query_map(params![doc_id], |row| row.get::<_, i64>(0))?;
1317 let chunk_ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1318
1319 conn.execute("DELETE FROM chunks WHERE doc_id = ?1", params![doc_id])?;
1320 Ok(chunk_ids)
1321 }
1322
1323 pub fn get_chunks_for_document(&self, doc_id: i64) -> Result<Vec<ChunkRow>> {
1324 let conn = self
1325 .db
1326 .lock()
1327 .map_err(|_| CoreError::poisoned("database"))?;
1328 let _doc = lookup_document_id(&conn, doc_id)?;
1329
1330 let mut stmt = conn.prepare(
1331 "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1332 FROM chunks
1333 WHERE doc_id = ?1
1334 ORDER BY seq ASC",
1335 )?;
1336 let rows = stmt.query_map(params![doc_id], decode_chunk_row)?;
1337 let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1338 Ok(chunks)
1339 }
1340
1341 pub fn get_chunks_for_documents(&self, doc_ids: &[i64]) -> Result<HashMap<i64, Vec<ChunkRow>>> {
1342 if doc_ids.is_empty() {
1343 return Ok(HashMap::new());
1344 }
1345
1346 let mut requested = doc_ids.to_vec();
1347 requested.sort_unstable();
1348 requested.dedup();
1349
1350 let conn = self
1351 .db
1352 .lock()
1353 .map_err(|_| CoreError::poisoned("database"))?;
1354
1355 let placeholders = vec!["?"; requested.len()].join(", ");
1356 let sql = format!(
1357 "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1358 FROM chunks
1359 WHERE doc_id IN ({placeholders})
1360 ORDER BY doc_id ASC, seq ASC"
1361 );
1362 let mut stmt = conn.prepare(&sql)?;
1363 let rows = stmt.query_map(params_from_iter(requested.iter()), decode_chunk_row)?;
1364 let mut chunks_by_doc: HashMap<i64, Vec<ChunkRow>> = HashMap::new();
1365 for doc_id in requested {
1366 chunks_by_doc.insert(doc_id, Vec::new());
1367 }
1368 for row in rows {
1369 let chunk = row?;
1370 chunks_by_doc.entry(chunk.doc_id).or_default().push(chunk);
1371 }
1372
1373 Ok(chunks_by_doc)
1374 }
1375
1376 pub fn get_chunks_for_document_seq_ranges(
1377 &self,
1378 ranges: &[(i64, i32, i32)],
1379 ) -> Result<HashMap<i64, Vec<ChunkRow>>> {
1380 if ranges.is_empty() {
1381 return Ok(HashMap::new());
1382 }
1383
1384 let mut ranges_by_doc: HashMap<i64, Vec<(i32, i32)>> = HashMap::new();
1385 for (doc_id, min_seq, max_seq) in ranges {
1386 if min_seq > max_seq {
1387 return Err(CoreError::Internal(format!(
1388 "chunk seq range min must be <= max for doc {doc_id}: {min_seq} > {max_seq}"
1389 )));
1390 }
1391
1392 ranges_by_doc
1393 .entry(*doc_id)
1394 .or_default()
1395 .push((*min_seq, *max_seq));
1396 }
1397
1398 for doc_ranges in ranges_by_doc.values_mut() {
1399 doc_ranges.sort_unstable();
1400 let mut merged: Vec<(i32, i32)> = Vec::with_capacity(doc_ranges.len());
1401 for (min_seq, max_seq) in doc_ranges.drain(..) {
1402 if let Some((_, merged_max)) = merged.last_mut() {
1403 if min_seq <= merged_max.saturating_add(1) {
1404 *merged_max = (*merged_max).max(max_seq);
1405 continue;
1406 }
1407 }
1408 merged.push((min_seq, max_seq));
1409 }
1410 *doc_ranges = merged;
1411 }
1412
1413 let conn = self
1414 .db
1415 .lock()
1416 .map_err(|_| CoreError::poisoned("database"))?;
1417 let mut stmt = conn.prepare(
1418 "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1419 FROM chunks
1420 WHERE doc_id = ?1 AND seq BETWEEN ?2 AND ?3
1421 ORDER BY seq ASC",
1422 )?;
1423
1424 let mut chunks_by_doc: HashMap<i64, Vec<ChunkRow>> = HashMap::new();
1425 for (doc_id, doc_ranges) in ranges_by_doc {
1426 chunks_by_doc.entry(doc_id).or_default();
1427 for (min_seq, max_seq) in doc_ranges {
1428 let rows = stmt.query_map(params![doc_id, min_seq, max_seq], decode_chunk_row)?;
1429 for row in rows {
1430 chunks_by_doc.entry(doc_id).or_default().push(row?);
1431 }
1432 }
1433 }
1434
1435 Ok(chunks_by_doc)
1436 }
1437
1438 pub fn get_chunks(&self, chunk_ids: &[i64]) -> Result<Vec<ChunkRow>> {
1439 if chunk_ids.is_empty() {
1440 return Ok(Vec::new());
1441 }
1442
1443 let conn = self
1444 .db
1445 .lock()
1446 .map_err(|_| CoreError::poisoned("database"))?;
1447
1448 let placeholders = vec!["?"; chunk_ids.len()].join(", ");
1449 let sql = format!(
1450 "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1451 FROM chunks
1452 WHERE id IN ({placeholders})
1453 ORDER BY id ASC"
1454 );
1455 let mut stmt = conn.prepare(&sql)?;
1456 let rows = stmt.query_map(params_from_iter(chunk_ids.iter()), decode_chunk_row)?;
1457 let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1458 Ok(chunks)
1459 }
1460
1461 pub fn get_chunks_with_documents(
1462 &self,
1463 chunk_ids: &[i64],
1464 ) -> Result<Vec<(ChunkRow, DocumentRow)>> {
1465 if chunk_ids.is_empty() {
1466 return Ok(Vec::new());
1467 }
1468
1469 let conn = self
1470 .db
1471 .lock()
1472 .map_err(|_| CoreError::poisoned("database"))?;
1473
1474 let placeholders = vec!["?"; chunk_ids.len()].join(", ");
1475 let sql = format!(
1476 "SELECT c.id, c.doc_id, c.seq, c.offset, c.length, c.heading, c.kind, c.retrieval_prefix,
1477 d.id, d.collection_id, d.path, d.title, d.hash, d.modified, d.active, d.deactivated_at, d.fts_dirty
1478 FROM chunks c
1479 JOIN documents d ON d.id = c.doc_id
1480 WHERE c.id IN ({placeholders})
1481 ORDER BY c.id ASC"
1482 );
1483 let mut stmt = conn.prepare(&sql)?;
1484 let rows = stmt.query_map(params_from_iter(chunk_ids.iter()), |row| {
1485 Ok((
1486 decode_chunk_row_at(row, 0)?,
1487 decode_document_row_at(row, 8)?,
1488 ))
1489 })?;
1490 let rows = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1491 Ok(rows)
1492 }
1493
1494 pub fn get_active_search_scope_summary_in_collections(
1495 &self,
1496 collection_ids: &[i64],
1497 ) -> Result<SearchScopeSummary> {
1498 if collection_ids.is_empty() {
1499 return Ok(SearchScopeSummary {
1500 document_ids: Vec::new(),
1501 chunk_count: 0,
1502 });
1503 }
1504
1505 let conn = self
1506 .db
1507 .lock()
1508 .map_err(|_| CoreError::poisoned("database"))?;
1509
1510 let placeholders = vec!["?"; collection_ids.len()].join(", ");
1511 let sql = format!(
1512 "SELECT d.id, COUNT(c.id)
1513 FROM chunks c
1514 JOIN documents d ON d.id = c.doc_id
1515 WHERE d.active = 1
1516 AND d.collection_id IN ({placeholders})
1517 GROUP BY d.id
1518 ORDER BY d.id ASC"
1519 );
1520 let mut stmt = conn.prepare(&sql)?;
1521 let rows = stmt.query_map(params_from_iter(collection_ids.iter()), |row| {
1522 Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
1523 })?;
1524
1525 let mut document_ids = Vec::new();
1526 let mut chunk_count = 0usize;
1527 for row in rows {
1528 let (doc_id, doc_chunk_count) = row?;
1529 let doc_chunk_count = usize::try_from(doc_chunk_count).map_err(|_| {
1530 CoreError::Internal(format!(
1531 "active chunk count must be non-negative for document {doc_id}"
1532 ))
1533 })?;
1534 document_ids.push(doc_id);
1535 chunk_count = chunk_count.saturating_add(doc_chunk_count);
1536 }
1537
1538 Ok(SearchScopeSummary {
1539 document_ids,
1540 chunk_count,
1541 })
1542 }
1543
1544 pub fn count_active_chunks_in_collections(&self, collection_ids: &[i64]) -> Result<usize> {
1545 if collection_ids.is_empty() {
1546 return Ok(0);
1547 }
1548
1549 let conn = self
1550 .db
1551 .lock()
1552 .map_err(|_| CoreError::poisoned("database"))?;
1553
1554 let placeholders = vec!["?"; collection_ids.len()].join(", ");
1555 let sql = format!(
1556 "SELECT COUNT(*)
1557 FROM chunks c
1558 JOIN documents d ON d.id = c.doc_id
1559 WHERE d.active = 1
1560 AND d.collection_id IN ({placeholders})"
1561 );
1562 query_count(&conn, &sql, params_from_iter(collection_ids.iter()))
1563 }
1564
1565 pub fn has_inactive_documents_in_collections(&self, collection_ids: &[i64]) -> Result<bool> {
1566 if collection_ids.is_empty() {
1567 return Ok(false);
1568 }
1569
1570 let conn = self
1571 .db
1572 .lock()
1573 .map_err(|_| CoreError::poisoned("database"))?;
1574
1575 let placeholders = vec!["?"; collection_ids.len()].join(", ");
1576 let sql = format!(
1577 "SELECT EXISTS(
1578 SELECT 1
1579 FROM documents
1580 WHERE active = 0
1581 AND collection_id IN ({placeholders})
1582 )"
1583 );
1584 let exists: i64 = conn.query_row(&sql, params_from_iter(collection_ids.iter()), |row| {
1585 row.get(0)
1586 })?;
1587 Ok(exists != 0)
1588 }
1589
1590 pub fn get_active_chunk_ids_in_collections(&self, collection_ids: &[i64]) -> Result<Vec<i64>> {
1591 if collection_ids.is_empty() {
1592 return Ok(Vec::new());
1593 }
1594
1595 let conn = self
1596 .db
1597 .lock()
1598 .map_err(|_| CoreError::poisoned("database"))?;
1599
1600 let placeholders = vec!["?"; collection_ids.len()].join(", ");
1601 let sql = format!(
1602 "SELECT c.id
1603 FROM chunks c
1604 JOIN documents d ON d.id = c.doc_id
1605 WHERE d.active = 1
1606 AND d.collection_id IN ({placeholders})
1607 ORDER BY d.id ASC, c.seq ASC"
1608 );
1609 let mut stmt = conn.prepare(&sql)?;
1610 let rows = stmt.query_map(params_from_iter(collection_ids.iter()), |row| row.get(0))?;
1611 let chunk_ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1612 Ok(chunk_ids)
1613 }
1614
    /// Replaces a document's current generation in one transaction: its
    /// `documents` row, its stored canonical text, and its full chunk list.
    ///
    /// Steps, in order, inside a single transaction:
    /// 1. Upsert the `documents` row keyed by `(collection_id, path)`. This
    ///    forces the row active, clears `deactivated_at`, and sets `fts_dirty = 1`.
    /// 2. Capture the ids of the document's existing chunks (in `seq` order).
    /// 3. Upsert the `document_texts` row for the document.
    /// 4. Delete the old chunks and insert `replacement.chunks`.
    ///
    /// Returns the document id, the ids of the removed chunks, and the ids of
    /// the newly inserted chunks in `replacement.chunks` order.
    ///
    /// # Errors
    /// - A chunk whose `offset`/`length` span fails `validate_text_span`
    ///   against `replacement.text` is rejected before the database is locked.
    /// - `lookup_collection_name` errors propagate. Presumably this happens when
    ///   `replacement.collection_id` does not exist (TODO confirm).
    /// - Any SQLite error returns before `commit`, so the transaction is never
    ///   committed.
    pub fn replace_document_generation(
        &self,
        replacement: DocumentGenerationReplace<'_>,
    ) -> Result<DocumentGenerationReplaceResult> {
        // Validate every span up front so a bad chunk never opens a transaction.
        for chunk in replacement.chunks {
            validate_text_span(
                replacement.text,
                chunk.offset,
                chunk.length,
                &format!("chunk seq {}", chunk.seq),
            )?;
        }

        let conn = self
            .db
            .lock()
            .map_err(|_| CoreError::poisoned("database"))?;
        // Called only for its error path; the name itself is unused.
        let _collection_name = lookup_collection_name(&conn, replacement.collection_id)?;

        let tx = conn.unchecked_transaction()?;
        // RETURNING yields the row id for both the insert and the conflict-update path.
        // A missing title is stored as "" via `unwrap_or("")`.
        let doc_id: i64 = tx.query_row(
            "INSERT INTO documents (collection_id, path, title, hash, modified, active, deactivated_at, fts_dirty)
             VALUES (?1, ?2, ?3, ?4, ?5, 1, NULL, 1)
             ON CONFLICT(collection_id, path) DO UPDATE SET
                 title = excluded.title,
                 hash = excluded.hash,
                 modified = excluded.modified,
                 active = 1,
                 deactivated_at = NULL,
                 fts_dirty = 1
             RETURNING id",
            params![
                replacement.collection_id,
                replacement.path,
                normalized_title(replacement.title).unwrap_or(""),
                replacement.hash,
                replacement.modified
            ],
            |row| row.get(0),
        )?;

        // Must run before the DELETE below; afterwards the old ids are gone.
        let old_chunk_ids = {
            let mut stmt =
                tx.prepare("SELECT id FROM chunks WHERE doc_id = ?1 ORDER BY seq ASC")?;
            let rows = stmt.query_map(params![doc_id], |row| row.get::<_, i64>(0))?;
            rows.collect::<std::result::Result<Vec<_>, _>>()?
        };

        // `created` is refreshed on every replacement, not only on first insert.
        tx.execute(
            "INSERT INTO document_texts (doc_id, extractor_key, source_hash, text_hash, generation_key, text, created)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%Y-%m-%dT%H:%M:%SZ','now'))
             ON CONFLICT(doc_id) DO UPDATE SET
                 extractor_key = excluded.extractor_key,
                 source_hash = excluded.source_hash,
                 text_hash = excluded.text_hash,
                 generation_key = excluded.generation_key,
                 text = excluded.text,
                 created = excluded.created",
            params![
                doc_id,
                replacement.extractor_key,
                replacement.source_hash,
                replacement.text_hash,
                replacement.generation_key,
                replacement.text
            ],
        )?;

        tx.execute("DELETE FROM chunks WHERE doc_id = ?1", params![doc_id])?;

        let mut stmt = tx.prepare(
            "INSERT INTO chunks (doc_id, seq, offset, length, heading, kind, retrieval_prefix)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
        )?;
        let mut chunk_ids = Vec::with_capacity(replacement.chunks.len());
        for chunk in replacement.chunks {
            stmt.execute(params![
                doc_id,
                chunk.seq,
                chunk.offset as i64,
                chunk.length as i64,
                chunk.heading,
                chunk.kind.as_storage_kind(),
                chunk.retrieval_prefix.as_deref(),
            ])?;
            chunk_ids.push(tx.last_insert_rowid());
        }
        // Release the statement's borrow of `tx` before committing.
        drop(stmt);

        tx.commit()?;
        Ok(DocumentGenerationReplaceResult {
            doc_id,
            old_chunk_ids,
            chunk_ids,
        })
    }
1711
1712 pub fn put_document_text(
1713 &self,
1714 doc_id: i64,
1715 extractor_key: &str,
1716 source_hash: &str,
1717 text_hash: &str,
1718 generation_key: &str,
1719 text: &str,
1720 ) -> Result<()> {
1721 let conn = self
1722 .db
1723 .lock()
1724 .map_err(|_| CoreError::poisoned("database"))?;
1725 let _doc = lookup_document_id(&conn, doc_id)?;
1726
1727 conn.execute(
1728 "INSERT INTO document_texts (doc_id, extractor_key, source_hash, text_hash, generation_key, text, created)
1729 VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%Y-%m-%dT%H:%M:%SZ','now'))
1730 ON CONFLICT(doc_id) DO UPDATE SET
1731 extractor_key = excluded.extractor_key,
1732 source_hash = excluded.source_hash,
1733 text_hash = excluded.text_hash,
1734 generation_key = excluded.generation_key,
1735 text = excluded.text,
1736 created = excluded.created",
1737 params![
1738 doc_id,
1739 extractor_key,
1740 source_hash,
1741 text_hash,
1742 generation_key,
1743 text
1744 ],
1745 )?;
1746 Ok(())
1747 }
1748
1749 pub fn get_document_text(&self, doc_id: i64) -> Result<DocumentTextRow> {
1750 let conn = self
1751 .db
1752 .lock()
1753 .map_err(|_| CoreError::poisoned("database"))?;
1754 let _doc = lookup_document_id(&conn, doc_id)?;
1755
1756 let result = conn.query_row(
1757 "SELECT doc_id, extractor_key, source_hash, text_hash, generation_key, text, created
1758 FROM document_texts
1759 WHERE doc_id = ?1",
1760 params![doc_id],
1761 decode_document_text_row,
1762 );
1763 match result {
1764 Ok(row) => Ok(row),
1765 Err(Error::QueryReturnedNoRows) => Err(missing_document_text_error(doc_id)),
1766 Err(err) => Err(err.into()),
1767 }
1768 }
1769
1770 pub fn get_document_texts(&self, doc_ids: &[i64]) -> Result<HashMap<i64, DocumentTextRow>> {
1771 if doc_ids.is_empty() {
1772 return Ok(HashMap::new());
1773 }
1774
1775 let mut requested = doc_ids.to_vec();
1776 requested.sort_unstable();
1777 requested.dedup();
1778
1779 let text_by_doc = self.get_existing_document_texts(&requested)?;
1780 for doc_id in requested {
1781 if !text_by_doc.contains_key(&doc_id) {
1782 return Err(missing_document_text_error(doc_id));
1783 }
1784 }
1785
1786 Ok(text_by_doc)
1787 }
1788
1789 pub fn get_existing_document_texts(
1790 &self,
1791 doc_ids: &[i64],
1792 ) -> Result<HashMap<i64, DocumentTextRow>> {
1793 if doc_ids.is_empty() {
1794 return Ok(HashMap::new());
1795 }
1796
1797 let mut requested = doc_ids.to_vec();
1798 requested.sort_unstable();
1799 requested.dedup();
1800
1801 let conn = self
1802 .db
1803 .lock()
1804 .map_err(|_| CoreError::poisoned("database"))?;
1805 let mut text_by_doc = HashMap::new();
1806 for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1807 let placeholders = vec!["?"; batch.len()].join(", ");
1808 let sql = format!(
1809 "SELECT doc_id, extractor_key, source_hash, text_hash, generation_key, text, created
1810 FROM document_texts
1811 WHERE doc_id IN ({placeholders})
1812 ORDER BY doc_id ASC"
1813 );
1814 let mut stmt = conn.prepare(&sql)?;
1815 let rows = stmt.query_map(params_from_iter(batch.iter()), decode_document_text_row)?;
1816 for row in rows {
1817 let row = row?;
1818 text_by_doc.insert(row.doc_id, row);
1819 }
1820 }
1821
1822 Ok(text_by_doc)
1823 }
1824
1825 pub fn has_document_text(&self, doc_id: i64) -> Result<bool> {
1826 let conn = self
1827 .db
1828 .lock()
1829 .map_err(|_| CoreError::poisoned("database"))?;
1830 let exists: i64 = conn.query_row(
1831 "SELECT EXISTS(SELECT 1 FROM document_texts WHERE doc_id = ?1)",
1832 params![doc_id],
1833 |row| row.get(0),
1834 )?;
1835 Ok(exists != 0)
1836 }
1837
1838 pub fn has_current_document_text(&self, doc_id: i64, generation_key: &str) -> Result<bool> {
1839 let conn = self
1840 .db
1841 .lock()
1842 .map_err(|_| CoreError::poisoned("database"))?;
1843 let exists: i64 = conn.query_row(
1844 "SELECT EXISTS(
1845 SELECT 1 FROM document_texts
1846 WHERE doc_id = ?1 AND generation_key = ?2
1847 )",
1848 params![doc_id, generation_key],
1849 |row| row.get(0),
1850 )?;
1851 Ok(exists != 0)
1852 }
1853
1854 pub fn get_document_text_generation_keys(
1855 &self,
1856 doc_ids: &[i64],
1857 ) -> Result<HashMap<i64, String>> {
1858 if doc_ids.is_empty() {
1859 return Ok(HashMap::new());
1860 }
1861
1862 let mut requested = doc_ids.to_vec();
1863 requested.sort_unstable();
1864 requested.dedup();
1865
1866 let conn = self
1867 .db
1868 .lock()
1869 .map_err(|_| CoreError::poisoned("database"))?;
1870 let mut generation_by_doc = HashMap::with_capacity(requested.len());
1871 for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1872 let placeholders = vec!["?"; batch.len()].join(", ");
1873 let sql = format!(
1874 "SELECT doc_id, generation_key
1875 FROM document_texts
1876 WHERE doc_id IN ({placeholders})"
1877 );
1878 let mut stmt = conn.prepare(&sql)?;
1879 let rows = stmt.query_map(params_from_iter(batch.iter()), |row| {
1880 Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1881 })?;
1882 for row in rows {
1883 let (doc_id, generation_key) = row?;
1884 generation_by_doc.insert(doc_id, generation_key);
1885 }
1886 }
1887
1888 Ok(generation_by_doc)
1889 }
1890
1891 pub fn get_document_text_extractors(&self, doc_ids: &[i64]) -> Result<HashMap<i64, String>> {
1892 if doc_ids.is_empty() {
1893 return Ok(HashMap::new());
1894 }
1895
1896 let mut requested = doc_ids.to_vec();
1897 requested.sort_unstable();
1898 requested.dedup();
1899
1900 let conn = self
1901 .db
1902 .lock()
1903 .map_err(|_| CoreError::poisoned("database"))?;
1904 let mut extractor_by_doc = HashMap::with_capacity(requested.len());
1905 for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1906 let placeholders = vec!["?"; batch.len()].join(", ");
1907 let sql = format!(
1908 "SELECT doc_id, extractor_key
1909 FROM document_texts
1910 WHERE doc_id IN ({placeholders})"
1911 );
1912 let mut stmt = conn.prepare(&sql)?;
1913 let rows = stmt.query_map(params_from_iter(batch.iter()), |row| {
1914 Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1915 })?;
1916 for row in rows {
1917 let (doc_id, extractor_key) = row?;
1918 extractor_by_doc.insert(doc_id, extractor_key);
1919 }
1920 }
1921
1922 for doc_id in requested {
1923 if !extractor_by_doc.contains_key(&doc_id) {
1924 return Err(missing_document_text_error(doc_id));
1925 }
1926 }
1927
1928 Ok(extractor_by_doc)
1929 }
1930
1931 pub fn get_canonical_chunk_texts(&self, chunk_ids: &[i64]) -> Result<HashMap<i64, String>> {
1932 if chunk_ids.is_empty() {
1933 return Ok(HashMap::new());
1934 }
1935
1936 let mut requested = chunk_ids.to_vec();
1937 requested.sort_unstable();
1938 requested.dedup();
1939
1940 let conn = self
1941 .db
1942 .lock()
1943 .map_err(|_| CoreError::poisoned("database"))?;
1944 let mut text_by_chunk = HashMap::with_capacity(requested.len());
1945 for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1946 let placeholders = vec!["?"; batch.len()].join(", ");
1947 let sql = format!(
1948 "SELECT c.id,
1949 c.offset,
1950 c.length,
1951 length(CAST(dt.text AS BLOB)),
1952 substr(CAST(dt.text AS BLOB), c.offset + 1, 1),
1953 substr(CAST(dt.text AS BLOB), c.offset + c.length + 1, 1),
1954 substr(CAST(dt.text AS BLOB), c.offset + 1, c.length)
1955 FROM chunks c
1956 JOIN document_texts dt ON dt.doc_id = c.doc_id
1957 WHERE c.id IN ({placeholders})"
1958 );
1959 let mut stmt = conn.prepare(&sql)?;
1960 let rows = stmt.query_map(params_from_iter(batch.iter()), |row| {
1961 Ok((
1962 row.get::<_, i64>(0)?,
1963 row.get::<_, i64>(1)?,
1964 row.get::<_, i64>(2)?,
1965 row.get::<_, i64>(3)?,
1966 row.get::<_, Vec<u8>>(4)?,
1967 row.get::<_, Vec<u8>>(5)?,
1968 row.get::<_, Vec<u8>>(6)?,
1969 ))
1970 })?;
1971 for row in rows {
1972 let (chunk_id, offset, length, document_len, start_byte, end_byte, bytes) = row?;
1973 let text = canonical_chunk_slice_from_bytes(
1974 chunk_id,
1975 offset,
1976 length,
1977 document_len,
1978 start_byte,
1979 end_byte,
1980 bytes,
1981 )?;
1982 text_by_chunk.insert(chunk_id, text);
1983 }
1984 }
1985
1986 for chunk_id in requested {
1987 if !text_by_chunk.contains_key(&chunk_id) {
1988 return Err(CoreError::Internal(format!(
1989 "canonical text missing for chunk {chunk_id}"
1990 )));
1991 }
1992 }
1993
1994 Ok(text_by_chunk)
1995 }
1996
1997 pub fn get_chunk_text(&self, chunk_id: i64) -> Result<ChunkTextRow> {
1998 let conn = self
1999 .db
2000 .lock()
2001 .map_err(|_| CoreError::poisoned("database"))?;
2002 let result = conn.query_row(
2003 "SELECT c.id, c.doc_id, c.seq, c.offset, c.length, c.heading, c.kind, c.retrieval_prefix, dt.extractor_key, dt.text
2004 FROM chunks c
2005 JOIN document_texts dt ON dt.doc_id = c.doc_id
2006 WHERE c.id = ?1",
2007 params![chunk_id],
2008 |row| {
2009 let chunk = decode_chunk_row(row)?;
2010 let extractor_key: String = row.get(8)?;
2011 let document_text: String = row.get(9)?;
2012 Ok((chunk, extractor_key, document_text))
2013 },
2014 );
2015 let (chunk, extractor_key, document_text) = match result {
2016 Ok(row) => row,
2017 Err(Error::QueryReturnedNoRows) => {
2018 let doc_id = lookup_chunk_doc_id(&conn, chunk_id)?;
2019 return Err(missing_document_text_error(doc_id));
2020 }
2021 Err(err) => return Err(err.into()),
2022 };
2023 let text = chunk_text_from_canonical(&document_text, &chunk)?;
2024 Ok(ChunkTextRow {
2025 chunk,
2026 extractor_key,
2027 text,
2028 })
2029 }
2030
2031 pub fn insert_embeddings(&self, entries: &[(i64, &str)]) -> Result<()> {
2032 if entries.is_empty() {
2033 return Ok(());
2034 }
2035
2036 let conn = self
2037 .db
2038 .lock()
2039 .map_err(|_| CoreError::poisoned("database"))?;
2040
2041 let tx = conn.unchecked_transaction()?;
2042 let mut stmt = tx.prepare(
2043 "INSERT INTO embeddings (chunk_id, model, embedded_at)
2044 VALUES (?1, ?2, strftime('%Y-%m-%dT%H:%M:%SZ','now'))
2045 ON CONFLICT(chunk_id, model) DO UPDATE SET
2046 embedded_at = excluded.embedded_at",
2047 )?;
2048
2049 for (chunk_id, model) in entries {
2050 stmt.execute(params![chunk_id, model])?;
2051 }
2052
2053 drop(stmt);
2054 tx.commit()?;
2055 Ok(())
2056 }
2057
2058 pub fn get_unembedded_chunks(
2059 &self,
2060 model: &str,
2061 after_chunk_id: i64,
2062 limit: usize,
2063 ) -> Result<Vec<EmbedRecord>> {
2064 self.get_unembedded_chunks_filtered(model, after_chunk_id, limit, "", Vec::new())
2065 }
2066
2067 pub fn get_unembedded_chunks_in_space(
2068 &self,
2069 model: &str,
2070 space_id: i64,
2071 after_chunk_id: i64,
2072 limit: usize,
2073 ) -> Result<Vec<EmbedRecord>> {
2074 self.get_unembedded_chunks_filtered(
2075 model,
2076 after_chunk_id,
2077 limit,
2078 " AND col.space_id = ?",
2079 vec![SqlValue::Integer(space_id)],
2080 )
2081 }
2082
2083 pub fn get_unembedded_chunks_in_collections(
2084 &self,
2085 model: &str,
2086 collection_ids: &[i64],
2087 after_chunk_id: i64,
2088 limit: usize,
2089 ) -> Result<Vec<EmbedRecord>> {
2090 if collection_ids.is_empty() {
2091 return Ok(Vec::new());
2092 }
2093
2094 let placeholders = vec!["?"; collection_ids.len()].join(", ");
2095 let clause = format!(" AND d.collection_id IN ({placeholders})");
2096 let params = collection_ids
2097 .iter()
2098 .map(|id| SqlValue::Integer(*id))
2099 .collect::<Vec<_>>();
2100 self.get_unembedded_chunks_filtered(model, after_chunk_id, limit, &clause, params)
2101 }
2102
2103 fn get_unembedded_chunks_filtered(
2104 &self,
2105 model: &str,
2106 after_chunk_id: i64,
2107 limit: usize,
2108 scope_clause: &str,
2109 scope_params: Vec<SqlValue>,
2110 ) -> Result<Vec<EmbedRecord>> {
2111 let conn = self
2112 .db
2113 .lock()
2114 .map_err(|_| CoreError::poisoned("database"))?;
2115 let sql_limit = i64::try_from(limit)
2116 .map_err(|_| CoreError::Internal("limit too large for sqlite".to_string()))?;
2117
2118 let sql = format!(
2119 "SELECT c.id, c.doc_id, c.seq, c.offset, c.length, c.heading, c.kind, c.retrieval_prefix,
2120 d.path, d.title, col.path, s.name
2121 FROM chunks c
2122 JOIN documents d ON d.id = c.doc_id
2123 JOIN collections col ON col.id = d.collection_id
2124 JOIN spaces s ON s.id = col.space_id
2125 LEFT JOIN embeddings e ON e.chunk_id = c.id AND e.model = ?
2126 WHERE d.active = 1 AND e.chunk_id IS NULL AND c.id > ?{scope_clause}
2127 ORDER BY c.id ASC
2128 LIMIT ?"
2129 );
2130 let mut stmt = conn.prepare(&sql)?;
2131 let mut params = Vec::with_capacity(scope_params.len() + 3);
2132 params.push(SqlValue::Text(model.to_string()));
2133 params.push(SqlValue::Integer(after_chunk_id));
2134 params.extend(scope_params);
2135 params.push(SqlValue::Integer(sql_limit));
2136 let rows = stmt.query_map(params_from_iter(params.iter()), |row| {
2137 Ok(EmbedRecord {
2138 chunk: decode_chunk_row(row)?,
2139 doc_path: row.get(8)?,
2140 doc_title: decode_optional_title(row.get::<_, String>(9)?),
2141 collection_path: PathBuf::from(row.get::<_, String>(10)?),
2142 space_name: row.get(11)?,
2143 })
2144 })?;
2145
2146 let records = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2147 Ok(records)
2148 }
2149
2150 pub fn delete_embeddings_for_model(&self, model: &str) -> Result<usize> {
2151 let conn = self
2152 .db
2153 .lock()
2154 .map_err(|_| CoreError::poisoned("database"))?;
2155
2156 let deleted = conn.execute("DELETE FROM embeddings WHERE model = ?1", params![model])?;
2157 Ok(deleted)
2158 }
2159
2160 pub fn delete_embeddings_for_space(&self, space_id: i64) -> Result<usize> {
2161 let conn = self
2162 .db
2163 .lock()
2164 .map_err(|_| CoreError::poisoned("database"))?;
2165 let _space_name = lookup_space_name(&conn, space_id)?;
2166
2167 let deleted = conn.execute(
2168 "DELETE FROM embeddings
2169 WHERE chunk_id IN (
2170 SELECT c.id
2171 FROM chunks c
2172 JOIN documents d ON d.id = c.doc_id
2173 JOIN collections col ON col.id = d.collection_id
2174 WHERE col.space_id = ?1
2175 )",
2176 params![space_id],
2177 )?;
2178 Ok(deleted)
2179 }
2180
2181 pub fn list_embedding_models_in_space(&self, space_id: i64) -> Result<Vec<String>> {
2182 let conn = self
2183 .db
2184 .lock()
2185 .map_err(|_| CoreError::poisoned("database"))?;
2186 let _space_name = lookup_space_name(&conn, space_id)?;
2187
2188 let mut stmt = conn.prepare(
2189 "SELECT DISTINCT e.model
2190 FROM embeddings e
2191 JOIN chunks c ON c.id = e.chunk_id
2192 JOIN documents d ON d.id = c.doc_id
2193 JOIN collections col ON col.id = d.collection_id
2194 WHERE col.space_id = ?1
2195 ORDER BY e.model ASC",
2196 )?;
2197 let rows = stmt.query_map(params![space_id], |row| row.get::<_, String>(0))?;
2198 let models = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2199 Ok(models)
2200 }
2201
2202 pub fn count_embeddings(&self) -> Result<usize> {
2203 let conn = self
2204 .db
2205 .lock()
2206 .map_err(|_| CoreError::poisoned("database"))?;
2207
2208 let count: i64 = conn.query_row("SELECT COUNT(*) FROM embeddings", [], |row| row.get(0))?;
2209 Ok(count as usize)
2210 }
2211
2212 pub fn index_tantivy(&self, space: &str, entries: &[TantivyEntry]) -> Result<()> {
2213 if entries.is_empty() {
2214 return Ok(());
2215 }
2216
2217 let space_indexes = self.get_space_indexes(space)?;
2218 with_tantivy_writer(&space_indexes, |writer| {
2219 for entry in entries {
2220 let chunk_id = u64::try_from(entry.chunk_id).map_err(|_| {
2221 CoreError::Internal(format!(
2222 "chunk_id must be non-negative for tantivy indexing: {}",
2223 entry.chunk_id
2224 ))
2225 })?;
2226 let doc_id = u64::try_from(entry.doc_id).map_err(|_| {
2227 CoreError::Internal(format!(
2228 "doc_id must be non-negative for tantivy indexing: {}",
2229 entry.doc_id
2230 ))
2231 })?;
2232
2233 let mut doc = TantivyDocument::default();
2234 doc.add_u64(space_indexes.fields.chunk_id, chunk_id);
2235 doc.add_u64(space_indexes.fields.doc_id, doc_id);
2236 doc.add_text(space_indexes.fields.filepath, &entry.filepath);
2237 if let Some(title) = &entry.title {
2238 doc.add_text(space_indexes.fields.title, title);
2239 }
2240 if let Some(heading) = &entry.heading {
2241 doc.add_text(space_indexes.fields.heading, heading);
2242 }
2243 doc.add_text(space_indexes.fields.body, &entry.body);
2244 writer.add_document(doc)?;
2245 }
2246 Ok(())
2247 })
2248 }
2249
2250 pub fn delete_tantivy(&self, space: &str, chunk_ids: &[i64]) -> Result<()> {
2251 if chunk_ids.is_empty() {
2252 return Ok(());
2253 }
2254
2255 let space_indexes = self.get_space_indexes(space)?;
2256 with_tantivy_writer(&space_indexes, |writer| {
2257 for chunk_id in chunk_ids {
2258 let chunk_key = u64::try_from(*chunk_id).map_err(|_| {
2259 CoreError::Internal(format!(
2260 "chunk_id must be non-negative for tantivy delete: {chunk_id}"
2261 ))
2262 })?;
2263 writer.delete_term(Term::from_field_u64(
2264 space_indexes.fields.chunk_id,
2265 chunk_key,
2266 ));
2267 }
2268
2269 Ok(())
2270 })
2271 }
2272
2273 pub fn delete_tantivy_by_doc(&self, space: &str, doc_id: i64) -> Result<()> {
2274 let space_indexes = self.get_space_indexes(space)?;
2275 with_tantivy_writer(&space_indexes, |writer| {
2276 let doc_key = u64::try_from(doc_id).map_err(|_| {
2277 CoreError::Internal(format!(
2278 "doc_id must be non-negative for tantivy delete-by-doc: {doc_id}"
2279 ))
2280 })?;
2281 writer.delete_term(Term::from_field_u64(space_indexes.fields.doc_id, doc_key));
2282 Ok(())
2283 })
2284 }
2285
2286 pub fn query_bm25(
2287 &self,
2288 space: &str,
2289 query: &str,
2290 fields: &[(&str, f32)],
2291 limit: usize,
2292 ) -> Result<Vec<BM25Hit>> {
2293 self.query_bm25_filtered(space, query, fields, None, limit, true)
2294 }
2295
2296 pub(crate) fn query_bm25_cached(
2297 &self,
2298 space: &str,
2299 query: &str,
2300 fields: &[(&str, f32)],
2301 limit: usize,
2302 ) -> Result<Vec<BM25Hit>> {
2303 self.query_bm25_filtered(space, query, fields, None, limit, false)
2304 }
2305
2306 pub fn query_bm25_in_documents(
2307 &self,
2308 space: &str,
2309 query: &str,
2310 fields: &[(&str, f32)],
2311 document_ids: &[i64],
2312 limit: usize,
2313 ) -> Result<Vec<BM25Hit>> {
2314 if document_ids.is_empty() {
2315 return Ok(Vec::new());
2316 }
2317
2318 self.query_bm25_filtered(space, query, fields, Some(document_ids), limit, true)
2319 }
2320
2321 pub(crate) fn query_bm25_in_documents_cached(
2322 &self,
2323 space: &str,
2324 query: &str,
2325 fields: &[(&str, f32)],
2326 document_ids: &[i64],
2327 limit: usize,
2328 ) -> Result<Vec<BM25Hit>> {
2329 if document_ids.is_empty() {
2330 return Ok(Vec::new());
2331 }
2332
2333 self.query_bm25_filtered(space, query, fields, Some(document_ids), limit, false)
2334 }
2335
    /// Shared implementation behind the `query_bm25*` entry points.
    ///
    /// Builds a literal (not parsed) BM25 query. `query` is run through each
    /// field's tokenizer, and every resulting term becomes an optional (`Should`)
    /// `TermQuery`. The term is boosted by the field's weight when that weight
    /// differs from 1.0.
    ///
    /// When `document_ids` is `Some`, the text query must also match a doc-id
    /// filter. The filter is wrapped in a zero-score `ConstScoreQuery`, so it
    /// restricts the hit set without changing scores.
    ///
    /// * `fields`: `(field name, boost)` pairs. Every field must be indexed.
    /// * `limit`: maximum number of hits. `0` returns an empty result immediately.
    /// * `reload_reader`: reload the tantivy reader before searching. When
    ///   `false`, the search uses the reader's current snapshot.
    ///
    /// Returns hits in `TopDocs` order, with `chunk_id` read from the chunk_id
    /// fast field. The result is also empty when tokenization yields no terms.
    ///
    /// # Errors
    /// `CoreError::Internal` for:
    /// - an empty `fields` list or an unindexed field;
    /// - a negative document id in the filter;
    /// - a hit with no chunk_id fast-field value;
    /// - a chunk_id outside the `i64` range.
    ///
    /// Field-resolution, tantivy and space-lookup errors are propagated.
    fn query_bm25_filtered(
        &self,
        space: &str,
        query: &str,
        fields: &[(&str, f32)],
        document_ids: Option<&[i64]>,
        limit: usize,
        reload_reader: bool,
    ) -> Result<Vec<BM25Hit>> {
        if limit == 0 {
            return Ok(Vec::new());
        }

        if fields.is_empty() {
            return Err(CoreError::Internal(
                "bm25 query requires at least one field".to_string(),
            ));
        }

        let space_indexes = self.get_space_indexes(space)?;

        // Resolve each requested field name and capture how it was indexed.
        // TermQuery needs the IndexRecordOption to decide what postings to read.
        let schema = space_indexes.tantivy_index.schema();
        let query_fields = fields
            .iter()
            .map(|(name, boost)| {
                let field = resolve_tantivy_field(space_indexes.fields, name)?;
                let field_entry = schema.get_field_entry(field);
                let index_record_option = field_entry
                    .field_type()
                    .get_index_record_option()
                    .ok_or_else(|| {
                        CoreError::Internal(format!(
                            "bm25 field '{}' is not indexed",
                            field_entry.name()
                        ))
                    })?;
                Ok(Bm25FieldSpec {
                    field,
                    boost: *boost,
                    index_record_option,
                })
            })
            .collect::<Result<Vec<_>>>()?;
        // No analyzed terms (e.g. query was only punctuation): nothing can match.
        let Some(parsed_query) =
            build_literal_bm25_query(&space_indexes.tantivy_index, &query_fields, query)?
        else {
            return Ok(Vec::new());
        };
        // Optional document scoping: both clauses are Must. The filter side
        // scores 0.0, so BM25 scores come only from the text query.
        let parsed_query = if let Some(document_ids) = document_ids {
            Box::new(BooleanQuery::new(vec![
                (Occur::Must, parsed_query),
                (
                    Occur::Must,
                    build_doc_id_filter_query(space_indexes.fields.doc_id, document_ids)?,
                ),
            ])) as Box<dyn Query>
        } else {
            parsed_query
        };
        if reload_reader {
            space_indexes.tantivy_reader.reload()?;
        }
        let searcher = space_indexes.tantivy_reader.searcher();
        let docs = searcher.search(&parsed_query, &TopDocs::with_limit(limit))?;

        // Map each hit back to its chunk id via the chunk_id fast field. The
        // column is opened once per segment and cached, since hits from the same
        // segment share it.
        let mut hits = Vec::with_capacity(docs.len());
        let chunk_id_field_name = schema.get_field_entry(space_indexes.fields.chunk_id).name();
        let mut chunk_id_columns = HashMap::new();
        for (score, address) in docs {
            let chunk_id_column = match chunk_id_columns.entry(address.segment_ord) {
                std::collections::hash_map::Entry::Occupied(entry) => entry.into_mut(),
                std::collections::hash_map::Entry::Vacant(entry) => {
                    let column = searcher
                        .segment_reader(address.segment_ord)
                        .fast_fields()
                        .u64(chunk_id_field_name)?;
                    entry.insert(column)
                }
            };
            let chunk_id = chunk_id_column.first(address.doc_id).ok_or_else(|| {
                CoreError::Internal("tantivy hit missing chunk_id fast field".to_string())
            })?;
            // Chunk ids are stored as u64; convert back to the i64 used by SQLite.
            let chunk_id = i64::try_from(chunk_id).map_err(|_| {
                CoreError::Internal(format!("tantivy chunk_id exceeds i64 range: {chunk_id}"))
            })?;
            hits.push(BM25Hit { chunk_id, score });
        }

        Ok(hits)
    }
2426
2427 pub(crate) fn reload_tantivy_reader(&self, space: &str) -> Result<()> {
2428 let space_indexes = self.get_space_indexes(space)?;
2429 space_indexes.tantivy_reader.reload()?;
2430 Ok(())
2431 }
2432
2433 pub fn commit_tantivy(&self, space: &str) -> Result<()> {
2434 let space_indexes = self.get_space_indexes(space)?;
2435 let mut writer = space_indexes
2436 .tantivy_writer
2437 .lock()
2438 .map_err(|_| CoreError::poisoned("tantivy writer"))?;
2439 let Some(mut writer) = writer.take() else {
2440 return Ok(());
2441 };
2442 writer.commit()?;
2443 space_indexes.tantivy_reader.reload()?;
2444 Ok(())
2445 }
2446
2447 pub fn insert_usearch(&self, space: &str, key: i64, vector: &[f32]) -> Result<()> {
2448 self.batch_insert_usearch(space, &[(key, vector)])
2449 }
2450
2451 pub fn batch_insert_usearch(&self, space: &str, entries: &[(i64, &[f32])]) -> Result<()> {
2452 self.batch_insert_usearch_with_save_mode(space, entries, UsearchSaveMode::Immediate)
2453 }
2454
2455 pub(crate) fn batch_insert_usearch_deferred(
2456 &self,
2457 space: &str,
2458 entries: &[(i64, &[f32])],
2459 ) -> Result<()> {
2460 self.batch_insert_usearch_with_save_mode(space, entries, UsearchSaveMode::Deferred)
2461 }
2462
    /// Inserts `(key, vector)` pairs into `space`'s usearch index.
    ///
    /// All vectors in the batch must share one non-zero length. If the index is
    /// empty, it is rebuilt with that dimensionality. Otherwise the length must
    /// match the index's existing dimensionality.
    ///
    /// With `UsearchSaveMode::Immediate`, the index is saved to disk after the
    /// inserts. With `Deferred`, the space is only marked dirty and is written
    /// later by `save_dirty_usearch_indexes`. Stage timings are recorded via
    /// `crate::profile`. An empty `entries` slice is a no-op.
    ///
    /// # Errors
    /// `CoreError::Internal` for an empty or mismatched vector, a negative key,
    /// or a usearch reserve/add failure. Lock and save errors are propagated.
    fn batch_insert_usearch_with_save_mode(
        &self,
        space: &str,
        entries: &[(i64, &[f32])],
        save_mode: UsearchSaveMode,
    ) -> Result<()> {
        if entries.is_empty() {
            return Ok(());
        }

        let space_indexes = self.get_space_indexes(space)?;

        // Validate the whole batch before taking the write lock, so a malformed
        // batch is rejected without touching the index.
        let expected_dimensions = entries[0].1.len();
        if expected_dimensions == 0 {
            return Err(CoreError::Internal(
                "cannot insert empty vector into usearch index".to_string(),
            ));
        }
        for (_, vector) in entries {
            if vector.len() != expected_dimensions {
                return Err(CoreError::Internal(format!(
                    "vector dimension mismatch in batch insert: expected {expected_dimensions}, got {}",
                    vector.len()
                )));
            }
        }

        let mut index = space_indexes
            .usearch_index
            .write()
            .map_err(|_| CoreError::poisoned("usearch index"))?;
        let ensure_started = std::time::Instant::now();
        ensure_usearch_dimensions(&mut index, expected_dimensions)?;
        crate::profile::record_update_stage("usearch_ensure_dimensions", ensure_started.elapsed());

        // Reserve capacity for the whole batch up front rather than growing per add.
        let target_capacity = index.size().saturating_add(entries.len());
        let reserve_started = std::time::Instant::now();
        index.reserve(target_capacity).map_err(|err| {
            CoreError::Internal(format!(
                "usearch reserve failed for {target_capacity} items: {err}"
            ))
        })?;
        crate::profile::record_update_stage("usearch_reserve", reserve_started.elapsed());

        // Mark dirty before adding. If an add fails part-way, the vectors already
        // added are still persisted by a later save_dirty_usearch_indexes.
        if matches!(save_mode, UsearchSaveMode::Deferred) {
            self.mark_usearch_dirty(space)?;
        }

        let add_started = std::time::Instant::now();
        for (key, vector) in entries {
            // Keys are checked one at a time. Entries before a negative key have
            // already been added when the error is returned.
            let key = u64::try_from(*key).map_err(|_| {
                CoreError::Internal(format!("usearch key must be non-negative: {}", *key))
            })?;
            index
                .add::<f32>(key, vector)
                .map_err(|err| CoreError::Internal(format!("usearch add failed: {err}")))?;
        }
        crate::profile::record_update_stage("usearch_add", add_started.elapsed());

        if matches!(save_mode, UsearchSaveMode::Immediate) {
            let save_started = std::time::Instant::now();
            save_usearch_index(&index, &space_indexes.usearch_path)?;
            crate::profile::record_update_stage("usearch_save", save_started.elapsed());
        }
        Ok(())
    }
2529
2530 pub fn delete_usearch(&self, space: &str, keys: &[i64]) -> Result<()> {
2531 self.delete_usearch_with_save_mode(space, keys, UsearchSaveMode::Immediate)
2532 }
2533
2534 pub(crate) fn delete_usearch_deferred(&self, space: &str, keys: &[i64]) -> Result<()> {
2535 self.delete_usearch_with_save_mode(space, keys, UsearchSaveMode::Deferred)
2536 }
2537
    /// Removes each key in `keys` from `space`'s usearch index.
    ///
    /// With `UsearchSaveMode::Immediate`, the index is saved to disk afterwards.
    /// With `Deferred`, the space is only marked dirty. Removal timing is
    /// recorded via `crate::profile`. An empty `keys` slice is a no-op.
    ///
    /// # Errors
    /// `CoreError::Internal` for a negative key or a usearch remove failure.
    /// Keys processed before the failing one have already been removed.
    fn delete_usearch_with_save_mode(
        &self,
        space: &str,
        keys: &[i64],
        save_mode: UsearchSaveMode,
    ) -> Result<()> {
        if keys.is_empty() {
            return Ok(());
        }

        let space_indexes = self.get_space_indexes(space)?;
        // Take the write lock even though removal goes through a shared handle,
        // so deletes are exclusive with inserts, searches and saves.
        let index = space_indexes
            .usearch_index
            .write()
            .map_err(|_| CoreError::poisoned("usearch index"))?;

        // Mark dirty before mutating, so a partially applied delete is still saved later.
        if matches!(save_mode, UsearchSaveMode::Deferred) {
            self.mark_usearch_dirty(space)?;
        }

        let delete_started = std::time::Instant::now();
        for key in keys {
            let key = u64::try_from(*key).map_err(|_| {
                CoreError::Internal(format!("usearch key must be non-negative: {}", *key))
            })?;
            index
                .remove(key)
                .map_err(|err| CoreError::Internal(format!("usearch remove failed: {err}")))?;
        }
        crate::profile::record_update_stage("usearch_delete", delete_started.elapsed());

        if matches!(save_mode, UsearchSaveMode::Immediate) {
            let save_started = std::time::Instant::now();
            save_usearch_index(&index, &space_indexes.usearch_path)?;
            crate::profile::record_update_stage("usearch_save", save_started.elapsed());
        }
        Ok(())
    }
2576
2577 pub(crate) fn save_dirty_usearch_indexes(&self) -> Result<()> {
2578 let spaces = {
2579 let mut dirty = self
2580 .dirty_usearch_spaces
2581 .lock()
2582 .map_err(|_| CoreError::poisoned("dirty usearch spaces"))?;
2583 if dirty.is_empty() {
2584 return Ok(());
2585 }
2586
2587 let mut spaces = dirty.drain().collect::<Vec<_>>();
2588 spaces.sort();
2589 spaces
2590 };
2591
2592 for (index, space) in spaces.iter().enumerate() {
2593 if let Err(err) = self.save_usearch_for_space(space) {
2594 let mut dirty = self
2595 .dirty_usearch_spaces
2596 .lock()
2597 .map_err(|_| CoreError::poisoned("dirty usearch spaces"))?;
2598 for unsaved in &spaces[index..] {
2599 dirty.insert(unsaved.clone());
2600 }
2601 return Err(err);
2602 }
2603 crate::profile::increment_update_count("usearch_saved_spaces", 1);
2604 }
2605
2606 Ok(())
2607 }
2608
2609 fn mark_usearch_dirty(&self, space: &str) -> Result<()> {
2610 self.dirty_usearch_spaces
2611 .lock()
2612 .map_err(|_| CoreError::poisoned("dirty usearch spaces"))?
2613 .insert(space.to_string());
2614 Ok(())
2615 }
2616
2617 fn save_usearch_for_space(&self, space: &str) -> Result<()> {
2618 let space_indexes = self.get_space_indexes(space)?;
2619 let index = space_indexes
2620 .usearch_index
2621 .read()
2622 .map_err(|_| CoreError::poisoned("usearch index"))?;
2623
2624 let save_started = std::time::Instant::now();
2625 save_usearch_index(&index, &space_indexes.usearch_path)?;
2626 crate::profile::record_update_stage("usearch_save", save_started.elapsed());
2627 Ok(())
2628 }
2629
2630 pub fn query_dense(&self, space: &str, vector: &[f32], limit: usize) -> Result<Vec<DenseHit>> {
2631 self.query_dense_filtered(space, vector, None, limit)
2632 }
2633
2634 pub fn query_dense_in_chunks(
2635 &self,
2636 space: &str,
2637 vector: &[f32],
2638 chunk_ids: &[i64],
2639 limit: usize,
2640 ) -> Result<Vec<DenseHit>> {
2641 if chunk_ids.is_empty() {
2642 return Ok(Vec::new());
2643 }
2644
2645 let allowed_keys = chunk_ids
2646 .iter()
2647 .map(|chunk_id| {
2648 u64::try_from(*chunk_id).map_err(|_| {
2649 CoreError::Internal(format!(
2650 "chunk_id must be non-negative for usearch query: {chunk_id}"
2651 ))
2652 })
2653 })
2654 .collect::<Result<HashSet<_>>>()?;
2655 self.query_dense_in_key_set(space, vector, &allowed_keys, limit)
2656 }
2657
2658 pub(crate) fn query_dense_in_key_set(
2659 &self,
2660 space: &str,
2661 vector: &[f32],
2662 allowed_keys: &HashSet<u64>,
2663 limit: usize,
2664 ) -> Result<Vec<DenseHit>> {
2665 if allowed_keys.is_empty() {
2666 return Ok(Vec::new());
2667 }
2668
2669 self.query_dense_filtered(space, vector, Some(allowed_keys), limit)
2670 }
2671
2672 fn query_dense_filtered(
2673 &self,
2674 space: &str,
2675 vector: &[f32],
2676 allowed_keys: Option<&HashSet<u64>>,
2677 limit: usize,
2678 ) -> Result<Vec<DenseHit>> {
2679 if limit == 0 {
2680 return Ok(Vec::new());
2681 }
2682 if vector.is_empty() {
2683 return Err(CoreError::Internal(
2684 "cannot query usearch with empty vector".to_string(),
2685 ));
2686 }
2687
2688 let space_indexes = self.get_space_indexes(space)?;
2689 let index = space_indexes
2690 .usearch_index
2691 .read()
2692 .map_err(|_| CoreError::poisoned("usearch index"))?;
2693
2694 if index.size() == 0 {
2695 return Ok(Vec::new());
2696 }
2697 if vector.len() != index.dimensions() {
2698 return Err(CoreError::Internal(format!(
2699 "query vector dimension mismatch: expected {}, got {}",
2700 index.dimensions(),
2701 vector.len()
2702 )));
2703 }
2704
2705 let matches = if let Some(allowed_keys) = allowed_keys {
2706 index
2707 .filtered_search::<f32, _>(vector, limit, |key| allowed_keys.contains(&key))
2708 .map_err(|err| {
2709 CoreError::Internal(format!("usearch filtered query failed: {err}"))
2710 })?
2711 } else {
2712 index
2713 .search::<f32>(vector, limit)
2714 .map_err(|err| CoreError::Internal(format!("usearch query failed: {err}")))?
2715 };
2716 let hits = matches
2717 .keys
2718 .into_iter()
2719 .zip(matches.distances)
2720 .map(|(key, distance)| DenseHit {
2721 chunk_id: key as i64,
2722 distance,
2723 })
2724 .collect();
2725 Ok(hits)
2726 }
2727
2728 pub fn count_usearch(&self, space: &str) -> Result<usize> {
2729 let space_indexes = self.get_space_indexes(space)?;
2730 let index = space_indexes
2731 .usearch_index
2732 .read()
2733 .map_err(|_| CoreError::poisoned("usearch index"))?;
2734 Ok(index.size())
2735 }
2736
2737 pub fn clear_usearch(&self, space: &str) -> Result<()> {
2738 let space_indexes = self.get_space_indexes(space)?;
2739 let index = space_indexes
2740 .usearch_index
2741 .write()
2742 .map_err(|_| CoreError::poisoned("usearch index"))?;
2743 let clear_started = std::time::Instant::now();
2744 index
2745 .reset()
2746 .map_err(|err| CoreError::Internal(format!("usearch clear failed: {err}")))?;
2747 std::fs::File::create(&space_indexes.usearch_path)?;
2748 crate::profile::record_update_stage("usearch_clear", clear_started.elapsed());
2749 Ok(())
2750 }
2751
2752 pub fn get_fts_dirty_documents(&self) -> Result<Vec<FtsDirtyRecord>> {
2753 self.get_fts_dirty_documents_filtered("", Vec::new())
2754 }
2755
2756 pub fn get_fts_dirty_documents_in_space(&self, space_id: i64) -> Result<Vec<FtsDirtyRecord>> {
2757 self.get_fts_dirty_documents_filtered(
2758 " AND c.space_id = ?",
2759 vec![SqlValue::Integer(space_id)],
2760 )
2761 }
2762
2763 pub fn get_fts_dirty_documents_in_collections(
2764 &self,
2765 collection_ids: &[i64],
2766 ) -> Result<Vec<FtsDirtyRecord>> {
2767 if collection_ids.is_empty() {
2768 return Ok(Vec::new());
2769 }
2770
2771 let placeholders = vec!["?"; collection_ids.len()].join(", ");
2772 let clause = format!(" AND d.collection_id IN ({placeholders})");
2773 let params = collection_ids
2774 .iter()
2775 .map(|id| SqlValue::Integer(*id))
2776 .collect::<Vec<_>>();
2777 self.get_fts_dirty_documents_filtered(&clause, params)
2778 }
2779
2780 fn get_fts_dirty_documents_filtered(
2781 &self,
2782 scope_clause: &str,
2783 scope_params: Vec<SqlValue>,
2784 ) -> Result<Vec<FtsDirtyRecord>> {
2785 let conn = self
2786 .db
2787 .lock()
2788 .map_err(|_| CoreError::poisoned("database"))?;
2789
2790 let sql = format!(
2791 "SELECT d.id, d.path, d.title, d.hash, c.path, s.name
2792 FROM documents d
2793 JOIN collections c ON c.id = d.collection_id
2794 JOIN spaces s ON s.id = c.space_id
2795 WHERE d.fts_dirty = 1{scope_clause}
2796 ORDER BY d.id ASC"
2797 );
2798 let mut stmt = conn.prepare(&sql)?;
2799 let rows = stmt.query_map(params_from_iter(scope_params.iter()), |row| {
2800 Ok((
2801 row.get::<_, i64>(0)?,
2802 row.get::<_, String>(1)?,
2803 row.get::<_, String>(2)?,
2804 row.get::<_, String>(3)?,
2805 row.get::<_, String>(4)?,
2806 row.get::<_, String>(5)?,
2807 ))
2808 })?;
2809 let headers = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2810 drop(stmt);
2811
2812 let mut records = Vec::with_capacity(headers.len());
2813 for (doc_id, doc_path, doc_title, doc_hash, collection_path, space_name) in headers {
2814 let chunks = load_chunks_for_doc(&conn, doc_id)?;
2815 records.push(FtsDirtyRecord {
2816 doc_id,
2817 doc_path,
2818 doc_title: decode_optional_title(doc_title),
2819 doc_hash,
2820 collection_path: PathBuf::from(collection_path),
2821 space_name,
2822 chunks,
2823 });
2824 }
2825
2826 Ok(records)
2827 }
2828
2829 pub fn batch_clear_fts_dirty(&self, doc_ids: &[i64]) -> Result<()> {
2830 if doc_ids.is_empty() {
2831 return Ok(());
2832 }
2833
2834 let conn = self
2835 .db
2836 .lock()
2837 .map_err(|_| CoreError::poisoned("database"))?;
2838
2839 let placeholders = vec!["?"; doc_ids.len()].join(", ");
2840 let sql = format!("UPDATE documents SET fts_dirty = 0 WHERE id IN ({placeholders})");
2841 conn.execute(&sql, params_from_iter(doc_ids.iter()))?;
2842 Ok(())
2843 }
2844
2845 pub fn count_documents_in_collection(
2846 &self,
2847 collection_id: i64,
2848 active_only: bool,
2849 ) -> Result<usize> {
2850 let conn = self
2851 .db
2852 .lock()
2853 .map_err(|_| CoreError::poisoned("database"))?;
2854 let _collection_name = lookup_collection_name(&conn, collection_id)?;
2855 let active_only = i64::from(active_only);
2856 query_count(
2857 &conn,
2858 "SELECT COUNT(*)
2859 FROM documents
2860 WHERE collection_id = ?1
2861 AND (?2 = 0 OR active = 1)",
2862 params![collection_id, active_only],
2863 )
2864 }
2865
2866 pub fn count_chunks_in_collection(&self, collection_id: i64) -> Result<usize> {
2867 let conn = self
2868 .db
2869 .lock()
2870 .map_err(|_| CoreError::poisoned("database"))?;
2871 let _collection_name = lookup_collection_name(&conn, collection_id)?;
2872
2873 query_count(
2874 &conn,
2875 "SELECT COUNT(*)
2876 FROM chunks c
2877 JOIN documents d ON d.id = c.doc_id
2878 WHERE d.collection_id = ?1",
2879 params![collection_id],
2880 )
2881 }
2882
2883 pub fn count_embedded_chunks_in_collection(&self, collection_id: i64) -> Result<usize> {
2884 let conn = self
2885 .db
2886 .lock()
2887 .map_err(|_| CoreError::poisoned("database"))?;
2888 let _collection_name = lookup_collection_name(&conn, collection_id)?;
2889
2890 query_count(
2891 &conn,
2892 "SELECT COUNT(DISTINCT e.chunk_id)
2893 FROM embeddings e
2894 JOIN chunks c ON c.id = e.chunk_id
2895 JOIN documents d ON d.id = c.doc_id
2896 WHERE d.collection_id = ?1",
2897 params![collection_id],
2898 )
2899 }
2900
2901 pub fn count_documents(&self, space_id: Option<i64>) -> Result<usize> {
2902 let conn = self
2903 .db
2904 .lock()
2905 .map_err(|_| CoreError::poisoned("database"))?;
2906
2907 match space_id {
2908 Some(space_id) => {
2909 let _space_name = lookup_space_name(&conn, space_id)?;
2910 query_count(
2911 &conn,
2912 "SELECT COUNT(*)
2913 FROM documents d
2914 JOIN collections c ON c.id = d.collection_id
2915 WHERE c.space_id = ?1",
2916 params![space_id],
2917 )
2918 }
2919 None => query_count(&conn, "SELECT COUNT(*) FROM documents", []),
2920 }
2921 }
2922
2923 pub fn count_chunks(&self, space_id: Option<i64>) -> Result<usize> {
2924 let conn = self
2925 .db
2926 .lock()
2927 .map_err(|_| CoreError::poisoned("database"))?;
2928
2929 match space_id {
2930 Some(space_id) => {
2931 let _space_name = lookup_space_name(&conn, space_id)?;
2932 query_count(
2933 &conn,
2934 "SELECT COUNT(*)
2935 FROM chunks c
2936 JOIN documents d ON d.id = c.doc_id
2937 JOIN collections col ON col.id = d.collection_id
2938 WHERE col.space_id = ?1",
2939 params![space_id],
2940 )
2941 }
2942 None => query_count(&conn, "SELECT COUNT(*) FROM chunks", []),
2943 }
2944 }
2945
2946 pub fn count_embedded_chunks(&self, space_id: Option<i64>) -> Result<usize> {
2947 let conn = self
2948 .db
2949 .lock()
2950 .map_err(|_| CoreError::poisoned("database"))?;
2951
2952 match space_id {
2953 Some(space_id) => {
2954 let _space_name = lookup_space_name(&conn, space_id)?;
2955 query_count(
2956 &conn,
2957 "SELECT COUNT(DISTINCT e.chunk_id)
2958 FROM embeddings e
2959 JOIN chunks c ON c.id = e.chunk_id
2960 JOIN documents d ON d.id = c.doc_id
2961 JOIN collections col ON col.id = d.collection_id
2962 WHERE col.space_id = ?1",
2963 params![space_id],
2964 )
2965 }
2966 None => query_count(&conn, "SELECT COUNT(DISTINCT chunk_id) FROM embeddings", []),
2967 }
2968 }
2969
2970 pub fn disk_usage(&self) -> Result<DiskUsage> {
2971 let sqlite_bytes = file_size_or_zero(&self.cache_dir.join(DB_FILE))?;
2972
2973 let mut tantivy_bytes = 0_u64;
2974 let mut usearch_bytes = 0_u64;
2975 let spaces_dir = self.cache_dir.join(SPACES_DIR);
2976 if spaces_dir.exists() {
2977 for entry in std::fs::read_dir(&spaces_dir)? {
2978 let space_dir = entry?.path();
2979 if !space_dir.is_dir() {
2980 continue;
2981 }
2982
2983 tantivy_bytes += dir_size_or_zero(&space_dir.join(TANTIVY_DIR_NAME))?;
2984 usearch_bytes += file_size_or_zero(&space_dir.join(USEARCH_FILENAME))?;
2985 }
2986 }
2987
2988 let models_bytes = dir_size_or_zero(&self.cache_dir.join("models"))?;
2989 let total_bytes = sqlite_bytes + tantivy_bytes + usearch_bytes + models_bytes;
2990
2991 Ok(DiskUsage {
2992 sqlite_bytes,
2993 tantivy_bytes,
2994 usearch_bytes,
2995 models_bytes,
2996 total_bytes,
2997 })
2998 }
2999
3000 fn unload_space(&self, name: &str) -> Result<()> {
3001 let mut spaces = self
3002 .spaces
3003 .write()
3004 .map_err(|_| CoreError::poisoned("spaces"))?;
3005 spaces.remove(name);
3006 Ok(())
3007 }
3008
3009 fn remove_space_artifacts(&self, name: &str) -> Result<()> {
3010 let space_root = self.space_root_path(name);
3011 if space_root.exists() {
3012 std::fs::remove_dir_all(space_root)?;
3013 }
3014 Ok(())
3015 }
3016
3017 fn rename_space_artifacts(&self, old: &str, new: &str) -> Result<()> {
3018 let old_root = self.space_root_path(old);
3019 let new_root = self.space_root_path(new);
3020 if !old_root.exists() {
3021 return Ok(());
3022 }
3023
3024 if new_root.exists() {
3025 return Err(CoreError::Internal(format!(
3026 "cannot rename space artifacts: destination already exists: {}",
3027 new_root.display()
3028 )));
3029 }
3030
3031 if let Some(parent) = new_root.parent() {
3032 std::fs::create_dir_all(parent)?;
3033 }
3034 std::fs::rename(old_root, new_root)?;
3035 Ok(())
3036 }
3037
3038 fn space_root_path(&self, name: &str) -> PathBuf {
3039 self.cache_dir.join(SPACES_DIR).join(name)
3040 }
3041
3042 fn space_paths(&self, name: &str) -> (PathBuf, PathBuf) {
3043 let space_root = self.space_root_path(name);
3044 let tantivy_dir = space_root.join(TANTIVY_DIR_NAME);
3045 let usearch_path = space_root.join(USEARCH_FILENAME);
3046 (tantivy_dir, usearch_path)
3047 }
3048
3049 fn get_space_indexes(&self, name: &str) -> Result<Arc<SpaceIndexes>> {
3050 self.open_space(name)?;
3051 let spaces = self
3052 .spaces
3053 .read()
3054 .map_err(|_| CoreError::poisoned("spaces"))?;
3055 spaces.get(name).cloned().ok_or_else(|| {
3056 KboltError::SpaceNotFound {
3057 name: name.to_string(),
3058 }
3059 .into()
3060 })
3061 }
3062}
3063
3064fn lookup_space_name(conn: &Connection, space_id: i64) -> Result<String> {
3065 let result = conn.query_row(
3066 "SELECT name FROM spaces WHERE id = ?1",
3067 params![space_id],
3068 |row| row.get::<_, String>(0),
3069 );
3070 match result {
3071 Ok(name) => Ok(name),
3072 Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
3073 name: format!("id={space_id}"),
3074 }
3075 .into()),
3076 Err(err) => Err(err.into()),
3077 }
3078}
3079
3080fn lookup_collection_name(conn: &Connection, collection_id: i64) -> Result<String> {
3081 let result = conn.query_row(
3082 "SELECT name FROM collections WHERE id = ?1",
3083 params![collection_id],
3084 |row| row.get::<_, String>(0),
3085 );
3086 match result {
3087 Ok(name) => Ok(name),
3088 Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
3089 name: format!("id={collection_id}"),
3090 }
3091 .into()),
3092 Err(err) => Err(err.into()),
3093 }
3094}
3095
3096fn lookup_document_id(conn: &Connection, doc_id: i64) -> Result<i64> {
3097 let result = conn.query_row(
3098 "SELECT id FROM documents WHERE id = ?1",
3099 params![doc_id],
3100 |row| row.get::<_, i64>(0),
3101 );
3102 match result {
3103 Ok(id) => Ok(id),
3104 Err(Error::QueryReturnedNoRows) => Err(KboltError::DocumentNotFound {
3105 path: format!("id={doc_id}"),
3106 }
3107 .into()),
3108 Err(err) => Err(err.into()),
3109 }
3110}
3111
3112fn load_chunks_for_doc(conn: &Connection, doc_id: i64) -> Result<Vec<ChunkRow>> {
3113 let mut stmt = conn.prepare(
3114 "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
3115 FROM chunks
3116 WHERE doc_id = ?1
3117 ORDER BY seq ASC",
3118 )?;
3119 let rows = stmt.query_map(params![doc_id], decode_chunk_row)?;
3120 let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3121 Ok(chunks)
3122}
3123
3124fn load_chunk_ids_for_doc(conn: &Connection, doc_id: i64) -> Result<Vec<i64>> {
3125 let mut stmt = conn.prepare("SELECT id FROM chunks WHERE doc_id = ?1 ORDER BY seq ASC")?;
3126 let rows = stmt.query_map(params![doc_id], |row| row.get::<_, i64>(0))?;
3127 let chunk_ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3128 Ok(chunk_ids)
3129}
3130
3131fn lookup_chunk_doc_id(conn: &Connection, chunk_id: i64) -> Result<i64> {
3132 let result = conn.query_row(
3133 "SELECT doc_id FROM chunks WHERE id = ?1",
3134 params![chunk_id],
3135 |row| row.get::<_, i64>(0),
3136 );
3137 match result {
3138 Ok(doc_id) => Ok(doc_id),
3139 Err(Error::QueryReturnedNoRows) => Err(KboltError::DocumentNotFound {
3140 path: format!("chunk_id={chunk_id}"),
3141 }
3142 .into()),
3143 Err(err) => Err(err.into()),
3144 }
3145}
3146
3147fn query_count<P: rusqlite::Params>(conn: &Connection, sql: &str, params: P) -> Result<usize> {
3148 let count: i64 = conn.query_row(sql, params, |row| row.get(0))?;
3149 Ok(count as usize)
3150}
3151
3152fn file_size_or_zero(path: &Path) -> Result<u64> {
3153 if !path.exists() {
3154 return Ok(0);
3155 }
3156
3157 let metadata = std::fs::metadata(path)?;
3158 if metadata.is_file() {
3159 Ok(metadata.len())
3160 } else {
3161 Ok(0)
3162 }
3163}
3164
3165fn dir_size_or_zero(path: &Path) -> Result<u64> {
3166 if !path.exists() {
3167 return Ok(0);
3168 }
3169
3170 let mut total = 0_u64;
3171 for entry in std::fs::read_dir(path)? {
3172 let entry = entry?;
3173 let child_path = entry.path();
3174 let metadata = std::fs::symlink_metadata(&child_path)?;
3175 if metadata.is_file() {
3176 total += metadata.len();
3177 } else if metadata.is_dir() {
3178 total += dir_size_or_zero(&child_path)?;
3179 }
3180 }
3181
3182 Ok(total)
3183}
3184
3185fn open_or_create_tantivy_index(path: &Path) -> Result<Index> {
3186 let meta_path = path.join("meta.json");
3187 if meta_path.exists() {
3188 return Ok(Index::open_in_dir(path)?);
3189 }
3190
3191 Ok(Index::create_in_dir(path, tantivy_schema())?)
3192}
3193
3194fn with_tantivy_writer<T>(
3195 space_indexes: &SpaceIndexes,
3196 f: impl FnOnce(&mut IndexWriter) -> Result<T>,
3197) -> Result<T> {
3198 let mut writer = space_indexes
3199 .tantivy_writer
3200 .lock()
3201 .map_err(|_| CoreError::poisoned("tantivy writer"))?;
3202
3203 if writer.is_none() {
3204 *writer = Some(space_indexes.tantivy_index.writer(50_000_000)?);
3205 }
3206
3207 let writer = writer
3208 .as_mut()
3209 .ok_or_else(|| CoreError::Internal("failed to initialize tantivy writer".to_string()))?;
3210 f(writer)
3211}
3212
3213fn reject_incompatible_legacy_index(conn: &Connection) -> Result<()> {
3214 if !table_exists(conn, "documents")? || table_exists(conn, "document_texts")? {
3215 return Ok(());
3216 }
3217
3218 let document_count = query_count(conn, "SELECT COUNT(*) FROM documents", [])?;
3219 if document_count == 0 {
3220 return Ok(());
3221 }
3222
3223 Err(KboltError::Config(
3224 "cache index uses an older text-storage format; rebuild the kbolt cache before using this version".to_string(),
3225 )
3226 .into())
3227}
3228
3229fn ensure_schema_version(conn: &Connection) -> Result<()> {
3230 let current: i64 = conn.query_row("PRAGMA user_version", [], |row| row.get(0))?;
3231 if current > SCHEMA_VERSION {
3232 return Err(KboltError::Config(format!(
3233 "cache index schema version {current} is newer than supported version {SCHEMA_VERSION}"
3234 ))
3235 .into());
3236 }
3237
3238 if current < SCHEMA_VERSION {
3239 conn.pragma_update(None, "user_version", SCHEMA_VERSION)?;
3240 }
3241
3242 Ok(())
3243}
3244
3245fn table_exists(conn: &Connection, table: &str) -> Result<bool> {
3246 let exists = conn.query_row(
3247 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name = ?1",
3248 [table],
3249 |row| row.get::<_, i64>(0),
3250 )?;
3251 Ok(exists != 0)
3252}
3253
3254fn ensure_document_texts_generation_key_column(conn: &Connection) -> Result<()> {
3255 let mut stmt = conn.prepare("PRAGMA table_info(document_texts)")?;
3256 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3257 let columns = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3258 drop(stmt);
3259
3260 if columns.iter().any(|column| column == "generation_key") {
3261 return Ok(());
3262 }
3263
3264 conn.execute(
3265 "ALTER TABLE document_texts ADD COLUMN generation_key TEXT NOT NULL DEFAULT ''",
3266 [],
3267 )?;
3268 Ok(())
3269}
3270
3271fn ensure_chunks_retrieval_prefix_column(conn: &Connection) -> Result<()> {
3272 let mut stmt = conn.prepare("PRAGMA table_info(chunks)")?;
3273 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3274 let columns = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3275 drop(stmt);
3276
3277 if columns.iter().any(|column| column == "retrieval_prefix") {
3278 return Ok(());
3279 }
3280
3281 conn.execute("ALTER TABLE chunks ADD COLUMN retrieval_prefix TEXT", [])?;
3282 Ok(())
3283}
3284
3285fn build_literal_bm25_query(
3286 index: &Index,
3287 fields: &[Bm25FieldSpec],
3288 query: &str,
3289) -> Result<Option<Box<dyn Query>>> {
3290 let mut clauses = Vec::new();
3291 for field in fields {
3292 for token in analyzed_terms_for_field(index, field.field, query)? {
3293 let term_query: Box<dyn Query> = Box::new(TermQuery::new(
3294 Term::from_field_text(field.field, &token),
3295 field.index_record_option,
3296 ));
3297 let query = if (field.boost - 1.0).abs() > f32::EPSILON {
3298 Box::new(BoostQuery::new(term_query, field.boost)) as Box<dyn Query>
3299 } else {
3300 term_query
3301 };
3302 clauses.push((Occur::Should, query));
3303 }
3304 }
3305
3306 if clauses.is_empty() {
3307 Ok(None)
3308 } else {
3309 Ok(Some(Box::new(BooleanQuery::new(clauses))))
3310 }
3311}
3312
3313fn build_doc_id_filter_query(field: Field, document_ids: &[i64]) -> Result<Box<dyn Query>> {
3314 let mut terms = Vec::new();
3315 let mut seen = HashSet::new();
3316 for doc_id in document_ids {
3317 let doc_id = u64::try_from(*doc_id).map_err(|_| {
3318 CoreError::Internal(format!(
3319 "doc_id must be non-negative for tantivy query: {doc_id}"
3320 ))
3321 })?;
3322 if seen.insert(doc_id) {
3323 terms.push(Term::from_field_u64(field, doc_id));
3324 }
3325 }
3326
3327 Ok(Box::new(ConstScoreQuery::new(
3328 Box::new(TermSetQuery::new(terms)),
3329 0.0,
3330 )))
3331}
3332
3333fn analyzed_terms_for_field(index: &Index, field: Field, query: &str) -> Result<Vec<String>> {
3334 let mut analyzer = index.tokenizer_for_field(field)?;
3335 let mut stream = analyzer.token_stream(query);
3336 let mut terms = Vec::new();
3337 let mut seen = HashSet::new();
3338 while let Some(token) = stream.next() {
3339 if token.text.is_empty() {
3340 continue;
3341 }
3342 let text = token.text.clone();
3343 if seen.insert(text.clone()) {
3344 terms.push(text);
3345 }
3346 }
3347 Ok(terms)
3348}
3349
3350fn new_usearch_index(dimensions: usize) -> Result<usearch::Index> {
3351 let options = IndexOptions {
3352 dimensions,
3353 metric: MetricKind::Cos,
3354 quantization: ScalarKind::F32,
3355 connectivity: 16,
3356 expansion_add: 200,
3357 expansion_search: 100,
3358 ..IndexOptions::default()
3359 };
3360 usearch::Index::new(&options)
3361 .map_err(|err| CoreError::Internal(format!("usearch init failed: {err}")))
3362}
3363
3364fn open_or_create_usearch_index(path: &Path) -> Result<usearch::Index> {
3365 let index = new_usearch_index(256)?;
3366 let file_size = std::fs::metadata(path).map(|meta| meta.len()).unwrap_or(0);
3367 if file_size > 0 {
3368 let path = path
3369 .to_str()
3370 .ok_or_else(|| CoreError::Internal("invalid usearch path encoding".to_string()))?;
3371 index
3372 .load(path)
3373 .map_err(|err| CoreError::Internal(format!("usearch load failed: {err}")))?;
3374 }
3375 Ok(index)
3376}
3377
3378fn ensure_usearch_dimensions(index: &mut usearch::Index, expected_dimensions: usize) -> Result<()> {
3379 if index.size() == 0 && index.dimensions() != expected_dimensions {
3380 *index = new_usearch_index(expected_dimensions)?;
3381 return Ok(());
3382 }
3383
3384 if index.dimensions() != expected_dimensions {
3385 return Err(CoreError::Internal(format!(
3386 "usearch vector dimension mismatch: index expects {}, got {}",
3387 index.dimensions(),
3388 expected_dimensions
3389 )));
3390 }
3391 Ok(())
3392}
3393
3394fn save_usearch_index(index: &usearch::Index, path: &Path) -> Result<()> {
3395 if index.size() == 0 {
3396 std::fs::File::create(path)?;
3397 return Ok(());
3398 }
3399
3400 let path = path
3401 .to_str()
3402 .ok_or_else(|| CoreError::Internal("invalid usearch path encoding".to_string()))?;
3403 index
3404 .save(path)
3405 .map_err(|err| CoreError::Internal(format!("usearch save failed: {err}")))?;
3406 Ok(())
3407}
3408
3409fn tantivy_schema() -> tantivy::schema::Schema {
3410 let mut builder = tantivy::schema::Schema::builder();
3411 builder.add_u64_field("chunk_id", INDEXED | STORED | FAST);
3412 builder.add_u64_field("doc_id", INDEXED | STORED | FAST);
3413 builder.add_text_field("filepath", TEXT | STORED);
3414 builder.add_text_field("title", TEXT | STORED);
3415 builder.add_text_field("heading", TEXT | STORED);
3416 builder.add_text_field("body", TEXT);
3417 builder.build()
3418}
3419
3420fn tantivy_fields_from_schema(schema: &tantivy::schema::Schema) -> Result<TantivyFields> {
3421 Ok(TantivyFields {
3422 chunk_id: schema.get_field("chunk_id").map_err(|_| {
3423 CoreError::Internal("tantivy schema missing field: chunk_id".to_string())
3424 })?,
3425 doc_id: schema
3426 .get_field("doc_id")
3427 .map_err(|_| CoreError::Internal("tantivy schema missing field: doc_id".to_string()))?,
3428 filepath: schema.get_field("filepath").map_err(|_| {
3429 CoreError::Internal("tantivy schema missing field: filepath".to_string())
3430 })?,
3431 title: schema
3432 .get_field("title")
3433 .map_err(|_| CoreError::Internal("tantivy schema missing field: title".to_string()))?,
3434 heading: schema.get_field("heading").map_err(|_| {
3435 CoreError::Internal("tantivy schema missing field: heading".to_string())
3436 })?,
3437 body: schema
3438 .get_field("body")
3439 .map_err(|_| CoreError::Internal("tantivy schema missing field: body".to_string()))?,
3440 })
3441}
3442
3443fn resolve_tantivy_field(fields: TantivyFields, name: &str) -> Result<Field> {
3444 match name {
3445 "chunk_id" => Ok(fields.chunk_id),
3446 "doc_id" => Ok(fields.doc_id),
3447 "filepath" => Ok(fields.filepath),
3448 "title" => Ok(fields.title),
3449 "heading" => Ok(fields.heading),
3450 "body" => Ok(fields.body),
3451 other => Err(CoreError::Internal(format!(
3452 "unsupported tantivy field: {other}"
3453 ))),
3454 }
3455}
3456
3457fn serialize_extensions(extensions: Option<&[String]>) -> Result<Option<String>> {
3458 match extensions {
3459 None => Ok(None),
3460 Some(values) => serde_json::to_string(values).map(Some).map_err(Into::into),
3461 }
3462}
3463
3464fn deserialize_extensions(raw: Option<String>) -> Result<Option<Vec<String>>> {
3465 match raw {
3466 None => Ok(None),
3467 Some(json) => serde_json::from_str::<Vec<String>>(&json)
3468 .map(Some)
3469 .map_err(Into::into),
3470 }
3471}
3472
3473fn decode_space_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SpaceRow> {
3474 Ok(SpaceRow {
3475 id: row.get(0)?,
3476 name: row.get(1)?,
3477 description: row.get(2)?,
3478 created: row.get(3)?,
3479 })
3480}
3481
3482fn decode_collection_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<CollectionRow> {
3483 let raw_extensions: Option<String> = row.get(5)?;
3484 let extensions = deserialize_extensions(raw_extensions).map_err(|err| {
3485 Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(err))
3486 })?;
3487 Ok(CollectionRow {
3488 id: row.get(0)?,
3489 space_id: row.get(1)?,
3490 name: row.get(2)?,
3491 path: PathBuf::from(row.get::<_, String>(3)?),
3492 description: row.get(4)?,
3493 extensions,
3494 created: row.get(6)?,
3495 updated: row.get(7)?,
3496 })
3497}
3498
3499fn decode_document_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<DocumentRow> {
3500 decode_document_row_at(row, 0)
3501}
3502
3503fn decode_document_row_at(row: &rusqlite::Row<'_>, base: usize) -> rusqlite::Result<DocumentRow> {
3504 let active_value: i64 = row.get(base + 6)?;
3505 let fts_dirty_value: i64 = row.get(base + 8)?;
3506 Ok(DocumentRow {
3507 id: row.get(base)?,
3508 collection_id: row.get(base + 1)?,
3509 path: row.get(base + 2)?,
3510 title: decode_optional_title(row.get::<_, String>(base + 3)?),
3511 hash: row.get(base + 4)?,
3512 modified: row.get(base + 5)?,
3513 active: active_value != 0,
3514 deactivated_at: row.get(base + 7)?,
3515 fts_dirty: fts_dirty_value != 0,
3516 })
3517}
3518
/// Trims surrounding whitespace from a title.
///
/// Returns `None` if no title was given or the trimmed title is empty.
fn normalized_title(title: Option<&str>) -> Option<&str> {
    let trimmed = title?.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed)
    }
}
3522
/// Converts a stored title into an optional, whitespace-trimmed title.
///
/// An empty or all-whitespace title becomes `None`. An already-trimmed title is returned
/// without reallocating.
fn decode_optional_title(title: String) -> Option<String> {
    let trimmed_len = title.trim().len();
    if trimmed_len == 0 {
        return None;
    }
    if trimmed_len == title.len() {
        return Some(title);
    }
    Some(title.trim().to_string())
}
3533
3534fn decode_document_text_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<DocumentTextRow> {
3535 Ok(DocumentTextRow {
3536 doc_id: row.get(0)?,
3537 extractor_key: row.get(1)?,
3538 source_hash: row.get(2)?,
3539 text_hash: row.get(3)?,
3540 generation_key: row.get(4)?,
3541 text: row.get(5)?,
3542 created: row.get(6)?,
3543 })
3544}
3545
3546fn decode_chunk_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ChunkRow> {
3547 decode_chunk_row_at(row, 0)
3548}
3549
3550fn decode_chunk_row_at(row: &rusqlite::Row<'_>, base: usize) -> rusqlite::Result<ChunkRow> {
3551 let offset_value: i64 = row.get(base + 3)?;
3552 let length_value: i64 = row.get(base + 4)?;
3553 let kind_raw: String = row.get(base + 6)?;
3554 let kind = FinalChunkKind::try_from(kind_raw.as_str()).map_err(|err| {
3555 Error::FromSqlConversionFailure(base + 6, rusqlite::types::Type::Text, Box::new(err))
3556 })?;
3557 Ok(ChunkRow {
3558 id: row.get(base)?,
3559 doc_id: row.get(base + 1)?,
3560 seq: row.get(base + 2)?,
3561 offset: decode_non_negative_usize(offset_value, base + 3, "chunks.offset")?,
3562 length: decode_non_negative_usize(length_value, base + 4, "chunks.length")?,
3563 heading: row.get(base + 5)?,
3564 kind,
3565 retrieval_prefix: row.get(base + 7)?,
3566 })
3567}
3568
3569fn decode_non_negative_usize(
3570 value: i64,
3571 column: usize,
3572 name: &'static str,
3573) -> rusqlite::Result<usize> {
3574 if value < 0 {
3575 return Err(Error::FromSqlConversionFailure(
3576 column,
3577 SqlType::Integer,
3578 Box::new(KboltError::Internal(format!("{name} must not be negative"))),
3579 ));
3580 }
3581
3582 Ok(value as usize)
3583}
3584
3585fn canonical_chunk_slice_from_bytes(
3586 chunk_id: i64,
3587 offset_value: i64,
3588 length_value: i64,
3589 document_len_value: i64,
3590 start_byte: Vec<u8>,
3591 end_byte: Vec<u8>,
3592 bytes: Vec<u8>,
3593) -> Result<String> {
3594 if offset_value < 0 {
3595 return Err(CoreError::Internal(format!(
3596 "chunk {chunk_id} offset must not be negative"
3597 )));
3598 }
3599 if length_value < 0 {
3600 return Err(CoreError::Internal(format!(
3601 "chunk {chunk_id} length must not be negative"
3602 )));
3603 }
3604 if document_len_value < 0 {
3605 return Err(CoreError::Internal(format!(
3606 "document text length for chunk {chunk_id} must not be negative"
3607 )));
3608 }
3609
3610 let offset = offset_value as usize;
3611 let length = length_value as usize;
3612 let document_len = document_len_value as usize;
3613 let end = offset.checked_add(length).ok_or_else(|| {
3614 CoreError::Internal(format!("chunk {chunk_id} text span overflows usize"))
3615 })?;
3616 if end > document_len {
3617 return Err(CoreError::Internal(format!(
3618 "chunk {chunk_id} text span {offset}..{end} exceeds document text length {document_len}"
3619 )));
3620 }
3621 if bytes.len() != length {
3622 return Err(CoreError::Internal(format!(
3623 "canonical text slice for chunk {chunk_id} returned {} bytes, expected {length}",
3624 bytes.len()
3625 )));
3626 }
3627 validate_sqlite_utf8_boundary(chunk_id, offset, document_len, &start_byte)?;
3628 validate_sqlite_utf8_boundary(chunk_id, end, document_len, &end_byte)?;
3629
3630 String::from_utf8(bytes).map_err(|err| {
3631 CoreError::Internal(format!(
3632 "stored canonical text slice for chunk {chunk_id} is invalid UTF-8: {err}"
3633 ))
3634 })
3635}
3636
3637fn validate_sqlite_utf8_boundary(
3638 chunk_id: i64,
3639 index: usize,
3640 document_len: usize,
3641 byte_at_index: &[u8],
3642) -> Result<()> {
3643 if index == 0 || index == document_len {
3644 return Ok(());
3645 }
3646 if byte_at_index.len() != 1 {
3647 return Err(CoreError::Internal(format!(
3648 "chunk {chunk_id} text boundary byte at {index} is missing"
3649 )));
3650 }
3651 if (0x80..0xC0).contains(&byte_at_index[0]) {
3652 return Err(CoreError::Internal(format!(
3653 "chunk {chunk_id} text span is not on UTF-8 boundaries"
3654 )));
3655 }
3656
3657 Ok(())
3658}
3659
3660pub(crate) fn chunk_text_from_canonical(document_text: &str, chunk: &ChunkRow) -> Result<String> {
3661 let label = format!("chunk {}", chunk.id);
3662 let end = validate_text_span(document_text, chunk.offset, chunk.length, &label)?;
3663
3664 Ok(document_text[chunk.offset..end].to_string())
3665}
3666
3667fn validate_text_span(
3668 document_text: &str,
3669 offset: usize,
3670 length: usize,
3671 label: &str,
3672) -> Result<usize> {
3673 let end = offset
3674 .checked_add(length)
3675 .ok_or_else(|| CoreError::Internal(format!("{label} text span overflows usize")))?;
3676 if end > document_text.len() {
3677 return Err(CoreError::Internal(format!(
3678 "{label} text span {}..{} exceeds document text length {}",
3679 offset,
3680 end,
3681 document_text.len()
3682 )));
3683 }
3684 if !document_text.is_char_boundary(offset) || !document_text.is_char_boundary(end) {
3685 return Err(CoreError::Internal(format!(
3686 "{label} text span {offset}..{end} is not on UTF-8 boundaries"
3687 )));
3688 }
3689
3690 Ok(end)
3691}
3692
3693pub(crate) fn missing_document_text_error(doc_id: i64) -> CoreError {
3694 KboltError::Internal(format!(
3695 "document {doc_id} is missing persisted canonical text; rebuild the kbolt cache"
3696 ))
3697 .into()
3698}
3699
// Unit tests for this module live in a separate `tests` module file and are compiled only
// for test builds.
#[cfg(test)]
mod tests;