1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3use std::sync::{Arc, Mutex, RwLock};
4
5use crate::error::{CoreError, Result};
6use crate::ingest::chunk::FinalChunkKind;
7use kbolt_types::{DiskUsage, KboltError};
8use rusqlite::types::{Type as SqlType, Value as SqlValue};
9use rusqlite::{params, params_from_iter, Connection, Error, ErrorCode};
10use tantivy::collector::TopDocs;
11use tantivy::query::{
12 BooleanQuery, BoostQuery, ConstScoreQuery, Occur, Query, TermQuery, TermSetQuery,
13};
14use tantivy::schema::{Field, IndexRecordOption, FAST, INDEXED, STORED, TEXT};
15use tantivy::tokenizer::TokenStream;
16use tantivy::{Index, IndexReader, IndexWriter, TantivyDocument, Term};
17use usearch::{IndexOptions, MetricKind, ScalarKind};
18
/// File name of the SQLite metadata database created inside the cache directory.
const DB_FILE: &str = "meta.sqlite";
/// Name of the built-in space: `Storage::new` always inserts it, `rename_space` refuses to
/// rename it, and `delete_space` only clears its collections and index artifacts.
const DEFAULT_SPACE_NAME: &str = "default";
/// Directory name for per-space index data — presumably joined under the cache directory by
/// path helpers outside this view; confirm.
const SPACES_DIR: &str = "spaces";
/// Directory name of a space's Tantivy index — presumably nested inside that space's
/// directory; confirm.
const TANTIVY_DIR_NAME: &str = "tantivy";
/// File name of a space's usearch vector index — presumably nested inside that space's
/// directory; confirm.
const USEARCH_FILENAME: &str = "vectors.usearch";
/// Schema version this build expects — presumably checked/recorded by
/// `ensure_schema_version` (not shown); confirm.
const SCHEMA_VERSION: i64 = 3;
/// Maximum number of values to bind into a single SQL `IN (...)` list, keeping statements
/// below SQLite's bound-parameter limit when large id lists are split into batches.
const SQLITE_IN_CLAUSE_BATCH_SIZE: usize = 500;
26
/// Presumably selects when a modified usearch index is written to disk.
///
/// NOTE(review): usage is outside this view; `Deferred` likely records the space in
/// `Storage::dirty_usearch_spaces` for a later flush while `Immediate` saves right away —
/// confirm.
#[derive(Clone, Copy)]
enum UsearchSaveMode {
    Immediate,
    Deferred,
}

/// Metadata database plus the per-space Tantivy and usearch indexes kept under one cache
/// directory.
pub struct Storage {
    /// SQLite connection to the metadata database; methods lock it around each access.
    db: Mutex<Connection>,
    /// Root directory holding the database file and the per-space index artifacts.
    cache_dir: PathBuf,
    /// Indexes of the currently opened spaces, keyed by space name (filled by `open_space`).
    spaces: RwLock<HashMap<String, Arc<SpaceIndexes>>>,
    /// Names of spaces presumably holding unsaved usearch changes — the code that fills and
    /// drains this set is outside this view; confirm.
    dirty_usearch_spaces: Mutex<HashSet<String>>,
}
39
/// A row of the `spaces` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SpaceRow {
    pub id: i64,
    /// Unique space name.
    pub name: String,
    pub description: Option<String>,
    /// Creation time formatted `%Y-%m-%dT%H:%M:%SZ` (UTC) by the column default.
    pub created: String,
}

/// A row of the `collections` table; `name` is unique within its space.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollectionRow {
    pub id: i64,
    pub space_id: i64,
    pub name: String,
    /// Filesystem path of the collection (presumably its root directory); stored via
    /// `to_string_lossy`, so non-UTF-8 paths are not preserved exactly.
    pub path: PathBuf,
    pub description: Option<String>,
    /// Optional extension filter, stored as text produced by `serialize_extensions`
    /// (presumably JSON — confirm).
    pub extensions: Option<Vec<String>>,
    /// Creation time, `%Y-%m-%dT%H:%M:%SZ`.
    pub created: String,
    /// Last update time, refreshed by rename/description/timestamp updates.
    pub updated: String,
}

/// A row of the `documents` table; `path` is unique within its collection.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DocumentRow {
    pub id: i64,
    pub collection_id: i64,
    /// Document path as stored — presumably relative to the collection path; confirm.
    pub path: String,
    pub title: String,
    /// Whether `title` was extracted or fell back to the file name.
    pub title_source: DocumentTitleSource,
    /// Hash supplied on upsert; `get_document_by_hash_prefix` matches on its prefix.
    pub hash: String,
    /// Modification time string supplied by the caller (format not enforced here).
    pub modified: String,
    /// `false` once the document has been deactivated (soft-deleted).
    pub active: bool,
    /// Time of the first deactivation; cleared when the document is reactivated.
    pub deactivated_at: Option<String>,
    /// Set on every `upsert_document`; presumably cleared once the full-text index is
    /// refreshed — confirm.
    pub fts_dirty: bool,
}

/// Per-document summary used for file listings.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileListRow {
    pub doc_id: i64,
    pub path: String,
    pub title: String,
    pub hash: String,
    pub active: bool,
    /// Number of chunks stored for the document.
    pub chunk_count: usize,
    /// Number of those chunks that have at least one embedding row (any model).
    pub embedded_chunk_count: usize,
}

/// A row of the `chunks` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkRow {
    pub id: i64,
    pub doc_id: i64,
    /// Position of the chunk within its document; unique per document.
    pub seq: i32,
    /// Start of the chunk within the document text — units (bytes vs chars) unconfirmed.
    pub offset: usize,
    /// Length of the chunk — same units as `offset`.
    pub length: usize,
    pub heading: Option<String>,
    /// Chunk kind; the column defaults to `'section'` in the schema.
    pub kind: FinalChunkKind,
    /// Optional text stored alongside the chunk, presumably prepended for retrieval — confirm.
    pub retrieval_prefix: Option<String>,
}

/// Values for inserting a chunk; the same fields as [`ChunkRow`] minus the database ids.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkInsert {
    pub seq: i32,
    pub offset: usize,
    pub length: usize,
    pub heading: Option<String>,
    pub kind: FinalChunkKind,
    pub retrieval_prefix: Option<String>,
}

/// A row of the `document_texts` table (at most one per document).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DocumentTextRow {
    pub doc_id: i64,
    /// Identifies the extractor that produced `text` (semantics outside this view).
    pub extractor_key: String,
    /// Hash of the source the text was extracted from — presumably; confirm.
    pub source_hash: String,
    /// Hash of `text` — presumably; confirm.
    pub text_hash: String,
    /// Generation identifier; defaults to `''` in the schema.
    pub generation_key: String,
    pub text: String,
    pub created: String,
}

/// A chunk together with text associated with it and the extractor key.
///
/// NOTE(review): whether `text` is the chunk slice or the whole document text is not visible
/// here — confirm with the producing query.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkTextRow {
    pub chunk: ChunkRow,
    pub extractor_key: String,
    pub text: String,
}
124
/// Borrowed inputs for replacing a document's metadata, extracted text and chunks in one go.
///
/// NOTE(review): the consuming method is outside this view; field meanings mirror
/// [`DocumentRow`], [`DocumentTextRow`] and [`ChunkInsert`].
pub struct DocumentGenerationReplace<'a> {
    pub collection_id: i64,
    pub path: &'a str,
    pub title: &'a str,
    pub title_source: DocumentTitleSource,
    pub hash: &'a str,
    pub modified: &'a str,
    pub extractor_key: &'a str,
    pub source_hash: &'a str,
    pub text_hash: &'a str,
    pub generation_key: &'a str,
    pub text: &'a str,
    pub chunks: &'a [ChunkInsert],
}

/// Outcome of a document generation replace.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DocumentGenerationReplaceResult {
    pub doc_id: i64,
    /// Ids of the chunks that existed before the replace — presumably so callers can purge
    /// them from the search indexes; confirm.
    pub old_chunk_ids: Vec<i64>,
    /// Ids of the newly inserted chunks.
    pub chunk_ids: Vec<i64>,
}

/// A chunk plus the context (document path, collection path, space) needed to embed it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EmbedRecord {
    pub chunk: ChunkRow,
    pub doc_path: String,
    pub collection_path: PathBuf,
    pub space_name: String,
}

/// A document flagged `fts_dirty` together with its chunks and location, for re-indexing.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FtsDirtyRecord {
    pub doc_id: i64,
    pub doc_path: String,
    pub doc_title: String,
    pub doc_title_source: DocumentTitleSource,
    pub doc_hash: String,
    pub collection_path: PathBuf,
    pub space_name: String,
    pub chunks: Vec<ChunkRow>,
}

/// An inactive document old enough to be reaped, with its space and chunk ids.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReapableDocument {
    pub doc_id: i64,
    pub space_name: String,
    pub chunk_ids: Vec<i64>,
}

/// One chunk's worth of data for the Tantivy index (see `TantivyFields`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TantivyEntry {
    pub chunk_id: i64,
    pub doc_id: i64,
    pub filepath: String,
    /// Title to index, if any (compare `DocumentTitleSource::semantic_title`).
    pub semantic_title: Option<String>,
    pub heading: Option<String>,
    pub body: String,
}
183
184#[derive(Debug, Clone, Copy, PartialEq, Eq)]
185pub enum DocumentTitleSource {
186 Extracted,
187 FilenameFallback,
188}
189
190impl DocumentTitleSource {
191 pub fn as_sql(self) -> &'static str {
192 match self {
193 Self::Extracted => "extracted",
194 Self::FilenameFallback => "filename_fallback",
195 }
196 }
197
198 fn from_sql(raw: &str) -> std::result::Result<Self, KboltError> {
199 match raw {
200 "extracted" => Ok(Self::Extracted),
201 "filename_fallback" => Ok(Self::FilenameFallback),
202 other => Err(KboltError::InvalidInput(format!(
203 "invalid stored document title source: {other}"
204 ))),
205 }
206 }
207
208 pub fn semantic_title(self, title: &str) -> Option<&str> {
209 matches!(self, Self::Extracted)
210 .then_some(title.trim())
211 .filter(|title| !title.is_empty())
212 }
213}
214
/// A keyword (BM25) search hit.
#[derive(Debug, Clone, PartialEq)]
pub struct BM25Hit {
    pub chunk_id: i64,
    /// Relevance score reported by the keyword index (higher is better, presumably).
    pub score: f32,
}

/// A dense-vector search hit.
#[derive(Debug, Clone, PartialEq)]
pub struct DenseHit {
    pub chunk_id: i64,
    /// Distance reported by the vector index (metric configured elsewhere).
    pub distance: f32,
}

/// Documents and chunk count covered by a search scope.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchScopeSummary {
    pub document_ids: Vec<i64>,
    pub chunk_count: usize,
}

/// Result of resolving which space a collection name belongs to
/// (see `Storage::find_space_for_collection`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SpaceResolution {
    /// Exactly one space contains a collection with that name.
    Found(SpaceRow),
    /// Several spaces contain it; holds their names sorted ascending.
    Ambiguous(Vec<String>),
    /// No space contains it.
    NotFound,
}
239
/// In-memory handles to one space's search indexes.
struct SpaceIndexes {
    /// Tantivy index directory; kept for reference only (not read in this view).
    _tantivy_dir: PathBuf,
    /// Path of the usearch vector file.
    usearch_path: PathBuf,
    tantivy_index: Index,
    tantivy_reader: IndexReader,
    /// Starts as `None` in `open_space`; presumably created on first write — confirm.
    tantivy_writer: Mutex<Option<IndexWriter>>,
    usearch_index: RwLock<usearch::Index>,
    /// Field handles resolved from the Tantivy schema.
    fields: TantivyFields,
}

/// Field handles of the Tantivy schema (resolved by `tantivy_fields_from_schema`).
#[derive(Debug, Clone, Copy)]
struct TantivyFields {
    chunk_id: Field,
    doc_id: Field,
    filepath: Field,
    title: Field,
    heading: Field,
    body: Field,
}

