Skip to main content

nzb_dispatch/
news_engine.rs

1//! `NewsDispatchEngine` — `DispatchEngine` impl backed by the `nzb-news` crate.
2//!
3//! The layered news engine is a pure NNTP fetch layer: it takes per-article
4//! work items and emits [`nzb_news::FetchOutcome`]s. This adapter bolts the
5//! rest of the pipeline on top of it so it satisfies the contract the old
6//! `WorkerPool` engine used to satisfy:
7//!
8//! 1. **Fetch** — delegated to `nzb_news::spawn_downloader`.
9//! 2. **Decode** — `nzb_decode::decode_yenc` on each successful outcome.
10//! 3. **Assemble** — `FileAssembler::assemble_article` writes the decoded
11//!    bytes at the yEnc-declared offset.
12//! 4. **Progress** — translates per-article outcomes into
13//!    [`ProgressUpdate::ArticleComplete`] / [`ProgressUpdate::ArticleFailed`];
14//!    drives job-level terminal via `JobContext::resolve_one`.
15//!
16//! Per-job lifecycle (pause/resume/cancel/abort) is tracked in this adapter
17//! because the news engine is job-agnostic. We keep a `JobContext` per job
18//! (same struct the old engine used — it owns the assembler, progress
19//! channel, deobfuscation state, and terminal-emit logic).
20//!
//! Known limitations:
//! - `pause_job` / `resume_job` gate only work that has not started yet;
//!   articles that are already being fetched run to completion.
//! - `reconcile_servers` replaces the running downloader with a freshly
//!   spawned one (nzb-news doesn't expose mid-flight server
//!   reconfiguration yet). Articles still queued inside the old downloader
//!   may be lost, which can stall their job.
//! - `set_max_worker_idle` / `eviction_count` are stubs (no idle-worker
//!   pool concept in nzb-news).
28
29use std::collections::{HashMap, VecDeque};
30use std::sync::Arc;
31use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
32use std::time::{Duration, Instant};
33
34use parking_lot::{Mutex, RwLock};
35use tokio::sync::{Notify, mpsc};
36use tracing::{debug, info};
37
38use nzb_core::models::NzbJob;
39use nzb_nntp::config::ServerConfig;
40
41use crate::article_failure::{ArticleFailure, ArticleFailureKind};
42use crate::dispatch_engine::DispatchEngine;
43use crate::download_engine::{JobContext, ProgressUpdate, build_job_submission};
44
45// ---------------------------------------------------------------------------
46// Tuning knobs
47// ---------------------------------------------------------------------------
48
/// Ceiling on the number of articles the nzb-news downloader keeps
/// in-flight at any moment, summed over every server. Roughly the same
/// limit the old engine ran with.
const DEFAULT_MAX_CONCURRENT_FETCHES: usize = 40;

/// Capacity of the work channel inside nzb-news, where articles wait
/// between being handed to the downloader and the per-server fan-out.
const DEFAULT_WORK_CHANNEL_CAPACITY: usize = 4_096;

