Skip to main content

mako_engine/
metrics.rs

//! [`EngineMetrics`] — process-level event counters for Prometheus export.
//!
//! Provides a **process-global** set of [`std::sync::atomic::AtomicU64`]
//! counters that the engine and domain handlers increment at runtime. The
//! [`metrics_api`] handler reads them via [`EngineMetrics::global()`] without
//! any I/O and renders them in Prometheus text format.
//!
//! ## Design rationale
//!
//! The mako-engine is a single-process daemon (`makod`). A process-global
//! static is the simplest, lowest-overhead counter mechanism that:
//!
//! - requires **no allocation** on the hot path once a label has been
//!   registered (single-label counters such as `family`; composite labels such
//!   as `family,result` still build one short `String` per increment),
//! - is **async-safe** (atomics need no async context),
//! - imposes **no external dependency** (no `prometheus` crate in the engine),
//! - is **observable** from `metrics_api` via a simple method call.
//!
//! The trade-off: counters reset on process restart (they are not persisted).
//! For a single-process daemon this is acceptable — Prometheus's `rate()`
//! function handles counter resets automatically.
//!
//! ## Usage
//!
//! ### Incrementing a counter
//!
//! ```rust
//! use mako_engine::metrics::{EngineMetrics, ProcessOutcome};
//!
//! // In a workflow handle() or apply() implementation:
//! EngineMetrics::global().process_initiated("gpke");
//! EngineMetrics::global().process_completed("gpke", ProcessOutcome::Accepted);
//! EngineMetrics::global().validation_failed("utilmd", "S2.1");
//! ```
//!
//! ### Reading counters (metrics endpoint)
//!
//! ```rust,ignore
//! let metrics = mako_engine::metrics::EngineMetrics::global();
//! let snapshot = metrics.snapshot();
//! // Render snapshot to Prometheus text format.
//! ```
//!
//! [`metrics_api`]: https://docs.rs/makod
44
45use std::{
46    collections::HashMap,
47    sync::{
48        Arc, OnceLock,
49        atomic::{AtomicU64, Ordering},
50    },
51};
52
53// ── ProcessOutcome ────────────────────────────────────────────────────────────
54
/// Terminal outcome of a MaKo process instance.
///
/// Supplies the `result` label of [`EngineMetrics::process_completed`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ProcessOutcome {
    /// The counterparty accepted the request (Bestätigung / positive APERAK).
    Accepted,
    /// The counterparty rejected the request (Ablehnung / negative APERAK).
    Rejected,
    /// The process timed out before a response arrived (24h / 5 WD / 10 WD).
    Timeout,
    /// The process was cancelled by the originating ERP before completion.
    Cancelled,
}

impl ProcessOutcome {
    /// Every outcome, listed in a stable order so exporters can enumerate
    /// the complete `result` label space.
    pub const ALL: &'static [Self] = &[
        ProcessOutcome::Accepted,
        ProcessOutcome::Rejected,
        ProcessOutcome::Timeout,
        ProcessOutcome::Cancelled,
    ];

    /// The lowercase string emitted as the Prometheus `result` label value.
    #[must_use]
    pub fn label(self) -> &'static str {
        let text = match self {
            ProcessOutcome::Accepted => "accepted",
            ProcessOutcome::Rejected => "rejected",
            ProcessOutcome::Timeout => "timeout",
            ProcessOutcome::Cancelled => "cancelled",
        };
        text
    }
}
90
91// ── MetricVec ─────────────────────────────────────────────────────────────────
92
/// A map of label strings → `AtomicU64` counters.
///
/// `MetricVec` is append-only: a label combination is registered on its first
/// increment and is never removed afterwards, so every counter it holds only
/// ever grows.
///
/// Lock poisoning is tolerated rather than propagated. The counters are plain
/// atomics and the map is only ever grown by inserting a fresh counter, so the
/// data behind a poisoned lock is still usable. Recovering the guard keeps a
/// single panic elsewhere from turning every later metrics call into a panic.
#[derive(Default)]
struct MetricVec {
    inner: std::sync::RwLock<HashMap<Box<str>, Arc<AtomicU64>>>,
}

