Skip to main content

cjc_data/
predicate_bytecode.rs

1//! Predicate bytecode for `TidyView::filter` — Adaptive Engine v2.1.
2//!
3//! Lowers a `DExpr` predicate to a flat stack-bytecode program once per
4//! `filter()` call, then interprets it in a tight loop. Produces bit-
5//! identical output to the legacy AST-walk path (`try_eval_predicate_columnar`)
6//! on every shape it accepts; `lower` returns `None` for unsupported shapes
7//! so the caller falls through to row-wise evaluation, exactly as before.
8//!
9//! Why bytecode rather than recursive AST descent?
10//! - Lowering happens once. Interpretation is a flat loop over a `Vec<PredicateOp>`
11//!   with no per-node recursion or `Option`-return error plumbing.
12//! - The supported predicate language is closed: leaves are always
13//!   `Col op Lit` and interior nodes are always `And`/`Or`. Three opcodes
14//!   are enough — no symbol table, no scopes.
15//! - Determinism is preserved by construction: every `Cmp` opcode delegates
16//!   to the same `columnar_cmp_*` kernels the AST walk used. Floating-point
17//!   semantics, NaN handling, and i64→f64 promotion all match bit-for-bit.
18
19use crate::{columnar_cmp_f64, columnar_cmp_i64, nwords_for, BitMask, Column, DBinOp, DExpr, DataFrame};
20
21#[derive(Clone, Copy, Debug, PartialEq, Eq)]
22pub enum CmpKind {
23    Gt,
24    Lt,
25    Ge,
26    Le,
27    Eq,
28    Ne,
29}
30
31impl CmpKind {
32    fn from_dbinop(op: DBinOp) -> Option<Self> {
33        match op {
34            DBinOp::Gt => Some(Self::Gt),
35            DBinOp::Lt => Some(Self::Lt),
36            DBinOp::Ge => Some(Self::Ge),
37            DBinOp::Le => Some(Self::Le),
38            DBinOp::Eq => Some(Self::Eq),
39            DBinOp::Ne => Some(Self::Ne),
40            _ => None,
41        }
42    }
43
44    fn to_dbinop(self) -> DBinOp {
45        match self {
46            Self::Gt => DBinOp::Gt,
47            Self::Lt => DBinOp::Lt,
48            Self::Ge => DBinOp::Ge,
49            Self::Le => DBinOp::Le,
50            Self::Eq => DBinOp::Eq,
51            Self::Ne => DBinOp::Ne,
52        }
53    }
54
55    /// Flip the relation when literal is on the LHS: `5 > col` becomes `col < 5`.
56    /// `Eq`/`Ne` are symmetric.
57    fn flip(self) -> Self {
58        match self {
59            Self::Gt => Self::Lt,
60            Self::Lt => Self::Gt,
61            Self::Ge => Self::Le,
62            Self::Le => Self::Ge,
63            Self::Eq => Self::Eq,
64            Self::Ne => Self::Ne,
65        }
66    }
67}
68
69/// One leaf comparison, fully resolved at lowering time.
70///
71/// The four variants correspond to the four arms in
72/// `try_eval_predicate_columnar`: a Float column compared to a float literal,
73/// a Float column compared to an int literal (literal promoted to f64), an
74/// Int column compared to an int literal, or an Int column compared to a float
75/// literal (column promoted to f64 at run time).
76#[derive(Clone, Copy, Debug)]
77pub enum LeafKind {
78    FloatColFloatLit { col_idx: usize, lit: f64 },
79    FloatColIntLit { col_idx: usize, lit: i64 },
80    IntColIntLit { col_idx: usize, lit: i64 },
81    IntColFloatLit { col_idx: usize, lit: f64 },
82}
83
84#[derive(Clone, Debug)]
85pub enum PredicateOp {
86    /// Push the bitmask resulting from comparing a column against a literal.
87    Cmp { kind: LeafKind, op: CmpKind },
88    /// Pop two masks; push their bitwise AND.
89    And,
90    /// Pop two masks; push their bitwise OR.
91    Or,
92}
93
94#[derive(Clone, Debug)]
95pub struct PredicateBytecode {
96    ops: Vec<PredicateOp>,
97}
98
99impl PredicateBytecode {
100    /// Read-only view of the lowered op sequence. Useful for tests and for
101    /// the optional optimizer pass landing in v2.2.
102    pub fn ops(&self) -> &[PredicateOp] {
103        &self.ops
104    }
105
106    /// Lower a `DExpr` predicate to bytecode. Returns `None` if the predicate
107    /// shape is not supported by the columnar fast path (caller falls through
108    /// to row-wise evaluation, exactly as the AST-walk path did).
109    pub fn lower(predicate: &DExpr, base: &DataFrame) -> Option<Self> {
110        let mut ops = Vec::new();
111        lower_into(predicate, base, &mut ops)?;
112        Some(PredicateBytecode { ops })
113    }
114
115    /// Sparse-aware interpretation (v2.2). Evaluates the predicate over only
116    /// the rows listed in `existing_indices` (must be ascending and a subset
117    /// of `0..nrows`). Result bits are set exclusively for indices in the
118    /// input set, so no final AND with `existing_mask` is needed — the
119    /// caller has effectively pre-applied it via index materialization.
120    ///
121    /// Bit-identical to `interpret(base, &existing_mask)` whenever
122    /// `existing_indices == existing_mask.iter_set().collect()`. The win
123    /// is amortizing per-`Cmp` cost from O(nrows) to O(|existing_indices|),
124    /// which is meaningful when the parent selection has already narrowed
125    /// the row set substantially (typical for chained `.filter(...).filter(...)`).
126    ///
127    /// Caller is responsible for the density decision (see
128    /// `should_use_sparse_path`). Below that threshold, the random-access
129    /// gather here beats the sequential column scan in `interpret`.
130    pub fn interpret_sparse(
131        &self,
132        base: &DataFrame,
133        existing_indices: &[usize],
134        nrows: usize,
135    ) -> BitMask {
136        let nwords = nwords_for(nrows);
137        let mut stack: Vec<Vec<u64>> = Vec::with_capacity(4);
138
139        for op in &self.ops {
140            match op {
141                PredicateOp::Cmp { kind, op: cop } => {
142                    let mut words = vec![0u64; nwords];
143                    let dop = cop.to_dbinop();
144                    match *kind {
145                        LeafKind::FloatColFloatLit { col_idx, lit } => {
146                            if let Column::Float(data) = &base.columns[col_idx].1 {
147                                for &i in existing_indices {
148                                    if scalar_cmp_f64(data[i], lit, dop) {
149                                        words[i / 64] |= 1u64 << (i % 64);
150                                    }
151                                }
152                            }
153                        }
154                        LeafKind::FloatColIntLit { col_idx, lit } => {
155                            if let Column::Float(data) = &base.columns[col_idx].1 {
156                                let lit_f = lit as f64;
157                                for &i in existing_indices {
158                                    if scalar_cmp_f64(data[i], lit_f, dop) {
159                                        words[i / 64] |= 1u64 << (i % 64);
160                                    }
161                                }
162                            }
163                        }
164                        LeafKind::IntColIntLit { col_idx, lit } => {
165                            if let Column::Int(data) = &base.columns[col_idx].1 {
166                                for &i in existing_indices {
167                                    if scalar_cmp_i64(data[i], lit, dop) {
168                                        words[i / 64] |= 1u64 << (i % 64);
169                                    }
170                                }
171                            }
172                        }
173                        LeafKind::IntColFloatLit { col_idx, lit } => {
174                            if let Column::Int(data) = &base.columns[col_idx].1 {
175                                for &i in existing_indices {
176                                    if scalar_cmp_f64(data[i] as f64, lit, dop) {
177                                        words[i / 64] |= 1u64 << (i % 64);
178                                    }
179                                }
180                            }
181                        }
182                    }
183                    stack.push(words);
184                }
185                PredicateOp::And => {
186                    let r = stack.pop().expect("predicate bytecode: And on empty stack");
187                    let l = stack.pop().expect("predicate bytecode: And on empty stack");
188                    let merged: Vec<u64> = l.iter().zip(r.iter()).map(|(a, b)| a & b).collect();
189                    stack.push(merged);
190                }
191                PredicateOp::Or => {
192                    let r = stack.pop().expect("predicate bytecode: Or on empty stack");
193                    let l = stack.pop().expect("predicate bytecode: Or on empty stack");
194                    let merged: Vec<u64> = l.iter().zip(r.iter()).map(|(a, b)| a | b).collect();
195                    stack.push(merged);
196                }
197            }
198        }
199
200        let top = stack
201            .pop()
202            .expect("predicate bytecode: empty stack after interpretation");
203
204        // No final AND with existing_mask needed: AND/OR are monotone, so
205        // the result is already bounded by the input set, and the input set
206        // is by precondition a subset of the existing mask.
207        BitMask::from_words_for_test(top, nrows)
208    }
209
210    /// Interpret the bytecode against `base`, AND-merging the result with
211    /// `existing_mask`. Result is bit-identical to AST-walk on every shape
212    /// `lower` accepts.
213    /// Evaluate the predicate **without** ANDing into an existing mask,
214    /// classifying the result as an `AdaptiveSelection`. Used by the
215    /// v3 Phase 3 production wiring path: when the existing selection is
216    /// already a Hybrid (mid-band density, large nrows), we want the
217    /// fresh predicate result to also be classified so the downstream
218    /// `existing.intersect(fresh)` call can route through Phase 3's
219    /// per-chunk dispatch instead of materialising-and-AND.
220    ///
221    /// Bit-equivalent to `interpret` followed by `AdaptiveSelection::
222    /// from_predicate_result(...)`, but skips the per-call existing-mask
223    /// AND so the classification reflects the predicate's own selectivity.
224    pub fn evaluate_to_selection(
225        &self,
226        base: &DataFrame,
227        nrows: usize,
228    ) -> crate::adaptive_selection::AdaptiveSelection {
229        let words = self.evaluate_words(base, nrows);
230        crate::adaptive_selection::AdaptiveSelection::from_predicate_result(words, nrows)
231    }
232
233    /// Internal: evaluate the predicate and return the raw `Vec<u64>`
234    /// bitmap words. Shared between `interpret` and
235    /// `evaluate_to_selection`. Caller decides whether to AND with an
236    /// existing mask, classify into AdaptiveSelection, or just wrap as
237    /// BitMask.
238    fn evaluate_words(&self, base: &DataFrame, nrows: usize) -> Vec<u64> {
239        let nwords = nwords_for(nrows);
240        let mut stack: Vec<Vec<u64>> = Vec::with_capacity(4);
241        for op in &self.ops {
242            match op {
243                PredicateOp::Cmp { kind, op: cop } => {
244                    let mut words = vec![0u64; nwords];
245                    let dop = cop.to_dbinop();
246                    match *kind {
247                        LeafKind::FloatColFloatLit { col_idx, lit } => {
248                            if let Column::Float(data) = &base.columns[col_idx].1 {
249                                columnar_cmp_f64(data, lit, dop, &mut words);
250                            }
251                        }
252                        LeafKind::FloatColIntLit { col_idx, lit } => {
253                            if let Column::Float(data) = &base.columns[col_idx].1 {
254                                columnar_cmp_f64(data, lit as f64, dop, &mut words);
255                            }
256                        }
257                        LeafKind::IntColIntLit { col_idx, lit } => {
258                            if let Column::Int(data) = &base.columns[col_idx].1 {
259                                columnar_cmp_i64(data, lit, dop, &mut words);
260                            }
261                        }
262                        LeafKind::IntColFloatLit { col_idx, lit } => {
263                            if let Column::Int(data) = &base.columns[col_idx].1 {
264                                let floats: Vec<f64> = data.iter().map(|&x| x as f64).collect();
265                                columnar_cmp_f64(&floats, lit, dop, &mut words);
266                            }
267                        }
268                    }
269                    stack.push(words);
270                }
271                PredicateOp::And => {
272                    let r = stack.pop().expect("predicate bytecode: And on empty stack");
273                    let l = stack.pop().expect("predicate bytecode: And on empty stack");
274                    let merged: Vec<u64> = l.iter().zip(r.iter()).map(|(a, b)| a & b).collect();
275                    stack.push(merged);
276                }
277                PredicateOp::Or => {
278                    let r = stack.pop().expect("predicate bytecode: Or on empty stack");
279                    let l = stack.pop().expect("predicate bytecode: Or on empty stack");
280                    let merged: Vec<u64> = l.iter().zip(r.iter()).map(|(a, b)| a | b).collect();
281                    stack.push(merged);
282                }
283            }
284        }
285        stack.pop().expect("predicate bytecode: empty stack after interpretation")
286    }
287
288    pub fn interpret(&self, base: &DataFrame, existing_mask: &BitMask) -> BitMask {
289        let nrows = existing_mask.nrows();
290        let mut top = self.evaluate_words(base, nrows);
291        // Final AND with existing mask. Equivalent to AST-walk's per-leaf
292        // AND because bitwise AND distributes over AND/OR trees:
293        //   ((a | b) & c) & m == (a | b) & (c & m)  etc.
294        for (w, ew) in top.iter_mut().zip(existing_mask.words_slice().iter()) {
295            *w &= *ew;
296        }
297        BitMask::from_words_for_test(top, nrows)
298    }
299}
300
301fn lower_into(predicate: &DExpr, base: &DataFrame, ops: &mut Vec<PredicateOp>) -> Option<()> {
302    match predicate {
303        DExpr::BinOp {
304            op: DBinOp::And,
305            left,
306            right,
307        } => {
308            lower_into(left, base, ops)?;
309            lower_into(right, base, ops)?;
310            ops.push(PredicateOp::And);
311            Some(())
312        }
313        DExpr::BinOp {
314            op: DBinOp::Or,
315            left,
316            right,
317        } => {
318            lower_into(left, base, ops)?;
319            lower_into(right, base, ops)?;
320            ops.push(PredicateOp::Or);
321            Some(())
322        }
323        DExpr::BinOp { op, left, right } => {
324            let cmp = CmpKind::from_dbinop(*op)?;
325
326            #[derive(Clone, Copy)]
327            enum LitVal {
328                F(f64),
329                I(i64),
330            }
331
332            let (col_name, lit, reversed) = match (left.as_ref(), right.as_ref()) {
333                (DExpr::Col(n), DExpr::LitFloat(v)) => (n.as_str(), LitVal::F(*v), false),
334                (DExpr::LitFloat(v), DExpr::Col(n)) => (n.as_str(), LitVal::F(*v), true),
335                (DExpr::Col(n), DExpr::LitInt(v)) => (n.as_str(), LitVal::I(*v), false),
336                (DExpr::LitInt(v), DExpr::Col(n)) => (n.as_str(), LitVal::I(*v), true),
337                _ => return None,
338            };
339
340            let effective = if reversed { cmp.flip() } else { cmp };
341
342            let col_idx = base.columns.iter().position(|(n, _)| n == col_name)?;
343            let kind = match (&base.columns[col_idx].1, lit) {
344                (Column::Float(_), LitVal::F(v)) => LeafKind::FloatColFloatLit { col_idx, lit: v },
345                (Column::Float(_), LitVal::I(v)) => LeafKind::FloatColIntLit { col_idx, lit: v },
346                (Column::Int(_), LitVal::I(v)) => LeafKind::IntColIntLit { col_idx, lit: v },
347                (Column::Int(_), LitVal::F(v)) => LeafKind::IntColFloatLit { col_idx, lit: v },
348                _ => return None,
349            };
350
351            ops.push(PredicateOp::Cmp {
352                kind,
353                op: effective,
354            });
355            Some(())
356        }
357        _ => None,
358    }
359}
360
361#[inline]
362fn scalar_cmp_f64(val: f64, lit: f64, op: DBinOp) -> bool {
363    match op {
364        DBinOp::Gt => val > lit,
365        DBinOp::Lt => val < lit,
366        DBinOp::Ge => val >= lit,
367        DBinOp::Le => val <= lit,
368        DBinOp::Eq => val == lit,
369        DBinOp::Ne => val != lit,
370        _ => false,
371    }
372}
373
374#[inline]
375fn scalar_cmp_i64(val: i64, lit: i64, op: DBinOp) -> bool {
376    match op {
377        DBinOp::Gt => val > lit,
378        DBinOp::Lt => val < lit,
379        DBinOp::Ge => val >= lit,
380        DBinOp::Le => val <= lit,
381        DBinOp::Eq => val == lit,
382        DBinOp::Ne => val != lit,
383        _ => false,
384    }
385}
386
387/// Density rule for picking `interpret_sparse` over `interpret`.
388///
389/// Below ~25% density the random-access gather over `existing_indices`
390/// beats the sequential column scan: each `Cmp` opcode does ~`count`
391/// scattered loads vs ~`nrows` sequential loads, and the sequential
392/// streaming-prefetch advantage is roughly 4× per byte on modern x86.
393/// Empirical break-even tracked the 25% boundary closely on the
394/// `bench_density_crossover` workload, so we encode it directly as
395/// `count * 4 < nrows` (integer-only — no float density boundary, per
396/// the determinism contract).
397#[inline]
398pub fn should_use_sparse_path(count: usize, nrows: usize) -> bool {
399    count.saturating_mul(4) < nrows
400}
401
402#[cfg(test)]
403mod tests {
404    use super::*;
405    use crate::Column;
406
407    fn df_with_int(name: &str, xs: Vec<i64>) -> DataFrame {
408        DataFrame::from_columns(vec![(name.into(), Column::Int(xs))]).unwrap()
409    }
410
411    fn pred_lt(col: &str, v: i64) -> DExpr {
412        DExpr::BinOp {
413            op: DBinOp::Lt,
414            left: Box::new(DExpr::Col(col.into())),
415            right: Box::new(DExpr::LitInt(v)),
416        }
417    }
418
419    #[test]
420    fn lower_simple_lt() {
421        let df = df_with_int("x", (0..10).collect());
422        let bc = PredicateBytecode::lower(&pred_lt("x", 5), &df).unwrap();
423        assert_eq!(bc.ops().len(), 1);
424        match &bc.ops()[0] {
425            PredicateOp::Cmp {
426                kind: LeafKind::IntColIntLit { col_idx, lit },
427                op,
428            } => {
429                assert_eq!(*col_idx, 0);
430                assert_eq!(*lit, 5);
431                assert_eq!(*op, CmpKind::Lt);
432            }
433            other => panic!("unexpected op: {:?}", other),
434        }
435    }
436
437    #[test]
438    fn lower_reversed_lit_op_col_flips() {
439        // 5 > x  becomes  x < 5
440        let df = df_with_int("x", (0..10).collect());
441        let pred = DExpr::BinOp {
442            op: DBinOp::Gt,
443            left: Box::new(DExpr::LitInt(5)),
444            right: Box::new(DExpr::Col("x".into())),
445        };
446        let bc = PredicateBytecode::lower(&pred, &df).unwrap();
447        match &bc.ops()[0] {
448            PredicateOp::Cmp { op, .. } => assert_eq!(*op, CmpKind::Lt),
449            other => panic!("unexpected op: {:?}", other),
450        }
451    }
452
453    #[test]
454    fn lower_and_emits_postorder() {
455        let df = df_with_int("x", (0..10).collect());
456        let pred = DExpr::BinOp {
457            op: DBinOp::And,
458            left: Box::new(pred_lt("x", 5)),
459            right: Box::new(pred_lt("x", 8)),
460        };
461        let bc = PredicateBytecode::lower(&pred, &df).unwrap();
462        assert_eq!(bc.ops().len(), 3);
463        assert!(matches!(bc.ops()[0], PredicateOp::Cmp { .. }));
464        assert!(matches!(bc.ops()[1], PredicateOp::Cmp { .. }));
465        assert!(matches!(bc.ops()[2], PredicateOp::And));
466    }
467
468    #[test]
469    fn lower_unsupported_returns_none() {
470        // Add op is not a comparison; not handled.
471        let df = df_with_int("x", (0..10).collect());
472        let pred = DExpr::BinOp {
473            op: DBinOp::Add,
474            left: Box::new(DExpr::Col("x".into())),
475            right: Box::new(DExpr::LitInt(1)),
476        };
477        assert!(PredicateBytecode::lower(&pred, &df).is_none());
478    }
479
480    #[test]
481    fn lower_unknown_column_returns_none() {
482        let df = df_with_int("x", (0..10).collect());
483        assert!(PredicateBytecode::lower(&pred_lt("ghost", 5), &df).is_none());
484    }
485
486    #[test]
487    fn interpret_simple_lt_matches_count() {
488        let df = df_with_int("x", (0..100).collect());
489        let bc = PredicateBytecode::lower(&pred_lt("x", 30), &df).unwrap();
490        let mask = bc.interpret(&df, &BitMask::all_true(100));
491        assert_eq!(mask.count_ones(), 30);
492        for i in 0..30 {
493            assert!(mask.get(i));
494        }
495        for i in 30..100 {
496            assert!(!mask.get(i));
497        }
498    }
499
500    #[test]
501    fn interpret_and_intersects() {
502        let df = df_with_int("x", (0..100).collect());
503        // x < 50 AND x < 30  ==  x < 30
504        let pred = DExpr::BinOp {
505            op: DBinOp::And,
506            left: Box::new(pred_lt("x", 50)),
507            right: Box::new(pred_lt("x", 30)),
508        };
509        let bc = PredicateBytecode::lower(&pred, &df).unwrap();
510        let mask = bc.interpret(&df, &BitMask::all_true(100));
511        assert_eq!(mask.count_ones(), 30);
512    }
513
514    #[test]
515    fn interpret_or_unions() {
516        let df = df_with_int("x", (0..100).collect());
517        // x < 30 OR x < 70  ==  x < 70
518        let pred = DExpr::BinOp {
519            op: DBinOp::Or,
520            left: Box::new(pred_lt("x", 30)),
521            right: Box::new(pred_lt("x", 70)),
522        };
523        let bc = PredicateBytecode::lower(&pred, &df).unwrap();
524        let mask = bc.interpret(&df, &BitMask::all_true(100));
525        assert_eq!(mask.count_ones(), 70);
526    }
527
528    #[test]
529    fn interpret_respects_existing_mask() {
530        let df = df_with_int("x", (0..100).collect());
531        // existing_mask = only odd indices
532        let mut bools = vec![false; 100];
533        for i in (1..100).step_by(2) {
534            bools[i] = true;
535        }
536        let existing = BitMask::from_bools(&bools);
537
538        let bc = PredicateBytecode::lower(&pred_lt("x", 50), &df).unwrap();
539        let mask = bc.interpret(&df, &existing);
540
541        // x < 50 has 50 hits; intersecting with odd-only gives indices 1,3,…,49 = 25 hits.
542        assert_eq!(mask.count_ones(), 25);
543    }
544
545    #[test]
546    fn interpret_int_col_float_lit_promotes() {
547        // Verifies the IntColFloatLit arm reaches columnar_cmp_f64 with
548        // i64→f64-promoted column data.
549        let df = df_with_int("x", vec![1, 2, 3, 4, 5]);
550        let pred = DExpr::BinOp {
551            op: DBinOp::Lt,
552            left: Box::new(DExpr::Col("x".into())),
553            right: Box::new(DExpr::LitFloat(3.5)),
554        };
555        let bc = PredicateBytecode::lower(&pred, &df).unwrap();
556        let mask = bc.interpret(&df, &BitMask::all_true(5));
557        assert_eq!(mask.count_ones(), 3); // 1, 2, 3
558    }
559
560    #[test]
561    fn interpret_nested_and_or() {
562        let df = df_with_int("x", (0..100).collect());
563        // (x < 10) OR (x > 90 AND x < 95)  ==  10 + 4 = 14
564        let pred = DExpr::BinOp {
565            op: DBinOp::Or,
566            left: Box::new(pred_lt("x", 10)),
567            right: Box::new(DExpr::BinOp {
568                op: DBinOp::And,
569                left: Box::new(DExpr::BinOp {
570                    op: DBinOp::Gt,
571                    left: Box::new(DExpr::Col("x".into())),
572                    right: Box::new(DExpr::LitInt(90)),
573                }),
574                right: Box::new(pred_lt("x", 95)),
575            }),
576        };
577        let bc = PredicateBytecode::lower(&pred, &df).unwrap();
578        let mask = bc.interpret(&df, &BitMask::all_true(100));
579        assert_eq!(mask.count_ones(), 14);
580    }
581
582    // ── v2.2 sparse-aware interpretation ───────────────────────────────
583
584    #[test]
585    fn sparse_path_density_threshold() {
586        // 25% boundary, integer-only.
587        assert!(should_use_sparse_path(0, 100));
588        assert!(should_use_sparse_path(24, 100));
589        assert!(!should_use_sparse_path(25, 100));
590        assert!(!should_use_sparse_path(50, 100));
591        // Edge: nrows == 0 (vacuously dense — no rows to gather).
592        assert!(!should_use_sparse_path(0, 0));
593    }
594
595    #[test]
596    fn interpret_sparse_simple_lt() {
597        let df = df_with_int("x", (0..100).collect());
598        let bc = PredicateBytecode::lower(&pred_lt("x", 30), &df).unwrap();
599        // Existing set is every other row (sparse: 50/100 < threshold trivially false,
600        // but use a sparse subset anyway to validate the gather)
601        let existing: Vec<usize> = (0..100).step_by(4).collect(); // 0, 4, 8, ..., 96 → 25 indices
602        let mask = bc.interpret_sparse(&df, &existing, 100);
603        // Pass condition: x < 30, restricted to 0,4,8,...,96 → 0,4,8,...,28 = 8 hits.
604        assert_eq!(mask.count_ones(), 8);
605        for &i in &existing {
606            assert_eq!(mask.get(i), i < 30);
607        }
608        // Bits outside existing must be zero.
609        for i in 0..100 {
610            if !existing.contains(&i) {
611                assert!(!mask.get(i), "bit {} set but not in existing", i);
612            }
613        }
614    }
615
616    #[test]
617    fn interpret_sparse_matches_dense_on_same_inputs() {
618        // Parity oracle: for any predicate accepted by lower(), the sparse
619        // path with `existing_indices = existing_mask.iter_set()` produces
620        // bit-identical output to the dense path.
621        let df = df_with_int("x", (-100..100).collect());
622
623        let preds = [
624            pred_lt("x", 0),
625            pred_lt("x", 50),
626            DExpr::BinOp {
627                op: DBinOp::And,
628                left: Box::new(pred_lt("x", 50)),
629                right: Box::new(DExpr::BinOp {
630                    op: DBinOp::Gt,
631                    left: Box::new(DExpr::Col("x".into())),
632                    right: Box::new(DExpr::LitInt(-50)),
633                }),
634            },
635            DExpr::BinOp {
636                op: DBinOp::Or,
637                left: Box::new(pred_lt("x", -80)),
638                right: Box::new(DExpr::BinOp {
639                    op: DBinOp::Gt,
640                    left: Box::new(DExpr::Col("x".into())),
641                    right: Box::new(DExpr::LitInt(80)),
642                }),
643            },
644        ];
645
646        // Three masks: every other row, every fifth row, sparse 5%.
647        let masks: Vec<Vec<bool>> = vec![
648            (0..200).map(|i| i % 2 == 0).collect(),
649            (0..200).map(|i| i % 5 == 0).collect(),
650            (0..200).map(|i| i % 20 == 0).collect(),
651        ];
652
653        for pred in &preds {
654            let bc = PredicateBytecode::lower(pred, &df).unwrap();
655            for bools in &masks {
656                let bm = BitMask::from_bools(bools);
657                let dense = bc.interpret(&df, &bm);
658                let indices: Vec<usize> = bm.iter_set().collect();
659                let sparse = bc.interpret_sparse(&df, &indices, 200);
660                assert_eq!(
661                    dense.words_slice(),
662                    sparse.words_slice(),
663                    "sparse path diverged from dense path"
664                );
665            }
666        }
667    }
668
669    #[test]
670    fn interpret_sparse_or_monotone() {
671        // Tests that OR over sparse inputs stays bounded to the input set —
672        // i.e., a row that would pass OR in the full column scan but is not
673        // in `existing_indices` must NOT be set in the sparse output.
674        let df = df_with_int("x", (0..20).collect());
675        // Predicate: x < 5 OR x > 15 — would normally pass at 0..5 and 16..20.
676        let pred = DExpr::BinOp {
677            op: DBinOp::Or,
678            left: Box::new(pred_lt("x", 5)),
679            right: Box::new(DExpr::BinOp {
680                op: DBinOp::Gt,
681                left: Box::new(DExpr::Col("x".into())),
682                right: Box::new(DExpr::LitInt(15)),
683            }),
684        };
685        let bc = PredicateBytecode::lower(&pred, &df).unwrap();
686        // Existing: only even indices < 10. Of those, predicate passes at 0,2,4 (only).
687        let existing: Vec<usize> = (0..10).step_by(2).collect(); // 0,2,4,6,8
688        let mask = bc.interpret_sparse(&df, &existing, 20);
689        assert_eq!(mask.count_ones(), 3); // 0, 2, 4
690        for i in 16..20 {
691            assert!(!mask.get(i), "OR-pass row {} leaked despite not being in existing", i);
692        }
693    }
694}