// hermes_core/index/searcher.rs
1//! Searcher - read-only search over pre-built segments
2//!
3//! This module provides `Searcher` for read-only search access to indexes.
4//! It can be used standalone (for wasm/read-only) or via `IndexReader` (for native).
5
6use std::sync::Arc;
7
8use rustc_hash::FxHashMap;
9
10use crate::directories::Directory;
11use crate::dsl::Schema;
12use crate::error::Result;
13use crate::query::LazyGlobalStats;
14use crate::segment::{SegmentId, SegmentReader};
15#[cfg(feature = "native")]
16use crate::segment::{SegmentSnapshot, SegmentTracker};
17use crate::structures::CoarseCentroids;
18
/// Searcher - provides search over loaded segments
///
/// For wasm/read-only use, create via `Searcher::open()`.
/// For native use with Index, this is created via `IndexReader`.
///
/// Represents a point-in-time view of the index: the segment set is fixed at
/// construction and no method mutates it.
pub struct Searcher<D: Directory + 'static> {
    /// Segment snapshot holding refs - prevents deletion during native use
    #[cfg(feature = "native")]
    _snapshot: SegmentSnapshot,
    /// PhantomData for the directory generic (no `D` value is stored after open)
    _phantom: std::marker::PhantomData<D>,
    /// Loaded segment readers, in the order of the original segment-id list
    segments: Vec<Arc<SegmentReader>>,
    /// Schema
    schema: Arc<Schema>,
    /// Default fields for search (explicit schema defaults, or all indexed text fields)
    default_fields: Vec<crate::Field>,
    /// Tokenizers
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    /// Trained centroids per field (injected into segment readers for IVF/ScaNN search)
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Lazy global statistics for cross-segment IDF computation
    global_stats: Arc<LazyGlobalStats>,
    /// O(1) segment lookup by segment_id
    segment_map: FxHashMap<u128, usize>,
    /// Total document count across all segments (saturating u32 sum)
    total_docs: u32,
}
46
47impl<D: Directory + 'static> Searcher<D> {
48    /// Create a Searcher directly from segment IDs
49    ///
50    /// This is a simpler initialization path that doesn't require SegmentManager.
51    /// Use this for read-only access to pre-built indexes.
52    pub async fn open(
53        directory: Arc<D>,
54        schema: Arc<Schema>,
55        segment_ids: &[String],
56        term_cache_blocks: usize,
57    ) -> Result<Self> {
58        Self::create(
59            directory,
60            schema,
61            segment_ids,
62            FxHashMap::default(),
63            term_cache_blocks,
64        )
65        .await
66    }
67
68    /// Create from a snapshot (for native IndexReader use)
69    #[cfg(feature = "native")]
70    pub(crate) async fn from_snapshot(
71        directory: Arc<D>,
72        schema: Arc<Schema>,
73        snapshot: SegmentSnapshot,
74        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
75        term_cache_blocks: usize,
76    ) -> Result<Self> {
77        let (segments, default_fields, global_stats, segment_map, total_docs) = Self::load_common(
78            &directory,
79            &schema,
80            snapshot.segment_ids(),
81            &trained_centroids,
82            term_cache_blocks,
83            &[],
84        )
85        .await?;
86
87        Ok(Self {
88            _snapshot: snapshot,
89            _phantom: std::marker::PhantomData,
90            segments,
91            schema,
92            default_fields,
93            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
94            trained_centroids,
95            global_stats,
96            segment_map,
97            total_docs,
98        })
99    }
100
101    /// Create from a snapshot, reusing existing segment readers for unchanged segments.
102    /// This avoids re-opening mmaps, fast fields, sparse indexes, etc. for segments
103    /// that weren't touched by merge.
104    #[cfg(feature = "native")]
105    pub(crate) async fn from_snapshot_reuse(
106        directory: Arc<D>,
107        schema: Arc<Schema>,
108        snapshot: SegmentSnapshot,
109        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
110        term_cache_blocks: usize,
111        existing_segments: &[Arc<SegmentReader>],
112    ) -> Result<Self> {
113        let (segments, default_fields, global_stats, segment_map, total_docs) = Self::load_common(
114            &directory,
115            &schema,
116            snapshot.segment_ids(),
117            &trained_centroids,
118            term_cache_blocks,
119            existing_segments,
120        )
121        .await?;
122
123        Ok(Self {
124            _snapshot: snapshot,
125            _phantom: std::marker::PhantomData,
126            segments,
127            schema,
128            default_fields,
129            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
130            trained_centroids,
131            global_stats,
132            segment_map,
133            total_docs,
134        })
135    }
136
137    /// Internal create method
138    async fn create(
139        directory: Arc<D>,
140        schema: Arc<Schema>,
141        segment_ids: &[String],
142        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
143        term_cache_blocks: usize,
144    ) -> Result<Self> {
145        let (segments, default_fields, global_stats, segment_map, total_docs) = Self::load_common(
146            &directory,
147            &schema,
148            segment_ids,
149            &trained_centroids,
150            term_cache_blocks,
151            &[],
152        )
153        .await?;
154
155        #[cfg(feature = "native")]
156        let _snapshot = {
157            let tracker = Arc::new(SegmentTracker::new());
158            SegmentSnapshot::new(tracker, segment_ids.to_vec())
159        };
160
161        let _ = directory; // suppress unused warning on wasm
162        Ok(Self {
163            #[cfg(feature = "native")]
164            _snapshot,
165            _phantom: std::marker::PhantomData,
166            segments,
167            schema,
168            default_fields,
169            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
170            trained_centroids,
171            global_stats,
172            segment_map,
173            total_docs,
174        })
175    }
176
177    /// Common loading logic shared by create and from_snapshot
178    async fn load_common(
179        directory: &Arc<D>,
180        schema: &Arc<Schema>,
181        segment_ids: &[String],
182        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
183        term_cache_blocks: usize,
184        existing_segments: &[Arc<SegmentReader>],
185    ) -> Result<(
186        Vec<Arc<SegmentReader>>,
187        Vec<crate::Field>,
188        Arc<LazyGlobalStats>,
189        FxHashMap<u128, usize>,
190        u32,
191    )> {
192        let segments = Self::load_segments(
193            directory,
194            schema,
195            segment_ids,
196            trained_centroids,
197            term_cache_blocks,
198            existing_segments,
199        )
200        .await?;
201        let default_fields = Self::build_default_fields(schema);
202        let global_stats = Arc::new(LazyGlobalStats::new(segments.clone()));
203        let (segment_map, total_docs) = Self::build_lookup_tables(&segments);
204        Ok((
205            segments,
206            default_fields,
207            global_stats,
208            segment_map,
209            total_docs,
210        ))
211    }
212
    /// Load segment readers from IDs (parallel loading for performance).
    /// Reuses existing segment readers for unchanged segments when `existing_segments`
    /// is non-empty — avoids re-opening mmaps, fast fields, sparse indexes, etc.
    ///
    /// Fails fast if any referenced segment cannot be opened. Segment-id strings
    /// that do not parse as hex are silently skipped — NOTE(review): confirm
    /// that dropping unparseable ids is intentional rather than an error.
    async fn load_segments(
        directory: &Arc<D>,
        schema: &Arc<Schema>,
        segment_ids: &[String],
        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
        existing_segments: &[Arc<SegmentReader>],
    ) -> Result<Vec<Arc<SegmentReader>>> {
        // Build lookup from existing segment readers for reuse
        let existing_map: FxHashMap<u128, Arc<SegmentReader>> = existing_segments
            .iter()
            .map(|seg| (seg.meta().id, Arc::clone(seg)))
            .collect();

        // Parse segment IDs and filter invalid ones
        let valid_segments: Vec<(usize, SegmentId)> = segment_ids
            .iter()
            .enumerate()
            .filter_map(|(idx, id_str)| SegmentId::from_hex(id_str).map(|sid| (idx, sid)))
            .collect();

        // Separate into reusable and new segments
        let mut reused: Vec<(usize, Arc<SegmentReader>)> = Vec::new();
        let mut to_load: Vec<(usize, SegmentId)> = Vec::new();
        for (idx, sid) in &valid_segments {
            if let Some(existing) = existing_map.get(&sid.0) {
                reused.push((*idx, Arc::clone(existing)));
            } else {
                to_load.push((*idx, *sid));
            }
        }

        if !existing_segments.is_empty() {
            log::info!(
                "[searcher] reusing {} segment readers, loading {} new",
                reused.len(),
                to_load.len(),
            );
        }

        // Load only NEW segments in parallel
        let futures: Vec<_> = to_load
            .iter()
            .map(|(_, segment_id)| {
                let dir = Arc::clone(directory);
                let sch = Arc::clone(schema);
                let sid = *segment_id;
                async move { SegmentReader::open(dir.as_ref(), sid, sch, term_cache_blocks).await }
            })
            .collect();

        let results = futures::future::join_all(futures).await;

        // Collect newly loaded results — fail fast if any segment fails to open
        let mut loaded: Vec<(usize, Arc<SegmentReader>)> = Vec::with_capacity(valid_segments.len());

        // Add reused segments
        loaded.extend(reused);

        // Add newly loaded segments
        for ((idx, sid), result) in to_load.into_iter().zip(results) {
            match result {
                Ok(mut reader) => {
                    // Inject per-field centroids into reader for IVF/ScaNN search.
                    // Only newly opened readers are updated; reused readers are
                    // left with whatever centroids they already carry.
                    if !trained_centroids.is_empty() {
                        reader.set_coarse_centroids(trained_centroids.clone());
                    }
                    loaded.push((idx, Arc::new(reader)));
                }
                Err(e) => {
                    return Err(crate::error::Error::Internal(format!(
                        "Failed to open segment {:016x}: {:?}",
                        sid.0, e
                    )));
                }
            }
        }

        // Sort by original index to maintain deterministic ordering
        loaded.sort_by_key(|(idx, _)| *idx);

        let segments: Vec<Arc<SegmentReader>> = loaded.into_iter().map(|(_, seg)| seg).collect();

        // Log searcher loading summary with per-segment memory breakdown
        let total_docs: u64 = segments.iter().map(|s| s.meta().num_docs as u64).sum();
        let mut total_mem = 0usize;
        for seg in &segments {
            let stats = seg.memory_stats();
            let seg_total = stats.total_bytes();
            total_mem += seg_total;
            log::info!(
                "[searcher] segment {:016x}: docs={}, mem={:.2} MB \
                 (term_dict={:.2} MB, store={:.2} MB, sparse={:.2} MB, dense={:.2} MB, bloom={:.2} MB)",
                stats.segment_id,
                stats.num_docs,
                seg_total as f64 / (1024.0 * 1024.0),
                stats.term_dict_cache_bytes as f64 / (1024.0 * 1024.0),
                stats.store_cache_bytes as f64 / (1024.0 * 1024.0),
                stats.sparse_index_bytes as f64 / (1024.0 * 1024.0),
                stats.dense_index_bytes as f64 / (1024.0 * 1024.0),
                stats.bloom_filter_bytes as f64 / (1024.0 * 1024.0),
            );
        }
        // Log process RSS if available (helps diagnose OOM)
        let rss_mb = process_rss_mb();
        log::info!(
            "[searcher] loaded {} segments: total_docs={}, estimated_mem={:.2} MB, process_rss={:.1} MB",
            segments.len(),
            total_docs,
            total_mem as f64 / (1024.0 * 1024.0),
            rss_mb,
        );

        Ok(segments)
    }
