// aviso 2.0.0-rc.2
//
// Core client library for aviso-server, ECMWF's notification service.
// Documentation
// (C) Copyright 2024- ECMWF and individual contributors.
//
// This software is licensed under the terms of the Apache Licence Version 2.0
// which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
// In applying this licence, ECMWF does not waive the privileges and immunities
// granted to it by virtue of its status as an intergovernmental organisation nor
// does it submit to any jurisdiction.

use super::*;

/// Checks that the checkpoint for notification 1 is in the store once the
/// trigger for notification 2 has failed.
///
/// The mock server streams two `live-notification` events (sequences 1 and 2)
/// in a single SSE body. The trigger is built with
/// `Trigger::test_fail_on_call(2, 0, true)`; the test expects the first channel
/// item to be an `Ok` notification and the second to be
/// `ClientError::TriggerFailed`, after which the channel must close and the
/// supervisor task must finish without panicking.
#[tokio::test]
async fn prev_notification_is_committed_before_current_triggers_run() {
    use crate::state::MemoryStore;
    use crate::watch::Trigger;
    use tokio::time::timeout;

    let item_deadline = Duration::from_secs(5);
    let shutdown_deadline = Duration::from_secs(2);

    let server = MockServer::start().await;

    // Both events are served back to back in one response body.
    let event_one = sse_chunk("live-notification", cloud_event("mars", 1));
    let event_two = sse_chunk("live-notification", cloud_event("mars", 2));
    let body = format!("{event_one}{event_two}");
    let sse_response = ResponseTemplate::new(200)
        .insert_header("content-type", "text/event-stream")
        .set_body_string(body);
    Mock::given(method("POST"))
        .and(path("/api/v1/watch"))
        .respond_with(sse_response)
        .mount(&server)
        .await;

    let store: Arc<dyn StateStore> = Arc::new(MemoryStore::new());
    let (failing_trigger, _call_counter) = Trigger::test_fail_on_call(2, 0, true);
    let request = WatchRequest::watch("mars").with_triggers(vec![failing_trigger]);
    // The underscore-prefixed bindings are kept alive until the end of the test.
    let (mut notifications, _cancel_guard, supervisor, _drop_guard) =
        start_supervisor_with_store(&server, request, Some(Arc::clone(&store)));

    // Item 1: delivered successfully.
    let first = timeout(item_deadline, notifications.recv())
        .await
        .expect("first notification arrives within 5s")
        .expect("channel still open")
        .expect("first notification is Ok");
    assert_eq!(first.sequence, 1);

    // Item 2: the trigger failure surfaces as an error on the channel.
    let second_item = timeout(item_deadline, notifications.recv())
        .await
        .expect("trigger failure arrives within 5s")
        .expect("channel still open");
    if !matches!(second_item, Err(ClientError::TriggerFailed { .. })) {
        panic!(
            "expected TriggerFailed on second item, got {other:?}",
            other = second_item
        );
    }

    // After the error the receiver must observe the channel closing.
    let none = timeout(shutdown_deadline, notifications.recv())
        .await
        .expect("channel closes within 2s after terminal error");
    assert!(none.is_none());

    let joined = timeout(shutdown_deadline, supervisor).await;
    joined
        .expect("supervisor exits within 2s")
        .expect("supervisor task must not panic");

    // Look up the checkpoint under the key derived from the mock server URL.
    let raw_base = format!("{}/", server.uri());
    let base_url = url::Url::parse(&raw_base).unwrap();
    let resume_key = ResumeKey::new(&base_url, "mars", &json!({}), None).unwrap();
    let cp = store
        .get(&resume_key)
        .await
        .unwrap()
        .expect("checkpoint for N=1 should be persisted");
    assert_eq!(
        cp.last_committed_sequence, 1,
        "N=1 must have been committed before N=2 triggers ran"
    );
}

