use std::net::{ToSocketAddrs, UdpSocket};
use std::str;
use std::sync::mpsc::channel;
use std::thread;
use super::parse_metrics;
use super::super::super::collector::Collector;
pub struct StatsdUdpListener {
collector: Collector,
}
impl StatsdUdpListener {
pub fn new(collector: Collector) -> StatsdUdpListener {
StatsdUdpListener {
collector,
}
}
pub fn listen<A: ToSocketAddrs>(&self, addr: A) {
let (send, recv) = channel();
let socket = UdpSocket::bind(addr).unwrap();
thread::spawn(move || {
let mut buf = [0; 1500];
loop {
let (bytes_read, _) = match socket.recv_from(&mut buf) {
Ok(pair) => pair,
Err(_) => return,
};
let message: &str = match str::from_utf8(&buf[..bytes_read]) {
Ok(s) => s,
Err(_) => return,
};
send.send(message.to_owned()).unwrap();
}
});
for line in recv {
match parse_metrics(line.trim_right().as_bytes()) {
Ok(metrics) => {
let aggregated_metrics = metrics.into_iter()
.map(|metric| metric.into())
.collect();
self.collector.push(aggregated_metrics)
},
Err(_) => (),
}
}
} }