1use std::{
16 convert::{TryFrom, TryInto},
17 future::Future,
18 str::FromStr,
19 time::Duration,
20};
21
22use async_trait::async_trait;
23use base64::{engine::general_purpose::STANDARD as b64_std_engine, Engine};
24use influxdb::{
25 Client, ReadQuery as InfluxRQuery, Timestamp as InfluxTimestamp, WriteQuery as InfluxWQuery,
26};
27use serde::Deserialize;
28use tracing::{debug, error, warn};
29use uuid::Uuid;
30use zenoh::{
31 bytes::{Encoding, ZBytes},
32 internal::{bail, buffers::ZBuf, zerror},
33 key_expr::{keyexpr, KeyExpr, OwnedKeyExpr},
34 query::{Parameters, TimeBound, TimeExpr, TimeRange, ZenohParameters},
35 time::Timestamp,
36 try_init_log_from_env, Error, Result as ZResult,
37};
38use zenoh_backend_traits::{
39 config::{PrivacyGetResult, PrivacyTransparentGet, StorageConfig, VolumeConfig},
40 StorageInsertionResult, *,
41};
42use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin};
43use zenoh_util::ffi::JsonValue;
44
/// Number of worker threads for the plugin's fallback Tokio runtime.
const WORKER_THREAD_NUM: usize = 2;
/// Upper bound on blocking threads for the plugin's fallback Tokio runtime.
const MAX_BLOCK_THREAD_NUM: usize = 50;
lazy_static::lazy_static! {
    // Dedicated runtime used when the caller is not already inside a Tokio
    // runtime (see `blockon_runtime`); built once, on first use.
    static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(WORKER_THREAD_NUM)
        .max_blocking_threads(MAX_BLOCK_THREAD_NUM)
        .enable_all()
        .build()
        .expect("Unable to create runtime");
}
56
57#[inline(always)]
58fn blockon_runtime<F: Future>(task: F) -> F::Output {
59 match tokio::runtime::Handle::try_current() {
61 Ok(rt) => {
62 tokio::task::block_in_place(|| rt.block_on(task))
64 }
65 Err(_) => {
66 tokio::task::block_in_place(|| TOKIO_RUNTIME.block_on(task))
68 }
69 }
70}
71
// Properties used by the backend (volume) configuration.
pub const PROP_BACKEND_URL: &str = "url";
pub const PROP_BACKEND_USERNAME: &str = "username";
pub const PROP_BACKEND_PASSWORD: &str = "password";

// Properties used by each storage configuration.
pub const PROP_STORAGE_DB: &str = "db";
pub const PROP_STORAGE_CREATE_DB: &str = "create_db";
pub const PROP_STORAGE_ON_CLOSURE: &str = "on_closure";
pub const PROP_STORAGE_USERNAME: &str = PROP_BACKEND_USERNAME;
pub const PROP_STORAGE_PASSWORD: &str = PROP_BACKEND_PASSWORD;

/// Measurement name used when a value is stored without a key expression.
pub const NONE_KEY: &str = "@@none_key@@";

/// Delay (ms) before a deleted measurement is actually dropped from InfluxDB.
const DROP_MEASUREMENT_TIMEOUT_MS: u64 = 5000;
89
lazy_static::lazy_static!(
    // Influx regex matching every measurement (the "**" key expression).
    static ref INFLUX_REGEX_ALL: String = key_exprs_to_influx_regex(&["**".try_into().unwrap()]);
);
93
94fn get_private_conf<'a>(
95 config: &'a serde_json::Map<String, serde_json::Value>,
96 credit: &str,
97) -> ZResult<Option<&'a String>> {
98 match config.get_private(credit) {
99 PrivacyGetResult::NotFound => Ok(None),
100 PrivacyGetResult::Private(serde_json::Value::String(v)) => Ok(Some(v)),
101 PrivacyGetResult::Public(serde_json::Value::String(v)) => {
102 tracing::warn!(
103 r#"Value "{}" is given for `{}` publicly (i.e. is visible by anyone who can fetch the router configuration). You may want to replace `{}: "{}"` with `private: {{{}: "{}"}}`"#,
104 v,
105 credit,
106 credit,
107 v,
108 credit,
109 v
110 );
111 Ok(Some(v))
112 }
113 PrivacyGetResult::Both {
114 public: serde_json::Value::String(public),
115 private: serde_json::Value::String(private),
116 } => {
117 tracing::warn!(
118 r#"Value "{}" is given for `{}` publicly, but a private value also exists. The private value will be used, but the public value, which is {} the same as the private one, will still be visible in configurations."#,
119 public,
120 credit,
121 if public == private { "" } else { "not " }
122 );
123 Ok(Some(private))
124 }
125 _ => {
126 bail!("Optional property `{}` must be a string", credit)
127 }
128 }
129}
130
/// The InfluxDB backend (volume) plugin entry point.
pub struct InfluxDbBackend {}

