use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use arrow::array::{Int64Array, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use llkv_column_map::store::{Projection, ROW_ID_COLUMN_NAME};
use llkv_expr::{Expr, Filter, Operator};
use llkv_storage::pager::MemPager;
use llkv_table::Table;
use llkv_table::table::ScanStreamOptions;
use llkv_table::types::{FieldId, RowId, TableId};
use llkv_types::LogicalFieldId;
use std::ops::Bound;
/// Builds an Arrow [`Field`] tagged with a field id in its metadata.
///
/// The id is written in its string form under
/// `llkv_table::constants::FIELD_ID_META_KEY`; `name`, `dt` and `nullable`
/// are forwarded to [`Field::new`] unchanged.
fn field_with_fid(name: &str, dt: DataType, fid: FieldId, nullable: bool) -> Field {
    let mut metadata = HashMap::with_capacity(1);
    metadata.insert(
        llkv_table::constants::FIELD_ID_META_KEY.to_string(),
        fid.to_string(),
    );
    Field::new(name, dt, nullable).with_metadata(metadata)
}
/// Runs a single scan over `table`, times it, and prints a short report.
///
/// The scan is issued through `Table::scan_stream` with the given
/// `projections`, `filter_expr` and `options`. For every batch delivered to
/// the callback this function counts the batch, adds its row count, and adds
/// the sum of column 0 (read as an `Int64Array`) to a running total so the
/// scanned data is actually touched.
///
/// On success it prints elapsed time, batch count, row count, the sum, the
/// throughput in rows/sec, and a coarse speed label (< 5 ms, < 20 ms,
/// otherwise). On failure it prints the error and returns normally.
///
/// # Panics
///
/// Panics if column 0 of a delivered batch is not an `Int64Array`.
// Printing is the whole purpose of this benchmark, so the stdout lint is silenced.
#[allow(clippy::print_stdout)]
fn benchmark_scenario(
    name: &str,
    table: &Table<MemPager>,
    projections: &[Projection],
    filter_expr: &Expr<FieldId>,
    options: ScanStreamOptions,
) {
    println!("\n=== {} ===", name);

    let mut batch_count = 0;
    let mut total_rows = 0;
    let mut total_values: i64 = 0;

    let start = Instant::now();
    let outcome = table.scan_stream(projections, filter_expr, options, |batch| {
        batch_count += 1;
        total_rows += batch.num_rows();
        let arr = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("benchmark projections must yield an Int64 column at index 0");
        // Sum the raw value buffer in one pass. Like the per-index `value(i)`
        // read it replaces, this does not consult the validity bitmap.
        total_values += arr.values().iter().sum::<i64>();
    });
    // Stop the clock before any reporting work happens.
    let elapsed = start.elapsed();

    match outcome {
        Ok(()) => {
            println!(" Time: {:?}", elapsed);
            println!(" Batches: {}", batch_count);
            println!(" Total rows: {}", total_rows);
            println!(" Sum: {}", total_values);
            println!(
                " Throughput: {:.0} rows/sec",
                total_rows as f64 / elapsed.as_secs_f64()
            );

            // Heuristic labels only; the thresholds are rough wall-clock cut-offs.
            let millis = elapsed.as_millis();
            let verdict = if millis < 5 {
                " 🚀 VERY FAST - likely using optimized path"
            } else if millis < 20 {
                " ✅ Fast - good performance"
            } else {
                " ⚠️ Slower - might be using materialization"
            };
            println!("{}", verdict);
        }
        Err(e) => {
            println!(" ❌ Failed: {}", e);
        }
    }
}
/// Populates an in-memory table and runs four timed scan scenarios over it.
///
/// The table gets 1,000,000 rows with row ids `1..=row_count`,
/// `col_a = 2 * i` and `col_b = 3 * i` for `i` in `0..row_count`. The
/// scenarios, in order, are:
///
/// 1. single column, unbounded range filter;
/// 2. two columns, unbounded range filter;
/// 3. single column, unbounded range filter, `include_nulls: true`;
/// 4. single column, range filter `[1000, 2000)` on `col_a`.
///
/// # Errors
///
/// Returns an error if the table cannot be created, the record batch cannot
/// be built, or the append fails. Scan failures are reported by
/// `benchmark_scenario` and do not abort the run.
// Printing is the whole purpose of this benchmark, so the stdout lint is silenced.
#[allow(clippy::print_stdout)]
fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("Table Layer Performance Optimization Benchmark");
    println!("============================================");

    const TABLE_ID: TableId = 1;
    const FIELD_A: FieldId = 10;
    const FIELD_B: FieldId = 11;

    let pager = Arc::new(MemPager::default());
    let table = Table::from_id(TABLE_ID, pager)?;

    // --- Test data -------------------------------------------------------
    let row_count = 1_000_000;
    let rows: Vec<RowId> = (1..=row_count).collect();
    // The products stay far below i64::MAX, so the `as i64` casts cannot truncate.
    let values_a: Vec<i64> = (0..row_count).map(|i| (i * 2) as i64).collect();
    let values_b: Vec<i64> = (0..row_count).map(|i| (i * 3) as i64).collect();
    println!("Creating {} rows of test data...", row_count);

    let schema = Arc::new(Schema::new(vec![
        Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
        field_with_fid("col_a", DataType::Int64, FIELD_A, false),
        field_with_fid("col_b", DataType::Int64, FIELD_B, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(UInt64Array::from(rows)),
            Arc::new(Int64Array::from(values_a)),
            Arc::new(Int64Array::from(values_b)),
        ],
    )?;
    table.append(&batch)?;
    println!("Data inserted successfully!");

    // --- Shared scan inputs ----------------------------------------------
    let lfid_a = LogicalFieldId::for_user(TABLE_ID, FIELD_A);
    let lfid_b = LogicalFieldId::for_user(TABLE_ID, FIELD_B);

    let single_proj = vec![Projection::with_alias(lfid_a, "col_a")];
    let multi_proj = vec![
        Projection::with_alias(lfid_a, "col_a"),
        Projection::with_alias(lfid_b, "col_b"),
    ];

    let unbounded_filter = Expr::Pred(Filter {
        field_id: FIELD_A,
        op: Operator::Range {
            lower: Bound::Unbounded,
            upper: Bound::Unbounded,
        },
    });
    let bounded_filter = Expr::Pred(Filter {
        field_id: FIELD_A,
        op: Operator::Range {
            lower: Bound::Included(1000.into()),
            upper: Bound::Excluded(2000.into()),
        },
    });

    let default_options = ScanStreamOptions {
        include_nulls: false,
        order: None,
        row_id_filter: None,
        include_row_ids: true,
    };
    // Identical to the defaults except that nulls are included.
    let with_nulls_options = ScanStreamOptions {
        include_nulls: true,
        ..default_options.clone()
    };

    // --- Scenarios -------------------------------------------------------
    benchmark_scenario(
        "Single Column, Unbounded Scan",
        &table,
        &single_proj,
        &unbounded_filter,
        default_options.clone(),
    );
    benchmark_scenario(
        "Multi-Column Scan",
        &table,
        &multi_proj,
        &unbounded_filter,
        default_options.clone(),
    );
    // Last use of `with_nulls_options`: move it instead of cloning.
    benchmark_scenario(
        "Single Column with Nulls",
        &table,
        &single_proj,
        &unbounded_filter,
        with_nulls_options,
    );
    benchmark_scenario(
        "Single Column, Bounded Filter",
        &table,
        &single_proj,
        &bounded_filter,
        default_options,
    );

    println!("\n=== Summary ===");
    println!("This benchmark shows different optimization paths in the Table layer:");
    println!("• Single column unbounded scans should be fastest (direct streaming)");
    println!("• Multi-column scans use materialization but are still fast");
    println!("• Complex filters and null handling add overhead");
    println!("• All scenarios should be significantly faster than 10x overhead we had before");
    Ok(())
}