cqlite-core 0.11.0

Core engine for CQLite — read Apache Cassandra 5.0 SSTables locally without a cluster
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
//! Bulletproof chunk-based decompression for SSTable Data.db files
//!
//! This module implements the proper decompression strategy for Cassandra SSTable files
//! using CompressionInfo.db metadata to decompress chunks on-demand.

use super::compression_info::CompressionInfo;
use crate::parser::header::CassandraVersion;
use crate::{Error, Result};
use std::collections::HashMap;
use std::io::{Read, Seek, SeekFrom};

/// Chunk-based decompressor for SSTable Data.db files
///
/// Pairs the parsed compression metadata with a small in-memory cache of
/// already-decompressed chunks, so a chunk that is read repeatedly is served
/// from the cache instead of being re-read and re-decompressed.
pub struct ChunkDecompressor {
    /// Compression metadata from CompressionInfo.db
    compression_info: CompressionInfo,
    /// Cache of decompressed chunks, keyed by chunk index
    chunk_cache: HashMap<usize, Vec<u8>>,
    /// Maximum number of chunks to cache
    max_cached_chunks: usize,
    /// Data file path for error reporting (`None` when constructed without a path)
    data_file_path: Option<String>,
}

impl ChunkDecompressor {
    /// Build a chunk decompressor from parsed compression metadata.
    ///
    /// The metadata is validated before anything else; a validation failure is
    /// returned to the caller. The resulting decompressor starts with an empty
    /// chunk cache and no data file path.
    ///
    /// The `cassandra_version` parameter is accepted for API compatibility but is no longer
    /// used: the NB format (all Cassandra 5.0 files) always has inline CRC32 in Data.db
    /// regardless of version, which is handled deterministically in decompress_chunk().
    pub fn new(
        compression_info: CompressionInfo,
        _cassandra_version: CassandraVersion,
    ) -> Result<Self> {
        compression_info.validate()?;

        // Cache up to 16 chunks (256KB of decompressed data when chunks are 16KB).
        let max_cached_chunks = 16;

        let decompressor = Self {
            data_file_path: None,
            max_cached_chunks,
            chunk_cache: HashMap::new(),
            compression_info,
        };
        Ok(decompressor)
    }

    /// Build a chunk decompressor that remembers the data file path.
    ///
    /// Behaves like `new()` (metadata is validated first, the cache starts empty
    /// with room for 16 chunks) and additionally stores `data_file_path` so that
    /// error messages can name the file.
    ///
    /// See `new()` for notes on `cassandra_version`.
    pub fn new_with_path(
        compression_info: CompressionInfo,
        _cassandra_version: CassandraVersion,
        data_file_path: String,
    ) -> Result<Self> {
        compression_info.validate()?;

        let recorded_path = Some(data_file_path);
        let decompressor = Self {
            data_file_path: recorded_path,
            max_cached_chunks: 16,
            chunk_cache: HashMap::new(),
            compression_info,
        };
        Ok(decompressor)
    }

    /// Read `length` uncompressed bytes starting at uncompressed `offset`.
    ///
    /// The request is served chunk by chunk: each iteration maps the current
    /// position to a chunk index and an offset inside that chunk, fetches the
    /// decompressed chunk (possibly from the cache), and copies as many bytes as
    /// that chunk can supply.
    ///
    /// # Errors
    ///
    /// Propagates any error from fetching a chunk, and returns
    /// `Error::InvalidFormat` when the computed in-chunk offset lies at or past
    /// the end of the decompressed chunk.
    pub fn read_data<R: Read + Seek>(
        &mut self,
        reader: &mut R,
        offset: u64,
        length: usize,
    ) -> Result<Vec<u8>> {
        let mut output = Vec::with_capacity(length);
        let mut position = offset;
        let mut bytes_left = length;

        while bytes_left > 0 {
            // Locate the chunk holding `position` and where inside it we start.
            let chunk_index = self.compression_info.chunk_for_offset(position);
            let start = self.compression_info.offset_within_chunk(position) as usize;

            let chunk = self.get_decompressed_chunk(reader, chunk_index)?;

            // Take at most what is still needed, but never past the chunk's end.
            let end = (start + bytes_left).min(chunk.len());

            if start >= chunk.len() {
                return Err(Error::InvalidFormat(format!(
                    "Offset {} beyond chunk {} size {}",
                    start,
                    chunk_index,
                    chunk.len()
                )));
            }

            let piece = &chunk[start..end];
            output.extend_from_slice(piece);

            let copied = piece.len();
            bytes_left -= copied;
            position += copied as u64;
        }

        Ok(output)
    }

