1use std::collections::{HashMap, HashSet};
2use std::path::Path;
3use std::sync::{Arc, Mutex};
4use std::time::Instant;
5
6use crate::config;
7use crate::config::Config;
8use crate::error::CoreError;
9use crate::ingest::canonical::build_canonical_document;
10use crate::ingest::chunk::{
11 chunk_canonical_document, chunk_canonical_document_with_counter, resolve_policy,
12};
13use crate::ingest::extract::default_registry;
14use crate::lock::{LockMode, OperationLock};
15use crate::models;
16use crate::storage::Storage;
17use crate::storage::{
18 ChunkInsert, ChunkRow, CollectionRow, DocumentGenerationReplace, DocumentRow, DocumentTextRow,
19 DocumentTitleSource, SpaceResolution, TantivyEntry,
20};
21use crate::Result;
22use kbolt_types::{
23 ActiveSpace, ActiveSpaceSource, AddCollectionRequest, AddCollectionResult, CollectionInfo,
24 CollectionStatus, DocumentResponse, FileEntry, GetRequest, InitialIndexingBlock,
25 InitialIndexingOutcome, KboltError, Locator, ModelStatus, MultiGetRequest, MultiGetResponse,
26 OmitReason, OmittedFile, SearchMode, SearchPipeline, SearchPipelineNotice, SearchPipelineStep,
27 SearchPipelineUnavailableReason, SearchRequest, SearchResponse, SearchResult, SearchSignals,
28 SpaceInfo, SpaceStatus, StatusResponse, UpdateOptions, UpdateReport,
29};
30mod eval_ops;
31mod file_utils;
32pub(crate) mod ignore_helpers;
33mod ignore_ops;
34mod path_utils;
35mod schedule_ops;
36mod schedule_run_ops;
37mod schedule_status_ops;
38mod scoring;
39mod search_ops;
40mod text_helpers;
41mod update_ops;
42use file_utils::{file_error, file_title, modified_token, sha256_hex};
43use ignore_helpers::{
44 collection_ignore_file_path, count_ignore_patterns, is_hard_ignored_file,
45 load_collection_ignore_matcher, validate_ignore_pattern,
46};
47use path_utils::{
48 collection_relative_path, extension_allowed, normalize_docid, normalize_list_prefix,
49 normalized_extension_filter, path_matches_prefix, short_docid, split_collection_path,
50};
51use scoring::{dense_distance_to_score, max_option};
52pub(crate) use text_helpers::retrieval_text_with_prefix;
53use text_helpers::search_text_with_loaded_canonical_neighbors;
54
55pub struct Engine {
56 storage: Storage,
57 config: Config,
58 embedder: Option<Arc<dyn models::Embedder>>,
59 embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
60 reranker: Option<Arc<dyn models::Reranker>>,
61 expander: Option<Arc<dyn models::Expander>>,
62}
63
/// One row of an ignore listing, identifying a collection within a space.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct IgnoreListEntry {
    /// Name of the space that owns the collection.
    pub space: String,
    /// Name of the collection.
    pub collection: String,
    /// Pattern count for the collection (presumably its ignore patterns; confirm with callers).
    pub pattern_count: usize,
}
70
71#[derive(Debug, Clone, PartialEq, Eq)]
72pub struct UpdateTarget {
73 pub space: String,
74 pub collection: CollectionRow,
75}
76
/// Borrowed description of which space and collections an operation should target.
#[derive(Clone, Copy, Debug)]
struct TargetScope<'a> {
    /// Explicitly requested space name, if any.
    space: Option<&'a str>,
    /// Requested collection names; `search` passes the request's list unchanged.
    collections: &'a [String],
}
82
/// Space and collection names recorded per collection id while searching.
#[derive(Clone, Debug)]
struct SearchCollectionMeta {
    /// Name of the space the collection belongs to.
    space: String,
    /// Name of the collection.
    collection: String,
}
88
/// Per-space search scope produced while preparing a search.
///
/// NOTE(review): field meanings below are inferred from names and from how
/// `search` reads them; confirm against `search_target_scopes`.
#[derive(Debug)]
struct SearchTargetScope {
    /// Name of the space this scope covers.
    space: String,
    /// Ids of the collections included in the scope.
    collection_ids: Vec<i64>,
    /// Whether the scope is filtered; `search` only reports `document_ids` when set.
    filtered: bool,
    /// Document ids associated with the scope.
    document_ids: Vec<i64>,
    /// Number of chunks in the scope (reported as the `scope_chunks` counter).
    chunk_count: usize,
    /// Lazily populated, shareable set of chunk keys (presumably a cached filter).
    chunk_key_filter: Mutex<Option<Arc<HashSet<u64>>>>,
    /// Flag guarded by a mutex; presumably records that BM25 state was reloaded.
    bm25_reloaded: Mutex<bool>,
}
99
/// A chunk paired with its BM25 score.
#[derive(Clone, Debug)]
struct SearchHitCandidate {
    /// Id of the chunk.
    chunk_id: i64,
    /// BM25 score recorded for the chunk.
    bm25_score: f32,
}
105
/// A ranked chunk together with the individual signals recorded for it.
///
/// NOTE(review): the exact semantics of each score are defined by the ranking
/// code outside this view; the field notes are inferred from names.
#[derive(Clone, Debug)]
struct RankedChunk {
    /// Id of the ranked chunk.
    chunk_id: i64,
    /// Score used for ranking (presumably the final score).
    score: f32,
    /// Fusion score.
    fusion: f32,
    /// Reranker score, when present.
    reranker: Option<f32>,
    /// BM25 score, when present.
    bm25: Option<f32>,
    /// Dense-retrieval score, when present.
    dense: Option<f32>,
    /// Rank before a later reordering step, when recorded.
    original_rank: Option<usize>,
}
116
117impl Engine {
118 pub fn new(config_path: Option<&Path>) -> Result<Self> {
119 Self::new_with_recovery_notice(config_path, None)
120 }
121
122 pub fn new_with_recovery_notice(
123 config_path: Option<&Path>,
124 recovery_notice: Option<crate::RecoveryNoticeSink>,
125 ) -> Result<Self> {
126 let config = config::load(config_path)?;
127 let storage = Storage::new(&config.cache_dir)?;
128 let built_models =
129 models::build_inference_clients_with_recovery_notice(&config, recovery_notice)?;
130 Ok(Self {
131 storage,
132 config,
133 embedder: built_models.embedder,
134 embedding_document_sizer: built_models.embedding_document_sizer,
135 reranker: built_models.reranker,
136 expander: built_models.expander,
137 })
138 }
139
/// Test-only constructor that supplies no explicit inference clients.
#[cfg(test)]
pub(crate) fn from_parts(storage: Storage, config: Config) -> Self {
    let (embedder, reranker, expander) = (None, None, None);
    Self::from_parts_with_models(storage, config, embedder, reranker, expander)
}
144
/// Test-only constructor that overrides only the embedder.
#[cfg(test)]
pub(crate) fn from_parts_with_embedder(
    storage: Storage,
    config: Config,
    embedder: Option<Arc<dyn models::Embedder>>,
) -> Self {
    let (sizer, reranker, expander) = (None, None, None);
    Self::from_parts_with_inference(storage, config, embedder, sizer, reranker, expander)
}
153
/// Test-only constructor that overrides the embedder and the embedding document sizer.
#[cfg(test)]
pub(crate) fn from_parts_with_embedding_runtime(
    storage: Storage,
    config: Config,
    embedder: Option<Arc<dyn models::Embedder>>,
    embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
) -> Self {
    let (reranker, expander) = (None, None);
    Self::from_parts_with_inference(
        storage,
        config,
        embedder,
        embedding_document_sizer,
        reranker,
        expander,
    )
}
170
/// Test-only constructor that overrides the embedder, reranker, and expander.
///
/// No embedding document sizer override is passed.
#[cfg(test)]
pub(crate) fn from_parts_with_models(
    storage: Storage,
    config: Config,
    embedder: Option<Arc<dyn models::Embedder>>,
    reranker: Option<Arc<dyn models::Reranker>>,
    expander: Option<Arc<dyn models::Expander>>,
) -> Self {
    let sizer = None;
    Self::from_parts_with_inference(storage, config, embedder, sizer, reranker, expander)
}
181
/// Test-only constructor shared by the other `from_parts*` helpers.
///
/// Each explicitly supplied client wins; any `None` falls back to the client
/// built from `config`.
///
/// # Panics
///
/// Panics if the inference clients cannot be built from `config`.
#[cfg(test)]
fn from_parts_with_inference(
    storage: Storage,
    config: Config,
    embedder: Option<Arc<dyn models::Embedder>>,
    embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
    reranker: Option<Arc<dyn models::Reranker>>,
    expander: Option<Arc<dyn models::Expander>>,
) -> Self {
    let defaults = models::build_inference_clients(&config).expect("build inference models");

    let embedder = embedder.or(defaults.embedder);
    let embedding_document_sizer = embedding_document_sizer.or(defaults.embedding_document_sizer);
    let reranker = reranker.or(defaults.reranker);
    let expander = expander.or(defaults.expander);

    Self {
        storage,
        config,
        embedder,
        embedding_document_sizer,
        reranker,
        expander,
    }
}
203
204 pub fn add_space(&self, name: &str, description: Option<&str>) -> Result<SpaceInfo> {
205 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
206 self.storage.create_space(name, description)?;
207 let space = self.storage.get_space(name)?;
208 self.build_space_info(&space)
209 }
210
211 pub fn remove_space(&mut self, name: &str) -> Result<()> {
212 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
213 self.storage.delete_space(name)?;
214 if self.config.default_space.as_deref() == Some(name) {
215 self.persist_default_space(None)?;
216 }
217 Ok(())
218 }
219
220 pub fn rename_space(&mut self, old: &str, new: &str) -> Result<()> {
221 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
222 self.storage.rename_space(old, new)?;
223 if self.config.default_space.as_deref() == Some(old) {
224 self.persist_default_space(Some(new.to_string()))?;
225 }
226 Ok(())
227 }
228
229 pub fn describe_space(&self, name: &str, description: &str) -> Result<()> {
230 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
231 self.storage.update_space_description(name, description)
232 }
233
234 pub fn list_spaces(&self) -> Result<Vec<SpaceInfo>> {
235 let spaces = self.storage.list_spaces()?;
236 let mut infos = Vec::with_capacity(spaces.len());
237 for space in spaces {
238 infos.push(self.build_space_info(&space)?);
239 }
240 Ok(infos)
241 }
242
243 pub fn space_info(&self, name: &str) -> Result<SpaceInfo> {
244 let space = self.storage.get_space(name)?;
245 self.build_space_info(&space)
246 }
247
248 pub fn set_default_space(&mut self, name: Option<&str>) -> Result<Option<String>> {
249 if let Some(space_name) = name {
250 self.storage.get_space(space_name)?;
251 }
252
253 self.persist_default_space(name.map(ToString::to_string))?;
254 Ok(self.config.default_space.clone())
255 }
256
257 fn persist_default_space(&mut self, default_space: Option<String>) -> Result<()> {
258 let previous = self.config.default_space.clone();
259 self.config.default_space = default_space;
260 if let Err(err) = config::save(&self.config) {
261 self.config.default_space = previous;
262 return Err(err);
263 }
264 Ok(())
265 }
266
267 pub fn add_collection(&self, req: AddCollectionRequest) -> Result<AddCollectionResult> {
268 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
269 let AddCollectionRequest {
270 path,
271 space: requested_space,
272 name: requested_name,
273 description,
274 extensions,
275 no_index,
276 } = req;
277
278 let space = match requested_space.as_deref() {
279 Some(space_name) => match self.storage.get_space(space_name) {
280 Ok(space) => space,
281 Err(CoreError::Domain(KboltError::SpaceNotFound { .. })) => {
282 self.storage.create_space(space_name, None)?;
283 self.storage.get_space(space_name)?
284 }
285 Err(err) => return Err(err),
286 },
287 None => self.resolve_space_row(None, None)?,
288 };
289 if !path.is_absolute() || !path.is_dir() {
290 return Err(KboltError::InvalidPath(path).into());
291 }
292
293 let name = match requested_name {
294 Some(name) => name,
295 None => path
296 .file_name()
297 .and_then(|name| name.to_str())
298 .map(ToString::to_string)
299 .ok_or_else(|| KboltError::InvalidPath(path.clone()))?,
300 };
301
302 self.storage.create_collection(
303 space.id,
304 &name,
305 &path,
306 description.as_deref(),
307 extensions.as_deref(),
308 )?;
309
310 let initial_indexing = if no_index {
311 InitialIndexingOutcome::Skipped
312 } else {
313 match self.update_unlocked(UpdateOptions {
314 space: Some(space.name.clone()),
315 collections: vec![name.clone()],
316 no_embed: false,
317 dry_run: false,
318 verbose: false,
319 }) {
320 Ok(report) => InitialIndexingOutcome::Indexed(report),
321 Err(CoreError::Domain(KboltError::SpaceDenseRepairRequired { space, reason })) => {
322 InitialIndexingOutcome::Blocked(
323 InitialIndexingBlock::SpaceDenseRepairRequired { space, reason },
324 )
325 }
326 Err(CoreError::Domain(KboltError::ModelNotAvailable { name })) => {
327 InitialIndexingOutcome::Blocked(InitialIndexingBlock::ModelNotAvailable {
328 name,
329 })
330 }
331 Err(err) => return Err(err),
332 }
333 };
334
335 let collection = self.storage.get_collection(space.id, &name)?;
336 Ok(AddCollectionResult {
337 collection: self.build_collection_info(&space.name, &collection)?,
338 initial_indexing,
339 })
340 }
341
342 pub fn remove_collection(&self, space: Option<&str>, name: &str) -> Result<()> {
343 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
344 let resolved = self.resolve_space_row(space, Some(name))?;
345 let collection = self.storage.get_collection(resolved.id, name)?;
346 let documents = self.storage.list_documents(collection.id, false)?;
347 let doc_ids = documents.into_iter().map(|doc| doc.id).collect::<Vec<_>>();
348 let chunk_ids = self.collect_document_chunk_ids(&doc_ids)?;
349 self.purge_space_chunks(&resolved.name, &chunk_ids)?;
350 self.storage.delete_collection(resolved.id, name)?;
351
352 let ignore_path =
353 collection_ignore_file_path(&self.config.config_dir, &resolved.name, name);
354 if ignore_path.is_file() {
355 std::fs::remove_file(ignore_path)?;
356 }
357
358 Ok(())
359 }
360
361 pub fn rename_collection(&self, space: Option<&str>, old: &str, new: &str) -> Result<()> {
362 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
363 let resolved = self.resolve_space_row(space, Some(old))?;
364 let old_ignore_path =
365 collection_ignore_file_path(&self.config.config_dir, &resolved.name, old);
366 let new_ignore_path =
367 collection_ignore_file_path(&self.config.config_dir, &resolved.name, new);
368 if old_ignore_path.is_file() && new_ignore_path.exists() {
369 return Err(KboltError::Internal(format!(
370 "cannot rename ignore file: destination already exists: {}",
371 new_ignore_path.display()
372 ))
373 .into());
374 }
375
376 self.storage.rename_collection(resolved.id, old, new)?;
377
378 if old_ignore_path.is_file() {
379 if let Some(parent) = new_ignore_path.parent() {
380 std::fs::create_dir_all(parent)?;
381 }
382 if let Err(rename_err) = std::fs::rename(&old_ignore_path, &new_ignore_path) {
383 match self.storage.rename_collection(resolved.id, new, old) {
384 Ok(()) => {
385 return Err(KboltError::Internal(format!(
386 "renamed collection was rolled back after ignore rename failure: {rename_err}"
387 ))
388 .into())
389 }
390 Err(rollback_err) => {
391 return Err(KboltError::Internal(format!(
392 "ignore rename failed: {rename_err}; rollback failed: {rollback_err}"
393 ))
394 .into())
395 }
396 }
397 }
398 }
399
400 Ok(())
401 }
402
403 pub fn describe_collection(&self, space: Option<&str>, name: &str, desc: &str) -> Result<()> {
404 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
405 let resolved = self.resolve_space_row(space, Some(name))?;
406 self.storage
407 .update_collection_description(resolved.id, name, desc)
408 }
409
410 pub fn list_collections(&self, space: Option<&str>) -> Result<Vec<CollectionInfo>> {
411 let (space_id_filter, spaces_by_id) = if let Some(space_name) = space {
412 let resolved = self.resolve_space_row(Some(space_name), None)?;
413 let mut map = std::collections::HashMap::new();
414 map.insert(resolved.id, resolved.name.clone());
415 (Some(resolved.id), map)
416 } else {
417 let spaces = self.storage.list_spaces()?;
418 let map = spaces
419 .into_iter()
420 .map(|space| (space.id, space.name))
421 .collect::<std::collections::HashMap<_, _>>();
422 (None, map)
423 };
424
425 let collections = self.storage.list_collections(space_id_filter)?;
426 let mut infos = Vec::with_capacity(collections.len());
427 for collection in collections {
428 let space_name = spaces_by_id
429 .get(&collection.space_id)
430 .ok_or_else(|| {
431 KboltError::Internal(format!(
432 "missing space mapping for collection '{}'",
433 collection.name
434 ))
435 })?
436 .clone();
437 infos.push(self.build_collection_info(&space_name, &collection)?);
438 }
439 Ok(infos)
440 }
441
442 pub fn collection_info(&self, space: Option<&str>, name: &str) -> Result<CollectionInfo> {
443 let resolved = self.resolve_space_row(space, Some(name))?;
444 let collection = self.storage.get_collection(resolved.id, name)?;
445 self.build_collection_info(&resolved.name, &collection)
446 }
447
448 pub fn list_files(
449 &self,
450 space: Option<&str>,
451 collection: &str,
452 prefix: Option<&str>,
453 ) -> Result<Vec<FileEntry>> {
454 let resolved_space = self.resolve_space_row(space, Some(collection))?;
455 let collection_row = self.storage.get_collection(resolved_space.id, collection)?;
456 let normalized_prefix = normalize_list_prefix(prefix)?;
457 let file_rows = self
458 .storage
459 .list_collection_file_rows(collection_row.id, false)?;
460
461 let mut files = Vec::with_capacity(file_rows.len());
462 for file_row in file_rows {
463 if let Some(prefix) = normalized_prefix.as_deref() {
464 if !path_matches_prefix(&file_row.path, prefix) {
465 continue;
466 }
467 }
468
469 files.push(FileEntry {
470 path: file_row.path,
471 title: file_row.title,
472 docid: short_docid(&file_row.hash),
473 active: file_row.active,
474 chunk_count: file_row.chunk_count,
475 embedded: file_row.chunk_count > 0
476 && file_row.embedded_chunk_count >= file_row.chunk_count,
477 });
478 }
479
480 Ok(files)
481 }
482
483 pub fn get_document(&self, req: GetRequest) -> Result<DocumentResponse> {
484 self.get_document_unlocked(req)
485 }
486
487 fn get_document_unlocked(&self, req: GetRequest) -> Result<DocumentResponse> {
488 let GetRequest {
489 locator,
490 space,
491 offset,
492 limit,
493 } = req;
494
495 let (document, collection_row, space_name) = match locator {
496 Locator::Path(locator_path) => {
497 let (collection_name, relative_path) = split_collection_path(&locator_path)?;
498 let resolved_space =
499 self.resolve_space_row(space.as_deref(), Some(&collection_name))?;
500 let collection = self
501 .storage
502 .get_collection(resolved_space.id, &collection_name)?;
503 let document = self
504 .storage
505 .get_document_by_path(collection.id, &relative_path)?
506 .ok_or_else(|| KboltError::DocumentNotFound {
507 path: locator_path.clone(),
508 })?;
509 (document, collection, resolved_space.name)
510 }
511 Locator::DocId(docid) => {
512 let prefix = normalize_docid(&docid)?;
513 let mut candidates = self
514 .storage
515 .get_document_by_hash_prefix(&prefix)?
516 .into_iter()
517 .map(|document| {
518 let collection =
519 self.storage.get_collection_by_id(document.collection_id)?;
520 Ok((document, collection))
521 })
522 .collect::<Result<Vec<_>>>()?;
523
524 if let Some(space_name) = space.as_deref() {
525 let resolved_space = self.resolve_space_row(Some(space_name), None)?;
526 candidates.retain(|(_, collection)| collection.space_id == resolved_space.id);
527 }
528
529 if candidates.is_empty() {
530 return Err(KboltError::DocumentNotFound {
531 path: format!("#{prefix}"),
532 }
533 .into());
534 }
535
536 if candidates.len() > 1 {
537 return Err(KboltError::InvalidInput(
538 "docid is ambiguous; provide more characters".to_string(),
539 )
540 .into());
541 }
542
543 let (document, collection) = candidates.pop().expect("candidate exists");
544 let space = self.storage.get_space_by_id(collection.space_id)?;
545 (document, collection, space.name)
546 }
547 };
548
549 let document_text = self.storage.get_document_text(document.id)?;
550 let full_path = collection_row.path.join(&document.path);
551 let stale = source_is_stale(&full_path, document.hash.as_str())?;
552
553 let lines = document_text.text.lines().collect::<Vec<_>>();
554 let total_lines = lines.len();
555 let start = offset.unwrap_or(0).min(total_lines);
556 let requested = limit.unwrap_or(total_lines.saturating_sub(start));
557 let end = start.saturating_add(requested).min(total_lines);
558 let returned_lines = end.saturating_sub(start);
559 let content = if returned_lines == 0 {
560 String::new()
561 } else {
562 lines[start..end].join("\n")
563 };
564
565 Ok(DocumentResponse {
566 docid: short_docid(&document.hash),
567 path: format!("{}/{}", collection_row.name, document.path),
568 title: document.title,
569 space: space_name,
570 collection: collection_row.name,
571 content,
572 stale,
573 total_lines,
574 returned_lines,
575 })
576 }
577
578 pub fn multi_get(&self, req: MultiGetRequest) -> Result<MultiGetResponse> {
579 if req.max_files == 0 {
580 return Err(
581 KboltError::InvalidInput("max_files must be greater than 0".to_string()).into(),
582 );
583 }
584 if req.max_bytes == 0 {
585 return Err(
586 KboltError::InvalidInput("max_bytes must be greater than 0".to_string()).into(),
587 );
588 }
589
590 let mut documents = Vec::new();
591 let mut omitted = Vec::new();
592 let mut resolved_count = 0usize;
593 let mut warnings = Vec::new();
594 let mut consumed_bytes = 0usize;
595
596 for locator in req.locators {
597 let document = match self.get_document_unlocked(GetRequest {
598 locator,
599 space: req.space.clone(),
600 offset: None,
601 limit: None,
602 }) {
603 Ok(document) => document,
604 Err(err) => match KboltError::from(err) {
605 KboltError::DocumentNotFound { path } => {
606 warnings.push(format!("document not found: {path}"));
607 continue;
608 }
609 KboltError::InvalidInput(message) => {
610 warnings.push(format!("invalid locator: {message}"));
611 continue;
612 }
613 KboltError::InvalidPath(path) => {
614 warnings.push(format!("invalid locator path: {}", path.display()));
615 continue;
616 }
617 KboltError::AmbiguousSpace { collection, spaces } => {
618 warnings.push(format!(
619 "ambiguous locator space for collection '{collection}': {:?}. use --space to disambiguate.",
620 spaces
621 ));
622 continue;
623 }
624 KboltError::SpaceNotFound { name } => {
625 warnings.push(format!("space not found for locator: {name}"));
626 continue;
627 }
628 KboltError::CollectionNotFound { name } => {
629 warnings.push(format!("collection not found for locator: {name}"));
630 continue;
631 }
632 other => return Err(other.into()),
633 },
634 };
635 resolved_count = resolved_count.saturating_add(1);
636
637 let size_bytes = document.content.len();
638 if documents.len() >= req.max_files {
639 omitted.push(OmittedFile {
640 path: document.path,
641 docid: document.docid,
642 size_bytes,
643 reason: OmitReason::MaxFiles,
644 });
645 continue;
646 }
647
648 if consumed_bytes.saturating_add(size_bytes) > req.max_bytes {
649 omitted.push(OmittedFile {
650 path: document.path,
651 docid: document.docid,
652 size_bytes,
653 reason: OmitReason::MaxBytes,
654 });
655 continue;
656 }
657
658 consumed_bytes = consumed_bytes.saturating_add(size_bytes);
659 documents.push(document);
660 }
661
662 Ok(MultiGetResponse {
663 resolved_count,
664 documents,
665 omitted,
666 warnings,
667 })
668 }
669
670 pub fn search(&self, req: SearchRequest) -> Result<SearchResponse> {
671 let _profile = crate::profile::SearchProfileGuard::start();
672 let _lock = crate::profile::timed_search_stage("operation_lock", || {
673 self.acquire_operation_lock(LockMode::Shared)
674 })?;
675 let started = Instant::now();
676 let query = req.query.trim();
677 if query.is_empty() {
678 return Err(KboltError::InvalidInput("query cannot be empty".to_string()).into());
679 }
680
681 let requested_mode = req.mode.clone();
682 let rerank_enabled =
683 matches!(requested_mode, SearchMode::Auto | SearchMode::Deep) && !req.no_rerank;
684 let mut pipeline = self.initial_search_pipeline(&requested_mode);
685
686 let targets = crate::profile::timed_search_stage("resolve_targets", || {
687 self.resolve_targets(TargetScope {
688 space: req.space.as_deref(),
689 collections: &req.collections,
690 })
691 })?;
692 crate::profile::increment_search_count("target_collections", targets.len() as u64);
693
694 let staleness_hint = targets
695 .iter()
696 .map(|target| target.collection.updated.clone())
697 .max()
698 .map(|updated| format!("Index last updated: {updated}"));
699
700 if req.limit == 0 || targets.is_empty() {
701 return Ok(SearchResponse {
702 results: Vec::new(),
703 query: req.query,
704 requested_mode: requested_mode.clone(),
705 effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
706 pipeline,
707 staleness_hint,
708 elapsed_ms: started.elapsed().as_millis() as u64,
709 });
710 }
711
712 let mut collections_by_id: HashMap<i64, SearchCollectionMeta> = HashMap::new();
713 for target in &targets {
714 collections_by_id.insert(
715 target.collection.id,
716 SearchCollectionMeta {
717 space: target.space.clone(),
718 collection: target.collection.name.clone(),
719 },
720 );
721 }
722
723 let target_scopes = crate::profile::timed_search_stage("search_target_scopes", || {
724 self.search_target_scopes(&targets, !req.collections.is_empty())
725 })?;
726 crate::profile::increment_search_count("target_scopes", target_scopes.len() as u64);
727 for scope in &target_scopes {
728 crate::profile::increment_search_count("scope_chunks", scope.chunk_count as u64);
729 if scope.filtered {
730 crate::profile::increment_search_count("filtered_scopes", 1);
731 crate::profile::increment_search_count(
732 "scope_document_filters",
733 scope.document_ids.len() as u64,
734 );
735 }
736 }
737 let max_candidates = crate::profile::timed_search_stage("max_search_candidates", || {
738 self.max_search_candidates(&target_scopes)
739 });
740 let mut retrieval_limit =
741 self.initial_search_candidate_limit(&requested_mode, req.limit, rerank_enabled);
742 let results = loop {
743 crate::profile::increment_search_count("retrieval_iterations", 1);
744 let ranked_chunks = crate::profile::timed_search_stage("rank_chunks", || {
745 self.rank_chunks_for_mode(
746 &requested_mode,
747 &target_scopes,
748 query,
749 retrieval_limit,
750 req.min_score,
751 &mut pipeline,
752 )
753 })?;
754 crate::profile::increment_search_count("ranked_chunks", ranked_chunks.len() as u64);
755
756 if ranked_chunks.is_empty() {
757 return Ok(SearchResponse {
758 results: Vec::new(),
759 query: req.query,
760 requested_mode: requested_mode.clone(),
761 effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
762 pipeline,
763 staleness_hint,
764 elapsed_ms: started.elapsed().as_millis() as u64,
765 });
766 }
767
768 let ranked_len = ranked_chunks.len();
769 let results = crate::profile::timed_search_stage("assemble_results", || {
770 self.assemble_search_results(
771 query,
772 &requested_mode,
773 ranked_chunks,
774 &collections_by_id,
775 req.debug,
776 rerank_enabled,
777 &mut pipeline,
778 req.limit,
779 )
780 })?;
781 crate::profile::increment_search_count("assembled_results", results.len() as u64);
782
783 if results.len() >= req.limit
784 || ranked_len < retrieval_limit
785 || retrieval_limit >= max_candidates
786 {
787 break results;
788 }
789
790 let next_limit = retrieval_limit.saturating_mul(2).min(max_candidates);
791 if next_limit <= retrieval_limit {
792 break results;
793 }
794 retrieval_limit = next_limit;
795 };
796
797 Ok(SearchResponse {
798 results,
799 query: req.query,
800 requested_mode: requested_mode.clone(),
801 effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
802 pipeline,
803 staleness_hint,
804 elapsed_ms: started.elapsed().as_millis() as u64,
805 })
806 }
807
808 pub fn resolve_space(&self, explicit: Option<&str>) -> Result<String> {
809 let resolved = self.resolve_space_row(explicit, None)?;
810 Ok(resolved.name)
811 }
812
813 pub fn current_space(&self, explicit: Option<&str>) -> Result<Option<ActiveSpace>> {
814 let resolved = self.resolve_preferred_space(explicit)?;
815 Ok(resolved.map(|(space, source)| ActiveSpace {
816 name: space.name,
817 source,
818 }))
819 }
820
821 pub fn status(&self, space: Option<&str>) -> Result<StatusResponse> {
822 let _lock = self.acquire_operation_lock(LockMode::Shared)?;
823 let (spaces, totals_scope) = if let Some(space_name) = space {
824 let resolved = self.resolve_space_row(Some(space_name), None)?;
825 (vec![resolved.clone()], Some(resolved.id))
826 } else {
827 (self.storage.list_spaces()?, None)
828 };
829
830 let mut space_statuses = Vec::with_capacity(spaces.len());
831 for space_row in spaces {
832 let collections = self.storage.list_collections(Some(space_row.id))?;
833 let mut collection_statuses = Vec::with_capacity(collections.len());
834 let mut last_updated: Option<String> = None;
835
836 for collection in collections {
837 if last_updated
838 .as_ref()
839 .map(|existing| collection.updated > *existing)
840 .unwrap_or(true)
841 {
842 last_updated = Some(collection.updated.clone());
843 }
844
845 let documents = self
846 .storage
847 .count_documents_in_collection(collection.id, false)?;
848 let active_documents = self
849 .storage
850 .count_documents_in_collection(collection.id, true)?;
851 let chunks = self.storage.count_chunks_in_collection(collection.id)?;
852 let embedded_chunks = self
853 .storage
854 .count_embedded_chunks_in_collection(collection.id)?;
855
856 collection_statuses.push(CollectionStatus {
857 name: collection.name,
858 path: collection.path,
859 documents,
860 active_documents,
861 chunks,
862 embedded_chunks,
863 last_updated: collection.updated,
864 });
865 }
866
867 space_statuses.push(SpaceStatus {
868 name: space_row.name,
869 description: space_row.description,
870 collections: collection_statuses,
871 last_updated,
872 });
873 }
874
875 let models = self.model_status_unlocked()?;
876
877 Ok(StatusResponse {
878 spaces: space_statuses,
879 models,
880 cache_dir: self.config.cache_dir.clone(),
881 config_dir: self.config.config_dir.clone(),
882 total_documents: self.storage.count_documents(totals_scope)?,
883 total_chunks: self.storage.count_chunks(totals_scope)?,
884 total_embedded: self.storage.count_embedded_chunks(totals_scope)?,
885 disk_usage: self.storage.disk_usage()?,
886 })
887 }
888
889 pub fn model_status(&self) -> Result<ModelStatus> {
890 self.model_status_unlocked()
891 }
892
893 fn model_status_unlocked(&self) -> Result<ModelStatus> {
894 models::status(&self.config)
895 }
896
897 pub fn config(&self) -> &Config {
898 &self.config
899 }
900
901 pub fn storage(&self) -> &Storage {
902 &self.storage
903 }
904
905 fn embedding_model_key(&self) -> &str {
906 self.config
907 .roles
908 .embedder
909 .as_ref()
910 .and_then(|role| self.config.providers.get(&role.provider))
911 .map(|profile| profile.model())
912 .unwrap_or("embedder")
913 }
914
915 fn acquire_operation_lock(&self, mode: LockMode) -> Result<OperationLock> {
916 OperationLock::acquire(&self.config.cache_dir, mode)
917 }
918
919 fn collect_document_chunk_ids(&self, doc_ids: &[i64]) -> Result<Vec<i64>> {
920 let mut chunk_ids = Vec::new();
921 for doc_id in doc_ids {
922 let chunks = self.storage.get_chunks_for_document(*doc_id)?;
923 chunk_ids.extend(chunks.into_iter().map(|chunk| chunk.id));
924 }
925 Ok(chunk_ids)
926 }
927
928 fn purge_space_chunks(&self, space: &str, chunk_ids: &[i64]) -> Result<()> {
929 if chunk_ids.is_empty() {
930 return Ok(());
931 }
932
933 self.storage.delete_tantivy(space, chunk_ids)?;
934 self.storage.commit_tantivy(space)?;
935 self.storage.delete_usearch(space, chunk_ids)?;
936 Ok(())
937 }
938
939 fn build_space_info(&self, space: &crate::storage::SpaceRow) -> Result<SpaceInfo> {
940 let collection_count = self.storage.list_collections(Some(space.id))?.len();
941 let document_count = self.storage.count_documents(Some(space.id))?;
942 let chunk_count = self.storage.count_chunks(Some(space.id))?;
943
944 Ok(SpaceInfo {
945 name: space.name.clone(),
946 description: space.description.clone(),
947 collection_count,
948 document_count,
949 chunk_count,
950 created: space.created.clone(),
951 })
952 }
953
954 fn build_collection_info(
955 &self,
956 space_name: &str,
957 collection: &CollectionRow,
958 ) -> Result<CollectionInfo> {
959 let document_count = self
960 .storage
961 .count_documents_in_collection(collection.id, false)?;
962 let active_document_count = self
963 .storage
964 .count_documents_in_collection(collection.id, true)?;
965 let chunk_count = self.storage.count_chunks_in_collection(collection.id)?;
966 let embedded_chunk_count = self
967 .storage
968 .count_embedded_chunks_in_collection(collection.id)?;
969
970 Ok(CollectionInfo {
971 name: collection.name.clone(),
972 space: space_name.to_string(),
973 path: collection.path.clone(),
974 description: collection.description.clone(),
975 extensions: collection.extensions.clone(),
976 document_count,
977 active_document_count,
978 chunk_count,
979 embedded_chunk_count,
980 created: collection.created.clone(),
981 updated: collection.updated.clone(),
982 })
983 }
984
985 fn resolve_space_row(
986 &self,
987 explicit: Option<&str>,
988 collection_for_lookup: Option<&str>,
989 ) -> Result<crate::storage::SpaceRow> {
990 if let Some((space, _source)) = self.resolve_preferred_space(explicit)? {
991 return Ok(space);
992 }
993
994 if let Some(collection) = collection_for_lookup {
995 return match self.storage.find_space_for_collection(collection)? {
996 SpaceResolution::Found(space) => Ok(space),
997 SpaceResolution::Ambiguous(spaces) => Err(KboltError::AmbiguousSpace {
998 collection: collection.to_string(),
999 spaces,
1000 }
1001 .into()),
1002 SpaceResolution::NotFound => Err(KboltError::CollectionNotFound {
1003 name: collection.to_string(),
1004 }
1005 .into()),
1006 };
1007 }
1008
1009 Err(KboltError::NoActiveSpace.into())
1010 }
1011
1012 fn resolve_preferred_space(
1013 &self,
1014 explicit: Option<&str>,
1015 ) -> Result<Option<(crate::storage::SpaceRow, ActiveSpaceSource)>> {
1016 if let Some(space_name) = explicit {
1017 let space = self.storage.get_space(space_name)?;
1018 return Ok(Some((space, ActiveSpaceSource::Flag)));
1019 }
1020
1021 if let Ok(space_name) = std::env::var("KBOLT_SPACE") {
1022 let trimmed = space_name.trim();
1023 if !trimmed.is_empty() {
1024 let space = self.storage.get_space(trimmed)?;
1025 return Ok(Some((space, ActiveSpaceSource::EnvVar)));
1026 }
1027 }
1028
1029 if let Some(space_name) = self.config.default_space.as_deref() {
1030 let space = self.storage.get_space(space_name)?;
1031 return Ok(Some((space, ActiveSpaceSource::ConfigDefault)));
1032 }
1033
1034 Ok(None)
1035 }
1036}
1037
1038fn source_is_stale(path: &Path, indexed_hash: &str) -> Result<bool> {
1039 match std::fs::read(path) {
1040 Ok(bytes) => Ok(sha256_hex(&bytes) != indexed_hash),
1041 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(true),
1042 Err(err) => Err(std::io::Error::new(
1043 err.kind(),
1044 format!(
1045 "failed to read indexed source '{}' for stale check: {err}",
1046 path.display()
1047 ),
1048 )
1049 .into()),
1050 }
1051}
1052
1053#[cfg(test)]
1054mod tests;
1055
/// Tests that read-only engine calls keep working while another file handle
/// holds an exclusive lock on `kbolt.lock` in the cache directory.
#[cfg(test)]
mod lock_relief_tests {
    use fs2::FileExt;
    use std::collections::HashMap;
    use std::fs::OpenOptions;

