sozu_lib/metrics/mod.rs

1mod local_drain;
2mod network_drain;
3mod writer;
4
5use std::{
6    cell::RefCell,
7    collections::BTreeMap,
8    io::{self, Write},
9    net::SocketAddr,
10    str,
11    time::Instant,
12};
13
14use mio::net::UdpSocket;
15use sozu_command::proto::command::{
16    FilteredMetrics, MetricsConfiguration, QueryMetricsOptions, ResponseContent,
17};
18
19use crate::metrics::{local_drain::LocalDrain, network_drain::NetworkDrain};
20
21thread_local! {
22  pub static METRICS: RefCell<Aggregator> = RefCell::new(Aggregator::new(String::from("sozu")));
23}
24
25#[derive(thiserror::Error, Debug)]
26pub enum MetricError {
27    #[error("Could not parse udp address {address}: {error}")]
28    WrongUdpAddress { address: String, error: String },
29    #[error("Could not bind to udp address {address}: {error}")]
30    UdpBind { address: String, error: String },
31    #[error("No metrics found for object with id {0}")]
32    NoMetrics(String),
33    #[error("Could not create histogram for time metric {time_metric:?}: {error}")]
34    HistogramCreation {
35        time_metric: MetricValue,
36        error: String,
37    },
38    #[error("could not record time metric {time_metric:?}: {error}")]
39    TimeMetricRecordingError {
40        time_metric: MetricValue,
41        error: String,
42    },
43}
44
/// One metric sample, or the value accumulated for a metric key.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MetricValue {
    /// Absolute gauge value; replaces a stored gauge.
    Gauge(usize),
    /// Signed delta applied to a stored gauge (stored as a `Gauge` when it is the first sample).
    GaugeAdd(i64),
    /// Signed delta added to a counter.
    Count(i64),
    /// Timing sample. `record_backend_metrics!` fills `backend_connection_time` from
    /// `as_millis()`; units for other keys are chosen by callers.
    Time(usize),
}
52
53impl MetricValue {
54    fn is_time(&self) -> bool {
55        matches!(self, &MetricValue::Time(_))
56    }
57
58    fn update(&mut self, key: &'static str, m: MetricValue) -> bool {
59        match (self, m) {
60            (&mut MetricValue::Gauge(ref mut v1), MetricValue::Gauge(v2)) => {
61                let changed = *v1 != v2;
62                *v1 = v2;
63                changed
64            }
65            (&mut MetricValue::Gauge(ref mut v1), MetricValue::GaugeAdd(v2)) => {
66                debug_assert!(
67                    *v1 as i64 + v2 >= 0,
68                    "metric {key} underflow: previous value: {v1}, adding: {v2}"
69                );
70                let changed = v2 != 0;
71                let res = *v1 as i64 + v2;
72                *v1 = if res >= 0 {
73                    res as usize
74                } else {
75                    error!(
76                        "metric {} underflow: previous value: {}, adding: {}",
77                        key, v1, v2
78                    );
79                    0
80                };
81
82                changed
83            }
84            (&mut MetricValue::Count(ref mut v1), MetricValue::Count(v2)) => {
85                let changed = v2 != 0;
86                *v1 += v2;
87                changed
88            }
89            (s, m) => panic!(
90                "tried to update metric {key} of value {s:?} with an incompatible metric: {m:?}"
91            ),
92        }
93    }
94}
95
96#[derive(Debug, Clone)]
97pub struct StoredMetricValue {
98    last_sent: Instant,
99    updated: bool,
100    data: MetricValue,
101}
102
103impl StoredMetricValue {
104    pub fn new(last_sent: Instant, data: MetricValue) -> StoredMetricValue {
105        StoredMetricValue {
106            last_sent,
107            updated: true,
108            data: if let MetricValue::GaugeAdd(v) = data {
109                if v >= 0 {
110                    MetricValue::Gauge(v as usize)
111                } else {
112                    MetricValue::Gauge(0)
113                }
114            } else {
115                data
116            },
117        }
118    }
119
120    pub fn update(&mut self, key: &'static str, m: MetricValue) {
121        let updated = self.data.update(key, m);
122        if !self.updated {
123            self.updated = updated;
124        }
125    }
126}
127
128pub fn setup<O: Into<String>>(
129    metrics_host: &SocketAddr,
130    origin: O,
131    use_tagged_metrics: bool,
132    prefix: Option<String>,
133) -> Result<(), MetricError> {
134    let metrics_socket = udp_bind()?;
135
136    debug!(
137        "setting up metrics: local address = {:#?}",
138        metrics_socket.local_addr()
139    );
140
141    METRICS.with(|metrics| {
142        if let Some(p) = prefix {
143            (*metrics.borrow_mut()).set_up_prefix(p);
144        }
145        (*metrics.borrow_mut()).set_up_remote(metrics_socket, *metrics_host);
146        (*metrics.borrow_mut()).set_up_origin(origin.into());
147        (*metrics.borrow_mut()).set_up_tagged_metrics(use_tagged_metrics);
148    });
149    Ok(())
150}
151
152pub trait Subscriber {
153    fn receive_metric(
154        &mut self,
155        label: &'static str,
156        cluster_id: Option<&str>,
157        backend_id: Option<&str>,
158        metric: MetricValue,
159    );
160}
161
162pub struct Aggregator {
163    /// appended to metric keys, usually "sozu-"
164    prefix: String,
165    /// gathers metrics and sends them on a UDP socket
166    network: Option<NetworkDrain>,
167    /// gather metrics locally, queried by the CLI
168    local: LocalDrain,
169}
170
171impl Aggregator {
172    pub fn new(prefix: String) -> Aggregator {
173        Aggregator {
174            prefix: prefix.clone(),
175            network: None,
176            local: LocalDrain::new(prefix),
177        }
178    }
179
180    pub fn set_up_prefix(&mut self, prefix: String) {
181        self.prefix = prefix;
182    }
183
184    pub fn set_up_remote(&mut self, socket: UdpSocket, addr: SocketAddr) {
185        self.network = Some(NetworkDrain::new(self.prefix.clone(), socket, addr));
186    }
187
188    pub fn set_up_origin(&mut self, origin: String) {
189        if let Some(n) = self.network.as_mut() {
190            n.origin = origin;
191        }
192    }
193
194    pub fn set_up_tagged_metrics(&mut self, tagged: bool) {
195        if let Some(n) = self.network.as_mut() {
196            n.use_tagged_metrics = tagged;
197        }
198    }
199
200    pub fn socket(&self) -> Option<&UdpSocket> {
201        self.network.as_ref().map(|n| &n.remote.get_ref().socket)
202    }
203
204    pub fn socket_mut(&mut self) -> Option<&mut UdpSocket> {
205        self.network
206            .as_mut()
207            .map(|n| &mut n.remote.get_mut().socket)
208    }
209
210    pub fn count_add(&mut self, key: &'static str, count_value: i64) {
211        self.receive_metric(key, None, None, MetricValue::Count(count_value));
212    }
213
214    pub fn set_gauge(&mut self, key: &'static str, gauge_value: usize) {
215        self.receive_metric(key, None, None, MetricValue::Gauge(gauge_value));
216    }
217
218    pub fn gauge_add(&mut self, key: &'static str, gauge_value: i64) {
219        self.receive_metric(key, None, None, MetricValue::GaugeAdd(gauge_value));
220    }
221
222    pub fn writable(&mut self) {
223        if let Some(ref mut net) = self.network.as_mut() {
224            net.writable();
225        }
226    }
227
228    pub fn send_data(&mut self) {
229        if let Some(ref mut net) = self.network.as_mut() {
230            net.send_metrics();
231        }
232    }
233
234    pub fn dump_local_proxy_metrics(&mut self) -> BTreeMap<String, FilteredMetrics> {
235        self.local.dump_proxy_metrics(&Vec::new())
236    }
237
238    pub fn query(&mut self, q: &QueryMetricsOptions) -> Result<ResponseContent, MetricError> {
239        self.local.query(q)
240    }
241
242    pub fn clear_local(&mut self) {
243        self.local.clear();
244    }
245
246    pub fn configure(&mut self, config: &MetricsConfiguration) {
247        self.local.configure(config);
248    }
249}
250
251impl Subscriber for Aggregator {
252    fn receive_metric(
253        &mut self,
254        label: &'static str,
255        cluster_id: Option<&str>,
256        backend_id: Option<&str>,
257        metric: MetricValue,
258    ) {
259        if let Some(ref mut net) = self.network.as_mut() {
260            net.receive_metric(label, cluster_id, backend_id, metric.to_owned());
261        }
262        self.local
263            .receive_metric(label, cluster_id, backend_id, metric);
264    }
265}
266
267pub struct MetricSocket {
268    pub addr: SocketAddr,
269    pub socket: UdpSocket,
270}
271
272impl Write for MetricSocket {
273    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
274        self.socket.send_to(buf, self.addr)
275    }
276
277    fn flush(&mut self) -> io::Result<()> {
278        Ok(())
279    }
280}
281
282pub fn udp_bind() -> Result<UdpSocket, MetricError> {
283    let address = "0.0.0.0:0";
284
285    let udp_address =
286        address
287            .parse::<SocketAddr>()
288            .map_err(|parse_error| MetricError::WrongUdpAddress {
289                address: address.to_owned(),
290                error: parse_error.to_string(),
291            })?;
292
293    UdpSocket::bind(udp_address).map_err(|parse_error| MetricError::UdpBind {
294        address: udp_address.to_string(),
295        error: parse_error.to_string(),
296    })
297}
298
/// Adds `$value` (an `i64`) to the global counter `$key` on the thread-local aggregator.
///
/// `$value` is evaluated before the aggregator's `RefCell` is borrowed.
#[macro_export]
macro_rules! count (
  ($key:expr, $value: expr) => {{
    let count_delta = $value;
    $crate::metrics::METRICS.with(|cell| cell.borrow_mut().count_add($key, count_delta));
  }}
);
309
/// Adds 1 to a counter on the thread-local aggregator.
///
/// - `incr!(key)`: global counter, equivalent to `count!(key, 1)`.
/// - `incr!(key, cluster_id, backend_id)`: counter scoped by the given `Option<&str>` ids.
///
/// `count!` is referenced through `$crate::` so this exported macro also works in crates
/// that did not import `count!` themselves.
#[macro_export]
macro_rules! incr (
  ($key:expr) => ($crate::count!($key, 1));
  ($key:expr, $cluster_id:expr, $backend_id:expr) => {
    {
        use $crate::metrics::Subscriber;

        $crate::metrics::METRICS.with(|metrics| {
          (*metrics.borrow_mut()).receive_metric($key, $cluster_id, $backend_id, $crate::metrics::MetricValue::Count(1));
        });
    }
  }
);
324
/// Subtracts 1 from the global counter `$key`, equivalent to `count!(key, -1)`.
///
/// `count!` is referenced through `$crate::` so this exported macro also works in crates
/// that did not import `count!` themselves.
#[macro_export]
macro_rules! decr (
  ($key:expr) => ($crate::count!($key, -1))
);
329
/// Sets the global gauge `$key` to `$value` (a `usize`) on the thread-local aggregator.
///
/// `$value` is evaluated before the aggregator's `RefCell` is borrowed.
#[macro_export]
macro_rules! gauge (
  ($key:expr, $value: expr) => {{
    let gauge_value = $value;
    $crate::metrics::METRICS.with(|cell| cell.borrow_mut().set_gauge($key, gauge_value));
  }}
);
339
/// Adjusts a gauge by a signed delta (`i64`) on the thread-local aggregator.
///
/// - `gauge_add!(key, delta)`: global gauge.
/// - `gauge_add!(key, delta, cluster_id, backend_id)`: gauge scoped by the given
///   `Option<&str>` ids.
///
/// The delta is evaluated before the aggregator's `RefCell` is borrowed.
#[macro_export]
macro_rules! gauge_add (
  ($key:expr, $value: expr) => {{
    let delta = $value;
    $crate::metrics::METRICS.with(|cell| cell.borrow_mut().gauge_add($key, delta));
  }};
  ($key:expr, $value:expr, $cluster_id:expr, $backend_id:expr) => {{
    use $crate::metrics::Subscriber;
    let delta = $value;

    $crate::metrics::METRICS.with(|cell| {
      cell.borrow_mut().receive_metric(
        $key,
        $cluster_id,
        $backend_id,
        $crate::metrics::MetricValue::GaugeAdd(delta),
      );
    });
  }}
);
359
/// Records a timing sample (cast to `usize`) on the thread-local aggregator.
///
/// - `time!(key, value)`: global timing metric.
/// - `time!(key, cluster_id, value)`: timing scoped to `cluster_id` (a `&str`).
///
/// `value` is evaluated before the aggregator's `RefCell` is borrowed.
#[macro_export]
macro_rules! time (
  ($key:expr, $value: expr) => {{
    use $crate::metrics::{MetricValue, Subscriber};
    let sample = $value;
    $crate::metrics::METRICS.with(|cell| {
      let mut aggregator = cell.borrow_mut();
      aggregator.receive_metric($key, None, None, MetricValue::Time(sample as usize));
    });
  }};
  ($key:expr, $cluster_id:expr, $value: expr) => {{
    use $crate::metrics::{MetricValue, Subscriber};
    let sample = $value;
    $crate::metrics::METRICS.with(|cell| {
      let mut aggregator = cell.borrow_mut();
      let cluster: &str = $cluster_id;
      aggregator.receive_metric($key, Some(cluster), None, MetricValue::Time(sample as usize));
    });
  }}
);
382
/// Records the per-backend metrics for one request on the thread-local aggregator.
///
/// The metrics are `bytes_in`, `bytes_out`, `backend_response_time`,
/// `backend_connection_time` (only when `Some`) and a `requests` count of 1.
///
/// - `$cluster_id`, `$backend_id`: `&str`.
/// - `$response_time`: cast to `usize` and recorded as a `Time` sample.
/// - `$backend_connection_time`: an `Option` whose value provides `as_millis()`
///   (e.g. `Duration`), recorded in milliseconds.
/// - `$bin`, `$bout`: byte counts, cast to `i64`.
///
/// Arguments are evaluated once, in the original order, before the aggregator's `RefCell`
/// is borrowed (like the other metric macros). The expansion is a single block, so its
/// `use` does not leak into the caller's scope. The macro can be invoked several times
/// in one scope or used in expression position.
#[macro_export]
macro_rules! record_backend_metrics (
  ($cluster_id:expr, $backend_id:expr, $response_time: expr, $backend_connection_time: expr, $bin: expr, $bout: expr) => {{
    use $crate::metrics::{MetricValue, Subscriber};
    let cluster_id: &str = $cluster_id;
    let backend_id: &str = $backend_id;
    let bytes_in = $bin;
    let bytes_out = $bout;
    let response_time = $response_time;
    let backend_connection_time = $backend_connection_time;

    $crate::metrics::METRICS.with(|metrics| {
      let m = &mut *metrics.borrow_mut();

      m.receive_metric("bytes_in", Some(cluster_id), Some(backend_id), MetricValue::Count(bytes_in as i64));
      m.receive_metric("bytes_out", Some(cluster_id), Some(backend_id), MetricValue::Count(bytes_out as i64));
      m.receive_metric("backend_response_time", Some(cluster_id), Some(backend_id), MetricValue::Time(response_time as usize));
      if let Some(t) = backend_connection_time {
        m.receive_metric("backend_connection_time", Some(cluster_id), Some(backend_id), MetricValue::Time(t.as_millis() as usize));
      }

      m.receive_metric("requests", Some(cluster_id), Some(backend_id), MetricValue::Count(1));
    });
  }}
);