use pandrs::error::Result;
use pandrs::large::DataFrameOperations;
use pandrs::{DiskBasedDataFrame, DiskConfig};
use std::collections::HashMap;
/// Example: out-of-core processing of a large CSV with pandrs.
///
/// Demonstrates, in order: schema inspection, chunked row counting,
/// row filtering, column selection, group-by aggregation, and a
/// map/reduce-style parallel pass over chunks.
fn main() -> Result<()> {
    let file_path = "examples/data/large_dataset.csv";

    println!("Working with large datasets example");
    println!("----------------------------------");

    // Cap resident memory and stream the file in 50k-row chunks;
    // memory mapping lets the OS page file data in on demand.
    let config = DiskConfig {
        memory_limit: 500 * 1024 * 1024, // 500 MB budget
        chunk_size: 50_000,
        use_memory_mapping: true,
        temp_dir: None, // fall back to the OS default temp directory
    };

    let disk_df = DiskBasedDataFrame::new(file_path, Some(config))?;

    println!("DataFrame Schema:");
    for column in disk_df.schema().column_names() {
        println!(" - {}", column);
    }

    // Sequential pass: tally total rows one chunk at a time.
    let mut chunked_df = disk_df.chunked()?;
    let mut total_rows = 0;
    println!("\nProcessing in chunks:");
    while let Some(chunk) = chunked_df.next_chunk()? {
        let chunk_rows = chunk.row_count();
        total_rows += chunk_rows;
        println!(" - Processed chunk with {} rows", chunk_rows);
    }
    println!("\nTotal rows in dataset: {}", total_rows);

    // Keep only rows whose first filtered value starts with 'A'.
    println!("\nFiltering data:");
    let filtered = disk_df.filter(|value, _| value.starts_with('A'))?;
    println!("Filtered result has {} rows", filtered.len());

    println!("\nSelecting columns:");
    let columns_to_select = vec!["column1", "column2"];
    let selected = disk_df.select(&columns_to_select)?;
    println!("Selected result has {} rows and columns:", selected.len());
    if !selected.is_empty() {
        for column in selected[0].keys() {
            println!(" - {}", column);
        }
    }

    // Mean of value_column per category, rendered with two decimals;
    // non-numeric values are skipped by the filter_map.
    println!("\nGrouping and aggregation:");
    let grouped = disk_df.group_by("category_column", "value_column", |values| {
        let sum: f64 = values.iter().filter_map(|v| v.parse::<f64>().ok()).sum();
        let count = values.len();
        if count > 0 {
            Ok(format!("{:.2}", sum / count as f64))
        } else {
            Ok("0.0".to_string())
        }
    })?;
    println!("Grouped result has {} groups", grouped.len());

    println!("\nParallel processing example:");
    // FIX: the sequential while-loop above drained `chunked_df` to
    // exhaustion, so reusing that handle here would most likely process
    // zero chunks. Open a fresh chunked view for the parallel pass.
    // NOTE(review): assumes `chunked()` yields a new iterator from the
    // start of the file — confirm against the pandrs API.
    let mut chunked_df = disk_df.chunked()?;
    let chunk_results = chunked_df.parallel_process(
        // Map: per-chunk frequency count of category_column values.
        |chunk| {
            let mut counts = HashMap::new();
            for row_idx in 0..chunk.row_count() {
                if let Ok(value) = chunk.get_string_value("category_column", row_idx) {
                    *counts.entry(value.to_string()).or_insert(0) += 1;
                }
            }
            Ok(counts)
        },
        // Reduce: merge per-chunk maps by summing counts per key.
        |chunk_maps| {
            let mut result_map = HashMap::new();
            for chunk_map in chunk_maps {
                for (key, count) in chunk_map {
                    *result_map.entry(key).or_insert(0) += count;
                }
            }
            Ok(result_map)
        },
    )?;

    println!("Category counts from parallel processing:");
    for (category, count) in chunk_results.iter().take(5) {
        println!(" - {}: {}", category, count);
    }

    Ok(())
}