use std::sync::Mutex;
use std::time::Duration;
use calimero_node::sync::metrics::{PhaseTimer, SyncMetricsCollector};
use super::metrics::SimMetrics;
/// Metrics collector that accumulates into a [`SimMetrics`] value guarded by
/// a [`Mutex`].
///
/// All methods take `&self`; every read or write of the metrics goes through
/// the mutex.
#[derive(Debug)]
pub struct SimMetricsCollector {
/// Accumulated metrics; locked by each method before it is read or modified.
metrics: Mutex<SimMetrics>,
}
// `Default` simply delegates to `SimMetricsCollector::new`, so both
// construction paths yield the same initial state.
impl Default for SimMetricsCollector {
/// Returns the same value as [`SimMetricsCollector::new`].
fn default() -> Self {
Self::new()
}
}
impl SimMetricsCollector {
    /// Creates a collector whose metrics start out as `SimMetrics::default()`.
    pub fn new() -> Self {
        let metrics = Mutex::new(SimMetrics::default());
        Self { metrics }
    }

    /// Acquires the metrics lock.
    ///
    /// # Panics
    ///
    /// Panics with "metrics lock poisoned" if the mutex is poisoned.
    fn lock_metrics(&self) -> std::sync::MutexGuard<'_, SimMetrics> {
        self.metrics.lock().expect("metrics lock poisoned")
    }

    /// Moves the accumulated metrics out, leaving `SimMetrics::default()` in
    /// their place.
    ///
    /// # Panics
    ///
    /// Panics if the metrics mutex is poisoned.
    pub fn take_metrics(&self) -> SimMetrics {
        std::mem::take(&mut *self.lock_metrics())
    }

    /// Returns a clone of the accumulated metrics without modifying them.
    ///
    /// # Panics
    ///
    /// Panics if the metrics mutex is poisoned.
    pub fn snapshot(&self) -> SimMetrics {
        let current = self.lock_metrics();
        (*current).clone()
    }

    /// Resets the accumulated metrics in place by calling `reset` on them.
    ///
    /// # Panics
    ///
    /// Panics if the metrics mutex is poisoned.
    pub fn reset(&self) {
        self.lock_metrics().reset();
    }

    /// Merges `other` into the accumulated metrics by calling `merge` on them.
    ///
    /// # Panics
    ///
    /// Panics if the metrics mutex is poisoned.
    pub fn merge(&self, other: &SimMetrics) {
        self.lock_metrics().merge(other);
    }
}
// Feeds sync events into the mutex-guarded `SimMetrics`.
//
// Methods with an empty body accept the event and discard it. Every method
// that records something panics with "metrics lock poisoned" if the mutex is
// poisoned.
impl SyncMetricsCollector for SimMetricsCollector {
    /// Records one sent message of `bytes` bytes in the protocol metrics.
    /// The protocol name is ignored.
    fn record_message_sent(&self, _protocol: &str, bytes: usize) {
        self.metrics
            .lock()
            .expect("metrics lock poisoned")
            .protocol
            .record_message(bytes);
    }

    /// Records one round trip in the protocol metrics. The protocol name is
    /// ignored.
    fn record_round_trip(&self, _protocol: &str) {
        self.metrics
            .lock()
            .expect("metrics lock poisoned")
            .protocol
            .record_round_trip();
    }

    /// Adds `count` to the protocol metrics' `entities_transferred` counter.
    fn record_entities_transferred(&self, count: usize) {
        let transferred = count as u64;
        self.metrics
            .lock()
            .expect("metrics lock poisoned")
            .protocol
            .entities_transferred += transferred;
    }

    /// Records one merge in the protocol metrics. The CRDT type is ignored.
    fn record_merge(&self, _crdt_type: &str) {
        self.metrics
            .lock()
            .expect("metrics lock poisoned")
            .protocol
            .record_merge();
    }

    /// Records one comparison in the protocol metrics.
    fn record_comparison(&self) {
        self.metrics
            .lock()
            .expect("metrics lock poisoned")
            .protocol
            .record_comparison();
    }

    /// No-op: phase timings are not recorded by this collector.
    fn record_phase_complete(&self, _timer: PhaseTimer) {}

    /// No-op: blocked snapshots are not recorded by this collector.
    fn record_snapshot_blocked(&self) {}

