Skip to main content

objects/store/compression/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2#![deny(clippy::cast_possible_truncation)]
3
4//! Compression utilities for Heddle storage.
5//!
6//! Provides configurable compression with support for:
7//! - zstd: High compression ratio, good speed
8//! - Delta encoding: For similar versions of the same file
9
10#[cfg(feature = "zstd")]
11use std::io::Read;
12
13#[cfg(test)]
14use crate::delta::{DeltaDecoder, DeltaEncoder, MAX_DELTA_OUTPUT_SIZE};
15
/// Bytes in a compression header: a one-byte type tag followed by the
/// uncompressed size as a big-endian `u64`.
const COMPRESSED_HEADER_LEN: usize = 1 + std::mem::size_of::<u64>();
/// Largest size (256 MiB) this module will compress, or accept as the
/// recorded uncompressed size when decompressing.
const MAX_DECOMPRESSED_SIZE: u64 = 256 << 20;
/// Zstandard frame magic number `0xFD2FB528`, in its on-disk
/// (little-endian) byte order.
const ZSTD_MAGIC: [u8; 4] = 0xFD2F_B528_u32.to_le_bytes();
19
/// Type tag stored in the first byte of a compression header.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
enum CompressionType {
    /// Payload follows the header verbatim.
    None = 0,
    /// Payload is Zstandard-compressed.
    Zstd = 1,
    /// Payload is a delta that must be applied to a separate base object.
    Delta = 2,
}

impl CompressionType {
    /// Decode a header tag byte; unrecognised tags yield `None`.
    fn from_u8(value: u8) -> Option<Self> {
        const KNOWN: [CompressionType; 3] = [
            CompressionType::None,
            CompressionType::Zstd,
            CompressionType::Delta,
        ];
        KNOWN.iter().copied().find(|&kind| kind as u8 == value)
    }
}
43
/// Settings consumed by [`compress`] (and, in test builds, by delta
/// compression).
#[derive(Debug, Clone, Copy)]
pub struct CompressionConfig {
    /// Master switch: when `false`, nothing is compressed.
    pub enabled: bool,
    /// Compression level (algorithm-specific).
    /// For zstd: 1-22 (1=fast, 22=best, 3=default)
    pub level: i32,
    /// Inputs shorter than this many bytes are left uncompressed
    /// (smaller objects aren't worth it).
    pub min_size: usize,
    /// Largest base object, in bytes, accepted for delta compression.
    pub max_delta_size: usize,
}

impl Default for CompressionConfig {
    /// Level 3, a 256-byte minimum input size and a 10_000_000-byte cap on
    /// delta bases. Compression is enabled only when the crate is built with
    /// the `zstd` feature.
    fn default() -> Self {
        let zstd_available = cfg!(feature = "zstd");
        Self {
            enabled: zstd_available,
            level: 3,
            min_size: 256,
            max_delta_size: 10_000_000,
        }
    }
}

impl CompressionConfig {
    /// Start from [`Default`] and apply overrides from the environment.
    ///
    /// - `HEDDLE_COMPRESSION`: `"0"` or `"false"` (any ASCII case) turns
    ///   compression off; any other value turns it on, but only if the crate
    ///   was built with the `zstd` feature.
    /// - `HEDDLE_COMPRESSION_LEVEL`: an `i32`, clamped to `1..=22`.
    /// - `HEDDLE_COMPRESSION_MIN_SIZE`: a `usize` byte count.
    ///
    /// Unset or non-Unicode variables, and numeric values that fail to
    /// parse, leave the corresponding default untouched.
    pub fn from_env() -> Self {
        let mut config = Self::default();

        if let Ok(raw) = std::env::var("HEDDLE_COMPRESSION") {
            let opted_out = raw == "0" || raw.eq_ignore_ascii_case("false");
            config.enabled = !opted_out && cfg!(feature = "zstd");
        }

        let level_override = std::env::var("HEDDLE_COMPRESSION_LEVEL")
            .ok()
            .and_then(|raw| raw.parse::<i32>().ok());
        if let Some(level) = level_override {
            config.level = level.clamp(1, 22);
        }

        let min_size_override = std::env::var("HEDDLE_COMPRESSION_MIN_SIZE")
            .ok()
            .and_then(|raw| raw.parse::<usize>().ok());
        if let Some(min_size) = min_size_override {
            config.min_size = min_size;
        }

        config
    }

