1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
use crate::models::WriteDataPoint;
use crate::{Client, Http, RequestError, ReqwestProcessing};
use bytes::BufMut;
use futures::{Stream, StreamExt};
use reqwest::{Body, Method};
use snafu::ResultExt;
use std::io::{self, Write};
impl Client {
    /// Write line protocol data to the given organization and bucket.
    ///
    /// Sends `body` in a `POST` request to `{url}/api/v2/write`, passing `bucket` and `org` as
    /// query parameters.
    ///
    /// # Errors
    ///
    /// Fails with the `ReqwestProcessing` context if sending the request, or reading the body
    /// of an unsuccessful response, fails. Fails with the `Http` context, carrying the
    /// response status and body text, if the server answers with a non-success status.
    pub async fn write_line_protocol(
        &self,
        org: &str,
        bucket: &str,
        body: impl Into<Body> + Send,
    ) -> Result<(), RequestError> {
        let body = body.into();
        let write_url = format!("{}/api/v2/write", self.url);

        let response = self
            .request(Method::POST, &write_url)
            .query(&[("bucket", bucket), ("org", org)])
            .body(body)
            .send()
            .await
            .context(ReqwestProcessing)?;

        // Capture the status first: reading the body text below consumes the response.
        let status = response.status();
        if status.is_success() {
            return Ok(());
        }

        let text = response.text().await.context(ReqwestProcessing)?;
        Http { status, text }.fail()
    }

    /// Write a `Stream` of data points to the given bucket in this client's organization.
    ///
    /// Each item of `body` is serialized with `WriteDataPoint::write_data_point_to` and sent
    /// as one chunk of a streaming request body via [`Client::write_line_protocol`].
    ///
    /// # Errors
    ///
    /// Returns whatever error [`Client::write_line_protocol`] returns. An `io::Error` raised
    /// while serializing a point is yielded as an error item of the request body stream.
    pub async fn write(
        &self,
        bucket: &str,
        body: impl Stream<Item = impl WriteDataPoint> + Send + Sync + 'static,
    ) -> Result<(), RequestError> {
        // One buffer is shared by all points: each point is serialized into it and the bytes
        // just written are then split off and frozen as that point's chunk.
        let mut buffer = bytes::BytesMut::new();

        let body = body.map(move |point| {
            let mut w = (&mut buffer).writer();
            point.write_data_point_to(&mut w)?;
            w.flush()?;
            Ok::<_, io::Error>(buffer.split().freeze())
        });

        let body = Body::wrap_stream(body);

        // The callee already returns `Result<(), RequestError>`, so forward it unchanged.
        self.write_line_protocol(&self.org, bucket, body).await
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::DataPoint;
    use futures::stream;
    use mockito::mock;

    /// Writing two data points must produce a single `POST` to the write endpoint whose
    /// query string, `Authorization` header and line protocol body match the expectations
    /// registered on the mock server.
    #[tokio::test]
    async fn writing_points() {
        let org = "some-org";
        let bucket = "some-bucket";
        let token = "some-token";

        // What the mock server requires of the incoming request.
        let expected_path = format!("/api/v2/write?bucket={}&org={}", bucket, org);
        let expected_auth = format!("Token {}", token);
        let expected_body = "\
cpu,host=server01 usage=0.5
cpu,host=server01,region=us-west usage=0.87
";

        let mock_server = mock("POST", expected_path.as_str())
            .match_header("Authorization", expected_auth.as_str())
            .match_body(expected_body)
            .create();

        let client = Client::new(&mockito::server_url(), org, token);

        let host_only = DataPoint::builder("cpu")
            .tag("host", "server01")
            .field("usage", 0.5)
            .build()
            .unwrap();
        let host_and_region = DataPoint::builder("cpu")
            .tag("host", "server01")
            .tag("region", "us-west")
            .field("usage", 0.87)
            .build()
            .unwrap();
        let points = vec![host_only, host_and_region];

        // The outcome of `write` is discarded on purpose; the mock assertion below is what
        // decides whether the test passes.
        // NOTE(review): presumably this is so a mismatched request is reported through the
        // mock's own assertion message rather than a generic HTTP error -- confirm.
        let _result = client.write(bucket, stream::iter(points)).await;

        mock_server.assert();
    }
}