use actix_web::{HttpResponse, web};
use anyhow::Result;
use chrono::Utc;
use futures_util::StreamExt as FuturesStreamExt;
use futures_util::stream::unfold;
use std::sync::Arc;
use tokio::time::Duration;
use tokio_stream::StreamExt as TokioStreamExt;
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};
use super::helpers::{
apply_stream_lifecycle, create_heartbeat_stream, create_sse_response, frame_to_sse_bytes,
};
use super::types::{ControlEvent, DeliveryKind, StreamFrame};
use crate::configuration::Settings;
use crate::notification::IdentifierConstraint;
use crate::notification::decode_subject_for_display;
use crate::notification::wildcard_matcher::matches_notification_filters;
use crate::notification_backend::{
NotificationBackend, NotificationMessage,
replay::{BatchParams, StartAt},
};
use crate::telemetry::{SERVICE_NAME, SERVICE_VERSION};
/// Builds a lazy stream that replays historical notifications for `topic` in batches.
///
/// Each iteration asks `backend` for one batch (sized by the global
/// `replay_batch_size` watch setting, starting at `start_at`), keeps only the
/// messages accepted by `matches_notification_filters` for the given
/// `request_params` / `request_constraints`, and emits them as
/// [`StreamFrame::Notification`] frames tagged [`DeliveryKind::Replay`].
///
/// * When a batch carries replay-limit information, a
///   [`ControlEvent::ReplayLimitReached`] control frame is appended after the
///   notifications of that batch.
/// * When the backend call fails, a single [`StreamFrame::Error`] frame is
///   emitted and the stream ends.
/// * The stream ends after the first batch that reports `has_more == false`.
///
/// Nothing is fetched until the returned stream is polled.
pub fn create_historical_replay_stream(
    topic: String,
    backend: Arc<dyn NotificationBackend>,
    start_at: StartAt,
    request_params: Arc<std::collections::HashMap<String, String>>,
    request_constraints: Arc<std::collections::HashMap<String, IdentifierConstraint>>,
) -> impl tokio_stream::Stream<Item = StreamFrame> {
    let watch_config = Settings::get_global_watch_settings();
    // `topic` is not needed afterwards, so it is moved into the batch params.
    let initial_params =
        BatchParams::new(topic, watch_config.replay_batch_size).with_start_at(start_at);

    // Unfold state: (backend, cursor params, more batches expected?, inter-batch
    // delay, request filters). Every step yields one inner stream of frames and
    // the outer stream-of-streams is flattened at the end.
    unfold(
        (
            backend,
            initial_params,
            true,
            watch_config.replay_batch_delay_ms,
            request_params,
            request_constraints,
        ),
        move |(
            backend,
            mut params,
            mut has_more,
            delay_ms,
            request_params,
            request_constraints,
        )| async move {
            // The previous step reported the end of history (or failed).
            if !has_more {
                return None;
            }

            match backend.get_messages_batch(params.clone()).await {
                Ok(batch_result) => {
                    debug!(
                        topic = %decode_subject_for_display(&params.topic),
                        batch_size = batch_result.batch_size,
                        has_more = batch_result.has_more,
                        last_sequence = ?batch_result.last_sequence,
                        "Retrieved historical message batch"
                    );

                    has_more = batch_result.has_more;
                    // NOTE(review): the cursor only advances when the backend supplies
                    // `next_sequence`; if it ever reports `has_more` without one, the
                    // same params are sent again — confirm the backend never does this.
                    if let Some(next_seq) = batch_result.next_sequence {
                        params = params.with_sequence(next_seq);
                    }

                    // Keep only the messages that satisfy the request filters.
                    let mut frames: Vec<StreamFrame> = batch_result
                        .messages
                        .into_iter()
                        .filter(|message| {
                            matches_notification_filters(
                                &message.topic,
                                &request_params,
                                &request_constraints,
                                message.metadata.as_ref(),
                                &message.payload,
                            )
                        })
                        .map(|notification| StreamFrame::Notification {
                            notification,
                            kind: DeliveryKind::Replay,
                        })
                        .collect();

                    if let Some(replay_limit_info) = &batch_result.replay_limit {
                        frames.push(StreamFrame::Control(ControlEvent::ReplayLimitReached {
                            topic: params.topic.clone(),
                            max_allowed: replay_limit_info.max_allowed,
                            timestamp: Utc::now(),
                        }));
                    }

                    // Throttle between batches. NOTE(review): the pause happens before
                    // this batch's frames are handed to the consumer, so delivery of the
                    // current batch is delayed as well.
                    if delay_ms > 0 && has_more {
                        tokio::time::sleep(Duration::from_millis(delay_ms)).await;
                    }

                    Some((
                        tokio_stream::iter(frames),
                        (
                            backend,
                            params,
                            has_more,
                            delay_ms,
                            request_params,
                            request_constraints,
                        ),
                    ))
                }
                Err(e) => {
                    warn!(
                        service_name = SERVICE_NAME,
                        service_version = SERVICE_VERSION,
                        event_name = "stream.replay.batch.failed",
                        error = %e,
                        topic = %decode_subject_for_display(&params.topic),
                        "Failed to retrieve historical message batch"
                    );

                    let error_frames = vec![StreamFrame::Error {
                        topic: params.topic.clone(),
                        message: e.to_string(),
                    }];

                    // `has_more = false` makes the next step terminate the stream.
                    Some((
                        tokio_stream::iter(error_frames),
                        (
                            backend,
                            params,
                            false,
                            delay_ms,
                            request_params,
                            request_constraints,
                        ),
                    ))
                }
            }
        },
    )
    .flatten()
}
pub async fn create_historical_then_live_stream(
topic: String,
backend: Arc<dyn NotificationBackend>,
start_at: StartAt,
shutdown: web::Data<CancellationToken>,
request_params: Arc<std::collections::HashMap<String, String>>,
request_constraints: Arc<std::collections::HashMap<String, IdentifierConstraint>>,
sse_guard: Option<crate::metrics::SseConnectionGuard>,
) -> Result<HttpResponse> {
let watch_config = Settings::get_global_watch_settings();
let app_settings = Settings::get_global_application_settings();
let historical_stream = create_historical_replay_stream(
topic.clone(),
backend.clone(),
start_at,
request_params.clone(),
request_constraints.clone(),
);
let (from_sequence, from_date) = start_at.as_replay_cursor();
let start_event = StreamFrame::Control(ControlEvent::ReplayStarted {
topic: topic.clone(),
from_sequence,
from_date,
batch_size: watch_config.replay_batch_size,
timestamp: chrono::Utc::now(),
});
let completion_event = StreamFrame::Control(ControlEvent::ReplayCompleted {
topic: topic.clone(),
timestamp: chrono::Utc::now(),
});
let notification_stream = backend.subscribe_to_topic(&topic).await?;
let request_params_clone = request_params.clone();
let request_constraints_clone = request_constraints.clone();
let filtered_stream = futures_util::StreamExt::filter_map(
notification_stream,
move |message: NotificationMessage| {
super::live::filter_notification_message(
message,
request_params_clone.clone(),
request_constraints_clone.clone(),
)
},
);
let live_notification_sse_stream = super::live::create_live_notification_stream(
filtered_stream,
watch_config.concurrent_notification_processing,
);
let heartbeat_stream =
create_heartbeat_stream(topic.clone(), watch_config.sse_heartbeat_interval_sec);
let combined_notification_stream = FuturesStreamExt::chain(
FuturesStreamExt::chain(tokio_stream::once(start_event), historical_stream),
FuturesStreamExt::chain(
tokio_stream::once(completion_event),
live_notification_sse_stream,
),
);
let merged_stream = TokioStreamExt::merge(combined_notification_stream, heartbeat_stream);
let stream_with_lifecycle = apply_stream_lifecycle(
merged_stream,
topic.clone(),
shutdown.get_ref().clone(),
Some(Duration::from_secs(
watch_config.connection_max_duration_sec,
)),
);
let base_url = app_settings.base_url.clone();
let byte_stream = FuturesStreamExt::map(stream_with_lifecycle, move |frame| {
frame_to_sse_bytes(frame, &base_url)
});
tracing::info!(
service_name = SERVICE_NAME,
service_version = SERVICE_VERSION,
event_name = "stream.watch.replay_live.created",
topic = %decode_subject_for_display(&topic),
from_sequence = ?from_sequence,
from_date = ?from_date,
batch_size = watch_config.replay_batch_size,
"Created combined historical-then-live SSE stream"
);
Ok(create_sse_response(byte_stream, sse_guard))
}
/// Builds an SSE response that replays history for `topic` and then ends.
///
/// The stream is ordered as: `ReplayStarted` control frame, replayed frames
/// from [`create_historical_replay_stream`], `ReplayCompleted` control frame.
/// It is wrapped by `apply_stream_lifecycle` with the shutdown token and no
/// maximum duration (`None`), then encoded to SSE bytes.
///
/// The `ReplayCompleted` frame is built when it is actually yielded, so its
/// timestamp reflects the end of the replay rather than the moment the
/// response was constructed.
///
/// # Errors
///
/// No fallible operation is performed in this function body itself; the
/// `Result` return type is kept for interface compatibility with callers.
pub async fn create_replay_only_stream(
    topic: String,
    backend: Arc<dyn NotificationBackend>,
    start_at: StartAt,
    shutdown: web::Data<CancellationToken>,
    request_params: Arc<std::collections::HashMap<String, String>>,
    request_constraints: Arc<std::collections::HashMap<String, IdentifierConstraint>>,
    sse_guard: Option<crate::metrics::SseConnectionGuard>,
) -> Result<HttpResponse> {
    let watch_config = Settings::get_global_watch_settings();

    // `backend` and the request filters are not used again below, so they are
    // moved into the replay stream instead of being cloned.
    let historical_stream = create_historical_replay_stream(
        topic.clone(),
        backend,
        start_at,
        request_params,
        request_constraints,
    );

    let (from_sequence, from_date) = start_at.as_replay_cursor();
    let start_event = StreamFrame::Control(ControlEvent::ReplayStarted {
        topic: topic.clone(),
        from_sequence,
        from_date,
        batch_size: watch_config.replay_batch_size,
        timestamp: Utc::now(),
    });

    // Built lazily: the closure runs only when the chained stream reaches this
    // item, i.e. after the historical stream has been exhausted.
    let completion_stream = FuturesStreamExt::map(
        tokio_stream::once(topic.clone()),
        |topic| {
            StreamFrame::Control(ControlEvent::ReplayCompleted {
                topic,
                timestamp: Utc::now(),
            })
        },
    );

    // start -> history -> completed
    let replay_stream = FuturesStreamExt::chain(
        FuturesStreamExt::chain(tokio_stream::once(start_event), historical_stream),
        completion_stream,
    );

    let stream_with_lifecycle = apply_stream_lifecycle(
        replay_stream,
        topic.clone(),
        shutdown.get_ref().clone(),
        None,
    );

    let app_settings = Settings::get_global_application_settings();
    let base_url = app_settings.base_url.clone();
    let byte_stream = FuturesStreamExt::map(stream_with_lifecycle, move |frame| {
        frame_to_sse_bytes(frame, &base_url)
    });

    tracing::info!(
        service_name = SERVICE_NAME,
        service_version = SERVICE_VERSION,
        event_name = "stream.replay.created",
        topic = %decode_subject_for_display(&topic),
        from_sequence = ?from_sequence,
        from_date = ?from_date,
        batch_size = watch_config.replay_batch_size,
        "Created replay-only SSE stream"
    );

    Ok(create_sse_response(byte_stream, sse_guard))
}