nzb-web 0.4.14

Usenet download engine: queue management, download orchestration, and background services
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
//! Integration tests for server-priority dispatch.
//!
//! These tests spin up multiple `MockNntpServer` instances at different
//! priorities, submit a real NZB job, and assert that articles flow through
//! the servers the way the priority model prescribes:
//!
//! - Fresh articles ONLY go to the highest-priority healthy server.
//! - When a primary returns 430 (ArticleNotFound), the article cascades to
//!   the next-priority server (matches SABnzbd `search_new_server`).
//! - When a primary is circuit-broken / unreachable, backups take over
//!   immediately (no 60 s idle stall).
//! - Backup workers sitting idle while the primary serves everything are NOT
//!   evicted by the idle watchdog (Bug 2 fix).
//! - Same-priority peers share work.
//!
//! The MockNntpServer has no per-server request counter, so "this server
//! served article X" is inferred from `MockConfig.articles` — only the server
//! that actually holds article X's yEnc body can succeed on it. Combine that
//! with `article_response_overrides: {msg_id → 430}` to force selective
//! failover without changing the job's article count.

mod harness;

use std::collections::HashMap;
use std::time::Duration;

use harness::nzb_fixture::NzbFixture;
use harness::{HarnessBuilder, ServerProfile, yenc_articles};
use nzb_nntp::testutil::MockConfig;
use nzb_web::nzb_core::models::JobStatus;

// ---------------------------------------------------------------------------
// Fixture helpers — return fully owned data so tests don't fight the borrow
// checker over fixture-internal references.
// ---------------------------------------------------------------------------

/// Constructs an N-segment single-file NZB fixture. Everything returned is
/// fully owned: the raw NZB XML bytes, a message-id → yEnc-encoded body map
/// (ready to slice into per-server `MockConfig`s), and the message-ids in
/// segment order so a test can target individual articles (e.g. to apply a
/// 430 override).
fn make_fixture(prefix: &str, n: usize) -> (Vec<u8>, HashMap<String, Vec<u8>>, Vec<String>) {
    // One message-id and one raw body per segment, generated in lockstep.
    let mut mids = Vec::with_capacity(n);
    let mut bodies = Vec::with_capacity(n);
    for i in 1..=n {
        mids.push(format!("{prefix}-{i}@test"));
        bodies.push(format!("body-{i}").into_bytes());
    }

    // Borrowed (message-id, body) pairs in the shape NzbFixture expects.
    let segs: Vec<(&str, &[u8])> = mids
        .iter()
        .zip(bodies.iter())
        .map(|(mid, body)| (mid.as_str(), body.as_slice()))
        .collect();

    let file_name = format!("{prefix}.bin");
    let built = NzbFixture::new(prefix).add_file(&file_name, &segs).build();

    // yenc_articles consumes (msg-id, body, file-name) triples by reference.
    let triples: Vec<(&str, &[u8], &str)> = built
        .articles
        .iter()
        .map(|(mid, body, file)| (*mid, *body, file.as_str()))
        .collect();
    let yenc = yenc_articles(&triples);

    (built.xml, yenc, mids)
}

