// Source: hermes_core/query/scoring.rs

//! Shared scoring abstractions for text and sparse vector search
//!
//! Provides common types and executors for efficient top-k retrieval:
//! - `TermCursor`: Unified cursor for both BM25 text and sparse vector posting lists
//! - `ScoreCollector`: Efficient min-heap for maintaining top-k results
//! - `MaxScoreExecutor`: Unified Block-Max MaxScore with conjunction optimization
//! - `ScoredDoc`: Result type with doc_id, score, and ordinal

use std::cmp::Ordering;
use std::collections::BinaryHeap;

use log::debug;

use crate::DocId;

16/// Entry for top-k min-heap
17#[derive(Clone, Copy)]
18pub struct HeapEntry {
19    pub doc_id: DocId,
20    pub score: f32,
21    pub ordinal: u16,
22}
23
24impl PartialEq for HeapEntry {
25    fn eq(&self, other: &Self) -> bool {
26        self.score == other.score && self.doc_id == other.doc_id
27    }
28}
29
30impl Eq for HeapEntry {}
31
32impl Ord for HeapEntry {
33    fn cmp(&self, other: &Self) -> Ordering {
34        // Min-heap: lower scores come first (to be evicted)
35        other
36            .score
37            .partial_cmp(&self.score)
38            .unwrap_or(Ordering::Equal)
39            .then_with(|| self.doc_id.cmp(&other.doc_id))
40    }
41}
42
43impl PartialOrd for HeapEntry {
44    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
45        Some(self.cmp(other))
46    }
47}
48
49/// Efficient top-k collector using min-heap
50///
51/// Maintains the k highest-scoring documents using a min-heap where the
52/// lowest score is at the top for O(1) threshold lookup and O(log k) eviction.
53/// No deduplication - caller must ensure each doc_id is inserted only once.
54pub struct ScoreCollector {
55    /// Min-heap of top-k entries (lowest score at top for eviction)
56    heap: BinaryHeap<HeapEntry>,
57    pub k: usize,
58    /// Cached threshold: avoids repeated heap.peek() in hot loops.
59    /// Updated only when the heap changes (insert/pop).
60    cached_threshold: f32,
61}
62
63impl ScoreCollector {
64    /// Create a new collector for top-k results
65    pub fn new(k: usize) -> Self {
66        // Cap capacity to avoid allocation overflow for very large k
67        let capacity = k.saturating_add(1).min(1_000_000);
68        Self {
69            heap: BinaryHeap::with_capacity(capacity),
70            k,
71            cached_threshold: 0.0,
72        }
73    }
74
75    /// Current score threshold (minimum score to enter top-k)
76    #[inline]
77    pub fn threshold(&self) -> f32 {
78        self.cached_threshold
79    }
80
81    /// Recompute cached threshold from heap state
82    #[inline]
83    fn update_threshold(&mut self) {
84        self.cached_threshold = if self.heap.len() >= self.k {
85            self.heap.peek().map(|e| e.score).unwrap_or(0.0)
86        } else {
87            0.0
88        };
89    }
90
91    /// Insert a document score. Returns true if inserted in top-k.
92    /// Caller must ensure each doc_id is inserted only once.
93    #[inline]
94    pub fn insert(&mut self, doc_id: DocId, score: f32) -> bool {
95        self.insert_with_ordinal(doc_id, score, 0)
96    }
97
98    /// Insert a document score with ordinal. Returns true if inserted in top-k.
99    /// Caller must ensure each doc_id is inserted only once.
100    #[inline]
101    pub fn insert_with_ordinal(&mut self, doc_id: DocId, score: f32, ordinal: u16) -> bool {
102        if self.heap.len() < self.k {
103            self.heap.push(HeapEntry {
104                doc_id,
105                score,
106                ordinal,
107            });
108            self.update_threshold();
109            true
110        } else if score > self.cached_threshold {
111            self.heap.push(HeapEntry {
112                doc_id,
113                score,
114                ordinal,
115            });
116            self.heap.pop(); // Remove lowest
117            self.update_threshold();
118            true
119        } else {
120            false
121        }
122    }
123
124    /// Check if a score could potentially enter top-k
125    #[inline]
126    pub fn would_enter(&self, score: f32) -> bool {
127        self.heap.len() < self.k || score > self.cached_threshold
128    }
129
130    /// Get number of documents collected so far
131    #[inline]
132    pub fn len(&self) -> usize {
133        self.heap.len()
134    }
135
136    /// Check if collector is empty
137    #[inline]
138    pub fn is_empty(&self) -> bool {
139        self.heap.is_empty()
140    }
141
142    /// Convert to sorted top-k results (descending by score)
143    pub fn into_sorted_results(self) -> Vec<(DocId, f32, u16)> {
144        let heap_vec = self.heap.into_vec();
145        let mut results: Vec<(DocId, f32, u16)> = Vec::with_capacity(heap_vec.len());
146        for e in heap_vec {
147            results.push((e.doc_id, e.score, e.ordinal));
148        }
149
150        // Sort by score descending, then doc_id ascending
151        results.sort_by(|a, b| {
152            b.1.partial_cmp(&a.1)
153                .unwrap_or(Ordering::Equal)
154                .then_with(|| a.0.cmp(&b.0))
155        });
156
157        results
158    }
159}
160
161/// Search result from MaxScore execution
162#[derive(Debug, Clone, Copy)]
163pub struct ScoredDoc {
164    pub doc_id: DocId,
165    pub score: f32,
166    /// Ordinal for multi-valued fields (which vector in the field matched)
167    pub ordinal: u16,
168}
169
170/// Unified Block-Max MaxScore executor for top-k retrieval
171///
172/// Works with both full-text (BM25) and sparse vector (dot product) queries
173/// through the polymorphic `TermCursor`. Combines three optimizations:
174/// 1. **MaxScore partitioning** (Turtle & Flood 1995): terms split into essential
175///    (must check) and non-essential (only scored if candidate is promising)
176/// 2. **Block-max pruning** (Ding & Suel 2011): skip blocks where per-block
177///    upper bounds can't beat the current threshold
178/// 3. **Conjunction optimization** (Lucene/Grand 2023): progressively intersect
179///    essential terms as threshold rises, skipping docs that lack enough terms
180pub struct MaxScoreExecutor<'a> {
181    cursors: Vec<TermCursor<'a>>,
182    prefix_sums: Vec<f32>,
183    collector: ScoreCollector,
184    heap_factor: f32,
185}
186
187/// Unified term cursor for Block-Max MaxScore execution.
188///
189/// All per-position decode buffers (`doc_ids`, `scores`, `ordinals`) live in
190/// the struct directly and are filled by `ensure_block_loaded`.
191///
192/// Skip-list metadata is **not** materialized — it is read lazily from the
193/// underlying source (`BlockPostingList` for text, `SparseIndex` for sparse),
194/// both backed by zero-copy mmap'd `OwnedBytes`.
195pub(crate) struct TermCursor<'a> {
196    pub max_score: f32,
197    num_blocks: usize,
198    // ── Per-position state (filled by ensure_block_loaded) ──────────
199    block_idx: usize,
200    doc_ids: Vec<u32>,
201    scores: Vec<f32>,
202    ordinals: Vec<u16>,
203    pos: usize,
204    block_loaded: bool,
205    exhausted: bool,
206    // ── Block decode + skip access source ───────────────────────────
207    variant: CursorVariant<'a>,
208}
209
210enum CursorVariant<'a> {
211    /// Full-text BM25 — in-memory BlockPostingList (skip list + block data)
212    Text {
213        list: crate::structures::BlockPostingList,
214        idf: f32,
215        avg_field_len: f32,
216        tfs: Vec<u32>, // temp decode buffer, converted to scores
217    },
218    /// Sparse vector — mmap'd SparseIndex (skip entries + block data)
219    Sparse {
220        si: &'a crate::segment::SparseIndex,
221        query_weight: f32,
222        skip_start: usize,
223        block_data_offset: u64,
224    },
225}
226
227impl<'a> TermCursor<'a> {
228    /// Create a full-text BM25 cursor (lazy — no blocks decoded yet).
229    pub fn text(
230        posting_list: crate::structures::BlockPostingList,
231        idf: f32,
232        avg_field_len: f32,
233    ) -> Self {
234        let max_tf = posting_list.max_tf() as f32;
235        let max_score = super::bm25_upper_bound(max_tf.max(1.0), idf);
236        let num_blocks = posting_list.num_blocks();
237        Self {
238            max_score,
239            num_blocks,
240            block_idx: 0,
241            doc_ids: Vec::with_capacity(128),
242            scores: Vec::with_capacity(128),
243            ordinals: Vec::new(),
244            pos: 0,
245            block_loaded: false,
246            exhausted: num_blocks == 0,
247            variant: CursorVariant::Text {
248                list: posting_list,
249                idf,
250                avg_field_len,
251                tfs: Vec::with_capacity(128),
252            },
253        }
254    }
255
256    /// Create a sparse vector cursor with lazy block loading.
257    /// Skip entries are **not** copied — they are read from `SparseIndex` mmap on demand.
258    pub fn sparse(
259        si: &'a crate::segment::SparseIndex,
260        query_weight: f32,
261        skip_start: usize,
262        skip_count: usize,
263        global_max_weight: f32,
264        block_data_offset: u64,
265    ) -> Self {
266        Self {
267            max_score: query_weight.abs() * global_max_weight,
268            num_blocks: skip_count,
269            block_idx: 0,
270            doc_ids: Vec::with_capacity(256),
271            scores: Vec::with_capacity(256),
272            ordinals: Vec::with_capacity(256),
273            pos: 0,
274            block_loaded: false,
275            exhausted: skip_count == 0,
276            variant: CursorVariant::Sparse {
277                si,
278                query_weight,
279                skip_start,
280                block_data_offset,
281            },
282        }
283    }
284
285    // ── Skip-entry access (lazy, zero-copy for sparse) ──────────────────
286
287    #[inline]
288    fn block_first_doc(&self, idx: usize) -> DocId {
289        match &self.variant {
290            CursorVariant::Text { list, .. } => list.block_first_doc(idx).unwrap_or(u32::MAX),
291            CursorVariant::Sparse { si, skip_start, .. } => {
292                si.read_skip_entry(*skip_start + idx).first_doc
293            }
294        }
295    }
296
297    #[inline]
298    fn block_last_doc(&self, idx: usize) -> DocId {
299        match &self.variant {
300            CursorVariant::Text { list, .. } => list.block_last_doc(idx).unwrap_or(0),
301            CursorVariant::Sparse { si, skip_start, .. } => {
302                si.read_skip_entry(*skip_start + idx).last_doc
303            }
304        }
305    }
306
307    // ── Read-only accessors ─────────────────────────────────────────────
308
309    #[inline]
310    pub fn doc(&self) -> DocId {
311        if self.exhausted {
312            return u32::MAX;
313        }
314        if self.block_loaded {
315            self.doc_ids.get(self.pos).copied().unwrap_or(u32::MAX)
316        } else {
317            self.block_first_doc(self.block_idx)
318        }
319    }
320
321    #[inline]
322    pub fn ordinal(&self) -> u16 {
323        if !self.block_loaded || self.ordinals.is_empty() {
324            return 0;
325        }
326        self.ordinals.get(self.pos).copied().unwrap_or(0)
327    }
328
329    #[inline]
330    pub fn score(&self) -> f32 {
331        if !self.block_loaded {
332            return 0.0;
333        }
334        self.scores.get(self.pos).copied().unwrap_or(0.0)
335    }
336
337    #[inline]
338    pub fn current_block_max_score(&self) -> f32 {
339        if self.exhausted {
340            return 0.0;
341        }
342        match &self.variant {
343            CursorVariant::Text { list, idf, .. } => {
344                let block_max_tf = list.block_max_tf(self.block_idx).unwrap_or(0) as f32;
345                super::bm25_upper_bound(block_max_tf.max(1.0), *idf)
346            }
347            CursorVariant::Sparse {
348                si,
349                query_weight,
350                skip_start,
351                ..
352            } => query_weight.abs() * si.read_skip_entry(*skip_start + self.block_idx).max_weight,
353        }
354    }
355
356    // ── Block navigation ────────────────────────────────────────────────
357
358    pub fn skip_to_next_block(&mut self) -> DocId {
359        if self.exhausted {
360            return u32::MAX;
361        }
362        self.block_idx += 1;
363        self.block_loaded = false;
364        if self.block_idx >= self.num_blocks {
365            self.exhausted = true;
366            return u32::MAX;
367        }
368        self.block_first_doc(self.block_idx)
369    }
370
371    #[inline]
372    fn advance_pos(&mut self) -> DocId {
373        self.pos += 1;
374        if self.pos >= self.doc_ids.len() {
375            self.block_idx += 1;
376            self.block_loaded = false;
377            if self.block_idx >= self.num_blocks {
378                self.exhausted = true;
379                return u32::MAX;
380            }
381        }
382        self.doc()
383    }
384
385    // ── Block loading (dispatch: decode format + I/O differ) ────────────
386
387    pub async fn ensure_block_loaded(&mut self) -> crate::Result<bool> {
388        if self.exhausted || self.block_loaded {
389            return Ok(!self.exhausted);
390        }
391        match &mut self.variant {
392            CursorVariant::Text {
393                list,
394                idf,
395                avg_field_len,
396                tfs,
397            } => {
398                if list.decode_block_into(self.block_idx, &mut self.doc_ids, tfs) {
399                    self.scores.clear();
400                    self.scores.reserve(tfs.len());
401                    for &tf in tfs.iter() {
402                        let tf = tf as f32;
403                        self.scores
404                            .push(super::bm25_score(tf, *idf, tf, *avg_field_len));
405                    }
406                    self.pos = 0;
407                    self.block_loaded = true;
408                    Ok(true)
409                } else {
410                    self.exhausted = true;
411                    Ok(false)
412                }
413            }
414            CursorVariant::Sparse {
415                si,
416                query_weight,
417                skip_start,
418                block_data_offset,
419                ..
420            } => {
421                let block = si
422                    .load_block_direct(*skip_start, *block_data_offset, self.block_idx)
423                    .await?;
424                match block {
425                    Some(b) => {
426                        b.decode_doc_ids_into(&mut self.doc_ids);
427                        b.decode_ordinals_into(&mut self.ordinals);
428                        b.decode_scored_weights_into(*query_weight, &mut self.scores);
429                        self.pos = 0;
430                        self.block_loaded = true;
431                        Ok(true)
432                    }
433                    None => {
434                        self.exhausted = true;
435                        Ok(false)
436                    }
437                }
438            }
439        }
440    }
441
442    pub fn ensure_block_loaded_sync(&mut self) -> crate::Result<bool> {
443        if self.exhausted || self.block_loaded {
444            return Ok(!self.exhausted);
445        }
446        match &mut self.variant {
447            CursorVariant::Text {
448                list,
449                idf,
450                avg_field_len,
451                tfs,
452            } => {
453                if list.decode_block_into(self.block_idx, &mut self.doc_ids, tfs) {
454                    self.scores.clear();
455                    self.scores.reserve(tfs.len());
456                    for &tf in tfs.iter() {
457                        let tf = tf as f32;
458                        self.scores
459                            .push(super::bm25_score(tf, *idf, tf, *avg_field_len));
460                    }
461                    self.pos = 0;
462                    self.block_loaded = true;
463                    Ok(true)
464                } else {
465                    self.exhausted = true;
466                    Ok(false)
467                }
468            }
469            CursorVariant::Sparse {
470                si,
471                query_weight,
472                skip_start,
473                block_data_offset,
474                ..
475            } => {
476                let block =
477                    si.load_block_direct_sync(*skip_start, *block_data_offset, self.block_idx)?;
478                match block {
479                    Some(b) => {
480                        b.decode_doc_ids_into(&mut self.doc_ids);
481                        b.decode_ordinals_into(&mut self.ordinals);
482                        b.decode_scored_weights_into(*query_weight, &mut self.scores);
483                        self.pos = 0;
484                        self.block_loaded = true;
485                        Ok(true)
486                    }
487                    None => {
488                        self.exhausted = true;
489                        Ok(false)
490                    }
491                }
492            }
493        }
494    }
495
496    // ── Advance / Seek ──────────────────────────────────────────────────
497
498    pub async fn advance(&mut self) -> crate::Result<DocId> {
499        if self.exhausted {
500            return Ok(u32::MAX);
501        }
502        self.ensure_block_loaded().await?;
503        if self.exhausted {
504            return Ok(u32::MAX);
505        }
506        Ok(self.advance_pos())
507    }
508
509    pub fn advance_sync(&mut self) -> crate::Result<DocId> {
510        if self.exhausted {
511            return Ok(u32::MAX);
512        }
513        self.ensure_block_loaded_sync()?;
514        if self.exhausted {
515            return Ok(u32::MAX);
516        }
517        Ok(self.advance_pos())
518    }
519
520    pub async fn seek(&mut self, target: DocId) -> crate::Result<DocId> {
521        if let Some(doc) = self.seek_prepare(target) {
522            return Ok(doc);
523        }
524        self.ensure_block_loaded().await?;
525        if self.seek_finish(target) {
526            self.ensure_block_loaded().await?;
527        }
528        Ok(self.doc())
529    }
530
531    pub fn seek_sync(&mut self, target: DocId) -> crate::Result<DocId> {
532        if let Some(doc) = self.seek_prepare(target) {
533            return Ok(doc);
534        }
535        self.ensure_block_loaded_sync()?;
536        if self.seek_finish(target) {
537            self.ensure_block_loaded_sync()?;
538        }
539        Ok(self.doc())
540    }
541
542    fn seek_prepare(&mut self, target: DocId) -> Option<DocId> {
543        if self.exhausted {
544            return Some(u32::MAX);
545        }
546
547        // Fast path: target is within the currently loaded block
548        if self.block_loaded
549            && let Some(&last) = self.doc_ids.last()
550        {
551            if last >= target && self.doc_ids[self.pos] < target {
552                let remaining = &self.doc_ids[self.pos..];
553                self.pos += crate::structures::simd::find_first_ge_u32(remaining, target);
554                if self.pos >= self.doc_ids.len() {
555                    self.block_idx += 1;
556                    self.block_loaded = false;
557                    if self.block_idx >= self.num_blocks {
558                        self.exhausted = true;
559                        return Some(u32::MAX);
560                    }
561                }
562                return Some(self.doc());
563            }
564            if self.doc_ids[self.pos] >= target {
565                return Some(self.doc());
566            }
567        }
568
569        // Seek to the block containing target
570        let lo = match &self.variant {
571            // Text: SIMD-accelerated 2-level seek (L1 + L0)
572            CursorVariant::Text { list, .. } => match list.seek_block(target, self.block_idx) {
573                Some(idx) => idx,
574                None => {
575                    self.exhausted = true;
576                    return Some(u32::MAX);
577                }
578            },
579            // Sparse: binary search on skip entries (lazy mmap reads)
580            CursorVariant::Sparse { .. } => {
581                let mut lo = self.block_idx;
582                let mut hi = self.num_blocks;
583                while lo < hi {
584                    let mid = lo + (hi - lo) / 2;
585                    if self.block_last_doc(mid) < target {
586                        lo = mid + 1;
587                    } else {
588                        hi = mid;
589                    }
590                }
591                lo
592            }
593        };
594        if lo >= self.num_blocks {
595            self.exhausted = true;
596            return Some(u32::MAX);
597        }
598        if lo != self.block_idx || !self.block_loaded {
599            self.block_idx = lo;
600            self.block_loaded = false;
601        }
602        None
603    }
604
605    #[inline]
606    fn seek_finish(&mut self, target: DocId) -> bool {
607        if self.exhausted {
608            return false;
609        }
610        self.pos = crate::structures::simd::find_first_ge_u32(&self.doc_ids, target);
611        if self.pos >= self.doc_ids.len() {
612            self.block_idx += 1;
613            self.block_loaded = false;
614            if self.block_idx >= self.num_blocks {
615                self.exhausted = true;
616                return false;
617            }
618            return true;
619        }
620        false
621    }
622}
623
/// Macro to stamp out the Block-Max MaxScore loop for both async and sync paths.
///
/// - `$self`: the executor; its collector is consumed when results are built.
/// - `$ensure`, `$advance`, `$seek`: cursor method idents (async or `_sync` variants).
/// - `$($aw:tt)*`: captures `.await` for async or nothing for sync.
///
/// Evaluates to `Ok(Vec<ScoredDoc>)` in the order produced by
/// `ScoreCollector::into_sorted_results`.
macro_rules! bms_execute_loop {
    ($self:ident, $ensure:ident, $advance:ident, $seek:ident, $($aw:tt)*) => {{
        let n = $self.cursors.len();

        // Load first block for each cursor (ensures doc() returns real values)
        for cursor in &mut $self.cursors {
            cursor.$ensure() $($aw)* ?;
        }

        let mut docs_scored = 0u64;
        let mut docs_skipped = 0u64;
        let mut blocks_skipped = 0u64;
        let mut conjunction_skipped = 0u64;
        let mut ordinal_scores: Vec<(u16, f32)> = Vec::with_capacity(n * 2);

        loop {
            // Cursors [0, partition) are non-essential, [partition, n) essential.
            let partition = $self.find_partition();
            if partition >= n {
                break;
            }

            // Find minimum doc_id across essential cursors
            let mut min_doc = u32::MAX;
            for i in partition..n {
                min_doc = min_doc.min($self.cursors[i].doc());
            }
            if min_doc == u32::MAX {
                break;
            }

            let non_essential_upper = if partition > 0 {
                $self.prefix_sums[partition - 1]
            } else {
                0.0
            };
            let adjusted_threshold = $self.collector.threshold() * $self.heap_factor;
            // The collector is only modified at the very end of an iteration,
            // so this stays valid for all pruning checks below.
            let heap_full = $self.collector.len() >= $self.collector.k;

            if heap_full {
                // --- Conjunction optimization ---
                // Bound min_doc by the global maxima of the essential terms that
                // actually contain it.
                let present_upper: f32 = (partition..n)
                    .filter(|&i| $self.cursors[i].doc() == min_doc)
                    .map(|i| $self.cursors[i].max_score)
                    .sum();

                if present_upper + non_essential_upper <= adjusted_threshold {
                    for i in partition..n {
                        if $self.cursors[i].doc() == min_doc {
                            $self.cursors[i].$ensure() $($aw)* ?;
                            $self.cursors[i].$advance() $($aw)* ?;
                        }
                    }
                    conjunction_skipped += 1;
                    continue;
                }

                // --- Block-max pruning ---
                let block_max_sum: f32 = (partition..n)
                    .filter(|&i| $self.cursors[i].doc() == min_doc)
                    .map(|i| $self.cursors[i].current_block_max_score())
                    .sum();

                if block_max_sum + non_essential_upper <= adjusted_threshold {
                    // The bound only covers the cursors positioned on min_doc.
                    // It therefore holds for every doc that (a) still lies in
                    // the current block of those cursors and (b) precedes the
                    // current doc of every other essential cursor. Skipping
                    // further (e.g. dropping the whole block) could lose a doc
                    // that also occurs in another essential cursor.
                    let mut next_candidate = u32::MAX;
                    for i in partition..n {
                        let cursor = &$self.cursors[i];
                        let bound = if cursor.doc() == min_doc {
                            cursor.block_last_doc(cursor.block_idx).saturating_add(1)
                        } else {
                            cursor.doc()
                        };
                        next_candidate = next_candidate.min(bound);
                    }
                    // Guarantee forward progress even if skip data is inconsistent.
                    let next_candidate = next_candidate.max(min_doc + 1);

                    for i in partition..n {
                        if $self.cursors[i].doc() == min_doc {
                            $self.cursors[i].$seek(next_candidate) $($aw)* ?;
                            $self.cursors[i].$ensure() $($aw)* ?;
                        }
                    }
                    blocks_skipped += 1;
                    continue;
                }
            }

            // --- Score essential cursors ---
            ordinal_scores.clear();
            for i in partition..n {
                while $self.cursors[i].doc() == min_doc {
                    // `advance` may have rolled into a block that is not decoded
                    // yet; score()/ordinal() are only meaningful once it is.
                    if !$self.cursors[i].$ensure() $($aw)* ? {
                        break;
                    }
                    ordinal_scores.push(($self.cursors[i].ordinal(), $self.cursors[i].score()));
                    $self.cursors[i].$advance() $($aw)* ?;
                }
            }

            let essential_total: f32 = ordinal_scores.iter().map(|(_, s)| *s).sum();
            if heap_full && essential_total + non_essential_upper <= adjusted_threshold {
                docs_skipped += 1;
                continue;
            }

            // --- Score non-essential cursors (highest max_score first for early exit) ---
            let mut running_total = essential_total;
            for i in (0..partition).rev() {
                // prefix_sums[i] bounds everything cursors 0..=i can still add.
                if heap_full && running_total + $self.prefix_sums[i] <= adjusted_threshold {
                    break;
                }

                let doc = $self.cursors[i].$seek(min_doc) $($aw)* ?;
                if doc != min_doc {
                    continue;
                }
                while $self.cursors[i].doc() == min_doc {
                    // Same block-boundary precaution as for essential cursors.
                    if !$self.cursors[i].$ensure() $($aw)* ? {
                        break;
                    }
                    let s = $self.cursors[i].score();
                    running_total += s;
                    ordinal_scores.push(($self.cursors[i].ordinal(), s));
                    $self.cursors[i].$advance() $($aw)* ?;
                }
            }

            // --- Group by ordinal and insert ---
            match ordinal_scores.len() {
                0 => {}
                // Fast path: single entry (common for single-valued fields) — skip sort + grouping
                1 => {
                    let (ord, score) = ordinal_scores[0];
                    if $self.collector.insert_with_ordinal(min_doc, score, ord) {
                        docs_scored += 1;
                    } else {
                        docs_skipped += 1;
                    }
                }
                len => {
                    if len > 2 {
                        ordinal_scores.sort_unstable_by_key(|(ord, _)| *ord);
                    } else if ordinal_scores[0].0 > ordinal_scores[1].0 {
                        ordinal_scores.swap(0, 1);
                    }
                    // Entries are now ordered by ordinal: sum each run and insert it.
                    let mut j = 0;
                    while j < ordinal_scores.len() {
                        let current_ord = ordinal_scores[j].0;
                        let mut score = 0.0f32;
                        while j < ordinal_scores.len() && ordinal_scores[j].0 == current_ord {
                            score += ordinal_scores[j].1;
                            j += 1;
                        }
                        if $self
                            .collector
                            .insert_with_ordinal(min_doc, score, current_ord)
                        {
                            docs_scored += 1;
                        } else {
                            docs_skipped += 1;
                        }
                    }
                }
            }
        }

        let results: Vec<ScoredDoc> = $self
            .collector
            .into_sorted_results()
            .into_iter()
            .map(|(doc_id, score, ordinal)| ScoredDoc {
                doc_id,
                score,
                ordinal,
            })
            .collect();

        debug!(
            "MaxScoreExecutor: scored={}, skipped={}, blocks_skipped={}, conjunction_skipped={}, returned={}, top_score={:.4}",
            docs_scored,
            docs_skipped,
            blocks_skipped,
            conjunction_skipped,
            results.len(),
            results.first().map(|r| r.score).unwrap_or(0.0)
        );

        Ok(results)
    }};
}

