irox_influxdb_v1/
lib.rs

1// SPDX-License-Identifier: MIT
2// Copyright 2023 IROX Contributors
3
4#![forbid(unsafe_code)]
5
6use std::collections::BTreeMap;
7use std::io::Read;
8
9use log::{debug, error, trace};
10use url::Url;
11
12use error::{Error, ErrorType};
13use irox_csv::{Row, UNIX_DIALECT};
14use irox_networking::http::HttpProtocol;
15use types::RetentionPolicy;
16
17use crate::types::MeasurementDescriptor;
18
19pub mod error;
20pub mod types;
21
/// Response encodings that can be requested from the InfluxDB HTTP API.
///
/// [`EncodingType::JSON`] is the default.
#[derive(Debug, Clone, Copy, Default)]
pub enum EncodingType {
    /// Ask for a JSON response body.
    #[default]
    JSON,

    /// Ask for a CSV response body.
    CSV,
}

impl EncodingType {
    /// Returns the MIME type to place in the `Accept` request header for this encoding.
    #[must_use]
    pub const fn accept_header(&self) -> &'static str {
        match *self {
            Self::JSON => "application/json",
            Self::CSV => "application/csv",
        }
    }
}
38
39#[derive(Debug, Clone, Eq, PartialEq)]
40pub struct InfluxDBConnectionParams {
41    pub(crate) host: String,
42    pub(crate) port: u16,
43    pub(crate) scheme: HttpProtocol,
44}
45
46impl Default for InfluxDBConnectionParams {
47    fn default() -> Self {
48        InfluxDBConnectionParams {
49            host: String::from("localhost"),
50            port: 8086,
51            scheme: HttpProtocol::HTTP,
52        }
53    }
54}
55
56impl InfluxDBConnectionParams {
57    pub fn open(&self) -> Result<InfluxDB, Error> {
58        let base_url_str = format!("{}://{}:{}", self.scheme.name(), self.host, self.port);
59        let base_url = Url::parse(&base_url_str)?;
60        Self::open_url(base_url)
61    }
62
63    pub fn open_url<T: AsRef<str>>(base_url_str: T) -> Result<InfluxDB, Error> {
64        let base_url = Url::parse(base_url_str.as_ref())?;
65        let agent = ureq::AgentBuilder::new()
66            .max_idle_connections(100)
67            .max_idle_connections_per_host(200)
68            .redirect_auth_headers(ureq::RedirectAuthHeaders::SameHost)
69            .no_delay(true)
70            .build();
71        Ok(InfluxDB { agent, base_url })
72    }
73}
74
75#[derive(Default)]
76pub struct InfluxConnectionBuilder {
77    host: Option<String>,
78    port: Option<u16>,
79    scheme: Option<HttpProtocol>,
80}
81
82impl InfluxConnectionBuilder {
83    #[must_use]
84    pub fn with_host<T: Into<String>>(mut self, host: T) -> Self {
85        self.host = Some(host.into());
86        self
87    }
88    #[must_use]
89    pub fn maybe_host(mut self, host: Option<String>) -> Self {
90        self.host = host;
91        self
92    }
93
94    #[must_use]
95    pub fn with_port<T: Into<u16>>(mut self, port: T) -> Self {
96        self.port = Some(port.into());
97        self
98    }
99
100    #[must_use]
101    pub fn maybe_port(mut self, port: Option<u16>) -> Self {
102        self.port = port;
103        self
104    }
105
106    #[must_use]
107    pub fn with_scheme(mut self, scheme: HttpProtocol) -> Self {
108        self.scheme = Some(scheme);
109        self
110    }
111
112    #[must_use]
113    pub fn maybe_scheme(mut self, scheme: Option<HttpProtocol>) -> Self {
114        self.scheme = scheme;
115        self
116    }
117
118    pub fn build(self) -> Result<InfluxDB, Error> {
119        let mut params = InfluxDBConnectionParams::default();
120        if let Some(host) = self.host {
121            params.host = host;
122        }
123        if let Some(port) = self.port {
124            params.port = port;
125        }
126        if let Some(scheme) = self.scheme {
127            params.scheme = scheme;
128        }
129
130        params.open()
131    }
132}
133
134#[derive(Clone)]
135pub struct InfluxDB {
136    agent: ureq::Agent,
137    base_url: Url,
138}
139
/// Boxed, owned (`'static`) reader that is `Send + Sync`; the query methods return
/// response bodies as this type.
pub type OwnedReader = Box<dyn Read + Send + Sync + 'static>;
141
142impl InfluxDB {
143    pub fn open(params: &InfluxDBConnectionParams) -> Result<InfluxDB, Error> {
144        params.open()
145    }
146
147    pub fn open_default() -> Result<InfluxDB, Error> {
148        InfluxDBConnectionParams::default().open()
149    }
150
151    pub fn ping(&self) -> Result<(), Error> {
152        let mut url = self.base_url.clone();
153        url.set_path("ping");
154        let req = self.agent.request_url("GET", &url);
155
156        let resp = req.call()?;
157        let status = resp.status();
158        match status {
159            200 | 204 => Ok(()),
160            _ => Error::err(ErrorType::RequestErrorCode(status), "Bad Ping Response"),
161        }
162    }
163
164    pub fn query_json<T: AsRef<str>>(
165        &self,
166        query: T,
167        db: Option<String>,
168    ) -> Result<OwnedReader, Error> {
169        self.query(query, EncodingType::JSON, db)
170    }
171
172    pub fn query_csv<T: AsRef<str>>(
173        &self,
174        query: T,
175        db: Option<String>,
176    ) -> Result<OwnedReader, Error> {
177        self.query(query, EncodingType::CSV, db)
178    }
179
180    pub fn query_data<T: AsRef<str>>(
181        &self,
182        query: T,
183        encoding: EncodingType,
184        db: Option<String>,
185    ) -> Result<Vec<u8>, Error> {
186        let mut reader = self.query(query, encoding, db)?;
187        let mut buf = Vec::new();
188        reader.read_to_end(&mut buf)?;
189        Ok(buf)
190    }
191
192    pub fn query_string<T: AsRef<str>>(
193        &self,
194        query: T,
195        encoding: EncodingType,
196        db: Option<String>,
197    ) -> Result<String, Error> {
198        let data = self.query_data(query, encoding, db)?;
199        Ok(String::from_utf8_lossy(&data).to_string())
200    }
201
202    pub fn query<T: AsRef<str>>(
203        &self,
204        query: T,
205        encoding: EncodingType,
206        db: Option<String>,
207    ) -> Result<OwnedReader, Error> {
208        let mut url = self.base_url.clone();
209        url.set_path("query");
210        if let Some(db) = db {
211            url.set_query(Some(format!("db={db}").as_str()));
212        }
213        let resp = self
214            .agent
215            .request_url("POST", &url)
216            .set("Accept", encoding.accept_header())
217            .send_form(&[("q", query.as_ref())])?;
218
219        let status = resp.status();
220        if status != 200 {
221            return Error::err(ErrorType::RequestErrorCode(status), "Query error");
222        }
223        Ok(resp.into_reader())
224    }
225
226    pub fn list_databases(&self) -> Result<Vec<String>, Error> {
227        let res = self.query_csv("SHOW DATABASES", None)?;
228        let mut out: Vec<String> = Vec::new();
229        irox_csv::CSVMapReader::dialect(res, UNIX_DIALECT)?.for_each(|row| {
230            trace!("{row:?}");
231            let row = row.into_map_lossy();
232            trace!("{row:?}");
233            if let Some(name) = row.get("name") {
234                out.push(name.clone());
235            }
236        })?;
237        Ok(out)
238    }
239
240    pub fn show_retention_policies(
241        &self,
242        db: Option<String>,
243    ) -> Result<Vec<RetentionPolicy>, Error> {
244        let res = match db {
245            Some(db) => self.query_csv(format!("SHOW RETENTION POLICIES ON {db}"), None),
246            None => self.query_csv("SHOW RETENTION POLICIES", None),
247        }?;
248        let mut out: Vec<RetentionPolicy> = Vec::new();
249        irox_csv::CSVMapReader::dialect(res, UNIX_DIALECT)?.for_each(|row| {
250            match TryInto::<RetentionPolicy>::try_into(row.into_map_lossy()) {
251                Ok(r) => out.push(r),
252                Err(e) => error!("Error converting map into Retention: {e:?}"),
253            };
254        })?;
255
256        Ok(out)
257    }
258
259    pub fn show_tag_keys(&self, db: Option<String>) -> Result<(), Error> {
260        let res = match db {
261            Some(db) => self.query_csv(format!("SHOW TAG KEYS ON {db}"), None),
262            None => self.query_csv("SHOW TAG KEYS", None),
263        }?;
264        irox_csv::CSVMapReader::dialect(res, UNIX_DIALECT)?.for_each(|row| {
265            debug!("{:?}", row.into_map_lossy());
266        })?;
267        Ok(())
268    }
269
270    fn update_descriptor_map<
271        T: FnOnce(&mut MeasurementDescriptor, &BTreeMap<String, String>) -> Result<(), Error>,
272    >(
273        data: &mut BTreeMap<String, MeasurementDescriptor>,
274        row: Row,
275        func: T,
276    ) -> Result<(), Error> {
277        let row_map = row.into_map_lossy();
278        let Some(name) = row_map.get("name") else {
279            return Error::err(
280                ErrorType::MissingKeyError("name".to_string()),
281                "Missing key name",
282            );
283        };
284        if !data.contains_key(name) {
285            data.insert(
286                name.to_string(),
287                MeasurementDescriptor::new(name.to_string()),
288            );
289        }
290        let Some(meas) = data.get_mut(name) else {
291            return Error::err(ErrorType::NameKeyMismatch, "Missing name in map?");
292        };
293        func(meas, &row_map)
294    }
295
296    pub fn get_descriptors(
297        &self,
298        db: &Option<String>,
299    ) -> Result<Vec<MeasurementDescriptor>, Error> {
300        let mut data: BTreeMap<String, MeasurementDescriptor> = BTreeMap::new();
301
302        let res = match &db {
303            Some(db) => self.query_csv(format!("SHOW TAG KEYS ON {db}"), None),
304            None => self.query_csv("SHOW TAG KEYS", None),
305        }?;
306        let mut reader = irox_csv::CSVMapReader::dialect(res, UNIX_DIALECT)?;
307        while let Some(row) = reader.next_row()? {
308            Self::update_descriptor_map(&mut data, row, |meas, row_map| {
309                meas.merge_tag_key_map(row_map)
310            })?;
311        }
312
313        let res = match &db {
314            Some(db) => self.query_csv(format!("SHOW FIELD KEYS ON {db}"), None),
315            None => self.query_csv("SHOW FIELD KEYS", None),
316        }?;
317        let mut reader = irox_csv::CSVMapReader::dialect(res, UNIX_DIALECT)?;
318        while let Some(row) = reader.next_row()? {
319            Self::update_descriptor_map(&mut data, row, |meas, row_map| {
320                meas.merge_field_key_map(row_map)
321            })?;
322        }
323
324        Ok(data.into_values().collect())
325    }
326}