mako_engine/metrics.rs
1//! [`EngineMetrics`] — process-level event counters for Prometheus export.
2//!
3//! Provides a **process-global** set of [`std::sync::atomic::AtomicU64`]
4//! counters that the engine and domain handlers increment at runtime. The
5//! [`metrics_api`] handler reads them via [`EngineMetrics::global()`] without
6//! any I/O and renders them in Prometheus text format.
7//!
8//! ## Design rationale
9//!
10//! The mako-engine is a single-process daemon (`makod`). A process-global
11//! static is the simplest, lowest-overhead counter mechanism that:
12//!
//! - requires **no allocation** for single-label counters once a label is
//!   registered (multi-label counters format a short composite key per call),
14//! - is **async-safe** (atomics need no async context),
15//! - imposes **no external dependency** (no `prometheus` crate in the engine),
16//! - is **observable** from `metrics_api` via a simple method call.
17//!
18//! The trade-off: counters reset on process restart (they are not persisted).
19//! For a single-process daemon this is acceptable — Prometheus's `rate()`
20//! function handles counter resets automatically.
21//!
22//! ## Usage
23//!
24//! ### Incrementing a counter
25//!
26//! ```rust
27//! use mako_engine::metrics::{EngineMetrics, ProcessOutcome};
28//!
29//! // In a workflow handle() or apply() implementation:
30//! EngineMetrics::global().process_initiated("gpke");
31//! EngineMetrics::global().process_completed("gpke", ProcessOutcome::Accepted);
32//! EngineMetrics::global().validation_failed("utilmd", "S2.1");
33//! ```
34//!
35//! ### Reading counters (metrics endpoint)
36//!
37//! ```rust,ignore
38//! let metrics = mako_engine::metrics::EngineMetrics::global();
39//! let snapshot = metrics.snapshot();
40//! // Render snapshot to Prometheus text format.
41//! ```
42//!
43//! [`metrics_api`]: https://docs.rs/makod
44
45use std::{
46 collections::HashMap,
47 sync::{
48 Arc, OnceLock,
49 atomic::{AtomicU64, Ordering},
50 },
51};
52
53// ── ProcessOutcome ────────────────────────────────────────────────────────────
54
/// How a MaKo process instance ended.
///
/// Supplies the `result` label for [`EngineMetrics::process_completed`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ProcessOutcome {
    /// Positive answer from the counterparty (Bestätigung / positive APERAK).
    Accepted,
    /// Negative answer from the counterparty (Ablehnung / negative APERAK).
    Rejected,
    /// No response arrived before the deadline (24h / 5 WD / 10 WD).
    Timeout,
    /// The originating ERP cancelled the process before it completed.
    Cancelled,
}

impl ProcessOutcome {
    /// Every variant, in the fixed order used for metric exposition.
    pub const ALL: &'static [Self] = &[
        ProcessOutcome::Accepted,
        ProcessOutcome::Rejected,
        ProcessOutcome::Timeout,
        ProcessOutcome::Cancelled,
    ];

    /// Returns the Prometheus label value that represents this outcome.
    #[must_use]
    pub fn label(self) -> &'static str {
        match self {
            ProcessOutcome::Accepted => "accepted",
            ProcessOutcome::Rejected => "rejected",
            ProcessOutcome::Timeout => "timeout",
            ProcessOutcome::Cancelled => "cancelled",
        }
    }
}
90
91// ── MetricVec ─────────────────────────────────────────────────────────────────
92
/// A set of `AtomicU64` counters keyed by label string.
///
/// `MetricVec` is append-only: a label is registered the first time it is
/// incremented and is never removed afterwards, and no method resets a
/// counter once it exists.
#[derive(Default)]
struct MetricVec {
    inner: std::sync::RwLock<HashMap<Box<str>, Arc<AtomicU64>>>,
}

