Skip to main content

cqlite_core/storage/sstable/
compression_info.rs

//! CompressionInfo.db file parser for SSTable compression metadata
//!
//! This module provides deterministic parsing of Cassandra's CompressionInfo.db files
//! which contain the metadata needed to decompress chunks from compressed Data.db files.
//!
//! ## Binary Format (Cassandra NB / 5.0, CompressionMetadata.java:375-392)
//!
//! ```text
//! writeUTF(compressor_simple_name)     // 2-byte BE length + UTF-8 bytes
//! writeInt(option_count)               // 4 bytes BE
//! for each option:
//!     writeUTF(key)                    // 2-byte BE length + UTF-8 bytes
//!     writeUTF(value)                  // 2-byte BE length + UTF-8 bytes
//! writeInt(chunk_length)               // 4 bytes BE — uncompressed chunk size
//! writeInt(max_compressed_length)      // 4 bytes BE — present when version >= "na" (all 5.0 files)
//! writeLong(data_length)               // 8 bytes BE — total uncompressed data length
//! writeInt(chunk_count)                // 4 bytes BE
//! for each chunk:
//!     writeLong(chunk_offset)          // 8 bytes BE — byte offset in Data.db
//! ```
//!
//! ## Note on CRCs
//!
//! Per-chunk CRC32 checksums are stored INLINE in Data.db, not in CompressionInfo.db.
//! Each compressed chunk in Data.db is followed by a 4-byte little-endian CRC32 of the
//! compressed bytes. See: CompressedSequentialWriter.java:192.
//! There is NO trailing metadata CRC in CompressionInfo.db.
28
29use crate::{Error, Result};
30use std::io::{Cursor, Read};
31
/// In-memory form of a parsed CompressionInfo.db file.
///
/// Fields are listed in the order they appear in the binary layout; all of them are
/// public, so a value built by hand is not guaranteed to satisfy `validate()`.
#[derive(Debug, Clone)]
pub struct CompressionInfo {
    /// Simple class name of the compressor, e.g. "LZ4Compressor" or "SnappyCompressor".
    pub algorithm: String,
    /// Compression options as `(key, value)` pairs, kept in file order.
    /// These are the pairs written by CompressionMetadata.writeHeader().
    pub option_pairs: Vec<(String, String)>,
    /// Uncompressed size of one chunk, in bytes.
    pub chunk_length: u32,
    /// Upper bound on a compressed chunk's length. A chunk whose compressed form reaches
    /// this size was stored uncompressed in Data.db instead. The value is i32::MAX when
    /// minCompressRatio=0 (the default). Source: CompressionParams.java:186-189.
    pub max_compressed_length: u32,
    /// Total length of the uncompressed data, in bytes.
    pub data_length: u64,
    /// Start offset in Data.db of every compressed-chunk record.
    ///
    /// A record is laid out as:
    ///   [compressed_bytes][4-byte CRC32 of compressed_bytes]
    /// so the distance between two consecutive offsets includes the 4 CRC bytes.
    /// See: CompressedSequentialWriter.java:203 `chunkOffset += compressedLength + 4`
    pub chunk_offsets: Vec<u64>,
    // NOTE: there is deliberately no per-chunk CRC field here. The CRC32 values are not
    // part of CompressionInfo.db; they are written inline in Data.db after each chunk.
    // CompressedSequentialWriter.java:192: crcMetadata.appendDirect(toWrite, true)
}
59
60/// Read a Java-style writeUTF string: 2-byte BE length followed by UTF-8 bytes
61fn read_utf(cursor: &mut Cursor<&[u8]>) -> Result<String> {
62    let mut len_bytes = [0u8; 2];
63    cursor
64        .read_exact(&mut len_bytes)
65        .map_err(|e| Error::InvalidFormat(format!("Failed to read writeUTF length: {}", e)))?;
66    let len = u16::from_be_bytes(len_bytes) as usize;
67
68    let mut string_bytes = vec![0u8; len];
69    cursor.read_exact(&mut string_bytes).map_err(|e| {
70        Error::InvalidFormat(format!(
71            "Failed to read writeUTF bytes (len={}): {}",
72            len, e
73        ))
74    })?;
75
76    String::from_utf8(string_bytes)
77        .map_err(|e| Error::InvalidFormat(format!("Invalid UTF-8 in writeUTF string: {}", e)))
78}
79
80/// Read a 4-byte big-endian u32
81fn read_u32(cursor: &mut Cursor<&[u8]>, field: &str) -> Result<u32> {
82    let mut bytes = [0u8; 4];
83    cursor
84        .read_exact(&mut bytes)
85        .map_err(|e| Error::InvalidFormat(format!("Failed to read {} (u32 BE): {}", field, e)))?;
86    Ok(u32::from_be_bytes(bytes))
87}
88
89/// Read an 8-byte big-endian u64
90fn read_u64(cursor: &mut Cursor<&[u8]>, field: &str) -> Result<u64> {
91    let mut bytes = [0u8; 8];
92    cursor
93        .read_exact(&mut bytes)
94        .map_err(|e| Error::InvalidFormat(format!("Failed to read {} (u64 BE): {}", field, e)))?;
95    Ok(u64::from_be_bytes(bytes))
96}
97
98impl CompressionInfo {
99    /// Parse CompressionInfo.db file from binary data.
100    ///
101    /// Implements the deterministic layout from CompressionMetadata.java:375-392.
102    /// No heuristics — every field is read at its authoritative position.
103    pub fn parse(data: &[u8]) -> Result<Self> {
104        if data.is_empty() {
105            return Err(Error::InvalidFormat(
106                "Empty compression info data".to_string(),
107            ));
108        }
109
110        let mut cursor = Cursor::new(data);
111
112        // 1. writeUTF(compressor simple name)
113        let algorithm = read_utf(&mut cursor)?;
114        if algorithm.is_empty() {
115            return Err(Error::InvalidFormat(
116                "Compressor simple name is empty".to_string(),
117            ));
118        }
119
120        // 2. writeInt(option_count)
121        let option_count = read_u32(&mut cursor, "option_count")?;
122        if option_count > 256 {
123            return Err(Error::InvalidFormat(format!(
124                "Unreasonably large option_count: {} (max 256)",
125                option_count
126            )));
127        }
128
129        // 3. option_count × (writeUTF key + writeUTF value)
130        let mut option_pairs = Vec::with_capacity(option_count as usize);
131        for i in 0..option_count {
132            let key = read_utf(&mut cursor).map_err(|e| {
133                Error::InvalidFormat(format!("Failed to read option key {}: {}", i, e))
134            })?;
135            let value = read_utf(&mut cursor).map_err(|e| {
136                Error::InvalidFormat(format!("Failed to read option value {}: {}", i, e))
137            })?;
138            option_pairs.push((key, value));
139        }
140
141        // 4. writeInt(chunk_length)
142        let chunk_length = read_u32(&mut cursor, "chunk_length")?;
143        if chunk_length == 0 {
144            return Err(Error::InvalidFormat(
145                "chunk_length cannot be zero".to_string(),
146            ));
147        }
148        if chunk_length > 256 * 1024 * 1024 {
149            return Err(Error::InvalidFormat(format!(
150                "chunk_length too large: {} bytes (max 256 MiB)",
151                chunk_length
152            )));
153        }
154
155        // 5. writeInt(max_compressed_length) — present for all Cassandra 5.0 (version >= "na") files
156        let max_compressed_length = read_u32(&mut cursor, "max_compressed_length")?;
157
158        // 6. writeLong(data_length)
159        let data_length = read_u64(&mut cursor, "data_length")?;
160
161        // 7. writeInt(chunk_count)
162        let chunk_count = read_u32(&mut cursor, "chunk_count")? as usize;
163        if chunk_count == 0 {
164            return Err(Error::InvalidFormat(
165                "chunk_count cannot be zero".to_string(),
166            ));
167        }
168        if chunk_count > 1_000_000 {
169            return Err(Error::InvalidFormat(format!(
170                "chunk_count too large: {} (max 1,000,000)",
171                chunk_count
172            )));
173        }
174
175        // 8. chunk_count × writeLong(chunk_offset)
176        let mut chunk_offsets = Vec::with_capacity(chunk_count);
177        for i in 0..chunk_count {
178            let offset = read_u64(&mut cursor, &format!("chunk_offset[{}]", i))?;
179            chunk_offsets.push(offset);
180        }
181
182        let info = CompressionInfo {
183            algorithm,
184            option_pairs,
185            chunk_length,
186            max_compressed_length,
187            data_length,
188            chunk_offsets,
189        };
190
191        info.validate()?;
192        Ok(info)
193    }
194
195    /// Get the chunk index for a given offset in the uncompressed data
196    pub fn chunk_for_offset(&self, offset: u64) -> usize {
197        (offset / self.chunk_length as u64) as usize
198    }
199
200    /// Get the offset within a chunk for a given global offset
201    pub fn offset_within_chunk(&self, offset: u64) -> u64 {
202        offset % self.chunk_length as u64
203    }
204
205    /// Get the compressed chunk offset for a given chunk index
206    pub fn compressed_chunk_offset(&self, chunk_index: usize) -> Option<u64> {
207        self.chunk_offsets.get(chunk_index).copied()
208    }
209
210    /// Get the size of a compressed-chunk record (delta between consecutive offsets or
211    /// end-of-file), INCLUDING the 4-byte trailing CRC appended inline in Data.db.
212    ///
213    /// To get the actual compressed payload size, subtract 4 from the returned value.
214    /// See: CompressedSequentialWriter.java:203 `chunkOffset += compressedLength + 4`
215    pub fn compressed_chunk_size(
216        &self,
217        chunk_index: usize,
218        total_compressed_size: u64,
219    ) -> Option<u64> {
220        let start_offset = self.compressed_chunk_offset(chunk_index)?;
221
222        if chunk_index + 1 < self.chunk_offsets.len() {
223            let next_offset = self.chunk_offsets[chunk_index + 1];
224            Some(next_offset - start_offset)
225        } else {
226            Some(total_compressed_size - start_offset)
227        }
228    }
229
230    /// Validate the compression info structure
231    pub fn validate(&self) -> Result<()> {
232        if self.algorithm.is_empty() {
233            return Err(Error::InvalidFormat(
234                "Empty compression algorithm".to_string(),
235            ));
236        }
237        if self.chunk_length == 0 {
238            return Err(Error::InvalidFormat("Zero chunk length".to_string()));
239        }
240        if self.chunk_offsets.is_empty() {
241            return Err(Error::InvalidFormat("No chunk offsets".to_string()));
242        }
243        // Offsets must be strictly ascending
244        for i in 1..self.chunk_offsets.len() {
245            if self.chunk_offsets[i] <= self.chunk_offsets[i - 1] {
246                return Err(Error::InvalidFormat(format!(
247                    "Chunk offsets not in ascending order: offsets[{}]={} <= offsets[{}]={}",
248                    i,
249                    self.chunk_offsets[i],
250                    i - 1,
251                    self.chunk_offsets[i - 1]
252                )));
253            }
254        }
255        Ok(())
256    }
257}
258
#[cfg(test)]
mod tests {
    use super::*;

