rinfluxdb_lineprotocol/client/async.rs
1// Copyright Claudio Mattera 2021.
2// Distributed under the MIT License or Apache 2.0 License at your option.
3// See accompanying files License-MIT.txt and License-Apache-2.0, or online at
4// https://opensource.org/licenses/MIT
5// https://opensource.org/licenses/Apache-2.0
6
7use tracing::*;
8
9use reqwest::Client as ReqwestClient;
10use reqwest::ClientBuilder as ReqwestClientBuilder;
11use reqwest::RequestBuilder as ReqwestRequestBuilder;
12use reqwest::Response as ReqwestResponse;
13
14use url::Url;
15
16use async_trait::async_trait;
17
18use super::super::Line;
19use super::{parse_error, ClientError};
20
/// A client for sending data to InfluxDB using the Influx Line Protocol in a
/// convenient way
///
/// The client wraps a Reqwest client together with the base URL of the
/// server and optional credentials, so that callers only need to provide the
/// database name and the lines to send.
///
/// ```no_run
/// use url::Url;
/// use rinfluxdb_lineprotocol::LineBuilder;
/// use rinfluxdb_lineprotocol::r#async::Client;
///
/// # async_std::task::block_on(async {
/// let client = Client::new(
///     Url::parse("https://example.com/")?,
///     Some(("username", "password")),
/// )?;
///
/// let lines = vec![
///     LineBuilder::new("measurement")
///         .insert_field("field", 42.0)
///         .build(),
///     LineBuilder::new("measurement")
///         .insert_field("field", 43.0)
///         .insert_tag("tag", "value")
///         .build(),
/// ];
///
/// client.send("database", &lines).await?;
/// # Ok::<(), rinfluxdb_lineprotocol::ClientError>(())
/// # })?;
/// # Ok::<(), rinfluxdb_lineprotocol::ClientError>(())
/// ```
#[derive(Debug)]
pub struct Client {
    /// Underlying Reqwest client used to issue the HTTP requests
    client: ReqwestClient,
    /// Base URL of the InfluxDB server
    base_url: Url,
    /// Optional `(username, password)` pair, sent as HTTP basic
    /// authentication when present
    credentials: Option<(String, String)>,
}
56
57impl Client {
58 /// Create a new client to an InfluxDB server
59 ///
60 /// Parameter `credentials` can be used to provide username and password if
61 /// the server requires authentication.
62 pub fn new<T, S>(
63 base_url: Url,
64 credentials: Option<(T, S)>,
65 ) -> Result<Self, ClientError>
66 where
67 T: Into<String>,
68 S: Into<String>,
69 {
70 let client = ReqwestClientBuilder::new()
71 .build()?;
72
73 let credentials = credentials
74 .map(|(username, password)| (username.into(), password.into()));
75
76 Ok(Self {
77 client,
78 base_url,
79 credentials,
80 })
81 }
82
83 /// Sends data using the Influx Line Protocol
84 #[instrument(
85 name = "Sending data using the Influx Line Protocol",
86 skip(self, database, lines),
87 )]
88 pub async fn send(&self, database: &str, lines: &[Line]) -> Result<(), ClientError> {
89 let mut request = self.client
90 .line_protocol(&self.base_url, database, lines)?;
91
92 if let Some((username, password)) = &self.credentials {
93 request = request.basic_auth(username, Some(password));
94 }
95
96 debug!("Sending {} lines to {}", lines.len(), self.base_url);
97 trace!("Request: {:?}", request);
98
99 let response = request.send().await?;
100
101 response.process_line_protocol_response().await?;
102
103 Ok(())
104 }
105}
106
/// A trait to obtain a prepared Influx Line Protocol request builder from [Reqwest clients](reqwest::Client).
///
/// This trait is used to attach a `line_protocol()` function to [`reqwest::Client`](reqwest::Client).
///
/// ```no_run
/// # use url::Url;
/// # use rinfluxdb_lineprotocol::LineBuilder;
/// // Bring into scope the trait implementation
/// use rinfluxdb_lineprotocol::r#async::InfluxLineClientWrapper;
///
/// # async_std::task::block_on(async {
/// // Create Reqwest client
/// let client = reqwest::Client::new();
///
/// // Set database name
/// let database = "database";
///
/// // Create data
/// let lines = vec![
///     LineBuilder::new("measurement")
///         .insert_field("field", 42.0)
///         .build(),
///     LineBuilder::new("measurement")
///         .insert_field("field", 43.0)
///         .insert_tag("tag", "value")
///         .build(),
/// ];
///
/// // Create Influx Line Protocol request
/// let base_url = Url::parse("https://example.com")?;
/// let mut builder = client
///     // (this is a function added by the trait above)
///     .line_protocol(&base_url, &database, &lines)?;
///
/// // This is a regular Reqwest builder, and can be customized as usual
/// if let Some((username, password)) = Some(("username", "password")) {
///     builder = builder.basic_auth(username, Some(password));
/// }
///
/// // Create a request from the builder
/// let request = builder.build()?;
///
/// // Execute the request through Reqwest and obtain a response
/// let response = client.execute(request).await?;
///
/// # Ok::<(), rinfluxdb_lineprotocol::ClientError>(())
/// # })?;
/// # Ok::<(), rinfluxdb_lineprotocol::ClientError>(())
/// ```
pub trait InfluxLineClientWrapper {
    /// Create an Influx Line Protocol request builder
    ///
    /// The request will point to the InfluxDB instance available at
    /// `base_url`.
    /// In particular, the implementation for `reqwest::Client` in this module
    /// prepares a POST request to the `/write` endpoint of `base_url`, with
    /// `database` passed as the `db` query parameter and `lines` as the
    /// request body.
    fn line_protocol(
        &self,
        base_url: &Url,
        database: &str,
        lines: &[Line],
    ) -> Result<Self::RequestBuilderType, ClientError>;

