use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
/// Shared handle to the blob-store metric counters and gauges.
///
/// The handle only holds an `Arc`, so cloning it is cheap and every clone
/// reads and updates the same underlying `MetricsInner`.
#[derive(Debug, Default, Clone)]
pub struct BlobMetrics {
    /// Reference-counted storage shared by all clones of this handle.
    inner: Arc<MetricsInner>,
}
/// Backing storage for [`BlobMetrics`]: one atomic cell per counter or gauge.
///
/// `Default` yields every cell at zero.
#[derive(Default, Debug)]
struct MetricsInner {
    // ---- Store / fetch / GC counters ----
    /// Incremented by one on every `record_store` call.
    blobs_stored_total: AtomicU64,
    /// Incremented by one on every `record_fetch` call.
    blobs_fetched_total: AtomicU64,
    /// Running sum of the `size` values passed to `record_store`.
    bytes_stored_total: AtomicU64,
    /// Running sum of the `count` values passed to `record_gc_swept`.
    gc_swept_total: AtomicU64,

    // ---- Disk gauges (overwritten, not accumulated) ----
    /// Last value passed to `set_disk_used_bytes`.
    disk_used_bytes: AtomicU64,
    /// Last value passed to `set_disk_capacity_bytes`.
    disk_capacity_bytes: AtomicU64,

    // ---- Overflow counters fed by `record_overflow_tick` ----
    /// Running sum of `report.admitted`.
    overflow_pushes_admitted_total: AtomicU64,
    /// Running sum of `report.push_errors`.
    overflow_push_errors_total: AtomicU64,
    /// Running sum of `report.pushed_bytes`.
    overflow_pushed_bytes_total: AtomicU64,
    /// Running sum of `report.rejected_no_target`.
    overflow_rejected_no_target_total: AtomicU64,

    // ---- Overflow rejection counters, one per `OverflowReject` variant,
    // ---- each bumped by `record_overflow_reject` ----
    /// Count of `NoStorageCap` rejections.
    overflow_rejected_no_storage_cap_total: AtomicU64,
    /// Count of `NotParticipating` rejections.
    overflow_rejected_not_participating_total: AtomicU64,
    /// Count of `SenderNotOverflowing` rejections.
    overflow_rejected_sender_not_overflowing_total: AtomicU64,
    /// Count of `Unhealthy` rejections.
    overflow_rejected_unhealthy_total: AtomicU64,
    /// Count of `ScopeMismatch` rejections.
    overflow_rejected_scope_mismatch_total: AtomicU64,
    /// Count of `InsufficientDisk` rejections.
    overflow_rejected_insufficient_disk_total: AtomicU64,

    // ---- Overflow hysteresis state fed by `record_overflow_tick` ----
    /// Number of tick reports that went from inactive at start to active at end.
    overflow_high_water_triggered_total: AtomicU64,
    /// Number of tick reports that went from active at start to inactive at end.
    overflow_low_water_cleared_total: AtomicU64,
    /// `1` if the most recent tick report ended active, otherwise `0`.
    overflow_active: AtomicU64,
    /// Disk ratio from the most recent tick report, scaled by 1000 and stored
    /// as an integer; `snapshot` divides by 1000 to recover the ratio.
    overflow_disk_ratio_x1000: AtomicU64,
}
impl BlobMetrics {
    /// Creates a metrics handle whose counters and gauges all start at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Copies every counter and gauge into a plain-value [`BlobMetricsSnapshot`].
    ///
    /// Fields are loaded one at a time with `Ordering::Relaxed`, so a snapshot
    /// taken while writers are active is not a single consistent cut across
    /// all fields.
    pub fn snapshot(&self) -> BlobMetricsSnapshot {
        let read = |cell: &AtomicU64| cell.load(Ordering::Relaxed);
        let m = &*self.inner;
        BlobMetricsSnapshot {
            blobs_stored_total: read(&m.blobs_stored_total),
            blobs_fetched_total: read(&m.blobs_fetched_total),
            bytes_stored_total: read(&m.bytes_stored_total),
            gc_swept_total: read(&m.gc_swept_total),
            disk_used_bytes: read(&m.disk_used_bytes),
            disk_capacity_bytes: read(&m.disk_capacity_bytes),
            overflow: OverflowMetricsSnapshot {
                pushes_admitted_total: read(&m.overflow_pushes_admitted_total),
                push_errors_total: read(&m.overflow_push_errors_total),
                pushed_bytes_total: read(&m.overflow_pushed_bytes_total),
                rejected_no_target_total: read(&m.overflow_rejected_no_target_total),
                rejected_no_storage_cap_total: read(&m.overflow_rejected_no_storage_cap_total),
                rejected_not_participating_total: read(
                    &m.overflow_rejected_not_participating_total,
                ),
                rejected_sender_not_overflowing_total: read(
                    &m.overflow_rejected_sender_not_overflowing_total,
                ),
                rejected_unhealthy_total: read(&m.overflow_rejected_unhealthy_total),
                rejected_scope_mismatch_total: read(&m.overflow_rejected_scope_mismatch_total),
                rejected_insufficient_disk_total: read(
                    &m.overflow_rejected_insufficient_disk_total,
                ),
                high_water_triggered_total: read(&m.overflow_high_water_triggered_total),
                low_water_cleared_total: read(&m.overflow_low_water_cleared_total),
                // The gauge is stored as 0/1 in a u64 cell.
                active: read(&m.overflow_active) != 0,
                // Stored as integer thousandths; convert back to a ratio.
                disk_ratio: read(&m.overflow_disk_ratio_x1000) as f64 / 1000.0,
            },
        }
    }

