aviso-server 0.6.2

Notification service for data-driven workflows with live and replay APIs.
// (C) Copyright 2024- ECMWF and individual contributors.
//
// This software is licensed under the terms of the Apache Licence Version 2.0
// which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
// In applying this licence, ECMWF does not waive the privileges and immunities
// granted to it by virtue of its status as an intergovernmental organisation nor
// does it submit to any jurisdiction.

use crate::auth::middleware::get_username;
use crate::error::{
    RequestKind, request_parse_error_response, request_validation_error_response,
    sse_error_response,
};
use crate::handlers::{StreamingRequestProcessor, ValidationConfig, parse_and_validate_request};
use crate::metrics::AppMetrics;
use crate::notification::decode_subject_for_display;
use crate::notification_backend::NotificationBackend;
use crate::routes::streaming::{
    StreamOperation, bucket_event_type_for_observability, enforce_known_event_type,
    enforce_stream_auth, record_start_at_span_fields,
};
use crate::sse::replay::create_replay_only_stream;
use crate::telemetry::{SERVICE_NAME, SERVICE_VERSION};
use actix_web::{HttpRequest, HttpResponse, web};
use std::sync::Arc;
use std::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing_actix_web::RequestId;

/// Replay endpoint handler for historical message streaming
//
// NOTE: utoipa derives the OpenAPI operation summary/description from the `///`
// doc comment above, so implementation notes are kept in plain `//` comments.
//
// Handling order (each step returns an HTTP error response on failure):
//   1. Parse the request body                     -> `request_parse_error_response`
//   2. Strict-mode unknown `event_type` rejection -> `enforce_known_event_type`
//   3. Schema-level read authorisation            -> `enforce_stream_auth`
//   4. Replay-specific request validation         -> `request_validation_error_response`
//   5. ECPDS destination check (feature "ecpds")  -> `enforce_ecpds_auth`
//   6. Build the replay-only SSE stream           -> `sse_error_response` on error
#[utoipa::path(
    post,
    path = "/api/v1/replay",
    tag = "streaming",
    request_body = crate::types::request::NotificationRequest,
    responses(
        (status = 200, description = "Historical replay stream established successfully", content_type = "text/event-stream"),
        (status = 400, description = "Invalid request parameters or missing from_id/from_date"),
        (status = 401, description = "Missing or invalid credentials (when stream requires auth)"),
        (status = 403, description = "Valid credentials but the user lacks role-based access for this stream, or, when the stream enables the ECPDS plugin, the requested destination is not in the user's ECPDS allow-list"),
        (status = 500, description = "Failed to establish replay stream"),
        (status = 503, description = "Authentication service unavailable (direct mode), or, when the stream enables the ECPDS plugin, ECPDS could not be reached under the active partial_outage_policy")
    ),
    security(
        (),
        ("bearer_jwt" = []),
        ("basic" = []),
    )
)]
// `body` (client-supplied, potentially large raw bytes) and `http_request`
// (request metadata) are skipped so `#[instrument]` does not Debug-format them
// into every span; only the bounded fields declared below are recorded.
#[tracing::instrument(
    skip(body, http_request, notification_backend, shutdown, metrics),
    fields(
        event_type = tracing::field::Empty,
        request_id = %request_id,
        from_id = tracing::field::Empty,
        from_date = tracing::field::Empty,
        endpoint = "replay",
    )
)]
pub async fn replay(
    body: web::Bytes,
    http_request: HttpRequest,
    notification_backend: web::Data<Arc<dyn NotificationBackend>>,
    shutdown: web::Data<CancellationToken>,
    request_id: RequestId,
    metrics: Option<web::Data<AppMetrics>>,
) -> HttpResponse {
    // `request_id` itself is moved into the request processor below; this
    // string copy is reused for error responses and handed to the stream.
    let request_id_str = request_id.to_string();
    // Parse and validate request structure
    let notification_request = match parse_and_validate_request(&body) {
        Ok(req) => req,
        Err(e) => return request_parse_error_response(RequestKind::Replay, e, &request_id_str),
    };

    // Strict-mode guard: reject unknown event_types BEFORE auth, span recording,
    // metric labels and any replay setup.
    if let Err(response) = enforce_known_event_type(&http_request, &notification_request.event_type)
    {
        return response;
    }

    // Single source of truth for observability labels: bucket to "generic" when
    // the event_type is not in the schema (only reachable in non-strict mode).
    // Used for BOTH the tracing span field and Prometheus SSE metric labels so
    // the two stay in sync and neither leaks user-controlled cardinality.
    let event_type_label = bucket_event_type_for_observability(&notification_request.event_type);

    // Enforce schema-level auth before replay setup to fail fast.
    if let Err(response) = enforce_stream_auth(
        &http_request,
        &notification_request.event_type,
        StreamOperation::Read,
    ) {
        return response;
    }

    // Replay-specific validation (e.g. the starting point) yields the topic,
    // canonicalized params, identifier constraints and `start_at` used below.
    let context = match StreamingRequestProcessor::process_request(
        &notification_request,
        request_id,
        ValidationConfig::for_replay(),
    ) {
        Ok(ctx) => ctx,
        Err(e) => {
            return request_validation_error_response(RequestKind::Replay, e, &request_id_str);
        }
    };

    tracing::Span::current().record("event_type", event_type_label);
    record_start_at_span_fields(context.start_at);

    #[cfg(feature = "ecpds")]
    if let Err(response) = crate::routes::streaming::enforce_ecpds_auth(
        &http_request,
        &context.event_type,
        &context.canonicalized_params,
    )
    .await
    {
        return response;
    }

    // Human-readable topic, used only in the success log line.
    let display_topic = decode_subject_for_display(&context.topic);
    let setup_started_at = Instant::now();

    // Pass canonicalized params for downstream filtering
    let filtering_params = Arc::new(context.canonicalized_params.clone());
    let filtering_constraints = Arc::new(context.identifier_constraints.clone());

    // See watch.rs for why the guard is created before stream setup.
    // The guard (if metrics are enabled) is moved into the stream below.
    let sse_guard = metrics.as_ref().map(|m| {
        let username = get_username(&http_request);
        m.track_sse_connection("replay", event_type_label, username.as_deref())
    });

    match create_replay_only_stream(
        context.topic.clone(),
        notification_backend.get_ref().clone(),
        context.start_at,
        shutdown.clone(),
        filtering_params,
        filtering_constraints,
        sse_guard,
        request_id_str.clone(),
    )
    .await
    {
        Ok(response) => {
            // Saturate instead of silently truncating the u128 millisecond count.
            let setup_duration_ms =
                u64::try_from(setup_started_at.elapsed().as_millis()).unwrap_or(u64::MAX);
            info!(
                service_name = SERVICE_NAME,
                service_version = SERVICE_VERSION,
                event_name = "api.replay.stream.established",
                outcome = "success",
                topic = %display_topic,
                start_at = ?context.start_at,
                stream_mode = "replay_only",
                setup_duration_ms = setup_duration_ms,
                "Replay-only SSE stream established successfully"
            );
            response
        }
        Err(e) => sse_error_response(e, &context.topic, &request_id_str),
    }
}