Skip to main content

kbolt_core/
engine.rs

1use std::collections::{HashMap, HashSet};
2use std::path::Path;
3use std::sync::{Arc, Mutex};
4use std::time::Instant;
5
6use crate::config;
7use crate::config::Config;
8use crate::error::CoreError;
9use crate::ingest::canonical::build_canonical_document;
10use crate::ingest::chunk::{
11    chunk_canonical_document, chunk_canonical_document_with_counter, resolve_policy,
12};
13use crate::ingest::extract::default_registry;
14use crate::lock::{LockMode, OperationLock};
15use crate::models;
16use crate::storage::Storage;
17use crate::storage::{
18    ChunkInsert, ChunkRow, CollectionRow, DocumentGenerationReplace, DocumentRow, DocumentTextRow,
19    DocumentTitleSource, SpaceResolution, TantivyEntry,
20};
21use crate::Result;
22use kbolt_types::{
23    ActiveSpace, ActiveSpaceSource, AddCollectionRequest, AddCollectionResult, CollectionInfo,
24    CollectionStatus, DocumentResponse, FileEntry, GetRequest, InitialIndexingBlock,
25    InitialIndexingOutcome, KboltError, Locator, ModelStatus, MultiGetRequest, MultiGetResponse,
26    OmitReason, OmittedFile, SearchMode, SearchPipeline, SearchPipelineNotice, SearchPipelineStep,
27    SearchPipelineUnavailableReason, SearchRequest, SearchResponse, SearchResult, SearchSignals,
28    SpaceInfo, SpaceStatus, StatusResponse, UpdateOptions, UpdateReport,
29};
30mod eval_ops;
31mod file_utils;
32pub(crate) mod ignore_helpers;
33mod ignore_ops;
34mod path_utils;
35mod schedule_ops;
36mod schedule_run_ops;
37mod schedule_status_ops;
38mod scoring;
39mod search_ops;
40mod text_helpers;
41mod update_ops;
42use file_utils::{file_error, file_title, modified_token, sha256_hex};
43use ignore_helpers::{
44    collection_ignore_file_path, count_ignore_patterns, is_hard_ignored_file,
45    load_collection_ignore_matcher, validate_ignore_pattern,
46};
47use path_utils::{
48    collection_relative_path, extension_allowed, normalize_docid, normalize_list_prefix,
49    normalized_extension_filter, path_matches_prefix, short_docid, split_collection_path,
50};
51use scoring::{dense_distance_to_score, max_option};
52pub(crate) use text_helpers::retrieval_text_with_prefix;
53use text_helpers::search_text_with_loaded_canonical_neighbors;
54
55pub struct Engine {
56    storage: Storage,
57    config: Config,
58    embedder: Option<Arc<dyn models::Embedder>>,
59    embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
60    reranker: Option<Arc<dyn models::Reranker>>,
61    expander: Option<Arc<dyn models::Expander>>,
62}
63
/// One row of an ignore-file listing: which collection (in which space) has
/// an ignore file, and how many patterns it holds.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IgnoreListEntry {
    /// Name of the space that owns the collection.
    pub space: String,
    /// Name of the collection the ignore file belongs to.
    pub collection: String,
    /// Number of ignore patterns counted in the collection's ignore file.
    pub pattern_count: usize,
}
70
71#[derive(Debug, Clone, PartialEq, Eq)]
72pub struct UpdateTarget {
73    pub space: String,
74    pub collection: CollectionRow,
75}
76
/// Borrowed selector describing which space and collections an operation
/// should target.
#[derive(Debug, Clone, Copy)]
struct TargetScope<'a> {
    /// Explicit space name, or `None` to fall back to space resolution.
    space: Option<&'a str>,
    /// Requested collection names (presumably empty means "all" — confirm in
    /// `resolve_targets`).
    collections: &'a [String],
}
82
/// Space and collection names for a collection id; `search` maps collection
/// ids to this when assembling candidates.
#[derive(Debug, Clone)]
struct SearchCollectionMeta {
    /// Name of the space the collection belongs to.
    space: String,
    /// Name of the collection.
    collection: String,
}
88
/// Per-space retrieval scope computed for a search request.
///
/// `search` reports `chunk_count` for every scope and, when `filtered` is set,
/// the number of `document_ids` as the scope's document filter.
#[derive(Debug)]
struct SearchTargetScope {
    /// Name of the space this scope searches.
    space: String,
    /// Collection ids included in the scope.
    collection_ids: Vec<i64>,
    /// Whether the scope is narrowed to `document_ids`.
    filtered: bool,
    /// Document ids the scope is restricted to when `filtered` is set.
    document_ids: Vec<i64>,
    /// Number of chunks in the scope.
    chunk_count: usize,
    /// Lazily populated set of chunk keys (presumably cached on first use —
    /// confirm in `search_ops`).
    chunk_key_filter: Mutex<Option<Arc<HashSet<u64>>>>,
    /// Flag recording whether the BM25 index was reloaded for this scope.
    bm25_reloaded: Mutex<bool>,
}
99
/// A lexical hit: the matching chunk id and its BM25 score.
#[derive(Debug, Clone)]
struct SearchHitCandidate {
    /// Id of the matching chunk.
    chunk_id: i64,
    /// BM25 score reported for the chunk.
    bm25_score: f32,
}
105
/// A chunk after ranking, carrying its final score and the per-signal scores
/// that contributed to it.
#[derive(Debug, Clone)]
struct RankedChunk {
    /// Id of the ranked chunk.
    chunk_id: i64,
    /// Final score used for ordering.
    score: f32,
    /// Fused retrieval score.
    fusion: f32,
    /// Reranker score, when a reranker ran.
    reranker: Option<f32>,
    /// BM25 score, when the chunk was a lexical hit.
    bm25: Option<f32>,
    /// Dense-retrieval score, when the chunk was a dense hit.
    dense: Option<f32>,
    /// Rank before reordering (presumably pre-rerank position — confirm).
    original_rank: Option<usize>,
}
116
117impl Engine {
118    pub fn new(config_path: Option<&Path>) -> Result<Self> {
119        Self::new_with_recovery_notice(config_path, None)
120    }
121
122    pub fn new_with_recovery_notice(
123        config_path: Option<&Path>,
124        recovery_notice: Option<crate::RecoveryNoticeSink>,
125    ) -> Result<Self> {
126        let config = config::load(config_path)?;
127        let storage = Storage::new(&config.cache_dir)?;
128        let built_models =
129            models::build_inference_clients_with_recovery_notice(&config, recovery_notice)?;
130        Ok(Self {
131            storage,
132            config,
133            embedder: built_models.embedder,
134            embedding_document_sizer: built_models.embedding_document_sizer,
135            reranker: built_models.reranker,
136            expander: built_models.expander,
137        })
138    }
139
140    #[cfg(test)]
141    pub(crate) fn from_parts(storage: Storage, config: Config) -> Self {
142        Self::from_parts_with_models(storage, config, None, None, None)
143    }
144
145    #[cfg(test)]
146    pub(crate) fn from_parts_with_embedder(
147        storage: Storage,
148        config: Config,
149        embedder: Option<Arc<dyn models::Embedder>>,
150    ) -> Self {
151        Self::from_parts_with_inference(storage, config, embedder, None, None, None)
152    }
153
154    #[cfg(test)]
155    pub(crate) fn from_parts_with_embedding_runtime(
156        storage: Storage,
157        config: Config,
158        embedder: Option<Arc<dyn models::Embedder>>,
159        embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
160    ) -> Self {
161        Self::from_parts_with_inference(
162            storage,
163            config,
164            embedder,
165            embedding_document_sizer,
166            None,
167            None,
168        )
169    }
170
171    #[cfg(test)]
172    pub(crate) fn from_parts_with_models(
173        storage: Storage,
174        config: Config,
175        embedder: Option<Arc<dyn models::Embedder>>,
176        reranker: Option<Arc<dyn models::Reranker>>,
177        expander: Option<Arc<dyn models::Expander>>,
178    ) -> Self {
179        Self::from_parts_with_inference(storage, config, embedder, None, reranker, expander)
180    }
181
182    #[cfg(test)]
183    fn from_parts_with_inference(
184        storage: Storage,
185        config: Config,
186        embedder: Option<Arc<dyn models::Embedder>>,
187        embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
188        reranker: Option<Arc<dyn models::Reranker>>,
189        expander: Option<Arc<dyn models::Expander>>,
190    ) -> Self {
191        let built_models =
192            models::build_inference_clients(&config).expect("build inference models");
193        Self {
194            storage,
195            config,
196            embedder: embedder.or(built_models.embedder),
197            embedding_document_sizer: embedding_document_sizer
198                .or(built_models.embedding_document_sizer),
199            reranker: reranker.or(built_models.reranker),
200            expander: expander.or(built_models.expander),
201        }
202    }
203
204    pub fn add_space(&self, name: &str, description: Option<&str>) -> Result<SpaceInfo> {
205        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
206        self.storage.create_space(name, description)?;
207        let space = self.storage.get_space(name)?;
208        self.build_space_info(&space)
209    }
210
211    pub fn remove_space(&mut self, name: &str) -> Result<()> {
212        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
213        self.storage.delete_space(name)?;
214        if self.config.default_space.as_deref() == Some(name) {
215            self.persist_default_space(None)?;
216        }
217        Ok(())
218    }
219
220    pub fn rename_space(&mut self, old: &str, new: &str) -> Result<()> {
221        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
222        self.storage.rename_space(old, new)?;
223        if self.config.default_space.as_deref() == Some(old) {
224            self.persist_default_space(Some(new.to_string()))?;
225        }
226        Ok(())
227    }
228
229    pub fn describe_space(&self, name: &str, description: &str) -> Result<()> {
230        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
231        self.storage.update_space_description(name, description)
232    }
233
234    pub fn list_spaces(&self) -> Result<Vec<SpaceInfo>> {
235        let spaces = self.storage.list_spaces()?;
236        let mut infos = Vec::with_capacity(spaces.len());
237        for space in spaces {
238            infos.push(self.build_space_info(&space)?);
239        }
240        Ok(infos)
241    }
242
243    pub fn space_info(&self, name: &str) -> Result<SpaceInfo> {
244        let space = self.storage.get_space(name)?;
245        self.build_space_info(&space)
246    }
247
248    pub fn set_default_space(&mut self, name: Option<&str>) -> Result<Option<String>> {
249        if let Some(space_name) = name {
250            self.storage.get_space(space_name)?;
251        }
252
253        self.persist_default_space(name.map(ToString::to_string))?;
254        Ok(self.config.default_space.clone())
255    }
256
257    fn persist_default_space(&mut self, default_space: Option<String>) -> Result<()> {
258        let previous = self.config.default_space.clone();
259        self.config.default_space = default_space;
260        if let Err(err) = config::save(&self.config) {
261            self.config.default_space = previous;
262            return Err(err);
263        }
264        Ok(())
265    }
266
267    pub fn add_collection(&self, req: AddCollectionRequest) -> Result<AddCollectionResult> {
268        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
269        let AddCollectionRequest {
270            path,
271            space: requested_space,
272            name: requested_name,
273            description,
274            extensions,
275            no_index,
276        } = req;
277
278        let space = match requested_space.as_deref() {
279            Some(space_name) => match self.storage.get_space(space_name) {
280                Ok(space) => space,
281                Err(CoreError::Domain(KboltError::SpaceNotFound { .. })) => {
282                    self.storage.create_space(space_name, None)?;
283                    self.storage.get_space(space_name)?
284                }
285                Err(err) => return Err(err),
286            },
287            None => self.resolve_space_row(None, None)?,
288        };
289        if !path.is_absolute() || !path.is_dir() {
290            return Err(KboltError::InvalidPath(path).into());
291        }
292
293        let name = match requested_name {
294            Some(name) => name,
295            None => path
296                .file_name()
297                .and_then(|name| name.to_str())
298                .map(ToString::to_string)
299                .ok_or_else(|| KboltError::InvalidPath(path.clone()))?,
300        };
301
302        self.storage.create_collection(
303            space.id,
304            &name,
305            &path,
306            description.as_deref(),
307            extensions.as_deref(),
308        )?;
309
310        let initial_indexing = if no_index {
311            InitialIndexingOutcome::Skipped
312        } else {
313            match self.update_unlocked(UpdateOptions {
314                space: Some(space.name.clone()),
315                collections: vec![name.clone()],
316                no_embed: false,
317                dry_run: false,
318                verbose: false,
319            }) {
320                Ok(report) => InitialIndexingOutcome::Indexed(report),
321                Err(CoreError::Domain(KboltError::SpaceDenseRepairRequired { space, reason })) => {
322                    InitialIndexingOutcome::Blocked(
323                        InitialIndexingBlock::SpaceDenseRepairRequired { space, reason },
324                    )
325                }
326                Err(CoreError::Domain(KboltError::ModelNotAvailable { name })) => {
327                    InitialIndexingOutcome::Blocked(InitialIndexingBlock::ModelNotAvailable {
328                        name,
329                    })
330                }
331                Err(err) => return Err(err),
332            }
333        };
334
335        let collection = self.storage.get_collection(space.id, &name)?;
336        Ok(AddCollectionResult {
337            collection: self.build_collection_info(&space.name, &collection)?,
338            initial_indexing,
339        })
340    }
341
342    pub fn remove_collection(&self, space: Option<&str>, name: &str) -> Result<()> {
343        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
344        let resolved = self.resolve_space_row(space, Some(name))?;
345        let collection = self.storage.get_collection(resolved.id, name)?;
346        let documents = self.storage.list_documents(collection.id, false)?;
347        let doc_ids = documents.into_iter().map(|doc| doc.id).collect::<Vec<_>>();
348        let chunk_ids = self.collect_document_chunk_ids(&doc_ids)?;
349        self.purge_space_chunks(&resolved.name, &chunk_ids)?;
350        self.storage.delete_collection(resolved.id, name)?;
351
352        let ignore_path =
353            collection_ignore_file_path(&self.config.config_dir, &resolved.name, name);
354        if ignore_path.is_file() {
355            std::fs::remove_file(ignore_path)?;
356        }
357
358        Ok(())
359    }
360
361    pub fn rename_collection(&self, space: Option<&str>, old: &str, new: &str) -> Result<()> {
362        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
363        let resolved = self.resolve_space_row(space, Some(old))?;
364        let old_ignore_path =
365            collection_ignore_file_path(&self.config.config_dir, &resolved.name, old);
366        let new_ignore_path =
367            collection_ignore_file_path(&self.config.config_dir, &resolved.name, new);
368        if old_ignore_path.is_file() && new_ignore_path.exists() {
369            return Err(KboltError::Internal(format!(
370                "cannot rename ignore file: destination already exists: {}",
371                new_ignore_path.display()
372            ))
373            .into());
374        }
375
376        self.storage.rename_collection(resolved.id, old, new)?;
377
378        if old_ignore_path.is_file() {
379            if let Some(parent) = new_ignore_path.parent() {
380                std::fs::create_dir_all(parent)?;
381            }
382            if let Err(rename_err) = std::fs::rename(&old_ignore_path, &new_ignore_path) {
383                match self.storage.rename_collection(resolved.id, new, old) {
384                    Ok(()) => {
385                        return Err(KboltError::Internal(format!(
386                            "renamed collection was rolled back after ignore rename failure: {rename_err}"
387                        ))
388                        .into())
389                    }
390                    Err(rollback_err) => {
391                        return Err(KboltError::Internal(format!(
392                            "ignore rename failed: {rename_err}; rollback failed: {rollback_err}"
393                        ))
394                        .into())
395                    }
396                }
397            }
398        }
399
400        Ok(())
401    }
402
403    pub fn describe_collection(&self, space: Option<&str>, name: &str, desc: &str) -> Result<()> {
404        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
405        let resolved = self.resolve_space_row(space, Some(name))?;
406        self.storage
407            .update_collection_description(resolved.id, name, desc)
408    }
409
410    pub fn list_collections(&self, space: Option<&str>) -> Result<Vec<CollectionInfo>> {
411        let (space_id_filter, spaces_by_id) = if let Some(space_name) = space {
412            let resolved = self.resolve_space_row(Some(space_name), None)?;
413            let mut map = std::collections::HashMap::new();
414            map.insert(resolved.id, resolved.name.clone());
415            (Some(resolved.id), map)
416        } else {
417            let spaces = self.storage.list_spaces()?;
418            let map = spaces
419                .into_iter()
420                .map(|space| (space.id, space.name))
421                .collect::<std::collections::HashMap<_, _>>();
422            (None, map)
423        };
424
425        let collections = self.storage.list_collections(space_id_filter)?;
426        let mut infos = Vec::with_capacity(collections.len());
427        for collection in collections {
428            let space_name = spaces_by_id
429                .get(&collection.space_id)
430                .ok_or_else(|| {
431                    KboltError::Internal(format!(
432                        "missing space mapping for collection '{}'",
433                        collection.name
434                    ))
435                })?
436                .clone();
437            infos.push(self.build_collection_info(&space_name, &collection)?);
438        }
439        Ok(infos)
440    }
441
442    pub fn collection_info(&self, space: Option<&str>, name: &str) -> Result<CollectionInfo> {
443        let resolved = self.resolve_space_row(space, Some(name))?;
444        let collection = self.storage.get_collection(resolved.id, name)?;
445        self.build_collection_info(&resolved.name, &collection)
446    }
447
448    pub fn list_files(
449        &self,
450        space: Option<&str>,
451        collection: &str,
452        prefix: Option<&str>,
453    ) -> Result<Vec<FileEntry>> {
454        let resolved_space = self.resolve_space_row(space, Some(collection))?;
455        let collection_row = self.storage.get_collection(resolved_space.id, collection)?;
456        let normalized_prefix = normalize_list_prefix(prefix)?;
457        let file_rows = self
458            .storage
459            .list_collection_file_rows(collection_row.id, false)?;
460
461        let mut files = Vec::with_capacity(file_rows.len());
462        for file_row in file_rows {
463            if let Some(prefix) = normalized_prefix.as_deref() {
464                if !path_matches_prefix(&file_row.path, prefix) {
465                    continue;
466                }
467            }
468
469            files.push(FileEntry {
470                path: file_row.path,
471                title: file_row.title,
472                docid: short_docid(&file_row.hash),
473                active: file_row.active,
474                chunk_count: file_row.chunk_count,
475                embedded: file_row.chunk_count > 0
476                    && file_row.embedded_chunk_count >= file_row.chunk_count,
477            });
478        }
479
480        Ok(files)
481    }
482
483    pub fn get_document(&self, req: GetRequest) -> Result<DocumentResponse> {
484        self.get_document_unlocked(req)
485    }
486
487    fn get_document_unlocked(&self, req: GetRequest) -> Result<DocumentResponse> {
488        let GetRequest {
489            locator,
490            space,
491            offset,
492            limit,
493        } = req;
494
495        let (document, collection_row, space_name) = match locator {
496            Locator::Path(locator_path) => {
497                let (collection_name, relative_path) = split_collection_path(&locator_path)?;
498                let resolved_space =
499                    self.resolve_space_row(space.as_deref(), Some(&collection_name))?;
500                let collection = self
501                    .storage
502                    .get_collection(resolved_space.id, &collection_name)?;
503                let document = self
504                    .storage
505                    .get_document_by_path(collection.id, &relative_path)?
506                    .ok_or_else(|| KboltError::DocumentNotFound {
507                        path: locator_path.clone(),
508                    })?;
509                (document, collection, resolved_space.name)
510            }
511            Locator::DocId(docid) => {
512                let prefix = normalize_docid(&docid)?;
513                let mut candidates = self
514                    .storage
515                    .get_document_by_hash_prefix(&prefix)?
516                    .into_iter()
517                    .map(|document| {
518                        let collection =
519                            self.storage.get_collection_by_id(document.collection_id)?;
520                        Ok((document, collection))
521                    })
522                    .collect::<Result<Vec<_>>>()?;
523
524                if let Some(space_name) = space.as_deref() {
525                    let resolved_space = self.resolve_space_row(Some(space_name), None)?;
526                    candidates.retain(|(_, collection)| collection.space_id == resolved_space.id);
527                }
528
529                if candidates.is_empty() {
530                    return Err(KboltError::DocumentNotFound {
531                        path: format!("#{prefix}"),
532                    }
533                    .into());
534                }
535
536                if candidates.len() > 1 {
537                    return Err(KboltError::InvalidInput(
538                        "docid is ambiguous; provide more characters".to_string(),
539                    )
540                    .into());
541                }
542
543                let (document, collection) = candidates.pop().expect("candidate exists");
544                let space = self.storage.get_space_by_id(collection.space_id)?;
545                (document, collection, space.name)
546            }
547        };
548
549        let document_text = self.storage.get_document_text(document.id)?;
550        let full_path = collection_row.path.join(&document.path);
551        let stale = source_is_stale(&full_path, document.hash.as_str())?;
552
553        let lines = document_text.text.lines().collect::<Vec<_>>();
554        let total_lines = lines.len();
555        let start = offset.unwrap_or(0).min(total_lines);
556        let requested = limit.unwrap_or(total_lines.saturating_sub(start));
557        let end = start.saturating_add(requested).min(total_lines);
558        let returned_lines = end.saturating_sub(start);
559        let content = if returned_lines == 0 {
560            String::new()
561        } else {
562            lines[start..end].join("\n")
563        };
564
565        Ok(DocumentResponse {
566            docid: short_docid(&document.hash),
567            path: format!("{}/{}", collection_row.name, document.path),
568            title: document.title,
569            space: space_name,
570            collection: collection_row.name,
571            content,
572            stale,
573            total_lines,
574            returned_lines,
575        })
576    }
577
578    pub fn multi_get(&self, req: MultiGetRequest) -> Result<MultiGetResponse> {
579        if req.max_files == 0 {
580            return Err(
581                KboltError::InvalidInput("max_files must be greater than 0".to_string()).into(),
582            );
583        }
584        if req.max_bytes == 0 {
585            return Err(
586                KboltError::InvalidInput("max_bytes must be greater than 0".to_string()).into(),
587            );
588        }
589
590        let mut documents = Vec::new();
591        let mut omitted = Vec::new();
592        let mut resolved_count = 0usize;
593        let mut warnings = Vec::new();
594        let mut consumed_bytes = 0usize;
595
596        for locator in req.locators {
597            let document = match self.get_document_unlocked(GetRequest {
598                locator,
599                space: req.space.clone(),
600                offset: None,
601                limit: None,
602            }) {
603                Ok(document) => document,
604                Err(err) => match KboltError::from(err) {
605                    KboltError::DocumentNotFound { path } => {
606                        warnings.push(format!("document not found: {path}"));
607                        continue;
608                    }
609                    KboltError::InvalidInput(message) => {
610                        warnings.push(format!("invalid locator: {message}"));
611                        continue;
612                    }
613                    KboltError::InvalidPath(path) => {
614                        warnings.push(format!("invalid locator path: {}", path.display()));
615                        continue;
616                    }
617                    KboltError::AmbiguousSpace { collection, spaces } => {
618                        warnings.push(format!(
619                            "ambiguous locator space for collection '{collection}': {:?}. use --space to disambiguate.",
620                            spaces
621                        ));
622                        continue;
623                    }
624                    KboltError::SpaceNotFound { name } => {
625                        warnings.push(format!("space not found for locator: {name}"));
626                        continue;
627                    }
628                    KboltError::CollectionNotFound { name } => {
629                        warnings.push(format!("collection not found for locator: {name}"));
630                        continue;
631                    }
632                    other => return Err(other.into()),
633                },
634            };
635            resolved_count = resolved_count.saturating_add(1);
636
637            let size_bytes = document.content.len();
638            if documents.len() >= req.max_files {
639                omitted.push(OmittedFile {
640                    path: document.path,
641                    docid: document.docid,
642                    size_bytes,
643                    reason: OmitReason::MaxFiles,
644                });
645                continue;
646            }
647
648            if consumed_bytes.saturating_add(size_bytes) > req.max_bytes {
649                omitted.push(OmittedFile {
650                    path: document.path,
651                    docid: document.docid,
652                    size_bytes,
653                    reason: OmitReason::MaxBytes,
654                });
655                continue;
656            }
657
658            consumed_bytes = consumed_bytes.saturating_add(size_bytes);
659            documents.push(document);
660        }
661
662        Ok(MultiGetResponse {
663            resolved_count,
664            documents,
665            omitted,
666            warnings,
667        })
668    }
669
670    pub fn search(&self, req: SearchRequest) -> Result<SearchResponse> {
671        let _profile = crate::profile::SearchProfileGuard::start();
672        let _lock = crate::profile::timed_search_stage("operation_lock", || {
673            self.acquire_operation_lock(LockMode::Shared)
674        })?;
675        let started = Instant::now();
676        let query = req.query.trim();
677        if query.is_empty() {
678            return Err(KboltError::InvalidInput("query cannot be empty".to_string()).into());
679        }
680
681        let requested_mode = req.mode.clone();
682        let rerank_enabled =
683            matches!(requested_mode, SearchMode::Auto | SearchMode::Deep) && !req.no_rerank;
684        let mut pipeline = self.initial_search_pipeline(&requested_mode);
685
686        let targets = crate::profile::timed_search_stage("resolve_targets", || {
687            self.resolve_targets(TargetScope {
688                space: req.space.as_deref(),
689                collections: &req.collections,
690            })
691        })?;
692        crate::profile::increment_search_count("target_collections", targets.len() as u64);
693
694        let staleness_hint = targets
695            .iter()
696            .map(|target| target.collection.updated.clone())
697            .max()
698            .map(|updated| format!("Index last updated: {updated}"));
699
700        if req.limit == 0 || targets.is_empty() {
701            return Ok(SearchResponse {
702                results: Vec::new(),
703                query: req.query,
704                requested_mode: requested_mode.clone(),
705                effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
706                pipeline,
707                staleness_hint,
708                elapsed_ms: started.elapsed().as_millis() as u64,
709            });
710        }
711
712        let mut collections_by_id: HashMap<i64, SearchCollectionMeta> = HashMap::new();
713        for target in &targets {
714            collections_by_id.insert(
715                target.collection.id,
716                SearchCollectionMeta {
717                    space: target.space.clone(),
718                    collection: target.collection.name.clone(),
719                },
720            );
721        }
722
723        let target_scopes = crate::profile::timed_search_stage("search_target_scopes", || {
724            self.search_target_scopes(&targets, !req.collections.is_empty())
725        })?;
726        crate::profile::increment_search_count("target_scopes", target_scopes.len() as u64);
727        for scope in &target_scopes {
728            crate::profile::increment_search_count("scope_chunks", scope.chunk_count as u64);
729            if scope.filtered {
730                crate::profile::increment_search_count("filtered_scopes", 1);
731                crate::profile::increment_search_count(
732                    "scope_document_filters",
733                    scope.document_ids.len() as u64,
734                );
735            }
736        }
737        let max_candidates = crate::profile::timed_search_stage("max_search_candidates", || {
738            self.max_search_candidates(&target_scopes)
739        });
740        let mut retrieval_limit =
741            self.initial_search_candidate_limit(&requested_mode, req.limit, rerank_enabled);
742        let final_candidates = loop {
743            crate::profile::increment_search_count("retrieval_iterations", 1);
744            let ranked_chunks = crate::profile::timed_search_stage("rank_chunks", || {
745                self.rank_chunks_for_mode(
746                    &requested_mode,
747                    &target_scopes,
748                    query,
749                    retrieval_limit,
750                    req.min_score,
751                    &mut pipeline,
752                )
753            })?;
754            crate::profile::increment_search_count("ranked_chunks", ranked_chunks.len() as u64);
755
756            if ranked_chunks.is_empty() {
757                return Ok(SearchResponse {
758                    results: Vec::new(),
759                    query: req.query,
760                    requested_mode: requested_mode.clone(),
761                    effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
762                    pipeline,
763                    staleness_hint,
764                    elapsed_ms: started.elapsed().as_millis() as u64,
765                });
766            }
767
768            let ranked_len = ranked_chunks.len();
769            let candidates = crate::profile::timed_search_stage("assemble_candidates", || {
770                self.search_candidates_from_ranked_chunks(ranked_chunks, &collections_by_id)
771            })?;
772            crate::profile::increment_search_count("assembled_candidates", candidates.len() as u64);
773
774            if candidates.len() >= req.limit
775                || ranked_len < retrieval_limit
776                || retrieval_limit >= max_candidates
777            {
778                break candidates;
779            }
780
781            let next_limit = retrieval_limit.saturating_mul(2).min(max_candidates);
782            if next_limit <= retrieval_limit {
783                break candidates;
784            }
785            retrieval_limit = next_limit;
786        };
787        let results = crate::profile::timed_search_stage("assemble_results", || {
788            self.assemble_search_candidates(
789                query,
790                &requested_mode,
791                final_candidates,
792                req.debug,
793                rerank_enabled,
794                &mut pipeline,
795                req.limit,
796            )
797        })?;
798        crate::profile::increment_search_count("assembled_results", results.len() as u64);
799
800        Ok(SearchResponse {
801            results,
802            query: req.query,
803            requested_mode: requested_mode.clone(),
804            effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
805            pipeline,
806            staleness_hint,
807            elapsed_ms: started.elapsed().as_millis() as u64,
808        })
809    }
810
811    pub fn resolve_space(&self, explicit: Option<&str>) -> Result<String> {
812        let resolved = self.resolve_space_row(explicit, None)?;
813        Ok(resolved.name)
814    }
815
816    pub fn current_space(&self, explicit: Option<&str>) -> Result<Option<ActiveSpace>> {
817        let resolved = self.resolve_preferred_space(explicit)?;
818        Ok(resolved.map(|(space, source)| ActiveSpace {
819            name: space.name,
820            source,
821        }))
822    }
823
824    pub fn status(&self, space: Option<&str>) -> Result<StatusResponse> {
825        let _lock = self.acquire_operation_lock(LockMode::Shared)?;
826        let (spaces, totals_scope) = if let Some(space_name) = space {
827            let resolved = self.resolve_space_row(Some(space_name), None)?;
828            (vec![resolved.clone()], Some(resolved.id))
829        } else {
830            (self.storage.list_spaces()?, None)
831        };
832
833        let mut space_statuses = Vec::with_capacity(spaces.len());
834        for space_row in spaces {
835            let collections = self.storage.list_collections(Some(space_row.id))?;
836            let mut collection_statuses = Vec::with_capacity(collections.len());
837            let mut last_updated: Option<String> = None;
838
839            for collection in collections {
840                if last_updated
841                    .as_ref()
842                    .map(|existing| collection.updated > *existing)
843                    .unwrap_or(true)
844                {
845                    last_updated = Some(collection.updated.clone());
846                }
847
848                let documents = self
849                    .storage
850                    .count_documents_in_collection(collection.id, false)?;
851                let active_documents = self
852                    .storage
853                    .count_documents_in_collection(collection.id, true)?;
854                let chunks = self.storage.count_chunks_in_collection(collection.id)?;
855                let embedded_chunks = self
856                    .storage
857                    .count_embedded_chunks_in_collection(collection.id)?;
858
859                collection_statuses.push(CollectionStatus {
860                    name: collection.name,
861                    path: collection.path,
862                    documents,
863                    active_documents,
864                    chunks,
865                    embedded_chunks,
866                    last_updated: collection.updated,
867                });
868            }
869
870            space_statuses.push(SpaceStatus {
871                name: space_row.name,
872                description: space_row.description,
873                collections: collection_statuses,
874                last_updated,
875            });
876        }
877
878        let models = self.model_status_unlocked()?;
879
880        Ok(StatusResponse {
881            spaces: space_statuses,
882            models,
883            cache_dir: self.config.cache_dir.clone(),
884            config_dir: self.config.config_dir.clone(),
885            total_documents: self.storage.count_documents(totals_scope)?,
886            total_chunks: self.storage.count_chunks(totals_scope)?,
887            total_embedded: self.storage.count_embedded_chunks(totals_scope)?,
888            disk_usage: self.storage.disk_usage()?,
889        })
890    }
891
892    pub fn model_status(&self) -> Result<ModelStatus> {
893        self.model_status_unlocked()
894    }
895
896    fn model_status_unlocked(&self) -> Result<ModelStatus> {
897        models::status(&self.config)
898    }
899
900    pub fn config(&self) -> &Config {
901        &self.config
902    }
903
904    pub fn storage(&self) -> &Storage {
905        &self.storage
906    }
907
908    fn embedding_model_key(&self) -> &str {
909        self.config
910            .roles
911            .embedder
912            .as_ref()
913            .and_then(|role| self.config.providers.get(&role.provider))
914            .map(|profile| profile.model())
915            .unwrap_or("embedder")
916    }
917
918    fn acquire_operation_lock(&self, mode: LockMode) -> Result<OperationLock> {
919        OperationLock::acquire(&self.config.cache_dir, mode)
920    }
921
922    fn collect_document_chunk_ids(&self, doc_ids: &[i64]) -> Result<Vec<i64>> {
923        let mut chunk_ids = Vec::new();
924        for doc_id in doc_ids {
925            let chunks = self.storage.get_chunks_for_document(*doc_id)?;
926            chunk_ids.extend(chunks.into_iter().map(|chunk| chunk.id));
927        }
928        Ok(chunk_ids)
929    }
930
931    fn purge_space_chunks(&self, space: &str, chunk_ids: &[i64]) -> Result<()> {
932        if chunk_ids.is_empty() {
933            return Ok(());
934        }
935
936        self.storage.delete_tantivy(space, chunk_ids)?;
937        self.storage.commit_tantivy(space)?;
938        self.storage.delete_usearch(space, chunk_ids)?;
939        Ok(())
940    }
941
942    fn build_space_info(&self, space: &crate::storage::SpaceRow) -> Result<SpaceInfo> {
943        let collection_count = self.storage.list_collections(Some(space.id))?.len();
944        let document_count = self.storage.count_documents(Some(space.id))?;
945        let chunk_count = self.storage.count_chunks(Some(space.id))?;
946
947        Ok(SpaceInfo {
948            name: space.name.clone(),
949            description: space.description.clone(),
950            collection_count,
951            document_count,
952            chunk_count,
953            created: space.created.clone(),
954        })
955    }
956
957    fn build_collection_info(
958        &self,
959        space_name: &str,
960        collection: &CollectionRow,
961    ) -> Result<CollectionInfo> {
962        let document_count = self
963            .storage
964            .count_documents_in_collection(collection.id, false)?;
965        let active_document_count = self
966            .storage
967            .count_documents_in_collection(collection.id, true)?;
968        let chunk_count = self.storage.count_chunks_in_collection(collection.id)?;
969        let embedded_chunk_count = self
970            .storage
971            .count_embedded_chunks_in_collection(collection.id)?;
972
973        Ok(CollectionInfo {
974            name: collection.name.clone(),
975            space: space_name.to_string(),
976            path: collection.path.clone(),
977            description: collection.description.clone(),
978            extensions: collection.extensions.clone(),
979            document_count,
980            active_document_count,
981            chunk_count,
982            embedded_chunk_count,
983            created: collection.created.clone(),
984            updated: collection.updated.clone(),
985        })
986    }
987
988    fn resolve_space_row(
989        &self,
990        explicit: Option<&str>,
991        collection_for_lookup: Option<&str>,
992    ) -> Result<crate::storage::SpaceRow> {
993        if let Some((space, _source)) = self.resolve_preferred_space(explicit)? {
994            return Ok(space);
995        }
996
997        if let Some(collection) = collection_for_lookup {
998            return match self.storage.find_space_for_collection(collection)? {
999                SpaceResolution::Found(space) => Ok(space),
1000                SpaceResolution::Ambiguous(spaces) => Err(KboltError::AmbiguousSpace {
1001                    collection: collection.to_string(),
1002                    spaces,
1003                }
1004                .into()),
1005                SpaceResolution::NotFound => Err(KboltError::CollectionNotFound {
1006                    name: collection.to_string(),
1007                }
1008                .into()),
1009            };
1010        }
1011
1012        Err(KboltError::NoActiveSpace.into())
1013    }
1014
1015    fn resolve_preferred_space(
1016        &self,
1017        explicit: Option<&str>,
1018    ) -> Result<Option<(crate::storage::SpaceRow, ActiveSpaceSource)>> {
1019        if let Some(space_name) = explicit {
1020            let space = self.storage.get_space(space_name)?;
1021            return Ok(Some((space, ActiveSpaceSource::Flag)));
1022        }
1023
1024        if let Ok(space_name) = std::env::var("KBOLT_SPACE") {
1025            let trimmed = space_name.trim();
1026            if !trimmed.is_empty() {
1027                let space = self.storage.get_space(trimmed)?;
1028                return Ok(Some((space, ActiveSpaceSource::EnvVar)));
1029            }
1030        }
1031
1032        if let Some(space_name) = self.config.default_space.as_deref() {
1033            let space = self.storage.get_space(space_name)?;
1034            return Ok(Some((space, ActiveSpaceSource::ConfigDefault)));
1035        }
1036
1037        Ok(None)
1038    }
1039}
1040
1041fn source_is_stale(path: &Path, indexed_hash: &str) -> Result<bool> {
1042    match std::fs::read(path) {
1043        Ok(bytes) => Ok(sha256_hex(&bytes) != indexed_hash),
1044        Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(true),
1045        Err(err) => Err(std::io::Error::new(
1046            err.kind(),
1047            format!(
1048                "failed to read indexed source '{}' for stale check: {err}",
1049                path.display()
1050            ),
1051        )
1052        .into()),
1053    }
1054}
1055
1056#[cfg(test)]
1057mod tests;
1058
#[cfg(test)]
// Regression tests: read-only engine APIs must keep working while another
// handle holds an exclusive lock on `<cache_dir>/kbolt.lock`.
mod lock_relief_tests {
    use fs2::FileExt;
    use std::collections::HashMap;
    use std::fs::OpenOptions;
    use std::mem;

