use super::state::StateProvider;
use axum::http::{Method, StatusCode};
use chrono::{TimeDelta, NaiveDateTime, SubsecRound as _, Utc};
use core::time::Duration;
use serde::{Serialize, Serializer};
use smart_default::SmartDefault;
use std::{
collections::VecDeque,
sync::Arc,
};
use tokio::{
select,
spawn,
sync::broadcast,
time::{interval, sleep},
};
use tracing::error;
#[cfg(feature = "utoipa")]
use utoipa::ToSchema;
/// An endpoint, identified by a request path together with an HTTP method.
///
/// The type is hashable and comparable for equality. It serialises as a
/// single string rather than as a structure (see its manual `Serialize`
/// implementation).
#[derive(Clone, Debug, Eq, Hash, PartialEq, SmartDefault)]
#[expect(clippy::exhaustive_structs, reason = "Exhaustive")]
pub struct Endpoint {
/// Path component of the endpoint.
pub path: String,
/// HTTP method of the endpoint.
pub method: Method,
}
/// Serialises an [`Endpoint`] as the single string `"<method> <path>"`.
impl Serialize for Endpoint {
    /// Emits the method and the path, separated by one space, as a string.
    ///
    /// # Errors
    ///
    /// Returns whatever error the supplied serializer reports.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // `collect_str` hands the `Display` output to the serializer directly,
        // which avoids building an intermediate `String` with `format!` for
        // serializers that can consume formatted output as it is produced.
        serializer.collect_str(&format_args!("{} {}", self.method, self.path))
    }
}
/// Aggregated statistics (average, extremes, and sample count) for a period.
///
/// A default-constructed value has zeroed numeric fields and a start time of
/// "now" in UTC.
#[derive(Clone, Copy, Debug, PartialEq, Serialize, SmartDefault)]
#[cfg_attr(feature = "utoipa", derive(ToSchema))]
#[non_exhaustive]
pub struct StatsForPeriod {
/// Start of the period; defaults to the current UTC time at construction.
#[default(Utc::now().naive_utc())]
pub started_at: NaiveDateTime,
/// Mean of the values recorded in the period.
pub average: f64,
/// Largest value recorded in the period.
pub maximum: u64,
/// Smallest value recorded in the period.
pub minimum: u64,
/// Number of values recorded in the period.
pub count: u64,
}
impl StatsForPeriod {
    /// Builds statistics that describe exactly one sample.
    ///
    /// The sample `value` is used as the average, the maximum, and the
    /// minimum, and the count is set to one. Every other field takes its
    /// default value.
    #[must_use]
    pub fn initialize(value: u64) -> Self {
        #[expect(clippy::cast_precision_loss, reason = "Not expected to get anywhere near 52 bits")]
        let average = value as f64;
        Self {
            average,
            maximum: value,
            minimum: value,
            count: 1,
            ..Self::default()
        }
    }

