Skip to main content

crush_parallel/
config.rs

1//! Engine configuration and progress reporting types.
2
3use std::sync::{Arc, Mutex};
4
5/// Callback type invoked after each block completes.
6///
7/// Returning `false` signals cancellation. The engine halts at the next block
8/// boundary, discards partial output, and returns `CrushError::Cancelled`.
9pub type ProgressCallback = Box<dyn FnMut(ProgressEvent) -> bool + Send>;
10
11/// Which direction the engine is operating.
12#[derive(Debug, Clone, Copy, PartialEq, Eq)]
13pub enum ProgressPhase {
14    /// Engine is compressing input blocks.
15    Compressing,
16    /// Engine is decompressing blocks.
17    Decompressing,
18}
19
20/// Data payload delivered to the progress callback after each block completes.
21#[derive(Debug, Clone)]
22pub struct ProgressEvent {
23    /// Cumulative uncompressed bytes processed so far.
24    pub bytes_processed: u64,
25    /// Number of blocks fully compressed/decompressed.
26    pub blocks_completed: u64,
27    /// Total blocks in the operation. `None` for streaming (unknown total).
28    pub total_blocks: Option<u64>,
29    /// Whether the engine is compressing or decompressing.
30    pub phase: ProgressPhase,
31}
32
33/// Configuration for the parallel compression/decompression engine.
34///
35/// Construct via [`EngineConfiguration::builder()`] or use the [`Default`] impl.
36#[derive(Clone)]
37pub struct EngineConfiguration {
38    /// Number of rayon worker threads. `0` = rayon default (logical CPU count).
39    pub workers: usize,
40    /// Dedicated rayon thread pool built from `workers`. `None` when `workers == 0`
41    /// (falls back to the global rayon pool).
42    pub(crate) thread_pool: Option<Arc<rayon::ThreadPool>>,
43    /// Uncompressed block size in bytes. Range: 64 KB – 256 MB.
44    pub block_size: u32,
45    /// DEFLATE compression level 0–9.
46    pub compression_level: u8,
47    /// If `compressed / uncompressed > max_expansion_ratio`, store block raw.
48    pub max_expansion_ratio: f64,
49    /// During decompression, halt if total bytes would exceed
50    /// `compressed_file_size * max_decompression_ratio`.
51    pub max_decompression_ratio: f64,
52    /// Enable per-block CRC32 checksums.
53    pub checksums: bool,
54    /// Optional progress callback.
55    pub progress: Option<Arc<Mutex<ProgressCallback>>>,
56}
57
58impl Default for EngineConfiguration {
59    fn default() -> Self {
60        Self {
61            workers: 0,
62            thread_pool: None,
63            block_size: 1_048_576, // 1 MB
64            compression_level: 6,
65            max_expansion_ratio: 1.0,
66            max_decompression_ratio: 1024.0,
67            checksums: true,
68            progress: None,
69        }
70    }
71}
72
73impl EngineConfiguration {
74    /// Create a new builder.
75    #[must_use]
76    pub fn builder() -> EngineConfigurationBuilder {
77        EngineConfigurationBuilder::new()
78    }
79}
80
81/// Builder for [`EngineConfiguration`].
82#[derive(Default)]
83pub struct EngineConfigurationBuilder {
84    inner: EngineConfiguration,
85}
86
87impl EngineConfigurationBuilder {
88    fn new() -> Self {
89        Self {
90            inner: EngineConfiguration::default(),
91        }
92    }
93
94    /// Set the number of worker threads (`0` = rayon default).
95    #[must_use]
96    pub fn workers(mut self, n: usize) -> Self {
97        self.inner.workers = n;
98        self
99    }
100
101    /// Set the uncompressed block size in bytes.
102    #[must_use]
103    pub fn block_size(mut self, bytes: u32) -> Self {
104        self.inner.block_size = bytes;
105        self
106    }
107
108    /// Set the DEFLATE compression level (0–9).
109    #[must_use]
110    pub fn compression_level(mut self, level: u8) -> Self {
111        self.inner.compression_level = level;
112        self
113    }
114
115    /// Set the maximum expansion ratio for incompressible block detection.
116    #[must_use]
117    pub fn max_expansion_ratio(mut self, ratio: f64) -> Self {
118        self.inner.max_expansion_ratio = ratio;
119        self
120    }
121
122    /// Set the maximum decompression expansion ratio.
123    #[must_use]
124    pub fn max_decompression_ratio(mut self, ratio: f64) -> Self {
125        self.inner.max_decompression_ratio = ratio;
126        self
127    }
128
129    /// Enable or disable per-block CRC32 checksums.
130    #[must_use]
131    pub fn checksums(mut self, enabled: bool) -> Self {
132        self.inner.checksums = enabled;
133        self
134    }
135
136    /// Attach a progress callback.
137    #[must_use]
138    pub fn progress(mut self, cb: Arc<Mutex<ProgressCallback>>) -> Self {
139        self.inner.progress = Some(cb);
140        self
141    }
142
143    /// Validate all fields and return the built configuration.
144    ///
145    /// # Errors
146    ///
147    /// Returns [`crush_core::error::CrushError::InvalidConfig`] if any field is out of range.
148    pub fn build(self) -> crush_core::error::Result<EngineConfiguration> {
149        use crush_core::error::CrushError;
150        let mut cfg = self.inner;
151        if cfg.block_size < 65_536 || cfg.block_size > 268_435_456 {
152            return Err(CrushError::InvalidConfig(format!(
153                "block_size {} is out of range [65536, 268435456]",
154                cfg.block_size
155            )));
156        }
157        if cfg.compression_level > 9 {
158            return Err(CrushError::InvalidConfig(format!(
159                "compression_level {} must be in [0, 9]",
160                cfg.compression_level
161            )));
162        }
163        if cfg.max_expansion_ratio <= 0.0 {
164            return Err(CrushError::InvalidConfig(
165                "max_expansion_ratio must be > 0.0".to_owned(),
166            ));
167        }
168        if cfg.max_decompression_ratio <= 0.0 {
169            return Err(CrushError::InvalidConfig(
170                "max_decompression_ratio must be > 0.0".to_owned(),
171            ));
172        }
173        if cfg.workers > 0 {
174            let pool = rayon::ThreadPoolBuilder::new()
175                .num_threads(cfg.workers)
176                .build()
177                .map_err(|e| CrushError::InvalidConfig(format!("thread pool: {e}")))?;
178            cfg.thread_pool = Some(Arc::new(pool));
179        }
180        Ok(cfg)
181    }
182}
183
184#[cfg(test)]
185mod tests {
186    use super::*;
187
188    #[test]
189    fn test_engine_configuration_builder_validates_fields() {
190        // block_size too small
191        let err = EngineConfiguration::builder().block_size(1024).build();
192        assert!(err.is_err());
193
194        // block_size too large
195        let err = EngineConfiguration::builder().block_size(u32::MAX).build();
196        assert!(err.is_err());
197
198        // invalid compression level
199        let err = EngineConfiguration::builder().compression_level(10).build();
200        assert!(err.is_err());
201
202        // zero expansion ratio
203        let err = EngineConfiguration::builder()
204            .max_expansion_ratio(0.0)
205            .build();
206        assert!(err.is_err());
207
208        // zero decompression ratio
209        let err = EngineConfiguration::builder()
210            .max_decompression_ratio(0.0)
211            .build();
212        assert!(err.is_err());
213
214        // valid configuration
215        let ok = EngineConfiguration::builder()
216            .workers(4)
217            .block_size(1_048_576)
218            .compression_level(6)
219            .build();
220        assert!(ok.is_ok());
221    }
222}