use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::fs::File;
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::path::{Path, PathBuf};
use csv::{ReaderBuilder, WriterBuilder};
use crate::core::error::{Error, Result};
use crate::large::out_of_core::OutOfCoreConfig;
/// Sort a CSV file on disk without loading it entirely into memory.
///
/// The input is split into sorted "run" files under `config.temp_dir`
/// (see `create_sorted_runs`), which are then k-way merged into
/// `output_path` (see `merge_sorted_chunks`).
///
/// Temporary run files are always cleaned up (best effort — removal
/// failures are ignored), even when the merge itself fails; the original
/// early-return via `?` leaked the run files on merge errors.
///
/// # Errors
/// Propagates any `Error` from run creation or the merge phase.
pub fn external_sort(
    input_path: &str,
    output_path: &str,
    sort_column: &str,
    ascending: bool,
    config: &OutOfCoreConfig,
) -> Result<()> {
    let run_paths = create_sorted_runs(input_path, sort_column, ascending, config)?;
    // Hold the merge result so cleanup runs regardless of success.
    let merge_result = merge_sorted_chunks(&run_paths, output_path, sort_column, ascending);
    for p in &run_paths {
        let _ = std::fs::remove_file(p);
    }
    merge_result
}
/// Split the CSV at `input_path` into sorted run files.
///
/// Reads the input in chunks of at most `config.chunk_size` records, sorts
/// each chunk in memory on `sort_column`, and writes it as a standalone CSV
/// (repeating the original header) into `config.temp_dir`. Returns the run
/// file paths in creation order; the caller is responsible for deleting them.
///
/// # Errors
/// `Error::IoError` / `Error::CsvError` on file or CSV failures, and
/// `Error::Column` when `sort_column` is not present in the input header.
fn create_sorted_runs(
    input_path: &str,
    sort_column: &str,
    ascending: bool,
    config: &OutOfCoreConfig,
) -> Result<Vec<PathBuf>> {
    let file = File::open(input_path).map_err(|e| Error::IoError(e.to_string()))?;
    let mut rdr = ReaderBuilder::new()
        .has_headers(true)
        // Tolerate rows with a varying number of fields.
        .flexible(true)
        .trim(csv::Trim::All)
        .from_reader(BufReader::new(file));
    let headers: Vec<String> = rdr
        .headers()
        .map_err(|e| Error::CsvError(e.to_string()))?
        .iter()
        .map(|h| h.to_string())
        .collect();
    let sort_col_idx = headers
        .iter()
        .position(|h| h == sort_column)
        .ok_or_else(|| {
            Error::Column(format!(
                "Sort column '{}' not found in headers",
                sort_column
            ))
        })?;
    // Guard against a zero chunk size, which would otherwise flush a
    // one-row run file for every single record.
    let chunk_size = config.chunk_size.max(1);
    let temp_dir = &config.temp_dir;
    let mut run_index = 0usize;
    let mut run_paths: Vec<PathBuf> = Vec::new();
    let mut current_rows: Vec<Vec<String>> = Vec::with_capacity(chunk_size);
    for record_result in rdr.records() {
        let record = record_result.map_err(|e| Error::CsvError(e.to_string()))?;
        let row: Vec<String> = record.iter().map(|f| f.to_string()).collect();
        current_rows.push(row);
        if current_rows.len() >= chunk_size {
            let p = write_sorted_run(
                &mut current_rows,
                &headers,
                sort_col_idx,
                ascending,
                temp_dir,
                &mut run_index,
            )?;
            run_paths.push(p);
        }
    }
    // Flush the final partial chunk, if any.
    if !current_rows.is_empty() {
        let p = write_sorted_run(
            &mut current_rows,
            &headers,
            sort_col_idx,
            ascending,
            temp_dir,
            &mut run_index,
        )?;
        run_paths.push(p);
    }
    Ok(run_paths)
}

