// metrics_exporter_tcp/lib.rs

1//! A [`metrics`][metrics]-compatible exporter that outputs metrics to clients over TCP.
2//!
3//! This exporter creates a TCP server, that when connected to, will stream individual metrics to
4//! the client using a Protocol Buffers encoding.
5//!
6//! # Backpressure
7//! The exporter has configurable buffering, which allows users to trade off how many metrics they
8//! want to be queued up at any given time.  This buffer limit applies both to incoming metrics, as
9//! well as the individual buffers for each connected client.
10//!
11//! By default, the buffer limit is set at 1024 metrics.  When the incoming buffer -- metrics being
12//! fed to the exporter -- is full, metrics will be dropped.  If a client's buffer is full,
13//! potentially due to slow network conditions or slow processing, then messages in the client's
14//! buffer will be dropped in FIFO order in order to allow the exporter to continue fanning out
15//! metrics to clients.
16//!
17//! If no buffer limit is set, then the exporter will ingest and enqueue as many metrics as possible,
18//! potentially up until the point of memory exhaustion.  A buffer limit is advised for this reason,
19//! even if it is many multiples of the default.
20//!
21//! # Encoding
22//! Metrics are encoded using Protocol Buffers.  The protocol file can be found in the repository at
23//! `proto/event.proto`.
24//!
25//! # Usage
26//! The TCP exporter can be constructed by creating a [`TcpBuilder`], configuring it as needed, and
27//! calling [`TcpBuilder::install`] to both spawn the TCP server as well as install the exporter
28//! globally.
29//!
30//! If necessary, the recorder itself can be returned so that it can be composed separately, while
31//! still installing the TCP server itself, by calling [`TcpBuilder::build`].
32//!
33//! ```
34//! # use metrics_exporter_tcp::TcpBuilder;
35//! # fn direct() {
36//! // Install the exporter directly:
37//! let builder = TcpBuilder::new();
38//! builder.install().expect("failed to install TCP exporter");
39//!
40//! // Or install the TCP server and get the recorder:
41//! let builder = TcpBuilder::new();
42//! let recorder = builder.build().expect("failed to install TCP exporter");
43//! # }
44//! ```
45//!
46//! [metrics]: https://docs.rs/metrics
47#![deny(missing_docs)]
48#![cfg_attr(docsrs, feature(doc_cfg), deny(rustdoc::broken_intra_doc_links))]
49use std::io::{self, Write};
50use std::net::SocketAddr;
51use std::sync::{
52    atomic::{AtomicBool, Ordering},
53    Arc,
54};
55use std::thread;
56use std::time::SystemTime;
57use std::{
58    collections::{BTreeMap, HashMap, VecDeque},
59    sync::atomic::AtomicUsize,
60};
61
62use bytes::Bytes;
63use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
64use metrics::{
65    Counter, CounterFn, Gauge, GaugeFn, Histogram, HistogramFn, Key, KeyName, Metadata, Recorder,
66    SetRecorderError, SharedString, Unit,
67};
68use mio::{
69    net::{TcpListener, TcpStream},
70    Events, Interest, Poll, Token, Waker,
71};
72use prost::{EncodeError, Message};
73use tracing::{error, trace, trace_span};
74
// Fixed poll tokens for the two always-present event sources; client connections are
// assigned tokens starting at `START_TOKEN` (see `next`).
const WAKER: Token = Token(0);
const LISTENER: Token = Token(1);
const START_TOKEN: Token = Token(2);
// Clients are registered for both readability and writability so we can notice
// disconnects as well as resume partially-written messages.
const CLIENT_INTEREST: Interest = Interest::READABLE.add(Interest::WRITABLE);

// Protocol Buffers types generated by prost from `proto/event.proto` at build time.
mod proto {
    include!(concat!(env!("OUT_DIR"), "/event.proto.rs"));
}

use self::proto::metadata::MetricType;
85
/// A single operation applied to a metric, mirroring the `metrics` handle traits
/// (`CounterFn`, `GaugeFn`, `HistogramFn`).
enum MetricOperation {
    IncrementCounter(u64),
    SetCounter(u64),
    IncrementGauge(f64),
    DecrementGauge(f64),
    SetGauge(f64),
    RecordHistogram(f64),
}

