use std::fmt::Write;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use dashmap::DashMap;
/// Maximum number of entries the registry creates for distinct channel names.
///
/// Once `GreedyMetricsRegistry::for_channel` sees the map holding at least this
/// many entries, any channel name not already present is redirected to the
/// shared [`OVERFLOW_CHANNEL_LABEL`] entry instead of getting its own.
/// NOTE(review): presumably this bounds `channel` label cardinality in the
/// Prometheus output — confirm intent.
pub const MAX_TRACKED_CHANNELS: usize = 1024;
/// Channel name under which metrics for channels beyond
/// [`MAX_TRACKED_CHANNELS`] are aggregated.
pub const OVERFLOW_CHANNEL_LABEL: &str = "__overflow__";
/// Lock-free per-channel greedy-cache counters.
///
/// Every mutator below uses `Ordering::Relaxed`, so each field is updated
/// independently of the others.
#[derive(Default, Debug)]
pub struct GreedyChannelMetricsAtomic {
    /// Monotonic count advanced by [`Self::incr_cache_hit`].
    pub cache_hits_total: AtomicU64,
    /// Monotonic count advanced by [`Self::incr_serve`].
    pub serve_count_total: AtomicU64,
    /// Monotonic count advanced by [`Self::incr_eviction`].
    pub evictions_total: AtomicU64,
    /// Gauge holding the most recent value passed to [`Self::set_bytes_resident`].
    pub bytes_resident: AtomicU64,
}

impl GreedyChannelMetricsAtomic {
    /// Creates a metrics block with every field at zero.
    pub fn new() -> Self {
        Default::default()
    }

    /// Adds one to `counter`; shared body of the `incr_*` methods.
    fn bump(counter: &AtomicU64) {
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Records one cache hit.
    pub fn incr_cache_hit(&self) {
        Self::bump(&self.cache_hits_total);
    }

    /// Records one serve.
    pub fn incr_serve(&self) {
        Self::bump(&self.serve_count_total);
    }

    /// Records one eviction.
    pub fn incr_eviction(&self) {
        Self::bump(&self.evictions_total);
    }

    /// Overwrites the resident-bytes gauge with `bytes`.
    pub fn set_bytes_resident(&self, bytes: u64) {
        self.bytes_resident.store(bytes, Ordering::Relaxed)
    }
}
/// Lock-free cluster-wide greedy-cache counters and gauges.
///
/// Every mutator below uses `Ordering::Relaxed`, so each field is updated
/// independently of the others.
#[derive(Default, Debug)]
pub struct GreedyClusterMetricsAtomic {
    /// Rejections recorded for [`AdmitRejectReason::Scope`].
    pub admit_rejected_scope_total: AtomicU64,
    /// Rejections recorded for [`AdmitRejectReason::Intent`].
    pub admit_rejected_intent_total: AtomicU64,
    /// Rejections recorded for [`AdmitRejectReason::Colocation`].
    pub admit_rejected_colocation_total: AtomicU64,
    /// Rejections recorded for [`AdmitRejectReason::Capacity`].
    pub admit_rejected_capacity_total: AtomicU64,
    /// Rejections recorded for [`AdmitRejectReason::Bandwidth`].
    pub admit_rejected_bandwidth_total: AtomicU64,
    /// Gauge holding the last value passed to [`Self::set_io_budget_used_bytes`].
    pub io_budget_used_bytes: AtomicU64,
    /// Count advanced by [`Self::incr_observer_dropped_overloaded`].
    pub observer_dropped_overloaded_total: AtomicU64,
    /// Count advanced by [`Self::incr_gravity_heat_unattributed`].
    pub gravity_heat_unattributed_total: AtomicU64,
    /// Count advanced by [`Self::incr_blob_pull_admitted`].
    pub blob_pulls_admitted_total: AtomicU64,
    /// Blob-pull rejections with reason `PullBlobReject::NoStorageCap`.
    pub blob_pulls_rejected_no_storage_total: AtomicU64,
    /// Blob-pull rejections with reason `PullBlobReject::GreedyDisabled`.
    pub blob_pulls_rejected_greedy_disabled_total: AtomicU64,
    /// Blob-pull rejections with reason `PullBlobReject::ProximityZero`.
    pub blob_pulls_rejected_proximity_zero_total: AtomicU64,
    /// Blob-pull rejections with reason `PullBlobReject::Unhealthy`.
    pub blob_pulls_rejected_unhealthy_total: AtomicU64,
    /// Blob-pull rejections with reason `PullBlobReject::ScopeMismatch`.
    pub blob_pulls_rejected_scope_mismatch_total: AtomicU64,
    /// Count advanced by [`Self::incr_blob_prefetch_ok`].
    pub blob_prefetches_ok_total: AtomicU64,
    /// Count advanced by [`Self::incr_blob_prefetch_err`].
    pub blob_prefetches_err_total: AtomicU64,
}

impl GreedyClusterMetricsAtomic {
    /// Creates a block with every counter and gauge at zero.
    pub fn new() -> Self {
        Default::default()
    }

