zenoh_backend_influxdb/
lib.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14
15use std::{
16    convert::{TryFrom, TryInto},
17    future::Future,
18    str::FromStr,
19    time::Duration,
20};
21
22use async_trait::async_trait;
23use base64::{engine::general_purpose::STANDARD as b64_std_engine, Engine};
24use influxdb::{
25    Client, ReadQuery as InfluxRQuery, Timestamp as InfluxTimestamp, WriteQuery as InfluxWQuery,
26};
27use serde::Deserialize;
28use tracing::{debug, error, warn};
29use uuid::Uuid;
30use zenoh::{
31    bytes::{Encoding, ZBytes},
32    internal::{bail, buffers::ZBuf, zerror},
33    key_expr::{keyexpr, KeyExpr, OwnedKeyExpr},
34    query::{Parameters, TimeBound, TimeExpr, TimeRange, ZenohParameters},
35    time::Timestamp,
36    try_init_log_from_env, Error, Result as ZResult,
37};
38use zenoh_backend_traits::{
39    config::{PrivacyGetResult, PrivacyTransparentGet, StorageConfig, VolumeConfig},
40    StorageInsertionResult, *,
41};
42use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin};
43
44const WORKER_THREAD_NUM: usize = 2;
45const MAX_BLOCK_THREAD_NUM: usize = 50;
46lazy_static::lazy_static! {
47    // The global runtime is used in the dynamic plugins, which we can't get the current runtime
48    static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
49               .worker_threads(WORKER_THREAD_NUM)
50               .max_blocking_threads(MAX_BLOCK_THREAD_NUM)
51               .enable_all()
52               .build()
53               .expect("Unable to create runtime");
54}
55
56#[inline(always)]
57fn blockon_runtime<F: Future>(task: F) -> F::Output {
58    // Check whether able to get the current runtime
59    match tokio::runtime::Handle::try_current() {
60        Ok(rt) => {
61            // Able to get the current runtime (standalone binary), spawn on the current runtime
62            tokio::task::block_in_place(|| rt.block_on(task))
63        }
64        Err(_) => {
65            // Unable to get the current runtime (dynamic plugins), spawn on the global runtime
66            tokio::task::block_in_place(|| TOKIO_RUNTIME.block_on(task))
67        }
68    }
69}
70
71// Properties used by the Backend
72pub const PROP_BACKEND_URL: &str = "url";
73pub const PROP_BACKEND_USERNAME: &str = "username";
74pub const PROP_BACKEND_PASSWORD: &str = "password";
75
76// Properties used by the Storage
77pub const PROP_STORAGE_DB: &str = "db";
78pub const PROP_STORAGE_CREATE_DB: &str = "create_db";
79pub const PROP_STORAGE_ON_CLOSURE: &str = "on_closure";
80pub const PROP_STORAGE_USERNAME: &str = PROP_BACKEND_USERNAME;
81pub const PROP_STORAGE_PASSWORD: &str = PROP_BACKEND_PASSWORD;
82
83// Special key for None (when the prefix being stripped exactly matches the key)
84pub const NONE_KEY: &str = "@@none_key@@";
85
86// delay after deletion to drop a measurement
87const DROP_MEASUREMENT_TIMEOUT_MS: u64 = 5000;
88
89lazy_static::lazy_static!(
90    static ref INFLUX_REGEX_ALL: String = key_exprs_to_influx_regex(&["**".try_into().unwrap()]);
91);
92
93fn get_private_conf<'a>(
94    config: &'a serde_json::Map<String, serde_json::Value>,
95    credit: &str,
96) -> ZResult<Option<&'a String>> {
97    match config.get_private(credit) {
98        PrivacyGetResult::NotFound => Ok(None),
99        PrivacyGetResult::Private(serde_json::Value::String(v)) => Ok(Some(v)),
100        PrivacyGetResult::Public(serde_json::Value::String(v)) => {
101            tracing::warn!(
102                r#"Value "{}" is given for `{}` publicly (i.e. is visible by anyone who can fetch the router configuration). You may want to replace `{}: "{}"` with `private: {{{}: "{}"}}`"#,
103                v,
104                credit,
105                credit,
106                v,
107                credit,
108                v
109            );
110            Ok(Some(v))
111        }
112        PrivacyGetResult::Both {
113            public: serde_json::Value::String(public),
114            private: serde_json::Value::String(private),
115        } => {
116            tracing::warn!(
117                r#"Value "{}" is given for `{}` publicly, but a private value also exists. The private value will be used, but the public value, which is {} the same as the private one, will still be visible in configurations."#,
118                public,
119                credit,
120                if public == private { "" } else { "not " }
121            );
122            Ok(Some(private))
123        }
124        _ => {
125            bail!("Optional property `{}` must be a string", credit)
126        }
127    }
128}
129
130pub struct InfluxDbBackend {}
131
132#[cfg(feature = "dynamic_plugin")]
133zenoh_plugin_trait::declare_plugin!(InfluxDbBackend);
134
135impl Plugin for InfluxDbBackend {
136    type StartArgs = VolumeConfig;
137    type Instance = VolumeInstance;
138
139    const DEFAULT_NAME: &'static str = "influxdb_backend";
140    const PLUGIN_VERSION: &'static str = plugin_version!();
141    const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!();
142
143    fn start(_name: &str, config: &Self::StartArgs) -> ZResult<Self::Instance> {
144        try_init_log_from_env();
145
146        debug!("InfluxDB backend {}", Self::PLUGIN_VERSION);
147
148        let mut config = config.clone();
149        config
150            .rest
151            .insert("version".into(), Self::PLUGIN_VERSION.into());
152
153        let url = match config.rest.get(PROP_BACKEND_URL) {
154            Some(serde_json::Value::String(url)) => url.clone(),
155            _ => {
156                bail!(
157                    "Mandatory property `{}` for InfluxDb Backend must be a string",
158                    PROP_BACKEND_URL
159                )
160            }
161        };
162
163        // The InfluxDB client used for administration purposes (show/create/drop databases)
164        let mut admin_client = Client::new(url, "");
165
166        // Note: remove username/password from properties to not re-expose them in admin_status
167        let credentials = match (
168            get_private_conf(&config.rest, PROP_BACKEND_USERNAME)?,
169            get_private_conf(&config.rest, PROP_BACKEND_PASSWORD)?,
170        ) {
171            (Some(username), Some(password)) => {
172                admin_client = admin_client.with_auth(username, password);
173                Some((username.clone(), password.clone()))
174            }
175            (None, None) => None,
176            _ => {
177                bail!(
178                    "Optional properties `{}` and `{}` must coexist",
179                    PROP_BACKEND_USERNAME,
180                    PROP_BACKEND_PASSWORD
181                )
182            }
183        };
184
185        // Check connectivity to InfluxDB, trying to list databases
186        match blockon_runtime(async { show_databases(&admin_client).await }) {
187            Ok(dbs) => {
188                // trick: if "_internal" db is not shown, it means the credentials are not for an admin
189                if !dbs.iter().any(|e| e == "_internal") {
190                    warn!("The InfluxDB credentials are not for an admin user; the volume won't be able to create or drop any database")
191                }
192            }
193            Err(e) => bail!("Failed to create InfluxDb Volume : {}", e),
194        }
195
196        Ok(Box::new(InfluxDbVolume {
197            admin_status: config,
198            admin_client,
199            credentials,
200        }))
201    }
202}
203
204pub struct InfluxDbVolume {
205    admin_status: VolumeConfig,
206    admin_client: Client,
207    credentials: Option<(String, String)>,
208}
209
210#[async_trait]
211impl Volume for InfluxDbVolume {
212    fn get_admin_status(&self) -> serde_json::Value {
213        self.admin_status.to_json_value()
214    }
215
216    fn get_capability(&self) -> Capability {
217        Capability {
218            persistence: Persistence::Durable,
219            history: History::All,
220        }
221    }
222
223    async fn create_storage(&self, mut config: StorageConfig) -> ZResult<Box<dyn Storage>> {
224        let volume_cfg = match config.volume_cfg.as_object() {
225            Some(v) => v,
226            None => bail!("InfluxDB backed storages need some volume-specific configuration"),
227        };
228        let on_closure = match volume_cfg.get(PROP_STORAGE_ON_CLOSURE) {
229            Some(serde_json::Value::String(x)) if x == "drop_series" => OnClosure::DropSeries,
230            Some(serde_json::Value::String(x)) if x == "drop_db" => OnClosure::DropDb,
231            Some(serde_json::Value::String(x)) if x == "do_nothing" => OnClosure::DoNothing,
232            None => OnClosure::DoNothing,
233            Some(_) => {
234                bail!(
235                    r#"`{}` property of storage `{}` must be one of "do_nothing" (default), "drop_db" and "drop_series""#,
236                    PROP_STORAGE_ON_CLOSURE,
237                    &config.name
238                )
239            }
240        };
241        let (db, createdb) = match volume_cfg.get(PROP_STORAGE_DB) {
242            Some(serde_json::Value::String(s)) => (
243                s.clone(),
244                match volume_cfg.get(PROP_STORAGE_CREATE_DB) {
245                    None | Some(serde_json::Value::Bool(false)) => false,
246                    Some(serde_json::Value::Bool(true)) => true,
247                    Some(_) => todo!(),
248                },
249            ),
250            None => (generate_db_name(), true),
251            Some(v) => bail!("Invalid value for ${PROP_STORAGE_DB} config property: ${v}"),
252        };
253
254        // The Influx client on database used to write/query on this storage
255        // (using the same URL than backend's admin_client, but with storage credentials)
256        let mut client = Client::new(self.admin_client.database_url(), &db);
257
258        // Use credentials if specified in storage's volume config
259        let storage_username = match (
260            get_private_conf(volume_cfg, PROP_STORAGE_USERNAME)?,
261            get_private_conf(volume_cfg, PROP_STORAGE_PASSWORD)?,
262        ) {
263            (Some(username), Some(password)) => {
264                client = client.with_auth(username, password);
265                Some(username.clone())
266            }
267            (None, None) => None,
268            _ => {
269                bail!(
270                    "Optional properties `{}` and `{}` must coexist",
271                    PROP_STORAGE_USERNAME,
272                    PROP_STORAGE_PASSWORD
273                )
274            }
275        };
276
277        // Check if the database exists (using storages credentials)
278        if !is_db_existing(&client, &db).await? {
279            if createdb {
280                // create db using backend's credentials
281                create_db(&self.admin_client, &db, storage_username).await?;
282            } else {
283                bail!("Database '{}' doesn't exist in InfluxDb", db)
284            }
285        }
286
287        // re-insert the actual name of database (in case it has been generated)
288        config
289            .volume_cfg
290            .as_object_mut()
291            .unwrap()
292            .entry(PROP_STORAGE_DB)
293            .or_insert(db.clone().into());
294
295        // The Influx client on database with backend's credentials (admin), to drop measurements and database
296        let mut admin_client = Client::new(self.admin_client.database_url(), db);
297        if let Some((username, password)) = &self.credentials {
298            admin_client = admin_client.with_auth(username, password);
299        }
300
301        Ok(Box::new(InfluxDbStorage {
302            config,
303            admin_client,
304            client,
305            on_closure,
306        }))
307    }
308}
309
310enum OnClosure {
311    DropDb,
312    DropSeries,
313    DoNothing,
314}
315
316impl TryFrom<&Parameters<'_>> for OnClosure {
317    type Error = Error;
318    fn try_from(p: &Parameters) -> ZResult<OnClosure> {
319        match p.get(PROP_STORAGE_ON_CLOSURE) {
320            Some(s) => {
321                if s == "drop_db" {
322                    Ok(OnClosure::DropDb)
323                } else if s == "drop_series" {
324                    Ok(OnClosure::DropSeries)
325                } else {
326                    bail!("Unsupported value for 'on_closure' property: {}", s)
327                }
328            }
329            None => Ok(OnClosure::DoNothing),
330        }
331    }
332}
333
334struct InfluxDbStorage {
335    config: StorageConfig,
336    admin_client: Client,
337    client: Client,
338    on_closure: OnClosure,
339}
340
341impl InfluxDbStorage {
342    async fn get_deletion_timestamp(&self, measurement: &str) -> ZResult<Option<Timestamp>> {
343        #[derive(Deserialize, Debug, PartialEq)]
344        struct QueryResult {
345            timestamp: String,
346        }
347
348        let query = InfluxRQuery::new(format!(
349            r#"SELECT "timestamp" FROM "{measurement}" WHERE kind='DEL' ORDER BY time DESC LIMIT 1"#
350        ));
351        match self.client.json_query(query).await {
352            Ok(mut result) => match result.deserialize_next::<QueryResult>() {
353                Ok(qr) => {
354                    if !qr.series.is_empty() && !qr.series[0].values.is_empty() {
355                        let ts = qr.series[0].values[0]
356                            .timestamp
357                            .parse::<Timestamp>()
358                            .map_err(|err| {
359                                zerror!(
360                                "Failed to parse the latest timestamp for deletion of measurement {} : {}",
361                                measurement, err.cause)
362                            })?;
363                        Ok(Some(ts))
364                    } else {
365                        Ok(None)
366                    }
367                }
368                Err(err) => bail!(
369                    "Failed to get latest timestamp for deletion of measurement {} : {}",
370                    measurement,
371                    err
372                ),
373            },
374            Err(err) => bail!(
375                "Failed to get latest timestamp for deletion of measurement {} : {}",
376                measurement,
377                err
378            ),
379        }
380    }
381
382    async fn schedule_measurement_drop(&self, measurement: &str) {
383        let m_string = measurement.to_string();
384        let cloned_client = self.client.clone();
385
386        // Wait till timeout expires and execute drop,
387        // When this plugin executes as a dynamically loaded plugin,
388        // the zenohd tokio-runtime is unavailable, therefore the plugin's runtime must be invoked
389        let async_drop = async {
390            tokio::time::sleep(Duration::from_millis(DROP_MEASUREMENT_TIMEOUT_MS)).await;
391            drop_measurement(m_string, cloned_client).await;
392        };
393
394        match tokio::runtime::Handle::try_current() {
395            Ok(handle) => handle.spawn(async_drop),
396            Err(_) => TOKIO_RUNTIME.spawn(async_drop),
397        };
398    }
399
400    fn keyexpr_from_serie(&self, serie_name: &str) -> ZResult<Option<OwnedKeyExpr>> {
401        if serie_name.eq(NONE_KEY) {
402            Ok(None)
403        } else {
404            match OwnedKeyExpr::from_str(serie_name) {
405                Ok(key) => Ok(Some(key)),
406                Err(e) => Err(format!("{}", e).into()),
407            }
408        }
409    }
410}
411
412#[async_trait]
413impl Storage for InfluxDbStorage {
414    fn get_admin_status(&self) -> serde_json::Value {
415        // TODO: possibly add more properties in returned Value for more information about this storage
416        self.config.to_json_value()
417    }
418
419    async fn put(
420        &mut self,
421        key: Option<OwnedKeyExpr>,
422        payload: ZBytes,
423        encoding: Encoding,
424        timestamp: Timestamp,
425    ) -> ZResult<StorageInsertionResult> {
426        let measurement = key.unwrap_or_else(|| OwnedKeyExpr::from_str(NONE_KEY).unwrap());
427
428        // Note: assume that uhlc timestamp was generated by a clock using UNIX_EPOCH (that's the case by default)
429        let influx_time = timestamp.get_time().to_duration().as_nanos();
430
431        // get timestamp of deletion of this measurement, if any
432        if let Some(del_time) = self.get_deletion_timestamp(measurement.as_str()).await? {
433            // ignore sample if oldest than the deletion
434            if timestamp < del_time {
435                debug!(
436                    "Received a value for {:?} with timestamp older than its deletion; ignore it",
437                    measurement
438                );
439                return Ok(StorageInsertionResult::Outdated);
440            }
441        }
442
443        // encode the value as a string to be stored in InfluxDB, converting to base64 if the buffer is not a UTF-8 string
444        let (base64, strvalue) = match payload.try_to_string() {
445            Ok(s) => (false, s),
446            Err(_) => (true, b64_std_engine.encode(payload.to_bytes()).into()),
447        };
448
449        // Note: tags are stored as strings in InfluxDB, while fileds are typed.
450        // For simpler/faster deserialization, we store encoding, timestamp and base64 as fields.
451        // while the kind is stored as a tag to be indexed by InfluxDB and have faster queries on it.
452        let encoding_string_rep = encoding.to_string(); // add_field only supports Strings and not Vec<u8>
453
454        let query = InfluxWQuery::new(
455            InfluxTimestamp::Nanoseconds(influx_time),
456            measurement.clone(),
457        )
458        .add_tag("kind", "PUT")
459        .add_field("timestamp", timestamp.to_string())
460        .add_field("encoding_prefix", encoding.id())
461        .add_field("encoding_suffix", encoding_string_rep) // TODO: Rename To Encoding and only keep String rep
462        .add_field("base64", base64)
463        .add_field("value", strvalue.as_ref());
464
465        debug!("Put {:?} with Influx query: {:?}", measurement, query);
466        if let Err(e) = self.client.query(&query).await {
467            bail!(
468                "Failed to put Value for {:?} in InfluxDb storage : {}",
469                measurement,
470                e
471            )
472        } else {
473            Ok(StorageInsertionResult::Inserted)
474        }
475    }
476
477    async fn delete(
478        &mut self,
479        key: Option<OwnedKeyExpr>,
480        timestamp: Timestamp,
481    ) -> ZResult<StorageInsertionResult> {
482        let measurement = key.unwrap_or_else(|| OwnedKeyExpr::from_str(NONE_KEY).unwrap());
483
484        // Note: assume that uhlc timestamp was generated by a clock using UNIX_EPOCH (that's the case by default)
485        let influx_time = timestamp.get_time().to_duration().as_nanos();
486
487        // delete all points from the measurement that are older than this DELETE message
488        // (in case more recent PUT have been recevived un-ordered)
489        let query = InfluxRQuery::new(format!(
490            r#"DELETE FROM "{}" WHERE time < {}"#,
491            measurement, influx_time
492        ));
493        debug!("Delete {:?} with Influx query: {:?}", measurement, query);
494        if let Err(e) = self.client.query(&query).await {
495            bail!(
496                "Failed to delete points for measurement '{}' from InfluxDb storage : {}",
497                measurement,
498                e
499            )
500        }
501        // store a point (with timestamp) with "delete" tag, thus we don't re-introduce an older point later
502        let query = InfluxWQuery::new(
503            InfluxTimestamp::Nanoseconds(influx_time),
504            measurement.clone(),
505        )
506        .add_tag("kind", "DEL")
507        .add_field("timestamp", timestamp.to_string())
508        .add_field("encoding_prefix", 0_u8)
509        .add_field("encoding_suffix", "")
510        .add_field("base64", false)
511        .add_field("value", "");
512        debug!(
513            "Mark measurement {} as deleted at time {}",
514            measurement, influx_time
515        );
516        if let Err(e) = self.client.query(&query).await {
517            bail!(
518                "Failed to mark measurement {:?} as deleted : {}",
519                measurement,
520                e
521            )
522        }
523        // schedule the drop of measurement later in the future, if it's empty
524        self.schedule_measurement_drop(measurement.as_str()).await;
525        Ok(StorageInsertionResult::Deleted)
526    }
527
528    async fn get(
529        &mut self,
530        key: Option<OwnedKeyExpr>,
531        parameters: &str,
532    ) -> ZResult<Vec<StoredData>> {
533        let measurement = match key {
534            Some(k) => k,
535            None => OwnedKeyExpr::from_str(NONE_KEY).unwrap(),
536        };
537        // convert the key expression into an Influx regex
538        let regex = key_exprs_to_influx_regex(&[&KeyExpr::from(measurement)]);
539
540        // construct the Influx query clauses from the parameters
541        let clauses = clauses_from_parameters(parameters)?;
542
543        // the Influx query
544        let influx_query_str = format!("SELECT * FROM {regex} {clauses}");
545        let influx_query = InfluxRQuery::new(&influx_query_str);
546
547        // the expected JSon type resulting from the query
548        #[derive(Deserialize, Debug)]
549        struct ZenohPoint {
550            #[allow(dead_code)]
551            // NOTE: "kind" is present within InfluxDB and used in query clauses, but not read in Rust...
552            kind: String,
553            timestamp: String,
554            encoding_prefix: u8,
555            encoding_suffix: String,
556            base64: bool,
557            value: String,
558        }
559
560        let mut result = Vec::new();
561        match self.client.json_query(influx_query).await {
562            Ok(mut query_result) => {
563                while !query_result.results.is_empty() {
564                    match query_result.deserialize_next::<ZenohPoint>() {
565                        Ok(retn) => {
566                            // for each serie
567                            for serie in retn.series {
568                                // get the key expression from the serie name
569                                let ke = match self.keyexpr_from_serie(&serie.name) {
570                                    Ok(k) => k,
571                                    Err(e) => {
572                                        error!(
573                                            "Error replying with serie '{}' : {}",
574                                            serie.name, e
575                                        );
576                                        continue;
577                                    }
578                                };
579                                debug!("Replying {} values for {:?}", serie.values.len(), ke);
580                                // for each point
581                                for zpoint in serie.values {
582                                    // get the encoding
583
584                                    let encoding = if zpoint.encoding_suffix.is_empty() {
585                                        Encoding::new(zpoint.encoding_prefix.into(), None)
586                                    } else {
587                                        Encoding::from(zpoint.encoding_suffix)
588                                    };
589                                    // get the payload
590                                    let payload = if zpoint.base64 {
591                                        match b64_std_engine.decode(zpoint.value) {
592                                            Ok(v) => ZBuf::from(v),
593                                            Err(e) => {
594                                                warn!(
595                                                    r#"Failed to decode zenoh base64 Value from Influx point {} with timestamp="{}": {}"#,
596                                                    serie.name, zpoint.timestamp, e
597                                                );
598                                                continue;
599                                            }
600                                        }
601                                    } else {
602                                        ZBuf::from(zpoint.value.into_bytes())
603                                    };
604                                    // get the timestamp
605                                    let timestamp = match Timestamp::from_str(&zpoint.timestamp) {
606                                        Ok(t) => t,
607                                        Err(e) => {
608                                            warn!(
609                                                r#"Failed to decode zenoh Timestamp from Influx point {} with timestamp="{}": {:?}"#,
610                                                serie.name, zpoint.timestamp, e
611                                            );
612                                            continue;
613                                        }
614                                    };
615                                    result.push(StoredData {
616                                        payload: payload.into(),
617                                        encoding,
618                                        timestamp,
619                                    });
620                                }
621                            }
622                        }
623                        Err(e) => {
624                            bail!(
625                                "Failed to parse result of InfluxDB query '{}': {}",
626                                influx_query_str,
627                                e
628                            )
629                        }
630                    }
631                }
632            }
633            Err(e) => bail!(
634                "Failed to query InfluxDb with '{}' : {}",
635                influx_query_str,
636                e
637            ),
638        }
639        Ok(result)
640    }
641
642    async fn get_all_entries(&self) -> ZResult<Vec<(Option<OwnedKeyExpr>, Timestamp)>> {
643        let mut result = Vec::new();
644
645        // the Influx query: 1 entry == 1 measurement => get only 1 point per measurement (the more recent timestamp)
646        let influx_query_str = format!(
647            "SELECT * FROM {} ORDER BY time DESC LIMIT 1",
648            *INFLUX_REGEX_ALL
649        );
650        let influx_query = InfluxRQuery::new(&influx_query_str);
651
652        // the expected JSon type resulting from the query
653        #[derive(Deserialize, Debug)]
654        struct ZenohPoint {
655            #[allow(dead_code)]
656            // NOTE: "kind" is present within InfluxDB and used in query clauses, but not read in Rust...
657            kind: String,
658            timestamp: String,
659        }
660        debug!("Get all entries with Influx query: {}", influx_query_str);
661        match self.client.json_query(influx_query).await {
662            Ok(mut query_result) => {
663                while !query_result.results.is_empty() {
664                    match query_result.deserialize_next::<ZenohPoint>() {
665                        Ok(retn) => {
666                            // for each serie
667                            for serie in retn.series {
668                                // get the key expression from the serie name
669                                match self.keyexpr_from_serie(&serie.name) {
670                                    Ok(ke) => {
671                                        debug!(
672                                            "Replying {} values for {:?}",
673                                            serie.values.len(),
674                                            ke
675                                        );
676                                        // for each point in the serie
677                                        for zpoint in serie.values {
678                                            // get the timestamp (ignore the point if failing)
679                                            match Timestamp::from_str(&zpoint.timestamp) {
680                                                Ok(timestamp) => {
681                                                    result.push((ke.clone(), timestamp))
682                                                }
683                                                Err(e) => warn!(
684                                                    r#"Failed to decode zenoh Timestamp from Influx point {} with timestamp="{}": {:?}"#,
685                                                    serie.name, zpoint.timestamp, e
686                                                ),
687                                            };
688                                        }
689                                    }
690                                    Err(e) => {
691                                        error!("Error replying with serie '{}' : {}", serie.name, e)
692                                    }
693                                };
694                            }
695                        }
696                        Err(e) => {
697                            bail!(
698                                "Failed to parse result of InfluxDB query '{}': {}",
699                                influx_query_str,
700                                e
701                            )
702                        }
703                    }
704                }
705                Ok(result)
706            }
707            Err(e) => bail!(
708                "Failed to query InfluxDb with '{}' : {}",
709                influx_query_str,
710                e
711            ),
712        }
713    }
714}
715
716impl Drop for InfluxDbStorage {
717    fn drop(&mut self) {
718        debug!("Closing InfluxDB storage");
719        match self.on_closure {
720            OnClosure::DropDb => {
721                blockon_runtime(async move {
722                    let db = self.admin_client.database_name();
723                    debug!("Close InfluxDB storage, dropping database {}", db);
724                    let query = InfluxRQuery::new(format!(r#"DROP DATABASE "{db}""#));
725                    if let Err(e) = self.admin_client.query(&query).await {
726                        error!("Failed to drop InfluxDb database '{}' : {}", db, e)
727                    }
728                });
729            }
730            OnClosure::DropSeries => {
731                blockon_runtime(async move {
732                    let db = self.client.database_name();
733                    debug!(
734                        "Close InfluxDB storage, dropping all series from database {}",
735                        db
736                    );
737                    let query = InfluxRQuery::new("DROP SERIES FROM /.*/");
738                    if let Err(e) = self.client.query(&query).await {
739                        error!(
740                            "Failed to drop all series from InfluxDb database '{}' : {}",
741                            db, e
742                        )
743                    }
744                });
745            }
746            OnClosure::DoNothing => {
747                debug!(
748                    "Close InfluxDB storage, keeping database {} as it is",
749                    self.client.database_name()
750                );
751            }
752        }
753    }
754}
755
756async fn drop_measurement(measurement: String, client: Client) {
757    #[derive(Deserialize, Debug, PartialEq)]
758    struct QueryResult {
759        kind: String,
760    }
761
762    // check if there is at least 1 point without "DEL" kind in the measurement
763    let query = InfluxRQuery::new(format!(
764        r#"SELECT "kind" FROM "{}" WHERE kind!='DEL' LIMIT 1"#,
765        measurement
766    ));
767    match client.json_query(query).await {
768        Ok(mut result) => {
769            match result.deserialize_next::<QueryResult>() {
770                Ok(qr) => {
771                    if !qr.series.is_empty() {
772                        debug!("Measurement {} contains new values inserted after deletion; don't drop it", measurement);
773                        return;
774                    }
775                }
776                Err(e) => {
777                    warn!(
778                        "Failed to check if measurement '{}' is empty (can't drop it) : {}",
779                        measurement, e
780                    );
781                }
782            }
783        }
784        Err(e) => {
785            warn!(
786                "Failed to check if measurement '{}' is empty (can't drop it) : {}",
787                measurement, e
788            );
789            return;
790        }
791    }
792
793    // drop the measurement
794    let query = InfluxRQuery::new(format!(r#"DROP MEASUREMENT "{}""#, measurement));
795    debug!(
796        "Drop measurement {} after timeout with Influx query: {:?}",
797        measurement, query
798    );
799    if let Err(e) = client.query(&query).await {
800        warn!(
801            "Failed to drop measurement '{}' from InfluxDb storage : {}",
802            measurement, e
803        );
804    }
805}
806
807fn generate_db_name() -> String {
808    format!("zenoh_db_{}", Uuid::new_v4().simple())
809}
810
811async fn show_databases(client: &Client) -> ZResult<Vec<String>> {
812    #[derive(Deserialize)]
813    struct Database {
814        name: String,
815    }
816    let query = InfluxRQuery::new("SHOW DATABASES");
817    debug!("List databases with Influx query: {:?}", query);
818    match client.json_query(query).await {
819        Ok(mut result) => match result.deserialize_next::<Database>() {
820            Ok(dbs) => {
821                let mut result: Vec<String> = Vec::new();
822                for serie in dbs.series {
823                    for db in serie.values {
824                        result.push(db.name);
825                    }
826                }
827                Ok(result)
828            }
829            Err(e) => bail!(
830                "Failed to parse list of existing InfluxDb databases : {}",
831                e
832            ),
833        },
834        Err(e) => bail!("Failed to list existing InfluxDb databases : {}", e),
835    }
836}
837
838async fn is_db_existing(client: &Client, db_name: &str) -> ZResult<bool> {
839    let dbs = show_databases(client).await?;
840    Ok(dbs.iter().any(|e| e == db_name))
841}
842
843async fn create_db(
844    client: &Client,
845    db_name: &str,
846    storage_username: Option<String>,
847) -> ZResult<()> {
848    let query = InfluxRQuery::new(format!(r#"CREATE DATABASE "{db_name}""#));
849    debug!("Create Influx database: {}", db_name);
850    if let Err(e) = client.query(&query).await {
851        bail!(
852            "Failed to create new InfluxDb database '{}' : {}",
853            db_name,
854            e
855        )
856    }
857    debug!("after await: {}", db_name);
858    // is a username is specified for storage access, grant him access to the database
859    if let Some(username) = storage_username {
860        let query = InfluxRQuery::new(format!(r#"GRANT ALL ON "{db_name}" TO "{username}""#));
861        debug!(
862            "Grant access to {} on Influx database: {}",
863            username, db_name
864        );
865        if let Err(e) = client.query(&query).await {
866            bail!(
867                "Failed grant access to {} on Influx database '{}' : {}",
868                username,
869                db_name,
870                e
871            )
872        }
873    }
874    Ok(())
875}
876
877// Returns an InfluxDB regex (see https://docs.influxdata.com/influxdb/v1.8/query_language/explore-data/#regular-expressions)
878// corresponding to the list of path expressions. I.e.:
879// Replace "**" with ".*", "*" with "[^\/]*"  and "/" with "\/".
880// Concat each with "|", and surround the result with '/^' and '$/'.
881fn key_exprs_to_influx_regex(path_exprs: &[&keyexpr]) -> String {
882    let mut result = String::with_capacity(2 * path_exprs[0].len());
883    result.push_str("/^");
884    for (i, path_expr) in path_exprs.iter().enumerate() {
885        if i != 0 {
886            result.push('|');
887        }
888        let mut chars = path_expr.chars().peekable();
889        while let Some(c) = chars.next() {
890            match c {
891                '*' => {
892                    if let Some(c2) = chars.peek() {
893                        if c2 == &'*' {
894                            result.push_str(".*");
895                            chars.next();
896                        } else {
897                            result.push_str(".*")
898                        }
899                    }
900                }
901                '/' => result.push_str(r"\/"),
902                _ => result.push(c),
903            }
904        }
905    }
906    result.push_str("$/");
907    result
908}
909
910fn clauses_from_parameters(p: &str) -> ZResult<String> {
911    let parameters = Parameters::from(p);
912    let mut result = String::with_capacity(256);
913    result.push_str("WHERE kind!='DEL'");
914
915    let time_range = match parameters.time_range() {
916        Some(time_range) => time_range,
917        None => {
918            result.push_str(" ORDER BY time DESC LIMIT 1");
919            return Ok(result);
920        }
921    };
922    match time_range {
923        Ok(TimeRange { start, end }) => {
924            match start {
925                TimeBound::Inclusive(t) => {
926                    result.push_str(" AND time >= ");
927                    write_timeexpr(&mut result, t);
928                }
929                TimeBound::Exclusive(t) => {
930                    result.push_str(" AND time > ");
931                    write_timeexpr(&mut result, t);
932                }
933                TimeBound::Unbounded => {}
934            }
935            match end {
936                TimeBound::Inclusive(t) => {
937                    result.push_str(" AND time <= ");
938                    write_timeexpr(&mut result, t);
939                }
940                TimeBound::Exclusive(t) => {
941                    result.push_str(" AND time < ");
942                    write_timeexpr(&mut result, t);
943                }
944                TimeBound::Unbounded => {}
945            }
946        }
947        Err(err) => {
948            warn!("Error In TimeRange parse from String {}", err);
949            //No time selection, return only latest values
950            result.push_str(" ORDER BY time DESC LIMIT 1");
951        }
952    }
953    Ok(result)
954}
955
956fn write_timeexpr(s: &mut String, t: TimeExpr) {
957    use std::fmt::Write;
958
959    use humantime::format_rfc3339;
960    match t {
961        TimeExpr::Fixed(t) => write!(s, "'{}'", format_rfc3339(t)),
962        TimeExpr::Now { offset_secs } => write!(s, "now(){offset_secs:+}s"),
963    }
964    .unwrap()
965}