parallel-processor 0.2.3

Framework to manage asynchronous execution of multiple compute units communicating using messages
Documentation
use crate::buckets::readers::compressed_decoder::CompressedStreamDecoder;
use crate::buckets::readers::lock_free_decoder::LockFreeStreamDecoder;
use crate::buckets::writers::{BucketCheckpoints, BucketHeader};
use crate::memory_fs::file::reader::{FileRangeReference, FileReader};
use crate::memory_fs::{MemoryFs, RemoveFileMode};
use crate::DEFAULT_BINCODE_CONFIG;
use bincode::Decode;
use desse::Desse;
use desse::DesseSized;
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::io::{Read, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use std::sync::Arc;

pub trait ChunkDecoder {
    const MAGIC_HEADER: &'static [u8; 16];
    type ReaderType: Read;
    fn decode_stream(reader: FileReader, size: u64) -> Self::ReaderType;
    fn dispose_stream(stream: Self::ReaderType) -> FileReader;
}

pub(crate) struct RemoveFileGuard {
    path: PathBuf,
    remove_mode: RemoveFileMode,
}

impl Drop for RemoveFileGuard {
    fn drop(&mut self) {
        MemoryFs::remove_file(&self.path, self.remove_mode).unwrap();
    }
}

#[derive(Clone)]
pub struct BinaryReaderChunk {
    pub(crate) reader: FileReader,
    pub(crate) length: u64,
    pub(crate) remove_guard: Arc<RemoveFileGuard>,
    pub(crate) extra_data: Option<Vec<u8>>,
    pub(crate) decoder_type: DecoderType,
}

impl Debug for BinaryReaderChunk {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BinaryReaderChunk")
            .field("length", &self.length)
            .field("extra_data", &self.extra_data)
            .field("decoder_type", &self.decoder_type)
            .field("file_name", &self.reader.get_file_path().display())
            .finish()
    }
}

impl BinaryReaderChunk {
    pub fn get_extra_data<E: Decode<()>>(&self) -> Option<E> {
        self.extra_data.as_ref().map(|data| {
            bincode::decode_from_slice(&data, DEFAULT_BINCODE_CONFIG)
                .unwrap()
                .0
        })
    }

    pub fn get_decoder_type(&self) -> DecoderType {
        self.decoder_type
    }

    pub fn get_unique_file_id(&self) -> usize {
        self.reader.get_unique_file_id()
    }

    pub fn get_length(&self) -> u64 {
        self.length
    }
}

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum DecoderType {
    LockFree,
    Compressed,
}

pub struct ChunkedBinaryReaderIndex {
    file_path: PathBuf,
    chunks: Vec<BinaryReaderChunk>,
    format_data_info: Vec<u8>,
    file_size: u64,
}

pub enum DecodeItemsStatus<T> {
    Decompressed,
    Passtrough {
        file_range: FileRangeReference,
        data: Option<T>,
    },
}

impl ChunkedBinaryReaderIndex {
    fn get_chunk_size(
        checkpoints: &BucketCheckpoints,
        last_byte_position: u64,
        index: usize,
    ) -> u64 {
        if checkpoints.index.len() > (index + 1) as usize {
            checkpoints.index[(index + 1) as usize].offset
                - checkpoints.index[index as usize].offset
        } else {
            last_byte_position - checkpoints.index[index as usize].offset
        }
    }

    pub fn get_file_size(&self) -> u64 {
        self.file_size
    }

    pub fn from_file(
        name: impl AsRef<Path>,
        remove_file: RemoveFileMode,
        prefetch_amount: Option<usize>,
    ) -> Self {
        let mut file = FileReader::open(&name, prefetch_amount)
            .unwrap_or_else(|| panic!("Cannot open file {}", name.as_ref().display()));

        let mut header_buffer = [0; BucketHeader::SIZE];
        file.read_exact(&mut header_buffer)
            .unwrap_or_else(|_| panic!("File {} is corrupted", name.as_ref().display()));

        let header: BucketHeader = BucketHeader::deserialize_from(&header_buffer);

        file.seek(SeekFrom::Start(header.index_offset)).unwrap();
        let index: BucketCheckpoints =
            bincode::decode_from_std_read(&mut file, DEFAULT_BINCODE_CONFIG).unwrap();

        let remove_guard = Arc::new(RemoveFileGuard {
            path: name.as_ref().to_path_buf(),
            remove_mode: remove_file,
        });

        let mut chunks = Vec::with_capacity(index.index.len());

        let decoder_type = match &header.magic {
            LockFreeStreamDecoder::MAGIC_HEADER => DecoderType::LockFree,
            CompressedStreamDecoder::MAGIC_HEADER => DecoderType::Compressed,
            _ => panic!(
                "Invalid decode bucket header magic: {:?}. This is a bug",
                header.magic
            ),
        };

        if index.index.len() > 0 {
            file.seek(SeekFrom::Start(index.index[0].offset)).unwrap();

            for (chunk_idx, chunk) in index.index.iter().enumerate() {
                let length = Self::get_chunk_size(&index, header.index_offset, chunk_idx);

                chunks.push(BinaryReaderChunk {
                    reader: file.clone(),
                    length,
                    remove_guard: remove_guard.clone(),
                    extra_data: chunk.data.clone(),
                    decoder_type,
                });

                file.seek(SeekFrom::Current(length as i64)).unwrap();
            }
        }

        Self {
            file_path: name.as_ref().to_path_buf(),
            chunks,
            format_data_info: header.data_format_info.to_vec(),
            file_size: MemoryFs::get_file_size(name).unwrap() as u64,
        }
    }

    pub fn get_data_format_info<T: Decode<()>>(&self) -> T {
        bincode::decode_from_slice(&self.format_data_info, DEFAULT_BINCODE_CONFIG)
            .unwrap()
            .0
    }

    pub fn get_path(&self) -> &Path {
        &self.file_path
    }

    pub fn into_chunks(self) -> Vec<BinaryReaderChunk> {
        self.chunks
    }

    pub fn into_parallel_chunks(self) -> Mutex<VecDeque<BinaryReaderChunk>> {
        Mutex::new(self.chunks.into())
    }
}

pub struct BinaryChunkReader<D: ChunkDecoder> {
    reader: D::ReaderType,
    _remove_guard: Arc<RemoveFileGuard>,
}

impl<D: ChunkDecoder> BinaryChunkReader<D> {
    pub fn new(chunk: BinaryReaderChunk) -> Self {
        Self {
            reader: D::decode_stream(chunk.reader, chunk.length),
            _remove_guard: chunk.remove_guard,
        }
    }
}

impl<D: ChunkDecoder> Read for BinaryChunkReader<D> {
    #[inline(always)]
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        self.reader.read(buf)
    }
}