use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
/// Container bundling the WAL metrics state declared in this file.
///
/// Every tracker is held behind its own `Arc<Mutex<..>>`, while
/// `global_counters` is stored inline and consists of atomics. The `get_*`
/// methods clone the locked values out, and `reset` clears each piece in turn.
pub struct V2WALMetrics {
/// Performance counters; returned by value from `get_counters`.
pub(crate) counters: Arc<Mutex<WALPerformanceCounters>>,
/// Latency histogram; returned by value from `get_latency_histogram`.
pub(crate) latency_histogram:
Arc<Mutex<crate::backend::native::v2::wal::metrics::aggregation::LatencyHistogram>>,
/// Throughput tracker; returned by value from `get_throughput_tracker`.
pub(crate) throughput_tracker:
Arc<Mutex<crate::backend::native::v2::wal::metrics::aggregation::ThroughputTracker>>,
/// Resource tracker; returned by value from `get_resource_tracker`.
pub(crate) resource_tracker:
Arc<Mutex<crate::backend::native::v2::wal::metrics::reporting::ResourceTracker>>,
/// Cluster performance metrics; returned by value from `get_cluster_metrics`.
pub(crate) cluster_metrics:
Arc<Mutex<crate::backend::native::v2::wal::metrics::reporting::ClusterPerformanceMetrics>>,
/// Error tracker; returned by value from `get_error_tracker`.
pub(crate) error_tracker:
Arc<Mutex<crate::backend::native::v2::wal::metrics::reporting::ErrorTracker>>,
/// Atomic counters read by `get_global_counters` without taking any lock.
pub(crate) global_counters: GlobalCounters,
}
/// Plain-data performance counters stored behind `V2WALMetrics::counters`.
///
/// The derived `Default` yields all-zero numeric fields and an empty
/// `cluster_operations` map; `V2WALMetrics::reset` overwrites the stored value
/// with that default. The type is also serializable via serde.
// NOTE(review): the code that updates these fields is not in this file. Field
// meanings and units below are inferred from the names (e.g. the `_us` suffix
// presumably means microseconds) — confirm against the writers.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct WALPerformanceCounters {
/// Presumably the number of WAL records processed.
pub records_processed: u64,
/// Presumably the number of bytes transferred.
pub bytes_transferred: u64,
/// Presumably the number of flush operations.
pub flush_operations: u64,
/// Presumably the number of checkpoint operations.
pub checkpoint_operations: u64,
/// Presumably the number of recovery operations.
pub recovery_operations: u64,
/// Presumably the average write latency, in microseconds.
pub avg_write_latency_us: u64,
/// Presumably the average read latency, in microseconds.
pub avg_read_latency_us: u64,
/// Presumably the average flush latency, in microseconds.
pub avg_flush_latency_us: u64,
/// Presumably buffer utilization as a percentage; valid range not shown here.
pub buffer_utilization_percent: f64,
/// Per-key operation counters; the `i64` key is presumably a cluster id — TODO confirm.
pub cluster_operations: HashMap<i64, ClusterOperationCounters>,
/// Edge-related operation metrics.
pub edge_operations: EdgeOperationMetrics,
/// Node-related operation metrics.
pub node_operations: NodeOperationMetrics,
/// Free-space-related operation metrics.
pub free_space_operations: FreeSpaceOperationMetrics,
/// String-table-related operation metrics.
pub string_table_operations: StringTableOperationMetrics,
}
/// Operation counters used as the value type of
/// `WALPerformanceCounters::cluster_operations`.
///
/// All fields are zero under the derived `Default`.
// NOTE(review): field meanings are inferred from their names; the code that
// updates them is not in this file — confirm before relying on them.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ClusterOperationCounters {
/// Presumably the number of create operations.
pub creates: u64,
/// Presumably the number of read operations.
pub reads: u64,
/// Presumably the number of update operations.
pub updates: u64,
/// Presumably the total bytes processed.
pub bytes_processed: u64,
/// Presumably the average latency, in microseconds (per the `_us` suffix).
pub avg_latency_us: u64,
}
/// Edge operation metrics stored in `WALPerformanceCounters::edge_operations`.
///
/// All fields are zero under the derived `Default`.
// NOTE(review): field meanings and units are inferred from their names; the
// code that updates them is not in this file — confirm before relying on them.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct EdgeOperationMetrics {
/// Presumably the number of edge inserts.
pub total_inserts: u64,
/// Presumably the number of edge updates.
pub total_updates: u64,
/// Presumably the number of edge deletions.
pub total_deletions: u64,
/// Presumably the average edge record size; unit not shown here.
pub avg_record_size: f64,
/// Presumably the average insertion latency, in microseconds.
pub avg_insertion_latency_us: u64,
/// Presumably the average update latency, in microseconds.
pub avg_update_latency_us: u64,
/// Presumably a hit rate for cluster affinity; scale (0-1 vs. 0-100) not shown here.
pub cluster_affinity_hit_rate: f64,
}
/// Node operation metrics stored in `WALPerformanceCounters::node_operations`.
///
/// All fields are zero under the derived `Default`.
// NOTE(review): field meanings and units are inferred from their names; the
// code that updates them is not in this file — confirm before relying on them.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct NodeOperationMetrics {
/// Presumably the number of node inserts.
pub total_inserts: u64,
/// Presumably the number of node updates.
pub total_updates: u64,
/// Presumably the number of node deletions.
pub total_deletions: u64,
/// Presumably the average node record size; unit not shown here.
pub avg_record_size: f64,
/// Presumably the average insertion latency, in microseconds.
pub avg_insertion_latency_us: u64,
/// Presumably the average update latency, in microseconds.
pub avg_update_latency_us: u64,
/// Presumably a score describing I/O locality; scale not shown here.
pub io_locality_score: f64,
}
/// Free-space operation metrics stored in
/// `WALPerformanceCounters::free_space_operations`.
///
/// All fields are zero under the derived `Default`.
// NOTE(review): field meanings and units are inferred from their names; the
// code that updates them is not in this file — confirm before relying on them.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct FreeSpaceOperationMetrics {
/// Presumably the number of allocations.
pub total_allocations: u64,
/// Presumably the number of deallocations.
pub total_deallocations: u64,
/// Presumably the average allocation size; unit not shown here.
pub avg_allocation_size: u64,
/// Presumably an efficiency figure expressed as a percentage.
pub efficiency_percent: f64,
/// Presumably the average allocation latency, in microseconds.
pub avg_allocation_latency_us: u64,
/// Presumably the average deallocation latency, in microseconds.
pub avg_deallocation_latency_us: u64,
}
/// String-table operation metrics stored in
/// `WALPerformanceCounters::string_table_operations`.
///
/// All fields are zero under the derived `Default`.
// NOTE(review): field meanings and units are inferred from their names; the
// code that updates them is not in this file — confirm before relying on them.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct StringTableOperationMetrics {
/// Presumably the number of string insertions.
pub total_insertions: u64,
/// Presumably the average string length; bytes vs. chars not shown here.
pub avg_string_length: f64,
/// Presumably a lookup hit rate expressed as a percentage.
pub hit_rate_percent: f64,
/// Presumably a compression ratio; direction of the ratio not shown here.
pub compression_ratio: f64,
/// Presumably the average insertion latency, in microseconds.
pub avg_insertion_latency_us: u64,
/// Presumably the average lookup latency, in microseconds.
pub avg_lookup_latency_us: u64,
}
/// Atomic counters stored inline in `V2WALMetrics::global_counters`.
///
/// `V2WALMetrics::get_global_counters` reads every field with
/// `Ordering::Relaxed`, and `V2WALMetrics::reset` stores zero into each one.
/// `Default` is implemented by hand further down and starts every field at 0.
// NOTE(review): the code that increments these is not in this file; the
// per-field meanings below are inferred from the names.
#[derive(Debug)]
pub struct GlobalCounters {
/// Presumably the number of records written.
pub records_written: AtomicU64,
/// Presumably the number of records read.
pub records_read: AtomicU64,
/// Presumably the number of bytes written.
pub bytes_written: AtomicU64,
/// Presumably the number of bytes read.
pub bytes_read: AtomicU64,
/// Presumably the number of operations currently in flight.
pub active_operations: AtomicUsize,
}
impl V2WALMetrics {
    /// Builds a metrics container with every component in its initial state.
    ///
    /// The performance counters, resource tracker and cluster metrics start
    /// from their `Default` values; the latency histogram, throughput tracker
    /// and error tracker come from their own `new()` constructors. The global
    /// atomic counters start at zero.
    pub fn new() -> Self {
        let counters = Arc::new(Mutex::new(WALPerformanceCounters::default()));
        let latency_histogram = Arc::new(Mutex::new(
            crate::backend::native::v2::wal::metrics::aggregation::LatencyHistogram::new(),
        ));
        let throughput_tracker = Arc::new(Mutex::new(
            crate::backend::native::v2::wal::metrics::aggregation::ThroughputTracker::new(),
        ));
        let resource_tracker = Arc::new(Mutex::new(
            crate::backend::native::v2::wal::metrics::reporting::ResourceTracker::default(),
        ));
        let cluster_metrics = Arc::new(Mutex::new(
            crate::backend::native::v2::wal::metrics::reporting::ClusterPerformanceMetrics::default(),
        ));
        let error_tracker = Arc::new(Mutex::new(
            crate::backend::native::v2::wal::metrics::reporting::ErrorTracker::new(),
        ));

        Self {
            counters,
            latency_histogram,
            throughput_tracker,
            resource_tracker,
            cluster_metrics,
            error_tracker,
            global_counters: GlobalCounters::default(),
        }
    }

