use std::collections::{HashMap, VecDeque};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use parking_lot::Mutex;
use tokio::sync::{Notify, mpsc};
use tokio::task::JoinHandle;
use tracing::{debug, error, info, trace, warn};
use nzb_core::config::ServerConfig;
use nzb_core::models::NzbJob;
use nzb_decode::FileAssembler;
use nzb_decode::yenc::decode_yenc;
use nzb_nntp::Pipeline;
use nzb_nntp::connection::NntpConnection;
use nzb_nntp::error::NntpError;
use crate::bandwidth::BandwidthLimiter;
// ---- Retry / reconnect tuning ------------------------------------------------

/// Per-server retry budget (consumed outside this section of the file).
const MAX_TRIES_PER_SERVER: u32 = 3;
/// Pause before a worker re-enters its reconnect loop after a failed connect.
const RECONNECT_DELAY: Duration = Duration::from_millis(5_000);
/// Reconnect threshold reported alongside connection-loss diagnostics.
const MAX_RECONNECT_ATTEMPTS: u32 = 5;
/// Per-index start-up stagger so one server's workers do not all connect at once.
const WORKER_RAMP_DELAY: Duration = Duration::from_millis(15);

// ---- Circuit breaker ---------------------------------------------------------

/// Consecutive non-auth failures that open a server's circuit breaker.
const CIRCUIT_BREAK_THRESHOLD: u32 = 3;
/// How long a server stays disabled after an authentication failure.
const AUTH_FAILURE_COOLDOWN: Duration = Duration::from_secs(2 * 60);
/// How long a server stays disabled after repeated transient failures.
const TRANSIENT_FAILURE_COOLDOWN: Duration = Duration::from_secs(30);

// ---- Supervision -------------------------------------------------------------

/// Tick period of the pool supervisor.
const SUPERVISOR_INTERVAL: Duration = Duration::from_secs(1);
/// Default idle threshold used by the supervisor's worker watchdog.
const DEFAULT_MAX_WORKER_IDLE: Duration = Duration::from_secs(60);
/// Back-off used when a worker has nothing to do (or its server is disabled).
const WORKER_IDLE_POLL: Duration = Duration::from_millis(500);

/// Capacity of the bounded progress channel; updates are dropped when it is full.
pub const PROGRESS_CHANNEL_CAPACITY: usize = 10_000;
fn try_send_progress(tx: &mpsc::Sender<ProgressUpdate>, job_id: &str, update: ProgressUpdate) {
if let Err(e) = tx.try_send(update) {
match e {
mpsc::error::TrySendError::Full(_) => {
warn!(
job_id,
capacity = PROGRESS_CHANNEL_CAPACITY,
"Progress channel full — dropping update (handler backpressure)"
);
}
mpsc::error::TrySendError::Closed(_) => {
}
}
}
}
/// Per-server connection limiter.
///
/// Each registered server owns a semaphore whose permits are handed out as
/// [`ConnectionSlot`]s; the number of permits is the server's connection limit.
pub struct ConnectionTracker {
    /// Server id → that server's pool.
    pools: Mutex<HashMap<String, ServerSlot>>,
}

/// One server's pool as stored in the tracker.
///
/// Cloning shares the same semaphore (it is behind an `Arc`), so a clone can be
/// used to acquire permits without holding the tracker's lock.
#[derive(Clone)]
struct ServerSlot {
    /// Display name the pool was registered under.
    name: String,
    /// Configured connection limit (initial permit count of `semaphore`).
    limit: usize,
    /// Semaphore whose permits represent connection slots.
    semaphore: Arc<tokio::sync::Semaphore>,
}
impl ConnectionTracker {
pub fn new() -> Self {
Self {
pools: Mutex::new(HashMap::new()),
}
}
pub fn set_limit(&self, server_id: &str, server_name: &str, limit: usize) {
let mut pools = self.pools.lock();
match pools.get_mut(server_id) {
Some(slot) if slot.limit == limit && slot.name == server_name => {
}
Some(slot) if limit > slot.limit && slot.name == server_name => {
let added = limit - slot.limit;
let old = slot.limit;
slot.semaphore.add_permits(added);
slot.limit = limit;
info!(
server_id,
server = %server_name,
old_limit = old,
new_limit = limit,
added,
"Connection pool grew in place"
);
}
existing => {
let (prev_limit, prev_name) = match existing {
Some(s) => (Some(s.limit), Some(s.name.clone())),
None => (None, None),
};
pools.insert(
server_id.to_string(),
ServerSlot {
name: server_name.to_string(),
limit,
semaphore: Arc::new(tokio::sync::Semaphore::new(limit)),
},
);
if let Some(prev) = prev_limit {
let renamed = prev_name.as_deref() != Some(server_name);
info!(
server_id,
server = %server_name,
old_limit = prev,
new_limit = limit,
renamed,
"Connection pool replaced (shrink or rename); old permits orphaned"
);
} else {
info!(
server_id,
server = %server_name,
limit,
"Connection pool created"
);
}
}
}
}
pub fn remove_server(&self, server_id: &str) {
self.pools.lock().remove(server_id);
}
pub async fn acquire(&self, server_id: &str) -> Option<ConnectionSlot> {
let server_slot = {
let pools = self.pools.lock();
pools.get(server_id).cloned()?
};
if server_slot.limit == 0 {
return None;
}
let permit = Arc::clone(&server_slot.semaphore)
.acquire_owned()
.await
.ok()?;
Some(ConnectionSlot {
server_id: server_id.to_string(),
server_name: server_slot.name,
semaphore_origin: server_slot.semaphore,
_permit: permit,
})
}
pub fn slot_is_current(&self, slot: &ConnectionSlot) -> bool {
matches!(self.slot_status(slot), SlotStatus::Current)
}
pub fn slot_status(&self, slot: &ConnectionSlot) -> SlotStatus {
let pools = self.pools.lock();
match pools.get(&slot.server_id) {
Some(server_slot) => {
if Arc::ptr_eq(&server_slot.semaphore, &slot.semaphore_origin) {
SlotStatus::Current
} else {
SlotStatus::PoolReplaced
}
}
None => SlotStatus::ServerRemoved,
}
}
pub fn snapshot(&self) -> Vec<(String, usize, usize)> {
let pools = self.pools.lock();
pools
.iter()
.map(|(id, slot)| {
let active = slot
.limit
.saturating_sub(slot.semaphore.available_permits());
(id.clone(), active, slot.limit)
})
.collect()
}
pub fn total(&self) -> usize {
let pools = self.pools.lock();
pools
.values()
.map(|s| s.limit.saturating_sub(s.semaphore.available_permits()))
.sum()
}
}
impl Default for ConnectionTracker {
fn default() -> Self {
Self::new()
}
}
/// Outcome of checking a connection slot against the tracker's current state.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SlotStatus {
    /// The slot was issued by the server's current semaphore.
    Current,
    /// The server's pool was rebuilt (shrink or rename) after the slot was issued.
    PoolReplaced,
    /// The server is no longer registered with the tracker.
    ServerRemoved,
}
/// A connection slot handed out by `ConnectionTracker::acquire`.
///
/// The semaphore permit is owned by this value and is returned when it is dropped.
pub struct ConnectionSlot {
    server_id: String,
    server_name: String,
    /// Semaphore the permit came from; compared by pointer to detect pool replacement.
    semaphore_origin: Arc<tokio::sync::Semaphore>,
    /// Held only for its `Drop`, which releases the permit.
    _permit: tokio::sync::OwnedSemaphorePermit,
}

impl ConnectionSlot {
    /// Id of the server this slot belongs to.
    pub fn server_id(&self) -> &str {
        self.server_id.as_str()
    }

    /// Display name the server's pool had when the slot was issued.
    pub fn server_name(&self) -> &str {
        self.server_name.as_str()
    }
}
/// Circuit-breaker state for a single server.
#[derive(Debug, Default)]
pub struct ServerHealth {
    /// Failures recorded since the last success.
    pub consecutive_failures: u32,
    /// When set, the server is unavailable until this instant.
    pub disabled_until: Option<Instant>,
    /// Message from the most recent failure.
    pub reason: Option<String>,
    /// Whether the most recent failure was an authentication failure.
    pub is_auth_failure: bool,
}

impl ServerHealth {
    /// A healthy server: no failures, not disabled.
    pub fn new() -> Self {
        Self::default()
    }

    /// `true` unless a cooldown is set and has not yet elapsed.
    pub fn is_available(&self) -> bool {
        self.disabled_until
            .is_none_or(|until| Instant::now() >= until)
    }

    /// Records a failure and, if warranted, opens the breaker.
    ///
    /// An authentication failure opens it immediately for
    /// `AUTH_FAILURE_COOLDOWN`; other failures open it for
    /// `TRANSIENT_FAILURE_COOLDOWN` once `CIRCUIT_BREAK_THRESHOLD` consecutive
    /// failures have accumulated.
    pub fn record_failure(&mut self, is_auth: bool, reason: &str) {
        self.consecutive_failures += 1;
        self.is_auth_failure = is_auth;
        self.reason = Some(reason.to_owned());

        let tripped = self.consecutive_failures >= CIRCUIT_BREAK_THRESHOLD;
        let cooldown = match (is_auth, tripped) {
            (true, _) => Some(AUTH_FAILURE_COOLDOWN),
            (false, true) => Some(TRANSIENT_FAILURE_COOLDOWN),
            (false, false) => None,
        };
        if let Some(cooldown) = cooldown {
            self.disabled_until = Some(Instant::now() + cooldown);
        }
    }

    /// Resets to the healthy state.
    pub fn record_success(&mut self) {
        *self = Self::default();
    }
}

/// Shared server id → [`ServerHealth`] map.
pub type ServerHealthMap = Arc<Mutex<HashMap<String, ServerHealth>>>;
/// Events sent from the worker pool to the progress handler.
#[derive(Debug, Clone)]
pub enum ProgressUpdate {
    /// An article was fetched, decoded and handed to the assembler.
    ArticleComplete {
        job_id: String,
        file_id: String,
        segment_number: u32,
        decoded_bytes: u64,
        /// As reported by the article processing result.
        file_complete: bool,
        /// Server that delivered the article.
        server_id: Option<String>,
    },
    /// An article permanently failed.
    ArticleFailed {
        job_id: String,
        file_id: String,
        segment_number: u32,
        failure: crate::article_failure::ArticleFailure,
    },
    /// Terminal event for a job that was not aborted; `success` is
    /// `articles_failed == 0`.
    JobFinished {
        job_id: String,
        success: bool,
        articles_failed: usize,
    },
    /// Every enabled server is circuit-broken; the job was paused and detached.
    NoServersAvailable {
        job_id: String,
        reason: String,
    },
    /// Terminal event for a job that had an abort reason set.
    JobAborted {
        job_id: String,
        reason: String,
    },
}
/// One article fetch waiting in (or taken from) the shared work queue.
#[derive(Debug, Clone)]
pub(crate) struct WorkItem {
    /// Owning job.
    pub(crate) job_id: String,
    /// File within the job that this article is a segment of.
    pub(crate) file_id: String,
    /// NZB filename; used to order submissions in the queue.
    pub(crate) filename: String,
    /// NNTP message id to fetch.
    pub(crate) message_id: String,
    pub(crate) segment_number: u32,
    /// Servers this item has already been tried on; a server never pops an
    /// item listing its own id.
    pub(crate) tried_servers: Vec<String>,
    /// Attempt counter for the current server (maintained outside this section).
    pub(crate) tries_on_current: u32,
}
/// Per-job state shared by every worker processing that job's articles.
pub(crate) struct JobContext {
    pub job_id: String,
    /// Directory the job's files live in (used for deobfuscation renames).
    pub work_dir: PathBuf,
    pub assembler: Arc<FileAssembler>,
    pub progress_tx: mpsc::Sender<ProgressUpdate>,
    /// file id → first filename seen in that file's yEnc headers.
    pub yenc_names: Arc<Mutex<HashMap<String, String>>>,
    /// file id → filename from the NZB.
    pub nzb_filenames: HashMap<String, String>,
    /// Articles not yet resolved; reaching zero emits the terminal event.
    pub articles_remaining: AtomicUsize,
    pub articles_failed: AtomicUsize,
    /// Workers requeue rather than fetch items of a paused job.
    pub paused: AtomicBool,
    /// Workers discard items of a cancelled job.
    pub cancelled: AtomicBool,
    /// When set, the terminal event is `JobAborted` with this reason.
    pub abort_reason: Mutex<Option<String>>,
    /// Cumulative decode time across workers, in microseconds.
    pub total_decode_us: Arc<AtomicU64>,
    /// Cumulative assembly time across workers, in microseconds.
    pub total_assemble_us: Arc<AtomicU64>,
    pub total_articles_decoded: Arc<AtomicU64>,
    /// Creation instant; the download phase is timed from here.
    pub engine_start: Instant,
    pub total_bytes: u64,
    /// Ensures the terminal event is emitted at most once.
    finished: AtomicBool,
}