/// Presumably one field's contribution to a BM25 query: which field, its boost, and the
/// postings detail it needs — confirm against the query builder (outside this view).
#[derive(Debug, Clone, Copy)]
struct Bm25FieldSpec {
    field: Field,
    boost: f32,
    index_record_option: IndexRecordOption,
}
266
267impl Storage {
    /// Opens (creating if necessary) the storage rooted at `cache_dir`.
    ///
    /// Steps, in order:
    /// - create the directory and open `meta.sqlite` inside it;
    /// - run `reject_incompatible_legacy_index`;
    /// - create any missing tables and indexes;
    /// - run the `ensure_*` schema helpers;
    /// - make sure the `default` space row exists;
    /// - open the default space's search indexes.
    ///
    /// # Errors
    /// Propagates filesystem and SQLite errors, the legacy-index rejection, and any failure to
    /// open the default space's indexes.
    pub fn new(cache_dir: &Path) -> Result<Self> {
        std::fs::create_dir_all(cache_dir)?;
        let db_path = cache_dir.join(DB_FILE);
        let conn = Connection::open(db_path)?;
        // Runs before any schema statement, so a database this helper rejects is never
        // modified by the batch below.
        reject_incompatible_legacy_index(&conn)?;
        // `foreign_keys` is a per-connection SQLite setting that is off by default. It must be
        // enabled for the `ON DELETE CASCADE` clauses below to take effect. Every CREATE uses
        // `IF NOT EXISTS`, so the batch is safe to re-run against an existing database.
        conn.execute_batch(
            r#"
PRAGMA foreign_keys = ON;
PRAGMA journal_mode = WAL;

CREATE TABLE IF NOT EXISTS spaces (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL UNIQUE,
    description TEXT,
    created TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ','now'))
);

CREATE TABLE IF NOT EXISTS collections (
    id INTEGER PRIMARY KEY,
    space_id INTEGER NOT NULL REFERENCES spaces(id) ON DELETE CASCADE,
    name TEXT NOT NULL,
    path TEXT NOT NULL,
    description TEXT,
    extensions TEXT,
    created TEXT NOT NULL,
    updated TEXT NOT NULL,
    UNIQUE(space_id, name)
);

CREATE TABLE IF NOT EXISTS documents (
    id INTEGER PRIMARY KEY,
    collection_id INTEGER NOT NULL REFERENCES collections(id) ON DELETE CASCADE,
    path TEXT NOT NULL,
    title TEXT NOT NULL,
    title_source TEXT NOT NULL DEFAULT 'extracted',
    hash TEXT NOT NULL,
    modified TEXT NOT NULL,
    active INTEGER NOT NULL DEFAULT 1,
    deactivated_at TEXT,
    fts_dirty INTEGER NOT NULL DEFAULT 0,
    UNIQUE(collection_id, path)
);
CREATE INDEX IF NOT EXISTS idx_documents_collection ON documents(collection_id, active);
CREATE INDEX IF NOT EXISTS idx_documents_hash ON documents(hash);
CREATE INDEX IF NOT EXISTS idx_documents_fts_dirty ON documents(fts_dirty) WHERE fts_dirty = 1;

CREATE TABLE IF NOT EXISTS chunks (
    id INTEGER PRIMARY KEY,
    doc_id INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
    seq INTEGER NOT NULL,
    offset INTEGER NOT NULL,
    length INTEGER NOT NULL,
    heading TEXT,
    kind TEXT NOT NULL DEFAULT 'section',
    retrieval_prefix TEXT,
    UNIQUE(doc_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_chunks_doc ON chunks(doc_id);

CREATE TABLE IF NOT EXISTS embeddings (
    chunk_id INTEGER NOT NULL REFERENCES chunks(id) ON DELETE CASCADE,
    model TEXT NOT NULL,
    embedded_at TEXT NOT NULL,
    PRIMARY KEY (chunk_id, model)
);

CREATE TABLE IF NOT EXISTS document_texts (
    doc_id INTEGER PRIMARY KEY REFERENCES documents(id) ON DELETE CASCADE,
    extractor_key TEXT NOT NULL,
    source_hash TEXT NOT NULL,
    text_hash TEXT NOT NULL,
    generation_key TEXT NOT NULL DEFAULT '',
    text TEXT NOT NULL,
    created TEXT NOT NULL
);
"#,
        )?;
        // NOTE(review): these helpers are outside this view. Judging by their names, they add
        // columns missing from databases created by older builds, then check/record the
        // schema version — confirm.
        ensure_documents_title_source_column(&conn)?;
        ensure_document_texts_generation_key_column(&conn)?;
        ensure_chunks_retrieval_prefix_column(&conn)?;
        ensure_schema_version(&conn)?;

        // Idempotent: the UNIQUE constraint on `spaces.name` makes repeat inserts no-ops.
        conn.execute(
            "INSERT OR IGNORE INTO spaces (name, description) VALUES (?1, NULL)",
            params![DEFAULT_SPACE_NAME],
        )?;

        let storage = Self {
            db: Mutex::new(conn),
            cache_dir: cache_dir.to_path_buf(),
            spaces: RwLock::new(HashMap::new()),
            dirty_usearch_spaces: Mutex::new(HashSet::new()),
        };
        // The default space's indexes are loaded eagerly so it is usable immediately.
        storage.open_space(DEFAULT_SPACE_NAME)?;

        Ok(storage)
    }
365
366 pub fn open_space(&self, name: &str) -> Result<()> {
367 let _space = self.get_space(name)?;
368
369 {
370 let spaces = self
371 .spaces
372 .read()
373 .map_err(|_| CoreError::poisoned("spaces"))?;
374 if spaces.contains_key(name) {
375 return Ok(());
376 }
377 }
378
379 let (tantivy_dir, usearch_path) = self.space_paths(name);
380 std::fs::create_dir_all(&tantivy_dir)?;
381 std::fs::OpenOptions::new()
382 .create(true)
383 .append(true)
384 .open(&usearch_path)?;
385 let tantivy_index = open_or_create_tantivy_index(&tantivy_dir)?;
386 let tantivy_reader = tantivy_index.reader()?;
387 let usearch_index = open_or_create_usearch_index(&usearch_path)?;
388 let fields = tantivy_fields_from_schema(&tantivy_index.schema())?;
389
390 let mut spaces = self
391 .spaces
392 .write()
393 .map_err(|_| CoreError::poisoned("spaces"))?;
394 spaces
395 .entry(name.to_string())
396 .or_insert(Arc::new(SpaceIndexes {
397 _tantivy_dir: tantivy_dir,
398 usearch_path,
399 tantivy_index,
400 tantivy_reader,
401 tantivy_writer: Mutex::new(None),
402 usearch_index: RwLock::new(usearch_index),
403 fields,
404 }));
405
406 Ok(())
407 }
408
409 pub fn close_space(&self, name: &str) -> Result<()> {
410 let _space = self.get_space(name)?;
411 let mut spaces = self
412 .spaces
413 .write()
414 .map_err(|_| CoreError::poisoned("spaces"))?;
415 let _removed = spaces.remove(name);
416 Ok(())
417 }
418
419 pub fn create_space(&self, name: &str, description: Option<&str>) -> Result<i64> {
420 let space_id = {
421 let conn = self
422 .db
423 .lock()
424 .map_err(|_| CoreError::poisoned("database"))?;
425
426 let result = conn.execute(
427 "INSERT INTO spaces (name, description) VALUES (?1, ?2)",
428 params![name, description],
429 );
430
431 match result {
432 Ok(_) => conn.last_insert_rowid(),
433 Err(err) => {
434 return match err {
435 Error::SqliteFailure(sqlite_err, _)
436 if sqlite_err.code == ErrorCode::ConstraintViolation =>
437 {
438 Err(KboltError::SpaceAlreadyExists {
439 name: name.to_string(),
440 }
441 .into())
442 }
443 other => Err(other.into()),
444 };
445 }
446 }
447 };
448
449 if let Err(open_err) = self.open_space(name) {
450 let rollback_result = self
451 .db
452 .lock()
453 .map_err(|_| CoreError::poisoned("database"))?
454 .execute("DELETE FROM spaces WHERE id = ?1", params![space_id]);
455
456 if let Err(rollback_err) = rollback_result {
457 return Err(CoreError::Internal(format!(
458 "failed to provision indexes for space '{name}': {open_err}; rollback failed: {rollback_err}"
459 )));
460 }
461
462 return Err(open_err);
463 }
464
465 Ok(space_id)
466 }
467
468 pub fn get_space(&self, name: &str) -> Result<SpaceRow> {
469 let conn = self
470 .db
471 .lock()
472 .map_err(|_| CoreError::poisoned("database"))?;
473 let mut stmt =
474 conn.prepare("SELECT id, name, description, created FROM spaces WHERE name = ?1")?;
475
476 let row = stmt.query_row(params![name], decode_space_row);
477
478 match row {
479 Ok(space) => Ok(space),
480 Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
481 name: name.to_string(),
482 }
483 .into()),
484 Err(err) => Err(err.into()),
485 }
486 }
487
488 pub fn get_space_by_id(&self, id: i64) -> Result<SpaceRow> {
489 let conn = self
490 .db
491 .lock()
492 .map_err(|_| CoreError::poisoned("database"))?;
493 let mut stmt =
494 conn.prepare("SELECT id, name, description, created FROM spaces WHERE id = ?1")?;
495
496 let row = stmt.query_row(params![id], decode_space_row);
497
498 match row {
499 Ok(space) => Ok(space),
500 Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
501 name: format!("id={id}"),
502 }
503 .into()),
504 Err(err) => Err(err.into()),
505 }
506 }
507
508 pub fn list_spaces(&self) -> Result<Vec<SpaceRow>> {
509 let conn = self
510 .db
511 .lock()
512 .map_err(|_| CoreError::poisoned("database"))?;
513 let mut stmt = conn.prepare(
514 "SELECT id, name, description, created
515 FROM spaces
516 ORDER BY name ASC",
517 )?;
518
519 let rows = stmt.query_map([], decode_space_row)?;
520
521 let spaces = rows.collect::<std::result::Result<Vec<_>, _>>()?;
522 Ok(spaces)
523 }
524
525 pub fn find_space_for_collection(&self, collection: &str) -> Result<SpaceResolution> {
526 let conn = self
527 .db
528 .lock()
529 .map_err(|_| CoreError::poisoned("database"))?;
530 let mut stmt = conn.prepare(
531 "SELECT s.id, s.name, s.description, s.created
532 FROM spaces s
533 JOIN collections c ON c.space_id = s.id
534 WHERE c.name = ?1
535 ORDER BY s.name ASC",
536 )?;
537 let rows = stmt.query_map(params![collection], decode_space_row)?;
538 let matches = rows.collect::<std::result::Result<Vec<_>, _>>()?;
539
540 if matches.is_empty() {
541 return Ok(SpaceResolution::NotFound);
542 }
543
544 if matches.len() == 1 {
545 return Ok(SpaceResolution::Found(matches[0].clone()));
546 }
547
548 let spaces = matches.into_iter().map(|space| space.name).collect();
549 Ok(SpaceResolution::Ambiguous(spaces))
550 }
551
552 pub fn delete_space(&self, name: &str) -> Result<()> {
553 if name == DEFAULT_SPACE_NAME {
554 let conn = self
555 .db
556 .lock()
557 .map_err(|_| CoreError::poisoned("database"))?;
558 conn.execute(
559 "DELETE FROM collections
560 WHERE space_id = (SELECT id FROM spaces WHERE name = ?1)",
561 params![name],
562 )?;
563 drop(conn);
564
565 self.unload_space(name)?;
566 self.remove_space_artifacts(name)?;
567 self.open_space(name)?;
568 return Ok(());
569 }
570
571 let conn = self
572 .db
573 .lock()
574 .map_err(|_| CoreError::poisoned("database"))?;
575 let deleted = conn.execute("DELETE FROM spaces WHERE name = ?1", params![name])?;
576 drop(conn);
577
578 if deleted == 0 {
579 return Err(KboltError::SpaceNotFound {
580 name: name.to_string(),
581 }
582 .into());
583 }
584
585 self.unload_space(name)?;
586 self.remove_space_artifacts(name)?;
587
588 Ok(())
589 }
590
591 pub fn rename_space(&self, old: &str, new: &str) -> Result<()> {
592 if old == DEFAULT_SPACE_NAME {
593 return Err(
594 KboltError::Config("cannot rename reserved space: default".to_string()).into(),
595 );
596 }
597
598 let conn = self
599 .db
600 .lock()
601 .map_err(|_| CoreError::poisoned("database"))?;
602
603 let result = conn.execute(
604 "UPDATE spaces SET name = ?1 WHERE name = ?2",
605 params![new, old],
606 );
607 drop(conn);
608
609 match result {
610 Ok(0) => Err(KboltError::SpaceNotFound {
611 name: old.to_string(),
612 }
613 .into()),
614 Ok(_) => {
615 if let Err(rename_err) = self.rename_space_artifacts(old, new) {
616 let rollback = self
617 .db
618 .lock()
619 .map_err(|_| CoreError::poisoned("database"))?
620 .execute(
621 "UPDATE spaces SET name = ?1 WHERE name = ?2",
622 params![old, new],
623 );
624
625 if let Err(rollback_err) = rollback {
626 return Err(CoreError::Internal(format!(
627 "failed to rename space artifacts from '{old}' to '{new}': {rename_err}; rollback failed: {rollback_err}"
628 )));
629 }
630
631 return Err(rename_err);
632 }
633
634 self.unload_space(old)?;
635 if let Err(open_err) = self.open_space(new) {
636 let _ = self.rename_space_artifacts(new, old);
637 let _ = self
638 .db
639 .lock()
640 .map_err(|_| CoreError::poisoned("database"))?
641 .execute(
642 "UPDATE spaces SET name = ?1 WHERE name = ?2",
643 params![old, new],
644 );
645 let _ = self.open_space(old);
646 return Err(open_err);
647 }
648
649 Ok(())
650 }
651 Err(Error::SqliteFailure(sqlite_err, _))
652 if sqlite_err.code == ErrorCode::ConstraintViolation =>
653 {
654 Err(KboltError::SpaceAlreadyExists {
655 name: new.to_string(),
656 }
657 .into())
658 }
659 Err(err) => Err(err.into()),
660 }
661 }
662
663 pub fn update_space_description(&self, name: &str, description: &str) -> Result<()> {
664 let conn = self
665 .db
666 .lock()
667 .map_err(|_| CoreError::poisoned("database"))?;
668
669 let updated = conn.execute(
670 "UPDATE spaces SET description = ?1 WHERE name = ?2",
671 params![description, name],
672 )?;
673
674 if updated == 0 {
675 return Err(KboltError::SpaceNotFound {
676 name: name.to_string(),
677 }
678 .into());
679 }
680
681 Ok(())
682 }
683
684 pub fn create_collection(
685 &self,
686 space_id: i64,
687 name: &str,
688 path: &Path,
689 description: Option<&str>,
690 extensions: Option<&[String]>,
691 ) -> Result<i64> {
692 let conn = self
693 .db
694 .lock()
695 .map_err(|_| CoreError::poisoned("database"))?;
696
697 let space_name = lookup_space_name(&conn, space_id)?;
698 let extensions_json = serialize_extensions(extensions)?;
699 let result = conn.execute(
700 "INSERT INTO collections (space_id, name, path, description, extensions, created, updated)
701 VALUES (?1, ?2, ?3, ?4, ?5, strftime('%Y-%m-%dT%H:%M:%SZ','now'), strftime('%Y-%m-%dT%H:%M:%SZ','now'))",
702 params![space_id, name, path.to_string_lossy(), description, extensions_json],
703 );
704
705 match result {
706 Ok(_) => Ok(conn.last_insert_rowid()),
707 Err(Error::SqliteFailure(sqlite_err, _))
708 if sqlite_err.code == ErrorCode::ConstraintViolation =>
709 {
710 Err(KboltError::CollectionAlreadyExists {
711 name: name.to_string(),
712 space: space_name,
713 }
714 .into())
715 }
716 Err(err) => Err(err.into()),
717 }
718 }
719
720 pub fn get_collection(&self, space_id: i64, name: &str) -> Result<CollectionRow> {
721 let conn = self
722 .db
723 .lock()
724 .map_err(|_| CoreError::poisoned("database"))?;
725 let mut stmt = conn.prepare(
726 "SELECT id, space_id, name, path, description, extensions, created, updated
727 FROM collections
728 WHERE space_id = ?1 AND name = ?2",
729 )?;
730
731 let result = stmt.query_row(params![space_id, name], decode_collection_row);
732 match result {
733 Ok(row) => Ok(row),
734 Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
735 name: name.to_string(),
736 }
737 .into()),
738 Err(err) => Err(err.into()),
739 }
740 }
741
742 pub fn get_collection_by_id(&self, id: i64) -> Result<CollectionRow> {
743 let conn = self
744 .db
745 .lock()
746 .map_err(|_| CoreError::poisoned("database"))?;
747 let mut stmt = conn.prepare(
748 "SELECT id, space_id, name, path, description, extensions, created, updated
749 FROM collections
750 WHERE id = ?1",
751 )?;
752
753 let result = stmt.query_row(params![id], decode_collection_row);
754 match result {
755 Ok(row) => Ok(row),
756 Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
757 name: format!("id={id}"),
758 }
759 .into()),
760 Err(err) => Err(err.into()),
761 }
762 }
763
764 pub fn list_collections(&self, space_id: Option<i64>) -> Result<Vec<CollectionRow>> {
765 let conn = self
766 .db
767 .lock()
768 .map_err(|_| CoreError::poisoned("database"))?;
769
770 let (sql, params): (&str, Vec<i64>) = match space_id {
771 Some(id) => (
772 "SELECT id, space_id, name, path, description, extensions, created, updated
773 FROM collections
774 WHERE space_id = ?1
775 ORDER BY name ASC",
776 vec![id],
777 ),
778 None => (
779 "SELECT id, space_id, name, path, description, extensions, created, updated
780 FROM collections
781 ORDER BY space_id ASC, name ASC",
782 Vec::new(),
783 ),
784 };
785
786 let mut stmt = conn.prepare(sql)?;
787 let rows = if params.is_empty() {
788 stmt.query_map([], decode_collection_row)?
789 } else {
790 stmt.query_map(params![params[0]], decode_collection_row)?
791 };
792 let collections = rows.collect::<std::result::Result<Vec<_>, _>>()?;
793 Ok(collections)
794 }
795
796 pub fn count_collections_in_space(&self, space_id: i64) -> Result<usize> {
797 let conn = self
798 .db
799 .lock()
800 .map_err(|_| CoreError::poisoned("database"))?;
801 let _space_name = lookup_space_name(&conn, space_id)?;
802
803 query_count(
804 &conn,
805 "SELECT COUNT(*) FROM collections WHERE space_id = ?1",
806 params![space_id],
807 )
808 }
809
810 pub fn delete_collection(&self, space_id: i64, name: &str) -> Result<()> {
811 let conn = self
812 .db
813 .lock()
814 .map_err(|_| CoreError::poisoned("database"))?;
815 let _space_name = lookup_space_name(&conn, space_id)?;
816
817 let deleted = conn.execute(
818 "DELETE FROM collections WHERE space_id = ?1 AND name = ?2",
819 params![space_id, name],
820 )?;
821
822 if deleted == 0 {
823 return Err(KboltError::CollectionNotFound {
824 name: name.to_string(),
825 }
826 .into());
827 }
828
829 Ok(())
830 }
831
832 pub fn rename_collection(&self, space_id: i64, old: &str, new: &str) -> Result<()> {
833 let conn = self
834 .db
835 .lock()
836 .map_err(|_| CoreError::poisoned("database"))?;
837 let space_name = lookup_space_name(&conn, space_id)?;
838 let result = conn.execute(
839 "UPDATE collections
840 SET name = ?1, updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
841 WHERE space_id = ?2 AND name = ?3",
842 params![new, space_id, old],
843 );
844
845 match result {
846 Ok(0) => Err(KboltError::CollectionNotFound {
847 name: old.to_string(),
848 }
849 .into()),
850 Ok(_) => Ok(()),
851 Err(Error::SqliteFailure(sqlite_err, _))
852 if sqlite_err.code == ErrorCode::ConstraintViolation =>
853 {
854 Err(KboltError::CollectionAlreadyExists {
855 name: new.to_string(),
856 space: space_name,
857 }
858 .into())
859 }
860 Err(err) => Err(err.into()),
861 }
862 }
863
864 pub fn update_collection_description(
865 &self,
866 space_id: i64,
867 name: &str,
868 desc: &str,
869 ) -> Result<()> {
870 let conn = self
871 .db
872 .lock()
873 .map_err(|_| CoreError::poisoned("database"))?;
874 let _space_name = lookup_space_name(&conn, space_id)?;
875
876 let updated = conn.execute(
877 "UPDATE collections
878 SET description = ?1, updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
879 WHERE space_id = ?2 AND name = ?3",
880 params![desc, space_id, name],
881 )?;
882
883 if updated == 0 {
884 return Err(KboltError::CollectionNotFound {
885 name: name.to_string(),
886 }
887 .into());
888 }
889
890 Ok(())
891 }
892
893 pub fn update_collection_timestamp(&self, collection_id: i64) -> Result<()> {
894 let conn = self
895 .db
896 .lock()
897 .map_err(|_| CoreError::poisoned("database"))?;
898
899 let updated = conn.execute(
900 "UPDATE collections
901 SET updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
902 WHERE id = ?1",
903 params![collection_id],
904 )?;
905
906 if updated == 0 {
907 return Err(KboltError::CollectionNotFound {
908 name: format!("id={collection_id}"),
909 }
910 .into());
911 }
912
913 Ok(())
914 }
915
916 pub fn upsert_document(
917 &self,
918 collection_id: i64,
919 path: &str,
920 title: &str,
921 title_source: DocumentTitleSource,
922 hash: &str,
923 modified: &str,
924 ) -> Result<i64> {
925 let conn = self
926 .db
927 .lock()
928 .map_err(|_| CoreError::poisoned("database"))?;
929 let _collection_name = lookup_collection_name(&conn, collection_id)?;
930
931 let id = conn.query_row(
932 "INSERT INTO documents (collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty)
933 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 1, NULL, 1)
934 ON CONFLICT(collection_id, path) DO UPDATE SET
935 title = excluded.title,
936 title_source = excluded.title_source,
937 hash = excluded.hash,
938 modified = excluded.modified,
939 active = 1,
940 deactivated_at = NULL,
941 fts_dirty = 1
942 RETURNING id",
943 params![
944 collection_id,
945 path,
946 title,
947 title_source.as_sql(),
948 hash,
949 modified
950 ],
951 |row| row.get(0),
952 )?;
953 Ok(id)
954 }
955
956 pub fn get_document_by_path(
957 &self,
958 collection_id: i64,
959 path: &str,
960 ) -> Result<Option<DocumentRow>> {
961 let conn = self
962 .db
963 .lock()
964 .map_err(|_| CoreError::poisoned("database"))?;
965 let _collection_name = lookup_collection_name(&conn, collection_id)?;
966 let mut stmt = conn.prepare(
967 "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
968 FROM documents
969 WHERE collection_id = ?1 AND path = ?2",
970 )?;
971
972 let result = stmt.query_row(params![collection_id, path], decode_document_row);
973 match result {
974 Ok(row) => Ok(Some(row)),
975 Err(Error::QueryReturnedNoRows) => Ok(None),
976 Err(err) => Err(err.into()),
977 }
978 }
979
980 pub fn get_document_by_id(&self, id: i64) -> Result<DocumentRow> {
981 let conn = self
982 .db
983 .lock()
984 .map_err(|_| CoreError::poisoned("database"))?;
985 let mut stmt = conn.prepare(
986 "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
987 FROM documents
988 WHERE id = ?1",
989 )?;
990
991 let result = stmt.query_row(params![id], decode_document_row);
992 match result {
993 Ok(row) => Ok(row),
994 Err(Error::QueryReturnedNoRows) => Err(KboltError::DocumentNotFound {
995 path: format!("id={id}"),
996 }
997 .into()),
998 Err(err) => Err(err.into()),
999 }
1000 }
1001
1002 pub fn get_documents_by_ids(&self, ids: &[i64]) -> Result<Vec<DocumentRow>> {
1003 if ids.is_empty() {
1004 return Ok(Vec::new());
1005 }
1006
1007 let conn = self
1008 .db
1009 .lock()
1010 .map_err(|_| CoreError::poisoned("database"))?;
1011
1012 let placeholders = vec!["?"; ids.len()].join(", ");
1013 let sql = format!(
1014 "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
1015 FROM documents
1016 WHERE id IN ({placeholders})"
1017 );
1018 let mut stmt = conn.prepare(&sql)?;
1019 let rows = stmt.query_map(params_from_iter(ids.iter()), decode_document_row)?;
1020 let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1021 Ok(docs)
1022 }
1023
1024 pub fn refresh_document_activity(&self, doc_id: i64, modified: &str) -> Result<()> {
1025 let conn = self
1026 .db
1027 .lock()
1028 .map_err(|_| CoreError::poisoned("database"))?;
1029
1030 let updated = conn.execute(
1031 "UPDATE documents
1032 SET modified = ?1,
1033 active = 1,
1034 deactivated_at = NULL
1035 WHERE id = ?2",
1036 params![modified, doc_id],
1037 )?;
1038
1039 if updated == 0 {
1040 return Err(KboltError::DocumentNotFound {
1041 path: format!("id={doc_id}"),
1042 }
1043 .into());
1044 }
1045
1046 Ok(())
1047 }
1048
1049 pub fn list_documents(
1050 &self,
1051 collection_id: i64,
1052 active_only: bool,
1053 ) -> Result<Vec<DocumentRow>> {
1054 let conn = self
1055 .db
1056 .lock()
1057 .map_err(|_| CoreError::poisoned("database"))?;
1058 let _collection_name = lookup_collection_name(&conn, collection_id)?;
1059 let active_only = i64::from(active_only);
1060 let mut stmt = conn.prepare(
1061 "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
1062 FROM documents
1063 WHERE collection_id = ?1
1064 AND (?2 = 0 OR active = 1)
1065 ORDER BY path ASC",
1066 )?;
1067 let rows = stmt.query_map(params![collection_id, active_only], decode_document_row)?;
1068 let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1069 Ok(docs)
1070 }
1071
1072 pub fn list_collection_file_rows(
1073 &self,
1074 collection_id: i64,
1075 active_only: bool,
1076 ) -> Result<Vec<FileListRow>> {
1077 let conn = self
1078 .db
1079 .lock()
1080 .map_err(|_| CoreError::poisoned("database"))?;
1081 let _collection_name = lookup_collection_name(&conn, collection_id)?;
1082 let active_only = i64::from(active_only);
1083 let mut stmt = conn.prepare(
1084 "SELECT d.id, d.path, d.title, d.hash, d.active,
1085 COUNT(DISTINCT c.id) AS chunk_count,
1086 COUNT(DISTINCT e.chunk_id) AS embedded_chunk_count
1087 FROM documents d
1088 LEFT JOIN chunks c ON c.doc_id = d.id
1089 LEFT JOIN embeddings e ON e.chunk_id = c.id
1090 WHERE d.collection_id = ?1
1091 AND (?2 = 0 OR d.active = 1)
1092 GROUP BY d.id, d.path, d.title, d.hash, d.active
1093 ORDER BY d.path ASC",
1094 )?;
1095 let rows = stmt.query_map(params![collection_id, active_only], |row| {
1096 let chunk_count: i64 = row.get(5)?;
1097 let embedded_chunk_count: i64 = row.get(6)?;
1098 Ok(FileListRow {
1099 doc_id: row.get(0)?,
1100 path: row.get(1)?,
1101 title: row.get(2)?,
1102 hash: row.get(3)?,
1103 active: row.get::<_, i64>(4)? != 0,
1104 chunk_count: chunk_count as usize,
1105 embedded_chunk_count: embedded_chunk_count as usize,
1106 })
1107 })?;
1108 let files = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1109 Ok(files)
1110 }
1111
1112 pub fn get_document_by_hash_prefix(&self, prefix: &str) -> Result<Vec<DocumentRow>> {
1113 let conn = self
1114 .db
1115 .lock()
1116 .map_err(|_| CoreError::poisoned("database"))?;
1117
1118 let pattern = format!("{prefix}%");
1119 let mut stmt = conn.prepare(
1120 "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
1121 FROM documents
1122 WHERE hash LIKE ?1
1123 ORDER BY id ASC",
1124 )?;
1125 let rows = stmt.query_map(params![pattern], decode_document_row)?;
1126 let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1127 Ok(docs)
1128 }
1129
1130 pub fn deactivate_document(&self, doc_id: i64) -> Result<()> {
1131 let conn = self
1132 .db
1133 .lock()
1134 .map_err(|_| CoreError::poisoned("database"))?;
1135
1136 let updated = conn.execute(
1137 "UPDATE documents
1138 SET active = 0,
1139 deactivated_at = CASE
1140 WHEN active = 1 THEN strftime('%Y-%m-%dT%H:%M:%SZ','now')
1141 ELSE deactivated_at
1142 END
1143 WHERE id = ?1",
1144 params![doc_id],
1145 )?;
1146
1147 if updated == 0 {
1148 return Err(KboltError::DocumentNotFound {
1149 path: format!("id={doc_id}"),
1150 }
1151 .into());
1152 }
1153
1154 Ok(())
1155 }
1156
1157 pub fn reactivate_document(&self, doc_id: i64) -> Result<()> {
1158 let conn = self
1159 .db
1160 .lock()
1161 .map_err(|_| CoreError::poisoned("database"))?;
1162
1163 let updated = conn.execute(
1164 "UPDATE documents
1165 SET active = 1, deactivated_at = NULL
1166 WHERE id = ?1",
1167 params![doc_id],
1168 )?;
1169
1170 if updated == 0 {
1171 return Err(KboltError::DocumentNotFound {
1172 path: format!("id={doc_id}"),
1173 }
1174 .into());
1175 }
1176
1177 Ok(())
1178 }
1179
1180 pub fn reap_documents(&self, older_than_days: u32) -> Result<Vec<i64>> {
1181 let reaped = self.list_reapable_documents(older_than_days)?;
1182 let doc_ids = reaped.iter().map(|item| item.doc_id).collect::<Vec<_>>();
1183 self.delete_documents(&doc_ids)?;
1184 Ok(doc_ids)
1185 }
1186
1187 pub fn list_reapable_documents(&self, older_than_days: u32) -> Result<Vec<ReapableDocument>> {
1188 self.list_reapable_documents_filtered(older_than_days, "", Vec::new())
1189 }
1190
1191 pub fn list_reapable_documents_in_space(
1192 &self,
1193 older_than_days: u32,
1194 space_id: i64,
1195 ) -> Result<Vec<ReapableDocument>> {
1196 self.list_reapable_documents_filtered(
1197 older_than_days,
1198 " AND c.space_id = ?",
1199 vec![SqlValue::Integer(space_id)],
1200 )
1201 }
1202
1203 pub fn list_reapable_documents_in_collections(
1204 &self,
1205 older_than_days: u32,
1206 collection_ids: &[i64],
1207 ) -> Result<Vec<ReapableDocument>> {
1208 if collection_ids.is_empty() {
1209 return Ok(Vec::new());
1210 }
1211
1212 let placeholders = vec!["?"; collection_ids.len()].join(", ");
1213 let clause = format!(" AND d.collection_id IN ({placeholders})");
1214 let params = collection_ids
1215 .iter()
1216 .map(|id| SqlValue::Integer(*id))
1217 .collect::<Vec<_>>();
1218 self.list_reapable_documents_filtered(older_than_days, &clause, params)
1219 }
1220
1221 fn list_reapable_documents_filtered(
1222 &self,
1223 older_than_days: u32,
1224 scope_clause: &str,
1225 scope_params: Vec<SqlValue>,
1226 ) -> Result<Vec<ReapableDocument>> {
1227 let conn = self
1228 .db
1229 .lock()
1230 .map_err(|_| CoreError::poisoned("database"))?;
1231
1232 let modifier = format!("-{} days", older_than_days);
1233 let sql = format!(
1234 "SELECT d.id, s.name
1235 FROM documents d
1236 JOIN collections c ON c.id = d.collection_id
1237 JOIN spaces s ON s.id = c.space_id
1238 WHERE d.active = 0
1239 AND d.deactivated_at IS NOT NULL
1240 AND d.deactivated_at <= strftime('%Y-%m-%dT%H:%M:%SZ', 'now', ?){scope_clause}
1241 ORDER BY d.id ASC"
1242 );
1243 let mut stmt = conn.prepare(&sql)?;
1244 let mut params = Vec::with_capacity(scope_params.len() + 1);
1245 params.push(SqlValue::Text(modifier));
1246 params.extend(scope_params);
1247 let rows = stmt.query_map(params_from_iter(params.iter()), |row| {
1248 Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1249 })?;
1250 let headers = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1251 drop(stmt);
1252
1253 let mut documents = Vec::with_capacity(headers.len());
1254 for (doc_id, space_name) in headers {
1255 documents.push(ReapableDocument {
1256 doc_id,
1257 space_name,
1258 chunk_ids: load_chunk_ids_for_doc(&conn, doc_id)?,
1259 });
1260 }
1261
1262 Ok(documents)
1263 }
1264
1265 pub fn delete_documents(&self, doc_ids: &[i64]) -> Result<()> {
1266 if doc_ids.is_empty() {
1267 return Ok(());
1268 }
1269
1270 let conn = self
1271 .db
1272 .lock()
1273 .map_err(|_| CoreError::poisoned("database"))?;
1274
1275 let placeholders = vec!["?"; doc_ids.len()].join(", ");
1276 let sql = format!("DELETE FROM documents WHERE id IN ({placeholders})");
1277 conn.execute(&sql, params_from_iter(doc_ids.iter()))?;
1278 Ok(())
1279 }
1280
1281 pub fn insert_chunks(&self, doc_id: i64, chunks: &[ChunkInsert]) -> Result<Vec<i64>> {
1282 if chunks.is_empty() {
1283 return Ok(Vec::new());
1284 }
1285
1286 let conn = self
1287 .db
1288 .lock()
1289 .map_err(|_| CoreError::poisoned("database"))?;
1290 let _doc = lookup_document_id(&conn, doc_id)?;
1291
1292 let tx = conn.unchecked_transaction()?;
1293 let mut stmt = tx.prepare(
1294 "INSERT INTO chunks (doc_id, seq, offset, length, heading, kind, retrieval_prefix)
1295 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1296 )?;
1297
1298 let mut ids = Vec::with_capacity(chunks.len());
1299 for chunk in chunks {
1300 stmt.execute(params![
1301 doc_id,
1302 chunk.seq,
1303 chunk.offset as i64,
1304 chunk.length as i64,
1305 chunk.heading,
1306 chunk.kind.as_storage_kind(),
1307 chunk.retrieval_prefix.as_deref(),
1308 ])?;
1309 ids.push(tx.last_insert_rowid());
1310 }
1311 drop(stmt);
1312 tx.commit()?;
1313 Ok(ids)
1314 }
1315
1316 pub fn delete_chunks_for_document(&self, doc_id: i64) -> Result<Vec<i64>> {
1317 let conn = self
1318 .db
1319 .lock()
1320 .map_err(|_| CoreError::poisoned("database"))?;
1321 let _doc = lookup_document_id(&conn, doc_id)?;
1322
1323 let mut stmt = conn.prepare("SELECT id FROM chunks WHERE doc_id = ?1 ORDER BY seq ASC")?;
1324 let rows = stmt.query_map(params![doc_id], |row| row.get::<_, i64>(0))?;
1325 let chunk_ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1326
1327 conn.execute("DELETE FROM chunks WHERE doc_id = ?1", params![doc_id])?;
1328 Ok(chunk_ids)
1329 }
1330
1331 pub fn get_chunks_for_document(&self, doc_id: i64) -> Result<Vec<ChunkRow>> {
1332 let conn = self
1333 .db
1334 .lock()
1335 .map_err(|_| CoreError::poisoned("database"))?;
1336 let _doc = lookup_document_id(&conn, doc_id)?;
1337
1338 let mut stmt = conn.prepare(
1339 "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1340 FROM chunks
1341 WHERE doc_id = ?1
1342 ORDER BY seq ASC",
1343 )?;
1344 let rows = stmt.query_map(params![doc_id], decode_chunk_row)?;
1345 let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1346 Ok(chunks)
1347 }
1348
1349 pub fn get_chunks_for_documents(&self, doc_ids: &[i64]) -> Result<HashMap<i64, Vec<ChunkRow>>> {
1350 if doc_ids.is_empty() {
1351 return Ok(HashMap::new());
1352 }
1353
1354 let mut requested = doc_ids.to_vec();
1355 requested.sort_unstable();
1356 requested.dedup();
1357
1358 let conn = self
1359 .db
1360 .lock()
1361 .map_err(|_| CoreError::poisoned("database"))?;
1362
1363 let placeholders = vec!["?"; requested.len()].join(", ");
1364 let sql = format!(
1365 "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1366 FROM chunks
1367 WHERE doc_id IN ({placeholders})
1368 ORDER BY doc_id ASC, seq ASC"
1369 );
1370 let mut stmt = conn.prepare(&sql)?;
1371 let rows = stmt.query_map(params_from_iter(requested.iter()), decode_chunk_row)?;
1372 let mut chunks_by_doc: HashMap<i64, Vec<ChunkRow>> = HashMap::new();
1373 for doc_id in requested {
1374 chunks_by_doc.insert(doc_id, Vec::new());
1375 }
1376 for row in rows {
1377 let chunk = row?;
1378 chunks_by_doc.entry(chunk.doc_id).or_default().push(chunk);
1379 }
1380
1381 Ok(chunks_by_doc)
1382 }
1383
    /// Fetches chunks for a set of `(doc_id, min_seq, max_seq)` ranges.
    ///
    /// Bounds are inclusive (`seq BETWEEN min AND max`). Ranges for the same
    /// document are sorted and merged when they overlap or are adjacent, so
    /// each chunk is returned at most once per document. Chunks within a
    /// document come back in ascending `seq` order. Every document named in
    /// `ranges` gets a key in the result, even when no chunk matches.
    /// Document existence is not checked.
    ///
    /// # Errors
    /// Returns `CoreError::Internal` if any range has `min_seq > max_seq`.
    /// This check runs before the database lock is taken.
    pub fn get_chunks_for_document_seq_ranges(
        &self,
        ranges: &[(i64, i32, i32)],
    ) -> Result<HashMap<i64, Vec<ChunkRow>>> {
        if ranges.is_empty() {
            return Ok(HashMap::new());
        }

        // Group ranges per document, rejecting inverted ranges up front.
        let mut ranges_by_doc: HashMap<i64, Vec<(i32, i32)>> = HashMap::new();
        for (doc_id, min_seq, max_seq) in ranges {
            if min_seq > max_seq {
                return Err(CoreError::Internal(format!(
                    "chunk seq range min must be <= max for doc {doc_id}: {min_seq} > {max_seq}"
                )));
            }

            ranges_by_doc
                .entry(*doc_id)
                .or_default()
                .push((*min_seq, *max_seq));
        }

        // Sort each document's ranges by start, then coalesce any range that
        // overlaps or directly follows the previous one (e.g. 1-3 and 4-6
        // become 1-6). `saturating_add` avoids overflow when the previous max
        // is i32::MAX.
        for doc_ranges in ranges_by_doc.values_mut() {
            doc_ranges.sort_unstable();
            let mut merged: Vec<(i32, i32)> = Vec::with_capacity(doc_ranges.len());
            for (min_seq, max_seq) in doc_ranges.drain(..) {
                if let Some((_, merged_max)) = merged.last_mut() {
                    if min_seq <= merged_max.saturating_add(1) {
                        *merged_max = (*merged_max).max(max_seq);
                        continue;
                    }
                }
                merged.push((min_seq, max_seq));
            }
            *doc_ranges = merged;
        }

        let conn = self
            .db
            .lock()
            .map_err(|_| CoreError::poisoned("database"))?;
        // One prepared statement, executed once per merged range.
        let mut stmt = conn.prepare(
            "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
             FROM chunks
             WHERE doc_id = ?1 AND seq BETWEEN ?2 AND ?3
             ORDER BY seq ASC",
        )?;

        let mut chunks_by_doc: HashMap<i64, Vec<ChunkRow>> = HashMap::new();
        for (doc_id, doc_ranges) in ranges_by_doc {
            // Ensure the document has a key even if no range matches any chunk.
            chunks_by_doc.entry(doc_id).or_default();
            for (min_seq, max_seq) in doc_ranges {
                let rows = stmt.query_map(params![doc_id, min_seq, max_seq], decode_chunk_row)?;
                for row in rows {
                    chunks_by_doc.entry(doc_id).or_default().push(row?);
                }
            }
        }

        Ok(chunks_by_doc)
    }