    /// Merges another set of statistics into this one.
    ///
    /// The minimum is replaced when this set is still empty, or when `stats`
    /// holds samples and has a lower minimum. The maximum becomes the larger
    /// of the two maxima. The counts are added (saturating), and the average
    /// is recomputed as a count-weighted mean whenever `stats` contributes at
    /// least one sample.
    pub fn update(&mut self, stats: &Self) {
        let currently_empty = self.count == 0;
        let incoming_is_lower = stats.count > 0 && stats.minimum < self.minimum;
        if currently_empty || incoming_is_lower {
            self.minimum = stats.minimum;
        }

        self.maximum = self.maximum.max(stats.maximum);

        // The weighting below relies on the combined count, so the count has
        // to be brought up to date before the average is touched.
        self.count = self.count.saturating_add(stats.count);
        if stats.count > 0 && self.count > 0 {
            #[expect(clippy::cast_precision_loss, reason = "Not expected to get anywhere near 52 bits")]
            let weight = stats.count as f64 / self.count as f64;
            self.average = self.average.mul_add(1.0 - weight, stats.average * weight);
        }
    }
}
/// The three per-period statistics that are published together in one
/// message: response times, connections, and memory.
#[derive(Clone, Debug, Default, PartialEq, Serialize)]
#[cfg_attr(feature = "utoipa", derive(ToSchema))]
#[non_exhaustive]
pub struct AllStatsForPeriod {
/// Statistics on response times for the period.
pub times: StatsForPeriod,
/// Statistics on connection counts for the period.
pub connections: StatsForPeriod,
/// Statistics on memory figures for the period.
pub memory: StatsForPeriod,
}
/// Measurements captured for a single response, as queued for the statistics
/// worker.
#[derive(Clone, Debug, Eq, PartialEq, SmartDefault)]
#[non_exhaustive]
pub struct ResponseMetrics {
/// Endpoint the response belongs to.
pub endpoint: Endpoint,
/// Start time of the measurement; defaults to the current UTC time at
/// construction. The worker truncates it to whole seconds to decide which
/// second the sample falls in.
#[default(Utc::now().naive_utc())]
pub started_at: NaiveDateTime,
/// Time taken to produce the response.
/// NOTE(review): the unit is not visible in this file — confirm with the
/// code that records it.
pub time_taken: u64,
/// HTTP status code of the response.
pub status_code: StatusCode,
/// Connection figure sampled for this response.
/// NOTE(review): presumably the number of open connections — confirm.
pub connections: u64,
/// Memory figure sampled for this response.
/// NOTE(review): the unit is not visible in this file — confirm.
pub memory: u64,
}
/// Starts the statistics worker.
///
/// Returns immediately when statistics are disabled in the configuration.
/// Otherwise it creates the metrics queue and the broadcast channel, stores
/// their handles in the shared statistics state, reserves capacity in the
/// history buffers, and spawns a detached task. That task first sleeps until
/// the next whole second, then loops forever handling two kinds of event: a
/// one-second timer tick, and a metrics item arriving on the queue. The task
/// ends when the queue is disconnected.
pub async fn start<SP: StateProvider>(state: &Arc<SP>) {
    if !state.config().enabled {
        return;
    }

    let shared = Arc::clone(state);
    let (queue_tx, queue_rx) = flume::unbounded();
    let (broadcast_tx, broadcast_rx) = broadcast::channel(10);

    // Publish the channel handles through the shared state.
    let mut stats_state = shared.state().write().await;
    stats_state.queue = Some(queue_tx);
    stats_state.broadcaster = Some(broadcast_tx);
    stats_state.listener = Some(broadcast_rx);

    // Running accumulators for the second that is currently in progress.
    let mut current_second = Utc::now().naive_utc().trunc_subsecs(0);
    let mut timing_stats = StatsForPeriod::default();
    let mut conn_stats = StatsForPeriod::default();
    let mut memory_stats = StatsForPeriod::default();

    // Reserve the configured capacity up front; the inner scope releases the
    // buffer lock before the state guard is dropped.
    {
        let mut buffers = stats_state.data.buffers.write();
        buffers.responses.reserve(shared.config().timing_buffer_size);
        buffers.connections.reserve(shared.config().connection_buffer_size);
        buffers.memory.reserve(shared.config().memory_buffer_size);
    }
    drop(stats_state);

    let worker = async move {
        // Align the ticker with the start of the next whole second. A target
        // that has already passed results in a zero-length wait.
        let next_second = current_second
            .checked_add_signed(TimeDelta::seconds(1))
            .unwrap_or(current_second);
        let wait = next_second
            .signed_duration_since(Utc::now().naive_utc())
            .to_std()
            .unwrap_or(Duration::from_secs(0));
        sleep(wait).await;

        let mut ticker = interval(Duration::from_secs(1));
        loop {
            select! {
                _ = ticker.tick() => {
                    // Timer tick: no new sample, just give the processor the
                    // chance to roll over to a new second.
                    stats_processor(
                        &*shared,
                        None,
                        &mut timing_stats,
                        &mut conn_stats,
                        &mut memory_stats,
                        &mut current_second,
                    ).await;
                }
                incoming = queue_rx.recv_async() => {
                    let Ok(response_metrics) = incoming else {
                        error!("Channel has been disconnected, exiting thread.");
                        break;
                    };
                    stats_processor(
                        &*shared,
                        Some(response_metrics),
                        &mut timing_stats,
                        &mut conn_stats,
                        &mut memory_stats,
                        &mut current_second,
                    ).await;
                }
            }
        }
    };

    // The join handle is dropped on purpose: the task runs detached.
    drop(spawn(worker));
}
/// Processes one worker event: either a new set of response metrics, or a
/// timer tick (`metrics` is `None`).
///
/// When metrics are supplied they are merged into the three running
/// accumulators and into the cumulative totals held in the shared state
/// (status-code counts, overall timings, per-endpoint timings, connections,
/// and memory). The event's second is the metrics' start time truncated to
/// whole seconds, or the current UTC second for a tick.
///
/// When that second is later than `current_second`, the accumulators are
/// flushed into the history buffers (one entry per elapsed second), the
/// shared "last second" marker is set to the second just completed,
/// `current_second` advances, and one combined message is broadcast.
///
/// # Parameters
///
/// * `state`          - Provider of configuration and shared statistics state.
/// * `metrics`        - Metrics for one response, or `None` for a timer tick.
/// * `timing_stats`   - Running response-time statistics for the open second.
/// * `conn_stats`     - Running connection statistics for the open second.
/// * `memory_stats`   - Running memory statistics for the open second.
/// * `current_second` - The second currently being accumulated; updated on
///   rollover.
async fn stats_processor<SP: StateProvider>(
    state: &SP,
    metrics: Option<ResponseMetrics>,
    timing_stats: &mut StatsForPeriod,
    conn_stats: &mut StatsForPeriod,
    memory_stats: &mut StatsForPeriod,
    current_second: &mut NaiveDateTime,
) {
    /// Flushes `stats` into `buffer` once per elapsed second.
    ///
    /// The first iteration stores the accumulated statistics; the accumulator
    /// is then reset, so any further elapsed seconds store empty statistics.
    /// `update_message` is invoked on every iteration, which means the message
    /// ends up reflecting the last second that was written.
    ///
    /// The buffer is kept at no more than `buffer_size` entries, with the
    /// newest at the front. A `buffer_size` of zero stores nothing.
    fn update_buffer(
        buffer: &mut VecDeque<StatsForPeriod>,
        buffer_size: usize,
        stats: &mut StatsForPeriod,
        current_second: &NaiveDateTime,
        elapsed: i64,
        message: &mut AllStatsForPeriod,
        mut update_message: impl FnMut(&mut StatsForPeriod, &mut AllStatsForPeriod),
    ) {
        for i in 0..elapsed {
            stats.started_at = current_second
                .checked_add_signed(TimeDelta::seconds(i))
                .unwrap_or(*current_second);
            // Make room for exactly one new entry. Truncating (instead of
            // popping only when the length equals the limit) also recovers a
            // buffer that is already over the limit, and together with the
            // guard below stops a zero-sized buffer from growing without bound.
            buffer.truncate(buffer_size.saturating_sub(1));
            if buffer_size > 0 {
                buffer.push_front(*stats);
            }
            update_message(stats, message);
            *stats = StatsForPeriod::default();
        }
    }

    let new_second: NaiveDateTime;
    #[expect(clippy::shadow_reuse, reason = "Clear purpose")]
    if let Some(metrics) = metrics {
        // Single-sample statistics for each of the three measurements.
        let new_timing_stats = StatsForPeriod::initialize(metrics.time_taken);
        let new_conn_stats = StatsForPeriod::initialize(metrics.connections);
        let new_memory_stats = StatsForPeriod::initialize(metrics.memory);

        // Merge into the accumulators for the second in progress.
        timing_stats.update(&new_timing_stats);
        conn_stats.update(&new_conn_stats);
        memory_stats.update(&new_memory_stats);

        // Merge into the cumulative totals.
        let stats_state = state.state().read().await;
        let mut totals = stats_state.data.totals.lock();
        _ = totals
            .codes
            .entry(metrics.status_code)
            .and_modify(|e| *e = e.saturating_add(1))
            .or_insert(1);
        totals.times.update(&new_timing_stats);
        _ = totals
            .endpoints
            .entry(metrics.endpoint)
            .and_modify(|ep_stats| ep_stats.update(&new_timing_stats))
            .or_insert(new_timing_stats);
        totals.connections.update(&new_conn_stats);
        totals.memory.update(&new_memory_stats);
        drop(totals);
        drop(stats_state);

        new_second = metrics.started_at.trunc_subsecs(0);
    } else {
        new_second = Utc::now().naive_utc().trunc_subsecs(0);
    }

    // Roll over only when the event belongs to a later second.
    if new_second > *current_second {
        #[expect(clippy::arithmetic_side_effects, reason = "Nothing interesting can happen here")]
        let elapsed = (new_second - *current_second).num_seconds();
        let stats_state = state.state().read().await;
        let mut buffers = stats_state.data.buffers.write();
        let mut message = AllStatsForPeriod::default();
        update_buffer(
            &mut buffers.responses,
            state.config().timing_buffer_size,
            timing_stats,
            current_second,
            elapsed,
            &mut message,
            |stats, msg| { msg.times = *stats; },
        );
        update_buffer(
            &mut buffers.connections,
            state.config().connection_buffer_size,
            conn_stats,
            current_second,
            elapsed,
            &mut message,
            |stats, msg| { msg.connections = *stats; },
        );
        update_buffer(
            &mut buffers.memory,
            state.config().memory_buffer_size,
            memory_stats,
            current_second,
            elapsed,
            &mut message,
            |stats, msg| { msg.memory = *stats; },
        );
        drop(buffers);

        // Record the second that has just been completed, then advance.
        *stats_state.data.last_second.write() = *current_second;
        *current_second = new_second;
        if let Some(ref broadcaster) = stats_state.broadcaster {
            drop(broadcaster.send(message).inspect_err(|err| error!("Failed to broadcast stats: {err}")));
        }
        drop(stats_state);
    }
}