1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use influxdb::{Client, Error, Query, QueryTypes};
use reqwest::blocking as rb;
use reqwest::blocking::Client as ReqwestClient;
use reqwest::blocking::Response;
use reqwest::{StatusCode, Url};
/// Blocking front-end for an `influxdb` [`Client`].
///
/// The wrapped client supplies the database URL and the basic request
/// parameters that the methods on this type attach to every HTTP call.
pub struct SyncClient {
// Wrapped client; read for `database_url()` and converted into query parameters.
client: Client,
}
impl SyncClient {
pub fn new(client: Client) -> Self {
SyncClient { client }
}
pub fn ping(&self) -> Result<(String, String), Error> {
let res =
rb::get(format!("{}/ping", self.client.database_url()).as_str()).map_err(|err| {
Error::ProtocolError {
error: format!("{}", err),
}
})?;
let build = res
.headers()
.get("X-Influxdb-Build")
.unwrap()
.to_str()
.unwrap();
let version = res
.headers()
.get("X-Influxdb-Version")
.unwrap()
.to_str()
.unwrap();
Ok((build.to_owned(), version.to_owned()))
}
pub fn query<'q, Q>(&self, q: &'q Q) -> Result<String, Error>
where
Q: Query,
&'q Q: Into<QueryTypes<'q>>,
{
let query = q.build().map_err(|err| Error::InvalidQueryError {
error: format!("{}", err),
})?;
let basic_parameters: Vec<(String, String)> = self.client.to_owned().into();
let client = match q.into() {
QueryTypes::Read(_) => {
let read_query = query.get();
let mut url = Url::parse_with_params(
format!("{url}/query", url = self.client.database_url()).as_str(),
basic_parameters,
)
.map_err(|err| Error::UrlConstructionError {
error: format!("{}", err),
})?;
url.query_pairs_mut().append_pair("q", &read_query);
if read_query.contains("SELECT") || read_query.contains("SHOW") {
ReqwestClient::new().get(url)
} else {
ReqwestClient::new().post(url)
}
}
QueryTypes::Write(write_query) => {
let mut url = Url::parse_with_params(
format!("{url}/write", url = self.client.database_url()).as_str(),
basic_parameters,
)
.map_err(|err| Error::InvalidQueryError {
error: format!("{}", err),
})?;
url.query_pairs_mut()
.append_pair("precision", &write_query.get_precision());
ReqwestClient::new().post(url).body(query.get())
}
};
let res: Response = client
.send()
.map_err(|err| Error::ConnectionError { error: err })?;
match res.status() {
StatusCode::UNAUTHORIZED => return Err(Error::AuthorizationError),
StatusCode::FORBIDDEN => return Err(Error::AuthenticationError),
_ => {}
}
let s = res.text().map_err(|_| Error::DeserializationError {
error: "response could not be converted to UTF-8".to_string(),
})?;
if s.contains("\"error\"") {
return Err(Error::DatabaseError {
error: format!("influxdb error: \"{}\"", s),
});
}
Ok(s)
}
}