use actix_web::{HttpResponse, web};
use anyhow::Result;
use futures_util::StreamExt as FuturesStreamExt;
use std::sync::Arc;
use tokio::time::Duration;
use tokio_stream::StreamExt as TokioStreamExt;
use tokio_util::sync::CancellationToken;
use super::helpers::{
apply_stream_lifecycle, create_heartbeat_stream, create_sse_response, frame_to_sse_bytes,
};
use super::types::{ControlEvent, DeliveryKind, StreamFrame};
use crate::configuration::Settings;
use crate::notification::IdentifierConstraint;
use crate::notification::decode_subject_for_display;
use crate::notification_backend::{NotificationBackend, NotificationMessage};
use crate::telemetry::{SERVICE_NAME, SERVICE_VERSION};
pub fn create_live_notification_stream(
notification_stream: impl tokio_stream::Stream<Item = NotificationMessage> + Send + 'static,
_concurrent_limit: usize,
) -> impl tokio_stream::Stream<Item = StreamFrame> {
FuturesStreamExt::map(notification_stream, move |notification| {
StreamFrame::Notification {
notification,
kind: DeliveryKind::Live,
}
})
}
pub async fn create_watch_sse_stream(
topic: String,
backend: Arc<dyn NotificationBackend>,
shutdown: web::Data<CancellationToken>,
request_params: Arc<std::collections::HashMap<String, String>>,
request_constraints: Arc<std::collections::HashMap<String, IdentifierConstraint>>,
sse_guard: Option<crate::metrics::SseConnectionGuard>,
) -> Result<HttpResponse> {
let app_settings = Settings::get_global_application_settings();
let watch_config = Settings::get_global_watch_settings();
let notification_stream = backend.subscribe_to_topic(&topic).await?;
let request_params_clone = request_params.clone();
let request_constraints_clone = request_constraints.clone();
let filtered_stream = futures_util::StreamExt::filter_map(
notification_stream,
move |message: NotificationMessage| {
filter_notification_message(
message,
request_params_clone.clone(),
request_constraints_clone.clone(),
)
},
);
let notification_sse_stream = create_live_notification_stream(
filtered_stream,
watch_config.concurrent_notification_processing,
);
let connection_timeout = Duration::from_secs(watch_config.connection_max_duration_sec);
let initial_stream =
tokio_stream::once(StreamFrame::Control(ControlEvent::ConnectionEstablished {
topic: topic.clone(),
timestamp: chrono::Utc::now(),
connection_will_close_in_seconds: connection_timeout.as_secs(),
}));
let heartbeat_stream =
create_heartbeat_stream(topic.clone(), watch_config.sse_heartbeat_interval_sec);
let merged_stream = TokioStreamExt::merge(
FuturesStreamExt::chain(initial_stream, notification_sse_stream),
heartbeat_stream,
);
let stream_with_lifecycle = apply_stream_lifecycle(
merged_stream,
topic.clone(),
shutdown.get_ref().clone(),
Some(connection_timeout),
);
let base_url = app_settings.base_url.clone();
let byte_stream = FuturesStreamExt::map(stream_with_lifecycle, move |frame| {
frame_to_sse_bytes(frame, &base_url)
});
tracing::info!(
service_name = SERVICE_NAME,
service_version = SERVICE_VERSION,
event_name = "stream.watch.live.created",
topic = %decode_subject_for_display(&topic),
timeout_seconds = connection_timeout.as_secs(),
concurrent_limit = watch_config.concurrent_notification_processing,
"SSE stream created with graceful-shutdown support and filtering"
);
Ok(create_sse_response(byte_stream, sse_guard))
}
pub async fn filter_notification_message(
message: NotificationMessage,
request_params: Arc<std::collections::HashMap<String, String>>,
request_constraints: Arc<std::collections::HashMap<String, IdentifierConstraint>>,
) -> Option<NotificationMessage> {
let result = crate::notification::wildcard_matcher::matches_notification_filters(
&message.topic,
&request_params,
&request_constraints,
message.metadata.as_ref(),
&message.payload,
);
tracing::debug!(
filter_result = result,
request_params = ?*request_params,
message_metadata = ?message.metadata,
message_payload = %message.payload,
"Live filter decision"
);
if result { Some(message) } else { None }
}