1use std::collections::{HashMap, HashSet};
2use std::path::Path;
3use std::sync::Arc;
4use std::time::Instant;
5
6use crate::config;
7use crate::config::Config;
8use crate::error::CoreError;
9use crate::ingest::chunk::{chunk_document, chunk_document_with_counter, resolve_policy};
10use crate::ingest::extract::default_registry;
11use crate::lock::{LockMode, OperationLock};
12use crate::models;
13use crate::storage::Storage;
14use crate::storage::{
15 ChunkInsert, ChunkRow, CollectionRow, DocumentRow, DocumentTitleSource, SpaceResolution,
16 TantivyEntry,
17};
18use crate::Result;
19use kbolt_types::{
20 ActiveSpace, ActiveSpaceSource, AddCollectionRequest, AddCollectionResult, CollectionInfo,
21 CollectionStatus, DocumentResponse, FileEntry, GetRequest, InitialIndexingBlock,
22 InitialIndexingOutcome, KboltError, Locator, ModelStatus, MultiGetRequest, MultiGetResponse,
23 OmitReason, OmittedFile, SearchMode, SearchPipeline, SearchPipelineNotice, SearchPipelineStep,
24 SearchPipelineUnavailableReason, SearchRequest, SearchResponse, SearchResult, SearchSignals,
25 SpaceInfo, SpaceStatus, StatusResponse, UpdateOptions, UpdateReport,
26};
27mod eval_ops;
28mod file_utils;
29pub(crate) mod ignore_helpers;
30mod ignore_ops;
31mod path_utils;
32mod schedule_ops;
33mod schedule_run_ops;
34mod schedule_status_ops;
35mod scoring;
36mod search_ops;
37mod text_helpers;
38mod update_ops;
39use file_utils::{file_error, file_title, modified_token, sha256_hex};
40use ignore_helpers::{
41 collection_ignore_file_path, count_ignore_patterns, is_hard_ignored_file,
42 load_collection_ignore_matcher, validate_ignore_pattern,
43};
44use path_utils::{
45 collection_relative_path, extension_allowed, normalize_docid, normalize_list_prefix,
46 normalized_extension_filter, path_matches_prefix, short_docid, split_collection_path,
47};
48use scoring::{dense_distance_to_score, max_option};
49pub(crate) use text_helpers::retrieval_text_with_prefix;
50use text_helpers::{chunk_text_from_bytes, search_text_with_neighbors};
51
/// Central handle for kbolt operations.
///
/// Owns the on-disk `Storage`, the loaded `Config`, and whichever inference clients could be
/// built from that config. Each client is optional; a `None` means that role is unavailable
/// (presumably unconfigured — see `models::build_inference_clients*`).
pub struct Engine {
    storage: Storage,
    config: Config,
    embedder: Option<Arc<dyn models::Embedder>>,
    embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
    reranker: Option<Arc<dyn models::Reranker>>,
    expander: Option<Arc<dyn models::Expander>>,
}

/// One row of an ignore listing: a collection (qualified by its space) and how many ignore
/// patterns it has. Presumably produced by the `ignore_ops` module — not visible here.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IgnoreListEntry {
    pub space: String,
    pub collection: String,
    pub pattern_count: usize,
}

/// A collection selected for an update or search, paired with the name of its space.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UpdateTarget {
    pub space: String,
    pub collection: CollectionRow,
}

/// Borrowed filter passed to `resolve_targets`: an optional space name plus a list of
/// collection names (semantics of an empty list are defined by `resolve_targets`).
#[derive(Debug, Clone, Copy)]
struct TargetScope<'a> {
    space: Option<&'a str>,
    collections: &'a [String],
}

/// Per-collection metadata that `search` keys by collection id when assembling results.
#[derive(Debug, Clone)]
struct SearchCollectionMeta {
    space: String,
    collection: String,
    // Root directory of the collection on disk (copied from `CollectionRow::path`).
    path: std::path::PathBuf,
}

/// A lexical hit: chunk id with its BM25 score. Used outside this view — presumably by
/// `search_ops`.
#[derive(Debug, Clone)]
struct SearchHitCandidate {
    chunk_id: i64,
    bm25_score: f32,
}

