1use std::collections::{HashMap, HashSet};
2use std::path::Path;
3use std::sync::Arc;
4use std::time::Instant;
5
6use crate::config;
7use crate::config::Config;
8use crate::error::CoreError;
9use crate::ingest::chunk::{chunk_document, chunk_document_with_counter, resolve_policy};
10use crate::ingest::extract::default_registry;
11use crate::lock::{LockMode, OperationLock};
12use crate::models;
13use crate::storage::Storage;
14use crate::storage::{
15 ChunkInsert, ChunkRow, CollectionRow, DocumentRow, DocumentTitleSource, SpaceResolution,
16 TantivyEntry,
17};
18use crate::Result;
19use kbolt_types::{
20 ActiveSpace, ActiveSpaceSource, AddCollectionRequest, AddCollectionResult, CollectionInfo,
21 CollectionStatus, DocumentResponse, FileEntry, GetRequest, InitialIndexingBlock,
22 InitialIndexingOutcome, KboltError, Locator, ModelStatus, MultiGetRequest, MultiGetResponse,
23 OmitReason, OmittedFile, SearchMode, SearchPipeline, SearchPipelineNotice, SearchPipelineStep,
24 SearchPipelineUnavailableReason, SearchRequest, SearchResponse, SearchResult, SearchSignals,
25 SpaceInfo, SpaceStatus, StatusResponse, UpdateOptions, UpdateReport,
26};
27use walkdir::WalkDir;
28
29mod eval_ops;
30mod file_utils;
31mod ignore_helpers;
32mod ignore_ops;
33mod path_utils;
34mod schedule_ops;
35mod schedule_run_ops;
36mod schedule_status_ops;
37mod scoring;
38mod search_ops;
39mod text_helpers;
40mod update_ops;
41use file_utils::{file_error, file_title, modified_token, sha256_hex};
42use ignore_helpers::{
43 collection_ignore_file_path, count_ignore_patterns, is_hard_ignored_dir_name,
44 is_hard_ignored_file, load_collection_ignore_matcher, validate_ignore_pattern,
45};
46use path_utils::{
47 collection_relative_path, extension_allowed, normalize_docid, normalize_list_prefix,
48 normalized_extension_filter, path_matches_prefix, short_docid, split_collection_path,
49};
50use scoring::{dense_distance_to_score, max_option};
51pub(crate) use text_helpers::retrieval_text_with_prefix;
52use text_helpers::{chunk_text_from_bytes, search_text_with_neighbors};
53
/// Central engine tying together configuration, on-disk storage, and the optional
/// inference clients (embedder, reranker, expander).
///
/// Most public operations take the operation lock (shared for reads, exclusive for
/// mutations) through `acquire_operation_lock` before touching storage.
pub struct Engine {
    /// Storage opened from `config.cache_dir`.
    storage: Storage,
    /// Loaded configuration; `default_space` can be rewritten and saved at runtime.
    config: Config,
    /// Embedding client, when one was built from the configuration (or injected in tests).
    embedder: Option<Arc<dyn models::Embedder>>,
    /// Companion to the embedder, built alongside it by `models::build_inference_clients`.
    embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
    /// Optional reranking client (presumably consulted when search reranking is enabled).
    reranker: Option<Arc<dyn models::Reranker>>,
    /// Optional query-expansion client (presumably used by search in `search_ops` — verify).
    expander: Option<Arc<dyn models::Expander>>,
}
62
/// One row of an ignore-file listing: a collection, identified by space and collection
/// name, plus how many patterns its ignore file holds.
///
/// NOTE(review): not constructed in this file — presumably built by `ignore_ops` via
/// `count_ignore_patterns`; confirm there.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IgnoreListEntry {
    /// Name of the space that owns the collection.
    pub space: String,
    /// Collection name within `space`.
    pub collection: String,
    /// Number of patterns in the collection's ignore file (counting rules live in
    /// `count_ignore_patterns` — verify whether blanks/comments are excluded).
    pub pattern_count: usize,
}
69
/// A collection selected for an engine operation, paired with the name of its space.
///
/// Produced by `resolve_targets`. `search` uses it here; presumably update/schedule code in
/// the submodules does too — verify.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UpdateTarget {
    /// Name of the space that owns `collection`.
    pub space: String,
    /// Full storage row of the selected collection.
    pub collection: CollectionRow,
}
75
/// Borrowed selection criteria handed to `resolve_targets`.
///
/// `search` passes `req.space` and `req.collections` straight through. How an empty
/// `collections` slice is interpreted is up to `resolve_targets` (presumably "every
/// collection in scope" — verify).
#[derive(Debug, Clone, Copy)]
struct TargetScope<'a> {
    /// Explicit space name, if the caller supplied one.
    space: Option<&'a str>,
    /// Collection names the caller asked for.
    collections: &'a [String],
}
81
/// Per-collection details that `search` keeps, keyed by collection id, while turning ranked
/// chunks into results.
#[derive(Debug, Clone)]
struct SearchCollectionMeta {
    /// Owning space name.
    space: String,
    /// Collection name.
    collection: String,
    /// Collection root directory (copied from `CollectionRow::path`).
    path: std::path::PathBuf,
}
88
/// A chunk hit paired with its BM25 score.
///
/// NOTE(review): not used in this file; presumably produced by keyword retrieval in
/// `search_ops` — confirm.
#[derive(Debug, Clone)]
struct SearchHitCandidate {
    /// Storage id of the matched chunk.
    chunk_id: i64,
    /// BM25 relevance score reported for the hit.
    bm25_score: f32,
}
94
/// A chunk after ranking, carrying its final score and the individual signals behind it.
///
/// NOTE(review): populated outside this file (presumably `search_ops` / `scoring`); the
/// field descriptions below are inferred from names — confirm before relying on them.
#[derive(Debug, Clone)]
struct RankedChunk {
    /// Storage id of the ranked chunk.
    chunk_id: i64,
    /// Final score used for ordering (presumably after any reranking).
    score: f32,
    /// Fused retrieval score (presumably combining keyword and dense signals).
    fusion: f32,
    /// Reranker score, when reranking ran for this chunk.
    reranker: Option<f32>,
    /// Keyword (BM25) score, when the chunk came from keyword retrieval.
    bm25: Option<f32>,
    /// Dense-similarity score, when the chunk came from vector retrieval.
    dense: Option<f32>,
    /// Position before reranking, if it was recorded.
    original_rank: Option<usize>,
}
105
106impl Engine {
107 pub fn new(config_path: Option<&Path>) -> Result<Self> {
108 let config = config::load(config_path)?;
109 let storage = Storage::new(&config.cache_dir)?;
110 let built_models = models::build_inference_clients(&config)?;
111 Ok(Self {
112 storage,
113 config,
114 embedder: built_models.embedder,
115 embedding_document_sizer: built_models.embedding_document_sizer,
116 reranker: built_models.reranker,
117 expander: built_models.expander,
118 })
119 }
120
121 #[cfg(test)]
122 pub(crate) fn from_parts(storage: Storage, config: Config) -> Self {
123 Self::from_parts_with_models(storage, config, None, None, None)
124 }
125
126 #[cfg(test)]
127 pub(crate) fn from_parts_with_embedder(
128 storage: Storage,
129 config: Config,
130 embedder: Option<Arc<dyn models::Embedder>>,
131 ) -> Self {
132 Self::from_parts_with_inference(storage, config, embedder, None, None, None)
133 }
134
135 #[cfg(test)]
136 pub(crate) fn from_parts_with_embedding_runtime(
137 storage: Storage,
138 config: Config,
139 embedder: Option<Arc<dyn models::Embedder>>,
140 embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
141 ) -> Self {
142 Self::from_parts_with_inference(
143 storage,
144 config,
145 embedder,
146 embedding_document_sizer,
147 None,
148 None,
149 )
150 }
151
152 #[cfg(test)]
153 pub(crate) fn from_parts_with_models(
154 storage: Storage,
155 config: Config,
156 embedder: Option<Arc<dyn models::Embedder>>,
157 reranker: Option<Arc<dyn models::Reranker>>,
158 expander: Option<Arc<dyn models::Expander>>,
159 ) -> Self {
160 Self::from_parts_with_inference(storage, config, embedder, None, reranker, expander)
161 }
162
163 #[cfg(test)]
164 fn from_parts_with_inference(
165 storage: Storage,
166 config: Config,
167 embedder: Option<Arc<dyn models::Embedder>>,
168 embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
169 reranker: Option<Arc<dyn models::Reranker>>,
170 expander: Option<Arc<dyn models::Expander>>,
171 ) -> Self {
172 let built_models =
173 models::build_inference_clients(&config).expect("build inference models");
174 Self {
175 storage,
176 config,
177 embedder: embedder.or(built_models.embedder),
178 embedding_document_sizer: embedding_document_sizer
179 .or(built_models.embedding_document_sizer),
180 reranker: reranker.or(built_models.reranker),
181 expander: expander.or(built_models.expander),
182 }
183 }
184
185 pub fn add_space(&self, name: &str, description: Option<&str>) -> Result<SpaceInfo> {
186 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
187 self.storage.create_space(name, description)?;
188 let space = self.storage.get_space(name)?;
189 self.build_space_info(&space)
190 }
191
192 pub fn remove_space(&mut self, name: &str) -> Result<()> {
193 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
194 self.storage.delete_space(name)?;
195 if self.config.default_space.as_deref() == Some(name) {
196 self.persist_default_space(None)?;
197 }
198 Ok(())
199 }
200
201 pub fn rename_space(&mut self, old: &str, new: &str) -> Result<()> {
202 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
203 self.storage.rename_space(old, new)?;
204 if self.config.default_space.as_deref() == Some(old) {
205 self.persist_default_space(Some(new.to_string()))?;
206 }
207 Ok(())
208 }
209
210 pub fn describe_space(&self, name: &str, description: &str) -> Result<()> {
211 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
212 self.storage.update_space_description(name, description)
213 }
214
215 pub fn list_spaces(&self) -> Result<Vec<SpaceInfo>> {
216 let _lock = self.acquire_operation_lock(LockMode::Shared)?;
217 let spaces = self.storage.list_spaces()?;
218 let mut infos = Vec::with_capacity(spaces.len());
219 for space in spaces {
220 infos.push(self.build_space_info(&space)?);
221 }
222 Ok(infos)
223 }
224
225 pub fn space_info(&self, name: &str) -> Result<SpaceInfo> {
226 let _lock = self.acquire_operation_lock(LockMode::Shared)?;
227 let space = self.storage.get_space(name)?;
228 self.build_space_info(&space)
229 }
230
231 pub fn set_default_space(&mut self, name: Option<&str>) -> Result<Option<String>> {
232 if let Some(space_name) = name {
233 self.storage.get_space(space_name)?;
234 }
235
236 self.persist_default_space(name.map(ToString::to_string))?;
237 Ok(self.config.default_space.clone())
238 }
239
240 fn persist_default_space(&mut self, default_space: Option<String>) -> Result<()> {
241 let previous = self.config.default_space.clone();
242 self.config.default_space = default_space;
243 if let Err(err) = config::save(&self.config) {
244 self.config.default_space = previous;
245 return Err(err);
246 }
247 Ok(())
248 }
249
250 pub fn add_collection(&self, req: AddCollectionRequest) -> Result<AddCollectionResult> {
251 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
252 let AddCollectionRequest {
253 path,
254 space: requested_space,
255 name: requested_name,
256 description,
257 extensions,
258 no_index,
259 } = req;
260
261 let space = match requested_space.as_deref() {
262 Some(space_name) => match self.storage.get_space(space_name) {
263 Ok(space) => space,
264 Err(CoreError::Domain(KboltError::SpaceNotFound { .. })) => {
265 self.storage.create_space(space_name, None)?;
266 self.storage.get_space(space_name)?
267 }
268 Err(err) => return Err(err),
269 },
270 None => self.resolve_space_row(None, None)?,
271 };
272 if !path.is_absolute() || !path.is_dir() {
273 return Err(KboltError::InvalidPath(path).into());
274 }
275
276 let name = match requested_name {
277 Some(name) => name,
278 None => path
279 .file_name()
280 .and_then(|name| name.to_str())
281 .map(ToString::to_string)
282 .ok_or_else(|| KboltError::InvalidPath(path.clone()))?,
283 };
284
285 self.storage.create_collection(
286 space.id,
287 &name,
288 &path,
289 description.as_deref(),
290 extensions.as_deref(),
291 )?;
292
293 let initial_indexing = if no_index {
294 InitialIndexingOutcome::Skipped
295 } else {
296 match self.update_unlocked(UpdateOptions {
297 space: Some(space.name.clone()),
298 collections: vec![name.clone()],
299 no_embed: false,
300 dry_run: false,
301 verbose: false,
302 }) {
303 Ok(report) => InitialIndexingOutcome::Indexed(report),
304 Err(CoreError::Domain(KboltError::SpaceDenseRepairRequired { space, reason })) => {
305 InitialIndexingOutcome::Blocked(
306 InitialIndexingBlock::SpaceDenseRepairRequired { space, reason },
307 )
308 }
309 Err(CoreError::Domain(KboltError::ModelNotAvailable { name })) => {
310 InitialIndexingOutcome::Blocked(InitialIndexingBlock::ModelNotAvailable {
311 name,
312 })
313 }
314 Err(err) => return Err(err),
315 }
316 };
317
318 let collection = self.storage.get_collection(space.id, &name)?;
319 Ok(AddCollectionResult {
320 collection: self.build_collection_info(&space.name, &collection)?,
321 initial_indexing,
322 })
323 }
324
325 pub fn remove_collection(&self, space: Option<&str>, name: &str) -> Result<()> {
326 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
327 let resolved = self.resolve_space_row(space, Some(name))?;
328 let collection = self.storage.get_collection(resolved.id, name)?;
329 let documents = self.storage.list_documents(collection.id, false)?;
330 let doc_ids = documents.into_iter().map(|doc| doc.id).collect::<Vec<_>>();
331 let chunk_ids = self.collect_document_chunk_ids(&doc_ids)?;
332 self.purge_space_chunks(&resolved.name, &chunk_ids)?;
333 self.storage.delete_collection(resolved.id, name)?;
334
335 let ignore_path =
336 collection_ignore_file_path(&self.config.config_dir, &resolved.name, name);
337 if ignore_path.is_file() {
338 std::fs::remove_file(ignore_path)?;
339 }
340
341 Ok(())
342 }
343
344 pub fn rename_collection(&self, space: Option<&str>, old: &str, new: &str) -> Result<()> {
345 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
346 let resolved = self.resolve_space_row(space, Some(old))?;
347 let old_ignore_path =
348 collection_ignore_file_path(&self.config.config_dir, &resolved.name, old);
349 let new_ignore_path =
350 collection_ignore_file_path(&self.config.config_dir, &resolved.name, new);
351 if old_ignore_path.is_file() && new_ignore_path.exists() {
352 return Err(KboltError::Internal(format!(
353 "cannot rename ignore file: destination already exists: {}",
354 new_ignore_path.display()
355 ))
356 .into());
357 }
358
359 self.storage.rename_collection(resolved.id, old, new)?;
360
361 if old_ignore_path.is_file() {
362 if let Some(parent) = new_ignore_path.parent() {
363 std::fs::create_dir_all(parent)?;
364 }
365 if let Err(rename_err) = std::fs::rename(&old_ignore_path, &new_ignore_path) {
366 match self.storage.rename_collection(resolved.id, new, old) {
367 Ok(()) => {
368 return Err(KboltError::Internal(format!(
369 "renamed collection was rolled back after ignore rename failure: {rename_err}"
370 ))
371 .into())
372 }
373 Err(rollback_err) => {
374 return Err(KboltError::Internal(format!(
375 "ignore rename failed: {rename_err}; rollback failed: {rollback_err}"
376 ))
377 .into())
378 }
379 }
380 }
381 }
382
383 Ok(())
384 }
385
386 pub fn describe_collection(&self, space: Option<&str>, name: &str, desc: &str) -> Result<()> {
387 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
388 let resolved = self.resolve_space_row(space, Some(name))?;
389 self.storage
390 .update_collection_description(resolved.id, name, desc)
391 }
392
393 pub fn list_collections(&self, space: Option<&str>) -> Result<Vec<CollectionInfo>> {
394 let _lock = self.acquire_operation_lock(LockMode::Shared)?;
395 let (space_id_filter, spaces_by_id) = if let Some(space_name) = space {
396 let resolved = self.resolve_space_row(Some(space_name), None)?;
397 let mut map = std::collections::HashMap::new();
398 map.insert(resolved.id, resolved.name.clone());
399 (Some(resolved.id), map)
400 } else {
401 let spaces = self.storage.list_spaces()?;
402 let map = spaces
403 .into_iter()
404 .map(|space| (space.id, space.name))
405 .collect::<std::collections::HashMap<_, _>>();
406 (None, map)
407 };
408
409 let collections = self.storage.list_collections(space_id_filter)?;
410 let mut infos = Vec::with_capacity(collections.len());
411 for collection in collections {
412 let space_name = spaces_by_id
413 .get(&collection.space_id)
414 .ok_or_else(|| {
415 KboltError::Internal(format!(
416 "missing space mapping for collection '{}'",
417 collection.name
418 ))
419 })?
420 .clone();
421 infos.push(self.build_collection_info(&space_name, &collection)?);
422 }
423 Ok(infos)
424 }
425
426 pub fn collection_info(&self, space: Option<&str>, name: &str) -> Result<CollectionInfo> {
427 let _lock = self.acquire_operation_lock(LockMode::Shared)?;
428 let resolved = self.resolve_space_row(space, Some(name))?;
429 let collection = self.storage.get_collection(resolved.id, name)?;
430 self.build_collection_info(&resolved.name, &collection)
431 }
432
433 pub fn list_files(
434 &self,
435 space: Option<&str>,
436 collection: &str,
437 prefix: Option<&str>,
438 ) -> Result<Vec<FileEntry>> {
439 let _lock = self.acquire_operation_lock(LockMode::Shared)?;
440 let resolved_space = self.resolve_space_row(space, Some(collection))?;
441 let collection_row = self.storage.get_collection(resolved_space.id, collection)?;
442 let normalized_prefix = normalize_list_prefix(prefix)?;
443 let file_rows = self
444 .storage
445 .list_collection_file_rows(collection_row.id, false)?;
446
447 let mut files = Vec::with_capacity(file_rows.len());
448 for file_row in file_rows {
449 if let Some(prefix) = normalized_prefix.as_deref() {
450 if !path_matches_prefix(&file_row.path, prefix) {
451 continue;
452 }
453 }
454
455 files.push(FileEntry {
456 path: file_row.path,
457 title: file_row.title,
458 docid: short_docid(&file_row.hash),
459 active: file_row.active,
460 chunk_count: file_row.chunk_count,
461 embedded: file_row.chunk_count > 0
462 && file_row.embedded_chunk_count >= file_row.chunk_count,
463 });
464 }
465
466 Ok(files)
467 }
468
469 pub fn get_document(&self, req: GetRequest) -> Result<DocumentResponse> {
470 let _lock = self.acquire_operation_lock(LockMode::Shared)?;
471 self.get_document_unlocked(req)
472 }
473
474 fn get_document_unlocked(&self, req: GetRequest) -> Result<DocumentResponse> {
475 let GetRequest {
476 locator,
477 space,
478 offset,
479 limit,
480 } = req;
481
482 let (document, collection_row, space_name) = match locator {
483 Locator::Path(locator_path) => {
484 let (collection_name, relative_path) = split_collection_path(&locator_path)?;
485 let resolved_space =
486 self.resolve_space_row(space.as_deref(), Some(&collection_name))?;
487 let collection = self
488 .storage
489 .get_collection(resolved_space.id, &collection_name)?;
490 let document = self
491 .storage
492 .get_document_by_path(collection.id, &relative_path)?
493 .ok_or_else(|| KboltError::DocumentNotFound {
494 path: locator_path.clone(),
495 })?;
496 (document, collection, resolved_space.name)
497 }
498 Locator::DocId(docid) => {
499 let prefix = normalize_docid(&docid)?;
500 let mut candidates = self
501 .storage
502 .get_document_by_hash_prefix(&prefix)?
503 .into_iter()
504 .map(|document| {
505 let collection =
506 self.storage.get_collection_by_id(document.collection_id)?;
507 Ok((document, collection))
508 })
509 .collect::<Result<Vec<_>>>()?;
510
511 if let Some(space_name) = space.as_deref() {
512 let resolved_space = self.resolve_space_row(Some(space_name), None)?;
513 candidates.retain(|(_, collection)| collection.space_id == resolved_space.id);
514 }
515
516 if candidates.is_empty() {
517 return Err(KboltError::DocumentNotFound {
518 path: format!("#{prefix}"),
519 }
520 .into());
521 }
522
523 if candidates.len() > 1 {
524 return Err(KboltError::InvalidInput(
525 "docid is ambiguous; provide more characters".to_string(),
526 )
527 .into());
528 }
529
530 let (document, collection) = candidates.pop().expect("candidate exists");
531 let space = self.storage.get_space_by_id(collection.space_id)?;
532 (document, collection, space.name)
533 }
534 };
535
536 let full_path = collection_row.path.join(&document.path);
537 let bytes = match std::fs::read(&full_path) {
538 Ok(bytes) => bytes,
539 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
540 return Err(KboltError::FileDeleted(full_path).into())
541 }
542 Err(err) => return Err(err.into()),
543 };
544
545 let raw_content = String::from_utf8_lossy(&bytes).into_owned();
546 let stale = sha256_hex(&bytes) != document.hash;
547
548 let lines = raw_content.lines().collect::<Vec<_>>();
549 let total_lines = lines.len();
550 let start = offset.unwrap_or(0).min(total_lines);
551 let requested = limit.unwrap_or(total_lines.saturating_sub(start));
552 let end = start.saturating_add(requested).min(total_lines);
553 let returned_lines = end.saturating_sub(start);
554 let content = if returned_lines == 0 {
555 String::new()
556 } else {
557 lines[start..end].join("\n")
558 };
559
560 Ok(DocumentResponse {
561 docid: short_docid(&document.hash),
562 path: format!("{}/{}", collection_row.name, document.path),
563 title: document.title,
564 space: space_name,
565 collection: collection_row.name,
566 content,
567 stale,
568 total_lines,
569 returned_lines,
570 })
571 }
572
573 pub fn multi_get(&self, req: MultiGetRequest) -> Result<MultiGetResponse> {
574 let _lock = self.acquire_operation_lock(LockMode::Shared)?;
575 if req.max_files == 0 {
576 return Err(
577 KboltError::InvalidInput("max_files must be greater than 0".to_string()).into(),
578 );
579 }
580 if req.max_bytes == 0 {
581 return Err(
582 KboltError::InvalidInput("max_bytes must be greater than 0".to_string()).into(),
583 );
584 }
585
586 let mut documents = Vec::new();
587 let mut omitted = Vec::new();
588 let mut resolved_count = 0usize;
589 let mut warnings = Vec::new();
590 let mut consumed_bytes = 0usize;
591
592 for locator in req.locators {
593 let document = match self.get_document_unlocked(GetRequest {
594 locator,
595 space: req.space.clone(),
596 offset: None,
597 limit: None,
598 }) {
599 Ok(document) => document,
600 Err(err) => match KboltError::from(err) {
601 KboltError::FileDeleted(path) => {
602 warnings.push(format!(
603 "file deleted since indexing: {}. run `kbolt update` to refresh.",
604 path.display()
605 ));
606 continue;
607 }
608 KboltError::DocumentNotFound { path } => {
609 warnings.push(format!("document not found: {path}"));
610 continue;
611 }
612 KboltError::InvalidInput(message) => {
613 warnings.push(format!("invalid locator: {message}"));
614 continue;
615 }
616 KboltError::InvalidPath(path) => {
617 warnings.push(format!("invalid locator path: {}", path.display()));
618 continue;
619 }
620 KboltError::AmbiguousSpace { collection, spaces } => {
621 warnings.push(format!(
622 "ambiguous locator space for collection '{collection}': {:?}. use --space to disambiguate.",
623 spaces
624 ));
625 continue;
626 }
627 KboltError::SpaceNotFound { name } => {
628 warnings.push(format!("space not found for locator: {name}"));
629 continue;
630 }
631 KboltError::CollectionNotFound { name } => {
632 warnings.push(format!("collection not found for locator: {name}"));
633 continue;
634 }
635 other => return Err(other.into()),
636 },
637 };
638 resolved_count = resolved_count.saturating_add(1);
639
640 let size_bytes = document.content.len();
641 if documents.len() >= req.max_files {
642 omitted.push(OmittedFile {
643 path: document.path,
644 docid: document.docid,
645 size_bytes,
646 reason: OmitReason::MaxFiles,
647 });
648 continue;
649 }
650
651 if consumed_bytes.saturating_add(size_bytes) > req.max_bytes {
652 omitted.push(OmittedFile {
653 path: document.path,
654 docid: document.docid,
655 size_bytes,
656 reason: OmitReason::MaxBytes,
657 });
658 continue;
659 }
660
661 consumed_bytes = consumed_bytes.saturating_add(size_bytes);
662 documents.push(document);
663 }
664
665 Ok(MultiGetResponse {
666 resolved_count,
667 documents,
668 omitted,
669 warnings,
670 })
671 }
672
    /// Searches the collections selected by `req.space` / `req.collections`, under a
    /// shared lock.
    ///
    /// Flow:
    /// 1. Reject a query that is empty after trimming.
    /// 2. Enable reranking only for `Auto`/`Deep` modes when `req.no_rerank` is false.
    /// 3. Resolve the target collections. `staleness_hint` reports the newest collection
    ///    `updated` stamp among them.
    /// 4. Rank chunks and assemble results. While fewer than `req.limit` results come out
    ///    and more candidates may remain, double the retrieval limit (capped at
    ///    `max_search_candidates`) and try again.
    ///
    /// The response echoes the original, untrimmed `req.query`. `effective_mode` is derived
    /// from the pipeline state after ranking.
    ///
    /// # Errors
    /// Returns `KboltError::InvalidInput` for a blank query, plus target-resolution,
    /// ranking, and assembly errors.
    pub fn search(&self, req: SearchRequest) -> Result<SearchResponse> {
        let _lock = self.acquire_operation_lock(LockMode::Shared)?;
        let started = Instant::now();
        // Trimmed view used for validation and ranking; the response still echoes `req.query`.
        let query = req.query.trim();
        if query.is_empty() {
            return Err(KboltError::InvalidInput("query cannot be empty".to_string()).into());
        }

        let requested_mode = req.mode.clone();
        // Reranking only applies to Auto/Deep searches, and the caller can opt out.
        let rerank_enabled =
            matches!(requested_mode, SearchMode::Auto | SearchMode::Deep) && !req.no_rerank;
        let mut pipeline = self.initial_search_pipeline(&requested_mode);

        let targets = self.resolve_targets(TargetScope {
            space: req.space.as_deref(),
            collections: &req.collections,
        })?;

        // Newest `updated` stamp among the targeted collections (lexicographic string max).
        let staleness_hint = targets
            .iter()
            .map(|target| target.collection.updated.clone())
            .max()
            .map(|updated| format!("Index last updated: {updated}"));

        // Nothing to rank: return an empty result set that still reports mode/pipeline state.
        if req.limit == 0 || targets.is_empty() {
            return Ok(SearchResponse {
                results: Vec::new(),
                query: req.query,
                requested_mode: requested_mode.clone(),
                effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
                pipeline,
                staleness_hint,
                elapsed_ms: started.elapsed().as_millis() as u64,
            });
        }

        // Per-collection metadata needed to turn ranked chunks into results, by collection id.
        let mut collections_by_id: HashMap<i64, SearchCollectionMeta> = HashMap::new();
        for target in &targets {
            collections_by_id.insert(
                target.collection.id,
                SearchCollectionMeta {
                    space: target.space.clone(),
                    collection: target.collection.name.clone(),
                    path: target.collection.path.clone(),
                },
            );
        }

        let max_candidates = self.max_search_candidates(&targets)?;
        let mut retrieval_limit =
            self.initial_search_candidate_limit(&requested_mode, req.limit, rerank_enabled);
        // Retrieve, assemble, and widen the candidate window until `req.limit` results are
        // produced or no further candidates can be fetched. Assembly may yield fewer results
        // than ranked chunks (presumably via filtering/dedup in search_ops — verify).
        // NOTE(review): `pipeline` is passed mutably on every pass; if ranking or assembly
        // append notices, repeated passes could duplicate them — confirm.
        let results = loop {
            let ranked_chunks = self.rank_chunks_for_mode(
                &requested_mode,
                &targets,
                query,
                retrieval_limit,
                req.min_score,
                &mut pipeline,
            )?;

            if ranked_chunks.is_empty() {
                return Ok(SearchResponse {
                    results: Vec::new(),
                    query: req.query,
                    requested_mode: requested_mode.clone(),
                    effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
                    pipeline,
                    staleness_hint,
                    elapsed_ms: started.elapsed().as_millis() as u64,
                });
            }

            let ranked_len = ranked_chunks.len();
            let results = self.assemble_search_results(
                query,
                &requested_mode,
                ranked_chunks,
                &collections_by_id,
                req.debug,
                rerank_enabled,
                &mut pipeline,
                req.limit,
            )?;

            // Stop when there are enough results, when the ranker returned fewer chunks than
            // requested (presumably meaning candidates are exhausted), or when the window is
            // already at its cap.
            if results.len() >= req.limit
                || ranked_len < retrieval_limit
                || retrieval_limit >= max_candidates
            {
                break results;
            }

            // Double the window, capped at `max_candidates`; stop if that makes no progress
            // (e.g. a limit of 0 stays 0 under doubling).
            let next_limit = retrieval_limit.saturating_mul(2).min(max_candidates);
            if next_limit <= retrieval_limit {
                break results;
            }
            retrieval_limit = next_limit;
        };

        Ok(SearchResponse {
            results,
            query: req.query,
            requested_mode: requested_mode.clone(),
            effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
            pipeline,
            staleness_hint,
            elapsed_ms: started.elapsed().as_millis() as u64,
        })
    }
