// kbolt_core/engine.rs

1use std::collections::{HashMap, HashSet};
2use std::path::Path;
3use std::sync::{Arc, Mutex};
4use std::time::Instant;
5
6use crate::config;
7use crate::config::Config;
8use crate::error::CoreError;
9use crate::ingest::canonical::build_canonical_document;
10use crate::ingest::chunk::{
11    chunk_canonical_document, chunk_canonical_document_with_counter, resolve_policy,
12};
13use crate::ingest::extract::default_registry;
14use crate::lock::{LockMode, OperationLock};
15use crate::models;
16use crate::retrieval_context::{EmbeddingDocumentInput, DENSE_DOCUMENT_RENDER_IDENTITY};
17use crate::storage::Storage;
18use crate::storage::{
19    ChunkInsert, ChunkRow, CollectionRow, DocumentGenerationReplace, DocumentRow, DocumentTextRow,
20    SpaceResolution, TantivyEntry,
21};
22use crate::Result;
23use kbolt_types::{
24    ActiveSpace, ActiveSpaceSource, AddCollectionRequest, AddCollectionResult, CollectionInfo,
25    CollectionStatus, DocumentResponse, FileEntry, GetRequest, InitialIndexingBlock,
26    InitialIndexingOutcome, KboltError, Locator, ModelStatus, MultiGetRequest, MultiGetResponse,
27    OmitReason, OmittedFile, SearchMode, SearchPipeline, SearchPipelineNotice, SearchPipelineStep,
28    SearchPipelineUnavailableReason, SearchRequest, SearchResponse, SearchResult, SearchSignals,
29    SpaceInfo, SpaceStatus, StatusResponse, UpdateOptions, UpdateReport,
30};
31mod eval_ops;
32mod file_utils;
33pub(crate) mod ignore_helpers;
34mod ignore_ops;
35mod path_utils;
36mod schedule_ops;
37mod schedule_run_ops;
38mod schedule_status_ops;
39mod scoring;
40mod search_ops;
41mod text_helpers;
42mod update_ops;
43use file_utils::{file_error, modified_token, sha256_hex};
44use ignore_helpers::{
45    collection_ignore_file_path, count_ignore_patterns, is_hard_ignored_file,
46    load_collection_ignore_matcher, validate_ignore_pattern,
47};
48use path_utils::{
49    collection_relative_path, extension_allowed, normalize_docid, normalize_list_prefix,
50    normalized_extension_filter, path_matches_prefix, short_docid, split_collection_path,
51};
52use scoring::{dense_distance_to_score, max_option};
53#[cfg(test)]
54pub(crate) use text_helpers::retrieval_text_with_prefix;
55use text_helpers::search_text_with_loaded_canonical_neighbors;
56
57pub struct Engine {
58    storage: Storage,
59    config: Config,
60    embedder: Option<Arc<dyn models::Embedder>>,
61    embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
62    reranker: Option<Arc<dyn models::Reranker>>,
63    expander: Option<Arc<dyn models::Expander>>,
64}
65
/// One row of an ignore listing: a collection, the space it lives in, and a pattern count.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IgnoreListEntry {
    /// Name of the space that owns the collection.
    pub space: String,
    /// Name of the collection.
    pub collection: String,
    /// NOTE(review): presumably the number of ignore patterns configured for the
    /// collection; the producer is not visible here, confirm.
    pub pattern_count: usize,
}
72
73#[derive(Debug, Clone, PartialEq, Eq)]
74pub struct UpdateTarget {
75    pub space: String,
76    pub collection: CollectionRow,
77}
78
/// Borrowed selection criteria handed to target resolution: an optional space
/// name plus a list of collection names.
#[derive(Debug, Clone, Copy)]
struct TargetScope<'a> {
    /// Explicitly requested space, if any.
    space: Option<&'a str>,
    /// Requested collection names.
    /// NOTE(review): an empty slice presumably means "no collection filter"; confirm in `resolve_targets`.
    collections: &'a [String],
}
84
/// Display names attached to a collection id while assembling search results.
#[derive(Debug, Clone)]
struct SearchCollectionMeta {
    /// Name of the space that owns the collection.
    space: String,
    /// Name of the collection.
    collection: String,
}
90
/// Per-space search scope: which collections (and optionally which documents)
/// a query may touch, plus two mutex-guarded pieces of per-search state.
#[derive(Debug)]
struct SearchTargetScope {
    /// Name of the space this scope covers.
    space: String,
    /// Ids of the collections included in the scope.
    collection_ids: Vec<i64>,
    /// Whether the scope is restricted; `search` only reports document filter
    /// counts for scopes where this is set.
    filtered: bool,
    /// Document ids associated with the scope (counted by `search` when `filtered`).
    document_ids: Vec<i64>,
    /// Number of chunks in the scope.
    chunk_count: usize,
    /// NOTE(review): presumably a lazily built set of allowed chunk keys shared
    /// via `Arc`; it is populated outside this view, confirm.
    chunk_key_filter: Mutex<Option<Arc<HashSet<u64>>>>,
    /// NOTE(review): presumably records that the BM25 index was reloaded for
    /// this scope during the current search; confirm against its writer.
    bm25_reloaded: Mutex<bool>,
}
101
/// A chunk hit together with its BM25 score.
#[derive(Debug, Clone)]
struct SearchHitCandidate {
    /// Id of the matching chunk.
    chunk_id: i64,
    /// BM25 score reported for the chunk.
    bm25_score: f32,
}
107
/// A chunk after ranking, carrying the overall score alongside the individual
/// signals that may have contributed to it.
#[derive(Debug, Clone)]
struct RankedChunk {
    /// Id of the ranked chunk.
    chunk_id: i64,
    /// Overall score used for ordering.
    score: f32,
    /// Fusion score component.
    fusion: f32,
    /// Reranker score, when a reranker produced one.
    reranker: Option<f32>,
    /// BM25 score, when the chunk had one.
    bm25: Option<f32>,
    /// Dense (embedding) score, when the chunk had one.
    dense: Option<f32>,
    /// NOTE(review): presumably the chunk's position before reranking; confirm.
    original_rank: Option<usize>,
}
118
119impl Engine {
120    pub fn new(config_path: Option<&Path>) -> Result<Self> {
121        Self::new_with_recovery_notice(config_path, None)
122    }
123
124    pub fn new_with_recovery_notice(
125        config_path: Option<&Path>,
126        recovery_notice: Option<crate::RecoveryNoticeSink>,
127    ) -> Result<Self> {
128        let config = config::load(config_path)?;
129        let storage = Storage::new(&config.cache_dir)?;
130        let built_models =
131            models::build_inference_clients_with_recovery_notice(&config, recovery_notice)?;
132        Ok(Self {
133            storage,
134            config,
135            embedder: built_models.embedder,
136            embedding_document_sizer: built_models.embedding_document_sizer,
137            reranker: built_models.reranker,
138            expander: built_models.expander,
139        })
140    }
141
142    #[cfg(test)]
143    pub(crate) fn from_parts(storage: Storage, config: Config) -> Self {
144        Self::from_parts_with_models(storage, config, None, None, None)
145    }
146
147    #[cfg(test)]
148    pub(crate) fn from_parts_with_embedder(
149        storage: Storage,
150        config: Config,
151        embedder: Option<Arc<dyn models::Embedder>>,
152    ) -> Self {
153        Self::from_parts_with_inference(storage, config, embedder, None, None, None)
154    }
155
156    #[cfg(test)]
157    pub(crate) fn from_parts_with_embedding_runtime(
158        storage: Storage,
159        config: Config,
160        embedder: Option<Arc<dyn models::Embedder>>,
161        embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
162    ) -> Self {
163        Self::from_parts_with_inference(
164            storage,
165            config,
166            embedder,
167            embedding_document_sizer,
168            None,
169            None,
170        )
171    }
172
173    #[cfg(test)]
174    pub(crate) fn from_parts_with_models(
175        storage: Storage,
176        config: Config,
177        embedder: Option<Arc<dyn models::Embedder>>,
178        reranker: Option<Arc<dyn models::Reranker>>,
179        expander: Option<Arc<dyn models::Expander>>,
180    ) -> Self {
181        Self::from_parts_with_inference(storage, config, embedder, None, reranker, expander)
182    }
183
184    #[cfg(test)]
185    fn from_parts_with_inference(
186        storage: Storage,
187        config: Config,
188        embedder: Option<Arc<dyn models::Embedder>>,
189        embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
190        reranker: Option<Arc<dyn models::Reranker>>,
191        expander: Option<Arc<dyn models::Expander>>,
192    ) -> Self {
193        let built_models =
194            models::build_inference_clients(&config).expect("build inference models");
195        Self {
196            storage,
197            config,
198            embedder: embedder.or(built_models.embedder),
199            embedding_document_sizer: embedding_document_sizer
200                .or(built_models.embedding_document_sizer),
201            reranker: reranker.or(built_models.reranker),
202            expander: expander.or(built_models.expander),
203        }
204    }
205
206    pub fn add_space(&self, name: &str, description: Option<&str>) -> Result<SpaceInfo> {
207        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
208        self.storage.create_space(name, description)?;
209        let space = self.storage.get_space(name)?;
210        self.build_space_info(&space)
211    }
212
213    pub fn remove_space(&mut self, name: &str) -> Result<()> {
214        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
215        self.storage.delete_space(name)?;
216        if self.config.default_space.as_deref() == Some(name) {
217            self.persist_default_space(None)?;
218        }
219        Ok(())
220    }
221
222    pub fn rename_space(&mut self, old: &str, new: &str) -> Result<()> {
223        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
224        self.storage.rename_space(old, new)?;
225        if self.config.default_space.as_deref() == Some(old) {
226            self.persist_default_space(Some(new.to_string()))?;
227        }
228        Ok(())
229    }
230
231    pub fn describe_space(&self, name: &str, description: &str) -> Result<()> {
232        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
233        self.storage.update_space_description(name, description)
234    }
235
236    pub fn list_spaces(&self) -> Result<Vec<SpaceInfo>> {
237        let spaces = self.storage.list_spaces()?;
238        let mut infos = Vec::with_capacity(spaces.len());
239        for space in spaces {
240            infos.push(self.build_space_info(&space)?);
241        }
242        Ok(infos)
243    }
244
245    pub fn space_info(&self, name: &str) -> Result<SpaceInfo> {
246        let space = self.storage.get_space(name)?;
247        self.build_space_info(&space)
248    }
249
250    pub fn set_default_space(&mut self, name: Option<&str>) -> Result<Option<String>> {
251        if let Some(space_name) = name {
252            self.storage.get_space(space_name)?;
253        }
254
255        self.persist_default_space(name.map(ToString::to_string))?;
256        Ok(self.config.default_space.clone())
257    }
258
259    fn persist_default_space(&mut self, default_space: Option<String>) -> Result<()> {
260        let previous = self.config.default_space.clone();
261        self.config.default_space = default_space;
262        if let Err(err) = config::save(&self.config) {
263            self.config.default_space = previous;
264            return Err(err);
265        }
266        Ok(())
267    }
268
269    pub fn add_collection(&self, req: AddCollectionRequest) -> Result<AddCollectionResult> {
270        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
271        let AddCollectionRequest {
272            path,
273            space: requested_space,
274            name: requested_name,
275            description,
276            extensions,
277            no_index,
278        } = req;
279
280        let space = match requested_space.as_deref() {
281            Some(space_name) => match self.storage.get_space(space_name) {
282                Ok(space) => space,
283                Err(CoreError::Domain(KboltError::SpaceNotFound { .. })) => {
284                    self.storage.create_space(space_name, None)?;
285                    self.storage.get_space(space_name)?
286                }
287                Err(err) => return Err(err),
288            },
289            None => self.resolve_space_row(None, None)?,
290        };
291        if !path.is_absolute() || !path.is_dir() {
292            return Err(KboltError::InvalidPath(path).into());
293        }
294
295        let name = match requested_name {
296            Some(name) => name,
297            None => path
298                .file_name()
299                .and_then(|name| name.to_str())
300                .map(ToString::to_string)
301                .ok_or_else(|| KboltError::InvalidPath(path.clone()))?,
302        };
303
304        self.storage.create_collection(
305            space.id,
306            &name,
307            &path,
308            description.as_deref(),
309            extensions.as_deref(),
310        )?;
311
312        let initial_indexing = if no_index {
313            InitialIndexingOutcome::Skipped
314        } else {
315            match self.update_unlocked(UpdateOptions {
316                space: Some(space.name.clone()),
317                collections: vec![name.clone()],
318                no_embed: false,
319                dry_run: false,
320                verbose: false,
321            }) {
322                Ok(report) => InitialIndexingOutcome::Indexed(report),
323                Err(CoreError::Domain(KboltError::SpaceDenseRepairRequired { space, reason })) => {
324                    InitialIndexingOutcome::Blocked(
325                        InitialIndexingBlock::SpaceDenseRepairRequired { space, reason },
326                    )
327                }
328                Err(CoreError::Domain(KboltError::ModelNotAvailable { name })) => {
329                    InitialIndexingOutcome::Blocked(InitialIndexingBlock::ModelNotAvailable {
330                        name,
331                    })
332                }
333                Err(err) => return Err(err),
334            }
335        };
336
337        let collection = self.storage.get_collection(space.id, &name)?;
338        Ok(AddCollectionResult {
339            collection: self.build_collection_info(&space.name, &collection)?,
340            initial_indexing,
341        })
342    }
343
344    pub fn remove_collection(&self, space: Option<&str>, name: &str) -> Result<()> {
345        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
346        let resolved = self.resolve_space_row(space, Some(name))?;
347        let collection = self.storage.get_collection(resolved.id, name)?;
348        let documents = self.storage.list_documents(collection.id, false)?;
349        let doc_ids = documents.into_iter().map(|doc| doc.id).collect::<Vec<_>>();
350        let chunk_ids = self.collect_document_chunk_ids(&doc_ids)?;
351        self.purge_space_chunks(&resolved.name, &chunk_ids)?;
352        self.storage.delete_collection(resolved.id, name)?;
353
354        let ignore_path =
355            collection_ignore_file_path(&self.config.config_dir, &resolved.name, name);
356        if ignore_path.is_file() {
357            std::fs::remove_file(ignore_path)?;
358        }
359
360        Ok(())
361    }
362
363    pub fn rename_collection(&self, space: Option<&str>, old: &str, new: &str) -> Result<()> {
364        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
365        let resolved = self.resolve_space_row(space, Some(old))?;
366        let old_ignore_path =
367            collection_ignore_file_path(&self.config.config_dir, &resolved.name, old);
368        let new_ignore_path =
369            collection_ignore_file_path(&self.config.config_dir, &resolved.name, new);
370        if old_ignore_path.is_file() && new_ignore_path.exists() {
371            return Err(KboltError::Internal(format!(
372                "cannot rename ignore file: destination already exists: {}",
373                new_ignore_path.display()
374            ))
375            .into());
376        }
377
378        self.storage.rename_collection(resolved.id, old, new)?;
379
380        if old_ignore_path.is_file() {
381            if let Some(parent) = new_ignore_path.parent() {
382                std::fs::create_dir_all(parent)?;
383            }
384            if let Err(rename_err) = std::fs::rename(&old_ignore_path, &new_ignore_path) {
385                match self.storage.rename_collection(resolved.id, new, old) {
386                    Ok(()) => {
387                        return Err(KboltError::Internal(format!(
388                            "renamed collection was rolled back after ignore rename failure: {rename_err}"
389                        ))
390                        .into())
391                    }
392                    Err(rollback_err) => {
393                        return Err(KboltError::Internal(format!(
394                            "ignore rename failed: {rename_err}; rollback failed: {rollback_err}"
395                        ))
396                        .into())
397                    }
398                }
399            }
400        }
401
402        Ok(())
403    }
404
405    pub fn describe_collection(&self, space: Option<&str>, name: &str, desc: &str) -> Result<()> {
406        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
407        let resolved = self.resolve_space_row(space, Some(name))?;
408        self.storage
409            .update_collection_description(resolved.id, name, desc)
410    }
411
412    pub fn list_collections(&self, space: Option<&str>) -> Result<Vec<CollectionInfo>> {
413        let (space_id_filter, spaces_by_id) = if let Some(space_name) = space {
414            let resolved = self.resolve_space_row(Some(space_name), None)?;
415            let mut map = std::collections::HashMap::new();
416            map.insert(resolved.id, resolved.name.clone());
417            (Some(resolved.id), map)
418        } else {
419            let spaces = self.storage.list_spaces()?;
420            let map = spaces
421                .into_iter()
422                .map(|space| (space.id, space.name))
423                .collect::<std::collections::HashMap<_, _>>();
424            (None, map)
425        };
426
427        let collections = self.storage.list_collections(space_id_filter)?;
428        let mut infos = Vec::with_capacity(collections.len());
429        for collection in collections {
430            let space_name = spaces_by_id
431                .get(&collection.space_id)
432                .ok_or_else(|| {
433                    KboltError::Internal(format!(
434                        "missing space mapping for collection '{}'",
435                        collection.name
436                    ))
437                })?
438                .clone();
439            infos.push(self.build_collection_info(&space_name, &collection)?);
440        }
441        Ok(infos)
442    }
443
444    pub fn collection_info(&self, space: Option<&str>, name: &str) -> Result<CollectionInfo> {
445        let resolved = self.resolve_space_row(space, Some(name))?;
446        let collection = self.storage.get_collection(resolved.id, name)?;
447        self.build_collection_info(&resolved.name, &collection)
448    }
449
450    pub fn list_files(
451        &self,
452        space: Option<&str>,
453        collection: &str,
454        prefix: Option<&str>,
455    ) -> Result<Vec<FileEntry>> {
456        let resolved_space = self.resolve_space_row(space, Some(collection))?;
457        let collection_row = self.storage.get_collection(resolved_space.id, collection)?;
458        let normalized_prefix = normalize_list_prefix(prefix)?;
459        let file_rows = self
460            .storage
461            .list_collection_file_rows(collection_row.id, false)?;
462
463        let mut files = Vec::with_capacity(file_rows.len());
464        for file_row in file_rows {
465            if let Some(prefix) = normalized_prefix.as_deref() {
466                if !path_matches_prefix(&file_row.path, prefix) {
467                    continue;
468                }
469            }
470
471            let title =
472                crate::engine::file_utils::display_title(file_row.title.as_deref(), &file_row.path);
473            files.push(FileEntry {
474                path: file_row.path,
475                title,
476                docid: short_docid(&file_row.hash),
477                active: file_row.active,
478                chunk_count: file_row.chunk_count,
479                embedded: file_row.chunk_count > 0
480                    && file_row.embedded_chunk_count >= file_row.chunk_count,
481            });
482        }
483
484        Ok(files)
485    }
486
487    pub fn get_document(&self, req: GetRequest) -> Result<DocumentResponse> {
488        self.get_document_unlocked(req)
489    }
490
491    fn get_document_unlocked(&self, req: GetRequest) -> Result<DocumentResponse> {
492        let GetRequest {
493            locator,
494            space,
495            offset,
496            limit,
497        } = req;
498
499        let (document, collection_row, space_name) = match locator {
500            Locator::Path(locator_path) => {
501                let (collection_name, relative_path) = split_collection_path(&locator_path)?;
502                let resolved_space =
503                    self.resolve_space_row(space.as_deref(), Some(&collection_name))?;
504                let collection = self
505                    .storage
506                    .get_collection(resolved_space.id, &collection_name)?;
507                let document = self
508                    .storage
509                    .get_document_by_path(collection.id, &relative_path)?
510                    .ok_or_else(|| KboltError::DocumentNotFound {
511                        path: locator_path.clone(),
512                    })?;
513                (document, collection, resolved_space.name)
514            }
515            Locator::DocId(docid) => {
516                let prefix = normalize_docid(&docid)?;
517                let mut candidates = self
518                    .storage
519                    .get_document_by_hash_prefix(&prefix)?
520                    .into_iter()
521                    .map(|document| {
522                        let collection =
523                            self.storage.get_collection_by_id(document.collection_id)?;
524                        Ok((document, collection))
525                    })
526                    .collect::<Result<Vec<_>>>()?;
527
528                if let Some(space_name) = space.as_deref() {
529                    let resolved_space = self.resolve_space_row(Some(space_name), None)?;
530                    candidates.retain(|(_, collection)| collection.space_id == resolved_space.id);
531                }
532
533                if candidates.is_empty() {
534                    return Err(KboltError::DocumentNotFound {
535                        path: format!("#{prefix}"),
536                    }
537                    .into());
538                }
539
540                if candidates.len() > 1 {
541                    return Err(KboltError::InvalidInput(
542                        "docid is ambiguous; provide more characters".to_string(),
543                    )
544                    .into());
545                }
546
547                let (document, collection) = candidates.pop().expect("candidate exists");
548                let space = self.storage.get_space_by_id(collection.space_id)?;
549                (document, collection, space.name)
550            }
551        };
552
553        let document_text = self.storage.get_document_text(document.id)?;
554        let full_path = collection_row.path.join(&document.path);
555        let stale = source_is_stale(&full_path, document.hash.as_str())?;
556
557        let lines = document_text.text.lines().collect::<Vec<_>>();
558        let total_lines = lines.len();
559        let start = offset.unwrap_or(0).min(total_lines);
560        let requested = limit.unwrap_or(total_lines.saturating_sub(start));
561        let end = start.saturating_add(requested).min(total_lines);
562        let returned_lines = end.saturating_sub(start);
563        let content = if returned_lines == 0 {
564            String::new()
565        } else {
566            lines[start..end].join("\n")
567        };
568
569        Ok(DocumentResponse {
570            docid: short_docid(&document.hash),
571            path: format!("{}/{}", collection_row.name, document.path),
572            title: crate::engine::file_utils::display_title(
573                document.title.as_deref(),
574                &document.path,
575            ),
576            space: space_name,
577            collection: collection_row.name,
578            content,
579            stale,
580            total_lines,
581            returned_lines,
582        })
583    }
584
585    pub fn multi_get(&self, req: MultiGetRequest) -> Result<MultiGetResponse> {
586        if req.max_files == 0 {
587            return Err(
588                KboltError::InvalidInput("max_files must be greater than 0".to_string()).into(),
589            );
590        }
591        if req.max_bytes == 0 {
592            return Err(
593                KboltError::InvalidInput("max_bytes must be greater than 0".to_string()).into(),
594            );
595        }
596
597        let mut documents = Vec::new();
598        let mut omitted = Vec::new();
599        let mut resolved_count = 0usize;
600        let mut warnings = Vec::new();
601        let mut consumed_bytes = 0usize;
602
603        for locator in req.locators {
604            let document = match self.get_document_unlocked(GetRequest {
605                locator,
606                space: req.space.clone(),
607                offset: None,
608                limit: None,
609            }) {
610                Ok(document) => document,
611                Err(err) => match KboltError::from(err) {
612                    KboltError::DocumentNotFound { path } => {
613                        warnings.push(format!("document not found: {path}"));
614                        continue;
615                    }
616                    KboltError::InvalidInput(message) => {
617                        warnings.push(format!("invalid locator: {message}"));
618                        continue;
619                    }
620                    KboltError::InvalidPath(path) => {
621                        warnings.push(format!("invalid locator path: {}", path.display()));
622                        continue;
623                    }
624                    KboltError::AmbiguousSpace { collection, spaces } => {
625                        warnings.push(format!(
626                            "ambiguous locator space for collection '{collection}': {:?}. use --space to disambiguate.",
627                            spaces
628                        ));
629                        continue;
630                    }
631                    KboltError::SpaceNotFound { name } => {
632                        warnings.push(format!("space not found for locator: {name}"));
633                        continue;
634                    }
635                    KboltError::CollectionNotFound { name } => {
636                        warnings.push(format!("collection not found for locator: {name}"));
637                        continue;
638                    }
639                    other => return Err(other.into()),
640                },
641            };
642            resolved_count = resolved_count.saturating_add(1);
643
644            let size_bytes = document.content.len();
645            if documents.len() >= req.max_files {
646                omitted.push(OmittedFile {
647                    path: document.path,
648                    docid: document.docid,
649                    size_bytes,
650                    reason: OmitReason::MaxFiles,
651                });
652                continue;
653            }
654
655            if consumed_bytes.saturating_add(size_bytes) > req.max_bytes {
656                omitted.push(OmittedFile {
657                    path: document.path,
658                    docid: document.docid,
659                    size_bytes,
660                    reason: OmitReason::MaxBytes,
661                });
662                continue;
663            }
664
665            consumed_bytes = consumed_bytes.saturating_add(size_bytes);
666            documents.push(document);
667        }
668
669        Ok(MultiGetResponse {
670            resolved_count,
671            documents,
672            omitted,
673            warnings,
674        })
675    }
676
677    pub fn search(&self, req: SearchRequest) -> Result<SearchResponse> {
678        let _profile = crate::profile::SearchProfileGuard::start();
679        let _lock = crate::profile::timed_search_stage("operation_lock", || {
680            self.acquire_operation_lock(LockMode::Shared)
681        })?;
682        let started = Instant::now();
683        let query = req.query.trim();
684        if query.is_empty() {
685            return Err(KboltError::InvalidInput("query cannot be empty".to_string()).into());
686        }
687
688        let requested_mode = req.mode.clone();
689        let rerank_enabled =
690            matches!(requested_mode, SearchMode::Auto | SearchMode::Deep) && !req.no_rerank;
691        let mut pipeline = self.initial_search_pipeline(&requested_mode);
692
693        let targets = crate::profile::timed_search_stage("resolve_targets", || {
694            self.resolve_targets(TargetScope {
695                space: req.space.as_deref(),
696                collections: &req.collections,
697            })
698        })?;
699        crate::profile::increment_search_count("target_collections", targets.len() as u64);
700
701        let staleness_hint = targets
702            .iter()
703            .map(|target| target.collection.updated.clone())
704            .max()
705            .map(|updated| format!("Index last updated: {updated}"));
706
707        if req.limit == 0 || targets.is_empty() {
708            return Ok(SearchResponse {
709                results: Vec::new(),
710                query: req.query,
711                requested_mode: requested_mode.clone(),
712                effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
713                pipeline,
714                staleness_hint,
715                elapsed_ms: started.elapsed().as_millis() as u64,
716            });
717        }
718
719        let mut collections_by_id: HashMap<i64, SearchCollectionMeta> = HashMap::new();
720        for target in &targets {
721            collections_by_id.insert(
722                target.collection.id,
723                SearchCollectionMeta {
724                    space: target.space.clone(),
725                    collection: target.collection.name.clone(),
726                },
727            );
728        }
729
730        let target_scopes = crate::profile::timed_search_stage("search_target_scopes", || {
731            self.search_target_scopes(&targets, !req.collections.is_empty())
732        })?;
733        crate::profile::increment_search_count("target_scopes", target_scopes.len() as u64);
734        for scope in &target_scopes {
735            crate::profile::increment_search_count("scope_chunks", scope.chunk_count as u64);
736            if scope.filtered {
737                crate::profile::increment_search_count("filtered_scopes", 1);
738                crate::profile::increment_search_count(
739                    "scope_document_filters",
740                    scope.document_ids.len() as u64,
741                );
742            }
743        }
744        let max_candidates = crate::profile::timed_search_stage("max_search_candidates", || {
745            self.max_search_candidates(&target_scopes)
746        });
747        let mut retrieval_limit =
748            self.initial_search_candidate_limit(&requested_mode, req.limit, rerank_enabled);
749        let final_candidates = loop {
750            crate::profile::increment_search_count("retrieval_iterations", 1);
751            let ranked_chunks = crate::profile::timed_search_stage("rank_chunks", || {
752                self.rank_chunks_for_mode(
753                    &requested_mode,
754                    &target_scopes,
755                    query,
756                    retrieval_limit,
757                    req.min_score,
758                    &mut pipeline,
759                )
760            })?;
761            crate::profile::increment_search_count("ranked_chunks", ranked_chunks.len() as u64);
762
763            if ranked_chunks.is_empty() {
764                return Ok(SearchResponse {
765                    results: Vec::new(),
766                    query: req.query,
767                    requested_mode: requested_mode.clone(),
768                    effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
769                    pipeline,
770                    staleness_hint,
771                    elapsed_ms: started.elapsed().as_millis() as u64,
772                });
773            }
774
775            let ranked_len = ranked_chunks.len();
776            let candidates = crate::profile::timed_search_stage("assemble_candidates", || {
777                self.search_candidates_from_ranked_chunks(ranked_chunks, &collections_by_id)
778            })?;
779            crate::profile::increment_search_count("assembled_candidates", candidates.len() as u64);
780
781            if candidates.len() >= req.limit
782                || ranked_len < retrieval_limit
783                || retrieval_limit >= max_candidates
784            {
785                break candidates;
786            }
787
788            let next_limit = retrieval_limit.saturating_mul(2).min(max_candidates);
789            if next_limit <= retrieval_limit {
790                break candidates;
791            }
792            retrieval_limit = next_limit;
793        };
794        let results = crate::profile::timed_search_stage("assemble_results", || {
795            self.assemble_search_candidates(
796                query,
797                &requested_mode,
798                final_candidates,
799                req.debug,
800                rerank_enabled,
801                &mut pipeline,
802                req.limit,
803            )
804        })?;
805        crate::profile::increment_search_count("assembled_results", results.len() as u64);
806
807        Ok(SearchResponse {
808            results,
809            query: req.query,
810            requested_mode: requested_mode.clone(),
811            effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
812            pipeline,
813            staleness_hint,
814            elapsed_ms: started.elapsed().as_millis() as u64,
815        })
816    }
817
818    pub fn resolve_space(&self, explicit: Option<&str>) -> Result<String> {
819        let resolved = self.resolve_space_row(explicit, None)?;
820        Ok(resolved.name)
821    }
822
823    pub fn current_space(&self, explicit: Option<&str>) -> Result<Option<ActiveSpace>> {
824        let resolved = self.resolve_preferred_space(explicit)?;
825        Ok(resolved.map(|(space, source)| ActiveSpace {
826            name: space.name,
827            source,
828        }))
829    }
830
831    pub fn status(&self, space: Option<&str>) -> Result<StatusResponse> {
832        let _lock = self.acquire_operation_lock(LockMode::Shared)?;
833        let (spaces, totals_scope) = if let Some(space_name) = space {
834            let resolved = self.resolve_space_row(Some(space_name), None)?;
835            (vec![resolved.clone()], Some(resolved.id))
836        } else {
837            (self.storage.list_spaces()?, None)
838        };
839
840        let mut space_statuses = Vec::with_capacity(spaces.len());
841        for space_row in spaces {
842            let collections = self.storage.list_collections(Some(space_row.id))?;
843            let mut collection_statuses = Vec::with_capacity(collections.len());
844            let mut last_updated: Option<String> = None;
845
846            for collection in collections {
847                if last_updated
848                    .as_ref()
849                    .map(|existing| collection.updated > *existing)
850                    .unwrap_or(true)
851                {
852                    last_updated = Some(collection.updated.clone());
853                }
854
855                let documents = self
856                    .storage
857                    .count_documents_in_collection(collection.id, false)?;
858                let active_documents = self
859                    .storage
860                    .count_documents_in_collection(collection.id, true)?;
861                let chunks = self.storage.count_chunks_in_collection(collection.id)?;
862                let embedded_chunks = self
863                    .storage
864                    .count_embedded_chunks_in_collection(collection.id)?;
865
866                collection_statuses.push(CollectionStatus {
867                    name: collection.name,
868                    path: collection.path,
869                    documents,
870                    active_documents,
871                    chunks,
872                    embedded_chunks,
873                    last_updated: collection.updated,
874                });
875            }
876
877            space_statuses.push(SpaceStatus {
878                name: space_row.name,
879                description: space_row.description,
880                collections: collection_statuses,
881                last_updated,
882            });
883        }
884
885        let models = self.model_status_unlocked()?;
886
887        Ok(StatusResponse {
888            spaces: space_statuses,
889            models,
890            cache_dir: self.config.cache_dir.clone(),
891            config_dir: self.config.config_dir.clone(),
892            total_documents: self.storage.count_documents(totals_scope)?,
893            total_chunks: self.storage.count_chunks(totals_scope)?,
894            total_embedded: self.storage.count_embedded_chunks(totals_scope)?,
895            disk_usage: self.storage.disk_usage()?,
896        })
897    }
898
899    pub fn model_status(&self) -> Result<ModelStatus> {
900        self.model_status_unlocked()
901    }
902
903    fn model_status_unlocked(&self) -> Result<ModelStatus> {
904        models::status(&self.config)
905    }
906
907    pub fn config(&self) -> &Config {
908        &self.config
909    }
910
911    pub fn storage(&self) -> &Storage {
912        &self.storage
913    }
914
915    fn embedding_model_name(&self) -> &str {
916        self.config
917            .roles
918            .embedder
919            .as_ref()
920            .and_then(|role| self.config.providers.get(&role.provider))
921            .map(|profile| profile.model())
922            .unwrap_or("embedder")
923    }
924
925    fn embedding_model_key(&self) -> String {
926        let formatter = self.embedding_input_formatter();
927        format!(
928            "{}|{}|{}",
929            self.embedding_model_name(),
930            DENSE_DOCUMENT_RENDER_IDENTITY,
931            formatter.format_identity()
932        )
933    }
934
935    fn embedding_input_formatter(&self) -> models::EmbeddingInputFormatter {
936        models::embedding_input_formatter_for_config(&self.config)
937    }
938
939    fn format_embedding_query(&self, query: &str) -> String {
940        self.embedding_input_formatter()
941            .format_query(query)
942            .into_owned()
943    }
944
945    fn format_embedding_queries(&self, queries: &[String]) -> Vec<String> {
946        let formatter = self.embedding_input_formatter();
947        queries
948            .iter()
949            .map(|query| formatter.format_query(query).into_owned())
950            .collect()
951    }
952
953    fn format_embedding_document(&self, input: &EmbeddingDocumentInput<'_>) -> String {
954        self.embedding_input_formatter().format_document(input)
955    }
956
957    fn acquire_operation_lock(&self, mode: LockMode) -> Result<OperationLock> {
958        OperationLock::acquire(&self.config.cache_dir, mode)
959    }
960
961    fn collect_document_chunk_ids(&self, doc_ids: &[i64]) -> Result<Vec<i64>> {
962        let mut chunk_ids = Vec::new();
963        for doc_id in doc_ids {
964            let chunks = self.storage.get_chunks_for_document(*doc_id)?;
965            chunk_ids.extend(chunks.into_iter().map(|chunk| chunk.id));
966        }
967        Ok(chunk_ids)
968    }
969
970    fn purge_space_chunks(&self, space: &str, chunk_ids: &[i64]) -> Result<()> {
971        if chunk_ids.is_empty() {
972            return Ok(());
973        }
974
975        self.storage.delete_tantivy(space, chunk_ids)?;
976        self.storage.commit_tantivy(space)?;
977        self.storage.delete_usearch(space, chunk_ids)?;
978        Ok(())
979    }
980
981    fn build_space_info(&self, space: &crate::storage::SpaceRow) -> Result<SpaceInfo> {
982        let collection_count = self.storage.list_collections(Some(space.id))?.len();
983        let document_count = self.storage.count_documents(Some(space.id))?;
984        let chunk_count = self.storage.count_chunks(Some(space.id))?;
985
986        Ok(SpaceInfo {
987            name: space.name.clone(),
988            description: space.description.clone(),
989            collection_count,
990            document_count,
991            chunk_count,
992            created: space.created.clone(),
993        })
994    }
995
996    fn build_collection_info(
997        &self,
998        space_name: &str,
999        collection: &CollectionRow,
1000    ) -> Result<CollectionInfo> {
1001        let document_count = self
1002            .storage
1003            .count_documents_in_collection(collection.id, false)?;
1004        let active_document_count = self
1005            .storage
1006            .count_documents_in_collection(collection.id, true)?;
1007        let chunk_count = self.storage.count_chunks_in_collection(collection.id)?;
1008        let embedded_chunk_count = self
1009            .storage
1010            .count_embedded_chunks_in_collection(collection.id)?;
1011
1012        Ok(CollectionInfo {
1013            name: collection.name.clone(),
1014            space: space_name.to_string(),
1015            path: collection.path.clone(),
1016            description: collection.description.clone(),
1017            extensions: collection.extensions.clone(),
1018            document_count,
1019            active_document_count,
1020            chunk_count,
1021            embedded_chunk_count,
1022            created: collection.created.clone(),
1023            updated: collection.updated.clone(),
1024        })
1025    }
1026
1027    fn resolve_space_row(
1028        &self,
1029        explicit: Option<&str>,
1030        collection_for_lookup: Option<&str>,
1031    ) -> Result<crate::storage::SpaceRow> {
1032        if let Some((space, _source)) = self.resolve_preferred_space(explicit)? {
1033            return Ok(space);
1034        }
1035
1036        if let Some(collection) = collection_for_lookup {
1037            return match self.storage.find_space_for_collection(collection)? {
1038                SpaceResolution::Found(space) => Ok(space),
1039                SpaceResolution::Ambiguous(spaces) => Err(KboltError::AmbiguousSpace {
1040                    collection: collection.to_string(),
1041                    spaces,
1042                }
1043                .into()),
1044                SpaceResolution::NotFound => Err(KboltError::CollectionNotFound {
1045                    name: collection.to_string(),
1046                }
1047                .into()),
1048            };
1049        }
1050
1051        Err(KboltError::NoActiveSpace.into())
1052    }
1053
1054    fn resolve_preferred_space(
1055        &self,
1056        explicit: Option<&str>,
1057    ) -> Result<Option<(crate::storage::SpaceRow, ActiveSpaceSource)>> {
1058        if let Some(space_name) = explicit {
1059            let space = self.storage.get_space(space_name)?;
1060            return Ok(Some((space, ActiveSpaceSource::Flag)));
1061        }
1062
1063        if let Ok(space_name) = std::env::var("KBOLT_SPACE") {
1064            let trimmed = space_name.trim();
1065            if !trimmed.is_empty() {
1066                let space = self.storage.get_space(trimmed)?;
1067                return Ok(Some((space, ActiveSpaceSource::EnvVar)));
1068            }
1069        }
1070
1071        if let Some(space_name) = self.config.default_space.as_deref() {
1072            let space = self.storage.get_space(space_name)?;
1073            return Ok(Some((space, ActiveSpaceSource::ConfigDefault)));
1074        }
1075
1076        Ok(None)
1077    }
1078}
1079
1080fn source_is_stale(path: &Path, indexed_hash: &str) -> Result<bool> {
1081    match std::fs::read(path) {
1082        Ok(bytes) => Ok(sha256_hex(&bytes) != indexed_hash),
1083        Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(true),
1084        Err(err) => Err(std::io::Error::new(
1085            err.kind(),
1086            format!(
1087                "failed to read indexed source '{}' for stale check: {err}",
1088                path.display()
1089            ),
1090        )
1091        .into()),
1092    }
1093}
1094
1095#[cfg(test)]
1096mod tests;
1097
#[cfg(test)]
mod lock_relief_tests {
    //! Checks that read-only engine calls still succeed while another handle
    //! holds an exclusive lock on the `kbolt.lock` file in the cache directory.

