1use crate::compression::decompress;
2use crate::error::{Error, Result};
3use crate::types::CompressionType;
4use byteorder::{LittleEndian, ReadBytesExt};
5use std::io::Cursor;
6
/// A decoded data block: the entry bytes plus the restart array parsed from
/// the block footer.
///
/// Instances are built by `DataBlock::new`, which decompresses the input and
/// validates the footer before populating these fields.
#[derive(Debug, Clone)]
pub struct DataBlock {
    /// Decompressed block bytes. When the decompressed input is at least five
    /// bytes long its last five bytes are dropped before being stored here.
    data: Vec<u8>,
    /// Byte offset into `data` at which the restart array begins; entries
    /// occupy `data[..restart_offset]`.
    restart_offset: usize,
    /// Number of restart points, read from the last four bytes of `data`
    /// as a little-endian `u32`.
    num_restarts: u32,
    /// Restart offsets read from the restart array, in stored order.
    restart_points: Vec<u32>,
}
13
/// A single decoded entry of a data block: the fully reconstructed key and
/// its associated value, both as owned byte vectors.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KeyValue {
    /// Complete key bytes (shared prefix and unshared suffix already joined).
    pub key: Vec<u8>,
    /// Value bytes stored alongside the key.
    pub value: Vec<u8>,
}
18
19impl DataBlock {
20 pub fn new(compressed_data: &[u8], compression_type: CompressionType) -> Result<Self> {
21 let raw_data = decompress(compressed_data, compression_type)?;
22
23 let data = if raw_data.len() >= 5 {
25 raw_data[..raw_data.len() - 5].to_vec()
26 } else {
27 raw_data
28 };
29
30 if data.len() < 4 {
31 return Err(Error::InvalidBlockFormat(
32 "Block too small to contain restart info".to_string(),
33 ));
34 }
35
36 let mut cursor = Cursor::new(&data);
37 cursor.set_position((data.len() - 4) as u64);
38 let num_restarts = cursor.read_u32::<LittleEndian>()?;
39
40 if num_restarts == 0 {
41 return Err(Error::InvalidBlockFormat("No restart points".to_string()));
42 }
43
44 if data.len() < 4 + (num_restarts as usize * 4) {
45 return Err(Error::InvalidBlockFormat(
46 "Data block too small to contain restart points".to_string(),
47 ));
48 }
49
50 let restart_offset = data.len() - 4 - (num_restarts as usize * 4);
51 if restart_offset >= data.len() {
52 return Err(Error::InvalidBlockFormat(
53 "Invalid restart offset".to_string(),
54 ));
55 }
56
57 let mut restart_points = Vec::with_capacity(num_restarts as usize);
58 cursor.set_position(restart_offset as u64);
59
60 for _ in 0..num_restarts {
61 restart_points.push(cursor.read_u32::<LittleEndian>()?);
62 }
63
64 Ok(DataBlock {
65 data,
66 restart_offset,
67 num_restarts,
68 restart_points,
69 })
70 }
71
72 pub fn get_entries(&self) -> Result<Vec<KeyValue>> {
73 let mut entries = Vec::new();
74 let mut cursor = Cursor::new(&self.data);
75 let mut last_key = Vec::new();
76
77 while (cursor.position() as usize) < self.restart_offset {
78 let entry_start = cursor.position();
79
80 if self.is_restart_point(entry_start as u32) {
83 last_key.clear();
84 }
85
86 let shared_key_len = self.read_varint(&mut cursor)?;
87 let unshared_key_len = self.read_varint(&mut cursor)?;
88 let value_len = self.read_varint(&mut cursor)?;
89
90 if shared_key_len > last_key.len() as u32 {
91 return Err(Error::InvalidBlockFormat(
92 "Shared key length exceeds previous key length".to_string(),
93 ));
94 }
95
96 let mut key = Vec::new();
97 key.extend_from_slice(&last_key[..shared_key_len as usize]);
98
99 if unshared_key_len > 0 {
100 let pos = cursor.position() as usize;
101 if pos + unshared_key_len as usize > self.data.len() {
102 return Err(Error::InvalidBlockFormat(
103 "Key extends beyond block".to_string(),
104 ));
105 }
106 key.extend_from_slice(&self.data[pos..pos + unshared_key_len as usize]);
107 cursor.set_position((pos + unshared_key_len as usize) as u64);
108 }
109
110 let mut value = Vec::new();
111 if value_len > 0 {
112 let pos = cursor.position() as usize;
113 if pos + value_len as usize > self.data.len() {
114 return Err(Error::InvalidBlockFormat(
115 "Value extends beyond block".to_string(),
116 ));
117 }
118 value.extend_from_slice(&self.data[pos..pos + value_len as usize]);
119 cursor.set_position((pos + value_len as usize) as u64);
120 }
121
122 last_key = key.clone();
123 entries.push(KeyValue { key, value });
124 }
125
126 Ok(entries)
127 }
128
129 fn read_varint(&self, cursor: &mut Cursor<&Vec<u8>>) -> Result<u32> {
130 let mut result = 0u32;
131 let mut shift = 0;
132
133 loop {
134 if (cursor.position() as usize) >= self.data.len() {
135 return Err(Error::InvalidVarint);
136 }
137
138 let byte = self.data[cursor.position() as usize];
139 cursor.set_position(cursor.position() + 1);
140
141 result |= ((byte & 0x7F) as u32) << shift;
142
143 if (byte & 0x80) == 0 {
144 break;
145 }
146
147 shift += 7;
148 if shift >= 32 {
149 return Err(Error::InvalidVarint);
150 }
151 }
152
153 Ok(result)
154 }
155
156 fn is_restart_point(&self, offset: u32) -> bool {
157 self.restart_points.contains(&offset)
158 }
159
160 pub fn num_entries(&self) -> usize {
161 match self.get_entries() {
162 Ok(entries) => entries.len(),
163 Err(_) => 0,
164 }
165 }
166
167 pub fn get_restart_points(&self) -> &[u32] {
168 &self.restart_points
169 }
170}
171
172pub struct DataBlockReader {
173 block: DataBlock,
174 current_entry: usize,
175 entries: Vec<KeyValue>,
176}
177
178impl DataBlockReader {
179 pub fn new(compressed_data: &[u8], compression_type: CompressionType) -> Result<Self> {
180 let block = DataBlock::new(compressed_data, compression_type)?;
181 let entries = block.get_entries()?;
182
183 Ok(DataBlockReader {
184 block,
185 current_entry: 0,
186 entries,
187 })
188 }
189
190 pub fn seek_to_first(&mut self) {
191 self.current_entry = 0;
192 }
193
194 pub fn next(&mut self) -> Option<&KeyValue> {
195 if self.current_entry < self.entries.len() {
196 let entry = &self.entries[self.current_entry];
197 self.current_entry += 1;
198 Some(entry)
199 } else {
200 None
201 }
202 }
203
204 pub fn valid(&self) -> bool {
205 self.current_entry < self.entries.len()
206 }
207
208 pub fn key(&self) -> Option<&[u8]> {
209 if self.current_entry > 0 && self.current_entry <= self.entries.len() {
210 Some(&self.entries[self.current_entry - 1].key)
211 } else {
212 None
213 }
214 }
215
216 pub fn value(&self) -> Option<&[u8]> {
217 if self.current_entry > 0 && self.current_entry <= self.entries.len() {
218 Some(&self.entries[self.current_entry - 1].value)
219 } else {
220 None
221 }
222 }
223
224 pub fn seek(&mut self, target_key: &[u8]) -> bool {
225 for (i, entry) in self.entries.iter().enumerate() {
226 if entry.key.as_slice() >= target_key {
227 self.current_entry = i;
228 return true;
229 }
230 }
231 self.current_entry = self.entries.len();
232 false
233 }
234
235 pub fn entries(&self) -> &[KeyValue] {
236 &self.entries
237 }
238}
239
#[cfg(test)]
mod tests {
    use super::*;
    use crate::block_builder::{DataBlockBuilder, DataBlockBuilderOptions};
    use crate::types::{ChecksumType, CompressionType};

