objects/store/compression/
mod.rs1#[cfg(feature = "zstd")]
9use std::io::Read;
10
11#[cfg(test)]
12use crate::delta::{DeltaDecoder, DeltaEncoder, MAX_DELTA_OUTPUT_SIZE};
13
/// Length of the current record header: a 1-byte compression tag followed by
/// an 8-byte big-endian uncompressed size.
const COMPRESSED_HEADER_LEN: usize = 1 + std::mem::size_of::<u64>();
/// Length of the legacy record header: a 1-byte compression tag followed by
/// a 4-byte big-endian uncompressed size.
const LEGACY_COMPRESSED_HEADER_LEN: usize = 1 + std::mem::size_of::<u32>();
/// Largest uncompressed size (256 MiB) accepted by `validate_size`.
const MAX_DECOMPRESSED_SIZE: u64 = 256 << 20;
/// Zstandard frame magic number `0xFD2FB528` as it appears on the wire
/// (little-endian), used to locate the start of the zstd frame after a header.
const ZSTD_MAGIC: [u8; 4] = 0xFD2F_B528_u32.to_le_bytes();
18
/// Tag stored in the first byte of a record, identifying how the payload
/// that follows the header is encoded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
enum CompressionType {
    /// Payload is stored as-is.
    None = 0,
    /// Payload is a zstd frame.
    Zstd = 1,
    /// Payload is a delta that must be applied to a base object.
    Delta = 2,
}

impl CompressionType {
    /// Maps a raw tag byte to its variant, or `None` (the `Option`) when the
    /// byte is not a known tag.
    fn from_u8(value: u8) -> Option<Self> {
        const KNOWN: [CompressionType; 3] = [
            CompressionType::None,
            CompressionType::Zstd,
            CompressionType::Delta,
        ];

        KNOWN.iter().copied().find(|kind| *kind as u8 == value)
    }
}
42
/// Tunables consulted by the compression entry points in this module.
#[derive(Debug, Clone, Copy)]
pub struct CompressionConfig {
    /// When `false`, `compress` returns `Ok(None)` without touching the data.
    pub enabled: bool,
    /// Compression level handed to the zstd encoder.
    pub level: i32,
    /// Inputs shorter than this many bytes are never compressed.
    pub min_size: usize,
    /// Delta encoding is skipped when the base object is larger than this.
    pub max_delta_size: usize,
}

