#![deny(missing_docs)]
use std::io;
use std::env;
use std::cell::RefCell;
use std::error::Error;
use std::net::SocketAddr;
use std::rc::Rc;
use std::time::Duration;
use futures::{Future, Stream};
use tokio_core::net::{UdpCodec, UdpSocket};
use tokio_core::reactor::Core;
use tokio_timer::Timer;
use backend::Backend;
use cache::CapellaCache;
use parse::{self, Metric};
pub struct StatsCodec;
impl UdpCodec for StatsCodec {
type In = (SocketAddr, Vec<Metric>);
type Out = SocketAddr;
fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> io::Result<Self::In> {
let mut metrics = Vec::new();
if buf.contains(&b'\n') {
for m in buf.split(|c| *c == b'\n').filter(|chunk| chunk.len() > 0) {
let metric = parse::parse_metric(m);
if metric.is_ok() {
metrics.push(metric.unwrap());
}
}
return Ok((*addr, metrics));
}
let metric = parse::parse_metric(buf);
if metric.is_ok() {
metrics.push(metric.unwrap());
}
Ok((*addr, metrics))
}
fn encode(&mut self, addr: Self::Out, _: &mut Vec<u8>) -> SocketAddr {
addr
}
}
pub fn start_udp_server<B: Backend>(backend: B) {
let cache = Rc::new(RefCell::new(CapellaCache::default()));
let mut core = Core::new().unwrap();
let handle = core.handle();
let capella_addr = env::var("CAPELLA_LISTENER").unwrap();
let addr: SocketAddr = capella_addr.parse().unwrap();
let s = UdpSocket::bind(&addr, &handle).unwrap();
let (_, stream) = s.framed(StatsCodec).split();
let flush_duration = env::var("CAPELLA_FLUSH_DURATION").unwrap().parse::<u64>().unwrap();
let timer = Timer::default().interval(Duration::new(flush_duration, 0));
let future_t = timer.for_each(|()| {
backend.purge_metrics(&mut cache.borrow_mut());
trace!("flushing metrics");
Ok(())
}).map_err(|e| {
io::Error::new(io::ErrorKind::Other, e.description())
});
let events = stream.for_each(|(_, metrics)| {
if metrics.is_empty() {
trace!("no valid metrics were sent");
cache.borrow_mut().bad_metric_count_increase();
}
for m in &metrics {
cache.borrow_mut().add_metric(m);
}
Ok(())
});
let f = events.join(future_t);
drop(core.run(f));
}