// Benchmark program for the `onpair` string compressor: it loads a string
// corpus (a Parquet column or a generated one), compresses and decompresses it
// for each entry of `BITS`, and prints timings, sizes and a round-trip check.
//
// The lints below are relaxed because this program reports through
// `println!`/`eprintln!`, aborts via `expect`/`unwrap`/`assert!` when something
// goes wrong, and uses plain `as` casts for its size arithmetic.
#![allow(
clippy::cast_possible_truncation,
clippy::expect_used,
clippy::print_stdout,
clippy::print_stderr,
clippy::unwrap_in_result,
clippy::unwrap_used
)]
use std::env;
use std::fs::File;
use std::path::PathBuf;
use std::time::Instant;
use arrow_array::Array;
use arrow_array::cast::AsArray;
use onpair::Bits;
use onpair::Config;
use onpair::Threshold;
use onpair::compress;
use onpair::decompress;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
/// Values benchmarked one after another by `main`; each becomes the `bits`
/// field of a `Config` via `Bits::new`. The `expect` message in `main` states
/// the accepted range as `9..=16`.
const BITS: &[u8] = &[12, 16];
/// Runs the compression benchmark and prints the report to stdout.
///
/// Configuration comes from environment variables; an unset or unparsable
/// value silently falls back to its default:
///
/// - `ONPAIR_BENCH_MAX_BYTES`: size budget handed to `load_corpus`
///   (default `1 << 30`).
/// - `ONPAIR_BENCH_ITERS`: number of timed runs per measurement (default 3).
///   Values below 1 are raised to 1 so that a timing is always recorded.
/// - `ONPAIR_BENCH_THRESHOLD`: value handed to `Threshold::new` (default 0.2).
///
/// For every entry of `BITS` the corpus is compressed and decompressed once
/// untimed, then `iters` more times under a timer; the fastest run is reported.
///
/// # Panics
///
/// Panics if `Threshold::new` or `Bits::new` rejects its argument, if
/// `compress` returns an error, or if the decompressed bytes differ from the
/// input corpus.
fn main() {
    const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;

    let max_bytes = env::var("ONPAIR_BENCH_MAX_BYTES")
        .ok()
        .and_then(|s| s.parse::<usize>().ok())
        .unwrap_or(1 << 30);
    // With zero timed runs both minima below would stay at `f64::MAX` and the
    // report would show nonsense, so always time at least one run.
    let iters = env::var("ONPAIR_BENCH_ITERS")
        .ok()
        .and_then(|s| s.parse::<usize>().ok())
        .unwrap_or(3)
        .max(1);
    let threshold = env::var("ONPAIR_BENCH_THRESHOLD")
        .ok()
        .and_then(|s| s.parse::<f64>().ok())
        .map(|t| Threshold::new(t).expect("ONPAIR_BENCH_THRESHOLD must be in (0.0, 1.0]"))
        .unwrap_or_else(|| Threshold::new(0.2).expect("0.2 is in range"));

    let (source, bytes, offsets) = load_corpus(max_bytes);
    // `offsets` holds one entry per row plus the leading 0.
    let n = offsets.len() - 1;
    let total = bytes.len();
    let mib = total as f64 / BYTES_PER_MIB;
    println!("corpus: {source}\n rows = {n}, bytes = {mib:.2} MiB");

    for &bits in BITS {
        println!("\n=== bits = {bits} ===");
        let cfg = Config {
            bits: Bits::new(bits).expect("BITS entries are in 9..=16"),
            threshold,
            seed: Some(42),
        };

        // Untimed first run, then keep the fastest of `iters` timed runs.
        let mut compress_secs = f64::MAX;
        let mut col = compress(&bytes, &offsets, cfg).expect("compress");
        for _ in 0..iters {
            let t = Instant::now();
            col = compress(&bytes, &offsets, cfg).expect("compress");
            compress_secs = compress_secs.min(t.elapsed().as_secs_f64());
        }

        let parts = col.as_parts();
        let mut decompress_secs = f64::MAX;
        let mut decoded = decompress(parts);
        for _ in 0..iters {
            let t = Instant::now();
            decoded = decompress(col.as_parts());
            decompress_secs = decompress_secs.min(t.elapsed().as_secs_f64());
        }

        // Compressed footprint = dictionary bytes + dictionary offsets
        // + codes + per-row code offsets.
        let dict_bytes = parts.dict_bytes.len();
        let dict_offsets = parts.dict_offsets.len() * 4;
        let codes = parts.codes.len() * 2;
        let code_offsets = std::mem::size_of_val(col.code_offsets.as_slice());
        let compressed = dict_bytes + dict_offsets + codes + code_offsets;
        let comp_mib = compressed as f64 / BYTES_PER_MIB;

        println!(
            " compress: {compress_secs:.3}s {:.1} MiB/s",
            mib / compress_secs
        );
        println!(
            " decompress: {decompress_secs:.3}s {:.1} MiB/s",
            mib / decompress_secs
        );
        println!(
            " dict tokens = {}, dict bytes = {dict_bytes}, codes = {} ({} bytes)",
            parts.dict_offsets.len() - 1,
            parts.codes.len(),
            codes
        );
        println!(
            " compressed = {comp_mib:.2} MiB, ratio = {:.3}x",
            mib / comp_mib
        );

        let roundtrip_ok = decoded == bytes;
        println!(
            " roundtrip: {} (decoded={:.2} MiB)",
            if roundtrip_ok { "PASS" } else { "FAIL" },
            decoded.len() as f64 / BYTES_PER_MIB
        );
        assert!(roundtrip_ok, "roundtrip mismatch at bits={bits}");
    }
}
fn load_corpus(max_bytes: usize) -> (String, Vec<u8>, Vec<u64>) {
if let Ok(path) = env::var("ONPAIR_BENCH_PARQUET")
&& let Some((bytes, offsets)) = read_parquet(&PathBuf::from(&path), max_bytes)
{
return (format!("{path} (parquet)"), bytes, offsets);
}
let (bytes, offsets) = synthetic(max_bytes);
(
"synthetic TPC-H-comment-shaped corpus".to_string(),
bytes,
offsets,
)
}
/// Reads one string column of a Parquet file into a flat byte buffer plus
/// row offsets (`offsets[i]..offsets[i + 1]` is row `i`; `offsets[0]` is 0).
///
/// The column is the one named by `ONPAIR_BENCH_COLUMN` when that variable is
/// set, otherwise the first `Utf8`, `LargeUtf8` or `Utf8View` field of the
/// schema. Null values are stored as empty rows. Reading stops after the first
/// row that brings the buffer to `max_bytes` or beyond.
///
/// Returns `None` when the file cannot be opened or parsed, when no matching
/// column exists, when the chosen column is not one of the three string
/// types, or when a record batch fails to decode (the error is reported on
/// stderr rather than silently skipped).
fn read_parquet(path: &std::path::Path, max_bytes: usize) -> Option<(Vec<u8>, Vec<u64>)> {
    use arrow_schema::DataType::{LargeUtf8, Utf8, Utf8View};

    let file = File::open(path).ok()?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).ok()?;
    let schema = builder.schema().clone();

    let col_name = env::var("ONPAIR_BENCH_COLUMN").ok();
    let picked = match col_name.as_deref() {
        Some(name) => schema.fields().iter().position(|f| f.name() == name)?,
        None => schema
            .fields()
            .iter()
            .position(|f| matches!(f.data_type(), Utf8 | LargeUtf8 | Utf8View))?,
    };
    eprintln!(
        "[bench] column #{picked} `{}`",
        schema.fields().get(picked).unwrap().name()
    );

    let mut bytes = Vec::new();
    let mut offsets = vec![0u64];
    let reader = builder.build().ok()?;
    'outer: for batch in reader {
        // A failed batch would otherwise leave a silently truncated corpus,
        // so report it and give up on this file.
        let batch = match batch {
            Ok(batch) => batch,
            Err(err) => {
                eprintln!("[bench] failed to decode a record batch: {err}");
                return None;
            }
        };
        let arr = batch.column(picked);

        // Appends every value of a string-array iterator as one row and
        // leaves the batch loop once the byte budget is reached.
        macro_rules! push_iter {
            ($it:expr) => {
                for s in $it {
                    let b = s.unwrap_or("").as_bytes();
                    bytes.extend_from_slice(b);
                    offsets.push(bytes.len() as u64);
                    if bytes.len() >= max_bytes {
                        break 'outer;
                    }
                }
            };
        }

        match arr.data_type() {
            Utf8 => push_iter!(arr.as_string::<i32>().iter()),
            LargeUtf8 => push_iter!(arr.as_string::<i64>().iter()),
            Utf8View => push_iter!(arr.as_string_view().iter()),
            _ => return None,
        }
    }
    Some((bytes, offsets))
}
/// Generates a deterministic corpus of space-separated word rows.
///
/// Returns `(bytes, offsets)` where row `i` occupies
/// `bytes[offsets[i]..offsets[i + 1]]` and `offsets[0]` is 0. Each row holds
/// between 3 and 9 words drawn from a fixed 32-word vocabulary. Rows are
/// appended until `bytes.len() >= max_bytes`, so the result may exceed the
/// budget by up to one row; `max_bytes == 0` yields no rows at all.
///
/// The generator state is fixed, so every call with the same `max_bytes`
/// returns the same data.
fn synthetic(max_bytes: usize) -> (Vec<u8>, Vec<u64>) {
    const WORDS: &[&str] = &[
        "the",
        "quickly",
        "final",
        "regular",
        "ironic",
        "express",
        "packages",
        "accounts",
        "deposits",
        "foxes",
        "requests",
        "blithely",
        "carefully",
        "furiously",
        "slyly",
        "pending",
        "unusual",
        "even",
        "bold",
        "silent",
        "theodolites",
        "instructions",
        "asymptotes",
        "across",
        "above",
        "after",
        "among",
        "around",
        "thinly",
        "sometimes",
        "boldly",
        "fluffily",
    ];
    // Initial state and per-word increment of the generator.
    const STEP: u64 = 0x9E37_79B9_7F4A_7C15;

    // Cap the up-front reservation; the vector still grows on demand.
    let mut bytes = Vec::with_capacity(max_bytes.min(1 << 28));
    let mut offsets = vec![0u64];
    let mut state = STEP;

    while bytes.len() < max_bytes {
        // Top four bits of the state (0..=15) reduced mod 7 give 3..=9 words.
        let nwords = 3 + (state >> 60) as usize % 7;
        for w in 0..nwords {
            state = state.wrapping_add(STEP);
            state ^= state >> 31;
            if w > 0 {
                bytes.push(b' ');
            }
            bytes.extend_from_slice(WORDS[(state as usize) % WORDS.len()].as_bytes());
        }
        // Every row contains at least three non-empty words, so the buffer
        // always grew and the offsets stay strictly increasing.
        offsets.push(bytes.len() as u64);
    }
    (bytes, offsets)
}