use std::ops::Range;
use std::sync::Arc;
use arrow::array::{Array, ArrayRef, BooleanArray, PrimitiveArray, cast::AsArray};
use arrow::buffer::BooleanBuffer;
use arrow::datatypes::DataType;
use bytes::Bytes;
use clap::Parser;
use datafusion::prelude::*;
use datafusion::scalar::ScalarValue;
use futures::StreamExt;
use liquid_cache::cache::CacheExpression;
use liquid_cache::liquid_array::{
IntegerSqueezePolicy, LiquidArray, LiquidPrimitiveArray, LiquidPrimitiveType,
LiquidSqueezedArray, SqueezeIoHandler,
};
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
// Route every heap allocation made by this binary through mimalloc.
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
// Command-line options for the study.
// Plain `//` comments are used on purpose: clap's derive turns `///` doc comments
// into `--help` text, and the help output is left unchanged here.
#[derive(Parser, Debug, Default, Clone)]
#[command(name = "Squeeze Integer Study")]
#[command(about = "Clamp vs Quantize squeeze on representative integer filters from ClickBench")]
struct CliArgs {
    // Path of the parquet file that `main` registers as table `hits`.
    #[arg(long, default_value = "../../benchmark/clickbench/data/hits.parquet")]
    parquet: String,
    // Optional row cap; `run_case` appends it to each query as `LIMIT n`.
    #[arg(long)]
    limit: Option<usize>,
    // NOTE(review): `bench` is parsed but not read anywhere in this file — confirm
    // whether it is still needed.
    #[arg(long, default_value = "false")]
    bench: bool,
}
/// One filter to study: `<column> <op> <scalar>` evaluated against a single
/// column of the `hits` table.
#[derive(Debug, Clone)]
struct FilterCase {
    /// Column of `hits`; it is also the only column projected by the query.
    column: String,
    /// Comparison operator; `eval_on_arrow` accepts `=`, `!=`, `<`, `<=`, `>`, `>=`.
    op: datafusion::logical_expr::Operator,
    /// Right-hand literal of the comparison.
    scalar: ScalarValue,
}
/// Counters accumulated while running filter cases; every size is in bytes.
#[derive(Clone, Debug, Default)]
struct Stats {
    /// Rows streamed from the table.
    rows: usize,
    /// Memory size of the source Arrow arrays.
    arrow_bytes: usize,
    /// Memory size of the unsqueezed liquid arrays.
    liquid_bytes: usize,
    /// In-memory size of the Clamp-squeezed arrays.
    clamp_mem_bytes: usize,
    /// Size of the bytes evicted by Clamp squeezing.
    clamp_disk_bytes: usize,
    /// In-memory size of the Quantize-squeezed arrays.
    quant_mem_bytes: usize,
    /// Size of the bytes evicted by Quantize squeezing.
    quant_disk_bytes: usize,
    /// Bytes read to evaluate the predicate on Clamp-squeezed arrays.
    clamp_pred_io_bytes: usize,
    /// Bytes read to evaluate the predicate on Quantize-squeezed arrays.
    quant_pred_io_bytes: usize,
    /// Bytes read to fetch the selected rows from Clamp-squeezed arrays.
    clamp_select_io_bytes: usize,
    /// Bytes read to fetch the selected rows from Quantize-squeezed arrays.
    quant_select_io_bytes: usize,
    /// Incremented once per `run_for_array` call, i.e. once per processed batch.
    pred_cases: usize,
}

