zenoh_backend_influxdb/
lib.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14
15use std::{
16    convert::{TryFrom, TryInto},
17    future::Future,
18    str::FromStr,
19    time::Duration,
20};
21
22use async_trait::async_trait;
23use base64::{engine::general_purpose::STANDARD as b64_std_engine, Engine};
24use influxdb::{
25    Client, ReadQuery as InfluxRQuery, Timestamp as InfluxTimestamp, WriteQuery as InfluxWQuery,
26};
27use serde::Deserialize;
28use tracing::{debug, error, warn};
29use uuid::Uuid;
30use zenoh::{
31    bytes::{Encoding, ZBytes},
32    internal::{bail, buffers::ZBuf, zerror},
33    key_expr::{keyexpr, KeyExpr, OwnedKeyExpr},
34    query::{Parameters, TimeBound, TimeExpr, TimeRange, ZenohParameters},
35    time::Timestamp,
36    try_init_log_from_env, Error, Result as ZResult,
37};
38use zenoh_backend_traits::{
39    config::{PrivacyGetResult, PrivacyTransparentGet, StorageConfig, VolumeConfig},
40    StorageInsertionResult, *,
41};
42use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin};
43use zenoh_util::ffi::JsonValue;
44
45const WORKER_THREAD_NUM: usize = 2;
46const MAX_BLOCK_THREAD_NUM: usize = 50;
47lazy_static::lazy_static! {
48    // The global runtime is used in the dynamic plugins, which we can't get the current runtime
49    static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
50               .worker_threads(WORKER_THREAD_NUM)
51               .max_blocking_threads(MAX_BLOCK_THREAD_NUM)
52               .enable_all()
53               .build()
54               .expect("Unable to create runtime");
55}
56
57#[inline(always)]
58fn blockon_runtime<F: Future>(task: F) -> F::Output {
59    // Check whether able to get the current runtime
60    match tokio::runtime::Handle::try_current() {
61        Ok(rt) => {
62            // Able to get the current runtime (standalone binary), spawn on the current runtime
63            tokio::task::block_in_place(|| rt.block_on(task))
64        }
65        Err(_) => {
66            // Unable to get the current runtime (dynamic plugins), spawn on the global runtime
67            tokio::task::block_in_place(|| TOKIO_RUNTIME.block_on(task))
68        }
69    }
70}
71
// Properties used by the Backend (read from the volume configuration in `start`).
// `url` is mandatory; `username` and `password` are optional but must be given together.
pub const PROP_BACKEND_URL: &str = "url";
pub const PROP_BACKEND_USERNAME: &str = "username";
pub const PROP_BACKEND_PASSWORD: &str = "password";

// Properties used by the Storage (read from the storage's volume-specific configuration).
// `db`: name of the database; a name is generated when the property is absent.
// `create_db`: boolean telling whether a missing `db` may be created.
// `on_closure`: one of "do_nothing" (default), "drop_db" and "drop_series".
// `username` / `password`: same property names as for the backend.
pub const PROP_STORAGE_DB: &str = "db";
pub const PROP_STORAGE_CREATE_DB: &str = "create_db";
pub const PROP_STORAGE_ON_CLOSURE: &str = "on_closure";
pub const PROP_STORAGE_USERNAME: &str = PROP_BACKEND_USERNAME;
pub const PROP_STORAGE_PASSWORD: &str = PROP_BACKEND_PASSWORD;

// Special key for None (when the prefix being stripped exactly matches the key).
// It is used as the measurement name whenever a storage operation receives no key.
pub const NONE_KEY: &str = "@@none_key@@";

// Delay (in milliseconds) between a deletion and the attempt to drop the measurement.
const DROP_MEASUREMENT_TIMEOUT_MS: u64 = 5000;

