objects/store/compression/
mod.rs1#![deny(clippy::cast_possible_truncation)]
3
4#[cfg(feature = "zstd")]
11use std::io::Read;
12
13#[cfg(test)]
14use crate::delta::{DeltaDecoder, DeltaEncoder, MAX_DELTA_OUTPUT_SIZE};
15
/// Size in bytes of the header written in front of a compressed payload:
/// one compression-type byte followed by the uncompressed length as a
/// big-endian `u64`.
const COMPRESSED_HEADER_LEN: usize = 1 + std::mem::size_of::<u64>();
/// Largest uncompressed size (256 MiB) accepted by `validate_size`.
const MAX_DECOMPRESSED_SIZE: u64 = 256 * 1024 * 1024;
/// Bytes that must directly follow the header for data to be treated as a
/// zstd payload.
const ZSTD_MAGIC: [u8; 4] = [0x28, 0xB5, 0x2F, 0xFD];
19
/// Tag stored in the first header byte that says how the payload is encoded.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompressionType {
    /// Tag `0`: the payload follows the header without compression.
    None = 0,
    /// Tag `1`: the payload is zstd-compressed.
    Zstd = 1,
    /// Tag `2`: the payload is a delta that needs a base object to decode.
    Delta = 2,
}

impl CompressionType {
    /// Converts a raw tag byte into its variant.
    ///
    /// Returns `Option::None` when the byte does not match any known tag.
    fn from_u8(value: u8) -> Option<Self> {
        const KNOWN: [CompressionType; 3] = [
            CompressionType::None,
            CompressionType::Zstd,
            CompressionType::Delta,
        ];

        KNOWN
            .iter()
            .copied()
            .find(|candidate| *candidate as u8 == value)
    }
}
43
/// Settings that decide whether and how object data gets compressed.
#[derive(Debug, Clone, Copy)]
pub struct CompressionConfig {
    /// Master switch; when `false` the compression entry points return `Ok(None)`.
    pub enabled: bool,
    /// Level passed to the zstd encoder.
    pub level: i32,
    /// Inputs shorter than this many bytes are left uncompressed.
    pub min_size: usize,
    /// Largest base object, in bytes, for which delta encoding is attempted.
    pub max_delta_size: usize,
}

impl Default for CompressionConfig {
    /// Compression is on only when the `zstd` feature is compiled in; the
    /// remaining defaults are level 3, a 256-byte minimum input and a
    /// 10,000,000-byte maximum delta base.
    fn default() -> Self {
        let enabled = cfg!(feature = "zstd");
        Self {
            enabled,
            level: 3,
            min_size: 256,
            max_delta_size: 10_000_000,
        }
    }
}

impl CompressionConfig {
    /// Starts from [`Default`] and applies overrides from the environment.
    ///
    /// * `HEDDLE_COMPRESSION` — any value other than `0` or `false`
    ///   (compared after lower-casing) requests compression; the request
    ///   is honoured only when the `zstd` feature is compiled in.
    /// * `HEDDLE_COMPRESSION_LEVEL` — parsed as `i32` and clamped to `1..=22`.
    /// * `HEDDLE_COMPRESSION_MIN_SIZE` — parsed as `usize`.
    ///
    /// A variable that is unset, not valid Unicode, or (for the numeric
    /// ones) unparsable leaves the corresponding default untouched.
    pub fn from_env() -> Self {
        let mut config = Self::default();

        if let Ok(flag) = std::env::var("HEDDLE_COMPRESSION") {
            let opted_out = flag == "0" || flag.to_lowercase() == "false";
            config.enabled = !opted_out && cfg!(feature = "zstd");
        }

        let requested_level = std::env::var("HEDDLE_COMPRESSION_LEVEL")
            .ok()
            .and_then(|raw| raw.parse::<i32>().ok());
        if let Some(level) = requested_level {
            config.level = level.clamp(1, 22);
        }

        let requested_min_size = std::env::var("HEDDLE_COMPRESSION_MIN_SIZE")
            .ok()
            .and_then(|raw| raw.parse::<usize>().ok());
        if let Some(size) = requested_min_size {
            config.min_size = size;
        }

        config
    }