/// Messages sent from recorder handles to the transport thread.
enum Event {
    /// A metric description: key name, metric type, optional unit, and description text.
    Metadata(KeyName, MetricType, Option<Unit>, SharedString),
    /// A metric data point to fan out to connected clients.
    Metric(Key, MetricOperation),
}
99
/// Errors that could occur while installing a TCP recorder/exporter.
#[derive(Debug)]
pub enum Error {
    /// Creating the networking event loop did not succeed.
    Io(io::Error),

    /// Installing the recorder did not succeed.
    Recorder(SetRecorderError<TcpRecorder>),
}

impl From<io::Error> for Error {
    // Wraps I/O failures from socket binding / poll setup.
    fn from(e: io::Error) -> Self {
        Error::Io(e)
    }
}

impl From<SetRecorderError<TcpRecorder>> for Error {
    // Wraps failures from `metrics::set_global_recorder`.
    fn from(e: SetRecorderError<TcpRecorder>) -> Self {
        Error::Recorder(e)
    }
}
121
122impl std::fmt::Display for Error {
123    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124        match self {
125            Error::Io(e) => write!(f, "IO error: {}", e),
126            Error::Recorder(e) => write!(f, "recorder error: {}", e),
127        }
128    }
129}
130
impl std::error::Error for Error {
    /// Exposes the wrapped error as the source, enabling error-chain inspection.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Error::Io(e) => Some(e),
            Error::Recorder(e) => Some(e),
        }
    }
}
139
/// State shared between recorder handles and the transport thread.
#[derive(Debug)]
struct State {
    // Number of currently-connected clients.
    client_count: AtomicUsize,
    // Cached "any clients connected?" flag so the hot path is a single atomic load.
    should_send: AtomicBool,
    // Wakes the transport thread's poll loop after an event is enqueued.
    waker: Waker,
    // Channel of metadata/metric events consumed by the transport thread.
    tx: Sender<Event>,
}
147
impl State {
    /// Creates a new `State` wrapping the transport waker and event channel.
    pub fn new(waker: Waker, tx: Sender<Event>) -> State {
        State { client_count: AtomicUsize::new(0), should_send: AtomicBool::new(false), waker, tx }
    }

    /// Whether any clients are currently connected and metric data should be forwarded.
    pub fn should_send(&self) -> bool {
        self.should_send.load(Ordering::Acquire)
    }

    /// Records a newly-connected client and enables metric forwarding.
    pub fn increment_clients(&self) {
        self.client_count.fetch_add(1, Ordering::AcqRel);
        self.should_send.store(true, Ordering::Release);
    }

    /// Records a disconnected client, disabling forwarding when the last one leaves.
    pub fn decrement_clients(&self) {
        // `fetch_sub` returns the previous value, so 1 means this was the last client.
        let count = self.client_count.fetch_sub(1, Ordering::AcqRel);
        if count == 1 {
            self.should_send.store(false, Ordering::Release);
        }
    }

    /// Enqueues a metadata (describe) event and wakes the transport thread.
    ///
    /// Unlike `push_metric`, this does not check `should_send`: metadata is cached by the
    /// transport thread so that late-joining clients still receive it.  If the channel is
    /// full, the event is silently dropped (best-effort).
    fn register_metric(
        &self,
        key_name: KeyName,
        metric_type: MetricType,
        unit: Option<Unit>,
        description: SharedString,
    ) {
        let _ = self.tx.try_send(Event::Metadata(key_name, metric_type, unit, description));
        self.wake();
    }

    /// Enqueues a metric data point, but only while at least one client is connected.
    ///
    /// If the channel is full, the event is silently dropped (best-effort).
    fn push_metric(&self, key: &Key, op: MetricOperation) {
        if self.should_send() {
            let _ = self.tx.try_send(Event::Metric(key.clone(), op));
            self.wake();
        }
    }

    /// Wakes the transport thread's poll loop; wake failures are ignored.
    pub fn wake(&self) {
        let _ = self.waker.wake();
    }
}
191
/// A per-metric handle handed out by the recorder.
///
/// Stores the metric's key plus the shared state used to enqueue operations.
#[derive(Debug)]
struct Handle {
    key: Key,
    state: Arc<State>,
}

impl Handle {
    /// Creates a handle for `key` backed by the shared transport state.
    fn new(key: Key, state: Arc<State>) -> Handle {
        Handle { key, state }
    }
}
203
impl CounterFn for Handle {
    /// Enqueues a counter increment by `value`.
    fn increment(&self, value: u64) {
        self.state.push_metric(&self.key, MetricOperation::IncrementCounter(value))
    }