    /// A configuration that never compresses: `enabled` is `false` and
    /// `min_size` is `usize::MAX`.
    pub fn disabled() -> Self {
        Self {
            min_size: usize::MAX,
            max_delta_size: 0,
            level: 0,
            enabled: false,
        }
    }
}
104
105/// Compression error type.
106#[derive(Debug, thiserror::Error)]
107pub enum CompressionError {
108    #[error("decompression failed: {0}")]
109    DecompressionFailed(String),
110    #[error("compression failed: {0}")]
111    CompressionFailed(String),
112    #[error("invalid compression type: {0}")]
113    InvalidType(u8),
114    #[error("corrupted data: {0}")]
115    CorruptedData(String),
116    #[error("invalid operation: {0}")]
117    InvalidOperation(String),
118    #[error("object size {size} exceeds maximum {max}")]
119    SizeLimitExceeded { size: u64, max: u64 },
120}
121
#[cfg(feature = "zstd")]
/// Encode `data` with zstd at compression `level`, without any header.
///
/// Encoder failures surface as [`CompressionError::CompressionFailed`].
fn compress_zstd_impl(data: &[u8], level: i32) -> Result<Vec<u8>, CompressionError> {
    match zstd::encode_all(data, level) {
        Ok(encoded) => Ok(encoded),
        Err(io_err) => Err(CompressionError::CompressionFailed(io_err.to_string())),
    }
}
127
128#[cfg(not(feature = "zstd"))]
129fn compress_zstd_impl(_data: &[u8], _level: i32) -> Result<Vec<u8>, CompressionError> {
130    Err(CompressionError::InvalidOperation(
131        "zstd compression support not compiled into this build".to_string(),
132    ))
133}
134
#[cfg(feature = "bench")]
/// Benchmark hook: zstd-compress `data` at `level` without adding a header.
///
/// Forwards to this build's zstd backend, which fails with
/// [`CompressionError::InvalidOperation`] when the `zstd` feature is off.
pub fn compress_zstd(data: &[u8], level: i32) -> Result<Vec<u8>, CompressionError> {
    let compressed = compress_zstd_impl(data, level)?;
    Ok(compressed)
}
140
#[cfg(feature = "zstd")]
/// Decompress a bare zstd payload while enforcing the recorded output size.
///
/// `expected_size` is the uncompressed length taken from the blob header and
/// may be corrupt. Decoding stops as soon as the output would exceed it, so a
/// stream that expands past the recorded size is rejected without being fully
/// materialized. A stream that decodes to *fewer* bytes is returned as-is;
/// callers compare the final length against the recorded size.
///
/// # Errors
///
/// - [`CompressionError::SizeLimitExceeded`] if `expected_size` exceeds
///   `MAX_DECOMPRESSED_SIZE`.
/// - [`CompressionError::CorruptedData`] if `expected_size` does not fit in
///   `usize`, or the stream decodes to more than `expected_size` bytes.
/// - [`CompressionError::DecompressionFailed`] if the zstd decoder fails.
fn decompress_zstd_impl(data: &[u8], expected_size: u64) -> Result<Vec<u8>, CompressionError> {
    // Cap on the up-front allocation. `expected_size` comes from the header,
    // so a tiny corrupt blob must not be able to reserve up to
    // `MAX_DECOMPRESSED_SIZE` before a single byte is decoded; larger outputs
    // grow the buffer on demand instead.
    const MAX_INITIAL_CAPACITY: usize = 1 << 20;

    validate_size(expected_size)?;
    let expected_capacity = checked_size_to_usize("zstd expected size", expected_size)?;

    let decoder = zstd::stream::read::Decoder::new(data)
        .map_err(|e| CompressionError::DecompressionFailed(e.to_string()))?;

    // Decode at most one byte past the recorded size: enough to detect an
    // oversized stream without decoding the remainder of it.
    let read_limit = expected_size.saturating_add(1);
    let mut decompressed = Vec::with_capacity(expected_capacity.min(MAX_INITIAL_CAPACITY));
    decoder
        .take(read_limit)
        .read_to_end(&mut decompressed)
        .map_err(|e| CompressionError::DecompressionFailed(e.to_string()))?;

    let actual_size = u64::try_from(decompressed.len()).map_err(|_| {
        CompressionError::CorruptedData("decompressed size exceeds platform limits".to_string())
    })?;
    if actual_size > expected_size {
        return Err(CompressionError::CorruptedData(format!(
            "decompressed size exceeds recorded header size: expected {expected_size}, got at least {actual_size}",
        )));
    }

    Ok(decompressed)
}
177
178#[cfg(not(feature = "zstd"))]
179fn decompress_zstd_impl(_data: &[u8], expected_size: u64) -> Result<Vec<u8>, CompressionError> {
180    validate_size(expected_size)?;
181    Err(CompressionError::InvalidOperation(
182        "zstd-compressed data is unsupported in this build".to_string(),
183    ))
184}
185
#[cfg(feature = "bench")]
/// Benchmark hook: decompress a bare zstd payload (no header).
///
/// Output longer than `expected_size` is rejected; shorter output is
/// returned as-is.
pub fn decompress_zstd(data: &[u8], expected_size: u64) -> Result<Vec<u8>, CompressionError> {
    let decoded = decompress_zstd_impl(data, expected_size)?;
    Ok(decoded)
}
191
192/// Compress data with automatic algorithm selection.
193///
194/// Returns the compressed data with header, or None if compression
195/// doesn't help (compressed would be larger).
196pub fn compress(
197    data: &[u8],
198    config: &CompressionConfig,
199) -> Result<Option<Vec<u8>>, CompressionError> {
200    if !config.enabled || data.len() < config.min_size {
201        return Ok(None);
202    }
203
204    validate_size(data.len() as u64)?;
205
206    // Try zstd compression
207    let compressed = compress_zstd_impl(data, config.level)?;
208
209    // Only use compression if it actually helps
210    if compressed.len() >= data.len() {
211        return Ok(None);
212    }
213
214    // Build header: [type][size][data]
215    let mut result = Vec::with_capacity(COMPRESSED_HEADER_LEN + compressed.len());
216    result.push(CompressionType::Zstd as u8);
217    result.extend_from_slice(&(data.len() as u64).to_be_bytes());
218    result.extend_from_slice(&compressed);
219
220    Ok(Some(result))
221}
222
#[cfg(test)]
/// Delta-encode `data` against `base` and wrap the result in a `Delta` header.
///
/// The blob layout is `[type: u8][uncompressed size: u64 BE][delta]`.
/// Returns `Ok(None)` when compression is disabled, `data` is shorter than
/// `config.min_size`, `base` is longer than `config.max_delta_size`, or the
/// raw delta is not shorter than `data`. The 9-byte header is not counted in
/// that last comparison.
///
/// # Errors
///
/// [`CompressionError::SizeLimitExceeded`] if `data` is larger than
/// `MAX_DECOMPRESSED_SIZE`.
fn compress_delta(
    data: &[u8],
    base: &[u8],
    config: &CompressionConfig,
) -> Result<Option<Vec<u8>>, CompressionError> {
    let skip = !config.enabled
        || data.len() < config.min_size
        || base.len() > config.max_delta_size;
    if skip {
        return Ok(None);
    }

    let original_len = data.len() as u64;
    validate_size(original_len)?;

    let delta = DeltaEncoder::encode(base, data);
    if delta.len() >= data.len() {
        return Ok(None);
    }

    let mut blob = Vec::with_capacity(COMPRESSED_HEADER_LEN + delta.len());
    blob.push(CompressionType::Delta as u8);
    blob.extend_from_slice(&original_len.to_be_bytes());
    blob.extend_from_slice(&delta);
    Ok(Some(blob))
}
254
255/// Decompress data based on header.
256///
257/// Returns the decompressed data, or original data if uncompressed.
258pub fn decompress(data: &[u8]) -> Result<Vec<u8>, CompressionError> {
259    if data.len() < COMPRESSED_HEADER_LEN {
260        // Too short for header, assume uncompressed
261        return Ok(data.to_vec());
262    }
263
264    let compression_type =
265        CompressionType::from_u8(data[0]).ok_or_else(|| CompressionError::InvalidType(data[0]))?;
266
267    match compression_type {
268        CompressionType::None => {
269            let expected_size = read_u64_size(data)?;
270            let payload = data[COMPRESSED_HEADER_LEN..].to_vec();
271            validate_decompressed_len(expected_size, payload.len())?;
272            Ok(payload)
273        }
274        CompressionType::Zstd if zstd_header_len(data).is_some() => {
275            decompress_zstd_with_header(data)
276        }
277        CompressionType::Zstd => Ok(data.to_vec()),
278        CompressionType::Delta => {
279            // Delta requires base - this is handled separately
280            Err(CompressionError::InvalidOperation(
281                "Delta compression requires base object".to_string(),
282            ))
283        }
284    }
285}
286
#[cfg(test)]
/// Rebuild a `Delta`-tagged blob against its `base` object.
///
/// # Errors
///
/// [`CompressionError::CorruptedData`] if the blob is shorter than the
/// header, [`CompressionError::InvalidType`] for an unknown tag,
/// [`CompressionError::InvalidOperation`] for a known tag other than `Delta`,
/// plus any error from decoding the delta itself.
fn decompress_delta(delta_data: &[u8], base: &[u8]) -> Result<Vec<u8>, CompressionError> {
    if delta_data.len() < COMPRESSED_HEADER_LEN {
        return Err(CompressionError::CorruptedData(
            "Delta data too short".to_string(),
        ));
    }

    match CompressionType::from_u8(delta_data[0]) {
        Some(CompressionType::Delta) => decompress_delta_with_header(delta_data, base),
        Some(_) => Err(CompressionError::InvalidOperation(
            "Expected delta compression".to_string(),
        )),
        None => Err(CompressionError::InvalidType(delta_data[0])),
    }
}
307
308/// Check if data is compressed (has compression header).
309pub fn is_compressed(data: &[u8]) -> bool {
310    if data.len() < COMPRESSED_HEADER_LEN {
311        return false;
312    }
313
314    matches!(
315        CompressionType::from_u8(data[0]),
316        Some(CompressionType::Zstd)
317    ) && zstd_header_len(data).is_some()
318}
319
320/// Peek at the recorded *uncompressed* size in a header-prefixed blob,
321/// without decompressing the payload. Returns `None` for short or
322/// unprefixed inputs (the caller can then fall back to the file length).
323///
324/// Used by header-only size queries (e.g. [`ObjectStore::blob_size`])
325/// where reading the full blob would dominate. Only the first 9 bytes
326/// of the input are consulted.
327pub fn header_uncompressed_size(data: &[u8]) -> Option<u64> {
328    if data.len() < COMPRESSED_HEADER_LEN {
329        return None;
330    }
331    match CompressionType::from_u8(data[0])? {
332        CompressionType::Zstd => {
333            zstd_header_len(data)?;
334            Some(u64::from_be_bytes(
335                data[1..COMPRESSED_HEADER_LEN].try_into().ok()?,
336            ))
337        }
338        CompressionType::None | CompressionType::Delta => None,
339    }
340}
341
#[cfg(test)]
/// Decode the type tag and recorded uncompressed size from a header.
///
/// The size is neither validated nor compared with the payload. Returns `None`
/// for inputs shorter than the header or with an unknown tag.
fn compression_info(data: &[u8]) -> Option<(CompressionType, u64)> {
    let header = data.get(..COMPRESSED_HEADER_LEN)?;
    let kind = CompressionType::from_u8(header[0])?;
    let recorded_size = u64::from_be_bytes(header[1..].try_into().ok()?);
    Some((kind, recorded_size))
}
354
355fn decompress_zstd_with_header(data: &[u8]) -> Result<Vec<u8>, CompressionError> {
356    try_decompress_zstd(data, COMPRESSED_HEADER_LEN, read_u64_size)
357}
358
359fn zstd_header_len(data: &[u8]) -> Option<usize> {
360    if has_magic_at(data, COMPRESSED_HEADER_LEN, ZSTD_MAGIC) {
361        Some(COMPRESSED_HEADER_LEN)
362    } else {
363        None
364    }
365}
366
367fn try_decompress_zstd<F>(
368    data: &[u8],
369    header_len: usize,
370    read_size: F,
371) -> Result<Vec<u8>, CompressionError>
372where
373    F: Fn(&[u8]) -> Result<u64, CompressionError>,
374{
375    let uncompressed_size = read_size(data)?;
376    let decompressed = decompress_zstd_impl(&data[header_len..], uncompressed_size)?;
377    validate_decompressed_len(uncompressed_size, decompressed.len())?;
378    Ok(decompressed)
379}
380
#[cfg(test)]
/// Rebuild a `Delta`-tagged blob against `base`, using the standard 9-byte
/// header.
fn decompress_delta_with_header(
    delta_data: &[u8],
    base: &[u8],
) -> Result<Vec<u8>, CompressionError> {
    let rebuilt = try_decompress_delta(delta_data, base, COMPRESSED_HEADER_LEN, read_u64_size)?;
    Ok(rebuilt)
}
388
#[cfg(test)]
/// Shared delta decode path for header-prefixed blobs.
///
/// Reads the recorded output size with `read_size` and rejects sizes over
/// `MAX_DELTA_OUTPUT_SIZE` before running the decoder. It then applies the
/// delta after `header_len` to `base` and requires the output length to
/// match exactly.
fn try_decompress_delta<F>(
    delta_data: &[u8],
    base: &[u8],
    header_len: usize,
    read_size: F,
) -> Result<Vec<u8>, CompressionError>
where
    F: Fn(&[u8]) -> Result<u64, CompressionError>,
{
    let recorded_size = read_size(delta_data)?;
    let target_len = checked_size_to_usize("delta uncompressed size", recorded_size)?;

    // Refuse oversized outputs before the decoder allocates anything.
    let too_large = recorded_size > MAX_DELTA_OUTPUT_SIZE as u64;
    if too_large {
        return Err(CompressionError::DecompressionFailed(format!(
            "delta output size {} exceeds max {}",
            recorded_size, MAX_DELTA_OUTPUT_SIZE
        )));
    }

    let instructions = &delta_data[header_len..];
    let rebuilt = DeltaDecoder::decode(base, instructions, target_len)
        .map_err(|err| CompressionError::DecompressionFailed(err.to_string()))?;
    validate_decompressed_len(recorded_size, rebuilt.len())?;
    Ok(rebuilt)
}
416
417fn read_u64_size(data: &[u8]) -> Result<u64, CompressionError> {
418    if data.len() < COMPRESSED_HEADER_LEN {
419        return Err(CompressionError::CorruptedData(
420            "compression header truncated".to_string(),
421        ));
422    }
423
424    let recorded_size =
425        u64::from_be_bytes(data[1..COMPRESSED_HEADER_LEN].try_into().map_err(|_| {
426            CompressionError::CorruptedData("compression header truncated".to_string())
427        })?);
428    validate_size(recorded_size)?;
429    Ok(recorded_size)
430}
431
432fn validate_size(size: u64) -> Result<(), CompressionError> {
433    if size > MAX_DECOMPRESSED_SIZE {
434        return Err(CompressionError::SizeLimitExceeded {
435            size,
436            max: MAX_DECOMPRESSED_SIZE,
437        });
438    }
439
440    Ok(())
441}
442
#[cfg(any(feature = "zstd", test))]
/// Convert a recorded size to `usize`, naming `field` in the error when it
/// does not fit on this platform.
fn checked_size_to_usize(field: &str, size: u64) -> Result<usize, CompressionError> {
    match usize::try_from(size) {
        Ok(converted) => Ok(converted),
        Err(_) => Err(CompressionError::CorruptedData(format!(
            "{field} exceeds platform limits"
        ))),
    }
}
448
449fn validate_decompressed_len(expected: u64, actual: usize) -> Result<(), CompressionError> {
450    if actual as u64 != expected {
451        return Err(CompressionError::CorruptedData(format!(
452            "decompressed size mismatch: expected {expected}, got {actual}",
453        )));
454    }
455
456    Ok(())
457}
458
/// Return `true` when the bytes of `data` starting at `offset` begin with
/// `magic`.
///
/// Out-of-range offsets yield `false` instead of panicking, including ones
/// where `offset + magic.len()` would overflow `usize`.
fn has_magic_at(data: &[u8], offset: usize, magic: [u8; 4]) -> bool {
    data.get(offset..)
        .is_some_and(|tail| tail.starts_with(&magic))
}
462
463#[cfg(test)]
464mod compression_tests;