Skip to main content

hermes_core/query/
scoring.rs

1//! Shared scoring abstractions for text and sparse vector search
2//!
3//! Provides common types and executors for efficient top-k retrieval:
4//! - `TermCursor`: Unified cursor for both BM25 text and sparse vector posting lists
5//! - `ScoreCollector`: Efficient min-heap for maintaining top-k results
6//! - `MaxScoreExecutor`: Unified Block-Max MaxScore with conjunction optimization
7//! - `ScoredDoc`: Result type with doc_id, score, and ordinal
8
9use std::cmp::Ordering;
10use std::collections::BinaryHeap;
11
12use log::debug;
13
14use crate::DocId;
15
16/// Entry for top-k min-heap
17#[derive(Clone, Copy)]
18pub struct HeapEntry {
19    pub doc_id: DocId,
20    pub score: f32,
21    pub ordinal: u16,
22}
23
24impl PartialEq for HeapEntry {
25    fn eq(&self, other: &Self) -> bool {
26        self.score == other.score && self.doc_id == other.doc_id
27    }
28}
29
30impl Eq for HeapEntry {}
31
32impl Ord for HeapEntry {
33    fn cmp(&self, other: &Self) -> Ordering {
34        // Min-heap: lower scores come first (to be evicted)
35        other
36            .score
37            .partial_cmp(&self.score)
38            .unwrap_or(Ordering::Equal)
39            .then_with(|| self.doc_id.cmp(&other.doc_id))
40    }
41}
42
43impl PartialOrd for HeapEntry {
44    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
45        Some(self.cmp(other))
46    }
47}
48
49/// Efficient top-k collector using min-heap (internal, scoring-layer)
50///
51/// Maintains the k highest-scoring documents using a min-heap where the
52/// lowest score is at the top for O(1) threshold lookup and O(log k) eviction.
53/// No deduplication — caller must ensure each doc_id is inserted only once.
54///
55/// This is intentionally separate from `TopKCollector` in `collector.rs`:
56/// `ScoreCollector` is used inside `MaxScoreExecutor` where only `(doc_id,
57/// score, ordinal)` tuples exist — no `Scorer` trait, no position tracking,
58/// and the threshold must be inlined for tight block-max loops.
59/// `TopKCollector` wraps a `Scorer` and drives the full `DocSet`/`Scorer`
60/// protocol, collecting positions on demand.
61pub struct ScoreCollector {
62    /// Min-heap of top-k entries (lowest score at top for eviction)
63    heap: BinaryHeap<HeapEntry>,
64    pub k: usize,
65    /// Cached threshold: avoids repeated heap.peek() in hot loops.
66    /// Updated only when the heap changes (insert/pop).
67    cached_threshold: f32,
68}
69
70impl ScoreCollector {
71    /// Create a new collector for top-k results
72    pub fn new(k: usize) -> Self {
73        // Cap capacity to avoid allocation overflow for very large k
74        let capacity = k.saturating_add(1).min(1_000_000);
75        Self {
76            heap: BinaryHeap::with_capacity(capacity),
77            k,
78            cached_threshold: 0.0,
79        }
80    }
81
82    /// Current score threshold (minimum score to enter top-k)
83    #[inline]
84    pub fn threshold(&self) -> f32 {
85        self.cached_threshold
86    }
87
88    /// Recompute cached threshold from heap state
89    #[inline]
90    fn update_threshold(&mut self) {
91        self.cached_threshold = if self.heap.len() >= self.k {
92            self.heap.peek().map(|e| e.score).unwrap_or(0.0)
93        } else {
94            0.0
95        };
96    }
97
98    /// Insert a document score. Returns true if inserted in top-k.
99    /// Caller must ensure each doc_id is inserted only once.
100    #[inline]
101    pub fn insert(&mut self, doc_id: DocId, score: f32) -> bool {
102        self.insert_with_ordinal(doc_id, score, 0)
103    }
104
105    /// Insert a document score with ordinal. Returns true if inserted in top-k.
106    /// Caller must ensure each doc_id is inserted only once.
107    #[inline]
108    pub fn insert_with_ordinal(&mut self, doc_id: DocId, score: f32, ordinal: u16) -> bool {
109        if self.heap.len() < self.k {
110            self.heap.push(HeapEntry {
111                doc_id,
112                score,
113                ordinal,
114            });
115            self.update_threshold();
116            true
117        } else if score > self.cached_threshold {
118            self.heap.push(HeapEntry {
119                doc_id,
120                score,
121                ordinal,
122            });
123            self.heap.pop(); // Remove lowest
124            self.update_threshold();
125            true
126        } else {
127            false
128        }
129    }
130
131    /// Check if a score could potentially enter top-k
132    #[inline]
133    pub fn would_enter(&self, score: f32) -> bool {
134        self.heap.len() < self.k || score > self.cached_threshold
135    }
136
137    /// Get number of documents collected so far
138    #[inline]
139    pub fn len(&self) -> usize {
140        self.heap.len()
141    }
142
143    /// Check if collector is empty
144    #[inline]
145    pub fn is_empty(&self) -> bool {
146        self.heap.is_empty()
147    }
148
149    /// Convert to sorted top-k results (descending by score)
150    pub fn into_sorted_results(self) -> Vec<(DocId, f32, u16)> {
151        let heap_vec = self.heap.into_vec();
152        let mut results: Vec<(DocId, f32, u16)> = Vec::with_capacity(heap_vec.len());
153        for e in heap_vec {
154            results.push((e.doc_id, e.score, e.ordinal));
155        }
156
157        // Sort by score descending, then doc_id ascending
158        results.sort_by(|a, b| {
159            b.1.partial_cmp(&a.1)
160                .unwrap_or(Ordering::Equal)
161                .then_with(|| a.0.cmp(&b.0))
162        });
163
164        results
165    }
166}
167
168/// Search result from MaxScore execution
169#[derive(Debug, Clone, Copy)]
170pub struct ScoredDoc {
171    pub doc_id: DocId,
172    pub score: f32,
173    /// Ordinal for multi-valued fields (which vector in the field matched)
174    pub ordinal: u16,
175}
176
177/// Unified Block-Max MaxScore executor for top-k retrieval
178///
179/// Works with both full-text (BM25) and sparse vector (dot product) queries
180/// through the polymorphic `TermCursor`. Combines three optimizations:
181/// 1. **MaxScore partitioning** (Turtle & Flood 1995): terms split into essential
182///    (must check) and non-essential (only scored if candidate is promising)
183/// 2. **Block-max pruning** (Ding & Suel 2011): skip blocks where per-block
184///    upper bounds can't beat the current threshold
185/// 3. **Conjunction optimization** (Lucene/Grand 2023): progressively intersect
186///    essential terms as threshold rises, skipping docs that lack enough terms
187pub struct MaxScoreExecutor<'a> {
188    cursors: Vec<TermCursor<'a>>,
189    prefix_sums: Vec<f32>,
190    collector: ScoreCollector,
191    heap_factor: f32,
192    predicate: Option<super::DocPredicate<'a>>,
193}
194
195/// Unified term cursor for Block-Max MaxScore execution.
196///
197/// All per-position decode buffers (`doc_ids`, `scores`, `ordinals`) live in
198/// the struct directly and are filled by `ensure_block_loaded`.
199///
200/// Skip-list metadata is **not** materialized — it is read lazily from the
201/// underlying source (`BlockPostingList` for text, `SparseIndex` for sparse),
202/// both backed by zero-copy mmap'd `OwnedBytes`.
203pub(crate) struct TermCursor<'a> {
204    pub max_score: f32,
205    num_blocks: usize,
206    // ── Per-position state (filled by ensure_block_loaded) ──────────
207    block_idx: usize,
208    doc_ids: Vec<u32>,
209    scores: Vec<f32>,
210    ordinals: Vec<u16>,
211    pos: usize,
212    block_loaded: bool,
213    exhausted: bool,
214    // ── Block decode + skip access source ───────────────────────────
215    variant: CursorVariant<'a>,
216}
217
218enum CursorVariant<'a> {
219    /// Full-text BM25 — in-memory BlockPostingList (skip list + block data)
220    Text {
221        list: crate::structures::BlockPostingList,
222        idf: f32,
223        avg_field_len: f32,
224        tfs: Vec<u32>, // temp decode buffer, converted to scores
225    },
226    /// Sparse vector — mmap'd SparseIndex (skip entries + block data)
227    Sparse {
228        si: &'a crate::segment::SparseIndex,
229        query_weight: f32,
230        skip_start: usize,
231        block_data_offset: u64,
232    },
233}
234
235// ── TermCursor async/sync macros ──────────────────────────────────────────
236//
237// Parameterised on:
238//   $load_block_fn – load_block_direct | load_block_direct_sync  (sparse I/O)
239//   $ensure_fn     – ensure_block_loaded | ensure_block_loaded_sync
240//   $($aw)*        – .await  (present for async, absent for sync)
241
// Body of `ensure_block_loaded{,_sync}`: make sure the block at `block_idx`
// is decoded into the cursor's buffers. Evaluates to `Ok(true)` when a block
// is available, `Ok(false)` when the cursor is (or just became) exhausted.
macro_rules! cursor_ensure_block {
    ($self:ident, $load_block_fn:ident, $($aw:tt)*) => {{
        // Already finished, or the current block is already decoded.
        if $self.exhausted {
            return Ok(false);
        }
        if $self.block_loaded {
            return Ok(true);
        }

        let loaded = match &mut $self.variant {
            CursorVariant::Text {
                list,
                idf,
                avg_field_len,
                tfs,
            } => {
                let decoded = list.decode_block_into($self.block_idx, &mut $self.doc_ids, tfs);
                if decoded {
                    // Turn raw term frequencies into BM25 scores.
                    // NOTE(review): `tf` is passed as both the 1st and 3rd
                    // argument of `bm25_score` — confirm this is intended.
                    $self.scores.clear();
                    $self.scores.extend(tfs.iter().map(|&raw_tf| {
                        let tf = raw_tf as f32;
                        super::bm25_score(tf, *idf, tf, *avg_field_len)
                    }));
                }
                decoded
            }
            CursorVariant::Sparse {
                si,
                query_weight,
                skip_start,
                block_data_offset,
                ..
            } => {
                match si
                    .$load_block_fn(*skip_start, *block_data_offset, $self.block_idx)
                    $($aw)* ?
                {
                    Some(block) => {
                        block.decode_doc_ids_into(&mut $self.doc_ids);
                        block.decode_ordinals_into(&mut $self.ordinals);
                        block.decode_scored_weights_into(*query_weight, &mut $self.scores);
                        true
                    }
                    None => false,
                }
            }
        };

        if loaded {
            $self.pos = 0;
            $self.block_loaded = true;
        } else {
            $self.exhausted = true;
        }
        Ok(loaded)
    }};
}
298
// Body of `advance{,_sync}`: step to the next posting and yield its doc id,
// or `u32::MAX` once the cursor is exhausted.
macro_rules! cursor_advance {
    ($self:ident, $ensure_fn:ident, $($aw:tt)*) => {{
        // The current block must be decoded before the position can move.
        if !$self.exhausted {
            $self.$ensure_fn() $($aw)* ?;
        }
        // Loading may itself have discovered the end of the list.
        if $self.exhausted {
            Ok(u32::MAX)
        } else {
            Ok($self.advance_pos())
        }
    }};
}
311
// Body of `seek{,_sync}`: position the cursor via `seek_prepare` /
// `seek_finish` for `$target` and yield the resulting doc id.
macro_rules! cursor_seek {
    ($self:ident, $ensure_fn:ident, $target:expr, $($aw:tt)*) => {{
        match $self.seek_prepare($target) {
            // Resolved without decoding another block.
            Some(doc) => Ok(doc),
            None => {
                // Decode the block chosen by `seek_prepare`, then position
                // inside it.
                $self.$ensure_fn() $($aw)* ?;
                let moved_to_next_block = $self.seek_finish($target);
                if moved_to_next_block {
                    $self.$ensure_fn() $($aw)* ?;
                }
                Ok($self.doc())
            }
        }
    }};
}
324
325impl<'a> TermCursor<'a> {
326    /// Create a full-text BM25 cursor (lazy — no blocks decoded yet).
327    pub fn text(
328        posting_list: crate::structures::BlockPostingList,
329        idf: f32,
330        avg_field_len: f32,
331    ) -> Self {
332        let max_tf = posting_list.max_tf() as f32;
333        let max_score = super::bm25_upper_bound(max_tf.max(1.0), idf);
334        let num_blocks = posting_list.num_blocks();
335        Self {
336            max_score,
337            num_blocks,
338            block_idx: 0,
339            doc_ids: Vec::with_capacity(128),
340            scores: Vec::with_capacity(128),
341            ordinals: Vec::new(),
342            pos: 0,
343            block_loaded: false,
344            exhausted: num_blocks == 0,
345            variant: CursorVariant::Text {
346                list: posting_list,
347                idf,
348                avg_field_len,
349                tfs: Vec::with_capacity(128),
350            },
351        }
352    }
353
354    /// Create a sparse vector cursor with lazy block loading.
355    /// Skip entries are **not** copied — they are read from `SparseIndex` mmap on demand.
356    pub fn sparse(
357        si: &'a crate::segment::SparseIndex,
358        query_weight: f32,
359        skip_start: usize,
360        skip_count: usize,
361        global_max_weight: f32,
362        block_data_offset: u64,
363    ) -> Self {
364        Self {
365            max_score: query_weight.abs() * global_max_weight,
366            num_blocks: skip_count,
367            block_idx: 0,
368            doc_ids: Vec::with_capacity(256),
369            scores: Vec::with_capacity(256),
370            ordinals: Vec::with_capacity(256),
371            pos: 0,
372            block_loaded: false,
373            exhausted: skip_count == 0,
374            variant: CursorVariant::Sparse {
375                si,
376                query_weight,
377                skip_start,
378                block_data_offset,
379            },
380        }
381    }
382
383    // ── Skip-entry access (lazy, zero-copy for sparse) ──────────────────
384
385    #[inline]
386    fn block_first_doc(&self, idx: usize) -> DocId {
387        match &self.variant {
388            CursorVariant::Text { list, .. } => list.block_first_doc(idx).unwrap_or(u32::MAX),
389            CursorVariant::Sparse { si, skip_start, .. } => {
390                si.read_skip_entry(*skip_start + idx).first_doc
391            }
392        }
393    }
394
395    #[inline]
396    fn block_last_doc(&self, idx: usize) -> DocId {
397        match &self.variant {
398            CursorVariant::Text { list, .. } => list.block_last_doc(idx).unwrap_or(0),
399            CursorVariant::Sparse { si, skip_start, .. } => {
400                si.read_skip_entry(*skip_start + idx).last_doc
401            }
402        }
403    }
404
405    // ── Read-only accessors ─────────────────────────────────────────────
406
407    #[inline]
408    pub fn doc(&self) -> DocId {
409        if self.exhausted {
410            return u32::MAX;
411        }
412        if self.block_loaded {
413            self.doc_ids.get(self.pos).copied().unwrap_or(u32::MAX)
414        } else {
415            self.block_first_doc(self.block_idx)
416        }
417    }
418
419    #[inline]
420    pub fn ordinal(&self) -> u16 {
421        if !self.block_loaded || self.ordinals.is_empty() {
422            return 0;
423        }
424        self.ordinals.get(self.pos).copied().unwrap_or(0)
425    }
426
427    #[inline]
428    pub fn score(&self) -> f32 {
429        if !self.block_loaded {
430            return 0.0;
431        }
432        self.scores.get(self.pos).copied().unwrap_or(0.0)
433    }
434
435    #[inline]
436    pub fn current_block_max_score(&self) -> f32 {
437        if self.exhausted {
438            return 0.0;
439        }
440        match &self.variant {
441            CursorVariant::Text { list, idf, .. } => {
442                let block_max_tf = list.block_max_tf(self.block_idx).unwrap_or(0) as f32;
443                super::bm25_upper_bound(block_max_tf.max(1.0), *idf)
444            }
445            CursorVariant::Sparse {
446                si,
447                query_weight,
448                skip_start,
449                ..
450            } => query_weight.abs() * si.read_skip_entry(*skip_start + self.block_idx).max_weight,
451        }
452    }
453
454    // ── Block navigation ────────────────────────────────────────────────
455
456    pub fn skip_to_next_block(&mut self) -> DocId {
457        if self.exhausted {
458            return u32::MAX;
459        }
460        self.block_idx += 1;
461        self.block_loaded = false;
462        if self.block_idx >= self.num_blocks {
463            self.exhausted = true;
464            return u32::MAX;
465        }
466        self.block_first_doc(self.block_idx)
467    }
468
469    #[inline]
470    fn advance_pos(&mut self) -> DocId {
471        self.pos += 1;
472        if self.pos >= self.doc_ids.len() {
473            self.block_idx += 1;
474            self.block_loaded = false;
475            if self.block_idx >= self.num_blocks {
476                self.exhausted = true;
477                return u32::MAX;
478            }
479        }
480        self.doc()
481    }
482
483    // ── Block loading / advance / seek ─────────────────────────────────
484    //
485    // Macros parameterised on sparse I/O method + optional .await to
486    // stamp out both async and sync variants without duplication.
487
488    pub async fn ensure_block_loaded(&mut self) -> crate::Result<bool> {
489        cursor_ensure_block!(self, load_block_direct, .await)
490    }
491
492    pub fn ensure_block_loaded_sync(&mut self) -> crate::Result<bool> {
493        cursor_ensure_block!(self, load_block_direct_sync,)
494    }
495
496    pub async fn advance(&mut self) -> crate::Result<DocId> {
497        cursor_advance!(self, ensure_block_loaded, .await)
498    }
499
500    pub fn advance_sync(&mut self) -> crate::Result<DocId> {
501        cursor_advance!(self, ensure_block_loaded_sync,)
502    }
503
504    pub async fn seek(&mut self, target: DocId) -> crate::Result<DocId> {
505        cursor_seek!(self, ensure_block_loaded, target, .await)
506    }
507
508    pub fn seek_sync(&mut self, target: DocId) -> crate::Result<DocId> {
509        cursor_seek!(self, ensure_block_loaded_sync, target,)
510    }
511
512    fn seek_prepare(&mut self, target: DocId) -> Option<DocId> {
513        if self.exhausted {
514            return Some(u32::MAX);
515        }
516
517        // Fast path: target is within the currently loaded block
518        if self.block_loaded
519            && let Some(&last) = self.doc_ids.last()
520        {
521            if last >= target && self.doc_ids[self.pos] < target {
522                let remaining = &self.doc_ids[self.pos..];
523                self.pos += crate::structures::simd::find_first_ge_u32(remaining, target);
524                if self.pos >= self.doc_ids.len() {
525                    self.block_idx += 1;
526                    self.block_loaded = false;
527                    if self.block_idx >= self.num_blocks {
528                        self.exhausted = true;
529                        return Some(u32::MAX);
530                    }
531                }
532                return Some(self.doc());
533            }
534            if self.doc_ids[self.pos] >= target {
535                return Some(self.doc());
536            }
537        }
538
539        // Seek to the block containing target
540        let lo = match &self.variant {
541            // Text: SIMD-accelerated 2-level seek (L1 + L0)
542            CursorVariant::Text { list, .. } => match list.seek_block(target, self.block_idx) {
543                Some(idx) => idx,
544                None => {
545                    self.exhausted = true;
546                    return Some(u32::MAX);
547                }
548            },
549            // Sparse: binary search on skip entries (lazy mmap reads)
550            CursorVariant::Sparse { .. } => {
551                let mut lo = self.block_idx;
552                let mut hi = self.num_blocks;
553                while lo < hi {
554                    let mid = lo + (hi - lo) / 2;
555                    if self.block_last_doc(mid) < target {
556                        lo = mid + 1;
557                    } else {
558                        hi = mid;
559                    }
560                }
561                lo
562            }
563        };
564        if lo >= self.num_blocks {
565            self.exhausted = true;
566            return Some(u32::MAX);
567        }
568        if lo != self.block_idx || !self.block_loaded {
569            self.block_idx = lo;
570            self.block_loaded = false;
571        }
572        None
573    }
574
575    #[inline]
576    fn seek_finish(&mut self, target: DocId) -> bool {
577        if self.exhausted {
578            return false;
579        }
580        self.pos = crate::structures::simd::find_first_ge_u32(&self.doc_ids, target);
581        if self.pos >= self.doc_ids.len() {
582            self.block_idx += 1;
583            self.block_loaded = false;
584            if self.block_idx >= self.num_blocks {
585                self.exhausted = true;
586                return false;
587            }
588            return true;
589        }
590        false
591    }
592}
593
/// Macro to stamp out the Block-Max MaxScore loop for both async and sync paths.
///
/// `$ensure`, `$advance`, `$seek` are cursor method idents (async or _sync variants).
/// `$($aw:tt)*` captures `.await` for async or nothing for sync.
///
/// Per iteration the loop:
/// 1. recomputes the essential / non-essential partition from the threshold,
/// 2. picks the smallest doc id (`min_doc`) among the essential cursors,
/// 3. tries to discard it cheaply (conjunction bound, block-max bound,
///    predicate), and otherwise
/// 4. scores it — essential cursors first, non-essential ones only while the
///    candidate can still beat the threshold — and offers one entry per
///    ordinal to the collector.
///
/// Evaluates to `Ok(Vec<ScoredDoc>)` sorted by the collector.
macro_rules! bms_execute_loop {
    ($self:ident, $ensure:ident, $advance:ident, $seek:ident, $($aw:tt)*) => {{
        let n = $self.cursors.len();

        // Load first block for each cursor (ensures doc() returns real values)
        for cursor in &mut $self.cursors {
            cursor.$ensure() $($aw)* ?;
        }

        let mut docs_scored = 0u64;
        let mut docs_skipped = 0u64;
        let mut blocks_skipped = 0u64;
        let mut conjunction_skipped = 0u64;
        // Reused across iterations: (ordinal, partial score) pairs for `min_doc`.
        let mut ordinal_scores: Vec<(u16, f32)> = Vec::with_capacity(n * 2);

        loop {
            // Cursors [0, partition) are non-essential, [partition, n) essential.
            let partition = $self.find_partition();
            if partition >= n {
                break;
            }

            // Find minimum doc_id across essential cursors
            let mut min_doc = u32::MAX;
            for i in partition..n {
                let doc = $self.cursors[i].doc();
                if doc < min_doc {
                    min_doc = doc;
                }
            }
            if min_doc == u32::MAX {
                break;
            }

            let non_essential_upper = if partition > 0 {
                $self.prefix_sums[partition - 1]
            } else {
                0.0
            };
            // Small epsilon to guard against FP rounding in score accumulation.
            // Without this, a document whose true score equals the threshold can
            // be incorrectly pruned due to rounding in the heap_factor multiply
            // or in the prefix_sum additions.
            let adjusted_threshold = $self.collector.threshold() * $self.heap_factor - 1e-6;

            // --- Conjunction optimization ---
            // Only the essential cursors sitting on `min_doc` can contribute to
            // it; if their global maxima cannot beat the threshold, drop the doc.
            if $self.collector.len() >= $self.collector.k {
                let present_upper: f32 = (partition..n)
                    .filter(|&i| $self.cursors[i].doc() == min_doc)
                    .map(|i| $self.cursors[i].max_score)
                    .sum();

                if present_upper + non_essential_upper <= adjusted_threshold {
                    for i in partition..n {
                        if $self.cursors[i].doc() == min_doc {
                            $self.cursors[i].$ensure() $($aw)* ?;
                            $self.cursors[i].$advance() $($aw)* ?;
                        }
                    }
                    conjunction_skipped += 1;
                    continue;
                }
            }

            // --- Block-max pruning ---
            if $self.collector.len() >= $self.collector.k {
                let block_max_sum: f32 = (partition..n)
                    .filter(|&i| $self.cursors[i].doc() == min_doc)
                    .map(|i| $self.cursors[i].current_block_max_score())
                    .sum();

                if block_max_sum + non_essential_upper <= adjusted_threshold {
                    // The bound just computed only covers documents that
                    //   (a) lie inside the *current* block of every cursor
                    //       positioned on `min_doc`, and
                    //   (b) come before the current doc of every other
                    //       essential cursor (which would add its own score).
                    // `resume_at` is the first doc id that breaks (a) or (b).
                    // Skipping whole blocks unconditionally would drop this
                    // term's contribution from later documents that can still
                    // qualify through another essential cursor.
                    let mut resume_at = u32::MAX;
                    for i in partition..n {
                        let cursor = &$self.cursors[i];
                        let doc = cursor.doc();
                        let limit = if doc == min_doc {
                            cursor.block_last_doc(cursor.block_idx).saturating_add(1)
                        } else {
                            doc
                        };
                        resume_at = resume_at.min(limit);
                    }
                    // Always move past `min_doc` so the loop makes progress
                    // even if skip metadata is missing or inconsistent.
                    let resume_at = resume_at.max(min_doc.saturating_add(1));
                    for i in partition..n {
                        if $self.cursors[i].doc() == min_doc {
                            $self.cursors[i].$seek(resume_at) $($aw)* ?;
                        }
                    }
                    blocks_skipped += 1;
                    continue;
                }
            }

            // --- Predicate filter (after block-max, before scoring) ---
            if let Some(ref pred) = $self.predicate {
                if !pred(min_doc) {
                    for i in partition..n {
                        if $self.cursors[i].doc() == min_doc {
                            $self.cursors[i].$ensure() $($aw)* ?;
                            $self.cursors[i].$advance() $($aw)* ?;
                        }
                    }
                    continue;
                }
            }

            // --- Score essential cursors ---
            // A cursor may hold several postings for `min_doc` (one per
            // ordinal); consume all of them.
            ordinal_scores.clear();
            for i in partition..n {
                if $self.cursors[i].doc() == min_doc {
                    $self.cursors[i].$ensure() $($aw)* ?;
                    while $self.cursors[i].doc() == min_doc {
                        ordinal_scores.push(($self.cursors[i].ordinal(), $self.cursors[i].score()));
                        $self.cursors[i].$advance() $($aw)* ?;
                    }
                }
            }

            let essential_total: f32 = ordinal_scores.iter().map(|(_, s)| *s).sum();
            if $self.collector.len() >= $self.collector.k
                && essential_total + non_essential_upper <= adjusted_threshold
            {
                docs_skipped += 1;
                continue;
            }

            // --- Score non-essential cursors (highest max_score first for early exit) ---
            let mut running_total = essential_total;
            for i in (0..partition).rev() {
                // prefix_sums[i] bounds everything cursors[0..=i] can still add.
                if $self.collector.len() >= $self.collector.k
                    && running_total + $self.prefix_sums[i] <= adjusted_threshold
                {
                    break;
                }

                let doc = $self.cursors[i].$seek(min_doc) $($aw)* ?;
                if doc == min_doc {
                    while $self.cursors[i].doc() == min_doc {
                        let s = $self.cursors[i].score();
                        running_total += s;
                        ordinal_scores.push(($self.cursors[i].ordinal(), s));
                        $self.cursors[i].$advance() $($aw)* ?;
                    }
                }
            }

            // --- Group by ordinal and insert ---
            // Fast path: single entry (common for single-valued fields) — skip sort + grouping
            if ordinal_scores.len() == 1 {
                let (ord, score) = ordinal_scores[0];
                if $self.collector.insert_with_ordinal(min_doc, score, ord) {
                    docs_scored += 1;
                } else {
                    docs_skipped += 1;
                }
            } else if !ordinal_scores.is_empty() {
                if ordinal_scores.len() > 2 {
                    ordinal_scores.sort_unstable_by_key(|(ord, _)| *ord);
                } else if ordinal_scores.len() == 2 && ordinal_scores[0].0 > ordinal_scores[1].0 {
                    ordinal_scores.swap(0, 1);
                }
                // Entries are now ordered by ordinal: sum each run and offer
                // one (doc, ordinal) candidate per run.
                let mut j = 0;
                while j < ordinal_scores.len() {
                    let current_ord = ordinal_scores[j].0;
                    let mut score = 0.0f32;
                    while j < ordinal_scores.len() && ordinal_scores[j].0 == current_ord {
                        score += ordinal_scores[j].1;
                        j += 1;
                    }
                    if $self
                        .collector
                        .insert_with_ordinal(min_doc, score, current_ord)
                    {
                        docs_scored += 1;
                    } else {
                        docs_skipped += 1;
                    }
                }
            }
        }

        let results: Vec<ScoredDoc> = $self
            .collector
            .into_sorted_results()
            .into_iter()
            .map(|(doc_id, score, ordinal)| ScoredDoc {
                doc_id,
                score,
                ordinal,
            })
            .collect();

        debug!(
            "MaxScoreExecutor: scored={}, skipped={}, blocks_skipped={}, conjunction_skipped={}, returned={}, top_score={:.4}",
            docs_scored,
            docs_skipped,
            blocks_skipped,
            conjunction_skipped,
            results.len(),
            results.first().map(|r| r.score).unwrap_or(0.0)
        );

        Ok(results)
    }};
}
792
793impl<'a> MaxScoreExecutor<'a> {
794    /// Create a new executor from pre-built cursors.
795    ///
796    /// Cursors are sorted by max_score ascending (non-essential first) and
797    /// prefix sums are computed for the MaxScore partitioning.
798    pub(crate) fn new(mut cursors: Vec<TermCursor<'a>>, k: usize, heap_factor: f32) -> Self {
799        // Sort by max_score ascending (non-essential first)
800        cursors.sort_by(|a, b| {
801            a.max_score
802                .partial_cmp(&b.max_score)
803                .unwrap_or(Ordering::Equal)
804        });
805
806        let mut prefix_sums = Vec::with_capacity(cursors.len());
807        let mut cumsum = 0.0f32;
808        for c in &cursors {
809            cumsum += c.max_score;
810            prefix_sums.push(cumsum);
811        }
812
813        debug!(
814            "Creating MaxScoreExecutor: num_cursors={}, k={}, total_upper={:.4}, heap_factor={:.2}",
815            cursors.len(),
816            k,
817            cumsum,
818            heap_factor
819        );
820
821        Self {
822            cursors,
823            prefix_sums,
824            collector: ScoreCollector::new(k),
825            heap_factor: heap_factor.clamp(0.0, 1.0),
826            predicate: None,
827        }
828    }
829
830    /// Create an executor for sparse vector queries.
831    ///
832    /// Builds `TermCursor::Sparse` for each matched dimension.
833    pub fn sparse(
834        sparse_index: &'a crate::segment::SparseIndex,
835        query_terms: Vec<(u32, f32)>,
836        k: usize,
837        heap_factor: f32,
838    ) -> Self {
839        let cursors: Vec<TermCursor<'a>> = query_terms
840            .iter()
841            .filter_map(|&(dim_id, qw)| {
842                let (skip_start, skip_count, global_max, block_data_offset) =
843                    sparse_index.get_skip_range_full(dim_id)?;
844                Some(TermCursor::sparse(
845                    sparse_index,
846                    qw,
847                    skip_start,
848                    skip_count,
849                    global_max,
850                    block_data_offset,
851                ))
852            })
853            .collect();
854        Self::new(cursors, k, heap_factor)
855    }
856
857    /// Create an executor for full-text BM25 queries.
858    ///
859    /// Builds `TermCursor::Text` for each posting list.
860    pub fn text(
861        posting_lists: Vec<(crate::structures::BlockPostingList, f32)>,
862        avg_field_len: f32,
863        k: usize,
864    ) -> Self {
865        let cursors: Vec<TermCursor<'a>> = posting_lists
866            .into_iter()
867            .map(|(pl, idf)| TermCursor::text(pl, idf, avg_field_len))
868            .collect();
869        Self::new(cursors, k, 1.0)
870    }
871
872    #[inline]
873    fn find_partition(&self) -> usize {
874        let threshold = self.collector.threshold() * self.heap_factor;
875        self.prefix_sums.partition_point(|&sum| sum <= threshold)
876    }
877
878    /// Attach a per-doc predicate filter to this executor.
879    ///
880    /// Docs failing the predicate are skipped after block-max pruning but
881    /// before scoring. The predicate does not affect thresholds or block-max
882    /// comparisons — the heap stores pure sparse/text scores.
883    pub fn with_predicate(mut self, predicate: super::DocPredicate<'a>) -> Self {
884        self.predicate = Some(predicate);
885        self
886    }
887
888    /// Execute Block-Max MaxScore and return top-k results (async).
889    pub async fn execute(mut self) -> crate::Result<Vec<ScoredDoc>> {
890        if self.cursors.is_empty() {
891            return Ok(Vec::new());
892        }
893        bms_execute_loop!(self, ensure_block_loaded, advance, seek, .await)
894    }
895
896    /// Synchronous execution — works when all cursors are text or mmap-backed sparse.
897    pub fn execute_sync(mut self) -> crate::Result<Vec<ScoredDoc>> {
898        if self.cursors.is_empty() {
899            return Ok(Vec::new());
900        }
901        bms_execute_loop!(self, ensure_block_loaded_sync, advance_sync, seek_sync,)
902    }
903}
904
#[cfg(test)]
mod tests {
    use super::*;

