use std::{
collections::HashMap,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use aisle::{CmpOp, Expr};
use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use datafusion_common::ScalarValue;
use fusio::executor::Executor;
use futures::Stream;
use pin_project_lite::pin_project;
use thiserror::Error;
use typed_arrow_dyn::{DynBuilders, DynProjection, DynRow, DynSchema};
use crate::query::stream::{StreamError, merge::MergeStream};
pin_project! {
    /// Stream adapter that groups rows from a [`MergeStream`] into Arrow [`RecordBatch`]es.
    ///
    /// Each upstream entry that carries a row is optionally filtered by a residual predicate,
    /// optionally projected from the scan schema to the result schema, and appended to an
    /// internal builder. A batch is emitted once `batch_size` rows (or the rows remaining under
    /// `limit`) have been gathered, when the upstream is pending with rows already buffered, or
    /// when the upstream is exhausted.
    pub struct PackageStream<'t, E>
    where
        E: Executor,
    {
        // Number of rows appended to `builder` since the last emitted batch.
        row_count: usize,
        // Maximum number of rows per emitted batch.
        batch_size: usize,
        // Upstream source of merged entries.
        #[pin]
        inner: MergeStream<'t, E>,
        // Accumulates rows in the result schema until a batch is finished.
        builder: DynRecordBatchBuilder,
        // Optional filter applied to every scanned row before it is packaged.
        residual_predicate: Option<Expr>,
        // Evaluator for `residual_predicate`; created only when a predicate was supplied.
        evaluator: Option<ResidualEvaluator>,
        // Schema used to rebuild a scanned row before it is projected.
        scan_schema: SchemaRef,
        // `scan_schema` in the dynamic form handed to `DynProjection`.
        scan_dyn_schema: DynSchema,
        // Scan-to-result projection; `None` when both schemas are equal.
        projection: Option<DynProjection>,
        // Optional cap on the total number of rows emitted by this stream.
        limit: Option<usize>,
        // Rows emitted so far across all batches.
        total_emitted: usize,
        // Set once `inner` has returned `None`, so that it is never polled again.
        upstream_done: bool,
    }
}

impl<'t, E> PackageStream<'t, E>
where
    E: Executor + Clone + 'static,
{
    /// Builds a stream without a row limit; see [`PackageStream::with_limit`].
    #[cfg(test)]
    pub(crate) fn new(
        batch_size: usize,
        merge: MergeStream<'t, E>,
        scan_schema: SchemaRef,
        result_schema: SchemaRef,
        residual_predicate: Option<Expr>,
    ) -> Result<Self, StreamError> {
        Self::with_limit(
            batch_size,
            merge,
            scan_schema,
            result_schema,
            residual_predicate,
            None,
        )
    }

    /// Builds a stream that packages `merge` into batches of at most `batch_size` rows.
    ///
    /// `residual_predicate`, when present, is evaluated against rows in `scan_schema`; rows for
    /// which it is not definitely true are dropped. When `scan_schema` and `result_schema`
    /// differ, surviving rows are projected to `result_schema`. `limit` caps the total number of
    /// rows emitted.
    ///
    /// # Errors
    ///
    /// Returns an error when the projection from `scan_schema` to `result_schema` cannot be
    /// built.
    ///
    /// # Panics
    ///
    /// Panics when `batch_size` is zero.
    pub(crate) fn with_limit(
        batch_size: usize,
        merge: MergeStream<'t, E>,
        scan_schema: SchemaRef,
        result_schema: SchemaRef,
        residual_predicate: Option<Expr>,
        limit: Option<usize>,
    ) -> Result<Self, StreamError> {
        assert!(batch_size > 0, "batch size must be greater than zero");
        let evaluator = residual_predicate
            .is_some()
            .then(|| ResidualEvaluator::new(&scan_schema));
        let projection = if scan_schema.as_ref() == result_schema.as_ref() {
            None
        } else {
            Some(DynProjection::from_schema(
                scan_schema.as_ref(),
                result_schema.as_ref(),
            )?)
        };
        Ok(Self {
            row_count: 0,
            batch_size,
            inner: merge,
            builder: DynRecordBatchBuilder::new(result_schema, batch_size),
            residual_predicate,
            evaluator,
            scan_schema: Arc::clone(&scan_schema),
            scan_dyn_schema: DynSchema::from_ref(scan_schema),
            projection,
            limit,
            total_emitted: 0,
            upstream_done: false,
        })
    }
}

impl<'t, E> Stream for PackageStream<'t, E>
where
    E: Executor + Clone + 'static,
{
    type Item = Result<RecordBatch, StreamError>;

    /// Pulls rows from the upstream until a batch is full, the upstream is pending, or the
    /// upstream is exhausted, then emits whatever rows were gathered.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        // The limit has been satisfied: finish without touching the upstream again.
        if let Some(limit) = *this.limit
            && *this.total_emitted >= limit
        {
            return Poll::Ready(None);
        }
        let remaining_limit = this
            .limit
            .map(|l| l.saturating_sub(*this.total_emitted))
            .unwrap_or(usize::MAX);
        let effective_batch_size = (*this.batch_size).min(remaining_limit);
        // `upstream_done` guards the loop because polling a stream again after it returned
        // `None` is outside the `Stream` contract; the final partial batch is emitted on one
        // poll and the terminating `None` on the next, without re-polling `inner`.
        while !*this.upstream_done && *this.row_count < effective_batch_size {
            match this.inner.as_mut().poll_next(cx) {
                Poll::Ready(Some(Ok(entry))) => {
                    // Entries that do not carry a row are skipped.
                    if let Some(row) = entry.into_row() {
                        if let (Some(predicate), Some(evaluator)) =
                            (this.residual_predicate.as_ref(), this.evaluator.as_ref())
                        {
                            match evaluator.matches_owned(predicate, &row) {
                                Ok(true) => {}
                                Ok(false) => continue,
                                Err(err) => {
                                    return Poll::Ready(Some(Err(err.into())));
                                }
                            }
                        }
                        let projected = if let Some(projection) = this.projection.as_ref() {
                            // The projection works on record batches, so the row is first
                            // materialised as a one-row batch in the scan schema.
                            let mut builders = DynBuilders::new(Arc::clone(&*this.scan_schema), 1);
                            builders.append_option_row(Some(row))?;
                            let batch = builders.try_finish_into_batch()?;
                            let raw =
                                projection.project_row_raw(&*this.scan_dyn_schema, &batch, 0)?;
                            raw.into_owned().map_err(StreamError::from)?
                        } else {
                            row
                        };
                        if let Err(err) = this.builder.append_row(projected) {
                            return Poll::Ready(Some(Err(err)));
                        }
                        *this.row_count += 1;
                    }
                }
                Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
                Poll::Ready(None) => {
                    *this.upstream_done = true;
                    break;
                }
                Poll::Pending => break,
            }
        }
        if *this.row_count == 0 {
            return if *this.upstream_done {
                Poll::Ready(None)
            } else {
                Poll::Pending
            };
        }
        let batch = match this.builder.finish_batch() {
            Ok(batch) => batch,
            Err(err) => return Poll::Ready(Some(Err(err))),
        };
        *this.total_emitted += batch.num_rows();
        *this.row_count = 0;
        Poll::Ready(Some(Ok(batch)))
    }
}
/// Accumulates dynamic rows and turns them into `RecordBatch`es of one schema.
struct DynRecordBatchBuilder {
// Schema passed to every `DynBuilders` instance this builder creates.
schema: SchemaRef,
// Second argument passed to `DynBuilders::new` for every builder set.
batch_size: usize,
// Column builders holding the rows appended since the last finished batch.
builders: DynBuilders,
}
impl DynRecordBatchBuilder {
fn new(schema: SchemaRef, batch_size: usize) -> Self {
let builders = DynBuilders::new(schema.clone(), batch_size);
Self {
schema,
batch_size,
builders,
}
}
fn append_row(&mut self, row: DynRow) -> Result<(), StreamError> {
self.builders.append_option_row(Some(row))?;
Ok(())
}
fn finish_batch(&mut self) -> Result<RecordBatch, StreamError> {
let builders = std::mem::replace(
&mut self.builders,
DynBuilders::new(self.schema.clone(), self.batch_size),
);
Ok(builders.try_finish_into_batch()?)
}
}
/// Errors produced while evaluating a residual predicate against a row.
#[derive(Debug, Error)]
pub enum ResidualError {
/// The predicate names a column that is absent from the evaluator's column map, or the row
/// has no cell at that column's index.
#[error("column {0} not found in projection")]
MissingColumn(Arc<str>),
/// A cell could not be converted to a scalar, or a string-only predicate was applied to a
/// non-string value.
#[error("unsupported column type for predicate evaluation")]
UnsupportedColumn,
/// NOTE(review): not constructed anywhere in the visible code; presumably kept for callers
/// elsewhere, confirm before removing.
#[error("invalid utf8 data")]
InvalidUtf8,
/// NOTE(review): not constructed anywhere in the visible code; presumably kept for callers
/// elsewhere, confirm before removing.
#[error("predicate evaluation returned no value")]
MissingValue,
/// The predicate contains an expression variant the evaluator does not handle.
#[error("predicate evaluation produced a residual clause")]
UnexpectedResidual,
/// The predicate contains a bloom-filter expression, which cannot be evaluated per row.
#[error("predicate evaluation encountered unsupported predicate")]
UnsupportedPredicate,
}
/// Three-valued logic result used for SQL-style predicate evaluation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum TriState {
    True,
    False,
    Unknown,
}

