cmprss 0.4.0

A compression multi-tool for the command line.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
use crate::utils::{
    CmprssInput, CmprssOutput, Compressor, ExtractedTarget, ReadWrapper, Result, WriteWrapper,
};
use anyhow::{anyhow, bail};
use std::io::{self, Read, Write};
use std::path::Path;
use std::sync::mpsc::{Receiver, Sender, channel};
use std::thread;

/// A pipeline of one or more compressors applied in sequence (e.g., tar.gz)
pub struct Pipeline {
    /// The chain of compressors to apply in order (innermost to outermost),
    /// e.g. `[tar, gzip]` for `.tar.gz`. Methods on `Pipeline` treat an empty
    /// chain as an invariant violation ("pipeline is never empty").
    compressors: Vec<Box<dyn Compressor>>,
    /// Preserves the user's original format string (e.g. `tgz`) so default
    /// filenames use it verbatim instead of the dotted composition of each
    /// stage's extension. `None` falls back to joining the per-stage
    /// extensions.
    format_override: Option<String>,
}

impl Clone for Pipeline {
    fn clone(&self) -> Self {
        Pipeline {
            compressors: self.compressors.iter().map(|c| c.clone_boxed()).collect(),
            format_override: self.format_override.clone(),
        }
    }
}

/// Which method intermediate (threaded) stages should invoke. The final stage
/// always runs on the calling thread and is handled by a caller-supplied
/// closure — only the intermediate layers need this dispatch.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum StageAction {
    /// Invoke `Compressor::compress` on each intermediate stage.
    Compress,
    /// Invoke `Compressor::extract` on each intermediate stage.
    Extract,
}

impl Pipeline {
    /// Largest chunk, in bytes, that a `PipeWriter` sends through the
    /// in-memory channel linking two adjacent stages.
    const PIPE_CHUNK_SIZE: usize = 64 * 1024;

    /// Create a new Pipeline with the given compressors.
    ///
    /// `compressors` is ordered innermost to outermost (e.g. `[tar, gzip]`
    /// for `.tar.gz`) and must not be empty.
    pub fn new(compressors: Vec<Box<dyn Compressor>>) -> Self {
        Pipeline {
            compressors,
            format_override: None,
        }
    }

    /// Create a Pipeline that keeps `format` as its canonical format string,
    /// used for default output filenames. Intended for shortcut forms like
    /// `tgz` where the user-facing extension differs from the dotted chain.
    pub fn with_format(compressors: Vec<Box<dyn Compressor>>, format: String) -> Self {
        Pipeline {
            compressors,
            format_override: Some(format),
        }
    }

    /// Get a string representation of the chained format (e.g., "tar.gz").
    ///
    /// Returns the override verbatim when one was supplied via
    /// [`Pipeline::with_format`]; otherwise joins each stage's extension
    /// with `.` in innermost-to-outermost order.
    fn format_chain(&self) -> String {
        if let Some(ref f) = self.format_override {
            return f.clone();
        }
        self.compressors
            .iter()
            .map(|c| c.extension())
            .collect::<Vec<&str>>()
            .join(".")
    }

