use super::{DiskDeserializer, DiskSerializer};
use byteorder::{ReadBytesExt, WriteBytesExt};
use lz4::{Decoder, Encoder, EncoderBuilder};
use rkyv::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
use std::{
error::Error,
fmt::{self, Display},
fs::File,
io::{self, Read, Seek, Write},
marker::PhantomData,
};
/// Errors produced while building or reading an [`ExternalChunk`].
#[derive(Debug)]
pub enum ExternalChunkError {
    /// Reading from or writing to the underlying chunk file failed.
    IO(io::Error),
    /// An item could not be serialized with rkyv.
    EncodeError(rkyv::rancor::Error),
    /// A stored record could not be deserialized with rkyv.
    DecodeError(rkyv::rancor::Error),
}

impl From<io::Error> for ExternalChunkError {
    fn from(err: io::Error) -> Self {
        ExternalChunkError::IO(err)
    }
}

/// Wraps an rkyv error as [`ExternalChunkError::EncodeError`].
///
/// rkyv uses the same error type for serialization and deserialization, so a
/// `?` on a *deserialization* result would also be labeled an encode error.
/// Decode paths should map explicitly to [`ExternalChunkError::DecodeError`].
impl From<rkyv::rancor::Error> for ExternalChunkError {
    fn from(err: rkyv::rancor::Error) -> Self {
        ExternalChunkError::EncodeError(err)
    }
}

// The wrapped error's message is already embedded in `Display` below, so
// `source()` is deliberately left at its default (`None`). Returning it as
// well would print the same message twice in error-chain reports.
impl Error for ExternalChunkError {}

impl Display for ExternalChunkError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Prefix each variant with its stage so I/O, encode and decode failures
        // are distinguishable from the message alone.
        match self {
            ExternalChunkError::IO(err) => write!(f, "chunk I/O error: {}", err),
            ExternalChunkError::EncodeError(err) => {
                write!(f, "failed to encode chunk item: {}", err)
            }
            ExternalChunkError::DecodeError(err) => {
                write!(f, "failed to decode chunk item: {}", err)
            }
        }
    }
}
pub struct ExternalChunk<T> {
reader: Decoder<File>,
read_buf: Vec<u8>,
item_type: PhantomData<T>,
}
impl<T> ExternalChunk<T>
where
T: Archive + for<'a> RkyvSerialize<DiskSerializer<'a>>,
T::Archived: RkyvDeserialize<T, DiskDeserializer>,
{
pub(crate) fn new(
file: File,
items: impl IntoIterator<Item = T>,
compression: u32,
) -> Result<Self, ExternalChunkError> {
let mut builder = ExternalChunkBuilder::new(file, compression)?;
for item in items.into_iter() {
builder.add(item)?;
}
builder.finish()
}
pub fn open(file: File) -> Result<Self, ExternalChunkError> {
let reader = Decoder::new(file)?;
Ok(Self {
reader,
read_buf: Vec::new(),
item_type: PhantomData,
})
}
pub fn into_inner(self) -> File {
self.reader.finish().0
}
}
impl<T> Iterator for ExternalChunk<T>
where
T: Archive + for<'a> RkyvSerialize<DiskSerializer<'a>>,
T::Archived: RkyvDeserialize<T, DiskDeserializer>,
{
type Item = Result<T, ExternalChunkError>;
fn next(&mut self) -> Option<Self::Item> {
match self.reader.read_u64::<byteorder::LittleEndian>() {
Err(err) => match err.kind() {
std::io::ErrorKind::UnexpectedEof => None,
_ => Some(Err(ExternalChunkError::IO(err))),
},
Ok(length) => {
self.read_buf.resize(length as usize, 0);
if let Err(err) = self.reader.read_exact(&mut self.read_buf) {
return Some(Err(ExternalChunkError::IO(err)));
} else {
match unsafe {
rkyv::from_bytes_unchecked::<T, rkyv::rancor::Error>(&self.read_buf)
} {
Err(err) => Some(Err(ExternalChunkError::DecodeError(err))),
Ok(ser) => Some(Ok(ser)),
}
}
}
}
}
}
pub struct ExternalChunkBuilder<T> {
writer: Encoder<File>,
write_buf: rkyv::util::AlignedVec,
item_type: PhantomData<T>,
}
impl<T> ExternalChunkBuilder<T>
where
T: Archive + for<'a> RkyvSerialize<DiskSerializer<'a>>,
T::Archived: RkyvDeserialize<T, DiskDeserializer>,
{
pub fn new(file: File, compression: u32) -> Result<Self, ExternalChunkError> {
let writer = EncoderBuilder::new().level(compression).build(file)?;
Ok(Self {
writer,
write_buf: rkyv::util::AlignedVec::new(),
item_type: PhantomData,
})
}
pub fn add(&mut self, item: T) -> Result<(), ExternalChunkError> {
let mut buf = std::mem::take(&mut self.write_buf);
buf.clear();
buf = rkyv::api::high::to_bytes_in::<_, rkyv::rancor::Error>(&item, buf)
.map_err(ExternalChunkError::EncodeError)?;
self.writer
.write_u64::<byteorder::LittleEndian>(buf.len() as u64)?;
self.writer.write_all(&buf)?;
self.write_buf = buf;
Ok(())
}
pub fn finish(self) -> Result<ExternalChunk<T>, ExternalChunkError> {
let mut file = self.writer.finish().0;
file.rewind()?;
let reader = Decoder::new(file)?;
Ok(ExternalChunk {
reader,
read_buf: Vec::new(),
item_type: PhantomData,
})
}
}
#[cfg(test)]
mod test {
    use std::io::Seek;

    use rstest::*;

    use super::ExternalChunk;

    /// Temporary directory created under the current working directory.
    #[fixture]
    fn tmp_dir() -> tempfile::TempDir {
        tempfile::tempdir_in("./").unwrap()
    }

    /// Items written to a chunk come back unchanged and in order, at each
    /// tested lz4 compression level.
    #[rstest]
    fn test_chunk(tmp_dir: tempfile::TempDir, #[values(0, 3)] compression: u32) {
        let saved = Vec::from_iter(0..100);
        let tmp_file = tempfile::tempfile_in(&tmp_dir).unwrap();
        let chunk: ExternalChunk<i32> =
            ExternalChunk::new(tmp_file, saved.clone(), compression).unwrap();
        let restored = chunk.collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(restored, saved);
    }

    /// A chunk built from no items yields nothing and reports no error.
    #[rstest]
    fn test_empty_chunk(tmp_dir: tempfile::TempDir) {
        let tmp_file = tempfile::tempfile_in(&tmp_dir).unwrap();
        let chunk: ExternalChunk<i32> = ExternalChunk::new(tmp_file, Vec::new(), 0).unwrap();
        let restored = chunk.collect::<Result<Vec<_>, _>>().unwrap();
        assert!(restored.is_empty());
    }

    /// A chunk's file can be recovered with `into_inner` and reopened with `open`.
    #[rstest]
    fn test_reopen_chunk(tmp_dir: tempfile::TempDir) {
        let saved = Vec::from_iter(0..10);
        let tmp_file = tempfile::tempfile_in(&tmp_dir).unwrap();
        let chunk: ExternalChunk<i32> = ExternalChunk::new(tmp_file, saved.clone(), 0).unwrap();
        let mut file = chunk.into_inner();
        file.rewind().unwrap();
        let reopened: ExternalChunk<i32> = ExternalChunk::open(file).unwrap();
        let restored = reopened.collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(restored, saved);
    }
}