use std::collections::HashMap;
use std::fmt::Write;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use dashmap::DashMap;
/// Entry-count threshold used by `ReplicationMetricsRegistry::for_channel`: once the map would
/// grow past this many entries, previously unseen channel names are served from the shared
/// overflow entry instead of getting an entry of their own.
pub const MAX_TRACKED_CHANNELS: usize = 1024;
/// Map key of the shared overflow entry; it is also what snapshots report as that entry's
/// channel name.
pub const OVERFLOW_CHANNEL_LABEL: &str = "__overflow__";
/// Raw gauge value meaning "no lag sample has been recorded yet".
const LAG_NOT_OBSERVED: u64 = u64::MAX;
/// Largest lag, in microseconds, that a gauge may hold; one below the sentinel so that a
/// recorded value can never be mistaken for "not observed".
const LAG_SATURATED_MICROS: u64 = LAG_NOT_OBSERVED - 1;

/// Atomic metric cells for a single channel.
///
/// The two lag fields are last-write-wins gauges in microseconds that start at
/// `LAG_NOT_OBSERVED`; the remaining fields are counters that start at zero. Every access
/// made by the methods below uses `Ordering::Relaxed`.
#[derive(Debug)]
pub struct ChannelMetricsAtomic {
    /// Last value passed to `record_leader_lag`, in microseconds.
    pub leader_lag_micros: AtomicU64,
    /// Last value passed to `record_replica_lag`, in microseconds.
    pub replica_lag_micros: AtomicU64,
    /// Sum of all amounts passed to `incr_sync_bytes`.
    pub sync_bytes_total: AtomicU64,
    /// Number of `incr_leader_change` calls.
    pub leader_changes_total: AtomicU64,
    /// Number of `incr_under_capacity` calls.
    pub under_capacity_total: AtomicU64,
    /// Number of `incr_skip_ahead` calls.
    pub skip_ahead_total: AtomicU64,
    /// Number of `incr_election_thrash` calls.
    pub election_thrash_total: AtomicU64,
    /// Number of `incr_witness_withdrawal` calls.
    pub witness_withdrawals_total: AtomicU64,
    /// Number of `incr_announce_divergence` calls.
    pub announce_divergence_total: AtomicU64,
}

impl Default for ChannelMetricsAtomic {
    /// Same as [`ChannelMetricsAtomic::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl ChannelMetricsAtomic {
    /// Builds a cell set with both lag gauges unobserved and every counter at zero.
    pub fn new() -> Self {
        let unobserved = || AtomicU64::new(LAG_NOT_OBSERVED);
        let zeroed = || AtomicU64::new(0);
        Self {
            leader_lag_micros: unobserved(),
            replica_lag_micros: unobserved(),
            sync_bytes_total: zeroed(),
            leader_changes_total: zeroed(),
            under_capacity_total: zeroed(),
            skip_ahead_total: zeroed(),
            election_thrash_total: zeroed(),
            witness_withdrawals_total: zeroed(),
            announce_divergence_total: zeroed(),
        }
    }

    /// Overwrites the leader lag gauge with `lag`, saturating at `LAG_SATURATED_MICROS`.
    pub fn record_leader_lag(&self, lag: std::time::Duration) {
        self.leader_lag_micros
            .store(Self::encode_lag(lag), Ordering::Relaxed);
    }

    /// Overwrites the replica lag gauge with `lag`, saturating at `LAG_SATURATED_MICROS`.
    pub fn record_replica_lag(&self, lag: std::time::Duration) {
        self.replica_lag_micros
            .store(Self::encode_lag(lag), Ordering::Relaxed);
    }