331
332    /// Build default fields from schema
333    fn build_default_fields(schema: &Schema) -> Vec<crate::Field> {
334        if !schema.default_fields().is_empty() {
335            schema.default_fields().to_vec()
336        } else {
337            schema
338                .fields()
339                .filter(|(_, entry)| {
340                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
341                })
342                .map(|(field, _)| field)
343                .collect()
344        }
345    }
346
    /// Get the schema this searcher was opened with
    pub fn schema(&self) -> &Schema {
        &self.schema
    }
351
    /// Get segment readers (one per loaded segment, in original list order)
    pub fn segment_readers(&self) -> &[Arc<SegmentReader>] {
        &self.segments
    }
356
    /// Get default fields for search (see `build_default_fields` for how they are chosen)
    pub fn default_fields(&self) -> &[crate::Field] {
        &self.default_fields
    }
361
    /// Get tokenizer registry (also handed to the query parser)
    pub fn tokenizers(&self) -> &crate::tokenizer::TokenizerRegistry {
        &self.tokenizers
    }
366
    /// Get trained centroids, keyed by field id (used for IVF/ScaNN search)
    pub fn trained_centroids(&self) -> &FxHashMap<u32, Arc<CoarseCentroids>> {
        &self.trained_centroids
    }
371
    /// Get lazy global statistics for cross-segment IDF computation
    /// (built over this searcher's segment set at construction)
    pub fn global_stats(&self) -> &Arc<LazyGlobalStats> {
        &self.global_stats
    }
