zenoh_plugin_zenoh_flow/
lib.rs

1//
2// Copyright (c) 2021 - 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14
15use git_version::git_version;
16use std::env;
17use std::sync::Arc;
18use zenoh::plugins::{Plugin, RunningPluginTrait, Runtime, ZenohPlugin};
19use zenoh::prelude::r#async::*;
20use zenoh::Result as ZResult;
21use zenoh_core::{bail, zerror};
22
23use zenoh_flow_daemon::daemon::Daemon;
24use zenoh_flow_daemon::daemon::DaemonConfig;
25
26use flume::{Receiver, Sender};
27
28extern crate zenoh_core;
29
30pub const GIT_VERSION: &str = git_version!(prefix = "v", cargo_prefix = "v");
31lazy_static::lazy_static! {
32    pub static ref LONG_VERSION: String = format!("{} built with {}", GIT_VERSION, env!("RUSTC_VERSION"));
33}
34
35zenoh_plugin_trait::declare_plugin!(ZenohFlowPlugin);
36
37macro_rules! ke_for_sure {
38    ($val:expr) => {
39        unsafe { keyexpr::from_str_unchecked($val) }
40    };
41}
42
43pub struct ZenohFlowPlugin(Sender<()>);
44
45impl ZenohPlugin for ZenohFlowPlugin {}
46impl Plugin for ZenohFlowPlugin {
47    type StartArgs = Runtime;
48    type RunningPlugin = zenoh::plugins::RunningPlugin;
49
50    const STATIC_NAME: &'static str = "zenoh_flow";
51
52    fn start(name: &str, runtime: &Self::StartArgs) -> ZResult<Self::RunningPlugin> {
53        // Try to initiate login.
54        // Required in case of dynamic lib, otherwise no logs.
55        // But cannot be done twice in case of static link.
56        let _ = env_logger::try_init();
57
58        let runtime_conf = runtime.config.lock();
59        let plugin_conf = runtime_conf
60            .plugin(name)
61            .ok_or_else(|| zerror!("Plugin `{}`: missing config", name))?;
62
63        let config: DaemonConfig = serde_json::from_value(plugin_conf.clone())
64            .map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
65
66        let (tx, rx) = flume::bounded(1);
67
68        async_std::task::spawn(run(runtime.clone(), config, rx));
69
70        Ok(Box::new(ZenohFlowPlugin(tx)))
71    }
72}
73
74impl RunningPluginTrait for ZenohFlowPlugin {
75    fn config_checker(&self) -> zenoh::plugins::ValidationFunction {
76        Arc::new(|_, _, _| bail!("zenoh-flow-plugin does not support hot configuration changes."))
77    }
78
79    fn adminspace_getter<'a>(
80        &'a self,
81        selector: &'a Selector<'a>,
82        plugin_status_key: &str,
83    ) -> ZResult<Vec<zenoh::plugins::Response>> {
84        let mut responses = Vec::new();
85        let version_key = [plugin_status_key, "/__version__"].concat();
86        if selector.key_expr.intersects(ke_for_sure!(&version_key)) {
87            responses.push(zenoh::plugins::Response::new(
88                version_key,
89                GIT_VERSION.into(),
90            ));
91        }
92        Ok(responses)
93    }
94}
95
96async fn run(runtime: Runtime, config: DaemonConfig, rx: Receiver<()>) -> ZResult<()> {
97    // Try to initiate login.
98    // Required in case of dynamic lib, otherwise no logs.
99    // But cannot be done twice in case of static link.
100    let _ = env_logger::try_init();
101
102    //FIXME: sleeping here in order to wait for subscriptions to be propagated.
103    // Because we are starting the plugin, that then does a PUT in Zenoh.
104    // Thus, if the subscription matching the key is not yet propagated,
105    // the information is lost, making impossible to discover the runtime.
106    async_std::task::sleep(std::time::Duration::from_secs(5)).await;
107
108    log::debug!("Zenoh-Flow plugin {}", LONG_VERSION.as_str());
109    log::debug!("Zenoh-Flow plugin {:?}", config);
110
111    let zsession = Arc::new(zenoh::init(runtime).res().await?);
112
113    let daemon = Daemon::from_session_and_config(zsession, config)
114        .map_err(|e| zerror!("Plugin `zenoh-flow-plugin` configuration error: {}", e))?;
115
116    let (s, h) = daemon
117        .start()
118        .await
119        .map_err(|e| zerror!("Plugin `zenoh-flow-plugin` unable to start runtime {}", e))?;
120
121    if (rx.recv_async().await).is_ok() {
122        log::debug!("Zenoh-Flow plugin, stopping.");
123    } else {
124        log::warn!("Zenoh-Flow plugin, stopping, channel sender was dropped!");
125    }
126
127    daemon.stop(s).await.map_err(|e| {
128        zerror!(
129            "Plugin `zenoh-flow-plugin` error when stopping the runtime {}",
130            e
131        )
132    })?;
133    //wait for the futures to ends
134    h.await.map_err(|e| {
135        zerror!(
136            "Plugin `zenoh-flow-plugin` error while runtime shutdown {}",
137            e
138        )
139    })?;
140
141    Ok(())
142}
143
144impl Drop for ZenohFlowPlugin {
145    fn drop(&mut self) {
146        let res = self.0.send(());
147        log::debug!("Zenoh Flow plugin, shuting down: {:?}", res);
148    }
149}