1use std::collections::{HashMap, HashSet};
2use std::path::Path;
3use std::sync::{Arc, Mutex};
4use std::time::Instant;
5
6use crate::config;
7use crate::config::Config;
8use crate::error::CoreError;
9use crate::ingest::canonical::build_canonical_document;
10use crate::ingest::chunk::{
11 chunk_canonical_document, chunk_canonical_document_with_counter, resolve_policy,
12};
13use crate::ingest::extract::default_registry;
14use crate::lock::{LockMode, OperationLock};
15use crate::models;
16use crate::retrieval_context::{EmbeddingDocumentInput, DENSE_DOCUMENT_RENDER_IDENTITY};
17use crate::storage::Storage;
18use crate::storage::{
19 ChunkInsert, ChunkRow, CollectionRow, DocumentGenerationReplace, DocumentRow, DocumentTextRow,
20 SpaceResolution, TantivyEntry,
21};
22use crate::Result;
23use kbolt_types::{
24 ActiveSpace, ActiveSpaceSource, AddCollectionRequest, AddCollectionResult, CollectionInfo,
25 CollectionStatus, DocumentResponse, FileEntry, GetRequest, InitialIndexingBlock,
26 InitialIndexingOutcome, KboltError, Locator, ModelStatus, MultiGetRequest, MultiGetResponse,
27 OmitReason, OmittedFile, SearchMode, SearchPipeline, SearchPipelineNotice, SearchPipelineStep,
28 SearchPipelineUnavailableReason, SearchRequest, SearchResponse, SearchResult, SearchSignals,
29 SpaceInfo, SpaceStatus, StatusResponse, UpdateOptions, UpdateReport,
30};
31mod eval_ops;
32mod file_utils;
33pub(crate) mod ignore_helpers;
34mod ignore_ops;
35mod path_utils;
36mod schedule_ops;
37mod schedule_run_ops;
38mod schedule_status_ops;
39mod scoring;
40mod search_ops;
41mod text_helpers;
42mod update_ops;
43use file_utils::{file_error, modified_token, sha256_hex};
44use ignore_helpers::{
45 collection_ignore_file_path, count_ignore_patterns, is_hard_ignored_file,
46 load_collection_ignore_matcher, validate_ignore_pattern,
47};
48use path_utils::{
49 collection_relative_path, extension_allowed, normalize_docid, normalize_list_prefix,
50 normalized_extension_filter, path_matches_prefix, short_docid, split_collection_path,
51};
52use scoring::{dense_distance_to_score, max_option};
53#[cfg(test)]
54pub(crate) use text_helpers::retrieval_text_with_prefix;
55use text_helpers::search_text_with_loaded_canonical_neighbors;
56
57pub struct Engine {
58 storage: Storage,
59 config: Config,
60 embedder: Option<Arc<dyn models::Embedder>>,
61 embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
62 reranker: Option<Arc<dyn models::Reranker>>,
63 expander: Option<Arc<dyn models::Expander>>,
64}
65
/// One row of an ignore-pattern listing: a collection and how many ignore
/// patterns it has.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct IgnoreListEntry {
    /// Name of the space the collection is listed under.
    pub space: String,
    /// Collection name.
    pub collection: String,
    /// Number of ignore patterns counted for the collection.
    pub pattern_count: usize,
}
72
73#[derive(Debug, Clone, PartialEq, Eq)]
74pub struct UpdateTarget {
75 pub space: String,
76 pub collection: CollectionRow,
77}
78
/// Borrowed selector describing which space and collections an operation
/// should target.
#[derive(Clone, Copy, Debug)]
struct TargetScope<'a> {
    /// Explicit space name, if one was given.
    space: Option<&'a str>,
    /// Explicit collection names; empty when none were given.
    collections: &'a [String],
}
84
/// Space and collection names recorded per collection id during a search.
#[derive(Clone, Debug)]
struct SearchCollectionMeta {
    /// Space name of the target.
    space: String,
    /// Collection name of the target.
    collection: String,
}
90
/// Per-space search scope: the collections and documents to consider, plus
/// mutable per-search state.
///
/// NOTE(review): the `Mutex`-wrapped fields look like lazily filled
/// per-search caches; confirm against `search_ops`.
#[derive(Debug)]
struct SearchTargetScope {
    /// Space name this scope covers.
    space: String,
    /// Ids of the collections in scope.
    collection_ids: Vec<i64>,
    /// Whether this scope is restricted to a subset of the space.
    filtered: bool,
    /// Document ids used when the scope is filtered.
    document_ids: Vec<i64>,
    /// Number of chunks in scope.
    chunk_count: usize,
    /// Optional shared set of chunk keys used for filtering.
    chunk_key_filter: Mutex<Option<Arc<HashSet<u64>>>>,
    /// Flag tracking whether the BM25 index was reloaded.
    bm25_reloaded: Mutex<bool>,
}
101
/// A chunk hit carrying its BM25 score.
#[derive(Clone, Debug)]
struct SearchHitCandidate {
    /// Id of the matching chunk.
    chunk_id: i64,
    /// BM25 score for the hit.
    bm25_score: f32,
}
107
/// A chunk after ranking, with the final score and the per-signal
/// components that contributed to it.
///
/// NOTE(review): field semantics are inferred from names; confirm against
/// the ranking code.
#[derive(Clone, Debug)]
struct RankedChunk {
    /// Id of the ranked chunk.
    chunk_id: i64,
    /// Final score used for ordering.
    score: f32,
    /// Fused score from the retrieval signals.
    fusion: f32,
    /// Reranker score, when reranking ran.
    reranker: Option<f32>,
    /// BM25 score, when the lexical signal matched.
    bm25: Option<f32>,
    /// Dense score, when the vector signal matched.
    dense: Option<f32>,
    /// Position before reranking, if recorded.
    original_rank: Option<usize>,
}
118
119impl Engine {
120 pub fn new(config_path: Option<&Path>) -> Result<Self> {
121 Self::new_with_recovery_notice(config_path, None)
122 }
123
124 pub fn new_with_recovery_notice(
125 config_path: Option<&Path>,
126 recovery_notice: Option<crate::RecoveryNoticeSink>,
127 ) -> Result<Self> {
128 let config = config::load(config_path)?;
129 let storage = Storage::new(&config.cache_dir)?;
130 let built_models =
131 models::build_inference_clients_with_recovery_notice(&config, recovery_notice)?;
132 Ok(Self {
133 storage,
134 config,
135 embedder: built_models.embedder,
136 embedding_document_sizer: built_models.embedding_document_sizer,
137 reranker: built_models.reranker,
138 expander: built_models.expander,
139 })
140 }
141
/// Test constructor around existing storage and config.
///
/// No clients are injected, so every role uses what `config` builds.
#[cfg(test)]
pub(crate) fn from_parts(storage: Storage, config: Config) -> Self {
    let (embedder, reranker, expander) = (None, None, None);
    Self::from_parts_with_models(storage, config, embedder, reranker, expander)
}
146
/// Test constructor that injects an embedder.
///
/// The sizer, reranker and expander fall back to config-built clients.
#[cfg(test)]
pub(crate) fn from_parts_with_embedder(
    storage: Storage,
    config: Config,
    embedder: Option<Arc<dyn models::Embedder>>,
) -> Self {
    let (sizer, reranker, expander) = (None, None, None);
    Self::from_parts_with_inference(storage, config, embedder, sizer, reranker, expander)
}
155
/// Test constructor that injects the embedder and the embedding-document sizer.
///
/// The reranker and expander fall back to config-built clients.
#[cfg(test)]
pub(crate) fn from_parts_with_embedding_runtime(
    storage: Storage,
    config: Config,
    embedder: Option<Arc<dyn models::Embedder>>,
    embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
) -> Self {
    let no_reranker = None;
    let no_expander = None;
    Self::from_parts_with_inference(
        storage,
        config,
        embedder,
        embedding_document_sizer,
        no_reranker,
        no_expander,
    )
}
172
/// Test constructor that injects the embedder, reranker and expander.
///
/// The embedding-document sizer falls back to the config-built one.
#[cfg(test)]
pub(crate) fn from_parts_with_models(
    storage: Storage,
    config: Config,
    embedder: Option<Arc<dyn models::Embedder>>,
    reranker: Option<Arc<dyn models::Reranker>>,
    expander: Option<Arc<dyn models::Expander>>,
) -> Self {
    let default_sizer = None;
    Self::from_parts_with_inference(storage, config, embedder, default_sizer, reranker, expander)
}
183
/// Test constructor that merges injected clients with config-built fallbacks.
///
/// Any role passed as `None` uses the client built from `config`.
///
/// # Panics
/// Panics if the config-driven inference clients cannot be built.
#[cfg(test)]
fn from_parts_with_inference(
    storage: Storage,
    config: Config,
    embedder: Option<Arc<dyn models::Embedder>>,
    embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
    reranker: Option<Arc<dyn models::Reranker>>,
    expander: Option<Arc<dyn models::Expander>>,
) -> Self {
    // Built from the config before `config` is moved into the engine.
    let fallback = models::build_inference_clients(&config).expect("build inference models");

    let embedder = embedder.or(fallback.embedder);
    let embedding_document_sizer =
        embedding_document_sizer.or(fallback.embedding_document_sizer);
    let reranker = reranker.or(fallback.reranker);
    let expander = expander.or(fallback.expander);

    Self {
        storage,
        config,
        embedder,
        embedding_document_sizer,
        reranker,
        expander,
    }
}
205
206 pub fn add_space(&self, name: &str, description: Option<&str>) -> Result<SpaceInfo> {
207 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
208 self.storage.create_space(name, description)?;
209 let space = self.storage.get_space(name)?;
210 self.build_space_info(&space)
211 }
212
213 pub fn remove_space(&mut self, name: &str) -> Result<()> {
214 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
215 self.storage.delete_space(name)?;
216 if self.config.default_space.as_deref() == Some(name) {
217 self.persist_default_space(None)?;
218 }
219 Ok(())
220 }
221
222 pub fn rename_space(&mut self, old: &str, new: &str) -> Result<()> {
223 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
224 self.storage.rename_space(old, new)?;
225 if self.config.default_space.as_deref() == Some(old) {
226 self.persist_default_space(Some(new.to_string()))?;
227 }
228 Ok(())
229 }
230
231 pub fn describe_space(&self, name: &str, description: &str) -> Result<()> {
232 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
233 self.storage.update_space_description(name, description)
234 }
235
236 pub fn list_spaces(&self) -> Result<Vec<SpaceInfo>> {
237 let spaces = self.storage.list_spaces()?;
238 let mut infos = Vec::with_capacity(spaces.len());
239 for space in spaces {
240 infos.push(self.build_space_info(&space)?);
241 }
242 Ok(infos)
243 }
244
245 pub fn space_info(&self, name: &str) -> Result<SpaceInfo> {
246 let space = self.storage.get_space(name)?;
247 self.build_space_info(&space)
248 }
249
250 pub fn set_default_space(&mut self, name: Option<&str>) -> Result<Option<String>> {
251 if let Some(space_name) = name {
252 self.storage.get_space(space_name)?;
253 }
254
255 self.persist_default_space(name.map(ToString::to_string))?;
256 Ok(self.config.default_space.clone())
257 }
258
259 fn persist_default_space(&mut self, default_space: Option<String>) -> Result<()> {
260 let previous = self.config.default_space.clone();
261 self.config.default_space = default_space;
262 if let Err(err) = config::save(&self.config) {
263 self.config.default_space = previous;
264 return Err(err);
265 }
266 Ok(())
267 }
268
269 pub fn add_collection(&self, req: AddCollectionRequest) -> Result<AddCollectionResult> {
270 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
271 let AddCollectionRequest {
272 path,
273 space: requested_space,
274 name: requested_name,
275 description,
276 extensions,
277 no_index,
278 } = req;
279
280 let space = match requested_space.as_deref() {
281 Some(space_name) => match self.storage.get_space(space_name) {
282 Ok(space) => space,
283 Err(CoreError::Domain(KboltError::SpaceNotFound { .. })) => {
284 self.storage.create_space(space_name, None)?;
285 self.storage.get_space(space_name)?
286 }
287 Err(err) => return Err(err),
288 },
289 None => self.resolve_space_row(None, None)?,
290 };
291 if !path.is_absolute() || !path.is_dir() {
292 return Err(KboltError::InvalidPath(path).into());
293 }
294
295 let name = match requested_name {
296 Some(name) => name,
297 None => path
298 .file_name()
299 .and_then(|name| name.to_str())
300 .map(ToString::to_string)
301 .ok_or_else(|| KboltError::InvalidPath(path.clone()))?,
302 };
303
304 self.storage.create_collection(
305 space.id,
306 &name,
307 &path,
308 description.as_deref(),
309 extensions.as_deref(),
310 )?;
311
312 let initial_indexing = if no_index {
313 InitialIndexingOutcome::Skipped
314 } else {
315 match self.update_unlocked(UpdateOptions {
316 space: Some(space.name.clone()),
317 collections: vec![name.clone()],
318 no_embed: false,
319 dry_run: false,
320 verbose: false,
321 }) {
322 Ok(report) => InitialIndexingOutcome::Indexed(report),
323 Err(CoreError::Domain(KboltError::SpaceDenseRepairRequired { space, reason })) => {
324 InitialIndexingOutcome::Blocked(
325 InitialIndexingBlock::SpaceDenseRepairRequired { space, reason },
326 )
327 }
328 Err(CoreError::Domain(KboltError::ModelNotAvailable { name })) => {
329 InitialIndexingOutcome::Blocked(InitialIndexingBlock::ModelNotAvailable {
330 name,
331 })
332 }
333 Err(err) => return Err(err),
334 }
335 };
336
337 let collection = self.storage.get_collection(space.id, &name)?;
338 Ok(AddCollectionResult {
339 collection: self.build_collection_info(&space.name, &collection)?,
340 initial_indexing,
341 })
342 }
343
344 pub fn remove_collection(&self, space: Option<&str>, name: &str) -> Result<()> {
345 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
346 let resolved = self.resolve_space_row(space, Some(name))?;
347 let collection = self.storage.get_collection(resolved.id, name)?;
348 let documents = self.storage.list_documents(collection.id, false)?;
349 let doc_ids = documents.into_iter().map(|doc| doc.id).collect::<Vec<_>>();
350 let chunk_ids = self.collect_document_chunk_ids(&doc_ids)?;
351 self.purge_space_chunks(&resolved.name, &chunk_ids)?;
352 self.storage.delete_collection(resolved.id, name)?;
353
354 let ignore_path =
355 collection_ignore_file_path(&self.config.config_dir, &resolved.name, name);
356 if ignore_path.is_file() {
357 std::fs::remove_file(ignore_path)?;
358 }
359
360 Ok(())
361 }
362
363 pub fn rename_collection(&self, space: Option<&str>, old: &str, new: &str) -> Result<()> {
364 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
365 let resolved = self.resolve_space_row(space, Some(old))?;
366 let old_ignore_path =
367 collection_ignore_file_path(&self.config.config_dir, &resolved.name, old);
368 let new_ignore_path =
369 collection_ignore_file_path(&self.config.config_dir, &resolved.name, new);
370 if old_ignore_path.is_file() && new_ignore_path.exists() {
371 return Err(KboltError::Internal(format!(
372 "cannot rename ignore file: destination already exists: {}",
373 new_ignore_path.display()
374 ))
375 .into());
376 }
377
378 self.storage.rename_collection(resolved.id, old, new)?;
379
380 if old_ignore_path.is_file() {
381 if let Some(parent) = new_ignore_path.parent() {
382 std::fs::create_dir_all(parent)?;
383 }
384 if let Err(rename_err) = std::fs::rename(&old_ignore_path, &new_ignore_path) {
385 match self.storage.rename_collection(resolved.id, new, old) {
386 Ok(()) => {
387 return Err(KboltError::Internal(format!(
388 "renamed collection was rolled back after ignore rename failure: {rename_err}"
389 ))
390 .into())
391 }
392 Err(rollback_err) => {
393 return Err(KboltError::Internal(format!(
394 "ignore rename failed: {rename_err}; rollback failed: {rollback_err}"
395 ))
396 .into())
397 }
398 }
399 }
400 }
401
402 Ok(())
403 }
404
405 pub fn describe_collection(&self, space: Option<&str>, name: &str, desc: &str) -> Result<()> {
406 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
407 let resolved = self.resolve_space_row(space, Some(name))?;
408 self.storage
409 .update_collection_description(resolved.id, name, desc)
410 }
411
412 pub fn list_collections(&self, space: Option<&str>) -> Result<Vec<CollectionInfo>> {
413 let (space_id_filter, spaces_by_id) = if let Some(space_name) = space {
414 let resolved = self.resolve_space_row(Some(space_name), None)?;
415 let mut map = std::collections::HashMap::new();
416 map.insert(resolved.id, resolved.name.clone());
417 (Some(resolved.id), map)
418 } else {
419 let spaces = self.storage.list_spaces()?;
420 let map = spaces
421 .into_iter()
422 .map(|space| (space.id, space.name))
423 .collect::<std::collections::HashMap<_, _>>();
424 (None, map)
425 };
426
427 let collections = self.storage.list_collections(space_id_filter)?;
428 let mut infos = Vec::with_capacity(collections.len());
429 for collection in collections {
430 let space_name = spaces_by_id
431 .get(&collection.space_id)
432 .ok_or_else(|| {
433 KboltError::Internal(format!(
434 "missing space mapping for collection '{}'",
435 collection.name
436 ))
437 })?
438 .clone();
439 infos.push(self.build_collection_info(&space_name, &collection)?);
440 }
441 Ok(infos)
442 }
443
444 pub fn collection_info(&self, space: Option<&str>, name: &str) -> Result<CollectionInfo> {
445 let resolved = self.resolve_space_row(space, Some(name))?;
446 let collection = self.storage.get_collection(resolved.id, name)?;
447 self.build_collection_info(&resolved.name, &collection)
448 }
449
450 pub fn list_files(
451 &self,
452 space: Option<&str>,
453 collection: &str,
454 prefix: Option<&str>,
455 ) -> Result<Vec<FileEntry>> {
456 let resolved_space = self.resolve_space_row(space, Some(collection))?;
457 let collection_row = self.storage.get_collection(resolved_space.id, collection)?;
458 let normalized_prefix = normalize_list_prefix(prefix)?;
459 let file_rows = self
460 .storage
461 .list_collection_file_rows(collection_row.id, false)?;
462
463 let mut files = Vec::with_capacity(file_rows.len());
464 for file_row in file_rows {
465 if let Some(prefix) = normalized_prefix.as_deref() {
466 if !path_matches_prefix(&file_row.path, prefix) {
467 continue;
468 }
469 }
470
471 let title =
472 crate::engine::file_utils::display_title(file_row.title.as_deref(), &file_row.path);
473 files.push(FileEntry {
474 path: file_row.path,
475 title,
476 docid: short_docid(&file_row.hash),
477 active: file_row.active,
478 chunk_count: file_row.chunk_count,
479 embedded: file_row.chunk_count > 0
480 && file_row.embedded_chunk_count >= file_row.chunk_count,
481 });
482 }
483
484 Ok(files)
485 }
486
487 pub fn get_document(&self, req: GetRequest) -> Result<DocumentResponse> {
488 self.get_document_unlocked(req)
489 }
490
491 fn get_document_unlocked(&self, req: GetRequest) -> Result<DocumentResponse> {
492 let GetRequest {
493 locator,
494 space,
495 offset,
496 limit,
497 } = req;
498
499 let (document, collection_row, space_name) = match locator {
500 Locator::Path(locator_path) => {
501 let (collection_name, relative_path) = split_collection_path(&locator_path)?;
502 let resolved_space =
503 self.resolve_space_row(space.as_deref(), Some(&collection_name))?;
504 let collection = self
505 .storage
506 .get_collection(resolved_space.id, &collection_name)?;
507 let document = self
508 .storage
509 .get_document_by_path(collection.id, &relative_path)?
510 .ok_or_else(|| KboltError::DocumentNotFound {
511 path: locator_path.clone(),
512 })?;
513 (document, collection, resolved_space.name)
514 }
515 Locator::DocId(docid) => {
516 let prefix = normalize_docid(&docid)?;
517 let mut candidates = self
518 .storage
519 .get_document_by_hash_prefix(&prefix)?
520 .into_iter()
521 .map(|document| {
522 let collection =
523 self.storage.get_collection_by_id(document.collection_id)?;
524 Ok((document, collection))
525 })
526 .collect::<Result<Vec<_>>>()?;
527
528 if let Some(space_name) = space.as_deref() {
529 let resolved_space = self.resolve_space_row(Some(space_name), None)?;
530 candidates.retain(|(_, collection)| collection.space_id == resolved_space.id);
531 }
532
533 if candidates.is_empty() {
534 return Err(KboltError::DocumentNotFound {
535 path: format!("#{prefix}"),
536 }
537 .into());
538 }
539
540 if candidates.len() > 1 {
541 return Err(KboltError::InvalidInput(
542 "docid is ambiguous; provide more characters".to_string(),
543 )
544 .into());
545 }
546
547 let (document, collection) = candidates.pop().expect("candidate exists");
548 let space = self.storage.get_space_by_id(collection.space_id)?;
549 (document, collection, space.name)
550 }
551 };
552
553 let document_text = self.storage.get_document_text(document.id)?;
554 let full_path = collection_row.path.join(&document.path);
555 let stale = source_is_stale(&full_path, document.hash.as_str())?;
556
557 let lines = document_text.text.lines().collect::<Vec<_>>();
558 let total_lines = lines.len();
559 let start = offset.unwrap_or(0).min(total_lines);
560 let requested = limit.unwrap_or(total_lines.saturating_sub(start));
561 let end = start.saturating_add(requested).min(total_lines);
562 let returned_lines = end.saturating_sub(start);
563 let content = if returned_lines == 0 {
564 String::new()
565 } else {
566 lines[start..end].join("\n")
567 };
568
569 Ok(DocumentResponse {
570 docid: short_docid(&document.hash),
571 path: format!("{}/{}", collection_row.name, document.path),
572 title: crate::engine::file_utils::display_title(
573 document.title.as_deref(),
574 &document.path,
575 ),
576 space: space_name,
577 collection: collection_row.name,
578 content,
579 stale,
580 total_lines,
581 returned_lines,
582 })
583 }
584
585 pub fn multi_get(&self, req: MultiGetRequest) -> Result<MultiGetResponse> {
586 if req.max_files == 0 {
587 return Err(
588 KboltError::InvalidInput("max_files must be greater than 0".to_string()).into(),
589 );
590 }
591 if req.max_bytes == 0 {
592 return Err(
593 KboltError::InvalidInput("max_bytes must be greater than 0".to_string()).into(),
594 );
595 }
596
597 let mut documents = Vec::new();
598 let mut omitted = Vec::new();
599 let mut resolved_count = 0usize;
600 let mut warnings = Vec::new();
601 let mut consumed_bytes = 0usize;
602
603 for locator in req.locators {
604 let document = match self.get_document_unlocked(GetRequest {
605 locator,
606 space: req.space.clone(),
607 offset: None,
608 limit: None,
609 }) {
610 Ok(document) => document,
611 Err(err) => match KboltError::from(err) {
612 KboltError::DocumentNotFound { path } => {
613 warnings.push(format!("document not found: {path}"));
614 continue;
615 }
616 KboltError::InvalidInput(message) => {
617 warnings.push(format!("invalid locator: {message}"));
618 continue;
619 }
620 KboltError::InvalidPath(path) => {
621 warnings.push(format!("invalid locator path: {}", path.display()));
622 continue;
623 }
624 KboltError::AmbiguousSpace { collection, spaces } => {
625 warnings.push(format!(
626 "ambiguous locator space for collection '{collection}': {:?}. use --space to disambiguate.",
627 spaces
628 ));
629 continue;
630 }
631 KboltError::SpaceNotFound { name } => {
632 warnings.push(format!("space not found for locator: {name}"));
633 continue;
634 }
635 KboltError::CollectionNotFound { name } => {
636 warnings.push(format!("collection not found for locator: {name}"));
637 continue;
638 }
639 other => return Err(other.into()),
640 },
641 };
642 resolved_count = resolved_count.saturating_add(1);
643
644 let size_bytes = document.content.len();
645 if documents.len() >= req.max_files {
646 omitted.push(OmittedFile {
647 path: document.path,
648 docid: document.docid,
649 size_bytes,
650 reason: OmitReason::MaxFiles,
651 });
652 continue;
653 }
654
655 if consumed_bytes.saturating_add(size_bytes) > req.max_bytes {
656 omitted.push(OmittedFile {
657 path: document.path,
658 docid: document.docid,
659 size_bytes,
660 reason: OmitReason::MaxBytes,
661 });
662 continue;
663 }
664
665 consumed_bytes = consumed_bytes.saturating_add(size_bytes);
666 documents.push(document);
667 }
668
669 Ok(MultiGetResponse {
670 resolved_count,
671 documents,
672 omitted,
673 warnings,
674 })
675 }
676
/// Runs a search over the resolved target collections.
///
/// Returns ranked results plus a description of the pipeline that ran.
///
/// Flow:
/// 1. Take a shared operation lock and validate the query.
/// 2. Resolve the target collections and build per-space scopes.
/// 3. Rank chunks with a growing candidate limit until enough candidates
///    survive assembly.
/// 4. Assemble the final results, with reranking when enabled.
///
/// Every stage is wrapped in `crate::profile` timing/count calls.
///
/// # Errors
/// - `InvalidInput` when the trimmed query is empty.
/// - Lock, target-resolution, ranking and assembly errors are propagated.
pub fn search(&self, req: SearchRequest) -> Result<SearchResponse> {
    let _profile = crate::profile::SearchProfileGuard::start();
    let _lock = crate::profile::timed_search_stage("operation_lock", || {
        self.acquire_operation_lock(LockMode::Shared)
    })?;
    // Started only once the lock is held, so `elapsed_ms` excludes lock wait time.
    let started = Instant::now();
    let query = req.query.trim();
    if query.is_empty() {
        return Err(KboltError::InvalidInput("query cannot be empty".to_string()).into());
    }

    let requested_mode = req.mode.clone();
    // Reranking is only considered for Auto/Deep modes, and the caller can opt out.
    let rerank_enabled =
        matches!(requested_mode, SearchMode::Auto | SearchMode::Deep) && !req.no_rerank;
    let mut pipeline = self.initial_search_pipeline(&requested_mode);

    let targets = crate::profile::timed_search_stage("resolve_targets", || {
        self.resolve_targets(TargetScope {
            space: req.space.as_deref(),
            collections: &req.collections,
        })
    })?;
    crate::profile::increment_search_count("target_collections", targets.len() as u64);

    // The hint reports the largest `updated` string among the targets.
    // NOTE(review): this is a lexicographic max; it assumes `updated` values
    // sort chronologically as strings — confirm.
    let staleness_hint = targets
        .iter()
        .map(|target| target.collection.updated.clone())
        .max()
        .map(|updated| format!("Index last updated: {updated}"));

    // Nothing to retrieve: answer early with an empty result list.
    if req.limit == 0 || targets.is_empty() {
        return Ok(SearchResponse {
            results: Vec::new(),
            query: req.query,
            requested_mode: requested_mode.clone(),
            effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
            pipeline,
            staleness_hint,
            elapsed_ms: started.elapsed().as_millis() as u64,
        });
    }

    // Collection id -> space/collection names, consulted when ranked chunks
    // are turned into candidates.
    let mut collections_by_id: HashMap<i64, SearchCollectionMeta> = HashMap::new();
    for target in &targets {
        collections_by_id.insert(
            target.collection.id,
            SearchCollectionMeta {
                space: target.space.clone(),
                collection: target.collection.name.clone(),
            },
        );
    }

    // The second argument says whether the caller named explicit collections.
    let target_scopes = crate::profile::timed_search_stage("search_target_scopes", || {
        self.search_target_scopes(&targets, !req.collections.is_empty())
    })?;
    crate::profile::increment_search_count("target_scopes", target_scopes.len() as u64);
    for scope in &target_scopes {
        crate::profile::increment_search_count("scope_chunks", scope.chunk_count as u64);
        if scope.filtered {
            crate::profile::increment_search_count("filtered_scopes", 1);
            crate::profile::increment_search_count(
                "scope_document_filters",
                scope.document_ids.len() as u64,
            );
        }
    }
    // Upper bound that the retrieval limit below is never grown past.
    let max_candidates = crate::profile::timed_search_stage("max_search_candidates", || {
        self.max_search_candidates(&target_scopes)
    });
    let mut retrieval_limit =
        self.initial_search_candidate_limit(&requested_mode, req.limit, rerank_enabled);
    // Retrieval loop: double the limit (capped at `max_candidates`) until
    // one of these holds:
    // - assembly yields at least `req.limit` candidates;
    // - the ranker returns fewer chunks than requested (nothing more to find);
    // - the cap has been reached.
    let final_candidates = loop {
        crate::profile::increment_search_count("retrieval_iterations", 1);
        let ranked_chunks = crate::profile::timed_search_stage("rank_chunks", || {
            self.rank_chunks_for_mode(
                &requested_mode,
                &target_scopes,
                query,
                retrieval_limit,
                req.min_score,
                &mut pipeline,
            )
        })?;
        crate::profile::increment_search_count("ranked_chunks", ranked_chunks.len() as u64);

        if ranked_chunks.is_empty() {
            return Ok(SearchResponse {
                results: Vec::new(),
                query: req.query,
                requested_mode: requested_mode.clone(),
                effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
                pipeline,
                staleness_hint,
                elapsed_ms: started.elapsed().as_millis() as u64,
            });
        }

        // Captured before `ranked_chunks` is moved into candidate assembly.
        let ranked_len = ranked_chunks.len();
        let candidates = crate::profile::timed_search_stage("assemble_candidates", || {
            self.search_candidates_from_ranked_chunks(ranked_chunks, &collections_by_id)
        })?;
        crate::profile::increment_search_count("assembled_candidates", candidates.len() as u64);

        if candidates.len() >= req.limit
            || ranked_len < retrieval_limit
            || retrieval_limit >= max_candidates
        {
            break candidates;
        }

        // Defensive stop when doubling makes no progress (e.g. a zero limit).
        let next_limit = retrieval_limit.saturating_mul(2).min(max_candidates);
        if next_limit <= retrieval_limit {
            break candidates;
        }
        retrieval_limit = next_limit;
    };
    let results = crate::profile::timed_search_stage("assemble_results", || {
        self.assemble_search_candidates(
            query,
            &requested_mode,
            final_candidates,
            req.debug,
            rerank_enabled,
            &mut pipeline,
            req.limit,
        )
    })?;
    crate::profile::increment_search_count("assembled_results", results.len() as u64);

    Ok(SearchResponse {
        results,
        query: req.query,
        requested_mode: requested_mode.clone(),
        effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
        pipeline,
        staleness_hint,
        elapsed_ms: started.elapsed().as_millis() as u64,
    })
}
817
818 pub fn resolve_space(&self, explicit: Option<&str>) -> Result<String> {
819 let resolved = self.resolve_space_row(explicit, None)?;
820 Ok(resolved.name)
821 }
822
823 pub fn current_space(&self, explicit: Option<&str>) -> Result<Option<ActiveSpace>> {
824 let resolved = self.resolve_preferred_space(explicit)?;
825 Ok(resolved.map(|(space, source)| ActiveSpace {
826 name: space.name,
827 source,
828 }))
829 }
830
831 pub fn status(&self, space: Option<&str>) -> Result<StatusResponse> {
832 let _lock = self.acquire_operation_lock(LockMode::Shared)?;
833 let (spaces, totals_scope) = if let Some(space_name) = space {
834 let resolved = self.resolve_space_row(Some(space_name), None)?;
835 (vec![resolved.clone()], Some(resolved.id))
836 } else {
837 (self.storage.list_spaces()?, None)
838 };
839
840 let mut space_statuses = Vec::with_capacity(spaces.len());
841 for space_row in spaces {
842 let collections = self.storage.list_collections(Some(space_row.id))?;
843 let mut collection_statuses = Vec::with_capacity(collections.len());
844 let mut last_updated: Option<String> = None;
845
846 for collection in collections {
847 if last_updated
848 .as_ref()
849 .map(|existing| collection.updated > *existing)
850 .unwrap_or(true)
851 {
852 last_updated = Some(collection.updated.clone());
853 }
854
855 let documents = self
856 .storage
857 .count_documents_in_collection(collection.id, false)?;
858 let active_documents = self
859 .storage
860 .count_documents_in_collection(collection.id, true)?;
861 let chunks = self.storage.count_chunks_in_collection(collection.id)?;
862 let embedded_chunks = self
863 .storage
864 .count_embedded_chunks_in_collection(collection.id)?;
865
866 collection_statuses.push(CollectionStatus {
867 name: collection.name,
868 path: collection.path,
869 documents,
870 active_documents,
871 chunks,
872 embedded_chunks,
873 last_updated: collection.updated,
874 });
875 }
876
877 space_statuses.push(SpaceStatus {
878 name: space_row.name,
879 description: space_row.description,
880 collections: collection_statuses,
881 last_updated,
882 });
883 }
884
885 let models = self.model_status_unlocked()?;
886
887 Ok(StatusResponse {
888 spaces: space_statuses,
889 models,
890 cache_dir: self.config.cache_dir.clone(),
891 config_dir: self.config.config_dir.clone(),
892 total_documents: self.storage.count_documents(totals_scope)?,
893 total_chunks: self.storage.count_chunks(totals_scope)?,
894 total_embedded: self.storage.count_embedded_chunks(totals_scope)?,
895 disk_usage: self.storage.disk_usage()?,
896 })
897 }
898
899 pub fn model_status(&self) -> Result<ModelStatus> {
900 self.model_status_unlocked()
901 }
902
903 fn model_status_unlocked(&self) -> Result<ModelStatus> {
904 models::status(&self.config)
905 }
906
907 pub fn config(&self) -> &Config {
908 &self.config
909 }
910
911 pub fn storage(&self) -> &Storage {
912 &self.storage
913 }
914
915 fn embedding_model_name(&self) -> &str {
916 self.config
917 .roles
918 .embedder
919 .as_ref()
920 .and_then(|role| self.config.providers.get(&role.provider))
921 .map(|profile| profile.model())
922 .unwrap_or("embedder")
923 }
924
925 fn embedding_model_key(&self) -> String {
926 let formatter = self.embedding_input_formatter();
927 format!(
928 "{}|{}|{}",
929 self.embedding_model_name(),
930 DENSE_DOCUMENT_RENDER_IDENTITY,
931 formatter.format_identity()
932 )
933 }
934
935 fn embedding_input_formatter(&self) -> models::EmbeddingInputFormatter {
936 models::embedding_input_formatter_for_config(&self.config)
937 }
938
939 fn format_embedding_query(&self, query: &str) -> String {
940 self.embedding_input_formatter()
941 .format_query(query)
942 .into_owned()
943 }
944
945 fn format_embedding_queries(&self, queries: &[String]) -> Vec<String> {
946 let formatter = self.embedding_input_formatter();
947 queries
948 .iter()
949 .map(|query| formatter.format_query(query).into_owned())
950 .collect()
951 }
952
953 fn format_embedding_document(&self, input: &EmbeddingDocumentInput<'_>) -> String {
954 self.embedding_input_formatter().format_document(input)
955 }
956
957 fn acquire_operation_lock(&self, mode: LockMode) -> Result<OperationLock> {
958 OperationLock::acquire(&self.config.cache_dir, mode)
959 }
960
961 fn collect_document_chunk_ids(&self, doc_ids: &[i64]) -> Result<Vec<i64>> {
962 let mut chunk_ids = Vec::new();
963 for doc_id in doc_ids {
964 let chunks = self.storage.get_chunks_for_document(*doc_id)?;
965 chunk_ids.extend(chunks.into_iter().map(|chunk| chunk.id));
966 }
967 Ok(chunk_ids)
968 }
969
970 fn purge_space_chunks(&self, space: &str, chunk_ids: &[i64]) -> Result<()> {
971 if chunk_ids.is_empty() {
972 return Ok(());
973 }
974
975 self.storage.delete_tantivy(space, chunk_ids)?;
976 self.storage.commit_tantivy(space)?;
977 self.storage.delete_usearch(space, chunk_ids)?;
978 Ok(())
979 }
980
981 fn build_space_info(&self, space: &crate::storage::SpaceRow) -> Result<SpaceInfo> {
982 let collection_count = self.storage.list_collections(Some(space.id))?.len();
983 let document_count = self.storage.count_documents(Some(space.id))?;
984 let chunk_count = self.storage.count_chunks(Some(space.id))?;
985
986 Ok(SpaceInfo {
987 name: space.name.clone(),
988 description: space.description.clone(),
989 collection_count,
990 document_count,
991 chunk_count,
992 created: space.created.clone(),
993 })
994 }
995
996 fn build_collection_info(
997 &self,
998 space_name: &str,
999 collection: &CollectionRow,
1000 ) -> Result<CollectionInfo> {
1001 let document_count = self
1002 .storage
1003 .count_documents_in_collection(collection.id, false)?;
1004 let active_document_count = self
1005 .storage
1006 .count_documents_in_collection(collection.id, true)?;
1007 let chunk_count = self.storage.count_chunks_in_collection(collection.id)?;
1008 let embedded_chunk_count = self
1009 .storage
1010 .count_embedded_chunks_in_collection(collection.id)?;
1011
1012 Ok(CollectionInfo {
1013 name: collection.name.clone(),
1014 space: space_name.to_string(),
1015 path: collection.path.clone(),
1016 description: collection.description.clone(),
1017 extensions: collection.extensions.clone(),
1018 document_count,
1019 active_document_count,
1020 chunk_count,
1021 embedded_chunk_count,
1022 created: collection.created.clone(),
1023 updated: collection.updated.clone(),
1024 })
1025 }
1026
1027 fn resolve_space_row(
1028 &self,
1029 explicit: Option<&str>,
1030 collection_for_lookup: Option<&str>,
1031 ) -> Result<crate::storage::SpaceRow> {
1032 if let Some((space, _source)) = self.resolve_preferred_space(explicit)? {
1033 return Ok(space);
1034 }
1035
1036 if let Some(collection) = collection_for_lookup {
1037 return match self.storage.find_space_for_collection(collection)? {
1038 SpaceResolution::Found(space) => Ok(space),
1039 SpaceResolution::Ambiguous(spaces) => Err(KboltError::AmbiguousSpace {
1040 collection: collection.to_string(),
1041 spaces,
1042 }
1043 .into()),
1044 SpaceResolution::NotFound => Err(KboltError::CollectionNotFound {
1045 name: collection.to_string(),
1046 }
1047 .into()),
1048 };
1049 }
1050
1051 Err(KboltError::NoActiveSpace.into())
1052 }
1053
1054 fn resolve_preferred_space(
1055 &self,
1056 explicit: Option<&str>,
1057 ) -> Result<Option<(crate::storage::SpaceRow, ActiveSpaceSource)>> {
1058 if let Some(space_name) = explicit {
1059 let space = self.storage.get_space(space_name)?;
1060 return Ok(Some((space, ActiveSpaceSource::Flag)));
1061 }
1062
1063 if let Ok(space_name) = std::env::var("KBOLT_SPACE") {
1064 let trimmed = space_name.trim();
1065 if !trimmed.is_empty() {
1066 let space = self.storage.get_space(trimmed)?;
1067 return Ok(Some((space, ActiveSpaceSource::EnvVar)));
1068 }
1069 }
1070
1071 if let Some(space_name) = self.config.default_space.as_deref() {
1072 let space = self.storage.get_space(space_name)?;
1073 return Ok(Some((space, ActiveSpaceSource::ConfigDefault)));
1074 }
1075
1076 Ok(None)
1077 }
1078}
1079
1080fn source_is_stale(path: &Path, indexed_hash: &str) -> Result<bool> {
1081 match std::fs::read(path) {
1082 Ok(bytes) => Ok(sha256_hex(&bytes) != indexed_hash),
1083 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(true),
1084 Err(err) => Err(std::io::Error::new(
1085 err.kind(),
1086 format!(
1087 "failed to read indexed source '{}' for stale check: {err}",
1088 path.display()
1089 ),
1090 )
1091 .into()),
1092 }
1093}
1094
1095#[cfg(test)]
1096mod tests;
1097
/// Checks that read-only engine APIs keep working while another handle holds the global
/// operation lock file exclusively.
#[cfg(test)]
mod lock_relief_tests {
    use fs2::FileExt;
    use std::collections::HashMap;
    use std::fs::OpenOptions;