/// Checks that a supervisor started through
/// `start_supervisor_with_store_and_flush` leaves a checkpoint for the only
/// notification it delivered.
///
/// The mock answers the watch request at most once (`up_to_n_times(1)`) with a
/// single `live-notification` event of sequence 1. Once the supervisor task
/// has ended, the store must hold a checkpoint whose
/// `last_committed_sequence` is 1.
#[tokio::test]
async fn flush_cursor_on_exit_persists_the_last_delivered_notification_when_no_next_arrives() {
    use crate::state::MemoryStore;
    use tokio::time::timeout;

    let exit_budget = Duration::from_secs(15);

    let server = MockServer::start().await;
    let single_event = sse_chunk("live-notification", cloud_event("mars", 1));
    let sse_response = ResponseTemplate::new(200)
        .insert_header("content-type", "text/event-stream")
        .set_body_string(single_event);
    Mock::given(method("POST"))
        .and(path("/api/v1/watch"))
        .respond_with(sse_response)
        .up_to_n_times(1)
        .mount(&server)
        .await;

    let store: Arc<dyn StateStore> = Arc::new(MemoryStore::new());
    let request = WatchRequest::watch("mars");
    // The underscore-prefixed bindings are kept alive until the end of the test.
    let (mut notifications, _cancel_guard, supervisor, _drop_guard) =
        start_supervisor_with_store_and_flush(&server, request, Arc::clone(&store));

    let received = timeout(Duration::from_secs(5), notifications.recv()).await;
    let slot = received.expect("first notification arrives within 5s");
    let outcome = slot.expect("channel still open");
    let first = outcome.expect("first notification is Ok");
    assert_eq!(first.sequence, 1);

    // Whatever comes next on the channel (or a timeout) is deliberately
    // ignored; only the supervisor's exit and the stored state are asserted.
    let next_item = notifications.recv();
    let _ = timeout(exit_budget, next_item).await;
    let joined = timeout(exit_budget, supervisor).await;
    joined
        .expect("supervisor exits within 15s")
        .expect("supervisor task must not panic");

    // Look up the checkpoint under the key derived from the mock server URL.
    let raw_base = format!("{}/", server.uri());
    let base_url = url::Url::parse(&raw_base).unwrap();
    let resume_key = ResumeKey::new(&base_url, "mars", &json!({}), None).unwrap();
    let lookup = store.get(&resume_key).await;
    let persisted = lookup.expect("store.get is infallible for MemoryStore");
    let cp = persisted.expect(
        "with flush_cursor_on_exit=true, pending N=1 MUST be persisted on supervisor exit even though commit-on-next-send never fired (no N=2 ever arrived to promote it); otherwise the next run would redeliver N=1 and an interactive operator would see a duplicate",
    );
    assert_eq!(
        cp.last_committed_sequence, 1,
        "the post-loop flush must persist the last successfully delivered notification"
    );
}

/// Regression guard for a supervisor started through
/// `start_supervisor_with_store`: a lone notification with no successor must
/// not leave a checkpoint behind.
///
/// The mock answers the watch request at most once (`up_to_n_times(1)`) with a
/// single `live-notification` event of sequence 1. Once the supervisor task
/// has ended, the store must hold nothing under the resume key.
#[tokio::test]
async fn flush_cursor_on_exit_default_false_preserves_at_least_once_redelivery_invariant() {
    use crate::state::MemoryStore;
    use tokio::time::timeout;

    let exit_budget = Duration::from_secs(15);

    let server = MockServer::start().await;
    let single_event = sse_chunk("live-notification", cloud_event("mars", 1));
    let sse_response = ResponseTemplate::new(200)
        .insert_header("content-type", "text/event-stream")
        .set_body_string(single_event);
    Mock::given(method("POST"))
        .and(path("/api/v1/watch"))
        .respond_with(sse_response)
        .up_to_n_times(1)
        .mount(&server)
        .await;

    let store: Arc<dyn StateStore> = Arc::new(MemoryStore::new());
    let request = WatchRequest::watch("mars");
    // The underscore-prefixed bindings are kept alive until the end of the test.
    let (mut notifications, _cancel_guard, supervisor, _drop_guard) =
        start_supervisor_with_store(&server, request, Some(Arc::clone(&store)));

    let received = timeout(Duration::from_secs(5), notifications.recv()).await;
    let slot = received.expect("first notification arrives within 5s");
    let outcome = slot.expect("channel still open");
    let first = outcome.expect("first notification is Ok");
    assert_eq!(first.sequence, 1);

    // Whatever comes next on the channel (or a timeout) is deliberately
    // ignored; only the supervisor's exit and the stored state are asserted.
    let next_item = notifications.recv();
    let _ = timeout(exit_budget, next_item).await;
    let joined = timeout(exit_budget, supervisor).await;
    joined
        .expect("supervisor exits within 15s")
        .expect("supervisor task must not panic");

    // Look up the checkpoint under the key derived from the mock server URL.
    let raw_base = format!("{}/", server.uri());
    let base_url = url::Url::parse(&raw_base).unwrap();
    let resume_key = ResumeKey::new(&base_url, "mars", &json!({}), None).unwrap();
    let lookup = store.get(&resume_key).await;
    let persisted = lookup.expect("store.get is infallible for MemoryStore");
    assert!(
        persisted.is_none(),
        "regression guard: with flush_cursor_on_exit=false (the default), commit-on-next-send must NOT fire for a notification with no successor; the at-least-once contract requires the next run to redeliver N=1"
    );
}