Skip to main content

hermes_core/index/
searcher.rs

1//! Searcher - read-only search over pre-built segments
2//!
3//! This module provides `Searcher` for read-only search access to indexes.
4//! It can be used standalone (for wasm/read-only) or via `IndexReader` (for native).
5
6use std::sync::Arc;
7
8use rustc_hash::FxHashMap;
9
10use crate::directories::Directory;
11use crate::dsl::Schema;
12use crate::error::Result;
13use crate::query::LazyGlobalStats;
14use crate::segment::{SegmentId, SegmentReader};
15#[cfg(feature = "native")]
16use crate::segment::{SegmentSnapshot, SegmentTracker};
17use crate::structures::CoarseCentroids;
18
/// Searcher - provides search over loaded segments
///
/// Holds one shared `SegmentReader` per loaded segment together with the
/// lookup state derived from them when the searcher is constructed
/// (default fields, lazy global statistics, segment-id → index map and
/// total document count).
///
/// For wasm/read-only use, create via `Searcher::open()`.
/// For native use with Index, this is created via `IndexReader`.
pub struct Searcher<D: Directory + 'static> {
    /// Segment snapshot holding refs - prevents deletion during native use
    #[cfg(feature = "native")]
    _snapshot: SegmentSnapshot,
    /// PhantomData for the directory generic (no `D` value is stored)
    _phantom: std::marker::PhantomData<D>,
    /// Loaded segment readers, in the order they were loaded
    segments: Vec<Arc<SegmentReader>>,
    /// Schema
    schema: Arc<Schema>,
    /// Default fields for search: the schema's configured defaults, or every
    /// indexed text field when the schema configures none
    default_fields: Vec<crate::Field>,
    /// Tokenizers
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    /// Trained centroids per field (injected into segment readers for IVF/ScaNN search)
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Lazy global statistics for cross-segment IDF computation
    global_stats: Arc<LazyGlobalStats>,
    /// O(1) segment lookup by segment_id; the value is the reader's index in `segments`
    segment_map: FxHashMap<u128, usize>,
    /// Total document count across all segments (saturating sum of per-segment counts)
    total_docs: u32,
}
46
47impl<D: Directory + 'static> Searcher<D> {
48    /// Create a Searcher directly from segment IDs
49    ///
50    /// This is a simpler initialization path that doesn't require SegmentManager.
51    /// Use this for read-only access to pre-built indexes.
52    pub async fn open(
53        directory: Arc<D>,
54        schema: Arc<Schema>,
55        segment_ids: &[String],
56        term_cache_blocks: usize,
57    ) -> Result<Self> {
58        Self::create(
59            directory,
60            schema,
61            segment_ids,
62            FxHashMap::default(),
63            term_cache_blocks,
64        )
65        .await
66    }
67
68    /// Create from a snapshot (for native IndexReader use)
69    #[cfg(feature = "native")]
70    pub(crate) async fn from_snapshot(
71        directory: Arc<D>,
72        schema: Arc<Schema>,
73        snapshot: SegmentSnapshot,
74        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
75        term_cache_blocks: usize,
76    ) -> Result<Self> {
77        let (segments, default_fields, global_stats, segment_map, total_docs) = Self::load_common(
78            &directory,
79            &schema,
80            snapshot.segment_ids(),
81            &trained_centroids,
82            term_cache_blocks,
83        )
84        .await;
85
86        Ok(Self {
87            _snapshot: snapshot,
88            _phantom: std::marker::PhantomData,
89            segments,
90            schema,
91            default_fields,
92            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
93            trained_centroids,
94            global_stats,
95            segment_map,
96            total_docs,
97        })
98    }
99
100    /// Internal create method
101    async fn create(
102        directory: Arc<D>,
103        schema: Arc<Schema>,
104        segment_ids: &[String],
105        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
106        term_cache_blocks: usize,
107    ) -> Result<Self> {
108        let (segments, default_fields, global_stats, segment_map, total_docs) = Self::load_common(
109            &directory,
110            &schema,
111            segment_ids,
112            &trained_centroids,
113            term_cache_blocks,
114        )
115        .await;
116
117        #[cfg(feature = "native")]
118        let _snapshot = {
119            let tracker = Arc::new(SegmentTracker::new());
120            SegmentSnapshot::new(tracker, segment_ids.to_vec())
121        };
122
123        let _ = directory; // suppress unused warning on wasm
124        Ok(Self {
125            #[cfg(feature = "native")]
126            _snapshot,
127            _phantom: std::marker::PhantomData,
128            segments,
129            schema,
130            default_fields,
131            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
132            trained_centroids,
133            global_stats,
134            segment_map,
135            total_docs,
136        })
137    }
138
139    /// Common loading logic shared by create and from_snapshot
140    async fn load_common(
141        directory: &Arc<D>,
142        schema: &Arc<Schema>,
143        segment_ids: &[String],
144        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
145        term_cache_blocks: usize,
146    ) -> (
147        Vec<Arc<SegmentReader>>,
148        Vec<crate::Field>,
149        Arc<LazyGlobalStats>,
150        FxHashMap<u128, usize>,
151        u32,
152    ) {
153        let segments = Self::load_segments(
154            directory,
155            schema,
156            segment_ids,
157            trained_centroids,
158            term_cache_blocks,
159        )
160        .await;
161        let default_fields = Self::build_default_fields(schema);
162        let global_stats = Arc::new(LazyGlobalStats::new(segments.clone()));
163        let (segment_map, total_docs) = Self::build_lookup_tables(&segments);
164        (
165            segments,
166            default_fields,
167            global_stats,
168            segment_map,
169            total_docs,
170        )
171    }
172
173    /// Load segment readers from IDs (parallel loading for performance)
174    async fn load_segments(
175        directory: &Arc<D>,
176        schema: &Arc<Schema>,
177        segment_ids: &[String],
178        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
179        term_cache_blocks: usize,
180    ) -> Vec<Arc<SegmentReader>> {
181        // Parse segment IDs and filter invalid ones
182        let valid_segments: Vec<(usize, SegmentId)> = segment_ids
183            .iter()
184            .enumerate()
185            .filter_map(|(idx, id_str)| SegmentId::from_hex(id_str).map(|sid| (idx, sid)))
186            .collect();
187
188        // Load all segments in parallel
189        let futures: Vec<_> = valid_segments
190            .iter()
191            .map(|(_, segment_id)| {
192                let dir = Arc::clone(directory);
193                let sch = Arc::clone(schema);
194                let sid = *segment_id;
195                async move { SegmentReader::open(dir.as_ref(), sid, sch, term_cache_blocks).await }
196            })
197            .collect();
198
199        let results = futures::future::join_all(futures).await;
200
201        // Collect results — fail fast if any segment fails to open
202        let mut loaded: Vec<(usize, SegmentReader)> = Vec::with_capacity(valid_segments.len());
203        for ((idx, sid), result) in valid_segments.into_iter().zip(results) {
204            match result {
205                Ok(reader) => loaded.push((idx, reader)),
206                Err(e) => {
207                    panic!(
208                        "Failed to open segment {:016x}: {:?}. \
209                         Refusing to serve with incomplete data.",
210                        sid.0, e
211                    );
212                }
213            }
214        }
215
216        // Sort by original index to maintain deterministic ordering
217        loaded.sort_by_key(|(idx, _)| *idx);
218
219        let mut segments = Vec::with_capacity(loaded.len());
220        for (_, mut reader) in loaded {
221            // Inject per-field centroids into reader for IVF/ScaNN search
222            if !trained_centroids.is_empty() {
223                reader.set_coarse_centroids(trained_centroids.clone());
224            }
225            segments.push(Arc::new(reader));
226        }
227
228        // Log searcher loading summary with per-segment memory breakdown
229        let total_docs: u32 = segments.iter().map(|s| s.meta().num_docs).sum();
230        let mut total_mem = 0usize;
231        for seg in &segments {
232            let stats = seg.memory_stats();
233            let seg_total = stats.total_bytes();
234            total_mem += seg_total;
235            log::info!(
236                "[searcher] segment {:016x}: docs={}, mem={:.2} MB \
237                 (term_dict={:.2} MB, store={:.2} MB, sparse={:.2} MB, dense={:.2} MB, bloom={:.2} MB)",
238                stats.segment_id,
239                stats.num_docs,
240                seg_total as f64 / (1024.0 * 1024.0),
241                stats.term_dict_cache_bytes as f64 / (1024.0 * 1024.0),
242                stats.store_cache_bytes as f64 / (1024.0 * 1024.0),
243                stats.sparse_index_bytes as f64 / (1024.0 * 1024.0),
244                stats.dense_index_bytes as f64 / (1024.0 * 1024.0),
245                stats.bloom_filter_bytes as f64 / (1024.0 * 1024.0),
246            );
247        }
248        // Log process RSS if available (helps diagnose OOM)
249        let rss_mb = process_rss_mb();
250        log::info!(
251            "[searcher] loaded {} segments: total_docs={}, estimated_mem={:.2} MB, process_rss={:.1} MB",
252            segments.len(),
253            total_docs,
254            total_mem as f64 / (1024.0 * 1024.0),
255            rss_mb,
256        );
257
258        segments
259    }
260
261    /// Build default fields from schema
262    fn build_default_fields(schema: &Schema) -> Vec<crate::Field> {
263        if !schema.default_fields().is_empty() {
264            schema.default_fields().to_vec()
265        } else {
266            schema
267                .fields()
268                .filter(|(_, entry)| {
269                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
270                })
271                .map(|(field, _)| field)
272                .collect()
273        }
274    }
275
    /// Get the schema this searcher was constructed with
    pub fn schema(&self) -> &Schema {
        &self.schema
    }