    /// The `max_compressed_length` value used throughout these tests (i32::MAX).
    const UNLIMITED: u32 = i32::MAX as u32;

    /// Serialize a CompressionInfo.db blob in the layout `CompressionInfo::parse` reads.
    ///
    /// Strings are written as a 2-byte big-endian length followed by their bytes and all
    /// integers are big-endian. `options` may be empty.
    fn make_compression_info_blob(
        algorithm: &str,
        options: &[(&str, &str)],
        chunk_length: u32,
        max_compressed_length: u32,
        data_length: u64,
        offsets: &[u64],
    ) -> Vec<u8> {
        // writeUTF framing: u16 length prefix, then the raw bytes.
        fn put_utf(out: &mut Vec<u8>, text: &str) {
            let raw = text.as_bytes();
            out.extend_from_slice(&(raw.len() as u16).to_be_bytes());
            out.extend_from_slice(raw);
        }

        let mut blob = Vec::new();

        // compressor name, then option_count followed by the key/value pairs
        put_utf(&mut blob, algorithm);
        blob.extend_from_slice(&(options.len() as u32).to_be_bytes());
        for &(key, value) in options {
            put_utf(&mut blob, key);
            put_utf(&mut blob, value);
        }

        // chunk_length, max_compressed_length, data_length
        blob.extend_from_slice(&chunk_length.to_be_bytes());
        blob.extend_from_slice(&max_compressed_length.to_be_bytes());
        blob.extend_from_slice(&data_length.to_be_bytes());

        // chunk_count followed by one offset per chunk
        blob.extend_from_slice(&(offsets.len() as u32).to_be_bytes());
        for off in offsets {
            blob.extend_from_slice(&off.to_be_bytes());
        }

        blob
    }