    /// Turns string pairs into owned `(key, value)` byte vectors.
    fn byte_pairs(pairs: &[(&str, &str)]) -> Vec<(Vec<u8>, Vec<u8>)> {
        pairs
            .iter()
            .map(|(key, value)| (key.as_bytes().to_vec(), value.as_bytes().to_vec()))
            .collect()
    }

    /// Builds an uncompressed block with restart interval 4 and checks that
    /// `DataBlock::get_entries` returns exactly what was written.
    #[test]
    fn test_data_block_basic_roundtrip() -> Result<()> {
        let test_data = byte_pairs(&[
            ("key001", "value001"),
            ("key002", "value002"),
            ("key003", "value003"),
            ("key004", "value004"),
            ("key005", "value005"),
        ]);

        let mut builder =
            DataBlockBuilder::new(DataBlockBuilderOptions::default().with_restart_interval(4));
        for pair in &test_data {
            builder.add(&pair.0, &pair.1);
        }
        let block_bytes =
            builder.finish(CompressionType::None, ChecksumType::CRC32c, None, None)?;

        let block = DataBlock::new(&block_bytes, CompressionType::None)?;
        let entries = block.get_entries()?;

        assert_eq!(entries.len(), test_data.len());
        for (i, (entry, expected)) in entries.iter().zip(&test_data).enumerate() {
            assert_eq!(entry.key, expected.0, "Key mismatch at index {}", i);
            assert_eq!(entry.value, expected.1, "Value mismatch at index {}", i);
        }

        Ok(())
    }

