// pandrs/large/out_of_core.rs
//! Out-of-core DataFrame processing for datasets larger than RAM.
//!
//! This module provides streaming chunk-based processing so that arbitrarily
//! large CSV/JSON datasets can be processed without loading them entirely into
//! memory.
6
7use std::collections::HashMap;
8use std::fs::{File, OpenOptions};
9use std::io::{self, BufRead, BufReader, BufWriter, Write};
10use std::path::{Path, PathBuf};
11
12use csv::{ReaderBuilder, WriterBuilder};
13use rayon::prelude::*;
14
15use crate::core::error::{Error, Result};
16use crate::dataframe::DataFrame;
17use crate::series::Series;
18
19// ---------------------------------------------------------------------------
20// Configuration
21// ---------------------------------------------------------------------------
22
/// Configuration for out-of-core processing.
///
/// Construct via [`Default`] and override individual fields as needed.
#[derive(Debug, Clone)]
pub struct OutOfCoreConfig {
    /// Number of data rows per chunk (not counting the CSV header).
    pub chunk_size: usize,
    /// Maximum bytes of RAM to use at a time (informational; actual enforcement
    /// is done by keeping only `chunk_size` rows in memory at once).
    pub max_memory_bytes: usize,
    /// Directory where temporary chunk files are written.
    pub temp_dir: PathBuf,
    /// Whether to use gzip compression for temporary files.
    /// NOTE(review): not consulted anywhere in this module — confirm it is
    /// wired up elsewhere before relying on it.
    pub compression: bool,
    /// Number of chunks to process in parallel (via rayon).
    /// NOTE(review): the visible code uses rayon's global pool as-is and never
    /// reads this field — confirm it takes effect elsewhere.
    pub parallelism: usize,
}
38
39impl Default for OutOfCoreConfig {
40    fn default() -> Self {
41        OutOfCoreConfig {
42            chunk_size: 100_000,
43            max_memory_bytes: 512 * 1024 * 1024, // 512 MB
44            temp_dir: std::env::temp_dir(),
45            compression: false,
46            parallelism: num_cpus::get(),
47        }
48    }
49}
50
51// ---------------------------------------------------------------------------
52// DataFormat
53// ---------------------------------------------------------------------------
54
/// Supported source file formats for out-of-core processing.
///
/// Only `Csv` is currently implemented end-to-end; the JSON and Parquet
/// paths in `OutOfCoreReader` return `Error::NotImplemented`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataFormat {
    /// Comma-separated values (fully supported).
    Csv,
    /// JSON (not yet supported by the chunked reader).
    Json,
    /// Apache Parquet (not yet supported by the chunked reader).
    Parquet,
}
62
63// ---------------------------------------------------------------------------
64// AggOp
65// ---------------------------------------------------------------------------
66
/// Aggregation operations supported across chunks.
///
/// See [`OutOfCoreReader::aggregate`] for how each op is evaluated.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AggOp {
    /// Sum of the numeric values.
    Sum,
    /// Arithmetic mean (sum / count); NaN when no numeric values were seen.
    Mean,
    /// Minimum numeric value; NaN when no numeric values were seen.
    Min,
    /// Maximum numeric value; NaN when no numeric values were seen.
    Max,
    /// Number of cells that parsed as numbers.
    Count,
}
76
77// ---------------------------------------------------------------------------
78// Internal helper: chunk writer
79// ---------------------------------------------------------------------------
80
81/// Write a slice of CSV rows (with header) to a temp file, returning its path.
82fn write_chunk_file(
83    rows: &[Vec<String>],
84    headers: &[String],
85    dir: &Path,
86    index: usize,
87) -> Result<PathBuf> {
88    let path = dir.join(format!("pandrs_ooc_chunk_{}.csv", index));
89    let file = File::create(&path).map_err(|e| Error::IoError(e.to_string()))?;
90    let mut wtr = WriterBuilder::new().from_writer(BufWriter::new(file));
91
92    wtr.write_record(headers)
93        .map_err(|e| Error::CsvError(e.to_string()))?;
94    for row in rows {
95        wtr.write_record(row)
96            .map_err(|e| Error::CsvError(e.to_string()))?;
97    }
98    wtr.flush().map_err(|e| Error::IoError(e.to_string()))?;
99    Ok(path)
100}
101
/// Read a temp chunk file back into a DataFrame.
///
/// The `true` argument presumably tells `read_csv` the file has a header
/// row, which matches how chunks are written here — TODO confirm against
/// the `crate::io::csv::read_csv` signature.
fn read_chunk_file(path: &Path) -> Result<DataFrame> {
    crate::io::csv::read_csv(path, true)
}
106
107/// Convert a DataFrame to CSV rows (excluding header row).
108fn dataframe_to_rows(df: &DataFrame) -> Result<(Vec<String>, Vec<Vec<String>>)> {
109    let col_names = df.column_names();
110    let row_count = df.row_count();
111    let mut rows: Vec<Vec<String>> = Vec::with_capacity(row_count);
112    for i in 0..row_count {
113        let mut row: Vec<String> = Vec::with_capacity(col_names.len());
114        for col in &col_names {
115            let val = df.get_string_value(col, i).unwrap_or("").to_string();
116            row.push(val);
117        }
118        rows.push(row);
119    }
120    Ok((col_names, rows))
121}
122
123/// Write a full DataFrame to a CSV file.
124fn write_dataframe_csv(df: &DataFrame, path: &Path) -> Result<()> {
125    let (headers, rows) = dataframe_to_rows(df)?;
126    let file = File::create(path).map_err(|e| Error::IoError(e.to_string()))?;
127    let mut wtr = WriterBuilder::new().from_writer(BufWriter::new(file));
128    wtr.write_record(&headers)
129        .map_err(|e| Error::CsvError(e.to_string()))?;
130    for row in &rows {
131        wtr.write_record(row)
132            .map_err(|e| Error::CsvError(e.to_string()))?;
133    }
134    wtr.flush().map_err(|e| Error::IoError(e.to_string()))
135}
136
137// ---------------------------------------------------------------------------
138// OutOfCoreReader
139// ---------------------------------------------------------------------------
140
/// Iterator-based out-of-core DataFrame processor.
///
/// Reads a large source file in chunks of `config.chunk_size` rows.
pub struct OutOfCoreReader {
    /// Path to the source data file (checked to exist at construction).
    source_path: PathBuf,
    /// Source format; only CSV currently has a full code path.
    pub(crate) format: DataFormat,
    /// Chunking / temp-dir / parallelism settings.
    pub(crate) config: OutOfCoreConfig,
    /// Cached total row count (excluding header), filled lazily.
    /// NOTE(review): never written in the visible code — confirm it is
    /// populated elsewhere or remove it.
    total_rows: Option<usize>,
}
151
152impl OutOfCoreReader {
153    /// Create a reader for a CSV source.
154    pub fn from_csv(path: impl AsRef<Path>, config: OutOfCoreConfig) -> Result<Self> {
155        let source_path = path.as_ref().to_path_buf();
156        if !source_path.exists() {
157            return Err(Error::IoError(format!(
158                "File not found: {}",
159                source_path.display()
160            )));
161        }
162        Ok(OutOfCoreReader {
163            source_path,
164            format: DataFormat::Csv,
165            config,
166            total_rows: None,
167        })
168    }
169
170    // -----------------------------------------------------------------------
171    // count
172    // -----------------------------------------------------------------------
173
174    /// Count the total number of data rows in the source file (excluding header).
175    pub fn count(&self) -> Result<usize> {
176        match self.format {
177            DataFormat::Csv => self.count_csv_rows(),
178            DataFormat::Json => Err(Error::NotImplemented("count() for JSON format".into())),
179            DataFormat::Parquet => Err(Error::NotImplemented("count() for Parquet format".into())),
180        }
181    }
182
183    fn count_csv_rows(&self) -> Result<usize> {
184        let file = File::open(&self.source_path).map_err(|e| Error::IoError(e.to_string()))?;
185        let reader = BufReader::new(file);
186        // subtract 1 for the header line
187        let mut count: usize = 0;
188        let mut first = true;
189        for line_result in reader.lines() {
190            let _ = line_result.map_err(|e| Error::IoError(e.to_string()))?;
191            if first {
192                first = false;
193                continue; // skip header
194            }
195            count += 1;
196        }
197        Ok(count)
198    }
199
200    // -----------------------------------------------------------------------
201    // chunk iteration helpers
202    // -----------------------------------------------------------------------
203
204    /// Call `f` with each chunk in sequence. The chunks are `DataFrame` values
205    /// of at most `config.chunk_size` rows.
206    fn for_each_chunk<F>(&self, mut f: F) -> Result<()>
207    where
208        F: FnMut(DataFrame) -> Result<()>,
209    {
210        match self.format {
211            DataFormat::Csv => self.for_each_csv_chunk(&mut f),
212            DataFormat::Json => Err(Error::NotImplemented(
213                "chunked iteration for JSON format".into(),
214            )),
215            DataFormat::Parquet => Err(Error::NotImplemented(
216                "chunked iteration for Parquet format".into(),
217            )),
218        }
219    }
220
221    fn for_each_csv_chunk<F>(&self, f: &mut F) -> Result<()>
222    where
223        F: FnMut(DataFrame) -> Result<()>,
224    {
225        let file = File::open(&self.source_path).map_err(|e| Error::IoError(e.to_string()))?;
226        let mut rdr = ReaderBuilder::new()
227            .has_headers(true)
228            .flexible(true)
229            .trim(csv::Trim::All)
230            .from_reader(BufReader::new(file));
231
232        let headers: Vec<String> = rdr
233            .headers()
234            .map_err(|e| Error::CsvError(e.to_string()))?
235            .iter()
236            .map(|h| h.to_string())
237            .collect();
238
239        let chunk_size = self.config.chunk_size;
240        let mut rows: Vec<Vec<String>> = Vec::with_capacity(chunk_size);
241
242        let flush_chunk = |rows: &mut Vec<Vec<String>>, headers: &[String]| -> Result<DataFrame> {
243            let mut df = DataFrame::new();
244            let num_cols = headers.len();
245            let mut col_data: Vec<Vec<String>> = vec![Vec::with_capacity(rows.len()); num_cols];
246            for row in rows.iter() {
247                for (ci, cell) in row.iter().enumerate() {
248                    if ci < num_cols {
249                        col_data[ci].push(cell.clone());
250                    }
251                }
252            }
253            for (ci, col_name) in headers.iter().enumerate() {
254                let series = Series::new(col_data[ci].clone(), Some(col_name.clone()))
255                    .map_err(|e| Error::Operation(e.to_string()))?;
256                df.add_column(col_name.clone(), series)
257                    .map_err(|e| Error::Operation(e.to_string()))?;
258            }
259            rows.clear();
260            Ok(df)
261        };
262
263        for record_result in rdr.records() {
264            let record = record_result.map_err(|e| Error::CsvError(e.to_string()))?;
265            let row: Vec<String> = record.iter().map(|f| f.to_string()).collect();
266            rows.push(row);
267            if rows.len() >= chunk_size {
268                let df = flush_chunk(&mut rows, &headers)?;
269                f(df)?;
270            }
271        }
272
273        // flush remaining rows
274        if !rows.is_empty() {
275            let df = flush_chunk(&mut rows, &headers)?;
276            f(df)?;
277        }
278
279        Ok(())
280    }
281
282    // -----------------------------------------------------------------------
283    // map
284    // -----------------------------------------------------------------------
285
286    /// Apply a transformation function to each chunk, writing intermediate
287    /// results to temp files and returning an `OutOfCoreWriter`.
288    pub fn map<F>(self, f: F) -> Result<OutOfCoreWriter>
289    where
290        F: Fn(DataFrame) -> Result<DataFrame> + Send + Sync,
291    {
292        // Collect all chunks into temp files first so rayon can parallelise
293        let mut chunk_input_paths: Vec<PathBuf> = Vec::new();
294        let mut chunk_index = 0usize;
295        self.for_each_chunk(|chunk_df| {
296            let path = write_chunk_file(
297                &dataframe_to_rows(&chunk_df)?.1,
298                &chunk_df.column_names(),
299                &self.config.temp_dir,
300                chunk_index,
301            )?;
302            chunk_input_paths.push(path);
303            chunk_index += 1;
304            Ok(())
305        })?;
306
307        // Process in parallel
308        let config = self.config.clone();
309        let results: Vec<Result<PathBuf>> = chunk_input_paths
310            .par_iter()
311            .enumerate()
312            .map(|(i, input_path)| {
313                let chunk_df = read_chunk_file(input_path)?;
314                let transformed = f(chunk_df)?;
315                let out_path = config.temp_dir.join(format!("pandrs_ooc_mapped_{}.csv", i));
316                write_dataframe_csv(&transformed, &out_path)?;
317                Ok(out_path)
318            })
319            .collect();
320
321        let mut output_chunks: Vec<PathBuf> = Vec::with_capacity(results.len());
322        for r in results {
323            output_chunks.push(r?);
324        }
325
326        // Clean up input chunk files
327        for p in &chunk_input_paths {
328            let _ = std::fs::remove_file(p);
329        }
330
331        Ok(OutOfCoreWriter {
332            chunks: output_chunks,
333            config: self.config,
334        })
335    }
336
337    // -----------------------------------------------------------------------
338    // collect
339    // -----------------------------------------------------------------------
340
341    /// Load and concatenate all chunks into a single in-memory DataFrame.
342    ///
343    /// Only use this when the total data fits in RAM.
344    pub fn collect(self) -> Result<DataFrame> {
345        let mut all_dfs: Vec<DataFrame> = Vec::new();
346        self.for_each_chunk(|chunk| {
347            all_dfs.push(chunk);
348            Ok(())
349        })?;
350        concat_dataframes(all_dfs)
351    }
352
353    // -----------------------------------------------------------------------
354    // foreach
355    // -----------------------------------------------------------------------
356
357    /// Apply a function to each chunk without collecting results.
358    pub fn foreach<F>(&self, f: F) -> Result<()>
359    where
360        F: Fn(DataFrame) -> Result<()> + Send + Sync,
361    {
362        self.for_each_chunk(f)
363    }
364
365    // -----------------------------------------------------------------------
366    // aggregate
367    // -----------------------------------------------------------------------
368
369    /// Compute per-column aggregates across all chunks.
370    ///
371    /// Returns a one-row `DataFrame`.  The output column names are
372    /// `"{column}_{op}"` (e.g. `"value_sum"`, `"id_count"`), so multiple
373    /// aggregation operations on the same source column do not collide.
374    ///
375    /// `ops` is a slice of `(column_name, AggOp)` pairs.
376    pub fn aggregate(&self, ops: &[(&str, AggOp)]) -> Result<DataFrame> {
377        struct ColState {
378            /// Source column name.
379            source_col: String,
380            /// Output column name, e.g. "value_sum".
381            output_col: String,
382            sum: f64,
383            count: usize,
384            min: Option<f64>,
385            max: Option<f64>,
386            op: AggOp,
387        }
388
389        let mut states: Vec<ColState> = ops
390            .iter()
391            .map(|(col, op)| {
392                let op_str = match op {
393                    AggOp::Sum => "sum",
394                    AggOp::Mean => "mean",
395                    AggOp::Min => "min",
396                    AggOp::Max => "max",
397                    AggOp::Count => "count",
398                };
399                ColState {
400                    source_col: col.to_string(),
401                    output_col: format!("{}_{}", col, op_str),
402                    sum: 0.0,
403                    count: 0,
404                    min: None,
405                    max: None,
406                    op: *op,
407                }
408            })
409            .collect();
410
411        self.for_each_chunk(|chunk| {
412            for state in states.iter_mut() {
413                if !chunk.contains_column(&state.source_col) {
414                    continue;
415                }
416                let row_count = chunk.row_count();
417                for row_idx in 0..row_count {
418                    let val_str = chunk
419                        .get_string_value(&state.source_col, row_idx)
420                        .unwrap_or("0");
421                    if let Ok(val) = val_str.parse::<f64>() {
422                        state.sum += val;
423                        state.count += 1;
424                        state.min = Some(state.min.map_or(val, |m: f64| m.min(val)));
425                        state.max = Some(state.max.map_or(val, |m: f64| m.max(val)));
426                    }
427                }
428            }
429            Ok(())
430        })?;
431
432        // Build result DataFrame (one row per aggregate)
433        let mut result_df = DataFrame::new();
434        for state in &states {
435            let value = match state.op {
436                AggOp::Sum => state.sum,
437                AggOp::Mean => {
438                    if state.count > 0 {
439                        state.sum / state.count as f64
440                    } else {
441                        f64::NAN
442                    }
443                }
444                AggOp::Min => state.min.unwrap_or(f64::NAN),
445                AggOp::Max => state.max.unwrap_or(f64::NAN),
446                AggOp::Count => state.count as f64,
447            };
448            let series = Series::new(vec![value.to_string()], Some(state.output_col.clone()))
449                .map_err(|e| Error::Operation(e.to_string()))?;
450            result_df
451                .add_column(state.output_col.clone(), series)
452                .map_err(|e| Error::Operation(e.to_string()))?;
453        }
454        Ok(result_df)
455    }
456}
457
458// ---------------------------------------------------------------------------
459// OutOfCoreWriter
460// ---------------------------------------------------------------------------
461
/// Holds a list of temporary chunk files resulting from an out-of-core
/// `map` or join operation.
///
/// The temp files are removed (best-effort) when this value is dropped.
pub struct OutOfCoreWriter {
    /// Paths to temporary chunk CSV files; each file carries its own header row.
    pub(crate) chunks: Vec<PathBuf>,
    /// Settings carried over from the reader that produced these chunks.
    pub(crate) config: OutOfCoreConfig,
}
469
470impl OutOfCoreWriter {
471    /// Write all chunks to a single CSV output file.
472    pub fn write_csv(&self, path: impl AsRef<Path>) -> Result<()> {
473        let out_path = path.as_ref();
474        let out_file = File::create(out_path).map_err(|e| Error::IoError(e.to_string()))?;
475        let mut out = BufWriter::new(out_file);
476
477        let mut header_written = false;
478        for chunk_path in &self.chunks {
479            let chunk_file = File::open(chunk_path).map_err(|e| Error::IoError(e.to_string()))?;
480            let reader = BufReader::new(chunk_file);
481            let mut lines = reader.lines();
482
483            // First line is header
484            if let Some(header_result) = lines.next() {
485                let header = header_result.map_err(|e| Error::IoError(e.to_string()))?;
486                if !header_written {
487                    out.write_all(header.as_bytes())
488                        .map_err(|e| Error::IoError(e.to_string()))?;
489                    out.write_all(b"\n")
490                        .map_err(|e| Error::IoError(e.to_string()))?;
491                    header_written = true;
492                }
493            }
494
495            for line_result in lines {
496                let line = line_result.map_err(|e| Error::IoError(e.to_string()))?;
497                if !line.trim().is_empty() {
498                    out.write_all(line.as_bytes())
499                        .map_err(|e| Error::IoError(e.to_string()))?;
500                    out.write_all(b"\n")
501                        .map_err(|e| Error::IoError(e.to_string()))?;
502                }
503            }
504        }
505        out.flush().map_err(|e| Error::IoError(e.to_string()))?;
506        Ok(())
507    }
508
509    /// Write all chunks to a JSON file (records orientation).
510    pub fn write_json(&self, path: impl AsRef<Path>) -> Result<()> {
511        let df = self.collect()?;
512        crate::io::json::write_json(&df, path, crate::io::json::JsonOrient::Records)
513    }
514
515    /// Merge all chunks into a single in-memory DataFrame.
516    pub fn collect(&self) -> Result<DataFrame> {
517        let mut all_dfs: Vec<DataFrame> = Vec::new();
518        for chunk_path in &self.chunks {
519            let df = read_chunk_file(chunk_path)?;
520            all_dfs.push(df);
521        }
522        concat_dataframes(all_dfs)
523    }
524}
525
/// Best-effort cleanup of the temporary chunk files when the writer goes
/// out of scope.
impl Drop for OutOfCoreWriter {
    fn drop(&mut self) {
        for p in &self.chunks {
            // Deliberately ignore removal errors: drop must never panic, and
            // the files live in a temp directory the OS will reclaim anyway.
            let _ = std::fs::remove_file(p);
        }
    }
}
533
534// ---------------------------------------------------------------------------
535// Helper: concatenate DataFrames
536// ---------------------------------------------------------------------------
537
538/// Concatenate a list of DataFrames vertically (same columns required).
539pub(crate) fn concat_dataframes(dfs: Vec<DataFrame>) -> Result<DataFrame> {
540    if dfs.is_empty() {
541        return Ok(DataFrame::new());
542    }
543
544    let col_names = dfs[0].column_names();
545    let total_rows: usize = dfs.iter().map(|df| df.row_count()).sum();
546
547    // Build per-column data
548    let mut col_data: HashMap<String, Vec<String>> = col_names
549        .iter()
550        .map(|n| (n.clone(), Vec::with_capacity(total_rows)))
551        .collect();
552
553    for df in &dfs {
554        let row_count = df.row_count();
555        for col in &col_names {
556            for row_idx in 0..row_count {
557                let val = df.get_string_value(col, row_idx).unwrap_or("").to_string();
558                if let Some(vec) = col_data.get_mut(col) {
559                    vec.push(val);
560                }
561            }
562        }
563    }
564
565    let mut result = DataFrame::new();
566    for col_name in &col_names {
567        let values = col_data.remove(col_name).unwrap_or_default();
568        let series = Series::new(values, Some(col_name.clone()))
569            .map_err(|e| Error::Operation(e.to_string()))?;
570        result
571            .add_column(col_name.clone(), series)
572            .map_err(|e| Error::Operation(e.to_string()))?;
573    }
574
575    Ok(result)
576}