1use std::{
16 convert::{TryFrom, TryInto},
17 future::Future,
18 str::FromStr,
19 time::Duration,
20};
21
22use async_trait::async_trait;
23use base64::{engine::general_purpose::STANDARD as b64_std_engine, Engine};
24use influxdb::{
25 Client, ReadQuery as InfluxRQuery, Timestamp as InfluxTimestamp, WriteQuery as InfluxWQuery,
26};
27use serde::Deserialize;
28use tracing::{debug, error, warn};
29use uuid::Uuid;
30use zenoh::{
31 bytes::{Encoding, ZBytes},
32 internal::{bail, buffers::ZBuf, zerror},
33 key_expr::{keyexpr, KeyExpr, OwnedKeyExpr},
34 query::{Parameters, TimeBound, TimeExpr, TimeRange, ZenohParameters},
35 time::Timestamp,
36 try_init_log_from_env, Error, Result as ZResult,
37};
38use zenoh_backend_traits::{
39 config::{PrivacyGetResult, PrivacyTransparentGet, StorageConfig, VolumeConfig},
40 StorageInsertionResult, *,
41};
42use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin};
43
// Sizing of the fallback Tokio runtime used when no ambient runtime is available.
const WORKER_THREAD_NUM: usize = 2;
const MAX_BLOCK_THREAD_NUM: usize = 50;
lazy_static::lazy_static! {
    // Fallback runtime for driving async InfluxDB calls from synchronous
    // contexts (see `blockon_runtime` below); lazily built on first use.
    static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(WORKER_THREAD_NUM)
        .max_blocking_threads(MAX_BLOCK_THREAD_NUM)
        .enable_all()
        .build()
        .expect("Unable to create runtime");
}
55
56#[inline(always)]
57fn blockon_runtime<F: Future>(task: F) -> F::Output {
58 match tokio::runtime::Handle::try_current() {
60 Ok(rt) => {
61 tokio::task::block_in_place(|| rt.block_on(task))
63 }
64 Err(_) => {
65 tokio::task::block_in_place(|| TOKIO_RUNTIME.block_on(task))
67 }
68 }
69}
70
// Volume-level configuration properties.
pub const PROP_BACKEND_URL: &str = "url";
pub const PROP_BACKEND_USERNAME: &str = "username";
pub const PROP_BACKEND_PASSWORD: &str = "password";

// Storage-level configuration properties.
pub const PROP_STORAGE_DB: &str = "db";
pub const PROP_STORAGE_CREATE_DB: &str = "create_db";
pub const PROP_STORAGE_ON_CLOSURE: &str = "on_closure";
pub const PROP_STORAGE_USERNAME: &str = PROP_BACKEND_USERNAME;
pub const PROP_STORAGE_PASSWORD: &str = PROP_BACKEND_PASSWORD;

// Reserved measurement name encoding the absence of a key expression.
pub const NONE_KEY: &str = "@@none_key@@";

// Grace period before a deleted measurement is actually dropped, giving
// late writes a chance to arrive first (see `schedule_measurement_drop`).
const DROP_MEASUREMENT_TIMEOUT_MS: u64 = 5000;

