Skip to main content

memfaultd/metrics/statsd_server/
mod.rs

1//
2// Copyright (c) Memfault, Inc.
3// See License.txt for details
4use std::net::SocketAddr;
5use std::net::UdpSocket;
6
7use eyre::Result;
8use log::warn;
9
10use crate::metrics::KeyedMetricReading;
11
12use super::MetricReading;
13use super::MetricsMBox;
14
15pub struct StatsDServer {
16    legacy_gauge_aggregation: bool,
17    legacy_key_names: bool,
18    metrics_mailbox: MetricsMBox,
19}
20
21impl StatsDServer {
22    pub fn new(
23        legacy_gauge_aggregation: bool,
24        legacy_key_names: bool,
25        metrics_mailbox: MetricsMBox,
26    ) -> StatsDServer {
27        StatsDServer {
28            legacy_gauge_aggregation,
29            legacy_key_names,
30            metrics_mailbox,
31        }
32    }
33
34    pub fn run(&self, listening_address: SocketAddr) -> Result<()> {
35        let socket = UdpSocket::bind(listening_address)?;
36        loop {
37            // This means that packets with > 1432 bytes are NOT supported
38            // Clients must enforce a maximum message size of 1432 bytes or less
39            let mut buf = [0; 1432];
40            match socket.recv(&mut buf) {
41                Ok(amt) => {
42                    let message = String::from_utf8_lossy(&buf[..amt]);
43                    self.process_statsd_message(&message)
44                }
45                Err(e) => warn!("Statsd server socket error: {}", e),
46            }
47        }
48    }
49
50    fn process_statsd_message(&self, message: &str) {
51        // https://github.com/statsd/statsd/blob/master/docs/server.md
52        // From statsd spec:
53        // Multiple metrics can be received in a single packet if separated by the \n character.
54        let metric_readings = message
55            .trim()
56            .lines()
57            .map(|line| KeyedMetricReading::from_statsd_str(line, self.legacy_key_names))
58            // Drop strings that couldn't be parsed as a KeyedMetricReading
59            .filter_map(|res| {
60                if let Err(e) = &res {
61                    warn!("{}", e)
62                };
63
64                if self.legacy_gauge_aggregation {
65                    match res {
66                        // If legacy_gauge_aggregation is enabled, convert
67                        // Gauges to Histograms on ingestion
68                        Ok(KeyedMetricReading {
69                            name,
70                            value: MetricReading::Gauge { value, timestamp },
71                        }) => Some(KeyedMetricReading {
72                            name,
73                            value: MetricReading::Histogram { value, timestamp },
74                        }),
75                        // legacy_gauge_aggretation has no affect on non-Gauge
76                        // readings
77                        _ => res.ok(),
78                    }
79                } else {
80                    res.ok()
81                }
82            })
83            .collect();
84
85        if let Err(e) = self.metrics_mailbox.send_and_forget(metric_readings) {
86            warn!("Error adding metric sent to StatsD server: {}", e);
87        }
88    }
89}
90
91#[cfg(test)]
92mod test {
93    use crate::metrics::TakeMetrics;
94    use insta::{assert_json_snapshot, with_settings};
95    use rstest::{fixture, rstest};
96    use ssf::ServiceMock;
97
98    use super::*;
99
100    #[rstest]
101    #[case("test_counter:1|c", "test_gauge:2.0|g", "test_simple")]
102    #[case("test-counter:1|c", "test-gauge:2.0|g", "test_simple_dashes")]
103    #[case("test_counter:1|c", "test_counter:1|c", "test_counter_aggregation")]
104    #[case(
105        "test_counter:1|c\ntest_gauge:2.0|g",
106        "test_counter:1|c\ntest_gauge:10.0|g",
107        "test_counter_and_gauge_aggregation"
108    )]
109    #[case(
110        "test_histo:100|h\ntest_another_histo:20.0|h",
111        "test_one_more_histo:35|h\ntest_another_histo:1000.0|h",
112        "test_histogram_aggregation"
113    )]
114    fn test_process_statsd_message(
115        #[case] statsd_message_a: &str,
116        #[case] statsd_message_b: &str,
117        #[case] test_name: &str,
118        mut fixture: Fixture,
119    ) {
120        // Process first StatsD test message
121        fixture.server.process_statsd_message(statsd_message_a);
122
123        // Process second StatsD test message
124        fixture.server.process_statsd_message(statsd_message_b);
125        with_settings!({sort_maps => true}, {
126        assert_json_snapshot!(test_name, fixture.mock.take_metrics().unwrap());
127        });
128    }
129
130    #[rstest]
131    #[case("test-gauge:1|g", "test-gauge:2.0|g", "test_legacy_gauge_aggregation")]
132    #[case(
133        "test_counter:1|c\ntest_gauge:2.0|g",
134        "test_counter:1|c\ntest_gauge:10.0|g",
135        "test_counter_and_legacy_gauge_aggregation"
136    )]
137    fn test_process_statsd_message_with_legacy_gauge_aggregation(
138        #[case] statsd_message_a: &str,
139        #[case] statsd_message_b: &str,
140        #[case] test_name: &str,
141        mut fixture_legacy_gauge_aggregation: Fixture,
142    ) {
143        // Process first StatsD test message
144        fixture_legacy_gauge_aggregation
145            .server
146            .process_statsd_message(statsd_message_a);
147
148        // Process second StatsD test message
149        fixture_legacy_gauge_aggregation
150            .server
151            .process_statsd_message(statsd_message_b);
152        with_settings!({sort_maps => true}, {
153        assert_json_snapshot!(test_name, fixture_legacy_gauge_aggregation.mock.take_metrics().unwrap());
154        });
155    }
156
157    struct Fixture {
158        server: StatsDServer,
159        mock: ServiceMock<Vec<KeyedMetricReading>>,
160    }
161
162    #[fixture]
163    fn fixture() -> Fixture {
164        let mock = ServiceMock::new();
165        let server = StatsDServer::new(false, false, mock.mbox.clone());
166
167        Fixture { server, mock }
168    }
169
170    #[fixture]
171    fn fixture_legacy_gauge_aggregation() -> Fixture {
172        let mock = ServiceMock::new();
173        let server = StatsDServer::new(true, false, mock.mbox.clone());
174
175        Fixture { server, mock }
176    }
177}