use std::{
collections::HashMap,
sync::{
Arc, OnceLock,
atomic::{AtomicU64, Ordering},
},
};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ProcessOutcome {
Accepted,
Rejected,
Timeout,
Cancelled,
}
impl ProcessOutcome {
#[must_use]
pub fn label(self) -> &'static str {
match self {
Self::Accepted => "accepted",
Self::Rejected => "rejected",
Self::Timeout => "timeout",
Self::Cancelled => "cancelled",
}
}
pub const ALL: &'static [Self] = &[
Self::Accepted,
Self::Rejected,
Self::Timeout,
Self::Cancelled,
];
}
#[derive(Default)]
struct MetricVec {
inner: std::sync::RwLock<HashMap<Box<str>, Arc<AtomicU64>>>,
}
impl MetricVec {
fn increment(&self, label: &str) {
{
let guard = self.inner.read().expect("MetricVec RwLock poisoned");
if let Some(counter) = guard.get(label) {
counter.fetch_add(1, Ordering::Relaxed);
return;
}
}
let mut guard = self.inner.write().expect("MetricVec RwLock poisoned");
let counter = guard
.entry(label.into())
.or_insert_with(|| Arc::new(AtomicU64::new(0)));
counter.fetch_add(1, Ordering::Relaxed);
}
fn snapshot(&self) -> Vec<(Box<str>, u64)> {
let guard = self.inner.read().expect("MetricVec RwLock poisoned");
let mut pairs: Vec<(Box<str>, u64)> = guard
.iter()
.map(|(k, v)| (k.clone(), v.load(Ordering::Relaxed)))
.collect();
pairs.sort_unstable_by(|(a, _), (b, _)| a.cmp(b));
pairs
}
}
pub struct EngineMetrics {
process_initiated: MetricVec,
process_completed: MetricVec,
validation_failed: MetricVec,
outbox_delivery_attempts: MetricVec,
deadline_fired: MetricVec,
dead_letter_recorded: MetricVec,
}
impl EngineMetrics {
fn new() -> Self {
Self {
process_initiated: MetricVec::default(),
process_completed: MetricVec::default(),
validation_failed: MetricVec::default(),
outbox_delivery_attempts: MetricVec::default(),
deadline_fired: MetricVec::default(),
dead_letter_recorded: MetricVec::default(),
}
}
#[must_use]
pub fn global() -> &'static Self {
static GLOBAL: OnceLock<EngineMetrics> = OnceLock::new();
GLOBAL.get_or_init(Self::new)
}
pub fn process_initiated(&self, family: &str) {
self.process_initiated.increment(family);
}
pub fn process_completed(&self, family: &str, outcome: ProcessOutcome) {
let label = format!("{family},{}", outcome.label());
self.process_completed.increment(&label);
}
pub fn validation_failed(&self, message_type: &str, release: &str) {
let label = format!("{message_type},{release}");
self.validation_failed.increment(&label);
}
pub fn outbox_delivery_attempted(&self, result: &str) {
self.outbox_delivery_attempts.increment(result);
}
pub fn deadline_fired(&self, family: &str) {
self.deadline_fired.increment(family);
}
pub fn dead_letter_recorded(&self, reason: &str) {
self.dead_letter_recorded.increment(reason);
}
#[must_use]
pub fn snapshot(&self) -> MetricsSnapshot {
MetricsSnapshot {
process_initiated: self.process_initiated.snapshot(),
process_completed: self.process_completed.snapshot(),
validation_failed: self.validation_failed.snapshot(),
outbox_delivery_attempts: self.outbox_delivery_attempts.snapshot(),
deadline_fired: self.deadline_fired.snapshot(),
dead_letter_recorded: self.dead_letter_recorded.snapshot(),
}
}
}
#[derive(Debug, Clone)]
pub struct MetricsSnapshot {
pub process_initiated: Vec<(Box<str>, u64)>,
pub process_completed: Vec<(Box<str>, u64)>,
pub validation_failed: Vec<(Box<str>, u64)>,
pub outbox_delivery_attempts: Vec<(Box<str>, u64)>,
pub deadline_fired: Vec<(Box<str>, u64)>,
pub dead_letter_recorded: Vec<(Box<str>, u64)>,
}
impl MetricsSnapshot {
#[must_use]
pub fn render_prometheus(&self) -> String {
let mut out = String::with_capacity(4096);
Self::write_counter_vec(
&mut out,
"makod_process_initiated_total",
"Total number of MaKo process instances initiated, by process family.",
&["family"],
&self.process_initiated,
);
Self::write_counter_vec(
&mut out,
"makod_process_completed_total",
"Total number of MaKo process instances that reached a terminal state.",
&["family", "result"],
&self.process_completed,
);
Self::write_counter_vec(
&mut out,
"makod_validation_failed_total",
"Total number of inbound EDIFACT messages that failed AHB validation.",
&["message_type", "release"],
&self.validation_failed,
);
Self::write_counter_vec(
&mut out,
"makod_outbox_delivery_attempts_total",
"Total number of AS4 outbox delivery attempts.",
&["result"],
&self.outbox_delivery_attempts,
);
Self::write_counter_vec(
&mut out,
"makod_deadline_fired_total",
"Total number of regulatory deadlines fired (TimeoutExpired dispatched).",
&["family"],
&self.deadline_fired,
);
Self::write_counter_vec(
&mut out,
"makod_dead_letter_recorded_total",
"Total number of messages sent to the durable dead-letter sink.",
&["reason"],
&self.dead_letter_recorded,
);
out
}
fn write_counter_vec(
out: &mut String,
name: &str,
help: &str,
label_names: &[&str],
pairs: &[(Box<str>, u64)],
) {
if pairs.is_empty() {
return;
}
out.push_str("# HELP ");
out.push_str(name);
out.push(' ');
out.push_str(help);
out.push('\n');
out.push_str("# TYPE ");
out.push_str(name);
out.push_str(" counter\n");
for (label_str, count) in pairs {
let values: Vec<&str> = label_str.splitn(label_names.len(), ',').collect();
out.push_str(name);
out.push('{');
for (i, (key, val)) in label_names.iter().zip(values.iter()).enumerate() {
if i > 0 {
out.push(',');
}
out.push_str(key);
out.push_str("=\"");
for ch in val.chars() {
match ch {
'\\' => out.push_str(r"\\"),
'"' => out.push_str(r#"\""#),
'\n' => out.push_str(r"\n"),
_ => out.push(ch),
}
}
out.push('"');
}
out.push_str("} ");
let _ = std::fmt::Write::write_fmt(out, format_args!("{count}"));
out.push('\n');
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn fresh_metrics() -> EngineMetrics {
EngineMetrics::new()
}
#[test]
fn process_initiated_increments_by_family() {
let m = fresh_metrics();
m.process_initiated("gpke");
m.process_initiated("gpke");
m.process_initiated("wim");
let snap = m.snapshot();
assert_eq!(snap.process_initiated.len(), 2);
let gpke = snap
.process_initiated
.iter()
.find(|(k, _)| k.as_ref() == "gpke");
assert_eq!(gpke.map(|(_, v)| *v), Some(2));
let wim = snap
.process_initiated
.iter()
.find(|(k, _)| k.as_ref() == "wim");
assert_eq!(wim.map(|(_, v)| *v), Some(1));
}
#[test]
fn process_completed_uses_composite_label() {
let m = fresh_metrics();
m.process_completed("gpke", ProcessOutcome::Accepted);
m.process_completed("gpke", ProcessOutcome::Rejected);
m.process_completed("gpke", ProcessOutcome::Accepted);
m.process_completed("wim", ProcessOutcome::Timeout);
let snap = m.snapshot();
let accepted = snap
.process_completed
.iter()
.find(|(k, _)| k.as_ref() == "gpke,accepted");
assert_eq!(accepted.map(|(_, v)| *v), Some(2));
let timeout = snap
.process_completed
.iter()
.find(|(k, _)| k.as_ref() == "wim,timeout");
assert_eq!(timeout.map(|(_, v)| *v), Some(1));
}
#[test]
fn snapshot_returns_zero_for_unincremented_metric() {
let m = fresh_metrics();
let snap = m.snapshot();
assert!(snap.process_initiated.is_empty());
assert!(snap.process_completed.is_empty());
}
#[test]
fn render_prometheus_omits_empty_metric_families() {
let m = fresh_metrics();
m.process_initiated("gpke");
let output = m.snapshot().render_prometheus();
assert!(
output.contains("makod_process_initiated_total"),
"initiated must appear"
);
assert!(
!output.contains("makod_process_completed_total"),
"completed must be absent"
);
assert!(
!output.contains("makod_validation_failed_total"),
"validation must be absent"
);
}
#[test]
fn render_prometheus_formats_labels_correctly() {
let m = fresh_metrics();
m.process_initiated("gpke");
m.process_completed("gpke", ProcessOutcome::Accepted);
m.validation_failed("utilmd", "S2.1");
let output = m.snapshot().render_prometheus();
assert!(
output.contains(r#"makod_process_initiated_total{family="gpke"} 1"#),
"single-label format must match; output:\n{output}"
);
assert!(
output.contains(r#"makod_process_completed_total{family="gpke",result="accepted"} 1"#),
"two-label format must match; output:\n{output}"
);
assert!(
output.contains(
r#"makod_validation_failed_total{message_type="utilmd",release="S2.1"} 1"#
),
"message_type+release format must match; output:\n{output}"
);
}
#[test]
fn render_prometheus_escapes_special_chars_in_label_values() {
let m = fresh_metrics();
m.outbox_delivery_attempted("ok");
m.dead_letter_recorded("unknown_pid:13002");
let output = m.snapshot().render_prometheus();
assert!(
output.contains(r#"result="ok""#),
"plain label must survive; output:\n{output}"
);
assert!(
output.contains(r#"reason="unknown_pid:13002""#),
"reason label must survive; output:\n{output}"
);
}
#[test]
fn counters_are_monotonically_increasing() {
let m = fresh_metrics();
for _ in 0..100 {
m.deadline_fired("gpke");
}
let snap = m.snapshot();
let gpke = snap
.deadline_fired
.iter()
.find(|(k, _)| k.as_ref() == "gpke");
assert_eq!(gpke.map(|(_, v)| *v), Some(100));
}
#[test]
fn snapshot_sorted_by_label() {
let m = fresh_metrics();
m.process_initiated("wim");
m.process_initiated("mabis");
m.process_initiated("geli-gas");
m.process_initiated("gpke");
let snap = m.snapshot();
let labels: Vec<&str> = snap
.process_initiated
.iter()
.map(|(k, _)| k.as_ref())
.collect();
let mut sorted = labels.clone();
sorted.sort_unstable();
assert_eq!(labels, sorted, "snapshot must be sorted by label");
}
}