    use tempfile::{tempdir, TempDir};

    use super::Engine;
    use crate::config::{ChunkingConfig, Config, RankingConfig, ReapingConfig};
    use crate::storage::Storage;
    use kbolt_types::{AddCollectionRequest, GetRequest, Locator, MultiGetRequest, UpdateOptions};

    #[test]
    fn metadata_reads_succeed_while_global_lock_is_held() {
        // `_root` is declared first so it is dropped last, after the engine and the lock
        // holder have released their handles inside the temp directory.
        let (_root, engine) = test_engine_with_indexed_collection();
        let _holder = hold_global_lock(&engine);

        let spaces = engine.list_spaces().expect("list spaces");
        assert_eq!(
            spaces
                .iter()
                .map(|space| space.name.as_str())
                .collect::<Vec<_>>(),
            vec!["default", "work"]
        );

        let space = engine.space_info("work").expect("load space info");
        assert_eq!(space.name, "work");
        assert_eq!(space.collection_count, 1);

        let collections = engine
            .list_collections(Some("work"))
            .expect("list collections");
        assert_eq!(collections.len(), 1);
        assert_eq!(collections[0].name, "api");

        let collection = engine
            .collection_info(Some("work"), "api")
            .expect("load collection info");
        assert_eq!(collection.name, "api");
        assert_eq!(collection.document_count, 1);

        let models = engine.model_status().expect("read model status");
        assert!(!models.embedder.configured);
        assert!(!models.reranker.configured);
        assert!(!models.expander.configured);
    }