    /// Return the decompressed bytes of `chunk_index`, consulting the cache first.
    ///
    /// On a cache hit a copy of the cached bytes is returned and the reader is
    /// not touched. On a miss the chunk is decompressed, a copy is stored in the
    /// cache, and the bytes are returned.
    ///
    /// When the cache is already at `max_cached_chunks`, one entry is dropped
    /// before inserting: whichever key the map's iterator yields first. This is
    /// not a least-recently-used policy.
    fn get_decompressed_chunk<R: Read + Seek>(
        &mut self,
        reader: &mut R,
        chunk_index: usize,
    ) -> Result<Vec<u8>> {
        // Fast path: already decompressed.
        if let Some(hit) = self.chunk_cache.get(&chunk_index).cloned() {
            return Ok(hit);
        }

        let fresh = self.decompress_chunk(reader, chunk_index)?;

        // Make room if the cache is full.
        let cache_is_full = self.chunk_cache.len() >= self.max_cached_chunks;
        if cache_is_full {
            let victim = self.chunk_cache.keys().next().copied();
            if let Some(key) = victim {
                self.chunk_cache.remove(&key);
            }
        }

        self.chunk_cache.insert(chunk_index, fresh.clone());
        Ok(fresh)
    }

    /// Decompress a specific chunk from the compressed data file
    fn decompress_chunk<R: Read + Seek>(
        &self,
        reader: &mut R,
        chunk_index: usize,
    ) -> Result<Vec<u8>> {
        // Get compressed chunk offset
        let compressed_offset = self
            .compression_info
            .compressed_chunk_offset(chunk_index)
            .ok_or_else(|| Error::InvalidFormat(format!("No offset for chunk {}", chunk_index)))?;

        // Determine record size (compressed payload + 4-byte inline CRC) using file size
        let current_pos = reader.stream_position().map_err(Error::Io)?;
        let file_size = reader.seek(SeekFrom::End(0)).map_err(Error::Io)?;
        reader
            .seek(SeekFrom::Start(current_pos))
            .map_err(Error::Io)?;

        // compressed_chunk_size returns the full record delta including the 4-byte inline CRC.
        // CompressedSequentialWriter.java:203: chunkOffset += compressedLength + 4
        let record_size = self
            .compression_info
            .compressed_chunk_size(chunk_index, file_size)
            .ok_or_else(|| {
                Error::InvalidFormat(format!("Cannot determine size for chunk {}", chunk_index))
            })?;

        // Bug #639 fix: subtract the 4-byte inline CRC from the delta.
        // The old code passed all (delta) bytes to the decompressor, which included the
        // trailing CRC and caused decompression failures on well-formed chunks.
        if record_size < 4 {
            return Err(Error::InvalidFormat(format!(
                "Chunk {} record size {} is too small (minimum 4 bytes for inline CRC)",
                chunk_index, record_size
            )));
        }
        let compressed_len = (record_size - 4) as usize;

        // Seek to compressed chunk offset and read compressed payload only
        reader
            .seek(SeekFrom::Start(compressed_offset))
            .map_err(Error::Io)?;

        let mut compressed_data = vec![0u8; compressed_len];
        reader.read_exact(&mut compressed_data).map_err(Error::Io)?;

        // Read the 4-byte inline CRC32 (big-endian) and validate it over the compressed bytes.
        // Authority: CompressedSequentialWriter.java:192 + read path lines 275-282.
        let mut crc_bytes = [0u8; 4];
        reader.read_exact(&mut crc_bytes).map_err(Error::Io)?;
        let stored_crc = u32::from_be_bytes(crc_bytes);
        let computed_crc = crc32fast::hash(&compressed_data);
        if stored_crc != computed_crc {
            let file_info = match &self.data_file_path {
                Some(path) => format!(" in file {}", path),
                None => String::new(),
            };
            return Err(Error::InvalidFormat(format!(
                "CRC32 mismatch for chunk {} at offset 0x{:x}{}: stored=0x{:08x}, computed=0x{:08x}, compressed_len={}",
                chunk_index, compressed_offset, file_info, stored_crc, computed_crc, compressed_len
            )));
        }

        log::debug!(
            "Reading chunk {} at offset {} ({} bytes compressed, CRC OK)",
            chunk_index,
            compressed_offset,
            compressed_len
        );

        // Incompressible-chunk fallback (Bug #639):
        // When compressedLength >= maxCompressedLength, Cassandra stored the chunk uncompressed.
        // CompressedSequentialWriter.java:160-177: if compressedLen >= maxCompressedLen, use raw buffer.
        let max_compressed_length = self.compression_info.max_compressed_length as usize;
        if compressed_len >= max_compressed_length {
            log::debug!(
                "Chunk {} is incompressible (compressed_len={} >= max_compressed_length={}), returning raw bytes",
                chunk_index, compressed_len, max_compressed_length
            );
            return Ok(compressed_data);
        }

        // Decompress based on algorithm
        let decompressed = match self.compression_info.algorithm.as_str() {
            "LZ4Compressor" => self.decompress_lz4_chunk(&compressed_data, chunk_index),
            "SnappyCompressor" => self.decompress_snappy_chunk(&compressed_data, chunk_index),
            "DeflateCompressor" => self.decompress_deflate_chunk(&compressed_data, chunk_index),
            "ZstdCompressor" => self.decompress_zstd_chunk(&compressed_data, chunk_index),
            algorithm => Err(Error::UnsupportedFormat(format!(
                "Unknown compression algorithm: {}",
                algorithm
            ))),
        }?;

        // Validate decompressed data size matches expected chunk length
        // (for all chunks except possibly the last one)
        if chunk_index < self.compression_info.chunk_offsets.len() - 1 {
            let expected_size = self.compression_info.chunk_length as usize;
            if decompressed.len() != expected_size {
                return Err(Error::InvalidFormat(format!(
                    "Decompressed chunk {} size mismatch: expected {}, got {}",
                    chunk_index,
                    expected_size,
                    decompressed.len()
                )));
            }
        }

        Ok(decompressed)
    }

