aviso-server 0.5.0

Notification service for data-driven workflows with live and replay APIs.
// (C) Copyright 2024- ECMWF and individual contributors.
//
// This software is licensed under the terms of the Apache Licence Version 2.0
// which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
// In applying this licence, ECMWF does not waive the privileges and immunities
// granted to it by virtue of its status as an intergovernmental organisation nor
// does it submit to any jurisdiction.

//! Live notification streaming functionality

use actix_web::{HttpResponse, web};
use anyhow::Result;
use futures_util::StreamExt as FuturesStreamExt;
use std::sync::Arc;
use tokio::time::Duration;
use tokio_stream::StreamExt as TokioStreamExt;
use tokio_util::sync::CancellationToken;

use super::helpers::{
    apply_stream_lifecycle, create_heartbeat_stream, create_sse_response, frame_to_sse_bytes,
};
use super::types::{ControlEvent, DeliveryKind, StreamFrame};
use crate::configuration::Settings;
use crate::notification::IdentifierConstraint;
use crate::notification::decode_subject_for_display;
use crate::notification_backend::{NotificationBackend, NotificationMessage};
use crate::telemetry::{SERVICE_NAME, SERVICE_VERSION};

/// Tag every notification coming from a backend subscription as a live delivery.
///
/// Each `NotificationMessage` yielded by `notification_stream` is wrapped one-to-one
/// in a `StreamFrame::Notification` whose `kind` is `DeliveryKind::Live`.
///
/// `_concurrent_limit` is accepted so existing call sites keep compiling, but it is
/// not read by this function.
pub(crate) fn create_live_notification_stream(
    notification_stream: impl tokio_stream::Stream<Item = NotificationMessage> + Send + 'static,
    _concurrent_limit: usize,
) -> impl tokio_stream::Stream<Item = StreamFrame> {
    /// Wrap a single backend message as a live notification frame.
    fn into_live_frame(notification: NotificationMessage) -> StreamFrame {
        StreamFrame::Notification {
            notification,
            kind: DeliveryKind::Live,
        }
    }

    // A plain element-wise `map` keeps the backend's emission order; unordered
    // buffering is deliberately not used on this path.
    FuturesStreamExt::map(notification_stream, into_live_frame)
}

/// Build the Server-Sent Events response that backs the live watch endpoint.
///
/// Steps, in order:
/// 1. Subscribe to `topic` on `backend`; a subscription error is returned as `Err`.
/// 2. Pass every notification through [`filter_notification_message`] together with
///    `request_params` and `request_constraints`, keeping only the ones it returns.
/// 3. Wrap the remaining notifications as live frames.
/// 4. Merge the live frames with the heartbeat stream and put a single
///    connection-established control frame in front of the result.
/// 5. Hand the combined stream, the shutdown token and the configured maximum
///    connection duration to `apply_stream_lifecycle`.
/// 6. Convert each frame to SSE bytes and wrap the byte stream (plus the optional
///    `sse_guard`) in the HTTP response.
pub(crate) async fn create_watch_sse_stream(
    topic: String,
    backend: Arc<dyn NotificationBackend>,
    shutdown: web::Data<CancellationToken>,
    request_params: Arc<std::collections::HashMap<String, String>>,
    request_constraints: Arc<std::collections::HashMap<String, IdentifierConstraint>>,
    sse_guard: Option<crate::metrics::SseConnectionGuard>,
    request_id: String,
) -> Result<HttpResponse> {
    let app_settings = Settings::get_global_application_settings();
    let watch_config = Settings::get_global_watch_settings();

    // Open the backend subscription before building anything else: without it
    // there is nothing to stream.
    let subscription = backend.subscribe_to_topic(&topic).await?;

    // The closure owns the two shared maps; each message gets its own cheap
    // reference-counted handle because the filter future takes them by value.
    let matching_notifications = FuturesStreamExt::filter_map(
        subscription,
        move |message: NotificationMessage| {
            filter_notification_message(
                message,
                Arc::clone(&request_params),
                Arc::clone(&request_constraints),
            )
        },
    );

    // Typed live frames for everything that survived filtering.
    let live_frames = create_live_notification_stream(
        matching_notifications,
        watch_config.concurrent_notification_processing,
    );

    // First frame the client sees: announces the topic, the request id and how
    // long the connection is allowed to stay open.
    let connection_timeout = Duration::from_secs(watch_config.connection_max_duration_sec);
    let established = StreamFrame::Control(ControlEvent::ConnectionEstablished {
        topic: topic.clone(),
        timestamp: chrono::Utc::now(),
        connection_will_close_in_seconds: connection_timeout.as_secs(),
        request_id: request_id.clone(),
    });
    let greeting = tokio_stream::once(established);

    let heartbeats =
        create_heartbeat_stream(topic.clone(), watch_config.sse_heartbeat_interval_sec);

    // Ordering is deliberate. The greeting is chained in front of the merged
    // live/heartbeat stream rather than merged with it: tokio_stream::merge polls
    // round-robin and the heartbeat interval's first tick fires immediately, so a
    // three-way merge would let a heartbeat race the connection_established frame
    // and the request_id would not be in the first event.
    let live_and_heartbeats = TokioStreamExt::merge(live_frames, heartbeats);
    let all_frames = FuturesStreamExt::chain(greeting, live_and_heartbeats);

    let managed_frames = apply_stream_lifecycle(
        all_frames,
        topic.clone(),
        shutdown.get_ref().clone(),
        Some(connection_timeout),
        request_id.clone(),
    );

    tracing::info!(
        service_name = SERVICE_NAME,
        service_version = SERVICE_VERSION,
        event_name = "stream.watch.live.created",
        topic = %decode_subject_for_display(&topic),
        timeout_seconds = connection_timeout.as_secs(),
        concurrent_limit = watch_config.concurrent_notification_processing,
        request_id = %request_id,
        "SSE stream created with graceful-shutdown support and filtering"
    );

    // Serialise frames lazily; the closure owns the base URL and request id for
    // as long as the response body is alive.
    let base_url = app_settings.base_url.clone();
    let sse_bytes = FuturesStreamExt::map(managed_frames, move |frame| {
        frame_to_sse_bytes(frame, &base_url, &request_id)
    });

    Ok(create_sse_response(sse_bytes, sse_guard))
}

/// Decide whether a live notification should be forwarded to a watcher.
///
/// Calls `matches_notification_filters` with the message's topic, metadata and
/// payload plus the request's parameters and identifier constraints, and emits a
/// debug-level log entry recording the decision.
///
/// Returns `Some(message)` when the matcher accepts the message and `None`
/// otherwise, so the function can be used directly as a `filter_map` callback.
pub async fn filter_notification_message(
    message: NotificationMessage,
    request_params: Arc<std::collections::HashMap<String, String>>,
    request_constraints: Arc<std::collections::HashMap<String, IdentifierConstraint>>,
) -> Option<NotificationMessage> {
    use crate::notification::wildcard_matcher::matches_notification_filters;

    let is_match = matches_notification_filters(
        &message.topic,
        &request_params,
        &request_constraints,
        message.metadata.as_ref(),
        &message.payload,
    );

    tracing::debug!(
        filter_result = is_match,
        request_params = ?*request_params,
        message_metadata = ?message.metadata,
        message_payload = %message.payload,
        "Live filter decision"
    );

    // Hand the message back only when it passed the filters.
    is_match.then_some(message)
}