use axum::{
Json, Router,
extract::{Path, Query, State},
http::StatusCode,
response::{Html, IntoResponse, Response},
routing::get,
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tower_http::{
cors::CorsLayer,
services::ServeDir,
trace::{DefaultMakeSpan, TraceLayer},
};
use crate::storage::{
CollectorRecord, CollectorSource, CollectorStore, CollectorType, Event, EventQuery,
EventReader, EventSource, MetricCategory, MetricQuery, MetricReader, SortOrder,
};
/// Shared state made available to the request handlers in this module.
///
/// `create_router` wraps the value in an `Arc` before attaching it to the router.
#[derive(Clone)]
pub struct AppState {
/// Reader queried by the metrics endpoints and by the readiness probe.
pub metric_reader: MetricReader,
/// Reader queried by the events endpoint.
pub event_reader: EventReader,
/// Store used by the collector listing and CRUD endpoints.
pub collector_store: CollectorStore,
}
/// JSON body returned by the `/healthz` and `/readyz` handlers.
#[derive(Serialize)]
struct HealthResponse {
/// Overall status; the handlers in this file emit `"ok"` or `"not_ready"`.
status: String,
/// Database probe outcome; left out of the serialized JSON when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
db: Option<String>,
}
/// Query-string parameters accepted by `metrics_handler` (`GET /api/metrics`).
///
/// Every field is optional.
#[derive(Debug, Deserialize)]
pub struct MetricsQueryParams {
/// Category filter; parsed into a `MetricCategory`, and dropped if parsing fails.
pub category: Option<String>,
/// Metric name filter; an empty string is treated as absent.
pub name: Option<String>,
/// Target filter; an empty string is treated as absent.
pub target: Option<String>,
/// Row limit, forwarded unchanged to the metric query.
pub limit: Option<u32>,
/// Sort order: `asc` or `desc` (case-insensitive); other values are ignored.
pub order: Option<String>,
/// Look-back window: one of `1h`, `6h`, `12h`, `24h`, `7d`, `30d`; other values are ignored.
pub range: Option<String>,
}
/// Query-string parameters accepted by `events_handler` (`GET /api/events`).
///
/// Every field is optional.
#[derive(Debug, Deserialize)]
pub struct EventsQueryParams {
/// Source filter; parsed into an `EventSource`, and dropped if parsing fails.
pub source: Option<String>,
/// Event kind filter; parsed by the handler, and dropped if parsing fails.
pub kind: Option<String>,
/// Severity filter; parsed by the handler, and dropped if parsing fails.
pub severity: Option<String>,
/// Row limit, forwarded unchanged to the event query.
pub limit: Option<u32>,
/// Sort order: `asc` or `desc` (case-insensitive); other values are ignored.
pub order: Option<String>,
/// Look-back window: one of `1h`, `6h`, `12h`, `24h`, `7d`, `30d`; other values are ignored.
pub range: Option<String>,
}
/// Query-string parameters accepted by `metrics_stats_handler` (`GET /api/metrics/stats`).
#[derive(Debug, Deserialize)]
pub struct StatsQueryParams {
/// Look-back window: one of `1h`, `6h`, `12h`, `24h`, `7d`, `30d`; other values are ignored.
pub range: Option<String>,
}
/// Translates the optional `order` query parameter into a `SortOrder`.
///
/// The comparison is case-insensitive: `"asc"` yields `SortOrder::Asc` and
/// `"desc"` yields `SortOrder::Desc`. A missing parameter or any other text
/// yields `None`.
fn parse_sort_order(s: Option<String>) -> Option<SortOrder> {
    let requested = s?.to_lowercase();
    match requested.as_str() {
        "asc" => Some(SortOrder::Asc),
        "desc" => Some(SortOrder::Desc),
        _ => None,
    }
}
/// Converts a look-back label into the UTC instant where that window starts.
///
/// Recognised labels are `1h`, `6h`, `12h`, `24h`, `7d` and `30d`; the result is
/// the current time minus that span. Returns `None` when no label is supplied,
/// when the label is not one of the above, or when the span cannot be built.
fn parse_range(range: Option<String>) -> Option<chrono::DateTime<chrono::Utc>> {
    use chrono::TimeDelta;

    let label = range?;
    let now = chrono::Utc::now();

    // Pick the span first; the subtraction is the same for every label.
    let window = match label.as_str() {
        "1h" => TimeDelta::try_hours(1),
        "6h" => TimeDelta::try_hours(6),
        "12h" => TimeDelta::try_hours(12),
        "24h" => TimeDelta::try_hours(24),
        "7d" => TimeDelta::try_days(7),
        "30d" => TimeDelta::try_days(30),
        _ => None,
    }?;

    Some(now - window)
}
use askama::Template;
/// Askama template for the dashboard page, rendered from `dashboard.html`.
///
/// It carries no data; `dashboard_handler` returns it as-is.
#[derive(Template)]
#[template(path = "dashboard.html")]
struct DashboardTemplate;
/// Askama template for the metrics partial, rendered from `partials/metrics.html`.
#[derive(Template)]
#[template(path = "partials/metrics.html")]
struct MetricsTemplate {
/// Rows returned by the metric reader query in `metrics_handler`.
metrics: Vec<crate::storage::MetricResult>,
}
/// Askama template for the events partial, rendered from `partials/events.html`.
#[derive(Template)]
#[template(path = "partials/events.html")]
struct EventsTemplate {
/// Rows returned by the event reader query in `events_handler`.
events: Vec<Event>,
}
// Currently empty: this file defines no helper methods for the events template.
impl EventsTemplate {}
/// Askama template for the paginated collectors partial, rendered from
/// `partials/collectors.html`. Populated by `collectors_html_handler`.
#[derive(Template)]
#[template(path = "partials/collectors.html")]
struct CollectorsTemplate {
/// Collector records for the requested page.
collectors: Vec<CollectorRecord>,
/// Requested page number; the handler raises it to at least 1.
page: u32,
/// Page size; the handler clamps it to the range 1..=100.
page_size: u32,
/// Total number of collectors reported by the store.
total_count: u64,
/// Number of pages needed for `total_count` at `page_size` (rounded up).
total_pages: u32,
/// Number of records in `collectors`.
count: u32,
}
impl CollectorsTemplate {
    /// Formats a millisecond timestamp as `YYYY-MM-DD HH:MM:SS`.
    ///
    /// Returns `"-"` when the value cannot be converted to a date-time.
    fn format_timestamp(ts_millis: &i64) -> String {
        match chrono::DateTime::from_timestamp_millis(*ts_millis) {
            Some(moment) => moment.format("%Y-%m-%d %H:%M:%S").to_string(),
            None => "-".to_string(),
        }
    }
}
/// Wrapper that turns any Askama `Template` into an HTML response.
struct HtmlTemplate<T>(T);

impl<T: Template> IntoResponse for HtmlTemplate<T> {
    /// Renders the wrapped template and returns it as an `Html` response.
    ///
    /// A render failure is logged and answered with a bare 500 status.
    fn into_response(self) -> Response {
        let HtmlTemplate(template) = self;

        let markup = match template.render() {
            Ok(markup) => markup,
            Err(err) => {
                tracing::error!(error = %err, "Template render failed");
                return StatusCode::INTERNAL_SERVER_ERROR.into_response();
            }
        };

        Html(markup).into_response()
    }
}
/// Builds the application router: dashboard, health probes, metrics/events
/// endpoints, collector CRUD endpoints and static file serving.
///
/// `state` is moved into an `Arc` and attached to the router so handlers can
/// extract it with `State<Arc<AppState>>`.
pub fn create_router(state: AppState) -> Router {
    let shared_state = Arc::new(state);

    // NOTE(review): `include_headers(true)` asks the span maker to include request
    // headers — confirm that sensitive headers are acceptable in the trace output.
    let request_tracing = TraceLayer::new_for_http()
        .make_span_with(DefaultMakeSpan::default().include_headers(true));

    let static_assets = ServeDir::new("templates/static");

    Router::new()
        .route("/", get(dashboard_handler))
        .route("/healthz", get(healthz_handler))
        .route("/readyz", get(readyz_handler))
        .route("/api/metrics", get(metrics_handler))
        .route("/api/metrics/stats", get(metrics_stats_handler))
        .route("/api/events", get(events_handler))
        .route("/api/collectors/html", get(collectors_html_handler))
        .route(
            "/api/collectors",
            get(list_collectors_handler).post(create_collector_handler),
        )
        .route(
            "/api/collectors/:type/:name",
            get(get_collector_handler)
                .put(update_collector_handler)
                .delete(delete_collector_handler),
        )
        .nest_service("/static", static_assets)
        .layer(request_tracing)
        // NOTE(review): permissive CORS also applies to the mutating collector
        // routes — confirm this is intended for the deployment.
        .layer(CorsLayer::permissive())
        .with_state(shared_state)
}
/// Handler for `GET /`: returns the `DashboardTemplate` as the response.
// NOTE(review): unlike the other HTML handlers in this file, the template is not
// wrapped in `HtmlTemplate` — presumably `DashboardTemplate` implements
// `IntoResponse` through an Askama/axum integration; confirm.
async fn dashboard_handler() -> impl IntoResponse {
DashboardTemplate
}
/// Handler for `GET /healthz`: always answers `{"status":"ok"}`.
///
/// No state is consulted; the `db` field is left unset and therefore omitted
/// from the JSON body.
async fn healthz_handler() -> Json<HealthResponse> {
    let body = HealthResponse {
        status: String::from("ok"),
        db: None,
    };
    Json(body)
}
/// Handler for `GET /readyz`: probes the metric reader with a one-row query.
///
/// On success the response is 200 with `status: "ok"` and `db: "ready"`.
/// On failure the error is logged and the response is 503 with
/// `status: "not_ready"` and the error text in `db`.
async fn readyz_handler(State(state): State<Arc<AppState>>) -> Response {
    let probe = MetricQuery {
        limit: Some(1),
        ..Default::default()
    };

    match state.metric_reader.query(probe).await {
        Ok(_) => {
            let body = HealthResponse {
                status: "ok".to_string(),
                db: Some("ready".to_string()),
            };
            Json(body).into_response()
        }
        Err(failure) => {
            let err = failure.to_string();
            tracing::error!(error = %err, "Readiness check failed");
            let body = HealthResponse {
                status: "not_ready".to_string(),
                db: Some(err),
            };
            (StatusCode::SERVICE_UNAVAILABLE, Json(body)).into_response()
        }
    }
}
async fn metrics_handler(
State(state): State<Arc<AppState>>,
Query(params): Query<MetricsQueryParams>,
) -> Response {
let category = params
.category
.as_ref()
.and_then(|c| c.parse::<MetricCategory>().ok());
let query = MetricQuery {
category,
name: params.name.filter(|s| !s.is_empty()),
target: params.target.filter(|s| !s.is_empty()),
start: parse_range(params.range),
end: None,
limit: params.limit,
order: parse_sort_order(params.order),
};
match state.metric_reader.query(query).await {
Ok(metrics) => HtmlTemplate(MetricsTemplate { metrics }).into_response(),
Err(e) => {
tracing::error!(error = %e, "Failed to query metrics");
(
StatusCode::INTERNAL_SERVER_ERROR,
"Internal server error. Please check server logs.",
)
.into_response()
}
}
}
/// Handler for `GET /api/metrics/stats`: returns metric statistics as JSON.
///
/// The optional `range` parameter sets the start of the window; no end bound is
/// passed. A failure is logged and answered with a 500 and a generic message.
async fn metrics_stats_handler(
    State(state): State<Arc<AppState>>,
    Query(params): Query<StatsQueryParams>,
) -> Response {
    let window_start = parse_range(params.range);

    let stats = match state.metric_reader.stats(window_start, None).await {
        Ok(stats) => stats,
        Err(e) => {
            tracing::error!(error = %e, "Failed to query metrics stats");
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                "Internal server error. Please check server logs.",
            )
                .into_response();
        }
    };

    Json(stats).into_response()
}
async fn events_handler(
State(state): State<Arc<AppState>>,
Query(params): Query<EventsQueryParams>,
) -> Response {
let source = params
.source
.as_ref()
.and_then(|s| s.parse::<EventSource>().ok());
let kind = params.kind.as_ref().and_then(|k| k.parse().ok());
let severity = params.severity.as_ref().and_then(|s| s.parse().ok());
let query = EventQuery {
source,
kind,
severity,
start: parse_range(params.range),
end: None,
limit: params.limit,
order: parse_sort_order(params.order),
};
match state.event_reader.query(query).await {
Ok(events) => HtmlTemplate(EventsTemplate { events }).into_response(),
Err(e) => {
tracing::error!(error = %e, "Failed to query events");
(
StatusCode::INTERNAL_SERVER_ERROR,
"Internal server error. Please check server logs.",
)
.into_response()
}
}
}
/// JSON request body for creating (`POST`) or updating (`PUT`) a collector.
///
/// `update_collector_handler` takes the collector type and name from the URL
/// path and uses only `enabled`, `group` and `config` from this body.
#[derive(Debug, Deserialize)]
pub struct CollectorRequest {
/// Collector type, sent under the JSON key `type`; parsed into a `CollectorType`.
#[serde(rename = "type")]
pub collector_type: String,
/// Collector name.
pub name: String,
/// Whether the collector is enabled; defaults to `true` when omitted.
#[serde(default = "default_enabled")]
pub enabled: bool,
/// Collector group; defaults to `"default"` when omitted.
#[serde(default = "default_group")]
pub group: String,
/// Arbitrary JSON configuration, passed through to `CollectorRecord::from_api`.
pub config: serde_json::Value,
}
/// Serde default for `CollectorRequest::enabled`: collectors are enabled unless stated otherwise.
fn default_enabled() -> bool {
true
}
/// Serde default for `CollectorRequest::group`: the group named `"default"`.
fn default_group() -> String {
    String::from("default")
}
/// Path parameters of the `/api/collectors/:type/:name` route.
#[derive(Debug, Deserialize)]
pub struct CollectorPath {
/// The `:type` path segment; handlers parse it into a `CollectorType`.
#[serde(rename = "type")]
pub collector_type: String,
/// The `:name` path segment.
pub name: String,
}
/// Pagination query parameters accepted by `collectors_html_handler`.
#[derive(Debug, Deserialize)]
struct CollectorsPaginationQuery {
/// Requested page; defaults to 1. The handler raises values below 1 to 1.
#[serde(default = "default_page")]
page: u32,
/// Requested page size; defaults to 20. The handler clamps it to 1..=100.
#[serde(default = "default_page_size")]
page_size: u32,
}
/// Serde default for `CollectorsPaginationQuery::page`: the first page.
fn default_page() -> u32 {
1
}
/// Serde default for `CollectorsPaginationQuery::page_size`: 20 records per page.
fn default_page_size() -> u32 {
20
}
/// Number of pages needed to show `total_count` items at `page_size` per page.
///
/// Uses exact integer ceiling division (no floating point) and saturates at
/// `u32::MAX`. A `page_size` of 0 is treated as 1 so the division cannot panic.
fn collectors_total_pages(total_count: u64, page_size: u32) -> u32 {
    let per_page = u64::from(page_size.max(1));
    let full_pages = total_count / per_page;
    let has_partial_page = total_count % per_page != 0;
    // Cannot overflow: a remainder only exists when `per_page >= 2`, in which
    // case `full_pages <= u64::MAX / 2`.
    u32::try_from(full_pages + u64::from(has_partial_page)).unwrap_or(u32::MAX)
}