    /// Configuration with compression switched off: level 0, a minimum
    /// size of `usize::MAX` and a zero delta-base limit.
    pub fn disabled() -> Self {
        Self {
            max_delta_size: 0,
            min_size: usize::MAX,
            level: 0,
            enabled: false,
        }
    }
}
104
105#[derive(Debug, thiserror::Error)]
107pub enum CompressionError {
108 #[error("decompression failed: {0}")]
109 DecompressionFailed(String),
110 #[error("compression failed: {0}")]
111 CompressionFailed(String),
112 #[error("invalid compression type: {0}")]
113 InvalidType(u8),
114 #[error("corrupted data: {0}")]
115 CorruptedData(String),
116 #[error("invalid operation: {0}")]
117 InvalidOperation(String),
118 #[error("object size {size} exceeds maximum {max}")]
119 SizeLimitExceeded { size: u64, max: u64 },
120}
121
/// Compresses `data` with zstd at the given `level`.
///
/// # Errors
/// Returns [`CompressionError::CompressionFailed`] carrying the encoder's
/// message when `zstd::encode_all` fails.
#[cfg(feature = "zstd")]
fn compress_zstd_impl(data: &[u8], level: i32) -> Result<Vec<u8>, CompressionError> {
    match zstd::encode_all(data, level) {
        Ok(encoded) => Ok(encoded),
        Err(error) => Err(CompressionError::CompressionFailed(error.to_string())),
    }
}
127
128#[cfg(not(feature = "zstd"))]
129fn compress_zstd_impl(_data: &[u8], _level: i32) -> Result<Vec<u8>, CompressionError> {
130 Err(CompressionError::InvalidOperation(
131 "zstd compression support not compiled into this build".to_string(),
132 ))
133}
134
/// Public wrapper around `compress_zstd_impl`, available only with the
/// `bench` feature. The result is the bare encoder output; no header is added.
///
/// # Errors
/// Propagates whatever `compress_zstd_impl` returns.
#[cfg(feature = "bench")]
pub fn compress_zstd(data: &[u8], level: i32) -> Result<Vec<u8>, CompressionError> {
    let encoded = compress_zstd_impl(data, level)?;
    Ok(encoded)
}
140
/// Decodes a zstd stream, refusing to produce more than `expected_size` bytes.
///
/// `expected_size` is checked against the module-wide size limit before any
/// buffer is reserved, so the up-front allocation is bounded by that limit.
/// The stream is then read in 8 KiB chunks and the running total is checked
/// before each chunk is appended.
///
/// A stream that yields *fewer* bytes than `expected_size` is not rejected
/// here; the returned vector is simply shorter.
///
/// # Errors
/// * [`CompressionError::SizeLimitExceeded`] if `expected_size` is over the limit.
/// * [`CompressionError::DecompressionFailed`] if the decoder cannot be
///   created or a read fails.
/// * [`CompressionError::CorruptedData`] if the output would exceed
///   `expected_size` or a size does not fit the platform's integer types.
#[cfg(feature = "zstd")]
fn decompress_zstd_impl(data: &[u8], expected_size: u64) -> Result<Vec<u8>, CompressionError> {
    validate_size(expected_size)?;
    let capacity = checked_size_to_usize("zstd expected size", expected_size)?;

    // Shared mapper for every I/O error the decoder can raise.
    let to_failure =
        |error: std::io::Error| CompressionError::DecompressionFailed(error.to_string());

    let mut decoder = zstd::stream::read::Decoder::new(data).map_err(to_failure)?;
    let mut output = Vec::with_capacity(capacity);
    let mut chunk = [0u8; 8192];

    loop {
        let filled = match decoder.read(&mut chunk).map_err(to_failure)? {
            0 => break,
            count => count,
        };

        let next_size = output
            .len()
            .checked_add(filled)
            .ok_or_else(|| {
                CompressionError::CorruptedData("decompressed size overflows".to_string())
            })
            .and_then(|total| {
                u64::try_from(total).map_err(|_| {
                    CompressionError::CorruptedData(
                        "decompressed size exceeds platform limits".to_string(),
                    )
                })
            })?;

        // Check before appending so the buffer never grows past the recorded size.
        if next_size > expected_size {
            return Err(CompressionError::CorruptedData(format!(
                "decompressed size exceeds recorded header size: expected {expected_size}, got at least {next_size}",
            )));
        }

        output.extend_from_slice(&chunk[..filled]);
    }

    Ok(output)
}
177
178#[cfg(not(feature = "zstd"))]
179fn decompress_zstd_impl(_data: &[u8], expected_size: u64) -> Result<Vec<u8>, CompressionError> {
180 validate_size(expected_size)?;
181 Err(CompressionError::InvalidOperation(
182 "zstd-compressed data is unsupported in this build".to_string(),
183 ))
184}
185
/// Public wrapper around `decompress_zstd_impl`, available only with the
/// `bench` feature. `data` is a bare zstd stream without the module's header.
///
/// # Errors
/// Propagates whatever `decompress_zstd_impl` returns.
#[cfg(feature = "bench")]
pub fn decompress_zstd(data: &[u8], expected_size: u64) -> Result<Vec<u8>, CompressionError> {
    let decoded = decompress_zstd_impl(data, expected_size)?;
    Ok(decoded)
}
191
192pub fn compress(
197 data: &[u8],
198 config: &CompressionConfig,
199) -> Result<Option<Vec<u8>>, CompressionError> {
200 if !config.enabled || data.len() < config.min_size {
201 return Ok(None);
202 }
203
204 validate_size(data.len() as u64)?;
205
206 let compressed = compress_zstd_impl(data, config.level)?;
208
209 if compressed.len() >= data.len() {
211 return Ok(None);
212 }
213
214 let mut result = Vec::with_capacity(COMPRESSED_HEADER_LEN + compressed.len());
216 result.push(CompressionType::Zstd as u8);
217 result.extend_from_slice(&(data.len() as u64).to_be_bytes());
218 result.extend_from_slice(&compressed);
219
220 Ok(Some(result))
221}
222
/// Test-only: encodes `data` as a delta against `base` and prefixes the
/// module's header (type byte `Delta`, big-endian `u64` original length).
///
/// Returns `Ok(None)` when compression is disabled, `data` is shorter than
/// `config.min_size`, `base` is longer than `config.max_delta_size`, or the
/// delta is not strictly smaller than `data`.
///
/// # Errors
/// [`CompressionError::SizeLimitExceeded`] if `data` is longer than the
/// module-wide size limit.
#[cfg(test)]
fn compress_delta(
    data: &[u8],
    base: &[u8],
    config: &CompressionConfig,
) -> Result<Option<Vec<u8>>, CompressionError> {
    let skip = !config.enabled
        || data.len() < config.min_size
        || base.len() > config.max_delta_size;
    if skip {
        return Ok(None);
    }

    let original_len = data.len() as u64;
    validate_size(original_len)?;

    let delta = DeltaEncoder::encode(base, data);
    // Not worth storing if the delta did not shrink the data.
    if delta.len() >= data.len() {
        return Ok(None);
    }

    let mut framed = Vec::with_capacity(COMPRESSED_HEADER_LEN + delta.len());
    framed.push(CompressionType::Delta as u8);
    framed.extend_from_slice(&original_len.to_be_bytes());
    framed.extend_from_slice(&delta);
    Ok(Some(framed))
}
254
255pub fn decompress(data: &[u8]) -> Result<Vec<u8>, CompressionError> {
259 if data.len() < COMPRESSED_HEADER_LEN {
260 return Ok(data.to_vec());
262 }
263
264 let compression_type =
265 CompressionType::from_u8(data[0]).ok_or_else(|| CompressionError::InvalidType(data[0]))?;
266
267 match compression_type {
268 CompressionType::None => {
269 let expected_size = read_u64_size(data)?;
270 let payload = data[COMPRESSED_HEADER_LEN..].to_vec();
271 validate_decompressed_len(expected_size, payload.len())?;
272 Ok(payload)
273 }
274 CompressionType::Zstd if zstd_header_len(data).is_some() => {
275 decompress_zstd_with_header(data)
276 }
277 CompressionType::Zstd => Ok(data.to_vec()),
278 CompressionType::Delta => {
279 Err(CompressionError::InvalidOperation(
281 "Delta compression requires base object".to_string(),
282 ))
283 }
284 }
285}
286
/// Test-only: decodes header-prefixed delta data against `base`.
///
/// # Errors
/// * [`CompressionError::CorruptedData`] if `delta_data` is shorter than the header.
/// * [`CompressionError::InvalidType`] if the type byte is unknown.
/// * [`CompressionError::InvalidOperation`] if the type byte is not `Delta`.
/// * Any error from `decompress_delta_with_header`.
#[cfg(test)]
fn decompress_delta(delta_data: &[u8], base: &[u8]) -> Result<Vec<u8>, CompressionError> {
    if delta_data.len() < COMPRESSED_HEADER_LEN {
        return Err(CompressionError::CorruptedData(
            "Delta data too short".to_string(),
        ));
    }

    let tag = delta_data[0];
    match CompressionType::from_u8(tag) {
        Some(CompressionType::Delta) => decompress_delta_with_header(delta_data, base),
        Some(_) => Err(CompressionError::InvalidOperation(
            "Expected delta compression".to_string(),
        )),
        None => Err(CompressionError::InvalidType(tag)),
    }
}
307
308pub fn is_compressed(data: &[u8]) -> bool {
310 if data.len() < COMPRESSED_HEADER_LEN {
311 return false;
312 }
313
314 matches!(
315 CompressionType::from_u8(data[0]),
316 Some(CompressionType::Zstd)
317 ) && zstd_header_len(data).is_some()
318}
319
320pub fn header_uncompressed_size(data: &[u8]) -> Option<u64> {
328 if data.len() < COMPRESSED_HEADER_LEN {
329 return None;
330 }
331 match CompressionType::from_u8(data[0])? {
332 CompressionType::Zstd => {
333 zstd_header_len(data)?;
334 Some(u64::from_be_bytes(
335 data[1..COMPRESSED_HEADER_LEN].try_into().ok()?,
336 ))
337 }
338 CompressionType::None | CompressionType::Delta => None,
339 }
340}
341
/// Test-only: splits the header into its compression type and recorded
/// uncompressed size.
///
/// Returns `None` when `data` is shorter than the header or the type byte
/// is unknown. The size is not validated.
#[cfg(test)]
fn compression_info(data: &[u8]) -> Option<(CompressionType, u64)> {
    // `get` yields `None` exactly when the input is shorter than the header.
    let size_bytes: [u8; 8] = data.get(1..COMPRESSED_HEADER_LEN)?.try_into().ok()?;
    let kind = CompressionType::from_u8(data[0])?;

    Some((kind, u64::from_be_bytes(size_bytes)))
}
354
355fn decompress_zstd_with_header(data: &[u8]) -> Result<Vec<u8>, CompressionError> {
356 try_decompress_zstd(data, COMPRESSED_HEADER_LEN, read_u64_size)
357}
358
359fn zstd_header_len(data: &[u8]) -> Option<usize> {
360 if has_magic_at(data, COMPRESSED_HEADER_LEN, ZSTD_MAGIC) {
361 Some(COMPRESSED_HEADER_LEN)
362 } else {
363 None
364 }
365}
366
367fn try_decompress_zstd<F>(
368 data: &[u8],
369 header_len: usize,
370 read_size: F,
371) -> Result<Vec<u8>, CompressionError>
372where
373 F: Fn(&[u8]) -> Result<u64, CompressionError>,
374{
375 let uncompressed_size = read_size(data)?;
376 let decompressed = decompress_zstd_impl(&data[header_len..], uncompressed_size)?;
377 validate_decompressed_len(uncompressed_size, decompressed.len())?;
378 Ok(decompressed)
379}
380
/// Test-only: decodes delta data framed with the module's standard header,
/// using `read_u64_size` to obtain the recorded size.
///
/// # Errors
/// Propagates whatever `try_decompress_delta` returns.
#[cfg(test)]
fn decompress_delta_with_header(
    delta_data: &[u8],
    base: &[u8],
) -> Result<Vec<u8>, CompressionError> {
    let header_len = COMPRESSED_HEADER_LEN;
    try_decompress_delta(delta_data, base, header_len, read_u64_size)
}
388
/// Test-only: decodes a delta payload that follows a `header_len`-byte
/// header, applying it to `base`.
///
/// `read_size` extracts the recorded uncompressed size from `delta_data`.
/// That size must fit in `usize` and must not exceed
/// `MAX_DELTA_OUTPUT_SIZE`; the decoded output length must match it exactly.
///
/// # Errors
/// * Any error returned by `read_size`.
/// * [`CompressionError::CorruptedData`] if the recorded size does not fit
///   in `usize`, if `delta_data` is shorter than `header_len`, or if the
///   decoded length differs from the recorded size.
/// * [`CompressionError::DecompressionFailed`] if the recorded size exceeds
///   `MAX_DELTA_OUTPUT_SIZE` or the delta decoder fails.
#[cfg(test)]
fn try_decompress_delta<F>(
    delta_data: &[u8],
    base: &[u8],
    header_len: usize,
    read_size: F,
) -> Result<Vec<u8>, CompressionError>
where
    F: Fn(&[u8]) -> Result<u64, CompressionError>,
{
    let uncompressed_size = read_size(delta_data)?;
    let uncompressed_size_usize =
        checked_size_to_usize("delta uncompressed size", uncompressed_size)?;

    if uncompressed_size > MAX_DELTA_OUTPUT_SIZE as u64 {
        return Err(CompressionError::DecompressionFailed(format!(
            "delta output size {} exceeds max {}",
            uncompressed_size, MAX_DELTA_OUTPUT_SIZE
        )));
    }

    // `header_len` and `read_size` are both caller-supplied, so nothing here
    // guarantees the header fits; report corruption instead of panicking on
    // an out-of-range slice.
    let delta = delta_data.get(header_len..).ok_or_else(|| {
        CompressionError::CorruptedData("compression header truncated".to_string())
    })?;

    let decompressed = DeltaDecoder::decode(base, delta, uncompressed_size_usize)
        .map_err(|error| CompressionError::DecompressionFailed(error.to_string()))?;
    validate_decompressed_len(uncompressed_size, decompressed.len())?;
    Ok(decompressed)
}
416
417fn read_u64_size(data: &[u8]) -> Result<u64, CompressionError> {
418 if data.len() < COMPRESSED_HEADER_LEN {
419 return Err(CompressionError::CorruptedData(
420 "compression header truncated".to_string(),
421 ));
422 }
423
424 let recorded_size =
425 u64::from_be_bytes(data[1..COMPRESSED_HEADER_LEN].try_into().map_err(|_| {
426 CompressionError::CorruptedData("compression header truncated".to_string())
427 })?);
428 validate_size(recorded_size)?;
429 Ok(recorded_size)
430}
431
432fn validate_size(size: u64) -> Result<(), CompressionError> {
433 if size > MAX_DECOMPRESSED_SIZE {
434 return Err(CompressionError::SizeLimitExceeded {
435 size,
436 max: MAX_DECOMPRESSED_SIZE,
437 });
438 }
439
440 Ok(())
441}
442
/// Converts a `u64` size to `usize`, naming `field` in the error message
/// when the value does not fit on this platform.
///
/// # Errors
/// [`CompressionError::CorruptedData`] if the conversion fails.
#[cfg(any(feature = "zstd", test))]
fn checked_size_to_usize(field: &str, size: u64) -> Result<usize, CompressionError> {
    match usize::try_from(size) {
        Ok(converted) => Ok(converted),
        Err(_) => Err(CompressionError::CorruptedData(format!(
            "{field} exceeds platform limits"
        ))),
    }
}
448
449fn validate_decompressed_len(expected: u64, actual: usize) -> Result<(), CompressionError> {
450 if actual as u64 != expected {
451 return Err(CompressionError::CorruptedData(format!(
452 "decompressed size mismatch: expected {expected}, got {actual}",
453 )));
454 }
455
456 Ok(())
457}
458
/// Reports whether the four bytes of `data` starting at `offset` equal `magic`.
///
/// Returns `false` when fewer than four bytes are available at `offset`,
/// including when `offset` lies beyond the end of `data`. The end index is
/// never computed as `offset + 4`, so an `offset` close to `usize::MAX`
/// cannot trigger an arithmetic-overflow panic.
fn has_magic_at(data: &[u8], offset: usize, magic: [u8; 4]) -> bool {
    data.get(offset..)
        .is_some_and(|tail| tail.starts_with(&magic))
}
462
463#[cfg(test)]
464mod compression_tests;