aviso-server 0.5.0

Notification service for data-driven workflows with live and replay APIs.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
use crate::helpers::{
    mock_ecpds, mock_ecpds_always_down, spawn_streaming_test_app_with_auth,
    spawn_streaming_test_app_with_auth_partial_outage, test_jwt,
};
use serde_json::json;

fn ecpds_token(username: &str, roles: &[&str]) -> String {
    test_jwt(username, roles)
}

fn diss_ecpds_watch_body(destination: &str) -> serde_json::Value {
    json!({
        "event_type": "dissemination_ecpds",
        "identifier": {
            "destination": destination,
            "class": "od"
        }
    })
}

fn diss_ecpds_replay_body(destination: &str) -> serde_json::Value {
    json!({
        "event_type": "dissemination_ecpds",
        "identifier": {
            "destination": destination,
            "class": "od"
        },
        "from_id": "1"
    })
}

#[tokio::test]
async fn watch_without_ecpds_plugin_allows_authenticated_user() {
    let app = spawn_streaming_test_app_with_auth().await;
    let client = reqwest::Client::new();
    let token = ecpds_token("reader-user", &["reader"]);

    let response = client
        .post(format!("{}/api/v1/watch", app.address))
        .header("Content-Type", "application/json")
        .header("Authorization", format!("Bearer {token}"))
        .json(&json!({
            "event_type": "test_polygon_auth_any",
            "identifier": {
                "date": "20250706",
                "time": "1200",
                "polygon": "(0,0,0,1,1,1,0,0)"
            }
        }))
        .send()
        .await
        .expect("watch request should complete");

    assert_eq!(response.status(), reqwest::StatusCode::OK);
}

#[tokio::test]
async fn watch_ecpds_allows_user_with_valid_destination() {
    let app = spawn_streaming_test_app_with_auth().await;
    let client = reqwest::Client::new();
    let token = ecpds_token("ecpds-user", &["reader"]);

    let response = client
        .post(format!("{}/api/v1/watch", app.address))
        .header("Content-Type", "application/json")
        .header("Authorization", format!("Bearer {token}"))
        .json(&diss_ecpds_watch_body("CIP"))
        .send()
        .await
        .expect("watch request should complete");

    assert_eq!(response.status(), reqwest::StatusCode::OK);
}

#[tokio::test]
async fn watch_ecpds_denies_user_without_matching_destination() {
    let app = spawn_streaming_test_app_with_auth().await;
    let client = reqwest::Client::new();
    let token = ecpds_token("ecpds-user", &["reader"]);

    let response = client
        .post(format!("{}/api/v1/watch", app.address))
        .header("Content-Type", "application/json")
        .header("Authorization", format!("Bearer {token}"))
        .json(&diss_ecpds_watch_body("UNKNOWN"))
        .send()
        .await
        .expect("watch request should complete");

    assert_eq!(response.status(), reqwest::StatusCode::FORBIDDEN);
}

#[tokio::test]
async fn watch_ecpds_denies_user_with_empty_destination_list() {
    let app = spawn_streaming_test_app_with_auth().await;
    let client = reqwest::Client::new();
    let token = ecpds_token("ecpds-noaccess", &["reader"]);

    let response = client
        .post(format!("{}/api/v1/watch", app.address))
        .header("Content-Type", "application/json")
        .header("Authorization", format!("Bearer {token}"))
        .json(&diss_ecpds_watch_body("CIP"))
        .send()
        .await
        .expect("watch request should complete");

    assert_eq!(response.status(), reqwest::StatusCode::FORBIDDEN);
}

#[tokio::test]
async fn watch_ecpds_bypasses_check_for_admin() {
    let app = spawn_streaming_test_app_with_auth().await;
    let client = reqwest::Client::new();
    let token = ecpds_token("admin-user", &["admin"]);

    let response = client
        .post(format!("{}/api/v1/watch", app.address))
        .header("Content-Type", "application/json")
        .header("Authorization", format!("Bearer {token}"))
        .json(&diss_ecpds_watch_body("ANYTHING"))
        .send()
        .await
        .expect("watch request should complete");

    assert_eq!(response.status(), reqwest::StatusCode::OK);
}

#[tokio::test]
async fn watch_ecpds_unauthenticated_request_returns_401() {
    let app = spawn_streaming_test_app_with_auth().await;
    let client = reqwest::Client::new();

    let response = client
        .post(format!("{}/api/v1/watch", app.address))
        .header("Content-Type", "application/json")
        .json(&diss_ecpds_watch_body("CIP"))
        .send()
        .await
        .expect("watch request should complete");

    assert_eq!(response.status(), reqwest::StatusCode::UNAUTHORIZED);
}

#[tokio::test]
async fn replay_ecpds_allows_user_with_valid_destination() {
    let app = spawn_streaming_test_app_with_auth().await;
    let client = reqwest::Client::new();
    let token = ecpds_token("ecpds-user", &["reader"]);

    let response = client
        .post(format!("{}/api/v1/replay", app.address))
        .header("Content-Type", "application/json")
        .header("Authorization", format!("Bearer {token}"))
        .json(&diss_ecpds_replay_body("CIP"))
        .send()
        .await
        .expect("replay request should complete");

    assert_eq!(response.status(), reqwest::StatusCode::OK);
}

