nzb_dispatch/news_engine.rs
//! `NewsDispatchEngine` — `DispatchEngine` impl backed by the `nzb-news` crate.
//!
//! The layered news engine is a pure NNTP fetch layer: it takes per-article
//! work items and emits [`nzb_news::FetchOutcome`]s. This adapter bolts the
//! rest of the pipeline on top of it so that it satisfies the same contract
//! the old `WorkerPool` engine did:
//!
//! 1. **Fetch** — delegated to `nzb_news::spawn_downloader`.
//! 2. **Decode** — `nzb_decode::decode_yenc` on each successful outcome.
//! 3. **Assemble** — `FileAssembler::assemble_article` writes the decoded
//!    bytes at the yEnc-declared offset.
//! 4. **Progress** — translates per-article outcomes into
//!    [`ProgressUpdate::ArticleComplete`] / [`ProgressUpdate::ArticleFailed`]
//!    and drives the job-level terminal state via
//!    `JobContext::resolve_one_public`.
//!
//! Per-job lifecycle (pause/resume/cancel/abort) is tracked in this adapter
//! because the news engine is job-agnostic. We keep a `JobContext` per job
//! (the same struct the old engine used — it owns the assembler, progress
//! channel, deobfuscation state, and terminal-emit logic).
//!
//! Notes and known limitations:
//! - `pause_job` / `resume_job` gate work at two levels: a per-job pump task
//!   holds items not yet handed to nzb-news, and the downloader is asked to
//!   hold items it has already accepted. Articles already being fetched
//!   still complete.
//! - `reconcile_servers` rebuilds the whole downloader, because nzb-news has
//!   no mid-flight server reconfiguration API yet. Articles queued inside
//!   the old downloader may be lost, stalling their job.
//! - `set_max_worker_idle` / `eviction_count` are stubs (nzb-news has no
//!   idle-worker pool concept).
28
29use std::collections::{HashMap, VecDeque};
30use std::sync::Arc;
31use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
32use std::time::{Duration, Instant};
33
34use parking_lot::{Mutex, RwLock};
35use tokio::sync::{Notify, mpsc};
36use tracing::{debug, info};
37
38use nzb_core::models::NzbJob;
39use nzb_nntp::config::ServerConfig;
40
41use crate::article_failure::{ArticleFailure, ArticleFailureKind};
42use crate::dispatch_engine::DispatchEngine;
43use crate::download_engine::{JobContext, ProgressUpdate, build_job_submission};
44
45// ---------------------------------------------------------------------------
46// Tuning knobs
47// ---------------------------------------------------------------------------
48
/// Ceiling on articles the nzb-news downloader keeps in flight at once,
/// summed over every configured server. Picked to roughly match the limit
/// the old engine ran with.
const DEFAULT_MAX_CONCURRENT_FETCHES: usize = 40;

/// Depth of nzb-news's internal work channel, which buffers articles between
/// the per-job pump handing them over and the per-server fan-out.
const DEFAULT_WORK_CHANNEL_CAPACITY: usize = 1 << 12;