// Export the plugin's entry points when built as a dynamic library.
#[cfg(feature = "dynamic_plugin")]
zenoh_plugin_trait::declare_plugin!(InfluxDbBackend);
135
impl Plugin for InfluxDbBackend {
    type StartArgs = VolumeConfig;
    type Instance = VolumeInstance;

    const DEFAULT_NAME: &'static str = "influxdb_backend";
    const PLUGIN_VERSION: &'static str = plugin_version!();
    const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!();

    /// Starts the volume: validates the mandatory `url` and the optional
    /// `username`/`password` pair, opens an admin connection to InfluxDB and
    /// sanity-checks it by listing the server's databases.
    fn start(_name: &str, config: &Self::StartArgs) -> ZResult<Self::Instance> {
        try_init_log_from_env();

        debug!("InfluxDB backend {}", Self::PLUGIN_VERSION);

        // Expose the plugin version through the admin-space status.
        let mut cfg_rest = config.rest.into_serde_map();
        cfg_rest.insert("version".into(), Self::PLUGIN_VERSION.into());

        // `url` is mandatory and must be a JSON string.
        let url = match cfg_rest.get(PROP_BACKEND_URL) {
            Some(serde_json::Value::String(url)) => url.clone(),
            _ => {
                bail!(
                    "Mandatory property `{}` for InfluxDb Backend must be a string",
                    PROP_BACKEND_URL
                )
            }
        };

        // The admin client is not bound to any particular database.
        let mut admin_client = Client::new(url, "");

        // `username` and `password` must be provided together (or neither);
        // they are kept so per-storage admin clients can re-authenticate.
        let credentials = match (
            get_private_conf(&cfg_rest, PROP_BACKEND_USERNAME)?,
            get_private_conf(&cfg_rest, PROP_BACKEND_PASSWORD)?,
        ) {
            (Some(username), Some(password)) => {
                admin_client = admin_client.with_auth(username, password);
                Some((username.clone(), password.clone()))
            }
            (None, None) => None,
            _ => {
                bail!(
                    "Optional properties `{}` and `{}` must coexist",
                    PROP_BACKEND_USERNAME,
                    PROP_BACKEND_PASSWORD
                )
            }
        };

        // Probe the connection; visibility of `_internal` indicates admin
        // rights (needed later to create/drop databases).
        match blockon_runtime(async { show_databases(&admin_client).await }) {
            Ok(dbs) => {
                if !dbs.iter().any(|e| e == "_internal") {
                    warn!("The InfluxDB credentials are not for an admin user; the volume won't be able to create or drop any database")
                }
            }
            Err(e) => bail!("Failed to create InfluxDb Volume : {}", e),
        }

        // Store the augmented configuration (with the version) as status.
        let mut config = config.clone();
        config.rest = cfg_rest.into();

        Ok(Box::new(InfluxDbVolume {
            admin_status: config,
            admin_client,
            credentials,
        }))
    }
}
205
/// The InfluxDB volume: creates storages, each bound to one database.
pub struct InfluxDbVolume {
    // Configuration reported in the admin space.
    admin_status: VolumeConfig,
    // Admin connection, used to create/drop databases.
    admin_client: Client,
    // Admin (username, password), if any, reused for per-storage admin clients.
    credentials: Option<(String, String)>,
}
211
212#[async_trait]
213impl Volume for InfluxDbVolume {
214 fn get_admin_status(&self) -> JsonValue {
215 self.admin_status.to_json_value().into()
216 }
217
218 fn get_capability(&self) -> Capability {
219 Capability {
220 persistence: Persistence::Durable,
221 history: History::All,
222 }
223 }
224
225 async fn create_storage(&self, mut config: StorageConfig) -> ZResult<Box<dyn Storage>> {
226 let mut cfg = config.volume_cfg.into_serde_value();
227 let volume_cfg = match cfg.as_object_mut() {
228 Some(v) => v,
229 None => bail!("InfluxDB backed storages need some volume-specific configuration"),
230 };
231 let on_closure = match volume_cfg.get(PROP_STORAGE_ON_CLOSURE) {
232 Some(serde_json::Value::String(x)) if x == "drop_series" => OnClosure::DropSeries,
233 Some(serde_json::Value::String(x)) if x == "drop_db" => OnClosure::DropDb,
234 Some(serde_json::Value::String(x)) if x == "do_nothing" => OnClosure::DoNothing,
235 None => OnClosure::DoNothing,
236 Some(_) => {
237 bail!(
238 r#"`{}` property of storage `{}` must be one of "do_nothing" (default), "drop_db" and "drop_series""#,
239 PROP_STORAGE_ON_CLOSURE,
240 &config.name
241 )
242 }
243 };
244 let (db, createdb) = match volume_cfg.get(PROP_STORAGE_DB) {
245 Some(serde_json::Value::String(s)) => (
246 s.clone(),
247 match volume_cfg.get(PROP_STORAGE_CREATE_DB) {
248 None | Some(serde_json::Value::Bool(false)) => false,
249 Some(serde_json::Value::Bool(true)) => true,
250 Some(_) => todo!(),
251 },
252 ),
253 None => (generate_db_name(), true),
254 Some(v) => bail!("Invalid value for ${PROP_STORAGE_DB} config property: ${v}"),
255 };
256
257 let mut client = Client::new(self.admin_client.database_url(), &db);
260
261 let storage_username = match (
263 get_private_conf(volume_cfg, PROP_STORAGE_USERNAME)?,
264 get_private_conf(volume_cfg, PROP_STORAGE_PASSWORD)?,
265 ) {
266 (Some(username), Some(password)) => {
267 client = client.with_auth(username, password);
268 Some(username.clone())
269 }
270 (None, None) => None,
271 _ => {
272 bail!(
273 "Optional properties `{}` and `{}` must coexist",
274 PROP_STORAGE_USERNAME,
275 PROP_STORAGE_PASSWORD
276 )
277 }
278 };
279
280 if !is_db_existing(&client, &db).await? {
282 if createdb {
283 create_db(&self.admin_client, &db, storage_username).await?;
285 } else {
286 bail!("Database '{}' doesn't exist in InfluxDb", db)
287 }
288 }
289
290 volume_cfg
292 .entry(PROP_STORAGE_DB)
293 .or_insert(db.clone().into());
294
295 let mut admin_client = Client::new(self.admin_client.database_url(), db);
297 if let Some((username, password)) = &self.credentials {
298 admin_client = admin_client.with_auth(username, password);
299 }
300
301 config.volume_cfg = cfg.into();
302 Ok(Box::new(InfluxDbStorage {
303 config,
304 admin_client,
305 client,
306 on_closure,
307 }))
308 }
309}
310
/// What to do with the underlying database when the storage is closed.
enum OnClosure {
    // Drop the whole database (requires admin rights).
    DropDb,
    // Drop all series, keeping the (possibly shared) database itself.
    DropSeries,
    // Keep everything as-is (default).
    DoNothing,
}
316
317impl TryFrom<&Parameters<'_>> for OnClosure {
318 type Error = Error;
319 fn try_from(p: &Parameters) -> ZResult<OnClosure> {
320 match p.get(PROP_STORAGE_ON_CLOSURE) {
321 Some(s) => {
322 if s == "drop_db" {
323 Ok(OnClosure::DropDb)
324 } else if s == "drop_series" {
325 Ok(OnClosure::DropSeries)
326 } else {
327 bail!("Unsupported value for 'on_closure' property: {}", s)
328 }
329 }
330 None => Ok(OnClosure::DoNothing),
331 }
332 }
333}
334
/// A storage mapping zenoh keys to InfluxDB measurements (one per key).
struct InfluxDbStorage {
    // Storage configuration, reported in the admin space.
    config: StorageConfig,
    // Admin connection scoped to this storage's database (used on closure).
    admin_client: Client,
    // Regular read/write connection to this storage's database.
    client: Client,
    // Policy applied to the database when the storage is dropped.
    on_closure: OnClosure,
}
341
impl InfluxDbStorage {
    /// Returns the zenoh timestamp of the latest `DEL` point stored in
    /// `measurement`, if any. Used by `put` to discard samples older than the
    /// deletion of the same key.
    async fn get_deletion_timestamp(&self, measurement: &str) -> ZResult<Option<Timestamp>> {
        #[derive(Deserialize, Debug, PartialEq)]
        struct QueryResult {
            timestamp: String,
        }

        let query = InfluxRQuery::new(format!(
            r#"SELECT "timestamp" FROM "{measurement}" WHERE kind='DEL' ORDER BY time DESC LIMIT 1"#
        ));
        match self.client.json_query(query).await {
            Ok(mut result) => match result.deserialize_next::<QueryResult>() {
                Ok(qr) => {
                    if !qr.series.is_empty() && !qr.series[0].values.is_empty() {
                        // The "timestamp" field holds the original zenoh (HLC)
                        // timestamp, not the Influx `time` column.
                        let ts = qr.series[0].values[0]
                            .timestamp
                            .parse::<Timestamp>()
                            .map_err(|err| {
                                zerror!(
                                    "Failed to parse the latest timestamp for deletion of measurement {} : {}",
                                    measurement, err.cause)
                            })?;
                        Ok(Some(ts))
                    } else {
                        // No DEL point recorded for this measurement.
                        Ok(None)
                    }
                }
                Err(err) => bail!(
                    "Failed to get latest timestamp for deletion of measurement {} : {}",
                    measurement,
                    err
                ),
            },
            Err(err) => bail!(
                "Failed to get latest timestamp for deletion of measurement {} : {}",
                measurement,
                err
            ),
        }
    }