376
377    /// Build O(1) lookup tables from loaded segments
378    fn build_lookup_tables(segments: &[Arc<SegmentReader>]) -> (FxHashMap<u128, usize>, u32) {
379        let mut segment_map = FxHashMap::default();
380        let mut total = 0u32;
381        for (i, seg) in segments.iter().enumerate() {
382            segment_map.insert(seg.meta().id, i);
383            total = total.saturating_add(seg.meta().num_docs);
384        }
385        (segment_map, total)
386    }
387
    /// Get total document count across all segments (saturating u32 sum)
    pub fn num_docs(&self) -> u32 {
        self.total_docs
    }
392
    /// Get O(1) segment_id → index map (used by reranker)
    ///
    /// Values index into `segment_readers()`.
    pub fn segment_map(&self) -> &FxHashMap<u128, usize> {
        &self.segment_map
    }
397
    /// Get number of loaded segments
    pub fn num_segments(&self) -> usize {
        self.segments.len()
    }
402
403    /// Get a document by (segment_id, local_doc_id)
404    pub async fn doc(&self, segment_id: u128, doc_id: u32) -> Result<Option<crate::dsl::Document>> {
405        if let Some(&idx) = self.segment_map.get(&segment_id) {
406            return self.segments[idx].doc(doc_id).await;
407        }
408        Ok(None)
409    }
410
411    /// Search across all segments and return aggregated results
412    pub async fn search(
413        &self,
414        query: &dyn crate::query::Query,
415        limit: usize,
416    ) -> Result<Vec<crate::query::SearchResult>> {
417        let (results, _) = self.search_with_count(query, limit).await?;
418        Ok(results)
419    }
420
    /// Search across all segments and return (results, total_seen)
    /// total_seen is the number of documents that were scored across all segments
    pub async fn search_with_count(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        // Equivalent to pagination with offset = 0.
        self.search_with_offset_and_count(query, limit, 0).await
    }