/// A chunk after ranking, carrying its final score plus the individual signals that
/// produced it. Each `Option` signal is `None` when that stage did not contribute.
#[derive(Debug, Clone)]
struct RankedChunk {
    chunk_id: i64,
    score: f32,
    fusion: f32,
    reranker: Option<f32>,
    bm25: Option<f32>,
    dense: Option<f32>,
    // Position before reranking, if tracked (assumption — confirm in search_ops).
    original_rank: Option<usize>,
}
103
104impl Engine {
105 pub fn new(config_path: Option<&Path>) -> Result<Self> {
106 Self::new_with_recovery_notice(config_path, None)
107 }
108
109 pub fn new_with_recovery_notice(
110 config_path: Option<&Path>,
111 recovery_notice: Option<crate::RecoveryNoticeSink>,
112 ) -> Result<Self> {
113 let config = config::load(config_path)?;
114 let storage = Storage::new(&config.cache_dir)?;
115 let built_models =
116 models::build_inference_clients_with_recovery_notice(&config, recovery_notice)?;
117 Ok(Self {
118 storage,
119 config,
120 embedder: built_models.embedder,
121 embedding_document_sizer: built_models.embedding_document_sizer,
122 reranker: built_models.reranker,
123 expander: built_models.expander,
124 })
125 }
126
127 #[cfg(test)]
128 pub(crate) fn from_parts(storage: Storage, config: Config) -> Self {
129 Self::from_parts_with_models(storage, config, None, None, None)
130 }
131
132 #[cfg(test)]
133 pub(crate) fn from_parts_with_embedder(
134 storage: Storage,
135 config: Config,
136 embedder: Option<Arc<dyn models::Embedder>>,
137 ) -> Self {
138 Self::from_parts_with_inference(storage, config, embedder, None, None, None)
139 }
140
141 #[cfg(test)]
142 pub(crate) fn from_parts_with_embedding_runtime(
143 storage: Storage,
144 config: Config,
145 embedder: Option<Arc<dyn models::Embedder>>,
146 embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
147 ) -> Self {
148 Self::from_parts_with_inference(
149 storage,
150 config,
151 embedder,
152 embedding_document_sizer,
153 None,
154 None,
155 )
156 }
157
158 #[cfg(test)]
159 pub(crate) fn from_parts_with_models(
160 storage: Storage,
161 config: Config,
162 embedder: Option<Arc<dyn models::Embedder>>,
163 reranker: Option<Arc<dyn models::Reranker>>,
164 expander: Option<Arc<dyn models::Expander>>,
165 ) -> Self {
166 Self::from_parts_with_inference(storage, config, embedder, None, reranker, expander)
167 }
168
169 #[cfg(test)]
170 fn from_parts_with_inference(
171 storage: Storage,
172 config: Config,
173 embedder: Option<Arc<dyn models::Embedder>>,
174 embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
175 reranker: Option<Arc<dyn models::Reranker>>,
176 expander: Option<Arc<dyn models::Expander>>,
177 ) -> Self {
178 let built_models =
179 models::build_inference_clients(&config).expect("build inference models");
180 Self {
181 storage,
182 config,
183 embedder: embedder.or(built_models.embedder),
184 embedding_document_sizer: embedding_document_sizer
185 .or(built_models.embedding_document_sizer),
186 reranker: reranker.or(built_models.reranker),
187 expander: expander.or(built_models.expander),
188 }
189 }
190
191 pub fn add_space(&self, name: &str, description: Option<&str>) -> Result<SpaceInfo> {
192 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
193 self.storage.create_space(name, description)?;
194 let space = self.storage.get_space(name)?;
195 self.build_space_info(&space)
196 }
197
198 pub fn remove_space(&mut self, name: &str) -> Result<()> {
199 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
200 self.storage.delete_space(name)?;
201 if self.config.default_space.as_deref() == Some(name) {
202 self.persist_default_space(None)?;
203 }
204 Ok(())
205 }
206
207 pub fn rename_space(&mut self, old: &str, new: &str) -> Result<()> {
208 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
209 self.storage.rename_space(old, new)?;
210 if self.config.default_space.as_deref() == Some(old) {
211 self.persist_default_space(Some(new.to_string()))?;
212 }
213 Ok(())
214 }
215
216 pub fn describe_space(&self, name: &str, description: &str) -> Result<()> {
217 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
218 self.storage.update_space_description(name, description)
219 }
220
221 pub fn list_spaces(&self) -> Result<Vec<SpaceInfo>> {
222 let spaces = self.storage.list_spaces()?;
223 let mut infos = Vec::with_capacity(spaces.len());
224 for space in spaces {
225 infos.push(self.build_space_info(&space)?);
226 }
227 Ok(infos)
228 }
229
230 pub fn space_info(&self, name: &str) -> Result<SpaceInfo> {
231 let space = self.storage.get_space(name)?;
232 self.build_space_info(&space)
233 }
234
235 pub fn set_default_space(&mut self, name: Option<&str>) -> Result<Option<String>> {
236 if let Some(space_name) = name {
237 self.storage.get_space(space_name)?;
238 }
239
240 self.persist_default_space(name.map(ToString::to_string))?;
241 Ok(self.config.default_space.clone())
242 }
243
244 fn persist_default_space(&mut self, default_space: Option<String>) -> Result<()> {
245 let previous = self.config.default_space.clone();
246 self.config.default_space = default_space;
247 if let Err(err) = config::save(&self.config) {
248 self.config.default_space = previous;
249 return Err(err);
250 }
251 Ok(())
252 }
253
254 pub fn add_collection(&self, req: AddCollectionRequest) -> Result<AddCollectionResult> {
255 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
256 let AddCollectionRequest {
257 path,
258 space: requested_space,
259 name: requested_name,
260 description,
261 extensions,
262 no_index,
263 } = req;
264
265 let space = match requested_space.as_deref() {
266 Some(space_name) => match self.storage.get_space(space_name) {
267 Ok(space) => space,
268 Err(CoreError::Domain(KboltError::SpaceNotFound { .. })) => {
269 self.storage.create_space(space_name, None)?;
270 self.storage.get_space(space_name)?
271 }
272 Err(err) => return Err(err),
273 },
274 None => self.resolve_space_row(None, None)?,
275 };
276 if !path.is_absolute() || !path.is_dir() {
277 return Err(KboltError::InvalidPath(path).into());
278 }
279
280 let name = match requested_name {
281 Some(name) => name,
282 None => path
283 .file_name()
284 .and_then(|name| name.to_str())
285 .map(ToString::to_string)
286 .ok_or_else(|| KboltError::InvalidPath(path.clone()))?,
287 };
288
289 self.storage.create_collection(
290 space.id,
291 &name,
292 &path,
293 description.as_deref(),
294 extensions.as_deref(),
295 )?;
296
297 let initial_indexing = if no_index {
298 InitialIndexingOutcome::Skipped
299 } else {
300 match self.update_unlocked(UpdateOptions {
301 space: Some(space.name.clone()),
302 collections: vec![name.clone()],
303 no_embed: false,
304 dry_run: false,
305 verbose: false,
306 }) {
307 Ok(report) => InitialIndexingOutcome::Indexed(report),
308 Err(CoreError::Domain(KboltError::SpaceDenseRepairRequired { space, reason })) => {
309 InitialIndexingOutcome::Blocked(
310 InitialIndexingBlock::SpaceDenseRepairRequired { space, reason },
311 )
312 }
313 Err(CoreError::Domain(KboltError::ModelNotAvailable { name })) => {
314 InitialIndexingOutcome::Blocked(InitialIndexingBlock::ModelNotAvailable {
315 name,
316 })
317 }
318 Err(err) => return Err(err),
319 }
320 };
321
322 let collection = self.storage.get_collection(space.id, &name)?;
323 Ok(AddCollectionResult {
324 collection: self.build_collection_info(&space.name, &collection)?,
325 initial_indexing,
326 })
327 }
328
329 pub fn remove_collection(&self, space: Option<&str>, name: &str) -> Result<()> {
330 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
331 let resolved = self.resolve_space_row(space, Some(name))?;
332 let collection = self.storage.get_collection(resolved.id, name)?;
333 let documents = self.storage.list_documents(collection.id, false)?;
334 let doc_ids = documents.into_iter().map(|doc| doc.id).collect::<Vec<_>>();
335 let chunk_ids = self.collect_document_chunk_ids(&doc_ids)?;
336 self.purge_space_chunks(&resolved.name, &chunk_ids)?;
337 self.storage.delete_collection(resolved.id, name)?;
338
339 let ignore_path =
340 collection_ignore_file_path(&self.config.config_dir, &resolved.name, name);
341 if ignore_path.is_file() {
342 std::fs::remove_file(ignore_path)?;
343 }
344
345 Ok(())
346 }
347
348 pub fn rename_collection(&self, space: Option<&str>, old: &str, new: &str) -> Result<()> {
349 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
350 let resolved = self.resolve_space_row(space, Some(old))?;
351 let old_ignore_path =
352 collection_ignore_file_path(&self.config.config_dir, &resolved.name, old);
353 let new_ignore_path =
354 collection_ignore_file_path(&self.config.config_dir, &resolved.name, new);
355 if old_ignore_path.is_file() && new_ignore_path.exists() {
356 return Err(KboltError::Internal(format!(
357 "cannot rename ignore file: destination already exists: {}",
358 new_ignore_path.display()
359 ))
360 .into());
361 }
362
363 self.storage.rename_collection(resolved.id, old, new)?;
364
365 if old_ignore_path.is_file() {
366 if let Some(parent) = new_ignore_path.parent() {
367 std::fs::create_dir_all(parent)?;
368 }
369 if let Err(rename_err) = std::fs::rename(&old_ignore_path, &new_ignore_path) {
370 match self.storage.rename_collection(resolved.id, new, old) {
371 Ok(()) => {
372 return Err(KboltError::Internal(format!(
373 "renamed collection was rolled back after ignore rename failure: {rename_err}"
374 ))
375 .into())
376 }
377 Err(rollback_err) => {
378 return Err(KboltError::Internal(format!(
379 "ignore rename failed: {rename_err}; rollback failed: {rollback_err}"
380 ))
381 .into())
382 }
383 }
384 }
385 }
386
387 Ok(())
388 }
389
390 pub fn describe_collection(&self, space: Option<&str>, name: &str, desc: &str) -> Result<()> {
391 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
392 let resolved = self.resolve_space_row(space, Some(name))?;
393 self.storage
394 .update_collection_description(resolved.id, name, desc)
395 }
396
397 pub fn list_collections(&self, space: Option<&str>) -> Result<Vec<CollectionInfo>> {
398 let (space_id_filter, spaces_by_id) = if let Some(space_name) = space {
399 let resolved = self.resolve_space_row(Some(space_name), None)?;
400 let mut map = std::collections::HashMap::new();
401 map.insert(resolved.id, resolved.name.clone());
402 (Some(resolved.id), map)
403 } else {
404 let spaces = self.storage.list_spaces()?;
405 let map = spaces
406 .into_iter()
407 .map(|space| (space.id, space.name))
408 .collect::<std::collections::HashMap<_, _>>();
409 (None, map)
410 };
411
412 let collections = self.storage.list_collections(space_id_filter)?;
413 let mut infos = Vec::with_capacity(collections.len());
414 for collection in collections {
415 let space_name = spaces_by_id
416 .get(&collection.space_id)
417 .ok_or_else(|| {
418 KboltError::Internal(format!(
419 "missing space mapping for collection '{}'",
420 collection.name
421 ))
422 })?
423 .clone();
424 infos.push(self.build_collection_info(&space_name, &collection)?);
425 }
426 Ok(infos)
427 }
428
429 pub fn collection_info(&self, space: Option<&str>, name: &str) -> Result<CollectionInfo> {
430 let resolved = self.resolve_space_row(space, Some(name))?;
431 let collection = self.storage.get_collection(resolved.id, name)?;
432 self.build_collection_info(&resolved.name, &collection)
433 }
434
435 pub fn list_files(
436 &self,
437 space: Option<&str>,
438 collection: &str,
439 prefix: Option<&str>,
440 ) -> Result<Vec<FileEntry>> {
441 let resolved_space = self.resolve_space_row(space, Some(collection))?;
442 let collection_row = self.storage.get_collection(resolved_space.id, collection)?;
443 let normalized_prefix = normalize_list_prefix(prefix)?;
444 let file_rows = self
445 .storage
446 .list_collection_file_rows(collection_row.id, false)?;
447
448 let mut files = Vec::with_capacity(file_rows.len());
449 for file_row in file_rows {
450 if let Some(prefix) = normalized_prefix.as_deref() {
451 if !path_matches_prefix(&file_row.path, prefix) {
452 continue;
453 }
454 }
455
456 files.push(FileEntry {
457 path: file_row.path,
458 title: file_row.title,
459 docid: short_docid(&file_row.hash),
460 active: file_row.active,
461 chunk_count: file_row.chunk_count,
462 embedded: file_row.chunk_count > 0
463 && file_row.embedded_chunk_count >= file_row.chunk_count,
464 });
465 }
466
467 Ok(files)
468 }
469
470 pub fn get_document(&self, req: GetRequest) -> Result<DocumentResponse> {
471 self.get_document_unlocked(req)
472 }
473
474 fn get_document_unlocked(&self, req: GetRequest) -> Result<DocumentResponse> {
475 let GetRequest {
476 locator,
477 space,
478 offset,
479 limit,
480 } = req;
481
482 let (document, collection_row, space_name) = match locator {
483 Locator::Path(locator_path) => {
484 let (collection_name, relative_path) = split_collection_path(&locator_path)?;
485 let resolved_space =
486 self.resolve_space_row(space.as_deref(), Some(&collection_name))?;
487 let collection = self
488 .storage
489 .get_collection(resolved_space.id, &collection_name)?;
490 let document = self
491 .storage
492 .get_document_by_path(collection.id, &relative_path)?
493 .ok_or_else(|| KboltError::DocumentNotFound {
494 path: locator_path.clone(),
495 })?;
496 (document, collection, resolved_space.name)
497 }
498 Locator::DocId(docid) => {
499 let prefix = normalize_docid(&docid)?;
500 let mut candidates = self
501 .storage
502 .get_document_by_hash_prefix(&prefix)?
503 .into_iter()
504 .map(|document| {
505 let collection =
506 self.storage.get_collection_by_id(document.collection_id)?;
507 Ok((document, collection))
508 })
509 .collect::<Result<Vec<_>>>()?;
510
511 if let Some(space_name) = space.as_deref() {
512 let resolved_space = self.resolve_space_row(Some(space_name), None)?;
513 candidates.retain(|(_, collection)| collection.space_id == resolved_space.id);
514 }
515
516 if candidates.is_empty() {
517 return Err(KboltError::DocumentNotFound {
518 path: format!("#{prefix}"),
519 }
520 .into());
521 }
522
523 if candidates.len() > 1 {
524 return Err(KboltError::InvalidInput(
525 "docid is ambiguous; provide more characters".to_string(),
526 )
527 .into());
528 }
529
530 let (document, collection) = candidates.pop().expect("candidate exists");
531 let space = self.storage.get_space_by_id(collection.space_id)?;
532 (document, collection, space.name)
533 }
534 };
535
536 let full_path = collection_row.path.join(&document.path);
537 let bytes = match std::fs::read(&full_path) {
538 Ok(bytes) => bytes,
539 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
540 return Err(KboltError::FileDeleted(full_path).into())
541 }
542 Err(err) => return Err(err.into()),
543 };
544
545 let raw_content = String::from_utf8_lossy(&bytes).into_owned();
546 let stale = sha256_hex(&bytes) != document.hash;
547
548 let lines = raw_content.lines().collect::<Vec<_>>();
549 let total_lines = lines.len();
550 let start = offset.unwrap_or(0).min(total_lines);
551 let requested = limit.unwrap_or(total_lines.saturating_sub(start));
552 let end = start.saturating_add(requested).min(total_lines);
553 let returned_lines = end.saturating_sub(start);
554 let content = if returned_lines == 0 {
555 String::new()
556 } else {
557 lines[start..end].join("\n")
558 };
559
560 Ok(DocumentResponse {
561 docid: short_docid(&document.hash),
562 path: format!("{}/{}", collection_row.name, document.path),
563 title: document.title,
564 space: space_name,
565 collection: collection_row.name,
566 content,
567 stale,
568 total_lines,
569 returned_lines,
570 })
571 }
572
573 pub fn multi_get(&self, req: MultiGetRequest) -> Result<MultiGetResponse> {
574 if req.max_files == 0 {
575 return Err(
576 KboltError::InvalidInput("max_files must be greater than 0".to_string()).into(),
577 );
578 }
579 if req.max_bytes == 0 {
580 return Err(
581 KboltError::InvalidInput("max_bytes must be greater than 0".to_string()).into(),
582 );
583 }
584
585 let mut documents = Vec::new();
586 let mut omitted = Vec::new();
587 let mut resolved_count = 0usize;
588 let mut warnings = Vec::new();
589 let mut consumed_bytes = 0usize;
590
591 for locator in req.locators {
592 let document = match self.get_document_unlocked(GetRequest {
593 locator,
594 space: req.space.clone(),
595 offset: None,
596 limit: None,
597 }) {
598 Ok(document) => document,
599 Err(err) => match KboltError::from(err) {
600 KboltError::FileDeleted(path) => {
601 warnings.push(format!(
602 "file deleted since indexing: {}. run `kbolt update` to refresh.",
603 path.display()
604 ));
605 continue;
606 }
607 KboltError::DocumentNotFound { path } => {
608 warnings.push(format!("document not found: {path}"));
609 continue;
610 }
611 KboltError::InvalidInput(message) => {
612 warnings.push(format!("invalid locator: {message}"));
613 continue;
614 }
615 KboltError::InvalidPath(path) => {
616 warnings.push(format!("invalid locator path: {}", path.display()));
617 continue;
618 }
619 KboltError::AmbiguousSpace { collection, spaces } => {
620 warnings.push(format!(
621 "ambiguous locator space for collection '{collection}': {:?}. use --space to disambiguate.",
622 spaces
623 ));
624 continue;
625 }
626 KboltError::SpaceNotFound { name } => {
627 warnings.push(format!("space not found for locator: {name}"));
628 continue;
629 }
630 KboltError::CollectionNotFound { name } => {
631 warnings.push(format!("collection not found for locator: {name}"));
632 continue;
633 }
634 other => return Err(other.into()),
635 },
636 };
637 resolved_count = resolved_count.saturating_add(1);
638
639 let size_bytes = document.content.len();
640 if documents.len() >= req.max_files {
641 omitted.push(OmittedFile {
642 path: document.path,
643 docid: document.docid,
644 size_bytes,
645 reason: OmitReason::MaxFiles,
646 });
647 continue;
648 }
649
650 if consumed_bytes.saturating_add(size_bytes) > req.max_bytes {
651 omitted.push(OmittedFile {
652 path: document.path,
653 docid: document.docid,
654 size_bytes,
655 reason: OmitReason::MaxBytes,
656 });
657 continue;
658 }
659
660 consumed_bytes = consumed_bytes.saturating_add(size_bytes);
661 documents.push(document);
662 }
663
664 Ok(MultiGetResponse {
665 resolved_count,
666 documents,
667 omitted,
668 warnings,
669 })
670 }
671
    /// Runs a search over the targeted collections under a shared operation lock.
    ///
    /// Retrieval starts with `initial_search_candidate_limit` and doubles the candidate limit
    /// (capped at `max_search_candidates`). It stops widening when any of these holds:
    /// - enough results were assembled;
    /// - the ranker returned fewer chunks than requested;
    /// - the cap was reached.
    ///
    /// Reranking is only considered for `Auto`/`Deep` modes and can be disabled with
    /// `no_rerank`. The returned `pipeline` records what actually ran, and `effective_mode`
    /// is derived from it.
    ///
    /// # Errors
    /// - `InvalidInput` for a blank query.
    /// - Otherwise: lock, target-resolution, ranking, or assembly errors.
    pub fn search(&self, req: SearchRequest) -> Result<SearchResponse> {
        let _lock = self.acquire_operation_lock(LockMode::Shared)?;
        let started = Instant::now();
        let query = req.query.trim();
        if query.is_empty() {
            return Err(KboltError::InvalidInput("query cannot be empty".to_string()).into());
        }

        let requested_mode = req.mode.clone();
        let rerank_enabled =
            matches!(requested_mode, SearchMode::Auto | SearchMode::Deep) && !req.no_rerank;
        let mut pipeline = self.initial_search_pipeline(&requested_mode);

        let targets = self.resolve_targets(TargetScope {
            space: req.space.as_deref(),
            collections: &req.collections,
        })?;

        // Most recent `updated` value across the targets, compared as strings.
        let staleness_hint = targets
            .iter()
            .map(|target| target.collection.updated.clone())
            .max()
            .map(|updated| format!("Index last updated: {updated}"));

        // Nothing to search: return an empty response without touching the indexes.
        if req.limit == 0 || targets.is_empty() {
            return Ok(SearchResponse {
                results: Vec::new(),
                query: req.query,
                requested_mode: requested_mode.clone(),
                effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
                pipeline,
                staleness_hint,
                elapsed_ms: started.elapsed().as_millis() as u64,
            });
        }

        // Lookup table used while assembling results from ranked chunk ids.
        let mut collections_by_id: HashMap<i64, SearchCollectionMeta> = HashMap::new();
        for target in &targets {
            collections_by_id.insert(
                target.collection.id,
                SearchCollectionMeta {
                    space: target.space.clone(),
                    collection: target.collection.name.clone(),
                    path: target.collection.path.clone(),
                },
            );
        }

        let max_candidates = self.max_search_candidates(&targets)?;
        let mut retrieval_limit =
            self.initial_search_candidate_limit(&requested_mode, req.limit, rerank_enabled);
        let results = loop {
            let ranked_chunks = self.rank_chunks_for_mode(
                &requested_mode,
                &targets,
                query,
                retrieval_limit,
                req.min_score,
                &mut pipeline,
            )?;

            if ranked_chunks.is_empty() {
                return Ok(SearchResponse {
                    results: Vec::new(),
                    query: req.query,
                    requested_mode: requested_mode.clone(),
                    effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
                    pipeline,
                    staleness_hint,
                    elapsed_ms: started.elapsed().as_millis() as u64,
                });
            }

            let ranked_len = ranked_chunks.len();
            let results = self.assemble_search_results(
                query,
                &requested_mode,
                ranked_chunks,
                &collections_by_id,
                req.debug,
                rerank_enabled,
                &mut pipeline,
                req.limit,
            )?;

            // Stop widening when:
            // - we have enough results;
            // - the ranker ran out of candidates (fewer than requested);
            // - or we have hit the candidate cap.
            if results.len() >= req.limit
                || ranked_len < retrieval_limit
                || retrieval_limit >= max_candidates
            {
                break results;
            }

            // Otherwise double the candidate pool (capped) and retry; guard against no growth.
            let next_limit = retrieval_limit.saturating_mul(2).min(max_candidates);
            if next_limit <= retrieval_limit {
                break results;
            }
            retrieval_limit = next_limit;
        };

        Ok(SearchResponse {
            results,
            query: req.query,
            requested_mode: requested_mode.clone(),
            effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
            pipeline,
            staleness_hint,
            elapsed_ms: started.elapsed().as_millis() as u64,
        })
    }