    use fs2::FileExt;
    use std::collections::HashMap;
    use std::fs::OpenOptions;

    use tempfile::{tempdir, TempDir};

    use super::Engine;
    use crate::config::{ChunkingConfig, Config, RankingConfig, ReapingConfig};
    use crate::storage::Storage;
    use kbolt_types::{AddCollectionRequest, GetRequest, Locator, MultiGetRequest, UpdateOptions};

    #[test]
    fn metadata_reads_succeed_while_global_lock_is_held() {
        // `_root` is bound first so it is dropped last, after the engine and
        // the lock holder have released their files.
        let (_root, engine) = test_engine_with_indexed_collection();
        let _holder = hold_global_lock(&engine);

        let spaces = engine.list_spaces().expect("list spaces");
        assert_eq!(
            spaces
                .iter()
                .map(|space| space.name.as_str())
                .collect::<Vec<_>>(),
            vec!["default", "work"]
        );

        let space = engine.space_info("work").expect("load space info");
        assert_eq!(space.name, "work");
        assert_eq!(space.collection_count, 1);

        let collections = engine
            .list_collections(Some("work"))
            .expect("list collections");
        assert_eq!(collections.len(), 1);
        assert_eq!(collections[0].name, "api");

        let collection = engine
            .collection_info(Some("work"), "api")
            .expect("load collection info");
        assert_eq!(collection.name, "api");
        assert_eq!(collection.document_count, 1);

        let models = engine.model_status().expect("read model status");
        assert!(!models.embedder.configured);
        assert!(!models.reranker.configured);
        assert!(!models.expander.configured);
    }