// Influx regex built once from the `**` key expression; used to query every measurement.
lazy_static::lazy_static!(
    static ref INFLUX_REGEX_ALL: String = key_exprs_to_influx_regex(&["**".try_into().unwrap()]);
);
93
94fn get_private_conf<'a>(
95    config: &'a serde_json::Map<String, serde_json::Value>,
96    credit: &str,
97) -> ZResult<Option<&'a String>> {
98    match config.get_private(credit) {
99        PrivacyGetResult::NotFound => Ok(None),
100        PrivacyGetResult::Private(serde_json::Value::String(v)) => Ok(Some(v)),
101        PrivacyGetResult::Public(serde_json::Value::String(v)) => {
102            tracing::warn!(
103                r#"Value "{}" is given for `{}` publicly (i.e. is visible by anyone who can fetch the router configuration). You may want to replace `{}: "{}"` with `private: {{{}: "{}"}}`"#,
104                v,
105                credit,
106                credit,
107                v,
108                credit,
109                v
110            );
111            Ok(Some(v))
112        }
113        PrivacyGetResult::Both {
114            public: serde_json::Value::String(public),
115            private: serde_json::Value::String(private),
116        } => {
117            tracing::warn!(
118                r#"Value "{}" is given for `{}` publicly, but a private value also exists. The private value will be used, but the public value, which is {} the same as the private one, will still be visible in configurations."#,
119                public,
120                credit,
121                if public == private { "" } else { "not " }
122            );
123            Ok(Some(private))
124        }
125        _ => {
126            bail!("Optional property `{}` must be a string", credit)
127        }
128    }
129}
130
/// Marker type implementing the zenoh `Plugin` trait for the InfluxDB backend.
/// It holds no state: starting the plugin produces an `InfluxDbVolume`.
pub struct InfluxDbBackend {}