280
    /// Get the loaded segment readers as a slice
    pub fn segment_readers(&self) -> &[Arc<SegmentReader>] {
        &self.segments
    }
285
    /// Get default fields for search (resolved once when the searcher was built)
    pub fn default_fields(&self) -> &[crate::Field] {
        &self.default_fields
    }
290
    /// Get the tokenizer registry used by this searcher's query parser
    pub fn tokenizers(&self) -> &crate::tokenizer::TokenizerRegistry {
        &self.tokenizers
    }
295
    /// Get the trained centroids map supplied at construction time
    /// (empty when the searcher was created via `open`)
    pub fn trained_centroids(&self) -> &FxHashMap<u32, Arc<CoarseCentroids>> {
        &self.trained_centroids
    }
300
    /// Get lazy global statistics for cross-segment IDF computation
    ///
    /// Returned as `&Arc` so callers can clone the handle cheaply.
    pub fn global_stats(&self) -> &Arc<LazyGlobalStats> {
        &self.global_stats
    }
305
306    /// Build O(1) lookup tables from loaded segments
307    fn build_lookup_tables(segments: &[Arc<SegmentReader>]) -> (FxHashMap<u128, usize>, u32) {
308        let mut segment_map = FxHashMap::default();
309        let mut total = 0u32;
310        for (i, seg) in segments.iter().enumerate() {
311            segment_map.insert(seg.meta().id, i);
312            total = total.saturating_add(seg.meta().num_docs);
313        }
314        (segment_map, total)
315    }
316
    /// Get total document count across all segments
    ///
    /// The value is computed once at construction time and saturates at `u32::MAX`.
    pub fn num_docs(&self) -> u32 {
        self.total_docs
    }