#[tokio::test]
async fn replay_ecpds_denies_user_without_matching_destination() {
    let app = spawn_streaming_test_app_with_auth().await;
    let client = reqwest::Client::new();
    let token = ecpds_token("ecpds-user", &["reader"]);

    let response = client
        .post(format!("{}/api/v1/replay", app.address))
        .header("Content-Type", "application/json")
        .header("Authorization", format!("Bearer {token}"))
        .json(&diss_ecpds_replay_body("UNKNOWN"))
        .send()
        .await
        .expect("replay request should complete");

    assert_eq!(response.status(), reqwest::StatusCode::FORBIDDEN);
}

#[tokio::test]
async fn watch_ecpds_returns_503_when_all_servers_fail() {
    let app = spawn_streaming_test_app_with_auth().await;
    let client = reqwest::Client::new();
    let token = ecpds_token("ecpds-unavailable", &["reader"]);

    let response = client
        .post(format!("{}/api/v1/watch", app.address))
        .header("Content-Type", "application/json")
        .header("Authorization", format!("Bearer {token}"))
        .json(&diss_ecpds_watch_body("CIP"))
        .send()
        .await
        .expect("watch request should complete");

    assert_eq!(response.status(), reqwest::StatusCode::SERVICE_UNAVAILABLE);
}

#[tokio::test]
async fn watch_ecpds_caches_per_user_exactly_one_upstream_call() {
    let app = spawn_streaming_test_app_with_auth().await;
    let client = reqwest::Client::new();
    let username = "ecpds-user-cache-test";
    let token = ecpds_token(username, &["reader"]);
    let mock = mock_ecpds();
    let before = mock.count_for(username);

    for _ in 0..3 {
        let response = client
            .post(format!("{}/api/v1/watch", app.address))
            .header("Content-Type", "application/json")
            .header("Authorization", format!("Bearer {token}"))
            .json(&diss_ecpds_watch_body("CIP"))
            .send()
            .await
            .expect("watch request should complete");
        assert_eq!(response.status(), reqwest::StatusCode::OK);
    }

    let after = mock.count_for(username);
    assert_eq!(
        after - before,
        1,
        "cache must coalesce 3 sequential requests for {username} into a single upstream fetch"
    );
}

#[tokio::test]
async fn watch_ecpds_concurrent_requests_coalesce() {
    let app = spawn_streaming_test_app_with_auth().await;
    let username = "ecpds-user-stampede-test";
    let token = ecpds_token(username, &["reader"]);
    let mock = mock_ecpds();
    let before = mock.count_for(username);

    let mut handles = Vec::new();
    for _ in 0..10 {
        let address = app.address.clone();
        let token = token.clone();
        handles.push(tokio::spawn(async move {
            reqwest::Client::new()
                .post(format!("{}/api/v1/watch", address))
                .header("Content-Type", "application/json")
                .header("Authorization", format!("Bearer {token}"))
                .json(&diss_ecpds_watch_body("CIP"))
                .send()
                .await
        }));
    }

    for handle in handles {
        let response = handle
            .await
            .expect("task must join")
            .expect("watch request should complete");
        assert_eq!(response.status(), reqwest::StatusCode::OK);
    }

    let after = mock.count_for(username);
    assert_eq!(
        after - before,
        1,
        "single-flight must coalesce 10 concurrent requests for {username} into a single upstream fetch"
    );
}

#[tokio::test]
async fn watch_ecpds_username_with_special_chars_handled() {
    let app = spawn_streaming_test_app_with_auth().await;
    let client = reqwest::Client::new();
    let username = "u+s er&name";
    let token = ecpds_token(username, &["reader"]);
    let mock = mock_ecpds();
    let before = mock.count_for(username);

    let response = client
        .post(format!("{}/api/v1/watch", app.address))
        .header("Content-Type", "application/json")
        .header("Authorization", format!("Bearer {token}"))
        .json(&diss_ecpds_watch_body("CIP"))
        .send()
        .await
        .expect("watch request should complete");

    assert_eq!(response.status(), reqwest::StatusCode::OK);
    let after = mock.count_for(username);
    assert_eq!(
        after - before,
        1,
        "username with `+`, ` ` and `&` must round-trip URL-encoded \
         and reach the upstream identified by the original (decoded) value"
    );
}

#[tokio::test]
async fn notify_on_ecpds_protected_stream_does_not_invoke_ecpds_for_admin() {
    let app = spawn_streaming_test_app_with_auth().await;
    let client = reqwest::Client::new();
    let username = "admin-user-notify-bypass";
    let token = ecpds_token(username, &["admin"]);
    let mock = mock_ecpds();
    let before = mock.count_for(username);

    let response = client
        .post(format!("{}/api/v1/notification", app.address))
        .header("Content-Type", "application/json")
        .header("Authorization", format!("Bearer {token}"))
        .json(&json!({
            "event_type": "dissemination_ecpds",
            "identifier": {
                "destination": "any-value-not-checked",
                "class": "od"
            },
            "payload": "irrelevant"
        }))
        .send()
        .await
        .expect("notify request should complete");

    assert!(
        response.status().is_success() || response.status().is_client_error(),
        "notify on ECPDS-protected stream must not 503; got {}",
        response.status()
    );
    let after = mock.count_for(username);
    assert_eq!(
        after, before,
        "notify must NOT invoke the ECPDS plugin under any policy"
    );
}

