use crate::config::{Config, PostProcess};
use crate::error::{PostProcessError, Result};
use crate::parity::ParityHandler;
use crate::types::{DownloadId, Event};
use crate::utils::get_unique_path;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::{debug, info, warn};
mod cleanup;
mod repair;
mod verify;
use cleanup::run_cleanup_stage;
use repair::run_repair_stage;
use verify::run_verify_stage;
/// Orchestrates the post-download pipeline — verify, repair, extract, move,
/// and cleanup stages — emitting progress `Event`s on a broadcast channel.
pub struct PostProcessor {
    /// Broadcast channel used to publish pipeline progress events; sends are
    /// best-effort (failures are ignored when no receiver is listening).
    event_tx: broadcast::Sender<Event>,
    /// Shared application configuration (collision policy, tool settings, ...).
    config: Arc<Config>,
    /// Handler driving the verify/repair stages (parity data, e.g. par2 —
    /// exact scheme depends on the `ParityHandler` implementation).
    parity_handler: Arc<dyn ParityHandler>,
    /// Database handle; used here to look up cached archive passwords.
    db: Arc<crate::db::Database>,
}
impl PostProcessor {
pub fn new(
event_tx: broadcast::Sender<Event>,
config: Arc<Config>,
parity_handler: Arc<dyn ParityHandler>,
db: Arc<crate::db::Database>,
) -> Self {
Self {
event_tx,
config,
parity_handler,
db,
}
}
/// Runs the post-processing pipeline for a completed download and returns
/// the path containing the final files.
///
/// The modes form a strict superset chain
/// (`None ⊂ Verify ⊂ Repair ⊂ Unpack ⊂ UnpackAndCleanup`): each mode runs
/// every stage of the previous mode plus one more. Rather than duplicating
/// the stage invocations in every match arm (as before), the mode is mapped
/// to the index of its last stage and the stages run sequentially up to it.
///
/// # Errors
/// Propagates the first stage failure; later stages are not attempted.
pub async fn start_post_processing(
    &self,
    download_id: DownloadId,
    download_path: PathBuf,
    post_process: PostProcess,
    destination: PathBuf,
) -> Result<PathBuf> {
    info!(
        download_id = download_id.0,
        ?post_process,
        ?download_path,
        ?destination,
        "starting post-processing pipeline"
    );
    // Exhaustive match: adding a `PostProcess` variant remains a compile
    // error here instead of silently falling into some default stage set.
    let last_stage = match post_process {
        PostProcess::None => 0,
        PostProcess::Verify => 1,
        PostProcess::Repair => 2,
        PostProcess::Unpack => 3,
        PostProcess::UnpackAndCleanup => 4,
    };
    if last_stage == 0 {
        debug!(
            download_id = download_id.0,
            "skipping post-processing (mode: None)"
        );
        return Ok(download_path);
    }
    // Stage 1: verify (runs for every mode except None).
    run_verify_stage(
        download_id,
        &download_path,
        &self.event_tx,
        &self.parity_handler,
    )
    .await?;
    if last_stage == 1 {
        return Ok(download_path);
    }
    // Stage 2: repair damaged files using parity data.
    run_repair_stage(
        download_id,
        &download_path,
        &self.event_tx,
        &self.parity_handler,
    )
    .await?;
    if last_stage == 2 {
        return Ok(download_path);
    }
    // Stage 3: extract archives (returns the extraction directory, or the
    // original path when no archives were found).
    let extracted_path = self.run_extract_stage(download_id, &download_path).await?;
    if last_stage == 3 {
        return Ok(extracted_path);
    }
    // Stage 4: move extracted files to the destination, then clean up the
    // original download directory.
    let final_path = self
        .run_move_stage(download_id, &extracted_path, &destination)
        .await?;
    run_cleanup_stage(download_id, &download_path, &self.event_tx, &self.config).await?;
    Ok(final_path)
}
/// Re-runs only the extract and move stages for a download that was already
/// verified/repaired, skipping those earlier stages entirely.
///
/// # Errors
/// Propagates any failure from the extract or move stage.
pub async fn reextract(
    &self,
    download_id: DownloadId,
    download_path: PathBuf,
    destination: PathBuf,
) -> Result<PathBuf> {
    info!(
        download_id = download_id.0,
        ?download_path,
        ?destination,
        "starting re-extraction (skip verify/repair)"
    );
    let extracted = self.run_extract_stage(download_id, &download_path).await?;
    self.run_move_stage(download_id, &extracted, &destination)
        .await
}
/// Detects archives under `download_path` and extracts them all into an
/// `extracted/` subdirectory, reporting progress via events.
///
/// Returns the extraction directory, or `download_path` unchanged when no
/// archives are present. Per-archive extraction failures do not fail the
/// stage (see `extract_archives`).
async fn run_extract_stage(
    &self,
    download_id: DownloadId,
    download_path: &Path,
) -> Result<PathBuf> {
    debug!(
        download_id = download_id.0,
        ?download_path,
        "running extract stage"
    );
    // Announce the stage before we know which archive comes first; the
    // event channel is best-effort, so send failures are ignored.
    let _ = self.event_tx.send(Event::Extracting {
        id: download_id,
        archive: String::new(),
        percent: 0.0,
    });
    let archives = self.detect_all_archives(download_path)?;
    if archives.is_empty() {
        info!(
            download_id = download_id.0,
            ?download_path,
            "no archives found in directory, skipping extraction"
        );
        let _ = self.event_tx.send(Event::ExtractComplete { id: download_id });
        return Ok(download_path.to_path_buf());
    }
    info!(
        download_id = download_id.0,
        archive_count = archives.len(),
        "found {} archive(s) to extract",
        archives.len()
    );
    let extract_dest = download_path.join("extracted");
    tokio::fs::create_dir_all(&extract_dest).await?;
    let passwords = self.collect_extraction_passwords(download_id).await;
    self.extract_archives(download_id, &archives, &extract_dest, &passwords)
        .await;
    let _ = self.event_tx.send(Event::ExtractComplete { id: download_id });
    info!(
        download_id = download_id.0,
        ?extract_dest,
        "extraction stage complete, extracted files in: {:?}",
        extract_dest
    );
    Ok(extract_dest)
}
/// Gathers candidate passwords for archive extraction: a password previously
/// cached for this download (if any), the configured password file, and —
/// when enabled — the empty password.
async fn collect_extraction_passwords(
    &self,
    download_id: DownloadId,
) -> crate::extraction::PasswordList {
    // A database error is treated the same as "no cached password".
    let cached = self
        .db
        .get_cached_password(download_id)
        .await
        .ok()
        .flatten();
    let passwords = crate::extraction::PasswordList::collect(
        cached.as_deref(),
        None,
        None,
        self.config.tools.password_file.as_deref(),
        self.config.tools.try_empty_password,
    )
    .await;
    info!(
        download_id = download_id.0,
        password_count = passwords.len(),
        "collected {} password(s) for extraction",
        passwords.len()
    );
    passwords
}
/// Extracts each detected archive into `extract_dest`, one at a time.
///
/// Extraction is best-effort: a failed archive is logged and skipped so a
/// single bad archive does not abort the others — which is why this method
/// returns `()` rather than a `Result`.
async fn extract_archives(
    &self,
    download_id: DownloadId,
    archives: &[PathBuf],
    extract_dest: &Path,
    passwords: &crate::extraction::PasswordList,
) {
    for (i, archive_path) in archives.iter().enumerate() {
        // Display name only; a path with no/non-UTF-8 file name falls back
        // to "unknown".
        let archive_name = archive_path
            .file_name()
            .and_then(|n| n.to_str())
            .unwrap_or("unknown");
        info!(
            download_id = download_id.0,
            ?archive_path,
            progress = i + 1,
            total = archives.len(),
            "extracting archive {}/{}: {}",
            i + 1,
            archives.len(),
            archive_name
        );
        // Percent reflects archives fully completed so far, so the first
        // archive reports 0%; 100% is signalled by the ExtractComplete
        // event sent by the caller. Send is best-effort (`.ok()`).
        self.event_tx
            .send(Event::Extracting {
                id: download_id,
                archive: archive_name.to_string(),
                percent: (((i as f64) / (archives.len() as f64)) * 100.0) as f32,
            })
            .ok();
        // Final `0` is the starting recursion depth; `extract_recursive`
        // handles archives nested inside archives internally.
        match crate::extraction::extract_recursive(
            download_id,
            archive_path,
            extract_dest,
            passwords,
            &self.db,
            &self.config.processing.extraction,
            0,
        )
        .await
        {
            Ok(extracted_files) => {
                info!(
                    download_id = download_id.0,
                    ?archive_path,
                    extracted_count = extracted_files.len(),
                    "successfully extracted {} files from {}",
                    extracted_files.len(),
                    archive_name
                );
            }
            Err(e) => {
                // Deliberate: log and continue with the remaining archives.
                warn!(
                    download_id = download_id.0,
                    ?archive_path,
                    error = %e,
                    "failed to extract archive {}, continuing with others",
                    archive_name
                );
            }
        }
    }
}
/// Scans `download_path` for RAR, 7z, and ZIP archives and returns every
/// match, grouped in that order.
fn detect_all_archives(&self, download_path: &Path) -> Result<Vec<PathBuf>> {
    let mut archives = crate::extraction::RarExtractor::detect_rar_files(download_path)?;
    archives.extend(crate::extraction::SevenZipExtractor::detect_7z_files(
        download_path,
    )?);
    archives.extend(crate::extraction::ZipExtractor::detect_zip_files(
        download_path,
    )?);
    Ok(archives)
}
/// Announces the move stage via a `Moving` event, then relocates
/// `source_path` to `destination` and returns the final path.
async fn run_move_stage(
    &self,
    download_id: DownloadId,
    source_path: &Path,
    destination: &Path,
) -> Result<PathBuf> {
    debug!(
        download_id = download_id.0,
        ?source_path,
        ?destination,
        "running move stage"
    );
    // Best-effort event; ignore the error when nobody is listening.
    let _ = self.event_tx.send(Event::Moving {
        id: download_id,
        destination: destination.to_path_buf(),
    });
    self.move_files(download_id, source_path, destination).await
}
async fn move_files(
&self,
download_id: DownloadId,
source_path: &Path,
destination: &Path,
) -> Result<PathBuf> {
use tokio::fs;
debug!(
download_id = download_id.0,
?source_path,
?destination,
"moving files with collision action: {:?}",
self.config.download.file_collision
);
let source_metadata = match fs::metadata(source_path).await {
Ok(meta) => meta,
Err(_) => {
return Err(crate::error::Error::PostProcess(
PostProcessError::InvalidPath {
path: source_path.to_path_buf(),
reason: "Source path does not exist".to_string(),
},
));
}
};
if let Some(parent) = destination.parent() {
fs::create_dir_all(parent).await?;
}
if source_metadata.is_file() {
return self
.move_single_file(download_id, source_path, destination)
.await;
}
if source_metadata.is_dir() {
return self
.move_directory_contents(download_id, source_path, destination)
.await;
}
Err(crate::error::Error::PostProcess(
PostProcessError::InvalidPath {
path: source_path.to_path_buf(),
reason: "Source is neither a file nor a directory".to_string(),
},
))
}
/// Moves one file to `destination`, resolving name collisions according to
/// the configured policy via `get_unique_path`.
///
/// `fs::rename` fails when source and destination live on different
/// filesystems (`EXDEV` on Unix) — a realistic situation when the download
/// directory and the final destination are separate mounts. In that case we
/// fall back to copy-then-delete so the move still succeeds; if the copy
/// itself fails, that error is propagated.
async fn move_single_file(
    &self,
    download_id: DownloadId,
    source_file: &Path,
    destination: &Path,
) -> Result<PathBuf> {
    use tokio::fs;
    let final_destination = get_unique_path(destination, self.config.download.file_collision)?;
    debug!(
        download_id = download_id.0,
        ?source_file,
        ?final_destination,
        "moving single file"
    );
    if let Err(rename_err) = fs::rename(source_file, &final_destination).await {
        debug!(
            download_id = download_id.0,
            error = %rename_err,
            "rename failed, falling back to copy + delete (likely cross-device move)"
        );
        fs::copy(source_file, &final_destination).await?;
        fs::remove_file(source_file).await?;
    }
    info!(
        download_id = download_id.0,
        ?source_file,
        ?final_destination,
        "successfully moved file"
    );
    Ok(final_destination)
}
/// Recursively moves the contents of `source_dir` into `destination`
/// (creating it if needed) and returns `destination`.
///
/// Returns a boxed future because async recursion needs indirection: the
/// compiler cannot size a future type that contains itself.
/// Regular files go through `move_single_file` (honouring the collision
/// policy); subdirectories are recursed into and then removed once emptied.
fn move_directory_contents<'a>(
    &'a self,
    download_id: DownloadId,
    source_dir: &'a Path,
    destination: &'a Path,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<PathBuf>> + Send + 'a>> {
    Box::pin(async move {
        use tokio::fs;
        debug!(
            download_id = download_id.0,
            ?source_dir,
            ?destination,
            "moving directory contents"
        );
        fs::create_dir_all(destination).await?;
        let mut entries = fs::read_dir(source_dir).await?;
        while let Some(entry) = entries.next_entry().await? {
            let source_entry_path = entry.path();
            let entry_name = entry.file_name();
            let dest_entry_path = destination.join(&entry_name);
            let file_type = entry.file_type().await?;
            if file_type.is_file() {
                self.move_single_file(download_id, &source_entry_path, &dest_entry_path)
                    .await?;
            } else if file_type.is_dir() {
                // Recurse first, then delete the now-empty source directory.
                self.move_directory_contents(download_id, &source_entry_path, &dest_entry_path)
                    .await?;
                fs::remove_dir(&source_entry_path).await?;
            }
            // NOTE(review): entries that are neither files nor directories
            // (e.g. symlinks) are skipped silently — confirm that is intended.
        }
        info!(
            download_id = download_id.0,
            ?source_dir,
            ?destination,
            "successfully moved directory contents"
        );
        Ok(destination.to_path_buf())
    })
}
}
#[allow(clippy::unwrap_used, clippy::expect_used)]
#[cfg(test)]
mod tests;