Skip to main content

cjc_data/
adaptive_selection.rs

1//! Adaptive selection representations for TidyView.
2//!
3//! A `TidyView` describes a subset of rows from a base `DataFrame`. Until v2,
4//! the subset was stored as a single `BitMask` regardless of density. The
5//! adaptive engine picks one of five deterministic representations based on
6//! result density, so that sparse predicates do not pay the cost of a full
7//! bitscan and dense predicates retain the existing fast O(nrows/64) path.
8//!
9//! # Modes
10//!
11//! - `Empty` — no rows selected (zero allocation)
12//! - `All` — every row selected (zero allocation; nrows stored)
13//! - `SelectionVector` — ascending `Vec<u32>` of selected row indices (sparse)
//! - `VerbatimMask` — backing `BitMask` for the dense path (>30% density); also the fallback for mid-density frames too small for `Hybrid`
15//! - `Hybrid` — chunked, locally-classified per [`HYBRID_CHUNK_SIZE`]-row block.
16//!   Active for mid-density results when `nrows >= 2 * HYBRID_CHUNK_SIZE`.
17//!
18//! # Determinism
19//!
20//! - Iteration order is **always ascending row index** for every arm.
21//! - Density classification uses pure integer arithmetic; thresholds are
22//!   bit-stable across platforms.
23//! - `intersect`/`union` produce a single deterministic arm choice for any
24//!   pair of inputs.
25
26use crate::BitMask;
27
28// ── Constants ────────────────────────────────────────────────────────────────
29
/// Sparse threshold divisor: a selection is sparse when its hit count is
/// strictly less than `nrows / SPARSE_DIVISOR` (integer division).
const SPARSE_DIVISOR: usize = 1024;

/// Dense threshold numerator: a selection is dense when its hit count is
/// strictly greater than `nrows * DENSE_NUM / DENSE_DEN`. 3/10 = 30%.
const DENSE_NUM: usize = 3;
/// Dense threshold denominator; paired with `DENSE_NUM`.
const DENSE_DEN: usize = 10;

/// Hybrid chunk size in rows. 4096 rows = 64 u64 words per dense chunk = 512 B.
pub const HYBRID_CHUNK_SIZE: usize = 4096;

/// Words-per-chunk for the Dense variant. `HYBRID_CHUNK_SIZE / 64`.
/// Also the stride used to slice a verbatim mask's words chunk by chunk.
const HYBRID_WORDS_PER_CHUNK: usize = HYBRID_CHUNK_SIZE / 64;

/// Per-chunk sparse threshold: a chunk with < `HYBRID_CHUNK_SIZE / 32` set bits
/// uses `HybridChunk::Sparse`. 128 hits at 4096-row chunk size.
// NOTE(review): the per-chunk classifier that consumes this constant is not
// in this part of the file — confirm the strict `<` against it.
const HYBRID_CHUNK_SPARSE_THRESHOLD: usize = HYBRID_CHUNK_SIZE / 32;
47
48// ── HybridChunk ──────────────────────────────────────────────────────────────
49
/// Selection state for one block of rows inside a `Hybrid` selection.
///
/// A chunk nominally spans `HYBRID_CHUNK_SIZE` rows; the final chunk of a
/// frame may be shorter. The chunk does not store its own length — every
/// method receives the effective row count as `chunk_len`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HybridChunk {
    /// Nothing in this chunk is selected.
    Empty,
    /// Each of the chunk's `chunk_len` rows is selected.
    All,
    /// Chunk-local offsets of the selected rows, ascending, each in
    /// `[0, chunk_len)`. Membership tests binary-search this list.
    Sparse(Vec<u16>),
    /// Word-packed bitmap of length `HYBRID_WORDS_PER_CHUNK`: offset `off`
    /// lives at bit `off % 64` of word `off / 64`. Bits at or beyond
    /// `chunk_len` are zero.
    Dense(Box<[u64]>),
}

impl HybridChunk {
    /// Number of selected rows in this chunk.
    ///
    /// `chunk_len` is only consulted for the `All` variant.
    pub fn count(&self, chunk_len: usize) -> usize {
        match self {
            Self::Empty => 0,
            Self::All => chunk_len,
            Self::Sparse(offsets) => offsets.len(),
            Self::Dense(bitmap) => bitmap
                .iter()
                .fold(0usize, |hits, word| hits + word.count_ones() as usize),
        }
    }

