1use crate::{columnar_cmp_f64, columnar_cmp_i64, nwords_for, BitMask, Column, DBinOp, DExpr, DataFrame};
20
21#[derive(Clone, Copy, Debug, PartialEq, Eq)]
22pub enum CmpKind {
23 Gt,
24 Lt,
25 Ge,
26 Le,
27 Eq,
28 Ne,
29}
30
31impl CmpKind {
32 fn from_dbinop(op: DBinOp) -> Option<Self> {
33 match op {
34 DBinOp::Gt => Some(Self::Gt),
35 DBinOp::Lt => Some(Self::Lt),
36 DBinOp::Ge => Some(Self::Ge),
37 DBinOp::Le => Some(Self::Le),
38 DBinOp::Eq => Some(Self::Eq),
39 DBinOp::Ne => Some(Self::Ne),
40 _ => None,
41 }
42 }
43
44 fn to_dbinop(self) -> DBinOp {
45 match self {
46 Self::Gt => DBinOp::Gt,
47 Self::Lt => DBinOp::Lt,
48 Self::Ge => DBinOp::Ge,
49 Self::Le => DBinOp::Le,
50 Self::Eq => DBinOp::Eq,
51 Self::Ne => DBinOp::Ne,
52 }
53 }
54
55 fn flip(self) -> Self {
58 match self {
59 Self::Gt => Self::Lt,
60 Self::Lt => Self::Gt,
61 Self::Ge => Self::Le,
62 Self::Le => Self::Ge,
63 Self::Eq => Self::Eq,
64 Self::Ne => Self::Ne,
65 }
66 }
67}
68
69#[derive(Clone, Copy, Debug)]
77pub enum LeafKind {
78 FloatColFloatLit { col_idx: usize, lit: f64 },
79 FloatColIntLit { col_idx: usize, lit: i64 },
80 IntColIntLit { col_idx: usize, lit: i64 },
81 IntColFloatLit { col_idx: usize, lit: f64 },
82}
83
84#[derive(Clone, Debug)]
85pub enum PredicateOp {
86 Cmp { kind: LeafKind, op: CmpKind },
88 And,
90 Or,
92}
93
94#[derive(Clone, Debug)]
95pub struct PredicateBytecode {
96 ops: Vec<PredicateOp>,
97}
98
99impl PredicateBytecode {
100 pub fn ops(&self) -> &[PredicateOp] {
103 &self.ops
104 }
105
106 pub fn lower(predicate: &DExpr, base: &DataFrame) -> Option<Self> {
110 let mut ops = Vec::new();
111 lower_into(predicate, base, &mut ops)?;
112 Some(PredicateBytecode { ops })
113 }
114
115 pub fn interpret_sparse(
131 &self,
132 base: &DataFrame,
133 existing_indices: &[usize],
134 nrows: usize,
135 ) -> BitMask {
136 let nwords = nwords_for(nrows);
137 let mut stack: Vec<Vec<u64>> = Vec::with_capacity(4);
138
139 for op in &self.ops {
140 match op {
141 PredicateOp::Cmp { kind, op: cop } => {
142 let mut words = vec![0u64; nwords];
143 let dop = cop.to_dbinop();
144 match *kind {
145 LeafKind::FloatColFloatLit { col_idx, lit } => {
146 if let Column::Float(data) = &base.columns[col_idx].1 {
147 for &i in existing_indices {
148 if scalar_cmp_f64(data[i], lit, dop) {
149 words[i / 64] |= 1u64 << (i % 64);
150 }
151 }
152 }
153 }
154 LeafKind::FloatColIntLit { col_idx, lit } => {
155 if let Column::Float(data) = &base.columns[col_idx].1 {
156 let lit_f = lit as f64;
157 for &i in existing_indices {
158 if scalar_cmp_f64(data[i], lit_f, dop) {
159 words[i / 64] |= 1u64 << (i % 64);
160 }
161 }
162 }
163 }
164 LeafKind::IntColIntLit { col_idx, lit } => {
165 if let Column::Int(data) = &base.columns[col_idx].1 {
166 for &i in existing_indices {
167 if scalar_cmp_i64(data[i], lit, dop) {
168 words[i / 64] |= 1u64 << (i % 64);
169 }
170 }
171 }
172 }
173 LeafKind::IntColFloatLit { col_idx, lit } => {
174 if let Column::Int(data) = &base.columns[col_idx].1 {
175 for &i in existing_indices {
176 if scalar_cmp_f64(data[i] as f64, lit, dop) {
177 words[i / 64] |= 1u64 << (i % 64);
178 }
179 }
180 }
181 }
182 }
183 stack.push(words);
184 }
185 PredicateOp::And => {
186 let r = stack.pop().expect("predicate bytecode: And on empty stack");
187 let l = stack.pop().expect("predicate bytecode: And on empty stack");
188 let merged: Vec<u64> = l.iter().zip(r.iter()).map(|(a, b)| a & b).collect();
189 stack.push(merged);
190 }
191 PredicateOp::Or => {
192 let r = stack.pop().expect("predicate bytecode: Or on empty stack");
193 let l = stack.pop().expect("predicate bytecode: Or on empty stack");
194 let merged: Vec<u64> = l.iter().zip(r.iter()).map(|(a, b)| a | b).collect();
195 stack.push(merged);
196 }
197 }
198 }
199
200 let top = stack
201 .pop()
202 .expect("predicate bytecode: empty stack after interpretation");
203
204 BitMask::from_words_for_test(top, nrows)
208 }
209
210 pub fn evaluate_to_selection(
225 &self,
226 base: &DataFrame,
227 nrows: usize,
228 ) -> crate::adaptive_selection::AdaptiveSelection {
229 let words = self.evaluate_words(base, nrows);
230 crate::adaptive_selection::AdaptiveSelection::from_predicate_result(words, nrows)
231 }
232
233 fn evaluate_words(&self, base: &DataFrame, nrows: usize) -> Vec<u64> {
239 let nwords = nwords_for(nrows);
240 let mut stack: Vec<Vec<u64>> = Vec::with_capacity(4);
241 for op in &self.ops {
242 match op {
243 PredicateOp::Cmp { kind, op: cop } => {
244 let mut words = vec![0u64; nwords];
245 let dop = cop.to_dbinop();
246 match *kind {
247 LeafKind::FloatColFloatLit { col_idx, lit } => {
248 if let Column::Float(data) = &base.columns[col_idx].1 {
249 columnar_cmp_f64(data, lit, dop, &mut words);
250 }
251 }
252 LeafKind::FloatColIntLit { col_idx, lit } => {
253 if let Column::Float(data) = &base.columns[col_idx].1 {
254 columnar_cmp_f64(data, lit as f64, dop, &mut words);
255 }
256 }
257 LeafKind::IntColIntLit { col_idx, lit } => {
258 if let Column::Int(data) = &base.columns[col_idx].1 {
259 columnar_cmp_i64(data, lit, dop, &mut words);
260 }
261 }
262 LeafKind::IntColFloatLit { col_idx, lit } => {
263 if let Column::Int(data) = &base.columns[col_idx].1 {
264 let floats: Vec<f64> = data.iter().map(|&x| x as f64).collect();
265 columnar_cmp_f64(&floats, lit, dop, &mut words);
266 }
267 }
268 }
269 stack.push(words);
270 }
271 PredicateOp::And => {
272 let r = stack.pop().expect("predicate bytecode: And on empty stack");
273 let l = stack.pop().expect("predicate bytecode: And on empty stack");
274 let merged: Vec<u64> = l.iter().zip(r.iter()).map(|(a, b)| a & b).collect();
275 stack.push(merged);
276 }
277 PredicateOp::Or => {
278 let r = stack.pop().expect("predicate bytecode: Or on empty stack");
279 let l = stack.pop().expect("predicate bytecode: Or on empty stack");
280 let merged: Vec<u64> = l.iter().zip(r.iter()).map(|(a, b)| a | b).collect();
281 stack.push(merged);
282 }
283 }
284 }
285 stack.pop().expect("predicate bytecode: empty stack after interpretation")
286 }
287
288 pub fn interpret(&self, base: &DataFrame, existing_mask: &BitMask) -> BitMask {
289 let nrows = existing_mask.nrows();
290 let mut top = self.evaluate_words(base, nrows);
291 for (w, ew) in top.iter_mut().zip(existing_mask.words_slice().iter()) {
295 *w &= *ew;
296 }
297 BitMask::from_words_for_test(top, nrows)
298 }
299}
300
301fn lower_into(predicate: &DExpr, base: &DataFrame, ops: &mut Vec<PredicateOp>) -> Option<()> {
302 match predicate {
303 DExpr::BinOp {
304 op: DBinOp::And,
305 left,
306 right,
307 } => {
308 lower_into(left, base, ops)?;
309 lower_into(right, base, ops)?;
310 ops.push(PredicateOp::And);
311 Some(())
312 }
313 DExpr::BinOp {
314 op: DBinOp::Or,
315 left,
316 right,
317 } => {
318 lower_into(left, base, ops)?;
319 lower_into(right, base, ops)?;
320 ops.push(PredicateOp::Or);
321 Some(())
322 }
323 DExpr::BinOp { op, left, right } => {
324 let cmp = CmpKind::from_dbinop(*op)?;
325
326 #[derive(Clone, Copy)]
327 enum LitVal {
328 F(f64),
329 I(i64),
330 }
331
332 let (col_name, lit, reversed) = match (left.as_ref(), right.as_ref()) {
333 (DExpr::Col(n), DExpr::LitFloat(v)) => (n.as_str(), LitVal::F(*v), false),
334 (DExpr::LitFloat(v), DExpr::Col(n)) => (n.as_str(), LitVal::F(*v), true),
335 (DExpr::Col(n), DExpr::LitInt(v)) => (n.as_str(), LitVal::I(*v), false),
336 (DExpr::LitInt(v), DExpr::Col(n)) => (n.as_str(), LitVal::I(*v), true),
337 _ => return None,
338 };
339
340 let effective = if reversed { cmp.flip() } else { cmp };
341
342 let col_idx = base.columns.iter().position(|(n, _)| n == col_name)?;
343 let kind = match (&base.columns[col_idx].1, lit) {
344 (Column::Float(_), LitVal::F(v)) => LeafKind::FloatColFloatLit { col_idx, lit: v },
345 (Column::Float(_), LitVal::I(v)) => LeafKind::FloatColIntLit { col_idx, lit: v },
346 (Column::Int(_), LitVal::I(v)) => LeafKind::IntColIntLit { col_idx, lit: v },
347 (Column::Int(_), LitVal::F(v)) => LeafKind::IntColFloatLit { col_idx, lit: v },
348 _ => return None,
349 };
350
351 ops.push(PredicateOp::Cmp {
352 kind,
353 op: effective,
354 });
355 Some(())
356 }
357 _ => None,
358 }
359}
360
361#[inline]
362fn scalar_cmp_f64(val: f64, lit: f64, op: DBinOp) -> bool {
363 match op {
364 DBinOp::Gt => val > lit,
365 DBinOp::Lt => val < lit,
366 DBinOp::Ge => val >= lit,
367 DBinOp::Le => val <= lit,
368 DBinOp::Eq => val == lit,
369 DBinOp::Ne => val != lit,
370 _ => false,
371 }
372}
373
374#[inline]
375fn scalar_cmp_i64(val: i64, lit: i64, op: DBinOp) -> bool {
376 match op {
377 DBinOp::Gt => val > lit,
378 DBinOp::Lt => val < lit,
379 DBinOp::Ge => val >= lit,
380 DBinOp::Le => val <= lit,
381 DBinOp::Eq => val == lit,
382 DBinOp::Ne => val != lit,
383 _ => false,
384 }
385}
386
387#[inline]
398pub fn should_use_sparse_path(count: usize, nrows: usize) -> bool {
399 count.saturating_mul(4) < nrows
400}
401
402#[cfg(test)]
403mod tests {
404 use super::*;
405 use crate::Column;
406
407 fn df_with_int(name: &str, xs: Vec<i64>) -> DataFrame {
408 DataFrame::from_columns(vec![(name.into(), Column::Int(xs))]).unwrap()
409 }
410
411 fn pred_lt(col: &str, v: i64) -> DExpr {
412 DExpr::BinOp {
413 op: DBinOp::Lt,
414 left: Box::new(DExpr::Col(col.into())),
415 right: Box::new(DExpr::LitInt(v)),
416 }
417 }
418
419 #[test]
420 fn lower_simple_lt() {
421 let df = df_with_int("x", (0..10).collect());
422 let bc = PredicateBytecode::lower(&pred_lt("x", 5), &df).unwrap();
423 assert_eq!(bc.ops().len(), 1);
424 match &bc.ops()[0] {
425 PredicateOp::Cmp {
426 kind: LeafKind::IntColIntLit { col_idx, lit },
427 op,
428 } => {
429 assert_eq!(*col_idx, 0);
430 assert_eq!(*lit, 5);
431 assert_eq!(*op, CmpKind::Lt);
432 }
433 other => panic!("unexpected op: {:?}", other),
434 }
435 }
436
437 #[test]
438 fn lower_reversed_lit_op_col_flips() {
439 let df = df_with_int("x", (0..10).collect());
441 let pred = DExpr::BinOp {
442 op: DBinOp::Gt,
443 left: Box::new(DExpr::LitInt(5)),
444 right: Box::new(DExpr::Col("x".into())),
445 };
446 let bc = PredicateBytecode::lower(&pred, &df).unwrap();
447 match &bc.ops()[0] {
448 PredicateOp::Cmp { op, .. } => assert_eq!(*op, CmpKind::Lt),
449 other => panic!("unexpected op: {:?}", other),
450 }
451 }
452
453 #[test]
454 fn lower_and_emits_postorder() {
455 let df = df_with_int("x", (0..10).collect());
456 let pred = DExpr::BinOp {
457 op: DBinOp::And,
458 left: Box::new(pred_lt("x", 5)),
459 right: Box::new(pred_lt("x", 8)),
460 };
461 let bc = PredicateBytecode::lower(&pred, &df).unwrap();
462 assert_eq!(bc.ops().len(), 3);
463 assert!(matches!(bc.ops()[0], PredicateOp::Cmp { .. }));
464 assert!(matches!(bc.ops()[1], PredicateOp::Cmp { .. }));
465 assert!(matches!(bc.ops()[2], PredicateOp::And));
466 }
467
468 #[test]
469 fn lower_unsupported_returns_none() {
470 let df = df_with_int("x", (0..10).collect());
472 let pred = DExpr::BinOp {
473 op: DBinOp::Add,
474 left: Box::new(DExpr::Col("x".into())),
475 right: Box::new(DExpr::LitInt(1)),
476 };
477 assert!(PredicateBytecode::lower(&pred, &df).is_none());
478 }
479
480 #[test]
481 fn lower_unknown_column_returns_none() {
482 let df = df_with_int("x", (0..10).collect());
483 assert!(PredicateBytecode::lower(&pred_lt("ghost", 5), &df).is_none());
484 }
485
486 #[test]
487 fn interpret_simple_lt_matches_count() {
488 let df = df_with_int("x", (0..100).collect());
489 let bc = PredicateBytecode::lower(&pred_lt("x", 30), &df).unwrap();
490 let mask = bc.interpret(&df, &BitMask::all_true(100));
491 assert_eq!(mask.count_ones(), 30);
492 for i in 0..30 {
493 assert!(mask.get(i));
494 }
495 for i in 30..100 {
496 assert!(!mask.get(i));
497 }
498 }
499
500 #[test]
501 fn interpret_and_intersects() {
502 let df = df_with_int("x", (0..100).collect());
503 let pred = DExpr::BinOp {
505 op: DBinOp::And,
506 left: Box::new(pred_lt("x", 50)),
507 right: Box::new(pred_lt("x", 30)),
508 };
509 let bc = PredicateBytecode::lower(&pred, &df).unwrap();
510 let mask = bc.interpret(&df, &BitMask::all_true(100));
511 assert_eq!(mask.count_ones(), 30);
512 }
513
514 #[test]
515 fn interpret_or_unions() {
516 let df = df_with_int("x", (0..100).collect());
517 let pred = DExpr::BinOp {
519 op: DBinOp::Or,
520 left: Box::new(pred_lt("x", 30)),
521 right: Box::new(pred_lt("x", 70)),
522 };
523 let bc = PredicateBytecode::lower(&pred, &df).unwrap();
524 let mask = bc.interpret(&df, &BitMask::all_true(100));
525 assert_eq!(mask.count_ones(), 70);
526 }
527
528 #[test]
529 fn interpret_respects_existing_mask() {
530 let df = df_with_int("x", (0..100).collect());
531 let mut bools = vec![false; 100];
533 for i in (1..100).step_by(2) {
534 bools[i] = true;
535 }
536 let existing = BitMask::from_bools(&bools);
537
538 let bc = PredicateBytecode::lower(&pred_lt("x", 50), &df).unwrap();
539 let mask = bc.interpret(&df, &existing);
540
541 assert_eq!(mask.count_ones(), 25);
543 }
544
545 #[test]
546 fn interpret_int_col_float_lit_promotes() {
547 let df = df_with_int("x", vec![1, 2, 3, 4, 5]);
550 let pred = DExpr::BinOp {
551 op: DBinOp::Lt,
552 left: Box::new(DExpr::Col("x".into())),
553 right: Box::new(DExpr::LitFloat(3.5)),
554 };
555 let bc = PredicateBytecode::lower(&pred, &df).unwrap();
556 let mask = bc.interpret(&df, &BitMask::all_true(5));
557 assert_eq!(mask.count_ones(), 3); }
559
560 #[test]
561 fn interpret_nested_and_or() {
562 let df = df_with_int("x", (0..100).collect());
563 let pred = DExpr::BinOp {
565 op: DBinOp::Or,
566 left: Box::new(pred_lt("x", 10)),
567 right: Box::new(DExpr::BinOp {
568 op: DBinOp::And,
569 left: Box::new(DExpr::BinOp {
570 op: DBinOp::Gt,
571 left: Box::new(DExpr::Col("x".into())),
572 right: Box::new(DExpr::LitInt(90)),
573 }),
574 right: Box::new(pred_lt("x", 95)),
575 }),
576 };
577 let bc = PredicateBytecode::lower(&pred, &df).unwrap();
578 let mask = bc.interpret(&df, &BitMask::all_true(100));
579 assert_eq!(mask.count_ones(), 14);
580 }
581
582 #[test]
585 fn sparse_path_density_threshold() {
586 assert!(should_use_sparse_path(0, 100));
588 assert!(should_use_sparse_path(24, 100));
589 assert!(!should_use_sparse_path(25, 100));
590 assert!(!should_use_sparse_path(50, 100));
591 assert!(!should_use_sparse_path(0, 0));
593 }
594
595 #[test]
596 fn interpret_sparse_simple_lt() {
597 let df = df_with_int("x", (0..100).collect());
598 let bc = PredicateBytecode::lower(&pred_lt("x", 30), &df).unwrap();
599 let existing: Vec<usize> = (0..100).step_by(4).collect(); let mask = bc.interpret_sparse(&df, &existing, 100);
603 assert_eq!(mask.count_ones(), 8);
605 for &i in &existing {
606 assert_eq!(mask.get(i), i < 30);
607 }
608 for i in 0..100 {
610 if !existing.contains(&i) {
611 assert!(!mask.get(i), "bit {} set but not in existing", i);
612 }
613 }
614 }
615
616 #[test]
617 fn interpret_sparse_matches_dense_on_same_inputs() {
618 let df = df_with_int("x", (-100..100).collect());
622
623 let preds = [
624 pred_lt("x", 0),
625 pred_lt("x", 50),
626 DExpr::BinOp {
627 op: DBinOp::And,
628 left: Box::new(pred_lt("x", 50)),
629 right: Box::new(DExpr::BinOp {
630 op: DBinOp::Gt,
631 left: Box::new(DExpr::Col("x".into())),
632 right: Box::new(DExpr::LitInt(-50)),
633 }),
634 },
635 DExpr::BinOp {
636 op: DBinOp::Or,
637 left: Box::new(pred_lt("x", -80)),
638 right: Box::new(DExpr::BinOp {
639 op: DBinOp::Gt,
640 left: Box::new(DExpr::Col("x".into())),
641 right: Box::new(DExpr::LitInt(80)),
642 }),
643 },
644 ];
645
646 let masks: Vec<Vec<bool>> = vec![
648 (0..200).map(|i| i % 2 == 0).collect(),
649 (0..200).map(|i| i % 5 == 0).collect(),
650 (0..200).map(|i| i % 20 == 0).collect(),
651 ];
652
653 for pred in &preds {
654 let bc = PredicateBytecode::lower(pred, &df).unwrap();
655 for bools in &masks {
656 let bm = BitMask::from_bools(bools);
657 let dense = bc.interpret(&df, &bm);
658 let indices: Vec<usize> = bm.iter_set().collect();
659 let sparse = bc.interpret_sparse(&df, &indices, 200);
660 assert_eq!(
661 dense.words_slice(),
662 sparse.words_slice(),
663 "sparse path diverged from dense path"
664 );
665 }
666 }
667 }
668
669 #[test]
670 fn interpret_sparse_or_monotone() {
671 let df = df_with_int("x", (0..20).collect());
675 let pred = DExpr::BinOp {
677 op: DBinOp::Or,
678 left: Box::new(pred_lt("x", 5)),
679 right: Box::new(DExpr::BinOp {
680 op: DBinOp::Gt,
681 left: Box::new(DExpr::Col("x".into())),
682 right: Box::new(DExpr::LitInt(15)),
683 }),
684 };
685 let bc = PredicateBytecode::lower(&pred, &df).unwrap();
686 let existing: Vec<usize> = (0..10).step_by(2).collect(); let mask = bc.interpret_sparse(&df, &existing, 20);
689 assert_eq!(mask.count_ones(), 3); for i in 16..20 {
691 assert!(!mask.get(i), "OR-pass row {} leaked despite not being in existing", i);
692 }
693 }
694}