1use std::collections::{HashMap, VecDeque};
34use std::path::PathBuf;
35use std::sync::Arc;
36use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
37use std::time::{Duration, Instant};
38
39use parking_lot::Mutex;
40use tokio::sync::{Notify, mpsc};
41use tokio::task::JoinHandle;
42use tracing::{debug, error, info, trace, warn};
43
44use nzb_core::config::ServerConfig;
45use nzb_core::models::NzbJob;
46use nzb_decode::FileAssembler;
47use nzb_decode::yenc::decode_yenc;
48use nzb_nntp::Pipeline;
49use nzb_nntp::connection::NntpConnection;
50use nzb_nntp::error::NntpError;
51
52use crate::bandwidth::BandwidthLimiter;
53
/// Retry cap per server for a single article.
/// NOTE(review): not referenced in this section of the file — presumably compared
/// against `WorkItem::tries_on_current` by the fetch/retry code; confirm there.
const MAX_TRIES_PER_SERVER: u32 = 3;
/// How long a worker sleeps after `connect_with_retry` has failed before it
/// goes around its reconnect loop again.
const RECONNECT_DELAY: Duration = Duration::from_secs(5);
/// NOTE(review): not referenced in this section — presumably the attempt cap
/// used inside `connect_with_retry`; confirm there.
const MAX_RECONNECT_ATTEMPTS: u32 = 5;
/// Start-up stagger unit: worker number `conn_idx` sleeps
/// `WORKER_RAMP_DELAY * conn_idx` before acquiring a connection slot.
const WORKER_RAMP_DELAY: Duration = Duration::from_millis(15);
/// Number of consecutive non-auth failures after which `ServerHealth`
/// disables a server for a cooldown period.
const CIRCUIT_BREAK_THRESHOLD: u32 = 3;
/// Cooldown applied by `ServerHealth::record_failure` for an auth failure
/// (applied on the first such failure).
const AUTH_FAILURE_COOLDOWN: Duration = Duration::from_secs(120);
/// Cooldown applied by `ServerHealth::record_failure` once non-auth failures
/// reach `CIRCUIT_BREAK_THRESHOLD`.
const TRANSIENT_FAILURE_COOLDOWN: Duration = Duration::from_secs(30);
/// Tick period of the `WorkerPool` supervisor loop.
const SUPERVISOR_INTERVAL: Duration = Duration::from_secs(1);

/// Initial value of the idle-worker watchdog threshold; adjustable at runtime
/// through `WorkerPool::set_max_worker_idle`.
const DEFAULT_MAX_WORKER_IDLE: Duration = Duration::from_secs(60);
/// Polling interval used by workers while they wait: for new queue items, for
/// a paused job, or for a circuit-broken server to recover.
const WORKER_IDLE_POLL: Duration = Duration::from_millis(500);

