1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3use std::sync::{Arc, Mutex, RwLock};
4
5use crate::error::{CoreError, Result};
6use crate::ingest::chunk::FinalChunkKind;
7use kbolt_types::{DiskUsage, KboltError};
8use rusqlite::types::{Type as SqlType, Value as SqlValue};
9use rusqlite::{params, params_from_iter, Connection, Error, ErrorCode};
10use tantivy::collector::TopDocs;
11use tantivy::query::{
12 BooleanQuery, BoostQuery, ConstScoreQuery, Occur, Query, TermQuery, TermSetQuery,
13};
14use tantivy::schema::{Field, IndexRecordOption, FAST, INDEXED, STORED, TEXT};
15use tantivy::tokenizer::TokenStream;
16use tantivy::{Index, IndexReader, IndexWriter, TantivyDocument, Term};
17use usearch::{IndexOptions, MetricKind, ScalarKind};
18
/// File name of the SQLite metadata database, created inside the cache directory.
const DB_FILE: &str = "meta.sqlite";
/// Name of the built-in space: its row is always (re)inserted on startup and it
/// can be emptied but never renamed or removed.
const DEFAULT_SPACE_NAME: &str = "default";
/// Directory name for per-space index artifacts (the path builder that uses it
/// is outside this view — presumably `<cache_dir>/spaces/<space>`).
const SPACES_DIR: &str = "spaces";
/// Directory name of a space's Tantivy (BM25) index.
const TANTIVY_DIR_NAME: &str = "tantivy";
/// File name of a space's usearch (dense-vector) index.
const USEARCH_FILENAME: &str = "vectors.usearch";
/// Expected metadata schema version (presumably enforced by `ensure_schema_version`).
const SCHEMA_VERSION: i64 = 3;
/// Maximum number of bound parameters to place in a single SQL `IN (...)` list.
const SQLITE_IN_CLAUSE_BATCH_SIZE: usize = 500;
26
/// When a modified usearch index should be persisted.
///
/// NOTE(review): the consumers of this enum are outside this view; the variant
/// descriptions below are inferred from their names — confirm.
#[derive(Clone, Copy)]
enum UsearchSaveMode {
    /// Persist the index as part of the mutating operation.
    Immediate,
    /// Postpone persisting (presumably tracked via `dirty_usearch_spaces`).
    Deferred,
}
32
/// Metadata database plus per-space search indexes, rooted at a cache directory.
///
/// SQLite holds spaces, collections, documents, chunks and embeddings metadata;
/// each opened space additionally has a Tantivy (BM25) index and a usearch
/// (dense-vector) index loaded in memory.
pub struct Storage {
    /// The single SQLite connection; the `Mutex` serializes all database access.
    db: Mutex<Connection>,
    /// Root directory given to `Storage::new`; `meta.sqlite` lives directly inside it.
    cache_dir: PathBuf,
    /// Spaces whose indexes are currently loaded, keyed by space name.
    spaces: RwLock<HashMap<String, Arc<SpaceIndexes>>>,
    /// Names of spaces whose usearch index presumably has unsaved changes — the
    /// code maintaining this set is outside this view.
    dirty_usearch_spaces: Mutex<HashSet<String>>,
}
39
/// A row of the `spaces` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SpaceRow {
    /// Primary key.
    pub id: i64,
    /// Unique space name.
    pub name: String,
    /// Optional free-form description.
    pub description: Option<String>,
    /// Creation time as text; the schema default is `strftime('%Y-%m-%dT%H:%M:%SZ','now')`.
    pub created: String,
}
47
/// A row of the `collections` table: a named source directory within a space.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollectionRow {
    /// Primary key.
    pub id: i64,
    /// Owning space; `(space_id, name)` is unique.
    pub space_id: i64,
    /// Collection name, unique within its space.
    pub name: String,
    /// Root path of the collection as stored (written via `to_string_lossy`).
    pub path: PathBuf,
    /// Optional free-form description.
    pub description: Option<String>,
    /// Optional extension filter, decoded from the serialized `extensions` column.
    pub extensions: Option<Vec<String>>,
    /// Creation timestamp text.
    pub created: String,
    /// Last-update timestamp text.
    pub updated: String,
}
59
/// A row of the `documents` table: one indexed file inside a collection.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DocumentRow {
    /// Primary key.
    pub id: i64,
    /// Owning collection; `(collection_id, path)` is unique.
    pub collection_id: i64,
    /// Document path as stored (presumably relative to the collection root — confirm).
    pub path: String,
    /// Display title.
    pub title: String,
    /// Whether `title` was extracted from content or derived from the file name.
    pub title_source: DocumentTitleSource,
    /// Content hash; searchable by prefix via `get_document_by_hash_prefix`.
    pub hash: String,
    /// Modification timestamp supplied by the caller.
    pub modified: String,
    /// `false` once the document has been deactivated (soft-deleted).
    pub active: bool,
    /// When the document was first deactivated; `None` while active.
    pub deactivated_at: Option<String>,
    /// Set by `upsert_document`; presumably means the full-text index needs a refresh.
    pub fts_dirty: bool,
}
73
/// Per-document summary produced by `list_collection_file_rows`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileListRow {
    /// Document id.
    pub doc_id: i64,
    /// Document path.
    pub path: String,
    /// Document title.
    pub title: String,
    /// Document content hash.
    pub hash: String,
    /// Whether the document is active.
    pub active: bool,
    /// Number of distinct chunks of the document.
    pub chunk_count: usize,
    /// Number of distinct chunks with at least one row in `embeddings` (any model).
    pub embedded_chunk_count: usize,
}
84
/// A row of the `chunks` table: one retrieval unit of a document.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkRow {
    /// Primary key; also the key used by the Tantivy/usearch indexes.
    pub id: i64,
    /// Owning document.
    pub doc_id: i64,
    /// Position of the chunk within its document; `(doc_id, seq)` is unique.
    pub seq: i32,
    /// Start of the chunk in the document text (units — bytes vs chars — not visible here; confirm).
    pub offset: usize,
    /// Length of the chunk, in the same units as `offset`.
    pub length: usize,
    /// Optional section heading the chunk belongs to.
    pub heading: Option<String>,
    /// Chunk kind; the schema default is `'section'`.
    pub kind: FinalChunkKind,
    /// Optional text associated with the chunk for retrieval (usage outside this view).
    pub retrieval_prefix: Option<String>,
}
96
/// Data for inserting a new chunk; mirrors `ChunkRow` without the ids.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkInsert {
    /// Position of the chunk within its document.
    pub seq: i32,
    /// Start of the chunk in the document text.
    pub offset: usize,
    /// Length of the chunk.
    pub length: usize,
    /// Optional section heading.
    pub heading: Option<String>,
    /// Chunk kind.
    pub kind: FinalChunkKind,
    /// Optional retrieval prefix text.
    pub retrieval_prefix: Option<String>,
}
106
/// A row of the `document_texts` table: the extracted text stored for a document.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DocumentTextRow {
    /// Owning document (also the table's primary key).
    pub doc_id: i64,
    /// Identifier of the extractor that produced `text`.
    pub extractor_key: String,
    /// Hash of the source the text was extracted from.
    pub source_hash: String,
    /// Hash of `text`.
    pub text_hash: String,
    /// Generation identifier; the schema defaults it to `''`.
    pub generation_key: String,
    /// The extracted text.
    pub text: String,
    /// Creation timestamp text.
    pub created: String,
}
117
/// A chunk together with text associated with it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkTextRow {
    /// The chunk metadata.
    pub chunk: ChunkRow,
    /// Extractor that produced the document text.
    pub extractor_key: String,
    /// Text for the chunk (presumably the slice of the document text — confirm).
    pub text: String,
}
124
/// Borrowed input describing a complete replacement of a document's stored
/// generation: the document row fields, its extracted text, and its chunks.
pub struct DocumentGenerationReplace<'a> {
    /// Collection the document belongs to.
    pub collection_id: i64,
    /// Document path within the collection.
    pub path: &'a str,
    /// Document title.
    pub title: &'a str,
    /// Origin of `title`.
    pub title_source: DocumentTitleSource,
    /// Document content hash.
    pub hash: &'a str,
    /// Modification timestamp.
    pub modified: &'a str,
    /// Extractor that produced `text`.
    pub extractor_key: &'a str,
    /// Hash of the extraction source.
    pub source_hash: &'a str,
    /// Hash of `text`.
    pub text_hash: &'a str,
    /// Generation identifier.
    pub generation_key: &'a str,
    /// Extracted document text.
    pub text: &'a str,
    /// Chunks to store for the new generation.
    pub chunks: &'a [ChunkInsert],
}
139
/// Outcome of a document generation replacement.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DocumentGenerationReplaceResult {
    /// Id of the (inserted or updated) document.
    pub doc_id: i64,
    /// Ids of the chunks the document had before the replacement.
    pub old_chunk_ids: Vec<i64>,
    /// Ids of the newly stored chunks.
    pub chunk_ids: Vec<i64>,
}
146
/// A chunk plus the location context needed to embed it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EmbedRecord {
    /// The chunk to embed.
    pub chunk: ChunkRow,
    /// Path of the chunk's document.
    pub doc_path: String,
    /// Root path of the document's collection.
    pub collection_path: PathBuf,
    /// Name of the space whose vector index receives the embedding.
    pub space_name: String,
}
154
/// A document flagged `fts_dirty`, with everything needed to re-index it in Tantivy.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FtsDirtyRecord {
    /// Document id.
    pub doc_id: i64,
    /// Document path.
    pub doc_path: String,
    /// Document title.
    pub doc_title: String,
    /// Origin of `doc_title`.
    pub doc_title_source: DocumentTitleSource,
    /// Document content hash.
    pub doc_hash: String,
    /// Root path of the document's collection.
    pub collection_path: PathBuf,
    /// Name of the owning space.
    pub space_name: String,
    /// The document's chunks.
    pub chunks: Vec<ChunkRow>,
}
166
/// An inactive document old enough to be permanently deleted.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReapableDocument {
    /// Document id.
    pub doc_id: i64,
    /// Name of the space the document belongs to.
    pub space_name: String,
    /// Chunk ids of the document (presumably so index entries can be purged too — confirm).
    pub chunk_ids: Vec<i64>,
}
173
/// One chunk's worth of data to write into a space's Tantivy index.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TantivyEntry {
    /// Chunk id.
    pub chunk_id: i64,
    /// Owning document id.
    pub doc_id: i64,
    /// Document file path.
    pub filepath: String,
    /// Title to index, if it carries meaning (see `DocumentTitleSource::semantic_title`).
    pub semantic_title: Option<String>,
    /// Optional heading.
    pub heading: Option<String>,
    /// Chunk body text.
    pub body: String,
}
183
184#[derive(Debug, Clone, Copy, PartialEq, Eq)]
185pub enum DocumentTitleSource {
186 Extracted,
187 FilenameFallback,
188}
189
190impl DocumentTitleSource {
191 pub fn as_sql(self) -> &'static str {
192 match self {
193 Self::Extracted => "extracted",
194 Self::FilenameFallback => "filename_fallback",
195 }
196 }
197
198 fn from_sql(raw: &str) -> std::result::Result<Self, KboltError> {
199 match raw {
200 "extracted" => Ok(Self::Extracted),
201 "filename_fallback" => Ok(Self::FilenameFallback),
202 other => Err(KboltError::InvalidInput(format!(
203 "invalid stored document title source: {other}"
204 ))),
205 }
206 }
207
208 pub fn semantic_title(self, title: &str) -> Option<&str> {
209 matches!(self, Self::Extracted)
210 .then_some(title.trim())
211 .filter(|title| !title.is_empty())
212 }
213}
214
/// A lexical (Tantivy/BM25) search hit.
#[derive(Debug, Clone, PartialEq)]
pub struct BM25Hit {
    /// Matching chunk id.
    pub chunk_id: i64,
    /// Relevance score as reported by the search (presumably higher is better).
    pub score: f32,
}
220
/// A dense-vector (usearch) search hit.
#[derive(Debug, Clone, PartialEq)]
pub struct DenseHit {
    /// Matching chunk id.
    pub chunk_id: i64,
    /// Distance reported by the vector index (presumably lower is closer; metric set elsewhere).
    pub distance: f32,
}
226
/// Summary of the documents/chunks a search would cover.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchScopeSummary {
    /// Ids of documents in scope.
    pub document_ids: Vec<i64>,
    /// Number of chunks in scope.
    pub chunk_count: usize,
}
232
/// Result of resolving which space owns a collection name (`find_space_for_collection`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SpaceResolution {
    /// Exactly one space has a collection with that name.
    Found(SpaceRow),
    /// Several spaces do; their names, sorted ascending.
    Ambiguous(Vec<String>),
    /// No space has a collection with that name.
    NotFound,
}
239
/// In-memory handles for one opened space's search indexes.
struct SpaceIndexes {
    /// Tantivy index directory; kept but not read directly (underscore-prefixed).
    _tantivy_dir: PathBuf,
    /// Path of the usearch index file.
    usearch_path: PathBuf,
    /// The Tantivy (BM25) index.
    tantivy_index: Index,
    /// Reader over `tantivy_index`.
    tantivy_reader: IndexReader,
    /// Writer for `tantivy_index`; starts as `None` (presumably created on first write).
    tantivy_writer: Mutex<Option<IndexWriter>>,
    /// The dense-vector index.
    usearch_index: RwLock<usearch::Index>,
    /// Field handles resolved from the Tantivy schema.
    fields: TantivyFields,
}
249
/// Field handles of the per-space Tantivy schema (resolved via `tantivy_fields_from_schema`).
#[derive(Debug, Clone, Copy)]
struct TantivyFields {
    /// Chunk id field.
    chunk_id: Field,
    /// Document id field.
    doc_id: Field,
    /// File path field.
    filepath: Field,
    /// Title field.
    title: Field,
    /// Heading field.
    heading: Field,
    /// Body text field.
    body: Field,
}
259
/// How one Tantivy field participates in a BM25 query.
#[derive(Debug, Clone, Copy)]
struct Bm25FieldSpec {
    /// Field to search.
    field: Field,
    /// Score multiplier for matches in this field.
    boost: f32,
    /// Posting detail used when building term queries for this field.
    index_record_option: IndexRecordOption,
}
266
267impl Storage {
/// Opens (creating if necessary) the storage rooted at `cache_dir`.
///
/// Steps, in order: create `cache_dir`; open `meta.sqlite` inside it; reject
/// incompatible legacy indexes; enable foreign keys and WAL; create any missing
/// tables/indexes; run the column helpers (presumably adding columns that
/// `CREATE TABLE IF NOT EXISTS` cannot add to pre-existing tables); check the
/// schema version; make sure the `default` space row exists; load the default
/// space's indexes.
///
/// # Errors
/// Filesystem and SQLite errors, plus any error from the legacy/column/version
/// helpers or from `open_space`.
pub fn new(cache_dir: &Path) -> Result<Self> {
    std::fs::create_dir_all(cache_dir)?;
    let db_path = cache_dir.join(DB_FILE);
    let conn = Connection::open(db_path)?;
    // Checked before any schema statement runs against the database.
    reject_incompatible_legacy_index(&conn)?;
    // `ON DELETE CASCADE` below only takes effect with `foreign_keys = ON`.
    conn.execute_batch(
        r#"
PRAGMA foreign_keys = ON;
PRAGMA journal_mode = WAL;

CREATE TABLE IF NOT EXISTS spaces (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL UNIQUE,
    description TEXT,
    created TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ','now'))
);

CREATE TABLE IF NOT EXISTS collections (
    id INTEGER PRIMARY KEY,
    space_id INTEGER NOT NULL REFERENCES spaces(id) ON DELETE CASCADE,
    name TEXT NOT NULL,
    path TEXT NOT NULL,
    description TEXT,
    extensions TEXT,
    created TEXT NOT NULL,
    updated TEXT NOT NULL,
    UNIQUE(space_id, name)
);

CREATE TABLE IF NOT EXISTS documents (
    id INTEGER PRIMARY KEY,
    collection_id INTEGER NOT NULL REFERENCES collections(id) ON DELETE CASCADE,
    path TEXT NOT NULL,
    title TEXT NOT NULL,
    title_source TEXT NOT NULL DEFAULT 'extracted',
    hash TEXT NOT NULL,
    modified TEXT NOT NULL,
    active INTEGER NOT NULL DEFAULT 1,
    deactivated_at TEXT,
    fts_dirty INTEGER NOT NULL DEFAULT 0,
    UNIQUE(collection_id, path)
);
CREATE INDEX IF NOT EXISTS idx_documents_collection ON documents(collection_id, active);
CREATE INDEX IF NOT EXISTS idx_documents_hash ON documents(hash);
CREATE INDEX IF NOT EXISTS idx_documents_fts_dirty ON documents(fts_dirty) WHERE fts_dirty = 1;

CREATE TABLE IF NOT EXISTS chunks (
    id INTEGER PRIMARY KEY,
    doc_id INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
    seq INTEGER NOT NULL,
    offset INTEGER NOT NULL,
    length INTEGER NOT NULL,
    heading TEXT,
    kind TEXT NOT NULL DEFAULT 'section',
    retrieval_prefix TEXT,
    UNIQUE(doc_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_chunks_doc ON chunks(doc_id);

CREATE TABLE IF NOT EXISTS embeddings (
    chunk_id INTEGER NOT NULL REFERENCES chunks(id) ON DELETE CASCADE,
    model TEXT NOT NULL,
    embedded_at TEXT NOT NULL,
    PRIMARY KEY (chunk_id, model)
);

CREATE TABLE IF NOT EXISTS document_texts (
    doc_id INTEGER PRIMARY KEY REFERENCES documents(id) ON DELETE CASCADE,
    extractor_key TEXT NOT NULL,
    source_hash TEXT NOT NULL,
    text_hash TEXT NOT NULL,
    generation_key TEXT NOT NULL DEFAULT '',
    text TEXT NOT NULL,
    created TEXT NOT NULL
);
"#,
    )?;
    // Column helpers run after table creation and before the version check.
    ensure_documents_title_source_column(&conn)?;
    ensure_document_texts_generation_key_column(&conn)?;
    ensure_chunks_retrieval_prefix_column(&conn)?;
    ensure_schema_version(&conn)?;

    // The default space must always exist; OR IGNORE makes this idempotent.
    conn.execute(
        "INSERT OR IGNORE INTO spaces (name, description) VALUES (?1, NULL)",
        params![DEFAULT_SPACE_NAME],
    )?;

    let storage = Self {
        db: Mutex::new(conn),
        cache_dir: cache_dir.to_path_buf(),
        spaces: RwLock::new(HashMap::new()),
        dirty_usearch_spaces: Mutex::new(HashSet::new()),
    };
    storage.open_space(DEFAULT_SPACE_NAME)?;

    Ok(storage)
}
365
366 pub fn open_space(&self, name: &str) -> Result<()> {
367 let _space = self.get_space(name)?;
368
369 {
370 let spaces = self
371 .spaces
372 .read()
373 .map_err(|_| CoreError::poisoned("spaces"))?;
374 if spaces.contains_key(name) {
375 return Ok(());
376 }
377 }
378
379 let (tantivy_dir, usearch_path) = self.space_paths(name);
380 std::fs::create_dir_all(&tantivy_dir)?;
381 std::fs::OpenOptions::new()
382 .create(true)
383 .append(true)
384 .open(&usearch_path)?;
385
386 let mut spaces = self
387 .spaces
388 .write()
389 .map_err(|_| CoreError::poisoned("spaces"))?;
390 if spaces.contains_key(name) {
391 return Ok(());
392 }
393
394 let tantivy_index = open_or_create_tantivy_index(&tantivy_dir)?;
395 let tantivy_reader = tantivy_index.reader()?;
396 let usearch_index = open_or_create_usearch_index(&usearch_path)?;
397 let fields = tantivy_fields_from_schema(&tantivy_index.schema())?;
398
399 spaces.insert(
400 name.to_string(),
401 Arc::new(SpaceIndexes {
402 _tantivy_dir: tantivy_dir,
403 usearch_path,
404 tantivy_index,
405 tantivy_reader,
406 tantivy_writer: Mutex::new(None),
407 usearch_index: RwLock::new(usearch_index),
408 fields,
409 }),
410 );
411
412 Ok(())
413 }
414
415 pub fn close_space(&self, name: &str) -> Result<()> {
416 let _space = self.get_space(name)?;
417 let mut spaces = self
418 .spaces
419 .write()
420 .map_err(|_| CoreError::poisoned("spaces"))?;
421 let _removed = spaces.remove(name);
422 Ok(())
423 }
424
425 pub fn create_space(&self, name: &str, description: Option<&str>) -> Result<i64> {
426 let space_id = {
427 let conn = self
428 .db
429 .lock()
430 .map_err(|_| CoreError::poisoned("database"))?;
431
432 let result = conn.execute(
433 "INSERT INTO spaces (name, description) VALUES (?1, ?2)",
434 params![name, description],
435 );
436
437 match result {
438 Ok(_) => conn.last_insert_rowid(),
439 Err(err) => {
440 return match err {
441 Error::SqliteFailure(sqlite_err, _)
442 if sqlite_err.code == ErrorCode::ConstraintViolation =>
443 {
444 Err(KboltError::SpaceAlreadyExists {
445 name: name.to_string(),
446 }
447 .into())
448 }
449 other => Err(other.into()),
450 };
451 }
452 }
453 };
454
455 if let Err(open_err) = self.open_space(name) {
456 let rollback_result = self
457 .db
458 .lock()
459 .map_err(|_| CoreError::poisoned("database"))?
460 .execute("DELETE FROM spaces WHERE id = ?1", params![space_id]);
461
462 if let Err(rollback_err) = rollback_result {
463 return Err(CoreError::Internal(format!(
464 "failed to provision indexes for space '{name}': {open_err}; rollback failed: {rollback_err}"
465 )));
466 }
467
468 return Err(open_err);
469 }
470
471 Ok(space_id)
472 }
473
474 pub fn get_space(&self, name: &str) -> Result<SpaceRow> {
475 let conn = self
476 .db
477 .lock()
478 .map_err(|_| CoreError::poisoned("database"))?;
479 let mut stmt =
480 conn.prepare("SELECT id, name, description, created FROM spaces WHERE name = ?1")?;
481
482 let row = stmt.query_row(params![name], decode_space_row);
483
484 match row {
485 Ok(space) => Ok(space),
486 Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
487 name: name.to_string(),
488 }
489 .into()),
490 Err(err) => Err(err.into()),
491 }
492 }
493
494 pub fn get_space_by_id(&self, id: i64) -> Result<SpaceRow> {
495 let conn = self
496 .db
497 .lock()
498 .map_err(|_| CoreError::poisoned("database"))?;
499 let mut stmt =
500 conn.prepare("SELECT id, name, description, created FROM spaces WHERE id = ?1")?;
501
502 let row = stmt.query_row(params![id], decode_space_row);
503
504 match row {
505 Ok(space) => Ok(space),
506 Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
507 name: format!("id={id}"),
508 }
509 .into()),
510 Err(err) => Err(err.into()),
511 }
512 }
513
514 pub fn list_spaces(&self) -> Result<Vec<SpaceRow>> {
515 let conn = self
516 .db
517 .lock()
518 .map_err(|_| CoreError::poisoned("database"))?;
519 let mut stmt = conn.prepare(
520 "SELECT id, name, description, created
521 FROM spaces
522 ORDER BY name ASC",
523 )?;
524
525 let rows = stmt.query_map([], decode_space_row)?;
526
527 let spaces = rows.collect::<std::result::Result<Vec<_>, _>>()?;
528 Ok(spaces)
529 }
530
531 pub fn find_space_for_collection(&self, collection: &str) -> Result<SpaceResolution> {
532 let conn = self
533 .db
534 .lock()
535 .map_err(|_| CoreError::poisoned("database"))?;
536 let mut stmt = conn.prepare(
537 "SELECT s.id, s.name, s.description, s.created
538 FROM spaces s
539 JOIN collections c ON c.space_id = s.id
540 WHERE c.name = ?1
541 ORDER BY s.name ASC",
542 )?;
543 let rows = stmt.query_map(params![collection], decode_space_row)?;
544 let matches = rows.collect::<std::result::Result<Vec<_>, _>>()?;
545
546 if matches.is_empty() {
547 return Ok(SpaceResolution::NotFound);
548 }
549
550 if matches.len() == 1 {
551 return Ok(SpaceResolution::Found(matches[0].clone()));
552 }
553
554 let spaces = matches.into_iter().map(|space| space.name).collect();
555 Ok(SpaceResolution::Ambiguous(spaces))
556 }
557
558 pub fn delete_space(&self, name: &str) -> Result<()> {
559 if name == DEFAULT_SPACE_NAME {
560 let conn = self
561 .db
562 .lock()
563 .map_err(|_| CoreError::poisoned("database"))?;
564 conn.execute(
565 "DELETE FROM collections
566 WHERE space_id = (SELECT id FROM spaces WHERE name = ?1)",
567 params![name],
568 )?;
569 drop(conn);
570
571 self.unload_space(name)?;
572 self.remove_space_artifacts(name)?;
573 self.open_space(name)?;
574 return Ok(());
575 }
576
577 let conn = self
578 .db
579 .lock()
580 .map_err(|_| CoreError::poisoned("database"))?;
581 let deleted = conn.execute("DELETE FROM spaces WHERE name = ?1", params![name])?;
582 drop(conn);
583
584 if deleted == 0 {
585 return Err(KboltError::SpaceNotFound {
586 name: name.to_string(),
587 }
588 .into());
589 }
590
591 self.unload_space(name)?;
592 self.remove_space_artifacts(name)?;
593
594 Ok(())
595 }
596
/// Renames space `old` to `new`, moving its on-disk artifacts as well.
///
/// The row is renamed first, then the artifacts. If moving the artifacts fails,
/// the row rename is rolled back. If reopening the indexes under `new` fails,
/// a best-effort rollback (artifacts, row, reopen `old`) is attempted and its
/// own errors are ignored.
///
/// # Errors
/// `Config` when renaming the reserved default space; `SpaceNotFound` if `old`
/// does not exist; `SpaceAlreadyExists` if `new` is taken; an `Internal` error
/// when both the artifact move and its rollback fail; database, lock-poisoning
/// or filesystem errors otherwise.
pub fn rename_space(&self, old: &str, new: &str) -> Result<()> {
    if old == DEFAULT_SPACE_NAME {
        return Err(
            KboltError::Config("cannot rename reserved space: default".to_string()).into(),
        );
    }

    let conn = self
        .db
        .lock()
        .map_err(|_| CoreError::poisoned("database"))?;

    let result = conn.execute(
        "UPDATE spaces SET name = ?1 WHERE name = ?2",
        params![new, old],
    );
    // Release the DB lock before the artifact/index work below, which may lock it again.
    drop(conn);

    match result {
        Ok(0) => Err(KboltError::SpaceNotFound {
            name: old.to_string(),
        }
        .into()),
        Ok(_) => {
            if let Err(rename_err) = self.rename_space_artifacts(old, new) {
                // Undo the row rename so the DB matches the unchanged artifacts.
                let rollback = self
                    .db
                    .lock()
                    .map_err(|_| CoreError::poisoned("database"))?
                    .execute(
                        "UPDATE spaces SET name = ?1 WHERE name = ?2",
                        params![old, new],
                    );

                if let Err(rollback_err) = rollback {
                    return Err(CoreError::Internal(format!(
                        "failed to rename space artifacts from '{old}' to '{new}': {rename_err}; rollback failed: {rollback_err}"
                    )));
                }

                return Err(rename_err);
            }

            // NOTE(review): the old space's indexes are still loaded while their
            // artifacts are renamed above, and an `unload_space` failure here
            // returns without any rollback — confirm this ordering is intended.
            self.unload_space(old)?;
            if let Err(open_err) = self.open_space(new) {
                // Best-effort rollback; secondary failures are deliberately ignored
                // so the caller sees the original open error.
                let _ = self.rename_space_artifacts(new, old);
                let _ = self
                    .db
                    .lock()
                    .map_err(|_| CoreError::poisoned("database"))?
                    .execute(
                        "UPDATE spaces SET name = ?1 WHERE name = ?2",
                        params![old, new],
                    );
                let _ = self.open_space(old);
                return Err(open_err);
            }

            Ok(())
        }
        // UNIQUE(name) on `spaces` rejects renaming onto an existing space.
        Err(Error::SqliteFailure(sqlite_err, _))
            if sqlite_err.code == ErrorCode::ConstraintViolation =>
        {
            Err(KboltError::SpaceAlreadyExists {
                name: new.to_string(),
            }
            .into())
        }
        Err(err) => Err(err.into()),
    }
}
668
669 pub fn update_space_description(&self, name: &str, description: &str) -> Result<()> {
670 let conn = self
671 .db
672 .lock()
673 .map_err(|_| CoreError::poisoned("database"))?;
674
675 let updated = conn.execute(
676 "UPDATE spaces SET description = ?1 WHERE name = ?2",
677 params![description, name],
678 )?;
679
680 if updated == 0 {
681 return Err(KboltError::SpaceNotFound {
682 name: name.to_string(),
683 }
684 .into());
685 }
686
687 Ok(())
688 }
689
690 pub fn create_collection(
691 &self,
692 space_id: i64,
693 name: &str,
694 path: &Path,
695 description: Option<&str>,
696 extensions: Option<&[String]>,
697 ) -> Result<i64> {
698 let conn = self
699 .db
700 .lock()
701 .map_err(|_| CoreError::poisoned("database"))?;
702
703 let space_name = lookup_space_name(&conn, space_id)?;
704 let extensions_json = serialize_extensions(extensions)?;
705 let result = conn.execute(
706 "INSERT INTO collections (space_id, name, path, description, extensions, created, updated)
707 VALUES (?1, ?2, ?3, ?4, ?5, strftime('%Y-%m-%dT%H:%M:%SZ','now'), strftime('%Y-%m-%dT%H:%M:%SZ','now'))",
708 params![space_id, name, path.to_string_lossy(), description, extensions_json],
709 );
710
711 match result {
712 Ok(_) => Ok(conn.last_insert_rowid()),
713 Err(Error::SqliteFailure(sqlite_err, _))
714 if sqlite_err.code == ErrorCode::ConstraintViolation =>
715 {
716 Err(KboltError::CollectionAlreadyExists {
717 name: name.to_string(),
718 space: space_name,
719 }
720 .into())
721 }
722 Err(err) => Err(err.into()),
723 }
724 }
725
726 pub fn get_collection(&self, space_id: i64, name: &str) -> Result<CollectionRow> {
727 let conn = self
728 .db
729 .lock()
730 .map_err(|_| CoreError::poisoned("database"))?;
731 let mut stmt = conn.prepare(
732 "SELECT id, space_id, name, path, description, extensions, created, updated
733 FROM collections
734 WHERE space_id = ?1 AND name = ?2",
735 )?;
736
737 let result = stmt.query_row(params![space_id, name], decode_collection_row);
738 match result {
739 Ok(row) => Ok(row),
740 Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
741 name: name.to_string(),
742 }
743 .into()),
744 Err(err) => Err(err.into()),
745 }
746 }
747
748 pub fn get_collection_by_id(&self, id: i64) -> Result<CollectionRow> {
749 let conn = self
750 .db
751 .lock()
752 .map_err(|_| CoreError::poisoned("database"))?;
753 let mut stmt = conn.prepare(
754 "SELECT id, space_id, name, path, description, extensions, created, updated
755 FROM collections
756 WHERE id = ?1",
757 )?;
758
759 let result = stmt.query_row(params![id], decode_collection_row);
760 match result {
761 Ok(row) => Ok(row),
762 Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
763 name: format!("id={id}"),
764 }
765 .into()),
766 Err(err) => Err(err.into()),
767 }
768 }
769
770 pub fn list_collections(&self, space_id: Option<i64>) -> Result<Vec<CollectionRow>> {
771 let conn = self
772 .db
773 .lock()
774 .map_err(|_| CoreError::poisoned("database"))?;
775
776 let (sql, params): (&str, Vec<i64>) = match space_id {
777 Some(id) => (
778 "SELECT id, space_id, name, path, description, extensions, created, updated
779 FROM collections
780 WHERE space_id = ?1
781 ORDER BY name ASC",
782 vec![id],
783 ),
784 None => (
785 "SELECT id, space_id, name, path, description, extensions, created, updated
786 FROM collections
787 ORDER BY space_id ASC, name ASC",
788 Vec::new(),
789 ),
790 };
791
792 let mut stmt = conn.prepare(sql)?;
793 let rows = if params.is_empty() {
794 stmt.query_map([], decode_collection_row)?
795 } else {
796 stmt.query_map(params![params[0]], decode_collection_row)?
797 };
798 let collections = rows.collect::<std::result::Result<Vec<_>, _>>()?;
799 Ok(collections)
800 }
801
802 pub fn count_collections_in_space(&self, space_id: i64) -> Result<usize> {
803 let conn = self
804 .db
805 .lock()
806 .map_err(|_| CoreError::poisoned("database"))?;
807 let _space_name = lookup_space_name(&conn, space_id)?;
808
809 query_count(
810 &conn,
811 "SELECT COUNT(*) FROM collections WHERE space_id = ?1",
812 params![space_id],
813 )
814 }
815
816 pub fn delete_collection(&self, space_id: i64, name: &str) -> Result<()> {
817 let conn = self
818 .db
819 .lock()
820 .map_err(|_| CoreError::poisoned("database"))?;
821 let _space_name = lookup_space_name(&conn, space_id)?;
822
823 let deleted = conn.execute(
824 "DELETE FROM collections WHERE space_id = ?1 AND name = ?2",
825 params![space_id, name],
826 )?;
827
828 if deleted == 0 {
829 return Err(KboltError::CollectionNotFound {
830 name: name.to_string(),
831 }
832 .into());
833 }
834
835 Ok(())
836 }
837
838 pub fn rename_collection(&self, space_id: i64, old: &str, new: &str) -> Result<()> {
839 let conn = self
840 .db
841 .lock()
842 .map_err(|_| CoreError::poisoned("database"))?;
843 let space_name = lookup_space_name(&conn, space_id)?;
844 let result = conn.execute(
845 "UPDATE collections
846 SET name = ?1, updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
847 WHERE space_id = ?2 AND name = ?3",
848 params![new, space_id, old],
849 );
850
851 match result {
852 Ok(0) => Err(KboltError::CollectionNotFound {
853 name: old.to_string(),
854 }
855 .into()),
856 Ok(_) => Ok(()),
857 Err(Error::SqliteFailure(sqlite_err, _))
858 if sqlite_err.code == ErrorCode::ConstraintViolation =>
859 {
860 Err(KboltError::CollectionAlreadyExists {
861 name: new.to_string(),
862 space: space_name,
863 }
864 .into())
865 }
866 Err(err) => Err(err.into()),
867 }
868 }
869
870 pub fn update_collection_description(
871 &self,
872 space_id: i64,
873 name: &str,
874 desc: &str,
875 ) -> Result<()> {
876 let conn = self
877 .db
878 .lock()
879 .map_err(|_| CoreError::poisoned("database"))?;
880 let _space_name = lookup_space_name(&conn, space_id)?;
881
882 let updated = conn.execute(
883 "UPDATE collections
884 SET description = ?1, updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
885 WHERE space_id = ?2 AND name = ?3",
886 params![desc, space_id, name],
887 )?;
888
889 if updated == 0 {
890 return Err(KboltError::CollectionNotFound {
891 name: name.to_string(),
892 }
893 .into());
894 }
895
896 Ok(())
897 }
898
899 pub fn update_collection_timestamp(&self, collection_id: i64) -> Result<()> {
900 let conn = self
901 .db
902 .lock()
903 .map_err(|_| CoreError::poisoned("database"))?;
904
905 let updated = conn.execute(
906 "UPDATE collections
907 SET updated = strftime('%Y-%m-%dT%H:%M:%SZ','now')
908 WHERE id = ?1",
909 params![collection_id],
910 )?;
911
912 if updated == 0 {
913 return Err(KboltError::CollectionNotFound {
914 name: format!("id={collection_id}"),
915 }
916 .into());
917 }
918
919 Ok(())
920 }
921
922 pub fn upsert_document(
923 &self,
924 collection_id: i64,
925 path: &str,
926 title: &str,
927 title_source: DocumentTitleSource,
928 hash: &str,
929 modified: &str,
930 ) -> Result<i64> {
931 let conn = self
932 .db
933 .lock()
934 .map_err(|_| CoreError::poisoned("database"))?;
935 let _collection_name = lookup_collection_name(&conn, collection_id)?;
936
937 let id = conn.query_row(
938 "INSERT INTO documents (collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty)
939 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 1, NULL, 1)
940 ON CONFLICT(collection_id, path) DO UPDATE SET
941 title = excluded.title,
942 title_source = excluded.title_source,
943 hash = excluded.hash,
944 modified = excluded.modified,
945 active = 1,
946 deactivated_at = NULL,
947 fts_dirty = 1
948 RETURNING id",
949 params![
950 collection_id,
951 path,
952 title,
953 title_source.as_sql(),
954 hash,
955 modified
956 ],
957 |row| row.get(0),
958 )?;
959 Ok(id)
960 }
961
962 pub fn get_document_by_path(
963 &self,
964 collection_id: i64,
965 path: &str,
966 ) -> Result<Option<DocumentRow>> {
967 let conn = self
968 .db
969 .lock()
970 .map_err(|_| CoreError::poisoned("database"))?;
971 let _collection_name = lookup_collection_name(&conn, collection_id)?;
972 let mut stmt = conn.prepare(
973 "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
974 FROM documents
975 WHERE collection_id = ?1 AND path = ?2",
976 )?;
977
978 let result = stmt.query_row(params![collection_id, path], decode_document_row);
979 match result {
980 Ok(row) => Ok(Some(row)),
981 Err(Error::QueryReturnedNoRows) => Ok(None),
982 Err(err) => Err(err.into()),
983 }
984 }
985
986 pub fn get_document_by_id(&self, id: i64) -> Result<DocumentRow> {
987 let conn = self
988 .db
989 .lock()
990 .map_err(|_| CoreError::poisoned("database"))?;
991 let mut stmt = conn.prepare(
992 "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
993 FROM documents
994 WHERE id = ?1",
995 )?;
996
997 let result = stmt.query_row(params![id], decode_document_row);
998 match result {
999 Ok(row) => Ok(row),
1000 Err(Error::QueryReturnedNoRows) => Err(KboltError::DocumentNotFound {
1001 path: format!("id={id}"),
1002 }
1003 .into()),
1004 Err(err) => Err(err.into()),
1005 }
1006 }
1007
1008 pub fn get_documents_by_ids(&self, ids: &[i64]) -> Result<Vec<DocumentRow>> {
1009 if ids.is_empty() {
1010 return Ok(Vec::new());
1011 }
1012
1013 let conn = self
1014 .db
1015 .lock()
1016 .map_err(|_| CoreError::poisoned("database"))?;
1017
1018 let placeholders = vec!["?"; ids.len()].join(", ");
1019 let sql = format!(
1020 "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
1021 FROM documents
1022 WHERE id IN ({placeholders})"
1023 );
1024 let mut stmt = conn.prepare(&sql)?;
1025 let rows = stmt.query_map(params_from_iter(ids.iter()), decode_document_row)?;
1026 let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1027 Ok(docs)
1028 }
1029
1030 pub fn refresh_document_activity(&self, doc_id: i64, modified: &str) -> Result<()> {
1031 let conn = self
1032 .db
1033 .lock()
1034 .map_err(|_| CoreError::poisoned("database"))?;
1035
1036 let updated = conn.execute(
1037 "UPDATE documents
1038 SET modified = ?1,
1039 active = 1,
1040 deactivated_at = NULL
1041 WHERE id = ?2",
1042 params![modified, doc_id],
1043 )?;
1044
1045 if updated == 0 {
1046 return Err(KboltError::DocumentNotFound {
1047 path: format!("id={doc_id}"),
1048 }
1049 .into());
1050 }
1051
1052 Ok(())
1053 }
1054
1055 pub fn list_documents(
1056 &self,
1057 collection_id: i64,
1058 active_only: bool,
1059 ) -> Result<Vec<DocumentRow>> {
1060 let conn = self
1061 .db
1062 .lock()
1063 .map_err(|_| CoreError::poisoned("database"))?;
1064 let _collection_name = lookup_collection_name(&conn, collection_id)?;
1065 let active_only = i64::from(active_only);
1066 let mut stmt = conn.prepare(
1067 "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
1068 FROM documents
1069 WHERE collection_id = ?1
1070 AND (?2 = 0 OR active = 1)
1071 ORDER BY path ASC",
1072 )?;
1073 let rows = stmt.query_map(params![collection_id, active_only], decode_document_row)?;
1074 let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1075 Ok(docs)
1076 }
1077
1078 pub fn list_collection_file_rows(
1079 &self,
1080 collection_id: i64,
1081 active_only: bool,
1082 ) -> Result<Vec<FileListRow>> {
1083 let conn = self
1084 .db
1085 .lock()
1086 .map_err(|_| CoreError::poisoned("database"))?;
1087 let _collection_name = lookup_collection_name(&conn, collection_id)?;
1088 let active_only = i64::from(active_only);
1089 let mut stmt = conn.prepare(
1090 "SELECT d.id, d.path, d.title, d.hash, d.active,
1091 COUNT(DISTINCT c.id) AS chunk_count,
1092 COUNT(DISTINCT e.chunk_id) AS embedded_chunk_count
1093 FROM documents d
1094 LEFT JOIN chunks c ON c.doc_id = d.id
1095 LEFT JOIN embeddings e ON e.chunk_id = c.id
1096 WHERE d.collection_id = ?1
1097 AND (?2 = 0 OR d.active = 1)
1098 GROUP BY d.id, d.path, d.title, d.hash, d.active
1099 ORDER BY d.path ASC",
1100 )?;
1101 let rows = stmt.query_map(params![collection_id, active_only], |row| {
1102 let chunk_count: i64 = row.get(5)?;
1103 let embedded_chunk_count: i64 = row.get(6)?;
1104 Ok(FileListRow {
1105 doc_id: row.get(0)?,
1106 path: row.get(1)?,
1107 title: row.get(2)?,
1108 hash: row.get(3)?,
1109 active: row.get::<_, i64>(4)? != 0,
1110 chunk_count: chunk_count as usize,
1111 embedded_chunk_count: embedded_chunk_count as usize,
1112 })
1113 })?;
1114 let files = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1115 Ok(files)
1116 }
1117
1118 pub fn get_document_by_hash_prefix(&self, prefix: &str) -> Result<Vec<DocumentRow>> {
1119 let conn = self
1120 .db
1121 .lock()
1122 .map_err(|_| CoreError::poisoned("database"))?;
1123
1124 let pattern = format!("{prefix}%");
1125 let mut stmt = conn.prepare(
1126 "SELECT id, collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty
1127 FROM documents
1128 WHERE hash LIKE ?1
1129 ORDER BY id ASC",
1130 )?;
1131 let rows = stmt.query_map(params![pattern], decode_document_row)?;
1132 let docs = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1133 Ok(docs)
1134 }
1135
1136 pub fn deactivate_document(&self, doc_id: i64) -> Result<()> {
1137 let conn = self
1138 .db
1139 .lock()
1140 .map_err(|_| CoreError::poisoned("database"))?;
1141
1142 let updated = conn.execute(
1143 "UPDATE documents
1144 SET active = 0,
1145 deactivated_at = CASE
1146 WHEN active = 1 THEN strftime('%Y-%m-%dT%H:%M:%SZ','now')
1147 ELSE deactivated_at
1148 END
1149 WHERE id = ?1",
1150 params![doc_id],
1151 )?;
1152
1153 if updated == 0 {
1154 return Err(KboltError::DocumentNotFound {
1155 path: format!("id={doc_id}"),
1156 }
1157 .into());
1158 }
1159
1160 Ok(())
1161 }
1162
1163 pub fn reactivate_document(&self, doc_id: i64) -> Result<()> {
1164 let conn = self
1165 .db
1166 .lock()
1167 .map_err(|_| CoreError::poisoned("database"))?;
1168
1169 let updated = conn.execute(
1170 "UPDATE documents
1171 SET active = 1, deactivated_at = NULL
1172 WHERE id = ?1",
1173 params![doc_id],
1174 )?;
1175
1176 if updated == 0 {
1177 return Err(KboltError::DocumentNotFound {
1178 path: format!("id={doc_id}"),
1179 }
1180 .into());
1181 }
1182
1183 Ok(())
1184 }
1185
1186 pub fn reap_documents(&self, older_than_days: u32) -> Result<Vec<i64>> {
1187 let reaped = self.list_reapable_documents(older_than_days)?;
1188 let doc_ids = reaped.iter().map(|item| item.doc_id).collect::<Vec<_>>();
1189 self.delete_documents(&doc_ids)?;
1190 Ok(doc_ids)
1191 }
1192
1193 pub fn list_reapable_documents(&self, older_than_days: u32) -> Result<Vec<ReapableDocument>> {
1194 self.list_reapable_documents_filtered(older_than_days, "", Vec::new())
1195 }
1196
1197 pub fn list_reapable_documents_in_space(
1198 &self,
1199 older_than_days: u32,
1200 space_id: i64,
1201 ) -> Result<Vec<ReapableDocument>> {
1202 self.list_reapable_documents_filtered(
1203 older_than_days,
1204 " AND c.space_id = ?",
1205 vec![SqlValue::Integer(space_id)],
1206 )
1207 }
1208
1209 pub fn list_reapable_documents_in_collections(
1210 &self,
1211 older_than_days: u32,
1212 collection_ids: &[i64],
1213 ) -> Result<Vec<ReapableDocument>> {
1214 if collection_ids.is_empty() {
1215 return Ok(Vec::new());
1216 }
1217
1218 let placeholders = vec!["?"; collection_ids.len()].join(", ");
1219 let clause = format!(" AND d.collection_id IN ({placeholders})");
1220 let params = collection_ids
1221 .iter()
1222 .map(|id| SqlValue::Integer(*id))
1223 .collect::<Vec<_>>();
1224 self.list_reapable_documents_filtered(older_than_days, &clause, params)
1225 }
1226
1227 fn list_reapable_documents_filtered(
1228 &self,
1229 older_than_days: u32,
1230 scope_clause: &str,
1231 scope_params: Vec<SqlValue>,
1232 ) -> Result<Vec<ReapableDocument>> {
1233 let conn = self
1234 .db
1235 .lock()
1236 .map_err(|_| CoreError::poisoned("database"))?;
1237
1238 let modifier = format!("-{} days", older_than_days);
1239 let sql = format!(
1240 "SELECT d.id, s.name
1241 FROM documents d
1242 JOIN collections c ON c.id = d.collection_id
1243 JOIN spaces s ON s.id = c.space_id
1244 WHERE d.active = 0
1245 AND d.deactivated_at IS NOT NULL
1246 AND d.deactivated_at <= strftime('%Y-%m-%dT%H:%M:%SZ', 'now', ?){scope_clause}
1247 ORDER BY d.id ASC"
1248 );
1249 let mut stmt = conn.prepare(&sql)?;
1250 let mut params = Vec::with_capacity(scope_params.len() + 1);
1251 params.push(SqlValue::Text(modifier));
1252 params.extend(scope_params);
1253 let rows = stmt.query_map(params_from_iter(params.iter()), |row| {
1254 Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1255 })?;
1256 let headers = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1257 drop(stmt);
1258
1259 let mut documents = Vec::with_capacity(headers.len());
1260 for (doc_id, space_name) in headers {
1261 documents.push(ReapableDocument {
1262 doc_id,
1263 space_name,
1264 chunk_ids: load_chunk_ids_for_doc(&conn, doc_id)?,
1265 });
1266 }
1267
1268 Ok(documents)
1269 }
1270
1271 pub fn delete_documents(&self, doc_ids: &[i64]) -> Result<()> {
1272 if doc_ids.is_empty() {
1273 return Ok(());
1274 }
1275
1276 let conn = self
1277 .db
1278 .lock()
1279 .map_err(|_| CoreError::poisoned("database"))?;
1280
1281 let placeholders = vec!["?"; doc_ids.len()].join(", ");
1282 let sql = format!("DELETE FROM documents WHERE id IN ({placeholders})");
1283 conn.execute(&sql, params_from_iter(doc_ids.iter()))?;
1284 Ok(())
1285 }
1286
1287 pub fn insert_chunks(&self, doc_id: i64, chunks: &[ChunkInsert]) -> Result<Vec<i64>> {
1288 if chunks.is_empty() {
1289 return Ok(Vec::new());
1290 }
1291
1292 let conn = self
1293 .db
1294 .lock()
1295 .map_err(|_| CoreError::poisoned("database"))?;
1296 let _doc = lookup_document_id(&conn, doc_id)?;
1297
1298 let tx = conn.unchecked_transaction()?;
1299 let mut stmt = tx.prepare(
1300 "INSERT INTO chunks (doc_id, seq, offset, length, heading, kind, retrieval_prefix)
1301 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1302 )?;
1303
1304 let mut ids = Vec::with_capacity(chunks.len());
1305 for chunk in chunks {
1306 stmt.execute(params![
1307 doc_id,
1308 chunk.seq,
1309 chunk.offset as i64,
1310 chunk.length as i64,
1311 chunk.heading,
1312 chunk.kind.as_storage_kind(),
1313 chunk.retrieval_prefix.as_deref(),
1314 ])?;
1315 ids.push(tx.last_insert_rowid());
1316 }
1317 drop(stmt);
1318 tx.commit()?;
1319 Ok(ids)
1320 }
1321
1322 pub fn delete_chunks_for_document(&self, doc_id: i64) -> Result<Vec<i64>> {
1323 let conn = self
1324 .db
1325 .lock()
1326 .map_err(|_| CoreError::poisoned("database"))?;
1327 let _doc = lookup_document_id(&conn, doc_id)?;
1328
1329 let mut stmt = conn.prepare("SELECT id FROM chunks WHERE doc_id = ?1 ORDER BY seq ASC")?;
1330 let rows = stmt.query_map(params![doc_id], |row| row.get::<_, i64>(0))?;
1331 let chunk_ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1332
1333 conn.execute("DELETE FROM chunks WHERE doc_id = ?1", params![doc_id])?;
1334 Ok(chunk_ids)
1335 }
1336
1337 pub fn get_chunks_for_document(&self, doc_id: i64) -> Result<Vec<ChunkRow>> {
1338 let conn = self
1339 .db
1340 .lock()
1341 .map_err(|_| CoreError::poisoned("database"))?;
1342 let _doc = lookup_document_id(&conn, doc_id)?;
1343
1344 let mut stmt = conn.prepare(
1345 "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1346 FROM chunks
1347 WHERE doc_id = ?1
1348 ORDER BY seq ASC",
1349 )?;
1350 let rows = stmt.query_map(params![doc_id], decode_chunk_row)?;
1351 let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1352 Ok(chunks)
1353 }
1354
1355 pub fn get_chunks_for_documents(&self, doc_ids: &[i64]) -> Result<HashMap<i64, Vec<ChunkRow>>> {
1356 if doc_ids.is_empty() {
1357 return Ok(HashMap::new());
1358 }
1359
1360 let mut requested = doc_ids.to_vec();
1361 requested.sort_unstable();
1362 requested.dedup();
1363
1364 let conn = self
1365 .db
1366 .lock()
1367 .map_err(|_| CoreError::poisoned("database"))?;
1368
1369 let placeholders = vec!["?"; requested.len()].join(", ");
1370 let sql = format!(
1371 "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1372 FROM chunks
1373 WHERE doc_id IN ({placeholders})
1374 ORDER BY doc_id ASC, seq ASC"
1375 );
1376 let mut stmt = conn.prepare(&sql)?;
1377 let rows = stmt.query_map(params_from_iter(requested.iter()), decode_chunk_row)?;
1378 let mut chunks_by_doc: HashMap<i64, Vec<ChunkRow>> = HashMap::new();
1379 for doc_id in requested {
1380 chunks_by_doc.insert(doc_id, Vec::new());
1381 }
1382 for row in rows {
1383 let chunk = row?;
1384 chunks_by_doc.entry(chunk.doc_id).or_default().push(chunk);
1385 }
1386
1387 Ok(chunks_by_doc)
1388 }
1389
1390 pub fn get_chunks_for_document_seq_ranges(
1391 &self,
1392 ranges: &[(i64, i32, i32)],
1393 ) -> Result<HashMap<i64, Vec<ChunkRow>>> {
1394 if ranges.is_empty() {
1395 return Ok(HashMap::new());
1396 }
1397
1398 let mut ranges_by_doc: HashMap<i64, Vec<(i32, i32)>> = HashMap::new();
1399 for (doc_id, min_seq, max_seq) in ranges {
1400 if min_seq > max_seq {
1401 return Err(CoreError::Internal(format!(
1402 "chunk seq range min must be <= max for doc {doc_id}: {min_seq} > {max_seq}"
1403 )));
1404 }
1405
1406 ranges_by_doc
1407 .entry(*doc_id)
1408 .or_default()
1409 .push((*min_seq, *max_seq));
1410 }
1411
1412 for doc_ranges in ranges_by_doc.values_mut() {
1413 doc_ranges.sort_unstable();
1414 let mut merged: Vec<(i32, i32)> = Vec::with_capacity(doc_ranges.len());
1415 for (min_seq, max_seq) in doc_ranges.drain(..) {
1416 if let Some((_, merged_max)) = merged.last_mut() {
1417 if min_seq <= merged_max.saturating_add(1) {
1418 *merged_max = (*merged_max).max(max_seq);
1419 continue;
1420 }
1421 }
1422 merged.push((min_seq, max_seq));
1423 }
1424 *doc_ranges = merged;
1425 }
1426
1427 let conn = self
1428 .db
1429 .lock()
1430 .map_err(|_| CoreError::poisoned("database"))?;
1431 let mut stmt = conn.prepare(
1432 "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1433 FROM chunks
1434 WHERE doc_id = ?1 AND seq BETWEEN ?2 AND ?3
1435 ORDER BY seq ASC",
1436 )?;
1437
1438 let mut chunks_by_doc: HashMap<i64, Vec<ChunkRow>> = HashMap::new();
1439 for (doc_id, doc_ranges) in ranges_by_doc {
1440 chunks_by_doc.entry(doc_id).or_default();
1441 for (min_seq, max_seq) in doc_ranges {
1442 let rows = stmt.query_map(params![doc_id, min_seq, max_seq], decode_chunk_row)?;
1443 for row in rows {
1444 chunks_by_doc.entry(doc_id).or_default().push(row?);
1445 }
1446 }
1447 }
1448
1449 Ok(chunks_by_doc)
1450 }
1451
1452 pub fn get_chunks(&self, chunk_ids: &[i64]) -> Result<Vec<ChunkRow>> {
1453 if chunk_ids.is_empty() {
1454 return Ok(Vec::new());
1455 }
1456
1457 let conn = self
1458 .db
1459 .lock()
1460 .map_err(|_| CoreError::poisoned("database"))?;
1461
1462 let placeholders = vec!["?"; chunk_ids.len()].join(", ");
1463 let sql = format!(
1464 "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
1465 FROM chunks
1466 WHERE id IN ({placeholders})
1467 ORDER BY id ASC"
1468 );
1469 let mut stmt = conn.prepare(&sql)?;
1470 let rows = stmt.query_map(params_from_iter(chunk_ids.iter()), decode_chunk_row)?;
1471 let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1472 Ok(chunks)
1473 }
1474
1475 pub fn get_chunks_with_documents(
1476 &self,
1477 chunk_ids: &[i64],
1478 ) -> Result<Vec<(ChunkRow, DocumentRow)>> {
1479 if chunk_ids.is_empty() {
1480 return Ok(Vec::new());
1481 }
1482
1483 let conn = self
1484 .db
1485 .lock()
1486 .map_err(|_| CoreError::poisoned("database"))?;
1487
1488 let placeholders = vec!["?"; chunk_ids.len()].join(", ");
1489 let sql = format!(
1490 "SELECT c.id, c.doc_id, c.seq, c.offset, c.length, c.heading, c.kind, c.retrieval_prefix,
1491 d.id, d.collection_id, d.path, d.title, d.title_source, d.hash, d.modified, d.active, d.deactivated_at, d.fts_dirty
1492 FROM chunks c
1493 JOIN documents d ON d.id = c.doc_id
1494 WHERE c.id IN ({placeholders})
1495 ORDER BY c.id ASC"
1496 );
1497 let mut stmt = conn.prepare(&sql)?;
1498 let rows = stmt.query_map(params_from_iter(chunk_ids.iter()), |row| {
1499 Ok((
1500 decode_chunk_row_at(row, 0)?,
1501 decode_document_row_at(row, 8)?,
1502 ))
1503 })?;
1504 let rows = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1505 Ok(rows)
1506 }
1507
1508 pub fn get_active_search_scope_summary_in_collections(
1509 &self,
1510 collection_ids: &[i64],
1511 ) -> Result<SearchScopeSummary> {
1512 if collection_ids.is_empty() {
1513 return Ok(SearchScopeSummary {
1514 document_ids: Vec::new(),
1515 chunk_count: 0,
1516 });
1517 }
1518
1519 let conn = self
1520 .db
1521 .lock()
1522 .map_err(|_| CoreError::poisoned("database"))?;
1523
1524 let placeholders = vec!["?"; collection_ids.len()].join(", ");
1525 let sql = format!(
1526 "SELECT d.id, COUNT(c.id)
1527 FROM chunks c
1528 JOIN documents d ON d.id = c.doc_id
1529 WHERE d.active = 1
1530 AND d.collection_id IN ({placeholders})
1531 GROUP BY d.id
1532 ORDER BY d.id ASC"
1533 );
1534 let mut stmt = conn.prepare(&sql)?;
1535 let rows = stmt.query_map(params_from_iter(collection_ids.iter()), |row| {
1536 Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
1537 })?;
1538
1539 let mut document_ids = Vec::new();
1540 let mut chunk_count = 0usize;
1541 for row in rows {
1542 let (doc_id, doc_chunk_count) = row?;
1543 let doc_chunk_count = usize::try_from(doc_chunk_count).map_err(|_| {
1544 CoreError::Internal(format!(
1545 "active chunk count must be non-negative for document {doc_id}"
1546 ))
1547 })?;
1548 document_ids.push(doc_id);
1549 chunk_count = chunk_count.saturating_add(doc_chunk_count);
1550 }
1551
1552 Ok(SearchScopeSummary {
1553 document_ids,
1554 chunk_count,
1555 })
1556 }
1557
1558 pub fn count_active_chunks_in_collections(&self, collection_ids: &[i64]) -> Result<usize> {
1559 if collection_ids.is_empty() {
1560 return Ok(0);
1561 }
1562
1563 let conn = self
1564 .db
1565 .lock()
1566 .map_err(|_| CoreError::poisoned("database"))?;
1567
1568 let placeholders = vec!["?"; collection_ids.len()].join(", ");
1569 let sql = format!(
1570 "SELECT COUNT(*)
1571 FROM chunks c
1572 JOIN documents d ON d.id = c.doc_id
1573 WHERE d.active = 1
1574 AND d.collection_id IN ({placeholders})"
1575 );
1576 query_count(&conn, &sql, params_from_iter(collection_ids.iter()))
1577 }
1578
1579 pub fn has_inactive_documents_in_collections(&self, collection_ids: &[i64]) -> Result<bool> {
1580 if collection_ids.is_empty() {
1581 return Ok(false);
1582 }
1583
1584 let conn = self
1585 .db
1586 .lock()
1587 .map_err(|_| CoreError::poisoned("database"))?;
1588
1589 let placeholders = vec!["?"; collection_ids.len()].join(", ");
1590 let sql = format!(
1591 "SELECT EXISTS(
1592 SELECT 1
1593 FROM documents
1594 WHERE active = 0
1595 AND collection_id IN ({placeholders})
1596 )"
1597 );
1598 let exists: i64 = conn.query_row(&sql, params_from_iter(collection_ids.iter()), |row| {
1599 row.get(0)
1600 })?;
1601 Ok(exists != 0)
1602 }
1603
1604 pub fn get_active_chunk_ids_in_collections(&self, collection_ids: &[i64]) -> Result<Vec<i64>> {
1605 if collection_ids.is_empty() {
1606 return Ok(Vec::new());
1607 }
1608
1609 let conn = self
1610 .db
1611 .lock()
1612 .map_err(|_| CoreError::poisoned("database"))?;
1613
1614 let placeholders = vec!["?"; collection_ids.len()].join(", ");
1615 let sql = format!(
1616 "SELECT c.id
1617 FROM chunks c
1618 JOIN documents d ON d.id = c.doc_id
1619 WHERE d.active = 1
1620 AND d.collection_id IN ({placeholders})
1621 ORDER BY d.id ASC, c.seq ASC"
1622 );
1623 let mut stmt = conn.prepare(&sql)?;
1624 let rows = stmt.query_map(params_from_iter(collection_ids.iter()), |row| row.get(0))?;
1625 let chunk_ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1626 Ok(chunk_ids)
1627 }
1628
    /// Atomically replaces a document's generation: its metadata row, canonical text and
    /// chunk set.
    ///
    /// Steps, all inside a single transaction:
    /// 1. Upsert the `documents` row keyed on `(collection_id, path)`. An existing row
    ///    is updated, reactivated (`active = 1`, `deactivated_at = NULL`) and marked
    ///    `fts_dirty = 1`. `RETURNING id` yields the document id either way.
    /// 2. Snapshot the document's current chunk ids, ordered by `seq`.
    /// 3. Upsert the `document_texts` row for the document.
    /// 4. Delete all existing chunks and insert `replacement.chunks`.
    ///
    /// Returns the document id, the chunk ids that were removed (`old_chunk_ids`) and
    /// the new chunk ids in the same order as `replacement.chunks`.
    ///
    /// # Errors
    /// Fails before locking if `validate_text_span` rejects any chunk's
    /// `offset`/`length` against `replacement.text`. Propagates
    /// `lookup_collection_name` failures (presumably for unknown collections; confirm
    /// in the helper), a poisoned mutex, and database errors. On an early `?` return
    /// the uncommitted transaction is dropped, which rusqlite rolls back by default.
    pub fn replace_document_generation(
        &self,
        replacement: DocumentGenerationReplace<'_>,
    ) -> Result<DocumentGenerationReplaceResult> {
        // Validate every chunk span up front so no database work is done for bad input.
        for chunk in replacement.chunks {
            validate_text_span(
                replacement.text,
                chunk.offset,
                chunk.length,
                &format!("chunk seq {}", chunk.seq),
            )?;
        }

        let conn = self
            .db
            .lock()
            .map_err(|_| CoreError::poisoned("database"))?;
        // Called only for its error path; the name itself is unused.
        let _collection_name = lookup_collection_name(&conn, replacement.collection_id)?;

        let tx = conn.unchecked_transaction()?;
        // Upsert the document row; a re-ingested path is reactivated and flagged for FTS refresh.
        let doc_id: i64 = tx.query_row(
            "INSERT INTO documents (collection_id, path, title, title_source, hash, modified, active, deactivated_at, fts_dirty)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, 1, NULL, 1)
             ON CONFLICT(collection_id, path) DO UPDATE SET
                 title = excluded.title,
                 title_source = excluded.title_source,
                 hash = excluded.hash,
                 modified = excluded.modified,
                 active = 1,
                 deactivated_at = NULL,
                 fts_dirty = 1
             RETURNING id",
            params![
                replacement.collection_id,
                replacement.path,
                replacement.title,
                replacement.title_source.as_sql(),
                replacement.hash,
                replacement.modified
            ],
            |row| row.get(0),
        )?;

        // Capture the ids being replaced so the caller can purge them from derived indexes.
        let old_chunk_ids = {
            let mut stmt =
                tx.prepare("SELECT id FROM chunks WHERE doc_id = ?1 ORDER BY seq ASC")?;
            let rows = stmt.query_map(params![doc_id], |row| row.get::<_, i64>(0))?;
            rows.collect::<std::result::Result<Vec<_>, _>>()?
        };

        // Store (or overwrite) the canonical text that the chunk offsets index into.
        tx.execute(
            "INSERT INTO document_texts (doc_id, extractor_key, source_hash, text_hash, generation_key, text, created)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%Y-%m-%dT%H:%M:%SZ','now'))
             ON CONFLICT(doc_id) DO UPDATE SET
                 extractor_key = excluded.extractor_key,
                 source_hash = excluded.source_hash,
                 text_hash = excluded.text_hash,
                 generation_key = excluded.generation_key,
                 text = excluded.text,
                 created = excluded.created",
            params![
                doc_id,
                replacement.extractor_key,
                replacement.source_hash,
                replacement.text_hash,
                replacement.generation_key,
                replacement.text
            ],
        )?;

        // Replace the chunk set wholesale.
        tx.execute("DELETE FROM chunks WHERE doc_id = ?1", params![doc_id])?;

        let mut stmt = tx.prepare(
            "INSERT INTO chunks (doc_id, seq, offset, length, heading, kind, retrieval_prefix)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
        )?;
        let mut chunk_ids = Vec::with_capacity(replacement.chunks.len());
        for chunk in replacement.chunks {
            stmt.execute(params![
                doc_id,
                chunk.seq,
                chunk.offset as i64,
                chunk.length as i64,
                chunk.heading,
                chunk.kind.as_storage_kind(),
                chunk.retrieval_prefix.as_deref(),
            ])?;
            chunk_ids.push(tx.last_insert_rowid());
        }
        // The prepared statement borrows `tx`; release it before committing.
        drop(stmt);

        tx.commit()?;
        Ok(DocumentGenerationReplaceResult {
            doc_id,
            old_chunk_ids,
            chunk_ids,
        })
    }
