// memfaultd/metrics/statsd_server/mod.rs
use std::net::SocketAddr;
5use std::net::UdpSocket;
6
7use eyre::Result;
8use log::warn;
9
10use crate::metrics::KeyedMetricReading;
11
12use super::MetricReading;
13use super::MetricsMBox;
14
15pub struct StatsDServer {
16 legacy_gauge_aggregation: bool,
17 legacy_key_names: bool,
18 metrics_mailbox: MetricsMBox,
19}
20
21impl StatsDServer {
22 pub fn new(
23 legacy_gauge_aggregation: bool,
24 legacy_key_names: bool,
25 metrics_mailbox: MetricsMBox,
26 ) -> StatsDServer {
27 StatsDServer {
28 legacy_gauge_aggregation,
29 legacy_key_names,
30 metrics_mailbox,
31 }
32 }
33
34 pub fn run(&self, listening_address: SocketAddr) -> Result<()> {
35 let socket = UdpSocket::bind(listening_address)?;
36 loop {
37 let mut buf = [0; 1432];
40 match socket.recv(&mut buf) {
41 Ok(amt) => {
42 let message = String::from_utf8_lossy(&buf[..amt]);
43 self.process_statsd_message(&message)
44 }
45 Err(e) => warn!("Statsd server socket error: {}", e),
46 }
47 }
48 }
49
50 fn process_statsd_message(&self, message: &str) {
51 let metric_readings = message
55 .trim()
56 .lines()
57 .map(|line| KeyedMetricReading::from_statsd_str(line, self.legacy_key_names))
58 .filter_map(|res| {
60 if let Err(e) = &res {
61 warn!("{}", e)
62 };
63
64 if self.legacy_gauge_aggregation {
65 match res {
66 Ok(KeyedMetricReading {
69 name,
70 value: MetricReading::Gauge { value, timestamp },
71 }) => Some(KeyedMetricReading {
72 name,
73 value: MetricReading::Histogram { value, timestamp },
74 }),
75 _ => res.ok(),
78 }
79 } else {
80 res.ok()
81 }
82 })
83 .collect();
84
85 if let Err(e) = self.metrics_mailbox.send_and_forget(metric_readings) {
86 warn!("Error adding metric sent to StatsD server: {}", e);
87 }
88 }
89}
90
#[cfg(test)]
mod test {
    use crate::metrics::TakeMetrics;
    use insta::{assert_json_snapshot, with_settings};
    use rstest::{fixture, rstest};
    use ssf::ServiceMock;

    use super::*;

    /// A server under test together with the mock service that records
    /// everything the server sends to its mailbox.
    struct Fixture {
        server: StatsDServer,
        mock: ServiceMock<Vec<KeyedMetricReading>>,
    }

    /// Shared construction for both fixtures; only the gauge aggregation mode
    /// differs between them (legacy key names are always disabled).
    fn build_fixture(legacy_gauge_aggregation: bool) -> Fixture {
        let mock = ServiceMock::new();
        let server = StatsDServer::new(legacy_gauge_aggregation, false, mock.mbox.clone());

        Fixture { server, mock }
    }

    /// Server with legacy gauge aggregation disabled.
    #[fixture]
    fn fixture() -> Fixture {
        build_fixture(false)
    }

    /// Server with legacy gauge aggregation enabled.
    #[fixture]
    fn fixture_legacy_gauge_aggregation() -> Fixture {
        build_fixture(true)
    }

    /// Feeds two StatsD messages to the server and snapshots what reached the mock.
    #[rstest]
    #[case("test_counter:1|c", "test_gauge:2.0|g", "test_simple")]
    #[case("test-counter:1|c", "test-gauge:2.0|g", "test_simple_dashes")]
    #[case("test_counter:1|c", "test_counter:1|c", "test_counter_aggregation")]
    #[case(
        "test_counter:1|c\ntest_gauge:2.0|g",
        "test_counter:1|c\ntest_gauge:10.0|g",
        "test_counter_and_gauge_aggregation"
    )]
    #[case(
        "test_histo:100|h\ntest_another_histo:20.0|h",
        "test_one_more_histo:35|h\ntest_another_histo:1000.0|h",
        "test_histogram_aggregation"
    )]
    fn test_process_statsd_message(
        #[case] statsd_message_a: &str,
        #[case] statsd_message_b: &str,
        #[case] test_name: &str,
        mut fixture: Fixture,
    ) {
        // Deliver the messages in order, exactly as two separate datagrams would arrive.
        for message in [statsd_message_a, statsd_message_b] {
            fixture.server.process_statsd_message(message);
        }

        with_settings!({sort_maps => true}, {
            assert_json_snapshot!(test_name, fixture.mock.take_metrics().unwrap());
        });
    }

    /// Same flow as above, but with gauges converted to histograms by the server.
    #[rstest]
    #[case("test-gauge:1|g", "test-gauge:2.0|g", "test_legacy_gauge_aggregation")]
    #[case(
        "test_counter:1|c\ntest_gauge:2.0|g",
        "test_counter:1|c\ntest_gauge:10.0|g",
        "test_counter_and_legacy_gauge_aggregation"
    )]
    fn test_process_statsd_message_with_legacy_gauge_aggregation(
        #[case] statsd_message_a: &str,
        #[case] statsd_message_b: &str,
        #[case] test_name: &str,
        mut fixture_legacy_gauge_aggregation: Fixture,
    ) {
        for message in [statsd_message_a, statsd_message_b] {
            fixture_legacy_gauge_aggregation
                .server
                .process_statsd_message(message);
        }

        with_settings!({sort_maps => true}, {
            assert_json_snapshot!(test_name, fixture_legacy_gauge_aggregation.mock.take_metrics().unwrap());
        });
    }
}