parallel_processor/buckets/
bucket_writer.rs1use std::{
2 io::{Read, Write},
3 marker::PhantomData,
4};
5
6use bincode::{Decode, Encode};
7
8pub trait BucketItemSerializer {
9 type InputElementType<'a>: ?Sized;
10 type ExtraData;
11 type ReadBuffer: Default;
12 type ExtraDataBuffer: Default;
13 type ReadType<'a>;
14 type InitData;
15
16 type CheckpointData: Encode + Decode<()> + 'static;
17
18 fn new(init_data: Self::InitData) -> Self;
20 fn reset(&mut self);
22
23 fn write_to(
24 &mut self,
25 element: &Self::InputElementType<'_>,
26 bucket: &mut Vec<u8>,
27 extra_data: &Self::ExtraData,
28 extra_read_buffer: &Self::ExtraDataBuffer,
29 );
30 fn read_from<'a, S: Read>(
31 &mut self,
32 stream: S,
33 read_buffer: &'a mut Self::ReadBuffer,
34 extra_read_buffer: &mut Self::ExtraDataBuffer,
35 ) -> Option<Self::ReadType<'a>>;
36
37 fn get_size(&self, element: &Self::InputElementType<'_>, extra: &Self::ExtraData) -> usize;
38}
39
40#[repr(transparent)]
41pub struct BytesArrayBuffer<const SIZE: usize>([u8; SIZE]);
42
43impl<const SIZE: usize> Default for BytesArrayBuffer<SIZE> {
44 fn default() -> Self {
45 Self([0; SIZE])
46 }
47}
48
49pub struct BytesArraySerializer<const SIZE: usize>(PhantomData<[(); SIZE]>);
50impl<const SIZE: usize> BucketItemSerializer for BytesArraySerializer<SIZE> {
51 type InputElementType<'a> = [u8; SIZE];
52 type ExtraData = ();
53 type ExtraDataBuffer = ();
54 type ReadBuffer = BytesArrayBuffer<SIZE>;
55 type ReadType<'a> = &'a [u8; SIZE];
56 type InitData = ();
57
58 type CheckpointData = ();
59
60 #[inline(always)]
61 fn new(_: ()) -> Self {
62 Self(PhantomData)
63 }
64
65 #[inline(always)]
66 fn reset(&mut self) {}
67
68 #[inline(always)]
69 fn write_to(
70 &mut self,
71 element: &Self::InputElementType<'_>,
72 bucket: &mut Vec<u8>,
73 _: &Self::ExtraData,
74 _: &Self::ExtraDataBuffer,
75 ) {
76 bucket.write(element).unwrap();
77 }
78
79 fn read_from<'a, S: Read>(
80 &mut self,
81 mut stream: S,
82 read_buffer: &'a mut Self::ReadBuffer,
83 _: &mut Self::ExtraDataBuffer,
84 ) -> Option<Self::ReadType<'a>> {
85 stream.read_exact(&mut read_buffer.0).ok()?;
86 Some(&read_buffer.0)
87 }
88
89 #[inline(always)]
90 fn get_size(&self, element: &Self::InputElementType<'_>, _: &()) -> usize {
91 element.len()
92 }
93}
94
95pub struct BytesSliceSerializer;
96impl BucketItemSerializer for BytesSliceSerializer {
97 type InputElementType<'a> = [u8];
98 type ExtraData = ();
99 type ExtraDataBuffer = ();
100 type ReadBuffer = ();
101 type ReadType<'a> = ();
102 type InitData = ();
103
104 type CheckpointData = ();
105
106 #[inline(always)]
107 fn new(_: ()) -> Self {
108 Self
109 }
110
111 #[inline(always)]
112 fn reset(&mut self) {}
113
114 #[inline(always)]
115 fn write_to(
116 &mut self,
117 element: &Self::InputElementType<'_>,
118 bucket: &mut Vec<u8>,
119 _extra_data: &Self::ExtraData,
120 _: &Self::ExtraDataBuffer,
121 ) {
122 bucket.write(element).unwrap();
123 }
124
125 fn read_from<'a, S: Read>(
126 &mut self,
127 _stream: S,
128 _read_buffer: &'a mut Self::ReadBuffer,
129 _: &mut Self::ExtraDataBuffer,
130 ) -> Option<Self::ReadType<'a>> {
131 unimplemented!("Cannot read slices of unknown size!")
132 }
133
134 #[inline(always)]
135 fn get_size(&self, element: &Self::InputElementType<'_>, _: &()) -> usize {
136 element.len()
137 }
138}