impl TriState {
    /// Lifts a plain boolean into the three-valued domain.
    fn from_bool(value: bool) -> Self {
        match value {
            true => Self::True,
            false => Self::False,
        }
    }

    /// Three-valued conjunction: `False` dominates, `True` needs both sides to be `True`.
    fn and(self, other: Self) -> Self {
        if self == Self::False || other == Self::False {
            Self::False
        } else if self == Self::True && other == Self::True {
            Self::True
        } else {
            Self::Unknown
        }
    }

    /// Three-valued disjunction: `True` dominates, `False` needs both sides to be `False`.
    fn or(self, other: Self) -> Self {
        if self == Self::True || other == Self::True {
            Self::True
        } else if self == Self::False && other == Self::False {
            Self::False
        } else {
            Self::Unknown
        }
    }

    /// Three-valued negation; `Unknown` stays `Unknown`.
    fn not(self) -> Self {
        match self {
            Self::Unknown => Self::Unknown,
            Self::False => Self::True,
            Self::True => Self::False,
        }
    }

    /// Returns `true` only for a definite `True`.
    fn is_true(self) -> bool {
        self == Self::True
    }
}
/// Evaluates residual predicates against individual rows.
struct ResidualEvaluator {
// Maps each schema field name to its index in the field list, which is also used as the
// cell index within a row.
column_map: HashMap<Arc<str>, usize>,
}
impl ResidualEvaluator {
    /// Builds an evaluator whose column lookup follows the field order of `schema`.
    fn new(schema: &SchemaRef) -> Self {
        let mut column_map = HashMap::new();
        for (position, field) in schema.fields().iter().enumerate() {
            column_map.insert(Arc::<str>::from(field.name().as_str()), position);
        }
        Self { column_map }
    }

