use arrow::array::{ArrayRef, Int64Array, ListArray};
use arrow::buffer::OffsetBuffer;
use arrow::datatypes::{DataType, Field};
use criterion::{
criterion_group, criterion_main, {BenchmarkId, Criterion},
};
use datafusion_common::config::ConfigOptions;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
use datafusion_functions_nested::set_ops::{ArrayDistinct, ArrayIntersect, ArrayUnion};
use rand::SeedableRng;
use rand::prelude::SliceRandom;
use rand::rngs::StdRng;
use std::collections::HashSet;
use std::hint::black_box;
use std::sync::Arc;
/// Number of rows (one list per row) in every generated benchmark input.
const NUM_ROWS: usize = 1000;
/// Per-row list lengths; each benchmark scenario is run once per entry.
const ARRAY_SIZES: &[usize] = &[10, 50, 100];
/// Fixed seed for the `StdRng` used by the data generators.
const SEED: u64 = 42;
fn criterion_benchmark(c: &mut Criterion) {
bench_array_union(c);
bench_array_intersect(c);
bench_array_distinct(c);
}
/// Invokes a two-argument scalar UDF once on a pair of array columns.
///
/// Both inputs are passed as `ColumnarValue::Array`. The argument fields and
/// the return field are declared non-nullable, and the return field reuses
/// `array1`'s data type. The result goes through `black_box` so the call
/// cannot be optimized away.
///
/// The row count handed to the UDF is taken from `array1` itself, so the
/// helper stays correct for inputs of any length.
///
/// # Panics
///
/// Panics if the UDF invocation returns an error.
fn invoke_udf(udf: &impl ScalarUDFImpl, array1: &ArrayRef, array2: &ArrayRef) {
    let first_type = array1.data_type();
    let outcome = udf.invoke_with_args(ScalarFunctionArgs {
        args: vec![
            ColumnarValue::Array(Arc::clone(array1)),
            ColumnarValue::Array(Arc::clone(array2)),
        ],
        arg_fields: vec![
            Field::new("arr1", first_type.clone(), false).into(),
            Field::new("arr2", array2.data_type().clone(), false).into(),
        ],
        // Derive the batch size from the input instead of a file-level
        // constant so it always matches the arrays actually passed in.
        number_rows: array1.len(),
        return_field: Field::new("result", first_type.clone(), false).into(),
        config_options: Arc::new(ConfigOptions::default()),
    });
    black_box(outcome.unwrap());
}
/// Benchmarks `ArrayUnion` on pairs of list columns.
///
/// One benchmark is registered per (overlap scenario, list length)
/// combination in the `array_union` group; the inputs are generated once,
/// outside the timed closure.
fn bench_array_union(c: &mut Criterion) {
    let udf = ArrayUnion::new();
    let mut group = c.benchmark_group("array_union");

    for &(label, ratio) in &[("high_overlap", 0.8), ("low_overlap", 0.2)] {
        for &size in ARRAY_SIZES {
            let (left, right) = create_arrays_with_overlap(NUM_ROWS, size, ratio);
            let id = BenchmarkId::new(label, size);
            group.bench_with_input(id, &size, |bencher, _| {
                bencher.iter(|| invoke_udf(&udf, &left, &right))
            });
        }
    }

    group.finish();
}
/// Benchmarks `ArrayIntersect` on pairs of list columns.
///
/// Registers one benchmark in the `array_intersect` group for every
/// combination of overlap scenario and list length. Input generation happens
/// before the timed closure runs.
fn bench_array_intersect(c: &mut Criterion) {
    // (benchmark label, fraction of positions shared between the two inputs)
    const SCENARIOS: [(&str, f64); 2] = [("high_overlap", 0.8), ("low_overlap", 0.2)];

    let udf = ArrayIntersect::new();
    let mut group = c.benchmark_group("array_intersect");

    for (label, ratio) in SCENARIOS {
        for &size in ARRAY_SIZES {
            let (left, right) = create_arrays_with_overlap(NUM_ROWS, size, ratio);
            group.bench_with_input(BenchmarkId::new(label, size), &size, |bencher, _| {
                bencher.iter(|| invoke_udf(&udf, &left, &right))
            });
        }
    }

    group.finish();
}
/// Benchmarks `ArrayDistinct` on a single list column.
///
/// Registers one benchmark in the `array_distinct` group for every
/// combination of duplicate scenario and list length. The input column is
/// generated once per combination; the UDF arguments are rebuilt on every
/// timed iteration.
fn bench_array_distinct(c: &mut Criterion) {
    let udf = ArrayDistinct::new();
    let mut group = c.benchmark_group("array_distinct");

    for &(label, ratio) in &[("high_duplicate", 0.8), ("low_duplicate", 0.2)] {
        for &size in ARRAY_SIZES {
            let input = create_array_with_duplicates(NUM_ROWS, size, ratio);

            // One full UDF call: wrap the column, describe it with
            // non-nullable fields of the input's own type, and unwrap the result.
            let invoke = || {
                udf.invoke_with_args(ScalarFunctionArgs {
                    args: vec![ColumnarValue::Array(Arc::clone(&input))],
                    arg_fields: vec![
                        Field::new("arr", input.data_type().clone(), false).into(),
                    ],
                    number_rows: NUM_ROWS,
                    return_field: Field::new("result", input.data_type().clone(), false)
                        .into(),
                    config_options: Arc::new(ConfigOptions::default()),
                })
                .unwrap()
            };

            group.bench_with_input(BenchmarkId::new(label, size), &size, |bencher, _| {
                bencher.iter(|| black_box(invoke()))
            });
        }
    }

    group.finish();
}
/// Builds two `Int64` list columns of `num_rows` rows, each row holding
/// `array_size` elements, with a controlled amount of overlap per row.
///
/// Row `r` of the first column contains the consecutive values starting at
/// `r * array_size * 2`. In the second column, `round(array_size * overlap_ratio)`
/// randomly chosen positions repeat the first column's value at that
/// position; every other position holds that value shifted by `array_size`,
/// which does not occur in the same row of the first column.
///
/// The RNG is seeded with `SEED`.
///
/// # Panics
///
/// Panics if `overlap_ratio` is outside `0.0..=1.0`, or if constructing
/// either `ListArray` fails.
fn create_arrays_with_overlap(
    num_rows: usize,
    array_size: usize,
    overlap_ratio: f64,
) -> (ArrayRef, ArrayRef) {
    assert!((0.0..=1.0).contains(&overlap_ratio));
    let overlap_count = ((array_size as f64) * overlap_ratio).round() as usize;
    let shift = array_size as i64;

    let mut rng = StdRng::seed_from_u64(SEED);
    let mut left = Vec::with_capacity(num_rows * array_size);
    let mut right = Vec::with_capacity(num_rows * array_size);

    for row in 0..num_rows {
        // Each row owns a private value range of width 2 * array_size.
        let base = (row as i64) * shift * 2;
        left.extend((0..array_size).map(|i| base + i as i64));

        // Pick which positions of this row are shared with the first column.
        let mut shuffled: Vec<usize> = (0..array_size).collect();
        shuffled.shuffle(&mut rng);
        let shared: HashSet<usize> = shuffled[..overlap_count].iter().copied().collect();

        right.extend((0..array_size).map(|i| {
            let offset = if shared.contains(&i) { 0 } else { shift };
            base + offset + i as i64
        }));
    }

    let item_field = Arc::new(Field::new("item", DataType::Int64, true));
    // The `as i32` cast assumes `num_rows * array_size` fits in an `i32`.
    let raw_offsets: Vec<i32> = (0..=num_rows)
        .map(|i| (i * array_size) as i32)
        .collect();

    // Both columns share the same element field and fixed-width row layout.
    let build_list = |values: Vec<i64>| -> ArrayRef {
        let list = ListArray::try_new(
            Arc::clone(&item_field),
            OffsetBuffer::new(raw_offsets.clone().into()),
            Arc::new(Int64Array::from(values)),
            None,
        )
        .unwrap();
        Arc::new(list)
    };

    (build_list(left), build_list(right))
}
/// Builds an `Int64` list column of `num_rows` rows, each row holding
/// `array_size` elements of which roughly `duplicate_ratio` are repeats.
///
/// Each row starts with `unique_count` consecutive distinct values (beginning
/// at `row * array_size * 2`), followed by `array_size - unique_count`
/// duplicates drawn by cycling through a shuffled copy of those distinct
/// values. `unique_count` is `round(array_size * (1 - duplicate_ratio))`,
/// raised to 1 for non-empty rows so there is always a value to duplicate
/// (e.g. when `duplicate_ratio == 1.0`).
///
/// The RNG is seeded with `SEED`.
///
/// # Panics
///
/// Panics if `duplicate_ratio` is outside `0.0..=1.0`, or if constructing the
/// `ListArray` fails.
fn create_array_with_duplicates(
    num_rows: usize,
    array_size: usize,
    duplicate_ratio: f64,
) -> ArrayRef {
    assert!((0.0..=1.0).contains(&duplicate_ratio));
    let rounded_unique = ((array_size as f64) * (1.0 - duplicate_ratio)).round() as usize;
    // A non-empty row needs at least one distinct value to copy from;
    // without this floor a ratio that rounds to zero uniques would divide
    // by zero when picking duplicates. Empty rows keep a count of zero.
    let unique_count = rounded_unique.clamp(array_size.min(1), array_size);
    let duplicate_count = array_size - unique_count;

    let mut rng = StdRng::seed_from_u64(SEED);
    let mut values = Vec::with_capacity(num_rows * array_size);
    for row in 0..num_rows {
        // Each row owns a private value range of width 2 * array_size.
        let base = (row as i64) * (array_size as i64) * 2;

        // Distinct values first, in ascending order.
        let mut uniques: Vec<i64> = (0..unique_count).map(|i| base + i as i64).collect();
        values.extend_from_slice(&uniques);

        // Then fill the rest of the row by cycling through a shuffled copy.
        uniques.shuffle(&mut rng);
        values.extend(uniques.iter().copied().cycle().take(duplicate_count));
    }

    let values = Int64Array::from(values);
    let field = Arc::new(Field::new("item", DataType::Int64, true));
    // The `as i32` cast assumes `num_rows * array_size` fits in an `i32`.
    let offsets = (0..=num_rows)
        .map(|i| (i * array_size) as i32)
        .collect::<Vec<i32>>();

    Arc::new(
        ListArray::try_new(
            field,
            OffsetBuffer::new(offsets.into()),
            Arc::new(values),
            None,
        )
        .unwrap(),
    )
}
// Register `criterion_benchmark` under the `benches` group and generate the
// benchmark binary's entry point from that group.
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);