use std::{
cmp::Ordering,
collections::VecDeque,
fs::{File, OpenOptions},
io::{BufWriter, Error},
path::PathBuf,
};
use rayon::slice::ParallelSliceMut;
use crate::{ExternalSorterOptions, Sortable, SortedIterator};
/// External sorter fed one item at a time via `push` / `push_iter`.
///
/// Items are buffered in memory; when the buffer grows past
/// `options.segment_size` it is sorted and spilled to disk as a segment file.
/// Calling `done` turns the sorter into a [`SortedIterator`].
///
/// Field order matters: Rust drops struct fields in declaration order, so the
/// open segment files (and buffered items) are declared before `tempdir`.
/// This way they are closed before the temporary directory that contains them
/// is deleted.
pub struct PushExternalSorter<T, F>
where
    T: Sortable,
    F: Fn(&T, &T) -> Ordering + Send + Sync + Clone,
{
    /// Sorter configuration; `sort_dir` is filled in lazily with the temp dir
    /// path when the caller did not provide one.
    options: ExternalSorterOptions,
    /// Total number of items pushed so far (buffered and spilled).
    count: u64,
    /// Comparator defining the sort order.
    cmp: F,
    /// Items not yet written to a segment.
    buffer: Vec<T>,
    /// One handle per spilled segment, in creation order.
    segment_files: Vec<File>,
    /// Temporary directory holding the segments when no `sort_dir` was
    /// configured. Owning it keeps the directory alive, since `TempDir`
    /// removes it on drop. Declared last so it is dropped after
    /// `segment_files`.
    tempdir: Option<tempfile::TempDir>,
}
impl<T, F> PushExternalSorter<T, F>
where
    T: Sortable,
    F: Fn(&T, &T) -> Ordering + Send + Sync + Clone,
{
    /// Creates an empty sorter that orders items with `cmp`.
    ///
    /// No directory or file is created here; that only happens when the
    /// first segment has to be spilled to disk.
    pub(crate) fn new(options: crate::ExternalSorterOptions, cmp: F) -> PushExternalSorter<T, F> {
        PushExternalSorter {
            options,
            tempdir: None,
            count: 0,
            segment_files: Vec::new(),
            buffer: Vec::new(),
            cmp,
        }
    }

    /// Pushes every item yielded by `iterator`, in order.
    ///
    /// # Errors
    ///
    /// Stops at and returns the first I/O error raised while spilling a
    /// segment. Items after the failing one are not consumed.
    pub fn push_iter<I>(&mut self, iterator: I) -> Result<(), Error>
    where
        I: IntoIterator<Item = T>,
    {
        iterator.into_iter().try_for_each(|item| self.push(item))
    }

    /// Adds a single item to the sorter.
    ///
    /// The item is buffered in memory. Once the buffer holds more than
    /// `options.segment_size` items, it is sorted and written out as a new
    /// segment.
    ///
    /// # Errors
    ///
    /// Returns any I/O error from spilling the segment. If encoding fails
    /// mid-segment, the buffered items of that segment are discarded, because
    /// `Vec::drain` drops the remaining elements when it is dropped.
    pub fn push(&mut self, item: T) -> Result<(), Error> {
        self.buffer.push(item);
        self.count += 1;
        // NOTE(review): `>` means each spilled segment holds
        // `segment_size + 1` items; confirm against the documented meaning of
        // `segment_size` before changing the threshold.
        if self.buffer.len() > self.options.segment_size {
            self.sort_and_write_segment()?;
        }
        Ok(())
    }

    /// Finishes the push phase and builds a [`SortedIterator`] over every
    /// pushed item.
    ///
    /// If nothing was ever spilled, the buffer is sorted in memory and handed
    /// over as a pass-through queue, with no disk I/O. Otherwise any leftover
    /// buffered items are written as a final segment and no in-memory queue is
    /// passed, so the iterator works from the segment files alone.
    ///
    /// # Errors
    ///
    /// Returns an error if writing the final segment fails or if
    /// [`SortedIterator::new`] fails.
    pub fn done(mut self) -> Result<SortedIterator<T, F>, Error> {
        let pass_through_queue = if self.segment_files.is_empty() {
            let cmp = &self.cmp;
            self.buffer.sort_unstable_by(|a, b| cmp(a, b));
            Some(VecDeque::from(self.buffer))
        } else {
            // Segments exist, so hand over `None` even when the buffer is
            // empty. The buffer is empty when the final `push` just spilled,
            // and in that case an empty in-memory queue must not stand in for
            // the data on disk.
            if !self.buffer.is_empty() {
                self.sort_and_write_segment()?;
            }
            None
        };
        SortedIterator::new(
            self.tempdir,
            pass_through_queue,
            self.segment_files,
            self.count,
            self.cmp,
            // `self` is consumed here, so the options can be moved rather
            // than cloned.
            self.options,
        )
    }

    /// Sorts the buffer (in parallel if `options.parallel`) and writes it to
    /// a new segment file.
    ///
    /// The segment is named after its index (`0`, `1`, ...) inside the sort
    /// directory. The file is opened read+write and its handle is kept in
    /// `segment_files`; presumably the iterator reads it back through that
    /// handle, which is not confirmed from here. The buffer is left empty
    /// with its capacity kept.
    ///
    /// NOTE(review): names are per-sorter indices opened with `truncate`, so
    /// two sorters sharing one configured `sort_dir` overwrite each other's
    /// segments.
    fn sort_and_write_segment(&mut self) -> Result<(), Error> {
        // Borrow the comparator field alongside the buffer field instead of
        // cloning it; the borrow ends before `get_sort_dir` needs `&mut self`.
        let cmp = &self.cmp;
        if self.options.parallel {
            self.buffer.par_sort_unstable_by(|a, b| cmp(a, b));
        } else {
            self.buffer.sort_unstable_by(|a, b| cmp(a, b));
        }

        let sort_dir = self.get_sort_dir()?;
        let segment_path = sort_dir.join(self.segment_files.len().to_string());
        let segment_file = OpenOptions::new()
            .create(true)
            .truncate(true)
            .read(true)
            .write(true)
            .open(segment_path)?;

        let mut writer = BufWriter::new(segment_file);
        for item in self.buffer.drain(..) {
            item.encode(&mut writer)?;
        }
        // `into_inner` flushes and reports write errors that dropping the
        // `BufWriter` would silently ignore.
        self.segment_files.push(writer.into_inner()?);
        Ok(())
    }

    /// Returns the directory segments are written to, creating a temporary
    /// directory on first use if the caller did not configure `sort_dir`.
    ///
    /// The temporary directory is stored in `self.tempdir` so it lives as
    /// long as the sorter. Its path is recorded in `options.sort_dir`, so
    /// later calls reuse it.
    fn get_sort_dir(&mut self) -> Result<PathBuf, Error> {
        if let Some(sort_dir) = &self.options.sort_dir {
            return Ok(sort_dir.clone());
        }
        let tempdir = tempfile::TempDir::new()?;
        let sort_dir = tempdir.path().to_path_buf();
        self.tempdir = Some(tempdir);
        self.options.sort_dir = Some(sort_dir.clone());
        Ok(sort_dir)
    }
}