use futures::StreamExt;
use scirs2_io::async_io::{
process_csv_async, process_file_async, AsyncChunkedReader, AsyncLineReader,
AsyncStreamingConfig, CancellationToken,
};
use std::io::Write;
use tempfile::tempdir;
use tokio::time::{sleep, Duration};
// Entry point: runs each async I/O demonstration in sequence on the tokio
// runtime, propagating the first error (if any) to the caller via `?`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("⚡ Async I/O Example");
println!("==================");
// Each demo is independent; they are awaited sequentially so their
// console output does not interleave.
demonstrate_async_chunked_reading().await?;
demonstrate_async_line_reading().await?;
demonstrate_concurrent_processing().await?;
demonstrate_cancellation().await?;
demonstrate_async_csv_processing().await?;
println!("\n✅ All async I/O demonstrations completed successfully!");
println!("💡 Async I/O enables non-blocking, concurrent processing of large datasets");
Ok(())
}
/// Streams a 500 KB binary file through `AsyncChunkedReader` in 32 KB
/// chunks, then reports volume, chunk count, and throughput.
async fn demonstrate_async_chunked_reading() -> Result<(), Box<dyn std::error::Error>> {
    println!("\n🔄 Demonstrating Async Chunked Reading...");
    let temp_dir = tempdir()?;
    let test_file = temp_dir.path().join("async_test.bin");

    // Write 500 one-kilobyte blocks; the first byte of each block carries a
    // rotating pattern so individual blocks are distinguishable on disk.
    println!(" 📝 Creating test file (500KB)...");
    let mut file = std::fs::File::create(&test_file)?;
    let mut block = vec![42u8; 1024];
    for block_idx in 0..500 {
        block[0] = (block_idx % 256) as u8;
        file.write_all(&block)?;
    }
    file.flush()?;

    // Read the file back asynchronously, tracking volume and timing.
    println!(" ⚡ Reading file asynchronously in 32KB chunks...");
    let config = AsyncStreamingConfig::new().chunk_size(32 * 1024).timeout(1000);
    let mut reader = AsyncChunkedReader::new(&test_file, config).await?;

    let started = std::time::Instant::now();
    let mut bytes_seen = 0;
    let mut chunks_seen = 0;
    while let Some(chunk) = reader.read_next_chunk().await? {
        bytes_seen += chunk.len();
        chunks_seen += 1;
        // Yield cooperatively every fifth chunk so sibling tasks can run.
        if chunks_seen % 5 == 0 {
            tokio::task::yield_now().await;
        }
    }
    let elapsed = started.elapsed();

    println!(" Total bytes read: {}", bytes_seen);
    println!(" Number of chunks: {}", chunks_seen);
    println!(
        " Processing time: {:.2}ms",
        elapsed.as_secs_f64() * 1000.0
    );
    println!(
        " Speed: {:.2} MB/s",
        (bytes_seen as f64 / 1024.0 / 1024.0) / elapsed.as_secs_f64()
    );
    Ok(())
}
/// Writes a 2,000-line text file and consumes it through `AsyncLineReader`
/// as a stream of 100-line batches, printing a sample of the first batch.
async fn demonstrate_async_line_reading() -> Result<(), Box<dyn std::error::Error>> {
    println!("\n📄 Demonstrating Async Line Reading...");
    let temp_dir = tempdir()?;
    let test_file = temp_dir.path().join("async_lines.txt");

    println!(" 📝 Creating test file with 2,000 lines...");
    let mut file = std::fs::File::create(&test_file)?;
    for line_no in 0..2000 {
        writeln!(
            file,
            "This is async line number {} with timestamp {}",
            line_no,
            chrono::Utc::now().timestamp_millis()
        )?;
    }
    file.flush()?;

    // The reader is a Stream of line batches; chunk_size here is the number
    // of lines per batch rather than a byte count.
    println!(" ⚡ Reading lines asynchronously in batches of 100...");
    let config = AsyncStreamingConfig::new().chunk_size(100).timeout(500);
    let mut reader = AsyncLineReader::new(&test_file, config).await?;

    let started = std::time::Instant::now();
    let mut line_total = 0;
    let mut batch_total = 0;
    while let Some(batch) = reader.next().await {
        let batch = batch?;
        line_total += batch.len();
        batch_total += 1;
        // Show a sample of real content from the very first batch only.
        if batch_total == 1 {
            println!(" First 3 lines of first batch:");
            for (idx, line) in batch.iter().take(3).enumerate() {
                println!(" {}: {}", idx + 1, line);
            }
        }
        tokio::task::yield_now().await;
    }
    let elapsed = started.elapsed();

    println!(" Total lines read: {}", line_total);
    println!(" Number of batches: {}", batch_total);
    println!(
        " Processing time: {:.2}ms",
        elapsed.as_secs_f64() * 1000.0
    );
    Ok(())
}
/// Processes a 2 MB file with four concurrent workers, computing a
/// per-chunk checksum and reporting aggregate throughput statistics.
///
/// Fix: the worker previously yielded whenever the *running checksum*
/// happened to be a multiple of 10,000 (`checksum.is_multiple_of(10000)`) —
/// an accidental, data-dependent heuristic that could yield never or almost
/// constantly. It now yields at a fixed interval of bytes processed, which
/// is predictable, avoids the very recent `is_multiple_of` API, and leaves
/// every computed result (checksums, sizes, chunk ids) unchanged.
async fn demonstrate_concurrent_processing() -> Result<(), Box<dyn std::error::Error>> {
    println!("\n🚀 Demonstrating Concurrent Processing...");
    let temp_dir = tempdir()?;
    let test_file = temp_dir.path().join("concurrent_test.dat");

    // 2048 x 1 KB blocks with a shifting byte pattern = 2 MB of input.
    println!(" 📝 Creating test file (2MB) for concurrent processing...");
    let mut file = std::fs::File::create(&test_file)?;
    for i in 0..2048 {
        let data: Vec<u8> = (0..1024).map(|j| ((i + j) % 256) as u8).collect();
        file.write_all(&data)?;
    }
    file.flush()?;

    println!(" 🚀 Processing with 4 concurrent workers...");
    let config = AsyncStreamingConfig::new()
        .chunk_size(64 * 1024)
        .concurrency(4)
        .timeout(2000);
    let start_time = std::time::Instant::now();
    let (results, stats) = process_file_async(&test_file, config, |chunk, chunk_id| async move {
        let mut checksum = 0u64;
        // Sum every byte; yield to the scheduler once per 10,000 bytes so a
        // long chunk cannot monopolize its worker task.
        for (offset, &byte) in chunk.iter().enumerate() {
            checksum = checksum.wrapping_add(byte as u64);
            if offset % 10_000 == 9_999 {
                tokio::task::yield_now().await;
            }
        }
        // Simulate some downstream per-chunk work.
        sleep(Duration::from_millis(10)).await;
        Ok((chunk_id, checksum, chunk.len()))
    })
    .await?;
    let elapsed = start_time.elapsed();

    // Aggregate the per-chunk (id, checksum, size) triples.
    let total_chunks = results.len();
    let total_checksum: u64 = results.iter().map(|(_, checksum, _)| *checksum).sum();
    let total_bytes: usize = results.iter().map(|(_, _, size)| *size).sum();
    println!(" 📊 Concurrent Processing Results:");
    println!(" {}", stats.summary());
    println!(" Total chunks processed: {}", total_chunks);
    println!(" Total bytes processed: {}", total_bytes);
    println!(" Combined checksum: {}", total_checksum);
    println!(
        " Wall clock time: {:.2}ms",
        elapsed.as_secs_f64() * 1000.0
    );
    // Summed worker time divided by wall time approximates the concurrency win.
    println!(
        " Effective speedup: {:.2}x",
        stats.processing_time_ms / (elapsed.as_secs_f64() * 1000.0)
    );
    Ok(())
}
/// Shows cooperative cancellation: a background task trips a
/// `CancellationToken` after 100 ms while the main loop polls it between
/// chunks and breaks out cleanly once it fires.
async fn demonstrate_cancellation() -> Result<(), Box<dyn std::error::Error>> {
    println!("\n⏹️ Demonstrating Cancellation Support...");
    let temp_dir = tempdir()?;
    let test_file = temp_dir.path().join("cancellation_test.dat");

    println!(" 📝 Creating test file for cancellation demo...");
    let mut file = std::fs::File::create(&test_file)?;
    for block_idx in 0..1000 {
        file.write_all(&vec![block_idx as u8; 1024])?;
    }
    file.flush()?;

    println!(" ⏹️ Starting async processing with cancellation...");
    let config = AsyncStreamingConfig::new().chunk_size(32 * 1024).concurrency(2);

    // Fire the token from a detached task after a short delay.
    let token = CancellationToken::new();
    let token_clone = token.clone();
    let cancel_task = tokio::spawn(async move {
        sleep(Duration::from_millis(100)).await;
        println!(" ⏹️ Cancelling operation...");
        token_clone.cancel();
    });

    let started = std::time::Instant::now();
    let mut reader = AsyncChunkedReader::new(&test_file, config).await?;
    let mut chunk_total = 0;
    let mut byte_total = 0;
    while let Some(chunk) = reader.read_next_chunk().await? {
        // Check the token between chunks; this is where cancellation lands.
        if token.is_cancelled() {
            println!(" ⏹️ Operation cancelled!");
            break;
        }
        byte_total += chunk.len();
        chunk_total += 1;
        // Slow the loop down so the 100 ms cancellation has time to arrive.
        sleep(Duration::from_millis(20)).await;
    }
    cancel_task.await?;
    let elapsed = started.elapsed();

    println!(" 📊 Cancellation Results:");
    println!(
        " Chunks processed before cancellation: {}",
        chunk_total
    );
    println!(" Bytes processed: {}", byte_total);
    println!(
        " Time before cancellation: {:.2}ms",
        elapsed.as_secs_f64() * 1000.0
    );
    println!(" Successfully demonstrated graceful cancellation!");
    Ok(())
}
/// Generates a 1,000-record CSV and processes it line-by-line with three
/// concurrent workers, then summarizes scores and category counts.
///
/// Fixes: (1) the average score previously divided by `results.len()` with
/// no empty guard, printing NaN if processing yielded no records; (2) the
/// category distribution was held in a `HashMap`, so its printed order was
/// nondeterministic run to run — a `BTreeMap` gives a stable, sorted report.
async fn demonstrate_async_csv_processing() -> Result<(), Box<dyn std::error::Error>> {
    println!("\n📊 Demonstrating Async CSV Processing...");
    let temp_dir = tempdir()?;
    let test_file = temp_dir.path().join("async_data.csv");

    println!(" 📝 Creating CSV file for async processing (1,000 records)...");
    let mut file = std::fs::File::create(&test_file)?;
    writeln!(file, "id,value,category,score")?;
    for i in 0..1000 {
        // Synthetic columns: a sine-based value, a rotating category, and a
        // cosine-based score.
        let value = (i as f64 * 0.1).sin() * 100.0;
        let category = match i % 4 {
            0 => "A",
            1 => "B",
            2 => "C",
            _ => "D",
        };
        let score = 50.0 + (i as f64 * 0.05).cos() * 30.0;
        writeln!(file, "{},{:.2},{},{:.1}", i, value, category, score)?;
    }
    file.flush()?;

    // chunk_size(1) delivers one CSV line per worker invocation.
    println!(" ⚡ Processing CSV asynchronously with concurrent workers...");
    let config = AsyncStreamingConfig::new()
        .chunk_size(1)
        .concurrency(3)
        .timeout(1000);
    let start_time = std::time::Instant::now();
    let (results, stats) = process_csv_async(&test_file, config, |lines, line_id| async move {
        if let Some(line) = lines.first() {
            let fields: Vec<&str> = line.split(',').collect();
            if fields.len() >= 4 {
                // Simulate per-record work.
                sleep(Duration::from_millis(1)).await;
                let id: i32 = fields[0].parse().unwrap_or(0);
                let value: f64 = fields[1].parse().unwrap_or(0.0);
                let category = fields[2].to_string();
                let score: f64 = fields[3].parse().unwrap_or(0.0);
                // Boost the score by 10% of the value's magnitude.
                let adjusted_score = score + value.abs() * 0.1;
                return Ok((id, category, adjusted_score));
            }
        }
        // Malformed or empty line (e.g. the header): emit a placeholder record.
        Ok((line_id as i32, "unknown".to_string(), 0.0))
    })
    .await?;
    let elapsed = start_time.elapsed();

    // BTreeMap keeps category keys sorted for deterministic reporting.
    let mut category_stats = std::collections::BTreeMap::new();
    let mut total_score = 0.0;
    for (_, category, score) in &results {
        *category_stats.entry(category.clone()).or_insert(0) += 1;
        total_score += score;
    }
    // Guard against an empty result set so the average never prints NaN.
    let avg_score = if results.is_empty() {
        0.0
    } else {
        total_score / results.len() as f64
    };

    println!(" 📊 Async CSV Processing Results:");
    println!(" {}", stats.summary());
    println!(" Records processed: {}", results.len());
    println!(" Average adjusted score: {:.2}", avg_score);
    println!(" Category distribution:");
    for (category, count) in category_stats {
        println!(" {}: {} records", category, count);
    }
    println!(
        " Wall clock time: {:.2}ms",
        elapsed.as_secs_f64() * 1000.0
    );
    Ok(())
}