pub mod join;
pub mod merge_sort;
pub mod out_of_core;
pub use join::{hash_join_out_of_core, JoinType as OutOfCoreJoinType};
pub use merge_sort::{external_sort, merge_sorted_chunks};
pub use out_of_core::{AggOp, DataFormat, OutOfCoreConfig, OutOfCoreReader, OutOfCoreWriter};
use memmap2::{Mmap, MmapMut, MmapOptions};
use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io::{self, BufRead, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use tempfile::tempdir;
use crate::dataframe::DataFrame;
use crate::error::{Error, PandRSError, Result};
use crate::optimized::dataframe::OptimizedDataFrame;
use csv::ReaderBuilder;
/// Configuration for disk-based (out-of-core) DataFrame processing.
#[derive(Debug, Clone)]
pub struct DiskConfig {
    /// Soft cap, in bytes, on the memory tracked for in-flight chunks.
    pub memory_limit: usize,
    /// Directory for spill/temporary files; `None` uses the system temp dir.
    pub temp_dir: Option<PathBuf>,
    /// Number of data rows per chunk when streaming the source file.
    pub chunk_size: usize,
    /// Whether chunks are read through a read-only memory map of the file.
    pub use_memory_mapping: bool,
}
impl Default for DiskConfig {
fn default() -> Self {
DiskConfig {
memory_limit: 1024 * 1024 * 1024,
temp_dir: None,
chunk_size: 100_000,
use_memory_mapping: true,
}
}
}
/// Tracks estimated memory usage against a fixed byte budget.
#[derive(Debug)]
struct MemoryTracker {
    /// Bytes currently accounted for.
    current_usage: usize,
    /// Maximum bytes allowed before further allocations are refused.
    limit: usize,
}
impl MemoryTracker {
    /// Create a tracker with the given byte `limit` and zero usage.
    fn new(limit: usize) -> Self {
        MemoryTracker {
            current_usage: 0,
            limit,
        }
    }

    /// Try to reserve `bytes`. Returns `false` — and leaves the tracker
    /// unchanged — when the reservation would exceed the limit.
    fn allocate(&mut self, bytes: usize) -> bool {
        // checked_add guards against wraparound for pathological sizes;
        // the original unchecked `current_usage + bytes` could overflow
        // silently in release builds.
        match self.current_usage.checked_add(bytes) {
            Some(new_usage) if new_usage <= self.limit => {
                self.current_usage = new_usage;
                true
            }
            _ => false,
        }
    }

    /// Release `bytes`, saturating at zero so over-release cannot underflow.
    fn deallocate(&mut self, bytes: usize) {
        self.current_usage = self.current_usage.saturating_sub(bytes);
    }

    /// Current tracked usage in bytes.
    fn usage(&self) -> usize {
        self.current_usage
    }

    /// Whether usage has reached or exceeded the limit.
    fn is_limit_reached(&self) -> bool {
        self.current_usage >= self.limit
    }
}
/// Streams a CSV file as a sequence of `DataFrame` chunks under a memory budget.
#[derive(Debug)]
pub struct ChunkedDataFrame {
    /// CSV file being read.
    source_path: PathBuf,
    /// Chunking / memory configuration.
    config: DiskConfig,
    /// Most recently loaded chunk, if any.
    current_chunk: Option<DataFrame>,
    /// Index of the next chunk to load (incremented after each successful load).
    chunk_index: usize,
    /// Estimated number of chunks; computed lazily on the first `next_chunk`.
    total_chunks: Option<usize>,
    /// Holds spilled chunk files; the directory is deleted on drop.
    temp_dir: tempfile::TempDir,
    /// Tracks estimated chunk memory against `config.memory_limit`.
    memory_tracker: MemoryTracker,
}
impl ChunkedDataFrame {
    /// Open `path` for chunked reading with the given (or default) config.
    ///
    /// # Errors
    /// Fails when the file does not exist, the temp directory cannot be
    /// created, or `chunk_size` is zero.
    pub fn new<P: AsRef<Path>>(path: P, config: Option<DiskConfig>) -> Result<Self> {
        let config = config.unwrap_or_default();
        // A zero chunk size would later cause a division by zero when
        // estimating the chunk count, so reject it up front.
        if config.chunk_size == 0 {
            return Err(Error::Consistency(
                "chunk_size must be greater than zero".into(),
            ));
        }
        let source_path = path.as_ref().to_path_buf();
        if !source_path.exists() {
            return Err(Error::IoError(format!("File not found: {:?}", source_path)));
        }
        let temp_dir = match &config.temp_dir {
            Some(dir) => tempdir_in(dir)?,
            None => tempdir()?,
        };
        let memory_tracker = MemoryTracker::new(config.memory_limit);
        Ok(ChunkedDataFrame {
            source_path,
            config,
            current_chunk: None,
            chunk_index: 0,
            total_chunks: None,
            temp_dir,
            memory_tracker,
        })
    }

    /// Load and return the next chunk, or `None` once all chunks are consumed.
    pub fn next_chunk(&mut self) -> Result<Option<&DataFrame>> {
        if self.total_chunks.is_none() {
            self.calculate_total_chunks()?;
        }
        if let Some(total) = self.total_chunks {
            if self.chunk_index >= total {
                return Ok(None);
            }
        }
        if self.config.use_memory_mapping {
            self.load_chunk_mmap()?;
        } else {
            self.load_chunk_standard()?;
        }
        self.chunk_index += 1;
        Ok(self.current_chunk.as_ref())
    }

    /// Estimate the total chunk count by sampling up to 1 MiB of the file
    /// and extrapolating the average bytes per row. The result is an
    /// estimate, not an exact count.
    fn calculate_total_chunks(&mut self) -> Result<()> {
        let mut file = File::open(&self.source_path)?;
        let file_size = file.metadata()?.len() as usize;
        let sample_size = file_size.min(1024 * 1024);
        let mut buffer = vec![0; sample_size];
        let bytes_read = file.read(&mut buffer)?;
        buffer.truncate(bytes_read);
        let newlines = buffer.iter().filter(|&&b| b == b'\n').count();
        if newlines == 0 {
            return Err(Error::Consistency(
                "Could not determine row count in file".into(),
            ));
        }
        // Fix: divide by the bytes actually read rather than the requested
        // sample size, so a short read does not skew the per-row estimate.
        let bytes_per_row = (bytes_read / newlines).max(1);
        let estimated_rows = file_size / bytes_per_row;
        // Ceiling division: partial chunks still count.
        let total_chunks = (estimated_rows + self.config.chunk_size - 1) / self.config.chunk_size;
        self.total_chunks = Some(total_chunks);
        Ok(())
    }

    /// Load the current chunk through a read-only memory map of the file.
    fn load_chunk_mmap(&mut self) -> Result<()> {
        let file = File::open(&self.source_path)?;
        // SAFETY: the mapping is read-only and dropped before this function
        // returns; we assume the source file is not truncated concurrently.
        let mmap = unsafe { MmapOptions::new().map(&file)? };
        let (start_pos, end_pos) = self.find_chunk_boundaries(&mmap)?;
        if end_pos <= start_pos {
            // Past EOF or empty file: store an empty frame.
            return self.store_chunk(DataFrame::new());
        }
        let combined: Vec<u8>;
        let chunk_bytes: &[u8] = if self.chunk_index == 0 {
            // Chunk 0 already begins at the header row.
            &mmap[start_pos..end_pos]
        } else {
            // Later chunks lack the header; prepend it so the CSV reader
            // sees the column names.
            let header_end = mmap.iter().position(|&b| b == b'\n').map_or(0, |p| p + 1);
            let mut buf = Vec::with_capacity(header_end + (end_pos - start_pos));
            buf.extend_from_slice(&mmap[..header_end]);
            buf.extend_from_slice(&mmap[start_pos..end_pos]);
            combined = buf;
            &combined
        };
        let mut reader = csv::ReaderBuilder::new()
            .has_headers(true)
            .from_reader(chunk_bytes);
        let df = DataFrame::from_csv_reader(&mut reader, true)?;
        self.store_chunk(df)
    }

    /// Load the current chunk using buffered reads (no memory mapping).
    fn load_chunk_standard(&mut self) -> Result<()> {
        let file = File::open(&self.source_path)?;
        let mut reader = io::BufReader::new(file);
        let mut lines = Vec::new();
        let mut line = String::new();
        if self.chunk_index == 0 {
            // The first chunk keeps the header row.
            reader.read_line(&mut line)?;
            lines.push(line.clone());
        } else {
            self.skip_to_chunk(&mut reader)?;
        }
        for _ in 0..self.config.chunk_size {
            line.clear();
            if reader.read_line(&mut line)? == 0 {
                break; // EOF
            }
            lines.push(line.clone());
        }
        let csv_data = lines.concat();
        let mut csv_reader = csv::ReaderBuilder::new()
            .has_headers(self.chunk_index == 0)
            .from_reader(csv_data.as_bytes());
        let df = if csv_data.is_empty() {
            DataFrame::new()
        } else {
            DataFrame::from_csv_reader(&mut csv_reader, self.chunk_index == 0)?
        };
        self.store_chunk(df)
    }

    /// Compute the byte range `[start, end)` of the current chunk within the
    /// mapped file. Chunk 0 includes the header row; later chunks start
    /// after `chunk_index * chunk_size` data rows.
    fn find_chunk_boundaries(&self, mmap: &Mmap) -> Result<(usize, usize)> {
        let file_size = mmap.len();
        if file_size == 0 {
            return Ok((0, 0));
        }
        // Offset just past the header line (whole file if there is no newline;
        // in practice calculate_total_chunks already rejected that case).
        let header_end = mmap
            .iter()
            .position(|&b| b == b'\n')
            .map_or(file_size, |p| p + 1);
        let start_pos = if self.chunk_index == 0 {
            0
        } else {
            let lines_to_skip = self.chunk_index * self.config.chunk_size;
            let mut pos = None;
            let mut lines_skipped = 0;
            for i in header_end..file_size {
                if mmap[i] == b'\n' {
                    lines_skipped += 1;
                    if lines_skipped == lines_to_skip {
                        pos = Some(i + 1);
                        break;
                    }
                }
            }
            match pos {
                Some(p) => p,
                // Fix: the file ended before we could skip this far, so this
                // chunk is past EOF. The original fell back to `header_end`
                // and would have re-read earlier rows.
                None => return Ok((file_size, file_size)),
            }
        };
        // Scan forward for up to chunk_size data rows.
        let scan_from = start_pos.max(header_end);
        let mut end_pos = scan_from;
        let mut lines_read = 0;
        for i in scan_from..file_size {
            if mmap[i] == b'\n' {
                lines_read += 1;
                end_pos = i + 1;
                if lines_read == self.config.chunk_size {
                    break;
                }
            }
        }
        // Fix: include a trailing row that lacks a final newline — the
        // original dropped any bytes after the last '\n'.
        if lines_read < self.config.chunk_size {
            end_pos = file_size;
        }
        Ok((start_pos, end_pos))
    }

    /// Advance `reader` past the header line plus all rows belonging to
    /// earlier chunks; stops early at EOF.
    fn skip_to_chunk<R: Read + BufRead>(&self, reader: &mut R) -> Result<()> {
        let mut line = String::new();
        // Skip the header row first.
        reader.read_line(&mut line)?;
        for _ in 0..(self.chunk_index * self.config.chunk_size) {
            line.clear();
            if reader.read_line(&mut line)? == 0 {
                break;
            }
        }
        Ok(())
    }

    /// Write `df` to a CSV file in the temp directory and release its
    /// tracked memory.
    ///
    /// NOTE(review): spilled chunks are not reloaded anywhere in this file;
    /// the data survives only for the lifetime of `temp_dir` — confirm this
    /// is intentional.
    fn spill_to_disk(&mut self, df: DataFrame) -> Result<()> {
        let file_path = self
            .temp_dir
            .path()
            .join(format!("chunk_{}.csv", self.chunk_index));
        df.to_csv(&file_path)?;
        let estimated_memory = estimate_dataframe_memory(&df);
        self.memory_tracker.deallocate(estimated_memory);
        Ok(())
    }

    /// Track memory for `df` and make it the current chunk, spilling the
    /// previous chunk to disk when the budget would be exceeded. (This
    /// replaces three identical copies of the same logic in the original.)
    fn store_chunk(&mut self, df: DataFrame) -> Result<()> {
        let estimated_memory = estimate_dataframe_memory(&df);
        if !self.memory_tracker.allocate(estimated_memory) {
            if let Some(prev_chunk) = self.current_chunk.take() {
                self.spill_to_disk(prev_chunk)?;
                // Retry after freeing the previous chunk; if it still does
                // not fit we proceed anyway rather than losing the data.
                self.memory_tracker.allocate(estimated_memory);
            }
        }
        self.current_chunk = Some(df);
        Ok(())
    }

    /// Apply `func` to every chunk sequentially, collecting the results.
    pub fn process_with<F, T>(&mut self, mut func: F) -> Result<Vec<T>>
    where
        F: FnMut(&DataFrame) -> Result<T>,
    {
        let mut results = Vec::new();
        while let Some(chunk) = self.next_chunk()? {
            results.push(func(chunk)?);
        }
        Ok(results)
    }

    /// Process every chunk in parallel with `chunk_func`, then reduce the
    /// per-chunk results with `combiner`.
    ///
    /// Chunks are first streamed to CSV files in the temp directory so the
    /// rayon workers can re-read them independently of `self`.
    pub fn parallel_process<F, T, C>(&mut self, chunk_func: F, combiner: C) -> Result<T>
    where
        F: Fn(&DataFrame) -> Result<T> + Send + Sync,
        T: Send + 'static,
        C: FnOnce(Vec<T>) -> Result<T>,
    {
        use rayon::prelude::*;
        let mut chunk_paths = Vec::new();
        while self.next_chunk()?.is_some() {
            if let Some(chunk) = self.current_chunk.take() {
                // next_chunk has already advanced chunk_index, so the chunk
                // we just took corresponds to chunk_index - 1.
                let file_path = self
                    .temp_dir
                    .path()
                    .join(format!("chunk_{}.csv", self.chunk_index - 1));
                chunk.to_csv(&file_path)?;
                chunk_paths.push(file_path);
                let estimated_memory = estimate_dataframe_memory(&chunk);
                self.memory_tracker.deallocate(estimated_memory);
            }
        }
        let results: Vec<Result<T>> = chunk_paths
            .par_iter()
            .map(|path| {
                let df = DataFrame::from_csv(path, true)?;
                chunk_func(&df)
            })
            .collect();
        // Propagate the first error; otherwise hand every value to the combiner.
        let mut unwrapped_results = Vec::with_capacity(results.len());
        for result in results {
            unwrapped_results.push(result?);
        }
        combiner(unwrapped_results)
    }
}
/// Rough estimate of a DataFrame's in-memory footprint in bytes.
///
/// Assumes ~16 bytes per cell plus a fixed base overhead; this is a
/// heuristic for the memory tracker, not an exact measurement.
fn estimate_dataframe_memory(df: &DataFrame) -> usize {
    const AVG_BYTES_PER_CELL: usize = 16;
    const BASE_OVERHEAD: usize = 1000;
    BASE_OVERHEAD + df.row_count() * df.column_count() * AVG_BYTES_PER_CELL
}
/// Create a uniquely named temporary directory (prefixed `pandrs_`) inside `dir`.
fn tempdir_in<P: AsRef<Path>>(dir: P) -> io::Result<tempfile::TempDir> {
    let mut builder = tempfile::Builder::new();
    builder.prefix("pandrs_");
    builder.tempdir_in(dir)
}
/// Read-only view of a CSV file on disk with a pre-parsed column schema.
#[derive(Debug)]
pub struct DiskBasedDataFrame {
    /// Source CSV file.
    source_path: PathBuf,
    /// Processing configuration shared with derived chunked readers.
    config: DiskConfig,
    /// Empty DataFrame carrying only the column names from the header row.
    schema: DataFrame,
    /// Optional read-only memory map of the source file.
    mmap: Option<Mmap>,
    /// Scratch directory; removed on drop.
    temp_dir: tempfile::TempDir,
    /// Shared memory tracker.
    /// NOTE(review): not read by any method visible in this file — confirm
    /// whether it is used elsewhere or is dead state.
    memory_tracker: Arc<Mutex<MemoryTracker>>,
}
impl DiskBasedDataFrame {
pub fn new<P: AsRef<Path>>(path: P, config: Option<DiskConfig>) -> Result<Self> {
let config = config.unwrap_or_default();
let source_path = path.as_ref().to_path_buf();
if !source_path.exists() {
return Err(Error::IoError(format!("File not found: {:?}", source_path)));
}
let temp_dir = match &config.temp_dir {
Some(dir) => tempdir_in(dir)?,
None => tempdir()?,
};
let file = File::open(&source_path)?;
let mut reader = csv::Reader::from_reader(io::BufReader::new(file));
let headers = reader.headers()?.clone();
let mut schema = DataFrame::new();
for header in headers.iter() {
schema.add_column(
header.to_string(),
crate::series::Series::new(Vec::<String>::new(), Some(header.to_string()))?,
)?;
}
let memory_tracker = Arc::new(Mutex::new(MemoryTracker::new(config.memory_limit)));
let mmap = if config.use_memory_mapping {
let file = File::open(&source_path)?;
Some(unsafe { MmapOptions::new().map(&file)? })
} else {
None
};
Ok(DiskBasedDataFrame {
source_path,
config,
schema,
mmap,
temp_dir,
memory_tracker,
})
}
pub fn schema(&self) -> &DataFrame {
&self.schema
}
pub fn chunked(&self) -> Result<ChunkedDataFrame> {
ChunkedDataFrame::new(&self.source_path, Some(self.config.clone()))
}
pub fn apply<F, T>(&self, function: F) -> Result<T>
where
F: FnMut(&DataFrame) -> Result<T>,
T: Send + 'static,
{
let mut chunked = self.chunked()?;
let results = chunked.process_with(function)?;
if results.is_empty() {
return Err(Error::EmptyDataFrame("No data to process".into()));
}
Ok(results
.into_iter()
.last()
.expect("operation should succeed"))
}
pub fn aggregate<F, C, T>(&self, chunk_func: F, combiner: C) -> Result<T>
where
F: Fn(&DataFrame) -> Result<T> + Send + Sync,
T: Send + 'static,
C: FnOnce(Vec<T>) -> Result<T>,
{
let mut chunked = self.chunked()?;
chunked.parallel_process(chunk_func, combiner)
}
}
/// Row-oriented operations over a disk-backed DataFrame, returning
/// fully-materialized results as string-keyed maps.
pub trait DataFrameOperations {
    /// Keep rows for which `condition(cell_value, row_index)` holds for
    /// EVERY cell in the row.
    fn filter(
        &self,
        condition: impl Fn(&str, usize) -> bool + Send + Sync,
    ) -> Result<Vec<HashMap<String, String>>>;
    /// Project each row onto the given columns.
    fn select(&self, columns: &[&str]) -> Result<Vec<HashMap<String, String>>>;
    /// Rewrite every cell via `transformation(column_name, value, row_index)`.
    fn transform(
        &self,
        transformation: impl Fn(&str, &str, usize) -> Result<String> + Send + Sync,
    ) -> Result<Vec<HashMap<String, String>>>;
    /// Collect `agg_column` values grouped by `group_column` value.
    ///
    /// NOTE(review): `agg_func` is accepted but never invoked by the
    /// implementation in this file — raw value lists are returned per group.
    /// Confirm whether aggregation was intended.
    fn group_by(
        &self,
        group_column: &str,
        agg_column: &str,
        agg_func: impl Fn(Vec<String>) -> Result<String> + Send + Sync,
    ) -> Result<HashMap<String, Vec<String>>>;
}
impl DataFrameOperations for DiskBasedDataFrame {
    /// Keep rows where `condition` holds for every cell in the row.
    fn filter(
        &self,
        condition: impl Fn(&str, usize) -> bool + Send + Sync,
    ) -> Result<Vec<HashMap<String, String>>> {
        self.aggregate(
            |chunk| {
                let col_names = chunk.column_names();
                let mut result_rows = Vec::new();
                for row_idx in 0..chunk.row_count() {
                    // Fetch each cell once and reuse it for both the predicate
                    // and the output row; the original re-fetched every cell
                    // of a kept row a second time.
                    let mut row_values = Vec::new();
                    let mut keep_row = true;
                    for col_name in &col_names {
                        let value = chunk.get_string_value(col_name, row_idx)?;
                        if !condition(value, row_idx) {
                            keep_row = false;
                            break;
                        }
                        row_values.push((col_name.clone(), value.to_string()));
                    }
                    if keep_row {
                        result_rows.push(row_values.into_iter().collect::<HashMap<_, _>>());
                    }
                }
                Ok(result_rows)
            },
            // Concatenate the per-chunk row lists in chunk order.
            |all_rows: Vec<Vec<HashMap<String, String>>>| {
                Ok(all_rows.into_iter().flatten().collect())
            },
        )
    }

    /// Project rows onto `columns`, validating them against the schema first.
    fn select(&self, columns: &[&str]) -> Result<Vec<HashMap<String, String>>> {
        // Validate up front so a bad column name fails fast instead of
        // silently producing partial rows.
        for &col in columns {
            if !self.schema.contains_column(col) {
                return Err(Error::Column(format!("Column '{}' does not exist", col)));
            }
        }
        self.aggregate(
            |chunk| {
                let mut selected_rows = Vec::new();
                for row_idx in 0..chunk.row_count() {
                    let mut row = HashMap::new();
                    for &col in columns {
                        // Per-chunk guard in case a chunk lacks the column.
                        if chunk.contains_column(col) {
                            let value = chunk.get_string_value(col, row_idx)?;
                            row.insert(col.to_string(), value.to_string());
                        }
                    }
                    selected_rows.push(row);
                }
                Ok(selected_rows)
            },
            |all_rows: Vec<Vec<HashMap<String, String>>>| {
                Ok(all_rows.into_iter().flatten().collect())
            },
        )
    }

    /// Apply `transformation` to every cell, producing rewritten rows.
    fn transform(
        &self,
        transformation: impl Fn(&str, &str, usize) -> Result<String> + Send + Sync,
    ) -> Result<Vec<HashMap<String, String>>> {
        self.aggregate(
            |chunk| {
                // Hoisted out of the row loop: the original called
                // chunk.column_names() once per row.
                let col_names = chunk.column_names();
                let mut transformed_rows = Vec::new();
                for row_idx in 0..chunk.row_count() {
                    let mut row = HashMap::new();
                    for col_name in &col_names {
                        let value = chunk.get_string_value(col_name, row_idx)?;
                        let new_value = transformation(col_name, value, row_idx)?;
                        row.insert(col_name.clone(), new_value);
                    }
                    transformed_rows.push(row);
                }
                Ok(transformed_rows)
            },
            |all_rows: Vec<Vec<HashMap<String, String>>>| {
                Ok(all_rows.into_iter().flatten().collect())
            },
        )
    }

    /// Group `agg_column` values by `group_column` value.
    ///
    /// NOTE(review): `agg_func` is never invoked — raw value lists are
    /// returned per group. Preserved as-is to keep the existing contract;
    /// confirm whether per-group aggregation was intended.
    fn group_by(
        &self,
        group_column: &str,
        agg_column: &str,
        agg_func: impl Fn(Vec<String>) -> Result<String> + Send + Sync,
    ) -> Result<HashMap<String, Vec<String>>> {
        if !self.schema.contains_column(group_column) {
            return Err(Error::Column(format!(
                "Group column '{}' does not exist",
                group_column
            )));
        }
        if !self.schema.contains_column(agg_column) {
            return Err(Error::Column(format!(
                "Aggregation column '{}' does not exist",
                agg_column
            )));
        }
        self.aggregate(
            |chunk| {
                // Per-chunk grouping: group value -> list of agg values.
                let mut grouped_data: HashMap<String, Vec<String>> = HashMap::new();
                for row_idx in 0..chunk.row_count() {
                    let group_value = chunk.get_string_value(group_column, row_idx)?;
                    let agg_value = chunk.get_string_value(agg_column, row_idx)?;
                    grouped_data
                        .entry(group_value.to_string())
                        .or_default()
                        .push(agg_value.to_string());
                }
                Ok(grouped_data)
            },
            |chunk_maps| {
                // Merge the per-chunk maps, concatenating value lists.
                let mut result_map: HashMap<String, Vec<String>> = HashMap::new();
                for chunk_map in chunk_maps {
                    for (key, values) in chunk_map {
                        result_map.entry(key).or_default().extend(values);
                    }
                }
                Ok(result_map)
            },
        )
    }
}
/// Disk-based wrapper that materializes chunks as `OptimizedDataFrame`s.
#[derive(Debug)]
pub struct DiskBasedOptimizedDataFrame {
    /// Underlying row-oriented disk-based frame that performs the chunking.
    inner: DiskBasedDataFrame,
}
impl DiskBasedOptimizedDataFrame {
    /// Open `path` for disk-based processing with optimized chunk handling.
    pub fn new<P: AsRef<Path>>(path: P, config: Option<DiskConfig>) -> Result<Self> {
        Ok(DiskBasedOptimizedDataFrame {
            inner: DiskBasedDataFrame::new(path, config)?,
        })
    }

    /// Materialize the whole file as a single `OptimizedDataFrame` by
    /// converting each chunk and concatenating them row-wise.
    pub fn to_optimized_dataframe(&self) -> Result<OptimizedDataFrame> {
        self.inner.aggregate(
            |chunk| Ok(OptimizedDataFrame::from_dataframe(chunk)?),
            |chunk_results| {
                // Fold front-to-back; taking ownership of the first chunk
                // avoids the clone the original performed.
                let mut iter = chunk_results.into_iter();
                let mut result = match iter.next() {
                    Some(first) => first,
                    None => return Ok(OptimizedDataFrame::new()),
                };
                for chunk in iter {
                    result = result.concat_rows(&chunk)?;
                }
                Ok(result)
            },
        )
    }

    /// Map every chunk (converted to `OptimizedDataFrame`) in parallel with
    /// `chunk_func` and reduce the results with `combiner`.
    pub fn aggregate<F, C, T>(&self, chunk_func: F, combiner: C) -> Result<T>
    where
        F: Fn(&OptimizedDataFrame) -> Result<T> + Send + Sync,
        T: Send + 'static,
        C: FnOnce(Vec<T>) -> Result<T>,
    {
        self.inner.aggregate(
            |chunk| {
                let opt_chunk = OptimizedDataFrame::from_dataframe(chunk)?;
                chunk_func(&opt_chunk)
            },
            combiner,
        )
    }
}