use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{Duration, Instant};
use parking_lot::{Mutex, RwLock};
use tokio::sync::{Notify, mpsc};
use tracing::{debug, info};
use nzb_core::models::NzbJob;
use nzb_nntp::config::ServerConfig;
use crate::article_failure::{ArticleFailure, ArticleFailureKind};
use crate::dispatch_engine::DispatchEngine;
use crate::download_engine::{JobContext, ProgressUpdate, build_job_submission};
/// Default for `NewsEngineConfig::max_concurrent_fetches`.
const DEFAULT_MAX_CONCURRENT_FETCHES: usize = 40;
/// Default for `NewsEngineConfig::work_channel_capacity`.
const DEFAULT_WORK_CHANNEL_CAPACITY: usize = 4096;
/// Default for `NewsEngineConfig::outcome_channel_capacity`.
const DEFAULT_OUTCOME_CHANNEL_CAPACITY: usize = 4096;
/// Settings for [`NewsDispatchEngine`].
///
/// Every field except `servers` is forwarded unchanged into the
/// `nzb_news::DownloaderConfig` built whenever a downloader is spawned.
#[derive(Clone)]
pub struct NewsEngineConfig {
/// Shared, mutable server list. The engine clones a snapshot of it under
/// the lock each time it starts or reconciles the downloader.
pub servers: Arc<Mutex<Vec<ServerConfig>>>,
/// Forwarded as the downloader's `article_timeout`.
pub article_timeout: Duration,
/// Forwarded as the downloader's `max_concurrent_fetches`.
pub max_concurrent_fetches: usize,
/// Forwarded as the downloader's `work_channel_capacity`.
pub work_channel_capacity: usize,
/// Forwarded as the downloader's `outcome_channel_capacity`.
pub outcome_channel_capacity: usize,
/// Forwarded (cloned) as the downloader's `probe_policy`.
pub probe_policy: Option<nzb_news::ServerProbePolicy>,
}
impl NewsEngineConfig {
    /// Builds a configuration that owns a private copy of `servers`.
    ///
    /// The list is wrapped in a fresh `Arc<Mutex<_>>`; all remaining settings
    /// are filled in exactly as [`NewsEngineConfig::with_shared_servers`] does.
    pub fn new(servers: Vec<ServerConfig>, article_timeout: Duration) -> Self {
        let shared = Arc::new(Mutex::new(servers));
        Self::with_shared_servers(shared, article_timeout)
    }

