use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::time::{Duration, Instant};

use parking_lot::Mutex;
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};

use nzb_decode::FileAssembler;
use nzb_decode::yenc::decode_yenc;

use crate::bandwidth::BandwidthLimiter;
use crate::nzb_core::config::ServerConfig;
use crate::nzb_core::models::NzbJob;
use crate::nzb_core::nzb_nntp::Pipeline;
use crate::nzb_core::nzb_nntp::connection::NntpConnection;
use crate::nzb_core::nzb_nntp::error::NntpError;
const MAX_TRIES_PER_SERVER: u32 = 3;
const RECONNECT_DELAY: Duration = Duration::from_secs(5);
const MAX_RECONNECT_ATTEMPTS: u32 = 5;
const WORKER_RAMP_DELAY: Duration = Duration::from_millis(15);
const CIRCUIT_BREAK_THRESHOLD: u32 = 3;
const AUTH_FAILURE_COOLDOWN: Duration = Duration::from_secs(120);
const TRANSIENT_FAILURE_COOLDOWN: Duration = Duration::from_secs(30);
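/// Per-server circuit-breaker state shared across all workers.
///
/// A server counts as unavailable while `disabled_until` lies in the future.
/// An auth failure opens the circuit immediately for AUTH_FAILURE_COOLDOWN;
/// transient failures open it for TRANSIENT_FAILURE_COOLDOWN only once
/// CIRCUIT_BREAK_THRESHOLD consecutive failures have accumulated.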
#[derive(Debug)]
pub struct ServerHealth {
pub consecutive_failures: u32,
pub disabled_until: Option<Instant>,
pub reason: Option<String>,
pub is_auth_failure: bool,
}
impl Default for ServerHealth {
fn default() -> Self {
Self::new()
}
}
impl ServerHealth {
pub fn new() -> Self {
Self {
consecutive_failures: 0,
disabled_until: None,
reason: None,
is_auth_failure: false,
}
}
pub fn is_available(&self) -> bool {
match self.disabled_until {
None => true,
Some(until) => Instant::now() >= until,
}
}
pub fn record_failure(&mut self, is_auth: bool, reason: &str) {
self.consecutive_failures += 1;
self.is_auth_failure = is_auth;
self.reason = Some(reason.to_string());
if is_auth || self.consecutive_failures >= CIRCUIT_BREAK_THRESHOLD {
let cooldown = if is_auth {
AUTH_FAILURE_COOLDOWN
} else {
TRANSIENT_FAILURE_COOLDOWN
};
self.disabled_until = Some(Instant::now() + cooldown);
}
}
pub fn record_success(&mut self) {
*self = Self::new();
}
}
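/// Shared map of server id to `ServerHealth`, consulted by every worker
/// before connecting and updated on each connect attempt.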
pub type ServerHealthMap = Arc<Mutex<HashMap<String, ServerHealth>>>;
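/// Progress events streamed to the caller. A run ends by sending
/// `JobFinished`, or `NoServersAvailable` when no enabled server is usable.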
#[derive(Debug, Clone)]
pub enum ProgressUpdate {
ArticleComplete {
job_id: String,
file_id: String,
segment_number: u32,
decoded_bytes: u64,
file_complete: bool,
server_id: Option<String>,
},
ArticleFailed {
job_id: String,
file_id: String,
segment_number: u32,
error: String,
server_id: Option<String>,
},
JobFinished {
job_id: String,
success: bool,
articles_failed: usize,
},
NoServersAvailable { job_id: String, reason: String },
}
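/// One article to download: identifies the target segment and records which
/// servers have already been tried, so failed articles rotate across servers.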
#[derive(Debug, Clone)]
struct WorkItem {
job_id: String,
file_id: String,
filename: String,
message_id: String,
segment_number: u32,
tried_servers: Vec<String>,
tries_on_current: u32,
}
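/// Orchestrates one NZB job: builds a shared queue of missing articles,
/// spawns a worker task per configured server connection, and streams
/// `ProgressUpdate`s. Pause and cancellation are cooperative, signalled
/// through atomics shared with every worker.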
pub struct DownloadEngine {
cancelled: Arc<AtomicBool>,
paused: Arc<AtomicBool>,
pub total_decode_us: Arc<AtomicU64>,
pub total_assemble_us: Arc<AtomicU64>,
pub total_articles_decoded: Arc<AtomicU64>,
}
impl Default for DownloadEngine {
fn default() -> Self {
Self::new()
}
}
impl DownloadEngine {
pub fn new() -> Self {
Self {
cancelled: Arc::new(AtomicBool::new(false)),
paused: Arc::new(AtomicBool::new(false)),
total_decode_us: Arc::new(AtomicU64::new(0)),
total_assemble_us: Arc::new(AtomicU64::new(0)),
total_articles_decoded: Arc::new(AtomicU64::new(0)),
}
}
pub fn cancel(&self) {
self.cancelled.store(true, Ordering::Relaxed);
}
pub fn pause(&self) {
self.paused.store(true, Ordering::Relaxed);
}
pub fn resume(&self) {
self.paused.store(false, Ordering::Relaxed);
}
pub fn is_paused(&self) -> bool {
self.paused.load(Ordering::Relaxed)
}
pub fn is_cancelled(&self) -> bool {
self.cancelled.load(Ordering::Relaxed)
}
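    /// Downloads every missing article of `job` and awaits all workers.
    /// Ends by sending `JobFinished`, or `NoServersAvailable` when no
    /// enabled server is usable.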
pub async fn run(
&self,
job: &NzbJob,
servers: Arc<Mutex<Vec<ServerConfig>>>,
server_health: ServerHealthMap,
progress_tx: mpsc::UnboundedSender<ProgressUpdate>,
bandwidth: Arc<BandwidthLimiter>,
) {
let job_id = job.id.clone();
let engine_start = Instant::now();
let mut work_items: Vec<WorkItem> = job
.files
.iter()
.flat_map(|file| {
file.articles
.iter()
.enumerate()
.filter(|(_, a)| !a.downloaded)
.map(move |(idx, article)| WorkItem {
job_id: job.id.clone(),
file_id: file.id.clone(),
filename: file.filename.clone(),
message_id: article.message_id.clone(),
segment_number: (idx as u32) + 1,
tried_servers: Vec::new(),
tries_on_current: 0,
})
})
.collect();
work_items.sort_by_key(|item| par2_sort_key(&item.filename));
if work_items.is_empty() {
let _ = progress_tx.send(ProgressUpdate::JobFinished {
job_id,
success: true,
articles_failed: 0,
});
return;
}
info!(
job_id = %job_id,
articles = work_items.len(),
"Starting download engine"
);
let assembler = Arc::new(FileAssembler::new());
for file in &job.files {
let output_path = job.work_dir.join(&file.filename);
if let Err(e) =
assembler.register_file(&job.id, &file.id, output_path, file.articles.len() as u32)
{
error!(file = %file.filename, "Failed to register file for assembly: {e}");
let _ = progress_tx.send(ProgressUpdate::JobFinished {
job_id,
success: false,
articles_failed: work_items.len(),
});
return;
}
}
let work_queue = Arc::new(Mutex::new(VecDeque::from(work_items)));
let articles_failed = Arc::new(AtomicUsize::new(0));
self.total_decode_us.store(0, Ordering::Relaxed);
self.total_assemble_us.store(0, Ordering::Relaxed);
self.total_articles_decoded.store(0, Ordering::Relaxed);
let nzb_filenames: HashMap<String, String> = job
.files
.iter()
.map(|f| (f.id.clone(), f.filename.clone()))
.collect();
let yenc_names: Arc<Mutex<HashMap<String, String>>> = Arc::new(Mutex::new(HashMap::new()));
let sorted_servers: Vec<ServerConfig> = {
let srv = servers.lock();
let mut v: Vec<ServerConfig> = srv.iter().filter(|s| s.enabled).cloned().collect();
v.sort_by_key(|s| s.priority);
v
};
info!(
job_id = %job_id,
total_servers = servers.lock().len(),
enabled_servers = sorted_servers.len(),
server_names = %sorted_servers.iter().map(|s| s.name.as_str()).collect::<Vec<_>>().join(", "),
"Download engine server list"
);
if sorted_servers.is_empty() {
error!(job_id = %job_id, "No enabled servers configured");
let _ = progress_tx.send(ProgressUpdate::NoServersAvailable {
job_id,
reason: "No enabled servers configured".into(),
});
return;
}
{
let mut any_ok = false;
for server in &sorted_servers {
info!(
job_id = %job_id,
server = %server.name,
host = %server.host,
port = server.port,
ssl = server.ssl,
"Pre-flight: testing server connectivity"
);
let mut conn = NntpConnection::new(format!("preflight-{}", server.id));
match tokio::time::timeout(Duration::from_secs(5), conn.connect(server)).await {
Ok(Ok(())) => {
info!(
job_id = %job_id,
server = %server.name,
"Pre-flight: server OK"
);
let _ = conn.quit().await;
any_ok = true;
break;
}
Ok(Err(e)) => {
warn!(
job_id = %job_id,
server = %server.name,
error = %e,
"Pre-flight: server connection failed"
);
}
Err(_) => {
warn!(
job_id = %job_id,
server = %server.name,
"Pre-flight: server connection timed out (5s)"
);
}
}
}
if !any_ok {
warn!(
job_id = %job_id,
servers_tested = sorted_servers.len(),
"Pre-flight failed — proceeding anyway (workers will retry connections)"
);
}
}
let mut worker_handles = Vec::new();
for server in &sorted_servers {
let num_conns = server.connections.min(500) as usize;
for conn_idx in 0..num_conns {
let handle = tokio::spawn({
let server_config = server.clone();
let work_queue = Arc::clone(&work_queue);
let assembler = Arc::clone(&assembler);
let progress_tx = progress_tx.clone();
let cancelled = Arc::clone(&self.cancelled);
let paused = Arc::clone(&self.paused);
let articles_failed = Arc::clone(&articles_failed);
let all_servers = Arc::clone(&servers);
let server_health = Arc::clone(&server_health);
                    let yenc_names = Arc::clone(&yenc_names);
let total_decode_us = Arc::clone(&self.total_decode_us);
let total_assemble_us = Arc::clone(&self.total_assemble_us);
let total_articles_decoded = Arc::clone(&self.total_articles_decoded);
let bandwidth = Arc::clone(&bandwidth);
async move {
download_worker(
server_config,
conn_idx,
work_queue,
assembler,
progress_tx,
cancelled,
paused,
articles_failed,
all_servers,
server_health,
yenc_names,
total_decode_us,
total_assemble_us,
total_articles_decoded,
bandwidth,
)
.await;
}
});
worker_handles.push(handle);
}
}
for handle in worker_handles {
let _ = handle.await;
}
let download_elapsed = engine_start.elapsed();
let total_bytes = job.total_bytes;
let throughput_mbps = if download_elapsed.as_secs_f64() > 0.001 {
(total_bytes as f64 / download_elapsed.as_secs_f64()) / (1024.0 * 1024.0)
} else {
0.0
};
let decode_total_us = self.total_decode_us.load(Ordering::Relaxed);
let assemble_total_us = self.total_assemble_us.load(Ordering::Relaxed);
let articles_decoded = self.total_articles_decoded.load(Ordering::Relaxed);
info!(
job_id = %job_id,
elapsed_secs = download_elapsed.as_secs_f64(),
total_bytes,
throughput_mbps = format!("{throughput_mbps:.2}"),
"Download phase complete"
);
info!(
job_id = %job_id,
articles_decoded,
decode_secs = format!("{:.3}", decode_total_us as f64 / 1_000_000.0),
assemble_secs = format!("{:.3}", assemble_total_us as f64 / 1_000_000.0),
decode_pct = format!("{:.1}", decode_total_us as f64 / download_elapsed.as_micros() as f64 * 100.0),
assemble_pct = format!("{:.1}", assemble_total_us as f64 / download_elapsed.as_micros() as f64 * 100.0),
"Decode timing summary (cumulative across all workers)"
);
{
let remaining_count = work_queue.lock().len();
if remaining_count > 0 {
let all_broken = {
let health = server_health.lock();
let srv = servers.lock();
srv.iter()
.filter(|s| s.enabled)
.all(|s| health.get(&s.id).is_some_and(|h| !h.is_available()))
};
if all_broken {
let reason = {
let health = server_health.lock();
health
.values()
.filter_map(|h| h.reason.as_deref())
.next()
.unwrap_or("All servers unavailable")
.to_string()
};
warn!(
job_id = %job_id,
remaining_articles = remaining_count,
"All servers circuit-broken — pausing job for user intervention"
);
let _ = progress_tx.send(ProgressUpdate::NoServersAvailable { job_id, reason });
return;
}
}
}
let remaining: Vec<WorkItem> = work_queue.lock().drain(..).collect();
for item in remaining {
articles_failed.fetch_add(1, Ordering::Relaxed);
warn!(
article = %item.message_id,
"Article could not be downloaded — no available server"
);
let _ = progress_tx.send(ProgressUpdate::ArticleFailed {
job_id: item.job_id,
file_id: item.file_id,
segment_number: item.segment_number,
error: "No available server could download this article".into(),
server_id: None,
});
}
{
let renames = yenc_names.lock();
for (file_id, yenc_name) in renames.iter() {
if let Some(nzb_name) = nzb_filenames.get(file_id) {
if nzb_name == yenc_name {
continue;
}
let clean_yenc = std::path::Path::new(yenc_name.as_str())
.file_name()
.and_then(|n| n.to_str())
.unwrap_or(yenc_name);
if clean_yenc.is_empty() || nzb_name == clean_yenc {
continue;
}
let nzb_has_ext = has_known_extension(nzb_name);
let yenc_has_ext = has_known_extension(clean_yenc);
let (old_name, new_name) = if yenc_has_ext && !nzb_has_ext {
(nzb_name.as_str(), clean_yenc)
} else if nzb_has_ext && !yenc_has_ext {
continue;
} else if yenc_has_ext && nzb_has_ext {
(nzb_name.as_str(), clean_yenc)
} else {
continue;
};
let old_path = job.work_dir.join(old_name);
let new_path = job.work_dir.join(new_name);
if old_path.exists() && !new_path.exists() {
if let Err(e) = std::fs::rename(&old_path, &new_path) {
warn!(
job_id = %job_id,
from = %old_name,
to = %new_name,
"Failed to deobfuscate file: {e}"
);
} else {
info!(
job_id = %job_id,
from = %old_name,
to = %new_name,
"Deobfuscated file"
);
}
}
}
}
}
let failed_count = articles_failed.load(Ordering::Relaxed);
let _ = progress_tx.send(ProgressUpdate::JobFinished {
job_id,
success: failed_count == 0,
articles_failed: failed_count,
});
}
}
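/// One worker per (server, connection index) pair. Staggers its initial
/// connect by WORKER_RAMP_DELAY per index, respects the circuit breaker, and
/// then runs the serial or pipelined loop depending on the server's
/// configured pipelining depth.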
#[allow(clippy::too_many_arguments)]
async fn download_worker(
primary_server: ServerConfig,
conn_idx: usize,
work_queue: Arc<Mutex<VecDeque<WorkItem>>>,
assembler: Arc<FileAssembler>,
progress_tx: mpsc::UnboundedSender<ProgressUpdate>,
cancelled: Arc<AtomicBool>,
paused: Arc<AtomicBool>,
articles_failed: Arc<AtomicUsize>,
all_servers: Arc<Mutex<Vec<ServerConfig>>>,
server_health: ServerHealthMap,
yenc_names: Arc<Mutex<HashMap<String, String>>>,
total_decode_us: Arc<AtomicU64>,
total_assemble_us: Arc<AtomicU64>,
total_articles_decoded: Arc<AtomicU64>,
bandwidth: Arc<BandwidthLimiter>,
) {
let worker_id = format!("{}#{}", primary_server.id, conn_idx);
if conn_idx > 0 {
let stagger = WORKER_RAMP_DELAY * conn_idx as u32;
debug!(
worker = %worker_id,
server = %primary_server.name,
delay_ms = stagger.as_millis(),
"Worker stagger delay before connect"
);
tokio::time::sleep(stagger).await;
}
{
let health = server_health.lock();
if let Some(h) = health.get(&primary_server.id)
&& !h.is_available()
{
info!(
worker = %worker_id,
server = %primary_server.name,
reason = h.reason.as_deref().unwrap_or("unknown"),
"Server circuit-broken — worker exiting without connecting"
);
return;
}
}
info!(
worker = %worker_id,
server = %primary_server.name,
host = %primary_server.host,
port = primary_server.port,
ssl = primary_server.ssl,
conn_idx,
"Worker starting — connecting to primary server"
);
let mut conn = NntpConnection::new(worker_id.clone());
if let Err(e) = connect_with_retry(
&mut conn,
&primary_server,
&worker_id,
&server_health,
&all_servers,
)
.await
{
warn!(
worker = %worker_id,
server = %primary_server.name,
host = %primary_server.host,
"Worker FAILED to connect after all retries: {e} — worker exiting"
);
return;
}
let pipe_depth = primary_server.pipelining.max(1);
info!(
worker = %worker_id,
server = %primary_server.name,
host = %primary_server.host,
pipelining = pipe_depth,
"Worker connected and ready"
);
if pipe_depth <= 1 {
download_worker_serial(
&mut conn,
&primary_server,
&worker_id,
&work_queue,
&assembler,
&progress_tx,
&cancelled,
&paused,
&articles_failed,
&all_servers,
&server_health,
            &yenc_names,
&total_decode_us,
&total_assemble_us,
&total_articles_decoded,
&bandwidth,
)
.await;
} else {
download_worker_pipelined(
&mut conn,
&primary_server,
&worker_id,
pipe_depth,
&work_queue,
&assembler,
&progress_tx,
&cancelled,
&paused,
&articles_failed,
&all_servers,
&server_health,
            &yenc_names,
&total_decode_us,
&total_assemble_us,
&total_articles_decoded,
&bandwidth,
)
.await;
}
let _ = conn.quit().await;
}
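/// Pipelined download loop: keeps up to `pipe_depth` article requests in
/// flight on one connection. On any send or receive failure all in-flight
/// items are re-queued before reconnecting, and the worker gives up after
/// MAX_RECONNECT_ATTEMPTS consecutive errors.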
#[allow(clippy::too_many_arguments)]
async fn download_worker_pipelined(
conn: &mut NntpConnection,
primary_server: &ServerConfig,
worker_id: &str,
pipe_depth: u8,
work_queue: &Arc<Mutex<VecDeque<WorkItem>>>,
assembler: &Arc<FileAssembler>,
progress_tx: &mpsc::UnboundedSender<ProgressUpdate>,
cancelled: &Arc<AtomicBool>,
paused: &Arc<AtomicBool>,
articles_failed: &Arc<AtomicUsize>,
all_servers: &Arc<Mutex<Vec<ServerConfig>>>,
server_health: &ServerHealthMap,
yenc_names: &Arc<Mutex<HashMap<String, String>>>,
total_decode_us: &Arc<AtomicU64>,
total_assemble_us: &Arc<AtomicU64>,
total_articles_decoded: &Arc<AtomicU64>,
bandwidth: &Arc<BandwidthLimiter>,
) {
let mut pipeline = Pipeline::new(pipe_depth);
let mut in_flight_items: HashMap<u64, WorkItem> = HashMap::new();
let mut next_tag: u64 = 0;
let mut consecutive_errors: u32 = 0;
let mut perf_articles: u64 = 0;
let mut perf_bytes: u64 = 0;
let mut perf_queue_lock_us: u64 = 0;
let mut perf_receive_us: u64 = 0;
let mut perf_decode_us: u64 = 0;
let mut perf_assemble_us: u64 = 0;
let mut perf_bandwidth_us: u64 = 0;
let mut perf_yield_us: u64 = 0;
let mut perf_flush_us: u64 = 0;
let mut perf_last_log = Instant::now();
const PERF_LOG_INTERVAL: Duration = Duration::from_secs(10);
loop {
if cancelled.load(Ordering::Relaxed) {
break;
}
while paused.load(Ordering::Relaxed) && !cancelled.load(Ordering::Relaxed) {
tokio::time::sleep(Duration::from_millis(250)).await;
}
if cancelled.load(Ordering::Relaxed) {
break;
}
let server_disabled = all_servers
.lock()
.iter()
.find(|s| s.id == primary_server.id)
.is_none_or(|s| !s.enabled);
if server_disabled {
info!(worker = %worker_id, server = %primary_server.name, "Server disabled, worker exiting");
requeue_all(&mut in_flight_items, work_queue);
break;
}
{
let health = server_health.lock();
if let Some(h) = health.get(&primary_server.id)
&& !h.is_available()
{
info!(
worker = %worker_id,
server = %primary_server.name,
reason = h.reason.as_deref().unwrap_or("unknown"),
"Server circuit-broken, worker exiting"
);
requeue_all(&mut in_flight_items, work_queue);
break;
}
}
let mut consecutive_skips: usize = 0;
while pipeline.pending_count() + pipeline.in_flight_count() < pipe_depth as usize {
let lock_t = Instant::now();
let item = { work_queue.lock().pop_front() };
perf_queue_lock_us += lock_t.elapsed().as_micros() as u64;
let Some(item) = item else {
break;
};
if item.tried_servers.contains(&primary_server.id) {
let queue_len = {
let mut q = work_queue.lock();
q.push_back(item);
q.len()
};
consecutive_skips += 1;
if consecutive_skips > queue_len {
break;
}
continue;
}
consecutive_skips = 0;
let tag = next_tag;
next_tag += 1;
pipeline.submit(item.message_id.clone(), tag);
in_flight_items.insert(tag, item);
}
if pipeline.is_empty() && in_flight_items.is_empty() {
debug!(worker = %worker_id, "Pipeline empty, work queue exhausted, exiting");
break;
}
let flush_t = Instant::now();
if let Err(e) = pipeline.flush_sends(conn).await {
warn!(
worker = %worker_id,
server = %primary_server.name,
host = %primary_server.host,
error = %e,
in_flight = in_flight_items.len(),
"Pipeline send error — re-queuing all in-flight items"
);
requeue_all(&mut in_flight_items, work_queue);
consecutive_errors += 1;
if consecutive_errors > MAX_RECONNECT_ATTEMPTS {
warn!(
worker = %worker_id,
server = %primary_server.name,
consecutive_errors,
"Too many pipeline errors — worker exiting permanently"
);
break;
}
info!(
worker = %worker_id,
server = %primary_server.name,
delay_secs = RECONNECT_DELAY.as_secs(),
consecutive_errors,
"Waiting before pipeline reconnect"
);
tokio::time::sleep(RECONNECT_DELAY).await;
*conn = NntpConnection::new(worker_id.to_string());
if let Err(e) =
connect_with_retry(conn, primary_server, worker_id, server_health, all_servers)
.await
{
warn!(worker = %worker_id, server = %primary_server.name, "Pipeline reconnect FAILED: {e} — worker exiting");
break;
}
info!(worker = %worker_id, server = %primary_server.name, "Pipeline reconnected successfully");
pipeline = Pipeline::new(pipe_depth);
continue;
}
perf_flush_us += flush_t.elapsed().as_micros() as u64;
let recv_t = Instant::now();
let result = pipeline.receive_one(conn).await;
perf_receive_us += recv_t.elapsed().as_micros() as u64;
match result {
Ok(Some(pipe_result)) => {
let Some(mut item) = in_flight_items.remove(&pipe_result.request.tag) else {
continue;
};
match pipe_result.result {
Ok(response) => {
consecutive_errors = 0;
let raw_data = response.data.unwrap_or_default();
let yield_t = Instant::now();
tokio::task::yield_now().await;
perf_yield_us += yield_t.elapsed().as_micros() as u64;
match decode_and_assemble(&item, &raw_data, assembler) {
Ok(process_result) => {
perf_decode_us += process_result.decode_us;
perf_assemble_us += process_result.assemble_us;
perf_bytes += process_result.decoded_bytes;
perf_articles += 1;
total_decode_us
.fetch_add(process_result.decode_us, Ordering::Relaxed);
total_assemble_us
.fetch_add(process_result.assemble_us, Ordering::Relaxed);
total_articles_decoded.fetch_add(1, Ordering::Relaxed);
if let Some(ref yname) = process_result.yenc_filename {
yenc_names
.lock()
.entry(item.file_id.clone())
.or_insert_with(|| crate::util::normalize_nfc(yname));
}
let bw_t = Instant::now();
if let Some(n) =
std::num::NonZeroU32::new(process_result.decoded_bytes as u32)
{
let _ = bandwidth.acquire_download(n).await;
}
perf_bandwidth_us += bw_t.elapsed().as_micros() as u64;
let _ = progress_tx.send(ProgressUpdate::ArticleComplete {
job_id: item.job_id.clone(),
file_id: item.file_id.clone(),
segment_number: item.segment_number,
decoded_bytes: process_result.decoded_bytes,
file_complete: process_result.file_complete,
server_id: Some(primary_server.id.clone()),
});
if perf_last_log.elapsed() >= PERF_LOG_INTERVAL {
let elapsed = perf_last_log.elapsed().as_secs_f64();
let mbps = perf_bytes as f64 / elapsed / (1024.0 * 1024.0);
info!(
worker = %worker_id,
articles = perf_articles,
throughput_mbps = format!("{mbps:.1}"),
recv_ms = perf_receive_us / 1000,
decode_ms = perf_decode_us / 1000,
assemble_ms = perf_assemble_us / 1000,
queue_lock_ms = perf_queue_lock_us / 1000,
flush_ms = perf_flush_us / 1000,
yield_ms = perf_yield_us / 1000,
bw_wait_ms = perf_bandwidth_us / 1000,
"Worker perf summary"
);
perf_articles = 0;
perf_bytes = 0;
perf_queue_lock_us = 0;
perf_receive_us = 0;
perf_decode_us = 0;
perf_assemble_us = 0;
perf_bandwidth_us = 0;
perf_yield_us = 0;
perf_flush_us = 0;
perf_last_log = Instant::now();
}
}
Err(ArticleError::DecodeError(msg)) => {
handle_article_not_available(
&mut item,
primary_server,
all_servers,
server_health,
articles_failed,
work_queue,
progress_tx,
&format!("Decode error: {msg}"),
);
}
Err(ArticleError::AssemblyError(msg)) => {
articles_failed.fetch_add(1, Ordering::Relaxed);
error!(article = %item.message_id, "Assembly error: {msg}");
let _ = progress_tx.send(ProgressUpdate::ArticleFailed {
job_id: item.job_id.clone(),
file_id: item.file_id.clone(),
segment_number: item.segment_number,
error: format!("Assembly error: {msg}"),
server_id: Some(primary_server.id.clone()),
});
}
                            // Unreachable in practice: decode_and_assemble only
                            // returns DecodeError or AssemblyError.
                            Err(_) => {}
}
}
Err(NntpError::ArticleNotFound(_)) => {
handle_article_not_available(
&mut item,
primary_server,
all_servers,
server_health,
articles_failed,
work_queue,
progress_tx,
"Article not found on any server",
);
}
Err(NntpError::Connection(_) | NntpError::Io(_)) => {
warn!(
worker = %worker_id,
server = %primary_server.name,
host = %primary_server.host,
article = %item.message_id,
in_flight = in_flight_items.len(),
consecutive_errors,
"Pipeline: connection lost during receive — re-queuing all"
);
work_queue.lock().push_front(item);
requeue_all(&mut in_flight_items, work_queue);
consecutive_errors += 1;
if consecutive_errors > MAX_RECONNECT_ATTEMPTS {
warn!(
worker = %worker_id,
server = %primary_server.name,
consecutive_errors,
"Too many pipeline connection losses — worker exiting"
);
break;
}
info!(
worker = %worker_id,
delay_secs = RECONNECT_DELAY.as_secs(),
"Waiting before pipeline reconnect"
);
tokio::time::sleep(RECONNECT_DELAY).await;
*conn = NntpConnection::new(worker_id.to_string());
if let Err(e) = connect_with_retry(
conn,
primary_server,
worker_id,
server_health,
all_servers,
)
.await
{
warn!(worker = %worker_id, server = %primary_server.name, "Pipeline reconnect FAILED: {e}");
break;
}
info!(worker = %worker_id, server = %primary_server.name, "Pipeline reconnected after connection loss");
pipeline = Pipeline::new(pipe_depth);
continue;
}
Err(e) => {
warn!(worker = %worker_id, article = %item.message_id, "Pipeline error: {e}");
handle_article_not_available(
&mut item,
primary_server,
all_servers,
server_health,
articles_failed,
work_queue,
progress_tx,
&format!("Pipeline error: {e}"),
);
}
}
}
            // No result returned; continue the loop to refill and flush the pipeline.
            Ok(None) => {}
Err(e) => {
warn!(
worker = %worker_id,
server = %primary_server.name,
host = %primary_server.host,
error = %e,
in_flight = in_flight_items.len(),
consecutive_errors,
"Pipeline receive error"
);
requeue_all(&mut in_flight_items, work_queue);
consecutive_errors += 1;
if consecutive_errors > MAX_RECONNECT_ATTEMPTS {
warn!(
worker = %worker_id,
server = %primary_server.name,
consecutive_errors,
"Too many pipeline receive errors — worker exiting"
);
break;
}
info!(
worker = %worker_id,
delay_secs = RECONNECT_DELAY.as_secs(),
"Waiting before pipeline reconnect"
);
tokio::time::sleep(RECONNECT_DELAY).await;
*conn = NntpConnection::new(worker_id.to_string());
if let Err(e) =
connect_with_retry(conn, primary_server, worker_id, server_health, all_servers)
.await
{
warn!(worker = %worker_id, server = %primary_server.name, "Pipeline reconnect FAILED: {e}");
break;
}
info!(worker = %worker_id, server = %primary_server.name, "Pipeline reconnected after receive error");
pipeline = Pipeline::new(pipe_depth);
}
}
}
}
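/// Work-queue ordering: the .par2 index file first, then .par2 recovery
/// volumes, then regular data files.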
fn par2_sort_key(filename: &str) -> u8 {
    let lower = filename.to_lowercase();
    if lower.ends_with(".par2") {
        if lower.contains(".vol") { 1 } else { 0 }
    } else {
        2
    }
}
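/// Deobfuscation heuristic: real files usually carry one of these
/// extensions, while obfuscated posts use bare hash-like names.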
fn has_known_extension(name: &str) -> bool {
let lower = name.to_lowercase();
if let Some(dot_pos) = lower.rfind('.') {
let ext = &lower[dot_pos + 1..];
matches!(
ext,
"rar" | "r00" | "r01" | "r02" | "r03" | "r04" | "r05"
| "zip" | "7z" | "gz" | "bz2" | "xz" | "tar"
| "mkv" | "mp4" | "avi" | "wmv" | "ts" | "m4v" | "mov" | "mpg" | "mpeg"
| "mp3" | "flac" | "ogg" | "m4a" | "aac" | "wav"
| "srt" | "sub" | "idx" | "ass" | "ssa" | "sup"
| "nfo" | "jpg" | "jpeg" | "png" | "gif" | "bmp"
| "par2"
| "001" | "002" | "003" | "004" | "005"
)
} else {
false
}
}
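/// Called when the current server cannot supply an article. Marks the server
/// as tried; if every enabled, available server has been tried the article is
/// reported as failed, otherwise it is re-queued for another server.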
#[allow(clippy::too_many_arguments)]
fn handle_article_not_available(
item: &mut WorkItem,
primary_server: &ServerConfig,
all_servers: &Arc<Mutex<Vec<ServerConfig>>>,
server_health: &ServerHealthMap,
articles_failed: &Arc<AtomicUsize>,
work_queue: &Arc<Mutex<VecDeque<WorkItem>>>,
progress_tx: &mpsc::UnboundedSender<ProgressUpdate>,
error_msg: &str,
) {
item.tried_servers.push(primary_server.id.clone());
item.tries_on_current = 0;
let all_tried = {
let servers = all_servers.lock();
let health = server_health.lock();
servers.iter().filter(|s| s.enabled).all(|s| {
item.tried_servers.contains(&s.id)
|| health.get(&s.id).is_some_and(|h| !h.is_available())
})
};
if all_tried {
articles_failed.fetch_add(1, Ordering::Relaxed);
warn!(article = %item.message_id, "{error_msg}");
let _ = progress_tx.send(ProgressUpdate::ArticleFailed {
job_id: item.job_id.clone(),
file_id: item.file_id.clone(),
segment_number: item.segment_number,
error: error_msg.to_string(),
server_id: Some(primary_server.id.clone()),
});
} else {
work_queue.lock().push_back(item.clone());
}
}
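/// Returns all in-flight items to the front of the shared queue so another
/// worker (or this one after reconnecting) can retry them.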
fn requeue_all(
in_flight: &mut HashMap<u64, WorkItem>,
work_queue: &Arc<Mutex<VecDeque<WorkItem>>>,
) {
let items: Vec<WorkItem> = in_flight.drain().map(|(_, item)| item).collect();
if !items.is_empty() {
let mut q = work_queue.lock();
for item in items {
q.push_front(item);
}
}
}
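/// Serial download loop (pipelining disabled): fetches one article at a time
/// with the same pause, cancel, and circuit-breaker handling as the
/// pipelined path.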
#[allow(clippy::too_many_arguments)]
async fn download_worker_serial(
conn: &mut NntpConnection,
primary_server: &ServerConfig,
worker_id: &str,
work_queue: &Arc<Mutex<VecDeque<WorkItem>>>,
assembler: &Arc<FileAssembler>,
progress_tx: &mpsc::UnboundedSender<ProgressUpdate>,
cancelled: &Arc<AtomicBool>,
paused: &Arc<AtomicBool>,
articles_failed: &Arc<AtomicUsize>,
all_servers: &Arc<Mutex<Vec<ServerConfig>>>,
server_health: &ServerHealthMap,
yenc_names: &Arc<Mutex<HashMap<String, String>>>,
total_decode_us: &Arc<AtomicU64>,
total_assemble_us: &Arc<AtomicU64>,
total_articles_decoded: &Arc<AtomicU64>,
bandwidth: &Arc<BandwidthLimiter>,
) {
let mut consecutive_errors: u32 = 0;
let mut consecutive_skips: usize = 0;
loop {
if cancelled.load(Ordering::Relaxed) {
break;
}
while paused.load(Ordering::Relaxed) && !cancelled.load(Ordering::Relaxed) {
tokio::time::sleep(Duration::from_millis(250)).await;
}
if cancelled.load(Ordering::Relaxed) {
break;
}
let server_disabled = all_servers
.lock()
.iter()
.find(|s| s.id == primary_server.id)
.is_none_or(|s| !s.enabled);
if server_disabled {
info!(worker = %worker_id, server = %primary_server.name, "Server disabled, worker exiting");
break;
}
{
let health = server_health.lock();
if let Some(h) = health.get(&primary_server.id)
&& !h.is_available()
{
info!(
worker = %worker_id,
server = %primary_server.name,
reason = h.reason.as_deref().unwrap_or("unknown"),
"Server circuit-broken, worker exiting"
);
break;
}
}
let item = { work_queue.lock().pop_front() };
let Some(mut item) = item else {
debug!(worker = %worker_id, "Work queue empty, exiting");
break;
};
if item.tried_servers.contains(&primary_server.id) {
let queue_len = {
let mut q = work_queue.lock();
q.push_back(item);
q.len()
};
consecutive_skips += 1;
if consecutive_skips > queue_len {
debug!(worker = %worker_id, "No serviceable articles remaining, exiting");
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
continue;
}
consecutive_skips = 0;
let result =
fetch_article_with_retry(conn, &item, assembler, primary_server, worker_id).await;
match result {
Ok(process_result) => {
consecutive_errors = 0;
total_decode_us.fetch_add(process_result.decode_us, Ordering::Relaxed);
total_assemble_us.fetch_add(process_result.assemble_us, Ordering::Relaxed);
total_articles_decoded.fetch_add(1, Ordering::Relaxed);
if let Some(ref yname) = process_result.yenc_filename {
yenc_names
.lock()
.entry(item.file_id.clone())
.or_insert_with(|| crate::util::normalize_nfc(yname));
}
if let Some(n) = std::num::NonZeroU32::new(process_result.decoded_bytes as u32) {
let _ = bandwidth.acquire_download(n).await;
}
let _ = progress_tx.send(ProgressUpdate::ArticleComplete {
job_id: item.job_id.clone(),
file_id: item.file_id.clone(),
segment_number: item.segment_number,
decoded_bytes: process_result.decoded_bytes,
file_complete: process_result.file_complete,
server_id: Some(primary_server.id.clone()),
});
}
Err(ArticleError::ArticleNotFound) => {
handle_article_not_available(
&mut item,
primary_server,
all_servers,
server_health,
articles_failed,
work_queue,
progress_tx,
"Article not found on any server",
);
}
Err(ArticleError::ConnectionLost(msg)) => {
consecutive_errors += 1;
warn!(
worker = %worker_id,
server = %primary_server.name,
host = %primary_server.host,
consecutive_errors,
max_reconnects = MAX_RECONNECT_ATTEMPTS,
article = %item.message_id,
"Connection lost: {msg}"
);
work_queue.lock().push_front(item);
if consecutive_errors > MAX_RECONNECT_ATTEMPTS {
warn!(
worker = %worker_id,
server = %primary_server.name,
host = %primary_server.host,
consecutive_errors,
"Too many consecutive errors — worker exiting permanently"
);
break;
}
info!(
worker = %worker_id,
server = %primary_server.name,
delay_secs = RECONNECT_DELAY.as_secs(),
"Waiting before reconnect"
);
tokio::time::sleep(RECONNECT_DELAY).await;
*conn = NntpConnection::new(worker_id.to_string());
if let Err(e) =
connect_with_retry(conn, primary_server, worker_id, server_health, all_servers)
.await
{
warn!(
worker = %worker_id,
server = %primary_server.name,
host = %primary_server.host,
"Reconnect FAILED: {e} — worker exiting"
);
break;
}
info!(worker = %worker_id, server = %primary_server.name, "Reconnected successfully");
}
Err(ArticleError::DecodeError(msg)) => {
handle_article_not_available(
&mut item,
primary_server,
all_servers,
server_health,
articles_failed,
work_queue,
progress_tx,
&format!("Decode error: {msg}"),
);
}
Err(ArticleError::AssemblyError(msg)) => {
articles_failed.fetch_add(1, Ordering::Relaxed);
error!(article = %item.message_id, "Assembly error: {msg}");
let _ = progress_tx.send(ProgressUpdate::ArticleFailed {
job_id: item.job_id.clone(),
file_id: item.file_id.clone(),
segment_number: item.segment_number,
error: format!("Assembly error: {msg}"),
server_id: Some(primary_server.id.clone()),
});
}
}
}
}
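/// Connects with up to MAX_RECONNECT_ATTEMPTS attempts, re-reading the live
/// server config each time so edits made while retrying take effect. Updates
/// the shared health map on every outcome; an auth failure or an already-open
/// circuit aborts immediately.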
async fn connect_with_retry(
conn: &mut NntpConnection,
server: &ServerConfig,
worker_id: &str,
server_health: &ServerHealthMap,
all_servers: &Arc<Mutex<Vec<ServerConfig>>>,
) -> Result<(), String> {
for attempt in 1..=MAX_RECONNECT_ATTEMPTS {
{
let health = server_health.lock();
if let Some(h) = health.get(&server.id)
&& !h.is_available()
{
return Err(format!(
"Server circuit-broken: {}",
h.reason.as_deref().unwrap_or("unknown")
));
}
}
let current_config = all_servers
.lock()
.iter()
.find(|s| s.id == server.id)
.cloned()
.unwrap_or_else(|| server.clone());
info!(
worker = %worker_id,
server = %current_config.name,
host = %current_config.host,
port = current_config.port,
attempt,
max_attempts = MAX_RECONNECT_ATTEMPTS,
"Connect attempt starting"
);
        match conn.connect(&current_config).await {
Ok(()) => {
info!(
worker = %worker_id,
server = %current_config.name,
host = %current_config.host,
attempt,
"Connect attempt succeeded"
);
server_health
.lock()
.entry(server.id.clone())
.or_default()
.record_success();
return Ok(());
}
Err(e) => {
let is_auth = matches!(e, NntpError::Auth(_) | NntpError::ServiceUnavailable(_));
{
let mut health = server_health.lock();
let entry = health.entry(server.id.clone()).or_default();
entry.record_failure(is_auth, &e.to_string());
if !entry.is_available() {
warn!(
worker = %worker_id,
server = %current_config.name,
host = %current_config.host,
error = %e,
cooldown_secs = if is_auth { AUTH_FAILURE_COOLDOWN.as_secs() } else { TRANSIENT_FAILURE_COOLDOWN.as_secs() },
"Server circuit-broken — stopping all connection attempts"
);
return Err(format!("Server circuit-broken: {e}"));
}
}
warn!(
worker = %worker_id,
server = %current_config.name,
host = %current_config.host,
attempt,
max_attempts = MAX_RECONNECT_ATTEMPTS,
error = %e,
is_auth,
"Connect attempt FAILED: {e}"
);
if is_auth {
return Err(format!("Auth/permission failure: {e}"));
}
if attempt < MAX_RECONNECT_ATTEMPTS {
info!(
worker = %worker_id,
server = %current_config.name,
delay_secs = RECONNECT_DELAY.as_secs(),
"Waiting before retry"
);
tokio::time::sleep(RECONNECT_DELAY).await;
*conn = NntpConnection::new(worker_id.to_string());
} else {
return Err(format!(
"All {MAX_RECONNECT_ATTEMPTS} connect attempts failed: {e}"
));
}
}
}
}
Err("Connect retry loop exited unexpectedly".into())
}
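/// Fetches and decodes one article, retrying transient errors up to
/// MAX_TRIES_PER_SERVER times. A 430 maps to `ArticleNotFound` (try the next
/// server); connection, TLS, 502, and 480 errors map to `ConnectionLost` so
/// the caller reconnects instead of retrying in place.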
async fn fetch_article_with_retry(
conn: &mut NntpConnection,
item: &WorkItem,
assembler: &FileAssembler,
_server: &ServerConfig,
worker_id: &str,
) -> Result<ProcessResult, ArticleError> {
let mut last_error = None;
for attempt in 1..=MAX_TRIES_PER_SERVER {
let fetch_start = Instant::now();
match conn.fetch_article(&item.message_id).await {
Ok(response) => {
let fetch_us = fetch_start.elapsed().as_micros();
let raw_data = response.data.unwrap_or_default();
debug!(
worker = %worker_id,
article = %item.message_id,
raw_bytes = raw_data.len(),
fetch_us,
"NNTP fetch complete"
);
return decode_and_assemble(item, &raw_data, assembler);
}
Err(NntpError::ArticleNotFound(_)) => {
debug!(
worker = %worker_id,
article = %item.message_id,
"Article not found (430) — will try next server"
);
return Err(ArticleError::ArticleNotFound);
}
Err(e @ (NntpError::Connection(_) | NntpError::Io(_))) => {
warn!(
worker = %worker_id,
article = %item.message_id,
attempt,
error = %e,
conn_state = ?conn.state,
"Connection/IO error during fetch — connection lost"
);
return Err(ArticleError::ConnectionLost(format!(
"Connection error on attempt {attempt}: {e}"
)));
}
Err(e @ NntpError::Tls(_)) => {
warn!(
worker = %worker_id,
article = %item.message_id,
attempt,
error = %e,
"TLS error during fetch — connection lost"
);
return Err(ArticleError::ConnectionLost(format!("TLS error: {e}")));
}
Err(e @ NntpError::ServiceUnavailable(_)) => {
warn!(
worker = %worker_id,
article = %item.message_id,
attempt,
error = %e,
"Service unavailable (502) during article fetch — likely rate limited or blocked"
);
return Err(ArticleError::ConnectionLost(format!(
"Service unavailable: {e}"
)));
}
Err(e @ NntpError::AuthRequired(_)) => {
warn!(
worker = %worker_id,
article = %item.message_id,
attempt,
error = %e,
"Auth required (480) during article fetch — session expired or rate limited"
);
return Err(ArticleError::ConnectionLost(format!(
"Auth required mid-session: {e}"
)));
}
Err(e) => {
last_error = Some(format!("{e}"));
if attempt < MAX_TRIES_PER_SERVER {
warn!(
worker = %worker_id,
article = %item.message_id,
attempt,
max_tries = MAX_TRIES_PER_SERVER,
error = %e,
"Transient fetch error, retrying in 500ms"
);
tokio::time::sleep(Duration::from_millis(500)).await;
} else {
warn!(
worker = %worker_id,
article = %item.message_id,
attempt,
error = %e,
"All retries on this server exhausted"
);
}
}
}
}
Err(ArticleError::DecodeError(
last_error.unwrap_or_else(|| "Unknown error after retries".into()),
))
}
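/// Outcome and timing of a successful decode-and-assemble for one article.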
#[derive(Debug)]
struct ProcessResult {
decoded_bytes: u64,
file_complete: bool,
yenc_filename: Option<String>,
decode_us: u64,
assemble_us: u64,
}
#[derive(Debug, thiserror::Error)]
enum ArticleError {
#[error("Article not found on server")]
ArticleNotFound,
#[error("Connection lost: {0}")]
ConnectionLost(String),
#[error("Decode error: {0}")]
DecodeError(String),
#[error("Assembly error: {0}")]
AssemblyError(String),
}
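/// Decodes a raw yEnc article body and writes the payload at the offset
/// given by the yEnc part header (`part_begin`, 0 when absent).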
fn decode_and_assemble(
item: &WorkItem,
raw_data: &[u8],
assembler: &FileAssembler,
) -> Result<ProcessResult, ArticleError> {
let decode_start = Instant::now();
let decoded = decode_yenc(raw_data).map_err(|e| {
ArticleError::DecodeError(format!(
"yEnc decode failed for {} seg {}: {e}",
item.filename, item.segment_number
))
})?;
let decode_us = decode_start.elapsed().as_micros();
let yenc_filename = decoded.filename;
let data_begin = decoded.part_begin.unwrap_or(0);
let decoded_len = decoded.data.len() as u64;
let assemble_start = Instant::now();
let file_complete = assembler
.assemble_article(
&item.job_id,
&item.file_id,
item.segment_number,
data_begin,
&decoded.data,
)
.map_err(|e| {
ArticleError::AssemblyError(format!(
"Assembly failed for {} seg {}: {e}",
item.filename, item.segment_number
))
})?;
let assemble_us = assemble_start.elapsed().as_micros();
debug!(
file = %item.filename,
segment = item.segment_number,
raw_bytes = raw_data.len(),
decoded_bytes = decoded_len,
decode_us,
assemble_us,
"Article decode+assemble timing"
);
Ok(ProcessResult {
decoded_bytes: decoded_len,
file_complete,
yenc_filename,
decode_us: decode_us as u64,
assemble_us: assemble_us as u64,
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn has_known_extension_recognizes_archives() {
assert!(has_known_extension("movie.rar"));
assert!(has_known_extension("movie.part01.rar"));
assert!(has_known_extension("file.zip"));
assert!(has_known_extension("file.7z"));
assert!(has_known_extension("archive.001"));
}
#[test]
fn has_known_extension_recognizes_video() {
assert!(has_known_extension("episode.mkv"));
assert!(has_known_extension("movie.mp4"));
assert!(has_known_extension("video.avi"));
assert!(has_known_extension("clip.ts"));
}
#[test]
fn has_known_extension_recognizes_par2() {
assert!(has_known_extension("file.par2"));
assert!(has_known_extension("file.vol00+01.par2"));
assert!(has_known_extension("file.vol015-031.par2"));
}
#[test]
fn has_known_extension_recognizes_misc() {
assert!(has_known_extension("info.nfo"));
assert!(has_known_extension("sub.srt"));
assert!(has_known_extension("cover.jpg"));
assert!(has_known_extension("song.flac"));
}
#[test]
fn has_known_extension_rejects_obfuscated_hashes() {
assert!(!has_known_extension("9b6a324d7560b87091685020371ba869"));
assert!(!has_known_extension("1fG1GP7L2263LHXH213HTNIxZsX7l0cv44BZ"));
assert!(!has_known_extension("DfKUx3bl7L6PSo6276WSaXSZ7"));
assert!(!has_known_extension("Q77O1ZxL237vc241z77hFoLBxl"));
}
#[test]
fn has_known_extension_rejects_unknown_extensions() {
assert!(!has_known_extension("file.xyz123"));
assert!(!has_known_extension("noext"));
assert!(!has_known_extension(""));
}
#[test]
fn has_known_extension_case_insensitive() {
assert!(has_known_extension("file.RAR"));
assert!(has_known_extension("file.MKV"));
assert!(has_known_extension("file.Par2"));
assert!(has_known_extension("file.MP4"));
}
#[test]
fn deobfuscation_direction_nzb_readable_yenc_hash() {
let nzb_name = "DTF.St.Louis.S01E04.part70.rar";
let yenc_name = "9b6a324d7560b87091685020371ba869";
let nzb_has_ext = has_known_extension(nzb_name);
let yenc_has_ext = has_known_extension(yenc_name);
assert!(nzb_has_ext, "NZB name should have known extension");
assert!(!yenc_has_ext, "yEnc hash should NOT have known extension");
}
#[test]
fn deobfuscation_direction_nzb_hash_yenc_readable() {
let nzb_name = "a8f3c72d1e4b5689";
let yenc_name = "movie.mkv";
let nzb_has_ext = has_known_extension(nzb_name);
let yenc_has_ext = has_known_extension(yenc_name);
assert!(!nzb_has_ext, "NZB hash should NOT have known extension");
assert!(yenc_has_ext, "yEnc name should have known extension");
}
#[test]
fn deobfuscation_direction_both_readable() {
let nzb_name = "file_from_subject.rar";
let yenc_name = "actual_file.rar";
assert!(has_known_extension(nzb_name));
assert!(has_known_extension(yenc_name));
}
#[test]
fn deobfuscation_direction_both_obfuscated() {
let nzb_name = "abc123def456";
let yenc_name = "789ghi012jkl";
assert!(!has_known_extension(nzb_name));
assert!(!has_known_extension(yenc_name));
}
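    // Added coverage for the circuit breaker and queue ordering; a minimal
    // sketch using only items defined in this module. The failure reasons
    // below are made-up strings, not real server responses.
    #[test]
    fn server_health_circuit_breaks_after_threshold() {
        let mut h = ServerHealth::new();
        assert!(h.is_available());
        for _ in 0..CIRCUIT_BREAK_THRESHOLD - 1 {
            h.record_failure(false, "timeout");
            assert!(h.is_available(), "still available below the threshold");
        }
        h.record_failure(false, "timeout");
        assert!(!h.is_available(), "circuit opens at the threshold");
        h.record_success();
        assert!(h.is_available(), "a success resets the breaker");
    }
    #[test]
    fn server_health_auth_failure_opens_circuit_immediately() {
        let mut h = ServerHealth::new();
        h.record_failure(true, "482 authentication rejected");
        assert!(!h.is_available());
        assert!(h.is_auth_failure);
        assert_eq!(h.reason.as_deref(), Some("482 authentication rejected"));
    }
    #[test]
    fn par2_sort_key_orders_index_then_volumes_then_data() {
        assert_eq!(par2_sort_key("repair.par2"), 0);
        assert_eq!(par2_sort_key("repair.vol00+01.par2"), 1);
        assert_eq!(par2_sort_key("movie.part01.rar"), 2);
    }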
}