use std::{collections::HashMap, sync::Arc};
use arrow_array::{RecordBatch, UInt32Array};
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use arrow_select::take::take;
use criterion::{Criterion, criterion_group, criterion_main};
use futures::StreamExt;
use lance_core::cache::LanceCache;
use lance_datagen::ArrayGeneratorExt;
use lance_encoding::{
decoder::{
DecodeBatchScheduler, DecoderConfig, DecoderPlugins, FilterExpression, create_decode_stream,
},
encoder::{EncodingOptions, default_encoding_strategy, encode_batch},
version::LanceFileVersion,
};
use tokio::sync::mpsc::unbounded_channel;
use rand::Rng;
/// Fixed-width primitive Arrow types exercised by `bench_decode`.
/// Each has a defined `primitive_width()`, which the benchmark uses to size
/// the generated batch to a target byte count.
const PRIMITIVE_TYPES: &[DataType] = &[
    DataType::Date32,
    DataType::Date64,
    DataType::Int8,
    DataType::Int16,
    DataType::Int32,
    DataType::Int64,
    DataType::UInt8,
    DataType::UInt16,
    DataType::UInt32,
    DataType::UInt64,
    DataType::Float16,
    DataType::Float32,
    DataType::Float64,
    DataType::Decimal128(10, 10),
    DataType::Decimal256(10, 10),
    DataType::Timestamp(TimeUnit::Nanosecond, None),
    DataType::Time32(TimeUnit::Second),
    DataType::Time64(TimeUnit::Nanosecond),
    DataType::Duration(TimeUnit::Second),
];
/// Smaller subset used as FixedSizeList item types in `bench_decode_fsl`,
/// keeping the version x dimension x nullability sweep tractable.
const PRIMITIVE_TYPES_FOR_FSL: &[DataType] = &[DataType::Int8, DataType::Float32];
/// Benchmarks end-to-end decode throughput for each fixed-width primitive type.
///
/// For every type in `PRIMITIVE_TYPES`, a ~128 MiB single-column batch is
/// generated and encoded once (outside the timed loop); the benchmark then
/// repeatedly decodes it with the default file version.
fn bench_decode(c: &mut Criterion) {
    const NUM_BYTES: u64 = 1024 * 1024 * 128;
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let mut group = c.benchmark_group("decode_primitive");
    group.throughput(criterion::Throughput::Bytes(NUM_BYTES));
    for data_type in PRIMITIVE_TYPES {
        let bench_name = format!("{:?}", data_type).to_lowercase();
        // Size the batch so the raw values total NUM_BYTES for any width.
        let row_count = NUM_BYTES / data_type.primitive_width().unwrap() as u64;
        group.bench_function(bench_name, |bencher| {
            // Generate and encode input once; only decoding is measured.
            let input = lance_datagen::gen_batch()
                .anon_col(lance_datagen::array::rand_type(data_type))
                .into_batch_rows(lance_datagen::RowCount::from(row_count))
                .unwrap();
            let lance_schema = Arc::new(
                lance_core::datatypes::Schema::try_from(input.schema().as_ref()).unwrap(),
            );
            let strategy = default_encoding_strategy(LanceFileVersion::default());
            let encoded = runtime
                .block_on(encode_batch(
                    &input,
                    lance_schema,
                    strategy.as_ref(),
                    &EncodingOptions::default(),
                ))
                .unwrap();
            bencher.iter(|| {
                let decoded = runtime
                    .block_on(lance_encoding::decoder::decode_batch(
                        &encoded,
                        &FilterExpression::no_filter(),
                        Arc::<DecoderPlugins>::default(),
                        false,
                        LanceFileVersion::default(),
                        Some(Arc::new(LanceCache::no_cache())),
                    ))
                    .unwrap();
                // Sanity check that the round trip preserved the row count.
                assert_eq!(input.num_rows(), decoded.num_rows());
            })
        });
    }
}
/// Benchmarks decode throughput for FixedSizeList (FSL) columns.
///
/// Sweeps file version (2.0/2.1/2.2) x item type x list dimension x
/// nullability; v2.0 is only benchmarked without nulls.
fn bench_decode_fsl(c: &mut Criterion) {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let mut group = c.benchmark_group("decode_fsl");
    const NUM_BYTES: u64 = 1024 * 1024 * 128;
    for version in [
        LanceFileVersion::V2_0,
        LanceFileVersion::V2_1,
        LanceFileVersion::V2_2,
    ] {
        for data_type in PRIMITIVE_TYPES_FOR_FSL {
            for dimension in [4, 16, 32, 64, 128] {
                // V2_0 is run without nulls only (presumably nullable FSL is
                // not supported on that version — confirm).
                let nullable_choices: &[bool] = if version == LanceFileVersion::V2_0 {
                    &[false]
                } else {
                    &[false, true]
                };
                for nullable in nullable_choices {
                    let func_name = format!(
                        "{:?}_{}_v{}_null{}",
                        data_type, dimension, version, nullable
                    )
                    .to_lowercase();
                    group.throughput(criterion::Throughput::Bytes(NUM_BYTES));
                    group.bench_function(func_name, |b| {
                        // Size rows so the flattened primitive values total
                        // NUM_BYTES regardless of dimension/item width.
                        let num_rows =
                            NUM_BYTES / (dimension * data_type.primitive_width().unwrap() as u64);
                        let mut arraygen =
                            lance_datagen::array::rand_type(&DataType::FixedSizeList(
                                Arc::new(Field::new("item", data_type.clone(), true)),
                                dimension as i32,
                            ));
                        if *nullable {
                            // Nullable variants inject nulls at a 0.5 ratio.
                            arraygen = arraygen.with_random_nulls(0.5);
                        }
                        let data = lance_datagen::gen_batch()
                            .anon_col(arraygen)
                            .into_batch_rows(lance_datagen::RowCount::from(num_rows))
                            .unwrap();
                        let lance_schema = Arc::new(
                            lance_core::datatypes::Schema::try_from(data.schema().as_ref())
                                .unwrap(),
                        );
                        let encoding_strategy = default_encoding_strategy(version);
                        // Encoding happens once per benchmark; only the decode
                        // in the iter loop below is timed.
                        let encoded = rt
                            .block_on(encode_batch(
                                &data,
                                lance_schema,
                                encoding_strategy.as_ref(),
                                &EncodingOptions::default(),
                            ))
                            .unwrap();
                        b.iter(|| {
                            let batch = rt
                                .block_on(lance_encoding::decoder::decode_batch(
                                    &encoded,
                                    &FilterExpression::no_filter(),
                                    Arc::<DecoderPlugins>::default(),
                                    false,
                                    version,
                                    Some(Arc::new(LanceCache::no_cache())),
                                ))
                                .unwrap();
                            assert_eq!(data.num_rows(), batch.num_rows());
                        })
                    });
                }
            }
        }
    }
}
/// Benchmarks decoding of a Utf8 column with heavy value repetition — only 20
/// distinct strings sampled `NUM_ROWS` times — which the default encoding
/// strategy is expected to dictionary-encode.
fn bench_decode_str_with_dict_encoding(c: &mut Criterion) {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let mut group = c.benchmark_group("decode_primitive");
    const NUM_ROWS: u64 = 100000;
    let data_type = DataType::Utf8;
    // A tiny pool of 20 distinct random strings to sample from.
    let string_data = lance_datagen::gen_batch()
        .anon_col(lance_datagen::array::rand_type(&DataType::Utf8))
        .into_batch_rows(lance_datagen::RowCount::from(20))
        .unwrap();
    // Logical size estimate: one u32 index per row plus the dictionary values.
    group.throughput(criterion::Throughput::Bytes(
        NUM_ROWS * std::mem::size_of::<u32>() as u64 + string_data.get_array_memory_size() as u64,
    ));
    let func_name = format!("{:?}", data_type).to_lowercase();
    group.bench_function(func_name, |b| {
        let string_array = string_data.column(0);
        let mut rng = rand::rng();
        // Use NUM_ROWS here (previously a duplicated hard-coded 100_000) so
        // the generated data can never drift from the throughput calculation
        // above.
        let integer_arr: Vec<u32> = (0..NUM_ROWS).map(|_| rng.random_range(0..20)).collect();
        let integer_array = UInt32Array::from(integer_arr);
        // Expand the 20-string pool into NUM_ROWS values by index lookup.
        let mapped_strings = take(string_array, &integer_array, None).unwrap();
        let schema = Arc::new(Schema::new(vec![Field::new(
            "string",
            DataType::Utf8,
            false,
        )]));
        let data = RecordBatch::try_new(schema, vec![Arc::new(mapped_strings)]).unwrap();
        let lance_schema =
            Arc::new(lance_core::datatypes::Schema::try_from(data.schema().as_ref()).unwrap());
        let encoding_strategy = default_encoding_strategy(LanceFileVersion::default());
        // Encode once; only decoding is inside the timed loop.
        let encoded = rt
            .block_on(encode_batch(
                &data,
                lance_schema,
                encoding_strategy.as_ref(),
                &EncodingOptions::default(),
            ))
            .unwrap();
        b.iter(|| {
            let batch = rt
                .block_on(lance_encoding::decoder::decode_batch(
                    &encoded,
                    &FilterExpression::no_filter(),
                    Arc::<DecoderPlugins>::default(),
                    false,
                    LanceFileVersion::default(),
                    Some(Arc::new(LanceCache::no_cache())),
                ))
                .unwrap();
            assert_eq!(data.num_rows(), batch.num_rows());
        })
    });
}
/// Benchmarks decoding of a struct column marked with `packed=true` field
/// metadata so the encoder selects its packed struct layout.
fn bench_decode_packed_struct(c: &mut Criterion) {
    const NUM_ROWS: u64 = 10000;
    let rt = tokio::runtime::Runtime::new().unwrap();
    let mut group = c.benchmark_group("decode_primitive");
    // Logical bytes per row: a 5-wide i32 FSL plus an i32 (6 * i32) plus one f32.
    let size_bytes =
        ((6 * std::mem::size_of::<i32>() as u64) + std::mem::size_of::<f32>() as u64) * NUM_ROWS;
    group.throughput(criterion::Throughput::Bytes(size_bytes));
    group.bench_function("struct", |b| {
        // Struct of an int, a float, and a fixed-size list of 5 ints.
        let struct_type = DataType::Struct(
            vec![
                Arc::new(Field::new("int_field", DataType::Int32, false)),
                Arc::new(Field::new("float_field", DataType::Float32, false)),
                Arc::new(Field::new(
                    "fsl_field",
                    DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 5),
                    false,
                )),
            ]
            .into(),
        );
        let raw = lance_datagen::gen_batch()
            .anon_col(lance_datagen::array::rand_type(&struct_type))
            .into_batch_rows(lance_datagen::RowCount::from(NUM_ROWS))
            .unwrap();
        // Re-tag every struct field with `packed=true` metadata; non-struct
        // fields pass through untouched.
        let packed_fields: Vec<Arc<Field>> = raw
            .schema()
            .fields()
            .iter()
            .map(|f| {
                if matches!(f.data_type(), &DataType::Struct(_)) {
                    let meta = HashMap::from([("packed".to_string(), "true".to_string())]);
                    let rebuilt = Field::new(f.name(), f.data_type().clone(), f.is_nullable());
                    Arc::new(rebuilt.with_metadata(meta))
                } else {
                    f.clone()
                }
            })
            .collect();
        let packed_schema = Schema::new(packed_fields);
        let data =
            RecordBatch::try_new(Arc::new(packed_schema.clone()), raw.columns().to_vec()).unwrap();
        let lance_schema =
            Arc::new(lance_core::datatypes::Schema::try_from(&packed_schema).unwrap());
        let strategy = default_encoding_strategy(LanceFileVersion::default());
        // Encode once outside the timed loop.
        let encoded = rt
            .block_on(encode_batch(
                &data,
                lance_schema,
                strategy.as_ref(),
                &EncodingOptions::default(),
            ))
            .unwrap();
        b.iter(|| {
            let decoded = rt
                .block_on(lance_encoding::decoder::decode_batch(
                    &encoded,
                    &FilterExpression::no_filter(),
                    Arc::<DecoderPlugins>::default(),
                    false,
                    LanceFileVersion::default(),
                    Some(Arc::new(LanceCache::no_cache())),
                ))
                .unwrap();
            assert_eq!(data.num_rows(), decoded.num_rows());
        })
    });
}
/// Benchmarks decoding of random Utf8 strings, named "fixed-utf8" (presumably
/// the datagen strings are fixed-width and eligible for a fixed-size-binary
/// encoding — confirm against lance_datagen's Utf8 generator).
///
/// Only registered in the Linux criterion group, hence `#[allow(dead_code)]`.
#[allow(dead_code)]
fn bench_decode_str_with_fixed_size_binary_encoding(c: &mut Criterion) {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let mut group = c.benchmark_group("decode_primitive");
    const NUM_ROWS: u64 = 10000;
    // Assumes ~16 bytes per row for throughput accounting — TODO confirm
    // against the generator's default string length.
    const NUM_BYTES: u64 = NUM_ROWS * 16;
    group.throughput(criterion::Throughput::Bytes(NUM_BYTES));
    let func_name = "fixed-utf8".to_string();
    group.bench_function(func_name, |b| {
        // Use NUM_ROWS here (previously a duplicated hard-coded 10000) so the
        // row count and the throughput constant cannot drift apart.
        let string_data = lance_datagen::gen_batch()
            .anon_col(lance_datagen::array::rand_type(&DataType::Utf8))
            .into_batch_rows(lance_datagen::RowCount::from(NUM_ROWS))
            .unwrap();
        let schema = Arc::new(Schema::new(vec![Field::new(
            "string",
            DataType::Utf8,
            false,
        )]));
        let data = RecordBatch::try_new(schema, string_data.columns().to_vec()).unwrap();
        let lance_schema =
            Arc::new(lance_core::datatypes::Schema::try_from(data.schema().as_ref()).unwrap());
        let encoding_strategy = default_encoding_strategy(LanceFileVersion::default());
        // Encode once; only decoding is inside the timed loop.
        let encoded = rt
            .block_on(encode_batch(
                &data,
                lance_schema,
                encoding_strategy.as_ref(),
                &EncodingOptions::default(),
            ))
            .unwrap();
        b.iter(|| {
            let batch = rt
                .block_on(lance_encoding::decoder::decode_batch(
                    &encoded,
                    &FilterExpression::no_filter(),
                    Arc::<DecoderPlugins>::default(),
                    false,
                    LanceFileVersion::default(),
                    Some(Arc::new(LanceCache::no_cache())),
                ))
                .unwrap();
            assert_eq!(data.num_rows(), batch.num_rows());
        })
    });
}
/// Benchmarks decode throughput for compressible string columns under zstd
/// and lz4, using miniblock structural encoding on Lance v2.2.
fn bench_decode_compressed(c: &mut Criterion) {
    const NUM_ROWS: usize = 5_000_000;
    const NUM_COLUMNS: usize = 10;
    let rt = tokio::runtime::Runtime::new().unwrap();
    let mut group = c.benchmark_group("decode_compressed");
    // Strings sharing a long common prefix, so block compression is effective.
    let shared_column: Arc<dyn arrow_array::Array> =
        Arc::new(arrow_array::StringArray::from_iter_values(
            (0..NUM_ROWS).map(|i| format!("prefix_that_compresses_well_{}", i)),
        ));
    for compression in ["zstd", "lz4"] {
        // Field metadata drives the encoder: codec, dictionary divisor, and
        // structural encoding selection.
        let metadata = HashMap::from([
            (
                "lance-encoding:compression".to_string(),
                compression.to_string(),
            ),
            (
                "lance-encoding:dict-divisor".to_string(),
                "100000".to_string(),
            ),
            (
                "lance-encoding:structural-encoding".to_string(),
                "miniblock".to_string(),
            ),
        ]);
        let fields = (0..NUM_COLUMNS)
            .map(|i| {
                Field::new(format!("s{}", i), DataType::Utf8, false).with_metadata(metadata.clone())
            })
            .collect::<Vec<Field>>();
        let schema = Arc::new(Schema::new(fields));
        // All columns share the same underlying array (cheap Arc clones).
        let columns = (0..NUM_COLUMNS)
            .map(|_| shared_column.clone())
            .collect::<Vec<Arc<dyn arrow_array::Array>>>();
        let data = RecordBatch::try_new(schema.clone(), columns).unwrap();
        let lance_schema =
            Arc::new(lance_core::datatypes::Schema::try_from(schema.as_ref()).unwrap());
        let strategy = default_encoding_strategy(LanceFileVersion::V2_2);
        // Encode once per codec; only decoding is timed.
        let encoded = rt
            .block_on(encode_batch(
                &data,
                lance_schema,
                strategy.as_ref(),
                &EncodingOptions::default(),
            ))
            .unwrap();
        group.throughput(criterion::Throughput::Elements(
            (NUM_ROWS * NUM_COLUMNS) as u64,
        ));
        group.bench_function(
            format!("{}_strings_{}cols", compression, NUM_COLUMNS),
            |b| {
                b.iter(|| {
                    let decoded = rt
                        .block_on(lance_encoding::decoder::decode_batch(
                            &encoded,
                            &FilterExpression::no_filter(),
                            Arc::<DecoderPlugins>::default(),
                            false,
                            LanceFileVersion::V2_2,
                            Some(Arc::new(LanceCache::no_cache())),
                        ))
                        .unwrap();
                    assert_eq!(data.num_rows(), decoded.num_rows());
                })
            },
        );
    }
}
/// Benchmarks decode of compressed string columns through the lower-level
/// scheduler/stream API, varying how many decode tasks are polled
/// concurrently (`buffered(parallelism)`).
fn bench_decode_compressed_parallel(c: &mut Criterion) {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let mut group = c.benchmark_group("decode_compressed_parallel");
    const NUM_ROWS: u64 = 1_000_000;
    const NUM_COLUMNS: usize = 10;
    const BATCH_SIZE: u32 = 100_000;
    // Strings sharing a long prefix so block compression is effective.
    let array: Arc<dyn arrow_array::Array> = Arc::new(arrow_array::StringArray::from_iter_values(
        (0..NUM_ROWS as usize).map(|i| format!("prefix_that_compresses_well_{}", i)),
    ));
    for compression in ["zstd", "lz4"] {
        // Field metadata selecting the codec, dictionary divisor, and
        // miniblock structural encoding.
        let mut metadata = HashMap::new();
        metadata.insert(
            "lance-encoding:compression".to_string(),
            compression.to_string(),
        );
        metadata.insert(
            "lance-encoding:dict-divisor".to_string(),
            "100000".to_string(),
        );
        metadata.insert(
            "lance-encoding:structural-encoding".to_string(),
            "miniblock".to_string(),
        );
        let fields: Vec<Field> = (0..NUM_COLUMNS)
            .map(|i| {
                Field::new(format!("s{}", i), DataType::Utf8, false).with_metadata(metadata.clone())
            })
            .collect();
        // Every column shares the same underlying array (cheap Arc clones).
        let columns: Vec<Arc<dyn arrow_array::Array>> =
            (0..NUM_COLUMNS).map(|_| array.clone()).collect();
        let schema = Arc::new(Schema::new(fields));
        let data = RecordBatch::try_new(schema.clone(), columns).unwrap();
        let lance_schema =
            Arc::new(lance_core::datatypes::Schema::try_from(schema.as_ref()).unwrap());
        let encoding_strategy = default_encoding_strategy(LanceFileVersion::V2_2);
        // Encode once per codec, outside the timed loop.
        let encoded = rt
            .block_on(encode_batch(
                &data,
                lance_schema,
                encoding_strategy.as_ref(),
                &EncodingOptions::default(),
            ))
            .unwrap();
        let encoded = Arc::new(encoded);
        for parallelism in [1, 8] {
            group.throughput(criterion::Throughput::Elements(
                NUM_ROWS * NUM_COLUMNS as u64,
            ));
            group.bench_function(
                format!(
                    "{}_{}cols_parallel_{}",
                    compression, NUM_COLUMNS, parallelism
                ),
                |b| {
                    b.iter(|| {
                        rt.block_on(async {
                            // The full schedule-and-decode pipeline runs every
                            // iteration, so scheduler construction is included
                            // in the measured time.
                            let io_scheduler = Arc::new(lance_encoding::BufferScheduler::new(
                                encoded.data.clone(),
                            ))
                                as Arc<dyn lance_encoding::EncodingsIo>;
                            let cache = Arc::new(LanceCache::no_cache());
                            let filter = FilterExpression::no_filter();
                            let mut decode_scheduler = DecodeBatchScheduler::try_new(
                                encoded.schema.as_ref(),
                                &encoded.top_level_columns,
                                &encoded.page_table,
                                &vec![],
                                encoded.num_rows,
                                Arc::<DecoderPlugins>::default(),
                                io_scheduler.clone(),
                                cache,
                                &filter,
                                &DecoderConfig::default(),
                            )
                            .await
                            .unwrap();
                            // Scheduled work is delivered to the decode stream
                            // over this channel.
                            let (tx, rx) = unbounded_channel();
                            decode_scheduler.schedule_range(
                                0..encoded.num_rows,
                                &filter,
                                tx,
                                io_scheduler,
                            );
                            // NOTE(review): the three boolean flags are
                            // positional — confirm their meaning against
                            // create_decode_stream's signature.
                            let decode_stream = create_decode_stream(
                                &encoded.schema,
                                encoded.num_rows,
                                BATCH_SIZE,
                                true, false,
                                false,
                                rx,
                            )
                            .unwrap();
                            // `buffered(parallelism)` is the knob under test:
                            // it bounds how many decode tasks run concurrently.
                            let batches: Vec<_> = decode_stream
                                .map(|task| task.task)
                                .buffered(parallelism)
                                .collect()
                                .await;
                            let total_rows: usize =
                                batches.iter().map(|b| b.as_ref().unwrap().num_rows()).sum();
                            assert_eq!(total_rows, NUM_ROWS as usize);
                        })
                    })
                },
            );
        }
    }
}
// On Linux, attach the pprof profiler so criterion can emit flamegraphs for
// each benchmark run.
#[cfg(target_os = "linux")]
criterion_group!(
    name=benches;
    config = Criterion::default().significance_level(0.1).sample_size(10)
        .with_profiler(pprof::criterion::PProfProfiler::new(100, pprof::criterion::Output::Flamegraph(None)));
    targets = bench_decode, bench_decode_fsl, bench_decode_str_with_dict_encoding, bench_decode_packed_struct,
        bench_decode_str_with_fixed_size_binary_encoding, bench_decode_compressed,
        bench_decode_compressed_parallel);
// On other platforms, run without profiling. Note that
// bench_decode_str_with_fixed_size_binary_encoding is omitted from this
// target list, which is why that function carries #[allow(dead_code)].
#[cfg(not(target_os = "linux"))]
criterion_group!(
    name=benches;
    config = Criterion::default().significance_level(0.1).sample_size(10);
    targets = bench_decode, bench_decode_fsl, bench_decode_str_with_dict_encoding, bench_decode_packed_struct,
        bench_decode_compressed, bench_decode_compressed_parallel);
criterion_main!(benches);