impl MetricVec {
    /// Add 1 to the counter for `label`, registering it on first use.
    fn increment(&self, label: &str) {
        // Fast path: label already registered — a shared lock is enough.
        {
            let guard = self
                .inner
                .read()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            if let Some(counter) = guard.get(label) {
                counter.fetch_add(1, Ordering::Relaxed);
                return;
            }
        }
        // Slow path: first increment for this label. `entry` re-checks under
        // the exclusive lock, so a concurrent registration is not duplicated.
        let mut guard = self
            .inner
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        guard
            .entry(label.into())
            .or_insert_with(|| Arc::new(AtomicU64::new(0)))
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Snapshot all label → value pairs, sorted by label for deterministic output.
    fn snapshot(&self) -> Vec<(Box<str>, u64)> {
        let guard = self
            .inner
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let mut pairs: Vec<(Box<str>, u64)> = guard
            .iter()
            .map(|(k, v)| (k.clone(), v.load(Ordering::Relaxed)))
            .collect();
        // Keys are unique, so an unstable sort yields a deterministic order.
        pairs.sort_unstable_by(|(a, _), (b, _)| a.cmp(b));
        pairs
    }
}
131
132// ── EngineMetrics ─────────────────────────────────────────────────────────────
133
134/// Process-global engine metrics counters.
135///
136/// Access via [`EngineMetrics::global()`]. The global instance is initialised
137/// once on first access using [`OnceLock`] and lives for the process lifetime.
138///
139/// ## Counter naming (maps 1:1 to Prometheus metric names)
140///
141/// | Method | Prometheus metric | Labels |
142/// |---|---|---|
143/// | [`process_initiated`] | `makod_process_initiated_total` | `family` |
144/// | [`process_completed`] | `makod_process_completed_total` | `family`, `result` |
145/// | [`validation_failed`] | `makod_validation_failed_total` | `message_type`, `release` |
146/// | [`outbox_delivery_attempted`] | `makod_outbox_delivery_attempts_total` | `result` |
147/// | [`deadline_fired`] | `makod_deadline_fired_total` | `family` |
148/// | [`dead_letter_recorded`] | `makod_dead_letter_recorded_total` | `reason` |
149///
150/// For `makod_dead_letter_recorded_total`, the `reason` label is:
151/// - `unknown_pid:<N>` when `DeadLetterReason::UnknownPid(N)` — one label per
152///   distinct PID, enabling per-PID alerting
153/// - a short category string (`unknown_conversation`, `version_mismatch`, etc.)
154///   for all other reason variants
155///
156/// [`process_initiated`]: EngineMetrics::process_initiated
157/// [`process_completed`]: EngineMetrics::process_completed
158/// [`validation_failed`]: EngineMetrics::validation_failed
159/// [`outbox_delivery_attempted`]: EngineMetrics::outbox_delivery_attempted
160/// [`deadline_fired`]: EngineMetrics::deadline_fired
161/// [`dead_letter_recorded`]: EngineMetrics::dead_letter_recorded
162pub struct EngineMetrics {
163    /// `makod_process_initiated_total{family}` — incremented when a new
164    /// process is spawned via `Process::execute(InitiateXxx)`.
165    process_initiated: MetricVec,
166
167    /// `makod_process_completed_total{family,result}` — incremented when a
168    /// process reaches a terminal state.
169    process_completed: MetricVec,
170
171    /// `makod_validation_failed_total{message_type,release}` — incremented
172    /// when an inbound EDIFACT message fails AHB validation.
173    validation_failed: MetricVec,
174
175    /// `makod_outbox_delivery_attempts_total{result}` — incremented by the
176    /// AS4 sender on every delivery attempt.
177    outbox_delivery_attempts: MetricVec,
178
179    /// `makod_deadline_fired_total{family}` — incremented when a deadline
180    /// scheduler fires a `TimeoutExpired` command.
181    deadline_fired: MetricVec,
182
183    /// `makod_dead_letter_recorded_total{reason}` — incremented when a message
184    /// is sent to the dead-letter sink.
185    dead_letter_recorded: MetricVec,
186}
187
188impl EngineMetrics {
189    fn new() -> Self {
190        Self {
191            process_initiated: MetricVec::default(),
192            process_completed: MetricVec::default(),
193            validation_failed: MetricVec::default(),
194            outbox_delivery_attempts: MetricVec::default(),
195            deadline_fired: MetricVec::default(),
196            dead_letter_recorded: MetricVec::default(),
197        }
198    }
199
200    /// Return the process-global [`EngineMetrics`] instance.
201    ///
202    /// The instance is initialised lazily on first call. Subsequent calls
203    /// return the same instance with zero allocation.
204    #[must_use]
205    pub fn global() -> &'static Self {
206        static GLOBAL: OnceLock<EngineMetrics> = OnceLock::new();
207        GLOBAL.get_or_init(Self::new)
208    }
209
210    // ── Increment methods ─────────────────────────────────────────────────────
211
212    /// Increment `makod_process_initiated_total{family=<family>}`.
213    ///
214    /// Call once when a domain workflow receives its first initiating command
215    /// (e.g. `LfAnmeldungCommand::InitiateAnmeldung`).
216    ///
217    /// `family` is the [`EngineModule::name`] value (`"gpke"`, `"wim"`, etc.).
218    ///
219    /// [`EngineModule::name`]: crate::builder::EngineModule::name
220    pub fn process_initiated(&self, family: &str) {
221        self.process_initiated.increment(family);
222    }
223
224    /// Increment `makod_process_completed_total{family=<family>,result=<result>}`.
225    ///
226    /// Call once when a workflow transitions to a **terminal state**
227    /// (`Active`, `Rejected`, timeout, or cancellation).
228    pub fn process_completed(&self, family: &str, outcome: ProcessOutcome) {
229        let label = format!("{family},{}", outcome.label());
230        self.process_completed.increment(&label);
231    }
232
233    /// Increment `makod_validation_failed_total{message_type=<type>,release=<rel>}`.
234    ///
235    /// Call when an inbound message fails `validate()` or `validate_against()`.
236    pub fn validation_failed(&self, message_type: &str, release: &str) {
237        let label = format!("{message_type},{release}");
238        self.validation_failed.increment(&label);
239    }
240
241    /// Increment `makod_outbox_delivery_attempts_total{result=<result>}`.
242    ///
243    /// Call in the AS4 sender after every delivery attempt.
244    /// `result` should be one of `"ok"`, `"transport_error"`, `"partner_unknown"`.
245    pub fn outbox_delivery_attempted(&self, result: &str) {
246        self.outbox_delivery_attempts.increment(result);
247    }
248
249    /// Increment `makod_deadline_fired_total{family=<family>}`.
250    ///
251    /// Call in the deadline scheduler when it dispatches a `TimeoutExpired`.
252    pub fn deadline_fired(&self, family: &str) {
253        self.deadline_fired.increment(family);
254    }
255
256    /// Increment `makod_dead_letter_recorded_total{reason=<reason>}`.
257    ///
258    /// Call in the dead-letter sink when `reject()` is invoked.
259    /// `reason` should match [`DeadLetterReason`]'s label string.
260    ///
261    /// [`DeadLetterReason`]: crate::dead_letter::DeadLetterReason
262    pub fn dead_letter_recorded(&self, reason: &str) {
263        self.dead_letter_recorded.increment(reason);
264    }
265
266    // ── Snapshot ──────────────────────────────────────────────────────────────
267
268    /// Return a snapshot of all counters as a [`MetricsSnapshot`].
269    ///
270    /// This is a **read-only** operation that does not reset any counters.
271    /// Counters are monotonically increasing; Prometheus's `rate()` handles
272    /// counter resets on process restart automatically.
273    #[must_use]
274    pub fn snapshot(&self) -> MetricsSnapshot {
275        MetricsSnapshot {
276            process_initiated: self.process_initiated.snapshot(),
277            process_completed: self.process_completed.snapshot(),
278            validation_failed: self.validation_failed.snapshot(),
279            outbox_delivery_attempts: self.outbox_delivery_attempts.snapshot(),
280            deadline_fired: self.deadline_fired.snapshot(),
281            dead_letter_recorded: self.dead_letter_recorded.snapshot(),
282        }
283    }
284}
285
286// ── MetricsSnapshot ───────────────────────────────────────────────────────────
287
/// A point-in-time snapshot of all [`EngineMetrics`] counters.
///
/// Obtained via [`EngineMetrics::snapshot()`]. Every field is a `Vec` of
/// `(label, count)` pairs sorted by label for deterministic Prometheus output.
///
/// For multi-label metrics the first tuple element joins the label values with
/// `","` (e.g. `"gpke,accepted"` for `{family="gpke",result="accepted"}`).
/// [`render_prometheus`] splits them back into individual labels.
///
/// [`render_prometheus`]: MetricsSnapshot::render_prometheus
#[derive(Debug, Clone)]
pub struct MetricsSnapshot {
    /// `(family, count)` pairs for `makod_process_initiated_total`.
    pub process_initiated: Vec<(Box<str>, u64)>,
    /// `("family,result", count)` pairs for `makod_process_completed_total`.
    pub process_completed: Vec<(Box<str>, u64)>,
    /// `("message_type,release", count)` pairs for `makod_validation_failed_total`.
    pub validation_failed: Vec<(Box<str>, u64)>,
    /// `(result, count)` pairs for `makod_outbox_delivery_attempts_total`.
    pub outbox_delivery_attempts: Vec<(Box<str>, u64)>,
    /// `(family, count)` pairs for `makod_deadline_fired_total`.
    pub deadline_fired: Vec<(Box<str>, u64)>,
    /// `(reason, count)` pairs for `makod_dead_letter_recorded_total`.
    pub dead_letter_recorded: Vec<(Box<str>, u64)>,
}

