use futures_util::StreamExt;
use lightstream::enums::BufferChunkSize;
use lightstream::enums::IPCMessageProtocol;
use lightstream::models::readers::ipc::table_stream_reader::TableStreamReader64;
use lightstream::models::streams::disk::DiskByteStream;
use lightstream::models::writers::ipc::table_stream_writer::TableStreamWriter;
use minarrow::ffi::arrow_dtype::ArrowType;
use minarrow::{
Array, Buffer, Field, FieldArray, IntegerArray, NumericArray, StringArray, Table, TextArray,
Vec64,
};
use std::path::Path;
use std::sync::Arc;
use tempfile::tempdir;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Demonstrates encoding tables into Arrow IPC stream frames on disk,
    // then reading them back and sampling their contents.
    println!("TableStreamWriter Example");
    println!("========================");

    let sample_tables = create_sample_tables();
    println!("Created {} tables for streaming", sample_tables.len());

    // Work inside a temp directory that is removed when `scratch` drops.
    let scratch = tempdir()?;
    let output = scratch.path().join("stream_output.arrow");

    println!("\n1. Streaming Encoding with Frame Processing");
    stream_encode_tables(&sample_tables, &output).await?;

    println!("\n2. Reading Back Streamed Data");
    read_streamed_data(&output).await?;

    println!("\n✓ TableStreamWriter example completed successfully!");
    Ok(())
}
/// Builds three 1000-row sample [`Table`]s used as input for the streaming
/// example.
///
/// Each table carries two non-nullable columns:
/// * `batch_id` — `Int32` ids starting at `batch_num * 1000`
/// * `description` — `String32` values like `"batch_0_item_0042"` in the
///   Arrow variable-length layout (concatenated UTF-8 bytes + `u32` offsets)
///
/// Improvement over the original: `tables`, `str_data`, and `offsets` are all
/// preallocated to their exact final sizes, avoiding repeated grow-and-copy
/// while appending ~20 KB of string bytes per batch.
fn create_sample_tables() -> Vec<Table> {
    const N_BATCHES: usize = 3;
    const N_ROWS: usize = 1000;

    let mut tables = Vec::with_capacity(N_BATCHES);
    for batch_num in 0..N_BATCHES {
        let start_id = batch_num * N_ROWS;

        // Int32 id column covering [start_id, start_id + N_ROWS).
        let int_data: Vec<i32> = (start_id..start_id + N_ROWS).map(|i| i as i32).collect();
        let int_array = Array::NumericArray(NumericArray::Int32(Arc::new(IntegerArray {
            data: Buffer::from(Vec64::from_slice(&int_data)),
            null_mask: None,
        })));
        let int_field = FieldArray::new(
            Field {
                name: "batch_id".into(),
                dtype: ArrowType::Int32,
                nullable: false,
                metadata: Default::default(),
            },
            int_array,
        );

        // Build the String32 column: every value's bytes are appended to one
        // contiguous buffer, with offsets[i]..offsets[i+1] delimiting value i.
        let individual_strings: Vec<String> = (0..N_ROWS)
            .map(|i| format!("batch_{}_item_{:04}", batch_num, i))
            .collect();
        // Preallocate: total byte length is known, offsets need N_ROWS + 1 slots.
        let total_len: usize = individual_strings.iter().map(String::len).sum();
        let mut str_data = Vec::with_capacity(total_len);
        let mut offsets = Vec::with_capacity(N_ROWS + 1);
        offsets.push(0u32);
        for s in &individual_strings {
            str_data.extend_from_slice(s.as_bytes());
            offsets.push(str_data.len() as u32);
        }
        let str_array = Array::TextArray(TextArray::String32(Arc::new(StringArray::new(
            Buffer::from(Vec64::from_slice(&str_data)),
            None,
            Buffer::from(Vec64::from_slice(&offsets)),
        ))));
        let str_field = FieldArray::new(
            Field {
                name: "description".into(),
                dtype: ArrowType::String,
                nullable: false,
                metadata: Default::default(),
            },
            str_array,
        );

        let table = Table {
            name: format!("batch_{}", batch_num),
            n_rows: N_ROWS,
            cols: vec![int_field, str_field],
        };
        tables.push(table);
    }
    tables
}
/// Encodes every table in `tables` as Arrow IPC stream frames and writes
/// them sequentially to `output_path`, logging each frame's size.
///
/// The stream schema is derived from the first table's column fields, so
/// all tables are expected to share one schema.
async fn stream_encode_tables(
    tables: &[Table],
    output_path: &Path,
) -> Result<(), Box<dyn std::error::Error>> {
    // Clone the field definitions of the first table to form the schema.
    let schema: Vec<Field> = tables[0]
        .cols
        .iter()
        .map(|column| (*column.field).clone())
        .collect();

    let mut writer = TableStreamWriter::<Vec64<u8>>::new(schema, IPCMessageProtocol::Stream);

    println!(" Writing {} tables to stream...", tables.len());
    for (idx, tbl) in tables.iter().enumerate() {
        writer.write(tbl)?;
        println!(" Added table {} to stream ('{}')", idx + 1, tbl.name);
    }

    // Seal the stream so the end-of-stream marker is emitted.
    writer.finish()?;
    println!(" Stream finished, processing frames...");

    // Drain the encoded frames one at a time and persist each to disk.
    let mut out_file = File::create(output_path).await?;
    let mut frames_written = 0;
    let mut bytes_written = 0;
    while let Some(next_frame) = writer.next_frame() {
        let frame = next_frame?;
        let size = frame.len();
        out_file.write_all(frame.as_ref()).await?;
        frames_written += 1;
        bytes_written += size;
        println!(" Processed frame {}: {} bytes", frames_written, size);
    }
    out_file.flush().await?;

    println!(
        " ✓ Wrote {} frames, {} total bytes",
        frames_written, bytes_written
    );
    assert!(writer.is_finished(), "Stream should be finished");
    Ok(())
}
/// Reads the Arrow IPC stream at `stream_path` back into tables and prints
/// a small sample (first ids, first string) from each one.
async fn read_streamed_data(stream_path: &Path) -> Result<(), Box<dyn std::error::Error>> {
    println!(" Opening streamed file for reading...");

    // Stream the file from disk in 64 KiB chunks.
    let disk_stream = DiskByteStream::open(stream_path, BufferChunkSize::Custom(64 * 1024)).await?;
    let mut reader = TableStreamReader64::new(disk_stream, 64 * 1024, IPCMessageProtocol::Stream);

    let mut tables_seen = 0;
    let mut rows_seen = 0;
    while let Some(next) = reader.next().await {
        let table = next?;
        tables_seen += 1;
        rows_seen += table.n_rows;
        println!(
            " Read table {}: '{}' with {} rows",
            tables_seen, table.name, table.n_rows
        );

        // Preview up to the first three ids from the Int32 column.
        if let Array::NumericArray(NumericArray::Int32(ids)) = &table.cols[0].array {
            let preview = &ids.data.as_ref()[0..3.min(table.n_rows)];
            println!(" Sample IDs: {:?}", preview);
        }

        // Decode the first string from the offsets/data layout; the offsets
        // buffer always has n_rows + 1 entries, so [0] and [1] are safe here.
        if let Array::TextArray(TextArray::String32(text)) = &table.cols[1].array {
            if table.n_rows > 0 {
                let start = text.offsets[0] as usize;
                let end = text.offsets[1] as usize;
                let first = std::str::from_utf8(&text.data[start..end])?;
                println!(" Sample description: '{}'", first);
            }
        }
    }

    println!(
        " ✓ Read {} tables with {} total rows",
        tables_seen, rows_seen
    );
    Ok(())
}