Skip to main content

nzb_dispatch/
dispatch_engine.rs

1//! `DispatchEngine` trait — boundary between the job queue and the article dispatcher.
2//!
3//! This is the contract the queue layer depends on; it hides the concrete
4//! `WorkerPool` implementation. The reverse direction (dispatcher → queue) is
5//! already a decoupled channel (`mpsc::Sender<ProgressUpdate>`), so only this
6//! one trait is needed to cleanly separate the two layers.
7//!
8//! A `DispatchEngine` is responsible for turning an [`NzbJob`] into article
9//! fetches against the configured NNTP servers and reporting progress via
10//! the per-job [`ProgressUpdate`] channel. It must be able to pause, resume,
11//! cancel, and abort individual jobs, reconcile its worker set with the
12//! server list, and shut down gracefully.
13
14use std::sync::Arc;
15use std::time::Duration;
16
17use tokio::sync::mpsc;
18
19use crate::download_engine::{ProgressUpdate, WorkerPool, build_job_submission};
20use nzb_core::models::NzbJob;
21
22/// Article-dispatch engine: accepts jobs, drives NNTP fetches, emits progress.
23///
24/// Constructed by the facade/queue layer and owned as `Arc<dyn DispatchEngine>`.
25/// All methods are `&self`; implementations use interior mutability to allow
26/// concurrent use across tokio tasks.
27#[async_trait::async_trait]
28pub trait DispatchEngine: Send + Sync {
29    /// Spawn workers for all currently enabled servers and start the
30    /// supervisor task. Idempotent (safe to call more than once).
31    fn start(&self);
32
33    /// Register a new job and begin dispatching its unfinished articles.
34    /// Progress is streamed to `progress_tx` as `ProgressUpdate`s; the
35    /// channel is closed when the job reaches a terminal state.
36    fn submit_job(&self, job: &NzbJob, progress_tx: mpsc::Sender<ProgressUpdate>);
37
38    /// Pause dispatch for `job_id`. In-flight articles finish; no new work
39    /// is popped for this job until [`resume_job`](Self::resume_job).
40    fn pause_job(&self, job_id: &str);
41
42    /// Resume a paused job.
43    fn resume_job(&self, job_id: &str);
44
45    /// Cancel `job_id`. In-flight articles may still complete but their
46    /// results are discarded; no terminal progress update is emitted beyond
47    /// the one triggered by cancellation.
48    fn cancel_job(&self, job_id: &str);
49
50    /// Abort `job_id` with a human-readable reason. Emits
51    /// [`ProgressUpdate::JobAborted`] once outstanding articles drain.
52    fn abort_job(&self, job_id: &str, reason: String);
53
54    /// Is `job_id` currently known to the dispatcher?
55    fn has_job(&self, job_id: &str) -> bool;
56
57    /// Re-read the server list and adjust workers to match. Call after any
58    /// mutation to the server config (add, remove, enable, disable, resize).
59    fn reconcile_servers(&self);
60
61    /// Override the idle-worker eviction threshold. Tests shrink this to
62    /// make the watchdog converge in seconds; production uses the default.
63    fn set_max_worker_idle(&self, d: Duration);
64
65    /// Lifetime count of worker evictions performed by the heartbeat
66    /// watchdog. Increases by 1 each time the supervisor reclaims a stalled
67    /// worker. Useful for tests and observability.
68    fn eviction_count(&self) -> u64;
69
70    /// Snapshot of per-server lifetime attempt counters. Used by the
71    /// queue manager to emit a diagnostic breakdown alongside a job abort
72    /// — distinguishes "server returned 430 for every article" (dead NZB)
73    /// from "server had auth errors" (transient). Default is empty for
74    /// engines that don't track per-server stats.
75    fn server_stats_snapshot(&self) -> Vec<(String, ServerAttemptStats)> {
76        Vec::new()
77    }
78
79    /// Gracefully shut down: stop accepting new work, signal all workers,
80    /// and wait for the supervisor to exit.
81    async fn shutdown(&self);
82}
83
/// Per-server lifetime counters reported via
/// [`DispatchEngine::server_stats_snapshot`]. `not_found` is the strongest
/// signal for a dead NZB; `transient_failed` separates "missing articles"
/// from "server flaky / auth issues".
///
/// The struct is a plain value type: it is `Copy`, defaults to all-zero
/// counters, and can be compared with `==` so that two snapshots (or a
/// snapshot and an expected value in a test) can be checked for equality.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ServerAttemptStats {
    /// Fetch attempts made against this server — presumably every try,
    /// whatever its outcome; confirm against the code that increments it.
    pub attempted: u64,
    /// Attempts that succeeded (assumed from the name; the counting site is
    /// not in this file).
    pub succeeded: u64,
    /// Attempts answered with "article not found" — presumably the NNTP 430
    /// responses the `server_stats_snapshot` docs mention; verify with the
    /// worker code.
    pub not_found: u64,
    /// Attempts that failed for other, transient reasons such as a flaky
    /// server or authentication problems.
    pub transient_failed: u64,
}
95
96// ---------------------------------------------------------------------------
97// DispatchHandle — wraps Arc<WorkerPool> to implement DispatchEngine.
98//
99// Why a wrapper: several WorkerPool methods have `self: &Arc<Self>` receivers
100// (they spawn tasks that clone the Arc). That signature is incompatible with
101// a `dyn`-object trait method. The wrapper holds `Arc<WorkerPool>` and can
102// call the concrete Arc-receiver methods on it.
103// ---------------------------------------------------------------------------
104
105/// Dynamic-dispatch wrapper around `Arc<WorkerPool>` — the one concrete
106/// [`DispatchEngine`] impl today. Extract this into `nzb-dispatch` in Phase B.
107pub struct DispatchHandle(Arc<WorkerPool>);
108
109impl DispatchHandle {
110    pub fn new(pool: Arc<WorkerPool>) -> Self {
111        Self(pool)
112    }
113
114    /// Escape hatch: access the underlying pool. Intended for callers that
115    /// still need pool-specific APIs not yet promoted to the trait (none
116    /// today, but keeps the migration incremental).
117    pub fn pool(&self) -> &Arc<WorkerPool> {
118        &self.0
119    }
120}
121
122#[async_trait::async_trait]
123impl DispatchEngine for DispatchHandle {
124    fn start(&self) {
125        self.0.start();
126    }
127
128    fn submit_job(&self, job: &NzbJob, progress_tx: mpsc::Sender<ProgressUpdate>) {
129        let (ctx, items) = build_job_submission(job, progress_tx);
130        self.0.submit_job(ctx, items);
131    }
132
133    fn pause_job(&self, job_id: &str) {
134        self.0.pause_job(job_id);
135    }
136
137    fn resume_job(&self, job_id: &str) {
138        self.0.resume_job(job_id);
139    }
140
141    fn cancel_job(&self, job_id: &str) {
142        self.0.cancel_job(job_id);
143    }
144
145    fn abort_job(&self, job_id: &str, reason: String) {
146        self.0.abort_job(job_id, reason);
147    }
148
149    fn has_job(&self, job_id: &str) -> bool {
150        self.0.has_job(job_id)
151    }
152
153    fn reconcile_servers(&self) {
154        self.0.reconcile_servers();
155    }
156
157    fn set_max_worker_idle(&self, d: Duration) {
158        self.0.set_max_worker_idle(d);
159    }
160
161    fn eviction_count(&self) -> u64 {
162        self.0.eviction_count()
163    }
164
165    async fn shutdown(&self) {
166        self.0.shutdown().await;
167    }
168}