use lightstream::enums::BufferChunkSize;
use lightstream::enums::IPCMessageProtocol;
use lightstream::models::readers::ipc::table_reader::TableReader;
use lightstream::models::streams::disk::DiskByteStream;
use lightstream::models::writers::ipc::table_stream_writer::TableStreamWriter;
use minarrow::ffi::arrow_dtype::ArrowType;
use minarrow::{
    Array, Buffer, Field, FieldArray, FloatArray, IntegerArray, NumericArray, StringArray, Table,
    TextArray, Vec64,
};
use std::path::Path;
use std::sync::Arc;
use tempfile::tempdir;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
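
/// Demonstrates the standard `TableReader` against an on-disk Arrow IPC
/// stream: reading every table, reading a capped number of tables,
/// combining batches into a `SuperTable`, and flattening them into a
/// single `Table`.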
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("Standard TableReader Example");
    println!("===========================");

    let temp_dir = tempdir()?;

    println!("\n1. Reading All Tables");
    let all_tables_path = temp_dir.path().join("all_tables.stream");
    read_all_tables_example(&all_tables_path).await?;

    println!("\n2. Reading Limited Number of Tables");
    let limited_path = temp_dir.path().join("limited.stream");
    read_limited_tables_example(&limited_path).await?;

    println!("\n3. Combining to SuperTable");
    let super_table_path = temp_dir.path().join("super_table.stream");
    super_table_example(&super_table_path).await?;

    println!("\n4. Combining to Single Table");
    let single_table_path = temp_dir.path().join("single_table.stream");
    single_table_example(&single_table_path).await?;

    println!("\n✓ Standard TableReader example completed successfully!");
    Ok(())
}
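
/// Writes a five-table test stream to disk, reads every table back with
/// `read_all_tables`, and verifies that the total row count round-trips.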
async fn read_all_tables_example(file_path: &Path) -> Result<(), Box<dyn std::error::Error>> {
    let source_tables = create_test_tables(5);
    write_test_stream(&source_tables, file_path).await?;
    println!(" Created stream with {} tables", source_tables.len());

    // Stream the file back from disk in 8 KiB chunks.
    let disk_stream = DiskByteStream::open(file_path, BufferChunkSize::Custom(8192)).await?;
    let reader = TableReader::new(disk_stream, 8192, IPCMessageProtocol::Stream);

    let start = std::time::Instant::now();
    let tables = reader.read_all_tables().await?;
    let duration = start.elapsed();
    println!(" Read {} tables in {:?}", tables.len(), duration);

    // Every row that was written should come back out.
    let total_rows: usize = tables.iter().map(|t| t.n_rows).sum();
    let expected_rows: usize = source_tables.iter().map(|t| t.n_rows).sum();
    println!(" Total rows: {} (expected: {})", total_rows, expected_rows);
    assert_eq!(total_rows, expected_rows);

    for (i, table) in tables.iter().enumerate() {
        println!(
            " Table {}: '{}' with {} rows",
            i + 1,
            table.name,
            table.n_rows
        );
        // Peek at the first value of the Int32 `id` column.
        if let Array::NumericArray(NumericArray::Int32(int_arr)) = &table.cols[0].array {
            if table.n_rows > 0 {
                println!(" First ID: {}", int_arr.data.as_ref()[0]);
            }
        }
    }

    println!(" ✓ Successfully read all tables");
    Ok(())
}
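
/// Shows `read_tables(Some(n))`, which stops after reading `n` tables even
/// though the underlying stream contains more.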
async fn read_limited_tables_example(file_path: &Path) -> Result<(), Box<dyn std::error::Error>> {
    let source_tables = create_test_tables(10);
    write_test_stream(&source_tables, file_path).await?;
    println!(" Created stream with {} tables", source_tables.len());

    let disk_stream = DiskByteStream::open(file_path, BufferChunkSize::Custom(8192)).await?;
    let reader = TableReader::new(disk_stream, 8192, IPCMessageProtocol::Stream);

    let start = std::time::Instant::now();
    let tables = reader.read_tables(Some(3)).await?;
    let duration = start.elapsed();
    println!(
        " Read {} tables (limited to 3) in {:?}",
        tables.len(),
        duration
    );
    assert_eq!(tables.len(), 3, "Should have read exactly 3 tables");

    for (i, table) in tables.iter().enumerate() {
        println!(
            " Table {}: '{}' with {} rows",
            i + 1,
            table.name,
            table.n_rows
        );
        assert_eq!(table.name, format!("test_batch_{}", i));
    }

    println!(" ✓ Successfully read limited number of tables");
    Ok(())
}
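
/// Collects the stream into a `SuperTable` with `read_to_super_table`,
/// which keeps each incoming table as a separate batch rather than
/// concatenating rows.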
async fn super_table_example(file_path: &Path) -> Result<(), Box<dyn std::error::Error>> {
    let source_tables = create_varying_size_tables();
    write_test_stream(&source_tables, file_path).await?;
    println!(
        " Created stream with {} tables of varying sizes",
        source_tables.len()
    );

    let disk_stream = DiskByteStream::open(file_path, BufferChunkSize::Custom(8192)).await?;
    let reader = TableReader::new(disk_stream, 8192, IPCMessageProtocol::Stream);

    let start = std::time::Instant::now();
    let super_table = reader
        .read_to_super_table(Some("CombinedData".to_string()), None)
        .await?;
    let duration = start.elapsed();
    println!(
        " Created SuperTable '{}' in {:?}",
        super_table.name, duration
    );
    println!(" Total rows: {}", super_table.n_rows);
    println!(" Number of batches: {}", super_table.batches.len());
    println!(" Schema fields: {}", super_table.schema.len());

    for (i, batch) in super_table.batches.iter().enumerate() {
        println!(" Batch {}: {} rows", i + 1, batch.n_rows);
    }
    for field in &super_table.schema {
        println!(" Field: '{}' ({:?})", field.name, field.dtype);
    }

    println!(" ✓ Successfully created SuperTable preserving batch structure");
    Ok(())
}
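
/// Concatenates every batch in the stream into one contiguous `Table` with
/// `combine_to_table`, then spot-checks the combined `id` column.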
async fn single_table_example(file_path: &Path) -> Result<(), Box<dyn std::error::Error>> {
    let source_tables = create_test_tables(4);
    let expected_total_rows: usize = source_tables.iter().map(|t| t.n_rows).sum();
    write_test_stream(&source_tables, file_path).await?;
    println!(
        " Created stream with {} tables ({} total rows)",
        source_tables.len(),
        expected_total_rows
    );

    let disk_stream = DiskByteStream::open(file_path, BufferChunkSize::Custom(8192)).await?;
    let reader = TableReader::new(disk_stream, 8192, IPCMessageProtocol::Stream);

    let start = std::time::Instant::now();
    let combined_table = reader
        .combine_to_table(Some("CombinedBatches".to_string()))
        .await?;
    let duration = start.elapsed();
    println!(
        " Created combined Table '{}' in {:?}",
        combined_table.name, duration
    );
    println!(
        " Total rows: {} (expected: {})",
        combined_table.n_rows, expected_total_rows
    );
    println!(" Number of columns: {}", combined_table.cols.len());
    assert_eq!(
        combined_table.n_rows, expected_total_rows,
        "Row count should match"
    );

    // The `id` column increases globally across batches, so the first and
    // last values bracket the full combined range.
    if let Array::NumericArray(NumericArray::Int32(int_arr)) = &combined_table.cols[0].array {
        let sample_size = 10.min(combined_table.n_rows);
        let sample_data = &int_arr.data.as_ref()[0..sample_size];
        println!(" First {} IDs: {:?}", sample_size, sample_data);
        if combined_table.n_rows > 0 {
            let first_id = int_arr.data.as_ref()[0];
            let last_id = int_arr.data.as_ref()[combined_table.n_rows - 1];
            println!(" ID range: {} to {}", first_id, last_id);
        }
    }

    println!(" ✓ Successfully combined all batches into single Table");
    Ok(())
}
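
/// Builds `num_tables` uniform 1000-row tables, each with an Int32 `id`,
/// a Float64 `value`, and a String `label` column. IDs increase globally
/// across tables so combined output is easy to verify.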
fn create_test_tables(num_tables: usize) -> Vec<Table> {
    let mut tables = Vec::new();
    let mut global_id = 0;

    for batch_num in 0..num_tables {
        let n_rows = 1000;

        // Int32 `id` column, numbered continuously across all batches.
        let int_data: Vec<i32> = (global_id..global_id + n_rows).map(|i| i as i32).collect();
        let int_array = Array::NumericArray(NumericArray::Int32(Arc::new(IntegerArray {
            data: Buffer::from(Vec64::from_slice(&int_data)),
            null_mask: None,
        })));
        let int_field = FieldArray::new(
            Field {
                name: "id".into(),
                dtype: ArrowType::Int32,
                nullable: false,
                metadata: Default::default(),
            },
            int_array,
        );

        // Float64 `value` column, offset per batch so values never collide.
        let float_data: Vec<f64> = (0..n_rows)
            .map(|i| (i as f64 + batch_num as f64 * 10000.0) * 0.001)
            .collect();
        let float_array = Array::NumericArray(NumericArray::Float64(Arc::new(FloatArray {
            data: Buffer::from(Vec64::from_slice(&float_data)),
            null_mask: None,
        })));
        let float_field = FieldArray::new(
            Field {
                name: "value".into(),
                dtype: ArrowType::Float64,
                nullable: false,
                metadata: Default::default(),
            },
            float_array,
        );

        // String `label` column: concatenate the UTF-8 bytes of every value
        // and record an offsets buffer, per the Arrow variable-length
        // string layout.
        let individual_strings: Vec<String> = (0..n_rows)
            .map(|i| format!("batch{}_item{:04}", batch_num, i))
            .collect();
        let mut str_data = Vec::new();
        let mut offsets = Vec::with_capacity(n_rows + 1);
        offsets.push(0u32);
        for s in &individual_strings {
            str_data.extend_from_slice(s.as_bytes());
            offsets.push(str_data.len() as u32);
        }
        let str_array = Array::TextArray(TextArray::String32(Arc::new(StringArray::new(
            Buffer::from(Vec64::from_slice(&str_data)),
            None,
            Buffer::from(Vec64::from_slice(&offsets)),
        ))));
        let str_field = FieldArray::new(
            Field {
                name: "label".into(),
                dtype: ArrowType::String,
                nullable: false,
                metadata: Default::default(),
            },
            str_array,
        );

        let table = Table {
            name: format!("test_batch_{}", batch_num),
            n_rows,
            cols: vec![int_field, float_field, str_field],
        };
        tables.push(table);
        global_id += n_rows;
    }
    tables
}
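
/// Builds single-column tables of differing row counts, used to show that
/// a `SuperTable` preserves the per-batch sizes.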
fn create_varying_size_tables() -> Vec<Table> {
    let sizes = vec![500, 1500, 800, 2000, 300];
    let mut tables = Vec::new();

    for (batch_num, &n_rows) in sizes.iter().enumerate() {
        let int_data: Vec<i32> = (0..n_rows)
            .map(|i| (batch_num as i32) * 10000 + i as i32)
            .collect();
        let int_array = Array::NumericArray(NumericArray::Int32(Arc::new(IntegerArray {
            data: Buffer::from(Vec64::from_slice(&int_data)),
            null_mask: None,
        })));
        let int_field = FieldArray::new(
            Field {
                name: "batch_value".into(),
                dtype: ArrowType::Int32,
                nullable: false,
                metadata: Default::default(),
            },
            int_array,
        );

        let table = Table {
            name: format!("varying_batch_{}", batch_num),
            n_rows,
            cols: vec![int_field],
        };
        tables.push(table);
    }
    tables
}
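
/// Serialises the tables to an Arrow IPC stream file with
/// `TableStreamWriter`. The schema is taken from the first table, so every
/// table is assumed to share it.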
async fn write_test_stream(
    tables: &[Table],
    output_path: &Path,
) -> Result<(), Box<dyn std::error::Error>> {
    // Derive the stream schema from the first table's fields.
    let schema: Vec<Field> = tables[0]
        .cols
        .iter()
        .map(|col| (*col.field).clone())
        .collect();

    let mut stream_writer =
        TableStreamWriter::<Vec64<u8>>::new(schema, IPCMessageProtocol::Stream);
    for table in tables {
        stream_writer.write(table)?;
    }
    stream_writer.finish()?;

    // Drain the writer's encoded frames and append them to the file.
    let mut file = File::create(output_path).await?;
    while let Some(frame_result) = stream_writer.next_frame() {
        let frame = frame_result?;
        file.write_all(frame.as_ref()).await?;
    }
    file.flush().await?;
    Ok(())
}