use std::{
fs::OpenOptions,
io::{BufWriter, Write},
path::PathBuf,
};
use extsort::ExternalSorter;
use crate::{data, entry::SortableEntry, size::DataSize, utils::CountedWrite, Entry, Serializable};
const CHECKPOINT_WRITE_UPCOMING_WITHIN_DISTANCE: u64 = 3; const LEVELS_MINIMUM_ITEMS: u64 = 2;
pub struct Builder<K, V, KS = u16, VS = u16>
where
K: Ord + Serializable,
V: Serializable,
KS: DataSize,
VS: DataSize,
{
path: PathBuf,
log_base: f64,
extsort_segment_size: Option<usize>,
phantom: std::marker::PhantomData<(K, KS, V, VS)>,
}
impl<K, V> Builder<K, V, u16, u16>
where
K: Ord + Serializable,
V: Serializable,
{
pub fn new<P: Into<PathBuf>>(path: P) -> Builder<K, V, u16, u16> {
Self::new_sized(path)
}
}
impl<K, V, KS, VS> Builder<K, V, KS, VS>
where
K: Ord + Serializable,
V: Serializable,
KS: DataSize,
VS: DataSize,
{
pub fn new_sized<P: Into<PathBuf>>(path: P) -> Builder<K, V, KS, VS> {
Builder {
path: path.into(),
log_base: 5.0,
extsort_segment_size: None,
phantom: std::marker::PhantomData,
}
}
pub fn with_log_base(mut self, log_base: f64) -> Self {
self.log_base = log_base;
self
}
pub fn with_extsort_segment_size(mut self, max_size: usize) -> Self {
self.extsort_segment_size = Some(max_size);
self
}
pub fn build<I>(self, iter: I) -> Result<(), BuilderError>
where
I: Iterator<Item = Entry<K, V>>,
{
let sort_dir = self.sort_dir();
std::fs::create_dir_all(&sort_dir).expect("couldn't create dit");
let mut sorter = ExternalSorter::new().with_sort_dir(sort_dir);
if let Some(segment_size) = self.extsort_segment_size {
sorter = sorter.with_segment_size(segment_size);
}
let iter = iter.map(|entry| SortableEntry::<K, V, KS, VS>::new(entry));
let sorted_iter = sorter.sort(iter)?;
let sorted_count = sorted_iter.sorted_count();
let sorted_iter = sorted_iter.map(|entry| Ok(entry?.into_inner()));
self.build_from_sorted_fallible(sorted_iter, sorted_count)
}
pub fn build_from_sorted<I>(self, iter: I, nb_items: u64) -> Result<(), BuilderError>
where
I: Iterator<Item = Entry<K, V>>,
{
self.build_from_sorted_fallible(iter.map(Ok), nb_items)
}
pub fn build_from_sorted_fallible<I>(self, iter: I, nb_items: u64) -> Result<(), BuilderError>
where
I: Iterator<Item = std::io::Result<Entry<K, V>>>,
{
let file = OpenOptions::new()
.create(true)
.truncate(true)
.read(false)
.write(true)
.open(&self.path)?;
let buffered_output = BufWriter::new(file);
let mut counted_output = CountedWrite::new(buffered_output);
let mut levels = levels_for_items_count(nb_items, self.log_base);
if levels.len() > data::MAX_LEVELS {
return Err(BuilderError::MaxSize);
}
self.write_header(&mut counted_output, &levels)?;
if !levels.is_empty() {
let checkpoint_interval = levels.last().unwrap().expected_items;
let mut entries_since_last_checkpoint = 0;
let mut last_entry_position: usize = 0;
for entry in iter {
let entry = entry?;
last_entry_position = counted_output.written_count() as usize;
self.write_entry(&mut counted_output, &entry)?;
entries_since_last_checkpoint += 1;
if entries_since_last_checkpoint >= checkpoint_interval {
for level in &mut levels {
level.current_items += entries_since_last_checkpoint;
}
let current_position = counted_output.written_count() as usize;
self.write_checkpoint(
&mut counted_output,
current_position,
last_entry_position,
&mut levels,
false,
)?;
entries_since_last_checkpoint = 0;
}
}
if entries_since_last_checkpoint > 0 {
let current_position = counted_output.written_count() as usize;
self.write_checkpoint(
&mut counted_output,
current_position,
last_entry_position,
&mut levels,
true,
)?;
}
}
let _ = std::fs::remove_dir_all(self.sort_dir());
Ok(())
}
fn write_header<W: Write>(&self, output: &mut W, levels: &[Level]) -> Result<(), BuilderError> {
let seri_header = data::Header {
nb_levels: levels.len() as u8,
};
seri_header.write(output)?;
Ok(())
}
fn write_entry<W: Write>(
&self,
output: &mut W,
entry: &Entry<K, V>,
) -> Result<(), BuilderError> {
data::Entry::<K, V, KS, VS>::write(entry, output)?;
Ok(())
}
fn write_checkpoint<W: Write>(
&self,
output: &mut W,
current_position: usize,
entry_position: usize,
levels: &mut [Level],
force_all_levels: bool,
) -> Result<(), BuilderError> {
let seri_levels = levels.iter().map(|level| data::CheckpointLevel {
next_position: level.last_item_position.unwrap_or(0),
});
data::Checkpoint::write(output, entry_position, seri_levels)?;
for level in levels.iter_mut() {
if force_all_levels
|| level.current_items > level.expected_items
|| (level.expected_items - level.current_items)
<= CHECKPOINT_WRITE_UPCOMING_WITHIN_DISTANCE
{
level.last_item_position = Some(current_position);
level.current_items = 0;
}
}
Ok(())
}
fn sort_dir(&self) -> PathBuf {
self.path.with_extension("tmp_sort")
}
}
fn levels_for_items_count(nb_items: u64, log_base: f64) -> Vec<Level> {
if nb_items == 0 {
return Vec::new();
}
let nb_levels = (nb_items as f64).log(log_base).round().max(1.0) as u64;
let log_base_u64 = log_base as u64;
let mut levels = Vec::new();
let mut max_items = (nb_items / log_base_u64).max(1);
for _ in 0..nb_levels {
if !levels.is_empty() && max_items < LEVELS_MINIMUM_ITEMS {
break;
}
levels.push(Level {
expected_items: max_items,
current_items: 0,
last_item_position: None,
});
max_items /= log_base_u64;
}
levels
}
#[derive(Debug)]
struct Level {
expected_items: u64,
current_items: u64,
last_item_position: Option<usize>,
}
#[derive(Debug)]
pub enum BuilderError {
MaxSize,
InvalidItem,
Serialization(data::SerializationError),
IO(std::io::Error),
}
impl From<std::io::Error> for BuilderError {
fn from(err: std::io::Error) -> Self {
BuilderError::IO(err)
}
}
impl From<data::SerializationError> for BuilderError {
fn from(err: data::SerializationError) -> Self {
BuilderError::Serialization(err)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tests::TestString;
#[test]
fn calculate_levels() {
let levels = levels_for_items_count(0, 5.0);
let levels_items = levels.iter().map(|l| l.expected_items).collect::<Vec<_>>();
assert_eq!(levels_items, vec![]);
let levels = levels_for_items_count(1, 5.0);
let levels_items = levels.iter().map(|l| l.expected_items).collect::<Vec<_>>();
assert_eq!(levels_items, vec![1]);
let levels = levels_for_items_count(20, 5.0);
let levels_items = levels.iter().map(|l| l.expected_items).collect::<Vec<_>>();
assert_eq!(levels_items, vec![4]);
let levels = levels_for_items_count(100, 5.0);
let levels_items = levels.iter().map(|l| l.expected_items).collect::<Vec<_>>();
assert_eq!(levels_items, vec![20, 4]);
let levels = levels_for_items_count(1_000, 5.0);
let levels_items = levels.iter().map(|l| l.expected_items).collect::<Vec<_>>();
assert_eq!(levels_items, vec![200, 40, 8]);
let levels = levels_for_items_count(1_000_000, 10.0);
let levels_items = levels.iter().map(|l| l.expected_items).collect::<Vec<_>>();
assert_eq!(levels_items, vec![100_000, 10_000, 1000, 100, 10]);
}
#[test]
fn build_from_unsorted() {
let mut data = Vec::new();
for _i in 0..100 {
data.push(Entry::new(
TestString("ccc".to_string()),
TestString("ccc".to_string()),
));
}
let index_file = tempfile::NamedTempFile::new().unwrap();
let index_file = index_file.path();
let builder = Builder::<TestString, TestString>::new(index_file);
builder.build(data.into_iter()).unwrap();
let file_meta = std::fs::metadata(index_file).unwrap();
assert!(file_meta.len() > 10);
}
}