Skip to main content

hermes_core/index/
searcher.rs

1//! Searcher - read-only search over pre-built segments
2//!
3//! This module provides `Searcher` for read-only search access to indexes.
4//! It can be used standalone (for wasm/read-only) or via `IndexReader` (for native).
5
6use std::sync::Arc;
7
8use rustc_hash::FxHashMap;
9
10use crate::directories::Directory;
11use crate::dsl::Schema;
12use crate::error::Result;
13use crate::query::LazyGlobalStats;
14use crate::segment::{SegmentId, SegmentReader};
15#[cfg(feature = "native")]
16use crate::segment::{SegmentSnapshot, SegmentTracker};
17use crate::structures::CoarseCentroids;
18
/// Searcher - provides search over loaded segments
///
/// For wasm/read-only use, create via `Searcher::open()`.
/// For native use with Index, this is created via `IndexReader`.
///
/// Besides the segment readers themselves, the struct caches two lookup tables
/// derived from them at construction time (`segment_map` and
/// `doc_id_cumulative`); both are index-aligned with `segments`.
pub struct Searcher<D: Directory + 'static> {
    /// Segment snapshot holding refs - prevents deletion during native use.
    /// Only present when the `native` feature is enabled; never read directly.
    #[cfg(feature = "native")]
    _snapshot: SegmentSnapshot,
    /// PhantomData for the directory generic (`D` is only used while loading,
    /// no directory handle is stored).
    _phantom: std::marker::PhantomData<D>,
    /// Loaded segment readers, in load order. Each reader has been given a
    /// doc-id offset equal to the doc count of all readers before it.
    segments: Vec<Arc<SegmentReader>>,
    /// Schema shared with the segment readers and the query parser.
    schema: Arc<Schema>,
    /// Default fields for search: the schema's configured defaults, or, when
    /// none are configured, every indexed text field.
    default_fields: Vec<crate::Field>,
    /// Tokenizers handed to the query parser (constructors in this file always
    /// install `TokenizerRegistry::default()`).
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    /// Trained centroids per field (injected into segment readers for IVF/ScaNN search).
    /// NOTE(review): the `u32` key is presumably the field id - confirm.
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Lazy global statistics for cross-segment IDF computation
    global_stats: Arc<LazyGlobalStats>,
    /// O(1) segment lookup: segment id (`meta().id`) -> index into `segments`.
    segment_map: FxHashMap<u128, usize>,
    /// Cumulative doc counts for binary-search in doc(global_id): entry `i` is
    /// the total number of docs in `segments[0..=i]`.
    doc_id_cumulative: Vec<u32>,
}
46
47impl<D: Directory + 'static> Searcher<D> {
48    /// Create a Searcher directly from segment IDs
49    ///
50    /// This is a simpler initialization path that doesn't require SegmentManager.
51    /// Use this for read-only access to pre-built indexes.
52    pub async fn open(
53        directory: Arc<D>,
54        schema: Arc<Schema>,
55        segment_ids: &[String],
56        term_cache_blocks: usize,
57    ) -> Result<Self> {
58        Self::create(
59            directory,
60            schema,
61            segment_ids,
62            FxHashMap::default(),
63            term_cache_blocks,
64        )
65        .await
66    }
67
68    /// Create from a snapshot (for native IndexReader use)
69    #[cfg(feature = "native")]
70    pub(crate) async fn from_snapshot(
71        directory: Arc<D>,
72        schema: Arc<Schema>,
73        snapshot: SegmentSnapshot,
74        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
75        term_cache_blocks: usize,
76    ) -> Result<Self> {
77        let (segments, default_fields, global_stats, segment_map, doc_id_cumulative) =
78            Self::load_common(
79                &directory,
80                &schema,
81                snapshot.segment_ids(),
82                &trained_centroids,
83                term_cache_blocks,
84            )
85            .await;
86
87        Ok(Self {
88            _snapshot: snapshot,
89            _phantom: std::marker::PhantomData,
90            segments,
91            schema,
92            default_fields,
93            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
94            trained_centroids,
95            global_stats,
96            segment_map,
97            doc_id_cumulative,
98        })
99    }
100
101    /// Internal create method
102    async fn create(
103        directory: Arc<D>,
104        schema: Arc<Schema>,
105        segment_ids: &[String],
106        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
107        term_cache_blocks: usize,
108    ) -> Result<Self> {
109        let (segments, default_fields, global_stats, segment_map, doc_id_cumulative) =
110            Self::load_common(
111                &directory,
112                &schema,
113                segment_ids,
114                &trained_centroids,
115                term_cache_blocks,
116            )
117            .await;
118
119        #[cfg(feature = "native")]
120        let _snapshot = {
121            let tracker = Arc::new(SegmentTracker::new());
122            SegmentSnapshot::new(tracker, segment_ids.to_vec())
123        };
124
125        let _ = directory; // suppress unused warning on wasm
126        Ok(Self {
127            #[cfg(feature = "native")]
128            _snapshot,
129            _phantom: std::marker::PhantomData,
130            segments,
131            schema,
132            default_fields,
133            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
134            trained_centroids,
135            global_stats,
136            segment_map,
137            doc_id_cumulative,
138        })
139    }
140
141    /// Common loading logic shared by create and from_snapshot
142    async fn load_common(
143        directory: &Arc<D>,
144        schema: &Arc<Schema>,
145        segment_ids: &[String],
146        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
147        term_cache_blocks: usize,
148    ) -> (
149        Vec<Arc<SegmentReader>>,
150        Vec<crate::Field>,
151        Arc<LazyGlobalStats>,
152        FxHashMap<u128, usize>,
153        Vec<u32>,
154    ) {
155        let segments = Self::load_segments(
156            directory,
157            schema,
158            segment_ids,
159            trained_centroids,
160            term_cache_blocks,
161        )
162        .await;
163        let default_fields = Self::build_default_fields(schema);
164        let global_stats = Arc::new(LazyGlobalStats::new(segments.clone()));
165        let (segment_map, doc_id_cumulative) = Self::build_lookup_tables(&segments);
166        (
167            segments,
168            default_fields,
169            global_stats,
170            segment_map,
171            doc_id_cumulative,
172        )
173    }
174
175    /// Load segment readers from IDs (parallel loading for performance)
176    async fn load_segments(
177        directory: &Arc<D>,
178        schema: &Arc<Schema>,
179        segment_ids: &[String],
180        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
181        term_cache_blocks: usize,
182    ) -> Vec<Arc<SegmentReader>> {
183        // Parse segment IDs and filter invalid ones
184        let valid_segments: Vec<(usize, SegmentId)> = segment_ids
185            .iter()
186            .enumerate()
187            .filter_map(|(idx, id_str)| SegmentId::from_hex(id_str).map(|sid| (idx, sid)))
188            .collect();
189
190        // Load all segments in parallel with offset=0
191        let futures: Vec<_> =
192            valid_segments
193                .iter()
194                .map(|(_, segment_id)| {
195                    let dir = Arc::clone(directory);
196                    let sch = Arc::clone(schema);
197                    let sid = *segment_id;
198                    async move {
199                        SegmentReader::open(dir.as_ref(), sid, sch, 0, term_cache_blocks).await
200                    }
201                })
202                .collect();
203
204        let results = futures::future::join_all(futures).await;
205
206        // Collect successful results with their original index for ordering
207        let mut loaded: Vec<(usize, SegmentReader)> = valid_segments
208            .into_iter()
209            .zip(results)
210            .filter_map(|((idx, _), result)| match result {
211                Ok(reader) => Some((idx, reader)),
212                Err(e) => {
213                    log::warn!("Failed to open segment: {:?}", e);
214                    None
215                }
216            })
217            .collect();
218
219        // Sort by original index to maintain deterministic ordering
220        loaded.sort_by_key(|(idx, _)| *idx);
221
222        // Calculate and assign doc_id_offsets sequentially
223        let mut doc_id_offset = 0u32;
224        let mut segments = Vec::with_capacity(loaded.len());
225        for (_, mut reader) in loaded {
226            reader.set_doc_id_offset(doc_id_offset);
227            doc_id_offset += reader.meta().num_docs;
228            // Inject per-field centroids into reader for IVF/ScaNN search
229            if !trained_centroids.is_empty() {
230                reader.set_coarse_centroids(trained_centroids.clone());
231            }
232            segments.push(Arc::new(reader));
233        }
234
235        // Log searcher loading summary
236        let total_docs: u32 = segments.iter().map(|s| s.meta().num_docs).sum();
237        let total_sparse_mem: usize = segments
238            .iter()
239            .flat_map(|s| s.sparse_indexes().values())
240            .map(|idx| idx.num_dimensions() * 12)
241            .sum();
242        log::info!(
243            "[searcher] loaded {} segments: total_docs={}, sparse_index_mem={:.2} MB",
244            segments.len(),
245            total_docs,
246            total_sparse_mem as f64 / (1024.0 * 1024.0)
247        );
248
249        segments
250    }
251
252    /// Build default fields from schema
253    fn build_default_fields(schema: &Schema) -> Vec<crate::Field> {
254        if !schema.default_fields().is_empty() {
255            schema.default_fields().to_vec()
256        } else {
257            schema
258                .fields()
259                .filter(|(_, entry)| {
260                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
261                })
262                .map(|(field, _)| field)
263                .collect()
264        }
265    }
266
267    /// Get the schema
268    pub fn schema(&self) -> &Schema {
269        &self.schema
270    }
271
272    /// Get segment readers
273    pub fn segment_readers(&self) -> &[Arc<SegmentReader>] {
274        &self.segments
275    }
276
277    /// Get default fields for search
278    pub fn default_fields(&self) -> &[crate::Field] {
279        &self.default_fields
280    }
281
282    /// Get tokenizer registry
283    pub fn tokenizers(&self) -> &crate::tokenizer::TokenizerRegistry {
284        &self.tokenizers
285    }
286
287    /// Get trained centroids
288    pub fn trained_centroids(&self) -> &FxHashMap<u32, Arc<CoarseCentroids>> {
289        &self.trained_centroids
290    }
291
292    /// Get lazy global statistics for cross-segment IDF computation
293    pub fn global_stats(&self) -> &Arc<LazyGlobalStats> {
294        &self.global_stats
295    }
296
297    /// Build O(1) lookup tables from loaded segments
298    fn build_lookup_tables(segments: &[Arc<SegmentReader>]) -> (FxHashMap<u128, usize>, Vec<u32>) {
299        let mut segment_map = FxHashMap::default();
300        let mut cumulative = Vec::with_capacity(segments.len());
301        let mut acc = 0u32;
302        for (i, seg) in segments.iter().enumerate() {
303            segment_map.insert(seg.meta().id, i);
304            acc += seg.meta().num_docs;
305            cumulative.push(acc);
306        }
307        (segment_map, cumulative)
308    }
309
310    /// Get total document count across all segments
311    pub fn num_docs(&self) -> u32 {
312        self.doc_id_cumulative.last().copied().unwrap_or(0)
313    }
314
315    /// Get O(1) segment_id → index map (used by reranker)
316    pub fn segment_map(&self) -> &FxHashMap<u128, usize> {
317        &self.segment_map
318    }
319
320    /// Get number of segments
321    pub fn num_segments(&self) -> usize {
322        self.segments.len()
323    }
324
325    /// Get a document by global doc_id (binary search on cumulative offsets)
326    pub async fn doc(&self, doc_id: u32) -> Result<Option<crate::dsl::Document>> {
327        let idx = self.doc_id_cumulative.partition_point(|&cum| cum <= doc_id);
328        if idx >= self.segments.len() {
329            return Ok(None);
330        }
331        let base = if idx == 0 {
332            0
333        } else {
334            self.doc_id_cumulative[idx - 1]
335        };
336        self.segments[idx].doc(doc_id - base).await
337    }
338
339    /// Search across all segments and return aggregated results
340    pub async fn search(
341        &self,
342        query: &dyn crate::query::Query,
343        limit: usize,
344    ) -> Result<Vec<crate::query::SearchResult>> {
345        let (results, _) = self.search_with_count(query, limit).await?;
346        Ok(results)
347    }
348
349    /// Search across all segments and return (results, total_seen)
350    /// total_seen is the number of documents that were scored across all segments
351    pub async fn search_with_count(
352        &self,
353        query: &dyn crate::query::Query,
354        limit: usize,
355    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
356        self.search_with_offset_and_count(query, limit, 0).await
357    }
358
359    /// Search with offset for pagination
360    pub async fn search_with_offset(
361        &self,
362        query: &dyn crate::query::Query,
363        limit: usize,
364        offset: usize,
365    ) -> Result<Vec<crate::query::SearchResult>> {
366        let (results, _) = self
367            .search_with_offset_and_count(query, limit, offset)
368            .await?;
369        Ok(results)
370    }
371
372    /// Search with offset and return (results, total_seen)
373    pub async fn search_with_offset_and_count(
374        &self,
375        query: &dyn crate::query::Query,
376        limit: usize,
377        offset: usize,
378    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
379        self.search_internal(query, limit, offset, false).await
380    }
381
382    /// Search with positions (ordinal tracking) and return (results, total_seen)
383    ///
384    /// Use this when you need per-ordinal scores for multi-valued fields.
385    pub async fn search_with_positions(
386        &self,
387        query: &dyn crate::query::Query,
388        limit: usize,
389    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
390        self.search_internal(query, limit, 0, true).await
391    }
392
393    /// Internal search implementation
394    async fn search_internal(
395        &self,
396        query: &dyn crate::query::Query,
397        limit: usize,
398        offset: usize,
399        collect_positions: bool,
400    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
401        let fetch_limit = offset + limit;
402
403        let futures: Vec<_> = self
404            .segments
405            .iter()
406            .map(|segment| {
407                let sid = segment.meta().id;
408                async move {
409                    let (mut results, segment_seen) = if collect_positions {
410                        crate::query::search_segment_with_positions_and_count(
411                            segment.as_ref(),
412                            query,
413                            fetch_limit,
414                        )
415                        .await?
416                    } else {
417                        crate::query::search_segment_with_count(
418                            segment.as_ref(),
419                            query,
420                            fetch_limit,
421                        )
422                        .await?
423                    };
424                    // Stamp segment_id on each result
425                    for r in &mut results {
426                        r.segment_id = sid;
427                    }
428                    Ok::<_, crate::error::Error>((results, segment_seen))
429                }
430            })
431            .collect();
432
433        let batches = futures::future::try_join_all(futures).await?;
434        let mut all_results: Vec<crate::query::SearchResult> = Vec::new();
435        let mut total_seen: u32 = 0;
436        for (batch, segment_seen) in batches {
437            total_seen += segment_seen;
438            all_results.extend(batch);
439        }
440
441        // Sort by score descending
442        all_results.sort_by(|a, b| {
443            b.score
444                .partial_cmp(&a.score)
445                .unwrap_or(std::cmp::Ordering::Equal)
446        });
447
448        // Apply offset and limit
449        let results = all_results.into_iter().skip(offset).take(limit).collect();
450
451        Ok((results, total_seen))
452    }
453
454    /// Two-stage search: L1 retrieval + L2 dense vector reranking
455    ///
456    /// Runs the query to get `l1_limit` candidates, then reranks by exact
457    /// dense vector distance and returns the top `final_limit` results.
458    pub async fn search_and_rerank(
459        &self,
460        query: &dyn crate::query::Query,
461        l1_limit: usize,
462        final_limit: usize,
463        config: &crate::query::RerankerConfig,
464    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
465        let (candidates, total_seen) = self.search_with_count(query, l1_limit).await?;
466        let reranked = crate::query::rerank(self, &candidates, config, final_limit).await?;
467        Ok((reranked, total_seen))
468    }
469
470    /// Parse query string and search (convenience method)
471    pub async fn query(
472        &self,
473        query_str: &str,
474        limit: usize,
475    ) -> Result<crate::query::SearchResponse> {
476        self.query_offset(query_str, limit, 0).await
477    }
478
479    /// Parse query string and search with offset (convenience method)
480    pub async fn query_offset(
481        &self,
482        query_str: &str,
483        limit: usize,
484        offset: usize,
485    ) -> Result<crate::query::SearchResponse> {
486        let parser = self.query_parser();
487        let query = parser
488            .parse(query_str)
489            .map_err(crate::error::Error::Query)?;
490
491        let (results, _total_seen) = self
492            .search_internal(query.as_ref(), limit, offset, false)
493            .await?;
494
495        let total_hits = results.len() as u32;
496        let hits: Vec<crate::query::SearchHit> = results
497            .into_iter()
498            .map(|result| crate::query::SearchHit {
499                address: crate::query::DocAddress::new(result.segment_id, result.doc_id),
500                score: result.score,
501                matched_fields: result.extract_ordinals(),
502            })
503            .collect();
504
505        Ok(crate::query::SearchResponse { hits, total_hits })
506    }
507
508    /// Get query parser for this searcher
509    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
510        let query_routers = self.schema.query_routers();
511        if !query_routers.is_empty()
512            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
513        {
514            return crate::dsl::QueryLanguageParser::with_router(
515                Arc::clone(&self.schema),
516                self.default_fields.clone(),
517                Arc::clone(&self.tokenizers),
518                router,
519            );
520        }
521
522        crate::dsl::QueryLanguageParser::new(
523            Arc::clone(&self.schema),
524            self.default_fields.clone(),
525            Arc::clone(&self.tokenizers),
526        )
527    }
528
529    /// Get a document by address (segment_id + global doc_id)
530    ///
531    /// The doc_id in the address is a global doc_id (with doc_id_offset applied).
532    /// This method converts it back to a segment-local doc_id for the store lookup.
533    pub async fn get_document(
534        &self,
535        address: &crate::query::DocAddress,
536    ) -> Result<Option<crate::dsl::Document>> {
537        self.get_document_with_fields(address, None).await
538    }
539
540    /// Get a document by address, hydrating only the specified field IDs.
541    ///
542    /// If `fields` is `None`, all fields are hydrated (including dense vectors).
543    /// If `fields` is `Some(set)`, only dense vector fields in the set are read
544    /// from flat storage — skipping expensive mmap reads for unrequested vectors.
545    pub async fn get_document_with_fields(
546        &self,
547        address: &crate::query::DocAddress,
548        fields: Option<&rustc_hash::FxHashSet<u32>>,
549    ) -> Result<Option<crate::dsl::Document>> {
550        let segment_id = address.segment_id_u128().ok_or_else(|| {
551            crate::error::Error::Query(format!("Invalid segment ID: {}", address.segment_id))
552        })?;
553
554        if let Some(&idx) = self.segment_map.get(&segment_id) {
555            let segment = &self.segments[idx];
556            let local_doc_id = address.doc_id.wrapping_sub(segment.doc_id_offset());
557            return segment.doc_with_fields(local_doc_id, fields).await;
558        }
559
560        Ok(None)
561    }
562}