1use crate::{
4 alerts::{Alert, AlertManager},
5 analytics::{ChaosAnalytics, ChaosImpact, MetricsBucket, TimeBucket},
6 scenario_orchestrator::OrchestrationStatus,
7 scenario_replay::ReplayStatus,
8};
9use chrono::{DateTime, Duration, Utc};
10use parking_lot::RwLock;
11use serde::{Deserialize, Serialize};
12use std::sync::Arc;
13use tokio::sync::broadcast;
14use tracing::{debug, info};
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
18#[serde(tag = "type")]
19pub enum DashboardUpdate {
20 Metrics {
22 timestamp: DateTime<Utc>,
23 bucket: MetricsBucket,
24 },
25 AlertFired { alert: Alert },
27 AlertResolved { alert_id: String },
29 ScenarioStatus {
31 scenario_name: String,
32 status: String,
33 progress: Option<f64>,
34 },
35 OrchestrationStatus { status: Option<OrchestrationStatus> },
37 ReplayStatus { status: Option<ReplayStatus> },
39 ImpactUpdate { impact: ChaosImpact },
41 ScheduleUpdate {
43 schedule_id: String,
44 next_execution: Option<DateTime<Utc>>,
45 },
46 Ping { timestamp: DateTime<Utc> },
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct DashboardStats {
53 pub timestamp: DateTime<Utc>,
55
56 pub events_last_hour: usize,
58
59 pub events_last_day: usize,
61
62 pub avg_latency_ms: f64,
64
65 pub faults_last_hour: usize,
67
68 pub active_alerts: usize,
70
71 pub scheduled_scenarios: usize,
73
74 pub active_orchestrations: usize,
76
77 pub active_replays: usize,
79
80 pub current_impact_score: f64,
82
83 pub top_endpoints: Vec<(String, usize)>,
85}
86
87impl DashboardStats {
88 pub fn empty() -> Self {
90 Self {
91 timestamp: Utc::now(),
92 events_last_hour: 0,
93 events_last_day: 0,
94 avg_latency_ms: 0.0,
95 faults_last_hour: 0,
96 active_alerts: 0,
97 scheduled_scenarios: 0,
98 active_orchestrations: 0,
99 active_replays: 0,
100 current_impact_score: 0.0,
101 top_endpoints: vec![],
102 }
103 }
104
105 pub fn from_analytics(analytics: &ChaosAnalytics, alert_manager: &AlertManager) -> Self {
107 let now = Utc::now();
108 let one_hour_ago = now - Duration::hours(1);
109 let one_day_ago = now - Duration::days(1);
110
111 let hour_metrics = analytics.get_metrics(one_hour_ago, now, TimeBucket::Minute);
113 let day_metrics = analytics.get_metrics(one_day_ago, now, TimeBucket::Hour);
114
115 let events_last_hour: usize = hour_metrics.iter().map(|m| m.total_events).sum();
117 let events_last_day: usize = day_metrics.iter().map(|m| m.total_events).sum();
118
119 let avg_latency_ms = if !hour_metrics.is_empty() {
120 hour_metrics.iter().map(|m| m.avg_latency_ms).sum::<f64>() / hour_metrics.len() as f64
121 } else {
122 0.0
123 };
124
125 let faults_last_hour: usize = hour_metrics.iter().map(|m| m.total_faults).sum();
126
127 let active_alerts = alert_manager.get_active_alerts().len();
129
130 let impact = analytics.get_impact_analysis(one_hour_ago, now, TimeBucket::Minute);
132
133 Self {
134 timestamp: now,
135 events_last_hour,
136 events_last_day,
137 avg_latency_ms,
138 faults_last_hour,
139 active_alerts,
140 scheduled_scenarios: 0, active_orchestrations: 0, active_replays: 0, current_impact_score: impact.severity_score,
144 top_endpoints: impact.top_affected_endpoints,
145 }
146 }
147}
148
149pub struct DashboardManager {
151 analytics: Arc<ChaosAnalytics>,
153 alert_manager: Arc<AlertManager>,
155 update_tx: broadcast::Sender<DashboardUpdate>,
157 last_stats: Arc<RwLock<DashboardStats>>,
159}
160
161impl DashboardManager {
162 pub fn new(analytics: Arc<ChaosAnalytics>, alert_manager: Arc<AlertManager>) -> Self {
164 let (update_tx, _) = broadcast::channel(100);
165
166 Self {
167 analytics,
168 alert_manager,
169 update_tx,
170 last_stats: Arc::new(RwLock::new(DashboardStats::empty())),
171 }
172 }
173
174 pub fn subscribe(&self) -> broadcast::Receiver<DashboardUpdate> {
176 self.update_tx.subscribe()
177 }
178
179 pub fn send_update(&self, update: DashboardUpdate) {
181 debug!("Sending dashboard update: {:?}", update);
182 let _ = self.update_tx.send(update);
183 }
184
185 pub fn broadcast_metrics(&self, bucket: MetricsBucket) {
187 self.send_update(DashboardUpdate::Metrics {
188 timestamp: Utc::now(),
189 bucket,
190 });
191 }
192
193 pub fn broadcast_alert(&self, alert: Alert) {
195 self.send_update(DashboardUpdate::AlertFired { alert });
196 }
197
198 pub fn broadcast_alert_resolved(&self, alert_id: String) {
200 self.send_update(DashboardUpdate::AlertResolved { alert_id });
201 }
202
203 pub fn broadcast_scenario_status(
205 &self,
206 scenario_name: String,
207 status: String,
208 progress: Option<f64>,
209 ) {
210 self.send_update(DashboardUpdate::ScenarioStatus {
211 scenario_name,
212 status,
213 progress,
214 });
215 }
216
217 pub fn broadcast_impact(&self, impact: ChaosImpact) {
219 self.send_update(DashboardUpdate::ImpactUpdate { impact });
220 }
221
222 pub fn send_ping(&self) {
224 self.send_update(DashboardUpdate::Ping {
225 timestamp: Utc::now(),
226 });
227 }
228
229 pub fn get_stats(&self) -> DashboardStats {
231 let mut stats = self.last_stats.write();
232 *stats = DashboardStats::from_analytics(&self.analytics, &self.alert_manager);
233 stats.clone()
234 }
235
236 pub fn get_metrics_range(
238 &self,
239 start: DateTime<Utc>,
240 end: DateTime<Utc>,
241 bucket_size: TimeBucket,
242 ) -> Vec<MetricsBucket> {
243 self.analytics.get_metrics(start, end, bucket_size)
244 }
245
246 pub fn get_impact_analysis(&self, start: DateTime<Utc>, end: DateTime<Utc>) -> ChaosImpact {
248 self.analytics.get_impact_analysis(start, end, TimeBucket::Minute)
249 }
250
251 pub fn get_active_alerts(&self) -> Vec<Alert> {
253 self.alert_manager.get_active_alerts()
254 }
255
256 pub fn get_alert_history(&self, limit: Option<usize>) -> Vec<Alert> {
258 self.alert_manager.get_alert_history(limit)
259 }
260
261 pub async fn start_update_loop(&self, interval_seconds: u64) {
263 let analytics = Arc::clone(&self.analytics);
264 let _alert_manager = Arc::clone(&self.alert_manager);
265 let update_tx = self.update_tx.clone();
266
267 tokio::spawn(async move {
268 let mut interval =
269 tokio::time::interval(std::time::Duration::from_secs(interval_seconds));
270
271 loop {
272 interval.tick().await;
273
274 let _ = update_tx.send(DashboardUpdate::Ping {
276 timestamp: Utc::now(),
277 });
278
279 let now = Utc::now();
281 let one_hour_ago = now - Duration::hours(1);
282 let impact = analytics.get_impact_analysis(one_hour_ago, now, TimeBucket::Minute);
283
284 let _ = update_tx.send(DashboardUpdate::ImpactUpdate { impact });
285
286 let recent_metrics = analytics.get_current_metrics(1, TimeBucket::Minute);
288 if let Some(latest) = recent_metrics.last() {
289 let _ = update_tx.send(DashboardUpdate::Metrics {
290 timestamp: Utc::now(),
291 bucket: latest.clone(),
292 });
293 }
294 }
295 });
296
297 info!("Dashboard update loop started (interval: {}s)", interval_seconds);
298 }
299}
300
301#[derive(Debug, Clone, Deserialize)]
303pub struct DashboardQuery {
304 pub start: Option<String>,
306 pub end: Option<String>,
308 pub bucket: Option<String>,
310 pub limit: Option<usize>,
312}
313
314impl DashboardQuery {
315 pub fn parse_start(&self) -> Option<DateTime<Utc>> {
317 self.start
318 .as_ref()
319 .and_then(|s| DateTime::parse_from_rfc3339(s).ok().map(|dt| dt.with_timezone(&Utc)))
320 }
321
322 pub fn parse_end(&self) -> Option<DateTime<Utc>> {
324 self.end
325 .as_ref()
326 .and_then(|s| DateTime::parse_from_rfc3339(s).ok().map(|dt| dt.with_timezone(&Utc)))
327 }
328
329 pub fn parse_bucket(&self) -> TimeBucket {
331 match self.bucket.as_deref() {
332 Some("minute") | Some("1m") => TimeBucket::Minute,
333 Some("5minutes") | Some("5m") => TimeBucket::FiveMinutes,
334 Some("hour") | Some("1h") => TimeBucket::Hour,
335 Some("day") | Some("1d") => TimeBucket::Day,
336 _ => TimeBucket::Minute,
337 }
338 }
339
340 pub fn get_range(&self) -> (DateTime<Utc>, DateTime<Utc>) {
342 let end = self.parse_end().unwrap_or_else(Utc::now);
343 let start = self.parse_start().unwrap_or_else(|| end - Duration::hours(1));
344 (start, end)
345 }
346}
347
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a manager over fresh, empty analytics and alert state.
    fn new_manager() -> DashboardManager {
        DashboardManager::new(
            Arc::new(ChaosAnalytics::new()),
            Arc::new(AlertManager::new()),
        )
    }

    /// An empty snapshot reports zero events and zero active alerts.
    #[test]
    fn test_dashboard_stats_empty() {
        let snapshot = DashboardStats::empty();
        assert_eq!(snapshot.events_last_hour, 0);
        assert_eq!(snapshot.active_alerts, 0);
    }

    /// With no parameters the range is non-empty and the bucket is a minute.
    #[test]
    fn test_dashboard_query_defaults() {
        let query = DashboardQuery {
            start: None,
            end: None,
            bucket: None,
            limit: None,
        };

        let (range_start, range_end) = query.get_range();
        assert!(range_end > range_start);
        assert_eq!(query.parse_bucket(), TimeBucket::Minute);
    }

    /// Explicit RFC 3339 bounds and a bucket name are parsed as given.
    #[test]
    fn test_dashboard_query_parsing() {
        let query = DashboardQuery {
            start: Some(String::from("2025-10-07T12:00:00Z")),
            end: Some(String::from("2025-10-07T13:00:00Z")),
            bucket: Some(String::from("hour")),
            limit: Some(100),
        };

        let (range_start, range_end) = query.get_range();
        assert_eq!(range_start.to_rfc3339(), "2025-10-07T12:00:00+00:00");
        assert_eq!(range_end.to_rfc3339(), "2025-10-07T13:00:00+00:00");
        assert_eq!(query.parse_bucket(), TimeBucket::Hour);
    }

    /// A manager over empty analytics reports no events in the last hour.
    #[tokio::test]
    async fn test_dashboard_manager_creation() {
        let manager = new_manager();

        let snapshot = manager.get_stats();
        assert_eq!(snapshot.events_last_hour, 0);
    }

    /// A subscriber receives the ping sent after it subscribed.
    #[tokio::test]
    async fn test_dashboard_subscribe() {
        let manager = new_manager();
        let mut receiver = manager.subscribe();

        manager.send_ping();

        let received = receiver.recv().await.unwrap();
        assert!(
            matches!(received, DashboardUpdate::Ping { .. }),
            "Expected Ping update"
        );
    }
}