1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
use crate::{
    opcodes::*,
    types::{errors::SendError, AggregationOpcode, DateTime, DateTimeRange, Event, Query},
};
use std::{
    io::{BufWriter, Write},
    net::TcpStream,
};

/// Buffered writer over a borrowed [`TcpStream`].
type TcpWriter<'a> = BufWriter<&'a TcpStream>;
/// Result type returned by the write operations in this file: `Ok(())` or a [`SendError`].
type WriterResult = Result<(), SendError>;

/// Encodes values as bytes and writes them to a buffered, borrowed [`TcpStream`].
///
/// `Debug` is derived so a `Writer` can be logged or embedded in other `Debug` types.
#[derive(Debug)]
pub struct Writer<'a> {
    /// Buffered handle over the borrowed stream; every byte goes through it.
    writer: TcpWriter<'a>,
}

impl Writer<'_> {
    pub fn new(stream: &TcpStream) -> Writer {
        Writer {
            writer: TcpWriter::new(stream),
        }
    }

    pub fn flush(&mut self) -> WriterResult {
        self.writer.flush()?;
        Ok(())
    }

    fn write_all(&mut self, buf: &[u8]) -> WriterResult {
        self.writer.write_all(buf)?;
        Ok(())
    }

    // API

    pub fn query(&mut self, query: Query) -> WriterResult {
        self.opcode(OP_QUERY)?;
        self.metric_id(query.metric_id)?;
        self.range(query.range)?;
        self.aggregation(query.aggregation)?;
        self.aggr_window(query.aggregation_window_secs)?;

        self.flush()?;

        Ok(())
    }

    pub fn event(&mut self, event: Event) -> WriterResult {
        self.opcode(OP_EVENT)?;
        self.metric_id(event.metric_id)?;
        self.metric_value(event.value)?;

        self.flush()?;

        Ok(())
    }

    // Generic

    pub fn u8(&mut self, value: u8) -> WriterResult {
        self.write_all(&[value])
    }

    pub fn f32(&mut self, value: f32) -> WriterResult {
        self.write_all(&value.to_le_bytes())
    }

    pub fn string(&mut self, string: String) -> WriterResult {
        let length = string.len().try_into().map_err(|_| SendError::Invalid)?;
        self.write_all(&[length])?;
        self.write_all(string.as_bytes())
    }

    // Wrappers

    pub fn opcode(&mut self, opcode: Opcode) -> WriterResult {
        self.u8(opcode)
    }

    pub fn metric_id(&mut self, metric_id: String) -> WriterResult {
        self.string(metric_id)
    }

    pub fn metric_value(&mut self, metric_value: f32) -> WriterResult {
        self.f32(metric_value)
    }

    pub fn range(&mut self, range: Option<DateTimeRange>) -> WriterResult {
        if let Some(range) = range {
            self.datetime(range.from)?;
            self.datetime(range.to)?;
        }
        Ok(())
    }

    pub fn datetime(&mut self, datetime: DateTime) -> WriterResult {
        self.write_all(datetime.to_rfc3339().as_bytes())
    }

    pub fn aggregation(&mut self, aggregation: AggregationOpcode) -> WriterResult {
        self.u8(aggregation as u8)
    }

    pub fn aggr_window(&mut self, aggr_window: Option<f32>) -> WriterResult {
        if let Some(aggr_window) = aggr_window {
            self.f32(aggr_window)?;
        }
        Ok(())
    }
}