Skip to main content

koi_common/
dashboard.rs

1//! Dashboard adapter — system-level operational overview.
2//!
3//! Provides the shared dashboard infrastructure (HTML serving, snapshot
4//! endpoint, SSE event stream) that both the standalone daemon and
5//! embedded mode can mount.  Domain-specific logic is injected via a
6//! boxed async closure (snapshot) and a broadcast channel (events).
7//!
8//! This module owns zero domain logic.  All data flows through the
9//! abstractions the caller provides.
10
11use std::convert::Infallible;
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::time::Instant;
16
17use axum::extract::Extension;
18use axum::response::sse::{Event, KeepAlive, Sse};
19use axum::response::{Html, IntoResponse, Json};
20use serde::Serialize;
21use tokio::sync::broadcast;
22use tokio_stream::Stream;
23
24// ── HTML asset ──────────────────────────────────────────────────────
25
26const DASHBOARD_HTML: &str = include_str!("../assets/dashboard.html");
27
28// ── Public types ────────────────────────────────────────────────────
29
/// Version and platform strings describing the host binary.
///
/// The caller supplies these so that the snapshot reports the embedding
/// binary's identity rather than koi-common's own.
#[derive(Debug, Clone)]
pub struct DashboardIdentity {
    /// Version string reported in the snapshot's `version` field.
    pub version: String,
    /// Platform label reported in the snapshot's `platform` field.
    pub platform: String,
}
37
38/// A domain-agnostic SSE event forwarded by the caller.
39#[derive(Clone, Debug)]
40pub struct DashboardSseEvent {
41    /// SSE event type (e.g. "mdns.found", "health.changed").
42    pub event_type: String,
43    /// Unique event ID (typically UUID v7).
44    pub id: String,
45    /// JSON payload.
46    pub data: serde_json::Value,
47}
48
49/// Type alias for the async snapshot closure.
50///
51/// The caller provides a closure that queries all domain cores and
52/// returns a complete JSON snapshot.  koi-common wraps it with
53/// identity / uptime / mode metadata.
54pub type SnapshotFn =
55    Arc<dyn Fn() -> Pin<Box<dyn Future<Output = serde_json::Value> + Send>> + Send + Sync>;
56
57/// Shared state for the dashboard routes.
58///
59/// Construct this in the binary crate or koi-embedded and inject it
60/// via `axum::Extension`.
61#[derive(Clone)]
62pub struct DashboardState {
63    pub identity: DashboardIdentity,
64    pub mode: &'static str,
65    pub snapshot_fn: SnapshotFn,
66    pub event_tx: broadcast::Sender<DashboardSseEvent>,
67    pub started_at: Instant,
68}
69
70// ── Snapshot envelope ───────────────────────────────────────────────
71
72#[derive(Debug, Serialize)]
73struct DashboardSnapshot {
74    version: String,
75    platform: String,
76    hostname: String,
77    hostname_fqdn: String,
78    uptime_secs: u64,
79    mode: String,
80    #[serde(flatten)]
81    details: serde_json::Value,
82}
83
84// ── SSE stream ──────────────────────────────────────────────────────
85
86fn dashboard_event_stream(state: DashboardState) -> impl Stream<Item = Result<Event, Infallible>> {
87    async_stream::stream! {
88        let mut rx = state.event_tx.subscribe();
89        let mut heartbeat = tokio::time::interval(std::time::Duration::from_secs(15));
90        heartbeat.tick().await; // skip immediate tick
91
92        loop {
93            let event = tokio::select! {
94                result = rx.recv() => {
95                    match result {
96                        Ok(ev) => {
97                            Event::default()
98                                .event(&ev.event_type)
99                                .id(ev.id)
100                                .json_data(ev.data)
101                                .ok()
102                        }
103                        Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
104                            tracing::warn!(dropped = n, "Dashboard SSE stream lagged");
105                            continue;
106                        }
107                        Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
108                    }
109                },
110                _ = heartbeat.tick() => {
111                    Event::default()
112                        .event("heartbeat")
113                        .json_data(serde_json::json!({
114                            "uptime_secs": state.started_at.elapsed().as_secs()
115                        })).ok()
116                },
117            };
118
119            if let Some(ev) = event {
120                yield Ok(ev);
121            }
122        }
123    }
124}
125
126// ── Handlers ────────────────────────────────────────────────────────
127
128/// `GET /` — Serve the dashboard SPA.
129pub async fn get_dashboard() -> Html<&'static str> {
130    Html(DASHBOARD_HTML)
131}
132
133/// `GET /v1/dashboard/snapshot` — System-level JSON snapshot.
134pub async fn get_snapshot(Extension(state): Extension<DashboardState>) -> impl IntoResponse {
135    let hostname = hostname::get()
136        .ok()
137        .and_then(|os| os.into_string().ok())
138        .unwrap_or_else(|| "unknown".to_string());
139    let hostname_fqdn = format!("{hostname}.local");
140
141    let details = (state.snapshot_fn)().await;
142
143    Json(DashboardSnapshot {
144        version: state.identity.version.clone(),
145        platform: state.identity.platform.clone(),
146        hostname,
147        hostname_fqdn,
148        uptime_secs: state.started_at.elapsed().as_secs(),
149        mode: state.mode.to_string(),
150        details,
151    })
152}
153
154/// `GET /v1/dashboard/events` — Unified SSE event stream.
155pub async fn get_events(
156    Extension(state): Extension<DashboardState>,
157) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
158    Sse::new(dashboard_event_stream(state)).keep_alive(KeepAlive::default())
159}