    /// Builds a block with restart interval 16 and reads it back through
    /// `DataBlockReader::next`.
    #[test]
    fn test_data_block_roundtrip_with_reader() -> Result<()> {
        let test_data = byte_pairs(&[
            ("apple", "fruit"),
            ("banana", "yellow"),
            ("carrot", "vegetable"),
            ("date", "sweet"),
        ]);

        let mut builder =
            DataBlockBuilder::new(DataBlockBuilderOptions::default().with_restart_interval(16));
        for pair in &test_data {
            builder.add(&pair.0, &pair.1);
        }
        let block_bytes =
            builder.finish(CompressionType::None, ChecksumType::CRC32c, None, None)?;

        let mut reader = DataBlockReader::new(&block_bytes, CompressionType::None)?;
        reader.seek_to_first();

        let mut read_entries: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
        while let Some(entry) = reader.next() {
            read_entries.push((entry.key.clone(), entry.value.clone()));
        }

        assert_eq!(read_entries.len(), test_data.len());
        for (i, (actual, expected)) in read_entries.iter().zip(&test_data).enumerate() {
            assert_eq!(&actual.0, &expected.0, "Key mismatch at index {}", i);
            assert_eq!(&actual.1, &expected.1, "Value mismatch at index {}", i);
        }

        Ok(())
    }

    /// Builds a six-entry block with restart interval 2 and checks both the
    /// decoded entries and that at least three restart points were recorded.
    #[test]
    fn test_data_block_roundtrip_with_restarts() -> Result<()> {
        let test_data = byte_pairs(&[
            ("a", "1"),
            ("b", "2"),
            ("c", "3"),
            ("d", "4"),
            ("e", "5"),
            ("f", "6"),
        ]);

        let mut builder =
            DataBlockBuilder::new(DataBlockBuilderOptions::default().with_restart_interval(2));
        for pair in &test_data {
            builder.add(&pair.0, &pair.1);
        }
        let block_bytes =
            builder.finish(CompressionType::None, ChecksumType::CRC32c, None, None)?;

        let block = DataBlock::new(&block_bytes, CompressionType::None)?;
        let entries = block.get_entries()?;

        assert_eq!(entries.len(), test_data.len());
        for (entry, expected) in entries.iter().zip(&test_data) {
            assert_eq!(entry.key, expected.0);
            assert_eq!(entry.value, expected.1);
        }

        let restart_points = block.get_restart_points();
        assert!(
            restart_points.len() >= 3,
            "Expected at least 3 restart points, got {}",
            restart_points.len()
        );

        Ok(())
    }
}