/// Capacity of the outcome channel. Sized generously so that a short
/// backlog on the decode side does not stall the fetch loop.
const DEFAULT_OUTCOME_CHANNEL_CAPACITY: usize = 4_096;
60
61// ---------------------------------------------------------------------------
62// Config
63// ---------------------------------------------------------------------------
64
65/// Configuration for [`NewsDispatchEngine`]. Mirrors the knobs exposed by
66/// the old engine so the swap is drop-in from the caller's perspective.
67///
68/// `servers` is held as an `Arc<Mutex<_>>` so the caller (queue manager) can
69/// mutate it and [`DispatchEngine::reconcile_servers`] will pick up the new
70/// list without requiring a new config instance or a full engine rebuild.
71#[derive(Clone)]
72pub struct NewsEngineConfig {
73    pub servers: Arc<Mutex<Vec<ServerConfig>>>,
74    pub article_timeout: Duration,
75    pub max_concurrent_fetches: usize,
76    pub work_channel_capacity: usize,
77    pub outcome_channel_capacity: usize,
78    /// Optional backup-server probe policy. Forwarded verbatim to
79    /// `nzb_news::DownloaderConfig::probe_policy`.
80    ///
81    /// `None` disables probing (every cascade article tries every server).
82    /// `Some(_)` enables fast-fail on a backup server when the probed
83    /// hit-rate falls below the threshold for that job.
84    ///
85    /// Defaults to `Some(ServerProbePolicy::default())` which matches the
86    /// nzb-news default (probe 10 articles, require >=10% hits).
87    pub probe_policy: Option<nzb_news::ServerProbePolicy>,
88}
89
90impl NewsEngineConfig {
91    /// Construct a config from an owned server list. Wraps the list in an
92    /// `Arc<Mutex<_>>` internally; if the caller already owns a shared Arc
93    /// (e.g. queue manager's live server list), use
94    /// [`NewsEngineConfig::with_shared_servers`] instead so mutations are
95    /// visible to the engine.
96    pub fn new(servers: Vec<ServerConfig>, article_timeout: Duration) -> Self {
97        Self::with_shared_servers(Arc::new(Mutex::new(servers)), article_timeout)
98    }
99
100    /// Construct a config sharing an existing `Arc<Mutex<Vec<ServerConfig>>>`
101    /// with the caller. Mutating the Arc from outside and then calling
102    /// [`DispatchEngine::reconcile_servers`] rebuilds the downloader with
103    /// the latest server list — this is how live "add/remove server"
104    /// operations reach the fetch layer.
105    pub fn with_shared_servers(
106        servers: Arc<Mutex<Vec<ServerConfig>>>,
107        article_timeout: Duration,
108    ) -> Self {
109        Self {
110            servers,
111            article_timeout,
112            max_concurrent_fetches: DEFAULT_MAX_CONCURRENT_FETCHES,
113            work_channel_capacity: DEFAULT_WORK_CHANNEL_CAPACITY,
114            outcome_channel_capacity: DEFAULT_OUTCOME_CHANNEL_CAPACITY,
115            probe_policy: Some(nzb_news::ServerProbePolicy::default()),
116        }
117    }
118}
119
120// ---------------------------------------------------------------------------
121// Adapter state
122// ---------------------------------------------------------------------------
123
124/// Shared state held behind an `Arc` so the outcome-dispatcher task can
125/// access jobs while the engine is owned by the caller.
126struct Inner {
127    config: NewsEngineConfig,
128    /// Populated by `start()`. Holds the downloader's work-submission sender.
129    handle: RwLock<Option<nzb_news::DownloaderHandle>>,
130    /// Job map: `job_id` → per-job state. Cloned into the outcome task.
131    jobs: RwLock<HashMap<String, Arc<JobEntry>>>,
132    /// Monotonic tag issued for every `WorkItem` submitted to the downloader.
133    /// Also serves as the routing key back to the originating article when
134    /// outcomes come out the other side.
135    next_tag: AtomicU64,
136    /// `tag` → in-flight article metadata. We remove on outcome; this is the
137    /// only place that holds the file_id / segment_number for an article
138    /// mid-flight. Cleared on job cancel to free memory fast.
139    in_flight: RwLock<HashMap<u64, InFlight>>,
140}
141
142/// Per-job state owned by the adapter.
143struct JobEntry {
144    /// Reused from the old engine — owns the assembler, progress channel,
145    /// deobfuscation state, and terminal emit logic. Everything the adapter
146    /// needs on the success/failure path is already a field here.
147    context: Arc<JobContext>,
148    /// When true, the pump task holds items in `pending` instead of
149    /// forwarding them to the downloader. In-flight articles still
150    /// complete — pause gates the *next* work only.
151    paused: AtomicBool,
152    /// Set by `cancel_job`. The pump exits and drains the queue.
153    cancelled: AtomicBool,
154    /// Work items waiting to be handed to the downloader. `submit_job`
155    /// pushes all items here; the pump task drains on start and on
156    /// resume.
157    pending: Mutex<VecDeque<nzb_news::WorkItem>>,
158    /// Wake-up signal for the pump task. Notified when `submit_job` adds
159    /// items or when `resume_job` / `cancel_job` changes the gate.
160    pump_wake: Notify,
161}
162
/// Routing record stored under a work item's tag so that the outcome for
/// that tag can be attributed to the correct job, file and segment.
#[derive(Clone)]
struct InFlight {
    /// Job the article belongs to.
    job_id: String,
    /// File (within the job) the article belongs to.
    file_id: String,
    /// Segment number of the article within its file.
    segment_number: u32,
}
171
172// ---------------------------------------------------------------------------
173// NewsDispatchEngine
174// ---------------------------------------------------------------------------
175
176/// `DispatchEngine` impl backed by the layered nzb-news fetch engine.
177pub struct NewsDispatchEngine {
178    inner: Arc<Inner>,
179}
180
181impl NewsDispatchEngine {
182    /// Construct the engine. Does **not** spawn the downloader —
183    /// [`DispatchEngine::start`] does that.
184    pub fn new(config: NewsEngineConfig) -> Self {
185        Self {
186            inner: Arc::new(Inner {
187                config,
188                handle: RwLock::new(None),
189                jobs: RwLock::new(HashMap::new()),
190                next_tag: AtomicU64::new(1),
191                in_flight: RwLock::new(HashMap::new()),
192            }),
193        }
194    }
195}
196
197#[async_trait::async_trait]
198impl DispatchEngine for NewsDispatchEngine {
199    fn start(&self) {
200        let mut slot = self.inner.handle.write();
201        if slot.is_some() {
202            return; // idempotent
203        }
204
205        let servers_snapshot = self.inner.config.servers.lock().clone();
206        if servers_snapshot.is_empty() {
207            // Deferred start: with zero servers, spawning the downloader
208            // would create an internal work queue with no per-server
209            // workers, and any items pushed would sit in limbo.
210            // `reconcile_servers` will start it once servers are added.
211            info!("NewsDispatchEngine start deferred — no servers configured");
212            return;
213        }
214        spawn_and_install_downloader(&self.inner, &mut slot, servers_snapshot);
215    }
216
217    fn submit_job(&self, job: &NzbJob, progress_tx: mpsc::Sender<ProgressUpdate>) {
218        // Reuse the old engine's job-submission builder: creates the
219        // FileAssembler, registers files, and filters out already-downloaded
220        // articles. We only use the returned `JobContext` — the WorkItem
221        // vec it produces is in the old engine's format; we build nzb-news
222        // work items fresh below.
223        let (ctx, legacy_items) = build_job_submission(job, progress_tx);
224
225        // Build nzb-news wrapper types. One NzbObject for the job, one
226        // NzbFile per file, and one Article per work item.
227        let news_files: Vec<Arc<nzb_news::NzbFile>> = job
228            .files
229            .iter()
230            .map(|f| {
231                Arc::new(nzb_news::NzbFile::new(
232                    &f.id,
233                    &job.id,
234                    &f.filename,
235                    f.articles.len() as u32,
236                ))
237            })
238            .collect();
239        let news_files_by_id: HashMap<String, Arc<nzb_news::NzbFile>> = news_files
240            .iter()
241            .map(|nf| (nf.id.clone(), Arc::clone(nf)))
242            .collect();
243        let total_articles = legacy_items.len() as u64;
244        let news_job = Arc::new(nzb_news::NzbObject::new(
245            &job.id,
246            &job.name,
247            total_articles,
248            job.total_bytes,
249            news_files.clone(),
250        ));
251
252        // Convert each legacy WorkItem into an nzb-news WorkItem, recording
253        // the routing metadata into in_flight and the item itself into the
254        // job's pending queue. The pump task forwards from pending to the
255        // downloader, gated by the `paused` flag.
256        let mut pending = VecDeque::with_capacity(legacy_items.len());
257        let tag_counter = &self.inner.next_tag;
258        for item in legacy_items {
259            let tag = tag_counter.fetch_add(1, Ordering::Relaxed);
260            let file = match news_files_by_id.get(&item.file_id) {
261                Some(f) => Arc::clone(f),
262                None => continue, // shouldn't happen — file_id came from the same job
263            };
264            self.inner.in_flight.write().insert(
265                tag,
266                InFlight {
267                    job_id: item.job_id.clone(),
268                    file_id: item.file_id.clone(),
269                    segment_number: item.segment_number,
270                },
271            );
272            let article = Arc::new(nzb_news::Article::new(
273                item.message_id.clone(),
274                item.file_id.clone(),
275                item.job_id.clone(),
276                0,
277                item.segment_number,
278                tag,
279            ));
280            pending.push_back(nzb_news::WorkItem {
281                tag,
282                article,
283                file,
284                job: Arc::clone(&news_job),
285            });
286        }
287
288        let entry = Arc::new(JobEntry {
289            context: Arc::clone(&ctx),
290            paused: AtomicBool::new(false),
291            cancelled: AtomicBool::new(false),
292            pending: Mutex::new(pending),
293            pump_wake: Notify::new(),
294        });
295        self.inner
296            .jobs
297            .write()
298            .insert(ctx.job_id.clone(), Arc::clone(&entry));
299
300        // Spawn the pump. It acquires a sender from the engine handle on
301        // each iteration and parks if the downloader is absent — so
302        // submitting a job before `start()` (or during a 0-server startup
303        // window) is safe: items wait in `pending` until `reconcile_servers`
304        // spawns the downloader.
305        let job_id = job.id.clone();
306        tokio::spawn(pump_loop(entry, Arc::clone(&self.inner), job_id));
307    }
308
309    fn pause_job(&self, job_id: &str) {
310        if let Some(entry) = self.inner.jobs.read().get(job_id) {
311            // Local gate — stops the pump from handing new items to nzb-news.
312            entry.paused.store(true, Ordering::SeqCst);
313        }
314        // Scheduler-level gate — holds already-submitted articles in
315        // nzb-news's own pending queue. Without this, anything already
316        // accepted into `work_channel_capacity` (default 4096) would still
317        // route to servers despite the local gate.
318        if let Some(h) = self.inner.handle.read().as_ref() {
319            h.pause_job(job_id);
320        }
321        debug!(job_id, "paused");
322    }
323
324    fn resume_job(&self, job_id: &str) {
325        if let Some(entry) = self.inner.jobs.read().get(job_id) {
326            entry.paused.store(false, Ordering::SeqCst);
327            entry.pump_wake.notify_waiters();
328        }
329        if let Some(h) = self.inner.handle.read().as_ref() {
330            h.resume_job(job_id);
331        }
332        debug!(job_id, "resumed");
333    }
334
335    fn cancel_job(&self, job_id: &str) {
336        let entry = self.inner.jobs.write().remove(job_id);
337        if let Some(entry) = entry {
338            // Signal pump to drain + exit.
339            entry.cancelled.store(true, Ordering::SeqCst);
340            entry.pump_wake.notify_waiters();
341            // Drop any not-yet-dispatched items so the pump sees an empty
342            // queue and exits promptly.
343            entry.pending.lock().clear();
344            // Clear in-flight entries for this job so stale outcomes are
345            // dropped silently by the dispatcher (unknown-tag path).
346            self.inner
347                .in_flight
348                .write()
349                .retain(|_, m| m.job_id != job_id);
350            // Purge nzb-news scheduler-level state: items already accepted
351            // into the downloader's work_channel or pending list get emitted
352            // as Cancelled outcomes and removed. Without this, a cancelled
353            // job would keep routing its buffered articles to servers.
354            if let Some(h) = self.inner.handle.read().as_ref() {
355                h.purge_job(job_id);
356            }
357            debug!(job_id, "cancelled");
358        }
359    }
360
361    fn abort_job(&self, job_id: &str, reason: String) {
362        // Emit terminal via the existing JobContext machinery — same path
363        // the old engine uses. `emit_terminal` is idempotent; cancel_job
364        // later is safe.
365        let entry = self.inner.jobs.read().get(job_id).cloned();
366        if let Some(entry) = entry {
367            *entry.context.abort_reason.lock() = Some(reason);
368            entry.context.emit_terminal_public();
369        }
370        self.cancel_job(job_id);
371    }
372
373    fn has_job(&self, job_id: &str) -> bool {
374        self.inner.jobs.read().contains_key(job_id)
375    }
376
377    fn reconcile_servers(&self) {
378        // Rebuild the downloader with the current server list.
379        //
380        // First-time (0 → N): when `start` was deferred for lack of
381        // servers, pump_loops parked waiting for a handle. Spawning the
382        // downloader here and notifying pumps resumes dispatch cleanly —
383        // no items are lost because `pump_loop` leaves unsent items in
384        // `pending` until a sender is available.
385        //
386        // Reconfigure (N → M, N > 0): the downloader is rebuilt and the
387        // old one shut down. Articles already in the old downloader's
388        // internal queue that had not completed may be lost; their job
389        // will stall until nzb-news grows a dynamic-server API. For the
390        // common "add/edit server" UI flows this is rare in practice and
391        // the user can retry a stalled job manually. Documented as a
392        // limitation rather than a silent partial failure.
393        let servers_snapshot = self.inner.config.servers.lock().clone();
394        let server_count = servers_snapshot.len();
395
396        let old_handle = if servers_snapshot.is_empty() {
397            // Remove handle; pumps will park until a server is added.
398            self.inner.handle.write().take()
399        } else {
400            let mut slot = self.inner.handle.write();
401            let old = slot.take();
402            spawn_and_install_downloader(&self.inner, &mut slot, servers_snapshot);
403            old
404        };
405
406        if let Some(old) = old_handle {
407            old.shutdown();
408        }
409
410        // Wake all pump loops so they re-read the handle and either pick
411        // up the new sender or park on `pump_wake` until one arrives.
412        let entries: Vec<Arc<JobEntry>> = self.inner.jobs.read().values().map(Arc::clone).collect();
413        for entry in entries {
414            entry.pump_wake.notify_waiters();
415        }
416
417        info!(
418            servers = server_count,
419            "NewsDispatchEngine reconciled server list"
420        );
421    }
422
423    fn set_max_worker_idle(&self, _d: Duration) {
424        // No per-worker idle concept in nzb-news; workers are persistent
425        // until the downloader shuts down.
426    }
427
428    fn eviction_count(&self) -> u64 {
429        0
430    }
431
432    fn server_stats_snapshot(&self) -> Vec<(String, crate::dispatch_engine::ServerAttemptStats)> {
433        let guard = self.inner.handle.read();
434        let Some(h) = guard.as_ref() else {
435            return Vec::new();
436        };
437        h.server_stats_snapshot()
438            .into_iter()
439            .map(|(id, s)| {
440                (
441                    id,
442                    crate::dispatch_engine::ServerAttemptStats {
443                        attempted: s.attempted,
444                        succeeded: s.succeeded,
445                        not_found: s.not_found,
446                        transient_failed: s.transient_failed,
447                    },
448                )
449            })
450            .collect()
451    }
452
453    async fn shutdown(&self) {
454        let handle = self.inner.handle.write().take();
455        if let Some(h) = handle {
456            h.shutdown();
457            h.join().await;
458        }
459    }
460}
461
462// ---------------------------------------------------------------------------
463// Outcome dispatcher
464// ---------------------------------------------------------------------------
465
466/// Main loop: consume `FetchOutcome`s from nzb-news and translate each into
467/// a `ProgressUpdate`, doing decode + assembly inline on success. Runs until
468/// the outcome channel is closed (downloader shutdown).
469async fn outcome_dispatcher(
470    inner: Arc<Inner>,
471    mut outcomes: mpsc::Receiver<nzb_news::FetchOutcome>,
472) {
473    while let Some(outcome) = outcomes.recv().await {
474        match outcome {
475            nzb_news::FetchOutcome::Success {
476                tag,
477                server_id,
478                bytes,
479                article_bytes: _,
480            } => {
481                // Spawn each success so decode+assemble runs in parallel.
482                // The old engine got this for free because every worker did
483                // its own fetch+decode+assemble — centralising here would
484                // serialise all post-fetch work to a single task.
485                let inner2 = Arc::clone(&inner);
486                tokio::spawn(async move {
487                    process_success(inner2, tag, server_id, bytes).await;
488                });
489            }
490            nzb_news::FetchOutcome::Failed { tag, last_error } => {
491                process_failure(&inner, tag, last_error);
492            }
493            nzb_news::FetchOutcome::Cancelled { tag } => {
494                // Treat as benign discard — caller (queue manager) will
495                // observe JobAborted separately via abort_job.
496                inner.in_flight.write().remove(&tag);
497            }
498        }
499    }
500    debug!("outcome_dispatcher exiting: channel closed");
501}
502
503async fn process_success(inner: Arc<Inner>, tag: u64, server_id: String, raw: Vec<u8>) {
504    let meta = inner.in_flight.write().remove(&tag);
505    let Some(meta) = meta else {
506        return; // stale / cancelled
507    };
508
509    let entry = inner.jobs.read().get(&meta.job_id).cloned();
510    let Some(entry) = entry else {
511        return; // job cancelled after submit
512    };
513    let ctx = &entry.context;
514
515    // Decode (CPU-bound; SIMD is fast but not free).
516    let decode_start = Instant::now();
517    let decoded = match nzb_decode::decode_yenc(&raw) {
518        Ok(d) => d,
519        Err(e) => {
520            let failure = ArticleFailure::decode_error(server_id, format!("yEnc decode: {e}"));
521            emit_failed(ctx, &meta, failure);
522            return;
523        }
524    };
525    let decode_us = decode_start.elapsed().as_micros() as u64;
526
527    // Record yEnc filename for deobfuscation.
528    if let Some(ref fname) = decoded.filename
529        && !fname.is_empty()
530    {
531        ctx.yenc_names
532            .lock()
533            .insert(meta.file_id.clone(), fname.clone());
534    }
535
536    let data_begin = decoded.part_begin.unwrap_or(0);
537
538    // Assemble.
539    let assemble_start = Instant::now();
540    let file_complete = match ctx.assembler.assemble_article(
541        &meta.job_id,
542        &meta.file_id,
543        meta.segment_number,
544        data_begin,
545        &decoded.data,
546    ) {
547        Ok(b) => b,
548        Err(e) => {
549            let failure = ArticleFailure::decode_error(server_id, format!("assembly: {e}"));
550            emit_failed(ctx, &meta, failure);
551            return;
552        }
553    };
554    let assemble_us = assemble_start.elapsed().as_micros() as u64;
555
556    // Timing stats.
557    ctx.total_decode_us.fetch_add(decode_us, Ordering::Relaxed);
558    ctx.total_assemble_us
559        .fetch_add(assemble_us, Ordering::Relaxed);
560    ctx.total_articles_decoded.fetch_add(1, Ordering::Relaxed);
561
562    // Emit progress.
563    let decoded_bytes = decoded.data.len() as u64;
564    let _ = ctx.progress_tx.try_send(ProgressUpdate::ArticleComplete {
565        job_id: meta.job_id.clone(),
566        file_id: meta.file_id.clone(),
567        segment_number: meta.segment_number,
568        decoded_bytes,
569        file_complete,
570        server_id: Some(server_id),
571    });
572
573    ctx.resolve_one_public();
574}
575
576fn process_failure(inner: &Inner, tag: u64, last_error: Option<String>) {
577    let meta = inner.in_flight.write().remove(&tag);
578    let Some(meta) = meta else {
579        return;
580    };
581    let entry = inner.jobs.read().get(&meta.job_id).cloned();
582    let Some(entry) = entry else {
583        return;
584    };
585    let msg = last_error.unwrap_or_else(|| "all servers exhausted".into());
586    // nzb-news doesn't carry structured error info at the outcome layer —
587    // only the last attempt's error string. Pattern-match common causes
588    // so the hopeless-tracker and queue_manager can distinguish "server
589    // is broken/quota-exhausted" (transient, don't count toward hopeless)
590    // from "article genuinely missing everywhere" (counts toward
591    // hopeless). Without this, an auth/quota failure trickles through as
592    // NotFound and aborts the job with "articles confirmed missing" —
593    // confusing diagnostics that blame the content instead of the server.
594    let kind = classify_error_message(&msg);
595    let failure = ArticleFailure {
596        kind,
597        server_id: String::new(),
598        message: msg,
599    };
600    emit_failed(&entry.context, &meta, failure);
601}
602
603/// Map an opaque nzb-news error string to a typed [`ArticleFailureKind`].
604///
605/// The strings come from `nzb_nntp::error::NntpError` (via nzb-news) and are
606/// the only signal we have at this layer — nzb-news's `FetchOutcome` carries
607/// `Option<String>` rather than a structured kind. Order of checks matters:
608/// more specific patterns are tested first.
609fn classify_error_message(msg: &str) -> ArticleFailureKind {
610    let m = msg.to_ascii_lowercase();
611    // NNTP response codes in the message body are the strongest signal.
612    if m.contains("(482)") || m.contains("(481)") || m.contains("auth") {
613        return ArticleFailureKind::AuthFailed;
614    }
615    if m.contains("(403)") || m.contains("permission") || m.contains("forbidden") {
616        return ArticleFailureKind::PermissionDenied;
617    }
618    if m.contains("(430)") || m.contains("article not found") || m.contains("no such article") {
619        return ArticleFailureKind::NotFound;
620    }
621    if m.contains("(502)") || m.contains("service unavailable") {
622        return ArticleFailureKind::ServerDown;
623    }
624    if m.contains("timeout") || m.contains("timed out") {
625        return ArticleFailureKind::Timeout;
626    }
627    if m.contains("connection") || m.contains("eof") || m.contains("reset") || m.contains("closed")
628    {
629        return ArticleFailureKind::ConnectionClosed;
630    }
631    // Default: treat unknown cascade exhaustion as NotFound — same as the
632    // old behaviour — so genuinely-missing articles still abort hopeless
633    // NZBs promptly.
634    ArticleFailureKind::NotFound
635}
636
/// Unit tests for [`classify_error_message`]: one representative message
/// per failure kind, plus the unknown-message fallback.
#[cfg(test)]
mod classify_tests {
    use super::*;