430
431    /// Search with offset for pagination
432    pub async fn search_with_offset(
433        &self,
434        query: &dyn crate::query::Query,
435        limit: usize,
436        offset: usize,
437    ) -> Result<Vec<crate::query::SearchResult>> {
438        let (results, _) = self
439            .search_with_offset_and_count(query, limit, offset)
440            .await?;
441        Ok(results)
442    }
443
    /// Search with offset and return (results, total_seen)
    ///
    /// Returns at most `limit` results after skipping the first `offset`.
    pub async fn search_with_offset_and_count(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_internal(query, limit, offset, false).await
    }
453
    /// Search with positions (ordinal tracking) and return (results, total_seen)
    ///
    /// Use this when you need per-ordinal scores for multi-valued fields.
    /// Always starts at offset 0.
    pub async fn search_with_positions(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_internal(query, limit, 0, true).await
    }
464
    /// Internal search implementation
    ///
    /// Fans the query out to every segment, collects per-segment top-`fetch_limit`
    /// batches, then k-way merges them and applies `offset` during the merge.
    /// Returns `(results, total_seen)` where `total_seen` is the saturating sum
    /// of documents scored per segment.
    async fn search_internal(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
        collect_positions: bool,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        // Each segment must return offset+limit candidates so the global merge
        // can skip `offset` entries and still emit `limit` results.
        let fetch_limit = offset + limit;

        // Multi-segment: use rayon for true CPU parallelism (sync feature required).
        // Only works on multi-threaded tokio runtime (block_in_place panics on current_thread).
        // NOTE(review): `Handle::current()` itself panics outside a tokio runtime
        // context — confirm all callers run inside tokio when "sync" is enabled.
        #[cfg(feature = "sync")]
        if self.segments.len() > 1
            && tokio::runtime::Handle::current().runtime_flavor()
                == tokio::runtime::RuntimeFlavor::MultiThread
        {
            return self.search_internal_parallel(query, fetch_limit, offset, collect_positions);
        }

        // Single segment or no sync feature: use async path
        let futures: Vec<_> = self
            .segments
            .iter()
            .map(|segment| {
                let sid = segment.meta().id;
                async move {
                    let (mut results, segment_seen) = if collect_positions {
                        crate::query::search_segment_with_positions_and_count(
                            segment.as_ref(),
                            query,
                            fetch_limit,
                        )
                        .await?
                    } else {
                        crate::query::search_segment_with_count(
                            segment.as_ref(),
                            query,
                            fetch_limit,
                        )
                        .await?
                    };
                    // Stamp segment_id on each result
                    for r in &mut results {
                        r.segment_id = sid;
                    }
                    Ok::<_, crate::error::Error>((results, segment_seen))
                }
            })
            .collect();

        let batches = futures::future::try_join_all(futures).await?;
        let mut total_seen: u32 = 0;

        // Keep only non-empty batches; each batch is already sorted by score
        // descending (required by merge_segment_results).
        let mut sorted_batches: Vec<Vec<crate::query::SearchResult>> =
            Vec::with_capacity(batches.len());
        for (batch, segment_seen) in batches {
            total_seen = total_seen.saturating_add(segment_seen);
            if !batch.is_empty() {
                sorted_batches.push(batch);
            }
        }

        let results = merge_segment_results(sorted_batches, fetch_limit, offset);
        Ok((results, total_seen))
    }