    /// Schedules the drop of `measurement` after `DROP_MEASUREMENT_TIMEOUT_MS`
    /// milliseconds, on the ambient Tokio runtime when there is one, otherwise
    /// on the plugin's own runtime. The spawned task is detached.
    async fn schedule_measurement_drop(&self, measurement: &str) {
        let m_string = measurement.to_string();
        let cloned_client = self.client.clone();

        let async_drop = async {
            tokio::time::sleep(Duration::from_millis(DROP_MEASUREMENT_TIMEOUT_MS)).await;
            drop_measurement(m_string, cloned_client).await;
        };

        match tokio::runtime::Handle::try_current() {
            Ok(handle) => handle.spawn(async_drop),
            Err(_) => TOKIO_RUNTIME.spawn(async_drop),
        };
    }

    /// Maps an Influx serie (measurement) name back to a zenoh key
    /// expression; the sentinel `NONE_KEY` maps to `None`.
    fn keyexpr_from_serie(&self, serie_name: &str) -> ZResult<Option<OwnedKeyExpr>> {
        if serie_name.eq(NONE_KEY) {
            Ok(None)
        } else {
            match OwnedKeyExpr::from_str(serie_name) {
                Ok(key) => Ok(Some(key)),
                Err(e) => Err(format!("{}", e).into()),
            }
        }
    }
}
412
#[async_trait]
impl Storage for InfluxDbStorage {
    fn get_admin_status(&self) -> JsonValue {
        self.config.to_json_value().into()
    }