321
    /// Get O(1) segment_id → index map (used by reranker)
    ///
    /// Each value is an index into the slice returned by `segment_readers()`.
    pub fn segment_map(&self) -> &FxHashMap<u128, usize> {
        &self.segment_map
    }
326
    /// Get number of loaded segments
    pub fn num_segments(&self) -> usize {
        self.segments.len()
    }
331
332    /// Get a document by (segment_id, local_doc_id)
333    pub async fn doc(&self, segment_id: u128, doc_id: u32) -> Result<Option<crate::dsl::Document>> {
334        if let Some(&idx) = self.segment_map.get(&segment_id) {
335            return self.segments[idx].doc(doc_id).await;
336        }
337        Ok(None)
338    }
339
340    /// Search across all segments and return aggregated results
341    pub async fn search(
342        &self,
343        query: &dyn crate::query::Query,
344        limit: usize,
345    ) -> Result<Vec<crate::query::SearchResult>> {
346        let (results, _) = self.search_with_count(query, limit).await?;
347        Ok(results)
348    }
349
    /// Search across all segments and return (results, total_seen)
    /// total_seen is the number of documents that were scored across all segments
    ///
    /// Equivalent to `search_with_offset_and_count` with an offset of 0.
    pub async fn search_with_count(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_with_offset_and_count(query, limit, 0).await
    }