impl MetricVec {
    /// Adds one to the counter for `label`, registering the label on first use.
    ///
    /// A poisoned lock is recovered instead of being treated as fatal. Every
    /// entry is inserted fully initialised and is only ever mutated through
    /// atomic operations, so the map is still usable after a panic elsewhere;
    /// counting must not turn one panic into a panic on every later call.
    fn increment(&self, label: &str) {
        // Fast path: the label is already registered, a shared lock suffices.
        {
            let guard = self
                .inner
                .read()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            if let Some(counter) = guard.get(label) {
                counter.fetch_add(1, Ordering::Relaxed);
                return;
            }
        }
        // Slow path: first increment for this label. `entry` re-checks under
        // the write lock, so two threads racing to register the same label
        // still share a single counter.
        let mut guard = self
            .inner
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        guard
            .entry(label.into())
            .or_insert_with(|| Arc::new(AtomicU64::new(0)))
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Returns every `(label, value)` pair, sorted by label so that the
    /// output order is deterministic.
    ///
    /// Like [`MetricVec::increment`], this recovers from a poisoned lock
    /// rather than panicking.
    fn snapshot(&self) -> Vec<(Box<str>, u64)> {
        let guard = self
            .inner
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let mut pairs: Vec<(Box<str>, u64)> = guard
            .iter()
            .map(|(label, counter)| (label.clone(), counter.load(Ordering::Relaxed)))
            .collect();
        pairs.sort_unstable_by(|(a, _), (b, _)| a.cmp(b));
        pairs
    }
}
131
132// ── EngineMetrics ─────────────────────────────────────────────────────────────
133
134/// Process-global engine metrics counters.
135///
136/// Access via [`EngineMetrics::global()`]. The global instance is initialised
137/// once on first access using [`OnceLock`] and lives for the process lifetime.
138///
139/// ## Counter naming (maps 1:1 to Prometheus metric names)
140///
141/// | Method | Prometheus metric | Labels |
142/// |---|---|---|
143/// | [`process_initiated`] | `makod_process_initiated_total` | `family` |
144/// | [`process_completed`] | `makod_process_completed_total` | `family`, `result` |
145/// | [`validation_failed`] | `makod_validation_failed_total` | `message_type`, `release` |
146/// | [`outbox_delivery_attempted`] | `makod_outbox_delivery_attempts_total` | `result` |
147/// | [`deadline_fired`] | `makod_deadline_fired_total` | `family` |
148/// | [`dead_letter_recorded`] | `makod_dead_letter_recorded_total` | `reason` |
149///
150/// For `makod_dead_letter_recorded_total`, the `reason` label is:
151/// - `unknown_pid:<N>` when `DeadLetterReason::UnknownPid { pid: N, .. }` — one label per
152/// distinct PID, enabling per-PID alerting
153/// - a short category string (`unknown_conversation`, `version_mismatch`, etc.)
154/// for all other reason variants
155///
156/// [`process_initiated`]: EngineMetrics::process_initiated
157/// [`process_completed`]: EngineMetrics::process_completed
158/// [`validation_failed`]: EngineMetrics::validation_failed
159/// [`outbox_delivery_attempted`]: EngineMetrics::outbox_delivery_attempted
160/// [`deadline_fired`]: EngineMetrics::deadline_fired
161/// [`dead_letter_recorded`]: EngineMetrics::dead_letter_recorded
162pub struct EngineMetrics {
163 /// `makod_process_initiated_total{family}` — incremented when a new
164 /// process is spawned via `Process::execute(InitiateXxx)`.
165 process_initiated: MetricVec,
166
167 /// `makod_process_completed_total{family,result}` — incremented when a
168 /// process reaches a terminal state.
169 process_completed: MetricVec,
170
171 /// `makod_validation_failed_total{message_type,release}` — incremented
172 /// when an inbound EDIFACT message fails AHB validation.
173 validation_failed: MetricVec,
174
175 /// `makod_outbox_delivery_attempts_total{result}` — incremented by the
176 /// AS4 sender on every delivery attempt.
177 outbox_delivery_attempts: MetricVec,
178
179 /// `makod_deadline_fired_total{family}` — incremented when a deadline
180 /// scheduler fires a `TimeoutExpired` command.
181 deadline_fired: MetricVec,
182
183 /// `makod_dead_letter_recorded_total{reason}` — incremented when a message
184 /// is sent to the dead-letter sink.
185 dead_letter_recorded: MetricVec,
186}
187
188impl EngineMetrics {
189 fn new() -> Self {
190 Self {
191 process_initiated: MetricVec::default(),
192 process_completed: MetricVec::default(),
193 validation_failed: MetricVec::default(),
194 outbox_delivery_attempts: MetricVec::default(),
195 deadline_fired: MetricVec::default(),
196 dead_letter_recorded: MetricVec::default(),
197 }
198 }
199
200 /// Return the process-global [`EngineMetrics`] instance.
201 ///
202 /// The instance is initialised lazily on first call. Subsequent calls
203 /// return the same instance with zero allocation.
204 #[must_use]
205 pub fn global() -> &'static Self {
206 static GLOBAL: OnceLock<EngineMetrics> = OnceLock::new();
207 GLOBAL.get_or_init(Self::new)
208 }
209
210 // ── Increment methods ─────────────────────────────────────────────────────
211
212 /// Increment `makod_process_initiated_total{family=<family>}`.
213 ///
214 /// Call once when a domain workflow receives its first initiating command
215 /// (e.g. `LfAnmeldungCommand::InitiateAnmeldung`).
216 ///
217 /// `family` is the [`EngineModule::name`] value (`"gpke"`, `"wim"`, etc.).
218 ///
219 /// [`EngineModule::name`]: crate::builder::EngineModule::name
220 pub fn process_initiated(&self, family: &str) {
221 self.process_initiated.increment(family);
222 }
223
224 /// Increment `makod_process_completed_total{family=<family>,result=<result>}`.
225 ///
226 /// Call once when a workflow transitions to a **terminal state**
227 /// (`Active`, `Rejected`, timeout, or cancellation).
228 pub fn process_completed(&self, family: &str, outcome: ProcessOutcome) {
229 let label = format!("{family},{}", outcome.label());
230 self.process_completed.increment(&label);
231 }
232
233 /// Increment `makod_validation_failed_total{message_type=<type>,release=<rel>}`.
234 ///
235 /// Call when an inbound message fails `validate()` or `validate_against()`.
236 pub fn validation_failed(&self, message_type: &str, release: &str) {
237 let label = format!("{message_type},{release}");
238 self.validation_failed.increment(&label);
239 }
240
241 /// Increment `makod_outbox_delivery_attempts_total{result=<result>}`.
242 ///
243 /// Call in the AS4 sender after every delivery attempt.
244 /// `result` should be one of `"ok"`, `"transport_error"`, `"partner_unknown"`.
245 pub fn outbox_delivery_attempted(&self, result: &str) {
246 self.outbox_delivery_attempts.increment(result);
247 }
248
249 /// Increment `makod_deadline_fired_total{family=<family>}`.
250 ///
251 /// Call in the deadline scheduler when it dispatches a `TimeoutExpired`.
252 pub fn deadline_fired(&self, family: &str) {
253 self.deadline_fired.increment(family);
254 }
255
256 /// Increment `makod_dead_letter_recorded_total{reason=<reason>}`.
257 ///
258 /// Call in the dead-letter sink when `reject()` is invoked.
259 /// `reason` should match [`DeadLetterReason`]'s label string.
260 ///
261 /// [`DeadLetterReason`]: crate::dead_letter::DeadLetterReason
262 pub fn dead_letter_recorded(&self, reason: &str) {
263 self.dead_letter_recorded.increment(reason);
264 }
265
266 // ── Snapshot ──────────────────────────────────────────────────────────────
267
268 /// Return a snapshot of all counters as a [`MetricsSnapshot`].
269 ///
270 /// This is a **read-only** operation that does not reset any counters.
271 /// Counters are monotonically increasing; Prometheus's `rate()` handles
272 /// counter resets on process restart automatically.
273 #[must_use]
274 pub fn snapshot(&self) -> MetricsSnapshot {
275 MetricsSnapshot {
276 process_initiated: self.process_initiated.snapshot(),
277 process_completed: self.process_completed.snapshot(),
278 validation_failed: self.validation_failed.snapshot(),
279 outbox_delivery_attempts: self.outbox_delivery_attempts.snapshot(),
280 deadline_fired: self.deadline_fired.snapshot(),
281 dead_letter_recorded: self.dead_letter_recorded.snapshot(),
282 }
283 }
284}
285
286// ── MetricsSnapshot ───────────────────────────────────────────────────────────
287
/// The values of all [`EngineMetrics`] counters at one moment.
///
/// Produced by [`EngineMetrics::snapshot()`]. Each field is a `Vec` of
/// `(label, count)` pairs, sorted by label so Prometheus output is
/// deterministic.
///
/// Metrics with more than one label store the label values in a single
/// string separated by `","` (e.g. `"gpke,accepted"` for
/// `{family="gpke",result="accepted"}`); [`render_prometheus`] splits that
/// string back into individual labels.
///
/// [`render_prometheus`]: MetricsSnapshot::render_prometheus
#[derive(Debug, Clone)]
pub struct MetricsSnapshot {
    /// `(family, count)` pairs for `makod_process_initiated_total`.
    pub process_initiated: Vec<(Box<str>, u64)>,
    /// `("family,result", count)` pairs for `makod_process_completed_total`.
    pub process_completed: Vec<(Box<str>, u64)>,
    /// `("message_type,release", count)` pairs for `makod_validation_failed_total`.
    pub validation_failed: Vec<(Box<str>, u64)>,
    /// `(result, count)` pairs for `makod_outbox_delivery_attempts_total`.
    pub outbox_delivery_attempts: Vec<(Box<str>, u64)>,
    /// `(family, count)` pairs for `makod_deadline_fired_total`.
    pub deadline_fired: Vec<(Box<str>, u64)>,
    /// `(reason, count)` pairs for `makod_dead_letter_recorded_total`.
    pub dead_letter_recorded: Vec<(Box<str>, u64)>,
}