    /// Decompress LZ4 chunk - Cassandra uses 4-byte little-endian length prefix
    fn decompress_lz4_chunk(&self, compressed_data: &[u8], chunk_index: usize) -> Result<Vec<u8>> {
        let file_info = match &self.data_file_path {
            Some(path) => format!(" in file {}", path),
            None => String::new(),
        };

        if compressed_data.len() < 4 {
            return Err(Error::InvalidFormat(format!(
                "LZ4 compressed data too short for chunk {}{} (need at least 4 bytes for length prefix, got {})",
                chunk_index, file_info, compressed_data.len()
            )));
        }

        // CRITICAL: Cassandra's LZ4Compressor prepends a 4-byte little-endian length prefix
        // See: org.apache.cassandra.io.compress.LZ4Compressor.decompress() lines 169-172
        // This is NOT the lz4_flex size-prepended format (which uses varint encoding)
        let decompressed_length = u32::from_le_bytes([
            compressed_data[0],
            compressed_data[1],
            compressed_data[2],
            compressed_data[3],
        ]) as usize;

        // Validate the decompressed length against expected chunk size
        let expected_size = self.compression_info.chunk_length as usize;

        // For all chunks except possibly the last one, decompressed length should match chunk_length
        if chunk_index < self.compression_info.chunk_offsets.len() - 1
            && decompressed_length != expected_size
        {
            return Err(Error::InvalidFormat(format!(
                "LZ4 length prefix mismatch for chunk {} at offset 0x{:x}: expected {}, got {} (first 4 bytes: {:02x} {:02x} {:02x} {:02x}){}",
                chunk_index,
                self.compression_info
                    .chunk_offsets
                    .get(chunk_index)
                    .unwrap_or(&0),
                expected_size,
                decompressed_length,
                compressed_data[0],
                compressed_data[1],
                compressed_data[2],
                compressed_data[3],
                file_info
            )));
        }

        // Skip the 4-byte length prefix and decompress the actual LZ4 data
        let lz4_data = &compressed_data[4..];

        match lz4_flex::decompress(lz4_data, decompressed_length) {
            Ok(decompressed) => {
                if decompressed.len() != decompressed_length {
                    return Err(Error::InvalidFormat(format!(
                        "LZ4 decompression size mismatch for chunk {} at offset 0x{:x}: expected {}, got {}{}",
                        chunk_index,
                        self.compression_info
                            .chunk_offsets
                            .get(chunk_index)
                            .unwrap_or(&0),
                        decompressed_length,
                        decompressed.len(),
                        file_info
                    )));
                }
                Ok(decompressed)
            }
            Err(e) => Err(Error::InvalidFormat(format!(
                "LZ4 decompression failed for chunk {} at offset 0x{:x}: {}{}",
                chunk_index,
                self.compression_info
                    .chunk_offsets
                    .get(chunk_index)
                    .unwrap_or(&0),
                e,
                file_info
            ))),
        }
    }