/// Shared job id → [`JobContext`] map.
pub(crate) type JobContextMap = Arc<Mutex<HashMap<String, Arc<JobContext>>>>;
impl JobContext {
fn new(
job: &NzbJob,
assembler: Arc<FileAssembler>,
progress_tx: mpsc::Sender<ProgressUpdate>,
total_articles: usize,
) -> Self {
let nzb_filenames = job
.files
.iter()
.map(|f| (f.id.clone(), f.filename.clone()))
.collect();
Self {
job_id: job.id.clone(),
work_dir: job.work_dir.clone(),
assembler,
progress_tx,
yenc_names: Arc::new(Mutex::new(HashMap::new())),
nzb_filenames,
articles_remaining: AtomicUsize::new(total_articles),
articles_failed: AtomicUsize::new(0),
paused: AtomicBool::new(false),
cancelled: AtomicBool::new(false),
abort_reason: Mutex::new(None),
total_decode_us: Arc::new(AtomicU64::new(0)),
total_assemble_us: Arc::new(AtomicU64::new(0)),
total_articles_decoded: Arc::new(AtomicU64::new(0)),
engine_start: Instant::now(),
total_bytes: job.total_bytes,
finished: AtomicBool::new(false),
}
}
pub(crate) fn resolve_one_public(&self) {
self.resolve_one();
}
pub(crate) fn emit_terminal_public(&self) {
self.emit_terminal();
}
fn resolve_one(&self) {
let prev = self.articles_remaining.fetch_sub(1, Ordering::Relaxed);
if prev != 1 {
return;
}
self.emit_terminal();
}
fn emit_terminal(&self) {
if self.finished.swap(true, Ordering::Relaxed) {
return;
}
self.deobfuscate_files();
let download_elapsed = self.engine_start.elapsed();
let decode_total_us = self.total_decode_us.load(Ordering::Relaxed);
let assemble_total_us = self.total_assemble_us.load(Ordering::Relaxed);
let articles_decoded = self.total_articles_decoded.load(Ordering::Relaxed);
let elapsed_us = download_elapsed.as_micros().max(1);
let throughput_mbps = (self.total_bytes as f64 / download_elapsed.as_secs_f64().max(0.001))
/ (1024.0 * 1024.0);
info!(
job_id = %self.job_id,
elapsed_secs = download_elapsed.as_secs_f64(),
total_bytes = self.total_bytes,
throughput_mbps = format!("{throughput_mbps:.2}"),
"Download phase complete"
);
info!(
job_id = %self.job_id,
articles_decoded,
decode_secs = format!("{:.3}", decode_total_us as f64 / 1_000_000.0),
assemble_secs = format!("{:.3}", assemble_total_us as f64 / 1_000_000.0),
decode_pct = format!("{:.1}", decode_total_us as f64 / elapsed_us as f64 * 100.0),
assemble_pct = format!("{:.1}", assemble_total_us as f64 / elapsed_us as f64 * 100.0),
"Decode timing summary (cumulative across all workers)"
);
let abort_reason = self.abort_reason.lock().clone();
if let Some(reason) = abort_reason {
try_send_progress(
&self.progress_tx,
&self.job_id,
ProgressUpdate::JobAborted {
job_id: self.job_id.clone(),
reason,
},
);
return;
}
let failed = self.articles_failed.load(Ordering::Relaxed);
try_send_progress(
&self.progress_tx,
&self.job_id,
ProgressUpdate::JobFinished {
job_id: self.job_id.clone(),
success: failed == 0,
articles_failed: failed,
},
);
}
fn deobfuscate_files(&self) {
let renames = self.yenc_names.lock();
for (file_id, yenc_name) in renames.iter() {
let Some(nzb_name) = self.nzb_filenames.get(file_id) else {
continue;
};
if nzb_name == yenc_name {
continue;
}
let clean_yenc = std::path::Path::new(yenc_name.as_str())
.file_name()
.and_then(|n| n.to_str())
.unwrap_or(yenc_name);
if clean_yenc.is_empty() || nzb_name == clean_yenc {
continue;
}
let nzb_has_ext = has_known_extension(nzb_name);
let yenc_has_ext = has_known_extension(clean_yenc);
let (old_name, new_name) = if yenc_has_ext && !nzb_has_ext {
(nzb_name.as_str(), clean_yenc)
} else if nzb_has_ext && !yenc_has_ext {
continue;
} else if yenc_has_ext && nzb_has_ext {
(nzb_name.as_str(), clean_yenc)
} else {
continue;
};
let old_path = self.work_dir.join(old_name);
let new_path = self.work_dir.join(new_name);
if old_path.exists() && !new_path.exists() {
if let Err(e) = std::fs::rename(&old_path, &new_path) {
warn!(
job_id = %self.job_id,
from = %old_name,
to = %new_name,
"Failed to deobfuscate file: {e}"
);
} else {
info!(
job_id = %self.job_id,
from = %old_name,
to = %new_name,
"Deobfuscated file"
);
}
}
}
}
}
/// FIFO of pending article fetches shared by all jobs and all servers' workers.
pub(crate) struct SharedWorkQueue {
    inner: Mutex<InnerState>,
    /// Signalled (via `notify_waiters`) whenever items are added.
    notify: Notify,
}

/// Lock-protected queue contents.
struct InnerState {
    items: VecDeque<WorkItem>,
    /// Server id → job id of the last item handed to that server; used to
    /// rotate a server across jobs.
    last_served: HashMap<String, String>,
}
impl SharedWorkQueue {
    /// Creates an empty queue.
    pub fn new() -> Self {
        Self {
            inner: Mutex::new(InnerState {
                items: VecDeque::new(),
                last_served: HashMap::new(),
            }),
            notify: Notify::new(),
        }
    }

    /// Appends `items` after sorting them by `par2_sort_key(filename)`, then
    /// wakes waiting workers if anything was added.
    pub fn submit_items(&self, mut items: Vec<WorkItem>) {
        items.sort_by_key(|item| par2_sort_key(&item.filename));
        let had_items = !items.is_empty();
        {
            let mut state = self.inner.lock();
            state.items.reserve(items.len());
            for item in items {
                state.items.push_back(item);
            }
        }
        if had_items {
            self.notify.notify_waiters();
        }
    }

    /// Re-queues an interrupted item at the head of the queue.
    fn push_front(&self, item: WorkItem) {
        self.inner.lock().items.push_front(item);
        self.notify.notify_waiters();
    }

    /// Re-queues an item at the tail of the queue.
    fn push_back(&self, item: WorkItem) {
        self.inner.lock().items.push_back(item);
        self.notify.notify_waiters();
    }

    /// Single source of truth for "may `server_id` take `item` now".
    ///
    /// The item must not have been tried on `server_id` yet, and it must
    /// already have been tried on every server in `higher_priority_servers`.
    /// Shared by `workable_count_for` (supervisor view) and `pop_workable`
    /// (worker view) so the two can never disagree.
    fn is_eligible(item: &WorkItem, server_id: &str, higher_priority_servers: &[String]) -> bool {
        let tried = |id: &str| item.tried_servers.iter().any(|s| s == id);
        !tried(server_id) && higher_priority_servers.iter().all(|hp| tried(hp))
    }

    /// Returns `(eligible items for server_id, total items queued)`.
    pub(crate) fn workable_count_for(
        &self,
        server_id: &str,
        higher_priority_servers: &[String],
    ) -> (usize, usize) {
        let state = self.inner.lock();
        let workable = state
            .items
            .iter()
            .filter(|item| Self::is_eligible(item, server_id, higher_priority_servers))
            .count();
        (workable, state.items.len())
    }

    /// Removes and returns the first eligible item for `server_id`.
    ///
    /// It prefers an item from a different job than the one this server was
    /// last given, so concurrent jobs share the server, and falls back to any
    /// eligible item.
    fn pop_workable(
        &self,
        server_id: &str,
        higher_priority_servers: &[String],
    ) -> Option<WorkItem> {
        let mut state = self.inner.lock();
        let eligible =
            |item: &WorkItem| Self::is_eligible(item, server_id, higher_priority_servers);
        let other_job = state.last_served.get(server_id).and_then(|last| {
            state
                .items
                .iter()
                .position(|item| eligible(item) && item.job_id != *last)
        });
        let idx = other_job.or_else(|| state.items.iter().position(|item| eligible(item)))?;
        let item = state.items.remove(idx)?;
        state
            .last_served
            .insert(server_id.to_string(), item.job_id.clone());
        Some(item)
    }

    /// Removes every queued item of `job_id`, preserving queue order for the
    /// rest, and forgets it as any server's last-served job.
    fn drain_job(&self, job_id: &str) -> Vec<WorkItem> {
        let mut state = self.inner.lock();
        let (drained, kept): (VecDeque<WorkItem>, VecDeque<WorkItem>) =
            std::mem::take(&mut state.items)
                .into_iter()
                .partition(|item| item.job_id == job_id);
        state.items = kept;
        state.last_served.retain(|_, v| v != job_id);
        drained.into()
    }

    /// Number of queued items (eligible or not).
    pub fn len(&self) -> usize {
        self.inner.lock().items.len()
    }
}

impl Default for SharedWorkQueue {
    fn default() -> Self {
        Self::new()
    }
}
/// Bookkeeping for one spawned worker task.
struct ActiveWorker {
    /// Per-worker stop flag; the task checks it at loop boundaries.
    shutdown: Arc<AtomicBool>,
    /// Milliseconds since pool creation at which the worker last made progress.
    last_progress: Arc<AtomicU64>,
    handle: JoinHandle<()>,
}

/// Fixed set of per-connection workers that pull articles from one shared queue.
pub struct WorkerPool {
    work_queue: Arc<SharedWorkQueue>,
    /// Job id → context for every job currently owned by the pool.
    job_contexts: JobContextMap,
    /// Shared server configuration, re-read by the supervisor every tick.
    servers: Arc<Mutex<Vec<ServerConfig>>>,
    /// Per-server circuit-breaker state.
    server_health: ServerHealthMap,
    /// Download rate limiter charged with each article's decoded size.
    bandwidth: Arc<BandwidthLimiter>,
    conn_tracker: Arc<ConnectionTracker>,
    /// Per-article fetch timeout; `None` disables it.
    stall_timeout: Option<Duration>,
    /// Epoch for the millisecond timestamps in `ActiveWorker::last_progress`.
    created_at: Instant,
    /// Idle threshold for the supervisor's watchdog.
    max_worker_idle: Mutex<Duration>,
    /// Server id → when starvation was last logged (rate-limits that log).
    starvation_log: Mutex<HashMap<String, Instant>>,
    /// Number of workers evicted by the idle watchdog.
    evictions: AtomicU64,
    /// Server id → workers currently assigned to it.
    workers: Mutex<HashMap<String, Vec<ActiveWorker>>>,
    /// Pool-wide stop flag.
    shutdown: Arc<AtomicBool>,
    supervisor_handle: Mutex<Option<JoinHandle<()>>>,
}
impl WorkerPool {
    /// Builds the pool. `new` itself spawns nothing; workers are spawned by
    /// `reconcile_servers` (called from `start`).
    ///
    /// `stall_timeout_secs == 0` disables the per-article stall timeout.
    pub fn new(
        servers: Arc<Mutex<Vec<ServerConfig>>>,
        bandwidth: Arc<BandwidthLimiter>,
        conn_tracker: Arc<ConnectionTracker>,
        stall_timeout_secs: u64,
    ) -> Arc<Self> {
        let stall_timeout = if stall_timeout_secs > 0 {
            Some(Duration::from_secs(stall_timeout_secs))
        } else {
            None
        };
        Arc::new(Self {
            work_queue: Arc::new(SharedWorkQueue::new()),
            job_contexts: Arc::new(Mutex::new(HashMap::new())),
            servers,
            server_health: Arc::new(Mutex::new(HashMap::new())),
            bandwidth,
            conn_tracker,
            stall_timeout,
            created_at: Instant::now(),
            max_worker_idle: Mutex::new(DEFAULT_MAX_WORKER_IDLE),
            starvation_log: Mutex::new(HashMap::new()),
            evictions: AtomicU64::new(0),
            workers: Mutex::new(HashMap::new()),
            shutdown: Arc::new(AtomicBool::new(false)),
            supervisor_handle: Mutex::new(None),
        })
    }