    /// Adds one to `counter`; shared body of the `incr_*` methods.
    fn bump(counter: &AtomicU64) {
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Records one admission rejection under the counter matching `reason`.
    pub fn incr_admit_rejected(&self, reason: AdmitRejectReason) {
        let counter = match reason {
            AdmitRejectReason::Scope => &self.admit_rejected_scope_total,
            AdmitRejectReason::Intent => &self.admit_rejected_intent_total,
            AdmitRejectReason::Colocation => &self.admit_rejected_colocation_total,
            AdmitRejectReason::Capacity => &self.admit_rejected_capacity_total,
            AdmitRejectReason::Bandwidth => &self.admit_rejected_bandwidth_total,
        };
        Self::bump(counter);
    }

    /// Overwrites the I/O-budget gauge with `bytes`.
    pub fn set_io_budget_used_bytes(&self, bytes: u64) {
        self.io_budget_used_bytes.store(bytes, Ordering::Relaxed)
    }

    /// Records one event dropped for the `overloaded` reason.
    pub fn incr_observer_dropped_overloaded(&self) {
        Self::bump(&self.observer_dropped_overloaded_total);
    }

    /// Records one unattributed gravity-heat bump.
    pub fn incr_gravity_heat_unattributed(&self) {
        Self::bump(&self.gravity_heat_unattributed_total);
    }

    /// Records one admitted blob pull.
    pub fn incr_blob_pull_admitted(&self) {
        Self::bump(&self.blob_pulls_admitted_total);
    }

    /// Records one rejected blob pull under the counter matching `reason`.
    pub fn incr_blob_pull_rejected(
        &self,
        reason: crate::adapter::net::dataforts::blob::PullBlobReject,
    ) {
        use crate::adapter::net::dataforts::blob::PullBlobReject as Reject;
        let counter = match reason {
            Reject::NoStorageCap => &self.blob_pulls_rejected_no_storage_total,
            Reject::GreedyDisabled => &self.blob_pulls_rejected_greedy_disabled_total,
            Reject::ProximityZero => &self.blob_pulls_rejected_proximity_zero_total,
            Reject::Unhealthy => &self.blob_pulls_rejected_unhealthy_total,
            Reject::ScopeMismatch => &self.blob_pulls_rejected_scope_mismatch_total,
        };
        Self::bump(counter);
    }

    /// Records one successful blob prefetch.
    pub fn incr_blob_prefetch_ok(&self) {
        Self::bump(&self.blob_prefetches_ok_total);
    }

    /// Records one failed blob prefetch.
    pub fn incr_blob_prefetch_err(&self) {
        Self::bump(&self.blob_prefetches_err_total);
    }
}
/// Axis on which a greedy-cache admission was rejected.
///
/// Selects which counter `GreedyClusterMetricsAtomic::incr_admit_rejected`
/// advances.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AdmitRejectReason {
    /// Exported with label `reason="scope"`.
    Scope,
    /// Exported with label `reason="intent"`.
    Intent,
    /// Exported with label `reason="colocation"`.
    Colocation,
    /// Exported with label `reason="capacity"`.
    Capacity,
    /// Exported with label `reason="bandwidth"`.
    Bandwidth,
}
#[derive(Debug)]
pub struct GreedyMetricsRegistry {
channels: DashMap<String, Arc<GreedyChannelMetricsAtomic>>,
cluster: Arc<GreedyClusterMetricsAtomic>,
}
impl Default for GreedyMetricsRegistry {
fn default() -> Self {
Self::new()
}
}
impl GreedyMetricsRegistry {
pub fn new() -> Self {
Self {
channels: DashMap::new(),
cluster: Arc::new(GreedyClusterMetricsAtomic::new()),
}
}
pub fn for_channel(&self, channel: &str) -> Arc<GreedyChannelMetricsAtomic> {
if let Some(m) = self.channels.get(channel) {
return m.clone();
}
if self.channels.len() >= MAX_TRACKED_CHANNELS && !self.channels.contains_key(channel) {
return self
.channels
.entry(OVERFLOW_CHANNEL_LABEL.to_string())
.or_insert_with(|| Arc::new(GreedyChannelMetricsAtomic::new()))
.clone();
}
self.channels
.entry(channel.to_string())
.or_insert_with(|| Arc::new(GreedyChannelMetricsAtomic::new()))
.clone()
}
pub fn cluster(&self) -> Arc<GreedyClusterMetricsAtomic> {
self.cluster.clone()
}
pub fn len(&self) -> usize {
self.channels.len()
}
pub fn is_empty(&self) -> bool {
self.channels.is_empty()
}
pub fn snapshot(&self) -> GreedyMetricsSnapshot {
let mut channels = Vec::with_capacity(self.channels.len());
for entry in self.channels.iter() {
let m = entry.value();
channels.push(GreedyChannelMetrics {
channel: entry.key().clone(),
cache_hits_total: m.cache_hits_total.load(Ordering::Relaxed),
serve_count_total: m.serve_count_total.load(Ordering::Relaxed),
evictions_total: m.evictions_total.load(Ordering::Relaxed),
bytes_resident: m.bytes_resident.load(Ordering::Relaxed),
});
}
channels.sort_by(|a, b| a.channel.cmp(&b.channel));
GreedyMetricsSnapshot {
channels,
cluster: GreedyClusterMetrics {
admit_rejected_scope_total: self
.cluster
.admit_rejected_scope_total
.load(Ordering::Relaxed),
admit_rejected_intent_total: self
.cluster
.admit_rejected_intent_total
.load(Ordering::Relaxed),
admit_rejected_colocation_total: self
.cluster
.admit_rejected_colocation_total
.load(Ordering::Relaxed),
admit_rejected_capacity_total: self
.cluster
.admit_rejected_capacity_total
.load(Ordering::Relaxed),
admit_rejected_bandwidth_total: self
.cluster
.admit_rejected_bandwidth_total
.load(Ordering::Relaxed),
io_budget_used_bytes: self.cluster.io_budget_used_bytes.load(Ordering::Relaxed),
observer_dropped_overloaded_total: self
.cluster
.observer_dropped_overloaded_total
.load(Ordering::Relaxed),
gravity_heat_unattributed_total: self
.cluster
.gravity_heat_unattributed_total
.load(Ordering::Relaxed),
blob_pulls_admitted_total: self
.cluster
.blob_pulls_admitted_total
.load(Ordering::Relaxed),
blob_pulls_rejected_no_storage_total: self
.cluster
.blob_pulls_rejected_no_storage_total
.load(Ordering::Relaxed),
blob_pulls_rejected_greedy_disabled_total: self
.cluster
.blob_pulls_rejected_greedy_disabled_total
.load(Ordering::Relaxed),
blob_pulls_rejected_proximity_zero_total: self
.cluster
.blob_pulls_rejected_proximity_zero_total
.load(Ordering::Relaxed),
blob_pulls_rejected_unhealthy_total: self
.cluster
.blob_pulls_rejected_unhealthy_total
.load(Ordering::Relaxed),
blob_pulls_rejected_scope_mismatch_total: self
.cluster
.blob_pulls_rejected_scope_mismatch_total
.load(Ordering::Relaxed),
blob_prefetches_ok_total: self
.cluster
.blob_prefetches_ok_total
.load(Ordering::Relaxed),
blob_prefetches_err_total: self
.cluster
.blob_prefetches_err_total
.load(Ordering::Relaxed),
},
}
}
}
/// Plain-value copy of one channel's counters, as produced by
/// `GreedyMetricsRegistry::snapshot`.
#[derive(Clone, Debug)]
pub struct GreedyChannelMetrics {
    /// Channel name (or the overflow label); rendered as the `channel` label.
    pub channel: String,
    /// Snapshot of `GreedyChannelMetricsAtomic::cache_hits_total`.
    pub cache_hits_total: u64,
    /// Snapshot of `GreedyChannelMetricsAtomic::serve_count_total`.
    pub serve_count_total: u64,
    /// Snapshot of `GreedyChannelMetricsAtomic::evictions_total`.
    pub evictions_total: u64,
    /// Snapshot of the `GreedyChannelMetricsAtomic::bytes_resident` gauge.
    pub bytes_resident: u64,
}
/// Plain-value copy of the cluster-wide counters, as produced by
/// `GreedyMetricsRegistry::snapshot`. Each field mirrors the same-named
/// field of `GreedyClusterMetricsAtomic`.
#[derive(Clone, Debug, Default)]
pub struct GreedyClusterMetrics {
    // Admission rejections, one per `AdmitRejectReason` variant.
    pub admit_rejected_scope_total: u64,
    pub admit_rejected_intent_total: u64,
    pub admit_rejected_colocation_total: u64,
    pub admit_rejected_capacity_total: u64,
    pub admit_rejected_bandwidth_total: u64,
    /// Gauge value (not a counter).
    pub io_budget_used_bytes: u64,
    pub observer_dropped_overloaded_total: u64,
    pub gravity_heat_unattributed_total: u64,
    // Blob-pull verdicts: admitted, then one rejection counter per reason.
    pub blob_pulls_admitted_total: u64,
    pub blob_pulls_rejected_no_storage_total: u64,
    pub blob_pulls_rejected_greedy_disabled_total: u64,
    pub blob_pulls_rejected_proximity_zero_total: u64,
    pub blob_pulls_rejected_unhealthy_total: u64,
    pub blob_pulls_rejected_scope_mismatch_total: u64,
    // Blob prefetch outcomes.
    pub blob_prefetches_ok_total: u64,
    pub blob_prefetches_err_total: u64,
}
/// Point-in-time, plain-value copy of a `GreedyMetricsRegistry`.
#[derive(Clone, Debug, Default)]
pub struct GreedyMetricsSnapshot {
    /// Per-channel values (sorted by name when built by `GreedyMetricsRegistry::snapshot`).
    pub channels: Vec<GreedyChannelMetrics>,
    /// Cluster-wide values.
    pub cluster: GreedyClusterMetrics,
}

impl GreedyMetricsSnapshot {
    /// Renders this snapshot in the Prometheus text exposition format.
    ///
    /// Each metric family is written as a `# HELP` line, a `# TYPE` line and
    /// then its samples. Per-channel families emit one sample per entry of
    /// `self.channels`, in vector order, with the channel name escaped by
    /// `escape_label`. Cluster families always emit their samples, zeros
    /// included.
    pub fn prometheus_text(&self) -> String {
        // Writes the `# HELP` / `# TYPE` preamble for one metric family.
        fn preamble(out: &mut String, name: &str, kind: &str, help: &str) {
            let _ = writeln!(out, "# HELP {} {}", name, help);
            let _ = writeln!(out, "# TYPE {} {}", name, kind);
        }
        // Writes one sample carrying a single label: `name{key="value"} n`.
        fn labeled(out: &mut String, name: &str, key: &str, value: &str, n: u64) {
            let _ = writeln!(out, "{}{{{}=\"{}\"}} {}", name, key, value, n);
        }
        // Writes one sample with no labels: `name n`.
        fn unlabeled(out: &mut String, name: &str, n: u64) {
            let _ = writeln!(out, "{} {}", name, n);
        }

        let cl = &self.cluster;
        let mut out = String::with_capacity(2048);

        // Per-channel counters, driven by the descriptor table.
        for &(help, name, getter) in CHANNEL_COUNTER_DESCRIPTORS {
            preamble(&mut out, name, "counter", help);
            for ch in &self.channels {
                labeled(&mut out, name, "channel", &escape_label(&ch.channel), getter(ch));
            }
        }

        // Per-channel residency gauge.
        let resident = "dataforts_greedy_bytes_resident";
        preamble(
            &mut out,
            resident,
            "gauge",
            "Current bytes resident in the channel's cache.",
        );
        for ch in &self.channels {
            labeled(&mut out, resident, "channel", &escape_label(&ch.channel), ch.bytes_resident);
        }

        let admit = "dataforts_greedy_admit_rejected_total";
        preamble(
            &mut out,
            admit,
            "counter",
            "Cumulative greedy-cache admission rejections, split by axis.",
        );
        let admit_rows = [
            ("scope", cl.admit_rejected_scope_total),
            ("intent", cl.admit_rejected_intent_total),
            ("colocation", cl.admit_rejected_colocation_total),
            ("capacity", cl.admit_rejected_capacity_total),
            ("bandwidth", cl.admit_rejected_bandwidth_total),
        ];
        for (reason, n) in admit_rows {
            labeled(&mut out, admit, "reason", reason, n);
        }

        let budget = "dataforts_greedy_io_budget_used_bytes";
        preamble(
            &mut out,
            budget,
            "gauge",
            "Bytes consumed from the greedy I/O token bucket since the last refill.",
        );
        unlabeled(&mut out, budget, cl.io_budget_used_bytes);

        let dropped = "dataforts_greedy_observer_dropped_total";
        preamble(
            &mut out,
            dropped,
            "counter",
            "Cumulative events dropped at the observe_event hot path, split by reason.",
        );
        labeled(&mut out, dropped, "reason", "overloaded", cl.observer_dropped_overloaded_total);

        let unattributed = "dataforts_greedy_gravity_heat_unattributed_total";
        preamble(
            &mut out,
            unattributed,
            "counter",
            "Cumulative gravity note_read bumps skipped because origin_hash was zero (publisher didn't stamp identity).",
        );
        unlabeled(&mut out, unattributed, cl.gravity_heat_unattributed_total);

        let admitted = "dataforts_greedy_blob_pulls_admitted_total";
        preamble(
            &mut out,
            admitted,
            "counter",
            "Cumulative G-1 blob-pull verdicts that returned Admit.",
        );
        unlabeled(&mut out, admitted, cl.blob_pulls_admitted_total);

        let rejected = "dataforts_greedy_blob_pulls_rejected_total";
        preamble(
            &mut out,
            rejected,
            "counter",
            "Cumulative G-1 blob-pull rejections, split by reason.",
        );
        let rejected_rows = [
            ("no_storage", cl.blob_pulls_rejected_no_storage_total),
            ("greedy_disabled", cl.blob_pulls_rejected_greedy_disabled_total),
            ("proximity_zero", cl.blob_pulls_rejected_proximity_zero_total),
            ("unhealthy", cl.blob_pulls_rejected_unhealthy_total),
            ("scope_mismatch", cl.blob_pulls_rejected_scope_mismatch_total),
        ];
        for (reason, n) in rejected_rows {
            labeled(&mut out, rejected, "reason", reason, n);
        }

        let prefetches = "dataforts_greedy_blob_prefetches_total";
        preamble(
            &mut out,
            prefetches,
            "counter",
            "Cumulative BlobAdapter::prefetch invocations from the G-1 admit path, split by outcome.",
        );
        let prefetch_rows = [
            ("ok", cl.blob_prefetches_ok_total),
            ("err", cl.blob_prefetches_err_total),
        ];
        for (outcome, n) in prefetch_rows {
            labeled(&mut out, prefetches, "outcome", outcome, n);
        }

        out
    }

