#![allow(
clippy::unwrap_used,
clippy::expect_used,
clippy::cast_possible_truncation,
clippy::cast_possible_wrap,
clippy::cast_precision_loss,
clippy::cast_lossless,
clippy::uninlined_format_args,
clippy::doc_markdown,
clippy::too_many_lines,
clippy::redundant_clone,
clippy::use_debug,
clippy::unnecessary_debug_formatting
)]
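//! Streaming dataset example for `alimentar`.
//!
//! Walks through bounded-memory iteration: streaming from an in-memory
//! source, prefetching, reading a Parquet file in fixed-size batches,
//! chaining multiple sources, and running simple aggregations (mean,
//! min/max) without materializing the whole dataset at once.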
use std::sync::Arc;
use alimentar::{
streaming::{ChainedSource, MemorySource, StreamingDataset},
ArrowDataset,
};
use arrow::{
array::{Float64Array, Int32Array},
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
};
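/// Builds a two-column `RecordBatch` (`id: Int32`, `value: Float64`) with
/// `count` rows, where ids start at `start` and each value is `id * 0.1`.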
fn create_large_batch(start: i32, count: usize) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("value", DataType::Float64, false),
]));
let ids: Vec<i32> = (start..start + count as i32).collect();
let values: Vec<f64> = ids.iter().map(|&i| i as f64 * 0.1).collect();
RecordBatch::try_new(
schema,
vec![
Arc::new(Int32Array::from(ids)),
Arc::new(Float64Array::from(values)),
],
)
.expect("batch creation failed")
}
fn main() -> alimentar::Result<()> {
println!("=== Alimentar Streaming Dataset Example ===\n");
println!("1. Streaming from MemorySource");
let batches = vec![
create_large_batch(0, 1000),
create_large_batch(1000, 1000),
create_large_batch(2000, 1000),
];
let source = MemorySource::new(batches.clone())?;
let dataset = StreamingDataset::new(Box::new(source), 4);
println!(" Schema: {:?}", dataset.schema());
if let Some(hint) = dataset.size_hint() {
println!(" Size hint: {} rows", hint);
}
let mut total_rows = 0;
let mut batch_count = 0;
for batch in dataset {
total_rows += batch.num_rows();
batch_count += 1;
}
println!(
" Processed {} batches, {} total rows",
batch_count, total_rows
);
println!("\n2. StreamingDataset with prefetch=2");
let source = MemorySource::new(batches.clone())?;
let dataset = StreamingDataset::new(Box::new(source), 4).prefetch(2);
let mut batch_count = 0;
for batch in dataset {
batch_count += 1;
if batch_count <= 3 {
println!(" Batch {}: {} rows", batch_count, batch.num_rows());
}
}
println!(" ... ({} batches total)", batch_count);
println!("\n3. Streaming from Parquet file");
let temp_dir = std::env::temp_dir();
let parquet_path = temp_dir.join("streaming_example.parquet");
let sample_dataset = ArrowDataset::from_batch(create_large_batch(0, 5000))?;
sample_dataset.to_parquet(&parquet_path)?;
println!(" Created sample Parquet file: {:?}", parquet_path);
let streaming = StreamingDataset::from_parquet(&parquet_path, 512)?;
println!(" Batch size: 512 rows");
if let Some(hint) = streaming.size_hint() {
println!(" Total rows (from size_hint): {}", hint);
}
let mut batch_count = 0;
let mut total_rows = 0;
for batch in streaming {
total_rows += batch.num_rows();
batch_count += 1;
}
println!(" Streamed {} batches, {} rows", batch_count, total_rows);
println!("\n4. Chaining multiple data sources");
let source1 = MemorySource::new(vec![create_large_batch(0, 500)])?;
let source2 = MemorySource::new(vec![create_large_batch(500, 500)])?;
let source3 = MemorySource::new(vec![create_large_batch(1000, 500)])?;
let chained = ChainedSource::new(vec![
Box::new(source1),
Box::new(source2),
Box::new(source3),
])?;
let dataset = StreamingDataset::new(Box::new(chained), 4);
println!(" Chained 3 sources");
let mut total_rows = 0;
for batch in dataset {
total_rows += batch.num_rows();
}
println!(" Total rows from chain: {}", total_rows);
println!("\n5. Processing large data in streaming fashion");
let large_batches: Vec<_> = (0..10)
.map(|i| create_large_batch(i * 1000, 1000))
.collect();
let source = MemorySource::new(large_batches)?;
let dataset = StreamingDataset::new(Box::new(source), 2).prefetch(2);
println!(" Simulating ML preprocessing pipeline...");
let mut processed_count = 0;
let mut sum = 0.0;
for batch in dataset {
if let Some(values) = batch.column(1).as_any().downcast_ref::<Float64Array>() {
for val in values.iter().flatten() {
sum += val;
processed_count += 1;
}
}
}
println!(" Processed {} values", processed_count);
println!(" Mean value: {:.4}", sum / processed_count as f64);
println!("\n6. Memory-efficient iteration (constant memory)");
let batches = vec![
create_large_batch(0, 2000),
create_large_batch(2000, 2000),
create_large_batch(4000, 2000),
];
let source = MemorySource::new(batches)?;
let dataset = StreamingDataset::new(Box::new(source), 1);
let mut min_val = f64::MAX;
let mut max_val = f64::MIN;
for batch in dataset {
if let Some(values) = batch.column(1).as_any().downcast_ref::<Float64Array>() {
for val in values.iter().flatten() {
min_val = min_val.min(val);
max_val = max_val.max(val);
}
}
}
println!(" Min value: {:.4}", min_val);
println!(" Max value: {:.4}", max_val);
println!(" (Only 1 batch in memory at a time)");
let _ = std::fs::remove_file(&parquet_path);
println!("\n=== Example Complete ===");
Ok(())
}