influxdb_client/
client.rs

1use reqwest::{Client as HttpClient, Method, Url};
2
3use std::error::Error;
4
5use crate::{
6    models::{InfluxError, Precision, TimestampOptions},
7    traits::PointSerialize,
8};
9
10/// Client for InfluxDB
11pub struct Client {
12    host: Url,
13    token: String,
14    client: HttpClient,
15    bucket: Option<String>,
16    org: Option<String>,
17    org_id: Option<String>,
18    precision: Precision,
19
20    insert_to_stdout: bool,
21}
22
23impl Client {
24    pub fn new<T>(host: T, token: T) -> Client
25    where
26        T: Into<String>,
27    {
28        let host_url = reqwest::Url::parse(&host.into()[..]).unwrap();
29
30        Client {
31            host: host_url,
32            token: token.into(),
33            client: HttpClient::default(),
34            bucket: None,
35            org: None,
36            org_id: None,
37            precision: Precision::NS,
38            insert_to_stdout: false,
39        }
40    }
41
42    pub fn insert_to_stdout(mut self) -> Self {
43        self.insert_to_stdout = true;
44        self
45    }
46
47    pub fn with_bucket<T: Into<String>>(mut self, bucket: T) -> Self {
48        self.bucket = Some(bucket.into());
49        self
50    }
51
52    pub fn with_org<T: Into<String>>(mut self, org: T) -> Self {
53        self.org = Some(org.into());
54        self
55    }
56
57    pub fn with_org_id<T: Into<String>>(mut self, org_id: T) -> Self {
58        self.org_id = Some(org_id.into());
59        self
60    }
61
62    pub fn with_precision(mut self, precision: Precision) -> Self {
63        self.precision = precision;
64        self
65    }
66
67    pub fn precision(&self) -> &str {
68        self.precision.to_string()
69    }
70
71    pub async fn insert_points<'a, I: IntoIterator<Item = &'a (impl PointSerialize + 'a)>>(
72        &self,
73        points: I,
74        options: TimestampOptions,
75    ) -> Result<(), InfluxError> {
76        let body = points
77            .into_iter()
78            .map(|p| {
79                format!(
80                    "{}",
81                    match options.clone() {
82                        TimestampOptions::Use(t) => p.serialize_with_timestamp(Some(t)),
83                        TimestampOptions::FromPoint => p.serialize_with_timestamp(None),
84                        TimestampOptions::None => p.serialize(),
85                    }
86                )
87            })
88            .collect::<Vec<String>>()
89            .join("\n");
90
91        let precision = self.precision.to_string();
92        let write_query_params = [("precision", precision)];
93
94        if self.insert_to_stdout {
95            println!("{}", body);
96        } else {
97            let result = self
98                .new_request(Method::POST, "/api/v2/write")
99                .query(&write_query_params)
100                .body(body)
101                .send()
102                .await
103                .unwrap()
104                .error_for_status();
105
106            if let Err(err) = result {
107                let status = err.status().unwrap().as_u16();
108                return Err(Client::status_to_influxerror(status, Box::new(err)));
109            }
110        }
111
112        Ok(())
113    }
114
115    fn new_request(&self, method: Method, path: &str) -> reqwest::RequestBuilder {
116        // Build query params
117        let mut query_params = Vec::<(&str, String)>::new();
118        if let Some(bucket) = &self.bucket {
119            query_params.push(("bucket", bucket.clone()));
120        }
121
122        if let Some(org) = &self.org {
123            query_params.push(("org", org.clone()));
124        } else if let Some(org_id) = &self.org_id {
125            query_params.push(("orgID", org_id.clone()));
126        }
127
128        // Build default request
129        let mut url = self.host.clone();
130        url.set_path(path);
131
132        self.client
133            .request(method, url)
134            .header("Content-Type", "text/plain")
135            .header("Authorization", format!("{} {}", "Token", self.token))
136            .query(&query_params)
137    }
138
139    fn status_to_influxerror(status: u16, err: Box<dyn Error>) -> InfluxError {
140        match status {
141            400 => InfluxError::InvalidSyntax(err.to_string()),
142            401 => InfluxError::InvalidCredentials(err.to_string()),
143            403 => InfluxError::Forbidden(err.to_string()),
144            _ => InfluxError::Unknown(err.to_string()),
145        }
146    }
147}