impl Stats {
    /// Adds every counter of `other` into the matching counter of `self`.
    fn add(&mut self, other: &Stats) {
        // Exhaustive destructuring: adding a field to `Stats` without summing it
        // here becomes a compile error instead of a silently dropped counter.
        let &Stats {
            rows,
            arrow_bytes,
            liquid_bytes,
            clamp_mem_bytes,
            clamp_disk_bytes,
            quant_mem_bytes,
            quant_disk_bytes,
            clamp_pred_io_bytes,
            quant_pred_io_bytes,
            clamp_select_io_bytes,
            quant_select_io_bytes,
            pred_cases,
        } = other;
        self.rows += rows;
        self.arrow_bytes += arrow_bytes;
        self.liquid_bytes += liquid_bytes;
        self.clamp_mem_bytes += clamp_mem_bytes;
        self.clamp_disk_bytes += clamp_disk_bytes;
        self.quant_mem_bytes += quant_mem_bytes;
        self.quant_disk_bytes += quant_disk_bytes;
        self.clamp_pred_io_bytes += clamp_pred_io_bytes;
        self.quant_pred_io_bytes += quant_pred_io_bytes;
        self.clamp_select_io_bytes += clamp_select_io_bytes;
        self.quant_select_io_bytes += quant_select_io_bytes;
        self.pred_cases += pred_cases;
    }
}
/// Returns the fixed list of integer filters studied, in report order.
///
/// Every case compares one `hits` column against an `Int64` literal.
fn representative_integer_filters() -> Vec<FilterCase> {
    use datafusion::logical_expr::Operator as Op;

    // (column, operator, Int64 literal)
    let table: [(&str, Op, i64); 11] = [
        ("AdvEngineID", Op::NotEq, 0),
        ("UserID", Op::Eq, 435_090_932_899_640_449),
        ("CounterID", Op::Eq, 62),
        ("IsRefresh", Op::Eq, 0),
        ("DontCountHits", Op::Eq, 0),
        ("IsLink", Op::NotEq, 0),
        ("IsDownload", Op::Eq, 0),
        ("TraficSourceID", Op::Eq, -1),
        ("TraficSourceID", Op::Eq, 6),
        ("RefererHash", Op::Eq, 3_594_120_000_172_545_465),
        ("URLHash", Op::Eq, 2_868_770_270_353_813_622),
    ];

    table
        .into_iter()
        .map(|(column, op, value)| FilterCase {
            column: column.to_string(),
            op,
            scalar: ScalarValue::Int64(Some(value)),
        })
        .collect()
}
#[tokio::main]
async fn main() {
let args = CliArgs::parse();
let mut config = SessionConfig::default().with_batch_size(8192 * 2);
let options = config.options_mut();
options.execution.parquet.schema_force_view_types = false;
let ctx = SessionContext::new_with_config(config);
ctx.register_parquet("hits", &args.parquet, Default::default())
.await
.expect("register parquet");
let cases = representative_integer_filters();
println!("Squeeze Integer Study over {} case(s)", cases.len());
let mut grand = Stats::default();
for case in &cases {
let stats = run_case(&ctx, case, args.limit).await;
println!(
"Case on column '{}', op '{:?}', scalar {:?}:\n rows: {}\n sizes (bytes) -> arrow: {}, liquid: {}, clamp: {} (mem: {}, disk: {}), quant: {} (mem: {}, disk: {})\n io (bytes) -> pred: clamp {}, quant {}; select: clamp {}, quant {}",
case.column,
case.op,
case.scalar,
stats.rows,
stats.arrow_bytes,
stats.liquid_bytes,
stats.clamp_mem_bytes + stats.clamp_disk_bytes,
stats.clamp_mem_bytes,
stats.clamp_disk_bytes,
stats.quant_mem_bytes + stats.quant_disk_bytes,
stats.quant_mem_bytes,
stats.quant_disk_bytes,
stats.clamp_pred_io_bytes,
stats.quant_pred_io_bytes,
stats.clamp_select_io_bytes,
stats.quant_select_io_bytes
);
grand.add(&stats);
}
println!(
"TOTAL\n rows: {}\n sizes (bytes) -> arrow: {}, liquid: {}, clamp: {} (mem: {}, disk: {}), quant: {} (mem: {}, disk: {})\n io (bytes) -> pred: clamp {}, quant {}; select: clamp {}, quant {}",
grand.rows,
grand.arrow_bytes,
grand.liquid_bytes,
grand.clamp_mem_bytes + grand.clamp_disk_bytes,
grand.clamp_mem_bytes,
grand.clamp_disk_bytes,
grand.quant_mem_bytes + grand.quant_disk_bytes,
grand.quant_mem_bytes,
grand.quant_disk_bytes,
grand.clamp_pred_io_bytes,
grand.quant_pred_io_bytes,
grand.clamp_select_io_bytes,
grand.quant_select_io_bytes
);
}
async fn run_case(ctx: &SessionContext, case: &FilterCase, limit: Option<usize>) -> Stats {
let sql = if let Some(n) = limit {
format!("SELECT \"{}\" FROM \"hits\" LIMIT {n}", case.column)
} else {
format!("SELECT \"{}\" FROM \"hits\"", case.column)
};
let df = ctx.sql(&sql).await.expect("create df");
let mut stream = df.execute_stream().await.expect("execute stream");
let mut stats = Stats::default();
while let Some(batch_res) = stream.next().await {
let batch = batch_res.expect("stream batch");
let array: ArrayRef = batch.column(0).clone();
stats.rows += array.len();
stats.arrow_bytes += array.get_array_memory_size();
match array.data_type() {
DataType::Int8 => run_for_array::<arrow::datatypes::Int8Type>(&array, case, &mut stats),
DataType::Int16 => {
run_for_array::<arrow::datatypes::Int16Type>(&array, case, &mut stats)
}
DataType::Int32 => {
run_for_array::<arrow::datatypes::Int32Type>(&array, case, &mut stats)
}
DataType::Int64 => {
run_for_array::<arrow::datatypes::Int64Type>(&array, case, &mut stats)
}
DataType::UInt8 => {
run_for_array::<arrow::datatypes::UInt8Type>(&array, case, &mut stats)
}
DataType::UInt16 => {
run_for_array::<arrow::datatypes::UInt16Type>(&array, case, &mut stats)
}
DataType::UInt32 => {
run_for_array::<arrow::datatypes::UInt32Type>(&array, case, &mut stats)
}
DataType::UInt64 => {
run_for_array::<arrow::datatypes::UInt64Type>(&array, case, &mut stats)
}
DataType::Date32 => {
run_for_array::<arrow::datatypes::Date32Type>(&array, case, &mut stats)
}
DataType::Date64 => {
run_for_array::<arrow::datatypes::Date64Type>(&array, case, &mut stats)
}
_ => {}
}
}
stats
}
/// In-memory [`SqueezeIoHandler`] backend used by the study. It holds the bytes
/// a squeezed array evicted and counts how many bytes are read back.
#[derive(Default, Debug)]
struct InMemorySqueezeIo {
    /// Evicted bytes; `None` until `set_bytes` is called.
    bytes: Mutex<Option<Bytes>>,
    /// Bytes returned by reads since the last `reset_bytes_read`.
    bytes_read: AtomicUsize,
}