1445
1446 pub fn get_chunks(&self, chunk_ids: &[i64]) -> Result<Vec<ChunkRow>> {
1447 if chunk_ids.is_empty() {
1448 return Ok(Vec::new());
1449 }
1450
1451 let conn = self
1452 .db
1453 .lock()
1454 .map_err(|_| CoreError::poisoned("database"))?;
1455
1456 let placeholders = vec!["?"; chunk_ids.len()].join(", ");
1457 let sql = format!(
1458 "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1459 FROM chunks
1460 WHERE id IN ({placeholders})
1461 ORDER BY id ASC"
1462 );
1463 let mut stmt = conn.prepare(&sql)?;
1464 let rows = stmt.query_map(params_from_iter(chunk_ids.iter()), decode_chunk_row)?;
1465 let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1466 Ok(chunks)
1467 }
1468
1469 pub fn get_active_search_scope_summary_in_collections(
1470 &self,
1471 collection_ids: &[i64],
1472 ) -> Result<SearchScopeSummary> {
1473 if collection_ids.is_empty() {
1474 return Ok(SearchScopeSummary {
1475 document_ids: Vec::new(),
1476 chunk_count: 0,
1477 });
1478 }
1479
1480 let conn = self
1481 .db
1482 .lock()
1483 .map_err(|_| CoreError::poisoned("database"))?;
1484
1485 let placeholders = vec!["?"; collection_ids.len()].join(", ");
1486 let sql = format!(
1487 "SELECT d.id, COUNT(c.id)
1488 FROM chunks c
1489 JOIN documents d ON d.id = c.doc_id
1490 WHERE d.active = 1
1491 AND d.collection_id IN ({placeholders})
1492 GROUP BY d.id
1493 ORDER BY d.id ASC"
1494 );
1495 let mut stmt = conn.prepare(&sql)?;
1496 let rows = stmt.query_map(params_from_iter(collection_ids.iter()), |row| {
1497 Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
1498 })?;
1499
1500 let mut document_ids = Vec::new();
1501 let mut chunk_count = 0usize;
1502 for row in rows {
1503 let (doc_id, doc_chunk_count) = row?;
1504 let doc_chunk_count = usize::try_from(doc_chunk_count).map_err(|_| {
1505 CoreError::Internal(format!(
1506 "active chunk count must be non-negative for document {doc_id}"
1507 ))
1508 })?;
1509 document_ids.push(doc_id);
1510 chunk_count = chunk_count.saturating_add(doc_chunk_count);
1511 }
1512
1513 Ok(SearchScopeSummary {
1514 document_ids,
1515 chunk_count,
1516 })
1517 }
1518
1519 pub fn count_active_chunks_in_collections(&self, collection_ids: &[i64]) -> Result<usize> {
1520 if collection_ids.is_empty() {
1521 return Ok(0);
1522 }
1523
1524 let conn = self
1525 .db
1526 .lock()
1527 .map_err(|_| CoreError::poisoned("database"))?;
1528
1529 let placeholders = vec!["?"; collection_ids.len()].join(", ");
1530 let sql = format!(
1531 "SELECT COUNT(*)
1532 FROM chunks c
1533 JOIN documents d ON d.id = c.doc_id
1534 WHERE d.active = 1
1535 AND d.collection_id IN ({placeholders})"
1536 );
1537 query_count(&conn, &sql, params_from_iter(collection_ids.iter()))
1538 }
1539
1540 pub fn has_inactive_documents_in_collections(&self, collection_ids: &[i64]) -> Result<bool> {
1541 if collection_ids.is_empty() {
1542 return Ok(false);
1543 }
1544
1545 let conn = self
1546 .db
1547 .lock()
1548 .map_err(|_| CoreError::poisoned("database"))?;
1549
1550 let placeholders = vec!["?"; collection_ids.len()].join(", ");
1551 let sql = format!(
1552 "SELECT EXISTS(
1553 SELECT 1
1554 FROM documents
1555 WHERE active = 0
1556 AND collection_id IN ({placeholders})
1557 )"
1558 );
1559 let exists: i64 = conn.query_row(&sql, params_from_iter(collection_ids.iter()), |row| {
1560 row.get(0)
1561 })?;
1562 Ok(exists != 0)
1563 }
1564
1565 pub fn get_active_chunk_ids_in_collections(&self, collection_ids: &[i64]) -> Result<Vec<i64>> {
1566 if collection_ids.is_empty() {
1567 return Ok(Vec::new());
1568 }
1569
1570 let conn = self
1571 .db
1572 .lock()
1573 .map_err(|_| CoreError::poisoned("database"))?;
1574
1575 let placeholders = vec!["?"; collection_ids.len()].join(", ");
1576 let sql = format!(
1577 "SELECT c.id
1578 FROM chunks c
1579 JOIN documents d ON d.id = c.doc_id
1580 WHERE d.active = 1
1581 AND d.collection_id IN ({placeholders})
1582 ORDER BY d.id ASC, c.seq ASC"
1583 );
1584 let mut stmt = conn.prepare(&sql)?;
1585 let rows = stmt.query_map(params_from_iter(collection_ids.iter()), |row| row.get(0))?;
1586 let chunk_ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1587 Ok(chunk_ids)
1588 }
1589
    /// Replaces a document's row, canonical text, and chunks in one transaction.
    ///
    /// Steps, in order:
    /// 1. Validate every chunk span against `replacement.text`. This happens
    ///    before the database lock is taken.
    /// 2. Upsert the `(collection_id, path)` document row. This writes the new
    ///    title/hash/modified values, forces `active = 1`,
    ///    `deactivated_at = NULL` and `fts_dirty = 1`, and reads back its id.
    /// 3. Record the ids of the chunks that existed before, in `seq` order.
    /// 4. Upsert the `document_texts` row for the document.
    /// 5. Delete all old chunks and insert `replacement.chunks`.
    ///
    /// Returns the document id, the old chunk ids, and the new chunk ids in
    /// input order. Nothing is committed unless every step succeeds.
    ///
    /// # Errors
    /// Fails if a chunk span is invalid (`validate_text_span`), if
    /// `lookup_collection_name` rejects `replacement.collection_id`
    /// (presumably a missing collection), or on any SQL error.
    pub fn replace_document_generation(
        &self,
        replacement: DocumentGenerationReplace<'_>,
    ) -> Result<DocumentGenerationReplaceResult> {
        for chunk in replacement.chunks {
            validate_text_span(
                replacement.text,
                chunk.offset,
                chunk.length,
                &format!("chunk seq {}", chunk.seq),
            )?;
        }

        let conn = self
            .db
            .lock()
            .map_err(|_| CoreError::poisoned("database"))?;
        let _collection_name = lookup_collection_name(&conn, replacement.collection_id)?;

        let tx = conn.unchecked_transaction()?;
        // Upsert keyed on (collection_id, path): a re-ingested path keeps its
        // document id and is reactivated.
        let doc_id: i64 = tx.query_row(
            "INSERT INTO documents (collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, 1, NULL, 1)
             ON CONFLICT(collection_id, path) DO UPDATE SET
                 title = excluded.title,
                 title_source = excluded.title_source,
                 hash = excluded.hash,
                 modified = excluded.modified,
                 active = 1,
                 deactivated_at = NULL,
                 fts_dirty = 1
             RETURNING id",
            params![
                replacement.collection_id,
                replacement.path,
                replacement.title,
                replacement.title_source.as_sql(),
                replacement.hash,
                replacement.modified
            ],
            |row| row.get(0),
        )?;

        // Capture the previous generation's chunk ids before deleting them, so
        // callers can remove them from external indexes.
        let old_chunk_ids = {
            let mut stmt =
                tx.prepare("SELECT id FROM chunks WHERE doc_id = ?1 ORDER BY seq ASC")?;
            let rows = stmt.query_map(params![doc_id], |row| row.get::<_, i64>(0))?;
            rows.collect::<std::result::Result<Vec<_>, _>>()?
        };

        tx.execute(
            "INSERT INTO document_texts (doc_id, extractor_key, source_hash, text_hash, generation_key, text, created)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%Y-%m-%dT%H:%M:%SZ','now'))
             ON CONFLICT(doc_id) DO UPDATE SET
                 extractor_key = excluded.extractor_key,
                 source_hash = excluded.source_hash,
                 text_hash = excluded.text_hash,
                 generation_key = excluded.generation_key,
                 text = excluded.text,
                 created = excluded.created",
            params![
                doc_id,
                replacement.extractor_key,
                replacement.source_hash,
                replacement.text_hash,
                replacement.generation_key,
                replacement.text
            ],
        )?;

        tx.execute("DELETE FROM chunks WHERE doc_id = ?1", params![doc_id])?;

        let mut stmt = tx.prepare(
            "INSERT INTO chunks (doc_id, seq, offset, length, heading, kind, retrieval_prefix)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
        )?;
        let mut chunk_ids = Vec::with_capacity(replacement.chunks.len());
        for chunk in replacement.chunks {
            stmt.execute(params![
                doc_id,
                chunk.seq,
                chunk.offset as i64,
                chunk.length as i64,
                chunk.heading,
                chunk.kind.as_storage_kind(),
                chunk.retrieval_prefix.as_deref(),
            ])?;
            chunk_ids.push(tx.last_insert_rowid());
        }
        // The prepared statement borrows `tx`; release it before committing.
        drop(stmt);

        tx.commit()?;
        Ok(DocumentGenerationReplaceResult {
            doc_id,
            old_chunk_ids,
            chunk_ids,
        })
    }
