rinfluxdb_lineprotocol/client/blocking.rs
1// Copyright Claudio Mattera 2021.
2// Distributed under the MIT License or Apache 2.0 License at your option.
3// See accompanying files License-MIT.txt and License-Apache-2.0, or online at
4// https://opensource.org/licenses/MIT
5// https://opensource.org/licenses/Apache-2.0
6
7use tracing::*;
8
9use reqwest::blocking::Client as ReqwestClient;
10use reqwest::blocking::ClientBuilder as ReqwestClientBuilder;
11use reqwest::blocking::RequestBuilder as ReqwestRequestBuilder;
12use reqwest::blocking::Response as ReqwestResponse;
13
14use url::Url;
15
16use super::super::Line;
17use super::{parse_error, ClientError};
18
19/// A client for sending data with Influx Line Protocol queries in a convenient
20/// way
21///
22/// ```.no_run
23/// use url::Url;
24/// use rinfluxdb_lineprotocol::LineBuilder;
25/// use rinfluxdb_lineprotocol::blocking::Client;
26///
27/// let client = Client::new(
28/// Url::parse("https://example.com/")?,
29/// Some(("username", "password")),
30/// )?;
31///
32/// let lines = vec![
33/// LineBuilder::new("measurement")
34/// .insert_field("field", 42.0)
35/// .build(),
36/// LineBuilder::new("measurement")
37/// .insert_field("field", 43.0)
38/// .insert_tag("tag", "value")
39/// .build(),
40/// ];
41///
42/// client.send("database", &lines)?;
43/// # Ok::<(), rinfluxdb_lineprotocol::ClientError>(())
44/// ```
45#[derive(Debug)]
46pub struct Client {
47 client: ReqwestClient,
48 base_url: Url,
49 credentials: Option<(String, String)>,
50}
51
52impl Client {
53 /// Create a new client to an InfluxDB server
54 ///
55 /// Parameter `credentials` can be used to provide username and password if
56 /// the server requires authentication.
57 pub fn new<T, S>(
58 base_url: Url,
59 credentials: Option<(T, S)>,
60 ) -> Result<Self, ClientError>
61 where
62 T: Into<String>,
63 S: Into<String>,
64 {
65 let client = ReqwestClientBuilder::new()
66 .build()?;
67
68 let credentials = credentials
69 .map(|(username, password)| (username.into(), password.into()));
70
71 Ok(Self {
72 client,
73 base_url,
74 credentials,
75 })
76 }
77
78 /// Sends data using the Influx Line Protocol
79 #[instrument(
80 name = "Sending data using the Influx Line Protocol",
81 skip(self, database, lines),
82 )]
83 pub fn send(&self, database: &str, lines: &[Line]) -> Result<(), ClientError> {
84 let mut request = self.client
85 .line_protocol(&self.base_url, database, lines)?;
86
87 if let Some((username, password)) = &self.credentials {
88 request = request.basic_auth(username, Some(password));
89 }
90
91 debug!("Sending {} lines to {}", lines.len(), self.base_url);
92 trace!("Request: {:?}", request);
93
94 let response = request.send()?;
95
96 response.process_line_protocol_response()?;
97
98 Ok(())
99 }
100}
101
102/// A trait to obtain a prepared Influx Line Protocol request builder from [Reqwest clients](reqwest::blocking::Client).
103///
104/// This trait is used to attach a `line_protocol()` function to [`reqwest::blocking::Client`](reqwest::blocking::Client).
105///
106/// ```no_run
107/// # use url::Url;
108/// # use rinfluxdb_lineprotocol::LineBuilder;
109/// // Bring into scope the trait implementation
110/// use rinfluxdb_lineprotocol::blocking::InfluxLineClientWrapper;
111///
112/// // Create Reqwest client
113/// let client = reqwest::blocking::Client::new();
114///
115/// // Set database name
116/// let database = "database";
117///
118/// // Create data
119/// let lines = vec![
120/// LineBuilder::new("measurement")
121/// .insert_field("field", 42.0)
122/// .build(),
123/// LineBuilder::new("measurement")
124/// .insert_field("field", 43.0)
125/// .insert_tag("tag", "value")
126/// .build(),
127/// ];
128///
129/// // Create Influx Line Protocol request
130/// let base_url = Url::parse("https://example.com")?;
131/// let mut builder = client
132/// // (this is a function added by the trait above)
133/// .line_protocol(&base_url, &database, &lines)?;
134///
135/// // This is a regular Reqwest builder, and can be customized as usual
136/// if let Some((username, password)) = Some(("username", "password")) {
137/// builder = builder.basic_auth(username, Some(password));
138/// }
139///
140/// // Create a request from the builder
141/// let request = builder.build()?;
142///
143/// // Execute the request through Reqwest and obtain a response
144/// let response = client.execute(request)?;
145///
146/// # Ok::<(), rinfluxdb_lineprotocol::ClientError>(())
147/// ```
148pub trait InfluxLineClientWrapper {
149 /// Create an Influx Line Protocol request builder
150 ///
151 /// The request will point to the InfluxDB instance available at
152 /// `base_url`.
153 /// In particular, it will send a POST request to `base_url + "/query"`.
154 fn line_protocol(
155 &self,
156 base_url: &Url,
157 database: &str,
158 lines: &[Line],
159 ) -> Result<Self::RequestBuilderType, ClientError>;
160
161 /// The type of the resulting request builder
162 ///
163 /// This type is a parameter so the trait can be implemented for
164 /// `reqwest::Client` returning `reqwest::RequestBuilder`, and for
165 /// `reqwest::blocking::Client` returning `reqwest::blocking::RequestBuilder`.
166 type RequestBuilderType;
167}
168
169impl InfluxLineClientWrapper for ReqwestClient {
170 type RequestBuilderType = ReqwestRequestBuilder;
171
172 fn line_protocol(
173 &self,
174 base_url: &Url,
175 database: &str,
176 lines: &[Line],
177 ) -> Result<ReqwestRequestBuilder, ClientError> {
178 let mut url = base_url.join("/write")?;
179 let query = "db=".to_string() + database;
180 url.set_query(Some(&query));
181
182 let strings: Vec<String> = lines.iter().map(|line| line.to_string()).collect();
183 let payload: String = strings.join("\n");
184
185 let builder = self
186 .post(url)
187 .body(payload);
188
189 Ok(builder)
190 }
191}
192
193/// A trait to parse a list of dataframes from [Reqwest responses](reqwest::blocking::Response).
194///
195/// This trait is used to attach a `dataframes()` function to [`reqwest::blocking::Response`](reqwest::blocking::Response).
196///
197/// ```no_run
198/// # use url::Url;
199/// # use rinfluxdb_lineprotocol::LineBuilder;
200/// use rinfluxdb_lineprotocol::blocking::InfluxLineClientWrapper;
201///
202/// // Bring into scope the trait implementation
203/// use rinfluxdb_lineprotocol::blocking::InfluxLineResponseWrapper;
204///
205/// // Create Reqwest client
206/// let client = reqwest::blocking::Client::new();
207///
208/// // Set database name
209/// let database = "database";
210///
211/// // Create data
212/// let lines = vec![
213/// LineBuilder::new("measurement")
214/// .insert_field("field", 42.0)
215/// .build(),
216/// LineBuilder::new("measurement")
217/// .insert_field("field", 43.0)
218/// .insert_tag("tag", "value")
219/// .build(),
220/// ];
221///
222/// // Create Influx Line Protocol request
223/// let base_url = Url::parse("https://example.com")?;
224/// let mut builder = client
225/// // (this is a function added by the trait above)
226/// .line_protocol(&base_url, &database, &lines)?;
227///
228/// // This is a regular Reqwest builder, and can be customized as usual
229/// if let Some((username, password)) = Some(("username", "password")) {
230/// builder = builder.basic_auth(username, Some(password));
231/// }
232///
233/// // Create a request from the builder
234/// let request = builder.build()?;
235///
236/// // Execute the request through Reqwest and obtain a response
237/// let response = client.execute(request)?;
238///
239/// // Process the response.
240/// response.process_line_protocol_response()?;
241/// # Ok::<(), rinfluxdb_lineprotocol::ClientError>(())
242/// ```
243pub trait InfluxLineResponseWrapper {
244 /// Process the response, parsing potential errors
245 fn process_line_protocol_response(self) -> Result<(), ClientError>;
246}
247
248impl InfluxLineResponseWrapper for ReqwestResponse {
249 fn process_line_protocol_response(self) -> Result<(), ClientError> {
250 match self.error_for_status_ref() {
251 Ok(_) => Ok(()),
252 Err(_) => {
253 let text = self.text()?;
254 debug!("Response: \"{}\"", text);
255 let error = parse_error(&text);
256 Err(error)
257 }
258 }
259 }
260}