zenoh_plugin_webserver/
lib.rs1use std::{
15 future::Future,
16 str::FromStr,
17 sync::{
18 atomic::{AtomicUsize, Ordering},
19 Arc,
20 },
21};
22
23use futures::stream::TryStreamExt;
24use tide::{http::Mime, Request, Response, Server, StatusCode};
25use tokio::task::JoinHandle;
26use tracing::debug;
27use zenoh::{
28 bytes::{Encoding, ZBytes},
29 internal::{
30 bail,
31 plugins::{RunningPlugin, RunningPluginTrait, ZenohPlugin},
32 runtime::Runtime,
33 zerror,
34 },
35 query::Selector,
36 sample::Sample,
37 Result as ZResult, Session,
38};
39use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};
40
41mod config;
42use config::Config;
43
44lazy_static::lazy_static! {
45 static ref WORK_THREAD_NUM: AtomicUsize = AtomicUsize::new(config::DEFAULT_WORK_THREAD_NUM);
46 static ref MAX_BLOCK_THREAD_NUM: AtomicUsize = AtomicUsize::new(config::DEFAULT_MAX_BLOCK_THREAD_NUM);
47 static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
49 .worker_threads(WORK_THREAD_NUM.load(Ordering::SeqCst))
50 .max_blocking_threads(MAX_BLOCK_THREAD_NUM.load(Ordering::SeqCst))
51 .enable_all()
52 .build()
53 .expect("Unable to create runtime");
54}
55#[inline(always)]
56pub(crate) fn spawn_runtime<F>(task: F) -> JoinHandle<F::Output>
57where
58 F: Future + Send + 'static,
59 F::Output: Send + 'static,
60{
61 match tokio::runtime::Handle::try_current() {
63 Ok(rt) => {
64 rt.spawn(task)
66 }
67 Err(_) => {
68 TOKIO_RUNTIME.spawn(task)
70 }
71 }
72}
73
74const DEFAULT_DIRECTORY_INDEX: &str = "index.html";
75
76lazy_static::lazy_static! {
77 static ref DEFAULT_MIME: Mime = Mime::from_str(&Encoding::APPLICATION_OCTET_STREAM.to_string()).unwrap();
78}
79pub struct WebServerPlugin;
80impl PluginControl for WebServerPlugin {}
81impl ZenohPlugin for WebServerPlugin {}
82
83impl Plugin for WebServerPlugin {
84 type StartArgs = Runtime;
85 type Instance = RunningPlugin;
86
87 const DEFAULT_NAME: &'static str = "zenoh-plugin-webserver";
88 const PLUGIN_VERSION: &'static str = plugin_version!();
89 const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!();
90
91 fn start(name: &str, runtime: &Self::StartArgs) -> ZResult<RunningPlugin> {
92 zenoh::try_init_log_from_env();
93 let runtime_conf = runtime.config().lock();
94 let plugin_conf = runtime_conf
95 .plugin(name)
96 .ok_or_else(|| zerror!("Plugin `{}`: missing config", name))?;
97 let conf: Config = serde_json::from_value(plugin_conf.clone())
98 .map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
99 WORK_THREAD_NUM.store(conf.work_thread_num, Ordering::SeqCst);
100 MAX_BLOCK_THREAD_NUM.store(conf.max_block_thread_num, Ordering::SeqCst);
101
102 spawn_runtime(run(runtime.clone(), conf));
103
104 Ok(Box::new(WebServerPlugin))
105 }
106}
107
108impl RunningPluginTrait for WebServerPlugin {}
109
// Only compiled when the `dynamic_plugin` feature is enabled.
// NOTE(review): presumably exports the entry points needed to load the plugin
// as a dynamic library — confirm against zenoh_plugin_trait.
#[cfg(feature = "dynamic_plugin")]
zenoh_plugin_trait::declare_plugin!(WebServerPlugin);
112
113async fn run(runtime: Runtime, conf: Config) {
114 debug!("WebServer plugin {}", WebServerPlugin::PLUGIN_LONG_VERSION);
115
116 let zenoh = match zenoh::session::init(runtime).await {
117 Ok(session) => Arc::new(session),
118 Err(e) => {
119 tracing::error!("Unable to init zenoh session for WebServer plugin : {}", e);
120 return;
121 }
122 };
123
124 let mut app = Server::with_state(zenoh);
125
126 app.at("").get(handle_request);
127 app.at("*").get(handle_request);
128
129 if let Err(e) = app.listen(conf.http_port).await {
130 tracing::error!("Unable to start http server for WebServer plugin : {}", e);
131 }
132}
133
134async fn handle_request(req: Request<Arc<Session>>) -> tide::Result<Response> {
135 let session = req.state();
136
137 let url = req.url();
139 tracing::debug!("GET on {}", url);
140
141 let path = url.path();
143 let mut selector = String::with_capacity(url.as_str().len());
144 selector.push_str(path.strip_prefix('/').unwrap_or(path));
145
146 if selector.ends_with('/') || selector.is_empty() {
148 selector.push_str(DEFAULT_DIRECTORY_INDEX);
149 }
150 if let Some(q) = url.query() {
151 selector.push('?');
152 selector.push_str(q);
153 }
154
155 if selector.contains('*') {
157 return Ok(bad_request(
158 "The URL must correspond to 1 resource only (i.e. zenoh key expressions not supported)",
159 ));
160 }
161 match Selector::try_from(selector) {
162 Ok(selector) => {
163 if selector.parameters().get("_method") == Some("SUB") {
164 tracing::debug!("Subscribe to {} for Multipart stream", selector.key_expr());
165 let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
166 let c_selector = selector.key_expr().clone().into_owned();
167 spawn_runtime(async move {
168 tracing::debug!("Subscribe to {} for Multipart stream", c_selector);
169 let sub = req.state().declare_subscriber(c_selector).await.unwrap();
170 loop {
171 let sample = sub.recv_async().await.unwrap();
172 let mut buf = "--boundary\nContent-Type: ".as_bytes().to_vec();
173 buf.extend_from_slice(sample.encoding().to_string().as_bytes());
174 buf.extend_from_slice("\n\n".as_bytes());
175 buf.extend_from_slice(sample.payload().to_bytes().as_ref());
176
177 match tokio::time::timeout(
178 std::time::Duration::new(10, 0),
179 sender.send(Ok(buf)),
180 )
181 .await
182 {
183 Ok(Ok(_)) => {}
184 Ok(Err(e)) => {
185 tracing::debug!(
186 "Multipart error ({})! Unsubscribe and terminate",
187 e
188 );
189 if let Err(e) = sub.undeclare().await {
190 tracing::error!("Error undeclaring subscriber: {}", e);
191 }
192 break;
193 }
194 Err(_) => {
195 tracing::debug!("Multipart timeout! Unsubscribe and terminate",);
196 if let Err(e) = sub.undeclare().await {
197 tracing::error!("Error undeclaring subscriber: {}", e);
198 }
199 break;
200 }
201 }
202 }
203 });
204
205 let receiver = async_stream::stream! {
206 while let Some(item) = receiver.recv().await {
207 yield item;
208 }
209 };
210 let mut res = Response::new(StatusCode::Ok);
211 res.set_content_type("multipart/x-mixed-replace; boundary=\"boundary\"");
212 res.set_body(tide::Body::from_reader(
213 Box::pin(receiver.into_async_read()),
214 None,
215 ));
216
217 Ok(res)
218 } else {
219 match zenoh_get(session, &selector).await {
220 Ok(Some(sample)) => Ok(response_with_value(sample)),
221 Ok(None) => {
222 let mut new_selector = selector.key_expr().as_str().to_string();
224 new_selector.push('/');
225 new_selector.push_str(DEFAULT_DIRECTORY_INDEX);
226 if let Ok(new_selector) = Selector::try_from(new_selector) {
227 if let Ok(Some(_)) = zenoh_get(session, &new_selector).await {
228 Ok(redirect(&format!("{}/", url.path())))
230 } else {
231 Ok(not_found())
232 }
233 } else {
234 Ok(not_found())
235 }
236 }
237 Err(e) => Ok(internal_error(&e.to_string())),
238 }
239 }
240 }
241 Err(e) => Err(tide::Error::new(
242 tide::StatusCode::BadRequest,
243 anyhow::anyhow!("{}", e),
244 )),
245 }
246}
247
248async fn zenoh_get(session: &Session, selector: &Selector<'_>) -> ZResult<Option<Sample>> {
249 let replies = session.get(selector).await?;
250 match replies.recv_async().await {
251 Ok(reply) => match reply.result() {
252 Ok(sample) => Ok(Some(sample.to_owned())),
253 Err(err) => bail!("Zenoh get on {} returned the error: {:?}", selector, err),
254 },
255 Err(_) => Ok(None),
256 }
257}
258
259fn response_with_value(sample: Sample) -> Response {
260 let mime =
261 Mime::from_str(&sample.encoding().to_string()).unwrap_or_else(|_| DEFAULT_MIME.clone());
262 response_ok(mime, sample.payload())
263}
264
265fn bad_request(body: &str) -> Response {
266 let mut res = Response::new(StatusCode::BadRequest);
267 res.set_content_type(Mime::from_str("text/plain").unwrap());
268 res.set_body(body);
269 res
270}
271
272fn not_found() -> Response {
273 Response::new(StatusCode::NotFound)
274}
275
276fn internal_error(body: &str) -> Response {
277 let mut res = Response::new(StatusCode::InternalServerError);
278 res.set_content_type(Mime::from_str("text/plain").unwrap());
279 res.set_body(body);
280 res
281}
282
283fn redirect(url: &str) -> Response {
284 let mut res = Response::new(StatusCode::MovedPermanently);
285 res.insert_header("Location", url);
286 res
287}
288
289fn response_ok(content_type: Mime, payload: &ZBytes) -> Response {
290 let mut res = Response::new(StatusCode::Ok);
291 res.set_content_type(content_type);
292 res.set_body(payload.to_bytes().as_ref());
293 res
294}