use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
#[derive(Debug, Default)]
pub struct Diagnostics {
start_time: Option<Instant>,
events_processed: AtomicU64,
events_dropped_backpressure: AtomicU64,
events_dropped_error: AtomicU64,
sink_errors: AtomicU64,
pre_init_buffer_events: AtomicU64,
pre_init_buffer_overflows: AtomicU64,
database_batch_writes: AtomicU64,
file_writes: AtomicU64,
stdout_writes: AtomicU64,
}
#[derive(Debug, Clone, PartialEq)]
pub struct DiagnosticsSnapshot {
pub uptime: Option<Duration>,
pub events_processed: u64,
pub events_dropped_backpressure: u64,
pub events_dropped_error: u64,
pub sink_errors: u64,
pub pre_init_buffer_events: u64,
pub pre_init_buffer_overflows: u64,
pub database_batch_writes: u64,
pub file_writes: u64,
pub stdout_writes: u64,
pub total_events_dropped: u64,
pub success_rate_percent: f64,
}
impl Diagnostics {
pub fn new() -> Self {
Self {
start_time: Some(Instant::now()),
..Default::default()
}
}
pub fn increment_events_processed(&self) {
self.events_processed.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_events_dropped_backpressure(&self) {
self.events_dropped_backpressure
.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_events_dropped_error(&self) {
self.events_dropped_error.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_sink_errors(&self) {
self.sink_errors.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_pre_init_buffer_events(&self) {
self.pre_init_buffer_events.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_pre_init_buffer_overflows(&self) {
self.pre_init_buffer_overflows
.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_database_batch_writes(&self) {
self.database_batch_writes.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_file_writes(&self) {
self.file_writes.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_stdout_writes(&self) {
self.stdout_writes.fetch_add(1, Ordering::Relaxed);
}
pub fn add_events_processed(&self, count: u64) {
self.events_processed.fetch_add(count, Ordering::Relaxed);
}
pub fn snapshot(&self) -> DiagnosticsSnapshot {
let events_processed = self.events_processed.load(Ordering::Relaxed);
let events_dropped_backpressure = self.events_dropped_backpressure.load(Ordering::Relaxed);
let events_dropped_error = self.events_dropped_error.load(Ordering::Relaxed);
let total_events_dropped = events_dropped_backpressure + events_dropped_error;
let success_rate_percent = if events_processed + total_events_dropped > 0 {
(events_processed as f64 / (events_processed + total_events_dropped) as f64) * 100.0
} else {
100.0
};
DiagnosticsSnapshot {
uptime: self.start_time.map(|start| start.elapsed()),
events_processed,
events_dropped_backpressure,
events_dropped_error,
sink_errors: self.sink_errors.load(Ordering::Relaxed),
pre_init_buffer_events: self.pre_init_buffer_events.load(Ordering::Relaxed),
pre_init_buffer_overflows: self.pre_init_buffer_overflows.load(Ordering::Relaxed),
database_batch_writes: self.database_batch_writes.load(Ordering::Relaxed),
file_writes: self.file_writes.load(Ordering::Relaxed),
stdout_writes: self.stdout_writes.load(Ordering::Relaxed),
total_events_dropped,
success_rate_percent,
}
}
pub fn reset(&self) {
self.events_processed.store(0, Ordering::Relaxed);
self.events_dropped_backpressure.store(0, Ordering::Relaxed);
self.events_dropped_error.store(0, Ordering::Relaxed);
self.sink_errors.store(0, Ordering::Relaxed);
self.pre_init_buffer_events.store(0, Ordering::Relaxed);
self.pre_init_buffer_overflows.store(0, Ordering::Relaxed);
self.database_batch_writes.store(0, Ordering::Relaxed);
self.file_writes.store(0, Ordering::Relaxed);
self.stdout_writes.store(0, Ordering::Relaxed);
}
}
static GLOBAL_DIAGNOSTICS: std::sync::OnceLock<Arc<Diagnostics>> = std::sync::OnceLock::new();
pub fn init_diagnostics() -> Arc<Diagnostics> {
GLOBAL_DIAGNOSTICS
.get_or_init(|| Arc::new(Diagnostics::new()))
.clone()
}
pub fn get_diagnostics_instance() -> Option<Arc<Diagnostics>> {
GLOBAL_DIAGNOSTICS.get().cloned()
}
pub fn get_diagnostics() -> DiagnosticsSnapshot {
match GLOBAL_DIAGNOSTICS.get() {
Some(diagnostics) => diagnostics.snapshot(),
None => DiagnosticsSnapshot {
uptime: None,
events_processed: 0,
events_dropped_backpressure: 0,
events_dropped_error: 0,
sink_errors: 0,
pre_init_buffer_events: 0,
pre_init_buffer_overflows: 0,
database_batch_writes: 0,
file_writes: 0,
stdout_writes: 0,
total_events_dropped: 0,
success_rate_percent: 100.0,
},
}
}
pub fn get_diagnostics_snapshot() -> DiagnosticsSnapshot {
get_diagnostics()
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
use std::time::Duration;
#[test]
fn test_diagnostics_creation() {
let diagnostics = Diagnostics::new();
let snapshot = diagnostics.snapshot();
assert!(snapshot.uptime.is_some());
assert_eq!(snapshot.events_processed, 0);
assert_eq!(snapshot.events_dropped_backpressure, 0);
assert_eq!(snapshot.events_dropped_error, 0);
assert_eq!(snapshot.sink_errors, 0);
assert_eq!(snapshot.success_rate_percent, 100.0);
}
#[test]
fn test_increment_operations() {
let diagnostics = Diagnostics::new();
diagnostics.increment_events_processed();
diagnostics.increment_events_dropped_backpressure();
diagnostics.increment_sink_errors();
let snapshot = diagnostics.snapshot();
assert_eq!(snapshot.events_processed, 1);
assert_eq!(snapshot.events_dropped_backpressure, 1);
assert_eq!(snapshot.sink_errors, 1);
assert_eq!(snapshot.total_events_dropped, 1);
}
#[test]
fn test_success_rate_calculation() {
let diagnostics = Diagnostics::new();
diagnostics.add_events_processed(8);
diagnostics.increment_events_dropped_backpressure();
diagnostics.increment_events_dropped_error();
let snapshot = diagnostics.snapshot();
assert_eq!(snapshot.events_processed, 8);
assert_eq!(snapshot.total_events_dropped, 2);
assert_eq!(snapshot.success_rate_percent, 80.0);
}
#[test]
fn test_reset_functionality() {
let diagnostics = Diagnostics::new();
diagnostics.increment_events_processed();
diagnostics.increment_sink_errors();
let snapshot_before = diagnostics.snapshot();
assert_eq!(snapshot_before.events_processed, 1);
assert_eq!(snapshot_before.sink_errors, 1);
diagnostics.reset();
let snapshot_after = diagnostics.snapshot();
assert_eq!(snapshot_after.events_processed, 0);
assert_eq!(snapshot_after.sink_errors, 0);
}
#[test]
fn test_concurrent_access() {
let diagnostics = Arc::new(Diagnostics::new());
let mut handles = vec![];
for _ in 0..10 {
let diagnostics_clone = diagnostics.clone();
let handle = thread::spawn(move || {
for _ in 0..100 {
diagnostics_clone.increment_events_processed();
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
let snapshot = diagnostics.snapshot();
assert_eq!(snapshot.events_processed, 1000);
}
#[test]
fn test_uptime_measurement() {
let diagnostics = Diagnostics::new();
thread::sleep(Duration::from_millis(10));
let snapshot = diagnostics.snapshot();
assert!(snapshot.uptime.is_some());
assert!(snapshot.uptime.unwrap() >= Duration::from_millis(10));
}
#[test]
fn test_global_diagnostics_initialization() {
let diagnostics1 = init_diagnostics();
let diagnostics2 = init_diagnostics();
assert!(Arc::ptr_eq(&diagnostics1, &diagnostics2));
let retrieved = get_diagnostics();
assert!(retrieved.uptime.is_some());
}
#[test]
fn test_diagnostics_snapshot_convenience_function() {
if let Some(diagnostics) = get_diagnostics_instance() {
diagnostics.reset();
}
let snapshot = get_diagnostics_snapshot();
assert_eq!(snapshot.events_processed, 0);
assert_eq!(snapshot.success_rate_percent, 100.0);
}
#[test]
fn test_edge_cases() {
let diagnostics = Diagnostics::new();
let snapshot = diagnostics.snapshot();
assert_eq!(snapshot.success_rate_percent, 100.0);
diagnostics.increment_events_dropped_error();
let snapshot = diagnostics.snapshot();
assert_eq!(snapshot.success_rate_percent, 0.0);
}
#[test]
fn test_all_counter_types() {
let diagnostics = Diagnostics::new();
diagnostics.increment_events_processed();
diagnostics.increment_events_dropped_backpressure();
diagnostics.increment_events_dropped_error();
diagnostics.increment_sink_errors();
diagnostics.increment_pre_init_buffer_events();
diagnostics.increment_pre_init_buffer_overflows();
diagnostics.increment_database_batch_writes();
diagnostics.increment_file_writes();
diagnostics.increment_stdout_writes();
let snapshot = diagnostics.snapshot();
assert_eq!(snapshot.events_processed, 1);
assert_eq!(snapshot.events_dropped_backpressure, 1);
assert_eq!(snapshot.events_dropped_error, 1);
assert_eq!(snapshot.sink_errors, 1);
assert_eq!(snapshot.pre_init_buffer_events, 1);
assert_eq!(snapshot.pre_init_buffer_overflows, 1);
assert_eq!(snapshot.database_batch_writes, 1);
assert_eq!(snapshot.file_writes, 1);
assert_eq!(snapshot.stdout_writes, 1);
}
}