Skip to main content

hyperi_rustlib/worker/engine/
metrics.rs

1// Project:   hyperi-rustlib
2// File:      src/worker/engine/metrics.rs
3// Purpose:   Metric registration and threshold gauge emission for BatchEngine
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9use crate::metrics::MetricsManager;
10
11use super::config::BatchProcessingConfig;
12
13/// Register all `BatchEngine` metrics with the `MetricsManager`.
14///
15/// Called by [`BatchEngine::auto_wire`](super::BatchEngine::auto_wire). Registers
16/// descriptors for all operational metrics and immediately emits the current
17/// config thresholds as gauges (for Grafana overlay of scaling decision lines).
18pub fn register(manager: &MetricsManager, config: &BatchProcessingConfig) {
19    // Counters
20    let _ = manager.counter(
21        "batch_engine_messages_received_total",
22        "Messages received from transport",
23    );
24    let _ = manager.counter(
25        "batch_engine_messages_parsed_total",
26        "Messages successfully SIMD-parsed",
27    );
28    let _ = manager.counter(
29        "batch_engine_messages_filtered_total",
30        "Messages filtered at pre-route",
31    );
32    let _ = manager.counter("batch_engine_messages_dlq_total", "Messages routed to DLQ");
33    let _ = manager.counter("batch_engine_parse_errors_total", "Parse failures");
34
35    // Histograms
36    let _ = manager.histogram(
37        "batch_engine_parse_duration_seconds",
38        "SIMD parse time per chunk",
39    );
40    let _ = manager.histogram(
41        "batch_engine_transform_duration_seconds",
42        "App transform time per chunk",
43    );
44    let _ = manager.histogram("batch_engine_chunk_size", "Actual items per chunk");
45    let _ = manager.histogram(
46        "batch_engine_pre_route_duration_seconds",
47        "Pre-route extraction time per chunk",
48    );
49
50    // Gauges
51    let _ = manager.gauge(
52        "batch_engine_intern_table_size",
53        "Interned field name count",
54    );
55
56    // Self-regulation governor observability (governor feature). Registered so
57    // the metrics manifest advertises them even before the first throttle.
58    #[cfg(feature = "governor")]
59    {
60        let _ = manager.gauge(
61            "self_regulation_byte_budget",
62            "Current AIMD byte budget (inbound block size lever)",
63        );
64        let _ = manager.gauge(
65            "pressure_ratio",
66            "Combined self-regulation pressure level (0.0-1.0)",
67        );
68        let _ = manager.gauge(
69            "inbound_paused",
70            "1 while the inbound gate is holding under pressure, else 0",
71        );
72        let _ = manager.counter(
73            "self_regulation_inbound_pauses_total",
74            "Inbound gate pause (rising-edge) events",
75        );
76    }
77
78    // Config thresholds as gauges (emitted immediately).
79    emit_thresholds(config);
80}
81
82/// Emit config threshold values as gauge metrics.
83///
84/// Called at startup (via `register`) and optionally on config reload.
85/// Metric names are mechanically derived from config field names so that
86/// Grafana dashboards can overlay config changes on operational graphs.
87pub fn emit_thresholds(config: &BatchProcessingConfig) {
88    metrics::gauge!("batch_engine_max_chunk_size").set(config.max_chunk_size as f64);
89}
90
91#[cfg(test)]
92mod tests {
93    use super::*;
94
95    #[test]
96    fn register_does_not_panic() {
97        let manager = MetricsManager::new_for_test("test_engine_metrics");
98        let config = BatchProcessingConfig::default();
99        // Should complete without panic even with no recorder installed.
100        register(&manager, &config);
101    }
102
103    #[test]
104    fn emit_thresholds_does_not_panic() {
105        let config = BatchProcessingConfig::default();
106        // metrics macros are no-ops when no recorder is installed.
107        emit_thresholds(&config);
108    }
109
110    #[test]
111    fn register_returns_handles() {
112        let manager = MetricsManager::new_for_test("test_engine_metrics_handles");
113        let config = BatchProcessingConfig::default();
114        // Calling twice should be idempotent.
115        register(&manager, &config);
116        register(&manager, &config);
117    }
118}