359
360    /// Search with offset for pagination
361    pub async fn search_with_offset(
362        &self,
363        query: &dyn crate::query::Query,
364        limit: usize,
365        offset: usize,
366    ) -> Result<Vec<crate::query::SearchResult>> {
367        let (results, _) = self
368            .search_with_offset_and_count(query, limit, offset)
369            .await?;
370        Ok(results)
371    }
372
    /// Search with offset and return (results, total_seen)
    ///
    /// Delegates to the internal search with position collection disabled.
    pub async fn search_with_offset_and_count(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_internal(query, limit, offset, false).await
    }
382
    /// Search with positions (ordinal tracking) and return (results, total_seen)
    ///
    /// Use this when you need per-ordinal scores for multi-valued fields.
    /// Delegates to the internal search with an offset of 0 and position
    /// collection enabled.
    pub async fn search_with_positions(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_internal(query, limit, 0, true).await
    }
393
394    /// Internal search implementation
395    async fn search_internal(
396        &self,
397        query: &dyn crate::query::Query,
398        limit: usize,
399        offset: usize,
400        collect_positions: bool,
401    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
402        let fetch_limit = offset + limit;
403
404        let futures: Vec<_> = self
405            .segments
406            .iter()
407            .map(|segment| {
408                let sid = segment.meta().id;
409                async move {
410                    let (mut results, segment_seen) = if collect_positions {
411                        crate::query::search_segment_with_positions_and_count(
412                            segment.as_ref(),
413                            query,
414                            fetch_limit,
415                        )
416                        .await?
417                    } else {
418                        crate::query::search_segment_with_count(
419                            segment.as_ref(),
420                            query,
421                            fetch_limit,
422                        )
423                        .await?
424                    };
425                    // Stamp segment_id on each result
426                    for r in &mut results {
427                        r.segment_id = sid;
428                    }
429                    Ok::<_, crate::error::Error>((results, segment_seen))
430                }
431            })
432            .collect();
433
434        let batches = futures::future::try_join_all(futures).await?;
435        let mut total_seen: u32 = 0;
436
437        // K-way merge: each batch is already sorted by score descending.
438        // Use a max-heap of (score, batch_idx, position) to merge in O(N log K).
439        use std::cmp::Ordering;
440        struct MergeEntry {
441            score: f32,
442            batch_idx: usize,
443            pos: usize,
444        }
445        impl PartialEq for MergeEntry {
446            fn eq(&self, other: &Self) -> bool {
447                self.score == other.score
448            }
449        }
450        impl Eq for MergeEntry {}
451        impl PartialOrd for MergeEntry {
452            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
453                Some(self.cmp(other))
454            }
455        }
456        impl Ord for MergeEntry {
457            fn cmp(&self, other: &Self) -> Ordering {
458                self.score
459                    .partial_cmp(&other.score)
460                    .unwrap_or(Ordering::Equal)
461            }
462        }
463
464        let mut sorted_batches: Vec<Vec<crate::query::SearchResult>> =
465            Vec::with_capacity(batches.len());
466        for (batch, segment_seen) in batches {
467            total_seen += segment_seen;
468            if !batch.is_empty() {
469                sorted_batches.push(batch);
470            }
471        }
472
473        let mut heap = std::collections::BinaryHeap::with_capacity(sorted_batches.len());
474        for (i, batch) in sorted_batches.iter().enumerate() {
475            heap.push(MergeEntry {
476                score: batch[0].score,
477                batch_idx: i,
478                pos: 0,
479            });
480        }
481
482        let mut results = Vec::with_capacity(fetch_limit.min(total_seen as usize));
483        let mut emitted = 0usize;
484        while let Some(entry) = heap.pop() {
485            if emitted >= fetch_limit {
486                break;
487            }
488            let batch = &sorted_batches[entry.batch_idx];
489            if emitted >= offset {
490                results.push(batch[entry.pos].clone());
491            }
492            emitted += 1;
493            let next_pos = entry.pos + 1;
494            if next_pos < batch.len() {
495                heap.push(MergeEntry {
496                    score: batch[next_pos].score,
497                    batch_idx: entry.batch_idx,
498                    pos: next_pos,
499                });
500            }
501        }
502
503        Ok((results, total_seen))
504    }
505
    /// Two-stage search: L1 retrieval + L2 dense vector reranking
    ///
    /// Runs the query to get `l1_limit` candidates, then reranks by exact
    /// dense vector distance and returns the top `final_limit` results.
    ///
    /// The returned count is the `total_seen` value of the L1 retrieval stage;
    /// it is not affected by reranking.
    pub async fn search_and_rerank(
        &self,
        query: &dyn crate::query::Query,
        l1_limit: usize,
        final_limit: usize,
        config: &crate::query::RerankerConfig,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        // Stage 1: candidate retrieval (offset 0, no position collection).
        let (candidates, total_seen) = self.search_with_count(query, l1_limit).await?;
        // Stage 2: rerank the candidates against this searcher.
        let reranked = crate::query::rerank(self, &candidates, config, final_limit).await?;
        Ok((reranked, total_seen))
    }
