nzb_dispatch/dispatch_engine.rs
1//! `DispatchEngine` trait — boundary between the job queue and the article dispatcher.
2//!
3//! This is the contract the queue layer depends on; it hides the concrete
4//! `WorkerPool` implementation. The reverse direction (dispatcher → queue) is
5//! already a decoupled channel (`mpsc::Sender<ProgressUpdate>`), so only this
6//! one trait is needed to cleanly separate the two layers.
7//!
8//! A `DispatchEngine` is responsible for turning an [`NzbJob`] into article
9//! fetches against the configured NNTP servers and reporting progress via
10//! the per-job [`ProgressUpdate`] channel. It must be able to pause, resume,
11//! cancel, and abort individual jobs, reconcile its worker set with the
12//! server list, and shut down gracefully.
13
14use std::sync::Arc;
15use std::time::Duration;
16
17use tokio::sync::mpsc;
18
19use crate::download_engine::{ProgressUpdate, WorkerPool, build_job_submission};
20use nzb_core::models::NzbJob;
21
22/// Article-dispatch engine: accepts jobs, drives NNTP fetches, emits progress.
23///
24/// Constructed by the facade/queue layer and owned as `Arc<dyn DispatchEngine>`.
25/// All methods are `&self`; implementations use interior mutability to allow
26/// concurrent use across tokio tasks.
27#[async_trait::async_trait]
28pub trait DispatchEngine: Send + Sync {
29 /// Spawn workers for all currently enabled servers and start the
30 /// supervisor task. Idempotent (safe to call more than once).
31 fn start(&self);
32
33 /// Register a new job and begin dispatching its unfinished articles.
34 /// Progress is streamed to `progress_tx` as `ProgressUpdate`s; the
35 /// channel is closed when the job reaches a terminal state.
36 fn submit_job(&self, job: &NzbJob, progress_tx: mpsc::Sender<ProgressUpdate>);
37
38 /// Pause dispatch for `job_id`. In-flight articles finish; no new work
39 /// is popped for this job until [`resume_job`](Self::resume_job).
40 fn pause_job(&self, job_id: &str);
41
42 /// Resume a paused job.
43 fn resume_job(&self, job_id: &str);
44
45 /// Cancel `job_id`. In-flight articles may still complete but their
46 /// results are discarded; no terminal progress update is emitted beyond
47 /// the one triggered by cancellation.
48 fn cancel_job(&self, job_id: &str);
49
50 /// Abort `job_id` with a human-readable reason. Emits
51 /// [`ProgressUpdate::JobAborted`] once outstanding articles drain.
52 fn abort_job(&self, job_id: &str, reason: String);
53
54 /// Is `job_id` currently known to the dispatcher?
55 fn has_job(&self, job_id: &str) -> bool;
56
57 /// Re-read the server list and adjust workers to match. Call after any
58 /// mutation to the server config (add, remove, enable, disable, resize).
59 fn reconcile_servers(&self);
60
61 /// Override the idle-worker eviction threshold. Tests shrink this to
62 /// make the watchdog converge in seconds; production uses the default.
63 fn set_max_worker_idle(&self, d: Duration);
64
65 /// Lifetime count of worker evictions performed by the heartbeat
66 /// watchdog. Increases by 1 each time the supervisor reclaims a stalled
67 /// worker. Useful for tests and observability.
68 fn eviction_count(&self) -> u64;
69
70 /// Snapshot of per-server lifetime attempt counters. Used by the
71 /// queue manager to emit a diagnostic breakdown alongside a job abort
72 /// — distinguishes "server returned 430 for every article" (dead NZB)
73 /// from "server had auth errors" (transient). Default is empty for
74 /// engines that don't track per-server stats.
75 fn server_stats_snapshot(&self) -> Vec<(String, ServerAttemptStats)> {
76 Vec::new()
77 }
78
79 /// Gracefully shut down: stop accepting new work, signal all workers,
80 /// and wait for the supervisor to exit.
81 async fn shutdown(&self);
82}
83
84/// Per-server lifetime counters reported via
85/// [`DispatchEngine::server_stats_snapshot`]. `not_found` is the strongest
86/// signal for a dead NZB; `transient_failed` separates "missing articles"
87/// from "server flaky / auth issues".
88#[derive(Debug, Clone, Copy, Default)]
89pub struct ServerAttemptStats {
90 pub attempted: u64,
91 pub succeeded: u64,
92 pub not_found: u64,
93 pub transient_failed: u64,
94}
95
96// ---------------------------------------------------------------------------
97// DispatchHandle — wraps Arc<WorkerPool> to implement DispatchEngine.
98//
99// Why a wrapper: several WorkerPool methods have `self: &Arc<Self>` receivers
100// (they spawn tasks that clone the Arc). That signature is incompatible with
101// a `dyn`-object trait method. The wrapper holds `Arc<WorkerPool>` and can
102// call the concrete Arc-receiver methods on it.
103// ---------------------------------------------------------------------------
104
105/// Dynamic-dispatch wrapper around `Arc<WorkerPool>` — the one concrete
106/// [`DispatchEngine`] impl today. Extract this into `nzb-dispatch` in Phase B.
107pub struct DispatchHandle(Arc<WorkerPool>);
108
109impl DispatchHandle {
110 pub fn new(pool: Arc<WorkerPool>) -> Self {
111 Self(pool)
112 }
113
114 /// Escape hatch: access the underlying pool. Intended for callers that
115 /// still need pool-specific APIs not yet promoted to the trait (none
116 /// today, but keeps the migration incremental).
117 pub fn pool(&self) -> &Arc<WorkerPool> {
118 &self.0
119 }
120}
121
122#[async_trait::async_trait]
123impl DispatchEngine for DispatchHandle {
124 fn start(&self) {
125 self.0.start();
126 }
127
128 fn submit_job(&self, job: &NzbJob, progress_tx: mpsc::Sender<ProgressUpdate>) {
129 let (ctx, items) = build_job_submission(job, progress_tx);
130 self.0.submit_job(ctx, items);
131 }
132
133 fn pause_job(&self, job_id: &str) {
134 self.0.pause_job(job_id);
135 }
136
137 fn resume_job(&self, job_id: &str) {
138 self.0.resume_job(job_id);
139 }
140
141 fn cancel_job(&self, job_id: &str) {
142 self.0.cancel_job(job_id);
143 }
144
145 fn abort_job(&self, job_id: &str, reason: String) {
146 self.0.abort_job(job_id, reason);
147 }
148
149 fn has_job(&self, job_id: &str) -> bool {
150 self.0.has_job(job_id)
151 }
152
153 fn reconcile_servers(&self) {
154 self.0.reconcile_servers();
155 }
156
157 fn set_max_worker_idle(&self, d: Duration) {
158 self.0.set_max_worker_idle(d);
159 }
160
161 fn eviction_count(&self) -> u64 {
162 self.0.eviction_count()
163 }
164
165 async fn shutdown(&self) {
166 self.0.shutdown().await;
167 }
168}