use arrow::array::{ArrayRef, DictionaryArray, PrimitiveArray, StringArray};
use arrow::datatypes::{
ArrowPrimitiveType, DataType, Field, Float64Type, Int32Type, Int64Type, Schema,
SchemaRef,
};
use arrow::record_batch::RecordBatch;
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion::scheduler::Scheduler;
use futures::stream::StreamExt;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::{WriterProperties, WriterVersion};
use rand::distributions::uniform::SampleUniform;
use rand::distributions::Alphanumeric;
use rand::prelude::*;
use std::fs::File;
use std::io::Read;
use std::ops::Range;
use std::path::Path;
use std::sync::Arc;
use std::time::Instant;
use tempfile::NamedTempFile;
/// Number of record batches written to the generated parquet file.
const NUM_BATCHES: usize = 2048;
/// Number of rows in each generated `RecordBatch`.
const WRITE_RECORD_BATCH_SIZE: usize = 1024;
/// Maximum number of rows per parquet row group, passed to `WriterProperties`.
const ROW_GROUP_SIZE: usize = 1024 * 1024;
/// Total number of rows written to the generated file.
const TOTAL_ROWS: usize = WRITE_RECORD_BATCH_SIZE * NUM_BATCHES;
/// Row groups the writer is expected to emit: `TOTAL_ROWS / ROW_GROUP_SIZE` rounded up
/// (assumes the writer fills each row group up to `ROW_GROUP_SIZE` rows before starting
/// a new one). Derived rather than hard-coded so the assertion in the file generator
/// stays valid when the sizes above are tuned.
const EXPECTED_ROW_GROUPS: usize =
    TOTAL_ROWS / ROW_GROUP_SIZE + (TOTAL_ROWS % ROW_GROUP_SIZE != 0) as usize;
fn schema() -> SchemaRef {
let string_dictionary_type =
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
Arc::new(Schema::new(vec![
Field::new("dict_10_required", string_dictionary_type.clone(), false),
Field::new("dict_10_optional", string_dictionary_type.clone(), true),
Field::new("dict_100_required", string_dictionary_type.clone(), false),
Field::new("dict_100_optional", string_dictionary_type.clone(), true),
Field::new("dict_1000_required", string_dictionary_type.clone(), false),
Field::new("dict_1000_optional", string_dictionary_type, true),
Field::new("string_required", DataType::Utf8, false),
Field::new("string_optional", DataType::Utf8, true),
Field::new("i64_required", DataType::Int64, false),
Field::new("i64_optional", DataType::Int64, true),
Field::new("f64_required", DataType::Float64, false),
Field::new("f64_optional", DataType::Float64, true),
]))
}
fn generate_batch() -> RecordBatch {
let schema = schema();
let len = WRITE_RECORD_BATCH_SIZE;
RecordBatch::try_new(
schema,
vec![
generate_string_dictionary("prefix", 10, len, 1.0),
generate_string_dictionary("prefix", 10, len, 0.5),
generate_string_dictionary("prefix", 100, len, 1.0),
generate_string_dictionary("prefix", 100, len, 0.5),
generate_string_dictionary("prefix", 1000, len, 1.0),
generate_string_dictionary("prefix", 1000, len, 0.5),
generate_strings(0..100, len, 1.0),
generate_strings(0..100, len, 0.5),
generate_primitive::<Int64Type>(len, 1.0, -2000..2000),
generate_primitive::<Int64Type>(len, 0.5, -2000..2000),
generate_primitive::<Float64Type>(len, 1.0, -1000.0..1000.0),
generate_primitive::<Float64Type>(len, 0.5, -1000.0..1000.0),
],
)
.unwrap()
}
/// Generates `len` Int32-keyed dictionary strings drawn uniformly from `cardinality`
/// distinct values of the form `"{prefix}#{i}"`.
///
/// Each slot is non-null with probability `valid_percent`, which is passed straight to
/// `Rng::gen_bool` and so must be a probability in `[0, 1]`.
fn generate_string_dictionary(
    prefix: &str,
    cardinality: usize,
    len: usize,
    valid_percent: f64,
) -> ArrayRef {
    let mut rng = thread_rng();
    let values: Vec<String> = (0..cardinality)
        .map(|i| format!("{}#{}", prefix, i))
        .collect();

    let keys = (0..len).map(|_| {
        if rng.gen_bool(valid_percent) {
            let index = rng.gen_range(0..cardinality);
            Some(values[index].as_str())
        } else {
            None
        }
    });
    Arc::new(DictionaryArray::<Int32Type>::from_iter(keys))
}
/// Generates `len` random alphanumeric strings as a `StringArray`.
///
/// Each slot is non-null with probability `valid_percent` (passed to `Rng::gen_bool`).
/// A non-null value has a character count drawn uniformly from `string_length_range`.
fn generate_strings(
    string_length_range: Range<usize>,
    len: usize,
    valid_percent: f64,
) -> ArrayRef {
    let mut rng = thread_rng();
    let values = (0..len).map(|_| {
        if !rng.gen_bool(valid_percent) {
            return None;
        }
        let char_count = rng.gen_range(string_length_range.clone());
        let text: String = (0..char_count)
            .map(|_| char::from(rng.sample(Alphanumeric)))
            .collect();
        Some(text)
    });
    Arc::new(StringArray::from_iter(values))
}
/// Generates a `PrimitiveArray<T>` of `len` values sampled uniformly from `range`.
///
/// Each slot is non-null with probability `valid_percent` (passed to `Rng::gen_bool`).
fn generate_primitive<T>(
    len: usize,
    valid_percent: f64,
    range: Range<T::Native>,
) -> ArrayRef
where
    T: ArrowPrimitiveType,
    T::Native: SampleUniform,
{
    let mut rng = thread_rng();
    // One call produces one slot: first decide validity, then sample the value.
    let mut draw = move || -> Option<T::Native> {
        if rng.gen_bool(valid_percent) {
            Some(rng.gen_range(range.clone()))
        } else {
            None
        }
    };
    Arc::new(PrimitiveArray::<T>::from_iter((0..len).map(|_| draw())))
}
fn generate_file() -> NamedTempFile {
let now = Instant::now();
let mut named_file = tempfile::Builder::new()
.prefix("parquet_query_sql")
.suffix(".parquet")
.tempfile()
.unwrap();
println!("Generating parquet file - {}", named_file.path().display());
let schema = schema();
let properties = WriterProperties::builder()
.set_writer_version(WriterVersion::PARQUET_2_0)
.set_max_row_group_size(ROW_GROUP_SIZE)
.build();
let mut writer =
ArrowWriter::try_new(&mut named_file, schema, Some(properties)).unwrap();
for _ in 0..NUM_BATCHES {
let batch = generate_batch();
writer.write(&batch).unwrap();
}
let metadata = writer.close().unwrap();
assert_eq!(
metadata.num_rows as usize,
WRITE_RECORD_BATCH_SIZE * NUM_BATCHES
);
assert_eq!(metadata.row_groups.len(), EXPECTED_ROW_GROUPS);
println!(
"Generated parquet file in {} seconds",
now.elapsed().as_secs_f32()
);
named_file
}
/// Splits the contents of the benchmark `.sql` file into individual queries.
///
/// Queries are separated by `;`. Every line whose first non-whitespace characters are
/// `--` is dropped. The remaining lines of a query are joined with single spaces so the
/// query sits on one line, because it also serves as the benchmark name. Queries that
/// are empty or whitespace-only afterwards (e.g. the text after the final `;`) are
/// skipped.
fn split_queries(sql: &str) -> Vec<String> {
    sql.split(';')
        .map(|chunk| {
            // Lines are joined with spaces, so any kept `--` line would comment out the
            // rest of the query. Indented comment lines must be dropped as well.
            chunk
                .trim()
                .lines()
                .filter(|line| !line.trim_start().starts_with("--"))
                .collect::<Vec<_>>()
                .join(" ")
        })
        .filter(|query| !query.trim().is_empty())
        .collect()
}

