1use std::collections::{HashMap, HashSet};
2use std::path::Path;
3use std::sync::Arc;
4use std::time::Instant;
5
6use crate::config;
7use crate::config::Config;
8use crate::error::CoreError;
9use crate::ingest::chunk::{chunk_document, chunk_document_with_counter, resolve_policy};
10use crate::ingest::extract::default_registry;
11use crate::lock::{LockMode, OperationLock};
12use crate::models;
13use crate::storage::Storage;
14use crate::storage::{
15 ChunkInsert, ChunkRow, CollectionRow, DocumentRow, DocumentTitleSource, SpaceResolution,
16 TantivyEntry,
17};
18use crate::Result;
19use kbolt_types::{
20 ActiveSpace, ActiveSpaceSource, AddCollectionRequest, AddCollectionResult, CollectionInfo,
21 CollectionStatus, DocumentResponse, FileEntry, GetRequest, InitialIndexingBlock,
22 InitialIndexingOutcome, KboltError, Locator, ModelStatus, MultiGetRequest, MultiGetResponse,
23 OmitReason, OmittedFile, SearchMode, SearchPipeline, SearchPipelineNotice, SearchPipelineStep,
24 SearchPipelineUnavailableReason, SearchRequest, SearchResponse, SearchResult, SearchSignals,
25 SpaceInfo, SpaceStatus, StatusResponse, UpdateOptions, UpdateReport,
26};
27mod eval_ops;
28mod file_utils;
29mod ignore_helpers;
30mod ignore_ops;
31mod path_utils;
32mod schedule_ops;
33mod schedule_run_ops;
34mod schedule_status_ops;
35mod scoring;
36mod search_ops;
37mod text_helpers;
38mod update_ops;
39use file_utils::{file_error, file_title, modified_token, sha256_hex};
40use ignore_helpers::{
41 collection_ignore_file_path, count_ignore_patterns, is_hard_ignored_file,
42 load_collection_ignore_matcher, validate_ignore_pattern,
43};
44use path_utils::{
45 collection_relative_path, extension_allowed, normalize_docid, normalize_list_prefix,
46 normalized_extension_filter, path_matches_prefix, short_docid, split_collection_path,
47};
48use scoring::{dense_distance_to_score, max_option};
49pub(crate) use text_helpers::retrieval_text_with_prefix;
50use text_helpers::{chunk_text_from_bytes, search_text_with_neighbors};
51
/// Entry point for space, collection, document, search, and status operations.
///
/// Holds the storage handle, the loaded configuration, and the optional
/// inference clients. Each client is `None` when it was neither injected nor
/// produced by `models::build_inference_clients`.
pub struct Engine {
    /// Index and metadata storage (`Engine::new` opens it in `config.cache_dir`).
    storage: Storage,
    /// Loaded configuration. `default_space` can change at runtime through
    /// `set_default_space`, `remove_space`, and `rename_space`.
    config: Config,
    // Optional inference clients, either built from `config` or injected by the
    // test-only `from_parts_*` constructors.
    embedder: Option<Arc<dyn models::Embedder>>,
    embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
    reranker: Option<Arc<dyn models::Reranker>>,
    expander: Option<Arc<dyn models::Expander>>,
}
60
/// Summary of one collection's ignore file: the owning space, the collection
/// name, and the number of patterns it contains.
///
/// NOTE(review): `pattern_count` is presumably produced by `count_ignore_patterns`
/// in the ignore ops module; confirm there.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IgnoreListEntry {
    /// Name of the space that owns the collection.
    pub space: String,
    /// Collection name within `space`.
    pub collection: String,
    /// Number of patterns counted in the collection's ignore file.
    pub pattern_count: usize,
}
67
/// A collection selected for an operation, paired with the name of its space.
///
/// NOTE(review): this appears to be the element type returned by
/// `resolve_targets` (consumed in `search` via `.space` and `.collection`);
/// confirm in the module that defines `resolve_targets`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UpdateTarget {
    /// Name of the space that owns `collection`.
    pub space: String,
    /// Full storage row of the targeted collection.
    pub collection: CollectionRow,
}
73
/// Borrowed filter handed to `resolve_targets`: an optional space name plus the
/// collection names requested by the caller (see its construction in `search`).
#[derive(Debug, Clone, Copy)]
struct TargetScope<'a> {
    /// Explicit space restriction, if any.
    space: Option<&'a str>,
    /// Requested collection names; an empty slice's meaning is decided by
    /// `resolve_targets` (not visible here).
    collections: &'a [String],
}
79
/// Per-collection details that `search` records (keyed by collection id) so
/// that ranked chunks can be mapped back to a space, collection, and root path.
#[derive(Debug, Clone)]
struct SearchCollectionMeta {
    /// Name of the space that owns the collection.
    space: String,
    /// Collection name.
    collection: String,
    /// Collection root directory, copied from the collection row's `path`.
    path: std::path::PathBuf,
}
86
/// A chunk returned by lexical retrieval together with its BM25 score.
///
/// NOTE(review): not used in this file; semantics are inferred from field names.
/// Confirm in the search ops module.
#[derive(Debug, Clone)]
struct SearchHitCandidate {
    /// Storage id of the matching chunk.
    chunk_id: i64,
    /// BM25 score reported for the chunk (presumably by the tantivy index).
    bm25_score: f32,
}
92
/// A chunk after ranking, carrying its final score and the per-signal scores
/// that contributed to it.
///
/// NOTE(review): produced by `rank_chunks_for_mode` and consumed by
/// `assemble_search_results`, both defined outside this file. The per-field
/// notes below are inferred from names; confirm against those functions.
#[derive(Debug, Clone)]
struct RankedChunk {
    /// Storage id of the ranked chunk.
    chunk_id: i64,
    /// Final score (presumably the value results are ordered by).
    score: f32,
    /// Score from fusing the retrieval signals (presumably before reranking).
    fusion: f32,
    /// Reranker score, when a reranker ran.
    reranker: Option<f32>,
    /// Lexical (BM25) score, when the chunk came from lexical retrieval.
    bm25: Option<f32>,
    /// Dense-retrieval score, when the chunk came from vector search.
    dense: Option<f32>,
    /// Position before reranking, if tracked.
    original_rank: Option<usize>,
}
103
104impl Engine {
105 pub fn new(config_path: Option<&Path>) -> Result<Self> {
106 let config = config::load(config_path)?;
107 let storage = Storage::new(&config.cache_dir)?;
108 let built_models = models::build_inference_clients(&config)?;
109 Ok(Self {
110 storage,
111 config,
112 embedder: built_models.embedder,
113 embedding_document_sizer: built_models.embedding_document_sizer,
114 reranker: built_models.reranker,
115 expander: built_models.expander,
116 })
117 }
118
119 #[cfg(test)]
120 pub(crate) fn from_parts(storage: Storage, config: Config) -> Self {
121 Self::from_parts_with_models(storage, config, None, None, None)
122 }
123
124 #[cfg(test)]
125 pub(crate) fn from_parts_with_embedder(
126 storage: Storage,
127 config: Config,
128 embedder: Option<Arc<dyn models::Embedder>>,
129 ) -> Self {
130 Self::from_parts_with_inference(storage, config, embedder, None, None, None)
131 }
132
133 #[cfg(test)]
134 pub(crate) fn from_parts_with_embedding_runtime(
135 storage: Storage,
136 config: Config,
137 embedder: Option<Arc<dyn models::Embedder>>,
138 embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
139 ) -> Self {
140 Self::from_parts_with_inference(
141 storage,
142 config,
143 embedder,
144 embedding_document_sizer,
145 None,
146 None,
147 )
148 }
149
150 #[cfg(test)]
151 pub(crate) fn from_parts_with_models(
152 storage: Storage,
153 config: Config,
154 embedder: Option<Arc<dyn models::Embedder>>,
155 reranker: Option<Arc<dyn models::Reranker>>,
156 expander: Option<Arc<dyn models::Expander>>,
157 ) -> Self {
158 Self::from_parts_with_inference(storage, config, embedder, None, reranker, expander)
159 }
160
161 #[cfg(test)]
162 fn from_parts_with_inference(
163 storage: Storage,
164 config: Config,
165 embedder: Option<Arc<dyn models::Embedder>>,
166 embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
167 reranker: Option<Arc<dyn models::Reranker>>,
168 expander: Option<Arc<dyn models::Expander>>,
169 ) -> Self {
170 let built_models =
171 models::build_inference_clients(&config).expect("build inference models");
172 Self {
173 storage,
174 config,
175 embedder: embedder.or(built_models.embedder),
176 embedding_document_sizer: embedding_document_sizer
177 .or(built_models.embedding_document_sizer),
178 reranker: reranker.or(built_models.reranker),
179 expander: expander.or(built_models.expander),
180 }
181 }
182
183 pub fn add_space(&self, name: &str, description: Option<&str>) -> Result<SpaceInfo> {
184 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
185 self.storage.create_space(name, description)?;
186 let space = self.storage.get_space(name)?;
187 self.build_space_info(&space)
188 }
189
190 pub fn remove_space(&mut self, name: &str) -> Result<()> {
191 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
192 self.storage.delete_space(name)?;
193 if self.config.default_space.as_deref() == Some(name) {
194 self.persist_default_space(None)?;
195 }
196 Ok(())
197 }
198
199 pub fn rename_space(&mut self, old: &str, new: &str) -> Result<()> {
200 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
201 self.storage.rename_space(old, new)?;
202 if self.config.default_space.as_deref() == Some(old) {
203 self.persist_default_space(Some(new.to_string()))?;
204 }
205 Ok(())
206 }
207
208 pub fn describe_space(&self, name: &str, description: &str) -> Result<()> {
209 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
210 self.storage.update_space_description(name, description)
211 }
212
213 pub fn list_spaces(&self) -> Result<Vec<SpaceInfo>> {
214 let spaces = self.storage.list_spaces()?;
215 let mut infos = Vec::with_capacity(spaces.len());
216 for space in spaces {
217 infos.push(self.build_space_info(&space)?);
218 }
219 Ok(infos)
220 }
221
222 pub fn space_info(&self, name: &str) -> Result<SpaceInfo> {
223 let space = self.storage.get_space(name)?;
224 self.build_space_info(&space)
225 }
226
227 pub fn set_default_space(&mut self, name: Option<&str>) -> Result<Option<String>> {
228 if let Some(space_name) = name {
229 self.storage.get_space(space_name)?;
230 }
231
232 self.persist_default_space(name.map(ToString::to_string))?;
233 Ok(self.config.default_space.clone())
234 }
235
236 fn persist_default_space(&mut self, default_space: Option<String>) -> Result<()> {
237 let previous = self.config.default_space.clone();
238 self.config.default_space = default_space;
239 if let Err(err) = config::save(&self.config) {
240 self.config.default_space = previous;
241 return Err(err);
242 }
243 Ok(())
244 }
245
246 pub fn add_collection(&self, req: AddCollectionRequest) -> Result<AddCollectionResult> {
247 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
248 let AddCollectionRequest {
249 path,
250 space: requested_space,
251 name: requested_name,
252 description,
253 extensions,
254 no_index,
255 } = req;
256
257 let space = match requested_space.as_deref() {
258 Some(space_name) => match self.storage.get_space(space_name) {
259 Ok(space) => space,
260 Err(CoreError::Domain(KboltError::SpaceNotFound { .. })) => {
261 self.storage.create_space(space_name, None)?;
262 self.storage.get_space(space_name)?
263 }
264 Err(err) => return Err(err),
265 },
266 None => self.resolve_space_row(None, None)?,
267 };
268 if !path.is_absolute() || !path.is_dir() {
269 return Err(KboltError::InvalidPath(path).into());
270 }
271
272 let name = match requested_name {
273 Some(name) => name,
274 None => path
275 .file_name()
276 .and_then(|name| name.to_str())
277 .map(ToString::to_string)
278 .ok_or_else(|| KboltError::InvalidPath(path.clone()))?,
279 };
280
281 self.storage.create_collection(
282 space.id,
283 &name,
284 &path,
285 description.as_deref(),
286 extensions.as_deref(),
287 )?;
288
289 let initial_indexing = if no_index {
290 InitialIndexingOutcome::Skipped
291 } else {
292 match self.update_unlocked(UpdateOptions {
293 space: Some(space.name.clone()),
294 collections: vec![name.clone()],
295 no_embed: false,
296 dry_run: false,
297 verbose: false,
298 }) {
299 Ok(report) => InitialIndexingOutcome::Indexed(report),
300 Err(CoreError::Domain(KboltError::SpaceDenseRepairRequired { space, reason })) => {
301 InitialIndexingOutcome::Blocked(
302 InitialIndexingBlock::SpaceDenseRepairRequired { space, reason },
303 )
304 }
305 Err(CoreError::Domain(KboltError::ModelNotAvailable { name })) => {
306 InitialIndexingOutcome::Blocked(InitialIndexingBlock::ModelNotAvailable {
307 name,
308 })
309 }
310 Err(err) => return Err(err),
311 }
312 };
313
314 let collection = self.storage.get_collection(space.id, &name)?;
315 Ok(AddCollectionResult {
316 collection: self.build_collection_info(&space.name, &collection)?,
317 initial_indexing,
318 })
319 }
320
321 pub fn remove_collection(&self, space: Option<&str>, name: &str) -> Result<()> {
322 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
323 let resolved = self.resolve_space_row(space, Some(name))?;
324 let collection = self.storage.get_collection(resolved.id, name)?;
325 let documents = self.storage.list_documents(collection.id, false)?;
326 let doc_ids = documents.into_iter().map(|doc| doc.id).collect::<Vec<_>>();
327 let chunk_ids = self.collect_document_chunk_ids(&doc_ids)?;
328 self.purge_space_chunks(&resolved.name, &chunk_ids)?;
329 self.storage.delete_collection(resolved.id, name)?;
330
331 let ignore_path =
332 collection_ignore_file_path(&self.config.config_dir, &resolved.name, name);
333 if ignore_path.is_file() {
334 std::fs::remove_file(ignore_path)?;
335 }
336
337 Ok(())
338 }
339
340 pub fn rename_collection(&self, space: Option<&str>, old: &str, new: &str) -> Result<()> {
341 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
342 let resolved = self.resolve_space_row(space, Some(old))?;
343 let old_ignore_path =
344 collection_ignore_file_path(&self.config.config_dir, &resolved.name, old);
345 let new_ignore_path =
346 collection_ignore_file_path(&self.config.config_dir, &resolved.name, new);
347 if old_ignore_path.is_file() && new_ignore_path.exists() {
348 return Err(KboltError::Internal(format!(
349 "cannot rename ignore file: destination already exists: {}",
350 new_ignore_path.display()
351 ))
352 .into());
353 }
354
355 self.storage.rename_collection(resolved.id, old, new)?;
356
357 if old_ignore_path.is_file() {
358 if let Some(parent) = new_ignore_path.parent() {
359 std::fs::create_dir_all(parent)?;
360 }
361 if let Err(rename_err) = std::fs::rename(&old_ignore_path, &new_ignore_path) {
362 match self.storage.rename_collection(resolved.id, new, old) {
363 Ok(()) => {
364 return Err(KboltError::Internal(format!(
365 "renamed collection was rolled back after ignore rename failure: {rename_err}"
366 ))
367 .into())
368 }
369 Err(rollback_err) => {
370 return Err(KboltError::Internal(format!(
371 "ignore rename failed: {rename_err}; rollback failed: {rollback_err}"
372 ))
373 .into())
374 }
375 }
376 }
377 }
378
379 Ok(())
380 }
381
382 pub fn describe_collection(&self, space: Option<&str>, name: &str, desc: &str) -> Result<()> {
383 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
384 let resolved = self.resolve_space_row(space, Some(name))?;
385 self.storage
386 .update_collection_description(resolved.id, name, desc)
387 }
388
389 pub fn list_collections(&self, space: Option<&str>) -> Result<Vec<CollectionInfo>> {
390 let (space_id_filter, spaces_by_id) = if let Some(space_name) = space {
391 let resolved = self.resolve_space_row(Some(space_name), None)?;
392 let mut map = std::collections::HashMap::new();
393 map.insert(resolved.id, resolved.name.clone());
394 (Some(resolved.id), map)
395 } else {
396 let spaces = self.storage.list_spaces()?;
397 let map = spaces
398 .into_iter()
399 .map(|space| (space.id, space.name))
400 .collect::<std::collections::HashMap<_, _>>();
401 (None, map)
402 };
403
404 let collections = self.storage.list_collections(space_id_filter)?;
405 let mut infos = Vec::with_capacity(collections.len());
406 for collection in collections {
407 let space_name = spaces_by_id
408 .get(&collection.space_id)
409 .ok_or_else(|| {
410 KboltError::Internal(format!(
411 "missing space mapping for collection '{}'",
412 collection.name
413 ))
414 })?
415 .clone();
416 infos.push(self.build_collection_info(&space_name, &collection)?);
417 }
418 Ok(infos)
419 }
420
421 pub fn collection_info(&self, space: Option<&str>, name: &str) -> Result<CollectionInfo> {
422 let resolved = self.resolve_space_row(space, Some(name))?;
423 let collection = self.storage.get_collection(resolved.id, name)?;
424 self.build_collection_info(&resolved.name, &collection)
425 }
426
427 pub fn list_files(
428 &self,
429 space: Option<&str>,
430 collection: &str,
431 prefix: Option<&str>,
432 ) -> Result<Vec<FileEntry>> {
433 let resolved_space = self.resolve_space_row(space, Some(collection))?;
434 let collection_row = self.storage.get_collection(resolved_space.id, collection)?;
435 let normalized_prefix = normalize_list_prefix(prefix)?;
436 let file_rows = self
437 .storage
438 .list_collection_file_rows(collection_row.id, false)?;
439
440 let mut files = Vec::with_capacity(file_rows.len());
441 for file_row in file_rows {
442 if let Some(prefix) = normalized_prefix.as_deref() {
443 if !path_matches_prefix(&file_row.path, prefix) {
444 continue;
445 }
446 }
447
448 files.push(FileEntry {
449 path: file_row.path,
450 title: file_row.title,
451 docid: short_docid(&file_row.hash),
452 active: file_row.active,
453 chunk_count: file_row.chunk_count,
454 embedded: file_row.chunk_count > 0
455 && file_row.embedded_chunk_count >= file_row.chunk_count,
456 });
457 }
458
459 Ok(files)
460 }
461
462 pub fn get_document(&self, req: GetRequest) -> Result<DocumentResponse> {
463 self.get_document_unlocked(req)
464 }
465
466 fn get_document_unlocked(&self, req: GetRequest) -> Result<DocumentResponse> {
467 let GetRequest {
468 locator,
469 space,
470 offset,
471 limit,
472 } = req;
473
474 let (document, collection_row, space_name) = match locator {
475 Locator::Path(locator_path) => {
476 let (collection_name, relative_path) = split_collection_path(&locator_path)?;
477 let resolved_space =
478 self.resolve_space_row(space.as_deref(), Some(&collection_name))?;
479 let collection = self
480 .storage
481 .get_collection(resolved_space.id, &collection_name)?;
482 let document = self
483 .storage
484 .get_document_by_path(collection.id, &relative_path)?
485 .ok_or_else(|| KboltError::DocumentNotFound {
486 path: locator_path.clone(),
487 })?;
488 (document, collection, resolved_space.name)
489 }
490 Locator::DocId(docid) => {
491 let prefix = normalize_docid(&docid)?;
492 let mut candidates = self
493 .storage
494 .get_document_by_hash_prefix(&prefix)?
495 .into_iter()
496 .map(|document| {
497 let collection =
498 self.storage.get_collection_by_id(document.collection_id)?;
499 Ok((document, collection))
500 })
501 .collect::<Result<Vec<_>>>()?;
502
503 if let Some(space_name) = space.as_deref() {
504 let resolved_space = self.resolve_space_row(Some(space_name), None)?;
505 candidates.retain(|(_, collection)| collection.space_id == resolved_space.id);
506 }
507
508 if candidates.is_empty() {
509 return Err(KboltError::DocumentNotFound {
510 path: format!("#{prefix}"),
511 }
512 .into());
513 }
514
515 if candidates.len() > 1 {
516 return Err(KboltError::InvalidInput(
517 "docid is ambiguous; provide more characters".to_string(),
518 )
519 .into());
520 }
521
522 let (document, collection) = candidates.pop().expect("candidate exists");
523 let space = self.storage.get_space_by_id(collection.space_id)?;
524 (document, collection, space.name)
525 }
526 };
527
528 let full_path = collection_row.path.join(&document.path);
529 let bytes = match std::fs::read(&full_path) {
530 Ok(bytes) => bytes,
531 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
532 return Err(KboltError::FileDeleted(full_path).into())
533 }
534 Err(err) => return Err(err.into()),
535 };
536
537 let raw_content = String::from_utf8_lossy(&bytes).into_owned();
538 let stale = sha256_hex(&bytes) != document.hash;
539
540 let lines = raw_content.lines().collect::<Vec<_>>();
541 let total_lines = lines.len();
542 let start = offset.unwrap_or(0).min(total_lines);
543 let requested = limit.unwrap_or(total_lines.saturating_sub(start));
544 let end = start.saturating_add(requested).min(total_lines);
545 let returned_lines = end.saturating_sub(start);
546 let content = if returned_lines == 0 {
547 String::new()
548 } else {
549 lines[start..end].join("\n")
550 };
551
552 Ok(DocumentResponse {
553 docid: short_docid(&document.hash),
554 path: format!("{}/{}", collection_row.name, document.path),
555 title: document.title,
556 space: space_name,
557 collection: collection_row.name,
558 content,
559 stale,
560 total_lines,
561 returned_lines,
562 })
563 }
564
565 pub fn multi_get(&self, req: MultiGetRequest) -> Result<MultiGetResponse> {
566 if req.max_files == 0 {
567 return Err(
568 KboltError::InvalidInput("max_files must be greater than 0".to_string()).into(),
569 );
570 }
571 if req.max_bytes == 0 {
572 return Err(
573 KboltError::InvalidInput("max_bytes must be greater than 0".to_string()).into(),
574 );
575 }
576
577 let mut documents = Vec::new();
578 let mut omitted = Vec::new();
579 let mut resolved_count = 0usize;
580 let mut warnings = Vec::new();
581 let mut consumed_bytes = 0usize;
582
583 for locator in req.locators {
584 let document = match self.get_document_unlocked(GetRequest {
585 locator,
586 space: req.space.clone(),
587 offset: None,
588 limit: None,
589 }) {
590 Ok(document) => document,
591 Err(err) => match KboltError::from(err) {
592 KboltError::FileDeleted(path) => {
593 warnings.push(format!(
594 "file deleted since indexing: {}. run `kbolt update` to refresh.",
595 path.display()
596 ));
597 continue;
598 }
599 KboltError::DocumentNotFound { path } => {
600 warnings.push(format!("document not found: {path}"));
601 continue;
602 }
603 KboltError::InvalidInput(message) => {
604 warnings.push(format!("invalid locator: {message}"));
605 continue;
606 }
607 KboltError::InvalidPath(path) => {
608 warnings.push(format!("invalid locator path: {}", path.display()));
609 continue;
610 }
611 KboltError::AmbiguousSpace { collection, spaces } => {
612 warnings.push(format!(
613 "ambiguous locator space for collection '{collection}': {:?}. use --space to disambiguate.",
614 spaces
615 ));
616 continue;
617 }
618 KboltError::SpaceNotFound { name } => {
619 warnings.push(format!("space not found for locator: {name}"));
620 continue;
621 }
622 KboltError::CollectionNotFound { name } => {
623 warnings.push(format!("collection not found for locator: {name}"));
624 continue;
625 }
626 other => return Err(other.into()),
627 },
628 };
629 resolved_count = resolved_count.saturating_add(1);
630
631 let size_bytes = document.content.len();
632 if documents.len() >= req.max_files {
633 omitted.push(OmittedFile {
634 path: document.path,
635 docid: document.docid,
636 size_bytes,
637 reason: OmitReason::MaxFiles,
638 });
639 continue;
640 }
641
642 if consumed_bytes.saturating_add(size_bytes) > req.max_bytes {
643 omitted.push(OmittedFile {
644 path: document.path,
645 docid: document.docid,
646 size_bytes,
647 reason: OmitReason::MaxBytes,
648 });
649 continue;
650 }
651
652 consumed_bytes = consumed_bytes.saturating_add(size_bytes);
653 documents.push(document);
654 }
655
656 Ok(MultiGetResponse {
657 resolved_count,
658 documents,
659 omitted,
660 warnings,
661 })
662 }
663
664 pub fn search(&self, req: SearchRequest) -> Result<SearchResponse> {
665 let _lock = self.acquire_operation_lock(LockMode::Shared)?;
666 let started = Instant::now();
667 let query = req.query.trim();
668 if query.is_empty() {
669 return Err(KboltError::InvalidInput("query cannot be empty".to_string()).into());
670 }
671
672 let requested_mode = req.mode.clone();
673 let rerank_enabled =
674 matches!(requested_mode, SearchMode::Auto | SearchMode::Deep) && !req.no_rerank;
675 let mut pipeline = self.initial_search_pipeline(&requested_mode);
676
677 let targets = self.resolve_targets(TargetScope {
678 space: req.space.as_deref(),
679 collections: &req.collections,
680 })?;
681
682 let staleness_hint = targets
683 .iter()
684 .map(|target| target.collection.updated.clone())
685 .max()
686 .map(|updated| format!("Index last updated: {updated}"));
687
688 if req.limit == 0 || targets.is_empty() {
689 return Ok(SearchResponse {
690 results: Vec::new(),
691 query: req.query,
692 requested_mode: requested_mode.clone(),
693 effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
694 pipeline,
695 staleness_hint,
696 elapsed_ms: started.elapsed().as_millis() as u64,
697 });
698 }
699
700 let mut collections_by_id: HashMap<i64, SearchCollectionMeta> = HashMap::new();
701 for target in &targets {
702 collections_by_id.insert(
703 target.collection.id,
704 SearchCollectionMeta {
705 space: target.space.clone(),
706 collection: target.collection.name.clone(),
707 path: target.collection.path.clone(),
708 },
709 );
710 }
711
712 let max_candidates = self.max_search_candidates(&targets)?;
713 let mut retrieval_limit =
714 self.initial_search_candidate_limit(&requested_mode, req.limit, rerank_enabled);
715 let results = loop {
716 let ranked_chunks = self.rank_chunks_for_mode(
717 &requested_mode,
718 &targets,
719 query,
720 retrieval_limit,
721 req.min_score,
722 &mut pipeline,
723 )?;
724
725 if ranked_chunks.is_empty() {
726 return Ok(SearchResponse {
727 results: Vec::new(),
728 query: req.query,
729 requested_mode: requested_mode.clone(),
730 effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
731 pipeline,
732 staleness_hint,
733 elapsed_ms: started.elapsed().as_millis() as u64,
734 });
735 }
736
737 let ranked_len = ranked_chunks.len();
738 let results = self.assemble_search_results(
739 query,
740 &requested_mode,
741 ranked_chunks,
742 &collections_by_id,
743 req.debug,
744 rerank_enabled,
745 &mut pipeline,
746 req.limit,
747 )?;
748
749 if results.len() >= req.limit
750 || ranked_len < retrieval_limit
751 || retrieval_limit >= max_candidates
752 {
753 break results;
754 }
755
756 let next_limit = retrieval_limit.saturating_mul(2).min(max_candidates);
757 if next_limit <= retrieval_limit {
758 break results;
759 }
760 retrieval_limit = next_limit;
761 };
762
763 Ok(SearchResponse {
764 results,
765 query: req.query,
766 requested_mode: requested_mode.clone(),
767 effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
768 pipeline,
769 staleness_hint,
770 elapsed_ms: started.elapsed().as_millis() as u64,
771 })
772 }
773
774 pub fn resolve_space(&self, explicit: Option<&str>) -> Result<String> {
775 let resolved = self.resolve_space_row(explicit, None)?;
776 Ok(resolved.name)
777 }
778
779 pub fn current_space(&self, explicit: Option<&str>) -> Result<Option<ActiveSpace>> {
780 let resolved = self.resolve_preferred_space(explicit)?;
781 Ok(resolved.map(|(space, source)| ActiveSpace {
782 name: space.name,
783 source,
784 }))
785 }
786
787 pub fn status(&self, space: Option<&str>) -> Result<StatusResponse> {
788 let _lock = self.acquire_operation_lock(LockMode::Shared)?;
789 let (spaces, totals_scope) = if let Some(space_name) = space {
790 let resolved = self.resolve_space_row(Some(space_name), None)?;
791 (vec![resolved.clone()], Some(resolved.id))
792 } else {
793 (self.storage.list_spaces()?, None)
794 };
795
796 let mut space_statuses = Vec::with_capacity(spaces.len());
797 for space_row in spaces {
798 let collections = self.storage.list_collections(Some(space_row.id))?;
799 let mut collection_statuses = Vec::with_capacity(collections.len());
800 let mut last_updated: Option<String> = None;
801
802 for collection in collections {
803 if last_updated
804 .as_ref()
805 .map(|existing| collection.updated > *existing)
806 .unwrap_or(true)
807 {
808 last_updated = Some(collection.updated.clone());
809 }
810
811 let documents = self
812 .storage
813 .count_documents_in_collection(collection.id, false)?;
814 let active_documents = self
815 .storage
816 .count_documents_in_collection(collection.id, true)?;
817 let chunks = self.storage.count_chunks_in_collection(collection.id)?;
818 let embedded_chunks = self
819 .storage
820 .count_embedded_chunks_in_collection(collection.id)?;
821
822 collection_statuses.push(CollectionStatus {
823 name: collection.name,
824 path: collection.path,
825 documents,
826 active_documents,
827 chunks,
828 embedded_chunks,
829 last_updated: collection.updated,
830 });
831 }
832
833 space_statuses.push(SpaceStatus {
834 name: space_row.name,
835 description: space_row.description,
836 collections: collection_statuses,
837 last_updated,
838 });
839 }
840
841 let models = self.model_status_unlocked()?;
842
843 Ok(StatusResponse {
844 spaces: space_statuses,
845 models,
846 cache_dir: self.config.cache_dir.clone(),
847 config_dir: self.config.config_dir.clone(),
848 total_documents: self.storage.count_documents(totals_scope)?,
849 total_chunks: self.storage.count_chunks(totals_scope)?,
850 total_embedded: self.storage.count_embedded_chunks(totals_scope)?,
851 disk_usage: self.storage.disk_usage()?,
852 })
853 }
854
855 pub fn model_status(&self) -> Result<ModelStatus> {
856 self.model_status_unlocked()
857 }
858
859 fn model_status_unlocked(&self) -> Result<ModelStatus> {
860 models::status(&self.config)
861 }
862
863 pub fn config(&self) -> &Config {
864 &self.config
865 }
866
867 pub fn storage(&self) -> &Storage {
868 &self.storage
869 }
870
871 fn embedding_model_key(&self) -> &str {
872 self.config
873 .roles
874 .embedder
875 .as_ref()
876 .and_then(|role| self.config.providers.get(&role.provider))
877 .map(|profile| profile.model())
878 .unwrap_or("embedder")
879 }
880
881 fn acquire_operation_lock(&self, mode: LockMode) -> Result<OperationLock> {
882 OperationLock::acquire(&self.config.cache_dir, mode)
883 }
884
885 fn collect_document_chunk_ids(&self, doc_ids: &[i64]) -> Result<Vec<i64>> {
886 let mut chunk_ids = Vec::new();
887 for doc_id in doc_ids {
888 let chunks = self.storage.get_chunks_for_document(*doc_id)?;
889 chunk_ids.extend(chunks.into_iter().map(|chunk| chunk.id));
890 }
891 Ok(chunk_ids)
892 }
893
894 fn purge_space_chunks(&self, space: &str, chunk_ids: &[i64]) -> Result<()> {
895 if chunk_ids.is_empty() {
896 return Ok(());
897 }
898
899 self.storage.delete_tantivy(space, chunk_ids)?;
900 self.storage.commit_tantivy(space)?;
901 self.storage.delete_usearch(space, chunk_ids)?;
902 Ok(())
903 }
904
905 fn build_space_info(&self, space: &crate::storage::SpaceRow) -> Result<SpaceInfo> {
906 let collection_count = self.storage.list_collections(Some(space.id))?.len();
907 let document_count = self.storage.count_documents(Some(space.id))?;
908 let chunk_count = self.storage.count_chunks(Some(space.id))?;
909
910 Ok(SpaceInfo {
911 name: space.name.clone(),
912 description: space.description.clone(),
913 collection_count,
914 document_count,
915 chunk_count,
916 created: space.created.clone(),
917 })
918 }
919
920 fn build_collection_info(
921 &self,
922 space_name: &str,
923 collection: &CollectionRow,
924 ) -> Result<CollectionInfo> {
925 let document_count = self
926 .storage
927 .count_documents_in_collection(collection.id, false)?;
928 let active_document_count = self
929 .storage
930 .count_documents_in_collection(collection.id, true)?;
931 let chunk_count = self.storage.count_chunks_in_collection(collection.id)?;
932 let embedded_chunk_count = self
933 .storage
934 .count_embedded_chunks_in_collection(collection.id)?;
935
936 Ok(CollectionInfo {
937 name: collection.name.clone(),
938 space: space_name.to_string(),
939 path: collection.path.clone(),
940 description: collection.description.clone(),
941 extensions: collection.extensions.clone(),
942 document_count,
943 active_document_count,
944 chunk_count,
945 embedded_chunk_count,
946 created: collection.created.clone(),
947 updated: collection.updated.clone(),
948 })
949 }
950
951 fn resolve_space_row(
952 &self,
953 explicit: Option<&str>,
954 collection_for_lookup: Option<&str>,
955 ) -> Result<crate::storage::SpaceRow> {
956 if let Some((space, _source)) = self.resolve_preferred_space(explicit)? {
957 return Ok(space);
958 }
959
960 if let Some(collection) = collection_for_lookup {
961 return match self.storage.find_space_for_collection(collection)? {
962 SpaceResolution::Found(space) => Ok(space),
963 SpaceResolution::Ambiguous(spaces) => Err(KboltError::AmbiguousSpace {
964 collection: collection.to_string(),
965 spaces,
966 }
967 .into()),
968 SpaceResolution::NotFound => Err(KboltError::CollectionNotFound {
969 name: collection.to_string(),
970 }
971 .into()),
972 };
973 }
974
975 Err(KboltError::NoActiveSpace.into())
976 }
977
978 fn resolve_preferred_space(
979 &self,
980 explicit: Option<&str>,
981 ) -> Result<Option<(crate::storage::SpaceRow, ActiveSpaceSource)>> {
982 if let Some(space_name) = explicit {
983 let space = self.storage.get_space(space_name)?;
984 return Ok(Some((space, ActiveSpaceSource::Flag)));
985 }
986
987 if let Ok(space_name) = std::env::var("KBOLT_SPACE") {
988 let trimmed = space_name.trim();
989 if !trimmed.is_empty() {
990 let space = self.storage.get_space(trimmed)?;
991 return Ok(Some((space, ActiveSpaceSource::EnvVar)));
992 }
993 }
994
995 if let Some(space_name) = self.config.default_space.as_deref() {
996 let space = self.storage.get_space(space_name)?;
997 return Ok(Some((space, ActiveSpaceSource::ConfigDefault)));
998 }
999
1000 Ok(None)
1001 }
1002}
1003
// Unit tests for this module, loaded from a separate `tests` source file and
// compiled only in test builds.
#[cfg(test)]
mod tests;
1006
#[cfg(test)]
mod lock_relief_tests {
    //! Read-only engine calls must still succeed while another file handle holds
    //! an exclusive lock on `kbolt.lock` in the cache directory.

