1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
use crate::buckets::bucket_writer::BucketItemSerializer;
use crate::buckets::{LockFreeBucket, MultiThreadBuckets};
use crate::memory_data_size::MemoryDataSize;
use std::path::PathBuf;

/// Accumulates serialized elements destined for a single bucket and forwards them to a
/// shared [`MultiThreadBuckets`] in batches.
///
/// Elements are serialized by `S` into an in-memory byte buffer; the buffered bytes are
/// handed to the bucket set when the buffer would overflow its capacity and when the
/// dispatcher is dropped.
pub struct SingleBucketThreadDispatcher<'a, B, S>
where
    B: LockFreeBucket,
    S: BucketItemSerializer,
{
    /// Borrowed bucket set that receives the buffered bytes.
    buckets: &'a MultiThreadBuckets<B>,
    /// Index of the only bucket this dispatcher writes to.
    bucket_index: u16,
    /// Serialized bytes not yet passed to `buckets`.
    buffer: Vec<u8>,
    /// Serializer used to size and encode each element into `buffer`.
    serializer: S,
}

impl<'a, B: LockFreeBucket, S: BucketItemSerializer> SingleBucketThreadDispatcher<'a, B, S> {
    /// Creates a dispatcher that writes to bucket `bucket_index` of `buckets`.
    ///
    /// The internal buffer is allocated up front with `buffer_size` bytes of capacity.
    /// That capacity is also the threshold used by [`Self::add_element_extended`] to
    /// decide when buffered data must be flushed. A fresh serializer is created with
    /// `S::new()`.
    pub fn new(
        buffer_size: MemoryDataSize,
        bucket_index: u16,
        buckets: &'a MultiThreadBuckets<B>,
    ) -> Self {
        let buffer = Vec::with_capacity(buffer_size.as_bytes());

        Self {
            buckets,
            bucket_index,
            buffer,
            serializer: S::new(),
        }
    }

    /// Returns the index of the bucket this dispatcher writes to.
    pub fn get_bucket_index(&self) -> u16 {
        self.bucket_index
    }

    /// Returns the path reported by the bucket set for this dispatcher's bucket.
    pub fn get_path(&self) -> PathBuf {
        self.buckets.get_path(self.bucket_index)
    }

    /// Passes the buffered bytes to the target bucket and empties the buffer.
    ///
    /// Does nothing when the buffer is empty, so no zero-length chunk is ever submitted.
    fn flush_buffer(&mut self) {
        if self.buffer.is_empty() {
            return;
        }

        self.buckets.add_data(self.bucket_index, &self.buffer);
        // `clear` keeps the allocation, so the capacity threshold stays the same.
        self.buffer.clear();
    }

    /// Serializes `element` into the buffer, flushing first if it would not fit.
    ///
    /// When the serialized size reported by the serializer plus the bytes already
    /// buffered exceeds the buffer capacity, the buffered bytes are flushed to the bucket
    /// and the serializer is reset before the element is written. The reset happens
    /// whenever that threshold is crossed, even if the buffer was empty and nothing was
    /// actually flushed.
    ///
    /// `extra_data` and `extra_buffer` are forwarded unchanged to the serializer.
    pub fn add_element_extended(
        &mut self,
        extra_data: &S::ExtraData,
        extra_buffer: &S::ExtraDataBuffer,
        element: &S::InputElementType<'_>,
    ) {
        let required = self.serializer.get_size(element, extra_data) + self.buffer.len();
        if required > self.buffer.capacity() {
            self.flush_buffer();
            // Each flushed chunk starts from a fresh serializer state.
            self.serializer.reset();
        }
        self.serializer
            .write_to(element, &mut self.buffer, extra_data, extra_buffer);
    }

    /// Convenience wrapper around [`Self::add_element_extended`] for serializers whose
    /// `ExtraDataBuffer` is `()`.
    pub fn add_element(&mut self, extra_data: &S::ExtraData, element: &S::InputElementType<'_>)
    where
        S: BucketItemSerializer<ExtraDataBuffer = ()>,
    {
        self.add_element_extended(extra_data, &(), element);
    }

    /// Consumes the dispatcher.
    ///
    /// The body is intentionally empty: taking `self` by value drops it here, and the
    /// `Drop` implementation flushes any bytes still buffered.
    pub fn finalize(self) {}
}

/// Flushes any bytes still held in the buffer when the dispatcher goes out of scope.
impl<B, S> Drop for SingleBucketThreadDispatcher<'_, B, S>
where
    B: LockFreeBucket,
    S: BucketItemSerializer,
{
    fn drop(&mut self) {
        // `flush_buffer` is a no-op on an empty buffer, so dropping twice-flushed or
        // never-used dispatchers submits nothing.
        self.flush_buffer();
    }
}