    /// Returns `true` only when `predicate` is definitely true for `row`; `False` and
    /// `Unknown` both reject the row.
    fn matches_owned(&self, predicate: &Expr, row: &DynRow) -> Result<bool, ResidualError> {
        self.evaluate_expr(predicate, row).map(TriState::is_true)
    }

    /// Evaluates `expr` against `row` using three-valued logic.
    ///
    /// Bloom-filter expressions yield `UnsupportedPredicate`; any other unhandled variant
    /// yields `UnexpectedResidual`.
    fn evaluate_expr(&self, expr: &Expr, row: &DynRow) -> Result<TriState, ResidualError> {
        match expr {
            Expr::True => Ok(TriState::True),
            Expr::False => Ok(TriState::False),
            Expr::Cmp { column, op, value } => self.evaluate_cmp(column, *op, value, row),
            Expr::Between {
                column,
                low,
                high,
                inclusive,
            } => self.evaluate_between(column, low, high, *inclusive, row),
            Expr::InList { column, values } => self.evaluate_in_list(column, values, row),
            Expr::BloomFilterEq { .. } | Expr::BloomFilterInList { .. } => {
                Err(ResidualError::UnsupportedPredicate)
            }
            Expr::StartsWith { column, prefix } => self.evaluate_starts_with(column, prefix, row),
            Expr::IsNull { column, negated } => self.evaluate_is_null(column, *negated, row),
            Expr::And(children) => {
                // An empty conjunction is true; a definite `False` short-circuits.
                let mut verdict = TriState::True;
                for child in children {
                    verdict = verdict.and(self.evaluate_expr(child, row)?);
                    if verdict == TriState::False {
                        break;
                    }
                }
                Ok(verdict)
            }
            Expr::Or(children) => {
                // An empty disjunction is false; a definite `True` short-circuits.
                let mut verdict = TriState::False;
                for child in children {
                    verdict = verdict.or(self.evaluate_expr(child, row)?);
                    if verdict == TriState::True {
                        break;
                    }
                }
                Ok(verdict)
            }
            Expr::Not(child) => self.evaluate_expr(child, row).map(TriState::not),
            _ => Err(ResidualError::UnexpectedResidual),
        }
    }

    /// Compares the cell in `column` with `value` using `op`.
    ///
    /// A missing or null cell, a null literal, or an incomparable pair yields `Unknown`.
    fn evaluate_cmp(
        &self,
        column: &str,
        op: CmpOp,
        value: &ScalarValue,
        row: &DynRow,
    ) -> Result<TriState, ResidualError> {
        let lhs = match self.resolve_column(column, row)? {
            Some(cell) if !cell.is_null() => cell,
            _ => return Ok(TriState::Unknown),
        };
        if value.is_null() {
            return Ok(TriState::Unknown);
        }
        let Some(ordering) = compare_scalar_values(&lhs, value) else {
            return Ok(TriState::Unknown);
        };
        let holds = match op {
            CmpOp::Eq => ordering.is_eq(),
            CmpOp::NotEq => ordering.is_ne(),
            CmpOp::Lt => ordering.is_lt(),
            CmpOp::LtEq => ordering.is_le(),
            CmpOp::Gt => ordering.is_gt(),
            CmpOp::GtEq => ordering.is_ge(),
        };
        Ok(TriState::from_bool(holds))
    }

    /// Evaluates `low <(=) column <(=) high` as the conjunction of two comparisons.
    fn evaluate_between(
        &self,
        column: &str,
        low: &ScalarValue,
        high: &ScalarValue,
        inclusive: bool,
        row: &DynRow,
    ) -> Result<TriState, ResidualError> {
        let (above_low, below_high) = if inclusive {
            (CmpOp::GtEq, CmpOp::LtEq)
        } else {
            (CmpOp::Gt, CmpOp::Lt)
        };
        let lower = self.evaluate_cmp(column, above_low, low, row)?;
        let upper = self.evaluate_cmp(column, below_high, high, row)?;
        Ok(lower.and(upper))
    }

