influx_client/
lib.rs

1//! An unofficial library to read from and write to [InfluxDB](https://www.influxdata.com/) databases.
2//! This library supports the 2.x API.
3//! It is still very early in development, so features may be missing or not working.
4
5pub mod blocking;
6pub mod flux;
7use std::fmt::Display;
8
9use csv::StringRecord;
10pub use flux::{Precision, ReadQuery, WriteQuery};
11use futures::TryFutureExt;
12
/// Error returned by every fallible operation in this crate.
///
/// The underlying failure is flattened into a human-readable message;
/// `msg` is `None` when no description is available.
#[derive(Debug)]
pub struct InfluxError {
    /// Description of what went wrong, if one is available.
    pub msg: Option<String>,
}

impl std::fmt::Display for InfluxError {
    /// Writes the stored message, or a generic placeholder when there is none.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self.msg.as_deref() {
            Some(msg) => f.write_str(msg),
            None => f.write_str("unknown InfluxDB error"),
        }
    }
}

// Implementing `Error` lets callers use `?` to convert into
// `Box<dyn std::error::Error>` and similar catch-all error types.
impl std::error::Error for InfluxError {}
17
18/// Use a Client to connect to your influx database and execute queries.
19pub struct Client<'a> {
20    url: &'a str,
21    token: String,
22    reqwest_client: reqwest::Client,
23}
24
25impl<'a> Client<'a> {
26    /// Create a client with a given database URL and token.
27    pub fn new(url: &'a str, token: &'a str) -> Self {
28        Self {
29            url,
30            token: token.to_owned(),
31            reqwest_client: reqwest::Client::new(),
32        }
33    }
34
35    /// Create a client that reads its token from the
36    /// `INFLUXDB_TOKEN` environment variable
37    pub fn from_env(url: &'a str) -> Result<Self, InfluxError> {
38        Ok(Self {
39            url,
40            token: std::env::var("INFLUXDB_TOKEN").map_err(|e| InfluxError {
41                msg: Some(e.to_string()),
42            })?,
43            reqwest_client: reqwest::Client::new(),
44        })
45    }
46
47    /// Insert a new value into a bucket.
48    /// Note that not all attributes on `WriteQuery` are supported yet.
49    pub async fn insert<T: Display>(
50        &self,
51        bucket: &'a str,
52        org: &'a str,
53        precision: Precision,
54        query: WriteQuery<'a, T>,
55    ) -> Result<(), InfluxError> {
56        self.reqwest_client
57            .post(&format!(
58                "{}/api/v2/write?org={}&bucket={}&precision={}",
59                self.url, org, bucket, precision
60            ))
61            .header("Authorization", &format!("Token {}", self.token))
62            .body(query.to_string())
63            .send()
64            .map_err(|e| InfluxError {
65                msg: Some(e.to_string()),
66            })
67            .await?;
68        Ok(())
69    }
70
71    /// Retrieve a value from a bucket based on certain filters.
72    pub async fn get(&self, org: &'a str, query: ReadQuery<'a>) -> Result<String, InfluxError> {
73        self.get_raw(org, &query.to_string()).await
74    }
75
76    pub async fn get_csv(
77        &self,
78        org: &'a str,
79        query: ReadQuery<'a>,
80    ) -> Result<Vec<StringRecord>, InfluxError> {
81        let res = self.get(org, query).await?;
82        let reader = csv::ReaderBuilder::new().from_reader(res.as_bytes());
83        Ok(reader.into_records().map(|r| r.unwrap()).collect())
84    }
85
86    /// If you prefer to write your own `flux` queries, use this method.
87    /// As `flux` support is not complete yet, this is currently the only
88    /// way to use the full `flux` language.
89    pub async fn get_raw(&self, org: &'a str, query: &'a str) -> Result<String, InfluxError> {
90        self.reqwest_client
91            .post(&format!("{}/api/v2/query?org={}", self.url, org))
92            .header("Accept", "application/csv")
93            .header("Content-Type", "application/vnd.flux")
94            .header("Authorization", &format!("Token {}", self.token))
95            .body(query.to_owned())
96            .send()
97            .await
98            .map_err(|e| InfluxError {
99                msg: Some(e.to_string()),
100            })?
101            .text()
102            .await
103            .map_err(|e| InfluxError {
104                msg: Some(e.to_string()),
105            })
106    }
107}