/// Non-admin counterpart to the admin notify-bypass test above. The ECPDS
/// plugin is read-only by design (only `enforce_ecpds_auth` callers in
/// watch/replay invoke it); admins additionally bypass the plugin even
/// on reads, so a passing admin test does not by itself prove that the
/// plugin is not consulted on writes. This case uses a non-admin
/// `producer` writer on a stream whose `auth.write_roles` grants that
/// role write access while keeping `plugins: ["ecpds"]` enabled. If a
/// future change accidentally wired `enforce_ecpds_auth` into the notify
/// path, the mock ECPDS would be hit (and likely deny since the user has
/// no destination list) rather than letting this assertion stay green.
#[tokio::test]
async fn notify_on_ecpds_protected_stream_does_not_invoke_ecpds_for_non_admin_writer() {
    let app = spawn_streaming_test_app_with_auth().await;
    let client = reqwest::Client::new();
    let username = "producer-user-notify-non-admin";
    let token = ecpds_token(username, &["producer"]);
    let mock = mock_ecpds();
    let before = mock.count_for(username);

    let response = client
        .post(format!("{}/api/v1/notification", app.address))
        .header("Content-Type", "application/json")
        .header("Authorization", format!("Bearer {token}"))
        .json(&json!({
            "event_type": "dissemination_ecpds_writable",
            "identifier": {
                "destination": "any-value-not-checked",
                "class": "od"
            },
            "payload": { "note": "non-admin notify smoke" }
        }))
        .send()
        .await
        .expect("notify request should complete");

    let status = response.status();
    assert_ne!(
        status,
        reqwest::StatusCode::FORBIDDEN,
        "non-admin producer must be authorised to write by the test schema's \
         write_roles. A 403 here means either the schema or the role mapping \
         drifted; this test cannot prove notify ungating from a 403."
    );
    assert_ne!(
        status,
        reqwest::StatusCode::SERVICE_UNAVAILABLE,
        "notify by a non-admin producer on an ECPDS-protected writable stream \
         must not 503. A 503 means the plugin incorrectly ran on a write."
    );
    let after = mock.count_for(username);
    assert_eq!(
        after, before,
        "notify must NOT invoke the ECPDS plugin even for a non-admin writer"
    );
}

/// Federated ECPDS deployment under `partial_outage_policy: any_success`:
/// one healthy server (`MOCK_ECPDS`) returns the user's destinations,
/// one consistently failing server (`MOCK_ECPDS_ALWAYS_DOWN`) returns 500.
///
/// Asserts at the HTTP boundary that:
///
/// * The watch request succeeds because at least one server responded.
/// * The checker fan-out is exercised through the HTTP layer: per-mock
///   counters on both configured servers each incremented for the user.
///   Without this, an accidental regression to single-server behaviour
///   (or to any other `servers` subset) would still leave the success
///   path green for users known to the first server.
///
/// Outcome-label propagation through `aviso_ecpds_fetch_total` is
/// verified separately at the subcrate level
/// (`any_success_policy_succeeds_when_one_server_is_down` asserts the
/// merged `FetchOutcome` is the worst per-server failure, not synthetic
/// `Success`); the HTTP layer reads that outcome out of
/// `CacheOutcome::MissFetched { fetch_outcome }` via a single
/// `record_fetch(fetch_outcome.label())` call already covered by the
/// other watch tests.
#[tokio::test]
async fn watch_ecpds_any_success_partial_outage_succeeds_and_queries_both_servers() {
    let app = spawn_streaming_test_app_with_auth_partial_outage().await;
    let client = reqwest::Client::new();
    let username = "ecpds-user-partial-outage";
    let token = ecpds_token(username, &["reader"]);
    let healthy = mock_ecpds();
    let down = mock_ecpds_always_down();
    let healthy_before = healthy.count_for(username);
    let down_before = down.count_for(username);

    let response = client
        .post(format!("{}/api/v1/watch", app.address))
        .header("Content-Type", "application/json")
        .header("Authorization", format!("Bearer {token}"))
        .json(&diss_ecpds_watch_body("CIP"))
        .send()
        .await
        .expect("watch request should complete");

    assert!(
        response.status().is_success(),
        "any_success policy with one healthy server must allow the request; got {}",
        response.status()
    );
    drop(response);

    assert_eq!(
        healthy.count_for(username) - healthy_before,
        1,
        "healthy server must have received exactly one upstream call for {username}"
    );
    assert_eq!(
        down.count_for(username) - down_before,
        1,
        "always-down server must also have received exactly one upstream call for \
         {username}; if zero, the checker fan-out is not being exercised through \
         the HTTP layer (config drift on `servers` or wiring regression) and the \
         partial-outage path is untested"
    );
}