use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant, SystemTime};
use super::MetricsSnapshot;
/// Accumulates message-processing statistics using atomic counters.
///
/// Every counter is an `AtomicU64`, so all recording and reading methods take
/// `&self`; no external lock is required to update the collector.
#[derive(Debug)]
pub(crate) struct MetricsCollector {
/// Number of `record_message` calls made so far.
message_count: AtomicU64,
/// Number of `record_error` calls made so far.
error_count: AtomicU64,
/// Sum of all recorded processing durations, in nanoseconds (saturating).
total_processing_nanos: AtomicU64,
/// Largest single processing duration recorded, in nanoseconds.
max_processing_nanos: AtomicU64,
/// Wall-clock time of the most recent `record_message` call, in milliseconds
/// since the Unix epoch. Zero means no message has been recorded yet.
last_activity_millis: AtomicU64,
/// Monotonic instant captured in `new`; uptime is measured from it.
start_instant: Instant,
}
impl MetricsCollector {
    /// Creates a collector with all counters at zero, no recorded activity,
    /// and the uptime clock started at the moment of the call.
    pub fn new() -> Self {
        Self {
            message_count: AtomicU64::new(0),
            error_count: AtomicU64::new(0),
            total_processing_nanos: AtomicU64::new(0),
            max_processing_nanos: AtomicU64::new(0),
            last_activity_millis: AtomicU64::new(0),
            start_instant: Instant::now(),
        }
    }

    /// Records one processed message that took `duration`.
    ///
    /// Increments the message count, adds `duration` to the running total
    /// (saturating at `u64::MAX` nanoseconds), raises the recorded maximum if
    /// `duration` exceeds it, and stamps the last-activity time.
    ///
    /// Each counter is updated with its own atomic operation, so a concurrent
    /// reader may observe some of these updates before the others.
    #[inline]
    pub fn record_message(&self, duration: Duration) {
        self.message_count.fetch_add(1, Ordering::Relaxed);

        // Durations longer than `u64::MAX` nanoseconds are clamped instead of
        // being truncated by the cast.
        let nanos = duration.as_nanos().min(u64::MAX as u128) as u64;

        // The closure always returns `Some`, so `fetch_update` cannot report
        // failure; discarding the result is intentional.
        let _ = self.total_processing_nanos.fetch_update(
            Ordering::Relaxed,
            Ordering::Relaxed,
            |current| Some(current.saturating_add(nanos)),
        );
        self.max_processing_nanos
            .fetch_max(nanos, Ordering::Relaxed);
        self.update_last_activity();
    }

