1use crate::block_handle::BlockHandle;
2use crate::compression::compress;
3use crate::error::Result;
4use crate::types::{ChecksumType, CompressionType, checksum_modifier_for_context};
5use byteorder::{LittleEndian, WriteBytesExt};
6
/// Tuning options handed to [`DataBlockBuilder::new`].
///
/// Values compare field by field, so two option sets can be checked for equality
/// in tests and configuration code.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DataBlockBuilderOptions {
    /// Number of entries written between restart points. At a restart point the
    /// builder stores the full key instead of a prefix-compressed one.
    pub restart_interval: usize,
    /// Optional target block size in bytes.
    ///
    /// NOTE(review): nothing in this file reads this field; presumably the caller
    /// uses it to decide when to cut a block — confirm.
    pub block_size_target: Option<usize>,
    /// Whether checksums are requested.
    ///
    /// NOTE(review): `DataBlockBuilder::finish` in this file appends a checksum
    /// without consulting this flag — confirm the intended consumer.
    pub enable_checksums: bool,
}

impl Default for DataBlockBuilderOptions {
    /// Restart point every 16 entries, no block size target, checksums flag off.
    fn default() -> Self {
        Self {
            restart_interval: 16,
            block_size_target: None,
            enable_checksums: false,
        }
    }
}

impl DataBlockBuilderOptions {
    /// Returns the options with `restart_interval` replaced.
    #[must_use]
    pub fn with_restart_interval(mut self, restart_interval: usize) -> Self {
        self.restart_interval = restart_interval;
        self
    }

    /// Returns the options with `block_size_target` set to `Some(size)`.
    #[must_use]
    pub fn with_block_size_target(mut self, size: usize) -> Self {
        self.block_size_target = Some(size);
        self
    }