lazy_static::lazy_static!(
    // Influx regex matching every measurement (derived from the "**" key expr).
    static ref INFLUX_REGEX_ALL: String = key_exprs_to_influx_regex(&["**".try_into().unwrap()]);
);
92
93fn get_private_conf<'a>(
94 config: &'a serde_json::Map<String, serde_json::Value>,
95 credit: &str,
96) -> ZResult<Option<&'a String>> {
97 match config.get_private(credit) {
98 PrivacyGetResult::NotFound => Ok(None),
99 PrivacyGetResult::Private(serde_json::Value::String(v)) => Ok(Some(v)),
100 PrivacyGetResult::Public(serde_json::Value::String(v)) => {
101 tracing::warn!(
102 r#"Value "{}" is given for `{}` publicly (i.e. is visible by anyone who can fetch the router configuration). You may want to replace `{}: "{}"` with `private: {{{}: "{}"}}`"#,
103 v,
104 credit,
105 credit,
106 v,
107 credit,
108 v
109 );
110 Ok(Some(v))
111 }
112 PrivacyGetResult::Both {
113 public: serde_json::Value::String(public),
114 private: serde_json::Value::String(private),
115 } => {
116 tracing::warn!(
117 r#"Value "{}" is given for `{}` publicly, but a private value also exists. The private value will be used, but the public value, which is {} the same as the private one, will still be visible in configurations."#,
118 public,
119 credit,
120 if public == private { "" } else { "not " }
121 );
122 Ok(Some(private))
123 }
124 _ => {
125 bail!("Optional property `{}` must be a string", credit)
126 }
127 }
128}
129
/// Zenoh backend (volume) factory for InfluxDB storages.
pub struct InfluxDbBackend {}

