use std::{
borrow::Cow,
fs::File,
io::{BufReader, BufWriter, Seek, SeekFrom, Write},
path::Path,
sync::Arc,
};
use bincode::Options;
use serde::{Serialize, de::DeserializeOwned};
use tempfile::tempfile_in;
/// Buffer of `T` values that lives either in memory or in a temporary file.
///
/// The backing store is picked once, in `new`; `write_chunk` adds elements to it and
/// `iter` / `chunks` read the stored elements back from the start.
pub(crate) enum FileOrMemBuf<T> {
/// File-backed storage: chunks are serialized with bincode through this buffered writer.
ChunkedTmpFile { write: BufWriter<Arc<File>> },
/// Memory-backed storage: every element is kept in a single vector.
Memory { data: Vec<T> },
}
/// Element-by-element iterator over a `FileOrMemBuf`, as returned by `FileOrMemBuf::iter`.
pub(crate) enum Iter<'a, T> {
/// Reads the elements back from the temporary file.
ChunkedTmpFile {
/// Buffered reader over the temporary file.
read: BufReader<Arc<File>>,
/// Elements of the most recently decoded chunk that have not been yielded yet.
chunk_iter: std::vec::IntoIter<T>,
},
/// Walks the in-memory storage by reference.
Memory {
/// Borrowing iterator over the stored elements.
iter: std::slice::Iter<'a, T>,
},
}
/// Chunk-by-chunk iterator over a `FileOrMemBuf`, as returned by `FileOrMemBuf::chunks`.
pub(crate) enum ChunkIter<'a, T> {
/// Decodes one stored chunk from the temporary file per step.
ChunkedTmpFile { read: BufReader<Arc<File>> },
/// Hands out borrowed sub-slices of the in-memory storage.
Memory { iter: std::slice::Chunks<'a, T> },
}
impl<T> FileOrMemBuf<T> {
pub(crate) fn new(dir: Option<&Path>, capacity: usize) -> std::io::Result<Self> {
if let Some(dir) = dir {
let f = Arc::new(tempfile_in(dir)?);
let write = BufWriter::new(Arc::clone(&f));
Ok(Self::ChunkedTmpFile { write })
} else {
Ok(Self::Memory {
data: Vec::with_capacity(capacity),
})
}
}
pub(crate) fn iter(&mut self) -> std::io::Result<Iter<'_, T>> {
match self {
FileOrMemBuf::ChunkedTmpFile { write } => {
write.flush()?;
let mut file = Arc::clone(write.get_ref());
file.rewind()?;
let read = BufReader::new(file);
Ok(Iter::ChunkedTmpFile {
read,
chunk_iter: Default::default(),
})
}
FileOrMemBuf::Memory { data } => Ok(Iter::Memory { iter: data.iter() }),
}
}
pub(crate) fn chunks(&mut self, size: usize) -> std::io::Result<ChunkIter<'_, T>> {
match self {
FileOrMemBuf::ChunkedTmpFile { write } => {
write.flush()?;
let mut file = Arc::clone(write.get_ref());
file.rewind()?;
let read = BufReader::new(file);
Ok(ChunkIter::ChunkedTmpFile { read })
}
FileOrMemBuf::Memory { data } => Ok(ChunkIter::Memory {
iter: data.chunks(size),
}),
}
}
fn bincode() -> impl bincode::Options {
bincode::options().allow_trailing_bytes()
}
}
impl<T> Default for FileOrMemBuf<T> {
    /// Returns an empty in-memory buffer with no preallocated capacity.
    fn default() -> Self {
        Self::Memory { data: Vec::new() }
    }
}
impl<T: Serialize + Clone> FileOrMemBuf<T> {
    /// Appends `chunk` to the buffer.
    ///
    /// A file-backed buffer serializes the whole slice with bincode into its buffered
    /// writer; an in-memory buffer clones the elements onto the end of its vector.
    ///
    /// # Errors
    ///
    /// Returns the bincode error raised while serializing into the file. The in-memory
    /// variant never fails.
    pub(crate) fn write_chunk(&mut self, chunk: &[T]) -> bincode::Result<()> {
        match self {
            Self::ChunkedTmpFile { write } => Self::bincode().serialize_into(write, chunk),
            Self::Memory { data } => {
                data.extend_from_slice(chunk);
                Ok(())
            }
        }
    }
}
/// Yields the stored elements one at a time.
///
/// The in-memory variant clones each element out of the slice. The file-backed variant
/// decodes one chunk at a time from the temporary file and drains it before decoding the
/// next one.
impl<'a, T: DeserializeOwned + Clone> Iterator for Iter<'a, T> {
    type Item = bincode::Result<T>;