/// Handler for `GET /api/collectors/html`: renders one page of collectors.
///
/// `page` is raised to at least 1 and `page_size` is clamped to 1..=100 before
/// the store is queried. A store failure is logged and answered with a 500 and
/// a generic message.
async fn collectors_html_handler(
    State(state): State<Arc<AppState>>,
    Query(params): Query<CollectorsPaginationQuery>,
) -> Response {
    let page = params.page.max(1);
    let page_size = params.page_size.clamp(1, 100);

    match state.collector_store.list_paginated(page, page_size).await {
        Ok((collectors, total_count)) => {
            let total_pages = collectors_total_pages(total_count, page_size);
            // Checked conversion instead of a truncating `as` cast.
            let count = u32::try_from(collectors.len()).unwrap_or(u32::MAX);
            HtmlTemplate(CollectorsTemplate {
                collectors,
                page,
                page_size,
                total_count,
                total_pages,
                count,
            })
            .into_response()
        }
        Err(e) => {
            tracing::error!(error = %e, "Failed to list collectors");
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                "Internal server error. Please check server logs.",
            )
                .into_response()
        }
    }
}
/// Handler for `GET /api/collectors`: returns every collector as JSON.
///
/// A store failure is logged in full; the client receives a 500 with the same
/// generic message used by the other handlers, so storage error details are
/// not exposed in the response.
async fn list_collectors_handler(State(state): State<Arc<AppState>>) -> Response {
    match state.collector_store.list_all().await {
        Ok(collectors) => Json(collectors).into_response(),
        Err(e) => {
            tracing::error!(error = %e, "Failed to list collectors");
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                "Internal server error. Please check server logs.",
            )
                .into_response()
        }
    }
}
/// Handler for `GET /api/collectors/:type/:name`: returns one collector as JSON.
///
/// Responses:
/// - 400 when the `:type` segment is not a valid `CollectorType`;
/// - 404 when no collector matches;
/// - 500 with a generic message when the store fails (details go to the log only).
async fn get_collector_handler(
    State(state): State<Arc<AppState>>,
    Path(params): Path<CollectorPath>,
) -> Response {
    let collector_type = match params.collector_type.parse::<CollectorType>() {
        Ok(t) => t,
        Err(_) => return (StatusCode::BAD_REQUEST, "Invalid collector type").into_response(),
    };

    match state
        .collector_store
        .get(collector_type, &params.name)
        .await
    {
        Ok(Some(collector)) => Json(collector).into_response(),
        Ok(None) => (StatusCode::NOT_FOUND, "Collector not found").into_response(),
        Err(e) => {
            tracing::error!(error = %e, "Failed to get collector");
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                "Internal server error. Please check server logs.",
            )
                .into_response()
        }
    }
}
async fn create_collector_handler(
State(state): State<Arc<AppState>>,
Json(req): Json<CollectorRequest>,
) -> Response {
let collector_type = match req.collector_type.parse::<CollectorType>() {
Ok(t) => t,
Err(_) => return (StatusCode::BAD_REQUEST, "Invalid collector type").into_response(),
};
if let Ok(Some(_)) = state.collector_store.get(collector_type, &req.name).await {
return (StatusCode::CONFLICT, "Collector already exists").into_response();
}
let record = CollectorRecord::from_api(
collector_type,
&req.name,
req.enabled,
&req.group,
req.config,
);
match state.collector_store.upsert(&record).await {
Ok(id) => {
tracing::info!(id = id, name = %req.name, "Created collector");
(StatusCode::CREATED, Json(record)).into_response()
}
Err(e) => {
tracing::error!(error = %e, "Failed to create collector");
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response()
}
}
}
/// Handler for `PUT /api/collectors/:type/:name`: replaces an API-created collector.
///
/// The collector is identified by the URL path; only `enabled`, `group` and
/// `config` are taken from the JSON body.
///
/// Responses:
/// - 400 when the `:type` segment is not a valid `CollectorType`;
/// - 404 when no collector matches;
/// - 403 when the existing collector's source is not `CollectorSource::Api`;
/// - 200 with the stored record on success;
/// - 500 with a generic message when the store fails (details go to the log only).
async fn update_collector_handler(
    State(state): State<Arc<AppState>>,
    Path(params): Path<CollectorPath>,
    Json(req): Json<CollectorRequest>,
) -> Response {
    let collector_type = match params.collector_type.parse::<CollectorType>() {
        Ok(t) => t,
        Err(_) => return (StatusCode::BAD_REQUEST, "Invalid collector type").into_response(),
    };

    let existing = match state
        .collector_store
        .get(collector_type, &params.name)
        .await
    {
        Ok(Some(c)) => c,
        Ok(None) => return (StatusCode::NOT_FOUND, "Collector not found").into_response(),
        Err(e) => {
            tracing::error!(error = %e, "Failed to get collector");
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                "Internal server error. Please check server logs.",
            )
                .into_response();
        }
    };

    // Only collectors that were created through the API may be changed here.
    if existing.source != CollectorSource::Api {
        return (
            StatusCode::FORBIDDEN,
            "Cannot update config-sourced collector via API",
        )
            .into_response();
    }

    let record = CollectorRecord::from_api(
        collector_type,
        &params.name,
        req.enabled,
        &req.group,
        req.config,
    );

    match state.collector_store.upsert(&record).await {
        Ok(_) => {
            tracing::info!(name = %params.name, "Updated collector");
            Json(record).into_response()
        }
        Err(e) => {
            tracing::error!(error = %e, "Failed to update collector");
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                "Internal server error. Please check server logs.",
            )
                .into_response()
        }
    }
}
/// Handler for `DELETE /api/collectors/:type/:name`: removes an API-created collector.
///
/// Responses:
/// - 400 when the `:type` segment is not a valid `CollectorType`;
/// - 404 when no collector matches (on lookup, or if the delete reports nothing removed);
/// - 403 when the existing collector's source is not `CollectorSource::Api`;
/// - 204 on success;
/// - 500 with a generic message when the store fails (details go to the log only).
async fn delete_collector_handler(
    State(state): State<Arc<AppState>>,
    Path(params): Path<CollectorPath>,
) -> Response {
    let collector_type = match params.collector_type.parse::<CollectorType>() {
        Ok(t) => t,
        Err(_) => return (StatusCode::BAD_REQUEST, "Invalid collector type").into_response(),
    };

    let existing = match state
        .collector_store
        .get(collector_type, &params.name)
        .await
    {
        Ok(Some(c)) => c,
        Ok(None) => return (StatusCode::NOT_FOUND, "Collector not found").into_response(),
        Err(e) => {
            tracing::error!(error = %e, "Failed to get collector");
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                "Internal server error. Please check server logs.",
            )
                .into_response();
        }
    };

    // Only collectors that were created through the API may be removed here.
    if existing.source != CollectorSource::Api {
        return (
            StatusCode::FORBIDDEN,
            "Cannot delete config-sourced collector via API",
        )
            .into_response();
    }

    match state
        .collector_store
        .delete(collector_type, &params.name)
        .await
    {
        Ok(true) => {
            tracing::info!(name = %params.name, "Deleted collector");
            StatusCode::NO_CONTENT.into_response()
        }
        // The record disappeared between the lookup and the delete.
        Ok(false) => (StatusCode::NOT_FOUND, "Collector not found").into_response(),
        Err(e) => {
            tracing::error!(error = %e, "Failed to delete collector");
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                "Internal server error. Please check server logs.",
            )
                .into_response()
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::StorageBuilder;
    use axum::http::Request;
    use tower::ServiceExt;

    /// Builds an `AppState` from storage opened with the `sqlite::memory:`
    /// connection string. The storage handles are returned as well so the
    /// caller can keep them alive for the duration of the test.
    async fn create_test_state() -> (AppState, crate::storage::StorageHandles) {
        let handles = StorageBuilder::new("sqlite::memory:")
            .channel_capacity(100)
            .build()
            .await
            .expect("Failed to build storage");

        let metric_reader = handles.metric_reader.clone();
        let event_reader = handles.event_reader.clone();
        let collector_store = handles.collector_store.clone();

        (
            AppState {
                metric_reader,
                event_reader,
                collector_store,
            },
            handles,
        )
    }

    /// Sends a body-less GET request for `uri` through a router built from `state`.
    async fn send_get(state: AppState, uri: &str) -> Response {
        let request = Request::builder()
            .uri(uri)
            .body(axum::body::Body::empty())
            .unwrap();
        create_router(state).oneshot(request).await.unwrap()
    }

    /// `GET /api/metrics` answers 200; on failure the response body is included
    /// in the panic message to aid debugging.
    #[tokio::test]
    async fn test_metrics_endpoint() {
        let (state, _handles) = create_test_state().await;
        let response = send_get(state, "/api/metrics?limit=10").await;

        let status = response.status();
        if status == StatusCode::OK {
            return;
        }

        let bytes = axum::body::to_bytes(response.into_body(), usize::MAX)
            .await
            .unwrap();
        let body = String::from_utf8_lossy(&bytes);
        panic!("Expected 200 OK, got {}. Body: {}", status, body);
    }

    /// `GET /api/events` answers 200.
    #[tokio::test]
    async fn test_events_endpoint() {
        let (state, _handles) = create_test_state().await;
        let response = send_get(state, "/api/events?limit=10").await;
        assert_eq!(response.status(), StatusCode::OK);
    }
}