use std::collections::BTreeMap;
use std::io::{BufRead, BufReader, Write as _};
use std::path::{Path, PathBuf};
use std::process::{Child, Command, Stdio};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use parking_lot::Mutex;
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn};
use nzb_postproc::find_unrar;
const UNRAR_ERROR_PATTERNS: &[&str] = &[
"CRC failed",
"checksum failed",
"Cannot create",
"Cannot open",
"password is incorrect",
"Incorrect password",
"in the encrypted file",
"not enough space on the disk",
"Write error",
"checksum error",
"start extraction from a previous volume",
"Unexpected end of archive",
];
#[derive(Debug, Clone)]
pub struct DirectUnpackResult {
pub set_name: String,
pub success: bool,
pub error: Option<String>,
}
#[derive(Debug)]
struct RarSetState {
#[allow(dead_code)]
set_name: String,
volumes: BTreeMap<u32, PathBuf>,
}
#[derive(Debug)]
struct DirectUnpackState {
sets: BTreeMap<String, RarSetState>,
download_finished: bool,
}
pub struct DirectUnpacker {
volume_ready: Arc<Notify>,
state: Arc<Mutex<DirectUnpackState>>,
killed: Arc<AtomicBool>,
task: JoinHandle<Vec<DirectUnpackResult>>,
}
impl DirectUnpacker {
pub fn new(work_dir: &Path, output_dir: &Path, password: Option<String>) -> Option<Self> {
let unrar_bin = find_unrar()?;
let state = Arc::new(Mutex::new(DirectUnpackState {
sets: BTreeMap::new(),
download_finished: false,
}));
let volume_ready = Arc::new(Notify::new());
let killed = Arc::new(AtomicBool::new(false));
let task = {
let state = Arc::clone(&state);
let volume_ready = Arc::clone(&volume_ready);
let killed = Arc::clone(&killed);
let work_dir = work_dir.to_path_buf();
let output_dir = output_dir.to_path_buf();
tokio::task::spawn_blocking(move || {
run_direct_unpack(
&unrar_bin,
&work_dir,
&output_dir,
password.as_deref(),
&state,
&volume_ready,
&killed,
)
})
};
Some(Self {
volume_ready,
state,
killed,
task,
})
}
pub fn add_volume(&self, set_name: &str, volume_number: u32, path: PathBuf) {
{
let mut state = self.state.lock();
let set = state
.sets
.entry(set_name.to_string())
.or_insert_with(|| RarSetState {
set_name: set_name.to_string(),
volumes: BTreeMap::new(),
});
set.volumes.insert(volume_number, path);
}
self.volume_ready.notify_one();
}
pub fn download_complete(&self) {
{
let mut state = self.state.lock();
state.download_finished = true;
}
self.volume_ready.notify_one();
}
pub fn abort(&self) {
self.killed.store(true, Ordering::Release);
self.volume_ready.notify_one();
}
pub async fn finish(self) -> Vec<DirectUnpackResult> {
self.download_complete();
match self.task.await {
Ok(results) => results,
Err(e) => {
error!(error = %e, "Direct unpack task panicked");
Vec::new()
}
}
}
}
fn run_direct_unpack(
unrar_bin: &str,
_work_dir: &Path,
output_dir: &Path,
password: Option<&str>,
state: &Mutex<DirectUnpackState>,
volume_ready: &Notify,
killed: &AtomicBool,
) -> Vec<DirectUnpackResult> {
let mut results: Vec<DirectUnpackResult> = Vec::new();
let rt = tokio::runtime::Handle::current();
loop {
if killed.load(Ordering::Acquire) {
return results;
}
let first_set = {
let st = state.lock();
st.sets
.iter()
.find(|(name, set)| {
set.volumes.contains_key(&0) && !results.iter().any(|r| r.set_name == **name)
})
.map(|(name, set)| (name.clone(), set.volumes[&0].clone()))
};
if let Some((set_name, first_vol_path)) = first_set {
info!(
set = %set_name,
first_volume = %first_vol_path.display(),
"Starting direct unpack"
);
let result = unpack_set(
unrar_bin,
&set_name,
&first_vol_path,
output_dir,
password,
state,
volume_ready,
killed,
&rt,
);
let success = result.success;
results.push(result);
if !success || killed.load(Ordering::Acquire) {
return results;
}
continue;
}
{
let st = state.lock();
if st.download_finished {
let has_pending = st.sets.iter().any(|(name, set)| {
set.volumes.contains_key(&0) && !results.iter().any(|r| r.set_name == **name)
});
if !has_pending {
break;
}
continue;
}
}
rt.block_on(volume_ready.notified());
}
results
}
#[allow(clippy::too_many_arguments)]
fn unpack_set(
unrar_bin: &str,
set_name: &str,
first_volume: &Path,
output_dir: &Path,
password: Option<&str>,
state: &Mutex<DirectUnpackState>,
volume_ready: &Notify,
killed: &AtomicBool,
rt: &tokio::runtime::Handle,
) -> DirectUnpackResult {
if let Err(e) = std::fs::create_dir_all(output_dir) {
return DirectUnpackResult {
set_name: set_name.to_string(),
success: false,
error: Some(format!("Failed to create output dir: {e}")),
};
}
let pw_flag = match password {
Some(pw) => format!("-p{pw}"),
None => "-p-".to_string(),
};
let mut child = match Command::new(unrar_bin)
.args(["x", "-o+", "-y", "-vp"])
.arg(&pw_flag)
.arg(first_volume)
.arg(format!("{}/", output_dir.display()))
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
{
Ok(c) => c,
Err(e) => {
return DirectUnpackResult {
set_name: set_name.to_string(),
success: false,
error: Some(format!("Failed to spawn unrar: {e}")),
};
}
};
let result = drive_unrar(&mut child, set_name, state, volume_ready, killed, rt);
let _ = child.kill();
let _ = child.wait();
result
}
fn drive_unrar(
child: &mut Child,
set_name: &str,
state: &Mutex<DirectUnpackState>,
volume_ready: &Notify,
killed: &AtomicBool,
rt: &tokio::runtime::Handle,
) -> DirectUnpackResult {
let stdin = child.stdin.take().expect("stdin was piped");
let stdout = child.stdout.take().expect("stdout was piped");
let stdin = Arc::new(Mutex::new(stdin));
let mut reader = BufReader::new(stdout);
let mut next_volume: u32 = 1; let mut extracted_files: Vec<String> = Vec::new();
let mut line_buf = Vec::with_capacity(1024);
loop {
if killed.load(Ordering::Acquire) {
return DirectUnpackResult {
set_name: set_name.to_string(),
success: false,
error: Some("Aborted".to_string()),
};
}
line_buf.clear();
match reader.read_until(b'\n', &mut line_buf) {
Ok(0) => {
break;
}
Ok(_) => {
let line = String::from_utf8_lossy(&line_buf);
let line_trimmed = line.trim();
if let Some(filename) = line_trimmed.strip_prefix("Extracting ") {
let filename = filename.trim();
if !filename.is_empty() {
extracted_files.push(filename.to_string());
}
} else if let Some(filename) = line_trimmed.strip_prefix("... ") {
let filename = filename.trim();
if !filename.is_empty() {
extracted_files.push(filename.to_string());
}
}
if line_trimmed == "All OK" {
info!(set = %set_name, "Direct unpack complete — All OK");
return DirectUnpackResult {
set_name: set_name.to_string(),
success: true,
error: None,
};
}
for pattern in UNRAR_ERROR_PATTERNS {
if line_trimmed.contains(pattern) {
error!(
set = %set_name,
error = %line_trimmed,
"Direct unpack error detected"
);
return DirectUnpackResult {
set_name: set_name.to_string(),
success: false,
error: Some(line_trimmed.to_string()),
};
}
}
if line_trimmed.contains("[C]ontinue, [Q]uit")
|| line_trimmed.contains("[C]ontinue, [Q]uit")
{
debug!(
set = %set_name,
next_volume,
"Unrar requesting next volume"
);
match wait_for_volume(set_name, next_volume, state, volume_ready, killed, rt) {
Ok(()) => {
let mut sin = stdin.lock();
if let Err(e) = sin.write_all(b"C\n") {
error!(error = %e, "Failed to write to unrar stdin");
return DirectUnpackResult {
set_name: set_name.to_string(),
success: false,
error: Some(format!("stdin write error: {e}")),
};
}
let _ = sin.flush();
next_volume += 1;
}
Err(e) => {
let mut sin = stdin.lock();
let _ = sin.write_all(b"Q\n");
let _ = sin.flush();
return DirectUnpackResult {
set_name: set_name.to_string(),
success: false,
error: Some(e),
};
}
}
}
if line_trimmed.contains("[R]etry, [A]bort") {
warn!(set = %set_name, "Unrar retry prompt — aborting");
let mut sin = stdin.lock();
let _ = sin.write_all(b"A\n");
let _ = sin.flush();
return DirectUnpackResult {
set_name: set_name.to_string(),
success: false,
error: Some("Unrar requested retry — aborted".to_string()),
};
}
}
Err(e) => {
error!(error = %e, "Error reading unrar stdout");
return DirectUnpackResult {
set_name: set_name.to_string(),
success: false,
error: Some(format!("stdout read error: {e}")),
};
}
}
}
match child.wait() {
Ok(status) if status.success() => DirectUnpackResult {
set_name: set_name.to_string(),
success: true,
error: None,
},
Ok(status) => DirectUnpackResult {
set_name: set_name.to_string(),
success: false,
error: Some(format!("unrar exited with status {status}")),
},
Err(e) => DirectUnpackResult {
set_name: set_name.to_string(),
success: false,
error: Some(format!("Failed to wait on unrar: {e}")),
},
}
}
fn wait_for_volume(
set_name: &str,
volume_number: u32,
state: &Mutex<DirectUnpackState>,
volume_ready: &Notify,
killed: &AtomicBool,
rt: &tokio::runtime::Handle,
) -> Result<(), String> {
loop {
if killed.load(Ordering::Acquire) {
return Err("Aborted".to_string());
}
{
let st = state.lock();
if let Some(set) = st.sets.get(set_name)
&& set.volumes.contains_key(&volume_number)
{
return Ok(());
}
if st.download_finished {
return Err(format!(
"Volume {volume_number} of set '{set_name}' not available after download completed"
));
}
}
let notified = volume_ready.notified();
let timeout = std::time::Duration::from_secs(30);
match rt.block_on(async { tokio::time::timeout(timeout, notified).await }) {
Ok(()) => {} Err(_) => {
debug!(
set = %set_name,
volume = volume_number,
"Timeout waiting for volume — retrying"
);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_error_patterns_are_non_empty() {
assert!(!UNRAR_ERROR_PATTERNS.is_empty());
for pattern in UNRAR_ERROR_PATTERNS {
assert!(!pattern.is_empty());
}
}
#[test]
fn test_direct_unpack_state_basics() {
let state = DirectUnpackState {
sets: BTreeMap::new(),
download_finished: false,
};
assert!(state.sets.is_empty());
assert!(!state.download_finished);
}
#[test]
fn test_rar_set_state_volume_tracking() {
let mut set = RarSetState {
set_name: "movie".to_string(),
volumes: BTreeMap::new(),
};
set.volumes
.insert(0, PathBuf::from("/tmp/movie.part001.rar"));
set.volumes
.insert(1, PathBuf::from("/tmp/movie.part002.rar"));
assert!(set.volumes.contains_key(&0));
assert!(set.volumes.contains_key(&1));
assert!(!set.volumes.contains_key(&2));
}
#[tokio::test]
async fn test_direct_unpacker_no_unrar() {
let work_dir = tempfile::tempdir().unwrap();
let output_dir = tempfile::tempdir().unwrap();
let _du = DirectUnpacker::new(work_dir.path(), output_dir.path(), None);
}
#[tokio::test]
async fn test_direct_unpacker_abort() {
let work_dir = tempfile::tempdir().unwrap();
let output_dir = tempfile::tempdir().unwrap();
if let Some(du) = DirectUnpacker::new(work_dir.path(), output_dir.path(), None) {
du.abort();
let results = du.finish().await;
for r in &results {
assert!(!r.success);
}
}
}
}