1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
use crate::{
opcodes::*,
types::{errors::SendError, AggregationOpcode, DateTime, DateTimeRange, Event, Query},
};
use std::{
io::{BufWriter, Write},
net::TcpStream,
};
/// Buffered writer wrapping a borrowed [`TcpStream`].
type TcpWriter<'a> = BufWriter<&'a TcpStream>;
/// Outcome of a write operation: `()` on success, a [`SendError`] on failure.
type WriterResult = Result<(), SendError>;
/// Encodes values and sends them over a borrowed TCP stream.
///
/// Output is buffered; bytes are only guaranteed to have been handed to the
/// underlying stream after the buffer has been flushed.
pub struct Writer<'a> {
// Buffered handle to the borrowed stream; every write goes through it.
writer: TcpWriter<'a>,
}
impl Writer<'_> {
    /// Creates a writer that buffers output destined for `stream`.
    ///
    /// The stream is only borrowed; the caller keeps ownership of it.
    pub fn new(stream: &TcpStream) -> Writer {
        let writer = TcpWriter::new(stream);
        Writer { writer }
    }

    /// Pushes all buffered bytes to the underlying stream.
    ///
    /// # Errors
    ///
    /// Returns the I/O failure converted into a [`SendError`].
    pub fn flush(&mut self) -> WriterResult {
        self.writer.flush().map_err(SendError::from)
    }

    /// Appends `buf` to the buffered output, converting any I/O failure
    /// into a [`SendError`].
    fn write_all(&mut self, buf: &[u8]) -> WriterResult {
        self.writer.write_all(buf).map_err(SendError::from)
    }

    /// Sends a complete query message and flushes it.
    ///
    /// Fields are written in this order: the `OP_QUERY` opcode, the metric
    /// id, the optional time range, the aggregation, and the optional
    /// aggregation window.
    ///
    /// # Errors
    ///
    /// Stops at the first failing write and returns its error; bytes already
    /// buffered at that point are not discarded.
    pub fn query(&mut self, query: Query) -> WriterResult {
        self.opcode(OP_QUERY)?;
        self.metric_id(query.metric_id)?;
        self.range(query.range)?;
        self.aggregation(query.aggregation)?;
        self.aggr_window(query.aggregation_window_secs)?;
        self.flush()
    }

    /// Sends a complete event message and flushes it.
    ///
    /// Fields are written in this order: the `OP_EVENT` opcode, the metric
    /// id, and the metric value.
    ///
    /// # Errors
    ///
    /// Stops at the first failing write and returns its error; bytes already
    /// buffered at that point are not discarded.
    pub fn event(&mut self, event: Event) -> WriterResult {
        self.opcode(OP_EVENT)?;
        self.metric_id(event.metric_id)?;
        self.metric_value(event.value)?;
        self.flush()
    }

    /// Writes a single byte.
    pub fn u8(&mut self, value: u8) -> WriterResult {
        let byte = [value];
        self.write_all(&byte)
    }

    /// Writes `value` as its four little-endian bytes.
    pub fn f32(&mut self, value: f32) -> WriterResult {
        let bytes = value.to_le_bytes();
        self.write_all(&bytes)
    }

    /// Writes a length-prefixed string: one byte holding the length in
    /// bytes, followed by the string's UTF-8 bytes.
    ///
    /// # Errors
    ///
    /// Returns [`SendError::Invalid`] without writing anything when the
    /// byte length does not fit in a `u8`.
    pub fn string(&mut self, string: String) -> WriterResult {
        let payload = string.as_bytes();
        // The prefix is a single byte, so the length must convert to `u8`.
        let length: u8 = payload.len().try_into().map_err(|_| SendError::Invalid)?;
        self.u8(length)?;
        self.write_all(payload)
    }

    /// Writes an opcode as a single byte.
    pub fn opcode(&mut self, opcode: Opcode) -> WriterResult {
        self.u8(opcode)
    }

    /// Writes a metric id using the length-prefixed encoding of
    /// [`Writer::string`].
    pub fn metric_id(&mut self, metric_id: String) -> WriterResult {
        self.string(metric_id)
    }

    /// Writes a metric value using the encoding of [`Writer::f32`].
    pub fn metric_value(&mut self, metric_value: f32) -> WriterResult {
        self.f32(metric_value)
    }

    /// Writes the `from` then the `to` bound of `range`.
    ///
    /// Nothing at all is written when `range` is `None`.
    pub fn range(&mut self, range: Option<DateTimeRange>) -> WriterResult {
        match range {
            Some(bounds) => {
                self.datetime(bounds.from)?;
                self.datetime(bounds.to)
            }
            None => Ok(()),
        }
    }

    /// Writes the bytes of the string returned by `datetime.to_rfc3339()`.
    ///
    /// No length prefix or terminator is added.
    pub fn datetime(&mut self, datetime: DateTime) -> WriterResult {
        let encoded = datetime.to_rfc3339();
        self.write_all(encoded.as_bytes())
    }

    /// Writes the aggregation as a single byte obtained with an `as u8` cast.
    pub fn aggregation(&mut self, aggregation: AggregationOpcode) -> WriterResult {
        let code = aggregation as u8;
        self.u8(code)
    }

    /// Writes the aggregation window using the encoding of [`Writer::f32`].
    ///
    /// Nothing at all is written when `aggr_window` is `None`.
    pub fn aggr_window(&mut self, aggr_window: Option<f32>) -> WriterResult {
        match aggr_window {
            Some(window) => self.f32(window),
            None => Ok(()),
        }
    }
}