    /// Stores a PUT as one Influx point in the measurement named after `key`
    /// (or `NONE_KEY` when absent), tagged `kind=PUT` and carrying the zenoh
    /// timestamp, the encoding, and the payload (base64-encoded when it is
    /// not valid UTF-8).
    async fn put(
        &mut self,
        key: Option<OwnedKeyExpr>,
        payload: ZBytes,
        encoding: Encoding,
        timestamp: Timestamp,
    ) -> ZResult<StorageInsertionResult> {
        let measurement = key.unwrap_or_else(|| OwnedKeyExpr::from_str(NONE_KEY).unwrap());

        // The Influx point time is the zenoh timestamp, in nanoseconds.
        let influx_time = timestamp.get_time().to_duration().as_nanos();

        // Discard samples older than the latest deletion of this key.
        if let Some(del_time) = self.get_deletion_timestamp(measurement.as_str()).await? {
            if timestamp < del_time {
                debug!(
                    "Received a value for {:?} with timestamp older than its deletion; ignore it",
                    measurement
                );
                return Ok(StorageInsertionResult::Outdated);
            }
        }

        // Store UTF-8 payloads verbatim; others as base64, flagged by the
        // `base64` field so `get` knows to decode them.
        let (base64, strvalue) = match payload.try_to_string() {
            Ok(s) => (false, s),
            Err(_) => (true, b64_std_engine.encode(payload.to_bytes()).into()),
        };

        let encoding_string_rep = encoding.to_string();
        let query = InfluxWQuery::new(
            InfluxTimestamp::Nanoseconds(influx_time),
            measurement.clone(),
        )
        .add_tag("kind", "PUT")
        .add_field("timestamp", timestamp.to_string())
        .add_field("encoding_prefix", encoding.id())
        .add_field("encoding_suffix", encoding_string_rep)
        .add_field("base64", base64)
        .add_field("value", strvalue.as_ref());

        debug!("Put {:?} with Influx query: {:?}", measurement, query);
        if let Err(e) = self.client.query(&query).await {
            bail!(
                "Failed to put Value for {:?} in InfluxDb storage : {}",
                measurement,
                e
            )
        } else {
            Ok(StorageInsertionResult::Inserted)
        }
    }

