parallel_processor/buckets/
single.rs1use crate::buckets::bucket_writer::BucketItemSerializer;
2use crate::buckets::{LockFreeBucket, MultiThreadBuckets};
3use crate::memory_data_size::MemoryDataSize;
4use std::path::PathBuf;
5
6pub struct SingleBucketThreadDispatcher<'a, B: LockFreeBucket, S: BucketItemSerializer> {
7 buckets: &'a MultiThreadBuckets<B>,
8 bucket_index: u16,
9 buffer: Vec<u8>,
10 serializer: S,
11}
12
13impl<'a, B: LockFreeBucket, S: BucketItemSerializer> SingleBucketThreadDispatcher<'a, B, S> {
14 pub fn new(
15 buffer_size: MemoryDataSize,
16 bucket_index: u16,
17 buckets: &'a MultiThreadBuckets<B>,
18 ) -> Self {
19 let buffer = Vec::with_capacity(buffer_size.as_bytes());
20
21 Self {
22 buckets,
23 bucket_index,
24 buffer,
25 serializer: S::new(),
26 }
27 }
28
29 pub fn get_bucket_index(&self) -> u16 {
30 self.bucket_index
31 }
32
33 pub fn get_path(&self) -> PathBuf {
34 self.buckets.get_path(self.bucket_index)
35 }
36
37 fn flush_buffer(&mut self) {
38 if self.buffer.len() == 0 {
39 return;
40 }
41
42 self.buckets.add_data(self.bucket_index, &self.buffer);
43 self.buffer.clear();
44 }
45
46 pub fn add_element_extended(
47 &mut self,
48 extra_data: &S::ExtraData,
49 extra_buffer: &S::ExtraDataBuffer,
50 element: &S::InputElementType<'_>,
51 ) {
52 if self.serializer.get_size(element, extra_data) + self.buffer.len()
53 > self.buffer.capacity()
54 {
55 self.flush_buffer();
56 self.serializer.reset();
57 }
58 self.serializer
59 .write_to(element, &mut self.buffer, extra_data, extra_buffer);
60 }
61
62 pub fn add_element(&mut self, extra_data: &S::ExtraData, element: &S::InputElementType<'_>)
63 where
64 S: BucketItemSerializer<ExtraDataBuffer = ()>,
65 {
66 self.add_element_extended(extra_data, &(), element);
67 }
68
69 pub fn finalize(self) {}
70}
71
72impl<'a, B: LockFreeBucket, S: BucketItemSerializer> Drop
73 for SingleBucketThreadDispatcher<'a, B, S>
74{
75 fn drop(&mut self) {
76 self.flush_buffer();
77 }
78}