use std::sync::Arc;
use arrow::array::*;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use criterion::{Criterion, Throughput, criterion_group, criterion_main};
const N_ROWS: usize = 10_000;
fn narrow_batch(n: usize) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("score", DataType::Int64, false),
Field::new("cat", DataType::Int64, false),
Field::new("flag", DataType::Int64, false),
Field::new("ts", DataType::Int64, false),
]));
let col = |off: i64| -> Arc<dyn Array> {
Arc::new(Int64Array::from_iter_values(off..off + n as i64))
};
RecordBatch::try_new(
schema,
vec![col(0), col(1000), col(2000), col(3000), col(4000)],
)
.unwrap()
}
fn wide_batch(n: usize) -> RecordBatch {
let fields: Vec<Field> = (0..10)
.map(|i| Field::new(format!("col_{i}"), DataType::Utf8, false))
.collect();
let schema = Arc::new(Schema::new(fields));
let payload = "x".repeat(200);
let col = || -> Arc<dyn Array> {
Arc::new(StringArray::from(
(0..n).map(|_| payload.as_str()).collect::<Vec<_>>(),
))
};
let cols: Vec<Arc<dyn Array>> = (0..10).map(|_| col()).collect();
RecordBatch::try_new(schema, cols).unwrap()
}
fn hc_batch(n: usize) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![
Field::new("uuid_col", DataType::Utf8, false),
Field::new("email", DataType::Utf8, false),
]));
let uuids: Vec<String> = (0..n)
.map(|i| format!("550e8400-e29b-41d4-a716-{i:012}"))
.collect();
let emails: Vec<String> = (0..n)
.map(|i| format!("user{i}@bench.example.com"))
.collect();
RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(
uuids.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
)),
Arc::new(StringArray::from(
emails.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
)),
],
)
.unwrap()
}
fn batch_bytes(batch: &RecordBatch) -> usize {
use arrow::array::Array;
batch
.columns()
.iter()
.map(|col| col.get_array_memory_size())
.sum()
}
fn count_auto_shrink_splits(batch: &RecordBatch, limit_bytes: usize) -> usize {
use arrow::array::Array;
fn split(b: &RecordBatch, limit: usize, acc: &mut usize) {
let bytes: usize = b.columns().iter().map(|c| c.get_array_memory_size()).sum();
if bytes <= limit || b.num_rows() <= 1 {
*acc += 1;
return;
}
let mid = b.num_rows() / 2;
let lo = b.slice(0, mid);
let hi = b.slice(mid, b.num_rows() - mid);
split(&lo, limit, acc);
split(&hi, limit, acc);
}
let mut count = 0;
split(batch, limit_bytes, &mut count);
count
}
fn bench_auto_shrink(c: &mut Criterion) {
let batch = wide_batch(N_ROWS);
let actual = batch_bytes(&batch);
let mut g = c.benchmark_group("auto_shrink");
g.throughput(Throughput::Elements(N_ROWS as u64));
g.bench_function("no_split_needed", |b| {
b.iter(|| std::hint::black_box(count_auto_shrink_splits(&batch, actual * 2)))
});
g.bench_function("half_cap_2_splits", |b| {
b.iter(|| std::hint::black_box(count_auto_shrink_splits(&batch, (actual / 2).max(1))))
});
g.bench_function("tight_cap_8_splits", |b| {
b.iter(|| std::hint::black_box(count_auto_shrink_splits(&batch, (actual / 8).max(1))))
});
g.bench_function("extreme_cap_single_rows", |b| {
let small = wide_batch(64); b.iter(|| std::hint::black_box(count_auto_shrink_splits(&small, 1)))
});
g.finish();
}
fn write_parquet_with_codec(
batch: &RecordBatch,
compression: parquet::basic::Compression,
) -> Vec<u8> {
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
let mut buf = Vec::new();
let props = WriterProperties::builder()
.set_compression(compression)
.build();
let mut w = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props)).unwrap();
w.write(batch).unwrap();
w.close().unwrap();
buf
}
fn bench_compression(c: &mut Criterion) {
use parquet::basic::{Compression, GzipLevel, ZstdLevel};
let batch = narrow_batch(N_ROWS);
let mut g = c.benchmark_group("compression_profiles");
g.throughput(Throughput::Elements(N_ROWS as u64));
g.bench_function("none", |b| {
b.iter(|| std::hint::black_box(write_parquet_with_codec(&batch, Compression::UNCOMPRESSED)))
});
g.bench_function("fast_snappy", |b| {
b.iter(|| std::hint::black_box(write_parquet_with_codec(&batch, Compression::SNAPPY)))
});
g.bench_function("balanced_zstd3", |b| {
b.iter(|| {
std::hint::black_box(write_parquet_with_codec(
&batch,
Compression::ZSTD(ZstdLevel::try_new(3).unwrap()),
))
})
});
g.bench_function("compact_zstd9", |b| {
b.iter(|| {
std::hint::black_box(write_parquet_with_codec(
&batch,
Compression::ZSTD(ZstdLevel::try_new(9).unwrap()),
))
})
});
g.bench_function("gzip6_reference", |b| {
b.iter(|| {
std::hint::black_box(write_parquet_with_codec(
&batch,
Compression::GZIP(GzipLevel::try_new(6).unwrap()),
))
})
});
g.finish();
}
fn compute_rows_per_group(schema: &Schema, target_mb: u64) -> usize {
let estimated_row_bytes: u64 = schema
.fields()
.iter()
.map(|f| match f.data_type() {
DataType::Int64 | DataType::Float64 | DataType::Timestamp(_, _) => 8,
DataType::Int32 | DataType::Float32 => 4,
DataType::Int16 => 2,
DataType::Boolean => 1,
DataType::Utf8 | DataType::LargeUtf8 => 64,
_ => 16,
})
.sum();
if estimated_row_bytes == 0 {
return 1000;
}
let target_bytes = target_mb * 1024 * 1024;
((target_bytes / estimated_row_bytes) as usize).max(1000)
}
fn bench_row_group(c: &mut Criterion) {
let narrow_schema = narrow_batch(1).schema();
let wide_schema = wide_batch(1).schema();
let mut g = c.benchmark_group("row_group_computation");
g.throughput(Throughput::Elements(1));
for mb in [32u64, 64, 128, 256] {
g.bench_function(format!("narrow_{mb}mb"), |b| {
b.iter(|| std::hint::black_box(compute_rows_per_group(&narrow_schema, mb)))
});
g.bench_function(format!("wide_{mb}mb"), |b| {
b.iter(|| std::hint::black_box(compute_rows_per_group(&wide_schema, mb)))
});
}
g.finish();
}
fn track_unique_hashes(
batch: &RecordBatch,
col_idx: usize,
cap: usize,
) -> (std::collections::HashSet<u64>, bool) {
use std::io::Write as _;
use xxhash_rust::xxh3::xxh3_64;
let col = batch.column(col_idx);
let fmt_opts = arrow::util::display::FormatOptions::default();
let mut set = std::collections::HashSet::with_capacity(cap.min(batch.num_rows()));
let mut capped = false;
if let Ok(fmt) = arrow::util::display::ArrayFormatter::try_new(col.as_ref(), &fmt_opts) {
let mut scratch = Vec::with_capacity(64);
for row in 0..col.len() {
if cap > 0 && set.len() >= cap {
capped = true;
break;
}
scratch.clear();
let _ = write!(scratch, "{}", fmt.value(row));
set.insert(xxh3_64(&scratch));
}
}
(set, capped)
}
fn bench_quality_cap(c: &mut Criterion) {
let batch = hc_batch(N_ROWS);
let mut g = c.benchmark_group("quality_uniqueness_cap");
g.throughput(Throughput::Elements(N_ROWS as u64));
g.bench_function("no_cap_all_rows", |b| {
b.iter(|| std::hint::black_box(track_unique_hashes(&batch, 0, 0)))
});
g.bench_function("cap_50pct", |b| {
b.iter(|| std::hint::black_box(track_unique_hashes(&batch, 0, N_ROWS / 2)))
});
g.bench_function("cap_10pct", |b| {
b.iter(|| std::hint::black_box(track_unique_hashes(&batch, 0, N_ROWS / 10)))
});
g.bench_function("cap_1", |b| {
b.iter(|| std::hint::black_box(track_unique_hashes(&batch, 0, 1)))
});
g.finish();
}
criterion_group!(
resource_aware_benches,
bench_auto_shrink,
bench_compression,
bench_row_group,
bench_quality_cap,
);
criterion_main!(resource_aware_benches);