parallel_processor/buckets/
single.rs1use crate::buckets::bucket_writer::BucketItemSerializer;
2use crate::buckets::{LockFreeBucket, MultiThreadBuckets};
3use crate::memory_data_size::MemoryDataSize;
4use std::path::PathBuf;
5
6pub struct SingleBucketThreadDispatcher<'a, B: LockFreeBucket, S: BucketItemSerializer> {
7 buckets: &'a MultiThreadBuckets<B>,
8 bucket_index: u16,
9 buffer: Vec<u8>,
10 serializer: S,
11}
12
13impl<'a, B: LockFreeBucket, S: BucketItemSerializer> SingleBucketThreadDispatcher<'a, B, S> {
14 pub fn new(
15 buffer_size: MemoryDataSize,
16 bucket_index: u16,
17 buckets: &'a MultiThreadBuckets<B>,
18 serializer_init_data: S::InitData,
19 ) -> Self {
20 let buffer = Vec::with_capacity(buffer_size.as_bytes());
21
22 Self {
23 buckets,
24 bucket_index,
25 buffer,
26 serializer: S::new(serializer_init_data),
27 }
28 }
29
30 pub fn get_bucket_index(&self) -> u16 {
31 self.bucket_index
32 }
33
34 pub fn get_path(&self) -> PathBuf {
35 self.buckets.get_path(self.bucket_index)
36 }
37
38 fn flush_buffer(&mut self) {
39 if self.buffer.len() == 0 {
40 return;
41 }
42
43 self.buckets.add_data(self.bucket_index, &self.buffer);
44 self.buffer.clear();
45 }
46
47 pub fn add_element_extended(
48 &mut self,
49 extra_data: &S::ExtraData,
50 extra_buffer: &S::ExtraDataBuffer,
51 element: &S::InputElementType<'_>,
52 ) {
53 if self.serializer.get_size(element, extra_data) + self.buffer.len()
54 > self.buffer.capacity()
55 {
56 self.flush_buffer();
57 self.serializer.reset();
58 }
59 self.serializer
60 .write_to(element, &mut self.buffer, extra_data, extra_buffer);
61 }
62
63 pub fn add_element(&mut self, extra_data: &S::ExtraData, element: &S::InputElementType<'_>)
64 where
65 S: BucketItemSerializer<ExtraDataBuffer = ()>,
66 {
67 self.add_element_extended(extra_data, &(), element);
68 }
69
70 pub fn finalize(self) {}
71}
72
73impl<'a, B: LockFreeBucket, S: BucketItemSerializer> Drop
74 for SingleBucketThreadDispatcher<'a, B, S>
75{
76 fn drop(&mut self) {
77 self.flush_buffer();
78 }
79}