1727
1728 pub fn put_document_text(
1729 &self,
1730 doc_id: i64,
1731 extractor_key: &str,
1732 source_hash: &str,
1733 text_hash: &str,
1734 generation_key: &str,
1735 text: &str,
1736 ) -> Result<()> {
1737 let conn = self
1738 .db
1739 .lock()
1740 .map_err(|_| CoreError::poisoned("database"))?;
1741 let _doc = lookup_document_id(&conn, doc_id)?;
1742
1743 conn.execute(
1744 "INSERT INTO document_texts (doc_id, extractor_key, source_hash, text_hash, generation_key, text, created)
1745 VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%Y-%m-%dT%H:%M:%SZ','now'))
1746 ON CONFLICT(doc_id) DO UPDATE SET
1747 extractor_key = excluded.extractor_key,
1748 source_hash = excluded.source_hash,
1749 text_hash = excluded.text_hash,
1750 generation_key = excluded.generation_key,
1751 text = excluded.text,
1752 created = excluded.created",
1753 params![
1754 doc_id,
1755 extractor_key,
1756 source_hash,
1757 text_hash,
1758 generation_key,
1759 text
1760 ],
1761 )?;
1762 Ok(())
1763 }
1764
1765 pub fn get_document_text(&self, doc_id: i64) -> Result<DocumentTextRow> {
1766 let conn = self
1767 .db
1768 .lock()
1769 .map_err(|_| CoreError::poisoned("database"))?;
1770 let _doc = lookup_document_id(&conn, doc_id)?;
1771
1772 let result = conn.query_row(
1773 "SELECT doc_id, extractor_key, source_hash, text_hash, generation_key, text, created
1774 FROM document_texts
1775 WHERE doc_id = ?1",
1776 params![doc_id],
1777 decode_document_text_row,
1778 );
1779 match result {
1780 Ok(row) => Ok(row),
1781 Err(Error::QueryReturnedNoRows) => Err(missing_document_text_error(doc_id)),
1782 Err(err) => Err(err.into()),
1783 }
1784 }
1785
1786 pub fn get_document_texts(&self, doc_ids: &[i64]) -> Result<HashMap<i64, DocumentTextRow>> {
1787 if doc_ids.is_empty() {
1788 return Ok(HashMap::new());
1789 }
1790
1791 let mut requested = doc_ids.to_vec();
1792 requested.sort_unstable();
1793 requested.dedup();
1794
1795 let text_by_doc = self.get_existing_document_texts(&requested)?;
1796 for doc_id in requested {
1797 if !text_by_doc.contains_key(&doc_id) {
1798 return Err(missing_document_text_error(doc_id));
1799 }
1800 }
1801
1802 Ok(text_by_doc)
1803 }
1804
1805 pub fn get_existing_document_texts(
1806 &self,
1807 doc_ids: &[i64],
1808 ) -> Result<HashMap<i64, DocumentTextRow>> {
1809 if doc_ids.is_empty() {
1810 return Ok(HashMap::new());
1811 }
1812
1813 let mut requested = doc_ids.to_vec();
1814 requested.sort_unstable();
1815 requested.dedup();
1816
1817 let conn = self
1818 .db
1819 .lock()
1820 .map_err(|_| CoreError::poisoned("database"))?;
1821 let mut text_by_doc = HashMap::new();
1822 for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1823 let placeholders = vec!["?"; batch.len()].join(", ");
1824 let sql = format!(
1825 "SELECT doc_id, extractor_key, source_hash, text_hash, generation_key, text, created
1826 FROM document_texts
1827 WHERE doc_id IN ({placeholders})
1828 ORDER BY doc_id ASC"
1829 );
1830 let mut stmt = conn.prepare(&sql)?;
1831 let rows = stmt.query_map(params_from_iter(batch.iter()), decode_document_text_row)?;
1832 for row in rows {
1833 let row = row?;
1834 text_by_doc.insert(row.doc_id, row);
1835 }
1836 }
1837
1838 Ok(text_by_doc)
1839 }
1840
1841 pub fn has_document_text(&self, doc_id: i64) -> Result<bool> {
1842 let conn = self
1843 .db
1844 .lock()
1845 .map_err(|_| CoreError::poisoned("database"))?;
1846 let exists: i64 = conn.query_row(
1847 "SELECT EXISTS(SELECT 1 FROM document_texts WHERE doc_id = ?1)",
1848 params![doc_id],
1849 |row| row.get(0),
1850 )?;
1851 Ok(exists != 0)
1852 }
1853
1854 pub fn has_current_document_text(&self, doc_id: i64, generation_key: &str) -> Result<bool> {
1855 let conn = self
1856 .db
1857 .lock()
1858 .map_err(|_| CoreError::poisoned("database"))?;
1859 let exists: i64 = conn.query_row(
1860 "SELECT EXISTS(
1861 SELECT 1 FROM document_texts
1862 WHERE doc_id = ?1 AND generation_key = ?2
1863 )",
1864 params![doc_id, generation_key],
1865 |row| row.get(0),
1866 )?;
1867 Ok(exists != 0)
1868 }
1869
1870 pub fn get_document_text_generation_keys(
1871 &self,
1872 doc_ids: &[i64],
1873 ) -> Result<HashMap<i64, String>> {
1874 if doc_ids.is_empty() {
1875 return Ok(HashMap::new());
1876 }
1877
1878 let mut requested = doc_ids.to_vec();
1879 requested.sort_unstable();
1880 requested.dedup();
1881
1882 let conn = self
1883 .db
1884 .lock()
1885 .map_err(|_| CoreError::poisoned("database"))?;
1886 let mut generation_by_doc = HashMap::with_capacity(requested.len());
1887 for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1888 let placeholders = vec!["?"; batch.len()].join(", ");
1889 let sql = format!(
1890 "SELECT doc_id, generation_key
1891 FROM document_texts
1892 WHERE doc_id IN ({placeholders})"
1893 );
1894 let mut stmt = conn.prepare(&sql)?;
1895 let rows = stmt.query_map(params_from_iter(batch.iter()), |row| {
1896 Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1897 })?;
1898 for row in rows {
1899 let (doc_id, generation_key) = row?;
1900 generation_by_doc.insert(doc_id, generation_key);
1901 }
1902 }
1903
1904 Ok(generation_by_doc)
1905 }
1906
1907 pub fn get_document_text_extractors(&self, doc_ids: &[i64]) -> Result<HashMap<i64, String>> {
1908 if doc_ids.is_empty() {
1909 return Ok(HashMap::new());
1910 }
1911
1912 let mut requested = doc_ids.to_vec();
1913 requested.sort_unstable();
1914 requested.dedup();
1915
1916 let conn = self
1917 .db
1918 .lock()
1919 .map_err(|_| CoreError::poisoned("database"))?;
1920 let mut extractor_by_doc = HashMap::with_capacity(requested.len());
1921 for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1922 let placeholders = vec!["?"; batch.len()].join(", ");
1923 let sql = format!(
1924 "SELECT doc_id, extractor_key
1925 FROM document_texts
1926 WHERE doc_id IN ({placeholders})"
1927 );
1928 let mut stmt = conn.prepare(&sql)?;
1929 let rows = stmt.query_map(params_from_iter(batch.iter()), |row| {
1930 Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1931 })?;
1932 for row in rows {
1933 let (doc_id, extractor_key) = row?;
1934 extractor_by_doc.insert(doc_id, extractor_key);
1935 }
1936 }
1937
1938 for doc_id in requested {
1939 if !extractor_by_doc.contains_key(&doc_id) {
1940 return Err(missing_document_text_error(doc_id));
1941 }
1942 }
1943
1944 Ok(extractor_by_doc)
1945 }
1946
1947 pub fn get_canonical_chunk_texts(&self, chunk_ids: &[i64]) -> Result<HashMap<i64, String>> {
1948 if chunk_ids.is_empty() {
1949 return Ok(HashMap::new());
1950 }
1951
1952 let mut requested = chunk_ids.to_vec();
1953 requested.sort_unstable();
1954 requested.dedup();
1955
1956 let conn = self
1957 .db
1958 .lock()
1959 .map_err(|_| CoreError::poisoned("database"))?;
1960 let mut text_by_chunk = HashMap::with_capacity(requested.len());
1961 for batch in requested.chunks(SQLITE_IN_CLAUSE_BATCH_SIZE) {
1962 let placeholders = vec!["?"; batch.len()].join(", ");
1963 let sql = format!(
1964 "SELECT c.id,
1965 c.offset,
1966 c.length,
1967 length(CAST(dt.text AS BLOB)),
1968 substr(CAST(dt.text AS BLOB), c.offset + 1, 1),
1969 substr(CAST(dt.text AS BLOB), c.offset + c.length + 1, 1),
1970 substr(CAST(dt.text AS BLOB), c.offset + 1, c.length)
1971 FROM chunks c
1972 JOIN document_texts dt ON dt.doc_id = c.doc_id
1973 WHERE c.id IN ({placeholders})"
1974 );
1975 let mut stmt = conn.prepare(&sql)?;
1976 let rows = stmt.query_map(params_from_iter(batch.iter()), |row| {
1977 Ok((
1978 row.get::<_, i64>(0)?,
1979 row.get::<_, i64>(1)?,
1980 row.get::<_, i64>(2)?,
1981 row.get::<_, i64>(3)?,
1982 row.get::<_, Vec<u8>>(4)?,
1983 row.get::<_, Vec<u8>>(5)?,
1984 row.get::<_, Vec<u8>>(6)?,
1985 ))
1986 })?;
1987 for row in rows {
1988 let (chunk_id, offset, length, document_len, start_byte, end_byte, bytes) = row?;
1989 let text = canonical_chunk_slice_from_bytes(
1990 chunk_id,
1991 offset,
1992 length,
1993 document_len,
1994 start_byte,
1995 end_byte,
1996 bytes,
1997 )?;
1998 text_by_chunk.insert(chunk_id, text);
1999 }
2000 }
2001
2002 for chunk_id in requested {
2003 if !text_by_chunk.contains_key(&chunk_id) {
2004 return Err(CoreError::Internal(format!(
2005 "canonical text missing for chunk {chunk_id}"
2006 )));
2007 }
2008 }
2009
2010 Ok(text_by_chunk)
2011 }
2012
2013 pub fn get_chunk_text(&self, chunk_id: i64) -> Result<ChunkTextRow> {
2014 let conn = self
2015 .db
2016 .lock()
2017 .map_err(|_| CoreError::poisoned("database"))?;
2018 let result = conn.query_row(
2019 "SELECT c.id, c.doc_id, c.seq, c.offset, c.length, c.heading, c.kind, c.retrieval_prefix, dt.extractor_key, dt.text
2020 FROM chunks c
2021 JOIN document_texts dt ON dt.doc_id = c.doc_id
2022 WHERE c.id = ?1",
2023 params![chunk_id],
2024 |row| {
2025 let chunk = decode_chunk_row(row)?;
2026 let extractor_key: String = row.get(8)?;
2027 let document_text: String = row.get(9)?;
2028 Ok((chunk, extractor_key, document_text))
2029 },
2030 );
2031 let (chunk, extractor_key, document_text) = match result {
2032 Ok(row) => row,
2033 Err(Error::QueryReturnedNoRows) => {
2034 let doc_id = lookup_chunk_doc_id(&conn, chunk_id)?;
2035 return Err(missing_document_text_error(doc_id));
2036 }
2037 Err(err) => return Err(err.into()),
2038 };
2039 let text = chunk_text_from_canonical(&document_text, &chunk)?;
2040 Ok(ChunkTextRow {
2041 chunk,
2042 extractor_key,
2043 text,
2044 })
2045 }
2046
2047 pub fn insert_embeddings(&self, entries: &[(i64, &str)]) -> Result<()> {
2048 if entries.is_empty() {
2049 return Ok(());
2050 }
2051
2052 let conn = self
2053 .db
2054 .lock()
2055 .map_err(|_| CoreError::poisoned("database"))?;
2056
2057 let tx = conn.unchecked_transaction()?;
2058 let mut stmt = tx.prepare(
2059 "INSERT INTO embeddings (chunk_id, model, embedded_at)
2060 VALUES (?1, ?2, strftime('%Y-%m-%dT%H:%M:%SZ','now'))
2061 ON CONFLICT(chunk_id, model) DO UPDATE SET
2062 embedded_at = excluded.embedded_at",
2063 )?;
2064
2065 for (chunk_id, model) in entries {
2066 stmt.execute(params![chunk_id, model])?;
2067 }
2068
2069 drop(stmt);
2070 tx.commit()?;
2071 Ok(())
2072 }
2073
2074 pub fn get_unembedded_chunks(
2075 &self,
2076 model: &str,
2077 after_chunk_id: i64,
2078 limit: usize,
2079 ) -> Result<Vec<EmbedRecord>> {
2080 self.get_unembedded_chunks_filtered(model, after_chunk_id, limit, "", Vec::new())
2081 }
2082
2083 pub fn get_unembedded_chunks_in_space(
2084 &self,
2085 model: &str,
2086 space_id: i64,
2087 after_chunk_id: i64,
2088 limit: usize,
2089 ) -> Result<Vec<EmbedRecord>> {
2090 self.get_unembedded_chunks_filtered(
2091 model,
2092 after_chunk_id,
2093 limit,
2094 " AND col.space_id = ?",
2095 vec![SqlValue::Integer(space_id)],
2096 )
2097 }
2098
2099 pub fn get_unembedded_chunks_in_collections(
2100 &self,
2101 model: &str,
2102 collection_ids: &[i64],
2103 after_chunk_id: i64,
2104 limit: usize,
2105 ) -> Result<Vec<EmbedRecord>> {
2106 if collection_ids.is_empty() {
2107 return Ok(Vec::new());
2108 }
2109
2110 let placeholders = vec!["?"; collection_ids.len()].join(", ");
2111 let clause = format!(" AND d.collection_id IN ({placeholders})");
2112 let params = collection_ids
2113 .iter()
2114 .map(|id| SqlValue::Integer(*id))
2115 .collect::<Vec<_>>();
2116 self.get_unembedded_chunks_filtered(model, after_chunk_id, limit, &clause, params)
2117 }
2118
2119 fn get_unembedded_chunks_filtered(
2120 &self,
2121 model: &str,
2122 after_chunk_id: i64,
2123 limit: usize,
2124 scope_clause: &str,
2125 scope_params: Vec<SqlValue>,
2126 ) -> Result<Vec<EmbedRecord>> {
2127 let conn = self
2128 .db
2129 .lock()
2130 .map_err(|_| CoreError::poisoned("database"))?;
2131 let sql_limit = i64::try_from(limit)
2132 .map_err(|_| CoreError::Internal("limit too large for sqlite".to_string()))?;
2133
2134 let sql = format!(
2135 "SELECT c.id, c.doc_id, c.seq, c.offset, c.length, c.heading, c.kind, c.retrieval_prefix,
2136 d.path, col.path, s.name
2137 FROM chunks c
2138 JOIN documents d ON d.id = c.doc_id
2139 JOIN collections col ON col.id = d.collection_id
2140 JOIN spaces s ON s.id = col.space_id
2141 LEFT JOIN embeddings e ON e.chunk_id = c.id AND e.model = ?
2142 WHERE d.active = 1 AND e.chunk_id IS NULL AND c.id > ?{scope_clause}
2143 ORDER BY c.id ASC
2144 LIMIT ?"
2145 );
2146 let mut stmt = conn.prepare(&sql)?;
2147 let mut params = Vec::with_capacity(scope_params.len() + 3);
2148 params.push(SqlValue::Text(model.to_string()));
2149 params.push(SqlValue::Integer(after_chunk_id));
2150 params.extend(scope_params);
2151 params.push(SqlValue::Integer(sql_limit));
2152 let rows = stmt.query_map(params_from_iter(params.iter()), |row| {
2153 Ok(EmbedRecord {
2154 chunk: decode_chunk_row(row)?,
2155 doc_path: row.get(8)?,
2156 collection_path: PathBuf::from(row.get::<_, String>(9)?),
2157 space_name: row.get(10)?,
2158 })
2159 })?;
2160
2161 let records = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2162 Ok(records)
2163 }
2164
2165 pub fn delete_embeddings_for_model(&self, model: &str) -> Result<usize> {
2166 let conn = self
2167 .db
2168 .lock()
2169 .map_err(|_| CoreError::poisoned("database"))?;
2170
2171 let deleted = conn.execute("DELETE FROM embeddings WHERE model = ?1", params![model])?;
2172 Ok(deleted)
2173 }
2174
2175 pub fn delete_embeddings_for_space(&self, space_id: i64) -> Result<usize> {
2176 let conn = self
2177 .db
2178 .lock()
2179 .map_err(|_| CoreError::poisoned("database"))?;
2180 let _space_name = lookup_space_name(&conn, space_id)?;
2181
2182 let deleted = conn.execute(
2183 "DELETE FROM embeddings
2184 WHERE chunk_id IN (
2185 SELECT c.id
2186 FROM chunks c
2187 JOIN documents d ON d.id = c.doc_id
2188 JOIN collections col ON col.id = d.collection_id
2189 WHERE col.space_id = ?1
2190 )",
2191 params![space_id],
2192 )?;
2193 Ok(deleted)
2194 }
2195
2196 pub fn list_embedding_models_in_space(&self, space_id: i64) -> Result<Vec<String>> {
2197 let conn = self
2198 .db
2199 .lock()
2200 .map_err(|_| CoreError::poisoned("database"))?;
2201 let _space_name = lookup_space_name(&conn, space_id)?;
2202
2203 let mut stmt = conn.prepare(
2204 "SELECT DISTINCT e.model
2205 FROM embeddings e
2206 JOIN chunks c ON c.id = e.chunk_id
2207 JOIN documents d ON d.id = c.doc_id
2208 JOIN collections col ON col.id = d.collection_id
2209 WHERE col.space_id = ?1
2210 ORDER BY e.model ASC",
2211 )?;
2212 let rows = stmt.query_map(params![space_id], |row| row.get::<_, String>(0))?;
2213 let models = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2214 Ok(models)
2215 }
2216
2217 pub fn count_embeddings(&self) -> Result<usize> {
2218 let conn = self
2219 .db
2220 .lock()
2221 .map_err(|_| CoreError::poisoned("database"))?;
2222
2223 let count: i64 = conn.query_row("SELECT COUNT(*) FROM embeddings", [], |row| row.get(0))?;
2224 Ok(count as usize)
2225 }
2226
2227 pub fn index_tantivy(&self, space: &str, entries: &[TantivyEntry]) -> Result<()> {
2228 if entries.is_empty() {
2229 return Ok(());
2230 }
2231
2232 let space_indexes = self.get_space_indexes(space)?;
2233 with_tantivy_writer(&space_indexes, |writer| {
2234 for entry in entries {
2235 let chunk_id = u64::try_from(entry.chunk_id).map_err(|_| {
2236 CoreError::Internal(format!(
2237 "chunk_id must be non-negative for tantivy indexing: {}",
2238 entry.chunk_id
2239 ))
2240 })?;
2241 let doc_id = u64::try_from(entry.doc_id).map_err(|_| {
2242 CoreError::Internal(format!(
2243 "doc_id must be non-negative for tantivy indexing: {}",
2244 entry.doc_id
2245 ))
2246 })?;
2247
2248 let mut doc = TantivyDocument::default();
2249 doc.add_u64(space_indexes.fields.chunk_id, chunk_id);
2250 doc.add_u64(space_indexes.fields.doc_id, doc_id);
2251 doc.add_text(space_indexes.fields.filepath, &entry.filepath);
2252 if let Some(title) = &entry.semantic_title {
2253 doc.add_text(space_indexes.fields.title, title);
2254 }
2255 if let Some(heading) = &entry.heading {
2256 doc.add_text(space_indexes.fields.heading, heading);
2257 }
2258 doc.add_text(space_indexes.fields.body, &entry.body);
2259 writer.add_document(doc)?;
2260 }
2261 Ok(())
2262 })
2263 }
2264
2265 pub fn delete_tantivy(&self, space: &str, chunk_ids: &[i64]) -> Result<()> {
2266 if chunk_ids.is_empty() {
2267 return Ok(());
2268 }
2269
2270 let space_indexes = self.get_space_indexes(space)?;
2271 with_tantivy_writer(&space_indexes, |writer| {
2272 for chunk_id in chunk_ids {
2273 let chunk_key = u64::try_from(*chunk_id).map_err(|_| {
2274 CoreError::Internal(format!(
2275 "chunk_id must be non-negative for tantivy delete: {chunk_id}"
2276 ))
2277 })?;
2278 writer.delete_term(Term::from_field_u64(
2279 space_indexes.fields.chunk_id,
2280 chunk_key,
2281 ));
2282 }
2283
2284 Ok(())
2285 })
2286 }
2287
2288 pub fn delete_tantivy_by_doc(&self, space: &str, doc_id: i64) -> Result<()> {
2289 let space_indexes = self.get_space_indexes(space)?;
2290 with_tantivy_writer(&space_indexes, |writer| {
2291 let doc_key = u64::try_from(doc_id).map_err(|_| {
2292 CoreError::Internal(format!(
2293 "doc_id must be non-negative for tantivy delete-by-doc: {doc_id}"
2294 ))
2295 })?;
2296 writer.delete_term(Term::from_field_u64(space_indexes.fields.doc_id, doc_key));
2297 Ok(())
2298 })
2299 }
2300
2301 pub fn query_bm25(
2302 &self,
2303 space: &str,
2304 query: &str,
2305 fields: &[(&str, f32)],
2306 limit: usize,
2307 ) -> Result<Vec<BM25Hit>> {
2308 self.query_bm25_filtered(space, query, fields, None, limit, true)
2309 }
2310
2311 pub(crate) fn query_bm25_cached(
2312 &self,
2313 space: &str,
2314 query: &str,
2315 fields: &[(&str, f32)],
2316 limit: usize,
2317 ) -> Result<Vec<BM25Hit>> {
2318 self.query_bm25_filtered(space, query, fields, None, limit, false)
2319 }
2320
2321 pub fn query_bm25_in_documents(
2322 &self,
2323 space: &str,
2324 query: &str,
2325 fields: &[(&str, f32)],
2326 document_ids: &[i64],
2327 limit: usize,
2328 ) -> Result<Vec<BM25Hit>> {
2329 if document_ids.is_empty() {
2330 return Ok(Vec::new());
2331 }
2332
2333 self.query_bm25_filtered(space, query, fields, Some(document_ids), limit, true)
2334 }
2335
2336 pub(crate) fn query_bm25_in_documents_cached(
2337 &self,
2338 space: &str,
2339 query: &str,
2340 fields: &[(&str, f32)],
2341 document_ids: &[i64],
2342 limit: usize,
2343 ) -> Result<Vec<BM25Hit>> {
2344 if document_ids.is_empty() {
2345 return Ok(Vec::new());
2346 }
2347
2348 self.query_bm25_filtered(space, query, fields, Some(document_ids), limit, false)
2349 }
2350
    /// Shared implementation behind the `query_bm25*` entry points.
    ///
    /// Builds a literal term query (via `build_literal_bm25_query`) over the
    /// requested fields. When `document_ids` is given, the query is
    /// intersected with a filter on the tantivy `doc_id` field. The search
    /// runs with `TopDocs::with_limit(limit)`, and each hit is mapped back to
    /// its `chunk_id` through the `chunk_id` fast field.
    ///
    /// * `fields` — `(field name, boost)` pairs. Each name is resolved with
    ///   `resolve_tantivy_field` and must refer to an indexed field.
    /// * `document_ids` — when `Some`, a hit must also match one of these ids.
    /// * `reload_reader` — when true, the reader is reloaded before searching.
    ///   When false, the reader's current snapshot is searched as-is.
    ///
    /// Returns an empty vector when `limit` is 0, or when the query produces
    /// no analyzed terms.
    ///
    /// # Errors
    /// Returns `CoreError::Internal` in these cases:
    /// * `fields` is empty.
    /// * A field is not indexed.
    /// * A `doc_id` filter value is negative.
    /// * A hit lacks a `chunk_id` or its `chunk_id` exceeds the `i64` range.
    fn query_bm25_filtered(
        &self,
        space: &str,
        query: &str,
        fields: &[(&str, f32)],
        document_ids: Option<&[i64]>,
        limit: usize,
        reload_reader: bool,
    ) -> Result<Vec<BM25Hit>> {
        if limit == 0 {
            return Ok(Vec::new());
        }

        if fields.is_empty() {
            return Err(CoreError::Internal(
                "bm25 query requires at least one field".to_string(),
            ));
        }

        let space_indexes = self.get_space_indexes(space)?;

        let schema = space_indexes.tantivy_index.schema();
        // Resolve each field name and capture the record option it was indexed
        // with (TermQuery needs it); fields without one are not indexed.
        let query_fields = fields
            .iter()
            .map(|(name, boost)| {
                let field = resolve_tantivy_field(space_indexes.fields, name)?;
                let field_entry = schema.get_field_entry(field);
                let index_record_option = field_entry
                    .field_type()
                    .get_index_record_option()
                    .ok_or_else(|| {
                        CoreError::Internal(format!(
                            "bm25 field '{}' is not indexed",
                            field_entry.name()
                        ))
                    })?;
                Ok(Bm25FieldSpec {
                    field,
                    boost: *boost,
                    index_record_option,
                })
            })
            .collect::<Result<Vec<_>>>()?;
        // A query with no analyzable terms cannot match anything.
        let Some(parsed_query) =
            build_literal_bm25_query(&space_indexes.tantivy_index, &query_fields, query)?
        else {
            return Ok(Vec::new());
        };
        // Both clauses are required. The doc-id filter is built with a constant
        // score of 0.0, so ranking comes from the text clauses alone.
        let parsed_query = if let Some(document_ids) = document_ids {
            Box::new(BooleanQuery::new(vec![
                (Occur::Must, parsed_query),
                (
                    Occur::Must,
                    build_doc_id_filter_query(space_indexes.fields.doc_id, document_ids)?,
                ),
            ])) as Box<dyn Query>
        } else {
            parsed_query
        };
        if reload_reader {
            space_indexes.tantivy_reader.reload()?;
        }
        let searcher = space_indexes.tantivy_reader.searcher();
        let docs = searcher.search(&parsed_query, &TopDocs::with_limit(limit))?;

        let mut hits = Vec::with_capacity(docs.len());
        let chunk_id_field_name = schema.get_field_entry(space_indexes.fields.chunk_id).name();
        // Fast-field columns are per segment; cache them by segment ordinal so
        // each segment's column is opened at most once per query.
        let mut chunk_id_columns = HashMap::new();
        for (score, address) in docs {
            let chunk_id_column = match chunk_id_columns.entry(address.segment_ord) {
                std::collections::hash_map::Entry::Occupied(entry) => entry.into_mut(),
                std::collections::hash_map::Entry::Vacant(entry) => {
                    let column = searcher
                        .segment_reader(address.segment_ord)
                        .fast_fields()
                        .u64(chunk_id_field_name)?;
                    entry.insert(column)
                }
            };
            let chunk_id = chunk_id_column.first(address.doc_id).ok_or_else(|| {
                CoreError::Internal("tantivy hit missing chunk_id fast field".to_string())
            })?;
            let chunk_id = i64::try_from(chunk_id).map_err(|_| {
                CoreError::Internal(format!("tantivy chunk_id exceeds i64 range: {chunk_id}"))
            })?;
            hits.push(BM25Hit { chunk_id, score });
        }

        Ok(hits)
    }