    #[test]
    fn document_reads_succeed_while_global_lock_is_held() {
        // `_root` is bound first so it is dropped last, after the engine and
        // the lock holder have released their files.
        let (_root, engine) = test_engine_with_indexed_collection();
        let _holder = hold_global_lock(&engine);

        let files = engine
            .list_files(Some("work"), "api", None)
            .expect("list files");
        assert_eq!(files.len(), 1);
        assert_eq!(files[0].path, "guide.md");

        let document = engine
            .get_document(GetRequest {
                locator: Locator::Path("api/guide.md".to_string()),
                space: Some("work".to_string()),
                offset: None,
                limit: None,
            })
            .expect("read document");
        assert_eq!(document.path, "api/guide.md");
        assert!(document.content.contains("hello from kbolt"));

        let response = engine
            .multi_get(MultiGetRequest {
                locators: vec![Locator::Path("api/guide.md".to_string())],
                space: Some("work".to_string()),
                max_files: 4,
                max_bytes: 4_096,
            })
            .expect("read multiple documents");
        assert_eq!(response.resolved_count, 1);
        assert_eq!(response.documents.len(), 1);
        assert!(response.warnings.is_empty());
    }

    /// Builds an engine over a fresh temporary directory containing one indexed
    /// markdown document in collection `api` of space `work`.
    ///
    /// The `TempDir` guard is returned alongside the engine so the directory is
    /// removed when the test finishes instead of being leaked on disk; callers
    /// must keep the guard alive for as long as they use the engine.
    fn test_engine_with_indexed_collection() -> (TempDir, Engine) {
        let root = tempdir().expect("create temp root");

        let config_dir = root.path().join("config");
        let cache_dir = root.path().join("cache");
        let docs_dir = root.path().join("docs");
        std::fs::create_dir_all(&docs_dir).expect("create docs dir");
        std::fs::write(docs_dir.join("guide.md"), "# Hello\n\nhello from kbolt\n")
            .expect("write document");

        let storage = Storage::new(&cache_dir).expect("create storage");
        let config = Config {
            config_dir,
            cache_dir,
            default_space: None,
            providers: HashMap::new(),
            roles: crate::config::RoleBindingsConfig::default(),
            reaping: ReapingConfig { days: 7 },
            chunking: ChunkingConfig::default(),
            ranking: RankingConfig::default(),
        };
        let engine = Engine::from_parts(storage, config);

        engine
            .add_collection(AddCollectionRequest {
                path: docs_dir,
                space: Some("work".to_string()),
                name: Some("api".to_string()),
                description: None,
                extensions: None,
                no_index: true,
            })
            .expect("add collection");
        engine
            .update(UpdateOptions {
                space: Some("work".to_string()),
                collections: vec!["api".to_string()],
                no_embed: true,
                dry_run: false,
                verbose: false,
            })
            .expect("index collection");

        (root, engine)
    }

    /// Opens `kbolt.lock` in the engine's cache directory and takes an exclusive
    /// file lock on it. The lock is held for as long as the returned file handle
    /// stays alive.
    fn hold_global_lock(engine: &Engine) -> std::fs::File {
        std::fs::create_dir_all(&engine.config().cache_dir).expect("create cache dir");
        let holder = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(engine.config().cache_dir.join("kbolt.lock"))
            .expect("open lock file");
        FileExt::try_lock_exclusive(&holder).expect("acquire global lock");
        holder
    }
}