sozu_lib/metrics/
mod.rs

1mod local_drain;
2mod network_drain;
3mod writer;
4
5use std::{
6    cell::RefCell,
7    collections::BTreeMap,
8    io::{self, Write},
9    net::SocketAddr,
10    str,
11    time::Instant,
12};
13
14use mio::net::UdpSocket;
15
16use sozu_command::proto::command::{
17    FilteredMetrics, MetricsConfiguration, QueryMetricsOptions, ResponseContent,
18};
19
20use crate::metrics::{local_drain::LocalDrain, network_drain::NetworkDrain};
21
22thread_local! {
23  pub static METRICS: RefCell<Aggregator> = RefCell::new(Aggregator::new(String::from("sozu")));
24}
25
26#[derive(thiserror::Error, Debug)]
27pub enum MetricError {
28    #[error("Could not parse udp address {address}: {error}")]
29    WrongUdpAddress { address: String, error: String },
30    #[error("Could not bind to udp address {address}: {error}")]
31    UdpBind { address: String, error: String },
32    #[error("No metrics found for object with id {0}")]
33    NoMetrics(String),
34    #[error("Could not create histogram for time metric {time_metric:?}: {error}")]
35    HistogramCreation {
36        time_metric: MetricValue,
37        error: String,
38    },
39    #[error("could not record time metric {time_metric:?}: {error}")]
40    TimeMetricRecordingError {
41        time_metric: MetricValue,
42        error: String,
43    },
44}
45
/// A single metric update, as passed to [`Subscriber::receive_metric`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum MetricValue {
    /// Absolute gauge value; replaces the stored gauge on update.
    Gauge(usize),
    /// Signed delta to apply to a stored gauge.
    GaugeAdd(i64),
    /// Signed increment to add to a stored counter.
    Count(i64),
    /// A time measurement.
    Time(usize),
}
53
54impl MetricValue {
55    fn is_time(&self) -> bool {
56        matches!(self, &MetricValue::Time(_))
57    }
58
59    fn update(&mut self, key: &'static str, m: MetricValue) -> bool {
60        match (self, m) {
61            (&mut MetricValue::Gauge(ref mut v1), MetricValue::Gauge(v2)) => {
62                let changed = *v1 != v2;
63                *v1 = v2;
64                changed
65            }
66            (&mut MetricValue::Gauge(ref mut v1), MetricValue::GaugeAdd(v2)) => {
67                debug_assert!(
68                    *v1 as i64 + v2 >= 0,
69                    "metric {key} underflow: previous value: {v1}, adding: {v2}"
70                );
71                let changed = v2 != 0;
72                let res = *v1 as i64 + v2;
73                *v1 = if res >= 0 {
74                    res as usize
75                } else {
76                    error!(
77                        "metric {} underflow: previous value: {}, adding: {}",
78                        key, v1, v2
79                    );
80                    0
81                };
82
83                changed
84            }
85            (&mut MetricValue::Count(ref mut v1), MetricValue::Count(v2)) => {
86                let changed = v2 != 0;
87                *v1 += v2;
88                changed
89            }
90            (s, m) => panic!(
91                "tried to update metric {key} of value {s:?} with an incompatible metric: {m:?}"
92            ),
93        }
94    }
95}
96
97#[derive(Debug, Clone)]
98pub struct StoredMetricValue {
99    last_sent: Instant,
100    updated: bool,
101    data: MetricValue,
102}
103
104impl StoredMetricValue {
105    pub fn new(last_sent: Instant, data: MetricValue) -> StoredMetricValue {
106        StoredMetricValue {
107            last_sent,
108            updated: true,
109            data: if let MetricValue::GaugeAdd(v) = data {
110                if v >= 0 {
111                    MetricValue::Gauge(v as usize)
112                } else {
113                    MetricValue::Gauge(0)
114                }
115            } else {
116                data
117            },
118        }
119    }
120
121    pub fn update(&mut self, key: &'static str, m: MetricValue) {
122        let updated = self.data.update(key, m);
123        if !self.updated {
124            self.updated = updated;
125        }
126    }
127}
128
129pub fn setup<O: Into<String>>(
130    metrics_host: &SocketAddr,
131    origin: O,
132    use_tagged_metrics: bool,
133    prefix: Option<String>,
134) -> Result<(), MetricError> {
135    let metrics_socket = udp_bind()?;
136
137    debug!(
138        "setting up metrics: local address = {:#?}",
139        metrics_socket.local_addr()
140    );
141
142    METRICS.with(|metrics| {
143        if let Some(p) = prefix {
144            (*metrics.borrow_mut()).set_up_prefix(p);
145        }
146        (*metrics.borrow_mut()).set_up_remote(metrics_socket, *metrics_host);
147        (*metrics.borrow_mut()).set_up_origin(origin.into());
148        (*metrics.borrow_mut()).set_up_tagged_metrics(use_tagged_metrics);
149    });
150    Ok(())
151}
152
153pub trait Subscriber {
154    fn receive_metric(
155        &mut self,
156        label: &'static str,
157        cluster_id: Option<&str>,
158        backend_id: Option<&str>,
159        metric: MetricValue,
160    );
161}
162
163pub struct Aggregator {
164    /// appended to metric keys, usually "sozu-"
165    prefix: String,
166    /// gathers metrics and sends them on a UDP socket
167    network: Option<NetworkDrain>,
168    /// gather metrics locally, queried by the CLI
169    local: LocalDrain,
170}
171
172impl Aggregator {
173    pub fn new(prefix: String) -> Aggregator {
174        Aggregator {
175            prefix: prefix.clone(),
176            network: None,
177            local: LocalDrain::new(prefix),
178        }
179    }
180
181    pub fn set_up_prefix(&mut self, prefix: String) {
182        self.prefix = prefix;
183    }
184
185    pub fn set_up_remote(&mut self, socket: UdpSocket, addr: SocketAddr) {
186        self.network = Some(NetworkDrain::new(self.prefix.clone(), socket, addr));
187    }
188
189    pub fn set_up_origin(&mut self, origin: String) {
190        if let Some(n) = self.network.as_mut() {
191            n.origin = origin;
192        }
193    }
194
195    pub fn set_up_tagged_metrics(&mut self, tagged: bool) {
196        if let Some(n) = self.network.as_mut() {
197            n.use_tagged_metrics = tagged;
198        }
199    }
200
201    pub fn socket(&self) -> Option<&UdpSocket> {
202        self.network.as_ref().map(|n| &n.remote.get_ref().socket)
203    }
204
205    pub fn socket_mut(&mut self) -> Option<&mut UdpSocket> {
206        self.network
207            .as_mut()
208            .map(|n| &mut n.remote.get_mut().socket)
209    }
210
211    pub fn count_add(&mut self, key: &'static str, count_value: i64) {
212        self.receive_metric(key, None, None, MetricValue::Count(count_value));
213    }
214
215    pub fn set_gauge(&mut self, key: &'static str, gauge_value: usize) {
216        self.receive_metric(key, None, None, MetricValue::Gauge(gauge_value));
217    }
218
219    pub fn gauge_add(&mut self, key: &'static str, gauge_value: i64) {
220        self.receive_metric(key, None, None, MetricValue::GaugeAdd(gauge_value));
221    }
222
223    pub fn writable(&mut self) {
224        if let Some(ref mut net) = self.network.as_mut() {
225            net.writable();
226        }
227    }
228
229    pub fn send_data(&mut self) {
230        if let Some(ref mut net) = self.network.as_mut() {
231            net.send_metrics();
232        }
233    }
234
235    pub fn dump_local_proxy_metrics(&mut self) -> BTreeMap<String, FilteredMetrics> {
236        self.local.dump_proxy_metrics(&Vec::new())
237    }
238
239    pub fn query(&mut self, q: &QueryMetricsOptions) -> Result<ResponseContent, MetricError> {
240        self.local.query(q)
241    }
242
243    pub fn clear_local(&mut self) {
244        self.local.clear();
245    }
246
247    pub fn configure(&mut self, config: &MetricsConfiguration) {
248        self.local.configure(config);
249    }
250}
251
252impl Subscriber for Aggregator {
253    fn receive_metric(
254        &mut self,
255        label: &'static str,
256        cluster_id: Option<&str>,
257        backend_id: Option<&str>,
258        metric: MetricValue,
259    ) {
260        if let Some(ref mut net) = self.network.as_mut() {
261            net.receive_metric(label, cluster_id, backend_id, metric.to_owned());
262        }
263        self.local
264            .receive_metric(label, cluster_id, backend_id, metric);
265    }
266}
267
268pub struct MetricSocket {
269    pub addr: SocketAddr,
270    pub socket: UdpSocket,
271}
272
273impl Write for MetricSocket {
274    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
275        self.socket.send_to(buf, self.addr)
276    }
277
278    fn flush(&mut self) -> io::Result<()> {
279        Ok(())
280    }
281}
282
283pub fn udp_bind() -> Result<UdpSocket, MetricError> {
284    let address = "0.0.0.0:0";
285
286    let udp_address =
287        address
288            .parse::<SocketAddr>()
289            .map_err(|parse_error| MetricError::WrongUdpAddress {
290                address: address.to_owned(),
291                error: parse_error.to_string(),
292            })?;
293
294    UdpSocket::bind(udp_address).map_err(|parse_error| MetricError::UdpBind {
295        address: udp_address.to_string(),
296        error: parse_error.to_string(),
297    })
298}
299
/// Adds a value to a counter of the thread-local `METRICS` aggregator.
///
/// `$value` is evaluated before the aggregator is borrowed.
#[macro_export]
macro_rules! count {
    ($key:expr, $value:expr) => {{
        let amount = $value;
        $crate::metrics::METRICS.with(|metrics| {
            metrics.borrow_mut().count_add($key, amount);
        });
    }};
}
310
/// Adds 1 to a counter.
///
/// - `incr!(key)` increments the counter `key`, with no cluster or backend;
/// - `incr!(key, cluster_id, backend_id)` increments the counter `key` for
///   the given `Option<&str>` cluster and backend ids.
#[macro_export]
macro_rules! incr (
  // `count!` is reached through `$crate` so that this macro also works for
  // callers that imported `incr!` by path without having `count!` in scope
  ($key:expr) => ($crate::count!($key, 1));
  ($key:expr, $cluster_id:expr, $backend_id:expr) => {
    {
        use $crate::metrics::Subscriber;

        $crate::metrics::METRICS.with(|metrics| {
          (*metrics.borrow_mut()).receive_metric($key, $cluster_id, $backend_id, $crate::metrics::MetricValue::Count(1));
        });
    }
  }
);
325
/// Subtracts 1 from a counter (adds -1 through `count!`).
#[macro_export]
macro_rules! decr (
  // `count!` is reached through `$crate` so that this macro also works for
  // callers that imported `decr!` by path without having `count!` in scope
  ($key:expr) => ($crate::count!($key, -1))
);
330
/// Sets a gauge of the thread-local `METRICS` aggregator to a value.
///
/// `$value` is evaluated before the aggregator is borrowed.
#[macro_export]
macro_rules! gauge {
    ($key:expr, $value:expr) => {{
        let level = $value;
        $crate::metrics::METRICS.with(|metrics| {
            metrics.borrow_mut().set_gauge($key, level);
        });
    }};
}
340
/// Adds a signed value to a gauge of the thread-local `METRICS` aggregator.
///
/// - `gauge_add!(key, value)` targets the gauge `key`, with no cluster or
///   backend;
/// - `gauge_add!(key, value, cluster_id, backend_id)` targets the gauge `key`
///   of the given `Option<&str>` cluster and backend ids.
///
/// In both forms `$value` is evaluated before the aggregator is borrowed.
#[macro_export]
macro_rules! gauge_add {
    ($key:expr, $value:expr) => {{
        let delta = $value;
        $crate::metrics::METRICS.with(|metrics| {
            metrics.borrow_mut().gauge_add($key, delta);
        });
    }};
    ($key:expr, $value:expr, $cluster_id:expr, $backend_id:expr) => {{
        use $crate::metrics::Subscriber;
        let delta = $value;

        $crate::metrics::METRICS.with(|metrics| {
            metrics.borrow_mut().receive_metric(
                $key,
                $cluster_id,
                $backend_id,
                $crate::metrics::MetricValue::GaugeAdd(delta),
            );
        });
    }};
}
360
/// Records a time measurement in the thread-local `METRICS` aggregator.
///
/// - `time!(key, value)` records `value` under `key`, with no cluster;
/// - `time!(key, cluster_id, value)` records `value` under `key` for the
///   cluster `cluster_id` (a `&str`).
///
/// `value` is converted with `as usize`, and is evaluated before the
/// aggregator is borrowed.
#[macro_export]
macro_rules! time {
    ($key:expr, $value:expr) => {{
        use $crate::metrics::{MetricValue, Subscriber};
        let elapsed = $value;
        $crate::metrics::METRICS.with(|metrics| {
            let mut aggregator = metrics.borrow_mut();

            aggregator.receive_metric($key, None, None, MetricValue::Time(elapsed as usize));
        });
    }};
    ($key:expr, $cluster_id:expr, $value:expr) => {{
        use $crate::metrics::{MetricValue, Subscriber};
        let elapsed = $value;
        $crate::metrics::METRICS.with(|metrics| {
            let mut aggregator = metrics.borrow_mut();
            let cluster: &str = $cluster_id;

            aggregator.receive_metric(
                $key,
                Some(cluster),
                None,
                MetricValue::Time(elapsed as usize),
            );
        });
    }};
}
383
/// Records the metrics of one request handled by a backend, in the
/// thread-local `METRICS` aggregator.
///
/// For the given cluster and backend (both `&str`), this records:
/// - `bytes_in` and `bytes_out`: counters increased by `$bin` and `$bout`
///   (converted with `as i64`);
/// - `backend_response_time`: time metric from `$response_time` (converted
///   with `as usize`);
/// - `backend_connection_time`: time metric in milliseconds, only when
///   `$backend_connection_time` is `Some`;
/// - `requests`: counter increased by 1.
///
/// The expansion is a single block, so the `use` it needs stays local to the
/// macro: it can be invoked several times in the same scope.
#[macro_export]
macro_rules! record_backend_metrics (
  ($cluster_id:expr, $backend_id:expr, $response_time: expr, $backend_connection_time: expr, $bin: expr, $bout: expr) => ({
    use $crate::metrics::{MetricValue,Subscriber};
    $crate::metrics::METRICS.with(|metrics| {
      let m = &mut *metrics.borrow_mut();
      let cluster_id: &str = $cluster_id;
      let backend_id: &str = $backend_id;

      m.receive_metric("bytes_in", Some(cluster_id), Some(backend_id), MetricValue::Count($bin as i64));
      m.receive_metric("bytes_out", Some(cluster_id), Some(backend_id), MetricValue::Count($bout as i64));
      m.receive_metric("backend_response_time", Some(cluster_id), Some(backend_id), MetricValue::Time($response_time as usize));
      // the connection time is optional: nothing is recorded when it is `None`
      if let Some(t) = $backend_connection_time {
        m.receive_metric("backend_connection_time", Some(cluster_id), Some(backend_id), MetricValue::Time(t.as_millis() as usize));
      }

      m.receive_metric("requests", Some(cluster_id), Some(backend_id), MetricValue::Count(1));
    });
  })
);
403);