impl MetricsSnapshot {
    /// Render this snapshot to Prometheus text exposition format (v0.0.4).
    ///
    /// The output follows the format:
    /// ```text
    /// # HELP <metric_name> <description>
    /// # TYPE <metric_name> counter
    /// <metric_name>{<labels>} <value>
    /// ```
    ///
    /// Metric families without any samples are omitted entirely. Multi-label
    /// metrics use a `","` separator in the internal label string, which is
    /// split into separate `key="value"` pairs in the output.
    #[must_use]
    pub fn render_prometheus(&self) -> String {
        let mut out = String::with_capacity(4096);

        Self::write_counter_vec(
            &mut out,
            "makod_process_initiated_total",
            "Total number of MaKo process instances initiated, by process family.",
            &["family"],
            &self.process_initiated,
        );
        Self::write_counter_vec(
            &mut out,
            "makod_process_completed_total",
            "Total number of MaKo process instances that reached a terminal state.",
            &["family", "result"],
            &self.process_completed,
        );
        Self::write_counter_vec(
            &mut out,
            "makod_validation_failed_total",
            "Total number of inbound EDIFACT messages that failed AHB validation.",
            &["message_type", "release"],
            &self.validation_failed,
        );
        Self::write_counter_vec(
            &mut out,
            "makod_outbox_delivery_attempts_total",
            "Total number of AS4 outbox delivery attempts.",
            &["result"],
            &self.outbox_delivery_attempts,
        );
        Self::write_counter_vec(
            &mut out,
            "makod_deadline_fired_total",
            "Total number of regulatory deadlines fired (TimeoutExpired dispatched).",
            &["family"],
            &self.deadline_fired,
        );
        Self::write_counter_vec(
            &mut out,
            "makod_dead_letter_recorded_total",
            "Total number of messages sent to the durable dead-letter sink.",
            &["reason"],
            &self.dead_letter_recorded,
        );

        out
    }