    /// Adds `bytes` to the sync-bytes counter.
    pub fn incr_sync_bytes(&self, bytes: u64) {
        self.sync_bytes_total.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Adds one to the leader-change counter.
    pub fn incr_leader_change(&self) {
        Self::bump(&self.leader_changes_total);
    }

    /// Adds one to the under-capacity counter.
    pub fn incr_under_capacity(&self) {
        Self::bump(&self.under_capacity_total);
    }

    /// Adds one to the skip-ahead counter.
    pub fn incr_skip_ahead(&self) {
        Self::bump(&self.skip_ahead_total);
    }

    /// Adds one to the election-thrash counter.
    pub fn incr_election_thrash(&self) {
        Self::bump(&self.election_thrash_total);
    }

    /// Adds one to the witness-withdrawal counter.
    pub fn incr_witness_withdrawal(&self) {
        Self::bump(&self.witness_withdrawals_total);
    }

    /// Adds one to the announce-divergence counter.
    pub fn incr_announce_divergence(&self) {
        Self::bump(&self.announce_divergence_total);
    }

    /// Converts `lag` to whole microseconds for storage in a gauge.
    ///
    /// Durations that do not fit in a `u64`, and the one value that would equal the
    /// not-observed sentinel, are both stored as `LAG_SATURATED_MICROS`.
    fn encode_lag(lag: std::time::Duration) -> u64 {
        u64::try_from(lag.as_micros())
            .map_or(LAG_SATURATED_MICROS, |micros| micros.min(LAG_SATURATED_MICROS))
    }

    /// Increments `counter` by one.
    fn bump(counter: &AtomicU64) {
        counter.fetch_add(1, Ordering::Relaxed);
    }
}
/// Concurrent map from channel name to that channel's shared metric cells.
///
/// The map is capped: once it holds [`MAX_TRACKED_CHANNELS`] entries, names that are not
/// already present resolve to a single shared entry stored under
/// [`OVERFLOW_CHANNEL_LABEL`]. That overflow entry lives in the same map, so [`Self::len`]
/// can report one more than the cap.
pub struct ReplicationMetricsRegistry {
    channels: DashMap<String, Arc<ChannelMetricsAtomic>>,
}

impl Default for ReplicationMetricsRegistry {
    /// Same as [`ReplicationMetricsRegistry::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl ReplicationMetricsRegistry {
    /// Creates an empty registry.
    pub fn new() -> Self {
        Self {
            channels: DashMap::new(),
        }
    }

    /// Returns the metric cells for `channel`, creating them on first use.
    ///
    /// A name that is already in the map always gets its existing cells back. A new name
    /// gets its own entry only while the map is below the cap; otherwise the shared overflow
    /// cells are returned.
    ///
    /// Two rules keep the cap enforcement from disturbing other callers:
    ///
    /// * When the map is already full, nothing is inserted for the new name, so the
    ///   steady-state overflow path does not allocate an entry only to remove it again.
    /// * Only the call that actually inserted an entry may evict it. A caller that merely
    ///   found an entry inserted by another thread returns it untouched.
    ///
    /// NOTE: eviction by the inserting call can still race with a concurrent lookup of the
    /// same brand-new name while the map crosses the cap; that lookup may receive cells that
    /// are evicted a moment later.
    pub fn for_channel(&self, channel: &str) -> Arc<ChannelMetricsAtomic> {
        if let Some(existing) = self.lookup(channel) {
            return existing;
        }

        if self.channels.len() >= MAX_TRACKED_CHANNELS {
            // Another thread may have inserted `channel` between the lookup above and the
            // size check; prefer that entry over the overflow cells.
            return match self.lookup(channel) {
                Some(raced) => raced,
                None => self.overflow_bucket(),
            };
        }

        let mut inserted_here = false;
        let metrics = self
            .channels
            .entry(channel.to_string())
            .or_insert_with(|| {
                inserted_here = true;
                Arc::new(ChannelMetricsAtomic::new())
            })
            .clone();

        // Several threads can pass the size check at once; the post-insert check lets each
        // of them undo its own insertion if together they pushed the map over the cap.
        if inserted_here && self.channels.len() > MAX_TRACKED_CHANNELS {
            self.channels.remove(channel);
            return self.overflow_bucket();
        }
        metrics
    }

