1use std::collections::HashMap;
8use std::fs::{File, OpenOptions};
9use std::io::{self, BufRead, BufReader, BufWriter, Write};
10use std::path::{Path, PathBuf};
11
12use csv::{ReaderBuilder, WriterBuilder};
13use rayon::prelude::*;
14
15use crate::core::error::{Error, Result};
16use crate::dataframe::DataFrame;
17use crate::series::Series;
18
/// Configuration for out-of-core (chunked, disk-spilling) processing.
#[derive(Debug, Clone)]
pub struct OutOfCoreConfig {
    /// Maximum number of rows buffered per chunk before it is flushed.
    pub chunk_size: usize,
    /// Soft memory budget in bytes.
    /// NOTE(review): not consulted anywhere in this module — confirm it is
    /// enforced elsewhere.
    pub max_memory_bytes: usize,
    /// Directory where temporary chunk CSV files are written.
    pub temp_dir: PathBuf,
    /// Whether spilled chunks should be compressed.
    /// NOTE(review): not applied in this module — confirm intended use.
    pub compression: bool,
    /// Desired worker-thread count.
    /// NOTE(review): this module uses rayon's default pool; the field is not
    /// applied here — confirm intended use.
    pub parallelism: usize,
}
38
39impl Default for OutOfCoreConfig {
40 fn default() -> Self {
41 OutOfCoreConfig {
42 chunk_size: 100_000,
43 max_memory_bytes: 512 * 1024 * 1024, temp_dir: std::env::temp_dir(),
45 compression: false,
46 parallelism: num_cpus::get(),
47 }
48 }
49}
50
/// On-disk data formats an out-of-core reader can be pointed at.
///
/// Only `Csv` is implemented in this module; the `Json` and `Parquet` paths
/// currently return `Error::NotImplemented`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataFormat {
    Csv,
    Json,
    Parquet,
}
62
/// Aggregation operations supported by `OutOfCoreReader::aggregate`.
///
/// All operations treat cell values as `f64`; `Count` counts the values that
/// parsed successfully (see `aggregate`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AggOp {
    Sum,
    Mean,
    Min,
    Max,
    Count,
}
76
77fn write_chunk_file(
83 rows: &[Vec<String>],
84 headers: &[String],
85 dir: &Path,
86 index: usize,
87) -> Result<PathBuf> {
88 let path = dir.join(format!("pandrs_ooc_chunk_{}.csv", index));
89 let file = File::create(&path).map_err(|e| Error::IoError(e.to_string()))?;
90 let mut wtr = WriterBuilder::new().from_writer(BufWriter::new(file));
91
92 wtr.write_record(headers)
93 .map_err(|e| Error::CsvError(e.to_string()))?;
94 for row in rows {
95 wtr.write_record(row)
96 .map_err(|e| Error::CsvError(e.to_string()))?;
97 }
98 wtr.flush().map_err(|e| Error::IoError(e.to_string()))?;
99 Ok(path)
100}
101
/// Reads a previously spilled chunk CSV back into an in-memory `DataFrame`.
// NOTE(review): the `true` argument presumably means "has header row" —
// confirm against `crate::io::csv::read_csv`.
fn read_chunk_file(path: &Path) -> Result<DataFrame> {
    crate::io::csv::read_csv(path, true)
}
106
107fn dataframe_to_rows(df: &DataFrame) -> Result<(Vec<String>, Vec<Vec<String>>)> {
109 let col_names = df.column_names();
110 let row_count = df.row_count();
111 let mut rows: Vec<Vec<String>> = Vec::with_capacity(row_count);
112 for i in 0..row_count {
113 let mut row: Vec<String> = Vec::with_capacity(col_names.len());
114 for col in &col_names {
115 let val = df.get_string_value(col, i).unwrap_or("").to_string();
116 row.push(val);
117 }
118 rows.push(row);
119 }
120 Ok((col_names, rows))
121}
122
123fn write_dataframe_csv(df: &DataFrame, path: &Path) -> Result<()> {
125 let (headers, rows) = dataframe_to_rows(df)?;
126 let file = File::create(path).map_err(|e| Error::IoError(e.to_string()))?;
127 let mut wtr = WriterBuilder::new().from_writer(BufWriter::new(file));
128 wtr.write_record(&headers)
129 .map_err(|e| Error::CsvError(e.to_string()))?;
130 for row in &rows {
131 wtr.write_record(row)
132 .map_err(|e| Error::CsvError(e.to_string()))?;
133 }
134 wtr.flush().map_err(|e| Error::IoError(e.to_string()))
135}
136
/// Streaming reader that processes a large file in fixed-size row chunks
/// without loading the whole dataset into memory at once.
pub struct OutOfCoreReader {
    /// Path of the input file (validated to exist at construction).
    source_path: PathBuf,
    /// Input format; only `DataFormat::Csv` is implemented in this module.
    pub(crate) format: DataFormat,
    /// Chunk-size / temp-directory configuration.
    pub(crate) config: OutOfCoreConfig,
    /// Cached total row count.
    /// NOTE(review): only ever set to `None` in the code visible here —
    /// confirm whether caching is wired up elsewhere.
    total_rows: Option<usize>,
}
151
152impl OutOfCoreReader {
153 pub fn from_csv(path: impl AsRef<Path>, config: OutOfCoreConfig) -> Result<Self> {
155 let source_path = path.as_ref().to_path_buf();
156 if !source_path.exists() {
157 return Err(Error::IoError(format!(
158 "File not found: {}",
159 source_path.display()
160 )));
161 }
162 Ok(OutOfCoreReader {
163 source_path,
164 format: DataFormat::Csv,
165 config,
166 total_rows: None,
167 })
168 }
169
170 pub fn count(&self) -> Result<usize> {
176 match self.format {
177 DataFormat::Csv => self.count_csv_rows(),
178 DataFormat::Json => Err(Error::NotImplemented("count() for JSON format".into())),
179 DataFormat::Parquet => Err(Error::NotImplemented("count() for Parquet format".into())),
180 }
181 }
182
183 fn count_csv_rows(&self) -> Result<usize> {
184 let file = File::open(&self.source_path).map_err(|e| Error::IoError(e.to_string()))?;
185 let reader = BufReader::new(file);
186 let mut count: usize = 0;
188 let mut first = true;
189 for line_result in reader.lines() {
190 let _ = line_result.map_err(|e| Error::IoError(e.to_string()))?;
191 if first {
192 first = false;
193 continue; }
195 count += 1;
196 }
197 Ok(count)
198 }
199
200 fn for_each_chunk<F>(&self, mut f: F) -> Result<()>
207 where
208 F: FnMut(DataFrame) -> Result<()>,
209 {
210 match self.format {
211 DataFormat::Csv => self.for_each_csv_chunk(&mut f),
212 DataFormat::Json => Err(Error::NotImplemented(
213 "chunked iteration for JSON format".into(),
214 )),
215 DataFormat::Parquet => Err(Error::NotImplemented(
216 "chunked iteration for Parquet format".into(),
217 )),
218 }
219 }
220
221 fn for_each_csv_chunk<F>(&self, f: &mut F) -> Result<()>
222 where
223 F: FnMut(DataFrame) -> Result<()>,
224 {
225 let file = File::open(&self.source_path).map_err(|e| Error::IoError(e.to_string()))?;
226 let mut rdr = ReaderBuilder::new()
227 .has_headers(true)
228 .flexible(true)
229 .trim(csv::Trim::All)
230 .from_reader(BufReader::new(file));
231
232 let headers: Vec<String> = rdr
233 .headers()
234 .map_err(|e| Error::CsvError(e.to_string()))?
235 .iter()
236 .map(|h| h.to_string())
237 .collect();
238
239 let chunk_size = self.config.chunk_size;
240 let mut rows: Vec<Vec<String>> = Vec::with_capacity(chunk_size);
241
242 let flush_chunk = |rows: &mut Vec<Vec<String>>, headers: &[String]| -> Result<DataFrame> {
243 let mut df = DataFrame::new();
244 let num_cols = headers.len();
245 let mut col_data: Vec<Vec<String>> = vec![Vec::with_capacity(rows.len()); num_cols];
246 for row in rows.iter() {
247 for (ci, cell) in row.iter().enumerate() {
248 if ci < num_cols {
249 col_data[ci].push(cell.clone());
250 }
251 }
252 }
253 for (ci, col_name) in headers.iter().enumerate() {
254 let series = Series::new(col_data[ci].clone(), Some(col_name.clone()))
255 .map_err(|e| Error::Operation(e.to_string()))?;
256 df.add_column(col_name.clone(), series)
257 .map_err(|e| Error::Operation(e.to_string()))?;
258 }
259 rows.clear();
260 Ok(df)
261 };
262
263 for record_result in rdr.records() {
264 let record = record_result.map_err(|e| Error::CsvError(e.to_string()))?;
265 let row: Vec<String> = record.iter().map(|f| f.to_string()).collect();
266 rows.push(row);
267 if rows.len() >= chunk_size {
268 let df = flush_chunk(&mut rows, &headers)?;
269 f(df)?;
270 }
271 }
272
273 if !rows.is_empty() {
275 let df = flush_chunk(&mut rows, &headers)?;
276 f(df)?;
277 }
278
279 Ok(())
280 }
281
282 pub fn map<F>(self, f: F) -> Result<OutOfCoreWriter>
289 where
290 F: Fn(DataFrame) -> Result<DataFrame> + Send + Sync,
291 {
292 let mut chunk_input_paths: Vec<PathBuf> = Vec::new();
294 let mut chunk_index = 0usize;
295 self.for_each_chunk(|chunk_df| {
296 let path = write_chunk_file(
297 &dataframe_to_rows(&chunk_df)?.1,
298 &chunk_df.column_names(),
299 &self.config.temp_dir,
300 chunk_index,
301 )?;
302 chunk_input_paths.push(path);
303 chunk_index += 1;
304 Ok(())
305 })?;
306
307 let config = self.config.clone();
309 let results: Vec<Result<PathBuf>> = chunk_input_paths
310 .par_iter()
311 .enumerate()
312 .map(|(i, input_path)| {
313 let chunk_df = read_chunk_file(input_path)?;
314 let transformed = f(chunk_df)?;
315 let out_path = config.temp_dir.join(format!("pandrs_ooc_mapped_{}.csv", i));
316 write_dataframe_csv(&transformed, &out_path)?;
317 Ok(out_path)
318 })
319 .collect();
320
321 let mut output_chunks: Vec<PathBuf> = Vec::with_capacity(results.len());
322 for r in results {
323 output_chunks.push(r?);
324 }
325
326 for p in &chunk_input_paths {
328 let _ = std::fs::remove_file(p);
329 }
330
331 Ok(OutOfCoreWriter {
332 chunks: output_chunks,
333 config: self.config,
334 })
335 }
336
337 pub fn collect(self) -> Result<DataFrame> {
345 let mut all_dfs: Vec<DataFrame> = Vec::new();
346 self.for_each_chunk(|chunk| {
347 all_dfs.push(chunk);
348 Ok(())
349 })?;
350 concat_dataframes(all_dfs)
351 }
352
353 pub fn foreach<F>(&self, f: F) -> Result<()>
359 where
360 F: Fn(DataFrame) -> Result<()> + Send + Sync,
361 {
362 self.for_each_chunk(f)
363 }
364
365 pub fn aggregate(&self, ops: &[(&str, AggOp)]) -> Result<DataFrame> {
377 struct ColState {
378 source_col: String,
380 output_col: String,
382 sum: f64,
383 count: usize,
384 min: Option<f64>,
385 max: Option<f64>,
386 op: AggOp,
387 }
388
389 let mut states: Vec<ColState> = ops
390 .iter()
391 .map(|(col, op)| {
392 let op_str = match op {
393 AggOp::Sum => "sum",
394 AggOp::Mean => "mean",
395 AggOp::Min => "min",
396 AggOp::Max => "max",
397 AggOp::Count => "count",
398 };
399 ColState {
400 source_col: col.to_string(),
401 output_col: format!("{}_{}", col, op_str),
402 sum: 0.0,
403 count: 0,
404 min: None,
405 max: None,
406 op: *op,
407 }
408 })
409 .collect();
410
411 self.for_each_chunk(|chunk| {
412 for state in states.iter_mut() {
413 if !chunk.contains_column(&state.source_col) {
414 continue;
415 }
416 let row_count = chunk.row_count();
417 for row_idx in 0..row_count {
418 let val_str = chunk
419 .get_string_value(&state.source_col, row_idx)
420 .unwrap_or("0");
421 if let Ok(val) = val_str.parse::<f64>() {
422 state.sum += val;
423 state.count += 1;
424 state.min = Some(state.min.map_or(val, |m: f64| m.min(val)));
425 state.max = Some(state.max.map_or(val, |m: f64| m.max(val)));
426 }
427 }
428 }
429 Ok(())
430 })?;
431
432 let mut result_df = DataFrame::new();
434 for state in &states {
435 let value = match state.op {
436 AggOp::Sum => state.sum,
437 AggOp::Mean => {
438 if state.count > 0 {
439 state.sum / state.count as f64
440 } else {
441 f64::NAN
442 }
443 }
444 AggOp::Min => state.min.unwrap_or(f64::NAN),
445 AggOp::Max => state.max.unwrap_or(f64::NAN),
446 AggOp::Count => state.count as f64,
447 };
448 let series = Series::new(vec![value.to_string()], Some(state.output_col.clone()))
449 .map_err(|e| Error::Operation(e.to_string()))?;
450 result_df
451 .add_column(state.output_col.clone(), series)
452 .map_err(|e| Error::Operation(e.to_string()))?;
453 }
454 Ok(result_df)
455 }
456}
457
/// Holds the paths of transformed chunk files produced by
/// `OutOfCoreReader::map`, ready to be merged to a final output.
/// The chunk files are deleted when the writer is dropped.
pub struct OutOfCoreWriter {
    /// Temp CSV files, one per transformed chunk, in chunk order.
    pub(crate) chunks: Vec<PathBuf>,
    /// Configuration inherited from the reader that produced the chunks.
    pub(crate) config: OutOfCoreConfig,
}
469
470impl OutOfCoreWriter {
471 pub fn write_csv(&self, path: impl AsRef<Path>) -> Result<()> {
473 let out_path = path.as_ref();
474 let out_file = File::create(out_path).map_err(|e| Error::IoError(e.to_string()))?;
475 let mut out = BufWriter::new(out_file);
476
477 let mut header_written = false;
478 for chunk_path in &self.chunks {
479 let chunk_file = File::open(chunk_path).map_err(|e| Error::IoError(e.to_string()))?;
480 let reader = BufReader::new(chunk_file);
481 let mut lines = reader.lines();
482
483 if let Some(header_result) = lines.next() {
485 let header = header_result.map_err(|e| Error::IoError(e.to_string()))?;
486 if !header_written {
487 out.write_all(header.as_bytes())
488 .map_err(|e| Error::IoError(e.to_string()))?;
489 out.write_all(b"\n")
490 .map_err(|e| Error::IoError(e.to_string()))?;
491 header_written = true;
492 }
493 }
494
495 for line_result in lines {
496 let line = line_result.map_err(|e| Error::IoError(e.to_string()))?;
497 if !line.trim().is_empty() {
498 out.write_all(line.as_bytes())
499 .map_err(|e| Error::IoError(e.to_string()))?;
500 out.write_all(b"\n")
501 .map_err(|e| Error::IoError(e.to_string()))?;
502 }
503 }
504 }
505 out.flush().map_err(|e| Error::IoError(e.to_string()))?;
506 Ok(())
507 }
508
509 pub fn write_json(&self, path: impl AsRef<Path>) -> Result<()> {
511 let df = self.collect()?;
512 crate::io::json::write_json(&df, path, crate::io::json::JsonOrient::Records)
513 }
514
515 pub fn collect(&self) -> Result<DataFrame> {
517 let mut all_dfs: Vec<DataFrame> = Vec::new();
518 for chunk_path in &self.chunks {
519 let df = read_chunk_file(chunk_path)?;
520 all_dfs.push(df);
521 }
522 concat_dataframes(all_dfs)
523 }
524}
525
526impl Drop for OutOfCoreWriter {
527 fn drop(&mut self) {
528 for p in &self.chunks {
529 let _ = std::fs::remove_file(p);
530 }
531 }
532}
533
534pub(crate) fn concat_dataframes(dfs: Vec<DataFrame>) -> Result<DataFrame> {
540 if dfs.is_empty() {
541 return Ok(DataFrame::new());
542 }
543
544 let col_names = dfs[0].column_names();
545 let total_rows: usize = dfs.iter().map(|df| df.row_count()).sum();
546
547 let mut col_data: HashMap<String, Vec<String>> = col_names
549 .iter()
550 .map(|n| (n.clone(), Vec::with_capacity(total_rows)))
551 .collect();
552
553 for df in &dfs {
554 let row_count = df.row_count();
555 for col in &col_names {
556 for row_idx in 0..row_count {
557 let val = df.get_string_value(col, row_idx).unwrap_or("").to_string();
558 if let Some(vec) = col_data.get_mut(col) {
559 vec.push(val);
560 }
561 }
562 }
563 }
564
565 let mut result = DataFrame::new();
566 for col_name in &col_names {
567 let values = col_data.remove(col_name).unwrap_or_default();
568 let series = Series::new(values, Some(col_name.clone()))
569 .map_err(|e| Error::Operation(e.to_string()))?;
570 result
571 .add_column(col_name.clone(), series)
572 .map_err(|e| Error::Operation(e.to_string()))?;
573 }
574
575 Ok(result)
576}