    /// Write a `counter` metric family to `out`.
    ///
    /// `label_names` lists the label keys in order. Each entry in `pairs` has
    /// a label value that is either a bare string (single-label metrics) or a
    /// `","`-joined string (multi-label metrics) whose parts are matched to
    /// `label_names` in order. Nothing is written when `pairs` is empty.
    fn write_counter_vec(
        out: &mut String,
        name: &str,
        help: &str,
        label_names: &[&str],
        pairs: &[(Box<str>, u64)],
    ) {
        if pairs.is_empty() {
            return;
        }
        out.push_str("# HELP ");
        out.push_str(name);
        out.push(' ');
        out.push_str(help);
        out.push('\n');
        out.push_str("# TYPE ");
        out.push_str(name);
        out.push_str(" counter\n");

        for (label_str, count) in pairs {
            // Split from the right. Trailing components (e.g. `result`, taken
            // from `ProcessOutcome::label`) come from fixed comma-free
            // vocabularies. A stray comma in a free-form leading value
            // therefore stays inside that value instead of shifting every
            // later label.
            let mut values: Vec<&str> = label_str.rsplitn(label_names.len(), ',').collect();
            values.reverse();

            out.push_str(name);
            out.push('{');
            for (i, (key, val)) in label_names.iter().zip(&values).enumerate() {
                if i > 0 {
                    out.push(',');
                }
                out.push_str(key);
                out.push_str("=\"");
                Self::push_escaped_label_value(out, val);
                out.push('"');
            }
            out.push_str("} ");
            // Writing into a `String` cannot fail, so the `fmt::Result` is ignored.
            let _ = std::fmt::Write::write_fmt(&mut *out, format_args!("{count}"));
            out.push('\n');
        }
    }