    #[test]
    fn document_reads_succeed_while_global_lock_is_held() {
        // See above: keep the temp root alive (and dropped last) for the whole test.
        let (_root, engine) = test_engine_with_indexed_collection();
        let _holder = hold_global_lock(&engine);

        let files = engine
            .list_files(Some("work"), "api", None)
            .expect("list files");
        assert_eq!(files.len(), 1);
        assert_eq!(files[0].path, "guide.md");

        let document = engine
            .get_document(GetRequest {
                locator: Locator::Path("api/guide.md".to_string()),
                space: Some("work".to_string()),
                offset: None,
                limit: None,
            })
            .expect("read document");
        assert_eq!(document.path, "api/guide.md");
        assert!(document.content.contains("hello from kbolt"));

        let response = engine
            .multi_get(MultiGetRequest {
                locators: vec![Locator::Path("api/guide.md".to_string())],
                space: Some("work".to_string()),
                max_files: 4,
                max_bytes: 4_096,
            })
            .expect("read multiple documents");
        assert_eq!(response.resolved_count, 1);
        assert_eq!(response.documents.len(), 1);
        assert!(response.warnings.is_empty());
    }

    /// Builds an engine over a fresh temp root containing one `work/api` collection with a
    /// single indexed `guide.md` (indexed with `no_embed`, so no models are needed).
    ///
    /// The returned [`TempDir`] owns the root holding the config, cache, and docs
    /// directories. Callers must keep it alive while using the engine; dropping it deletes
    /// the directory tree. Returning the guard replaces the former `mem::forget`, which
    /// leaked one temp directory per test run.
    fn test_engine_with_indexed_collection() -> (TempDir, Engine) {
        let root = tempdir().expect("create temp root");
        let root_path = root.path();

        let config_dir = root_path.join("config");
        let cache_dir = root_path.join("cache");
        let docs_dir = root_path.join("docs");
        std::fs::create_dir_all(&docs_dir).expect("create docs dir");
        std::fs::write(docs_dir.join("guide.md"), "# Hello\n\nhello from kbolt\n")
            .expect("write document");

        let storage = Storage::new(&cache_dir).expect("create storage");
        let config = Config {
            config_dir,
            cache_dir,
            default_space: None,
            providers: HashMap::new(),
            roles: crate::config::RoleBindingsConfig::default(),
            reaping: ReapingConfig { days: 7 },
            chunking: ChunkingConfig::default(),
            ranking: RankingConfig::default(),
        };
        let engine = Engine::from_parts(storage, config);

        engine
            .add_collection(AddCollectionRequest {
                path: docs_dir,
                space: Some("work".to_string()),
                name: Some("api".to_string()),
                description: None,
                extensions: None,
                no_index: true,
            })
            .expect("add collection");
        engine
            .update(UpdateOptions {
                space: Some("work".to_string()),
                collections: vec!["api".to_string()],
                no_embed: true,
                dry_run: false,
                verbose: false,
            })
            .expect("index collection");

        (root, engine)
    }

    /// Opens `<cache_dir>/kbolt.lock` (creating it if needed) and takes an exclusive
    /// advisory lock on it via fs2. The lock lasts as long as the returned handle.
    ///
    /// NOTE(review): this hard-codes the lock file name; it presumably must match the file
    /// `OperationLock::acquire` uses — keep the two in sync.
    fn hold_global_lock(engine: &Engine) -> std::fs::File {
        std::fs::create_dir_all(&engine.config().cache_dir).expect("create cache dir");
        let holder = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(engine.config().cache_dir.join("kbolt.lock"))
            .expect("open lock file");
        // Fully-qualified call so it resolves to fs2's trait method rather than any
        // inherent `File` locking method of a similar name.
        FileExt::try_lock_exclusive(&holder).expect("acquire global lock");
        holder
    }
}