/// Copies the named message-ids out of `all_yenc` into a fresh map, so one
/// server can be configured to serve part of a job while another serves the
/// rest. Ids absent from `all_yenc` are silently skipped.
fn subset(all_yenc: &HashMap<String, Vec<u8>>, which: &[&str]) -> HashMap<String, Vec<u8>> {
    let mut picked = HashMap::new();
    for &mid in which {
        if let Some(body) = all_yenc.get(mid) {
            picked.insert(mid.to_string(), body.clone());
        }
    }
    picked
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

/// Every article lives on the primary; the backup's article map is EMPTY,
/// so any article routed to the backup would 430 and ultimately fail the
/// job. A 100%-successful job therefore proves the priority gate kept the
/// backup out of the dispatch path entirely.
///
/// As a second check we guard the Bug 2 idle-eviction fix: `max_worker_idle`
/// is set to a short 3 s while the test runs ~6 s, so pre-fix the idle
/// backup workers would have been evicted at least once. We assert
/// `worker_eviction_count()` is still 0.
#[tokio::test]
async fn backup_stays_idle_when_primary_serves_all() {
    let (xml, yenc, _mids) = make_fixture("primserve", 6);

    let primary_cfg = MockConfig {
        articles: yenc.clone(),
        ..Default::default()
    };
    let primary = ServerProfile::start("primary", primary_cfg, 3)
        .await
        .with_priority(0);

    // The backup carries no articles at all: any dispatch to it would 430
    // and sink the job.
    let backup_cfg = MockConfig {
        articles: HashMap::new(),
        ..Default::default()
    };
    let backup = ServerProfile::start("backup", backup_cfg, 3)
        .await
        .with_priority(1);

    let engine = HarnessBuilder::new()
        .with_server(primary)
        .with_server(backup)
        .article_timeout(10)
        .build();

    // Short idle threshold: before the Bug 2 fix, backup workers would
    // churn through evict/respawn cycles every 3 s.
    engine
        .queue_manager
        .set_max_worker_idle(Duration::from_secs(3));

    let job_id = engine.submit_nzb_xml("primserve", xml).expect("submit nzb");

    // Every one of the 6 articles must arrive — the primary is the only
    // server that holds them.
    let completed = engine
        .wait_for(Duration::from_secs(20), |snap| {
            snap.job(&job_id)
                .is_some_and(|j| j.articles_downloaded == 6)
        })
        .await;

    let view = engine.job(&job_id).expect("job present");
    assert!(
        completed,
        "primary did not complete the job: status={} downloaded={} failed={}",
        view.status, view.articles_downloaded, view.articles_failed
    );
    assert_eq!(
        view.articles_failed, 0,
        "no articles should have failed — backup should have stayed out of dispatch"
    );

    // Bug 2 guard: the backup's 3 workers were idle the whole run (no
    // workable items for them) and must survive the watchdog untouched.
    let evictions = engine.queue_manager.worker_eviction_count();
    assert_eq!(
        evictions, 0,
        "idle backup workers were evicted ({evictions}x) — Bug 2 regressed"
    );
}

/// The primary starts with `service_unavailable: true`, so it answers 502
/// on every connect; the backup is healthy and holds every article. Before
/// the fix, backup workers got no work until the primary had been tried
/// per-article and circuit-broken. After the fix, a circuit-broken primary
/// is dropped from `higher_priority_servers` and the backup takes over
/// immediately.
#[tokio::test]
async fn backup_takes_over_when_primary_unreachable() {
    let (xml, yenc, _mids) = make_fixture("deadprim", 4);

    let dead_cfg = MockConfig {
        service_unavailable: true,
        ..Default::default()
    };
    let primary = ServerProfile::start("dead-primary", dead_cfg, 2)
        .await
        .with_priority(0);

    let live_cfg = MockConfig {
        articles: yenc,
        ..Default::default()
    };
    let backup = ServerProfile::start("backup", live_cfg, 2)
        .await
        .with_priority(1);

    let engine = HarnessBuilder::new()
        .with_server(primary)
        .with_server(backup)
        .article_timeout(5)
        .build();

    let job_id = engine.submit_nzb_xml("deadprim", xml).expect("submit nzb");

    // The primary 502s on every connect, so all 4 articles must come
    // through the backup.
    let completed = engine
        .wait_for(Duration::from_secs(25), |snap| {
            snap.job(&job_id)
                .is_some_and(|j| j.articles_downloaded == 4)
        })
        .await;

    let view = engine.job(&job_id).expect("job present");
    assert!(
        completed,
        "backup did not take over from unreachable primary: status={} downloaded={} failed={}",
        view.status, view.articles_downloaded, view.articles_failed
    );
}

/// The primary 430s on half the articles and actually holds the other
/// half; the backup holds everything. The job must finish at 100%: the
/// primary serves what it can and each 430 cascades to the backup.
#[tokio::test]
async fn backup_picks_up_primary_430_failures() {
    let (xml, yenc, mids) = make_fixture("mixed", 4);

    // The first two message-ids are forced to 430 on the primary; only the
    // last two are actually stored there.
    let overrides: HashMap<String, u16> =
        HashMap::from([(mids[0].clone(), 430u16), (mids[1].clone(), 430u16)]);
    let primary_cfg = MockConfig {
        articles: subset(&yenc, &[mids[2].as_str(), mids[3].as_str()]),
        article_response_overrides: overrides,
        ..Default::default()
    };
    let primary = ServerProfile::start("primary", primary_cfg, 2)
        .await
        .with_priority(0);

    let backup_cfg = MockConfig {
        articles: yenc,
        ..Default::default()
    };
    let backup = ServerProfile::start("backup", backup_cfg, 2)
        .await
        .with_priority(1);

    let engine = HarnessBuilder::new()
        .with_server(primary)
        .with_server(backup)
        .article_timeout(10)
        .build();

    let job_id = engine.submit_nzb_xml("mixed", xml).expect("submit nzb");

    let completed = engine
        .wait_for(Duration::from_secs(20), |snap| {
            snap.job(&job_id)
                .is_some_and(|j| j.articles_downloaded == 4)
        })
        .await;

    let view = engine.job(&job_id).expect("job present");
    assert!(
        completed,
        "job did not complete via cascade: status={} downloaded={} failed={}",
        view.status, view.articles_downloaded, view.articles_failed
    );
    assert_eq!(
        view.articles_failed, 0,
        "expected 0 failures after failover"
    );
}

/// Three priority tiers (0, 1, 2). Tiers 0 and 1 answer 430 for every
/// article; tier 2 holds the full set. Completing the job exercises the
/// multi-tier cascade — tier 2 only ever sees an item after both tier 0
/// and tier 1 have tried it.
#[tokio::test]
async fn three_tier_priority_cascade() {
    let (xml, yenc, mids) = make_fixture("tier", 3);

    // Blanket 430 override covering every message-id in the job.
    let all_430: HashMap<String, u16> = mids.iter().map(|m| (m.clone(), 430u16)).collect();

    let tier0_cfg = MockConfig {
        article_response_overrides: all_430.clone(),
        ..Default::default()
    };
    let tier0 = ServerProfile::start("tier0", tier0_cfg, 2)
        .await
        .with_priority(0);

    let tier1_cfg = MockConfig {
        article_response_overrides: all_430,
        ..Default::default()
    };
    let tier1 = ServerProfile::start("tier1", tier1_cfg, 2)
        .await
        .with_priority(1);

    let tier2_cfg = MockConfig {
        articles: yenc,
        ..Default::default()
    };
    let tier2 = ServerProfile::start("tier2", tier2_cfg, 2)
        .await
        .with_priority(2);

    let engine = HarnessBuilder::new()
        .with_server(tier0)
        .with_server(tier1)
        .with_server(tier2)
        .article_timeout(10)
        .build();

    let job_id = engine.submit_nzb_xml("tier", xml).expect("submit nzb");

    let completed = engine
        .wait_for(Duration::from_secs(25), |snap| {
            snap.job(&job_id)
                .is_some_and(|j| j.articles_downloaded == 3)
        })
        .await;

    let view = engine.job(&job_id).expect("job present");
    assert!(
        completed,
        "tier-2 did not complete cascade: status={} downloaded={} failed={}",
        view.status, view.articles_downloaded, view.articles_failed
    );
}

/// Two servers share priority 0, so neither is gated behind the other and
/// both may compete for every fresh article. We only assert the job
/// completes — this is the regression guard: equal priorities must behave
/// like the pre-patch baseline, with no starvation.
#[tokio::test]
async fn same_priority_peers_both_serve_job() {
    let (xml, yenc, _mids) = make_fixture("same", 6);

    let cfg_a = MockConfig {
        articles: yenc.clone(),
        ..Default::default()
    };
    let peer_a = ServerProfile::start("peer-a", cfg_a, 2)
        .await
        .with_priority(0);

    let cfg_b = MockConfig {
        articles: yenc,
        ..Default::default()
    };
    let peer_b = ServerProfile::start("peer-b", cfg_b, 2)
        .await
        .with_priority(0);

    let engine = HarnessBuilder::new()
        .with_server(peer_a)
        .with_server(peer_b)
        .article_timeout(10)
        .build();

    let job_id = engine.submit_nzb_xml("same", xml).expect("submit nzb");

    let completed = engine
        .wait_for(Duration::from_secs(15), |snap| {
            snap.job(&job_id)
                .is_some_and(|j| j.articles_downloaded == 6)
        })
        .await;

    let view = engine.job(&job_id).expect("job present");
    assert!(
        completed,
        "same-priority peers didn't complete job: status={} downloaded={} failed={}",
        view.status, view.articles_downloaded, view.articles_failed
    );
}

/// Sanity check: with the only priority-0 server unreachable, the job must
/// still reach a terminal state (Completed/Failed) rather than hang. Guards
/// against the failure mode where backup workers get starvation-logged and
/// never retire. Because the primary answers 502 on every connect
/// (`service_unavailable: true`), it drops out of
/// `higher_priority_servers`, the backup picks up all work, and the job
/// must complete.
#[tokio::test]
async fn unreachable_primary_does_not_hang_backup() {
    let (xml, yenc, _mids) = make_fixture("unreach", 2);

    // Primary answers 502 the instant anything connects.
    let dead_cfg = MockConfig {
        service_unavailable: true,
        ..Default::default()
    };
    let primary = ServerProfile::start("dead", dead_cfg, 1)
        .await
        .with_priority(0);

    // Backup holds the full article set.
    let live_cfg = MockConfig {
        articles: yenc,
        ..Default::default()
    };
    let backup = ServerProfile::start("live", live_cfg, 2)
        .await
        .with_priority(1);

    let engine = HarnessBuilder::new()
        .with_server(primary)
        .with_server(backup)
        .article_timeout(5)
        .build();

    let job_id = engine.submit_nzb_xml("unreach", xml).expect("submit nzb");

    // Terminal state or bust — the job must never idle in Downloading forever.
    let settled = engine
        .wait_for_status(
            &job_id,
            Duration::from_secs(25),
            &[JobStatus::Completed, JobStatus::Failed],
        )
        .await;

    let view = engine.job(&job_id).expect("job present");
    assert!(
        settled,
        "job never left Downloading when primary was unreachable: status={} downloaded={} failed={}",
        view.status, view.articles_downloaded, view.articles_failed
    );
}