/// Depth of the outcome channel. Sized generously so a short stall on the
/// decode side does not back-pressure the fetch loop.
const DEFAULT_OUTCOME_CHANNEL_CAPACITY: usize = 1 << 12;
60
61// ---------------------------------------------------------------------------
62// Config
63// ---------------------------------------------------------------------------
64
65/// Configuration for [`NewsDispatchEngine`]. Mirrors the knobs exposed by
66/// the old engine so the swap is drop-in from the caller's perspective.
67///
68/// `servers` is held as an `Arc<Mutex<_>>` so the caller (queue manager) can
69/// mutate it and [`DispatchEngine::reconcile_servers`] will pick up the new
70/// list without requiring a new config instance or a full engine rebuild.
71#[derive(Clone)]
72pub struct NewsEngineConfig {
73 pub servers: Arc<Mutex<Vec<ServerConfig>>>,
74 pub article_timeout: Duration,
75 pub max_concurrent_fetches: usize,
76 pub work_channel_capacity: usize,
77 pub outcome_channel_capacity: usize,
78 /// Optional backup-server probe policy. Forwarded verbatim to
79 /// `nzb_news::DownloaderConfig::probe_policy`.
80 ///
81 /// `None` disables probing (every cascade article tries every server).
82 /// `Some(_)` enables fast-fail on a backup server when the probed
83 /// hit-rate falls below the threshold for that job.
84 ///
85 /// Defaults to `Some(ServerProbePolicy::default())` which matches the
86 /// nzb-news default (probe 10 articles, require >=10% hits).
87 pub probe_policy: Option<nzb_news::ServerProbePolicy>,
88}
89
90impl NewsEngineConfig {
91 /// Construct a config from an owned server list. Wraps the list in an
92 /// `Arc<Mutex<_>>` internally; if the caller already owns a shared Arc
93 /// (e.g. queue manager's live server list), use
94 /// [`NewsEngineConfig::with_shared_servers`] instead so mutations are
95 /// visible to the engine.
96 pub fn new(servers: Vec<ServerConfig>, article_timeout: Duration) -> Self {
97 Self::with_shared_servers(Arc::new(Mutex::new(servers)), article_timeout)
98 }
99
100 /// Construct a config sharing an existing `Arc<Mutex<Vec<ServerConfig>>>`
101 /// with the caller. Mutating the Arc from outside and then calling
102 /// [`DispatchEngine::reconcile_servers`] rebuilds the downloader with
103 /// the latest server list — this is how live "add/remove server"
104 /// operations reach the fetch layer.
105 pub fn with_shared_servers(
106 servers: Arc<Mutex<Vec<ServerConfig>>>,
107 article_timeout: Duration,
108 ) -> Self {
109 Self {
110 servers,
111 article_timeout,
112 max_concurrent_fetches: DEFAULT_MAX_CONCURRENT_FETCHES,
113 work_channel_capacity: DEFAULT_WORK_CHANNEL_CAPACITY,
114 outcome_channel_capacity: DEFAULT_OUTCOME_CHANNEL_CAPACITY,
115 probe_policy: Some(nzb_news::ServerProbePolicy::default()),
116 }
117 }
118}
119
120// ---------------------------------------------------------------------------
121// Adapter state
122// ---------------------------------------------------------------------------
123
124/// Shared state held behind an `Arc` so the outcome-dispatcher task can
125/// access jobs while the engine is owned by the caller.
126struct Inner {
127 config: NewsEngineConfig,
128 /// Populated by `start()`. Holds the downloader's work-submission sender.
129 handle: RwLock<Option<nzb_news::DownloaderHandle>>,
130 /// Job map: `job_id` → per-job state. Cloned into the outcome task.
131 jobs: RwLock<HashMap<String, Arc<JobEntry>>>,
132 /// Monotonic tag issued for every `WorkItem` submitted to the downloader.
133 /// Also serves as the routing key back to the originating article when
134 /// outcomes come out the other side.
135 next_tag: AtomicU64,
136 /// `tag` → in-flight article metadata. We remove on outcome; this is the
137 /// only place that holds the file_id / segment_number for an article
138 /// mid-flight. Cleared on job cancel to free memory fast.
139 in_flight: RwLock<HashMap<u64, InFlight>>,
140}
141
142/// Per-job state owned by the adapter.
143struct JobEntry {
144 /// Reused from the old engine — owns the assembler, progress channel,
145 /// deobfuscation state, and terminal emit logic. Everything the adapter
146 /// needs on the success/failure path is already a field here.
147 context: Arc<JobContext>,
148 /// When true, the pump task holds items in `pending` instead of
149 /// forwarding them to the downloader. In-flight articles still
150 /// complete — pause gates the *next* work only.
151 paused: AtomicBool,
152 /// Set by `cancel_job`. The pump exits and drains the queue.
153 cancelled: AtomicBool,
154 /// Work items waiting to be handed to the downloader. `submit_job`
155 /// pushes all items here; the pump task drains on start and on
156 /// resume.
157 pending: Mutex<VecDeque<nzb_news::WorkItem>>,
158 /// Wake-up signal for the pump task. Notified when `submit_job` adds
159 /// items or when `resume_job` / `cancel_job` changes the gate.
160 pump_wake: Notify,
161}
162
/// Routing record for one dispatched `WorkItem`, stored under its tag. It
/// lets an outcome be traced back to its job, file and segment.
#[derive(Clone)]
struct InFlight {
    /// Owning job.
    job_id: String,
    /// File within the job the article belongs to.
    file_id: String,
    /// Segment number of the article within that file.
    segment_number: u32,
}
171
172// ---------------------------------------------------------------------------
173// NewsDispatchEngine
174// ---------------------------------------------------------------------------
175
176/// `DispatchEngine` impl backed by the layered nzb-news fetch engine.
177pub struct NewsDispatchEngine {
178 inner: Arc<Inner>,
179}
180
181impl NewsDispatchEngine {
182 /// Construct the engine. Does **not** spawn the downloader —
183 /// [`DispatchEngine::start`] does that.
184 pub fn new(config: NewsEngineConfig) -> Self {
185 Self {
186 inner: Arc::new(Inner {
187 config,
188 handle: RwLock::new(None),
189 jobs: RwLock::new(HashMap::new()),
190 next_tag: AtomicU64::new(1),
191 in_flight: RwLock::new(HashMap::new()),
192 }),
193 }
194 }
195}
196
197#[async_trait::async_trait]
198impl DispatchEngine for NewsDispatchEngine {
199 fn start(&self) {
200 let mut slot = self.inner.handle.write();
201 if slot.is_some() {
202 return; // idempotent
203 }
204
205 let servers_snapshot = self.inner.config.servers.lock().clone();
206 if servers_snapshot.is_empty() {
207 // Deferred start: with zero servers, spawning the downloader
208 // would create an internal work queue with no per-server
209 // workers, and any items pushed would sit in limbo.
210 // `reconcile_servers` will start it once servers are added.
211 info!("NewsDispatchEngine start deferred — no servers configured");
212 return;
213 }
214 spawn_and_install_downloader(&self.inner, &mut slot, servers_snapshot);
215 }
216
217 fn submit_job(&self, job: &NzbJob, progress_tx: mpsc::Sender<ProgressUpdate>) {
218 // Reuse the old engine's job-submission builder: creates the
219 // FileAssembler, registers files, and filters out already-downloaded
220 // articles. We only use the returned `JobContext` — the WorkItem
221 // vec it produces is in the old engine's format; we build nzb-news
222 // work items fresh below.
223 let (ctx, legacy_items) = build_job_submission(job, progress_tx);
224
225 // Build nzb-news wrapper types. One NzbObject for the job, one
226 // NzbFile per file, and one Article per work item.
227 let news_files: Vec<Arc<nzb_news::NzbFile>> = job
228 .files
229 .iter()
230 .map(|f| {
231 Arc::new(nzb_news::NzbFile::new(
232 &f.id,
233 &job.id,
234 &f.filename,
235 f.articles.len() as u32,
236 ))
237 })
238 .collect();
239 let news_files_by_id: HashMap<String, Arc<nzb_news::NzbFile>> = news_files
240 .iter()
241 .map(|nf| (nf.id.clone(), Arc::clone(nf)))
242 .collect();
243 let total_articles = legacy_items.len() as u64;
244 let news_job = Arc::new(nzb_news::NzbObject::new(
245 &job.id,
246 &job.name,
247 total_articles,
248 job.total_bytes,
249 news_files.clone(),
250 ));
251
252 // Convert each legacy WorkItem into an nzb-news WorkItem, recording
253 // the routing metadata into in_flight and the item itself into the
254 // job's pending queue. The pump task forwards from pending to the
255 // downloader, gated by the `paused` flag.
256 let mut pending = VecDeque::with_capacity(legacy_items.len());
257 let tag_counter = &self.inner.next_tag;
258 for item in legacy_items {
259 let tag = tag_counter.fetch_add(1, Ordering::Relaxed);
260 let file = match news_files_by_id.get(&item.file_id) {
261 Some(f) => Arc::clone(f),
262 None => continue, // shouldn't happen — file_id came from the same job
263 };
264 self.inner.in_flight.write().insert(
265 tag,
266 InFlight {
267 job_id: item.job_id.clone(),
268 file_id: item.file_id.clone(),
269 segment_number: item.segment_number,
270 },
271 );
272 let article = Arc::new(nzb_news::Article::new(
273 item.message_id.clone(),
274 item.file_id.clone(),
275 item.job_id.clone(),
276 0,
277 item.segment_number,
278 tag,
279 ));
280 pending.push_back(nzb_news::WorkItem {
281 tag,
282 article,
283 file,
284 job: Arc::clone(&news_job),
285 });
286 }
287
288 let entry = Arc::new(JobEntry {
289 context: Arc::clone(&ctx),
290 paused: AtomicBool::new(false),
291 cancelled: AtomicBool::new(false),
292 pending: Mutex::new(pending),
293 pump_wake: Notify::new(),
294 });
295 self.inner
296 .jobs
297 .write()
298 .insert(ctx.job_id.clone(), Arc::clone(&entry));
299
300 // Spawn the pump. It acquires a sender from the engine handle on
301 // each iteration and parks if the downloader is absent — so
302 // submitting a job before `start()` (or during a 0-server startup
303 // window) is safe: items wait in `pending` until `reconcile_servers`
304 // spawns the downloader.
305 let job_id = job.id.clone();
306 tokio::spawn(pump_loop(entry, Arc::clone(&self.inner), job_id));
307 }
308
309 fn pause_job(&self, job_id: &str) {
310 if let Some(entry) = self.inner.jobs.read().get(job_id) {
311 // Local gate — stops the pump from handing new items to nzb-news.
312 entry.paused.store(true, Ordering::SeqCst);
313 }
314 // Scheduler-level gate — holds already-submitted articles in
315 // nzb-news's own pending queue. Without this, anything already
316 // accepted into `work_channel_capacity` (default 4096) would still
317 // route to servers despite the local gate.
318 if let Some(h) = self.inner.handle.read().as_ref() {
319 h.pause_job(job_id);
320 }
321 debug!(job_id, "paused");
322 }
323
324 fn resume_job(&self, job_id: &str) {
325 if let Some(entry) = self.inner.jobs.read().get(job_id) {
326 entry.paused.store(false, Ordering::SeqCst);
327 entry.pump_wake.notify_waiters();
328 }
329 if let Some(h) = self.inner.handle.read().as_ref() {
330 h.resume_job(job_id);
331 }
332 debug!(job_id, "resumed");
333 }
334
335 fn cancel_job(&self, job_id: &str) {
336 let entry = self.inner.jobs.write().remove(job_id);
337 if let Some(entry) = entry {
338 // Signal pump to drain + exit.
339 entry.cancelled.store(true, Ordering::SeqCst);
340 entry.pump_wake.notify_waiters();
341 // Drop any not-yet-dispatched items so the pump sees an empty
342 // queue and exits promptly.
343 entry.pending.lock().clear();
344 // Clear in-flight entries for this job so stale outcomes are
345 // dropped silently by the dispatcher (unknown-tag path).
346 self.inner
347 .in_flight
348 .write()
349 .retain(|_, m| m.job_id != job_id);
350 // Purge nzb-news scheduler-level state: items already accepted
351 // into the downloader's work_channel or pending list get emitted
352 // as Cancelled outcomes and removed. Without this, a cancelled
353 // job would keep routing its buffered articles to servers.
354 if let Some(h) = self.inner.handle.read().as_ref() {
355 h.purge_job(job_id);
356 }
357 debug!(job_id, "cancelled");
358 }
359 }
360
361 fn abort_job(&self, job_id: &str, reason: String) {
362 // Emit terminal via the existing JobContext machinery — same path
363 // the old engine uses. `emit_terminal` is idempotent; cancel_job
364 // later is safe.
365 let entry = self.inner.jobs.read().get(job_id).cloned();
366 if let Some(entry) = entry {
367 *entry.context.abort_reason.lock() = Some(reason);
368 entry.context.emit_terminal_public();
369 }
370 self.cancel_job(job_id);
371 }
372
373 fn has_job(&self, job_id: &str) -> bool {
374 self.inner.jobs.read().contains_key(job_id)
375 }
376
377 fn reconcile_servers(&self) {
378 // Rebuild the downloader with the current server list.
379 //
380 // First-time (0 → N): when `start` was deferred for lack of
381 // servers, pump_loops parked waiting for a handle. Spawning the
382 // downloader here and notifying pumps resumes dispatch cleanly —
383 // no items are lost because `pump_loop` leaves unsent items in
384 // `pending` until a sender is available.
385 //
386 // Reconfigure (N → M, N > 0): the downloader is rebuilt and the
387 // old one shut down. Articles already in the old downloader's
388 // internal queue that had not completed may be lost; their job
389 // will stall until nzb-news grows a dynamic-server API. For the
390 // common "add/edit server" UI flows this is rare in practice and
391 // the user can retry a stalled job manually. Documented as a
392 // limitation rather than a silent partial failure.
393 let servers_snapshot = self.inner.config.servers.lock().clone();
394 let server_count = servers_snapshot.len();
395
396 let old_handle = if servers_snapshot.is_empty() {
397 // Remove handle; pumps will park until a server is added.
398 self.inner.handle.write().take()
399 } else {
400 let mut slot = self.inner.handle.write();
401 let old = slot.take();
402 spawn_and_install_downloader(&self.inner, &mut slot, servers_snapshot);
403 old
404 };
405
406 if let Some(old) = old_handle {
407 old.shutdown();
408 }
409
410 // Wake all pump loops so they re-read the handle and either pick
411 // up the new sender or park on `pump_wake` until one arrives.
412 let entries: Vec<Arc<JobEntry>> = self.inner.jobs.read().values().map(Arc::clone).collect();
413 for entry in entries {
414 entry.pump_wake.notify_waiters();
415 }
416
417 info!(
418 servers = server_count,
419 "NewsDispatchEngine reconciled server list"
420 );
421 }
422
423 fn set_max_worker_idle(&self, _d: Duration) {
424 // No per-worker idle concept in nzb-news; workers are persistent
425 // until the downloader shuts down.
426 }
427
428 fn eviction_count(&self) -> u64 {
429 0
430 }
431
432 fn server_stats_snapshot(&self) -> Vec<(String, crate::dispatch_engine::ServerAttemptStats)> {
433 let guard = self.inner.handle.read();
434 let Some(h) = guard.as_ref() else {
435 return Vec::new();
436 };
437 h.server_stats_snapshot()
438 .into_iter()
439 .map(|(id, s)| {
440 (
441 id,
442 crate::dispatch_engine::ServerAttemptStats {
443 attempted: s.attempted,
444 succeeded: s.succeeded,
445 not_found: s.not_found,
446 transient_failed: s.transient_failed,
447 },
448 )
449 })
450 .collect()
451 }
452
453 async fn shutdown(&self) {
454 let handle = self.inner.handle.write().take();
455 if let Some(h) = handle {
456 h.shutdown();
457 h.join().await;
458 }
459 }
460}
461
462// ---------------------------------------------------------------------------
463// Outcome dispatcher
464// ---------------------------------------------------------------------------
465
466/// Main loop: consume `FetchOutcome`s from nzb-news and translate each into
467/// a `ProgressUpdate`, doing decode + assembly inline on success. Runs until
468/// the outcome channel is closed (downloader shutdown).
469async fn outcome_dispatcher(
470 inner: Arc<Inner>,
471 mut outcomes: mpsc::Receiver<nzb_news::FetchOutcome>,
472) {
473 while let Some(outcome) = outcomes.recv().await {
474 match outcome {
475 nzb_news::FetchOutcome::Success {
476 tag,
477 server_id,
478 bytes,
479 article_bytes: _,
480 } => {
481 // Spawn each success so decode+assemble runs in parallel.
482 // The old engine got this for free because every worker did
483 // its own fetch+decode+assemble — centralising here would
484 // serialise all post-fetch work to a single task.
485 let inner2 = Arc::clone(&inner);
486 tokio::spawn(async move {
487 process_success(inner2, tag, server_id, bytes).await;
488 });
489 }
490 nzb_news::FetchOutcome::Failed { tag, last_error } => {
491 process_failure(&inner, tag, last_error);
492 }
493 nzb_news::FetchOutcome::Cancelled { tag } => {
494 // Treat as benign discard — caller (queue manager) will
495 // observe JobAborted separately via abort_job.
496 inner.in_flight.write().remove(&tag);
497 }
498 }
499 }
500 debug!("outcome_dispatcher exiting: channel closed");
501}
502
503async fn process_success(inner: Arc<Inner>, tag: u64, server_id: String, raw: Vec<u8>) {
504 let meta = inner.in_flight.write().remove(&tag);
505 let Some(meta) = meta else {
506 return; // stale / cancelled
507 };
508
509 let entry = inner.jobs.read().get(&meta.job_id).cloned();
510 let Some(entry) = entry else {
511 return; // job cancelled after submit
512 };
513 let ctx = &entry.context;
514
515 // Decode (CPU-bound; SIMD is fast but not free).
516 let decode_start = Instant::now();
517 let decoded = match nzb_decode::decode_yenc(&raw) {
518 Ok(d) => d,
519 Err(e) => {
520 let failure = ArticleFailure::decode_error(server_id, format!("yEnc decode: {e}"));
521 emit_failed(ctx, &meta, failure);
522 return;
523 }
524 };
525 let decode_us = decode_start.elapsed().as_micros() as u64;
526
527 // Record yEnc filename for deobfuscation.
528 if let Some(ref fname) = decoded.filename
529 && !fname.is_empty()
530 {
531 ctx.yenc_names
532 .lock()
533 .insert(meta.file_id.clone(), fname.clone());
534 }
535
536 let data_begin = decoded.part_begin.unwrap_or(0);
537
538 // Assemble.
539 let assemble_start = Instant::now();
540 let file_complete = match ctx.assembler.assemble_article(
541 &meta.job_id,
542 &meta.file_id,
543 meta.segment_number,
544 data_begin,
545 &decoded.data,
546 ) {
547 Ok(b) => b,
548 Err(e) => {
549 let failure = ArticleFailure::decode_error(server_id, format!("assembly: {e}"));
550 emit_failed(ctx, &meta, failure);
551 return;
552 }
553 };
554 let assemble_us = assemble_start.elapsed().as_micros() as u64;
555
556 // Timing stats.
557 ctx.total_decode_us.fetch_add(decode_us, Ordering::Relaxed);
558 ctx.total_assemble_us
559 .fetch_add(assemble_us, Ordering::Relaxed);
560 ctx.total_articles_decoded.fetch_add(1, Ordering::Relaxed);
561
562 // Emit progress.
563 let decoded_bytes = decoded.data.len() as u64;
564 let _ = ctx.progress_tx.try_send(ProgressUpdate::ArticleComplete {
565 job_id: meta.job_id.clone(),
566 file_id: meta.file_id.clone(),
567 segment_number: meta.segment_number,
568 decoded_bytes,
569 file_complete,
570 server_id: Some(server_id),
571 });
572
573 ctx.resolve_one_public();
574}
575
576fn process_failure(inner: &Inner, tag: u64, last_error: Option<String>) {
577 let meta = inner.in_flight.write().remove(&tag);
578 let Some(meta) = meta else {
579 return;
580 };
581 let entry = inner.jobs.read().get(&meta.job_id).cloned();
582 let Some(entry) = entry else {
583 return;
584 };
585 let msg = last_error.unwrap_or_else(|| "all servers exhausted".into());
586 // nzb-news doesn't carry structured error info at the outcome layer —
587 // only the last attempt's error string. Pattern-match common causes
588 // so the hopeless-tracker and queue_manager can distinguish "server
589 // is broken/quota-exhausted" (transient, don't count toward hopeless)
590 // from "article genuinely missing everywhere" (counts toward
591 // hopeless). Without this, an auth/quota failure trickles through as
592 // NotFound and aborts the job with "articles confirmed missing" —
593 // confusing diagnostics that blame the content instead of the server.
594 let kind = classify_error_message(&msg);
595 let failure = ArticleFailure {
596 kind,
597 server_id: String::new(),
598 message: msg,
599 };
600 emit_failed(&entry.context, &meta, failure);
601}
602
603/// Map an opaque nzb-news error string to a typed [`ArticleFailureKind`].
604///
605/// The strings come from `nzb_nntp::error::NntpError` (via nzb-news) and are
606/// the only signal we have at this layer — nzb-news's `FetchOutcome` carries
607/// `Option<String>` rather than a structured kind. Order of checks matters:
608/// more specific patterns are tested first.
609fn classify_error_message(msg: &str) -> ArticleFailureKind {
610 let m = msg.to_ascii_lowercase();
611 // NNTP response codes in the message body are the strongest signal.
612 if m.contains("(482)") || m.contains("(481)") || m.contains("auth") {
613 return ArticleFailureKind::AuthFailed;
614 }
615 if m.contains("(403)") || m.contains("permission") || m.contains("forbidden") {
616 return ArticleFailureKind::PermissionDenied;
617 }
618 if m.contains("(430)") || m.contains("article not found") || m.contains("no such article") {
619 return ArticleFailureKind::NotFound;
620 }
621 if m.contains("(502)") || m.contains("service unavailable") {
622 return ArticleFailureKind::ServerDown;
623 }
624 if m.contains("timeout") || m.contains("timed out") {
625 return ArticleFailureKind::Timeout;
626 }
627 if m.contains("connection") || m.contains("eof") || m.contains("reset") || m.contains("closed")
628 {
629 return ArticleFailureKind::ConnectionClosed;
630 }
631 // Default: treat unknown cascade exhaustion as NotFound — same as the
632 // old behaviour — so genuinely-missing articles still abort hopeless
633 // NZBs promptly.
634 ArticleFailureKind::NotFound
635}
636
/// Unit tests for [`classify_error_message`]. They cover every category,
/// case-insensitivity, and the documented check ordering.
#[cfg(test)]
mod classify_tests {
    use super::*;