    #[test]
    fn classifies_auth_failures() {
        let msg = "Authentication failed: PASS rejected (482): Your block account is fully used";
        assert_eq!(classify_error_message(msg), ArticleFailureKind::AuthFailed);
    }

    #[test]
    fn classifies_permission_denied() {
        let msg = "permission denied (403)";
        assert_eq!(
            classify_error_message(msg),
            ArticleFailureKind::PermissionDenied
        );
    }

    #[test]
    fn classifies_not_found() {
        let msg = "NNTP (430) No such article";
        assert_eq!(classify_error_message(msg), ArticleFailureKind::NotFound);
    }

    #[test]
    fn classifies_service_unavailable() {
        let msg = "Service unavailable (502)";
        assert_eq!(classify_error_message(msg), ArticleFailureKind::ServerDown);
    }

    #[test]
    fn classifies_timeout() {
        let msg = "read timed out after 60s";
        assert_eq!(classify_error_message(msg), ArticleFailureKind::Timeout);
    }

    #[test]
    fn classifies_connection_closed() {
        let msg = "connection reset by peer";
        assert_eq!(
            classify_error_message(msg),
            ArticleFailureKind::ConnectionClosed
        );
    }

    #[test]
    fn unknown_defaults_to_not_found() {
        assert_eq!(
            classify_error_message("all servers exhausted"),
            ArticleFailureKind::NotFound
        );
    }
}
673
674fn emit_failed(ctx: &JobContext, meta: &InFlight, failure: ArticleFailure) {
675    ctx.articles_failed.fetch_add(1, Ordering::Relaxed);
676    let _ = ctx.progress_tx.try_send(ProgressUpdate::ArticleFailed {
677        job_id: meta.job_id.clone(),
678        file_id: meta.file_id.clone(),
679        segment_number: meta.segment_number,
680        failure,
681    });
682    ctx.resolve_one_public();
683}
684
685// ---------------------------------------------------------------------------
686// Per-job pump task
687// ---------------------------------------------------------------------------
688
689/// Drains a job's `pending` queue into the downloader's work channel,
690/// respecting the `paused` gate and exiting on `cancelled`.
691///
692/// The pump parks on `pump_wake` when `pending` is empty or when
693/// `paused` is true. `submit_job` / `resume_job` notify to wake it.
694async fn pump_loop(entry: Arc<JobEntry>, inner: Arc<Inner>, job_id: String) {
695    loop {
696        if entry.cancelled.load(Ordering::SeqCst) {
697            debug!(job_id, "pump exiting: cancelled");
698            return;
699        }
700        if entry.paused.load(Ordering::SeqCst) {
701            entry.pump_wake.notified().await;
702            continue;
703        }
704        let next = entry.pending.lock().pop_front();
705        let Some(item) = next else {
706            // Queue empty. submit_job enqueues every article up-front, so
707            // an empty queue means we're done. Park anyway so cancel can
708            // wake us.
709            entry.pump_wake.notified().await;
710            continue;
711        };
712
713        // Snapshot the current sender. If the downloader is absent (not
714        // started yet, or torn down during reconcile_servers with zero
715        // servers), stash the item back on the front of `pending` and
716        // park; reconcile_servers will notify us when a new handle exists.
717        let sender = inner.handle.read().as_ref().map(|h| h.sender());
718        let Some(sender) = sender else {
719            entry.pending.lock().push_front(item);
720            entry.pump_wake.notified().await;
721            continue;
722        };
723
724        // Send. On SendError (sender closed mid-reconcile), return the
725        // item to the queue and park — the new handle is on its way.
726        if let Err(e) = sender.send(item).await {
727            entry.pending.lock().push_front(e.0);
728            entry.pump_wake.notified().await;
729            continue;
730        }
731    }
732}
733
734/// Build a new `DownloaderConfig` from the engine's static knobs plus the
735/// given server list, spawn the downloader, install its handle in `slot`,
736/// and launch the outcome dispatcher task. Used by both `start()` and
737/// `reconcile_servers` to avoid duplicating the construction.
738///
739/// Precondition: `servers` is non-empty; caller decides the zero-server
740/// policy. `slot` must already be held under a write lock.
741fn spawn_and_install_downloader(
742    inner: &Arc<Inner>,
743    slot: &mut Option<nzb_news::DownloaderHandle>,
744    servers: Vec<ServerConfig>,
745) {
746    let cfg = &inner.config;
747    let server_count = servers.len();
748    let dl_config = nzb_news::DownloaderConfig {
749        servers,
750        max_concurrent_fetches: cfg.max_concurrent_fetches,
751        article_timeout: cfg.article_timeout,
752        work_channel_capacity: cfg.work_channel_capacity,
753        outcome_channel_capacity: cfg.outcome_channel_capacity,
754        probe_policy: cfg.probe_policy.clone(),
755    };
756    let (handle, outcomes) = nzb_news::spawn_downloader(dl_config);
757    let inner_for_task = Arc::clone(inner);
758    tokio::spawn(outcome_dispatcher(inner_for_task, outcomes));
759    *slot = Some(handle);
760    info!(
761        servers = server_count,
762        "NewsDispatchEngine downloader spawned"
763    );
764}