use std::sync::Arc;
use std::time::Instant;
use arrow::array::{Array, ArrayRef, StringArray, cast::AsArray};
use arrow_schema::DataType;
use clap::Parser;
use datafusion::prelude::*;
use liquid_cache::liquid_array::raw::FsstArray;
use rand::SeedableRng;
use rand::rngs::StdRng;
use rand::seq::{SliceRandom, index::sample};
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
#[derive(Parser, Debug, Clone)]
#[command(name = "FSST Selected Decode Benchmark")]
#[command(about = "Benchmark FsstArray::to_uncompressed_selected at multiple selectivities")]
struct CliArgs {
#[arg(long, default_value = "../../benchmark/clickbench/data/hits.parquet")]
parquet: String,
#[arg(long, value_delimiter = ',', default_value = "Title,URL")]
columns: Vec<String>,
#[arg(long)]
limit: Option<usize>,
#[arg(long, default_value_t = 8192 * 2)]
batch_size: usize,
#[arg(long, value_delimiter = ',', default_value = "1,10,50,99")]
selectivities: Vec<u8>,
#[arg(long, default_value_t = 5)]
iterations: usize,
#[arg(long, default_value = "false")]
bench: bool,
}
struct Selection {
pct: u8,
indices: Vec<usize>,
approx_bytes: usize,
}
#[tokio::main]
async fn main() {
let args = CliArgs::parse();
let mut config = SessionConfig::default().with_batch_size(args.batch_size);
let options = config.options_mut();
options.execution.parquet.schema_force_view_types = false;
let ctx = SessionContext::new_with_config(config);
ctx.register_parquet("hits", &args.parquet, Default::default())
.await
.expect("register parquet");
let mut score = 0;
for (col_idx, column) in args.columns.iter().enumerate() {
let array = load_column_array(&ctx, column, args.limit).await;
let row_count = array.len();
if row_count == 0 {
println!("Column {column}: no rows");
continue;
}
let compressor = Arc::new(FsstArray::train_compressor(
array.iter().flatten().map(|value| value.as_bytes()),
));
let fsst = FsstArray::from_byte_array_with_compressor(&array, compressor);
let total_uncompressed = fsst.uncompressed_bytes();
let avg_len = total_uncompressed as f64 / row_count as f64;
drop(array);
println!(
"Column {column}\n rows: {row_count}\n uncompressed: {}\n avg_len: {:.2} bytes",
format_bytes(total_uncompressed),
avg_len
);
let selections = build_selections(row_count, avg_len, &args.selectivities, col_idx as u64);
for selection in selections {
if selection.indices.is_empty() {
println!(
" selectivity {:>3}% -> selected: 0 (skipped)",
selection.pct
);
continue;
}
std::hint::black_box(fsst.to_uncompressed_selected(&selection.indices));
let mut total = 0.0;
let mut min = f64::MAX;
let mut max = 0.0_f64;
for _ in 0..args.iterations {
let start = Instant::now();
let output = fsst.to_uncompressed_selected(&selection.indices);
std::hint::black_box(output);
let elapsed = start.elapsed().as_secs_f64();
total += elapsed;
min = min.min(elapsed);
max = max.max(elapsed);
}
let avg = total / args.iterations as f64;
let values_per_sec = selection.indices.len() as f64 / avg;
let mb_per_sec = selection.approx_bytes as f64 / avg / (1024.0 * 1024.0);
println!(
" selectivity {:>3}% -> selected: {:>8} | avg: {:>8.6}s | min: {:>8.6}s | max: {:>8.6}s | {:>10.1} values/s | {:>8.1} MiB/s",
selection.pct,
selection.indices.len(),
avg,
min,
max,
values_per_sec,
mb_per_sec
);
score += mb_per_sec as usize;
}
}
println!("Final score: {score}");
}
async fn load_column_array(
ctx: &SessionContext,
column: &str,
limit: Option<usize>,
) -> StringArray {
let sql = if let Some(limit) = limit {
format!("SELECT \"{column}\" FROM \"hits\" LIMIT {limit}")
} else {
format!("SELECT \"{column}\" FROM \"hits\"")
};
let df = ctx.sql(&sql).await.expect("create df");
let batches = df.collect().await.expect("collect");
if batches.is_empty() {
return StringArray::from(Vec::<Option<&str>>::new());
}
let mut arrays = Vec::with_capacity(batches.len());
for batch in batches {
let array = batch.column(0).clone();
let array = if array.data_type() == &DataType::Utf8 {
array
} else {
arrow::compute::cast(&array, &DataType::Utf8).expect("cast to Utf8")
};
arrays.push(array);
}
concat_utf8_arrays(arrays)
}
fn concat_utf8_arrays(arrays: Vec<ArrayRef>) -> StringArray {
if arrays.is_empty() {
return StringArray::from(Vec::<Option<&str>>::new());
}
let refs: Vec<&dyn Array> = arrays.iter().map(|array| array.as_ref()).collect();
let concatenated = arrow::compute::concat(&refs).expect("concat arrays");
concatenated.as_string::<i32>().clone()
}
fn build_selections(len: usize, avg_len: f64, selectivities: &[u8], seed: u64) -> Vec<Selection> {
selectivities
.iter()
.copied()
.map(|pct| {
let mut rng = StdRng::seed_from_u64(seed ^ (pct as u64).wrapping_mul(0x9e37_79b9));
let indices = build_selection(len, pct, &mut rng);
let approx_bytes = (avg_len * indices.len() as f64).round() as usize;
Selection {
pct,
indices,
approx_bytes,
}
})
.collect()
}
fn build_selection(len: usize, pct: u8, rng: &mut StdRng) -> Vec<usize> {
if len == 0 || pct == 0 {
return Vec::new();
}
let mut target = len.saturating_mul(pct as usize) / 100;
if target == 0 {
target = 1;
}
if target >= len {
return (0..len).collect();
}
let mut selected = if target > len / 2 {
let remove_count = len - target;
let mut remove = vec![false; len];
for idx in sample(rng, len, remove_count) {
remove[idx] = true;
}
let mut indices = Vec::with_capacity(target);
for (idx, should_remove) in remove.iter().enumerate() {
if !should_remove {
indices.push(idx);
}
}
indices
} else {
sample(rng, len, target).into_vec()
};
selected.shuffle(rng);
selected
}
fn format_bytes(bytes: usize) -> String {
const KB: f64 = 1024.0;
const MB: f64 = 1024.0 * KB;
const GB: f64 = 1024.0 * MB;
let b = bytes as f64;
if b >= GB {
format!("{:.2} GiB", b / GB)
} else if b >= MB {
format!("{:.2} MiB", b / MB)
} else if b >= KB {
format!("{:.2} KiB", b / KB)
} else {
format!("{bytes} B")
}
}