use axum::extract::{Query, State};
use axum::http::HeaderMap;
use axum::Json;
use futures_util::StreamExt;
use serde::{Deserialize, Serialize};
use tracing::{debug, warn};
use crate::auth::require_bearer;
use crate::error::AppError;
use crate::jetstream::{create_ephemeral_consumer, deliver_policy_for};
use crate::state::AppState;
const DEFAULT_LIMIT: usize = 100;
const MAX_LIMIT: usize = 1000;
const FETCH_TIMEOUT_MS: u64 = 250;
const NATS_REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1500);
#[derive(Debug, Deserialize, Default)]
pub struct EventsQuery {
#[serde(default)]
pub since: Option<u64>,
#[serde(default)]
pub limit: Option<usize>,
}
#[derive(Debug, Serialize)]
pub struct EventEnvelope {
pub seq: u64,
pub event: serde_json::Value,
}
#[derive(Debug, Serialize)]
pub struct EventsResponse {
pub events: Vec<EventEnvelope>,
pub cursor: u64,
}
pub async fn list_events(
State(state): State<AppState>,
headers: HeaderMap,
Query(query): Query<EventsQuery>,
) -> Result<Json<EventsResponse>, AppError> {
require_bearer(&headers, &state.api_token)?;
let since = query.since.unwrap_or(0);
let limit = clamp_limit(query.limit);
let Some(ctx) = state.jetstream.clone() else {
debug!("GET /v1/events: no JetStream context configured; returning empty snapshot");
return Ok(Json(EventsResponse {
events: Vec::new(),
cursor: state.cursor().max(since),
}));
};
let mut stream = match tokio::time::timeout(
NATS_REQUEST_TIMEOUT,
ctx.get_stream(crate::jetstream::STREAM_NAME),
)
.await
{
Ok(Ok(s)) => s,
Ok(Err(e)) => {
warn!(
stream = crate::jetstream::STREAM_NAME,
error = %e,
stage = "get_stream",
"GET /v1/events: JetStream request failed; returning 503",
);
return Err(AppError::service_unavailable());
}
Err(_elapsed) => {
warn!(
stream = crate::jetstream::STREAM_NAME,
timeout_ms = NATS_REQUEST_TIMEOUT.as_millis() as u64,
stage = "get_stream",
"GET /v1/events: JetStream get_stream timed out; returning 503",
);
return Err(AppError::service_unavailable());
}
};
let policy = if since == 0 {
deliver_policy_for(None)
} else {
deliver_policy_for(Some(since))
};
let consumer = match tokio::time::timeout(
NATS_REQUEST_TIMEOUT,
create_ephemeral_consumer(&stream, policy, None),
)
.await
{
Ok(Ok(c)) => c,
Ok(Err(e)) => {
warn!(
stream = crate::jetstream::STREAM_NAME,
error = %format!("{e:#}"),
stage = "create_ephemeral_consumer",
"GET /v1/events: JetStream consumer create failed; returning 503",
);
return Err(AppError::service_unavailable());
}
Err(_elapsed) => {
warn!(
stream = crate::jetstream::STREAM_NAME,
timeout_ms = NATS_REQUEST_TIMEOUT.as_millis() as u64,
stage = "create_ephemeral_consumer",
"GET /v1/events: JetStream consumer create timed out; returning 503",
);
return Err(AppError::service_unavailable());
}
};
let mut batch = match tokio::time::timeout(
NATS_REQUEST_TIMEOUT,
consumer
.fetch()
.max_messages(limit)
.expires(std::time::Duration::from_millis(FETCH_TIMEOUT_MS))
.messages(),
)
.await
{
Ok(Ok(b)) => b,
Ok(Err(e)) => {
warn!(
stream = crate::jetstream::STREAM_NAME,
error = %format!("{e:#}"),
stage = "fetch_batch",
"GET /v1/events: JetStream batch fetch failed; returning 503",
);
return Err(AppError::service_unavailable());
}
Err(_elapsed) => {
warn!(
stream = crate::jetstream::STREAM_NAME,
timeout_ms = NATS_REQUEST_TIMEOUT.as_millis() as u64,
stage = "fetch_batch",
"GET /v1/events: JetStream batch fetch timed out; returning 503",
);
return Err(AppError::service_unavailable());
}
};
let mut envelopes: Vec<EventEnvelope> = Vec::with_capacity(limit.min(64));
let mut highest: u64 = since;
while let Some(msg) = batch.next().await {
let msg = match msg {
Ok(m) => m,
Err(e) => {
warn!(error = %e, "GET /v1/events: batch read error; ending page");
break;
}
};
let seq = match msg.info() {
Ok(info) => info.stream_sequence,
Err(e) => {
warn!(error = %e, "GET /v1/events: message missing stream info; skipping");
continue;
}
};
let event_value = match serde_json::from_slice::<serde_json::Value>(&msg.payload) {
Ok(v) => v,
Err(e) => {
warn!(seq, error = %e, "GET /v1/events: payload not JSON; skipping");
continue;
}
};
if seq > highest {
highest = seq;
}
envelopes.push(EventEnvelope {
seq,
event: event_value,
});
if envelopes.len() >= limit {
break;
}
}
let tail: Option<u64> = match tokio::time::timeout(NATS_REQUEST_TIMEOUT, stream.info()).await {
Ok(Ok(info)) => Some(info.state.last_sequence),
Ok(Err(e)) => {
debug!(
stream = crate::jetstream::STREAM_NAME,
error = %e,
stage = "stream_info",
"GET /v1/events: stream.info() failed; cursor will not be clamped",
);
None
}
Err(_elapsed) => {
debug!(
stream = crate::jetstream::STREAM_NAME,
timeout_ms = NATS_REQUEST_TIMEOUT.as_millis() as u64,
stage = "stream_info",
"GET /v1/events: stream.info() timed out; cursor will not be clamped",
);
None
}
};
Ok(Json(EventsResponse {
events: envelopes,
cursor: clamp_cursor(highest, tail),
}))
}
pub(crate) fn clamp_cursor(supplied: u64, tail: Option<u64>) -> u64 {
match tail {
Some(t) => supplied.min(t),
None => supplied,
}
}
pub(crate) fn clamp_limit(requested: Option<usize>) -> usize {
match requested {
None => DEFAULT_LIMIT,
Some(0) => DEFAULT_LIMIT, Some(n) if n > MAX_LIMIT => MAX_LIMIT,
Some(n) => n,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn clamp_limit_default_when_unset() {
assert_eq!(clamp_limit(None), DEFAULT_LIMIT);
}
#[test]
fn clamp_limit_default_when_zero() {
assert_eq!(clamp_limit(Some(0)), DEFAULT_LIMIT);
}
#[test]
fn clamp_limit_caps_at_max() {
assert_eq!(clamp_limit(Some(MAX_LIMIT + 1)), MAX_LIMIT);
assert_eq!(clamp_limit(Some(usize::MAX)), MAX_LIMIT);
}
#[test]
fn clamp_limit_passes_through_in_range() {
assert_eq!(clamp_limit(Some(1)), 1);
assert_eq!(clamp_limit(Some(50)), 50);
assert_eq!(clamp_limit(Some(MAX_LIMIT)), MAX_LIMIT);
}
#[test]
fn clamp_cursor_clamps_future_cursor_to_stream_tail() {
assert_eq!(clamp_cursor(1_000_000, Some(42)), 42);
assert_eq!(clamp_cursor(43, Some(42)), 42);
}
#[test]
fn clamp_cursor_passes_through_in_range() {
assert_eq!(clamp_cursor(10, Some(42)), 10);
assert_eq!(clamp_cursor(42, Some(42)), 42);
assert_eq!(clamp_cursor(0, Some(42)), 0);
}
#[test]
fn clamp_cursor_no_op_when_tail_unknown() {
assert_eq!(clamp_cursor(0, None), 0);
assert_eq!(clamp_cursor(42, None), 42);
assert_eq!(clamp_cursor(1_000_000, None), 1_000_000);
}
}