1#![forbid(unsafe_code)]
5
6use std::collections::BTreeMap;
7use std::io::Read;
8
9use log::{debug, error, trace};
10use url::Url;
11
12use error::{Error, ErrorType};
13use irox_csv::{Row, UNIX_DIALECT};
14use irox_networking::http::HttpProtocol;
15use types::RetentionPolicy;
16
17use crate::types::MeasurementDescriptor;
18
19pub mod error;
20pub mod types;
21
/// Result encodings that can be requested from the server when running a query.
#[derive(Debug, Copy, Clone, Default)]
pub enum EncodingType {
    /// JSON-encoded results. This is the default variant.
    #[default]
    JSON,

    /// CSV-encoded results.
    CSV,
}

impl EncodingType {
    /// Returns the MIME type string for this encoding, as sent in the HTTP `Accept` header
    /// of a query request.
    #[must_use]
    pub const fn accept_header(&self) -> &'static str {
        match *self {
            Self::JSON => "application/json",
            Self::CSV => "application/csv",
        }
    }
}
38
39#[derive(Debug, Clone, Eq, PartialEq)]
40pub struct InfluxDBConnectionParams {
41 pub(crate) host: String,
42 pub(crate) port: u16,
43 pub(crate) scheme: HttpProtocol,
44}
45
46impl Default for InfluxDBConnectionParams {
47 fn default() -> Self {
48 InfluxDBConnectionParams {
49 host: String::from("localhost"),
50 port: 8086,
51 scheme: HttpProtocol::HTTP,
52 }
53 }
54}
55
56impl InfluxDBConnectionParams {
57 pub fn open(&self) -> Result<InfluxDB, Error> {
58 let base_url_str = format!("{}://{}:{}", self.scheme.name(), self.host, self.port);
59 let base_url = Url::parse(&base_url_str)?;
60 Self::open_url(base_url)
61 }
62
63 pub fn open_url<T: AsRef<str>>(base_url_str: T) -> Result<InfluxDB, Error> {
64 let base_url = Url::parse(base_url_str.as_ref())?;
65 let agent = ureq::AgentBuilder::new()
66 .max_idle_connections(100)
67 .max_idle_connections_per_host(200)
68 .redirect_auth_headers(ureq::RedirectAuthHeaders::SameHost)
69 .no_delay(true)
70 .build();
71 Ok(InfluxDB { agent, base_url })
72 }
73}
74
75#[derive(Default)]
76pub struct InfluxConnectionBuilder {
77 host: Option<String>,
78 port: Option<u16>,
79 scheme: Option<HttpProtocol>,
80}
81
82impl InfluxConnectionBuilder {
83 #[must_use]
84 pub fn with_host<T: Into<String>>(mut self, host: T) -> Self {
85 self.host = Some(host.into());
86 self
87 }
88 #[must_use]
89 pub fn maybe_host(mut self, host: Option<String>) -> Self {
90 self.host = host;
91 self
92 }
93
94 #[must_use]
95 pub fn with_port<T: Into<u16>>(mut self, port: T) -> Self {
96 self.port = Some(port.into());
97 self
98 }
99
100 #[must_use]
101 pub fn maybe_port(mut self, port: Option<u16>) -> Self {
102 self.port = port;
103 self
104 }
105
106 #[must_use]
107 pub fn with_scheme(mut self, scheme: HttpProtocol) -> Self {
108 self.scheme = Some(scheme);
109 self
110 }
111
112 #[must_use]
113 pub fn maybe_scheme(mut self, scheme: Option<HttpProtocol>) -> Self {
114 self.scheme = scheme;
115 self
116 }
117
118 pub fn build(self) -> Result<InfluxDB, Error> {
119 let mut params = InfluxDBConnectionParams::default();
120 if let Some(host) = self.host {
121 params.host = host;
122 }
123 if let Some(port) = self.port {
124 params.port = port;
125 }
126 if let Some(scheme) = self.scheme {
127 params.scheme = scheme;
128 }
129
130 params.open()
131 }
132}
133
134#[derive(Clone)]
135pub struct InfluxDB {
136 agent: ureq::Agent,
137 base_url: Url,
138}
139
140pub type OwnedReader = Box<dyn Read + Send + Sync + 'static>;
141
142impl InfluxDB {
143 pub fn open(params: &InfluxDBConnectionParams) -> Result<InfluxDB, Error> {
144 params.open()
145 }
146
147 pub fn open_default() -> Result<InfluxDB, Error> {
148 InfluxDBConnectionParams::default().open()
149 }
150
151 pub fn ping(&self) -> Result<(), Error> {
152 let mut url = self.base_url.clone();
153 url.set_path("ping");
154 let req = self.agent.request_url("GET", &url);
155
156 let resp = req.call()?;
157 let status = resp.status();
158 match status {
159 200 | 204 => Ok(()),
160 _ => Error::err(ErrorType::RequestErrorCode(status), "Bad Ping Response"),
161 }
162 }
163
164 pub fn query_json<T: AsRef<str>>(
165 &self,
166 query: T,
167 db: Option<String>,
168 ) -> Result<OwnedReader, Error> {
169 self.query(query, EncodingType::JSON, db)
170 }
171
172 pub fn query_csv<T: AsRef<str>>(
173 &self,
174 query: T,
175 db: Option<String>,
176 ) -> Result<OwnedReader, Error> {
177 self.query(query, EncodingType::CSV, db)
178 }
179
180 pub fn query_data<T: AsRef<str>>(
181 &self,
182 query: T,
183 encoding: EncodingType,
184 db: Option<String>,
185 ) -> Result<Vec<u8>, Error> {
186 let mut reader = self.query(query, encoding, db)?;
187 let mut buf = Vec::new();
188 reader.read_to_end(&mut buf)?;
189 Ok(buf)
190 }
191
192 pub fn query_string<T: AsRef<str>>(
193 &self,
194 query: T,
195 encoding: EncodingType,
196 db: Option<String>,
197 ) -> Result<String, Error> {
198 let data = self.query_data(query, encoding, db)?;
199 Ok(String::from_utf8_lossy(&data).to_string())
200 }
201
202 pub fn query<T: AsRef<str>>(
203 &self,
204 query: T,
205 encoding: EncodingType,
206 db: Option<String>,
207 ) -> Result<OwnedReader, Error> {
208 let mut url = self.base_url.clone();
209 url.set_path("query");
210 if let Some(db) = db {
211 url.set_query(Some(format!("db={db}").as_str()));
212 }
213 let resp = self
214 .agent
215 .request_url("POST", &url)
216 .set("Accept", encoding.accept_header())
217 .send_form(&[("q", query.as_ref())])?;
218
219 let status = resp.status();
220 if status != 200 {
221 return Error::err(ErrorType::RequestErrorCode(status), "Query error");
222 }
223 Ok(resp.into_reader())
224 }
225
226 pub fn list_databases(&self) -> Result<Vec<String>, Error> {
227 let res = self.query_csv("SHOW DATABASES", None)?;
228 let mut out: Vec<String> = Vec::new();
229 irox_csv::CSVMapReader::dialect(res, UNIX_DIALECT)?.for_each(|row| {
230 trace!("{row:?}");
231 let row = row.into_map_lossy();
232 trace!("{row:?}");
233 if let Some(name) = row.get("name") {
234 out.push(name.clone());
235 }
236 })?;
237 Ok(out)
238 }
239
240 pub fn show_retention_policies(
241 &self,
242 db: Option<String>,
243 ) -> Result<Vec<RetentionPolicy>, Error> {
244 let res = match db {
245 Some(db) => self.query_csv(format!("SHOW RETENTION POLICIES ON {db}"), None),
246 None => self.query_csv("SHOW RETENTION POLICIES", None),
247 }?;
248 let mut out: Vec<RetentionPolicy> = Vec::new();
249 irox_csv::CSVMapReader::dialect(res, UNIX_DIALECT)?.for_each(|row| {
250 match TryInto::<RetentionPolicy>::try_into(row.into_map_lossy()) {
251 Ok(r) => out.push(r),
252 Err(e) => error!("Error converting map into Retention: {e:?}"),
253 };
254 })?;
255
256 Ok(out)
257 }
258
259 pub fn show_tag_keys(&self, db: Option<String>) -> Result<(), Error> {
260 let res = match db {
261 Some(db) => self.query_csv(format!("SHOW TAG KEYS ON {db}"), None),
262 None => self.query_csv("SHOW TAG KEYS", None),
263 }?;
264 irox_csv::CSVMapReader::dialect(res, UNIX_DIALECT)?.for_each(|row| {
265 debug!("{:?}", row.into_map_lossy());
266 })?;
267 Ok(())
268 }
269
270 fn update_descriptor_map<
271 T: FnOnce(&mut MeasurementDescriptor, &BTreeMap<String, String>) -> Result<(), Error>,
272 >(
273 data: &mut BTreeMap<String, MeasurementDescriptor>,
274 row: Row,
275 func: T,
276 ) -> Result<(), Error> {
277 let row_map = row.into_map_lossy();
278 let Some(name) = row_map.get("name") else {
279 return Error::err(
280 ErrorType::MissingKeyError("name".to_string()),
281 "Missing key name",
282 );
283 };
284 if !data.contains_key(name) {
285 data.insert(
286 name.to_string(),
287 MeasurementDescriptor::new(name.to_string()),
288 );
289 }
290 let Some(meas) = data.get_mut(name) else {
291 return Error::err(ErrorType::NameKeyMismatch, "Missing name in map?");
292 };
293 func(meas, &row_map)
294 }
295
296 pub fn get_descriptors(
297 &self,
298 db: &Option<String>,
299 ) -> Result<Vec<MeasurementDescriptor>, Error> {
300 let mut data: BTreeMap<String, MeasurementDescriptor> = BTreeMap::new();
301
302 let res = match &db {
303 Some(db) => self.query_csv(format!("SHOW TAG KEYS ON {db}"), None),
304 None => self.query_csv("SHOW TAG KEYS", None),
305 }?;
306 let mut reader = irox_csv::CSVMapReader::dialect(res, UNIX_DIALECT)?;
307 while let Some(row) = reader.next_row()? {
308 Self::update_descriptor_map(&mut data, row, |meas, row_map| {
309 meas.merge_tag_key_map(row_map)
310 })?;
311 }
312
313 let res = match &db {
314 Some(db) => self.query_csv(format!("SHOW FIELD KEYS ON {db}"), None),
315 None => self.query_csv("SHOW FIELD KEYS", None),
316 }?;
317 let mut reader = irox_csv::CSVMapReader::dialect(res, UNIX_DIALECT)?;
318 while let Some(row) = reader.next_row()? {
319 Self::update_descriptor_map(&mut data, row, |meas, row_map| {
320 meas.merge_field_key_map(row_map)
321 })?;
322 }
323
324 Ok(data.into_values().collect())
325 }
326}