aviso 2.0.0

Core client library for aviso-server, ECMWF's notification service.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
// (C) Copyright 2024- ECMWF and individual contributors.
//
// This software is licensed under the terms of the Apache Licence Version 2.0
// which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
// In applying this licence, ECMWF does not waive the privileges and immunities
// granted to it by virtue of its status as an intergovernmental organisation nor
// does it submit to any jurisdiction.

//! Unit tests for the trigger dispatcher: retry loop, backoff
//! injection, terminal-vs-retryable classifier (`fail_fast`),
//! parent-drop cancellation, and the `cfg(test)` `Test*` variant
//! dispatch paths.

#![allow(
    clippy::unwrap_used,
    clippy::panic,
    reason = "test code: unwrap on channel send and panic on unexpected variant are the standard test diagnostics"
)]

use std::collections::BTreeMap;
use std::sync::atomic::Ordering;
use std::time::Duration;

use tokio::sync::{oneshot, watch};

use super::dispatch_triggers_with_backoff;
use crate::Notification;
use crate::watch::TriggerError;
#[cfg(unix)]
use crate::watch::trigger::command::build_command_config;
use crate::watch::trigger::kind::{TestEventual, TriggerKind};
use crate::watch::trigger::{DispatchOutcome, Trigger, TriggerState};

/// Builds the fixed notification that every dispatcher test in this
/// module hands to the dispatch loop: event type `"mars"`, sequence 1,
/// an empty identifier map, a JSON `null` payload and no cloudevent.
fn make_notification() -> Notification {
    let event_type = String::from("mars");
    let identifier = BTreeMap::new();
    Notification {
        event_type,
        sequence: 1,
        identifier,
        payload: serde_json::Value::Null,
        cloudevent: None,
    }
}

/// Runs the dispatcher once over `triggers` with fresh, never-signalled
/// parent and cancel channels, the canned notification and a throwaway
/// HTTP client, returning whatever the dispatcher returns.
///
/// `backoff` is forwarded unchanged to `dispatch_triggers_with_backoff`.
async fn run_once<F>(
    triggers: &[Trigger],
    states: &mut [TriggerState],
    backoff: F,
) -> Result<(), DispatchOutcome>
where
    F: Fn(u32) -> Duration,
{
    // The sender halves are bound to named (underscore-prefixed) locals so
    // they stay alive until this function returns instead of being dropped
    // on the spot.
    let (_parent_tx, mut parent_rx) = watch::channel(false);
    let (_cancel_tx, mut cancel_rx) = oneshot::channel::<()>();
    let notification = make_notification();
    let client = test_http_client();
    let dispatch = dispatch_triggers_with_backoff(
        triggers,
        states,
        &notification,
        &mut parent_rx,
        &mut cancel_rx,
        &client,
        backoff,
    );
    dispatch.await
}

/// Build a fresh `reqwest::Client` for dispatcher tests.
///
/// The test triggers (`TestFailing`, `TestFailOnCall`) and the
/// production echo, log, and command dispatchers do not use the
/// client. The HTTP-based triggers (webhook, teams, post) do, but
/// their tests live alongside each dispatcher (e.g.
/// `webhook/tests.rs`) and run against a real wiremock server, so
/// in this module the dispatcher loop just threads `&Client`
/// through.
fn test_http_client() -> reqwest::Client {
    reqwest::Client::new()
}

/// A required trigger that is still failing when its retry budget runs
/// out must yield `DispatchOutcome::RequiredFailed` whose source error
/// renders with an `io:` prefix, and the counter handed back by
/// `Trigger::test_failing` must read 2 afterwards.
///
/// NOTE(review): the `test_failing` arguments look like (scripted
/// failures, eventual outcome, retries, required) and the counter looks
/// like "failures still left to serve" — confirm against its definition.
#[tokio::test]
async fn retries_exhausted_returns_required_failed_with_io_source() {
    let (failing, tally) = Trigger::test_failing(5, TestEventual::Succeed, 2, true);
    let mut states = vec![TriggerState::new()];
    let outcome = run_once(&[failing], &mut states, |_| Duration::from_millis(1)).await;
    match &outcome {
        Err(DispatchOutcome::RequiredFailed { source, .. }) => {
            let rendered = source.to_string();
            assert!(rendered.starts_with("io:"));
        }
        _ => panic!("expected RequiredFailed, got {outcome:?}"),
    }
    assert_eq!(tally.load(Ordering::Acquire), 2);
}

