metrical 0.1.1

Graphite/Statsd metrics client
Documentation
use crate::metric::{MetricData, MetricType};

#[cfg(feature = "pickle")]
use serde_pickle::ser;

pub enum Protocol {
    StatsD,
    Graphite(Compression)
}

type PickleRecord = (String, (String, String));

impl Protocol {
    pub fn serialized_statsd_record(metric: MetricData) -> Vec<u8> {
        let path = metric.path();
        match metric.metric() {
            MetricType::Counter(count) => format!("{}:{}|c\n", path, count).into_bytes(),
            MetricType::Timer(elapsed) => format!("{}:{}|ms\n", path, elapsed).into_bytes(),
            MetricType::Gauge(gauge) => format!("{}:{}|g\n", path, gauge).into_bytes()
        }
    }

    pub fn serialize_statsd<I>(metrics: I) -> Vec<u8> where
    I: IntoIterator<Item=MetricData>
    {
        let mut data: Vec<u8> = metrics
            .into_iter()
            .map(Protocol::serialized_statsd_record)
            .flatten()
            .collect();
        data.pop();
        data
    }

    pub fn serialized_graphite_record(metric: MetricData) -> Vec<u8> {
        let occurred = metric.occurred();
        let path = metric.path();
        match metric.metric() {
            MetricType::Counter(count) => format!("{} {} {}", path, count, occurred).into_bytes(),
            MetricType::Timer(elapsed) => format!("{} {} {}", path, elapsed, occurred).into_bytes(),
            MetricType::Gauge(gauge) => format!("{} {} {}", path, gauge, occurred).into_bytes()
        }
    }

    pub fn serialize_graphite_uncompressed<I>(metrics: I) -> Vec<u8> where
        I: IntoIterator<Item=MetricData>
    {
        metrics
            .into_iter()
            .flat_map(Protocol::serialized_graphite_record)
            .collect()
    }

    pub fn pickle_tuple(metric: MetricData) -> PickleRecord {
        let occurred = metric.occurred().to_string();
        let path = metric.path();
        let value = match metric.metric() {
            MetricType::Counter(count) => count.to_string(),
            MetricType::Timer(elapsed) => elapsed.to_string(),
            MetricType::Gauge(gauge) => gauge.to_string()
        };

        return (path, (value, occurred))
    }

    #[cfg(feature = "pickle")]
    pub fn serialize_graphite_pickled<I>(metrics: I) -> Vec<u8> where
        I: IntoIterator<Item=MetricData>
    {
        use byteorder::{ByteOrder, BigEndian};

        let data: Vec<PickleRecord> = metrics.into_iter().map(
            Protocol::pickle_tuple
        ).collect();

        let data = ser::to_vec(&data, true).unwrap_or(Vec::new());

        let size = data.len() as u32;
        let mut buf = [0; 4];
        BigEndian::write_u32(&mut buf, size);
        buf.to_vec().into_iter().chain(data.into_iter()).collect::<Vec<_>>()
    }

    pub fn serialize_data<I>(&self, metrics: I) -> Vec<u8>
        where I: IntoIterator<Item=MetricData>
    {
        match self {
            Protocol::StatsD => Protocol::serialize_statsd(metrics),

            #[cfg(feature = "pickle")]
            Protocol::Graphite(Compression::Pickled) => Protocol::serialize_graphite_pickled(metrics),

            Protocol::Graphite(Compression::Uncompressed) =>
                Protocol::serialize_graphite_uncompressed(metrics)
        }
    }
}

impl Protocol {
    pub fn serialize<I>(&self, data: I) -> Vec<Vec<u8>> where I: IntoIterator<Item=MetricData> {
        let mut data = data.into_iter().peekable();

        let num_in_packet = match self {
            Protocol::StatsD => 14,
            Protocol::Graphite(Compression::Uncompressed) => 1,
            #[cfg(feature = "pickle")]
            Protocol::Graphite(Compression::Pickled) => 14
        };

        let mut packet_bodies = Vec::new();
        while data.peek().is_some() {
            let chunk: Vec<_> = data.by_ref().take(num_in_packet).collect();

            packet_bodies.push(self.serialize_data(chunk));
        }

        packet_bodies
    }
}

pub enum NetworkProtocol {
    UDP,
    TCP
}

pub enum Compression {
    #[cfg(feature = "pickle")]
    Pickled,

    Uncompressed,
}

#[cfg(test)]
mod test {
    mod uncompressed {
        use crate::metric::{metric_test_data};
        use crate::protocol::{Protocol, Compression};

        #[test]
        pub fn it_should_handle_one_metric_per_entry() {
            let data = metric_test_data().to_vec();
            let protocol = Protocol::Graphite(Compression::Uncompressed);
            let result: Vec<Vec<u8>> = protocol.serialize(data);

            assert_eq!(result.len(), 4);

            let payload1 = result[0].clone();
            assert_eq!(payload1, b"test.HelloTimer 1005 1".to_vec());

            let payload2 = result[1].clone();
            assert_eq!(payload2, b"test.HelloCounter 12 2".to_vec());

            let payload3 = result[2].clone();
            assert_eq!(payload3, b"test.HelloGauge +13 3".to_vec());

            let payload4 = result[3].clone();
            assert_eq!(payload4, b"test.HelloGauge -2 4".to_vec());
        }
    }

    mod compressed {
        #[test]
        #[cfg(feature = "pickle")]
        pub fn it_should_pickle_properly() -> Result<(), Box<dyn std::error::Error>> {
            use crate::metric::metric_test_data;
            use crate::protocol::{Protocol, Compression};
            let data = metric_test_data().to_vec();

            let protocol = Protocol::Graphite(Compression::Pickled);

            let pickle_data: Vec<Vec<u8>> = protocol.serialize(data);
            let data_point = &pickle_data[0][4..];

            assert_eq!(pickle_data.len(), 1);
            assert_eq!(data_point.len(), 159);

            Ok(())
        }
    }

    mod statsd {
        use crate::metric::metric_test_data;
        use crate::protocol::Protocol;

        #[test]
        pub fn it_should_construct_properly() {
            let data = metric_test_data().to_vec();
            let protocol = Protocol::StatsD;

            let data = protocol.serialize(data);

            assert_eq!(data.len(), 1);
            assert_eq!(data[0].to_vec(), b"test.HelloTimer:1005|ms\ntest.HelloCounter:12|c\ntest.HelloGauge:+13|g\ntest.HelloGauge:-2|g".to_vec());
        }
    }
}