/// Capacity reported in the "progress channel full" warning.
/// NOTE(review): presumably the capacity the progress channel is created with —
/// the channel construction is not in this section; confirm at the call site.
pub const PROGRESS_CHANNEL_CAPACITY: usize = 10_000;
90
91fn try_send_progress(tx: &mpsc::Sender<ProgressUpdate>, job_id: &str, update: ProgressUpdate) {
99 if let Err(e) = tx.try_send(update) {
100 match e {
101 mpsc::error::TrySendError::Full(_) => {
102 warn!(
103 job_id,
104 capacity = PROGRESS_CHANNEL_CAPACITY,
105 "Progress channel full — dropping update (handler backpressure)"
106 );
107 }
108 mpsc::error::TrySendError::Closed(_) => {
109 }
112 }
113 }
114}
115
/// Registry of per-server connection pools.
///
/// Each registered server id maps to a [`ServerSlot`] holding a semaphore
/// sized to that server's connection limit; a [`ConnectionSlot`] obtained via
/// `acquire` owns one permit of that semaphore.
pub struct ConnectionTracker {
    /// Server id -> pool state. Guarded by a synchronous mutex; `acquire`
    /// releases it before awaiting a permit.
    pools: Mutex<HashMap<String, ServerSlot>>,
}
139
/// Pool state for one server inside [`ConnectionTracker`].
///
/// Cloning is shallow with respect to the semaphore: clones share the same
/// `Arc<Semaphore>`.
#[derive(Clone)]
struct ServerSlot {
    /// Server display name as passed to `set_limit`; a name change causes the
    /// pool to be replaced.
    name: String,
    /// Configured connection limit this pool was created with / grown to.
    limit: usize,
    /// Permit source; its `Arc` identity is what `slot_status` compares
    /// against to detect a replaced pool.
    semaphore: Arc<tokio::sync::Semaphore>,
}
146
147impl ConnectionTracker {
148 pub fn new() -> Self {
149 Self {
150 pools: Mutex::new(HashMap::new()),
151 }
152 }
153
154 pub fn set_limit(&self, server_id: &str, server_name: &str, limit: usize) {
162 let mut pools = self.pools.lock();
163 match pools.get_mut(server_id) {
164 Some(slot) if slot.limit == limit && slot.name == server_name => {
165 }
167 Some(slot) if limit > slot.limit && slot.name == server_name => {
168 let added = limit - slot.limit;
169 let old = slot.limit;
170 slot.semaphore.add_permits(added);
171 slot.limit = limit;
172 info!(
173 server_id,
174 server = %server_name,
175 old_limit = old,
176 new_limit = limit,
177 added,
178 "Connection pool grew in place"
179 );
180 }
181 existing => {
182 let (prev_limit, prev_name) = match existing {
184 Some(s) => (Some(s.limit), Some(s.name.clone())),
185 None => (None, None),
186 };
187 pools.insert(
188 server_id.to_string(),
189 ServerSlot {
190 name: server_name.to_string(),
191 limit,
192 semaphore: Arc::new(tokio::sync::Semaphore::new(limit)),
193 },
194 );
195 if let Some(prev) = prev_limit {
196 let renamed = prev_name.as_deref() != Some(server_name);
197 info!(
198 server_id,
199 server = %server_name,
200 old_limit = prev,
201 new_limit = limit,
202 renamed,
203 "Connection pool replaced (shrink or rename); old permits orphaned"
204 );
205 } else {
206 info!(
207 server_id,
208 server = %server_name,
209 limit,
210 "Connection pool created"
211 );
212 }
213 }
214 }
215 }
216
217 pub fn remove_server(&self, server_id: &str) {
221 self.pools.lock().remove(server_id);
222 }
223
224 pub async fn acquire(&self, server_id: &str) -> Option<ConnectionSlot> {
228 let server_slot = {
230 let pools = self.pools.lock();
231 pools.get(server_id).cloned()?
232 };
233 if server_slot.limit == 0 {
234 return None;
235 }
236 let permit = Arc::clone(&server_slot.semaphore)
237 .acquire_owned()
238 .await
239 .ok()?;
240 Some(ConnectionSlot {
241 server_id: server_id.to_string(),
242 server_name: server_slot.name,
243 semaphore_origin: server_slot.semaphore,
244 _permit: permit,
245 })
246 }
247
248 pub fn slot_is_current(&self, slot: &ConnectionSlot) -> bool {
253 matches!(self.slot_status(slot), SlotStatus::Current)
254 }
255
256 pub fn slot_status(&self, slot: &ConnectionSlot) -> SlotStatus {
259 let pools = self.pools.lock();
260 match pools.get(&slot.server_id) {
261 Some(server_slot) => {
262 if Arc::ptr_eq(&server_slot.semaphore, &slot.semaphore_origin) {
263 SlotStatus::Current
264 } else {
265 SlotStatus::PoolReplaced
266 }
267 }
268 None => SlotStatus::ServerRemoved,
269 }
270 }
271
272 pub fn snapshot(&self) -> Vec<(String, usize, usize)> {
276 let pools = self.pools.lock();
277 pools
278 .iter()
279 .map(|(id, slot)| {
280 let active = slot
281 .limit
282 .saturating_sub(slot.semaphore.available_permits());
283 (id.clone(), active, slot.limit)
284 })
285 .collect()
286 }
287
288 pub fn total(&self) -> usize {
292 let pools = self.pools.lock();
293 pools
294 .values()
295 .map(|s| s.limit.saturating_sub(s.semaphore.available_permits()))
296 .sum()
297 }
298}
299
300impl Default for ConnectionTracker {
301 fn default() -> Self {
302 Self::new()
303 }
304}
305
/// Result of checking a [`ConnectionSlot`] against the tracker's current
/// registry (see `ConnectionTracker::slot_status`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SlotStatus {
    /// The server is registered and still uses the semaphore this slot's
    /// permit came from.
    Current,
    /// The server is registered, but with a different semaphore than the one
    /// this slot was acquired from.
    PoolReplaced,
    /// The server id is no longer registered.
    ServerRemoved,
}
317
/// One acquired connection permit for a server.
///
/// Holding the value holds the permit; dropping it drops the owned permit.
pub struct ConnectionSlot {
    /// Id of the server the permit was acquired for.
    server_id: String,
    /// Server name as registered at the time of acquisition.
    server_name: String,
    /// The semaphore the permit came from; compared by pointer identity in
    /// `ConnectionTracker::slot_status`.
    semaphore_origin: Arc<tokio::sync::Semaphore>,
    /// Kept only for its lifetime — never read.
    _permit: tokio::sync::OwnedSemaphorePermit,
}
331
332impl ConnectionSlot {
333 pub fn server_id(&self) -> &str {
334 &self.server_id
335 }
336 pub fn server_name(&self) -> &str {
337 &self.server_name
338 }
339}
340
/// Failure bookkeeping for one server, used as a simple circuit breaker.
#[derive(Debug)]
pub struct ServerHealth {
    /// Failures recorded since the last `record_success`.
    pub consecutive_failures: u32,
    /// When set, the server counts as unavailable until this instant.
    pub disabled_until: Option<Instant>,
    /// Text of the most recently recorded failure.
    pub reason: Option<String>,
    /// Whether the most recently recorded failure was an auth failure.
    pub is_auth_failure: bool,
}
352
353impl Default for ServerHealth {
354 fn default() -> Self {
355 Self::new()
356 }
357}
358
359impl ServerHealth {
360 pub fn new() -> Self {
361 Self {
362 consecutive_failures: 0,
363 disabled_until: None,
364 reason: None,
365 is_auth_failure: false,
366 }
367 }
368
369 pub fn is_available(&self) -> bool {
370 match self.disabled_until {
371 None => true,
372 Some(until) => Instant::now() >= until,
373 }
374 }
375
376 pub fn record_failure(&mut self, is_auth: bool, reason: &str) {
377 self.consecutive_failures += 1;
378 self.is_auth_failure = is_auth;
379 self.reason = Some(reason.to_string());
380
381 if is_auth || self.consecutive_failures >= CIRCUIT_BREAK_THRESHOLD {
382 let cooldown = if is_auth {
383 AUTH_FAILURE_COOLDOWN
384 } else {
385 TRANSIENT_FAILURE_COOLDOWN
386 };
387 self.disabled_until = Some(Instant::now() + cooldown);
388 }
389 }
390
391 pub fn record_success(&mut self) {
392 *self = Self::new();
393 }
394}
395
/// Shared, mutex-guarded map from server id to that server's [`ServerHealth`].
pub type ServerHealthMap = Arc<Mutex<HashMap<String, ServerHealth>>>;
397
/// Events sent over the progress channel to whoever consumes job progress.
#[derive(Debug, Clone)]
pub enum ProgressUpdate {
    /// One article of a job was handled successfully.
    /// NOTE(review): emitted outside this section; field meanings below are
    /// taken from their names — confirm at the emit site.
    ArticleComplete {
        job_id: String,
        file_id: String,
        segment_number: u32,
        decoded_bytes: u64,
        file_complete: bool,
        server_id: Option<String>,
    },
    /// One article of a job failed. NOTE(review): emitted outside this section.
    ArticleFailed {
        job_id: String,
        file_id: String,
        segment_number: u32,
        failure: crate::article_failure::ArticleFailure,
    },
    /// Terminal event for a job that was not aborted. `success` is
    /// `articles_failed == 0`.
    JobFinished {
        job_id: String,
        success: bool,
        articles_failed: usize,
    },
    /// Sent by the supervisor when every enabled server is currently
    /// unavailable; the job is paused and removed from the pool.
    NoServersAvailable {
        job_id: String,
        reason: String,
    },
    /// Terminal event for a job whose abort reason was set.
    JobAborted {
        job_id: String,
        reason: String,
    },
}
435
/// One queued article fetch.
#[derive(Debug, Clone)]
pub(crate) struct WorkItem {
    /// Job this article belongs to; used to look up the `JobContext`.
    pub(crate) job_id: String,
    /// File within the job this article belongs to.
    pub(crate) file_id: String,
    /// File name; the queue orders submitted items by a key derived from it.
    pub(crate) filename: String,
    /// Message id of the article (logged on stalls).
    pub(crate) message_id: String,
    pub(crate) segment_number: u32,
    /// Ids of servers that already handled this item. A server is never
    /// offered an item listing it, and only sees items that list every
    /// higher-priority server passed to the queue.
    pub(crate) tried_servers: Vec<String>,
    /// NOTE(review): not read in this section — presumably the attempt count
    /// on the server currently being tried; confirm in the retry code.
    pub(crate) tries_on_current: u32,
}
452
/// Shared per-job state: output handles, counters and control flags.
pub(crate) struct JobContext {
    pub job_id: String,
    /// Directory in which deobfuscation renames are performed.
    pub work_dir: PathBuf,
    pub assembler: Arc<FileAssembler>,
    /// Channel on which this job's progress and terminal events are sent.
    pub progress_tx: mpsc::Sender<ProgressUpdate>,
    /// File id -> name taken from yEnc data. Read at job end to decide on
    /// renames. NOTE(review): populated outside this section.
    pub yenc_names: Arc<Mutex<HashMap<String, String>>>,
    /// File id -> file name as listed in the job's file list.
    pub nzb_filenames: HashMap<String, String>,
    /// Articles not yet resolved; the decrement that takes it from 1 to 0
    /// triggers the terminal event.
    pub articles_remaining: AtomicUsize,
    /// Failed-article count reported in `JobFinished`.
    pub articles_failed: AtomicUsize,
    /// While set, workers put this job's items back on the queue.
    pub paused: AtomicBool,
    /// While set, workers discard this job's items.
    pub cancelled: AtomicBool,
    /// When set, the terminal event is `JobAborted` with this reason.
    pub abort_reason: Mutex<Option<String>>,
    /// Cumulative decode time in microseconds (logged at job end).
    pub total_decode_us: Arc<AtomicU64>,
    /// Cumulative assemble time in microseconds (logged at job end).
    pub total_assemble_us: Arc<AtomicU64>,
    /// Number of articles decoded (logged at job end).
    pub total_articles_decoded: Arc<AtomicU64>,
    /// Time this context was created; basis for the elapsed-time log.
    pub engine_start: Instant,
    /// Total job size in bytes, used for the throughput log.
    pub total_bytes: u64,
    /// Set once the terminal event has been emitted, so it is sent only once.
    finished: AtomicBool,
}
488
/// Shared, mutex-guarded map from job id to that job's [`JobContext`].
pub(crate) type JobContextMap = Arc<Mutex<HashMap<String, Arc<JobContext>>>>;
490
491impl JobContext {
492 fn new(
493 job: &NzbJob,
494 assembler: Arc<FileAssembler>,
495 progress_tx: mpsc::Sender<ProgressUpdate>,
496 total_articles: usize,
497 ) -> Self {
498 let nzb_filenames = job
499 .files
500 .iter()
501 .map(|f| (f.id.clone(), f.filename.clone()))
502 .collect();
503 Self {
504 job_id: job.id.clone(),
505 work_dir: job.work_dir.clone(),
506 assembler,
507 progress_tx,
508 yenc_names: Arc::new(Mutex::new(HashMap::new())),
509 nzb_filenames,
510 articles_remaining: AtomicUsize::new(total_articles),
511 articles_failed: AtomicUsize::new(0),
512 paused: AtomicBool::new(false),
513 cancelled: AtomicBool::new(false),
514 abort_reason: Mutex::new(None),
515 total_decode_us: Arc::new(AtomicU64::new(0)),
516 total_assemble_us: Arc::new(AtomicU64::new(0)),
517 total_articles_decoded: Arc::new(AtomicU64::new(0)),
518 engine_start: Instant::now(),
519 total_bytes: job.total_bytes,
520 finished: AtomicBool::new(false),
521 }
522 }
523
524 pub(crate) fn resolve_one_public(&self) {
527 self.resolve_one();
528 }
529
530 pub(crate) fn emit_terminal_public(&self) {
532 self.emit_terminal();
533 }
534
535 fn resolve_one(&self) {
538 let prev = self.articles_remaining.fetch_sub(1, Ordering::Relaxed);
539 if prev != 1 {
540 return;
541 }
542 self.emit_terminal();
543 }
544
545 fn emit_terminal(&self) {
548 if self.finished.swap(true, Ordering::Relaxed) {
549 return;
550 }
551
552 self.deobfuscate_files();
555
556 let download_elapsed = self.engine_start.elapsed();
557 let decode_total_us = self.total_decode_us.load(Ordering::Relaxed);
558 let assemble_total_us = self.total_assemble_us.load(Ordering::Relaxed);
559 let articles_decoded = self.total_articles_decoded.load(Ordering::Relaxed);
560 let elapsed_us = download_elapsed.as_micros().max(1);
561 let throughput_mbps = (self.total_bytes as f64 / download_elapsed.as_secs_f64().max(0.001))
562 / (1024.0 * 1024.0);
563 info!(
564 job_id = %self.job_id,
565 elapsed_secs = download_elapsed.as_secs_f64(),
566 total_bytes = self.total_bytes,
567 throughput_mbps = format!("{throughput_mbps:.2}"),
568 "Download phase complete"
569 );
570 info!(
571 job_id = %self.job_id,
572 articles_decoded,
573 decode_secs = format!("{:.3}", decode_total_us as f64 / 1_000_000.0),
574 assemble_secs = format!("{:.3}", assemble_total_us as f64 / 1_000_000.0),
575 decode_pct = format!("{:.1}", decode_total_us as f64 / elapsed_us as f64 * 100.0),
576 assemble_pct = format!("{:.1}", assemble_total_us as f64 / elapsed_us as f64 * 100.0),
577 "Decode timing summary (cumulative across all workers)"
578 );
579
580 let abort_reason = self.abort_reason.lock().clone();
581 if let Some(reason) = abort_reason {
582 try_send_progress(
583 &self.progress_tx,
584 &self.job_id,
585 ProgressUpdate::JobAborted {
586 job_id: self.job_id.clone(),
587 reason,
588 },
589 );
590 return;
591 }
592
593 let failed = self.articles_failed.load(Ordering::Relaxed);
594 try_send_progress(
595 &self.progress_tx,
596 &self.job_id,
597 ProgressUpdate::JobFinished {
598 job_id: self.job_id.clone(),
599 success: failed == 0,
600 articles_failed: failed,
601 },
602 );
603 }
604
605 fn deobfuscate_files(&self) {
608 let renames = self.yenc_names.lock();
609 for (file_id, yenc_name) in renames.iter() {
610 let Some(nzb_name) = self.nzb_filenames.get(file_id) else {
611 continue;
612 };
613 if nzb_name == yenc_name {
614 continue;
615 }
616 let clean_yenc = std::path::Path::new(yenc_name.as_str())
617 .file_name()
618 .and_then(|n| n.to_str())
619 .unwrap_or(yenc_name);
620 if clean_yenc.is_empty() || nzb_name == clean_yenc {
621 continue;
622 }
623
624 let nzb_has_ext = has_known_extension(nzb_name);
625 let yenc_has_ext = has_known_extension(clean_yenc);
626
627 let (old_name, new_name) = if yenc_has_ext && !nzb_has_ext {
628 (nzb_name.as_str(), clean_yenc)
629 } else if nzb_has_ext && !yenc_has_ext {
630 continue;
631 } else if yenc_has_ext && nzb_has_ext {
632 (nzb_name.as_str(), clean_yenc)
633 } else {
634 continue;
635 };
636
637 let old_path = self.work_dir.join(old_name);
638 let new_path = self.work_dir.join(new_name);
639 if old_path.exists() && !new_path.exists() {
640 if let Err(e) = std::fs::rename(&old_path, &new_path) {
641 warn!(
642 job_id = %self.job_id,
643 from = %old_name,
644 to = %new_name,
645 "Failed to deobfuscate file: {e}"
646 );
647 } else {
648 info!(
649 job_id = %self.job_id,
650 from = %old_name,
651 to = %new_name,
652 "Deobfuscated file"
653 );
654 }
655 }
656 }
657 }
658}
659
/// Work queue shared by all workers of a pool.
pub(crate) struct SharedWorkQueue {
    /// Queue contents and per-server bookkeeping, behind one mutex.
    inner: Mutex<InnerState>,
    /// Signalled whenever items are added (and on resume/shutdown) so waiting
    /// workers re-check the queue.
    notify: Notify,
}
674
/// Mutex-protected part of [`SharedWorkQueue`].
struct InnerState {
    /// Pending items, scanned front to back.
    items: VecDeque<WorkItem>,
    /// Server id -> job id of the item most recently handed to that server.
    /// Used to prefer an item from a different job on the next pop.
    last_served: HashMap<String, String>,
}
688
689impl SharedWorkQueue {
690 pub fn new() -> Self {
691 Self {
692 inner: Mutex::new(InnerState {
693 items: VecDeque::new(),
694 last_served: HashMap::new(),
695 }),
696 notify: Notify::new(),
697 }
698 }
699
700 pub fn submit_items(&self, mut items: Vec<WorkItem>) {
704 items.sort_by_key(|item| par2_sort_key(&item.filename));
705 let had_items = !items.is_empty();
706 {
707 let mut state = self.inner.lock();
708 state.items.reserve(items.len());
709 for item in items {
710 state.items.push_back(item);
711 }
712 }
713 if had_items {
714 self.notify.notify_waiters();
715 }
716 }
717
718 fn push_front(&self, item: WorkItem) {
722 self.inner.lock().items.push_front(item);
723 self.notify.notify_waiters();
724 }
725
726 fn push_back(&self, item: WorkItem) {
729 self.inner.lock().items.push_back(item);
730 self.notify.notify_waiters();
731 }
732
733 pub(crate) fn workable_count_for(
748 &self,
749 server_id: &str,
750 higher_priority_servers: &[String],
751 ) -> (usize, usize) {
752 let state = self.inner.lock();
753 let total = state.items.len();
754 let workable = state
755 .items
756 .iter()
757 .filter(|i| !i.tried_servers.iter().any(|s| s == server_id))
758 .filter(|i| {
759 higher_priority_servers
760 .iter()
761 .all(|hp| i.tried_servers.contains(hp))
762 })
763 .count();
764 (workable, total)
765 }
766
767 fn pop_workable(
787 &self,
788 server_id: &str,
789 higher_priority_servers: &[String],
790 ) -> Option<WorkItem> {
791 let mut state = self.inner.lock();
792
793 let eligible = |item: &WorkItem| -> bool {
794 !item.tried_servers.iter().any(|s| s == server_id)
795 && higher_priority_servers
796 .iter()
797 .all(|hp| item.tried_servers.contains(hp))
798 };
799
800 let last_served = state.last_served.get(server_id).cloned();
801
802 let mut chosen = None;
804 if let Some(ref last) = last_served {
805 chosen = state
806 .items
807 .iter()
808 .position(|item| eligible(item) && item.job_id != *last);
809 }
810
811 if chosen.is_none() {
813 chosen = state.items.iter().position(eligible);
814 }
815
816 let idx = chosen?;
817 let item = state.items.remove(idx)?;
821 state
822 .last_served
823 .insert(server_id.to_string(), item.job_id.clone());
824 Some(item)
825 }
826
827 fn drain_job(&self, job_id: &str) -> Vec<WorkItem> {
829 let mut state = self.inner.lock();
830 let mut kept = VecDeque::with_capacity(state.items.len());
831 let mut drained = Vec::new();
832 while let Some(item) = state.items.pop_front() {
833 if item.job_id == job_id {
834 drained.push(item);
835 } else {
836 kept.push_back(item);
837 }
838 }
839 state.items = kept;
840 state.last_served.retain(|_, v| v != job_id);
844 drained
845 }
846
847 pub fn len(&self) -> usize {
848 self.inner.lock().items.len()
849 }
850}
851
852impl Default for SharedWorkQueue {
853 fn default() -> Self {
854 Self::new()
855 }
856}
857
/// Bookkeeping the pool keeps for one spawned worker task.
struct ActiveWorker {
    /// Per-worker stop flag; set on retirement, eviction or pool shutdown.
    shutdown: Arc<AtomicBool>,
    /// Milliseconds since pool creation at which the worker last showed
    /// activity. Seeded at spawn time, shared with the worker's connection as
    /// its I/O heartbeat, and read by the idle-worker watchdog.
    last_progress: Arc<AtomicU64>,
    /// Handle of the spawned worker task.
    handle: JoinHandle<()>,
}
871
/// Pool of per-server download workers fed from one shared queue and watched
/// by a supervisor task.
pub struct WorkerPool {
    /// Queue all workers pull items from.
    work_queue: Arc<SharedWorkQueue>,
    /// Job id -> context for every job currently known to the pool.
    job_contexts: JobContextMap,
    /// Shared server configuration; re-read on every reconcile.
    servers: Arc<Mutex<Vec<ServerConfig>>>,
    /// Server id -> circuit-breaker state.
    server_health: ServerHealthMap,
    /// NOTE(review): not used in this section — presumably applied on the
    /// fetch path; confirm there.
    bandwidth: Arc<BandwidthLimiter>,
    /// Connection-permit registry workers acquire their slot from.
    conn_tracker: Arc<ConnectionTracker>,
    /// Per-fetch timeout; `None` when the pool was built with 0 seconds.
    stall_timeout: Option<Duration>,
    /// Reference instant for the millisecond timestamps in `last_progress`.
    created_at: Instant,
    /// Idle time after which the watchdog evicts a worker.
    max_worker_idle: Mutex<Duration>,
    /// Server id -> last time the "nothing workable" message was logged, to
    /// limit it to once per 60 seconds per server.
    starvation_log: Mutex<HashMap<String, Instant>>,
    /// Number of workers the watchdog has evicted.
    evictions: AtomicU64,
    /// Server id -> workers currently tracked for that server.
    workers: Mutex<HashMap<String, Vec<ActiveWorker>>>,
    /// Pool-wide stop flag.
    shutdown: Arc<AtomicBool>,
    /// Handle of the supervisor task, once started.
    supervisor_handle: Mutex<Option<JoinHandle<()>>>,
}
899
900impl WorkerPool {
901 pub fn new(
902 servers: Arc<Mutex<Vec<ServerConfig>>>,
903 bandwidth: Arc<BandwidthLimiter>,
904 conn_tracker: Arc<ConnectionTracker>,
905 stall_timeout_secs: u64,
906 ) -> Arc<Self> {
907 let stall_timeout = if stall_timeout_secs > 0 {
908 Some(Duration::from_secs(stall_timeout_secs))
909 } else {
910 None
911 };
912 Arc::new(Self {
913 work_queue: Arc::new(SharedWorkQueue::new()),
914 job_contexts: Arc::new(Mutex::new(HashMap::new())),
915 servers,
916 server_health: Arc::new(Mutex::new(HashMap::new())),
917 bandwidth,
918 conn_tracker,
919 stall_timeout,
920 created_at: Instant::now(),
921 max_worker_idle: Mutex::new(DEFAULT_MAX_WORKER_IDLE),
922 starvation_log: Mutex::new(HashMap::new()),
923 evictions: AtomicU64::new(0),
924 workers: Mutex::new(HashMap::new()),
925 shutdown: Arc::new(AtomicBool::new(false)),
926 supervisor_handle: Mutex::new(None),
927 })
928 }
929
930 pub fn set_max_worker_idle(&self, d: Duration) {
933 *self.max_worker_idle.lock() = d;
934 }
935
936 pub fn max_worker_idle(&self) -> Duration {
938 *self.max_worker_idle.lock()
939 }
940
941 fn elapsed_ms(&self) -> u64 {
944 self.created_at.elapsed().as_millis() as u64
945 }
946
947 fn created_at(&self) -> Instant {
952 self.created_at
953 }
954
955 fn higher_priority_servers(&self, my_priority: u8, my_server_id: &str) -> Vec<String> {
963 let servers = self.servers.lock();
964 let health = self.server_health.lock();
965 servers
966 .iter()
967 .filter(|s| s.enabled && s.priority < my_priority && s.id != my_server_id)
968 .filter(|s| health.get(&s.id).is_none_or(|h| h.is_available()))
969 .map(|s| s.id.clone())
970 .collect()
971 }
972
973 pub fn eviction_count(&self) -> u64 {
978 self.evictions.load(Ordering::Relaxed)
979 }
980
981 pub fn start(self: &Arc<Self>) {
984 self.reconcile_servers();
985
986 let this = Arc::clone(self);
987 let handle = tokio::spawn(async move {
988 this.supervisor_loop().await;
989 });
990 *self.supervisor_handle.lock() = Some(handle);
991 }
992
993 pub fn reconcile_servers(self: &Arc<Self>) {
1000 if self.shutdown.load(Ordering::Relaxed) {
1001 return;
1002 }
1003
1004 let servers_snapshot: Vec<ServerConfig> = self.servers.lock().clone();
1005 let mut workers = self.workers.lock();
1006
1007 let mut retire: Vec<String> = Vec::new();
1009 for key in workers.keys() {
1010 let still_active = servers_snapshot.iter().any(|s| s.enabled && &s.id == key);
1011 if !still_active {
1012 retire.push(key.clone());
1013 }
1014 }
1015 for key in retire {
1016 if let Some(list) = workers.remove(&key) {
1017 for w in list {
1018 w.shutdown.store(true, Ordering::Relaxed);
1019 drop(w.handle);
1022 }
1023 }
1024 }
1025
1026 for server in &servers_snapshot {
1028 if !server.enabled {
1029 continue;
1030 }
1031 let target = (server.connections as usize).min(500);
1032 let entry = workers.entry(server.id.clone()).or_default();
1033
1034 while entry.len() > target {
1036 if let Some(w) = entry.pop() {
1037 w.shutdown.store(true, Ordering::Relaxed);
1038 drop(w.handle);
1039 }
1040 }
1041
1042 let current = entry.len();
1044 for conn_idx in current..target {
1045 let worker_shutdown = Arc::new(AtomicBool::new(false));
1046 let last_progress = Arc::new(AtomicU64::new(self.elapsed_ms()));
1049 let pool = Arc::clone(self);
1050 let server_clone = server.clone();
1051 let ws_clone = Arc::clone(&worker_shutdown);
1052 let lp_clone = Arc::clone(&last_progress);
1053 let handle = tokio::spawn(async move {
1054 pool_worker(pool, server_clone, conn_idx, ws_clone, lp_clone).await;
1055 });
1056 entry.push(ActiveWorker {
1057 shutdown: worker_shutdown,
1058 last_progress,
1059 handle,
1060 });
1061 }
1062 }
1063 }
1064
1065 pub(crate) fn submit_job(self: &Arc<Self>, ctx: Arc<JobContext>, items: Vec<WorkItem>) {
1068 let job_id = ctx.job_id.clone();
1069 if items.is_empty() {
1070 ctx.emit_terminal();
1072 return;
1073 }
1074 self.job_contexts.lock().insert(job_id.clone(), ctx);
1075 self.work_queue.submit_items(items);
1076 debug!(job_id = %job_id, queue_len = self.work_queue.len(), "Job submitted to worker pool");
1077 }
1078
1079 pub fn pause_job(&self, job_id: &str) {
1082 if let Some(ctx) = self.job_contexts.lock().get(job_id) {
1083 ctx.paused.store(true, Ordering::Relaxed);
1084 }
1085 }
1086
1087 pub fn resume_job(&self, job_id: &str) {
1089 if let Some(ctx) = self.job_contexts.lock().get(job_id) {
1090 ctx.paused.store(false, Ordering::Relaxed);
1091 self.work_queue.notify.notify_waiters();
1093 }
1094 }
1095
1096 pub fn abort_job(&self, job_id: &str, reason: String) {
1099 let ctx = self.job_contexts.lock().get(job_id).cloned();
1100 let Some(ctx) = ctx else {
1101 return;
1102 };
1103 *ctx.abort_reason.lock() = Some(reason);
1104 ctx.cancelled.store(true, Ordering::Relaxed);
1105 let drained = self.work_queue.drain_job(job_id);
1106 for _ in drained {
1109 ctx.resolve_one();
1110 }
1111 ctx.emit_terminal();
1112 self.job_contexts.lock().remove(job_id);
1113 }
1114
1115 pub fn cancel_job(&self, job_id: &str) {
1119 let ctx = self.job_contexts.lock().remove(job_id);
1120 let Some(ctx) = ctx else {
1121 return;
1122 };
1123 ctx.cancelled.store(true, Ordering::Relaxed);
1124 let _ = self.work_queue.drain_job(job_id);
1125 }
1126
1127 fn mark_no_servers(&self, job_id: &str, reason: String) {
1129 let ctx = self.job_contexts.lock().remove(job_id);
1130 let Some(ctx) = ctx else {
1131 return;
1132 };
1133 ctx.paused.store(true, Ordering::Relaxed);
1134 try_send_progress(
1135 &ctx.progress_tx,
1136 &ctx.job_id,
1137 ProgressUpdate::NoServersAvailable {
1138 job_id: ctx.job_id.clone(),
1139 reason,
1140 },
1141 );
1142 let _ = self.work_queue.drain_job(job_id);
1144 }
1145
1146 async fn supervisor_loop(self: Arc<Self>) {
1152 let mut ticker = tokio::time::interval(SUPERVISOR_INTERVAL);
1153 loop {
1154 ticker.tick().await;
1155 if self.shutdown.load(Ordering::Relaxed) {
1156 break;
1157 }
1158
1159 let now_ms = self.elapsed_ms();
1162 let max_idle_ms = self.max_worker_idle().as_millis() as u64;
1163
1164 let server_priorities: Vec<(String, u8)> = {
1171 let srv = self.servers.lock();
1172 srv.iter()
1173 .filter(|s| s.enabled)
1174 .map(|s| (s.id.clone(), s.priority))
1175 .collect()
1176 };
1177 let has_workable: HashMap<String, bool> = server_priorities
1178 .iter()
1179 .map(|(sid, prio)| {
1180 let hp = self.higher_priority_servers(*prio, sid);
1181 let (workable, _) = self.work_queue.workable_count_for(sid, &hp);
1182 (sid.clone(), workable > 0)
1183 })
1184 .collect();
1185
1186 {
1187 let workers = self.workers.lock();
1188 for (server_id, list) in workers.iter() {
1189 for (idx, w) in list.iter().enumerate() {
1190 if w.shutdown.load(Ordering::Relaxed) {
1191 continue;
1192 }
1193 let last = w.last_progress.load(Ordering::Relaxed);
1194 let idle = now_ms.saturating_sub(last);
1195 if idle > max_idle_ms {
1196 if !has_workable.get(server_id).copied().unwrap_or(true) {
1203 continue;
1204 }
1205 warn!(
1206 server = %server_id,
1207 worker_idx = idx,
1208 idle_ms = idle,
1209 max_idle_ms,
1210 "Idle-worker watchdog: evicting stalled worker"
1211 );
1212 w.shutdown.store(true, Ordering::Relaxed);
1213 self.evictions.fetch_add(1, Ordering::Relaxed);
1214 }
1215 }
1216 }
1217 }
1218
1219 {
1221 let mut workers = self.workers.lock();
1222 for (_id, list) in workers.iter_mut() {
1223 list.retain(|w| !w.handle.is_finished());
1224 }
1225 }
1226 self.reconcile_servers();
1228
1229 let enabled_servers: Vec<String> =
1236 server_priorities.iter().map(|(id, _)| id.clone()).collect();
1237 let now_instant = Instant::now();
1238 for (sid, prio) in &server_priorities {
1239 let hp = self.higher_priority_servers(*prio, sid);
1240 let (workable, total) = self.work_queue.workable_count_for(sid, &hp);
1241 if workable == 0 && total > 0 {
1242 let mut log = self.starvation_log.lock();
1243 let should_log = log
1244 .get(sid)
1245 .map(|t| now_instant.duration_since(*t) >= Duration::from_secs(60))
1246 .unwrap_or(true);
1247 if should_log {
1248 log.insert(sid.clone(), now_instant);
1249 let reason = if hp.is_empty() {
1250 "every item has been tried here already"
1251 } else {
1252 "every item has been tried here, or is waiting for a higher-priority server"
1253 };
1254 info!(
1255 server = %sid,
1256 total_items = total,
1257 higher_priority_servers = hp.len(),
1258 "Queue has items but none are workable for this server ({reason})"
1259 );
1260 }
1261 }
1262 }
1263
1264 if enabled_servers.is_empty() {
1266 continue;
1267 }
1268 let healthy_servers: Vec<String> = {
1269 let health = self.server_health.lock();
1270 enabled_servers
1271 .iter()
1272 .filter(|sid| health.get(sid.as_str()).is_none_or(|h| h.is_available()))
1273 .cloned()
1274 .collect()
1275 };
1276 let all_broken = healthy_servers.is_empty();
1277
1278 let ctxs: Vec<Arc<JobContext>> = self.job_contexts.lock().values().cloned().collect();
1279 for ctx in ctxs {
1280 if ctx.articles_remaining.load(Ordering::Relaxed) == 0 {
1281 continue;
1282 }
1283 if ctx.cancelled.load(Ordering::Relaxed) {
1284 continue;
1285 }
1286 if all_broken {
1287 let reason = {
1288 let health = self.server_health.lock();
1289 health
1290 .values()
1291 .filter_map(|h| h.reason.clone())
1292 .next()
1293 .unwrap_or_else(|| "All servers unavailable".into())
1294 };
1295 warn!(
1296 job_id = %ctx.job_id,
1297 remaining = ctx.articles_remaining.load(Ordering::Relaxed),
1298 "All servers circuit-broken — pausing job for user intervention"
1299 );
1300 self.mark_no_servers(&ctx.job_id, reason);
1301 }
1302 }
1303 }
1304 }
1305
1306 pub async fn shutdown(self: &Arc<Self>) {
1308 self.shutdown.store(true, Ordering::Relaxed);
1309 let handles: Vec<JoinHandle<()>> = {
1310 let mut workers = self.workers.lock();
1311 let mut out = Vec::new();
1312 for (_id, list) in workers.drain() {
1313 for w in list {
1314 w.shutdown.store(true, Ordering::Relaxed);
1315 out.push(w.handle);
1316 }
1317 }
1318 out
1319 };
1320 self.work_queue.notify.notify_waiters();
1322
1323 let timeout = Duration::from_secs(10);
1324 for h in handles {
1325 let _ = tokio::time::timeout(timeout, h).await;
1326 }
1327
1328 if let Some(h) = self.supervisor_handle.lock().take() {
1329 h.abort();
1330 }
1331 }
1332
1333 pub fn conn_tracker(&self) -> &Arc<ConnectionTracker> {
1334 &self.conn_tracker
1335 }
1336
1337 pub fn has_job(&self, job_id: &str) -> bool {
1339 self.job_contexts.lock().contains_key(job_id)
1340 }
1341}
1342
/// Body of one worker task bound to `primary_server`.
///
/// Acquires a connection slot once, then loops: connect, run the serial or
/// pipelined fetch loop (depending on the server's `pipelining` setting),
/// close the connection, and either reconnect or exit as the fetch loop
/// requests.
///
/// * `conn_idx` — index of this worker within its server; used in the worker
///   id and to stagger start-up.
/// * `worker_shutdown` — per-worker stop flag (the pool-wide flag is honoured
///   too).
/// * `last_progress` — activity timestamp shared with the supervisor; handed
///   to the connection as its I/O heartbeat.
async fn pool_worker(
    pool: Arc<WorkerPool>,
    primary_server: ServerConfig,
    conn_idx: usize,
    worker_shutdown: Arc<AtomicBool>,
    last_progress: Arc<AtomicU64>,
) {
    let worker_id = format!("{}#{}", primary_server.id, conn_idx);

    // Stagger start-up: worker N waits N * WORKER_RAMP_DELAY.
    if conn_idx > 0 {
        let stagger = WORKER_RAMP_DELAY * conn_idx as u32;
        tokio::time::sleep(stagger).await;
    }

    let should_exit = |worker_shutdown: &Arc<AtomicBool>, pool: &Arc<WorkerPool>| {
        worker_shutdown.load(Ordering::Relaxed) || pool.shutdown.load(Ordering::Relaxed)
    };

    // The slot is held for the worker's whole lifetime, across reconnects.
    let mut conn_slot = match pool.conn_tracker.acquire(&primary_server.id).await {
        Some(slot) => slot,
        None => {
            info!(
                worker = %worker_id,
                server = %primary_server.name,
                "No connection slot available (server removed or limit=0); worker exiting"
            );
            return;
        }
    };

    'reconnect: loop {
        if should_exit(&worker_shutdown, &pool) {
            return;
        }

        // A slot from a replaced or removed pool no longer counts against the
        // server's current limit, so such a worker exits.
        match pool.conn_tracker.slot_status(&conn_slot) {
            SlotStatus::Current => {}
            SlotStatus::PoolReplaced => {
                info!(
                    worker = %worker_id,
                    server = %primary_server.name,
                    reason = "pool_replaced",
                    "Connection slot is stale (connection limit changed); worker exiting"
                );
                return;
            }
            SlotStatus::ServerRemoved => {
                info!(
                    worker = %worker_id,
                    server = %primary_server.name,
                    reason = "server_removed",
                    "Connection slot is stale (server removed from tracker); worker exiting"
                );
                return;
            }
        }

        // While the server is in cooldown, poll instead of connecting. The
        // guard is dropped at the end of the block, before the sleep.
        let circuit_broken = {
            let health = pool.server_health.lock();
            health
                .get(&primary_server.id)
                .is_some_and(|h| !h.is_available())
        };
        if circuit_broken {
            tokio::time::sleep(WORKER_IDLE_POLL).await;
            continue 'reconnect;
        }

        info!(
            worker = %worker_id,
            server = %primary_server.name,
            host = %primary_server.host,
            port = primary_server.port,
            ssl = primary_server.ssl,
            conn_idx,
            "Worker starting — connecting to primary server"
        );

        let mut conn = NntpConnection::new(worker_id.clone());
        conn.set_io_heartbeat(last_progress.clone(), pool.created_at());
        if let Err(e) = connect_with_retry(
            &mut conn,
            &primary_server,
            &worker_id,
            &pool.server_health,
            &pool.servers,
        )
        .await
        {
            warn!(
                worker = %worker_id,
                server = %primary_server.name,
                host = %primary_server.host,
                "Worker FAILED to connect after all retries: {e}"
            );
            if should_exit(&worker_shutdown, &pool) {
                return;
            }
            tokio::time::sleep(RECONNECT_DELAY).await;
            continue 'reconnect;
        }

        // A pipelining setting of 0 is treated as 1 (serial).
        let pipe_depth = primary_server.pipelining.max(1);
        let active_conns = pool.conn_tracker.total();
        info!(
            worker = %worker_id,
            server = %primary_server.name,
            host = %primary_server.host,
            pipelining = pipe_depth,
            total_nntp_connections = active_conns,
            "Worker connected and ready"
        );

        let reconnect_needed = if pipe_depth <= 1 {
            run_worker_serial(
                &pool,
                &primary_server,
                &worker_id,
                &worker_shutdown,
                &mut conn,
                &mut conn_slot,
                &last_progress,
            )
            .await
        } else {
            run_worker_pipelined(
                &pool,
                &primary_server,
                &worker_id,
                pipe_depth,
                &worker_shutdown,
                &mut conn,
                &mut conn_slot,
                &last_progress,
            )
            .await
        };

        // Best-effort close; the result is ignored.
        let _ = conn.quit().await;

        match reconnect_needed {
            WorkerExit::Reconnect => {
                continue 'reconnect;
            }
            WorkerExit::Exit => {
                return;
            }
        }
    }
}
1526
/// What a worker's fetch loop asks `pool_worker` to do once it returns.
enum WorkerExit {
    /// End the worker task.
    Exit,
    /// Close the current connection and go through the reconnect loop again.
    Reconnect,
}
1533
1534async fn next_work_item(
1541 pool: &Arc<WorkerPool>,
1542 server_id: &str,
1543 higher_priority_servers: &[String],
1544 worker_shutdown: &Arc<AtomicBool>,
1545) -> Option<(WorkItem, Arc<JobContext>)> {
1546 loop {
1547 if worker_shutdown.load(Ordering::Relaxed) || pool.shutdown.load(Ordering::Relaxed) {
1548 return None;
1549 }
1550
1551 if let Some(item) = pool
1552 .work_queue
1553 .pop_workable(server_id, higher_priority_servers)
1554 {
1555 let ctx = pool.job_contexts.lock().get(&item.job_id).cloned();
1558 let Some(ctx) = ctx else {
1559 continue;
1560 };
1561 if ctx.cancelled.load(Ordering::Relaxed) {
1562 continue;
1563 }
1564 if ctx.paused.load(Ordering::Relaxed) {
1566 pool.work_queue.push_back(item);
1567 tokio::time::sleep(WORKER_IDLE_POLL).await;
1568 continue;
1569 }
1570 return Some((item, ctx));
1571 }
1572
1573 let notified = pool.work_queue.notify.notified();
1576 tokio::select! {
1577 _ = notified => {}
1578 _ = tokio::time::sleep(WORKER_IDLE_POLL) => {}
1579 }
1580 }
1581}
1582
1583async fn run_worker_serial(
1584 pool: &Arc<WorkerPool>,
1585 primary_server: &ServerConfig,
1586 worker_id: &str,
1587 worker_shutdown: &Arc<AtomicBool>,
1588 conn: &mut NntpConnection,
1589 _conn_slot: &mut ConnectionSlot,
1590 last_progress: &Arc<AtomicU64>,
1591) -> WorkerExit {
1592 let mut consecutive_errors: u32 = 0;
1593
1594 loop {
1595 let server_disabled = pool
1597 .servers
1598 .lock()
1599 .iter()
1600 .find(|s| s.id == primary_server.id)
1601 .is_none_or(|s| !s.enabled);
1602 if server_disabled {
1603 info!(
1604 worker = %worker_id,
1605 server = %primary_server.name,
1606 "Server disabled, worker exiting"
1607 );
1608 return WorkerExit::Exit;
1609 }
1610 {
1611 let health = pool.server_health.lock();
1612 if let Some(h) = health.get(&primary_server.id)
1613 && !h.is_available()
1614 {
1615 info!(
1616 worker = %worker_id,
1617 server = %primary_server.name,
1618 reason = h.reason.as_deref().unwrap_or("unknown"),
1619 "Server circuit-broken, worker reconnecting after cooldown"
1620 );
1621 return WorkerExit::Reconnect;
1622 }
1623 }
1624
1625 let higher_priority_servers =
1632 pool.higher_priority_servers(primary_server.priority, &primary_server.id);
1633
1634 let Some((mut item, ctx)) = next_work_item(
1635 pool,
1636 &primary_server.id,
1637 &higher_priority_servers,
1638 worker_shutdown,
1639 )
1640 .await
1641 else {
1642 return WorkerExit::Exit;
1643 };
1644
1645 let fetch_fut =
1646 fetch_article_with_retry(conn, &item, &ctx.assembler, primary_server, worker_id);
1647 let result = if let Some(timeout) = pool.stall_timeout {
1648 match tokio::time::timeout(timeout, fetch_fut).await {
1649 Ok(r) => r,
1650 Err(_) => {
1651 warn!(
1652 worker = %worker_id,
1653 server = %primary_server.name,
1654 article = %item.message_id,
1655 "Connection stalled — no response within {}s, reconnecting",
1656 timeout.as_secs()
1657 );
1658 pool.work_queue.push_front(item);
1659 return WorkerExit::Reconnect;
1660 }
1661 }
1662 } else {
1663 fetch_fut.await
1664 };
1665
1666 match result {
1667 Ok(process_result) => {
1668 consecutive_errors = 0;
1669 ctx.total_decode_us
1670 .fetch_add(process_result.decode_us, Ordering::Relaxed);
1671 ctx.total_assemble_us
1672 .fetch_add(process_result.assemble_us, Ordering::Relaxed);
1673 ctx.total_articles_decoded.fetch_add(1, Ordering::Relaxed);
1674 if let Some(ref yname) = process_result.yenc_filename {
1675 ctx.yenc_names
1676 .lock()
1677 .entry(item.file_id.clone())
1678 .or_insert_with(|| crate::util::normalize_nfc(yname));
1679 }
1680 if let Some(n) = std::num::NonZeroU32::new(process_result.decoded_bytes as u32) {
1681 let _ = pool.bandwidth.acquire_download(n).await;
1682 }
1683 try_send_progress(
1684 &ctx.progress_tx,
1685 &item.job_id,
1686 ProgressUpdate::ArticleComplete {
1687 job_id: item.job_id.clone(),
1688 file_id: item.file_id.clone(),
1689 segment_number: item.segment_number,
1690 decoded_bytes: process_result.decoded_bytes,
1691 file_complete: process_result.file_complete,
1692 server_id: Some(primary_server.id.clone()),
1693 },
1694 );
1695 ctx.resolve_one();
1696 last_progress.store(pool.elapsed_ms(), Ordering::Relaxed);
1698 }
1699 Err(ArticleError::ArticleNotFound) => {
1700 if handle_article_not_available(
1701 &mut item,
1702 primary_server,
1703 &pool.servers,
1704 &pool.server_health,
1705 &ctx,
1706 &pool.work_queue,
1707 crate::article_failure::ArticleFailureKind::NotFound,
1708 "Article not found on any server",
1709 ) {
1710 last_progress.store(pool.elapsed_ms(), Ordering::Relaxed);
1711 }
1712 }
1713 Err(ArticleError::ConnectionLost(msg)) => {
1714 consecutive_errors += 1;
1715 warn!(
1716 worker = %worker_id,
1717 server = %primary_server.name,
1718 host = %primary_server.host,
1719 consecutive_errors,
1720 max_reconnects = MAX_RECONNECT_ATTEMPTS,
1721 article = %item.message_id,
1722 "Connection lost: {msg}"
1723 );
1724 pool.work_queue.push_front(item);
1725 if consecutive_errors > MAX_RECONNECT_ATTEMPTS {
1726 warn!(
1727 worker = %worker_id,
1728 server = %primary_server.name,
1729 host = %primary_server.host,
1730 consecutive_errors,
1731 "Too many consecutive errors — worker reconnecting"
1732 );
1733 return WorkerExit::Reconnect;
1734 }
1735 return WorkerExit::Reconnect;
1736 }
1737 Err(ArticleError::DecodeError(msg)) => {
1738 if handle_article_not_available(
1739 &mut item,
1740 primary_server,
1741 &pool.servers,
1742 &pool.server_health,
1743 &ctx,
1744 &pool.work_queue,
1745 crate::article_failure::ArticleFailureKind::DecodeError,
1746 &format!("Decode error: {msg}"),
1747 ) {
1748 last_progress.store(pool.elapsed_ms(), Ordering::Relaxed);
1749 }
1750 }
1751 Err(ArticleError::AssemblyError(msg)) => {
1752 error!(article = %item.message_id, "Assembly error: {msg}");
1753 try_send_progress(
1754 &ctx.progress_tx,
1755 &item.job_id,
1756 ProgressUpdate::ArticleFailed {
1757 job_id: item.job_id.clone(),
1758 file_id: item.file_id.clone(),
1759 segment_number: item.segment_number,
1760 failure: crate::article_failure::ArticleFailure::decode_error(
1761 &primary_server.id,
1762 format!("Assembly error: {msg}"),
1763 ),
1764 },
1765 );
1766 ctx.articles_failed.fetch_add(1, Ordering::Relaxed);
1767 ctx.resolve_one();
1768 last_progress.store(pool.elapsed_ms(), Ordering::Relaxed);
1769 }
1770 }
1771 }
1772}
1773
1774#[allow(clippy::too_many_arguments)]
1775async fn run_worker_pipelined(
1776 pool: &Arc<WorkerPool>,
1777 primary_server: &ServerConfig,
1778 worker_id: &str,
1779 pipe_depth: u8,
1780 worker_shutdown: &Arc<AtomicBool>,
1781 conn: &mut NntpConnection,
1782 _conn_slot: &mut ConnectionSlot,
1783 last_progress: &Arc<AtomicU64>,
1784) -> WorkerExit {
1785 let mut pipeline = Pipeline::new(pipe_depth);
1786 let mut in_flight_items: HashMap<u64, WorkItem> = HashMap::new();
1787 let mut next_tag: u64 = 0;
1788 let mut consecutive_errors: u32 = 0;
1789
1790 let mut perf_articles: u64 = 0;
1792 let mut perf_bytes: u64 = 0;
1793 let mut perf_queue_lock_us: u64 = 0;
1794 let mut perf_receive_us: u64 = 0;
1795 let mut perf_decode_us: u64 = 0;
1796 let mut perf_assemble_us: u64 = 0;
1797 let mut perf_bandwidth_us: u64 = 0;
1798 let mut perf_yield_us: u64 = 0;
1799 let mut perf_flush_us: u64 = 0;
1800 let mut perf_last_log = Instant::now();
1801 const PERF_LOG_INTERVAL: Duration = Duration::from_secs(10);
1802
1803 loop {
1804 if worker_shutdown.load(Ordering::Relaxed) || pool.shutdown.load(Ordering::Relaxed) {
1805 requeue_all(&mut in_flight_items, &pool.work_queue);
1806 return WorkerExit::Exit;
1807 }
1808
1809 let server_disabled = pool
1811 .servers
1812 .lock()
1813 .iter()
1814 .find(|s| s.id == primary_server.id)
1815 .is_none_or(|s| !s.enabled);
1816 if server_disabled {
1817 info!(
1818 worker = %worker_id,
1819 server = %primary_server.name,
1820 "Server disabled, worker exiting"
1821 );
1822 requeue_all(&mut in_flight_items, &pool.work_queue);
1823 return WorkerExit::Exit;
1824 }
1825 {
1826 let health = pool.server_health.lock();
1827 if let Some(h) = health.get(&primary_server.id)
1828 && !h.is_available()
1829 {
1830 info!(
1831 worker = %worker_id,
1832 server = %primary_server.name,
1833 reason = h.reason.as_deref().unwrap_or("unknown"),
1834 "Server circuit-broken, worker reconnecting after cooldown"
1835 );
1836 requeue_all(&mut in_flight_items, &pool.work_queue);
1837 return WorkerExit::Reconnect;
1838 }
1839 }
1840
1841 let higher_priority_servers =
1846 pool.higher_priority_servers(primary_server.priority, &primary_server.id);
1847
1848 while pipeline.pending_count() + pipeline.in_flight_count() < pipe_depth as usize {
1850 let lock_t = Instant::now();
1851 let item = pool
1852 .work_queue
1853 .pop_workable(&primary_server.id, &higher_priority_servers);
1854 perf_queue_lock_us += lock_t.elapsed().as_micros() as u64;
1855 let Some(item) = item else {
1856 break;
1857 };
1858 let ctx = pool.job_contexts.lock().get(&item.job_id).cloned();
1860 let Some(ctx) = ctx else {
1861 continue;
1862 };
1863 if ctx.cancelled.load(Ordering::Relaxed) {
1864 continue;
1865 }
1866 if ctx.paused.load(Ordering::Relaxed) {
1867 pool.work_queue.push_back(item);
1868 break;
1869 }
1870 let tag = next_tag;
1871 next_tag += 1;
1872 pipeline.submit(item.message_id.clone(), tag);
1873 in_flight_items.insert(tag, item);
1874 }
1875
1876 if pipeline.is_empty() && in_flight_items.is_empty() {
1878 let Some((first_item, ctx)) = next_work_item(
1879 pool,
1880 &primary_server.id,
1881 &higher_priority_servers,
1882 worker_shutdown,
1883 )
1884 .await
1885 else {
1886 return WorkerExit::Exit;
1887 };
1888 let _ = ctx; let tag = next_tag;
1890 next_tag += 1;
1891 pipeline.submit(first_item.message_id.clone(), tag);
1892 in_flight_items.insert(tag, first_item);
1893 }
1894
1895 let flush_t = Instant::now();
1896 if let Err(e) = pipeline.flush_sends(conn).await {
1897 warn!(
1898 worker = %worker_id,
1899 server = %primary_server.name,
1900 host = %primary_server.host,
1901 error = %e,
1902 in_flight = in_flight_items.len(),
1903 "Pipeline send error — re-queuing all in-flight items"
1904 );
1905 requeue_all(&mut in_flight_items, &pool.work_queue);
1906 consecutive_errors += 1;
1907 if consecutive_errors > MAX_RECONNECT_ATTEMPTS {
1908 warn!(
1909 worker = %worker_id,
1910 server = %primary_server.name,
1911 consecutive_errors,
1912 "Too many pipeline errors — worker reconnecting"
1913 );
1914 return WorkerExit::Reconnect;
1915 }
1916 tokio::time::sleep(RECONNECT_DELAY).await;
1917 return WorkerExit::Reconnect;
1918 }
1919 perf_flush_us += flush_t.elapsed().as_micros() as u64;
1920
1921 let recv_t = Instant::now();
1923 trace!(
1924 worker = %worker_id,
1925 in_flight = in_flight_items.len(),
1926 stall_timeout_secs = pool.stall_timeout.map(|d| d.as_secs()).unwrap_or(0),
1927 "Pipeline: awaiting response"
1928 );
1929 let result = if let Some(timeout) = pool.stall_timeout {
1930 match tokio::time::timeout(timeout, pipeline.receive_one(conn)).await {
1931 Ok(r) => r,
1932 Err(_) => {
1933 let elapsed_ms = recv_t.elapsed().as_millis();
1934 warn!(
1935 worker = %worker_id,
1936 server = %primary_server.name,
1937 elapsed_ms,
1938 in_flight = in_flight_items.len(),
1939 "Connection stalled — no response within {}s, reconnecting",
1940 timeout.as_secs()
1941 );
1942 requeue_all(&mut in_flight_items, &pool.work_queue);
1943 return WorkerExit::Reconnect;
1944 }
1945 }
1946 } else {
1947 pipeline.receive_one(conn).await
1948 };
1949 perf_receive_us += recv_t.elapsed().as_micros() as u64;
1950
1951 match result {
1952 Ok(Some(pipe_result)) => {
1953 let Some(mut item) = in_flight_items.remove(&pipe_result.request.tag) else {
1954 continue;
1955 };
1956 let ctx = pool.job_contexts.lock().get(&item.job_id).cloned();
1958 let Some(ctx) = ctx else {
1959 continue;
1960 };
1961 if ctx.cancelled.load(Ordering::Relaxed) {
1962 continue;
1963 }
1964
1965 match pipe_result.result {
1966 Ok(response) => {
1967 consecutive_errors = 0;
1968 let raw_data = response.data.unwrap_or_default();
1969 let yield_t = Instant::now();
1970 tokio::task::yield_now().await;
1971 perf_yield_us += yield_t.elapsed().as_micros() as u64;
1972 match decode_and_assemble(&item, &raw_data, &ctx.assembler) {
1973 Ok(process_result) => {
1974 perf_decode_us += process_result.decode_us;
1975 perf_assemble_us += process_result.assemble_us;
1976 perf_bytes += process_result.decoded_bytes;
1977 perf_articles += 1;
1978 ctx.total_decode_us
1979 .fetch_add(process_result.decode_us, Ordering::Relaxed);
1980 ctx.total_assemble_us
1981 .fetch_add(process_result.assemble_us, Ordering::Relaxed);
1982 ctx.total_articles_decoded.fetch_add(1, Ordering::Relaxed);
1983 if let Some(ref yname) = process_result.yenc_filename {
1984 ctx.yenc_names
1985 .lock()
1986 .entry(item.file_id.clone())
1987 .or_insert_with(|| crate::util::normalize_nfc(yname));
1988 }
1989 let bw_t = Instant::now();
1990 if let Some(n) =
1991 std::num::NonZeroU32::new(process_result.decoded_bytes as u32)
1992 {
1993 let _ = pool.bandwidth.acquire_download(n).await;
1994 }
1995 perf_bandwidth_us += bw_t.elapsed().as_micros() as u64;
1996 try_send_progress(
1997 &ctx.progress_tx,
1998 &item.job_id,
1999 ProgressUpdate::ArticleComplete {
2000 job_id: item.job_id.clone(),
2001 file_id: item.file_id.clone(),
2002 segment_number: item.segment_number,
2003 decoded_bytes: process_result.decoded_bytes,
2004 file_complete: process_result.file_complete,
2005 server_id: Some(primary_server.id.clone()),
2006 },
2007 );
2008 ctx.resolve_one();
2009 last_progress.store(pool.elapsed_ms(), Ordering::Relaxed);
2011
2012 if perf_last_log.elapsed() >= PERF_LOG_INTERVAL {
2013 let elapsed = perf_last_log.elapsed().as_secs_f64();
2014 let mbps = perf_bytes as f64 / elapsed / (1024.0 * 1024.0);
2015 info!(
2016 worker = %worker_id,
2017 articles = perf_articles,
2018 throughput_mbps = format!("{mbps:.1}"),
2019 recv_ms = perf_receive_us / 1000,
2020 decode_ms = perf_decode_us / 1000,
2021 assemble_ms = perf_assemble_us / 1000,
2022 queue_lock_ms = perf_queue_lock_us / 1000,
2023 flush_ms = perf_flush_us / 1000,
2024 yield_ms = perf_yield_us / 1000,
2025 bw_wait_ms = perf_bandwidth_us / 1000,
2026 "Worker perf summary"
2027 );
2028 perf_articles = 0;
2029 perf_bytes = 0;
2030 perf_queue_lock_us = 0;
2031 perf_receive_us = 0;
2032 perf_decode_us = 0;
2033 perf_assemble_us = 0;
2034 perf_bandwidth_us = 0;
2035 perf_yield_us = 0;
2036 perf_flush_us = 0;
2037 perf_last_log = Instant::now();
2038 }
2039 }
2040 Err(ArticleError::DecodeError(msg)) => {
2041 if handle_article_not_available(
2042 &mut item,
2043 primary_server,
2044 &pool.servers,
2045 &pool.server_health,
2046 &ctx,
2047 &pool.work_queue,
2048 crate::article_failure::ArticleFailureKind::DecodeError,
2049 &format!("Decode error: {msg}"),
2050 ) {
2051 last_progress.store(pool.elapsed_ms(), Ordering::Relaxed);
2052 }
2053 }
2054 Err(ArticleError::AssemblyError(msg)) => {
2055 error!(article = %item.message_id, "Assembly error: {msg}");
2056 try_send_progress(
2057 &ctx.progress_tx,
2058 &item.job_id,
2059 ProgressUpdate::ArticleFailed {
2060 job_id: item.job_id.clone(),
2061 file_id: item.file_id.clone(),
2062 segment_number: item.segment_number,
2063 failure:
2064 crate::article_failure::ArticleFailure::decode_error(
2065 &primary_server.id,
2066 format!("Assembly error: {msg}"),
2067 ),
2068 },
2069 );
2070 ctx.articles_failed.fetch_add(1, Ordering::Relaxed);
2071 ctx.resolve_one();
2072 last_progress.store(pool.elapsed_ms(), Ordering::Relaxed);
2073 }
2074 Err(_) => {}
2075 }
2076 }
2077 Err(NntpError::ArticleNotFound(_)) => {
2078 if handle_article_not_available(
2079 &mut item,
2080 primary_server,
2081 &pool.servers,
2082 &pool.server_health,
2083 &ctx,
2084 &pool.work_queue,
2085 crate::article_failure::ArticleFailureKind::NotFound,
2086 "Article not found on any server",
2087 ) {
2088 last_progress.store(pool.elapsed_ms(), Ordering::Relaxed);
2089 }
2090 }
2091 Err(NntpError::Connection(_) | NntpError::Io(_)) => {
2092 warn!(
2093 worker = %worker_id,
2094 server = %primary_server.name,
2095 host = %primary_server.host,
2096 article = %item.message_id,
2097 in_flight = in_flight_items.len(),
2098 consecutive_errors,
2099 "Pipeline: connection lost during receive — re-queuing all"
2100 );
2101 pool.work_queue.push_front(item);
2102 requeue_all(&mut in_flight_items, &pool.work_queue);
2103 consecutive_errors += 1;
2104 if consecutive_errors > MAX_RECONNECT_ATTEMPTS {
2105 return WorkerExit::Reconnect;
2106 }
2107 tokio::time::sleep(RECONNECT_DELAY).await;
2108 return WorkerExit::Reconnect;
2109 }
2110 Err(e) => {
2111 warn!(worker = %worker_id, article = %item.message_id, "Pipeline error: {e}");
2112 let kind = crate::article_failure::ArticleFailure::from_nntp(
2113 &e,
2114 &primary_server.id,
2115 )
2116 .kind;
2117 if handle_article_not_available(
2118 &mut item,
2119 primary_server,
2120 &pool.servers,
2121 &pool.server_health,
2122 &ctx,
2123 &pool.work_queue,
2124 kind,
2125 &format!("Pipeline error: {e}"),
2126 ) {
2127 last_progress.store(pool.elapsed_ms(), Ordering::Relaxed);
2128 }
2129 }
2130 }
2131 }
2132 Ok(None) => {
2133 }
2135 Err(e) => {
2136 warn!(
2137 worker = %worker_id,
2138 server = %primary_server.name,
2139 host = %primary_server.host,
2140 error = %e,
2141 in_flight = in_flight_items.len(),
2142 consecutive_errors,
2143 "Pipeline receive error"
2144 );
2145 requeue_all(&mut in_flight_items, &pool.work_queue);
2146 consecutive_errors += 1;
2147 if consecutive_errors > MAX_RECONNECT_ATTEMPTS {
2148 return WorkerExit::Reconnect;
2149 }
2150 tokio::time::sleep(RECONNECT_DELAY).await;
2151 return WorkerExit::Reconnect;
2152 }
2153 }
2154 }
2155}
2156
2157async fn connect_with_retry(
2162 conn: &mut NntpConnection,
2163 server: &ServerConfig,
2164 worker_id: &str,
2165 server_health: &ServerHealthMap,
2166 all_servers: &Arc<Mutex<Vec<ServerConfig>>>,
2167) -> Result<(), String> {
2168 for attempt in 1..=MAX_RECONNECT_ATTEMPTS {
2169 {
2170 let health = server_health.lock();
2171 if let Some(h) = health.get(&server.id)
2172 && !h.is_available()
2173 {
2174 return Err(format!(
2175 "Server circuit-broken: {}",
2176 h.reason.as_deref().unwrap_or("unknown")
2177 ));
2178 }
2179 }
2180
2181 let current_config = all_servers
2182 .lock()
2183 .iter()
2184 .find(|s| s.id == server.id)
2185 .cloned()
2186 .unwrap_or_else(|| server.clone());
2187
2188 info!(
2189 worker = %worker_id,
2190 server = %current_config.name,
2191 host = %current_config.host,
2192 port = current_config.port,
2193 attempt,
2194 max_attempts = MAX_RECONNECT_ATTEMPTS,
2195 "Connect attempt starting"
2196 );
2197 match conn.connect(¤t_config).await {
2198 Ok(()) => {
2199 info!(
2200 worker = %worker_id,
2201 server = %current_config.name,
2202 host = %current_config.host,
2203 attempt,
2204 "Connect attempt succeeded"
2205 );
2206 server_health
2207 .lock()
2208 .entry(server.id.clone())
2209 .or_default()
2210 .record_success();
2211 return Ok(());
2212 }
2213 Err(e) => {
2214 let is_auth = matches!(e, NntpError::Auth(_) | NntpError::ServiceUnavailable(_));
2215 {
2216 let mut health = server_health.lock();
2217 let entry = health.entry(server.id.clone()).or_default();
2218 entry.record_failure(is_auth, &e.to_string());
2219 if !entry.is_available() {
2220 warn!(
2221 worker = %worker_id,
2222 server = %current_config.name,
2223 host = %current_config.host,
2224 error = %e,
2225 cooldown_secs = if is_auth { AUTH_FAILURE_COOLDOWN.as_secs() } else { TRANSIENT_FAILURE_COOLDOWN.as_secs() },
2226 "Server circuit-broken — stopping all connection attempts"
2227 );
2228 return Err(format!("Server circuit-broken: {e}"));
2229 }
2230 }
2231
2232 warn!(
2233 worker = %worker_id,
2234 server = %current_config.name,
2235 host = %current_config.host,
2236 attempt,
2237 max_attempts = MAX_RECONNECT_ATTEMPTS,
2238 error = %e,
2239 is_auth,
2240 "Connect attempt FAILED: {e}"
2241 );
2242
2243 if is_auth {
2244 return Err(format!("Auth/permission failure: {e}"));
2245 }
2246
2247 if attempt < MAX_RECONNECT_ATTEMPTS {
2248 info!(
2249 worker = %worker_id,
2250 server = %current_config.name,
2251 delay_secs = RECONNECT_DELAY.as_secs(),
2252 "Waiting before retry"
2253 );
2254 tokio::time::sleep(RECONNECT_DELAY).await;
2255 *conn = NntpConnection::new(worker_id.to_string());
2256 } else {
2257 return Err(format!(
2258 "All {MAX_RECONNECT_ATTEMPTS} connect attempts failed: {e}"
2259 ));
2260 }
2261 }
2262 }
2263 }
2264 Err("Connect retry loop exited unexpectedly".into())
2265}
2266
2267#[allow(clippy::too_many_arguments)]
2279fn handle_article_not_available(
2280 item: &mut WorkItem,
2281 primary_server: &ServerConfig,
2282 all_servers: &Arc<Mutex<Vec<ServerConfig>>>,
2283 server_health: &ServerHealthMap,
2284 ctx: &Arc<JobContext>,
2285 work_queue: &Arc<SharedWorkQueue>,
2286 kind: crate::article_failure::ArticleFailureKind,
2287 error_msg: &str,
2288) -> bool {
2289 item.tried_servers.push(primary_server.id.clone());
2290 item.tries_on_current = 0;
2291
2292 let all_tried = {
2293 let servers = all_servers.lock();
2294 let health = server_health.lock();
2295 servers.iter().filter(|s| s.enabled).all(|s| {
2296 item.tried_servers.contains(&s.id)
2297 || health.get(&s.id).is_some_and(|h| !h.is_available())
2298 })
2299 };
2300
2301 debug!(
2302 article = %item.message_id,
2303 server = %primary_server.id,
2304 kind = kind.as_str(),
2305 tried_count = item.tried_servers.len(),
2306 all_tried,
2307 "Article returned error on this server"
2308 );
2309
2310 if all_tried {
2312 warn!(article = %item.message_id, kind = kind.as_str(), "{error_msg}");
2313 let final_failure = if kind == crate::article_failure::ArticleFailureKind::DecodeError {
2316 crate::article_failure::ArticleFailure::decode_error(
2317 &primary_server.id,
2318 error_msg.to_string(),
2319 )
2320 } else {
2321 crate::article_failure::ArticleFailure::not_found_anywhere(&primary_server.id)
2322 };
2323 try_send_progress(
2324 &ctx.progress_tx,
2325 &item.job_id,
2326 ProgressUpdate::ArticleFailed {
2327 job_id: item.job_id.clone(),
2328 file_id: item.file_id.clone(),
2329 segment_number: item.segment_number,
2330 failure: final_failure,
2331 },
2332 );
2333 ctx.articles_failed.fetch_add(1, Ordering::Relaxed);
2334 ctx.resolve_one();
2335 true
2336 } else {
2337 work_queue.push_front(item.clone());
2345 false
2346 }
2347}
2348
2349fn requeue_all(in_flight: &mut HashMap<u64, WorkItem>, work_queue: &Arc<SharedWorkQueue>) {
2351 let items: Vec<WorkItem> = in_flight.drain().map(|(_, item)| item).collect();
2352 for item in items {
2353 work_queue.push_front(item);
2354 }
2355}
2356
/// Maps a filename to a sort rank based on its (case-insensitive) PAR2 naming.
///
/// * `0` — ends with `.par2` and contains no `.vol`
/// * `1` — ends with `.par2` and contains `.vol` somewhere in the name
/// * `2` — anything else
fn par2_sort_key(filename: &str) -> u8 {
    let name = filename.to_lowercase();
    let is_par2 = name.ends_with(".par2");
    let is_volume = name.contains(".vol");
    match (is_par2, is_volume) {
        (true, false) => 0,
        (true, true) => 1,
        (false, _) => 2,
    }
}
2367
/// Reports whether `name` ends in one of a fixed set of recognised file extensions.
///
/// The comparison is case-insensitive and only looks at the text after the last `.`;
/// a name without any `.` is never recognised.
fn has_known_extension(name: &str) -> bool {
    const KNOWN_EXTENSIONS: &[&str] = &[
        // archives and split archives
        "rar", "r00", "r01", "r02", "r03", "r04", "r05", "zip", "7z", "gz", "bz2", "xz", "tar",
        // video
        "mkv", "mp4", "avi", "wmv", "ts", "m4v", "mov", "mpg", "mpeg",
        // audio
        "mp3", "flac", "ogg", "m4a", "aac", "wav",
        // subtitles
        "srt", "sub", "idx", "ass", "ssa", "sup",
        // info and images
        "nfo", "jpg", "jpeg", "png", "gif", "bmp",
        // parity and numbered split parts
        "par2", "001", "002", "003", "004", "005",
    ];

    let lowered = name.to_lowercase();
    lowered
        .rsplit_once('.')
        .is_some_and(|(_, ext)| KNOWN_EXTENSIONS.contains(&ext))
}
2425
2426pub(crate) fn build_job_submission(
2433 job: &NzbJob,
2434 progress_tx: mpsc::Sender<ProgressUpdate>,
2435) -> (Arc<JobContext>, Vec<WorkItem>) {
2436 let assembler = Arc::new(FileAssembler::new());
2437 for file in &job.files {
2438 let output_path = job.work_dir.join(&file.filename);
2439 if let Err(e) =
2440 assembler.register_file(&job.id, &file.id, output_path, file.articles.len() as u32)
2441 {
2442 error!(file = %file.filename, "Failed to register file for assembly: {e}");
2443 }
2444 }
2445
2446 let work_items: Vec<WorkItem> = job
2447 .files
2448 .iter()
2449 .flat_map(|file| {
2450 file.articles
2451 .iter()
2452 .enumerate()
2453 .filter(|(_, a)| !a.downloaded)
2454 .map(move |(idx, article)| WorkItem {
2455 job_id: job.id.clone(),
2456 file_id: file.id.clone(),
2457 filename: file.filename.clone(),
2458 message_id: article.message_id.clone(),
2459 segment_number: (idx as u32) + 1,
2460 tried_servers: Vec::new(),
2461 tries_on_current: 0,
2462 })
2463 })
2464 .collect();
2465
2466 let total_remaining = work_items.len();
2467 let ctx = Arc::new(JobContext::new(
2468 job,
2469 assembler,
2470 progress_tx,
2471 total_remaining,
2472 ));
2473 (ctx, work_items)
2474}
2475
2476async fn fetch_article_with_retry(
2481 conn: &mut NntpConnection,
2482 item: &WorkItem,
2483 assembler: &FileAssembler,
2484 _server: &ServerConfig,
2485 worker_id: &str,
2486) -> Result<ProcessResult, ArticleError> {
2487 let mut last_error = None;
2488
2489 for attempt in 1..=MAX_TRIES_PER_SERVER {
2490 let fetch_start = Instant::now();
2491 match conn.fetch_article(&item.message_id).await {
2492 Ok(response) => {
2493 let fetch_us = fetch_start.elapsed().as_micros();
2494 let raw_data = response.data.unwrap_or_default();
2495 debug!(
2496 worker = %worker_id,
2497 article = %item.message_id,
2498 raw_bytes = raw_data.len(),
2499 fetch_us,
2500 "NNTP fetch complete"
2501 );
2502 return decode_and_assemble(item, &raw_data, assembler);
2503 }
2504 Err(NntpError::ArticleNotFound(_)) => {
2505 debug!(
2506 worker = %worker_id,
2507 article = %item.message_id,
2508 "Article not found (430) — will try next server"
2509 );
2510 return Err(ArticleError::ArticleNotFound);
2511 }
2512 Err(e @ (NntpError::Connection(_) | NntpError::Io(_))) => {
2513 warn!(
2514 worker = %worker_id,
2515 article = %item.message_id,
2516 attempt,
2517 error = %e,
2518 conn_state = ?conn.state,
2519 "Connection/IO error during fetch — connection lost"
2520 );
2521 return Err(ArticleError::ConnectionLost(format!(
2522 "Connection error on attempt {attempt}: {e}"
2523 )));
2524 }
2525 Err(e @ NntpError::Tls(_)) => {
2526 warn!(
2527 worker = %worker_id,
2528 article = %item.message_id,
2529 attempt,
2530 error = %e,
2531 "TLS error during fetch — connection lost"
2532 );
2533 return Err(ArticleError::ConnectionLost(format!("TLS error: {e}")));
2534 }
2535 Err(e @ NntpError::ServiceUnavailable(_)) => {
2536 warn!(
2537 worker = %worker_id,
2538 article = %item.message_id,
2539 attempt,
2540 error = %e,
2541 "Service unavailable (502) during article fetch — likely rate limited or blocked"
2542 );
2543 return Err(ArticleError::ConnectionLost(format!(
2544 "Service unavailable: {e}"
2545 )));
2546 }
2547 Err(e @ NntpError::AuthRequired(_)) => {
2548 warn!(
2549 worker = %worker_id,
2550 article = %item.message_id,
2551 attempt,
2552 error = %e,
2553 "Auth required (480) during article fetch — session expired or rate limited"
2554 );
2555 return Err(ArticleError::ConnectionLost(format!(
2556 "Auth required mid-session: {e}"
2557 )));
2558 }
2559 Err(e) => {
2560 last_error = Some(format!("{e}"));
2561 if attempt < MAX_TRIES_PER_SERVER {
2562 warn!(
2563 worker = %worker_id,
2564 article = %item.message_id,
2565 attempt,
2566 max_tries = MAX_TRIES_PER_SERVER,
2567 error = %e,
2568 "Transient fetch error, retrying in 500ms"
2569 );
2570 tokio::time::sleep(Duration::from_millis(500)).await;
2571 } else {
2572 warn!(
2573 worker = %worker_id,
2574 article = %item.message_id,
2575 attempt,
2576 error = %e,
2577 "All retries on this server exhausted"
2578 );
2579 }
2580 }
2581 }
2582 }
2583
2584 Err(ArticleError::DecodeError(
2585 last_error.unwrap_or_else(|| "Unknown error after retries".into()),
2586 ))
2587}
2588
/// Result of successfully decoding one article and handing it to the assembler.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ProcessResult {
    /// Length of the decoded payload, in bytes.
    decoded_bytes: u64,
    /// Value returned by the assembler for this article; by its name, whether the file this
    /// article belongs to is now complete.
    file_complete: bool,
    /// Filename reported by the yEnc decoder, if it provided one.
    yenc_filename: Option<String>,
    /// Wall-clock time spent decoding, in microseconds.
    decode_us: u64,
    /// Wall-clock time spent in the assembler, in microseconds.
    assemble_us: u64,
}
2601
2602#[derive(Debug, thiserror::Error)]
2603enum ArticleError {
2604 #[error("Article not found on server")]
2605 ArticleNotFound,
2606 #[error("Connection lost: {0}")]
2607 ConnectionLost(String),
2608 #[error("Decode error: {0}")]
2609 DecodeError(String),
2610 #[error("Assembly error: {0}")]
2611 AssemblyError(String),
2612}
2613
2614fn decode_and_assemble(
2615 item: &WorkItem,
2616 raw_data: &[u8],
2617 assembler: &FileAssembler,
2618) -> Result<ProcessResult, ArticleError> {
2619 let decode_start = Instant::now();
2620 let decoded = decode_yenc(raw_data).map_err(|e| {
2621 ArticleError::DecodeError(format!(
2622 "yEnc decode failed for {} seg {}: {e}",
2623 item.filename, item.segment_number
2624 ))
2625 })?;
2626 let decode_us = decode_start.elapsed().as_micros();
2627
2628 let yenc_filename = decoded.filename;
2629 let data_begin = decoded.part_begin.unwrap_or(0);
2630 let decoded_len = decoded.data.len() as u64;
2631
2632 let assemble_start = Instant::now();
2633 let file_complete = assembler
2634 .assemble_article(
2635 &item.job_id,
2636 &item.file_id,
2637 item.segment_number,
2638 data_begin,
2639 &decoded.data,
2640 )
2641 .map_err(|e| {
2642 ArticleError::AssemblyError(format!(
2643 "Assembly failed for {} seg {}: {e}",
2644 item.filename, item.segment_number
2645 ))
2646 })?;
2647 let assemble_us = assemble_start.elapsed().as_micros();
2648
2649 debug!(
2650 file = %item.filename,
2651 segment = item.segment_number,
2652 raw_bytes = raw_data.len(),
2653 decoded_bytes = decoded_len,
2654 decode_us,
2655 assemble_us,
2656 "Article decode+assemble timing"
2657 );
2658
2659 Ok(ProcessResult {
2660 decoded_bytes: decoded_len,
2661 file_complete,
2662 yenc_filename,
2663 decode_us: decode_us as u64,
2664 assemble_us: assemble_us as u64,
2665 })
2666}
2667
2668#[cfg(test)]
2673mod tests {
2674 use super::*;
2675
2676 #[test]
2677 fn has_known_extension_recognizes_archives() {
2678 assert!(has_known_extension("movie.rar"));
2679 assert!(has_known_extension("movie.part01.rar"));
2680 assert!(has_known_extension("file.zip"));
2681 assert!(has_known_extension("file.7z"));
2682 assert!(has_known_extension("archive.001"));
2683 }
2684
2685 #[test]
2686 fn has_known_extension_recognizes_video() {
2687 assert!(has_known_extension("episode.mkv"));
2688 assert!(has_known_extension("movie.mp4"));
2689 assert!(has_known_extension("video.avi"));
2690 assert!(has_known_extension("clip.ts"));
2691 }
2692
2693 #[test]
2694 fn has_known_extension_recognizes_par2() {
2695 assert!(has_known_extension("file.par2"));
2696 assert!(has_known_extension("file.vol00+01.par2"));
2697 assert!(has_known_extension("file.vol015-031.par2"));
2698 }
2699
2700 #[test]
2701 fn has_known_extension_recognizes_misc() {
2702 assert!(has_known_extension("info.nfo"));
2703 assert!(has_known_extension("sub.srt"));
2704 assert!(has_known_extension("cover.jpg"));
2705 assert!(has_known_extension("song.flac"));
2706 }
2707
2708 #[test]
2709 fn has_known_extension_rejects_obfuscated_hashes() {
2710 assert!(!has_known_extension("9b6a324d7560b87091685020371ba869"));
2711 assert!(!has_known_extension("1fG1GP7L2263LHXH213HTNIxZsX7l0cv44BZ"));
2712 assert!(!has_known_extension("DfKUx3bl7L6PSo6276WSaXSZ7"));
2713 assert!(!has_known_extension("Q77O1ZxL237vc241z77hFoLBxl"));
2714 }
2715
2716 #[test]
2717 fn has_known_extension_rejects_unknown_extensions() {
2718 assert!(!has_known_extension("file.xyz123"));
2719 assert!(!has_known_extension("noext"));
2720 assert!(!has_known_extension(""));
2721 }
2722
2723 #[test]
2724 fn has_known_extension_case_insensitive() {
2725 assert!(has_known_extension("file.RAR"));
2726 assert!(has_known_extension("file.MKV"));
2727 assert!(has_known_extension("file.Par2"));
2728 assert!(has_known_extension("file.MP4"));
2729 }
2730
2731 fn make_item(job_id: &str, msg_id: &str, filename: &str) -> WorkItem {
2732 WorkItem {
2733 job_id: job_id.to_string(),
2734 file_id: "f1".to_string(),
2735 filename: filename.to_string(),
2736 message_id: msg_id.to_string(),
2737 segment_number: 1,
2738 tried_servers: Vec::new(),
2739 tries_on_current: 0,
2740 }
2741 }
2742
2743 #[test]
2744 fn shared_queue_par2_first() {
2745 let q = SharedWorkQueue::new();
2746 q.submit_items(vec![
2747 make_item("j1", "a", "movie.rar"),
2748 make_item("j1", "b", "movie.par2"),
2749 make_item("j1", "c", "movie.vol00+01.par2"),
2750 make_item("j1", "d", "movie.r00"),
2751 ]);
2752 let first = q.pop_workable("srv1", &[]).unwrap();
2753 assert_eq!(first.filename, "movie.par2", "index file first");
2754 let second = q.pop_workable("srv1", &[]).unwrap();
2755 assert_eq!(second.filename, "movie.vol00+01.par2", "vol file second");
2756 }
2757
2758 #[test]
2759 fn shared_queue_skips_tried_servers() {
2760 let q = SharedWorkQueue::new();
2761 let mut item = make_item("j1", "a", "file.rar");
2762 item.tried_servers.push("srv1".to_string());
2763 q.submit_items(vec![item, make_item("j1", "b", "other.rar")]);
2764
2765 let picked = q.pop_workable("srv1", &[]).unwrap();
2767 assert_eq!(picked.message_id, "b");
2768 }
2769
/// A server gets nothing while a server listed as higher priority has not
/// tried the item yet; the higher-priority server itself can take it.
#[test]
fn pop_workable_respects_priority() {
    let queue = SharedWorkQueue::new();
    queue.submit_items(vec![make_item("j1", "a", "file.rar")]);

    // "srv_primary" outranks the backup and has not tried the item.
    let outranking = [String::from("srv_primary")];
    let for_backup = queue.pop_workable("srv_backup", &outranking);
    assert!(for_backup.is_none());

    // The primary has nobody above it, so the item is available to it.
    let for_primary = queue.pop_workable("srv_primary", &[]).unwrap();
    assert_eq!(for_primary.message_id, "a");
}
2785
/// Once the higher-priority server appears in an item's `tried_servers`,
/// the backup server is allowed to pick that item up.
#[test]
fn pop_workable_allows_backup_after_primary_tried() {
    let queue = SharedWorkQueue::new();

    let mut primary_done = make_item("j1", "a", "file.rar");
    primary_done.tried_servers.push(String::from("srv_primary"));
    queue.submit_items(vec![primary_done]);

    let outranking = [String::from("srv_primary")];
    let served = queue.pop_workable("srv_backup", &outranking).unwrap();
    assert_eq!(served.message_id, "a");
}
2799
/// With an empty higher-priority list the backup server is free to take
/// untried work. Going by the test name, this models a caller that leaves a
/// circuit-broken server out of that list; the filtering itself is not
/// exercised here.
#[test]
fn pop_workable_ignores_circuit_broken_higher_server() {
    let queue = SharedWorkQueue::new();
    queue.submit_items(vec![make_item("j1", "a", "file.rar")]);

    let no_higher: Vec<String> = Vec::new();
    let served = queue.pop_workable("srv_backup", &no_higher).unwrap();
    assert_eq!(served.message_id, "a");
}
2812
/// `workable_count_for` returns `(workable, total)`: the total always covers
/// every queued item, while the workable part is zero for a server that is
/// outranked by an untried higher-priority server.
#[test]
fn workable_count_for_respects_priority() {
    let queue = SharedWorkQueue::new();
    let pending = vec![make_item("j1", "a", "a.rar"), make_item("j1", "b", "b.rar")];
    queue.submit_items(pending);

    // Backup is blocked on both items, but both still count towards the total.
    let outranking = [String::from("srv_primary")];
    assert_eq!(queue.workable_count_for("srv_backup", &outranking), (0, 2));

    // The primary has nobody above it, so everything is workable.
    assert_eq!(queue.workable_count_for("srv_primary", &[]), (2, 2));
}
2832
/// `drain_job` returns every queued item of the named job and leaves items
/// belonging to other jobs in the queue.
#[test]
fn shared_queue_drain_job_removes_only_target() {
    let queue = SharedWorkQueue::new();
    let mixed: Vec<WorkItem> = [("j1", "a", "a.rar"), ("j2", "b", "b.rar"), ("j1", "c", "c.rar")]
        .into_iter()
        .map(|(job_id, msg_id, filename)| make_item(job_id, msg_id, filename))
        .collect();
    queue.submit_items(mixed);

    let removed = queue.drain_job("j1");
    assert_eq!(removed.len(), 2);
    assert_eq!(queue.len(), 1);

    // The single survivor must be the item from the other job.
    let survivor = queue.pop_workable("srv1", &[]).unwrap();
    assert_eq!(survivor.job_id, "j2");
}
2847
/// Two jobs are submitted back to back (all of `j1`, then all of `j2`); a
/// single server popping until the queue is empty must be served the jobs
/// in alternation rather than all of `j1` first.
#[test]
fn pop_workable_alternates_between_jobs_on_single_server() {
    let queue = SharedWorkQueue::new();
    let backlog: Vec<WorkItem> = [
        ("j1", "a1", "a1.rar"),
        ("j1", "a2", "a2.rar"),
        ("j1", "a3", "a3.rar"),
        ("j2", "b1", "b1.rar"),
        ("j2", "b2", "b2.rar"),
        ("j2", "b3", "b3.rar"),
    ]
    .into_iter()
    .map(|(job_id, msg_id, filename)| make_item(job_id, msg_id, filename))
    .collect();
    queue.submit_items(backlog);

    // Keep popping until the queue reports nothing workable, recording the
    // job each popped item belonged to.
    let served_jobs: Vec<String> = std::iter::from_fn(|| queue.pop_workable("srv1", &[]))
        .map(|item| item.job_id)
        .collect();

    assert_eq!(
        served_jobs,
        ["j1", "j2", "j1", "j2", "j1", "j2"],
        "single-server pops must alternate across jobs, not drain one"
    );
}
2876
/// When every queued item belongs to the same job, consecutive pops keep
/// returning that job instead of yielding nothing.
#[test]
fn pop_workable_falls_back_when_only_same_job_is_available() {
    let queue = SharedWorkQueue::new();
    let only_j1 = vec![make_item("j1", "a1", "a.rar"), make_item("j1", "a2", "b.rar")];
    queue.submit_items(only_j1);

    let opener = queue.pop_workable("srv1", &[]).unwrap();
    assert_eq!(opener.job_id, "j1");

    // No other job is queued, so the same job must be served again.
    let follow_up = queue.pop_workable("srv1", &[]).unwrap();
    assert_eq!(
        follow_up.job_id, "j1",
        "falls back to same job when no sibling"
    );
}
2894
/// What one server was served last must not influence which job another
/// server is given: after `srv_x` has taken `j1` and then `j2`, the first
/// pop by `srv_y` still yields `j1`.
#[test]
fn per_server_cursors_are_independent() {
    let queue = SharedWorkQueue::new();
    let interleaved: Vec<WorkItem> = [
        ("j1", "a1", "a1.rar"),
        ("j2", "b1", "b1.rar"),
        ("j1", "a2", "a2.rar"),
        ("j2", "b2", "b2.rar"),
    ]
    .into_iter()
    .map(|(job_id, msg_id, filename)| make_item(job_id, msg_id, filename))
    .collect();
    queue.submit_items(interleaved);

    // srv_x pops twice and is served one item from each job.
    let x_first = queue.pop_workable("srv_x", &[]).unwrap();
    assert_eq!(x_first.job_id, "j1");
    let x_second = queue.pop_workable("srv_x", &[]).unwrap();
    assert_eq!(x_second.job_id, "j2");

    // srv_y has not popped anything yet.
    let y_first = queue.pop_workable("srv_y", &[]).unwrap();
    assert_eq!(
        y_first.job_id, "j1",
        "srv_y has its own cursor state; srv_x's j2 cursor must not leak"
    );
}
2920
/// Alternating between jobs never overrides eligibility: after `j1` has been
/// served, the only remaining item (from `j2`) has already been tried on
/// this server, so the next pop returns nothing.
#[test]
fn fairness_respects_tried_servers_and_priority() {
    let queue = SharedWorkQueue::new();

    let eligible = make_item("j1", "a1", "a1.rar");
    let mut ineligible = make_item("j2", "b1", "b1.rar");
    ineligible.tried_servers.push(String::from("srv1"));
    queue.submit_items(vec![eligible, ineligible]);

    let served = queue.pop_workable("srv1", &[]).unwrap();
    assert_eq!(served.job_id, "j1");

    let next = queue.pop_workable("srv1", &[]);
    assert!(
        next.is_none(),
        "must not serve an ineligible item just to satisfy fairness"
    );
}
2940
/// After one item has been served and job `j1` has been drained, the next
/// pop on the same server still succeeds and yields the `j2` item.
#[test]
fn drained_jobs_clear_last_served_cursor() {
    let queue = SharedWorkQueue::new();
    let pair = vec![make_item("j1", "a1", "a1.rar"), make_item("j2", "b1", "b1.rar")];
    queue.submit_items(pair);

    // Serve one item (the value itself is not needed), then drain "j1".
    drop(queue.pop_workable("srv1", &[]).unwrap());
    queue.drain_job("j1");

    let served = queue.pop_workable("srv1", &[]).unwrap();
    assert_eq!(served.job_id, "j2");
}
2958
2959 #[tokio::test]
2964 async fn connection_tracker_acquire_releases_slot_on_drop() {
2965 let t = ConnectionTracker::new();
2966 t.set_limit("srv1", "Server 1", 2);
2967 let s1 = t.acquire("srv1").await.unwrap();
2968 let s2 = t.acquire("srv1").await.unwrap();
2969 assert_eq!(t.total(), 2);
2970 drop(s1);
2972 assert_eq!(t.total(), 1);
2973 drop(s2);
2974 assert_eq!(t.total(), 0);
2975 }
2976
2977 #[tokio::test]
2978 async fn connection_tracker_blocks_at_limit() {
2979 let t = Arc::new(ConnectionTracker::new());
2980 t.set_limit("srv1", "Server 1", 1);
2981 let _held = t.acquire("srv1").await.unwrap();
2982
2983 let t2 = Arc::clone(&t);
2986 let res = tokio::time::timeout(Duration::from_millis(150), async move {
2987 t2.acquire("srv1").await
2988 })
2989 .await;
2990 assert!(
2991 res.is_err(),
2992 "second acquire should block while limit is reached"
2993 );
2994 }
2995
2996 #[tokio::test]
2997 async fn connection_tracker_grow_in_place_lets_more_acquire() {
2998 let t = ConnectionTracker::new();
2999 t.set_limit("srv1", "Server 1", 2);
3000 let _a = t.acquire("srv1").await.unwrap();
3001 let _b = t.acquire("srv1").await.unwrap();
3002 t.set_limit("srv1", "Server 1", 4);
3005 let _c = t.acquire("srv1").await.unwrap();
3006 let _d = t.acquire("srv1").await.unwrap();
3007 assert_eq!(t.total(), 4);
3008 }
3009
3010 #[tokio::test]
3011 async fn connection_tracker_shrink_marks_old_slots_stale() {
3012 let t = ConnectionTracker::new();
3013 t.set_limit("srv1", "Server 1", 4);
3014 let s = t.acquire("srv1").await.unwrap();
3015 assert!(t.slot_is_current(&s));
3016
3017 t.set_limit("srv1", "Server 1", 1);
3019 assert!(
3020 !t.slot_is_current(&s),
3021 "after shrink, the previously-acquired slot must be marked stale"
3022 );
3023
3024 assert_eq!(t.total(), 0);
3028
3029 let new_slot = t.acquire("srv1").await.unwrap();
3031 assert!(t.slot_is_current(&new_slot));
3032 assert_eq!(t.total(), 1);
3033 }
3034
3035 #[tokio::test]
3036 async fn connection_tracker_remove_server_marks_slot_stale() {
3037 let t = ConnectionTracker::new();
3038 t.set_limit("srv1", "Server 1", 2);
3039 let s = t.acquire("srv1").await.unwrap();
3040 assert!(t.slot_is_current(&s));
3041
3042 t.remove_server("srv1");
3043 assert!(
3044 !t.slot_is_current(&s),
3045 "after remove_server, the slot must be marked stale"
3046 );
3047 assert_eq!(t.total(), 0);
3048 }
3049
3050 #[tokio::test]
3051 async fn connection_tracker_snapshot_reflects_active_count() {
3052 let t = ConnectionTracker::new();
3053 t.set_limit("srv1", "Server 1", 3);
3054 t.set_limit("srv2", "Server 2", 5);
3055
3056 let _a1 = t.acquire("srv1").await.unwrap();
3057 let _a2 = t.acquire("srv1").await.unwrap();
3058 let _b1 = t.acquire("srv2").await.unwrap();
3059
3060 let mut snap = t.snapshot();
3061 snap.sort_by(|a, b| a.0.cmp(&b.0));
3062 assert_eq!(snap.len(), 2);
3063 assert_eq!(snap[0], ("srv1".into(), 2, 3));
3064 assert_eq!(snap[1], ("srv2".into(), 1, 5));
3065 }
3066}