    /// Deletes all points of the key's measurement older than `timestamp`,
    /// then writes a `kind=DEL` marker point and schedules the measurement
    /// for a delayed drop.
    async fn delete(
        &mut self,
        key: Option<OwnedKeyExpr>,
        timestamp: Timestamp,
    ) -> ZResult<StorageInsertionResult> {
        let measurement = key.unwrap_or_else(|| OwnedKeyExpr::from_str(NONE_KEY).unwrap());

        let influx_time = timestamp.get_time().to_duration().as_nanos();

        // Delete historical points strictly before the deletion time.
        let query = InfluxRQuery::new(format!(
            r#"DELETE FROM "{}" WHERE time < {}"#,
            measurement, influx_time
        ));
        debug!("Delete {:?} with Influx query: {:?}", measurement, query);
        if let Err(e) = self.client.query(&query).await {
            bail!(
                "Failed to delete points for measurement '{}' from InfluxDb storage : {}",
                measurement,
                e
            )
        }
        // Record the deletion itself, so that late PUTs can be detected
        // (see `get_deletion_timestamp`).
        let query = InfluxWQuery::new(
            InfluxTimestamp::Nanoseconds(influx_time),
            measurement.clone(),
        )
        .add_tag("kind", "DEL")
        .add_field("timestamp", timestamp.to_string())
        .add_field("encoding_prefix", 0_u8)
        .add_field("encoding_suffix", "")
        .add_field("base64", false)
        .add_field("value", "");
        debug!(
            "Mark measurement {} as deleted at time {}",
            measurement, influx_time
        );
        if let Err(e) = self.client.query(&query).await {
            bail!(
                "Failed to mark measurement {:?} as deleted : {}",
                measurement,
                e
            )
        }
        // Drop the measurement later, unless new values arrive meanwhile.
        self.schedule_measurement_drop(measurement.as_str()).await;
        Ok(StorageInsertionResult::Deleted)
    }

