use std::collections::VecDeque;
use std::error;
use std::fmt;
use std::io::Error;
use std::net::AddrParseError;
use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};
use std::time;
extern crate rand;
/// Errors that can be returned while constructing a [`Client`].
#[derive(Debug)]
pub enum StatsdError {
    /// An underlying I/O failure, converted from `std::io::Error` via `From`.
    IoError(Error),
    /// The target address could not be determined; the payload is a
    /// human-readable description of the problem.
    AddrParseError(String),
}
/// Lets `?` turn a socket-address parse failure into a [`StatsdError`].
impl From<AddrParseError> for StatsdError {
    /// Maps any [`AddrParseError`] to `StatsdError::AddrParseError` carrying a
    /// fixed message; the details of the original error are not kept.
    fn from(_parse_error: AddrParseError) -> Self {
        let message = String::from("Address parsing error");
        StatsdError::AddrParseError(message)
    }
}
/// Lets `?` turn an I/O failure into a [`StatsdError`].
impl From<Error> for StatsdError {
    /// Wraps the I/O error unchanged in `StatsdError::IoError`.
    fn from(io_error: Error) -> Self {
        StatsdError::IoError(io_error)
    }
}
/// Human-readable rendering of a [`StatsdError`].
impl fmt::Display for StatsdError {
    /// Prints the wrapped I/O error or the stored description, with no extra
    /// decoration around it.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Pick whichever payload the variant carries, then print it once.
        let detail: &dyn fmt::Display = match self {
            StatsdError::IoError(io_error) => io_error,
            StatsdError::AddrParseError(message) => message,
        };
        write!(f, "{}", detail)
    }
}
/// Marks `StatsdError` as a standard error type.
///
/// The impl body is empty, so every trait method keeps its default
/// implementation; the wrapped I/O error is only surfaced through the
/// `Display` and `Debug` output.
impl error::Error for StatsdError {}
/// A StatsD client that emits metrics, events and service checks as UDP datagrams.
pub struct Client {
    /// Local UDP socket the datagrams are sent from.
    socket: UdpSocket,
    /// Address of the server every datagram is sent to.
    server_address: SocketAddr,
    /// Namespace prepended (followed by `.`) to metric names; empty means no prefix.
    prefix: String,
    /// Tags placed ahead of the per-call tags whenever tags are appended to a datagram.
    constant_tags: Vec<String>,
}
impl Client {
pub fn new<T: ToSocketAddrs>(
host: T,
prefix: &str,
constant_tags: Option<Vec<&str>>,
) -> Result<Client, StatsdError> {
let server_address = host
.to_socket_addrs()?
.next()
.ok_or_else(|| StatsdError::AddrParseError("Address parsing error".to_string()))?;
let socket = if server_address.is_ipv4() {
UdpSocket::bind("0.0.0.0:0")?
} else {
UdpSocket::bind("[::]:0")?
};
Ok(Client {
socket,
prefix: prefix.to_string(),
server_address,
constant_tags: match constant_tags {
Some(tags) => tags.iter().map(|x| x.to_string()).collect(),
None => vec![],
},
})
}
pub fn incr(&self, metric: &str, tags: &Option<Vec<&str>>) {
self.count(metric, 1.0, tags);
}
pub fn decr(&self, metric: &str, tags: &Option<Vec<&str>>) {
self.count(metric, -1.0, tags);
}
pub fn count(&self, metric: &str, value: f64, tags: &Option<Vec<&str>>) {
let data = self.prepare_with_tags(format!("{}:{}|c", metric, value), tags);
self.send(data);
}
pub fn sampled_count(&self, metric: &str, value: f64, rate: f64, tags: &Option<Vec<&str>>) {
if rand::random::<f64>() >= rate {
return;
}
let data = self.prepare_with_tags(format!("{}:{}|c|@{}", metric, value, rate), tags);
self.send(data);
}
pub fn gauge(&self, metric: &str, value: f64, tags: &Option<Vec<&str>>) {
let data = self.prepare_with_tags(format!("{}:{}|g", metric, value), tags);
self.send(data);
}
pub fn timer(&self, metric: &str, value: f64, tags: &Option<Vec<&str>>) {
let data = self.prepare_with_tags(format!("{}:{}|ms", metric, value), tags);
self.send(data);
}
pub fn time<F, R>(&self, metric: &str, tags: &Option<Vec<&str>>, callable: F) -> R
where
F: FnOnce() -> R,
{
let start = time::Instant::now();
let return_val = callable();
let used = start.elapsed();
let data = self.prepare_with_tags(format!("{}:{}|ms", metric, used.as_millis()), tags);
self.send(data);
return_val
}
fn prepare<T: AsRef<str>>(&self, data: T) -> String {
if self.prefix.is_empty() {
data.as_ref().to_string()
} else {
format!("{}.{}", self.prefix, data.as_ref())
}
}
fn prepare_with_tags<T: AsRef<str>>(&self, data: T, tags: &Option<Vec<&str>>) -> String {
self.append_tags(self.prepare(data), tags)
}
fn append_tags<T: AsRef<str>>(&self, data: T, tags: &Option<Vec<&str>>) -> String {
if self.constant_tags.is_empty() && tags.is_none() {
data.as_ref().to_string()
} else {
let mut all_tags = self.constant_tags.clone();
match tags {
Some(v) => {
for tag in v {
all_tags.push(tag.to_string());
}
}
None => {
}
}
format!("{}|#{}", data.as_ref(), all_tags.join(","))
}
}
fn send(&self, data: String) {
let _ = self.socket.send_to(data.as_bytes(), self.server_address);
}
pub fn pipeline(&self) -> Pipeline {
Pipeline::new()
}
pub fn histogram(&self, metric: &str, value: f64, tags: &Option<Vec<&str>>) {
let data = self.prepare_with_tags(format!("{}:{}|h", metric, value), tags);
self.send(data);
}
pub fn event(&self, title: &str, text: &str, alert_type: AlertType, tags: &Option<Vec<&str>>) {
let mut d = vec![];
d.push(format!("_e{{{},{}}}:{}", title.len(), text.len(), title));
d.push(text.to_string());
if alert_type != AlertType::Info {
d.push(format!("t:{}", alert_type.to_string().to_lowercase()))
}
let event_with_tags = self.append_tags(d.join("|"), tags);
self.send(event_with_tags)
}
pub fn service_check(
&self,
service_check_name: &str,
status: ServiceCheckStatus,
tags: &Option<Vec<&str>>,
) {
let mut d = vec![];
let status_code = (status as u32).to_string();
d.push("_sc");
d.push(service_check_name);
d.push(&status_code);
let sc_with_tags = self.append_tags(d.join("|"), tags);
self.send(sc_with_tags)
}
}
/// Severity attached to an event sent with `Client::event`.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum AlertType {
    /// Informational; `Client::event` omits the `t:` field for this value.
    Info,
    /// Reported as `t:error`.
    Error,
    /// Reported as `t:warning`.
    Warning,
    /// Reported as `t:success`.
    Success,
}
impl fmt::Display for AlertType {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:?}", self)
}
}
/// Status reported by `Client::service_check`.
///
/// The explicit discriminant of each variant is the numeric code written
/// into the service-check datagram.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum ServiceCheckStatus {
    /// Code `0`.
    Ok = 0,
    /// Code `1`.
    Warning = 1,
    /// Code `2`.
    Critical = 2,
    /// Code `3`.
    Unknown = 3,
}
/// A buffer of metrics that are later sent together through a [`Client`].
pub struct Pipeline {
    /// Formatted metrics (without prefix) in the order they were recorded.
    stats: VecDeque<String>,
    /// Size limit, in bytes, used when grouping metrics into datagrams.
    max_udp_size: usize,
}
impl Pipeline {
    /// Creates an empty pipeline with a datagram size limit of 512 bytes.
    pub fn new() -> Pipeline {
        Pipeline {
            stats: VecDeque::new(),
            max_udp_size: 512,
        }
    }

