1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use reqwest::{Client as HttpClient, Method, Url};
use std::error::Error;
use crate::{models::InfluxError, traits::PointSerialize};
#[derive(Clone)]
pub enum InsertOptions {
None,
WithTimestamp(Option<String>),
}
pub struct Client {
host: Url,
token: String,
client: HttpClient,
bucket: Option<String>,
org: Option<String>,
org_id: Option<String>,
}
impl Client {
pub fn new<T>(host: T, token: T) -> Client
where
T: Into<String>,
{
let host_url = reqwest::Url::parse(&host.into()[..]).unwrap();
Client {
host: host_url,
token: token.into(),
client: HttpClient::default(),
bucket: None,
org: None,
org_id: None,
}
}
pub fn with_bucket<T: Into<String>>(mut self, bucket: T) -> Self {
self.bucket = Some(bucket.into());
self
}
pub fn with_org<T: Into<String>>(mut self, org: T) -> Self {
self.org = Some(org.into());
self
}
pub fn with_org_id<T: Into<String>>(mut self, org_id: T) -> Self {
self.org_id = Some(org_id.into());
self
}
pub async fn insert_points(
self,
points: impl IntoIterator<Item = impl PointSerialize>,
options: InsertOptions,
) -> Result<(), InfluxError> {
let body = points
.into_iter()
.map(|p| {
format!(
"{}",
match options.clone() {
InsertOptions::WithTimestamp(t) => p.serialize_with_timestamp(t),
InsertOptions::None => p.serialize(),
}
)
})
.collect::<Vec<String>>()
.join("\n");
let result = self
.new_request(Method::POST, "/api/v2/write")
.body(body)
.send()
.await
.unwrap()
.error_for_status();
if let Err(err) = result {
let status = err.status().unwrap().as_u16();
return Err(Client::status_to_influxerror(status, Box::new(err)));
}
Ok(())
}
fn new_request(self, method: Method, path: &str) -> reqwest::RequestBuilder {
let mut query_params = Vec::<(&str, String)>::new();
if let Some(bucket) = self.bucket {
query_params.push(("bucket", bucket));
}
if let Some(org) = self.org {
query_params.push(("org", org));
} else if let Some(org_id) = self.org_id {
query_params.push(("orgID", org_id));
}
let mut url = self.host.clone();
url.set_path(path);
self.client
.request(method, url)
.header("Content-Type", "text/plain")
.header("Authorization", format!("{} {}", "Token", self.token))
.query(&query_params)
}
fn status_to_influxerror(status: u16, err: Box<dyn Error>) -> InfluxError {
match status {
400 => InfluxError::InvalidSyntax(err.to_string()),
401 => InfluxError::InvalidCredentials(err.to_string()),
403 => InfluxError::Forbidden(err.to_string()),
_ => InfluxError::Unknown(err.to_string()),
}
}
}