    /// Returns a clone of the current performance counters.
    ///
    /// The `counters` lock is held only while the clone is made.
    pub fn get_counters(&self) -> WALPerformanceCounters {
        let guard = self.counters.lock();
        guard.clone()
    }

    /// Returns a clone of the current latency histogram.
    pub fn get_latency_histogram(
        &self,
    ) -> crate::backend::native::v2::wal::metrics::aggregation::LatencyHistogram {
        let guard = self.latency_histogram.lock();
        guard.clone()
    }

    /// Returns a clone of the current throughput tracker.
    pub fn get_throughput_tracker(
        &self,
    ) -> crate::backend::native::v2::wal::metrics::aggregation::ThroughputTracker {
        let guard = self.throughput_tracker.lock();
        guard.clone()
    }

    /// Returns a clone of the current resource tracker.
    pub fn get_resource_tracker(
        &self,
    ) -> crate::backend::native::v2::wal::metrics::reporting::ResourceTracker {
        let guard = self.resource_tracker.lock();
        guard.clone()
    }

    /// Returns a clone of the current cluster performance metrics.
    pub fn get_cluster_metrics(
        &self,
    ) -> crate::backend::native::v2::wal::metrics::reporting::ClusterPerformanceMetrics {
        let guard = self.cluster_metrics.lock();
        guard.clone()
    }

