hermes_core/index/searcher.rs

//! Searcher - read-only search over pre-built segments
//!
//! This module provides `Searcher` for read-only search access to indexes.
//! It can be used standalone (for wasm/read-only) or via `IndexReader` (for native).

use std::sync::Arc;

use rustc_hash::FxHashMap;

use crate::directories::Directory;
use crate::dsl::Schema;
use crate::error::Result;
use crate::query::LazyGlobalStats;
use crate::segment::{SegmentId, SegmentReader};
#[cfg(feature = "native")]
use crate::segment::{SegmentSnapshot, SegmentTracker};
use crate::structures::CoarseCentroids;

/// Searcher - provides search over loaded segments
///
/// For wasm/read-only use, create via `Searcher::open()`.
/// For native use with Index, this is created via `IndexReader`.
pub struct Searcher<D: Directory + 'static> {
    /// Segment snapshot holding refs - prevents deletion during native use
    #[cfg(feature = "native")]
    _snapshot: SegmentSnapshot,
    /// PhantomData for the directory generic
    _phantom: std::marker::PhantomData<D>,
    /// Loaded segment readers
    segments: Vec<Arc<SegmentReader>>,
    /// Schema
    schema: Arc<Schema>,
    /// Default fields for search
    default_fields: Vec<crate::Field>,
    /// Tokenizers
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    /// Trained centroids per field (injected into segment readers for IVF/ScaNN search)
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Lazy global statistics for cross-segment IDF computation
    global_stats: Arc<LazyGlobalStats>,
    /// O(1) segment lookup by segment_id
    segment_map: FxHashMap<u128, usize>,
    /// Total document count across all segments
    total_docs: u32,
}

impl<D: Directory + 'static> Searcher<D> {
    /// Create a Searcher directly from segment IDs
    ///
    /// This is a simpler initialization path that doesn't require SegmentManager.
    /// Use this for read-only access to pre-built indexes.
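    ///
    /// A minimal usage sketch; `directory`, `schema`, `segment_ids`, and the
    /// cache size are placeholders assumed to be provided by the caller:
    ///
    /// ```ignore
    /// let searcher = Searcher::open(directory, schema, &segment_ids, 256).await?;
    /// println!("{} docs in {} segments", searcher.num_docs(), searcher.num_segments());
    /// ```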
    pub async fn open(
        directory: Arc<D>,
        schema: Arc<Schema>,
        segment_ids: &[String],
        term_cache_blocks: usize,
    ) -> Result<Self> {
        Self::create(
            directory,
            schema,
            segment_ids,
            FxHashMap::default(),
            term_cache_blocks,
        )
        .await
    }

    /// Create from a snapshot (for native IndexReader use)
    #[cfg(feature = "native")]
    pub(crate) async fn from_snapshot(
        directory: Arc<D>,
        schema: Arc<Schema>,
        snapshot: SegmentSnapshot,
        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
    ) -> Result<Self> {
        let (segments, default_fields, global_stats, segment_map, total_docs) = Self::load_common(
            &directory,
            &schema,
            snapshot.segment_ids(),
            &trained_centroids,
            term_cache_blocks,
        )
        .await?;

        Ok(Self {
            _snapshot: snapshot,
            _phantom: std::marker::PhantomData,
            segments,
            schema,
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            trained_centroids,
            global_stats,
            segment_map,
            total_docs,
        })
    }

    /// Internal create method
    async fn create(
        directory: Arc<D>,
        schema: Arc<Schema>,
        segment_ids: &[String],
        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
    ) -> Result<Self> {
        let (segments, default_fields, global_stats, segment_map, total_docs) = Self::load_common(
            &directory,
            &schema,
            segment_ids,
            &trained_centroids,
            term_cache_blocks,
        )
        .await?;

        #[cfg(feature = "native")]
        let _snapshot = {
            let tracker = Arc::new(SegmentTracker::new());
            SegmentSnapshot::new(tracker, segment_ids.to_vec())
        };

        let _ = directory; // suppress unused warning on wasm
        Ok(Self {
            #[cfg(feature = "native")]
            _snapshot,
            _phantom: std::marker::PhantomData,
            segments,
            schema,
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            trained_centroids,
            global_stats,
            segment_map,
            total_docs,
        })
    }

    /// Common loading logic shared by create and from_snapshot
    async fn load_common(
        directory: &Arc<D>,
        schema: &Arc<Schema>,
        segment_ids: &[String],
        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
    ) -> Result<(
        Vec<Arc<SegmentReader>>,
        Vec<crate::Field>,
        Arc<LazyGlobalStats>,
        FxHashMap<u128, usize>,
        u32,
    )> {
        let segments = Self::load_segments(
            directory,
            schema,
            segment_ids,
            trained_centroids,
            term_cache_blocks,
        )
        .await?;
        let default_fields = Self::build_default_fields(schema);
        let global_stats = Arc::new(LazyGlobalStats::new(segments.clone()));
        let (segment_map, total_docs) = Self::build_lookup_tables(&segments);
        Ok((
            segments,
            default_fields,
            global_stats,
            segment_map,
            total_docs,
        ))
    }

    /// Load segment readers from IDs (parallel loading for performance)
    async fn load_segments(
        directory: &Arc<D>,
        schema: &Arc<Schema>,
        segment_ids: &[String],
        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
    ) -> Result<Vec<Arc<SegmentReader>>> {
        // Parse segment IDs and filter out invalid ones
        let valid_segments: Vec<(usize, SegmentId)> = segment_ids
            .iter()
            .enumerate()
            .filter_map(|(idx, id_str)| SegmentId::from_hex(id_str).map(|sid| (idx, sid)))
            .collect();

        // Load all segments in parallel
        let futures: Vec<_> = valid_segments
            .iter()
            .map(|(_, segment_id)| {
                let dir = Arc::clone(directory);
                let sch = Arc::clone(schema);
                let sid = *segment_id;
                async move { SegmentReader::open(dir.as_ref(), sid, sch, term_cache_blocks).await }
            })
            .collect();

        let results = futures::future::join_all(futures).await;

        // Collect results — fail fast if any segment fails to open
        let mut loaded: Vec<(usize, SegmentReader)> = Vec::with_capacity(valid_segments.len());
        for ((idx, sid), result) in valid_segments.into_iter().zip(results) {
            match result {
                Ok(reader) => loaded.push((idx, reader)),
                Err(e) => {
                    return Err(crate::error::Error::Internal(format!(
                        "Failed to open segment {:016x}: {:?}",
                        sid.0, e
                    )));
                }
            }
        }

        // Sort by original index to maintain deterministic ordering
        loaded.sort_by_key(|(idx, _)| *idx);

        let mut segments = Vec::with_capacity(loaded.len());
        for (_, mut reader) in loaded {
            // Inject per-field centroids into reader for IVF/ScaNN search
            if !trained_centroids.is_empty() {
                reader.set_coarse_centroids(trained_centroids.clone());
            }
            segments.push(Arc::new(reader));
        }

        // Log searcher loading summary with per-segment memory breakdown
        let total_docs: u64 = segments.iter().map(|s| s.meta().num_docs as u64).sum();
        let mut total_mem = 0usize;
        for seg in &segments {
            let stats = seg.memory_stats();
            let seg_total = stats.total_bytes();
            total_mem += seg_total;
            log::info!(
                "[searcher] segment {:016x}: docs={}, mem={:.2} MB \
                 (term_dict={:.2} MB, store={:.2} MB, sparse={:.2} MB, dense={:.2} MB, bloom={:.2} MB)",
                stats.segment_id,
                stats.num_docs,
                seg_total as f64 / (1024.0 * 1024.0),
                stats.term_dict_cache_bytes as f64 / (1024.0 * 1024.0),
                stats.store_cache_bytes as f64 / (1024.0 * 1024.0),
                stats.sparse_index_bytes as f64 / (1024.0 * 1024.0),
                stats.dense_index_bytes as f64 / (1024.0 * 1024.0),
                stats.bloom_filter_bytes as f64 / (1024.0 * 1024.0),
            );
        }
        // Log process RSS if available (helps diagnose OOM)
        let rss_mb = process_rss_mb();
        log::info!(
            "[searcher] loaded {} segments: total_docs={}, estimated_mem={:.2} MB, process_rss={:.1} MB",
            segments.len(),
            total_docs,
            total_mem as f64 / (1024.0 * 1024.0),
            rss_mb,
        );

        Ok(segments)
    }

    /// Build default fields from schema
    fn build_default_fields(schema: &Schema) -> Vec<crate::Field> {
        if !schema.default_fields().is_empty() {
            schema.default_fields().to_vec()
        } else {
            schema
                .fields()
                .filter(|(_, entry)| {
                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
                })
                .map(|(field, _)| field)
                .collect()
        }
    }

    /// Get the schema
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Get segment readers
    pub fn segment_readers(&self) -> &[Arc<SegmentReader>] {
        &self.segments
    }

    /// Get default fields for search
    pub fn default_fields(&self) -> &[crate::Field] {
        &self.default_fields
    }

    /// Get tokenizer registry
    pub fn tokenizers(&self) -> &crate::tokenizer::TokenizerRegistry {
        &self.tokenizers
    }

    /// Get trained centroids
    pub fn trained_centroids(&self) -> &FxHashMap<u32, Arc<CoarseCentroids>> {
        &self.trained_centroids
    }

    /// Get lazy global statistics for cross-segment IDF computation
    pub fn global_stats(&self) -> &Arc<LazyGlobalStats> {
        &self.global_stats
    }

    /// Build O(1) lookup tables from loaded segments
    fn build_lookup_tables(segments: &[Arc<SegmentReader>]) -> (FxHashMap<u128, usize>, u32) {
        let mut segment_map = FxHashMap::default();
        let mut total = 0u32;
        for (i, seg) in segments.iter().enumerate() {
            segment_map.insert(seg.meta().id, i);
            total = total.saturating_add(seg.meta().num_docs);
        }
        (segment_map, total)
    }

    /// Get total document count across all segments
    pub fn num_docs(&self) -> u32 {
        self.total_docs
    }

    /// Get O(1) segment_id → index map (used by reranker)
    pub fn segment_map(&self) -> &FxHashMap<u128, usize> {
        &self.segment_map
    }

    /// Get number of segments
    pub fn num_segments(&self) -> usize {
        self.segments.len()
    }

    /// Get a document by (segment_id, local_doc_id)
    pub async fn doc(&self, segment_id: u128, doc_id: u32) -> Result<Option<crate::dsl::Document>> {
        if let Some(&idx) = self.segment_map.get(&segment_id) {
            return self.segments[idx].doc(doc_id).await;
        }
        Ok(None)
    }

    /// Search across all segments and return aggregated results
    pub async fn search(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<Vec<crate::query::SearchResult>> {
        let (results, _) = self.search_with_count(query, limit).await?;
        Ok(results)
    }

    /// Search across all segments and return (results, total_seen)
    /// total_seen is the number of documents that were scored across all segments
    pub async fn search_with_count(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_with_offset_and_count(query, limit, 0).await
    }

    /// Search with offset for pagination
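    ///
    /// A small pagination sketch (the query and page size are illustrative):
    ///
    /// ```ignore
    /// // Page 3 with 10 results per page: skip the first 20 hits.
    /// let page = searcher.search_with_offset(&query, 10, 20).await?;
    /// ```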
    pub async fn search_with_offset(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<Vec<crate::query::SearchResult>> {
        let (results, _) = self
            .search_with_offset_and_count(query, limit, offset)
            .await?;
        Ok(results)
    }

    /// Search with offset and return (results, total_seen)
    pub async fn search_with_offset_and_count(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_internal(query, limit, offset, false).await
    }

    /// Search with positions (ordinal tracking) and return (results, total_seen)
    ///
    /// Use this when you need per-ordinal scores for multi-valued fields.
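    ///
    /// Sketch (the query is assumed to target a multi-valued field):
    ///
    /// ```ignore
    /// let (results, _seen) = searcher.search_with_positions(&query, 10).await?;
    /// for r in &results {
    ///     let ordinals = r.extract_ordinals(); // per-ordinal match information
    /// }
    /// ```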
    pub async fn search_with_positions(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_internal(query, limit, 0, true).await
    }

    /// Internal search implementation
    async fn search_internal(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
        collect_positions: bool,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        let fetch_limit = offset + limit;

        // Multi-segment: use rayon for true CPU parallelism (sync feature required).
        // Only works on multi-threaded tokio runtime (block_in_place panics on current_thread).
        #[cfg(feature = "sync")]
        if self.segments.len() > 1
            && tokio::runtime::Handle::current().runtime_flavor()
                == tokio::runtime::RuntimeFlavor::MultiThread
        {
            return self.search_internal_parallel(query, fetch_limit, offset, collect_positions);
        }

        // Single segment or no sync feature: use async path
        let futures: Vec<_> = self
            .segments
            .iter()
            .map(|segment| {
                let sid = segment.meta().id;
                async move {
                    let (mut results, segment_seen) = if collect_positions {
                        crate::query::search_segment_with_positions_and_count(
                            segment.as_ref(),
                            query,
                            fetch_limit,
                        )
                        .await?
                    } else {
                        crate::query::search_segment_with_count(
                            segment.as_ref(),
                            query,
                            fetch_limit,
                        )
                        .await?
                    };
                    // Stamp segment_id on each result
                    for r in &mut results {
                        r.segment_id = sid;
                    }
                    Ok::<_, crate::error::Error>((results, segment_seen))
                }
            })
            .collect();

        let batches = futures::future::try_join_all(futures).await?;
        let mut total_seen: u32 = 0;

        let mut sorted_batches: Vec<Vec<crate::query::SearchResult>> =
            Vec::with_capacity(batches.len());
        for (batch, segment_seen) in batches {
            total_seen = total_seen.saturating_add(segment_seen);
            if !batch.is_empty() {
                sorted_batches.push(batch);
            }
        }

        let results = merge_segment_results(sorted_batches, fetch_limit, offset);
        Ok((results, total_seen))
    }

    /// Multi-segment parallel search using rayon (CPU-bound scoring on thread pool).
    ///
    /// `block_in_place` tells tokio this worker is occupied so it can steal tasks.
    /// `rayon::par_iter` distributes segment scoring across the rayon thread pool.
    #[cfg(feature = "sync")]
    fn search_internal_parallel(
        &self,
        query: &dyn crate::query::Query,
        fetch_limit: usize,
        offset: usize,
        collect_positions: bool,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        use rayon::prelude::*;

        let batches: Vec<Result<(Vec<crate::query::SearchResult>, u32)>> =
            tokio::task::block_in_place(|| {
                self.segments
                    .par_iter()
                    .map(|segment| {
                        let sid = segment.meta().id;
                        let (mut results, segment_seen) = if collect_positions {
                            crate::query::search_segment_with_positions_and_count_sync(
                                segment.as_ref(),
                                query,
                                fetch_limit,
                            )?
                        } else {
                            crate::query::search_segment_with_count_sync(
                                segment.as_ref(),
                                query,
                                fetch_limit,
                            )?
                        };
                        for r in &mut results {
                            r.segment_id = sid;
                        }
                        Ok((results, segment_seen))
                    })
                    .collect()
            });

        let mut total_seen: u32 = 0;
        let mut sorted_batches: Vec<Vec<crate::query::SearchResult>> =
            Vec::with_capacity(batches.len());
        for result in batches {
            let (batch, segment_seen) = result?;
            total_seen = total_seen.saturating_add(segment_seen);
            if !batch.is_empty() {
                sorted_batches.push(batch);
            }
        }

        let results = merge_segment_results(sorted_batches, fetch_limit, offset);
        Ok((results, total_seen))
    }

    /// Synchronous search across all segments using rayon for parallelism.
    ///
    /// This is the async-free boundary — no tokio involvement from here down.
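    ///
    /// Sketch of calling the sync path outside of tokio (the query and limits
    /// are placeholders):
    ///
    /// ```ignore
    /// let (results, seen) = searcher.search_with_offset_and_count_sync(&query, 10, 0)?;
    /// ```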
    #[cfg(feature = "sync")]
    pub fn search_with_offset_and_count_sync(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        use rayon::prelude::*;

        let fetch_limit = offset + limit;

        let batches: Vec<Result<(Vec<crate::query::SearchResult>, u32)>> = self
            .segments
            .par_iter()
            .map(|segment| {
                let sid = segment.meta().id;
                let (mut results, segment_seen) = crate::query::search_segment_with_count_sync(
                    segment.as_ref(),
                    query,
                    fetch_limit,
                )?;
                for r in &mut results {
                    r.segment_id = sid;
                }
                Ok((results, segment_seen))
            })
            .collect();

        let mut total_seen: u32 = 0;
        let mut sorted_batches: Vec<Vec<crate::query::SearchResult>> =
            Vec::with_capacity(batches.len());
        for result in batches {
            let (batch, segment_seen) = result?;
            total_seen = total_seen.saturating_add(segment_seen);
            if !batch.is_empty() {
                sorted_batches.push(batch);
            }
        }

        let results = merge_segment_results(sorted_batches, fetch_limit, offset);
        Ok((results, total_seen))
    }

    /// Two-stage search: L1 retrieval + L2 dense vector reranking
    ///
    /// Runs the query to get `l1_limit` candidates, then reranks by exact
    /// dense vector distance and returns the top `final_limit` results.
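    ///
    /// Sketch (`config` is a `RerankerConfig` assumed to be built elsewhere;
    /// 200 and 10 are illustrative candidate and final limits):
    ///
    /// ```ignore
    /// let (hits, _seen) = searcher.search_and_rerank(&query, 200, 10, &config).await?;
    /// ```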
    pub async fn search_and_rerank(
        &self,
        query: &dyn crate::query::Query,
        l1_limit: usize,
        final_limit: usize,
        config: &crate::query::RerankerConfig,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        let (candidates, total_seen) = self.search_with_count(query, l1_limit).await?;
        let reranked = crate::query::rerank(self, &candidates, config, final_limit).await?;
        Ok((reranked, total_seen))
    }

    /// Parse query string and search (convenience method)
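    ///
    /// Sketch (the query string is illustrative; it is parsed with
    /// [`Self::query_parser`] against the default fields):
    ///
    /// ```ignore
    /// let response = searcher.query("rust search engine", 10).await?;
    /// for hit in &response.hits {
    ///     let doc = searcher.get_document(&hit.address).await?;
    /// }
    /// ```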
    pub async fn query(
        &self,
        query_str: &str,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.query_offset(query_str, limit, 0).await
    }

    /// Parse query string and search with offset (convenience method)
    pub async fn query_offset(
        &self,
        query_str: &str,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let parser = self.query_parser();
        let query = parser
            .parse(query_str)
            .map_err(crate::error::Error::Query)?;

        let (results, _total_seen) = self
            .search_internal(query.as_ref(), limit, offset, false)
            .await?;

        let total_hits = results.len() as u32;
        let hits: Vec<crate::query::SearchHit> = results
            .into_iter()
            .map(|result| crate::query::SearchHit {
                address: crate::query::DocAddress::new(result.segment_id, result.doc_id),
                score: result.score,
                matched_fields: result.extract_ordinals(),
            })
            .collect();

        Ok(crate::query::SearchResponse { hits, total_hits })
    }

    /// Get query parser for this searcher
    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
        let query_routers = self.schema.query_routers();
        if !query_routers.is_empty()
            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
        {
            return crate::dsl::QueryLanguageParser::with_router(
                Arc::clone(&self.schema),
                self.default_fields.clone(),
                Arc::clone(&self.tokenizers),
                router,
            );
        }

        crate::dsl::QueryLanguageParser::new(
            Arc::clone(&self.schema),
            self.default_fields.clone(),
            Arc::clone(&self.tokenizers),
        )
    }

    /// Get a document by address (segment_id + local doc_id)
    pub async fn get_document(
        &self,
        address: &crate::query::DocAddress,
    ) -> Result<Option<crate::dsl::Document>> {
        self.get_document_with_fields(address, None).await
    }

    /// Get a document by address, hydrating only the specified field IDs.
    ///
    /// If `fields` is `None`, all fields are hydrated (including dense vectors).
    /// If `fields` is `Some(set)`, only dense vector fields in the set are read
    /// from flat storage — skipping expensive mmap reads for unrequested vectors.
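    ///
    /// Sketch (field ID `3` is a placeholder for a dense vector field in the schema):
    ///
    /// ```ignore
    /// let mut wanted = rustc_hash::FxHashSet::default();
    /// wanted.insert(3u32);
    /// let doc = searcher.get_document_with_fields(&address, Some(&wanted)).await?;
    /// ```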
    pub async fn get_document_with_fields(
        &self,
        address: &crate::query::DocAddress,
        fields: Option<&rustc_hash::FxHashSet<u32>>,
    ) -> Result<Option<crate::dsl::Document>> {
        let segment_id = address.segment_id_u128().ok_or_else(|| {
            crate::error::Error::Query(format!("Invalid segment ID: {}", address.segment_id()))
        })?;

        if let Some(&idx) = self.segment_map.get(&segment_id) {
            return self.segments[idx]
                .doc_with_fields(address.doc_id, fields)
                .await;
        }

        Ok(None)
    }
}

/// K-way merge of pre-sorted segment result batches.
///
/// Each batch is sorted by score descending. Uses a max-heap of
/// (score, batch_idx, position) to merge in O(N log K).
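///
/// For example, merging batches scored `[9.0, 4.0]` and `[7.0, 6.0]` with
/// `fetch_limit = 3` and `offset = 1` yields the results scored `7.0, 6.0`
/// (the top hit is skipped by the offset).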
fn merge_segment_results(
    sorted_batches: Vec<Vec<crate::query::SearchResult>>,
    fetch_limit: usize,
    offset: usize,
) -> Vec<crate::query::SearchResult> {
    use std::cmp::Ordering;

    struct MergeEntry {
        score: f32,
        batch_idx: usize,
        pos: usize,
    }
    impl PartialEq for MergeEntry {
        fn eq(&self, other: &Self) -> bool {
            self.score == other.score
        }
    }
    impl Eq for MergeEntry {}
    impl PartialOrd for MergeEntry {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }
    impl Ord for MergeEntry {
        fn cmp(&self, other: &Self) -> Ordering {
            self.score
                .partial_cmp(&other.score)
                .unwrap_or(Ordering::Equal)
        }
    }

    let mut heap = std::collections::BinaryHeap::with_capacity(sorted_batches.len());
    for (i, batch) in sorted_batches.iter().enumerate() {
        if !batch.is_empty() {
            heap.push(MergeEntry {
                score: batch[0].score,
                batch_idx: i,
                pos: 0,
            });
        }
    }

    let mut results = Vec::with_capacity(fetch_limit.min(64));
    let mut emitted = 0usize;
    while let Some(entry) = heap.pop() {
        if emitted >= fetch_limit {
            break;
        }
        let batch = &sorted_batches[entry.batch_idx];
        if emitted >= offset {
            results.push(batch[entry.pos].clone());
        }
        emitted += 1;
        let next_pos = entry.pos + 1;
        if next_pos < batch.len() {
            heap.push(MergeEntry {
                score: batch[next_pos].score,
                batch_idx: entry.batch_idx,
                pos: next_pos,
            });
        }
    }

    results
}

/// Get current process RSS in MB (best-effort, returns 0.0 on failure)
fn process_rss_mb() -> f64 {
    #[cfg(target_os = "linux")]
    {
        // Read from /proc/self/status — VmRSS line
        if let Ok(status) = std::fs::read_to_string("/proc/self/status") {
            for line in status.lines() {
                if let Some(rest) = line.strip_prefix("VmRSS:") {
                    let kb: f64 = rest
                        .trim()
                        .trim_end_matches("kB")
                        .trim()
                        .parse()
                        .unwrap_or(0.0);
                    return kb / 1024.0;
                }
            }
        }
        0.0
    }
    #[cfg(target_os = "macos")]
    {
        // Use mach_task_self / task_info via raw syscall
        use std::mem;
        #[repr(C)]
        struct TaskBasicInfo {
            virtual_size: u64,
            resident_size: u64,
            resident_size_max: u64,
            user_time: [u32; 2],
            system_time: [u32; 2],
            policy: i32,
            suspend_count: i32,
        }
        unsafe extern "C" {
            fn mach_task_self() -> u32;
            fn task_info(task: u32, flavor: u32, info: *mut TaskBasicInfo, count: *mut u32) -> i32;
        }
        const MACH_TASK_BASIC_INFO: u32 = 20;
        let mut info: TaskBasicInfo = unsafe { mem::zeroed() };
        let mut count = (mem::size_of::<TaskBasicInfo>() / mem::size_of::<u32>()) as u32;
        let ret = unsafe {
            task_info(
                mach_task_self(),
                MACH_TASK_BASIC_INFO,
                &mut info,
                &mut count,
            )
        };
        if ret == 0 {
            info.resident_size as f64 / (1024.0 * 1024.0)
        } else {
            0.0
        }
    }
    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    {
        0.0
    }
}