zenoh_plugin_zenoh_flow/
lib.rs1use git_version::git_version;
16use std::env;
17use std::sync::Arc;
18use zenoh::plugins::{Plugin, RunningPluginTrait, Runtime, ZenohPlugin};
19use zenoh::prelude::r#async::*;
20use zenoh::Result as ZResult;
21use zenoh_core::{bail, zerror};
22
23use zenoh_flow_daemon::daemon::Daemon;
24use zenoh_flow_daemon::daemon::DaemonConfig;
25
26use flume::{Receiver, Sender};
27
28extern crate zenoh_core;
29
30pub const GIT_VERSION: &str = git_version!(prefix = "v", cargo_prefix = "v");
31lazy_static::lazy_static! {
32 pub static ref LONG_VERSION: String = format!("{} built with {}", GIT_VERSION, env!("RUSTC_VERSION"));
33}
34
35zenoh_plugin_trait::declare_plugin!(ZenohFlowPlugin);
36
/// Turns a string slice into a key expression **without validating it**.
///
/// Expands to an `unsafe` call to `keyexpr::from_str_unchecked`, so every
/// call site is responsible for passing a string that is already a valid
/// key expression.
macro_rules! ke_for_sure {
    ($key:expr) => {
        // SAFETY: nothing is checked here; the caller of this macro must
        // guarantee that `$key` is a well-formed key expression.
        unsafe { keyexpr::from_str_unchecked($key) }
    };
}
42
43pub struct ZenohFlowPlugin(Sender<()>);
44
45impl ZenohPlugin for ZenohFlowPlugin {}
46impl Plugin for ZenohFlowPlugin {
47 type StartArgs = Runtime;
48 type RunningPlugin = zenoh::plugins::RunningPlugin;
49
50 const STATIC_NAME: &'static str = "zenoh_flow";
51
52 fn start(name: &str, runtime: &Self::StartArgs) -> ZResult<Self::RunningPlugin> {
53 let _ = env_logger::try_init();
57
58 let runtime_conf = runtime.config.lock();
59 let plugin_conf = runtime_conf
60 .plugin(name)
61 .ok_or_else(|| zerror!("Plugin `{}`: missing config", name))?;
62
63 let config: DaemonConfig = serde_json::from_value(plugin_conf.clone())
64 .map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
65
66 let (tx, rx) = flume::bounded(1);
67
68 async_std::task::spawn(run(runtime.clone(), config, rx));
69
70 Ok(Box::new(ZenohFlowPlugin(tx)))
71 }
72}
73
74impl RunningPluginTrait for ZenohFlowPlugin {
75 fn config_checker(&self) -> zenoh::plugins::ValidationFunction {
76 Arc::new(|_, _, _| bail!("zenoh-flow-plugin does not support hot configuration changes."))
77 }
78
79 fn adminspace_getter<'a>(
80 &'a self,
81 selector: &'a Selector<'a>,
82 plugin_status_key: &str,
83 ) -> ZResult<Vec<zenoh::plugins::Response>> {
84 let mut responses = Vec::new();
85 let version_key = [plugin_status_key, "/__version__"].concat();
86 if selector.key_expr.intersects(ke_for_sure!(&version_key)) {
87 responses.push(zenoh::plugins::Response::new(
88 version_key,
89 GIT_VERSION.into(),
90 ));
91 }
92 Ok(responses)
93 }
94}
95
96async fn run(runtime: Runtime, config: DaemonConfig, rx: Receiver<()>) -> ZResult<()> {
97 let _ = env_logger::try_init();
101
102 async_std::task::sleep(std::time::Duration::from_secs(5)).await;
107
108 log::debug!("Zenoh-Flow plugin {}", LONG_VERSION.as_str());
109 log::debug!("Zenoh-Flow plugin {:?}", config);
110
111 let zsession = Arc::new(zenoh::init(runtime).res().await?);
112
113 let daemon = Daemon::from_session_and_config(zsession, config)
114 .map_err(|e| zerror!("Plugin `zenoh-flow-plugin` configuration error: {}", e))?;
115
116 let (s, h) = daemon
117 .start()
118 .await
119 .map_err(|e| zerror!("Plugin `zenoh-flow-plugin` unable to start runtime {}", e))?;
120
121 if (rx.recv_async().await).is_ok() {
122 log::debug!("Zenoh-Flow plugin, stopping.");
123 } else {
124 log::warn!("Zenoh-Flow plugin, stopping, channel sender was dropped!");
125 }
126
127 daemon.stop(s).await.map_err(|e| {
128 zerror!(
129 "Plugin `zenoh-flow-plugin` error when stopping the runtime {}",
130 e
131 )
132 })?;
133 h.await.map_err(|e| {
135 zerror!(
136 "Plugin `zenoh-flow-plugin` error while runtime shutdown {}",
137 e
138 )
139 })?;
140
141 Ok(())
142}
143
144impl Drop for ZenohFlowPlugin {
145 fn drop(&mut self) {
146 let res = self.0.send(());
147 log::debug!("Zenoh Flow plugin, shuting down: {:?}", res);
148 }
149}