Skip to main content

crush_core/
compression.rs

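//! Compression functionality
//!
//! Provides the public [`compress()`] API, which compresses data using the default
//! DEFLATE plugin and wraps it with a Crush header, and [`compress_with_options()`],
//! which adds plugin selection, timeouts, cancellation, and optional file metadata.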
5
6use crate::cancel::CancellationToken;
7use crate::error::Result;
8use crate::plugin::registry::{get_default_plugin, get_plugin_by_magic};
9use crate::plugin::{
10    run_with_timeout, run_with_timeout_and_cancel, CrushHeader, FileMetadata, PluginSelector,
11    ScoringWeights,
12};
13use crc32fast::Hasher;
14use std::sync::Arc;
15use std::time::Duration;
16
/// Timeout used by [`compress`] and the initial timeout of [`CompressionOptions::new`].
///
/// A zero duration means "no timeout".
pub const DEFAULT_TIMEOUT: Duration = Duration::ZERO;
19
20/// Compression options for plugin selection and scoring
21#[derive(Clone)]
22pub struct CompressionOptions {
23    /// Optional plugin name for manual override
24    plugin_name: Option<String>,
25
26    /// Scoring weights for automatic selection
27    weights: ScoringWeights,
28
29    /// Timeout for compression operation
30    timeout: Duration,
31
32    /// Optional file metadata
33    file_metadata: Option<FileMetadata>,
34
35    /// Optional cancellation token for Ctrl+C support
36    cancel_token: Option<Arc<dyn CancellationToken>>,
37}
38
39impl CompressionOptions {
40    /// Create new compression options with default settings
41    #[must_use]
42    pub fn new() -> Self {
43        Self {
44            plugin_name: None,
45            weights: ScoringWeights::default(),
46            timeout: DEFAULT_TIMEOUT,
47            file_metadata: None,
48            cancel_token: None,
49        }
50    }
51
52    /// Specify a plugin by name (manual override)
53    #[must_use]
54    pub fn with_plugin(mut self, name: &str) -> Self {
55        self.plugin_name = Some(name.to_string());
56        self
57    }
58
59    /// Set custom scoring weights
60    #[must_use]
61    pub fn with_weights(mut self, weights: ScoringWeights) -> Self {
62        self.weights = weights;
63        self
64    }
65
66    /// Set timeout for compression operation
67    #[must_use]
68    pub fn with_timeout(mut self, timeout: Duration) -> Self {
69        self.timeout = timeout;
70        self
71    }
72
73    /// Set file metadata
74    #[must_use]
75    pub fn with_file_metadata(mut self, metadata: FileMetadata) -> Self {
76        self.file_metadata = Some(metadata);
77        self
78    }
79
80    /// Set cancellation token for Ctrl+C support
81    #[must_use]
82    pub fn with_cancel_token(mut self, token: Arc<dyn CancellationToken>) -> Self {
83        self.cancel_token = Some(token);
84        self
85    }
86}
87
88impl std::fmt::Debug for CompressionOptions {
89    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90        f.debug_struct("CompressionOptions")
91            .field("plugin_name", &self.plugin_name)
92            .field("weights", &self.weights)
93            .field("timeout", &self.timeout)
94            .field("file_metadata", &self.file_metadata)
95            .field(
96                "cancel_token",
97                &self.cancel_token.as_ref().map(|_| "Some(...)"),
98            )
99            .finish()
100    }
101}
102
103impl Default for CompressionOptions {
104    fn default() -> Self {
105        Self::new()
106    }
107}
108
109/// Compress data using the default compression algorithm
110///
111/// Uses the DEFLATE plugin (magic number `0x43525100`) to compress the input data.
112/// The compressed output includes a 16-byte Crush header with:
113/// - Magic number identifying the plugin
114/// - Original uncompressed size
115/// - CRC32 checksum of the compressed payload
116///
117/// # Errors
118///
119/// Returns an error if:
120/// - No default plugin is available (should never happen - DEFLATE is always registered)
121/// - Compression operation fails
122///
123/// # Examples
124///
125/// ```
126/// use crush_core::{init_plugins, compress};
127///
128/// init_plugins().expect("Plugin initialization failed");
129/// let data = b"Hello, world!";
130/// let compressed = compress(data).expect("Compression failed");
131/// assert!(!compressed.is_empty());
132/// ```
133pub fn compress(input: &[u8]) -> Result<Vec<u8>> {
134    // Get the default DEFLATE plugin from registry
135    let plugin = get_default_plugin().ok_or_else(|| {
136        crate::error::PluginError::NotFound(
137            "Default DEFLATE plugin not found. Call init_plugins() first.".to_string(),
138        )
139    })?;
140
141    let default_magic = [0x43, 0x52, 0x01, 0x00];
142
143    // Clone input for move into timeout closure
144    let input_owned = input.to_vec();
145
146    // Compress the data with timeout protection
147    let compressed_payload = run_with_timeout(DEFAULT_TIMEOUT, move |cancel_flag| {
148        plugin.compress(&input_owned, cancel_flag)
149    })?;
150
151    // Calculate CRC32 of compressed payload
152    let mut hasher = Hasher::new();
153    hasher.update(&compressed_payload);
154    let crc32 = hasher.finalize();
155
156    // Create header with original size and CRC32
157    let header = CrushHeader::new(default_magic, input.len() as u64).with_crc32();
158
159    // Build final output: header + compressed payload
160    let mut output = Vec::with_capacity(CrushHeader::SIZE + 4 + compressed_payload.len());
161    output.extend_from_slice(&header.to_bytes());
162    output.extend_from_slice(&crc32.to_le_bytes());
163    output.extend_from_slice(&compressed_payload);
164
165    Ok(output)
166}
167
168/// Compress data with custom options (plugin selection, scoring weights)
169///
170/// Provides fine-grained control over plugin selection through either:
171/// - Manual plugin override by name
172/// - Automatic selection with custom scoring weights
173///
174/// # Errors
175///
176/// Returns an error if:
177/// - Specified plugin is not found (manual override)
178/// - No plugins are available (automatic selection)
179/// - Compression operation fails
180/// - Operation exceeds the specified timeout (0 = no timeout)
181///
182/// # Examples
183///
184/// ```
185/// use crush_core::{init_plugins, compress_with_options, CompressionOptions};
186///
187/// init_plugins().expect("Plugin initialization failed");
188/// let data = b"Hello, world!";
189///
190/// // Use default automatic selection
191/// let options = CompressionOptions::default();
192/// let compressed = compress_with_options(data, &options).expect("Compression failed");
193///
194/// // Manual override: specify plugin by name
195/// let options = CompressionOptions::default().with_plugin("deflate");
196/// let compressed = compress_with_options(data, &options).expect("Compression failed");
197/// ```
198pub fn compress_with_options(input: &[u8], options: &CompressionOptions) -> Result<Vec<u8>> {
199    // Check if already cancelled before starting
200    if let Some(ref token) = options.cancel_token {
201        if token.is_cancelled() {
202            return Err(crate::error::CrushError::Cancelled);
203        }
204    }
205
206    // Select plugin based on options
207    let selector = PluginSelector::new(options.weights);
208
209    let selected_metadata = if let Some(ref plugin_name) = options.plugin_name {
210        // Manual override
211        selector.select_by_name(plugin_name)?
212    } else {
213        // Automatic selection
214        selector.select()?
215    };
216
217    // Get the actual plugin from registry
218    let plugin = get_plugin_by_magic(selected_metadata.magic_number).ok_or_else(|| {
219        crate::error::PluginError::NotFound(format!(
220            "Plugin '{}' metadata found but not in registry",
221            selected_metadata.name
222        ))
223    })?;
224
225    // Clone input for move into timeout closure
226    let input_owned = input.to_vec();
227    let timeout = options.timeout;
228    let cancel_token = options.cancel_token.clone();
229
230    // Compress the data with timeout and cancellation protection
231    let compressed_payload =
232        run_with_timeout_and_cancel(timeout, cancel_token, move |cancel_flag| {
233            plugin.compress(&input_owned, cancel_flag)
234        })?;
235
236    // Handle file metadata
237    let metadata_bytes = options
238        .file_metadata
239        .as_ref()
240        .map_or(Vec::new(), super::plugin::metadata::FileMetadata::to_bytes);
241
242    let mut payload_with_metadata = Vec::new();
243    if !metadata_bytes.is_empty() {
244        #[allow(clippy::cast_possible_truncation)]
245        let metadata_len = metadata_bytes.len() as u16; // FileMetadata is always < 64KB
246        payload_with_metadata.extend_from_slice(&metadata_len.to_le_bytes());
247        payload_with_metadata.extend_from_slice(&metadata_bytes);
248    }
249    payload_with_metadata.extend_from_slice(&compressed_payload);
250
251    // Calculate CRC32 of compressed payload + metadata
252    let mut hasher = Hasher::new();
253    hasher.update(&payload_with_metadata);
254    let crc32 = hasher.finalize();
255
256    // Create header with original size and CRC32
257    let mut header =
258        CrushHeader::new(selected_metadata.magic_number, input.len() as u64).with_crc32();
259    if !metadata_bytes.is_empty() {
260        header = header.with_metadata();
261    }
262
263    // Build final output: header + CRC32 + payload_with_metadata
264    let mut output = Vec::with_capacity(CrushHeader::SIZE + 4 + payload_with_metadata.len());
265    output.extend_from_slice(&header.to_bytes());
266    output.extend_from_slice(&crc32.to_le_bytes());
267    output.extend_from_slice(&payload_with_metadata);
268
269    Ok(output)
270}
271
#[cfg(test)]
mod tests {
    use super::*;
    use crate::init_plugins;

    #[test]
    #[allow(clippy::unwrap_used)]
    fn test_compress_basic() {
        init_plugins().unwrap();
        let data = b"Hello, Crush!";
        let compressed = compress(data).unwrap();

        // Should have at least the header
        assert!(compressed.len() >= CrushHeader::SIZE);

        // Header should have correct magic number
        assert_eq!(&compressed[0..4], &[0x43, 0x52, 0x01, 0x00]);
    }

    #[test]
    #[allow(clippy::unwrap_used)]
    fn test_compress_empty() {
        init_plugins().unwrap();
        let data = b"";
        let compressed = compress(data).unwrap();

        // Even empty data gets a header
        assert!(compressed.len() >= CrushHeader::SIZE);
    }

    #[test]
    #[allow(clippy::unwrap_used)]
    fn test_compress_large() {
        init_plugins().unwrap();
        let data = vec![0x42; 1_000_000]; // 1MB of repeated bytes
        let compressed = compress(&data).unwrap();

        // Should compress well (repeated data)
        assert!(compressed.len() < data.len() / 2);
    }

    #[test]
    #[allow(clippy::unwrap_used)]
    fn test_compress_roundtrip() {
        use crate::decompress;

        init_plugins().unwrap();
        let original = b"Roundtrip test data without options";
        let compressed = compress(original).unwrap();
        let result = decompress(&compressed).unwrap();

        assert_eq!(result.data, original);
    }

    // CompressionOptions tests
    #[test]
    fn test_compression_options_default() {
        let options = CompressionOptions::default();
        assert_eq!(options.timeout, DEFAULT_TIMEOUT);
        assert!(options.plugin_name.is_none());
        assert!(options.file_metadata.is_none());
        assert!(options.cancel_token.is_none());
    }

    #[test]
    fn test_compression_options_new() {
        let options = CompressionOptions::new();
        assert_eq!(options.timeout, DEFAULT_TIMEOUT);
    }

    #[test]
    fn test_compression_options_with_plugin() {
        let options = CompressionOptions::new().with_plugin("deflate");
        assert_eq!(options.plugin_name, Some("deflate".to_string()));
    }

    #[test]
    fn test_compression_options_with_weights() {
        let weights = ScoringWeights {
            throughput: 0.5,
            compression_ratio: 0.5,
        };
        let options = CompressionOptions::new().with_weights(weights);
        assert!((options.weights.throughput - 0.5).abs() < f64::EPSILON);
        assert!((options.weights.compression_ratio - 0.5).abs() < f64::EPSILON);
    }

    #[test]
    fn test_compression_options_with_timeout() {
        let timeout = Duration::from_secs(10);
        let options = CompressionOptions::new().with_timeout(timeout);
        assert_eq!(options.timeout, timeout);
    }

    #[test]
    fn test_compression_options_with_file_metadata() {
        let metadata = FileMetadata {
            mtime: Some(1_234_567_890),
            #[cfg(unix)]
            permissions: Some(0o644),
        };
        let options = CompressionOptions::new().with_file_metadata(metadata);
        assert!(options.file_metadata.is_some());
    }

    #[test]
    fn test_compression_options_with_cancel_token() {
        use crate::cancel::AtomicCancellationToken;
        let token: Arc<dyn CancellationToken> = Arc::new(AtomicCancellationToken::new());
        let options = CompressionOptions::new().with_cancel_token(token);
        assert!(options.cancel_token.is_some());
    }

    #[test]
    fn test_compression_options_debug() {
        let options = CompressionOptions::new().with_plugin("test");
        let debug_str = format!("{options:?}");
        assert!(debug_str.contains("CompressionOptions"));
        assert!(debug_str.contains("test"));
    }

    // compress_with_options tests
    #[test]
    #[allow(clippy::unwrap_used)]
    fn test_compress_with_options_default() {
        init_plugins().unwrap();
        let data = b"Test data for compression";
        let options = CompressionOptions::default();
        let compressed = compress_with_options(data, &options).unwrap();

        assert!(compressed.len() >= CrushHeader::SIZE);
    }

    #[test]
    #[allow(clippy::unwrap_used)]
    fn test_compress_with_options_manual_plugin() {
        init_plugins().unwrap();
        let data = b"Test data";
        let options = CompressionOptions::default().with_plugin("deflate");
        let compressed = compress_with_options(data, &options).unwrap();

        assert!(compressed.len() >= CrushHeader::SIZE);
        assert_eq!(&compressed[0..4], &[0x43, 0x52, 0x01, 0x00]); // DEFLATE magic
    }

    #[test]
    #[allow(clippy::unwrap_used)]
    fn test_compress_with_options_invalid_plugin() {
        init_plugins().unwrap();
        let data = b"Test data";
        let options = CompressionOptions::default().with_plugin("nonexistent");
        let result = compress_with_options(data, &options);

        assert!(result.is_err());
    }

    #[test]
    #[allow(clippy::unwrap_used)]
    fn test_compress_with_options_with_metadata() {
        init_plugins().unwrap();
        let data = b"Test data with metadata";
        let metadata = FileMetadata {
            mtime: Some(1_234_567_890),
            #[cfg(unix)]
            permissions: Some(0o644),
        };
        let options = CompressionOptions::default().with_file_metadata(metadata);
        let compressed = compress_with_options(data, &options).unwrap();

        // Header + 4-byte CRC32 + 2-byte metadata length prefix, then metadata + payload
        assert!(compressed.len() > CrushHeader::SIZE + 4 + 2);
    }

    #[test]
    #[allow(clippy::unwrap_used)]
    fn test_compress_roundtrip_with_metadata() {
        use crate::decompress;

        init_plugins().unwrap();
        let original = b"Roundtrip test data with metadata";
        let metadata = FileMetadata {
            mtime: Some(1_234_567_890),
            #[cfg(unix)]
            permissions: Some(0o644),
        };
        let options = CompressionOptions::default().with_file_metadata(metadata);
        let compressed = compress_with_options(original, &options).unwrap();
        let result = decompress(&compressed).unwrap();

        assert_eq!(result.data, original);
    }

    #[test]
    #[allow(clippy::unwrap_used)]
    fn test_compress_with_options_cancellation() {
        use crate::cancel::AtomicCancellationToken;

        init_plugins().unwrap();
        let data = b"Test data";
        let token = Arc::new(AtomicCancellationToken::new());
        token.cancel(); // Cancel before compression

        let options = CompressionOptions::default().with_cancel_token(token);
        let result = compress_with_options(data, &options);

        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            crate::error::CrushError::Cancelled
        ));
    }

    #[test]
    #[allow(clippy::unwrap_used)]
    fn test_compress_with_options_timeout() {
        init_plugins().unwrap();
        let data = vec![0u8; 10_000_000]; // Large data
        let options = CompressionOptions::default().with_timeout(Duration::from_nanos(1)); // Extremely short timeout

        let result = compress_with_options(&data, &options);
        // May succeed or timeout depending on system speed
        // Just verify it doesn't panic
        let _ = result;
    }

    #[test]
    #[allow(clippy::unwrap_used)]
    fn test_compress_with_options_zero_timeout() {
        init_plugins().unwrap();
        let data = b"Test with no timeout";
        let options = CompressionOptions::default().with_timeout(Duration::from_secs(0)); // No timeout

        let result = compress_with_options(data, &options);
        assert!(result.is_ok());
    }

    #[test]
    #[allow(clippy::unwrap_used)]
    fn test_compress_roundtrip_with_options() {
        use crate::decompress;

        init_plugins().unwrap();
        let original = b"Roundtrip test data with options";
        let options = CompressionOptions::default();
        let compressed = compress_with_options(original, &options).unwrap();
        let result = decompress(&compressed).unwrap();

        assert_eq!(result.data, original);
    }

    #[test]
    #[allow(clippy::unwrap_used)]
    fn test_compression_options_builder_chain() {
        use crate::cancel::AtomicCancellationToken;

        let metadata = FileMetadata {
            mtime: Some(1_234_567_890),
            #[cfg(unix)]
            permissions: Some(0o644),
        };
        let weights = ScoringWeights {
            throughput: 0.7,
            compression_ratio: 0.3,
        };
        let token: Arc<dyn CancellationToken> = Arc::new(AtomicCancellationToken::new());

        // Test method chaining
        let options = CompressionOptions::new()
            .with_plugin("deflate")
            .with_weights(weights)
            .with_timeout(Duration::from_secs(30))
            .with_file_metadata(metadata)
            .with_cancel_token(token);

        assert_eq!(options.plugin_name, Some("deflate".to_string()));
        assert!((options.weights.throughput - 0.7).abs() < f64::EPSILON);
        assert_eq!(options.timeout, Duration::from_secs(30));
        assert!(options.file_metadata.is_some());
        assert!(options.cancel_token.is_some());
    }
}