    /// Queries points for `key` (or `NONE_KEY`), filtered by the selector
    /// `parameters` (time range). Without a time range only the latest point
    /// per serie is returned (see `clauses_from_parameters`). Individual
    /// undecodable points are logged and skipped.
    async fn get(
        &mut self,
        key: Option<OwnedKeyExpr>,
        parameters: &str,
    ) -> ZResult<Vec<StoredData>> {
        let measurement = match key {
            Some(k) => k,
            None => OwnedKeyExpr::from_str(NONE_KEY).unwrap(),
        };
        // Key expressions become a regex in the FROM clause.
        let regex = key_exprs_to_influx_regex(&[&KeyExpr::from(measurement)]);

        let clauses = clauses_from_parameters(parameters)?;

        let influx_query_str = format!("SELECT * FROM {regex} {clauses}");
        let influx_query = InfluxRQuery::new(&influx_query_str);

        // Mirror of the fields written by `put`/`delete`.
        #[derive(Deserialize, Debug)]
        struct ZenohPoint {
            #[allow(dead_code)]
            kind: String,
            timestamp: String,
            encoding_prefix: u8,
            encoding_suffix: String,
            base64: bool,
            value: String,
        }

        let mut result = Vec::new();
        match self.client.json_query(influx_query).await {
            Ok(mut query_result) => {
                // Drain every result set returned by the query.
                while !query_result.results.is_empty() {
                    match query_result.deserialize_next::<ZenohPoint>() {
                        Ok(retn) => {
                            for serie in retn.series {
                                // One serie per measurement matched by the regex.
                                let ke = match self.keyexpr_from_serie(&serie.name) {
                                    Ok(k) => k,
                                    Err(e) => {
                                        error!(
                                            "Error replying with serie '{}' : {}",
                                            serie.name, e
                                        );
                                        continue;
                                    }
                                };
                                debug!("Replying {} values for {:?}", serie.values.len(), ke);
                                for zpoint in serie.values {
                                    // Empty suffix: legacy prefix-only encoding.
                                    let encoding = if zpoint.encoding_suffix.is_empty() {
                                        Encoding::new(zpoint.encoding_prefix.into(), None)
                                    } else {
                                        Encoding::from(zpoint.encoding_suffix)
                                    };
                                    let payload = if zpoint.base64 {
                                        match b64_std_engine.decode(zpoint.value) {
                                            Ok(v) => ZBuf::from(v),
                                            Err(e) => {
                                                warn!(
                                                    r#"Failed to decode zenoh base64 Value from Influx point {} with timestamp="{}": {}"#,
                                                    serie.name, zpoint.timestamp, e
                                                );
                                                continue;
                                            }
                                        }
                                    } else {
                                        ZBuf::from(zpoint.value.into_bytes())
                                    };
                                    let timestamp = match Timestamp::from_str(&zpoint.timestamp) {
                                        Ok(t) => t,
                                        Err(e) => {
                                            warn!(
                                                r#"Failed to decode zenoh Timestamp from Influx point {} with timestamp="{}": {:?}"#,
                                                serie.name, zpoint.timestamp, e
                                            );
                                            continue;
                                        }
                                    };
                                    result.push(StoredData {
                                        payload: payload.into(),
                                        encoding,
                                        timestamp,
                                    });
                                }
                            }
                        }
                        Err(e) => {
                            bail!(
                                "Failed to parse result of InfluxDB query '{}': {}",
                                influx_query_str,
                                e
                            )
                        }
                    }
                }
            }
            Err(e) => bail!(
                "Failed to query InfluxDb with '{}' : {}",
                influx_query_str,
                e
            ),
        }
        Ok(result)
    }

