use anyhow::{Context, Result};
use tokio_stream::StreamExt;
use tracing::{debug, info, warn};
use crate::configuration::Settings;
use crate::notification::decode_subject_for_display;
use crate::notification::topic_parser::derive_stream_name_from_topic;
use crate::notification::wildcard_matcher::{analyze_watch_pattern, matches_watch_pattern};
use crate::notification_backend::jetstream::{
backend::JetStreamBackend, subscriber_utils::transform_jetstream_message,
};
use crate::notification_backend::replay::{BatchParams, StartAt};
use crate::telemetry::{SERVICE_NAME, SERVICE_VERSION};
use crate::types::{BatchResult, ReplayLimitInfo};
pub async fn get_messages_batch(
backend: &JetStreamBackend,
params: BatchParams,
) -> Result<BatchResult> {
let (backend_pattern, app_filter_pattern) = analyze_watch_pattern(¶ms.topic)?;
debug!(
topic = %params.topic,
backend_pattern = %backend_pattern,
start_at = ?params.start_at,
limit = params.limit,
"Starting JetStream batch retrieval with deterministic approach"
);
backend
.ensure_stream_for_topic(&backend_pattern)
.await
.context("Failed to ensure stream exists for batch retrieval")?;
let stream_name = derive_stream_name_from_topic(&backend_pattern)
.context("Failed to derive stream name from topic")?;
let mut stream = backend
.jetstream
.get_stream(&stream_name)
.await
.context("Failed to get stream")?;
let stream_info = stream.info().await.context("Failed to get stream info")?;
debug!(
stream_name = %stream_name,
total_messages = stream_info.state.messages,
first_sequence = stream_info.state.first_sequence,
last_sequence = stream_info.state.last_sequence,
"Stream info retrieved"
);
if stream_info.state.messages == 0 {
debug!("No messages available in stream");
return Ok(BatchResult::empty());
}
let consumer =
create_pull_consumer(backend, &stream_name, &backend_pattern, params.start_at).await?;
let mut messages = consumer
.fetch()
.max_messages(params.limit)
.messages()
.await
.context("Failed to fetch messages")?;
let mut filtered_messages = Vec::new();
let mut last_processed_sequence = None;
while let Some(msg_result) = messages.next().await {
match msg_result {
Ok(msg) => {
if let Ok(info) = msg.info() {
last_processed_sequence = Some(info.stream_sequence);
}
match transform_jetstream_message(&msg) {
Ok(notification) => {
if matches_watch_pattern(¬ification.topic, &app_filter_pattern) {
filtered_messages.push(notification);
}
}
Err(e) => {
warn!(
service_name = SERVICE_NAME,
service_version = SERVICE_VERSION,
event_name = "backend.jetstream.replay.message_transform.failed",
error = %e,
subject = %msg.subject,
"Failed to transform message"
);
}
}
}
Err(e) => {
debug!(error = %e, "Message error during batch retrieval");
}
}
if filtered_messages.len() >= params.limit {
break;
}
}
let has_more = if let Some(last_seq) = last_processed_sequence {
if filtered_messages.len() < params.limit {
last_seq < stream_info.state.last_sequence
} else {
true
}
} else {
false
};
debug!(
retrieved_count = filtered_messages.len(),
requested_limit = params.limit,
last_processed_sequence = ?last_processed_sequence,
stream_last_sequence = stream_info.state.last_sequence,
has_more = has_more,
"Batch processing completed"
);
let watch_config = Settings::get_global_watch_settings();
let effective_limit = params.limit.min(watch_config.max_historical_notifications);
let was_replay_limited = filtered_messages.len() > effective_limit;
if was_replay_limited {
warn!(
service_name = SERVICE_NAME,
service_version = SERVICE_VERSION,
event_name = "backend.jetstream.replay.limit.reached",
retrieved_messages = filtered_messages.len(),
max_allowed = effective_limit,
topic = %decode_subject_for_display(¶ms.topic),
"Replay message count limit is reached"
);
}
filtered_messages.truncate(effective_limit);
let mut batch_result = BatchResult::new(filtered_messages, params.limit);
batch_result.has_more = has_more && !was_replay_limited; batch_result.next_sequence = last_processed_sequence.map(|seq| seq + 1);
if was_replay_limited {
batch_result.replay_limit = Some(ReplayLimitInfo {
max_allowed: watch_config.max_historical_notifications,
});
}
info!(
service_name = SERVICE_NAME,
service_version = SERVICE_VERSION,
event_name = "backend.jetstream.replay.batch.succeeded",
topic = %decode_subject_for_display(¶ms.topic),
stream_name = %stream_name,
retrieved_count = batch_result.batch_size,
has_more = batch_result.has_more,
last_sequence = ?batch_result.last_sequence,
"JetStream batch retrieval completed using deterministic approach"
);
Ok(batch_result)
}
async fn create_pull_consumer(
backend: &JetStreamBackend,
stream_name: &str,
backend_pattern: &str,
start_at: StartAt,
) -> Result<async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::pull::Config>>
{
use async_nats::jetstream::consumer::{AckPolicy, ReplayPolicy};
let deliver_policy = determine_deliver_policy(start_at)?;
let consumer_config = async_nats::jetstream::consumer::pull::Config {
name: Some(format!(
"replay_consumer_{}_{}",
chrono::Utc::now().timestamp_millis(),
uuid::Uuid::new_v4().simple()
)),
durable_name: None, description: Some(format!("Replay consumer for pattern: {}", backend_pattern)),
filter_subject: backend_pattern.to_string(),
deliver_policy,
ack_policy: AckPolicy::None, replay_policy: ReplayPolicy::Instant, max_deliver: 1,
..Default::default()
};
debug!(
consumer_config = ?consumer_config,
"Creating pull consumer with configuration"
);
let consumer = backend
.jetstream
.create_consumer_on_stream(consumer_config, stream_name)
.await
.context("Failed to create JetStream consumer for batch retrieval")?;
info!(
service_name = SERVICE_NAME,
service_version = SERVICE_VERSION,
event_name = "backend.jetstream.replay.consumer.created",
stream_name = %stream_name,
backend_pattern = %backend_pattern,
deliver_policy = ?deliver_policy,
consumer_name = consumer.cached_info().name,
"Successfully created ephemeral pull consumer"
);
Ok(consumer)
}
fn determine_deliver_policy(
start_at: StartAt,
) -> Result<async_nats::jetstream::consumer::DeliverPolicy> {
use async_nats::jetstream::consumer::DeliverPolicy;
match start_at {
StartAt::Date(start_date) => {
let nanos = start_date
.timestamp_nanos_opt()
.context("from_date is outside supported timestamp range")?;
let start_time = time::OffsetDateTime::from_unix_timestamp_nanos(i128::from(nanos))
.context("from_date could not be converted to JetStream start time")?;
debug!(
start_time = ?start_time,
"Using ByStartTime delivery policy"
);
Ok(DeliverPolicy::ByStartTime { start_time })
}
StartAt::Sequence(seq) => match seq {
0 => {
debug!("Using DeliverPolicy::All for no replay start parameter");
Ok(DeliverPolicy::All)
}
_ => {
debug!(
start_sequence = seq,
"Using ByStartSequence delivery policy"
);
Ok(DeliverPolicy::ByStartSequence {
start_sequence: seq,
})
}
},
StartAt::LiveOnly => {
debug!("Using DeliverPolicy::All for no replay start parameter");
Ok(DeliverPolicy::All)
}
}
}
#[cfg(test)]
mod tests {
use super::determine_deliver_policy;
use crate::notification_backend::replay::StartAt;
use chrono::{DateTime, Utc};
#[test]
fn policy_prefers_sequence_when_both_sequence_and_date_present() {
let deliver_policy = determine_deliver_policy(StartAt::Sequence(42)).unwrap();
assert!(matches!(
deliver_policy,
async_nats::jetstream::consumer::DeliverPolicy::ByStartSequence { start_sequence: 42 }
));
}
#[test]
fn policy_uses_start_time_when_only_date_is_present() {
let boundary = DateTime::parse_from_rfc3339("2025-06-09T13:15:00Z")
.unwrap()
.with_timezone(&Utc);
let deliver_policy = determine_deliver_policy(StartAt::Date(boundary)).unwrap();
assert!(matches!(
deliver_policy,
async_nats::jetstream::consumer::DeliverPolicy::ByStartTime { .. }
));
}
#[test]
fn policy_uses_all_when_no_replay_parameters_are_present() {
let deliver_policy = determine_deliver_policy(StartAt::LiveOnly).unwrap();
assert!(matches!(
deliver_policy,
async_nats::jetstream::consumer::DeliverPolicy::All
));
}
}