1688
1689 pub fn put_document_text(
1690 &self,
1691 doc_id: i64,
1692 extractor_key: &str,
1693 source_hash: &str,
1694 text_hash: &str,
1695 generation_key: &str,
1696 text: &str,
1697 ) -> Result<()> {
1698 let conn = self
1699 .db
1700 .lock()
1701 .map_err(|_| CoreError::poisoned("database"))?;
1702 let _doc = lookup_document_id(&conn, doc_id)?;
1703
1704 conn.execute(
1705 "INSERT INTO document_texts (doc_id, extractor_key, source_hash, text_hash, generation_key, text, created)
1706 VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%Y-%m-%dT%H:%M:%SZ','now'))
1707 ON CONFLICT(doc_id) DO UPDATE SET
1708 extractor_key = excluded.extractor_key,
1709 source_hash = excluded.source_hash,
1710 text_hash = excluded.text_hash,
1711 generation_key = excluded.generation_key,
1712 text = excluded.text,
1713 created = excluded.created",
1714 params![
1715 doc_id,
1716 extractor_key,
1717 source_hash,
1718 text_hash,
1719 generation_key,
1720 text
1721 ],
1722 )?;
1723 Ok(())
1724 }
1725
1726 pub fn get_document_text(&self, doc_id: i64) -> Result<DocumentTextRow> {
1727 let conn = self
1728 .db
1729 .lock()
1730 .map_err(|_| CoreError::poisoned("database"))?;
1731 let _doc = lookup_document_id(&conn, doc_id)?;
1732
1733 let result = conn.query_row(
1734 "SELECT doc_id, extractor_key, source_hash, text_hash, generation_key, text, created
1735 FROM document_texts
1736 WHERE doc_id = ?1",
1737 params![doc_id],
1738 decode_document_text_row,
1739 );
1740 match result {
1741 Ok(row) => Ok(row),
1742 Err(Error::QueryReturnedNoRows) => Err(missing_document_text_error(doc_id)),
1743 Err(err) => Err(err.into()),
1744 }
1745 }
1746
1747 pub fn get_document_texts(&self, doc_ids: &[i64]) -> Result<HashMap<i64, DocumentTextRow>> {
1748 if doc_ids.is_empty() {
1749 return Ok(HashMap::new());
1750 }
1751
1752 let mut requested = doc_ids.to_vec();
1753 requested.sort_unstable();
1754 requested.dedup();
1755
1756 let text_by_doc = self.get_existing_document_texts(&requested)?;
1757 for doc_id in requested {
1758 if !text_by_doc.contains_key(&doc_id) {
1759 return Err(missing_document_text_error(doc_id));
1760 }
1761 }
1762
1763 Ok(text_by_doc)
1764 }
1765
1766 pub fn get_existing_document_texts(
1767 &self,
1768 doc_ids: &[i64],
1769 ) -> Result<HashMap<i64, DocumentTextRow>> {
1770 if doc_ids.is_empty() {
1771 return Ok(HashMap::new());
1772 }
1773
1774 let mut requested = doc_ids.to_vec();
1775 requested.sort_unstable();
1776 requested.dedup();
1777
1778 let conn = self
1779 .db
1780 .lock()
1781 .map_err(|_| CoreError::poisoned("database"))?;
1782 let mut text_by_doc = HashMap::new();
1783 for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1784 let placeholders = vec!["?"; batch.len()].join(", ");
1785 let sql = format!(
1786 "SELECT doc_id, extractor_key, source_hash, text_hash, generation_key, text, created
1787 FROM document_texts
1788 WHERE doc_id IN ({placeholders})
1789 ORDER BY doc_id ASC"
1790 );
1791 let mut stmt = conn.prepare(&sql)?;
1792 let rows = stmt.query_map(params_from_iter(batch.iter()), decode_document_text_row)?;
1793 for row in rows {
1794 let row = row?;
1795 text_by_doc.insert(row.doc_id, row);
1796 }
1797 }
1798
1799 Ok(text_by_doc)
1800 }
1801
1802 pub fn has_document_text(&self, doc_id: i64) -> Result<bool> {
1803 let conn = self
1804 .db
1805 .lock()
1806 .map_err(|_| CoreError::poisoned("database"))?;
1807 let exists: i64 = conn.query_row(
1808 "SELECT EXISTS(SELECT 1 FROM document_texts WHERE doc_id = ?1)",
1809 params![doc_id],
1810 |row| row.get(0),
1811 )?;
1812 Ok(exists != 0)
1813 }
1814
1815 pub fn has_current_document_text(&self, doc_id: i64, generation_key: &str) -> Result<bool> {
1816 let conn = self
1817 .db
1818 .lock()
1819 .map_err(|_| CoreError::poisoned("database"))?;
1820 let exists: i64 = conn.query_row(
1821 "SELECT EXISTS(
1822 SELECT 1 FROM document_texts
1823 WHERE doc_id = ?1 AND generation_key = ?2
1824 )",
1825 params![doc_id, generation_key],
1826 |row| row.get(0),
1827 )?;
1828 Ok(exists != 0)
1829 }
1830
1831 pub fn get_document_text_generation_keys(
1832 &self,
1833 doc_ids: &[i64],
1834 ) -> Result<HashMap<i64, String>> {
1835 if doc_ids.is_empty() {
1836 return Ok(HashMap::new());
1837 }
1838
1839 let mut requested = doc_ids.to_vec();
1840 requested.sort_unstable();
1841 requested.dedup();
1842
1843 let conn = self
1844 .db
1845 .lock()
1846 .map_err(|_| CoreError::poisoned("database"))?;
1847 let mut generation_by_doc = HashMap::with_capacity(requested.len());
1848 for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1849 let placeholders = vec!["?"; batch.len()].join(", ");
1850 let sql = format!(
1851 "SELECT doc_id, generation_key
1852 FROM document_texts
1853 WHERE doc_id IN ({placeholders})"
1854 );
1855 let mut stmt = conn.prepare(&sql)?;
1856 let rows = stmt.query_map(params_from_iter(batch.iter()), |row| {
1857 Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1858 })?;
1859 for row in rows {
1860 let (doc_id, generation_key) = row?;
1861 generation_by_doc.insert(doc_id, generation_key);
1862 }
1863 }
1864
1865 Ok(generation_by_doc)
1866 }
1867
1868 pub fn get_document_text_extractors(&self, doc_ids: &[i64]) -> Result<HashMap<i64, String>> {
1869 if doc_ids.is_empty() {
1870 return Ok(HashMap::new());
1871 }
1872
1873 let mut requested = doc_ids.to_vec();
1874 requested.sort_unstable();
1875 requested.dedup();
1876
1877 let conn = self
1878 .db
1879 .lock()
1880 .map_err(|_| CoreError::poisoned("database"))?;
1881 let mut extractor_by_doc = HashMap::with_capacity(requested.len());
1882 for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1883 let placeholders = vec!["?"; batch.len()].join(", ");
1884 let sql = format!(
1885 "SELECT doc_id, extractor_key
1886 FROM document_texts
1887 WHERE doc_id IN ({placeholders})"
1888 );
1889 let mut stmt = conn.prepare(&sql)?;
1890 let rows = stmt.query_map(params_from_iter(batch.iter()), |row| {
1891 Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1892 })?;
1893 for row in rows {
1894 let (doc_id, extractor_key) = row?;
1895 extractor_by_doc.insert(doc_id, extractor_key);
1896 }
1897 }
1898
1899 for doc_id in requested {
1900 if !extractor_by_doc.contains_key(&doc_id) {
1901 return Err(missing_document_text_error(doc_id));
1902 }
1903 }
1904
1905 Ok(extractor_by_doc)
1906 }
1907
    /// Returns the canonical text of each requested chunk, keyed by chunk id.
    ///
    /// The slicing is done in SQL on the document text cast to BLOB, so
    /// `offset`/`length` are treated as byte positions. Only the chunk's own
    /// bytes are transferred, not the whole document. Ids are de-duplicated
    /// and queried in batches of `SQLITE_IN_CLAUSE_BATCH_SIZE`.
    ///
    /// # Errors
    /// Propagates failures from `canonical_chunk_slice_from_bytes`, which is
    /// presumably where span and encoding checks live (TODO confirm). Returns
    /// `CoreError::Internal` for the smallest requested id that produced no
    /// row, i.e. the chunk is missing or its document has no stored text.
    pub fn get_canonical_chunk_texts(&self, chunk_ids: &[i64]) -> Result<HashMap<i64, String>> {
        if chunk_ids.is_empty() {
            return Ok(HashMap::new());
        }

        let mut requested = chunk_ids.to_vec();
        requested.sort_unstable();
        requested.dedup();

        let conn = self
            .db
            .lock()
            .map_err(|_| CoreError::poisoned("database"))?;
        let mut text_by_chunk = HashMap::with_capacity(requested.len());
        for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
            let placeholders = vec!["?"; batch.len()].join(", ");
            // Columns: chunk id, offset, length, total document byte length,
            // the byte at the chunk start, the byte just past the chunk end,
            // and the chunk's bytes. `substr` is 1-based, hence the `+ 1`.
            // The boundary bytes are handed to the decoder alongside the slice.
            let sql = format!(
                "SELECT c.id,
                        c.offset,
                        c.length,
                        length(CAST(dt.text AS BLOB)),
                        substr(CAST(dt.text AS BLOB), c.offset + 1, 1),
                        substr(CAST(dt.text AS BLOB), c.offset + c.length + 1, 1),
                        substr(CAST(dt.text AS BLOB), c.offset + 1, c.length)
                 FROM chunks c
                 JOIN document_texts dt ON dt.doc_id = c.doc_id
                 WHERE c.id IN ({placeholders})"
            );
            let mut stmt = conn.prepare(&sql)?;
            let rows = stmt.query_map(params_from_iter(batch.iter()), |row| {
                Ok((
                    row.get::<_, i64>(0)?,
                    row.get::<_, i64>(1)?,
                    row.get::<_, i64>(2)?,
                    row.get::<_, i64>(3)?,
                    row.get::<_, Vec<u8>>(4)?,
                    row.get::<_, Vec<u8>>(5)?,
                    row.get::<_, Vec<u8>>(6)?,
                ))
            })?;
            for row in rows {
                let (chunk_id, offset, length, document_len, start_byte, end_byte, bytes) = row?;
                let text = canonical_chunk_slice_from_bytes(
                    chunk_id,
                    offset,
                    length,
                    document_len,
                    start_byte,
                    end_byte,
                    bytes,
                )?;
                text_by_chunk.insert(chunk_id, text);
            }
        }

        // The inner JOIN drops chunks without a document text, so any missing
        // id is reported here.
        for chunk_id in requested {
            if !text_by_chunk.contains_key(&chunk_id) {
                return Err(CoreError::Internal(format!(
                    "canonical text missing for chunk {chunk_id}"
                )));
            }
        }

        Ok(text_by_chunk)
    }