    /// Sets the idle threshold used by the supervisor's watchdog.
    pub fn set_max_worker_idle(&self, d: Duration) {
        *self.max_worker_idle.lock() = d;
    }

    /// Current idle threshold used by the supervisor's watchdog.
    pub fn max_worker_idle(&self) -> Duration {
        *self.max_worker_idle.lock()
    }

    /// Milliseconds since the pool was created; the clock for `last_progress`.
    fn elapsed_ms(&self) -> u64 {
        self.created_at.elapsed().as_millis() as u64
    }

    /// Pool creation instant (passed to connections for their I/O heartbeat).
    fn created_at(&self) -> Instant {
        self.created_at
    }

    /// Ids of enabled servers with a numerically lower `priority` than
    /// `my_priority` (other than `my_server_id`) whose circuit breaker is not
    /// open.
    ///
    /// The queue only lets this server take an item once it has been tried on
    /// all of these.
    ///
    /// Lock order here: `servers`, then `server_health`.
    fn higher_priority_servers(&self, my_priority: u8, my_server_id: &str) -> Vec<String> {
        let servers = self.servers.lock();
        let health = self.server_health.lock();
        servers
            .iter()
            .filter(|s| s.enabled && s.priority < my_priority && s.id != my_server_id)
            .filter(|s| health.get(&s.id).is_none_or(|h| h.is_available()))
            .map(|s| s.id.clone())
            .collect()
    }

    /// Number of workers evicted by the idle watchdog so far.
    pub fn eviction_count(&self) -> u64 {
        self.evictions.load(Ordering::Relaxed)
    }

    /// Spawns the initial workers and the supervisor task.
    pub fn start(self: &Arc<Self>) {
        self.reconcile_servers();
        let this = Arc::clone(self);
        let handle = tokio::spawn(async move {
            this.supervisor_loop().await;
        });
        *self.supervisor_handle.lock() = Some(handle);
    }

    /// Brings the worker set in line with the current server configuration.
    ///
    /// Workers of servers that are gone or disabled are retired. Each enabled
    /// server is then trimmed or grown to `min(connections, 500)` workers.
    /// Retired and trimmed workers are only flagged and their handles dropped
    /// (detached); each task keeps running until it observes its flag.
    /// No-op after `shutdown`.
    pub fn reconcile_servers(self: &Arc<Self>) {
        if self.shutdown.load(Ordering::Relaxed) {
            return;
        }
        let servers_snapshot: Vec<ServerConfig> = self.servers.lock().clone();
        let mut workers = self.workers.lock();
        let mut retire: Vec<String> = Vec::new();
        for key in workers.keys() {
            let still_active = servers_snapshot.iter().any(|s| s.enabled && &s.id == key);
            if !still_active {
                retire.push(key.clone());
            }
        }
        for key in retire {
            if let Some(list) = workers.remove(&key) {
                for w in list {
                    w.shutdown.store(true, Ordering::Relaxed);
                    drop(w.handle);
                }
            }
        }
        for server in &servers_snapshot {
            if !server.enabled {
                continue;
            }
            // Hard cap on workers per server.
            let target = (server.connections as usize).min(500);
            let entry = workers.entry(server.id.clone()).or_default();
            while entry.len() > target {
                if let Some(w) = entry.pop() {
                    w.shutdown.store(true, Ordering::Relaxed);
                    drop(w.handle);
                }
            }
            let current = entry.len();
            for conn_idx in current..target {
                let worker_shutdown = Arc::new(AtomicBool::new(false));
                // Start "now" so the idle watchdog does not evict a fresh worker.
                let last_progress = Arc::new(AtomicU64::new(self.elapsed_ms()));
                let pool = Arc::clone(self);
                let server_clone = server.clone();
                let ws_clone = Arc::clone(&worker_shutdown);
                let lp_clone = Arc::clone(&last_progress);
                let handle = tokio::spawn(async move {
                    pool_worker(pool, server_clone, conn_idx, ws_clone, lp_clone).await;
                });
                entry.push(ActiveWorker {
                    shutdown: worker_shutdown,
                    last_progress,
                    handle,
                });
            }
        }
    }

    /// Registers a job's context and enqueues its items.
    ///
    /// A job with no items gets its terminal event immediately and is never
    /// registered.
    pub(crate) fn submit_job(self: &Arc<Self>, ctx: Arc<JobContext>, items: Vec<WorkItem>) {
        let job_id = ctx.job_id.clone();
        if items.is_empty() {
            ctx.emit_terminal();
            return;
        }
        self.job_contexts.lock().insert(job_id.clone(), ctx);
        self.work_queue.submit_items(items);
        debug!(job_id = %job_id, queue_len = self.work_queue.len(), "Job submitted to worker pool");
    }

    /// Marks a job paused; workers requeue its items instead of fetching them.
    pub fn pause_job(&self, job_id: &str) {
        if let Some(ctx) = self.job_contexts.lock().get(job_id) {
            ctx.paused.store(true, Ordering::Relaxed);
        }
    }

    /// Clears a job's paused flag and wakes waiting workers.
    pub fn resume_job(&self, job_id: &str) {
        if let Some(ctx) = self.job_contexts.lock().get(job_id) {
            ctx.paused.store(false, Ordering::Relaxed);
            self.work_queue.notify.notify_waiters();
        }
    }

    /// Aborts a job with `reason` and emits its terminal (`JobAborted`) event.
    ///
    /// Queued items are drained and each counted as resolved. Items already
    /// in flight on a worker are not drained here.
    pub fn abort_job(&self, job_id: &str, reason: String) {
        let ctx = self.job_contexts.lock().get(job_id).cloned();
        let Some(ctx) = ctx else {
            return;
        };
        // Set the reason before anything can trigger emit_terminal.
        *ctx.abort_reason.lock() = Some(reason);
        ctx.cancelled.store(true, Ordering::Relaxed);
        let drained = self.work_queue.drain_job(job_id);
        for _ in drained {
            ctx.resolve_one();
        }
        // No-op if the last resolve_one above already emitted it.
        ctx.emit_terminal();
        self.job_contexts.lock().remove(job_id);
    }

    /// Unregisters a job and drops its queued items without emitting any
    /// progress event.
    pub fn cancel_job(&self, job_id: &str) {
        let ctx = self.job_contexts.lock().remove(job_id);
        let Some(ctx) = ctx else {
            return;
        };
        ctx.cancelled.store(true, Ordering::Relaxed);
        let _ = self.work_queue.drain_job(job_id);
    }

    /// Detaches a job because no server can serve it.
    ///
    /// It marks the job paused, sends `NoServersAvailable`, and drops its
    /// queued items (they are not counted as resolved).
    fn mark_no_servers(&self, job_id: &str, reason: String) {
        let ctx = self.job_contexts.lock().remove(job_id);
        let Some(ctx) = ctx else {
            return;
        };
        ctx.paused.store(true, Ordering::Relaxed);
        try_send_progress(
            &ctx.progress_tx,
            &ctx.job_id,
            ProgressUpdate::NoServersAvailable {
                job_id: ctx.job_id.clone(),
                reason,
            },
        );
        let _ = self.work_queue.drain_job(job_id);
    }