    use tempfile::{tempdir, TempDir};

    use super::Engine;
    use crate::config::{ChunkingConfig, Config, RankingConfig, ReapingConfig};
    use crate::storage::Storage;
    use kbolt_types::{AddCollectionRequest, GetRequest, Locator, MultiGetRequest, UpdateOptions};

    /// Space, collection and model metadata can be read while the lock is held.
    #[test]
    fn metadata_reads_succeed_while_global_lock_is_held() {
        // `_root` owns the temporary directory. Locals drop in reverse
        // declaration order, so the lock holder and the engine are released
        // before the directory is removed.
        let (_root, engine) = test_engine_with_indexed_collection();
        let _holder = hold_global_lock(&engine);

        let spaces = engine.list_spaces().expect("list spaces");
        assert_eq!(
            spaces
                .iter()
                .map(|space| space.name.as_str())
                .collect::<Vec<_>>(),
            vec!["default", "work"]
        );

        let space = engine.space_info("work").expect("load space info");
        assert_eq!(space.name, "work");
        assert_eq!(space.collection_count, 1);

        let collections = engine
            .list_collections(Some("work"))
            .expect("list collections");
        assert_eq!(collections.len(), 1);
        assert_eq!(collections[0].name, "api");

        let collection = engine
            .collection_info(Some("work"), "api")
            .expect("load collection info");
        assert_eq!(collection.name, "api");
        assert_eq!(collection.document_count, 1);

        // The fixture configures no providers, so no model role is configured.
        let models = engine.model_status().expect("read model status");
        assert!(!models.embedder.configured);
        assert!(!models.reranker.configured);
        assert!(!models.expander.configured);
    }