    /// Changes the datagram size limit (in bytes) used by [`Pipeline::send`].
    pub fn set_max_udp_size(&mut self, max_udp_size: usize) {
        self.max_udp_size = max_udp_size;
    }

    /// Queues an increment of the counter `metric` by one.
    pub fn incr(&mut self, metric: &str) {
        self.count(metric, 1.0);
    }

    /// Queues a decrement of the counter `metric` by one.
    pub fn decr(&mut self, metric: &str) {
        self.count(metric, -1.0);
    }

    /// Queues `<metric>:<value>|c`.
    pub fn count(&mut self, metric: &str, value: f64) {
        self.stats.push_back(format!("{}:{}|c", metric, value));
    }

    /// Queues `<metric>:<value>|c|@<rate>`, but only when a random `f64`
    /// drawn from `rand::random` is below `rate`.
    pub fn sampled_count(&mut self, metric: &str, value: f64, rate: f64) {
        if rand::random::<f64>() >= rate {
            return;
        }
        self.stats
            .push_back(format!("{}:{}|c|@{}", metric, value, rate));
    }

    /// Queues `<metric>:<value>|g`.
    pub fn gauge(&mut self, metric: &str, value: f64) {
        self.stats.push_back(format!("{}:{}|g", metric, value));
    }

    /// Queues `<metric>:<value>|ms`.
    pub fn timer(&mut self, metric: &str, value: f64) {
        self.stats.push_back(format!("{}:{}|ms", metric, value));
    }

