1mod local_drain;
2mod network_drain;
3mod writer;
4
5use std::{
6 cell::RefCell,
7 collections::BTreeMap,
8 io::{self, Write},
9 net::SocketAddr,
10 str,
11 time::Instant,
12};
13
14use mio::net::UdpSocket;
15
16use sozu_command::proto::command::{
17 FilteredMetrics, MetricsConfiguration, QueryMetricsOptions, ResponseContent,
18};
19
20use crate::metrics::{local_drain::LocalDrain, network_drain::NetworkDrain};
21
thread_local! {
    /// Per-thread metrics aggregator.
    ///
    /// It is created with the default prefix `"sozu"`. The metric macros of this
    /// module (`count!`, `gauge!`, `time!`, ...) all go through this cell with
    /// `borrow_mut()`, so a metric macro must not be invoked while another
    /// borrow of `METRICS` is still alive on the same thread.
    pub static METRICS: RefCell<Aggregator> = RefCell::new(Aggregator::new(String::from("sozu")));
}
25
/// Errors produced by the metrics subsystem.
#[derive(thiserror::Error, Debug)]
pub enum MetricError {
    /// The textual UDP address could not be parsed into a `SocketAddr`
    /// (returned by `udp_bind`).
    #[error("Could not parse udp address {address}: {error}")]
    WrongUdpAddress { address: String, error: String },
    /// Binding the local UDP socket failed (returned by `udp_bind`).
    #[error("Could not bind to udp address {address}: {error}")]
    UdpBind { address: String, error: String },
    /// No metrics are stored for the requested object id.
    // NOTE(review): not constructed in this file; presumably raised by the
    // local drain when answering queries — confirm.
    #[error("No metrics found for object with id {0}")]
    NoMetrics(String),
    /// A histogram could not be created for a time metric.
    // NOTE(review): not constructed in this file; presumably raised by the
    // local drain — confirm.
    #[error("Could not create histogram for time metric {time_metric:?}: {error}")]
    HistogramCreation {
        time_metric: MetricValue,
        error: String,
    },
    /// A time metric could not be recorded.
    // NOTE(review): not constructed in this file; presumably raised by the
    // local drain — confirm.
    #[error("could not record time metric {time_metric:?}: {error}")]
    TimeMetricRecordingError {
        time_metric: MetricValue,
        error: String,
    },
}
45
46#[derive(Debug, Clone, PartialEq, Eq)]
47pub enum MetricValue {
48 Gauge(usize),
49 GaugeAdd(i64),
50 Count(i64),
51 Time(usize),
52}
53
54impl MetricValue {
55 fn is_time(&self) -> bool {
56 matches!(self, &MetricValue::Time(_))
57 }
58
59 fn update(&mut self, key: &'static str, m: MetricValue) -> bool {
60 match (self, m) {
61 (&mut MetricValue::Gauge(ref mut v1), MetricValue::Gauge(v2)) => {
62 let changed = *v1 != v2;
63 *v1 = v2;
64 changed
65 }
66 (&mut MetricValue::Gauge(ref mut v1), MetricValue::GaugeAdd(v2)) => {
67 debug_assert!(
68 *v1 as i64 + v2 >= 0,
69 "metric {key} underflow: previous value: {v1}, adding: {v2}"
70 );
71 let changed = v2 != 0;
72 let res = *v1 as i64 + v2;
73 *v1 = if res >= 0 {
74 res as usize
75 } else {
76 error!(
77 "metric {} underflow: previous value: {}, adding: {}",
78 key, v1, v2
79 );
80 0
81 };
82
83 changed
84 }
85 (&mut MetricValue::Count(ref mut v1), MetricValue::Count(v2)) => {
86 let changed = v2 != 0;
87 *v1 += v2;
88 changed
89 }
90 (s, m) => panic!(
91 "tried to update metric {key} of value {s:?} with an incompatible metric: {m:?}"
92 ),
93 }
94 }
95}
96
97#[derive(Debug, Clone)]
98pub struct StoredMetricValue {
99 last_sent: Instant,
100 updated: bool,
101 data: MetricValue,
102}
103
104impl StoredMetricValue {
105 pub fn new(last_sent: Instant, data: MetricValue) -> StoredMetricValue {
106 StoredMetricValue {
107 last_sent,
108 updated: true,
109 data: if let MetricValue::GaugeAdd(v) = data {
110 if v >= 0 {
111 MetricValue::Gauge(v as usize)
112 } else {
113 MetricValue::Gauge(0)
114 }
115 } else {
116 data
117 },
118 }
119 }
120
121 pub fn update(&mut self, key: &'static str, m: MetricValue) {
122 let updated = self.data.update(key, m);
123 if !self.updated {
124 self.updated = updated;
125 }
126 }
127}
128
129pub fn setup<O: Into<String>>(
130 metrics_host: &SocketAddr,
131 origin: O,
132 use_tagged_metrics: bool,
133 prefix: Option<String>,
134) -> Result<(), MetricError> {
135 let metrics_socket = udp_bind()?;
136
137 debug!(
138 "setting up metrics: local address = {:#?}",
139 metrics_socket.local_addr()
140 );
141
142 METRICS.with(|metrics| {
143 if let Some(p) = prefix {
144 (*metrics.borrow_mut()).set_up_prefix(p);
145 }
146 (*metrics.borrow_mut()).set_up_remote(metrics_socket, *metrics_host);
147 (*metrics.borrow_mut()).set_up_origin(origin.into());
148 (*metrics.borrow_mut()).set_up_tagged_metrics(use_tagged_metrics);
149 });
150 Ok(())
151}
152
/// Something that can receive metric samples.
pub trait Subscriber {
    /// Receives one metric sample.
    ///
    /// - `label`: name of the metric.
    /// - `cluster_id`: cluster the sample belongs to, if any.
    /// - `backend_id`: backend the sample belongs to, if any.
    /// - `metric`: the sample itself.
    fn receive_metric(
        &mut self,
        label: &'static str,
        cluster_id: Option<&str>,
        backend_id: Option<&str>,
        metric: MetricValue,
    );
}
162
/// Fans received metrics out to a local drain and, once configured, to a
/// network drain.
pub struct Aggregator {
    // Prefix handed to the network drain when it is created.
    prefix: String,
    // `None` until `set_up_remote` is called.
    network: Option<NetworkDrain>,
    // Always present; created with the prefix given to `Aggregator::new`.
    local: LocalDrain,
}
171
172impl Aggregator {
173 pub fn new(prefix: String) -> Aggregator {
174 Aggregator {
175 prefix: prefix.clone(),
176 network: None,
177 local: LocalDrain::new(prefix),
178 }
179 }
180
181 pub fn set_up_prefix(&mut self, prefix: String) {
182 self.prefix = prefix;
183 }
184
185 pub fn set_up_remote(&mut self, socket: UdpSocket, addr: SocketAddr) {
186 self.network = Some(NetworkDrain::new(self.prefix.clone(), socket, addr));
187 }
188
189 pub fn set_up_origin(&mut self, origin: String) {
190 if let Some(n) = self.network.as_mut() {
191 n.origin = origin;
192 }
193 }
194
195 pub fn set_up_tagged_metrics(&mut self, tagged: bool) {
196 if let Some(n) = self.network.as_mut() {
197 n.use_tagged_metrics = tagged;
198 }
199 }
200
201 pub fn socket(&self) -> Option<&UdpSocket> {
202 self.network.as_ref().map(|n| &n.remote.get_ref().socket)
203 }
204
205 pub fn socket_mut(&mut self) -> Option<&mut UdpSocket> {
206 self.network
207 .as_mut()
208 .map(|n| &mut n.remote.get_mut().socket)
209 }
210
211 pub fn count_add(&mut self, key: &'static str, count_value: i64) {
212 self.receive_metric(key, None, None, MetricValue::Count(count_value));
213 }
214
215 pub fn set_gauge(&mut self, key: &'static str, gauge_value: usize) {
216 self.receive_metric(key, None, None, MetricValue::Gauge(gauge_value));
217 }
218
219 pub fn gauge_add(&mut self, key: &'static str, gauge_value: i64) {
220 self.receive_metric(key, None, None, MetricValue::GaugeAdd(gauge_value));
221 }
222
223 pub fn writable(&mut self) {
224 if let Some(ref mut net) = self.network.as_mut() {
225 net.writable();
226 }
227 }
228
229 pub fn send_data(&mut self) {
230 if let Some(ref mut net) = self.network.as_mut() {
231 net.send_metrics();
232 }
233 }
234
235 pub fn dump_local_proxy_metrics(&mut self) -> BTreeMap<String, FilteredMetrics> {
236 self.local.dump_proxy_metrics(&Vec::new())
237 }
238
239 pub fn query(&mut self, q: &QueryMetricsOptions) -> Result<ResponseContent, MetricError> {
240 self.local.query(q)
241 }
242
243 pub fn clear_local(&mut self) {
244 self.local.clear();
245 }
246
247 pub fn configure(&mut self, config: &MetricsConfiguration) {
248 self.local.configure(config);
249 }
250}
251
252impl Subscriber for Aggregator {
253 fn receive_metric(
254 &mut self,
255 label: &'static str,
256 cluster_id: Option<&str>,
257 backend_id: Option<&str>,
258 metric: MetricValue,
259 ) {
260 if let Some(ref mut net) = self.network.as_mut() {
261 net.receive_metric(label, cluster_id, backend_id, metric.to_owned());
262 }
263 self.local
264 .receive_metric(label, cluster_id, backend_id, metric);
265 }
266}
267
/// A UDP socket paired with the address every write is sent to.
pub struct MetricSocket {
    /// Destination of the datagrams.
    pub addr: SocketAddr,
    /// Socket used to send them.
    pub socket: UdpSocket,
}