1973
1974 pub fn get_chunk_text(&self, chunk_id: i64) -> Result<ChunkTextRow> {
1975 let conn = self
1976 .db
1977 .lock()
1978 .map_err(|_| CoreError::poisoned("database"))?;
1979 let result = conn.query_row(
1980 "SELECT c.id, c.doc_id, c.seq, c.offset, c.length, c.heading, c.kind, c.retrieval_prefix, dt.extractor_key, dt.text
1981 FROM chunks c
1982 JOIN document_texts dt ON dt.doc_id = c.doc_id
1983 WHERE c.id = ?1",
1984 params![chunk_id],
1985 |row| {
1986 let chunk = decode_chunk_row(row)?;
1987 let extractor_key: String = row.get(8)?;
1988 let document_text: String = row.get(9)?;
1989 Ok((chunk, extractor_key, document_text))
1990 },
1991 );
1992 let (chunk, extractor_key, document_text) = match result {
1993 Ok(row) => row,
1994 Err(Error::QueryReturnedNoRows) => {
1995 let doc_id = lookup_chunk_doc_id(&conn, chunk_id)?;
1996 return Err(missing_document_text_error(doc_id));
1997 }
1998 Err(err) => return Err(err.into()),
1999 };
2000 let text = chunk_text_from_canonical(&document_text, &chunk)?;
2001 Ok(ChunkTextRow {
2002 chunk,
2003 extractor_key,
2004 text,
2005 })
2006 }
2007
2008 pub fn insert_embeddings(&self, entries: &[(i64, &str)]) -> Result<()> {
2009 if entries.is_empty() {
2010 return Ok(());
2011 }
2012
2013 let conn = self
2014 .db
2015 .lock()
2016 .map_err(|_| CoreError::poisoned("database"))?;
2017
2018 let tx = conn.unchecked_transaction()?;
2019 let mut stmt = tx.prepare(
2020 "INSERT INTO embeddings (chunk_id, model, embedded_at)
2021 VALUES (?1, ?2, strftime('%Y-%m-%dT%H:%M:%SZ','now'))
2022 ON CONFLICT(chunk_id, model) DO UPDATE SET
2023 embedded_at = excluded.embedded_at",
2024 )?;
2025
2026 for (chunk_id, model) in entries {
2027 stmt.execute(params![chunk_id, model])?;
2028 }
2029
2030 drop(stmt);
2031 tx.commit()?;
2032 Ok(())
2033 }
2034
2035 pub fn get_unembedded_chunks(
2036 &self,
2037 model: &str,
2038 after_chunk_id: i64,
2039 limit: usize,
2040 ) -> Result<Vec<EmbedRecord>> {
2041 self.get_unembedded_chunks_filtered(model, after_chunk_id, limit, "", Vec::new())
2042 }
2043
2044 pub fn get_unembedded_chunks_in_space(
2045 &self,
2046 model: &str,
2047 space_id: i64,
2048 after_chunk_id: i64,
2049 limit: usize,
2050 ) -> Result<Vec<EmbedRecord>> {
2051 self.get_unembedded_chunks_filtered(
2052 model,
2053 after_chunk_id,
2054 limit,
2055 " AND col.space_id = ?",
2056 vec![SqlValue::Integer(space_id)],
2057 )
2058 }
2059
2060 pub fn get_unembedded_chunks_in_collections(
2061 &self,
2062 model: &str,
2063 collection_ids: &[i64],
2064 after_chunk_id: i64,
2065 limit: usize,
2066 ) -> Result<Vec<EmbedRecord>> {
2067 if collection_ids.is_empty() {
2068 return Ok(Vec::new());
2069 }
2070
2071 let placeholders = vec!["?"; collection_ids.len()].join(", ");
2072 let clause = format!(" AND d.collection_id IN ({placeholders})");
2073 let params = collection_ids
2074 .iter()
2075 .map(|id| SqlValue::Integer(*id))
2076 .collect::<Vec<_>>();
2077 self.get_unembedded_chunks_filtered(model, after_chunk_id, limit, &clause, params)
2078 }
2079
2080 fn get_unembedded_chunks_filtered(
2081 &self,
2082 model: &str,
2083 after_chunk_id: i64,
2084 limit: usize,
2085 scope_clause: &str,
2086 scope_params: Vec<SqlValue>,
2087 ) -> Result<Vec<EmbedRecord>> {
2088 let conn = self
2089 .db
2090 .lock()
2091 .map_err(|_| CoreError::poisoned("database"))?;
2092 let sql_limit = i64::try_from(limit)
2093 .map_err(|_| CoreError::Internal("limit too large for sqlite".to_string()))?;
2094
2095 let sql = format!(
2096 "SELECT c.id, c.doc_id, c.seq, c.offset, c.length, c.heading, c.kind, c.retrieval_prefix,
2097 d.path, col.path, s.name
2098 FROM chunks c
2099 JOIN documents d ON d.id = c.doc_id
2100 JOIN collections col ON col.id = d.collection_id
2101 JOIN spaces s ON s.id = col.space_id
2102 LEFT JOIN embeddings e ON e.chunk_id = c.id AND e.model = ?
2103 WHERE d.active = 1 AND e.chunk_id IS NULL AND c.id > ?{scope_clause}
2104 ORDER BY c.id ASC
2105 LIMIT ?"
2106 );
2107 let mut stmt = conn.prepare(&sql)?;
2108 let mut params = Vec::with_capacity(scope_params.len() + 3);
2109 params.push(SqlValue::Text(model.to_string()));
2110 params.push(SqlValue::Integer(after_chunk_id));
2111 params.extend(scope_params);
2112 params.push(SqlValue::Integer(sql_limit));
2113 let rows = stmt.query_map(params_from_iter(params.iter()), |row| {
2114 Ok(EmbedRecord {
2115 chunk: decode_chunk_row(row)?,
2116 doc_path: row.get(8)?,
2117 collection_path: PathBuf::from(row.get::<_, String>(9)?),
2118 space_name: row.get(10)?,
2119 })
2120 })?;
2121
2122 let records = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2123 Ok(records)
2124 }
2125
2126 pub fn delete_embeddings_for_model(&self, model: &str) -> Result<usize> {
2127 let conn = self
2128 .db
2129 .lock()
2130 .map_err(|_| CoreError::poisoned("database"))?;
2131
2132 let deleted = conn.execute("DELETE FROM embeddings WHERE model = ?1", params![model])?;
2133 Ok(deleted)
2134 }
2135
2136 pub fn delete_embeddings_for_space(&self, space_id: i64) -> Result<usize> {
2137 let conn = self
2138 .db
2139 .lock()
2140 .map_err(|_| CoreError::poisoned("database"))?;
2141 let _space_name = lookup_space_name(&conn, space_id)?;
2142
2143 let deleted = conn.execute(
2144 "DELETE FROM embeddings
2145 WHERE chunk_id IN (
2146 SELECT c.id
2147 FROM chunks c
2148 JOIN documents d ON d.id = c.doc_id
2149 JOIN collections col ON col.id = d.collection_id
2150 WHERE col.space_id = ?1
2151 )",
2152 params![space_id],
2153 )?;
2154 Ok(deleted)
2155 }
2156
2157 pub fn list_embedding_models_in_space(&self, space_id: i64) -> Result<Vec<String>> {
2158 let conn = self
2159 .db
2160 .lock()
2161 .map_err(|_| CoreError::poisoned("database"))?;
2162 let _space_name = lookup_space_name(&conn, space_id)?;
2163
2164 let mut stmt = conn.prepare(
2165 "SELECT DISTINCT e.model
2166 FROM embeddings e
2167 JOIN chunks c ON c.id = e.chunk_id
2168 JOIN documents d ON d.id = c.doc_id
2169 JOIN collections col ON col.id = d.collection_id
2170 WHERE col.space_id = ?1
2171 ORDER BY e.model ASC",
2172 )?;
2173 let rows = stmt.query_map(params![space_id], |row| row.get::<_, String>(0))?;
2174 let models = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2175 Ok(models)
2176 }
2177
2178 pub fn count_embeddings(&self) -> Result<usize> {
2179 let conn = self
2180 .db
2181 .lock()
2182 .map_err(|_| CoreError::poisoned("database"))?;
2183
2184 let count: i64 = conn.query_row("SELECT COUNT(*) FROM embeddings", [], |row| row.get(0))?;
2185 Ok(count as usize)
2186 }
2187
2188 pub fn index_tantivy(&self, space: &str, entries: &[TantivyEntry]) -> Result<()> {
2189 if entries.is_empty() {
2190 return Ok(());
2191 }
2192
2193 let space_indexes = self.get_space_indexes(space)?;
2194 with_tantivy_writer(&space_indexes, |writer| {
2195 for entry in entries {
2196 let chunk_id = u64::try_from(entry.chunk_id).map_err(|_| {
2197 CoreError::Internal(format!(
2198 "chunk_id must be non-negative for tantivy indexing: {}",
2199 entry.chunk_id
2200 ))
2201 })?;
2202 let doc_id = u64::try_from(entry.doc_id).map_err(|_| {
2203 CoreError::Internal(format!(
2204 "doc_id must be non-negative for tantivy indexing: {}",
2205 entry.doc_id
2206 ))
2207 })?;
2208
2209 let mut doc = TantivyDocument::default();
2210 doc.add_u64(space_indexes.fields.chunk_id, chunk_id);
2211 doc.add_u64(space_indexes.fields.doc_id, doc_id);
2212 doc.add_text(space_indexes.fields.filepath, &entry.filepath);
2213 if let Some(title) = &entry.semantic_title {
2214 doc.add_text(space_indexes.fields.title, title);
2215 }
2216 if let Some(heading) = &entry.heading {
2217 doc.add_text(space_indexes.fields.heading, heading);
2218 }
2219 doc.add_text(space_indexes.fields.body, &entry.body);
2220 writer.add_document(doc)?;
2221 }
2222 Ok(())
2223 })
2224 }
2225
2226 pub fn delete_tantivy(&self, space: &str, chunk_ids: &[i64]) -> Result<()> {
2227 if chunk_ids.is_empty() {
2228 return Ok(());
2229 }
2230
2231 let space_indexes = self.get_space_indexes(space)?;
2232 with_tantivy_writer(&space_indexes, |writer| {
2233 for chunk_id in chunk_ids {
2234 let chunk_key = u64::try_from(*chunk_id).map_err(|_| {
2235 CoreError::Internal(format!(
2236 "chunk_id must be non-negative for tantivy delete: {chunk_id}"
2237 ))
2238 })?;
2239 writer.delete_term(Term::from_field_u64(
2240 space_indexes.fields.chunk_id,
2241 chunk_key,
2242 ));
2243 }
2244
2245 Ok(())
2246 })
2247 }
2248
2249 pub fn delete_tantivy_by_doc(&self, space: &str, doc_id: i64) -> Result<()> {
2250 let space_indexes = self.get_space_indexes(space)?;
2251 with_tantivy_writer(&space_indexes, |writer| {
2252 let doc_key = u64::try_from(doc_id).map_err(|_| {
2253 CoreError::Internal(format!(
2254 "doc_id must be non-negative for tantivy delete-by-doc: {doc_id}"
2255 ))
2256 })?;
2257 writer.delete_term(Term::from_field_u64(space_indexes.fields.doc_id, doc_key));
2258 Ok(())
2259 })
2260 }
2261
2262 pub fn query_bm25(
2263 &self,
2264 space: &str,
2265 query: &str,
2266 fields: &[(&str, f32)],
2267 limit: usize,
2268 ) -> Result<Vec<BM25Hit>> {
2269 self.query_bm25_filtered(space, query, fields, None, limit, true)
2270 }
2271
2272 pub(crate) fn query_bm25_cached(
2273 &self,
2274 space: &str,
2275 query: &str,
2276 fields: &[(&str, f32)],
2277 limit: usize,
2278 ) -> Result<Vec<BM25Hit>> {
2279 self.query_bm25_filtered(space, query, fields, None, limit, false)
2280 }
2281
2282 pub fn query_bm25_in_documents(
2283 &self,
2284 space: &str,
2285 query: &str,
2286 fields: &[(&str, f32)],
2287 document_ids: &[i64],
2288 limit: usize,
2289 ) -> Result<Vec<BM25Hit>> {
2290 if document_ids.is_empty() {
2291 return Ok(Vec::new());
2292 }
2293
2294 self.query_bm25_filtered(space, query, fields, Some(document_ids), limit, true)
2295 }
2296
2297 pub(crate) fn query_bm25_in_documents_cached(
2298 &self,
2299 space: &str,
2300 query: &str,
2301 fields: &[(&str, f32)],
2302 document_ids: &[i64],
2303 limit: usize,
2304 ) -> Result<Vec<BM25Hit>> {
2305 if document_ids.is_empty() {
2306 return Ok(Vec::new());
2307 }
2308
2309 self.query_bm25_filtered(space, query, fields, Some(document_ids), limit, false)
2310 }
2311
    /// Shared implementation behind the `query_bm25*` entry points.
    ///
    /// Resolves each `(field name, boost)` pair to an indexed tantivy field, builds
    /// a literal term query from the analyzed `query` text, optionally restricts it
    /// to `document_ids`, and returns up to `limit` hits as `(chunk_id, score)`.
    ///
    /// Returns an empty vector when `limit` is 0 or when `query` analyzes to no
    /// terms for any requested field. Returns `CoreError::Internal` when `fields` is
    /// empty, when a requested field is not indexed, or when a hit's `chunk_id` fast
    /// field is missing or does not fit in `i64`. When `reload_reader` is true the
    /// reader is reloaded before searching; otherwise the searcher reflects whatever
    /// the reader last loaded.
    fn query_bm25_filtered(
        &self,
        space: &str,
        query: &str,
        fields: &[(&str, f32)],
        document_ids: Option<&[i64]>,
        limit: usize,
        reload_reader: bool,
    ) -> Result<Vec<BM25Hit>> {
        if limit == 0 {
            return Ok(Vec::new());
        }

        if fields.is_empty() {
            return Err(CoreError::Internal(
                "bm25 query requires at least one field".to_string(),
            ));
        }

        let space_indexes = self.get_space_indexes(space)?;

        let schema = space_indexes.tantivy_index.schema();
        // Validate every requested field up front and capture the record option it
        // was indexed with; the per-term TermQuery is built with that option.
        let query_fields = fields
            .iter()
            .map(|(name, boost)| {
                let field = resolve_tantivy_field(space_indexes.fields, name)?;
                let field_entry = schema.get_field_entry(field);
                let index_record_option = field_entry
                    .field_type()
                    .get_index_record_option()
                    .ok_or_else(|| {
                        CoreError::Internal(format!(
                            "bm25 field '{}' is not indexed",
                            field_entry.name()
                        ))
                    })?;
                Ok(Bm25FieldSpec {
                    field,
                    boost: *boost,
                    index_record_option,
                })
            })
            .collect::<Result<Vec<_>>>()?;
        // No analyzable terms means nothing can match, so skip the search entirely.
        let Some(parsed_query) =
            build_literal_bm25_query(&space_indexes.tantivy_index, &query_fields, query)?
        else {
            return Ok(Vec::new());
        };
        // Intersect with the document filter. build_doc_id_filter_query wraps it in a
        // ConstScoreQuery with score 0.0, so it narrows results without adding score.
        let parsed_query = if let Some(document_ids) = document_ids {
            Box::new(BooleanQuery::new(vec![
                (Occur::Must, parsed_query),
                (
                    Occur::Must,
                    build_doc_id_filter_query(space_indexes.fields.doc_id, document_ids)?,
                ),
            ])) as Box<dyn Query>
        } else {
            parsed_query
        };
        if reload_reader {
            space_indexes.tantivy_reader.reload()?;
        }
        let searcher = space_indexes.tantivy_reader.searcher();
        let docs = searcher.search(&parsed_query, &TopDocs::with_limit(limit))?;

        let mut hits = Vec::with_capacity(docs.len());
        let chunk_id_field_name = schema.get_field_entry(space_indexes.fields.chunk_id).name();
        // Fast-field columns are opened per segment; cache each segment's chunk_id
        // column so hits from the same segment reuse it.
        let mut chunk_id_columns = HashMap::new();
        for (score, address) in docs {
            let chunk_id_column = match chunk_id_columns.entry(address.segment_ord) {
                std::collections::hash_map::Entry::Occupied(entry) => entry.into_mut(),
                std::collections::hash_map::Entry::Vacant(entry) => {
                    let column = searcher
                        .segment_reader(address.segment_ord)
                        .fast_fields()
                        .u64(chunk_id_field_name)?;
                    entry.insert(column)
                }
            };
            let chunk_id = chunk_id_column.first(address.doc_id).ok_or_else(|| {
                CoreError::Internal("tantivy hit missing chunk_id fast field".to_string())
            })?;
            // chunk_id is stored as u64 in tantivy; convert back to the i64 row id.
            let chunk_id = i64::try_from(chunk_id).map_err(|_| {
                CoreError::Internal(format!("tantivy chunk_id exceeds i64 range: {chunk_id}"))
            })?;
            hits.push(BM25Hit { chunk_id, score });
        }

        Ok(hits)
    }