impl MetricsSnapshot {
    /// Renders the snapshot in Prometheus text exposition format (v0.0.4).
    ///
    /// Each non-empty metric family is written as:
    /// ```text
    /// # HELP <metric_name> <description>
    /// # TYPE <metric_name> counter
    /// <metric_name>{<labels>} <value>
    /// ```
    ///
    /// Families without any recorded label are left out entirely. For
    /// multi-label metrics the internal `","`-separated label string is split
    /// into one `key="value"` pair per label name.
    #[must_use]
    pub fn render_prometheus(&self) -> String {
        // (metric name, help text, label names, recorded pairs), in output order.
        let families: [(&str, &str, &[&str], &[(Box<str>, u64)]); 6] = [
            (
                "makod_process_initiated_total",
                "Total number of MaKo process instances initiated, by process family.",
                &["family"],
                self.process_initiated.as_slice(),
            ),
            (
                "makod_process_completed_total",
                "Total number of MaKo process instances that reached a terminal state.",
                &["family", "result"],
                self.process_completed.as_slice(),
            ),
            (
                "makod_validation_failed_total",
                "Total number of inbound EDIFACT messages that failed AHB validation.",
                &["message_type", "release"],
                self.validation_failed.as_slice(),
            ),
            (
                "makod_outbox_delivery_attempts_total",
                "Total number of AS4 outbox delivery attempts.",
                &["result"],
                self.outbox_delivery_attempts.as_slice(),
            ),
            (
                "makod_deadline_fired_total",
                "Total number of regulatory deadlines fired (TimeoutExpired dispatched).",
                &["family"],
                self.deadline_fired.as_slice(),
            ),
            (
                "makod_dead_letter_recorded_total",
                "Total number of messages sent to the durable dead-letter sink.",
                &["reason"],
                self.dead_letter_recorded.as_slice(),
            ),
        ];

        let mut rendered = String::with_capacity(4096);
        for (name, help, label_names, pairs) in families {
            Self::write_counter_vec(&mut rendered, name, help, label_names, pairs);
        }
        rendered
    }

