use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
use std::time::Duration;
pub struct IndexerMetrics {
blocks_processed: AtomicU64,
_blocks_per_second: Mutex<f64>,
last_processed_block: AtomicU64,
chain_head_block: AtomicU64,
events_processed: AtomicU64,
_events_per_second: Mutex<f64>,
reorgs_detected: AtomicU64,
total_reorg_depth: AtomicU64,
max_reorg_depth: AtomicU64,
handler_calls: AtomicU64,
handler_total_latency_us: AtomicU64,
handler_max_latency_us: AtomicU64,
rpc_calls: AtomicU64,
rpc_errors: AtomicU64,
rpc_total_latency_us: AtomicU64,
last_checkpoint_block: AtomicU64,
checkpoints_saved: AtomicU64,
started_at: AtomicU64,
last_block_at: AtomicU64,
}
impl IndexerMetrics {
pub fn new() -> Self {
Self {
blocks_processed: AtomicU64::new(0),
_blocks_per_second: Mutex::new(0.0),
last_processed_block: AtomicU64::new(0),
chain_head_block: AtomicU64::new(0),
events_processed: AtomicU64::new(0),
_events_per_second: Mutex::new(0.0),
reorgs_detected: AtomicU64::new(0),
total_reorg_depth: AtomicU64::new(0),
max_reorg_depth: AtomicU64::new(0),
handler_calls: AtomicU64::new(0),
handler_total_latency_us: AtomicU64::new(0),
handler_max_latency_us: AtomicU64::new(0),
rpc_calls: AtomicU64::new(0),
rpc_errors: AtomicU64::new(0),
rpc_total_latency_us: AtomicU64::new(0),
last_checkpoint_block: AtomicU64::new(0),
checkpoints_saved: AtomicU64::new(0),
started_at: AtomicU64::new(0),
last_block_at: AtomicU64::new(0),
}
}
pub fn set_started(&self, timestamp: u64) {
self.started_at.store(timestamp, Ordering::Relaxed);
}
pub fn record_block(&self, block_number: u64, event_count: u64, timestamp: u64) {
self.blocks_processed.fetch_add(1, Ordering::Relaxed);
self.last_processed_block
.store(block_number, Ordering::Relaxed);
self.events_processed
.fetch_add(event_count, Ordering::Relaxed);
self.last_block_at.store(timestamp, Ordering::Relaxed);
}
pub fn record_reorg(&self, depth: u64) {
self.reorgs_detected.fetch_add(1, Ordering::Relaxed);
self.total_reorg_depth.fetch_add(depth, Ordering::Relaxed);
self.max_reorg_depth.fetch_max(depth, Ordering::Relaxed);
}
pub fn record_handler_call(&self, latency: Duration) {
let us = latency.as_micros() as u64;
self.handler_calls.fetch_add(1, Ordering::Relaxed);
self.handler_total_latency_us
.fetch_add(us, Ordering::Relaxed);
self.handler_max_latency_us.fetch_max(us, Ordering::Relaxed);
}
pub fn record_rpc_call(&self, latency: Duration, success: bool) {
let us = latency.as_micros() as u64;
self.rpc_calls.fetch_add(1, Ordering::Relaxed);
self.rpc_total_latency_us.fetch_add(us, Ordering::Relaxed);
if !success {
self.rpc_errors.fetch_add(1, Ordering::Relaxed);
}
}
pub fn record_checkpoint(&self, block_number: u64) {
self.last_checkpoint_block
.store(block_number, Ordering::Relaxed);
self.checkpoints_saved.fetch_add(1, Ordering::Relaxed);
}
pub fn set_chain_head(&self, block_number: u64) {
self.chain_head_block.store(block_number, Ordering::Relaxed);
}
pub fn block_lag(&self) -> u64 {
let head = self.chain_head_block.load(Ordering::Relaxed);
let processed = self.last_processed_block.load(Ordering::Relaxed);
head.saturating_sub(processed)
}
pub fn avg_handler_latency(&self) -> Duration {
let calls = self.handler_calls.load(Ordering::Relaxed);
if calls == 0 {
return Duration::ZERO;
}
let total = self.handler_total_latency_us.load(Ordering::Relaxed);
Duration::from_micros(total / calls)
}
pub fn avg_reorg_depth(&self) -> f64 {
let count = self.reorgs_detected.load(Ordering::Relaxed);
if count == 0 {
return 0.0;
}
self.total_reorg_depth.load(Ordering::Relaxed) as f64 / count as f64
}
pub fn rpc_success_rate(&self) -> f64 {
let total = self.rpc_calls.load(Ordering::Relaxed);
if total == 0 {
return 1.0;
}
let errors = self.rpc_errors.load(Ordering::Relaxed);
(total - errors) as f64 / total as f64
}
pub fn snapshot(&self) -> MetricsSnapshot {
let handler_calls = self.handler_calls.load(Ordering::Relaxed);
let handler_total = self.handler_total_latency_us.load(Ordering::Relaxed);
let rpc_calls = self.rpc_calls.load(Ordering::Relaxed);
let rpc_errors = self.rpc_errors.load(Ordering::Relaxed);
let rpc_total = self.rpc_total_latency_us.load(Ordering::Relaxed);
let reorgs = self.reorgs_detected.load(Ordering::Relaxed);
let total_depth = self.total_reorg_depth.load(Ordering::Relaxed);
MetricsSnapshot {
blocks_processed: self.blocks_processed.load(Ordering::Relaxed),
last_processed_block: self.last_processed_block.load(Ordering::Relaxed),
chain_head_block: self.chain_head_block.load(Ordering::Relaxed),
block_lag: self.block_lag(),
events_processed: self.events_processed.load(Ordering::Relaxed),
reorgs_detected: reorgs,
avg_reorg_depth: if reorgs > 0 {
total_depth as f64 / reorgs as f64
} else {
0.0
},
max_reorg_depth: self.max_reorg_depth.load(Ordering::Relaxed),
handler_calls,
avg_handler_latency_us: if handler_calls > 0 {
handler_total / handler_calls
} else {
0
},
max_handler_latency_us: self.handler_max_latency_us.load(Ordering::Relaxed),
rpc_calls,
rpc_errors,
avg_rpc_latency_us: if rpc_calls > 0 {
rpc_total / rpc_calls
} else {
0
},
rpc_success_rate: if rpc_calls > 0 {
(rpc_calls - rpc_errors) as f64 / rpc_calls as f64
} else {
1.0
},
checkpoints_saved: self.checkpoints_saved.load(Ordering::Relaxed),
last_checkpoint_block: self.last_checkpoint_block.load(Ordering::Relaxed),
}
}
}
impl Default for IndexerMetrics {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, serde::Serialize)]
pub struct MetricsSnapshot {
pub blocks_processed: u64,
pub last_processed_block: u64,
pub chain_head_block: u64,
pub block_lag: u64,
pub events_processed: u64,
pub reorgs_detected: u64,
pub avg_reorg_depth: f64,
pub max_reorg_depth: u64,
pub handler_calls: u64,
pub avg_handler_latency_us: u64,
pub max_handler_latency_us: u64,
pub rpc_calls: u64,
pub rpc_errors: u64,
pub avg_rpc_latency_us: u64,
pub rpc_success_rate: f64,
pub checkpoints_saved: u64,
pub last_checkpoint_block: u64,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn record_block_updates_counters() {
let m = IndexerMetrics::new();
m.record_block(100, 5, 1000);
m.record_block(101, 3, 1012);
m.record_block(102, 10, 1024);
let snap = m.snapshot();
assert_eq!(snap.blocks_processed, 3);
assert_eq!(snap.last_processed_block, 102);
assert_eq!(snap.events_processed, 18); }
#[test]
fn block_lag() {
let m = IndexerMetrics::new();
m.set_chain_head(1000);
m.record_block(990, 0, 1000);
assert_eq!(m.block_lag(), 10);
m.record_block(1000, 0, 1120);
assert_eq!(m.block_lag(), 0);
m.record_block(1005, 0, 1180);
assert_eq!(m.block_lag(), 0); }
#[test]
fn reorg_metrics() {
let m = IndexerMetrics::new();
m.record_reorg(2);
m.record_reorg(4);
m.record_reorg(6);
let snap = m.snapshot();
assert_eq!(snap.reorgs_detected, 3);
assert_eq!(snap.max_reorg_depth, 6);
assert!((m.avg_reorg_depth() - 4.0).abs() < f64::EPSILON); }
#[test]
fn handler_latency() {
let m = IndexerMetrics::new();
m.record_handler_call(Duration::from_micros(100));
m.record_handler_call(Duration::from_micros(200));
m.record_handler_call(Duration::from_micros(300));
assert_eq!(m.avg_handler_latency(), Duration::from_micros(200));
let snap = m.snapshot();
assert_eq!(snap.handler_calls, 3);
assert_eq!(snap.avg_handler_latency_us, 200);
assert_eq!(snap.max_handler_latency_us, 300);
}
#[test]
fn rpc_success_rate() {
let m = IndexerMetrics::new();
assert!((m.rpc_success_rate() - 1.0).abs() < f64::EPSILON);
for _ in 0..8 {
m.record_rpc_call(Duration::from_micros(50), true);
}
for _ in 0..2 {
m.record_rpc_call(Duration::from_micros(50), false);
}
assert!((m.rpc_success_rate() - 0.8).abs() < f64::EPSILON);
let snap = m.snapshot();
assert_eq!(snap.rpc_calls, 10);
assert_eq!(snap.rpc_errors, 2);
}
#[test]
fn snapshot_serialization() {
let m = IndexerMetrics::new();
m.set_started(1000);
m.record_block(100, 5, 1012);
m.set_chain_head(110);
m.record_reorg(3);
m.record_handler_call(Duration::from_micros(150));
m.record_rpc_call(Duration::from_micros(200), true);
m.record_checkpoint(100);
let snap = m.snapshot();
let json = serde_json::to_string(&snap).expect("snapshot must serialize to JSON");
assert!(json.contains("\"blocks_processed\":1"));
assert!(json.contains("\"block_lag\":10"));
assert!(json.contains("\"reorgs_detected\":1"));
assert!(json.contains("\"checkpoints_saved\":1"));
}
}