rinfluxdb_influxql/client/
async.rs

1// Copyright Claudio Mattera 2021.
2// Distributed under the MIT License or Apache 2.0 License at your option.
3// See accompanying files License-MIT.txt and License-Apache-2.0, or online at
4// https://opensource.org/licenses/MIT
5// https://opensource.org/licenses/Apache-2.0
6
7use std::collections::HashMap;
8use std::convert::TryFrom;
9
10use tracing::*;
11
12use reqwest::header::{HeaderMap, HeaderValue, ACCEPT};
13use reqwest::Client as ReqwestClient;
14use reqwest::ClientBuilder as ReqwestClientBuilder;
15use reqwest::RequestBuilder as ReqwestRequestBuilder;
16use reqwest::Response as ReqwestResponse;
17
18use url::Url;
19
20use chrono::{DateTime, Utc};
21
22use async_trait::async_trait;
23
24use rinfluxdb_types::Value;
25
26use super::ClientError;
27
28use super::super::query::Query;
29use super::super::response::{from_str, ResponseError};
30use super::super::StatementResult;
31
32/// A client for performing frequent InfluxQL queries in a convenient way
33///
34/// ```.no_run
35/// use std::collections::HashMap;
36/// use url::Url;
37/// use rinfluxdb_influxql::QueryBuilder;
38/// use rinfluxdb_influxql::r#async::Client;
39/// use rinfluxdb_dataframe::DataFrame;
40///
41/// async_std::task::block_on(async {
42/// let client = Client::new(
43///     Url::parse("https://example.com/")?,
44///     Some(("username", "password")),
45/// )?;
46///
47/// let query = QueryBuilder::from("indoor_environment")
48///     .database("house")
49///     .field("temperature")
50///     .field("humidity")
51///     .build();
52/// let dataframe: DataFrame = client.fetch_dataframe(query).await?;
53/// println!("{}", dataframe);
54///
55/// let query = QueryBuilder::from("indoor_environment")
56///     .database("house")
57///     .field("temperature")
58///     .field("humidity")
59///     .group_by("room")
60///     .build();
61/// let tagged_dataframes: HashMap<String, DataFrame> = client.fetch_dataframes_by_tag(query, "room").await?;
62/// for (tag, dataframe) in tagged_dataframes {
63///     println!("{}: {}", tag, dataframe);
64/// }
65/// # Ok::<(), rinfluxdb_influxql::ClientError>(())
66/// # })?;
67/// # Ok::<(), rinfluxdb_influxql::ClientError>(())
68/// ```
69#[derive(Debug)]
70pub struct Client {
71    client: ReqwestClient,
72    base_url: Url,
73    credentials: Option<(String, String)>,
74}
75
76impl Client {
77    /// Create a new client to an InfluxDB server
78    ///
79    /// Parameter `credentials` can be used to provide username and password if
80    /// the server requires authentication.
81    pub fn new<T, S>(
82        base_url: Url,
83        credentials: Option<(T, S)>,
84    ) -> Result<Self, ClientError>
85    where
86        T: Into<String>,
87        S: Into<String>,
88    {
89        let mut headers = HeaderMap::new();
90        headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
91
92        let client = ReqwestClientBuilder::new()
93            .default_headers(headers)
94            .build()?;
95
96        let credentials = credentials
97            .map(|(username, password)| (username.into(), password.into()));
98
99        Ok(Self {
100            client,
101            base_url,
102            credentials,
103        })
104    }
105
106    /// Query the server for a single dataframe
107    ///
108    /// This function assumes a single statement is returned, and that such
109    /// statement contains a single dataframe. Everything else is ignored.
110    ///
111    /// [`ClientError::EmptyError`](ClientError::EmptyError) is returned if the
112    /// response does not contain
113    /// dataframes.
114    #[instrument(
115        name = "Fetching dataframe",
116        skip(self),
117    )]
118    pub async fn fetch_dataframe<DF, E>(
119        &self,
120        query: Query,
121    ) -> Result<DF, ClientError>
122    where
123        DF: TryFrom<(String, Vec<DateTime<Utc>>, HashMap<String, Vec<Value>>), Error = E>,
124        E: Into<ResponseError>,
125    {
126        let statement_results = self.fetch_readings_from_database(query, None::<String>).await?;
127        let statement_result = statement_results
128            .into_iter()
129            .next()
130            .ok_or(ClientError::EmptyError)?;
131        let dataframes = statement_result?;
132        let (dataframe, _tags) = dataframes
133            .into_iter()
134            .next()
135            .ok_or(ClientError::EmptyError)?;
136        Ok(dataframe)
137    }
138
139    /// Query the server for dataframes grouped by a single tag
140    ///
141    /// This function assumes a single statement is returned, and that such
142    /// statement contains multiple dataframe with the specified tag.
143    /// Everything else is ignored.
144    ///
145    /// [`ClientError::EmptyError`](ClientError::EmptyError) is returned if the
146    /// response does not contain dataframes.
147    /// [`ClientError::ExpectedTagsError`](ClientError::ExpectedTagsError) is
148    /// returned if the response does not contain tagged dataframes.
149    /// [`ClientError::ExpectedTagError`](ClientError::ExpectedTagError) is
150    /// returned if the response contains tagged dataframes, but the specified
151    /// tag is missing.
152    #[instrument(
153        name = "Fetching dataframe by tag",
154        skip(self),
155    )]
156    pub async fn fetch_dataframes_by_tag<DF, E>(
157        &self,
158        query: Query,
159        tag: &str,
160    ) -> Result<HashMap<String, DF>, ClientError>
161    where
162        DF: TryFrom<(String, Vec<DateTime<Utc>>, HashMap<String, Vec<Value>>), Error = E>,
163        E: Into<ResponseError>,
164    {
165        let statement_results = self.fetch_readings_from_database(query, None::<String>).await?;
166        let statement_result = statement_results
167            .into_iter()
168            .next()
169            .ok_or(ClientError::EmptyError)?;
170        let dataframes = statement_result?;
171        dataframes
172            .into_iter()
173            .map(|(dataframe, tags)| {
174                let tags = tags.ok_or(ClientError::ExpectedTagsError)?;
175                let tag_value = tags
176                    .get(tag)
177                    .ok_or_else(|| ClientError::ExpectedTagError(tag.to_owned()))?;
178                Ok((tag_value.to_owned(), dataframe))
179            })
180            .collect()
181    }
182
183    pub async fn fetch_readings<DF, E>(
184        &self,
185        query: Query,
186    ) -> Result<Vec<StatementResult<DF>>, ClientError>
187    where
188        DF: TryFrom<(String, Vec<DateTime<Utc>>, HashMap<String, Vec<Value>>), Error = E>,
189        E: Into<ResponseError>,
190    {
191        self.fetch_readings_from_database(query, None::<String>).await
192    }
193
194    pub async fn fetch_readings_from_database<DF, E, T>(
195        &self,
196        query: Query,
197        database: Option<T>,
198    ) -> Result<Vec<StatementResult<DF>>, ClientError>
199    where
200        DF: TryFrom<(String, Vec<DateTime<Utc>>, HashMap<String, Vec<Value>>), Error = E>,
201        E: Into<ResponseError>,
202        T: Into<String>,
203    {
204        let mut influxql_request = self.client
205            .influxql(&self.base_url)?
206            .query(query);
207        if let Some(database) = database {
208            influxql_request = influxql_request.database(database);
209        }
210        let mut request = influxql_request.into_reqwest_builder();
211
212        if let Some((username, password)) = &self.credentials {
213            request = request.basic_auth(username, Some(password));
214        }
215
216        let request = request.build()?;
217
218        debug!("Sending request to {}", self.base_url);
219        trace!("Request: {:?}", request);
220
221        let response = self.client.execute(request).await?;
222
223        let response = response.error_for_status()?;
224
225        type TaggedDataFrames<DF> = Vec<(DF, Option<HashMap<String, String>>)>;
226        let results: Vec<Result<TaggedDataFrames<DF>, ResponseError>> = response.dataframes().await?;
227        debug!("Fetched {} statement results", results.len());
228
229        Ok(results)
230    }
231}
232
233/// A trait to obtain a prepared InfluxQL request builder from [Reqwest clients](reqwest::Client).
234///
235/// This trait is used to attach an `influxql()` function to [`reqwest::Client`](reqwest::Client).
236///
237/// ```no_run
238/// # use url::Url;
239/// # use rinfluxdb_influxql::Query;
240/// // Bring into scope the trait implementation
241/// use rinfluxdb_influxql::r#async::InfluxqlClientWrapper;
242///
243/// async_std::task::block_on(async {
244/// // Create Reqwest client
245/// let client = reqwest::Client::new();
246///
247/// // Create InfluxQL request
248/// let base_url = Url::parse("https://example.com")?;
249/// let mut builder = client
250///     // (this is a function added by the trait above)
251///     .influxql(&base_url)?
252///     // (this functions are defined on influxql::RequestBuilder)
253///     .database("house")
254///     .query(Query::new("SELECT temperature FROM indoor_temperature"))
255///     // (this function returns a regular Reqwest builder)
256///     .into_reqwest_builder();
257///
258/// // Now this is a regular Reqwest builder, and can be customized as usual
259/// if let Some((username, password)) = Some(("username", "password")) {
260///     builder = builder.basic_auth(username, Some(password));
261/// }
262///
263/// // Create a request from the builder
264/// let request = builder.build()?;
265///
266/// // Execute the request through Reqwest and obtain a response
267/// let response = client.execute(request).await?;
268///
269/// # Ok::<(), rinfluxdb_influxql::ClientError>(())
270/// # })?;
271/// # Ok::<(), rinfluxdb_influxql::ClientError>(())
272/// ```
273pub trait InfluxqlClientWrapper {
274    /// Create an InfluxQL request builder
275    ///
276    /// The request will point to the InfluxDB instance available at
277    /// `base_url`.
278    /// In particular, it will send a POST request to `base_url + "/query"`.
279    fn influxql(&self, base_url: &Url) -> Result<RequestBuilder, ClientError>;
280}
281
282impl InfluxqlClientWrapper for ReqwestClient {
283    fn influxql(&self, base_url: &Url) -> Result<RequestBuilder, ClientError> {
284        let url = base_url.join("/query")?;
285
286        let mut headers = HeaderMap::new();
287        headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
288
289        let builder = self
290            .post(url)
291            .headers(headers);
292
293        Ok(RequestBuilder::new(builder))
294    }
295}
296
297/// An extension of [`reqwest::RequestBuilder`](reqwest::RequestBuilder)
298/// to build requests to InfluxDB using InfluxQL
299///
300/// See traits [`InfluxqlClientWrapper`](InfluxqlClientWrapper) and
301/// [`InfluxqlResponseWrapper`](InfluxqlResponseWrapper) for an example.
302#[derive(Debug)]
303pub struct RequestBuilder {
304    builder: ReqwestRequestBuilder,
305    database: Option<String>,
306    query: Option<Query>,
307}
308
309impl RequestBuilder {
310    fn new(builder: ReqwestRequestBuilder) -> Self {
311        Self {
312            builder,
313            database: None,
314            query: None,
315        }
316    }
317
318    /// Set a database for the request
319    pub fn database<T>(mut self, database: T) -> Self
320    where
321        T: Into<String>,
322    {
323        self.database = Some(database.into());
324        self
325    }
326
327    /// Set the query for the request
328    pub fn query(mut self, query: Query) -> Self {
329        self.query = Some(query);
330        self
331    }
332
333    /// Convert to a [`reqwest::RequestBuilder`](reqwest::RequestBuilder)
334    /// prepared to build requests to InfluxDB using InfluxQL
335    pub fn into_reqwest_builder(self) -> ReqwestRequestBuilder {
336        let mut params = HashMap::new();
337        if let Some(query) = self.query.as_ref() {
338            params.insert("q", query.as_ref());
339        }
340        if let Some(database) = self.database.as_ref() {
341            params.insert("db", database.as_ref());
342        }
343
344        self.builder
345            .form(&params)
346    }
347}
348
349#[async_trait]
350impl InfluxqlResponseWrapper for ReqwestResponse {
351    async fn dataframes<DF, E>(self) -> Result<Vec<StatementResult<DF>>, ClientError>
352    where
353        DF: TryFrom<(String, Vec<DateTime<Utc>>, HashMap<String, Vec<Value>>), Error = E>,
354        E: Into<ResponseError>,
355    {
356        let text = self.text().await?;
357        let dataframes = from_str(&text)?;
358        Ok(dataframes)
359    }
360}
361
362/// A trait to parse a list of dataframes from [Reqwest responses](reqwest::Response).
363///
364/// This trait is used to attach a `dataframes()` function to [`reqwest::Response`](reqwest::Response).
365///
366/// ```no_run
367/// # use std::collections::HashMap;
368/// # use url::Url;
369/// # use rinfluxdb_influxql::{Query, ResponseError};
370/// use rinfluxdb_influxql::r#async::InfluxqlClientWrapper;
371/// use rinfluxdb_dataframe::DataFrame;
372///
373/// // Bring into scope the trait implementation
374/// use rinfluxdb_influxql::r#async::InfluxqlResponseWrapper;
375///
376/// async_std::task::block_on(async {
377/// // Create Reqwest client
378/// let client = reqwest::Client::new();
379///
380/// // Create InfluxQL request
381/// let base_url = Url::parse("https://example.com")?;
382/// let mut request = client
383///     .influxql(&base_url)?
384///     .database("house")
385///     .query(Query::new("SELECT temperature FROM indoor_temperature"))
386///     .into_reqwest_builder()
387///     .build()?;
388///
389/// // Execute the request through Reqwest and obtain a response
390/// let response = client.execute(request).await?;
391///
392/// // Return an error if response status is not 200
393/// // (this is a function from Reqwest's response)
394/// let response = response.error_for_status()?;
395///
396/// // Parse the response from JSON to a list of dataframes
397/// // (this is a function added by the trait above)
398/// let results: Vec<Result<Vec<(DataFrame, Option<HashMap<String, String>>)>, ResponseError>>
399///     = response.dataframes().await?;
400///
401/// # Ok::<(), rinfluxdb_influxql::ClientError>(())
402/// # })?;
403/// # Ok::<(), rinfluxdb_influxql::ClientError>(())
404/// ```
405#[async_trait]
406pub trait InfluxqlResponseWrapper {
407    /// Return the response body as a list of tagged dataframes
408    async fn dataframes<DF, E>(self) -> Result<Vec<StatementResult<DF>>, ClientError>
409    where
410        DF: TryFrom<(String, Vec<DateTime<Utc>>, HashMap<String, Vec<Value>>), Error = E>,
411        E: Into<ResponseError>;
412}