pub mod macros;
pub mod protocol;
use std::{
fmt,
io::{self, Error, Write},
net::{Shutdown, SocketAddr, TcpStream, UdpSocket},
};
#[cfg(target_family = "unix")]
use std::os::unix::net::{UnixDatagram, UnixStream};
use url::Url;
use protocol::*;
pub use protocol::{FieldData, IntoFieldData};
pub use telegraf_derive::*;
/// Result type returned by the write operations of [`Client`]: `Ok(())` on
/// success, a [`TelegrafError`] describing the failure otherwise.
pub type TelegrafResult = Result<(), TelegrafError>;
/// Conversion of a value into a [`Point`].
///
/// [`Client::write`] accepts any `Metric`, calls [`Metric::to_point`] on it
/// and writes the resulting point.
// NOTE(review): the crate re-exports `telegraf_derive::*`, which presumably
// provides a derive for this trait — confirm before documenting it publicly.
pub trait Metric {
/// Builds the [`Point`] that represents `self`.
fn to_point(&self) -> Point;
}
// NOTE(review): nothing visible in this file constructs `IoError`; the
// `From<Error>` impl turns I/O errors into `ConnectionError` instead. Confirm
// whether that is intentional before relying on `IoError` in callers.
/// Errors reported by this crate.
#[derive(Debug)]
pub enum TelegrafError {
/// Wraps a [`std::io::Error`].
IoError(Error),
/// Connection-level failure described by a message. I/O errors converted
/// through `From<Error>` end up here, carrying the error's display text.
ConnectionError(String),
/// Invalid request: an unparsable connection URL, an unknown URL scheme, or a
/// point without any field.
BadProtocol(String),
}
/// A single data point: a measurement name with its tags, its fields and an
/// optional timestamp.
///
/// `Client::write_point` and `Client::write_points` reject points whose
/// `fields` vector is empty.
#[derive(Debug, Clone, PartialEq)]
pub struct Point {
/// Measurement name.
pub measurement: String,
/// Tags attached to the point; may be empty, in which case no tag section is
/// generated.
pub tags: Vec<Tag>,
/// Fields carried by the point.
pub fields: Vec<Field>,
/// Optional timestamp; `None` leaves it out of the generated line.
pub timestamp: Option<Timestamp>,
}
/// Handle used to send points over a socket connection.
///
/// Built with `Client::new` from a connection URL; it owns the underlying
/// socket. `Debug` is derived so that a client can be logged or stored inside
/// other `#[derive(Debug)]` types.
#[derive(Debug)]
pub struct Client {
    /// Transport that every write goes through.
    conn: Connector,
}

/// Transport backing a [`Client`], one variant per supported URL scheme.
#[derive(Debug)]
enum Connector {
    /// Stream socket opened for a `tcp` URL.
    Tcp(TcpStream),
    /// Datagram socket opened for a `udp` URL.
    Udp(UdpSocket),
    /// Unix stream socket opened for a `unix` URL (Unix targets only).
    #[cfg(target_family = "unix")]
    Unix(UnixStream),
    /// Unix datagram socket opened for a `unixgram` URL (Unix targets only).
    #[cfg(target_family = "unix")]
    Unixgram(UnixDatagram),
}
impl Point {
    /// Builds a point from plain name/value pairs.
    ///
    /// Every `(name, value)` tag pair becomes a `Tag`, every field pair becomes
    /// a `Field` whose value is obtained through `IntoFieldData::field_data`,
    /// and the optional raw timestamp is wrapped in a `Timestamp`.
    pub fn new(
        measurement: String,
        tags: Vec<(String, String)>,
        fields: Vec<(String, Box<dyn IntoFieldData>)>,
        timestamp: Option<u64>,
    ) -> Self {
        Self {
            measurement,
            tags: tags
                .into_iter()
                .map(|(name, value)| Tag { name, value })
                .collect(),
            fields: fields
                .into_iter()
                .map(|(name, data)| Field {
                    name,
                    value: data.field_data(),
                })
                .collect(),
            timestamp: timestamp.map(|value| Timestamp { value }),
        }
    }

