bed-utils 0.11.0

Utilities for manipulating genomic range objects
Documentation
//! External chunk.

use super::{DiskDeserializer, DiskSerializer};
use byteorder::{ReadBytesExt, WriteBytesExt};
use lz4::{Decoder, Encoder, EncoderBuilder};
use rkyv::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
use std::{
    error::Error,
    fmt::{self, Display},
    fs::File,
    io::{self, Read, Seek, Write},
    marker::PhantomData,
};

/// Error produced while writing items to, or reading items from, an external chunk.
#[derive(Debug)]
pub enum ExternalChunkError {
    /// Common I/O error (file access or the LZ4 stream).
    IO(io::Error),
    /// Serializing an item with rkyv failed while adding it to a chunk.
    EncodeError(rkyv::rancor::Error),
    /// Deserializing an item read back from a chunk failed.
    DecodeError(rkyv::rancor::Error),
}

impl From<io::Error> for ExternalChunkError {
    fn from(err: io::Error) -> Self {
        ExternalChunkError::IO(err)
    }
}

impl From<rkyv::rancor::Error> for ExternalChunkError {
    fn from(err: rkyv::rancor::Error) -> Self {
        ExternalChunkError::EncodeError(err)
    }
}

impl Error for ExternalChunkError {}

impl Display for ExternalChunkError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ExternalChunkError::IO(err) => write!(f, "{}", err),
            ExternalChunkError::EncodeError(err) => write!(f, "{}", err),
            ExternalChunkError::DecodeError(err) => write!(f, "{}", err),
        }
    }
}

/// External chunk interface. Provides methods for creating a chunk stored on file system and reading data from it.
pub struct ExternalChunk<T> {
    reader: Decoder<File>,
    read_buf: Vec<u8>,
    item_type: PhantomData<T>,
}

impl<T> ExternalChunk<T>
where
    T: Archive + for<'a> RkyvSerialize<DiskSerializer<'a>>,
    T::Archived: RkyvDeserialize<T, DiskDeserializer>,
{
    /// Builds an instance of an external chunk creating file and dumping the items to it.
    ///
    /// # Arguments
    /// * `dir` - Directory the chunk file is created in
    /// * `items` - Items to be dumped to the chunk
    /// * `buf_size` - File I/O buffer size
    pub(crate) fn new(
        file: File,
        items: impl IntoIterator<Item = T>,
        compression: u32,
    ) -> Result<Self, ExternalChunkError> {
        let mut builder = ExternalChunkBuilder::new(file, compression)?;
        for item in items.into_iter() {
            builder.add(item)?;
        }
        builder.finish()
    }

    ///  Open an existing chunk from file.
    pub fn open(file: File) -> Result<Self, ExternalChunkError> {
        let reader = Decoder::new(file)?;
        Ok(Self {
            reader,
            read_buf: Vec::new(),
            item_type: PhantomData,
        })
    }

    pub fn into_inner(self) -> File {
        self.reader.finish().0
    }
}

impl<T> Iterator for ExternalChunk<T>
where
    T: Archive + for<'a> RkyvSerialize<DiskSerializer<'a>>,
    T::Archived: RkyvDeserialize<T, DiskDeserializer>,
{
    type Item = Result<T, ExternalChunkError>;

    fn next(&mut self) -> Option<Self::Item> {
        match self.reader.read_u64::<byteorder::LittleEndian>() {
            Err(err) => match err.kind() {
                std::io::ErrorKind::UnexpectedEof => None,
                _ => Some(Err(ExternalChunkError::IO(err))),
            },
            Ok(length) => {
                self.read_buf.resize(length as usize, 0);
                if let Err(err) = self.reader.read_exact(&mut self.read_buf) {
                    return Some(Err(ExternalChunkError::IO(err)));
                } else {
                    // Bytes are produced by rkyv in `add`, so unchecked read is valid here.
                    match unsafe {
                        rkyv::from_bytes_unchecked::<T, rkyv::rancor::Error>(&self.read_buf)
                    } {
                        Err(err) => Some(Err(ExternalChunkError::DecodeError(err))),
                        Ok(ser) => Some(Ok(ser)),
                    }
                }
            }
        }
    }
}

pub struct ExternalChunkBuilder<T> {
    writer: Encoder<File>,
    write_buf: rkyv::util::AlignedVec,
    item_type: PhantomData<T>,
}

impl<T> ExternalChunkBuilder<T>
where
    T: Archive + for<'a> RkyvSerialize<DiskSerializer<'a>>,
    T::Archived: RkyvDeserialize<T, DiskDeserializer>,
{
    pub fn new(file: File, compression: u32) -> Result<Self, ExternalChunkError> {
        let writer = EncoderBuilder::new().level(compression).build(file)?;
        Ok(Self {
            writer,
            write_buf: rkyv::util::AlignedVec::new(),
            item_type: PhantomData,
        })
    }

    pub fn add(&mut self, item: T) -> Result<(), ExternalChunkError> {
        // Reuse the write buffer across calls to avoid a heap allocation per item.
        let mut buf = std::mem::take(&mut self.write_buf);
        buf.clear();
        buf = rkyv::api::high::to_bytes_in::<_, rkyv::rancor::Error>(&item, buf)
            .map_err(ExternalChunkError::EncodeError)?;
        self.writer
            .write_u64::<byteorder::LittleEndian>(buf.len() as u64)?;
        self.writer.write_all(&buf)?;
        self.write_buf = buf;
        Ok(())
    }

    pub fn finish(self) -> Result<ExternalChunk<T>, ExternalChunkError> {
        let mut file = self.writer.finish().0;
        file.rewind()?;
        let reader = Decoder::new(file)?;

        Ok(ExternalChunk {
            reader,
            read_buf: Vec::new(),
            item_type: PhantomData,
        })
    }
}

#[cfg(test)]
mod test {
    use std::io::Seek;

    use rstest::*;

    use super::ExternalChunk;

    #[fixture]
    fn tmp_dir() -> tempfile::TempDir {
        tempfile::tempdir_in("./").unwrap()
    }

    /// Round-trips items through a chunk, then re-reads the same file via `open`.
    #[rstest]
    #[case::uncompressed(0)]
    #[case::compressed(3)]
    fn test_chunk(tmp_dir: tempfile::TempDir, #[case] compression: u32) {
        let saved = Vec::from_iter(0..100);

        let tmp_file = tempfile::tempfile_in(&tmp_dir).unwrap();
        let mut chunk: ExternalChunk<i32> =
            ExternalChunk::new(tmp_file, saved.clone(), compression).unwrap();
        let restored = chunk.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(restored, saved);

        // Reopening the finished file from the start must yield the same items.
        let mut file = chunk.into_inner();
        file.rewind().unwrap();
        let reopened: ExternalChunk<i32> = ExternalChunk::open(file).unwrap();
        let restored = reopened.collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(restored, saved);
    }

    /// A chunk built from no items yields nothing.
    #[rstest]
    fn test_empty_chunk(tmp_dir: tempfile::TempDir) {
        let tmp_file = tempfile::tempfile_in(&tmp_dir).unwrap();
        let chunk: ExternalChunk<i32> = ExternalChunk::new(tmp_file, Vec::new(), 0).unwrap();
        let restored = chunk.collect::<Result<Vec<_>, _>>().unwrap();
        assert!(restored.is_empty());
    }
}