rinfluxdb_lineprotocol/client/
blocking.rs

1// Copyright Claudio Mattera 2021.
2// Distributed under the MIT License or Apache 2.0 License at your option.
3// See accompanying files License-MIT.txt and License-Apache-2.0, or online at
4// https://opensource.org/licenses/MIT
5// https://opensource.org/licenses/Apache-2.0
6
7use tracing::*;
8
9use reqwest::blocking::Client as ReqwestClient;
10use reqwest::blocking::ClientBuilder as ReqwestClientBuilder;
11use reqwest::blocking::RequestBuilder as ReqwestRequestBuilder;
12use reqwest::blocking::Response as ReqwestResponse;
13
14use url::Url;
15
16use super::super::Line;
17use super::{parse_error, ClientError};
18
19/// A client for sending data with Influx Line Protocol queries in a convenient
20/// way
21///
22/// ```.no_run
23/// use url::Url;
24/// use rinfluxdb_lineprotocol::LineBuilder;
25/// use rinfluxdb_lineprotocol::blocking::Client;
26///
27/// let client = Client::new(
28///     Url::parse("https://example.com/")?,
29///     Some(("username", "password")),
30/// )?;
31///
32/// let lines = vec![
33///     LineBuilder::new("measurement")
34///         .insert_field("field", 42.0)
35///         .build(),
36///     LineBuilder::new("measurement")
37///         .insert_field("field", 43.0)
38///         .insert_tag("tag", "value")
39///         .build(),
40/// ];
41///
42/// client.send("database", &lines)?;
43/// # Ok::<(), rinfluxdb_lineprotocol::ClientError>(())
44/// ```
45#[derive(Debug)]
46pub struct Client {
47    client: ReqwestClient,
48    base_url: Url,
49    credentials: Option<(String, String)>,
50}
51
52impl Client {
53    /// Create a new client to an InfluxDB server
54    ///
55    /// Parameter `credentials` can be used to provide username and password if
56    /// the server requires authentication.
57    pub fn new<T, S>(
58        base_url: Url,
59        credentials: Option<(T, S)>,
60    ) -> Result<Self, ClientError>
61    where
62        T: Into<String>,
63        S: Into<String>,
64    {
65        let client = ReqwestClientBuilder::new()
66            .build()?;
67
68        let credentials = credentials
69            .map(|(username, password)| (username.into(), password.into()));
70
71        Ok(Self {
72            client,
73            base_url,
74            credentials,
75        })
76    }
77
78    /// Sends data using the Influx Line Protocol
79    #[instrument(
80        name = "Sending data using the Influx Line Protocol",
81        skip(self, database, lines),
82    )]
83    pub fn send(&self, database: &str, lines: &[Line]) -> Result<(), ClientError> {
84        let mut request = self.client
85                .line_protocol(&self.base_url, database, lines)?;
86
87        if let Some((username, password)) = &self.credentials {
88            request = request.basic_auth(username, Some(password));
89        }
90
91        debug!("Sending {} lines to {}", lines.len(), self.base_url);
92        trace!("Request: {:?}", request);
93
94        let response = request.send()?;
95
96        response.process_line_protocol_response()?;
97
98        Ok(())
99    }
100}
101
102/// A trait to obtain a prepared Influx Line Protocol request builder from [Reqwest clients](reqwest::blocking::Client).
103///
104/// This trait is used to attach a `line_protocol()` function to [`reqwest::blocking::Client`](reqwest::blocking::Client).
105///
106/// ```no_run
107/// # use url::Url;
108/// # use rinfluxdb_lineprotocol::LineBuilder;
109/// // Bring into scope the trait implementation
110/// use rinfluxdb_lineprotocol::blocking::InfluxLineClientWrapper;
111///
112/// // Create Reqwest client
113/// let client = reqwest::blocking::Client::new();
114///
115/// // Set database name
116/// let database = "database";
117///
118/// // Create data
119/// let lines = vec![
120///     LineBuilder::new("measurement")
121///         .insert_field("field", 42.0)
122///         .build(),
123///     LineBuilder::new("measurement")
124///         .insert_field("field", 43.0)
125///         .insert_tag("tag", "value")
126///         .build(),
127/// ];
128///
129/// // Create Influx Line Protocol request
130/// let base_url = Url::parse("https://example.com")?;
131/// let mut builder = client
132///     // (this is a function added by the trait above)
133///     .line_protocol(&base_url, &database, &lines)?;
134///
135/// // This is a regular Reqwest builder, and can be customized as usual
136/// if let Some((username, password)) = Some(("username", "password")) {
137///     builder = builder.basic_auth(username, Some(password));
138/// }
139///
140/// // Create a request from the builder
141/// let request = builder.build()?;
142///
143/// // Execute the request through Reqwest and obtain a response
144/// let response = client.execute(request)?;
145///
146/// # Ok::<(), rinfluxdb_lineprotocol::ClientError>(())
147/// ```
148pub trait InfluxLineClientWrapper {
149    /// Create an Influx Line Protocol request builder
150    ///
151    /// The request will point to the InfluxDB instance available at
152    /// `base_url`.
153    /// In particular, it will send a POST request to `base_url + "/query"`.
154    fn line_protocol(
155        &self,
156        base_url: &Url,
157        database: &str,
158        lines: &[Line],
159    ) -> Result<Self::RequestBuilderType, ClientError>;
160
161    /// The type of the resulting request builder
162    ///
163    /// This type is a parameter so the trait can be implemented for
164    /// `reqwest::Client` returning `reqwest::RequestBuilder`, and for
165    /// `reqwest::blocking::Client` returning `reqwest::blocking::RequestBuilder`.
166    type RequestBuilderType;
167}
168
169impl InfluxLineClientWrapper for ReqwestClient {
170    type RequestBuilderType = ReqwestRequestBuilder;
171
172    fn line_protocol(
173        &self,
174        base_url: &Url,
175        database: &str,
176        lines: &[Line],
177    ) -> Result<ReqwestRequestBuilder, ClientError> {
178        let mut url = base_url.join("/write")?;
179        let query = "db=".to_string() + database;
180        url.set_query(Some(&query));
181
182        let strings: Vec<String> = lines.iter().map(|line| line.to_string()).collect();
183        let payload: String = strings.join("\n");
184
185        let builder = self
186            .post(url)
187            .body(payload);
188
189        Ok(builder)
190    }
191}
192
193/// A trait to parse a list of dataframes from [Reqwest responses](reqwest::blocking::Response).
194///
195/// This trait is used to attach a `dataframes()` function to [`reqwest::blocking::Response`](reqwest::blocking::Response).
196///
197/// ```no_run
198/// # use url::Url;
199/// # use rinfluxdb_lineprotocol::LineBuilder;
200/// use rinfluxdb_lineprotocol::blocking::InfluxLineClientWrapper;
201///
202/// // Bring into scope the trait implementation
203/// use rinfluxdb_lineprotocol::blocking::InfluxLineResponseWrapper;
204///
205/// // Create Reqwest client
206/// let client = reqwest::blocking::Client::new();
207///
208/// // Set database name
209/// let database = "database";
210///
211/// // Create data
212/// let lines = vec![
213///     LineBuilder::new("measurement")
214///         .insert_field("field", 42.0)
215///         .build(),
216///     LineBuilder::new("measurement")
217///         .insert_field("field", 43.0)
218///         .insert_tag("tag", "value")
219///         .build(),
220/// ];
221///
222/// // Create Influx Line Protocol request
223/// let base_url = Url::parse("https://example.com")?;
224/// let mut builder = client
225///     // (this is a function added by the trait above)
226///     .line_protocol(&base_url, &database, &lines)?;
227///
228/// // This is a regular Reqwest builder, and can be customized as usual
229/// if let Some((username, password)) = Some(("username", "password")) {
230///     builder = builder.basic_auth(username, Some(password));
231/// }
232///
233/// // Create a request from the builder
234/// let request = builder.build()?;
235///
236/// // Execute the request through Reqwest and obtain a response
237/// let response = client.execute(request)?;
238///
239/// // Process the response.
240/// response.process_line_protocol_response()?;
241/// # Ok::<(), rinfluxdb_lineprotocol::ClientError>(())
242/// ```
243pub trait InfluxLineResponseWrapper {
244    /// Process the response, parsing potential errors
245    fn process_line_protocol_response(self) -> Result<(), ClientError>;
246}
247
248impl InfluxLineResponseWrapper for ReqwestResponse {
249    fn process_line_protocol_response(self) -> Result<(), ClientError> {
250        match self.error_for_status_ref() {
251            Ok(_) => Ok(()),
252            Err(_) => {
253                let text = self.text()?;
254                debug!("Response: \"{}\"", text);
255                let error = parse_error(&text);
256                Err(error)
257            }
258        }
259    }
260}