/// With `retries` set to 0, a required trigger scripted to fail once
/// must produce `DispatchOutcome::RequiredFailed`, and the counter
/// returned by `Trigger::test_failing` must read 0 afterwards.
#[tokio::test]
async fn retries_zero_fails_on_first_attempt() {
    let (failing, tally) = Trigger::test_failing(1, TestEventual::Succeed, 0, true);
    let mut states = vec![TriggerState::new()];
    let outcome = run_once(&[failing], &mut states, |_| Duration::from_millis(1)).await;
    let required_failed = matches!(outcome, Err(DispatchOutcome::RequiredFailed { .. }));
    assert!(required_failed);
    assert_eq!(tally.load(Ordering::Acquire), 0);
}

#[tokio::test(start_paused = true)]
async fn success_after_retry_advances_through_backoff_and_completes() {
    let (trigger, counter) = Trigger::test_failing(2, TestEventual::Succeed, 3, true);
    let mut states = vec![TriggerState::new()];
    let (_drop_tx, mut parent_rx) = watch::channel(false);
    let (_cancel_tx, mut cancel_rx) = oneshot::channel::<()>();
    let n = make_notification();
    let http = test_http_client();
    let fut = dispatch_triggers_with_backoff(
        std::slice::from_ref(&trigger),
        &mut states,
        &n,
        &mut parent_rx,
        &mut cancel_rx,
        &http,
        |_| Duration::from_millis(100),
    );
    tokio::pin!(fut);

    for _ in 0..2 {
        tokio::task::yield_now().await;
        tokio::time::advance(Duration::from_millis(110)).await;
    }
    let result = fut.await;
    assert!(matches!(result, Ok(())), "got: {result:?}");
    assert_eq!(counter.load(Ordering::Acquire), 0);
}

/// A failing trigger created with `required = false` must not abort the
/// run: the echo trigger placed after it is still part of the same
/// dispatch and the overall result is `Ok(())`.
///
/// Only the outcome is asserted; the warn log mentioned in the test
/// name is not captured or checked here.
#[tokio::test]
async fn optional_trigger_failure_logs_warn_does_not_short_circuit() {
    let (optional_failure, _) = Trigger::test_failing(5, TestEventual::Fail, 0, false);
    let triggers = [optional_failure, Trigger::echo()];
    let mut states = vec![TriggerState::new(), TriggerState::new()];
    let outcome = run_once(&triggers, &mut states, |_| Duration::from_millis(1)).await;
    assert!(outcome.is_ok());
}

/// Flagging the parent watch channel while the dispatcher is waiting
/// out a retry backoff must end the run with
/// `DispatchOutcome::Cancelled`.
///
/// The trigger is scripted to fail once and the injected backoff is
/// 60 s on a paused clock, so after its first attempt the dispatcher
/// has nothing to do but wait until the parent signal arrives.
#[tokio::test(start_paused = true)]
async fn parent_cancel_during_retry_backoff_returns_cancelled() {
    let (trigger, _counter) = Trigger::test_failing(1, TestEventual::Succeed, 3, true);
    let mut states = vec![TriggerState::new()];
    let (drop_tx, mut parent_rx) = watch::channel(false);
    let (_cancel_tx, mut cancel_rx) = oneshot::channel::<()>();
    let n = make_notification();
    let http = test_http_client();
    let fut = dispatch_triggers_with_backoff(
        std::slice::from_ref(&trigger),
        &mut states,
        &n,
        &mut parent_rx,
        &mut cancel_rx,
        &http,
        |_| Duration::from_secs(60),
    );
    tokio::pin!(fut);

    // Futures are lazy: pinning does not run the dispatcher, and yielding
    // to the scheduler does not poll a future that is not a spawned task.
    // Poll it by hand so the first attempt actually executes and the
    // dispatcher is waiting in its backoff *before* the parent is flagged;
    // otherwise the cancellation would already be visible before the first
    // attempt and the backoff path would never be exercised.
    for _ in 0..2 {
        let polled = std::future::poll_fn(|cx| {
            std::task::Poll::Ready(std::future::Future::poll(fut.as_mut(), cx))
        })
        .await;
        assert!(
            polled.is_pending(),
            "dispatcher must still be waiting before the parent is cancelled; got {polled:?}"
        );
        // Yielding keeps the test task runnable, so the paused clock does
        // not auto-advance past the 60 s backoff in the meantime.
        tokio::task::yield_now().await;
    }

    drop_tx.send(true).unwrap();
    let result = fut.await;
    assert!(matches!(result, Err(DispatchOutcome::Cancelled)));
}

