hyperi_rustlib/worker/engine/
metrics.rs1use crate::metrics::MetricsManager;
10
11use super::config::BatchProcessingConfig;
12
13pub fn register(manager: &MetricsManager, config: &BatchProcessingConfig) {
19 let _ = manager.counter(
21 "batch_engine_messages_received_total",
22 "Messages received from transport",
23 );
24 let _ = manager.counter(
25 "batch_engine_messages_parsed_total",
26 "Messages successfully SIMD-parsed",
27 );
28 let _ = manager.counter(
29 "batch_engine_messages_filtered_total",
30 "Messages filtered at pre-route",
31 );
32 let _ = manager.counter("batch_engine_messages_dlq_total", "Messages routed to DLQ");
33 let _ = manager.counter("batch_engine_parse_errors_total", "Parse failures");
34
35 let _ = manager.histogram(
37 "batch_engine_parse_duration_seconds",
38 "SIMD parse time per chunk",
39 );
40 let _ = manager.histogram(
41 "batch_engine_transform_duration_seconds",
42 "App transform time per chunk",
43 );
44 let _ = manager.histogram_count("batch_engine_chunk_size", "Actual items per chunk");
45 let _ = manager.histogram(
46 "batch_engine_pre_route_duration_seconds",
47 "Pre-route extraction time per chunk",
48 );
49
50 let _ = manager.gauge(
52 "batch_engine_intern_table_size",
53 "Interned field name count",
54 );
55
56 #[cfg(feature = "governor")]
59 {
60 let _ = manager.gauge(
65 "self_regulation_byte_budget",
66 "Current AIMD byte budget (inbound block size lever)",
67 );
68 let _ = manager.gauge(
71 "self_regulation_byte_budget_bytes",
72 "Current AIMD byte budget in bytes (inbound block size lever)",
73 );
74 let _ = manager.gauge(
75 "self_regulation_recv_block_bytes",
76 "Bytes in the most recent received block (vs the byte budget)",
77 );
78 let _ = manager.gauge(
79 "self_regulation_pressure_ratio",
80 "Combined self-regulation pressure level (0.0-1.0)",
81 );
82 let _ = manager.gauge(
83 "self_regulation_inbound_paused",
84 "1 while the inbound gate is holding under pressure, else 0 (per source)",
85 );
86 let _ = manager.counter(
87 "self_regulation_inbound_pauses_total",
88 "Inbound gate pause (rising-edge) events (per source)",
89 );
90 let _ = manager.counter(
91 "self_regulation_kafka_gate_errors_total",
92 "Kafka pause/resume actuator failures (brake degraded)",
93 );
94 }
95
96 emit_thresholds(config);
98}
99
100pub fn emit_thresholds(config: &BatchProcessingConfig) {
106 metrics::gauge!("batch_engine_max_chunk_size").set(config.max_chunk_size as f64);
107}
108
#[cfg(test)]
mod tests {
    use super::*;

    /// Registering against a fresh test manager must complete without panicking.
    #[test]
    fn register_does_not_panic() {
        let cfg = BatchProcessingConfig::default();
        let mgr = MetricsManager::new_for_test("test_engine_metrics");
        register(&mgr, &cfg);
    }

    /// Emitting thresholds from a default config must complete without panicking.
    #[test]
    fn emit_thresholds_does_not_panic() {
        let cfg = BatchProcessingConfig::default();
        emit_thresholds(&cfg);
    }

    /// Calls `register` twice on the same manager; the test passes as long as
    /// neither call panics.
    #[test]
    fn register_returns_handles() {
        let cfg = BatchProcessingConfig::default();
        let mgr = MetricsManager::new_for_test("test_engine_metrics_handles");
        for _ in 0..2 {
            register(&mgr, &cfg);
        }
    }
}