    /// Runs `callable` and queues its wall-clock duration, in whole
    /// milliseconds, as a timer for `metric`.
    pub fn time<F>(&mut self, metric: &str, callable: F)
    where
        F: FnOnce(),
    {
        let start = time::Instant::now();
        callable();
        let elapsed_ms = start.elapsed().as_millis();
        self.stats
            .push_back(format!("{}:{}|ms", metric, elapsed_ms));
    }

    /// Queues `<metric>:<value>|h`.
    pub fn histogram(&mut self, metric: &str, value: f64) {
        self.stats.push_back(format!("{}:{}|h", metric, value));
    }

    /// Drains the queue and sends its contents through `client`.
    ///
    /// Each metric gets the client's prefix (only the prefix: this path does
    /// not go through the client's tag handling). Metrics are joined with
    /// `\n` into as few datagrams as possible; a new datagram is started
    /// whenever adding the next metric would push the current one past
    /// `max_udp_size` bytes. A single metric that is larger than the limit on
    /// its own is still sent, alone in its datagram.
    pub fn send(&mut self, client: &Client) {
        let mut packet = String::new();
        while let Some(raw) = self.stats.pop_front() {
            let stat = client.prepare(raw);
            if packet.is_empty() {
                packet = stat;
            } else if packet.len() + 1 + stat.len() > self.max_udp_size {
                // The size check must use the accumulated packet, not just the
                // first metric: flush what we have and start over with `stat`.
                client.send(std::mem::replace(&mut packet, stat));
            } else {
                packet.push('\n');
                packet.push_str(&stat);
            }
        }
        if !packet.is_empty() {
            client.send(packet);
        }
    }
}
#[cfg(test)]
mod test {
    //! Round-trip tests: each test starts a throw-away UDP "server" on the
    //! loopback interface, sends through a [`Client`] or [`Pipeline`], and
    //! checks the exact datagram that arrives.

    use super::*;
    use std::net::UdpSocket;
    use std::str;
    use std::sync::mpsc::sync_channel;
    use std::thread;
    use std::time::Duration;

    /// Binds a test server on an OS-assigned loopback port and returns it
    /// together with its `ip:port` address.
    ///
    /// Letting the OS pick the port avoids collisions between tests that run
    /// in parallel. The read timeout turns a lost datagram into a test
    /// failure instead of a hang.
    fn make_server() -> (UdpSocket, String) {
        let server = UdpSocket::bind("127.0.0.1:0").expect("failed to bind test server");
        server
            .set_read_timeout(Some(Duration::from_secs(5)))
            .expect("failed to set read timeout on test server");
        let host = server
            .local_addr()
            .expect("test server has no local address")
            .to_string();
        (server, host)
    }

    /// Receives one datagram on `server` (in a helper thread) and returns it
    /// as a string.
    fn server_recv(server: UdpSocket) -> String {
        let (serv_tx, serv_rx) = sync_channel(1);
        let _t = thread::spawn(move || {
            let mut buf = [0; 128];
            let (len, _) = match server.recv_from(&mut buf) {
                Ok(r) => r,
                Err(_) => panic!("No response from test server."),
            };
            drop(server);
            let bytes = Vec::from(&buf[0..len]);
            serv_tx.send(bytes).unwrap();
        });
        let bytes = serv_rx.recv().ok().unwrap();
        str::from_utf8(&bytes).unwrap().to_string()
    }