/// Benchmarks every query in `benches/parquet_query_sql.sql` against a parquet file.
///
/// The file comes from the `PARQUET_FILE` environment variable, or is generated with
/// `generate_file` when that variable is unset. Each query is benchmarked twice:
/// - "tokio": executed on a multi-threaded tokio runtime with `partitions` workers.
/// - "scheduled": executed through DataFusion's `Scheduler`.
fn criterion_benchmark(c: &mut Criterion) {
    // Keep the generated temp-file handle alive until all benchmarks have run.
    let (file_path, temp_file) = match std::env::var("PARQUET_FILE") {
        Ok(file) => (file, None),
        Err(_) => {
            let temp_file = generate_file();
            (temp_file.path().display().to_string(), Some(temp_file))
        }
    };
    assert!(Path::new(&file_path).exists(), "path not found");
    println!("Using parquet file {}", file_path);

    let partitions = 4;
    let config = SessionConfig::new().with_target_partitions(partitions);
    let context = SessionContext::with_config(config);
    let scheduler = Scheduler::new(partitions);

    // `local_rt` drives the benchmark loop. `query_rt` runs the "tokio" variant's
    // queries on `partitions` worker threads.
    let local_rt = tokio::runtime::Builder::new_current_thread()
        .build()
        .unwrap();
    let query_rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(partitions)
        .build()
        .unwrap();

    local_rt
        .block_on(context.register_parquet("t", file_path.as_str(), Default::default()))
        .unwrap();

    let mut queries_file = File::open("benches/parquet_query_sql.sql").unwrap();
    let mut queries = String::new();
    queries_file.read_to_string(&mut queries).unwrap();

    for query in split_queries(&queries) {
        c.bench_function(&format!("tokio: {}", query), |b| {
            b.iter(|| {
                let query = query.clone();
                let context = context.clone();
                let (sender, mut receiver) = futures::channel::mpsc::unbounded();
                let task = query_rt.spawn(async move {
                    let query = context.sql(&query).await.unwrap();
                    let mut stream = query.execute_stream().await.unwrap();
                    while let Some(next) = stream.next().await {
                        sender.unbounded_send(next).unwrap();
                    }
                });
                local_rt.block_on(async {
                    while receiver.next().await.transpose().unwrap().is_some() {}
                    // A panic inside the spawned task only closes the channel. Await the
                    // task so the panic fails the benchmark, instead of silently
                    // recording an empty, too-fast iteration.
                    task.await.unwrap();
                })
            });
        });

        c.bench_function(&format!("scheduled: {}", query), |b| {
            b.iter(|| {
                let query = query.clone();
                let context = context.clone();
                local_rt.block_on(async {
                    let query = context.sql(&query).await.unwrap();
                    let plan = query.create_physical_plan().await.unwrap();
                    let results = scheduler.schedule(plan, context.task_ctx()).unwrap();
                    let mut stream = results.stream();
                    while stream.next().await.transpose().unwrap().is_some() {}
                });
            });
        });
    }

    std::mem::drop(temp_file);
}
// Register `criterion_benchmark` in a Criterion group named `benches` and generate the
// benchmark binary's `main` that runs it.
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);