Skip to main content

kbolt_core/
engine.rs

1use std::collections::{HashMap, HashSet};
2use std::path::Path;
3use std::sync::Arc;
4use std::time::Instant;
5
6use crate::config;
7use crate::config::Config;
8use crate::error::CoreError;
9use crate::ingest::chunk::{chunk_document, chunk_document_with_counter, resolve_policy};
10use crate::ingest::extract::default_registry;
11use crate::lock::{LockMode, OperationLock};
12use crate::models;
13use crate::storage::Storage;
14use crate::storage::{
15    ChunkInsert, ChunkRow, CollectionRow, DocumentRow, DocumentTitleSource, SpaceResolution,
16    TantivyEntry,
17};
18use crate::Result;
19use kbolt_types::{
20    ActiveSpace, ActiveSpaceSource, AddCollectionRequest, AddCollectionResult, CollectionInfo,
21    CollectionStatus, DocumentResponse, FileEntry, GetRequest, InitialIndexingBlock,
22    InitialIndexingOutcome, KboltError, Locator, ModelStatus, MultiGetRequest, MultiGetResponse,
23    OmitReason, OmittedFile, SearchMode, SearchPipeline, SearchPipelineNotice, SearchPipelineStep,
24    SearchPipelineUnavailableReason, SearchRequest, SearchResponse, SearchResult, SearchSignals,
25    SpaceInfo, SpaceStatus, StatusResponse, UpdateOptions, UpdateReport,
26};
27mod eval_ops;
28mod file_utils;
29pub(crate) mod ignore_helpers;
30mod ignore_ops;
31mod path_utils;
32mod schedule_ops;
33mod schedule_run_ops;
34mod schedule_status_ops;
35mod scoring;
36mod search_ops;
37mod text_helpers;
38mod update_ops;
39use file_utils::{file_error, file_title, modified_token, sha256_hex};
40use ignore_helpers::{
41    collection_ignore_file_path, count_ignore_patterns, is_hard_ignored_file,
42    load_collection_ignore_matcher, validate_ignore_pattern,
43};
44use path_utils::{
45    collection_relative_path, extension_allowed, normalize_docid, normalize_list_prefix,
46    normalized_extension_filter, path_matches_prefix, short_docid, split_collection_path,
47};
48use scoring::{dense_distance_to_score, max_option};
49pub(crate) use text_helpers::retrieval_text_with_prefix;
50use text_helpers::{chunk_text_from_bytes, search_text_with_neighbors};
51
/// Core facade over kbolt's persistent storage, loaded configuration, and optional
/// inference clients.
///
/// Space/collection management, document retrieval, and search are exposed as methods
/// on this type. Most mutating operations take an exclusive operation lock, while
/// `search` and `status` take a shared one.
pub struct Engine {
    /// Storage backend used for spaces, collections, documents, and chunks.
    storage: Storage,
    /// Loaded configuration; `default_space` is updated and saved via
    /// `persist_default_space`.
    config: Config,
    /// Optional embedding client. In production all four clients come from
    /// `models::build_inference_clients_with_recovery_notice`; `None` presumably means
    /// the corresponding model is not configured — confirm against `models`.
    embedder: Option<Arc<dyn models::Embedder>>,
    /// Optional document sizer paired with the embedder (see `models`).
    embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
    /// Optional reranking client.
    reranker: Option<Arc<dyn models::Reranker>>,
    /// Optional query-expansion client.
    expander: Option<Arc<dyn models::Expander>>,
}
60
/// One row of an ignore-rule listing: a collection and how many ignore patterns it has.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IgnoreListEntry {
    /// Name of the space that owns the collection.
    pub space: String,
    /// Collection name within `space`.
    pub collection: String,
    /// Number of patterns in the collection's ignore file (presumably computed with
    /// `count_ignore_patterns` — confirm in `ignore_ops`).
    pub pattern_count: usize,
}
67
/// A collection selected by target resolution, paired with the name of its space.
///
/// `search` reads `collection.id`, `collection.name`, `collection.path`, and
/// `collection.updated` from these targets.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UpdateTarget {
    /// Name of the space that owns `collection`.
    pub space: String,
    /// Storage row of the targeted collection.
    pub collection: CollectionRow,
}
73
/// Borrowed filter passed to `resolve_targets` to choose which collections to operate on.
#[derive(Debug, Clone, Copy)]
struct TargetScope<'a> {
    /// Optional explicit space name.
    space: Option<&'a str>,
    /// Requested collection names; an empty slice presumably means "all collections
    /// in scope" — confirm in `resolve_targets`.
    collections: &'a [String],
}
79
/// Per-collection metadata that `search` keys by collection id and hands to result
/// assembly.
#[derive(Debug, Clone)]
struct SearchCollectionMeta {
    /// Name of the owning space.
    space: String,
    /// Collection name.
    collection: String,
    /// Collection root directory on disk.
    path: std::path::PathBuf,
}
86
/// A lexical search hit: a chunk id with its BM25 score.
// NOTE(review): presumably produced by the keyword (Tantivy) retrieval path — confirm
// in `search_ops`.
#[derive(Debug, Clone)]
struct SearchHitCandidate {
    /// Storage id of the matched chunk.
    chunk_id: i64,
    /// BM25 relevance score for this chunk.
    bm25_score: f32,
}
92
/// A chunk after ranking, carrying its final score and the per-signal scores behind it.
///
/// NOTE(review): field meanings below are inferred from names; confirm in `search_ops`
/// and `scoring`.
#[derive(Debug, Clone)]
struct RankedChunk {
    /// Storage id of the ranked chunk.
    chunk_id: i64,
    /// Final score used for ordering.
    score: f32,
    /// Fused score before any reranking (presumably).
    fusion: f32,
    /// Reranker score, when a reranker ran.
    reranker: Option<f32>,
    /// Lexical (BM25) score, when the chunk came from keyword retrieval.
    bm25: Option<f32>,
    /// Dense (vector) score, when the chunk came from dense retrieval.
    dense: Option<f32>,
    /// Position before reranking, if recorded.
    original_rank: Option<usize>,
}
103
104impl Engine {
105    pub fn new(config_path: Option<&Path>) -> Result<Self> {
106        Self::new_with_recovery_notice(config_path, None)
107    }
108
109    pub fn new_with_recovery_notice(
110        config_path: Option<&Path>,
111        recovery_notice: Option<crate::RecoveryNoticeSink>,
112    ) -> Result<Self> {
113        let config = config::load(config_path)?;
114        let storage = Storage::new(&config.cache_dir)?;
115        let built_models =
116            models::build_inference_clients_with_recovery_notice(&config, recovery_notice)?;
117        Ok(Self {
118            storage,
119            config,
120            embedder: built_models.embedder,
121            embedding_document_sizer: built_models.embedding_document_sizer,
122            reranker: built_models.reranker,
123            expander: built_models.expander,
124        })
125    }
126
127    #[cfg(test)]
128    pub(crate) fn from_parts(storage: Storage, config: Config) -> Self {
129        Self::from_parts_with_models(storage, config, None, None, None)
130    }
131
132    #[cfg(test)]
133    pub(crate) fn from_parts_with_embedder(
134        storage: Storage,
135        config: Config,
136        embedder: Option<Arc<dyn models::Embedder>>,
137    ) -> Self {
138        Self::from_parts_with_inference(storage, config, embedder, None, None, None)
139    }
140
141    #[cfg(test)]
142    pub(crate) fn from_parts_with_embedding_runtime(
143        storage: Storage,
144        config: Config,
145        embedder: Option<Arc<dyn models::Embedder>>,
146        embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
147    ) -> Self {
148        Self::from_parts_with_inference(
149            storage,
150            config,
151            embedder,
152            embedding_document_sizer,
153            None,
154            None,
155        )
156    }
157
158    #[cfg(test)]
159    pub(crate) fn from_parts_with_models(
160        storage: Storage,
161        config: Config,
162        embedder: Option<Arc<dyn models::Embedder>>,
163        reranker: Option<Arc<dyn models::Reranker>>,
164        expander: Option<Arc<dyn models::Expander>>,
165    ) -> Self {
166        Self::from_parts_with_inference(storage, config, embedder, None, reranker, expander)
167    }
168
169    #[cfg(test)]
170    fn from_parts_with_inference(
171        storage: Storage,
172        config: Config,
173        embedder: Option<Arc<dyn models::Embedder>>,
174        embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
175        reranker: Option<Arc<dyn models::Reranker>>,
176        expander: Option<Arc<dyn models::Expander>>,
177    ) -> Self {
178        let built_models =
179            models::build_inference_clients(&config).expect("build inference models");
180        Self {
181            storage,
182            config,
183            embedder: embedder.or(built_models.embedder),
184            embedding_document_sizer: embedding_document_sizer
185                .or(built_models.embedding_document_sizer),
186            reranker: reranker.or(built_models.reranker),
187            expander: expander.or(built_models.expander),
188        }
189    }
190
191    pub fn add_space(&self, name: &str, description: Option<&str>) -> Result<SpaceInfo> {
192        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
193        self.storage.create_space(name, description)?;
194        let space = self.storage.get_space(name)?;
195        self.build_space_info(&space)
196    }
197
198    pub fn remove_space(&mut self, name: &str) -> Result<()> {
199        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
200        self.storage.delete_space(name)?;
201        if self.config.default_space.as_deref() == Some(name) {
202            self.persist_default_space(None)?;
203        }
204        Ok(())
205    }
206
207    pub fn rename_space(&mut self, old: &str, new: &str) -> Result<()> {
208        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
209        self.storage.rename_space(old, new)?;
210        if self.config.default_space.as_deref() == Some(old) {
211            self.persist_default_space(Some(new.to_string()))?;
212        }
213        Ok(())
214    }
215
216    pub fn describe_space(&self, name: &str, description: &str) -> Result<()> {
217        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
218        self.storage.update_space_description(name, description)
219    }
220
221    pub fn list_spaces(&self) -> Result<Vec<SpaceInfo>> {
222        let spaces = self.storage.list_spaces()?;
223        let mut infos = Vec::with_capacity(spaces.len());
224        for space in spaces {
225            infos.push(self.build_space_info(&space)?);
226        }
227        Ok(infos)
228    }
229
230    pub fn space_info(&self, name: &str) -> Result<SpaceInfo> {
231        let space = self.storage.get_space(name)?;
232        self.build_space_info(&space)
233    }
234
235    pub fn set_default_space(&mut self, name: Option<&str>) -> Result<Option<String>> {
236        if let Some(space_name) = name {
237            self.storage.get_space(space_name)?;
238        }
239
240        self.persist_default_space(name.map(ToString::to_string))?;
241        Ok(self.config.default_space.clone())
242    }
243
244    fn persist_default_space(&mut self, default_space: Option<String>) -> Result<()> {
245        let previous = self.config.default_space.clone();
246        self.config.default_space = default_space;
247        if let Err(err) = config::save(&self.config) {
248            self.config.default_space = previous;
249            return Err(err);
250        }
251        Ok(())
252    }
253
254    pub fn add_collection(&self, req: AddCollectionRequest) -> Result<AddCollectionResult> {
255        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
256        let AddCollectionRequest {
257            path,
258            space: requested_space,
259            name: requested_name,
260            description,
261            extensions,
262            no_index,
263        } = req;
264
265        let space = match requested_space.as_deref() {
266            Some(space_name) => match self.storage.get_space(space_name) {
267                Ok(space) => space,
268                Err(CoreError::Domain(KboltError::SpaceNotFound { .. })) => {
269                    self.storage.create_space(space_name, None)?;
270                    self.storage.get_space(space_name)?
271                }
272                Err(err) => return Err(err),
273            },
274            None => self.resolve_space_row(None, None)?,
275        };
276        if !path.is_absolute() || !path.is_dir() {
277            return Err(KboltError::InvalidPath(path).into());
278        }
279
280        let name = match requested_name {
281            Some(name) => name,
282            None => path
283                .file_name()
284                .and_then(|name| name.to_str())
285                .map(ToString::to_string)
286                .ok_or_else(|| KboltError::InvalidPath(path.clone()))?,
287        };
288
289        self.storage.create_collection(
290            space.id,
291            &name,
292            &path,
293            description.as_deref(),
294            extensions.as_deref(),
295        )?;
296
297        let initial_indexing = if no_index {
298            InitialIndexingOutcome::Skipped
299        } else {
300            match self.update_unlocked(UpdateOptions {
301                space: Some(space.name.clone()),
302                collections: vec![name.clone()],
303                no_embed: false,
304                dry_run: false,
305                verbose: false,
306            }) {
307                Ok(report) => InitialIndexingOutcome::Indexed(report),
308                Err(CoreError::Domain(KboltError::SpaceDenseRepairRequired { space, reason })) => {
309                    InitialIndexingOutcome::Blocked(
310                        InitialIndexingBlock::SpaceDenseRepairRequired { space, reason },
311                    )
312                }
313                Err(CoreError::Domain(KboltError::ModelNotAvailable { name })) => {
314                    InitialIndexingOutcome::Blocked(InitialIndexingBlock::ModelNotAvailable {
315                        name,
316                    })
317                }
318                Err(err) => return Err(err),
319            }
320        };
321
322        let collection = self.storage.get_collection(space.id, &name)?;
323        Ok(AddCollectionResult {
324            collection: self.build_collection_info(&space.name, &collection)?,
325            initial_indexing,
326        })
327    }
328
329    pub fn remove_collection(&self, space: Option<&str>, name: &str) -> Result<()> {
330        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
331        let resolved = self.resolve_space_row(space, Some(name))?;
332        let collection = self.storage.get_collection(resolved.id, name)?;
333        let documents = self.storage.list_documents(collection.id, false)?;
334        let doc_ids = documents.into_iter().map(|doc| doc.id).collect::<Vec<_>>();
335        let chunk_ids = self.collect_document_chunk_ids(&doc_ids)?;
336        self.purge_space_chunks(&resolved.name, &chunk_ids)?;
337        self.storage.delete_collection(resolved.id, name)?;
338
339        let ignore_path =
340            collection_ignore_file_path(&self.config.config_dir, &resolved.name, name);
341        if ignore_path.is_file() {
342            std::fs::remove_file(ignore_path)?;
343        }
344
345        Ok(())
346    }
347
348    pub fn rename_collection(&self, space: Option<&str>, old: &str, new: &str) -> Result<()> {
349        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
350        let resolved = self.resolve_space_row(space, Some(old))?;
351        let old_ignore_path =
352            collection_ignore_file_path(&self.config.config_dir, &resolved.name, old);
353        let new_ignore_path =
354            collection_ignore_file_path(&self.config.config_dir, &resolved.name, new);
355        if old_ignore_path.is_file() && new_ignore_path.exists() {
356            return Err(KboltError::Internal(format!(
357                "cannot rename ignore file: destination already exists: {}",
358                new_ignore_path.display()
359            ))
360            .into());
361        }
362
363        self.storage.rename_collection(resolved.id, old, new)?;
364
365        if old_ignore_path.is_file() {
366            if let Some(parent) = new_ignore_path.parent() {
367                std::fs::create_dir_all(parent)?;
368            }
369            if let Err(rename_err) = std::fs::rename(&old_ignore_path, &new_ignore_path) {
370                match self.storage.rename_collection(resolved.id, new, old) {
371                    Ok(()) => {
372                        return Err(KboltError::Internal(format!(
373                            "renamed collection was rolled back after ignore rename failure: {rename_err}"
374                        ))
375                        .into())
376                    }
377                    Err(rollback_err) => {
378                        return Err(KboltError::Internal(format!(
379                            "ignore rename failed: {rename_err}; rollback failed: {rollback_err}"
380                        ))
381                        .into())
382                    }
383                }
384            }
385        }
386
387        Ok(())
388    }
389
390    pub fn describe_collection(&self, space: Option<&str>, name: &str, desc: &str) -> Result<()> {
391        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
392        let resolved = self.resolve_space_row(space, Some(name))?;
393        self.storage
394            .update_collection_description(resolved.id, name, desc)
395    }
396
397    pub fn list_collections(&self, space: Option<&str>) -> Result<Vec<CollectionInfo>> {
398        let (space_id_filter, spaces_by_id) = if let Some(space_name) = space {
399            let resolved = self.resolve_space_row(Some(space_name), None)?;
400            let mut map = std::collections::HashMap::new();
401            map.insert(resolved.id, resolved.name.clone());
402            (Some(resolved.id), map)
403        } else {
404            let spaces = self.storage.list_spaces()?;
405            let map = spaces
406                .into_iter()
407                .map(|space| (space.id, space.name))
408                .collect::<std::collections::HashMap<_, _>>();
409            (None, map)
410        };
411
412        let collections = self.storage.list_collections(space_id_filter)?;
413        let mut infos = Vec::with_capacity(collections.len());
414        for collection in collections {
415            let space_name = spaces_by_id
416                .get(&collection.space_id)
417                .ok_or_else(|| {
418                    KboltError::Internal(format!(
419                        "missing space mapping for collection '{}'",
420                        collection.name
421                    ))
422                })?
423                .clone();
424            infos.push(self.build_collection_info(&space_name, &collection)?);
425        }
426        Ok(infos)
427    }
428
429    pub fn collection_info(&self, space: Option<&str>, name: &str) -> Result<CollectionInfo> {
430        let resolved = self.resolve_space_row(space, Some(name))?;
431        let collection = self.storage.get_collection(resolved.id, name)?;
432        self.build_collection_info(&resolved.name, &collection)
433    }
434
435    pub fn list_files(
436        &self,
437        space: Option<&str>,
438        collection: &str,
439        prefix: Option<&str>,
440    ) -> Result<Vec<FileEntry>> {
441        let resolved_space = self.resolve_space_row(space, Some(collection))?;
442        let collection_row = self.storage.get_collection(resolved_space.id, collection)?;
443        let normalized_prefix = normalize_list_prefix(prefix)?;
444        let file_rows = self
445            .storage
446            .list_collection_file_rows(collection_row.id, false)?;
447
448        let mut files = Vec::with_capacity(file_rows.len());
449        for file_row in file_rows {
450            if let Some(prefix) = normalized_prefix.as_deref() {
451                if !path_matches_prefix(&file_row.path, prefix) {
452                    continue;
453                }
454            }
455
456            files.push(FileEntry {
457                path: file_row.path,
458                title: file_row.title,
459                docid: short_docid(&file_row.hash),
460                active: file_row.active,
461                chunk_count: file_row.chunk_count,
462                embedded: file_row.chunk_count > 0
463                    && file_row.embedded_chunk_count >= file_row.chunk_count,
464            });
465        }
466
467        Ok(files)
468    }
469
470    pub fn get_document(&self, req: GetRequest) -> Result<DocumentResponse> {
471        self.get_document_unlocked(req)
472    }
473
474    fn get_document_unlocked(&self, req: GetRequest) -> Result<DocumentResponse> {
475        let GetRequest {
476            locator,
477            space,
478            offset,
479            limit,
480        } = req;
481
482        let (document, collection_row, space_name) = match locator {
483            Locator::Path(locator_path) => {
484                let (collection_name, relative_path) = split_collection_path(&locator_path)?;
485                let resolved_space =
486                    self.resolve_space_row(space.as_deref(), Some(&collection_name))?;
487                let collection = self
488                    .storage
489                    .get_collection(resolved_space.id, &collection_name)?;
490                let document = self
491                    .storage
492                    .get_document_by_path(collection.id, &relative_path)?
493                    .ok_or_else(|| KboltError::DocumentNotFound {
494                        path: locator_path.clone(),
495                    })?;
496                (document, collection, resolved_space.name)
497            }
498            Locator::DocId(docid) => {
499                let prefix = normalize_docid(&docid)?;
500                let mut candidates = self
501                    .storage
502                    .get_document_by_hash_prefix(&prefix)?
503                    .into_iter()
504                    .map(|document| {
505                        let collection =
506                            self.storage.get_collection_by_id(document.collection_id)?;
507                        Ok((document, collection))
508                    })
509                    .collect::<Result<Vec<_>>>()?;
510
511                if let Some(space_name) = space.as_deref() {
512                    let resolved_space = self.resolve_space_row(Some(space_name), None)?;
513                    candidates.retain(|(_, collection)| collection.space_id == resolved_space.id);
514                }
515
516                if candidates.is_empty() {
517                    return Err(KboltError::DocumentNotFound {
518                        path: format!("#{prefix}"),
519                    }
520                    .into());
521                }
522
523                if candidates.len() > 1 {
524                    return Err(KboltError::InvalidInput(
525                        "docid is ambiguous; provide more characters".to_string(),
526                    )
527                    .into());
528                }
529
530                let (document, collection) = candidates.pop().expect("candidate exists");
531                let space = self.storage.get_space_by_id(collection.space_id)?;
532                (document, collection, space.name)
533            }
534        };
535
536        let full_path = collection_row.path.join(&document.path);
537        let bytes = match std::fs::read(&full_path) {
538            Ok(bytes) => bytes,
539            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
540                return Err(KboltError::FileDeleted(full_path).into())
541            }
542            Err(err) => return Err(err.into()),
543        };
544
545        let raw_content = String::from_utf8_lossy(&bytes).into_owned();
546        let stale = sha256_hex(&bytes) != document.hash;
547
548        let lines = raw_content.lines().collect::<Vec<_>>();
549        let total_lines = lines.len();
550        let start = offset.unwrap_or(0).min(total_lines);
551        let requested = limit.unwrap_or(total_lines.saturating_sub(start));
552        let end = start.saturating_add(requested).min(total_lines);
553        let returned_lines = end.saturating_sub(start);
554        let content = if returned_lines == 0 {
555            String::new()
556        } else {
557            lines[start..end].join("\n")
558        };
559
560        Ok(DocumentResponse {
561            docid: short_docid(&document.hash),
562            path: format!("{}/{}", collection_row.name, document.path),
563            title: document.title,
564            space: space_name,
565            collection: collection_row.name,
566            content,
567            stale,
568            total_lines,
569            returned_lines,
570        })
571    }
572
573    pub fn multi_get(&self, req: MultiGetRequest) -> Result<MultiGetResponse> {
574        if req.max_files == 0 {
575            return Err(
576                KboltError::InvalidInput("max_files must be greater than 0".to_string()).into(),
577            );
578        }
579        if req.max_bytes == 0 {
580            return Err(
581                KboltError::InvalidInput("max_bytes must be greater than 0".to_string()).into(),
582            );
583        }
584
585        let mut documents = Vec::new();
586        let mut omitted = Vec::new();
587        let mut resolved_count = 0usize;
588        let mut warnings = Vec::new();
589        let mut consumed_bytes = 0usize;
590
591        for locator in req.locators {
592            let document = match self.get_document_unlocked(GetRequest {
593                locator,
594                space: req.space.clone(),
595                offset: None,
596                limit: None,
597            }) {
598                Ok(document) => document,
599                Err(err) => match KboltError::from(err) {
600                    KboltError::FileDeleted(path) => {
601                        warnings.push(format!(
602                            "file deleted since indexing: {}. run `kbolt update` to refresh.",
603                            path.display()
604                        ));
605                        continue;
606                    }
607                    KboltError::DocumentNotFound { path } => {
608                        warnings.push(format!("document not found: {path}"));
609                        continue;
610                    }
611                    KboltError::InvalidInput(message) => {
612                        warnings.push(format!("invalid locator: {message}"));
613                        continue;
614                    }
615                    KboltError::InvalidPath(path) => {
616                        warnings.push(format!("invalid locator path: {}", path.display()));
617                        continue;
618                    }
619                    KboltError::AmbiguousSpace { collection, spaces } => {
620                        warnings.push(format!(
621                            "ambiguous locator space for collection '{collection}': {:?}. use --space to disambiguate.",
622                            spaces
623                        ));
624                        continue;
625                    }
626                    KboltError::SpaceNotFound { name } => {
627                        warnings.push(format!("space not found for locator: {name}"));
628                        continue;
629                    }
630                    KboltError::CollectionNotFound { name } => {
631                        warnings.push(format!("collection not found for locator: {name}"));
632                        continue;
633                    }
634                    other => return Err(other.into()),
635                },
636            };
637            resolved_count = resolved_count.saturating_add(1);
638
639            let size_bytes = document.content.len();
640            if documents.len() >= req.max_files {
641                omitted.push(OmittedFile {
642                    path: document.path,
643                    docid: document.docid,
644                    size_bytes,
645                    reason: OmitReason::MaxFiles,
646                });
647                continue;
648            }
649
650            if consumed_bytes.saturating_add(size_bytes) > req.max_bytes {
651                omitted.push(OmittedFile {
652                    path: document.path,
653                    docid: document.docid,
654                    size_bytes,
655                    reason: OmitReason::MaxBytes,
656                });
657                continue;
658            }
659
660            consumed_bytes = consumed_bytes.saturating_add(size_bytes);
661            documents.push(document);
662        }
663
664        Ok(MultiGetResponse {
665            resolved_count,
666            documents,
667            omitted,
668            warnings,
669        })
670    }
671
    /// Runs a search over the resolved target collections while holding a shared
    /// operation lock.
    ///
    /// Retrieval starts at `initial_search_candidate_limit` candidates. When the
    /// assembled results fall short of `req.limit`, the candidate limit is doubled
    /// (capped at `max_search_candidates`) and retrieval is retried. Retrieval uses the
    /// trimmed query, but the response echoes the original, untrimmed `req.query`.
    ///
    /// # Errors
    /// - `KboltError::InvalidInput` when the query is empty after trimming.
    /// - Target resolution, ranking, and result-assembly errors are propagated.
    pub fn search(&self, req: SearchRequest) -> Result<SearchResponse> {
        let _lock = self.acquire_operation_lock(LockMode::Shared)?;
        let started = Instant::now();
        let query = req.query.trim();
        if query.is_empty() {
            return Err(KboltError::InvalidInput("query cannot be empty".to_string()).into());
        }

        let requested_mode = req.mode.clone();
        // Reranking applies only to Auto/Deep modes and can be disabled per request.
        let rerank_enabled =
            matches!(requested_mode, SearchMode::Auto | SearchMode::Deep) && !req.no_rerank;
        let mut pipeline = self.initial_search_pipeline(&requested_mode);

        let targets = self.resolve_targets(TargetScope {
            space: req.space.as_deref(),
            collections: &req.collections,
        })?;

        // Greatest `updated` value among the targets (via `max()`).
        // NOTE(review): assumes these values sort chronologically (e.g. ISO-8601
        // strings) — confirm.
        let staleness_hint = targets
            .iter()
            .map(|target| target.collection.updated.clone())
            .max()
            .map(|updated| format!("Index last updated: {updated}"));

        // Nothing to search: return an empty response that still reports the pipeline.
        if req.limit == 0 || targets.is_empty() {
            return Ok(SearchResponse {
                results: Vec::new(),
                query: req.query,
                requested_mode: requested_mode.clone(),
                effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
                pipeline,
                staleness_hint,
                elapsed_ms: started.elapsed().as_millis() as u64,
            });
        }

        // Collection id -> metadata used when assembling results.
        let mut collections_by_id: HashMap<i64, SearchCollectionMeta> = HashMap::new();
        for target in &targets {
            collections_by_id.insert(
                target.collection.id,
                SearchCollectionMeta {
                    space: target.space.clone(),
                    collection: target.collection.name.clone(),
                    path: target.collection.path.clone(),
                },
            );
        }

        let max_candidates = self.max_search_candidates(&targets)?;
        let mut retrieval_limit =
            self.initial_search_candidate_limit(&requested_mode, req.limit, rerank_enabled);
        // Widen retrieval until enough results survive assembly or candidates run out.
        // NOTE(review): `pipeline` is mutated on every iteration — confirm repeated
        // calls do not duplicate its notices/steps.
        let results = loop {
            let ranked_chunks = self.rank_chunks_for_mode(
                &requested_mode,
                &targets,
                query,
                retrieval_limit,
                req.min_score,
                &mut pipeline,
            )?;

            if ranked_chunks.is_empty() {
                return Ok(SearchResponse {
                    results: Vec::new(),
                    query: req.query,
                    requested_mode: requested_mode.clone(),
                    effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
                    pipeline,
                    staleness_hint,
                    elapsed_ms: started.elapsed().as_millis() as u64,
                });
            }

            let ranked_len = ranked_chunks.len();
            let results = self.assemble_search_results(
                query,
                &requested_mode,
                ranked_chunks,
                &collections_by_id,
                req.debug,
                rerank_enabled,
                &mut pipeline,
                req.limit,
            )?;

            // Stop when any of these holds:
            // - the page is full;
            // - retrieval returned fewer chunks than requested (no more candidates);
            // - the candidate cap has been reached.
            if results.len() >= req.limit
                || ranked_len < retrieval_limit
                || retrieval_limit >= max_candidates
            {
                break results;
            }

            // Guard against a limit that cannot grow (e.g. 0 doubled stays 0).
            let next_limit = retrieval_limit.saturating_mul(2).min(max_candidates);
            if next_limit <= retrieval_limit {
                break results;
            }
            retrieval_limit = next_limit;
        };

        Ok(SearchResponse {
            results,
            query: req.query,
            requested_mode: requested_mode.clone(),
            effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
            pipeline,
            staleness_hint,
            elapsed_ms: started.elapsed().as_millis() as u64,
        })
    }