    /// Build a `CompressionInfo` directly (bypassing `parse`) with the fields that vary
    /// between the struct-level tests; everything else uses fixed values.
    fn sample_info(algorithm: &str, data_length: u64, chunk_offsets: Vec<u64>) -> CompressionInfo {
        CompressionInfo {
            algorithm: algorithm.to_string(),
            option_pairs: vec![],
            chunk_length: 16384,
            max_compressed_length: UNLIMITED,
            data_length,
            chunk_offsets,
        }
    }

    #[test]
    fn test_parse_no_options() {
        let offsets = [0u64, 8200];
        let blob =
            make_compression_info_blob("LZ4Compressor", &[], 16384, UNLIMITED, 32768, &offsets);

        let info = CompressionInfo::parse(&blob).expect("parse should succeed");

        assert_eq!(info.algorithm, "LZ4Compressor");
        assert!(info.option_pairs.is_empty());
        assert_eq!(info.chunk_length, 16384);
        assert_eq!(info.max_compressed_length, UNLIMITED);
        assert_eq!(info.data_length, 32768);
        assert_eq!(info.chunk_offsets, offsets.to_vec());
    }

    #[test]
    fn test_parse_with_options() {
        // Regression test for Bug #638: the old heuristic parser treated the option_count
        // bytes as "padding" and misread chunk_length whenever options were present.
        let options = [("compression_level", "9")];
        let blob =
            make_compression_info_blob("LZ4Compressor", &options, 16384, UNLIMITED, 16384, &[0]);

        let info = CompressionInfo::parse(&blob).expect(
            "Bug #638 repro: parser must handle option_count > 0 deterministically, \
             not skip 4 bytes as padding",
        );

        assert_eq!(info.algorithm, "LZ4Compressor");
        assert_eq!(info.option_pairs.len(), 1);
        let (key, value) = &info.option_pairs[0];
        assert_eq!(key, "compression_level");
        assert_eq!(value, "9");
        assert_eq!(info.chunk_length, 16384);
        assert_eq!(info.max_compressed_length, UNLIMITED);
    }