    /// Decompress Snappy chunk - strict mode for modern formats
    fn decompress_snappy_chunk(
        &self,
        compressed_data: &[u8],
        chunk_index: usize,
    ) -> Result<Vec<u8>> {
        #[cfg(feature = "snappy")]
        {
            use snap::raw::Decoder;
            let mut decoder = Decoder::new();

            match decoder.decompress_vec(compressed_data) {
                Ok(decompressed) => Ok(decompressed),
                Err(e) => Err(Error::InvalidFormat(format!(
                    "Snappy decompression failed for chunk {} at offset 0x{:x}: {}. No fallback allowed for modern formats.",
                    chunk_index,
                    self.compression_info
                        .chunk_offsets
                        .get(chunk_index)
                        .unwrap_or(&0),
                    e
                ))),
            }
        }

        #[cfg(not(feature = "snappy"))]
        {
            let _ = (compressed_data, chunk_index); // Suppress unused warnings
            Err(Error::UnsupportedFormat(
                "Snappy support not compiled in".to_string(),
            ))
        }
    }

    /// Decompress Deflate chunk - strict mode for modern formats
    ///
    /// The payload is decoded as a zlib-wrapped stream (RFC 1950: 2-byte header,
    /// deflate body, Adler-32 trailer). Cassandra's `DeflateCompressor` uses
    /// `java.util.zip.Deflater`/`Inflater` created with their default
    /// constructors, which produce and consume the ZLIB format rather than raw
    /// deflate, so a raw-deflate decoder fails on the header bytes.
    /// NOTE(review): confirm against a real DeflateCompressor SSTable fixture.
    ///
    /// With the `deflate` feature enabled, a decode failure is reported as
    /// `Error::InvalidFormat` naming the chunk and its offset; no alternative
    /// decoding is attempted. Without the feature the function always returns
    /// `Error::UnsupportedFormat`.
    fn decompress_deflate_chunk(
        &self,
        compressed_data: &[u8],
        chunk_index: usize,
    ) -> Result<Vec<u8>> {
        #[cfg(feature = "deflate")]
        {
            use flate2::read::ZlibDecoder;
            use std::io::Read;

            let mut decoder = ZlibDecoder::new(compressed_data);
            let mut decompressed = Vec::new();

            match decoder.read_to_end(&mut decompressed) {
                Ok(_) => Ok(decompressed),
                Err(e) => Err(Error::InvalidFormat(format!(
                    "Deflate decompression failed for chunk {} at offset 0x{:x}: {}. No fallback allowed for modern formats.",
                    chunk_index,
                    self.compression_info
                        .chunk_offsets
                        .get(chunk_index)
                        .unwrap_or(&0),
                    e
                ))),
            }
        }

        #[cfg(not(feature = "deflate"))]
        {
            let _ = (compressed_data, chunk_index); // Suppress unused warnings
            Err(Error::UnsupportedFormat(
                "Deflate support not compiled in".to_string(),
            ))
        }
    }

