Skip to main content

nzb_dispatch/
download_engine.rs

1//! Download engine — shared NNTP worker pool that services all active jobs.
2//!
3//! Architecture:
4//! - A single long-lived [`WorkerPool`] is owned by [`crate::queue_manager::QueueManager`].
5//! - For each enabled server, exactly `server.connections` workers are
6//!   spawned and live as long as the server stays enabled. When the server
7//!   list or per-server connection limit changes, the pool reconciles.
8//! - Jobs register a [`JobContext`] and push their work items into a
9//!   [`SharedWorkQueue`]. Workers pop items tagged with `job_id` and look up
10//!   per-job state (assembler, progress sink, pause/cancel flags) via the
11//!   shared [`JobContextMap`].
12//! - Pause / cancel / completion are per-job flags; workers themselves are
13//!   never torn down on job transitions. Pausing a job causes workers holding
14//!   one of its items to return that item to the queue and pull something
15//!   else. Cancelling a job drains its items and drops in-flight results.
16//! - A supervisor task detects "all enabled servers circuit-broken for a
17//!   given job" and emits [`ProgressUpdate::NoServersAvailable`] so the user
18//!   can fix config and resume, matching the prior per-engine behaviour.
19//!
20//! Retry logic (per article):
21//! 1. Try the article on the current server up to [`MAX_TRIES_PER_SERVER`]
22//!    times, reconnecting on transient errors.
23//! 2. On `ArticleNotFound` (430) — requeue with the current server added to
24//!    `tried_servers`; another worker on a different server picks it up.
25//! 3. On connection loss — requeue and reconnect.
26//! 4. On decode error — treated like "not available on this server", try
27//!    another.
28//! 5. When every enabled server is in `tried_servers` (or circuit-broken),
29//!    the article is marked failed.
30//! 6. A job only fails if failed articles exceed the threshold and no par2
31//!    recovery is possible.
32
33use std::collections::{HashMap, VecDeque};
34use std::path::PathBuf;
35use std::sync::Arc;
36use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
37use std::time::{Duration, Instant};
38
39use parking_lot::Mutex;
40use tokio::sync::{Notify, mpsc};
41use tokio::task::JoinHandle;
42use tracing::{debug, error, info, trace, warn};
43
44use nzb_core::config::ServerConfig;
45use nzb_core::models::NzbJob;
46use nzb_decode::FileAssembler;
47use nzb_decode::yenc::decode_yenc;
48use nzb_nntp::Pipeline;
49use nzb_nntp::connection::NntpConnection;
50use nzb_nntp::error::NntpError;
51
52use crate::bandwidth::BandwidthLimiter;
53
54// ---------------------------------------------------------------------------
55// Constants
56// ---------------------------------------------------------------------------
57
/// Max times to retry an article on the SAME server before trying the next.
const MAX_TRIES_PER_SERVER: u32 = 3;
/// Delay between reconnection attempts.
const RECONNECT_DELAY: Duration = Duration::from_secs(5);
/// Max reconnect attempts before giving up on a server for this session.
const MAX_RECONNECT_ATTEMPTS: u32 = 5;
/// Stagger delay between worker initial connections to avoid a thundering
/// herd. Each worker waits `conn_idx * WORKER_RAMP_DELAY` before its first
/// connect.
const WORKER_RAMP_DELAY: Duration = Duration::from_millis(15);
/// Consecutive connect failures before circuit-breaking a server.
const CIRCUIT_BREAK_THRESHOLD: u32 = 3;
/// Cooldown after auth/permission failure (bad credentials, 502, account blocked).
const AUTH_FAILURE_COOLDOWN: Duration = Duration::from_secs(120);
/// Cooldown after transient connection failures exceed threshold.
const TRANSIENT_FAILURE_COOLDOWN: Duration = Duration::from_secs(30);
/// Supervisor tick interval for detecting stuck jobs.
const SUPERVISOR_INTERVAL: Duration = Duration::from_secs(1);

/// Default maximum idle time before a worker is evicted by the supervisor.
/// Tunable per pool via [`WorkerPool::set_max_worker_idle`]. Production code
/// uses the default; the test harness shrinks this to make Phase 5 tests
/// converge in seconds rather than minutes.
const DEFAULT_MAX_WORKER_IDLE: Duration = Duration::from_secs(60);
/// Worker idle poll interval when the shared queue is empty.
const WORKER_IDLE_POLL: Duration = Duration::from_millis(500);

