use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
use llkv_column_map::ROW_ID_COLUMN_NAME;
use llkv_column_map::{
ColumnStore,
store::debug::{ColumnStoreDebug, discover_all_pks},
};
use llkv_storage::{pager::MemPager, types::PhysicalKey};
use llkv_types::ids::LogicalFieldId;
const BATCHES: usize = 4;
const C1_ROWS: usize = 500;
const C2_ROWS: usize = 500;
const C3_ROWS: usize = 5_000;
use arrow::array::{Array, ArrayRef, BinaryBuilder, UInt32Array, UInt64Array};
use arrow::datatypes::{Field, Schema};
use arrow::record_batch::RecordBatch;
static NEXT_ROW_ID: AtomicU64 = AtomicU64::new(0);
/// Builds the column-1 payload for logical rows `[start, end)`.
///
/// Column 1 holds sequential `u32` values (row index `i` stores `i`). The
/// requested range is clamped to `C1_ROWS`; when nothing is left after
/// clamping, `None` is returned so the caller can skip this column.
fn build_put_for_col1(start: usize, end: usize) -> Option<(LogicalFieldId, ArrayRef)> {
    let lo = start.min(C1_ROWS);
    let hi = end.min(C1_ROWS);
    (lo < hi).then(|| {
        let values: Vec<u32> = (lo..hi).map(|row| row as u32).collect();
        let array: ArrayRef = Arc::new(UInt32Array::from(values));
        (LogicalFieldId::for_user_table_0(1), array)
    })
}
/// Builds the column-2 payload for logical rows `[start, end)`.
///
/// Column 2 holds variable-length binary values. Row `i` is the letter
/// `'A' + i % 26` repeated `i % 21 + 1` times. The range is clamped to
/// `C2_ROWS`; an empty clamped range yields `None`.
fn build_put_for_col2(start: usize, end: usize) -> Option<(LogicalFieldId, ArrayRef)> {
    let lo = start.min(C2_ROWS);
    let hi = end.min(C2_ROWS);
    if hi <= lo {
        return None;
    }

    let mut builder = BinaryBuilder::new();
    (lo..hi).for_each(|row| {
        let letter = b'A' + (row % 26) as u8;
        let width = row % 21 + 1;
        builder.append_value(vec![letter; width]);
    });

    let array: ArrayRef = Arc::new(builder.finish());
    Some((LogicalFieldId::for_user_table_0(2), array))
}
/// Builds the column-3 payload for logical rows `[start, end)`.
///
/// Column 3 is the long column. Every row stores the constant `0x55u64`. The
/// range is clamped to `C3_ROWS`; an empty clamped range yields `None`.
fn build_put_for_col3(start: usize, end: usize) -> Option<(LogicalFieldId, ArrayRef)> {
    let lo = start.min(C3_ROWS);
    let hi = end.min(C3_ROWS);
    match hi.checked_sub(lo) {
        Some(count) if count > 0 => {
            let array: ArrayRef = Arc::new(UInt64Array::from(vec![0x55u64; count]));
            Some((LogicalFieldId::for_user_table_0(3), array))
        }
        _ => None,
    }
}
fn batch_from_pairs(pairs: &[(LogicalFieldId, ArrayRef)]) -> RecordBatch {
let fields: Vec<Field> = pairs
.iter()
.enumerate()
.map(|(i, (fid, arr))| {
let mut md = std::collections::HashMap::new();
md.insert(
llkv_column_map::store::FIELD_ID_META_KEY.to_string(),
u64::from(*fid).to_string(),
);
Field::new(format!("c{i}"), arr.data_type().clone(), false).with_metadata(md)
})
.collect();
let num_rows = if pairs.is_empty() {
0
} else {
pairs[0].1.len()
};
let row_id_field = Field::new(
ROW_ID_COLUMN_NAME,
arrow::datatypes::DataType::UInt64,
false,
);
let start_row_id = NEXT_ROW_ID.fetch_add(num_rows as u64, Ordering::Relaxed);
let end_row_id = start_row_id + num_rows as u64;
let row_id_array =
Arc::new(UInt64Array::from_iter_values(start_row_id..end_row_id)) as ArrayRef;
let mut final_fields = vec![row_id_field];
final_fields.extend(fields);
let mut final_arrays = vec![row_id_array];
final_arrays.extend(pairs.iter().map(|(_, a)| Arc::clone(a)));
let schema = Arc::new(Schema::new(final_fields));
RecordBatch::try_new(schema, final_arrays).unwrap()
}
/// Demo driver that ingests three columns of different lengths into an
/// in-memory `ColumnStore`, scans them, and dumps the resulting storage layout.
///
/// Column 3 (`C3_ROWS` values) is split across `BATCHES` appends. Columns 1 and
/// 2 are shorter (clamped to `C1_ROWS` / `C2_ROWS`), so they only appear in the
/// leading batch(es). After every batch the physical keys are re-discovered,
/// and each new key is tagged with the batch that first produced it. That map
/// is passed to `render_storage_as_dot` to colour the graph nodes.
///
/// # Errors
/// Returns an error if `storage_layout.dot` cannot be written to the current
/// directory.
///
/// # Panics
/// Panics if the column store cannot be opened or an append is rejected.
// Printing progress and results to stdout is the whole purpose of this example binary.
#[allow(clippy::print_stdout)]
fn main() -> Result<(), Box<dyn std::error::Error>> {
    use llkv_column_map::store::scan::{
        PrimitiveSortedVisitor, PrimitiveSortedWithRowIdsVisitor, PrimitiveVisitor,
        PrimitiveWithRowIdsVisitor, ScanOptions,
    };

    let pager = Arc::new(MemPager::default());
    let store = ColumnStore::open(Arc::clone(&pager)).expect("failed to open column store");

    // Keys that already exist before any ingestion (store bootstrap) are
    // attributed to "batch 0".
    let mut created_in_batch: HashMap<PhysicalKey, usize> = HashMap::new();
    for pk in discover_all_pks(pager.as_ref()) {
        created_in_batch.insert(pk, 0);
    }

    let rows_per_batch = C3_ROWS.div_ceil(BATCHES);
    println!(
        "Ingesting col1={} rows, col2={} rows, col3={} rows in {} batches \
         ({} rows/batch on col3)...",
        C1_ROWS, C2_ROWS, C3_ROWS, BATCHES, rows_per_batch
    );

    let t_total = Instant::now();
    for b in 0..BATCHES {
        let start = b * rows_per_batch;
        let end = (start + rows_per_batch).min(C3_ROWS);

        let pairs: Vec<(LogicalFieldId, ArrayRef)> = [
            build_put_for_col1(start, end),
            build_put_for_col2(start, end),
            build_put_for_col3(start, end),
        ]
        .into_iter()
        .flatten()
        .collect();
        if pairs.is_empty() {
            continue;
        }

        // A RecordBatch requires every column to have the same row count, so
        // columns are grouped by length and each group is appended separately.
        // A BTreeMap (unlike HashMap, whose iteration order is randomized per
        // process) keeps the append order, and therefore row-id assignment and
        // the resulting storage layout, identical across runs.
        let mut by_len: std::collections::BTreeMap<usize, Vec<(LogicalFieldId, ArrayRef)>> =
            std::collections::BTreeMap::new();
        for (fid, arr) in pairs {
            by_len.entry(arr.len()).or_default().push((fid, arr));
        }

        let t_batch = Instant::now();
        for group in by_len.into_values() {
            let batch = batch_from_pairs(&group);
            store.append(&batch).expect("append to column store failed");
        }
        let dt = t_batch.elapsed();
        println!("  batch {}: rows [{}..{}) in {:?}", b + 1, start, end, dt);

        // Only keys not seen before get tagged; existing keys keep their
        // original batch number.
        for pk in discover_all_pks(pager.as_ref()) {
            created_in_batch.entry(pk).or_insert(b + 1);
        }
    }
    println!("Total ingest time: {:?}", t_total.elapsed());

    /// Scan visitor that counts the `u32`/`u64` values it is handed.
    struct Count {
        rows: u64,
    }
    impl PrimitiveVisitor for Count {
        fn u64_chunk(&mut self, a: &UInt64Array) {
            self.rows += a.len() as u64;
        }
        fn u32_chunk(&mut self, a: &UInt32Array) {
            self.rows += a.len() as u64;
        }
    }
    impl PrimitiveSortedVisitor for Count {}
    impl PrimitiveWithRowIdsVisitor for Count {}
    impl PrimitiveSortedWithRowIdsVisitor for Count {}

    for id in [1u32, 2, 3] {
        let field_id = LogicalFieldId::for_user_table_0(id);
        let mut counter = Count { rows: 0 };
        match store.scan(field_id, ScanOptions::default(), &mut counter) {
            Ok(()) => {
                println!(
                    "col={:?} -> total primitive rows scanned: {}",
                    field_id, counter.rows
                );
            }
            Err(e) => {
                // Expected for the binary column (2), but print the actual
                // error so unrelated failures are not silently mislabelled.
                println!(
                    "col={:?} -> scan not supported for this dtype in this example ({:?})",
                    field_id, e
                );
            }
        }
    }

    let summary_table = store.render_storage_as_formatted_string();
    println!("\n==== STORAGE LAYOUT ====\n{}", summary_table);
    let dot = store.render_storage_as_dot(&created_in_batch);
    std::fs::write("storage_layout.dot", dot)?;
    println!("Wrote storage_layout.dot (single graph, nodes colored by batch)");
    Ok(())
}