// rskafka/protocol/vec_builder.rs
/// Default size of a single block in bytes (10 MiB).
pub const DEFAULT_BLOCK_SIZE: usize = 1024 * 1024 * 10;

/// Incrementally builds a `Vec<T>` whose final length is announced up front.
///
/// Elements are collected in a list of blocks. Capacity is only ever requested for one block at
/// a time, no matter how large the announced length is. Convert the builder into the final
/// vector with `Vec::from` / `.into()`.
#[derive(Debug)]
pub struct VecBuilder<T> {
    /// Maximum number of elements stored in a single block; always at least 1.
    elements_per_block: usize,
    /// Blocks collected so far; never empty.
    blocks: Vec<Vec<T>>,
    /// Number of elements that may still be added before the announced length is reached.
    remaining_elements: usize,
}

impl<T> VecBuilder<T> {
    /// Creates a builder for `expected_elements` elements using [`DEFAULT_BLOCK_SIZE`].
    ///
    /// # Panics
    ///
    /// Panics if a single `T` is larger than [`DEFAULT_BLOCK_SIZE`] bytes.
    pub fn new(expected_elements: usize) -> Self {
        Self::new_with_block_size(expected_elements, DEFAULT_BLOCK_SIZE)
    }

    /// Creates a builder for `expected_elements` elements with blocks of `block_size` bytes.
    ///
    /// Each block holds `block_size / size_of::<T>()` elements. Zero-sized types are not limited
    /// by `block_size` and always end up in a single block.
    ///
    /// # Panics
    ///
    /// Panics if `T` is not zero-sized and `block_size` is smaller than `size_of::<T>()`.
    pub fn new_with_block_size(expected_elements: usize, block_size: usize) -> Self {
        let elements_per_block = match std::mem::size_of::<T>() {
            // Zero-sized types occupy no memory, so one block can hold everything. `max(1)`
            // keeps the "at least one element per block" invariant when nothing is expected;
            // without it an empty builder of a ZST would trip the assertion below.
            0 => expected_elements.max(1),
            element_size => block_size / element_size,
        };
        assert!(
            elements_per_block > 0,
            "Block size too small for this type!"
        );

        Self {
            elements_per_block,
            // Never reserve more than one block, even if many more elements are announced.
            blocks: vec![Vec::with_capacity(
                elements_per_block.min(expected_elements),
            )],
            remaining_elements: expected_elements,
        }
    }