    /// Lists every (key, latest timestamp) pair stored in the database, one
    /// per measurement (LIMIT 1, most recent first).
    async fn get_all_entries(&self) -> ZResult<Vec<(Option<OwnedKeyExpr>, Timestamp)>> {
        let mut result = Vec::new();

        let influx_query_str = format!(
            "SELECT * FROM {} ORDER BY time DESC LIMIT 1",
            *INFLUX_REGEX_ALL
        );
        let influx_query = InfluxRQuery::new(&influx_query_str);

        // Only the zenoh timestamp is needed here.
        #[derive(Deserialize, Debug)]
        struct ZenohPoint {
            #[allow(dead_code)]
            kind: String,
            timestamp: String,
        }
        debug!("Get all entries with Influx query: {}", influx_query_str);
        match self.client.json_query(influx_query).await {
            Ok(mut query_result) => {
                while !query_result.results.is_empty() {
                    match query_result.deserialize_next::<ZenohPoint>() {
                        Ok(retn) => {
                            for serie in retn.series {
                                match self.keyexpr_from_serie(&serie.name) {
                                    Ok(ke) => {
                                        debug!(
                                            "Replying {} values for {:?}",
                                            serie.values.len(),
                                            ke
                                        );
                                        for zpoint in serie.values {
                                            match Timestamp::from_str(&zpoint.timestamp) {
                                                Ok(timestamp) => {
                                                    result.push((ke.clone(), timestamp))
                                                }
                                                Err(e) => warn!(
                                                    r#"Failed to decode zenoh Timestamp from Influx point {} with timestamp="{}": {:?}"#,
                                                    serie.name, zpoint.timestamp, e
                                                ),
                                            };
                                        }
                                    }
                                    Err(e) => {
                                        error!("Error replying with serie '{}' : {}", serie.name, e)
                                    }
                                };
                            }
                        }
                        Err(e) => {
                            bail!(
                                "Failed to parse result of InfluxDB query '{}': {}",
                                influx_query_str,
                                e
                            )
                        }
                    }
                }
                Ok(result)
            }
            Err(e) => bail!(
                "Failed to query InfluxDb with '{}' : {}",
                influx_query_str,
                e
            ),
        }
    }
}
716
impl Drop for InfluxDbStorage {
    /// Applies the configured `on_closure` policy when the storage is closed:
    /// drop the whole database, drop all its series, or keep everything.
    /// Uses `blockon_runtime` since `drop` cannot be async.
    fn drop(&mut self) {
        debug!("Closing InfluxDB storage");
        match self.on_closure {
            OnClosure::DropDb => {
                blockon_runtime(async move {
                    // Dropping a database requires the admin client.
                    let db = self.admin_client.database_name();
                    debug!("Close InfluxDB storage, dropping database {}", db);
                    let query = InfluxRQuery::new(format!(r#"DROP DATABASE "{db}""#));
                    if let Err(e) = self.admin_client.query(&query).await {
                        error!("Failed to drop InfluxDb database '{}' : {}", db, e)
                    }
                });
            }
            OnClosure::DropSeries => {
                blockon_runtime(async move {
                    let db = self.client.database_name();
                    debug!(
                        "Close InfluxDB storage, dropping all series from database {}",
                        db
                    );
                    let query = InfluxRQuery::new("DROP SERIES FROM /.*/");
                    if let Err(e) = self.client.query(&query).await {
                        error!(
                            "Failed to drop all series from InfluxDb database '{}' : {}",
                            db, e
                        )
                    }
                });
            }
            OnClosure::DoNothing => {
                debug!(
                    "Close InfluxDB storage, keeping database {} as it is",
                    self.client.database_name()
                );
            }
        }
    }
}
756
757async fn drop_measurement(measurement: String, client: Client) {
758 #[derive(Deserialize, Debug, PartialEq)]
759 struct QueryResult {
760 kind: String,
761 }
762
763 let query = InfluxRQuery::new(format!(
765 r#"SELECT "kind" FROM "{}" WHERE kind!='DEL' LIMIT 1"#,
766 measurement
767 ));
768 match client.json_query(query).await {
769 Ok(mut result) => {
770 match result.deserialize_next::<QueryResult>() {
771 Ok(qr) => {
772 if !qr.series.is_empty() {
773 debug!("Measurement {} contains new values inserted after deletion; don't drop it", measurement);
774 return;
775 }
776 }
777 Err(e) => {
778 warn!(
779 "Failed to check if measurement '{}' is empty (can't drop it) : {}",
780 measurement, e
781 );
782 }
783 }
784 }
785 Err(e) => {
786 warn!(
787 "Failed to check if measurement '{}' is empty (can't drop it) : {}",
788 measurement, e
789 );
790 return;
791 }
792 }
793
794 let query = InfluxRQuery::new(format!(r#"DROP MEASUREMENT "{}""#, measurement));
796 debug!(
797 "Drop measurement {} after timeout with Influx query: {:?}",
798 measurement, query
799 );
800 if let Err(e) = client.query(&query).await {
801 warn!(
802 "Failed to drop measurement '{}' from InfluxDb storage : {}",
803 measurement, e
804 );
805 }
806}
807
808fn generate_db_name() -> String {
809 format!("zenoh_db_{}", Uuid::new_v4().simple())
810}
811
812async fn show_databases(client: &Client) -> ZResult<Vec<String>> {
813 #[derive(Deserialize)]
814 struct Database {
815 name: String,
816 }
817 let query = InfluxRQuery::new("SHOW DATABASES");
818 debug!("List databases with Influx query: {:?}", query);
819 match client.json_query(query).await {
820 Ok(mut result) => match result.deserialize_next::<Database>() {
821 Ok(dbs) => {
822 let mut result: Vec<String> = Vec::new();
823 for serie in dbs.series {
824 for db in serie.values {
825 result.push(db.name);
826 }
827 }
828 Ok(result)
829 }
830 Err(e) => bail!(
831 "Failed to parse list of existing InfluxDb databases : {}",
832 e
833 ),
834 },
835 Err(e) => bail!("Failed to list existing InfluxDb databases : {}", e),
836 }
837}
838
839async fn is_db_existing(client: &Client, db_name: &str) -> ZResult<bool> {
840 let dbs = show_databases(client).await?;
841 Ok(dbs.iter().any(|e| e == db_name))
842}
843
844async fn create_db(
845 client: &Client,
846 db_name: &str,
847 storage_username: Option<String>,
848) -> ZResult<()> {
849 let query = InfluxRQuery::new(format!(r#"CREATE DATABASE "{db_name}""#));
850 debug!("Create Influx database: {}", db_name);
851 if let Err(e) = client.query(&query).await {
852 bail!(
853 "Failed to create new InfluxDb database '{}' : {}",
854 db_name,
855 e
856 )
857 }
858 debug!("after await: {}", db_name);
859 if let Some(username) = storage_username {
861 let query = InfluxRQuery::new(format!(r#"GRANT ALL ON "{db_name}" TO "{username}""#));
862 debug!(
863 "Grant access to {} on Influx database: {}",
864 username, db_name
865 );
866 if let Err(e) = client.query(&query).await {
867 bail!(
868 "Failed grant access to {} on Influx database '{}' : {}",
869 username,
870 db_name,
871 e
872 )
873 }
874 }
875 Ok(())
876}
877
878fn key_exprs_to_influx_regex(path_exprs: &[&keyexpr]) -> String {
883 let mut result = String::with_capacity(2 * path_exprs[0].len());
884 result.push_str("/^");
885 for (i, path_expr) in path_exprs.iter().enumerate() {
886 if i != 0 {
887 result.push('|');
888 }
889 let mut chars = path_expr.chars().peekable();
890 while let Some(c) = chars.next() {
891 match c {
892 '*' => {
893 if let Some(c2) = chars.peek() {
894 if c2 == &'*' {
895 result.push_str(".*");
896 chars.next();
897 } else {
898 result.push_str(".*")
899 }
900 }
901 }
902 '/' => result.push_str(r"\/"),
903 _ => result.push(c),
904 }
905 }
906 }
907 result.push_str("$/");
908 result
909}
910
911fn clauses_from_parameters(p: &str) -> ZResult<String> {
912 let parameters = Parameters::from(p);
913 let mut result = String::with_capacity(256);
914 result.push_str("WHERE kind!='DEL'");
915
916 let time_range = match parameters.time_range() {
917 Some(time_range) => time_range,
918 None => {
919 result.push_str(" ORDER BY time DESC LIMIT 1");
920 return Ok(result);
921 }
922 };
923 match time_range {
924 Ok(TimeRange { start, end }) => {
925 match start {
926 TimeBound::Inclusive(t) => {
927 result.push_str(" AND time >= ");
928 write_timeexpr(&mut result, t);
929 }
930 TimeBound::Exclusive(t) => {
931 result.push_str(" AND time > ");
932 write_timeexpr(&mut result, t);
933 }
934 TimeBound::Unbounded => {}
935 }
936 match end {
937 TimeBound::Inclusive(t) => {
938 result.push_str(" AND time <= ");
939 write_timeexpr(&mut result, t);
940 }
941 TimeBound::Exclusive(t) => {
942 result.push_str(" AND time < ");
943 write_timeexpr(&mut result, t);
944 }
945 TimeBound::Unbounded => {}
946 }
947 }
948 Err(err) => {
949 warn!("Error In TimeRange parse from String {}", err);
950 result.push_str(" ORDER BY time DESC LIMIT 1");
952 }
953 }
954 Ok(result)
955}
956
957fn write_timeexpr(s: &mut String, t: TimeExpr) {
958 use std::fmt::Write;
959
960 use humantime::format_rfc3339;
961 match t {
962 TimeExpr::Fixed(t) => write!(s, "'{}'", format_rfc3339(t)),
963 TimeExpr::Now { offset_secs } => write!(s, "now(){offset_secs:+}s"),
964 }
965 .unwrap()
966}