    /// Builds a configuration around an already-shared server list.
    ///
    /// Concurrency and channel capacities take the module defaults and the
    /// probe policy is set to `ServerProbePolicy::default()`.
    pub fn with_shared_servers(
        servers: Arc<Mutex<Vec<ServerConfig>>>,
        article_timeout: Duration,
    ) -> Self {
        let probe_policy = Some(nzb_news::ServerProbePolicy::default());
        Self {
            probe_policy,
            outcome_channel_capacity: DEFAULT_OUTCOME_CHANNEL_CAPACITY,
            work_channel_capacity: DEFAULT_WORK_CHANNEL_CAPACITY,
            max_concurrent_fetches: DEFAULT_MAX_CONCURRENT_FETCHES,
            article_timeout,
            servers,
        }
    }
}
/// State shared between the engine facade and the tasks it spawns
/// (per-job pumps and outcome dispatchers).
struct Inner {
/// Settings captured when the engine was constructed.
config: NewsEngineConfig,
/// Handle of the currently installed downloader; `None` until one has been
/// spawned, and again after `shutdown` or a reconcile with no servers.
handle: RwLock<Option<nzb_news::DownloaderHandle>>,
/// Registered jobs keyed by job id.
jobs: RwLock<HashMap<String, Arc<JobEntry>>>,
/// Source of work-item tags; starts at 1 and is bumped once per article.
next_tag: AtomicU64,
/// Tag -> article metadata. Entries are added at submit time and removed
/// when an outcome for the tag arrives or the owning job is cancelled.
in_flight: RwLock<HashMap<u64, InFlight>>,
}
/// Per-job bookkeeping shared between the engine and the job's pump task.
struct JobEntry {
/// Job context produced by `build_job_submission`.
context: Arc<JobContext>,
/// While set, the pump stops handing work to the downloader.
paused: AtomicBool,
/// Once set, the pump task exits.
cancelled: AtomicBool,
/// Work items not yet handed to the downloader, in dispatch order.
pending: Mutex<VecDeque<nzb_news::WorkItem>>,
/// Signalled (via `notify_waiters`) on resume, cancel and server reconcile
/// so a parked pump re-checks its state.
pump_wake: Notify,
}
/// Identity of the article behind a work-item tag, used to route a fetch
/// outcome back to its job, file and segment.
#[derive(Clone)]
struct InFlight {
/// Id of the job the article belongs to.
job_id: String,
/// Id of the file the article belongs to.
file_id: String,
/// Segment number copied from the submitted work item.
segment_number: u32,
}
/// [`DispatchEngine`] implementation backed by an `nzb_news` downloader.
pub struct NewsDispatchEngine {
/// Shared state, also held by the background tasks this engine spawns.
inner: Arc<Inner>,
}
impl NewsDispatchEngine {
    /// Creates an engine with no downloader installed and no registered jobs.
    ///
    /// Work-item tags start at 1. Nothing is spawned here; the downloader is
    /// installed later by `start` or `reconcile_servers`.
    pub fn new(config: NewsEngineConfig) -> Self {
        let state = Inner {
            in_flight: RwLock::new(HashMap::new()),
            next_tag: AtomicU64::new(1),
            jobs: RwLock::new(HashMap::new()),
            handle: RwLock::new(None),
            config,
        };
        Self {
            inner: Arc::new(state),
        }
    }
}
#[async_trait::async_trait]
impl DispatchEngine for NewsDispatchEngine {
/// Spawns the downloader if none is installed yet.
///
/// Does nothing when a handle already exists. When the configured server
/// list is empty the start is deferred (only logged); a later
/// `reconcile_servers` call can install the downloader.
fn start(&self) {
    // The write guard is held for the whole call so two concurrent `start`s
    // cannot both spawn a downloader.
    let mut handle_slot = self.inner.handle.write();
    if handle_slot.is_none() {
        let servers = self.inner.config.servers.lock().clone();
        if servers.is_empty() {
            info!("NewsDispatchEngine start deferred — no servers configured");
        } else {
            spawn_and_install_downloader(&self.inner, &mut handle_slot, servers);
        }
    }
}
fn submit_job(&self, job: &NzbJob, progress_tx: mpsc::Sender<ProgressUpdate>) {
let (ctx, legacy_items) = build_job_submission(job, progress_tx);
let news_files: Vec<Arc<nzb_news::NzbFile>> = job
.files
.iter()
.map(|f| {
Arc::new(nzb_news::NzbFile::new(
&f.id,
&job.id,
&f.filename,
f.articles.len() as u32,
))
})
.collect();
let news_files_by_id: HashMap<String, Arc<nzb_news::NzbFile>> = news_files
.iter()
.map(|nf| (nf.id.clone(), Arc::clone(nf)))
.collect();
let total_articles = legacy_items.len() as u64;
let news_job = Arc::new(nzb_news::NzbObject::new(
&job.id,
&job.name,
total_articles,
job.total_bytes,
news_files.clone(),
));
let mut pending = VecDeque::with_capacity(legacy_items.len());
let tag_counter = &self.inner.next_tag;
for item in legacy_items {
let tag = tag_counter.fetch_add(1, Ordering::Relaxed);
let file = match news_files_by_id.get(&item.file_id) {
Some(f) => Arc::clone(f),
None => continue, };
self.inner.in_flight.write().insert(
tag,
InFlight {
job_id: item.job_id.clone(),
file_id: item.file_id.clone(),
segment_number: item.segment_number,
},
);
let article = Arc::new(nzb_news::Article::new(
item.message_id.clone(),
item.file_id.clone(),
item.job_id.clone(),
0,
item.segment_number,
tag,
));
pending.push_back(nzb_news::WorkItem {
tag,
article,
file,
job: Arc::clone(&news_job),
});
}
let entry = Arc::new(JobEntry {
context: Arc::clone(&ctx),
paused: AtomicBool::new(false),
cancelled: AtomicBool::new(false),
pending: Mutex::new(pending),
pump_wake: Notify::new(),
});
self.inner
.jobs
.write()
.insert(ctx.job_id.clone(), Arc::clone(&entry));
let job_id = job.id.clone();
tokio::spawn(pump_loop(entry, Arc::clone(&self.inner), job_id));
}
/// Marks the job as paused and forwards the pause to the downloader.
///
/// Both steps are independent: the flag is set only if the job is
/// registered, and the downloader is told only if one is installed.
fn pause_job(&self, job_id: &str) {
    {
        let jobs = self.inner.jobs.read();
        if let Some(job) = jobs.get(job_id) {
            job.paused.store(true, Ordering::SeqCst);
        }
    }
    {
        let handle = self.inner.handle.read();
        if let Some(downloader) = handle.as_ref() {
            downloader.pause_job(job_id);
        }
    }
    debug!(job_id, "paused");
}
/// Clears the job's paused flag, wakes its pump, and forwards the resume to
/// the downloader.
///
/// The flag is cleared before the pump is notified, so a woken pump sees the
/// job as runnable.
fn resume_job(&self, job_id: &str) {
    {
        let jobs = self.inner.jobs.read();
        if let Some(job) = jobs.get(job_id) {
            job.paused.store(false, Ordering::SeqCst);
            job.pump_wake.notify_waiters();
        }
    }
    {
        let handle = self.inner.handle.read();
        if let Some(downloader) = handle.as_ref() {
            downloader.resume_job(job_id);
        }
    }
    debug!(job_id, "resumed");
}
/// Unregisters the job and discards all of its outstanding work.
///
/// A no-op when the job is unknown. Otherwise the pump is told to exit, the
/// pending queue is emptied, the job's tags are dropped from `in_flight`,
/// and the downloader (if installed) is asked to purge the job.
fn cancel_job(&self, job_id: &str) {
    let Some(job) = self.inner.jobs.write().remove(job_id) else {
        return;
    };

    // Set the flag before waking the pump so it observes the cancellation.
    job.cancelled.store(true, Ordering::SeqCst);
    job.pump_wake.notify_waiters();
    job.pending.lock().clear();

    self.inner
        .in_flight
        .write()
        .retain(|_, meta| meta.job_id != job_id);

    let handle = self.inner.handle.read();
    if let Some(downloader) = handle.as_ref() {
        downloader.purge_job(job_id);
    }
    debug!(job_id, "cancelled");
}
fn abort_job(&self, job_id: &str, reason: String) {
let entry = self.inner.jobs.read().get(job_id).cloned();
if let Some(entry) = entry {
*entry.context.abort_reason.lock() = Some(reason);
entry.context.emit_terminal_public();
}
self.cancel_job(job_id);
}
/// Returns `true` while `job_id` is present in the engine's job map, i.e.
/// after `submit_job` and before `cancel_job` removes it.
fn has_job(&self, job_id: &str) -> bool {
self.inner.jobs.read().contains_key(job_id)
}
/// Rebuilds the downloader from the current server list.
///
/// Any installed downloader is taken out of the handle slot and shut down.
/// A replacement is spawned unless the server list is empty. Every job's
/// pump is then woken so parked pumps retry against the new handle.
fn reconcile_servers(&self) {
    let servers = self.inner.config.servers.lock().clone();
    let server_count = servers.len();

    // Swap under a single write lock; the old handle is shut down only
    // after the lock has been released.
    let retired = {
        let mut handle_slot = self.inner.handle.write();
        let retired = handle_slot.take();
        if !servers.is_empty() {
            spawn_and_install_downloader(&self.inner, &mut handle_slot, servers);
        }
        retired
    };
    if let Some(downloader) = retired {
        downloader.shutdown();
    }

    // Snapshot the entries so the jobs lock is not held while notifying.
    let waiting: Vec<Arc<JobEntry>> = self.inner.jobs.read().values().cloned().collect();
    for job in &waiting {
        job.pump_wake.notify_waiters();
    }

    info!(
        servers = server_count,
        "NewsDispatchEngine reconciled server list"
    );
}
/// No-op for this engine: the duration is accepted and ignored.
fn set_max_worker_idle(&self, _d: Duration) {
}
/// Always returns 0; this engine does not track evictions.
fn eviction_count(&self) -> u64 {
0
}
/// Returns the downloader's per-server attempt counters, converted to the
/// dispatch-engine stats type.
///
/// Yields an empty vector when no downloader is installed.
fn server_stats_snapshot(&self) -> Vec<(String, crate::dispatch_engine::ServerAttemptStats)> {
    let handle = self.inner.handle.read();
    match handle.as_ref() {
        None => Vec::new(),
        Some(downloader) => downloader
            .server_stats_snapshot()
            .into_iter()
            .map(|(server_id, stats)| {
                let converted = crate::dispatch_engine::ServerAttemptStats {
                    attempted: stats.attempted,
                    succeeded: stats.succeeded,
                    not_found: stats.not_found,
                    transient_failed: stats.transient_failed,
                };
                (server_id, converted)
            })
            .collect(),
    }
}
/// Removes the installed downloader (if any), signals it to shut down, and
/// waits for it to finish.
async fn shutdown(&self) {
    // Take the handle in its own scope so the lock guard is gone before the
    // await below.
    let taken = { self.inner.handle.write().take() };
    let Some(downloader) = taken else {
        return;
    };
    downloader.shutdown();
    downloader.join().await;
}
}
/// Drains fetch outcomes from one downloader until its channel closes.
///
/// Successes are handed to a freshly spawned task running `process_success`.
/// Failures are handled inline. Cancellations only drop the tag from
/// `in_flight`.
async fn outcome_dispatcher(
    inner: Arc<Inner>,
    mut outcomes: mpsc::Receiver<nzb_news::FetchOutcome>,
) {
    loop {
        let outcome = match outcomes.recv().await {
            Some(outcome) => outcome,
            None => break,
        };
        match outcome {
            nzb_news::FetchOutcome::Cancelled { tag } => {
                inner.in_flight.write().remove(&tag);
            }
            nzb_news::FetchOutcome::Failed { tag, last_error } => {
                process_failure(&inner, tag, last_error);
            }
            nzb_news::FetchOutcome::Success {
                tag,
                server_id,
                bytes,
                ..
            } => {
                // Runs in its own task, so this loop keeps draining the
                // channel while the article is decoded and assembled.
                let shared = Arc::clone(&inner);
                tokio::spawn(process_success(shared, tag, server_id, bytes));
            }
        }
    }
    debug!("outcome_dispatcher exiting: channel closed");
}
async fn process_success(inner: Arc<Inner>, tag: u64, server_id: String, raw: Vec<u8>) {
let meta = inner.in_flight.write().remove(&tag);
let Some(meta) = meta else {
return; };
let entry = inner.jobs.read().get(&meta.job_id).cloned();
let Some(entry) = entry else {
return; };
let ctx = &entry.context;
let decode_start = Instant::now();
let decoded = match nzb_decode::decode_yenc(&raw) {
Ok(d) => d,
Err(e) => {
let failure = ArticleFailure::decode_error(server_id, format!("yEnc decode: {e}"));
emit_failed(ctx, &meta, failure);
return;
}
};
let decode_us = decode_start.elapsed().as_micros() as u64;
if let Some(ref fname) = decoded.filename
&& !fname.is_empty()
{
ctx.yenc_names
.lock()
.insert(meta.file_id.clone(), fname.clone());
}
let data_begin = decoded.part_begin.unwrap_or(0);
let assemble_start = Instant::now();
let file_complete = match ctx.assembler.assemble_article(
&meta.job_id,
&meta.file_id,
meta.segment_number,
data_begin,
&decoded.data,
) {
Ok(b) => b,
Err(e) => {
let failure = ArticleFailure::decode_error(server_id, format!("assembly: {e}"));
emit_failed(ctx, &meta, failure);
return;
}
};
let assemble_us = assemble_start.elapsed().as_micros() as u64;
ctx.total_decode_us.fetch_add(decode_us, Ordering::Relaxed);
ctx.total_assemble_us
.fetch_add(assemble_us, Ordering::Relaxed);
ctx.total_articles_decoded.fetch_add(1, Ordering::Relaxed);
let decoded_bytes = decoded.data.len() as u64;
let _ = ctx.progress_tx.try_send(ProgressUpdate::ArticleComplete {
job_id: meta.job_id.clone(),
file_id: meta.file_id.clone(),
segment_number: meta.segment_number,
decoded_bytes,
file_complete,
server_id: Some(server_id),
});
ctx.resolve_one_public();
}
/// Handles an article for which the downloader reported failure.
///
/// Returns silently when the tag is no longer in `in_flight` or its job is no
/// longer registered. The failure kind is derived from the error text; when
/// no text is supplied, "all servers exhausted" is used. The reported
/// `server_id` is left empty.
fn process_failure(inner: &Inner, tag: u64, last_error: Option<String>) {
    let Some(meta) = inner.in_flight.write().remove(&tag) else {
        return;
    };
    let Some(entry) = inner.jobs.read().get(&meta.job_id).cloned() else {
        return;
    };

    let message = match last_error {
        Some(text) => text,
        None => "all servers exhausted".into(),
    };
    let failure = ArticleFailure {
        kind: classify_error_message(&message),
        server_id: String::new(),
        message,
    };
    emit_failed(&entry.context, &meta, failure);
}
/// Maps a free-form downloader error message to an [`ArticleFailureKind`].
///
/// Matching is case-insensitive (ASCII) and substring-based. The categories
/// are tested in a fixed order and the first match wins: auth, permission,
/// not-found, server-down, timeout, connection-closed. A message matching
/// none of them is classified as `NotFound`.
fn classify_error_message(msg: &str) -> ArticleFailureKind {
    let lowered = msg.to_ascii_lowercase();
    let mentions_any = |needles: &[&str]| needles.iter().any(|needle| lowered.contains(needle));

    if mentions_any(&["(482)", "(481)", "auth"]) {
        ArticleFailureKind::AuthFailed
    } else if mentions_any(&["(403)", "permission", "forbidden"]) {
        ArticleFailureKind::PermissionDenied
    } else if mentions_any(&["(430)", "article not found", "no such article"]) {
        ArticleFailureKind::NotFound
    } else if mentions_any(&["(502)", "service unavailable"]) {
        ArticleFailureKind::ServerDown
    } else if mentions_any(&["timeout", "timed out"]) {
        ArticleFailureKind::Timeout
    } else if mentions_any(&["connection", "eof", "reset", "closed"]) {
        ArticleFailureKind::ConnectionClosed
    } else {
        // Unrecognised text falls back to the not-found category.
        ArticleFailureKind::NotFound
    }
}
#[cfg(test)]
mod classify_tests {
    use super::*;

