use std::{io, num::NonZeroUsize, path::PathBuf};
use crate::{
orderer::Orderer,
run::file_run::create_buffer_run,
tape::{compressor::CompressionCodec, TapeCollection},
};
use self::result_iter::ResultIterator;
pub mod result_iter;
/// Configuration consumed by `ExtSorter`.
///
/// The type is `#[non_exhaustive]`, so code outside this crate cannot build it
/// with a struct literal; use `ExtsortConfig::new()`, the `Default` impl, or the
/// builder-style methods instead.
#[non_exhaustive]
pub struct ExtsortConfig {
/// Size of the in-memory sort buffer in bytes. It is divided by
/// `size_of::<T>()` to obtain the number of items buffered per run.
pub(crate) sort_buffer_size_bytes: usize,
/// Folder handed to the `TapeCollection` when a sort is run.
/// NOTE(review): presumably where spilled runs are written as temporary
/// files; confirm against `TapeCollection`.
pub temp_file_folder: PathBuf,
/// Compression codec handed to the `TapeCollection`. The field only exists
/// when the `compression` feature is enabled.
#[cfg(feature = "compression")]
pub compress_with: CompressionCodec,
}
impl Default for ExtsortConfig {
fn default() -> Self {
Self {
sort_buffer_size_bytes: 10_000_000,
temp_file_folder: PathBuf::from("/tmp"),
#[cfg(feature = "compression")]
compress_with: Default::default(),
}
}
}
impl ExtsortConfig {
    /// Returns how many values of type `T` fit into the configured sort buffer.
    ///
    /// The result is never zero: zero-sized types and buffers smaller than a
    /// single `T` both yield `1`.
    fn get_num_items_for<T>(&self) -> NonZeroUsize {
        let one = NonZeroUsize::new(1).unwrap();
        // `checked_div` is `None` for zero-sized `T`; `NonZeroUsize::new` is
        // `None` when the buffer cannot hold even one item.
        self.sort_buffer_size_bytes
            .checked_div(std::mem::size_of::<T>())
            .and_then(NonZeroUsize::new)
            .unwrap_or(one)
    }

    /// Creates a configuration with the default settings.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a default configuration whose sort buffer is `sort_buf_bytes`
    /// bytes large.
    pub fn with_buffer_size(sort_buf_bytes: usize) -> Self {
        Self::new().sort_buffer_size(sort_buf_bytes)
    }

    /// Creates a default configuration whose sort buffer is `sort_buf_bytes`
    /// bytes large. The type parameter is unused.
    #[deprecated = "Use with_buffer_size() instead. It does not require a type annotation"]
    pub fn create_with_buffer_size_for<T>(sort_buf_bytes: usize) -> Self {
        Self::with_buffer_size(sort_buf_bytes)
    }

    /// Creates a configuration with the default settings. The type parameter
    /// is unused.
    #[deprecated = "Use new() or the Default impl instead. These do not require a type annotation"]
    pub fn default_for<T>() -> Self {
        Self::new()
    }

    /// Returns the configuration with its temp-file folder replaced by `folder`.
    pub fn temp_file_folder(mut self, folder: impl Into<PathBuf>) -> Self {
        self.temp_file_folder = folder.into();
        self
    }

    /// Returns the configuration with the LZ4 (`lz4_flex`) codec selected.
    ///
    /// Only available with the `compression_lz4_flex` feature.
    #[cfg(feature = "compression_lz4_flex")]
    pub fn compress_lz4_flex(mut self) -> Self {
        self.compress_with = CompressionCodec::Lz4Flex;
        self
    }

    /// Returns the configuration with its sort buffer size set to `new_size`
    /// bytes.
    pub fn sort_buffer_size(mut self, new_size: usize) -> Self {
        self.sort_buffer_size_bytes = new_size;
        self
    }

    /// Returns the codec to use: the configured one when the `compression`
    /// feature is enabled, `CompressionCodec::NoCompression` otherwise.
    fn compression_choice(&self) -> CompressionCodec {
        #[cfg(feature = "compression")]
        {
            self.compress_with
        }
        #[cfg(not(feature = "compression"))]
        {
            CompressionCodec::NoCompression
        }
    }
}
/// Sorter that carries an `ExtsortConfig` and is consumed by `ExtSorter::run`.
pub struct ExtSorter {
// Settings read by `run`: buffer size, temp-file folder and compression choice.
config: ExtsortConfig,
}
impl ExtSorter {
pub fn new(options: ExtsortConfig) -> Self {
Self { config: options }
}
pub fn run<'a, S, T, O, F>(
self,
mut source: S,
orderer: O,
mut buffer_sort: F,
) -> io::Result<ResultIterator<T, O>>
where
S: Iterator<Item = T>,
O: Orderer<T>,
T: 'a,
F: FnMut(&O, &mut [T]),
{
let max_buffer_size_nonzero = self.config.get_num_items_for::<T>();
let max_buffer_size = max_buffer_size_nonzero.get();
let mut sort_buffer = Vec::with_capacity(max_buffer_size);
let compression_choice = self.config.compression_choice();
let mut tape_collection = TapeCollection::<T>::new(
self.config.temp_file_folder,
NonZeroUsize::new(256).unwrap(),
compression_choice,
);
let source = &mut source;
loop {
sort_buffer.extend(source.take(max_buffer_size));
buffer_sort(&orderer, &mut sort_buffer);
if sort_buffer.len() < max_buffer_size {
if tape_collection.is_empty() {
let buffer_run = create_buffer_run(sort_buffer);
return Ok(ResultIterator::new(vec![buffer_run], orderer));
} else if !sort_buffer.is_empty() {
tape_collection.add_run(&mut sort_buffer)?;
}
break;
} else {
tape_collection.add_run(&mut sort_buffer)?;
}
}
debug_assert!(sort_buffer.is_empty());
drop(sort_buffer);
let tapes = tape_collection.into_tapes(max_buffer_size_nonzero);
Ok(ResultIterator::new(tapes, orderer))
}
}