806impl<'a> MaxScoreExecutor<'a> {
807    /// Create a new executor from pre-built cursors.
808    ///
809    /// Cursors are sorted by max_score ascending (non-essential first) and
810    /// prefix sums are computed for the MaxScore partitioning.
811    pub(crate) fn new(mut cursors: Vec<TermCursor<'a>>, k: usize, heap_factor: f32) -> Self {
812        // Sort by max_score ascending (non-essential first)
813        cursors.sort_by(|a, b| {
814            a.max_score
815                .partial_cmp(&b.max_score)
816                .unwrap_or(Ordering::Equal)
817        });
818
819        let mut prefix_sums = Vec::with_capacity(cursors.len());
820        let mut cumsum = 0.0f32;
821        for c in &cursors {
822            cumsum += c.max_score;
823            prefix_sums.push(cumsum);
824        }
825
826        debug!(
827            "Creating MaxScoreExecutor: num_cursors={}, k={}, total_upper={:.4}, heap_factor={:.2}",
828            cursors.len(),
829            k,
830            cumsum,
831            heap_factor
832        );
833
834        Self {
835            cursors,
836            prefix_sums,
837            collector: ScoreCollector::new(k),
838            heap_factor: heap_factor.clamp(0.0, 1.0),
839        }
840    }
841
842    /// Create an executor for sparse vector queries.
843    ///
844    /// Builds `TermCursor::Sparse` for each matched dimension.
845    pub fn sparse(
846        sparse_index: &'a crate::segment::SparseIndex,
847        query_terms: Vec<(u32, f32)>,
848        k: usize,
849        heap_factor: f32,
850    ) -> Self {
851        let cursors: Vec<TermCursor<'a>> = query_terms
852            .iter()
853            .filter_map(|&(dim_id, qw)| {
854                let (skip_start, skip_count, global_max, block_data_offset) =
855                    sparse_index.get_skip_range_full(dim_id)?;
856                Some(TermCursor::sparse(
857                    sparse_index,
858                    qw,
859                    skip_start,
860                    skip_count,
861                    global_max,
862                    block_data_offset,
863                ))
864            })
865            .collect();
866        Self::new(cursors, k, heap_factor)
867    }
868
869    /// Create an executor for full-text BM25 queries.
870    ///
871    /// Builds `TermCursor::Text` for each posting list.
872    pub fn text(
873        posting_lists: Vec<(crate::structures::BlockPostingList, f32)>,
874        avg_field_len: f32,
875        k: usize,
876    ) -> Self {
877        let cursors: Vec<TermCursor<'a>> = posting_lists
878            .into_iter()
879            .map(|(pl, idf)| TermCursor::text(pl, idf, avg_field_len))
880            .collect();
881        Self::new(cursors, k, 1.0)
882    }
883
884    #[inline]
885    fn find_partition(&self) -> usize {
886        let threshold = self.collector.threshold() * self.heap_factor;
887        self.prefix_sums.partition_point(|&sum| sum <= threshold)
888    }
889
890    /// Execute Block-Max MaxScore and return top-k results (async).
891    pub async fn execute(mut self) -> crate::Result<Vec<ScoredDoc>> {
892        if self.cursors.is_empty() {
893            return Ok(Vec::new());
894        }
895        bms_execute_loop!(self, ensure_block_loaded, advance, seek, .await)
896    }
897
898    /// Synchronous execution — works when all cursors are text or mmap-backed sparse.
899    pub fn execute_sync(mut self) -> crate::Result<Vec<ScoredDoc>> {
900        if self.cursors.is_empty() {
901            return Ok(Vec::new());
902        }
903        bms_execute_loop!(self, ensure_block_loaded_sync, advance_sync, seek_sync,)
904    }
905}
906
#[cfg(test)]
mod tests {
    use super::*;