781
782    pub fn resolve_space(&self, explicit: Option<&str>) -> Result<String> {
783        let resolved = self.resolve_space_row(explicit, None)?;
784        Ok(resolved.name)
785    }
786
787    pub fn current_space(&self, explicit: Option<&str>) -> Result<Option<ActiveSpace>> {
788        let resolved = self.resolve_preferred_space(explicit)?;
789        Ok(resolved.map(|(space, source)| ActiveSpace {
790            name: space.name,
791            source,
792        }))
793    }
794
795    pub fn status(&self, space: Option<&str>) -> Result<StatusResponse> {
796        let _lock = self.acquire_operation_lock(LockMode::Shared)?;
797        let (spaces, totals_scope) = if let Some(space_name) = space {
798            let resolved = self.resolve_space_row(Some(space_name), None)?;
799            (vec![resolved.clone()], Some(resolved.id))
800        } else {
801            (self.storage.list_spaces()?, None)
802        };
803
804        let mut space_statuses = Vec::with_capacity(spaces.len());
805        for space_row in spaces {
806            let collections = self.storage.list_collections(Some(space_row.id))?;
807            let mut collection_statuses = Vec::with_capacity(collections.len());
808            let mut last_updated: Option<String> = None;
809
810            for collection in collections {
811                if last_updated
812                    .as_ref()
813                    .map(|existing| collection.updated > *existing)
814                    .unwrap_or(true)
815                {
816                    last_updated = Some(collection.updated.clone());
817                }
818
819                let documents = self
820                    .storage
821                    .count_documents_in_collection(collection.id, false)?;
822                let active_documents = self
823                    .storage
824                    .count_documents_in_collection(collection.id, true)?;
825                let chunks = self.storage.count_chunks_in_collection(collection.id)?;
826                let embedded_chunks = self
827                    .storage
828                    .count_embedded_chunks_in_collection(collection.id)?;
829
830                collection_statuses.push(CollectionStatus {
831                    name: collection.name,
832                    path: collection.path,
833                    documents,
834                    active_documents,
835                    chunks,
836                    embedded_chunks,
837                    last_updated: collection.updated,
838                });
839            }
840
841            space_statuses.push(SpaceStatus {
842                name: space_row.name,
843                description: space_row.description,
844                collections: collection_statuses,
845                last_updated,
846            });
847        }
848
849        let models = self.model_status_unlocked()?;
850
851        Ok(StatusResponse {
852            spaces: space_statuses,
853            models,
854            cache_dir: self.config.cache_dir.clone(),
855            config_dir: self.config.config_dir.clone(),
856            total_documents: self.storage.count_documents(totals_scope)?,
857            total_chunks: self.storage.count_chunks(totals_scope)?,
858            total_embedded: self.storage.count_embedded_chunks(totals_scope)?,
859            disk_usage: self.storage.disk_usage()?,
860        })
861    }
862
863    pub fn model_status(&self) -> Result<ModelStatus> {
864        self.model_status_unlocked()
865    }
866
867    fn model_status_unlocked(&self) -> Result<ModelStatus> {
868        models::status(&self.config)
869    }
870
871    pub fn config(&self) -> &Config {
872        &self.config
873    }
874
875    pub fn storage(&self) -> &Storage {
876        &self.storage
877    }
878
879    fn embedding_model_key(&self) -> &str {
880        self.config
881            .roles
882            .embedder
883            .as_ref()
884            .and_then(|role| self.config.providers.get(&role.provider))
885            .map(|profile| profile.model())
886            .unwrap_or("embedder")
887    }
888
889    fn acquire_operation_lock(&self, mode: LockMode) -> Result<OperationLock> {
890        OperationLock::acquire(&self.config.cache_dir, mode)
891    }
892
893    fn collect_document_chunk_ids(&self, doc_ids: &[i64]) -> Result<Vec<i64>> {
894        let mut chunk_ids = Vec::new();
895        for doc_id in doc_ids {
896            let chunks = self.storage.get_chunks_for_document(*doc_id)?;
897            chunk_ids.extend(chunks.into_iter().map(|chunk| chunk.id));
898        }
899        Ok(chunk_ids)
900    }
901
902    fn purge_space_chunks(&self, space: &str, chunk_ids: &[i64]) -> Result<()> {
903        if chunk_ids.is_empty() {
904            return Ok(());
905        }
906
907        self.storage.delete_tantivy(space, chunk_ids)?;
908        self.storage.commit_tantivy(space)?;
909        self.storage.delete_usearch(space, chunk_ids)?;
910        Ok(())
911    }
912
913    fn build_space_info(&self, space: &crate::storage::SpaceRow) -> Result<SpaceInfo> {
914        let collection_count = self.storage.list_collections(Some(space.id))?.len();
915        let document_count = self.storage.count_documents(Some(space.id))?;
916        let chunk_count = self.storage.count_chunks(Some(space.id))?;
917
918        Ok(SpaceInfo {
919            name: space.name.clone(),
920            description: space.description.clone(),
921            collection_count,
922            document_count,
923            chunk_count,
924            created: space.created.clone(),
925        })
926    }
927
928    fn build_collection_info(
929        &self,
930        space_name: &str,
931        collection: &CollectionRow,
932    ) -> Result<CollectionInfo> {
933        let document_count = self
934            .storage
935            .count_documents_in_collection(collection.id, false)?;
936        let active_document_count = self
937            .storage
938            .count_documents_in_collection(collection.id, true)?;
939        let chunk_count = self.storage.count_chunks_in_collection(collection.id)?;
940        let embedded_chunk_count = self
941            .storage
942            .count_embedded_chunks_in_collection(collection.id)?;
943
944        Ok(CollectionInfo {
945            name: collection.name.clone(),
946            space: space_name.to_string(),
947            path: collection.path.clone(),
948            description: collection.description.clone(),
949            extensions: collection.extensions.clone(),
950            document_count,
951            active_document_count,
952            chunk_count,
953            embedded_chunk_count,
954            created: collection.created.clone(),
955            updated: collection.updated.clone(),
956        })
957    }
958
959    fn resolve_space_row(
960        &self,
961        explicit: Option<&str>,
962        collection_for_lookup: Option<&str>,
963    ) -> Result<crate::storage::SpaceRow> {
964        if let Some((space, _source)) = self.resolve_preferred_space(explicit)? {
965            return Ok(space);
966        }
967
968        if let Some(collection) = collection_for_lookup {
969            return match self.storage.find_space_for_collection(collection)? {
970                SpaceResolution::Found(space) => Ok(space),
971                SpaceResolution::Ambiguous(spaces) => Err(KboltError::AmbiguousSpace {
972                    collection: collection.to_string(),
973                    spaces,
974                }
975                .into()),
976                SpaceResolution::NotFound => Err(KboltError::CollectionNotFound {
977                    name: collection.to_string(),
978                }
979                .into()),
980            };
981        }
982
983        Err(KboltError::NoActiveSpace.into())
984    }
985
986    fn resolve_preferred_space(
987        &self,
988        explicit: Option<&str>,
989    ) -> Result<Option<(crate::storage::SpaceRow, ActiveSpaceSource)>> {
990        if let Some(space_name) = explicit {
991            let space = self.storage.get_space(space_name)?;
992            return Ok(Some((space, ActiveSpaceSource::Flag)));
993        }
994
995        if let Ok(space_name) = std::env::var("KBOLT_SPACE") {
996            let trimmed = space_name.trim();
997            if !trimmed.is_empty() {
998                let space = self.storage.get_space(trimmed)?;
999                return Ok(Some((space, ActiveSpaceSource::EnvVar)));
1000            }
1001        }
1002
1003        if let Some(space_name) = self.config.default_space.as_deref() {
1004            let space = self.storage.get_space(space_name)?;
1005            return Ok(Some((space, ActiveSpaceSource::ConfigDefault)));
1006        }
1007
1008        Ok(None)
1009    }
1010}
1011
1012#[cfg(test)]
1013mod tests;
1014
#[cfg(test)]
mod lock_relief_tests {
    //! Checks that read-only engine APIs keep working while another handle
    //! holds an exclusive lock on `<cache_dir>/kbolt.lock`.