    /// Overflowing the collector evicts the lowest score and raises the
    /// threshold; results come back best-first.
    #[test]
    fn test_score_collector_basic() {
        let mut collector = ScoreCollector::new(3);

        for (doc, score) in [(1, 1.0), (2, 2.0), (3, 3.0)] {
            collector.insert(doc, score);
        }
        assert_eq!(collector.threshold(), 1.0);

        collector.insert(4, 4.0);
        assert_eq!(collector.threshold(), 2.0);

        let ranked: Vec<_> = collector
            .into_sorted_results()
            .into_iter()
            .map(|(doc, _, _)| doc)
            .collect();
        // Highest score first; doc 1 was evicted.
        assert_eq!(ranked, vec![4, 3, 2]);
    }

    /// `would_enter` and `insert` agree with the threshold of a full heap.
    #[test]
    fn test_score_collector_threshold() {
        let mut collector = ScoreCollector::new(2);

        collector.insert(1, 5.0);
        collector.insert(2, 3.0);
        assert_eq!(collector.threshold(), 3.0);

        // Below the threshold: rejected.
        let too_low = 2.0;
        assert!(!collector.would_enter(too_low));
        assert!(!collector.insert(3, too_low));

        // Above the threshold: accepted, and the threshold moves up.
        let high_enough = 4.0;
        assert!(collector.would_enter(high_enough));
        assert!(collector.insert(4, high_enough));
        assert_eq!(collector.threshold(), 4.0);
    }

    /// `HeapEntry`'s reversed ordering turns `BinaryHeap` into a min-heap.
    #[test]
    fn test_heap_entry_ordering() {
        let mut heap = BinaryHeap::new();
        for (doc_id, score) in [(1, 3.0), (2, 1.0), (3, 2.0)] {
            heap.push(HeapEntry {
                doc_id,
                score,
                ordinal: 0,
            });
        }

        // Min-heap: lowest score should come out first
        for expected in [1.0, 2.0, 3.0] {
            assert_eq!(heap.pop().unwrap().score, expected);
        }
    }
}