    /// The type of the resulting request builder
    ///
    /// This is an associated type so that the trait can be implemented for
    /// different client types, each returning its own request builder.
    /// The implementation in this module is for `reqwest::Client` and returns
    /// `reqwest::RequestBuilder`.
    type RequestBuilderType;
}
176
177impl InfluxLineClientWrapper for ReqwestClient {
178 type RequestBuilderType = ReqwestRequestBuilder;
179
180 fn line_protocol(
181 &self,
182 base_url: &Url,
183 database: &str,
184 lines: &[Line],
185 ) -> Result<ReqwestRequestBuilder, ClientError> {
186 let mut url = base_url.join("/write")?;
187 let query = "db=".to_string() + database;
188 url.set_query(Some(&query));
189
190 let strings: Vec<String> = lines.iter().map(|line| line.to_string()).collect();
191 let payload: String = strings.join("\n");
192
193 let builder = self
194 .post(url)
195 .body(payload);
196
197 Ok(builder)
198 }
199}
200
/// A trait to process Influx Line Protocol responses obtained from [Reqwest responses](reqwest::Response).
///
/// This trait is used to attach a `process_line_protocol_response()` function to [`reqwest::Response`](reqwest::Response).
///
/// ```no_run
/// # use url::Url;
/// # use rinfluxdb_lineprotocol::LineBuilder;
/// use rinfluxdb_lineprotocol::r#async::InfluxLineClientWrapper;
///
/// // Bring into scope the trait implementation
/// use rinfluxdb_lineprotocol::r#async::InfluxLineResponseWrapper;
///
/// # async_std::task::block_on(async {
/// // Create Reqwest client
/// let client = reqwest::Client::new();
///
/// // Set database name
/// let database = "database";
///
/// // Create data
/// let lines = vec![
///     LineBuilder::new("measurement")
///         .insert_field("field", 42.0)
///         .build(),
///     LineBuilder::new("measurement")
///         .insert_field("field", 43.0)
///         .insert_tag("tag", "value")
///         .build(),
/// ];
///
/// // Create Influx Line Protocol request
/// let base_url = Url::parse("https://example.com")?;
/// let mut builder = client
///     // (this is a function added by the trait above)
///     .line_protocol(&base_url, &database, &lines)?;
///
/// // This is a regular Reqwest builder, and can be customized as usual
/// if let Some((username, password)) = Some(("username", "password")) {
///     builder = builder.basic_auth(username, Some(password));
/// }
///
/// // Create a request from the builder
/// let request = builder.build()?;
///
/// // Execute the request through Reqwest and obtain a response
/// let response = client.execute(request).await?;
///
/// // Process the response.
/// response.process_line_protocol_response().await?;
///
/// # Ok::<(), rinfluxdb_lineprotocol::ClientError>(())
/// # })?;
/// # Ok::<(), rinfluxdb_lineprotocol::ClientError>(())
/// ```
#[async_trait]
pub trait InfluxLineResponseWrapper {
    /// Process the response, parsing potential errors
    ///
    /// The response is consumed. `Ok(())` means no error was detected;
    /// otherwise the failure is reported as a [`ClientError`].
    async fn process_line_protocol_response(self) -> Result<(), ClientError>;
}
260
261#[async_trait]
262impl InfluxLineResponseWrapper for ReqwestResponse {
263 async fn process_line_protocol_response(self) -> Result<(), ClientError> {
264 match self.error_for_status_ref() {
265 Ok(_) => Ok(()),
266 Err(_) => {
267 let text = self.text().await?;
268 debug!("Response: \"{}\"", text);
269 let error = parse_error(&text);
270 Err(error)
271 }
272 }
273 }
274}