aviso 2.0.0-rc.3

Core client library for aviso-server, ECMWF's notification service.
Documentation
// (C) Copyright 2024- ECMWF and individual contributors.
//
// This software is licensed under the terms of the Apache Licence Version 2.0
// which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
// In applying this licence, ECMWF does not waive the privileges and immunities
// granted to it by virtue of its status as an intergovernmental organisation nor
// does it submit to any jurisdiction.

use std::sync::Arc;

use reqwest::header::AUTHORIZATION;
use tokio::sync::{mpsc, oneshot, watch};
use url::Url;

use super::drain::drain_frames;
use super::{
    ConnectionOutcome, DrainOutcome, GapGuard, PendingCommit, apply_outcome,
    heartbeat_starvation_budget,
};
use crate::auth::AuthProvider;
use crate::state::{ResumeKey, StateStore};
use crate::watch::retry_after;
use crate::watch::wire::WireWatchRequest;
use crate::watch::{
    ConnectionLossReason, ReconnectPolicy, ResumeStart, WatchEvent, WatchMode, WatchRequest,
    WatchState,
};
use crate::{ClientError, Notification};

/// Drive one HTTP connection: open the request, classify the initial
/// response, decode chunks, feed them through the SSE parser plus
/// [`drain_frames`], and return a [`ConnectionOutcome`] the outer
/// supervisor loop dispatches on.
///
/// Returns:
/// - `ServerClosed` when the wire delivered a recognised
///   `connection-closing` frame; the reducer has transitioned via
///   `WatchEvent::ServerClose` and the outer loop reads the resulting
///   state to decide whether to reconnect or terminate. Also returned
///   when the reducer reports a terminal state after a drained chunk.
/// - `HttpStatus { status, body, request_id, retry_after }` when the
///   initial response was not exactly `200 OK`. The reducer is NOT
///   advanced here; the outer loop classifies the status. `body` is a
///   best-effort, lossily UTF-8-decoded prefix of the response body (at
///   most `MAX_ERROR_BODY_BYTES`, read within the heartbeat starvation
///   budget). A failed or timed-out body read keeps the already-received
///   status, request id and `Retry-After` rather than degrading to
///   `TransportError`.
/// - `TransportError(_)` on a reqwest-level error while sending the
///   request or reading the `200` stream (TLS, connect, mid-stream read).
///   The reducer is not advanced here.
/// - `UnexpectedEof` when the response body ended without a
///   `connection-closing` frame; the reducer has transitioned via
///   `WatchEvent::ConnectionLost { reason: UnexpectedEof }`.
/// - `HeartbeatStarved` when the per-chunk `timeout(budget, ...)`
///   fired; the reducer has transitioned via
///   `WatchEvent::HeartbeatStarvation`.
/// - `Fatal(ClientError)` for an unbuildable endpoint URL or request
///   body, an auth-provider failure, or terminal wire-level conditions
///   surfaced by [`drain_frames`] (malformed `CloudEvent` id, server
///   `error` event, unknown `connection-closing.reason`, decode failure,
///   gap detected, state-store failure).
/// - `Cancelled` when per-stream drop or parent drop fired during any
///   await. Cancellation observed while waiting for the next stream chunk
///   transitions the reducer via `WatchEvent::Stop`; cancellation during
///   auth, send, or the error-body read returns without touching the
///   reducer.
#[allow(
    clippy::too_many_arguments,
    clippy::too_many_lines,
    reason = "the supervisor's collaborators are intentionally passed by reference rather than bundled into a context struct, so each await point owns clear borrows; the function's length is the natural shape of a single-connection runner that maps every initial-response and chunk-loop branch to a `ConnectionOutcome`, and splitting it further obscures the mapping table; threading triggers and trigger_states as additional references keeps the same shape and the cancel-safety analysis local to each await point"
)]
pub(super) async fn run_one_connection(
    state: &mut WatchState,
    last_reconnect_policy: &mut Option<ReconnectPolicy>,
    request: &WatchRequest,
    wire_from: Option<&ResumeStart>,
    commit_cursor: &mut Option<u64>,
    pending_commit: &mut Option<PendingCommit>,
    state_store: Option<&Arc<dyn StateStore>>,
    resume_key: &ResumeKey,
    http: &reqwest::Client,
    base_url: &Url,
    auth: Option<&Arc<dyn AuthProvider>>,
    heartbeat_interval: std::time::Duration,
    retry_counter: &mut u32,
    trigger_states: &mut [crate::watch::trigger::TriggerState],
    tx: &mpsc::Sender<Result<Notification, ClientError>>,
    cancel: &mut oneshot::Receiver<()>,
    parent_cancel: &mut watch::Receiver<bool>,
) -> ConnectionOutcome {
    let budget = heartbeat_starvation_budget(heartbeat_interval);
    let endpoint = match request.mode() {
        WatchMode::Watch => "api/v1/watch",
        WatchMode::ReplayOnly => "api/v1/replay",
    };
    let url = match base_url.join(endpoint) {
        Ok(u) => u,
        Err(e) => {
            return ConnectionOutcome::Fatal(ClientError::Config(format!(
                "build watch endpoint url from base {base_url} and path {endpoint:?}: {e}"
            )));
        }
    };
    let body = match WireWatchRequest::from_parts(request.event_type(), request.filter(), wire_from)
    {
        Ok(b) => b,
        Err(e) => return ConnectionOutcome::Fatal(e),
    };

    let auth_header = match auth {
        Some(provider) => {
            let result = tokio::select! {
                biased;
                _ = parent_cancel.changed() => return ConnectionOutcome::Cancelled,
                _ = &mut *cancel => return ConnectionOutcome::Cancelled,
                v = provider.authorization_header() => v,
            };
            match result {
                Ok(v) => Some(v),
                Err(e) => return ConnectionOutcome::Fatal(e),
            }
        }
        None => None,
    };

    let mut builder = http.post(url).json(&body);
    if let Some(value) = auth_header {
        builder = builder.header(AUTHORIZATION, value);
    }

    let send_result = tokio::select! {
        biased;
        _ = parent_cancel.changed() => return ConnectionOutcome::Cancelled,
        _ = &mut *cancel => return ConnectionOutcome::Cancelled,
        r = builder.send() => r,
    };
    let mut response = match send_result {
        Ok(r) => r,
        Err(e) => return ConnectionOutcome::TransportError(e),
    };

    // SSE streams MUST return 200. A 2xx-but-not-200 (204 No Content, 206
    // Partial Content, etc.) would have no streamable body and the
    // supervisor would EOF immediately into a reconnect loop without ever
    // surfacing the misclassified status to the consumer. Require exact
    // 200 so unexpected 2xx variants surface as `HttpStatus` with the
    // verbatim status code and the classifier in the outer loop decides
    // whether to retry or fatal them.
    if response.status() != reqwest::StatusCode::OK {
        let status = response.status().as_u16();
        let request_id = request_id_header(response.headers());
        let retry_after = retry_after::parse_retry_after(response.headers().get("retry-after"));
        // The status line and headers are authoritative for classification;
        // the body is diagnostics only, so its read is bounded in time and
        // size and never overrides the status.
        let Some(body) = read_error_body(&mut response, budget, cancel, parent_cancel).await
        else {
            return ConnectionOutcome::Cancelled;
        };
        return ConnectionOutcome::HttpStatus {
            status,
            body,
            request_id,
            retry_after,
        };
    }

    let session_request_id = request_id_header(response.headers());
    tracing::debug!(
        event.name = "client.watch.subscribed",
        request_id = session_request_id.as_deref().unwrap_or("<absent>"),
        "watch session opened"
    );
    let connected = state.transition(WatchEvent::ConnectionEstablished);
    apply_outcome(last_reconnect_policy, connected);
    // Reset the retry counter as soon as the HTTP handshake succeeds.
    // Subsequent mid-stream failures (UnexpectedEof, HeartbeatStarved,
    // mid-stream TransportError) start a fresh failures-since-last-
    // success streak rather than inflating onto whatever streak led to
    // the just-completed connect. `ServerClosed` separately resets too,
    // but that path only covers server-emitted close frames; this
    // resets even when the session ends ungracefully later.
    *retry_counter = 0;

    let mut parser = finesse::Parser::new();
    // Strict gap detection assumes consecutive sequence numbers and
    // fires `ClientError::HistoryGap` on any jump. That contract
    // holds only for UNFILTERED listeners: when a filter is present,
    // the server applies it server-side and silently skips
    // non-matching events, so observed sequences naturally jump from
    // the client's perspective. Relaxed mode logs gaps at DEBUG but
    // does not terminate the watch.
    let mut gap_guard = if request.filter().is_empty() {
        GapGuard::starting_from(wire_from)
    } else {
        GapGuard::relaxed_starting_from(wire_from)
    };

    loop {
        // The heartbeat budget measures time waiting for the next wire
        // chunk, not the supervisor's total iteration time. Wrapping
        // each `response.chunk().await` in a fresh `timeout(budget, ...)`
        // means time spent in `drain_frames` (state-store `put`, channel
        // backpressure inside `send_or_cancel`, parser work) does NOT
        // consume the budget. Otherwise a slow consumer that filled the
        // bounded channel could provoke a false `HeartbeatStarved`
        // reconnect even with a healthy server.
        let timed = tokio::select! {
            biased;
            _ = parent_cancel.changed() => {
                let stop = state.transition(WatchEvent::Stop);
                apply_outcome(last_reconnect_policy, stop);
                return ConnectionOutcome::Cancelled;
            }
            _ = &mut *cancel => {
                let stop = state.transition(WatchEvent::Stop);
                apply_outcome(last_reconnect_policy, stop);
                return ConnectionOutcome::Cancelled;
            }
            r = tokio::time::timeout(budget, response.chunk()) => r,
        };
        let chunk = match timed {
            Ok(c) => c,
            Err(_elapsed) => {
                let starved = state.transition(WatchEvent::HeartbeatStarvation);
                apply_outcome(last_reconnect_policy, starved);
                return ConnectionOutcome::HeartbeatStarved;
            }
        };
        // Feed the parser and remember whether this was end-of-body; the
        // final frames are drained below before EOF is acted on.
        let eof = match chunk {
            Ok(Some(bytes)) => {
                parser.feed(&bytes);
                false
            }
            Ok(None) => {
                parser.end();
                true
            }
            Err(transport_e) => return ConnectionOutcome::TransportError(transport_e),
        };
        match drain_frames(
            &mut parser,
            state,
            last_reconnect_policy,
            &mut gap_guard,
            commit_cursor,
            pending_commit,
            state_store,
            resume_key,
            request.triggers(),
            trigger_states,
            http,
            tx,
            cancel,
            parent_cancel,
        )
        .await
        {
            Ok(DrainOutcome::Continue) => {}
            Ok(DrainOutcome::ServerClosed) => return ConnectionOutcome::ServerClosed,
            Ok(DrainOutcome::StopRequested) => return ConnectionOutcome::Cancelled,
            Err(terminal_err) => return ConnectionOutcome::Fatal(terminal_err),
        }
        if state.is_terminal() {
            return ConnectionOutcome::ServerClosed;
        }
        if eof {
            // The wire ended without a `connection-closing` frame. The
            // outer reconnect loop classifies this as routine transport
            // loss and reconnects with exponential backoff; long-lived
            // watches survive NAT timeouts and half-open sockets through
            // this path.
            let lost = state.transition(WatchEvent::ConnectionLost {
                reason: ConnectionLossReason::UnexpectedEof,
            });
            apply_outcome(last_reconnect_policy, lost);
            return ConnectionOutcome::UnexpectedEof;
        }
    }
}

