#![allow(unused_qualifications, reason = "False positive")]
#![allow(clippy::exhaustive_structs, reason = "Handlers have auto-generated OpenAPI documentation")]
#![allow(clippy::unused_async, reason = "Handler functions need to be async")]
#[cfg(test)]
#[path = "tests/handlers.rs"]
mod tests;
use super::{
requests::{GetStatsFeedParams, GetStatsHistoryParams, MeasurementType},
responses::{StatsHistoryResponse, StatsResponse, StatsResponseForPeriod},
state::StateProvider,
worker::StatsForPeriod,
};
use axum::{
Json,
extract::{Query, State},
extract::ws::{Message, WebSocketUpgrade, WebSocket},
response::Response,
};
use chrono::{NaiveDateTime, SubsecRound as _, Utc};
use core::{
sync::atomic::Ordering,
time::Duration,
};
use indexmap::IndexMap;
use itertools::Itertools as _;
use rubedo::{
std::IteratorExt as _,
sugar::s,
};
use serde_json::json;
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
time::Instant,
};
use tokio::{
select,
time::interval,
};
use tracing::{info, warn};
use velcro::btree_map;
#[cfg_attr(feature = "utoipa", utoipa::path(
get,
path = "/api/stats",
tag = "health",
responses(
(status = 200, description = "Application statistics overview", body = StatsResponse),
)
))]
pub async fn get_stats<SP: StateProvider>(
State(state): State<Arc<SP>>,
) -> Json<StatsResponse> {
fn initialize_map(
periods: &HashMap<String, usize>,
buffer: &VecDeque<StatsForPeriod>,
) -> IndexMap<String, StatsForPeriod> {
let mut output: IndexMap<String, StatsForPeriod> = periods
.iter()
.sorted_by_key(|p| p.1)
.map(|(name, _)| (name.clone(), StatsForPeriod::default()))
.collect()
;
for (i, stats) in buffer.iter().enumerate() {
for (name, stats_for_period) in &mut output {
if i < periods[name] {
stats_for_period.update(stats);
}
}
}
output
}
fn convert_map(
input: IndexMap<String, StatsForPeriod>,
all: &StatsForPeriod
) -> IndexMap<String, StatsResponseForPeriod> {
let mut output: IndexMap<String, StatsResponseForPeriod> = input
.into_iter()
.map(|(key, value)| (key, StatsResponseForPeriod::from(&value)))
.collect()
;
_ = output.insert(s!("all"), StatsResponseForPeriod::from(all));
output
}
let stats_state = state.state().read().await;
let buffers = stats_state.data.buffers.read();
let timing_input = initialize_map(&state.config().periods, &buffers.responses);
let conn_input = initialize_map(&state.config().periods, &buffers.connections);
let memory_input = initialize_map(&state.config().periods, &buffers.memory);
drop(buffers);
let totals = stats_state.data.totals.lock();
let timing_output = convert_map(timing_input, &totals.times);
let conn_output = convert_map(conn_input, &totals.connections);
let memory_output = convert_map(memory_input, &totals.memory);
let now = Utc::now().naive_utc();
#[expect(clippy::arithmetic_side_effects, reason = "Nothing interesting can happen here")]
#[expect(clippy::cast_sign_loss, reason = "We don't ever want a negative for uptime")]
let response = Json(StatsResponse {
started_at: stats_state.data.started_at.trunc_subsecs(0),
last_second: *stats_state.data.last_second.read(),
uptime: (now - stats_state.data.started_at).num_seconds() as u64,
active: stats_state.data.connections.load(Ordering::Relaxed) as u64,
requests: stats_state.data.requests.load(Ordering::Relaxed) as u64,
codes: totals.codes.clone(),
times: timing_output,
endpoints: totals.endpoints.iter()
.map(|(key, value)| (key.clone(), StatsResponseForPeriod::from(value)))
.collect()
,
connections: conn_output,
memory: memory_output,
});
drop(totals);
drop(stats_state);
response
}
#[cfg_attr(feature = "utoipa", utoipa::path(
get,
path = "/api/stats/history",
tag = "health",
params(
GetStatsHistoryParams,
),
responses(
(status = 200, description = "Historical application statistics interval data", body = StatsHistoryResponse),
)
))]
pub async fn get_stats_history<SP: StateProvider>(
State(state): State<Arc<SP>>,
Query(params): Query<GetStatsHistoryParams>,
) -> Json<StatsHistoryResponse> {
fn process_buffer(
buffer: &VecDeque<StatsForPeriod>,
from: Option<NaiveDateTime>,
limit: Option<usize>,
) -> Vec<StatsResponseForPeriod> {
buffer.iter()
.take_while(|entry| from.is_none_or(|time| entry.started_at >= time))
.limit(limit)
.map(StatsResponseForPeriod::from)
.collect()
}
let stats_state = state.state().read().await;
let buffers = stats_state.data.buffers.read();
let mut response = StatsHistoryResponse {
last_second: *stats_state.data.last_second.read(),
..Default::default()
};
match params.buffer {
Some(MeasurementType::Times) => {
response.times = process_buffer(&buffers.responses, params.from, params.limit);
},
Some(MeasurementType::Connections) => {
response.connections = process_buffer(&buffers.connections, params.from, params.limit);
},
Some(MeasurementType::Memory) => {
response.memory = process_buffer(&buffers.memory, params.from, params.limit);
},
None => {
response.times = process_buffer(&buffers.responses, params.from, params.limit);
response.connections = process_buffer(&buffers.connections, params.from, params.limit);
response.memory = process_buffer(&buffers.memory, params.from, params.limit);
},
}
drop(buffers);
drop(stats_state);
Json(response)
}
#[cfg_attr(feature = "utoipa", utoipa::path(
get,
path = "/api/stats/feed",
tag = "health",
params(
GetStatsFeedParams,
),
responses(
(status = 200, description = "Application statistics event feed"),
),
))]
pub async fn get_stats_feed<SP: StateProvider>(
State(state): State<Arc<SP>>,
Query(params): Query<GetStatsFeedParams>,
ws_req: WebSocketUpgrade,
) -> Response {
ws_req.on_upgrade(move |socket| ws_stats_feed(Arc::clone(&state), socket, params.r#type))
}
#[expect(clippy::similar_names, reason = "Clearly different")]
pub async fn ws_stats_feed<SP: StateProvider>(
state: Arc<SP>,
mut ws: WebSocket,
scope: Option<MeasurementType>,
) {
info!("WebSocket connection established");
let mut rx = if let Some(ref broadcaster) = state.state().read().await.broadcaster {
broadcaster.subscribe()
} else {
warn!("Broadcast channel not available");
return;
};
let mut timer = interval(Duration::from_secs(state.config().ws_ping_interval as u64));
let mut timeout = interval(Duration::from_secs(state.config().ws_ping_timeout as u64));
let mut last_ping = None;
let mut last_pong = Instant::now();
#[expect(clippy::pattern_type_mismatch, reason = "Tokio code")]
loop { select! {
_ = timer.tick() => {
if let Err(err) = ws.send(Message::Ping(Vec::new().into())).await {
warn!("Failed to send ping over WebSocket: {err}");
break;
}
last_ping = Some(Instant::now());
},
_ = timeout.tick() => {
if let Some(ping_time) = last_ping {
let limit = Duration::from_secs(state.config().ws_ping_timeout as u64);
if last_pong < ping_time && ping_time.elapsed() > limit {
warn!("WebSocket ping timed out");
break;
}
}
},
Some(msg) = ws.recv() => {
match msg {
Ok(Message::Ping(ping)) => {
if let Err(err) = ws.send(Message::Pong(ping)).await {
warn!("Failed to send pong over WebSocket: {err}");
break;
}
}
Ok(Message::Pong(_)) => {
last_pong = Instant::now();
}
Ok(Message::Close(_)) => {
info!("WebSocket connection closed");
break;
}
Ok(Message::Text(_)) => {
warn!("Unexpected WebSocket text message");
}
Ok(Message::Binary(_)) => {
warn!("Unexpected WebSocket binary message");
}
Err(err) => {
warn!("WebSocket error: {err}");
break;
}
#[expect(unreachable_patterns, reason = "Future-proofing")]
_ => {
warn!("Unknown WebSocket message type");
}
}
}
Ok(data) = rx.recv() => {
let response = match scope {
Some(MeasurementType::Times) => {
json!{StatsResponseForPeriod::from(&data.times)}
},
Some(MeasurementType::Connections) => {
json!{StatsResponseForPeriod::from(&data.connections)}
},
Some(MeasurementType::Memory) => {
json!{StatsResponseForPeriod::from(&data.memory)}
},
None => {
json!{btree_map!{
"times": StatsResponseForPeriod::from(&data.times),
"connections": StatsResponseForPeriod::from(&data.connections),
"memory": StatsResponseForPeriod::from(&data.memory),
}}
},
};
if let Err(err) = ws.send(Message::Text(response.to_string().into())).await {
warn!("Failed to send data over WebSocket: {err}");
break;
}
}
}}
}