use serde::{Deserialize, Serialize};
use serde_json::json;
use sqlx::Row;
use std::collections::HashMap;
use crate::AppState;
const LAST_24H_CACHE_KEY: &str = "metrics:last_24h";
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub(super) struct Last24hMetrics {
pub(super) requests_last_24h: i64,
pub(super) management_mutations_last_24h: i64,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct Last24hMetricsCacheEntry {
cached_at_epoch_seconds: i64,
metrics: Last24hMetrics,
}
#[derive(Debug, Clone, Default)]
pub(super) struct DeferredQueueMetrics {
pub(super) storage_up: bool,
pub(super) total: u64,
pub(super) by_status: HashMap<String, u64>,
pub(super) oldest_age_seconds: f64,
}
pub(super) async fn last_24h_metrics(app_state: &AppState) -> Last24hMetrics {
if let Some(cached) = app_state.cache.get(LAST_24H_CACHE_KEY).await
&& let Ok(value) = serde_json::from_value::<Last24hMetricsCacheEntry>(cached)
&& epoch_seconds() - value.cached_at_epoch_seconds <= 30
{
return value.metrics;
}
let Some(logging_client_name) = app_state.logging_client_name.as_ref() else {
return Last24hMetrics::default();
};
let Some(pool) = app_state.pg_registry.get_pool(logging_client_name) else {
return Last24hMetrics::default();
};
let requests_last_24h: i64 = sqlx::query(
r#"
SELECT COUNT(*) AS total
FROM gateway_request_log
WHERE to_timestamp(time) >= now() - interval '24 hours'
"#,
)
.fetch_one(&pool)
.await
.ok()
.and_then(|row| row.try_get::<i64, _>("total").ok())
.unwrap_or_default();
let management_mutations_last_24h = sqlx::query(
r#"
SELECT COUNT(*) AS total
FROM gateway_operation_log
WHERE path LIKE '/management/%'
AND to_timestamp(time) >= now() - interval '24 hours'
"#,
)
.fetch_one(&pool)
.await
.ok()
.and_then(|row| row.try_get::<i64, _>("total").ok())
.unwrap_or_default();
let value: Last24hMetrics = Last24hMetrics {
requests_last_24h,
management_mutations_last_24h,
};
app_state
.cache
.insert(
LAST_24H_CACHE_KEY.to_string(),
json!(Last24hMetricsCacheEntry {
cached_at_epoch_seconds: epoch_seconds(),
metrics: value.clone(),
}),
)
.await;
value
}
pub(super) async fn deferred_queue_metrics(app_state: &AppState) -> DeferredQueueMetrics {
let Some(logging_client_name) = app_state.logging_client_name.as_ref() else {
return DeferredQueueMetrics::default();
};
let Some(pool) = app_state.pg_registry.get_pool(logging_client_name) else {
return DeferredQueueMetrics::default();
};
let mut metrics: DeferredQueueMetrics = DeferredQueueMetrics {
storage_up: true,
..DeferredQueueMetrics::default()
};
let rows: Result<Vec<sqlx::postgres::PgRow>, sqlx::Error> = sqlx::query(
r#"
SELECT status, COUNT(*)::bigint AS total
FROM public.gateway_deferred_request_queue
GROUP BY status
"#,
)
.fetch_all(&pool)
.await;
let rows: Vec<sqlx::postgres::PgRow> = match rows {
Ok(rows) => rows,
Err(_) => {
metrics.storage_up = false;
return metrics;
}
};
for row in rows {
let status: String = row
.try_get("status")
.unwrap_or_else(|_| "unknown".to_string());
let count: i64 = row.try_get("total").unwrap_or_default();
let count: u64 = count.max(0) as u64;
metrics.total = metrics.total.saturating_add(count);
metrics.by_status.insert(status, count);
}
metrics.oldest_age_seconds = sqlx::query(
r#"
SELECT COALESCE(EXTRACT(EPOCH FROM (now() - MIN(created_at))), 0) AS oldest_age_seconds
FROM public.gateway_deferred_request_queue
WHERE status IN ('queued', 'running')
"#,
)
.fetch_one(&pool)
.await
.ok()
.and_then(|row| row.try_get::<f64, _>("oldest_age_seconds").ok())
.unwrap_or(0.0)
.max(0.0);
metrics
}
fn epoch_seconds() -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i64
}