    /// Returns a clone of the current error tracker.
    pub fn get_error_tracker(
        &self,
    ) -> crate::backend::native::v2::wal::metrics::reporting::ErrorTracker {
        let guard = self.error_tracker.lock();
        guard.clone()
    }

    /// Reads the global atomic counters.
    ///
    /// The tuple is `(records_written, records_read, bytes_written,
    /// bytes_read, active_operations)`. Each value is an independent
    /// `Ordering::Relaxed` load, so the tuple is not a single atomic snapshot.
    pub fn get_global_counters(&self) -> (u64, u64, u64, u64, usize) {
        let globals = &self.global_counters;

        let records_written = globals.records_written.load(Ordering::Relaxed);
        let records_read = globals.records_read.load(Ordering::Relaxed);
        let bytes_written = globals.bytes_written.load(Ordering::Relaxed);
        let bytes_read = globals.bytes_read.load(Ordering::Relaxed);
        let active_operations = globals.active_operations.load(Ordering::Relaxed);

        (
            records_written,
            records_read,
            bytes_written,
            bytes_read,
            active_operations,
        )
    }

    /// Clears every metric held by this container.
    ///
    /// The performance counters are replaced with their `Default` value, each
    /// tracker has its own `reset()` called, and every global atomic is set to
    /// zero. The locks are taken one at a time and never nested, so the reset
    /// as a whole is not atomic with respect to concurrent updates.
    pub fn reset(&self) {
        // Each guard is a temporary that is released at the end of its
        // statement, before the next lock is taken.
        *self.counters.lock() = WALPerformanceCounters::default();
        self.latency_histogram.lock().reset();
        self.throughput_tracker.lock().reset();
        self.resource_tracker.lock().reset();
        self.cluster_metrics.lock().reset();
        self.error_tracker.lock().reset();

        let globals = &self.global_counters;
        let u64_counters = [
            &globals.records_written,
            &globals.records_read,
            &globals.bytes_written,
            &globals.bytes_read,
        ];
        for counter in u64_counters {
            counter.store(0, Ordering::Relaxed);
        }
        // NOTE(review): this also zeroes `active_operations`; if callers
        // decrement it after a reset the value could wrap — confirm that
        // `reset` is only invoked while no operation is in flight.
        globals.active_operations.store(0, Ordering::Relaxed);
    }
}
impl Default for GlobalCounters {
    /// Creates a counter set in which every atomic starts at zero.
    fn default() -> Self {
        // The `Default` impls of the std atomic integers produce zero, which
        // is the same starting value as an explicit `new(0)`.
        Self {
            records_written: AtomicU64::default(),
            records_read: AtomicU64::default(),
            bytes_written: AtomicU64::default(),
            bytes_read: AtomicU64::default(),
            active_operations: AtomicUsize::default(),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed metrics object reports zeroed counters.
    #[test]
    fn test_v2_wal_metrics_creation() {
        let metrics = V2WALMetrics::new();
        let counters = metrics.get_counters();
        assert_eq!(counters.records_processed, 0);
        assert_eq!(counters.bytes_transferred, 0);
        assert!(counters.cluster_operations.is_empty());
        assert_eq!(metrics.get_global_counters(), (0, 0, 0, 0, 0));
    }

    #[test]
    fn test_performance_counters_default() {
        let counters = WALPerformanceCounters::default();
        assert_eq!(counters.records_processed, 0);
        assert_eq!(counters.bytes_transferred, 0);
        assert_eq!(counters.flush_operations, 0);
    }

    #[test]
    fn test_cluster_operation_counters_default() {
        let cluster_ops = ClusterOperationCounters::default();
        assert_eq!(cluster_ops.creates, 0);
        assert_eq!(cluster_ops.reads, 0);
        assert_eq!(cluster_ops.updates, 0);
        assert_eq!(cluster_ops.bytes_processed, 0);
        assert_eq!(cluster_ops.avg_latency_us, 0);
    }

    #[test]
    fn test_edge_operation_metrics_default() {
        let edge_ops = EdgeOperationMetrics::default();
        assert_eq!(edge_ops.total_inserts, 0);
        assert_eq!(edge_ops.total_updates, 0);
        assert_eq!(edge_ops.total_deletions, 0);
        assert_eq!(edge_ops.avg_record_size, 0.0);
    }

    #[test]
    fn test_node_operation_metrics_default() {
        let node_ops = NodeOperationMetrics::default();
        assert_eq!(node_ops.total_inserts, 0);
        assert_eq!(node_ops.total_updates, 0);
        assert_eq!(node_ops.total_deletions, 0);
        assert_eq!(node_ops.io_locality_score, 0.0);
    }

    #[test]
    fn test_free_space_operation_metrics_default() {
        let free_space_ops = FreeSpaceOperationMetrics::default();
        assert_eq!(free_space_ops.total_allocations, 0);
        assert_eq!(free_space_ops.total_deallocations, 0);
        assert_eq!(free_space_ops.avg_allocation_size, 0);
        assert_eq!(free_space_ops.efficiency_percent, 0.0);
    }

    #[test]
    fn test_string_table_operation_metrics_default() {
        let string_ops = StringTableOperationMetrics::default();
        assert_eq!(string_ops.total_insertions, 0);
        assert_eq!(string_ops.avg_string_length, 0.0);
        assert_eq!(string_ops.hit_rate_percent, 0.0);
        assert_eq!(string_ops.compression_ratio, 0.0);
    }

    #[test]
    fn test_global_counters_atomic_operations() {
        let counters = GlobalCounters::default();
        counters.records_written.store(100, Ordering::Relaxed);
        counters.records_read.store(50, Ordering::Relaxed);
        assert_eq!(counters.records_written.load(Ordering::Relaxed), 100);
        assert_eq!(counters.records_read.load(Ordering::Relaxed), 50);
    }

    /// `reset` must clear state that was actually modified; checking an
    /// untouched instance would pass even if `reset` did nothing.
    #[test]
    fn test_v2_wal_metrics_reset() {
        let metrics = V2WALMetrics::new();

        // Dirty the locked counters; the guard is dropped at the end of the
        // block so `reset` can take the lock again.
        {
            let mut counters = metrics.counters.lock();
            counters.records_processed = 7;
            counters.bytes_transferred = 1024;
            counters
                .cluster_operations
                .insert(1, ClusterOperationCounters::default());
        }

        // Dirty every global atomic.
        let globals = &metrics.global_counters;
        globals.records_written.store(100, Ordering::Relaxed);
        globals.records_read.store(50, Ordering::Relaxed);
        globals.bytes_written.store(4096, Ordering::Relaxed);
        globals.bytes_read.store(2048, Ordering::Relaxed);
        globals.active_operations.store(3, Ordering::Relaxed);

        // Sanity-check that the modifications are observable before the reset.
        let counters_before = metrics.get_counters();
        assert_eq!(counters_before.records_processed, 7);
        assert_eq!(counters_before.bytes_transferred, 1024);
        assert_eq!(counters_before.cluster_operations.len(), 1);
        assert_eq!(metrics.get_global_counters(), (100, 50, 4096, 2048, 3));

        metrics.reset();

        let counters_after = metrics.get_counters();
        assert_eq!(counters_after.records_processed, 0);
        assert_eq!(counters_after.bytes_transferred, 0);
        assert!(counters_after.cluster_operations.is_empty());
        assert_eq!(metrics.get_global_counters(), (0, 0, 0, 0, 0));
    }
}