    /// Run an ordered chain of compressor stages, with each non-final stage
    /// in its own thread linked by an in-memory pipe. The final (last) stage
    /// runs on the calling thread via `finalize`. Intermediate stages all
    /// invoke the same method — `compress` going outward through a
    /// compression pipeline, `extract` unwrapping layers on the way in.
    ///
    /// # Errors
    ///
    /// Returns an error if a stage thread cannot be spawned, if `finalize`
    /// fails, if any intermediate stage returns an error, or if a stage
    /// thread panics (the message names the panicking stage). Stage threads
    /// are only joined after `finalize` succeeds.
    fn run_threaded<F>(
        stages: Vec<Box<dyn Compressor>>,
        initial_input: CmprssInput,
        intermediate: StageAction,
        finalize: F,
    ) -> Result
    where
        F: FnOnce(Box<dyn Compressor>, CmprssInput) -> Result,
    {
        debug_assert!(!stages.is_empty(), "pipeline is never empty");
        let mut stages = stages;
        let last = stages.pop().expect("pipeline is never empty");
        let mut current_input = initial_input;
        let mut handles = Vec::with_capacity(stages.len());

        for stage in stages {
            let (sender, receiver) = channel::<Vec<u8>>();
            let stage_output = CmprssOutput::Writer(WriteWrapper(Box::new(PipeWriter::new(
                sender,
                Self::PIPE_CHUNK_SIZE,
            ))));
            let next_input = CmprssInput::Reader(ReadWrapper(Box::new(PipeReader::new(receiver))));
            // This stage consumes the previous input; the next stage (or
            // `finalize`) reads what this stage writes.
            let stage_input = std::mem::replace(&mut current_input, next_input);

            // Capture the name before `stage` moves into the thread so a
            // panic can be attributed to a specific stage.
            let stage_name = stage.name().to_string();
            // `Builder::spawn` reports OS thread-creation failure as an
            // error, whereas `thread::spawn` would panic.
            let handle = thread::Builder::new()
                .name(format!("pipeline-{stage_name}"))
                .spawn(move || match intermediate {
                    StageAction::Compress => stage.compress(stage_input, stage_output),
                    StageAction::Extract => stage.extract(stage_input, stage_output),
                })?;
            handles.push((stage_name, handle));
        }

        finalize(last, current_input)?;

        for (stage_name, handle) in handles {
            handle
                .join()
                .map_err(|_| anyhow!("Pipeline stage thread panicked ({stage_name})"))??;
        }
        Ok(())
    }
}

/// A reader that reads from a receiver channel.
///
/// Chunks arrive as `Vec<u8>` messages. An empty message (sent by
/// `PipeWriter` on drop) or a disconnected channel marks end of stream.
struct PipeReader {
    receiver: Receiver<Vec<u8>>,
    /// The chunk currently being drained.
    buffer: Vec<u8>,
    /// Index of the next unread byte in `buffer`.
    position: usize,
    /// Set once end of stream is observed; no further `recv` calls are made.
    eof: bool,
}

impl PipeReader {
    /// Wraps `receiver`, starting with no buffered data.
    fn new(receiver: Receiver<Vec<u8>>) -> Self {
        PipeReader {
            receiver,
            buffer: Vec::new(),
            position: 0,
            eof: false,
        }
    }
}

impl Read for PipeReader {
    /// Copies buffered bytes into `buf`, blocking on the channel for a new
    /// chunk only when the current one is exhausted.
    ///
    /// Returns `Ok(0)` for a zero-length `buf` without touching the channel,
    /// and `Ok(0)` at end of stream. Never returns an error.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // A zero-length read must neither block waiting for a chunk nor
        // consume the EOF marker.
        if buf.is_empty() {
            return Ok(0);
        }

        if self.position >= self.buffer.len() {
            if self.eof {
                return Ok(0);
            }
            match self.receiver.recv() {
                Ok(chunk) if !chunk.is_empty() => {
                    self.buffer = chunk;
                    self.position = 0;
                }
                // An empty chunk is the writer's EOF marker; a disconnected
                // channel means every writer is gone. Both end the stream.
                Ok(_) | Err(_) => {
                    self.eof = true;
                    return Ok(0);
                }
            }
        }

        let pending = &self.buffer[self.position..];
        let copied = pending.len().min(buf.len());
        buf[..copied].copy_from_slice(&pending[..copied]);
        self.position += copied;
        Ok(copied)
    }
}

/// A writer that writes to a sender channel.
///
/// Each `write` is split into chunks of at most `buffer_size` bytes, sent as
/// separate messages. Dropping the writer sends an empty message, which
/// `PipeReader` treats as end of stream.
struct PipeWriter {
    sender: Sender<Vec<u8>>,
    /// Maximum chunk size in bytes; `new` guarantees it is at least 1.
    buffer_size: usize,
}

impl PipeWriter {
    /// Wraps `sender`, sending chunks of at most `buffer_size` bytes.
    ///
    /// `buffer_size` is clamped to at least 1. A zero chunk size would never
    /// advance through the input, and would emit empty messages that the
    /// reader interprets as EOF.
    fn new(sender: Sender<Vec<u8>>, buffer_size: usize) -> Self {
        PipeWriter {
            sender,
            buffer_size: buffer_size.max(1),
        }
    }
}

