use std::convert::Infallible;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
use axum::extract::Extension;
use axum::response::sse::{Event, KeepAlive, Sse};
use axum::response::{Html, IntoResponse, Json};
use serde::Serialize;
use tokio::sync::broadcast;
use tokio_stream::Stream;
/// Dashboard page markup, embedded into the binary at compile time from
/// `../assets/dashboard.html` (path relative to this source file).
const DASHBOARD_HTML: &str = include_str!("../assets/dashboard.html");
/// Static identification strings reported by the dashboard snapshot.
#[derive(Debug, Clone)]
pub struct DashboardIdentity {
    /// Version string copied verbatim into the snapshot's `version` field.
    pub version: String,
    /// Platform string copied verbatim into the snapshot's `platform` field.
    pub platform: String,
}
/// One message pushed to dashboard clients over the server-sent-events stream.
#[derive(Debug, Clone)]
pub struct DashboardSseEvent {
    /// Value placed in the SSE `event:` field.
    pub event_type: String,
    /// Value placed in the SSE `id:` field.
    pub id: String,
    /// Payload serialized as JSON into the SSE `data:` field.
    pub data: serde_json::Value,
}
/// Shared, thread-safe callback that asynchronously produces the dynamic part
/// of a dashboard snapshot.
///
/// Each call returns a boxed `Send` future resolving to a JSON value; the
/// snapshot handler awaits it once per request.
pub type SnapshotFn =
Arc<dyn Fn() -> Pin<Box<dyn Future<Output = serde_json::Value> + Send>> + Send + Sync>;
/// State shared by the dashboard handlers, extracted from the request via
/// `Extension<DashboardState>`.
#[derive(Clone)]
pub struct DashboardState {
    /// Version and platform strings echoed in every snapshot.
    pub identity: DashboardIdentity,
    /// Mode label echoed in the snapshot's `mode` field.
    pub mode: &'static str,
    /// Callback awaited once per snapshot request to obtain the dynamic details.
    pub snapshot_fn: SnapshotFn,
    /// Broadcast sender that each SSE stream subscribes to for live events.
    pub event_tx: broadcast::Sender<DashboardSseEvent>,
    /// Reference instant from which `uptime_secs` is measured.
    pub started_at: Instant,
}
/// JSON body produced by the snapshot handler.
///
/// Field order is significant: it determines the key order of the serialized
/// object.
#[derive(Serialize, Debug)]
struct DashboardSnapshot {
    /// Copied from `DashboardIdentity::version`.
    version: String,
    /// Copied from `DashboardIdentity::platform`.
    platform: String,
    /// Host name as reported by the operating system, or `"unknown"`.
    hostname: String,
    /// Qualified host name derived from `hostname`.
    hostname_fqdn: String,
    /// Whole seconds elapsed since `DashboardState::started_at`.
    uptime_secs: u64,
    /// Copied from `DashboardState::mode`.
    mode: String,
    /// Output of the snapshot callback; flattened, so its keys are emitted at
    /// the top level of the object next to the fixed fields above.
    #[serde(flatten)]
    details: serde_json::Value,
}
/// Builds the server-sent-events stream served to a single dashboard client.
///
/// The stream subscribes to `state.event_tx` and forwards every broadcast
/// [`DashboardSseEvent`] as an SSE event. In addition it emits a `heartbeat`
/// event every 15 seconds whose JSON payload carries the current
/// `uptime_secs`.
///
/// * If the receiver lags behind the broadcast channel, the number of dropped
///   messages is logged and streaming continues.
/// * Events whose `event_type` contains a line break, or whose `id` contains a
///   line break or NUL, are logged and dropped: `Event::event` / `Event::id`
///   panic on such input, which would otherwise tear down the stream.
/// * The stream ends once the broadcast channel is closed.
///
/// Items are always `Ok`; the error type is [`Infallible`].
fn dashboard_event_stream(state: DashboardState) -> impl Stream<Item = Result<Event, Infallible>> {
    async_stream::stream! {
        let mut rx = state.event_tx.subscribe();
        let mut heartbeat = tokio::time::interval(std::time::Duration::from_secs(15));
        // The first tick of a tokio interval completes immediately; consume it
        // so the first heartbeat is sent after a full period.
        heartbeat.tick().await;
        loop {
            let event = tokio::select! {
                result = rx.recv() => {
                    match result {
                        // Guard against field values the SSE builder rejects
                        // with a panic instead of an error.
                        Ok(ev)
                            if ev.event_type.contains(['\n', '\r'])
                                || ev.id.contains(['\n', '\r', '\0']) =>
                        {
                            tracing::warn!(
                                event_type = ?ev.event_type,
                                id = ?ev.id,
                                "Dropping dashboard SSE event with invalid event type or id"
                            );
                            continue;
                        }
                        Ok(ev) => {
                            // A payload that fails to serialize yields `None`
                            // and is skipped below.
                            Event::default()
                                .event(&ev.event_type)
                                .id(ev.id)
                                .json_data(ev.data)
                                .ok()
                        }
                        Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                            tracing::warn!(dropped = n, "Dashboard SSE stream lagged");
                            continue;
                        }
                        Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
                    }
                },
                _ = heartbeat.tick() => {
                    Event::default()
                        .event("heartbeat")
                        .json_data(serde_json::json!({
                            "uptime_secs": state.started_at.elapsed().as_secs()
                        }))
                        .ok()
                },
            };
            if let Some(ev) = event {
                yield Ok(ev);
            }
        }
    }
}
/// Handler that serves the embedded dashboard page as an HTML response.
pub async fn get_dashboard() -> Html<&'static str> {
    let page: &'static str = DASHBOARD_HTML;
    Html(page)
}
/// Handler that returns the current dashboard snapshot as JSON.
///
/// The response combines the static identity and mode from `state`, the host
/// name, the uptime in whole seconds since `state.started_at`, and the value
/// produced by awaiting `state.snapshot_fn`, whose keys are flattened into the
/// top-level object.
///
/// `hostname` falls back to `"unknown"` when the lookup fails or the name is
/// not valid Unicode. `hostname_fqdn` is `hostname` with `.local` appended,
/// unless `hostname` already contains a dot, in which case it is reported
/// unchanged.
pub async fn get_snapshot(Extension(state): Extension<DashboardState>) -> impl IntoResponse {
    let hostname = hostname::get()
        .ok()
        .and_then(|os| os.into_string().ok())
        .unwrap_or_else(|| "unknown".to_string());
    // Some systems already report a qualified name (e.g. `host.local` or
    // `host.example.com`); appending the suffix again would yield a bogus
    // name such as `host.local.local`.
    let hostname_fqdn = if hostname.contains('.') {
        hostname.clone()
    } else {
        format!("{hostname}.local")
    };
    let details = (state.snapshot_fn)().await;
    Json(DashboardSnapshot {
        version: state.identity.version.clone(),
        platform: state.identity.platform.clone(),
        hostname,
        hostname_fqdn,
        uptime_secs: state.started_at.elapsed().as_secs(),
        mode: state.mode.to_string(),
        details,
    })
}
/// Handler that opens the dashboard's server-sent-events stream.
///
/// Wraps the per-client event stream in an SSE response with the default
/// keep-alive configuration.
pub async fn get_events(
    Extension(state): Extension<DashboardState>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let events = dashboard_event_stream(state);
    let response = Sse::new(events);
    response.keep_alive(KeepAlive::default())
}