    /// Periodic housekeeping, once per `SUPERVISOR_INTERVAL` until shutdown:
    ///
    /// 1. Idle watchdog: flag workers idle longer than `max_worker_idle`. This
    ///    only happens when their server has workable items (or the server is
    ///    not in the enabled list).
    /// 2. Prune finished worker tasks, then `reconcile_servers` to refill.
    /// 3. Log (at most once a minute per server) when the queue has items
    ///    but none are workable for that server.
    /// 4. If every enabled server's breaker is open, detach every active job
    ///    via `mark_no_servers`.
    async fn supervisor_loop(self: Arc<Self>) {
        let mut ticker = tokio::time::interval(SUPERVISOR_INTERVAL);
        loop {
            ticker.tick().await;
            if self.shutdown.load(Ordering::Relaxed) {
                break;
            }
            let now_ms = self.elapsed_ms();
            let max_idle_ms = self.max_worker_idle().as_millis() as u64;
            let server_priorities: Vec<(String, u8)> = {
                let srv = self.servers.lock();
                srv.iter()
                    .filter(|s| s.enabled)
                    .map(|s| (s.id.clone(), s.priority))
                    .collect()
            };
            let has_workable: HashMap<String, bool> = server_priorities
                .iter()
                .map(|(sid, prio)| {
                    let hp = self.higher_priority_servers(*prio, sid);
                    let (workable, _) = self.work_queue.workable_count_for(sid, &hp);
                    (sid.clone(), workable > 0)
                })
                .collect();
            // Step 1: idle watchdog.
            {
                let workers = self.workers.lock();
                for (server_id, list) in workers.iter() {
                    for (idx, w) in list.iter().enumerate() {
                        if w.shutdown.load(Ordering::Relaxed) {
                            continue;
                        }
                        let last = w.last_progress.load(Ordering::Relaxed);
                        let idle = now_ms.saturating_sub(last);
                        if idle > max_idle_ms {
                            // Idle with nothing to do is expected; don't evict.
                            if !has_workable.get(server_id).copied().unwrap_or(true) {
                                continue;
                            }
                            warn!(
                                server = %server_id,
                                worker_idx = idx,
                                idle_ms = idle,
                                max_idle_ms,
                                "Idle-worker watchdog: evicting stalled worker"
                            );
                            // Only flagged: the task exits when it next checks
                            // the flag, and is then pruned and replaced below.
                            w.shutdown.store(true, Ordering::Relaxed);
                            self.evictions.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                }
            }
            // Step 2: prune exited tasks and refill to target.
            {
                let mut workers = self.workers.lock();
                for (_id, list) in workers.iter_mut() {
                    list.retain(|w| !w.handle.is_finished());
                }
            }
            self.reconcile_servers();
            // Step 3: rate-limited starvation diagnostics.
            let enabled_servers: Vec<String> =
                server_priorities.iter().map(|(id, _)| id.clone()).collect();
            let now_instant = Instant::now();
            for (sid, prio) in &server_priorities {
                let hp = self.higher_priority_servers(*prio, sid);
                let (workable, total) = self.work_queue.workable_count_for(sid, &hp);
                if workable == 0 && total > 0 {
                    let mut log = self.starvation_log.lock();
                    let should_log = log
                        .get(sid)
                        .map(|t| now_instant.duration_since(*t) >= Duration::from_secs(60))
                        .unwrap_or(true);
                    if should_log {
                        log.insert(sid.clone(), now_instant);
                        let reason = if hp.is_empty() {
                            "every item has been tried here already"
                        } else {
                            "every item has been tried here, or is waiting for a higher-priority server"
                        };
                        info!(
                            server = %sid,
                            total_items = total,
                            higher_priority_servers = hp.len(),
                            "Queue has items but none are workable for this server ({reason})"
                        );
                    }
                }
            }
            // Step 4: all-servers-broken detection (skipped when none are enabled).
            if enabled_servers.is_empty() {
                continue;
            }
            let healthy_servers: Vec<String> = {
                let health = self.server_health.lock();
                enabled_servers
                    .iter()
                    .filter(|sid| health.get(sid.as_str()).is_none_or(|h| h.is_available()))
                    .cloned()
                    .collect()
            };
            let all_broken = healthy_servers.is_empty();
            let ctxs: Vec<Arc<JobContext>> = self.job_contexts.lock().values().cloned().collect();
            for ctx in ctxs {
                if ctx.articles_remaining.load(Ordering::Relaxed) == 0 {
                    continue;
                }
                if ctx.cancelled.load(Ordering::Relaxed) {
                    continue;
                }
                if all_broken {
                    // First recorded reason from any server (HashMap order).
                    let reason = {
                        let health = self.server_health.lock();
                        health
                            .values()
                            .filter_map(|h| h.reason.clone())
                            .next()
                            .unwrap_or_else(|| "All servers unavailable".into())
                    };
                    warn!(
                        job_id = %ctx.job_id,
                        remaining = ctx.articles_remaining.load(Ordering::Relaxed),
                        "All servers circuit-broken — pausing job for user intervention"
                    );
                    self.mark_no_servers(&ctx.job_id, reason);
                }
            }
        }
    }

    /// Stops the pool.
    ///
    /// It signals every worker, wakes idle ones, and waits up to 10 s per
    /// worker handle (sequentially). It then aborts the supervisor task.
    pub async fn shutdown(self: &Arc<Self>) {
        self.shutdown.store(true, Ordering::Relaxed);
        let handles: Vec<JoinHandle<()>> = {
            let mut workers = self.workers.lock();
            let mut out = Vec::new();
            for (_id, list) in workers.drain() {
                for w in list {
                    w.shutdown.store(true, Ordering::Relaxed);
                    out.push(w.handle);
                }
            }
            out
        };
        self.work_queue.notify.notify_waiters();
        let timeout = Duration::from_secs(10);
        for h in handles {
            let _ = tokio::time::timeout(timeout, h).await;
        }
        if let Some(h) = self.supervisor_handle.lock().take() {
            h.abort();
        }
    }

    /// The shared connection tracker.
    pub fn conn_tracker(&self) -> &Arc<ConnectionTracker> {
        &self.conn_tracker
    }

    /// Whether `job_id` is currently registered with the pool.
    pub fn has_job(&self, job_id: &str) -> bool {
        self.job_contexts.lock().contains_key(job_id)
    }
}
/// Body of one worker task bound to `primary_server`.
///
/// The connection slot is acquired once and held for the worker's whole
/// lifetime, across reconnects. Each pass of the `'reconnect` loop:
///
/// * checks the stop flags and whether the slot is still current;
/// * waits out an open circuit breaker;
/// * connects (with retries);
/// * runs the serial or pipelined fetch loop, chosen by the server's
///   `pipelining` depth;
/// * sends QUIT, then either reconnects or exits as that loop requests.
async fn pool_worker(
    pool: Arc<WorkerPool>,
    primary_server: ServerConfig,
    conn_idx: usize,
    worker_shutdown: Arc<AtomicBool>,
    last_progress: Arc<AtomicU64>,
) {
    let worker_id = format!("{}#{}", primary_server.id, conn_idx);
    // Stagger start-up so all of a server's workers don't connect at once.
    if conn_idx > 0 {
        let stagger = WORKER_RAMP_DELAY * conn_idx as u32;
        tokio::time::sleep(stagger).await;
    }
    let should_exit = |worker_shutdown: &Arc<AtomicBool>, pool: &Arc<WorkerPool>| {
        worker_shutdown.load(Ordering::Relaxed) || pool.shutdown.load(Ordering::Relaxed)
    };
    let mut conn_slot = match pool.conn_tracker.acquire(&primary_server.id).await {
        Some(slot) => slot,
        None => {
            info!(
                worker = %worker_id,
                server = %primary_server.name,
                "No connection slot available (server removed or limit=0); worker exiting"
            );
            return;
        }
    };
    'reconnect: loop {
        if should_exit(&worker_shutdown, &pool) {
            return;
        }
        // A slot from a replaced or removed pool means this worker's permit no
        // longer counts toward the server's limit; exit.
        match pool.conn_tracker.slot_status(&conn_slot) {
            SlotStatus::Current => {}
            SlotStatus::PoolReplaced => {
                info!(
                    worker = %worker_id,
                    server = %primary_server.name,
                    reason = "pool_replaced",
                    "Connection slot is stale (connection limit changed); worker exiting"
                );
                return;
            }
            SlotStatus::ServerRemoved => {
                info!(
                    worker = %worker_id,
                    server = %primary_server.name,
                    reason = "server_removed",
                    "Connection slot is stale (server removed from tracker); worker exiting"
                );
                return;
            }
        }
        // Breaker open: poll until its cooldown elapses (last_progress is not
        // touched while waiting).
        let circuit_broken = {
            let health = pool.server_health.lock();
            health
                .get(&primary_server.id)
                .is_some_and(|h| !h.is_available())
        };
        if circuit_broken {
            tokio::time::sleep(WORKER_IDLE_POLL).await;
            continue 'reconnect;
        }
        info!(
            worker = %worker_id,
            server = %primary_server.name,
            host = %primary_server.host,
            port = primary_server.port,
            ssl = primary_server.ssl,
            conn_idx,
            "Worker starting — connecting to primary server"
        );
        let mut conn = NntpConnection::new(worker_id.clone());
        // The connection updates last_progress on I/O activity, relative to
        // the pool's creation instant.
        conn.set_io_heartbeat(last_progress.clone(), pool.created_at());
        if let Err(e) = connect_with_retry(
            &mut conn,
            &primary_server,
            &worker_id,
            &pool.server_health,
            &pool.servers,
        )
        .await
        {
            warn!(
                worker = %worker_id,
                server = %primary_server.name,
                host = %primary_server.host,
                "Worker FAILED to connect after all retries: {e}"
            );
            if should_exit(&worker_shutdown, &pool) {
                return;
            }
            tokio::time::sleep(RECONNECT_DELAY).await;
            continue 'reconnect;
        }
        // Depth 0 is treated as 1 (serial).
        let pipe_depth = primary_server.pipelining.max(1);
        let active_conns = pool.conn_tracker.total();
        info!(
            worker = %worker_id,
            server = %primary_server.name,
            host = %primary_server.host,
            pipelining = pipe_depth,
            total_nntp_connections = active_conns,
            "Worker connected and ready"
        );
        let reconnect_needed = if pipe_depth <= 1 {
            run_worker_serial(
                &pool,
                &primary_server,
                &worker_id,
                &worker_shutdown,
                &mut conn,
                &mut conn_slot,
                &last_progress,
            )
            .await
        } else {
            run_worker_pipelined(
                &pool,
                &primary_server,
                &worker_id,
                pipe_depth,
                &worker_shutdown,
                &mut conn,
                &mut conn_slot,
                &last_progress,
            )
            .await
        };
        // Best-effort QUIT; the result is ignored either way.
        let _ = conn.quit().await;
        match reconnect_needed {
            WorkerExit::Reconnect => {
                continue 'reconnect;
            }
            WorkerExit::Exit => {
                return;
            }
        }
    }
}
/// How a worker's fetch loop ended, telling `pool_worker` what to do next.
enum WorkerExit {
    /// Drop the current connection and go around the reconnect loop again.
    Reconnect,
    /// Stop the worker task.
    Exit,
}
/// Waits for the next item `server_id` may fetch.
///
/// Returns `None` as soon as the worker or the pool is told to shut down.
/// How popped items are handled depends on their job:
///
/// * job no longer registered, or cancelled: the item is discarded;
/// * job paused: the item is pushed to the back of the queue and the worker
///   backs off for `WORKER_IDLE_POLL`;
/// * otherwise: the item and its context are returned.
///
/// When nothing is eligible, the worker waits for a queue notification or
/// the poll interval, whichever comes first.
async fn next_work_item(
    pool: &Arc<WorkerPool>,
    server_id: &str,
    higher_priority_servers: &[String],
    worker_shutdown: &Arc<AtomicBool>,
) -> Option<(WorkItem, Arc<JobContext>)> {
    loop {
        if worker_shutdown.load(Ordering::Relaxed) || pool.shutdown.load(Ordering::Relaxed) {
            return None;
        }
        // Register for wake-ups *before* inspecting the queue. A `Notified`
        // future observes every `notify_waiters()` issued after it is created,
        // even before it is first polled. So an item pushed between a failed
        // pop and the wait below wakes us at once. Previously it was only
        // picked up after the poll interval.
        let notified = pool.work_queue.notify.notified();
        if let Some(item) = pool
            .work_queue
            .pop_workable(server_id, higher_priority_servers)
        {
            let ctx = pool.job_contexts.lock().get(&item.job_id).cloned();
            let Some(ctx) = ctx else {
                continue;
            };
            if ctx.cancelled.load(Ordering::Relaxed) {
                continue;
            }
            if ctx.paused.load(Ordering::Relaxed) {
                pool.work_queue.push_back(item);
                tokio::time::sleep(WORKER_IDLE_POLL).await;
                continue;
            }
            return Some((item, ctx));
        }
        tokio::select! {
            _ = notified => {}
            _ = tokio::time::sleep(WORKER_IDLE_POLL) => {}
        }
    }
}
/// Fetch loop for a non-pipelined connection: one article in flight at a time.
///
/// Returns `WorkerExit::Exit` in two cases:
///
/// * the server is disabled or missing from the configuration;
/// * the worker or pool shutdown flag is set, so `next_work_item` yields
///   nothing.
///
/// Returns `WorkerExit::Reconnect` in three cases:
///
/// * the server's circuit breaker is open;
/// * a fetch exceeds `pool.stall_timeout`;
/// * the connection is lost.
///
/// In the last two cases the interrupted item goes back to the front of the
/// queue.
///
/// A connection loss always ends this call. A local "consecutive errors"
/// counter would therefore never exceed one; reconnect pacing belongs to
/// `pool_worker`.
async fn run_worker_serial(
    pool: &Arc<WorkerPool>,
    primary_server: &ServerConfig,
    worker_id: &str,
    worker_shutdown: &Arc<AtomicBool>,
    conn: &mut NntpConnection,
    _conn_slot: &mut ConnectionSlot,
    last_progress: &Arc<AtomicU64>,
) -> WorkerExit {
    loop {
        let server_disabled = pool
            .servers
            .lock()
            .iter()
            .find(|s| s.id == primary_server.id)
            .is_none_or(|s| !s.enabled);
        if server_disabled {
            info!(
                worker = %worker_id,
                server = %primary_server.name,
                "Server disabled, worker exiting"
            );
            return WorkerExit::Exit;
        }
        {
            let health = pool.server_health.lock();
            if let Some(h) = health.get(&primary_server.id)
                && !h.is_available()
            {
                info!(
                    worker = %worker_id,
                    server = %primary_server.name,
                    reason = h.reason.as_deref().unwrap_or("unknown"),
                    "Server circuit-broken, worker reconnecting after cooldown"
                );
                return WorkerExit::Reconnect;
            }
        }
        // Recomputed per item: server health and priorities can change.
        let higher_priority_servers =
            pool.higher_priority_servers(primary_server.priority, &primary_server.id);
        let Some((mut item, ctx)) = next_work_item(
            pool,
            &primary_server.id,
            &higher_priority_servers,
            worker_shutdown,
        )
        .await
        else {
            return WorkerExit::Exit;
        };
        let fetch_fut =
            fetch_article_with_retry(conn, &item, &ctx.assembler, primary_server, worker_id);
        let result = if let Some(timeout) = pool.stall_timeout {
            match tokio::time::timeout(timeout, fetch_fut).await {
                Ok(r) => r,
                Err(_) => {
                    warn!(
                        worker = %worker_id,
                        server = %primary_server.name,
                        article = %item.message_id,
                        "Connection stalled — no response within {}s, reconnecting",
                        timeout.as_secs()
                    );
                    pool.work_queue.push_front(item);
                    return WorkerExit::Reconnect;
                }
            }
        } else {
            fetch_fut.await
        };
        match result {
            Ok(process_result) => {
                ctx.total_decode_us
                    .fetch_add(process_result.decode_us, Ordering::Relaxed);
                ctx.total_assemble_us
                    .fetch_add(process_result.assemble_us, Ordering::Relaxed);
                ctx.total_articles_decoded.fetch_add(1, Ordering::Relaxed);
                // First yEnc filename seen for a file wins.
                if let Some(ref yname) = process_result.yenc_filename {
                    ctx.yenc_names
                        .lock()
                        .entry(item.file_id.clone())
                        .or_insert_with(|| crate::util::normalize_nfc(yname));
                }
                // Charge the decoded size to the rate limiter. Saturate
                // instead of `as u32`, which would wrap sizes of 4 GiB or
                // more and skip the charge entirely for exact multiples of
                // 2^32.
                let charge = u32::try_from(process_result.decoded_bytes).unwrap_or(u32::MAX);
                if let Some(n) = std::num::NonZeroU32::new(charge) {
                    let _ = pool.bandwidth.acquire_download(n).await;
                }
                try_send_progress(
                    &ctx.progress_tx,
                    &item.job_id,
                    ProgressUpdate::ArticleComplete {
                        job_id: item.job_id.clone(),
                        file_id: item.file_id.clone(),
                        segment_number: item.segment_number,
                        decoded_bytes: process_result.decoded_bytes,
                        file_complete: process_result.file_complete,
                        server_id: Some(primary_server.id.clone()),
                    },
                );
                ctx.resolve_one();
                last_progress.store(pool.elapsed_ms(), Ordering::Relaxed);
            }
            Err(ArticleError::ArticleNotFound) => {
                if handle_article_not_available(
                    &mut item,
                    primary_server,
                    &pool.servers,
                    &pool.server_health,
                    &ctx,
                    &pool.work_queue,
                    crate::article_failure::ArticleFailureKind::NotFound,
                    "Article not found on any server",
                ) {
                    last_progress.store(pool.elapsed_ms(), Ordering::Relaxed);
                }
            }
            Err(ArticleError::ConnectionLost(msg)) => {
                warn!(
                    worker = %worker_id,
                    server = %primary_server.name,
                    host = %primary_server.host,
                    article = %item.message_id,
                    "Connection lost: {msg}"
                );
                pool.work_queue.push_front(item);
                return WorkerExit::Reconnect;
            }
            Err(ArticleError::DecodeError(msg)) => {
                if handle_article_not_available(
                    &mut item,
                    primary_server,
                    &pool.servers,
                    &pool.server_health,
                    &ctx,
                    &pool.work_queue,
                    crate::article_failure::ArticleFailureKind::DecodeError,
                    &format!("Decode error: {msg}"),
                ) {
                    last_progress.store(pool.elapsed_ms(), Ordering::Relaxed);
                }
            }
            Err(ArticleError::AssemblyError(msg)) => {
                // Assembly failures are terminal for the article: it is
                // reported as failed and resolved, not retried elsewhere.
                error!(article = %item.message_id, "Assembly error: {msg}");
                try_send_progress(
                    &ctx.progress_tx,
                    &item.job_id,
                    ProgressUpdate::ArticleFailed {
                        job_id: item.job_id.clone(),
                        file_id: item.file_id.clone(),
                        segment_number: item.segment_number,
                        failure: crate::article_failure::ArticleFailure::decode_error(
                            &primary_server.id,
                            format!("Assembly error: {msg}"),
                        ),
                    },
                );
                ctx.articles_failed.fetch_add(1, Ordering::Relaxed);
                ctx.resolve_one();
                last_progress.store(pool.elapsed_ms(), Ordering::Relaxed);
            }
        }
    }
}
#[allow(clippy::too_many_arguments)]
async fn run_worker_pipelined(
pool: &Arc<WorkerPool>,
primary_server: &ServerConfig,
worker_id: &str,
pipe_depth: u8,
worker_shutdown: &Arc<AtomicBool>,
conn: &mut NntpConnection,
_conn_slot: &mut ConnectionSlot,
last_progress: &Arc<AtomicU64>,
) -> WorkerExit {
let mut pipeline = Pipeline::new(pipe_depth);
let mut in_flight_items: HashMap<u64, WorkItem> = HashMap::new();
let mut next_tag: u64 = 0;
let mut consecutive_errors: u32 = 0;
let mut perf_articles: u64 = 0;
let mut perf_bytes: u64 = 0;
let mut perf_queue_lock_us: u64 = 0;
let mut perf_receive_us: u64 = 0;
let mut perf_decode_us: u64 = 0;
let mut perf_assemble_us: u64 = 0;
let mut perf_bandwidth_us: u64 = 0;
let mut perf_yield_us: u64 = 0;
let mut perf_flush_us: u64 = 0;
let mut perf_last_log = Instant::now();
const PERF_LOG_INTERVAL: Duration = Duration::from_secs(10);
loop {
if worker_shutdown.load(Ordering::Relaxed) || pool.shutdown.load(Ordering::Relaxed) {
requeue_all(&mut in_flight_items, &pool.work_queue);
return WorkerExit::Exit;
}
let server_disabled = pool
.servers
.lock()
.iter()
.find(|s| s.id == primary_server.id)
.is_none_or(|s| !s.enabled);
if server_disabled {
info!(
worker = %worker_id,
server = %primary_server.name,
"Server disabled, worker exiting"
);
requeue_all(&mut in_flight_items, &pool.work_queue);
return WorkerExit::Exit;
}
{
let health = pool.server_health.lock();
if let Some(h) = health.get(&primary_server.id)
&& !h.is_available()
{
info!(
worker = %worker_id,
server = %primary_server.name,
reason = h.reason.as_deref().unwrap_or("unknown"),
"Server circuit-broken, worker reconnecting after cooldown"
);
requeue_all(&mut in_flight_items, &pool.work_queue);
return WorkerExit::Reconnect;
}
}
let higher_priority_servers =
pool.higher_priority_servers(primary_server.priority, &primary_server.id);
while pipeline.pending_count() + pipeline.in_flight_count() < pipe_depth as usize {
let lock_t = Instant::now();
let item = pool
.work_queue
.pop_workable(&primary_server.id, &higher_priority_servers);
perf_queue_lock_us += lock_t.elapsed().as_micros() as u64;
let Some(item) = item else {
break;
};
let ctx = pool.job_contexts.lock().get(&item.job_id).cloned();
let Some(ctx) = ctx else {
continue;
};
if ctx.cancelled.load(Ordering::Relaxed) {
continue;
}
if ctx.paused.load(Ordering::Relaxed) {
pool.work_queue.push_back(item);
break;
}
let tag = next_tag;
next_tag += 1;
pipeline.submit(item.message_id.clone(), tag);
in_flight_items.insert(tag, item);
}
if pipeline.is_empty() && in_flight_items.is_empty() {
let Some((first_item, ctx)) = next_work_item(
pool,
&primary_server.id,
&higher_priority_servers,
worker_shutdown,
)
.await
else {
return WorkerExit::Exit;
};
let _ = ctx; let tag = next_tag;
next_tag += 1;
pipeline.submit(first_item.message_id.clone(), tag);
in_flight_items.insert(tag, first_item);
}
let flush_t = Instant::now();
if let Err(e) = pipeline.flush_sends(conn).await {
warn!(
worker = %worker_id,
server = %primary_server.name,
host = %primary_server.host,
error = %e,
in_flight = in_flight_items.len(),
"Pipeline send error — re-queuing all in-flight items"
);
requeue_all(&mut in_flight_items, &pool.work_queue);
consecutive_errors += 1;
if consecutive_errors > MAX_RECONNECT_ATTEMPTS {
warn!(
worker = %worker_id,
server = %primary_server.name,
consecutive_errors,
"Too many pipeline errors — worker reconnecting"
);
return WorkerExit::Reconnect;
}
tokio::time::sleep(RECONNECT_DELAY).await;
return WorkerExit::Reconnect;
}
perf_flush_us += flush_t.elapsed().as_micros() as u64;
let recv_t = Instant::now();
trace!(
worker = %worker_id,
in_flight = in_flight_items.len(),
stall_timeout_secs = pool.stall_timeout.map(|d| d.as_secs()).unwrap_or(0),
"Pipeline: awaiting response"
);
let result = if let Some(timeout) = pool.stall_timeout {
match tokio::time::timeout(timeout, pipeline.receive_one(conn)).await {
Ok(r) => r,
Err(_) => {
let elapsed_ms = recv_t.elapsed().as_millis();
warn!(
worker = %worker_id,
server = %primary_server.name,
elapsed_ms,
in_flight = in_flight_items.len(),
"Connection stalled — no response within {}s, reconnecting",
timeout.as_secs()
);
requeue_all(&mut in_flight_items, &pool.work_queue);
return WorkerExit::Reconnect;
}
}
} else {
pipeline.receive_one(conn).await
};
perf_receive_us += recv_t.elapsed().as_micros() as u64;
match result {
Ok(Some(pipe_result)) => {
let Some(mut item) = in_flight_items.remove(&pipe_result.request.tag) else {
continue;
};
let ctx = pool.job_contexts.lock().get(&item.job_id).cloned();
let Some(ctx) = ctx else {
continue;
};
if ctx.cancelled.load(Ordering::Relaxed) {
continue;
}
match pipe_result.result {
Ok(response) => {
consecutive_errors = 0;
let raw_data = response.data.unwrap_or_default();
let yield_t = Instant::now();
tokio::task::yield_now().await;
perf_yield_us += yield_t.elapsed().as_micros() as u64;
match decode_and_assemble(&item, &raw_data, &ctx.assembler) {
Ok(process_result) => {
perf_decode_us += process_result.decode_us;
perf_assemble_us += process_result.assemble_us;
perf_bytes += process_result.decoded_bytes;
perf_articles += 1;
ctx.total_decode_us
.fetch_add(process_result.decode_us, Ordering::Relaxed);
ctx.total_assemble_us
.fetch_add(process_result.assemble_us, Ordering::Relaxed);
ctx.total_articles_decoded.fetch_add(1, Ordering::Relaxed);
if let Some(ref yname) = process_result.yenc_filename {
ctx.yenc_names
.lock()
.entry(item.file_id.clone())
.or_insert_with(|| crate::util::normalize_nfc(yname));
}
let bw_t = Instant::now();
if let Some(n) =
std::num::NonZeroU32::new(process_result.decoded_bytes as u32)
{
let _ = pool.bandwidth.acquire_download(n).await;
}
perf_bandwidth_us += bw_t.elapsed().as_micros() as u64;
try_send_progress(
&ctx.progress_tx,
&item.job_id,
ProgressUpdate::ArticleComplete {
job_id: item.job_id.clone(),
file_id: item.file_id.clone(),
segment_number: item.segment_number,
decoded_bytes: process_result.decoded_bytes,
file_complete: process_result.file_complete,
server_id: Some(primary_server.id.clone()),
},
);
ctx.resolve_one();
last_progress.store(pool.elapsed_ms(), Ordering::Relaxed);
if perf_last_log.elapsed() >= PERF_LOG_INTERVAL {
let elapsed = perf_last_log.elapsed().as_secs_f64();
let mbps = perf_bytes as f64 / elapsed / (1024.0 * 1024.0);
info!(
worker = %worker_id,
articles = perf_articles,
throughput_mbps = format!("{mbps:.1}"),
recv_ms = perf_receive_us / 1000,
decode_ms = perf_decode_us / 1000,
assemble_ms = perf_assemble_us / 1000,
queue_lock_ms = perf_queue_lock_us / 1000,
flush_ms = perf_flush_us / 1000,
yield_ms = perf_yield_us / 1000,
bw_wait_ms = perf_bandwidth_us / 1000,
"Worker perf summary"
);
perf_articles = 0;
perf_bytes = 0;
perf_queue_lock_us = 0;
perf_receive_us = 0;
perf_decode_us = 0;
perf_assemble_us = 0;
perf_bandwidth_us = 0;
perf_yield_us = 0;
perf_flush_us = 0;
perf_last_log = Instant::now();
}
}
Err(ArticleError::DecodeError(msg)) => {
if handle_article_not_available(
&mut item,
primary_server,
&pool.servers,
&pool.server_health,
&ctx,
&pool.work_queue,
crate::article_failure::ArticleFailureKind::DecodeError,
&format!("Decode error: {msg}"),
) {
last_progress.store(pool.elapsed_ms(), Ordering::Relaxed);
}
}
Err(ArticleError::AssemblyError(msg)) => {
error!(article = %item.message_id, "Assembly error: {msg}");
try_send_progress(
&ctx.progress_tx,
&item.job_id,
ProgressUpdate::ArticleFailed {
job_id: item.job_id.clone(),
file_id: item.file_id.clone(),
segment_number: item.segment_number,
failure:
crate::article_failure::ArticleFailure::decode_error(
&primary_server.id,
format!("Assembly error: {msg}"),
),
},
);
ctx.articles_failed.fetch_add(1, Ordering::Relaxed);
ctx.resolve_one();
last_progress.store(pool.elapsed_ms(), Ordering::Relaxed);
}
Err(_) => {}
}
}
Err(NntpError::ArticleNotFound(_)) => {
if handle_article_not_available(
&mut item,
primary_server,
&pool.servers,
&pool.server_health,
&ctx,
&pool.work_queue,
crate::article_failure::ArticleFailureKind::NotFound,
"Article not found on any server",
) {
last_progress.store(pool.elapsed_ms(), Ordering::Relaxed);
}
}
Err(NntpError::Connection(_) | NntpError::Io(_)) => {
warn!(
worker = %worker_id,
server = %primary_server.name,
host = %primary_server.host,
article = %item.message_id,
in_flight = in_flight_items.len(),
consecutive_errors,
"Pipeline: connection lost during receive — re-queuing all"
);
pool.work_queue.push_front(item);
requeue_all(&mut in_flight_items, &pool.work_queue);
consecutive_errors += 1;
if consecutive_errors > MAX_RECONNECT_ATTEMPTS {
return WorkerExit::Reconnect;
}
tokio::time::sleep(RECONNECT_DELAY).await;
return WorkerExit::Reconnect;
}
Err(e) => {
warn!(worker = %worker_id, article = %item.message_id, "Pipeline error: {e}");
let kind = crate::article_failure::ArticleFailure::from_nntp(
&e,
&primary_server.id,
)
.kind;
if handle_article_not_available(
&mut item,
primary_server,
&pool.servers,
&pool.server_health,
&ctx,
&pool.work_queue,
kind,
&format!("Pipeline error: {e}"),
) {
last_progress.store(pool.elapsed_ms(), Ordering::Relaxed);
}
}
}
}
Ok(None) => {
}
Err(e) => {
warn!(
worker = %worker_id,
server = %primary_server.name,
host = %primary_server.host,
error = %e,
in_flight = in_flight_items.len(),
consecutive_errors,
"Pipeline receive error"
);
requeue_all(&mut in_flight_items, &pool.work_queue);
consecutive_errors += 1;
if consecutive_errors > MAX_RECONNECT_ATTEMPTS {
return WorkerExit::Reconnect;
}
tokio::time::sleep(RECONNECT_DELAY).await;
return WorkerExit::Reconnect;
}
}
}
}
/// Connects `conn` to `server`, making up to `MAX_RECONNECT_ATTEMPTS` attempts with
/// `RECONNECT_DELAY` between them.
///
/// Before each attempt the server's entry is re-read from `all_servers` by id, falling
/// back to `server` when it is no longer listed, so configuration changes made since
/// the worker started are picked up. Every outcome is recorded in `server_health`.
/// After a failed attempt that will be retried, `conn` is replaced by a fresh
/// `NntpConnection`.
///
/// # Errors
/// Returns a human-readable reason when:
/// - the server is already circuit-broken, or becomes so after a failure;
/// - the failure is `NntpError::Auth` or `NntpError::ServiceUnavailable` (not retried);
/// - every attempt fails.
async fn connect_with_retry(
    conn: &mut NntpConnection,
    server: &ServerConfig,
    worker_id: &str,
    server_health: &ServerHealthMap,
    all_servers: &Arc<Mutex<Vec<ServerConfig>>>,
) -> Result<(), String> {
    for attempt in 1..=MAX_RECONNECT_ATTEMPTS {
        {
            let health = server_health.lock();
            if let Some(h) = health.get(&server.id)
                && !h.is_available()
            {
                return Err(format!(
                    "Server circuit-broken: {}",
                    h.reason.as_deref().unwrap_or("unknown")
                ));
            }
        }
        let current_config = all_servers
            .lock()
            .iter()
            .find(|s| s.id == server.id)
            .cloned()
            .unwrap_or_else(|| server.clone());
        info!(
            worker = %worker_id,
            server = %current_config.name,
            host = %current_config.host,
            port = current_config.port,
            attempt,
            max_attempts = MAX_RECONNECT_ATTEMPTS,
            "Connect attempt starting"
        );
        // Pass the freshly re-read config by reference. (This argument had been
        // mangled into `¤t_config`, an HTML-entity decoding of `&curren`.)
        match conn.connect(&current_config).await {
            Ok(()) => {
                info!(
                    worker = %worker_id,
                    server = %current_config.name,
                    host = %current_config.host,
                    attempt,
                    "Connect attempt succeeded"
                );
                server_health
                    .lock()
                    .entry(server.id.clone())
                    .or_default()
                    .record_success();
                return Ok(());
            }
            Err(e) => {
                // Auth and 502 failures are not retried and go to the longer cooldown.
                let is_auth = matches!(e, NntpError::Auth(_) | NntpError::ServiceUnavailable(_));
                {
                    let mut health = server_health.lock();
                    let entry = health.entry(server.id.clone()).or_default();
                    entry.record_failure(is_auth, &e.to_string());
                    if !entry.is_available() {
                        warn!(
                            worker = %worker_id,
                            server = %current_config.name,
                            host = %current_config.host,
                            error = %e,
                            cooldown_secs = if is_auth { AUTH_FAILURE_COOLDOWN.as_secs() } else { TRANSIENT_FAILURE_COOLDOWN.as_secs() },
                            "Server circuit-broken — stopping all connection attempts"
                        );
                        return Err(format!("Server circuit-broken: {e}"));
                    }
                }
                warn!(
                    worker = %worker_id,
                    server = %current_config.name,
                    host = %current_config.host,
                    attempt,
                    max_attempts = MAX_RECONNECT_ATTEMPTS,
                    error = %e,
                    is_auth,
                    "Connect attempt FAILED: {e}"
                );
                if is_auth {
                    return Err(format!("Auth/permission failure: {e}"));
                }
                if attempt < MAX_RECONNECT_ATTEMPTS {
                    info!(
                        worker = %worker_id,
                        server = %current_config.name,
                        delay_secs = RECONNECT_DELAY.as_secs(),
                        "Waiting before retry"
                    );
                    tokio::time::sleep(RECONNECT_DELAY).await;
                    *conn = NntpConnection::new(worker_id.to_string());
                } else {
                    return Err(format!(
                        "All {MAX_RECONNECT_ATTEMPTS} connect attempts failed: {e}"
                    ));
                }
            }
        }
    }
    // Reached only if MAX_RECONNECT_ATTEMPTS is 0.
    Err("Connect retry loop exited unexpectedly".into())
}
/// Records that `primary_server` could not deliver `item` and decides what happens
/// to it next.
///
/// The server is appended to `item.tried_servers` and `tries_on_current` is reset.
/// An enabled server remains a candidate only if it has not been tried for this
/// article and is not circuit-broken.
///
/// - If no candidate remains, the article is given up on:
///   - an `ArticleFailed` progress update is sent;
///   - `articles_failed` is incremented;
///   - one outstanding article is resolved on the job;
///   - `true` is returned.
/// - Otherwise a copy of the item is pushed to the front of `work_queue` and `false`
///   is returned.
///
/// `kind` selects the final failure. `DecodeError` yields a decode-error failure
/// carrying `error_msg`; any other kind is reported as "not found anywhere".
#[allow(clippy::too_many_arguments)]
fn handle_article_not_available(
    item: &mut WorkItem,
    primary_server: &ServerConfig,
    all_servers: &Arc<Mutex<Vec<ServerConfig>>>,
    server_health: &ServerHealthMap,
    ctx: &Arc<JobContext>,
    work_queue: &Arc<SharedWorkQueue>,
    kind: crate::article_failure::ArticleFailureKind,
    error_msg: &str,
) -> bool {
    item.tried_servers.push(primary_server.id.clone());
    item.tries_on_current = 0;

    // Lock order: server list first, then health map.
    let exhausted = {
        let configs = all_servers.lock();
        let health_map = server_health.lock();
        let tried = &item.tried_servers;
        let has_candidate = configs.iter().filter(|cfg| cfg.enabled).any(|cfg| {
            !tried.contains(&cfg.id)
                && health_map.get(&cfg.id).is_none_or(|h| h.is_available())
        });
        !has_candidate
    };

    debug!(
        article = %item.message_id,
        server = %primary_server.id,
        kind = kind.as_str(),
        tried_count = item.tried_servers.len(),
        all_tried = exhausted,
        "Article returned error on this server"
    );

    if !exhausted {
        // Another server can still try it; put it back at the head of the queue.
        work_queue.push_front(item.clone());
        return false;
    }

    warn!(article = %item.message_id, kind = kind.as_str(), "{error_msg}");
    let is_decode_failure = kind == crate::article_failure::ArticleFailureKind::DecodeError;
    let failure = if is_decode_failure {
        crate::article_failure::ArticleFailure::decode_error(
            &primary_server.id,
            error_msg.to_string(),
        )
    } else {
        crate::article_failure::ArticleFailure::not_found_anywhere(&primary_server.id)
    };
    let update = ProgressUpdate::ArticleFailed {
        job_id: item.job_id.clone(),
        file_id: item.file_id.clone(),
        segment_number: item.segment_number,
        failure,
    };
    try_send_progress(&ctx.progress_tx, &item.job_id, update);
    ctx.articles_failed.fetch_add(1, Ordering::Relaxed);
    ctx.resolve_one();
    true
}
/// Moves every in-flight item back to the shared work queue, leaving `in_flight`
/// empty.
///
/// Items are pushed in descending tag order. Tags are assigned in submission order,
/// and `push_front` presumably prepends, so the earliest-issued request ends up at
/// the head of the queue. Otherwise the order would come from `HashMap` iteration,
/// which is arbitrary.
fn requeue_all(in_flight: &mut HashMap<u64, WorkItem>, work_queue: &Arc<SharedWorkQueue>) {
    let mut items: Vec<(u64, WorkItem)> = in_flight.drain().collect();
    items.sort_unstable_by_key(|&(tag, _)| std::cmp::Reverse(tag));
    for (_, item) in items {
        work_queue.push_front(item);
    }
}
/// Sort key that puts PAR2 files ahead of everything else:
/// `0` = PAR2 index file, `1` = PAR2 recovery volume, `2` = any other file.
///
/// The comparison is case-insensitive. A `.par2` file counts as a recovery volume
/// only when the part before `.par2` ends in a `.volA+B` or `.volA-B` block, where A
/// and B are decimal digits. A `.vol` elsewhere in the name does not count, so
/// `Anthology.Vol.1.par2` is treated as an index file.
fn par2_sort_key(filename: &str) -> u8 {
    let lower = filename.to_lowercase();
    match lower.strip_suffix(".par2") {
        None => 2,
        Some(stem) if is_par2_volume_stem(stem) => 1,
        Some(_) => 0,
    }
}

/// `true` when `stem` (a lowercase PAR2 filename with `.par2` removed) ends with a
/// `.vol<digits>+<digits>` or `.vol<digits>-<digits>` block.
fn is_par2_volume_stem(stem: &str) -> bool {
    let Some(pos) = stem.rfind(".vol") else {
        return false;
    };
    let block = &stem[pos + ".vol".len()..];
    let Some((first, count)) = block.split_once(|c: char| c == '+' || c == '-') else {
        return false;
    };
    let all_digits = |s: &str| !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit());
    all_digits(first) && all_digits(count)
}
/// Returns `true` when `name` ends in a recognised file extension. The recognised
/// groups are:
/// - archives and split-archive volumes;
/// - video, audio, subtitles and images;
/// - `.nfo` and `.par2`.
///
/// Matching is case-insensitive and looks only at the text after the last `.`.
/// Names without a `.` are never recognised.
///
/// Numbered volume extensions are matched by shape rather than listed one by one:
/// `rNN` (old-style RAR volumes `r00`–`r99`) and `NNN` (split archives `000`–`999`).
/// This way every volume of a large set is classified the same way.
fn has_known_extension(name: &str) -> bool {
    let lower = name.to_lowercase();
    let Some((_, ext)) = lower.rsplit_once('.') else {
        return false;
    };
    is_numbered_volume_ext(ext)
        || matches!(
            ext,
            "rar"
                | "zip"
                | "7z"
                | "gz"
                | "bz2"
                | "xz"
                | "tar"
                | "mkv"
                | "mp4"
                | "avi"
                | "wmv"
                | "ts"
                | "m4v"
                | "mov"
                | "mpg"
                | "mpeg"
                | "mp3"
                | "flac"
                | "ogg"
                | "m4a"
                | "aac"
                | "wav"
                | "srt"
                | "sub"
                | "idx"
                | "ass"
                | "ssa"
                | "sup"
                | "nfo"
                | "jpg"
                | "jpeg"
                | "png"
                | "gif"
                | "bmp"
                | "par2"
        )
}