    /// `true` when there are no channel rows and every cluster value
    /// (counters and the I/O-budget gauge) is zero.
    pub fn is_empty(&self) -> bool {
        let c = &self.cluster;
        let cluster_values = [
            c.admit_rejected_scope_total,
            c.admit_rejected_intent_total,
            c.admit_rejected_colocation_total,
            c.admit_rejected_capacity_total,
            c.admit_rejected_bandwidth_total,
            c.io_budget_used_bytes,
            c.observer_dropped_overloaded_total,
            c.gravity_heat_unattributed_total,
            c.blob_pulls_admitted_total,
            c.blob_pulls_rejected_no_storage_total,
            c.blob_pulls_rejected_greedy_disabled_total,
            c.blob_pulls_rejected_proximity_zero_total,
            c.blob_pulls_rejected_unhealthy_total,
            c.blob_pulls_rejected_scope_mismatch_total,
            c.blob_prefetches_ok_total,
            c.blob_prefetches_err_total,
        ];
        self.channels.is_empty() && cluster_values.iter().all(|&v| v == 0)
    }
}
/// Accessor that reads one per-channel counter out of a snapshot row.
type ChannelCounterGetter = fn(&GreedyChannelMetrics) -> u64;
/// Per-channel *counter* families rendered by `GreedyMetricsSnapshot::prometheus_text`,
/// as `(help text, metric name, accessor)` triples in emission order.
///
/// The `bytes_resident` gauge is not listed here; `prometheus_text` renders it
/// separately with `# TYPE ... gauge`.
const CHANNEL_COUNTER_DESCRIPTORS: &[(&str, &str, ChannelCounterGetter)] = &[
// Each closure is non-capturing; the explicit `as` cast coerces it to the
// fn-pointer type required by the const's element type.
(
"Cumulative greedy-cache hits — substrate read resolved to a cached holder.",
"dataforts_greedy_cache_hits_total",
(|c| c.cache_hits_total) as ChannelCounterGetter,
),
(
"Cumulative reads served from this node's greedy cache.",
"dataforts_greedy_serve_count_total",
(|c| c.serve_count_total) as ChannelCounterGetter,
),
(
"Cumulative greedy-cache evictions under cluster-cap pressure.",
"dataforts_greedy_evictions_total",
(|c| c.evictions_total) as ChannelCounterGetter,
),
];
/// Escapes a Prometheus label value.
///
/// Backslash, double quote and line feed become `\\`, `\"` and `\n`. Every
/// other character is copied through unchanged.
fn escape_label(s: &str) -> String {
    s.chars().fold(String::with_capacity(s.len()), |mut escaped, ch| {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            _ => escaped.push(ch),
        }
        escaped
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn registry_starts_empty() {
        let r = GreedyMetricsRegistry::new();
        assert!(r.is_empty());
        assert_eq!(r.len(), 0);
        let snap = r.snapshot();
        assert!(snap.is_empty());
    }

    #[test]
    fn for_channel_returns_same_arc_per_name() {
        let r = GreedyMetricsRegistry::new();
        let a1 = r.for_channel("test/foo");
        let a2 = r.for_channel("test/foo");
        assert!(Arc::ptr_eq(&a1, &a2));
    }

    #[test]
    fn channel_counters_bump_into_snapshot() {
        let r = GreedyMetricsRegistry::new();
        let m = r.for_channel("test/a");
        m.incr_cache_hit();
        m.incr_cache_hit();
        m.incr_serve();
        m.incr_eviction();
        m.set_bytes_resident(4096);
        let snap = r.snapshot();
        let c = snap
            .channels
            .iter()
            .find(|c| c.channel == "test/a")
            .expect("entry present");
        assert_eq!(c.cache_hits_total, 2);
        assert_eq!(c.serve_count_total, 1);
        assert_eq!(c.evictions_total, 1);
        assert_eq!(c.bytes_resident, 4096);
    }

    #[test]
    fn cluster_admit_rejected_bumps_per_reason() {
        let r = GreedyMetricsRegistry::new();
        let cluster = r.cluster();
        cluster.incr_admit_rejected(AdmitRejectReason::Scope);
        cluster.incr_admit_rejected(AdmitRejectReason::Scope);
        cluster.incr_admit_rejected(AdmitRejectReason::Intent);
        cluster.incr_admit_rejected(AdmitRejectReason::Capacity);
        cluster.incr_admit_rejected(AdmitRejectReason::Bandwidth);
        cluster.incr_admit_rejected(AdmitRejectReason::Bandwidth);
        let snap = r.snapshot();
        assert_eq!(snap.cluster.admit_rejected_scope_total, 2);
        assert_eq!(snap.cluster.admit_rejected_intent_total, 1);
        assert_eq!(snap.cluster.admit_rejected_colocation_total, 0);
        assert_eq!(snap.cluster.admit_rejected_capacity_total, 1);
        assert_eq!(snap.cluster.admit_rejected_bandwidth_total, 2);
    }

    /// Fills the registry to `MAX_TRACKED_CHANNELS` and checks that unseen
    /// names spill into the shared overflow bucket while tracked names keep
    /// their own handles. (The previous version only compared "a" and "b"
    /// and never reached the cap.)
    #[test]
    fn overflow_bucket_activates_past_cap() {
        let r = GreedyMetricsRegistry::new();
        let a = r.for_channel("a");
        let b = r.for_channel("b");
        assert!(!Arc::ptr_eq(&a, &b));

        for i in 2..MAX_TRACKED_CHANNELS {
            let _ = r.for_channel(&format!("ch/{}", i));
        }
        assert_eq!(r.len(), MAX_TRACKED_CHANNELS);

        // Names tracked before the cap keep their own handle.
        assert!(Arc::ptr_eq(&a, &r.for_channel("a")));

        // Unseen names past the cap all share one overflow entry.
        let spill_one = r.for_channel("new/one");
        let spill_two = r.for_channel("new/two");
        assert!(Arc::ptr_eq(&spill_one, &spill_two));
        assert!(!Arc::ptr_eq(&spill_one, &a));
        assert_eq!(r.len(), MAX_TRACKED_CHANNELS + 1);

        spill_one.incr_cache_hit();
        spill_two.incr_cache_hit();
        let snap = r.snapshot();
        let overflow = snap
            .channels
            .iter()
            .find(|c| c.channel == OVERFLOW_CHANNEL_LABEL)
            .expect("overflow entry present");
        assert_eq!(overflow.cache_hits_total, 2);
        assert!(!snap.channels.iter().any(|c| c.channel == "new/one"));
        assert!(!snap.channels.iter().any(|c| c.channel == "new/two"));
    }

    #[test]
    fn prometheus_text_renders_all_metrics() {
        let r = GreedyMetricsRegistry::new();
        let m = r.for_channel("test/a");
        m.incr_cache_hit();
        m.set_bytes_resident(2048);
        let cluster = r.cluster();
        cluster.incr_admit_rejected(AdmitRejectReason::Scope);
        cluster.set_io_budget_used_bytes(8192);
        let text = r.snapshot().prometheus_text();
        assert!(text.contains("dataforts_greedy_cache_hits_total{channel=\"test/a\"} 1"));
        assert!(text.contains("dataforts_greedy_bytes_resident{channel=\"test/a\"} 2048"));
        assert!(text.contains("dataforts_greedy_admit_rejected_total{reason=\"scope\"} 1"));
        assert!(text.contains("dataforts_greedy_io_budget_used_bytes 8192"));
        assert!(text.contains("dataforts_greedy_admit_rejected_total{reason=\"intent\"} 0"));
        assert!(text.contains("dataforts_greedy_admit_rejected_total{reason=\"colocation\"} 0"));
        assert!(text.contains("dataforts_greedy_admit_rejected_total{reason=\"capacity\"} 0"));
        assert!(text.contains("dataforts_greedy_admit_rejected_total{reason=\"bandwidth\"} 0"));
        assert!(text.contains("dataforts_greedy_observer_dropped_total{reason=\"overloaded\"} 0"));
    }

    #[test]
    fn blob_counters_render_per_reason_and_outcome() {
        use crate::adapter::net::dataforts::blob::PullBlobReject;
        let r = GreedyMetricsRegistry::new();
        let cluster = r.cluster();
        cluster.incr_blob_pull_admitted();
        cluster.incr_blob_pull_rejected(PullBlobReject::Unhealthy);
        cluster.incr_blob_pull_rejected(PullBlobReject::Unhealthy);
        cluster.incr_blob_prefetch_ok();
        cluster.incr_blob_prefetch_err();
        cluster.incr_blob_prefetch_err();
        let snap = r.snapshot();
        assert!(!snap.is_empty());
        assert_eq!(snap.cluster.blob_pulls_rejected_unhealthy_total, 2);
        let text = snap.prometheus_text();
        assert!(text.contains("dataforts_greedy_blob_pulls_admitted_total 1"));
        assert!(text.contains("dataforts_greedy_blob_pulls_rejected_total{reason=\"unhealthy\"} 2"));
        assert!(text.contains("dataforts_greedy_blob_pulls_rejected_total{reason=\"no_storage\"} 0"));
        assert!(text.contains("dataforts_greedy_blob_prefetches_total{outcome=\"ok\"} 1"));
        assert!(text.contains("dataforts_greedy_blob_prefetches_total{outcome=\"err\"} 2"));
    }

    #[test]
    fn channel_labels_are_escaped_in_text() {
        let r = GreedyMetricsRegistry::new();
        r.for_channel("we\"ird").incr_serve();
        let text = r.snapshot().prometheus_text();
        assert!(text.contains("dataforts_greedy_serve_count_total{channel=\"we\\\"ird\"} 1"));
    }

    #[test]
    fn channels_render_in_sorted_order() {
        let r = GreedyMetricsRegistry::new();
        r.for_channel("zeta").incr_cache_hit();
        r.for_channel("alpha").incr_cache_hit();
        r.for_channel("middle").incr_cache_hit();
        let snap = r.snapshot();
        let channels: Vec<&str> = snap.channels.iter().map(|c| c.channel.as_str()).collect();
        assert_eq!(channels, vec!["alpha", "middle", "zeta"]);
    }

    #[test]
    fn label_escape_handles_quotes_and_backslashes() {
        assert_eq!(escape_label("plain"), "plain");
        assert_eq!(escape_label("with \"quote\""), "with \\\"quote\\\"");
        assert_eq!(escape_label("a\\b"), "a\\\\b");
        assert_eq!(escape_label("multi\nline"), "multi\\nline");
    }
}