// The plugin declaration is only emitted when the `dynamic_plugin` feature is enabled.
#[cfg(feature = "dynamic_plugin")]
zenoh_plugin_trait::declare_plugin!(InfluxDbBackend);
135
136impl Plugin for InfluxDbBackend {
137    type StartArgs = VolumeConfig;
138    type Instance = VolumeInstance;
139
140    const DEFAULT_NAME: &'static str = "influxdb_backend";
141    const PLUGIN_VERSION: &'static str = plugin_version!();
142    const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!();
143
144    fn start(_name: &str, config: &Self::StartArgs) -> ZResult<Self::Instance> {
145        try_init_log_from_env();
146
147        debug!("InfluxDB backend {}", Self::PLUGIN_VERSION);
148
149        let mut cfg_rest = config.rest.into_serde_map();
150        cfg_rest.insert("version".into(), Self::PLUGIN_VERSION.into());
151
152        let url = match cfg_rest.get(PROP_BACKEND_URL) {
153            Some(serde_json::Value::String(url)) => url.clone(),
154            _ => {
155                bail!(
156                    "Mandatory property `{}` for InfluxDb Backend must be a string",
157                    PROP_BACKEND_URL
158                )
159            }
160        };
161
162        // The InfluxDB client used for administration purposes (show/create/drop databases)
163        let mut admin_client = Client::new(url, "");
164
165        // Note: remove username/password from properties to not re-expose them in admin_status
166        let credentials = match (
167            get_private_conf(&cfg_rest, PROP_BACKEND_USERNAME)?,
168            get_private_conf(&cfg_rest, PROP_BACKEND_PASSWORD)?,
169        ) {
170            (Some(username), Some(password)) => {
171                admin_client = admin_client.with_auth(username, password);
172                Some((username.clone(), password.clone()))
173            }
174            (None, None) => None,
175            _ => {
176                bail!(
177                    "Optional properties `{}` and `{}` must coexist",
178                    PROP_BACKEND_USERNAME,
179                    PROP_BACKEND_PASSWORD
180                )
181            }
182        };
183
184        // Check connectivity to InfluxDB, trying to list databases
185        match blockon_runtime(async { show_databases(&admin_client).await }) {
186            Ok(dbs) => {
187                // trick: if "_internal" db is not shown, it means the credentials are not for an admin
188                if !dbs.iter().any(|e| e == "_internal") {
189                    warn!("The InfluxDB credentials are not for an admin user; the volume won't be able to create or drop any database")
190                }
191            }
192            Err(e) => bail!("Failed to create InfluxDb Volume : {}", e),
193        }
194
195        let mut config = config.clone();
196        config.rest = cfg_rest.into();
197
198        Ok(Box::new(InfluxDbVolume {
199            admin_status: config,
200            admin_client,
201            credentials,
202        }))
203    }
204}
205
/// Volume created by `InfluxDbBackend::start`; it creates the InfluxDB-backed storages.
pub struct InfluxDbVolume {
    // Volume configuration (with the plugin version added), returned by `get_admin_status`.
    admin_status: VolumeConfig,
    // Client built from the backend URL and credentials; used to create databases.
    admin_client: Client,
    // Backend (username, password) pair, if any; reused for each storage's admin client.
    credentials: Option<(String, String)>,
}
211
212#[async_trait]
213impl Volume for InfluxDbVolume {
214    fn get_admin_status(&self) -> JsonValue {
215        self.admin_status.to_json_value().into()
216    }
217
218    fn get_capability(&self) -> Capability {
219        Capability {
220            persistence: Persistence::Durable,
221            history: History::All,
222        }
223    }
224
225    async fn create_storage(&self, mut config: StorageConfig) -> ZResult<Box<dyn Storage>> {
226        let mut cfg = config.volume_cfg.into_serde_value();
227        let volume_cfg = match cfg.as_object_mut() {
228            Some(v) => v,
229            None => bail!("InfluxDB backed storages need some volume-specific configuration"),
230        };
231        let on_closure = match volume_cfg.get(PROP_STORAGE_ON_CLOSURE) {
232            Some(serde_json::Value::String(x)) if x == "drop_series" => OnClosure::DropSeries,
233            Some(serde_json::Value::String(x)) if x == "drop_db" => OnClosure::DropDb,
234            Some(serde_json::Value::String(x)) if x == "do_nothing" => OnClosure::DoNothing,
235            None => OnClosure::DoNothing,
236            Some(_) => {
237                bail!(
238                    r#"`{}` property of storage `{}` must be one of "do_nothing" (default), "drop_db" and "drop_series""#,
239                    PROP_STORAGE_ON_CLOSURE,
240                    &config.name
241                )
242            }
243        };
244        let (db, createdb) = match volume_cfg.get(PROP_STORAGE_DB) {
245            Some(serde_json::Value::String(s)) => (
246                s.clone(),
247                match volume_cfg.get(PROP_STORAGE_CREATE_DB) {
248                    None | Some(serde_json::Value::Bool(false)) => false,
249                    Some(serde_json::Value::Bool(true)) => true,
250                    Some(_) => todo!(),
251                },
252            ),
253            None => (generate_db_name(), true),
254            Some(v) => bail!("Invalid value for ${PROP_STORAGE_DB} config property: ${v}"),
255        };
256
257        // The Influx client on database used to write/query on this storage
258        // (using the same URL than backend's admin_client, but with storage credentials)
259        let mut client = Client::new(self.admin_client.database_url(), &db);
260
261        // Use credentials if specified in storage's volume config
262        let storage_username = match (
263            get_private_conf(volume_cfg, PROP_STORAGE_USERNAME)?,
264            get_private_conf(volume_cfg, PROP_STORAGE_PASSWORD)?,
265        ) {
266            (Some(username), Some(password)) => {
267                client = client.with_auth(username, password);
268                Some(username.clone())
269            }
270            (None, None) => None,
271            _ => {
272                bail!(
273                    "Optional properties `{}` and `{}` must coexist",
274                    PROP_STORAGE_USERNAME,
275                    PROP_STORAGE_PASSWORD
276                )
277            }
278        };
279
280        // Check if the database exists (using storages credentials)
281        if !is_db_existing(&client, &db).await? {
282            if createdb {
283                // create db using backend's credentials
284                create_db(&self.admin_client, &db, storage_username).await?;
285            } else {
286                bail!("Database '{}' doesn't exist in InfluxDb", db)
287            }
288        }
289
290        // re-insert the actual name of database (in case it has been generated)
291        volume_cfg
292            .entry(PROP_STORAGE_DB)
293            .or_insert(db.clone().into());
294
295        // The Influx client on database with backend's credentials (admin), to drop measurements and database
296        let mut admin_client = Client::new(self.admin_client.database_url(), db);
297        if let Some((username, password)) = &self.credentials {
298            admin_client = admin_client.with_auth(username, password);
299        }
300
301        config.volume_cfg = cfg.into();
302        Ok(Box::new(InfluxDbStorage {
303            config,
304            admin_client,
305            client,
306            on_closure,
307        }))
308    }
309}
310
/// What to do with the InfluxDB data when a storage is dropped.
enum OnClosure {
    // Drop the whole database.
    DropDb,
    // Drop all the series of the database.
    DropSeries,
    // Keep the database as it is.
    DoNothing,
}
316
317impl TryFrom<&Parameters<'_>> for OnClosure {
318    type Error = Error;
319    fn try_from(p: &Parameters) -> ZResult<OnClosure> {
320        match p.get(PROP_STORAGE_ON_CLOSURE) {
321            Some(s) => {
322                if s == "drop_db" {
323                    Ok(OnClosure::DropDb)
324                } else if s == "drop_series" {
325                    Ok(OnClosure::DropSeries)
326                } else {
327                    bail!("Unsupported value for 'on_closure' property: {}", s)
328                }
329            }
330            None => Ok(OnClosure::DoNothing),
331        }
332    }
333}
334
/// A storage whose samples are kept in one InfluxDB database.
struct InfluxDbStorage {
    // Storage configuration, returned by `get_admin_status`.
    config: StorageConfig,
    // Client on the database built with the backend's credentials;
    // used to drop the database on closure.
    admin_client: Client,
    // Client on the database built with the storage's credentials; used to write and query.
    client: Client,
    // Action performed when the storage is dropped.
    on_closure: OnClosure,
}
341
342impl InfluxDbStorage {
343    async fn get_deletion_timestamp(&self, measurement: &str) -> ZResult<Option<Timestamp>> {
344        #[derive(Deserialize, Debug, PartialEq)]
345        struct QueryResult {
346            timestamp: String,
347        }
348
349        let query = InfluxRQuery::new(format!(
350            r#"SELECT "timestamp" FROM "{measurement}" WHERE kind='DEL' ORDER BY time DESC LIMIT 1"#
351        ));
352        match self.client.json_query(query).await {
353            Ok(mut result) => match result.deserialize_next::<QueryResult>() {
354                Ok(qr) => {
355                    if !qr.series.is_empty() && !qr.series[0].values.is_empty() {
356                        let ts = qr.series[0].values[0]
357                            .timestamp
358                            .parse::<Timestamp>()
359                            .map_err(|err| {
360                                zerror!(
361                                "Failed to parse the latest timestamp for deletion of measurement {} : {}",
362                                measurement, err.cause)
363                            })?;
364                        Ok(Some(ts))
365                    } else {
366                        Ok(None)
367                    }
368                }
369                Err(err) => bail!(
370                    "Failed to get latest timestamp for deletion of measurement {} : {}",
371                    measurement,
372                    err
373                ),
374            },
375            Err(err) => bail!(
376                "Failed to get latest timestamp for deletion of measurement {} : {}",
377                measurement,
378                err
379            ),
380        }
381    }
382
383    async fn schedule_measurement_drop(&self, measurement: &str) {
384        let m_string = measurement.to_string();
385        let cloned_client = self.client.clone();
386
387        // Wait till timeout expires and execute drop,
388        // When this plugin executes as a dynamically loaded plugin,
389        // the zenohd tokio-runtime is unavailable, therefore the plugin's runtime must be invoked
390        let async_drop = async {
391            tokio::time::sleep(Duration::from_millis(DROP_MEASUREMENT_TIMEOUT_MS)).await;
392            drop_measurement(m_string, cloned_client).await;
393        };
394
395        match tokio::runtime::Handle::try_current() {
396            Ok(handle) => handle.spawn(async_drop),
397            Err(_) => TOKIO_RUNTIME.spawn(async_drop),
398        };
399    }
400
401    fn keyexpr_from_serie(&self, serie_name: &str) -> ZResult<Option<OwnedKeyExpr>> {
402        if serie_name.eq(NONE_KEY) {
403            Ok(None)
404        } else {
405            match OwnedKeyExpr::from_str(serie_name) {
406                Ok(key) => Ok(Some(key)),
407                Err(e) => Err(format!("{}", e).into()),
408            }
409        }
410    }
411}
412
413#[async_trait]
414impl Storage for InfluxDbStorage {
415    fn get_admin_status(&self) -> JsonValue {
416        // TODO: possibly add more properties in returned Value for more information about this storage
417        self.config.to_json_value().into()
418    }
419
420    async fn put(
421        &mut self,
422        key: Option<OwnedKeyExpr>,
423        payload: ZBytes,
424        encoding: Encoding,
425        timestamp: Timestamp,
426    ) -> ZResult<StorageInsertionResult> {
427        let measurement = key.unwrap_or_else(|| OwnedKeyExpr::from_str(NONE_KEY).unwrap());
428
429        // Note: assume that uhlc timestamp was generated by a clock using UNIX_EPOCH (that's the case by default)
430        let influx_time = timestamp.get_time().to_duration().as_nanos();
431
432        // get timestamp of deletion of this measurement, if any
433        if let Some(del_time) = self.get_deletion_timestamp(measurement.as_str()).await? {
434            // ignore sample if oldest than the deletion
435            if timestamp < del_time {
436                debug!(
437                    "Received a value for {:?} with timestamp older than its deletion; ignore it",
438                    measurement
439                );
440                return Ok(StorageInsertionResult::Outdated);
441            }
442        }
443
444        // encode the value as a string to be stored in InfluxDB, converting to base64 if the buffer is not a UTF-8 string
445        let (base64, strvalue) = match payload.try_to_string() {
446            Ok(s) => (false, s),
447            Err(_) => (true, b64_std_engine.encode(payload.to_bytes()).into()),
448        };
449
450        // Note: tags are stored as strings in InfluxDB, while fileds are typed.
451        // For simpler/faster deserialization, we store encoding, timestamp and base64 as fields.
452        // while the kind is stored as a tag to be indexed by InfluxDB and have faster queries on it.
453        let encoding_string_rep = encoding.to_string(); // add_field only supports Strings and not Vec<u8>
454
455        let query = InfluxWQuery::new(
456            InfluxTimestamp::Nanoseconds(influx_time),
457            measurement.clone(),
458        )
459        .add_tag("kind", "PUT")
460        .add_field("timestamp", timestamp.to_string())
461        .add_field("encoding_prefix", encoding.id())
462        .add_field("encoding_suffix", encoding_string_rep) // TODO: Rename To Encoding and only keep String rep
463        .add_field("base64", base64)
464        .add_field("value", strvalue.as_ref());
465
466        debug!("Put {:?} with Influx query: {:?}", measurement, query);
467        if let Err(e) = self.client.query(&query).await {
468            bail!(
469                "Failed to put Value for {:?} in InfluxDb storage : {}",
470                measurement,
471                e
472            )
473        } else {
474            Ok(StorageInsertionResult::Inserted)
475        }
476    }
477
478    async fn delete(
479        &mut self,
480        key: Option<OwnedKeyExpr>,
481        timestamp: Timestamp,
482    ) -> ZResult<StorageInsertionResult> {
483        let measurement = key.unwrap_or_else(|| OwnedKeyExpr::from_str(NONE_KEY).unwrap());
484
485        // Note: assume that uhlc timestamp was generated by a clock using UNIX_EPOCH (that's the case by default)
486        let influx_time = timestamp.get_time().to_duration().as_nanos();
487
488        // delete all points from the measurement that are older than this DELETE message
489        // (in case more recent PUT have been recevived un-ordered)
490        let query = InfluxRQuery::new(format!(
491            r#"DELETE FROM "{}" WHERE time < {}"#,
492            measurement, influx_time
493        ));
494        debug!("Delete {:?} with Influx query: {:?}", measurement, query);
495        if let Err(e) = self.client.query(&query).await {
496            bail!(
497                "Failed to delete points for measurement '{}' from InfluxDb storage : {}",
498                measurement,
499                e
500            )
501        }
502        // store a point (with timestamp) with "delete" tag, thus we don't re-introduce an older point later
503        let query = InfluxWQuery::new(
504            InfluxTimestamp::Nanoseconds(influx_time),
505            measurement.clone(),
506        )
507        .add_tag("kind", "DEL")
508        .add_field("timestamp", timestamp.to_string())
509        .add_field("encoding_prefix", 0_u8)
510        .add_field("encoding_suffix", "")
511        .add_field("base64", false)
512        .add_field("value", "");
513        debug!(
514            "Mark measurement {} as deleted at time {}",
515            measurement, influx_time
516        );
517        if let Err(e) = self.client.query(&query).await {
518            bail!(
519                "Failed to mark measurement {:?} as deleted : {}",
520                measurement,
521                e
522            )
523        }
524        // schedule the drop of measurement later in the future, if it's empty
525        self.schedule_measurement_drop(measurement.as_str()).await;
526        Ok(StorageInsertionResult::Deleted)
527    }
528
529    async fn get(
530        &mut self,
531        key: Option<OwnedKeyExpr>,
532        parameters: &str,
533    ) -> ZResult<Vec<StoredData>> {
534        let measurement = match key {
535            Some(k) => k,
536            None => OwnedKeyExpr::from_str(NONE_KEY).unwrap(),
537        };
538        // convert the key expression into an Influx regex
539        let regex = key_exprs_to_influx_regex(&[&KeyExpr::from(measurement)]);
540
541        // construct the Influx query clauses from the parameters
542        let clauses = clauses_from_parameters(parameters)?;
543
544        // the Influx query
545        let influx_query_str = format!("SELECT * FROM {regex} {clauses}");
546        let influx_query = InfluxRQuery::new(&influx_query_str);
547
548        // the expected JSon type resulting from the query
549        #[derive(Deserialize, Debug)]
550        struct ZenohPoint {
551            #[allow(dead_code)]
552            // NOTE: "kind" is present within InfluxDB and used in query clauses, but not read in Rust...
553            kind: String,
554            timestamp: String,
555            encoding_prefix: u8,
556            encoding_suffix: String,
557            base64: bool,
558            value: String,
559        }
560
561        let mut result = Vec::new();
562        match self.client.json_query(influx_query).await {
563            Ok(mut query_result) => {
564                while !query_result.results.is_empty() {
565                    match query_result.deserialize_next::<ZenohPoint>() {
566                        Ok(retn) => {
567                            // for each serie
568                            for serie in retn.series {
569                                // get the key expression from the serie name
570                                let ke = match self.keyexpr_from_serie(&serie.name) {
571                                    Ok(k) => k,
572                                    Err(e) => {
573                                        error!(
574                                            "Error replying with serie '{}' : {}",
575                                            serie.name, e
576                                        );
577                                        continue;
578                                    }
579                                };
580                                debug!("Replying {} values for {:?}", serie.values.len(), ke);
581                                // for each point
582                                for zpoint in serie.values {
583                                    // get the encoding
584
585                                    let encoding = if zpoint.encoding_suffix.is_empty() {
586                                        Encoding::new(zpoint.encoding_prefix.into(), None)
587                                    } else {
588                                        Encoding::from(zpoint.encoding_suffix)
589                                    };
590                                    // get the payload
591                                    let payload = if zpoint.base64 {
592                                        match b64_std_engine.decode(zpoint.value) {
593                                            Ok(v) => ZBuf::from(v),
594                                            Err(e) => {
595                                                warn!(
596                                                    r#"Failed to decode zenoh base64 Value from Influx point {} with timestamp="{}": {}"#,
597                                                    serie.name, zpoint.timestamp, e
598                                                );
599                                                continue;
600                                            }
601                                        }
602                                    } else {
603                                        ZBuf::from(zpoint.value.into_bytes())
604                                    };
605                                    // get the timestamp
606                                    let timestamp = match Timestamp::from_str(&zpoint.timestamp) {
607                                        Ok(t) => t,
608                                        Err(e) => {
609                                            warn!(
610                                                r#"Failed to decode zenoh Timestamp from Influx point {} with timestamp="{}": {:?}"#,
611                                                serie.name, zpoint.timestamp, e
612                                            );
613                                            continue;
614                                        }
615                                    };
616                                    result.push(StoredData {
617                                        payload: payload.into(),
618                                        encoding,
619                                        timestamp,
620                                    });
621                                }
622                            }
623                        }
624                        Err(e) => {
625                            bail!(
626                                "Failed to parse result of InfluxDB query '{}': {}",
627                                influx_query_str,
628                                e
629                            )
630                        }
631                    }
632                }
633            }
634            Err(e) => bail!(
635                "Failed to query InfluxDb with '{}' : {}",
636                influx_query_str,
637                e
638            ),
639        }
640        Ok(result)
641    }
642
643    async fn get_all_entries(&self) -> ZResult<Vec<(Option<OwnedKeyExpr>, Timestamp)>> {
644        let mut result = Vec::new();
645
646        // the Influx query: 1 entry == 1 measurement => get only 1 point per measurement (the more recent timestamp)
647        let influx_query_str = format!(
648            "SELECT * FROM {} ORDER BY time DESC LIMIT 1",
649            *INFLUX_REGEX_ALL
650        );
651        let influx_query = InfluxRQuery::new(&influx_query_str);
652
653        // the expected JSon type resulting from the query
654        #[derive(Deserialize, Debug)]
655        struct ZenohPoint {
656            #[allow(dead_code)]
657            // NOTE: "kind" is present within InfluxDB and used in query clauses, but not read in Rust...
658            kind: String,
659            timestamp: String,
660        }
661        debug!("Get all entries with Influx query: {}", influx_query_str);
662        match self.client.json_query(influx_query).await {
663            Ok(mut query_result) => {
664                while !query_result.results.is_empty() {
665                    match query_result.deserialize_next::<ZenohPoint>() {
666                        Ok(retn) => {
667                            // for each serie
668                            for serie in retn.series {
669                                // get the key expression from the serie name
670                                match self.keyexpr_from_serie(&serie.name) {
671                                    Ok(ke) => {
672                                        debug!(
673                                            "Replying {} values for {:?}",
674                                            serie.values.len(),
675                                            ke
676                                        );
677                                        // for each point in the serie
678                                        for zpoint in serie.values {
679                                            // get the timestamp (ignore the point if failing)
680                                            match Timestamp::from_str(&zpoint.timestamp) {
681                                                Ok(timestamp) => {
682                                                    result.push((ke.clone(), timestamp))
683                                                }
684                                                Err(e) => warn!(
685                                                    r#"Failed to decode zenoh Timestamp from Influx point {} with timestamp="{}": {:?}"#,
686                                                    serie.name, zpoint.timestamp, e
687                                                ),
688                                            };
689                                        }
690                                    }
691                                    Err(e) => {
692                                        error!("Error replying with serie '{}' : {}", serie.name, e)
693                                    }
694                                };
695                            }
696                        }
697                        Err(e) => {
698                            bail!(
699                                "Failed to parse result of InfluxDB query '{}': {}",
700                                influx_query_str,
701                                e
702                            )
703                        }
704                    }
705                }
706                Ok(result)
707            }
708            Err(e) => bail!(
709                "Failed to query InfluxDb with '{}' : {}",
710                influx_query_str,
711                e
712            ),
713        }
714    }
715}
716
717impl Drop for InfluxDbStorage {
718    fn drop(&mut self) {
719        debug!("Closing InfluxDB storage");
720        match self.on_closure {
721            OnClosure::DropDb => {
722                blockon_runtime(async move {
723                    let db = self.admin_client.database_name();
724                    debug!("Close InfluxDB storage, dropping database {}", db);
725                    let query = InfluxRQuery::new(format!(r#"DROP DATABASE "{db}""#));
726                    if let Err(e) = self.admin_client.query(&query).await {
727                        error!("Failed to drop InfluxDb database '{}' : {}", db, e)
728                    }
729                });
730            }
731            OnClosure::DropSeries => {
732                blockon_runtime(async move {
733                    let db = self.client.database_name();
734                    debug!(
735                        "Close InfluxDB storage, dropping all series from database {}",
736                        db
737                    );
738                    let query = InfluxRQuery::new("DROP SERIES FROM /.*/");
739                    if let Err(e) = self.client.query(&query).await {
740                        error!(
741                            "Failed to drop all series from InfluxDb database '{}' : {}",
742                            db, e
743                        )
744                    }
745                });
746            }
747            OnClosure::DoNothing => {
748                debug!(
749                    "Close InfluxDB storage, keeping database {} as it is",
750                    self.client.database_name()
751                );
752            }
753        }
754    }
755}
756
757async fn drop_measurement(measurement: String, client: Client) {
758    #[derive(Deserialize, Debug, PartialEq)]
759    struct QueryResult {
760        kind: String,
761    }
762
763    // check if there is at least 1 point without "DEL" kind in the measurement
764    let query = InfluxRQuery::new(format!(
765        r#"SELECT "kind" FROM "{}" WHERE kind!='DEL' LIMIT 1"#,
766        measurement
767    ));
768    match client.json_query(query).await {
769        Ok(mut result) => {
770            match result.deserialize_next::<QueryResult>() {
771                Ok(qr) => {
772                    if !qr.series.is_empty() {
773                        debug!("Measurement {} contains new values inserted after deletion; don't drop it", measurement);
774                        return;
775                    }
776                }
777                Err(e) => {
778                    warn!(
779                        "Failed to check if measurement '{}' is empty (can't drop it) : {}",
780                        measurement, e
781                    );
782                }
783            }
784        }
785        Err(e) => {
786            warn!(
787                "Failed to check if measurement '{}' is empty (can't drop it) : {}",
788                measurement, e
789            );
790            return;
791        }
792    }
793
794    // drop the measurement
795    let query = InfluxRQuery::new(format!(r#"DROP MEASUREMENT "{}""#, measurement));
796    debug!(
797        "Drop measurement {} after timeout with Influx query: {:?}",
798        measurement, query
799    );
800    if let Err(e) = client.query(&query).await {
801        warn!(
802            "Failed to drop measurement '{}' from InfluxDb storage : {}",
803            measurement, e
804        );
805    }
806}
807
808fn generate_db_name() -> String {
809    format!("zenoh_db_{}", Uuid::new_v4().simple())
810}
811
812async fn show_databases(client: &Client) -> ZResult<Vec<String>> {
813    #[derive(Deserialize)]
814    struct Database {
815        name: String,
816    }
817    let query = InfluxRQuery::new("SHOW DATABASES");
818    debug!("List databases with Influx query: {:?}", query);
819    match client.json_query(query).await {
820        Ok(mut result) => match result.deserialize_next::<Database>() {
821            Ok(dbs) => {
822                let mut result: Vec<String> = Vec::new();
823                for serie in dbs.series {
824                    for db in serie.values {
825                        result.push(db.name);
826                    }
827                }
828                Ok(result)
829            }
830            Err(e) => bail!(
831                "Failed to parse list of existing InfluxDb databases : {}",
832                e
833            ),
834        },
835        Err(e) => bail!("Failed to list existing InfluxDb databases : {}", e),
836    }
837}
838
839async fn is_db_existing(client: &Client, db_name: &str) -> ZResult<bool> {
840    let dbs = show_databases(client).await?;
841    Ok(dbs.iter().any(|e| e == db_name))
842}
843
844async fn create_db(
845    client: &Client,
846    db_name: &str,
847    storage_username: Option<String>,
848) -> ZResult<()> {
849    let query = InfluxRQuery::new(format!(r#"CREATE DATABASE "{db_name}""#));
850    debug!("Create Influx database: {}", db_name);
851    if let Err(e) = client.query(&query).await {
852        bail!(
853            "Failed to create new InfluxDb database '{}' : {}",
854            db_name,
855            e
856        )
857    }
858    debug!("after await: {}", db_name);
859    // is a username is specified for storage access, grant him access to the database
860    if let Some(username) = storage_username {
861        let query = InfluxRQuery::new(format!(r#"GRANT ALL ON "{db_name}" TO "{username}""#));
862        debug!(
863            "Grant access to {} on Influx database: {}",
864            username, db_name
865        );
866        if let Err(e) = client.query(&query).await {
867            bail!(
868                "Failed grant access to {} on Influx database '{}' : {}",
869                username,
870                db_name,
871                e
872            )
873        }
874    }
875    Ok(())
876}
877
878// Returns an InfluxDB regex (see https://docs.influxdata.com/influxdb/v1.8/query_language/explore-data/#regular-expressions)
879// corresponding to the list of path expressions. I.e.:
880// Replace "**" with ".*", "*" with "[^\/]*"  and "/" with "\/".
881// Concat each with "|", and surround the result with '/^' and '$/'.
882fn key_exprs_to_influx_regex(path_exprs: &[&keyexpr]) -> String {
883    let mut result = String::with_capacity(2 * path_exprs[0].len());
884    result.push_str("/^");
885    for (i, path_expr) in path_exprs.iter().enumerate() {
886        if i != 0 {
887            result.push('|');
888        }
889        let mut chars = path_expr.chars().peekable();
890        while let Some(c) = chars.next() {
891            match c {
892                '*' => {
893                    if let Some(c2) = chars.peek() {
894                        if c2 == &'*' {
895                            result.push_str(".*");
896                            chars.next();
897                        } else {
898                            result.push_str(".*")
899                        }
900                    }
901                }
902                '/' => result.push_str(r"\/"),
903                _ => result.push(c),
904            }
905        }
906    }
907    result.push_str("$/");
908    result
909}
910
911fn clauses_from_parameters(p: &str) -> ZResult<String> {
912    let parameters = Parameters::from(p);
913    let mut result = String::with_capacity(256);
914    result.push_str("WHERE kind!='DEL'");
915
916    let time_range = match parameters.time_range() {
917        Some(time_range) => time_range,
918        None => {
919            result.push_str(" ORDER BY time DESC LIMIT 1");
920            return Ok(result);
921        }
922    };
923    match time_range {
924        Ok(TimeRange { start, end }) => {
925            match start {
926                TimeBound::Inclusive(t) => {
927                    result.push_str(" AND time >= ");
928                    write_timeexpr(&mut result, t);
929                }
930                TimeBound::Exclusive(t) => {
931                    result.push_str(" AND time > ");
932                    write_timeexpr(&mut result, t);
933                }
934                TimeBound::Unbounded => {}
935            }
936            match end {
937                TimeBound::Inclusive(t) => {
938                    result.push_str(" AND time <= ");
939                    write_timeexpr(&mut result, t);
940                }
941                TimeBound::Exclusive(t) => {
942                    result.push_str(" AND time < ");
943                    write_timeexpr(&mut result, t);
944                }
945                TimeBound::Unbounded => {}
946            }
947        }
948        Err(err) => {
949            warn!("Error In TimeRange parse from String {}", err);
950            //No time selection, return only latest values
951            result.push_str(" ORDER BY time DESC LIMIT 1");
952        }
953    }
954    Ok(result)
955}
956
957fn write_timeexpr(s: &mut String, t: TimeExpr) {
958    use std::fmt::Write;
959
960    use humantime::format_rfc3339;
961    match t {
962        TimeExpr::Fixed(t) => write!(s, "'{}'", format_rfc3339(t)),
963        TimeExpr::Now { offset_secs } => write!(s, "now(){offset_secs:+}s"),
964    }
965    .unwrap()
966}