531
    /// Multi-segment parallel search using rayon (CPU-bound scoring on thread pool).
    ///
    /// `block_in_place` tells tokio this worker is occupied so it can steal tasks.
    /// `rayon::par_iter` distributes segment scoring across the rayon thread pool.
    ///
    /// Returns `(merged results, total docs scored)` exactly like the async path.
    #[cfg(feature = "sync")]
    fn search_internal_parallel(
        &self,
        query: &dyn crate::query::Query,
        fetch_limit: usize,
        offset: usize,
        collect_positions: bool,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        use rayon::prelude::*;

        let batches: Vec<Result<(Vec<crate::query::SearchResult>, u32)>> =
            tokio::task::block_in_place(|| {
                self.segments
                    .par_iter()
                    .map(|segment| {
                        let sid = segment.meta().id;
                        let (mut results, segment_seen) = if collect_positions {
                            crate::query::search_segment_with_positions_and_count_sync(
                                segment.as_ref(),
                                query,
                                fetch_limit,
                            )?
                        } else {
                            crate::query::search_segment_with_count_sync(
                                segment.as_ref(),
                                query,
                                fetch_limit,
                            )?
                        };
                        // Stamp the owning segment id onto each hit
                        for r in &mut results {
                            r.segment_id = sid;
                        }
                        Ok((results, segment_seen))
                    })
                    .collect()
            });

        // Propagate the first per-segment error; otherwise merge non-empty batches.
        let mut total_seen: u32 = 0;
        let mut sorted_batches: Vec<Vec<crate::query::SearchResult>> =
            Vec::with_capacity(batches.len());
        for result in batches {
            let (batch, segment_seen) = result?;
            total_seen = total_seen.saturating_add(segment_seen);
            if !batch.is_empty() {
                sorted_batches.push(batch);
            }
        }

        let results = merge_segment_results(sorted_batches, fetch_limit, offset);
        Ok((results, total_seen))
    }
587
    /// Synchronous search across all segments using rayon for parallelism.
    ///
    /// This is the async-free boundary — no tokio involvement from here down.
    /// Note: unlike the async path, this always uses the non-position
    /// (`search_segment_with_count_sync`) scoring entry point.
    #[cfg(feature = "sync")]
    pub fn search_with_offset_and_count_sync(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        use rayon::prelude::*;

        // Fetch offset+limit per segment so the merge can skip `offset`.
        let fetch_limit = offset + limit;

        let batches: Vec<Result<(Vec<crate::query::SearchResult>, u32)>> = self
            .segments
            .par_iter()
            .map(|segment| {
                let sid = segment.meta().id;
                let (mut results, segment_seen) = crate::query::search_segment_with_count_sync(
                    segment.as_ref(),
                    query,
                    fetch_limit,
                )?;
                // Stamp the owning segment id onto each hit
                for r in &mut results {
                    r.segment_id = sid;
                }
                Ok((results, segment_seen))
            })
            .collect();

        // Propagate the first per-segment error; otherwise merge non-empty batches.
        let mut total_seen: u32 = 0;
        let mut sorted_batches: Vec<Vec<crate::query::SearchResult>> =
            Vec::with_capacity(batches.len());
        for result in batches {
            let (batch, segment_seen) = result?;
            total_seen = total_seen.saturating_add(segment_seen);
            if !batch.is_empty() {
                sorted_batches.push(batch);
            }
        }

        let results = merge_segment_results(sorted_batches, fetch_limit, offset);
        Ok((results, total_seen))
    }