521
    /// Parse query string and search (convenience method)
    ///
    /// Equivalent to `query_offset` with an offset of 0.
    pub async fn query(
        &self,
        query_str: &str,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.query_offset(query_str, limit, 0).await
    }
530
531    /// Parse query string and search with offset (convenience method)
532    pub async fn query_offset(
533        &self,
534        query_str: &str,
535        limit: usize,
536        offset: usize,
537    ) -> Result<crate::query::SearchResponse> {
538        let parser = self.query_parser();
539        let query = parser
540            .parse(query_str)
541            .map_err(crate::error::Error::Query)?;
542
543        let (results, _total_seen) = self
544            .search_internal(query.as_ref(), limit, offset, false)
545            .await?;
546
547        let total_hits = results.len() as u32;
548        let hits: Vec<crate::query::SearchHit> = results
549            .into_iter()
550            .map(|result| crate::query::SearchHit {
551                address: crate::query::DocAddress::new(result.segment_id, result.doc_id),
552                score: result.score,
553                matched_fields: result.extract_ordinals(),
554            })
555            .collect();
556
557        Ok(crate::query::SearchResponse { hits, total_hits })
558    }
559
560    /// Get query parser for this searcher
561    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
562        let query_routers = self.schema.query_routers();
563        if !query_routers.is_empty()
564            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
565        {
566            return crate::dsl::QueryLanguageParser::with_router(
567                Arc::clone(&self.schema),
568                self.default_fields.clone(),
569                Arc::clone(&self.tokenizers),
570                router,
571            );
572        }
573
574        crate::dsl::QueryLanguageParser::new(
575            Arc::clone(&self.schema),
576            self.default_fields.clone(),
577            Arc::clone(&self.tokenizers),
578        )
579    }
580
    /// Get a document by address (segment_id + local doc_id)
    ///
    /// Equivalent to `get_document_with_fields` with `fields` set to `None`,
    /// i.e. all fields are hydrated.
    pub async fn get_document(
        &self,
        address: &crate::query::DocAddress,
    ) -> Result<Option<crate::dsl::Document>> {
        self.get_document_with_fields(address, None).await
    }