    use tempfile::tempdir;

    use super::Engine;
    use crate::config::{ChunkingConfig, Config, RankingConfig, ReapingConfig};
    use crate::storage::Storage;
    use kbolt_types::{AddCollectionRequest, GetRequest, Locator, MultiGetRequest, UpdateOptions};

    // Space/collection metadata and model status must be readable while the
    // global lock is held by someone else.
    #[test]
    fn metadata_reads_succeed_while_global_lock_is_held() {
        let engine = test_engine_with_indexed_collection();
        // `_holder` keeps the exclusive lock for the rest of the test.
        let _holder = hold_global_lock(&engine);

        // The listing includes "default" even though the fixture only creates
        // "work"; presumably storage always provisions a default space.
        let spaces = engine.list_spaces().expect("list spaces");
        assert_eq!(
            spaces
                .iter()
                .map(|space| space.name.as_str())
                .collect::<Vec<_>>(),
            vec!["default", "work"]
        );

        let space = engine.space_info("work").expect("load space info");
        assert_eq!(space.name, "work");
        assert_eq!(space.collection_count, 1);

        let collections = engine
            .list_collections(Some("work"))
            .expect("list collections");
        assert_eq!(collections.len(), 1);
        assert_eq!(collections[0].name, "api");

        let collection = engine
            .collection_info(Some("work"), "api")
            .expect("load collection info");
        assert_eq!(collection.name, "api");
        assert_eq!(collection.document_count, 1);

        // The fixture uses `RoleBindingsConfig::default()`, which presumably
        // binds no model roles, so nothing reports as configured.
        let models = engine.model_status().expect("read model status");
        assert!(!models.embedder.configured);
        assert!(!models.reranker.configured);
        assert!(!models.expander.configured);
    }

