Skip to main content

kbolt_core/
engine.rs

1use std::collections::{HashMap, HashSet};
2use std::path::Path;
3use std::sync::Arc;
4use std::time::Instant;
5
6use crate::config;
7use crate::config::Config;
8use crate::error::CoreError;
9use crate::ingest::chunk::{chunk_document, chunk_document_with_counter, resolve_policy};
10use crate::ingest::extract::default_registry;
11use crate::lock::{LockMode, OperationLock};
12use crate::models;
13use crate::storage::Storage;
14use crate::storage::{
15    ChunkInsert, ChunkRow, CollectionRow, DocumentRow, DocumentTitleSource, SpaceResolution,
16    TantivyEntry,
17};
18use crate::Result;
19use kbolt_types::{
20    ActiveSpace, ActiveSpaceSource, AddCollectionRequest, AddCollectionResult, CollectionInfo,
21    CollectionStatus, DocumentResponse, FileEntry, GetRequest, InitialIndexingBlock,
22    InitialIndexingOutcome, KboltError, Locator, ModelStatus, MultiGetRequest, MultiGetResponse,
23    OmitReason, OmittedFile, SearchMode, SearchPipeline, SearchPipelineNotice, SearchPipelineStep,
24    SearchPipelineUnavailableReason, SearchRequest, SearchResponse, SearchResult, SearchSignals,
25    SpaceInfo, SpaceStatus, StatusResponse, UpdateOptions, UpdateReport,
26};
27mod eval_ops;
28mod file_utils;
29mod ignore_helpers;
30mod ignore_ops;
31mod path_utils;
32mod schedule_ops;
33mod schedule_run_ops;
34mod schedule_status_ops;
35mod scoring;
36mod search_ops;
37mod text_helpers;
38mod update_ops;
39use file_utils::{file_error, file_title, modified_token, sha256_hex};
40use ignore_helpers::{
41    collection_ignore_file_path, count_ignore_patterns, is_hard_ignored_file,
42    load_collection_ignore_matcher, validate_ignore_pattern,
43};
44use path_utils::{
45    collection_relative_path, extension_allowed, normalize_docid, normalize_list_prefix,
46    normalized_extension_filter, path_matches_prefix, short_docid, split_collection_path,
47};
48use scoring::{dense_distance_to_score, max_option};
49pub(crate) use text_helpers::retrieval_text_with_prefix;
50use text_helpers::{chunk_text_from_bytes, search_text_with_neighbors};
51
/// Central handle of the core library.
///
/// Bundles the storage backend, the loaded configuration, and whichever
/// inference clients are available. Every inference client is optional; a
/// `None` means that capability was not built for this engine.
pub struct Engine {
    /// Storage backend (`Engine::new` opens it from `config.cache_dir`).
    storage: Storage,
    /// Loaded configuration; `default_space` is rewritten and re-saved by the
    /// space-management methods.
    config: Config,
    /// Embedding client, if one is available.
    embedder: Option<Arc<dyn models::Embedder>>,
    /// Sizer for embedding documents, if one is available.
    embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
    /// Reranking client, if one is available.
    reranker: Option<Arc<dyn models::Reranker>>,
    /// Query expansion client, if one is available.
    expander: Option<Arc<dyn models::Expander>>,
}
60
/// One row of an ignore listing, identifying a collection by space and name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IgnoreListEntry {
    /// Name of the space that owns the collection.
    pub space: String,
    /// Name of the collection.
    pub collection: String,
    /// Number of ignore patterns — presumably those in the collection's
    /// ignore file; confirm against the code that builds this entry.
    pub pattern_count: usize,
}
67
/// A collection selected for an operation, paired with the name of the space
/// it belongs to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UpdateTarget {
    /// Name of the owning space.
    pub space: String,
    /// Storage row of the targeted collection.
    pub collection: CollectionRow,
}
73
/// Borrowed description of which collections an operation should cover;
/// `search` hands one to `resolve_targets`.
#[derive(Debug, Clone, Copy)]
struct TargetScope<'a> {
    /// Explicitly requested space name, if any.
    space: Option<&'a str>,
    /// Requested collection names.
    // NOTE(review): an empty slice presumably means "all collections in scope" — confirm in `resolve_targets`.
    collections: &'a [String],
}
79
/// Per-collection metadata that `search` indexes by collection id and passes
/// to result assembly.
#[derive(Debug, Clone)]
struct SearchCollectionMeta {
    /// Name of the space that owns the collection.
    space: String,
    /// Name of the collection.
    collection: String,
    /// Path stored on the collection row.
    path: std::path::PathBuf,
}
86
/// A chunk id paired with a BM25 score.
// NOTE(review): presumably produced by the keyword retrieval step; the producer is not visible here — confirm.
#[derive(Debug, Clone)]
struct SearchHitCandidate {
    /// Identifier of the matching chunk.
    chunk_id: i64,
    /// BM25 score recorded for the chunk.
    bm25_score: f32,
}
92
/// A chunk together with the scores attached to it during ranking.
// NOTE(review): field meanings below are read off the names; the ranking code that fills them is not visible here — confirm.
#[derive(Debug, Clone)]
struct RankedChunk {
    /// Identifier of the ranked chunk.
    chunk_id: i64,
    /// Score used for the chunk (presumably the final ordering score).
    score: f32,
    /// Fusion score (presumably the combined retrieval score).
    fusion: f32,
    /// Reranker score, when one was computed.
    reranker: Option<f32>,
    /// BM25 score, when the chunk has one.
    bm25: Option<f32>,
    /// Dense retrieval score, when the chunk has one.
    dense: Option<f32>,
    /// Rank recorded before a later reordering step, when available.
    original_rank: Option<usize>,
}
103
104impl Engine {
105    pub fn new(config_path: Option<&Path>) -> Result<Self> {
106        let config = config::load(config_path)?;
107        let storage = Storage::new(&config.cache_dir)?;
108        let built_models = models::build_inference_clients(&config)?;
109        Ok(Self {
110            storage,
111            config,
112            embedder: built_models.embedder,
113            embedding_document_sizer: built_models.embedding_document_sizer,
114            reranker: built_models.reranker,
115            expander: built_models.expander,
116        })
117    }
118
    /// Test-only constructor: wraps existing `storage` and `config` without
    /// supplying any inference client overrides.
    ///
    /// Delegates to `from_parts_with_models` with every override set to `None`.
    #[cfg(test)]
    pub(crate) fn from_parts(storage: Storage, config: Config) -> Self {
        Self::from_parts_with_models(storage, config, None, None, None)
    }
123
    /// Test-only constructor that lets the caller override just the embedder.
    ///
    /// The sizer, reranker, and expander overrides are passed as `None` to
    /// `from_parts_with_inference`.
    #[cfg(test)]
    pub(crate) fn from_parts_with_embedder(
        storage: Storage,
        config: Config,
        embedder: Option<Arc<dyn models::Embedder>>,
    ) -> Self {
        Self::from_parts_with_inference(storage, config, embedder, None, None, None)
    }
132
    /// Test-only constructor that lets the caller override the embedder and
    /// the embedding document sizer.
    ///
    /// The reranker and expander overrides are passed as `None` to
    /// `from_parts_with_inference`.
    #[cfg(test)]
    pub(crate) fn from_parts_with_embedding_runtime(
        storage: Storage,
        config: Config,
        embedder: Option<Arc<dyn models::Embedder>>,
        embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
    ) -> Self {
        Self::from_parts_with_inference(
            storage,
            config,
            embedder,
            embedding_document_sizer,
            None,
            None,
        )
    }
149
    /// Test-only constructor that lets the caller override the embedder,
    /// reranker, and expander.
    ///
    /// The embedding document sizer override is passed as `None` to
    /// `from_parts_with_inference`.
    #[cfg(test)]
    pub(crate) fn from_parts_with_models(
        storage: Storage,
        config: Config,
        embedder: Option<Arc<dyn models::Embedder>>,
        reranker: Option<Arc<dyn models::Reranker>>,
        expander: Option<Arc<dyn models::Expander>>,
    ) -> Self {
        Self::from_parts_with_inference(storage, config, embedder, None, reranker, expander)
    }
160
161    #[cfg(test)]
162    fn from_parts_with_inference(
163        storage: Storage,
164        config: Config,
165        embedder: Option<Arc<dyn models::Embedder>>,
166        embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
167        reranker: Option<Arc<dyn models::Reranker>>,
168        expander: Option<Arc<dyn models::Expander>>,
169    ) -> Self {
170        let built_models =
171            models::build_inference_clients(&config).expect("build inference models");
172        Self {
173            storage,
174            config,
175            embedder: embedder.or(built_models.embedder),
176            embedding_document_sizer: embedding_document_sizer
177                .or(built_models.embedding_document_sizer),
178            reranker: reranker.or(built_models.reranker),
179            expander: expander.or(built_models.expander),
180        }
181    }
182
183    pub fn add_space(&self, name: &str, description: Option<&str>) -> Result<SpaceInfo> {
184        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
185        self.storage.create_space(name, description)?;
186        let space = self.storage.get_space(name)?;
187        self.build_space_info(&space)
188    }
189
190    pub fn remove_space(&mut self, name: &str) -> Result<()> {
191        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
192        self.storage.delete_space(name)?;
193        if self.config.default_space.as_deref() == Some(name) {
194            self.persist_default_space(None)?;
195        }
196        Ok(())
197    }
198
199    pub fn rename_space(&mut self, old: &str, new: &str) -> Result<()> {
200        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
201        self.storage.rename_space(old, new)?;
202        if self.config.default_space.as_deref() == Some(old) {
203            self.persist_default_space(Some(new.to_string()))?;
204        }
205        Ok(())
206    }
207
208    pub fn describe_space(&self, name: &str, description: &str) -> Result<()> {
209        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
210        self.storage.update_space_description(name, description)
211    }
212
213    pub fn list_spaces(&self) -> Result<Vec<SpaceInfo>> {
214        let spaces = self.storage.list_spaces()?;
215        let mut infos = Vec::with_capacity(spaces.len());
216        for space in spaces {
217            infos.push(self.build_space_info(&space)?);
218        }
219        Ok(infos)
220    }
221
222    pub fn space_info(&self, name: &str) -> Result<SpaceInfo> {
223        let space = self.storage.get_space(name)?;
224        self.build_space_info(&space)
225    }
226
227    pub fn set_default_space(&mut self, name: Option<&str>) -> Result<Option<String>> {
228        if let Some(space_name) = name {
229            self.storage.get_space(space_name)?;
230        }
231
232        self.persist_default_space(name.map(ToString::to_string))?;
233        Ok(self.config.default_space.clone())
234    }
235
236    fn persist_default_space(&mut self, default_space: Option<String>) -> Result<()> {
237        let previous = self.config.default_space.clone();
238        self.config.default_space = default_space;
239        if let Err(err) = config::save(&self.config) {
240            self.config.default_space = previous;
241            return Err(err);
242        }
243        Ok(())
244    }
245
246    pub fn add_collection(&self, req: AddCollectionRequest) -> Result<AddCollectionResult> {
247        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
248        let AddCollectionRequest {
249            path,
250            space: requested_space,
251            name: requested_name,
252            description,
253            extensions,
254            no_index,
255        } = req;
256
257        let space = match requested_space.as_deref() {
258            Some(space_name) => match self.storage.get_space(space_name) {
259                Ok(space) => space,
260                Err(CoreError::Domain(KboltError::SpaceNotFound { .. })) => {
261                    self.storage.create_space(space_name, None)?;
262                    self.storage.get_space(space_name)?
263                }
264                Err(err) => return Err(err),
265            },
266            None => self.resolve_space_row(None, None)?,
267        };
268        if !path.is_absolute() || !path.is_dir() {
269            return Err(KboltError::InvalidPath(path).into());
270        }
271
272        let name = match requested_name {
273            Some(name) => name,
274            None => path
275                .file_name()
276                .and_then(|name| name.to_str())
277                .map(ToString::to_string)
278                .ok_or_else(|| KboltError::InvalidPath(path.clone()))?,
279        };
280
281        self.storage.create_collection(
282            space.id,
283            &name,
284            &path,
285            description.as_deref(),
286            extensions.as_deref(),
287        )?;
288
289        let initial_indexing = if no_index {
290            InitialIndexingOutcome::Skipped
291        } else {
292            match self.update_unlocked(UpdateOptions {
293                space: Some(space.name.clone()),
294                collections: vec![name.clone()],
295                no_embed: false,
296                dry_run: false,
297                verbose: false,
298            }) {
299                Ok(report) => InitialIndexingOutcome::Indexed(report),
300                Err(CoreError::Domain(KboltError::SpaceDenseRepairRequired { space, reason })) => {
301                    InitialIndexingOutcome::Blocked(
302                        InitialIndexingBlock::SpaceDenseRepairRequired { space, reason },
303                    )
304                }
305                Err(CoreError::Domain(KboltError::ModelNotAvailable { name })) => {
306                    InitialIndexingOutcome::Blocked(InitialIndexingBlock::ModelNotAvailable {
307                        name,
308                    })
309                }
310                Err(err) => return Err(err),
311            }
312        };
313
314        let collection = self.storage.get_collection(space.id, &name)?;
315        Ok(AddCollectionResult {
316            collection: self.build_collection_info(&space.name, &collection)?,
317            initial_indexing,
318        })
319    }
320
321    pub fn remove_collection(&self, space: Option<&str>, name: &str) -> Result<()> {
322        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
323        let resolved = self.resolve_space_row(space, Some(name))?;
324        let collection = self.storage.get_collection(resolved.id, name)?;
325        let documents = self.storage.list_documents(collection.id, false)?;
326        let doc_ids = documents.into_iter().map(|doc| doc.id).collect::<Vec<_>>();
327        let chunk_ids = self.collect_document_chunk_ids(&doc_ids)?;
328        self.purge_space_chunks(&resolved.name, &chunk_ids)?;
329        self.storage.delete_collection(resolved.id, name)?;
330
331        let ignore_path =
332            collection_ignore_file_path(&self.config.config_dir, &resolved.name, name);
333        if ignore_path.is_file() {
334            std::fs::remove_file(ignore_path)?;
335        }
336
337        Ok(())
338    }
339
340    pub fn rename_collection(&self, space: Option<&str>, old: &str, new: &str) -> Result<()> {
341        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
342        let resolved = self.resolve_space_row(space, Some(old))?;
343        let old_ignore_path =
344            collection_ignore_file_path(&self.config.config_dir, &resolved.name, old);
345        let new_ignore_path =
346            collection_ignore_file_path(&self.config.config_dir, &resolved.name, new);
347        if old_ignore_path.is_file() && new_ignore_path.exists() {
348            return Err(KboltError::Internal(format!(
349                "cannot rename ignore file: destination already exists: {}",
350                new_ignore_path.display()
351            ))
352            .into());
353        }
354
355        self.storage.rename_collection(resolved.id, old, new)?;
356
357        if old_ignore_path.is_file() {
358            if let Some(parent) = new_ignore_path.parent() {
359                std::fs::create_dir_all(parent)?;
360            }
361            if let Err(rename_err) = std::fs::rename(&old_ignore_path, &new_ignore_path) {
362                match self.storage.rename_collection(resolved.id, new, old) {
363                    Ok(()) => {
364                        return Err(KboltError::Internal(format!(
365                            "renamed collection was rolled back after ignore rename failure: {rename_err}"
366                        ))
367                        .into())
368                    }
369                    Err(rollback_err) => {
370                        return Err(KboltError::Internal(format!(
371                            "ignore rename failed: {rename_err}; rollback failed: {rollback_err}"
372                        ))
373                        .into())
374                    }
375                }
376            }
377        }
378
379        Ok(())
380    }
381
382    pub fn describe_collection(&self, space: Option<&str>, name: &str, desc: &str) -> Result<()> {
383        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
384        let resolved = self.resolve_space_row(space, Some(name))?;
385        self.storage
386            .update_collection_description(resolved.id, name, desc)
387    }
388
389    pub fn list_collections(&self, space: Option<&str>) -> Result<Vec<CollectionInfo>> {
390        let (space_id_filter, spaces_by_id) = if let Some(space_name) = space {
391            let resolved = self.resolve_space_row(Some(space_name), None)?;
392            let mut map = std::collections::HashMap::new();
393            map.insert(resolved.id, resolved.name.clone());
394            (Some(resolved.id), map)
395        } else {
396            let spaces = self.storage.list_spaces()?;
397            let map = spaces
398                .into_iter()
399                .map(|space| (space.id, space.name))
400                .collect::<std::collections::HashMap<_, _>>();
401            (None, map)
402        };
403
404        let collections = self.storage.list_collections(space_id_filter)?;
405        let mut infos = Vec::with_capacity(collections.len());
406        for collection in collections {
407            let space_name = spaces_by_id
408                .get(&collection.space_id)
409                .ok_or_else(|| {
410                    KboltError::Internal(format!(
411                        "missing space mapping for collection '{}'",
412                        collection.name
413                    ))
414                })?
415                .clone();
416            infos.push(self.build_collection_info(&space_name, &collection)?);
417        }
418        Ok(infos)
419    }
420
421    pub fn collection_info(&self, space: Option<&str>, name: &str) -> Result<CollectionInfo> {
422        let resolved = self.resolve_space_row(space, Some(name))?;
423        let collection = self.storage.get_collection(resolved.id, name)?;
424        self.build_collection_info(&resolved.name, &collection)
425    }
426
427    pub fn list_files(
428        &self,
429        space: Option<&str>,
430        collection: &str,
431        prefix: Option<&str>,
432    ) -> Result<Vec<FileEntry>> {
433        let resolved_space = self.resolve_space_row(space, Some(collection))?;
434        let collection_row = self.storage.get_collection(resolved_space.id, collection)?;
435        let normalized_prefix = normalize_list_prefix(prefix)?;
436        let file_rows = self
437            .storage
438            .list_collection_file_rows(collection_row.id, false)?;
439
440        let mut files = Vec::with_capacity(file_rows.len());
441        for file_row in file_rows {
442            if let Some(prefix) = normalized_prefix.as_deref() {
443                if !path_matches_prefix(&file_row.path, prefix) {
444                    continue;
445                }
446            }
447
448            files.push(FileEntry {
449                path: file_row.path,
450                title: file_row.title,
451                docid: short_docid(&file_row.hash),
452                active: file_row.active,
453                chunk_count: file_row.chunk_count,
454                embedded: file_row.chunk_count > 0
455                    && file_row.embedded_chunk_count >= file_row.chunk_count,
456            });
457        }
458
459        Ok(files)
460    }
461
    /// Fetches a single document (or a line window of it) identified by the
    /// request's locator.
    ///
    /// Thin public entry point that forwards to `get_document_unlocked`.
    // NOTE(review): unlike `search`, no operation lock is acquired here even though the helper is named `_unlocked` — confirm this is intentional.
    pub fn get_document(&self, req: GetRequest) -> Result<DocumentResponse> {
        self.get_document_unlocked(req)
    }
465
466    fn get_document_unlocked(&self, req: GetRequest) -> Result<DocumentResponse> {
467        let GetRequest {
468            locator,
469            space,
470            offset,
471            limit,
472        } = req;
473
474        let (document, collection_row, space_name) = match locator {
475            Locator::Path(locator_path) => {
476                let (collection_name, relative_path) = split_collection_path(&locator_path)?;
477                let resolved_space =
478                    self.resolve_space_row(space.as_deref(), Some(&collection_name))?;
479                let collection = self
480                    .storage
481                    .get_collection(resolved_space.id, &collection_name)?;
482                let document = self
483                    .storage
484                    .get_document_by_path(collection.id, &relative_path)?
485                    .ok_or_else(|| KboltError::DocumentNotFound {
486                        path: locator_path.clone(),
487                    })?;
488                (document, collection, resolved_space.name)
489            }
490            Locator::DocId(docid) => {
491                let prefix = normalize_docid(&docid)?;
492                let mut candidates = self
493                    .storage
494                    .get_document_by_hash_prefix(&prefix)?
495                    .into_iter()
496                    .map(|document| {
497                        let collection =
498                            self.storage.get_collection_by_id(document.collection_id)?;
499                        Ok((document, collection))
500                    })
501                    .collect::<Result<Vec<_>>>()?;
502
503                if let Some(space_name) = space.as_deref() {
504                    let resolved_space = self.resolve_space_row(Some(space_name), None)?;
505                    candidates.retain(|(_, collection)| collection.space_id == resolved_space.id);
506                }
507
508                if candidates.is_empty() {
509                    return Err(KboltError::DocumentNotFound {
510                        path: format!("#{prefix}"),
511                    }
512                    .into());
513                }
514
515                if candidates.len() > 1 {
516                    return Err(KboltError::InvalidInput(
517                        "docid is ambiguous; provide more characters".to_string(),
518                    )
519                    .into());
520                }
521
522                let (document, collection) = candidates.pop().expect("candidate exists");
523                let space = self.storage.get_space_by_id(collection.space_id)?;
524                (document, collection, space.name)
525            }
526        };
527
528        let full_path = collection_row.path.join(&document.path);
529        let bytes = match std::fs::read(&full_path) {
530            Ok(bytes) => bytes,
531            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
532                return Err(KboltError::FileDeleted(full_path).into())
533            }
534            Err(err) => return Err(err.into()),
535        };
536
537        let raw_content = String::from_utf8_lossy(&bytes).into_owned();
538        let stale = sha256_hex(&bytes) != document.hash;
539
540        let lines = raw_content.lines().collect::<Vec<_>>();
541        let total_lines = lines.len();
542        let start = offset.unwrap_or(0).min(total_lines);
543        let requested = limit.unwrap_or(total_lines.saturating_sub(start));
544        let end = start.saturating_add(requested).min(total_lines);
545        let returned_lines = end.saturating_sub(start);
546        let content = if returned_lines == 0 {
547            String::new()
548        } else {
549            lines[start..end].join("\n")
550        };
551
552        Ok(DocumentResponse {
553            docid: short_docid(&document.hash),
554            path: format!("{}/{}", collection_row.name, document.path),
555            title: document.title,
556            space: space_name,
557            collection: collection_row.name,
558            content,
559            stale,
560            total_lines,
561            returned_lines,
562        })
563    }
564
565    pub fn multi_get(&self, req: MultiGetRequest) -> Result<MultiGetResponse> {
566        if req.max_files == 0 {
567            return Err(
568                KboltError::InvalidInput("max_files must be greater than 0".to_string()).into(),
569            );
570        }
571        if req.max_bytes == 0 {
572            return Err(
573                KboltError::InvalidInput("max_bytes must be greater than 0".to_string()).into(),
574            );
575        }
576
577        let mut documents = Vec::new();
578        let mut omitted = Vec::new();
579        let mut resolved_count = 0usize;
580        let mut warnings = Vec::new();
581        let mut consumed_bytes = 0usize;
582
583        for locator in req.locators {
584            let document = match self.get_document_unlocked(GetRequest {
585                locator,
586                space: req.space.clone(),
587                offset: None,
588                limit: None,
589            }) {
590                Ok(document) => document,
591                Err(err) => match KboltError::from(err) {
592                    KboltError::FileDeleted(path) => {
593                        warnings.push(format!(
594                            "file deleted since indexing: {}. run `kbolt update` to refresh.",
595                            path.display()
596                        ));
597                        continue;
598                    }
599                    KboltError::DocumentNotFound { path } => {
600                        warnings.push(format!("document not found: {path}"));
601                        continue;
602                    }
603                    KboltError::InvalidInput(message) => {
604                        warnings.push(format!("invalid locator: {message}"));
605                        continue;
606                    }
607                    KboltError::InvalidPath(path) => {
608                        warnings.push(format!("invalid locator path: {}", path.display()));
609                        continue;
610                    }
611                    KboltError::AmbiguousSpace { collection, spaces } => {
612                        warnings.push(format!(
613                            "ambiguous locator space for collection '{collection}': {:?}. use --space to disambiguate.",
614                            spaces
615                        ));
616                        continue;
617                    }
618                    KboltError::SpaceNotFound { name } => {
619                        warnings.push(format!("space not found for locator: {name}"));
620                        continue;
621                    }
622                    KboltError::CollectionNotFound { name } => {
623                        warnings.push(format!("collection not found for locator: {name}"));
624                        continue;
625                    }
626                    other => return Err(other.into()),
627                },
628            };
629            resolved_count = resolved_count.saturating_add(1);
630
631            let size_bytes = document.content.len();
632            if documents.len() >= req.max_files {
633                omitted.push(OmittedFile {
634                    path: document.path,
635                    docid: document.docid,
636                    size_bytes,
637                    reason: OmitReason::MaxFiles,
638                });
639                continue;
640            }
641
642            if consumed_bytes.saturating_add(size_bytes) > req.max_bytes {
643                omitted.push(OmittedFile {
644                    path: document.path,
645                    docid: document.docid,
646                    size_bytes,
647                    reason: OmitReason::MaxBytes,
648                });
649                continue;
650            }
651
652            consumed_bytes = consumed_bytes.saturating_add(size_bytes);
653            documents.push(document);
654        }
655
656        Ok(MultiGetResponse {
657            resolved_count,
658            documents,
659            omitted,
660            warnings,
661        })
662    }
663
664    pub fn search(&self, req: SearchRequest) -> Result<SearchResponse> {
665        let _lock = self.acquire_operation_lock(LockMode::Shared)?;
666        let started = Instant::now();
667        let query = req.query.trim();
668        if query.is_empty() {
669            return Err(KboltError::InvalidInput("query cannot be empty".to_string()).into());
670        }
671
672        let requested_mode = req.mode.clone();
673        let rerank_enabled =
674            matches!(requested_mode, SearchMode::Auto | SearchMode::Deep) && !req.no_rerank;
675        let mut pipeline = self.initial_search_pipeline(&requested_mode);
676
677        let targets = self.resolve_targets(TargetScope {
678            space: req.space.as_deref(),
679            collections: &req.collections,
680        })?;
681
682        let staleness_hint = targets
683            .iter()
684            .map(|target| target.collection.updated.clone())
685            .max()
686            .map(|updated| format!("Index last updated: {updated}"));
687
688        if req.limit == 0 || targets.is_empty() {
689            return Ok(SearchResponse {
690                results: Vec::new(),
691                query: req.query,
692                requested_mode: requested_mode.clone(),
693                effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
694                pipeline,
695                staleness_hint,
696                elapsed_ms: started.elapsed().as_millis() as u64,
697            });
698        }
699
700        let mut collections_by_id: HashMap<i64, SearchCollectionMeta> = HashMap::new();
701        for target in &targets {
702            collections_by_id.insert(
703                target.collection.id,
704                SearchCollectionMeta {
705                    space: target.space.clone(),
706                    collection: target.collection.name.clone(),
707                    path: target.collection.path.clone(),
708                },
709            );
710        }
711
712        let max_candidates = self.max_search_candidates(&targets)?;
713        let mut retrieval_limit =
714            self.initial_search_candidate_limit(&requested_mode, req.limit, rerank_enabled);
715        let results = loop {
716            let ranked_chunks = self.rank_chunks_for_mode(
717                &requested_mode,
718                &targets,
719                query,
720                retrieval_limit,
721                req.min_score,
722                &mut pipeline,
723            )?;
724
725            if ranked_chunks.is_empty() {
726                return Ok(SearchResponse {
727                    results: Vec::new(),
728                    query: req.query,
729                    requested_mode: requested_mode.clone(),
730                    effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
731                    pipeline,
732                    staleness_hint,
733                    elapsed_ms: started.elapsed().as_millis() as u64,
734                });
735            }
736
737            let ranked_len = ranked_chunks.len();
738            let results = self.assemble_search_results(
739                query,
740                &requested_mode,
741                ranked_chunks,
742                &collections_by_id,
743                req.debug,
744                rerank_enabled,
745                &mut pipeline,
746                req.limit,
747            )?;
748
749            if results.len() >= req.limit
750                || ranked_len < retrieval_limit
751                || retrieval_limit >= max_candidates
752            {
753                break results;
754            }
755
756            let next_limit = retrieval_limit.saturating_mul(2).min(max_candidates);
757            if next_limit <= retrieval_limit {
758                break results;
759            }
760            retrieval_limit = next_limit;
761        };
762
763        Ok(SearchResponse {
764            results,
765            query: req.query,
766            requested_mode: requested_mode.clone(),
767            effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
768            pipeline,
769            staleness_hint,
770            elapsed_ms: started.elapsed().as_millis() as u64,
771        })
772    }
773
774    pub fn resolve_space(&self, explicit: Option<&str>) -> Result<String> {
775        let resolved = self.resolve_space_row(explicit, None)?;
776        Ok(resolved.name)
777    }
778
779    pub fn current_space(&self, explicit: Option<&str>) -> Result<Option<ActiveSpace>> {
780        let resolved = self.resolve_preferred_space(explicit)?;
781        Ok(resolved.map(|(space, source)| ActiveSpace {
782            name: space.name,
783            source,
784        }))
785    }
786
787    pub fn status(&self, space: Option<&str>) -> Result<StatusResponse> {
788        let _lock = self.acquire_operation_lock(LockMode::Shared)?;
789        let (spaces, totals_scope) = if let Some(space_name) = space {
790            let resolved = self.resolve_space_row(Some(space_name), None)?;
791            (vec![resolved.clone()], Some(resolved.id))
792        } else {
793            (self.storage.list_spaces()?, None)
794        };
795
796        let mut space_statuses = Vec::with_capacity(spaces.len());
797        for space_row in spaces {
798            let collections = self.storage.list_collections(Some(space_row.id))?;
799            let mut collection_statuses = Vec::with_capacity(collections.len());
800            let mut last_updated: Option<String> = None;
801
802            for collection in collections {
803                if last_updated
804                    .as_ref()
805                    .map(|existing| collection.updated > *existing)
806                    .unwrap_or(true)
807                {
808                    last_updated = Some(collection.updated.clone());
809                }
810
811                let documents = self
812                    .storage
813                    .count_documents_in_collection(collection.id, false)?;
814                let active_documents = self
815                    .storage
816                    .count_documents_in_collection(collection.id, true)?;
817                let chunks = self.storage.count_chunks_in_collection(collection.id)?;
818                let embedded_chunks = self
819                    .storage
820                    .count_embedded_chunks_in_collection(collection.id)?;
821
822                collection_statuses.push(CollectionStatus {
823                    name: collection.name,
824                    path: collection.path,
825                    documents,
826                    active_documents,
827                    chunks,
828                    embedded_chunks,
829                    last_updated: collection.updated,
830                });
831            }
832
833            space_statuses.push(SpaceStatus {
834                name: space_row.name,
835                description: space_row.description,
836                collections: collection_statuses,
837                last_updated,
838            });
839        }
840
841        let models = self.model_status_unlocked()?;
842
843        Ok(StatusResponse {
844            spaces: space_statuses,
845            models,
846            cache_dir: self.config.cache_dir.clone(),
847            config_dir: self.config.config_dir.clone(),
848            total_documents: self.storage.count_documents(totals_scope)?,
849            total_chunks: self.storage.count_chunks(totals_scope)?,
850            total_embedded: self.storage.count_embedded_chunks(totals_scope)?,
851            disk_usage: self.storage.disk_usage()?,
852        })
853    }
854
855    pub fn model_status(&self) -> Result<ModelStatus> {
856        self.model_status_unlocked()
857    }
858
859    fn model_status_unlocked(&self) -> Result<ModelStatus> {
860        models::status(&self.config)
861    }
862
863    pub fn config(&self) -> &Config {
864        &self.config
865    }
866
867    pub fn storage(&self) -> &Storage {
868        &self.storage
869    }
870
871    fn embedding_model_key(&self) -> &str {
872        self.config
873            .roles
874            .embedder
875            .as_ref()
876            .and_then(|role| self.config.providers.get(&role.provider))
877            .map(|profile| profile.model())
878            .unwrap_or("embedder")
879    }
880
881    fn acquire_operation_lock(&self, mode: LockMode) -> Result<OperationLock> {
882        OperationLock::acquire(&self.config.cache_dir, mode)
883    }
884
885    fn collect_document_chunk_ids(&self, doc_ids: &[i64]) -> Result<Vec<i64>> {
886        let mut chunk_ids = Vec::new();
887        for doc_id in doc_ids {
888            let chunks = self.storage.get_chunks_for_document(*doc_id)?;
889            chunk_ids.extend(chunks.into_iter().map(|chunk| chunk.id));
890        }
891        Ok(chunk_ids)
892    }
893
894    fn purge_space_chunks(&self, space: &str, chunk_ids: &[i64]) -> Result<()> {
895        if chunk_ids.is_empty() {
896            return Ok(());
897        }
898
899        self.storage.delete_tantivy(space, chunk_ids)?;
900        self.storage.commit_tantivy(space)?;
901        self.storage.delete_usearch(space, chunk_ids)?;
902        Ok(())
903    }
904
905    fn build_space_info(&self, space: &crate::storage::SpaceRow) -> Result<SpaceInfo> {
906        let collection_count = self.storage.list_collections(Some(space.id))?.len();
907        let document_count = self.storage.count_documents(Some(space.id))?;
908        let chunk_count = self.storage.count_chunks(Some(space.id))?;
909
910        Ok(SpaceInfo {
911            name: space.name.clone(),
912            description: space.description.clone(),
913            collection_count,
914            document_count,
915            chunk_count,
916            created: space.created.clone(),
917        })
918    }
919
920    fn build_collection_info(
921        &self,
922        space_name: &str,
923        collection: &CollectionRow,
924    ) -> Result<CollectionInfo> {
925        let document_count = self
926            .storage
927            .count_documents_in_collection(collection.id, false)?;
928        let active_document_count = self
929            .storage
930            .count_documents_in_collection(collection.id, true)?;
931        let chunk_count = self.storage.count_chunks_in_collection(collection.id)?;
932        let embedded_chunk_count = self
933            .storage
934            .count_embedded_chunks_in_collection(collection.id)?;
935
936        Ok(CollectionInfo {
937            name: collection.name.clone(),
938            space: space_name.to_string(),
939            path: collection.path.clone(),
940            description: collection.description.clone(),
941            extensions: collection.extensions.clone(),
942            document_count,
943            active_document_count,
944            chunk_count,
945            embedded_chunk_count,
946            created: collection.created.clone(),
947            updated: collection.updated.clone(),
948        })
949    }
950
951    fn resolve_space_row(
952        &self,
953        explicit: Option<&str>,
954        collection_for_lookup: Option<&str>,
955    ) -> Result<crate::storage::SpaceRow> {
956        if let Some((space, _source)) = self.resolve_preferred_space(explicit)? {
957            return Ok(space);
958        }
959
960        if let Some(collection) = collection_for_lookup {
961            return match self.storage.find_space_for_collection(collection)? {
962                SpaceResolution::Found(space) => Ok(space),
963                SpaceResolution::Ambiguous(spaces) => Err(KboltError::AmbiguousSpace {
964                    collection: collection.to_string(),
965                    spaces,
966                }
967                .into()),
968                SpaceResolution::NotFound => Err(KboltError::CollectionNotFound {
969                    name: collection.to_string(),
970                }
971                .into()),
972            };
973        }
974
975        Err(KboltError::NoActiveSpace.into())
976    }
977
978    fn resolve_preferred_space(
979        &self,
980        explicit: Option<&str>,
981    ) -> Result<Option<(crate::storage::SpaceRow, ActiveSpaceSource)>> {
982        if let Some(space_name) = explicit {
983            let space = self.storage.get_space(space_name)?;
984            return Ok(Some((space, ActiveSpaceSource::Flag)));
985        }
986
987        if let Ok(space_name) = std::env::var("KBOLT_SPACE") {
988            let trimmed = space_name.trim();
989            if !trimmed.is_empty() {
990                let space = self.storage.get_space(trimmed)?;
991                return Ok(Some((space, ActiveSpaceSource::EnvVar)));
992            }
993        }
994
995        if let Some(space_name) = self.config.default_space.as_deref() {
996            let space = self.storage.get_space(space_name)?;
997            return Ok(Some((space, ActiveSpaceSource::ConfigDefault)));
998        }
999
1000        Ok(None)
1001    }
1002}
1003
1004#[cfg(test)]
1005mod tests;
1006
#[cfg(test)]
mod lock_relief_tests {
    //! Checks that read-only engine calls keep working while another file
    //! handle holds an exclusive lock on `kbolt.lock` in the cache directory.