    #[test]
    fn classifies_auth_failures() {
        let msg = "Authentication failed: PASS rejected (482): Your block account is fully used";
        assert_eq!(classify_error_message(msg), ArticleFailureKind::AuthFailed);
    }

    #[test]
    fn classifies_auth_failure_by_481_code() {
        // No "auth" text at all — the response code alone must be enough.
        assert_eq!(
            classify_error_message("NNTP (481) rejected"),
            ArticleFailureKind::AuthFailed
        );
    }

    #[test]
    fn classifies_permission_denied() {
        assert_eq!(
            classify_error_message("NNTP (403) Forbidden"),
            ArticleFailureKind::PermissionDenied
        );
    }

    #[test]
    fn classifies_not_found() {
        let msg = "NNTP (430) No such article";
        assert_eq!(classify_error_message(msg), ArticleFailureKind::NotFound);
    }

    #[test]
    fn classifies_service_unavailable() {
        let msg = "Service unavailable (502)";
        assert_eq!(classify_error_message(msg), ArticleFailureKind::ServerDown);
    }

    #[test]
    fn matching_is_case_insensitive() {
        assert_eq!(
            classify_error_message("SERVICE UNAVAILABLE"),
            ArticleFailureKind::ServerDown
        );
    }

    #[test]
    fn classifies_timeout() {
        let msg = "read timed out after 60s";
        assert_eq!(classify_error_message(msg), ArticleFailureKind::Timeout);
    }