781
782 pub fn resolve_space(&self, explicit: Option<&str>) -> Result<String> {
783 let resolved = self.resolve_space_row(explicit, None)?;
784 Ok(resolved.name)
785 }
786
787 pub fn current_space(&self, explicit: Option<&str>) -> Result<Option<ActiveSpace>> {
788 let resolved = self.resolve_preferred_space(explicit)?;
789 Ok(resolved.map(|(space, source)| ActiveSpace {
790 name: space.name,
791 source,
792 }))
793 }
794
795 pub fn status(&self, space: Option<&str>) -> Result<StatusResponse> {
796 let _lock = self.acquire_operation_lock(LockMode::Shared)?;
797 let (spaces, totals_scope) = if let Some(space_name) = space {
798 let resolved = self.resolve_space_row(Some(space_name), None)?;
799 (vec![resolved.clone()], Some(resolved.id))
800 } else {
801 (self.storage.list_spaces()?, None)
802 };
803
804 let mut space_statuses = Vec::with_capacity(spaces.len());
805 for space_row in spaces {
806 let collections = self.storage.list_collections(Some(space_row.id))?;
807 let mut collection_statuses = Vec::with_capacity(collections.len());
808 let mut last_updated: Option<String> = None;
809
810 for collection in collections {
811 if last_updated
812 .as_ref()
813 .map(|existing| collection.updated > *existing)
814 .unwrap_or(true)
815 {
816 last_updated = Some(collection.updated.clone());
817 }
818
819 let documents = self
820 .storage
821 .count_documents_in_collection(collection.id, false)?;
822 let active_documents = self
823 .storage
824 .count_documents_in_collection(collection.id, true)?;
825 let chunks = self.storage.count_chunks_in_collection(collection.id)?;
826 let embedded_chunks = self
827 .storage
828 .count_embedded_chunks_in_collection(collection.id)?;
829
830 collection_statuses.push(CollectionStatus {
831 name: collection.name,
832 path: collection.path,
833 documents,
834 active_documents,
835 chunks,
836 embedded_chunks,
837 last_updated: collection.updated,
838 });
839 }
840
841 space_statuses.push(SpaceStatus {
842 name: space_row.name,
843 description: space_row.description,
844 collections: collection_statuses,
845 last_updated,
846 });
847 }
848
849 let models = self.model_status_unlocked()?;
850
851 Ok(StatusResponse {
852 spaces: space_statuses,
853 models,
854 cache_dir: self.config.cache_dir.clone(),
855 config_dir: self.config.config_dir.clone(),
856 total_documents: self.storage.count_documents(totals_scope)?,
857 total_chunks: self.storage.count_chunks(totals_scope)?,
858 total_embedded: self.storage.count_embedded_chunks(totals_scope)?,
859 disk_usage: self.storage.disk_usage()?,
860 })
861 }
862
863 pub fn model_status(&self) -> Result<ModelStatus> {
864 self.model_status_unlocked()
865 }
866
867 fn model_status_unlocked(&self) -> Result<ModelStatus> {
868 models::status(&self.config)
869 }
870
871 pub fn config(&self) -> &Config {
872 &self.config
873 }
874
875 pub fn storage(&self) -> &Storage {
876 &self.storage
877 }
878
879 fn embedding_model_key(&self) -> &str {
880 self.config
881 .roles
882 .embedder
883 .as_ref()
884 .and_then(|role| self.config.providers.get(&role.provider))
885 .map(|profile| profile.model())
886 .unwrap_or("embedder")
887 }
888
889 fn acquire_operation_lock(&self, mode: LockMode) -> Result<OperationLock> {
890 OperationLock::acquire(&self.config.cache_dir, mode)
891 }
892
893 fn collect_document_chunk_ids(&self, doc_ids: &[i64]) -> Result<Vec<i64>> {
894 let mut chunk_ids = Vec::new();
895 for doc_id in doc_ids {
896 let chunks = self.storage.get_chunks_for_document(*doc_id)?;
897 chunk_ids.extend(chunks.into_iter().map(|chunk| chunk.id));
898 }
899 Ok(chunk_ids)
900 }
901
902 fn purge_space_chunks(&self, space: &str, chunk_ids: &[i64]) -> Result<()> {
903 if chunk_ids.is_empty() {
904 return Ok(());
905 }
906
907 self.storage.delete_tantivy(space, chunk_ids)?;
908 self.storage.commit_tantivy(space)?;
909 self.storage.delete_usearch(space, chunk_ids)?;
910 Ok(())
911 }
912
913 fn build_space_info(&self, space: &crate::storage::SpaceRow) -> Result<SpaceInfo> {
914 let collection_count = self.storage.list_collections(Some(space.id))?.len();
915 let document_count = self.storage.count_documents(Some(space.id))?;
916 let chunk_count = self.storage.count_chunks(Some(space.id))?;
917
918 Ok(SpaceInfo {
919 name: space.name.clone(),
920 description: space.description.clone(),
921 collection_count,
922 document_count,
923 chunk_count,
924 created: space.created.clone(),
925 })
926 }
927
928 fn build_collection_info(
929 &self,
930 space_name: &str,
931 collection: &CollectionRow,
932 ) -> Result<CollectionInfo> {
933 let document_count = self
934 .storage
935 .count_documents_in_collection(collection.id, false)?;
936 let active_document_count = self
937 .storage
938 .count_documents_in_collection(collection.id, true)?;
939 let chunk_count = self.storage.count_chunks_in_collection(collection.id)?;
940 let embedded_chunk_count = self
941 .storage
942 .count_embedded_chunks_in_collection(collection.id)?;
943
944 Ok(CollectionInfo {
945 name: collection.name.clone(),
946 space: space_name.to_string(),
947 path: collection.path.clone(),
948 description: collection.description.clone(),
949 extensions: collection.extensions.clone(),
950 document_count,
951 active_document_count,
952 chunk_count,
953 embedded_chunk_count,
954 created: collection.created.clone(),
955 updated: collection.updated.clone(),
956 })
957 }
958
959 fn resolve_space_row(
960 &self,
961 explicit: Option<&str>,
962 collection_for_lookup: Option<&str>,
963 ) -> Result<crate::storage::SpaceRow> {
964 if let Some((space, _source)) = self.resolve_preferred_space(explicit)? {
965 return Ok(space);
966 }
967
968 if let Some(collection) = collection_for_lookup {
969 return match self.storage.find_space_for_collection(collection)? {
970 SpaceResolution::Found(space) => Ok(space),
971 SpaceResolution::Ambiguous(spaces) => Err(KboltError::AmbiguousSpace {
972 collection: collection.to_string(),
973 spaces,
974 }
975 .into()),
976 SpaceResolution::NotFound => Err(KboltError::CollectionNotFound {
977 name: collection.to_string(),
978 }
979 .into()),
980 };
981 }
982
983 Err(KboltError::NoActiveSpace.into())
984 }
985
986 fn resolve_preferred_space(
987 &self,
988 explicit: Option<&str>,
989 ) -> Result<Option<(crate::storage::SpaceRow, ActiveSpaceSource)>> {
990 if let Some(space_name) = explicit {
991 let space = self.storage.get_space(space_name)?;
992 return Ok(Some((space, ActiveSpaceSource::Flag)));
993 }
994
995 if let Ok(space_name) = std::env::var("KBOLT_SPACE") {
996 let trimmed = space_name.trim();
997 if !trimmed.is_empty() {
998 let space = self.storage.get_space(trimmed)?;
999 return Ok(Some((space, ActiveSpaceSource::EnvVar)));
1000 }
1001 }
1002
1003 if let Some(space_name) = self.config.default_space.as_deref() {
1004 let space = self.storage.get_space(space_name)?;
1005 return Ok(Some((space, ActiveSpaceSource::ConfigDefault)));
1006 }
1007
1008 Ok(None)
1009 }
1010}
1011
1012#[cfg(test)]
1013mod tests;
1014
#[cfg(test)]
mod lock_relief_tests {
    //! Checks that read-only engine entry points keep working while the global
    //! `kbolt.lock` file lock is held exclusively by someone else.
    use fs2::FileExt;
    use std::collections::HashMap;
    use std::fs::OpenOptions;
    use std::mem;

