1use super::{
7 metrics::MetricsCollector,
8 monitoring::{PerformanceMetrics, SearchStats, SystemMonitor, SystemOverview, UserStats},
9};
10use crate::{RragError, RragResult};
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use std::net::SocketAddr;
15use std::sync::Arc;
16use tokio::sync::{broadcast, mpsc, RwLock};
17
/// Settings for the observability dashboard server.
///
/// Within this file only `enabled`, `host`, `port`, `title`,
/// `refresh_interval_seconds` and `max_data_points` are read; the remaining
/// fields are stored and (de)serialized but not consulted by the code here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardConfig {
    /// When `false`, `DashboardServer::start` returns `Ok(())` without
    /// spawning any background task.
    pub enabled: bool,
    /// Host part of the address the dashboard server uses (combined with `port`).
    pub host: String,
    /// Port part of the address the dashboard server uses.
    pub port: u16,
    /// Title rendered into the dashboard HTML page.
    pub title: String,
    /// Period, in seconds, of the background loop that samples the system
    /// monitor and broadcasts a metrics update.
    pub refresh_interval_seconds: u32,
    /// Maximum number of samples kept per metric history; passed to
    /// `DashboardMetrics::new`.
    pub max_data_points: usize,
    /// NOTE(review): not read anywhere in this file — presumably toggles the
    /// WebSocket endpoint; confirm against the HTTP layer.
    pub websocket_enabled: bool,
    /// NOTE(review): not read anywhere in this file — presumably enables
    /// token authentication; confirm against the HTTP layer.
    pub auth_enabled: bool,
    /// Optional authentication token. Not read anywhere in this file.
    pub auth_token: Option<String>,
    /// NOTE(review): not read anywhere in this file — presumably enables CORS
    /// handling; confirm against the HTTP layer.
    pub cors_enabled: bool,
    /// Origin list that accompanies `cors_enabled`. Not read anywhere in this file.
    pub allowed_origins: Vec<String>,
}
33
34impl Default for DashboardConfig {
35 fn default() -> Self {
36 Self {
37 enabled: true,
38 host: "0.0.0.0".to_string(),
39 port: 3000,
40 title: "RRAG Observability Dashboard".to_string(),
41 refresh_interval_seconds: 5,
42 max_data_points: 100,
43 websocket_enabled: true,
44 auth_enabled: false,
45 auth_token: None,
46 cors_enabled: true,
47 allowed_origins: vec!["*".to_string()],
48 }
49 }
50}
51
/// A complete chart description: axis labels, one or more data series and
/// presentation metadata. Serialized as part of `RealtimeMetrics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChartData {
    /// Labels for the data points (timestamps, `"Point N"` strings or action
    /// names, depending on the chart builder that produced the value).
    pub labels: Vec<String>,
    /// The data series drawn on this chart.
    pub datasets: Vec<ChartDataset>,
    /// How the chart should be rendered.
    pub chart_type: ChartType,
    /// Human-readable chart title.
    pub title: String,
    /// Optional unit of the plotted values (e.g. `"%"`, `"ms"`).
    pub unit: Option<String>,
}
61
/// One named data series within a [`ChartData`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChartDataset {
    /// Series name.
    pub label: String,
    /// The plotted values, one per chart label.
    pub data: Vec<f64>,
    /// Series color; the builders in this file use `#rrggbb` hex strings.
    pub color: String,
    /// Fill flag for the series; the builders in this file set it for area
    /// charts only.
    pub fill: bool,
}
69
70#[derive(Debug, Clone, Serialize, Deserialize)]
71pub enum ChartType {
72 Line,
73 Bar,
74 Pie,
75 Gauge,
76 Area,
77 Scatter,
78}
79
/// A point-in-time snapshot of everything the dashboard displays.
///
/// Built by `DashboardServer::get_current_data` and by the periodic update
/// loop, which broadcasts it as `WebSocketMessage::MetricsUpdate`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RealtimeMetrics {
    /// When the snapshot was assembled.
    pub timestamp: DateTime<Utc>,
    /// Overview obtained from the system monitor.
    pub system_overview: SystemOverview,
    /// Charts keyed by chart id (e.g. `"cpu_usage"`).
    pub charts: HashMap<String, ChartData>,
    /// Alerts attached to the snapshot; always empty where this file builds it.
    pub alerts: Vec<AlertInfo>,
    /// Health summary attached to the snapshot.
    pub health_status: ComponentHealthStatus,
}
89
/// A single alert as shown on the dashboard.
///
/// This file only declares and serializes the type; it never constructs one.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertInfo {
    /// Alert identifier.
    pub id: String,
    /// Severity as a free-form string; the set of values is not defined in this file.
    pub severity: String,
    /// Alert text.
    pub message: String,
    /// Timestamp associated with the alert.
    pub timestamp: DateTime<Utc>,
    /// Name of the component the alert refers to.
    pub component: String,
    /// Acknowledgement flag.
    pub acknowledged: bool,
}
99
/// Aggregate health report: an overall status plus per-component details.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComponentHealthStatus {
    /// Overall status string; every constructor in this file uses `"healthy"`.
    pub overall: String,
    /// Health details keyed by component name.
    pub components: HashMap<String, ComponentHealth>,
}
105
/// Health details for a single component.
///
/// Every value of this type built in this file uses hard-coded placeholder
/// numbers rather than measurements.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComponentHealth {
    /// Status string (e.g. `"healthy"`).
    pub status: String,
    /// Uptime in seconds.
    pub uptime_seconds: i64,
    /// When the component was last checked.
    pub last_check: DateTime<Utc>,
    /// Number of errors recorded for the component.
    pub error_count: u64,
    /// Response time in milliseconds.
    pub response_time_ms: f64,
}
114
115#[derive(Debug, Clone, Serialize, Deserialize)]
117#[serde(tag = "type")]
118pub enum WebSocketMessage {
119 #[serde(rename = "metrics_update")]
120 MetricsUpdate { data: RealtimeMetrics },
121 #[serde(rename = "alert")]
122 Alert { alert: AlertInfo },
123 #[serde(rename = "health_update")]
124 HealthUpdate { health: ComponentHealthStatus },
125 #[serde(rename = "chart_update")]
126 ChartUpdate { chart_id: String, data: ChartData },
127 #[serde(rename = "ping")]
128 Ping { timestamp: DateTime<Utc> },
129 #[serde(rename = "pong")]
130 Pong { timestamp: DateTime<Utc> },
131}
132
/// Registry of connected WebSocket clients plus an in-process broadcast channel.
pub struct WebSocketManager {
    /// Per-client outbound queues keyed by client id; each queue receives
    /// messages already serialized to JSON text.
    clients: Arc<RwLock<HashMap<String, mpsc::UnboundedSender<String>>>>,
    /// Sending half of the typed broadcast channel; further receivers are
    /// obtained through `subscribe_to_broadcasts`.
    broadcast_sender: broadcast::Sender<WebSocketMessage>,
    /// Receiver created together with the sender and never read.
    /// NOTE(review): presumably retained so the channel always has at least
    /// one receiver and `send` does not fail for lack of subscribers — confirm.
    _broadcast_receiver: broadcast::Receiver<WebSocketMessage>,
}
139
140impl WebSocketManager {
141 pub fn new() -> Self {
142 let (broadcast_sender, broadcast_receiver) = broadcast::channel(1000);
143
144 Self {
145 clients: Arc::new(RwLock::new(HashMap::new())),
146 broadcast_sender,
147 _broadcast_receiver: broadcast_receiver,
148 }
149 }
150
151 pub async fn add_client(&self, client_id: String, sender: mpsc::UnboundedSender<String>) {
152 let mut clients = self.clients.write().await;
153 clients.insert(client_id, sender);
154 tracing::info!(
155 "WebSocket client connected, total clients: {}",
156 clients.len()
157 );
158 }
159
160 pub async fn remove_client(&self, client_id: &str) {
161 let mut clients = self.clients.write().await;
162 clients.remove(client_id);
163 tracing::info!(
164 "WebSocket client disconnected, total clients: {}",
165 clients.len()
166 );
167 }
168
169 pub async fn broadcast_message(&self, message: WebSocketMessage) -> RragResult<()> {
170 if let Err(e) = self.broadcast_sender.send(message.clone()) {
172 tracing::warn!("Failed to broadcast message: {}", e);
173 }
174
175 let message_str = serde_json::to_string(&message)
177 .map_err(|e| RragError::agent("websocket", e.to_string()))?;
178
179 let mut clients = self.clients.write().await;
180 let mut disconnected_clients = Vec::new();
181
182 for (client_id, sender) in clients.iter() {
183 if sender.send(message_str.clone()).is_err() {
184 disconnected_clients.push(client_id.clone());
185 }
186 }
187
188 for client_id in disconnected_clients {
190 clients.remove(&client_id);
191 }
192
193 Ok(())
194 }
195
196 pub async fn get_client_count(&self) -> usize {
197 self.clients.read().await.len()
198 }
199
200 pub fn subscribe_to_broadcasts(&self) -> broadcast::Receiver<WebSocketMessage> {
201 self.broadcast_sender.subscribe()
202 }
203}
204
/// Bounded in-memory histories of the metrics plotted on the dashboard.
pub struct DashboardMetrics {
    /// Performance samples, oldest first.
    performance_history: Arc<RwLock<Vec<PerformanceMetrics>>>,
    /// Search statistics samples, oldest first.
    search_stats_history: Arc<RwLock<Vec<SearchStats>>>,
    /// User statistics samples, oldest first.
    user_stats_history: Arc<RwLock<Vec<UserStats>>>,
    /// Maximum number of samples retained in each history.
    max_data_points: usize,
}
212
213impl DashboardMetrics {
214 pub fn new(max_data_points: usize) -> Self {
215 Self {
216 performance_history: Arc::new(RwLock::new(Vec::new())),
217 search_stats_history: Arc::new(RwLock::new(Vec::new())),
218 user_stats_history: Arc::new(RwLock::new(Vec::new())),
219 max_data_points,
220 }
221 }
222
223 pub async fn update_performance(&self, metrics: PerformanceMetrics) {
224 let mut history = self.performance_history.write().await;
225 history.push(metrics);
226
227 let current_len = history.len();
228 if current_len > self.max_data_points {
229 history.drain(0..current_len - self.max_data_points);
230 }
231 }
232
233 pub async fn update_search_stats(&self, stats: SearchStats) {
234 let mut history = self.search_stats_history.write().await;
235 history.push(stats);
236
237 let current_len = history.len();
238 if current_len > self.max_data_points {
239 history.drain(0..current_len - self.max_data_points);
240 }
241 }
242
243 pub async fn update_user_stats(&self, stats: UserStats) {
244 let mut history = self.user_stats_history.write().await;
245 history.push(stats);
246
247 let current_len = history.len();
248 if current_len > self.max_data_points {
249 history.drain(0..current_len - self.max_data_points);
250 }
251 }
252
253 pub async fn generate_charts(&self) -> HashMap<String, ChartData> {
254 let mut charts = HashMap::new();
255
256 charts.insert("cpu_usage".to_string(), self.create_cpu_chart().await);
258 charts.insert("memory_usage".to_string(), self.create_memory_chart().await);
259 charts.insert("disk_usage".to_string(), self.create_disk_chart().await);
260
261 charts.insert(
263 "search_performance".to_string(),
264 self.create_search_performance_chart().await,
265 );
266 charts.insert(
267 "search_success_rate".to_string(),
268 self.create_search_success_chart().await,
269 );
270 charts.insert(
271 "cache_hit_rate".to_string(),
272 self.create_cache_hit_chart().await,
273 );
274
275 charts.insert(
277 "active_users".to_string(),
278 self.create_active_users_chart().await,
279 );
280 charts.insert(
281 "user_actions".to_string(),
282 self.create_user_actions_chart().await,
283 );
284
285 charts
286 }
287
288 async fn create_cpu_chart(&self) -> ChartData {
289 let history = self.performance_history.read().await;
290 let labels: Vec<String> = history
291 .iter()
292 .map(|m| m.timestamp.format("%H:%M:%S").to_string())
293 .collect();
294 let data: Vec<f64> = history.iter().map(|m| m.cpu_usage_percent).collect();
295
296 ChartData {
297 labels,
298 datasets: vec![ChartDataset {
299 label: "CPU Usage %".to_string(),
300 data,
301 color: "#3b82f6".to_string(),
302 fill: true,
303 }],
304 chart_type: ChartType::Area,
305 title: "CPU Usage Over Time".to_string(),
306 unit: Some("%".to_string()),
307 }
308 }
309
310 async fn create_memory_chart(&self) -> ChartData {
311 let history = self.performance_history.read().await;
312 let labels: Vec<String> = history
313 .iter()
314 .map(|m| m.timestamp.format("%H:%M:%S").to_string())
315 .collect();
316 let data: Vec<f64> = history.iter().map(|m| m.memory_usage_percent).collect();
317
318 ChartData {
319 labels,
320 datasets: vec![ChartDataset {
321 label: "Memory Usage %".to_string(),
322 data,
323 color: "#10b981".to_string(),
324 fill: true,
325 }],
326 chart_type: ChartType::Area,
327 title: "Memory Usage Over Time".to_string(),
328 unit: Some("%".to_string()),
329 }
330 }
331
332 async fn create_disk_chart(&self) -> ChartData {
333 let history = self.performance_history.read().await;
334 let labels: Vec<String> = history
335 .iter()
336 .map(|m| m.timestamp.format("%H:%M:%S").to_string())
337 .collect();
338 let data: Vec<f64> = history.iter().map(|m| m.disk_usage_percent).collect();
339
340 ChartData {
341 labels,
342 datasets: vec![ChartDataset {
343 label: "Disk Usage %".to_string(),
344 data,
345 color: "#f59e0b".to_string(),
346 fill: true,
347 }],
348 chart_type: ChartType::Area,
349 title: "Disk Usage Over Time".to_string(),
350 unit: Some("%".to_string()),
351 }
352 }
353
354 async fn create_search_performance_chart(&self) -> ChartData {
355 let history = self.search_stats_history.read().await;
356 let labels: Vec<String> = (0..history.len())
357 .map(|i| format!("Point {}", i + 1))
358 .collect();
359 let data: Vec<f64> = history
360 .iter()
361 .map(|s| s.average_processing_time_ms)
362 .collect();
363
364 ChartData {
365 labels,
366 datasets: vec![ChartDataset {
367 label: "Avg Processing Time".to_string(),
368 data,
369 color: "#8b5cf6".to_string(),
370 fill: false,
371 }],
372 chart_type: ChartType::Line,
373 title: "Search Processing Time".to_string(),
374 unit: Some("ms".to_string()),
375 }
376 }
377
378 async fn create_search_success_chart(&self) -> ChartData {
379 let history = self.search_stats_history.read().await;
380 let labels: Vec<String> = (0..history.len())
381 .map(|i| format!("Point {}", i + 1))
382 .collect();
383 let data: Vec<f64> = history.iter().map(|s| s.success_rate).collect();
384
385 ChartData {
386 labels,
387 datasets: vec![ChartDataset {
388 label: "Success Rate".to_string(),
389 data,
390 color: "#06d6a0".to_string(),
391 fill: false,
392 }],
393 chart_type: ChartType::Line,
394 title: "Search Success Rate".to_string(),
395 unit: Some("%".to_string()),
396 }
397 }
398
399 async fn create_cache_hit_chart(&self) -> ChartData {
400 let history = self.search_stats_history.read().await;
401 let labels: Vec<String> = (0..history.len())
402 .map(|i| format!("Point {}", i + 1))
403 .collect();
404 let data: Vec<f64> = history.iter().map(|s| s.cache_hit_rate).collect();
405
406 ChartData {
407 labels,
408 datasets: vec![ChartDataset {
409 label: "Cache Hit Rate".to_string(),
410 data,
411 color: "#ff6b6b".to_string(),
412 fill: false,
413 }],
414 chart_type: ChartType::Line,
415 title: "Cache Hit Rate".to_string(),
416 unit: Some("%".to_string()),
417 }
418 }
419
420 async fn create_active_users_chart(&self) -> ChartData {
421 let history = self.user_stats_history.read().await;
422 let labels: Vec<String> = (0..history.len())
423 .map(|i| format!("Point {}", i + 1))
424 .collect();
425 let data: Vec<f64> = history.iter().map(|s| s.unique_users as f64).collect();
426
427 ChartData {
428 labels,
429 datasets: vec![ChartDataset {
430 label: "Active Users".to_string(),
431 data,
432 color: "#4ecdc4".to_string(),
433 fill: true,
434 }],
435 chart_type: ChartType::Area,
436 title: "Active Users Over Time".to_string(),
437 unit: Some("users".to_string()),
438 }
439 }
440
441 async fn create_user_actions_chart(&self) -> ChartData {
442 let history = self.user_stats_history.read().await;
443 if let Some(latest_stats) = history.last() {
444 let labels: Vec<String> = latest_stats.action_breakdown.keys().cloned().collect();
445 let data: Vec<f64> = latest_stats
446 .action_breakdown
447 .values()
448 .map(|&count| count as f64)
449 .collect();
450
451 ChartData {
452 labels,
453 datasets: vec![ChartDataset {
454 label: "User Actions".to_string(),
455 data,
456 color: "#ff9ff3".to_string(),
457 fill: false,
458 }],
459 chart_type: ChartType::Pie,
460 title: "User Action Distribution".to_string(),
461 unit: Some("actions".to_string()),
462 }
463 } else {
464 ChartData {
465 labels: vec![],
466 datasets: vec![],
467 chart_type: ChartType::Pie,
468 title: "User Action Distribution".to_string(),
469 unit: Some("actions".to_string()),
470 }
471 }
472 }
473}
474
/// Owns the dashboard's background tasks and the state they share.
pub struct DashboardServer {
    /// Dashboard settings supplied at construction.
    config: DashboardConfig,
    /// Metrics collector supplied at construction; stored but not read by the
    /// code in this file.
    metrics_collector: Arc<MetricsCollector>,
    /// Source of the system overview sampled by the update loop and by
    /// `get_current_data`.
    system_monitor: Arc<SystemMonitor>,
    /// Registry of WebSocket clients that receive broadcast updates.
    websocket_manager: Arc<WebSocketManager>,
    /// Bounded metric histories used to build charts.
    dashboard_metrics: Arc<DashboardMetrics>,
    /// Handle of the HTTP server task while running, otherwise `None`.
    server_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    /// Handle of the periodic update task while running, otherwise `None`.
    update_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    /// Running flag: set by `start`, cleared by `stop`, and polled by both
    /// background tasks as their loop condition.
    is_running: Arc<RwLock<bool>>,
}
486
487impl DashboardServer {
488 pub async fn new(
489 config: DashboardConfig,
490 metrics_collector: Arc<MetricsCollector>,
491 system_monitor: Arc<SystemMonitor>,
492 ) -> RragResult<Self> {
493 let websocket_manager = Arc::new(WebSocketManager::new());
494 let dashboard_metrics = Arc::new(DashboardMetrics::new(config.max_data_points));
495
496 Ok(Self {
497 config,
498 metrics_collector,
499 system_monitor,
500 websocket_manager,
501 dashboard_metrics,
502 server_handle: Arc::new(RwLock::new(None)),
503 update_handle: Arc::new(RwLock::new(None)),
504 is_running: Arc::new(RwLock::new(false)),
505 })
506 }
507
508 pub async fn start(&self) -> RragResult<()> {
509 if !self.config.enabled {
510 return Ok(());
511 }
512
513 let mut running = self.is_running.write().await;
514 if *running {
515 return Err(RragError::config(
516 "dashboard_server",
517 "stopped",
518 "already running",
519 ));
520 }
521
522 let server_handle = self.start_http_server().await?;
524 {
525 let mut handle = self.server_handle.write().await;
526 *handle = Some(server_handle);
527 }
528
529 let update_handle = self.start_update_loop().await?;
531 {
532 let mut handle = self.update_handle.write().await;
533 *handle = Some(update_handle);
534 }
535
536 *running = true;
537 tracing::info!(
538 "Dashboard server started on {}:{}",
539 self.config.host,
540 self.config.port
541 );
542 Ok(())
543 }
544
545 pub async fn stop(&self) -> RragResult<()> {
546 let mut running = self.is_running.write().await;
547 if !*running {
548 return Ok(());
549 }
550
551 {
553 let mut handle = self.server_handle.write().await;
554 if let Some(h) = handle.take() {
555 h.abort();
556 }
557 }
558 {
559 let mut handle = self.update_handle.write().await;
560 if let Some(h) = handle.take() {
561 h.abort();
562 }
563 }
564
565 *running = false;
566 tracing::info!("Dashboard server stopped");
567 Ok(())
568 }
569
570 pub async fn is_healthy(&self) -> bool {
571 *self.is_running.read().await
572 }
573
574 async fn start_http_server(&self) -> RragResult<tokio::task::JoinHandle<()>> {
575 let config = self.config.clone();
576 let websocket_manager = self.websocket_manager.clone();
577 let is_running = self.is_running.clone();
578
579 let handle = tokio::spawn(async move {
580 let addr: SocketAddr = format!("{}:{}", config.host, config.port)
585 .parse()
586 .expect("Invalid address");
587
588 tracing::info!("Dashboard HTTP server would start on {}", addr);
589
590 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
592 while *is_running.read().await {
593 interval.tick().await;
594
595 let client_count = websocket_manager.get_client_count().await;
597 tracing::debug!("Active WebSocket clients: {}", client_count);
598 }
599 });
600
601 Ok(handle)
602 }
603
604 async fn start_update_loop(&self) -> RragResult<tokio::task::JoinHandle<()>> {
605 let config = self.config.clone();
606 let system_monitor = self.system_monitor.clone();
607 let websocket_manager = self.websocket_manager.clone();
608 let dashboard_metrics = self.dashboard_metrics.clone();
609 let is_running = self.is_running.clone();
610
611 let handle = tokio::spawn(async move {
612 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(
613 config.refresh_interval_seconds as u64,
614 ));
615
616 while *is_running.read().await {
617 interval.tick().await;
618
619 let overview = system_monitor.get_system_overview().await;
621
622 if let Some(ref perf) = overview.performance_metrics {
624 dashboard_metrics.update_performance(perf.clone()).await;
625 }
626 if let Some(ref search_stats) = overview.search_stats {
627 dashboard_metrics
628 .update_search_stats(search_stats.clone())
629 .await;
630 }
631 if let Some(ref user_stats) = overview.user_stats {
632 dashboard_metrics
633 .update_user_stats(user_stats.clone())
634 .await;
635 }
636
637 let charts = dashboard_metrics.generate_charts().await;
639
640 let health_status = ComponentHealthStatus {
642 overall: "healthy".to_string(),
643 components: HashMap::from([
644 (
645 "metrics".to_string(),
646 ComponentHealth {
647 status: "healthy".to_string(),
648 uptime_seconds: 3600,
649 last_check: Utc::now(),
650 error_count: 0,
651 response_time_ms: 10.0,
652 },
653 ),
654 (
655 "monitoring".to_string(),
656 ComponentHealth {
657 status: "healthy".to_string(),
658 uptime_seconds: 3600,
659 last_check: Utc::now(),
660 error_count: 0,
661 response_time_ms: 15.0,
662 },
663 ),
664 ]),
665 };
666
667 let realtime_metrics = RealtimeMetrics {
669 timestamp: Utc::now(),
670 system_overview: overview,
671 charts,
672 alerts: vec![], health_status,
674 };
675
676 if let Err(e) = websocket_manager
678 .broadcast_message(WebSocketMessage::MetricsUpdate {
679 data: realtime_metrics,
680 })
681 .await
682 {
683 tracing::warn!("Failed to broadcast metrics update: {}", e);
684 }
685 }
686 });
687
688 Ok(handle)
689 }
690
691 pub async fn get_current_data(&self) -> RragResult<RealtimeMetrics> {
693 let overview = self.system_monitor.get_system_overview().await;
694 let charts = self.dashboard_metrics.generate_charts().await;
695
696 let health_status = ComponentHealthStatus {
697 overall: "healthy".to_string(),
698 components: HashMap::from([(
699 "metrics".to_string(),
700 ComponentHealth {
701 status: "healthy".to_string(),
702 uptime_seconds: 3600,
703 last_check: Utc::now(),
704 error_count: 0,
705 response_time_ms: 10.0,
706 },
707 )]),
708 };
709
710 Ok(RealtimeMetrics {
711 timestamp: Utc::now(),
712 system_overview: overview,
713 charts,
714 alerts: vec![],
715 health_status,
716 })
717 }
718
719 pub fn websocket_manager(&self) -> &Arc<WebSocketManager> {
721 &self.websocket_manager
722 }
723
724 pub fn config(&self) -> &DashboardConfig {
726 &self.config
727 }
728}
729
/// Produces the dashboard's HTTP response bodies (HTML page, metrics JSON,
/// health JSON) from a shared [`DashboardServer`].
pub struct DashboardHandler {
    /// The server whose data and configuration are rendered.
    server: Arc<DashboardServer>,
}
734
735impl DashboardHandler {
736 pub fn new(server: Arc<DashboardServer>) -> Self {
737 Self { server }
738 }
739
740 pub async fn handle_dashboard(&self) -> RragResult<String> {
742 let data = self.server.get_current_data().await?;
744
745 Ok(format!(
746 r#"
747<!DOCTYPE html>
748<html lang="en">
749<head>
750 <meta charset="UTF-8">
751 <meta name="viewport" content="width=device-width, initial-scale=1.0">
752 <title>{}</title>
753 <style>
754 body {{ font-family: Arial, sans-serif; margin: 0; padding: 20px; background: #f5f5f5; }}
755 .container {{ max-width: 1200px; margin: 0 auto; }}
756 .header {{ background: white; padding: 20px; border-radius: 8px; margin-bottom: 20px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }}
757 .metrics-grid {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(300px, 1fr)); gap: 20px; }}
758 .metric-card {{ background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }}
759 .metric-title {{ font-size: 18px; font-weight: bold; margin-bottom: 10px; color: #333; }}
760 .metric-value {{ font-size: 24px; font-weight: bold; color: #2563eb; }}
761 .status-healthy {{ color: #10b981; }}
762 .status-warning {{ color: #f59e0b; }}
763 .status-error {{ color: #ef4444; }}
764 .chart-placeholder {{ height: 200px; background: #f8fafc; border: 2px dashed #cbd5e1; border-radius: 4px; display: flex; align-items: center; justify-content: center; color: #64748b; }}
765 </style>
766</head>
767<body>
768 <div class="container">
769 <div class="header">
770 <h1>{}</h1>
771 <p>Last updated: {}</p>
772 </div>
773
774 <div class="metrics-grid">
775 <div class="metric-card">
776 <div class="metric-title">System Status</div>
777 <div class="metric-value status-healthy">Healthy</div>
778 </div>
779
780 <div class="metric-card">
781 <div class="metric-title">Active WebSocket Clients</div>
782 <div class="metric-value">{}</div>
783 </div>
784
785 <div class="metric-card">
786 <div class="metric-title">CPU Usage</div>
787 <div class="metric-value">{:.1}%</div>
788 </div>
789
790 <div class="metric-card">
791 <div class="metric-title">Memory Usage</div>
792 <div class="metric-value">{:.1}%</div>
793 </div>
794
795 <div class="metric-card">
796 <div class="metric-title">Search Success Rate</div>
797 <div class="metric-value">{:.1}%</div>
798 </div>
799
800 <div class="metric-card">
801 <div class="metric-title">Active Users</div>
802 <div class="metric-value">{}</div>
803 </div>
804 </div>
805
806 <div style="margin-top: 40px;">
807 <h2>Charts</h2>
808 <div class="metrics-grid">
809 <div class="metric-card">
810 <div class="metric-title">CPU Usage Over Time</div>
811 <div class="chart-placeholder">CPU Usage Chart</div>
812 </div>
813
814 <div class="metric-card">
815 <div class="metric-title">Memory Usage Over Time</div>
816 <div class="chart-placeholder">Memory Usage Chart</div>
817 </div>
818
819 <div class="metric-card">
820 <div class="metric-title">Search Performance</div>
821 <div class="chart-placeholder">Search Performance Chart</div>
822 </div>
823
824 <div class="metric-card">
825 <div class="metric-title">User Activity</div>
826 <div class="chart-placeholder">User Activity Chart</div>
827 </div>
828 </div>
829 </div>
830 </div>
831
832 <script>
833 // WebSocket connection for real-time updates
834 const ws = new WebSocket('ws://{}:{}/ws');
835 ws.onmessage = function(event) {{
836 const message = JSON.parse(event.data);
837 if (message.type === 'metrics_update') {{
838 // Update dashboard with new data
839 console.log('Received metrics update:', message.data);
840 }}
841 }};
842
843 // Refresh page every 30 seconds as fallback
844 setTimeout(() => location.reload(), 30000);
845 </script>
846</body>
847</html>
848"#,
849 self.server.config().title,
850 self.server.config().title,
851 data.timestamp.format("%Y-%m-%d %H:%M:%S UTC"),
852 self.server.websocket_manager().get_client_count().await,
853 data.system_overview
854 .performance_metrics
855 .as_ref()
856 .map(|p| p.cpu_usage_percent)
857 .unwrap_or(0.0),
858 data.system_overview
859 .performance_metrics
860 .as_ref()
861 .map(|p| p.memory_usage_percent)
862 .unwrap_or(0.0),
863 data.system_overview
864 .search_stats
865 .as_ref()
866 .map(|s| s.success_rate)
867 .unwrap_or(0.0),
868 data.system_overview
869 .user_stats
870 .as_ref()
871 .map(|u| u.unique_users)
872 .unwrap_or(0),
873 self.server.config().host,
874 self.server.config().port
875 ))
876 }
877
878 pub async fn handle_metrics_api(&self) -> RragResult<String> {
880 let data = self.server.get_current_data().await?;
881 serde_json::to_string_pretty(&data)
882 .map_err(|e| RragError::agent("dashboard", e.to_string()))
883 }
884
885 pub async fn handle_health(&self) -> RragResult<String> {
887 let health = ComponentHealthStatus {
888 overall: "healthy".to_string(),
889 components: HashMap::from([(
890 "dashboard".to_string(),
891 ComponentHealth {
892 status: "healthy".to_string(),
893 uptime_seconds: 3600,
894 last_check: Utc::now(),
895 error_count: 0,
896 response_time_ms: 5.0,
897 },
898 )]),
899 };
900
901 serde_json::to_string(&health).map_err(|e| RragError::agent("dashboard", e.to_string()))
902 }
903}
904
#[cfg(test)]
mod tests {
    use super::*;
    use crate::observability::{metrics::MetricsConfig, monitoring::MonitoringConfig};

    /// Builds a metrics collector and a system monitor with default settings.
    async fn create_test_components() -> (Arc<MetricsCollector>, Arc<SystemMonitor>) {
        let metrics_collector = Arc::new(
            MetricsCollector::new(MetricsConfig::default())
                .await
                .unwrap(),
        );
        let system_monitor = Arc::new(
            SystemMonitor::new(MonitoringConfig::default(), metrics_collector.clone())
                .await
                .unwrap(),
        );
        (metrics_collector, system_monitor)
    }

    /// Builds a performance sample whose only varying field is the CPU usage.
    fn sample_performance(cpu_usage_percent: f64) -> PerformanceMetrics {
        PerformanceMetrics {
            timestamp: Utc::now(),
            cpu_usage_percent,
            memory_usage_mb: 1024.0,
            memory_usage_percent: 60.0,
            disk_usage_mb: 2048.0,
            disk_usage_percent: 70.0,
            network_bytes_sent: 1000,
            network_bytes_received: 2000,
            active_connections: 10,
            thread_count: 50,
            gc_collections: 5,
            gc_pause_time_ms: 2.5,
        }
    }

    #[tokio::test]
    async fn test_websocket_manager() {
        let manager = WebSocketManager::new();
        assert_eq!(manager.get_client_count().await, 0);

        let (sender, _receiver) = mpsc::unbounded_channel();
        manager.add_client("client1".to_string(), sender).await;
        assert_eq!(manager.get_client_count().await, 1);

        manager.remove_client("client1").await;
        assert_eq!(manager.get_client_count().await, 0);
    }

    #[tokio::test]
    async fn test_broadcast_prunes_disconnected_clients() {
        let manager = WebSocketManager::new();

        let (live_tx, mut live_rx) = mpsc::unbounded_channel();
        let (dead_tx, dead_rx) = mpsc::unbounded_channel();
        manager.add_client("live".to_string(), live_tx).await;
        manager.add_client("dead".to_string(), dead_tx).await;
        // Dropping the receiving half makes sends to this client fail.
        drop(dead_rx);

        manager
            .broadcast_message(WebSocketMessage::Ping {
                timestamp: Utc::now(),
            })
            .await
            .unwrap();

        assert_eq!(manager.get_client_count().await, 1);
        let delivered = live_rx
            .recv()
            .await
            .expect("live client should receive the message");
        assert!(delivered.contains("\"type\":\"ping\""));
    }

    #[tokio::test]
    async fn test_dashboard_metrics() {
        let dashboard_metrics = DashboardMetrics::new(100);

        dashboard_metrics
            .update_performance(sample_performance(50.0))
            .await;

        let charts = dashboard_metrics.generate_charts().await;
        assert!(charts.contains_key("cpu_usage"));
        assert!(charts.contains_key("memory_usage"));
        assert!(charts.contains_key("disk_usage"));
    }

    #[tokio::test]
    async fn test_dashboard_metrics_history_is_bounded() {
        let dashboard_metrics = DashboardMetrics::new(3);

        for cpu in [0.0, 1.0, 2.0, 3.0, 4.0] {
            dashboard_metrics
                .update_performance(sample_performance(cpu))
                .await;
        }

        let charts = dashboard_metrics.generate_charts().await;
        let cpu_chart = &charts["cpu_usage"];
        assert_eq!(cpu_chart.labels.len(), 3);
        // Only the three most recent samples survive, oldest first.
        assert_eq!(cpu_chart.datasets[0].data, vec![2.0, 3.0, 4.0]);
    }

    #[tokio::test]
    async fn test_dashboard_server() {
        let (metrics_collector, system_monitor) = create_test_components().await;
        let config = DashboardConfig::default();
        let server = DashboardServer::new(config, metrics_collector, system_monitor)
            .await
            .unwrap();

        assert!(!server.is_healthy().await);

        server.start().await.unwrap();
        assert!(server.is_healthy().await);

        let current_data = server.get_current_data().await.unwrap();
        assert!(!current_data.charts.is_empty());

        server.stop().await.unwrap();
        assert!(!server.is_healthy().await);
    }

    #[tokio::test]
    async fn test_dashboard_handler() {
        let (metrics_collector, system_monitor) = create_test_components().await;
        let config = DashboardConfig::default();
        let server = Arc::new(
            DashboardServer::new(config, metrics_collector, system_monitor)
                .await
                .unwrap(),
        );

        let handler = DashboardHandler::new(server);

        let dashboard_html = handler.handle_dashboard().await.unwrap();
        assert!(dashboard_html.contains("<!DOCTYPE html>"));
        assert!(dashboard_html.contains("RRAG Observability Dashboard"));

        let metrics_json = handler.handle_metrics_api().await.unwrap();
        assert!(serde_json::from_str::<RealtimeMetrics>(&metrics_json).is_ok());

        let health_json = handler.handle_health().await.unwrap();
        assert!(serde_json::from_str::<ComponentHealthStatus>(&health_json).is_ok());
    }

    #[test]
    fn test_chart_data_creation() {
        let chart_data = ChartData {
            labels: vec!["A".to_string(), "B".to_string(), "C".to_string()],
            datasets: vec![ChartDataset {
                label: "Test Data".to_string(),
                data: vec![10.0, 20.0, 30.0],
                color: "#3b82f6".to_string(),
                fill: false,
            }],
            chart_type: ChartType::Line,
            title: "Test Chart".to_string(),
            unit: Some("units".to_string()),
        };

        assert_eq!(chart_data.labels.len(), 3);
        assert_eq!(chart_data.datasets[0].data.len(), 3);
        assert_eq!(chart_data.title, "Test Chart");
    }

    #[test]
    fn test_websocket_message_serialization() {
        let message = WebSocketMessage::Ping {
            timestamp: Utc::now(),
        };

        let json = serde_json::to_string(&message).unwrap();
        assert!(json.contains("\"type\":\"ping\""));

        let deserialized: WebSocketMessage = serde_json::from_str(&json).unwrap();
        match deserialized {
            WebSocketMessage::Ping { .. } => {}
            _ => panic!("Wrong message type"),
        }
    }
}