parallel_processor/buckets/
bucket_writer.rs

1use std::{
2    io::{Read, Write},
3    marker::PhantomData,
4};
5
6use serde::{de::DeserializeOwned, Serialize};
7
/// Describes how items are encoded into, and decoded from, the raw bytes of a bucket.
///
/// An implementor selects the element type it accepts, the buffers it decodes into
/// and any auxiliary data through the associated types below.
pub trait BucketItemSerializer {
    /// Type of the elements accepted by [`write_to`](Self::write_to) and
    /// [`get_size`](Self::get_size); it may be unsized (for example `[u8]`).
    type InputElementType<'a>: ?Sized;
    /// Auxiliary data passed by reference to `write_to` and `get_size`.
    type ExtraData;
    /// Caller-owned buffer handed to `read_from`; the returned value may borrow from it.
    type ReadBuffer;
    /// Auxiliary buffer passed to `write_to` (shared) and to `read_from` (mutable).
    type ExtraDataBuffer;
    /// Value produced by `read_from`; it may borrow from the read buffer for `'a`.
    type ReadType<'a>;
    /// Argument consumed by [`new`](Self::new) to build the serializer.
    type InitData;

    /// Serializable, owned data associated with the serializer.
    ///
    /// NOTE(review): presumably state that is saved to and restored from checkpoints;
    /// how it is used is not visible in this file, confirm against the callers.
    type CheckpointData: Serialize + DeserializeOwned + 'static;

    /// Creates a new serializer instance from `init_data`.
    fn new(init_data: Self::InitData) -> Self;
    /// Resets the serializer; to be called when the data is not continuous.
    fn reset(&mut self);

    /// Encodes `element` into `bucket`.
    ///
    /// The implementations in this file append the raw bytes of `element` to the end
    /// of `bucket` and ignore `extra_data` and `extra_read_buffer`.
    fn write_to(
        &mut self,
        element: &Self::InputElementType<'_>,
        bucket: &mut Vec<u8>,
        extra_data: &Self::ExtraData,
        extra_read_buffer: &Self::ExtraDataBuffer,
    );
    /// Decodes the next element from `stream`, using `read_buffer` as backing storage.
    ///
    /// Returns `None` when no element could be decoded; the result may borrow from
    /// `read_buffer` for the lifetime `'a`.
    fn read_from<'a, S: Read>(
        &mut self,
        stream: S,
        read_buffer: &'a mut Self::ReadBuffer,
        extra_read_buffer: &mut Self::ExtraDataBuffer,
    ) -> Option<Self::ReadType<'a>>;

    /// Returns the size of `element`.
    ///
    /// NOTE(review): presumably the number of bytes `write_to` appends for this
    /// element; the implementations in this file return its length in bytes.
    fn get_size(&self, element: &Self::InputElementType<'_>, extra: &Self::ExtraData) -> usize;
}
39
40pub struct BytesArraySerializer<const SIZE: usize>(PhantomData<[(); SIZE]>);
41impl<const SIZE: usize> BucketItemSerializer for BytesArraySerializer<SIZE> {
42    type InputElementType<'a> = [u8; SIZE];
43    type ExtraData = ();
44    type ExtraDataBuffer = ();
45    type ReadBuffer = [u8; SIZE];
46    type ReadType<'a> = &'a [u8; SIZE];
47    type InitData = ();
48
49    type CheckpointData = ();
50
51    #[inline(always)]
52    fn new(_: ()) -> Self {
53        Self(PhantomData)
54    }
55
56    #[inline(always)]
57    fn reset(&mut self) {}
58
59    #[inline(always)]
60    fn write_to(
61        &mut self,
62        element: &Self::InputElementType<'_>,
63        bucket: &mut Vec<u8>,
64        _: &Self::ExtraData,
65        _: &Self::ExtraDataBuffer,
66    ) {
67        bucket.write(element).unwrap();
68    }
69
70    fn read_from<'a, S: Read>(
71        &mut self,
72        mut stream: S,
73        read_buffer: &'a mut Self::ReadBuffer,
74        _: &mut Self::ExtraDataBuffer,
75    ) -> Option<Self::ReadType<'a>> {
76        stream.read_exact(read_buffer).ok()?;
77        Some(read_buffer)
78    }
79
80    #[inline(always)]
81    fn get_size(&self, element: &Self::InputElementType<'_>, _: &()) -> usize {
82        element.len()
83    }
84}
85
86pub struct BytesSliceSerializer;
87impl BucketItemSerializer for BytesSliceSerializer {
88    type InputElementType<'a> = [u8];
89    type ExtraData = ();
90    type ExtraDataBuffer = ();
91    type ReadBuffer = ();
92    type ReadType<'a> = ();
93    type InitData = ();
94
95    type CheckpointData = ();
96
97    #[inline(always)]
98    fn new(_: ()) -> Self {
99        Self
100    }
101
102    #[inline(always)]
103    fn reset(&mut self) {}
104
105    #[inline(always)]
106    fn write_to(
107        &mut self,
108        element: &Self::InputElementType<'_>,
109        bucket: &mut Vec<u8>,
110        _extra_data: &Self::ExtraData,
111        _: &Self::ExtraDataBuffer,
112    ) {
113        bucket.write(element).unwrap();
114    }
115
116    fn read_from<'a, S: Read>(
117        &mut self,
118        _stream: S,
119        _read_buffer: &'a mut Self::ReadBuffer,
120        _: &mut Self::ExtraDataBuffer,
121    ) -> Option<Self::ReadType<'a>> {
122        unimplemented!("Cannot read slices of unknown size!")
123    }
124
125    #[inline(always)]
126    fn get_size(&self, element: &Self::InputElementType<'_>, _: &()) -> usize {
127        element.len()
128    }
129}