    use tempfile::tempdir;

    use super::Engine;
    use crate::config::{ChunkingConfig, Config, RankingConfig, ReapingConfig};
    use crate::storage::Storage;
    use kbolt_types::{AddCollectionRequest, GetRequest, Locator, MultiGetRequest, UpdateOptions};

    #[test]
    fn metadata_reads_succeed_while_global_lock_is_held() {
        let engine = test_engine_with_indexed_collection();
        let _holder = hold_global_lock(&engine);

        let space_names: Vec<String> = engine
            .list_spaces()
            .expect("list spaces")
            .into_iter()
            .map(|space| space.name)
            .collect();
        assert_eq!(space_names, vec!["default", "work"]);

        let work = engine.space_info("work").expect("load space info");
        assert_eq!(work.name, "work");
        assert_eq!(work.collection_count, 1);

        let listed = engine
            .list_collections(Some("work"))
            .expect("list collections");
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].name, "api");

        let api = engine
            .collection_info(Some("work"), "api")
            .expect("load collection info");
        assert_eq!(api.name, "api");
        assert_eq!(api.document_count, 1);

        let status = engine.model_status().expect("read model status");
        assert!(!status.embedder.configured);
        assert!(!status.reranker.configured);
        assert!(!status.expander.configured);
    }

    #[test]
    fn document_reads_succeed_while_global_lock_is_held() {
        let engine = test_engine_with_indexed_collection();
        let _holder = hold_global_lock(&engine);

        let files = engine
            .list_files(Some("work"), "api", None)
            .expect("list files");
        assert_eq!(files.len(), 1);
        assert_eq!(files[0].path, "guide.md");

        let guide_locator = || Locator::Path("api/guide.md".to_string());

        let single = engine
            .get_document(GetRequest {
                locator: guide_locator(),
                space: Some("work".to_string()),
                offset: None,
                limit: None,
            })
            .expect("read document");
        assert_eq!(single.path, "api/guide.md");
        assert!(single.content.contains("hello from kbolt"));

        let batch = engine
            .multi_get(MultiGetRequest {
                locators: vec![guide_locator()],
                space: Some("work".to_string()),
                max_files: 4,
                max_bytes: 4_096,
            })
            .expect("read multiple documents");
        assert_eq!(batch.resolved_count, 1);
        assert_eq!(batch.documents.len(), 1);
        assert!(batch.warnings.is_empty());
    }

    /// Builds an engine over a fresh temp tree with space `work` and collection `api`.
    ///
    /// The collection contains a single indexed `guide.md`, indexed without embeddings.
    fn test_engine_with_indexed_collection() -> Engine {
        // The temp dir is leaked on purpose: the engine outlives this function, so the
        // directory must stay on disk for the rest of the test.
        let root_path = {
            let root = tempdir().expect("create temp root");
            let path = root.path().to_path_buf();
            mem::forget(root);
            path
        };

        let docs_dir = root_path.join("docs");
        std::fs::create_dir_all(&docs_dir).expect("create docs dir");
        std::fs::write(docs_dir.join("guide.md"), "# Hello\n\nhello from kbolt\n")
            .expect("write document");

        let cache_dir = root_path.join("cache");
        let storage = Storage::new(&cache_dir).expect("create storage");
        let config = Config {
            config_dir: root_path.join("config"),
            cache_dir,
            default_space: None,
            providers: HashMap::new(),
            roles: crate::config::RoleBindingsConfig::default(),
            reaping: ReapingConfig { days: 7 },
            chunking: ChunkingConfig::default(),
            ranking: RankingConfig::default(),
        };
        let engine = Engine::from_parts(storage, config);

        engine
            .add_collection(AddCollectionRequest {
                path: docs_dir,
                space: Some("work".to_string()),
                name: Some("api".to_string()),
                description: None,
                extensions: None,
                no_index: true,
            })
            .expect("add collection");
        engine
            .update(UpdateOptions {
                space: Some("work".to_string()),
                collections: vec!["api".to_string()],
                no_embed: true,
                dry_run: false,
                verbose: false,
            })
            .expect("index collection");

        engine
    }

    /// Opens `<cache_dir>/kbolt.lock` and takes an exclusive lock on it.
    ///
    /// The lock lasts for as long as the returned handle lives, simulating another holder.
    fn hold_global_lock(engine: &Engine) -> std::fs::File {
        let cache_dir = &engine.config().cache_dir;
        std::fs::create_dir_all(cache_dir).expect("create cache dir");
        let holder = OpenOptions::new()
            .create(true)
            .truncate(false)
            .read(true)
            .write(true)
            .open(cache_dir.join("kbolt.lock"))
            .expect("open lock file");
        FileExt::try_lock_exclusive(&holder).expect("acquire global lock");
        holder
    }
}