    /// Records one successful store of a blob of `size` bytes.
    ///
    /// Adds 1 to the stored-blob counter and `size` to the stored-bytes counter.
    pub fn record_store(&self, size: u64) {
        let m = &self.inner;
        m.blobs_stored_total.fetch_add(1, Ordering::Relaxed);
        m.bytes_stored_total.fetch_add(size, Ordering::Relaxed);
    }

    /// Records one fetch by adding 1 to the fetched-blob counter.
    pub fn record_fetch(&self) {
        self.inner.blobs_fetched_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `count` to the GC-swept counter.
    pub fn record_gc_swept(&self, count: u64) {
        self.inner.gc_swept_total.fetch_add(count, Ordering::Relaxed);
    }

    /// Overwrites the disk-used gauge with `bytes`.
    pub fn set_disk_used_bytes(&self, bytes: u64) {
        self.inner.disk_used_bytes.store(bytes, Ordering::Relaxed);
    }

    /// Overwrites the disk-capacity gauge with `bytes`.
    pub fn set_disk_capacity_bytes(&self, bytes: u64) {
        self.inner.disk_capacity_bytes.store(bytes, Ordering::Relaxed);
    }

    /// Folds one overflow tick report into the overflow counters and gauges.
    ///
    /// * `admitted`, `push_errors`, `pushed_bytes` and `rejected_no_target`
    ///   are added to their running totals.
    /// * An inactive-to-active transition within the report bumps the
    ///   high-water counter; an active-to-inactive transition bumps the
    ///   low-water counter.
    /// * The active gauge is set from `is_active_at_end`.
    /// * `disk_ratio_at_end` is stored as integer thousandths after mapping
    ///   NaN to 0 and clamping to `[0, 10]`.
    pub fn record_overflow_tick(&self, report: &super::overflow::BlobOverflowTickReport) {
        let m = &self.inner;
        m.overflow_pushes_admitted_total
            .fetch_add(report.admitted, Ordering::Relaxed);
        m.overflow_push_errors_total
            .fetch_add(report.push_errors, Ordering::Relaxed);
        m.overflow_pushed_bytes_total
            .fetch_add(report.pushed_bytes, Ordering::Relaxed);
        m.overflow_rejected_no_target_total
            .fetch_add(report.rejected_no_target, Ordering::Relaxed);

        // Only a change of state across the tick counts as a hysteresis event.
        match (report.was_active_at_start, report.is_active_at_end) {
            (false, true) => {
                m.overflow_high_water_triggered_total
                    .fetch_add(1, Ordering::Relaxed);
            }
            (true, false) => {
                m.overflow_low_water_cleared_total
                    .fetch_add(1, Ordering::Relaxed);
            }
            _ => {}
        }

        m.overflow_active
            .store(u64::from(report.is_active_at_end), Ordering::Relaxed);

        let raw = report.disk_ratio_at_end;
        // `clamp` would propagate NaN, so it is mapped to 0 explicitly first.
        let ratio = if raw.is_nan() {
            0.0
        } else {
            raw.clamp(0.0, 10.0)
        };
        // Round rather than truncate: binary floating point makes products
        // such as `1.001_f64 * 1000.0` come out as 1000.9999999999999, which a
        // bare `as u64` would store as 1000 and report one milli-unit low.
        m.overflow_disk_ratio_x1000
            .store((ratio * 1000.0).round() as u64, Ordering::Relaxed);
    }

    /// Adds 1 to the rejection counter that corresponds to `reason`.
    pub fn record_overflow_reject(&self, reason: super::admission::OverflowReject) {
        use super::admission::OverflowReject as R;
        let m = &self.inner;
        // No wildcard arm, so a new `OverflowReject` variant fails to compile
        // here until it is given a counter.
        let counter = match reason {
            R::NoStorageCap => &m.overflow_rejected_no_storage_cap_total,
            R::NotParticipating => &m.overflow_rejected_not_participating_total,
            R::SenderNotOverflowing => &m.overflow_rejected_sender_not_overflowing_total,
            R::Unhealthy => &m.overflow_rejected_unhealthy_total,
            R::ScopeMismatch => &m.overflow_rejected_scope_mismatch_total,
            R::InsufficientDisk => &m.overflow_rejected_insufficient_disk_total,
        };
        counter.fetch_add(1, Ordering::Relaxed);
    }
}
/// Plain-value copy of the blob metrics, as produced by `BlobMetrics::snapshot`.
#[derive(Debug, Default, Clone, Copy, PartialEq)]
pub struct BlobMetricsSnapshot {
    /// Number of recorded blob stores.
    pub blobs_stored_total: u64,
    /// Number of recorded blob fetches.
    pub blobs_fetched_total: u64,
    /// Sum of the sizes of all recorded stores, in bytes.
    pub bytes_stored_total: u64,
    /// Number of blobs recorded as swept by GC.
    pub gc_swept_total: u64,
    /// Most recently set disk-used gauge, in bytes.
    pub disk_used_bytes: u64,
    /// Most recently set disk-capacity gauge, in bytes.
    pub disk_capacity_bytes: u64,
    /// Overflow-related counters and gauges.
    pub overflow: OverflowMetricsSnapshot,
}
/// Plain-value copy of the overflow counters and gauges.
#[derive(Debug, Default, Clone, Copy, PartialEq)]
pub struct OverflowMetricsSnapshot {
    /// Sum of `admitted` over all recorded tick reports.
    pub pushes_admitted_total: u64,
    /// Sum of `push_errors` over all recorded tick reports.
    pub push_errors_total: u64,
    /// Sum of `pushed_bytes` over all recorded tick reports.
    pub pushed_bytes_total: u64,
    /// Sum of `rejected_no_target` over all recorded tick reports.
    pub rejected_no_target_total: u64,
    /// Recorded rejections with reason `NoStorageCap`.
    pub rejected_no_storage_cap_total: u64,
    /// Recorded rejections with reason `NotParticipating`.
    pub rejected_not_participating_total: u64,
    /// Recorded rejections with reason `SenderNotOverflowing`.
    pub rejected_sender_not_overflowing_total: u64,
    /// Recorded rejections with reason `Unhealthy`.
    pub rejected_unhealthy_total: u64,
    /// Recorded rejections with reason `ScopeMismatch`.
    pub rejected_scope_mismatch_total: u64,
    /// Recorded rejections with reason `InsufficientDisk`.
    pub rejected_insufficient_disk_total: u64,
    /// Number of tick reports that went from inactive to active.
    pub high_water_triggered_total: u64,
    /// Number of tick reports that went from active to inactive.
    pub low_water_cleared_total: u64,
    /// Whether the most recent tick report ended in the active state.
    pub active: bool,
    /// Disk ratio from the most recent tick report, at thousandths resolution.
    pub disk_ratio: f64,
}
impl BlobMetricsSnapshot {
    /// Renders the snapshot as Prometheus text-format metric families.
    ///
    /// Every sample carries an `adapter` label whose value is `adapter_id`
    /// passed through `escape_prometheus_label`. `gc_pending_total` is not part
    /// of the snapshot; it is supplied by the caller and emitted as the
    /// `dataforts_blob_gc_pending` gauge.
    ///
    /// Each family is written as a `# HELP` line, a `# TYPE` line and its
    /// sample line(s). The per-reason rejection counters share one family,
    /// `dataforts_blob_overflow_rejected_total`, distinguished by a `reason`
    /// label.
    pub fn to_prometheus_text(&self, adapter_id: &str, gc_pending_total: u64) -> String {
        /// Appends the `# HELP` / `# TYPE` header pair for one family.
        fn push_header(buf: &mut String, name: &str, kind: &str, help: &str) {
            buf.push_str(&format!(
                "# HELP {0} {1}\n# TYPE {0} {2}\n",
                name, help, kind
            ));
        }

        /// Appends a family consisting of its header and a single sample.
        fn push_family(
            buf: &mut String,
            adapter: &str,
            name: &str,
            kind: &str,
            help: &str,
            value: impl std::fmt::Display,
        ) {
            push_header(buf, name, kind, help);
            buf.push_str(&format!("{}{{adapter=\"{}\"}} {}\n", name, adapter, value));
        }

        let escaped = escape_prometheus_label(adapter_id);
        let adapter = escaped.as_str();
        let mut out = String::new();

        // ---- Store / fetch / GC / disk ----
        push_family(
            &mut out,
            adapter,
            "dataforts_blobs_stored_total",
            "counter",
            "Successful blob stores.",
            self.blobs_stored_total,
        );
        push_family(
            &mut out,
            adapter,
            "dataforts_blobs_fetched_total",
            "counter",
            "Successful blob fetches.",
            self.blobs_fetched_total,
        );
        push_family(
            &mut out,
            adapter,
            "dataforts_blob_bytes_stored_total",
            "counter",
            "Bytes accepted by store.",
            self.bytes_stored_total,
        );
        push_family(
            &mut out,
            adapter,
            "dataforts_blob_gc_swept_total",
            "counter",
            "Blobs removed by GC sweep.",
            self.gc_swept_total,
        );
        push_family(
            &mut out,
            adapter,
            "dataforts_blob_gc_pending",
            "gauge",
            "Zero-refcount blobs waiting on retention floor.",
            gc_pending_total,
        );
        push_family(
            &mut out,
            adapter,
            "dataforts_blob_disk_used_bytes",
            "gauge",
            "Bytes the adapter currently holds.",
            self.disk_used_bytes,
        );
        push_family(
            &mut out,
            adapter,
            "dataforts_blob_disk_capacity_bytes",
            "gauge",
            "Operator-configured disk cap.",
            self.disk_capacity_bytes,
        );

        // ---- Overflow: send side ----
        let overflow = &self.overflow;
        push_family(
            &mut out,
            adapter,
            "dataforts_blob_overflow_pushes_admitted_total",
            "counter",
            "Successful overflow pushes (Accepted ack).",
            overflow.pushes_admitted_total,
        );
        push_family(
            &mut out,
            adapter,
            "dataforts_blob_overflow_push_errors_total",
            "counter",
            "Send-side overflow failures (non-Accepted ack + transport errors).",
            overflow.push_errors_total,
        );
        push_family(
            &mut out,
            adapter,
            "dataforts_blob_overflow_pushed_bytes_total",
            "counter",
            "Bytes pushed via overflow (sum of size_bytes on Accepted).",
            overflow.pushed_bytes_total,
        );
        push_family(
            &mut out,
            adapter,
            "dataforts_blob_overflow_rejected_no_target_total",
            "counter",
            "Tick computed a cold candidate but no overflow-enabled peer was reachable.",
            overflow.rejected_no_target_total,
        );

        // ---- Overflow: receive-side rejections, one sample per reason ----
        push_header(
            &mut out,
            "dataforts_blob_overflow_rejected_total",
            "counter",
            "Receive-side admission rejections by reason.",
        );
        let rejections: [(&str, u64); 6] = [
            ("no_storage_cap", overflow.rejected_no_storage_cap_total),
            ("not_participating", overflow.rejected_not_participating_total),
            (
                "sender_not_overflowing",
                overflow.rejected_sender_not_overflowing_total,
            ),
            ("unhealthy", overflow.rejected_unhealthy_total),
            ("scope_mismatch", overflow.rejected_scope_mismatch_total),
            ("insufficient_disk", overflow.rejected_insufficient_disk_total),
        ];
        for &(reason, count) in &rejections {
            out.push_str(&format!(
                "dataforts_blob_overflow_rejected_total{{adapter=\"{}\",reason=\"{}\"}} {}\n",
                adapter, reason, count
            ));
        }

        // ---- Overflow: hysteresis state ----
        push_family(
            &mut out,
            adapter,
            "dataforts_blob_overflow_high_water_triggered_total",
            "counter",
            "Hysteresis transitions from inactive to active (false -> true).",
            overflow.high_water_triggered_total,
        );
        push_family(
            &mut out,
            adapter,
            "dataforts_blob_overflow_low_water_cleared_total",
            "counter",
            "Hysteresis transitions from active to inactive (true -> false).",
            overflow.low_water_cleared_total,
        );
        push_family(
            &mut out,
            adapter,
            "dataforts_blob_overflow_active",
            "gauge",
            "1 iff the overflow tick is actively shedding (hysteresis high).",
            u8::from(overflow.active),
        );
        // The ratio is always printed with exactly three decimals.
        push_family(
            &mut out,
            adapter,
            "dataforts_blob_overflow_disk_ratio",
            "gauge",
            "Local disk usage ratio observed at the most recent tick.",
            format!("{:.3}", overflow.disk_ratio),
        );

        out
    }
}
/// Escapes `input` for use inside a double-quoted Prometheus label value.
///
/// Backslash, double quote, line feed and carriage return are replaced by the
/// two-character sequences `\\`, `\"`, `\n` and `\r`; every other character is
/// copied through unchanged.
///
/// NOTE(review): the Prometheus text format documents escapes only for
/// backslash, double quote and line feed — confirm that consumers accept the
/// `\r` sequence produced here.
fn escape_prometheus_label(input: &str) -> String {
    input
        .chars()
        .fold(String::with_capacity(input.len()), |mut escaped, ch| {
            match ch {
                '\\' => escaped.push_str(r"\\"),
                '"' => escaped.push_str(r#"\""#),
                '\n' => escaped.push_str(r"\n"),
                '\r' => escaped.push_str(r"\r"),
                plain => escaped.push(plain),
            }
            escaped
        })
}
/// Outcome of [`evaluate_health_gate`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealthGateAction {
    /// Usage is at or above the emit threshold and the adapter is not yet
    /// marked unhealthy.
    Emit,
    /// Usage is at or below the clear threshold and the adapter is currently
    /// marked unhealthy.
    Clear,
    /// Every other case, including a capacity of zero.
    Unchanged,
}
/// Usage ratio (`used_bytes / capacity_bytes`) at or above which
/// `evaluate_health_gate` returns `Emit` for an adapter that is not already
/// unhealthy. The comparison is inclusive.
pub const HEALTH_GATE_EMIT_THRESHOLD: f64 = 0.95;
/// Usage ratio at or below which `evaluate_health_gate` returns `Clear` for an
/// adapter that is currently unhealthy. The comparison is inclusive; ratios
/// strictly between the two thresholds leave the state unchanged.
pub const HEALTH_GATE_CLEAR_THRESHOLD: f64 = 0.85;
/// Decides whether the unhealthy state should be raised, cleared or left alone.
///
/// The usage ratio is `used_bytes / capacity_bytes`, computed in `f64`.
///
/// * `capacity_bytes == 0` always yields [`HealthGateAction::Unchanged`].
/// * Ratio `>= HEALTH_GATE_EMIT_THRESHOLD` yields `Emit` when
///   `currently_unhealthy` is false, otherwise `Unchanged`.
/// * Ratio `<= HEALTH_GATE_CLEAR_THRESHOLD` yields `Clear` when
///   `currently_unhealthy` is true, otherwise `Unchanged`.
/// * Ratios strictly between the thresholds yield `Unchanged`.
pub fn evaluate_health_gate(
    used_bytes: u64,
    capacity_bytes: u64,
    currently_unhealthy: bool,
) -> HealthGateAction {
    // Guard the division: with no capacity there is no ratio to evaluate.
    if capacity_bytes == 0 {
        return HealthGateAction::Unchanged;
    }

    let ratio = used_bytes as f64 / capacity_bytes as f64;
    let at_or_above_emit = ratio >= HEALTH_GATE_EMIT_THRESHOLD;
    let at_or_below_clear = ratio <= HEALTH_GATE_CLEAR_THRESHOLD;

    // The emit check takes precedence over the clear check.
    match (currently_unhealthy, at_or_above_emit, at_or_below_clear) {
        (false, true, _) => HealthGateAction::Emit,
        (true, false, true) => HealthGateAction::Clear,
        _ => HealthGateAction::Unchanged,
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for the metric counters, the Prometheus rendering, the
    //! label escaper and the health gate.

    use super::*;

    // ---- Basic counters and gauges ----

    /// Each store adds one to the blob counter and `size` to the byte counter.
    #[test]
    fn record_store_bumps_both_counters() {
        let m = BlobMetrics::new();
        m.record_store(1024);
        m.record_store(2048);
        let s = m.snapshot();
        assert_eq!(s.blobs_stored_total, 2);
        assert_eq!(s.bytes_stored_total, 1024 + 2048);
    }

    /// Fetches must not leak into the store counters.
    #[test]
    fn record_fetch_bumps_only_fetched() {
        let m = BlobMetrics::new();
        m.record_fetch();
        m.record_fetch();
        m.record_fetch();
        let s = m.snapshot();
        assert_eq!(s.blobs_fetched_total, 3);
        assert_eq!(s.blobs_stored_total, 0);
        assert_eq!(s.bytes_stored_total, 0);
    }

    /// Sweep counts accumulate; a zero-count sweep is a no-op.
    #[test]
    fn record_gc_swept_accumulates() {
        let m = BlobMetrics::new();
        m.record_gc_swept(5);
        m.record_gc_swept(0);
        m.record_gc_swept(3);
        let s = m.snapshot();
        assert_eq!(s.gc_swept_total, 8);
    }

    /// Disk gauges keep only the most recent value.
    #[test]
    fn disk_gauges_set_overwrites() {
        let m = BlobMetrics::new();
        m.set_disk_capacity_bytes(1 << 30);
        m.set_disk_used_bytes(100);
        m.set_disk_used_bytes(200);
        let s = m.snapshot();
        assert_eq!(s.disk_capacity_bytes, 1 << 30);
        assert_eq!(s.disk_used_bytes, 200);
    }

    // ---- Prometheus rendering: base families ----

    /// Every base family appears with the adapter label and the recorded value.
    #[test]
    fn prometheus_text_includes_every_field() {
        let m = BlobMetrics::new();
        m.record_store(1024);
        m.record_fetch();
        m.record_gc_swept(2);
        m.set_disk_capacity_bytes(1 << 30);
        m.set_disk_used_bytes(1 << 28);
        let text = m.snapshot().to_prometheus_text("my-adapter", 7);
        assert!(text.contains("dataforts_blobs_stored_total{adapter=\"my-adapter\"} 1"));
        assert!(text.contains("dataforts_blobs_fetched_total{adapter=\"my-adapter\"} 1"));
        assert!(text.contains("dataforts_blob_bytes_stored_total{adapter=\"my-adapter\"} 1024"));
        assert!(text.contains("dataforts_blob_gc_swept_total{adapter=\"my-adapter\"} 2"));
        assert!(text.contains("dataforts_blob_gc_pending{adapter=\"my-adapter\"} 7"));
        // Pin the gauge values too, not just the presence of the sample name.
        assert!(text
            .contains("dataforts_blob_disk_capacity_bytes{adapter=\"my-adapter\"} 1073741824\n"));
        assert!(
            text.contains("dataforts_blob_disk_used_bytes{adapter=\"my-adapter\"} 268435456\n")
        );
    }

    // ---- Health gate ----

    /// A zero capacity never changes the health state.
    #[test]
    fn health_gate_unhealthy_capacity_zero_is_unchanged() {
        assert_eq!(
            evaluate_health_gate(100, 0, false),
            HealthGateAction::Unchanged
        );
        assert_eq!(
            evaluate_health_gate(100, 0, true),
            HealthGateAction::Unchanged
        );
    }

    #[test]
    fn health_gate_emit_when_over_95_percent_and_currently_healthy() {
        assert_eq!(evaluate_health_gate(96, 100, false), HealthGateAction::Emit);
    }

    #[test]
    fn health_gate_unchanged_when_already_unhealthy_and_over_95() {
        assert_eq!(
            evaluate_health_gate(96, 100, true),
            HealthGateAction::Unchanged
        );
    }

    #[test]
    fn health_gate_clear_when_under_85_percent_and_currently_unhealthy() {
        assert_eq!(evaluate_health_gate(50, 100, true), HealthGateAction::Clear);
    }

    /// Between the two thresholds the state is sticky in both directions.
    #[test]
    fn health_gate_unchanged_inside_hysteresis_band() {
        assert_eq!(
            evaluate_health_gate(90, 100, false),
            HealthGateAction::Unchanged
        );
        assert_eq!(
            evaluate_health_gate(90, 100, true),
            HealthGateAction::Unchanged
        );
    }

    #[test]
    fn health_gate_emit_threshold_inclusive() {
        assert_eq!(evaluate_health_gate(95, 100, false), HealthGateAction::Emit);
    }

    #[test]
    fn health_gate_clear_threshold_inclusive() {
        assert_eq!(evaluate_health_gate(85, 100, true), HealthGateAction::Clear);
    }

    // ---- Label escaping ----

    #[test]
    fn prometheus_label_escapes_backslash_quote_newline_carriage_return() {
        assert_eq!(escape_prometheus_label(r"a\b"), r"a\\b");
        assert_eq!(escape_prometheus_label(r#"a"b"#), r#"a\"b"#);
        assert_eq!(escape_prometheus_label("a\nb"), r"a\nb");
        assert_eq!(escape_prometheus_label("a\rb"), r"a\rb");
        assert_eq!(
            escape_prometheus_label("a\\b\"c\nd\re"),
            "a\\\\b\\\"c\\nd\\re"
        );
        assert_eq!(escape_prometheus_label("mesh-prod"), "mesh-prod");
    }

    /// A hostile adapter id must stay inside the label value.
    #[test]
    fn to_prometheus_text_escapes_adapter_id_against_injection() {
        let snap = BlobMetricsSnapshot::default();
        let body = snap.to_prometheus_text("evil\"\n# bogus_metric{} 1\n#", 0);
        assert!(
            body.contains(r#"adapter="evil\"\n# bogus_metric{} 1\n#""#),
            "adapter_id must appear escaped inside the label value; got:\n{}",
            body
        );
        // Without escaping, the payload's raw newline would be followed by
        // "# bogus_metric{}", so that is the exact sequence to rule out.
        assert!(
            !body.contains("\n# bogus_metric{}"),
            "raw injected metric line must not survive escaping; got:\n{}",
            body
        );
    }

    // ---- Overflow metrics ----

    #[test]
    fn overflow_metrics_default_snapshot_is_all_zero() {
        let metrics = BlobMetrics::new();
        let o = metrics.snapshot().overflow;
        assert_eq!(o.pushes_admitted_total, 0);
        assert_eq!(o.push_errors_total, 0);
        assert_eq!(o.pushed_bytes_total, 0);
        assert_eq!(o.rejected_no_target_total, 0);
        assert_eq!(o.rejected_no_storage_cap_total, 0);
        assert_eq!(o.rejected_not_participating_total, 0);
        assert_eq!(o.rejected_sender_not_overflowing_total, 0);
        assert_eq!(o.rejected_unhealthy_total, 0);
        assert_eq!(o.rejected_scope_mismatch_total, 0);
        assert_eq!(o.rejected_insufficient_disk_total, 0);
        assert_eq!(o.high_water_triggered_total, 0);
        assert_eq!(o.low_water_cleared_total, 0);
        assert!(!o.active);
        assert_eq!(o.disk_ratio, 0.0);
    }

    /// One report with an inactive-to-active transition updates every
    /// tick-fed counter and gauge.
    #[test]
    fn record_overflow_tick_bumps_counters_and_sets_gauges() {
        let metrics = BlobMetrics::new();
        let report = super::super::overflow::BlobOverflowTickReport {
            admitted: 3,
            rejected_no_target: 2,
            push_errors: 1,
            was_active_at_start: false,
            is_active_at_end: true,
            disk_ratio_at_start: 0.90,
            disk_ratio_at_end: 0.88,
            pushed_bytes: 12_345,
        };
        metrics.record_overflow_tick(&report);
        let o = metrics.snapshot().overflow;
        assert_eq!(o.pushes_admitted_total, 3);
        assert_eq!(o.push_errors_total, 1);
        assert_eq!(o.pushed_bytes_total, 12_345);
        assert_eq!(o.rejected_no_target_total, 2);
        assert_eq!(
            o.high_water_triggered_total, 1,
            "false → true transition must bump the trigger counter exactly once"
        );
        assert_eq!(
            o.low_water_cleared_total, 0,
            "no true → false transition this tick"
        );
        assert!(o.active);
        assert!((o.disk_ratio - 0.88).abs() < 1e-3);
    }

    /// A tick that starts and ends active is not a new trigger.
    #[test]
    fn record_overflow_tick_no_transition_does_not_bump_hysteresis_counters() {
        let metrics = BlobMetrics::new();
        let tick1 = super::super::overflow::BlobOverflowTickReport {
            was_active_at_start: false,
            is_active_at_end: true,
            disk_ratio_at_end: 0.90,
            ..Default::default()
        };
        let tick2 = super::super::overflow::BlobOverflowTickReport {
            was_active_at_start: true,
            is_active_at_end: true,
            disk_ratio_at_end: 0.88,
            ..Default::default()
        };
        metrics.record_overflow_tick(&tick1);
        metrics.record_overflow_tick(&tick2);
        let o = metrics.snapshot().overflow;
        assert_eq!(o.high_water_triggered_total, 1);
        assert_eq!(o.low_water_cleared_total, 0);
    }

    #[test]
    fn record_overflow_tick_clear_transition_bumps_low_water_cleared() {
        let metrics = BlobMetrics::new();
        let tick_clear = super::super::overflow::BlobOverflowTickReport {
            was_active_at_start: true,
            is_active_at_end: false,
            disk_ratio_at_end: 0.65,
            ..Default::default()
        };
        metrics.record_overflow_tick(&tick_clear);
        let o = metrics.snapshot().overflow;
        assert_eq!(o.high_water_triggered_total, 0);
        assert_eq!(o.low_water_cleared_total, 1);
        assert!(!o.active);
    }

    /// Each rejection reason feeds its own counter and no other.
    #[test]
    fn record_overflow_reject_bumps_each_variant_distinctly() {
        let metrics = BlobMetrics::new();
        use super::super::admission::OverflowReject as R;
        metrics.record_overflow_reject(R::NoStorageCap);
        metrics.record_overflow_reject(R::NoStorageCap);
        metrics.record_overflow_reject(R::NotParticipating);
        metrics.record_overflow_reject(R::SenderNotOverflowing);
        metrics.record_overflow_reject(R::Unhealthy);
        metrics.record_overflow_reject(R::ScopeMismatch);
        metrics.record_overflow_reject(R::InsufficientDisk);
        let o = metrics.snapshot().overflow;
        assert_eq!(o.rejected_no_storage_cap_total, 2);
        assert_eq!(o.rejected_not_participating_total, 1);
        assert_eq!(o.rejected_sender_not_overflowing_total, 1);
        assert_eq!(o.rejected_unhealthy_total, 1);
        assert_eq!(o.rejected_scope_mismatch_total, 1);
        assert_eq!(o.rejected_insufficient_disk_total, 1);
    }

    // ---- Prometheus rendering: overflow families ----

    #[test]
    fn to_prometheus_text_emits_overflow_counter_family() {
        let metrics = BlobMetrics::new();
        let report = super::super::overflow::BlobOverflowTickReport {
            admitted: 7,
            push_errors: 2,
            pushed_bytes: 99_999,
            was_active_at_start: false,
            is_active_at_end: true,
            disk_ratio_at_end: 0.87,
            ..Default::default()
        };
        metrics.record_overflow_tick(&report);
        let body = metrics.snapshot().to_prometheus_text("op-test", 0);
        assert!(
            body.contains("dataforts_blob_overflow_pushes_admitted_total{adapter=\"op-test\"} 7")
        );
        assert!(body.contains("dataforts_blob_overflow_push_errors_total{adapter=\"op-test\"} 2"));
        assert!(
            body.contains("dataforts_blob_overflow_pushed_bytes_total{adapter=\"op-test\"} 99999")
        );
        assert!(body
            .contains("dataforts_blob_overflow_high_water_triggered_total{adapter=\"op-test\"} 1"));
        assert!(body.contains("reason=\"no_storage_cap\""));
        assert!(body.contains("reason=\"not_participating\""));
        assert!(body.contains("reason=\"sender_not_overflowing\""));
        assert!(body.contains("reason=\"unhealthy\""));
        assert!(body.contains("reason=\"scope_mismatch\""));
        assert!(body.contains("reason=\"insufficient_disk\""));
        assert!(body.contains("dataforts_blob_overflow_active{adapter=\"op-test\"} 1"));
        assert!(body.contains("dataforts_blob_overflow_disk_ratio{adapter=\"op-test\"} 0.870"));
        assert!(body.contains("# TYPE dataforts_blob_overflow_pushes_admitted_total counter"));
        assert!(body.contains("# TYPE dataforts_blob_overflow_active gauge"));
    }

    /// The overflow families escape the adapter id like the base families do.
    #[test]
    fn to_prometheus_text_overflow_adapter_id_is_escaped() {
        let metrics = BlobMetrics::new();
        let body = metrics.snapshot().to_prometheus_text("evil\"\nbogus", 0);
        assert!(body.contains(r#"dataforts_blob_overflow_active{adapter="evil\"\nbogus"}"#));
        // If escaping regressed, the payload's raw newline would start a new
        // line beginning with "bogus"; no legitimate output line starts so.
        assert!(!body.contains("\nbogus"));
    }
}