#[cfg(feature = "dynamic_plugin")]
zenoh_plugin_trait::declare_plugin!(InfluxDbBackend);
134
135impl Plugin for InfluxDbBackend {
136 type StartArgs = VolumeConfig;
137 type Instance = VolumeInstance;
138
139 const DEFAULT_NAME: &'static str = "influxdb_backend";
140 const PLUGIN_VERSION: &'static str = plugin_version!();
141 const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!();
142
143 fn start(_name: &str, config: &Self::StartArgs) -> ZResult<Self::Instance> {
144 try_init_log_from_env();
145
146 debug!("InfluxDB backend {}", Self::PLUGIN_VERSION);
147
148 let mut config = config.clone();
149 config
150 .rest
151 .insert("version".into(), Self::PLUGIN_VERSION.into());
152
153 let url = match config.rest.get(PROP_BACKEND_URL) {
154 Some(serde_json::Value::String(url)) => url.clone(),
155 _ => {
156 bail!(
157 "Mandatory property `{}` for InfluxDb Backend must be a string",
158 PROP_BACKEND_URL
159 )
160 }
161 };
162
163 let mut admin_client = Client::new(url, "");
165
166 let credentials = match (
168 get_private_conf(&config.rest, PROP_BACKEND_USERNAME)?,
169 get_private_conf(&config.rest, PROP_BACKEND_PASSWORD)?,
170 ) {
171 (Some(username), Some(password)) => {
172 admin_client = admin_client.with_auth(username, password);
173 Some((username.clone(), password.clone()))
174 }
175 (None, None) => None,
176 _ => {
177 bail!(
178 "Optional properties `{}` and `{}` must coexist",
179 PROP_BACKEND_USERNAME,
180 PROP_BACKEND_PASSWORD
181 )
182 }
183 };
184
185 match blockon_runtime(async { show_databases(&admin_client).await }) {
187 Ok(dbs) => {
188 if !dbs.iter().any(|e| e == "_internal") {
190 warn!("The InfluxDB credentials are not for an admin user; the volume won't be able to create or drop any database")
191 }
192 }
193 Err(e) => bail!("Failed to create InfluxDb Volume : {}", e),
194 }
195
196 Ok(Box::new(InfluxDbVolume {
197 admin_status: config,
198 admin_client,
199 credentials,
200 }))
201 }
202}
203
/// An InfluxDB volume: a factory of storages sharing one InfluxDB server.
pub struct InfluxDbVolume {
    // Volume configuration, echoed in the admin space (includes the plugin version).
    admin_status: VolumeConfig,
    // Client authenticated with the volume-level credentials, not bound to a database.
    admin_client: Client,
    // Volume-level credentials, reused for the per-storage admin clients.
    credentials: Option<(String, String)>,
}
209
210#[async_trait]
211impl Volume for InfluxDbVolume {
212 fn get_admin_status(&self) -> serde_json::Value {
213 self.admin_status.to_json_value()
214 }
215
216 fn get_capability(&self) -> Capability {
217 Capability {
218 persistence: Persistence::Durable,
219 history: History::All,
220 }
221 }
222
223 async fn create_storage(&self, mut config: StorageConfig) -> ZResult<Box<dyn Storage>> {
224 let volume_cfg = match config.volume_cfg.as_object() {
225 Some(v) => v,
226 None => bail!("InfluxDB backed storages need some volume-specific configuration"),
227 };
228 let on_closure = match volume_cfg.get(PROP_STORAGE_ON_CLOSURE) {
229 Some(serde_json::Value::String(x)) if x == "drop_series" => OnClosure::DropSeries,
230 Some(serde_json::Value::String(x)) if x == "drop_db" => OnClosure::DropDb,
231 Some(serde_json::Value::String(x)) if x == "do_nothing" => OnClosure::DoNothing,
232 None => OnClosure::DoNothing,
233 Some(_) => {
234 bail!(
235 r#"`{}` property of storage `{}` must be one of "do_nothing" (default), "drop_db" and "drop_series""#,
236 PROP_STORAGE_ON_CLOSURE,
237 &config.name
238 )
239 }
240 };
241 let (db, createdb) = match volume_cfg.get(PROP_STORAGE_DB) {
242 Some(serde_json::Value::String(s)) => (
243 s.clone(),
244 match volume_cfg.get(PROP_STORAGE_CREATE_DB) {
245 None | Some(serde_json::Value::Bool(false)) => false,
246 Some(serde_json::Value::Bool(true)) => true,
247 Some(_) => todo!(),
248 },
249 ),
250 None => (generate_db_name(), true),
251 Some(v) => bail!("Invalid value for ${PROP_STORAGE_DB} config property: ${v}"),
252 };
253
254 let mut client = Client::new(self.admin_client.database_url(), &db);
257
258 let storage_username = match (
260 get_private_conf(volume_cfg, PROP_STORAGE_USERNAME)?,
261 get_private_conf(volume_cfg, PROP_STORAGE_PASSWORD)?,
262 ) {
263 (Some(username), Some(password)) => {
264 client = client.with_auth(username, password);
265 Some(username.clone())
266 }
267 (None, None) => None,
268 _ => {
269 bail!(
270 "Optional properties `{}` and `{}` must coexist",
271 PROP_STORAGE_USERNAME,
272 PROP_STORAGE_PASSWORD
273 )
274 }
275 };
276
277 if !is_db_existing(&client, &db).await? {
279 if createdb {
280 create_db(&self.admin_client, &db, storage_username).await?;
282 } else {
283 bail!("Database '{}' doesn't exist in InfluxDb", db)
284 }
285 }
286
287 config
289 .volume_cfg
290 .as_object_mut()
291 .unwrap()
292 .entry(PROP_STORAGE_DB)
293 .or_insert(db.clone().into());
294
295 let mut admin_client = Client::new(self.admin_client.database_url(), db);
297 if let Some((username, password)) = &self.credentials {
298 admin_client = admin_client.with_auth(username, password);
299 }
300
301 Ok(Box::new(InfluxDbStorage {
302 config,
303 admin_client,
304 client,
305 on_closure,
306 }))
307 }
308}
309
/// What to do with the underlying data when a storage is closed.
enum OnClosure {
    // Drop the whole database.
    DropDb,
    // Drop every series, keeping the (possibly shared) database itself.
    DropSeries,
    // Leave the data untouched (the default).
    DoNothing,
}
315
316impl TryFrom<&Parameters<'_>> for OnClosure {
317 type Error = Error;
318 fn try_from(p: &Parameters) -> ZResult<OnClosure> {
319 match p.get(PROP_STORAGE_ON_CLOSURE) {
320 Some(s) => {
321 if s == "drop_db" {
322 Ok(OnClosure::DropDb)
323 } else if s == "drop_series" {
324 Ok(OnClosure::DropSeries)
325 } else {
326 bail!("Unsupported value for 'on_closure' property: {}", s)
327 }
328 }
329 None => Ok(OnClosure::DoNothing),
330 }
331 }
332}
333
/// A zenoh storage backed by one InfluxDB database.
struct InfluxDbStorage {
    // Storage configuration, echoed in the admin space.
    config: StorageConfig,
    // Client with volume-level credentials, used for closure-time DROP operations.
    admin_client: Client,
    // Client with storage-level credentials, used for regular reads and writes.
    client: Client,
    // Policy applied when the storage is dropped.
    on_closure: OnClosure,
}
340
341impl InfluxDbStorage {
342 async fn get_deletion_timestamp(&self, measurement: &str) -> ZResult<Option<Timestamp>> {
343 #[derive(Deserialize, Debug, PartialEq)]
344 struct QueryResult {
345 timestamp: String,
346 }
347
348 let query = InfluxRQuery::new(format!(
349 r#"SELECT "timestamp" FROM "{measurement}" WHERE kind='DEL' ORDER BY time DESC LIMIT 1"#
350 ));
351 match self.client.json_query(query).await {
352 Ok(mut result) => match result.deserialize_next::<QueryResult>() {
353 Ok(qr) => {
354 if !qr.series.is_empty() && !qr.series[0].values.is_empty() {
355 let ts = qr.series[0].values[0]
356 .timestamp
357 .parse::<Timestamp>()
358 .map_err(|err| {
359 zerror!(
360 "Failed to parse the latest timestamp for deletion of measurement {} : {}",
361 measurement, err.cause)
362 })?;
363 Ok(Some(ts))
364 } else {
365 Ok(None)
366 }
367 }
368 Err(err) => bail!(
369 "Failed to get latest timestamp for deletion of measurement {} : {}",
370 measurement,
371 err
372 ),
373 },
374 Err(err) => bail!(
375 "Failed to get latest timestamp for deletion of measurement {} : {}",
376 measurement,
377 err
378 ),
379 }
380 }
381
382 async fn schedule_measurement_drop(&self, measurement: &str) {
383 let m_string = measurement.to_string();
384 let cloned_client = self.client.clone();
385
386 let async_drop = async {
390 tokio::time::sleep(Duration::from_millis(DROP_MEASUREMENT_TIMEOUT_MS)).await;
391 drop_measurement(m_string, cloned_client).await;
392 };
393
394 match tokio::runtime::Handle::try_current() {
395 Ok(handle) => handle.spawn(async_drop),
396 Err(_) => TOKIO_RUNTIME.spawn(async_drop),
397 };
398 }
399
400 fn keyexpr_from_serie(&self, serie_name: &str) -> ZResult<Option<OwnedKeyExpr>> {
401 if serie_name.eq(NONE_KEY) {
402 Ok(None)
403 } else {
404 match OwnedKeyExpr::from_str(serie_name) {
405 Ok(key) => Ok(Some(key)),
406 Err(e) => Err(format!("{}", e).into()),
407 }
408 }
409 }
410}
411
412#[async_trait]
413impl Storage for InfluxDbStorage {
414 fn get_admin_status(&self) -> serde_json::Value {
415 self.config.to_json_value()
417 }
418
419 async fn put(
420 &mut self,
421 key: Option<OwnedKeyExpr>,
422 payload: ZBytes,
423 encoding: Encoding,
424 timestamp: Timestamp,
425 ) -> ZResult<StorageInsertionResult> {
426 let measurement = key.unwrap_or_else(|| OwnedKeyExpr::from_str(NONE_KEY).unwrap());
427
428 let influx_time = timestamp.get_time().to_duration().as_nanos();
430
431 if let Some(del_time) = self.get_deletion_timestamp(measurement.as_str()).await? {
433 if timestamp < del_time {
435 debug!(
436 "Received a value for {:?} with timestamp older than its deletion; ignore it",
437 measurement
438 );
439 return Ok(StorageInsertionResult::Outdated);
440 }
441 }
442
443 let (base64, strvalue) = match payload.try_to_string() {
445 Ok(s) => (false, s),
446 Err(_) => (true, b64_std_engine.encode(payload.to_bytes()).into()),
447 };
448
449 let encoding_string_rep = encoding.to_string(); let query = InfluxWQuery::new(
455 InfluxTimestamp::Nanoseconds(influx_time),
456 measurement.clone(),
457 )
458 .add_tag("kind", "PUT")
459 .add_field("timestamp", timestamp.to_string())
460 .add_field("encoding_prefix", encoding.id())
461 .add_field("encoding_suffix", encoding_string_rep) .add_field("base64", base64)
463 .add_field("value", strvalue.as_ref());
464
465 debug!("Put {:?} with Influx query: {:?}", measurement, query);
466 if let Err(e) = self.client.query(&query).await {
467 bail!(
468 "Failed to put Value for {:?} in InfluxDb storage : {}",
469 measurement,
470 e
471 )
472 } else {
473 Ok(StorageInsertionResult::Inserted)
474 }
475 }
476
477 async fn delete(
478 &mut self,
479 key: Option<OwnedKeyExpr>,
480 timestamp: Timestamp,
481 ) -> ZResult<StorageInsertionResult> {
482 let measurement = key.unwrap_or_else(|| OwnedKeyExpr::from_str(NONE_KEY).unwrap());
483
484 let influx_time = timestamp.get_time().to_duration().as_nanos();
486
487 let query = InfluxRQuery::new(format!(
490 r#"DELETE FROM "{}" WHERE time < {}"#,
491 measurement, influx_time
492 ));
493 debug!("Delete {:?} with Influx query: {:?}", measurement, query);
494 if let Err(e) = self.client.query(&query).await {
495 bail!(
496 "Failed to delete points for measurement '{}' from InfluxDb storage : {}",
497 measurement,
498 e
499 )
500 }
501 let query = InfluxWQuery::new(
503 InfluxTimestamp::Nanoseconds(influx_time),
504 measurement.clone(),
505 )
506 .add_tag("kind", "DEL")
507 .add_field("timestamp", timestamp.to_string())
508 .add_field("encoding_prefix", 0_u8)
509 .add_field("encoding_suffix", "")
510 .add_field("base64", false)
511 .add_field("value", "");
512 debug!(
513 "Mark measurement {} as deleted at time {}",
514 measurement, influx_time
515 );
516 if let Err(e) = self.client.query(&query).await {
517 bail!(
518 "Failed to mark measurement {:?} as deleted : {}",
519 measurement,
520 e
521 )
522 }
523 self.schedule_measurement_drop(measurement.as_str()).await;
525 Ok(StorageInsertionResult::Deleted)
526 }
527
528 async fn get(
529 &mut self,
530 key: Option<OwnedKeyExpr>,
531 parameters: &str,
532 ) -> ZResult<Vec<StoredData>> {
533 let measurement = match key {
534 Some(k) => k,
535 None => OwnedKeyExpr::from_str(NONE_KEY).unwrap(),
536 };
537 let regex = key_exprs_to_influx_regex(&[&KeyExpr::from(measurement)]);
539
540 let clauses = clauses_from_parameters(parameters)?;
542
543 let influx_query_str = format!("SELECT * FROM {regex} {clauses}");
545 let influx_query = InfluxRQuery::new(&influx_query_str);
546
547 #[derive(Deserialize, Debug)]
549 struct ZenohPoint {
550 #[allow(dead_code)]
551 kind: String,
553 timestamp: String,
554 encoding_prefix: u8,
555 encoding_suffix: String,
556 base64: bool,
557 value: String,
558 }
559
560 let mut result = Vec::new();
561 match self.client.json_query(influx_query).await {
562 Ok(mut query_result) => {
563 while !query_result.results.is_empty() {
564 match query_result.deserialize_next::<ZenohPoint>() {
565 Ok(retn) => {
566 for serie in retn.series {
568 let ke = match self.keyexpr_from_serie(&serie.name) {
570 Ok(k) => k,
571 Err(e) => {
572 error!(
573 "Error replying with serie '{}' : {}",
574 serie.name, e
575 );
576 continue;
577 }
578 };
579 debug!("Replying {} values for {:?}", serie.values.len(), ke);
580 for zpoint in serie.values {
582 let encoding = if zpoint.encoding_suffix.is_empty() {
585 Encoding::new(zpoint.encoding_prefix.into(), None)
586 } else {
587 Encoding::from(zpoint.encoding_suffix)
588 };
589 let payload = if zpoint.base64 {
591 match b64_std_engine.decode(zpoint.value) {
592 Ok(v) => ZBuf::from(v),
593 Err(e) => {
594 warn!(
595 r#"Failed to decode zenoh base64 Value from Influx point {} with timestamp="{}": {}"#,
596 serie.name, zpoint.timestamp, e
597 );
598 continue;
599 }
600 }
601 } else {
602 ZBuf::from(zpoint.value.into_bytes())
603 };
604 let timestamp = match Timestamp::from_str(&zpoint.timestamp) {
606 Ok(t) => t,
607 Err(e) => {
608 warn!(
609 r#"Failed to decode zenoh Timestamp from Influx point {} with timestamp="{}": {:?}"#,
610 serie.name, zpoint.timestamp, e
611 );
612 continue;
613 }
614 };
615 result.push(StoredData {
616 payload: payload.into(),
617 encoding,
618 timestamp,
619 });
620 }
621 }
622 }
623 Err(e) => {
624 bail!(
625 "Failed to parse result of InfluxDB query '{}': {}",
626 influx_query_str,
627 e
628 )
629 }
630 }
631 }
632 }
633 Err(e) => bail!(
634 "Failed to query InfluxDb with '{}' : {}",
635 influx_query_str,
636 e
637 ),
638 }
639 Ok(result)
640 }
641
642 async fn get_all_entries(&self) -> ZResult<Vec<(Option<OwnedKeyExpr>, Timestamp)>> {
643 let mut result = Vec::new();
644
645 let influx_query_str = format!(
647 "SELECT * FROM {} ORDER BY time DESC LIMIT 1",
648 *INFLUX_REGEX_ALL
649 );
650 let influx_query = InfluxRQuery::new(&influx_query_str);
651
652 #[derive(Deserialize, Debug)]
654 struct ZenohPoint {
655 #[allow(dead_code)]
656 kind: String,
658 timestamp: String,
659 }
660 debug!("Get all entries with Influx query: {}", influx_query_str);
661 match self.client.json_query(influx_query).await {
662 Ok(mut query_result) => {
663 while !query_result.results.is_empty() {
664 match query_result.deserialize_next::<ZenohPoint>() {
665 Ok(retn) => {
666 for serie in retn.series {
668 match self.keyexpr_from_serie(&serie.name) {
670 Ok(ke) => {
671 debug!(
672 "Replying {} values for {:?}",
673 serie.values.len(),
674 ke
675 );
676 for zpoint in serie.values {
678 match Timestamp::from_str(&zpoint.timestamp) {
680 Ok(timestamp) => {
681 result.push((ke.clone(), timestamp))
682 }
683 Err(e) => warn!(
684 r#"Failed to decode zenoh Timestamp from Influx point {} with timestamp="{}": {:?}"#,
685 serie.name, zpoint.timestamp, e
686 ),
687 };
688 }
689 }
690 Err(e) => {
691 error!("Error replying with serie '{}' : {}", serie.name, e)
692 }
693 };
694 }
695 }
696 Err(e) => {
697 bail!(
698 "Failed to parse result of InfluxDB query '{}': {}",
699 influx_query_str,
700 e
701 )
702 }
703 }
704 }
705 Ok(result)
706 }
707 Err(e) => bail!(
708 "Failed to query InfluxDb with '{}' : {}",
709 influx_query_str,
710 e
711 ),
712 }
713 }
714}
715
impl Drop for InfluxDbStorage {
    // Applies the configured `on_closure` policy when the storage is dropped.
    // NOTE(review): this blocks the current thread on async InfluxDB queries
    // (via `blockon_runtime`); failures are only logged since `drop` cannot
    // return an error.
    fn drop(&mut self) {
        debug!("Closing InfluxDB storage");
        match self.on_closure {
            OnClosure::DropDb => {
                blockon_runtime(async move {
                    // Dropping a database requires the volume-level (admin) credentials.
                    let db = self.admin_client.database_name();
                    debug!("Close InfluxDB storage, dropping database {}", db);
                    let query = InfluxRQuery::new(format!(r#"DROP DATABASE "{db}""#));
                    if let Err(e) = self.admin_client.query(&query).await {
                        error!("Failed to drop InfluxDb database '{}' : {}", db, e)
                    }
                });
            }
            OnClosure::DropSeries => {
                blockon_runtime(async move {
                    let db = self.client.database_name();
                    debug!(
                        "Close InfluxDB storage, dropping all series from database {}",
                        db
                    );
                    // Drops every series but keeps the database itself.
                    let query = InfluxRQuery::new("DROP SERIES FROM /.*/");
                    if let Err(e) = self.client.query(&query).await {
                        error!(
                            "Failed to drop all series from InfluxDb database '{}' : {}",
                            db, e
                        )
                    }
                });
            }
            OnClosure::DoNothing => {
                debug!(
                    "Close InfluxDB storage, keeping database {} as it is",
                    self.client.database_name()
                );
            }
        }
    }
}
755
756async fn drop_measurement(measurement: String, client: Client) {
757 #[derive(Deserialize, Debug, PartialEq)]
758 struct QueryResult {
759 kind: String,
760 }
761
762 let query = InfluxRQuery::new(format!(
764 r#"SELECT "kind" FROM "{}" WHERE kind!='DEL' LIMIT 1"#,
765 measurement
766 ));
767 match client.json_query(query).await {
768 Ok(mut result) => {
769 match result.deserialize_next::<QueryResult>() {
770 Ok(qr) => {
771 if !qr.series.is_empty() {
772 debug!("Measurement {} contains new values inserted after deletion; don't drop it", measurement);
773 return;
774 }
775 }
776 Err(e) => {
777 warn!(
778 "Failed to check if measurement '{}' is empty (can't drop it) : {}",
779 measurement, e
780 );
781 }
782 }
783 }
784 Err(e) => {
785 warn!(
786 "Failed to check if measurement '{}' is empty (can't drop it) : {}",
787 measurement, e
788 );
789 return;
790 }
791 }
792
793 let query = InfluxRQuery::new(format!(r#"DROP MEASUREMENT "{}""#, measurement));
795 debug!(
796 "Drop measurement {} after timeout with Influx query: {:?}",
797 measurement, query
798 );
799 if let Err(e) = client.query(&query).await {
800 warn!(
801 "Failed to drop measurement '{}' from InfluxDb storage : {}",
802 measurement, e
803 );
804 }
805}
806
807fn generate_db_name() -> String {
808 format!("zenoh_db_{}", Uuid::new_v4().simple())
809}
810
811async fn show_databases(client: &Client) -> ZResult<Vec<String>> {
812 #[derive(Deserialize)]
813 struct Database {
814 name: String,
815 }
816 let query = InfluxRQuery::new("SHOW DATABASES");
817 debug!("List databases with Influx query: {:?}", query);
818 match client.json_query(query).await {
819 Ok(mut result) => match result.deserialize_next::<Database>() {
820 Ok(dbs) => {
821 let mut result: Vec<String> = Vec::new();
822 for serie in dbs.series {
823 for db in serie.values {
824 result.push(db.name);
825 }
826 }
827 Ok(result)
828 }
829 Err(e) => bail!(
830 "Failed to parse list of existing InfluxDb databases : {}",
831 e
832 ),
833 },
834 Err(e) => bail!("Failed to list existing InfluxDb databases : {}", e),
835 }
836}
837
838async fn is_db_existing(client: &Client, db_name: &str) -> ZResult<bool> {
839 let dbs = show_databases(client).await?;
840 Ok(dbs.iter().any(|e| e == db_name))
841}
842
843async fn create_db(
844 client: &Client,
845 db_name: &str,
846 storage_username: Option<String>,
847) -> ZResult<()> {
848 let query = InfluxRQuery::new(format!(r#"CREATE DATABASE "{db_name}""#));
849 debug!("Create Influx database: {}", db_name);
850 if let Err(e) = client.query(&query).await {
851 bail!(
852 "Failed to create new InfluxDb database '{}' : {}",
853 db_name,
854 e
855 )
856 }
857 debug!("after await: {}", db_name);
858 if let Some(username) = storage_username {
860 let query = InfluxRQuery::new(format!(r#"GRANT ALL ON "{db_name}" TO "{username}""#));
861 debug!(
862 "Grant access to {} on Influx database: {}",
863 username, db_name
864 );
865 if let Err(e) = client.query(&query).await {
866 bail!(
867 "Failed grant access to {} on Influx database '{}' : {}",
868 username,
869 db_name,
870 e
871 )
872 }
873 }
874 Ok(())
875}
876
877fn key_exprs_to_influx_regex(path_exprs: &[&keyexpr]) -> String {
882 let mut result = String::with_capacity(2 * path_exprs[0].len());
883 result.push_str("/^");
884 for (i, path_expr) in path_exprs.iter().enumerate() {
885 if i != 0 {
886 result.push('|');
887 }
888 let mut chars = path_expr.chars().peekable();
889 while let Some(c) = chars.next() {
890 match c {
891 '*' => {
892 if let Some(c2) = chars.peek() {
893 if c2 == &'*' {
894 result.push_str(".*");
895 chars.next();
896 } else {
897 result.push_str(".*")
898 }
899 }
900 }
901 '/' => result.push_str(r"\/"),
902 _ => result.push(c),
903 }
904 }
905 }
906 result.push_str("$/");
907 result
908}
909
910fn clauses_from_parameters(p: &str) -> ZResult<String> {
911 let parameters = Parameters::from(p);
912 let mut result = String::with_capacity(256);
913 result.push_str("WHERE kind!='DEL'");
914
915 let time_range = match parameters.time_range() {
916 Some(time_range) => time_range,
917 None => {
918 result.push_str(" ORDER BY time DESC LIMIT 1");
919 return Ok(result);
920 }
921 };
922 match time_range {
923 Ok(TimeRange { start, end }) => {
924 match start {
925 TimeBound::Inclusive(t) => {
926 result.push_str(" AND time >= ");
927 write_timeexpr(&mut result, t);
928 }
929 TimeBound::Exclusive(t) => {
930 result.push_str(" AND time > ");
931 write_timeexpr(&mut result, t);
932 }
933 TimeBound::Unbounded => {}
934 }
935 match end {
936 TimeBound::Inclusive(t) => {
937 result.push_str(" AND time <= ");
938 write_timeexpr(&mut result, t);
939 }
940 TimeBound::Exclusive(t) => {
941 result.push_str(" AND time < ");
942 write_timeexpr(&mut result, t);
943 }
944 TimeBound::Unbounded => {}
945 }
946 }
947 Err(err) => {
948 warn!("Error In TimeRange parse from String {}", err);
949 result.push_str(" ORDER BY time DESC LIMIT 1");
951 }
952 }
953 Ok(result)
954}
955
956fn write_timeexpr(s: &mut String, t: TimeExpr) {
957 use std::fmt::Write;
958
959 use humantime::format_rfc3339;
960 match t {
961 TimeExpr::Fixed(t) => write!(s, "'{}'", format_rfc3339(t)),
962 TimeExpr::Now { offset_secs } => write!(s, "now(){offset_secs:+}s"),
963 }
964 .unwrap()
965}