use crate::core::types::{ChannelType, DeviceId};
use dashmap::DashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
pub struct MetricsCollector {
messages_sent: AtomicU64,
messages_received: AtomicU64,
bytes_sent: AtomicU64,
bytes_received: AtomicU64,
channel_usage: DashMap<ChannelType, AtomicU64>,
last_rtt: DashMap<DeviceId, u32>,
start_time: Instant,
}
impl Default for MetricsCollector {
fn default() -> Self {
Self {
messages_sent: AtomicU64::new(0),
messages_received: AtomicU64::new(0),
bytes_sent: AtomicU64::new(0),
bytes_received: AtomicU64::new(0),
channel_usage: DashMap::new(),
last_rtt: DashMap::new(),
start_time: Instant::now(),
}
}
}
impl MetricsCollector {
pub fn new() -> Self {
Self::default()
}
pub fn record_send(&self, channel: ChannelType, bytes: u64) {
self.messages_sent.fetch_add(1, Ordering::Relaxed);
self.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
self.channel_usage
.entry(channel)
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(1, Ordering::Relaxed);
}
pub fn record_receive(&self, bytes: u64) {
self.messages_received.fetch_add(1, Ordering::Relaxed);
self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
}
pub fn update_rtt(&self, device: DeviceId, rtt_ms: u32) {
self.last_rtt.insert(device, rtt_ms);
}
pub fn get_report(&self) -> MetricsReport {
MetricsReport {
uptime_secs: self.start_time.elapsed().as_secs(),
total_sent: self.messages_sent.load(Ordering::Relaxed),
total_received: self.messages_received.load(Ordering::Relaxed),
total_bytes_sent: self.bytes_sent.load(Ordering::Relaxed),
total_bytes_received: self.bytes_received.load(Ordering::Relaxed),
}
}
}
pub struct MetricsReport {
pub uptime_secs: u64,
pub total_sent: u64,
pub total_received: u64,
pub total_bytes_sent: u64,
pub total_bytes_received: u64,
}
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
pub struct AnalyticsEvent {
pub timestamp: u64,
pub event_type: String,
pub device_id: String,
pub channel: Option<String>,
pub metadata: std::collections::HashMap<String, String>,
}
impl MetricsCollector {
pub fn export_prometheus(&self) -> String {
let mut report = String::new();
report.push_str("# HELP xlink_messages_sent_total Total number of messages sent\n");
report.push_str("# TYPE xlink_messages_sent_total counter\n");
report.push_str(&format!(
"xlink_messages_sent_total {}\n",
self.messages_sent.load(Ordering::Relaxed)
));
report.push_str("# HELP xlink_bytes_sent_total Total number of bytes sent\n");
report.push_str("# TYPE xlink_bytes_sent_total counter\n");
report.push_str(&format!(
"xlink_bytes_sent_total {}\n",
self.bytes_sent.load(Ordering::Relaxed)
));
for entry in self.channel_usage.iter() {
report.push_str(&format!(
"xlink_channel_usage_total{{channel=\"{:?}\"}} {}\n",
entry.key(),
entry.value().load(Ordering::Relaxed)
));
}
report
}
pub fn record_event(&self, event: AnalyticsEvent) {
log::info!("Analytics Event: {:?}", event);
}
pub fn clear(&self) {
let channel_keys: Vec<_> = self
.channel_usage
.iter()
.map(|entry| *entry.key())
.collect();
for channel_type in channel_keys {
self.channel_usage.remove(&channel_type);
}
let device_keys: Vec<_> = self.last_rtt.iter().map(|entry| *entry.key()).collect();
for device_id in device_keys {
self.last_rtt.remove(&device_id);
}
log::debug!("MetricsCollector: Cleared all metrics data using entry removal");
}
}