use crate::algorithm::{select_algorithm, DEFAULT_PARALLEL_THRESHOLD_BYTES};
use crate::cli::CompressArgs;
use crate::commands::utils;
use crate::error::{CliError, Result};
use crate::output::{self, CompressionResult};
use crush_core::cancel::CancellationToken;
use crush_core::plugin::FileMetadata;
use crush_core::{compress_with_options, CompressionOptions};
use filetime::FileTime;
use indicatif::{ProgressBar, ProgressStyle};
use is_terminal::IsTerminal;
use std::fs;
use std::io::{self, Read};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::{debug, info, instrument, trace, warn};
pub fn run(
args: &CompressArgs,
interrupted: Arc<dyn CancellationToken>,
gpu_enabled: bool,
) -> Result<()> {
if args.input.is_empty() {
compress_stdin(args, interrupted, gpu_enabled)?;
} else {
for input_path in &args.input {
compress_file(input_path, args, interrupted.clone(), gpu_enabled)?;
}
}
Ok(())
}
#[instrument(skip(args, interrupted))]
fn compress_stdin(
args: &CompressArgs,
interrupted: Arc<dyn CancellationToken>,
gpu_enabled: bool,
) -> Result<()> {
info!("Compressing from stdin");
if interrupted.is_cancelled() {
return Err(CliError::Interrupted);
}
if !args.stdout && args.output.is_none() {
return Err(CliError::InvalidInput(
"When reading from stdin, either --output or --stdout must be specified".to_string(),
));
}
trace!("Reading from stdin");
let mut input_data = Vec::new();
io::stdin().read_to_end(&mut input_data)?;
let input_size = input_data.len() as u64;
debug!("Read {} bytes from stdin", input_size);
if interrupted.is_cancelled() {
return Err(CliError::Interrupted);
}
let selected_algo = select_algorithm(
None,
args.plugin.as_deref(),
DEFAULT_PARALLEL_THRESHOLD_BYTES,
gpu_enabled,
);
info!(
"Selected algorithm: {} (streaming, input size unknown)",
selected_algo
);
let mut options = CompressionOptions::default()
.with_weights(args.level.to_weights())
.with_cancel_token(Arc::clone(&interrupted));
if selected_algo != "default" {
debug!("Applying plugin selection: {}", selected_algo);
options = options.with_plugin(selected_algo);
}
if let Some(timeout_secs) = args.timeout {
debug!("Setting compression timeout: {} seconds", timeout_secs);
options = options.with_timeout(Duration::from_secs(timeout_secs));
}
let start = Instant::now();
trace!("Starting compression operation");
let compressed_data = compress_with_options(&input_data, &options)?;
let duration = start.elapsed();
debug!(
"Compression completed in {:.3}s, output size: {} bytes",
duration.as_secs_f64(),
compressed_data.len()
);
if interrupted.is_cancelled() {
return Err(CliError::Interrupted);
}
if args.stdout {
trace!("Writing compressed data to stdout");
utils::write_to_stdout(&compressed_data)?;
} else if let Some(ref output_path) = args.output {
trace!("Writing compressed data to {}", output_path.display());
utils::validate_output(output_path, args.force)?;
utils::write_with_cleanup(output_path, &compressed_data)?;
utils::check_cancelled_with_cleanup(&interrupted, output_path)?;
}
let output_size = compressed_data.len() as u64;
let compression_ratio = utils::calculate_compression_ratio(input_size, output_size);
let throughput_mbps = utils::calculate_throughput_mbps(input_size, duration);
debug!(
input_size,
output_size,
compression_ratio,
throughput_mbps,
plugin = %selected_algo,
"Stdin compression: throughput {:.2} MB/s, ratio {:.1}%",
throughput_mbps,
compression_ratio
);
info!(
output_size,
compression_ratio,
throughput_mbps,
duration_secs = duration.as_secs_f64(),
plugin = %selected_algo,
"Compressed stdin: {} bytes -> {} bytes ({:.1}% reduction) in {:.3}s at {:.2} MB/s",
input_size,
output_size,
100.0 - compression_ratio,
duration.as_secs_f64(),
throughput_mbps
);
Ok(())
}
#[instrument(skip(args, interrupted), fields(file = %input_path.display()))]
fn compress_file(
input_path: &Path,
args: &CompressArgs,
interrupted: Arc<dyn CancellationToken>,
gpu_enabled: bool,
) -> Result<()> {
info!("Starting compression of {}", input_path.display());
utils::check_cancelled(&interrupted)?;
utils::validate_input(input_path)?;
let output_path = determine_output_path(input_path, &args.output)?;
utils::validate_output(&output_path, args.force)?;
let file_metadata = fs::metadata(input_path)?;
let mtime = FileTime::from_last_modification_time(&file_metadata);
let input_size = file_metadata.len();
if !args.stdout {
crate::feedback::show_cancel_hint(crate::feedback::should_show_hint(input_size));
}
let show_progress = std::io::stderr().is_terminal() && !args.stdout;
let spinner = if show_progress && input_size > 1024 * 1024 {
let pb = ProgressBar::new(input_size);
pb.set_style(
ProgressStyle::default_bar()
.template("{spinner:.green} Compressing {msg} [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})")
.expect("Invalid progress bar template")
.progress_chars("=>-"),
);
pb.set_message(input_path.display().to_string());
pb.enable_steady_tick(Duration::from_millis(100));
Some(pb)
} else {
None
};
if args.gpu_device.is_some() {
let is_gpu_plugin = args
.plugin
.as_deref()
.is_some_and(|p| p.eq_ignore_ascii_case("gpu-deflate"));
if !is_gpu_plugin {
warn!("--gpu-device has no effect without --plugin gpu-deflate");
}
}
let selected_algo = select_algorithm(
Some(input_size),
args.plugin.as_deref(),
DEFAULT_PARALLEL_THRESHOLD_BYTES,
gpu_enabled,
);
info!(
"Selected algorithm: {} for {} byte input (threshold: {} bytes)",
selected_algo, input_size, DEFAULT_PARALLEL_THRESHOLD_BYTES
);
let file_meta = FileMetadata {
mtime: Some(mtime.unix_seconds()),
#[cfg(unix)]
permissions: {
use std::os::unix::fs::PermissionsExt;
Some(file_metadata.permissions().mode())
},
};
let mut options = CompressionOptions::default()
.with_weights(args.level.to_weights())
.with_file_metadata(file_meta)
.with_cancel_token(Arc::clone(&interrupted));
if selected_algo != "default" {
debug!("Applying plugin selection: {}", selected_algo);
options = options.with_plugin(selected_algo);
}
if let Some(timeout_secs) = args.timeout {
debug!("Setting compression timeout: {} seconds", timeout_secs);
options = options.with_timeout(Duration::from_secs(timeout_secs));
}
trace!("Reading input file: {}", input_path.display());
let input_data = fs::read(input_path)?;
debug!("Read {} bytes from input file", input_data.len());
utils::check_cancelled(&interrupted)?;
let start = Instant::now();
trace!("Starting compression operation");
let compressed_data = compress_with_options(&input_data, &options)?;
let duration = start.elapsed();
debug!(
"Compression completed in {:.3}s, output size: {} bytes",
duration.as_secs_f64(),
compressed_data.len()
);
if let Some(pb) = spinner {
pb.finish_and_clear();
}
if interrupted.is_cancelled() {
return Err(CliError::Interrupted);
}
if args.stdout {
trace!("Writing compressed data to stdout");
utils::write_to_stdout(&compressed_data)?;
} else {
if let Err(e) = fs::write(&output_path, &compressed_data) {
let _ = fs::remove_file(&output_path);
return Err(e.into());
}
if interrupted.is_cancelled() {
let _ = fs::remove_file(&output_path);
return Err(CliError::Interrupted);
}
}
let output_size = compressed_data.len() as u64;
let compression_ratio = utils::calculate_compression_ratio(input_size, output_size);
let throughput_mbps = utils::calculate_throughput_mbps(input_size, duration);
let plugin_used = selected_algo.to_string();
debug!(
input_size,
output_size,
compression_ratio,
throughput_mbps,
plugin = %plugin_used,
"Performance metrics - throughput: {:.2} MB/s, compression ratio: {:.1}%, plugin: {}",
throughput_mbps,
compression_ratio,
plugin_used
);
let size_reduction = 100.0 - compression_ratio;
info!(
input_path = %input_path.display(),
output_path = %output_path.display(),
input_size,
output_size,
compression_ratio,
throughput_mbps,
duration_secs = duration.as_secs_f64(),
plugin = %plugin_used,
"Compressed {} -> {} ({:.1}% {}) in {:.3}s at {:.2} MB/s",
input_path.display(),
output_path.display(),
size_reduction.abs(),
if size_reduction > 0.0 { "smaller" } else { "larger" },
duration.as_secs_f64(),
throughput_mbps
);
if !args.stdout {
let result = CompressionResult {
input_path: input_path.to_path_buf(),
output_path: output_path.clone(),
input_size,
output_size,
compression_ratio,
duration,
throughput_mbps,
plugin_used,
};
output::format_compression_result(&result, show_progress);
}
Ok(())
}
fn determine_output_path(input: &Path, output_arg: &Option<PathBuf>) -> Result<PathBuf> {
if let Some(output) = output_arg {
if output.is_dir() {
let filename = input
.file_name()
.ok_or_else(|| CliError::InvalidInput("Invalid input filename".to_string()))?;
Ok(output.join(filename).with_extension("crush"))
} else {
Ok(output.clone())
}
} else {
let mut output = input.to_path_buf();
let current_ext = output.extension().and_then(|s| s.to_str()).unwrap_or("");
let new_ext = if current_ext.is_empty() {
"crush".to_string()
} else {
format!("{}.crush", current_ext)
};
output.set_extension(new_ext);
Ok(output)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::Path;
#[test]
fn test_determine_output_path_default_adds_crush_ext() {
let result = determine_output_path(Path::new("data.txt"), &None).expect("ok");
assert_eq!(result, PathBuf::from("data.txt.crush"));
}
#[test]
fn test_determine_output_path_no_extension() {
let result = determine_output_path(Path::new("data"), &None).expect("ok");
assert_eq!(result, PathBuf::from("data.crush"));
}
#[test]
fn test_determine_output_path_explicit_file() {
let output = Some(PathBuf::from("out.bin"));
let result = determine_output_path(Path::new("data.txt"), &output).expect("ok");
assert_eq!(result, PathBuf::from("out.bin"));
}
#[test]
fn test_determine_output_path_to_directory() {
let dir = tempfile::tempdir().expect("tempdir");
let output = Some(dir.path().to_path_buf());
let result = determine_output_path(Path::new("data.txt"), &output).expect("ok");
assert_eq!(result, dir.path().join("data.crush"));
}
#[test]
fn test_determine_output_path_with_parent_dir() {
let result = determine_output_path(Path::new("/tmp/data.log"), &None).expect("ok");
assert_eq!(result, PathBuf::from("/tmp/data.log.crush"));
}
#[test]
fn test_determine_output_path_double_extension() {
let result = determine_output_path(Path::new("archive.tar.gz"), &None).expect("ok");
assert_eq!(result, PathBuf::from("archive.tar.gz.crush"));
}
}