/// `true` for three-character `rNN` or `NNN` extensions (N = ASCII digit), as used
/// by numbered RAR volumes and split archives.
fn is_numbered_volume_ext(ext: &str) -> bool {
    match ext.as_bytes() {
        [lead, d1, d2] => {
            (*lead == b'r' || lead.is_ascii_digit()) && d1.is_ascii_digit() && d2.is_ascii_digit()
        }
        _ => false,
    }
}
/// Prepares `job` for the worker pool.
///
/// Every file is registered with a new shared `FileAssembler` at
/// `job.work_dir/<filename>`, sized to its article count. A failed registration is
/// logged, and that file's articles are still queued.
///
/// Each article not yet marked `downloaded` becomes a `WorkItem`. The segment number
/// is the article's 1-based position in `file.articles`, and the item starts with no
/// tried servers.
///
/// Returns the job context, created with the number of queued items as its
/// outstanding count, together with the items themselves.
pub(crate) fn build_job_submission(
    job: &NzbJob,
    progress_tx: mpsc::Sender<ProgressUpdate>,
) -> (Arc<JobContext>, Vec<WorkItem>) {
    let assembler = Arc::new(FileAssembler::new());
    let mut work_items: Vec<WorkItem> = Vec::new();

    for file in &job.files {
        // NOTE(review): `file.filename` is joined onto `work_dir` as-is. If NZB
        // filenames are not sanitized upstream, a name containing `..` or an absolute
        // path could resolve outside the job directory. Confirm.
        let destination = job.work_dir.join(&file.filename);
        let segment_count = file.articles.len() as u32;
        match assembler.register_file(&job.id, &file.id, destination, segment_count) {
            Ok(_) => {}
            Err(e) => {
                error!(file = %file.filename, "Failed to register file for assembly: {e}");
            }
        }

        for (idx, article) in file.articles.iter().enumerate() {
            if article.downloaded {
                continue;
            }
            work_items.push(WorkItem {
                job_id: job.id.clone(),
                file_id: file.id.clone(),
                filename: file.filename.clone(),
                message_id: article.message_id.clone(),
                segment_number: (idx as u32) + 1,
                tried_servers: Vec::new(),
                tries_on_current: 0,
            });
        }
    }

    let outstanding = work_items.len();
    let ctx = Arc::new(JobContext::new(job, assembler, progress_tx, outstanding));
    (ctx, work_items)
}
async fn fetch_article_with_retry(
conn: &mut NntpConnection,
item: &WorkItem,
assembler: &FileAssembler,
_server: &ServerConfig,
worker_id: &str,
) -> Result<ProcessResult, ArticleError> {
let mut last_error = None;
for attempt in 1..=MAX_TRIES_PER_SERVER {
let fetch_start = Instant::now();
match conn.fetch_article(&item.message_id).await {
Ok(response) => {
let fetch_us = fetch_start.elapsed().as_micros();
let raw_data = response.data.unwrap_or_default();
debug!(
worker = %worker_id,
article = %item.message_id,
raw_bytes = raw_data.len(),
fetch_us,
"NNTP fetch complete"
);
return decode_and_assemble(item, &raw_data, assembler);
}
Err(NntpError::ArticleNotFound(_)) => {
debug!(
worker = %worker_id,
article = %item.message_id,
"Article not found (430) — will try next server"
);
return Err(ArticleError::ArticleNotFound);
}
Err(e @ (NntpError::Connection(_) | NntpError::Io(_))) => {
warn!(
worker = %worker_id,
article = %item.message_id,
attempt,
error = %e,
conn_state = ?conn.state,
"Connection/IO error during fetch — connection lost"
);
return Err(ArticleError::ConnectionLost(format!(
"Connection error on attempt {attempt}: {e}"
)));
}
Err(e @ NntpError::Tls(_)) => {
warn!(
worker = %worker_id,
article = %item.message_id,
attempt,
error = %e,
"TLS error during fetch — connection lost"
);
return Err(ArticleError::ConnectionLost(format!("TLS error: {e}")));
}
Err(e @ NntpError::ServiceUnavailable(_)) => {
warn!(
worker = %worker_id,
article = %item.message_id,
attempt,
error = %e,
"Service unavailable (502) during article fetch — likely rate limited or blocked"
);
return Err(ArticleError::ConnectionLost(format!(
"Service unavailable: {e}"
)));
}
Err(e @ NntpError::AuthRequired(_)) => {
warn!(
worker = %worker_id,
article = %item.message_id,
attempt,
error = %e,
"Auth required (480) during article fetch — session expired or rate limited"
);
return Err(ArticleError::ConnectionLost(format!(
"Auth required mid-session: {e}"
)));
}
Err(e) => {
last_error = Some(format!("{e}"));
if attempt < MAX_TRIES_PER_SERVER {
warn!(
worker = %worker_id,
article = %item.message_id,
attempt,
max_tries = MAX_TRIES_PER_SERVER,
error = %e,
"Transient fetch error, retrying in 500ms"
);
tokio::time::sleep(Duration::from_millis(500)).await;
} else {
warn!(
worker = %worker_id,
article = %item.message_id,
attempt,
error = %e,
"All retries on this server exhausted"
);
}
}
}
}
Err(ArticleError::DecodeError(
last_error.unwrap_or_else(|| "Unknown error after retries".into()),
))
}
/// Outcome of successfully decoding and assembling one article (built by
/// `decode_and_assemble`).
#[derive(Debug)]
struct ProcessResult {
    /// Length of the yEnc-decoded payload in bytes.
    decoded_bytes: u64,
    /// The value returned by `FileAssembler::assemble_article` for this segment.
    /// Callers forward it as `file_complete` in progress updates.
    file_complete: bool,
    /// Filename from the article's yEnc header, when present.
    yenc_filename: Option<String>,
    /// Microseconds spent in `decode_yenc`.
    decode_us: u64,
    /// Microseconds spent in `assemble_article`.
    assemble_us: u64,
}
/// Why fetching or processing a single article failed.
#[derive(Debug, thiserror::Error)]
enum ArticleError {
    /// The server answered "article not found" (NNTP 430); another server may have it.
    #[error("Article not found on server")]
    ArticleNotFound,
    /// The connection or session is unusable. Raised for connection/IO/TLS errors and
    /// for 502 and 480 responses in `fetch_article_with_retry`.
    #[error("Connection lost: {0}")]
    ConnectionLost(String),
    /// yEnc decoding failed. `fetch_article_with_retry` also uses this variant to
    /// report the last error after exhausting retries on other fetch errors.
    #[error("Decode error: {0}")]
    DecodeError(String),
    /// The decoded data could not be written through the `FileAssembler`.
    #[error("Assembly error: {0}")]
    AssemblyError(String),
}
/// Decodes one raw yEnc article and writes its payload into the job's file via
/// `assembler`.
///
/// The write offset is the yEnc part's `part_begin`, or 0 when the article carries
/// none. NOTE(review): whether `part_begin` is 0- or 1-based depends on
/// `decode_yenc`; confirm it agrees with `assemble_article`.
///
/// # Errors
/// - `ArticleError::DecodeError` if `decode_yenc` fails.
/// - `ArticleError::AssemblyError` if `assemble_article` fails.
///
/// Both messages name the file and segment.
fn decode_and_assemble(
    item: &WorkItem,
    raw_data: &[u8],
    assembler: &FileAssembler,
) -> Result<ProcessResult, ArticleError> {
    let decode_timer = Instant::now();
    let decoded = match decode_yenc(raw_data) {
        Ok(decoded) => decoded,
        Err(e) => {
            return Err(ArticleError::DecodeError(format!(
                "yEnc decode failed for {} seg {}: {e}",
                item.filename, item.segment_number
            )));
        }
    };
    let decode_us = decode_timer.elapsed().as_micros();

    let offset = decoded.part_begin.unwrap_or(0);
    let payload_len = decoded.data.len() as u64;
    let header_name = decoded.filename;

    let assemble_timer = Instant::now();
    let assembled = assembler.assemble_article(
        &item.job_id,
        &item.file_id,
        item.segment_number,
        offset,
        &decoded.data,
    );
    let file_complete = match assembled {
        Ok(done) => done,
        Err(e) => {
            return Err(ArticleError::AssemblyError(format!(
                "Assembly failed for {} seg {}: {e}",
                item.filename, item.segment_number
            )));
        }
    };
    let assemble_us = assemble_timer.elapsed().as_micros();

    debug!(
        file = %item.filename,
        segment = item.segment_number,
        raw_bytes = raw_data.len(),
        decoded_bytes = payload_len,
        decode_us,
        assemble_us,
        "Article decode+assemble timing"
    );

    Ok(ProcessResult {
        decoded_bytes: payload_len,
        file_complete,
        yenc_filename: header_name,
        decode_us: decode_us as u64,
        assemble_us: assemble_us as u64,
    })
}
/// Unit tests for extension detection, the shared work queue's ordering and
/// eligibility rules, and the per-server connection tracker.
#[cfg(test)]
mod tests {
    use super::*;

