Skip to main content

objects/store/compression/mod.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Compression utilities for Heddle storage.
3//!
4//! Provides configurable compression with support for:
5//! - zstd: High compression ratio, good speed
6//! - Delta encoding: For similar versions of the same file
7
8#[cfg(feature = "zstd")]
9use std::io::Read;
10
11#[cfg(test)]
12use crate::delta::{DeltaDecoder, DeltaEncoder, MAX_DELTA_OUTPUT_SIZE};
13
/// Length of the current header: one type byte followed by a big-endian `u64`
/// uncompressed size.
const COMPRESSED_HEADER_LEN: usize = 1 + 8;
/// Length of the legacy header: one type byte followed by a big-endian `u32`
/// uncompressed size.
const LEGACY_COMPRESSED_HEADER_LEN: usize = 1 + 4;
/// Largest uncompressed size this module will produce or accept (256 MiB).
const MAX_DECOMPRESSED_SIZE: u64 = 1 << 28;
/// Zstandard frame magic number `0xFD2FB528`, in its little-endian on-disk byte order.
const ZSTD_MAGIC: [u8; 4] = 0xFD2F_B528_u32.to_le_bytes();
18
/// Algorithm tag stored in the first byte of a header-prefixed blob.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
enum CompressionType {
    /// Payload follows the header verbatim.
    None = 0,
    /// Payload is a zstd stream.
    Zstd = 1,
    /// Payload is a delta against a separately supplied base object.
    Delta = 2,
}

impl CompressionType {
    /// Decode a tag byte; any value without a matching variant yields `None`.
    fn from_u8(value: u8) -> Option<Self> {
        let tag = match value {
            0 => Self::None,
            1 => Self::Zstd,
            2 => Self::Delta,
            _ => return None,
        };
        Some(tag)
    }
}
42
/// Compression configuration.
#[derive(Debug, Clone, Copy)]
pub struct CompressionConfig {
    /// Whether compression is enabled.
    pub enabled: bool,
    /// Compression level (algorithm-specific).
    /// For zstd: 1-22 (1=fast, 22=best, 3=default)
    pub level: i32,
    /// Minimum size to compress (smaller objects aren't worth it).
    pub min_size: usize,
    /// Maximum size for delta compression base.
    pub max_delta_size: usize,
}

impl Default for CompressionConfig {
    /// Compression is enabled only when the `zstd` feature is compiled in.
    fn default() -> Self {
        Self {
            enabled: cfg!(feature = "zstd"),
            level: 3,                   // zstd default
            min_size: 256,              // Don't compress tiny objects
            max_delta_size: 10_000_000, // 10MB max for delta base
        }
    }
}

impl CompressionConfig {
    /// Create configuration from environment variables, starting from [`Default`].
    ///
    /// - `HEDDLE_COMPRESSION`: `0`, `false`, `off` or `no` (ASCII
    ///   case-insensitive, surrounding whitespace ignored) turn compression
    ///   off. Any other value requests it, including an empty string. A
    ///   request is honoured only when the `zstd` feature is compiled in.
    /// - `HEDDLE_COMPRESSION_LEVEL`: integer level, clamped to `1..=22`.
    /// - `HEDDLE_COMPRESSION_MIN_SIZE`: minimum object size (bytes) to compress.
    ///
    /// Unset, non-Unicode or unparseable values leave the default in place.
    pub fn from_env() -> Self {
        let mut config = Self::default();

        if let Ok(val) = std::env::var("HEDDLE_COMPRESSION") {
            config.enabled = Self::flag_requests_compression(&val) && cfg!(feature = "zstd");
        }

        if let Some(level) = Self::env_parse::<i32>("HEDDLE_COMPRESSION_LEVEL") {
            config.level = level.clamp(1, 22);
        }

        if let Some(size) = Self::env_parse::<usize>("HEDDLE_COMPRESSION_MIN_SIZE") {
            config.min_size = size;
        }

        config
    }

    /// Disable compression.
    ///
    /// `min_size` is `usize::MAX`, so no input is ever large enough to compress,
    /// even if `enabled` is later flipped on.
    pub fn disabled() -> Self {
        Self {
            enabled: false,
            level: 0,
            min_size: usize::MAX,
            max_delta_size: 0,
        }
    }

