clia_influxdb2/lib.rs
1#![recursion_limit = "1024"]
2#![deny(rustdoc::broken_intra_doc_links, rust_2018_idioms)]
3#![warn(
4 missing_copy_implementations,
5 missing_debug_implementations,
6 missing_docs,
7 clippy::explicit_iter_loop,
8 clippy::use_self,
9 clippy::clone_on_ref_ptr,
10 clippy::future_not_send
11)]
12
13//! # influxdb2
14//!
15//! This is a Rust client to InfluxDB using the [2.0 API][2api].
16//!
17//! [2api]: https://v2.docs.influxdata.com/v2.0/reference/api/
18//!
19//! This project is a fork from the
20//! https://github.com/influxdata/influxdb_iox/tree/main/influxdb2_client project.
21//! At the time of this writing, the query functionality of the influxdb2 client
22//! from the official repository isn't working. So, I created this client to use
23//! it in my project.
24//!
25//! ## Usage
26//!
27//! ### Querying
28//!
29//! ```rust
30//! use chrono::{DateTime, FixedOffset};
31//! use influxdb2::{Client, FromDataPoint};
32//! use influxdb2::models::Query;
33//!
34//! #[derive(Debug, FromDataPoint)]
35//! pub struct StockPrice {
36//! ticker: String,
37//! value: f64,
38//! time: DateTime<FixedOffset>,
39//! }
40//!
41//! impl Default for StockPrice {
42//! fn default() -> Self {
43//! Self {
44//! ticker: "".to_string(),
45//! value: 0_f64,
46//! time: chrono::MIN_DATETIME.with_timezone(&chrono::FixedOffset::east(7 * 3600)),
47//! }
48//! }
49//! }
50//!
51//! async fn example() -> Result<(), Box<dyn std::error::Error>> {
52//! let host = std::env::var("INFLUXDB_HOST").unwrap();
53//! let org = std::env::var("INFLUXDB_ORG").unwrap();
54//! let token = std::env::var("INFLUXDB_TOKEN").unwrap();
55//! let client = Client::new(host, org, token);
56//!
57//! let qs = format!("from(bucket: \"stock-prices\")
58//! |> range(start: -1w)
59//! |> filter(fn: (r) => r.ticker == \"{}\")
60//! |> last()
61//! ", "AAPL");
62//! let query = Query::new(qs.to_string());
63//! let res: Vec<StockPrice> = client.query::<StockPrice>(Some(query))
64//! .await?;
65//! println!("{:?}", res);
66//!
67//! Ok(())
68//! }
69//! ```
70//!
71//! ### Writing
72//!
73//! ```rust
74//! async fn example() -> Result<(), Box<dyn std::error::Error>> {
75//! use futures::prelude::*;
76//! use influxdb2::models::DataPoint;
77//! use influxdb2::Client;
78//!
79//! let host = std::env::var("INFLUXDB_HOST").unwrap();
80//! let org = std::env::var("INFLUXDB_ORG").unwrap();
81//! let token = std::env::var("INFLUXDB_TOKEN").unwrap();
82//! let bucket = "bucket";
83//! let client = Client::new(host, org, token);
84//!
85//! let points = vec![
86//! DataPoint::builder("cpu")
87//! .tag("host", "server01")
88//! .field("usage", 0.5)
89//! .build()?,
90//! DataPoint::builder("cpu")
91//! .tag("host", "server01")
92//! .tag("region", "us-west")
93//! .field("usage", 0.87)
94//! .build()?,
95//! ];
96//!
97//! client.write(bucket, stream::iter(points)).await?;
98//!
99//! Ok(())
100//! }
101//! ```
102
103use reqwest::{Method, Url};
104use secrecy::{ExposeSecret, Secret};
105use snafu::{ResultExt, Snafu};
106
107/// Errors that occur while making requests to the Influx server.
108#[derive(Debug, Snafu)]
109pub enum RequestError {
110 /// While making a request to the Influx server, the underlying `reqwest`
111 /// library returned an error that was not an HTTP 400 or 500.
112 #[snafu(display("Error while processing the HTTP request: {}", source))]
113 ReqwestProcessing {
114 /// The underlying error object from `reqwest`.
115 source: reqwest::Error,
116 },
117 /// The underlying `reqwest` library returned an HTTP error with code 400
118 /// (meaning a client error) or 500 (meaning a server error).
119 #[snafu(display("HTTP request returned an error: {}, `{}`", status, text))]
120 Http {
121 /// The `StatusCode` returned from the request
122 status: reqwest::StatusCode,
123 /// Any text data returned from the request
124 text: String,
125 },
126
127 /// While serializing data as JSON to send in a request, the underlying
128 /// `serde_json` library returned an error.
129 #[snafu(display("Error while serializing to JSON: {}", source))]
130 Serializing {
131 /// The underlying error object from `serde_json`.
132 source: serde_json::error::Error,
133 },
134
135 /// While deserializing response from the Influx server, the underlying
136 /// parsing library returned an error.
137 #[snafu(display("Error while parsing response: {}", text))]
138 Deserializing {
139 /// Error description.
140 text: String,
141 },
142}
143
144#[cfg(feature = "gzip")]
145#[derive(Debug, Clone)]
146enum Compression {
147 None,
148 Gzip,
149}
150
151/// Client to a server supporting the InfluxData 2.0 API.
152#[derive(Debug, Clone)]
153pub struct Client {
154 /// The base URL this client sends requests to
155 pub base: Url,
156 /// The organization tied to this client
157 pub org: String,
158 auth_header: Option<Secret<String>>,
159 reqwest: reqwest::Client,
160 #[cfg(feature = "gzip")]
161 compression: Compression,
162}
163
164impl Client {
165 /// Create a new client pointing to the URL specified in
166 /// `protocol://server:port` format and using the specified token for
167 /// authorization.
168 ///
169 /// # Example
170 ///
171 /// ```
172 /// let client = influxdb2::Client::new("http://localhost:8888", "org", "my-token");
173 /// ```
174 pub fn new(
175 url: impl Into<String>,
176 org: impl Into<String>,
177 auth_token: impl Into<String>,
178 ) -> Self {
179 // unwrap is used to maintain backwards compatibility
180 // panicking was present earlier as well inside of reqwest
181 ClientBuilder::new(url, org, auth_token).build().unwrap()
182 }
183
184 /// Consolidate common request building code
185 fn request(&self, method: Method, url: &str) -> reqwest::RequestBuilder {
186 let mut req = self.reqwest.request(method, url);
187
188 if let Some(auth) = &self.auth_header {
189 req = req.header("Authorization", auth.expose_secret());
190 }
191
192 req
193 }
194
195 /// Join base Url of the client to target API endpoint into valid Url
196 fn url(&self, endpoint: &str) -> String {
197 let mut url = self.base.clone();
198 url.set_path(endpoint);
199 url.into()
200 }
201}
202
203/// Errors that occur when building the client
204#[derive(Debug, Snafu)]
205pub enum BuildError {
206 /// While constructing the reqwest client an error occurred
207 #[snafu(display("Error while building the client: {}", source))]
208 ReqwestClientError {
209 /// Reqwest internal error
210 source: reqwest::Error,
211 },
212}
213/// ClientBuilder builds the `Client`
214#[derive(Debug)]
215pub struct ClientBuilder {
216 /// The base URL this client sends requests to
217 pub base: Url,
218 /// The organization tied to this client
219 pub org: String,
220 auth_header: Option<Secret<String>>,
221 reqwest: reqwest::ClientBuilder,
222 #[cfg(feature = "gzip")]
223 compression: Compression,
224}
225
226impl ClientBuilder {
227 /// Construct a new `ClientBuilder`.
228 pub fn new(
229 url: impl Into<String>,
230 org: impl Into<String>,
231 auth_token: impl Into<String>,
232 ) -> Self {
233 Self::with_builder(reqwest::ClientBuilder::new(), url, org, auth_token)
234 }
235
236 /// Construct a new `ClientBuilder` with a provided [`reqwest::ClientBuilder`].
237 ///
238 /// Can be used to pass custom `reqwest` parameters, such as TLS configuration.
239 pub fn with_builder(
240 builder: reqwest::ClientBuilder,
241 url: impl Into<String>,
242 org: impl Into<String>,
243 auth_token: impl Into<String>,
244 ) -> Self {
245 let token = auth_token.into();
246 let auth_header = if token.is_empty() {
247 None
248 } else {
249 Some(format!("Token {}", token).into())
250 };
251
252 let url: String = url.into();
253 let base =
254 Url::parse(&url).unwrap_or_else(|_| panic!("Invalid url was provided: {}", &url));
255
256 Self {
257 base,
258 org: org.into(),
259 auth_header,
260 reqwest: builder,
261 #[cfg(feature = "gzip")]
262 compression: Compression::None,
263 }
264 }
265
266 /// Enable gzip compression on the write and write_with_precision calls
267 #[cfg(feature = "gzip")]
268 pub fn gzip(mut self, enable: bool) -> ClientBuilder {
269 self.reqwest = self.reqwest.gzip(enable);
270 self.compression = Compression::Gzip;
271 self
272 }
273
274 /// Build returns the influx client
275 pub fn build(self) -> Result<Client, BuildError> {
276 Ok(Client {
277 base: self.base,
278 org: self.org,
279 auth_header: self.auth_header,
280 reqwest: self.reqwest.build().context(ReqwestClientError)?,
281 #[cfg(feature = "gzip")]
282 compression: self.compression,
283 })
284 }
285}
286
287pub mod common;
288
289pub mod api;
290pub mod models;
291pub mod writable;
292
293// Re-exports
294pub use influxdb2_derive::FromDataPoint;
295pub use influxdb2_structmap::FromMap;
296
297#[cfg(test)]
298mod tests {
299 use crate::Client;
300
301 #[test]
302 fn url_invalid_panic() {
303 let result = std::panic::catch_unwind(|| Client::new("/3242/23", "some-org", "some-token"));
304 assert!(result.is_err());
305 }
306
307 #[test]
308 /// Reproduction of https://github.com/aprimadi/influxdb2/issues/6
309 fn url_ignores_double_slashes() {
310 let base = "http://influxdb.com/";
311 let client = Client::new(base, "some-org", "some-token");
312
313 assert_eq!(format!("{}api/v2/write", base), client.url("/api/v2/write"));
314
315 assert_eq!(client.url("/api/v2/write"), client.url("api/v2/write"));
316 }
317}