    /// Classifies `msg` and asserts the result equals `expected`.
    #[track_caller]
    fn expect_kind(msg: &str, expected: ArticleFailureKind) {
        assert_eq!(classify_error_message(msg), expected);
    }

    #[test]
    fn classifies_auth_failures() {
        expect_kind(
            "Authentication failed: PASS rejected (482): Your block account is fully used",
            ArticleFailureKind::AuthFailed,
        );
    }

    #[test]
    fn classifies_not_found() {
        expect_kind("NNTP (430) No such article", ArticleFailureKind::NotFound);
    }

    #[test]
    fn classifies_service_unavailable() {
        expect_kind("Service unavailable (502)", ArticleFailureKind::ServerDown);
    }

    #[test]
    fn classifies_timeout() {
        expect_kind("read timed out after 60s", ArticleFailureKind::Timeout);
    }

    #[test]
    fn unknown_defaults_to_not_found() {
        expect_kind("all servers exhausted", ArticleFailureKind::NotFound);
    }
}
/// Records one failed article on the job context and reports it.
///
/// Bumps `articles_failed`, tries to send an `ArticleFailed` progress update,
/// and resolves one article on the context.
fn emit_failed(ctx: &JobContext, meta: &InFlight, failure: ArticleFailure) {
    ctx.articles_failed.fetch_add(1, Ordering::Relaxed);

    let update = ProgressUpdate::ArticleFailed {
        job_id: meta.job_id.clone(),
        file_id: meta.file_id.clone(),
        segment_number: meta.segment_number,
        failure,
    };
    // Best effort: the update is dropped if the progress channel is full or
    // closed.
    let _ = ctx.progress_tx.try_send(update);

    ctx.resolve_one_public();
}
/// Feeds one job's pending work items to the currently installed downloader.
///
/// The loop exits once the job is cancelled. It parks on `entry.pump_wake`
/// in four cases: the job is paused, the pending queue is empty, no
/// downloader is installed, or the downloader's work channel rejected the
/// item. In the last two cases the item is put back at the front of the
/// queue so dispatch order is kept.
async fn pump_loop(entry: Arc<JobEntry>, inner: Arc<Inner>, job_id: String) {
    loop {
        // Create the wake-up future BEFORE inspecting any state. Every
        // signaller uses `notify_waiters()`, which stores no permit, so a
        // resume/cancel/reconcile landing between a state check and a later
        // `notified()` call would be lost and the pump would park forever.
        // A `Notified` future observes `notify_waiters()` calls from the
        // moment it is created, even if it has not been polled yet.
        let wake = entry.pump_wake.notified();

        if entry.cancelled.load(Ordering::SeqCst) {
            debug!(job_id, "pump exiting: cancelled");
            return;
        }
        if entry.paused.load(Ordering::SeqCst) {
            wake.await;
            continue;
        }

        let next = entry.pending.lock().pop_front();
        let Some(item) = next else {
            wake.await;
            continue;
        };

        let sender = inner.handle.read().as_ref().map(|h| h.sender());
        let Some(sender) = sender else {
            // No downloader installed: keep the item and wait for one.
            entry.pending.lock().push_front(item);
            wake.await;
            continue;
        };

        if let Err(e) = sender.send(item).await {
            // The work channel is closed (its downloader went away). `wake`
            // was created before the handle was read, so the notification
            // sent after a replacement handle is installed is not missed.
            entry.pending.lock().push_front(e.0);
            wake.await;
            continue;
        }
    }
}
/// Spawns a downloader for `servers`, starts its outcome dispatcher task, and
/// stores the new handle in `slot`.
///
/// `slot` is the caller's already-locked handle slot; any value it held is
/// overwritten.
fn spawn_and_install_downloader(
    inner: &Arc<Inner>,
    slot: &mut Option<nzb_news::DownloaderHandle>,
    servers: Vec<ServerConfig>,
) {
    let server_count = servers.len();
    let settings = &inner.config;

    let (handle, outcomes) = nzb_news::spawn_downloader(nzb_news::DownloaderConfig {
        servers,
        max_concurrent_fetches: settings.max_concurrent_fetches,
        article_timeout: settings.article_timeout,
        work_channel_capacity: settings.work_channel_capacity,
        outcome_channel_capacity: settings.outcome_channel_capacity,
        probe_policy: settings.probe_policy.clone(),
    });

    // The dispatcher task is started before the handle is published.
    tokio::spawn(outcome_dispatcher(Arc::clone(inner), outcomes));
    *slot = Some(handle);

    info!(
        servers = server_count,
        "NewsDispatchEngine downloader spawned"
    );
}