    /// Enqueues an absolute counter set to `value`.
    fn absolute(&self, value: u64) {
        self.state.push_metric(&self.key, MetricOperation::SetCounter(value))
    }
}

impl GaugeFn for Handle {
    /// Enqueues a gauge increment by `value`.
    fn increment(&self, value: f64) {
        self.state.push_metric(&self.key, MetricOperation::IncrementGauge(value))
    }

    /// Enqueues a gauge decrement by `value`.
    fn decrement(&self, value: f64) {
        self.state.push_metric(&self.key, MetricOperation::DecrementGauge(value))
    }

    /// Enqueues an absolute gauge set to `value`.
    fn set(&self, value: f64) {
        self.state.push_metric(&self.key, MetricOperation::SetGauge(value))
    }
}

impl HistogramFn for Handle {
    /// Enqueues a histogram sample of `value`.
    fn record(&self, value: f64) {
        self.state.push_metric(&self.key, MetricOperation::RecordHistogram(value))
    }
}
233
/// A TCP recorder.
///
/// Forwards metric events to the transport thread spawned by [`TcpBuilder::build`].
#[derive(Debug)]
pub struct TcpRecorder {
    // Shared with the transport thread; carries the event channel and waker.
    state: Arc<State>,
}

/// Builder for creating and installing a TCP recorder/exporter.
#[derive(Debug)]
pub struct TcpBuilder {
    // Address the TCP server listens on; defaults to 0.0.0.0:5000.
    listen_addr: SocketAddr,
    // Limit for the metric channel and per-client queues; `None` means unbounded.
    buffer_size: Option<usize>,
}
246
impl TcpBuilder {
    /// Creates a new `TcpBuilder`.
    ///
    /// Defaults to listening on `0.0.0.0:5000` with a buffer limit of 1024 metrics.
    pub fn new() -> TcpBuilder {
        TcpBuilder { listen_addr: ([0, 0, 0, 0], 5000).into(), buffer_size: Some(1024) }
    }

    /// Sets the listen address.
    ///
    /// The exporter will accept connections on this address and immediately begin forwarding
    /// metrics to the client.
    ///
    /// Defaults to `0.0.0.0:5000`.
    pub fn listen_address<A>(mut self, addr: A) -> TcpBuilder
    where
        A: Into<SocketAddr>,
    {
        self.listen_addr = addr.into();
        self
    }

    /// Sets the buffer size for internal operations.
    ///
    /// The buffer size controls two operational aspects: the number of metrics processed
    /// per iteration of the event loop, and the number of buffered metrics each client
    /// can hold.
    ///
    /// This setting allows trading off responsiveness for throughput, where a smaller buffer
    /// size will ensure that metrics are pushed to clients sooner, versus a larger buffer
    /// size that allows us to push more at a time.
    ///
    /// As well, the larger the buffer, the more messages a client can temporarily hold.
    /// Clients have a circular buffer implementation so if their buffers are full, metrics
    /// will be dropped as necessary to avoid backpressure in the recorder.
    ///
    /// Passing `None` removes the limit entirely (not recommended; see the crate docs).
    pub fn buffer_size(mut self, size: Option<usize>) -> TcpBuilder {
        self.buffer_size = size;
        self
    }

    /// Installs the recorder and exporter.
    ///
    /// An error will be returned if there's an issue with creating the TCP server or with
    /// installing the recorder as the global recorder.
    pub fn install(self) -> Result<(), Error> {
        let recorder = self.build()?;
        metrics::set_global_recorder(recorder).map_err(Into::into)
    }

    /// Builds and installs the exporter, but returns the recorder.
    ///
    /// In most cases, users should prefer to use [`TcpBuilder::install`] to create and install
    /// the recorder and exporter automatically for them. If a caller is combining recorders,
    /// however, then this method allows the caller the flexibility to do so.
    pub fn build(self) -> Result<TcpRecorder, Error> {
        let buffer_size = self.buffer_size;
        // `None` means unbounded buffering; otherwise both the incoming channel and each
        // client's queue are limited to `size` entries.
        let (tx, rx) = match buffer_size {
            None => unbounded(),
            Some(size) => bounded(size),
        };

        let poll = Poll::new()?;
        // The waker lets recorder handles (running on other threads) interrupt the poll loop.
        let waker = Waker::new(poll.registry(), WAKER)?;

        let mut listener = TcpListener::bind(self.listen_addr)?;
        poll.registry().register(&mut listener, LISTENER, Interest::READABLE)?;

        let state = Arc::new(State::new(waker, tx));
        let recorder = TcpRecorder { state: state.clone() };

        // The transport thread runs for the life of the process; its handle is intentionally
        // detached.
        thread::spawn(move || run_transport(poll, listener, rx, state, buffer_size));
        Ok(recorder)
    }
}
319
320impl Default for TcpBuilder {
321    fn default() -> Self {
322        TcpBuilder::new()
323    }
324}
325
impl Recorder for TcpRecorder {
    // Describe calls enqueue metadata events; these are cached by the transport thread and
    // replayed to every client that connects later.
    fn describe_counter(&self, key_name: KeyName, unit: Option<Unit>, description: SharedString) {
        self.state.register_metric(key_name, MetricType::Counter, unit, description);
    }

