use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64};
use crate::config::PostProcess;
use crate::types::Event;
use super::batching::{download_articles, fetch_download_record};
use super::context::{DownloadTaskContext, OutputFiles};
use super::finalization::finalize_download;
pub(super) struct DownloadResults {
pub(super) success_count: usize,
pub(super) failed_count: usize,
pub(super) first_error: Option<String>,
pub(super) total_articles: usize,
pub(super) individually_failed: u64,
}
pub(crate) async fn run_download_task(ctx: DownloadTaskContext) {
let id = ctx.id;
let (download, pending_articles) = match fetch_download_record(&ctx).await {
Some(pair) => pair,
None => return, };
if pending_articles.is_empty() {
match ctx.db.has_any_pending_articles(id).await {
Ok(true) => {
if let Err(e) = ctx.db.update_status(id, crate::types::Status::Paused.to_i32()).await {
tracing::error!(
download_id = id.0,
error = %e,
"Failed to keep paused-only download paused"
);
}
ctx.remove_from_active().await;
return;
}
Ok(false) => {
ctx.event_tx
.send(Event::DownloadComplete {
id,
articles_failed: None,
articles_total: None,
})
.ok();
ctx.remove_from_active().await;
ctx.spawn_post_processing();
return;
}
Err(e) => {
tracing::error!(
download_id = id.0,
error = %e,
"Failed to check for remaining paused work"
);
ctx.remove_from_active().await;
return;
}
}
}
let download_temp_dir = ctx
.config
.download
.temp_dir
.join(format!("download_{}", id.0));
if let Err(e) = tokio::fs::create_dir_all(&download_temp_dir).await {
let msg = format!("Failed to create temp directory: {}", e);
tracing::error!(download_id = id.0, error = %e, "Failed to create temp directory");
ctx.mark_failed(&msg).await;
ctx.remove_from_active().await;
return;
}
let download_files = match ctx.db.get_download_files(id).await {
Ok(files) => files,
Err(e) => {
let msg = format!("Failed to get download files: {}", e);
tracing::error!(download_id = id.0, error = %e, "Failed to get download files");
ctx.mark_failed(&msg).await;
ctx.remove_from_active().await;
return;
}
};
let output_files = if download_files.is_empty() {
Arc::new(OutputFiles {
files: HashMap::new(),
})
} else {
match OutputFiles::create(&download_files, &download_temp_dir) {
Ok(of) => Arc::new(of),
Err(e) => {
let msg = format!("Failed to create output files: {}", e);
tracing::error!(download_id = id.0, error = %e, "Failed to create output files");
ctx.mark_failed(&msg).await;
ctx.remove_from_active().await;
return;
}
}
};
let failed_articles = Arc::new(AtomicU64::new(0));
let (file_completion_tx, file_completion_rx) =
tokio::sync::mpsc::unbounded_channel::<i32>();
let file_article_counts: HashMap<i32, u32> = {
let mut counts: HashMap<i32, u32> = HashMap::new();
for article in &pending_articles {
*counts.entry(article.file_index).or_default() += 1;
}
counts
};
let file_completion_tracker = Arc::new(
super::context::FileCompletionTracker::new(file_article_counts, file_completion_tx),
);
let post_process = PostProcess::from_i32(download.post_process);
let direct_unpack_enabled = ctx.config.processing.direct_unpack.enabled
&& matches!(
post_process,
PostProcess::Unpack | PostProcess::UnpackAndCleanup
);
let download_complete = Arc::new(AtomicBool::new(false));
let direct_unpack_handle = if direct_unpack_enabled {
let coordinator = super::super::direct_unpack::DirectUnpackCoordinator::new(
id,
Arc::clone(&ctx.db),
Arc::clone(&ctx.config),
ctx.event_tx.clone(),
ctx.cancel_token.child_token(),
download_temp_dir.clone(),
Arc::clone(&failed_articles),
Arc::clone(&download_complete),
file_completion_rx,
);
Some(tokio::spawn(coordinator.run()))
} else {
None
};
let _total_articles = pending_articles.len();
let total_size_bytes = download.size_bytes as u64;
let results = download_articles(
&ctx,
pending_articles,
total_size_bytes,
&download_temp_dir,
&output_files,
&failed_articles,
&file_completion_tracker,
)
.await;
download_complete.store(true, std::sync::atomic::Ordering::Release);
if let Some(handle) = direct_unpack_handle {
match handle.await {
Ok(_result) => {
tracing::debug!(download_id = id.0, "DirectUnpack coordinator finished");
}
Err(e) => {
tracing::error!(
download_id = id.0,
error = %e,
"DirectUnpack coordinator task panicked"
);
}
}
}
finalize_download(ctx, results, total_size_bytes).await;
}