parallel-processor 0.2.3

Framework to manage asynchronous execution of multiple compute units communicating using messages
Documentation
use crate::buckets::bucket_writer::BucketItemSerializer;
use crate::buckets::{LockFreeBucket, MultiThreadBuckets};
use crate::memory_data_size::MemoryDataSize;
use std::path::PathBuf;

/// Stages serialized items for a single bucket of a [`MultiThreadBuckets`]
/// collection and hands them over in batches.
///
/// Items are written into an in-memory byte buffer by the serializer `S`;
/// the buffered bytes are passed to the bucket identified by `bucket_index`
/// whenever the buffer is flushed.
pub struct SingleBucketThreadDispatcher<'a, B, S>
where
    B: LockFreeBucket,
    S: BucketItemSerializer,
{
    /// Borrowed bucket collection that receives the flushed bytes.
    buckets: &'a MultiThreadBuckets<B>,
    /// Index of the one bucket this dispatcher writes to.
    bucket_index: u16,
    /// Staging area for serialized bytes that have not been flushed yet.
    buffer: Vec<u8>,
    /// Serializer that encodes incoming elements into `buffer`.
    serializer: S,
}

impl<'a, B, S> SingleBucketThreadDispatcher<'a, B, S>
where
    B: LockFreeBucket,
    S: BucketItemSerializer,
{
    /// Creates a dispatcher that writes to bucket `bucket_index` of `buckets`.
    ///
    /// The staging buffer is pre-allocated with a capacity of
    /// `buffer_size.as_bytes()` and the serializer is constructed from
    /// `serializer_init_data`.
    pub fn new(
        buffer_size: MemoryDataSize,
        bucket_index: u16,
        buckets: &'a MultiThreadBuckets<B>,
        serializer_init_data: S::InitData,
    ) -> Self {
        Self {
            buckets,
            bucket_index,
            buffer: Vec::with_capacity(buffer_size.as_bytes()),
            serializer: S::new(serializer_init_data),
        }
    }

    /// Returns the index of the bucket this dispatcher writes to.
    pub fn get_bucket_index(&self) -> u16 {
        self.bucket_index
    }

    /// Returns the path that the bucket collection reports for this
    /// dispatcher's bucket index.
    pub fn get_path(&self) -> PathBuf {
        let index = self.bucket_index;
        self.buckets.get_path(index)
    }

    /// Hands every buffered byte to the target bucket and empties the buffer.
    ///
    /// Does nothing when the buffer holds no data, so the bucket never
    /// receives an empty slice from here.
    fn flush_buffer(&mut self) {
        if !self.buffer.is_empty() {
            self.buckets.add_data(self.bucket_index, &self.buffer);
            self.buffer.clear();
        }
    }

    /// Serializes `element` into the staging buffer, flushing first if needed.
    ///
    /// When the size reported by the serializer for `element`, added to the
    /// number of bytes already buffered, exceeds the buffer's capacity, the
    /// buffered bytes are flushed to the bucket and the serializer is reset
    /// before the element is written.
    ///
    /// `extra_data` and `extra_buffer` are forwarded to the serializer as-is.
    pub fn add_element_extended(
        &mut self,
        extra_data: &S::ExtraData,
        extra_buffer: &S::ExtraDataBuffer,
        element: &S::InputElementType<'_>,
    ) {
        let required = self.serializer.get_size(element, extra_data);
        let exceeds_capacity = required + self.buffer.len() > self.buffer.capacity();

        if exceeds_capacity {
            self.flush_buffer();
            // The serializer is reset every time a batch boundary is crossed.
            self.serializer.reset();
        }

        self.serializer
            .write_to(element, &mut self.buffer, extra_data, extra_buffer);
    }

    /// Convenience form of [`Self::add_element_extended`] for serializers
    /// whose extra-data buffer type is `()`.
    pub fn add_element(&mut self, extra_data: &S::ExtraData, element: &S::InputElementType<'_>)
    where
        S: BucketItemSerializer<ExtraDataBuffer = ()>,
    {
        let no_extra_buffer = ();
        self.add_element_extended(extra_data, &no_extra_buffer, element);
    }

    /// Consumes the dispatcher.
    ///
    /// Dropping the value runs its `Drop` implementation, which flushes any
    /// bytes still held in the staging buffer.
    pub fn finalize(self) {
        drop(self);
    }
}

impl<B, S> Drop for SingleBucketThreadDispatcher<'_, B, S>
where
    B: LockFreeBucket,
    S: BucketItemSerializer,
{
    /// Flushes whatever is still buffered when the dispatcher goes out of
    /// scope, so pending bytes are handed to the bucket before the buffer is
    /// released.
    fn drop(&mut self) {
        self.flush_buffer();
    }
}