    /// Copies every entry's current values into a snapshot sorted by channel name.
    pub fn snapshot(&self) -> ReplicationMetricsSnapshot {
        let mut channels = Vec::with_capacity(self.channels.len());
        for entry in self.channels.iter() {
            let m = entry.value();
            channels.push(ChannelMetrics {
                channel: entry.key().clone(),
                leader_lag_seconds: load_lag(&m.leader_lag_micros),
                replica_lag_seconds: load_lag(&m.replica_lag_micros),
                sync_bytes_total: m.sync_bytes_total.load(Ordering::Relaxed),
                leader_changes_total: m.leader_changes_total.load(Ordering::Relaxed),
                under_capacity_total: m.under_capacity_total.load(Ordering::Relaxed),
                skip_ahead_total: m.skip_ahead_total.load(Ordering::Relaxed),
                election_thrash_total: m.election_thrash_total.load(Ordering::Relaxed),
                witness_withdrawals_total: m.witness_withdrawals_total.load(Ordering::Relaxed),
                announce_divergence_total: m.announce_divergence_total.load(Ordering::Relaxed),
            });
        }
        channels.sort_by(|a, b| a.channel.cmp(&b.channel));
        ReplicationMetricsSnapshot { channels }
    }

    /// Reports whether `channel` currently has an entry in the map (test helper).
    #[cfg(test)]
    pub fn contains(&self, channel: &str) -> bool {
        self.channels.contains_key(channel)
    }

    /// Number of entries in the map, the overflow entry included once it exists.
    pub fn len(&self) -> usize {
        self.channels.len()
    }

    /// Returns `true` when the map has no entries.
    pub fn is_empty(&self) -> bool {
        self.channels.is_empty()
    }

    /// Returns a clone of the cells stored under `channel`, if any, without inserting.
    fn lookup(&self, channel: &str) -> Option<Arc<ChannelMetricsAtomic>> {
        self.channels.get(channel).map(|slot| slot.clone())
    }

    /// Returns the shared overflow cells, creating the entry on first use.
    fn overflow_bucket(&self) -> Arc<ChannelMetricsAtomic> {
        if let Some(bucket) = self.lookup(OVERFLOW_CHANNEL_LABEL) {
            return bucket;
        }
        let bucket = self
            .channels
            .entry(OVERFLOW_CHANNEL_LABEL.to_string())
            .or_insert_with(|| Arc::new(ChannelMetricsAtomic::new()))
            .clone();
        bucket
    }
}
/// Plain-value copy of one channel's metrics, as produced by
/// `ReplicationMetricsRegistry::snapshot`.
#[derive(Debug, Clone)]
pub struct ChannelMetrics {
/// Registry key of the entry; rendered as the `channel` label by `prometheus_text`.
pub channel: String,
/// Leader lag in seconds; `None` while the gauge still holds the not-observed sentinel.
pub leader_lag_seconds: Option<f64>,
/// Replica lag in seconds; `None` while the gauge still holds the not-observed sentinel.
pub replica_lag_seconds: Option<f64>,
/// Exported as `dataforts_replication_sync_bytes_total`.
pub sync_bytes_total: u64,
/// Exported as `dataforts_leader_changes_total`.
pub leader_changes_total: u64,
/// Exported as `dataforts_replication_under_capacity_total`.
pub under_capacity_total: u64,
/// Exported as `dataforts_replication_skip_ahead_total`.
pub skip_ahead_total: u64,
/// Exported as `dataforts_replication_election_thrash_total`.
pub election_thrash_total: u64,
/// Exported as `dataforts_replication_witness_withdrawals_total`.
pub witness_withdrawals_total: u64,
/// Exported as `dataforts_replication_announce_divergence_total`.
pub announce_divergence_total: u64,
}
/// Point-in-time copy of the metrics of every channel in a registry.
#[derive(Debug, Clone, Default)]
pub struct ReplicationMetricsSnapshot {
    /// One row per channel, in the order the producer left them.
    pub channels: Vec<ChannelMetrics>,
}

impl ReplicationMetricsSnapshot {
    /// Renders the snapshot as Prometheus text: the lag gauge family first (one sample per
    /// channel and role that has a value), then each counter family from
    /// `COUNTER_DESCRIPTORS` with one sample per channel.
    pub fn prometheus_text(&self) -> String {
        let mut text = String::with_capacity(2048);

        // Escape every channel name once; each label is reused for all metric families.
        let labelled: Vec<(&ChannelMetrics, String)> = self
            .channels
            .iter()
            .map(|row| (row, escape_label(&row.channel)))
            .collect();

        text.push_str(
            "# HELP dataforts_replication_lag_seconds Replica's tail_seq lag behind leader, per role.\n",
        );
        text.push_str("# TYPE dataforts_replication_lag_seconds gauge\n");
        for (row, label) in &labelled {
            // Writing into a `String` cannot fail, so the `fmt::Result` is discarded.
            if let Some(secs) = row.leader_lag_seconds {
                let _ = writeln!(
                    text,
                    "dataforts_replication_lag_seconds{{channel=\"{}\",role=\"leader\"}} {}",
                    label,
                    format_seconds(secs),
                );
            }
            if let Some(secs) = row.replica_lag_seconds {
                let _ = writeln!(
                    text,
                    "dataforts_replication_lag_seconds{{channel=\"{}\",role=\"replica\"}} {}",
                    label,
                    format_seconds(secs),
                );
            }
        }

        for &(help, name, getter) in COUNTER_DESCRIPTORS {
            let _ = writeln!(text, "# HELP {} {}", name, help);
            let _ = writeln!(text, "# TYPE {} counter", name);
            for (row, label) in &labelled {
                let _ = writeln!(text, "{}{{channel=\"{}\"}} {}", name, label, getter(row));
            }
        }
        text
    }