    /// Returns the next element, `None` once the data is exhausted, or `Some(Err(_))` when
    /// decoding a chunk fails.
    ///
    /// Any `UnexpectedEof` reported by the decoder is treated as the end of the data,
    /// including one raised part-way through a chunk.
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            // A loop rather than recursion: a run of empty chunks would otherwise add one
            // stack frame per chunk, since tail calls are not guaranteed to be eliminated.
            Iter::ChunkedTmpFile { read, chunk_iter } => loop {
                if let Some(item) = chunk_iter.next() {
                    return Some(Ok(item));
                }
                // Current chunk is drained; decode the next one from the file.
                let opts = FileOrMemBuf::<T>::bincode();
                match opts.deserialize_from::<_, Vec<T>>(&mut *read) {
                    Ok(chunk) => *chunk_iter = chunk.into_iter(),
                    Err(err) => {
                        let at_eof = matches!(
                            &*err,
                            bincode::ErrorKind::Io(io)
                                if io.kind() == std::io::ErrorKind::UnexpectedEof
                        );
                        return if at_eof { None } else { Some(Err(err)) };
                    }
                }
            },
            Iter::Memory { iter } => iter.next().map(|e| Ok(e.clone())),
        }
    }
}
impl<'a, T> Drop for Iter<'a, T> {
    /// Moves the file cursor to the end of the temporary file once iteration is over.
    ///
    /// The reader shares its `File` with the buffer's writer, so reading leaves the shared
    /// cursor wherever iteration stopped. The in-memory variant needs no cleanup.
    ///
    /// # Panics
    ///
    /// Panics if the seek fails.
    fn drop(&mut self) {
        let Self::ChunkedTmpFile { read, .. } = self else {
            return;
        };
        let seek_result = read.get_mut().seek(SeekFrom::End(0));
        seek_result.expect("unable to reset seek position");
    }
}
/// Yields the stored elements chunk by chunk.
///
/// The in-memory variant borrows sub-slices; the file-backed variant returns each decoded
/// chunk as an owned vector.
impl<'a, T: DeserializeOwned + Clone> Iterator for ChunkIter<'a, T> {
    type Item = bincode::Result<Cow<'a, [T]>>;

    /// Returns the next chunk, `None` once the data is exhausted, or `Some(Err(_))` when
    /// decoding fails. Any `UnexpectedEof` from the decoder counts as the end of the data.
    fn next(&mut self) -> Option<Self::Item> {
        let read = match self {
            ChunkIter::Memory { iter } => {
                return iter.next().map(|slice| Ok(Cow::Borrowed(slice)));
            }
            ChunkIter::ChunkedTmpFile { read } => read,
        };

        match FileOrMemBuf::<T>::bincode().deserialize_from::<_, Vec<T>>(read) {
            Ok(items) => Some(Ok(Cow::Owned(items))),
            Err(err) => {
                let at_eof = matches!(
                    &*err,
                    bincode::ErrorKind::Io(io)
                        if io.kind() == std::io::ErrorKind::UnexpectedEof
                );
                if at_eof { None } else { Some(Err(err)) }
            }
        }
    }
}
impl<'a, T> Drop for ChunkIter<'a, T> {
fn drop(&mut self) {
if let Self::ChunkedTmpFile { read, .. } = self {
read.get_mut()
.seek(SeekFrom::End(0))
.expect("unable to reset seek position");
}
}
}