1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
pub mod flux;
use std::{fmt::Display, io::Read};
use csv::StringRecord;
pub use flux::{Precision, ReadQuery, WriteQuery};
#[derive(Debug)]
pub struct InfluxError {
pub msg: Option<String>,
}
pub struct Client<'a> {
url: &'a str,
token: String,
reqwest_client: reqwest::blocking::Client,
}
impl<'a> Client<'a> {
pub fn new(url: &'a str, token: &'a str) -> Self {
Self {
url,
token: token.to_owned(),
reqwest_client: reqwest::blocking::Client::new(),
}
}
pub fn from_env(url: &'a str) -> Result<Self, InfluxError> {
Ok(Self {
url,
token: std::env::var("INFLUXDB_TOKEN").map_err(|e| InfluxError {
msg: Some(e.to_string()),
})?,
reqwest_client: reqwest::blocking::Client::new(),
})
}
pub fn insert<T: Display>(
&self,
bucket: &'a str,
org: &'a str,
precision: Precision,
query: WriteQuery<T>,
) -> Result<(), InfluxError> {
self.reqwest_client
.post(&format!(
"{}/api/v2/write?org={}&bucket={}&precision={}",
self.url, org, bucket, precision
))
.header("Authorization", &format!("Token {}", self.token))
.body(format!(
"{} {}={}",
query.name, query.field_name, query.value
))
.send()
.map_err(|e| InfluxError {
msg: Some(e.to_string()),
})?;
Ok(())
}
pub fn get(&self, org: &'a str, query: ReadQuery) -> Result<String, InfluxError> {
self.get_raw(org, &format!("{}", query))
}
pub fn get_csv(
&self,
org: &'a str,
query: ReadQuery,
) -> Result<Vec<StringRecord>, InfluxError> {
let res = self.get(org, query)?;
let reader = csv::ReaderBuilder::new().from_reader(res.as_bytes());
Ok(reader.into_records().map(|r| r.unwrap()).collect())
}
pub fn get_raw(&self, org: &'a str, query: &'a str) -> Result<String, InfluxError> {
let mut buf = String::new();
self.reqwest_client
.post(&format!("{}/api/v2/query?org={}", self.url, org))
.header("Accept", "application/csv")
.header("Content-Type", "application/vnd.flux")
.header("Authorization", &format!("Token {}", self.token))
.body(query.to_owned())
.send()
.map_err(|e| InfluxError {
msg: Some(e.to_string()),
})?
.read_to_string(&mut buf)
.map_err(|e| InfluxError {
msg: Some(e.to_string()),
})?;
Ok(buf)
}
}