influxdb_client/
client.rs1use reqwest::{Client as HttpClient, Method, Url};
2
3use std::error::Error;
4
5use crate::{
6 models::{InfluxError, Precision, TimestampOptions},
7 traits::PointSerialize,
8};
9
10pub struct Client {
12 host: Url,
13 token: String,
14 client: HttpClient,
15 bucket: Option<String>,
16 org: Option<String>,
17 org_id: Option<String>,
18 precision: Precision,
19
20 insert_to_stdout: bool,
21}
22
23impl Client {
24 pub fn new<T>(host: T, token: T) -> Client
25 where
26 T: Into<String>,
27 {
28 let host_url = reqwest::Url::parse(&host.into()[..]).unwrap();
29
30 Client {
31 host: host_url,
32 token: token.into(),
33 client: HttpClient::default(),
34 bucket: None,
35 org: None,
36 org_id: None,
37 precision: Precision::NS,
38 insert_to_stdout: false,
39 }
40 }
41
42 pub fn insert_to_stdout(mut self) -> Self {
43 self.insert_to_stdout = true;
44 self
45 }
46
47 pub fn with_bucket<T: Into<String>>(mut self, bucket: T) -> Self {
48 self.bucket = Some(bucket.into());
49 self
50 }
51
52 pub fn with_org<T: Into<String>>(mut self, org: T) -> Self {
53 self.org = Some(org.into());
54 self
55 }
56
57 pub fn with_org_id<T: Into<String>>(mut self, org_id: T) -> Self {
58 self.org_id = Some(org_id.into());
59 self
60 }
61
62 pub fn with_precision(mut self, precision: Precision) -> Self {
63 self.precision = precision;
64 self
65 }
66
67 pub fn precision(&self) -> &str {
68 self.precision.to_string()
69 }
70
71 pub async fn insert_points<'a, I: IntoIterator<Item = &'a (impl PointSerialize + 'a)>>(
72 &self,
73 points: I,
74 options: TimestampOptions,
75 ) -> Result<(), InfluxError> {
76 let body = points
77 .into_iter()
78 .map(|p| {
79 format!(
80 "{}",
81 match options.clone() {
82 TimestampOptions::Use(t) => p.serialize_with_timestamp(Some(t)),
83 TimestampOptions::FromPoint => p.serialize_with_timestamp(None),
84 TimestampOptions::None => p.serialize(),
85 }
86 )
87 })
88 .collect::<Vec<String>>()
89 .join("\n");
90
91 let precision = self.precision.to_string();
92 let write_query_params = [("precision", precision)];
93
94 if self.insert_to_stdout {
95 println!("{}", body);
96 } else {
97 let result = self
98 .new_request(Method::POST, "/api/v2/write")
99 .query(&write_query_params)
100 .body(body)
101 .send()
102 .await
103 .unwrap()
104 .error_for_status();
105
106 if let Err(err) = result {
107 let status = err.status().unwrap().as_u16();
108 return Err(Client::status_to_influxerror(status, Box::new(err)));
109 }
110 }
111
112 Ok(())
113 }
114
115 fn new_request(&self, method: Method, path: &str) -> reqwest::RequestBuilder {
116 let mut query_params = Vec::<(&str, String)>::new();
118 if let Some(bucket) = &self.bucket {
119 query_params.push(("bucket", bucket.clone()));
120 }
121
122 if let Some(org) = &self.org {
123 query_params.push(("org", org.clone()));
124 } else if let Some(org_id) = &self.org_id {
125 query_params.push(("orgID", org_id.clone()));
126 }
127
128 let mut url = self.host.clone();
130 url.set_path(path);
131
132 self.client
133 .request(method, url)
134 .header("Content-Type", "text/plain")
135 .header("Authorization", format!("{} {}", "Token", self.token))
136 .query(&query_params)
137 }
138
139 fn status_to_influxerror(status: u16, err: Box<dyn Error>) -> InfluxError {
140 match status {
141 400 => InfluxError::InvalidSyntax(err.to_string()),
142 401 => InfluxError::InvalidCredentials(err.to_string()),
143 403 => InfluxError::Forbidden(err.to_string()),
144 _ => InfluxError::Unknown(err.to_string()),
145 }
146 }
147}