worldinterface_daemon/metrics/
recorder.rs1use std::sync::Arc;
4
5use worldinterface_core::metrics::MetricsRecorder;
6
7use super::WiMetricsRegistry;
8
9pub struct PrometheusMetricsRecorder {
13 registry: Arc<WiMetricsRegistry>,
14}
15
16impl PrometheusMetricsRecorder {
17 pub fn new(registry: Arc<WiMetricsRegistry>) -> Self {
18 Self { registry }
19 }
20}
21
22impl MetricsRecorder for PrometheusMetricsRecorder {
23 fn record_step_completed(&self, connector: &str, duration_secs: f64) {
24 let c = self.registry.collectors();
25 c.step_runs_total.with_label_values(&[connector, "success"]).inc();
26 c.step_duration_seconds.with_label_values(&[connector]).observe(duration_secs);
27 }
28
29 fn record_step_failed(&self, connector: &str) {
30 self.registry.collectors().step_runs_total.with_label_values(&[connector, "failure"]).inc();
31 }
32
33 fn record_connector_invocation(&self, connector: &str) {
34 self.registry
35 .collectors()
36 .connector_invocations_total
37 .with_label_values(&[connector])
38 .inc();
39 }
40
41 fn record_contextstore_write(&self) {
42 self.registry.collectors().contextstore_writes_total.inc();
43 }
44
45 fn record_flow_completed(&self) {
46 self.registry.collectors().flow_runs_total.with_label_values(&["completed"]).inc();
47 }
48
49 fn record_flow_failed(&self) {
50 self.registry.collectors().flow_runs_total.with_label_values(&["failed"]).inc();
51 }
52}
53
54#[cfg(test)]
55mod tests {
56 use super::*;
57
58 fn make_recorder() -> (Arc<WiMetricsRegistry>, PrometheusMetricsRecorder) {
59 let registry = Arc::new(WiMetricsRegistry::new().unwrap());
60 let recorder = PrometheusMetricsRecorder::new(Arc::clone(®istry));
61 (registry, recorder)
62 }
63
64 #[test]
65 fn record_step_completed_increments_counter_and_histogram() {
66 let (registry, recorder) = make_recorder();
67 recorder.record_step_completed("delay", 0.5);
68 let text = registry.encode_text().unwrap();
69 assert!(text.contains("wi_step_runs_total{connector=\"delay\",status=\"success\"} 1"));
70 assert!(text.contains("wi_step_duration_seconds_count{connector=\"delay\"} 1"));
71 }
72
73 #[test]
74 fn record_step_failed_increments_counter() {
75 let (registry, recorder) = make_recorder();
76 recorder.record_step_failed("http.request");
77 let text = registry.encode_text().unwrap();
78 assert!(
79 text.contains("wi_step_runs_total{connector=\"http.request\",status=\"failure\"} 1")
80 );
81 }
82
83 #[test]
84 fn record_connector_invocation_increments() {
85 let (registry, recorder) = make_recorder();
86 recorder.record_connector_invocation("fs.write");
87 let text = registry.encode_text().unwrap();
88 assert!(text.contains("wi_connector_invocations_total{connector=\"fs.write\"} 1"));
89 }
90
91 #[test]
92 fn record_contextstore_write_increments() {
93 let (registry, recorder) = make_recorder();
94 recorder.record_contextstore_write();
95 let text = registry.encode_text().unwrap();
96 assert!(text.contains("wi_contextstore_writes_total 1"));
97 }
98
99 #[test]
100 fn record_flow_completed_increments() {
101 let (registry, recorder) = make_recorder();
102 recorder.record_flow_completed();
103 let text = registry.encode_text().unwrap();
104 assert!(text.contains("wi_flow_runs_total{status=\"completed\"} 1"));
105 }
106
107 #[test]
108 fn record_flow_failed_increments() {
109 let (registry, recorder) = make_recorder();
110 recorder.record_flow_failed();
111 let text = registry.encode_text().unwrap();
112 assert!(text.contains("wi_flow_runs_total{status=\"failed\"} 1"));
113 }
114}