impl Write for MetricSocket {
    /// Sends `buf` to `addr` with a single `send_to` call and returns its
    /// result.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.socket.send_to(buf, self.addr)
    }

    /// Nothing is buffered at this level, so flushing always succeeds.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
282
283pub fn udp_bind() -> Result<UdpSocket, MetricError> {
284 let address = "0.0.0.0:0";
285
286 let udp_address =
287 address
288 .parse::<SocketAddr>()
289 .map_err(|parse_error| MetricError::WrongUdpAddress {
290 address: address.to_owned(),
291 error: parse_error.to_string(),
292 })?;
293
294 UdpSocket::bind(udp_address).map_err(|parse_error| MetricError::UdpBind {
295 address: udp_address.to_string(),
296 error: parse_error.to_string(),
297 })
298}
299
/// Adds `$value` to the counter `$key` on this thread's `METRICS` aggregator.
///
/// `$value` is evaluated before the aggregator is borrowed.
#[macro_export]
macro_rules! count {
    ($key:expr, $value:expr) => {{
        let amount = $value;
        $crate::metrics::METRICS.with(|cell| {
            cell.borrow_mut().count_add($key, amount);
        });
    }};
}
310
/// Increments a counter by one.
///
/// - `incr!(key)`: increments the counter `key` (no cluster, no backend).
/// - `incr!(key, cluster_id, backend_id)`: increments the counter `key` for
///   the given `Option<&str>` cluster and backend ids.
///
/// The one-argument form calls `count!` through `$crate::` so that the macro
/// keeps working when it is imported by path from another crate without
/// `count!` being in scope there.
#[macro_export]
macro_rules! incr (
    ($key:expr) => ($crate::count!($key, 1));
    ($key:expr, $cluster_id:expr, $backend_id:expr) => {
        {
            use $crate::metrics::Subscriber;

            $crate::metrics::METRICS.with(|metrics| {
                (*metrics.borrow_mut()).receive_metric($key, $cluster_id, $backend_id, $crate::metrics::MetricValue::Count(1));
            });
        }
    }
);
325
/// Decrements the counter `$key` by one (no cluster, no backend).
///
/// `count!` is called through `$crate::` so that the macro keeps working when
/// it is imported by path from another crate without `count!` being in scope
/// there.
#[macro_export]
macro_rules! decr (
    ($key:expr) => ($crate::count!($key, -1))
);
330
/// Sets the gauge `$key` to `$value` on this thread's `METRICS` aggregator.
///
/// `$value` is evaluated before the aggregator is borrowed.
#[macro_export]
macro_rules! gauge {
    ($key:expr, $value:expr) => {{
        let level = $value;
        $crate::metrics::METRICS.with(|cell| {
            cell.borrow_mut().set_gauge($key, level);
        });
    }};
}
340
/// Applies a signed delta to a gauge on this thread's `METRICS` aggregator.
///
/// - `gauge_add!(key, value)`: no cluster, no backend.
/// - `gauge_add!(key, value, cluster_id, backend_id)`: for the given
///   `Option<&str>` cluster and backend ids.
///
/// `$value` is evaluated before the aggregator is borrowed.
#[macro_export]
macro_rules! gauge_add {
    ($key:expr, $value:expr) => {{
        let delta = $value;
        $crate::metrics::METRICS.with(|cell| {
            cell.borrow_mut().gauge_add($key, delta);
        });
    }};
    ($key:expr, $value:expr, $cluster_id:expr, $backend_id:expr) => {{
        use $crate::metrics::Subscriber;
        let delta = $value;

        $crate::metrics::METRICS.with(|cell| {
            cell.borrow_mut().receive_metric(
                $key,
                $cluster_id,
                $backend_id,
                $crate::metrics::MetricValue::GaugeAdd(delta),
            );
        });
    }};
}
360
/// Records a time sample on this thread's `METRICS` aggregator.
///
/// - `time!(key, value)`: no cluster, no backend.
/// - `time!(key, cluster_id, value)`: for the cluster `cluster_id` (a `&str`).
///
/// `$value` is evaluated before the aggregator is borrowed and is converted
/// with `as usize`. In the three-argument form `$cluster_id` is evaluated
/// while the aggregator is borrowed.
#[macro_export]
macro_rules! time {
    ($key:expr, $value:expr) => {{
        use $crate::metrics::{MetricValue, Subscriber};
        let elapsed = $value;
        $crate::metrics::METRICS.with(|cell| {
            let mut aggregator = cell.borrow_mut();

            aggregator.receive_metric($key, None, None, MetricValue::Time(elapsed as usize));
        });
    }};
    ($key:expr, $cluster_id:expr, $value:expr) => {{
        use $crate::metrics::{MetricValue, Subscriber};
        let elapsed = $value;
        $crate::metrics::METRICS.with(|cell| {
            let mut aggregator = cell.borrow_mut();
            let cluster: &str = $cluster_id;

            aggregator.receive_metric(
                $key,
                Some(cluster),
                None,
                MetricValue::Time(elapsed as usize),
            );
        });
    }};
}
383
/// Records the metrics of one backend exchange on this thread's `METRICS`
/// aggregator, all tagged with `$cluster_id` and `$backend_id` (both `&str`):
///
/// - `bytes_in` / `bytes_out`: counters increased by `$bin` / `$bout`
///   (converted with `as i64`);
/// - `backend_response_time`: time sample from `$response_time`
///   (converted with `as usize`);
/// - `backend_connection_time`: time sample from `t.as_millis()`, only when
///   `$backend_connection_time` is `Some(t)`;
/// - `requests`: counter increased by one.
///
/// The expansion is wrapped in its own block so that the `use` it needs does
/// not leak into the caller's scope, the macro can be invoked several times
/// in the same block, and it can be used in expression position.
#[macro_export]
macro_rules! record_backend_metrics (
    ($cluster_id:expr, $backend_id:expr, $response_time: expr, $backend_connection_time: expr, $bin: expr, $bout: expr) => {
        {
            use $crate::metrics::{MetricValue,Subscriber};
            $crate::metrics::METRICS.with(|metrics| {
                let m = &mut *metrics.borrow_mut();
                let cluster_id: &str = $cluster_id;
                let backend_id: &str = $backend_id;

                m.receive_metric("bytes_in", Some(cluster_id), Some(backend_id), MetricValue::Count($bin as i64));
                m.receive_metric("bytes_out", Some(cluster_id), Some(backend_id), MetricValue::Count($bout as i64));
                m.receive_metric("backend_response_time", Some(cluster_id), Some(backend_id), MetricValue::Time($response_time as usize));
                if let Some(t) = $backend_connection_time {
                    m.receive_metric("backend_connection_time", Some(cluster_id), Some(backend_id), MetricValue::Time(t.as_millis() as usize));
                }

                m.receive_metric("requests", Some(cluster_id), Some(backend_id), MetricValue::Count(1));
            });
        }
    }
);