/// Upper bound on how many bytes of a non-`200` response body are buffered
/// for diagnostics. The body is only surfaced for error reporting in
/// `ConnectionOutcome::HttpStatus`, so an unbounded read would let a
/// misbehaving server or proxy grow client memory without limit.
const MAX_ERROR_BODY_BYTES: usize = 64 * 1024;

/// Read the `x-request-id` response header as an owned string.
///
/// Returns `None` when the header is absent or its value is not valid
/// visible ASCII (`HeaderValue::to_str` fails).
fn request_id_header(headers: &reqwest::header::HeaderMap) -> Option<String> {
    headers
        .get("x-request-id")
        .and_then(|h| h.to_str().ok())
        .map(String::from)
}

/// Best-effort read of a non-`200` response body for diagnostics.
///
/// Reads at most `MAX_ERROR_BODY_BYTES` and stops once `budget` has
/// elapsed since the call started. A read failure or timeout is logged at
/// DEBUG and yields whatever was buffered so far. The caller has already
/// captured the status and headers it classifies on. The bytes are decoded
/// lossily as UTF-8, so truncation inside a multi-byte sequence yields a
/// replacement character rather than an error.
///
/// Returns `None` when per-stream or parent cancellation fired while
/// waiting for a chunk.
async fn read_error_body(
    response: &mut reqwest::Response,
    budget: std::time::Duration,
    cancel: &mut oneshot::Receiver<()>,
    parent_cancel: &mut watch::Receiver<bool>,
) -> Option<String> {
    // One deadline for the whole read, so a server trickling bytes cannot
    // extend the wait indefinitely.
    let deadline = tokio::time::Instant::now() + budget;
    let mut buf: Vec<u8> = Vec::new();
    while buf.len() < MAX_ERROR_BODY_BYTES {
        let next = tokio::select! {
            biased;
            _ = parent_cancel.changed() => return None,
            _ = &mut *cancel => return None,
            r = tokio::time::timeout_at(deadline, response.chunk()) => r,
        };
        match next {
            Ok(Ok(Some(bytes))) => buf.extend_from_slice(&bytes),
            Ok(Ok(None)) => break,
            Ok(Err(e)) => {
                tracing::debug!(
                    error = %e,
                    "failed to read non-200 response body; keeping status"
                );
                break;
            }
            Err(_elapsed) => {
                tracing::debug!("timed out reading non-200 response body; keeping status");
                break;
            }
        }
    }
    buf.truncate(MAX_ERROR_BODY_BYTES);
    Some(String::from_utf8_lossy(&buf).into_owned())
}