2402
2403 pub(crate) fn reload_tantivy_reader(&self, space: &str) -> Result<()> {
2404 let space_indexes = self.get_space_indexes(space)?;
2405 space_indexes.tantivy_reader.reload()?;
2406 Ok(())
2407 }
2408
2409 pub fn commit_tantivy(&self, space: &str) -> Result<()> {
2410 let space_indexes = self.get_space_indexes(space)?;
2411 let mut writer = space_indexes
2412 .tantivy_writer
2413 .lock()
2414 .map_err(|_| CoreError::poisoned("tantivy writer"))?;
2415 let Some(mut writer) = writer.take() else {
2416 return Ok(());
2417 };
2418 writer.commit()?;
2419 space_indexes.tantivy_reader.reload()?;
2420 Ok(())
2421 }
2422
2423 pub fn insert_usearch(&self, space: &str, key: i64, vector: &[f32]) -> Result<()> {
2424 self.batch_insert_usearch(space, &[(key, vector)])
2425 }
2426
2427 pub fn batch_insert_usearch(&self, space: &str, entries: &[(i64, &[f32])]) -> Result<()> {
2428 self.batch_insert_usearch_with_save_mode(space, entries, UsearchSaveMode::Immediate)
2429 }
2430
2431 pub(crate) fn batch_insert_usearch_deferred(
2432 &self,
2433 space: &str,
2434 entries: &[(i64, &[f32])],
2435 ) -> Result<()> {
2436 self.batch_insert_usearch_with_save_mode(space, entries, UsearchSaveMode::Deferred)
2437 }
2438
    /// Adds every `(key, vector)` in `entries` to the space's usearch index.
    ///
    /// All vectors must share one non-zero dimension. That dimension must match the
    /// index, except that an empty index is rebuilt for it (`ensure_usearch_dimensions`).
    /// Keys must be non-negative.
    ///
    /// With `UsearchSaveMode::Immediate` the index is written to disk before
    /// returning. With `UsearchSaveMode::Deferred` the space is only marked dirty
    /// for `save_dirty_usearch_indexes`. An empty `entries` slice is a no-op.
    /// Timing for each stage is recorded through `crate::profile`.
    fn batch_insert_usearch_with_save_mode(
        &self,
        space: &str,
        entries: &[(i64, &[f32])],
        save_mode: UsearchSaveMode,
    ) -> Result<()> {
        if entries.is_empty() {
            return Ok(());
        }

        let space_indexes = self.get_space_indexes(space)?;

        // Validate the whole batch before taking the write lock or mutating anything.
        let expected_dimensions = entries[0].1.len();
        if expected_dimensions == 0 {
            return Err(CoreError::Internal(
                "cannot insert empty vector into usearch index".to_string(),
            ));
        }
        for (_, vector) in entries {
            if vector.len() != expected_dimensions {
                return Err(CoreError::Internal(format!(
                    "vector dimension mismatch in batch insert: expected {expected_dimensions}, got {}",
                    vector.len()
                )));
            }
        }

        let mut index = space_indexes
            .usearch_index
            .write()
            .map_err(|_| CoreError::poisoned("usearch index"))?;
        let ensure_started = std::time::Instant::now();
        // May replace an empty index with a fresh one of the right dimensionality.
        ensure_usearch_dimensions(&mut index, expected_dimensions)?;
        crate::profile::record_update_stage("usearch_ensure_dimensions", ensure_started.elapsed());

        // Reserve room for the whole batch up front rather than growing per add.
        let target_capacity = index.size().saturating_add(entries.len());
        let reserve_started = std::time::Instant::now();
        index.reserve(target_capacity).map_err(|err| {
            CoreError::Internal(format!(
                "usearch reserve failed for {target_capacity} items: {err}"
            ))
        })?;
        crate::profile::record_update_stage("usearch_reserve", reserve_started.elapsed());

        // Mark dirty before adding, so a failure partway through the batch still
        // leaves the space queued for save_dirty_usearch_indexes.
        if matches!(save_mode, UsearchSaveMode::Deferred) {
            self.mark_usearch_dirty(space)?;
        }

        let add_started = std::time::Instant::now();
        for (key, vector) in entries {
            let key = u64::try_from(*key).map_err(|_| {
                CoreError::Internal(format!("usearch key must be non-negative: {}", *key))
            })?;
            index
                .add::<f32>(key, vector)
                .map_err(|err| CoreError::Internal(format!("usearch add failed: {err}")))?;
        }
        crate::profile::record_update_stage("usearch_add", add_started.elapsed());

        // Immediate mode persists while still holding the write lock.
        if matches!(save_mode, UsearchSaveMode::Immediate) {
            let save_started = std::time::Instant::now();
            save_usearch_index(&index, &space_indexes.usearch_path)?;
            crate::profile::record_update_stage("usearch_save", save_started.elapsed());
        }
        Ok(())
    }