2441
2442 pub(crate) fn reload_tantivy_reader(&self, space: &str) -> Result<()> {
2443 let space_indexes = self.get_space_indexes(space)?;
2444 space_indexes.tantivy_reader.reload()?;
2445 Ok(())
2446 }
2447
2448 pub fn commit_tantivy(&self, space: &str) -> Result<()> {
2449 let space_indexes = self.get_space_indexes(space)?;
2450 let mut writer = space_indexes
2451 .tantivy_writer
2452 .lock()
2453 .map_err(|_| CoreError::poisoned("tantivy writer"))?;
2454 let Some(mut writer) = writer.take() else {
2455 return Ok(());
2456 };
2457 writer.commit()?;
2458 space_indexes.tantivy_reader.reload()?;
2459 Ok(())
2460 }
2461
2462 pub fn insert_usearch(&self, space: &str, key: i64, vector: &[f32]) -> Result<()> {
2463 self.batch_insert_usearch(space, &[(key, vector)])
2464 }
2465
2466 pub fn batch_insert_usearch(&self, space: &str, entries: &[(i64, &[f32])]) -> Result<()> {
2467 self.batch_insert_usearch_with_save_mode(space, entries, UsearchSaveMode::Immediate)
2468 }
2469
2470 pub(crate) fn batch_insert_usearch_deferred(
2471 &self,
2472 space: &str,
2473 entries: &[(i64, &[f32])],
2474 ) -> Result<()> {
2475 self.batch_insert_usearch_with_save_mode(space, entries, UsearchSaveMode::Deferred)
2476 }
2477
2478 fn batch_insert_usearch_with_save_mode(
2479 &self,
2480 space: &str,
2481 entries: &[(i64, &[f32])],
2482 save_mode: UsearchSaveMode,
2483 ) -> Result<()> {
2484 if entries.is_empty() {
2485 return Ok(());
2486 }
2487
2488 let space_indexes = self.get_space_indexes(space)?;
2489
2490 let expected_dimensions = entries[0].1.len();
2491 if expected_dimensions == 0 {
2492 return Err(CoreError::Internal(
2493 "cannot insert empty vector into usearch index".to_string(),
2494 ));
2495 }
2496 for (_, vector) in entries {
2497 if vector.len() != expected_dimensions {
2498 return Err(CoreError::Internal(format!(
2499 "vector dimension mismatch in batch insert: expected {expected_dimensions}, got {}",
2500 vector.len()
2501 )));
2502 }
2503 }
2504
2505 let mut index = space_indexes
2506 .usearch_index
2507 .write()
2508 .map_err(|_| CoreError::poisoned("usearch index"))?;
2509 let ensure_started = std::time::Instant::now();
2510 ensure_usearch_dimensions(&mut index, expected_dimensions)?;
2511 crate::profile::record_update_stage("usearch_ensure_dimensions", ensure_started.elapsed());
2512
2513 let target_capacity = index.size().saturating_add(entries.len());
2514 let reserve_started = std::time::Instant::now();
2515 index.reserve(target_capacity).map_err(|err| {
2516 CoreError::Internal(format!(
2517 "usearch reserve failed for {target_capacity} items: {err}"
2518 ))
2519 })?;
2520 crate::profile::record_update_stage("usearch_reserve", reserve_started.elapsed());
2521
2522 if matches!(save_mode, UsearchSaveMode::Deferred) {
2523 self.mark_usearch_dirty(space)?;
2524 }
2525
2526 let add_started = std::time::Instant::now();
2527 for (key, vector) in entries {
2528 let key = u64::try_from(*key).map_err(|_| {
2529 CoreError::Internal(format!("usearch key must be non-negative: {}", *key))
2530 })?;
2531 index
2532 .add::<f32>(key, vector)
2533 .map_err(|err| CoreError::Internal(format!("usearch add failed: {err}")))?;
2534 }
2535 crate::profile::record_update_stage("usearch_add", add_started.elapsed());
2536
2537 if matches!(save_mode, UsearchSaveMode::Immediate) {
2538 let save_started = std::time::Instant::now();
2539 save_usearch_index(&index, &space_indexes.usearch_path)?;
2540 crate::profile::record_update_stage("usearch_save", save_started.elapsed());
2541 }
2542 Ok(())
2543 }
2544
2545 pub fn delete_usearch(&self, space: &str, keys: &[i64]) -> Result<()> {
2546 self.delete_usearch_with_save_mode(space, keys, UsearchSaveMode::Immediate)
2547 }
2548
2549 pub(crate) fn delete_usearch_deferred(&self, space: &str, keys: &[i64]) -> Result<()> {
2550 self.delete_usearch_with_save_mode(space, keys, UsearchSaveMode::Deferred)
2551 }
2552
2553 fn delete_usearch_with_save_mode(
2554 &self,
2555 space: &str,
2556 keys: &[i64],
2557 save_mode: UsearchSaveMode,
2558 ) -> Result<()> {
2559 if keys.is_empty() {
2560 return Ok(());
2561 }
2562
2563 let space_indexes = self.get_space_indexes(space)?;
2564 let index = space_indexes
2565 .usearch_index
2566 .write()
2567 .map_err(|_| CoreError::poisoned("usearch index"))?;
2568
2569 if matches!(save_mode, UsearchSaveMode::Deferred) {
2570 self.mark_usearch_dirty(space)?;
2571 }
2572
2573 let delete_started = std::time::Instant::now();
2574 for key in keys {
2575 let key = u64::try_from(*key).map_err(|_| {
2576 CoreError::Internal(format!("usearch key must be non-negative: {}", *key))
2577 })?;
2578 index
2579 .remove(key)
2580 .map_err(|err| CoreError::Internal(format!("usearch remove failed: {err}")))?;
2581 }
2582 crate::profile::record_update_stage("usearch_delete", delete_started.elapsed());
2583
2584 if matches!(save_mode, UsearchSaveMode::Immediate) {
2585 let save_started = std::time::Instant::now();
2586 save_usearch_index(&index, &space_indexes.usearch_path)?;
2587 crate::profile::record_update_stage("usearch_save", save_started.elapsed());
2588 }
2589 Ok(())
2590 }
2591
2592 pub(crate) fn save_dirty_usearch_indexes(&self) -> Result<()> {
2593 let spaces = {
2594 let mut dirty = self
2595 .dirty_usearch_spaces
2596 .lock()
2597 .map_err(|_| CoreError::poisoned("dirty usearch spaces"))?;
2598 if dirty.is_empty() {
2599 return Ok(());
2600 }
2601
2602 let mut spaces = dirty.drain().collect::<Vec<_>>();
2603 spaces.sort();
2604 spaces
2605 };
2606
2607 for (index, space) in spaces.iter().enumerate() {
2608 if let Err(err) = self.save_usearch_for_space(space) {
2609 let mut dirty = self
2610 .dirty_usearch_spaces
2611 .lock()
2612 .map_err(|_| CoreError::poisoned("dirty usearch spaces"))?;
2613 for unsaved in &spaces[index..] {
2614 dirty.insert(unsaved.clone());
2615 }
2616 return Err(err);
2617 }
2618 crate::profile::increment_update_count("usearch_saved_spaces", 1);
2619 }
2620
2621 Ok(())
2622 }
2623
2624 fn mark_usearch_dirty(&self, space: &str) -> Result<()> {
2625 self.dirty_usearch_spaces
2626 .lock()
2627 .map_err(|_| CoreError::poisoned("dirty usearch spaces"))?
2628 .insert(space.to_string());
2629 Ok(())
2630 }
2631
2632 fn save_usearch_for_space(&self, space: &str) -> Result<()> {
2633 let space_indexes = self.get_space_indexes(space)?;
2634 let index = space_indexes
2635 .usearch_index
2636 .read()
2637 .map_err(|_| CoreError::poisoned("usearch index"))?;
2638
2639 let save_started = std::time::Instant::now();
2640 save_usearch_index(&index, &space_indexes.usearch_path)?;
2641 crate::profile::record_update_stage("usearch_save", save_started.elapsed());
2642 Ok(())
2643 }
2644
2645 pub fn query_dense(&self, space: &str, vector: &[f32], limit: usize) -> Result<Vec<DenseHit>> {
2646 self.query_dense_filtered(space, vector, None, limit)
2647 }
2648
2649 pub fn query_dense_in_chunks(
2650 &self,
2651 space: &str,
2652 vector: &[f32],
2653 chunk_ids: &[i64],
2654 limit: usize,
2655 ) -> Result<Vec<DenseHit>> {
2656 if chunk_ids.is_empty() {
2657 return Ok(Vec::new());
2658 }
2659
2660 let allowed_keys = chunk_ids
2661 .iter()
2662 .map(|chunk_id| {
2663 u64::try_from(*chunk_id).map_err(|_| {
2664 CoreError::Internal(format!(
2665 "chunk_id must be non-negative for usearch query: {chunk_id}"
2666 ))
2667 })
2668 })
2669 .collect::<Result<HashSet<_>>>()?;
2670 self.query_dense_in_key_set(space, vector, &allowed_keys, limit)
2671 }
2672
2673 pub(crate) fn query_dense_in_key_set(
2674 &self,
2675 space: &str,
2676 vector: &[f32],
2677 allowed_keys: &HashSet<u64>,
2678 limit: usize,
2679 ) -> Result<Vec<DenseHit>> {
2680 if allowed_keys.is_empty() {
2681 return Ok(Vec::new());
2682 }
2683
2684 self.query_dense_filtered(space, vector, Some(allowed_keys), limit)
2685 }
2686
2687 fn query_dense_filtered(
2688 &self,
2689 space: &str,
2690 vector: &[f32],
2691 allowed_keys: Option<&HashSet<u64>>,
2692 limit: usize,
2693 ) -> Result<Vec<DenseHit>> {
2694 if limit == 0 {
2695 return Ok(Vec::new());
2696 }
2697 if vector.is_empty() {
2698 return Err(CoreError::Internal(
2699 "cannot query usearch with empty vector".to_string(),
2700 ));
2701 }
2702
2703 let space_indexes = self.get_space_indexes(space)?;
2704 let index = space_indexes
2705 .usearch_index
2706 .read()
2707 .map_err(|_| CoreError::poisoned("usearch index"))?;
2708
2709 if index.size() == 0 {
2710 return Ok(Vec::new());
2711 }
2712 if vector.len() != index.dimensions() {
2713 return Err(CoreError::Internal(format!(
2714 "query vector dimension mismatch: expected {}, got {}",
2715 index.dimensions(),
2716 vector.len()
2717 )));
2718 }
2719
2720 let matches = if let Some(allowed_keys) = allowed_keys {
2721 index
2722 .filtered_search::<f32, _>(vector, limit, |key| allowed_keys.contains(&key))
2723 .map_err(|err| {
2724 CoreError::Internal(format!("usearch filtered query failed: {err}"))
2725 })?
2726 } else {
2727 index
2728 .search::<f32>(vector, limit)
2729 .map_err(|err| CoreError::Internal(format!("usearch query failed: {err}")))?
2730 };
2731 let hits = matches
2732 .keys
2733 .into_iter()
2734 .zip(matches.distances)
2735 .map(|(key, distance)| DenseHit {
2736 chunk_id: key as i64,
2737 distance,
2738 })
2739 .collect();
2740 Ok(hits)
2741 }
2742
2743 pub fn count_usearch(&self, space: &str) -> Result<usize> {
2744 let space_indexes = self.get_space_indexes(space)?;
2745 let index = space_indexes
2746 .usearch_index
2747 .read()
2748 .map_err(|_| CoreError::poisoned("usearch index"))?;
2749 Ok(index.size())
2750 }
2751
2752 pub fn clear_usearch(&self, space: &str) -> Result<()> {
2753 let space_indexes = self.get_space_indexes(space)?;
2754 let index = space_indexes
2755 .usearch_index
2756 .write()
2757 .map_err(|_| CoreError::poisoned("usearch index"))?;
2758 let clear_started = std::time::Instant::now();
2759 index
2760 .reset()
2761 .map_err(|err| CoreError::Internal(format!("usearch clear failed: {err}")))?;
2762 std::fs::File::create(&space_indexes.usearch_path)?;
2763 crate::profile::record_update_stage("usearch_clear", clear_started.elapsed());
2764 Ok(())
2765 }
2766
2767 pub fn get_fts_dirty_documents(&self) -> Result<Vec<FtsDirtyRecord>> {
2768 self.get_fts_dirty_documents_filtered("", Vec::new())
2769 }
2770
2771 pub fn get_fts_dirty_documents_in_space(&self, space_id: i64) -> Result<Vec<FtsDirtyRecord>> {
2772 self.get_fts_dirty_documents_filtered(
2773 " AND c.space_id = ?",
2774 vec![SqlValue::Integer(space_id)],
2775 )
2776 }
2777
2778 pub fn get_fts_dirty_documents_in_collections(
2779 &self,
2780 collection_ids: &[i64],
2781 ) -> Result<Vec<FtsDirtyRecord>> {
2782 if collection_ids.is_empty() {
2783 return Ok(Vec::new());
2784 }
2785
2786 let placeholders = vec!["?"; collection_ids.len()].join(", ");
2787 let clause = format!(" AND d.collection_id IN ({placeholders})");
2788 let params = collection_ids
2789 .iter()
2790 .map(|id| SqlValue::Integer(*id))
2791 .collect::<Vec<_>>();
2792 self.get_fts_dirty_documents_filtered(&clause, params)
2793 }
2794
2795 fn get_fts_dirty_documents_filtered(
2796 &self,
2797 scope_clause: &str,
2798 scope_params: Vec<SqlValue>,
2799 ) -> Result<Vec<FtsDirtyRecord>> {
2800 let conn = self
2801 .db
2802 .lock()
2803 .map_err(|_| CoreError::poisoned("database"))?;
2804
2805 let sql = format!(
2806 "SELECT d.id, d.path, d.title, d.title_source, d.hash, c.path, s.name
2807 FROM documents d
2808 JOIN collections c ON c.id = d.collection_id
2809 JOIN spaces s ON s.id = c.space_id
2810 WHERE d.fts_dirty = 1{scope_clause}
2811 ORDER BY d.id ASC"
2812 );
2813 let mut stmt = conn.prepare(&sql)?;
2814 let rows = stmt.query_map(params_from_iter(scope_params.iter()), |row| {
2815 Ok((
2816 row.get::<_, i64>(0)?,
2817 row.get::<_, String>(1)?,
2818 row.get::<_, String>(2)?,
2819 row.get::<_, String>(3)?,
2820 row.get::<_, String>(4)?,
2821 row.get::<_, String>(5)?,
2822 row.get::<_, String>(6)?,
2823 ))
2824 })?;
2825 let headers = rows.collect::<std::result::Result<Vec<_>, _>>()?;
2826 drop(stmt);
2827
2828 let mut records = Vec::with_capacity(headers.len());
2829 for (
2830 doc_id,
2831 doc_path,
2832 doc_title,
2833 doc_title_source,
2834 doc_hash,
2835 collection_path,
2836 space_name,
2837 ) in headers
2838 {
2839 let chunks = load_chunks_for_doc(&conn, doc_id)?;
2840 records.push(FtsDirtyRecord {
2841 doc_id,
2842 doc_path,
2843 doc_title,
2844 doc_title_source: DocumentTitleSource::from_sql(&doc_title_source)?,
2845 doc_hash,
2846 collection_path: PathBuf::from(collection_path),
2847 space_name,
2848 chunks,
2849 });
2850 }
2851
2852 Ok(records)
2853 }
2854
2855 pub fn batch_clear_fts_dirty(&self, doc_ids: &[i64]) -> Result<()> {
2856 if doc_ids.is_empty() {
2857 return Ok(());
2858 }
2859
2860 let conn = self
2861 .db
2862 .lock()
2863 .map_err(|_| CoreError::poisoned("database"))?;
2864
2865 let placeholders = vec!["?"; doc_ids.len()].join(", ");
2866 let sql = format!("UPDATE documents SET fts_dirty = 0 WHERE id IN ({placeholders})");
2867 conn.execute(&sql, params_from_iter(doc_ids.iter()))?;
2868 Ok(())
2869 }
2870
2871 pub fn count_documents_in_collection(
2872 &self,
2873 collection_id: i64,
2874 active_only: bool,
2875 ) -> Result<usize> {
2876 let conn = self
2877 .db
2878 .lock()
2879 .map_err(|_| CoreError::poisoned("database"))?;
2880 let _collection_name = lookup_collection_name(&conn, collection_id)?;
2881 let active_only = i64::from(active_only);
2882 query_count(
2883 &conn,
2884 "SELECT COUNT(*)
2885 FROM documents
2886 WHERE collection_id = ?1
2887 AND (?2 = 0 OR active = 1)",
2888 params![collection_id, active_only],
2889 )
2890 }
2891
2892 pub fn count_chunks_in_collection(&self, collection_id: i64) -> Result<usize> {
2893 let conn = self
2894 .db
2895 .lock()
2896 .map_err(|_| CoreError::poisoned("database"))?;
2897 let _collection_name = lookup_collection_name(&conn, collection_id)?;
2898
2899 query_count(
2900 &conn,
2901 "SELECT COUNT(*)
2902 FROM chunks c
2903 JOIN documents d ON d.id = c.doc_id
2904 WHERE d.collection_id = ?1",
2905 params![collection_id],
2906 )
2907 }
2908
2909 pub fn count_embedded_chunks_in_collection(&self, collection_id: i64) -> Result<usize> {
2910 let conn = self
2911 .db
2912 .lock()
2913 .map_err(|_| CoreError::poisoned("database"))?;
2914 let _collection_name = lookup_collection_name(&conn, collection_id)?;
2915
2916 query_count(
2917 &conn,
2918 "SELECT COUNT(DISTINCT e.chunk_id)
2919 FROM embeddings e
2920 JOIN chunks c ON c.id = e.chunk_id
2921 JOIN documents d ON d.id = c.doc_id
2922 WHERE d.collection_id = ?1",
2923 params![collection_id],
2924 )
2925 }
2926
2927 pub fn count_documents(&self, space_id: Option<i64>) -> Result<usize> {
2928 let conn = self
2929 .db
2930 .lock()
2931 .map_err(|_| CoreError::poisoned("database"))?;
2932
2933 match space_id {
2934 Some(space_id) => {
2935 let _space_name = lookup_space_name(&conn, space_id)?;
2936 query_count(
2937 &conn,
2938 "SELECT COUNT(*)
2939 FROM documents d
2940 JOIN collections c ON c.id = d.collection_id
2941 WHERE c.space_id = ?1",
2942 params![space_id],
2943 )
2944 }
2945 None => query_count(&conn, "SELECT COUNT(*) FROM documents", []),
2946 }
2947 }
2948
2949 pub fn count_chunks(&self, space_id: Option<i64>) -> Result<usize> {
2950 let conn = self
2951 .db
2952 .lock()
2953 .map_err(|_| CoreError::poisoned("database"))?;
2954
2955 match space_id {
2956 Some(space_id) => {
2957 let _space_name = lookup_space_name(&conn, space_id)?;
2958 query_count(
2959 &conn,
2960 "SELECT COUNT(*)
2961 FROM chunks c
2962 JOIN documents d ON d.id = c.doc_id
2963 JOIN collections col ON col.id = d.collection_id
2964 WHERE col.space_id = ?1",
2965 params![space_id],
2966 )
2967 }
2968 None => query_count(&conn, "SELECT COUNT(*) FROM chunks", []),
2969 }
2970 }
2971
2972 pub fn count_embedded_chunks(&self, space_id: Option<i64>) -> Result<usize> {
2973 let conn = self
2974 .db
2975 .lock()
2976 .map_err(|_| CoreError::poisoned("database"))?;
2977
2978 match space_id {
2979 Some(space_id) => {
2980 let _space_name = lookup_space_name(&conn, space_id)?;
2981 query_count(
2982 &conn,
2983 "SELECT COUNT(DISTINCT e.chunk_id)
2984 FROM embeddings e
2985 JOIN chunks c ON c.id = e.chunk_id
2986 JOIN documents d ON d.id = c.doc_id
2987 JOIN collections col ON col.id = d.collection_id
2988 WHERE col.space_id = ?1",
2989 params![space_id],
2990 )
2991 }
2992 None => query_count(&conn, "SELECT COUNT(DISTINCT chunk_id) FROM embeddings", []),
2993 }
2994 }
2995
2996 pub fn disk_usage(&self) -> Result<DiskUsage> {
2997 let sqlite_bytes = file_size_or_zero(&self.cache_dir.join(DB_FILE))?;
2998
2999 let mut tantivy_bytes = 0_u64;
3000 let mut usearch_bytes = 0_u64;
3001 let spaces_dir = self.cache_dir.join(SPACES_DIR);
3002 if spaces_dir.exists() {
3003 for entry in std::fs::read_dir(&spaces_dir)? {
3004 let space_dir = entry?.path();
3005 if !space_dir.is_dir() {
3006 continue;
3007 }
3008
3009 tantivy_bytes += dir_size_or_zero(&space_dir.join(TANTIVY_DIR_NAME))?;
3010 usearch_bytes += file_size_or_zero(&space_dir.join(USEARCH_FILENAME))?;
3011 }
3012 }
3013
3014 let models_bytes = dir_size_or_zero(&self.cache_dir.join("models"))?;
3015 let total_bytes = sqlite_bytes + tantivy_bytes + usearch_bytes + models_bytes;
3016
3017 Ok(DiskUsage {
3018 sqlite_bytes,
3019 tantivy_bytes,
3020 usearch_bytes,
3021 models_bytes,
3022 total_bytes,
3023 })
3024 }
3025
3026 fn unload_space(&self, name: &str) -> Result<()> {
3027 let mut spaces = self
3028 .spaces
3029 .write()
3030 .map_err(|_| CoreError::poisoned("spaces"))?;
3031 spaces.remove(name);
3032 Ok(())
3033 }
3034
3035 fn remove_space_artifacts(&self, name: &str) -> Result<()> {
3036 let space_root = self.space_root_path(name);
3037 if space_root.exists() {
3038 std::fs::remove_dir_all(space_root)?;
3039 }
3040 Ok(())
3041 }
3042
3043 fn rename_space_artifacts(&self, old: &str, new: &str) -> Result<()> {
3044 let old_root = self.space_root_path(old);
3045 let new_root = self.space_root_path(new);
3046 if !old_root.exists() {
3047 return Ok(());
3048 }
3049
3050 if new_root.exists() {
3051 return Err(CoreError::Internal(format!(
3052 "cannot rename space artifacts: destination already exists: {}",
3053 new_root.display()
3054 )));
3055 }
3056
3057 if let Some(parent) = new_root.parent() {
3058 std::fs::create_dir_all(parent)?;
3059 }
3060 std::fs::rename(old_root, new_root)?;
3061 Ok(())
3062 }
3063
3064 fn space_root_path(&self, name: &str) -> PathBuf {
3065 self.cache_dir.join(SPACES_DIR).join(name)
3066 }
3067
3068 fn space_paths(&self, name: &str) -> (PathBuf, PathBuf) {
3069 let space_root = self.space_root_path(name);
3070 let tantivy_dir = space_root.join(TANTIVY_DIR_NAME);
3071 let usearch_path = space_root.join(USEARCH_FILENAME);
3072 (tantivy_dir, usearch_path)
3073 }
3074
3075 fn get_space_indexes(&self, name: &str) -> Result<Arc<SpaceIndexes>> {
3076 self.open_space(name)?;
3077 let spaces = self
3078 .spaces
3079 .read()
3080 .map_err(|_| CoreError::poisoned("spaces"))?;
3081 spaces.get(name).cloned().ok_or_else(|| {
3082 KboltError::SpaceNotFound {
3083 name: name.to_string(),
3084 }
3085 .into()
3086 })
3087 }
3088}
3089
3090fn lookup_space_name(conn: &Connection, space_id: i64) -> Result<String> {
3091 let result = conn.query_row(
3092 "SELECT name FROM spaces WHERE id = ?1",
3093 params![space_id],
3094 |row| row.get::<_, String>(0),
3095 );
3096 match result {
3097 Ok(name) => Ok(name),
3098 Err(Error::QueryReturnedNoRows) => Err(KboltError::SpaceNotFound {
3099 name: format!("id={space_id}"),
3100 }
3101 .into()),
3102 Err(err) => Err(err.into()),
3103 }
3104}
3105
3106fn lookup_collection_name(conn: &Connection, collection_id: i64) -> Result<String> {
3107 let result = conn.query_row(
3108 "SELECT name FROM collections WHERE id = ?1",
3109 params![collection_id],
3110 |row| row.get::<_, String>(0),
3111 );
3112 match result {
3113 Ok(name) => Ok(name),
3114 Err(Error::QueryReturnedNoRows) => Err(KboltError::CollectionNotFound {
3115 name: format!("id={collection_id}"),
3116 }
3117 .into()),
3118 Err(err) => Err(err.into()),
3119 }
3120}
3121
3122fn lookup_document_id(conn: &Connection, doc_id: i64) -> Result<i64> {
3123 let result = conn.query_row(
3124 "SELECT id FROM documents WHERE id = ?1",
3125 params![doc_id],
3126 |row| row.get::<_, i64>(0),
3127 );
3128 match result {
3129 Ok(id) => Ok(id),
3130 Err(Error::QueryReturnedNoRows) => Err(KboltError::DocumentNotFound {
3131 path: format!("id={doc_id}"),
3132 }
3133 .into()),
3134 Err(err) => Err(err.into()),
3135 }
3136}
3137
3138fn load_chunks_for_doc(conn: &Connection, doc_id: i64) -> Result<Vec<ChunkRow>> {
3139 let mut stmt = conn.prepare(
3140 "SELECT id, doc_id, seq, offset, length, heading, kind, retrieval_prefix
3141 FROM chunks
3142 WHERE doc_id = ?1
3143 ORDER BY seq ASC",
3144 )?;
3145 let rows = stmt.query_map(params![doc_id], decode_chunk_row)?;
3146 let chunks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3147 Ok(chunks)
3148}
3149
3150fn load_chunk_ids_for_doc(conn: &Connection, doc_id: i64) -> Result<Vec<i64>> {
3151 let mut stmt = conn.prepare("SELECT id FROM chunks WHERE doc_id = ?1 ORDER BY seq ASC")?;
3152 let rows = stmt.query_map(params![doc_id], |row| row.get::<_, i64>(0))?;
3153 let chunk_ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3154 Ok(chunk_ids)
3155}
3156
3157fn lookup_chunk_doc_id(conn: &Connection, chunk_id: i64) -> Result<i64> {
3158 let result = conn.query_row(
3159 "SELECT doc_id FROM chunks WHERE id = ?1",
3160 params![chunk_id],
3161 |row| row.get::<_, i64>(0),
3162 );
3163 match result {
3164 Ok(doc_id) => Ok(doc_id),
3165 Err(Error::QueryReturnedNoRows) => Err(KboltError::DocumentNotFound {
3166 path: format!("chunk_id={chunk_id}"),
3167 }
3168 .into()),
3169 Err(err) => Err(err.into()),
3170 }
3171}
3172
3173fn query_count<P: rusqlite::Params>(conn: &Connection, sql: &str, params: P) -> Result<usize> {
3174 let count: i64 = conn.query_row(sql, params, |row| row.get(0))?;
3175 Ok(count as usize)
3176}
3177
3178fn file_size_or_zero(path: &Path) -> Result<u64> {
3179 if !path.exists() {
3180 return Ok(0);
3181 }
3182
3183 let metadata = std::fs::metadata(path)?;
3184 if metadata.is_file() {
3185 Ok(metadata.len())
3186 } else {
3187 Ok(0)
3188 }
3189}
3190
3191fn dir_size_or_zero(path: &Path) -> Result<u64> {
3192 if !path.exists() {
3193 return Ok(0);
3194 }
3195
3196 let mut total = 0_u64;
3197 for entry in std::fs::read_dir(path)? {
3198 let entry = entry?;
3199 let child_path = entry.path();
3200 let metadata = std::fs::symlink_metadata(&child_path)?;
3201 if metadata.is_file() {
3202 total += metadata.len();
3203 } else if metadata.is_dir() {
3204 total += dir_size_or_zero(&child_path)?;
3205 }
3206 }
3207
3208 Ok(total)
3209}
3210
3211fn open_or_create_tantivy_index(path: &Path) -> Result<Index> {
3212 let meta_path = path.join("meta.json");
3213 if meta_path.exists() {
3214 return Ok(Index::open_in_dir(path)?);
3215 }
3216
3217 Ok(Index::create_in_dir(path, tantivy_schema())?)
3218}
3219
3220fn with_tantivy_writer<T>(
3221 space_indexes: &SpaceIndexes,
3222 f: impl FnOnce(&mut IndexWriter) -> Result<T>,
3223) -> Result<T> {
3224 let mut writer = space_indexes
3225 .tantivy_writer
3226 .lock()
3227 .map_err(|_| CoreError::poisoned("tantivy writer"))?;
3228
3229 if writer.is_none() {
3230 *writer = Some(space_indexes.tantivy_index.writer(50_000_000)?);
3231 }
3232
3233 let writer = writer
3234 .as_mut()
3235 .ok_or_else(|| CoreError::Internal("failed to initialize tantivy writer".to_string()))?;
3236 f(writer)
3237}
3238
3239fn reject_incompatible_legacy_index(conn: &Connection) -> Result<()> {
3240 if !table_exists(conn, "documents")? || table_exists(conn, "document_texts")? {
3241 return Ok(());
3242 }
3243
3244 let document_count = query_count(conn, "SELECT COUNT(*) FROM documents", [])?;
3245 if document_count == 0 {
3246 return Ok(());
3247 }
3248
3249 Err(KboltError::Config(
3250 "cache index uses an older text-storage format; rebuild the kbolt cache before using this version".to_string(),
3251 )
3252 .into())
3253}
3254
3255fn ensure_schema_version(conn: &Connection) -> Result<()> {
3256 let current: i64 = conn.query_row("PRAGMA user_version", [], |row| row.get(0))?;
3257 if current > SCHEMA_VERSION {
3258 return Err(KboltError::Config(format!(
3259 "cache index schema version {current} is newer than supported version {SCHEMA_VERSION}"
3260 ))
3261 .into());
3262 }
3263
3264 if current < SCHEMA_VERSION {
3265 conn.pragma_update(None, "user_version", SCHEMA_VERSION)?;
3266 }
3267
3268 Ok(())
3269}
3270
3271fn table_exists(conn: &Connection, table: &str) -> Result<bool> {
3272 let exists = conn.query_row(
3273 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name = ?1",
3274 [table],
3275 |row| row.get::<_, i64>(0),
3276 )?;
3277 Ok(exists != 0)
3278}
3279
3280fn ensure_documents_title_source_column(conn: &Connection) -> Result<()> {
3281 let mut stmt = conn.prepare("PRAGMA table_info(documents)")?;
3282 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3283 let columns = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3284 drop(stmt);
3285
3286 if columns.iter().any(|column| column == "title_source") {
3287 return Ok(());
3288 }
3289
3290 conn.execute(
3291 "ALTER TABLE documents ADD COLUMN title_source TEXT NOT NULL DEFAULT 'extracted'",
3292 [],
3293 )?;
3294 Ok(())
3295}
3296
3297fn ensure_document_texts_generation_key_column(conn: &Connection) -> Result<()> {
3298 let mut stmt = conn.prepare("PRAGMA table_info(document_texts)")?;
3299 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3300 let columns = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3301 drop(stmt);
3302
3303 if columns.iter().any(|column| column == "generation_key") {
3304 return Ok(());
3305 }
3306
3307 conn.execute(
3308 "ALTER TABLE document_texts ADD COLUMN generation_key TEXT NOT NULL DEFAULT ''",
3309 [],
3310 )?;
3311 Ok(())
3312}
3313
3314fn ensure_chunks_retrieval_prefix_column(conn: &Connection) -> Result<()> {
3315 let mut stmt = conn.prepare("PRAGMA table_info(chunks)")?;
3316 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3317 let columns = rows.collect::<std::result::Result<Vec<_>, _>>()?;
3318 drop(stmt);
3319
3320 if columns.iter().any(|column| column == "retrieval_prefix") {
3321 return Ok(());
3322 }
3323
3324 conn.execute("ALTER TABLE chunks ADD COLUMN retrieval_prefix TEXT", [])?;
3325 Ok(())
3326}
3327
3328fn build_literal_bm25_query(
3329 index: &Index,
3330 fields: &[Bm25FieldSpec],
3331 query: &str,
3332) -> Result<Option<Box<dyn Query>>> {
3333 let mut clauses = Vec::new();
3334 for field in fields {
3335 for token in analyzed_terms_for_field(index, field.field, query)? {
3336 let term_query: Box<dyn Query> = Box::new(TermQuery::new(
3337 Term::from_field_text(field.field, &token),
3338 field.index_record_option,
3339 ));
3340 let query = if (field.boost - 1.0).abs() > f32::EPSILON {
3341 Box::new(BoostQuery::new(term_query, field.boost)) as Box<dyn Query>
3342 } else {
3343 term_query
3344 };
3345 clauses.push((Occur::Should, query));
3346 }
3347 }
3348
3349 if clauses.is_empty() {
3350 Ok(None)
3351 } else {
3352 Ok(Some(Box::new(BooleanQuery::new(clauses))))
3353 }
3354}
3355
3356fn build_doc_id_filter_query(field: Field, document_ids: &[i64]) -> Result<Box<dyn Query>> {
3357 let mut terms = Vec::new();
3358 let mut seen = HashSet::new();
3359 for doc_id in document_ids {
3360 let doc_id = u64::try_from(*doc_id).map_err(|_| {
3361 CoreError::Internal(format!(
3362 "doc_id must be non-negative for tantivy query: {doc_id}"
3363 ))
3364 })?;
3365 if seen.insert(doc_id) {
3366 terms.push(Term::from_field_u64(field, doc_id));
3367 }
3368 }
3369
3370 Ok(Box::new(ConstScoreQuery::new(
3371 Box::new(TermSetQuery::new(terms)),
3372 0.0,
3373 )))
3374}
3375
3376fn analyzed_terms_for_field(index: &Index, field: Field, query: &str) -> Result<Vec<String>> {
3377 let mut analyzer = index.tokenizer_for_field(field)?;
3378 let mut stream = analyzer.token_stream(query);
3379 let mut terms = Vec::new();
3380 let mut seen = HashSet::new();
3381 while let Some(token) = stream.next() {
3382 if token.text.is_empty() {
3383 continue;
3384 }
3385 let text = token.text.clone();
3386 if seen.insert(text.clone()) {
3387 terms.push(text);
3388 }
3389 }
3390 Ok(terms)
3391}
3392
3393fn new_usearch_index(dimensions: usize) -> Result<usearch::Index> {
3394 let options = IndexOptions {
3395 dimensions,
3396 metric: MetricKind::Cos,
3397 quantization: ScalarKind::F32,
3398 connectivity: 16,
3399 expansion_add: 200,
3400 expansion_search: 100,
3401 ..IndexOptions::default()
3402 };
3403 usearch::Index::new(&options)
3404 .map_err(|err| CoreError::Internal(format!("usearch init failed: {err}")))
3405}
3406
3407fn open_or_create_usearch_index(path: &Path) -> Result<usearch::Index> {
3408 let index = new_usearch_index(256)?;
3409 let file_size = std::fs::metadata(path).map(|meta| meta.len()).unwrap_or(0);
3410 if file_size > 0 {
3411 let path = path
3412 .to_str()
3413 .ok_or_else(|| CoreError::Internal("invalid usearch path encoding".to_string()))?;
3414 index
3415 .load(path)
3416 .map_err(|err| CoreError::Internal(format!("usearch load failed: {err}")))?;
3417 }
3418 Ok(index)
3419}
3420
3421fn ensure_usearch_dimensions(index: &mut usearch::Index, expected_dimensions: usize) -> Result<()> {
3422 if index.size() == 0 && index.dimensions() != expected_dimensions {
3423 *index = new_usearch_index(expected_dimensions)?;
3424 return Ok(());
3425 }
3426
3427 if index.dimensions() != expected_dimensions {
3428 return Err(CoreError::Internal(format!(
3429 "usearch vector dimension mismatch: index expects {}, got {}",
3430 index.dimensions(),
3431 expected_dimensions
3432 )));
3433 }
3434 Ok(())
3435}
3436
3437fn save_usearch_index(index: &usearch::Index, path: &Path) -> Result<()> {
3438 if index.size() == 0 {
3439 std::fs::File::create(path)?;
3440 return Ok(());
3441 }
3442
3443 let path = path
3444 .to_str()
3445 .ok_or_else(|| CoreError::Internal("invalid usearch path encoding".to_string()))?;
3446 index
3447 .save(path)
3448 .map_err(|err| CoreError::Internal(format!("usearch save failed: {err}")))?;
3449 Ok(())
3450}
3451
3452fn tantivy_schema() -> tantivy::schema::Schema {
3453 let mut builder = tantivy::schema::Schema::builder();
3454 builder.add_u64_field("chunk_id", INDEXED | STORED | FAST);
3455 builder.add_u64_field("doc_id", INDEXED | STORED | FAST);
3456 builder.add_text_field("filepath", TEXT | STORED);
3457 builder.add_text_field("title", TEXT | STORED);
3458 builder.add_text_field("heading", TEXT | STORED);
3459 builder.add_text_field("body", TEXT);
3460 builder.build()
3461}
3462
3463fn tantivy_fields_from_schema(schema: &tantivy::schema::Schema) -> Result<TantivyFields> {
3464 Ok(TantivyFields {
3465 chunk_id: schema.get_field("chunk_id").map_err(|_| {
3466 CoreError::Internal("tantivy schema missing field: chunk_id".to_string())
3467 })?,
3468 doc_id: schema
3469 .get_field("doc_id")
3470 .map_err(|_| CoreError::Internal("tantivy schema missing field: doc_id".to_string()))?,
3471 filepath: schema.get_field("filepath").map_err(|_| {
3472 CoreError::Internal("tantivy schema missing field: filepath".to_string())
3473 })?,
3474 title: schema
3475 .get_field("title")
3476 .map_err(|_| CoreError::Internal("tantivy schema missing field: title".to_string()))?,
3477 heading: schema.get_field("heading").map_err(|_| {
3478 CoreError::Internal("tantivy schema missing field: heading".to_string())
3479 })?,
3480 body: schema
3481 .get_field("body")
3482 .map_err(|_| CoreError::Internal("tantivy schema missing field: body".to_string()))?,
3483 })
3484}
3485
3486fn resolve_tantivy_field(fields: TantivyFields, name: &str) -> Result<Field> {
3487 match name {
3488 "chunk_id" => Ok(fields.chunk_id),
3489 "doc_id" => Ok(fields.doc_id),
3490 "filepath" => Ok(fields.filepath),
3491 "title" => Ok(fields.title),
3492 "heading" => Ok(fields.heading),
3493 "body" => Ok(fields.body),
3494 other => Err(CoreError::Internal(format!(
3495 "unsupported tantivy field: {other}"
3496 ))),
3497 }
3498}
3499
3500fn serialize_extensions(extensions: Option<&[String]>) -> Result<Option<String>> {
3501 match extensions {
3502 None => Ok(None),
3503 Some(values) => serde_json::to_string(values).map(Some).map_err(Into::into),
3504 }
3505}
3506
3507fn deserialize_extensions(raw: Option<String>) -> Result<Option<Vec<String>>> {
3508 match raw {
3509 None => Ok(None),
3510 Some(json) => serde_json::from_str::<Vec<String>>(&json)
3511 .map(Some)
3512 .map_err(Into::into),
3513 }
3514}
3515
3516fn decode_space_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SpaceRow> {
3517 Ok(SpaceRow {
3518 id: row.get(0)?,
3519 name: row.get(1)?,
3520 description: row.get(2)?,
3521 created: row.get(3)?,
3522 })
3523}
3524
3525fn decode_collection_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<CollectionRow> {
3526 let raw_extensions: Option<String> = row.get(5)?;
3527 let extensions = deserialize_extensions(raw_extensions).map_err(|err| {
3528 Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(err))
3529 })?;
3530 Ok(CollectionRow {
3531 id: row.get(0)?,
3532 space_id: row.get(1)?,
3533 name: row.get(2)?,
3534 path: PathBuf::from(row.get::<_, String>(3)?),
3535 description: row.get(4)?,
3536 extensions,
3537 created: row.get(6)?,
3538 updated: row.get(7)?,
3539 })
3540}
3541
3542fn decode_document_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<DocumentRow> {
3543 decode_document_row_at(row, 0)
3544}
3545
3546fn decode_document_row_at(row: &rusqlite::Row<'_>, base: usize) -> rusqlite::Result<DocumentRow> {
3547 let raw_title_source: String = row.get(base + 4)?;
3548 let title_source = DocumentTitleSource::from_sql(&raw_title_source).map_err(|err| {
3549 Error::FromSqlConversionFailure(base + 4, rusqlite::types::Type::Text, Box::new(err))
3550 })?;
3551 let active_value: i64 = row.get(base + 7)?;
3552 let fts_dirty_value: i64 = row.get(base + 9)?;
3553 Ok(DocumentRow {
3554 id: row.get(base)?,
3555 collection_id: row.get(base + 1)?,
3556 path: row.get(base + 2)?,
3557 title: row.get(base + 3)?,
3558 title_source,
3559 hash: row.get(base + 5)?,
3560 modified: row.get(base + 6)?,
3561 active: active_value != 0,
3562 deactivated_at: row.get(base + 8)?,
3563 fts_dirty: fts_dirty_value != 0,
3564 })
3565}
3566
3567fn decode_document_text_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<DocumentTextRow> {
3568 Ok(DocumentTextRow {
3569 doc_id: row.get(0)?,
3570 extractor_key: row.get(1)?,
3571 source_hash: row.get(2)?,
3572 text_hash: row.get(3)?,
3573 generation_key: row.get(4)?,
3574 text: row.get(5)?,
3575 created: row.get(6)?,
3576 })
3577}
3578
3579fn decode_chunk_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ChunkRow> {
3580 decode_chunk_row_at(row, 0)
3581}
3582
3583fn decode_chunk_row_at(row: &rusqlite::Row<'_>, base: usize) -> rusqlite::Result<ChunkRow> {
3584 let offset_value: i64 = row.get(base + 3)?;
3585 let length_value: i64 = row.get(base + 4)?;
3586 let kind_raw: String = row.get(base + 6)?;
3587 let kind = FinalChunkKind::try_from(kind_raw.as_str()).map_err(|err| {
3588 Error::FromSqlConversionFailure(base + 6, rusqlite::types::Type::Text, Box::new(err))
3589 })?;
3590 Ok(ChunkRow {
3591 id: row.get(base)?,
3592 doc_id: row.get(base + 1)?,
3593 seq: row.get(base + 2)?,
3594 offset: decode_non_negative_usize(offset_value, base + 3, "chunks.offset")?,
3595 length: decode_non_negative_usize(length_value, base + 4, "chunks.length")?,
3596 heading: row.get(base + 5)?,
3597 kind,
3598 retrieval_prefix: row.get(base + 7)?,
3599 })
3600}
3601
3602fn decode_non_negative_usize(
3603 value: i64,
3604 column: usize,
3605 name: &'static str,
3606) -> rusqlite::Result<usize> {
3607 if value < 0 {
3608 return Err(Error::FromSqlConversionFailure(
3609 column,
3610 SqlType::Integer,
3611 Box::new(KboltError::Internal(format!("{name} must not be negative"))),
3612 ));
3613 }
3614
3615 Ok(value as usize)
3616}
3617
3618fn canonical_chunk_slice_from_bytes(
3619 chunk_id: i64,
3620 offset_value: i64,
3621 length_value: i64,
3622 document_len_value: i64,
3623 start_byte: Vec<u8>,
3624 end_byte: Vec<u8>,
3625 bytes: Vec<u8>,
3626) -> Result<String> {
3627 if offset_value < 0 {
3628 return Err(CoreError::Internal(format!(
3629 "chunk {chunk_id} offset must not be negative"
3630 )));
3631 }
3632 if length_value < 0 {
3633 return Err(CoreError::Internal(format!(
3634 "chunk {chunk_id} length must not be negative"
3635 )));
3636 }
3637 if document_len_value < 0 {
3638 return Err(CoreError::Internal(format!(
3639 "document text length for chunk {chunk_id} must not be negative"
3640 )));
3641 }
3642
3643 let offset = offset_value as usize;
3644 let length = length_value as usize;
3645 let document_len = document_len_value as usize;
3646 let end = offset.checked_add(length).ok_or_else(|| {
3647 CoreError::Internal(format!("chunk {chunk_id} text span overflows usize"))
3648 })?;
3649 if end > document_len {
3650 return Err(CoreError::Internal(format!(
3651 "chunk {chunk_id} text span {offset}..{end} exceeds document text length {document_len}"
3652 )));
3653 }
3654 if bytes.len() != length {
3655 return Err(CoreError::Internal(format!(
3656 "canonical text slice for chunk {chunk_id} returned {} bytes, expected {length}",
3657 bytes.len()
3658 )));
3659 }
3660 validate_sqlite_utf8_boundary(chunk_id, offset, document_len, &start_byte)?;
3661 validate_sqlite_utf8_boundary(chunk_id, end, document_len, &end_byte)?;
3662
3663 String::from_utf8(bytes).map_err(|err| {
3664 CoreError::Internal(format!(
3665 "stored canonical text slice for chunk {chunk_id} is invalid UTF-8: {err}"
3666 ))
3667 })
3668}
3669
3670fn validate_sqlite_utf8_boundary(
3671 chunk_id: i64,
3672 index: usize,
3673 document_len: usize,
3674 byte_at_index: &[u8],
3675) -> Result<()> {
3676 if index == 0 || index == document_len {
3677 return Ok(());
3678 }
3679 if byte_at_index.len() != 1 {
3680 return Err(CoreError::Internal(format!(
3681 "chunk {chunk_id} text boundary byte at {index} is missing"
3682 )));
3683 }
3684 if (0x80..0xC0).contains(&byte_at_index[0]) {
3685 return Err(CoreError::Internal(format!(
3686 "chunk {chunk_id} text span is not on UTF-8 boundaries"
3687 )));
3688 }
3689
3690 Ok(())
3691}
3692
3693pub(crate) fn chunk_text_from_canonical(document_text: &str, chunk: &ChunkRow) -> Result<String> {
3694 let label = format!("chunk {}", chunk.id);
3695 let end = validate_text_span(document_text, chunk.offset, chunk.length, &label)?;
3696
3697 Ok(document_text[chunk.offset..end].to_string())
3698}
3699
3700fn validate_text_span(
3701 document_text: &str,
3702 offset: usize,
3703 length: usize,
3704 label: &str,
3705) -> Result<usize> {
3706 let end = offset
3707 .checked_add(length)
3708 .ok_or_else(|| CoreError::Internal(format!("{label} text span overflows usize")))?;
3709 if end > document_text.len() {
3710 return Err(CoreError::Internal(format!(
3711 "{label} text span {}..{} exceeds document text length {}",
3712 offset,
3713 end,
3714 document_text.len()
3715 )));
3716 }
3717 if !document_text.is_char_boundary(offset) || !document_text.is_char_boundary(end) {
3718 return Err(CoreError::Internal(format!(
3719 "{label} text span {offset}..{end} is not on UTF-8 boundaries"
3720 )));
3721 }
3722
3723 Ok(end)
3724}
3725
3726pub(crate) fn missing_document_text_error(doc_id: i64) -> CoreError {
3727 KboltError::Internal(format!(
3728 "document {doc_id} is missing persisted canonical text; rebuild the kbolt cache"
3729 ))
3730 .into()
3731}
3732
// Unit tests for this module live in a separate `tests` submodule file and are
// compiled only for test builds.
#[cfg(test)]
mod tests;