/// With `fail_fast = true`, a command trigger running `exit 7` must
/// fail with `TriggerError::Command { exit_code: 7, .. }` without
/// working through its five retries.
///
/// The injected backoff is 60 s of real time, so any retry would push
/// the wall-clock duration far past the 5 s bound asserted below.
#[cfg(unix)]
#[tokio::test]
async fn command_terminal_failure_short_circuits_retries_with_fail_fast_true() {
    let command = Box::new(build_command_config("exit 7"));
    let trigger = Trigger {
        kind: TriggerKind::Command(command),
        retries: 5,
        required: true,
        timeout: None,
        fail_fast: true,
    };
    let mut states = vec![TriggerState::new()];

    let clock = std::time::Instant::now();
    let result = run_once(&[trigger], &mut states, |_| Duration::from_secs(60)).await;
    let elapsed = clock.elapsed();

    let failed_with_exit_7 = matches!(
        result,
        Err(DispatchOutcome::RequiredFailed {
            source: TriggerError::Command { exit_code: 7, .. },
            ..
        })
    );
    assert!(failed_with_exit_7, "got: {result:?}");

    let budget = Duration::from_secs(5);
    assert!(
        elapsed < budget,
        "fail_fast=true must short-circuit retries; took {elapsed:?}"
    );
}

/// With `fail_fast = true`, a command whose template has an unclosed
/// `{{` expression must fail with `TriggerError::Template` without
/// working through its five retries.
///
/// The injected backoff is 60 s of real time, so any retry would push
/// the wall-clock duration far past the 5 s bound asserted below.
#[cfg(unix)]
#[tokio::test]
async fn command_template_error_is_terminal_with_fail_fast_true() {
    let command = Box::new(build_command_config("hello {{ notification.event_type"));
    let trigger = Trigger {
        kind: TriggerKind::Command(command),
        retries: 5,
        required: true,
        timeout: None,
        fail_fast: true,
    };
    let mut states = vec![TriggerState::new()];

    let clock = std::time::Instant::now();
    let result = run_once(&[trigger], &mut states, |_| Duration::from_secs(60)).await;
    let elapsed = clock.elapsed();

    let failed_on_template = matches!(
        result,
        Err(DispatchOutcome::RequiredFailed {
            source: TriggerError::Template { .. },
            ..
        })
    );
    assert!(failed_on_template, "got: {result:?}");

    let budget = Duration::from_secs(5);
    assert!(
        elapsed < budget,
        "template errors are deterministic; must short-circuit"
    );
}

#[cfg(unix)]
#[tokio::test(start_paused = true)]
async fn command_nonzero_exit_retries_when_fail_fast_false() {
    let cfg = build_command_config("exit 1");
    let trigger = Trigger {
        kind: TriggerKind::Command(Box::new(cfg)),
        retries: 2,
        required: true,
        timeout: None,
        fail_fast: false,
    };
    let mut states = vec![TriggerState::new()];
    let (_drop_tx, mut parent_rx) = watch::channel(false);
    let (_cancel_tx, mut cancel_rx) = oneshot::channel::<()>();
    let n = make_notification();
    let http = test_http_client();
    let fut = dispatch_triggers_with_backoff(
        std::slice::from_ref(&trigger),
        &mut states,
        &n,
        &mut parent_rx,
        &mut cancel_rx,
        &http,
        |_| Duration::from_millis(100),
    );
    tokio::pin!(fut);
    for _ in 0..2 {
        tokio::task::yield_now().await;
        tokio::time::advance(Duration::from_millis(110)).await;
    }
    let result = fut.await;
    match result {
        Err(DispatchOutcome::RequiredFailed {
            source: TriggerError::Command { exit_code: 1, .. },
            ..
        }) => {}
        other => panic!("expected RequiredFailed after retries exhausted, got {other:?}"),
    }
}

