use crate::cli::DecompressArgs;
use crate::commands::utils;
use crate::error::{CliError, Result};
use crate::output::{self, DecompressionResult};
use crush_core::cancel::CancellationToken;
use crush_core::decompress_with_cancel;
use crush_core::plugin::FileMetadata;
use filetime::{set_file_mtime, FileTime};
use indicatif::{ProgressBar, ProgressStyle};
use is_terminal::IsTerminal;
use std::fs;
use std::io::{self, Cursor, Read};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::{debug, info, instrument, trace};
/// Decompresses a single block out of an in-memory `CRSH` archive.
///
/// The buffer must start with the four-byte `CRSH` magic; anything else is
/// rejected, because random access is only offered for that format. A cursor
/// positioned at the start of the buffer is handed to
/// `crush_parallel::load_index`, and `block_n` is checked against the number
/// of blocks the index reports before the block is decoded with the default
/// engine configuration.
///
/// On success the block's bytes are returned together with a default
/// `FileMetadata`.
///
/// # Errors
///
/// Returns `CliError::InvalidInput` when the magic is missing or `block_n` is
/// out of range. Errors from loading the index or decoding the block are
/// propagated.
fn decompress_single_block(
    compressed_data: &[u8],
    block_n: u64,
) -> Result<(Vec<u8>, FileMetadata)> {
    // Guard clause: only the block-indexed format supports `--block`.
    if !compressed_data.starts_with(b"CRSH") {
        return Err(CliError::InvalidInput(
            "Random access (--block) is only supported for parallel-deflate (.crsh) files"
                .to_string(),
        ));
    }

    let mut reader = Cursor::new(compressed_data);
    let index = crush_parallel::load_index(&mut reader)?;

    let block_count = index.len();
    if block_n >= block_count {
        return Err(CliError::InvalidInput(format!(
            "Block {} does not exist (file has {} blocks)",
            block_n, block_count
        )));
    }

    let engine_config = crush_parallel::EngineConfiguration::default();
    let block_bytes = crush_parallel::decompress_block(&mut reader, &index, block_n, &engine_config)?;
    debug!(
        "Decompressed block {} ({} bytes)",
        block_n,
        block_bytes.len()
    );

    Ok((block_bytes, FileMetadata::default()))
}
/// Entry point of the decompress command.
///
/// When at least one input path is given, each path is decompressed in the
/// order supplied and the first failure stops the run. When no input path is
/// given, the data is taken from stdin, which is only accepted together with
/// `--stdout`.
///
/// # Errors
///
/// Returns `CliError::InvalidInput` when there are no inputs and `--stdout`
/// is not set; otherwise propagates the first error from the per-input
/// decompression.
pub fn run(
    args: &DecompressArgs,
    interrupted: Arc<dyn CancellationToken>,
    cancel_flag: Arc<std::sync::atomic::AtomicBool>,
) -> Result<()> {
    // File mode: handle every listed path, then finish.
    if !args.input.is_empty() {
        for input_path in &args.input {
            decompress_file(
                input_path,
                args,
                Arc::clone(&interrupted),
                Arc::clone(&cancel_flag),
            )?;
        }
        return Ok(());
    }

    // Stdin mode has no file name to derive an output path from.
    if !args.stdout {
        return Err(CliError::InvalidInput(
            "No input files specified. Use --stdout with stdin, or provide file paths."
                .to_string(),
        ));
    }

    decompress_stdin(args, interrupted, cancel_flag)
}
/// Decompresses a stream supplied on stdin and writes the result to stdout.
///
/// All of stdin is read into memory before decompression starts. The first
/// four bytes are compared against the magic numbers of the registered
/// plugins only to name the plugin in the log output; the whole buffer is
/// handed to `decompress_with_cancel` whether or not a plugin matched.
/// `_args` is currently unused.
///
/// # Errors
///
/// Propagates errors from the cancellation checks, from reading stdin, from
/// decompression, and from writing to stdout.
#[instrument(skip(_args, interrupted, cancel_flag))]
fn decompress_stdin(
    _args: &DecompressArgs,
    interrupted: Arc<dyn CancellationToken>,
    cancel_flag: Arc<std::sync::atomic::AtomicBool>,
) -> Result<()> {
    info!("Decompressing from stdin");
    utils::check_cancelled(&interrupted)?;

    trace!("Reading from stdin");
    let mut payload = Vec::new();
    io::stdin().read_to_end(&mut payload)?;
    let input_size = payload.len() as u64;
    debug!("Read {} bytes from stdin", input_size);
    utils::check_cancelled(&interrupted)?;

    // Identify the plugin from the leading magic bytes (logging only).
    let detected_plugin = match payload.as_slice() {
        [b0, b1, b2, b3, ..] => {
            let magic = [*b0, *b1, *b2, *b3];
            crush_core::list_plugins()
                .iter()
                .find(|plugin| plugin.magic_number == magic)
                .map(|plugin| plugin.name.to_string())
                .unwrap_or_else(|| "unknown".to_string())
        }
        _ => "unknown".to_string(),
    };
    info!(
        plugin = %detected_plugin,
        input_size,
        "Decompressing stdin with plugin '{}' ({} bytes)",
        detected_plugin,
        input_size
    );

    let started = Instant::now();
    trace!("Starting decompression operation");
    let outcome = decompress_with_cancel(&payload, cancel_flag)?;
    let decompressed_data = outcome.data;
    let elapsed = started.elapsed();
    debug!(
        "Decompression completed in {:.3}s, output size: {} bytes",
        elapsed.as_secs_f64(),
        decompressed_data.len()
    );
    utils::check_cancelled(&interrupted)?;

    trace!("Writing decompressed data to stdout");
    utils::write_to_stdout(&decompressed_data)?;

    let output_size = decompressed_data.len() as u64;
    let throughput_mbps = utils::calculate_throughput_mbps(output_size, elapsed);
    debug!(
        input_size,
        output_size,
        throughput_mbps,
        "Stdin decompression: throughput {:.2} MB/s",
        throughput_mbps
    );
    info!(
        input_size,
        output_size,
        throughput_mbps,
        duration_secs = elapsed.as_secs_f64(),
        "Decompressed stdin: {} bytes -> {} bytes in {:.3}s at {:.2} MB/s",
        input_size,
        output_size,
        elapsed.as_secs_f64(),
        throughput_mbps
    );

    Ok(())
}
#[instrument(skip(args, interrupted, cancel_flag), fields(file = %input_path.display()))]
fn decompress_file(
input_path: &Path,
args: &DecompressArgs,
interrupted: Arc<dyn CancellationToken>,
cancel_flag: Arc<std::sync::atomic::AtomicBool>,
) -> Result<()> {
info!("Starting decompression of {}", input_path.display());
utils::check_cancelled(&interrupted)?;
utils::validate_input(input_path)?;
let output_path = if !args.stdout {
let path = determine_output_path(input_path, &args.output)?;
utils::validate_output(&path, args.force)?;
path
} else {
PathBuf::new()
};
let input_size = fs::metadata(input_path)?.len();
if !args.stdout {
crate::feedback::show_cancel_hint(crate::feedback::should_show_hint(input_size));
}
let show_progress = std::io::stderr().is_terminal() && !args.stdout;
let spinner = if show_progress && input_size > 1024 * 1024 {
let pb = ProgressBar::new_spinner();
pb.set_style(
ProgressStyle::default_spinner()
.template("{spinner:.green} Decompressing {msg}...")
.expect("Invalid spinner template"),
);
pb.set_message(input_path.display().to_string());
pb.enable_steady_tick(Duration::from_millis(100));
Some(pb)
} else {
None
};
trace!("Reading compressed file: {}", input_path.display());
let compressed_data = fs::read(input_path)?;
debug!("Read {} bytes from compressed file", compressed_data.len());
utils::check_cancelled(&interrupted)?;
let start = Instant::now();
let detected_plugin = if compressed_data.len() >= 4 {
let magic = [
compressed_data[0],
compressed_data[1],
compressed_data[2],
compressed_data[3],
];
crush_core::list_plugins()
.iter()
.find(|p| p.magic_number == magic)
.map(|p| p.name.to_string())
.unwrap_or_else(|| "unknown".to_string())
} else {
"unknown".to_string()
};
info!(
plugin = %detected_plugin,
input_size,
"Decompressing with plugin '{}' ({} bytes)",
detected_plugin,
input_size
);
if args.force_cpu && detected_plugin == "gpu-deflate" {
info!("--force-cpu active: using CPU fallback for GPU-compressed file");
}
let (decompressed_data, metadata) = if let Some(block_n) = args.block {
trace!("Starting random access decompression for block {}", block_n);
decompress_single_block(&compressed_data, block_n)?
} else {
trace!("Starting full decompression operation");
let result = decompress_with_cancel(&compressed_data, cancel_flag)?;
(result.data, result.metadata)
};
let duration = start.elapsed();
debug!(
"Decompression completed in {:.3}s, output size: {} bytes",
duration.as_secs_f64(),
decompressed_data.len()
);
if let Some(pb) = spinner {
pb.finish_and_clear();
}
utils::check_cancelled(&interrupted)?;
if args.stdout {
utils::write_to_stdout(&decompressed_data)?;
} else {
utils::write_with_cleanup(&output_path, &decompressed_data)?;
utils::check_cancelled_with_cleanup(&interrupted, &output_path)?;
if let Some(mtime_secs) = metadata.mtime {
trace!("Restoring modification time: {}", mtime_secs);
let mtime = FileTime::from_unix_time(mtime_secs, 0);
if let Err(e) = set_file_mtime(&output_path, mtime) {
debug!("Could not set modification time: {}", e);
output::format_warning(
&format!(
"Could not set modification time for {}: {}",
output_path.display(),
e
),
true,
);
} else {
debug!("Successfully restored modification time");
}
}
#[cfg(unix)]
if let Some(permissions_mode) = metadata.permissions {
trace!("Restoring Unix permissions: {:o}", permissions_mode);
use std::os::unix::fs::PermissionsExt;
let permissions = std::fs::Permissions::from_mode(permissions_mode);
if let Err(e) = std::fs::set_permissions(&output_path, permissions) {
debug!("Could not restore Unix permissions: {}", e);
output::format_warning(
&format!(
"Could not restore Unix permissions for {}: {}",
output_path.display(),
e
),
true,
);
} else {
debug!(
"Successfully restored Unix permissions: {:o}",
permissions_mode
);
}
}
let output_size = decompressed_data.len() as u64;
let throughput_mbps = utils::calculate_throughput_mbps(output_size, duration);
debug!(
input_size,
output_size,
throughput_mbps,
"Performance metrics - throughput: {:.2} MB/s",
throughput_mbps
);
info!(
input_path = %input_path.display(),
output_path = %output_path.display(),
input_size,
output_size,
throughput_mbps,
duration_secs = duration.as_secs_f64(),
crc_valid = true,
"Decompressed {} -> {} in {:.3}s at {:.2} MB/s",
input_path.display(),
output_path.display(),
duration.as_secs_f64(),
throughput_mbps
);
let decomp_result = DecompressionResult {
input_path: input_path.to_path_buf(),
output_path: output_path.clone(),
input_size,
output_size,
duration,
throughput_mbps,
crc_valid: true, };
output::format_decompression_result(&decomp_result, show_progress);
}
Ok(())
}
/// Resolves where the decompressed form of `input` should be written.
///
/// * An explicit `output_arg` that is an existing directory: the stripped
///   input file name is placed inside that directory.
/// * Any other explicit `output_arg`: used verbatim.
/// * No `output_arg`: the stripped input file name is placed next to the
///   input (joined onto the input's parent when it has one).
///
/// # Errors
///
/// Propagates the error from `strip_crush_extension` when an output file
/// name cannot be derived from `input`.
fn determine_output_path(input: &Path, output_arg: &Option<PathBuf>) -> Result<PathBuf> {
    match output_arg {
        Some(target) if target.is_dir() => {
            let name = strip_crush_extension(input)?;
            Ok(target.join(name))
        }
        Some(target) => Ok(target.clone()),
        None => {
            let name = strip_crush_extension(input)?;
            Ok(match input.parent() {
                Some(dir) => dir.join(name),
                None => name,
            })
        }
    }
}
/// Derives the output file name for `path` by removing its final extension.
///
/// Only the last path component is considered; any parent directories are
/// dropped from the result. A trailing `.crush` is removed when present
/// (`data.txt.crush` -> `data.txt`). Otherwise the last extension, whatever
/// it is, is dropped via `Path::file_stem` (`data.txt.gz` -> `data.txt`,
/// `data` -> `data`).
///
/// # Errors
///
/// Returns `CliError::InvalidInput` when:
/// * `path` has no final component or it is not valid UTF-8;
/// * the file is named exactly `.crush`, so nothing would remain after
///   stripping;
/// * no file stem can be determined.
fn strip_crush_extension(path: &Path) -> Result<PathBuf> {
    let filename = path
        .file_name()
        .and_then(|s| s.to_str())
        .ok_or_else(|| CliError::InvalidInput("Invalid filename".to_string()))?;

    let cannot_determine = || {
        CliError::InvalidInput(format!(
            "Cannot determine output filename for {}",
            path.display()
        ))
    };

    match filename.strip_suffix(".crush") {
        // An empty name would make the derived output path resolve to the
        // containing directory instead of a file, so reject it up front.
        Some("") => Err(cannot_determine()),
        Some(base_name) => Ok(PathBuf::from(base_name)),
        None => path
            .file_stem()
            .map(PathBuf::from)
            .ok_or_else(cannot_determine),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;

    /// A trailing `.crush` is removed and the inner extension is kept.
    #[test]
    fn test_strip_crush_extension_dot_crush() {
        let input = Path::new("data.txt.crush");
        let stripped = strip_crush_extension(input).expect("ok");
        assert_eq!(stripped, PathBuf::from("data.txt"));
    }

    /// A non-`.crush` extension is dropped via the file stem.
    #[test]
    fn test_strip_crush_extension_no_crush() {
        let input = Path::new("data.txt.gz");
        let stripped = strip_crush_extension(input).expect("ok");
        assert_eq!(stripped, PathBuf::from("data.txt"));
    }

    /// A name whose only extension is `.crush` reduces to its base name.
    #[test]
    fn test_strip_crush_extension_just_crush() {
        let input = Path::new("archive.crush");
        let stripped = strip_crush_extension(input).expect("ok");
        assert_eq!(stripped, PathBuf::from("archive"));
    }

    /// A name without any extension is returned unchanged.
    #[test]
    fn test_strip_crush_extension_no_extension() {
        let input = Path::new("data");
        let stripped = strip_crush_extension(input).expect("ok");
        assert_eq!(stripped, PathBuf::from("data"));
    }

    /// Without an explicit output, the stripped name is used for a bare file name.
    #[test]
    fn test_determine_output_path_default_strips_crush() {
        let input = Path::new("data.txt.crush");
        let resolved = determine_output_path(input, &None).expect("ok");
        assert_eq!(resolved, PathBuf::from("data.txt"));
    }

    /// Without an explicit output, the result stays in the input's directory.
    #[test]
    fn test_determine_output_path_with_parent() {
        let input = Path::new("/tmp/archive.crush");
        let resolved = determine_output_path(input, &None).expect("ok");
        assert_eq!(resolved, PathBuf::from("/tmp/archive"));
    }

    /// An explicit output that is not a directory is used verbatim.
    #[test]
    fn test_determine_output_path_explicit_file() {
        let explicit = Some(PathBuf::from("restored.txt"));
        let resolved =
            determine_output_path(Path::new("data.txt.crush"), &explicit).expect("ok");
        assert_eq!(resolved, PathBuf::from("restored.txt"));
    }

    /// An explicit output that is a directory receives the stripped file name.
    #[test]
    fn test_determine_output_path_to_directory() {
        let scratch = tempfile::tempdir().expect("tempdir");
        let explicit = Some(scratch.path().to_path_buf());
        let resolved =
            determine_output_path(Path::new("data.txt.crush"), &explicit).expect("ok");
        assert_eq!(resolved, scratch.path().join("data.txt"));
    }

    /// A non-`.crush` input loses its last extension in the default output path.
    #[test]
    fn test_determine_output_path_non_crush_ext() {
        let input = Path::new("data.bin");
        let resolved = determine_output_path(input, &None).expect("ok");
        assert_eq!(resolved, PathBuf::from("data"));
    }

    /// Data without the `CRSH` magic is rejected with the random-access message.
    #[test]
    fn test_decompress_single_block_non_crsh_format() {
        let payload = b"NOT_CRSH_data";
        let outcome = decompress_single_block(payload, 0);
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Random access"));
    }

    /// A buffer shorter than the four-byte magic is rejected.
    #[test]
    fn test_decompress_single_block_too_short() {
        let payload = b"CR";
        let outcome = decompress_single_block(payload, 0);
        assert!(outcome.is_err());
    }
}