    use fs2::FileExt;
    use std::collections::HashMap;
    use std::fs::OpenOptions;

    use tempfile::{tempdir, TempDir};

    use super::Engine;
    use crate::config::{ChunkingConfig, Config, RankingConfig, ReapingConfig};
    use crate::storage::Storage;
    use kbolt_types::{AddCollectionRequest, GetRequest, Locator, MultiGetRequest, UpdateOptions};

    /// An indexed engine together with the temporary directory backing it.
    ///
    /// Struct fields drop in declaration order, so `engine` is dropped before
    /// `_root` removes the directory that its storage and documents live in.
    struct IndexedEngine {
        engine: Engine,
        _root: TempDir,
    }

    #[test]
    fn metadata_reads_succeed_while_global_lock_is_held() {
        let fixture = test_engine_with_indexed_collection();
        let engine = &fixture.engine;
        let _holder = hold_global_lock(engine);

        let spaces = engine.list_spaces().expect("list spaces");
        assert_eq!(
            spaces
                .iter()
                .map(|space| space.name.as_str())
                .collect::<Vec<_>>(),
            vec!["default", "work"]
        );

        let space = engine.space_info("work").expect("load space info");
        assert_eq!(space.name, "work");
        assert_eq!(space.collection_count, 1);

        let collections = engine
            .list_collections(Some("work"))
            .expect("list collections");
        assert_eq!(collections.len(), 1);
        assert_eq!(collections[0].name, "api");

        let collection = engine
            .collection_info(Some("work"), "api")
            .expect("load collection info");
        assert_eq!(collection.name, "api");
        assert_eq!(collection.document_count, 1);

        let models = engine.model_status().expect("read model status");
        assert!(!models.embedder.configured);
        assert!(!models.reranker.configured);
        assert!(!models.expander.configured);
    }