633
634    /// Two-stage search: L1 retrieval + L2 dense vector reranking
635    ///
636    /// Runs the query to get `l1_limit` candidates, then reranks by exact
637    /// dense vector distance and returns the top `final_limit` results.
638    pub async fn search_and_rerank(
639        &self,
640        query: &dyn crate::query::Query,
641        l1_limit: usize,
642        final_limit: usize,
643        config: &crate::query::RerankerConfig,
644    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
645        let (candidates, total_seen) = self.search_with_count(query, l1_limit).await?;
646        let reranked = crate::query::rerank(self, &candidates, config, final_limit).await?;
647        Ok((reranked, total_seen))
648    }
649
    /// Parse query string and search (convenience method)
    ///
    /// Shorthand for `query_offset(query_str, limit, 0)`.
    pub async fn query(
        &self,
        query_str: &str,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.query_offset(query_str, limit, 0).await
    }
658
    /// Parse query string and search with offset (convenience method)
    ///
    /// NOTE(review): `total_hits` is the number of hits in the returned page
    /// (at most `limit`), not an index-wide match count — the `total_seen`
    /// value from `search_internal` is discarded here. Confirm this is the
    /// intended contract for `SearchResponse`.
    pub async fn query_offset(
        &self,
        query_str: &str,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let parser = self.query_parser();
        let query = parser
            .parse(query_str)
            .map_err(crate::error::Error::Query)?;

        let (results, _total_seen) = self
            .search_internal(query.as_ref(), limit, offset, false)
            .await?;

        let total_hits = results.len() as u32;
        // Convert internal results into (address, score, matched-fields) hits.
        let hits: Vec<crate::query::SearchHit> = results
            .into_iter()
            .map(|result| crate::query::SearchHit {
                address: crate::query::DocAddress::new(result.segment_id, result.doc_id),
                score: result.score,
                matched_fields: result.extract_ordinals(),
            })
            .collect();

        Ok(crate::query::SearchResponse { hits, total_hits })
    }
687
688    /// Get query parser for this searcher
689    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
690        let query_routers = self.schema.query_routers();
691        if !query_routers.is_empty()
692            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
693        {
694            return crate::dsl::QueryLanguageParser::with_router(
695                Arc::clone(&self.schema),
696                self.default_fields.clone(),
697                Arc::clone(&self.tokenizers),
698                router,
699            );
700        }
701
702        crate::dsl::QueryLanguageParser::new(
703            Arc::clone(&self.schema),
704            self.default_fields.clone(),
705            Arc::clone(&self.tokenizers),
706        )
707    }
708
    /// Get a document by address (segment_id + local doc_id)
    ///
    /// Hydrates all stored fields; returns `Ok(None)` for unknown segments.
    pub async fn get_document(
        &self,
        address: &crate::query::DocAddress,
    ) -> Result<Option<crate::dsl::Document>> {
        self.get_document_with_fields(address, None).await
    }
716
    /// Get a document by address, hydrating only the specified field IDs.
    ///
    /// If `fields` is `None`, all fields are hydrated (including dense vectors).
    /// If `fields` is `Some(set)`, only dense vector fields in the set are read
    /// from flat storage — skipping expensive mmap reads for unrequested vectors.
    ///
    /// Returns `Ok(None)` when the address references a segment unknown to this
    /// searcher; errors when the address's segment id is not valid.
    pub async fn get_document_with_fields(
        &self,
        address: &crate::query::DocAddress,
        fields: Option<&rustc_hash::FxHashSet<u32>>,
    ) -> Result<Option<crate::dsl::Document>> {
        // An unparseable segment id is a malformed address → query error.
        let segment_id = address.segment_id_u128().ok_or_else(|| {
            crate::error::Error::Query(format!("Invalid segment ID: {}", address.segment_id()))
        })?;

        if let Some(&idx) = self.segment_map.get(&segment_id) {
            return self.segments[idx]
                .doc_with_fields(address.doc_id, fields)
                .await;
        }

        Ok(None)
    }
