use prometheus_client::encoding::EncodeLabelSet;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
use prometheus_client::registry::Registry;
use std::sync::atomic::AtomicU64;
use std::time::Duration;
use super::metrics::{PhaseTimer, SyncMetricsCollector};
/// Protocol names that are accepted verbatim as a `protocol` label value.
///
/// [`sanitize_protocol`] compares its input against this list and maps
/// anything that is not listed here to `"unknown"`.
const KNOWN_PROTOCOLS: &[&str] = &[
    "None",
    "Snapshot",
    "HashComparison",
    "DeltaSync",
    "SubtreePrefetch",
    "LevelWise",
    "BloomFilter",
];
/// CRDT type names that are accepted verbatim as a `crdt_type` label value.
///
/// The list contains `"unknown"` itself, so that value passes through
/// [`sanitize_crdt_type`] unchanged, exactly like any unlisted name.
const KNOWN_CRDT_TYPES: &[&str] = &[
    "GCounter",
    "PnCounter",
    "LwwRegister",
    "GSet",
    "ORSet",
    "LwwMap",
    "unknown",
];
/// Maps an arbitrary protocol name onto a fixed set of label values.
///
/// Returns the entry of [`KNOWN_PROTOCOLS`] that equals `protocol` (exact,
/// case-sensitive comparison), or `"unknown"` when no entry matches. Because
/// the result always comes from a static list, the number of distinct label
/// values this function can produce is bounded regardless of the input.
fn sanitize_protocol(protocol: &str) -> &'static str {
    for &known in KNOWN_PROTOCOLS {
        if known == protocol {
            return known;
        }
    }
    "unknown"
}
/// Maps an arbitrary CRDT type name onto a fixed set of label values.
///
/// Returns the entry of [`KNOWN_CRDT_TYPES`] that equals `crdt_type` (exact,
/// case-sensitive comparison), or `"unknown"` when no entry matches. The
/// result always comes from a static list, so the set of possible label
/// values is bounded regardless of the input.
fn sanitize_crdt_type(crdt_type: &str) -> &'static str {
    for &known in KNOWN_CRDT_TYPES {
        if known == crdt_type {
            return known;
        }
    }
    "unknown"
}
/// Label set with a single `protocol` label.
///
/// Keys the per-protocol counter families of `PrometheusSyncMetrics`. In this
/// file the value is always produced by `sanitize_protocol`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, EncodeLabelSet)]
struct ProtocolLabels {
    /// Value emitted for the `protocol` label.
    protocol: String,
}
/// Label set with a single `crdt_type` label.
///
/// Keys the merge counter family of `PrometheusSyncMetrics`. In this file the
/// value is always produced by `sanitize_crdt_type`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, EncodeLabelSet)]
struct CrdtLabels {
    /// Value emitted for the `crdt_type` label.
    crdt_type: String,
}
/// Label set with a single `phase` label.
///
/// Keys the phase-duration histogram family of `PrometheusSyncMetrics`.
// NOTE(review): in this file the value is taken from `PhaseTimer::phase()`
// without passing through a sanitizer; confirm phase names come from a
// bounded set so label cardinality stays under control.
#[derive(Debug, Clone, PartialEq, Eq, Hash, EncodeLabelSet)]
struct PhaseLabels {
    /// Value emitted for the `phase` label.
    phase: String,
}
/// Label set pairing a `protocol` label with an `outcome` label.
///
/// Keys the sync-duration histogram family of `PrometheusSyncMetrics`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, EncodeLabelSet)]
struct OutcomeLabels {
    /// Value emitted for the `protocol` label; produced by `sanitize_protocol`
    /// in this file.
    protocol: String,
    /// Value emitted for the `outcome` label; the only value written in this
    /// file is `"success"`.
    outcome: String,
}
/// Sync metrics collector backed by `prometheus_client` metric handles.
///
/// `PrometheusSyncMetrics::new` creates every metric below and registers a
/// clone of each handle with the supplied `Registry`. The quoted names in the
/// field comments are the names passed to `Registry::register`.
#[derive(Debug)]
pub struct PrometheusSyncMetrics {
/// Messages sent, per protocol ("sync_messages_sent").
messages_sent: Family<ProtocolLabels, Counter>,
/// Bytes sent, per protocol ("sync_bytes_sent").
bytes_sent: Family<ProtocolLabels, Counter>,
/// Round trips, per protocol ("sync_round_trips").
round_trips: Family<ProtocolLabels, Counter>,
/// Entities transferred during sync ("sync_entities_transferred").
entities_transferred: Counter<u64, AtomicU64>,
/// CRDT merge operations, per CRDT type ("sync_merges").
merges_total: Family<CrdtLabels, Counter>,
/// Entity hash comparisons ("sync_comparisons").
comparisons_total: Counter<u64, AtomicU64>,
/// Phase durations in seconds, per phase ("sync_phase_duration_seconds").
phase_duration_seconds: Family<PhaseLabels, Histogram>,
/// Snapshot attempts blocked on initialized nodes ("sync_snapshot_blocked").
snapshot_blocked_total: Counter<u64, AtomicU64>,
/// Snapshot verification failures ("sync_verification_failures").
verification_failures_total: Counter<u64, AtomicU64>,
/// LWW fallbacks caused by missing CRDT type metadata ("sync_lww_fallback").
lww_fallback_total: Counter<u64, AtomicU64>,
/// Delta buffer drop events ("sync_buffer_drops").
buffer_drops_total: Counter<u64, AtomicU64>,
/// Sync session durations in seconds, per protocol and outcome
/// ("sync_duration_seconds").
sync_duration_seconds: Family<OutcomeLabels, Histogram>,
/// Sync attempts, per protocol ("sync_attempts").
sync_attempts_total: Family<ProtocolLabels, Counter>,
/// Successful syncs, per protocol ("sync_successes").
sync_successes_total: Family<ProtocolLabels, Counter>,
/// Failed syncs, per protocol ("sync_failures").
sync_failures_total: Family<ProtocolLabels, Counter>,
/// Protocol selection decisions, per protocol ("sync_protocol_selections").
protocol_selections_total: Family<ProtocolLabels, Counter>,
}
impl PrometheusSyncMetrics {
pub fn new(registry: &mut Registry) -> Self {
let metrics = Self {
messages_sent: Family::default(),
bytes_sent: Family::default(),
round_trips: Family::default(),
entities_transferred: Counter::default(),
merges_total: Family::default(),
comparisons_total: Counter::default(),
phase_duration_seconds: Family::new_with_constructor(|| {
Histogram::new(exponential_buckets(0.001, 2.0, 15))
}),
snapshot_blocked_total: Counter::default(),
verification_failures_total: Counter::default(),
lww_fallback_total: Counter::default(),
buffer_drops_total: Counter::default(),
sync_duration_seconds: Family::new_with_constructor(|| {
Histogram::new(exponential_buckets(0.01, 2.0, 15))
}),
sync_attempts_total: Family::default(),
sync_successes_total: Family::default(),
sync_failures_total: Family::default(),
protocol_selections_total: Family::default(),
};
registry.register(
"sync_messages_sent",
"Total sync protocol messages sent",
metrics.messages_sent.clone(),
);
registry.register(
"sync_bytes_sent",
"Total sync protocol bytes sent",
metrics.bytes_sent.clone(),
);
registry.register(
"sync_round_trips",
"Total sync round trips",
metrics.round_trips.clone(),
);
registry.register(
"sync_entities_transferred",
"Total entities transferred during sync",
metrics.entities_transferred.clone(),
);
registry.register(
"sync_merges",
"Total CRDT merge operations",
metrics.merges_total.clone(),
);
registry.register(
"sync_comparisons",
"Total entity hash comparisons",
metrics.comparisons_total.clone(),
);
registry.register(
"sync_phase_duration_seconds",
"Duration of sync phases in seconds",
metrics.phase_duration_seconds.clone(),
);
registry.register(
"sync_snapshot_blocked",
"Snapshot attempts blocked on initialized nodes (I5 protection)",
metrics.snapshot_blocked_total.clone(),
);
registry.register(
"sync_verification_failures",
"Snapshot verification failures (I7 violations)",
metrics.verification_failures_total.clone(),
);
registry.register(
"sync_lww_fallback",
"LWW fallback events due to missing CRDT type metadata",
metrics.lww_fallback_total.clone(),
);
registry.register(
"sync_buffer_drops",
"Delta buffer drop events (I6 violation risk)",
metrics.buffer_drops_total.clone(),
);
registry.register(
"sync_duration_seconds",
"Duration of sync sessions in seconds",
metrics.sync_duration_seconds.clone(),
);
registry.register(
"sync_attempts",
"Total sync attempts by protocol",
metrics.sync_attempts_total.clone(),
);
registry.register(
"sync_successes",
"Total successful syncs by protocol",
metrics.sync_successes_total.clone(),
);
registry.register(
"sync_failures",
"Total failed syncs by protocol",
metrics.sync_failures_total.clone(),
);
registry.register(
"sync_protocol_selections",
"Total protocol selection decisions by protocol",
metrics.protocol_selections_total.clone(),
);
metrics
}
}
impl SyncMetricsCollector for PrometheusSyncMetrics {
fn record_message_sent(&self, protocol: &str, bytes: usize) {
let labels = ProtocolLabels {
protocol: sanitize_protocol(protocol).to_string(),
};
self.messages_sent.get_or_create(&labels).inc();
self.bytes_sent.get_or_create(&labels).inc_by(bytes as u64);
}
fn record_round_trip(&self, protocol: &str) {
let labels = ProtocolLabels {
protocol: sanitize_protocol(protocol).to_string(),
};
self.round_trips.get_or_create(&labels).inc();
}
fn record_entities_transferred(&self, count: usize) {
self.entities_transferred.inc_by(count as u64);
}
fn record_merge(&self, crdt_type: &str) {
let labels = CrdtLabels {
crdt_type: sanitize_crdt_type(crdt_type).to_string(),
};
self.merges_total.get_or_create(&labels).inc();
}
fn record_comparison(&self) {
self.comparisons_total.inc();
}
fn record_phase_complete(&self, timer: PhaseTimer) {
let labels = PhaseLabels {
phase: timer.phase().to_string(),
};
self.phase_duration_seconds
.get_or_create(&labels)
.observe(timer.elapsed().as_secs_f64());
}
fn record_snapshot_blocked(&self) {
self.snapshot_blocked_total.inc();
}
fn record_verification_failure(&self) {
self.verification_failures_total.inc();
}
fn record_lww_fallback(&self) {
self.lww_fallback_total.inc();
}
fn record_buffer_drop(&self) {
self.buffer_drops_total.inc();
}
fn record_sync_start(&self, _context_id: &str, protocol: &str, _trigger: &str) {
let labels = ProtocolLabels {
protocol: sanitize_protocol(protocol).to_string(),
};
self.sync_attempts_total.get_or_create(&labels).inc();
}
fn record_sync_complete(
&self,
_context_id: &str,
protocol: &str,
duration: Duration,
_entities: usize,
) {
let sanitized = sanitize_protocol(protocol);
let labels = OutcomeLabels {
protocol: sanitized.to_string(),
outcome: "success".to_string(),
};
self.sync_duration_seconds
.get_or_create(&labels)
.observe(duration.as_secs_f64());
let success_labels = ProtocolLabels {
protocol: sanitized.to_string(),
};
self.sync_successes_total
.get_or_create(&success_labels)
.inc();
}
fn record_sync_failure(&self, _context_id: &str, protocol: &str, _reason: &str) {
let labels = ProtocolLabels {
protocol: sanitize_protocol(protocol).to_string(),
};
self.sync_failures_total.get_or_create(&labels).inc();
}
fn record_protocol_selected(&self, protocol: &str, _reason: &str, _divergence: f64) {
let labels = ProtocolLabels {
protocol: sanitize_protocol(protocol).to_string(),
};
self.protocol_selections_total.get_or_create(&labels).inc();
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Renders `registry` into the text exposition format.
    fn render(registry: &Registry) -> String {
        let mut buffer = String::new();
        prometheus_client::encoding::text::encode(&mut buffer, registry).unwrap();
        buffer
    }

    /// Registering the collector must make its metric names appear in the
    /// encoded output even before anything is recorded.
    #[test]
    fn test_prometheus_metrics_creation() {
        let mut registry = Registry::default();
        let _metrics = PrometheusSyncMetrics::new(&mut registry);

        let buffer = render(&registry);
        assert!(buffer.contains("sync_messages_sent"));
        assert!(buffer.contains("sync_snapshot_blocked"));
        assert!(buffer.contains("sync_buffer_drops"));
    }

    /// The collector must be shareable across threads.
    #[test]
    fn test_prometheus_metrics_is_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<PrometheusSyncMetrics>();
    }

