hermes_core/index/searcher.rs

//! Searcher - read-only search over pre-built segments
//!
//! This module provides `Searcher` for read-only search access to indexes.
//! It can be used standalone (for wasm/read-only) or via `IndexReader` (for native).
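//!
//! A minimal usage sketch (hedged: `FsDirectory`, the schema value, the segment ID
//! list, and the `1024` term-cache size are placeholders for whatever your build
//! actually provides):
//!
//! ```ignore
//! let directory = Arc::new(FsDirectory::open("/path/to/index")?);
//! let searcher = Searcher::open(directory, schema, &segment_ids, 1024).await?;
//! let response = searcher.query("hello world", 10).await?;
//! for hit in response.hits {
//!     println!("{:?} score={}", hit.address, hit.score);
//! }
//! ```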

use std::sync::Arc;

use rustc_hash::FxHashMap;

use crate::directories::Directory;
use crate::dsl::Schema;
use crate::error::Result;
use crate::query::LazyGlobalStats;
use crate::segment::{SegmentId, SegmentReader};
#[cfg(feature = "native")]
use crate::segment::{SegmentSnapshot, SegmentTracker};
use crate::structures::CoarseCentroids;

/// Searcher - provides search over loaded segments
///
/// For wasm/read-only use, create via `Searcher::open()`.
/// For native use with Index, this is created via `IndexReader`.
pub struct Searcher<D: Directory + 'static> {
    /// Segment snapshot holding refs - prevents deletion during native use
    #[cfg(feature = "native")]
    _snapshot: SegmentSnapshot,
    /// PhantomData for the directory generic
    _phantom: std::marker::PhantomData<D>,
    /// Loaded segment readers
    segments: Vec<Arc<SegmentReader>>,
    /// Schema
    schema: Arc<Schema>,
    /// Default fields for search
    default_fields: Vec<crate::Field>,
    /// Tokenizers
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    /// Trained centroids per field (injected into segment readers for IVF/ScaNN search)
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Lazy global statistics for cross-segment IDF computation
    global_stats: Arc<LazyGlobalStats>,
    /// O(1) segment lookup by segment_id
    segment_map: FxHashMap<u128, usize>,
    /// Total document count across all segments
    total_docs: u32,
}

impl<D: Directory + 'static> Searcher<D> {
    /// Create a Searcher directly from segment IDs
    ///
    /// This is a simpler initialization path that doesn't require SegmentManager.
    /// Use this for read-only access to pre-built indexes.
    pub async fn open(
        directory: Arc<D>,
        schema: Arc<Schema>,
        segment_ids: &[String],
        term_cache_blocks: usize,
    ) -> Result<Self> {
        Self::create(
            directory,
            schema,
            segment_ids,
            FxHashMap::default(),
            term_cache_blocks,
        )
        .await
    }

    /// Create from a snapshot (for native IndexReader use)
    #[cfg(feature = "native")]
    pub(crate) async fn from_snapshot(
        directory: Arc<D>,
        schema: Arc<Schema>,
        snapshot: SegmentSnapshot,
        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
    ) -> Result<Self> {
        let (segments, default_fields, global_stats, segment_map, total_docs) = Self::load_common(
            &directory,
            &schema,
            snapshot.segment_ids(),
            &trained_centroids,
            term_cache_blocks,
        )
        .await;

        Ok(Self {
            _snapshot: snapshot,
            _phantom: std::marker::PhantomData,
            segments,
            schema,
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            trained_centroids,
            global_stats,
            segment_map,
            total_docs,
        })
    }

    /// Internal create method
    async fn create(
        directory: Arc<D>,
        schema: Arc<Schema>,
        segment_ids: &[String],
        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
    ) -> Result<Self> {
        let (segments, default_fields, global_stats, segment_map, total_docs) = Self::load_common(
            &directory,
            &schema,
            segment_ids,
            &trained_centroids,
            term_cache_blocks,
        )
        .await;

        #[cfg(feature = "native")]
        let _snapshot = {
            let tracker = Arc::new(SegmentTracker::new());
            SegmentSnapshot::new(tracker, segment_ids.to_vec())
        };

        let _ = directory; // suppress unused warning on wasm
        Ok(Self {
            #[cfg(feature = "native")]
            _snapshot,
            _phantom: std::marker::PhantomData,
            segments,
            schema,
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            trained_centroids,
            global_stats,
            segment_map,
            total_docs,
        })
    }

    /// Common loading logic shared by create and from_snapshot
    async fn load_common(
        directory: &Arc<D>,
        schema: &Arc<Schema>,
        segment_ids: &[String],
        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
    ) -> (
        Vec<Arc<SegmentReader>>,
        Vec<crate::Field>,
        Arc<LazyGlobalStats>,
        FxHashMap<u128, usize>,
        u32,
    ) {
        let segments = Self::load_segments(
            directory,
            schema,
            segment_ids,
            trained_centroids,
            term_cache_blocks,
        )
        .await;
        let default_fields = Self::build_default_fields(schema);
        let global_stats = Arc::new(LazyGlobalStats::new(segments.clone()));
        let (segment_map, total_docs) = Self::build_lookup_tables(&segments);
        (
            segments,
            default_fields,
            global_stats,
            segment_map,
            total_docs,
        )
    }

    /// Load segment readers from IDs (parallel loading for performance)
    async fn load_segments(
        directory: &Arc<D>,
        schema: &Arc<Schema>,
        segment_ids: &[String],
        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
    ) -> Vec<Arc<SegmentReader>> {
        // Parse segment IDs and filter invalid ones
        let valid_segments: Vec<(usize, SegmentId)> = segment_ids
            .iter()
            .enumerate()
            .filter_map(|(idx, id_str)| SegmentId::from_hex(id_str).map(|sid| (idx, sid)))
            .collect();

        // Load all segments in parallel
        let futures: Vec<_> = valid_segments
            .iter()
            .map(|(_, segment_id)| {
                let dir = Arc::clone(directory);
                let sch = Arc::clone(schema);
                let sid = *segment_id;
                async move { SegmentReader::open(dir.as_ref(), sid, sch, term_cache_blocks).await }
            })
            .collect();

        let results = futures::future::join_all(futures).await;

        // Collect results — fail fast if any segment fails to open
        let mut loaded: Vec<(usize, SegmentReader)> = Vec::with_capacity(valid_segments.len());
        for ((idx, sid), result) in valid_segments.into_iter().zip(results) {
            match result {
                Ok(reader) => loaded.push((idx, reader)),
                Err(e) => {
                    panic!(
                        "Failed to open segment {:016x}: {:?}. \
                         Refusing to serve with incomplete data.",
                        sid.0, e
                    );
                }
            }
        }

        // Sort by original index to maintain deterministic ordering
        loaded.sort_by_key(|(idx, _)| *idx);

        let mut segments = Vec::with_capacity(loaded.len());
        for (_, mut reader) in loaded {
            // Inject per-field centroids into reader for IVF/ScaNN search
            if !trained_centroids.is_empty() {
                reader.set_coarse_centroids(trained_centroids.clone());
            }
            segments.push(Arc::new(reader));
        }

        // Log searcher loading summary with per-segment memory breakdown
        let total_docs: u32 = segments.iter().map(|s| s.meta().num_docs).sum();
        let mut total_mem = 0usize;
        for seg in &segments {
            let stats = seg.memory_stats();
            let seg_total = stats.total_bytes();
            total_mem += seg_total;
            log::info!(
                "[searcher] segment {:016x}: docs={}, mem={:.2} MB \
                 (term_dict={:.2} MB, store={:.2} MB, sparse={:.2} MB, dense={:.2} MB, bloom={:.2} MB)",
                stats.segment_id,
                stats.num_docs,
                seg_total as f64 / (1024.0 * 1024.0),
                stats.term_dict_cache_bytes as f64 / (1024.0 * 1024.0),
                stats.store_cache_bytes as f64 / (1024.0 * 1024.0),
                stats.sparse_index_bytes as f64 / (1024.0 * 1024.0),
                stats.dense_index_bytes as f64 / (1024.0 * 1024.0),
                stats.bloom_filter_bytes as f64 / (1024.0 * 1024.0),
            );
        }
        // Log process RSS if available (helps diagnose OOM)
        let rss_mb = process_rss_mb();
        log::info!(
            "[searcher] loaded {} segments: total_docs={}, estimated_mem={:.2} MB, process_rss={:.1} MB",
            segments.len(),
            total_docs,
            total_mem as f64 / (1024.0 * 1024.0),
            rss_mb,
        );

        segments
    }

    /// Build default fields from schema
    fn build_default_fields(schema: &Schema) -> Vec<crate::Field> {
        if !schema.default_fields().is_empty() {
            schema.default_fields().to_vec()
        } else {
            schema
                .fields()
                .filter(|(_, entry)| {
                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
                })
                .map(|(field, _)| field)
                .collect()
        }
    }

    /// Get the schema
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Get segment readers
    pub fn segment_readers(&self) -> &[Arc<SegmentReader>] {
        &self.segments
    }

    /// Get default fields for search
    pub fn default_fields(&self) -> &[crate::Field] {
        &self.default_fields
    }

    /// Get tokenizer registry
    pub fn tokenizers(&self) -> &crate::tokenizer::TokenizerRegistry {
        &self.tokenizers
    }

    /// Get trained centroids
    pub fn trained_centroids(&self) -> &FxHashMap<u32, Arc<CoarseCentroids>> {
        &self.trained_centroids
    }

    /// Get lazy global statistics for cross-segment IDF computation
    pub fn global_stats(&self) -> &Arc<LazyGlobalStats> {
        &self.global_stats
    }

    /// Build O(1) lookup tables from loaded segments
    fn build_lookup_tables(segments: &[Arc<SegmentReader>]) -> (FxHashMap<u128, usize>, u32) {
        let mut segment_map = FxHashMap::default();
        let mut total = 0u32;
        for (i, seg) in segments.iter().enumerate() {
            segment_map.insert(seg.meta().id, i);
            total = total.saturating_add(seg.meta().num_docs);
        }
        (segment_map, total)
    }

    /// Get total document count across all segments
    pub fn num_docs(&self) -> u32 {
        self.total_docs
    }

    /// Get O(1) segment_id → index map (used by reranker)
    pub fn segment_map(&self) -> &FxHashMap<u128, usize> {
        &self.segment_map
    }

    /// Get number of segments
    pub fn num_segments(&self) -> usize {
        self.segments.len()
    }

    /// Get a document by (segment_id, local_doc_id)
    pub async fn doc(&self, segment_id: u128, doc_id: u32) -> Result<Option<crate::dsl::Document>> {
        if let Some(&idx) = self.segment_map.get(&segment_id) {
            return self.segments[idx].doc(doc_id).await;
        }
        Ok(None)
    }

    /// Search across all segments and return aggregated results
    pub async fn search(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<Vec<crate::query::SearchResult>> {
        let (results, _) = self.search_with_count(query, limit).await?;
        Ok(results)
    }

    /// Search across all segments and return `(results, total_seen)`.
    ///
    /// `total_seen` is the number of documents that were scored across all segments.
    pub async fn search_with_count(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_with_offset_and_count(query, limit, 0).await
    }

    /// Search with offset for pagination
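    ///
    /// A pagination sketch (hedged: `page` and `page_size` are hypothetical
    /// caller-side values with 0-based pages, and `query` comes from `query_parser()`):
    ///
    /// ```ignore
    /// let page_size = 10;
    /// let page = 2; // third page of results
    /// let hits = searcher
    ///     .search_with_offset(query.as_ref(), page_size, page * page_size)
    ///     .await?;
    /// ```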
    pub async fn search_with_offset(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<Vec<crate::query::SearchResult>> {
        let (results, _) = self
            .search_with_offset_and_count(query, limit, offset)
            .await?;
        Ok(results)
    }

    /// Search with offset and return (results, total_seen)
    pub async fn search_with_offset_and_count(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_internal(query, limit, offset, false).await
    }

    /// Search with positions (ordinal tracking) and return (results, total_seen)
    ///
    /// Use this when you need per-ordinal scores for multi-valued fields.
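    ///
    /// A minimal sketch (hedged: the per-field ordinal data is read via
    /// `SearchResult::extract_ordinals()`, the same call used by `query_offset` below):
    ///
    /// ```ignore
    /// let (results, _seen) = searcher.search_with_positions(query.as_ref(), 10).await?;
    /// for r in &results {
    ///     let matched_fields = r.extract_ordinals();
    /// }
    /// ```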
    pub async fn search_with_positions(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_internal(query, limit, 0, true).await
    }

    /// Internal search implementation
    async fn search_internal(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
        collect_positions: bool,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        let fetch_limit = offset + limit;

        let futures: Vec<_> = self
            .segments
            .iter()
            .map(|segment| {
                let sid = segment.meta().id;
                async move {
                    let (mut results, segment_seen) = if collect_positions {
                        crate::query::search_segment_with_positions_and_count(
                            segment.as_ref(),
                            query,
                            fetch_limit,
                        )
                        .await?
                    } else {
                        crate::query::search_segment_with_count(
                            segment.as_ref(),
                            query,
                            fetch_limit,
                        )
                        .await?
                    };
                    // Stamp segment_id on each result
                    for r in &mut results {
                        r.segment_id = sid;
                    }
                    Ok::<_, crate::error::Error>((results, segment_seen))
                }
            })
            .collect();

        let batches = futures::future::try_join_all(futures).await?;
        let mut total_seen: u32 = 0;

        let mut sorted_batches: Vec<Vec<crate::query::SearchResult>> =
            Vec::with_capacity(batches.len());
        for (batch, segment_seen) in batches {
            total_seen += segment_seen;
            if !batch.is_empty() {
                sorted_batches.push(batch);
            }
        }

        let results = merge_segment_results(sorted_batches, fetch_limit, offset);
        Ok((results, total_seen))
    }

    /// Synchronous search across all segments using rayon for parallelism.
    ///
    /// This is the async-free boundary — no tokio involvement from here down.
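    ///
    /// A minimal call sketch (hedged: assumes the `sync` feature is enabled and
    /// `query` was produced by `query_parser()`; no async runtime is needed):
    ///
    /// ```ignore
    /// let (results, total_seen) =
    ///     searcher.search_with_offset_and_count_sync(query.as_ref(), 10, 0)?;
    /// ```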
    #[cfg(feature = "sync")]
    pub fn search_with_offset_and_count_sync(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        use rayon::prelude::*;

        let fetch_limit = offset + limit;

        let batches: Vec<Result<(Vec<crate::query::SearchResult>, u32)>> = self
            .segments
            .par_iter()
            .map(|segment| {
                let sid = segment.meta().id;
                let (mut results, segment_seen) = crate::query::search_segment_with_count_sync(
                    segment.as_ref(),
                    query,
                    fetch_limit,
                )?;
                for r in &mut results {
                    r.segment_id = sid;
                }
                Ok((results, segment_seen))
            })
            .collect();

        let mut total_seen: u32 = 0;
        let mut sorted_batches: Vec<Vec<crate::query::SearchResult>> =
            Vec::with_capacity(batches.len());
        for result in batches {
            let (batch, segment_seen) = result?;
            total_seen += segment_seen;
            if !batch.is_empty() {
                sorted_batches.push(batch);
            }
        }

        let results = merge_segment_results(sorted_batches, fetch_limit, offset);
        Ok((results, total_seen))
    }

    /// Two-stage search: L1 retrieval + L2 dense vector reranking
    ///
    /// Runs the query to get `l1_limit` candidates, then reranks by exact
    /// dense vector distance and returns the top `final_limit` results.
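    ///
    /// A two-stage sketch (hedged: how `RerankerConfig` is built is not shown here;
    /// the L1 limit of 200 and final limit of 10 are arbitrary example values):
    ///
    /// ```ignore
    /// let (hits, _seen) = searcher
    ///     .search_and_rerank(query.as_ref(), 200, 10, &rerank_config)
    ///     .await?;
    /// ```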
    pub async fn search_and_rerank(
        &self,
        query: &dyn crate::query::Query,
        l1_limit: usize,
        final_limit: usize,
        config: &crate::query::RerankerConfig,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        let (candidates, total_seen) = self.search_with_count(query, l1_limit).await?;
        let reranked = crate::query::rerank(self, &candidates, config, final_limit).await?;
        Ok((reranked, total_seen))
    }

    /// Parse query string and search (convenience method)
    pub async fn query(
        &self,
        query_str: &str,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.query_offset(query_str, limit, 0).await
    }

    /// Parse query string and search with offset (convenience method)
    pub async fn query_offset(
        &self,
        query_str: &str,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let parser = self.query_parser();
        let query = parser
            .parse(query_str)
            .map_err(crate::error::Error::Query)?;

        let (results, _total_seen) = self
            .search_internal(query.as_ref(), limit, offset, false)
            .await?;

        let total_hits = results.len() as u32;
        let hits: Vec<crate::query::SearchHit> = results
            .into_iter()
            .map(|result| crate::query::SearchHit {
                address: crate::query::DocAddress::new(result.segment_id, result.doc_id),
                score: result.score,
                matched_fields: result.extract_ordinals(),
            })
            .collect();

        Ok(crate::query::SearchResponse { hits, total_hits })
    }

    /// Get query parser for this searcher
    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
        let query_routers = self.schema.query_routers();
        if !query_routers.is_empty()
            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
        {
            return crate::dsl::QueryLanguageParser::with_router(
                Arc::clone(&self.schema),
                self.default_fields.clone(),
                Arc::clone(&self.tokenizers),
                router,
            );
        }

        crate::dsl::QueryLanguageParser::new(
            Arc::clone(&self.schema),
            self.default_fields.clone(),
            Arc::clone(&self.tokenizers),
        )
    }

    /// Get a document by address (segment_id + local doc_id)
    pub async fn get_document(
        &self,
        address: &crate::query::DocAddress,
    ) -> Result<Option<crate::dsl::Document>> {
        self.get_document_with_fields(address, None).await
    }

    /// Get a document by address, hydrating only the specified field IDs.
    ///
    /// If `fields` is `None`, all fields are hydrated (including dense vectors).
    /// If `fields` is `Some(set)`, only dense vector fields in the set are read
    /// from flat storage — skipping expensive mmap reads for unrequested vectors.
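    ///
    /// A field-filtered hydration sketch (hedged: `title_field_id` is a hypothetical
    /// `u32` field ID obtained from the schema, and `hit` is a `SearchHit`):
    ///
    /// ```ignore
    /// let mut wanted = rustc_hash::FxHashSet::default();
    /// wanted.insert(title_field_id);
    /// let doc = searcher
    ///     .get_document_with_fields(&hit.address, Some(&wanted))
    ///     .await?;
    /// ```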
    pub async fn get_document_with_fields(
        &self,
        address: &crate::query::DocAddress,
        fields: Option<&rustc_hash::FxHashSet<u32>>,
    ) -> Result<Option<crate::dsl::Document>> {
        let segment_id = address.segment_id_u128().ok_or_else(|| {
            crate::error::Error::Query(format!("Invalid segment ID: {}", address.segment_id))
        })?;

        if let Some(&idx) = self.segment_map.get(&segment_id) {
            return self.segments[idx]
                .doc_with_fields(address.doc_id, fields)
                .await;
        }

        Ok(None)
    }
}

/// K-way merge of pre-sorted segment result batches.
///
/// Each batch is sorted by score descending. Uses a max-heap of
/// (score, batch_idx, position) to merge in O(N log K).
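///
/// Worked example (illustrative only): merging the descending batches
/// `[9.0, 4.0]` and `[7.0, 5.0]` with `fetch_limit = 3` and `offset = 1`
/// walks the global order `9.0, 7.0, 5.0`, skips the first entry because of
/// the offset, and returns results scored `[7.0, 5.0]`.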
fn merge_segment_results(
    sorted_batches: Vec<Vec<crate::query::SearchResult>>,
    fetch_limit: usize,
    offset: usize,
) -> Vec<crate::query::SearchResult> {
    use std::cmp::Ordering;

    struct MergeEntry {
        score: f32,
        batch_idx: usize,
        pos: usize,
    }
    impl PartialEq for MergeEntry {
        fn eq(&self, other: &Self) -> bool {
            self.score == other.score
        }
    }
    impl Eq for MergeEntry {}
    impl PartialOrd for MergeEntry {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }
    impl Ord for MergeEntry {
        fn cmp(&self, other: &Self) -> Ordering {
            self.score
                .partial_cmp(&other.score)
                .unwrap_or(Ordering::Equal)
        }
    }

    let mut heap = std::collections::BinaryHeap::with_capacity(sorted_batches.len());
    for (i, batch) in sorted_batches.iter().enumerate() {
        if !batch.is_empty() {
            heap.push(MergeEntry {
                score: batch[0].score,
                batch_idx: i,
                pos: 0,
            });
        }
    }

    let mut results = Vec::with_capacity(fetch_limit.min(64));
    let mut emitted = 0usize;
    while let Some(entry) = heap.pop() {
        if emitted >= fetch_limit {
            break;
        }
        let batch = &sorted_batches[entry.batch_idx];
        if emitted >= offset {
            results.push(batch[entry.pos].clone());
        }
        emitted += 1;
        let next_pos = entry.pos + 1;
        if next_pos < batch.len() {
            heap.push(MergeEntry {
                score: batch[next_pos].score,
                batch_idx: entry.batch_idx,
                pos: next_pos,
            });
        }
    }

    results
}

/// Get current process RSS in MB (best-effort, returns 0.0 on failure)
fn process_rss_mb() -> f64 {
    #[cfg(target_os = "linux")]
    {
        // Read from /proc/self/status — VmRSS line
        if let Ok(status) = std::fs::read_to_string("/proc/self/status") {
            for line in status.lines() {
                if let Some(rest) = line.strip_prefix("VmRSS:") {
                    let kb: f64 = rest
                        .trim()
                        .trim_end_matches("kB")
                        .trim()
                        .parse()
                        .unwrap_or(0.0);
                    return kb / 1024.0;
                }
            }
        }
        0.0
    }
    #[cfg(target_os = "macos")]
    {
        // Use mach_task_self / task_info via raw syscall
        use std::mem;
        #[repr(C)]
        struct TaskBasicInfo {
            virtual_size: u64,
            resident_size: u64,
            resident_size_max: u64,
            user_time: [u32; 2],
            system_time: [u32; 2],
            policy: i32,
            suspend_count: i32,
        }
        unsafe extern "C" {
            fn mach_task_self() -> u32;
            fn task_info(task: u32, flavor: u32, info: *mut TaskBasicInfo, count: *mut u32) -> i32;
        }
        const MACH_TASK_BASIC_INFO: u32 = 20;
        let mut info: TaskBasicInfo = unsafe { mem::zeroed() };
        let mut count = (mem::size_of::<TaskBasicInfo>() / mem::size_of::<u32>()) as u32;
        let ret = unsafe {
            task_info(
                mach_task_self(),
                MACH_TASK_BASIC_INFO,
                &mut info,
                &mut count,
            )
        };
        if ret == 0 {
            info.resident_size as f64 / (1024.0 * 1024.0)
        } else {
            0.0
        }
    }
    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    {
        0.0
    }
}