    /// Appends one `counter` metric family to `out`; does nothing when
    /// `pairs` is empty.
    ///
    /// `label_names` lists the label keys in order. The label string of each
    /// pair is split on `","` into at most `label_names.len()` values, so any
    /// further commas stay inside the last value. Keys and values are paired
    /// positionally; a key without a matching value is not written.
    fn write_counter_vec(
        out: &mut String,
        name: &str,
        help: &str,
        label_names: &[&str],
        pairs: &[(Box<str>, u64)],
    ) {
        if pairs.is_empty() {
            return;
        }

        // Writing into a `String` cannot fail, so the `fmt::Result` is ignored.
        let _ = std::fmt::Write::write_fmt(
            out,
            format_args!("# HELP {name} {help}\n# TYPE {name} counter\n"),
        );

        for (joined, count) in pairs {
            out.push_str(name);
            out.push('{');
            let parts = joined.splitn(label_names.len(), ',');
            for (idx, (key, raw)) in label_names.iter().zip(parts).enumerate() {
                if idx != 0 {
                    out.push(',');
                }
                out.push_str(key);
                out.push_str("=\"");
                Self::push_escaped_label_value(out, raw);
                out.push('"');
            }
            let _ = std::fmt::Write::write_fmt(out, format_args!("}} {count}\n"));
        }
    }