    /// Renders the point as a `LineProtocol` value.
    ///
    /// The tag and timestamp sections are passed as `None` when the point has
    /// no tags or no timestamp; the field section is always formatted.
    fn to_lp(&self) -> LineProtocol {
        let tag_section = if self.tags.is_empty() {
            None
        } else {
            let attrs: Vec<Attr> = self.tags.iter().cloned().map(Attr::Tag).collect();
            Some(format_attr(attrs))
        };

        let field_attrs: Vec<Attr> = self.fields.iter().cloned().map(Attr::Field).collect();
        let field_section = format_attr(field_attrs);

        let timestamp_section = match &self.timestamp {
            Some(ts) => Some(format_attr(vec![Attr::Timestamp(ts.clone())])),
            None => None,
        };

        LineProtocol::new(
            self.measurement.clone(),
            tag_section,
            field_section,
            timestamp_section,
        )
    }
}
impl Client {
    /// Opens a connection described by `conn_url`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `Connector::new`: `BadProtocol` for an
    /// unparsable URL or an unknown scheme, `ConnectionError` when the socket
    /// cannot be set up.
    pub fn new(conn_url: &str) -> Result<Self, TelegrafError> {
        Connector::new(conn_url).map(|conn| Self { conn })
    }

    /// Writes a single point to the connection.
    ///
    /// # Errors
    ///
    /// Returns `BadProtocol` when the point has no field, or the converted
    /// I/O error when the write fails.
    pub fn write_point(&mut self, pt: &Point) -> TelegrafResult {
        Self::ensure_has_fields(pt)?;
        let lp = pt.to_lp();
        self.write_to_conn(lp.to_str().as_bytes())
    }

    /// Writes several points with a single call to the connection.
    ///
    /// All points are validated before anything is sent, so either the whole
    /// batch is handed to the connection or nothing is. An empty slice is a
    /// no-op and performs no I/O.
    ///
    /// # Errors
    ///
    /// Returns `BadProtocol` when any point has no field, or the converted
    /// I/O error when the write fails.
    pub fn write_points(&mut self, pts: &[Point]) -> TelegrafResult {
        // Nothing to send: avoid emitting an empty datagram on datagram
        // transports.
        if pts.is_empty() {
            return Ok(());
        }
        pts.iter().try_for_each(Self::ensure_has_fields)?;

        // Append every rendered line to one buffer instead of allocating an
        // owned `String` per point and joining them afterwards.
        let mut payload = String::new();
        for pt in pts {
            payload.push_str(pt.to_lp().to_str());
        }
        self.write_to_conn(payload.as_bytes())
    }

    /// Converts `metric` into a point and writes it; see `write_point`.
    pub fn write<M: Metric>(&mut self, metric: &M) -> TelegrafResult {
        self.write_point(&metric.to_point())
    }

    /// Shuts the connection down. This does nothing for UDP connections.
    pub fn close(&self) -> io::Result<()> {
        self.conn.close()
    }

    /// Sends raw bytes over the connection without any validation.
    ///
    /// # Errors
    ///
    /// An I/O failure is converted into a `TelegrafError` through
    /// `From<io::Error>`.
    pub fn write_to_conn(&mut self, data: &[u8]) -> TelegrafResult {
        self.conn.write(data)?;
        Ok(())
    }

