parallel_processor/buckets/
bucket_writer.rs

1use std::{
2    io::{Read, Write},
3    marker::PhantomData,
4};
5
6use bincode::{Decode, Encode};
7
8pub trait BucketItemSerializer {
9    type InputElementType<'a>: ?Sized;
10    type ExtraData;
11    type ReadBuffer: Default;
12    type ExtraDataBuffer: Default;
13    type ReadType<'a>;
14    type InitData;
15
16    type CheckpointData: Encode + Decode<()> + 'static;
17
18    /// Creates a new instance
19    fn new(init_data: Self::InitData) -> Self;
20    /// Reset on non continuous data
21    fn reset(&mut self);
22
23    fn write_to(
24        &mut self,
25        element: &Self::InputElementType<'_>,
26        bucket: &mut Vec<u8>,
27        extra_data: &Self::ExtraData,
28        extra_read_buffer: &Self::ExtraDataBuffer,
29    );
30    fn read_from<'a, S: Read>(
31        &mut self,
32        stream: S,
33        read_buffer: &'a mut Self::ReadBuffer,
34        extra_read_buffer: &mut Self::ExtraDataBuffer,
35    ) -> Option<Self::ReadType<'a>>;
36
37    fn get_size(&self, element: &Self::InputElementType<'_>, extra: &Self::ExtraData) -> usize;
38}
39
40#[repr(transparent)]
41pub struct BytesArrayBuffer<const SIZE: usize>([u8; SIZE]);
42
43impl<const SIZE: usize> Default for BytesArrayBuffer<SIZE> {
44    fn default() -> Self {
45        Self([0; SIZE])
46    }
47}
48
49pub struct BytesArraySerializer<const SIZE: usize>(PhantomData<[(); SIZE]>);
50impl<const SIZE: usize> BucketItemSerializer for BytesArraySerializer<SIZE> {
51    type InputElementType<'a> = [u8; SIZE];
52    type ExtraData = ();
53    type ExtraDataBuffer = ();
54    type ReadBuffer = BytesArrayBuffer<SIZE>;
55    type ReadType<'a> = &'a [u8; SIZE];
56    type InitData = ();
57
58    type CheckpointData = ();
59
60    #[inline(always)]
61    fn new(_: ()) -> Self {
62        Self(PhantomData)
63    }
64
65    #[inline(always)]
66    fn reset(&mut self) {}
67
68    #[inline(always)]
69    fn write_to(
70        &mut self,
71        element: &Self::InputElementType<'_>,
72        bucket: &mut Vec<u8>,
73        _: &Self::ExtraData,
74        _: &Self::ExtraDataBuffer,
75    ) {
76        bucket.write(element).unwrap();
77    }
78
79    fn read_from<'a, S: Read>(
80        &mut self,
81        mut stream: S,
82        read_buffer: &'a mut Self::ReadBuffer,
83        _: &mut Self::ExtraDataBuffer,
84    ) -> Option<Self::ReadType<'a>> {
85        stream.read_exact(&mut read_buffer.0).ok()?;
86        Some(&read_buffer.0)
87    }
88
89    #[inline(always)]
90    fn get_size(&self, element: &Self::InputElementType<'_>, _: &()) -> usize {
91        element.len()
92    }
93}
94
95pub struct BytesSliceSerializer;
96impl BucketItemSerializer for BytesSliceSerializer {
97    type InputElementType<'a> = [u8];
98    type ExtraData = ();
99    type ExtraDataBuffer = ();
100    type ReadBuffer = ();
101    type ReadType<'a> = ();
102    type InitData = ();
103
104    type CheckpointData = ();
105
106    #[inline(always)]
107    fn new(_: ()) -> Self {
108        Self
109    }
110
111    #[inline(always)]
112    fn reset(&mut self) {}
113
114    #[inline(always)]
115    fn write_to(
116        &mut self,
117        element: &Self::InputElementType<'_>,
118        bucket: &mut Vec<u8>,
119        _extra_data: &Self::ExtraData,
120        _: &Self::ExtraDataBuffer,
121    ) {
122        bucket.write(element).unwrap();
123    }
124
125    fn read_from<'a, S: Read>(
126        &mut self,
127        _stream: S,
128        _read_buffer: &'a mut Self::ReadBuffer,
129        _: &mut Self::ExtraDataBuffer,
130    ) -> Option<Self::ReadType<'a>> {
131        unimplemented!("Cannot read slices of unknown size!")
132    }
133
134    #[inline(always)]
135    fn get_size(&self, element: &Self::InputElementType<'_>, _: &()) -> usize {
136        element.len()
137    }
138}