    /// Append `value` to `out`, escaping backslash, double-quote and line feed
    /// as the Prometheus text format requires for label values.
    fn push_escaped_label_value(out: &mut String, value: &str) {
        for ch in value.chars() {
            match ch {
                '\\' => out.push_str(r"\\"),
                '"' => out.push_str(r#"\""#),
                '\n' => out.push_str(r"\n"),
                _ => out.push(ch),
            }
        }
    }
}
428
429// ── Tests ─────────────────────────────────────────────────────────────────────
430
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an isolated instance so tests never share the process-global counters.
    fn fresh_metrics() -> EngineMetrics {
        EngineMetrics::new()
    }

    #[test]
    fn process_initiated_increments_by_family() {
        let m = fresh_metrics();
        m.process_initiated("gpke");
        m.process_initiated("gpke");
        m.process_initiated("wim");

        let snap = m.snapshot();
        assert_eq!(snap.process_initiated.len(), 2);

        let gpke = snap
            .process_initiated
            .iter()
            .find(|(k, _)| k.as_ref() == "gpke");
        assert_eq!(gpke.map(|(_, v)| *v), Some(2));

        let wim = snap
            .process_initiated
            .iter()
            .find(|(k, _)| k.as_ref() == "wim");
        assert_eq!(wim.map(|(_, v)| *v), Some(1));
    }

    #[test]
    fn process_completed_uses_composite_label() {
        let m = fresh_metrics();
        m.process_completed("gpke", ProcessOutcome::Accepted);
        m.process_completed("gpke", ProcessOutcome::Rejected);
        m.process_completed("gpke", ProcessOutcome::Accepted);
        m.process_completed("wim", ProcessOutcome::Timeout);

        let snap = m.snapshot();
        let accepted = snap
            .process_completed
            .iter()
            .find(|(k, _)| k.as_ref() == "gpke,accepted");
        assert_eq!(accepted.map(|(_, v)| *v), Some(2));

        let timeout = snap
            .process_completed
            .iter()
            .find(|(k, _)| k.as_ref() == "wim,timeout");
        assert_eq!(timeout.map(|(_, v)| *v), Some(1));
    }