    /// Whether the chunk-local offset `off` is selected.
    ///
    /// Offsets at or past `chunk_len` are never selected.
    pub fn contains_local(&self, off: usize, chunk_len: usize) -> bool {
        off < chunk_len
            && match self {
                Self::Empty => false,
                Self::All => true,
                Self::Sparse(offsets) => offsets.binary_search(&(off as u16)).is_ok(),
                // Word index is off / 64, bit index is off % 64.
                Self::Dense(bitmap) => (bitmap[off >> 6] & (1u64 << (off & 63))) != 0,
            }
    }
}
96
97// ── AdaptiveSelection ────────────────────────────────────────────────────────
98
/// A row-selection representation chosen adaptively by density.
///
/// Every variant can report `nrows` (the size of the underlying DataFrame)
/// so that `count`/`contains`/iteration are answerable without consulting
/// the base. Four variants store it directly; `VerbatimMask` reads it from
/// the wrapped `BitMask`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AdaptiveSelection {
    /// No rows selected. `nrows` is the underlying frame size.
    Empty { nrows: usize },
    /// Every row selected. `nrows` is the underlying frame size.
    All { nrows: usize },
    /// Sparse: ascending u32 row indices (membership is a binary search).
    SelectionVector { rows: Vec<u32>, nrows: usize },
    /// Dense: backing BitMask. The frame size is `mask.nrows()`.
    VerbatimMask { mask: BitMask },
    /// Mid-density chunked representation. `chunks.len() == ceil(nrows / HYBRID_CHUNK_SIZE)`.
    /// Chunk `i` covers rows starting at `i * HYBRID_CHUNK_SIZE`.
    Hybrid { nrows: usize, chunks: Vec<HybridChunk> },
}
116
117impl AdaptiveSelection {
118    // ── Constructors ─────────────────────────────────────────────────────
119
120    /// All rows selected.
121    pub fn all(nrows: usize) -> Self {
122        AdaptiveSelection::All { nrows }
123    }
124
125    /// No rows selected.
126    pub fn empty(nrows: usize) -> Self {
127        AdaptiveSelection::Empty { nrows }
128    }
129
130    /// Wrap a `BitMask` directly (force the dense arm). Used by call sites
131    /// that already produced a mask through the existing pipeline and do
132    /// not want a re-classification.
133    pub fn from_bitmask(mask: BitMask) -> Self {
134        let nrows = mask.nrows();
135        let count = mask.count_ones();
136        if count == 0 {
137            AdaptiveSelection::Empty { nrows }
138        } else if count == nrows {
139            AdaptiveSelection::All { nrows }
140        } else {
141            AdaptiveSelection::VerbatimMask { mask }
142        }
143    }
144
145    /// Construct from raw predicate-result words (LSB-first within each u64).
146    ///
147    /// This is the canonical entry point from `try_eval_predicate_columnar`
148    /// and the row-wise filter fallback. The classifier:
149    ///
150    /// 1. Counts set bits across `words`.
151    /// 2. Picks `Empty`/`All`/`SelectionVector`/`VerbatimMask` per density.
152    /// 3. Mid-band selections become `Hybrid` when `nrows >= 2 * HYBRID_CHUNK_SIZE`.
153    ///
154    /// The caller is responsible for masking off tail bits past `nrows` before
155    /// invoking — `BitMask::all_true` / `BitMask::from_bools` and the
156    /// columnar predicate path already do this.
157    pub fn from_predicate_result(words: Vec<u64>, nrows: usize) -> Self {
158        let count: usize = words.iter().map(|w| w.count_ones() as usize).sum();
159        Self::classify(words, nrows, count)
160    }
161
162    /// Test-only and conversion helper — same as `from_predicate_result` but
163    /// caller already knows the count.
164    pub fn from_words_with_count(words: Vec<u64>, nrows: usize, count: usize) -> Self {
165        Self::classify(words, nrows, count)
166    }
167
168    fn classify(words: Vec<u64>, nrows: usize, count: usize) -> Self {
169        if count == 0 {
170            return AdaptiveSelection::Empty { nrows };
171        }
172        if count == nrows {
173            return AdaptiveSelection::All { nrows };
174        }
175        if Self::is_sparse(count, nrows) {
176            let rows = words_to_indices(&words);
177            return AdaptiveSelection::SelectionVector { rows, nrows };
178        }
179        let mask = bitmask_from_words(words, nrows);
180        if Self::is_dense(count, nrows) {
181            AdaptiveSelection::VerbatimMask { mask }
182        } else if Self::should_hybrid(nrows) {
183            // Mid-band: chunk it. Cheap one-pass classifier per chunk.
184            let chunks = chunks_from_mask(&mask);
185            AdaptiveSelection::Hybrid { nrows, chunks }
186        } else {
187            // Mid-band but the frame is too small to benefit from chunking.
188            AdaptiveSelection::VerbatimMask { mask }
189        }
190    }
191
192    /// `count < nrows / 1024`. Saturates safely for nrows < 1024 (then no
193    /// non-empty selection is sparse, which is correct: a hand-typed
194    /// 5-element df with 1 hit should not allocate a `Vec<u32>`).
195    #[inline]
196    fn is_sparse(count: usize, nrows: usize) -> bool {
197        count < nrows / SPARSE_DIVISOR
198    }
199
200    /// `count > nrows * 3/10`. Uses 128-bit intermediate to avoid overflow on
201    /// extreme nrows.
202    #[inline]
203    fn is_dense(count: usize, nrows: usize) -> bool {
204        let lhs = (count as u128) * (DENSE_DEN as u128);
205        let rhs = (nrows as u128) * (DENSE_NUM as u128);
206        lhs > rhs
207    }
208
209    /// Frames smaller than 2 chunks gain nothing from chunking — keep them
210    /// on `VerbatimMask`.
211    #[inline]
212    fn should_hybrid(nrows: usize) -> bool {
213        nrows >= 2 * HYBRID_CHUNK_SIZE
214    }
215
216    // ── SelectionRepr surface ────────────────────────────────────────────
217
218    /// Total number of rows in the underlying DataFrame.
219    pub fn nrows(&self) -> usize {
220        match self {
221            AdaptiveSelection::Empty { nrows } => *nrows,
222            AdaptiveSelection::All { nrows } => *nrows,
223            AdaptiveSelection::SelectionVector { nrows, .. } => *nrows,
224            AdaptiveSelection::VerbatimMask { mask } => mask.nrows(),
225            AdaptiveSelection::Hybrid { nrows, .. } => *nrows,
226        }
227    }
228
229    /// Number of rows actually selected.
230    pub fn count(&self) -> usize {
231        match self {
232            AdaptiveSelection::Empty { .. } => 0,
233            AdaptiveSelection::All { nrows } => *nrows,
234            AdaptiveSelection::SelectionVector { rows, .. } => rows.len(),
235            AdaptiveSelection::VerbatimMask { mask } => mask.count_ones(),
236            AdaptiveSelection::Hybrid { nrows, chunks } => {
237                let mut total = 0;
238                for (i, c) in chunks.iter().enumerate() {
239                    total += c.count(chunk_len_for(i, *nrows));
240                }
241                total
242            }
243        }
244    }
245
246    /// True if `row` is in the selection. Out-of-range rows return `false`.
247    pub fn contains(&self, row: usize) -> bool {
248        match self {
249            AdaptiveSelection::Empty { .. } => false,
250            AdaptiveSelection::All { nrows } => row < *nrows,
251            AdaptiveSelection::SelectionVector { rows, nrows } => {
252                if row >= *nrows {
253                    return false;
254                }
255                let target = row as u32;
256                rows.binary_search(&target).is_ok()
257            }
258            AdaptiveSelection::VerbatimMask { mask } => {
259                if row >= mask.nrows() {
260                    return false;
261                }
262                mask.get(row)
263            }
264            AdaptiveSelection::Hybrid { nrows, chunks } => {
265                if row >= *nrows {
266                    return false;
267                }
268                let ci = row / HYBRID_CHUNK_SIZE;
269                let off = row % HYBRID_CHUNK_SIZE;
270                chunks[ci].contains_local(off, chunk_len_for(ci, *nrows))
271            }
272        }
273    }
274
275    /// Iterate selected rows in ascending order.
276    pub fn iter_indices(&self) -> SelectionIndices<'_> {
277        match self {
278            AdaptiveSelection::Empty { .. } => SelectionIndices::Empty,
279            AdaptiveSelection::All { nrows } => SelectionIndices::Range(0..*nrows),
280            AdaptiveSelection::SelectionVector { rows, .. } => SelectionIndices::Vec(rows.iter()),
281            AdaptiveSelection::VerbatimMask { mask } => SelectionIndices::Mask {
282                mask,
283                next: 0,
284                nrows: mask.nrows(),
285            },
286            AdaptiveSelection::Hybrid { nrows, chunks } => SelectionIndices::Hybrid {
287                chunks,
288                nrows: *nrows,
289                ci: 0,
290                inner: HybridInner::Start,
291            },
292        }
293    }
294
295    /// Materialize the selection as a `BitMask`. Always allocates `O(nrows/64)`.
296    pub fn materialize_mask(&self) -> BitMask {
297        match self {
298            AdaptiveSelection::Empty { nrows } => BitMask::all_false(*nrows),
299            AdaptiveSelection::All { nrows } => BitMask::all_true(*nrows),
300            AdaptiveSelection::SelectionVector { rows, nrows } => {
301                let mut bools = vec![false; *nrows];
302                for &r in rows {
303                    bools[r as usize] = true;
304                }
305                BitMask::from_bools(&bools)
306            }
307            AdaptiveSelection::VerbatimMask { mask } => mask.clone(),
308            AdaptiveSelection::Hybrid { nrows, chunks } => {
309                let mut bools = vec![false; *nrows];
310                for (ci, chunk) in chunks.iter().enumerate() {
311                    let base = ci * HYBRID_CHUNK_SIZE;
312                    let chunk_len = chunk_len_for(ci, *nrows);
313                    match chunk {
314                        HybridChunk::Empty => {}
315                        HybridChunk::All => {
316                            for off in 0..chunk_len {
317                                bools[base + off] = true;
318                            }
319                        }
320                        HybridChunk::Sparse(rows) => {
321                            for &r in rows {
322                                bools[base + r as usize] = true;
323                            }
324                        }
325                        HybridChunk::Dense(words) => {
326                            for off in 0..chunk_len {
327                                let wi = off / 64;
328                                let bi = off % 64;
329                                if (words[wi] >> bi) & 1 == 1 {
330                                    bools[base + off] = true;
331                                }
332                            }
333                        }
334                    }
335                }
336                BitMask::from_bools(&bools)
337            }
338        }
339    }
340
341    /// Materialize the selection as an ascending `Vec<u32>` of row indices.
342    pub fn materialize_indices(&self) -> Vec<u32> {
343        match self {
344            AdaptiveSelection::Empty { .. } => Vec::new(),
345            AdaptiveSelection::All { nrows } => (0..*nrows as u32).collect(),
346            AdaptiveSelection::SelectionVector { rows, .. } => rows.clone(),
347            AdaptiveSelection::VerbatimMask { mask } => {
348                mask.iter_set().map(|i| i as u32).collect()
349            }
350            AdaptiveSelection::Hybrid { nrows, chunks } => {
351                let mut out = Vec::with_capacity(self.count());
352                for (ci, chunk) in chunks.iter().enumerate() {
353                    let base = (ci * HYBRID_CHUNK_SIZE) as u32;
354                    let chunk_len = chunk_len_for(ci, *nrows);
355                    match chunk {
356                        HybridChunk::Empty => {}
357                        HybridChunk::All => {
358                            for off in 0..chunk_len as u32 {
359                                out.push(base + off);
360                            }
361                        }
362                        HybridChunk::Sparse(rows) => {
363                            for &r in rows {
364                                out.push(base + r as u32);
365                            }
366                        }
367                        HybridChunk::Dense(words) => {
368                            for (wi, &w) in words.iter().enumerate() {
369                                let mut bits = w;
370                                while bits != 0 {
371                                    let tz = bits.trailing_zeros() as usize;
372                                    let off = wi * 64 + tz;
373                                    if off < chunk_len {
374                                        out.push(base + off as u32);
375                                    }
376                                    bits &= bits - 1;
377                                }
378                            }
379                        }
380                    }
381                }
382                out
383            }
384        }
385    }
386
387    /// Intersection (AND). Output mode is re-classified by density.
388    ///
389    /// Panics if `nrows` differ — programming error (different base frames).
390    pub fn intersect(&self, other: &AdaptiveSelection) -> AdaptiveSelection {
391        assert_eq!(
392            self.nrows(),
393            other.nrows(),
394            "AdaptiveSelection::intersect: nrows mismatch ({} vs {})",
395            self.nrows(),
396            other.nrows()
397        );
398        // Identity / annihilator fast paths
399        match (self, other) {
400            (AdaptiveSelection::Empty { nrows }, _) | (_, AdaptiveSelection::Empty { nrows }) => {
401                return AdaptiveSelection::Empty { nrows: *nrows };
402            }
403            (AdaptiveSelection::All { .. }, _) => return other.clone(),
404            (_, AdaptiveSelection::All { .. }) => return self.clone(),
405            _ => {}
406        }
407        // Sparse ∩ Sparse: merge-walk in O(|A|+|B|), no bitmap allocation.
408        if let (
409            AdaptiveSelection::SelectionVector { rows: a, nrows },
410            AdaptiveSelection::SelectionVector { rows: b, .. },
411        ) = (self, other)
412        {
413            let merged = sorted_merge_intersect(a, b);
414            return Self::classify_sparse(merged, *nrows);
415        }
416        // Sparse ∩ VerbatimMask: filter the sparse vector by mask test.
417        if let (
418            AdaptiveSelection::SelectionVector { rows, nrows },
419            AdaptiveSelection::VerbatimMask { mask },
420        )
421        | (
422            AdaptiveSelection::VerbatimMask { mask },
423            AdaptiveSelection::SelectionVector { rows, nrows },
424        ) = (self, other)
425        {
426            let filtered: Vec<u32> =
427                rows.iter().copied().filter(|&r| mask.get(r as usize)).collect();
428            return Self::classify_sparse(filtered, *nrows);
429        }
430        // ── v3 Phase 3: Hybrid streaming fast paths ──────────────────────
431        // Hybrid ∩ Hybrid: per-chunk dispatch over the 16-way shape table.
432        if let (
433            AdaptiveSelection::Hybrid { nrows, chunks: ac },
434            AdaptiveSelection::Hybrid { chunks: bc, .. },
435        ) = (self, other)
436        {
437            let mut out = Vec::with_capacity(ac.len());
438            for ci in 0..ac.len() {
439                let chunk_len = chunk_len_for(ci, *nrows);
440                out.push(intersect_chunks(&ac[ci], &bc[ci], chunk_len));
441            }
442            return Self::simplify_hybrid(*nrows, out);
443        }
444        // Hybrid ∩ SelectionVector: walk the sparse vector once, dispatch
445        // each row to its chunk via row >> 12. Output stays sparse.
446        if let (
447            AdaptiveSelection::Hybrid { nrows, chunks },
448            AdaptiveSelection::SelectionVector { rows, .. },
449        )
450        | (
451            AdaptiveSelection::SelectionVector { rows, .. },
452            AdaptiveSelection::Hybrid { nrows, chunks },
453        ) = (self, other)
454        {
455            let mut filtered: Vec<u32> = Vec::with_capacity(rows.len());
456            for &r in rows {
457                let row = r as usize;
458                let ci = row / HYBRID_CHUNK_SIZE;
459                let off = row % HYBRID_CHUNK_SIZE;
460                let chunk_len = chunk_len_for(ci, *nrows);
461                if chunks[ci].contains_local(off, chunk_len) {
462                    filtered.push(r);
463                }
464            }
465            return Self::classify_sparse(filtered, *nrows);
466        }
467        // Hybrid ∩ VerbatimMask: per-chunk word-AND against the matching
468        // 64-word slice of the verbatim mask. Output stays Hybrid.
469        if let (
470            AdaptiveSelection::Hybrid { nrows, chunks },
471            AdaptiveSelection::VerbatimMask { mask },
472        )
473        | (
474            AdaptiveSelection::VerbatimMask { mask },
475            AdaptiveSelection::Hybrid { nrows, chunks },
476        ) = (self, other)
477        {
478            let words = mask.words_slice();
479            let mut out_chunks = Vec::with_capacity(chunks.len());
480            for ci in 0..chunks.len() {
481                let chunk_len = chunk_len_for(ci, *nrows);
482                let w_start = ci * HYBRID_WORDS_PER_CHUNK;
483                let w_end = (w_start + HYBRID_WORDS_PER_CHUNK).min(words.len());
484                let mask_chunk = &words[w_start..w_end];
485                out_chunks.push(intersect_chunk_with_words(
486                    &chunks[ci],
487                    mask_chunk,
488                    chunk_len,
489                ));
490            }
491            return Self::simplify_hybrid(*nrows, out_chunks);
492        }
493        // General path: materialize both as words, AND, reclassify.
494        let lhs = self.materialize_mask();
495        let rhs = other.materialize_mask();
496        let words: Vec<u64> = lhs
497            .words_slice()
498            .iter()
499            .zip(rhs.words_slice().iter())
500            .map(|(a, b)| a & b)
501            .collect();
502        AdaptiveSelection::from_predicate_result(words, lhs.nrows())
503    }
504
505    /// Union (OR). Output mode re-classified by density.
506    pub fn union(&self, other: &AdaptiveSelection) -> AdaptiveSelection {
507        assert_eq!(
508            self.nrows(),
509            other.nrows(),
510            "AdaptiveSelection::union: nrows mismatch ({} vs {})",
511            self.nrows(),
512            other.nrows()
513        );
514        match (self, other) {
515            (AdaptiveSelection::All { nrows }, _) | (_, AdaptiveSelection::All { nrows }) => {
516                return AdaptiveSelection::All { nrows: *nrows };
517            }
518            (AdaptiveSelection::Empty { .. }, _) => return other.clone(),
519            (_, AdaptiveSelection::Empty { .. }) => return self.clone(),
520            _ => {}
521        }
522        // Sparse ∪ Sparse: merge-walk, no bitmap allocation.
523        if let (
524            AdaptiveSelection::SelectionVector { rows: a, nrows },
525            AdaptiveSelection::SelectionVector { rows: b, .. },
526        ) = (self, other)
527        {
528            let merged = sorted_merge_union(a, b);
529            // Re-classify: union may push us out of the sparse band.
530            if merged.len() >= *nrows / SPARSE_DIVISOR {
531                // No longer sparse — fall through to bitmap path below.
532            } else {
533                return Self::classify_sparse(merged, *nrows);
534            }
535        }
536        // ── v3 Phase 3: Hybrid streaming fast paths ──────────────────────
537        // Hybrid ∪ Hybrid: per-chunk dispatch.
538        if let (
539            AdaptiveSelection::Hybrid { nrows, chunks: ac },
540            AdaptiveSelection::Hybrid { chunks: bc, .. },
541        ) = (self, other)
542        {
543            let mut out = Vec::with_capacity(ac.len());
544            for ci in 0..ac.len() {
545                let chunk_len = chunk_len_for(ci, *nrows);
546                out.push(union_chunks(&ac[ci], &bc[ci], chunk_len));
547            }
548            return Self::simplify_hybrid(*nrows, out);
549        }
550        // Hybrid ∪ SelectionVector: scatter the sparse rows into the
551        // matching chunks. Output stays Hybrid.
552        if let (
553            AdaptiveSelection::Hybrid { nrows, chunks },
554            AdaptiveSelection::SelectionVector { rows, .. },
555        )
556        | (
557            AdaptiveSelection::SelectionVector { rows, .. },
558            AdaptiveSelection::Hybrid { nrows, chunks },
559        ) = (self, other)
560        {
561            let mut out_chunks: Vec<HybridChunk> = chunks.clone();
562            // Bucket sparse rows by chunk index, then union each bucket.
563            // Single linear pass: ascending input, ascending bucket order.
564            let mut i = 0usize;
565            while i < rows.len() {
566                let first_row = rows[i] as usize;
567                let ci = first_row / HYBRID_CHUNK_SIZE;
568                let chunk_base = ci * HYBRID_CHUNK_SIZE;
569                let chunk_end = chunk_base + HYBRID_CHUNK_SIZE;
570                let mut bucket: Vec<u16> = Vec::new();
571                while i < rows.len() && (rows[i] as usize) < chunk_end {
572                    bucket.push((rows[i] as usize - chunk_base) as u16);
573                    i += 1;
574                }
575                let chunk_len = chunk_len_for(ci, *nrows);
576                let bucket_chunk = if bucket.len() == chunk_len {
577                    HybridChunk::All
578                } else {
579                    HybridChunk::Sparse(bucket)
580                };
581                out_chunks[ci] = union_chunks(&out_chunks[ci], &bucket_chunk, chunk_len);
582            }
583            return Self::simplify_hybrid(*nrows, out_chunks);
584        }
585        // Hybrid ∪ VerbatimMask: per-chunk word-OR against the matching
586        // 64-word slice. Output stays Hybrid.
587        if let (
588            AdaptiveSelection::Hybrid { nrows, chunks },
589            AdaptiveSelection::VerbatimMask { mask },
590        )
591        | (
592            AdaptiveSelection::VerbatimMask { mask },
593            AdaptiveSelection::Hybrid { nrows, chunks },
594        ) = (self, other)
595        {
596            let words = mask.words_slice();
597            let mut out_chunks = Vec::with_capacity(chunks.len());
598            for ci in 0..chunks.len() {
599                let chunk_len = chunk_len_for(ci, *nrows);
600                let w_start = ci * HYBRID_WORDS_PER_CHUNK;
601                let w_end = (w_start + HYBRID_WORDS_PER_CHUNK).min(words.len());
602                let mask_chunk = &words[w_start..w_end];
603                out_chunks.push(union_chunk_with_words(
604                    &chunks[ci],
605                    mask_chunk,
606                    chunk_len,
607                ));
608            }
609            return Self::simplify_hybrid(*nrows, out_chunks);
610        }
611        let lhs = self.materialize_mask();
612        let rhs = other.materialize_mask();
613        let words: Vec<u64> = lhs
614            .words_slice()
615            .iter()
616            .zip(rhs.words_slice().iter())
617            .map(|(a, b)| a | b)
618            .collect();
619        AdaptiveSelection::from_predicate_result(words, lhs.nrows())
620    }
621
622    /// Stable identifier of the chosen mode. Useful for debug, tests, and
623    /// the planned `explain_selection_mode` user-facing call.
624    pub fn explain_selection_mode(&self) -> &'static str {
625        match self {
626            AdaptiveSelection::Empty { .. } => "Empty",
627            AdaptiveSelection::All { .. } => "All",
628            AdaptiveSelection::SelectionVector { .. } => "SelectionVector",
629            AdaptiveSelection::VerbatimMask { .. } => "VerbatimMask",
630            AdaptiveSelection::Hybrid { .. } => "Hybrid",
631        }
632    }
633
634    // ── Internal helpers ─────────────────────────────────────────────────
635
636    /// Build a `SelectionVector`/`Empty`/`All` from an already-sorted-ascending
637    /// `Vec<u32>`. Used by sparse-fast-path set ops.
638    fn classify_sparse(rows: Vec<u32>, nrows: usize) -> Self {
639        if rows.is_empty() {
640            AdaptiveSelection::Empty { nrows }
641        } else if rows.len() == nrows {
642            AdaptiveSelection::All { nrows }
643        } else {
644            AdaptiveSelection::SelectionVector { rows, nrows }
645        }
646    }
647
648    /// Collapse a chunked Hybrid result into `Empty`/`All` if every chunk
649    /// agrees; otherwise keep the chunked layout. Phase 3 deliberately does
650    /// not re-globalize mid-density Hybrids — preserving chunks is the point.
651    fn simplify_hybrid(nrows: usize, chunks: Vec<HybridChunk>) -> AdaptiveSelection {
652        let mut total = 0usize;
653        for (i, c) in chunks.iter().enumerate() {
654            total += c.count(chunk_len_for(i, nrows));
655        }
656        if total == 0 {
657            return AdaptiveSelection::Empty { nrows };
658        }
659        if total == nrows {
660            return AdaptiveSelection::All { nrows };
661        }
662        AdaptiveSelection::Hybrid { nrows, chunks }
663    }
664}
665
666// ── Iterator ─────────────────────────────────────────────────────────────────
667
/// An ascending iterator over selected row indices, yielding `usize`. One
/// variant per `AdaptiveSelection` arm so the hot path stays monomorphic
/// and inlines.
pub enum SelectionIndices<'a> {
    /// Yields nothing.
    Empty,
    /// Yields every index of the range.
    Range(std::ops::Range<usize>),
    /// Yields the stored `u32` indices, widened to `usize`.
    Vec(std::slice::Iter<'a, u32>),
    /// Scans the mask bit by bit and yields the set positions.
    Mask {
        /// Mask being scanned.
        mask: &'a BitMask,
        /// Next row to test.
        next: usize,
        /// Exclusive upper bound of the scan.
        nrows: usize,
    },
    /// Walks the chunks in order, draining each through `inner`.
    Hybrid {
        /// All chunks of the selection.
        chunks: &'a [HybridChunk],
        /// Frame size, used to derive each chunk's effective length.
        nrows: usize,
        /// Index of the next chunk to open (the chunk being drained by
        /// `inner`, if any, is the one before it).
        ci: usize,
        /// Iteration state for the chunk currently being drained.
        inner: HybridInner<'a>,
    },
}
687
/// Per-chunk iteration state used by `SelectionIndices::Hybrid`.
///
/// `base` in every variant is the global row index of the chunk's first row;
/// emitted values are `base` plus a chunk-local offset.
pub enum HybridInner<'a> {
    /// Nothing to drain: about to advance to the next chunk. Also the state
    /// chosen for an `Empty` chunk.
    Start,
    /// Iterating an `All` chunk: emit `[next, end)` plus chunk base.
    AllRange { base: u32, next: u32, end: u32 },
    /// Iterating a sparse chunk's index list.
    Sparse {
        base: u32,
        /// Remaining chunk-local offsets.
        iter: std::slice::Iter<'a, u16>,
    },
    /// Iterating a dense chunk's word vector.
    Dense {
        base: u32,
        /// The chunk's bitmap words.
        words: &'a [u64],
        /// Index of the word currently being drained.
        wi: usize,
        /// Bits of `words[wi]` not yet emitted; the lowest set bit is next.
        bits: u64,
        /// Effective chunk length; offsets at or past it are skipped.
        chunk_len: usize,
    },
}
708
/// Iterator over the selected row indices, yielded as `usize`.
///
/// Each arm of `SelectionIndices` carries its own cursor state and is
/// advanced in place by `next`.
impl<'a> Iterator for SelectionIndices<'a> {
    type Item = usize;
    /// Return the next selected row index, or `None` once the current arm is
    /// exhausted.
    fn next(&mut self) -> Option<usize> {
        match self {
            // Nothing selected.
            SelectionIndices::Empty => None,
            // Delegate to the wrapped range iterator.
            SelectionIndices::Range(r) => r.next(),
            // Stored indices are widened to `usize` as they are yielded.
            SelectionIndices::Vec(it) => it.next().map(|&v| v as usize),
            SelectionIndices::Mask { mask, next, nrows } => {
                // Probe rows one at a time from the cursor `next` up to
                // `nrows`, yielding the first row for which `mask.get` is true.
                // NOTE(review): this is a per-bit scan, not a word-skipping
                // one — confirm that is acceptable for large dense masks.
                while *next < *nrows {
                    let i = *next;
                    *next += 1;
                    if mask.get(i) {
                        return Some(i);
                    }
                }
                None
            }
            SelectionIndices::Hybrid {
                chunks,
                nrows,
                ci,
                inner,
            } => loop {
                // Try to drain the current chunk's inner iterator first.
                match inner {
                    HybridInner::Start => {
                        // Fall through to chunk advance.
                    }
                    HybridInner::AllRange { base, next, end } => {
                        // Every offset in `next..end` is selected; `base` is
                        // the chunk's first row.
                        if *next < *end {
                            let v = *next;
                            *next += 1;
                            return Some((*base + v) as usize);
                        }
                        // exhausted; fall through
                    }
                    HybridInner::Sparse { base, iter } => {
                        // Stored offsets are chunk-local `u16`s; add the
                        // chunk's first row to get the global row index.
                        if let Some(&off) = iter.next() {
                            return Some((*base + off as u32) as usize);
                        }
                        // exhausted; fall through
                    }
                    HybridInner::Dense {
                        base,
                        words,
                        wi,
                        bits,
                        chunk_len,
                    } => {
                        // `bits` holds the not-yet-yielded bits of word `wi`.
                        loop {
                            if *bits != 0 {
                                let tz = bits.trailing_zeros() as usize;
                                let off = *wi * 64 + tz;
                                // Clear the lowest set bit before deciding
                                // whether to yield it.
                                *bits &= *bits - 1;
                                // Bits at or past `chunk_len` are skipped
                                // rather than yielded.
                                if off < *chunk_len {
                                    return Some((*base + off as u32) as usize);
                                }
                                continue;
                            }
                            // Current word is drained; load the next one.
                            *wi += 1;
                            if *wi >= words.len() {
                                break; // exhausted current chunk
                            }
                            *bits = words[*wi];
                        }
                    }
                }
                // Advance to next chunk
                if *ci >= chunks.len() {
                    return None;
                }
                let chunk = &chunks[*ci];
                let chunk_idx = *ci;
                *ci += 1;
                let chunk_len = chunk_len_for(chunk_idx, *nrows);
                // Global row index of the chunk's first row.
                let base = (chunk_idx * HYBRID_CHUNK_SIZE) as u32;
                // Install the cursor for the new chunk; the enclosing `loop`
                // then drains it on the next pass.
                *inner = match chunk {
                    // An empty chunk has nothing to drain, so the next pass
                    // falls straight through to another chunk advance.
                    HybridChunk::Empty => HybridInner::Start,
                    HybridChunk::All => HybridInner::AllRange {
                        base,
                        next: 0,
                        end: chunk_len as u32,
                    },
                    HybridChunk::Sparse(rows) => HybridInner::Sparse {
                        base,
                        iter: rows.iter(),
                    },
                    // `words[0]` panics on an empty word slice.
                    // NOTE(review): presumably Dense chunks always hold
                    // HYBRID_WORDS_PER_CHUNK words — confirm against the
                    // constructors.
                    HybridChunk::Dense(words) => HybridInner::Dense {
                        base,
                        words: &words[..],
                        wi: 0,
                        bits: words[0],
                        chunk_len,
                    },
                };
            },
        }
    }
}
808
809// ── Helpers ──────────────────────────────────────────────────────────────────
810
/// Convert raw u64 words into an ascending `Vec<u32>` of set-bit indices.
///
/// Bit `b` of word `w` maps to index `w * 64 + b`. Words are visited in order
/// and each word is drained lowest-bit-first, so the output is strictly
/// ascending.
///
/// The output vector is sized exactly by one popcount pass before extraction,
/// so it never regrows and never over-allocates for tiny results. Extraction
/// uses `trailing_zeros` to jump between set bits, giving a total cost of
/// O(words.len() + count).
///
/// Indices are narrowed to `u32` with an `as` cast.
/// NOTE(review): this assumes `words.len() * 64` fits in `u32` — confirm the
/// callers enforce that row limit.
fn words_to_indices(words: &[u64]) -> Vec<u32> {
    let total: usize = words.iter().map(|w| w.count_ones() as usize).sum();
    let mut out = Vec::with_capacity(total);
    for (wi, &w) in words.iter().enumerate() {
        let base = wi * 64;
        let mut bits = w;
        while bits != 0 {
            out.push((base + bits.trailing_zeros() as usize) as u32);
            // Clear the lowest set bit.
            bits &= bits - 1;
        }
    }
    out
}
828
829/// Build a `BitMask` from raw words (used after AND/OR ops). Tail bits past
830/// `nrows` are masked off to preserve the BitMask invariant.
831fn bitmask_from_words(mut words: Vec<u64>, nrows: usize) -> BitMask {
832    let nwords = (nrows + 63) / 64;
833    if words.len() < nwords {
834        words.resize(nwords, 0);
835    } else if words.len() > nwords {
836        words.truncate(nwords);
837    }
838    if nwords > 0 && nrows % 64 != 0 {
839        let tail = nrows % 64;
840        words[nwords - 1] &= (1u64 << tail) - 1;
841    }
842    BitMask::from_words_for_test(words, nrows)
843}
844
845/// Row count for chunk `ci` given a total `nrows`. The final chunk may be
846/// partial.
847#[inline]
848fn chunk_len_for(ci: usize, nrows: usize) -> usize {
849    let base = ci * HYBRID_CHUNK_SIZE;
850    let remaining = nrows.saturating_sub(base);
851    remaining.min(HYBRID_CHUNK_SIZE)
852}
853
854/// Slice the words of a fully-materialized BitMask into per-chunk
855/// `HybridChunk`s, classifying each chunk by local density.
856fn chunks_from_mask(mask: &BitMask) -> Vec<HybridChunk> {
857    let nrows = mask.nrows();
858    let total_words = mask.words_slice().len();
859    let nchunks = (nrows + HYBRID_CHUNK_SIZE - 1) / HYBRID_CHUNK_SIZE;
860    let mut chunks = Vec::with_capacity(nchunks);
861    let words = mask.words_slice();
862    for ci in 0..nchunks {
863        let chunk_len = chunk_len_for(ci, nrows);
864        let w_start = ci * HYBRID_WORDS_PER_CHUNK;
865        let w_end = (w_start + HYBRID_WORDS_PER_CHUNK).min(total_words);
866        let chunk_words = &words[w_start..w_end];
867        let count: usize = chunk_words.iter().map(|w| w.count_ones() as usize).sum();
868        let chunk = if count == 0 {
869            HybridChunk::Empty
870        } else if count == chunk_len {
871            HybridChunk::All
872        } else if count < HYBRID_CHUNK_SPARSE_THRESHOLD {
873            // Sparse chunk: pull u16 offsets directly from the words.
874            let mut offs: Vec<u16> = Vec::with_capacity(count);
875            for (i, &w) in chunk_words.iter().enumerate() {
876                let mut bits = w;
877                while bits != 0 {
878                    let tz = bits.trailing_zeros() as usize;
879                    let off = i * 64 + tz;
880                    if off < chunk_len {
881                        offs.push(off as u16);
882                    }
883                    bits &= bits - 1;
884                }
885            }
886            HybridChunk::Sparse(offs)
887        } else {
888            // Dense chunk: copy the (up to) HYBRID_WORDS_PER_CHUNK words.
889            let mut buf = vec![0u64; HYBRID_WORDS_PER_CHUNK];
890            for (i, &w) in chunk_words.iter().enumerate() {
891                buf[i] = w;
892            }
893            HybridChunk::Dense(buf.into_boxed_slice())
894        };
895        chunks.push(chunk);
896    }
897    chunks
898}
899
900// ── v3 Phase 3: per-chunk set-op helpers ─────────────────────────────────────
901
902/// Per-chunk classifier: take an ascending `Vec<u16>` of chunk-local offsets
903/// and a `chunk_len`, decide between `Empty`/`All`/`Sparse`/`Dense`. Used by
904/// outputs of sparse-shaped per-chunk operations.
905fn classify_sparse_chunk(offs: Vec<u16>, chunk_len: usize) -> HybridChunk {
906    if offs.is_empty() {
907        HybridChunk::Empty
908    } else if offs.len() == chunk_len {
909        HybridChunk::All
910    } else {
911        // Per-Phase-3 invariant: sparse output of sparse-shape input keeps
912        // the sparse arm regardless of size — overflow into Dense is the
913        // caller's responsibility (they classify via `classify_dense_chunk`).
914        HybridChunk::Sparse(offs)
915    }
916}
917
918/// Per-chunk classifier: take a populated word buffer and a `chunk_len`,
919/// pick the most compact arm by local count. Used by outputs of word-shaped
920/// per-chunk operations.
921fn classify_dense_chunk(buf: Vec<u64>, chunk_len: usize) -> HybridChunk {
922    let count: usize = buf.iter().map(|w| w.count_ones() as usize).sum();
923    if count == 0 {
924        HybridChunk::Empty
925    } else if count == chunk_len {
926        HybridChunk::All
927    } else if count < HYBRID_CHUNK_SPARSE_THRESHOLD {
928        // Demote to Sparse: extract chunk-local offsets ascending.
929        let mut offs = Vec::with_capacity(count);
930        for (i, &w) in buf.iter().enumerate() {
931            let mut bits = w;
932            while bits != 0 {
933                let tz = bits.trailing_zeros() as usize;
934                let off = i * 64 + tz;
935                if off < chunk_len {
936                    offs.push(off as u16);
937                }
938                bits &= bits - 1;
939            }
940        }
941        HybridChunk::Sparse(offs)
942    } else {
943        HybridChunk::Dense(buf.into_boxed_slice())
944    }
945}
946
/// Intersect two ascending `u16` slices with a single merge walk.
///
/// Both inputs are consumed front to back; the smaller head is discarded and
/// equal heads are emitted once. The result is ascending and holds every
/// value present in both inputs.
fn merge_intersect_u16(a: &[u16], b: &[u16]) -> Vec<u16> {
    use std::cmp::Ordering;

    let mut common = Vec::with_capacity(a.len().min(b.len()));
    let (mut lhs, mut rhs) = (a, b);
    // Stop as soon as either side runs out: nothing further can match.
    while let ([x, lhs_tail @ ..], [y, rhs_tail @ ..]) = (lhs, rhs) {
        match x.cmp(y) {
            Ordering::Less => lhs = lhs_tail,
            Ordering::Greater => rhs = rhs_tail,
            Ordering::Equal => {
                common.push(*x);
                lhs = lhs_tail;
                rhs = rhs_tail;
            }
        }
    }
    common
}
964
/// Union two ascending `u16` slices with a single merge walk.
///
/// The smaller head is emitted at each step; equal heads are emitted once and
/// both sides advance. Whatever remains on either side after the other is
/// exhausted is appended unchanged, so the result is ascending.
fn merge_union_u16(a: &[u16], b: &[u16]) -> Vec<u16> {
    use std::cmp::Ordering;

    let mut merged = Vec::with_capacity(a.len() + b.len());
    let (mut lhs, mut rhs) = (a, b);
    while let (Some((&x, lhs_rest)), Some((&y, rhs_rest))) =
        (lhs.split_first(), rhs.split_first())
    {
        match x.cmp(&y) {
            Ordering::Less => {
                merged.push(x);
                lhs = lhs_rest;
            }
            Ordering::Greater => {
                merged.push(y);
                rhs = rhs_rest;
            }
            Ordering::Equal => {
                merged.push(x);
                lhs = lhs_rest;
                rhs = rhs_rest;
            }
        }
    }
    // At most one of these is non-empty.
    merged.extend_from_slice(lhs);
    merged.extend_from_slice(rhs);
    merged
}
990
991/// 16-way per-chunk intersection. The 5 effective shapes after annihilator
992/// folding: Empty/_, _/Empty, All/X, X/All, Sparse∩Sparse (merge-walk),
993/// Sparse∩Dense (filter-walk), Dense∩Dense (word-AND).
994fn intersect_chunks(a: &HybridChunk, b: &HybridChunk, chunk_len: usize) -> HybridChunk {
995    use HybridChunk::*;
996    match (a, b) {
997        (Empty, _) | (_, Empty) => Empty,
998        (All, x) | (x, All) => x.clone(),
999        (Sparse(ax), Sparse(bx)) => {
1000            let merged = merge_intersect_u16(ax, bx);
1001            classify_sparse_chunk(merged, chunk_len)
1002        }
1003        (Sparse(offs), Dense(words)) | (Dense(words), Sparse(offs)) => {
1004            let mut out: Vec<u16> = Vec::with_capacity(offs.len());
1005            for &off in offs {
1006                let off_u = off as usize;
1007                let wi = off_u / 64;
1008                let bi = off_u % 64;
1009                if (words[wi] >> bi) & 1 == 1 {
1010                    out.push(off);
1011                }
1012            }
1013            classify_sparse_chunk(out, chunk_len)
1014        }
1015        (Dense(aw), Dense(bw)) => {
1016            let mut buf = vec![0u64; HYBRID_WORDS_PER_CHUNK];
1017            for i in 0..HYBRID_WORDS_PER_CHUNK {
1018                buf[i] = aw[i] & bw[i];
1019            }
1020            classify_dense_chunk(buf, chunk_len)
1021        }
1022    }
1023}
1024
1025/// 16-way per-chunk union.
1026fn union_chunks(a: &HybridChunk, b: &HybridChunk, chunk_len: usize) -> HybridChunk {
1027    use HybridChunk::*;
1028    match (a, b) {
1029        (All, _) | (_, All) => All,
1030        (Empty, x) | (x, Empty) => x.clone(),
1031        (Sparse(ax), Sparse(bx)) => {
1032            let merged = merge_union_u16(ax, bx);
1033            // May overflow the sparse threshold → promote to Dense.
1034            if merged.len() == chunk_len {
1035                All
1036            } else if merged.len() < HYBRID_CHUNK_SPARSE_THRESHOLD {
1037                if merged.is_empty() {
1038                    Empty
1039                } else {
1040                    Sparse(merged)
1041                }
1042            } else {
1043                let mut buf = vec![0u64; HYBRID_WORDS_PER_CHUNK];
1044                for &off in &merged {
1045                    let off = off as usize;
1046                    buf[off / 64] |= 1u64 << (off % 64);
1047                }
1048                classify_dense_chunk(buf, chunk_len)
1049            }
1050        }
1051        (Sparse(offs), Dense(words)) | (Dense(words), Sparse(offs)) => {
1052            let mut buf: Vec<u64> = words.iter().copied().collect();
1053            for &off in offs {
1054                let off = off as usize;
1055                buf[off / 64] |= 1u64 << (off % 64);
1056            }
1057            classify_dense_chunk(buf, chunk_len)
1058        }
1059        (Dense(aw), Dense(bw)) => {
1060            let mut buf = vec![0u64; HYBRID_WORDS_PER_CHUNK];
1061            for i in 0..HYBRID_WORDS_PER_CHUNK {
1062                buf[i] = aw[i] | bw[i];
1063            }
1064            classify_dense_chunk(buf, chunk_len)
1065        }
1066    }
1067}
1068
1069/// Hybrid chunk × VerbatimMask 64-word slice → Hybrid chunk (intersection).
1070/// `mask_words` may be shorter than `HYBRID_WORDS_PER_CHUNK` for the final
1071/// (partial) chunk.
1072fn intersect_chunk_with_words(
1073    chunk: &HybridChunk,
1074    mask_words: &[u64],
1075    chunk_len: usize,
1076) -> HybridChunk {
1077    use HybridChunk::*;
1078    match chunk {
1079        Empty => Empty,
1080        All => {
1081            // Result = whatever bits are set in mask_words (pre-zeroed past
1082            // chunk_len by BitMask invariant).
1083            let mut buf = vec![0u64; HYBRID_WORDS_PER_CHUNK];
1084            for (i, &w) in mask_words.iter().enumerate() {
1085                buf[i] = w;
1086            }
1087            classify_dense_chunk(buf, chunk_len)
1088        }
1089        Sparse(offs) => {
1090            let mut out: Vec<u16> = Vec::with_capacity(offs.len());
1091            for &off in offs {
1092                let off_u = off as usize;
1093                let wi = off_u / 64;
1094                if wi < mask_words.len() {
1095                    let bi = off_u % 64;
1096                    if (mask_words[wi] >> bi) & 1 == 1 {
1097                        out.push(off);
1098                    }
1099                }
1100            }
1101            classify_sparse_chunk(out, chunk_len)
1102        }
1103        Dense(words) => {
1104            let mut buf = vec![0u64; HYBRID_WORDS_PER_CHUNK];
1105            for i in 0..HYBRID_WORDS_PER_CHUNK {
1106                let mw = if i < mask_words.len() { mask_words[i] } else { 0 };
1107                buf[i] = words[i] & mw;
1108            }
1109            classify_dense_chunk(buf, chunk_len)
1110        }
1111    }
1112}
1113
1114/// Hybrid chunk × VerbatimMask 64-word slice → Hybrid chunk (union).
1115fn union_chunk_with_words(
1116    chunk: &HybridChunk,
1117    mask_words: &[u64],
1118    chunk_len: usize,
1119) -> HybridChunk {
1120    use HybridChunk::*;
1121    match chunk {
1122        Empty => {
1123            let mut buf = vec![0u64; HYBRID_WORDS_PER_CHUNK];
1124            for (i, &w) in mask_words.iter().enumerate() {
1125                buf[i] = w;
1126            }
1127            classify_dense_chunk(buf, chunk_len)
1128        }
1129        All => All,
1130        Sparse(offs) => {
1131            let mut buf = vec![0u64; HYBRID_WORDS_PER_CHUNK];
1132            for (i, &w) in mask_words.iter().enumerate() {
1133                buf[i] = w;
1134            }
1135            for &off in offs {
1136                let off = off as usize;
1137                buf[off / 64] |= 1u64 << (off % 64);
1138            }
1139            classify_dense_chunk(buf, chunk_len)
1140        }
1141        Dense(words) => {
1142            let mut buf = vec![0u64; HYBRID_WORDS_PER_CHUNK];
1143            for i in 0..HYBRID_WORDS_PER_CHUNK {
1144                let mw = if i < mask_words.len() { mask_words[i] } else { 0 };
1145                buf[i] = words[i] | mw;
1146            }
1147            classify_dense_chunk(buf, chunk_len)
1148        }
1149    }
1150}
1151
/// Intersect two ascending `u32` slices with a single merge walk.
///
/// Both inputs are consumed front to back; the smaller head is skipped and
/// equal heads are emitted once. The result is ascending and holds every
/// value present in both inputs.
fn sorted_merge_intersect(a: &[u32], b: &[u32]) -> Vec<u32> {
    use std::cmp::Ordering;

    let mut shared = Vec::with_capacity(a.len().min(b.len()));
    let mut left = a.iter().copied().peekable();
    let mut right = b.iter().copied().peekable();
    // Once either side is exhausted no further matches are possible.
    while let (Some(&x), Some(&y)) = (left.peek(), right.peek()) {
        match x.cmp(&y) {
            Ordering::Less => {
                left.next();
            }
            Ordering::Greater => {
                right.next();
            }
            Ordering::Equal => {
                shared.push(x);
                left.next();
                right.next();
            }
        }
    }
    shared
}
1169
/// Union two ascending `u32` slices with a single merge walk.
///
/// The smaller head is emitted at each step; equal heads are emitted once and
/// both sides advance. Whatever remains on either side after the other is
/// exhausted is appended unchanged, so the result is ascending.
fn sorted_merge_union(a: &[u32], b: &[u32]) -> Vec<u32> {
    use std::cmp::Ordering;

    let mut merged = Vec::with_capacity(a.len() + b.len());
    let (mut lhs, mut rhs) = (a, b);
    while let ([x, lhs_tail @ ..], [y, rhs_tail @ ..]) = (lhs, rhs) {
        match x.cmp(y) {
            Ordering::Less => {
                merged.push(*x);
                lhs = lhs_tail;
            }
            Ordering::Greater => {
                merged.push(*y);
                rhs = rhs_tail;
            }
            Ordering::Equal => {
                merged.push(*x);
                lhs = lhs_tail;
                rhs = rhs_tail;
            }
        }
    }
    // At most one of these is non-empty.
    merged.extend_from_slice(lhs);
    merged.extend_from_slice(rhs);
    merged
}
1195
1196// ── Unit tests ───────────────────────────────────────────────────────────────
1197
1198#[cfg(test)]
1199mod tests {
1200    use super::*;
1201
1202    fn mk_bools(n: usize, set: &[usize]) -> Vec<bool> {
1203        let mut v = vec![false; n];
1204        for &i in set {
1205            v[i] = true;
1206        }
1207        v
1208    }
1209
1210    fn classify_from_bools(bools: &[bool]) -> AdaptiveSelection {
1211        let mask = BitMask::from_bools(bools);
1212        let words: Vec<u64> = mask.words_slice().to_vec();
1213        AdaptiveSelection::from_predicate_result(words, bools.len())
1214    }
1215
1216    // ── Constructors / classifier ────────────────────────────────────
1217
1218    #[test]
1219    fn empty_when_no_bits_set() {
1220        let s = classify_from_bools(&mk_bools(100, &[]));
1221        assert_eq!(s.explain_selection_mode(), "Empty");
1222        assert_eq!(s.count(), 0);
1223        assert_eq!(s.nrows(), 100);
1224    }
1225
1226    #[test]
1227    fn all_when_every_bit_set() {
1228        let s = classify_from_bools(&vec![true; 100]);
1229        assert_eq!(s.explain_selection_mode(), "All");
1230        assert_eq!(s.count(), 100);
1231        assert_eq!(s.nrows(), 100);
1232    }
1233
1234    #[test]
1235    fn sparse_picks_selection_vector() {
1236        // 100_000 rows, 50 hits → 50 < 100_000/1024 (~97) → sparse
1237        let mut hits = vec![];
1238        for i in (0..100_000).step_by(2_000) {
1239            hits.push(i);
1240        }
1241        assert_eq!(hits.len(), 50);
1242        let s = classify_from_bools(&mk_bools(100_000, &hits));
1243        assert_eq!(s.explain_selection_mode(), "SelectionVector");
1244        assert_eq!(s.count(), 50);
1245    }
1246
1247    #[test]
1248    fn dense_picks_verbatim_mask() {
1249        // 1000 rows, every other → 50% → dense
1250        let hits: Vec<usize> = (0..1000).step_by(2).collect();
1251        let s = classify_from_bools(&mk_bools(1000, &hits));
1252        assert_eq!(s.explain_selection_mode(), "VerbatimMask");
1253        assert_eq!(s.count(), 500);
1254    }
1255
1256    #[test]
1257    fn mid_band_small_frame_stays_verbatim() {
1258        // 1000 rows < 2 * HYBRID_CHUNK_SIZE → small frame, mid-band → VerbatimMask.
1259        let hits: Vec<usize> = (0..1000).step_by(5).collect();
1260        assert_eq!(hits.len(), 200);
1261        let s = classify_from_bools(&mk_bools(1000, &hits));
1262        assert_eq!(s.explain_selection_mode(), "VerbatimMask");
1263        assert_eq!(s.count(), 200);
1264    }
1265
1266    #[test]
1267    fn mid_band_large_frame_picks_hybrid() {
1268        // 100_000 rows, 5_000 hits = 5%. Above sparse threshold (97), below
1269        // dense threshold (30%). nrows >= 2 * 4096 → Hybrid.
1270        let hits: Vec<usize> = (0..100_000).step_by(20).collect();
1271        assert_eq!(hits.len(), 5_000);
1272        let s = classify_from_bools(&mk_bools(100_000, &hits));
1273        assert_eq!(s.explain_selection_mode(), "Hybrid");
1274        assert_eq!(s.count(), 5_000);
1275    }
1276
1277    // ── SelectionRepr surface ────────────────────────────────────────
1278
1279    #[test]
1280    fn count_and_contains_agree_for_sparse() {
1281        let hits = vec![3usize, 17, 99, 5_000];
1282        let s = classify_from_bools(&mk_bools(100_000, &hits));
1283        assert_eq!(s.explain_selection_mode(), "SelectionVector");
1284        for h in &hits {
1285            assert!(s.contains(*h), "expected contains({})", h);
1286        }
1287        assert!(!s.contains(0));
1288        assert!(!s.contains(100_001)); // out of range
1289    }
1290
1291    #[test]
1292    fn iter_indices_ascending_for_every_arm() {
1293        let hybrid_hits: Vec<usize> = (0..100_000).step_by(20).collect();
1294        let cases: Vec<(usize, Vec<usize>, &'static str)> = vec![
1295            (10, vec![], "Empty"),
1296            (10, (0..10).collect(), "All"),
1297            (100_000, vec![3, 17, 99, 5_000], "SelectionVector"),
1298            (1000, (0..1000).step_by(2).collect(), "VerbatimMask"),
1299            (100_000, hybrid_hits, "Hybrid"),
1300        ];
1301        for (nrows, hits, expected_mode) in cases {
1302            let s = classify_from_bools(&mk_bools(nrows, &hits));
1303            assert_eq!(s.explain_selection_mode(), expected_mode);
1304            let collected: Vec<usize> = s.iter_indices().collect();
1305            assert_eq!(collected, hits, "{expected_mode} iter mismatch");
1306            // Strictly ascending invariant
1307            for w in collected.windows(2) {
1308                assert!(w[0] < w[1]);
1309            }
1310        }
1311    }
1312
1313    #[test]
1314    fn materialize_round_trip_agrees_for_every_arm() {
1315        let hits = vec![1usize, 5, 10, 64, 65, 200];
1316        let nrows = 256;
1317        let s = classify_from_bools(&mk_bools(nrows, &hits));
1318        let mask = s.materialize_mask();
1319        let idx = s.materialize_indices();
1320        assert_eq!(mask.count_ones(), hits.len());
1321        assert_eq!(idx.len(), hits.len());
1322        for h in &hits {
1323            assert!(mask.get(*h));
1324            assert!(idx.binary_search(&(*h as u32)).is_ok());
1325        }
1326    }
1327
1328    // ── Set ops ──────────────────────────────────────────────────────
1329
1330    #[test]
1331    fn intersect_identity_with_all() {
1332        let s = classify_from_bools(&mk_bools(1000, &[1, 2, 3]));
1333        let all = AdaptiveSelection::all(1000);
1334        let r = s.intersect(&all);
1335        assert_eq!(r.count(), 3);
1336        assert!(r.contains(1) && r.contains(2) && r.contains(3));
1337    }
1338
1339    #[test]
1340    fn intersect_with_empty_is_empty() {
1341        let s = classify_from_bools(&mk_bools(1000, &[1, 2, 3]));
1342        let empty = AdaptiveSelection::empty(1000);
1343        let r = s.intersect(&empty);
1344        assert_eq!(r.explain_selection_mode(), "Empty");
1345        assert_eq!(r.count(), 0);
1346    }
1347
1348    #[test]
1349    fn union_with_all_is_all() {
1350        let s = classify_from_bools(&mk_bools(1000, &[1, 2, 3]));
1351        let all = AdaptiveSelection::all(1000);
1352        let r = s.union(&all);
1353        assert_eq!(r.explain_selection_mode(), "All");
1354        assert_eq!(r.count(), 1000);
1355    }
1356
    #[test]
    fn intersect_mode_mixing_sparse_and_dense_gives_sparse_or_empty() {
        // sparse: a few hits in a 100k-row frame
        let sparse = classify_from_bools(&mk_bools(100_000, &[3, 17, 99, 5_000]));
        // dense: every other row in the same frame
        let dense_hits: Vec<usize> = (0..100_000).step_by(2).collect();
        let dense = classify_from_bools(&mk_bools(100_000, &dense_hits));
        // 50% density is above the 30% dense threshold, so this classifies as
        // VerbatimMask rather than Hybrid, even on a large frame.
        assert_eq!(dense.explain_selection_mode(), "VerbatimMask"); // 50% > 30% → dense
        assert_eq!(sparse.explain_selection_mode(), "SelectionVector");

        let r = sparse.intersect(&dense);
        // 3, 17, 99 are odd → don't survive even-only filter; 5_000 is even.
        assert_eq!(r.count(), 1);
        assert!(r.contains(5_000));
    }
1373
1374    #[test]
1375    fn intersect_is_commutative_and_associative_for_three_inputs() {
1376        let a = classify_from_bools(&mk_bools(256, &[1, 5, 7, 99]));
1377        let b = classify_from_bools(&mk_bools(256, &[5, 99, 100, 200]));
1378        let c = classify_from_bools(&mk_bools(256, &[5, 50, 99, 250]));
1379
1380        let ab_c = a.intersect(&b).intersect(&c);
1381        let bc_a = b.intersect(&c).intersect(&a);
1382        let ba_c = b.intersect(&a).intersect(&c);
1383
1384        assert_eq!(ab_c.materialize_indices(), bc_a.materialize_indices());
1385        assert_eq!(ab_c.materialize_indices(), ba_c.materialize_indices());
1386    }
1387
1388    // ── Density classifier edges ─────────────────────────────────────
1389
1390    #[test]
1391    fn small_nrows_never_classifies_as_sparse() {
1392        // nrows=10, 1 hit. nrows/1024 = 0, count=1, 1 < 0 is false → not sparse.
1393        let s = classify_from_bools(&mk_bools(10, &[3]));
1394        assert_ne!(s.explain_selection_mode(), "SelectionVector");
1395    }
1396
1397    #[test]
1398    fn dense_threshold_is_exclusive_30_percent() {
1399        // 1000 rows, exactly 300 hits (= 30%). 300*10 = 3000, 1000*3 = 3000,
1400        // strict > fails → mid band, not dense. Small frame → VerbatimMask.
1401        let hits: Vec<usize> = (0..300).collect();
1402        let s = classify_from_bools(&mk_bools(1000, &hits));
1403        assert_eq!(s.explain_selection_mode(), "VerbatimMask");
1404    }
1405
1406    #[test]
1407    fn sparse_iter_uses_word_skipping() {
1408        // 1M rows, 5 hits scattered far apart. iter must succeed quickly and
1409        // return exactly the hits.
1410        let hits = vec![100_usize, 50_000, 200_000, 500_000, 999_000];
1411        let s = classify_from_bools(&mk_bools(1_000_000, &hits));
1412        assert_eq!(s.explain_selection_mode(), "SelectionVector");
1413        let collected: Vec<usize> = s.iter_indices().collect();
1414        assert_eq!(collected, hits);
1415    }
1416
1417    // ── Hybrid arm coverage ──────────────────────────────────────────
1418
1419    #[test]
1420    fn hybrid_iter_matches_bitmask_iter_under_bimodal_density() {
1421        // Bimodal: first half empty, second half mostly-set
1422        let mut hits: Vec<usize> = Vec::new();
1423        for i in 50_000..100_000 {
1424            if i % 2 == 0 {
1425                hits.push(i);
1426            }
1427        }
1428        let bools = mk_bools(100_000, &hits);
1429        let s = classify_from_bools(&bools);
1430        // 25k hits / 100k = 25% mid band → Hybrid.
1431        assert_eq!(s.explain_selection_mode(), "Hybrid");
1432        let collected: Vec<usize> = s.iter_indices().collect();
1433        assert_eq!(collected, hits);
1434        assert_eq!(s.count(), hits.len());
1435    }
1436
1437    #[test]
1438    fn hybrid_contains_matches_bitmask() {
1439        let hits: Vec<usize> = (0..100_000).step_by(20).collect();
1440        let s = classify_from_bools(&mk_bools(100_000, &hits));
1441        assert_eq!(s.explain_selection_mode(), "Hybrid");
1442        for h in &hits {
1443            assert!(s.contains(*h));
1444        }
1445        assert!(!s.contains(99_999));
1446        assert!(!s.contains(100_001));
1447    }
1448
1449    #[test]
1450    fn hybrid_materialize_round_trip() {
1451        let hits: Vec<usize> = (0..100_000).step_by(20).collect();
1452        let s = classify_from_bools(&mk_bools(100_000, &hits));
1453        assert_eq!(s.explain_selection_mode(), "Hybrid");
1454        let bm = s.materialize_mask();
1455        assert_eq!(bm.count_ones(), hits.len());
1456        let idx = s.materialize_indices();
1457        assert_eq!(idx.len(), hits.len());
1458        for h in &hits {
1459            assert!(bm.get(*h));
1460            assert!(idx.binary_search(&(*h as u32)).is_ok());
1461        }
1462    }
1463
1464    #[test]
1465    fn hybrid_chunks_pick_local_modes() {
1466        // 8 chunks × 4096 rows = 32_768 rows. Layout chosen so global density
1467        // lands in the mid band (~16%) so the classifier picks Hybrid:
1468        //   Chunk 0: All           (4096 hits)
1469        //   Chunks 1, 4, 5, 6, 7:  Empty (0 hits)
1470        //   Chunk 2: Sparse        (5 hits)
1471        //   Chunk 3: Dense         (1024 hits, 25% of chunk — past per-chunk sparse threshold)
1472        let nrows = 8 * HYBRID_CHUNK_SIZE;
1473        let mut hits = Vec::new();
1474        for i in 0..HYBRID_CHUNK_SIZE {
1475            hits.push(i);
1476        }
1477        for k in 0..5 {
1478            hits.push(2 * HYBRID_CHUNK_SIZE + k * 100);
1479        }
1480        for off in (0..HYBRID_CHUNK_SIZE).step_by(4) {
1481            hits.push(3 * HYBRID_CHUNK_SIZE + off);
1482        }
1483        let s = classify_from_bools(&mk_bools(nrows, &hits));
1484        assert_eq!(s.explain_selection_mode(), "Hybrid");
1485        if let AdaptiveSelection::Hybrid { chunks, .. } = &s {
1486            assert_eq!(chunks.len(), 8);
1487            assert!(matches!(chunks[0], HybridChunk::All));
1488            assert!(matches!(chunks[1], HybridChunk::Empty));
1489            assert!(matches!(chunks[2], HybridChunk::Sparse(_)));
1490            assert!(matches!(chunks[3], HybridChunk::Dense(_)));
1491            assert!(matches!(chunks[7], HybridChunk::Empty));
1492        } else {
1493            panic!("expected Hybrid");
1494        }
1495    }
1496
1497    // ── Merge-walk fast paths ────────────────────────────────────────
1498
1499    #[test]
1500    fn sparse_intersect_sparse_uses_merge_walk() {
1501        let a = classify_from_bools(&mk_bools(100_000, &[1, 5, 17, 99, 5_000, 50_000]));
1502        let b = classify_from_bools(&mk_bools(100_000, &[5, 17, 200, 5_000, 99_000]));
1503        assert_eq!(a.explain_selection_mode(), "SelectionVector");
1504        assert_eq!(b.explain_selection_mode(), "SelectionVector");
1505        let r = a.intersect(&b);
1506        // {5, 17, 5000} survive
1507        let collected: Vec<usize> = r.iter_indices().collect();
1508        assert_eq!(collected, vec![5usize, 17, 5_000]);
1509    }
1510
1511    #[test]
1512    fn sparse_union_sparse_uses_merge_walk() {
1513        let a = classify_from_bools(&mk_bools(100_000, &[1, 5, 17]));
1514        let b = classify_from_bools(&mk_bools(100_000, &[5, 99, 200]));
1515        let r = a.union(&b);
1516        let collected: Vec<usize> = r.iter_indices().collect();
1517        assert_eq!(collected, vec![1usize, 5, 17, 99, 200]);
1518    }
1519
1520    #[test]
1521    fn intersect_cardinality_identity_holds_across_modes() {
1522        // |A| + |B| = |A ∪ B| + |A ∩ B|
1523        let a = classify_from_bools(&mk_bools(100_000, &[1, 5, 17, 99, 5_000, 50_000]));
1524        let b = classify_from_bools(&mk_bools(100_000, &[5, 17, 200, 5_000, 99_000]));
1525        let inter = a.intersect(&b);
1526        let union = a.union(&b);
1527        assert_eq!(a.count() + b.count(), inter.count() + union.count());
1528    }
1529
1530    #[test]
1531    fn merge_walk_intersect_helper_is_correct() {
1532        let a = vec![1u32, 3, 5, 7, 9];
1533        let b = vec![2u32, 3, 5, 6, 9, 11];
1534        let out = sorted_merge_intersect(&a, &b);
1535        assert_eq!(out, vec![3, 5, 9]);
1536    }
1537
1538    #[test]
1539    fn merge_walk_union_helper_is_correct() {
1540        let a = vec![1u32, 3, 5];
1541        let b = vec![2u32, 3, 7];
1542        let out = sorted_merge_union(&a, &b);
1543        assert_eq!(out, vec![1, 2, 3, 5, 7]);
1544    }
1545
1546    // ── v3 Phase 3: Hybrid streaming set-op fast paths ───────────────
1547
1548    /// Build a Hybrid selection from explicit hits, asserting Hybrid arm.
1549    fn hybrid_from_hits(nrows: usize, hits: &[usize]) -> AdaptiveSelection {
1550        let s = classify_from_bools(&mk_bools(nrows, hits));
1551        assert_eq!(
1552            s.explain_selection_mode(),
1553            "Hybrid",
1554            "expected Hybrid for {nrows} rows × {} hits",
1555            hits.len()
1556        );
1557        s
1558    }
1559
1560    /// Brute-force oracle: materialize both sides and AND/OR via BitMask.
1561    fn oracle_intersect(a: &AdaptiveSelection, b: &AdaptiveSelection) -> Vec<usize> {
1562        let am = a.materialize_mask();
1563        let bm = b.materialize_mask();
1564        let n = am.nrows();
1565        let mut out = Vec::new();
1566        for i in 0..n {
1567            if am.get(i) && bm.get(i) {
1568                out.push(i);
1569            }
1570        }
1571        out
1572    }
1573    fn oracle_union(a: &AdaptiveSelection, b: &AdaptiveSelection) -> Vec<usize> {
1574        let am = a.materialize_mask();
1575        let bm = b.materialize_mask();
1576        let n = am.nrows();
1577        let mut out = Vec::new();
1578        for i in 0..n {
1579            if am.get(i) || bm.get(i) {
1580                out.push(i);
1581            }
1582        }
1583        out
1584    }
1585
1586    #[test]
1587    fn phase3_hybrid_intersect_hybrid_matches_oracle() {
1588        // Two mid-band selections over a 100k-row frame at different phases.
1589        let a_hits: Vec<usize> = (0..100_000).step_by(20).collect(); // 5%
1590        let b_hits: Vec<usize> = (0..100_000).step_by(15).collect(); // ~6.67%
1591        let a = hybrid_from_hits(100_000, &a_hits);
1592        let b = hybrid_from_hits(100_000, &b_hits);
1593        let r = a.intersect(&b);
1594        let collected: Vec<usize> = r.iter_indices().collect();
1595        assert_eq!(collected, oracle_intersect(&a, &b));
1596    }
1597
1598    #[test]
1599    fn phase3_hybrid_union_hybrid_matches_oracle() {
1600        let a_hits: Vec<usize> = (0..100_000).step_by(20).collect();
1601        let b_hits: Vec<usize> = (0..100_000).step_by(15).collect();
1602        let a = hybrid_from_hits(100_000, &a_hits);
1603        let b = hybrid_from_hits(100_000, &b_hits);
1604        let r = a.union(&b);
1605        let collected: Vec<usize> = r.iter_indices().collect();
1606        assert_eq!(collected, oracle_union(&a, &b));
1607    }
1608
1609    #[test]
1610    fn phase3_hybrid_intersect_sparse_matches_oracle() {
1611        let a_hits: Vec<usize> = (0..100_000).step_by(20).collect(); // Hybrid (5%)
1612        let b_hits: Vec<usize> = vec![100, 5_000, 50_020, 99_980]; // SelectionVector
1613        let a = hybrid_from_hits(100_000, &a_hits);
1614        let b = classify_from_bools(&mk_bools(100_000, &b_hits));
1615        assert_eq!(b.explain_selection_mode(), "SelectionVector");
1616        let r = a.intersect(&b);
1617        let collected: Vec<usize> = r.iter_indices().collect();
1618        assert_eq!(collected, oracle_intersect(&a, &b));
1619        // Symmetric
1620        let r2 = b.intersect(&a);
1621        let collected2: Vec<usize> = r2.iter_indices().collect();
1622        assert_eq!(collected2, oracle_intersect(&a, &b));
1623    }
1624
1625    #[test]
1626    fn phase3_hybrid_union_sparse_matches_oracle() {
1627        let a_hits: Vec<usize> = (0..100_000).step_by(20).collect();
1628        let b_hits: Vec<usize> = vec![1, 50, 100, 5_000, 50_020, 99_999];
1629        let a = hybrid_from_hits(100_000, &a_hits);
1630        let b = classify_from_bools(&mk_bools(100_000, &b_hits));
1631        assert_eq!(b.explain_selection_mode(), "SelectionVector");
1632        let r = a.union(&b);
1633        let collected: Vec<usize> = r.iter_indices().collect();
1634        assert_eq!(collected, oracle_union(&a, &b));
1635    }
1636
1637    #[test]
1638    fn phase3_hybrid_intersect_verbatim_matches_oracle() {
1639        let a_hits: Vec<usize> = (0..100_000).step_by(20).collect(); // Hybrid
1640        let b_hits: Vec<usize> = (0..100_000).step_by(2).collect(); // 50% → VerbatimMask
1641        let a = hybrid_from_hits(100_000, &a_hits);
1642        let b = classify_from_bools(&mk_bools(100_000, &b_hits));
1643        assert_eq!(b.explain_selection_mode(), "VerbatimMask");
1644        let r = a.intersect(&b);
1645        let collected: Vec<usize> = r.iter_indices().collect();
1646        assert_eq!(collected, oracle_intersect(&a, &b));
1647        // Symmetric
1648        let r2 = b.intersect(&a);
1649        assert_eq!(
1650            r2.iter_indices().collect::<Vec<usize>>(),
1651            oracle_intersect(&a, &b)
1652        );
1653    }
1654
1655    #[test]
1656    fn phase3_hybrid_union_verbatim_matches_oracle() {
1657        let a_hits: Vec<usize> = (0..100_000).step_by(20).collect();
1658        let b_hits: Vec<usize> = (0..100_000).step_by(2).collect();
1659        let a = hybrid_from_hits(100_000, &a_hits);
1660        let b = classify_from_bools(&mk_bools(100_000, &b_hits));
1661        let r = a.union(&b);
1662        let collected: Vec<usize> = r.iter_indices().collect();
1663        assert_eq!(collected, oracle_union(&a, &b));
1664    }
1665
1666    #[test]
1667    fn phase3_hybrid_chain_intersect_three_way() {
1668        let a_hits: Vec<usize> = (0..100_000).step_by(20).collect();
1669        let b_hits: Vec<usize> = (0..100_000).step_by(15).collect();
1670        let c_hits: Vec<usize> = (0..100_000).step_by(12).collect();
1671        let a = hybrid_from_hits(100_000, &a_hits);
1672        let b = hybrid_from_hits(100_000, &b_hits);
1673        let c = hybrid_from_hits(100_000, &c_hits);
1674        let abc = a.intersect(&b).intersect(&c);
1675        let cba = c.intersect(&b).intersect(&a);
1676        assert_eq!(abc.materialize_indices(), cba.materialize_indices());
1677        // Compare against scalar oracle.
1678        let am = a.materialize_mask();
1679        let bm = b.materialize_mask();
1680        let cm = c.materialize_mask();
1681        let oracle: Vec<usize> = (0..100_000)
1682            .filter(|&i| am.get(i) && bm.get(i) && cm.get(i))
1683            .collect();
1684        assert_eq!(abc.iter_indices().collect::<Vec<usize>>(), oracle);
1685    }
1686
1687    #[test]
1688    fn phase3_per_chunk_intersect_sparse_sparse_uses_merge_walk() {
1689        // Two Hybrids whose corresponding chunks are both Sparse — verifies
1690        // the merge-walk path actually fires and produces correct sparse output.
1691        // Stride 200 over 100k = 500 hits; per-chunk count ≈ 4096/200 ≈ 20 (well
1692        // below 128 sparse threshold), so chunks are Sparse-shaped.
1693        let a_hits: Vec<usize> = (0..100_000).step_by(200).collect();
1694        let b_hits: Vec<usize> = (100..100_000).step_by(200).collect();
1695        let a = hybrid_from_hits(100_000, &a_hits);
1696        let b = hybrid_from_hits(100_000, &b_hits);
1697        // Disjoint hits → intersection is empty.
1698        let r = a.intersect(&b);
1699        assert_eq!(r.count(), 0);
1700        assert_eq!(r.explain_selection_mode(), "Empty");
1701    }
1702
1703    #[test]
1704    fn phase3_per_chunk_union_sparse_sparse_promotes_to_dense_when_large() {
1705        // Force per-chunk overflow past sparse threshold (128 hits/chunk):
1706        // stride 30 across 100k = ~3333 hits, per-chunk ≈ 4096/30 ≈ 136 → just
1707        // above sparse threshold per-chunk. Union with itself stays sparse-shaped
1708        // via merge-walk, but cross-streamed unions can promote.
1709        let a_hits: Vec<usize> = (0..100_000).step_by(30).collect();
1710        let b_hits: Vec<usize> = (15..100_000).step_by(30).collect();
1711        let a_count = a_hits.len();
1712        let b_count = b_hits.len();
1713        let a = hybrid_from_hits(100_000, &a_hits);
1714        let b = hybrid_from_hits(100_000, &b_hits);
1715        let r = a.union(&b);
1716        // Disjoint hits → union count = a_count + b_count.
1717        assert_eq!(r.count(), a_count + b_count);
1718        // Confirm content matches oracle.
1719        assert_eq!(
1720            r.iter_indices().collect::<Vec<usize>>(),
1721            oracle_union(&a, &b)
1722        );
1723    }
1724
1725    #[test]
1726    fn phase3_simplify_hybrid_collapses_to_all_when_full() {
1727        // a + b cover every row → simplify_hybrid must collapse to All.
1728        let a_hits: Vec<usize> = (0..100_000).filter(|i| i % 2 == 0).collect();
1729        let b_hits: Vec<usize> = (0..100_000).filter(|i| i % 2 == 1).collect();
1730        // Each is 50% over 100k → VerbatimMask, not Hybrid. Build Hybrid by
1731        // forcing mid-band: take 6/10 of rows in each side so mid-band fires.
1732        let a_hits: Vec<usize> = (0..100_000).filter(|i| i % 5 != 0).collect(); // 80% — dense
1733        let b_hits: Vec<usize> = (0..100_000).step_by(5).collect(); // 20%
1734        let _ = (a_hits, b_hits); // unused — see next assertion-style test.
1735        // Instead: assert simplify_hybrid directly on a fully-set chunked arg.
1736        let nrows = 8 * HYBRID_CHUNK_SIZE;
1737        let chunks = vec![HybridChunk::All; 8];
1738        let s = AdaptiveSelection::simplify_hybrid(nrows, chunks);
1739        assert_eq!(s.explain_selection_mode(), "All");
1740        assert_eq!(s.count(), nrows);
1741    }
1742
1743    #[test]
1744    fn phase3_simplify_hybrid_collapses_to_empty_when_drained() {
1745        let nrows = 8 * HYBRID_CHUNK_SIZE;
1746        let chunks = vec![HybridChunk::Empty; 8];
1747        let s = AdaptiveSelection::simplify_hybrid(nrows, chunks);
1748        assert_eq!(s.explain_selection_mode(), "Empty");
1749        assert_eq!(s.count(), 0);
1750    }
1751
1752    #[test]
1753    fn phase3_per_chunk_helpers_handle_partial_final_chunk() {
1754        // 9000 rows = 2 full chunks + 1 partial (808 rows). Mid-band density.
1755        let nrows = 2 * HYBRID_CHUNK_SIZE + 808;
1756        let a_hits: Vec<usize> = (0..nrows).step_by(20).collect();
1757        let b_hits: Vec<usize> = (0..nrows).step_by(15).collect();
1758        let a = hybrid_from_hits(nrows, &a_hits);
1759        let b = hybrid_from_hits(nrows, &b_hits);
1760        let r = a.intersect(&b);
1761        let collected: Vec<usize> = r.iter_indices().collect();
1762        assert_eq!(collected, oracle_intersect(&a, &b));
1763        // No row should leak past nrows.
1764        for &row in &collected {
1765            assert!(row < nrows, "row {row} out of bounds");
1766        }
1767    }
1768
1769    #[test]
1770    fn phase3_intersect_chunks_helper_dense_and_sparse() {
1771        // Direct unit test on intersect_chunks: Sparse vs Dense, pinning that
1772        // we get a sparse result with correct offsets.
1773        let chunk_len = HYBRID_CHUNK_SIZE;
1774        let sparse = HybridChunk::Sparse(vec![0u16, 5, 17, 100, 4000]);
1775        // Dense bitmap with bit 5, 17, 4000 set.
1776        let mut dense_words = vec![0u64; HYBRID_WORDS_PER_CHUNK];
1777        for off in [5usize, 17, 4000] {
1778            dense_words[off / 64] |= 1u64 << (off % 64);
1779        }
1780        let dense = HybridChunk::Dense(dense_words.into_boxed_slice());
1781        let r = intersect_chunks(&sparse, &dense, chunk_len);
1782        match r {
1783            HybridChunk::Sparse(offs) => assert_eq!(offs, vec![5, 17, 4000]),
1784            other => panic!("expected Sparse, got {other:?}"),
1785        }
1786    }
1787
1788    #[test]
1789    fn phase3_union_chunks_helper_demotes_to_sparse_when_small() {
1790        // Sparse ∪ Sparse below threshold stays Sparse.
1791        let chunk_len = HYBRID_CHUNK_SIZE;
1792        let a = HybridChunk::Sparse(vec![1u16, 2, 3]);
1793        let b = HybridChunk::Sparse(vec![4u16, 5, 6]);
1794        let r = union_chunks(&a, &b, chunk_len);
1795        match r {
1796            HybridChunk::Sparse(offs) => assert_eq!(offs, vec![1, 2, 3, 4, 5, 6]),
1797            other => panic!("expected Sparse, got {other:?}"),
1798        }
1799    }
1800
1801    #[test]
1802    fn phase3_union_chunks_helper_promotes_to_dense_above_threshold() {
1803        // Build two disjoint sparse chunks each with 70 hits (140 union >
1804        // 128 threshold) → output must be Dense.
1805        let chunk_len = HYBRID_CHUNK_SIZE;
1806        let a_offs: Vec<u16> = (0..70).map(|i| i * 2).collect();
1807        let b_offs: Vec<u16> = (0..70).map(|i| i * 2 + 1).collect();
1808        let a = HybridChunk::Sparse(a_offs);
1809        let b = HybridChunk::Sparse(b_offs);
1810        let r = union_chunks(&a, &b, chunk_len);
1811        match r {
1812            HybridChunk::Dense(words) => {
1813                let count: usize = words.iter().map(|w| w.count_ones() as usize).sum();
1814                assert_eq!(count, 140);
1815            }
1816            other => panic!("expected Dense (overflow), got {other:?}"),
1817        }
1818    }
1819}