    /// Rejects points that carry no field.
    fn ensure_has_fields(pt: &Point) -> TelegrafResult {
        if pt.fields.is_empty() {
            Err(TelegrafError::BadProtocol(
                "points must have at least 1 field".to_owned(),
            ))
        } else {
            Ok(())
        }
    }
}
impl Connector {
fn close(&self) -> io::Result<()> {
use Connector::*;
match self {
Tcp(c) => c.shutdown(Shutdown::Both),
#[cfg(target_family = "unix")]
Unix(c) => c.shutdown(Shutdown::Both),
#[cfg(target_family = "unix")]
Unixgram(c) => c.shutdown(Shutdown::Both),
Udp(_) => Ok(()),
}
}
fn write(&mut self, buf: &[u8]) -> io::Result<()> {
let r = match self {
Self::Tcp(c) => c.write(buf),
Self::Udp(c) => c.send(buf),
#[cfg(target_family = "unix")]
Self::Unix(c) => c.write(buf),
#[cfg(target_family = "unix")]
Self::Unixgram(c) => c.send(buf),
};
r.map(|_| Ok(()))?
}
fn new(url: &str) -> Result<Self, TelegrafError> {
match Url::parse(url) {
Ok(u) => {
let scheme = u.scheme();
match scheme {
"tcp" => {
let addr = u.socket_addrs(|| None)?;
let conn = TcpStream::connect(&*addr)?;
Ok(Connector::Tcp(conn))
}
"udp" => {
let addr = u.socket_addrs(|| None)?;
let conn = UdpSocket::bind(&[SocketAddr::from(([0, 0, 0, 0], 0))][..])?;
conn.connect(&*addr)?;
conn.set_nonblocking(true)?;
Ok(Connector::Udp(conn))
}
#[cfg(target_family = "unix")]
"unix" => {
let path = u.path();
let conn = UnixStream::connect(path)?;
Ok(Connector::Unix(conn))
}
#[cfg(target_family = "unix")]
"unixgram" => {
let path = u.path();
let conn = UnixDatagram::unbound()?;
conn.connect(path)?;
conn.set_nonblocking(true)?;
Ok(Connector::Unixgram(conn))
}
_ => Err(TelegrafError::BadProtocol(format!(
"unknown connection protocol {}",
scheme
))),
}
}
Err(_) => Err(TelegrafError::BadProtocol(format!(
"invalid connection URL {}",
url
))),
}
}
}
impl fmt::Display for TelegrafError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
TelegrafError::IoError(ref e) => write!(f, "{}", e),
TelegrafError::ConnectionError(ref e) => write!(f, "{}", e),
TelegrafError::BadProtocol(ref e) => write!(f, "{}", e),
}
}
}
impl From<Error> for TelegrafError {
fn from(e: Error) -> Self {
Self::ConnectionError(e.to_string())
}
}
/// Private helper for turning a value that may be absent into a
/// `Result<T, TelegrafError>` with a caller-supplied message.
// NOTE(review): no call to `t_unwrap` is visible in this file; it may be used
// from another module or be dead code — confirm.
trait TelegrafUnwrap<T> {
/// Returns the contained value, or a `TelegrafError` built from `msg` when
/// there is none.
fn t_unwrap(self, msg: &str) -> Result<T, TelegrafError>;
}
/// `Some(value)` yields `Ok(value)`; `None` yields a `ConnectionError`
/// carrying `msg`.
impl<T> TelegrafUnwrap<T> for Option<T> {
    fn t_unwrap(self, msg: &str) -> Result<T, TelegrafError> {
        match self {
            Some(value) => Ok(value),
            None => Err(TelegrafError::ConnectionError(msg.to_owned())),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Timestamp but no tags: the measurement is followed directly by the
    /// fields, then the timestamp.
    #[test]
    fn can_create_point_lp_ts_no_tags() {
        let point = Point::new(
            "Foo".to_owned(),
            Vec::new(),
            vec![
                (String::from("f1"), Box::new(10)),
                (String::from("f2"), Box::new(10.3)),
            ],
            Some(10),
        );
        assert_eq!(point.to_lp().to_str(), "Foo f1=10i,f2=10.3 10\n");
    }

    /// Tags, fields (including a string field) and a timestamp.
    #[test]
    fn can_create_point_lp_ts() {
        let point = Point::new(
            "Foo".to_owned(),
            vec![(String::from("t1"), String::from("v"))],
            vec![
                (String::from("f1"), Box::new(10)),
                (String::from("f2"), Box::new(10.3)),
                (String::from("f3"), Box::new("b")),
            ],
            Some(10),
        );
        assert_eq!(
            point.to_lp().to_str(),
            "Foo,t1=v f1=10i,f2=10.3,f3=\"b\" 10\n"
        );
    }

    /// Tags and fields without a timestamp: the line ends right after the
    /// fields.
    #[test]
    fn can_create_point_lp() {
        let point = Point::new(
            "Foo".to_owned(),
            vec![(String::from("t1"), String::from("v"))],
            vec![
                (String::from("f1"), Box::new(10)),
                (String::from("f2"), Box::new(10.3)),
                (String::from("f3"), Box::new("b")),
            ],
            None,
        );
        assert_eq!(
            point.to_lp().to_str(),
            "Foo,t1=v f1=10i,f2=10.3,f3=\"b\"\n"
        );
    }

    /// Neither tags nor timestamp: only the measurement and the fields.
    #[test]
    fn can_create_point_lp_no_tags() {
        let point = Point::new(
            "Foo".to_owned(),
            Vec::new(),
            vec![
                (String::from("f1"), Box::new(10)),
                (String::from("f2"), Box::new(10.3)),
            ],
            None,
        );
        assert_eq!(point.to_lp().to_str(), "Foo f1=10i,f2=10.3\n");
    }
}