Skip to main content

kbolt_core/
engine.rs

1use std::collections::{HashMap, HashSet};
2use std::path::Path;
3use std::sync::Arc;
4use std::time::Instant;
5
6use crate::config;
7use crate::config::Config;
8use crate::error::CoreError;
9use crate::ingest::chunk::{chunk_document, chunk_document_with_counter, resolve_policy};
10use crate::ingest::extract::default_registry;
11use crate::lock::{LockMode, OperationLock};
12use crate::models;
13use crate::storage::Storage;
14use crate::storage::{
15    ChunkInsert, ChunkRow, CollectionRow, DocumentRow, DocumentTitleSource, SpaceResolution,
16    TantivyEntry,
17};
18use crate::Result;
19use kbolt_types::{
20    ActiveSpace, ActiveSpaceSource, AddCollectionRequest, AddCollectionResult, CollectionInfo,
21    CollectionStatus, DocumentResponse, FileEntry, GetRequest, InitialIndexingBlock,
22    InitialIndexingOutcome, KboltError, Locator, ModelStatus, MultiGetRequest, MultiGetResponse,
23    OmitReason, OmittedFile, SearchMode, SearchPipeline, SearchPipelineNotice, SearchPipelineStep,
24    SearchPipelineUnavailableReason, SearchRequest, SearchResponse, SearchResult, SearchSignals,
25    SpaceInfo, SpaceStatus, StatusResponse, UpdateOptions, UpdateReport,
26};
27use walkdir::WalkDir;
28
29mod eval_ops;
30mod file_utils;
31mod ignore_helpers;
32mod ignore_ops;
33mod path_utils;
34mod schedule_ops;
35mod schedule_run_ops;
36mod schedule_status_ops;
37mod scoring;
38mod search_ops;
39mod text_helpers;
40mod update_ops;
41use file_utils::{file_error, file_title, modified_token, sha256_hex};
42use ignore_helpers::{
43    collection_ignore_file_path, count_ignore_patterns, is_hard_ignored_dir_name,
44    is_hard_ignored_file, load_collection_ignore_matcher, validate_ignore_pattern,
45};
46use path_utils::{
47    collection_relative_path, extension_allowed, normalize_docid, normalize_list_prefix,
48    normalized_extension_filter, path_matches_prefix, short_docid, split_collection_path,
49};
50use scoring::{dense_distance_to_score, max_option};
51pub(crate) use text_helpers::retrieval_text_with_prefix;
52use text_helpers::{chunk_text_from_bytes, search_text_with_neighbors};
53
54pub struct Engine {
55    storage: Storage,
56    config: Config,
57    embedder: Option<Arc<dyn models::Embedder>>,
58    embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
59    reranker: Option<Arc<dyn models::Reranker>>,
60    expander: Option<Arc<dyn models::Expander>>,
61}
62
/// Summary row describing one collection's ignore-pattern configuration.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct IgnoreListEntry {
    /// Name of the space that owns the collection.
    pub space: String,
    /// Name of the collection inside `space`.
    pub collection: String,
    /// Number of ignore patterns recorded for the collection.
    pub pattern_count: usize,
}
69
70#[derive(Debug, Clone, PartialEq, Eq)]
71pub struct UpdateTarget {
72    pub space: String,
73    pub collection: CollectionRow,
74}
75
/// Borrowed description of which space and collections an operation targets.
#[derive(Clone, Copy, Debug)]
struct TargetScope<'a> {
    /// Explicitly requested space name, if any.
    space: Option<&'a str>,
    /// Requested collection names (semantics of an empty slice are decided by
    /// `resolve_targets` — presumably "all collections"; confirm there).
    collections: &'a [String],
}
81
/// Per-collection metadata needed when assembling search results.
#[derive(Clone, Debug)]
struct SearchCollectionMeta {
    /// Owning space name.
    space: String,
    /// Collection name.
    collection: String,
    /// Collection root directory on disk.
    path: std::path::PathBuf,
}
88
/// A lexical search hit identified by chunk, carrying its BM25 score.
#[derive(Clone, Debug)]
struct SearchHitCandidate {
    /// Identifier of the matched chunk.
    chunk_id: i64,
    /// BM25 relevance score reported for the chunk.
    bm25_score: f32,
}
94
/// A chunk after ranking, with the per-signal scores that produced its position.
///
/// Field meanings below are inferred from names; confirm against `search_ops`.
#[derive(Clone, Debug)]
struct RankedChunk {
    /// Identifier of the ranked chunk.
    chunk_id: i64,
    /// Score used for ordering (presumably the final combined score).
    score: f32,
    /// Fused score across retrieval signals (presumably before reranking).
    fusion: f32,
    /// Reranker score, when a reranker ran on this chunk.
    reranker: Option<f32>,
    /// BM25 (lexical) score, when available.
    bm25: Option<f32>,
    /// Dense (embedding) score, when available.
    dense: Option<f32>,
    /// Position before reranking, when recorded.
    original_rank: Option<usize>,
}
105
106impl Engine {
107    pub fn new(config_path: Option<&Path>) -> Result<Self> {
108        let config = config::load(config_path)?;
109        let storage = Storage::new(&config.cache_dir)?;
110        let built_models = models::build_inference_clients(&config)?;
111        Ok(Self {
112            storage,
113            config,
114            embedder: built_models.embedder,
115            embedding_document_sizer: built_models.embedding_document_sizer,
116            reranker: built_models.reranker,
117            expander: built_models.expander,
118        })
119    }
120
121    #[cfg(test)]
122    pub(crate) fn from_parts(storage: Storage, config: Config) -> Self {
123        Self::from_parts_with_models(storage, config, None, None, None)
124    }
125
126    #[cfg(test)]
127    pub(crate) fn from_parts_with_embedder(
128        storage: Storage,
129        config: Config,
130        embedder: Option<Arc<dyn models::Embedder>>,
131    ) -> Self {
132        Self::from_parts_with_inference(storage, config, embedder, None, None, None)
133    }
134
135    #[cfg(test)]
136    pub(crate) fn from_parts_with_embedding_runtime(
137        storage: Storage,
138        config: Config,
139        embedder: Option<Arc<dyn models::Embedder>>,
140        embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
141    ) -> Self {
142        Self::from_parts_with_inference(
143            storage,
144            config,
145            embedder,
146            embedding_document_sizer,
147            None,
148            None,
149        )
150    }
151
152    #[cfg(test)]
153    pub(crate) fn from_parts_with_models(
154        storage: Storage,
155        config: Config,
156        embedder: Option<Arc<dyn models::Embedder>>,
157        reranker: Option<Arc<dyn models::Reranker>>,
158        expander: Option<Arc<dyn models::Expander>>,
159    ) -> Self {
160        Self::from_parts_with_inference(storage, config, embedder, None, reranker, expander)
161    }
162
163    #[cfg(test)]
164    fn from_parts_with_inference(
165        storage: Storage,
166        config: Config,
167        embedder: Option<Arc<dyn models::Embedder>>,
168        embedding_document_sizer: Option<Arc<dyn models::EmbeddingDocumentSizer>>,
169        reranker: Option<Arc<dyn models::Reranker>>,
170        expander: Option<Arc<dyn models::Expander>>,
171    ) -> Self {
172        let built_models =
173            models::build_inference_clients(&config).expect("build inference models");
174        Self {
175            storage,
176            config,
177            embedder: embedder.or(built_models.embedder),
178            embedding_document_sizer: embedding_document_sizer
179                .or(built_models.embedding_document_sizer),
180            reranker: reranker.or(built_models.reranker),
181            expander: expander.or(built_models.expander),
182        }
183    }
184
185    pub fn add_space(&self, name: &str, description: Option<&str>) -> Result<SpaceInfo> {
186        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
187        self.storage.create_space(name, description)?;
188        let space = self.storage.get_space(name)?;
189        self.build_space_info(&space)
190    }
191
192    pub fn remove_space(&mut self, name: &str) -> Result<()> {
193        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
194        self.storage.delete_space(name)?;
195        if self.config.default_space.as_deref() == Some(name) {
196            self.persist_default_space(None)?;
197        }
198        Ok(())
199    }
200
201    pub fn rename_space(&mut self, old: &str, new: &str) -> Result<()> {
202        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
203        self.storage.rename_space(old, new)?;
204        if self.config.default_space.as_deref() == Some(old) {
205            self.persist_default_space(Some(new.to_string()))?;
206        }
207        Ok(())
208    }
209
210    pub fn describe_space(&self, name: &str, description: &str) -> Result<()> {
211        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
212        self.storage.update_space_description(name, description)
213    }
214
215    pub fn list_spaces(&self) -> Result<Vec<SpaceInfo>> {
216        let _lock = self.acquire_operation_lock(LockMode::Shared)?;
217        let spaces = self.storage.list_spaces()?;
218        let mut infos = Vec::with_capacity(spaces.len());
219        for space in spaces {
220            infos.push(self.build_space_info(&space)?);
221        }
222        Ok(infos)
223    }
224
225    pub fn space_info(&self, name: &str) -> Result<SpaceInfo> {
226        let _lock = self.acquire_operation_lock(LockMode::Shared)?;
227        let space = self.storage.get_space(name)?;
228        self.build_space_info(&space)
229    }
230
231    pub fn set_default_space(&mut self, name: Option<&str>) -> Result<Option<String>> {
232        if let Some(space_name) = name {
233            self.storage.get_space(space_name)?;
234        }
235
236        self.persist_default_space(name.map(ToString::to_string))?;
237        Ok(self.config.default_space.clone())
238    }
239
240    fn persist_default_space(&mut self, default_space: Option<String>) -> Result<()> {
241        let previous = self.config.default_space.clone();
242        self.config.default_space = default_space;
243        if let Err(err) = config::save(&self.config) {
244            self.config.default_space = previous;
245            return Err(err);
246        }
247        Ok(())
248    }
249
250    pub fn add_collection(&self, req: AddCollectionRequest) -> Result<AddCollectionResult> {
251        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
252        let AddCollectionRequest {
253            path,
254            space: requested_space,
255            name: requested_name,
256            description,
257            extensions,
258            no_index,
259        } = req;
260
261        let space = match requested_space.as_deref() {
262            Some(space_name) => match self.storage.get_space(space_name) {
263                Ok(space) => space,
264                Err(CoreError::Domain(KboltError::SpaceNotFound { .. })) => {
265                    self.storage.create_space(space_name, None)?;
266                    self.storage.get_space(space_name)?
267                }
268                Err(err) => return Err(err),
269            },
270            None => self.resolve_space_row(None, None)?,
271        };
272        if !path.is_absolute() || !path.is_dir() {
273            return Err(KboltError::InvalidPath(path).into());
274        }
275
276        let name = match requested_name {
277            Some(name) => name,
278            None => path
279                .file_name()
280                .and_then(|name| name.to_str())
281                .map(ToString::to_string)
282                .ok_or_else(|| KboltError::InvalidPath(path.clone()))?,
283        };
284
285        self.storage.create_collection(
286            space.id,
287            &name,
288            &path,
289            description.as_deref(),
290            extensions.as_deref(),
291        )?;
292
293        let initial_indexing = if no_index {
294            InitialIndexingOutcome::Skipped
295        } else {
296            match self.update_unlocked(UpdateOptions {
297                space: Some(space.name.clone()),
298                collections: vec![name.clone()],
299                no_embed: false,
300                dry_run: false,
301                verbose: false,
302            }) {
303                Ok(report) => InitialIndexingOutcome::Indexed(report),
304                Err(CoreError::Domain(KboltError::SpaceDenseRepairRequired { space, reason })) => {
305                    InitialIndexingOutcome::Blocked(
306                        InitialIndexingBlock::SpaceDenseRepairRequired { space, reason },
307                    )
308                }
309                Err(CoreError::Domain(KboltError::ModelNotAvailable { name })) => {
310                    InitialIndexingOutcome::Blocked(InitialIndexingBlock::ModelNotAvailable {
311                        name,
312                    })
313                }
314                Err(err) => return Err(err),
315            }
316        };
317
318        let collection = self.storage.get_collection(space.id, &name)?;
319        Ok(AddCollectionResult {
320            collection: self.build_collection_info(&space.name, &collection)?,
321            initial_indexing,
322        })
323    }
324
325    pub fn remove_collection(&self, space: Option<&str>, name: &str) -> Result<()> {
326        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
327        let resolved = self.resolve_space_row(space, Some(name))?;
328        let collection = self.storage.get_collection(resolved.id, name)?;
329        let documents = self.storage.list_documents(collection.id, false)?;
330        let doc_ids = documents.into_iter().map(|doc| doc.id).collect::<Vec<_>>();
331        let chunk_ids = self.collect_document_chunk_ids(&doc_ids)?;
332        self.purge_space_chunks(&resolved.name, &chunk_ids)?;
333        self.storage.delete_collection(resolved.id, name)?;
334
335        let ignore_path =
336            collection_ignore_file_path(&self.config.config_dir, &resolved.name, name);
337        if ignore_path.is_file() {
338            std::fs::remove_file(ignore_path)?;
339        }
340
341        Ok(())
342    }
343
344    pub fn rename_collection(&self, space: Option<&str>, old: &str, new: &str) -> Result<()> {
345        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
346        let resolved = self.resolve_space_row(space, Some(old))?;
347        let old_ignore_path =
348            collection_ignore_file_path(&self.config.config_dir, &resolved.name, old);
349        let new_ignore_path =
350            collection_ignore_file_path(&self.config.config_dir, &resolved.name, new);
351        if old_ignore_path.is_file() && new_ignore_path.exists() {
352            return Err(KboltError::Internal(format!(
353                "cannot rename ignore file: destination already exists: {}",
354                new_ignore_path.display()
355            ))
356            .into());
357        }
358
359        self.storage.rename_collection(resolved.id, old, new)?;
360
361        if old_ignore_path.is_file() {
362            if let Some(parent) = new_ignore_path.parent() {
363                std::fs::create_dir_all(parent)?;
364            }
365            if let Err(rename_err) = std::fs::rename(&old_ignore_path, &new_ignore_path) {
366                match self.storage.rename_collection(resolved.id, new, old) {
367                    Ok(()) => {
368                        return Err(KboltError::Internal(format!(
369                            "renamed collection was rolled back after ignore rename failure: {rename_err}"
370                        ))
371                        .into())
372                    }
373                    Err(rollback_err) => {
374                        return Err(KboltError::Internal(format!(
375                            "ignore rename failed: {rename_err}; rollback failed: {rollback_err}"
376                        ))
377                        .into())
378                    }
379                }
380            }
381        }
382
383        Ok(())
384    }
385
386    pub fn describe_collection(&self, space: Option<&str>, name: &str, desc: &str) -> Result<()> {
387        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
388        let resolved = self.resolve_space_row(space, Some(name))?;
389        self.storage
390            .update_collection_description(resolved.id, name, desc)
391    }
392
393    pub fn list_collections(&self, space: Option<&str>) -> Result<Vec<CollectionInfo>> {
394        let _lock = self.acquire_operation_lock(LockMode::Shared)?;
395        let (space_id_filter, spaces_by_id) = if let Some(space_name) = space {
396            let resolved = self.resolve_space_row(Some(space_name), None)?;
397            let mut map = std::collections::HashMap::new();
398            map.insert(resolved.id, resolved.name.clone());
399            (Some(resolved.id), map)
400        } else {
401            let spaces = self.storage.list_spaces()?;
402            let map = spaces
403                .into_iter()
404                .map(|space| (space.id, space.name))
405                .collect::<std::collections::HashMap<_, _>>();
406            (None, map)
407        };
408
409        let collections = self.storage.list_collections(space_id_filter)?;
410        let mut infos = Vec::with_capacity(collections.len());
411        for collection in collections {
412            let space_name = spaces_by_id
413                .get(&collection.space_id)
414                .ok_or_else(|| {
415                    KboltError::Internal(format!(
416                        "missing space mapping for collection '{}'",
417                        collection.name
418                    ))
419                })?
420                .clone();
421            infos.push(self.build_collection_info(&space_name, &collection)?);
422        }
423        Ok(infos)
424    }
425
426    pub fn collection_info(&self, space: Option<&str>, name: &str) -> Result<CollectionInfo> {
427        let _lock = self.acquire_operation_lock(LockMode::Shared)?;
428        let resolved = self.resolve_space_row(space, Some(name))?;
429        let collection = self.storage.get_collection(resolved.id, name)?;
430        self.build_collection_info(&resolved.name, &collection)
431    }
432
433    pub fn list_files(
434        &self,
435        space: Option<&str>,
436        collection: &str,
437        prefix: Option<&str>,
438    ) -> Result<Vec<FileEntry>> {
439        let _lock = self.acquire_operation_lock(LockMode::Shared)?;
440        let resolved_space = self.resolve_space_row(space, Some(collection))?;
441        let collection_row = self.storage.get_collection(resolved_space.id, collection)?;
442        let normalized_prefix = normalize_list_prefix(prefix)?;
443        let file_rows = self
444            .storage
445            .list_collection_file_rows(collection_row.id, false)?;
446
447        let mut files = Vec::with_capacity(file_rows.len());
448        for file_row in file_rows {
449            if let Some(prefix) = normalized_prefix.as_deref() {
450                if !path_matches_prefix(&file_row.path, prefix) {
451                    continue;
452                }
453            }
454
455            files.push(FileEntry {
456                path: file_row.path,
457                title: file_row.title,
458                docid: short_docid(&file_row.hash),
459                active: file_row.active,
460                chunk_count: file_row.chunk_count,
461                embedded: file_row.chunk_count > 0
462                    && file_row.embedded_chunk_count >= file_row.chunk_count,
463            });
464        }
465
466        Ok(files)
467    }
468
469    pub fn get_document(&self, req: GetRequest) -> Result<DocumentResponse> {
470        let _lock = self.acquire_operation_lock(LockMode::Shared)?;
471        self.get_document_unlocked(req)
472    }
473
474    fn get_document_unlocked(&self, req: GetRequest) -> Result<DocumentResponse> {
475        let GetRequest {
476            locator,
477            space,
478            offset,
479            limit,
480        } = req;
481
482        let (document, collection_row, space_name) = match locator {
483            Locator::Path(locator_path) => {
484                let (collection_name, relative_path) = split_collection_path(&locator_path)?;
485                let resolved_space =
486                    self.resolve_space_row(space.as_deref(), Some(&collection_name))?;
487                let collection = self
488                    .storage
489                    .get_collection(resolved_space.id, &collection_name)?;
490                let document = self
491                    .storage
492                    .get_document_by_path(collection.id, &relative_path)?
493                    .ok_or_else(|| KboltError::DocumentNotFound {
494                        path: locator_path.clone(),
495                    })?;
496                (document, collection, resolved_space.name)
497            }
498            Locator::DocId(docid) => {
499                let prefix = normalize_docid(&docid)?;
500                let mut candidates = self
501                    .storage
502                    .get_document_by_hash_prefix(&prefix)?
503                    .into_iter()
504                    .map(|document| {
505                        let collection =
506                            self.storage.get_collection_by_id(document.collection_id)?;
507                        Ok((document, collection))
508                    })
509                    .collect::<Result<Vec<_>>>()?;
510
511                if let Some(space_name) = space.as_deref() {
512                    let resolved_space = self.resolve_space_row(Some(space_name), None)?;
513                    candidates.retain(|(_, collection)| collection.space_id == resolved_space.id);
514                }
515
516                if candidates.is_empty() {
517                    return Err(KboltError::DocumentNotFound {
518                        path: format!("#{prefix}"),
519                    }
520                    .into());
521                }
522
523                if candidates.len() > 1 {
524                    return Err(KboltError::InvalidInput(
525                        "docid is ambiguous; provide more characters".to_string(),
526                    )
527                    .into());
528                }
529
530                let (document, collection) = candidates.pop().expect("candidate exists");
531                let space = self.storage.get_space_by_id(collection.space_id)?;
532                (document, collection, space.name)
533            }
534        };
535
536        let full_path = collection_row.path.join(&document.path);
537        let bytes = match std::fs::read(&full_path) {
538            Ok(bytes) => bytes,
539            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
540                return Err(KboltError::FileDeleted(full_path).into())
541            }
542            Err(err) => return Err(err.into()),
543        };
544
545        let raw_content = String::from_utf8_lossy(&bytes).into_owned();
546        let stale = sha256_hex(&bytes) != document.hash;
547
548        let lines = raw_content.lines().collect::<Vec<_>>();
549        let total_lines = lines.len();
550        let start = offset.unwrap_or(0).min(total_lines);
551        let requested = limit.unwrap_or(total_lines.saturating_sub(start));
552        let end = start.saturating_add(requested).min(total_lines);
553        let returned_lines = end.saturating_sub(start);
554        let content = if returned_lines == 0 {
555            String::new()
556        } else {
557            lines[start..end].join("\n")
558        };
559
560        Ok(DocumentResponse {
561            docid: short_docid(&document.hash),
562            path: format!("{}/{}", collection_row.name, document.path),
563            title: document.title,
564            space: space_name,
565            collection: collection_row.name,
566            content,
567            stale,
568            total_lines,
569            returned_lines,
570        })
571    }
572
573    pub fn multi_get(&self, req: MultiGetRequest) -> Result<MultiGetResponse> {
574        let _lock = self.acquire_operation_lock(LockMode::Shared)?;
575        if req.max_files == 0 {
576            return Err(
577                KboltError::InvalidInput("max_files must be greater than 0".to_string()).into(),
578            );
579        }
580        if req.max_bytes == 0 {
581            return Err(
582                KboltError::InvalidInput("max_bytes must be greater than 0".to_string()).into(),
583            );
584        }
585
586        let mut documents = Vec::new();
587        let mut omitted = Vec::new();
588        let mut resolved_count = 0usize;
589        let mut warnings = Vec::new();
590        let mut consumed_bytes = 0usize;
591
592        for locator in req.locators {
593            let document = match self.get_document_unlocked(GetRequest {
594                locator,
595                space: req.space.clone(),
596                offset: None,
597                limit: None,
598            }) {
599                Ok(document) => document,
600                Err(err) => match KboltError::from(err) {
601                    KboltError::FileDeleted(path) => {
602                        warnings.push(format!(
603                            "file deleted since indexing: {}. run `kbolt update` to refresh.",
604                            path.display()
605                        ));
606                        continue;
607                    }
608                    KboltError::DocumentNotFound { path } => {
609                        warnings.push(format!("document not found: {path}"));
610                        continue;
611                    }
612                    KboltError::InvalidInput(message) => {
613                        warnings.push(format!("invalid locator: {message}"));
614                        continue;
615                    }
616                    KboltError::InvalidPath(path) => {
617                        warnings.push(format!("invalid locator path: {}", path.display()));
618                        continue;
619                    }
620                    KboltError::AmbiguousSpace { collection, spaces } => {
621                        warnings.push(format!(
622                            "ambiguous locator space for collection '{collection}': {:?}. use --space to disambiguate.",
623                            spaces
624                        ));
625                        continue;
626                    }
627                    KboltError::SpaceNotFound { name } => {
628                        warnings.push(format!("space not found for locator: {name}"));
629                        continue;
630                    }
631                    KboltError::CollectionNotFound { name } => {
632                        warnings.push(format!("collection not found for locator: {name}"));
633                        continue;
634                    }
635                    other => return Err(other.into()),
636                },
637            };
638            resolved_count = resolved_count.saturating_add(1);
639
640            let size_bytes = document.content.len();
641            if documents.len() >= req.max_files {
642                omitted.push(OmittedFile {
643                    path: document.path,
644                    docid: document.docid,
645                    size_bytes,
646                    reason: OmitReason::MaxFiles,
647                });
648                continue;
649            }
650
651            if consumed_bytes.saturating_add(size_bytes) > req.max_bytes {
652                omitted.push(OmittedFile {
653                    path: document.path,
654                    docid: document.docid,
655                    size_bytes,
656                    reason: OmitReason::MaxBytes,
657                });
658                continue;
659            }
660
661            consumed_bytes = consumed_bytes.saturating_add(size_bytes);
662            documents.push(document);
663        }
664
665        Ok(MultiGetResponse {
666            resolved_count,
667            documents,
668            omitted,
669            warnings,
670        })
671    }
672
673    pub fn search(&self, req: SearchRequest) -> Result<SearchResponse> {
674        let _lock = self.acquire_operation_lock(LockMode::Shared)?;
675        let started = Instant::now();
676        let query = req.query.trim();
677        if query.is_empty() {
678            return Err(KboltError::InvalidInput("query cannot be empty".to_string()).into());
679        }
680
681        let requested_mode = req.mode.clone();
682        let rerank_enabled =
683            matches!(requested_mode, SearchMode::Auto | SearchMode::Deep) && !req.no_rerank;
684        let mut pipeline = self.initial_search_pipeline(&requested_mode);
685
686        let targets = self.resolve_targets(TargetScope {
687            space: req.space.as_deref(),
688            collections: &req.collections,
689        })?;
690
691        let staleness_hint = targets
692            .iter()
693            .map(|target| target.collection.updated.clone())
694            .max()
695            .map(|updated| format!("Index last updated: {updated}"));
696
697        if req.limit == 0 || targets.is_empty() {
698            return Ok(SearchResponse {
699                results: Vec::new(),
700                query: req.query,
701                requested_mode: requested_mode.clone(),
702                effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
703                pipeline,
704                staleness_hint,
705                elapsed_ms: started.elapsed().as_millis() as u64,
706            });
707        }
708
709        let mut collections_by_id: HashMap<i64, SearchCollectionMeta> = HashMap::new();
710        for target in &targets {
711            collections_by_id.insert(
712                target.collection.id,
713                SearchCollectionMeta {
714                    space: target.space.clone(),
715                    collection: target.collection.name.clone(),
716                    path: target.collection.path.clone(),
717                },
718            );
719        }
720
721        let max_candidates = self.max_search_candidates(&targets)?;
722        let mut retrieval_limit =
723            self.initial_search_candidate_limit(&requested_mode, req.limit, rerank_enabled);
724        let results = loop {
725            let ranked_chunks = self.rank_chunks_for_mode(
726                &requested_mode,
727                &targets,
728                query,
729                retrieval_limit,
730                req.min_score,
731                &mut pipeline,
732            )?;
733
734            if ranked_chunks.is_empty() {
735                return Ok(SearchResponse {
736                    results: Vec::new(),
737                    query: req.query,
738                    requested_mode: requested_mode.clone(),
739                    effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
740                    pipeline,
741                    staleness_hint,
742                    elapsed_ms: started.elapsed().as_millis() as u64,
743                });
744            }
745
746            let ranked_len = ranked_chunks.len();
747            let results = self.assemble_search_results(
748                query,
749                &requested_mode,
750                ranked_chunks,
751                &collections_by_id,
752                req.debug,
753                rerank_enabled,
754                &mut pipeline,
755                req.limit,
756            )?;
757
758            if results.len() >= req.limit
759                || ranked_len < retrieval_limit
760                || retrieval_limit >= max_candidates
761            {
762                break results;
763            }
764
765            let next_limit = retrieval_limit.saturating_mul(2).min(max_candidates);
766            if next_limit <= retrieval_limit {
767                break results;
768            }
769            retrieval_limit = next_limit;
770        };
771
772        Ok(SearchResponse {
773            results,
774            query: req.query,
775            requested_mode: requested_mode.clone(),
776            effective_mode: self.effective_search_mode(&requested_mode, &pipeline),
777            pipeline,
778            staleness_hint,
779            elapsed_ms: started.elapsed().as_millis() as u64,
780        })
781    }
782
783    pub fn resolve_space(&self, explicit: Option<&str>) -> Result<String> {
784        let resolved = self.resolve_space_row(explicit, None)?;
785        Ok(resolved.name)
786    }
787
788    pub fn current_space(&self, explicit: Option<&str>) -> Result<Option<ActiveSpace>> {
789        let resolved = self.resolve_preferred_space(explicit)?;
790        Ok(resolved.map(|(space, source)| ActiveSpace {
791            name: space.name,
792            source,
793        }))
794    }
795
796    pub fn status(&self, space: Option<&str>) -> Result<StatusResponse> {
797        let _lock = self.acquire_operation_lock(LockMode::Shared)?;
798        let (spaces, totals_scope) = if let Some(space_name) = space {
799            let resolved = self.resolve_space_row(Some(space_name), None)?;
800            (vec![resolved.clone()], Some(resolved.id))
801        } else {
802            (self.storage.list_spaces()?, None)
803        };
804
805        let mut space_statuses = Vec::with_capacity(spaces.len());
806        for space_row in spaces {
807            let collections = self.storage.list_collections(Some(space_row.id))?;
808            let mut collection_statuses = Vec::with_capacity(collections.len());
809            let mut last_updated: Option<String> = None;
810
811            for collection in collections {
812                if last_updated
813                    .as_ref()
814                    .map(|existing| collection.updated > *existing)
815                    .unwrap_or(true)
816                {
817                    last_updated = Some(collection.updated.clone());
818                }
819
820                let documents = self
821                    .storage
822                    .count_documents_in_collection(collection.id, false)?;
823                let active_documents = self
824                    .storage
825                    .count_documents_in_collection(collection.id, true)?;
826                let chunks = self.storage.count_chunks_in_collection(collection.id)?;
827                let embedded_chunks = self
828                    .storage
829                    .count_embedded_chunks_in_collection(collection.id)?;
830
831                collection_statuses.push(CollectionStatus {
832                    name: collection.name,
833                    path: collection.path,
834                    documents,
835                    active_documents,
836                    chunks,
837                    embedded_chunks,
838                    last_updated: collection.updated,
839                });
840            }
841
842            space_statuses.push(SpaceStatus {
843                name: space_row.name,
844                description: space_row.description,
845                collections: collection_statuses,
846                last_updated,
847            });
848        }
849
850        let models = self.model_status_unlocked()?;
851
852        Ok(StatusResponse {
853            spaces: space_statuses,
854            models,
855            cache_dir: self.config.cache_dir.clone(),
856            config_dir: self.config.config_dir.clone(),
857            total_documents: self.storage.count_documents(totals_scope)?,
858            total_chunks: self.storage.count_chunks(totals_scope)?,
859            total_embedded: self.storage.count_embedded_chunks(totals_scope)?,
860            disk_usage: self.storage.disk_usage()?,
861        })
862    }
863
864    pub fn model_status(&self) -> Result<ModelStatus> {
865        let _lock = self.acquire_operation_lock(LockMode::Shared)?;
866        self.model_status_unlocked()
867    }
868
869    fn model_status_unlocked(&self) -> Result<ModelStatus> {
870        models::status(&self.config)
871    }
872
873    pub fn config(&self) -> &Config {
874        &self.config
875    }
876
877    pub fn storage(&self) -> &Storage {
878        &self.storage
879    }
880
881    fn embedding_model_key(&self) -> &str {
882        self.config
883            .roles
884            .embedder
885            .as_ref()
886            .and_then(|role| self.config.providers.get(&role.provider))
887            .map(|profile| profile.model())
888            .unwrap_or("embedder")
889    }
890
891    fn acquire_operation_lock(&self, mode: LockMode) -> Result<OperationLock> {
892        OperationLock::acquire(&self.config.cache_dir, mode)
893    }
894
895    fn collect_document_chunk_ids(&self, doc_ids: &[i64]) -> Result<Vec<i64>> {
896        let mut chunk_ids = Vec::new();
897        for doc_id in doc_ids {
898            let chunks = self.storage.get_chunks_for_document(*doc_id)?;
899            chunk_ids.extend(chunks.into_iter().map(|chunk| chunk.id));
900        }
901        Ok(chunk_ids)
902    }
903
904    fn purge_space_chunks(&self, space: &str, chunk_ids: &[i64]) -> Result<()> {
905        if chunk_ids.is_empty() {
906            return Ok(());
907        }
908
909        self.storage.delete_tantivy(space, chunk_ids)?;
910        self.storage.commit_tantivy(space)?;
911        self.storage.delete_usearch(space, chunk_ids)?;
912        Ok(())
913    }
914
915    fn build_space_info(&self, space: &crate::storage::SpaceRow) -> Result<SpaceInfo> {
916        let collection_count = self.storage.list_collections(Some(space.id))?.len();
917        let document_count = self.storage.count_documents(Some(space.id))?;
918        let chunk_count = self.storage.count_chunks(Some(space.id))?;
919
920        Ok(SpaceInfo {
921            name: space.name.clone(),
922            description: space.description.clone(),
923            collection_count,
924            document_count,
925            chunk_count,
926            created: space.created.clone(),
927        })
928    }
929
930    fn build_collection_info(
931        &self,
932        space_name: &str,
933        collection: &CollectionRow,
934    ) -> Result<CollectionInfo> {
935        let document_count = self
936            .storage
937            .count_documents_in_collection(collection.id, false)?;
938        let active_document_count = self
939            .storage
940            .count_documents_in_collection(collection.id, true)?;
941        let chunk_count = self.storage.count_chunks_in_collection(collection.id)?;
942        let embedded_chunk_count = self
943            .storage
944            .count_embedded_chunks_in_collection(collection.id)?;
945
946        Ok(CollectionInfo {
947            name: collection.name.clone(),
948            space: space_name.to_string(),
949            path: collection.path.clone(),
950            description: collection.description.clone(),
951            extensions: collection.extensions.clone(),
952            document_count,
953            active_document_count,
954            chunk_count,
955            embedded_chunk_count,
956            created: collection.created.clone(),
957            updated: collection.updated.clone(),
958        })
959    }
960
961    fn resolve_space_row(
962        &self,
963        explicit: Option<&str>,
964        collection_for_lookup: Option<&str>,
965    ) -> Result<crate::storage::SpaceRow> {
966        if let Some((space, _source)) = self.resolve_preferred_space(explicit)? {
967            return Ok(space);
968        }
969
970        if let Some(collection) = collection_for_lookup {
971            return match self.storage.find_space_for_collection(collection)? {
972                SpaceResolution::Found(space) => Ok(space),
973                SpaceResolution::Ambiguous(spaces) => Err(KboltError::AmbiguousSpace {
974                    collection: collection.to_string(),
975                    spaces,
976                }
977                .into()),
978                SpaceResolution::NotFound => Err(KboltError::CollectionNotFound {
979                    name: collection.to_string(),
980                }
981                .into()),
982            };
983        }
984
985        Err(KboltError::NoActiveSpace.into())
986    }
987
988    fn resolve_preferred_space(
989        &self,
990        explicit: Option<&str>,
991    ) -> Result<Option<(crate::storage::SpaceRow, ActiveSpaceSource)>> {
992        if let Some(space_name) = explicit {
993            let space = self.storage.get_space(space_name)?;
994            return Ok(Some((space, ActiveSpaceSource::Flag)));
995        }
996
997        if let Ok(space_name) = std::env::var("KBOLT_SPACE") {
998            let trimmed = space_name.trim();
999            if !trimmed.is_empty() {
1000                let space = self.storage.get_space(trimmed)?;
1001                return Ok(Some((space, ActiveSpaceSource::EnvVar)));
1002            }
1003        }
1004
1005        if let Some(space_name) = self.config.default_space.as_deref() {
1006            let space = self.storage.get_space(space_name)?;
1007            return Ok(Some((space, ActiveSpaceSource::ConfigDefault)));
1008        }
1009
1010        Ok(None)
1011    }
1012}
1013
1014#[cfg(test)]
1015mod tests;