use std::{
collections::VecDeque,
fmt::Debug,
fs::File,
};
use serde::{Deserialize, Serialize};
use crate::{
checkedfile::BatchWriter,
compression::{Compress, CompressBoxedClone},
error::SwapVecError,
swapveciter::SwapVecIter,
};
/// Effort setting used by [`Compression::Deflate`].
///
/// Comparable and hashable so it can be checked for equality or used as a map key.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum CompressionLevel {
    /// Slowest setting (presumably the best compression ratio — confirm in the compression module).
    Slow,
    /// Middle-ground setting.
    Default,
    /// Fastest setting (presumably the weakest compression ratio — confirm in the compression module).
    Fast,
}
/// Compression scheme applied to each serialized batch before it is written
/// to the spill file.
#[non_exhaustive]
#[derive(Debug)]
pub enum Compression {
    /// LZ4 compression.
    Lz4,
    /// Deflate compression with the given effort level.
    Deflate(CompressionLevel),
    /// A user-supplied compressor, duplicated via `CompressBoxedClone::boxed_clone`.
    Custom(Box<dyn CompressBoxedClone>),
}
impl Clone for Compression {
fn clone(&self) -> Self {
match &self {
Self::Lz4 => Self::Lz4,
Self::Deflate(n) => Self::Deflate(*n),
Self::Custom(x) => Self::Custom(x.boxed_clone()),
}
}
}
/// Tuning parameters for a `SwapVec`.
///
/// `Clone` lets one configuration be reused for several collections.
#[derive(Clone, Debug)]
pub struct SwapVecConfig {
    /// Number of elements kept purely in memory. Once more than this many are
    /// buffered and no spill file exists yet, batches start being written to disk.
    pub swap_after: usize,
    /// Number of elements serialized and written together as one batch.
    pub batch_size: usize,
    /// Compression applied to every serialized batch. `None` is passed to
    /// `Compress::compress` as-is (presumably meaning "store uncompressed").
    pub compression: Option<Compression>,
}
impl Default for SwapVecConfig {
fn default() -> Self {
Self {
swap_after: 32 * 1024 * 1024,
batch_size: 32 * 1024,
compression: None,
}
}
}
/// A growable collection of `T` that lives in memory until it exceeds the
/// configured threshold, then writes its oldest elements to a temporary file
/// in fixed-size serialized batches.
pub struct SwapVec<T>
where
    T: Serialize + for<'a> Deserialize<'a>,
{
    /// Batch writer for the spill file; `None` until a spill is first needed.
    tempfile: Option<BatchWriter<File>>,
    /// Elements still held in RAM; new elements go to the back and batches
    /// are taken from the front.
    vector: VecDeque<T>,
    /// Thresholds and compression settings.
    config: SwapVecConfig,
}
impl<T: Serialize + for<'a> Deserialize<'a>> Default for SwapVec<T> {
fn default() -> Self {
Self {
tempfile: None,
vector: VecDeque::new(),
config: SwapVecConfig::default(),
}
}
}
impl<T> Debug for SwapVec<T>
where
    T: Serialize + for<'a> Deserialize<'a>,
{
    /// Prints how many elements are held in RAM and how many were spilled.
    ///
    /// The spilled count is the number of written batches times `batch_size`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let in_ram = self.vector.len();
        let batches = self.tempfile.as_ref().map_or(0, |writer| writer.batch_count());
        let in_file = batches * self.config.batch_size;
        write!(
            f,
            "SwapVec {{elements_in_ram: {}, elements_in_file: {}}}",
            in_ram, in_file,
        )
    }
}
impl<T> SwapVec<T>
where
    for<'a> T: Serialize + Deserialize<'a> + Clone,
{
    /// Creates an empty `SwapVec` that uses `config` instead of the defaults.
    pub fn with_config(config: SwapVecConfig) -> Self {
        Self {
            tempfile: None,
            vector: VecDeque::new(),
            config,
        }
    }

    /// Pushes every element produced by `it`, spilling batches to disk as needed.
    ///
    /// Any `IntoIterator` is accepted, so you can pass either an iterator or a
    /// collection such as a `Vec<T>`.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by [`SwapVec::push`]. Elements pushed
    /// before the failure stay in the collection, and the rest of the input is
    /// not consumed.
    pub fn consume(&mut self, it: impl IntoIterator<Item = T>) -> Result<(), SwapVecError> {
        // `push` already runs the spill check after each element, so it is not
        // repeated here; repeating it forced out an extra batch per element.
        for element in it {
            self.push(element)?;
        }
        Ok(())
    }

    /// Appends `element` to the back. If the thresholds in `SwapVecConfig` are
    /// exceeded, the oldest `batch_size` elements are then written to the spill file.
    ///
    /// # Errors
    ///
    /// Fails if the spill file cannot be created, or if serializing or writing
    /// a batch fails. `element` stays stored in memory even when an error is
    /// returned.
    pub fn push(&mut self, element: T) -> Result<(), SwapVecError> {
        self.vector.push_back(element);
        self.after_push_work()
    }

    /// Returns `true` once the spill file has been created.
    pub fn written_to_file(&self) -> bool {
        self.tempfile.is_some()
    }

    /// Returns the bytes written to the spill file so far, as reported by
    /// `BatchWriter::bytes_written`, or `None` if no spill file exists.
    pub fn file_size(&self) -> Option<usize> {
        self.tempfile.as_ref().map(|f| f.bytes_written())
    }

    /// Returns the number of batches written to the spill file, or `0` if none exists.
    pub fn batches_written(&self) -> usize {
        self.tempfile.as_ref().map_or(0, |f| f.batch_count())
    }

    /// Writes the oldest `batch_size` buffered elements to the spill file when needed.
    ///
    /// Nothing happens while at most `batch_size` elements are buffered. Before
    /// the first spill, nothing happens while at most `swap_after` elements are
    /// buffered. The spill file is created on demand in the current working
    /// directory (`"."`).
    fn after_push_work(&mut self) -> Result<(), SwapVecError> {
        let batch_size = self.config.batch_size;
        if self.vector.len() <= batch_size {
            return Ok(());
        }
        if self.tempfile.is_none() && self.vector.len() <= self.config.swap_after {
            return Ok(());
        }
        if self.tempfile.is_none() {
            let file = tempfile::Builder::new().tempfile_in(".")?.into_file();
            self.tempfile = Some(BatchWriter::new(file));
        }

        // Serialize the oldest elements by reference instead of draining them
        // first. If serialization or the write fails, they remain in memory
        // instead of being silently dropped. serde serializes `&T` exactly like
        // `T`, so `Vec<&T>` produces the same bytes as `Vec<T>`.
        let batch: Vec<&T> = self.vector.iter().take(batch_size).collect();
        let buffer = bincode::serialize(&batch)?;
        let compressed = self.config.compression.compress(buffer);
        self.tempfile
            .as_mut()
            .expect("spill file is created above before any batch is written")
            .write_batch(&compressed)?;

        // The batch is on disk now, so discard the in-memory copies.
        self.vector.drain(..batch_size);
        Ok(())
    }
}
impl<T: Serialize + for<'a> Deserialize<'a> + Clone> IntoIterator for SwapVec<T> {
type Item = Result<T, SwapVecError>;
type IntoIter = SwapVecIter<T>;
fn into_iter(self) -> Self::IntoIter {
SwapVecIter::new(self.tempfile, self.vector, self.config)
}
}