rinfluxdb_influxql/client/async.rs
1// Copyright Claudio Mattera 2021.
2// Distributed under the MIT License or Apache 2.0 License at your option.
3// See accompanying files License-MIT.txt and License-Apache-2.0, or online at
4// https://opensource.org/licenses/MIT
5// https://opensource.org/licenses/Apache-2.0
6
7use std::collections::HashMap;
8use std::convert::TryFrom;
9
10use tracing::*;
11
12use reqwest::header::{HeaderMap, HeaderValue, ACCEPT};
13use reqwest::Client as ReqwestClient;
14use reqwest::ClientBuilder as ReqwestClientBuilder;
15use reqwest::RequestBuilder as ReqwestRequestBuilder;
16use reqwest::Response as ReqwestResponse;
17
18use url::Url;
19
20use chrono::{DateTime, Utc};
21
22use async_trait::async_trait;
23
24use rinfluxdb_types::Value;
25
26use super::ClientError;
27
28use super::super::query::Query;
29use super::super::response::{from_str, ResponseError};
30use super::super::StatementResult;
31
32/// A client for performing frequent InfluxQL queries in a convenient way
33///
34/// ```.no_run
35/// use std::collections::HashMap;
36/// use url::Url;
37/// use rinfluxdb_influxql::QueryBuilder;
38/// use rinfluxdb_influxql::r#async::Client;
39/// use rinfluxdb_dataframe::DataFrame;
40///
41/// async_std::task::block_on(async {
42/// let client = Client::new(
43/// Url::parse("https://example.com/")?,
44/// Some(("username", "password")),
45/// )?;
46///
47/// let query = QueryBuilder::from("indoor_environment")
48/// .database("house")
49/// .field("temperature")
50/// .field("humidity")
51/// .build();
52/// let dataframe: DataFrame = client.fetch_dataframe(query).await?;
53/// println!("{}", dataframe);
54///
55/// let query = QueryBuilder::from("indoor_environment")
56/// .database("house")
57/// .field("temperature")
58/// .field("humidity")
59/// .group_by("room")
60/// .build();
61/// let tagged_dataframes: HashMap<String, DataFrame> = client.fetch_dataframes_by_tag(query, "room").await?;
62/// for (tag, dataframe) in tagged_dataframes {
63/// println!("{}: {}", tag, dataframe);
64/// }
65/// # Ok::<(), rinfluxdb_influxql::ClientError>(())
66/// # })?;
67/// # Ok::<(), rinfluxdb_influxql::ClientError>(())
68/// ```
69#[derive(Debug)]
70pub struct Client {
71 client: ReqwestClient,
72 base_url: Url,
73 credentials: Option<(String, String)>,
74}
75
76impl Client {
77 /// Create a new client to an InfluxDB server
78 ///
79 /// Parameter `credentials` can be used to provide username and password if
80 /// the server requires authentication.
81 pub fn new<T, S>(
82 base_url: Url,
83 credentials: Option<(T, S)>,
84 ) -> Result<Self, ClientError>
85 where
86 T: Into<String>,
87 S: Into<String>,
88 {
89 let mut headers = HeaderMap::new();
90 headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
91
92 let client = ReqwestClientBuilder::new()
93 .default_headers(headers)
94 .build()?;
95
96 let credentials = credentials
97 .map(|(username, password)| (username.into(), password.into()));
98
99 Ok(Self {
100 client,
101 base_url,
102 credentials,
103 })
104 }
105
106 /// Query the server for a single dataframe
107 ///
108 /// This function assumes a single statement is returned, and that such
109 /// statement contains a single dataframe. Everything else is ignored.
110 ///
111 /// [`ClientError::EmptyError`](ClientError::EmptyError) is returned if the
112 /// response does not contain
113 /// dataframes.
114 #[instrument(
115 name = "Fetching dataframe",
116 skip(self),
117 )]
118 pub async fn fetch_dataframe<DF, E>(
119 &self,
120 query: Query,
121 ) -> Result<DF, ClientError>
122 where
123 DF: TryFrom<(String, Vec<DateTime<Utc>>, HashMap<String, Vec<Value>>), Error = E>,
124 E: Into<ResponseError>,
125 {
126 let statement_results = self.fetch_readings_from_database(query, None::<String>).await?;
127 let statement_result = statement_results
128 .into_iter()
129 .next()
130 .ok_or(ClientError::EmptyError)?;
131 let dataframes = statement_result?;
132 let (dataframe, _tags) = dataframes
133 .into_iter()
134 .next()
135 .ok_or(ClientError::EmptyError)?;
136 Ok(dataframe)
137 }
138
139 /// Query the server for dataframes grouped by a single tag
140 ///
141 /// This function assumes a single statement is returned, and that such
142 /// statement contains multiple dataframe with the specified tag.
143 /// Everything else is ignored.
144 ///
145 /// [`ClientError::EmptyError`](ClientError::EmptyError) is returned if the
146 /// response does not contain dataframes.
147 /// [`ClientError::ExpectedTagsError`](ClientError::ExpectedTagsError) is
148 /// returned if the response does not contain tagged dataframes.
149 /// [`ClientError::ExpectedTagError`](ClientError::ExpectedTagError) is
150 /// returned if the response contains tagged dataframes, but the specified
151 /// tag is missing.
152 #[instrument(
153 name = "Fetching dataframe by tag",
154 skip(self),
155 )]
156 pub async fn fetch_dataframes_by_tag<DF, E>(
157 &self,
158 query: Query,
159 tag: &str,
160 ) -> Result<HashMap<String, DF>, ClientError>
161 where
162 DF: TryFrom<(String, Vec<DateTime<Utc>>, HashMap<String, Vec<Value>>), Error = E>,
163 E: Into<ResponseError>,
164 {
165 let statement_results = self.fetch_readings_from_database(query, None::<String>).await?;
166 let statement_result = statement_results
167 .into_iter()
168 .next()
169 .ok_or(ClientError::EmptyError)?;
170 let dataframes = statement_result?;
171 dataframes
172 .into_iter()
173 .map(|(dataframe, tags)| {
174 let tags = tags.ok_or(ClientError::ExpectedTagsError)?;
175 let tag_value = tags
176 .get(tag)
177 .ok_or_else(|| ClientError::ExpectedTagError(tag.to_owned()))?;
178 Ok((tag_value.to_owned(), dataframe))
179 })
180 .collect()
181 }
182
183 pub async fn fetch_readings<DF, E>(
184 &self,
185 query: Query,
186 ) -> Result<Vec<StatementResult<DF>>, ClientError>
187 where
188 DF: TryFrom<(String, Vec<DateTime<Utc>>, HashMap<String, Vec<Value>>), Error = E>,
189 E: Into<ResponseError>,
190 {
191 self.fetch_readings_from_database(query, None::<String>).await
192 }
193
194 pub async fn fetch_readings_from_database<DF, E, T>(
195 &self,
196 query: Query,
197 database: Option<T>,
198 ) -> Result<Vec<StatementResult<DF>>, ClientError>
199 where
200 DF: TryFrom<(String, Vec<DateTime<Utc>>, HashMap<String, Vec<Value>>), Error = E>,
201 E: Into<ResponseError>,
202 T: Into<String>,
203 {
204 let mut influxql_request = self.client
205 .influxql(&self.base_url)?
206 .query(query);
207 if let Some(database) = database {
208 influxql_request = influxql_request.database(database);
209 }
210 let mut request = influxql_request.into_reqwest_builder();
211
212 if let Some((username, password)) = &self.credentials {
213 request = request.basic_auth(username, Some(password));
214 }
215
216 let request = request.build()?;
217
218 debug!("Sending request to {}", self.base_url);
219 trace!("Request: {:?}", request);
220
221 let response = self.client.execute(request).await?;
222
223 let response = response.error_for_status()?;
224
225 type TaggedDataFrames<DF> = Vec<(DF, Option<HashMap<String, String>>)>;
226 let results: Vec<Result<TaggedDataFrames<DF>, ResponseError>> = response.dataframes().await?;
227 debug!("Fetched {} statement results", results.len());
228
229 Ok(results)
230 }
231}
232
233/// A trait to obtain a prepared InfluxQL request builder from [Reqwest clients](reqwest::Client).
234///
235/// This trait is used to attach an `influxql()` function to [`reqwest::Client`](reqwest::Client).
236///
237/// ```no_run
238/// # use url::Url;
239/// # use rinfluxdb_influxql::Query;
240/// // Bring into scope the trait implementation
241/// use rinfluxdb_influxql::r#async::InfluxqlClientWrapper;
242///
243/// async_std::task::block_on(async {
244/// // Create Reqwest client
245/// let client = reqwest::Client::new();
246///
247/// // Create InfluxQL request
248/// let base_url = Url::parse("https://example.com")?;
249/// let mut builder = client
250/// // (this is a function added by the trait above)
251/// .influxql(&base_url)?
252/// // (this functions are defined on influxql::RequestBuilder)
253/// .database("house")
254/// .query(Query::new("SELECT temperature FROM indoor_temperature"))
255/// // (this function returns a regular Reqwest builder)
256/// .into_reqwest_builder();
257///
258/// // Now this is a regular Reqwest builder, and can be customized as usual
259/// if let Some((username, password)) = Some(("username", "password")) {
260/// builder = builder.basic_auth(username, Some(password));
261/// }
262///
263/// // Create a request from the builder
264/// let request = builder.build()?;
265///
266/// // Execute the request through Reqwest and obtain a response
267/// let response = client.execute(request).await?;
268///
269/// # Ok::<(), rinfluxdb_influxql::ClientError>(())
270/// # })?;
271/// # Ok::<(), rinfluxdb_influxql::ClientError>(())
272/// ```
273pub trait InfluxqlClientWrapper {
274 /// Create an InfluxQL request builder
275 ///
276 /// The request will point to the InfluxDB instance available at
277 /// `base_url`.
278 /// In particular, it will send a POST request to `base_url + "/query"`.
279 fn influxql(&self, base_url: &Url) -> Result<RequestBuilder, ClientError>;
280}
281
282impl InfluxqlClientWrapper for ReqwestClient {
283 fn influxql(&self, base_url: &Url) -> Result<RequestBuilder, ClientError> {
284 let url = base_url.join("/query")?;
285
286 let mut headers = HeaderMap::new();
287 headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
288
289 let builder = self
290 .post(url)
291 .headers(headers);
292
293 Ok(RequestBuilder::new(builder))
294 }
295}
296
297/// An extension of [`reqwest::RequestBuilder`](reqwest::RequestBuilder)
298/// to build requests to InfluxDB using InfluxQL
299///
300/// See traits [`InfluxqlClientWrapper`](InfluxqlClientWrapper) and
301/// [`InfluxqlResponseWrapper`](InfluxqlResponseWrapper) for an example.
302#[derive(Debug)]
303pub struct RequestBuilder {
304 builder: ReqwestRequestBuilder,
305 database: Option<String>,
306 query: Option<Query>,
307}
308
309impl RequestBuilder {
310 fn new(builder: ReqwestRequestBuilder) -> Self {
311 Self {
312 builder,
313 database: None,
314 query: None,
315 }
316 }
317
318 /// Set a database for the request
319 pub fn database<T>(mut self, database: T) -> Self
320 where
321 T: Into<String>,
322 {
323 self.database = Some(database.into());
324 self
325 }
326
327 /// Set the query for the request
328 pub fn query(mut self, query: Query) -> Self {
329 self.query = Some(query);
330 self
331 }
332
333 /// Convert to a [`reqwest::RequestBuilder`](reqwest::RequestBuilder)
334 /// prepared to build requests to InfluxDB using InfluxQL
335 pub fn into_reqwest_builder(self) -> ReqwestRequestBuilder {
336 let mut params = HashMap::new();
337 if let Some(query) = self.query.as_ref() {
338 params.insert("q", query.as_ref());
339 }
340 if let Some(database) = self.database.as_ref() {
341 params.insert("db", database.as_ref());
342 }
343
344 self.builder
345 .form(¶ms)
346 }
347}
348
349#[async_trait]
350impl InfluxqlResponseWrapper for ReqwestResponse {
351 async fn dataframes<DF, E>(self) -> Result<Vec<StatementResult<DF>>, ClientError>
352 where
353 DF: TryFrom<(String, Vec<DateTime<Utc>>, HashMap<String, Vec<Value>>), Error = E>,
354 E: Into<ResponseError>,
355 {
356 let text = self.text().await?;
357 let dataframes = from_str(&text)?;
358 Ok(dataframes)
359 }
360}
361
362/// A trait to parse a list of dataframes from [Reqwest responses](reqwest::Response).
363///
364/// This trait is used to attach a `dataframes()` function to [`reqwest::Response`](reqwest::Response).
365///
366/// ```no_run
367/// # use std::collections::HashMap;
368/// # use url::Url;
369/// # use rinfluxdb_influxql::{Query, ResponseError};
370/// use rinfluxdb_influxql::r#async::InfluxqlClientWrapper;
371/// use rinfluxdb_dataframe::DataFrame;
372///
373/// // Bring into scope the trait implementation
374/// use rinfluxdb_influxql::r#async::InfluxqlResponseWrapper;
375///
376/// async_std::task::block_on(async {
377/// // Create Reqwest client
378/// let client = reqwest::Client::new();
379///
380/// // Create InfluxQL request
381/// let base_url = Url::parse("https://example.com")?;
382/// let mut request = client
383/// .influxql(&base_url)?
384/// .database("house")
385/// .query(Query::new("SELECT temperature FROM indoor_temperature"))
386/// .into_reqwest_builder()
387/// .build()?;
388///
389/// // Execute the request through Reqwest and obtain a response
390/// let response = client.execute(request).await?;
391///
392/// // Return an error if response status is not 200
393/// // (this is a function from Reqwest's response)
394/// let response = response.error_for_status()?;
395///
396/// // Parse the response from JSON to a list of dataframes
397/// // (this is a function added by the trait above)
398/// let results: Vec<Result<Vec<(DataFrame, Option<HashMap<String, String>>)>, ResponseError>>
399/// = response.dataframes().await?;
400///
401/// # Ok::<(), rinfluxdb_influxql::ClientError>(())
402/// # })?;
403/// # Ok::<(), rinfluxdb_influxql::ClientError>(())
404/// ```
405#[async_trait]
406pub trait InfluxqlResponseWrapper {
407 /// Return the response body as a list of tagged dataframes
408 async fn dataframes<DF, E>(self) -> Result<Vec<StatementResult<DF>>, ClientError>
409 where
410 DF: TryFrom<(String, Vec<DateTime<Utc>>, HashMap<String, Vec<Value>>), Error = E>,
411 E: Into<ResponseError>;
412}