cqlite_core/storage/sstable/
compression_info.rs1use crate::{Error, Result};
30use std::io::{Cursor, Read};
31
32#[derive(Debug, Clone)]
34pub struct CompressionInfo {
35 pub algorithm: String,
37 pub option_pairs: Vec<(String, String)>,
40 pub chunk_length: u32,
42 pub max_compressed_length: u32,
46 pub data_length: u64,
48 pub chunk_offsets: Vec<u64>,
55 }
59
60fn read_utf(cursor: &mut Cursor<&[u8]>) -> Result<String> {
62 let mut len_bytes = [0u8; 2];
63 cursor
64 .read_exact(&mut len_bytes)
65 .map_err(|e| Error::InvalidFormat(format!("Failed to read writeUTF length: {}", e)))?;
66 let len = u16::from_be_bytes(len_bytes) as usize;
67
68 let mut string_bytes = vec![0u8; len];
69 cursor.read_exact(&mut string_bytes).map_err(|e| {
70 Error::InvalidFormat(format!(
71 "Failed to read writeUTF bytes (len={}): {}",
72 len, e
73 ))
74 })?;
75
76 String::from_utf8(string_bytes)
77 .map_err(|e| Error::InvalidFormat(format!("Invalid UTF-8 in writeUTF string: {}", e)))
78}
79
80fn read_u32(cursor: &mut Cursor<&[u8]>, field: &str) -> Result<u32> {
82 let mut bytes = [0u8; 4];
83 cursor
84 .read_exact(&mut bytes)
85 .map_err(|e| Error::InvalidFormat(format!("Failed to read {} (u32 BE): {}", field, e)))?;
86 Ok(u32::from_be_bytes(bytes))
87}
88
89fn read_u64(cursor: &mut Cursor<&[u8]>, field: &str) -> Result<u64> {
91 let mut bytes = [0u8; 8];
92 cursor
93 .read_exact(&mut bytes)
94 .map_err(|e| Error::InvalidFormat(format!("Failed to read {} (u64 BE): {}", field, e)))?;
95 Ok(u64::from_be_bytes(bytes))
96}
97
98impl CompressionInfo {
99 pub fn parse(data: &[u8]) -> Result<Self> {
104 if data.is_empty() {
105 return Err(Error::InvalidFormat(
106 "Empty compression info data".to_string(),
107 ));
108 }
109
110 let mut cursor = Cursor::new(data);
111
112 let algorithm = read_utf(&mut cursor)?;
114 if algorithm.is_empty() {
115 return Err(Error::InvalidFormat(
116 "Compressor simple name is empty".to_string(),
117 ));
118 }
119
120 let option_count = read_u32(&mut cursor, "option_count")?;
122 if option_count > 256 {
123 return Err(Error::InvalidFormat(format!(
124 "Unreasonably large option_count: {} (max 256)",
125 option_count
126 )));
127 }
128
129 let mut option_pairs = Vec::with_capacity(option_count as usize);
131 for i in 0..option_count {
132 let key = read_utf(&mut cursor).map_err(|e| {
133 Error::InvalidFormat(format!("Failed to read option key {}: {}", i, e))
134 })?;
135 let value = read_utf(&mut cursor).map_err(|e| {
136 Error::InvalidFormat(format!("Failed to read option value {}: {}", i, e))
137 })?;
138 option_pairs.push((key, value));
139 }
140
141 let chunk_length = read_u32(&mut cursor, "chunk_length")?;
143 if chunk_length == 0 {
144 return Err(Error::InvalidFormat(
145 "chunk_length cannot be zero".to_string(),
146 ));
147 }
148 if chunk_length > 256 * 1024 * 1024 {
149 return Err(Error::InvalidFormat(format!(
150 "chunk_length too large: {} bytes (max 256 MiB)",
151 chunk_length
152 )));
153 }
154
155 let max_compressed_length = read_u32(&mut cursor, "max_compressed_length")?;
157
158 let data_length = read_u64(&mut cursor, "data_length")?;
160
161 let chunk_count = read_u32(&mut cursor, "chunk_count")? as usize;
163 if chunk_count == 0 {
164 return Err(Error::InvalidFormat(
165 "chunk_count cannot be zero".to_string(),
166 ));
167 }
168 if chunk_count > 1_000_000 {
169 return Err(Error::InvalidFormat(format!(
170 "chunk_count too large: {} (max 1,000,000)",
171 chunk_count
172 )));
173 }
174
175 let mut chunk_offsets = Vec::with_capacity(chunk_count);
177 for i in 0..chunk_count {
178 let offset = read_u64(&mut cursor, &format!("chunk_offset[{}]", i))?;
179 chunk_offsets.push(offset);
180 }
181
182 let info = CompressionInfo {
183 algorithm,
184 option_pairs,
185 chunk_length,
186 max_compressed_length,
187 data_length,
188 chunk_offsets,
189 };
190
191 info.validate()?;
192 Ok(info)
193 }
194
195 pub fn chunk_for_offset(&self, offset: u64) -> usize {
197 (offset / self.chunk_length as u64) as usize
198 }
199
200 pub fn offset_within_chunk(&self, offset: u64) -> u64 {
202 offset % self.chunk_length as u64
203 }
204
205 pub fn compressed_chunk_offset(&self, chunk_index: usize) -> Option<u64> {
207 self.chunk_offsets.get(chunk_index).copied()
208 }
209
210 pub fn compressed_chunk_size(
216 &self,
217 chunk_index: usize,
218 total_compressed_size: u64,
219 ) -> Option<u64> {
220 let start_offset = self.compressed_chunk_offset(chunk_index)?;
221
222 if chunk_index + 1 < self.chunk_offsets.len() {
223 let next_offset = self.chunk_offsets[chunk_index + 1];
224 Some(next_offset - start_offset)
225 } else {
226 Some(total_compressed_size - start_offset)
227 }
228 }
229
230 pub fn validate(&self) -> Result<()> {
232 if self.algorithm.is_empty() {
233 return Err(Error::InvalidFormat(
234 "Empty compression algorithm".to_string(),
235 ));
236 }
237 if self.chunk_length == 0 {
238 return Err(Error::InvalidFormat("Zero chunk length".to_string()));
239 }
240 if self.chunk_offsets.is_empty() {
241 return Err(Error::InvalidFormat("No chunk offsets".to_string()));
242 }
243 for i in 1..self.chunk_offsets.len() {
245 if self.chunk_offsets[i] <= self.chunk_offsets[i - 1] {
246 return Err(Error::InvalidFormat(format!(
247 "Chunk offsets not in ascending order: offsets[{}]={} <= offsets[{}]={}",
248 i,
249 self.chunk_offsets[i],
250 i - 1,
251 self.chunk_offsets[i - 1]
252 )));
253 }
254 }
255 Ok(())
256 }
257}
258
259#[cfg(test)]
260mod tests {
261 use super::*;
262
263 fn make_compression_info_blob(
265 algorithm: &str,
266 options: &[(&str, &str)],
267 chunk_length: u32,
268 max_compressed_length: u32,
269 data_length: u64,
270 offsets: &[u64],
271 ) -> Vec<u8> {
272 let mut data = Vec::new();
273
274 let name_bytes = algorithm.as_bytes();
276 data.extend_from_slice(&(name_bytes.len() as u16).to_be_bytes());
277 data.extend_from_slice(name_bytes);
278
279 data.extend_from_slice(&(options.len() as u32).to_be_bytes());
281
282 for (k, v) in options {
284 let kb = k.as_bytes();
285 data.extend_from_slice(&(kb.len() as u16).to_be_bytes());
286 data.extend_from_slice(kb);
287 let vb = v.as_bytes();
288 data.extend_from_slice(&(vb.len() as u16).to_be_bytes());
289 data.extend_from_slice(vb);
290 }
291
292 data.extend_from_slice(&chunk_length.to_be_bytes());
294
295 data.extend_from_slice(&max_compressed_length.to_be_bytes());
297
298 data.extend_from_slice(&data_length.to_be_bytes());
300
301 data.extend_from_slice(&(offsets.len() as u32).to_be_bytes());
303
304 for &off in offsets {
306 data.extend_from_slice(&off.to_be_bytes());
307 }
308
309 data
310 }
311
312 #[test]
313 fn test_parse_no_options() {
314 let blob = make_compression_info_blob(
315 "LZ4Compressor",
316 &[],
317 16384,
318 i32::MAX as u32,
319 32768,
320 &[0, 8200],
321 );
322
323 let info = CompressionInfo::parse(&blob).expect("parse should succeed");
324 assert_eq!(info.algorithm, "LZ4Compressor");
325 assert!(info.option_pairs.is_empty());
326 assert_eq!(info.chunk_length, 16384);
327 assert_eq!(info.max_compressed_length, i32::MAX as u32);
328 assert_eq!(info.data_length, 32768);
329 assert_eq!(info.chunk_offsets, vec![0, 8200]);
330 }
331
332 #[test]
333 fn test_parse_with_options() {
334 let blob = make_compression_info_blob(
337 "LZ4Compressor",
338 &[("compression_level", "9")],
339 16384,
340 i32::MAX as u32,
341 16384,
342 &[0],
343 );
344
345 let info = CompressionInfo::parse(&blob).expect(
346 "Bug #638 repro: parser must handle option_count > 0 deterministically, \
347 not skip 4 bytes as padding",
348 );
349 assert_eq!(info.algorithm, "LZ4Compressor");
350 assert_eq!(info.option_pairs.len(), 1);
351 assert_eq!(info.option_pairs[0].0, "compression_level");
352 assert_eq!(info.option_pairs[0].1, "9");
353 assert_eq!(info.chunk_length, 16384);
354 assert_eq!(info.max_compressed_length, i32::MAX as u32);
355 }
356
357 #[test]
358 fn test_parse_exposes_max_compressed_length() {
359 let blob = make_compression_info_blob(
362 "SnappyCompressor",
363 &[],
364 16384,
365 i32::MAX as u32,
366 16384,
367 &[0],
368 );
369
370 let info = CompressionInfo::parse(&blob).expect("parse should succeed");
371 assert_eq!(
373 info.max_compressed_length,
374 i32::MAX as u32,
375 "Bug #638: max_compressed_length field must be exposed for incompressible-chunk fallback"
376 );
377 }
378
379 #[test]
380 fn test_parse_real_snappy_fixture() {
381 let fixture_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
384 .parent()
385 .unwrap()
386 .join("test-data/datasets/sstables/test_basic/simple_table-6aa08200a25111f0a3fef1a551383fb9/nb-1-big-CompressionInfo.db");
387
388 if !fixture_path.exists() {
389 println!("Skipping real-fixture test: {}", fixture_path.display());
390 return;
391 }
392
393 let data = std::fs::read(&fixture_path).expect("read fixture");
394 let info = CompressionInfo::parse(&data).expect("parse real CompressionInfo.db");
395
396 assert_eq!(info.algorithm, "SnappyCompressor");
397 assert_eq!(info.chunk_length, 16384);
398 assert_eq!(info.max_compressed_length, i32::MAX as u32);
399 assert!(!info.chunk_offsets.is_empty());
400 for i in 1..info.chunk_offsets.len() {
402 assert!(
403 info.chunk_offsets[i] > info.chunk_offsets[i - 1],
404 "offsets must be increasing"
405 );
406 }
407 let expected_size: usize = 2
409 + info.algorithm.len()
410 + 4 + 4 + 4 + 8 + 4 + info.chunk_offsets.len() * 8;
416 assert_eq!(
417 data.len(),
418 expected_size,
419 "CompressionInfo.db must end immediately after offsets — no CRC bytes appended"
420 );
421 }
422
423 #[test]
424 fn test_chunk_calculations() {
425 let info = CompressionInfo {
426 algorithm: "LZ4Compressor".to_string(),
427 option_pairs: vec![],
428 chunk_length: 16384,
429 max_compressed_length: i32::MAX as u32,
430 data_length: 32768,
431 chunk_offsets: vec![0, 8192],
432 };
433
434 assert_eq!(info.chunk_for_offset(0), 0);
435 assert_eq!(info.chunk_for_offset(16384), 1);
436 assert_eq!(info.offset_within_chunk(100), 100);
437 assert_eq!(info.offset_within_chunk(16484), 100);
438
439 assert_eq!(info.compressed_chunk_offset(0), Some(0));
440 assert_eq!(info.compressed_chunk_offset(1), Some(8192));
441
442 assert_eq!(info.compressed_chunk_size(0, 20000), Some(8192));
443 assert_eq!(info.compressed_chunk_size(1, 20000), Some(11808));
444 }
445
446 #[test]
447 fn test_validation() {
448 let valid_info = CompressionInfo {
449 algorithm: "LZ4Compressor".to_string(),
450 option_pairs: vec![],
451 chunk_length: 16384,
452 max_compressed_length: i32::MAX as u32,
453 data_length: 32768,
454 chunk_offsets: vec![0, 8192],
455 };
456 assert!(valid_info.validate().is_ok());
457
458 let invalid = CompressionInfo {
460 algorithm: "".to_string(),
461 option_pairs: vec![],
462 chunk_length: 16384,
463 max_compressed_length: i32::MAX as u32,
464 data_length: 0,
465 chunk_offsets: vec![0],
466 };
467 assert!(invalid.validate().is_err());
468
469 let invalid2 = CompressionInfo {
471 algorithm: "LZ4Compressor".to_string(),
472 option_pairs: vec![],
473 chunk_length: 16384,
474 max_compressed_length: i32::MAX as u32,
475 data_length: 32768,
476 chunk_offsets: vec![8192, 0],
477 };
478 assert!(invalid2.validate().is_err());
479 }
480}