    #[test]
    fn snapshot_is_empty_before_any_increment() {
        let m = fresh_metrics();
        // No increments — no label combination has been registered yet.
        let snap = m.snapshot();
        assert!(snap.process_initiated.is_empty());
        assert!(snap.process_completed.is_empty());
    }

    #[test]
    fn render_prometheus_omits_empty_metric_families() {
        let m = fresh_metrics();
        m.process_initiated("gpke");

        let output = m.snapshot().render_prometheus();

        // Only the incremented family should appear.
        assert!(
            output.contains("makod_process_initiated_total"),
            "initiated must appear"
        );
        assert!(
            !output.contains("makod_process_completed_total"),
            "completed must be absent"
        );
        assert!(
            !output.contains("makod_validation_failed_total"),
            "validation must be absent"
        );
    }

    #[test]
    fn render_prometheus_formats_labels_correctly() {
        let m = fresh_metrics();
        m.process_initiated("gpke");
        m.process_completed("gpke", ProcessOutcome::Accepted);
        m.validation_failed("utilmd", "S2.1");

        let output = m.snapshot().render_prometheus();

        assert!(
            output.contains(r#"makod_process_initiated_total{family="gpke"} 1"#),
            "single-label format must match; output:\n{output}"
        );
        assert!(
            output.contains(r#"makod_process_completed_total{family="gpke",result="accepted"} 1"#),
            "two-label format must match; output:\n{output}"
        );
        assert!(
            output.contains(
                r#"makod_validation_failed_total{message_type="utilmd",release="S2.1"} 1"#
            ),
            "message_type+release format must match; output:\n{output}"
        );
    }

    #[test]
    fn render_prometheus_escapes_special_chars_in_label_values() {
        let m = fresh_metrics();
        m.outbox_delivery_attempted("ok");
        m.dead_letter_recorded("unknown_pid:13002");
        // Backslash, double-quote and newline must all be escaped.
        m.dead_letter_recorded("a\\b\"c\nd");

        let output = m.snapshot().render_prometheus();
        assert!(
            output.contains(r#"result="ok""#),
            "plain label must survive; output:\n{output}"
        );
        assert!(
            output.contains(r#"reason="unknown_pid:13002""#),
            "reason label must survive; output:\n{output}"
        );
        assert!(
            output.contains(r#"reason="a\\b\"c\nd""#),
            "backslash, quote and newline must be escaped; output:\n{output}"
        );
        assert!(
            !output.contains("c\nd"),
            "a raw newline must not leak into a sample line; output:\n{output}"
        );
    }

    #[test]
    fn counters_are_monotonically_increasing() {
        let m = fresh_metrics();
        for _ in 0..100 {
            m.deadline_fired("gpke");
        }
        let snap = m.snapshot();
        let gpke = snap
            .deadline_fired
            .iter()
            .find(|(k, _)| k.as_ref() == "gpke");
        assert_eq!(gpke.map(|(_, v)| *v), Some(100));
    }

    #[test]
    fn snapshot_sorted_by_label() {
        let m = fresh_metrics();
        // Insert in reverse order to verify sort.
        m.process_initiated("wim");
        m.process_initiated("mabis");
        m.process_initiated("geli-gas");
        m.process_initiated("gpke");

        let snap = m.snapshot();
        let labels: Vec<&str> = snap
            .process_initiated
            .iter()
            .map(|(k, _)| k.as_ref())
            .collect();
        let mut sorted = labels.clone();
        sorted.sort_unstable();
        assert_eq!(labels, sorted, "snapshot must be sorted by label");
    }

    #[test]
    fn process_outcome_labels_are_distinct() {
        let labels: std::collections::HashSet<&str> =
            ProcessOutcome::ALL.iter().map(|o| o.label()).collect();
        assert_eq!(labels.len(), ProcessOutcome::ALL.len());
    }
}