/// Phase 7: capacity of the per-job progress channel. The handler drains
/// the channel at roughly the rate articles complete; under DB-lock
/// contention or post-processing pauses it can fall behind. With this cap,
/// the worst case is bounded buffering plus a `WARN` from
/// [`try_send_progress`] (and a dropped update) when the channel is full.
pub const PROGRESS_CHANNEL_CAPACITY: usize = 10_000;
90
91/// Send a progress update to the per-job channel. On `Full` (handler
92/// backpressure), log a warning and drop the update — the alternative is
93/// awaiting and stalling the worker, which would cascade into the entire
94/// download pipeline. On `Closed` (handler shut down), drop silently.
95///
96/// Workers should call this through their `JobContext` rather than
97/// touching `progress_tx` directly.
98fn try_send_progress(tx: &mpsc::Sender<ProgressUpdate>, job_id: &str, update: ProgressUpdate) {
99    if let Err(e) = tx.try_send(update) {
100        match e {
101            mpsc::error::TrySendError::Full(_) => {
102                warn!(
103                    job_id,
104                    capacity = PROGRESS_CHANNEL_CAPACITY,
105                    "Progress channel full — dropping update (handler backpressure)"
106                );
107            }
108            mpsc::error::TrySendError::Closed(_) => {
109                // Handler has shut down. Workers can't recover this state;
110                // dropping is correct.
111            }
112        }
113    }
114}
115
116// ---------------------------------------------------------------------------
117// Global connection tracking — semaphore-backed permit pools
118// ---------------------------------------------------------------------------
119
120/// Tracks the per-server NNTP connection budget. Each server has a
121/// `tokio::sync::Semaphore` whose initial permit count is the configured
122/// `connections` limit. Workers acquire a permit *before* connecting and
123/// hold it for their entire lifetime; the permit's `Drop` releases the slot
124/// synchronously back to the pool. This makes over-allocation a type-level
125/// impossibility — once `limit` permits are out, the next acquire awaits.
126///
127/// **Limit changes at runtime** are handled differently for grow vs shrink:
128///
129/// - **Grow** (e.g. 5 → 10): the existing semaphore is given the additional
130///   permits via `add_permits`; existing slot holders are unaffected.
131/// - **Shrink** (e.g. 10 → 5): the entire `ServerSlot` is replaced with a
132///   fresh one. Old permit holders continue to reference the orphaned
133///   semaphore via their `Arc`; their drops release back to it (a no-op
134///   since nothing else points at it). Workers detect the replacement via
135///   [`ConnectionTracker::slot_is_current`] and exit on the next iteration.
136pub struct ConnectionTracker {
137    pools: Mutex<HashMap<String, ServerSlot>>,
138}
139
140#[derive(Clone)]
141struct ServerSlot {
142    name: String,
143    limit: usize,
144    semaphore: Arc<tokio::sync::Semaphore>,
145}
146
147impl ConnectionTracker {
148    pub fn new() -> Self {
149        Self {
150            pools: Mutex::new(HashMap::new()),
151        }
152    }
153
154    /// Set or update the per-server connection limit.
155    ///
156    /// - First call for a `server_id`: creates a fresh semaphore with `limit` permits.
157    /// - Subsequent call with the same `limit` and `server_name`: no-op.
158    /// - Grow (`limit > current`): adds permits in place.
159    /// - Shrink or rename: replaces the slot. Old permit holders detach
160    ///   naturally via [`slot_is_current`].
161    pub fn set_limit(&self, server_id: &str, server_name: &str, limit: usize) {
162        let mut pools = self.pools.lock();
163        match pools.get_mut(server_id) {
164            Some(slot) if slot.limit == limit && slot.name == server_name => {
165                // No change.
166            }
167            Some(slot) if limit > slot.limit && slot.name == server_name => {
168                let added = limit - slot.limit;
169                let old = slot.limit;
170                slot.semaphore.add_permits(added);
171                slot.limit = limit;
172                info!(
173                    server_id,
174                    server = %server_name,
175                    old_limit = old,
176                    new_limit = limit,
177                    added,
178                    "Connection pool grew in place"
179                );
180            }
181            existing => {
182                // Capture diagnostics before re-borrowing `pools` for insert.
183                let (prev_limit, prev_name) = match existing {
184                    Some(s) => (Some(s.limit), Some(s.name.clone())),
185                    None => (None, None),
186                };
187                pools.insert(
188                    server_id.to_string(),
189                    ServerSlot {
190                        name: server_name.to_string(),
191                        limit,
192                        semaphore: Arc::new(tokio::sync::Semaphore::new(limit)),
193                    },
194                );
195                if let Some(prev) = prev_limit {
196                    let renamed = prev_name.as_deref() != Some(server_name);
197                    info!(
198                        server_id,
199                        server = %server_name,
200                        old_limit = prev,
201                        new_limit = limit,
202                        renamed,
203                        "Connection pool replaced (shrink or rename); old permits orphaned"
204                    );
205                } else {
206                    info!(
207                        server_id,
208                        server = %server_name,
209                        limit,
210                        "Connection pool created"
211                    );
212                }
213            }
214        }
215    }
216
217    /// Forget a server entirely (e.g. on `update_servers` removing it).
218    /// Existing permit holders are unaffected; they will detect that their
219    /// slot is no longer current and exit.
220    pub fn remove_server(&self, server_id: &str) {
221        self.pools.lock().remove(server_id);
222    }
223
224    /// Acquire a connection slot for `server_id`. Awaits if the pool is at
225    /// the limit. Returns `None` if the server isn't registered or its
226    /// limit is zero.
227    pub async fn acquire(&self, server_id: &str) -> Option<ConnectionSlot> {
228        // Snapshot the ServerSlot under lock, release the lock before await.
229        let server_slot = {
230            let pools = self.pools.lock();
231            pools.get(server_id).cloned()?
232        };
233        if server_slot.limit == 0 {
234            return None;
235        }
236        let permit = Arc::clone(&server_slot.semaphore)
237            .acquire_owned()
238            .await
239            .ok()?;
240        Some(ConnectionSlot {
241            server_id: server_id.to_string(),
242            server_name: server_slot.name,
243            semaphore_origin: server_slot.semaphore,
244            _permit: permit,
245        })
246    }
247
248    /// Returns true if `slot` was acquired from the *current* semaphore for
249    /// its server. False if the limit was changed (semaphore replaced) or
250    /// the server was removed — the worker should exit at its next safe
251    /// checkpoint.
252    pub fn slot_is_current(&self, slot: &ConnectionSlot) -> bool {
253        matches!(self.slot_status(slot), SlotStatus::Current)
254    }
255
256    /// Like [`slot_is_current`] but distinguishes the reason a slot is no
257    /// longer current — useful for diagnostics on the worker exit path.
258    pub fn slot_status(&self, slot: &ConnectionSlot) -> SlotStatus {
259        let pools = self.pools.lock();
260        match pools.get(&slot.server_id) {
261            Some(server_slot) => {
262                if Arc::ptr_eq(&server_slot.semaphore, &slot.semaphore_origin) {
263                    SlotStatus::Current
264                } else {
265                    SlotStatus::PoolReplaced
266                }
267            }
268            None => SlotStatus::ServerRemoved,
269        }
270    }
271
272    /// `(server_id, active, limit)` triples for the live pool. `active` is
273    /// derived from the semaphore's available permits and is always
274    /// `<= limit` by construction.
275    pub fn snapshot(&self) -> Vec<(String, usize, usize)> {
276        let pools = self.pools.lock();
277        pools
278            .iter()
279            .map(|(id, slot)| {
280                let active = slot
281                    .limit
282                    .saturating_sub(slot.semaphore.available_permits());
283                (id.clone(), active, slot.limit)
284            })
285            .collect()
286    }
287
288    /// Total currently-held permits across all servers in the live pool.
289    /// Permits held against orphaned (replaced) semaphores are NOT counted —
290    /// they'll go away as the holding workers exit.
291    pub fn total(&self) -> usize {
292        let pools = self.pools.lock();
293        pools
294            .values()
295            .map(|s| s.limit.saturating_sub(s.semaphore.available_permits()))
296            .sum()
297    }
298}
299
300impl Default for ConnectionTracker {
301    fn default() -> Self {
302        Self::new()
303    }
304}
305
/// Why a `ConnectionSlot` may no longer match its server's live pool.
///
/// Returned by `ConnectionTracker::slot_status`, which compares the slot's
/// originating semaphore (by `Arc` identity) with the one currently
/// registered for the slot's server.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SlotStatus {
    /// Slot is still attached to the current semaphore for its server.
    Current,
    /// The semaphore was replaced (`set_limit` shrank, renamed, or otherwise
    /// rebuilt the pool). Old permit holders should exit at their next safe
    /// checkpoint.
    PoolReplaced,
    /// The server has been removed entirely from the tracker.
    ServerRemoved,
}
317
318/// RAII handle for one acquired NNTP connection slot. The underlying
319/// semaphore permit is released synchronously on drop. Workers should hold
320/// a `ConnectionSlot` for their entire lifetime — across reconnects, even
321/// across temporary connect failures — and only drop it when exiting.
322pub struct ConnectionSlot {
323    server_id: String,
324    server_name: String,
325    /// The Arc identity of the semaphore the permit was issued by.
326    /// Used by `ConnectionTracker::slot_is_current` to detect a stale slot
327    /// after a `set_limit` shrink (which replaces the semaphore).
328    semaphore_origin: Arc<tokio::sync::Semaphore>,
329    _permit: tokio::sync::OwnedSemaphorePermit,
330}
331
332impl ConnectionSlot {
333    pub fn server_id(&self) -> &str {
334        &self.server_id
335    }
336    pub fn server_name(&self) -> &str {
337        &self.server_name
338    }
339}
340
341// ---------------------------------------------------------------------------
342// Server health tracking (circuit breaker)
343// ---------------------------------------------------------------------------
344
345#[derive(Debug)]
346pub struct ServerHealth {
347    pub consecutive_failures: u32,
348    pub disabled_until: Option<Instant>,
349    pub reason: Option<String>,
350    pub is_auth_failure: bool,
351}
352
353impl Default for ServerHealth {
354    fn default() -> Self {
355        Self::new()
356    }
357}
358
359impl ServerHealth {
360    pub fn new() -> Self {
361        Self {
362            consecutive_failures: 0,
363            disabled_until: None,
364            reason: None,
365            is_auth_failure: false,
366        }
367    }
368
369    pub fn is_available(&self) -> bool {
370        match self.disabled_until {
371            None => true,
372            Some(until) => Instant::now() >= until,
373        }
374    }
375
376    pub fn record_failure(&mut self, is_auth: bool, reason: &str) {
377        self.consecutive_failures += 1;
378        self.is_auth_failure = is_auth;
379        self.reason = Some(reason.to_string());
380
381        if is_auth || self.consecutive_failures >= CIRCUIT_BREAK_THRESHOLD {
382            let cooldown = if is_auth {
383                AUTH_FAILURE_COOLDOWN
384            } else {
385                TRANSIENT_FAILURE_COOLDOWN
386            };
387            self.disabled_until = Some(Instant::now() + cooldown);
388        }
389    }
390
391    pub fn record_success(&mut self) {
392        *self = Self::new();
393    }
394}
395
/// Shared, lock-protected table of [`ServerHealth`] entries.
// NOTE(review): keys are presumably server IDs — confirm against the callers
// that populate this map (not visible in this part of the file).
pub type ServerHealthMap = Arc<Mutex<HashMap<String, ServerHealth>>>;
397
398// ---------------------------------------------------------------------------
399// Progress update messages
400// ---------------------------------------------------------------------------
401
/// Messages sent over the per-job progress channel
/// (`JobContext::progress_tx`).
#[derive(Debug, Clone)]
pub enum ProgressUpdate {
    /// One article finished successfully.
    // NOTE(review): the emitter is outside this part of the file; field
    // meanings below are taken from their names — confirm at the send site.
    ArticleComplete {
        job_id: String,
        file_id: String,
        segment_number: u32,
        decoded_bytes: u64,
        file_complete: bool,
        server_id: Option<String>,
    },
    /// An article could not be retrieved. `failure` carries the typed
    /// classification of *why* (NotFound, ServerDown, AuthFailed, …).
    /// See `crate::article_failure` for the taxonomy.
    ArticleFailed {
        job_id: String,
        file_id: String,
        segment_number: u32,
        failure: crate::article_failure::ArticleFailure,
    },
    /// Terminal message emitted by `JobContext::emit_terminal` when no abort
    /// reason is set. `success` is `articles_failed == 0`.
    JobFinished {
        job_id: String,
        success: bool,
        articles_failed: usize,
    },
    /// Per the module docs: emitted by the supervisor when every enabled
    /// server is circuit-broken for a job, so the user can fix the
    /// configuration and resume.
    NoServersAvailable {
        job_id: String,
        reason: String,
    },
    /// Terminal message emitted by `JobContext::emit_terminal` instead of
    /// `JobFinished` when the job's `abort_reason` is set.
    JobAborted {
        job_id: String,
        reason: String,
    },
}
435
436// ---------------------------------------------------------------------------
437// Work item
438// ---------------------------------------------------------------------------
439
/// One unit of work in the shared queue: a single article of a single file
/// of a single job.
#[derive(Debug, Clone)]
pub(crate) struct WorkItem {
    /// Job this item belongs to; workers use it to look up the `JobContext`.
    pub(crate) job_id: String,
    /// File (within the job) this article is a segment of.
    pub(crate) file_id: String,
    /// Filename of the owning file; `SharedWorkQueue::submit_items` sorts a
    /// batch by `par2_sort_key` of this value.
    pub(crate) filename: String,
    // NOTE(review): presumably the NNTP message-id used to fetch the
    // article — the fetch code is not visible here; confirm.
    pub(crate) message_id: String,
    pub(crate) segment_number: u32,
    /// Servers already tried for this article (by server ID).
    pub(crate) tried_servers: Vec<String>,
    /// Number of attempts on the current server.
    pub(crate) tries_on_current: u32,
}
452
453// ---------------------------------------------------------------------------
454// Per-job context
455// ---------------------------------------------------------------------------
456
/// Per-job state that workers reference via `item.job_id`.
///
/// Everything a worker needs to process an article for a specific job lives
/// here. The queue manager owns one `Arc<JobContext>` per active job; the
/// pool holds a clone in its [`JobContextMap`] so workers can look it up.
pub(crate) struct JobContext {
    /// Job identifier, copied from `NzbJob::id` at construction.
    pub job_id: String,
    /// Job working directory; deobfuscation renames are resolved against it.
    pub work_dir: PathBuf,
    /// Shared handle to the job's file assembler.
    pub assembler: Arc<FileAssembler>,
    /// Sending half of the per-job progress channel.
    pub progress_tx: mpsc::Sender<ProgressUpdate>,
    /// `file_id` → filename taken from the yEnc header. Read once, at job
    /// completion, by the deobfuscation step.
    pub yenc_names: Arc<Mutex<HashMap<String, String>>>,
    /// `file_id` → filename from the NZB, captured at construction.
    pub nzb_filenames: HashMap<String, String>,
    /// Articles that still need a definitive result (success or all-server
    /// failure). When this reaches zero, `JobFinished` is emitted.
    pub articles_remaining: AtomicUsize,
    /// Count of articles that failed; reported in `JobFinished`.
    pub articles_failed: AtomicUsize,
    /// Per-job pause flag (see the module docs for worker behaviour).
    pub paused: AtomicBool,
    /// Per-job cancel flag (see the module docs for worker behaviour).
    pub cancelled: AtomicBool,
    /// Optional abort reason — if set when articles_remaining hits zero
    /// (or when cancellation fires), `JobAborted` is emitted instead of
    /// `JobFinished`.
    pub abort_reason: Mutex<Option<String>>,
    /// Cumulative decode time in microseconds (logged in the timing summary).
    pub total_decode_us: Arc<AtomicU64>,
    /// Cumulative assemble time in microseconds (logged in the timing summary).
    pub total_assemble_us: Arc<AtomicU64>,
    /// Number of articles decoded (logged in the timing summary).
    pub total_articles_decoded: Arc<AtomicU64>,
    /// Instant the context was created; basis for the elapsed/throughput log.
    pub engine_start: Instant,
    /// Total bytes across all files (for perf summary throughput).
    pub total_bytes: u64,
    /// Ensures JobFinished/JobAborted is only emitted once.
    finished: AtomicBool,
}
488
/// Shared, lock-protected lookup table from `job_id` to that job's
/// [`JobContext`]; workers consult it using the `job_id` carried on each
/// work item.
pub(crate) type JobContextMap = Arc<Mutex<HashMap<String, Arc<JobContext>>>>;
490
491impl JobContext {
492    fn new(
493        job: &NzbJob,
494        assembler: Arc<FileAssembler>,
495        progress_tx: mpsc::Sender<ProgressUpdate>,
496        total_articles: usize,
497    ) -> Self {
498        let nzb_filenames = job
499            .files
500            .iter()
501            .map(|f| (f.id.clone(), f.filename.clone()))
502            .collect();
503        Self {
504            job_id: job.id.clone(),
505            work_dir: job.work_dir.clone(),
506            assembler,
507            progress_tx,
508            yenc_names: Arc::new(Mutex::new(HashMap::new())),
509            nzb_filenames,
510            articles_remaining: AtomicUsize::new(total_articles),
511            articles_failed: AtomicUsize::new(0),
512            paused: AtomicBool::new(false),
513            cancelled: AtomicBool::new(false),
514            abort_reason: Mutex::new(None),
515            total_decode_us: Arc::new(AtomicU64::new(0)),
516            total_assemble_us: Arc::new(AtomicU64::new(0)),
517            total_articles_decoded: Arc::new(AtomicU64::new(0)),
518            engine_start: Instant::now(),
519            total_bytes: job.total_bytes,
520            finished: AtomicBool::new(false),
521        }
522    }
523
524    /// Crate-public accessor for [`resolve_one`](Self::resolve_one). Used
525    /// by alternative `DispatchEngine` impls in sibling modules.
526    pub(crate) fn resolve_one_public(&self) {
527        self.resolve_one();
528    }
529
530    /// Crate-public accessor for [`emit_terminal`](Self::emit_terminal).
531    pub(crate) fn emit_terminal_public(&self) {
532        self.emit_terminal();
533    }
534
535    /// Decrement articles_remaining. If it reaches zero, run deobfuscation
536    /// and emit the job-finished/aborted terminal update. Idempotent.
537    fn resolve_one(&self) {
538        let prev = self.articles_remaining.fetch_sub(1, Ordering::Relaxed);
539        if prev != 1 {
540            return;
541        }
542        self.emit_terminal();
543    }
544
545    /// Emit the terminal (JobFinished / JobAborted) message. Safe to call
546    /// multiple times; only the first call does anything.
547    fn emit_terminal(&self) {
548        if self.finished.swap(true, Ordering::Relaxed) {
549            return;
550        }
551
552        // Run deobfuscation before signalling completion so post-processing
553        // sees the final filenames.
554        self.deobfuscate_files();
555
556        let download_elapsed = self.engine_start.elapsed();
557        let decode_total_us = self.total_decode_us.load(Ordering::Relaxed);
558        let assemble_total_us = self.total_assemble_us.load(Ordering::Relaxed);
559        let articles_decoded = self.total_articles_decoded.load(Ordering::Relaxed);
560        let elapsed_us = download_elapsed.as_micros().max(1);
561        let throughput_mbps = (self.total_bytes as f64 / download_elapsed.as_secs_f64().max(0.001))
562            / (1024.0 * 1024.0);
563        info!(
564            job_id = %self.job_id,
565            elapsed_secs = download_elapsed.as_secs_f64(),
566            total_bytes = self.total_bytes,
567            throughput_mbps = format!("{throughput_mbps:.2}"),
568            "Download phase complete"
569        );
570        info!(
571            job_id = %self.job_id,
572            articles_decoded,
573            decode_secs = format!("{:.3}", decode_total_us as f64 / 1_000_000.0),
574            assemble_secs = format!("{:.3}", assemble_total_us as f64 / 1_000_000.0),
575            decode_pct = format!("{:.1}", decode_total_us as f64 / elapsed_us as f64 * 100.0),
576            assemble_pct = format!("{:.1}", assemble_total_us as f64 / elapsed_us as f64 * 100.0),
577            "Decode timing summary (cumulative across all workers)"
578        );
579
580        let abort_reason = self.abort_reason.lock().clone();
581        if let Some(reason) = abort_reason {
582            try_send_progress(
583                &self.progress_tx,
584                &self.job_id,
585                ProgressUpdate::JobAborted {
586                    job_id: self.job_id.clone(),
587                    reason,
588                },
589            );
590            return;
591        }
592
593        let failed = self.articles_failed.load(Ordering::Relaxed);
594        try_send_progress(
595            &self.progress_tx,
596            &self.job_id,
597            ProgressUpdate::JobFinished {
598                job_id: self.job_id.clone(),
599                success: failed == 0,
600                articles_failed: failed,
601            },
602        );
603    }
604
605    /// Choose the best filename between NZB subject and yEnc header per file
606    /// and rename on disk if needed. Called exactly once at job completion.
607    fn deobfuscate_files(&self) {
608        let renames = self.yenc_names.lock();
609        for (file_id, yenc_name) in renames.iter() {
610            let Some(nzb_name) = self.nzb_filenames.get(file_id) else {
611                continue;
612            };
613            if nzb_name == yenc_name {
614                continue;
615            }
616            let clean_yenc = std::path::Path::new(yenc_name.as_str())
617                .file_name()
618                .and_then(|n| n.to_str())
619                .unwrap_or(yenc_name);
620            if clean_yenc.is_empty() || nzb_name == clean_yenc {
621                continue;
622            }
623
624            let nzb_has_ext = has_known_extension(nzb_name);
625            let yenc_has_ext = has_known_extension(clean_yenc);
626
627            let (old_name, new_name) = if yenc_has_ext && !nzb_has_ext {
628                (nzb_name.as_str(), clean_yenc)
629            } else if nzb_has_ext && !yenc_has_ext {
630                continue;
631            } else if yenc_has_ext && nzb_has_ext {
632                (nzb_name.as_str(), clean_yenc)
633            } else {
634                continue;
635            };
636
637            let old_path = self.work_dir.join(old_name);
638            let new_path = self.work_dir.join(new_name);
639            if old_path.exists() && !new_path.exists() {
640                if let Err(e) = std::fs::rename(&old_path, &new_path) {
641                    warn!(
642                        job_id = %self.job_id,
643                        from = %old_name,
644                        to = %new_name,
645                        "Failed to deobfuscate file: {e}"
646                    );
647                } else {
648                    info!(
649                        job_id = %self.job_id,
650                        from = %old_name,
651                        to = %new_name,
652                        "Deobfuscated file"
653                    );
654                }
655            }
656        }
657    }
658}
659
660// ---------------------------------------------------------------------------
661// Shared work queue
662// ---------------------------------------------------------------------------
663
/// Multi-job FIFO work queue with PAR2-first priority within each submission.
///
/// Items submitted via [`SharedWorkQueue::submit_items`] are inserted so that
/// PAR2 index and volume files land ahead of data files (matching the prior
/// per-job ordering), while data files land at the tail. Cross-job ordering
/// is FIFO by submission time, per the chosen FIFO priority model.
pub(crate) struct SharedWorkQueue {
    /// Queue contents plus the per-server fairness cursor, under one lock.
    inner: Mutex<InnerState>,
    /// Signalled (`notify_waiters`) whenever items are added to the queue.
    notify: Notify,
}
674
/// Lock-protected interior of [`SharedWorkQueue`].
struct InnerState {
    /// FIFO-ish queue of work items. Ordering is PAR2-first within each
    /// `submit_items` batch; `push_front` (used for fast failover after a
    /// per-server failure) prepends.
    items: VecDeque<WorkItem>,
    /// Per-server round-robin cursor, keyed by server ID: the `job_id` of
    /// the most recent item this server popped. On the next pop the scan
    /// prefers items whose `job_id` differs from this value so one active
    /// job with a large backlog can't monopolise the server's workers while
    /// a sibling job has workable items. Falls back to same-job items when
    /// no other job has anything eligible.
    last_served: HashMap<String, String>,
}
688
689impl SharedWorkQueue {
690    pub fn new() -> Self {
691        Self {
692            inner: Mutex::new(InnerState {
693                items: VecDeque::new(),
694                last_served: HashMap::new(),
695            }),
696            notify: Notify::new(),
697        }
698    }
699
700    /// Insert a batch of work items with PAR2 items ahead of data items.
701    /// Cross-batch order is preserved: PAR2 items from this batch go after
702    /// any existing items, then data items.
703    pub fn submit_items(&self, mut items: Vec<WorkItem>) {
704        items.sort_by_key(|item| par2_sort_key(&item.filename));
705        let had_items = !items.is_empty();
706        {
707            let mut state = self.inner.lock();
708            state.items.reserve(items.len());
709            for item in items {
710                state.items.push_back(item);
711            }
712        }
713        if had_items {
714            self.notify.notify_waiters();
715        }
716    }
717
718    /// Push a single item back onto the front (used when a worker is
719    /// returning an item because its job is paused or its server was just
720    /// tried for this item).
721    fn push_front(&self, item: WorkItem) {
722        self.inner.lock().items.push_front(item);
723        self.notify.notify_waiters();
724    }
725
726    /// Push a single item to the back (used after handle_article_not_available
727    /// when another server can still try it).
728    fn push_back(&self, item: WorkItem) {
729        self.inner.lock().items.push_back(item);
730        self.notify.notify_waiters();
731    }
732
733    /// `(workable, total)` for `server_id`. `workable` is the number of
734    /// items in the queue that are eligible for a worker on `server_id`;
735    /// `total` is the queue length. Used by the supervisor's starvation
736    /// diagnostic — if `workable == 0` while `total > 0`, the server has
737    /// items it can't service yet (either already tried, or waiting on a
738    /// higher-priority server).
739    ///
740    /// An item is "workable" for this server when:
741    /// - `server_id` is NOT in `item.tried_servers`, AND
742    /// - no healthy higher-priority server still needs to try it (i.e. every
743    ///   server in `higher_priority_servers` is already in `item.tried_servers`).
744    ///
745    /// Pass an empty slice when the caller has no strictly-higher-priority
746    /// peers (priority 0, single-server setups, or all peers circuit-broken).
747    pub(crate) fn workable_count_for(
748        &self,
749        server_id: &str,
750        higher_priority_servers: &[String],
751    ) -> (usize, usize) {
752        let state = self.inner.lock();
753        let total = state.items.len();
754        let workable = state
755            .items
756            .iter()
757            .filter(|i| !i.tried_servers.iter().any(|s| s == server_id))
758            .filter(|i| {
759                higher_priority_servers
760                    .iter()
761                    .all(|hp| i.tried_servers.contains(hp))
762            })
763            .count();
764        (workable, total)
765    }
766
767    /// Pop the next item that can be processed by a worker on `server_id`,
768    /// biased toward fair round-robin across active jobs.
769    ///
770    /// Two-pass scan:
771    /// 1. Prefer an item whose `job_id` differs from the last one served to
772    ///    this server (fairness — sibling jobs don't starve behind a
773    ///    backlog-heavy job).
774    /// 2. Fall back to any eligible item (the sibling-preferred pass found
775    ///    nothing; same-job work is fine).
776    ///
777    /// Eligibility:
778    /// - `server_id` NOT in `item.tried_servers`, AND
779    /// - every server in `higher_priority_servers` IS in `item.tried_servers`
780    ///   (priority gate — matches SABnzbd `get_article()`).
781    ///
782    /// `higher_priority_servers` is a caller-prepared list of server IDs with
783    /// strictly higher priority than the caller, filtered to only enabled +
784    /// healthy servers. Empty slice disables the priority gate (priority-0
785    /// servers, single-server setups, or all peers circuit-broken).
786    fn pop_workable(
787        &self,
788        server_id: &str,
789        higher_priority_servers: &[String],
790    ) -> Option<WorkItem> {
791        let mut state = self.inner.lock();
792
793        let eligible = |item: &WorkItem| -> bool {
794            !item.tried_servers.iter().any(|s| s == server_id)
795                && higher_priority_servers
796                    .iter()
797                    .all(|hp| item.tried_servers.contains(hp))
798        };
799
800        let last_served = state.last_served.get(server_id).cloned();
801
802        // Pass 1: prefer different job than last served.
803        let mut chosen = None;
804        if let Some(ref last) = last_served {
805            chosen = state
806                .items
807                .iter()
808                .position(|item| eligible(item) && item.job_id != *last);
809        }
810
811        // Pass 2: any eligible (fallback when no non-last-job eligible items exist).
812        if chosen.is_none() {
813            chosen = state.items.iter().position(eligible);
814        }
815
816        let idx = chosen?;
817        // VecDeque::remove(i) is O(min(i, len - i)) — acceptable for typical
818        // queue lengths. Not using swap_remove_back because it would break
819        // PAR2-first ordering.
820        let item = state.items.remove(idx)?;
821        state
822            .last_served
823            .insert(server_id.to_string(), item.job_id.clone());
824        Some(item)
825    }
826
827    /// Remove all items belonging to `job_id`. Used on cancel_job / remove_job.
828    fn drain_job(&self, job_id: &str) -> Vec<WorkItem> {
829        let mut state = self.inner.lock();
830        let mut kept = VecDeque::with_capacity(state.items.len());
831        let mut drained = Vec::new();
832        while let Some(item) = state.items.pop_front() {
833            if item.job_id == job_id {
834                drained.push(item);
835            } else {
836                kept.push_back(item);
837            }
838        }
839        state.items = kept;
840        // Drop any round-robin cursors pointing at the removed job so the
841        // next pop doesn't try to prefer items for a job that no longer
842        // exists. Purely a tidy-up; correctness isn't affected.
843        state.last_served.retain(|_, v| v != job_id);
844        drained
845    }
846
847    pub fn len(&self) -> usize {
848        self.inner.lock().items.len()
849    }
850}
851
852impl Default for SharedWorkQueue {
853    fn default() -> Self {
854        Self::new()
855    }
856}
857
858// ---------------------------------------------------------------------------
859// Worker pool
860// ---------------------------------------------------------------------------
861
/// Bookkeeping the pool keeps for one spawned worker task.
struct ActiveWorker {
    /// Per-worker exit flag. Set to `true` by `reconcile_servers` (server
    /// retired or connection limit shrunk), by the supervisor's idle-worker
    /// watchdog, and by `WorkerPool::shutdown`. The worker task reads it to
    /// decide when to exit.
    shutdown: Arc<AtomicBool>,
    /// Worker heartbeat: monotonic millis since pool creation
    /// (`WorkerPool::created_at`). Updated by the worker on every
    /// "definitive progress" event (successful decode or definitive
    /// failure). The same counter is also handed to the worker's NNTP
    /// connection via `set_io_heartbeat`, so (per the comments in
    /// `pool_worker`) socket reads keep it fresh as well. The supervisor
    /// reads this to detect zombie workers.
    last_progress: Arc<AtomicU64>,
    /// Join handle of the spawned worker task. The supervisor calls
    /// `is_finished()` on it to reap exited workers; pool shutdown awaits it.
    handle: JoinHandle<()>,
}
871
/// Long-lived worker pool that services all active download jobs.
pub struct WorkerPool {
    /// Queue shared by all workers; jobs submit their items here.
    work_queue: Arc<SharedWorkQueue>,
    /// Registered jobs, keyed by job id. Entries are inserted by
    /// `submit_job` and removed by abort / cancel / `mark_no_servers`.
    job_contexts: JobContextMap,
    /// Shared server configuration list. `reconcile_servers` snapshots it to
    /// decide how many workers each enabled server should have.
    servers: Arc<Mutex<Vec<ServerConfig>>>,
    /// Per-server health records keyed by server id. A missing entry is
    /// treated as "available" by the code in this pool.
    server_health: ServerHealthMap,
    /// Bandwidth limiter supplied at construction.
    /// NOTE(review): not referenced by the pool methods visible here —
    /// presumably consumed by the worker fetch path; confirm.
    bandwidth: Arc<BandwidthLimiter>,
    /// Connection-slot tracker; each worker acquires one slot from it before
    /// its first connect attempt.
    conn_tracker: Arc<ConnectionTracker>,
    /// `None` when the pool was built with `stall_timeout_secs == 0`,
    /// otherwise that many seconds.
    stall_timeout: Option<Duration>,
    /// Reference epoch for worker heartbeats. All `last_progress` values
    /// store millis elapsed since this instant.
    created_at: Instant,
    /// Idle threshold above which the supervisor evicts a worker.
    /// Mutable so the harness (and runtime config reload) can adjust it
    /// without recreating the pool.
    max_worker_idle: Mutex<Duration>,
    /// Per-server "last starvation log" timestamp; rate-limits the
    /// "no workable items" diagnostic to once per minute per server.
    starvation_log: Mutex<HashMap<String, Instant>>,
    /// Lifetime count of worker evictions performed by the heartbeat
    /// watchdog. Useful for tests and observability — a non-zero value
    /// means at least one worker stalled long enough to be reclaimed.
    evictions: AtomicU64,
    /// Live workers grouped by server id.
    workers: Mutex<HashMap<String, Vec<ActiveWorker>>>,
    /// Pool-wide shutdown flag. Set by `shutdown()`; checked by
    /// `reconcile_servers`, the supervisor loop and every worker.
    shutdown: Arc<AtomicBool>,
    /// Handle of the supervisor task; stored by `start()` and aborted by
    /// `shutdown()`.
    supervisor_handle: Mutex<Option<JoinHandle<()>>>,
}
899
900impl WorkerPool {
901    pub fn new(
902        servers: Arc<Mutex<Vec<ServerConfig>>>,
903        bandwidth: Arc<BandwidthLimiter>,
904        conn_tracker: Arc<ConnectionTracker>,
905        stall_timeout_secs: u64,
906    ) -> Arc<Self> {
907        let stall_timeout = if stall_timeout_secs > 0 {
908            Some(Duration::from_secs(stall_timeout_secs))
909        } else {
910            None
911        };
912        Arc::new(Self {
913            work_queue: Arc::new(SharedWorkQueue::new()),
914            job_contexts: Arc::new(Mutex::new(HashMap::new())),
915            servers,
916            server_health: Arc::new(Mutex::new(HashMap::new())),
917            bandwidth,
918            conn_tracker,
919            stall_timeout,
920            created_at: Instant::now(),
921            max_worker_idle: Mutex::new(DEFAULT_MAX_WORKER_IDLE),
922            starvation_log: Mutex::new(HashMap::new()),
923            evictions: AtomicU64::new(0),
924            workers: Mutex::new(HashMap::new()),
925            shutdown: Arc::new(AtomicBool::new(false)),
926            supervisor_handle: Mutex::new(None),
927        })
928    }
929
930    /// Override the worker idle eviction threshold. Tests use this to make
931    /// the supervisor's heartbeat check converge in seconds.
932    pub fn set_max_worker_idle(&self, d: Duration) {
933        *self.max_worker_idle.lock() = d;
934    }
935
936    /// Read current worker idle eviction threshold.
937    pub fn max_worker_idle(&self) -> Duration {
938        *self.max_worker_idle.lock()
939    }
940
941    /// Millis elapsed since the pool was constructed. Used as the
942    /// monotonic clock for `ActiveWorker::last_progress`.
943    fn elapsed_ms(&self) -> u64 {
944        self.created_at.elapsed().as_millis() as u64
945    }
946
947    /// Reference `Instant` used as the epoch for `last_progress` / heartbeat
948    /// timestamps. Exposed so the NNTP layer can share the same clock when
949    /// ticking its socket-liveness heartbeat — values then compare directly
950    /// against `self.elapsed_ms()` in the supervisor's idle-worker check.
951    fn created_at(&self) -> Instant {
952        self.created_at
953    }
954
955    /// Collect server IDs with strictly higher priority (lower priority number)
956    /// than `my_priority`, restricted to enabled + healthy (non-circuit-broken)
957    /// servers. `my_server_id` is excluded (a server never blocks itself).
958    ///
959    /// Used by worker loops and the supervisor to drive the priority gate in
960    /// [`SharedWorkQueue::pop_workable`] and [`SharedWorkQueue::workable_count_for`].
961    /// See `sabnzbd/nzb/article.py::get_article` for the reference behaviour.
962    fn higher_priority_servers(&self, my_priority: u8, my_server_id: &str) -> Vec<String> {
963        let servers = self.servers.lock();
964        let health = self.server_health.lock();
965        servers
966            .iter()
967            .filter(|s| s.enabled && s.priority < my_priority && s.id != my_server_id)
968            .filter(|s| health.get(&s.id).is_none_or(|h| h.is_available()))
969            .map(|s| s.id.clone())
970            .collect()
971    }
972
973    /// Lifetime count of worker evictions performed by the heartbeat
974    /// watchdog. Increases by 1 each time the supervisor reclaims a stalled
975    /// worker. Test harnesses use this as a positive signal that the Phase
976    /// 5 idle watchdog actually fired.
977    pub fn eviction_count(&self) -> u64 {
978        self.evictions.load(Ordering::Relaxed)
979    }
980
981    /// Spawn workers for all currently enabled servers and start the
982    /// supervisor task. Call once at queue-manager startup.
983    pub fn start(self: &Arc<Self>) {
984        self.reconcile_servers();
985
986        let this = Arc::clone(self);
987        let handle = tokio::spawn(async move {
988            this.supervisor_loop().await;
989        });
990        *self.supervisor_handle.lock() = Some(handle);
991    }
992
993    /// Create or tear down workers to match the current server list.
994    ///
995    /// For each enabled server, ensures exactly `server.connections` workers
996    /// exist. Extra workers (from a shrunk limit or disabled server) have
997    /// their per-worker shutdown flag flipped so they exit gracefully after
998    /// the current article.
999    pub fn reconcile_servers(self: &Arc<Self>) {
1000        if self.shutdown.load(Ordering::Relaxed) {
1001            return;
1002        }
1003
1004        let servers_snapshot: Vec<ServerConfig> = self.servers.lock().clone();
1005        let mut workers = self.workers.lock();
1006
1007        // First pass: retire workers for servers that are gone or disabled.
1008        let mut retire: Vec<String> = Vec::new();
1009        for key in workers.keys() {
1010            let still_active = servers_snapshot.iter().any(|s| s.enabled && &s.id == key);
1011            if !still_active {
1012                retire.push(key.clone());
1013            }
1014        }
1015        for key in retire {
1016            if let Some(list) = workers.remove(&key) {
1017                for w in list {
1018                    w.shutdown.store(true, Ordering::Relaxed);
1019                    // Don't await — workers check shutdown on next loop
1020                    // iteration and exit within ~WORKER_IDLE_POLL.
1021                    drop(w.handle);
1022                }
1023            }
1024        }
1025
1026        // Second pass: spawn or shrink to match target count per enabled server.
1027        for server in &servers_snapshot {
1028            if !server.enabled {
1029                continue;
1030            }
1031            let target = (server.connections as usize).min(500);
1032            let entry = workers.entry(server.id.clone()).or_default();
1033
1034            // Shrink: signal extras to exit.
1035            while entry.len() > target {
1036                if let Some(w) = entry.pop() {
1037                    w.shutdown.store(true, Ordering::Relaxed);
1038                    drop(w.handle);
1039                }
1040            }
1041
1042            // Grow: spawn new workers with stagger.
1043            let current = entry.len();
1044            for conn_idx in current..target {
1045                let worker_shutdown = Arc::new(AtomicBool::new(false));
1046                // Initialize heartbeat to *now* so the worker has a full
1047                // grace period before its first eviction check.
1048                let last_progress = Arc::new(AtomicU64::new(self.elapsed_ms()));
1049                let pool = Arc::clone(self);
1050                let server_clone = server.clone();
1051                let ws_clone = Arc::clone(&worker_shutdown);
1052                let lp_clone = Arc::clone(&last_progress);
1053                let handle = tokio::spawn(async move {
1054                    pool_worker(pool, server_clone, conn_idx, ws_clone, lp_clone).await;
1055                });
1056                entry.push(ActiveWorker {
1057                    shutdown: worker_shutdown,
1058                    last_progress,
1059                    handle,
1060                });
1061            }
1062        }
1063    }
1064
1065    /// Register a new job context and submit its unfinished articles to the
1066    /// shared queue. Called by QueueManager::launch_download.
1067    pub(crate) fn submit_job(self: &Arc<Self>, ctx: Arc<JobContext>, items: Vec<WorkItem>) {
1068        let job_id = ctx.job_id.clone();
1069        if items.is_empty() {
1070            // Nothing to do — emit completion immediately.
1071            ctx.emit_terminal();
1072            return;
1073        }
1074        self.job_contexts.lock().insert(job_id.clone(), ctx);
1075        self.work_queue.submit_items(items);
1076        debug!(job_id = %job_id, queue_len = self.work_queue.len(), "Job submitted to worker pool");
1077    }
1078
1079    /// Pause a job: workers stop pulling its items, and any item currently
1080    /// being held while paused is returned to the queue.
1081    pub fn pause_job(&self, job_id: &str) {
1082        if let Some(ctx) = self.job_contexts.lock().get(job_id) {
1083            ctx.paused.store(true, Ordering::Relaxed);
1084        }
1085    }
1086
1087    /// Resume a paused job.
1088    pub fn resume_job(&self, job_id: &str) {
1089        if let Some(ctx) = self.job_contexts.lock().get(job_id) {
1090            ctx.paused.store(false, Ordering::Relaxed);
1091            // Wake any workers that were idle waiting for work.
1092            self.work_queue.notify.notify_waiters();
1093        }
1094    }
1095
1096    /// Abort a job with a reason. Drains queued items, sets the abort flag,
1097    /// and emits JobAborted via the job's progress channel.
1098    pub fn abort_job(&self, job_id: &str, reason: String) {
1099        let ctx = self.job_contexts.lock().get(job_id).cloned();
1100        let Some(ctx) = ctx else {
1101            return;
1102        };
1103        *ctx.abort_reason.lock() = Some(reason);
1104        ctx.cancelled.store(true, Ordering::Relaxed);
1105        let drained = self.work_queue.drain_job(job_id);
1106        // Decrement the remaining counter for drained items so the terminal
1107        // callback fires if nothing is in-flight for this job.
1108        for _ in drained {
1109            ctx.resolve_one();
1110        }
1111        ctx.emit_terminal();
1112        self.job_contexts.lock().remove(job_id);
1113    }
1114
1115    /// Cancel a job silently (no JobFinished / JobAborted emission).
1116    /// Used by `remove_job` when the user deletes a job from the queue —
1117    /// the progress receiver is about to be dropped anyway.
1118    pub fn cancel_job(&self, job_id: &str) {
1119        let ctx = self.job_contexts.lock().remove(job_id);
1120        let Some(ctx) = ctx else {
1121            return;
1122        };
1123        ctx.cancelled.store(true, Ordering::Relaxed);
1124        let _ = self.work_queue.drain_job(job_id);
1125    }
1126
1127    /// Emit NoServersAvailable for a stuck job and unregister it.
1128    fn mark_no_servers(&self, job_id: &str, reason: String) {
1129        let ctx = self.job_contexts.lock().remove(job_id);
1130        let Some(ctx) = ctx else {
1131            return;
1132        };
1133        ctx.paused.store(true, Ordering::Relaxed);
1134        try_send_progress(
1135            &ctx.progress_tx,
1136            &ctx.job_id,
1137            ProgressUpdate::NoServersAvailable {
1138                job_id: ctx.job_id.clone(),
1139                reason,
1140            },
1141        );
1142        // Remove pending work for this job so other jobs aren't blocked.
1143        let _ = self.work_queue.drain_job(job_id);
1144    }
1145
1146    /// Supervisor loop: periodically detects jobs whose remaining articles
1147    /// cannot possibly be fetched (all enabled servers circuit-broken or
1148    /// Per-tick checks that maintain pool health: idle-worker eviction,
1149    /// dead-worker reaping, reconcile (respawn missing workers), starvation
1150    /// diagnostics, and the legacy "all servers broken" pause.
1151    async fn supervisor_loop(self: Arc<Self>) {
1152        let mut ticker = tokio::time::interval(SUPERVISOR_INTERVAL);
1153        loop {
1154            ticker.tick().await;
1155            if self.shutdown.load(Ordering::Relaxed) {
1156                break;
1157            }
1158
1159            // ---------- 1. Heartbeat eviction ----------
1160            // Compute idle threshold + current epoch outside the workers lock.
1161            let now_ms = self.elapsed_ms();
1162            let max_idle_ms = self.max_worker_idle().as_millis() as u64;
1163
1164            // Pre-compute per-server "has workable items now" so idle backup
1165            // workers aren't evicted when there's legitimately nothing for
1166            // them to do (waiting on higher-priority servers to fail first).
1167            // Computed once per tick, outside the workers lock, to avoid
1168            // lock-ordering hazards. Matches SABnzbd's idle-thread model
1169            // (sabnzbd/downloader.py — idle threads stay connected).
1170            let server_priorities: Vec<(String, u8)> = {
1171                let srv = self.servers.lock();
1172                srv.iter()
1173                    .filter(|s| s.enabled)
1174                    .map(|s| (s.id.clone(), s.priority))
1175                    .collect()
1176            };
1177            let has_workable: HashMap<String, bool> = server_priorities
1178                .iter()
1179                .map(|(sid, prio)| {
1180                    let hp = self.higher_priority_servers(*prio, sid);
1181                    let (workable, _) = self.work_queue.workable_count_for(sid, &hp);
1182                    (sid.clone(), workable > 0)
1183                })
1184                .collect();
1185
1186            {
1187                let workers = self.workers.lock();
1188                for (server_id, list) in workers.iter() {
1189                    for (idx, w) in list.iter().enumerate() {
1190                        if w.shutdown.load(Ordering::Relaxed) {
1191                            continue;
1192                        }
1193                        let last = w.last_progress.load(Ordering::Relaxed);
1194                        let idle = now_ms.saturating_sub(last);
1195                        if idle > max_idle_ms {
1196                            // Bug 2 fix: don't evict a worker whose server
1197                            // has no workable items. It's idle because it's
1198                            // waiting for its primary/higher-priority peers
1199                            // to fail articles — not because it's zombied.
1200                            // If the server isn't in the map at all (e.g.
1201                            // disabled mid-tick), default to true (evict).
1202                            if !has_workable.get(server_id).copied().unwrap_or(true) {
1203                                continue;
1204                            }
1205                            warn!(
1206                                server = %server_id,
1207                                worker_idx = idx,
1208                                idle_ms = idle,
1209                                max_idle_ms,
1210                                "Idle-worker watchdog: evicting stalled worker"
1211                            );
1212                            w.shutdown.store(true, Ordering::Relaxed);
1213                            self.evictions.fetch_add(1, Ordering::Relaxed);
1214                        }
1215                    }
1216                }
1217            }
1218
1219            // ---------- 2. Reap finished workers and respawn ----------
1220            {
1221                let mut workers = self.workers.lock();
1222                for (_id, list) in workers.iter_mut() {
1223                    list.retain(|w| !w.handle.is_finished());
1224                }
1225            }
1226            // Reconcile fills any gaps left by reaped workers.
1227            self.reconcile_servers();
1228
1229            // ---------- 3. Starvation diagnostic ----------
1230            // For each enabled server: if the queue has items but none are
1231            // workable for this server, log once per minute. "Not workable"
1232            // can mean either (a) every item has already been tried here, or
1233            // (b) every item is still waiting on a higher-priority server
1234            // (backup server legitimately idle — not a bug).
1235            let enabled_servers: Vec<String> =
1236                server_priorities.iter().map(|(id, _)| id.clone()).collect();
1237            let now_instant = Instant::now();
1238            for (sid, prio) in &server_priorities {
1239                let hp = self.higher_priority_servers(*prio, sid);
1240                let (workable, total) = self.work_queue.workable_count_for(sid, &hp);
1241                if workable == 0 && total > 0 {
1242                    let mut log = self.starvation_log.lock();
1243                    let should_log = log
1244                        .get(sid)
1245                        .map(|t| now_instant.duration_since(*t) >= Duration::from_secs(60))
1246                        .unwrap_or(true);
1247                    if should_log {
1248                        log.insert(sid.clone(), now_instant);
1249                        let reason = if hp.is_empty() {
1250                            "every item has been tried here already"
1251                        } else {
1252                            "every item has been tried here, or is waiting for a higher-priority server"
1253                        };
1254                        info!(
1255                            server = %sid,
1256                            total_items = total,
1257                            higher_priority_servers = hp.len(),
1258                            "Queue has items but none are workable for this server ({reason})"
1259                        );
1260                    }
1261                }
1262            }
1263
1264            // ---------- 4. Legacy "all servers broken" pause ----------
1265            if enabled_servers.is_empty() {
1266                continue;
1267            }
1268            let healthy_servers: Vec<String> = {
1269                let health = self.server_health.lock();
1270                enabled_servers
1271                    .iter()
1272                    .filter(|sid| health.get(sid.as_str()).is_none_or(|h| h.is_available()))
1273                    .cloned()
1274                    .collect()
1275            };
1276            let all_broken = healthy_servers.is_empty();
1277
1278            let ctxs: Vec<Arc<JobContext>> = self.job_contexts.lock().values().cloned().collect();
1279            for ctx in ctxs {
1280                if ctx.articles_remaining.load(Ordering::Relaxed) == 0 {
1281                    continue;
1282                }
1283                if ctx.cancelled.load(Ordering::Relaxed) {
1284                    continue;
1285                }
1286                if all_broken {
1287                    let reason = {
1288                        let health = self.server_health.lock();
1289                        health
1290                            .values()
1291                            .filter_map(|h| h.reason.clone())
1292                            .next()
1293                            .unwrap_or_else(|| "All servers unavailable".into())
1294                    };
1295                    warn!(
1296                        job_id = %ctx.job_id,
1297                        remaining = ctx.articles_remaining.load(Ordering::Relaxed),
1298                        "All servers circuit-broken — pausing job for user intervention"
1299                    );
1300                    self.mark_no_servers(&ctx.job_id, reason);
1301                }
1302            }
1303        }
1304    }
1305
1306    /// Shut down all workers gracefully. In-flight articles finish first.
1307    pub async fn shutdown(self: &Arc<Self>) {
1308        self.shutdown.store(true, Ordering::Relaxed);
1309        let handles: Vec<JoinHandle<()>> = {
1310            let mut workers = self.workers.lock();
1311            let mut out = Vec::new();
1312            for (_id, list) in workers.drain() {
1313                for w in list {
1314                    w.shutdown.store(true, Ordering::Relaxed);
1315                    out.push(w.handle);
1316                }
1317            }
1318            out
1319        };
1320        // Notify workers so any parked on notify.notified() wake up.
1321        self.work_queue.notify.notify_waiters();
1322
1323        let timeout = Duration::from_secs(10);
1324        for h in handles {
1325            let _ = tokio::time::timeout(timeout, h).await;
1326        }
1327
1328        if let Some(h) = self.supervisor_handle.lock().take() {
1329            h.abort();
1330        }
1331    }
1332
1333    pub fn conn_tracker(&self) -> &Arc<ConnectionTracker> {
1334        &self.conn_tracker
1335    }
1336
1337    /// Whether this job still has an active context in the pool.
1338    pub fn has_job(&self, job_id: &str) -> bool {
1339        self.job_contexts.lock().contains_key(job_id)
1340    }
1341}
1342
1343// ---------------------------------------------------------------------------
1344// Worker body
1345// ---------------------------------------------------------------------------
1346
1347/// Single pool worker. Owns an NNTP connection to `primary_server` and pulls
1348/// items from the shared queue until `worker_shutdown` is flipped (server
1349/// reconciled away, limit shrunk) or the pool shuts down.
1350///
1351/// The worker acquires its connection slot (a semaphore permit from the
1352/// per-server pool) **before** the first connect attempt and holds it for
1353/// the entire lifetime of the worker, across every reconnect. This bounds
1354/// concurrent connections to `server.connections` by construction.
1355async fn pool_worker(
1356    pool: Arc<WorkerPool>,
1357    primary_server: ServerConfig,
1358    conn_idx: usize,
1359    worker_shutdown: Arc<AtomicBool>,
1360    last_progress: Arc<AtomicU64>,
1361) {
1362    let worker_id = format!("{}#{}", primary_server.id, conn_idx);
1363
1364    // Stagger worker startup to avoid thundering herd of connections.
1365    if conn_idx > 0 {
1366        let stagger = WORKER_RAMP_DELAY * conn_idx as u32;
1367        tokio::time::sleep(stagger).await;
1368    }
1369
1370    let should_exit = |worker_shutdown: &Arc<AtomicBool>, pool: &Arc<WorkerPool>| {
1371        worker_shutdown.load(Ordering::Relaxed) || pool.shutdown.load(Ordering::Relaxed)
1372    };
1373
1374    // Acquire the slot up-front. If the server isn't registered or its limit
1375    // is zero, the worker has nothing to do — exit.
1376    let mut conn_slot = match pool.conn_tracker.acquire(&primary_server.id).await {
1377        Some(slot) => slot,
1378        None => {
1379            info!(
1380                worker = %worker_id,
1381                server = %primary_server.name,
1382                "No connection slot available (server removed or limit=0); worker exiting"
1383            );
1384            return;
1385        }
1386    };
1387
1388    'reconnect: loop {
1389        if should_exit(&worker_shutdown, &pool) {
1390            return;
1391        }
1392
1393        // If the limit was shrunk under us, the semaphore will have been
1394        // replaced. Drop our (now-orphaned) slot and exit cleanly.
1395        match pool.conn_tracker.slot_status(&conn_slot) {
1396            SlotStatus::Current => {}
1397            SlotStatus::PoolReplaced => {
1398                info!(
1399                    worker = %worker_id,
1400                    server = %primary_server.name,
1401                    reason = "pool_replaced",
1402                    "Connection slot is stale (connection limit changed); worker exiting"
1403                );
1404                return;
1405            }
1406            SlotStatus::ServerRemoved => {
1407                info!(
1408                    worker = %worker_id,
1409                    server = %primary_server.name,
1410                    reason = "server_removed",
1411                    "Connection slot is stale (server removed from tracker); worker exiting"
1412                );
1413                return;
1414            }
1415        }
1416
1417        // Check circuit breaker before connecting. Compute an owned bool
1418        // so we don't hold the MutexGuard across an await point.
1419        let circuit_broken = {
1420            let health = pool.server_health.lock();
1421            health
1422                .get(&primary_server.id)
1423                .is_some_and(|h| !h.is_available())
1424        };
1425        if circuit_broken {
1426            tokio::time::sleep(WORKER_IDLE_POLL).await;
1427            continue 'reconnect;
1428        }
1429
1430        info!(
1431            worker = %worker_id,
1432            server = %primary_server.name,
1433            host = %primary_server.host,
1434            port = primary_server.port,
1435            ssl = primary_server.ssl,
1436            conn_idx,
1437            "Worker starting — connecting to primary server"
1438        );
1439
1440        let mut conn = NntpConnection::new(worker_id.clone());
1441        // Attach socket-liveness heartbeat BEFORE connect so every byte
1442        // received — from the welcome banner onward — counts as progress.
1443        // This is the fix for false-eviction of slow-but-working workers:
1444        // previously `last_progress` only advanced on full article decode,
1445        // so a worker receiving a 50-second article looked idle for the
1446        // entire fetch. Now any line read from the socket keeps it alive.
1447        // Matches SABnzbd's `nw.timeout` model (newswrapper.py:315).
1448        conn.set_io_heartbeat(last_progress.clone(), pool.created_at());
1449        if let Err(e) = connect_with_retry(
1450            &mut conn,
1451            &primary_server,
1452            &worker_id,
1453            &pool.server_health,
1454            &pool.servers,
1455        )
1456        .await
1457        {
1458            warn!(
1459                worker = %worker_id,
1460                server = %primary_server.name,
1461                host = %primary_server.host,
1462                "Worker FAILED to connect after all retries: {e}"
1463            );
1464            if should_exit(&worker_shutdown, &pool) {
1465                return;
1466            }
1467            tokio::time::sleep(RECONNECT_DELAY).await;
1468            continue 'reconnect;
1469        }
1470
1471        let pipe_depth = primary_server.pipelining.max(1);
1472        let active_conns = pool.conn_tracker.total();
1473        info!(
1474            worker = %worker_id,
1475            server = %primary_server.name,
1476            host = %primary_server.host,
1477            pipelining = pipe_depth,
1478            total_nntp_connections = active_conns,
1479            "Worker connected and ready"
1480        );
1481
1482        // NOTE: do not tick heartbeat here. Reconnects are *not* progress
1483        // — counting them masks the zombie cycle (worker reconnects forever
1484        // without ever decoding an article). The heartbeat is initialised
1485        // to spawn time in `reconcile_servers`, which gives every worker a
1486        // full grace period to first-connect and process its first article.
1487        let reconnect_needed = if pipe_depth <= 1 {
1488            run_worker_serial(
1489                &pool,
1490                &primary_server,
1491                &worker_id,
1492                &worker_shutdown,
1493                &mut conn,
1494                &mut conn_slot,
1495                &last_progress,
1496            )
1497            .await
1498        } else {
1499            run_worker_pipelined(
1500                &pool,
1501                &primary_server,
1502                &worker_id,
1503                pipe_depth,
1504                &worker_shutdown,
1505                &mut conn,
1506                &mut conn_slot,
1507                &last_progress,
1508            )
1509            .await
1510        };
1511
1512        let _ = conn.quit().await;
1513
1514        match reconnect_needed {
1515            WorkerExit::Reconnect => {
1516                // Loop back to the top and reconnect — slot is preserved.
1517                continue 'reconnect;
1518            }
1519            WorkerExit::Exit => {
1520                // Slot drops here when conn_slot goes out of scope.
1521                return;
1522            }
1523        }
1524    }
1525}
1526
/// Outcome of a worker run loop (`run_worker_serial` / `run_worker_pipelined`),
/// telling the caller whether to loop back and reconnect or to stop for good.
enum WorkerExit {
    /// Exit the worker function entirely (server retired or pool shutdown).
    Exit,
    /// Reconnect and keep pulling work (transient connection loss).
    Reconnect,
}
1533
1534/// Wait for work, with early exit on shutdown / server retirement.
1535/// Returns `Some(item, ctx)` when a processable item is available, or `None`
1536/// if the worker should exit.
1537///
1538/// `higher_priority_servers` gates which items this server is allowed to take
1539/// — see `pop_workable` for the priority model.
1540async fn next_work_item(
1541    pool: &Arc<WorkerPool>,
1542    server_id: &str,
1543    higher_priority_servers: &[String],
1544    worker_shutdown: &Arc<AtomicBool>,
1545) -> Option<(WorkItem, Arc<JobContext>)> {
1546    loop {
1547        if worker_shutdown.load(Ordering::Relaxed) || pool.shutdown.load(Ordering::Relaxed) {
1548            return None;
1549        }
1550
1551        if let Some(item) = pool
1552            .work_queue
1553            .pop_workable(server_id, higher_priority_servers)
1554        {
1555            // Look up the job context. If the job is gone or cancelled, drop
1556            // the item and keep going.
1557            let ctx = pool.job_contexts.lock().get(&item.job_id).cloned();
1558            let Some(ctx) = ctx else {
1559                continue;
1560            };
1561            if ctx.cancelled.load(Ordering::Relaxed) {
1562                continue;
1563            }
1564            // Respect per-job pause: return the item and wait.
1565            if ctx.paused.load(Ordering::Relaxed) {
1566                pool.work_queue.push_back(item);
1567                tokio::time::sleep(WORKER_IDLE_POLL).await;
1568                continue;
1569            }
1570            return Some((item, ctx));
1571        }
1572
1573        // Queue empty (or nothing workable for this server) — wait with a
1574        // timeout so we still notice shutdown and new work alike.
1575        let notified = pool.work_queue.notify.notified();
1576        tokio::select! {
1577            _ = notified => {}
1578            _ = tokio::time::sleep(WORKER_IDLE_POLL) => {}
1579        }
1580    }
1581}
1582
1583async fn run_worker_serial(
1584    pool: &Arc<WorkerPool>,
1585    primary_server: &ServerConfig,
1586    worker_id: &str,
1587    worker_shutdown: &Arc<AtomicBool>,
1588    conn: &mut NntpConnection,
1589    _conn_slot: &mut ConnectionSlot,
1590    last_progress: &Arc<AtomicU64>,
1591) -> WorkerExit {
1592    let mut consecutive_errors: u32 = 0;
1593
1594    loop {
1595        // Server runtime checks.
1596        let server_disabled = pool
1597            .servers
1598            .lock()
1599            .iter()
1600            .find(|s| s.id == primary_server.id)
1601            .is_none_or(|s| !s.enabled);
1602        if server_disabled {
1603            info!(
1604                worker = %worker_id,
1605                server = %primary_server.name,
1606                "Server disabled, worker exiting"
1607            );
1608            return WorkerExit::Exit;
1609        }
1610        {
1611            let health = pool.server_health.lock();
1612            if let Some(h) = health.get(&primary_server.id)
1613                && !h.is_available()
1614            {
1615                info!(
1616                    worker = %worker_id,
1617                    server = %primary_server.name,
1618                    reason = h.reason.as_deref().unwrap_or("unknown"),
1619                    "Server circuit-broken, worker reconnecting after cooldown"
1620                );
1621                return WorkerExit::Reconnect;
1622            }
1623        }
1624
1625        // Snapshot of healthy servers with strictly higher priority (lower
1626        // priority number). Used as the priority gate in pop_workable:
1627        // items waiting for a higher-priority server won't be dispatched
1628        // to this worker. Recomputed each loop iteration so runtime
1629        // priority / health changes are picked up. See SABnzbd
1630        // `Article.get_article` for the reference behaviour.
1631        let higher_priority_servers =
1632            pool.higher_priority_servers(primary_server.priority, &primary_server.id);
1633
1634        let Some((mut item, ctx)) = next_work_item(
1635            pool,
1636            &primary_server.id,
1637            &higher_priority_servers,
1638            worker_shutdown,
1639        )
1640        .await
1641        else {
1642            return WorkerExit::Exit;
1643        };
1644
1645        let fetch_fut =
1646            fetch_article_with_retry(conn, &item, &ctx.assembler, primary_server, worker_id);
1647        let result = if let Some(timeout) = pool.stall_timeout {
1648            match tokio::time::timeout(timeout, fetch_fut).await {
1649                Ok(r) => r,
1650                Err(_) => {
1651                    warn!(
1652                        worker = %worker_id,
1653                        server = %primary_server.name,
1654                        article = %item.message_id,
1655                        "Connection stalled — no response within {}s, reconnecting",
1656                        timeout.as_secs()
1657                    );
1658                    pool.work_queue.push_front(item);
1659                    return WorkerExit::Reconnect;
1660                }
1661            }
1662        } else {
1663            fetch_fut.await
1664        };
1665
1666        match result {
1667            Ok(process_result) => {
1668                consecutive_errors = 0;
1669                ctx.total_decode_us
1670                    .fetch_add(process_result.decode_us, Ordering::Relaxed);
1671                ctx.total_assemble_us
1672                    .fetch_add(process_result.assemble_us, Ordering::Relaxed);
1673                ctx.total_articles_decoded.fetch_add(1, Ordering::Relaxed);
1674                if let Some(ref yname) = process_result.yenc_filename {
1675                    ctx.yenc_names
1676                        .lock()
1677                        .entry(item.file_id.clone())
1678                        .or_insert_with(|| crate::util::normalize_nfc(yname));
1679                }
1680                if let Some(n) = std::num::NonZeroU32::new(process_result.decoded_bytes as u32) {
1681                    let _ = pool.bandwidth.acquire_download(n).await;
1682                }
1683                try_send_progress(
1684                    &ctx.progress_tx,
1685                    &item.job_id,
1686                    ProgressUpdate::ArticleComplete {
1687                        job_id: item.job_id.clone(),
1688                        file_id: item.file_id.clone(),
1689                        segment_number: item.segment_number,
1690                        decoded_bytes: process_result.decoded_bytes,
1691                        file_complete: process_result.file_complete,
1692                        server_id: Some(primary_server.id.clone()),
1693                    },
1694                );
1695                ctx.resolve_one();
1696                // Heartbeat: definitive forward progress.
1697                last_progress.store(pool.elapsed_ms(), Ordering::Relaxed);
1698            }
1699            Err(ArticleError::ArticleNotFound) => {
1700                if handle_article_not_available(
1701                    &mut item,
1702                    primary_server,
1703                    &pool.servers,
1704                    &pool.server_health,
1705                    &ctx,
1706                    &pool.work_queue,
1707                    crate::article_failure::ArticleFailureKind::NotFound,
1708                    "Article not found on any server",
1709                ) {
1710                    last_progress.store(pool.elapsed_ms(), Ordering::Relaxed);
1711                }
1712            }
1713            Err(ArticleError::ConnectionLost(msg)) => {
1714                consecutive_errors += 1;
1715                warn!(
1716                    worker = %worker_id,
1717                    server = %primary_server.name,
1718                    host = %primary_server.host,
1719                    consecutive_errors,
1720                    max_reconnects = MAX_RECONNECT_ATTEMPTS,
1721                    article = %item.message_id,
1722                    "Connection lost: {msg}"
1723                );
1724                pool.work_queue.push_front(item);
1725                if consecutive_errors > MAX_RECONNECT_ATTEMPTS {
1726                    warn!(
1727                        worker = %worker_id,
1728                        server = %primary_server.name,
1729                        host = %primary_server.host,
1730                        consecutive_errors,
1731                        "Too many consecutive errors — worker reconnecting"
1732                    );
1733                    return WorkerExit::Reconnect;
1734                }
1735                return WorkerExit::Reconnect;
1736            }
1737            Err(ArticleError::DecodeError(msg)) => {
1738                if handle_article_not_available(
1739                    &mut item,
1740                    primary_server,
1741                    &pool.servers,
1742                    &pool.server_health,
1743                    &ctx,
1744                    &pool.work_queue,
1745                    crate::article_failure::ArticleFailureKind::DecodeError,
1746                    &format!("Decode error: {msg}"),
1747                ) {
1748                    last_progress.store(pool.elapsed_ms(), Ordering::Relaxed);
1749                }
1750            }
1751            Err(ArticleError::AssemblyError(msg)) => {
1752                error!(article = %item.message_id, "Assembly error: {msg}");
1753                try_send_progress(
1754                    &ctx.progress_tx,
1755                    &item.job_id,
1756                    ProgressUpdate::ArticleFailed {
1757                        job_id: item.job_id.clone(),
1758                        file_id: item.file_id.clone(),
1759                        segment_number: item.segment_number,
1760                        failure: crate::article_failure::ArticleFailure::decode_error(
1761                            &primary_server.id,
1762                            format!("Assembly error: {msg}"),
1763                        ),
1764                    },
1765                );
1766                ctx.articles_failed.fetch_add(1, Ordering::Relaxed);
1767                ctx.resolve_one();
1768                last_progress.store(pool.elapsed_ms(), Ordering::Relaxed);
1769            }
1770        }
1771    }
1772}
1773
1774#[allow(clippy::too_many_arguments)]
1775async fn run_worker_pipelined(
1776    pool: &Arc<WorkerPool>,
1777    primary_server: &ServerConfig,
1778    worker_id: &str,
1779    pipe_depth: u8,
1780    worker_shutdown: &Arc<AtomicBool>,
1781    conn: &mut NntpConnection,
1782    _conn_slot: &mut ConnectionSlot,
1783    last_progress: &Arc<AtomicU64>,
1784) -> WorkerExit {
1785    let mut pipeline = Pipeline::new(pipe_depth);
1786    let mut in_flight_items: HashMap<u64, WorkItem> = HashMap::new();
1787    let mut next_tag: u64 = 0;
1788    let mut consecutive_errors: u32 = 0;
1789
1790    // Perf metrics
1791    let mut perf_articles: u64 = 0;
1792    let mut perf_bytes: u64 = 0;
1793    let mut perf_queue_lock_us: u64 = 0;
1794    let mut perf_receive_us: u64 = 0;
1795    let mut perf_decode_us: u64 = 0;
1796    let mut perf_assemble_us: u64 = 0;
1797    let mut perf_bandwidth_us: u64 = 0;
1798    let mut perf_yield_us: u64 = 0;
1799    let mut perf_flush_us: u64 = 0;
1800    let mut perf_last_log = Instant::now();
1801    const PERF_LOG_INTERVAL: Duration = Duration::from_secs(10);
1802
1803    loop {
1804        if worker_shutdown.load(Ordering::Relaxed) || pool.shutdown.load(Ordering::Relaxed) {
1805            requeue_all(&mut in_flight_items, &pool.work_queue);
1806            return WorkerExit::Exit;
1807        }
1808
1809        // Server runtime checks.
1810        let server_disabled = pool
1811            .servers
1812            .lock()
1813            .iter()
1814            .find(|s| s.id == primary_server.id)
1815            .is_none_or(|s| !s.enabled);
1816        if server_disabled {
1817            info!(
1818                worker = %worker_id,
1819                server = %primary_server.name,
1820                "Server disabled, worker exiting"
1821            );
1822            requeue_all(&mut in_flight_items, &pool.work_queue);
1823            return WorkerExit::Exit;
1824        }
1825        {
1826            let health = pool.server_health.lock();
1827            if let Some(h) = health.get(&primary_server.id)
1828                && !h.is_available()
1829            {
1830                info!(
1831                    worker = %worker_id,
1832                    server = %primary_server.name,
1833                    reason = h.reason.as_deref().unwrap_or("unknown"),
1834                    "Server circuit-broken, worker reconnecting after cooldown"
1835                );
1836                requeue_all(&mut in_flight_items, &pool.work_queue);
1837                return WorkerExit::Reconnect;
1838            }
1839        }
1840
1841        // Snapshot of healthy servers with strictly higher priority. Gates
1842        // which items the pipeline-fill and wait-for-work paths are allowed
1843        // to take (see `pop_workable`). Recomputed each loop iteration so
1844        // runtime priority / health changes are picked up.
1845        let higher_priority_servers =
1846            pool.higher_priority_servers(primary_server.priority, &primary_server.id);
1847
1848        // Fill the pipeline.
1849        while pipeline.pending_count() + pipeline.in_flight_count() < pipe_depth as usize {
1850            let lock_t = Instant::now();
1851            let item = pool
1852                .work_queue
1853                .pop_workable(&primary_server.id, &higher_priority_servers);
1854            perf_queue_lock_us += lock_t.elapsed().as_micros() as u64;
1855            let Some(item) = item else {
1856                break;
1857            };
1858            // Look up ctx to respect pause/cancel.
1859            let ctx = pool.job_contexts.lock().get(&item.job_id).cloned();
1860            let Some(ctx) = ctx else {
1861                continue;
1862            };
1863            if ctx.cancelled.load(Ordering::Relaxed) {
1864                continue;
1865            }
1866            if ctx.paused.load(Ordering::Relaxed) {
1867                pool.work_queue.push_back(item);
1868                break;
1869            }
1870            let tag = next_tag;
1871            next_tag += 1;
1872            pipeline.submit(item.message_id.clone(), tag);
1873            in_flight_items.insert(tag, item);
1874        }
1875
1876        // If nothing is queued and nothing in flight, wait for work.
1877        if pipeline.is_empty() && in_flight_items.is_empty() {
1878            let Some((first_item, ctx)) = next_work_item(
1879                pool,
1880                &primary_server.id,
1881                &higher_priority_servers,
1882                worker_shutdown,
1883            )
1884            .await
1885            else {
1886                return WorkerExit::Exit;
1887            };
1888            let _ = ctx; // ctx is validated in next_work_item
1889            let tag = next_tag;
1890            next_tag += 1;
1891            pipeline.submit(first_item.message_id.clone(), tag);
1892            in_flight_items.insert(tag, first_item);
1893        }
1894
1895        let flush_t = Instant::now();
1896        if let Err(e) = pipeline.flush_sends(conn).await {
1897            warn!(
1898                worker = %worker_id,
1899                server = %primary_server.name,
1900                host = %primary_server.host,
1901                error = %e,
1902                in_flight = in_flight_items.len(),
1903                "Pipeline send error — re-queuing all in-flight items"
1904            );
1905            requeue_all(&mut in_flight_items, &pool.work_queue);
1906            consecutive_errors += 1;
1907            if consecutive_errors > MAX_RECONNECT_ATTEMPTS {
1908                warn!(
1909                    worker = %worker_id,
1910                    server = %primary_server.name,
1911                    consecutive_errors,
1912                    "Too many pipeline errors — worker reconnecting"
1913                );
1914                return WorkerExit::Reconnect;
1915            }
1916            tokio::time::sleep(RECONNECT_DELAY).await;
1917            return WorkerExit::Reconnect;
1918        }
1919        perf_flush_us += flush_t.elapsed().as_micros() as u64;
1920
1921        // Read one response.
1922        let recv_t = Instant::now();
1923        trace!(
1924            worker = %worker_id,
1925            in_flight = in_flight_items.len(),
1926            stall_timeout_secs = pool.stall_timeout.map(|d| d.as_secs()).unwrap_or(0),
1927            "Pipeline: awaiting response"
1928        );
1929        let result = if let Some(timeout) = pool.stall_timeout {
1930            match tokio::time::timeout(timeout, pipeline.receive_one(conn)).await {
1931                Ok(r) => r,
1932                Err(_) => {
1933                    let elapsed_ms = recv_t.elapsed().as_millis();
1934                    warn!(
1935                        worker = %worker_id,
1936                        server = %primary_server.name,
1937                        elapsed_ms,
1938                        in_flight = in_flight_items.len(),
1939                        "Connection stalled — no response within {}s, reconnecting",
1940                        timeout.as_secs()
1941                    );
1942                    requeue_all(&mut in_flight_items, &pool.work_queue);
1943                    return WorkerExit::Reconnect;
1944                }
1945            }
1946        } else {
1947            pipeline.receive_one(conn).await
1948        };
1949        perf_receive_us += recv_t.elapsed().as_micros() as u64;
1950
1951        match result {
1952            Ok(Some(pipe_result)) => {
1953                let Some(mut item) = in_flight_items.remove(&pipe_result.request.tag) else {
1954                    continue;
1955                };
1956                // Look up the ctx for this item's job (may have been cancelled).
1957                let ctx = pool.job_contexts.lock().get(&item.job_id).cloned();
1958                let Some(ctx) = ctx else {
1959                    continue;
1960                };
1961                if ctx.cancelled.load(Ordering::Relaxed) {
1962                    continue;
1963                }
1964
1965                match pipe_result.result {
1966                    Ok(response) => {
1967                        consecutive_errors = 0;
1968                        let raw_data = response.data.unwrap_or_default();
1969                        let yield_t = Instant::now();
1970                        tokio::task::yield_now().await;
1971                        perf_yield_us += yield_t.elapsed().as_micros() as u64;
1972                        match decode_and_assemble(&item, &raw_data, &ctx.assembler) {
1973                            Ok(process_result) => {
1974                                perf_decode_us += process_result.decode_us;
1975                                perf_assemble_us += process_result.assemble_us;
1976                                perf_bytes += process_result.decoded_bytes;
1977                                perf_articles += 1;
1978                                ctx.total_decode_us
1979                                    .fetch_add(process_result.decode_us, Ordering::Relaxed);
1980                                ctx.total_assemble_us
1981                                    .fetch_add(process_result.assemble_us, Ordering::Relaxed);
1982                                ctx.total_articles_decoded.fetch_add(1, Ordering::Relaxed);
1983                                if let Some(ref yname) = process_result.yenc_filename {
1984                                    ctx.yenc_names
1985                                        .lock()
1986                                        .entry(item.file_id.clone())
1987                                        .or_insert_with(|| crate::util::normalize_nfc(yname));
1988                                }
1989                                let bw_t = Instant::now();
1990                                if let Some(n) =
1991                                    std::num::NonZeroU32::new(process_result.decoded_bytes as u32)
1992                                {
1993                                    let _ = pool.bandwidth.acquire_download(n).await;
1994                                }
1995                                perf_bandwidth_us += bw_t.elapsed().as_micros() as u64;
1996                                try_send_progress(
1997                                    &ctx.progress_tx,
1998                                    &item.job_id,
1999                                    ProgressUpdate::ArticleComplete {
2000                                        job_id: item.job_id.clone(),
2001                                        file_id: item.file_id.clone(),
2002                                        segment_number: item.segment_number,
2003                                        decoded_bytes: process_result.decoded_bytes,
2004                                        file_complete: process_result.file_complete,
2005                                        server_id: Some(primary_server.id.clone()),
2006                                    },
2007                                );
2008                                ctx.resolve_one();
2009                                // Heartbeat: definitive forward progress.
2010                                last_progress.store(pool.elapsed_ms(), Ordering::Relaxed);
2011
2012                                if perf_last_log.elapsed() >= PERF_LOG_INTERVAL {
2013                                    let elapsed = perf_last_log.elapsed().as_secs_f64();
2014                                    let mbps = perf_bytes as f64 / elapsed / (1024.0 * 1024.0);
2015                                    info!(
2016                                        worker = %worker_id,
2017                                        articles = perf_articles,
2018                                        throughput_mbps = format!("{mbps:.1}"),
2019                                        recv_ms = perf_receive_us / 1000,
2020                                        decode_ms = perf_decode_us / 1000,
2021                                        assemble_ms = perf_assemble_us / 1000,
2022                                        queue_lock_ms = perf_queue_lock_us / 1000,
2023                                        flush_ms = perf_flush_us / 1000,
2024                                        yield_ms = perf_yield_us / 1000,
2025                                        bw_wait_ms = perf_bandwidth_us / 1000,
2026                                        "Worker perf summary"
2027                                    );
2028                                    perf_articles = 0;
2029                                    perf_bytes = 0;
2030                                    perf_queue_lock_us = 0;
2031                                    perf_receive_us = 0;
2032                                    perf_decode_us = 0;
2033                                    perf_assemble_us = 0;
2034                                    perf_bandwidth_us = 0;
2035                                    perf_yield_us = 0;
2036                                    perf_flush_us = 0;
2037                                    perf_last_log = Instant::now();
2038                                }
2039                            }
2040                            Err(ArticleError::DecodeError(msg)) => {
2041                                if handle_article_not_available(
2042                                    &mut item,
2043                                    primary_server,
2044                                    &pool.servers,
2045                                    &pool.server_health,
2046                                    &ctx,
2047                                    &pool.work_queue,
2048                                    crate::article_failure::ArticleFailureKind::DecodeError,
2049                                    &format!("Decode error: {msg}"),
2050                                ) {
2051                                    last_progress.store(pool.elapsed_ms(), Ordering::Relaxed);
2052                                }
2053                            }
2054                            Err(ArticleError::AssemblyError(msg)) => {
2055                                error!(article = %item.message_id, "Assembly error: {msg}");
2056                                try_send_progress(
2057                                    &ctx.progress_tx,
2058                                    &item.job_id,
2059                                    ProgressUpdate::ArticleFailed {
2060                                        job_id: item.job_id.clone(),
2061                                        file_id: item.file_id.clone(),
2062                                        segment_number: item.segment_number,
2063                                        failure:
2064                                            crate::article_failure::ArticleFailure::decode_error(
2065                                                &primary_server.id,
2066                                                format!("Assembly error: {msg}"),
2067                                            ),
2068                                    },
2069                                );
2070                                ctx.articles_failed.fetch_add(1, Ordering::Relaxed);
2071                                ctx.resolve_one();
2072                                last_progress.store(pool.elapsed_ms(), Ordering::Relaxed);
2073                            }
2074                            Err(_) => {}
2075                        }
2076                    }
2077                    Err(NntpError::ArticleNotFound(_)) => {
2078                        if handle_article_not_available(
2079                            &mut item,
2080                            primary_server,
2081                            &pool.servers,
2082                            &pool.server_health,
2083                            &ctx,
2084                            &pool.work_queue,
2085                            crate::article_failure::ArticleFailureKind::NotFound,
2086                            "Article not found on any server",
2087                        ) {
2088                            last_progress.store(pool.elapsed_ms(), Ordering::Relaxed);
2089                        }
2090                    }
2091                    Err(NntpError::Connection(_) | NntpError::Io(_)) => {
2092                        warn!(
2093                            worker = %worker_id,
2094                            server = %primary_server.name,
2095                            host = %primary_server.host,
2096                            article = %item.message_id,
2097                            in_flight = in_flight_items.len(),
2098                            consecutive_errors,
2099                            "Pipeline: connection lost during receive — re-queuing all"
2100                        );
2101                        pool.work_queue.push_front(item);
2102                        requeue_all(&mut in_flight_items, &pool.work_queue);
2103                        consecutive_errors += 1;
2104                        if consecutive_errors > MAX_RECONNECT_ATTEMPTS {
2105                            return WorkerExit::Reconnect;
2106                        }
2107                        tokio::time::sleep(RECONNECT_DELAY).await;
2108                        return WorkerExit::Reconnect;
2109                    }
2110                    Err(e) => {
2111                        warn!(worker = %worker_id, article = %item.message_id, "Pipeline error: {e}");
2112                        let kind = crate::article_failure::ArticleFailure::from_nntp(
2113                            &e,
2114                            &primary_server.id,
2115                        )
2116                        .kind;
2117                        if handle_article_not_available(
2118                            &mut item,
2119                            primary_server,
2120                            &pool.servers,
2121                            &pool.server_health,
2122                            &ctx,
2123                            &pool.work_queue,
2124                            kind,
2125                            &format!("Pipeline error: {e}"),
2126                        ) {
2127                            last_progress.store(pool.elapsed_ms(), Ordering::Relaxed);
2128                        }
2129                    }
2130                }
2131            }
2132            Ok(None) => {
2133                // No in-flight requests — loop will fill more.
2134            }
2135            Err(e) => {
2136                warn!(
2137                    worker = %worker_id,
2138                    server = %primary_server.name,
2139                    host = %primary_server.host,
2140                    error = %e,
2141                    in_flight = in_flight_items.len(),
2142                    consecutive_errors,
2143                    "Pipeline receive error"
2144                );
2145                requeue_all(&mut in_flight_items, &pool.work_queue);
2146                consecutive_errors += 1;
2147                if consecutive_errors > MAX_RECONNECT_ATTEMPTS {
2148                    return WorkerExit::Reconnect;
2149                }
2150                tokio::time::sleep(RECONNECT_DELAY).await;
2151                return WorkerExit::Reconnect;
2152            }
2153        }
2154    }
2155}
2156
2157// ---------------------------------------------------------------------------
2158// Connection with retry
2159// ---------------------------------------------------------------------------
2160
2161async fn connect_with_retry(
2162    conn: &mut NntpConnection,
2163    server: &ServerConfig,
2164    worker_id: &str,
2165    server_health: &ServerHealthMap,
2166    all_servers: &Arc<Mutex<Vec<ServerConfig>>>,
2167) -> Result<(), String> {
2168    for attempt in 1..=MAX_RECONNECT_ATTEMPTS {
2169        {
2170            let health = server_health.lock();
2171            if let Some(h) = health.get(&server.id)
2172                && !h.is_available()
2173            {
2174                return Err(format!(
2175                    "Server circuit-broken: {}",
2176                    h.reason.as_deref().unwrap_or("unknown")
2177                ));
2178            }
2179        }
2180
2181        let current_config = all_servers
2182            .lock()
2183            .iter()
2184            .find(|s| s.id == server.id)
2185            .cloned()
2186            .unwrap_or_else(|| server.clone());
2187
2188        info!(
2189            worker = %worker_id,
2190            server = %current_config.name,
2191            host = %current_config.host,
2192            port = current_config.port,
2193            attempt,
2194            max_attempts = MAX_RECONNECT_ATTEMPTS,
2195            "Connect attempt starting"
2196        );
2197        match conn.connect(&current_config).await {
2198            Ok(()) => {
2199                info!(
2200                    worker = %worker_id,
2201                    server = %current_config.name,
2202                    host = %current_config.host,
2203                    attempt,
2204                    "Connect attempt succeeded"
2205                );
2206                server_health
2207                    .lock()
2208                    .entry(server.id.clone())
2209                    .or_default()
2210                    .record_success();
2211                return Ok(());
2212            }
2213            Err(e) => {
2214                let is_auth = matches!(e, NntpError::Auth(_) | NntpError::ServiceUnavailable(_));
2215                {
2216                    let mut health = server_health.lock();
2217                    let entry = health.entry(server.id.clone()).or_default();
2218                    entry.record_failure(is_auth, &e.to_string());
2219                    if !entry.is_available() {
2220                        warn!(
2221                            worker = %worker_id,
2222                            server = %current_config.name,
2223                            host = %current_config.host,
2224                            error = %e,
2225                            cooldown_secs = if is_auth { AUTH_FAILURE_COOLDOWN.as_secs() } else { TRANSIENT_FAILURE_COOLDOWN.as_secs() },
2226                            "Server circuit-broken — stopping all connection attempts"
2227                        );
2228                        return Err(format!("Server circuit-broken: {e}"));
2229                    }
2230                }
2231
2232                warn!(
2233                    worker = %worker_id,
2234                    server = %current_config.name,
2235                    host = %current_config.host,
2236                    attempt,
2237                    max_attempts = MAX_RECONNECT_ATTEMPTS,
2238                    error = %e,
2239                    is_auth,
2240                    "Connect attempt FAILED: {e}"
2241                );
2242
2243                if is_auth {
2244                    return Err(format!("Auth/permission failure: {e}"));
2245                }
2246
2247                if attempt < MAX_RECONNECT_ATTEMPTS {
2248                    info!(
2249                        worker = %worker_id,
2250                        server = %current_config.name,
2251                        delay_secs = RECONNECT_DELAY.as_secs(),
2252                        "Waiting before retry"
2253                    );
2254                    tokio::time::sleep(RECONNECT_DELAY).await;
2255                    *conn = NntpConnection::new(worker_id.to_string());
2256                } else {
2257                    return Err(format!(
2258                        "All {MAX_RECONNECT_ATTEMPTS} connect attempts failed: {e}"
2259                    ));
2260                }
2261            }
2262        }
2263    }
2264    Err("Connect retry loop exited unexpectedly".into())
2265}
2266
2267// ---------------------------------------------------------------------------
2268// Helpers: re-queue, not-available routing, par2 sort key
2269// ---------------------------------------------------------------------------
2270
2271/// Handle an article that's not available on this server (not found, decode
2272/// error, etc.): mark the server as tried and either requeue or mark failed.
2273///
2274/// `kind` lets the failure be classified — if every server has been tried
2275/// and the failure was per-server (NotFound, ServerDown, …), we promote it
2276/// to a definitive `NotFound` for the hopeless tracker. DecodeError stays
2277/// classified as DecodeError because it's typically not server-specific.
2278#[allow(clippy::too_many_arguments)]
2279fn handle_article_not_available(
2280    item: &mut WorkItem,
2281    primary_server: &ServerConfig,
2282    all_servers: &Arc<Mutex<Vec<ServerConfig>>>,
2283    server_health: &ServerHealthMap,
2284    ctx: &Arc<JobContext>,
2285    work_queue: &Arc<SharedWorkQueue>,
2286    kind: crate::article_failure::ArticleFailureKind,
2287    error_msg: &str,
2288) -> bool {
2289    item.tried_servers.push(primary_server.id.clone());
2290    item.tries_on_current = 0;
2291
2292    let all_tried = {
2293        let servers = all_servers.lock();
2294        let health = server_health.lock();
2295        servers.iter().filter(|s| s.enabled).all(|s| {
2296            item.tried_servers.contains(&s.id)
2297                || health.get(&s.id).is_some_and(|h| !h.is_available())
2298        })
2299    };
2300
2301    debug!(
2302        article = %item.message_id,
2303        server = %primary_server.id,
2304        kind = kind.as_str(),
2305        tried_count = item.tried_servers.len(),
2306        all_tried,
2307        "Article returned error on this server"
2308    );
2309
2310    // (debug log immediately below was added for observability)
2311    if all_tried {
2312        warn!(article = %item.message_id, kind = kind.as_str(), "{error_msg}");
2313        // Promote a per-server NotFound to a definitive NotFound now that
2314        // every server has been exhausted. DecodeError keeps its kind.
2315        let final_failure = if kind == crate::article_failure::ArticleFailureKind::DecodeError {
2316            crate::article_failure::ArticleFailure::decode_error(
2317                &primary_server.id,
2318                error_msg.to_string(),
2319            )
2320        } else {
2321            crate::article_failure::ArticleFailure::not_found_anywhere(&primary_server.id)
2322        };
2323        try_send_progress(
2324            &ctx.progress_tx,
2325            &item.job_id,
2326            ProgressUpdate::ArticleFailed {
2327                job_id: item.job_id.clone(),
2328                file_id: item.file_id.clone(),
2329                segment_number: item.segment_number,
2330                failure: final_failure,
2331            },
2332        );
2333        ctx.articles_failed.fetch_add(1, Ordering::Relaxed);
2334        ctx.resolve_one();
2335        true
2336    } else {
2337        // push_FRONT (not push_back): put the failed item at the front of the
2338        // queue so the next eligible server picks it up IMMEDIATELY instead
2339        // of queueing behind thousands of fresh items. Same-server workers
2340        // rotate the item back via pop_workable's existing skip-and-push_back
2341        // logic. This transforms a retention-dead NZB from "11+ minutes per
2342        // article's full-server cascade" into "fractions of a second per
2343        // cascade" — the dominant failure mode for hung downloads.
2344        work_queue.push_front(item.clone());
2345        false
2346    }
2347}
2348
2349/// Re-queue all in-flight items back to the work queue (on connection loss).
2350fn requeue_all(in_flight: &mut HashMap<u64, WorkItem>, work_queue: &Arc<SharedWorkQueue>) {
2351    let items: Vec<WorkItem> = in_flight.drain().map(|(_, item)| item).collect();
2352    for item in items {
2353        work_queue.push_front(item);
2354    }
2355}
2356
/// Sort key for work-queue prioritisation of PAR2 files. Index files (0)
/// first, then volume files (1), then data files (2).
///
/// Matching is case-insensitive. A name counts as a PAR2 file when it ends
/// in `.par2`. It counts as a recovery *volume* only when the last
/// dot-separated component before `.par2` is `vol` followed by a digit
/// (e.g. `movie.vol00+01.par2`, `movie.vol015-031.par2`). A `.vol` that
/// merely appears somewhere in the release name (`Anthology.Vol.2.par2`,
/// `my.volume.par2`) does not demote an index file.
fn par2_sort_key(filename: &str) -> u8 {
    let lower = filename.to_lowercase();

    // Anything that is not a PAR2 file at all sorts last.
    let stem = match lower.strip_suffix(".par2") {
        Some(stem) => stem,
        None => return 2,
    };

    let is_volume = stem
        .rsplit_once('.')
        .and_then(|(_, last)| last.strip_prefix("vol"))
        .is_some_and(|rest| rest.starts_with(|c: char| c.is_ascii_digit()));

    if is_volume { 1 } else { 0 }
}
2367
/// Report whether `name` ends in a recognised file extension.
///
/// The extension is the text after the last `.` in `name`, compared
/// case-insensitively. It is recognised when it is one of the fixed
/// archive / video / audio / subtitle / image / PAR2 extensions below, or
/// when it is a numbered split-set part:
/// - `rNN` — RAR volumes such as `.r00`, `.r17`;
/// - `NNN` — three-digit numbered parts such as `.001`, `.042`.
///
/// Names without a `.` (for example obfuscated hashes) return `false`.
fn has_known_extension(name: &str) -> bool {
    const KNOWN_EXTENSIONS: &[&str] = &[
        // archives
        "rar", "zip", "7z", "gz", "bz2", "xz", "tar",
        // video
        "mkv", "mp4", "avi", "wmv", "ts", "m4v", "mov", "mpg", "mpeg",
        // audio
        "mp3", "flac", "ogg", "m4a", "aac", "wav",
        // subtitles
        "srt", "sub", "idx", "ass", "ssa", "sup",
        // misc
        "nfo", "jpg", "jpeg", "png", "gif", "bmp",
        // recovery
        "par2",
    ];

    let lower = name.to_lowercase();
    let ext = match lower.rsplit_once('.') {
        Some((_, ext)) => ext,
        None => return false,
    };

    if KNOWN_EXTENSIONS.contains(&ext) {
        return true;
    }

    // Numbered split-set parts: accept the whole series, not just the
    // first handful, so later parts are treated like the earlier ones.
    match ext.as_bytes() {
        [b'r', tens, units] => tens.is_ascii_digit() && units.is_ascii_digit(),
        [a, b, c] => a.is_ascii_digit() && b.is_ascii_digit() && c.is_ascii_digit(),
        _ => false,
    }
}
2425
2426// ---------------------------------------------------------------------------
2427// Public helper used by queue_manager: build work items + context for a job
2428// ---------------------------------------------------------------------------
2429
2430/// Build the WorkItems for a job's unfinished articles and an initialised
2431/// JobContext. Called by QueueManager before [`WorkerPool::submit_job`].
2432pub(crate) fn build_job_submission(
2433    job: &NzbJob,
2434    progress_tx: mpsc::Sender<ProgressUpdate>,
2435) -> (Arc<JobContext>, Vec<WorkItem>) {
2436    let assembler = Arc::new(FileAssembler::new());
2437    for file in &job.files {
2438        let output_path = job.work_dir.join(&file.filename);
2439        if let Err(e) =
2440            assembler.register_file(&job.id, &file.id, output_path, file.articles.len() as u32)
2441        {
2442            error!(file = %file.filename, "Failed to register file for assembly: {e}");
2443        }
2444    }
2445
2446    let work_items: Vec<WorkItem> = job
2447        .files
2448        .iter()
2449        .flat_map(|file| {
2450            file.articles
2451                .iter()
2452                .enumerate()
2453                .filter(|(_, a)| !a.downloaded)
2454                .map(move |(idx, article)| WorkItem {
2455                    job_id: job.id.clone(),
2456                    file_id: file.id.clone(),
2457                    filename: file.filename.clone(),
2458                    message_id: article.message_id.clone(),
2459                    segment_number: (idx as u32) + 1,
2460                    tried_servers: Vec::new(),
2461                    tries_on_current: 0,
2462                })
2463        })
2464        .collect();
2465
2466    let total_remaining = work_items.len();
2467    let ctx = Arc::new(JobContext::new(
2468        job,
2469        assembler,
2470        progress_tx,
2471        total_remaining,
2472    ));
2473    (ctx, work_items)
2474}
2475
2476// ---------------------------------------------------------------------------
2477// Article fetch with per-server retry
2478// ---------------------------------------------------------------------------
2479
2480async fn fetch_article_with_retry(
2481    conn: &mut NntpConnection,
2482    item: &WorkItem,
2483    assembler: &FileAssembler,
2484    _server: &ServerConfig,
2485    worker_id: &str,
2486) -> Result<ProcessResult, ArticleError> {
2487    let mut last_error = None;
2488
2489    for attempt in 1..=MAX_TRIES_PER_SERVER {
2490        let fetch_start = Instant::now();
2491        match conn.fetch_article(&item.message_id).await {
2492            Ok(response) => {
2493                let fetch_us = fetch_start.elapsed().as_micros();
2494                let raw_data = response.data.unwrap_or_default();
2495                debug!(
2496                    worker = %worker_id,
2497                    article = %item.message_id,
2498                    raw_bytes = raw_data.len(),
2499                    fetch_us,
2500                    "NNTP fetch complete"
2501                );
2502                return decode_and_assemble(item, &raw_data, assembler);
2503            }
2504            Err(NntpError::ArticleNotFound(_)) => {
2505                debug!(
2506                    worker = %worker_id,
2507                    article = %item.message_id,
2508                    "Article not found (430) — will try next server"
2509                );
2510                return Err(ArticleError::ArticleNotFound);
2511            }
2512            Err(e @ (NntpError::Connection(_) | NntpError::Io(_))) => {
2513                warn!(
2514                    worker = %worker_id,
2515                    article = %item.message_id,
2516                    attempt,
2517                    error = %e,
2518                    conn_state = ?conn.state,
2519                    "Connection/IO error during fetch — connection lost"
2520                );
2521                return Err(ArticleError::ConnectionLost(format!(
2522                    "Connection error on attempt {attempt}: {e}"
2523                )));
2524            }
2525            Err(e @ NntpError::Tls(_)) => {
2526                warn!(
2527                    worker = %worker_id,
2528                    article = %item.message_id,
2529                    attempt,
2530                    error = %e,
2531                    "TLS error during fetch — connection lost"
2532                );
2533                return Err(ArticleError::ConnectionLost(format!("TLS error: {e}")));
2534            }
2535            Err(e @ NntpError::ServiceUnavailable(_)) => {
2536                warn!(
2537                    worker = %worker_id,
2538                    article = %item.message_id,
2539                    attempt,
2540                    error = %e,
2541                    "Service unavailable (502) during article fetch — likely rate limited or blocked"
2542                );
2543                return Err(ArticleError::ConnectionLost(format!(
2544                    "Service unavailable: {e}"
2545                )));
2546            }
2547            Err(e @ NntpError::AuthRequired(_)) => {
2548                warn!(
2549                    worker = %worker_id,
2550                    article = %item.message_id,
2551                    attempt,
2552                    error = %e,
2553                    "Auth required (480) during article fetch — session expired or rate limited"
2554                );
2555                return Err(ArticleError::ConnectionLost(format!(
2556                    "Auth required mid-session: {e}"
2557                )));
2558            }
2559            Err(e) => {
2560                last_error = Some(format!("{e}"));
2561                if attempt < MAX_TRIES_PER_SERVER {
2562                    warn!(
2563                        worker = %worker_id,
2564                        article = %item.message_id,
2565                        attempt,
2566                        max_tries = MAX_TRIES_PER_SERVER,
2567                        error = %e,
2568                        "Transient fetch error, retrying in 500ms"
2569                    );
2570                    tokio::time::sleep(Duration::from_millis(500)).await;
2571                } else {
2572                    warn!(
2573                        worker = %worker_id,
2574                        article = %item.message_id,
2575                        attempt,
2576                        error = %e,
2577                        "All retries on this server exhausted"
2578                    );
2579                }
2580            }
2581        }
2582    }
2583
2584    Err(ArticleError::DecodeError(
2585        last_error.unwrap_or_else(|| "Unknown error after retries".into()),
2586    ))
2587}
2588
2589// ---------------------------------------------------------------------------
2590// Article processing
2591// ---------------------------------------------------------------------------
2592
/// Outcome of successfully decoding one article and handing it to the
/// assembler; built by `decode_and_assemble`.
2593#[derive(Debug)]
2594struct ProcessResult {
    /// Length in bytes of the decoded payload.
2595    decoded_bytes: u64,
    /// Value returned by the assembler's `assemble_article` call for this
    /// segment.
2596    file_complete: bool,
    /// Filename carried by the decoded yEnc data, if any.
2597    yenc_filename: Option<String>,
    /// Microseconds spent in the yEnc decode step.
2598    decode_us: u64,
    /// Microseconds spent in the assembler call.
2599    assemble_us: u64,
2600}
2601
/// Why fetching or processing a single article failed on one server.
2602#[derive(Debug, thiserror::Error)]
2603enum ArticleError {
    /// The server answered that it does not have the article.
2604    #[error("Article not found on server")]
2605    ArticleNotFound,
    /// The connection can no longer be used; `fetch_article_with_retry`
    /// returns this for connection, I/O, TLS, service-unavailable and
    /// auth-required errors.
2606    #[error("Connection lost: {0}")]
2607    ConnectionLost(String),
    /// yEnc decoding failed. `fetch_article_with_retry` also returns this
    /// variant when its per-server retries are exhausted.
2608    #[error("Decode error: {0}")]
2609    DecodeError(String),
    /// The assembler rejected the decoded segment.
2610    #[error("Assembly error: {0}")]
2611    AssemblyError(String),
2612}
2613
2614fn decode_and_assemble(
2615    item: &WorkItem,
2616    raw_data: &[u8],
2617    assembler: &FileAssembler,
2618) -> Result<ProcessResult, ArticleError> {
2619    let decode_start = Instant::now();
2620    let decoded = decode_yenc(raw_data).map_err(|e| {
2621        ArticleError::DecodeError(format!(
2622            "yEnc decode failed for {} seg {}: {e}",
2623            item.filename, item.segment_number
2624        ))
2625    })?;
2626    let decode_us = decode_start.elapsed().as_micros();
2627
2628    let yenc_filename = decoded.filename;
2629    let data_begin = decoded.part_begin.unwrap_or(0);
2630    let decoded_len = decoded.data.len() as u64;
2631
2632    let assemble_start = Instant::now();
2633    let file_complete = assembler
2634        .assemble_article(
2635            &item.job_id,
2636            &item.file_id,
2637            item.segment_number,
2638            data_begin,
2639            &decoded.data,
2640        )
2641        .map_err(|e| {
2642            ArticleError::AssemblyError(format!(
2643                "Assembly failed for {} seg {}: {e}",
2644                item.filename, item.segment_number
2645            ))
2646        })?;
2647    let assemble_us = assemble_start.elapsed().as_micros();
2648
2649    debug!(
2650        file = %item.filename,
2651        segment = item.segment_number,
2652        raw_bytes = raw_data.len(),
2653        decoded_bytes = decoded_len,
2654        decode_us,
2655        assemble_us,
2656        "Article decode+assemble timing"
2657    );
2658
2659    Ok(ProcessResult {
2660        decoded_bytes: decoded_len,
2661        file_complete,
2662        yenc_filename,
2663        decode_us: decode_us as u64,
2664        assemble_us: assemble_us as u64,
2665    })
2666}
2667
2668// ---------------------------------------------------------------------------
2669// Tests
2670// ---------------------------------------------------------------------------
2671
2672#[cfg(test)]
2673mod tests {
2674    use super::*;
2675
2676    #[test]
2677    fn has_known_extension_recognizes_archives() {
2678        assert!(has_known_extension("movie.rar"));
2679        assert!(has_known_extension("movie.part01.rar"));
2680        assert!(has_known_extension("file.zip"));
2681        assert!(has_known_extension("file.7z"));
2682        assert!(has_known_extension("archive.001"));
2683    }
2684
2685    #[test]
2686    fn has_known_extension_recognizes_video() {
2687        assert!(has_known_extension("episode.mkv"));
2688        assert!(has_known_extension("movie.mp4"));
2689        assert!(has_known_extension("video.avi"));
2690        assert!(has_known_extension("clip.ts"));
2691    }
2692
2693    #[test]
2694    fn has_known_extension_recognizes_par2() {
2695        assert!(has_known_extension("file.par2"));
2696        assert!(has_known_extension("file.vol00+01.par2"));
2697        assert!(has_known_extension("file.vol015-031.par2"));
2698    }
2699
2700    #[test]
2701    fn has_known_extension_recognizes_misc() {
2702        assert!(has_known_extension("info.nfo"));
2703        assert!(has_known_extension("sub.srt"));
2704        assert!(has_known_extension("cover.jpg"));
2705        assert!(has_known_extension("song.flac"));
2706    }
2707
2708    #[test]
2709    fn has_known_extension_rejects_obfuscated_hashes() {
2710        assert!(!has_known_extension("9b6a324d7560b87091685020371ba869"));
2711        assert!(!has_known_extension("1fG1GP7L2263LHXH213HTNIxZsX7l0cv44BZ"));
2712        assert!(!has_known_extension("DfKUx3bl7L6PSo6276WSaXSZ7"));
2713        assert!(!has_known_extension("Q77O1ZxL237vc241z77hFoLBxl"));
2714    }
2715
2716    #[test]
2717    fn has_known_extension_rejects_unknown_extensions() {
2718        assert!(!has_known_extension("file.xyz123"));
2719        assert!(!has_known_extension("noext"));
2720        assert!(!has_known_extension(""));
2721    }
2722
2723    #[test]
2724    fn has_known_extension_case_insensitive() {
2725        assert!(has_known_extension("file.RAR"));
2726        assert!(has_known_extension("file.MKV"));
2727        assert!(has_known_extension("file.Par2"));
2728        assert!(has_known_extension("file.MP4"));
2729    }
2730
2731    fn make_item(job_id: &str, msg_id: &str, filename: &str) -> WorkItem {
2732        WorkItem {
2733            job_id: job_id.to_string(),
2734            file_id: "f1".to_string(),
2735            filename: filename.to_string(),
2736            message_id: msg_id.to_string(),
2737            segment_number: 1,
2738            tried_servers: Vec::new(),
2739            tries_on_current: 0,
2740        }
2741    }
2742
2743    #[test]
2744    fn shared_queue_par2_first() {
2745        let q = SharedWorkQueue::new();
2746        q.submit_items(vec![
2747            make_item("j1", "a", "movie.rar"),
2748            make_item("j1", "b", "movie.par2"),
2749            make_item("j1", "c", "movie.vol00+01.par2"),
2750            make_item("j1", "d", "movie.r00"),
2751        ]);
2752        let first = q.pop_workable("srv1", &[]).unwrap();
2753        assert_eq!(first.filename, "movie.par2", "index file first");
2754        let second = q.pop_workable("srv1", &[]).unwrap();
2755        assert_eq!(second.filename, "movie.vol00+01.par2", "vol file second");
2756    }
2757
2758    #[test]
2759    fn shared_queue_skips_tried_servers() {
2760        let q = SharedWorkQueue::new();
2761        let mut item = make_item("j1", "a", "file.rar");
2762        item.tried_servers.push("srv1".to_string());
2763        q.submit_items(vec![item, make_item("j1", "b", "other.rar")]);
2764
2765        // srv1 should skip the first item and return the second.
2766        let picked = q.pop_workable("srv1", &[]).unwrap();
2767        assert_eq!(picked.message_id, "b");
2768    }
2769
2770    #[test]
2771    fn pop_workable_respects_priority() {
2772        // Fresh item (tried_servers empty). A backup-priority caller whose
2773        // higher_priority_servers list is non-empty must NOT get the item —
2774        // the primary server still needs a chance first.
2775        let q = SharedWorkQueue::new();
2776        q.submit_items(vec![make_item("j1", "a", "file.rar")]);
2777
2778        let higher = vec!["srv_primary".to_string()];
2779        assert!(q.pop_workable("srv_backup", &higher).is_none());
2780
2781        // Primary server (empty higher list) should still get it.
2782        let item = q.pop_workable("srv_primary", &[]).unwrap();
2783        assert_eq!(item.message_id, "a");
2784    }
2785
2786    #[test]
2787    fn pop_workable_allows_backup_after_primary_tried() {
2788        // Once the primary has been added to tried_servers (because it
2789        // failed), the backup is allowed to take the item.
2790        let q = SharedWorkQueue::new();
2791        let mut item = make_item("j1", "a", "file.rar");
2792        item.tried_servers.push("srv_primary".to_string());
2793        q.submit_items(vec![item]);
2794
2795        let higher = vec!["srv_primary".to_string()];
2796        let picked = q.pop_workable("srv_backup", &higher).unwrap();
2797        assert_eq!(picked.message_id, "a");
2798    }
2799
2800    #[test]
2801    fn pop_workable_ignores_circuit_broken_higher_server() {
2802        // When the caller's `higher_priority_servers` list is empty because
2803        // the primary was filtered out as circuit-broken/disabled, the backup
2804        // gets items immediately — no waiting for the dead primary.
2805        let q = SharedWorkQueue::new();
2806        q.submit_items(vec![make_item("j1", "a", "file.rar")]);
2807
2808        let higher: Vec<String> = vec![]; // primary filtered out by caller
2809        let item = q.pop_workable("srv_backup", &higher).unwrap();
2810        assert_eq!(item.message_id, "a");
2811    }
2812
2813    #[test]
2814    fn workable_count_for_respects_priority() {
2815        let q = SharedWorkQueue::new();
2816        q.submit_items(vec![
2817            make_item("j1", "a", "a.rar"),
2818            make_item("j1", "b", "b.rar"),
2819        ]);
2820
2821        // Backup: both items need primary first → workable=0.
2822        let higher = vec!["srv_primary".to_string()];
2823        let (workable, total) = q.workable_count_for("srv_backup", &higher);
2824        assert_eq!(workable, 0);
2825        assert_eq!(total, 2);
2826
2827        // Primary: both items are workable.
2828        let (workable, total) = q.workable_count_for("srv_primary", &[]);
2829        assert_eq!(workable, 2);
2830        assert_eq!(total, 2);
2831    }
2832
2833    #[test]
2834    fn shared_queue_drain_job_removes_only_target() {
2835        let q = SharedWorkQueue::new();
2836        q.submit_items(vec![
2837            make_item("j1", "a", "a.rar"),
2838            make_item("j2", "b", "b.rar"),
2839            make_item("j1", "c", "c.rar"),
2840        ]);
2841        let drained = q.drain_job("j1");
2842        assert_eq!(drained.len(), 2);
2843        assert_eq!(q.len(), 1);
2844        let remaining = q.pop_workable("srv1", &[]).unwrap();
2845        assert_eq!(remaining.job_id, "j2");
2846    }
2847
2848    // -----------------------------------------------------------------------
2849    // Per-job round-robin fairness
2850    // -----------------------------------------------------------------------
2851
2852    #[test]
2853    fn pop_workable_alternates_between_jobs_on_single_server() {
2854        // Prod scenario: two jobs both have many workable items; a single
2855        // server must not drain one job entirely before touching the other.
2856        let q = SharedWorkQueue::new();
2857        q.submit_items(vec![
2858            make_item("j1", "a1", "a1.rar"),
2859            make_item("j1", "a2", "a2.rar"),
2860            make_item("j1", "a3", "a3.rar"),
2861            make_item("j2", "b1", "b1.rar"),
2862            make_item("j2", "b2", "b2.rar"),
2863            make_item("j2", "b3", "b3.rar"),
2864        ]);
2865        // Expect alternation: j1, j2, j1, j2, j1, j2.
2866        let mut order: Vec<String> = Vec::new();
2867        while let Some(item) = q.pop_workable("srv1", &[]) {
2868            order.push(item.job_id);
2869        }
2870        assert_eq!(
2871            order,
2872            vec!["j1", "j2", "j1", "j2", "j1", "j2"],
2873            "single-server pops must alternate across jobs, not drain one"
2874        );
2875    }
2876
2877    #[test]
2878    fn pop_workable_falls_back_when_only_same_job_is_available() {
2879        // Round-robin PREFERS the other job but doesn't forbid same-job when
2880        // that's all that's eligible.
2881        let q = SharedWorkQueue::new();
2882        q.submit_items(vec![
2883            make_item("j1", "a1", "a.rar"),
2884            make_item("j1", "a2", "b.rar"),
2885        ]);
2886        let first = q.pop_workable("srv1", &[]).unwrap();
2887        assert_eq!(first.job_id, "j1");
2888        let second = q.pop_workable("srv1", &[]).unwrap();
2889        assert_eq!(
2890            second.job_id, "j1",
2891            "falls back to same job when no sibling"
2892        );
2893    }
2894
2895    #[test]
2896    fn per_server_cursors_are_independent() {
2897        // Two servers; cursor state is tracked per server so one server's
2898        // round-robin choice doesn't bias the other.
2899        let q = SharedWorkQueue::new();
2900        q.submit_items(vec![
2901            make_item("j1", "a1", "a1.rar"),
2902            make_item("j2", "b1", "b1.rar"),
2903            make_item("j1", "a2", "a2.rar"),
2904            make_item("j2", "b2", "b2.rar"),
2905        ]);
2906        // srv_x has no cursor → picks first eligible = j1-a1.
2907        let x1 = q.pop_workable("srv_x", &[]).unwrap();
2908        assert_eq!(x1.job_id, "j1");
2909        // srv_x's cursor is now j1 → next pop wants != j1 = j2-b1.
2910        let x2 = q.pop_workable("srv_x", &[]).unwrap();
2911        assert_eq!(x2.job_id, "j2");
2912        // srv_y has never popped — independent from srv_x's j2 cursor. Picks
2913        // first eligible in the remaining queue = j1-a2.
2914        let y1 = q.pop_workable("srv_y", &[]).unwrap();
2915        assert_eq!(
2916            y1.job_id, "j1",
2917            "srv_y has its own cursor state; srv_x's j2 cursor must not leak"
2918        );
2919    }
2920
2921    #[test]
2922    fn fairness_respects_tried_servers_and_priority() {
2923        // The fairness preference must not override eligibility: a "preferred
2924        // other-job" item that the server has already tried cannot be picked
2925        // just because of round-robin. Same for priority-gated items.
2926        let q = SharedWorkQueue::new();
2927        let mut j2_tried = make_item("j2", "b1", "b1.rar");
2928        j2_tried.tried_servers.push("srv1".to_string());
2929        q.submit_items(vec![make_item("j1", "a1", "a1.rar"), j2_tried]);
2930        // First pop: j1 (no cursor, first eligible).
2931        let first = q.pop_workable("srv1", &[]).unwrap();
2932        assert_eq!(first.job_id, "j1");
2933        // Cursor now points at j1. Fairness wants j2. But j2's item was
2934        // tried by srv1 already → must fall through, returning None.
2935        assert!(
2936            q.pop_workable("srv1", &[]).is_none(),
2937            "must not serve an ineligible item just to satisfy fairness"
2938        );
2939    }
2940
2941    #[test]
2942    fn drained_jobs_clear_last_served_cursor() {
2943        // When a job is drained (cancelled), its entry in the last_served
2944        // map should be cleared so future pops aren't biased toward an
2945        // extinct job.
2946        let q = SharedWorkQueue::new();
2947        q.submit_items(vec![
2948            make_item("j1", "a1", "a1.rar"),
2949            make_item("j2", "b1", "b1.rar"),
2950        ]);
2951        let _ = q.pop_workable("srv1", &[]).unwrap(); // serves j1, cursor=j1
2952        q.drain_job("j1");
2953        // With j1 gone and last_served cleared, the next pop is unbiased
2954        // and simply returns the first eligible item — j2.
2955        let pick = q.pop_workable("srv1", &[]).unwrap();
2956        assert_eq!(pick.job_id, "j2");
2957    }
2958
2959    // -----------------------------------------------------------------------
2960    // ConnectionTracker (Phase 4 — semaphore-backed slots)
2961    // -----------------------------------------------------------------------
2962
2963    #[tokio::test]
2964    async fn connection_tracker_acquire_releases_slot_on_drop() {
2965        let t = ConnectionTracker::new();
2966        t.set_limit("srv1", "Server 1", 2);
2967        let s1 = t.acquire("srv1").await.unwrap();
2968        let s2 = t.acquire("srv1").await.unwrap();
2969        assert_eq!(t.total(), 2);
2970        // Synchronous release on drop.
2971        drop(s1);
2972        assert_eq!(t.total(), 1);
2973        drop(s2);
2974        assert_eq!(t.total(), 0);
2975    }
2976
2977    #[tokio::test]
2978    async fn connection_tracker_blocks_at_limit() {
2979        let t = Arc::new(ConnectionTracker::new());
2980        t.set_limit("srv1", "Server 1", 1);
2981        let _held = t.acquire("srv1").await.unwrap();
2982
2983        // Third acquire on a 1-slot pool must block. Wrap with a short
2984        // timeout — if it does NOT time out, the cap was breached.
2985        let t2 = Arc::clone(&t);
2986        let res = tokio::time::timeout(Duration::from_millis(150), async move {
2987            t2.acquire("srv1").await
2988        })
2989        .await;
2990        assert!(
2991            res.is_err(),
2992            "second acquire should block while limit is reached"
2993        );
2994    }
2995
2996    #[tokio::test]
2997    async fn connection_tracker_grow_in_place_lets_more_acquire() {
2998        let t = ConnectionTracker::new();
2999        t.set_limit("srv1", "Server 1", 2);
3000        let _a = t.acquire("srv1").await.unwrap();
3001        let _b = t.acquire("srv1").await.unwrap();
3002        // No more capacity — but grow to 4 and we should be able to take 2
3003        // more without releasing the existing slots.
3004        t.set_limit("srv1", "Server 1", 4);
3005        let _c = t.acquire("srv1").await.unwrap();
3006        let _d = t.acquire("srv1").await.unwrap();
3007        assert_eq!(t.total(), 4);
3008    }
3009
3010    #[tokio::test]
3011    async fn connection_tracker_shrink_marks_old_slots_stale() {
3012        let t = ConnectionTracker::new();
3013        t.set_limit("srv1", "Server 1", 4);
3014        let s = t.acquire("srv1").await.unwrap();
3015        assert!(t.slot_is_current(&s));
3016
3017        // Shrink to 1 — old semaphore is replaced.
3018        t.set_limit("srv1", "Server 1", 1);
3019        assert!(
3020            !t.slot_is_current(&s),
3021            "after shrink, the previously-acquired slot must be marked stale"
3022        );
3023
3024        // The new pool starts empty (1 permit available, 0 in use). Old
3025        // permit holder is no longer counted in `total()` because its
3026        // semaphore is orphaned.
3027        assert_eq!(t.total(), 0);
3028
3029        // We can still acquire from the new pool.
3030        let new_slot = t.acquire("srv1").await.unwrap();
3031        assert!(t.slot_is_current(&new_slot));
3032        assert_eq!(t.total(), 1);
3033    }
3034
3035    #[tokio::test]
3036    async fn connection_tracker_remove_server_marks_slot_stale() {
3037        let t = ConnectionTracker::new();
3038        t.set_limit("srv1", "Server 1", 2);
3039        let s = t.acquire("srv1").await.unwrap();
3040        assert!(t.slot_is_current(&s));
3041
3042        t.remove_server("srv1");
3043        assert!(
3044            !t.slot_is_current(&s),
3045            "after remove_server, the slot must be marked stale"
3046        );
3047        assert_eq!(t.total(), 0);
3048    }
3049
3050    #[tokio::test]
3051    async fn connection_tracker_snapshot_reflects_active_count() {
3052        let t = ConnectionTracker::new();
3053        t.set_limit("srv1", "Server 1", 3);
3054        t.set_limit("srv2", "Server 2", 5);
3055
3056        let _a1 = t.acquire("srv1").await.unwrap();
3057        let _a2 = t.acquire("srv1").await.unwrap();
3058        let _b1 = t.acquire("srv2").await.unwrap();
3059
3060        let mut snap = t.snapshot();
3061        snap.sort_by(|a, b| a.0.cmp(&b.0));
3062        assert_eq!(snap.len(), 2);
3063        assert_eq!(snap[0], ("srv1".into(), 2, 3));
3064        assert_eq!(snap[1], ("srv2".into(), 1, 5));
3065    }
3066}