782
783 pub fn resolve_space(&self, explicit: Option<&str>) -> Result<String> {
784 let resolved = self.resolve_space_row(explicit, None)?;
785 Ok(resolved.name)
786 }
787
788 pub fn current_space(&self, explicit: Option<&str>) -> Result<Option<ActiveSpace>> {
789 let resolved = self.resolve_preferred_space(explicit)?;
790 Ok(resolved.map(|(space, source)| ActiveSpace {
791 name: space.name,
792 source,
793 }))
794 }
795
796 pub fn status(&self, space: Option<&str>) -> Result<StatusResponse> {
797 let _lock = self.acquire_operation_lock(LockMode::Shared)?;
798 let (spaces, totals_scope) = if let Some(space_name) = space {
799 let resolved = self.resolve_space_row(Some(space_name), None)?;
800 (vec![resolved.clone()], Some(resolved.id))
801 } else {
802 (self.storage.list_spaces()?, None)
803 };
804
805 let mut space_statuses = Vec::with_capacity(spaces.len());
806 for space_row in spaces {
807 let collections = self.storage.list_collections(Some(space_row.id))?;
808 let mut collection_statuses = Vec::with_capacity(collections.len());
809 let mut last_updated: Option<String> = None;
810
811 for collection in collections {
812 if last_updated
813 .as_ref()
814 .map(|existing| collection.updated > *existing)
815 .unwrap_or(true)
816 {
817 last_updated = Some(collection.updated.clone());
818 }
819
820 let documents = self
821 .storage
822 .count_documents_in_collection(collection.id, false)?;
823 let active_documents = self
824 .storage
825 .count_documents_in_collection(collection.id, true)?;
826 let chunks = self.storage.count_chunks_in_collection(collection.id)?;
827 let embedded_chunks = self
828 .storage
829 .count_embedded_chunks_in_collection(collection.id)?;
830
831 collection_statuses.push(CollectionStatus {
832 name: collection.name,
833 path: collection.path,
834 documents,
835 active_documents,
836 chunks,
837 embedded_chunks,
838 last_updated: collection.updated,
839 });
840 }
841
842 space_statuses.push(SpaceStatus {
843 name: space_row.name,
844 description: space_row.description,
845 collections: collection_statuses,
846 last_updated,
847 });
848 }
849
850 let models = self.model_status_unlocked()?;
851
852 Ok(StatusResponse {
853 spaces: space_statuses,
854 models,
855 cache_dir: self.config.cache_dir.clone(),
856 config_dir: self.config.config_dir.clone(),
857 total_documents: self.storage.count_documents(totals_scope)?,
858 total_chunks: self.storage.count_chunks(totals_scope)?,
859 total_embedded: self.storage.count_embedded_chunks(totals_scope)?,
860 disk_usage: self.storage.disk_usage()?,
861 })
862 }
863
864 pub fn model_status(&self) -> Result<ModelStatus> {
865 let _lock = self.acquire_operation_lock(LockMode::Shared)?;
866 self.model_status_unlocked()
867 }
868
869 fn model_status_unlocked(&self) -> Result<ModelStatus> {
870 models::status(&self.config)
871 }
872
873 pub fn config(&self) -> &Config {
874 &self.config
875 }
876
877 pub fn storage(&self) -> &Storage {
878 &self.storage
879 }
880
881 fn embedding_model_key(&self) -> &str {
882 self.config
883 .roles
884 .embedder
885 .as_ref()
886 .and_then(|role| self.config.providers.get(&role.provider))
887 .map(|profile| profile.model())
888 .unwrap_or("embedder")
889 }
890
891 fn acquire_operation_lock(&self, mode: LockMode) -> Result<OperationLock> {
892 OperationLock::acquire(&self.config.cache_dir, mode)
893 }
894
895 fn collect_document_chunk_ids(&self, doc_ids: &[i64]) -> Result<Vec<i64>> {
896 let mut chunk_ids = Vec::new();
897 for doc_id in doc_ids {
898 let chunks = self.storage.get_chunks_for_document(*doc_id)?;
899 chunk_ids.extend(chunks.into_iter().map(|chunk| chunk.id));
900 }
901 Ok(chunk_ids)
902 }
903
904 fn purge_space_chunks(&self, space: &str, chunk_ids: &[i64]) -> Result<()> {
905 if chunk_ids.is_empty() {
906 return Ok(());
907 }
908
909 self.storage.delete_tantivy(space, chunk_ids)?;
910 self.storage.commit_tantivy(space)?;
911 self.storage.delete_usearch(space, chunk_ids)?;
912 Ok(())
913 }
914
915 fn build_space_info(&self, space: &crate::storage::SpaceRow) -> Result<SpaceInfo> {
916 let collection_count = self.storage.list_collections(Some(space.id))?.len();
917 let document_count = self.storage.count_documents(Some(space.id))?;
918 let chunk_count = self.storage.count_chunks(Some(space.id))?;
919
920 Ok(SpaceInfo {
921 name: space.name.clone(),
922 description: space.description.clone(),
923 collection_count,
924 document_count,
925 chunk_count,
926 created: space.created.clone(),
927 })
928 }
929
930 fn build_collection_info(
931 &self,
932 space_name: &str,
933 collection: &CollectionRow,
934 ) -> Result<CollectionInfo> {
935 let document_count = self
936 .storage
937 .count_documents_in_collection(collection.id, false)?;
938 let active_document_count = self
939 .storage
940 .count_documents_in_collection(collection.id, true)?;
941 let chunk_count = self.storage.count_chunks_in_collection(collection.id)?;
942 let embedded_chunk_count = self
943 .storage
944 .count_embedded_chunks_in_collection(collection.id)?;
945
946 Ok(CollectionInfo {
947 name: collection.name.clone(),
948 space: space_name.to_string(),
949 path: collection.path.clone(),
950 description: collection.description.clone(),
951 extensions: collection.extensions.clone(),
952 document_count,
953 active_document_count,
954 chunk_count,
955 embedded_chunk_count,
956 created: collection.created.clone(),
957 updated: collection.updated.clone(),
958 })
959 }
960
961 fn resolve_space_row(
962 &self,
963 explicit: Option<&str>,
964 collection_for_lookup: Option<&str>,
965 ) -> Result<crate::storage::SpaceRow> {
966 if let Some((space, _source)) = self.resolve_preferred_space(explicit)? {
967 return Ok(space);
968 }
969
970 if let Some(collection) = collection_for_lookup {
971 return match self.storage.find_space_for_collection(collection)? {
972 SpaceResolution::Found(space) => Ok(space),
973 SpaceResolution::Ambiguous(spaces) => Err(KboltError::AmbiguousSpace {
974 collection: collection.to_string(),
975 spaces,
976 }
977 .into()),
978 SpaceResolution::NotFound => Err(KboltError::CollectionNotFound {
979 name: collection.to_string(),
980 }
981 .into()),
982 };
983 }
984
985 Err(KboltError::NoActiveSpace.into())
986 }
987
988 fn resolve_preferred_space(
989 &self,
990 explicit: Option<&str>,
991 ) -> Result<Option<(crate::storage::SpaceRow, ActiveSpaceSource)>> {
992 if let Some(space_name) = explicit {
993 let space = self.storage.get_space(space_name)?;
994 return Ok(Some((space, ActiveSpaceSource::Flag)));
995 }
996
997 if let Ok(space_name) = std::env::var("KBOLT_SPACE") {
998 let trimmed = space_name.trim();
999 if !trimmed.is_empty() {
1000 let space = self.storage.get_space(trimmed)?;
1001 return Ok(Some((space, ActiveSpaceSource::EnvVar)));
1002 }
1003 }
1004
1005 if let Some(space_name) = self.config.default_space.as_deref() {
1006 let space = self.storage.get_space(space_name)?;
1007 return Ok(Some((space, ActiveSpaceSource::ConfigDefault)));
1008 }
1009
1010 Ok(None)
1011 }
1012}
1013
1014#[cfg(test)]
1015mod tests;