impl Default for CompressionConfig {
    /// Compression is on exactly when the crate is built with the `zstd`
    /// feature; level 3, 256-byte minimum input, 10 MB delta base limit.
    fn default() -> Self {
        let zstd_available = cfg!(feature = "zstd");

        Self {
            enabled: zstd_available,
            level: 3,
            min_size: 256,
            max_delta_size: 10_000_000,
        }
    }
}
67
68impl CompressionConfig {
69 pub fn from_env() -> Self {
71 let mut config = Self::default();
72
73 if let Ok(val) = std::env::var("HEDDLE_COMPRESSION") {
74 let requested = val != "0" && val.to_lowercase() != "false";
75 config.enabled = requested && cfg!(feature = "zstd");
76 }
77
78 if let Ok(val) = std::env::var("HEDDLE_COMPRESSION_LEVEL")
79 && let Ok(level) = val.parse::<i32>()
80 {
81 config.level = level.clamp(1, 22);
82 }
83
84 if let Ok(val) = std::env::var("HEDDLE_COMPRESSION_MIN_SIZE")
85 && let Ok(size) = val.parse::<usize>()
86 {
87 config.min_size = size;
88 }
89
90 config
91 }
92
93 pub fn disabled() -> Self {
95 Self {
96 enabled: false,
97 level: 0,
98 min_size: usize::MAX,
99 max_delta_size: 0,
100 }
101 }
102}
103
104#[derive(Debug, thiserror::Error)]
106pub enum CompressionError {
107 #[error("decompression failed: {0}")]
108 DecompressionFailed(String),
109 #[error("compression failed: {0}")]
110 CompressionFailed(String),
111 #[error("invalid compression type: {0}")]
112 InvalidType(u8),
113 #[error("corrupted data: {0}")]
114 CorruptedData(String),
115 #[error("invalid operation: {0}")]
116 InvalidOperation(String),
117 #[error("object size {size} exceeds maximum {max}")]
118 SizeLimitExceeded { size: u64, max: u64 },
119}
120
/// Encodes `data` as a single zstd frame at the given `level`.
///
/// Any encoder failure is reported as `CompressionError::CompressionFailed`
/// carrying the encoder's message.
#[cfg(feature = "zstd")]
fn compress_zstd(data: &[u8], level: i32) -> Result<Vec<u8>, CompressionError> {
    match zstd::encode_all(data, level) {
        Ok(frame) => Ok(frame),
        Err(io_err) => Err(CompressionError::CompressionFailed(io_err.to_string())),
    }
}
126
127#[cfg(not(feature = "zstd"))]
128fn compress_zstd(_data: &[u8], _level: i32) -> Result<Vec<u8>, CompressionError> {
129 Err(CompressionError::InvalidOperation(
130 "zstd compression support not compiled into this build".to_string(),
131 ))
132}
133
/// Streams a zstd frame into memory, refusing to produce more than
/// `expected_size` bytes.
///
/// `expected_size` is first checked with `validate_size`, then used both to
/// pre-size the output buffer and as a hard ceiling while decoding. Output
/// that ends up *shorter* than `expected_size` is not rejected here.
///
/// # Errors
/// * whatever `validate_size` returns for an oversized `expected_size`;
/// * `DecompressionFailed` when the decoder cannot be created or a read fails;
/// * `CorruptedData` as soon as the decoded output would exceed `expected_size`.
#[cfg(feature = "zstd")]
fn decompress_zstd(data: &[u8], expected_size: u64) -> Result<Vec<u8>, CompressionError> {
    validate_size(expected_size)?;

    let mut decoder = match zstd::stream::read::Decoder::new(data) {
        Ok(decoder) => decoder,
        Err(e) => return Err(CompressionError::DecompressionFailed(e.to_string())),
    };
    let mut output = Vec::with_capacity(expected_size as usize);
    let mut chunk = [0u8; 8192];

    loop {
        let filled = match decoder.read(&mut chunk) {
            // End of the frame.
            Ok(0) => return Ok(output),
            Ok(count) => count,
            Err(e) => return Err(CompressionError::DecompressionFailed(e.to_string())),
        };

        // Check the ceiling before appending so an oversized frame never
        // grows the buffer past the recorded size.
        let next_size = (output.len() + filled) as u64;
        if next_size > expected_size {
            return Err(CompressionError::CorruptedData(format!(
                "decompressed size exceeds recorded header size: expected {expected_size}, got at least {next_size}",
            )));
        }

        output.extend_from_slice(&chunk[..filled]);
    }
}
164
165#[cfg(not(feature = "zstd"))]
166fn decompress_zstd(_data: &[u8], expected_size: u64) -> Result<Vec<u8>, CompressionError> {
167 validate_size(expected_size)?;
168 Err(CompressionError::InvalidOperation(
169 "zstd-compressed data is unsupported in this build".to_string(),
170 ))
171}
172
173pub fn compress(
178 data: &[u8],
179 config: &CompressionConfig,
180) -> Result<Option<Vec<u8>>, CompressionError> {
181 if !config.enabled || data.len() < config.min_size {
182 return Ok(None);
183 }
184
185 validate_size(data.len() as u64)?;
186
187 let compressed = compress_zstd(data, config.level)?;
189
190 if compressed.len() >= data.len() {
192 return Ok(None);
193 }
194
195 let mut result = Vec::with_capacity(COMPRESSED_HEADER_LEN + compressed.len());
197 result.push(CompressionType::Zstd as u8);
198 result.extend_from_slice(&(data.len() as u64).to_be_bytes());
199 result.extend_from_slice(&compressed);
200
201 Ok(Some(result))
202}
203
/// Encodes `data` as a delta against `base` and frames it as
/// `[tag: 1 byte][uncompressed length: u64 big-endian][delta]`.
///
/// Returns `Ok(None)` when compression is disabled, when `data` is shorter
/// than `config.min_size`, when `base` is larger than
/// `config.max_delta_size`, or when the framed record (header included)
/// would not be smaller than `data` itself.
///
/// # Errors
/// Whatever `validate_size` returns when `data` is larger than the supported
/// maximum.
#[cfg(test)]
fn compress_delta(
    data: &[u8],
    base: &[u8],
    config: &CompressionConfig,
) -> Result<Option<Vec<u8>>, CompressionError> {
    if !config.enabled || data.len() < config.min_size || base.len() > config.max_delta_size {
        return Ok(None);
    }

    validate_size(data.len() as u64)?;

    let delta = DeltaEncoder::encode(base, data);

    // The header is part of the stored record, so include it when deciding
    // whether the delta actually saves space.
    let framed_len = COMPRESSED_HEADER_LEN + delta.len();
    if framed_len >= data.len() {
        return Ok(None);
    }

    let mut result = Vec::with_capacity(framed_len);
    result.push(CompressionType::Delta as u8);
    result.extend_from_slice(&(data.len() as u64).to_be_bytes());
    result.extend_from_slice(&delta);

    Ok(Some(result))
}
235
236pub fn decompress(data: &[u8]) -> Result<Vec<u8>, CompressionError> {
240 if data.len() < LEGACY_COMPRESSED_HEADER_LEN {
241 return Ok(data.to_vec());
243 }
244
245 let compression_type =
246 CompressionType::from_u8(data[0]).ok_or_else(|| CompressionError::InvalidType(data[0]))?;
247
248 match compression_type {
249 CompressionType::None => {
250 let offset = if data.len() >= COMPRESSED_HEADER_LEN {
251 COMPRESSED_HEADER_LEN
252 } else {
253 LEGACY_COMPRESSED_HEADER_LEN
254 };
255 let expected_size = if offset == COMPRESSED_HEADER_LEN {
256 read_u64_size(data)?
257 } else {
258 read_u32_size(data)?
259 };
260 let payload = data[offset..].to_vec();
261 validate_decompressed_len(expected_size, payload.len())?;
262 Ok(payload)
263 }
264 CompressionType::Zstd if zstd_header_len(data).is_some() => {
265 decompress_zstd_with_header(data)
266 }
267 CompressionType::Zstd => Ok(data.to_vec()),
268 CompressionType::Delta => {
269 Err(CompressionError::InvalidOperation(
271 "Delta compression requires base object".to_string(),
272 ))
273 }
274 }
275}
276
/// Rebuilds the original bytes from a `Delta`-tagged record and its `base`.
///
/// # Errors
/// * `CorruptedData` when the record is shorter than the legacy header;
/// * `InvalidType` when the tag byte is unknown;
/// * `InvalidOperation` when the tag is valid but not `Delta`;
/// * anything returned by `decompress_delta_with_header`.
#[cfg(test)]
fn decompress_delta(delta_data: &[u8], base: &[u8]) -> Result<Vec<u8>, CompressionError> {
    if delta_data.len() < LEGACY_COMPRESSED_HEADER_LEN {
        return Err(CompressionError::CorruptedData(
            "Delta data too short".to_string(),
        ));
    }

    match CompressionType::from_u8(delta_data[0]) {
        Some(CompressionType::Delta) => decompress_delta_with_header(delta_data, base),
        Some(_) => Err(CompressionError::InvalidOperation(
            "Expected delta compression".to_string(),
        )),
        None => Err(CompressionError::InvalidType(delta_data[0])),
    }
}
297
298pub fn is_compressed(data: &[u8]) -> bool {
300 if data.len() < LEGACY_COMPRESSED_HEADER_LEN {
301 return false;
302 }
303
304 matches!(
305 CompressionType::from_u8(data[0]),
306 Some(CompressionType::Zstd)
307 ) && zstd_header_len(data).is_some()
308}
309
310pub fn header_uncompressed_size(data: &[u8]) -> Option<u64> {
318 if data.len() < LEGACY_COMPRESSED_HEADER_LEN {
319 return None;
320 }
321 match CompressionType::from_u8(data[0])? {
322 CompressionType::Zstd => match zstd_header_len(data)? {
323 COMPRESSED_HEADER_LEN => {
324 u64::from_be_bytes(data[1..COMPRESSED_HEADER_LEN].try_into().ok()?).into()
325 }
326 LEGACY_COMPRESSED_HEADER_LEN => Some(u32::from_be_bytes(
327 data[1..LEGACY_COMPRESSED_HEADER_LEN].try_into().ok()?,
328 ) as u64),
329 _ => None,
330 },
331 CompressionType::None | CompressionType::Delta => None,
332 }
333}
334
/// Returns the tag and recorded uncompressed size of a record, or `None`
/// when `data` is shorter than the legacy header or the tag is unknown.
///
/// The size is read as a `u64` whenever `data` is long enough to hold the
/// current header, and as a legacy `u32` otherwise.
#[cfg(test)]
fn compression_info(data: &[u8]) -> Option<(CompressionType, u64)> {
    if data.len() < LEGACY_COMPRESSED_HEADER_LEN {
        return None;
    }

    let kind = CompressionType::from_u8(data[0])?;
    let recorded = match data.get(1..COMPRESSED_HEADER_LEN) {
        Some(wide) => u64::from_be_bytes(wide.try_into().ok()?),
        None => {
            u32::from_be_bytes(data[1..LEGACY_COMPRESSED_HEADER_LEN].try_into().ok()?) as u64
        }
    };

    Some((kind, recorded))
}
351
352fn decompress_zstd_with_header(data: &[u8]) -> Result<Vec<u8>, CompressionError> {
353 if has_magic_at(data, COMPRESSED_HEADER_LEN, ZSTD_MAGIC) {
354 return try_decompress_zstd(data, COMPRESSED_HEADER_LEN, read_u64_size);
355 }
356
357 if has_magic_at(data, LEGACY_COMPRESSED_HEADER_LEN, ZSTD_MAGIC) {
358 return try_decompress_zstd(data, LEGACY_COMPRESSED_HEADER_LEN, read_u32_size);
359 }
360
361 try_decompress_zstd(data, COMPRESSED_HEADER_LEN, read_u64_size)
362 .or_else(|_| try_decompress_zstd(data, LEGACY_COMPRESSED_HEADER_LEN, read_u32_size))
363}
364
365fn zstd_header_len(data: &[u8]) -> Option<usize> {
366 if has_magic_at(data, COMPRESSED_HEADER_LEN, ZSTD_MAGIC) {
367 Some(COMPRESSED_HEADER_LEN)
368 } else if has_magic_at(data, LEGACY_COMPRESSED_HEADER_LEN, ZSTD_MAGIC) {
369 Some(LEGACY_COMPRESSED_HEADER_LEN)
370 } else {
371 None
372 }
373}
374
375fn try_decompress_zstd<F>(
376 data: &[u8],
377 header_len: usize,
378 read_size: F,
379) -> Result<Vec<u8>, CompressionError>
380where
381 F: Fn(&[u8]) -> Result<u64, CompressionError>,
382{
383 let uncompressed_size = read_size(data)?;
384 let decompressed = decompress_zstd(&data[header_len..], uncompressed_size)?;
385 validate_decompressed_len(uncompressed_size, decompressed.len())?;
386 Ok(decompressed)
387}
388
/// Decodes a delta record by trying the current header layout first and the
/// legacy layout second.
///
/// When both attempts fail, the error from the legacy attempt is returned.
#[cfg(test)]
fn decompress_delta_with_header(
    delta_data: &[u8],
    base: &[u8],
) -> Result<Vec<u8>, CompressionError> {
    try_decompress_delta(delta_data, base, COMPRESSED_HEADER_LEN, read_u64_size).or_else(|_| {
        try_decompress_delta(
            delta_data,
            base,
            LEGACY_COMPRESSED_HEADER_LEN,
            read_u32_size,
        )
    })
}
406
/// Applies the delta that starts `header_len` bytes into `delta_data` to
/// `base`.
///
/// `read_size` extracts the recorded output size from the header and must
/// guarantee that `delta_data` is at least `header_len` bytes long. The
/// recorded size is rejected when it exceeds `MAX_DELTA_OUTPUT_SIZE`, and
/// the decoded output must match it exactly.
#[cfg(test)]
fn try_decompress_delta<F>(
    delta_data: &[u8],
    base: &[u8],
    header_len: usize,
    read_size: F,
) -> Result<Vec<u8>, CompressionError>
where
    F: Fn(&[u8]) -> Result<u64, CompressionError>,
{
    let declared = read_size(delta_data)?;

    if declared > MAX_DELTA_OUTPUT_SIZE as u64 {
        let message = format!(
            "delta output size {} exceeds max {}",
            declared, MAX_DELTA_OUTPUT_SIZE
        );
        return Err(CompressionError::DecompressionFailed(message));
    }

    let rebuilt = match DeltaDecoder::decode(base, &delta_data[header_len..], declared as usize) {
        Ok(bytes) => bytes,
        Err(error) => return Err(CompressionError::DecompressionFailed(error.to_string())),
    };
    validate_decompressed_len(declared, rebuilt.len())?;
    Ok(rebuilt)
}
432
433fn read_u64_size(data: &[u8]) -> Result<u64, CompressionError> {
434 if data.len() < COMPRESSED_HEADER_LEN {
435 return Err(CompressionError::CorruptedData(
436 "compression header truncated".to_string(),
437 ));
438 }
439
440 let recorded_size =
441 u64::from_be_bytes(data[1..COMPRESSED_HEADER_LEN].try_into().map_err(|_| {
442 CompressionError::CorruptedData("compression header truncated".to_string())
443 })?);
444 validate_size(recorded_size)?;
445 Ok(recorded_size)
446}
447
448fn read_u32_size(data: &[u8]) -> Result<u64, CompressionError> {
449 if data.len() < LEGACY_COMPRESSED_HEADER_LEN {
450 return Err(CompressionError::CorruptedData(
451 "compression header truncated".to_string(),
452 ));
453 }
454
455 let recorded_size = u32::from_be_bytes(
456 data[1..LEGACY_COMPRESSED_HEADER_LEN]
457 .try_into()
458 .map_err(|_| {
459 CompressionError::CorruptedData("compression header truncated".to_string())
460 })?,
461 ) as u64;
462 validate_size(recorded_size)?;
463 Ok(recorded_size)
464}
465
466fn validate_size(size: u64) -> Result<(), CompressionError> {
467 if size > MAX_DECOMPRESSED_SIZE {
468 return Err(CompressionError::SizeLimitExceeded {
469 size,
470 max: MAX_DECOMPRESSED_SIZE,
471 });
472 }
473
474 Ok(())
475}
476
477fn validate_decompressed_len(expected: u64, actual: usize) -> Result<(), CompressionError> {
478 if actual as u64 != expected {
479 return Err(CompressionError::CorruptedData(format!(
480 "decompressed size mismatch: expected {expected}, got {actual}",
481 )));
482 }
483
484 Ok(())
485}
486
/// Returns `true` when the four bytes of `data` starting at `offset` equal
/// `magic`.
///
/// Returns `false` when fewer than four bytes are available at `offset`,
/// including when `offset` lies beyond the end of `data`. No index
/// arithmetic is performed, so arbitrarily large offsets cannot overflow.
fn has_magic_at(data: &[u8], offset: usize, magic: [u8; 4]) -> bool {
    data.get(offset..)
        .is_some_and(|tail| tail.starts_with(&magic))
}
490
491#[cfg(test)]
492mod compression_tests;