use arrow::array::{ArrayRef, Int64Array, ListArray};
use arrow::buffer::OffsetBuffer;
use arrow::datatypes::{DataType, Field};
use criterion::{
criterion_group, criterion_main, {BenchmarkId, Criterion},
};
use datafusion_common::ScalarValue;
use datafusion_common::config::ConfigOptions;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
use datafusion_functions_nested::position::ArrayPosition;
use rand::Rng;
use rand::SeedableRng;
use rand::rngs::StdRng;
use std::hint::black_box;
use std::sync::Arc;
const NUM_ROWS: usize = 10000;
const SEED: u64 = 42;
const NULL_DENSITY: f64 = 0.1;
const SENTINEL_NEEDLE: i64 = -1;
fn criterion_benchmark(c: &mut Criterion) {
for size in [10, 100, 500] {
bench_array_position(c, size);
}
}
fn bench_array_position(c: &mut Criterion, array_size: usize) {
let mut group = c.benchmark_group("array_position_i64");
let haystack_found_once = create_haystack_with_sentinel(
NUM_ROWS,
array_size,
NULL_DENSITY,
SENTINEL_NEEDLE,
0,
);
let haystack_found_many = create_haystack_with_sentinels(
NUM_ROWS,
array_size,
NULL_DENSITY,
SENTINEL_NEEDLE,
);
let haystack_not_found =
create_haystack_without_sentinel(NUM_ROWS, array_size, NULL_DENSITY);
let num_rows = haystack_not_found.len();
let arg_fields: Vec<Arc<Field>> = vec![
Field::new("haystack", haystack_not_found.data_type().clone(), false).into(),
Field::new("needle", DataType::Int64, false).into(),
];
let return_field: Arc<Field> = Field::new("result", DataType::UInt64, true).into();
let config_options = Arc::new(ConfigOptions::default());
let needle = ScalarValue::Int64(Some(SENTINEL_NEEDLE));
let args_found_once = vec![
ColumnarValue::Array(haystack_found_once.clone()),
ColumnarValue::Scalar(needle.clone()),
];
group.bench_with_input(
BenchmarkId::new("found_once", array_size),
&array_size,
|b, _| {
let udf = ArrayPosition::new();
b.iter(|| {
black_box(
udf.invoke_with_args(ScalarFunctionArgs {
args: args_found_once.clone(),
arg_fields: arg_fields.clone(),
number_rows: num_rows,
return_field: return_field.clone(),
config_options: config_options.clone(),
})
.unwrap(),
)
})
},
);
let args_found_many = vec![
ColumnarValue::Array(haystack_found_many.clone()),
ColumnarValue::Scalar(needle.clone()),
];
group.bench_with_input(
BenchmarkId::new("found_many", array_size),
&array_size,
|b, _| {
let udf = ArrayPosition::new();
b.iter(|| {
black_box(
udf.invoke_with_args(ScalarFunctionArgs {
args: args_found_many.clone(),
arg_fields: arg_fields.clone(),
number_rows: num_rows,
return_field: return_field.clone(),
config_options: config_options.clone(),
})
.unwrap(),
)
})
},
);
let args_not_found = vec![
ColumnarValue::Array(haystack_not_found.clone()),
ColumnarValue::Scalar(needle.clone()),
];
group.bench_with_input(
BenchmarkId::new("not_found", array_size),
&array_size,
|b, _| {
let udf = ArrayPosition::new();
b.iter(|| {
black_box(
udf.invoke_with_args(ScalarFunctionArgs {
args: args_not_found.clone(),
arg_fields: arg_fields.clone(),
number_rows: num_rows,
return_field: return_field.clone(),
config_options: config_options.clone(),
})
.unwrap(),
)
})
},
);
group.finish();
}
fn create_haystack_without_sentinel(
num_rows: usize,
array_size: usize,
null_density: f64,
) -> ArrayRef {
create_haystack_from_fn(num_rows, array_size, |_, _, rng| {
random_haystack_value(rng, array_size, null_density)
})
}
fn create_haystack_with_sentinel(
num_rows: usize,
array_size: usize,
null_density: f64,
sentinel: i64,
sentinel_index: usize,
) -> ArrayRef {
assert!(sentinel_index < array_size);
create_haystack_from_fn(num_rows, array_size, |_, col, rng| {
if col == sentinel_index {
Some(sentinel)
} else {
random_haystack_value(rng, array_size, null_density)
}
})
}
fn create_haystack_with_sentinels(
num_rows: usize,
array_size: usize,
null_density: f64,
sentinel: i64,
) -> ArrayRef {
create_haystack_from_fn(num_rows, array_size, |_, col, rng| {
if col % 2 == 0 {
Some(sentinel)
} else {
random_haystack_value(rng, array_size, null_density)
}
})
}
fn create_haystack_from_fn<F>(
num_rows: usize,
array_size: usize,
mut value_at: F,
) -> ArrayRef
where
F: FnMut(usize, usize, &mut StdRng) -> Option<i64>,
{
let mut rng = StdRng::seed_from_u64(SEED);
let mut values = Vec::with_capacity(num_rows * array_size);
for row in 0..num_rows {
for col in 0..array_size {
values.push(value_at(row, col, &mut rng));
}
}
let values = values.into_iter().collect::<Int64Array>();
let offsets = (0..=num_rows)
.map(|i| (i * array_size) as i32)
.collect::<Vec<i32>>();
Arc::new(
ListArray::try_new(
Arc::new(Field::new("item", DataType::Int64, true)),
OffsetBuffer::new(offsets.into()),
Arc::new(values),
None,
)
.unwrap(),
)
}
fn random_haystack_value(
rng: &mut StdRng,
array_size: usize,
null_density: f64,
) -> Option<i64> {
if rng.random::<f64>() < null_density {
None
} else {
Some(rng.random_range(0..array_size as i64))
}
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);