1#![deny(missing_docs)]
48#![cfg_attr(docsrs, feature(doc_cfg), deny(rustdoc::broken_intra_doc_links))]
49use std::io::{self, Write};
50use std::net::SocketAddr;
51use std::sync::{
52 atomic::{AtomicBool, Ordering},
53 Arc,
54};
55use std::thread;
56use std::time::SystemTime;
57use std::{
58 collections::{BTreeMap, HashMap, VecDeque},
59 sync::atomic::AtomicUsize,
60};
61
62use bytes::Bytes;
63use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
64use metrics::{
65 Counter, CounterFn, Gauge, GaugeFn, Histogram, HistogramFn, Key, KeyName, Metadata, Recorder,
66 SetRecorderError, SharedString, Unit,
67};
68use mio::{
69 net::{TcpListener, TcpStream},
70 Events, Interest, Poll, Token, Waker,
71};
72use prost::{EncodeError, Message};
73use tracing::{error, trace, trace_span};
74
/// Poll token for the cross-thread [`Waker`] that `State` signals whenever it queues an event.
const WAKER: Token = Token(0);
/// Poll token for the listening socket that accepts new client connections.
const LISTENER: Token = Token(1);
/// First token handed out to accepted clients; each new client takes the next value up.
const START_TOKEN: Token = Token(2);
/// Readiness interest registered for every accepted client connection.
const CLIENT_INTEREST: Interest = Interest::READABLE.add(Interest::WRITABLE);

/// Protobuf message types generated at build time from `event.proto` (included from `OUT_DIR`).
mod proto {
    include!(concat!(env!("OUT_DIR"), "/event.proto.rs"));
}
83
84use self::proto::metadata::MetricType;
85
/// A single operation on a metric, produced by a handle and consumed by the transport thread.
enum MetricOperation {
    /// Counter increment (from `CounterFn::increment`).
    IncrementCounter(u64),
    /// Counter set to an absolute value (from `CounterFn::absolute`).
    SetCounter(u64),
    /// Gauge increment (from `GaugeFn::increment`).
    IncrementGauge(f64),
    /// Gauge decrement (from `GaugeFn::decrement`).
    DecrementGauge(f64),
    /// Gauge set to a value (from `GaugeFn::set`).
    SetGauge(f64),
    /// Histogram sample (from `HistogramFn::record`).
    RecordHistogram(f64),
}

/// A message sent from the recorder side to the transport thread over the event channel.
enum Event {
    /// Description of a metric: its name, type, optional unit and description text.
    Metadata(KeyName, MetricType, Option<Unit>, SharedString),
    /// An operation applied to the metric identified by the key.
    Metric(Key, MetricOperation),
}
99
100#[derive(Debug)]
102pub enum Error {
103 Io(io::Error),
105
106 Recorder(SetRecorderError<TcpRecorder>),
108}
109
110impl From<io::Error> for Error {
111 fn from(e: io::Error) -> Self {
112 Error::Io(e)
113 }
114}
115
116impl From<SetRecorderError<TcpRecorder>> for Error {
117 fn from(e: SetRecorderError<TcpRecorder>) -> Self {
118 Error::Recorder(e)
119 }
120}
121
122impl std::fmt::Display for Error {
123 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124 match self {
125 Error::Io(e) => write!(f, "IO error: {}", e),
126 Error::Recorder(e) => write!(f, "recorder error: {}", e),
127 }
128 }
129}
130
131impl std::error::Error for Error {
132 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
133 match self {
134 Error::Io(e) => Some(e),
135 Error::Recorder(e) => Some(e),
136 }
137 }
138}
139
140#[derive(Debug)]
141struct State {
142 client_count: AtomicUsize,
143 should_send: AtomicBool,
144 waker: Waker,
145 tx: Sender<Event>,
146}
147
148impl State {
149 pub fn new(waker: Waker, tx: Sender<Event>) -> State {
150 State { client_count: AtomicUsize::new(0), should_send: AtomicBool::new(false), waker, tx }
151 }
152
153 pub fn should_send(&self) -> bool {
154 self.should_send.load(Ordering::Acquire)
155 }
156
157 pub fn increment_clients(&self) {
158 self.client_count.fetch_add(1, Ordering::AcqRel);
159 self.should_send.store(true, Ordering::Release);
160 }
161
162 pub fn decrement_clients(&self) {
163 let count = self.client_count.fetch_sub(1, Ordering::AcqRel);
164 if count == 1 {
165 self.should_send.store(false, Ordering::Release);
166 }
167 }
168
169 fn register_metric(
170 &self,
171 key_name: KeyName,
172 metric_type: MetricType,
173 unit: Option<Unit>,
174 description: SharedString,
175 ) {
176 let _ = self.tx.try_send(Event::Metadata(key_name, metric_type, unit, description));
177 self.wake();
178 }
179
180 fn push_metric(&self, key: &Key, op: MetricOperation) {
181 if self.should_send() {
182 let _ = self.tx.try_send(Event::Metric(key.clone(), op));
183 self.wake();
184 }
185 }
186
187 pub fn wake(&self) {
188 let _ = self.waker.wake();
189 }
190}
191
192#[derive(Debug)]
193struct Handle {
194 key: Key,
195 state: Arc<State>,
196}
197
198impl Handle {
199 fn new(key: Key, state: Arc<State>) -> Handle {
200 Handle { key, state }
201 }
202}
203
204impl CounterFn for Handle {
205 fn increment(&self, value: u64) {
206 self.state.push_metric(&self.key, MetricOperation::IncrementCounter(value))
207 }
208
209 fn absolute(&self, value: u64) {
210 self.state.push_metric(&self.key, MetricOperation::SetCounter(value))
211 }
212}
213
214impl GaugeFn for Handle {
215 fn increment(&self, value: f64) {
216 self.state.push_metric(&self.key, MetricOperation::IncrementGauge(value))
217 }
218
219 fn decrement(&self, value: f64) {
220 self.state.push_metric(&self.key, MetricOperation::DecrementGauge(value))
221 }
222
223 fn set(&self, value: f64) {
224 self.state.push_metric(&self.key, MetricOperation::SetGauge(value))
225 }
226}
227
228impl HistogramFn for Handle {
229 fn record(&self, value: f64) {
230 self.state.push_metric(&self.key, MetricOperation::RecordHistogram(value))
231 }
232}
233
/// A [`Recorder`] that forwards metric operations to a background transport thread.
///
/// The transport thread writes them to connected TCP clients as length-delimited protobuf
/// messages.
#[derive(Debug)]
pub struct TcpRecorder {
    state: Arc<State>,
}