    /// Appends `element` to the builder.
    ///
    /// # Panics
    ///
    /// Panics if the announced number of elements has already been added.
    pub fn push(&mut self, element: T) {
        assert!(
            self.remaining_elements > 0,
            "Got more elements than expected!"
        );

        let mut target_block = self.blocks.last_mut().expect("Has always at least 1 block");
        if target_block.len() >= self.elements_per_block {
            // The current block is full: open a new one, sized for what is still expected but
            // capped at one block.
            self.blocks.push(Vec::with_capacity(
                self.remaining_elements.min(self.elements_per_block),
            ));
            target_block = self.blocks.last_mut().expect("Just pushed a new block");
        }
        target_block.push(element);
        self.remaining_elements -= 1;
    }
}
71
72impl VecBuilder<u8> {
73 pub fn read_exact<R>(mut self, reader: &mut R) -> Result<Self, std::io::Error>
75 where
76 R: std::io::Read,
77 {
78 while self.remaining_elements > 0 {
82 let mut buf = self.blocks.last_mut().expect("Has always at least 1 block");
83 if buf.len() >= self.elements_per_block {
84 self.blocks.push(Vec::with_capacity(
85 self.remaining_elements.min(self.elements_per_block),
86 ));
87 buf = self.blocks.last_mut().expect("Just pushed a new block");
88 }
89
90 let to_read = self
91 .remaining_elements
92 .min(self.elements_per_block - buf.len());
93
94 let buf_start_pos = buf.len();
95 buf.resize(buf.len() + to_read, 0);
96
97 reader.read_exact(&mut buf[buf_start_pos..])?;
98 self.remaining_elements -= to_read;
99 }
100
101 Ok(self)
102 }
103}
104
105impl<T> From<VecBuilder<T>> for Vec<T> {
106 fn from(builder: VecBuilder<T>) -> Self {
107 if builder.blocks.len() == 1 {
108 builder
109 .blocks
110 .into_iter()
111 .next()
112 .expect("Just checked number of blocks")
113 } else {
114 let mut out = Self::with_capacity(builder.blocks.iter().map(Self::len).sum());
115 for mut block in builder.blocks.into_iter() {
116 out.append(&mut block);
117 }
118 out
119 }
120 }
121}
122
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    /// Pushing more elements than fit into one block spreads them over several blocks.
    #[test]
    fn test_build() {
        let expected: Vec<i16> = (0..7).collect();

        let mut builder = VecBuilder::<i16>::new_with_block_size(expected.len(), 5);
        for &value in &expected {
            builder.push(value);
        }

        assert_eq!(
            builder.blocks,
            vec![vec![0, 1], vec![2, 3], vec![4, 5], vec![6]]
        );
        assert_eq!(Vec::from(builder), expected);
    }

    /// Zero-sized elements all end up in a single block, regardless of the block size.
    #[test]
    fn test_zst() {
        let expected = vec![(); 3];

        let mut builder = VecBuilder::<()>::new_with_block_size(expected.len(), 1);
        (0..expected.len()).for_each(|_| builder.push(()));

        assert_eq!(builder.blocks, vec![vec![(), (), ()]]);
        assert_eq!(Vec::from(builder), expected);
    }

    /// Reading from a reader fills the blocks in order.
    #[test]
    fn test_reader() {
        let data = b"abc".to_vec();
        let mut reader = Cursor::new(data.clone());

        let builder = VecBuilder::<u8>::new_with_block_size(data.len(), 2)
            .read_exact(&mut reader)
            .unwrap();

        assert_eq!(builder.blocks, vec![b"ab".to_vec(), b"c".to_vec()]);
        assert_eq!(Vec::from(builder), data);
    }

    /// Reading continues in the block that a previous `push` left partially filled.
    #[test]
    fn test_push_reader_interaction() {
        let data = b"bc".to_vec();
        let mut reader = Cursor::new(data.clone());

        let mut builder = VecBuilder::<u8>::new_with_block_size(data.len() + 1, 2);
        builder.push(b'a');
        let builder = builder.read_exact(&mut reader).unwrap();

        assert_eq!(builder.blocks, vec![b"ab".to_vec(), b"c".to_vec()]);

        let actual = Vec::from(builder);
        assert_eq!(&actual, b"abc");
    }

    /// A builder with a single block hands out that allocation instead of copying it.
    #[test]
    fn test_single_block_not_copied() {
        let expected: Vec<i16> = (0..5).collect();

        let mut builder = VecBuilder::<i16>::new_with_block_size(expected.len(), 10);
        for &value in &expected {
            builder.push(value);
        }

        assert_eq!(builder.blocks, vec![vec![0, 1, 2, 3, 4]]);

        let addr = builder.blocks[0].as_ptr();

        let actual = Vec::from(builder);
        assert_eq!(actual, expected);
        assert_eq!(actual.as_ptr(), addr);
    }

    /// A block size smaller than one element is rejected.
    #[test]
    #[should_panic(expected = "Block size too small for this type!")]
    fn test_panic_block_size_too_small() {
        VecBuilder::<i16>::new_with_block_size(0, 1);
    }

    /// Pushing beyond the announced number of elements is rejected.
    #[test]
    #[should_panic(expected = "Got more elements than expected!")]
    fn test_panic_too_many_elements() {
        let mut builder = VecBuilder::<i8>::new_with_block_size(0, 1);
        builder.push(1);
    }
}