    // --- has_known_extension ---

    #[test]
    fn has_known_extension_recognizes_archives() {
        assert!(has_known_extension("movie.rar"));
        // Only the text after the last dot counts.
        assert!(has_known_extension("movie.part01.rar"));
        assert!(has_known_extension("file.zip"));
        assert!(has_known_extension("file.7z"));
        assert!(has_known_extension("archive.001"));
    }
    #[test]
    fn has_known_extension_recognizes_video() {
        assert!(has_known_extension("episode.mkv"));
        assert!(has_known_extension("movie.mp4"));
        assert!(has_known_extension("video.avi"));
        assert!(has_known_extension("clip.ts"));
    }
    #[test]
    fn has_known_extension_recognizes_par2() {
        assert!(has_known_extension("file.par2"));
        assert!(has_known_extension("file.vol00+01.par2"));
        assert!(has_known_extension("file.vol015-031.par2"));
    }
    #[test]
    fn has_known_extension_recognizes_misc() {
        assert!(has_known_extension("info.nfo"));
        assert!(has_known_extension("sub.srt"));
        assert!(has_known_extension("cover.jpg"));
        assert!(has_known_extension("song.flac"));
    }
    #[test]
    fn has_known_extension_rejects_obfuscated_hashes() {
        // Hash-like names with no dot at all must not be treated as known.
        assert!(!has_known_extension("9b6a324d7560b87091685020371ba869"));
        assert!(!has_known_extension("1fG1GP7L2263LHXH213HTNIxZsX7l0cv44BZ"));
        assert!(!has_known_extension("DfKUx3bl7L6PSo6276WSaXSZ7"));
        assert!(!has_known_extension("Q77O1ZxL237vc241z77hFoLBxl"));
    }
    #[test]
    fn has_known_extension_rejects_unknown_extensions() {
        assert!(!has_known_extension("file.xyz123"));
        assert!(!has_known_extension("noext"));
        assert!(!has_known_extension(""));
    }
    #[test]
    fn has_known_extension_case_insensitive() {
        assert!(has_known_extension("file.RAR"));
        assert!(has_known_extension("file.MKV"));
        assert!(has_known_extension("file.Par2"));
        assert!(has_known_extension("file.MP4"));
    }