    #[test]
    fn timeout_takes_precedence_over_connection() {
        // Contains both "connection" and "timed out"; the timeout check runs
        // first, so this must not be reported as ConnectionClosed.
        assert_eq!(
            classify_error_message("connection timed out"),
            ArticleFailureKind::Timeout
        );
    }

    #[test]
    fn classifies_connection_closed() {
        assert_eq!(
            classify_error_message("connection reset by peer"),
            ArticleFailureKind::ConnectionClosed
        );
        assert_eq!(
            classify_error_message("unexpected EOF"),
            ArticleFailureKind::ConnectionClosed
        );
    }

    #[test]
    fn unknown_defaults_to_not_found() {
        assert_eq!(
            classify_error_message("all servers exhausted"),
            ArticleFailureKind::NotFound
        );
    }
}
673
674fn emit_failed(ctx: &JobContext, meta: &InFlight, failure: ArticleFailure) {
675 ctx.articles_failed.fetch_add(1, Ordering::Relaxed);
676 let _ = ctx.progress_tx.try_send(ProgressUpdate::ArticleFailed {
677 job_id: meta.job_id.clone(),
678 file_id: meta.file_id.clone(),
679 segment_number: meta.segment_number,
680 failure,
681 });
682 ctx.resolve_one_public();
683}
684
685// ---------------------------------------------------------------------------
686// Per-job pump task
687// ---------------------------------------------------------------------------
688
689/// Drains a job's `pending` queue into the downloader's work channel,
690/// respecting the `paused` gate and exiting on `cancelled`.
691///
692/// The pump parks on `pump_wake` when `pending` is empty or when
693/// `paused` is true. `submit_job` / `resume_job` notify to wake it.
694async fn pump_loop(entry: Arc<JobEntry>, inner: Arc<Inner>, job_id: String) {
695 loop {
696 if entry.cancelled.load(Ordering::SeqCst) {
697 debug!(job_id, "pump exiting: cancelled");
698 return;
699 }
700 if entry.paused.load(Ordering::SeqCst) {
701 entry.pump_wake.notified().await;
702 continue;
703 }
704 let next = entry.pending.lock().pop_front();
705 let Some(item) = next else {
706 // Queue empty. submit_job enqueues every article up-front, so
707 // an empty queue means we're done. Park anyway so cancel can
708 // wake us.
709 entry.pump_wake.notified().await;
710 continue;
711 };
712
713 // Snapshot the current sender. If the downloader is absent (not
714 // started yet, or torn down during reconcile_servers with zero
715 // servers), stash the item back on the front of `pending` and
716 // park; reconcile_servers will notify us when a new handle exists.
717 let sender = inner.handle.read().as_ref().map(|h| h.sender());
718 let Some(sender) = sender else {
719 entry.pending.lock().push_front(item);
720 entry.pump_wake.notified().await;
721 continue;
722 };
723
724 // Send. On SendError (sender closed mid-reconcile), return the
725 // item to the queue and park — the new handle is on its way.
726 if let Err(e) = sender.send(item).await {
727 entry.pending.lock().push_front(e.0);
728 entry.pump_wake.notified().await;
729 continue;
730 }
731 }
732}
733
734/// Build a new `DownloaderConfig` from the engine's static knobs plus the
735/// given server list, spawn the downloader, install its handle in `slot`,
736/// and launch the outcome dispatcher task. Used by both `start()` and
737/// `reconcile_servers` to avoid duplicating the construction.
738///
739/// Precondition: `servers` is non-empty; caller decides the zero-server
740/// policy. `slot` must already be held under a write lock.
741fn spawn_and_install_downloader(
742 inner: &Arc<Inner>,
743 slot: &mut Option<nzb_news::DownloaderHandle>,
744 servers: Vec<ServerConfig>,
745) {
746 let cfg = &inner.config;
747 let server_count = servers.len();
748 let dl_config = nzb_news::DownloaderConfig {
749 servers,
750 max_concurrent_fetches: cfg.max_concurrent_fetches,
751 article_timeout: cfg.article_timeout,
752 work_channel_capacity: cfg.work_channel_capacity,
753 outcome_channel_capacity: cfg.outcome_channel_capacity,
754 probe_policy: cfg.probe_policy.clone(),
755 };
756 let (handle, outcomes) = nzb_news::spawn_downloader(dl_config);
757 let inner_for_task = Arc::clone(inner);
758 tokio::spawn(outcome_dispatcher(inner_for_task, outcomes));
759 *slot = Some(handle);
760 info!(
761 servers = server_count,
762 "NewsDispatchEngine downloader spawned"
763 );
764}