use encoder::{Encoder, ProtobufEncoder};
use errors::{Error, Result};
use hyper::client::Client;
use hyper::client::pool::Config;
use hyper::header::ContentType;
use hyper::method::Method;
use hyper::status::StatusCode;
use metrics::Collector;
use proto;
use registry::Registry;
use std::collections::HashMap;
use std::hash::BuildHasher;
use std::str::{self, FromStr};
// Value handed to the hyper connection-pool `Config` as `max_idle` when the
// shared `HTTP_CLIENT` below is built.
const HYPER_MAX_IDLE: usize = 1;
// Shared hyper client used by `push` for every request in this module.
// `lazy_static!` defers construction until the static is first dereferenced.
lazy_static!{
static ref HTTP_CLIENT: Client = Client::with_pool_config(
Config{
max_idle: HYPER_MAX_IDLE,
}
);
}
/// Sends the metric families in `mfs` to the Pushgateway at `url` using an
/// HTTP "PUT" request.
///
/// `job` and every name/value pair in `grouping` become part of the request
/// path. All validation, encoding and transport work is delegated to `push`;
/// its error is returned unchanged.
pub fn push_metrics<S: BuildHasher>(
    job: &str,
    grouping: HashMap<String, String, S>,
    url: &str,
    mfs: Vec<proto::MetricFamily>,
) -> Result<()> {
    let http_method = "PUT";
    push(job, grouping, url, mfs, http_method)
}
/// Sends the metric families in `mfs` to the Pushgateway at `url` using an
/// HTTP "POST" request.
///
/// Identical to `push_metrics` except for the HTTP method. `job` and the
/// `grouping` pairs become part of the request path, and any error produced
/// by `push` is returned unchanged.
pub fn push_add_metrics<S: BuildHasher>(
    job: &str,
    grouping: HashMap<String, String, S>,
    url: &str,
    mfs: Vec<proto::MetricFamily>,
) -> Result<()> {
    let http_method = "POST";
    push(job, grouping, url, mfs, http_method)
}
// Label name that pushed metrics must not carry themselves: `push` rejects
// any metric that already has a label with this name.
const LABEL_NAME_JOB: &str = "job";
/// Validates, encodes and sends `mfs` to a Pushgateway.
///
/// The request goes to `<url>/metrics/job/<job>/<name>/<value>/...`, with one
/// `<name>/<value>` pair appended for every entry of `grouping`. `http://` is
/// prepended when `url` contains no `://`, and a single trailing `/` on `url`
/// is dropped. The body is `mfs` encoded by `ProtobufEncoder`, sent with the
/// HTTP `method` supplied by the caller ("PUT" or "POST" in this file).
///
/// # Errors
///
/// Returns `Error::Msg` when
/// * `job` contains `/`,
/// * a grouping label value contains `/`,
/// * a pushed metric already carries a `job` label or a label whose name is
///   also a key of `grouping`,
/// * sending the request fails, or
/// * the response status is neither 200 (OK) nor 202 (Accepted).
///
/// An error from the encoder is propagated as-is.
///
/// # Panics
///
/// Panics if `method` cannot be parsed as an HTTP method or if the encoder's
/// format type cannot be parsed as a content type. Both inputs are fixed by
/// code in this crate rather than supplied by users.
fn push<S: BuildHasher>(
    job: &str,
    grouping: HashMap<String, String, S>,
    url: &str,
    mfs: Vec<proto::MetricFamily>,
    method: &str,
) -> Result<()> {
    // Fall back to plain HTTP when the caller did not specify a scheme.
    let mut push_url = if url.contains("://") {
        url.to_owned()
    } else {
        format!("http://{}", url)
    };
    // Avoid a double slash once the path is appended below.
    if push_url.ends_with('/') {
        push_url.pop();
    }

    // '/' separates path segments, so it may not appear inside a segment.
    if job.contains('/') {
        return Err(Error::Msg(format!("job contains '/': {}", job)));
    }

    // Borrowed path segments: the job first, then name/value for each label.
    let mut url_components: Vec<&str> = Vec::with_capacity(1 + 2 * grouping.len());
    url_components.push(job);
    for (ln, lv) in &grouping {
        if lv.contains('/') {
            return Err(Error::Msg(format!(
                "value of grouping label {} contains '/': {}",
                ln,
                lv
            )));
        }
        url_components.push(ln.as_str());
        url_components.push(lv.as_str());
    }
    push_url = format!("{}/metrics/job/{}", push_url, url_components.join("/"));

    // Labels already encoded in the URL must not be repeated on the metrics.
    for mf in &mfs {
        for m in mf.get_metric() {
            for lp in m.get_label() {
                if lp.get_name() == LABEL_NAME_JOB {
                    return Err(Error::Msg(format!(
                        "pushed metric {} already contains a \
                         job label",
                        mf.get_name()
                    )));
                }
                if grouping.contains_key(lp.get_name()) {
                    return Err(Error::Msg(format!(
                        "pushed metric {} already contains \
                         grouping label {}",
                        mf.get_name(),
                        lp.get_name()
                    )));
                }
            }
        }
    }

    let encoder = ProtobufEncoder::new();
    let mut buf = Vec::new();
    encoder.encode(&mfs, &mut buf)?;

    // `method` and the encoder's format type are chosen by this crate, so a
    // parse failure here would be a programming error rather than bad input.
    let request = HTTP_CLIENT
        .request(Method::from_str(method).unwrap(), &push_url)
        .header(ContentType(encoder.format_type().parse().unwrap()))
        .body(buf.as_slice());
    let response = request.send().map_err(|e| Error::Msg(format!("{}", e)))?;

    match response.status {
        // The Pushgateway API documents both 200 and 202 as success
        // responses, so neither may be reported as a failure.
        StatusCode::Accepted | StatusCode::Ok => Ok(()),
        _ => Err(Error::Msg(format!(
            "unexpected status code {} while pushing to {}",
            response.status,
            push_url
        ))),
    }
}
/// Registers every collector in `collectors` with a fresh `Registry`, gathers
/// the resulting metric families and hands them to `push` together with the
/// remaining arguments.
///
/// Returns the first registration error, or otherwise whatever `push`
/// returns.
fn push_from_collector<S: BuildHasher>(
    job: &str,
    grouping: HashMap<String, String, S>,
    url: &str,
    collectors: Vec<Box<Collector>>,
    method: &str,
) -> Result<()> {
    // A private registry keeps these collectors separate from any other one.
    let registry = Registry::new();
    for collector in collectors {
        registry.register(collector)?;
    }

    push(job, grouping, url, registry.gather(), method)
}
/// Gathers metrics from `collectors` and sends them to the Pushgateway at
/// `url` using an HTTP "PUT" request.
///
/// Delegates to `push_from_collector`; its error is returned unchanged.
pub fn push_collector<S: BuildHasher>(
    job: &str,
    grouping: HashMap<String, String, S>,
    url: &str,
    collectors: Vec<Box<Collector>>,
) -> Result<()> {
    let http_method = "PUT";
    push_from_collector(job, grouping, url, collectors, http_method)
}
/// Gathers metrics from `collectors` and sends them to the Pushgateway at
/// `url` using an HTTP "POST" request.
///
/// Identical to `push_collector` except for the HTTP method. Delegates to
/// `push_from_collector`; its error is returned unchanged.
pub fn push_add_collector<S: BuildHasher>(
    job: &str,
    grouping: HashMap<String, String, S>,
    url: &str,
    collectors: Vec<Box<Collector>>,
) -> Result<()> {
    let http_method = "POST";
    push_from_collector(job, grouping, url, collectors, http_method)
}
// (label name, fallback value) used by `hostname_grouping_key`: `.0` is always
// the key of the returned map, `.1` is the value when no hostname is available.
const DEFAULT_GROUP_LABEL_PAIR: (&str, &str) = ("instance", "unknown");
#[cfg(not(target_os = "windows"))]
pub fn hostname_grouping_key() -> HashMap<String, String> {
use libc;
let max_len = 256;
let mut name = vec![0u8; max_len];
match unsafe {
libc::gethostname(
name.as_mut_ptr() as *mut libc::c_char,
max_len as libc::size_t,
)
} {
0 => {
let last_char = name.iter().position(|byte| *byte == 0).unwrap_or(max_len);
labels!{
DEFAULT_GROUP_LABEL_PAIR.0.to_owned() => str::from_utf8(&name[..last_char])
.unwrap_or(DEFAULT_GROUP_LABEL_PAIR.1).to_owned(),
}
}
_ => {
labels!{DEFAULT_GROUP_LABEL_PAIR.0.to_owned() => DEFAULT_GROUP_LABEL_PAIR.1.to_owned(),}
}
}
}
/// Windows variant: no hostname lookup is performed, so the returned map
/// always holds the fixed pair from `DEFAULT_GROUP_LABEL_PAIR`.
#[cfg(target_os = "windows")]
pub fn hostname_grouping_key() -> HashMap<String, String> {
    let (label_name, label_value) = DEFAULT_GROUP_LABEL_PAIR;
    labels!{label_name.to_owned() => label_value.to_owned(),}
}
#[cfg(test)]
mod tests {
    use super::*;
    use proto;
    use protobuf::RepeatedField;
    use std::error::Error;

    /// The grouping key must always contain at least one label.
    #[test]
    fn test_hostname_grouping_key() {
        let grouping = hostname_grouping_key();
        assert!(!grouping.is_empty());
    }

    /// A metric that already carries the job label or a grouping label must
    /// be rejected with a message naming the offending kind of label.
    #[test]
    fn test_push_bad_label_name() {
        // (label name placed on the metric, fragment expected in the error)
        let cases = vec![
            (LABEL_NAME_JOB, "job label"),
            (DEFAULT_GROUP_LABEL_PAIR.0, "grouping label"),
        ];

        for (label_name, expected_fragment) in cases {
            let mut label = proto::LabelPair::new();
            label.set_name(label_name.to_owned());

            let mut metric = proto::Metric::new();
            metric.set_label(RepeatedField::from_vec(vec![label]));

            let mut family = proto::MetricFamily::new();
            family.set_metric(RepeatedField::from_vec(vec![metric]));

            let err = push_metrics("test", hostname_grouping_key(), "mockurl", vec![family])
                .unwrap_err();
            assert!(err.description().contains(expected_fragment));
        }
    }
}