Skip to main content

hermes_core/index/
searcher.rs

//! Searcher - read-only search over pre-built segments
//!
//! This module provides `Searcher` for read-only search access to indexes.
//! It can be used standalone (for wasm/read-only) or via `IndexReader` (for native).
5
6use std::sync::Arc;
7
8use rustc_hash::FxHashMap;
9
10use crate::directories::Directory;
11use crate::dsl::Schema;
12use crate::error::Result;
13use crate::query::LazyGlobalStats;
14use crate::segment::{SegmentId, SegmentReader};
15#[cfg(feature = "native")]
16use crate::segment::{SegmentSnapshot, SegmentTracker};
17use crate::structures::CoarseCentroids;
18
19/// Searcher - provides search over loaded segments
20///
21/// For wasm/read-only use, create via `Searcher::open()`.
22/// For native use with Index, this is created via `IndexReader`.
23pub struct Searcher<D: Directory + 'static> {
24    /// Segment snapshot holding refs - prevents deletion during native use
25    #[cfg(feature = "native")]
26    _snapshot: SegmentSnapshot<D>,
27    /// Phantom data for wasm builds
28    #[cfg(not(feature = "native"))]
29    _phantom: std::marker::PhantomData<D>,
30    /// Loaded segment readers
31    segments: Vec<Arc<SegmentReader>>,
32    /// Schema
33    schema: Arc<Schema>,
34    /// Default fields for search
35    default_fields: Vec<crate::Field>,
36    /// Tokenizers
37    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
38    /// Trained centroids per field (injected into segment readers for IVF/ScaNN search)
39    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
40    /// Lazy global statistics for cross-segment IDF computation
41    global_stats: Arc<LazyGlobalStats>,
42    /// O(1) segment lookup by segment_id
43    segment_map: FxHashMap<u128, usize>,
44    /// Cumulative doc counts for binary-search in doc(global_id)
45    doc_id_cumulative: Vec<u32>,
46}
47
48impl<D: Directory + 'static> Searcher<D> {
49    /// Create a Searcher directly from segment IDs
50    ///
51    /// This is a simpler initialization path that doesn't require SegmentManager.
52    /// Use this for read-only access to pre-built indexes.
53    pub async fn open(
54        directory: Arc<D>,
55        schema: Arc<Schema>,
56        segment_ids: &[String],
57        term_cache_blocks: usize,
58    ) -> Result<Self> {
59        Self::create(
60            directory,
61            schema,
62            segment_ids,
63            FxHashMap::default(),
64            term_cache_blocks,
65        )
66        .await
67    }
68
69    /// Create from a snapshot (for native IndexReader use)
70    #[cfg(feature = "native")]
71    pub(crate) async fn from_snapshot(
72        directory: Arc<D>,
73        schema: Arc<Schema>,
74        snapshot: SegmentSnapshot<D>,
75        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
76        term_cache_blocks: usize,
77    ) -> Result<Self> {
78        let (segments, default_fields, global_stats, segment_map, doc_id_cumulative) =
79            Self::load_common(
80                &directory,
81                &schema,
82                snapshot.segment_ids(),
83                &trained_centroids,
84                term_cache_blocks,
85            )
86            .await;
87
88        Ok(Self {
89            _snapshot: snapshot,
90            segments,
91            schema,
92            default_fields,
93            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
94            trained_centroids,
95            global_stats,
96            segment_map,
97            doc_id_cumulative,
98        })
99    }
100
101    /// Internal create method
102    async fn create(
103        directory: Arc<D>,
104        schema: Arc<Schema>,
105        segment_ids: &[String],
106        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
107        term_cache_blocks: usize,
108    ) -> Result<Self> {
109        let (segments, default_fields, global_stats, segment_map, doc_id_cumulative) =
110            Self::load_common(
111                &directory,
112                &schema,
113                segment_ids,
114                &trained_centroids,
115                term_cache_blocks,
116            )
117            .await;
118
119        #[cfg(feature = "native")]
120        {
121            let tracker = Arc::new(SegmentTracker::new());
122            let snapshot = SegmentSnapshot::new(tracker, segment_ids.to_vec());
123            Ok(Self {
124                _snapshot: snapshot,
125                segments,
126                schema,
127                default_fields,
128                tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
129                trained_centroids,
130                global_stats,
131                segment_map,
132                doc_id_cumulative,
133            })
134        }
135
136        #[cfg(not(feature = "native"))]
137        {
138            let _ = directory; // suppress unused warning
139            Ok(Self {
140                _phantom: std::marker::PhantomData,
141                segments,
142                schema,
143                default_fields,
144                tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
145                trained_centroids,
146                global_stats,
147                segment_map,
148                doc_id_cumulative,
149            })
150        }
151    }
152
153    /// Common loading logic shared by create and from_snapshot
154    async fn load_common(
155        directory: &Arc<D>,
156        schema: &Arc<Schema>,
157        segment_ids: &[String],
158        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
159        term_cache_blocks: usize,
160    ) -> (
161        Vec<Arc<SegmentReader>>,
162        Vec<crate::Field>,
163        Arc<LazyGlobalStats>,
164        FxHashMap<u128, usize>,
165        Vec<u32>,
166    ) {
167        let segments = Self::load_segments(
168            directory,
169            schema,
170            segment_ids,
171            trained_centroids,
172            term_cache_blocks,
173        )
174        .await;
175        let default_fields = Self::build_default_fields(schema);
176        let global_stats = Arc::new(LazyGlobalStats::new(segments.clone()));
177        let (segment_map, doc_id_cumulative) = Self::build_lookup_tables(&segments);
178        (
179            segments,
180            default_fields,
181            global_stats,
182            segment_map,
183            doc_id_cumulative,
184        )
185    }
186
187    /// Load segment readers from IDs (parallel loading for performance)
188    async fn load_segments(
189        directory: &Arc<D>,
190        schema: &Arc<Schema>,
191        segment_ids: &[String],
192        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
193        term_cache_blocks: usize,
194    ) -> Vec<Arc<SegmentReader>> {
195        // Parse segment IDs and filter invalid ones
196        let valid_segments: Vec<(usize, SegmentId)> = segment_ids
197            .iter()
198            .enumerate()
199            .filter_map(|(idx, id_str)| SegmentId::from_hex(id_str).map(|sid| (idx, sid)))
200            .collect();
201
202        // Load all segments in parallel with offset=0
203        let futures: Vec<_> =
204            valid_segments
205                .iter()
206                .map(|(_, segment_id)| {
207                    let dir = Arc::clone(directory);
208                    let sch = Arc::clone(schema);
209                    let sid = *segment_id;
210                    async move {
211                        SegmentReader::open(dir.as_ref(), sid, sch, 0, term_cache_blocks).await
212                    }
213                })
214                .collect();
215
216        let results = futures::future::join_all(futures).await;
217
218        // Collect successful results with their original index for ordering
219        let mut loaded: Vec<(usize, SegmentReader)> = valid_segments
220            .into_iter()
221            .zip(results)
222            .filter_map(|((idx, _), result)| match result {
223                Ok(reader) => Some((idx, reader)),
224                Err(e) => {
225                    log::warn!("Failed to open segment: {:?}", e);
226                    None
227                }
228            })
229            .collect();
230
231        // Sort by original index to maintain deterministic ordering
232        loaded.sort_by_key(|(idx, _)| *idx);
233
234        // Calculate and assign doc_id_offsets sequentially
235        let mut doc_id_offset = 0u32;
236        let mut segments = Vec::with_capacity(loaded.len());
237        for (_, mut reader) in loaded {
238            reader.set_doc_id_offset(doc_id_offset);
239            doc_id_offset += reader.meta().num_docs;
240            // Inject per-field centroids into reader for IVF/ScaNN search
241            if !trained_centroids.is_empty() {
242                reader.set_coarse_centroids(trained_centroids.clone());
243            }
244            segments.push(Arc::new(reader));
245        }
246
247        // Log searcher loading summary
248        let total_docs: u32 = segments.iter().map(|s| s.meta().num_docs).sum();
249        let total_sparse_mem: usize = segments
250            .iter()
251            .flat_map(|s| s.sparse_indexes().values())
252            .map(|idx| idx.num_dimensions() * 12)
253            .sum();
254        log::info!(
255            "[searcher] loaded {} segments: total_docs={}, sparse_index_mem={:.2} MB",
256            segments.len(),
257            total_docs,
258            total_sparse_mem as f64 / (1024.0 * 1024.0)
259        );
260
261        segments
262    }
263
264    /// Build default fields from schema
265    fn build_default_fields(schema: &Schema) -> Vec<crate::Field> {
266        if !schema.default_fields().is_empty() {
267            schema.default_fields().to_vec()
268        } else {
269            schema
270                .fields()
271                .filter(|(_, entry)| {
272                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
273                })
274                .map(|(field, _)| field)
275                .collect()
276        }
277    }
278
279    /// Get the schema
280    pub fn schema(&self) -> &Schema {
281        &self.schema
282    }
283
284    /// Get segment readers
285    pub fn segment_readers(&self) -> &[Arc<SegmentReader>] {
286        &self.segments
287    }
288
289    /// Get default fields for search
290    pub fn default_fields(&self) -> &[crate::Field] {
291        &self.default_fields
292    }
293
294    /// Get tokenizer registry
295    pub fn tokenizers(&self) -> &crate::tokenizer::TokenizerRegistry {
296        &self.tokenizers
297    }
298
299    /// Get trained centroids
300    pub fn trained_centroids(&self) -> &FxHashMap<u32, Arc<CoarseCentroids>> {
301        &self.trained_centroids
302    }
303
304    /// Get lazy global statistics for cross-segment IDF computation
305    pub fn global_stats(&self) -> &Arc<LazyGlobalStats> {
306        &self.global_stats
307    }
308
309    /// Build O(1) lookup tables from loaded segments
310    fn build_lookup_tables(segments: &[Arc<SegmentReader>]) -> (FxHashMap<u128, usize>, Vec<u32>) {
311        let mut segment_map = FxHashMap::default();
312        let mut cumulative = Vec::with_capacity(segments.len());
313        let mut acc = 0u32;
314        for (i, seg) in segments.iter().enumerate() {
315            segment_map.insert(seg.meta().id, i);
316            acc += seg.meta().num_docs;
317            cumulative.push(acc);
318        }
319        (segment_map, cumulative)
320    }
321
322    /// Get total document count across all segments
323    pub fn num_docs(&self) -> u32 {
324        self.doc_id_cumulative.last().copied().unwrap_or(0)
325    }
326
327    /// Get O(1) segment_id → index map (used by reranker)
328    pub fn segment_map(&self) -> &FxHashMap<u128, usize> {
329        &self.segment_map
330    }
331
332    /// Get number of segments
333    pub fn num_segments(&self) -> usize {
334        self.segments.len()
335    }
336
337    /// Get a document by global doc_id (binary search on cumulative offsets)
338    pub async fn doc(&self, doc_id: u32) -> Result<Option<crate::dsl::Document>> {
339        let idx = self.doc_id_cumulative.partition_point(|&cum| cum <= doc_id);
340        if idx >= self.segments.len() {
341            return Ok(None);
342        }
343        let base = if idx == 0 {
344            0
345        } else {
346            self.doc_id_cumulative[idx - 1]
347        };
348        self.segments[idx].doc(doc_id - base).await
349    }
350
351    /// Search across all segments and return aggregated results
352    pub async fn search(
353        &self,
354        query: &dyn crate::query::Query,
355        limit: usize,
356    ) -> Result<Vec<crate::query::SearchResult>> {
357        let (results, _) = self.search_with_count(query, limit).await?;
358        Ok(results)
359    }
360
361    /// Search across all segments and return (results, total_seen)
362    /// total_seen is the number of documents that were scored across all segments
363    pub async fn search_with_count(
364        &self,
365        query: &dyn crate::query::Query,
366        limit: usize,
367    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
368        self.search_with_offset_and_count(query, limit, 0).await
369    }
370
371    /// Search with offset for pagination
372    pub async fn search_with_offset(
373        &self,
374        query: &dyn crate::query::Query,
375        limit: usize,
376        offset: usize,
377    ) -> Result<Vec<crate::query::SearchResult>> {
378        let (results, _) = self
379            .search_with_offset_and_count(query, limit, offset)
380            .await?;
381        Ok(results)
382    }
383
384    /// Search with offset and return (results, total_seen)
385    pub async fn search_with_offset_and_count(
386        &self,
387        query: &dyn crate::query::Query,
388        limit: usize,
389        offset: usize,
390    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
391        self.search_internal(query, limit, offset, false).await
392    }
393
394    /// Search with positions (ordinal tracking) and return (results, total_seen)
395    ///
396    /// Use this when you need per-ordinal scores for multi-valued fields.
397    pub async fn search_with_positions(
398        &self,
399        query: &dyn crate::query::Query,
400        limit: usize,
401    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
402        self.search_internal(query, limit, 0, true).await
403    }
404
405    /// Internal search implementation
406    async fn search_internal(
407        &self,
408        query: &dyn crate::query::Query,
409        limit: usize,
410        offset: usize,
411        collect_positions: bool,
412    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
413        let fetch_limit = offset + limit;
414
415        let futures: Vec<_> = self
416            .segments
417            .iter()
418            .map(|segment| {
419                let sid = segment.meta().id;
420                async move {
421                    let (mut results, segment_seen) = if collect_positions {
422                        crate::query::search_segment_with_positions_and_count(
423                            segment.as_ref(),
424                            query,
425                            fetch_limit,
426                        )
427                        .await?
428                    } else {
429                        crate::query::search_segment_with_count(
430                            segment.as_ref(),
431                            query,
432                            fetch_limit,
433                        )
434                        .await?
435                    };
436                    // Stamp segment_id on each result
437                    for r in &mut results {
438                        r.segment_id = sid;
439                    }
440                    Ok::<_, crate::error::Error>((results, segment_seen))
441                }
442            })
443            .collect();
444
445        let batches = futures::future::try_join_all(futures).await?;
446        let mut all_results: Vec<crate::query::SearchResult> = Vec::new();
447        let mut total_seen: u32 = 0;
448        for (batch, segment_seen) in batches {
449            total_seen += segment_seen;
450            all_results.extend(batch);
451        }
452
453        // Sort by score descending
454        all_results.sort_by(|a, b| {
455            b.score
456                .partial_cmp(&a.score)
457                .unwrap_or(std::cmp::Ordering::Equal)
458        });
459
460        // Apply offset and limit
461        let results = all_results.into_iter().skip(offset).take(limit).collect();
462
463        Ok((results, total_seen))
464    }
465
466    /// Two-stage search: L1 retrieval + L2 dense vector reranking
467    ///
468    /// Runs the query to get `l1_limit` candidates, then reranks by exact
469    /// dense vector distance and returns the top `final_limit` results.
470    pub async fn search_and_rerank(
471        &self,
472        query: &dyn crate::query::Query,
473        l1_limit: usize,
474        final_limit: usize,
475        config: &crate::query::RerankerConfig,
476    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
477        let (candidates, total_seen) = self.search_with_count(query, l1_limit).await?;
478        let reranked = crate::query::rerank(self, &candidates, config, final_limit).await?;
479        Ok((reranked, total_seen))
480    }
481
482    /// Parse query string and search (convenience method)
483    pub async fn query(
484        &self,
485        query_str: &str,
486        limit: usize,
487    ) -> Result<crate::query::SearchResponse> {
488        self.query_offset(query_str, limit, 0).await
489    }
490
491    /// Parse query string and search with offset (convenience method)
492    pub async fn query_offset(
493        &self,
494        query_str: &str,
495        limit: usize,
496        offset: usize,
497    ) -> Result<crate::query::SearchResponse> {
498        let parser = self.query_parser();
499        let query = parser
500            .parse(query_str)
501            .map_err(crate::error::Error::Query)?;
502
503        let (results, _total_seen) = self
504            .search_internal(query.as_ref(), limit, offset, false)
505            .await?;
506
507        let total_hits = results.len() as u32;
508        let hits: Vec<crate::query::SearchHit> = results
509            .into_iter()
510            .map(|result| crate::query::SearchHit {
511                address: crate::query::DocAddress::new(result.segment_id, result.doc_id),
512                score: result.score,
513                matched_fields: result.extract_ordinals(),
514            })
515            .collect();
516
517        Ok(crate::query::SearchResponse { hits, total_hits })
518    }
519
520    /// Get query parser for this searcher
521    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
522        let query_routers = self.schema.query_routers();
523        if !query_routers.is_empty()
524            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
525        {
526            return crate::dsl::QueryLanguageParser::with_router(
527                Arc::clone(&self.schema),
528                self.default_fields.clone(),
529                Arc::clone(&self.tokenizers),
530                router,
531            );
532        }
533
534        crate::dsl::QueryLanguageParser::new(
535            Arc::clone(&self.schema),
536            self.default_fields.clone(),
537            Arc::clone(&self.tokenizers),
538        )
539    }
540
541    /// Get a document by address (segment_id + global doc_id)
542    ///
543    /// The doc_id in the address is a global doc_id (with doc_id_offset applied).
544    /// This method converts it back to a segment-local doc_id for the store lookup.
545    pub async fn get_document(
546        &self,
547        address: &crate::query::DocAddress,
548    ) -> Result<Option<crate::dsl::Document>> {
549        self.get_document_with_fields(address, None).await
550    }
551
552    /// Get a document by address, hydrating only the specified field IDs.
553    ///
554    /// If `fields` is `None`, all fields are hydrated (including dense vectors).
555    /// If `fields` is `Some(set)`, only dense vector fields in the set are read
556    /// from flat storage — skipping expensive mmap reads for unrequested vectors.
557    pub async fn get_document_with_fields(
558        &self,
559        address: &crate::query::DocAddress,
560        fields: Option<&rustc_hash::FxHashSet<u32>>,
561    ) -> Result<Option<crate::dsl::Document>> {
562        let segment_id = address.segment_id_u128().ok_or_else(|| {
563            crate::error::Error::Query(format!("Invalid segment ID: {}", address.segment_id))
564        })?;
565
566        if let Some(&idx) = self.segment_map.get(&segment_id) {
567            let segment = &self.segments[idx];
568            let local_doc_id = address.doc_id.wrapping_sub(segment.doc_id_offset());
569            return segment.doc_with_fields(local_doc_id, fields).await;
570        }
571
572        Ok(None)
573    }
574}