hyperi_rustlib/worker/engine/
metrics.rs1use crate::metrics::MetricsManager;
10
11use super::config::BatchProcessingConfig;
12
13pub fn register(manager: &MetricsManager, config: &BatchProcessingConfig) {
19 let _ = manager.counter(
21 "batch_engine_messages_received_total",
22 "Messages received from transport",
23 );
24 let _ = manager.counter(
25 "batch_engine_messages_parsed_total",
26 "Messages successfully SIMD-parsed",
27 );
28 let _ = manager.counter(
29 "batch_engine_messages_filtered_total",
30 "Messages filtered at pre-route",
31 );
32 let _ = manager.counter("batch_engine_messages_dlq_total", "Messages routed to DLQ");
33 let _ = manager.counter("batch_engine_parse_errors_total", "Parse failures");
34
35 let _ = manager.histogram(
37 "batch_engine_parse_duration_seconds",
38 "SIMD parse time per chunk",
39 );
40 let _ = manager.histogram(
41 "batch_engine_transform_duration_seconds",
42 "App transform time per chunk",
43 );
44 let _ = manager.histogram("batch_engine_chunk_size", "Actual items per chunk");
45 let _ = manager.histogram(
46 "batch_engine_pre_route_duration_seconds",
47 "Pre-route extraction time per chunk",
48 );
49
50 let _ = manager.gauge(
52 "batch_engine_intern_table_size",
53 "Interned field name count",
54 );
55
56 #[cfg(feature = "governor")]
59 {
60 let _ = manager.gauge(
65 "self_regulation_byte_budget",
66 "Current AIMD byte budget (inbound block size lever)",
67 );
68 let _ = manager.gauge(
69 "self_regulation_recv_block_bytes",
70 "Bytes in the most recent received block (vs the byte budget)",
71 );
72 let _ = manager.gauge(
73 "self_regulation_pressure_ratio",
74 "Combined self-regulation pressure level (0.0-1.0)",
75 );
76 let _ = manager.gauge(
77 "self_regulation_inbound_paused",
78 "1 while the inbound gate is holding under pressure, else 0 (per source)",
79 );
80 let _ = manager.counter(
81 "self_regulation_inbound_pauses_total",
82 "Inbound gate pause (rising-edge) events (per source)",
83 );
84 let _ = manager.counter(
85 "self_regulation_kafka_gate_errors_total",
86 "Kafka pause/resume actuator failures (brake degraded)",
87 );
88 }
89
90 emit_thresholds(config);
92}
93
94pub fn emit_thresholds(config: &BatchProcessingConfig) {
100 metrics::gauge!("batch_engine_max_chunk_size").set(config.max_chunk_size as f64);
101}
102
103#[cfg(test)]
104mod tests {
105 use super::*;
106
107 #[test]
108 fn register_does_not_panic() {
109 let manager = MetricsManager::new_for_test("test_engine_metrics");
110 let config = BatchProcessingConfig::default();
111 register(&manager, &config);
113 }
114
115 #[test]
116 fn emit_thresholds_does_not_panic() {
117 let config = BatchProcessingConfig::default();
118 emit_thresholds(&config);
120 }
121
122 #[test]
123 fn register_returns_handles() {
124 let manager = MetricsManager::new_for_test("test_engine_metrics_handles");
125 let config = BatchProcessingConfig::default();
126 register(&manager, &config);
128 register(&manager, &config);
129 }
130}