    /// Returns the options with `enable_checksums` replaced.
    #[must_use]
    pub fn with_checksums(mut self, enable: bool) -> Self {
        self.enable_checksums = enable;
        self
    }
}
47
48pub struct DataBlockBuilder {
50 buffer: Vec<u8>,
51 restarts: Vec<u32>,
52 counter: usize,
53 options: DataBlockBuilderOptions,
54 last_key: Vec<u8>,
55 finished: bool,
56}
57
58impl DataBlockBuilder {
59 pub fn new(options: DataBlockBuilderOptions) -> Self {
61 let mut builder = DataBlockBuilder {
62 buffer: Vec::new(),
63 restarts: Vec::new(),
64 counter: 0,
65 options,
66 last_key: Vec::new(),
67 finished: false,
68 };
69
70 builder.restarts.push(0);
72 builder
73 }
74
75 pub fn add(&mut self, key: &[u8], value: &[u8]) {
76 assert!(!self.finished);
77 assert!(self.counter <= self.options.restart_interval);
78 assert!(self.buffer.len() < u32::MAX as usize);
79
80 let mut shared = 0;
81 if self.counter < self.options.restart_interval {
82 let min_len = std::cmp::min(self.last_key.len(), key.len());
84 while shared < min_len && self.last_key[shared] == key[shared] {
85 shared += 1;
86 }
87 } else {
88 self.restarts.push(self.buffer.len() as u32);
90 self.counter = 0;
91 }
92
93 let non_shared = key.len() - shared;
94
95 self.encode_varint(shared as u32);
97 self.encode_varint(non_shared as u32);
98 self.encode_varint(value.len() as u32);
99
100 self.buffer.extend_from_slice(&key[shared..]);
102
103 self.buffer.extend_from_slice(value);
105
106 self.last_key.clear();
108 self.last_key.extend_from_slice(key);
109 self.counter += 1;
110 }
111
112 pub fn finish(
113 &mut self,
114 compression_type: CompressionType,
115 checksum_type: ChecksumType,
116 file_offset: Option<u64>,
117 base_context_checksum: Option<u32>,
118 ) -> Result<Vec<u8>> {
119 if self.finished {
120 panic!("DataBlockBuilder already finished");
121 }
122 self.finished = true;
123
124 for restart in &self.restarts {
126 self.buffer.write_u32::<LittleEndian>(*restart).unwrap();
127 }
128
129 self.buffer
131 .write_u32::<LittleEndian>(self.restarts.len() as u32)
132 .unwrap();
133
134 let mut checksum_data = self.buffer.clone();
136 checksum_data.push(compression_type as u8);
137 let mut checksum = checksum_type.calculate(&checksum_data);
138
139 if let (Some(offset), Some(base_checksum)) = (file_offset, base_context_checksum) {
141 let modifier = checksum_modifier_for_context(base_checksum, offset);
142 checksum = checksum.wrapping_add(modifier);
143 }
144
145 if compression_type == CompressionType::None {
147 let mut result = self.buffer.clone();
148 result.push(compression_type as u8);
149 result.write_u32::<LittleEndian>(checksum).unwrap();
150 Ok(result)
151 } else {
152 let compressed_data = compress(&self.buffer, compression_type)?;
154
155 let mut compressed_checksum_data = compressed_data.clone();
157 compressed_checksum_data.push(compression_type as u8);
158 let mut compressed_checksum = checksum_type.calculate(&compressed_checksum_data);
159
160 if let (Some(offset), Some(base_checksum)) = (file_offset, base_context_checksum) {
162 let modifier = checksum_modifier_for_context(base_checksum, offset);
163 compressed_checksum = compressed_checksum.wrapping_add(modifier);
164 }
165
166 let mut result = compressed_data;
168 result.push(compression_type as u8);
169 result
170 .write_u32::<LittleEndian>(compressed_checksum)
171 .unwrap();
172
173 Ok(result)
174 }
175 }
176
177 pub fn reset(&mut self) {
178 self.buffer.clear();
179 self.restarts.clear();
180 self.restarts.push(0);
181 self.counter = 0;
182 self.last_key.clear();
183 self.finished = false;
184 }
185
186 pub fn empty(&self) -> bool {
187 self.buffer.is_empty()
188 }
189
190 pub fn size_estimate(&self) -> usize {
191 self.buffer.len() + 4 * self.restarts.len() + 4 + 5 }
193
194 fn encode_varint(&mut self, mut value: u32) {
195 while value >= 0x80 {
196 self.buffer.push((value & 0x7F) as u8 | 0x80);
197 value >>= 7;
198 }
199 self.buffer.push(value as u8);
200 }
201}
202
/// Incrementally builds a prefix-compressed index block whose values are
/// encoded block handles.
#[derive(Debug)]
pub struct IndexBlockBuilder {
    /// Encoded entries. `finish` appends the restart offsets and their count here.
    buffer: Vec<u8>,
    /// Offsets into `buffer` of entries that store their full key. The first
    /// element is always 0.
    restarts: Vec<u32>,
    /// Number of entries added since the most recent restart point.
    counter: usize,
    /// Number of entries written between restart points.
    restart_interval: usize,
    /// Copy of the most recently added key, used to compute the shared prefix.
    last_key: Vec<u8>,
    /// Set by `finish`.
    finished: bool,
}
212
213impl IndexBlockBuilder {
214 pub fn new(restart_interval: usize) -> Self {
215 let mut builder = IndexBlockBuilder {
216 buffer: Vec::new(),
217 restarts: Vec::new(),
218 counter: 0,
219 restart_interval,
220 last_key: Vec::new(),
221 finished: false,
222 };
223
224 builder.restarts.push(0);
226 builder
227 }
228
229 pub fn add_index_entry(&mut self, key: &[u8], block_handle: &BlockHandle) {
230 assert!(!self.finished);
231 assert!(self.counter <= self.restart_interval);
232 assert!(self.buffer.len() < u32::MAX as usize);
233
234 let mut shared = 0;
235 if self.counter < self.restart_interval {
236 let min_len = std::cmp::min(self.last_key.len(), key.len());
238 while shared < min_len && self.last_key[shared] == key[shared] {
239 shared += 1;
240 }
241 } else {
242 self.restarts.push(self.buffer.len() as u32);
244 self.counter = 0;
245 }
246
247 let non_shared = key.len() - shared;
248
249 let mut handle_data = Vec::new();
251 self.encode_varint_to(&mut handle_data, block_handle.offset as u32);
252 self.encode_varint_to(&mut handle_data, block_handle.size as u32);
253
254 self.encode_varint(shared as u32);
256 self.encode_varint(non_shared as u32);
257 self.encode_varint(handle_data.len() as u32);
258
259 self.buffer.extend_from_slice(&key[shared..]);
261
262 self.buffer.extend_from_slice(&handle_data);
264
265 self.last_key.clear();
267 self.last_key.extend_from_slice(key);
268 self.counter += 1;
269 }
270
271 pub fn finish(
272 &mut self,
273 compression_type: CompressionType,
274 checksum_type: ChecksumType,
275 file_offset: Option<u64>,
276 base_context_checksum: Option<u32>,
277 ) -> Result<Vec<u8>> {
278 if self.finished {
279 panic!("IndexBlockBuilder already finished");
280 }
281 self.finished = true;
282
283 for restart in &self.restarts {
285 self.buffer.write_u32::<LittleEndian>(*restart).unwrap();
286 }
287
288 self.buffer
290 .write_u32::<LittleEndian>(self.restarts.len() as u32)
291 .unwrap();
292
293 let mut checksum_data = self.buffer.clone();
295 checksum_data.push(compression_type as u8);
296 let mut checksum = checksum_type.calculate(&checksum_data);
297
298 if let (Some(offset), Some(base_checksum)) = (file_offset, base_context_checksum) {
300 let modifier = checksum_modifier_for_context(base_checksum, offset);
301 checksum = checksum.wrapping_add(modifier);
302 }
303
304 if compression_type == CompressionType::None {
306 let mut result = self.buffer.clone();
307 result.push(compression_type as u8);
308 result.write_u32::<LittleEndian>(checksum).unwrap();
309 Ok(result)
310 } else {
311 let compressed_data = compress(&self.buffer, compression_type)?;
313
314 let mut compressed_checksum_data = compressed_data.clone();
316 compressed_checksum_data.push(compression_type as u8);
317 let mut compressed_checksum = checksum_type.calculate(&compressed_checksum_data);
318
319 if let (Some(offset), Some(base_checksum)) = (file_offset, base_context_checksum) {
321 let modifier = checksum_modifier_for_context(base_checksum, offset);
322 compressed_checksum = compressed_checksum.wrapping_add(modifier);
323 }
324
325 let mut result = compressed_data;
327 result.push(compression_type as u8);
328 result
329 .write_u32::<LittleEndian>(compressed_checksum)
330 .unwrap();
331
332 Ok(result)
333 }
334 }
335
336 pub fn empty(&self) -> bool {
337 self.buffer.is_empty()
338 }
339
340 fn encode_varint(&mut self, mut value: u32) {
341 while value >= 0x80 {
342 self.buffer.push((value & 0x7F) as u8 | 0x80);
343 value >>= 7;
344 }
345 self.buffer.push(value as u8);
346 }
347
348 fn encode_varint_to(&self, buffer: &mut Vec<u8>, mut value: u32) {
349 while value >= 0x80 {
350 buffer.push((value & 0x7F) as u8 | 0x80);
351 value >>= 7;
352 }
353 buffer.push(value as u8);
354 }
355}
356
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{ChecksumType, CompressionType};

    /// Length of the trailer `finish` appends: compression-type byte + 4-byte checksum.
    const TRAILER_LEN: usize = 5;

    /// Splits a finished block into `(payload, compression type byte, checksum bytes)`.
    fn split_trailer(block: &[u8]) -> (&[u8], u8, &[u8]) {
        assert!(block.len() >= TRAILER_LEN, "block is shorter than its trailer");
        let (payload, trailer) = block.split_at(block.len() - TRAILER_LEN);
        (payload, trailer[0], &trailer[1..])
    }

    /// Uncompressed payload expected for ("key1","value1"), ("key2","value2") with
    /// a single restart point.
    fn two_entry_payload() -> Vec<u8> {
        let mut expected = Vec::new();
        expected.extend_from_slice(&[0, 4, 6]); // shared, non_shared, value_len
        expected.extend_from_slice(b"key1");
        expected.extend_from_slice(b"value1");
        expected.extend_from_slice(&[3, 1, 6]); // "key" is shared with the previous key
        expected.extend_from_slice(b"2");
        expected.extend_from_slice(b"value2");
        expected.extend_from_slice(&[0, 0, 0, 0]); // restart offset 0
        expected.extend_from_slice(&[1, 0, 0, 0]); // restart count
        expected
    }

    #[test]
    fn test_data_block_builder_simple() -> Result<()> {
        let mut builder =
            DataBlockBuilder::new(DataBlockBuilderOptions::default().with_restart_interval(16));

        builder.add(b"key1", b"value1");
        builder.add(b"key2", b"value2");
        let estimate = builder.size_estimate();

        let block_data = builder.finish(CompressionType::None, ChecksumType::CRC32c, None, None)?;
        assert_eq!(block_data.len(), estimate);

        let (payload, type_byte, checksum_bytes) = split_trailer(&block_data);
        assert_eq!(payload, &two_entry_payload()[..]);
        assert_eq!(type_byte, CompressionType::None as u8);

        // Without a context modifier the checksum covers payload + type byte.
        let checksummed = block_data[..block_data.len() - 4].to_vec();
        let checksum = ChecksumType::CRC32c.calculate(&checksummed);
        assert_eq!(checksum_bytes, &checksum.to_le_bytes()[..]);
        Ok(())
    }

    #[test]
    fn test_data_block_builder_restart_points() -> Result<()> {
        let mut builder =
            DataBlockBuilder::new(DataBlockBuilderOptions::default().with_restart_interval(2));

        builder.add(b"a1", b"v");
        builder.add(b"a2", b"v");
        builder.add(b"a3", b"v");

        let block_data = builder.finish(CompressionType::None, ChecksumType::CRC32c, None, None)?;

        let mut expected = Vec::new();
        expected.extend_from_slice(&[0, 2, 1]);
        expected.extend_from_slice(b"a1v");
        expected.extend_from_slice(&[1, 1, 1]);
        expected.extend_from_slice(b"2v");
        // Third entry begins a new restart point at offset 11 and stores its full key.
        expected.extend_from_slice(&[0, 2, 1]);
        expected.extend_from_slice(b"a3v");
        expected.extend_from_slice(&[0, 0, 0, 0]);
        expected.extend_from_slice(&[11, 0, 0, 0]);
        expected.extend_from_slice(&[2, 0, 0, 0]);

        let (payload, _, _) = split_trailer(&block_data);
        assert_eq!(payload, &expected[..]);
        Ok(())
    }

    #[test]
    fn test_data_block_builder_with_compression() -> Result<()> {
        let mut builder =
            DataBlockBuilder::new(DataBlockBuilderOptions::default().with_restart_interval(16));

        for i in 0..10 {
            let key = format!("key{:03}", i);
            let value = format!("value{:03}", i);
            builder.add(key.as_bytes(), value.as_bytes());
        }

        let compressed_block =
            builder.finish(CompressionType::Snappy, ChecksumType::CRC32c, None, None)?;
        assert!(!compressed_block.is_empty());

        let (_, type_byte, checksum_bytes) = split_trailer(&compressed_block);
        assert_eq!(type_byte, CompressionType::Snappy as u8);
        assert_eq!(checksum_bytes.len(), 4);
        Ok(())
    }

    #[test]
    fn test_index_block_builder() -> Result<()> {
        let mut builder = IndexBlockBuilder::new(16);
        assert!(builder.empty());

        let handle1 = BlockHandle {
            offset: 0,
            size: 100,
        };
        let handle2 = BlockHandle {
            offset: 100,
            size: 150,
        };

        builder.add_index_entry(b"key1", &handle1);
        builder.add_index_entry(b"key2", &handle2);
        assert!(!builder.empty());

        let block_data = builder.finish(CompressionType::None, ChecksumType::CRC32c, None, None)?;

        let mut expected = Vec::new();
        expected.extend_from_slice(&[0, 4, 2]); // shared, non_shared, handle_len
        expected.extend_from_slice(b"key1");
        expected.extend_from_slice(&[0x00, 0x64]); // varint(0), varint(100)
        expected.extend_from_slice(&[3, 1, 3]);
        expected.extend_from_slice(b"2");
        expected.extend_from_slice(&[0x64, 0x96, 0x01]); // varint(100), varint(150)
        expected.extend_from_slice(&[0, 0, 0, 0]); // restart offset 0
        expected.extend_from_slice(&[1, 0, 0, 0]); // restart count

        let (payload, type_byte, _) = split_trailer(&block_data);
        assert_eq!(payload, &expected[..]);
        assert_eq!(type_byte, CompressionType::None as u8);
        Ok(())
    }

    #[test]
    fn test_data_block_builder_empty() -> Result<()> {
        let builder =
            DataBlockBuilder::new(DataBlockBuilderOptions::default().with_restart_interval(16));
        assert!(builder.empty());
        // One restart offset (4) + restart count (4) + trailer (5).
        assert_eq!(builder.size_estimate(), 13);
        Ok(())
    }

    #[test]
    fn test_data_block_builder_reset() -> Result<()> {
        let mut builder =
            DataBlockBuilder::new(DataBlockBuilderOptions::default().with_restart_interval(16));
        builder.add(b"key1", b"value1");
        assert!(!builder.empty());

        builder.reset();
        assert!(builder.empty());

        // A reset builder must be reusable, including after a previous `finish`.
        builder.add(b"key1", b"value1");
        builder.add(b"key2", b"value2");
        let first = builder.finish(CompressionType::None, ChecksumType::CRC32c, None, None)?;

        builder.reset();
        assert!(builder.empty());
        builder.add(b"key1", b"value1");
        builder.add(b"key2", b"value2");
        let second = builder.finish(CompressionType::None, ChecksumType::CRC32c, None, None)?;

        assert_eq!(first, second);
        Ok(())
    }

    #[test]
    #[should_panic(expected = "DataBlockBuilder already finished")]
    fn test_data_block_builder_finish_twice_panics() {
        let mut builder = DataBlockBuilder::new(DataBlockBuilderOptions::default());
        let _ = builder.finish(CompressionType::None, ChecksumType::CRC32c, None, None);
        let _ = builder.finish(CompressionType::None, ChecksumType::CRC32c, None, None);
    }
}