mockforge_chaos/dashboard.rs

//! Real-time dashboard WebSocket support
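//!
//! A minimal bridging sketch (assuming `serde_json` is available; the actual
//! WebSocket handling lives elsewhere): forward broadcast updates to any
//! text-based sink, serializing each update to JSON. `send_text` is a
//! hypothetical placeholder for the real socket send call.
//!
//! ```ignore
//! async fn forward_updates(manager: &DashboardManager, mut send_text: impl FnMut(String)) {
//!     let mut rx = manager.subscribe();
//!     // Each received update becomes one JSON text frame.
//!     while let Ok(update) = rx.recv().await {
//!         if let Ok(json) = serde_json::to_string(&update) {
//!             send_text(json);
//!         }
//!     }
//! }
//! ```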

use crate::{
    alerts::{Alert, AlertManager},
    analytics::{ChaosAnalytics, ChaosImpact, MetricsBucket, TimeBucket},
    scenario_orchestrator::OrchestrationStatus,
    scenario_replay::ReplayStatus,
};
use chrono::{DateTime, Duration, Utc};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::{debug, info};

/// Dashboard update message types
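/// Serialized with an internal `"type"` tag, so a `Ping` update becomes
/// `{"type":"Ping","timestamp":"..."}` on the wire. A small sketch, assuming
/// `serde_json` is available:
///
/// ```ignore
/// let update = DashboardUpdate::Ping { timestamp: chrono::Utc::now() };
/// let json = serde_json::to_string(&update).unwrap();
/// assert!(json.contains("\"type\":\"Ping\""));
/// ```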
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum DashboardUpdate {
    /// Metrics update
    Metrics {
        timestamp: DateTime<Utc>,
        bucket: MetricsBucket,
    },
    /// Alert fired
    AlertFired { alert: Alert },
    /// Alert resolved
    AlertResolved { alert_id: String },
    /// Scenario status change
    ScenarioStatus {
        scenario_name: String,
        status: String,
        progress: Option<f64>,
    },
    /// Orchestration status
    OrchestrationStatus { status: Option<OrchestrationStatus> },
    /// Replay status
    ReplayStatus { status: Option<ReplayStatus> },
    /// Impact analysis update
    ImpactUpdate { impact: ChaosImpact },
    /// Schedule update
    ScheduleUpdate {
        schedule_id: String,
        next_execution: Option<DateTime<Utc>>,
    },
    /// Health check / keepalive
    Ping { timestamp: DateTime<Utc> },
}

/// Dashboard statistics summary
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardStats {
    /// Current timestamp
    pub timestamp: DateTime<Utc>,

    /// Total events in last hour
    pub events_last_hour: usize,

    /// Total events in last 24 hours
    pub events_last_day: usize,

    /// Average latency (ms)
    pub avg_latency_ms: f64,

    /// Total faults in last hour
    pub faults_last_hour: usize,

    /// Active alerts count
    pub active_alerts: usize,

    /// Total scheduled scenarios
    pub scheduled_scenarios: usize,

    /// Active orchestrations
    pub active_orchestrations: usize,

    /// Active replays
    pub active_replays: usize,

    /// Current chaos impact score (0.0 - 1.0)
    pub current_impact_score: f64,

    /// Top affected endpoints
    pub top_endpoints: Vec<(String, usize)>,
}

impl DashboardStats {
    /// Create empty stats
    pub fn empty() -> Self {
        Self {
            timestamp: Utc::now(),
            events_last_hour: 0,
            events_last_day: 0,
            avg_latency_ms: 0.0,
            faults_last_hour: 0,
            active_alerts: 0,
            scheduled_scenarios: 0,
            active_orchestrations: 0,
            active_replays: 0,
            current_impact_score: 0.0,
            top_endpoints: vec![],
        }
    }

    /// Calculate stats from analytics
    pub fn from_analytics(analytics: &ChaosAnalytics, alert_manager: &AlertManager) -> Self {
        let now = Utc::now();
        let one_hour_ago = now - Duration::hours(1);
        let one_day_ago = now - Duration::days(1);

        // Get metrics for last hour and day
        let hour_metrics = analytics.get_metrics(one_hour_ago, now, TimeBucket::Minute);
        let day_metrics = analytics.get_metrics(one_day_ago, now, TimeBucket::Hour);

        // Calculate statistics
        let events_last_hour: usize = hour_metrics.iter().map(|m| m.total_events).sum();
        let events_last_day: usize = day_metrics.iter().map(|m| m.total_events).sum();

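        // Note: this is an unweighted mean of per-bucket averages, so sparse
        // buckets count the same as busy ones.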
        let avg_latency_ms = if !hour_metrics.is_empty() {
            hour_metrics.iter().map(|m| m.avg_latency_ms).sum::<f64>() / hour_metrics.len() as f64
        } else {
            0.0
        };

        let faults_last_hour: usize = hour_metrics.iter().map(|m| m.total_faults).sum();

        // Get active alerts
        let active_alerts = alert_manager.get_active_alerts().len();

        // Get impact analysis
        let impact = analytics.get_impact_analysis(one_hour_ago, now, TimeBucket::Minute);

        Self {
            timestamp: now,
            events_last_hour,
            events_last_day,
            avg_latency_ms,
            faults_last_hour,
            active_alerts,
            scheduled_scenarios: 0,   // Would be populated from scheduler
            active_orchestrations: 0, // Would be populated from orchestrator
            active_replays: 0,        // Would be populated from replay engine
            current_impact_score: impact.severity_score,
            top_endpoints: impact.top_affected_endpoints,
        }
    }
}

/// Dashboard manager
pub struct DashboardManager {
    /// Analytics engine
    analytics: Arc<ChaosAnalytics>,
    /// Alert manager
    alert_manager: Arc<AlertManager>,
    /// Update broadcaster
    update_tx: broadcast::Sender<DashboardUpdate>,
    /// Last stats snapshot
    last_stats: Arc<RwLock<DashboardStats>>,
}

impl DashboardManager {
    /// Create a new dashboard manager
    pub fn new(analytics: Arc<ChaosAnalytics>, alert_manager: Arc<AlertManager>) -> Self {
        let (update_tx, _) = broadcast::channel(100);

        Self {
            analytics,
            alert_manager,
            update_tx,
            last_stats: Arc::new(RwLock::new(DashboardStats::empty())),
        }
    }

    /// Subscribe to dashboard updates
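    ///
    /// A usage sketch, assuming a running Tokio runtime; the channel is
    /// bounded (capacity 100), so a receiver that falls behind may observe
    /// `RecvError::Lagged`:
    ///
    /// ```ignore
    /// let mut rx = manager.subscribe();
    /// tokio::spawn(async move {
    ///     while let Ok(update) = rx.recv().await {
    ///         println!("dashboard update: {:?}", update);
    ///     }
    /// });
    /// ```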
    pub fn subscribe(&self) -> broadcast::Receiver<DashboardUpdate> {
        self.update_tx.subscribe()
    }

    /// Send a dashboard update
    pub fn send_update(&self, update: DashboardUpdate) {
        debug!("Sending dashboard update: {:?}", update);
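        // send() only errors when there are no active subscribers, which is
        // not a failure for a fire-and-forget broadcast, so the result is ignored.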
        let _ = self.update_tx.send(update);
    }

    /// Broadcast metrics update
    pub fn broadcast_metrics(&self, bucket: MetricsBucket) {
        self.send_update(DashboardUpdate::Metrics {
            timestamp: Utc::now(),
            bucket,
        });
    }

    /// Broadcast alert
    pub fn broadcast_alert(&self, alert: Alert) {
        self.send_update(DashboardUpdate::AlertFired { alert });
    }

    /// Broadcast alert resolution
    pub fn broadcast_alert_resolved(&self, alert_id: String) {
        self.send_update(DashboardUpdate::AlertResolved { alert_id });
    }

    /// Broadcast scenario status
    pub fn broadcast_scenario_status(
        &self,
        scenario_name: String,
        status: String,
        progress: Option<f64>,
    ) {
        self.send_update(DashboardUpdate::ScenarioStatus {
            scenario_name,
            status,
            progress,
        });
    }

    /// Broadcast impact update
    pub fn broadcast_impact(&self, impact: ChaosImpact) {
        self.send_update(DashboardUpdate::ImpactUpdate { impact });
    }

    /// Send ping (keepalive)
    pub fn send_ping(&self) {
        self.send_update(DashboardUpdate::Ping {
            timestamp: Utc::now(),
        });
    }

    /// Get current statistics
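    ///
    /// Recomputes the snapshot from analytics and alert state on every call
    /// and caches it in `last_stats`. A sketch of exposing it as JSON,
    /// assuming `serde_json` is available:
    ///
    /// ```ignore
    /// let body = serde_json::to_string(&manager.get_stats()).unwrap();
    /// ```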
    pub fn get_stats(&self) -> DashboardStats {
        let mut stats = self.last_stats.write();
        *stats = DashboardStats::from_analytics(&self.analytics, &self.alert_manager);
        stats.clone()
    }

    /// Get metrics for time range
    pub fn get_metrics_range(
        &self,
        start: DateTime<Utc>,
        end: DateTime<Utc>,
        bucket_size: TimeBucket,
    ) -> Vec<MetricsBucket> {
        self.analytics.get_metrics(start, end, bucket_size)
    }

    /// Get impact analysis
    pub fn get_impact_analysis(&self, start: DateTime<Utc>, end: DateTime<Utc>) -> ChaosImpact {
        self.analytics.get_impact_analysis(start, end, TimeBucket::Minute)
    }

    /// Get active alerts
    pub fn get_active_alerts(&self) -> Vec<Alert> {
        self.alert_manager.get_active_alerts()
    }

    /// Get alert history
    pub fn get_alert_history(&self, limit: Option<usize>) -> Vec<Alert> {
        self.alert_manager.get_alert_history(limit)
    }

    /// Start background update loop
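    ///
    /// Spawns a Tokio task that periodically emits `Ping`, `ImpactUpdate`,
    /// and `Metrics` updates. A usage sketch (the 15-second interval is an
    /// arbitrary example value):
    ///
    /// ```ignore
    /// manager.start_update_loop(15).await;
    /// let mut rx = manager.subscribe();
    /// // Periodic updates now arrive on `rx`.
    /// ```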
    pub async fn start_update_loop(&self, interval_seconds: u64) {
        let analytics = Arc::clone(&self.analytics);
        let _alert_manager = Arc::clone(&self.alert_manager);
        let update_tx = self.update_tx.clone();

        tokio::spawn(async move {
            let mut interval =
                tokio::time::interval(std::time::Duration::from_secs(interval_seconds));

            loop {
                interval.tick().await;

                // Send ping
                let _ = update_tx.send(DashboardUpdate::Ping {
                    timestamp: Utc::now(),
                });

                // Calculate and broadcast impact
                let now = Utc::now();
                let one_hour_ago = now - Duration::hours(1);
                let impact = analytics.get_impact_analysis(one_hour_ago, now, TimeBucket::Minute);

                let _ = update_tx.send(DashboardUpdate::ImpactUpdate { impact });

                // Check for new metrics
                let recent_metrics = analytics.get_current_metrics(1, TimeBucket::Minute);
                if let Some(latest) = recent_metrics.last() {
                    let _ = update_tx.send(DashboardUpdate::Metrics {
                        timestamp: Utc::now(),
                        bucket: latest.clone(),
                    });
                }
            }
        });

        info!("Dashboard update loop started (interval: {}s)", interval_seconds);
    }
}

/// Dashboard query parameters
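///
/// Typically deserialized from an HTTP query string such as
/// `?start=2025-10-07T12:00:00Z&end=2025-10-07T13:00:00Z&bucket=1h&limit=100`.
/// A sketch, assuming the `serde_urlencoded` crate is available:
///
/// ```ignore
/// let query: DashboardQuery =
///     serde_urlencoded::from_str("bucket=1h&limit=100").unwrap();
/// assert_eq!(query.parse_bucket(), TimeBucket::Hour);
/// ```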
#[derive(Debug, Clone, Deserialize)]
pub struct DashboardQuery {
    /// Start time (ISO 8601)
    pub start: Option<String>,
    /// End time (ISO 8601)
    pub end: Option<String>,
    /// Bucket size (minute, hour, day)
    pub bucket: Option<String>,
    /// Limit
    pub limit: Option<usize>,
}

impl DashboardQuery {
    /// Parse start time
    pub fn parse_start(&self) -> Option<DateTime<Utc>> {
        self.start
            .as_ref()
            .and_then(|s| DateTime::parse_from_rfc3339(s).ok().map(|dt| dt.with_timezone(&Utc)))
    }

    /// Parse end time
    pub fn parse_end(&self) -> Option<DateTime<Utc>> {
        self.end
            .as_ref()
            .and_then(|s| DateTime::parse_from_rfc3339(s).ok().map(|dt| dt.with_timezone(&Utc)))
    }

    /// Parse bucket size
    pub fn parse_bucket(&self) -> TimeBucket {
        match self.bucket.as_deref() {
            Some("minute") | Some("1m") => TimeBucket::Minute,
            Some("5minutes") | Some("5m") => TimeBucket::FiveMinutes,
            Some("hour") | Some("1h") => TimeBucket::Hour,
            Some("day") | Some("1d") => TimeBucket::Day,
            _ => TimeBucket::Minute,
        }
    }

    /// Get time range (defaults to last hour)
    pub fn get_range(&self) -> (DateTime<Utc>, DateTime<Utc>) {
        let end = self.parse_end().unwrap_or_else(Utc::now);
        let start = self.parse_start().unwrap_or_else(|| end - Duration::hours(1));
        (start, end)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_dashboard_stats_empty() {
        let stats = DashboardStats::empty();
        assert_eq!(stats.events_last_hour, 0);
        assert_eq!(stats.active_alerts, 0);
    }

    #[test]
    fn test_dashboard_query_defaults() {
        let query = DashboardQuery {
            start: None,
            end: None,
            bucket: None,
            limit: None,
        };

        let (start, end) = query.get_range();
        assert!(end > start);
        assert_eq!(query.parse_bucket(), TimeBucket::Minute);
    }

    #[test]
    fn test_dashboard_query_parsing() {
        let query = DashboardQuery {
            start: Some("2025-10-07T12:00:00Z".to_string()),
            end: Some("2025-10-07T13:00:00Z".to_string()),
            bucket: Some("hour".to_string()),
            limit: Some(100),
        };

        let (start, end) = query.get_range();
        assert_eq!(start.to_rfc3339(), "2025-10-07T12:00:00+00:00");
        assert_eq!(end.to_rfc3339(), "2025-10-07T13:00:00+00:00");
        assert_eq!(query.parse_bucket(), TimeBucket::Hour);
    }

    #[tokio::test]
    async fn test_dashboard_manager_creation() {
        let analytics = Arc::new(ChaosAnalytics::new());
        let alert_manager = Arc::new(AlertManager::new());
        let manager = DashboardManager::new(analytics, alert_manager);

        let stats = manager.get_stats();
        assert_eq!(stats.events_last_hour, 0);
    }

    #[tokio::test]
    async fn test_dashboard_subscribe() {
        let analytics = Arc::new(ChaosAnalytics::new());
        let alert_manager = Arc::new(AlertManager::new());
        let manager = DashboardManager::new(analytics, alert_manager);

        let mut rx = manager.subscribe();

        manager.send_ping();

        let update = rx.recv().await.unwrap();
        match update {
            DashboardUpdate::Ping { .. } => {}
            _ => panic!("Expected Ping update"),
        }
    }
}