    #[test]
    fn test_parse_exposes_max_compressed_length() {
        // Regression test for Bug #638: the old struct had no max_compressed_length
        // field, so the incompressible-chunk fallback could not be implemented.
        let blob =
            make_compression_info_blob("SnappyCompressor", &[], 16384, UNLIMITED, 16384, &[0]);

        let info = CompressionInfo::parse(&blob).expect("parse should succeed");

        // The field has to be readable on the parsed value.
        assert_eq!(
            info.max_compressed_length,
            UNLIMITED,
            "Bug #638: max_compressed_length field must be exposed for incompressible-chunk fallback"
        );
    }

    #[test]
    fn test_parse_real_snappy_fixture() {
        // Parses a real CompressionInfo.db from the test fixtures, when it is present.
        // The fixture has 0 options (SnappyCompressor with default settings).
        let manifest_parent = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
            .parent()
            .unwrap();
        let fixture_path = manifest_parent.join("test-data/datasets/sstables/test_basic/simple_table-6aa08200a25111f0a3fef1a551383fb9/nb-1-big-CompressionInfo.db");

        if !fixture_path.exists() {
            println!("Skipping real-fixture test: {}", fixture_path.display());
            return;
        }

        let data = std::fs::read(&fixture_path).expect("read fixture");
        let info = CompressionInfo::parse(&data).expect("parse real CompressionInfo.db");

        assert_eq!(info.algorithm, "SnappyCompressor");
        assert_eq!(info.chunk_length, 16384);
        assert_eq!(info.max_compressed_length, UNLIMITED);
        assert!(!info.chunk_offsets.is_empty());

        // Every offset must be larger than the one before it.
        for pair in info.chunk_offsets.windows(2) {
            assert!(pair[1] > pair[0], "offsets must be increasing");
        }

        // The file must end right after the offsets: name length prefix, option_count,
        // chunk_length, max_compressed_length, data_length and chunk_count are fixed-size.
        let fixed_fields: usize = 2 + 4 + 4 + 4 + 8 + 4;
        let expected_size = fixed_fields + info.algorithm.len() + info.chunk_offsets.len() * 8;
        assert_eq!(
            data.len(),
            expected_size,
            "CompressionInfo.db must end immediately after offsets — no CRC bytes appended"
        );
    }

    #[test]
    fn test_chunk_calculations() {
        let info = sample_info("LZ4Compressor", 32768, vec![0, 8192]);

        // uncompressed offset -> chunk index / position inside the chunk
        assert_eq!(info.chunk_for_offset(0), 0);
        assert_eq!(info.chunk_for_offset(16384), 1);
        assert_eq!(info.offset_within_chunk(100), 100);
        assert_eq!(info.offset_within_chunk(16484), 100);

        // chunk index -> start offset in Data.db
        assert_eq!(info.compressed_chunk_offset(0), Some(0));
        assert_eq!(info.compressed_chunk_offset(1), Some(8192));

        // record sizes: delta to the next offset, or to end-of-file for the last chunk
        assert_eq!(info.compressed_chunk_size(0, 20000), Some(8192));
        assert_eq!(info.compressed_chunk_size(1, 20000), Some(11808));
    }

    #[test]
    fn test_validation() {
        let valid_info = sample_info("LZ4Compressor", 32768, vec![0, 8192]);
        assert!(valid_info.validate().is_ok());

        // Empty algorithm
        let invalid = sample_info("", 0, vec![0]);
        assert!(invalid.validate().is_err());

        // Non-ascending offsets
        let invalid2 = sample_info("LZ4Compressor", 32768, vec![8192, 0]);
        assert!(invalid2.validate().is_err());
    }
}