    /// Evaluates `column IN (values)`.
    ///
    /// A match yields `True`. Without a match the result is `Unknown` when any list entry was
    /// null or incomparable, otherwise `False`.
    fn evaluate_in_list(
        &self,
        column: &str,
        values: &[ScalarValue],
        row: &DynRow,
    ) -> Result<TriState, ResidualError> {
        let lhs = match self.resolve_column(column, row)? {
            Some(cell) if !cell.is_null() => cell,
            _ => return Ok(TriState::Unknown),
        };
        let mut indeterminate = false;
        for candidate in values {
            if candidate.is_null() {
                indeterminate = true;
                continue;
            }
            match compare_scalar_values(&lhs, candidate) {
                Some(std::cmp::Ordering::Equal) => return Ok(TriState::True),
                Some(_) => {}
                None => indeterminate = true,
            }
        }
        Ok(if indeterminate {
            TriState::Unknown
        } else {
            TriState::False
        })
    }

    /// Tests whether the string cell in `column` starts with `prefix`.
    ///
    /// A missing or null cell yields `Unknown`; a non-string cell is an `UnsupportedColumn`
    /// error.
    fn evaluate_starts_with(
        &self,
        column: &str,
        prefix: &str,
        row: &DynRow,
    ) -> Result<TriState, ResidualError> {
        match self.resolve_column(column, row)? {
            None => Ok(TriState::Unknown),
            Some(cell) if cell.is_null() => Ok(TriState::Unknown),
            Some(
                ScalarValue::Utf8(Some(text))
                | ScalarValue::LargeUtf8(Some(text))
                | ScalarValue::Utf8View(Some(text)),
            ) => Ok(TriState::from_bool(text.starts_with(prefix))),
            Some(_) => Err(ResidualError::UnsupportedColumn),
        }
    }

    /// Evaluates `column IS NULL` (or `IS NOT NULL` when `negated`); never `Unknown`.
    fn evaluate_is_null(
        &self,
        column: &str,
        negated: bool,
        row: &DynRow,
    ) -> Result<TriState, ResidualError> {
        let is_null = match self.resolve_column(column, row)? {
            None => true,
            Some(cell) => cell.is_null(),
        };
        // `negated` flips the answer, which is exactly an inequality test.
        Ok(TriState::from_bool(is_null != negated))
    }