588
589    /// Get a document by address, hydrating only the specified field IDs.
590    ///
591    /// If `fields` is `None`, all fields are hydrated (including dense vectors).
592    /// If `fields` is `Some(set)`, only dense vector fields in the set are read
593    /// from flat storage — skipping expensive mmap reads for unrequested vectors.
594    pub async fn get_document_with_fields(
595        &self,
596        address: &crate::query::DocAddress,
597        fields: Option<&rustc_hash::FxHashSet<u32>>,
598    ) -> Result<Option<crate::dsl::Document>> {
599        let segment_id = address.segment_id_u128().ok_or_else(|| {
600            crate::error::Error::Query(format!("Invalid segment ID: {}", address.segment_id))
601        })?;
602
603        if let Some(&idx) = self.segment_map.get(&segment_id) {
604            return self.segments[idx]
605                .doc_with_fields(address.doc_id, fields)
606                .await;
607        }
608
609        Ok(None)
610    }
611}
612
/// Best-effort resident set size of the current process, in MB.
///
/// * Linux: parsed from the `VmRSS:` line of `/proc/self/status`.
/// * macOS: read via `task_info` on the current task.
/// * Any other target, or any failure along the way: `0.0`.
fn process_rss_mb() -> f64 {
    #[cfg(target_os = "linux")]
    {
        // The status file carries a line of the form "VmRSS:   <n> kB".
        let Ok(status) = std::fs::read_to_string("/proc/self/status") else {
            return 0.0;
        };
        let rss_kb = status
            .lines()
            .find_map(|line| line.strip_prefix("VmRSS:"))
            .map(|rest| {
                rest.trim()
                    .trim_end_matches("kB")
                    .trim()
                    .parse::<f64>()
                    .unwrap_or(0.0)
            });
        match rss_kb {
            Some(kb) => kb / 1024.0,
            None => 0.0,
        }
    }
    #[cfg(target_os = "macos")]
    {
        // Use mach_task_self / task_info via raw syscall
        use std::mem;
        #[repr(C)]
        struct TaskBasicInfo {
            virtual_size: u64,
            resident_size: u64,
            resident_size_max: u64,
            user_time: [u32; 2],
            system_time: [u32; 2],
            policy: i32,
            suspend_count: i32,
        }
        unsafe extern "C" {
            fn mach_task_self() -> u32;
            fn task_info(task: u32, flavor: u32, info: *mut TaskBasicInfo, count: *mut u32) -> i32;
        }
        const MACH_TASK_BASIC_INFO: u32 = 20;
        // SAFETY: `TaskBasicInfo` consists solely of integer fields, for which
        // the all-zero bit pattern is a valid value.
        let mut info: TaskBasicInfo = unsafe { mem::zeroed() };
        // The buffer size is passed to `task_info` as a number of u32 words.
        let mut count = (mem::size_of::<TaskBasicInfo>() / mem::size_of::<u32>()) as u32;
        // SAFETY: both pointers refer to live, properly aligned locals that
        // outlive the call, and `count` describes the size of `info`.
        let ret = unsafe {
            task_info(
                mach_task_self(),
                MACH_TASK_BASIC_INFO,
                &mut info,
                &mut count,
            )
        };
        // A non-zero return code is treated as "RSS unavailable".
        if ret != 0 {
            return 0.0;
        }
        info.resident_size as f64 / (1024.0 * 1024.0)
    }
    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    {
        0.0
    }
}