use std::collections::VecDeque;
use std::fs::File;
use serde::{Deserialize, Serialize};
use crate::checkedfile::{BatchReader, BatchWriter};
use crate::compression::Compress;
use crate::error::SwapVecError;
use crate::swapvec::SwapVecConfig;
/// Thin wrapper around a [`VecDeque`] that hands out cloned elements by index.
struct VecDequeIndex<T: Clone> {
    /// The wrapped queue.
    value: VecDeque<T>,
}

impl<T: Clone> From<VecDeque<T>> for VecDequeIndex<T> {
    /// Wraps `value`, taking ownership of it.
    fn from(value: VecDeque<T>) -> Self {
        Self { value }
    }
}

impl<T: Clone> VecDequeIndex<T> {
    /// Returns a clone of the element at position `i`, counted from the front
    /// of the queue, or `None` when `i` is out of bounds.
    fn get(&self, i: usize) -> Option<T> {
        // `VecDeque::get` already resolves the index across the queue's two
        // internal slices, so no manual `as_slices` arithmetic is needed.
        self.value.get(i).cloned()
    }
}
/// Iterator state for reading elements back: batches decoded from an
/// optional tempfile, plus a queue of elements that are held in memory.
pub struct SwapVecIter<T: Serialize + for<'a> Deserialize<'a> + Clone> {
    /// I/O error recorded when converting the writer into a reader (in `new`)
    /// or resetting the reader (in `reset`) failed.
    new_error: Option<std::io::Error>,
    /// Most recently decoded batch, stored reversed so that `pop` hands the
    /// elements out in their original order.
    current_batch_rev: Vec<T>,
    /// Reader over the batches in the tempfile; `None` when no tempfile was
    /// supplied or the conversion to a reader failed.
    tempfile: Option<BatchReader<File>>,
    /// In-memory elements, accessed by index and cloned on access.
    last_elements: VecDequeIndex<T>,
    /// Index into `last_elements` of the next element to yield.
    last_elements_index: usize,
    /// Supplies the batch size and the compression used to decode batches.
    config: SwapVecConfig,
}
impl<T: Serialize + for<'a> Deserialize<'a> + Clone> SwapVecIter<T> {
    /// Builds an iterator from an optional written tempfile and the
    /// in-memory elements.
    ///
    /// The writer is converted into a reader with `try_into`. A failed
    /// conversion does not fail construction: the error is stored and is
    /// returned by the first call to `read_batch`.
    pub(crate) fn new(
        tempfile_written: Option<BatchWriter<File>>,
        last_elements: VecDeque<T>,
        config: SwapVecConfig,
    ) -> Self {
        let (tempfile, new_error) = match tempfile_written.map(|v| v.try_into()) {
            None => (None, None),
            Some(Ok(v)) => (Some(v), None),
            Some(Err(e)) => (None, Some(e)),
        };
        Self {
            new_error,
            current_batch_rev: Vec::with_capacity(config.batch_size),
            last_elements: last_elements.into(),
            last_elements_index: 0,
            tempfile,
            config,
        }
    }

    /// Reads, decompresses and deserializes the next batch from the tempfile.
    ///
    /// Returns `Ok(None)` when there is no tempfile or the reader has no
    /// further batch.
    ///
    /// # Errors
    ///
    /// Returns a pending error stored by `new` or `reset` (once), an error
    /// from the underlying reader, `SwapVecError::Decompression` when
    /// decompression fails, or the converted `bincode` error when
    /// deserialization fails.
    fn read_batch(&mut self) -> Result<Option<Vec<T>>, SwapVecError> {
        // The pending error must be checked before the tempfile: when the
        // conversion in `new` fails, `tempfile` is `None`, and checking it
        // first would silently drop the error.
        if let Some(err) = self.new_error.take() {
            return Err(err.into());
        }
        let tempfile = match self.tempfile.as_mut() {
            Some(tempfile) => tempfile,
            None => return Ok(None),
        };
        let buffer = match tempfile.read_batch()? {
            Some(buffer) => buffer,
            None => return Ok(None),
        };
        let decompressed: Vec<u8> = self
            .config
            .compression
            .decompress(buffer.to_vec())
            .map_err(|_| SwapVecError::Decompression)?;
        let batch: Vec<T> = bincode::deserialize(&decompressed)?;
        Ok(Some(batch))
    }

    /// Returns the next element from the current batch, loading a new batch
    /// from the tempfile when the current one is used up.
    ///
    /// Returns `Ok(None)` when `read_batch` yields no further batch.
    fn next_in_batch(&mut self) -> Result<Option<T>, SwapVecError> {
        if let Some(v) = self.current_batch_rev.pop() {
            return Ok(Some(v));
        }
        match self.read_batch()? {
            Some(mut new_batch) => {
                // Stored reversed so that `pop` yields elements in order.
                new_batch.reverse();
                self.current_batch_rev = new_batch;
                Ok(self.current_batch_rev.pop())
            }
            None => Ok(None),
        }
    }

    /// Clears the current batch, sets the in-memory index back to zero and
    /// calls `reset` on the tempfile reader, if there is one.
    ///
    /// A failure to reset the reader is not returned here; it is stored and
    /// returned by the next call to `read_batch`.
    pub fn reset(&mut self) {
        self.current_batch_rev.clear();
        self.last_elements_index = 0;
        if let Some(tempfile) = self.tempfile.as_mut() {
            if let Err(e) = tempfile.reset() {
                self.new_error = Some(e);
            }
        }
    }
}
impl<T: Serialize + for<'a> Deserialize<'a> + Clone> Iterator for SwapVecIter<T> {
    type Item = Result<T, SwapVecError>;

    /// Yields the elements obtained from `next_in_batch` first, then clones
    /// of the in-memory `last_elements` in index order.
    ///
    /// An error from reading a batch is yielded as `Some(Err(_))`.
    fn next(&mut self) -> Option<Self::Item> {
        // `next_in_batch` pops from the current batch itself before reading
        // a new one, so no separate pop is needed here.
        match self.next_in_batch() {
            Err(err) => Some(Err(err)),
            Ok(Some(item)) => Some(Ok(item)),
            Ok(None) => {
                // Advance the index only when an element is actually
                // yielded, so it does not keep growing after exhaustion.
                let item = self.last_elements.get(self.last_elements_index)?;
                self.last_elements_index += 1;
                Some(Ok(item))
            }
        }
    }
}