    /// Returns the first row whose channel name equals `channel`, if there is one.
    pub fn channel(&self, channel: &str) -> Option<&ChannelMetrics> {
        self.channels
            .iter()
            .find(|row| row.channel.as_str() == channel)
    }

    /// Sums each counter over all rows, keyed by the counter's field name.
    ///
    /// The map has no keys at all when the snapshot has no rows.
    pub fn totals(&self) -> HashMap<&'static str, u64> {
        let mut sums: HashMap<&'static str, u64> = HashMap::new();
        for row in &self.channels {
            let cells: [(&'static str, u64); 7] = [
                ("sync_bytes_total", row.sync_bytes_total),
                ("leader_changes_total", row.leader_changes_total),
                ("under_capacity_total", row.under_capacity_total),
                ("skip_ahead_total", row.skip_ahead_total),
                ("election_thrash_total", row.election_thrash_total),
                ("witness_withdrawals_total", row.witness_withdrawals_total),
                ("announce_divergence_total", row.announce_divergence_total),
            ];
            for (key, value) in cells {
                *sums.entry(key).or_insert(0) += value;
            }
        }
        sums
    }
}
/// Reads one counter value out of a snapshot row.
type CounterGetter = fn(&ChannelMetrics) -> u64;
/// Counter families rendered by `prometheus_text`, in output order.
///
/// Each tuple is `(HELP text, metric name, getter)`.
const COUNTER_DESCRIPTORS: &[(&str, &str, CounterGetter)] = &[
(
"Cumulative bytes shipped via SYNC_RESPONSE.",
"dataforts_replication_sync_bytes_total",
|c| c.sync_bytes_total,
),
(
"Number of leader elections completed.",
"dataforts_leader_changes_total",
|c| c.leader_changes_total,
),
(
"Times the channel hit UnderCapacity policy.",
"dataforts_replication_under_capacity_total",
|c| c.under_capacity_total,
),
(
"Times a replica skipped instead of replaying a large gap.",
"dataforts_replication_skip_ahead_total",
|c| c.skip_ahead_total,
),
(
"Elections triggered within 30 s of the previous one (saturation indicator).",
"dataforts_replication_election_thrash_total",
|c| c.election_thrash_total,
),
(
"Times a peer replica issued a witness Mesh::withdraw_chain for a deposed leader's tag.",
"dataforts_replication_witness_withdrawals_total",
|c| c.witness_withdrawals_total,
),
(
// The trailing backslash joins the two source lines into a single-line HELP string.
"Times a coordinator transitioned to Idle but its mesh-side withdraw_chain call failed; \
non-zero values mean the mesh may still advertise this node as a chain holder.",
"dataforts_replication_announce_divergence_total",
|c| c.announce_divergence_total,
),
];
/// Reads a lag gauge and converts it from microseconds to seconds.
///
/// Returns `None` when the gauge still holds `LAG_NOT_OBSERVED`.
fn load_lag(atomic: &AtomicU64) -> Option<f64> {
    match atomic.load(Ordering::Relaxed) {
        LAG_NOT_OBSERVED => None,
        micros => Some(micros as f64 / 1_000_000.0),
    }
}
/// Formats `secs` with at most six decimal places, dropping trailing zeros and a dangling
/// decimal point (`2.0` becomes `"2"`, `0.25` stays `"0.25"`).
///
/// Renderings without a decimal point, such as `NaN` or `inf`, are returned untouched.
fn format_seconds(secs: f64) -> String {
    let rendered = format!("{:.6}", secs);
    if !rendered.contains('.') {
        return rendered;
    }
    // Zeros are stripped first and stop at the decimal point, so integer digits survive.
    rendered
        .trim_end_matches('0')
        .trim_end_matches('.')
        .to_string()
}
/// Escapes `s` for use inside a double-quoted Prometheus label value.
///
/// The text exposition format defines exactly three escapes for label values: `\\` for a
/// backslash, `\"` for a double quote and `\n` for a line feed. Every other character is
/// copied through unchanged. That includes carriage return: `\r` is not a defined escape
/// sequence, so emitting it would make strict parsers reject the sample and lenient ones
/// decode the label as a literal backslash followed by `r`.
fn escape_label(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '\\' => out.push_str("\\\\"),
            '"' => out.push_str("\\\""),
            '\n' => out.push_str("\\n"),
            other => out.push(other),
        }
    }
    out
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    // A new registry has no entries.
    #[test]
    fn registry_starts_empty() {
        let reg = ReplicationMetricsRegistry::new();
        assert!(reg.is_empty());
        assert_eq!(reg.len(), 0);
    }

    // Looking the same name up twice yields the same shared cells and a single entry.
    #[test]
    fn for_channel_is_idempotent() {
        let reg = ReplicationMetricsRegistry::new();
        let a = reg.for_channel("payments");
        let b = reg.for_channel("payments");
        assert!(Arc::ptr_eq(&a, &b), "same channel must return same Arc");
        assert_eq!(reg.len(), 1);
    }

    // Increments on one channel do not leak into another.
    #[test]
    fn distinct_channels_get_distinct_counters() {
        let reg = ReplicationMetricsRegistry::new();
        let a = reg.for_channel("payments");
        let b = reg.for_channel("refunds");
        a.incr_leader_change();
        b.incr_leader_change();
        b.incr_leader_change();
        let snap = reg.snapshot();
        let p = snap.channel("payments").expect("payments row");
        let r = snap.channel("refunds").expect("refunds row");
        assert_eq!(p.leader_changes_total, 1);
        assert_eq!(r.leader_changes_total, 2);
    }

    // Every incr_* method feeds exactly its own counter.
    #[test]
    fn counters_increment_in_isolation() {
        let reg = ReplicationMetricsRegistry::new();
        let m = reg.for_channel("c1");
        m.incr_sync_bytes(1024);
        m.incr_sync_bytes(512);
        m.incr_leader_change();
        m.incr_under_capacity();
        m.incr_skip_ahead();
        m.incr_election_thrash();
        m.incr_election_thrash();
        m.incr_witness_withdrawal();
        m.incr_announce_divergence();
        let snap = reg.snapshot();
        let c = snap.channel("c1").expect("c1 row");
        assert_eq!(c.sync_bytes_total, 1536);
        assert_eq!(c.leader_changes_total, 1);
        assert_eq!(c.under_capacity_total, 1);
        assert_eq!(c.skip_ahead_total, 1);
        assert_eq!(c.election_thrash_total, 2);
        assert_eq!(c.witness_withdrawals_total, 1);
        assert_eq!(c.announce_divergence_total, 1);
    }

    // Lag gauges read as None until recorded, then report seconds.
    #[test]
    fn lag_starts_unobserved_then_records() {
        let reg = ReplicationMetricsRegistry::new();
        let m = reg.for_channel("c1");
        let snap = reg.snapshot();
        let c = snap.channel("c1").unwrap();
        assert_eq!(c.leader_lag_seconds, None);
        assert_eq!(c.replica_lag_seconds, None);
        m.record_leader_lag(Duration::from_millis(250));
        m.record_replica_lag(Duration::from_micros(500));
        let snap = reg.snapshot();
        let c = snap.channel("c1").unwrap();
        assert!(
            (c.leader_lag_seconds.unwrap() - 0.25).abs() < 1e-9,
            "leader lag should be 0.25s; got {:?}",
            c.leader_lag_seconds
        );
        assert!(
            (c.replica_lag_seconds.unwrap() - 0.0005).abs() < 1e-9,
            "replica lag should be 0.0005s; got {:?}",
            c.replica_lag_seconds
        );
    }

    // Snapshot rows come back ordered by channel name regardless of insertion order.
    #[test]
    fn snapshot_sorts_by_channel_name() {
        let reg = ReplicationMetricsRegistry::new();
        for name in ["zebra", "alpha", "delta", "bravo"] {
            reg.for_channel(name);
        }
        let snap = reg.snapshot();
        let names: Vec<&str> = snap.channels.iter().map(|c| c.channel.as_str()).collect();
        assert_eq!(names, vec!["alpha", "bravo", "delta", "zebra"]);
    }

    // Names first seen after the cap share the overflow entry; earlier entries are untouched.
    #[test]
    fn overflow_folds_past_cap() {
        let reg = ReplicationMetricsRegistry::new();
        for i in 0..MAX_TRACKED_CHANNELS {
            reg.for_channel(&format!("channel-{i}"));
        }
        assert_eq!(reg.len(), MAX_TRACKED_CHANNELS);
        let overflow = reg.for_channel("late-comer");
        overflow.incr_leader_change();
        let overflow2 = reg.for_channel("another-late-comer");
        overflow2.incr_leader_change();
        assert!(reg.contains(OVERFLOW_CHANNEL_LABEL));
        let snap = reg.snapshot();
        let bucket = snap.channel(OVERFLOW_CHANNEL_LABEL).expect("overflow row");
        assert_eq!(bucket.leader_changes_total, 2);
        let original = snap.channel("channel-0").expect("channel-0 row");
        assert_eq!(original.leader_changes_total, 0);
    }

    // Threads racing for the last free slot must leave at most cap + 1 entries behind
    // (the cap plus the overflow entry).
    #[test]
    fn concurrent_inserts_at_cap_converge_within_one_extra_slot() {
        use std::sync::Arc as StdArc;
        use std::sync::Barrier;
        use std::thread;
        let reg = StdArc::new(ReplicationMetricsRegistry::new());
        for i in 0..MAX_TRACKED_CHANNELS - 1 {
            reg.for_channel(&format!("seed-{i}"));
        }
        let racers = 8;
        let barrier = StdArc::new(Barrier::new(racers));
        let mut handles = Vec::with_capacity(racers);
        for t in 0..racers {
            let reg = StdArc::clone(&reg);
            let barrier = StdArc::clone(&barrier);
            handles.push(thread::spawn(move || {
                // Release all racers together to maximise contention on the last slot.
                barrier.wait();
                reg.for_channel(&format!("racer-{t}"));
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        assert!(
            reg.len() <= MAX_TRACKED_CHANNELS + 1,
            "racers blew the cap: len = {}, expected <= {}",
            reg.len(),
            MAX_TRACKED_CHANNELS + 1,
        );
    }

    // A name registered before the cap keeps resolving to its own entry at the cap.
    #[test]
    fn previously_seen_channel_skips_overflow_after_cap() {
        let reg = ReplicationMetricsRegistry::new();
        reg.for_channel("known").incr_leader_change();
        for i in 0..MAX_TRACKED_CHANNELS - 1 {
            reg.for_channel(&format!("channel-{i}"));
        }
        let known = reg.for_channel("known");
        known.incr_leader_change();
        let snap = reg.snapshot();
        let c = snap.channel("known").expect("known row");
        assert_eq!(c.leader_changes_total, 2);
        assert!(snap.channel(OVERFLOW_CHANNEL_LABEL).is_none());
    }

    // Every metric family appears, with one HELP and one TYPE line each.
    #[test]
    fn prometheus_text_emits_every_metric() {
        let reg = ReplicationMetricsRegistry::new();
        let m = reg.for_channel("payments/settlements");
        m.incr_sync_bytes(2048);
        m.incr_leader_change();
        m.incr_leader_change();
        m.incr_under_capacity();
        m.incr_skip_ahead();
        m.incr_election_thrash();
        m.incr_witness_withdrawal();
        m.record_leader_lag(Duration::from_millis(125));
        m.record_replica_lag(Duration::from_secs(2));
        let text = reg.snapshot().prometheus_text();
        for name in [
            "dataforts_replication_lag_seconds",
            "dataforts_replication_sync_bytes_total",
            "dataforts_leader_changes_total",
            "dataforts_replication_under_capacity_total",
            "dataforts_replication_skip_ahead_total",
            "dataforts_replication_election_thrash_total",
            "dataforts_replication_witness_withdrawals_total",
            "dataforts_replication_announce_divergence_total",
        ] {
            assert!(
                text.contains(name),
                "metric {name} missing from emission: {text}"
            );
        }
        assert!(text.contains("channel=\"payments/settlements\""));
        assert!(text.contains(
            "dataforts_replication_sync_bytes_total{channel=\"payments/settlements\"} 2048"
        ));
        assert!(text.contains("dataforts_leader_changes_total{channel=\"payments/settlements\"} 2"));
        assert!(text.contains("role=\"leader\""));
        assert!(text.contains("role=\"replica\""));
        let help_lines = text.matches("# HELP ").count();
        let type_lines = text.matches("# TYPE ").count();
        assert_eq!(help_lines, 8, "expected 8 HELP lines, got {help_lines}");
        assert_eq!(type_lines, 8, "expected 8 TYPE lines, got {type_lines}");
    }

    // A lag role that was never recorded produces no sample line.
    #[test]
    fn prometheus_text_omits_unobserved_lag_roles() {
        let reg = ReplicationMetricsRegistry::new();
        let m = reg.for_channel("c1");
        m.record_leader_lag(Duration::from_millis(10));
        let text = reg.snapshot().prometheus_text();
        assert!(text.contains("role=\"leader\""));
        assert!(
            !text.contains("role=\"replica\""),
            "unobserved replica lag must not emit: {text}",
        );
    }

    // Quotes and backslashes in a channel name are escaped in the label value.
    #[test]
    fn prometheus_text_escapes_label_quotes_and_backslashes() {
        let reg = ReplicationMetricsRegistry::new();
        reg.for_channel(r#"weird/name"with"quotes\and\slashes"#)
            .incr_leader_change();
        let text = reg.snapshot().prometheus_text();
        assert!(
            text.contains(r#"channel=\"weird/name\\\"with\\\"quotes\\\\and\\\\slashes\""#)
                || text.contains(r#"channel="weird/name\"with\"quotes\\and\\slashes""#)
        );
    }

    // totals() sums each counter across all rows.
    #[test]
    fn totals_aggregates_across_channels() {
        let reg = ReplicationMetricsRegistry::new();
        let a = reg.for_channel("a");
        let b = reg.for_channel("b");
        a.incr_sync_bytes(100);
        b.incr_sync_bytes(200);
        a.incr_leader_change();
        b.incr_leader_change();
        b.incr_leader_change();
        let snap = reg.snapshot();
        let totals = snap.totals();
        assert_eq!(totals.get("sync_bytes_total").copied(), Some(300));
        assert_eq!(totals.get("leader_changes_total").copied(), Some(3));
    }

    // A duration too large for the gauge is still reported as an observed value.
    #[test]
    fn lag_record_saturating_on_oversize_duration() {
        let reg = ReplicationMetricsRegistry::new();
        let m = reg.for_channel("c1");
        m.record_leader_lag(Duration::MAX);
        let snap = reg.snapshot();
        let c = snap.channel("c1").unwrap();
        assert!(
            c.leader_lag_seconds.is_some(),
            "lag must be Some even at saturation"
        );
    }

    // The saturation value sits directly below the sentinel and is what gets stored.
    #[test]
    fn lag_saturated_constant_does_not_collide_with_sentinel() {
        assert_ne!(LAG_SATURATED_MICROS, LAG_NOT_OBSERVED);
        assert_eq!(
            LAG_NOT_OBSERVED - LAG_SATURATED_MICROS,
            1,
            "saturation value must be exactly one below the sentinel"
        );
        let reg = ReplicationMetricsRegistry::new();
        let m = reg.for_channel("sentinel-gap");
        m.record_leader_lag(Duration::MAX);
        let raw = m
            .leader_lag_micros
            .load(std::sync::atomic::Ordering::Relaxed);
        assert_eq!(raw, LAG_SATURATED_MICROS);
        assert_ne!(raw, LAG_NOT_OBSERVED);
    }
}