    /// Records one error by incrementing the error counter.
    ///
    /// Unlike [`record_message`](Self::record_message), this does not touch
    /// the last-activity timestamp.
    #[inline]
    // NOTE(review): presumably silences the unused warning because no non-test
    // caller exists yet — confirm and drop the attribute once one does.
    #[allow(dead_code)]
    pub fn record_error(&self) {
        self.error_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Converts `time` to whole milliseconds since the Unix epoch, in the form
    /// stored in `last_activity_millis`.
    ///
    /// The result is clamped to `1..=u64::MAX`. Zero is reserved to mean
    /// "no activity yet", so a clock reading at or before the epoch is stored
    /// as 1 rather than being mistaken for that marker.
    fn activity_millis(time: SystemTime) -> u64 {
        let since_epoch = time
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap_or(Duration::ZERO);
        let millis = since_epoch.as_millis().min(u64::MAX as u128) as u64;
        millis.max(1)
    }

    /// Integer-divides `total_nanos` by `count`, yielding zero when nothing
    /// has been counted.
    fn mean_duration(total_nanos: u64, count: u64) -> Duration {
        if count == 0 {
            Duration::ZERO
        } else {
            Duration::from_nanos(total_nanos / count)
        }
    }

    /// Stamps the last-activity time with the current wall-clock time.
    fn update_last_activity(&self) {
        let millis = Self::activity_millis(SystemTime::now());
        self.last_activity_millis.store(millis, Ordering::Relaxed);
    }

    /// Decodes the stored last-activity timestamp.
    ///
    /// Returns `None` while the stored value is still the zero marker, or if
    /// the stored offset cannot be represented as a `SystemTime`.
    fn get_last_activity(&self) -> Option<SystemTime> {
        match self.last_activity_millis.load(Ordering::Relaxed) {
            0 => None,
            millis => SystemTime::UNIX_EPOCH.checked_add(Duration::from_millis(millis)),
        }
    }

    /// Captures the current value of every metric in a [`MetricsSnapshot`].
    ///
    /// The reported message count and average are derived from the same read
    /// of the counter. The individual fields are still loaded one after
    /// another, so the snapshot as a whole is not taken atomically.
    pub fn snapshot(&self) -> MetricsSnapshot {
        let count = self.message_count.load(Ordering::Relaxed);
        let total_nanos = self.total_processing_nanos.load(Ordering::Relaxed);
        MetricsSnapshot {
            message_count: count,
            avg_processing_time: Self::mean_duration(total_nanos, count),
            max_processing_time: self.max_processing_time(),
            error_count: self.error_count(),
            uptime: self.uptime(),
            last_activity: self.get_last_activity(),
        }
    }

    /// Returns the number of messages recorded so far.
    #[inline]
    pub fn message_count(&self) -> u64 {
        self.message_count.load(Ordering::Relaxed)
    }

    /// Returns the number of errors recorded so far.
    #[inline]
    pub fn error_count(&self) -> u64 {
        self.error_count.load(Ordering::Relaxed)
    }

    /// Returns the total recorded processing time divided by the message
    /// count (integer nanosecond division), or zero if no message has been
    /// recorded.
    #[inline]
    pub fn avg_processing_time(&self) -> Duration {
        let count = self.message_count.load(Ordering::Relaxed);
        let total_nanos = self.total_processing_nanos.load(Ordering::Relaxed);
        Self::mean_duration(total_nanos, count)
    }

    /// Returns the longest single processing duration recorded so far.
    #[inline]
    pub fn max_processing_time(&self) -> Duration {
        Duration::from_nanos(self.max_processing_nanos.load(Ordering::Relaxed))
    }

    /// Returns the time elapsed since this collector was created.
    #[inline]
    pub fn uptime(&self) -> Duration {
        self.start_instant.elapsed()
    }

    /// Returns the wall-clock time of the most recent recorded message, or
    /// `None` if no message has been recorded.
    #[inline]
    pub fn last_activity(&self) -> Option<SystemTime> {
        self.get_last_activity()
    }
}
impl Default for MetricsCollector {
fn default() -> Self {
Self::new()
}
}
/// Scope guard that times a unit of work and reports it to a
/// [`MetricsCollector`].
///
/// The start instant is captured in [`MessageProcessingGuard::new`]. When the
/// guard is dropped, the time elapsed since then is passed to
/// `MetricsCollector::record_message`.
///
/// The guard must be bound to a named variable (for example `let _guard = ...`)
/// for the duration of the work. An unbound guard is dropped immediately and
/// would record a near-zero duration, hence the `#[must_use]`.
#[must_use = "the duration is recorded when the guard is dropped; bind it to a variable"]
pub(crate) struct MessageProcessingGuard<'a> {
    /// Collector that receives the measurement on drop.
    collector: &'a MetricsCollector,
    /// Instant at which the guard was created.
    start: Instant,
}

impl<'a> MessageProcessingGuard<'a> {
    /// Starts timing now; the measurement is reported to `collector` when the
    /// returned guard is dropped.
    #[inline]
    pub fn new(collector: &'a MetricsCollector) -> Self {
        Self {
            collector,
            start: Instant::now(),
        }
    }
}

impl Drop for MessageProcessingGuard<'_> {
    /// Reports the time elapsed since construction as one processed message.
    #[inline]
    fn drop(&mut self) {
        let elapsed = self.start.elapsed();
        self.collector.record_message(elapsed);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// A new collector reports zeroed metrics and no activity.
    #[test]
    fn test_initial_state() {
        let metrics = MetricsCollector::new();
        let snap = metrics.snapshot();

        assert_eq!(snap.message_count, 0);
        assert_eq!(snap.error_count, 0);
        assert_eq!(snap.avg_processing_time, Duration::ZERO);
        assert_eq!(snap.max_processing_time, Duration::ZERO);
        assert!(snap.last_activity.is_none());
    }

    /// Recording messages updates the count, average, maximum and activity time.
    #[test]
    fn test_record_message() {
        let metrics = MetricsCollector::new();
        for millis in [100u64, 200] {
            metrics.record_message(Duration::from_millis(millis));
        }

        assert_eq!(metrics.message_count(), 2);
        assert_eq!(metrics.avg_processing_time(), Duration::from_millis(150));
        assert_eq!(metrics.max_processing_time(), Duration::from_millis(200));
        assert!(metrics.last_activity().is_some());
    }

    /// Each `record_error` call bumps the error counter by one.
    #[test]
    fn test_record_error() {
        let metrics = MetricsCollector::new();
        metrics.record_error();
        metrics.record_error();

        assert_eq!(metrics.error_count(), 2);
    }

    /// Dropping the guard records one message lasting at least the time slept.
    #[test]
    fn test_guard_records_duration() {
        let metrics = MetricsCollector::new();

        let guard = MessageProcessingGuard::new(&metrics);
        std::thread::sleep(Duration::from_millis(10));
        drop(guard);

        assert_eq!(metrics.message_count(), 1);
        assert!(metrics.max_processing_time() >= Duration::from_millis(10));
    }

    /// Uptime read after a sleep is strictly larger than the one read before.
    #[test]
    fn test_uptime_increases() {
        let metrics = MetricsCollector::new();

        let before = metrics.uptime();
        std::thread::sleep(Duration::from_millis(10));
        let after = metrics.uptime();

        assert!(after > before);
    }
}