    /// Appends `raw` to `out`, escaping backslash, double-quote, and newline
    /// as the Prometheus text format requires for label values.
    fn push_escaped_label_value(out: &mut String, raw: &str) {
        for ch in raw.chars() {
            match ch {
                '\\' => out.push_str("\\\\"),
                '"' => out.push_str("\\\""),
                '\n' => out.push_str("\\n"),
                other => out.push(other),
            }
        }
    }
}
428
429// ── Tests ─────────────────────────────────────────────────────────────────────
430
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an isolated metrics instance so tests never share state through
    /// the process-global one.
    fn fresh_metrics() -> EngineMetrics {
        EngineMetrics::new()
    }

    #[test]
    fn process_initiated_increments_by_family() {
        let m = fresh_metrics();
        m.process_initiated("gpke");
        m.process_initiated("gpke");
        m.process_initiated("wim");

        let snap = m.snapshot();
        assert_eq!(snap.process_initiated.len(), 2);

        let gpke = snap
            .process_initiated
            .iter()
            .find(|(k, _)| k.as_ref() == "gpke");
        assert_eq!(gpke.map(|(_, v)| *v), Some(2));

        let wim = snap
            .process_initiated
            .iter()
            .find(|(k, _)| k.as_ref() == "wim");
        assert_eq!(wim.map(|(_, v)| *v), Some(1));
    }

    #[test]
    fn process_completed_uses_composite_label() {
        let m = fresh_metrics();
        m.process_completed("gpke", ProcessOutcome::Accepted);
        m.process_completed("gpke", ProcessOutcome::Rejected);
        m.process_completed("gpke", ProcessOutcome::Accepted);
        m.process_completed("wim", ProcessOutcome::Timeout);

        let snap = m.snapshot();
        let accepted = snap
            .process_completed
            .iter()
            .find(|(k, _)| k.as_ref() == "gpke,accepted");
        assert_eq!(accepted.map(|(_, v)| *v), Some(2));

        let timeout = snap
            .process_completed
            .iter()
            .find(|(k, _)| k.as_ref() == "wim,timeout");
        assert_eq!(timeout.map(|(_, v)| *v), Some(1));
    }

    #[test]
    fn snapshot_returns_zero_for_unincremented_metric() {
        let m = fresh_metrics();
        // Labels are only registered on first increment, so a metric that was
        // never incremented has no entries at all (rather than a zero entry).
        let snap = m.snapshot();
        assert!(snap.process_initiated.is_empty());
        assert!(snap.process_completed.is_empty());
    }

    #[test]
    fn render_prometheus_omits_empty_metric_families() {
        let m = fresh_metrics();
        m.process_initiated("gpke");

        let output = m.snapshot().render_prometheus();

        // Only the incremented family should appear.
        assert!(
            output.contains("makod_process_initiated_total"),
            "initiated must appear"
        );
        assert!(
            !output.contains("makod_process_completed_total"),
            "completed must be absent"
        );
        assert!(
            !output.contains("makod_validation_failed_total"),
            "validation must be absent"
        );
    }

    #[test]
    fn render_prometheus_formats_labels_correctly() {
        let m = fresh_metrics();
        m.process_initiated("gpke");
        m.process_completed("gpke", ProcessOutcome::Accepted);
        m.validation_failed("utilmd", "S2.1");

        let output = m.snapshot().render_prometheus();

        assert!(
            output.contains(r#"makod_process_initiated_total{family="gpke"} 1"#),
            "single-label format must match; output:\n{output}"
        );
        assert!(
            output.contains(r#"makod_process_completed_total{family="gpke",result="accepted"} 1"#),
            "two-label format must match; output:\n{output}"
        );
        assert!(
            output.contains(
                r#"makod_validation_failed_total{message_type="utilmd",release="S2.1"} 1"#
            ),
            "message_type+release format must match; output:\n{output}"
        );
    }

    #[test]
    fn render_prometheus_escapes_special_chars_in_label_values() {
        let m = fresh_metrics();
        // Plain values must pass through unchanged.
        m.outbox_delivery_attempted("ok");
        m.dead_letter_recorded("unknown_pid:13002");
        // A value containing a backslash, double-quotes, and a newline must be
        // escaped so that the sample stays on one well-formed line.
        m.dead_letter_recorded("bad\\path \"quoted\"\nnext");

        let output = m.snapshot().render_prometheus();
        assert!(
            output.contains(r#"result="ok""#),
            "plain label must survive; output:\n{output}"
        );
        assert!(
            output.contains(r#"reason="unknown_pid:13002""#),
            "reason label must survive; output:\n{output}"
        );
        assert!(
            output.contains(
                r#"makod_dead_letter_recorded_total{reason="bad\\path \"quoted\"\nnext"} 1"#
            ),
            "backslash, quote and newline must be escaped; output:\n{output}"
        );
        assert!(
            !output.contains("quoted\"\nnext"),
            "raw newline must not leak into a label value; output:\n{output}"
        );
    }

    #[test]
    fn counters_are_monotonically_increasing() {
        let m = fresh_metrics();
        for _ in 0..100 {
            m.deadline_fired("gpke");
        }
        let snap = m.snapshot();
        let gpke = snap
            .deadline_fired
            .iter()
            .find(|(k, _)| k.as_ref() == "gpke");
        assert_eq!(gpke.map(|(_, v)| *v), Some(100));
    }

    #[test]
    fn snapshot_sorted_by_label() {
        let m = fresh_metrics();
        // Insert out of order to verify that the snapshot sorts.
        m.process_initiated("wim");
        m.process_initiated("mabis");
        m.process_initiated("geli-gas");
        m.process_initiated("gpke");

        let snap = m.snapshot();
        let labels: Vec<&str> = snap
            .process_initiated
            .iter()
            .map(|(k, _)| k.as_ref())
            .collect();
        let mut sorted = labels.clone();
        sorted.sort_unstable();
        assert_eq!(labels, sorted, "snapshot must be sorted by label");
    }
}