/// A command trigger running `sleep 30` under a 200 ms timeout must
/// fail with `TriggerError::Timeout`, and the whole dispatch must
/// return in well under the 30 s the command would otherwise take
/// (bounded at 5 s of wall-clock time here).
#[cfg(unix)]
#[tokio::test]
async fn command_timeout_returns_timeout_error_after_kill_and_reap() {
    let command = Box::new(build_command_config("sleep 30"));
    let deadline = Duration::from_millis(200);
    let trigger = Trigger {
        kind: TriggerKind::Command(command),
        retries: 0,
        required: true,
        timeout: Some(deadline),
        fail_fast: true,
    };
    let mut states = vec![TriggerState::new()];

    let clock = std::time::Instant::now();
    let result = run_once(&[trigger], &mut states, |_| Duration::from_millis(1)).await;
    let elapsed = clock.elapsed();

    let timed_out = matches!(
        result,
        Err(DispatchOutcome::RequiredFailed {
            source: TriggerError::Timeout(_),
            ..
        })
    );
    assert!(timed_out, "got: {result:?}");

    let budget = Duration::from_secs(5);
    assert!(
        elapsed < budget,
        "timeout must kill the child quickly; took {elapsed:?}"
    );
}

/// When the parent watch channel is already flagged `true` before the
/// dispatcher starts, the run must return `DispatchOutcome::Cancelled`
/// even though both configured triggers are plain echo triggers.
#[tokio::test]
async fn parent_cancel_between_triggers_returns_cancelled() {
    let triggers = [Trigger::echo(), Trigger::echo()];
    let mut states = vec![TriggerState::new(), TriggerState::new()];
    let (parent_tx, mut parent_rx) = watch::channel(false);
    let (_cancel_tx, mut cancel_rx) = oneshot::channel::<()>();
    // Signal the parent before the dispatcher is ever polled.
    parent_tx.send(true).unwrap();
    let notification = make_notification();
    let client = test_http_client();
    let outcome = dispatch_triggers_with_backoff(
        &triggers,
        &mut states,
        &notification,
        &mut parent_rx,
        &mut cancel_rx,
        &client,
        |_| Duration::from_millis(1),
    )
    .await;
    let cancelled = matches!(outcome, Err(DispatchOutcome::Cancelled));
    assert!(cancelled);
}

/// Classifier check: for a webhook trigger with `fail_fast = true`, a
/// `TriggerError::Webhook` carrying HTTP 400 must be reported as
/// terminal by `is_terminal_error`. No request is sent; the error value
/// is constructed by hand.
#[tokio::test]
async fn webhook_4xx_is_terminal_with_fail_fast_true() {
    use crate::watch::trigger::webhook::build_webhook_config;

    let endpoint = "http://127.0.0.1:1/unused";
    let hook = Trigger {
        kind: TriggerKind::Webhook(Box::new(build_webhook_config(endpoint))),
        retries: 5,
        required: true,
        timeout: None,
        fail_fast: true,
    };
    let bad_request = TriggerError::Webhook {
        status: Some(reqwest::StatusCode::BAD_REQUEST),
        body_tail: String::new(),
    };
    let terminal = super::is_terminal_error(&hook, &bad_request);
    assert!(terminal, "4xx must be terminal under fail_fast = true");
}

/// Classifier check: for a webhook trigger with `fail_fast = true`, a
/// `TriggerError::Webhook` carrying HTTP 500 must NOT be reported as
/// terminal by `is_terminal_error`. No request is sent; the error value
/// is constructed by hand.
#[tokio::test]
async fn webhook_5xx_is_retryable_with_fail_fast_true() {
    use crate::watch::trigger::webhook::build_webhook_config;

    let endpoint = "http://127.0.0.1:1/unused";
    let hook = Trigger {
        kind: TriggerKind::Webhook(Box::new(build_webhook_config(endpoint))),
        retries: 5,
        required: true,
        timeout: None,
        fail_fast: true,
    };
    let server_error = TriggerError::Webhook {
        status: Some(reqwest::StatusCode::INTERNAL_SERVER_ERROR),
        body_tail: String::new(),
    };
    let terminal = super::is_terminal_error(&hook, &server_error);
    assert!(
        !terminal,
        "5xx must stay retryable even under fail_fast = true"
    );
}