739}
740
741/// K-way merge of pre-sorted segment result batches.
742///
743/// Each batch is sorted by score descending. Uses a max-heap of
744/// (score, batch_idx, position) to merge in O(N log K).
745fn merge_segment_results(
746    sorted_batches: Vec<Vec<crate::query::SearchResult>>,
747    fetch_limit: usize,
748    offset: usize,
749) -> Vec<crate::query::SearchResult> {
750    use std::cmp::Ordering;
751
752    struct MergeEntry {
753        score: f32,
754        batch_idx: usize,
755        pos: usize,
756    }
757    impl PartialEq for MergeEntry {
758        fn eq(&self, other: &Self) -> bool {
759            self.score == other.score
760        }
761    }
762    impl Eq for MergeEntry {}
763    impl PartialOrd for MergeEntry {
764        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
765            Some(self.cmp(other))
766        }
767    }
768    impl Ord for MergeEntry {
769        fn cmp(&self, other: &Self) -> Ordering {
770            self.score
771                .partial_cmp(&other.score)
772                .unwrap_or(Ordering::Equal)
773        }
774    }
775
776    let mut heap = std::collections::BinaryHeap::with_capacity(sorted_batches.len());
777    for (i, batch) in sorted_batches.iter().enumerate() {
778        if !batch.is_empty() {
779            heap.push(MergeEntry {
780                score: batch[0].score,
781                batch_idx: i,
782                pos: 0,
783            });
784        }
785    }
786
787    let mut results = Vec::with_capacity(fetch_limit.min(64));
788    let mut emitted = 0usize;
789    while let Some(entry) = heap.pop() {
790        if emitted >= fetch_limit {
791            break;
792        }
793        let batch = &sorted_batches[entry.batch_idx];
794        if emitted >= offset {
795            results.push(batch[entry.pos].clone());
796        }
797        emitted += 1;
798        let next_pos = entry.pos + 1;
799        if next_pos < batch.len() {
800            heap.push(MergeEntry {
801                score: batch[next_pos].score,
802                batch_idx: entry.batch_idx,
803                pos: next_pos,
804            });
805        }
806    }
807
808    results
809}
810
/// Get current process RSS in MB (best-effort, returns 0.0 on failure)
fn process_rss_mb() -> f64 {
    #[cfg(target_os = "linux")]
    {
        // Parse the `VmRSS:` line (value reported in kB) from /proc/self/status.
        if let Ok(status) = std::fs::read_to_string("/proc/self/status") {
            if let Some(raw) = status.lines().find_map(|l| l.strip_prefix("VmRSS:")) {
                let kb = raw
                    .trim()
                    .trim_end_matches("kB")
                    .trim()
                    .parse::<f64>()
                    .unwrap_or(0.0);
                return kb / 1024.0;
            }
        }
        0.0
    }
    #[cfg(target_os = "macos")]
    {
        // Query MACH_TASK_BASIC_INFO via task_info for the resident size (bytes).
        use std::mem;
        #[repr(C)]
        struct MachTaskBasicInfo {
            virtual_size: u64,
            resident_size: u64,
            resident_size_max: u64,
            user_time: [u32; 2],
            system_time: [u32; 2],
            policy: i32,
            suspend_count: i32,
        }
        unsafe extern "C" {
            fn mach_task_self() -> u32;
            fn task_info(
                task: u32,
                flavor: u32,
                info: *mut MachTaskBasicInfo,
                count: *mut u32,
            ) -> i32;
        }
        const MACH_TASK_BASIC_INFO: u32 = 20;
        let mut basic: MachTaskBasicInfo = unsafe { mem::zeroed() };
        let mut words = (mem::size_of::<MachTaskBasicInfo>() / mem::size_of::<u32>()) as u32;
        // SAFETY: `basic` is a zeroed, correctly-sized buffer for the
        // MACH_TASK_BASIC_INFO flavor and `words` holds its size in 32-bit
        // words, as the task_info contract requires.
        let kr = unsafe {
            task_info(
                mach_task_self(),
                MACH_TASK_BASIC_INFO,
                &mut basic,
                &mut words,
            )
        };
        if kr == 0 {
            basic.resident_size as f64 / (1024.0 * 1024.0)
        } else {
            0.0
        }
    }
    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    {
        0.0
    }
}