use axum::extract::{Query, State};
use axum::http::HeaderMap;
use axum::Json;
use futures_util::StreamExt;
use serde::{Deserialize, Serialize};
use tracing::{debug, warn};
use crate::auth::require_bearer;
use crate::error::AppError;
use crate::jetstream::{create_ephemeral_consumer, deliver_policy_for};
use crate::state::AppState;
const DEFAULT_LIMIT: usize = 100;
const MAX_LIMIT: usize = 1000;
const FETCH_TIMEOUT_MS: u64 = 250;
const NATS_REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1500);
#[derive(Debug, Deserialize, Default)]
pub struct EventsQuery {
#[serde(default)]
pub since: Option<u64>,
#[serde(default)]
pub limit: Option<usize>,
}
#[derive(Debug, Serialize)]
pub struct EventEnvelope {
pub seq: u64,
pub event: serde_json::Value,
}
#[derive(Debug, Serialize)]
pub struct EventsResponse {
pub events: Vec<EventEnvelope>,
pub cursor: u64,
}
pub async fn list_events(
State(state): State<AppState>,
headers: HeaderMap,
Query(query): Query<EventsQuery>,
) -> Result<Json<EventsResponse>, AppError> {
require_bearer(&headers, &state.api_token)?;
let since = query.since.unwrap_or(0);
let limit = clamp_limit(query.limit);
let Some(ctx) = state.jetstream.clone() else {
debug!("GET /v1/events: no JetStream context configured; returning empty snapshot");
return Ok(Json(EventsResponse {
events: Vec::new(),
cursor: state.cursor().max(since),
}));
};
let stream = match tokio::time::timeout(
NATS_REQUEST_TIMEOUT,
ctx.get_stream(crate::jetstream::STREAM_NAME),
)
.await
{
Ok(Ok(s)) => s,
Ok(Err(e)) => {
warn!(
stream = crate::jetstream::STREAM_NAME,
error = %e,
stage = "get_stream",
"GET /v1/events: JetStream request failed; returning 503",
);
return Err(AppError::service_unavailable());
}
Err(_elapsed) => {
warn!(
stream = crate::jetstream::STREAM_NAME,
timeout_ms = NATS_REQUEST_TIMEOUT.as_millis() as u64,
stage = "get_stream",
"GET /v1/events: JetStream get_stream timed out; returning 503",
);
return Err(AppError::service_unavailable());
}
};
let policy = if since == 0 {
deliver_policy_for(None)
} else {
deliver_policy_for(Some(since))
};
let consumer = match tokio::time::timeout(
NATS_REQUEST_TIMEOUT,
create_ephemeral_consumer(&stream, policy, None),
)
.await
{
Ok(Ok(c)) => c,
Ok(Err(e)) => {
warn!(
stream = crate::jetstream::STREAM_NAME,
error = %format!("{e:#}"),
stage = "create_ephemeral_consumer",
"GET /v1/events: JetStream consumer create failed; returning 503",
);
return Err(AppError::service_unavailable());
}
Err(_elapsed) => {
warn!(
stream = crate::jetstream::STREAM_NAME,
timeout_ms = NATS_REQUEST_TIMEOUT.as_millis() as u64,
stage = "create_ephemeral_consumer",
"GET /v1/events: JetStream consumer create timed out; returning 503",
);
return Err(AppError::service_unavailable());
}
};
let mut batch = match tokio::time::timeout(
NATS_REQUEST_TIMEOUT,
consumer
.fetch()
.max_messages(limit)
.expires(std::time::Duration::from_millis(FETCH_TIMEOUT_MS))
.messages(),
)
.await
{
Ok(Ok(b)) => b,
Ok(Err(e)) => {
warn!(
stream = crate::jetstream::STREAM_NAME,
error = %format!("{e:#}"),
stage = "fetch_batch",
"GET /v1/events: JetStream batch fetch failed; returning 503",
);
return Err(AppError::service_unavailable());
}
Err(_elapsed) => {
warn!(
stream = crate::jetstream::STREAM_NAME,
timeout_ms = NATS_REQUEST_TIMEOUT.as_millis() as u64,
stage = "fetch_batch",
"GET /v1/events: JetStream batch fetch timed out; returning 503",
);
return Err(AppError::service_unavailable());
}
};
let mut envelopes: Vec<EventEnvelope> = Vec::with_capacity(limit.min(64));
let mut highest: u64 = since;
while let Some(msg) = batch.next().await {
let msg = match msg {
Ok(m) => m,
Err(e) => {
warn!(error = %e, "GET /v1/events: batch read error; ending page");
break;
}
};
let seq = match msg.info() {
Ok(info) => info.stream_sequence,
Err(e) => {
warn!(error = %e, "GET /v1/events: message missing stream info; skipping");
continue;
}
};
let event_value = match serde_json::from_slice::<serde_json::Value>(&msg.payload) {
Ok(v) => v,
Err(e) => {
warn!(seq, error = %e, "GET /v1/events: payload not JSON; skipping");
continue;
}
};
if seq > highest {
highest = seq;
}
envelopes.push(EventEnvelope {
seq,
event: event_value,
});
if envelopes.len() >= limit {
break;
}
}
Ok(Json(EventsResponse {
events: envelopes,
cursor: highest,
}))
}
pub(crate) fn clamp_limit(requested: Option<usize>) -> usize {
match requested {
None => DEFAULT_LIMIT,
Some(0) => DEFAULT_LIMIT, Some(n) if n > MAX_LIMIT => MAX_LIMIT,
Some(n) => n,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn clamp_limit_default_when_unset() {
assert_eq!(clamp_limit(None), DEFAULT_LIMIT);
}
#[test]
fn clamp_limit_default_when_zero() {
assert_eq!(clamp_limit(Some(0)), DEFAULT_LIMIT);
}
#[test]
fn clamp_limit_caps_at_max() {
assert_eq!(clamp_limit(Some(MAX_LIMIT + 1)), MAX_LIMIT);
assert_eq!(clamp_limit(Some(usize::MAX)), MAX_LIMIT);
}
#[test]
fn clamp_limit_passes_through_in_range() {
assert_eq!(clamp_limit(Some(1)), 1);
assert_eq!(clamp_limit(Some(50)), 50);
assert_eq!(clamp_limit(Some(MAX_LIMIT)), MAX_LIMIT);
}
}