parallel_processor/buckets/
bucket_writer.rs1use std::{
2 io::{Read, Write},
3 marker::PhantomData,
4};
5
6use serde::{de::DeserializeOwned, Serialize};
7
8pub trait BucketItemSerializer {
9 type InputElementType<'a>: ?Sized;
10 type ExtraData;
11 type ReadBuffer;
12 type ExtraDataBuffer;
13 type ReadType<'a>;
14 type InitData;
15
16 type CheckpointData: Serialize + DeserializeOwned + 'static;
17
18 fn new(init_data: Self::InitData) -> Self;
20 fn reset(&mut self);
22
23 fn write_to(
24 &mut self,
25 element: &Self::InputElementType<'_>,
26 bucket: &mut Vec<u8>,
27 extra_data: &Self::ExtraData,
28 extra_read_buffer: &Self::ExtraDataBuffer,
29 );
30 fn read_from<'a, S: Read>(
31 &mut self,
32 stream: S,
33 read_buffer: &'a mut Self::ReadBuffer,
34 extra_read_buffer: &mut Self::ExtraDataBuffer,
35 ) -> Option<Self::ReadType<'a>>;
36
37 fn get_size(&self, element: &Self::InputElementType<'_>, extra: &Self::ExtraData) -> usize;
38}
39
40pub struct BytesArraySerializer<const SIZE: usize>(PhantomData<[(); SIZE]>);
41impl<const SIZE: usize> BucketItemSerializer for BytesArraySerializer<SIZE> {
42 type InputElementType<'a> = [u8; SIZE];
43 type ExtraData = ();
44 type ExtraDataBuffer = ();
45 type ReadBuffer = [u8; SIZE];
46 type ReadType<'a> = &'a [u8; SIZE];
47 type InitData = ();
48
49 type CheckpointData = ();
50
51 #[inline(always)]
52 fn new(_: ()) -> Self {
53 Self(PhantomData)
54 }
55
56 #[inline(always)]
57 fn reset(&mut self) {}
58
59 #[inline(always)]
60 fn write_to(
61 &mut self,
62 element: &Self::InputElementType<'_>,
63 bucket: &mut Vec<u8>,
64 _: &Self::ExtraData,
65 _: &Self::ExtraDataBuffer,
66 ) {
67 bucket.write(element).unwrap();
68 }
69
70 fn read_from<'a, S: Read>(
71 &mut self,
72 mut stream: S,
73 read_buffer: &'a mut Self::ReadBuffer,
74 _: &mut Self::ExtraDataBuffer,
75 ) -> Option<Self::ReadType<'a>> {
76 stream.read_exact(read_buffer).ok()?;
77 Some(read_buffer)
78 }
79
80 #[inline(always)]
81 fn get_size(&self, element: &Self::InputElementType<'_>, _: &()) -> usize {
82 element.len()
83 }
84}
85
86pub struct BytesSliceSerializer;
87impl BucketItemSerializer for BytesSliceSerializer {
88 type InputElementType<'a> = [u8];
89 type ExtraData = ();
90 type ExtraDataBuffer = ();
91 type ReadBuffer = ();
92 type ReadType<'a> = ();
93 type InitData = ();
94
95 type CheckpointData = ();
96
97 #[inline(always)]
98 fn new(_: ()) -> Self {
99 Self
100 }
101
102 #[inline(always)]
103 fn reset(&mut self) {}
104
105 #[inline(always)]
106 fn write_to(
107 &mut self,
108 element: &Self::InputElementType<'_>,
109 bucket: &mut Vec<u8>,
110 _extra_data: &Self::ExtraData,
111 _: &Self::ExtraDataBuffer,
112 ) {
113 bucket.write(element).unwrap();
114 }
115
116 fn read_from<'a, S: Read>(
117 &mut self,
118 _stream: S,
119 _read_buffer: &'a mut Self::ReadBuffer,
120 _: &mut Self::ExtraDataBuffer,
121 ) -> Option<Self::ReadType<'a>> {
122 unimplemented!("Cannot read slices of unknown size!")
123 }
124
125 #[inline(always)]
126 fn get_size(&self, element: &Self::InputElementType<'_>, _: &()) -> usize {
127 element.len()
128 }
129}