// hermes_core/index/searcher.rs
1//! Searcher - read-only search over pre-built segments
2//!
3//! This module provides `Searcher` for read-only search access to indexes.
4//! It can be used standalone (for wasm/read-only) or via `IndexReader` (for native).
5
6use std::sync::Arc;
7
8use rustc_hash::FxHashMap;
9
10use crate::directories::Directory;
11use crate::dsl::Schema;
12use crate::error::Result;
13use crate::query::LazyGlobalStats;
14use crate::segment::{SegmentId, SegmentReader};
15#[cfg(feature = "native")]
16use crate::segment::{SegmentSnapshot, SegmentTracker};
17use crate::structures::CoarseCentroids;
18
/// Searcher - provides search over loaded segments
///
/// For wasm/read-only use, create via `Searcher::open()`.
/// For native use with Index, this is created via `IndexReader`.
///
/// All state is read-only after construction: segment readers, schema,
/// tokenizers, and precomputed lookup tables (`segment_map`, `total_docs`).
pub struct Searcher<D: Directory + 'static> {
    /// Segment snapshot holding refs - prevents deletion during native use
    #[cfg(feature = "native")]
    _snapshot: SegmentSnapshot,
    /// PhantomData for the directory generic
    /// (the `Arc<D>` handle is only used while loading; it is not stored here)
    _phantom: std::marker::PhantomData<D>,
    /// Loaded segment readers
    segments: Vec<Arc<SegmentReader>>,
    /// Schema
    schema: Arc<Schema>,
    /// Default fields for search
    default_fields: Vec<crate::Field>,
    /// Tokenizers
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    /// Trained centroids per field (injected into segment readers for IVF/ScaNN search)
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Lazy global statistics for cross-segment IDF computation
    global_stats: Arc<LazyGlobalStats>,
    /// O(1) segment lookup by segment_id
    segment_map: FxHashMap<u128, usize>,
    /// Total document count across all segments (saturating sum of per-segment counts)
    total_docs: u32,
}
46
impl<D: Directory + 'static> Searcher<D> {
    /// Create a Searcher directly from segment IDs
    ///
    /// This is a simpler initialization path that doesn't require SegmentManager.
    /// Use this for read-only access to pre-built indexes.
    ///
    /// No trained centroids are injected on this path (pass-through of an
    /// empty map); use `from_snapshot` for IVF/ScaNN-enabled readers.
    pub async fn open(
        directory: Arc<D>,
        schema: Arc<Schema>,
        segment_ids: &[String],
        term_cache_blocks: usize,
    ) -> Result<Self> {
        Self::create(
            directory,
            schema,
            segment_ids,
            FxHashMap::default(),
            term_cache_blocks,
        )
        .await
    }

    /// Create from a snapshot (for native IndexReader use)
    ///
    /// The snapshot is retained in `_snapshot` so the referenced segments
    /// cannot be deleted while this Searcher is alive.
    #[cfg(feature = "native")]
    pub(crate) async fn from_snapshot(
        directory: Arc<D>,
        schema: Arc<Schema>,
        snapshot: SegmentSnapshot,
        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
    ) -> Result<Self> {
        let (segments, default_fields, global_stats, segment_map, total_docs) = Self::load_common(
            &directory,
            &schema,
            snapshot.segment_ids(),
            &trained_centroids,
            term_cache_blocks,
            &[],
        )
        .await?;

        Ok(Self {
            _snapshot: snapshot,
            _phantom: std::marker::PhantomData,
            segments,
            schema,
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            trained_centroids,
            global_stats,
            segment_map,
            total_docs,
        })
    }

    /// Create from a snapshot, reusing existing segment readers for unchanged segments.
    /// This avoids re-opening mmaps, fast fields, sparse indexes, etc. for segments
    /// that weren't touched by merge.
    #[cfg(feature = "native")]
    pub(crate) async fn from_snapshot_reuse(
        directory: Arc<D>,
        schema: Arc<Schema>,
        snapshot: SegmentSnapshot,
        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
        existing_segments: &[Arc<SegmentReader>],
    ) -> Result<Self> {
        let (segments, default_fields, global_stats, segment_map, total_docs) = Self::load_common(
            &directory,
            &schema,
            snapshot.segment_ids(),
            &trained_centroids,
            term_cache_blocks,
            existing_segments,
        )
        .await?;

        Ok(Self {
            _snapshot: snapshot,
            _phantom: std::marker::PhantomData,
            segments,
            schema,
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            trained_centroids,
            global_stats,
            segment_map,
            total_docs,
        })
    }

    /// Internal create method
    ///
    /// Used by `open()` (standalone / read-only path). Unlike `from_snapshot`,
    /// the snapshot built here wraps a freshly created tracker.
    async fn create(
        directory: Arc<D>,
        schema: Arc<Schema>,
        segment_ids: &[String],
        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
    ) -> Result<Self> {
        let (segments, default_fields, global_stats, segment_map, total_docs) = Self::load_common(
            &directory,
            &schema,
            segment_ids,
            &trained_centroids,
            term_cache_blocks,
            &[],
        )
        .await?;

        // NOTE(review): this snapshot wraps a brand-new tracker, so it only
        // satisfies the `_snapshot` field — it is not connected to any external
        // SegmentManager's tracker and presumably provides no cross-process
        // deletion protection on this path; confirm against SegmentTracker docs.
        #[cfg(feature = "native")]
        let _snapshot = {
            let tracker = Arc::new(SegmentTracker::new());
            SegmentSnapshot::new(tracker, segment_ids.to_vec())
        };

        let _ = directory; // suppress unused warning on wasm
        Ok(Self {
            #[cfg(feature = "native")]
            _snapshot,
            _phantom: std::marker::PhantomData,
            segments,
            schema,
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            trained_centroids,
            global_stats,
            segment_map,
            total_docs,
        })
    }

    /// Common loading logic shared by create and from_snapshot
    ///
    /// Returns the loaded readers plus everything derived from them:
    /// default fields, lazy global stats, the id→index map, and total docs.
    async fn load_common(
        directory: &Arc<D>,
        schema: &Arc<Schema>,
        segment_ids: &[String],
        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
        existing_segments: &[Arc<SegmentReader>],
    ) -> Result<(
        Vec<Arc<SegmentReader>>,
        Vec<crate::Field>,
        Arc<LazyGlobalStats>,
        FxHashMap<u128, usize>,
        u32,
    )> {
        let segments = Self::load_segments(
            directory,
            schema,
            segment_ids,
            trained_centroids,
            term_cache_blocks,
            existing_segments,
        )
        .await?;
        let default_fields = Self::build_default_fields(schema);
        let global_stats = Arc::new(LazyGlobalStats::new(segments.clone()));
        let (segment_map, total_docs) = Self::build_lookup_tables(&segments);
        Ok((
            segments,
            default_fields,
            global_stats,
            segment_map,
            total_docs,
        ))
    }

    /// Load segment readers from IDs (parallel loading for performance).
    /// Reuses existing segment readers for unchanged segments when `existing_segments`
    /// is non-empty — avoids re-opening mmaps, fast fields, sparse indexes, etc.
    ///
    /// Fails fast if any new segment fails to open. Segment IDs that do not
    /// parse as hex are silently dropped (see the `filter_map` below).
    async fn load_segments(
        directory: &Arc<D>,
        schema: &Arc<Schema>,
        segment_ids: &[String],
        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
        existing_segments: &[Arc<SegmentReader>],
    ) -> Result<Vec<Arc<SegmentReader>>> {
        // Build lookup from existing segment readers for reuse
        let existing_map: FxHashMap<u128, Arc<SegmentReader>> = existing_segments
            .iter()
            .map(|seg| (seg.meta().id, Arc::clone(seg)))
            .collect();

        // Parse segment IDs and filter invalid ones
        // NOTE(review): unparsable IDs are skipped without logging — verify
        // this silent-drop is intended rather than an error condition.
        let valid_segments: Vec<(usize, SegmentId)> = segment_ids
            .iter()
            .enumerate()
            .filter_map(|(idx, id_str)| SegmentId::from_hex(id_str).map(|sid| (idx, sid)))
            .collect();

        // Separate into reusable and new segments
        let mut reused: Vec<(usize, Arc<SegmentReader>)> = Vec::new();
        let mut to_load: Vec<(usize, SegmentId)> = Vec::new();
        for (idx, sid) in &valid_segments {
            if let Some(existing) = existing_map.get(&sid.0) {
                reused.push((*idx, Arc::clone(existing)));
            } else {
                to_load.push((*idx, *sid));
            }
        }

        if !existing_segments.is_empty() {
            log::info!(
                "[searcher] reusing {} segment readers, loading {} new",
                reused.len(),
                to_load.len(),
            );
        }

        // Load only NEW segments in parallel
        let futures: Vec<_> = to_load
            .iter()
            .map(|(_, segment_id)| {
                let dir = Arc::clone(directory);
                let sch = Arc::clone(schema);
                let sid = *segment_id;
                async move { SegmentReader::open(dir.as_ref(), sid, sch, term_cache_blocks).await }
            })
            .collect();

        let results = futures::future::join_all(futures).await;

        // Collect newly loaded results — fail fast if any segment fails to open
        let mut loaded: Vec<(usize, Arc<SegmentReader>)> = Vec::with_capacity(valid_segments.len());

        // Add reused segments
        loaded.extend(reused);

        // Add newly loaded segments
        for ((idx, sid), result) in to_load.into_iter().zip(results) {
            match result {
                Ok(mut reader) => {
                    // Inject per-field centroids into reader for IVF/ScaNN search
                    if !trained_centroids.is_empty() {
                        reader.set_coarse_centroids(trained_centroids.clone());
                    }
                    loaded.push((idx, Arc::new(reader)));
                }
                Err(e) => {
                    return Err(crate::error::Error::Internal(format!(
                        "Failed to open segment {:016x}: {:?}",
                        sid.0, e
                    )));
                }
            }
        }

        // Sort by original index to maintain deterministic ordering
        loaded.sort_by_key(|(idx, _)| *idx);

        let segments: Vec<Arc<SegmentReader>> = loaded.into_iter().map(|(_, seg)| seg).collect();

        // Log searcher loading summary with per-segment memory breakdown
        let total_docs: u64 = segments.iter().map(|s| s.meta().num_docs as u64).sum();
        let mut total_mem = 0usize;
        for seg in &segments {
            let stats = seg.memory_stats();
            let seg_total = stats.total_bytes();
            total_mem += seg_total;
            log::info!(
                "[searcher] segment {:016x}: docs={}, mem={:.2} MB \
                 (term_dict={:.2} MB, store={:.2} MB, sparse={:.2} MB, dense={:.2} MB, bloom={:.2} MB)",
                stats.segment_id,
                stats.num_docs,
                seg_total as f64 / (1024.0 * 1024.0),
                stats.term_dict_cache_bytes as f64 / (1024.0 * 1024.0),
                stats.store_cache_bytes as f64 / (1024.0 * 1024.0),
                stats.sparse_index_bytes as f64 / (1024.0 * 1024.0),
                stats.dense_index_bytes as f64 / (1024.0 * 1024.0),
                stats.bloom_filter_bytes as f64 / (1024.0 * 1024.0),
            );
        }
        // Log process RSS if available (helps diagnose OOM)
        let rss_mb = process_rss_mb();
        log::info!(
            "[searcher] loaded {} segments: total_docs={}, estimated_mem={:.2} MB, process_rss={:.1} MB",
            segments.len(),
            total_docs,
            total_mem as f64 / (1024.0 * 1024.0),
            rss_mb,
        );

        Ok(segments)
    }

    /// Build default fields from schema
    ///
    /// Uses the schema's explicit default fields when set; otherwise falls
    /// back to all indexed Text fields.
    fn build_default_fields(schema: &Schema) -> Vec<crate::Field> {
        if !schema.default_fields().is_empty() {
            schema.default_fields().to_vec()
        } else {
            schema
                .fields()
                .filter(|(_, entry)| {
                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
                })
                .map(|(field, _)| field)
                .collect()
        }
    }

    /// Get the schema
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Get segment readers
    pub fn segment_readers(&self) -> &[Arc<SegmentReader>] {
        &self.segments
    }

    /// Get default fields for search
    pub fn default_fields(&self) -> &[crate::Field] {
        &self.default_fields
    }

    /// Get tokenizer registry
    pub fn tokenizers(&self) -> &crate::tokenizer::TokenizerRegistry {
        &self.tokenizers
    }

    /// Get trained centroids
    pub fn trained_centroids(&self) -> &FxHashMap<u32, Arc<CoarseCentroids>> {
        &self.trained_centroids
    }

    /// Get lazy global statistics for cross-segment IDF computation
    pub fn global_stats(&self) -> &Arc<LazyGlobalStats> {
        &self.global_stats
    }

    /// Build O(1) lookup tables from loaded segments
    ///
    /// Returns (segment_id → index map, saturating total doc count).
    fn build_lookup_tables(segments: &[Arc<SegmentReader>]) -> (FxHashMap<u128, usize>, u32) {
        let mut segment_map = FxHashMap::default();
        let mut total = 0u32;
        for (i, seg) in segments.iter().enumerate() {
            segment_map.insert(seg.meta().id, i);
            total = total.saturating_add(seg.meta().num_docs);
        }
        (segment_map, total)
    }

    /// Get total document count across all segments
    pub fn num_docs(&self) -> u32 {
        self.total_docs
    }

    /// Get O(1) segment_id → index map (used by reranker)
    pub fn segment_map(&self) -> &FxHashMap<u128, usize> {
        &self.segment_map
    }

    /// Get number of segments
    pub fn num_segments(&self) -> usize {
        self.segments.len()
    }

    /// Get a document by (segment_id, local_doc_id)
    ///
    /// Returns `Ok(None)` when the segment_id is unknown to this searcher.
    pub async fn doc(&self, segment_id: u128, doc_id: u32) -> Result<Option<crate::dsl::Document>> {
        if let Some(&idx) = self.segment_map.get(&segment_id) {
            return self.segments[idx].doc(doc_id).await;
        }
        Ok(None)
    }

    /// Search across all segments and return aggregated results
    pub async fn search(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<Vec<crate::query::SearchResult>> {
        let (results, _) = self.search_with_count(query, limit).await?;
        Ok(results)
    }

    /// Search across all segments and return (results, total_seen)
    /// total_seen is the number of documents that were scored across all segments
    pub async fn search_with_count(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_with_offset_and_count(query, limit, 0).await
    }

    /// Search with offset for pagination
    pub async fn search_with_offset(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<Vec<crate::query::SearchResult>> {
        let (results, _) = self
            .search_with_offset_and_count(query, limit, offset)
            .await?;
        Ok(results)
    }

    /// Search with offset and return (results, total_seen)
    pub async fn search_with_offset_and_count(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_internal(query, limit, offset, false).await
    }

    /// Search with positions (ordinal tracking) and return (results, total_seen)
    ///
    /// Use this when you need per-ordinal scores for multi-valued fields.
    pub async fn search_with_positions(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_internal(query, limit, 0, true).await
    }

    /// Internal search implementation
    ///
    /// Each segment is searched to `offset + limit` (a segment could supply
    /// every result on a page); the per-segment batches are then k-way
    /// merged and the first `offset` entries dropped.
    async fn search_internal(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
        collect_positions: bool,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        let fetch_limit = offset + limit;

        // Use rayon + block_in_place for CPU-bound scoring (sync feature required).
        // Offloads the scoring loop from tokio workers so search doesn't starve
        // other async tasks. Works for any segment count (rayon degrades gracefully
        // to inline execution for a single segment).
        // Only works on multi-threaded tokio runtime (block_in_place panics on current_thread).
        #[cfg(feature = "sync")]
        if !self.segments.is_empty()
            && tokio::runtime::Handle::current().runtime_flavor()
                == tokio::runtime::RuntimeFlavor::MultiThread
        {
            return self.search_internal_parallel(query, fetch_limit, offset, collect_positions);
        }

        // No segments, no sync feature, or current_thread runtime: use async path
        let futures: Vec<_> = self
            .segments
            .iter()
            .map(|segment| {
                let sid = segment.meta().id;
                async move {
                    let (mut results, segment_seen) = if collect_positions {
                        crate::query::search_segment_with_positions_and_count(
                            segment.as_ref(),
                            query,
                            fetch_limit,
                        )
                        .await?
                    } else {
                        crate::query::search_segment_with_count(
                            segment.as_ref(),
                            query,
                            fetch_limit,
                        )
                        .await?
                    };
                    // Stamp segment_id on each result
                    for r in &mut results {
                        r.segment_id = sid;
                    }
                    Ok::<_, crate::error::Error>((results, segment_seen))
                }
            })
            .collect();

        let batches = futures::future::try_join_all(futures).await?;
        let mut total_seen: u32 = 0;

        let mut sorted_batches: Vec<Vec<crate::query::SearchResult>> =
            Vec::with_capacity(batches.len());
        for (batch, segment_seen) in batches {
            total_seen = total_seen.saturating_add(segment_seen);
            if !batch.is_empty() {
                sorted_batches.push(batch);
            }
        }

        let results = merge_segment_results(sorted_batches, fetch_limit, offset);
        Ok((results, total_seen))
    }

    /// Multi-segment parallel search using rayon (CPU-bound scoring on thread pool).
    ///
    /// `block_in_place` tells tokio this worker is occupied so it can steal tasks.
    /// `rayon::par_iter` distributes segment scoring across the rayon thread pool.
    #[cfg(feature = "sync")]
    fn search_internal_parallel(
        &self,
        query: &dyn crate::query::Query,
        fetch_limit: usize,
        offset: usize,
        collect_positions: bool,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        use rayon::prelude::*;

        let batches: Vec<Result<(Vec<crate::query::SearchResult>, u32)>> =
            tokio::task::block_in_place(|| {
                self.segments
                    .par_iter()
                    .map(|segment| {
                        let sid = segment.meta().id;
                        let (mut results, segment_seen) = if collect_positions {
                            crate::query::search_segment_with_positions_and_count_sync(
                                segment.as_ref(),
                                query,
                                fetch_limit,
                            )?
                        } else {
                            crate::query::search_segment_with_count_sync(
                                segment.as_ref(),
                                query,
                                fetch_limit,
                            )?
                        };
                        for r in &mut results {
                            r.segment_id = sid;
                        }
                        Ok((results, segment_seen))
                    })
                    .collect()
            });

        let mut total_seen: u32 = 0;
        let mut sorted_batches: Vec<Vec<crate::query::SearchResult>> =
            Vec::with_capacity(batches.len());
        for result in batches {
            // Per-segment errors fail the whole search (first error wins).
            let (batch, segment_seen) = result?;
            total_seen = total_seen.saturating_add(segment_seen);
            if !batch.is_empty() {
                sorted_batches.push(batch);
            }
        }

        let results = merge_segment_results(sorted_batches, fetch_limit, offset);
        Ok((results, total_seen))
    }

    /// Synchronous search across all segments using rayon for parallelism.
    ///
    /// This is the async-free boundary — no tokio involvement from here down.
    /// Note: unlike `search_internal`, this path never collects positions.
    #[cfg(feature = "sync")]
    pub fn search_with_offset_and_count_sync(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        use rayon::prelude::*;

        let fetch_limit = offset + limit;

        let batches: Vec<Result<(Vec<crate::query::SearchResult>, u32)>> = self
            .segments
            .par_iter()
            .map(|segment| {
                let sid = segment.meta().id;
                let (mut results, segment_seen) = crate::query::search_segment_with_count_sync(
                    segment.as_ref(),
                    query,
                    fetch_limit,
                )?;
                for r in &mut results {
                    r.segment_id = sid;
                }
                Ok((results, segment_seen))
            })
            .collect();

        let mut total_seen: u32 = 0;
        let mut sorted_batches: Vec<Vec<crate::query::SearchResult>> =
            Vec::with_capacity(batches.len());
        for result in batches {
            let (batch, segment_seen) = result?;
            total_seen = total_seen.saturating_add(segment_seen);
            if !batch.is_empty() {
                sorted_batches.push(batch);
            }
        }

        let results = merge_segment_results(sorted_batches, fetch_limit, offset);
        Ok((results, total_seen))
    }

    /// Two-stage search: L1 retrieval + L2 dense vector reranking
    ///
    /// Runs the query to get `l1_limit` candidates, then reranks by exact
    /// dense vector distance and returns the top `final_limit` results.
    pub async fn search_and_rerank(
        &self,
        query: &dyn crate::query::Query,
        l1_limit: usize,
        final_limit: usize,
        config: &crate::query::RerankerConfig,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        let (candidates, total_seen) = self.search_with_count(query, l1_limit).await?;
        let reranked = crate::query::rerank(self, &candidates, config, final_limit).await?;
        Ok((reranked, total_seen))
    }

    /// Parse query string and search (convenience method)
    pub async fn query(
        &self,
        query_str: &str,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.query_offset(query_str, limit, 0).await
    }

    /// Parse query string and search with offset (convenience method)
    pub async fn query_offset(
        &self,
        query_str: &str,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let parser = self.query_parser();
        let query = parser
            .parse(query_str)
            .map_err(crate::error::Error::Query)?;

        let (results, _total_seen) = self
            .search_internal(query.as_ref(), limit, offset, false)
            .await?;

        // NOTE(review): total_hits is the size of the returned page (≤ limit),
        // not the scored-document count (`_total_seen` is discarded) — confirm
        // this is the intended SearchResponse contract.
        let total_hits = results.len() as u32;
        let hits: Vec<crate::query::SearchHit> = results
            .into_iter()
            .map(|result| crate::query::SearchHit {
                address: crate::query::DocAddress::new(result.segment_id, result.doc_id),
                score: result.score,
                matched_fields: result.extract_ordinals(),
            })
            .collect();

        Ok(crate::query::SearchResponse { hits, total_hits })
    }

    /// Get query parser for this searcher
    ///
    /// Builds a router-aware parser when the schema declares query routers
    /// and they compile; otherwise falls back to the plain parser.
    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
        let query_routers = self.schema.query_routers();
        if !query_routers.is_empty()
            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
        {
            return crate::dsl::QueryLanguageParser::with_router(
                Arc::clone(&self.schema),
                self.default_fields.clone(),
                Arc::clone(&self.tokenizers),
                router,
            );
        }

        crate::dsl::QueryLanguageParser::new(
            Arc::clone(&self.schema),
            self.default_fields.clone(),
            Arc::clone(&self.tokenizers),
        )
    }

    /// Get a document by address (segment_id + local doc_id)
    pub async fn get_document(
        &self,
        address: &crate::query::DocAddress,
    ) -> Result<Option<crate::dsl::Document>> {
        self.get_document_with_fields(address, None).await
    }

    /// Get a document by address, hydrating only the specified field IDs.
    ///
    /// If `fields` is `None`, all fields are hydrated (including dense vectors).
    /// If `fields` is `Some(set)`, only dense vector fields in the set are read
    /// from flat storage — skipping expensive mmap reads for unrequested vectors.
    ///
    /// Returns `Ok(None)` when the address points at an unknown segment.
    pub async fn get_document_with_fields(
        &self,
        address: &crate::query::DocAddress,
        fields: Option<&rustc_hash::FxHashSet<u32>>,
    ) -> Result<Option<crate::dsl::Document>> {
        let segment_id = address.segment_id_u128().ok_or_else(|| {
            crate::error::Error::Query(format!("Invalid segment ID: {}", address.segment_id()))
        })?;

        if let Some(&idx) = self.segment_map.get(&segment_id) {
            return self.segments[idx]
                .doc_with_fields(address.doc_id, fields)
                .await;
        }

        Ok(None)
    }
}
743
744/// K-way merge of pre-sorted segment result batches.
745///
746/// Each batch is sorted by score descending. Uses a max-heap of
747/// (score, batch_idx, position) to merge in O(N log K).
748fn merge_segment_results(
749    sorted_batches: Vec<Vec<crate::query::SearchResult>>,
750    fetch_limit: usize,
751    offset: usize,
752) -> Vec<crate::query::SearchResult> {
753    use std::cmp::Ordering;
754
755    struct MergeEntry {
756        score: f32,
757        batch_idx: usize,
758        pos: usize,
759    }
760    impl PartialEq for MergeEntry {
761        fn eq(&self, other: &Self) -> bool {
762            self.score == other.score
763        }
764    }
765    impl Eq for MergeEntry {}
766    impl PartialOrd for MergeEntry {
767        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
768            Some(self.cmp(other))
769        }
770    }
771    impl Ord for MergeEntry {
772        fn cmp(&self, other: &Self) -> Ordering {
773            self.score
774                .partial_cmp(&other.score)
775                .unwrap_or(Ordering::Equal)
776        }
777    }
778
779    let mut heap = std::collections::BinaryHeap::with_capacity(sorted_batches.len());
780    for (i, batch) in sorted_batches.iter().enumerate() {
781        if !batch.is_empty() {
782            heap.push(MergeEntry {
783                score: batch[0].score,
784                batch_idx: i,
785                pos: 0,
786            });
787        }
788    }
789
790    let mut results = Vec::with_capacity(fetch_limit.min(64));
791    let mut emitted = 0usize;
792    while let Some(entry) = heap.pop() {
793        if emitted >= fetch_limit {
794            break;
795        }
796        let batch = &sorted_batches[entry.batch_idx];
797        if emitted >= offset {
798            results.push(batch[entry.pos].clone());
799        }
800        emitted += 1;
801        let next_pos = entry.pos + 1;
802        if next_pos < batch.len() {
803            heap.push(MergeEntry {
804                score: batch[next_pos].score,
805                batch_idx: entry.batch_idx,
806                pos: next_pos,
807            });
808        }
809    }
810
811    results
812}
813
/// Get current process RSS in MB (best-effort, returns 0.0 on failure)
fn process_rss_mb() -> f64 {
    #[cfg(target_os = "linux")]
    {
        // /proc/self/status reports resident set size on the `VmRSS:` line, in kB.
        let Ok(status) = std::fs::read_to_string("/proc/self/status") else {
            return 0.0;
        };
        status
            .lines()
            .find_map(|line| line.strip_prefix("VmRSS:"))
            .map(|rest| {
                let kb: f64 = rest
                    .trim()
                    .trim_end_matches("kB")
                    .trim()
                    .parse()
                    .unwrap_or(0.0);
                kb / 1024.0
            })
            .unwrap_or(0.0)
    }
    #[cfg(target_os = "macos")]
    {
        // Query resident_size via mach task_info (MACH_TASK_BASIC_INFO flavor).
        use std::mem;
        #[repr(C)]
        struct TaskBasicInfo {
            virtual_size: u64,
            resident_size: u64,
            resident_size_max: u64,
            user_time: [u32; 2],
            system_time: [u32; 2],
            policy: i32,
            suspend_count: i32,
        }
        unsafe extern "C" {
            fn mach_task_self() -> u32;
            fn task_info(task: u32, flavor: u32, info: *mut TaskBasicInfo, count: *mut u32) -> i32;
        }
        const MACH_TASK_BASIC_INFO: u32 = 20;
        let mut basic_info: TaskBasicInfo = unsafe { mem::zeroed() };
        // Count is expressed in 32-bit words of the output struct.
        let mut out_count = (mem::size_of::<TaskBasicInfo>() / mem::size_of::<u32>()) as u32;
        // SAFETY: basic_info and out_count are valid for writes and sized to match
        // the MACH_TASK_BASIC_INFO flavor; mach_task_self is always callable.
        let status = unsafe {
            task_info(
                mach_task_self(),
                MACH_TASK_BASIC_INFO,
                &mut basic_info,
                &mut out_count,
            )
        };
        if status == 0 {
            basic_info.resident_size as f64 / (1024.0 * 1024.0)
        } else {
            0.0
        }
    }
    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    {
        0.0
    }
}