    /// Looks up the cell for `column` in `row` and converts it to a `ScalarValue`.
    ///
    /// Returns `Ok(None)` when the row holds no cell at that position, and `MissingColumn`
    /// when the name is unknown or the row is too short.
    fn resolve_column(
        &self,
        column: &str,
        row: &DynRow,
    ) -> Result<Option<ScalarValue>, ResidualError> {
        let Some(&idx) = self.column_map.get(column) else {
            return Err(ResidualError::MissingColumn(Arc::<str>::from(column)));
        };
        match row.0.get(idx) {
            None => Err(ResidualError::MissingColumn(Arc::<str>::from(column))),
            Some(None) => Ok(None),
            Some(Some(cell)) => convert_owned_cell(cell).map(Some),
        }
    }
}
fn compare_scalar_values(lhs: &ScalarValue, rhs: &ScalarValue) -> Option<std::cmp::Ordering> {
if lhs.data_type() == rhs.data_type() {
return lhs.partial_cmp(rhs);
}
numeric_compare(lhs, rhs)
}
/// Orders two numeric scalars of possibly different types.
///
/// Float pairs are compared as `f64`, integer pairs as `i128`, and mixed pairs via the exact
/// `compare_i128_f64`. Returns `None` for null or non-numeric inputs and for NaN.
fn numeric_compare(lhs: &ScalarValue, rhs: &ScalarValue) -> Option<std::cmp::Ordering> {
    match (is_float_scalar(lhs), is_float_scalar(rhs)) {
        (true, true) => {
            let left = scalar_to_f64(lhs)?;
            let right = scalar_to_f64(rhs)?;
            left.partial_cmp(&right)
        }
        (false, false) => {
            let left = scalar_to_i128(lhs)?;
            let right = scalar_to_i128(rhs)?;
            Some(left.cmp(&right))
        }
        (true, false) => {
            // The helper orders (int, float), so the result is flipped back afterwards.
            let float_side = scalar_to_f64(lhs)?;
            let int_side = scalar_to_i128(rhs)?;
            let flipped = compare_i128_f64(int_side, float_side)?;
            Some(flipped.reverse())
        }
        (false, true) => {
            let int_side = scalar_to_i128(lhs)?;
            let float_side = scalar_to_f64(rhs)?;
            compare_i128_f64(int_side, float_side)
        }
    }
}
/// Orders an integer against a float exactly, without converting the integer to `f64`.
///
/// Returns `None` only when `float_val` is NaN.
fn compare_i128_f64(int_val: i128, float_val: f64) -> Option<std::cmp::Ordering> {
    use std::cmp::Ordering;

    if float_val.is_nan() {
        return None;
    }
    // Outcome whenever the float's sign alone settles the comparison: the integer is below a
    // positive float and above a negative one.
    let float_positive = float_val.is_sign_positive();
    let decided_by_float_sign = if float_positive {
        Ordering::Less
    } else {
        Ordering::Greater
    };
    if float_val.is_infinite() {
        return Some(decided_by_float_sign);
    }
    if float_val == 0.0 {
        return Some(int_val.cmp(&0));
    }
    // A zero integer, or operands of opposite sign, need no magnitude comparison.
    if int_val == 0 || int_val.is_negative() == float_positive {
        return Some(decided_by_float_sign);
    }
    // Same sign and both non-zero: compare magnitudes, flipping the result for negatives.
    let (mantissa, exp2) = decompose_f64(float_val.abs());
    let magnitude_order = compare_u128_f64(i128_abs_to_u128(int_val), mantissa, exp2);
    if int_val.is_negative() {
        Some(magnitude_order.reverse())
    } else {
        Some(magnitude_order)
    }
}
/// Orders the magnitude `int_abs` against the magnitude `mantissa * 2^exp2`.
///
/// Bit lengths are compared first, so the operands are only shifted when both values have the
/// same number of significant bits.
fn compare_u128_f64(int_abs: u128, mantissa: u64, exp2: i32) -> std::cmp::Ordering {
    use std::cmp::Ordering;

    let int_bits = bit_length_u128(int_abs);
    let mantissa_bits = bit_length_u64(mantissa);
    if exp2 >= 0 {
        // The float is the integer `mantissa << exp2`.
        let float_bits = mantissa_bits + exp2;
        return match int_bits.cmp(&float_bits) {
            Ordering::Equal => int_abs.cmp(&((mantissa as u128) << (exp2 as u32))),
            decided => decided,
        };
    }
    // The float has fractional bits: scale the integer up by the same power of two instead.
    let shift = -exp2;
    let scaled_int_bits = int_bits + shift;
    match scaled_int_bits.cmp(&mantissa_bits) {
        Ordering::Equal => (int_abs << (shift as u32)).cmp(&(mantissa as u128)),
        decided => decided,
    }
}
/// Splits `value` into `(mantissa, exp2)` read straight from its IEEE-754 bit pattern, such
/// that a finite non-negative `value` equals `mantissa * 2^exp2`.
///
/// Subnormals (biased exponent zero) have no implicit leading bit and a fixed exponent of
/// `-1074`.
fn decompose_f64(value: f64) -> (u64, i32) {
    const FRACTION_BITS: u32 = 52;
    const IMPLICIT_BIT: u64 = 1 << FRACTION_BITS;

    let raw = value.to_bits();
    let biased_exponent = ((raw >> FRACTION_BITS) & 0x7ff) as i32;
    let fraction = raw & (IMPLICIT_BIT - 1);
    match biased_exponent {
        0 => (fraction, -1074),
        _ => (IMPLICIT_BIT | fraction, biased_exponent - 1023 - 52),
    }
}
/// Returns the absolute value of `value` as a `u128`; `i128::MIN` maps to `2^127` without
/// overflow.
fn i128_abs_to_u128(value: i128) -> u128 {
    value.unsigned_abs()
}
/// Number of bits needed to represent `value`; zero for `0`, since all 128 bits are then
/// leading zeros.
fn bit_length_u128(value: u128) -> i32 {
    (u128::BITS - value.leading_zeros()) as i32
}
/// Number of bits needed to represent `value`; zero for `0`, since all 64 bits are then
/// leading zeros.
fn bit_length_u64(value: u64) -> i32 {
    (u64::BITS - value.leading_zeros()) as i32
}
fn is_float_scalar(value: &ScalarValue) -> bool {
matches!(
value,
ScalarValue::Float16(_) | ScalarValue::Float32(_) | ScalarValue::Float64(_)
)
}
fn scalar_to_i128(value: &ScalarValue) -> Option<i128> {
match value {
ScalarValue::Int8(Some(v)) => Some(i128::from(*v)),
ScalarValue::Int16(Some(v)) => Some(i128::from(*v)),
ScalarValue::Int32(Some(v)) => Some(i128::from(*v)),
ScalarValue::Int64(Some(v)) => Some(i128::from(*v)),
ScalarValue::UInt8(Some(v)) => Some(i128::from(*v)),
ScalarValue::UInt16(Some(v)) => Some(i128::from(*v)),
ScalarValue::UInt32(Some(v)) => Some(i128::from(*v)),
ScalarValue::UInt64(Some(v)) => Some(i128::from(*v)),
_ => None,
}
}
/// Converts a non-null numeric scalar to `f64`.
///
/// 64-bit integers go through an `as` cast and may lose precision. Returns `None` for null
/// values and for non-numeric variants.
fn scalar_to_f64(value: &ScalarValue) -> Option<f64> {
    let converted = match value {
        ScalarValue::Float16(Some(v)) => f32::from(*v) as f64,
        ScalarValue::Float32(Some(v)) => f64::from(*v),
        ScalarValue::Float64(Some(v)) => *v,
        ScalarValue::Int8(Some(v)) => f64::from(*v),
        ScalarValue::Int16(Some(v)) => f64::from(*v),
        ScalarValue::Int32(Some(v)) => f64::from(*v),
        ScalarValue::Int64(Some(v)) => *v as f64,
        ScalarValue::UInt8(Some(v)) => f64::from(*v),
        ScalarValue::UInt16(Some(v)) => f64::from(*v),
        ScalarValue::UInt32(Some(v)) => f64::from(*v),
        ScalarValue::UInt64(Some(v)) => *v as f64,
        _ => return None,
    };
    Some(converted)
}
/// Converts a dynamic row cell into the matching `ScalarValue`.
///
/// Strings become `Utf8`, byte strings become `Binary`, and `DynCell::Null` becomes
/// `ScalarValue::Null`. Any other cell variant is reported as `UnsupportedColumn`.
fn convert_owned_cell(cell: &typed_arrow_dyn::DynCell) -> Result<ScalarValue, ResidualError> {
    use typed_arrow_dyn::DynCell;

    let scalar = match cell {
        DynCell::Null => ScalarValue::Null,
        DynCell::Bool(flag) => ScalarValue::Boolean(Some(*flag)),
        DynCell::Str(text) => ScalarValue::Utf8(Some(text.to_string())),
        DynCell::Bin(bytes) => ScalarValue::Binary(Some(bytes.clone())),
        DynCell::I8(n) => ScalarValue::Int8(Some(*n)),
        DynCell::I16(n) => ScalarValue::Int16(Some(*n)),
        DynCell::I32(n) => ScalarValue::Int32(Some(*n)),
        DynCell::I64(n) => ScalarValue::Int64(Some(*n)),
        DynCell::U8(n) => ScalarValue::UInt8(Some(*n)),
        DynCell::U16(n) => ScalarValue::UInt16(Some(*n)),
        DynCell::U32(n) => ScalarValue::UInt32(Some(*n)),
        DynCell::U64(n) => ScalarValue::UInt64(Some(*n)),
        DynCell::F32(x) => ScalarValue::Float32(Some(*x)),
        DynCell::F64(x) => ScalarValue::Float64(Some(*x)),
        _ => return Err(ResidualError::UnsupportedColumn),
    };
    Ok(scalar)
}
#[cfg(all(test, feature = "tokio"))]
mod tests {
use std::sync::Arc;
use arrow_array::{Array, Int64Array, StringArray};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use futures::{StreamExt, executor::block_on};
use typed_arrow_dyn::{DynCell, DynRow};
use super::*;
use crate::{
extractor::{KeyProjection, projection_for_columns, projection_for_field},
inmem::mutable::memtable::DynMem,
mvcc::Timestamp,
query::{
Expr, ScalarValue,
stream::{Order, OwnedMutableScan, ScanStream, merge::MergeStream},
},
test::build_batch,
};
/// Builds an in-memory table for `schema`, keyed on column 0, with a delete projection over a
/// single non-nullable `id` string column.
fn make_test_mem(schema: SchemaRef) -> DynMem {
    let key_projection = projection_for_field(schema.clone(), 0).expect("extractor");
    let extractor: Arc<dyn KeyProjection> = key_projection.into();

    let delete_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)]));
    let delete_projection: Arc<dyn KeyProjection> = projection_for_columns(delete_schema, vec![0])
        .expect("delete projection")
        .into();

    DynMem::new(schema, extractor, delete_projection)
}
/// Five rows packaged with a batch size of two must come out as batches of 2, 2 and 1 rows,
/// with the `v` column values in ascending order.
#[tokio::test(flavor = "current_thread")]
async fn package_stream_emits_multiple_batches() {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("v", DataType::Int64, true),
]));
let mutable = make_test_mem(schema.clone());
// Rows k0..k4 with v = 0..4.
let rows = (0..5)
.map(|idx| {
DynRow(vec![
Some(DynCell::Str(format!("k{idx}"))),
Some(DynCell::I64(idx as i64)),
])
})
.collect();
let batch = build_batch(schema.clone(), rows).expect("batch");
mutable
.insert_batch(batch, Timestamp::new(1))
.expect("insert batch");
let mutable_guard = mutable.read();
let mutable_scan =
OwnedMutableScan::from_guard(mutable_guard, None, Timestamp::MAX).expect("scan rows");
let merge = MergeStream::from_vec(
vec![ScanStream::<'_, fusio::executor::NoopExecutor>::from(
mutable_scan,
)],
None,
Some(Order::Asc),
)
.await
.expect("merge stream");
// Scan and result schemas are identical, so no projection is involved.
let mut stream = Box::pin(
PackageStream::new(2, merge, Arc::clone(&schema), Arc::clone(&schema), None)
.expect("package stream"),
);
let batches = block_on(async {
let mut out = Vec::new();
while let Some(batch) = stream.next().await {
out.push(batch.expect("batch ok"));
}
out
});
assert_eq!(batches.len(), 3, "expected three batches");
assert_eq!(batches[0].num_rows(), 2);
assert_eq!(batches[1].num_rows(), 2);
assert_eq!(batches[2].num_rows(), 1);
// Flatten the `v` column across batches to check ordering.
let mut collected = Vec::new();
for batch in batches {
let array = batch
.column(1)
.as_any()
.downcast_ref::<Int64Array>()
.expect("int column");
for i in 0..array.len() {
collected.push(array.value(i));
}
}
assert_eq!(collected, vec![0, 1, 2, 3, 4]);
}
/// With the residual predicate `v > 0`, only the row whose `v` is positive is emitted.
#[test]
fn package_stream_applies_residual_predicate() {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("v", DataType::Int64, true),
]));
let mutable = make_test_mem(schema.clone());
let rows = vec![
DynRow(vec![
Some(DynCell::Str("keep".into())),
Some(DynCell::I64(10)),
]),
DynRow(vec![
Some(DynCell::Str("drop".into())),
Some(DynCell::I64(-5)),
]),
];
let batch = build_batch(schema.clone(), rows).expect("batch");
mutable
.insert_batch(batch, Timestamp::new(1))
.expect("insert batch");
let predicate = Expr::gt("v", ScalarValue::from(0i64));
let mutable_guard = mutable.read();
let mutable_scan =
OwnedMutableScan::from_guard(mutable_guard, None, Timestamp::MAX).expect("scan rows");
let merge = block_on(MergeStream::from_vec(
vec![ScanStream::<'_, fusio::executor::NoopExecutor>::from(
mutable_scan,
)],
None,
Some(Order::Asc),
))
.expect("merge stream");
let mut stream = Box::pin(
PackageStream::new(
1,
merge,
Arc::clone(&schema),
Arc::clone(&schema),
Some(predicate),
)
.expect("package stream"),
);
// Collect the `id` column of every emitted batch.
let collected = block_on(async {
let mut out = Vec::new();
while let Some(batch) = stream.next().await {
let batch = batch.expect("batch ok");
let ids = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.expect("string column");
for i in 0..ids.len() {
out.push(ids.value(i).to_string());
}
}
out
});
assert_eq!(collected, vec!["keep"]);
}
/// A predicate on a column that is not in the schema must surface as
/// `StreamError::Predicate(ResidualError::MissingColumn)` from the stream.
#[test]
fn package_stream_surfaces_predicate_errors() {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("v", DataType::Int64, true),
]));
let mutable = make_test_mem(schema.clone());
let rows = vec![DynRow(vec![
Some(DynCell::Str("only".into())),
Some(DynCell::I64(1)),
])];
let batch = build_batch(schema.clone(), rows).expect("batch");
mutable
.insert_batch(batch, Timestamp::new(1))
.expect("insert batch");
// "missing" is not a field of `schema`.
let predicate = Expr::gt("missing", ScalarValue::from(0i64));
let mutable_guard = mutable.read();
let mutable_scan =
OwnedMutableScan::from_guard(mutable_guard, None, Timestamp::MAX).expect("scan rows");
let merge = block_on(MergeStream::from_vec(
vec![ScanStream::<'_, fusio::executor::NoopExecutor>::from(
mutable_scan,
)],
None,
Some(Order::Asc),
))
.expect("merge stream");
let mut stream = Box::pin(
PackageStream::new(
1,
merge,
Arc::clone(&schema),
Arc::clone(&schema),
Some(predicate),
)
.expect("package stream"),
);
let err = block_on(async {
stream
.next()
.await
.expect("stream should yield error")
.expect_err("expected error")
});
match err {
StreamError::Predicate(ResidualError::MissingColumn(column)) => {
assert_eq!(column.as_ref(), "missing");
}
other => panic!("unexpected error {other:?}"),
}
}
/// A comparison is `Unknown` when either the cell or the literal is null.
#[test]
fn residual_cmp_null_yields_unknown() {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, true)]));
    let evaluator = ResidualEvaluator::new(&schema);
    let evaluate = |expr: &Expr, row: &DynRow| {
        evaluator
            .evaluate_expr(expr, row)
            .expect("evaluate predicate")
    };

    // Null cell against a concrete literal.
    let null_cell_row = DynRow(vec![Some(DynCell::Null)]);
    let greater_than_zero = Expr::gt("v", ScalarValue::from(0i64));
    assert_eq!(
        evaluate(&greater_than_zero, &null_cell_row),
        TriState::Unknown
    );

    // Concrete cell against a null literal.
    let five_row = DynRow(vec![Some(DynCell::I64(5))]);
    let equals_null = Expr::eq("v", ScalarValue::Int64(None));
    assert_eq!(evaluate(&equals_null, &five_row), TriState::Unknown);
}
/// `IN` lists follow SQL null semantics: a null cell or an unmatched list containing a null
/// is `Unknown`, while a match is `True` even when the list also contains a null.
#[test]
fn residual_in_list_null_semantics() {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, true)]));
    let evaluator = ResidualEvaluator::new(&schema);
    let evaluate = |expr: &Expr, row: &DynRow| {
        evaluator
            .evaluate_expr(expr, row)
            .expect("evaluate predicate")
    };
    let null_cell_row = DynRow(vec![Some(DynCell::Null)]);
    let five_row = DynRow(vec![Some(DynCell::I64(5))]);

    // Null cell: nothing can be decided.
    let plain_list = Expr::in_list("v", vec![ScalarValue::from(1i64), ScalarValue::from(2i64)]);
    assert_eq!(evaluate(&plain_list, &null_cell_row), TriState::Unknown);

    // No match, but the list holds a null.
    let list_with_null =
        Expr::in_list("v", vec![ScalarValue::from(1i64), ScalarValue::Int64(None)]);
    assert_eq!(evaluate(&list_with_null, &five_row), TriState::Unknown);

    // A match wins over the null entry.
    let list_with_match =
        Expr::in_list("v", vec![ScalarValue::Int64(None), ScalarValue::from(5i64)]);
    assert_eq!(evaluate(&list_with_match, &five_row), TriState::True);
}
/// `AND`, `OR` and `NOT` must propagate `Unknown` according to three-valued logic.
#[test]
fn residual_boolean_composition_preserves_unknown() {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, true)]));
    let evaluator = ResidualEvaluator::new(&schema);
    let row = DynRow(vec![Some(DynCell::I64(5))]);
    let truthy = Expr::gt("v", ScalarValue::from(0i64));
    let falsy = Expr::lt("v", ScalarValue::from(0i64));
    let unknown = Expr::eq("v", ScalarValue::Int64(None));

    // True AND Unknown = Unknown
    let and_unknown = Expr::and(vec![truthy.clone(), unknown.clone()]);
    let outcome = evaluator
        .evaluate_expr(&and_unknown, &row)
        .expect("evaluate predicate");
    assert_eq!(outcome, TriState::Unknown);

    // False AND Unknown = False
    let and_false = Expr::and(vec![falsy.clone(), unknown.clone()]);
    let outcome = evaluator
        .evaluate_expr(&and_false, &row)
        .expect("evaluate predicate");
    assert_eq!(outcome, TriState::False);

    // False OR Unknown = Unknown
    let or_unknown = Expr::or(vec![falsy.clone(), unknown.clone()]);
    let outcome = evaluator
        .evaluate_expr(&or_unknown, &row)
        .expect("evaluate predicate");
    assert_eq!(outcome, TriState::Unknown);

    // True OR Unknown = True
    let or_true = Expr::or(vec![truthy, unknown.clone()]);
    let outcome = evaluator
        .evaluate_expr(&or_true, &row)
        .expect("evaluate predicate");
    assert_eq!(outcome, TriState::True);

    // NOT Unknown = Unknown
    let not_unknown = Expr::not(unknown);
    let outcome = evaluator
        .evaluate_expr(&not_unknown, &row)
        .expect("evaluate predicate");
    assert_eq!(outcome, TriState::Unknown);
}
/// `IS NULL` and `IS NOT NULL` always give a definite answer, for null and non-null cells.
#[test]
fn residual_is_null_semantics() {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, true)]));
    let evaluator = ResidualEvaluator::new(&schema);
    let evaluate = |expr: &Expr, row: &DynRow| {
        evaluator
            .evaluate_expr(expr, row)
            .expect("evaluate predicate")
    };
    let is_null = Expr::is_null("v");
    let is_not_null = Expr::is_not_null("v");

    let null_cell_row = DynRow(vec![Some(DynCell::Null)]);
    assert_eq!(evaluate(&is_null, &null_cell_row), TriState::True);
    assert_eq!(evaluate(&is_not_null, &null_cell_row), TriState::False);

    let one_row = DynRow(vec![Some(DynCell::I64(1))]);
    assert_eq!(evaluate(&is_null, &one_row), TriState::False);
    assert_eq!(evaluate(&is_not_null, &one_row), TriState::True);
}
/// 2^53 + 1 as `UInt64` must compare strictly greater than 2^53 as `Float64`; a comparison
/// that first rounded the integer to `f64` would report them equal.
#[test]
fn residual_numeric_compare_large_uint64_vs_float64() {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::UInt64, true)]));
    let evaluator = ResidualEvaluator::new(&schema);
    let row = DynRow(vec![Some(DynCell::U64(9_007_199_254_740_993))]);
    let float_value = ScalarValue::Float64(Some(9_007_199_254_740_992.0));
    let gt_predicate = Expr::gt("v", float_value.clone());
    let eq_predicate = Expr::eq("v", float_value);
    assert!(
        evaluator
            .matches_owned(&gt_predicate, &row)
            .expect("gt predicate")
    );
    assert!(
        !evaluator
            .matches_owned(&eq_predicate, &row)
            .expect("eq predicate")
    );
}
/// A `UInt64` close to `u64::MAX` must compare strictly greater than a slightly smaller
/// `Float64` literal, and must not compare equal to it.
#[test]
fn residual_numeric_compare_u64_near_max_vs_float64() {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::UInt64, true)]));
    let evaluator = ResidualEvaluator::new(&schema);
    let row_value = 18_446_744_073_709_548_616_u64;
    let row = DynRow(vec![Some(DynCell::U64(row_value))]);
    let float_value = ScalarValue::Float64(Some(18_446_744_073_709_547_520.0));
    let gt_predicate = Expr::gt("v", float_value.clone());
    let eq_predicate = Expr::eq("v", float_value);
    assert!(
        evaluator
            .matches_owned(&gt_predicate, &row)
            .expect("gt predicate")
    );
    assert!(
        !evaluator
            .matches_owned(&eq_predicate, &row)
            .expect("eq predicate")
    );
}
}