async_inspect/dashboard/
mod.rs

//! Web dashboard for real-time async inspection.
//!
//! This module provides a browser-based dashboard for monitoring async tasks in real time.
//! Events and metrics are streamed to connected clients over WebSockets.
//!
//! # Features
//!
//! - Real-time task monitoring
//! - Interactive timeline visualization
//! - Live metrics dashboard
//! - Event log with filtering
//! - RESTful API fallback (`/api/tasks`, `/api/stats`)
//!
//! # Example
//!
//! ```no_run
//! use async_inspect::dashboard::Dashboard;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     // Start the dashboard on port 8080
//!     let dashboard = Dashboard::new(8080);
//!     let handle = dashboard.start().await?;
//!
//!     println!("Dashboard running at http://localhost:8080");
//!
//!     // Keep running until the server task exits: the first `?` handles the
//!     // task join result, the second the server's own I/O result.
//!     handle.await??;
//!     Ok(())
//! }
//! ```
32
33#[cfg(feature = "dashboard")]
34use crate::inspector::Inspector;
35
36#[cfg(feature = "dashboard")]
37use axum::{
38    extract::{
39        ws::{Message, WebSocket},
40        State as AxumState, WebSocketUpgrade,
41    },
42    response::{Html, IntoResponse},
43    routing::get,
44    Json, Router,
45};
46#[cfg(feature = "dashboard")]
47use serde::{Deserialize, Serialize};
48#[cfg(feature = "dashboard")]
49use std::net::SocketAddr;
50#[cfg(feature = "dashboard")]
51use std::time::{Duration, SystemTime, UNIX_EPOCH};
52#[cfg(feature = "dashboard")]
53use tokio::sync::broadcast;
54#[cfg(feature = "dashboard")]
55use tokio::task::JoinHandle;
56#[cfg(feature = "dashboard")]
57use tower_http::cors::CorsLayer;
58
/// A single message pushed to dashboard clients.
///
/// Serialized with serde as an internally tagged enum: the variant name is
/// written in `snake_case` under the `"type"` key, next to the variant's fields.
///
/// NOTE(review): the `timestamp` fields are `u128`; 128-bit integers inside
/// internally tagged enums have historically been a weak spot for serde
/// deserialization — confirm that round-tripping works with the serde version
/// in use.
#[cfg(feature = "dashboard")]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum DashboardEvent {
    /// A task was spawned.
    TaskSpawned {
        /// Identifier of the spawned task.
        task_id: u64,
        /// Name of the spawned task.
        name: String,
        /// Identifier of the parent task, if there is one.
        parent: Option<u64>,
        /// Event time, in milliseconds.
        timestamp: u128,
    },
    /// A task ran to successful completion.
    TaskCompleted {
        /// Identifier of the completed task.
        task_id: u64,
        /// Task duration, in milliseconds.
        duration_ms: f64,
        /// Event time.
        timestamp: u128,
    },
    /// A task failed.
    TaskFailed {
        /// Identifier of the failed task.
        task_id: u64,
        /// Error message, when one is available.
        error: Option<String>,
        /// Event time.
        timestamp: u128,
    },
    /// A task moved from one state to another.
    StateChanged {
        /// Identifier of the affected task.
        task_id: u64,
        /// State before the transition.
        old_state: String,
        /// State after the transition.
        new_state: String,
        /// Event time.
        timestamp: u128,
    },
    /// Snapshot of the aggregate task counters.
    MetricsSnapshot {
        /// Total number of tasks.
        total_tasks: usize,
        /// Number of running tasks.
        running_tasks: usize,
        /// Number of completed tasks.
        completed_tasks: usize,
        /// Number of failed tasks.
        failed_tasks: usize,
        /// Number of blocked tasks.
        blocked_tasks: usize,
        /// Event time.
        timestamp: u128,
    },
    /// A task started waiting at an await point.
    AwaitStarted {
        /// Identifier of the awaiting task.
        task_id: u64,
        /// Label of the await point.
        label: String,
        /// Event time.
        timestamp: u128,
    },
    /// A task finished waiting at an await point.
    AwaitEnded {
        /// Identifier of the awaiting task.
        task_id: u64,
        /// Label of the await point.
        label: String,
        /// Duration of the await, in milliseconds.
        duration_ms: f64,
        /// Event time.
        timestamp: u128,
    },
}
140
/// State handed to every request handler.
///
/// Cloned per request: it holds only a broadcast sender handle and a
/// `'static` reference to the inspector.
#[cfg(feature = "dashboard")]
#[derive(Clone)]
struct DashboardState {
    /// Sender side of the event channel; handlers call `subscribe()` on it
    /// to obtain their own receiver.
    event_tx: broadcast::Sender<DashboardEvent>,
    /// Inspector queried for task lists and statistics.
    inspector: &'static Inspector,
}
150
/// Web dashboard for real-time monitoring.
///
/// Holds the port the server will listen on and the broadcast sender used to
/// push [`DashboardEvent`]s to connected clients.
#[cfg(feature = "dashboard")]
#[derive(Debug)]
pub struct Dashboard {
    /// TCP port the server listens on.
    port: u16,
    /// Sender side of the event broadcast channel.
    event_tx: broadcast::Sender<DashboardEvent>,
}
159
#[cfg(feature = "dashboard")]
impl Dashboard {
    /// Capacity of the event broadcast channel.
    const EVENT_CHANNEL_CAPACITY: usize = 1000;