    // --- SharedWorkQueue ---

    /// Builds a fresh work item for `file_id` "f1", segment 1, with no tried servers.
    fn make_item(job_id: &str, msg_id: &str, filename: &str) -> WorkItem {
        WorkItem {
            job_id: job_id.to_string(),
            file_id: "f1".to_string(),
            filename: filename.to_string(),
            message_id: msg_id.to_string(),
            segment_number: 1,
            tried_servers: Vec::new(),
            tries_on_current: 0,
        }
    }
    #[test]
    fn shared_queue_par2_first() {
        // The PAR2 index is served before PAR2 volumes, even when submitted later
        // than other files.
        let q = SharedWorkQueue::new();
        q.submit_items(vec![
            make_item("j1", "a", "movie.rar"),
            make_item("j1", "b", "movie.par2"),
            make_item("j1", "c", "movie.vol00+01.par2"),
            make_item("j1", "d", "movie.r00"),
        ]);
        let first = q.pop_workable("srv1", &[]).unwrap();
        assert_eq!(first.filename, "movie.par2", "index file first");
        let second = q.pop_workable("srv1", &[]).unwrap();
        assert_eq!(second.filename, "movie.vol00+01.par2", "vol file second");
    }
    #[test]
    fn shared_queue_skips_tried_servers() {
        // An item already tried on srv1 is skipped in favour of a later item.
        let q = SharedWorkQueue::new();
        let mut item = make_item("j1", "a", "file.rar");
        item.tried_servers.push("srv1".to_string());
        q.submit_items(vec![item, make_item("j1", "b", "other.rar")]);
        let picked = q.pop_workable("srv1", &[]).unwrap();
        assert_eq!(picked.message_id, "b");
    }
    #[test]
    fn pop_workable_respects_priority() {
        // A backup server gets nothing while a higher-priority server has not yet
        // tried the item; the primary itself can take it.
        let q = SharedWorkQueue::new();
        q.submit_items(vec![make_item("j1", "a", "file.rar")]);
        let higher = vec!["srv_primary".to_string()];
        assert!(q.pop_workable("srv_backup", &higher).is_none());
        let item = q.pop_workable("srv_primary", &[]).unwrap();
        assert_eq!(item.message_id, "a");
    }
    #[test]
    fn pop_workable_allows_backup_after_primary_tried() {
        let q = SharedWorkQueue::new();
        let mut item = make_item("j1", "a", "file.rar");
        item.tried_servers.push("srv_primary".to_string());
        q.submit_items(vec![item]);
        let higher = vec!["srv_primary".to_string()];
        let picked = q.pop_workable("srv_backup", &higher).unwrap();
        assert_eq!(picked.message_id, "a");
    }
    #[test]
    fn pop_workable_ignores_circuit_broken_higher_server() {
        // Presumably the caller omits circuit-broken servers from the
        // higher-priority list; an empty list models that, so the backup may proceed.
        let q = SharedWorkQueue::new();
        q.submit_items(vec![make_item("j1", "a", "file.rar")]);
        let higher: Vec<String> = vec![];
        let item = q.pop_workable("srv_backup", &higher).unwrap();
        assert_eq!(item.message_id, "a");
    }
    #[test]
    fn workable_count_for_respects_priority() {
        // Returns (workable for this server, total queued).
        let q = SharedWorkQueue::new();
        q.submit_items(vec![
            make_item("j1", "a", "a.rar"),
            make_item("j1", "b", "b.rar"),
        ]);
        let higher = vec!["srv_primary".to_string()];
        let (workable, total) = q.workable_count_for("srv_backup", &higher);
        assert_eq!(workable, 0);
        assert_eq!(total, 2);
        let (workable, total) = q.workable_count_for("srv_primary", &[]);
        assert_eq!(workable, 2);
        assert_eq!(total, 2);
    }
    #[test]
    fn shared_queue_drain_job_removes_only_target() {
        let q = SharedWorkQueue::new();
        q.submit_items(vec![
            make_item("j1", "a", "a.rar"),
            make_item("j2", "b", "b.rar"),
            make_item("j1", "c", "c.rar"),
        ]);
        let drained = q.drain_job("j1");
        assert_eq!(drained.len(), 2);
        assert_eq!(q.len(), 1);
        let remaining = q.pop_workable("srv1", &[]).unwrap();
        assert_eq!(remaining.job_id, "j2");
    }
    #[test]
    fn pop_workable_alternates_between_jobs_on_single_server() {
        // Fairness: with two jobs queued back-to-back, a single server's pops
        // interleave them.
        let q = SharedWorkQueue::new();
        q.submit_items(vec![
            make_item("j1", "a1", "a1.rar"),
            make_item("j1", "a2", "a2.rar"),
            make_item("j1", "a3", "a3.rar"),
            make_item("j2", "b1", "b1.rar"),
            make_item("j2", "b2", "b2.rar"),
            make_item("j2", "b3", "b3.rar"),
        ]);
        let mut order: Vec<String> = Vec::new();
        while let Some(item) = q.pop_workable("srv1", &[]) {
            order.push(item.job_id);
        }
        assert_eq!(
            order,
            vec!["j1", "j2", "j1", "j2", "j1", "j2"],
            "single-server pops must alternate across jobs, not drain one"
        );
    }
    #[test]
    fn pop_workable_falls_back_when_only_same_job_is_available() {
        let q = SharedWorkQueue::new();
        q.submit_items(vec![
            make_item("j1", "a1", "a.rar"),
            make_item("j1", "a2", "b.rar"),
        ]);
        let first = q.pop_workable("srv1", &[]).unwrap();
        assert_eq!(first.job_id, "j1");
        let second = q.pop_workable("srv1", &[]).unwrap();
        assert_eq!(
            second.job_id, "j1",
            "falls back to same job when no sibling"
        );
    }
    #[test]
    fn per_server_cursors_are_independent() {
        // Each server keeps its own "last served job": srv_y starts with j1 even
        // though srv_x has just served j2.
        let q = SharedWorkQueue::new();
        q.submit_items(vec![
            make_item("j1", "a1", "a1.rar"),
            make_item("j2", "b1", "b1.rar"),
            make_item("j1", "a2", "a2.rar"),
            make_item("j2", "b2", "b2.rar"),
        ]);
        let x1 = q.pop_workable("srv_x", &[]).unwrap();
        assert_eq!(x1.job_id, "j1");
        let x2 = q.pop_workable("srv_x", &[]).unwrap();
        assert_eq!(x2.job_id, "j2");
        let y1 = q.pop_workable("srv_y", &[]).unwrap();
        assert_eq!(
            y1.job_id, "j1",
            "srv_y has its own cursor state; srv_x's j2 cursor must not leak"
        );
    }
    #[test]
    fn fairness_respects_tried_servers_and_priority() {
        // Alternation never overrides eligibility: j2's only item was already tried
        // on srv1, so srv1 gets nothing after j1.
        let q = SharedWorkQueue::new();
        let mut j2_tried = make_item("j2", "b1", "b1.rar");
        j2_tried.tried_servers.push("srv1".to_string());
        q.submit_items(vec![make_item("j1", "a1", "a1.rar"), j2_tried]);
        let first = q.pop_workable("srv1", &[]).unwrap();
        assert_eq!(first.job_id, "j1");
        assert!(
            q.pop_workable("srv1", &[]).is_none(),
            "must not serve an ineligible item just to satisfy fairness"
        );
    }
    #[test]
    fn drained_jobs_clear_last_served_cursor() {
        let q = SharedWorkQueue::new();
        q.submit_items(vec![
            make_item("j1", "a1", "a1.rar"),
            make_item("j2", "b1", "b1.rar"),
        ]);
        let _ = q.pop_workable("srv1", &[]).unwrap();
        q.drain_job("j1");
        let pick = q.pop_workable("srv1", &[]).unwrap();
        assert_eq!(pick.job_id, "j2");
    }

