1use std::convert::Infallible;
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::time::Instant;
16
17use axum::extract::Extension;
18use axum::response::sse::{Event, KeepAlive, Sse};
19use axum::response::{Html, IntoResponse, Json};
20use serde::Serialize;
21use tokio::sync::broadcast;
22use tokio_stream::Stream;
23
/// Dashboard page markup, embedded into the binary at compile time from
/// `../assets/dashboard.html` (the path is resolved relative to this source
/// file by `include_str!`).
const DASHBOARD_HTML: &str = include_str!("../assets/dashboard.html");
27
/// Static identity of the running service as shown on the dashboard.
///
/// Both strings are copied verbatim into the snapshot response.
#[derive(Debug, Clone)]
pub struct DashboardIdentity {
    /// Version string reported under the snapshot's `version` key.
    pub version: String,
    /// Platform string reported under the snapshot's `platform` key.
    pub platform: String,
}
37
38#[derive(Clone, Debug)]
40pub struct DashboardSseEvent {
41 pub event_type: String,
43 pub id: String,
45 pub data: serde_json::Value,
47}
48
/// Shared callback that produces the dynamic part of a dashboard snapshot.
///
/// Each call returns a boxed, `Send` future resolving to a JSON value. The
/// callback itself is `Send + Sync` behind an `Arc`, so clones of the state
/// that holds it can be handed to concurrent request handlers.
pub type SnapshotFn =
    Arc<dyn Fn() -> Pin<Box<dyn Future<Output = serde_json::Value> + Send>> + Send + Sync>;
56
/// Everything the dashboard handlers need, extracted per request through an
/// axum `Extension`.
#[derive(Clone)]
pub struct DashboardState {
    /// Version / platform strings echoed in every snapshot.
    pub identity: DashboardIdentity,
    /// Operating-mode label echoed in every snapshot under `mode`.
    pub mode: &'static str,
    /// Callback awaited on each snapshot request to obtain the dynamic details.
    pub snapshot_fn: SnapshotFn,
    /// Broadcast channel whose messages are forwarded to SSE clients; each
    /// event stream creates its own subscription from this sender.
    pub event_tx: broadcast::Sender<DashboardSseEvent>,
    /// Reference instant used to compute the `uptime_secs` values.
    pub started_at: Instant,
}
69
/// JSON body of the snapshot endpoint.
#[derive(Debug, Serialize)]
struct DashboardSnapshot {
    /// Copied from `DashboardIdentity::version`.
    version: String,
    /// Copied from `DashboardIdentity::platform`.
    platform: String,
    /// Host name as reported by the OS, or `"unknown"` when unavailable.
    hostname: String,
    /// `hostname` with a `.local` suffix appended.
    hostname_fqdn: String,
    /// Whole seconds elapsed since `DashboardState::started_at`.
    uptime_secs: u64,
    /// Copied from `DashboardState::mode`.
    mode: String,
    /// Result of the snapshot callback. `flatten` merges its entries into the
    /// top-level object instead of nesting them under a `details` key.
    // NOTE(review): flattening presumably requires the callback to return a
    // JSON object — confirm against the callers that build `SnapshotFn`.
    #[serde(flatten)]
    details: serde_json::Value,
}
83
84fn dashboard_event_stream(state: DashboardState) -> impl Stream<Item = Result<Event, Infallible>> {
87 async_stream::stream! {
88 let mut rx = state.event_tx.subscribe();
89 let mut heartbeat = tokio::time::interval(std::time::Duration::from_secs(15));
90 heartbeat.tick().await; loop {
93 let event = tokio::select! {
94 result = rx.recv() => {
95 match result {
96 Ok(ev) => {
97 Event::default()
98 .event(&ev.event_type)
99 .id(ev.id)
100 .json_data(ev.data)
101 .ok()
102 }
103 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
104 tracing::warn!(dropped = n, "Dashboard SSE stream lagged");
105 continue;
106 }
107 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
108 }
109 },
110 _ = heartbeat.tick() => {
111 Event::default()
112 .event("heartbeat")
113 .json_data(serde_json::json!({
114 "uptime_secs": state.started_at.elapsed().as_secs()
115 })).ok()
116 },
117 };
118
119 if let Some(ev) = event {
120 yield Ok(ev);
121 }
122 }
123 }
124}
125
126pub async fn get_dashboard() -> Html<&'static str> {
130 Html(DASHBOARD_HTML)
131}
132
133pub async fn get_snapshot(Extension(state): Extension<DashboardState>) -> impl IntoResponse {
135 let hostname = hostname::get()
136 .ok()
137 .and_then(|os| os.into_string().ok())
138 .unwrap_or_else(|| "unknown".to_string());
139 let hostname_fqdn = format!("{hostname}.local");
140
141 let details = (state.snapshot_fn)().await;
142
143 Json(DashboardSnapshot {
144 version: state.identity.version.clone(),
145 platform: state.identity.platform.clone(),
146 hostname,
147 hostname_fqdn,
148 uptime_secs: state.started_at.elapsed().as_secs(),
149 mode: state.mode.to_string(),
150 details,
151 })
152}
153
154pub async fn get_events(
156 Extension(state): Extension<DashboardState>,
157) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
158 Sse::new(dashboard_event_stream(state)).keep_alive(KeepAlive::default())
159}