impl InMemorySqueezeIo {
    /// Installs `bytes` as the backing data, replacing anything set earlier.
    fn set_bytes(&self, bytes: Bytes) {
        let mut slot = self.bytes.lock().unwrap();
        *slot = Some(bytes);
    }

    /// Returns a clone of the backing data.
    ///
    /// # Panics
    /// Panics if `set_bytes` has not been called yet.
    fn bytes(&self) -> Bytes {
        let slot = self.bytes.lock().unwrap();
        slot.as_ref().cloned().expect("in-memory squeeze bytes set")
    }

    /// Zeroes the read counter before a measurement.
    fn reset_bytes_read(&self) {
        self.bytes_read.store(0, Ordering::SeqCst)
    }

    /// Current value of the read counter.
    fn bytes_read(&self) -> usize {
        self.bytes_read.load(Ordering::SeqCst)
    }
}
#[async_trait::async_trait]
impl SqueezeIoHandler for InMemorySqueezeIo {
    /// Serves `range` of the installed bytes, or all of them for `None`, and adds
    /// the number of bytes returned to the read counter.
    ///
    /// # Errors
    /// - `NotFound` if no bytes have been installed with `set_bytes`.
    /// - `InvalidInput` if `range` is inverted, does not fit in `usize`, or extends
    ///   past the end of the installed bytes. These cases previously panicked
    ///   instead of using the `io::Result` channel.
    async fn read(&self, range: Option<Range<u64>>) -> std::io::Result<Bytes> {
        use std::io::{Error, ErrorKind};

        // Clone out of the lock in its own statement so the guard is released
        // immediately.
        let stored = self.bytes.lock().unwrap().clone();
        let Some(bytes) = stored else {
            return Err(Error::new(
                ErrorKind::NotFound,
                "in-memory squeeze bytes not set",
            ));
        };

        let out = match range {
            None => bytes,
            Some(range) => {
                // Checked conversions: a plain `as usize` would silently truncate on
                // targets where usize is narrower than u64.
                let start = usize::try_from(range.start).ok();
                let end = usize::try_from(range.end).ok();
                match (start, end) {
                    (Some(start), Some(end)) if start <= end && end <= bytes.len() => {
                        bytes.slice(start..end)
                    }
                    _ => {
                        return Err(Error::new(
                            ErrorKind::InvalidInput,
                            format!("range {range:?} out of bounds for {} bytes", bytes.len()),
                        ));
                    }
                }
            }
        };
        self.bytes_read.fetch_add(out.len(), Ordering::SeqCst);
        Ok(out)
    }
}
fn run_for_array<T: LiquidPrimitiveType>(array: &ArrayRef, case: &FilterCase, stats: &mut Stats)
where
<T as arrow::array::ArrowPrimitiveType>::Native: num_traits::cast::AsPrimitive<f64>
+ num_traits::FromPrimitive
+ num_traits::bounds::Bounded,
{
let prim = array.as_primitive::<T>().clone();
let liquid = LiquidPrimitiveArray::<T>::from_arrow_array(prim.clone());
stats.liquid_bytes += liquid.get_array_memory_size();
let hint = CacheExpression::PredicateColumn;
let mut lp = LiquidPrimitiveArray::<T>::from_arrow_array(prim.clone());
let clamp_io = Arc::new(InMemorySqueezeIo::default());
let clamp_hybrid_and_bytes = {
lp.set_squeeze_policy(IntegerSqueezePolicy::Clamp);
lp.squeeze(clamp_io.clone(), Some(&hint))
};
let mut lq = LiquidPrimitiveArray::<T>::from_arrow_array(prim.clone());
let quant_io = Arc::new(InMemorySqueezeIo::default());
let quant_hybrid_and_bytes = {
lq.set_squeeze_policy(IntegerSqueezePolicy::Quantize);
lq.squeeze(quant_io.clone(), Some(&hint))
};
if let Some((h, bytes)) = clamp_hybrid_and_bytes.as_ref() {
clamp_io.set_bytes(bytes.clone());
stats.clamp_mem_bytes += h.get_array_memory_size();
stats.clamp_disk_bytes += bytes.len();
}
if let Some((h, bytes)) = quant_hybrid_and_bytes.as_ref() {
quant_io.set_bytes(bytes.clone());
stats.quant_mem_bytes += h.get_array_memory_size();
stats.quant_disk_bytes += bytes.len();
}
use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal};
let expr: std::sync::Arc<dyn datafusion::physical_plan::PhysicalExpr> =
std::sync::Arc::new(BinaryExpr::new(
std::sync::Arc::new(Column::new("col", 0)),
case.op,
std::sync::Arc::new(Literal::new(case.scalar.clone())),
));
let all_true = BooleanBuffer::new_set(prim.len());
if let Some((hy, _full_bytes)) = clamp_hybrid_and_bytes.clone() {
let (mask, pred_io_bytes) =
try_eval_or_fetch::<T>(&*hy, clamp_io.as_ref(), &expr, &all_true);
stats.clamp_pred_io_bytes += pred_io_bytes;
let sel = bool_array_to_selection(&mask);
let expected_filtered = filter_expected::<T>(&prim, &case.op, &case.scalar);
let sel_io = get_with_selection(&*hy, clamp_io.as_ref(), &sel, expected_filtered.as_ref());
stats.clamp_select_io_bytes += sel_io;
}
if let Some((hy, _full_bytes)) = quant_hybrid_and_bytes.clone() {
let (mask, pred_io_bytes) =
try_eval_or_fetch::<T>(&*hy, quant_io.as_ref(), &expr, &all_true);
stats.quant_pred_io_bytes += pred_io_bytes;
let sel = bool_array_to_selection(&mask);
let expected_filtered = filter_expected::<T>(&prim, &case.op, &case.scalar);
let sel_io = get_with_selection(&*hy, quant_io.as_ref(), &sel, expected_filtered.as_ref());
stats.quant_select_io_bytes += sel_io;
}
stats.pred_cases += 1;
}
/// Evaluates `expr` on a squeezed array and reports how many bytes were read
/// through `io` to do so.
///
/// First the squeezed array is asked to answer the predicate itself
/// (`try_eval_predicate`, restricted to `filter`). If it declines with `None`,
/// the full squeezed bytes are read through `io`, decoded back into a
/// `LiquidPrimitiveArray<T>`, and the predicate is evaluated on Arrow with
/// `eval_on_arrow`.
///
/// Both branches report the handler's read counter. The fallback therefore
/// counts the full read plus any partial reads made before `try_eval_predicate`
/// gave up; previously those partial reads were dropped from the count.
///
/// # Panics
/// Panics if the fallback read through `io` returns an error, or if
/// `eval_on_arrow` panics.
fn try_eval_or_fetch<T: LiquidPrimitiveType>(
    hybrid: &dyn LiquidSqueezedArray,
    io: &InMemorySqueezeIo,
    expr: &std::sync::Arc<dyn datafusion::physical_plan::PhysicalExpr>,
    filter: &BooleanBuffer,
) -> (BooleanArray, usize) {
    use futures::executor::block_on;

    io.reset_bytes_read();
    let mask = match block_on(hybrid.try_eval_predicate(expr, filter)) {
        Some(mask) => mask,
        None => {
            // Go through the handler (not the raw buffer) so this read is counted
            // the same way as every other read.
            let full_bytes = block_on(io.read(None)).expect("read squeezed bytes");
            let arr = LiquidPrimitiveArray::<T>::from_bytes(full_bytes).to_arrow_array();
            eval_on_arrow(&arr, expr)
        }
    };
    (mask, io.bytes_read())
}
/// Fetches the rows of `hybrid` selected by `selection`, checks them against
/// `expected`, and returns how many bytes were read through `io` to do so.
///
/// # Panics
/// Panics if the fetched rows differ from `expected`.
fn get_with_selection(
    hybrid: &dyn LiquidSqueezedArray,
    io: &InMemorySqueezeIo,
    selection: &BooleanBuffer,
    expected: &dyn Array,
) -> usize {
    use futures::executor::block_on;

    io.reset_bytes_read();
    let fetched = block_on(hybrid.filter(selection));
    assert_eq!(fetched.as_ref(), expected);
    io.bytes_read()
}
fn eval_on_arrow(
array: &ArrayRef,
expr: &std::sync::Arc<dyn datafusion::physical_plan::PhysicalExpr>,
) -> BooleanArray {
use arrow::compute::cast;
use datafusion::logical_expr::ColumnarValue;
use datafusion::physical_expr_common::datum::apply_cmp;
use datafusion::physical_plan::expressions::{BinaryExpr, Literal};
if let Some(be) = expr.as_any().downcast_ref::<BinaryExpr>()
&& let Some(lit) = be.right().as_any().downcast_ref::<Literal>()
{
let target_dt = scalar_data_type(lit.value()).unwrap_or_else(|| array.data_type().clone());
let lhs_arr = if &target_dt == array.data_type() {
array.clone()
} else {
cast(array, &target_dt).expect("cast lhs for comparison")
};
let lhs = ColumnarValue::Array(lhs_arr);
let rhs = ColumnarValue::Scalar(lit.value().clone());
let res = match be.op() {
datafusion::logical_expr::Operator::Eq => {
apply_cmp(datafusion::logical_expr::Operator::Eq, &lhs, &rhs)
}
datafusion::logical_expr::Operator::NotEq => {
apply_cmp(datafusion::logical_expr::Operator::NotEq, &lhs, &rhs)
}
datafusion::logical_expr::Operator::Lt => {
apply_cmp(datafusion::logical_expr::Operator::Lt, &lhs, &rhs)
}
datafusion::logical_expr::Operator::LtEq => {
apply_cmp(datafusion::logical_expr::Operator::LtEq, &lhs, &rhs)
}
datafusion::logical_expr::Operator::Gt => {
apply_cmp(datafusion::logical_expr::Operator::Gt, &lhs, &rhs)
}
datafusion::logical_expr::Operator::GtEq => {
apply_cmp(datafusion::logical_expr::Operator::GtEq, &lhs, &rhs)
}
_ => panic!("unsupported operator"),
}
.expect("cmp ok");
let arr = res.into_array(array.len()).unwrap();
arr.as_boolean().clone()
} else {
panic!("unexpected expression kind for numeric predicate")
}
}
fn bool_array_to_selection(mask: &BooleanArray) -> BooleanBuffer {
let iter = (0..mask.len()).map(|i| mask.is_valid(i) && mask.value(i));
BooleanBuffer::from_iter(iter)
}
fn filter_expected<T: arrow::array::ArrowPrimitiveType>(
prim: &PrimitiveArray<T>,
op: &datafusion::logical_expr::Operator,
scalar: &ScalarValue,
) -> ArrayRef {
use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal};
let arr: ArrayRef = std::sync::Arc::new(prim.clone());
let expr: std::sync::Arc<dyn datafusion::physical_plan::PhysicalExpr> =
std::sync::Arc::new(BinaryExpr::new(
std::sync::Arc::new(Column::new("col", 0)),
*op,
std::sync::Arc::new(Literal::new(scalar.clone())),
));
let mask = eval_on_arrow(&arr, &expr);
arrow::compute::kernels::filter::filter(&arr, &mask).unwrap()
}
/// Maps an integer or date [`ScalarValue`] to the matching Arrow [`DataType`].
///
/// Returns `None` for every other scalar variant. In that case `eval_on_arrow`
/// falls back to the array's own type.
fn scalar_data_type(sv: &ScalarValue) -> Option<DataType> {
    match sv {
        ScalarValue::Int8(_) => Some(DataType::Int8),
        ScalarValue::Int16(_) => Some(DataType::Int16),
        ScalarValue::Int32(_) => Some(DataType::Int32),
        ScalarValue::Int64(_) => Some(DataType::Int64),
        ScalarValue::UInt8(_) => Some(DataType::UInt8),
        ScalarValue::UInt16(_) => Some(DataType::UInt16),
        ScalarValue::UInt32(_) => Some(DataType::UInt32),
        ScalarValue::UInt64(_) => Some(DataType::UInt64),
        ScalarValue::Date32(_) => Some(DataType::Date32),
        ScalarValue::Date64(_) => Some(DataType::Date64),
        _ => None,
    }
}