    // --- ConnectionTracker ---

    #[tokio::test]
    async fn connection_tracker_acquire_releases_slot_on_drop() {
        // total() counts held slots; dropping a slot releases it.
        let t = ConnectionTracker::new();
        t.set_limit("srv1", "Server 1", 2);
        let s1 = t.acquire("srv1").await.unwrap();
        let s2 = t.acquire("srv1").await.unwrap();
        assert_eq!(t.total(), 2);
        drop(s1);
        assert_eq!(t.total(), 1);
        drop(s2);
        assert_eq!(t.total(), 0);
    }
    #[tokio::test]
    async fn connection_tracker_blocks_at_limit() {
        // At the limit, acquire waits rather than failing; the timeout proves it
        // has not completed within 150 ms.
        let t = Arc::new(ConnectionTracker::new());
        t.set_limit("srv1", "Server 1", 1);
        let _held = t.acquire("srv1").await.unwrap();
        let t2 = Arc::clone(&t);
        let res = tokio::time::timeout(Duration::from_millis(150), async move {
            t2.acquire("srv1").await
        })
        .await;
        assert!(
            res.is_err(),
            "second acquire should block while limit is reached"
        );
    }
    #[tokio::test]
    async fn connection_tracker_grow_in_place_lets_more_acquire() {
        // Raising the limit keeps existing slots and admits new ones immediately.
        let t = ConnectionTracker::new();
        t.set_limit("srv1", "Server 1", 2);
        let _a = t.acquire("srv1").await.unwrap();
        let _b = t.acquire("srv1").await.unwrap();
        t.set_limit("srv1", "Server 1", 4);
        let _c = t.acquire("srv1").await.unwrap();
        let _d = t.acquire("srv1").await.unwrap();
        assert_eq!(t.total(), 4);
    }
    #[tokio::test]
    async fn connection_tracker_shrink_marks_old_slots_stale() {
        // Lowering the limit makes previously acquired slots stale; they stop
        // counting toward total(), and new acquires count against the new limit.
        let t = ConnectionTracker::new();
        t.set_limit("srv1", "Server 1", 4);
        let s = t.acquire("srv1").await.unwrap();
        assert!(t.slot_is_current(&s));
        t.set_limit("srv1", "Server 1", 1);
        assert!(
            !t.slot_is_current(&s),
            "after shrink, the previously-acquired slot must be marked stale"
        );
        assert_eq!(t.total(), 0);
        let new_slot = t.acquire("srv1").await.unwrap();
        assert!(t.slot_is_current(&new_slot));
        assert_eq!(t.total(), 1);
    }
    #[tokio::test]
    async fn connection_tracker_remove_server_marks_slot_stale() {
        let t = ConnectionTracker::new();
        t.set_limit("srv1", "Server 1", 2);
        let s = t.acquire("srv1").await.unwrap();
        assert!(t.slot_is_current(&s));
        t.remove_server("srv1");
        assert!(
            !t.slot_is_current(&s),
            "after remove_server, the slot must be marked stale"
        );
        assert_eq!(t.total(), 0);
    }
    #[tokio::test]
    async fn connection_tracker_snapshot_reflects_active_count() {
        // snapshot() yields (server_id, active, limit) per server, in no guaranteed
        // order, hence the sort.
        let t = ConnectionTracker::new();
        t.set_limit("srv1", "Server 1", 3);
        t.set_limit("srv2", "Server 2", 5);
        let _a1 = t.acquire("srv1").await.unwrap();
        let _a2 = t.acquire("srv1").await.unwrap();
        let _b1 = t.acquire("srv2").await.unwrap();
        let mut snap = t.snapshot();
        snap.sort_by(|a, b| a.0.cmp(&b.0));
        assert_eq!(snap.len(), 2);
        assert_eq!(snap[0], ("srv1".into(), 2, 3));
        assert_eq!(snap[1], ("srv2".into(), 1, 5));
    }
}