/// Sort `rows` in place on column `sort_idx` and write them (preceded by
/// `headers`) to a fresh run file `pandrs_sort_run_<idx>.csv` under `dir`.
/// On success clears `rows`, increments `*idx`, and returns the file's path.
fn write_sorted_run(
    rows: &mut Vec<Vec<String>>,
    headers: &[String],
    sort_idx: usize,
    ascending: bool,
    dir: &Path,
    idx: &mut usize,
) -> Result<PathBuf> {
    rows.sort_by(|a, b| {
        // Rows may be short (flexible CSV): treat a missing sort field as "".
        let va = a.get(sort_idx).map(|s| s.as_str()).unwrap_or("");
        let vb = b.get(sort_idx).map(|s| s.as_str()).unwrap_or("");
        let ord = compare_values(va, vb);
        if ascending {
            ord
        } else {
            ord.reverse()
        }
    });
    let path = dir.join(format!("pandrs_sort_run_{}.csv", *idx));
    let f = File::create(&path).map_err(|e| Error::IoError(e.to_string()))?;
    let mut wtr = WriterBuilder::new().from_writer(BufWriter::new(f));
    wtr.write_record(headers)
        .map_err(|e| Error::CsvError(e.to_string()))?;
    for row in rows.iter() {
        wtr.write_record(row)
            .map_err(|e| Error::CsvError(e.to_string()))?;
    }
    wtr.flush().map_err(|e| Error::IoError(e.to_string()))?;
    rows.clear();
    *idx += 1;
    Ok(path)
}
/// One buffered row in the k-way merge heap (at most one per run).
struct HeapItem {
    // Raw value of the sort column for this row ("" if the row was short).
    key: String,
    // The full CSV record, one String per field.
    row: Vec<String>,
    // Index of the run file this row came from, so the merge can pull the
    // next record from the same run after popping this one.
    run_index: usize,
    // Requested sort direction; Ord::cmp reverses comparisons for ascending
    // sorts so that BinaryHeap (a max-heap) pops rows in output order.
    ascending: bool,
}
impl PartialEq for HeapItem {
fn eq(&self, other: &Self) -> bool {
self.key == other.key
}
}
impl Eq for HeapItem {}
impl PartialOrd for HeapItem {
    // A total order exists, so delegate to `Ord::cmp` (the clippy-recommended
    // form, keeping the two orderings trivially consistent).
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for HeapItem {
    /// Orders heap items so that `BinaryHeap` — a max-heap — pops rows in
    /// the requested output order: the key comparison is reversed for
    /// ascending sorts so the smallest key surfaces first.
    fn cmp(&self, other: &Self) -> Ordering {
        let by_key = compare_values(&self.key, &other.key);
        if self.ascending {
            by_key.reverse()
        } else {
            by_key
        }
    }
}
/// Compare two raw CSV field values.
///
/// If both values parse as `f64` they are compared numerically with
/// `f64::total_cmp`, which yields a total order even for NaN. (The previous
/// `partial_cmp(..).unwrap_or(Ordering::Equal)` made NaN compare Equal to
/// every number — note `"NaN".parse::<f64>()` succeeds — which breaks the
/// transitivity/antisymmetry a sort comparator must provide.) Otherwise the
/// values are compared as plain strings (lexicographic byte order).
fn compare_values(a: &str, b: &str) -> Ordering {
    match (a.parse::<f64>(), b.parse::<f64>()) {
        (Ok(fa), Ok(fb)) => fa.total_cmp(&fb),
        _ => a.cmp(b),
    }
}
pub fn merge_sorted_chunks(
chunk_paths: &[PathBuf],
output_path: &str,
sort_column: &str,
ascending: bool,
) -> Result<()> {
if chunk_paths.is_empty() {
File::create(output_path).map_err(|e| Error::IoError(e.to_string()))?;
return Ok(());
}
let mut readers: Vec<csv::Reader<BufReader<File>>> = Vec::with_capacity(chunk_paths.len());
let mut headers_per_run: Vec<Vec<String>> = Vec::with_capacity(chunk_paths.len());
for path in chunk_paths {
let f = File::open(path).map_err(|e| Error::IoError(e.to_string()))?;
let mut rdr = ReaderBuilder::new()
.has_headers(true)
.flexible(true)
.from_reader(BufReader::new(f));
let hdrs: Vec<String> = rdr
.headers()
.map_err(|e| Error::CsvError(e.to_string()))?
.iter()
.map(|h| h.to_string())
.collect();
headers_per_run.push(hdrs);
readers.push(rdr);
}
let headers = headers_per_run[0].clone();
let sort_col_idx = headers
.iter()
.position(|h| h == sort_column)
.ok_or_else(|| {
Error::Column(format!(
"Sort column '{}' not found during merge",
sort_column
))
})?;
let out_file = File::create(output_path).map_err(|e| Error::IoError(e.to_string()))?;
let mut wtr = WriterBuilder::new().from_writer(BufWriter::new(out_file));
wtr.write_record(&headers)
.map_err(|e| Error::CsvError(e.to_string()))?;
let mut record_iters: Vec<csv::StringRecordsIntoIter<BufReader<File>>> =
readers.into_iter().map(|r| r.into_records()).collect();
let mut heap: BinaryHeap<HeapItem> = BinaryHeap::new();
for (run_idx, iter) in record_iters.iter_mut().enumerate() {
if let Some(result) = iter.next() {
let record = result.map_err(|e| Error::CsvError(e.to_string()))?;
let row: Vec<String> = record.iter().map(|f| f.to_string()).collect();
let key = row.get(sort_col_idx).cloned().unwrap_or_default();
heap.push(HeapItem {
key,
row,
run_index: run_idx,
ascending,
});
}
}
while let Some(item) = heap.pop() {
wtr.write_record(&item.row)
.map_err(|e| Error::CsvError(e.to_string()))?;
let run_idx = item.run_index;
if let Some(result) = record_iters[run_idx].next() {
let record = result.map_err(|e| Error::CsvError(e.to_string()))?;
let row: Vec<String> = record.iter().map(|f| f.to_string()).collect();
let key = row.get(sort_col_idx).cloned().unwrap_or_default();
heap.push(HeapItem {
key,
row,
run_index: run_idx,
ascending,
});
}
}
wtr.flush().map_err(|e| Error::IoError(e.to_string()))?;
Ok(())
}