parallel_processor/buckets/
single.rs

1use crate::buckets::bucket_writer::BucketItemSerializer;
2use crate::buckets::{LockFreeBucket, MultiThreadBuckets};
3use crate::memory_data_size::MemoryDataSize;
4use std::path::PathBuf;
5
6pub struct SingleBucketThreadDispatcher<'a, B: LockFreeBucket, S: BucketItemSerializer> {
7    buckets: &'a MultiThreadBuckets<B>,
8    bucket_index: u16,
9    buffer: Vec<u8>,
10    serializer: S,
11}
12
13impl<'a, B: LockFreeBucket, S: BucketItemSerializer> SingleBucketThreadDispatcher<'a, B, S> {
14    pub fn new(
15        buffer_size: MemoryDataSize,
16        bucket_index: u16,
17        buckets: &'a MultiThreadBuckets<B>,
18        serializer_init_data: S::InitData,
19    ) -> Self {
20        let buffer = Vec::with_capacity(buffer_size.as_bytes());
21
22        Self {
23            buckets,
24            bucket_index,
25            buffer,
26            serializer: S::new(serializer_init_data),
27        }
28    }
29
30    pub fn get_bucket_index(&self) -> u16 {
31        self.bucket_index
32    }
33
34    pub fn get_path(&self) -> PathBuf {
35        self.buckets.get_path(self.bucket_index)
36    }
37
38    fn flush_buffer(&mut self) {
39        if self.buffer.len() == 0 {
40            return;
41        }
42
43        self.buckets.add_data(self.bucket_index, &self.buffer);
44        self.buffer.clear();
45    }
46
47    pub fn add_element_extended(
48        &mut self,
49        extra_data: &S::ExtraData,
50        extra_buffer: &S::ExtraDataBuffer,
51        element: &S::InputElementType<'_>,
52    ) {
53        if self.serializer.get_size(element, extra_data) + self.buffer.len()
54            > self.buffer.capacity()
55        {
56            self.flush_buffer();
57            self.serializer.reset();
58        }
59        self.serializer
60            .write_to(element, &mut self.buffer, extra_data, extra_buffer);
61    }
62
63    pub fn add_element(&mut self, extra_data: &S::ExtraData, element: &S::InputElementType<'_>)
64    where
65        S: BucketItemSerializer<ExtraDataBuffer = ()>,
66    {
67        self.add_element_extended(extra_data, &(), element);
68    }
69
70    pub fn finalize(self) {}
71}
72
73impl<'a, B: LockFreeBucket, S: BucketItemSerializer> Drop
74    for SingleBucketThreadDispatcher<'a, B, S>
75{
76    fn drop(&mut self) {
77        self.flush_buffer();
78    }
79}