rinfluxdb_lineprotocol/client/
async.rs

1// Copyright Claudio Mattera 2021.
2// Distributed under the MIT License or Apache 2.0 License at your option.
3// See accompanying files License-MIT.txt and License-Apache-2.0, or online at
4// https://opensource.org/licenses/MIT
5// https://opensource.org/licenses/Apache-2.0
6
7use tracing::*;
8
9use reqwest::Client as ReqwestClient;
10use reqwest::ClientBuilder as ReqwestClientBuilder;
11use reqwest::RequestBuilder as ReqwestRequestBuilder;
12use reqwest::Response as ReqwestResponse;
13
14use url::Url;
15
16use async_trait::async_trait;
17
18use super::super::Line;
19use super::{parse_error, ClientError};
20
21/// A client for sending data with Influx Line Protocol queries in a convenient
22/// way
23///
24/// ```.no_run
25/// use url::Url;
26/// use rinfluxdb_lineprotocol::LineBuilder;
27/// use rinfluxdb_lineprotocol::r#async::Client;
28///
29/// # async_std::task::block_on(async {
30/// let client = Client::new(
31///     Url::parse("https://example.com/")?,
32///     Some(("username", "password")),
33/// )?;
34///
35/// let lines = vec![
36///     LineBuilder::new("measurement")
37///         .insert_field("field", 42.0)
38///         .build(),
39///     LineBuilder::new("measurement")
40///         .insert_field("field", 43.0)
41///         .insert_tag("tag", "value")
42///         .build(),
43/// ];
44///
45/// client.send("database", &lines).await?;
46/// # Ok::<(), rinfluxdb_lineprotocol::ClientError>(())
47/// # })?;
48/// # Ok::<(), rinfluxdb_lineprotocol::ClientError>(())
49/// ```
50#[derive(Debug)]
51pub struct Client {
52    client: ReqwestClient,
53    base_url: Url,
54    credentials: Option<(String, String)>,
55}
56
57impl Client {
58    /// Create a new client to an InfluxDB server
59    ///
60    /// Parameter `credentials` can be used to provide username and password if
61    /// the server requires authentication.
62    pub fn new<T, S>(
63        base_url: Url,
64        credentials: Option<(T, S)>,
65    ) -> Result<Self, ClientError>
66    where
67        T: Into<String>,
68        S: Into<String>,
69    {
70        let client = ReqwestClientBuilder::new()
71            .build()?;
72
73        let credentials = credentials
74            .map(|(username, password)| (username.into(), password.into()));
75
76        Ok(Self {
77            client,
78            base_url,
79            credentials,
80        })
81    }
82
83    /// Sends data using the Influx Line Protocol
84    #[instrument(
85        name = "Sending data using the Influx Line Protocol",
86        skip(self, database, lines),
87    )]
88    pub async fn send(&self, database: &str, lines: &[Line]) -> Result<(), ClientError> {
89        let mut request = self.client
90                .line_protocol(&self.base_url, database, lines)?;
91
92        if let Some((username, password)) = &self.credentials {
93            request = request.basic_auth(username, Some(password));
94        }
95
96        debug!("Sending {} lines to {}", lines.len(), self.base_url);
97        trace!("Request: {:?}", request);
98
99        let response = request.send().await?;
100
101        response.process_line_protocol_response().await?;
102
103        Ok(())
104    }
105}
106
107/// A trait to obtain a prepared Influx Line Protocol request builder from [Reqwest clients](reqwest::Client).
108///
109/// This trait is used to attach a `line_protocol()` function to [`reqwest::Client`](reqwest::Client).
110///
111/// ```no_run
112/// # use url::Url;
113/// # use rinfluxdb_lineprotocol::LineBuilder;
114/// // Bring into scope the trait implementation
115/// use rinfluxdb_lineprotocol::r#async::InfluxLineClientWrapper;
116///
117/// # async_std::task::block_on(async {
118/// // Create Reqwest client
119/// let client = reqwest::Client::new();
120///
121/// // Set database name
122/// let database = "database";
123///
124/// // Create data
125/// let lines = vec![
126///     LineBuilder::new("measurement")
127///         .insert_field("field", 42.0)
128///         .build(),
129///     LineBuilder::new("measurement")
130///         .insert_field("field", 43.0)
131///         .insert_tag("tag", "value")
132///         .build(),
133/// ];
134///
135/// // Create Influx Line Protocol request
136/// let base_url = Url::parse("https://example.com")?;
137/// let mut builder = client
138///     // (this is a function added by the trait above)
139///     .line_protocol(&base_url, &database, &lines)?;
140///
141/// // This is a regular Reqwest builder, and can be customized as usual
142/// if let Some((username, password)) = Some(("username", "password")) {
143///     builder = builder.basic_auth(username, Some(password));
144/// }
145///
146/// // Create a request from the builder
147/// let request = builder.build()?;
148///
149/// // Execute the request through Reqwest and obtain a response
150/// let response = client.execute(request).await?;
151///
152/// # Ok::<(), rinfluxdb_lineprotocol::ClientError>(())
153/// # })?;
154/// # Ok::<(), rinfluxdb_lineprotocol::ClientError>(())
155/// ```
156pub trait InfluxLineClientWrapper {
157    /// Create an Influx Line Protocol request builder
158    ///
159    /// The request will point to the InfluxDB instance available at
160    /// `base_url`.
161    /// In particular, it will send a POST request to `base_url + "/query"`.
162    fn line_protocol(
163        &self,
164        base_url: &Url,
165        database: &str,
166        lines: &[Line],
167    ) -> Result<Self::RequestBuilderType, ClientError>;
168
169    /// The type of the resulting request builder
170    ///
171    /// This type is a parameter so the trait can be implemented for
172    /// `reqwest::Client` returning `reqwest::RequestBuilder`, and for
173    /// `reqwest::Client` returning `reqwest::RequestBuilder`.
174    type RequestBuilderType;
175}
176
177impl InfluxLineClientWrapper for ReqwestClient {
178    type RequestBuilderType = ReqwestRequestBuilder;
179
180    fn line_protocol(
181        &self,
182        base_url: &Url,
183        database: &str,
184        lines: &[Line],
185    ) -> Result<ReqwestRequestBuilder, ClientError> {
186        let mut url = base_url.join("/write")?;
187        let query = "db=".to_string() + database;
188        url.set_query(Some(&query));
189
190        let strings: Vec<String> = lines.iter().map(|line| line.to_string()).collect();
191        let payload: String = strings.join("\n");
192
193        let builder = self
194            .post(url)
195            .body(payload);
196
197        Ok(builder)
198    }
199}
200
201/// A trait to parse a list of dataframes from [Reqwest responses](reqwest::Response).
202///
203/// This trait is used to attach a `dataframes()` function to [`reqwest::Response`](reqwest::Response).
204///
205/// ```no_run
206/// # use url::Url;
207/// # use rinfluxdb_lineprotocol::LineBuilder;
208/// use rinfluxdb_lineprotocol::r#async::InfluxLineClientWrapper;
209///
210/// // Bring into scope the trait implementation
211/// use rinfluxdb_lineprotocol::r#async::InfluxLineResponseWrapper;
212///
213/// # async_std::task::block_on(async {
214/// // Create Reqwest client
215/// let client = reqwest::Client::new();
216///
217/// // Set database name
218/// let database = "database";
219///
220/// // Create data
221/// let lines = vec![
222///     LineBuilder::new("measurement")
223///         .insert_field("field", 42.0)
224///         .build(),
225///     LineBuilder::new("measurement")
226///         .insert_field("field", 43.0)
227///         .insert_tag("tag", "value")
228///         .build(),
229/// ];
230///
231/// // Create Influx Line Protocol request
232/// let base_url = Url::parse("https://example.com")?;
233/// let mut builder = client
234///     // (this is a function added by the trait above)
235///     .line_protocol(&base_url, &database, &lines)?;
236///
237/// // This is a regular Reqwest builder, and can be customized as usual
238/// if let Some((username, password)) = Some(("username", "password")) {
239///     builder = builder.basic_auth(username, Some(password));
240/// }
241///
242/// // Create a request from the builder
243/// let request = builder.build()?;
244///
245/// // Execute the request through Reqwest and obtain a response
246/// let response = client.execute(request).await?;
247///
248/// // Process the response.
249/// response.process_line_protocol_response().await?;
250///
251/// # Ok::<(), rinfluxdb_lineprotocol::ClientError>(())
252/// # })?;
253/// # Ok::<(), rinfluxdb_lineprotocol::ClientError>(())
254/// ```
255#[async_trait]
256pub trait InfluxLineResponseWrapper {
257    /// Process the response, parsing potential errors
258    async fn process_line_protocol_response(self) -> Result<(), ClientError>;
259}
260
261#[async_trait]
262impl InfluxLineResponseWrapper for ReqwestResponse {
263    async fn process_line_protocol_response(self) -> Result<(), ClientError> {
264        match self.error_for_status_ref() {
265            Ok(_) => Ok(()),
266            Err(_) => {
267                let text = self.text().await?;
268                debug!("Response: \"{}\"", text);
269                let error = parse_error(&text);
270                Err(error)
271            }
272        }
273    }
274}