Skip to main content

hermes_core/index/
searcher.rs

//! Searcher - read-only search over pre-built segments
//!
//! This module provides `Searcher` for read-only search access to indexes.
//! It can be used standalone (for wasm/read-only builds) or via `IndexReader` (for native builds).
5
6use std::sync::Arc;
7
8use rustc_hash::FxHashMap;
9
10use crate::directories::Directory;
11use crate::dsl::Schema;
12use crate::error::Result;
13use crate::query::LazyGlobalStats;
14use crate::segment::{SegmentId, SegmentReader};
15#[cfg(feature = "native")]
16use crate::segment::{SegmentSnapshot, SegmentTracker};
17use crate::structures::{CoarseCentroids, PQCodebook};
18
19/// Searcher - provides search over loaded segments
20///
21/// For wasm/read-only use, create via `Searcher::open()`.
22/// For native use with Index, this is created via `IndexReader`.
23pub struct Searcher<D: Directory + 'static> {
24    /// Segment snapshot holding refs - prevents deletion during native use
25    #[cfg(feature = "native")]
26    _snapshot: SegmentSnapshot<D>,
27    /// Phantom data for wasm builds
28    #[cfg(not(feature = "native"))]
29    _phantom: std::marker::PhantomData<D>,
30    /// Loaded segment readers
31    segments: Vec<Arc<SegmentReader>>,
32    /// Schema
33    schema: Arc<Schema>,
34    /// Default fields for search
35    default_fields: Vec<crate::Field>,
36    /// Tokenizers
37    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
38    /// Trained centroids per field
39    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
40    /// Trained codebooks per field
41    trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
42    /// Lazy global statistics for cross-segment IDF computation
43    global_stats: Arc<LazyGlobalStats>,
44}
45
46impl<D: Directory + 'static> Searcher<D> {
47    /// Create a Searcher directly from segment IDs
48    ///
49    /// This is a simpler initialization path that doesn't require SegmentManager.
50    /// Use this for read-only access to pre-built indexes.
51    pub async fn open(
52        directory: Arc<D>,
53        schema: Arc<Schema>,
54        segment_ids: &[String],
55        term_cache_blocks: usize,
56    ) -> Result<Self> {
57        Self::create(
58            directory,
59            schema,
60            segment_ids,
61            FxHashMap::default(),
62            FxHashMap::default(),
63            term_cache_blocks,
64        )
65        .await
66    }
67
68    /// Create from a snapshot (for native IndexReader use)
69    #[cfg(feature = "native")]
70    pub(crate) async fn from_snapshot(
71        directory: Arc<D>,
72        schema: Arc<Schema>,
73        snapshot: SegmentSnapshot<D>,
74        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
75        trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
76        term_cache_blocks: usize,
77    ) -> Result<Self> {
78        let (segments, default_fields, global_stats) = Self::load_common(
79            &directory,
80            &schema,
81            snapshot.segment_ids(),
82            term_cache_blocks,
83        )
84        .await;
85
86        Ok(Self {
87            _snapshot: snapshot,
88            segments,
89            schema,
90            default_fields,
91            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
92            trained_centroids,
93            trained_codebooks,
94            global_stats,
95        })
96    }
97
98    /// Internal create method
99    async fn create(
100        directory: Arc<D>,
101        schema: Arc<Schema>,
102        segment_ids: &[String],
103        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
104        trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
105        term_cache_blocks: usize,
106    ) -> Result<Self> {
107        let (segments, default_fields, global_stats) =
108            Self::load_common(&directory, &schema, segment_ids, term_cache_blocks).await;
109
110        #[cfg(feature = "native")]
111        {
112            let tracker = Arc::new(SegmentTracker::new());
113            let snapshot = SegmentSnapshot::new(tracker, directory, segment_ids.to_vec());
114            Ok(Self {
115                _snapshot: snapshot,
116                segments,
117                schema,
118                default_fields,
119                tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
120                trained_centroids,
121                trained_codebooks,
122                global_stats,
123            })
124        }
125
126        #[cfg(not(feature = "native"))]
127        {
128            let _ = directory; // suppress unused warning
129            Ok(Self {
130                _phantom: std::marker::PhantomData,
131                segments,
132                schema,
133                default_fields,
134                tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
135                trained_centroids,
136                trained_codebooks,
137                global_stats,
138            })
139        }
140    }
141
142    /// Common loading logic shared by create and from_snapshot
143    async fn load_common(
144        directory: &Arc<D>,
145        schema: &Arc<Schema>,
146        segment_ids: &[String],
147        term_cache_blocks: usize,
148    ) -> (
149        Vec<Arc<SegmentReader>>,
150        Vec<crate::Field>,
151        Arc<LazyGlobalStats>,
152    ) {
153        let segments = Self::load_segments(directory, schema, segment_ids, term_cache_blocks).await;
154        let default_fields = Self::build_default_fields(schema);
155        let global_stats = Arc::new(LazyGlobalStats::new(segments.clone()));
156        (segments, default_fields, global_stats)
157    }
158
159    /// Load segment readers from IDs (parallel loading for performance)
160    async fn load_segments(
161        directory: &Arc<D>,
162        schema: &Arc<Schema>,
163        segment_ids: &[String],
164        term_cache_blocks: usize,
165    ) -> Vec<Arc<SegmentReader>> {
166        // Parse segment IDs and filter invalid ones
167        let valid_segments: Vec<(usize, SegmentId)> = segment_ids
168            .iter()
169            .enumerate()
170            .filter_map(|(idx, id_str)| SegmentId::from_hex(id_str).map(|sid| (idx, sid)))
171            .collect();
172
173        // Load all segments in parallel with offset=0
174        let futures: Vec<_> =
175            valid_segments
176                .iter()
177                .map(|(_, segment_id)| {
178                    let dir = Arc::clone(directory);
179                    let sch = Arc::clone(schema);
180                    let sid = *segment_id;
181                    async move {
182                        SegmentReader::open(dir.as_ref(), sid, sch, 0, term_cache_blocks).await
183                    }
184                })
185                .collect();
186
187        let results = futures::future::join_all(futures).await;
188
189        // Collect successful results with their original index for ordering
190        let mut loaded: Vec<(usize, SegmentReader)> = valid_segments
191            .into_iter()
192            .zip(results)
193            .filter_map(|((idx, _), result)| match result {
194                Ok(reader) => Some((idx, reader)),
195                Err(e) => {
196                    log::warn!("Failed to open segment: {:?}", e);
197                    None
198                }
199            })
200            .collect();
201
202        // Sort by original index to maintain deterministic ordering
203        loaded.sort_by_key(|(idx, _)| *idx);
204
205        // Calculate and assign doc_id_offsets sequentially
206        let mut doc_id_offset = 0u32;
207        let mut segments = Vec::with_capacity(loaded.len());
208        for (_, mut reader) in loaded {
209            reader.set_doc_id_offset(doc_id_offset);
210            doc_id_offset += reader.meta().num_docs;
211            segments.push(Arc::new(reader));
212        }
213
214        // Log searcher loading summary
215        let total_docs: u32 = segments.iter().map(|s| s.meta().num_docs).sum();
216        let total_sparse_mem: usize = segments
217            .iter()
218            .flat_map(|s| s.sparse_indexes().values())
219            .map(|idx| idx.num_dimensions() * 12)
220            .sum();
221        log::info!(
222            "[searcher] loaded {} segments: total_docs={}, sparse_index_mem={:.2} MB",
223            segments.len(),
224            total_docs,
225            total_sparse_mem as f64 / (1024.0 * 1024.0)
226        );
227
228        segments
229    }
230
231    /// Build default fields from schema
232    fn build_default_fields(schema: &Schema) -> Vec<crate::Field> {
233        if !schema.default_fields().is_empty() {
234            schema.default_fields().to_vec()
235        } else {
236            schema
237                .fields()
238                .filter(|(_, entry)| {
239                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
240                })
241                .map(|(field, _)| field)
242                .collect()
243        }
244    }
245
246    /// Get the schema
247    pub fn schema(&self) -> &Schema {
248        &self.schema
249    }
250
251    /// Get segment readers
252    pub fn segment_readers(&self) -> &[Arc<SegmentReader>] {
253        &self.segments
254    }
255
256    /// Get default fields for search
257    pub fn default_fields(&self) -> &[crate::Field] {
258        &self.default_fields
259    }
260
261    /// Get tokenizer registry
262    pub fn tokenizers(&self) -> &crate::tokenizer::TokenizerRegistry {
263        &self.tokenizers
264    }
265
266    /// Get trained centroids
267    pub fn trained_centroids(&self) -> &FxHashMap<u32, Arc<CoarseCentroids>> {
268        &self.trained_centroids
269    }
270
271    /// Get trained codebooks
272    pub fn trained_codebooks(&self) -> &FxHashMap<u32, Arc<PQCodebook>> {
273        &self.trained_codebooks
274    }
275
276    /// Get lazy global statistics for cross-segment IDF computation
277    pub fn global_stats(&self) -> &Arc<LazyGlobalStats> {
278        &self.global_stats
279    }
280
281    /// Get total document count across all segments
282    pub fn num_docs(&self) -> u32 {
283        self.segments.iter().map(|s| s.meta().num_docs).sum()
284    }
285
286    /// Get number of segments
287    pub fn num_segments(&self) -> usize {
288        self.segments.len()
289    }
290
291    /// Get a document by global doc_id
292    pub async fn doc(&self, doc_id: u32) -> Result<Option<crate::dsl::Document>> {
293        let mut offset = 0u32;
294        for segment in &self.segments {
295            let segment_docs = segment.meta().num_docs;
296            if doc_id < offset + segment_docs {
297                let local_doc_id = doc_id - offset;
298                return segment.doc(local_doc_id).await;
299            }
300            offset += segment_docs;
301        }
302        Ok(None)
303    }
304
305    /// Search across all segments and return aggregated results
306    pub async fn search(
307        &self,
308        query: &dyn crate::query::Query,
309        limit: usize,
310    ) -> Result<Vec<crate::query::SearchResult>> {
311        let (results, _) = self.search_with_count(query, limit).await?;
312        Ok(results)
313    }
314
315    /// Search across all segments and return (results, total_seen)
316    /// total_seen is the number of documents that were scored across all segments
317    pub async fn search_with_count(
318        &self,
319        query: &dyn crate::query::Query,
320        limit: usize,
321    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
322        self.search_with_offset_and_count(query, limit, 0).await
323    }
324
325    /// Search with offset for pagination
326    pub async fn search_with_offset(
327        &self,
328        query: &dyn crate::query::Query,
329        limit: usize,
330        offset: usize,
331    ) -> Result<Vec<crate::query::SearchResult>> {
332        let (results, _) = self
333            .search_with_offset_and_count(query, limit, offset)
334            .await?;
335        Ok(results)
336    }
337
338    /// Search with offset and return (results, total_seen)
339    pub async fn search_with_offset_and_count(
340        &self,
341        query: &dyn crate::query::Query,
342        limit: usize,
343        offset: usize,
344    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
345        let fetch_limit = offset + limit;
346        let mut all_results: Vec<(u128, crate::query::SearchResult)> = Vec::new();
347        let mut total_seen: u32 = 0;
348
349        for segment in &self.segments {
350            let segment_id = segment.meta().id;
351            let (results, segment_seen) =
352                crate::query::search_segment_with_count(segment.as_ref(), query, fetch_limit)
353                    .await?;
354            total_seen += segment_seen;
355            for result in results {
356                all_results.push((segment_id, result));
357            }
358        }
359
360        // Sort by score descending
361        all_results.sort_by(|a, b| {
362            b.1.score
363                .partial_cmp(&a.1.score)
364                .unwrap_or(std::cmp::Ordering::Equal)
365        });
366
367        // Apply offset and limit
368        let results = all_results
369            .into_iter()
370            .skip(offset)
371            .take(limit)
372            .map(|(_, result)| result)
373            .collect();
374
375        Ok((results, total_seen))
376    }
377
378    /// Parse query string and search (convenience method)
379    pub async fn query(
380        &self,
381        query_str: &str,
382        limit: usize,
383    ) -> Result<crate::query::SearchResponse> {
384        self.query_offset(query_str, limit, 0).await
385    }
386
387    /// Parse query string and search with offset (convenience method)
388    pub async fn query_offset(
389        &self,
390        query_str: &str,
391        limit: usize,
392        offset: usize,
393    ) -> Result<crate::query::SearchResponse> {
394        let parser = self.query_parser();
395        let query = parser
396            .parse(query_str)
397            .map_err(crate::error::Error::Query)?;
398
399        let fetch_limit = offset + limit;
400        let mut all_results: Vec<(u128, crate::query::SearchResult)> = Vec::new();
401
402        for segment in &self.segments {
403            let segment_id = segment.meta().id;
404            let results =
405                crate::query::search_segment(segment.as_ref(), query.as_ref(), fetch_limit).await?;
406            for result in results {
407                all_results.push((segment_id, result));
408            }
409        }
410
411        all_results.sort_by(|a, b| {
412            b.1.score
413                .partial_cmp(&a.1.score)
414                .unwrap_or(std::cmp::Ordering::Equal)
415        });
416
417        let total_hits = all_results.len() as u32;
418
419        let hits: Vec<crate::query::SearchHit> = all_results
420            .into_iter()
421            .skip(offset)
422            .take(limit)
423            .map(|(segment_id, result)| crate::query::SearchHit {
424                address: crate::query::DocAddress::new(segment_id, result.doc_id),
425                score: result.score,
426                matched_fields: result.extract_ordinals(),
427            })
428            .collect();
429
430        Ok(crate::query::SearchResponse { hits, total_hits })
431    }
432
433    /// Get query parser for this searcher
434    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
435        let query_routers = self.schema.query_routers();
436        if !query_routers.is_empty()
437            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
438        {
439            return crate::dsl::QueryLanguageParser::with_router(
440                Arc::clone(&self.schema),
441                self.default_fields.clone(),
442                Arc::clone(&self.tokenizers),
443                router,
444            );
445        }
446
447        crate::dsl::QueryLanguageParser::new(
448            Arc::clone(&self.schema),
449            self.default_fields.clone(),
450            Arc::clone(&self.tokenizers),
451        )
452    }
453
454    /// Get a document by address (segment_id + doc_id)
455    pub async fn get_document(
456        &self,
457        address: &crate::query::DocAddress,
458    ) -> Result<Option<crate::dsl::Document>> {
459        let segment_id = address.segment_id_u128().ok_or_else(|| {
460            crate::error::Error::Query(format!("Invalid segment ID: {}", address.segment_id))
461        })?;
462
463        for segment in &self.segments {
464            if segment.meta().id == segment_id {
465                return segment.doc(address.doc_id).await;
466            }
467        }
468
469        Ok(None)
470    }
471}