    use fs2::FileExt;
    use std::collections::HashMap;
    use std::fs::OpenOptions;
    use std::mem;

    use tempfile::tempdir;

    use super::Engine;
    use crate::config::{ChunkingConfig, Config, RankingConfig, ReapingConfig};
    use crate::storage::Storage;
    use kbolt_types::{AddCollectionRequest, GetRequest, Locator, MultiGetRequest, UpdateOptions};

    /// Space, collection, and model metadata reads must not fail while the
    /// lock file is exclusively held elsewhere.
    #[test]
    fn metadata_reads_succeed_while_global_lock_is_held() {
        let engine = test_engine_with_indexed_collection();
        let _lock_guard = hold_global_lock(&engine);

        let spaces = engine.list_spaces().expect("list spaces");
        let space_names: Vec<&str> = spaces.iter().map(|space| space.name.as_str()).collect();
        assert_eq!(space_names, vec!["default", "work"]);

        let work = engine.space_info("work").expect("load space info");
        assert_eq!(work.name, "work");
        assert_eq!(work.collection_count, 1);

        let listed = engine
            .list_collections(Some("work"))
            .expect("list collections");
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].name, "api");

        let api = engine
            .collection_info(Some("work"), "api")
            .expect("load collection info");
        assert_eq!(api.name, "api");
        assert_eq!(api.document_count, 1);

        // The test config binds no providers or roles, so nothing is configured.
        let model_state = engine.model_status().expect("read model status");
        assert!(!model_state.embedder.configured);
        assert!(!model_state.reranker.configured);
        assert!(!model_state.expander.configured);
    }

    /// File listing and single/multi document reads must not fail while the
    /// lock file is exclusively held elsewhere.
    #[test]
    fn document_reads_succeed_while_global_lock_is_held() {
        let engine = test_engine_with_indexed_collection();
        let _lock_guard = hold_global_lock(&engine);

        let listing = engine
            .list_files(Some("work"), "api", None)
            .expect("list files");
        assert_eq!(listing.len(), 1);
        assert_eq!(listing[0].path, "guide.md");

        let single_request = GetRequest {
            locator: Locator::Path("api/guide.md".to_string()),
            space: Some("work".to_string()),
            offset: None,
            limit: None,
        };
        let fetched = engine.get_document(single_request).expect("read document");
        assert_eq!(fetched.path, "api/guide.md");
        assert!(fetched.content.contains("hello from kbolt"));

        let batch_request = MultiGetRequest {
            locators: vec![Locator::Path("api/guide.md".to_string())],
            space: Some("work".to_string()),
            max_files: 4,
            max_bytes: 4_096,
        };
        let batch = engine
            .multi_get(batch_request)
            .expect("read multiple documents");
        assert_eq!(batch.resolved_count, 1);
        assert_eq!(batch.documents.len(), 1);
        assert!(batch.warnings.is_empty());
    }

    /// Builds an engine over a fresh temp directory containing one markdown
    /// file, registered as collection `api` in space `work` and then indexed
    /// with embedding disabled.
    fn test_engine_with_indexed_collection() -> Engine {
        let temp_root = tempdir().expect("create temp root");
        let root_path = temp_root.path().to_path_buf();
        // Forgetting the guard skips its destructor, so the directory stays on
        // disk for as long as the returned engine needs it.
        mem::forget(temp_root);

        let docs_dir = root_path.join("docs");
        std::fs::create_dir_all(&docs_dir).expect("create docs dir");
        std::fs::write(docs_dir.join("guide.md"), "# Hello\n\nhello from kbolt\n")
            .expect("write document");

        let cache_dir = root_path.join("cache");
        let storage = Storage::new(&cache_dir).expect("create storage");
        let config = Config {
            config_dir: root_path.join("config"),
            cache_dir,
            default_space: None,
            providers: HashMap::new(),
            roles: crate::config::RoleBindingsConfig::default(),
            reaping: ReapingConfig { days: 7 },
            chunking: ChunkingConfig::default(),
            ranking: RankingConfig::default(),
        };
        let engine = Engine::from_parts(storage, config);

        // Register with `no_index: true`, then index through an explicit update.
        let add_request = AddCollectionRequest {
            path: docs_dir,
            space: Some("work".to_string()),
            name: Some("api".to_string()),
            description: None,
            extensions: None,
            no_index: true,
        };
        engine.add_collection(add_request).expect("add collection");

        let update_options = UpdateOptions {
            space: Some("work".to_string()),
            collections: vec!["api".to_string()],
            no_embed: true,
            dry_run: false,
            verbose: false,
        };
        engine.update(update_options).expect("index collection");

        engine
    }

    /// Opens (creating if needed) `kbolt.lock` in the engine's cache directory
    /// and takes an exclusive lock on it. The returned handle must be kept
    /// alive for as long as the lock should be held.
    fn hold_global_lock(engine: &Engine) -> std::fs::File {
        let cache_dir = &engine.config().cache_dir;
        std::fs::create_dir_all(cache_dir).expect("create cache dir");

        let mut options = OpenOptions::new();
        options.read(true).write(true).create(true).truncate(false);
        let lock_file = options
            .open(cache_dir.join("kbolt.lock"))
            .expect("open lock file");

        FileExt::try_lock_exclusive(&lock_file).expect("acquire global lock");
        lock_file
    }
}