cqlite_core/storage/sstable/
chunk_reader.rs1use crate::storage::sstable::compression_info::CompressionInfo;
9use crate::{Error, Result};
10use std::io::{Read, Seek, SeekFrom};
11
12pub struct ChunkReader<R: Read + Seek> {
20 reader: R,
21 compression_info: CompressionInfo,
22 total_file_size: u64,
23}
24
25impl<R: Read + Seek> ChunkReader<R> {
26 pub fn new(reader: R, compression_info: CompressionInfo, total_file_size: u64) -> Self {
38 Self {
39 reader,
40 compression_info,
41 total_file_size,
42 }
43 }
44
45 pub fn read_chunk(&mut self, chunk_index: usize) -> Result<Vec<u8>> {
68 let offset = self
70 .compression_info
71 .compressed_chunk_offset(chunk_index)
72 .ok_or_else(|| {
73 Error::InvalidFormat(format!(
74 "Chunk {} not found in CompressionInfo (total chunks: {})",
75 chunk_index,
76 self.compression_info.chunk_offsets.len()
77 ))
78 })?;
79
80 let total_chunk_size = self
83 .compression_info
84 .compressed_chunk_size(chunk_index, self.total_file_size)
85 .ok_or_else(|| {
86 Error::InvalidFormat(format!(
87 "Cannot determine size for chunk {} (file_size={})",
88 chunk_index, self.total_file_size
89 ))
90 })?;
91
92 self.reader.seek(SeekFrom::Start(offset)).map_err(|e| {
94 Error::Io(std::io::Error::new(
95 e.kind(),
96 format!(
97 "Failed to seek to chunk {} at offset 0x{:x}: {}",
98 chunk_index, offset, e
99 ),
100 ))
101 })?;
102
103 if total_chunk_size < 4 {
106 return Err(Error::InvalidFormat(format!(
107 "Chunk {} size too small: {} bytes (minimum 4 for CRC)",
108 chunk_index, total_chunk_size
109 )));
110 }
111
112 let chunk_size = (total_chunk_size - 4) as usize;
113 let mut chunk_data = vec![0u8; chunk_size];
114 self.reader.read_exact(&mut chunk_data).map_err(|e| {
115 Error::Io(std::io::Error::new(
116 e.kind(),
117 format!(
118 "Failed to read chunk {} data ({} bytes at offset 0x{:x}): {}",
119 chunk_index, chunk_size, offset, e
120 ),
121 ))
122 })?;
123
124 let mut crc_bytes = [0u8; 4];
126 self.reader.read_exact(&mut crc_bytes).map_err(|e| {
127 Error::Io(std::io::Error::new(
128 e.kind(),
129 format!(
130 "Failed to read CRC32 for chunk {} at offset 0x{:x}: {}",
131 chunk_index,
132 offset + chunk_size as u64,
133 e
134 ),
135 ))
136 })?;
137 let expected_crc = u32::from_be_bytes(crc_bytes);
138
139 let computed_crc = crc32fast::hash(&chunk_data);
141
142 if computed_crc != expected_crc {
144 return Err(Error::InvalidFormat(format!(
145 "CRC32 mismatch for chunk {} at offset 0x{:x}: expected=0x{:08x}, computed=0x{:08x}, chunk_size={}",
146 chunk_index, offset, expected_crc, computed_crc, chunk_size
147 )));
148 }
149
150 Ok(chunk_data)
151 }
152
153 pub fn read_all_chunks(&mut self) -> Result<Vec<Vec<u8>>> {
165 let chunk_count = self.compression_info.chunk_offsets.len();
166 let mut chunks = Vec::with_capacity(chunk_count);
167
168 for i in 0..chunk_count {
169 let chunk = self.read_chunk(i)?;
170 chunks.push(chunk);
171 }
172
173 Ok(chunks)
174 }
175
176 pub fn chunk_count(&self) -> usize {
178 self.compression_info.chunk_offsets.len()
179 }
180
181 pub fn compression_algorithm(&self) -> &str {
183 &self.compression_info.algorithm
184 }
185
186 pub fn chunk_length(&self) -> u32 {
188 self.compression_info.chunk_length
189 }
190}
191
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Appends `payload` followed by its big-endian CRC32 (the chunk layout
    /// `ChunkReader` expects) to `out` and returns the payload's offset.
    fn push_chunk(out: &mut Vec<u8>, payload: &[u8]) -> u64 {
        let offset = out.len() as u64;
        out.extend_from_slice(payload);
        out.extend_from_slice(&crc32fast::hash(payload).to_be_bytes());
        offset
    }

    /// Builds a `CompressionInfo` with no options and the maximum compressed
    /// length, varying only the fields these tests care about.
    fn compression_info(
        algorithm: &str,
        chunk_length: u32,
        data_length: u64,
        chunk_offsets: Vec<u64>,
    ) -> CompressionInfo {
        CompressionInfo {
            algorithm: algorithm.to_string(),
            chunk_length,
            data_length,
            chunk_offsets,
            option_pairs: vec![],
            max_compressed_length: i32::MAX as u32,
        }
    }

    /// Creates a reader over an in-memory buffer whose reported file size is
    /// exactly the buffer length.
    fn reader_over(data: Vec<u8>, info: CompressionInfo) -> ChunkReader<Cursor<Vec<u8>>> {
        let total_size = data.len() as u64;
        ChunkReader::new(Cursor::new(data), info, total_size)
    }

    #[test]
    fn test_read_chunk_with_valid_crc() {
        let payload = b"test compressed chunk data";
        let mut data = Vec::new();
        let offset = push_chunk(&mut data, payload);
        let info = compression_info("LZ4Compressor", 16384, payload.len() as u64, vec![offset]);
        let mut reader = reader_over(data, info);

        let chunk = reader
            .read_chunk(0)
            .expect("chunk with a valid CRC should be readable");
        assert_eq!(chunk, payload);
    }

    #[test]
    fn test_read_chunk_with_invalid_crc() {
        let payload = b"test compressed chunk data";
        let mut data = payload.to_vec();
        data.extend_from_slice(&0xDEAD_BEEFu32.to_be_bytes());
        let info = compression_info("LZ4Compressor", 16384, payload.len() as u64, vec![0]);
        let mut reader = reader_over(data, info);

        let err_msg = reader
            .read_chunk(0)
            .expect_err("a corrupted CRC must be rejected")
            .to_string();
        assert!(err_msg.contains("CRC32 mismatch"), "unexpected error: {}", err_msg);
        assert!(err_msg.contains("0xdeadbeef"), "unexpected error: {}", err_msg);
    }

    #[test]
    fn test_read_multiple_chunks() {
        let first: &[u8] = b"first chunk data";
        let second: &[u8] = b"second chunk data with more content";
        let mut data = Vec::new();
        let offsets = vec![push_chunk(&mut data, first), push_chunk(&mut data, second)];
        let info = compression_info(
            "SnappyCompressor",
            16384,
            (first.len() + second.len()) as u64,
            offsets,
        );
        let mut reader = reader_over(data, info);

        assert_eq!(reader.read_chunk(0).expect("first chunk should be readable"), first);
        assert_eq!(reader.read_chunk(1).expect("second chunk should be readable"), second);
    }

    #[test]
    fn test_read_all_chunks() {
        let payloads: [&[u8]; 3] = [b"chunk1", b"chunk2data", b"c3"];
        let mut data = Vec::new();
        let offsets: Vec<u64> = payloads
            .iter()
            .map(|payload| push_chunk(&mut data, payload))
            .collect();
        let data_length = payloads.iter().map(|p| p.len()).sum::<usize>() as u64;
        let info = compression_info("LZ4Compressor", 16384, data_length, offsets);
        let mut reader = reader_over(data, info);

        let all_chunks = reader
            .read_all_chunks()
            .expect("all chunks should be readable");
        assert_eq!(all_chunks.len(), payloads.len());
        for (chunk, payload) in all_chunks.iter().zip(payloads.iter()) {
            assert_eq!(chunk.as_slice(), *payload);
        }
    }

    #[test]
    fn test_invalid_chunk_index() {
        let payload = b"test data";
        let mut data = Vec::new();
        push_chunk(&mut data, payload);
        let info = compression_info("LZ4Compressor", 16384, payload.len() as u64, vec![0]);
        let mut reader = reader_over(data, info);

        let err_msg = reader
            .read_chunk(1)
            .expect_err("chunk index 1 is out of range")
            .to_string();
        assert!(err_msg.contains("Chunk 1 not found"), "unexpected error: {}", err_msg);
    }

    #[test]
    fn test_chunk_size_too_small() {
        // Two bytes cannot even hold the 4-byte CRC trailer.
        let info = compression_info("LZ4Compressor", 16384, 0, vec![0]);
        let mut reader = reader_over(vec![0xAB, 0xCD], info);

        let err_msg = reader
            .read_chunk(0)
            .expect_err("a chunk shorter than its CRC must be rejected")
            .to_string();
        assert!(err_msg.contains("size too small"), "unexpected error: {}", err_msg);
    }

    #[test]
    fn test_truncated_chunk_is_error() {
        let payload = b"test data";
        let mut data = Vec::new();
        push_chunk(&mut data, payload);
        // Report a file larger than the bytes actually available, so the chunk's
        // computed extent runs past the end of the underlying reader.
        let claimed_size = data.len() as u64 + 100;
        let info = compression_info("LZ4Compressor", 16384, payload.len() as u64, vec![0]);
        let mut reader = ChunkReader::new(Cursor::new(data), info, claimed_size);

        assert!(reader.read_chunk(0).is_err());
    }

    #[test]
    fn test_accessor_methods() {
        let info = compression_info("SnappyCompressor", 32768, 65536, vec![0, 16384, 32768]);
        let reader = ChunkReader::new(Cursor::new(vec![]), info, 0);

        assert_eq!(reader.chunk_count(), 3);
        assert_eq!(reader.compression_algorithm(), "SnappyCompressor");
        assert_eq!(reader.chunk_length(), 32768);
    }
}