1mod local_drain;
2mod network_drain;
3mod writer;
4
5use std::{
6 cell::RefCell,
7 collections::BTreeMap,
8 io::{self, Write},
9 net::SocketAddr,
10 str,
11 time::Instant,
12};
13
14use mio::net::UdpSocket;
15use sozu_command::proto::command::{
16 FilteredMetrics, MetricsConfiguration, QueryMetricsOptions, ResponseContent,
17};
18
19use crate::metrics::{local_drain::LocalDrain, network_drain::NetworkDrain};
20
21thread_local! {
22 pub static METRICS: RefCell<Aggregator> = RefCell::new(Aggregator::new(String::from("sozu")));
23}
24
/// Errors produced by the metrics module.
#[derive(thiserror::Error, Debug)]
pub enum MetricError {
    /// The address string could not be parsed into a socket address.
    #[error("Could not parse udp address {address}: {error}")]
    WrongUdpAddress { address: String, error: String },
    /// Binding a UDP socket to `address` failed.
    #[error("Could not bind to udp address {address}: {error}")]
    UdpBind { address: String, error: String },
    /// No metrics are stored for the object with the given id.
    #[error("No metrics found for object with id {0}")]
    NoMetrics(String),
    /// A histogram could not be created for the given time metric.
    // NOTE(review): not constructed in this file; presumably raised by the local drain — confirm.
    #[error("Could not create histogram for time metric {time_metric:?}: {error}")]
    HistogramCreation {
        time_metric: MetricValue,
        error: String,
    },
    /// The given time metric could not be recorded.
    // NOTE(review): not constructed in this file; presumably raised by the local drain — confirm.
    #[error("could not record time metric {time_metric:?}: {error}")]
    TimeMetricRecordingError {
        time_metric: MetricValue,
        error: String,
    },
}
44
/// A single metric sample, or the accumulated state of a metric.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MetricValue {
    /// Absolute gauge value; an incoming `Gauge` replaces the stored one.
    Gauge(usize),
    /// Signed delta to apply to a stored [`MetricValue::Gauge`].
    GaugeAdd(i64),
    /// Counter increment; an incoming `Count` is added to the stored one.
    Count(i64),
    /// A time measurement.
    // NOTE(review): presumably milliseconds (the macros record `as_millis()` values) — confirm.
    Time(usize),
}
52
53impl MetricValue {
54 fn is_time(&self) -> bool {
55 matches!(self, &MetricValue::Time(_))
56 }
57
58 fn update(&mut self, key: &'static str, m: MetricValue) -> bool {
59 match (self, m) {
60 (&mut MetricValue::Gauge(ref mut v1), MetricValue::Gauge(v2)) => {
61 let changed = *v1 != v2;
62 *v1 = v2;
63 changed
64 }
65 (&mut MetricValue::Gauge(ref mut v1), MetricValue::GaugeAdd(v2)) => {
66 debug_assert!(
67 *v1 as i64 + v2 >= 0,
68 "metric {key} underflow: previous value: {v1}, adding: {v2}"
69 );
70 let changed = v2 != 0;
71 let res = *v1 as i64 + v2;
72 *v1 = if res >= 0 {
73 res as usize
74 } else {
75 error!(
76 "metric {} underflow: previous value: {}, adding: {}",
77 key, v1, v2
78 );
79 0
80 };
81
82 changed
83 }
84 (&mut MetricValue::Count(ref mut v1), MetricValue::Count(v2)) => {
85 let changed = v2 != 0;
86 *v1 += v2;
87 changed
88 }
89 (s, m) => panic!(
90 "tried to update metric {key} of value {s:?} with an incompatible metric: {m:?}"
91 ),
92 }
93 }
94}
95
/// A metric value together with the bookkeeping stored alongside it.
#[derive(Debug, Clone)]
pub struct StoredMetricValue {
    /// Instant supplied at creation.
    // NOTE(review): presumably refreshed by the drain whenever the value is sent — confirm.
    last_sent: Instant,
    /// `true` at creation, and set again once an update reports a change.
    // NOTE(review): never reset in this file; presumably cleared by the drain after sending — confirm.
    updated: bool,
    /// The accumulated value.
    data: MetricValue,
}
102
103impl StoredMetricValue {
104 pub fn new(last_sent: Instant, data: MetricValue) -> StoredMetricValue {
105 StoredMetricValue {
106 last_sent,
107 updated: true,
108 data: if let MetricValue::GaugeAdd(v) = data {
109 if v >= 0 {
110 MetricValue::Gauge(v as usize)
111 } else {
112 MetricValue::Gauge(0)
113 }
114 } else {
115 data
116 },
117 }
118 }
119
120 pub fn update(&mut self, key: &'static str, m: MetricValue) {
121 let updated = self.data.update(key, m);
122 if !self.updated {
123 self.updated = updated;
124 }
125 }
126}
127
128pub fn setup<O: Into<String>>(
129 metrics_host: &SocketAddr,
130 origin: O,
131 use_tagged_metrics: bool,
132 prefix: Option<String>,
133) -> Result<(), MetricError> {
134 let metrics_socket = udp_bind()?;
135
136 debug!(
137 "setting up metrics: local address = {:#?}",
138 metrics_socket.local_addr()
139 );
140
141 METRICS.with(|metrics| {
142 if let Some(p) = prefix {
143 (*metrics.borrow_mut()).set_up_prefix(p);
144 }
145 (*metrics.borrow_mut()).set_up_remote(metrics_socket, *metrics_host);
146 (*metrics.borrow_mut()).set_up_origin(origin.into());
147 (*metrics.borrow_mut()).set_up_tagged_metrics(use_tagged_metrics);
148 });
149 Ok(())
150}
151
/// A sink that metric values can be pushed into.
pub trait Subscriber {
    /// Receives one metric value.
    ///
    /// * `label`: name of the metric,
    /// * `cluster_id`, `backend_id`: optional identifiers attached to the
    ///   metric; the helpers of this module pass `None` for both when the
    ///   metric is not tied to a cluster or backend,
    /// * `metric`: the value to record.
    fn receive_metric(
        &mut self,
        label: &'static str,
        cluster_id: Option<&str>,
        backend_id: Option<&str>,
        metric: MetricValue,
    );
}
161
/// Dispatches every received metric to a local drain and, when configured, to
/// a network drain.
pub struct Aggregator {
    /// Prefix handed to the network drain when it is created.
    prefix: String,
    /// Network drain; `None` until `set_up_remote` is called.
    network: Option<NetworkDrain>,
    /// Local drain; always present, answers queries and dumps.
    local: LocalDrain,
}
170
171impl Aggregator {
172 pub fn new(prefix: String) -> Aggregator {
173 Aggregator {
174 prefix: prefix.clone(),
175 network: None,
176 local: LocalDrain::new(prefix),
177 }
178 }
179
180 pub fn set_up_prefix(&mut self, prefix: String) {
181 self.prefix = prefix;
182 }
183
184 pub fn set_up_remote(&mut self, socket: UdpSocket, addr: SocketAddr) {
185 self.network = Some(NetworkDrain::new(self.prefix.clone(), socket, addr));
186 }
187
188 pub fn set_up_origin(&mut self, origin: String) {
189 if let Some(n) = self.network.as_mut() {
190 n.origin = origin;
191 }
192 }
193
194 pub fn set_up_tagged_metrics(&mut self, tagged: bool) {
195 if let Some(n) = self.network.as_mut() {
196 n.use_tagged_metrics = tagged;
197 }
198 }
199
200 pub fn socket(&self) -> Option<&UdpSocket> {
201 self.network.as_ref().map(|n| &n.remote.get_ref().socket)
202 }
203
204 pub fn socket_mut(&mut self) -> Option<&mut UdpSocket> {
205 self.network
206 .as_mut()
207 .map(|n| &mut n.remote.get_mut().socket)
208 }
209
210 pub fn count_add(&mut self, key: &'static str, count_value: i64) {
211 self.receive_metric(key, None, None, MetricValue::Count(count_value));
212 }
213
214 pub fn set_gauge(&mut self, key: &'static str, gauge_value: usize) {
215 self.receive_metric(key, None, None, MetricValue::Gauge(gauge_value));
216 }
217
218 pub fn gauge_add(&mut self, key: &'static str, gauge_value: i64) {
219 self.receive_metric(key, None, None, MetricValue::GaugeAdd(gauge_value));
220 }
221
222 pub fn writable(&mut self) {
223 if let Some(ref mut net) = self.network.as_mut() {
224 net.writable();
225 }
226 }
227
228 pub fn send_data(&mut self) {
229 if let Some(ref mut net) = self.network.as_mut() {
230 net.send_metrics();
231 }
232 }
233
234 pub fn dump_local_proxy_metrics(&mut self) -> BTreeMap<String, FilteredMetrics> {
235 self.local.dump_proxy_metrics(&Vec::new())
236 }
237
238 pub fn query(&mut self, q: &QueryMetricsOptions) -> Result<ResponseContent, MetricError> {
239 self.local.query(q)
240 }
241
242 pub fn clear_local(&mut self) {
243 self.local.clear();
244 }
245
246 pub fn configure(&mut self, config: &MetricsConfiguration) {
247 self.local.configure(config);
248 }
249}
250
251impl Subscriber for Aggregator {
252 fn receive_metric(
253 &mut self,
254 label: &'static str,
255 cluster_id: Option<&str>,
256 backend_id: Option<&str>,
257 metric: MetricValue,
258 ) {
259 if let Some(ref mut net) = self.network.as_mut() {
260 net.receive_metric(label, cluster_id, backend_id, metric.to_owned());
261 }
262 self.local
263 .receive_metric(label, cluster_id, backend_id, metric);
264 }
265}
266
/// A UDP socket paired with the address every write is sent to.
pub struct MetricSocket {
    /// Destination of the datagrams.
    pub addr: SocketAddr,
    /// Socket used to send them.
    pub socket: UdpSocket,
}
271
272impl Write for MetricSocket {
273 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
274 self.socket.send_to(buf, self.addr)
275 }
276
277 fn flush(&mut self) -> io::Result<()> {
278 Ok(())
279 }
280}
281
282pub fn udp_bind() -> Result<UdpSocket, MetricError> {
283 let address = "0.0.0.0:0";
284
285 let udp_address =
286 address
287 .parse::<SocketAddr>()
288 .map_err(|parse_error| MetricError::WrongUdpAddress {
289 address: address.to_owned(),
290 error: parse_error.to_string(),
291 })?;
292
293 UdpSocket::bind(udp_address).map_err(|parse_error| MetricError::UdpBind {
294 address: udp_address.to_string(),
295 error: parse_error.to_string(),
296 })
297}
298
/// Adds `$value` to the counter `$key` of the thread-local aggregator,
/// without cluster or backend id.
#[macro_export]
macro_rules! count (
    ($key:expr, $value: expr) => ({
        // evaluated before the aggregator is borrowed
        let increment = $value;
        $crate::metrics::METRICS.with(|cell| {
            cell.borrow_mut().count_add($key, increment);
        });
    })
);
309
/// Increments a counter by one.
///
/// * `incr!(key)` increments the counter `key` without cluster or backend id,
/// * `incr!(key, cluster_id, backend_id)` increments it for the given
///   `Option<&str>` identifiers.
#[macro_export]
macro_rules! incr (
    // `$crate::` keeps this arm usable from crates that import `incr!` by path
    // without also bringing `count!` into scope
    ($key:expr) => ($crate::count!($key, 1));
    ($key:expr, $cluster_id:expr, $backend_id:expr) => {
        {
            use $crate::metrics::Subscriber;

            $crate::metrics::METRICS.with(|metrics| {
                (*metrics.borrow_mut()).receive_metric($key, $cluster_id, $backend_id, $crate::metrics::MetricValue::Count(1));
            });
        }
    }
);
324
/// Decrements the counter `$key` by one, without cluster or backend id.
#[macro_export]
macro_rules! decr (
    // `$crate::` keeps this usable from crates that import `decr!` by path
    // without also bringing `count!` into scope
    ($key:expr) => ($crate::count!($key, -1))
);
329
/// Sets the gauge `$key` of the thread-local aggregator to `$value`,
/// without cluster or backend id.
#[macro_export]
macro_rules! gauge (
    ($key:expr, $value: expr) => ({
        // evaluated before the aggregator is borrowed
        let level = $value;
        $crate::metrics::METRICS.with(|cell| {
            cell.borrow_mut().set_gauge($key, level);
        });
    })
);
339
/// Applies a signed delta to a gauge of the thread-local aggregator.
///
/// * `gauge_add!(key, value)` targets the gauge without cluster or backend id,
/// * `gauge_add!(key, value, cluster_id, backend_id)` targets the gauge for the
///   given `Option<&str>` identifiers.
#[macro_export]
macro_rules! gauge_add (
    ($key:expr, $value: expr) => ({
        // evaluated before the aggregator is borrowed
        let delta = $value;
        $crate::metrics::METRICS.with(|cell| {
            cell.borrow_mut().gauge_add($key, delta);
        });
    });
    ($key:expr, $value:expr, $cluster_id:expr, $backend_id:expr) => {
        {
            use $crate::metrics::Subscriber;
            // evaluated before the aggregator is borrowed
            let delta = $value;

            $crate::metrics::METRICS.with(|cell| {
                cell.borrow_mut().receive_metric($key, $cluster_id, $backend_id, $crate::metrics::MetricValue::GaugeAdd(delta));
            });
        }
    }
);
359
/// Records a time measurement in the thread-local aggregator.
///
/// * `time!(key, value)` records it without cluster or backend id,
/// * `time!(key, cluster_id, value)` records it for the cluster `cluster_id`
///   (a `&str`).
///
/// `value` is cast to `usize` with `as`.
#[macro_export]
macro_rules! time (
    ($key:expr, $value: expr) => ({
        use $crate::metrics::{MetricValue,Subscriber};
        // evaluated before the aggregator is borrowed
        let duration = $value;
        $crate::metrics::METRICS.with(|cell| {
            let mut aggregator = cell.borrow_mut();

            aggregator.receive_metric($key, None, None, MetricValue::Time(duration as usize));
        });
    });
    ($key:expr, $cluster_id:expr, $value: expr) => ({
        use $crate::metrics::{MetricValue,Subscriber};
        // evaluated before the aggregator is borrowed
        let duration = $value;
        $crate::metrics::METRICS.with(|cell| {
            let mut aggregator = cell.borrow_mut();
            let cluster_id: &str = $cluster_id;

            aggregator.receive_metric($key, Some(cluster_id), None, MetricValue::Time(duration as usize));
        });
    })
);
382
/// Records the metrics of one finished backend exchange in the thread-local
/// aggregator, for the given cluster and backend (both `&str`):
///
/// * `bytes_in` and `bytes_out` counters, from `$bin` and `$bout`,
/// * `backend_response_time`, from `$response_time`,
/// * `backend_connection_time`, only when `$backend_connection_time` is `Some`,
/// * one more `requests`.
///
/// The expansion is a single block, so the macro can be invoked several times
/// in the same scope and its `use` does not leak into the caller.
#[macro_export]
macro_rules! record_backend_metrics (
    ($cluster_id:expr, $backend_id:expr, $response_time: expr, $backend_connection_time: expr, $bin: expr, $bout: expr) => ({
        use $crate::metrics::{MetricValue,Subscriber};

        // arguments are evaluated before the aggregator is borrowed, like the
        // other macros of this module do with their value
        let cluster_id: &str = $cluster_id;
        let backend_id: &str = $backend_id;
        let bytes_in = $bin as i64;
        let bytes_out = $bout as i64;
        let response_time = $response_time as usize;
        let backend_connection_time = $backend_connection_time;

        $crate::metrics::METRICS.with(|metrics| {
            let m = &mut *metrics.borrow_mut();

            m.receive_metric("bytes_in", Some(cluster_id), Some(backend_id), MetricValue::Count(bytes_in));
            m.receive_metric("bytes_out", Some(cluster_id), Some(backend_id), MetricValue::Count(bytes_out));
            m.receive_metric("backend_response_time", Some(cluster_id), Some(backend_id), MetricValue::Time(response_time));
            if let Some(t) = backend_connection_time {
                m.receive_metric("backend_connection_time", Some(cluster_id), Some(backend_id), MetricValue::Time(t.as_millis() as usize));
            }

            m.receive_metric("requests", Some(cluster_id), Some(backend_id), MetricValue::Count(1));
        });
    })
);