/// Builder for creating and installing a TCP recorder/exporter.
#[derive(Debug)]
pub struct TcpBuilder {
    // Address the client listener binds to.
    listen_addr: SocketAddr,
    // Capacity of the event channel and of each client's message queue; `None` means unbounded.
    buffer_size: Option<usize>,
}
246
247impl TcpBuilder {
248 pub fn new() -> TcpBuilder {
250 TcpBuilder { listen_addr: ([0, 0, 0, 0], 5000).into(), buffer_size: Some(1024) }
251 }
252
253 pub fn listen_address<A>(mut self, addr: A) -> TcpBuilder
260 where
261 A: Into<SocketAddr>,
262 {
263 self.listen_addr = addr.into();
264 self
265 }
266
267 pub fn buffer_size(mut self, size: Option<usize>) -> TcpBuilder {
281 self.buffer_size = size;
282 self
283 }
284
285 pub fn install(self) -> Result<(), Error> {
290 let recorder = self.build()?;
291 metrics::set_global_recorder(recorder).map_err(Into::into)
292 }
293
294 pub fn build(self) -> Result<TcpRecorder, Error> {
300 let buffer_size = self.buffer_size;
301 let (tx, rx) = match buffer_size {
302 None => unbounded(),
303 Some(size) => bounded(size),
304 };
305
306 let poll = Poll::new()?;
307 let waker = Waker::new(poll.registry(), WAKER)?;
308
309 let mut listener = TcpListener::bind(self.listen_addr)?;
310 poll.registry().register(&mut listener, LISTENER, Interest::READABLE)?;
311
312 let state = Arc::new(State::new(waker, tx));
313 let recorder = TcpRecorder { state: state.clone() };
314
315 thread::spawn(move || run_transport(poll, listener, rx, state, buffer_size));
316 Ok(recorder)
317 }
318}
319
320impl Default for TcpBuilder {
321 fn default() -> Self {
322 TcpBuilder::new()
323 }
324}
325
326impl Recorder for TcpRecorder {
327 fn describe_counter(&self, key_name: KeyName, unit: Option<Unit>, description: SharedString) {
328 self.state.register_metric(key_name, MetricType::Counter, unit, description);
329 }
330
331 fn describe_gauge(&self, key_name: KeyName, unit: Option<Unit>, description: SharedString) {
332 self.state.register_metric(key_name, MetricType::Gauge, unit, description);
333 }
334
335 fn describe_histogram(&self, key_name: KeyName, unit: Option<Unit>, description: SharedString) {
336 self.state.register_metric(key_name, MetricType::Histogram, unit, description);
337 }
338
339 fn register_counter(&self, key: &Key, _metadata: &Metadata<'_>) -> Counter {
340 Counter::from_arc(Arc::new(Handle::new(key.clone(), self.state.clone())))
341 }
342
343 fn register_gauge(&self, key: &Key, _metadata: &Metadata<'_>) -> Gauge {
344 Gauge::from_arc(Arc::new(Handle::new(key.clone(), self.state.clone())))
345 }
346
347 fn register_histogram(&self, key: &Key, _metadata: &Metadata<'_>) -> Histogram {
348 Histogram::from_arc(Arc::new(Handle::new(key.clone(), self.state.clone())))
349 }
350}
351
352#[allow(clippy::mutable_key_type)]
353fn run_transport(
354 mut poll: Poll,
355 listener: TcpListener,
356 rx: Receiver<Event>,
357 state: Arc<State>,
358 buffer_size: Option<usize>,
359) {
360 let buffer_limit = buffer_size.unwrap_or(std::usize::MAX);
361 let mut events = Events::with_capacity(1024);
362 let mut clients = HashMap::new();
363 let mut clients_to_remove = Vec::new();
364 let mut metadata = HashMap::new();
365 let mut next_token = START_TOKEN;
366 let mut buffered_pmsgs = VecDeque::with_capacity(buffer_limit);
367
368 loop {
369 let _span = trace_span!("transport");
370
371 let _evspan = trace_span!("event loop");
374 if let Err(e) = poll.poll(&mut events, None) {
375 error!(error = %e, "error during poll");
376 continue;
377 }
378 drop(_evspan);
379
380 trace!(events = events.iter().size_hint().0, "return from poll");
383
384 let _pspan = trace_span!("process events");
385 for event in events.iter() {
386 match event.token() {
387 WAKER => {
388 let _mrxspan = trace_span!("metrics in");
390 loop {
391 if buffered_pmsgs.len() >= buffer_limit {
392 state.wake();
395 break;
396 }
397
398 let msg = match rx.try_recv() {
399 Ok(msg) => msg,
400 Err(e) if e.is_empty() => {
401 trace!("metric rx drained");
402 break;
403 }
404 Err(_) => return,
406 };
407
408 match msg {
409 Event::Metadata(key, metric_type, unit, desc) => {
410 let entry = metadata
411 .entry(key)
412 .or_insert_with(|| (metric_type, None, None));
413 let (_, uentry, dentry) = entry;
414 *uentry = unit;
415 *dentry = Some(desc);
416 }
417 Event::Metric(key, value) => {
418 match convert_metric_to_protobuf_encoded(key, value) {
419 Ok(pmsg) => buffered_pmsgs.push_back(pmsg),
420 Err(e) => error!(error = ?e, "error encoding metric"),
421 }
422 }
423 }
424 }
425 drop(_mrxspan);
426
427 if buffered_pmsgs.is_empty() {
428 trace!("woken for metrics but no pmsgs buffered");
429 continue;
430 }
431
432 for (token, (conn, wbuf, msgs)) in clients.iter_mut() {
434 let done = drive_connection(conn, wbuf, msgs);
437 if done {
438 clients_to_remove.push(*token);
439 state.decrement_clients();
440 continue;
441 }
442
443 let available =
453 if msgs.len() < buffer_limit { buffer_limit - msgs.len() } else { 0 };
454 let to_drain = buffered_pmsgs.len().saturating_sub(available);
455 let _ = msgs.drain(0..to_drain);
456 msgs.extend(buffered_pmsgs.iter().take(buffer_limit).cloned());
457
458 let done = drive_connection(conn, wbuf, msgs);
459 if done {
460 clients_to_remove.push(*token);
461 state.decrement_clients();
462 }
463 }
464
465 buffered_pmsgs.clear();
468
469 for token in clients_to_remove.drain(..) {
471 if let Some((conn, _, _)) = clients.get_mut(&token) {
472 trace!(?conn, ?token, "removing client");
473 clients.remove(&token);
474 state.decrement_clients();
475 }
476 }
477 }
478 LISTENER => {
479 loop {
481 match listener.accept() {
482 Ok((mut conn, _)) => {
483 let token = next(&mut next_token);
485 poll.registry()
486 .register(&mut conn, token, CLIENT_INTEREST)
487 .expect("failed to register interest for client connection");
488
489 state.increment_clients();
490
491 let metadata = generate_metadata_messages(&metadata);
493 clients
494 .insert(token, (conn, None, metadata))
495 .ok_or(())
496 .expect_err("client mapped to existing token!");
497 }
498 Err(ref e) if would_block(e) => break,
499 Err(e) => {
500 error!("caught error while accepting client connections: {:?}", e);
501 return;
502 }
503 }
504 }
505 }
506 token => {
507 if event.is_writable() {
508 if let Some((conn, wbuf, msgs)) = clients.get_mut(&token) {
509 let done = drive_connection(conn, wbuf, msgs);
510 if done {
511 trace!(?conn, ?token, "removing client");
512 clients.remove(&token);
513 state.decrement_clients();
514 }
515 }
516 }
517 }
518 }
519 }
520 }
521}
522
523#[allow(clippy::mutable_key_type)]
524fn generate_metadata_messages(
525 metadata: &HashMap<KeyName, (MetricType, Option<Unit>, Option<SharedString>)>,
526) -> VecDeque<Bytes> {
527 let mut bufs = VecDeque::new();
528 for (key_name, (metric_type, unit, desc)) in metadata.iter() {
529 let msg =
530 convert_metadata_to_protobuf_encoded(key_name, *metric_type, *unit, desc.as_ref())
531 .expect("failed to encode metadata buffer");
532 bufs.push_back(msg);
533 }
534 bufs
535}
536
537#[tracing::instrument(skip(wbuf, msgs))]
538fn drive_connection(
539 conn: &mut TcpStream,
540 wbuf: &mut Option<Bytes>,
541 msgs: &mut VecDeque<Bytes>,
542) -> bool {
543 trace!(?conn, "driving client");
544 loop {
545 let mut buf = match wbuf.take() {
546 Some(buf) => buf,
548 None => match msgs.pop_front() {
549 Some(msg) => msg,
550 None => {
551 trace!("client write queue drained");
552 return false;
553 }
554 },
555 };
556
557 match conn.write(&buf) {
558 Ok(0) => {
560 trace!(?conn, "zero write, closing client");
561 return true;
562 }
563 Ok(n) if n < buf.len() => {
564 let remaining = buf.split_off(n);
568 trace!(?conn, written = n, remaining = remaining.len(), "partial write");
569 wbuf.replace(remaining);
570 return false;
571 }
572 Ok(_) => continue,
573 Err(ref e) if would_block(e) => return false,
574 Err(ref e) if interrupted(e) => return drive_connection(conn, wbuf, msgs),
575 Err(e) => {
576 error!(?conn, error = %e, "write failed");
577 return true;
578 }
579 }
580 }
581}
582
583fn convert_metadata_to_protobuf_encoded(
584 key_name: &KeyName,
585 metric_type: MetricType,
586 unit: Option<Unit>,
587 desc: Option<&SharedString>,
588) -> Result<Bytes, EncodeError> {
589 let name = key_name.as_str().to_string();
590 let metadata = proto::Metadata {
591 name,
592 metric_type: metric_type.into(),
593 unit: unit.map(|u| proto::metadata::Unit::UnitValue(u.as_str().to_owned())),
594 description: desc.map(|d| proto::metadata::Description::DescriptionValue(d.to_string())),
595 };
596 let event = proto::Event { event: Some(proto::event::Event::Metadata(metadata)) };
597
598 let mut buf = Vec::new();
599 event.encode_length_delimited(&mut buf)?;
600 Ok(Bytes::from(buf))
601}
602
603fn convert_metric_to_protobuf_encoded(
604 key: Key,
605 operation: MetricOperation,
606) -> Result<Bytes, EncodeError> {
607 let name = key.name().to_string();
608 let labels = key
609 .labels()
610 .map(|label| (label.key().to_owned(), label.value().to_owned()))
611 .collect::<BTreeMap<_, _>>();
612 let operation = match operation {
613 MetricOperation::IncrementCounter(v) => proto::metric::Operation::IncrementCounter(v),
614 MetricOperation::SetCounter(v) => proto::metric::Operation::SetCounter(v),
615 MetricOperation::IncrementGauge(v) => proto::metric::Operation::IncrementGauge(v),
616 MetricOperation::DecrementGauge(v) => proto::metric::Operation::DecrementGauge(v),
617 MetricOperation::SetGauge(v) => proto::metric::Operation::SetGauge(v),
618 MetricOperation::RecordHistogram(v) => proto::metric::Operation::RecordHistogram(v),
619 };
620
621 let now: prost_types::Timestamp = SystemTime::now().into();
622 let metric = proto::Metric { name, labels, timestamp: Some(now), operation: Some(operation) };
623 let event = proto::Event { event: Some(proto::event::Event::Metric(metric)) };
624
625 let mut buf = Vec::new();
626 event.encode_length_delimited(&mut buf)?;
627 Ok(Bytes::from(buf))
628}
629
630fn next(current: &mut Token) -> Token {
631 let next = current.0;
632 current.0 += 1;
633 Token(next)
634}
635
/// Returns `true` if `err` means the non-blocking operation could not proceed without blocking.
fn would_block(err: &io::Error) -> bool {
    matches!(err.kind(), io::ErrorKind::WouldBlock)
}

/// Returns `true` if `err` means the operation was interrupted and can be retried.
fn interrupted(err: &io::Error) -> bool {
    matches!(err.kind(), io::ErrorKind::Interrupted)
}