Skip to main content

hermes_core/index/
searcher.rs

//! Searcher - read-only search over pre-built segments
//!
//! This module provides `Searcher` for read-only search access to indexes.
//! It can be used standalone (for wasm/read-only) or via `IndexReader` (for native).
5
6use std::sync::Arc;
7
8use rustc_hash::FxHashMap;
9
10use crate::directories::Directory;
11use crate::dsl::Schema;
12use crate::error::Result;
13use crate::query::LazyGlobalStats;
14use crate::segment::{SegmentId, SegmentReader};
15#[cfg(feature = "native")]
16use crate::segment::{SegmentSnapshot, SegmentTracker};
17use crate::structures::CoarseCentroids;
18
19/// Searcher - provides search over loaded segments
20///
21/// For wasm/read-only use, create via `Searcher::open()`.
22/// For native use with Index, this is created via `IndexReader`.
23pub struct Searcher<D: Directory + 'static> {
24    /// Segment snapshot holding refs - prevents deletion during native use
25    #[cfg(feature = "native")]
26    _snapshot: SegmentSnapshot<D>,
27    /// Phantom data for wasm builds
28    #[cfg(not(feature = "native"))]
29    _phantom: std::marker::PhantomData<D>,
30    /// Loaded segment readers
31    segments: Vec<Arc<SegmentReader>>,
32    /// Schema
33    schema: Arc<Schema>,
34    /// Default fields for search
35    default_fields: Vec<crate::Field>,
36    /// Tokenizers
37    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
38    /// Trained centroids per field (injected into segment readers for IVF/ScaNN search)
39    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
40    /// Lazy global statistics for cross-segment IDF computation
41    global_stats: Arc<LazyGlobalStats>,
42}
43
44impl<D: Directory + 'static> Searcher<D> {
45    /// Create a Searcher directly from segment IDs
46    ///
47    /// This is a simpler initialization path that doesn't require SegmentManager.
48    /// Use this for read-only access to pre-built indexes.
49    pub async fn open(
50        directory: Arc<D>,
51        schema: Arc<Schema>,
52        segment_ids: &[String],
53        term_cache_blocks: usize,
54    ) -> Result<Self> {
55        Self::create(
56            directory,
57            schema,
58            segment_ids,
59            FxHashMap::default(),
60            term_cache_blocks,
61        )
62        .await
63    }
64
65    /// Create from a snapshot (for native IndexReader use)
66    #[cfg(feature = "native")]
67    pub(crate) async fn from_snapshot(
68        directory: Arc<D>,
69        schema: Arc<Schema>,
70        snapshot: SegmentSnapshot<D>,
71        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
72        term_cache_blocks: usize,
73    ) -> Result<Self> {
74        let (segments, default_fields, global_stats) = Self::load_common(
75            &directory,
76            &schema,
77            snapshot.segment_ids(),
78            &trained_centroids,
79            term_cache_blocks,
80        )
81        .await;
82
83        Ok(Self {
84            _snapshot: snapshot,
85            segments,
86            schema,
87            default_fields,
88            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
89            trained_centroids,
90            global_stats,
91        })
92    }
93
94    /// Internal create method
95    async fn create(
96        directory: Arc<D>,
97        schema: Arc<Schema>,
98        segment_ids: &[String],
99        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
100        term_cache_blocks: usize,
101    ) -> Result<Self> {
102        let (segments, default_fields, global_stats) = Self::load_common(
103            &directory,
104            &schema,
105            segment_ids,
106            &trained_centroids,
107            term_cache_blocks,
108        )
109        .await;
110
111        #[cfg(feature = "native")]
112        {
113            let tracker = Arc::new(SegmentTracker::new());
114            let snapshot = SegmentSnapshot::new(tracker, segment_ids.to_vec());
115            Ok(Self {
116                _snapshot: snapshot,
117                segments,
118                schema,
119                default_fields,
120                tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
121                trained_centroids,
122                global_stats,
123            })
124        }
125
126        #[cfg(not(feature = "native"))]
127        {
128            let _ = directory; // suppress unused warning
129            Ok(Self {
130                _phantom: std::marker::PhantomData,
131                segments,
132                schema,
133                default_fields,
134                tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
135                trained_centroids,
136                global_stats,
137            })
138        }
139    }
140
141    /// Common loading logic shared by create and from_snapshot
142    async fn load_common(
143        directory: &Arc<D>,
144        schema: &Arc<Schema>,
145        segment_ids: &[String],
146        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
147        term_cache_blocks: usize,
148    ) -> (
149        Vec<Arc<SegmentReader>>,
150        Vec<crate::Field>,
151        Arc<LazyGlobalStats>,
152    ) {
153        let segments = Self::load_segments(
154            directory,
155            schema,
156            segment_ids,
157            trained_centroids,
158            term_cache_blocks,
159        )
160        .await;
161        let default_fields = Self::build_default_fields(schema);
162        let global_stats = Arc::new(LazyGlobalStats::new(segments.clone()));
163        (segments, default_fields, global_stats)
164    }
165
166    /// Load segment readers from IDs (parallel loading for performance)
167    async fn load_segments(
168        directory: &Arc<D>,
169        schema: &Arc<Schema>,
170        segment_ids: &[String],
171        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
172        term_cache_blocks: usize,
173    ) -> Vec<Arc<SegmentReader>> {
174        // Parse segment IDs and filter invalid ones
175        let valid_segments: Vec<(usize, SegmentId)> = segment_ids
176            .iter()
177            .enumerate()
178            .filter_map(|(idx, id_str)| SegmentId::from_hex(id_str).map(|sid| (idx, sid)))
179            .collect();
180
181        // Load all segments in parallel with offset=0
182        let futures: Vec<_> =
183            valid_segments
184                .iter()
185                .map(|(_, segment_id)| {
186                    let dir = Arc::clone(directory);
187                    let sch = Arc::clone(schema);
188                    let sid = *segment_id;
189                    async move {
190                        SegmentReader::open(dir.as_ref(), sid, sch, 0, term_cache_blocks).await
191                    }
192                })
193                .collect();
194
195        let results = futures::future::join_all(futures).await;
196
197        // Collect successful results with their original index for ordering
198        let mut loaded: Vec<(usize, SegmentReader)> = valid_segments
199            .into_iter()
200            .zip(results)
201            .filter_map(|((idx, _), result)| match result {
202                Ok(reader) => Some((idx, reader)),
203                Err(e) => {
204                    log::warn!("Failed to open segment: {:?}", e);
205                    None
206                }
207            })
208            .collect();
209
210        // Sort by original index to maintain deterministic ordering
211        loaded.sort_by_key(|(idx, _)| *idx);
212
213        // Calculate and assign doc_id_offsets sequentially
214        let mut doc_id_offset = 0u32;
215        let mut segments = Vec::with_capacity(loaded.len());
216        for (_, mut reader) in loaded {
217            reader.set_doc_id_offset(doc_id_offset);
218            doc_id_offset += reader.meta().num_docs;
219            // Inject per-field centroids into reader for IVF/ScaNN search
220            if !trained_centroids.is_empty() {
221                reader.set_coarse_centroids(trained_centroids.clone());
222            }
223            segments.push(Arc::new(reader));
224        }
225
226        // Log searcher loading summary
227        let total_docs: u32 = segments.iter().map(|s| s.meta().num_docs).sum();
228        let total_sparse_mem: usize = segments
229            .iter()
230            .flat_map(|s| s.sparse_indexes().values())
231            .map(|idx| idx.num_dimensions() * 12)
232            .sum();
233        log::info!(
234            "[searcher] loaded {} segments: total_docs={}, sparse_index_mem={:.2} MB",
235            segments.len(),
236            total_docs,
237            total_sparse_mem as f64 / (1024.0 * 1024.0)
238        );
239
240        segments
241    }
242
243    /// Build default fields from schema
244    fn build_default_fields(schema: &Schema) -> Vec<crate::Field> {
245        if !schema.default_fields().is_empty() {
246            schema.default_fields().to_vec()
247        } else {
248            schema
249                .fields()
250                .filter(|(_, entry)| {
251                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
252                })
253                .map(|(field, _)| field)
254                .collect()
255        }
256    }
257
258    /// Get the schema
259    pub fn schema(&self) -> &Schema {
260        &self.schema
261    }
262
263    /// Get segment readers
264    pub fn segment_readers(&self) -> &[Arc<SegmentReader>] {
265        &self.segments
266    }
267
268    /// Get default fields for search
269    pub fn default_fields(&self) -> &[crate::Field] {
270        &self.default_fields
271    }
272
273    /// Get tokenizer registry
274    pub fn tokenizers(&self) -> &crate::tokenizer::TokenizerRegistry {
275        &self.tokenizers
276    }
277
278    /// Get trained centroids
279    pub fn trained_centroids(&self) -> &FxHashMap<u32, Arc<CoarseCentroids>> {
280        &self.trained_centroids
281    }
282
283    /// Get lazy global statistics for cross-segment IDF computation
284    pub fn global_stats(&self) -> &Arc<LazyGlobalStats> {
285        &self.global_stats
286    }
287
288    /// Get total document count across all segments
289    pub fn num_docs(&self) -> u32 {
290        self.segments.iter().map(|s| s.meta().num_docs).sum()
291    }
292
293    /// Get number of segments
294    pub fn num_segments(&self) -> usize {
295        self.segments.len()
296    }
297
298    /// Get a document by global doc_id
299    pub async fn doc(&self, doc_id: u32) -> Result<Option<crate::dsl::Document>> {
300        let mut offset = 0u32;
301        for segment in &self.segments {
302            let segment_docs = segment.meta().num_docs;
303            if doc_id < offset + segment_docs {
304                let local_doc_id = doc_id - offset;
305                return segment.doc(local_doc_id).await;
306            }
307            offset += segment_docs;
308        }
309        Ok(None)
310    }
311
312    /// Search across all segments and return aggregated results
313    pub async fn search(
314        &self,
315        query: &dyn crate::query::Query,
316        limit: usize,
317    ) -> Result<Vec<crate::query::SearchResult>> {
318        let (results, _) = self.search_with_count(query, limit).await?;
319        Ok(results)
320    }
321
322    /// Search across all segments and return (results, total_seen)
323    /// total_seen is the number of documents that were scored across all segments
324    pub async fn search_with_count(
325        &self,
326        query: &dyn crate::query::Query,
327        limit: usize,
328    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
329        self.search_with_offset_and_count(query, limit, 0).await
330    }
331
332    /// Search with offset for pagination
333    pub async fn search_with_offset(
334        &self,
335        query: &dyn crate::query::Query,
336        limit: usize,
337        offset: usize,
338    ) -> Result<Vec<crate::query::SearchResult>> {
339        let (results, _) = self
340            .search_with_offset_and_count(query, limit, offset)
341            .await?;
342        Ok(results)
343    }
344
345    /// Search with offset and return (results, total_seen)
346    pub async fn search_with_offset_and_count(
347        &self,
348        query: &dyn crate::query::Query,
349        limit: usize,
350        offset: usize,
351    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
352        self.search_internal(query, limit, offset, false).await
353    }
354
355    /// Search with positions (ordinal tracking) and return (results, total_seen)
356    ///
357    /// Use this when you need per-ordinal scores for multi-valued fields.
358    pub async fn search_with_positions(
359        &self,
360        query: &dyn crate::query::Query,
361        limit: usize,
362    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
363        self.search_internal(query, limit, 0, true).await
364    }
365
366    /// Internal search implementation
367    async fn search_internal(
368        &self,
369        query: &dyn crate::query::Query,
370        limit: usize,
371        offset: usize,
372        collect_positions: bool,
373    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
374        let fetch_limit = offset + limit;
375
376        let futures: Vec<_> = self
377            .segments
378            .iter()
379            .map(|segment| {
380                let sid = segment.meta().id;
381                async move {
382                    let (results, segment_seen) = if collect_positions {
383                        crate::query::search_segment_with_positions_and_count(
384                            segment.as_ref(),
385                            query,
386                            fetch_limit,
387                        )
388                        .await?
389                    } else {
390                        crate::query::search_segment_with_count(
391                            segment.as_ref(),
392                            query,
393                            fetch_limit,
394                        )
395                        .await?
396                    };
397                    Ok::<_, crate::error::Error>((
398                        results
399                            .into_iter()
400                            .map(move |r| (sid, r))
401                            .collect::<Vec<_>>(),
402                        segment_seen,
403                    ))
404                }
405            })
406            .collect();
407
408        let batches = futures::future::try_join_all(futures).await?;
409        let mut all_results: Vec<(u128, crate::query::SearchResult)> = Vec::new();
410        let mut total_seen: u32 = 0;
411        for (batch, segment_seen) in batches {
412            total_seen += segment_seen;
413            all_results.extend(batch);
414        }
415
416        // Sort by score descending
417        all_results.sort_by(|a, b| {
418            b.1.score
419                .partial_cmp(&a.1.score)
420                .unwrap_or(std::cmp::Ordering::Equal)
421        });
422
423        // Apply offset and limit
424        let results = all_results
425            .into_iter()
426            .skip(offset)
427            .take(limit)
428            .map(|(_, result)| result)
429            .collect();
430
431        Ok((results, total_seen))
432    }
433
434    /// Two-stage search: L1 retrieval + L2 dense vector reranking
435    ///
436    /// Runs the query to get `l1_limit` candidates, then reranks by exact
437    /// dense vector distance and returns the top `final_limit` results.
438    pub async fn search_and_rerank(
439        &self,
440        query: &dyn crate::query::Query,
441        l1_limit: usize,
442        final_limit: usize,
443        config: &crate::query::RerankerConfig,
444    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
445        let (candidates, total_seen) = self.search_with_count(query, l1_limit).await?;
446        let reranked = crate::query::rerank(self, &candidates, config, final_limit).await?;
447        Ok((reranked, total_seen))
448    }
449
450    /// Parse query string and search (convenience method)
451    pub async fn query(
452        &self,
453        query_str: &str,
454        limit: usize,
455    ) -> Result<crate::query::SearchResponse> {
456        self.query_offset(query_str, limit, 0).await
457    }
458
459    /// Parse query string and search with offset (convenience method)
460    pub async fn query_offset(
461        &self,
462        query_str: &str,
463        limit: usize,
464        offset: usize,
465    ) -> Result<crate::query::SearchResponse> {
466        let parser = self.query_parser();
467        let query = parser
468            .parse(query_str)
469            .map_err(crate::error::Error::Query)?;
470
471        let fetch_limit = offset + limit;
472        let query_ref = query.as_ref();
473
474        let futures: Vec<_> = self
475            .segments
476            .iter()
477            .map(|segment| {
478                let sid = segment.meta().id;
479                async move {
480                    let results =
481                        crate::query::search_segment(segment.as_ref(), query_ref, fetch_limit)
482                            .await?;
483                    Ok::<_, crate::error::Error>(
484                        results
485                            .into_iter()
486                            .map(move |r| (sid, r))
487                            .collect::<Vec<_>>(),
488                    )
489                }
490            })
491            .collect();
492
493        let batches = futures::future::try_join_all(futures).await?;
494        let mut all_results: Vec<(u128, crate::query::SearchResult)> =
495            Vec::with_capacity(batches.iter().map(|b| b.len()).sum());
496        for batch in batches {
497            all_results.extend(batch);
498        }
499
500        all_results.sort_by(|a, b| {
501            b.1.score
502                .partial_cmp(&a.1.score)
503                .unwrap_or(std::cmp::Ordering::Equal)
504        });
505
506        let total_hits = all_results.len() as u32;
507
508        let hits: Vec<crate::query::SearchHit> = all_results
509            .into_iter()
510            .skip(offset)
511            .take(limit)
512            .map(|(segment_id, result)| crate::query::SearchHit {
513                address: crate::query::DocAddress::new(segment_id, result.doc_id),
514                score: result.score,
515                matched_fields: result.extract_ordinals(),
516            })
517            .collect();
518
519        Ok(crate::query::SearchResponse { hits, total_hits })
520    }
521
522    /// Get query parser for this searcher
523    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
524        let query_routers = self.schema.query_routers();
525        if !query_routers.is_empty()
526            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
527        {
528            return crate::dsl::QueryLanguageParser::with_router(
529                Arc::clone(&self.schema),
530                self.default_fields.clone(),
531                Arc::clone(&self.tokenizers),
532                router,
533            );
534        }
535
536        crate::dsl::QueryLanguageParser::new(
537            Arc::clone(&self.schema),
538            self.default_fields.clone(),
539            Arc::clone(&self.tokenizers),
540        )
541    }
542
543    /// Get a document by address (segment_id + global doc_id)
544    ///
545    /// The doc_id in the address is a global doc_id (with doc_id_offset applied).
546    /// This method converts it back to a segment-local doc_id for the store lookup.
547    pub async fn get_document(
548        &self,
549        address: &crate::query::DocAddress,
550    ) -> Result<Option<crate::dsl::Document>> {
551        let segment_id = address.segment_id_u128().ok_or_else(|| {
552            crate::error::Error::Query(format!("Invalid segment ID: {}", address.segment_id))
553        })?;
554
555        for segment in &self.segments {
556            if segment.meta().id == segment_id {
557                // Convert global doc_id to segment-local doc_id
558                let local_doc_id = address.doc_id.wrapping_sub(segment.doc_id_offset());
559                return segment.doc(local_doc_id).await;
560            }
561        }
562
563        Ok(None)
564    }
565}