    fn describe_gauge(&self, key_name: KeyName, unit: Option<Unit>, description: SharedString) {
        self.state.register_metric(key_name, MetricType::Gauge, unit, description);
    }

    fn describe_histogram(&self, key_name: KeyName, unit: Option<Unit>, description: SharedString) {
        self.state.register_metric(key_name, MetricType::Histogram, unit, description);
    }

    // Register calls hand out lightweight `Handle`s that clone the key and share the
    // transport state; no registration event is sent over the channel.
    fn register_counter(&self, key: &Key, _metadata: &Metadata<'_>) -> Counter {
        Counter::from_arc(Arc::new(Handle::new(key.clone(), self.state.clone())))
    }

    fn register_gauge(&self, key: &Key, _metadata: &Metadata<'_>) -> Gauge {
        Gauge::from_arc(Arc::new(Handle::new(key.clone(), self.state.clone())))
    }

    fn register_histogram(&self, key: &Key, _metadata: &Metadata<'_>) -> Histogram {
        Histogram::from_arc(Arc::new(Handle::new(key.clone(), self.state.clone())))
    }
}
351
352#[allow(clippy::mutable_key_type)]
353fn run_transport(
354    mut poll: Poll,
355    listener: TcpListener,
356    rx: Receiver<Event>,
357    state: Arc<State>,
358    buffer_size: Option<usize>,
359) {
360    let buffer_limit = buffer_size.unwrap_or(std::usize::MAX);
361    let mut events = Events::with_capacity(1024);
362    let mut clients = HashMap::new();
363    let mut clients_to_remove = Vec::new();
364    let mut metadata = HashMap::new();
365    let mut next_token = START_TOKEN;
366    let mut buffered_pmsgs = VecDeque::with_capacity(buffer_limit);
367
368    loop {
369        let _span = trace_span!("transport");
370
371        // Poll until we get something.  All events -- metrics wake-ups and network I/O -- flow
372        // through here so we can block without issue.
373        let _evspan = trace_span!("event loop");
374        if let Err(e) = poll.poll(&mut events, None) {
375            error!(error = %e, "error during poll");
376            continue;
377        }
378        drop(_evspan);
379
380        // Technically, this is an abuse of size_hint() but Mio will return the number of events
381        // for both parts of the tuple.
382        trace!(events = events.iter().size_hint().0, "return from poll");
383
384        let _pspan = trace_span!("process events");
385        for event in events.iter() {
386            match event.token() {
387                WAKER => {
388                    // Read until we hit our buffer limit or there are no more messages.
389                    let _mrxspan = trace_span!("metrics in");
390                    loop {
391                        if buffered_pmsgs.len() >= buffer_limit {
392                            // We didn't drain ourselves here, so schedule a future wake so we
393                            // continue to drain remaining metrics.
394                            state.wake();
395                            break;
396                        }
397
398                        let msg = match rx.try_recv() {
399                            Ok(msg) => msg,
400                            Err(e) if e.is_empty() => {
401                                trace!("metric rx drained");
402                                break;
403                            }
404                            // If our sender is dead, we can't do anything else, so just return.
405                            Err(_) => return,
406                        };
407
408                        match msg {
409                            Event::Metadata(key, metric_type, unit, desc) => {
410                                let entry = metadata
411                                    .entry(key)
412                                    .or_insert_with(|| (metric_type, None, None));
413                                let (_, uentry, dentry) = entry;
414                                *uentry = unit;
415                                *dentry = Some(desc);
416                            }
417                            Event::Metric(key, value) => {
418                                match convert_metric_to_protobuf_encoded(key, value) {
419                                    Ok(pmsg) => buffered_pmsgs.push_back(pmsg),
420                                    Err(e) => error!(error = ?e, "error encoding metric"),
421                                }
422                            }
423                        }
424                    }
425                    drop(_mrxspan);
426
427                    if buffered_pmsgs.is_empty() {
428                        trace!("woken for metrics but no pmsgs buffered");
429                        continue;
430                    }
431
432                    // Now fan out each of these items to each client.
433                    for (token, (conn, wbuf, msgs)) in clients.iter_mut() {
434                        // Before we potentially do any draining, try and drive the connection to
435                        // make sure space is freed up as much as possible.
436                        let done = drive_connection(conn, wbuf, msgs);
437                        if done {
438                            clients_to_remove.push(*token);
439                            state.decrement_clients();
440                            continue;
441                        }
442
443                        // With the encoded metrics, we push them into each client's internal
444                        // list.  We try to write as many of those buffers as possible to the
445                        // client before being told to back off.  If we encounter a partial write
446                        // of a buffer, we store the remaining of that message in a special field
447                        // so that we don't write incomplete metrics to the client.
448                        //
449                        // If there are more messages to hand off to a client than the client's
450                        // internal list has room for, we remove as many as needed to do so.  This
451                        // means we prioritize sending newer metrics if connections are backed up.
452                        let available =
453                            if msgs.len() < buffer_limit { buffer_limit - msgs.len() } else { 0 };
454                        let to_drain = buffered_pmsgs.len().saturating_sub(available);
455                        let _ = msgs.drain(0..to_drain);
456                        msgs.extend(buffered_pmsgs.iter().take(buffer_limit).cloned());
457
458                        let done = drive_connection(conn, wbuf, msgs);
459                        if done {
460                            clients_to_remove.push(*token);
461                            state.decrement_clients();
462                        }
463                    }
464
465                    // We've pushed each metric into each client's internal list, so we can clear
466                    // ourselves and continue on.
467                    buffered_pmsgs.clear();
468
469                    // Remove any clients that were done.
470                    for token in clients_to_remove.drain(..) {
471                        if let Some((conn, _, _)) = clients.get_mut(&token) {
472                            trace!(?conn, ?token, "removing client");
473                            clients.remove(&token);
474                            state.decrement_clients();
475                        }
476                    }
477                }
478                LISTENER => {
479                    // Accept as many new connections as we can.
480                    loop {
481                        match listener.accept() {
482                            Ok((mut conn, _)) => {
483                                // Get our client's token and register the connection.
484                                let token = next(&mut next_token);
485                                poll.registry()
486                                    .register(&mut conn, token, CLIENT_INTEREST)
487                                    .expect("failed to register interest for client connection");
488
489                                state.increment_clients();
490
491                                // Start tracking them, and enqueue all of the metadata.
492                                let metadata = generate_metadata_messages(&metadata);
493                                clients
494                                    .insert(token, (conn, None, metadata))
495                                    .ok_or(())
496                                    .expect_err("client mapped to existing token!");
497                            }
498                            Err(ref e) if would_block(e) => break,
499                            Err(e) => {
500                                error!("caught error while accepting client connections: {:?}", e);
501                                return;
502                            }
503                        }
504                    }
505                }
506                token => {
507                    if event.is_writable() {
508                        if let Some((conn, wbuf, msgs)) = clients.get_mut(&token) {
509                            let done = drive_connection(conn, wbuf, msgs);
510                            if done {
511                                trace!(?conn, ?token, "removing client");
512                                clients.remove(&token);
513                                state.decrement_clients();
514                            }
515                        }
516                    }
517                }
518            }
519        }
520    }
521}
522
523#[allow(clippy::mutable_key_type)]
524fn generate_metadata_messages(
525    metadata: &HashMap<KeyName, (MetricType, Option<Unit>, Option<SharedString>)>,
526) -> VecDeque<Bytes> {
527    let mut bufs = VecDeque::new();
528    for (key_name, (metric_type, unit, desc)) in metadata.iter() {
529        let msg =
530            convert_metadata_to_protobuf_encoded(key_name, *metric_type, *unit, desc.as_ref())
531                .expect("failed to encode metadata buffer");
532        bufs.push_back(msg);
533    }
534    bufs
535}
536
537#[tracing::instrument(skip(wbuf, msgs))]
538fn drive_connection(
539    conn: &mut TcpStream,
540    wbuf: &mut Option<Bytes>,
541    msgs: &mut VecDeque<Bytes>,
542) -> bool {
543    trace!(?conn, "driving client");
544    loop {
545        let mut buf = match wbuf.take() {
546            // Send the leftover buffer first, if we have one.
547            Some(buf) => buf,
548            None => match msgs.pop_front() {
549                Some(msg) => msg,
550                None => {
551                    trace!("client write queue drained");
552                    return false;
553                }
554            },
555        };
556
557        match conn.write(&buf) {
558            // Zero write = client closed their connection, so remove 'em.
559            Ok(0) => {
560                trace!(?conn, "zero write, closing client");
561                return true;
562            }
563            Ok(n) if n < buf.len() => {
564                // We sent part of the buffer, but not everything.  Keep track of the remaining
565                // chunk of the buffer.  TODO: do we need to reregister ourselves to track writable
566                // status??
567                let remaining = buf.split_off(n);
568                trace!(?conn, written = n, remaining = remaining.len(), "partial write");
569                wbuf.replace(remaining);
570                return false;
571            }
572            Ok(_) => continue,
573            Err(ref e) if would_block(e) => return false,
574            Err(ref e) if interrupted(e) => return drive_connection(conn, wbuf, msgs),
575            Err(e) => {
576                error!(?conn, error = %e, "write failed");
577                return true;
578            }
579        }
580    }
581}
582
/// Encodes a metadata (describe) event for `key_name` as a length-delimited protobuf
/// `Event` buffer.
///
/// Returns an `EncodeError` if protobuf encoding fails.
fn convert_metadata_to_protobuf_encoded(
    key_name: &KeyName,
    metric_type: MetricType,
    unit: Option<Unit>,
    desc: Option<&SharedString>,
) -> Result<Bytes, EncodeError> {
    let name = key_name.as_str().to_string();
    let metadata = proto::Metadata {
        name,
        metric_type: metric_type.into(),
        unit: unit.map(|u| proto::metadata::Unit::UnitValue(u.as_str().to_owned())),
        description: desc.map(|d| proto::metadata::Description::DescriptionValue(d.to_string())),
    };
    let event = proto::Event { event: Some(proto::event::Event::Metadata(metadata)) };

    // Length-delimited framing lets clients split the byte stream back into messages.
    let mut buf = Vec::new();
    event.encode_length_delimited(&mut buf)?;
    Ok(Bytes::from(buf))
}
602
603fn convert_metric_to_protobuf_encoded(
604    key: Key,
605    operation: MetricOperation,
606) -> Result<Bytes, EncodeError> {
607    let name = key.name().to_string();
608    let labels = key
609        .labels()
610        .map(|label| (label.key().to_owned(), label.value().to_owned()))
611        .collect::<BTreeMap<_, _>>();
612    let operation = match operation {
613        MetricOperation::IncrementCounter(v) => proto::metric::Operation::IncrementCounter(v),
614        MetricOperation::SetCounter(v) => proto::metric::Operation::SetCounter(v),
615        MetricOperation::IncrementGauge(v) => proto::metric::Operation::IncrementGauge(v),
616        MetricOperation::DecrementGauge(v) => proto::metric::Operation::DecrementGauge(v),
617        MetricOperation::SetGauge(v) => proto::metric::Operation::SetGauge(v),
618        MetricOperation::RecordHistogram(v) => proto::metric::Operation::RecordHistogram(v),
619    };
620
621    let now: prost_types::Timestamp = SystemTime::now().into();
622    let metric = proto::Metric { name, labels, timestamp: Some(now), operation: Some(operation) };
623    let event = proto::Event { event: Some(proto::event::Event::Metric(metric)) };
624
625    let mut buf = Vec::new();
626    event.encode_length_delimited(&mut buf)?;
627    Ok(Bytes::from(buf))
628}
629
630fn next(current: &mut Token) -> Token {
631    let next = current.0;
632    current.0 += 1;
633    Token(next)
634}
635
/// Whether the I/O error is `WouldBlock`, i.e. the socket isn't ready yet.
fn would_block(err: &io::Error) -> bool {
    matches!(err.kind(), io::ErrorKind::WouldBlock)
}

/// Whether the I/O error is `Interrupted`, i.e. the syscall should be retried.
fn interrupted(err: &io::Error) -> bool {
    matches!(err.kind(), io::ErrorKind::Interrupted)
}