    /// No-op: verification failures are not recorded by this collector.
    fn record_verification_failure(&self) {}

    /// No-op: LWW fallbacks are not recorded by this collector.
    fn record_lww_fallback(&self) {}

    /// Records one buffer drop in the effects metrics.
    fn record_buffer_drop(&self) {
        self.metrics
            .lock()
            .expect("metrics lock poisoned")
            .effects
            .record_buffer_drop();
    }

    /// No-op: sync starts are not recorded by this collector.
    fn record_sync_start(&self, _context_id: &str, _protocol: &str, _trigger: &str) {}

    /// No-op: sync completions are not recorded by this collector.
    fn record_sync_complete(
        &self,
        _context_id: &str,
        _protocol: &str,
        _duration: Duration,
        _entities: usize,
    ) {
    }

    /// No-op: sync failures are not recorded by this collector.
    fn record_sync_failure(&self, _context_id: &str, _protocol: &str, _reason: &str) {}

    /// No-op: protocol selection is not recorded by this collector.
    fn record_protocol_selected(&self, _protocol: &str, _reason: &str, _divergence: f64) {}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time check that the collector is both `Send` and `Sync`.
    #[test]
    fn test_collector_is_send_sync() {
        fn require_send_sync<T: Send + Sync>() {}
        require_send_sync::<SimMetricsCollector>();
    }

    /// Two sent messages are counted and their byte sizes summed.
    #[test]
    fn test_record_message_sent() {
        let sink = SimMetricsCollector::new();
        for size in [1024, 512] {
            sink.record_message_sent("HashComparison", size);
        }

        let seen = sink.snapshot();
        assert_eq!(seen.protocol.messages_sent, 2);
        assert_eq!(seen.protocol.payload_bytes, 1536);
    }

    /// Each recorded round trip increments the round-trip counter.
    #[test]
    fn test_record_round_trip() {
        let sink = SimMetricsCollector::new();
        for _ in 0..2 {
            sink.record_round_trip("HashComparison");
        }

        let seen = sink.snapshot();
        assert_eq!(seen.protocol.round_trips, 2);
    }

    /// Transferred entity counts accumulate across calls.
    #[test]
    fn test_record_entities_transferred() {
        let sink = SimMetricsCollector::new();
        for batch in [5, 3] {
            sink.record_entities_transferred(batch);
        }

        let seen = sink.snapshot();
        assert_eq!(seen.protocol.entities_transferred, 8);
    }

    /// Merges are counted regardless of the CRDT type name passed in.
    #[test]
    fn test_record_merge() {
        let sink = SimMetricsCollector::new();
        for crdt in ["GCounter", "LwwRegister"] {
            sink.record_merge(crdt);
        }

        let seen = sink.snapshot();
        assert_eq!(seen.protocol.merges_performed, 2);
    }

    /// Each comparison increments the compared-entities counter.
    #[test]
    fn test_record_comparison() {
        let sink = SimMetricsCollector::new();
        for _ in 0..3 {
            sink.record_comparison();
        }

        let seen = sink.snapshot();
        assert_eq!(seen.protocol.entities_compared, 3);
    }

    /// Buffer drops are counted in the effects metrics.
    #[test]
    fn test_record_buffer_drop() {
        let sink = SimMetricsCollector::new();
        for _ in 0..2 {
            sink.record_buffer_drop();
        }

        let seen = sink.snapshot();
        assert_eq!(seen.effects.buffer_drops, 2);
    }

    /// `take_metrics` hands back what was recorded and leaves zeroed counters.
    #[test]
    fn test_take_metrics_resets() {
        let sink = SimMetricsCollector::new();
        sink.record_message_sent("test", 100);

        let taken = sink.take_metrics();
        assert_eq!(taken.protocol.messages_sent, 1);

        let remaining = sink.snapshot();
        assert_eq!(remaining.protocol.messages_sent, 0);
    }

    /// `reset` zeroes previously recorded counters.
    #[test]
    fn test_reset() {
        let sink = SimMetricsCollector::new();
        sink.record_message_sent("test", 100);
        sink.record_round_trip("test");

        sink.reset();

        let seen = sink.snapshot();
        assert_eq!(seen.protocol.messages_sent, 0);
        assert_eq!(seen.protocol.round_trips, 0);
    }
}