Skip to main content

hyperi_rustlib/worker/engine/
metrics.rs

// Project:   hyperi-rustlib
// File:      src/worker/engine/metrics.rs
// Purpose:   Metric registration and threshold gauge emission for BatchEngine
// Language:  Rust
//
// License:   BUSL-1.1
// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9use crate::metrics::MetricsManager;
10
11use super::config::BatchProcessingConfig;
12
13/// Register all `BatchEngine` metrics with the `MetricsManager`.
14///
15/// Called by [`BatchEngine::auto_wire`](super::BatchEngine::auto_wire). Registers
16/// descriptors for all operational metrics and immediately emits the current
17/// config thresholds as gauges (for Grafana overlay of scaling decision lines).
18pub fn register(manager: &MetricsManager, config: &BatchProcessingConfig) {
19    // Counters
20    let _ = manager.counter(
21        "batch_engine_messages_received_total",
22        "Messages received from transport",
23    );
24    let _ = manager.counter(
25        "batch_engine_messages_parsed_total",
26        "Messages successfully SIMD-parsed",
27    );
28    let _ = manager.counter(
29        "batch_engine_messages_filtered_total",
30        "Messages filtered at pre-route",
31    );
32    let _ = manager.counter("batch_engine_messages_dlq_total", "Messages routed to DLQ");
33    let _ = manager.counter("batch_engine_parse_errors_total", "Parse failures");
34
35    // Histograms
36    let _ = manager.histogram(
37        "batch_engine_parse_duration_seconds",
38        "SIMD parse time per chunk",
39    );
40    let _ = manager.histogram(
41        "batch_engine_transform_duration_seconds",
42        "App transform time per chunk",
43    );
44    let _ = manager.histogram_count("batch_engine_chunk_size", "Actual items per chunk");
45    let _ = manager.histogram(
46        "batch_engine_pre_route_duration_seconds",
47        "Pre-route extraction time per chunk",
48    );
49
50    // Gauges
51    let _ = manager.gauge(
52        "batch_engine_intern_table_size",
53        "Interned field name count",
54    );
55
56    // Self-regulation governor observability (governor feature). Registered so
57    // the metrics manifest advertises them even before the first throttle.
58    #[cfg(feature = "governor")]
59    {
60        // Names MUST match the emit sites (gate.rs / driver.rs): all carry the
61        // `self_regulation_` domain prefix so the manifest advertises the same
62        // series that actually carry data (a bare `pressure_ratio` would also
63        // collide with MemoryGuard/ScalingPressure on the registry).
64        let _ = manager.gauge(
65            "self_regulation_byte_budget",
66            "Current AIMD byte budget (inbound block size lever)",
67        );
68        // dual-emit: drop OLD in next release (MIGRATIONS) -- `_bytes` suffix
69        // matches the Prometheus/OTel base-unit convention.
70        let _ = manager.gauge(
71            "self_regulation_byte_budget_bytes",
72            "Current AIMD byte budget in bytes (inbound block size lever)",
73        );
74        let _ = manager.gauge(
75            "self_regulation_recv_block_bytes",
76            "Bytes in the most recent received block (vs the byte budget)",
77        );
78        let _ = manager.gauge(
79            "self_regulation_pressure_ratio",
80            "Combined self-regulation pressure level (0.0-1.0)",
81        );
82        let _ = manager.gauge(
83            "self_regulation_inbound_paused",
84            "1 while the inbound gate is holding under pressure, else 0 (per source)",
85        );
86        let _ = manager.counter(
87            "self_regulation_inbound_pauses_total",
88            "Inbound gate pause (rising-edge) events (per source)",
89        );
90        let _ = manager.counter(
91            "self_regulation_kafka_gate_errors_total",
92            "Kafka pause/resume actuator failures (brake degraded)",
93        );
94    }
95
96    // Config thresholds as gauges (emitted immediately).
97    emit_thresholds(config);
98}
99
100/// Emit config threshold values as gauge metrics.
101///
102/// Called at startup (via `register`) and optionally on config reload.
103/// Metric names are mechanically derived from config field names so that
104/// Grafana dashboards can overlay config changes on operational graphs.
105pub fn emit_thresholds(config: &BatchProcessingConfig) {
106    metrics::gauge!("batch_engine_max_chunk_size").set(config.max_chunk_size as f64);
107}
108
#[cfg(test)]
mod tests {
    use super::*;

    /// Registration must run to completion, even with no recorder installed.
    #[test]
    fn register_does_not_panic() {
        let mgr = MetricsManager::new_for_test("test_engine_metrics");
        let cfg = BatchProcessingConfig::default();
        register(&mgr, &cfg);
    }

    /// Threshold emission must run to completion; the metrics macros are
    /// no-ops when no recorder is installed.
    #[test]
    fn emit_thresholds_does_not_panic() {
        let cfg = BatchProcessingConfig::default();
        emit_thresholds(&cfg);
    }

    /// Registering twice against the same manager must be idempotent.
    #[test]
    fn register_returns_handles() {
        let mgr = MetricsManager::new_for_test("test_engine_metrics_handles");
        let cfg = BatchProcessingConfig::default();
        for _ in 0..2 {
            register(&mgr, &cfg);
        }
    }
}