    use std::collections::HashMap;
    use std::fs::{File, OpenOptions};
    use std::mem;

    use fs2::FileExt;
    use tempfile::tempdir;

    use super::Engine;
    use crate::config::{
        ChunkingConfig, Config, RankingConfig, ReapingConfig, RoleBindingsConfig,
    };
    use crate::storage::Storage;
    use kbolt_types::{AddCollectionRequest, GetRequest, Locator, MultiGetRequest, UpdateOptions};

    #[test]
    fn metadata_reads_succeed_while_global_lock_is_held() {
        let engine = indexed_engine();
        let _held = lock_cache_dir(&engine);

        let spaces = engine.list_spaces().expect("list spaces");
        let names: Vec<&str> = spaces.iter().map(|space| space.name.as_str()).collect();
        assert_eq!(names, vec!["default", "work"]);

        let work = engine.space_info("work").expect("load space info");
        assert_eq!(work.name, "work");
        assert_eq!(work.collection_count, 1);

        let collections = engine
            .list_collections(Some("work"))
            .expect("list collections");
        assert_eq!(collections.len(), 1);
        assert_eq!(collections[0].name, "api");

        let api = engine
            .collection_info(Some("work"), "api")
            .expect("load collection info");
        assert_eq!(api.name, "api");
        assert_eq!(api.document_count, 1);

        let status = engine.model_status().expect("read model status");
        for configured in [
            status.embedder.configured,
            status.reranker.configured,
            status.expander.configured,
        ] {
            assert!(!configured);
        }
    }

