1pub mod blocking;
6pub mod flux;
7use std::fmt::Display;
8
9use csv::StringRecord;
10pub use flux::{Precision, ReadQuery, WriteQuery};
11use futures::TryFutureExt;
12
13#[derive(Debug)]
14pub struct InfluxError {
15 pub msg: Option<String>,
16}
17
18pub struct Client<'a> {
20 url: &'a str,
21 token: String,
22 reqwest_client: reqwest::Client,
23}
24
25impl<'a> Client<'a> {
26 pub fn new(url: &'a str, token: &'a str) -> Self {
28 Self {
29 url,
30 token: token.to_owned(),
31 reqwest_client: reqwest::Client::new(),
32 }
33 }
34
35 pub fn from_env(url: &'a str) -> Result<Self, InfluxError> {
38 Ok(Self {
39 url,
40 token: std::env::var("INFLUXDB_TOKEN").map_err(|e| InfluxError {
41 msg: Some(e.to_string()),
42 })?,
43 reqwest_client: reqwest::Client::new(),
44 })
45 }
46
47 pub async fn insert<T: Display>(
50 &self,
51 bucket: &'a str,
52 org: &'a str,
53 precision: Precision,
54 query: WriteQuery<'a, T>,
55 ) -> Result<(), InfluxError> {
56 self.reqwest_client
57 .post(&format!(
58 "{}/api/v2/write?org={}&bucket={}&precision={}",
59 self.url, org, bucket, precision
60 ))
61 .header("Authorization", &format!("Token {}", self.token))
62 .body(query.to_string())
63 .send()
64 .map_err(|e| InfluxError {
65 msg: Some(e.to_string()),
66 })
67 .await?;
68 Ok(())
69 }
70
71 pub async fn get(&self, org: &'a str, query: ReadQuery<'a>) -> Result<String, InfluxError> {
73 self.get_raw(org, &query.to_string()).await
74 }
75
76 pub async fn get_csv(
77 &self,
78 org: &'a str,
79 query: ReadQuery<'a>,
80 ) -> Result<Vec<StringRecord>, InfluxError> {
81 let res = self.get(org, query).await?;
82 let reader = csv::ReaderBuilder::new().from_reader(res.as_bytes());
83 Ok(reader.into_records().map(|r| r.unwrap()).collect())
84 }
85
86 pub async fn get_raw(&self, org: &'a str, query: &'a str) -> Result<String, InfluxError> {
90 self.reqwest_client
91 .post(&format!("{}/api/v2/query?org={}", self.url, org))
92 .header("Accept", "application/csv")
93 .header("Content-Type", "application/vnd.flux")
94 .header("Authorization", &format!("Token {}", self.token))
95 .body(query.to_owned())
96 .send()
97 .await
98 .map_err(|e| InfluxError {
99 msg: Some(e.to_string()),
100 })?
101 .text()
102 .await
103 .map_err(|e| InfluxError {
104 msg: Some(e.to_string()),
105 })
106 }
107}