clia_influxdb2/
lib.rs

1#![recursion_limit = "1024"]
2#![deny(rustdoc::broken_intra_doc_links, rust_2018_idioms)]
3#![warn(
4    missing_copy_implementations,
5    missing_debug_implementations,
6    missing_docs,
7    clippy::explicit_iter_loop,
8    clippy::use_self,
9    clippy::clone_on_ref_ptr,
10    clippy::future_not_send
11)]
12
13//! # influxdb2
14//!
15//! This is a Rust client to InfluxDB using the [2.0 API][2api].
16//!
17//! [2api]: https://v2.docs.influxdata.com/v2.0/reference/api/
18//!
19//! This project is a fork from the
20//! https://github.com/influxdata/influxdb_iox/tree/main/influxdb2_client project.
21//! At the time of this writing, the query functionality of the influxdb2 client
22//! from the official repository isn't working. So, I created this client to use
23//! it in my project.
24//!
25//! ## Usage
26//!
27//! ### Querying
28//!
29//! ```rust
30//! use chrono::{DateTime, FixedOffset};
31//! use influxdb2::{Client, FromDataPoint};
32//! use influxdb2::models::Query;
33//!
34//! #[derive(Debug, FromDataPoint)]
35//! pub struct StockPrice {
36//!     ticker: String,
37//!     value: f64,
38//!     time: DateTime<FixedOffset>,
39//! }
40//!
41//! impl Default for StockPrice {
42//!     fn default() -> Self {
43//!         Self {
44//!             ticker: "".to_string(),
45//!             value: 0_f64,
46//!             time: chrono::MIN_DATETIME.with_timezone(&chrono::FixedOffset::east(7 * 3600)),
47//!         }
48//!     }
49//! }
50//!
51//! async fn example() -> Result<(), Box<dyn std::error::Error>> {
52//!     let host = std::env::var("INFLUXDB_HOST").unwrap();
53//!     let org = std::env::var("INFLUXDB_ORG").unwrap();
54//!     let token = std::env::var("INFLUXDB_TOKEN").unwrap();
55//!     let client = Client::new(host, org, token);
56//!
57//!     let qs = format!("from(bucket: \"stock-prices\")
58//!         |> range(start: -1w)
59//!         |> filter(fn: (r) => r.ticker == \"{}\")
60//!         |> last()
61//!     ", "AAPL");
62//!     let query = Query::new(qs.to_string());
63//!     let res: Vec<StockPrice> = client.query::<StockPrice>(Some(query))
64//!         .await?;
65//!     println!("{:?}", res);
66//!
67//!     Ok(())
68//! }
69//! ```
70//!
71//! ### Writing
72//!
73//! ```rust
74//! async fn example() -> Result<(), Box<dyn std::error::Error>> {
75//!     use futures::prelude::*;
76//!     use influxdb2::models::DataPoint;
77//!     use influxdb2::Client;
78//!
79//!     let host = std::env::var("INFLUXDB_HOST").unwrap();
80//!     let org = std::env::var("INFLUXDB_ORG").unwrap();
81//!     let token = std::env::var("INFLUXDB_TOKEN").unwrap();
82//!     let bucket = "bucket";
83//!     let client = Client::new(host, org, token);
84//!     
85//!     let points = vec![
86//!         DataPoint::builder("cpu")
87//!             .tag("host", "server01")
88//!             .field("usage", 0.5)
89//!             .build()?,
90//!         DataPoint::builder("cpu")
91//!             .tag("host", "server01")
92//!             .tag("region", "us-west")
93//!             .field("usage", 0.87)
94//!             .build()?,
95//!     ];
96//!                                                             
97//!     client.write(bucket, stream::iter(points)).await?;
98//!     
99//!     Ok(())
100//! }
101//! ```
102
103use reqwest::{Method, Url};
104use secrecy::{ExposeSecret, Secret};
105use snafu::{ResultExt, Snafu};
106
107/// Errors that occur while making requests to the Influx server.
108#[derive(Debug, Snafu)]
109pub enum RequestError {
110    /// While making a request to the Influx server, the underlying `reqwest`
111    /// library returned an error that was not an HTTP 400 or 500.
112    #[snafu(display("Error while processing the HTTP request: {}", source))]
113    ReqwestProcessing {
114        /// The underlying error object from `reqwest`.
115        source: reqwest::Error,
116    },
117    /// The underlying `reqwest` library returned an HTTP error with code 400
118    /// (meaning a client error) or 500 (meaning a server error).
119    #[snafu(display("HTTP request returned an error: {}, `{}`", status, text))]
120    Http {
121        /// The `StatusCode` returned from the request
122        status: reqwest::StatusCode,
123        /// Any text data returned from the request
124        text: String,
125    },
126
127    /// While serializing data as JSON to send in a request, the underlying
128    /// `serde_json` library returned an error.
129    #[snafu(display("Error while serializing to JSON: {}", source))]
130    Serializing {
131        /// The underlying error object from `serde_json`.
132        source: serde_json::error::Error,
133    },
134
135    /// While deserializing response from the Influx server, the underlying
136    /// parsing library returned an error.
137    #[snafu(display("Error while parsing response: {}", text))]
138    Deserializing {
139        /// Error description.
140        text: String,
141    },
142}
143
144#[cfg(feature = "gzip")]
145#[derive(Debug, Clone)]
146enum Compression {
147    None,
148    Gzip,
149}
150
151/// Client to a server supporting the InfluxData 2.0 API.
152#[derive(Debug, Clone)]
153pub struct Client {
154    /// The base URL this client sends requests to
155    pub base: Url,
156    /// The organization tied to this client
157    pub org: String,
158    auth_header: Option<Secret<String>>,
159    reqwest: reqwest::Client,
160    #[cfg(feature = "gzip")]
161    compression: Compression,
162}
163
164impl Client {
165    /// Create a new client pointing to the URL specified in
166    /// `protocol://server:port` format and using the specified token for
167    /// authorization.
168    ///
169    /// # Example
170    ///
171    /// ```
172    /// let client = influxdb2::Client::new("http://localhost:8888", "org", "my-token");
173    /// ```
174    pub fn new(
175        url: impl Into<String>,
176        org: impl Into<String>,
177        auth_token: impl Into<String>,
178    ) -> Self {
179        // unwrap is used to maintain backwards compatibility
180        // panicking was present earlier as well inside of reqwest
181        ClientBuilder::new(url, org, auth_token).build().unwrap()
182    }
183
184    /// Consolidate common request building code
185    fn request(&self, method: Method, url: &str) -> reqwest::RequestBuilder {
186        let mut req = self.reqwest.request(method, url);
187
188        if let Some(auth) = &self.auth_header {
189            req = req.header("Authorization", auth.expose_secret());
190        }
191
192        req
193    }
194
195    /// Join base Url of the client to target API endpoint into valid Url
196    fn url(&self, endpoint: &str) -> String {
197        let mut url = self.base.clone();
198        url.set_path(endpoint);
199        url.into()
200    }
201}
202
203/// Errors that occur when building the client
204#[derive(Debug, Snafu)]
205pub enum BuildError {
206    /// While constructing the reqwest client an error occurred
207    #[snafu(display("Error while building the client: {}", source))]
208    ReqwestClientError {
209        /// Reqwest internal error
210        source: reqwest::Error,
211    },
212}
213/// ClientBuilder builds the `Client`
214#[derive(Debug)]
215pub struct ClientBuilder {
216    /// The base URL this client sends requests to
217    pub base: Url,
218    /// The organization tied to this client
219    pub org: String,
220    auth_header: Option<Secret<String>>,
221    reqwest: reqwest::ClientBuilder,
222    #[cfg(feature = "gzip")]
223    compression: Compression,
224}
225
226impl ClientBuilder {
227    /// Construct a new `ClientBuilder`.
228    pub fn new(
229        url: impl Into<String>,
230        org: impl Into<String>,
231        auth_token: impl Into<String>,
232    ) -> Self {
233        Self::with_builder(reqwest::ClientBuilder::new(), url, org, auth_token)
234    }
235
236    /// Construct a new `ClientBuilder` with a provided [`reqwest::ClientBuilder`].
237    ///
238    /// Can be used to pass custom `reqwest` parameters, such as TLS configuration.
239    pub fn with_builder(
240        builder: reqwest::ClientBuilder,
241        url: impl Into<String>,
242        org: impl Into<String>,
243        auth_token: impl Into<String>,
244    ) -> Self {
245        let token = auth_token.into();
246        let auth_header = if token.is_empty() {
247            None
248        } else {
249            Some(format!("Token {}", token).into())
250        };
251
252        let url: String = url.into();
253        let base =
254            Url::parse(&url).unwrap_or_else(|_| panic!("Invalid url was provided: {}", &url));
255
256        Self {
257            base,
258            org: org.into(),
259            auth_header,
260            reqwest: builder,
261            #[cfg(feature = "gzip")]
262            compression: Compression::None,
263        }
264    }
265
266    /// Enable gzip compression on the write and write_with_precision calls
267    #[cfg(feature = "gzip")]
268    pub fn gzip(mut self, enable: bool) -> ClientBuilder {
269        self.reqwest = self.reqwest.gzip(enable);
270        self.compression = Compression::Gzip;
271        self
272    }
273
274    /// Build returns the influx client
275    pub fn build(self) -> Result<Client, BuildError> {
276        Ok(Client {
277            base: self.base,
278            org: self.org,
279            auth_header: self.auth_header,
280            reqwest: self.reqwest.build().context(ReqwestClientError)?,
281            #[cfg(feature = "gzip")]
282            compression: self.compression,
283        })
284    }
285}
286
287pub mod common;
288
289pub mod api;
290pub mod models;
291pub mod writable;
292
293// Re-exports
294pub use influxdb2_derive::FromDataPoint;
295pub use influxdb2_structmap::FromMap;
296
297#[cfg(test)]
298mod tests {
299    use crate::Client;
300
301    #[test]
302    fn url_invalid_panic() {
303        let result = std::panic::catch_unwind(|| Client::new("/3242/23", "some-org", "some-token"));
304        assert!(result.is_err());
305    }
306
307    #[test]
308    /// Reproduction of https://github.com/aprimadi/influxdb2/issues/6
309    fn url_ignores_double_slashes() {
310        let base = "http://influxdb.com/";
311        let client = Client::new(base, "some-org", "some-token");
312
313        assert_eq!(format!("{}api/v2/write", base), client.url("/api/v2/write"));
314
315        assert_eq!(client.url("/api/v2/write"), client.url("api/v2/write"));
316    }
317}