    /// Period between two consecutive metrics snapshots.
    const METRICS_INTERVAL: Duration = Duration::from_millis(100);

    /// Create a new dashboard that will listen on the specified port.
    ///
    /// Nothing is bound or spawned until [`Dashboard::start`] is called.
    #[must_use]
    pub fn new(port: u16) -> Self {
        // The initial receiver is dropped; clients subscribe on connect.
        let (event_tx, _) = broadcast::channel(Self::EVENT_CHANNEL_CAPACITY);

        Self { port, event_tx }
    }

    /// Start the dashboard server on `127.0.0.1:<port>`.
    ///
    /// The listener is bound before any task is spawned, so a port that is
    /// already in use (or otherwise unavailable) is reported by this call
    /// instead of only showing up when the returned handle is awaited.
    ///
    /// # Returns
    ///
    /// A join handle for the server task. It resolves with the result of
    /// `axum::serve`.
    ///
    /// # Errors
    ///
    /// Returns the I/O error produced while binding the TCP listener.
    pub async fn start(self) -> Result<JoinHandle<Result<(), std::io::Error>>, std::io::Error> {
        /// Aborts the wrapped task when dropped, tying the metrics
        /// broadcaster's lifetime to the server task's.
        struct AbortOnDrop(JoinHandle<()>);

        impl Drop for AbortOnDrop {
            fn drop(&mut self) {
                self.0.abort();
            }
        }

        let addr = SocketAddr::from(([127, 0, 0, 1], self.port));

        // Bind up front: on failure we return before spawning anything, so no
        // background task is left running.
        let listener = tokio::net::TcpListener::bind(addr).await?;

        let inspector = Inspector::global();

        let state = DashboardState {
            event_tx: self.event_tx.clone(),
            inspector,
        };

        // Periodic metrics broadcaster.
        let metrics_tx = self.event_tx.clone();
        let metrics_guard = AbortOnDrop(tokio::spawn(async move {
            let mut interval = tokio::time::interval(Self::METRICS_INTERVAL);
            // Snapshots describe "now"; after a stall there is no point in
            // firing a burst of catch-up ticks.
            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            loop {
                interval.tick().await;

                // Without subscribers the send below would fail and the
                // snapshot would be discarded, so skip building it.
                if metrics_tx.receiver_count() == 0 {
                    continue;
                }

                let stats = inspector.stats();
                let snapshot = DashboardEvent::MetricsSnapshot {
                    total_tasks: stats.total_tasks,
                    running_tasks: stats.running_tasks,
                    completed_tasks: stats.completed_tasks,
                    failed_tasks: stats.failed_tasks,
                    blocked_tasks: stats.blocked_tasks,
                    timestamp: SystemTime::now()
                        .duration_since(UNIX_EPOCH)
                        .unwrap_or_default()
                        .as_millis(),
                };

                // A send error only means the last subscriber just left.
                let _ = metrics_tx.send(snapshot);
            }
        }));

        // Build router.
        // NOTE(review): `CorsLayer::permissive()` lets any web origin read the
        // API responses from a browser on this machine — confirm this is
        // intended for a loopback-only debugging tool.
        let app = Router::new()
            .route("/", get(serve_dashboard))
            .route("/ws", get(websocket_handler))
            .route("/api/tasks", get(api_tasks))
            .route("/api/stats", get(api_stats))
            .layer(CorsLayer::permissive())
            .with_state(state);

        // Spawn server.
        let handle = tokio::spawn(async move {
            // Owned by this task: dropped (stopping the broadcaster) when the
            // server returns or when this task is aborted.
            let _metrics_guard = metrics_guard;
            axum::serve(listener, app).await
        });

        Ok(handle)
    }
}
225
/// Handler for `/`: responds with the dashboard page.
///
/// The page is `static/index.html`, embedded into the binary at compile time.
#[cfg(feature = "dashboard")]
async fn serve_dashboard() -> Html<&'static str> {
    const INDEX_HTML: &str = include_str!("static/index.html");
    Html(INDEX_HTML)
}
231
/// Handler for `/ws`: upgrades the request to a WebSocket connection.
///
/// Once the upgrade completes, the socket and a copy of the shared state are
/// handed to [`handle_websocket`].
#[cfg(feature = "dashboard")]
async fn websocket_handler(
    ws: WebSocketUpgrade,
    AxumState(state): AxumState<DashboardState>,
) -> impl IntoResponse {
    ws.on_upgrade(move |socket| async move {
        handle_websocket(socket, state).await;
    })
}
240
/// Drive one WebSocket connection until the client goes away.
///
/// 1. Subscribes to the event channel.
/// 2. Replays every task currently known to the inspector as a
///    [`DashboardEvent::TaskSpawned`] message so the client starts with a full
///    task list. The replay `timestamp` is the time of the replay.
/// 3. Forwards broadcast events as JSON text frames.
///
/// The function returns when a send to the socket fails or when the event
/// channel is closed. A receiver that falls behind (`RecvError::Lagged`) loses
/// the overwritten events but stays connected.
#[cfg(feature = "dashboard")]
async fn handle_websocket(mut socket: WebSocket, state: DashboardState) {
    // Subscribe before taking the snapshot so events broadcast during the
    // replay are queued rather than missed.
    let mut event_rx = state.event_tx.subscribe();

    // Send initial state. One timestamp for the whole replay: it marks when
    // the snapshot was sent.
    let replay_timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();

    for task in state.inspector.get_all_tasks() {
        let event = DashboardEvent::TaskSpawned {
            task_id: task.id.as_u64(),
            name: task.name.clone(),
            parent: task.parent.map(|p| p.as_u64()),
            timestamp: replay_timestamp,
        };

        // An event that fails to serialize is skipped, not fatal.
        if let Ok(json) = serde_json::to_string(&event) {
            if socket.send(Message::Text(json)).await.is_err() {
                return;
            }
        }
    }

    // Stream events.
    loop {
        let event = match event_rx.recv().await {
            Ok(event) => event,
            // The client was too slow and the channel overwrote some events.
            // Dropping the connection here would silently kill the dashboard
            // for slow clients; carry on with the oldest retained event.
            Err(broadcast::error::RecvError::Lagged(_)) => continue,
            // Every sender is gone: nothing more will ever arrive.
            Err(broadcast::error::RecvError::Closed) => break,
        };

        if let Ok(json) = serde_json::to_string(&event) {
            if socket.send(Message::Text(json)).await.is_err() {
                break;
            }
        }
    }
}
275
/// REST handler for `/api/tasks`.
///
/// Responds with `{ "tasks": [...] }`, where each entry carries the task's
/// `id`, `name`, `state` (its `Debug` rendering), `parent` and `poll_count`.
#[cfg(feature = "dashboard")]
async fn api_tasks(AxumState(state): AxumState<DashboardState>) -> Json<serde_json::Value> {
    let mut task_list: Vec<serde_json::Value> = Vec::new();

    for task in state.inspector.get_all_tasks() {
        let entry = serde_json::json!({
            "id": task.id.as_u64(),
            "name": task.name,
            "state": format!("{:?}", task.state),
            "parent": task.parent.map(|p| p.as_u64()),
            "poll_count": task.poll_count,
        });
        task_list.push(entry);
    }

    let body = serde_json::json!({ "tasks": task_list });
    Json(body)
}
296
/// REST handler for `/api/stats`.
///
/// Responds with a JSON object holding the inspector's current task counters:
/// total, running, completed, failed and blocked.
#[cfg(feature = "dashboard")]
async fn api_stats(AxumState(state): AxumState<DashboardState>) -> Json<serde_json::Value> {
    let counters = state.inspector.stats();

    let body = serde_json::json!({
        "total_tasks": counters.total_tasks,
        "running_tasks": counters.running_tasks,
        "completed_tasks": counters.completed_tasks,
        "failed_tasks": counters.failed_tasks,
        "blocked_tasks": counters.blocked_tasks,
    });

    Json(body)
}
310
311#[cfg(not(feature = "dashboard"))]
312compile_error!("The dashboard module requires the 'dashboard' feature to be enabled");