// cqlite_core/storage/sstable/chunk_reader.rs

//! Chunk-based reader for NB-format `Data.db` files.
//!
//! An NB-format `Data.db` file has no header: it starts directly with compressed data.
//! Each compressed chunk is immediately followed by a 4-byte CRC32 checksum (big-endian).
//!
//! Layout: `[chunk_0_bytes][CRC32(chunk_0)][chunk_1_bytes][CRC32(chunk_1)]...`

8use crate::storage::sstable::compression_info::CompressionInfo;
9use crate::{Error, Result};
10use std::io::{Read, Seek, SeekFrom};
11
12/// Reader for chunked Data.db files (NB format)
13///
14/// This reader handles NB format Data.db files where:
15/// - No magic number or header exists (file starts with compressed data)
16/// - Each compressed chunk is followed by a 4-byte CRC32 checksum
17/// - CRC32 uses Java's `java.util.zip.CRC32` algorithm (IEEE polynomial 0x04C11DB7)
18/// - Checksums are stored in big-endian format
19pub struct ChunkReader<R: Read + Seek> {
20    reader: R,
21    compression_info: CompressionInfo,
22    total_file_size: u64,
23}
24
25impl<R: Read + Seek> ChunkReader<R> {
26    /// Create new chunk reader
27    ///
28    /// # Arguments
29    ///
30    /// * `reader` - The underlying reader for Data.db file
31    /// * `compression_info` - Parsed CompressionInfo containing chunk metadata
32    /// * `total_file_size` - Total size of the Data.db file in bytes
33    ///
34    /// # Returns
35    ///
36    /// A new `ChunkReader` instance ready to read chunks
37    pub fn new(reader: R, compression_info: CompressionInfo, total_file_size: u64) -> Self {
38        Self {
39            reader,
40            compression_info,
41            total_file_size,
42        }
43    }
44
45    /// Read and validate a specific chunk by index
46    ///
47    /// This method:
48    /// 1. Seeks to the chunk offset in Data.db
49    /// 2. Reads the compressed chunk bytes
50    /// 3. Reads the trailing 4-byte CRC32 checksum
51    /// 4. Validates the CRC32 (fail-fast on mismatch)
52    ///
53    /// # Arguments
54    ///
55    /// * `chunk_index` - Zero-based index of the chunk to read
56    ///
57    /// # Returns
58    ///
59    /// Compressed chunk bytes ready for decompression
60    ///
61    /// # Errors
62    ///
63    /// Returns an error if:
64    /// - Chunk index is invalid
65    /// - I/O error occurs during reading
66    /// - CRC32 validation fails
67    pub fn read_chunk(&mut self, chunk_index: usize) -> Result<Vec<u8>> {
68        // 1. Get chunk offset from CompressionInfo
69        let offset = self
70            .compression_info
71            .compressed_chunk_offset(chunk_index)
72            .ok_or_else(|| {
73                Error::InvalidFormat(format!(
74                    "Chunk {} not found in CompressionInfo (total chunks: {})",
75                    chunk_index,
76                    self.compression_info.chunk_offsets.len()
77                ))
78            })?;
79
80        // 2. Calculate chunk size (distance to next chunk or end of file)
81        // NOTE: This includes the trailing 4-byte CRC32
82        let total_chunk_size = self
83            .compression_info
84            .compressed_chunk_size(chunk_index, self.total_file_size)
85            .ok_or_else(|| {
86                Error::InvalidFormat(format!(
87                    "Cannot determine size for chunk {} (file_size={})",
88                    chunk_index, self.total_file_size
89                ))
90            })?;
91
92        // 3. Seek to chunk offset in Data.db
93        self.reader.seek(SeekFrom::Start(offset)).map_err(|e| {
94            Error::Io(std::io::Error::new(
95                e.kind(),
96                format!(
97                    "Failed to seek to chunk {} at offset 0x{:x}: {}",
98                    chunk_index, offset, e
99                ),
100            ))
101        })?;
102
103        // 4. Read chunk bytes (NOT including trailing CRC32)
104        // Subtract 4 bytes for trailing CRC
105        if total_chunk_size < 4 {
106            return Err(Error::InvalidFormat(format!(
107                "Chunk {} size too small: {} bytes (minimum 4 for CRC)",
108                chunk_index, total_chunk_size
109            )));
110        }
111
112        let chunk_size = (total_chunk_size - 4) as usize;
113        let mut chunk_data = vec![0u8; chunk_size];
114        self.reader.read_exact(&mut chunk_data).map_err(|e| {
115            Error::Io(std::io::Error::new(
116                e.kind(),
117                format!(
118                    "Failed to read chunk {} data ({} bytes at offset 0x{:x}): {}",
119                    chunk_index, chunk_size, offset, e
120                ),
121            ))
122        })?;
123
124        // 5. Read trailing CRC32 (4 bytes, big-endian)
125        let mut crc_bytes = [0u8; 4];
126        self.reader.read_exact(&mut crc_bytes).map_err(|e| {
127            Error::Io(std::io::Error::new(
128                e.kind(),
129                format!(
130                    "Failed to read CRC32 for chunk {} at offset 0x{:x}: {}",
131                    chunk_index,
132                    offset + chunk_size as u64,
133                    e
134                ),
135            ))
136        })?;
137        let expected_crc = u32::from_be_bytes(crc_bytes);
138
139        // 6. Compute CRC32 of chunk bytes using crc32fast (Java-compatible algorithm)
140        let computed_crc = crc32fast::hash(&chunk_data);
141
142        // 7. Validate CRC (fail-fast on mismatch)
143        if computed_crc != expected_crc {
144            return Err(Error::InvalidFormat(format!(
145                "CRC32 mismatch for chunk {} at offset 0x{:x}: expected=0x{:08x}, computed=0x{:08x}, chunk_size={}",
146                chunk_index, offset, expected_crc, computed_crc, chunk_size
147            )));
148        }
149
150        Ok(chunk_data)
151    }
152
153    /// Read all chunks and validate CRC32 for each
154    ///
155    /// This is a convenience method that reads all chunks in sequential order.
156    ///
157    /// # Returns
158    ///
159    /// A vector of compressed chunk byte arrays, one per chunk
160    ///
161    /// # Errors
162    ///
163    /// Returns an error on the first chunk that fails to read or validate
164    pub fn read_all_chunks(&mut self) -> Result<Vec<Vec<u8>>> {
165        let chunk_count = self.compression_info.chunk_offsets.len();
166        let mut chunks = Vec::with_capacity(chunk_count);
167
168        for i in 0..chunk_count {
169            let chunk = self.read_chunk(i)?;
170            chunks.push(chunk);
171        }
172
173        Ok(chunks)
174    }
175
176    /// Get the number of chunks in this file
177    pub fn chunk_count(&self) -> usize {
178        self.compression_info.chunk_offsets.len()
179    }
180
181    /// Get the compression algorithm name
182    pub fn compression_algorithm(&self) -> &str {
183        &self.compression_info.algorithm
184    }
185
186    /// Get the uncompressed chunk size
187    pub fn chunk_length(&self) -> u32 {
188        self.compression_info.chunk_length
189    }
190}
191
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Builds a `CompressionInfo` fixture; only the fields that differ between tests are parameters.
    fn compression_info(
        algorithm: &str,
        chunk_length: u32,
        data_length: u64,
        chunk_offsets: Vec<u64>,
    ) -> CompressionInfo {
        CompressionInfo {
            algorithm: algorithm.to_string(),
            chunk_length,
            data_length,
            chunk_offsets,
            option_pairs: vec![],
            max_compressed_length: i32::MAX as u32,
        }
    }

    /// Serialises chunks in NB layout (`[bytes][CRC32 big-endian]` per chunk).
    ///
    /// Returns the file image together with the starting offset of every chunk.
    fn nb_file(chunks: &[&[u8]]) -> (Vec<u8>, Vec<u64>) {
        let mut file = Vec::new();
        let mut offsets = Vec::with_capacity(chunks.len());
        for &chunk in chunks {
            offsets.push(file.len() as u64);
            file.extend_from_slice(chunk);
            file.extend_from_slice(&crc32fast::hash(chunk).to_be_bytes());
        }
        (file, offsets)
    }

    /// Wraps an in-memory file image in a `ChunkReader`, using its length as the file size.
    fn open(file: Vec<u8>, info: CompressionInfo) -> ChunkReader<Cursor<Vec<u8>>> {
        let file_size = file.len() as u64;
        ChunkReader::new(Cursor::new(file), info, file_size)
    }

    #[test]
    fn test_read_chunk_with_valid_crc() {
        let payload: &[u8] = b"test compressed chunk data";
        let (file, offsets) = nb_file(&[payload]);
        let info = compression_info("LZ4Compressor", 16384, payload.len() as u64, offsets);

        let chunk = open(file, info)
            .read_chunk(0)
            .expect("a chunk with a valid CRC should be readable");
        assert_eq!(chunk, payload);
    }

    #[test]
    fn test_read_chunk_with_invalid_crc() {
        let payload: &[u8] = b"test compressed chunk data";
        let mut file = payload.to_vec();
        // Deliberately wrong checksum trailer.
        file.extend_from_slice(&0xDEAD_BEEFu32.to_be_bytes());
        let info = compression_info("LZ4Compressor", 16384, payload.len() as u64, vec![0]);

        let message = open(file, info)
            .read_chunk(0)
            .expect_err("a corrupted CRC must be rejected")
            .to_string();
        assert!(message.contains("CRC32 mismatch"));
        // The stored (expected) CRC must be reported in the error.
        assert!(message.contains("0xdeadbeef"));
    }

    #[test]
    fn test_read_multiple_chunks() {
        let first: &[u8] = b"first chunk data";
        let second: &[u8] = b"second chunk data with more content";
        let (file, offsets) = nb_file(&[first, second]);
        let uncompressed = (first.len() + second.len()) as u64;
        let mut reader = open(
            file,
            compression_info("SnappyCompressor", 16384, uncompressed, offsets),
        );

        assert_eq!(reader.read_chunk(0).expect("first chunk should read"), first);
        assert_eq!(reader.read_chunk(1).expect("second chunk should read"), second);
    }

    #[test]
    fn test_read_all_chunks() {
        let expected: [&[u8]; 3] = [b"chunk1", b"chunk2data", b"c3"];
        let (file, offsets) = nb_file(&expected);
        let uncompressed: u64 = expected.iter().map(|chunk| chunk.len() as u64).sum();
        let info = compression_info("LZ4Compressor", 16384, uncompressed, offsets);

        let all_chunks = open(file, info)
            .read_all_chunks()
            .expect("every chunk should read and validate");
        assert_eq!(all_chunks.len(), 3);
        for (actual, wanted) in all_chunks.iter().zip(expected.iter()) {
            assert_eq!(actual, wanted);
        }
    }

    #[test]
    fn test_invalid_chunk_index() {
        let payload: &[u8] = b"test data";
        let (file, offsets) = nb_file(&[payload]);
        let info = compression_info("LZ4Compressor", 16384, payload.len() as u64, offsets);

        // Only chunk 0 exists, so index 1 is out of range.
        let message = open(file, info)
            .read_chunk(1)
            .expect_err("chunk 1 does not exist")
            .to_string();
        assert!(message.contains("Chunk 1 not found"));
    }

    #[test]
    fn test_chunk_size_too_small() {
        // Two bytes cannot even hold the 4-byte trailing CRC.
        let info = compression_info("LZ4Compressor", 16384, 0, vec![0]);

        let message = open(vec![0xAB, 0xCD], info)
            .read_chunk(0)
            .expect_err("a chunk shorter than its CRC must be rejected")
            .to_string();
        assert!(message.contains("size too small"));
    }

    #[test]
    fn test_accessor_methods() {
        let info = compression_info("SnappyCompressor", 32768, 65536, vec![0, 16384, 32768]);
        let reader = open(Vec::new(), info);

        assert_eq!(reader.chunk_count(), 3);
        assert_eq!(reader.compression_algorithm(), "SnappyCompressor");
        assert_eq!(reader.chunk_length(), 32768);
    }
}