impl Write for PipeWriter {
    /// Sends `buf` as one or more chunks and returns the number of bytes
    /// handed to the channel.
    ///
    /// # Errors
    ///
    /// Returns `BrokenPipe` if the receiver is gone before any byte of `buf`
    /// was sent. If some chunks were already sent, reports that partial
    /// count instead, as the `Write::write` contract requires.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let mut sent = 0;
        for chunk in buf.chunks(self.buffer_size) {
            if self.sender.send(chunk.to_vec()).is_err() {
                if sent > 0 {
                    return Ok(sent);
                }
                return Err(io::Error::new(
                    io::ErrorKind::BrokenPipe,
                    "Pipe receiver has been closed",
                ));
            }
            sent += chunk.len();
        }
        Ok(sent)
    }

    fn flush(&mut self) -> io::Result<()> {
        // No need to flush, the channel sends immediately
        Ok(())
    }
}

impl Drop for PipeWriter {
    fn drop(&mut self) {
        // Send an empty buffer to signal EOF; ignore failure because the
        // receiver may already be gone.
        let _ = self.sender.send(Vec::new());
    }
}

impl Compressor for Pipeline {
    fn name(&self) -> &str {
        self.compressors
            .last()
            .expect("pipeline is never empty")
            .name()
    }

    fn extension(&self) -> &str {
        self.compressors
            .last()
            .expect("pipeline is never empty")
            .extension()
    }

    fn default_extracted_target(&self) -> ExtractedTarget {
        self.compressors
            .first()
            .expect("pipeline is never empty")
            .default_extracted_target()
    }

    fn default_compressed_filename(&self, in_path: &Path) -> String {
        // Add all extensions: input.txt → input.txt.tar.gz
        let base = in_path
            .file_name()
            .map(|n| n.to_string_lossy().into_owned())
            .unwrap_or_else(|| "archive".to_string());
        format!("{}.{}", base, self.format_chain())
    }

    fn default_extracted_filename(&self, in_path: &Path) -> String {
        if self.default_extracted_target() == ExtractedTarget::Directory {
            return ".".to_string();
        }
        // Strip all known extensions: input.tar.gz → input
        let mut name = in_path
            .file_name()
            .map(|n| n.to_string_lossy().into_owned())
            .unwrap_or_else(|| "archive".to_string());
        for comp in self.compressors.iter().rev() {
            let ext = format!(".{}", comp.extension());
            if let Some(stripped) = name.strip_suffix(&ext) {
                name = stripped.to_string();
            }
        }
        name
    }

    fn is_archive(&self, in_path: &Path) -> bool {
        let file_name = match in_path.file_name().and_then(|f| f.to_str()) {
            Some(f) => f,
            None => return false,
        };
        file_name.ends_with(&format!(".{}", self.format_chain()))
    }

    fn compress(&self, input: CmprssInput, output: CmprssOutput) -> Result {
        debug_assert!(!self.compressors.is_empty(), "pipeline is never empty");
        if self.compressors.len() == 1 {
            return self.compressors[0].compress(input, output);
        }
        // Innermost → outermost: the outermost compressor runs on the main
        // thread and writes to the user-supplied output.
        let stages = self.compressors.iter().map(|c| c.clone_boxed()).collect();
        Self::run_threaded(stages, input, StageAction::Compress, |last, input| {
            last.compress(input, output)
        })
    }

    fn extract(&self, input: CmprssInput, output: CmprssOutput) -> Result {
        debug_assert!(!self.compressors.is_empty(), "pipeline is never empty");
        if self.compressors.len() == 1 {
            return self.compressors[0].extract(input, output);
        }
        // Outermost → innermost: the innermost extractor (typically the
        // container format like tar/zip) runs on the main thread so it can
        // unpack into the user-supplied output.
        let stages = self
            .compressors
            .iter()
            .rev()
            .map(|c| c.clone_boxed())
            .collect();
        Self::run_threaded(stages, input, StageAction::Extract, |last, input| {
            let final_output = match output {
                CmprssOutput::Path(ref p) => {
                    // If the innermost extractor wants a directory and the
                    // user's output path doesn't exist yet, create it so
                    // e.g. tar::unpack has somewhere to write.
                    if last.default_extracted_target() == ExtractedTarget::Directory && !p.exists()
                    {
                        std::fs::create_dir_all(p)?;
                    }
                    CmprssOutput::Path(p.clone())
                }
                CmprssOutput::Pipe(_) | CmprssOutput::Writer(_) => output,
            };
            last.extract(input, final_output)
        })
    }

