parallel_processor/buckets/
single.rs

1use crate::buckets::bucket_writer::BucketItemSerializer;
2use crate::buckets::{LockFreeBucket, MultiThreadBuckets};
3use crate::memory_data_size::MemoryDataSize;
4use std::path::PathBuf;
5
6pub struct SingleBucketThreadDispatcher<'a, B: LockFreeBucket, S: BucketItemSerializer> {
7    buckets: &'a MultiThreadBuckets<B>,
8    bucket_index: u16,
9    buffer: Vec<u8>,
10    serializer: S,
11}
12
13impl<'a, B: LockFreeBucket, S: BucketItemSerializer> SingleBucketThreadDispatcher<'a, B, S> {
14    pub fn new(
15        buffer_size: MemoryDataSize,
16        bucket_index: u16,
17        buckets: &'a MultiThreadBuckets<B>,
18    ) -> Self {
19        let buffer = Vec::with_capacity(buffer_size.as_bytes());
20
21        Self {
22            buckets,
23            bucket_index,
24            buffer,
25            serializer: S::new(),
26        }
27    }
28
29    pub fn get_bucket_index(&self) -> u16 {
30        self.bucket_index
31    }
32
33    pub fn get_path(&self) -> PathBuf {
34        self.buckets.get_path(self.bucket_index)
35    }
36
37    fn flush_buffer(&mut self) {
38        if self.buffer.len() == 0 {
39            return;
40        }
41
42        self.buckets.add_data(self.bucket_index, &self.buffer);
43        self.buffer.clear();
44    }
45
46    pub fn add_element_extended(
47        &mut self,
48        extra_data: &S::ExtraData,
49        extra_buffer: &S::ExtraDataBuffer,
50        element: &S::InputElementType<'_>,
51    ) {
52        if self.serializer.get_size(element, extra_data) + self.buffer.len()
53            > self.buffer.capacity()
54        {
55            self.flush_buffer();
56            self.serializer.reset();
57        }
58        self.serializer
59            .write_to(element, &mut self.buffer, extra_data, extra_buffer);
60    }
61
62    pub fn add_element(&mut self, extra_data: &S::ExtraData, element: &S::InputElementType<'_>)
63    where
64        S: BucketItemSerializer<ExtraDataBuffer = ()>,
65    {
66        self.add_element_extended(extra_data, &(), element);
67    }
68
69    pub fn finalize(self) {}
70}
71
72impl<'a, B: LockFreeBucket, S: BucketItemSerializer> Drop
73    for SingleBucketThreadDispatcher<'a, B, S>
74{
75    fn drop(&mut self) {
76        self.flush_buffer();
77    }
78}