2505
2506 pub fn delete_usearch(&self, space: &str, keys: &[i64]) -> Result<()> {
2507 self.delete_usearch_with_save_mode(space, keys, UsearchSaveMode::Immediate)
2508 }
2509
2510 pub(crate) fn delete_usearch_deferred(&self, space: &str, keys: &[i64]) -> Result<()> {
2511 self.delete_usearch_with_save_mode(space, keys, UsearchSaveMode::Deferred)
2512 }
2513
2514 fn delete_usearch_with_save_mode(
2515 &self,
2516 space: &str,
2517 keys: &[i64],
2518 save_mode: UsearchSaveMode,
2519 ) -> Result<()> {
2520 if keys.is_empty() {
2521 return Ok(());
2522 }
2523
2524 let space_indexes = self.get_space_indexes(space)?;
2525 let index = space_indexes
2526 .usearch_index
2527 .write()
2528 .map_err(|_| CoreError::poisoned("usearch index"))?;
2529
2530 if matches!(save_mode, UsearchSaveMode::Deferred) {
2531 self.mark_usearch_dirty(space)?;
2532 }
2533
2534 let delete_started = std::time::Instant::now();
2535 for key in keys {
2536 let key = u64::try_from(*key).map_err(|_| {
2537 CoreError::Internal(format!("usearch key must be non-negative: {}", *key))
2538 })?;
2539 index
2540 .remove(key)
2541 .map_err(|err| CoreError::Internal(format!("usearch remove failed: {err}")))?;
2542 }
2543 crate::profile::record_update_stage("usearch_delete", delete_started.elapsed());
2544
2545 if matches!(save_mode, UsearchSaveMode::Immediate) {
2546 let save_started = std::time::Instant::now();
2547 save_usearch_index(&index, &space_indexes.usearch_path)?;
2548 crate::profile::record_update_stage("usearch_save", save_started.elapsed());
2549 }
2550 Ok(())
2551 }
2552
2553 pub(crate) fn save_dirty_usearch_indexes(&self) -> Result<()> {
2554 let spaces = {
2555 let mut dirty = self
2556 .dirty_usearch_spaces
2557 .lock()
2558 .map_err(|_| CoreError::poisoned("dirty usearch spaces"))?;
2559 if dirty.is_empty() {
2560 return Ok(());
2561 }
2562
2563 let mut spaces = dirty.drain().collect::<Vec<_>>();
2564 spaces.sort();
2565 spaces
2566 };
2567
2568 for (index, space) in spaces.iter().enumerate() {
2569 if let Err(err) = self.save_usearch_for_space(space) {
2570 let mut dirty = self
2571 .dirty_usearch_spaces
2572 .lock()
2573 .map_err(|_| CoreError::poisoned("dirty usearch spaces"))?;
2574 for unsaved in &spaces[index..] {
2575 dirty.insert(unsaved.clone());
2576 }
2577 return Err(err);
2578 }
2579 crate::profile::increment_update_count("usearch_saved_spaces", 1);
2580 }
2581
2582 Ok(())
2583 }
2584
2585 fn mark_usearch_dirty(&self, space: &str) -> Result<()> {
2586 self.dirty_usearch_spaces
2587 .lock()
2588 .map_err(|_| CoreError::poisoned("dirty usearch spaces"))?
2589 .insert(space.to_string());
2590 Ok(())
2591 }
2592
2593 fn save_usearch_for_space(&self, space: &str) -> Result<()> {
2594 let space_indexes = self.get_space_indexes(space)?;
2595 let index = space_indexes
2596 .usearch_index
2597 .read()
2598 .map_err(|_| CoreError::poisoned("usearch index"))?;
2599
2600 let save_started = std::time::Instant::now();
2601 save_usearch_index(&index, &space_indexes.usearch_path)?;
2602 crate::profile::record_update_stage("usearch_save", save_started.elapsed());
2603 Ok(())
2604 }
2605
2606 pub fn query_dense(&self, space: &str, vector: &[f32], limit: usize) -> Result<Vec<DenseHit>> {
2607 self.query_dense_filtered(space, vector, None, limit)
2608 }
2609
2610 pub fn query_dense_in_chunks(
2611 &self,
2612 space: &str,
2613 vector: &[f32],
2614 chunk_ids: &[i64],
2615 limit: usize,
2616 ) -> Result<Vec<DenseHit>> {
2617 if chunk_ids.is_empty() {
2618 return Ok(Vec::new());
2619 }
2620
2621 let allowed_keys = chunk_ids
2622 .iter()
2623 .map(|chunk_id| {
2624 u64::try_from(*chunk_id).map_err(|_| {
2625 CoreError::Internal(format!(
2626 "chunk_id must be non-negative for usearch query: {chunk_id}"
2627 ))
2628 })
2629 })
2630 .collect::<Result<HashSet<_>>>()?;
2631 self.query_dense_in_key_set(space, vector, &allowed_keys, limit)
2632 }
2633
2634 pub(crate) fn query_dense_in_key_set(
2635 &self,
2636 space: &str,
2637 vector: &[f32],
2638 allowed_keys: &HashSet<u64>,
2639 limit: usize,
2640 ) -> Result<Vec<DenseHit>> {
2641 if allowed_keys.is_empty() {
2642 return Ok(Vec::new());
2643 }
2644
2645 self.query_dense_filtered(space, vector, Some(allowed_keys), limit)
2646 }
2647
2648 fn query_dense_filtered(
2649 &self,
2650 space: &str,
2651 vector: &[f32],
2652 allowed_keys: Option<&HashSet<u64>>,
2653 limit: usize,
2654 ) -> Result<Vec<DenseHit>> {
2655 if limit == 0 {
2656 return Ok(Vec::new());
2657 }
2658 if vector.is_empty() {
2659 return Err(CoreError::Internal(
2660 "cannot query usearch with empty vector".to_string(),
2661 ));
2662 }
2663
2664 let space_indexes = self.get_space_indexes(space)?;
2665 let index = space_indexes
2666 .usearch_index
2667 .read()
2668 .map_err(|_| CoreError::poisoned("usearch index"))?;
2669
2670 if index.size() == 0 {
2671 return Ok(Vec::new());
2672 }
2673 if vector.len() != index.dimensions() {
2674 return Err(CoreError::Internal(format!(
2675 "query vector dimension mismatch: expected {}, got {}",
2676 index.dimensions(),
2677 vector.len()
2678 )));
2679 }
2680
2681 let matches = if let Some(allowed_keys) = allowed_keys {
2682 index
2683 .filtered_search::<f32, _>(vector, limit, |key| allowed_keys.contains(&key))
2684 .map_err(|err| {
2685 CoreError::Internal(format!("usearch filtered query failed: {err}"))
2686 })?
2687 } else {
2688 index
2689 .search::<f32>(vector, limit)
2690 .map_err(|err| CoreError::Internal(format!("usearch query failed: {err}")))?
2691 };
2692 let hits = matches
2693 .keys
2694 .into_iter()
2695 .zip(matches.distances)
2696 .map(|(key, distance)| DenseHit {
2697 chunk_id: key as i64,
2698 distance,
2699 })
2700 .collect();
2701 Ok(hits)
2702 }
2703
2704 pub fn count_usearch(&self, space: &str) -> Result<usize> {
2705 let space_indexes = self.get_space_indexes(space)?;
2706 let index = space_indexes
2707 .usearch_index
2708 .read()
2709 .map_err(|_| CoreError::poisoned("usearch index"))?;
2710 Ok(index.size())
2711 }
2712
2713 pub fn clear_usearch(&self, space: &str) -> Result<()> {
2714 let space_indexes = self.get_space_indexes(space)?;
2715 let index = space_indexes
2716 .usearch_index
2717 .write()
2718 .map_err(|_| CoreError::poisoned("usearch index"))?;
2719 let clear_started = std::time::Instant::now();
2720 index
2721 .reset()
2722 .map_err(|err| CoreError::Internal(format!("usearch clear failed: {err}")))?;
2723 std::fs::File::create(&space_indexes.usearch_path)?;
2724 crate::profile::record_update_stage("usearch_clear", clear_started.elapsed());
2725 Ok(())
2726 }
2727
2728 pub fn get_fts_dirty_documents(&self) -> Result<Vec<FtsDirtyRecord>> {
2729 self.get_fts_dirty_documents_filtered("", Vec::new())
2730 }
2731
2732 pub fn get_fts_dirty_documents_in_space(&self, space_id: i64) -> Result<Vec<FtsDirtyRecord>> {
2733 self.get_fts_dirty_documents_filtered(
2734 " AND c.space_id = ?",
2735 vec![SqlValue::Integer(space_id)],
2736 )
2737 }
2738
2739 pub fn get_fts_dirty_documents_in_collections(
2740 &self,
2741 collection_ids: &[i64],
2742 ) -> Result<Vec<FtsDirtyRecord>> {
2743 if collection_ids.is_empty() {
2744 return Ok(Vec::new());
2745 }
2746
2747 let placeholders = vec!["?"; collection_ids.len()].join(", ");
2748 let clause = format!(" AND d.collection_id IN ({placeholders})");
2749 let params = collection_ids
2750 .iter()
2751 .map(|id| SqlValue::Integer(*id))
2752 .collect::<Vec<_>>();
2753 self.get_fts_dirty_documents_filtered(&clause, params)
2754 }
2755
2756 fn get_fts_dirty_documents_filtered(
2757 &self,
2758 scope_clause: &str,
2759 scope_params: Vec<SqlValue>,
2760 ) -> Result<Vec<FtsDirtyRecord>> {
2761 let conn = self
2762 .db
2763 .lock()
2764 .map_err(|_| CoreError::poisoned("database"))?;
2765
2766 let sql = format!(
2767 "SELECT d.id, d.path, d.title, d.title_source, d.hash, c.path, s.name
2768 FROM documents d
2769 JOIN collections c ON c.id = d.collection_id
2770 JOIN spaces s ON s.id = c.space_id
2771 WHERE d.fts_dirty = 1{scope_clause}
2772 ORDER BY d.id ASC"
2773 );
2774 let mut stmt = conn.prepare(&sql)?;
2775 let rows = stmt.query_map(params_from_iter(scope_params.iter()), |row| {
2776 Ok((
2777 row.get::<_, i64>(0)?,
2778 row.get::<_, String>(1)?,
2779 row.get::<_, String>(2)?,
2780 row.get::<_, String>(3)?,
2781 row.get::<_, String>(4)?,
2782 row.get::<_, String>(5)?,
2783 row.get::<_, String>(6)?,
2784 ))
2785 })?;
2786 let headers = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2787 drop(stmt);
2788
2789 let mut records = Vec::with_capacity(headers.len());
2790 for (
2791 doc_id,
2792 doc_path,
2793 doc_title,
2794 doc_title_source,
2795 doc_hash,
2796 collection_path,
2797 space_name,
2798 ) in headers
2799 {
2800 let chunks = load_chunks_for_doc(&conn, doc_id)?;
2801 records.push(FtsDirtyRecord {
2802 doc_id,
2803 doc_path,
2804 doc_title,
2805 doc_title_source: DocumentTitleSource::from_sql(&doc_title_source)?,
2806 doc_hash,
2807 collection_path: PathBuf::from(collection_path),
2808 space_name,
2809 chunks,
2810 });
2811 }
2812
2813 Ok(records)
2814 }
2815
2816 pub fn batch_clear_fts_dirty(&self, doc_ids: &[i64]) -> Result<()> {
2817 if doc_ids.is_empty() {
2818 return Ok(());
2819 }
2820
2821 let conn = self
2822 .db
2823 .lock()
2824 .map_err(|_| CoreError::poisoned("database"))?;
2825
2826 let placeholders = vec!["?"; doc_ids.len()].join(", ");
2827 let sql = format!("UPDATE documents SET fts_dirty = 0 WHERE id IN ({placeholders})");
2828 conn.execute(&sql, params_from_iter(doc_ids.iter()))?;
2829 Ok(())
2830 }
2831
2832 pub fn count_documents_in_collection(
2833 &self,
2834 collection_id: i64,
2835 active_only: bool,
2836 ) -> Result<usize> {
2837 let conn = self
2838 .db
2839 .lock()
2840 .map_err(|_| CoreError::poisoned("database"))?;
2841 let _collection_name = lookup_collection_name(&conn, collection_id)?;
2842 let active_only = i64::from(active_only);
2843 query_count(
2844 &conn,
2845 "SELECT COUNT(*)
2846 FROM documents
2847 WHERE collection_id = ?1
2848 AND (?2 = 0 OR active = 1)",
2849 params![collection_id, active_only],
2850 )
2851 }
2852
2853 pub fn count_chunks_in_collection(&self, collection_id: i64) -> Result<usize> {
2854 let conn = self
2855 .db
2856 .lock()
2857 .map_err(|_| CoreError::poisoned("database"))?;
2858 let _collection_name = lookup_collection_name(&conn, collection_id)?;
2859
2860 query_count(
2861 &conn,
2862 "SELECT COUNT(*)
2863 FROM chunks c
2864 JOIN documents d ON d.id = c.doc_id
2865 WHERE d.collection_id = ?1",
2866 params![collection_id],
2867 )
2868 }
2869
2870 pub fn count_embedded_chunks_in_collection(&self, collection_id: i64) -> Result<usize> {
2871 let conn = self
2872 .db
2873 .lock()
2874 .map_err(|_| CoreError::poisoned("database"))?;
2875 let _collection_name = lookup_collection_name(&conn, collection_id)?;
2876
2877 query_count(
2878 &conn,
2879 "SELECT COUNT(DISTINCT e.chunk_id)
2880 FROM embeddings e
2881 JOIN chunks c ON c.id = e.chunk_id
2882 JOIN documents d ON d.id = c.doc_id
2883 WHERE d.collection_id = ?1",
2884 params![collection_id],
2885 )
2886 }
2887
2888 pub fn count_documents(&self, space_id: Option<i64>) -> Result<usize> {
2889 let conn = self
2890 .db
2891 .lock()
2892 .map_err(|_| CoreError::poisoned("database"))?;
2893
2894 match space_id {
2895 Some(space_id) => {
2896 let _space_name = lookup_space_name(&conn, space_id)?;
2897 query_count(
2898 &conn,
2899 "SELECT COUNT(*)
2900 FROM documents d
2901 JOIN collections c ON c.id = d.collection_id
2902 WHERE c.space_id = ?1",
2903 params![space_id],
2904 )
2905 }
2906 None => query_count(&conn, "SELECT COUNT(*) FROM documents", []),
2907 }
2908 }
2909
2910 pub fn count_chunks(&self, space_id: Option<i64>) -> Result<usize> {
2911 let conn = self
2912 .db
2913 .lock()
2914 .map_err(|_| CoreError::poisoned("database"))?;
2915
2916 match space_id {
2917 Some(space_id) => {
2918 let _space_name = lookup_space_name(&conn, space_id)?;
2919 query_count(
2920 &conn,
2921 "SELECT COUNT(*)
2922 FROM chunks c
2923 JOIN documents d ON d.id = c.doc_id
2924 JOIN collections col ON col.id = d.collection_id
2925 WHERE col.space_id = ?1",
2926 params![space_id],
2927 )
2928 }
2929 None => query_count(&conn, "SELECT COUNT(*) FROM chunks", []),
2930 }
2931 }
2932
2933 pub fn count_embedded_chunks(&self, space_id: Option<i64>) -> Result<usize> {
2934 let conn = self
2935 .db
2936 .lock()
2937 .map_err(|_| CoreError::poisoned("database"))?;
2938
2939 match space_id {
2940 Some(space_id) => {
2941 let _space_name = lookup_space_name(&conn, space_id)?;
2942 query_count(
2943 &conn,
2944 "SELECT COUNT(DISTINCT e.chunk_id)
2945 FROM embeddings e
2946 JOIN chunks c ON c.id = e.chunk_id
2947 JOIN documents d ON d.id = c.doc_id
2948 JOIN collections col ON col.id = d.collection_id
2949 WHERE col.space_id = ?1",
2950 params![space_id],
2951 )
2952 }
2953 None => query_count(&conn, "SELECT COUNT(DISTINCT chunk_id) FROM embeddings", []),
2954 }
2955 }
2956
2957 pub fn disk_usage(&self) -> Result<DiskUsage> {
2958 let sqlite_bytes = file_size_or_zero(&self.cache_dir.join(DB_FILE))?;
2959
2960 let mut tantivy_bytes = 0_u64;
2961 let mut usearch_bytes = 0_u64;
2962 let spaces_dir = self.cache_dir.join(SPACES_DIR);
2963 if spaces_dir.exists() {
2964 for entry in std::fs::read_dir(&spaces_dir)? {
2965 let space_dir = entry?.path();
2966 if !space_dir.is_dir() {
2967 continue;
2968 }
2969
2970 tantivy_bytes += dir_size_or_zero(&space_dir.join(TANTIVY_DIR_NAME))?;
2971 usearch_bytes += file_size_or_zero(&space_dir.join(USEARCH_FILENAME))?;
2972 }
2973 }
2974
2975 let models_bytes = dir_size_or_zero(&self.cache_dir.join("models"))?;
2976 let total_bytes = sqlite_bytes + tantivy_bytes + usearch_bytes + models_bytes;
2977
2978 Ok(DiskUsage {
2979 sqlite_bytes,
2980 tantivy_bytes,
2981 usearch_bytes,
2982 models_bytes,
2983 total_bytes,
2984 })
2985 }
2986
2987 fn unload_space(&self, name: &str) -> Result<()> {
2988 let mut spaces = self
2989 .spaces
2990 .write()
2991 .map_err(|_| CoreError::poisoned("spaces"))?;
2992 spaces.remove(name);
2993 Ok(())
2994 }
2995
2996 fn remove_space_artifacts(&self, name: &str) -> Result<()> {
2997 let space_root = self.space_root_path(name);
2998 if space_root.exists() {
2999 std::fs::remove_dir_all(space_root)?;
3000 }
3001 Ok(())
3002 }
3003
3004 fn rename_space_artifacts(&self, old: &str, new: &str) -> Result<()> {
3005 let old_root = self.space_root_path(old);
3006 let new_root = self.space_root_path(new);
3007 if !old_root.exists() {
3008 return Ok(());
3009 }
3010
3011 if new_root.exists() {
3012 return Err(CoreError::Internal(format!(
3013 "cannot rename space artifacts: destination already exists: {}",
3014 new_root.display()
3015 )));
3016 }
3017
3018 if let Some(parent) = new_root.parent() {
3019 std::fs::create_dir_all(parent)?;
3020 }
3021 std::fs::rename(old_root, new_root)?;
3022 Ok(())
3023 }
3024
3025 fn space_root_path(&self, name: &str) -> PathBuf {
3026 self.cache_dir.join(SPACES_DIR).join(name)
3027 }
3028
3029 fn space_paths(&self, name: &str) -> (PathBuf, PathBuf) {
3030 let space_root = self.space_root_path(name);
3031 let tantivy_dir = space_root.join(TANTIVY_DIR_NAME);
3032 let usearch_path = space_root.join(USEARCH_FILENAME);
3033 (tantivy_dir, usearch_path)
3034 }
3035
3036 fn get_space_indexes(&self, name: &str) -> Result<Arc<SpaceIndexes>> {
3037 self.open_space(name)?;
3038 let spaces = self
3039 .spaces
3040 .read()
3041 .map_err(|_| CoreError::poisoned("spaces"))?;
3042 spaces.get(name).cloned().ok_or_else(|| {
3043 KboltError::SpaceNotFound {
3044 name: name.to_string(),
3045 }
3046 .into()
3047 })
3048 }
3049}
3050
3051fn lookup_space_name(conn: &Connection, space_id: i64) -> Result<String> {
3052 let result = conn.query_row(
3053 "SELECT name FROM spaces WHERE id = ?1",
3054 params![space_id],
3055 |row| row.get::<_, String>(0),
3056 );
3057 match result {
3058 Ok(name) => Ok(name),
3059 Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
3060 name: format!("id={space_id}"),
3061 }
3062 .into()),
3063 Err(err) => Err(err.into()),
3064 }
3065}
3066
3067fn lookup_collection_name(conn: &Connection, collection_id: i64) -> Result<String> {
3068 let result = conn.query_row(
3069 "SELECT name FROM collections WHERE id = ?1",
3070 params![collection_id],
3071 |row| row.get::<_, String>(0),
3072 );
3073 match result {
3074 Ok(name) => Ok(name),
3075 Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
3076 name: format!("id={collection_id}"),
3077 }
3078 .into()),
3079 Err(err) => Err(err.into()),
3080 }
3081}
3082
3083fn lookup_document_id(conn: &Connection, doc_id: i64) -> Result<i64> {
3084 let result = conn.query_row(
3085 "SELECT id FROM documents WHERE id = ?1",
3086 params![doc_id],
3087 |row| row.get::<_, i64>(0),
3088 );
3089 match result {
3090 Ok(id) => Ok(id),
3091 Err(Error::QueryReturnedNoRows) => Err(KboltError::DocumentNotFound {
3092 path: format!("id={doc_id}"),
3093 }
3094 .into()),
3095 Err(err) => Err(err.into()),
3096 }
3097}
3098
3099fn load_chunks_for_doc(conn: &Connection, doc_id: i64) -> Result<Vec<ChunkRow>> {
3100 let mut stmt = conn.prepare(
3101 "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
3102 FROM chunks
3103 WHERE doc_id = ?1
3104 ORDER BY seq ASC",
3105 )?;
3106 let rows = stmt.query_map(params![doc_id], decode_chunk_row)?;
3107 let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3108 Ok(chunks)
3109}
3110
3111fn load_chunk_ids_for_doc(conn: &Connection, doc_id: i64) -> Result<Vec<i64>> {
3112 let mut stmt = conn.prepare("SELECT id FROM chunks WHERE doc_id = ?1 ORDER BY seq ASC")?;
3113 let rows = stmt.query_map(params![doc_id], |row| row.get::<_, i64>(0))?;
3114 let chunk_ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3115 Ok(chunk_ids)
3116}
3117
3118fn lookup_chunk_doc_id(conn: &Connection, chunk_id: i64) -> Result<i64> {
3119 let result = conn.query_row(
3120 "SELECT doc_id FROM chunks WHERE id = ?1",
3121 params![chunk_id],
3122 |row| row.get::<_, i64>(0),
3123 );
3124 match result {
3125 Ok(doc_id) => Ok(doc_id),
3126 Err(Error::QueryReturnedNoRows) => Err(KboltError::DocumentNotFound {
3127 path: format!("chunk_id={chunk_id}"),
3128 }
3129 .into()),
3130 Err(err) => Err(err.into()),
3131 }
3132}
3133
3134fn query_count<P: rusqlite::Params>(conn: &Connection, sql: &str, params: P) -> Result<usize> {
3135 let count: i64 = conn.query_row(sql, params, |row| row.get(0))?;
3136 Ok(count as usize)
3137}
3138
3139fn file_size_or_zero(path: &Path) -> Result<u64> {
3140 if !path.exists() {
3141 return Ok(0);
3142 }
3143
3144 let metadata = std::fs::metadata(path)?;
3145 if metadata.is_file() {
3146 Ok(metadata.len())
3147 } else {
3148 Ok(0)
3149 }
3150}
3151
3152fn dir_size_or_zero(path: &Path) -> Result<u64> {
3153 if !path.exists() {
3154 return Ok(0);
3155 }
3156
3157 let mut total = 0_u64;
3158 for entry in std::fs::read_dir(path)? {
3159 let entry = entry?;
3160 let child_path = entry.path();
3161 let metadata = std::fs::symlink_metadata(&child_path)?;
3162 if metadata.is_file() {
3163 total += metadata.len();
3164 } else if metadata.is_dir() {
3165 total += dir_size_or_zero(&child_path)?;
3166 }
3167 }
3168
3169 Ok(total)
3170}
3171
3172fn open_or_create_tantivy_index(path: &Path) -> Result<Index> {
3173 let meta_path = path.join("meta.json");
3174 if meta_path.exists() {
3175 return Ok(Index::open_in_dir(path)?);
3176 }
3177
3178 Ok(Index::create_in_dir(path, tantivy_schema())?)
3179}
3180
3181fn with_tantivy_writer<T>(
3182 space_indexes: &SpaceIndexes,
3183 f: impl FnOnce(&mut IndexWriter) -> Result<T>,
3184) -> Result<T> {
3185 let mut writer = space_indexes
3186 .tantivy_writer
3187 .lock()
3188 .map_err(|_| CoreError::poisoned("tantivy writer"))?;
3189
3190 if writer.is_none() {
3191 *writer = Some(space_indexes.tantivy_index.writer(50_000_000)?);
3192 }
3193
3194 let writer = writer
3195 .as_mut()
3196 .ok_or_else(|| CoreError::Internal("failed to initialize tantivy writer".to_string()))?;
3197 f(writer)
3198}
3199
3200fn reject_incompatible_legacy_index(conn: &Connection) -> Result<()> {
3201 if !table_exists(conn, "documents")? || table_exists(conn, "document_texts")? {
3202 return Ok(());
3203 }
3204
3205 let document_count = query_count(conn, "SELECT COUNT(*) FROM documents", [])?;
3206 if document_count == 0 {
3207 return Ok(());
3208 }
3209
3210 Err(KboltError::Config(
3211 "cache index uses an older text-storage format; rebuild the kbolt cache before using this version".to_string(),
3212 )
3213 .into())
3214}
3215
3216fn ensure_schema_version(conn: &Connection) -> Result<()> {
3217 let current: i64 = conn.query_row("PRAGMA user_version", [], |row| row.get(0))?;
3218 if current > SCHEMA_VERSION {
3219 return Err(KboltError::Config(format!(
3220 "cache index schema version {current} is newer than supported version {SCHEMA_VERSION}"
3221 ))
3222 .into());
3223 }
3224
3225 if current < SCHEMA_VERSION {
3226 conn.pragma_update(None, "user_version", SCHEMA_VERSION)?;
3227 }
3228
3229 Ok(())
3230}
3231
3232fn table_exists(conn: &Connection, table: &str) -> Result<bool> {
3233 let exists = conn.query_row(
3234 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name = ?1",
3235 [table],
3236 |row| row.get::<_, i64>(0),
3237 )?;
3238 Ok(exists != 0)
3239}
3240
3241fn ensure_documents_title_source_column(conn: &Connection) -> Result<()> {
3242 let mut stmt = conn.prepare("PRAGMA table_info(documents)")?;
3243 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3244 let columns = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3245 drop(stmt);
3246
3247 if columns.iter().any(|column| column == "title_source") {
3248 return Ok(());
3249 }
3250
3251 conn.execute(
3252 "ALTER TABLE documents ADD COLUMN title_source TEXT NOT NULL DEFAULT 'extracted'",
3253 [],
3254 )?;
3255 Ok(())
3256}
3257
3258fn ensure_document_texts_generation_key_column(conn: &Connection) -> Result<()> {
3259 let mut stmt = conn.prepare("PRAGMA table_info(document_texts)")?;
3260 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3261 let columns = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3262 drop(stmt);
3263
3264 if columns.iter().any(|column| column == "generation_key") {
3265 return Ok(());
3266 }
3267
3268 conn.execute(
3269 "ALTER TABLE document_texts ADD COLUMN generation_key TEXT NOT NULL DEFAULT ''",
3270 [],
3271 )?;
3272 Ok(())
3273}
3274
3275fn ensure_chunks_retrieval_prefix_column(conn: &Connection) -> Result<()> {
3276 let mut stmt = conn.prepare("PRAGMA table_info(chunks)")?;
3277 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3278 let columns = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3279 drop(stmt);
3280
3281 if columns.iter().any(|column| column == "retrieval_prefix") {
3282 return Ok(());
3283 }
3284
3285 conn.execute("ALTER TABLE chunks ADD COLUMN retrieval_prefix TEXT", [])?;
3286 Ok(())
3287}
3288
3289fn build_literal_bm25_query(
3290 index: &Index,
3291 fields: &[Bm25FieldSpec],
3292 query: &str,
3293) -> Result<Option<Box<dyn Query>>> {
3294 let mut clauses = Vec::new();
3295 for field in fields {
3296 for token in analyzed_terms_for_field(index, field.field, query)? {
3297 let term_query: Box<dyn Query> = Box::new(TermQuery::new(
3298 Term::from_field_text(field.field, &token),
3299 field.index_record_option,
3300 ));
3301 let query = if (field.boost - 1.0).abs() > f32::EPSILON {
3302 Box::new(BoostQuery::new(term_query, field.boost)) as Box<dyn Query>
3303 } else {
3304 term_query
3305 };
3306 clauses.push((Occur::Should, query));
3307 }
3308 }
3309
3310 if clauses.is_empty() {
3311 Ok(None)
3312 } else {
3313 Ok(Some(Box::new(BooleanQuery::new(clauses))))
3314 }
3315}
3316
3317fn build_doc_id_filter_query(field: Field, document_ids: &[i64]) -> Result<Box<dyn Query>> {
3318 let mut terms = Vec::new();
3319 let mut seen = HashSet::new();
3320 for doc_id in document_ids {
3321 let doc_id = u64::try_from(*doc_id).map_err(|_| {
3322 CoreError::Internal(format!(
3323 "doc_id must be non-negative for tantivy query: {doc_id}"
3324 ))
3325 })?;
3326 if seen.insert(doc_id) {
3327 terms.push(Term::from_field_u64(field, doc_id));
3328 }
3329 }
3330
3331 Ok(Box::new(ConstScoreQuery::new(
3332 Box::new(TermSetQuery::new(terms)),
3333 0.0,
3334 )))
3335}
3336
3337fn analyzed_terms_for_field(index: &Index, field: Field, query: &str) -> Result<Vec<String>> {
3338 let mut analyzer = index.tokenizer_for_field(field)?;
3339 let mut stream = analyzer.token_stream(query);
3340 let mut terms = Vec::new();
3341 let mut seen = HashSet::new();
3342 while let Some(token) = stream.next() {
3343 if token.text.is_empty() {
3344 continue;
3345 }
3346 let text = token.text.clone();
3347 if seen.insert(text.clone()) {
3348 terms.push(text);
3349 }
3350 }
3351 Ok(terms)
3352}
3353
3354fn new_usearch_index(dimensions: usize) -> Result<usearch::Index> {
3355 let options = IndexOptions {
3356 dimensions,
3357 metric: MetricKind::Cos,
3358 quantization: ScalarKind::F32,
3359 connectivity: 16,
3360 expansion_add: 200,
3361 expansion_search: 100,
3362 ..IndexOptions::default()
3363 };
3364 usearch::Index::new(&options)
3365 .map_err(|err| CoreError::Internal(format!("usearch init failed: {err}")))
3366}
3367
3368fn open_or_create_usearch_index(path: &Path) -> Result<usearch::Index> {
3369 let index = new_usearch_index(256)?;
3370 let file_size = std::fs::metadata(path).map(|meta| meta.len()).unwrap_or(0);
3371 if file_size > 0 {
3372 let path = path
3373 .to_str()
3374 .ok_or_else(|| CoreError::Internal("invalid usearch path encoding".to_string()))?;
3375 index
3376 .load(path)
3377 .map_err(|err| CoreError::Internal(format!("usearch load failed: {err}")))?;
3378 }
3379 Ok(index)
3380}
3381
3382fn ensure_usearch_dimensions(index: &mut usearch::Index, expected_dimensions: usize) -> Result<()> {
3383 if index.size() == 0 && index.dimensions() != expected_dimensions {
3384 *index = new_usearch_index(expected_dimensions)?;
3385 return Ok(());
3386 }
3387
3388 if index.dimensions() != expected_dimensions {
3389 return Err(CoreError::Internal(format!(
3390 "usearch vector dimension mismatch: index expects {}, got {}",
3391 index.dimensions(),
3392 expected_dimensions
3393 )));
3394 }
3395 Ok(())
3396}
3397
3398fn save_usearch_index(index: &usearch::Index, path: &Path) -> Result<()> {
3399 if index.size() == 0 {
3400 std::fs::File::create(path)?;
3401 return Ok(());
3402 }
3403
3404 let path = path
3405 .to_str()
3406 .ok_or_else(|| CoreError::Internal("invalid usearch path encoding".to_string()))?;
3407 index
3408 .save(path)
3409 .map_err(|err| CoreError::Internal(format!("usearch save failed: {err}")))?;
3410 Ok(())
3411}
3412
3413fn tantivy_schema() -> tantivy::schema::Schema {
3414 let mut builder = tantivy::schema::Schema::builder();
3415 builder.add_u64_field("chunk_id", INDEXED | STORED | FAST);
3416 builder.add_u64_field("doc_id", INDEXED | STORED | FAST);
3417 builder.add_text_field("filepath", TEXT | STORED);
3418 builder.add_text_field("title", TEXT | STORED);
3419 builder.add_text_field("heading", TEXT | STORED);
3420 builder.add_text_field("body", TEXT);
3421 builder.build()
3422}
3423
3424fn tantivy_fields_from_schema(schema: &tantivy::schema::Schema) -> Result<TantivyFields> {
3425 Ok(TantivyFields {
3426 chunk_id: schema.get_field("chunk_id").map_err(|_| {
3427 CoreError::Internal("tantivy schema missing field: chunk_id".to_string())
3428 })?,
3429 doc_id: schema
3430 .get_field("doc_id")
3431 .map_err(|_| CoreError::Internal("tantivy schema missing field: doc_id".to_string()))?,
3432 filepath: schema.get_field("filepath").map_err(|_| {
3433 CoreError::Internal("tantivy schema missing field: filepath".to_string())
3434 })?,
3435 title: schema
3436 .get_field("title")
3437 .map_err(|_| CoreError::Internal("tantivy schema missing field: title".to_string()))?,
3438 heading: schema.get_field("heading").map_err(|_| {
3439 CoreError::Internal("tantivy schema missing field: heading".to_string())
3440 })?,
3441 body: schema
3442 .get_field("body")
3443 .map_err(|_| CoreError::Internal("tantivy schema missing field: body".to_string()))?,
3444 })
3445}
3446
3447fn resolve_tantivy_field(fields: TantivyFields, name: &str) -> Result<Field> {
3448 match name {
3449 "chunk_id" => Ok(fields.chunk_id),
3450 "doc_id" => Ok(fields.doc_id),
3451 "filepath" => Ok(fields.filepath),
3452 "title" => Ok(fields.title),
3453 "heading" => Ok(fields.heading),
3454 "body" => Ok(fields.body),
3455 other => Err(CoreError::Internal(format!(
3456 "unsupported tantivy field: {other}"
3457 ))),
3458 }
3459}
3460
3461fn serialize_extensions(extensions: Option<&[String]>) -> Result<Option<String>> {
3462 match extensions {
3463 None => Ok(None),
3464 Some(values) => serde_json::to_string(values).map(Some).map_err(Into::into),
3465 }
3466}
3467
3468fn deserialize_extensions(raw: Option<String>) -> Result<Option<Vec<String>>> {
3469 match raw {
3470 None => Ok(None),
3471 Some(json) => serde_json::from_str::<Vec<String>>(&json)
3472 .map(Some)
3473 .map_err(Into::into),
3474 }
3475}
3476
3477fn decode_space_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SpaceRow> {
3478 Ok(SpaceRow {
3479 id: row.get(0)?,
3480 name: row.get(1)?,
3481 description: row.get(2)?,
3482 created: row.get(3)?,
3483 })
3484}
3485
3486fn decode_collection_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<CollectionRow> {
3487 let raw_extensions: Option<String> = row.get(5)?;
3488 let extensions = deserialize_extensions(raw_extensions).map_err(|err| {
3489 Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(err))
3490 })?;
3491 Ok(CollectionRow {
3492 id: row.get(0)?,
3493 space_id: row.get(1)?,
3494 name: row.get(2)?,
3495 path: PathBuf::from(row.get::<_, String>(3)?),
3496 description: row.get(4)?,
3497 extensions,
3498 created: row.get(6)?,
3499 updated: row.get(7)?,
3500 })
3501}
3502
3503fn decode_document_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<DocumentRow> {
3504 let raw_title_source: String = row.get(4)?;
3505 let title_source = DocumentTitleSource::from_sql(&raw_title_source).map_err(|err| {
3506 Error::FromSqlConversionFailure(4, rusqlite::types::Type::Text, Box::new(err))
3507 })?;
3508 let active_value: i64 = row.get(7)?;
3509 let fts_dirty_value: i64 = row.get(9)?;
3510 Ok(DocumentRow {
3511 id: row.get(0)?,
3512 collection_id: row.get(1)?,
3513 path: row.get(2)?,
3514 title: row.get(3)?,
3515 title_source,
3516 hash: row.get(5)?,
3517 modified: row.get(6)?,
3518 active: active_value != 0,
3519 deactivated_at: row.get(8)?,
3520 fts_dirty: fts_dirty_value != 0,
3521 })
3522}
3523
3524fn decode_document_text_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<DocumentTextRow> {
3525 Ok(DocumentTextRow {
3526 doc_id: row.get(0)?,
3527 extractor_key: row.get(1)?,
3528 source_hash: row.get(2)?,
3529 text_hash: row.get(3)?,
3530 generation_key: row.get(4)?,
3531 text: row.get(5)?,
3532 created: row.get(6)?,
3533 })
3534}
3535
3536fn decode_chunk_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ChunkRow> {
3537 let offset_value: i64 = row.get(3)?;
3538 let length_value: i64 = row.get(4)?;
3539 let kind_raw: String = row.get(6)?;
3540 let kind = FinalChunkKind::try_from(kind_raw.as_str()).map_err(|err| {
3541 Error::FromSqlConversionFailure(6, rusqlite::types::Type::Text, Box::new(err))
3542 })?;
3543 Ok(ChunkRow {
3544 id: row.get(0)?,
3545 doc_id: row.get(1)?,
3546 seq: row.get(2)?,
3547 offset: decode_non_negative_usize(offset_value, 3, "chunks.offset")?,
3548 length: decode_non_negative_usize(length_value, 4, "chunks.length")?,
3549 heading: row.get(5)?,
3550 kind,
3551 retrieval_prefix: row.get(7)?,
3552 })
3553}
3554
3555fn decode_non_negative_usize(
3556 value: i64,
3557 column: usize,
3558 name: &'static str,
3559) -> rusqlite::Result<usize> {
3560 if value < 0 {
3561 return Err(Error::FromSqlConversionFailure(
3562 column,
3563 SqlType::Integer,
3564 Box::new(KboltError::Internal(format!("{name} must not be negative"))),
3565 ));
3566 }
3567
3568 Ok(value as usize)
3569}
3570
3571fn canonical_chunk_slice_from_bytes(
3572 chunk_id: i64,
3573 offset_value: i64,
3574 length_value: i64,
3575 document_len_value: i64,
3576 start_byte: Vec<u8>,
3577 end_byte: Vec<u8>,
3578 bytes: Vec<u8>,
3579) -> Result<String> {
3580 if offset_value < 0 {
3581 return Err(CoreError::Internal(format!(
3582 "chunk {chunk_id} offset must not be negative"
3583 )));
3584 }
3585 if length_value < 0 {
3586 return Err(CoreError::Internal(format!(
3587 "chunk {chunk_id} length must not be negative"
3588 )));
3589 }
3590 if document_len_value < 0 {
3591 return Err(CoreError::Internal(format!(
3592 "document text length for chunk {chunk_id} must not be negative"
3593 )));
3594 }
3595
3596 let offset = offset_value as usize;
3597 let length = length_value as usize;
3598 let document_len = document_len_value as usize;
3599 let end = offset.checked_add(length).ok_or_else(|| {
3600 CoreError::Internal(format!("chunk {chunk_id} text span overflows usize"))
3601 })?;
3602 if end > document_len {
3603 return Err(CoreError::Internal(format!(
3604 "chunk {chunk_id} text span {offset}..{end} exceeds document text length {document_len}"
3605 )));
3606 }
3607 if bytes.len() != length {
3608 return Err(CoreError::Internal(format!(
3609 "canonical text slice for chunk {chunk_id} returned {} bytes, expected {length}",
3610 bytes.len()
3611 )));
3612 }
3613 validate_sqlite_utf8_boundary(chunk_id, offset, document_len, &start_byte)?;
3614 validate_sqlite_utf8_boundary(chunk_id, end, document_len, &end_byte)?;
3615
3616 String::from_utf8(bytes).map_err(|err| {
3617 CoreError::Internal(format!(
3618 "stored canonical text slice for chunk {chunk_id} is invalid UTF-8: {err}"
3619 ))
3620 })
3621}
3622
3623fn validate_sqlite_utf8_boundary(
3624 chunk_id: i64,
3625 index: usize,
3626 document_len: usize,
3627 byte_at_index: &[u8],
3628) -> Result<()> {
3629 if index == 0 || index == document_len {
3630 return Ok(());
3631 }
3632 if byte_at_index.len() != 1 {
3633 return Err(CoreError::Internal(format!(
3634 "chunk {chunk_id} text boundary byte at {index} is missing"
3635 )));
3636 }
3637 if (0x80..0xC0).contains(&byte_at_index[0]) {
3638 return Err(CoreError::Internal(format!(
3639 "chunk {chunk_id} text span is not on UTF-8 boundaries"
3640 )));
3641 }
3642
3643 Ok(())
3644}
3645
3646pub(crate) fn chunk_text_from_canonical(document_text: &str, chunk: &ChunkRow) -> Result<String> {
3647 let label = format!("chunk {}", chunk.id);
3648 let end = validate_text_span(document_text, chunk.offset, chunk.length, &label)?;
3649
3650 Ok(document_text[chunk.offset..end].to_string())
3651}
3652
3653fn validate_text_span(
3654 document_text: &str,
3655 offset: usize,
3656 length: usize,
3657 label: &str,
3658) -> Result<usize> {
3659 let end = offset
3660 .checked_add(length)
3661 .ok_or_else(|| CoreError::Internal(format!("{label} text span overflows usize")))?;
3662 if end > document_text.len() {
3663 return Err(CoreError::Internal(format!(
3664 "{label} text span {}..{} exceeds document text length {}",
3665 offset,
3666 end,
3667 document_text.len()
3668 )));
3669 }
3670 if !document_text.is_char_boundary(offset) || !document_text.is_char_boundary(end) {
3671 return Err(CoreError::Internal(format!(
3672 "{label} text span {offset}..{end} is not on UTF-8 boundaries"
3673 )));
3674 }
3675
3676 Ok(end)
3677}
3678
3679pub(crate) fn missing_document_text_error(doc_id: i64) -> CoreError {
3680 KboltError::Internal(format!(
3681 "document {doc_id} is missing persisted canonical text; rebuild the kbolt cache"
3682 ))
3683 .into()
3684}
3685
3686#[cfg(test)]
3687mod tests;