aviso-server 0.6.2

Notification service for data-driven workflows with live and replay APIs.
// (C) Copyright 2024- ECMWF and individual contributors.
//
// This software is licensed under the terms of the Apache Licence Version 2.0
// which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
// In applying this licence, ECMWF does not waive the privileges and immunities
// granted to it by virtue of its status as an intergovernmental organisation nor
// does it submit to any jurisdiction.

use crate::error::{
    ProcessingKind, RequestKind, processing_error_response, request_parse_error_response,
    request_validation_error_response,
};
use crate::handlers::{
    NotificationErrorKind, parse_and_validate_request, process_notification_request,
    save_to_backend,
};
use crate::metrics::AppMetrics;
use crate::notification::OperationType;
use crate::notification::decode_subject_for_display;
use crate::notification_backend::NotificationBackend;
use crate::routes::streaming::{
    StreamOperation, bucket_event_type_for_observability, enforce_known_event_type,
    enforce_stream_auth,
};
use crate::telemetry::{SERVICE_NAME, SERVICE_VERSION};
use crate::types::{NotificationRequest, NotificationResponse};
use actix_web::{HttpRequest, HttpResponse, web};
use std::sync::Arc;
use tracing::info;
use tracing_actix_web::RequestId;

/// Notification endpoint handler
///
/// Validates request format, processes the notification, and saves to backend.
#[utoipa::path(
    post,
    path = "/api/v1/notification",
    tag = "notification",
    request_body = NotificationRequest,
    responses(
        (status = 200, description = "Notification processed and stored successfully", body = crate::types::NotificationResponse),
        (status = 400, description = "Invalid request data or validation failure"),
        (status = 401, description = "Missing or invalid credentials (when stream requires auth)"),
        (status = 403, description = "Valid credentials but insufficient roles for this stream"),
        (status = 500, description = "Internal server error during processing"),
        (status = 503, description = "Authentication service unavailable (direct mode)")
    ),
    security(
        (),
        ("bearer_jwt" = []),
        ("basic" = []),
    )
)]
#[tracing::instrument(
    skip(body, notification_backend, metrics),
    fields(
        event_type = tracing::field::Empty,
        topic = tracing::field::Empty,
        request_id = %request_id,
        spatial_enabled = tracing::field::Empty,
    )
)]
pub async fn notify(
    http_request: HttpRequest,
    body: web::Bytes,
    notification_backend: web::Data<Arc<dyn NotificationBackend>>,
    request_id: RequestId,
    metrics: Option<web::Data<AppMetrics>>,
) -> HttpResponse {
    let request_id_str = request_id.to_string();
    // Parse and validate request structure
    let payload = match parse_and_validate_request(&body) {
        Ok(p) => p,
        Err(e) => {
            record_notification(&metrics, "unknown", "error");
            return request_parse_error_response(RequestKind::Notification, e, &request_id_str);
        }
    };

    let event_type = &payload.event_type;
    let request_params = &payload.identifier;

    // Strict-mode guard: reject unknown event_types BEFORE any endpoint-specific
    // request-shape checks, auth, and any sink that would record the
    // user-controlled `event_type` value into a Prometheus label or a tracing
    // span field. The rejection branch below does emit a notification metric,
    // but with the fixed `"unknown"` label — bounded cardinality, so safe.
    //
    // This also makes UNKNOWN_EVENT_TYPE the canonical first-line rejection:
    // clients see one stable error type for any unknown event_type, independent
    // of whether they also got the identifier shape wrong. The function is a
    // no-op when strict mode is off.
    if let Err(response) = enforce_known_event_type(&http_request, event_type) {
        record_notification(&metrics, "unknown", "rejected");
        return response;
    }

    // Single source of truth for observability labels: bucket to "generic" when
    // the event_type is not in the schema (only reachable in non-strict mode).
    // Use this for BOTH the tracing span field and Prometheus metric labels so
    // the two stay in sync and neither leaks user-controlled cardinality.
    let event_type_label = bucket_event_type_for_observability(event_type);
    tracing::Span::current().record("event_type", event_type_label);

    if payload.identifier.contains_key("point") {
        record_notification(&metrics, event_type_label, "error");
        return request_validation_error_response(
            RequestKind::Notification,
            anyhow::anyhow!(
                "identifier.point is only supported for watch/replay endpoints, not /notification"
            ),
            &request_id_str,
        );
    }

    // Reject unauthorized requests before validation/topic work.
    if let Err(response) = enforce_stream_auth(&http_request, event_type, StreamOperation::Write) {
        record_notification(&metrics, event_type_label, "rejected");
        return response;
    }

    let notification_result = match process_notification_request(
        event_type,
        request_params,
        &payload.payload,
        OperationType::Notify,
    ) {
        Ok(result) => result,
        Err(e) => match e.kind {
            NotificationErrorKind::Validation => {
                record_notification(&metrics, event_type_label, "error");
                return request_validation_error_response(
                    RequestKind::Notification,
                    e.source,
                    &request_id_str,
                );
            }
            NotificationErrorKind::Processing => {
                record_notification(&metrics, event_type_label, "error");
                return processing_error_response(
                    ProcessingKind::NotificationProcessing,
                    e.source,
                    &request_id_str,
                );
            }
        },
    };

    let display_topic = decode_subject_for_display(&notification_result.topic);
    tracing::Span::current().record("topic", &display_topic);
    tracing::Span::current().record(
        "spatial_enabled",
        notification_result.spatial_metadata.is_some(),
    );

    // Payload is always persisted as canonical JSON.
    // Missing optional payload is represented as JSON null.
    let payload_string = payload
        .payload
        .as_ref()
        .map(serde_json::Value::to_string)
        .unwrap_or_else(|| "null".to_string());
    let payload_size = payload_string.len();

    if let Err(e) = save_to_backend(
        &notification_result,
        payload_string,
        notification_backend.get_ref().as_ref(),
    )
    .await
    {
        record_notification(&metrics, event_type_label, "error");
        return processing_error_response(ProcessingKind::NotificationStorage, e, &request_id_str);
    }

    record_notification(&metrics, event_type_label, "success");

    let response = NotificationResponse {
        status: "success".to_string(),
        request_id: request_id_str,
        processed_at: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true),
    };

    // This single info event is now the canonical per-notification log line.
    // The previously-info storage and publisher events were demoted to debug
    // because they describe sub-steps of the same business outcome; their
    // unique fields (payload_size here, sequence/stream_name still on the
    // backend's debug line) live alongside this event when it matters.
    let payload_kind = payload
        .payload
        .as_ref()
        .map(json_value_kind)
        .unwrap_or("null");
    let spatial_enabled = notification_result.spatial_metadata.is_some();
    let spatial_bbox = notification_result
        .spatial_metadata
        .as_ref()
        .map(|metadata| metadata.bounding_box.as_str());
    info!(
        service_name = SERVICE_NAME,
        service_version = SERVICE_VERSION,
        event_name = "api.notification.processed",
        outcome = "success",
        topic = %display_topic,
        event_type = %notification_result.event_type,
        param_count = notification_result.canonicalized_params.len(),
        payload_kind = %payload_kind,
        payload_size = payload_size,
        spatial_enabled = spatial_enabled,
        spatial_bbox = ?spatial_bbox,
        "Notification processed and saved successfully"
    );

    HttpResponse::Ok().json(response)
}

/// Increments the `notifications_total` counter for the given
/// `event_type` / `status` label pair.
///
/// Does nothing when `metrics` is `None`.
fn record_notification(metrics: &Option<web::Data<AppMetrics>>, event_type: &str, status: &str) {
    let Some(app_metrics) = metrics else {
        return;
    };

    let labels = [event_type, status];
    app_metrics
        .notifications_total
        .with_label_values(&labels)
        .inc();
}

fn json_value_kind(value: &serde_json::Value) -> &'static str {
    match value {
        serde_json::Value::Null => "null",
        serde_json::Value::Bool(_) => "bool",
        serde_json::Value::Number(_) => "number",
        serde_json::Value::String(_) => "string",
        serde_json::Value::Array(_) => "array",
        serde_json::Value::Object(_) => "object",
    }
}