    #[test]
    fn test_sending_gauge() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();
        client.gauge("metric", 9.1, &None);
        let response = server_recv(server);
        assert_eq!("myapp.metric:9.1|g", response);
    }

    #[test]
    fn test_sending_gauge_without_prefix() {
        let (server, host) = make_server();
        let client = Client::new(&host, "", None).unwrap();
        client.gauge("metric", 9.1, &None);
        let response = server_recv(server);
        assert_eq!("metric:9.1|g", response);
    }

    #[test]
    fn test_sending_incr() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();
        client.incr("metric", &None);
        let response = server_recv(server);
        assert_eq!("myapp.metric:1|c", response);
    }

    #[test]
    fn test_sending_decr() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();
        client.decr("metric", &None);
        let response = server_recv(server);
        assert_eq!("myapp.metric:-1|c", response);
    }

    #[test]
    fn test_sending_count() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();
        client.count("metric", 12.2, &None);
        let response = server_recv(server);
        assert_eq!("myapp.metric:12.2|c", response);
    }

    #[test]
    fn test_sending_timer() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();
        client.timer("metric", 21.39, &None);
        let response = server_recv(server);
        assert_eq!("myapp.metric:21.39|ms", response);
    }

    #[test]
    fn test_sending_timed_block() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();
        struct TimeTest {
            num: u8,
        }
        let mut t = TimeTest { num: 10 };
        let output = client.time("time_block", &None, || {
            t.num += 2;
            "a string"
        });
        let response = server_recv(server);
        assert_eq!(output, "a string");
        assert_eq!(t.num, 12);
        assert!(response.contains("myapp.time_block"));
        assert!(response.contains("|ms"));
    }

    #[test]
    fn test_sending_histogram() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();
        client.histogram("metric", 9.1, &None);
        let mut response = server_recv(server.try_clone().unwrap());
        assert_eq!("myapp.metric:9.1|h", response);
        client.histogram("metric", 9.1, &Some(vec!["tag1", "tag2:test"]));
        response = server_recv(server.try_clone().unwrap());
        assert_eq!("myapp.metric:9.1|h|#tag1,tag2:test", response);
    }

    #[test]
    fn test_sending_histogram_with_constant_tags() {
        let (server, host) = make_server();
        let client =
            Client::new(&host, "myapp", Some(vec!["tag1common", "tag2common:test"])).unwrap();
        client.histogram("metric", 9.1, &None);
        let mut response = server_recv(server.try_clone().unwrap());
        assert_eq!("myapp.metric:9.1|h|#tag1common,tag2common:test", response);
        let tags = &Some(vec!["tag1", "tag2:test"]);
        client.histogram("metric", 9.1, tags);
        response = server_recv(server.try_clone().unwrap());
        assert_eq!(
            "myapp.metric:9.1|h|#tag1common,tag2common:test,tag1,tag2:test",
            response
        );
        client.histogram("metric", 19.12, tags);
        response = server_recv(server.try_clone().unwrap());
        assert_eq!(
            "myapp.metric:19.12|h|#tag1common,tag2common:test,tag1,tag2:test",
            response
        );
    }

    #[test]
    fn test_sending_event_with_tags() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();
        client.event(
            "Title Test",
            "Text ABC",
            AlertType::Error,
            &Some(vec!["tag1", "tag2:test"]),
        );
        let response = server_recv(server);
        assert_eq!(
            "_e{10,8}:Title Test|Text ABC|t:error|#tag1,tag2:test",
            response
        );
    }

    #[test]
    fn test_sending_service_check_with_tags() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();
        client.service_check(
            "Service.check.name",
            ServiceCheckStatus::Critical,
            &Some(vec!["tag1", "tag2:test"]),
        );
        let response = server_recv(server);
        assert_eq!("_sc|Service.check.name|2|#tag1,tag2:test", response);
    }

    #[test]
    fn test_pipeline_sending_time_block() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();
        let mut pipeline = client.pipeline();
        pipeline.gauge("metric", 9.1);
        struct TimeTest {
            num: u8,
        }
        let mut t = TimeTest { num: 10 };
        pipeline.time("time_block", || {
            t.num += 2;
        });
        pipeline.send(&client);
        let response = server_recv(server);
        assert_eq!(t.num, 12);
        // The measured duration depends on scheduling, so only the framing
        // around the millisecond value is checked.
        assert!(response.starts_with("myapp.metric:9.1|g\nmyapp.time_block:"));
        assert!(response.ends_with("|ms"));
    }

    #[test]
    fn test_pipeline_sending_gauge() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();
        let mut pipeline = client.pipeline();
        pipeline.gauge("metric", 9.1);
        pipeline.send(&client);
        let response = server_recv(server);
        assert_eq!("myapp.metric:9.1|g", response);
    }

    #[test]
    fn test_pipeline_sending_histogram() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();
        let mut pipeline = client.pipeline();
        pipeline.histogram("metric", 9.1);
        pipeline.send(&client);
        let response = server_recv(server);
        assert_eq!("myapp.metric:9.1|h", response);
    }

    #[test]
    fn test_pipeline_sending_multiple_data() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();
        let mut pipeline = client.pipeline();
        pipeline.gauge("metric", 9.1);
        pipeline.count("metric", 12.2);
        pipeline.send(&client);
        let response = server_recv(server);
        assert_eq!("myapp.metric:9.1|g\nmyapp.metric:12.2|c", response);
    }

    #[test]
    fn test_pipeline_set_max_udp_size() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();
        let mut pipeline = client.pipeline();
        pipeline.set_max_udp_size(20);
        pipeline.gauge("metric", 9.1);
        pipeline.count("metric", 12.2);
        pipeline.send(&client);
        // With a 20-byte limit the two metrics cannot share a datagram, so
        // the first datagram carries only the gauge.
        let response = server_recv(server);
        assert_eq!("myapp.metric:9.1|g", response);
    }

    #[test]
    fn test_pipeline_send_metric_after_pipeline() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();
        let mut pipeline = client.pipeline();
        pipeline.gauge("load", 9.0);
        pipeline.count("customers", 7.0);
        pipeline.send(&client);
        client.count("customers", 6.0, &None);
        let response = server_recv(server);
        assert_eq!("myapp.load:9|g\nmyapp.customers:7|c", response);
    }
}