    fn append(&self, input: CmprssInput, output: CmprssOutput) -> Result {
        debug_assert!(!self.compressors.is_empty(), "pipeline is never empty");
        if self.compressors.len() == 1 {
            // Single-stage pipelines are just a wrapper; delegate so tar/zip
            // reached via positional-path inference still support --append.
            return self.compressors[0].append(input, output);
        }
        bail!(
            "cannot --append to a compound archive ({}); it would require decompressing and recompressing the whole archive",
            self.format_chain()
        )
    }

    fn list(&self, input: CmprssInput) -> Result {
        debug_assert!(!self.compressors.is_empty(), "pipeline is never empty");
        if self.compressors.len() == 1 {
            return self.compressors[0].list(input);
        }
        // Same plumbing as `extract`, except the innermost compressor lists
        // its entries to stdout instead of unpacking. Outer layers still
        // decompress into the in-memory pipe so the innermost container sees
        // plain archive bytes.
        let stages = self
            .compressors
            .iter()
            .rev()
            .map(|c| c.clone_boxed())
            .collect();
        Self::run_threaded(stages, input, StageAction::Extract, |innermost, input| {
            innermost.list(input)
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::tempdir;

    /// Builds a two-stage `.tar.gz` pipeline whose outer layer is `gzip`.
    fn tar_gz(gzip: crate::backends::Gzip) -> Pipeline {
        Pipeline::new(vec![
            Box::new(crate::backends::Tar::default()),
            Box::new(gzip),
        ])
    }

    /// Round-trips a single file through compress and extract and checks
    /// that the contents survive unchanged.
    #[test]
    fn test_pipeline_compression() -> Result {
        let workspace = tempdir()?;

        let content = "This is a test file for pipeline compression";
        let source = workspace.path().join("test.txt");
        fs::write(&source, content)?;

        let pipeline = tar_gz(crate::backends::Gzip::default());

        let archive = workspace.path().join("test.tar.gz");
        pipeline.compress(
            CmprssInput::Path(vec![source.clone()]),
            CmprssOutput::Path(archive.clone()),
        )?;
        assert!(archive.exists());

        let destination = workspace.path().join("extracted");
        fs::create_dir(&destination)?;
        pipeline.extract(
            CmprssInput::Path(vec![archive.clone()]),
            CmprssOutput::Path(destination.clone()),
        )?;

        let restored = destination.join("test.txt");
        assert!(restored.exists());
        assert_eq!(fs::read_to_string(restored)?, content);

        Ok(())
    }

    /// Regression test: per-stage configuration (e.g. `--level 1` vs
    /// `--level 9` on the outer gzip of a `.tar.gz`) must survive the
    /// thread-dispatch in `Pipeline::compress`. Previously the pipeline
    /// reconstructed each stage from its *name* alone, producing a default
    /// Gzip regardless of the level the user requested.
    #[test]
    fn test_pipeline_preserves_stage_config() -> Result {
        use crate::progress::ProgressArgs;

        let workspace = tempdir()?;
        let source = workspace.path().join("input.txt");
        // Repetitive content amplifies the level difference in output size.
        fs::write(&source, "0123456789abcdef".repeat(1024))?;

        let compressed_size = |level: i32, label: &str| -> Result<u64> {
            let pipeline = tar_gz(crate::backends::Gzip {
                compression_level: level,
                progress_args: ProgressArgs::default(),
            });
            let archive = workspace.path().join(format!("out.{label}.tar.gz"));
            pipeline.compress(
                CmprssInput::Path(vec![source.clone()]),
                CmprssOutput::Path(archive.clone()),
            )?;
            Ok(fs::metadata(&archive)?.len())
        };

        let fast_size = compressed_size(1, "fast")?;
        let best_size = compressed_size(9, "best")?;
        assert!(
            best_size < fast_size,
            "expected best (level 9) to be smaller than fast (level 1), got {best_size} >= {fast_size}",
        );

        Ok(())
    }
}