#![deny(missing_docs)]
#![cfg_attr(not(test), deny(clippy::panic))]
#![cfg_attr(not(test), deny(clippy::unwrap_used))]
#![cfg_attr(not(test), deny(clippy::expect_used))]
#![cfg_attr(not(test), deny(clippy::todo))]
#![cfg_attr(not(test), deny(clippy::unimplemented))]
use libdd_common::tag::Tag;
use libdd_common::Endpoint;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use tracing::error;
use anyhow::anyhow;
use cadence::prelude::*;
#[cfg(unix)]
use cadence::UnixMetricSink;
use cadence::{Metric, MetricBuilder, QueuingMetricSink, StatsdClient, UdpMetricSink};
#[cfg(unix)]
use libdd_common::connector::uds::socket_path_from_uri;
use std::net::{ToSocketAddrs, UdpSocket};
#[cfg(unix)]
use std::os::unix::net::UnixDatagram;
use std::sync::{Arc, Mutex};
const QUEUE_SIZE: usize = 32 * 1024;
#[derive(Debug, Serialize, Deserialize)]
pub enum DogStatsDActionOwned {
#[allow(missing_docs)]
Count(String, i64, Vec<Tag>),
#[allow(missing_docs)]
Distribution(String, f64, Vec<Tag>),
#[allow(missing_docs)]
Gauge(String, f64, Vec<Tag>),
#[allow(missing_docs)]
Histogram(String, f64, Vec<Tag>),
Set(String, i64, Vec<Tag>),
}
#[derive(Debug, Serialize, Deserialize)]
pub enum DogStatsDAction<'a, T: AsRef<str>, V: IntoIterator<Item = &'a Tag>> {
#[allow(missing_docs)]
Count(T, i64, V),
#[allow(missing_docs)]
Distribution(T, f64, V),
#[allow(missing_docs)]
Gauge(T, f64, V),
#[allow(missing_docs)]
Histogram(T, f64, V),
Set(T, i64, V),
}
#[derive(Debug, Default)]
pub struct Client {
client: Mutex<Arc<Option<StatsdClient>>>,
endpoint: Option<Endpoint>,
}
pub fn new(endpoint: Endpoint) -> anyhow::Result<Client> {
Ok(Client {
endpoint: Some(endpoint),
..Default::default()
})
}
impl Client {
pub fn send_owned(&self, actions: Vec<DogStatsDActionOwned>) {
let client_opt = match self.get_or_init_client() {
Ok(client) => client,
Err(e) => {
error!(?e, "Failed to get client");
return;
}
};
if let Some(client) = &*client_opt {
for action in actions {
if let Err(err) = match action {
DogStatsDActionOwned::Count(metric, value, tags) => {
do_send(client.count_with_tags(metric.as_ref(), value), &tags)
}
DogStatsDActionOwned::Distribution(metric, value, tags) => {
do_send(client.distribution_with_tags(metric.as_ref(), value), &tags)
}
DogStatsDActionOwned::Gauge(metric, value, tags) => {
do_send(client.gauge_with_tags(metric.as_ref(), value), &tags)
}
DogStatsDActionOwned::Histogram(metric, value, tags) => {
do_send(client.histogram_with_tags(metric.as_ref(), value), &tags)
}
DogStatsDActionOwned::Set(metric, value, tags) => {
do_send(client.set_with_tags(metric.as_ref(), value), &tags)
}
} {
error!(?err, "Error while sending metric");
}
}
}
}
pub fn send<'a, T: AsRef<str>, V: IntoIterator<Item = &'a Tag>>(
&self,
actions: Vec<DogStatsDAction<'a, T, V>>,
) {
let client_opt = match self.get_or_init_client() {
Ok(client) => client,
Err(e) => {
error!(?e, "Failed to get client");
return;
}
};
if let Some(client) = &*client_opt {
for action in actions {
if let Err(err) = match action {
DogStatsDAction::Count(metric, value, tags) => {
let metric_builder = client.count_with_tags(metric.as_ref(), value);
do_send(metric_builder, tags)
}
DogStatsDAction::Distribution(metric, value, tags) => {
do_send(client.distribution_with_tags(metric.as_ref(), value), tags)
}
DogStatsDAction::Gauge(metric, value, tags) => {
do_send(client.gauge_with_tags(metric.as_ref(), value), tags)
}
DogStatsDAction::Histogram(metric, value, tags) => {
do_send(client.histogram_with_tags(metric.as_ref(), value), tags)
}
DogStatsDAction::Set(metric, value, tags) => {
do_send(client.set_with_tags(metric.as_ref(), value), tags)
}
} {
error!(?err, "Error while sending metric");
}
}
}
}
fn get_or_init_client(&self) -> anyhow::Result<Arc<Option<StatsdClient>>> {
if let Some(endpoint) = &self.endpoint {
let mut client_guard = self
.client
.lock()
.map_err(|e| anyhow!("Failed to acquire dogstatsd client lock: {e}"))?;
return if client_guard.is_some() {
Ok(client_guard.clone())
} else {
let client = Arc::new(Some(create_client(endpoint)?));
*client_guard = client.clone();
Ok(client)
};
}
Ok(None.into())
}
}
fn do_send<'m, 't, T, V: IntoIterator<Item = &'t Tag>>(
mut builder: MetricBuilder<'m, '_, T>,
tags: V,
) -> anyhow::Result<()>
where
T: Metric + From<String>,
't: 'm,
{
let mut tags_iter = tags.into_iter();
let mut tag_opt = tags_iter.next();
#[allow(clippy::unwrap_used)]
while tag_opt.is_some() {
builder = builder.with_tag_value(tag_opt.unwrap().as_ref());
tag_opt = tags_iter.next();
}
builder.try_send()?;
Ok(())
}
fn create_client(endpoint: &Endpoint) -> anyhow::Result<StatsdClient> {
match endpoint.url.scheme_str() {
#[cfg(unix)]
Some("unix") => {
let socket = UnixDatagram::unbound()
.map_err(|e| anyhow!("failed to make unbound unix port: {}", e))?;
socket
.set_nonblocking(true)
.map_err(|e| anyhow!("failed to set socket to nonblocking: {}", e))?;
let sink = QueuingMetricSink::with_capacity(
UnixMetricSink::from(
socket_path_from_uri(&endpoint.url)
.map_err(|e| anyhow!("failed to build socket path from uri: {}", e))?,
socket,
),
QUEUE_SIZE,
);
Ok(StatsdClient::from_sink("", sink))
}
_ => {
let host = endpoint.url.host().ok_or(anyhow!("invalid host"))?;
let port = endpoint.url.port().ok_or(anyhow!("invalid port"))?.as_u16();
let server_address = (host, port)
.to_socket_addrs()?
.next()
.ok_or(anyhow!("invalid address"))?;
let socket = if server_address.is_ipv4() {
UdpSocket::bind("0.0.0.0:0")
.map_err(|e| anyhow!("failed to bind to 0.0.0.0:0: {}", e))?
} else {
UdpSocket::bind("[::]:0").map_err(|e| anyhow!("failed to bind to [::]:0: {}", e))?
};
socket.set_nonblocking(true)?;
let sink = QueuingMetricSink::with_capacity(
UdpMetricSink::from((host, port), socket)
.map_err(|e| anyhow!("failed to build UdpMetricSink: {}", e))?,
QUEUE_SIZE,
);
Ok(StatsdClient::from_sink("", sink))
}
}
}
#[cfg(test)]
mod test {
use crate::DogStatsDAction::{Count, Distribution, Gauge, Histogram, Set};
use crate::{create_client, new, DogStatsDActionOwned};
#[cfg(unix)]
use http::Uri;
#[cfg(unix)]
use libdd_common::connector::uds::socket_path_to_uri;
use libdd_common::{tag, Endpoint};
use std::net;
use std::sync::Arc;
use std::time::Duration;
#[test]
#[cfg_attr(miri, ignore)]
fn test_flusher() {
let socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
let _ = socket.set_read_timeout(Some(Duration::from_millis(500)));
let flusher = new(Endpoint::from_slice(
socket.local_addr().unwrap().to_string().as_str(),
))
.unwrap();
flusher.send(vec![
Count("test_count", 3, &vec![tag!("foo", "bar")]),
Count("test_neg_count", -2, &vec![]),
Distribution("test_distribution", 4.2, &vec![]),
Gauge("test_gauge", 7.6, &vec![]),
Histogram("test_histogram", 8.0, &vec![]),
Set("test_set", 9, &vec![tag!("the", "end")]),
Set("test_neg_set", -1, &vec![]),
]);
fn read(socket: &net::UdpSocket) -> String {
let mut buf = [0; 100];
socket.recv(&mut buf).expect("No data");
let datagram = String::from_utf8_lossy(buf.strip_suffix(&[0]).unwrap());
datagram.trim_matches(char::from(0)).to_string()
}
assert_eq!("test_count:3|c|#foo:bar", read(&socket));
assert_eq!("test_neg_count:-2|c", read(&socket));
assert_eq!("test_distribution:4.2|d", read(&socket));
assert_eq!("test_gauge:7.6|g", read(&socket));
assert_eq!("test_histogram:8|h", read(&socket));
assert_eq!("test_set:9|s|#the:end", read(&socket));
assert_eq!("test_neg_set:-1|s", read(&socket));
}
#[test]
#[cfg_attr(miri, ignore)]
fn test_create_client_udp() {
let res = create_client(&Endpoint::default());
assert!(res.is_err());
assert_eq!("invalid host", res.unwrap_err().to_string().as_str());
let res = create_client(&Endpoint::from_slice("localhost:99999"));
assert!(res.is_err());
assert_eq!("invalid port", res.unwrap_err().to_string().as_str());
let res = create_client(&Endpoint::from_slice("localhost:80"));
assert!(res.is_ok());
let res = create_client(&Endpoint::from_slice("http://localhost:80"));
assert!(res.is_ok());
}
#[test]
#[cfg(unix)]
#[cfg_attr(miri, ignore)]
fn test_create_client_unix_domain_socket() {
let res = create_client(&Endpoint::from_url(
"unix://localhost:80".parse::<Uri>().unwrap(),
));
assert!(res.is_err());
assert_eq!(
"failed to build socket path from uri: invalid url",
res.unwrap_err().to_string().as_str()
);
let res = create_client(&Endpoint::from_url(
socket_path_to_uri("/path/to/a/socket.sock".as_ref()).unwrap(),
));
assert!(res.is_ok());
}
#[test]
fn test_owned_sync() {
let owned_act = DogStatsDActionOwned::Count("test".to_string(), 1, vec![]);
match owned_act {
DogStatsDActionOwned::Count(_, _, _) => {}
DogStatsDActionOwned::Distribution(_, _, _) => {}
DogStatsDActionOwned::Gauge(_, _, _) => {}
DogStatsDActionOwned::Histogram(_, _, _) => {}
DogStatsDActionOwned::Set(_, _, _) => {}
}
let act = Count("test".to_string(), 1, vec![]);
match act {
Count(_, _, _) => {}
Distribution(_, _, _) => {}
Gauge(_, _, _) => {}
Histogram(_, _, _) => {}
Set(_, _, _) => {}
}
}
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn test_thread_safety() {
let socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
let _ = socket.set_read_timeout(Some(Duration::from_millis(500)));
let endpoint = Endpoint::from_slice(socket.local_addr().unwrap().to_string().as_str());
let flusher = Arc::new(new(endpoint.clone()).unwrap());
{
let client = flusher
.client
.lock()
.expect("failed to obtain lock on client");
assert!(client.is_none());
}
let tasks: Vec<_> = (0..10)
.map(|_| {
let flusher_clone = Arc::clone(&flusher);
tokio::spawn(async move {
flusher_clone.send(vec![
Count("test_count", 3, &vec![tag!("foo", "bar")]),
Count("test_neg_count", -2, &vec![]),
Distribution("test_distribution", 4.2, &vec![]),
Gauge("test_gauge", 7.6, &vec![]),
Histogram("test_histogram", 8.0, &vec![]),
Set("test_set", 9, &vec![tag!("the", "end")]),
Set("test_neg_set", -1, &vec![]),
]);
let client = flusher_clone
.client
.lock()
.expect("failed to obtain lock on client within send thread");
assert!(client.is_some());
})
})
.collect();
for task in tasks {
task.await.unwrap();
}
}
}