    /// Unlisted names collapse to "unknown"; listed names pass through.
    #[test]
    fn test_label_sanitization() {
        assert_eq!(sanitize_protocol("HashComparison"), "HashComparison");
        assert_eq!(sanitize_protocol("not-a-protocol"), "unknown");
        assert_eq!(sanitize_protocol(""), "unknown");
        assert_eq!(sanitize_crdt_type("GCounter"), "GCounter");
        assert_eq!(sanitize_crdt_type("unknown"), "unknown");
        assert_eq!(sanitize_crdt_type("not-a-crdt"), "unknown");
    }

    /// Exercises every recording method once and checks the registry still
    /// encodes, with the recorded protocol label present in the output.
    #[test]
    fn test_prometheus_metrics_recording() {
        let mut registry = Registry::default();
        let metrics = PrometheusSyncMetrics::new(&mut registry);

        metrics.record_message_sent("HashComparison", 1024);
        metrics.record_round_trip("HashComparison");
        metrics.record_entities_transferred(10);
        metrics.record_merge("GCounter");
        metrics.record_comparison();
        metrics.record_snapshot_blocked();
        metrics.record_verification_failure();
        metrics.record_lww_fallback();
        metrics.record_buffer_drop();

        let timer = metrics.start_phase("test_phase");
        std::thread::sleep(std::time::Duration::from_millis(1));
        metrics.record_phase_complete(timer);

        metrics.record_sync_start("ctx-123", "HashComparison", "timer");
        metrics.record_sync_complete("ctx-123", "HashComparison", Duration::from_millis(100), 50);
        metrics.record_sync_failure("ctx-456", "Snapshot", "timeout");
        metrics.record_protocol_selected("HashComparison", "test", 0.05);

        let buffer = render(&registry);
        assert!(!buffer.is_empty());
        assert!(buffer.contains("HashComparison"));
    }
}