    #[test]
    fn document_reads_succeed_while_global_lock_is_held() {
        let engine = indexed_engine();
        let _held = lock_cache_dir(&engine);

        let listed = engine
            .list_files(Some("work"), "api", None)
            .expect("list files");
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].path, "guide.md");

        let single = engine
            .get_document(GetRequest {
                locator: Locator::Path("api/guide.md".to_string()),
                space: Some("work".to_string()),
                offset: None,
                limit: None,
            })
            .expect("read document");
        assert_eq!(single.path, "api/guide.md");
        assert!(single.content.contains("hello from kbolt"));

        let batch = engine
            .multi_get(MultiGetRequest {
                locators: vec![Locator::Path("api/guide.md".to_string())],
                space: Some("work".to_string()),
                max_files: 4,
                max_bytes: 4_096,
            })
            .expect("read multiple documents");
        assert_eq!(batch.resolved_count, 1);
        assert_eq!(batch.documents.len(), 1);
        assert!(batch.warnings.is_empty());
    }

    /// Builds an engine over a temp directory holding one markdown file, added
    /// as collection `api` in space `work` and indexed without embeddings.
    fn indexed_engine() -> Engine {
        let temp = tempdir().expect("create temp root");
        let base = temp.path().to_path_buf();
        // Leak the TempDir guard so the directory outlives this function; the
        // returned engine keeps using files inside it.
        mem::forget(temp);

        let docs_dir = base.join("docs");
        std::fs::create_dir_all(&docs_dir).expect("create docs dir");
        std::fs::write(docs_dir.join("guide.md"), "# Hello\n\nhello from kbolt\n")
            .expect("write document");

        let cache_dir = base.join("cache");
        let storage = Storage::new(&cache_dir).expect("create storage");
        let engine = Engine::from_parts(
            storage,
            Config {
                config_dir: base.join("config"),
                cache_dir,
                default_space: None,
                providers: HashMap::new(),
                roles: RoleBindingsConfig::default(),
                reaping: ReapingConfig { days: 7 },
                chunking: ChunkingConfig::default(),
                ranking: RankingConfig::default(),
            },
        );

        engine
            .add_collection(AddCollectionRequest {
                path: docs_dir,
                space: Some("work".to_string()),
                name: Some("api".to_string()),
                description: None,
                extensions: None,
                no_index: true,
            })
            .expect("add collection");

        engine
            .update(UpdateOptions {
                space: Some("work".to_string()),
                collections: vec!["api".to_string()],
                no_embed: true,
                dry_run: false,
                verbose: false,
            })
            .expect("index collection");

        engine
    }

    /// Opens (creating if needed) `kbolt.lock` in the engine's cache directory,
    /// takes an exclusive lock on it, and returns the handle holding the lock.
    fn lock_cache_dir(engine: &Engine) -> File {
        let cache_dir = &engine.config().cache_dir;
        std::fs::create_dir_all(cache_dir).expect("create cache dir");

        let lock_file = OpenOptions::new()
            .create(true)
            .read(true)
            .write(true)
            .truncate(false)
            .open(cache_dir.join("kbolt.lock"))
            .expect("open lock file");
        FileExt::try_lock_exclusive(&lock_file).expect("acquire global lock");
        lock_file
    }
}