    /// File listings and document fetches succeed while the lock is held.
    #[test]
    fn document_reads_succeed_while_global_lock_is_held() {
        // `_root` owns the temporary directory; see the drop-order note in
        // the fixture documentation.
        let (_root, engine) = test_engine_with_indexed_collection();
        let _holder = hold_global_lock(&engine);

        let files = engine
            .list_files(Some("work"), "api", None)
            .expect("list files");
        assert_eq!(files.len(), 1);
        assert_eq!(files[0].path, "guide.md");

        let document = engine
            .get_document(GetRequest {
                locator: Locator::Path("api/guide.md".to_string()),
                space: Some("work".to_string()),
                offset: None,
                limit: None,
            })
            .expect("read document");
        assert_eq!(document.path, "api/guide.md");
        assert!(document.content.contains("hello from kbolt"));

        let response = engine
            .multi_get(MultiGetRequest {
                locators: vec![Locator::Path("api/guide.md".to_string())],
                space: Some("work".to_string()),
                max_files: 4,
                max_bytes: 4_096,
            })
            .expect("read multiple documents");
        assert_eq!(response.resolved_count, 1);
        assert_eq!(response.documents.len(), 1);
        assert!(response.warnings.is_empty());
    }

    /// Builds an engine over a fresh temporary directory containing one
    /// markdown file, registered as collection `api` in space `work` and
    /// indexed with embedding disabled.
    ///
    /// The returned `TempDir` guard must be kept alive for as long as the
    /// engine is used; dropping it removes the directory. Bind it *before*
    /// the engine (as `let (_root, engine) = ...` does) so the engine is
    /// dropped first.
    fn test_engine_with_indexed_collection() -> (TempDir, Engine) {
        let root = tempdir().expect("create temp root");
        let root_path = root.path();

        let config_dir = root_path.join("config");
        let cache_dir = root_path.join("cache");
        let docs_dir = root_path.join("docs");
        std::fs::create_dir_all(&docs_dir).expect("create docs dir");
        std::fs::write(docs_dir.join("guide.md"), "# Hello\n\nhello from kbolt\n")
            .expect("write document");

        let storage = Storage::new(&cache_dir).expect("create storage");
        let config = Config {
            config_dir,
            cache_dir,
            default_space: None,
            providers: HashMap::new(),
            roles: crate::config::RoleBindingsConfig::default(),
            reaping: ReapingConfig { days: 7 },
            chunking: ChunkingConfig::default(),
            ranking: RankingConfig::default(),
        };
        let engine = Engine::from_parts(storage, config);

        // Register without indexing, then index explicitly with embedding
        // turned off.
        engine
            .add_collection(AddCollectionRequest {
                path: docs_dir,
                space: Some("work".to_string()),
                name: Some("api".to_string()),
                description: None,
                extensions: None,
                no_index: true,
            })
            .expect("add collection");
        engine
            .update(UpdateOptions {
                space: Some("work".to_string()),
                collections: vec!["api".to_string()],
                no_embed: true,
                dry_run: false,
                verbose: false,
            })
            .expect("index collection");

        (root, engine)
    }

    /// Opens (creating if needed) `kbolt.lock` in the engine's cache
    /// directory and takes an exclusive lock on it.
    ///
    /// The lock is held for as long as the returned file handle is alive.
    fn hold_global_lock(engine: &Engine) -> std::fs::File {
        std::fs::create_dir_all(&engine.config().cache_dir).expect("create cache dir");
        let holder = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(engine.config().cache_dir.join("kbolt.lock"))
            .expect("open lock file");
        // Fully qualified so the call resolves to the `fs2` trait method.
        FileExt::try_lock_exclusive(&holder).expect("acquire global lock");
        holder
    }
}