    #[test]
    fn document_reads_succeed_while_global_lock_is_held() {
        let fixture = test_engine_with_indexed_collection();
        let engine = &fixture.engine;
        let _holder = hold_global_lock(engine);

        let files = engine
            .list_files(Some("work"), "api", None)
            .expect("list files");
        assert_eq!(files.len(), 1);
        assert_eq!(files[0].path, "guide.md");

        let document = engine
            .get_document(GetRequest {
                locator: Locator::Path("api/guide.md".to_string()),
                space: Some("work".to_string()),
                offset: None,
                limit: None,
            })
            .expect("read document");
        assert_eq!(document.path, "api/guide.md");
        assert!(document.content.contains("hello from kbolt"));

        let response = engine
            .multi_get(MultiGetRequest {
                locators: vec![Locator::Path("api/guide.md".to_string())],
                space: Some("work".to_string()),
                max_files: 4,
                max_bytes: 4_096,
            })
            .expect("read multiple documents");
        assert_eq!(response.resolved_count, 1);
        assert_eq!(response.documents.len(), 1);
        assert!(response.warnings.is_empty());
    }

    /// Builds an engine in a fresh temporary directory containing one
    /// indexed file.
    ///
    /// Writes `docs/guide.md` and registers `docs` as collection `api` in
    /// space `work` with `no_index: true`. It then indexes the collection via
    /// `update` with `no_embed: true`.
    ///
    /// The temporary directory is owned by the returned fixture and removed
    /// when the fixture drops, rather than being leaked with `mem::forget`.
    fn test_engine_with_indexed_collection() -> IndexedEngine {
        let root = tempdir().expect("create temp root");
        let root_path = root.path().to_path_buf();

        let config_dir = root_path.join("config");
        let cache_dir = root_path.join("cache");
        let docs_dir = root_path.join("docs");
        std::fs::create_dir_all(&docs_dir).expect("create docs dir");
        std::fs::write(docs_dir.join("guide.md"), "# Hello\n\nhello from kbolt\n")
            .expect("write document");

        let storage = Storage::new(&cache_dir).expect("create storage");
        let config = Config {
            config_dir,
            cache_dir,
            default_space: None,
            providers: HashMap::new(),
            roles: crate::config::RoleBindingsConfig::default(),
            reaping: ReapingConfig { days: 7 },
            chunking: ChunkingConfig::default(),
            ranking: RankingConfig::default(),
        };
        let engine = Engine::from_parts(storage, config);

        engine
            .add_collection(AddCollectionRequest {
                path: docs_dir,
                space: Some("work".to_string()),
                name: Some("api".to_string()),
                description: None,
                extensions: None,
                no_index: true,
            })
            .expect("add collection");
        engine
            .update(UpdateOptions {
                space: Some("work".to_string()),
                collections: vec!["api".to_string()],
                no_embed: true,
                dry_run: false,
                verbose: false,
            })
            .expect("index collection");

        IndexedEngine {
            engine,
            _root: root,
        }
    }

    /// Opens `<cache_dir>/kbolt.lock` and takes an exclusive `fs2` lock on it.
    ///
    /// The lock is held for as long as the returned file handle is alive.
    fn hold_global_lock(engine: &Engine) -> std::fs::File {
        std::fs::create_dir_all(&engine.config().cache_dir).expect("create cache dir");
        let holder = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(engine.config().cache_dir.join("kbolt.lock"))
            .expect("open lock file");
        FileExt::try_lock_exclusive(&holder).expect("acquire global lock");
        holder
    }
}
1160}