    /// The collector keeps the k best scores and exposes the lowest as threshold.
    #[test]
    fn test_score_collector_basic() {
        let mut collector = ScoreCollector::new(3);

        for (doc_id, score) in [(1, 1.0), (2, 2.0), (3, 3.0)] {
            collector.insert(doc_id, score);
        }
        assert_eq!(collector.threshold(), 1.0);

        collector.insert(4, 4.0);
        assert_eq!(collector.threshold(), 2.0);

        // Highest score first.
        let ranked: Vec<DocId> = collector
            .into_sorted_results()
            .into_iter()
            .map(|(doc_id, _, _)| doc_id)
            .collect();
        assert_eq!(ranked, vec![4, 3, 2]);
    }

    /// Only scores strictly above the threshold enter a full collector.
    #[test]
    fn test_score_collector_threshold() {
        let mut collector = ScoreCollector::new(2);

        collector.insert(1, 5.0);
        collector.insert(2, 3.0);
        assert_eq!(collector.threshold(), 3.0);

        // Should not enter (score too low)
        assert!(!collector.would_enter(2.0));
        assert!(!collector.insert(3, 2.0));

        // Should enter (score high enough)
        assert!(collector.would_enter(4.0));
        assert!(collector.insert(4, 4.0));
        assert_eq!(collector.threshold(), 4.0);
    }

    /// `HeapEntry` turns `BinaryHeap` into a min-heap on score.
    #[test]
    fn test_heap_entry_ordering() {
        let mut heap = BinaryHeap::new();
        for (doc_id, score) in [(1, 3.0), (2, 1.0), (3, 2.0)] {
            heap.push(HeapEntry {
                doc_id,
                score,
                ordinal: 0,
            });
        }

        // Min-heap: lowest score should come out first
        for expected in [1.0, 2.0, 3.0] {
            assert_eq!(heap.pop().unwrap().score, expected);
        }
    }
}