/// Classifier check: for a webhook trigger with `fail_fast = true`, a
/// `TriggerError::Webhook` with no HTTP status (`status: None`) must
/// NOT be reported as terminal by `is_terminal_error`. No request is
/// sent; the error value is constructed by hand.
#[tokio::test]
async fn webhook_transport_error_is_retryable_with_fail_fast_true() {
    use crate::watch::trigger::webhook::build_webhook_config;

    let endpoint = "http://127.0.0.1:1/unused";
    let hook = Trigger {
        kind: TriggerKind::Webhook(Box::new(build_webhook_config(endpoint))),
        retries: 5,
        required: true,
        timeout: None,
        fail_fast: true,
    };
    let no_status = TriggerError::Webhook {
        status: None,
        body_tail: String::new(),
    };
    let terminal = super::is_terminal_error(&hook, &no_status);
    assert!(
        !terminal,
        "transport error (None status) must stay retryable"
    );
}

/// Classifier check: for a webhook trigger with `fail_fast = true`, a
/// `TriggerError::WebhookBuild` must be reported as terminal by
/// `is_terminal_error`. No request is sent; the error value is
/// constructed by hand.
#[tokio::test]
async fn webhook_build_error_is_terminal_with_fail_fast_true() {
    use crate::watch::trigger::webhook::build_webhook_config;

    let endpoint = "http://127.0.0.1:1/unused";
    let hook = Trigger {
        kind: TriggerKind::Webhook(Box::new(build_webhook_config(endpoint))),
        retries: 5,
        required: true,
        timeout: None,
        fail_fast: true,
    };
    let build_failure = TriggerError::WebhookBuild {
        reason: "request build failed (invalid URL or header value)".to_string(),
    };
    let terminal = super::is_terminal_error(&hook, &build_failure);
    assert!(
        terminal,
        "WebhookBuild must be terminal under fail_fast = true; the failure is deterministic"
    );
}

/// Classifier check: for a webhook trigger with `fail_fast = false`, a
/// `TriggerError::WebhookBuild` must NOT be reported as terminal by
/// `is_terminal_error`. No request is sent; the error value is
/// constructed by hand.
#[tokio::test]
async fn webhook_build_error_is_retryable_with_fail_fast_false() {
    use crate::watch::trigger::webhook::build_webhook_config;

    let endpoint = "http://127.0.0.1:1/unused";
    let hook = Trigger {
        kind: TriggerKind::Webhook(Box::new(build_webhook_config(endpoint))),
        retries: 5,
        required: true,
        timeout: None,
        fail_fast: false,
    };
    let build_failure = TriggerError::WebhookBuild {
        reason: "request build failed (invalid URL or header value)".to_string(),
    };
    let terminal = super::is_terminal_error(&hook, &build_failure);
    assert!(
        !terminal,
        "fail_fast=false must keep every error retryable, including WebhookBuild"
    );
}

/// Classifier check: for a webhook trigger with `fail_fast = false`, a
/// `TriggerError::Webhook` carrying HTTP 400 must NOT be reported as
/// terminal by `is_terminal_error`. No request is sent; the error value
/// is constructed by hand.
#[tokio::test]
async fn webhook_4xx_is_retryable_with_fail_fast_false() {
    use crate::watch::trigger::webhook::build_webhook_config;

    let endpoint = "http://127.0.0.1:1/unused";
    let hook = Trigger {
        kind: TriggerKind::Webhook(Box::new(build_webhook_config(endpoint))),
        retries: 5,
        required: true,
        timeout: None,
        fail_fast: false,
    };
    let bad_request = TriggerError::Webhook {
        status: Some(reqwest::StatusCode::BAD_REQUEST),
        body_tail: String::new(),
    };
    let terminal = super::is_terminal_error(&hook, &bad_request);
    assert!(
        !terminal,
        "fail_fast=false must keep every error retryable, including 4xx"
    );
}