use super::{
DashboardConfig, DashboardSnapshot, MetricStats, MetricsCollector, OperationCategory,
OperationRecord, ResourceSnapshot,
};
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant, SystemTime};
/// Collects per-operation performance records together with aggregate counters
/// and exposes derived figures (rates, latencies, per-category statistics).
///
/// All mutable state sits behind atomics or `RwLock`s, so every method takes `&self`.
#[derive(Debug)]
pub struct Dashboard {
/// Settings supplied at construction. `record_operation` reads `enabled`,
/// `sample_rate`, `max_metrics` and `retention_period` from it.
config: DashboardConfig,
/// Metrics collector created in `new`; `record_operation` feeds it per-category
/// durations and success/error increments. Exposed through `metrics()`.
metrics: Arc<MetricsCollector>,
/// Recorded operations, oldest at the front. `record_operation` drops entries
/// from the front once the buffer exceeds `max_metrics` or they are older than
/// `retention_period`.
operations: RwLock<VecDeque<OperationRecord>>,
/// Instant captured in `new`; basis for uptime and the per-second rates.
start_time: Instant,
/// Flag toggled by `start` / `stop` and read by `is_running`.
/// `record_operation` does not consult it.
running: AtomicBool,
/// Number of operations recorded (after the enabled/sampling checks).
total_ops: AtomicU64,
/// Number of recorded operations whose `success` flag was `false`.
total_errors: AtomicU64,
/// Sum of the `rows` values passed to `record_operation`.
total_rows: AtomicU64,
/// Sum of the `bytes` values passed to `record_operation`.
total_bytes: AtomicU64,
/// Counters keyed by operation category.
/// NOTE(review): no reader of this map is visible in this file; confirm its intended use.
category_counters: RwLock<HashMap<OperationCategory, AtomicU64>>,
/// Alert messages managed by `add_alert`, `clear_alert` and `alerts`.
active_alerts: RwLock<Vec<String>>,
/// Initialised in `new` to the construction time.
/// NOTE(review): nothing visible in this file updates or reads it afterwards.
last_snapshot: RwLock<Instant>,
}
impl Dashboard {
pub fn new(config: DashboardConfig) -> Self {
Dashboard {
config,
metrics: Arc::new(MetricsCollector::new()),
operations: RwLock::new(VecDeque::with_capacity(10000)),
start_time: Instant::now(),
running: AtomicBool::new(false),
total_ops: AtomicU64::new(0),
total_errors: AtomicU64::new(0),
total_rows: AtomicU64::new(0),
total_bytes: AtomicU64::new(0),
category_counters: RwLock::new(HashMap::new()),
active_alerts: RwLock::new(Vec::new()),
last_snapshot: RwLock::new(Instant::now()),
}
}
pub fn default_config() -> Self {
Self::new(DashboardConfig::default())
}
pub fn start(&self) {
self.running.store(true, Ordering::SeqCst);
}
pub fn stop(&self) {
self.running.store(false, Ordering::SeqCst);
}
pub fn is_running(&self) -> bool {
self.running.load(Ordering::SeqCst)
}
pub fn metrics(&self) -> &Arc<MetricsCollector> {
&self.metrics
}
pub fn record_operation(
&self,
name: &str,
category: OperationCategory,
duration_us: u64,
rows: Option<usize>,
bytes: Option<usize>,
success: bool,
error: Option<String>,
) {
if !self.config.enabled {
return;
}
if self.config.sample_rate < 1.0 {
let random = (Instant::now().elapsed().as_nanos() % 1000) as f64 / 1000.0;
if random > self.config.sample_rate {
return;
}
}
let record = OperationRecord {
name: name.to_string(),
category,
duration_us,
rows_processed: rows,
bytes_processed: bytes,
timestamp: Instant::now(),
success,
error,
};
self.total_ops.fetch_add(1, Ordering::Relaxed);
if !success {
self.total_errors.fetch_add(1, Ordering::Relaxed);
}
if let Some(r) = rows {
self.total_rows.fetch_add(r as u64, Ordering::Relaxed);
}
if let Some(b) = bytes {
self.total_bytes.fetch_add(b as u64, Ordering::Relaxed);
}
if let Ok(counters) = self.category_counters.write() {
}
self.metrics.record_duration(
&format!("{}.duration", category),
Duration::from_micros(duration_us),
);
if success {
self.metrics.increment(&format!("{}.success", category));
} else {
self.metrics.increment(&format!("{}.error", category));
}
if let Ok(mut ops) = self.operations.write() {
ops.push_back(record);
while ops.len() > self.config.max_metrics {
ops.pop_front();
}
let cutoff = Instant::now() - self.config.retention_period;
while ops.front().map(|r| r.timestamp < cutoff).unwrap_or(false) {
ops.pop_front();
}
}
}
pub fn record_simple(&self, name: &str, duration_us: u64) {
self.record_operation(
name,
OperationCategory::Other,
duration_us,
None,
None,
true,
None,
);
}
pub fn time<F, R>(&self, name: &str, category: OperationCategory, f: F) -> R
where
F: FnOnce() -> R,
{
let start = Instant::now();
let result = f();
let duration = start.elapsed();
self.record_operation(
name,
category,
duration.as_micros() as u64,
None,
None,
true,
None,
);
result
}
pub fn ops_per_second(&self) -> f64 {
let total = self.total_ops.load(Ordering::Relaxed);
let elapsed = self.start_time.elapsed().as_secs_f64();
if elapsed > 0.0 {
total as f64 / elapsed
} else {
0.0
}
}
pub fn error_rate(&self) -> f64 {
let total = self.total_ops.load(Ordering::Relaxed);
let errors = self.total_errors.load(Ordering::Relaxed);
if total > 0 {
errors as f64 / total as f64
} else {
0.0
}
}
pub fn avg_latency_us(&self) -> f64 {
self.operations
.read()
.map(|ops| {
if ops.is_empty() {
return 0.0;
}
let sum: u64 = ops.iter().map(|r| r.duration_us).sum();
sum as f64 / ops.len() as f64
})
.unwrap_or(0.0)
}
pub fn p99_latency_us(&self) -> f64 {
self.operations
.read()
.map(|ops| {
if ops.is_empty() {
return 0.0;
}
let mut durations: Vec<u64> = ops.iter().map(|r| r.duration_us).collect();
durations.sort();
let idx = (0.99 * (durations.len() - 1) as f64).round() as usize;
durations[idx.min(durations.len() - 1)] as f64
})
.unwrap_or(0.0)
}
pub fn rows_per_second(&self) -> f64 {
let total = self.total_rows.load(Ordering::Relaxed);
let elapsed = self.start_time.elapsed().as_secs_f64();
if elapsed > 0.0 {
total as f64 / elapsed
} else {
0.0
}
}
pub fn bytes_per_second(&self) -> f64 {
let total = self.total_bytes.load(Ordering::Relaxed);
let elapsed = self.start_time.elapsed().as_secs_f64();
if elapsed > 0.0 {
total as f64 / elapsed
} else {
0.0
}
}
pub fn category_stats(&self, category: OperationCategory) -> MetricStats {
self.operations
.read()
.map(|ops| {
let values: Vec<f64> = ops
.iter()
.filter(|r| r.category == category)
.map(|r| r.duration_us as f64)
.collect();
MetricStats::from_values(&values)
})
.unwrap_or_default()
}
pub fn all_category_stats(&self) -> HashMap<OperationCategory, MetricStats> {
let mut result = HashMap::new();
for category in [
OperationCategory::Query,
OperationCategory::Write,
OperationCategory::Read,
OperationCategory::Aggregation,
OperationCategory::Join,
OperationCategory::Sort,
OperationCategory::Filter,
OperationCategory::GroupBy,
OperationCategory::IO,
OperationCategory::Memory,
OperationCategory::Other,
] {
let stats = self.category_stats(category);
if stats.count > 0 {
result.insert(category, stats);
}
}
result
}
pub fn resource_snapshot(&self) -> ResourceSnapshot {
ResourceSnapshot {
memory_used: 0,
memory_available: 0,
cpu_usage: 0.0,
thread_count: std::thread::available_parallelism()
.map(|p| p.get())
.unwrap_or(1),
open_files: 0,
timestamp: SystemTime::now(),
}
}
pub fn add_alert(&self, message: String) {
if let Ok(mut alerts) = self.active_alerts.write() {
alerts.push(message);
}
}
pub fn clear_alert(&self, message: &str) {
if let Ok(mut alerts) = self.active_alerts.write() {
alerts.retain(|a| a != message);
}
}
pub fn alerts(&self) -> Vec<String> {
self.active_alerts
.read()
.map(|a| a.clone())
.unwrap_or_default()
}
pub fn snapshot(&self) -> DashboardSnapshot {
DashboardSnapshot {
timestamp: SystemTime::now(),
uptime: self.start_time.elapsed(),
total_operations: self.total_ops.load(Ordering::Relaxed),
ops_per_second: self.ops_per_second(),
avg_latency_us: self.avg_latency_us(),
p99_latency_us: self.p99_latency_us(),
error_rate: self.error_rate(),
total_rows: self.total_rows.load(Ordering::Relaxed),
rows_per_second: self.rows_per_second(),
total_bytes: self.total_bytes.load(Ordering::Relaxed),
bytes_per_second: self.bytes_per_second(),
category_stats: self.all_category_stats(),
resources: self.resource_snapshot(),
active_alerts: self.alerts(),
}
}
pub fn reset(&self) {
self.total_ops.store(0, Ordering::Relaxed);
self.total_errors.store(0, Ordering::Relaxed);
self.total_rows.store(0, Ordering::Relaxed);
self.total_bytes.store(0, Ordering::Relaxed);
if let Ok(mut ops) = self.operations.write() {
ops.clear();
}
if let Ok(mut alerts) = self.active_alerts.write() {
alerts.clear();
}
self.metrics.clear();
}
pub fn recent_operations(&self, limit: usize) -> Vec<OperationRecord> {
self.operations
.read()
.map(|ops| ops.iter().rev().take(limit).cloned().collect())
.unwrap_or_default()
}
pub fn slowest_operations(&self, limit: usize) -> Vec<OperationRecord> {
self.operations
.read()
.map(|ops| {
let mut sorted: Vec<_> = ops.iter().cloned().collect();
sorted.sort_by(|a, b| b.duration_us.cmp(&a.duration_us));
sorted.into_iter().take(limit).collect()
})
.unwrap_or_default()
}
pub fn failed_operations(&self, limit: usize) -> Vec<OperationRecord> {
self.operations
.read()
.map(|ops| {
ops.iter()
.filter(|r| !r.success)
.rev()
.take(limit)
.cloned()
.collect()
})
.unwrap_or_default()
}
}
impl Default for Dashboard {
fn default() -> Self {
Self::default_config()
}
}
/// Process-wide dashboard instance. It is set at most once: either explicitly
/// by `init_global_dashboard` or lazily, with defaults, by `global_dashboard`.
static GLOBAL_DASHBOARD: std::sync::OnceLock<Dashboard> = std::sync::OnceLock::new();
/// Installs a global dashboard built from `config`.
///
/// If the global dashboard has already been set, the newly built one is
/// discarded and the existing instance stays in place.
pub fn init_global_dashboard(config: DashboardConfig) {
    let candidate = Dashboard::new(config);
    // `set` hands the value back as `Err` when the cell is already filled;
    // that outcome is deliberately ignored.
    let _ = GLOBAL_DASHBOARD.set(candidate);
}
/// Returns the global dashboard, creating it with `Dashboard::default()` on
/// first use if `init_global_dashboard` has not set one.
pub fn global_dashboard() -> &'static Dashboard {
    GLOBAL_DASHBOARD.get_or_init(<Dashboard as Default>::default)
}
/// Records a successful operation, without row or byte counts, on the global dashboard.
pub fn record_global(name: &str, category: OperationCategory, duration_us: u64) {
    let dashboard = global_dashboard();
    dashboard.record_operation(name, category, duration_us, None, None, true, None);
}
/// Runs `f` through the global dashboard's `time` method and returns its result.
pub fn time_global<F: FnOnce() -> R, R>(name: &str, category: OperationCategory, f: F) -> R {
    let dashboard = global_dashboard();
    dashboard.time(name, category, f)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A new dashboard is stopped and follows `start` / `stop`.
    #[test]
    fn test_dashboard_creation() {
        let board = Dashboard::default();
        assert!(!board.is_running());

        board.start();
        assert!(board.is_running());

        board.stop();
        assert!(!board.is_running());
    }

    /// One recorded operation shows up in the operation, row and byte totals.
    #[test]
    fn test_record_operation() {
        let board = Dashboard::default();
        board.start();

        let rows = Some(100);
        let bytes = Some(1024);
        board.record_operation(
            "test_query",
            OperationCategory::Query,
            1000,
            rows,
            bytes,
            true,
            None,
        );

        let snap = board.snapshot();
        assert_eq!(snap.total_operations, 1);
        assert_eq!(snap.total_rows, 100);
        assert_eq!(snap.total_bytes, 1024);
    }

    /// `time` returns the closure's value and records at least the slept duration.
    #[test]
    fn test_time_operation() {
        let board = Dashboard::default();
        let pause = Duration::from_millis(10);

        let answer = board.time("test_op", OperationCategory::Other, || {
            std::thread::sleep(pause);
            42
        });

        assert_eq!(answer, 42);
        assert!(board.avg_latency_us() >= 10000.0);
    }

    /// One success plus one failure gives an error rate of one half.
    #[test]
    fn test_error_tracking() {
        let board = Dashboard::default();
        board.record_operation("op1", OperationCategory::Query, 100, None, None, true, None);
        board.record_operation(
            "op2",
            OperationCategory::Query,
            100,
            None,
            None,
            false,
            Some("test error".to_string()),
        );

        let rate = board.error_rate();
        assert!((rate - 0.5).abs() < 0.01);
    }

    /// Ten queries of 100..=1000 microseconds yield matching count, min and max.
    #[test]
    fn test_category_stats() {
        let board = Dashboard::default();
        for duration_us in (1..=10u64).map(|step| step * 100) {
            board.record_operation(
                "query",
                OperationCategory::Query,
                duration_us,
                None,
                None,
                true,
                None,
            );
        }

        let stats = board.category_stats(OperationCategory::Query);
        assert_eq!(stats.count, 10);
        assert_eq!(stats.min, 100.0);
        assert_eq!(stats.max, 1000.0);
    }

    /// `slowest_operations` returns the longest operations first, capped at the limit.
    #[test]
    fn test_slowest_operations() {
        let board = Dashboard::default();
        for (name, duration_us) in [("fast", 100), ("medium", 500), ("slow", 1000)] {
            board.record_simple(name, duration_us);
        }

        let slowest = board.slowest_operations(2);
        assert_eq!(slowest.len(), 2);
        assert_eq!(slowest[0].duration_us, 1000);
        assert_eq!(slowest[1].duration_us, 500);
    }

    /// Alerts can be added and removed individually by message.
    #[test]
    fn test_alerts() {
        let board = Dashboard::default();
        for message in ["High latency detected", "Memory usage critical"] {
            board.add_alert(message.to_string());
        }
        assert_eq!(board.alerts().len(), 2);

        board.clear_alert("High latency detected");
        assert_eq!(board.alerts().len(), 1);
    }

    /// `reset` brings the operation total back to zero.
    #[test]
    fn test_reset() {
        let board = Dashboard::default();
        (0..100).for_each(|_| board.record_simple("op", 100));
        assert_eq!(board.snapshot().total_operations, 100);

        board.reset();
        assert_eq!(board.snapshot().total_operations, 0);
    }

    /// Operations recorded through the global helpers reach the global dashboard.
    #[test]
    fn test_global_dashboard() {
        let board = global_dashboard();
        board.start();

        record_global("global_op", OperationCategory::Query, 100);
        assert!(board.snapshot().total_operations >= 1);
    }
}