    // Listing files and reading document contents (singly and in bulk) must
    // also succeed while the global lock is held.
    #[test]
    fn document_reads_succeed_while_global_lock_is_held() {
        let engine = test_engine_with_indexed_collection();
        let _holder = hold_global_lock(&engine);

        let files = engine
            .list_files(Some("work"), "api", None)
            .expect("list files");
        assert_eq!(files.len(), 1);
        assert_eq!(files[0].path, "guide.md");

        // Path locators are "<collection>/<relative path>".
        let document = engine
            .get_document(GetRequest {
                locator: Locator::Path("api/guide.md".to_string()),
                space: Some("work".to_string()),
                offset: None,
                limit: None,
            })
            .expect("read document");
        assert_eq!(document.path, "api/guide.md");
        assert!(document.content.contains("hello from kbolt"));

        let response = engine
            .multi_get(MultiGetRequest {
                locators: vec![Locator::Path("api/guide.md".to_string())],
                space: Some("work".to_string()),
                max_files: 4,
                max_bytes: 4_096,
            })
            .expect("read multiple documents");
        assert_eq!(response.resolved_count, 1);
        assert_eq!(response.documents.len(), 1);
        assert!(response.warnings.is_empty());
    }

    // Builds an engine over a fresh temp directory containing one markdown
    // file, registered as collection "api" in space "work" and indexed.
    fn test_engine_with_indexed_collection() -> Engine {
        let root = tempdir().expect("create temp root");
        let root_path = root.path().to_path_buf();
        // Leak the TempDir guard so the directory is not deleted when `root`
        // goes out of scope; the returned engine keeps using these paths.
        mem::forget(root);

        let config_dir = root_path.join("config");
        let cache_dir = root_path.join("cache");
        let docs_dir = root_path.join("docs");
        std::fs::create_dir_all(&docs_dir).expect("create docs dir");
        std::fs::write(docs_dir.join("guide.md"), "# Hello\n\nhello from kbolt\n")
            .expect("write document");

        let storage = Storage::new(&cache_dir).expect("create storage");
        let config = Config {
            config_dir,
            cache_dir,
            default_space: None,
            providers: HashMap::new(),
            roles: crate::config::RoleBindingsConfig::default(),
            reaping: ReapingConfig { days: 7 },
            chunking: ChunkingConfig::default(),
            ranking: RankingConfig::default(),
        };
        let engine = Engine::from_parts(storage, config);

        // Register without indexing, then index explicitly with embedding
        // disabled, since no embedder provider is configured above.
        engine
            .add_collection(AddCollectionRequest {
                path: docs_dir,
                space: Some("work".to_string()),
                name: Some("api".to_string()),
                description: None,
                extensions: None,
                no_index: true,
            })
            .expect("add collection");
        engine
            .update(UpdateOptions {
                space: Some("work".to_string()),
                collections: vec!["api".to_string()],
                no_embed: true,
                dry_run: false,
                verbose: false,
            })
            .expect("index collection");

        engine
    }

    // Opens `<cache_dir>/kbolt.lock` and takes an exclusive fs2 file lock on
    // it, simulating another process holding the global lock. The returned
    // `File` must stay alive for as long as the lock should be held.
    // NOTE(review): assumes this is the same file `OperationLock` locks; confirm.
    fn hold_global_lock(engine: &Engine) -> std::fs::File {
        std::fs::create_dir_all(&engine.config().cache_dir).expect("create cache dir");
        // `truncate(false)`: open (or create) the lock file without clearing it.
        let holder = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(engine.config().cache_dir.join("kbolt.lock"))
            .expect("open lock file");
        FileExt::try_lock_exclusive(&holder).expect("acquire global lock");
        holder
    }
}