    /// Interpret a `HEDDLE_COMPRESSION` value.
    ///
    /// Returns `false` for the "false-like" spellings `0`, `false`, `off` and
    /// `no` (ASCII case-insensitive, after trimming whitespace). Returns `true`
    /// for anything else.
    fn flag_requests_compression(value: &str) -> bool {
        let value = value.trim();
        !["0", "false", "off", "no"]
            .iter()
            .any(|&falsy| value.eq_ignore_ascii_case(falsy))
    }

    /// Read environment variable `name` and parse its trimmed value.
    ///
    /// Returns `None` when the variable is unset, not valid Unicode, or does not
    /// parse as `T`.
    fn env_parse<T: std::str::FromStr>(name: &str) -> Option<T> {
        std::env::var(name).ok()?.trim().parse().ok()
    }
}
103
104/// Compression error type.
105#[derive(Debug, thiserror::Error)]
106pub enum CompressionError {
107    #[error("decompression failed: {0}")]
108    DecompressionFailed(String),
109    #[error("compression failed: {0}")]
110    CompressionFailed(String),
111    #[error("invalid compression type: {0}")]
112    InvalidType(u8),
113    #[error("corrupted data: {0}")]
114    CorruptedData(String),
115    #[error("invalid operation: {0}")]
116    InvalidOperation(String),
117    #[error("object size {size} exceeds maximum {max}")]
118    SizeLimitExceeded { size: u64, max: u64 },
119}
120
121#[cfg(feature = "zstd")]
122/// Compress data using zstd.
123fn compress_zstd(data: &[u8], level: i32) -> Result<Vec<u8>, CompressionError> {
124    zstd::encode_all(data, level).map_err(|e| CompressionError::CompressionFailed(e.to_string()))
125}
126
127#[cfg(not(feature = "zstd"))]
128fn compress_zstd(_data: &[u8], _level: i32) -> Result<Vec<u8>, CompressionError> {
129    Err(CompressionError::InvalidOperation(
130        "zstd compression support not compiled into this build".to_string(),
131    ))
132}
133
134#[cfg(feature = "zstd")]
135/// Decompress zstd data while enforcing the recorded output size.
136fn decompress_zstd(data: &[u8], expected_size: u64) -> Result<Vec<u8>, CompressionError> {
137    validate_size(expected_size)?;
138
139    let mut decoder = zstd::stream::read::Decoder::new(data)
140        .map_err(|e| CompressionError::DecompressionFailed(e.to_string()))?;
141    let mut decompressed = Vec::with_capacity(expected_size as usize);
142    let mut buffer = [0u8; 8192];
143
144    loop {
145        let bytes_read = decoder
146            .read(&mut buffer)
147            .map_err(|e| CompressionError::DecompressionFailed(e.to_string()))?;
148        if bytes_read == 0 {
149            break;
150        }
151
152        let next_size = (decompressed.len() + bytes_read) as u64;
153        if next_size > expected_size {
154            return Err(CompressionError::CorruptedData(format!(
155                "decompressed size exceeds recorded header size: expected {expected_size}, got at least {next_size}",
156            )));
157        }
158
159        decompressed.extend_from_slice(&buffer[..bytes_read]);
160    }
161
162    Ok(decompressed)
163}
164
165#[cfg(not(feature = "zstd"))]
166fn decompress_zstd(_data: &[u8], expected_size: u64) -> Result<Vec<u8>, CompressionError> {
167    validate_size(expected_size)?;
168    Err(CompressionError::InvalidOperation(
169        "zstd-compressed data is unsupported in this build".to_string(),
170    ))
171}
172
173/// Compress data with automatic algorithm selection.
174///
175/// Returns the compressed data with header, or None if compression
176/// doesn't help (compressed would be larger).
177pub fn compress(
178    data: &[u8],
179    config: &CompressionConfig,
180) -> Result<Option<Vec<u8>>, CompressionError> {
181    if !config.enabled || data.len() < config.min_size {
182        return Ok(None);
183    }
184
185    validate_size(data.len() as u64)?;
186
187    // Try zstd compression
188    let compressed = compress_zstd(data, config.level)?;
189
190    // Only use compression if it actually helps
191    if compressed.len() >= data.len() {
192        return Ok(None);
193    }
194
195    // Build header: [type][size][data]
196    let mut result = Vec::with_capacity(COMPRESSED_HEADER_LEN + compressed.len());
197    result.push(CompressionType::Zstd as u8);
198    result.extend_from_slice(&(data.len() as u64).to_be_bytes());
199    result.extend_from_slice(&compressed);
200
201    Ok(Some(result))
202}
203
/// Delta-encode `data` against `base`, prefixing the output with a header.
///
/// Output layout: `[type: u8 = Delta][uncompressed length: u64 BE][delta]`.
///
/// Returns `Ok(None)` when compression is disabled, `data` is below
/// `config.min_size`, `base` exceeds `config.max_delta_size`, or the delta is
/// not strictly smaller than `data`.
#[cfg(test)]
fn compress_delta(
    data: &[u8],
    base: &[u8],
    config: &CompressionConfig,
) -> Result<Option<Vec<u8>>, CompressionError> {
    let eligible = config.enabled
        && data.len() >= config.min_size
        && base.len() <= config.max_delta_size;
    if !eligible {
        return Ok(None);
    }

    let original_len = data.len() as u64;
    validate_size(original_len)?;

    let delta = DeltaEncoder::encode(base, data);
    if delta.len() >= data.len() {
        // The delta is no smaller than the object itself.
        return Ok(None);
    }

    let mut framed = Vec::with_capacity(COMPRESSED_HEADER_LEN + delta.len());
    framed.push(CompressionType::Delta as u8);
    framed.extend_from_slice(&original_len.to_be_bytes());
    framed.extend_from_slice(&delta);
    Ok(Some(framed))
}
235
236/// Decompress data based on header.
237///
238/// Returns the decompressed data, or original data if uncompressed.
239pub fn decompress(data: &[u8]) -> Result<Vec<u8>, CompressionError> {
240    if data.len() < LEGACY_COMPRESSED_HEADER_LEN {
241        // Too short for header, assume uncompressed
242        return Ok(data.to_vec());
243    }
244
245    let compression_type =
246        CompressionType::from_u8(data[0]).ok_or_else(|| CompressionError::InvalidType(data[0]))?;
247
248    match compression_type {
249        CompressionType::None => {
250            let offset = if data.len() >= COMPRESSED_HEADER_LEN {
251                COMPRESSED_HEADER_LEN
252            } else {
253                LEGACY_COMPRESSED_HEADER_LEN
254            };
255            let expected_size = if offset == COMPRESSED_HEADER_LEN {
256                read_u64_size(data)?
257            } else {
258                read_u32_size(data)?
259            };
260            let payload = data[offset..].to_vec();
261            validate_decompressed_len(expected_size, payload.len())?;
262            Ok(payload)
263        }
264        CompressionType::Zstd if zstd_header_len(data).is_some() => {
265            decompress_zstd_with_header(data)
266        }
267        CompressionType::Zstd => Ok(data.to_vec()),
268        CompressionType::Delta => {
269            // Delta requires base - this is handled separately
270            Err(CompressionError::InvalidOperation(
271                "Delta compression requires base object".to_string(),
272            ))
273        }
274    }
275}
276
/// Decode a delta blob against `base`, requiring the type byte to be `Delta`.
///
/// # Errors
/// - `CorruptedData` if the input is shorter than the legacy header.
/// - `InvalidType` for an unknown type byte.
/// - `InvalidOperation` if the type byte names a different algorithm.
/// - Any error from decoding the delta itself.
#[cfg(test)]
fn decompress_delta(delta_data: &[u8], base: &[u8]) -> Result<Vec<u8>, CompressionError> {
    if delta_data.len() < LEGACY_COMPRESSED_HEADER_LEN {
        return Err(CompressionError::CorruptedData(
            "Delta data too short".to_string(),
        ));
    }

    let tag = delta_data[0];
    match CompressionType::from_u8(tag) {
        Some(CompressionType::Delta) => decompress_delta_with_header(delta_data, base),
        Some(_) => Err(CompressionError::InvalidOperation(
            "Expected delta compression".to_string(),
        )),
        None => Err(CompressionError::InvalidType(tag)),
    }
}
297
298/// Check if data is compressed (has compression header).
299pub fn is_compressed(data: &[u8]) -> bool {
300    if data.len() < LEGACY_COMPRESSED_HEADER_LEN {
301        return false;
302    }
303
304    matches!(
305        CompressionType::from_u8(data[0]),
306        Some(CompressionType::Zstd)
307    ) && zstd_header_len(data).is_some()
308}
309
310/// Peek at the recorded *uncompressed* size in a header-prefixed blob,
311/// without decompressing the payload. Returns `None` for short or
312/// unprefixed inputs (the caller can then fall back to the file length).
313///
314/// Used by header-only size queries (e.g. [`ObjectStore::blob_size`])
315/// where reading the full blob would dominate. Only the first 9 bytes
316/// of the input are consulted.
317pub fn header_uncompressed_size(data: &[u8]) -> Option<u64> {
318    if data.len() < LEGACY_COMPRESSED_HEADER_LEN {
319        return None;
320    }
321    match CompressionType::from_u8(data[0])? {
322        CompressionType::Zstd => match zstd_header_len(data)? {
323            COMPRESSED_HEADER_LEN => {
324                u64::from_be_bytes(data[1..COMPRESSED_HEADER_LEN].try_into().ok()?).into()
325            }
326            LEGACY_COMPRESSED_HEADER_LEN => Some(u32::from_be_bytes(
327                data[1..LEGACY_COMPRESSED_HEADER_LEN].try_into().ok()?,
328            ) as u64),
329            _ => None,
330        },
331        CompressionType::None | CompressionType::Delta => None,
332    }
333}
334
/// Decode the type byte and recorded uncompressed size from a header.
///
/// The header layout is chosen purely by input length: the `u64` size is used
/// when at least 9 bytes are present, otherwise the legacy `u32` size.
/// Returns `None` for inputs shorter than the legacy header or an unknown type byte.
#[cfg(test)]
fn compression_info(data: &[u8]) -> Option<(CompressionType, u64)> {
    if data.len() < LEGACY_COMPRESSED_HEADER_LEN {
        return None;
    }

    let kind = CompressionType::from_u8(data[0])?;
    let recorded = match data.get(1..COMPRESSED_HEADER_LEN) {
        Some(wide) => u64::from_be_bytes(wide.try_into().ok()?),
        None => u64::from(u32::from_be_bytes(
            data[1..LEGACY_COMPRESSED_HEADER_LEN].try_into().ok()?,
        )),
    };

    Some((kind, recorded))
}
351
352fn decompress_zstd_with_header(data: &[u8]) -> Result<Vec<u8>, CompressionError> {
353    if has_magic_at(data, COMPRESSED_HEADER_LEN, ZSTD_MAGIC) {
354        return try_decompress_zstd(data, COMPRESSED_HEADER_LEN, read_u64_size);
355    }
356
357    if has_magic_at(data, LEGACY_COMPRESSED_HEADER_LEN, ZSTD_MAGIC) {
358        return try_decompress_zstd(data, LEGACY_COMPRESSED_HEADER_LEN, read_u32_size);
359    }
360
361    try_decompress_zstd(data, COMPRESSED_HEADER_LEN, read_u64_size)
362        .or_else(|_| try_decompress_zstd(data, LEGACY_COMPRESSED_HEADER_LEN, read_u32_size))
363}
364
365fn zstd_header_len(data: &[u8]) -> Option<usize> {
366    if has_magic_at(data, COMPRESSED_HEADER_LEN, ZSTD_MAGIC) {
367        Some(COMPRESSED_HEADER_LEN)
368    } else if has_magic_at(data, LEGACY_COMPRESSED_HEADER_LEN, ZSTD_MAGIC) {
369        Some(LEGACY_COMPRESSED_HEADER_LEN)
370    } else {
371        None
372    }
373}
374
375fn try_decompress_zstd<F>(
376    data: &[u8],
377    header_len: usize,
378    read_size: F,
379) -> Result<Vec<u8>, CompressionError>
380where
381    F: Fn(&[u8]) -> Result<u64, CompressionError>,
382{
383    let uncompressed_size = read_size(data)?;
384    let decompressed = decompress_zstd(&data[header_len..], uncompressed_size)?;
385    validate_decompressed_len(uncompressed_size, decompressed.len())?;
386    Ok(decompressed)
387}
388
/// Decode a delta blob against `base`, trying the current 9-byte header first
/// and the legacy 5-byte header second. The legacy attempt's error is returned
/// if both fail.
#[cfg(test)]
fn decompress_delta_with_header(
    delta_data: &[u8],
    base: &[u8],
) -> Result<Vec<u8>, CompressionError> {
    try_decompress_delta(delta_data, base, COMPRESSED_HEADER_LEN, read_u64_size).or_else(|_| {
        try_decompress_delta(
            delta_data,
            base,
            LEGACY_COMPRESSED_HEADER_LEN,
            read_u32_size,
        )
    })
}
406
/// Apply the delta following a `header_len`-byte header to `base`.
///
/// The recorded output size (decoded by `read_size`) must not exceed
/// `MAX_DELTA_OUTPUT_SIZE`, and the reconstructed output must match it exactly.
#[cfg(test)]
fn try_decompress_delta<F>(
    delta_data: &[u8],
    base: &[u8],
    header_len: usize,
    read_size: F,
) -> Result<Vec<u8>, CompressionError>
where
    F: Fn(&[u8]) -> Result<u64, CompressionError>,
{
    let expected = read_size(delta_data)?;

    let over_limit = expected > MAX_DELTA_OUTPUT_SIZE as u64;
    if over_limit {
        return Err(CompressionError::DecompressionFailed(format!(
            "delta output size {} exceeds max {}",
            expected, MAX_DELTA_OUTPUT_SIZE
        )));
    }

    let instructions = &delta_data[header_len..];
    let restored = match DeltaDecoder::decode(base, instructions, expected as usize) {
        Ok(bytes) => bytes,
        Err(error) => return Err(CompressionError::DecompressionFailed(error.to_string())),
    };
    validate_decompressed_len(expected, restored.len())?;
    Ok(restored)
}
432
433fn read_u64_size(data: &[u8]) -> Result<u64, CompressionError> {
434    if data.len() < COMPRESSED_HEADER_LEN {
435        return Err(CompressionError::CorruptedData(
436            "compression header truncated".to_string(),
437        ));
438    }
439
440    let recorded_size =
441        u64::from_be_bytes(data[1..COMPRESSED_HEADER_LEN].try_into().map_err(|_| {
442            CompressionError::CorruptedData("compression header truncated".to_string())
443        })?);
444    validate_size(recorded_size)?;
445    Ok(recorded_size)
446}
447
448fn read_u32_size(data: &[u8]) -> Result<u64, CompressionError> {
449    if data.len() < LEGACY_COMPRESSED_HEADER_LEN {
450        return Err(CompressionError::CorruptedData(
451            "compression header truncated".to_string(),
452        ));
453    }
454
455    let recorded_size = u32::from_be_bytes(
456        data[1..LEGACY_COMPRESSED_HEADER_LEN]
457            .try_into()
458            .map_err(|_| {
459                CompressionError::CorruptedData("compression header truncated".to_string())
460            })?,
461    ) as u64;
462    validate_size(recorded_size)?;
463    Ok(recorded_size)
464}
465
466fn validate_size(size: u64) -> Result<(), CompressionError> {
467    if size > MAX_DECOMPRESSED_SIZE {
468        return Err(CompressionError::SizeLimitExceeded {
469            size,
470            max: MAX_DECOMPRESSED_SIZE,
471        });
472    }
473
474    Ok(())
475}
476
477fn validate_decompressed_len(expected: u64, actual: usize) -> Result<(), CompressionError> {
478    if actual as u64 != expected {
479        return Err(CompressionError::CorruptedData(format!(
480            "decompressed size mismatch: expected {expected}, got {actual}",
481        )));
482    }
483
484    Ok(())
485}
486
/// Return `true` when `magic` occurs in `data` starting exactly at byte `offset`.
///
/// A window running past the end of `data` yields `false` rather than panicking.
fn has_magic_at(data: &[u8], offset: usize, magic: [u8; 4]) -> bool {
    let window = offset..offset + magic.len();
    matches!(data.get(window), Some(bytes) if bytes == &magic[..])
}
490
491#[cfg(test)]
492mod compression_tests;