    /// Decompress Zstd chunk - strict mode for modern formats
    fn decompress_zstd_chunk(&self, compressed_data: &[u8], chunk_index: usize) -> Result<Vec<u8>> {
        #[cfg(feature = "zstd")]
        {
            match zstd::decode_all(compressed_data) {
                Ok(decompressed) => Ok(decompressed),
                Err(e) => Err(Error::InvalidFormat(format!(
                    "Zstd decompression failed for chunk {} at offset 0x{:x}: {}. No fallback allowed for modern formats.",
                    chunk_index,
                    self.compression_info
                        .chunk_offsets
                        .get(chunk_index)
                        .unwrap_or(&0),
                    e
                ))),
            }
        }

        #[cfg(not(feature = "zstd"))]
        {
            let _ = (compressed_data, chunk_index); // Suppress unused warnings
            Err(Error::UnsupportedFormat(
                "Zstd support not compiled in".to_string(),
            ))
        }
    }

    /// Clear the chunk cache to free memory
    ///
    /// Every cached decompressed chunk is dropped; subsequent reads decompress
    /// their chunks from the reader again. The cache limit is unchanged.
    pub fn clear_cache(&mut self) {
        self.chunk_cache.clear();
    }

    /// Report cache occupancy as `(chunks currently cached, maximum cached chunks)`.
    pub fn cache_stats(&self) -> (usize, usize) {
        let cached_now = self.chunk_cache.len();
        let capacity = self.max_cached_chunks;
        (cached_now, capacity)
    }

    /// Read the whole uncompressed contents (for testing/debugging).
    ///
    /// Equivalent to `read_data` from offset 0 for `data_length` bytes, where
    /// `data_length` comes from the compression metadata.
    pub fn read_all_data<R: Read + Seek>(&mut self, reader: &mut R) -> Result<Vec<u8>> {
        let total_len = self.compression_info.data_length as usize;
        self.read_data(reader, 0, total_len)
    }

    /// Get compression info
    ///
    /// Returns a shared reference to the compression metadata held by this
    /// decompressor.
    pub fn compression_info(&self) -> &CompressionInfo {
        &self.compression_info
    }
}

/// Utility function to create a chunk decompressor from CompressionInfo.db file
pub fn create_decompressor_from_file(
    compression_info_path: &std::path::Path,
) -> Result<ChunkDecompressor> {
    let compression_data = std::fs::read(compression_info_path).map_err(Error::Io)?;

    // Parse CompressionInfo using the deterministic Cassandra format.
    // (Bug #638: old heuristic parse_alternative_format violated the no-heuristics mandate
    // and has been removed.  The standard parse() is authoritative for all supported files.)
    let compression_info = CompressionInfo::parse(&compression_data)?;

    log::info!("Loaded compression info:");
    log::info!("   Algorithm: {}", compression_info.algorithm);
    log::info!("   Chunk Length: {} bytes", compression_info.chunk_length);
    log::info!("   Data Length: {} bytes", compression_info.data_length);
    log::info!("   Chunk Count: {}", compression_info.chunk_offsets.len());

    ChunkDecompressor::new(compression_info, CassandraVersion::V5_0Release)
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Construction keeps the metadata it was given.
    #[test]
    fn test_chunk_decompressor_creation() {
        let three_chunk_info = CompressionInfo {
            max_compressed_length: i32::MAX as u32,
            option_pairs: vec![],
            chunk_offsets: vec![0, 8192, 16384],
            data_length: 32768,
            chunk_length: 16384,
            algorithm: "LZ4Compressor".to_string(),
        };

        let built =
            ChunkDecompressor::new(three_chunk_info, CassandraVersion::V5_0Release).unwrap();

        let info = &built.compression_info;
        assert_eq!(info.algorithm, "LZ4Compressor");
        assert_eq!(info.chunk_length, 16384);
        assert_eq!(info.chunk_offsets.len(), 3);
    }

    /// A fresh decompressor has an empty cache with room for 16 chunks, and
    /// clearing an empty cache leaves it empty.
    #[test]
    fn test_chunk_cache() {
        let single_chunk_info = CompressionInfo {
            max_compressed_length: i32::MAX as u32,
            option_pairs: vec![],
            chunk_offsets: vec![0],
            data_length: 16384,
            chunk_length: 16384,
            algorithm: "LZ4Compressor".to_string(),
        };

        let mut built =
            ChunkDecompressor::new(single_chunk_info, CassandraVersion::V5_0Release).unwrap();

        let stats_before = built.cache_stats();
        assert_eq!(stats_before.0, 0);
        assert_eq!(stats_before.1, 16);

        built.clear_cache();
        let stats_after = built.cache_stats();
        assert_eq!(stats_after.0, 0);
    }
}