zenoh_plugin_webserver/
lib.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::{
15    future::Future,
16    str::FromStr,
17    sync::{
18        atomic::{AtomicUsize, Ordering},
19        Arc,
20    },
21};
22
23use futures::stream::TryStreamExt;
24use tide::{http::Mime, Request, Response, Server, StatusCode};
25use tokio::task::JoinHandle;
26use tracing::debug;
27use zenoh::{
28    bytes::{Encoding, ZBytes},
29    internal::{
30        bail,
31        plugins::{RunningPlugin, RunningPluginTrait, ZenohPlugin},
32        runtime::Runtime,
33        zerror,
34    },
35    query::Selector,
36    sample::Sample,
37    Result as ZResult, Session,
38};
39use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};
40
41mod config;
42use config::Config;
43
44lazy_static::lazy_static! {
45    static ref WORK_THREAD_NUM: AtomicUsize = AtomicUsize::new(config::DEFAULT_WORK_THREAD_NUM);
46    static ref MAX_BLOCK_THREAD_NUM: AtomicUsize = AtomicUsize::new(config::DEFAULT_MAX_BLOCK_THREAD_NUM);
47    // The global runtime is used in the dynamic plugins, which we can't get the current runtime
48    static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
49               .worker_threads(WORK_THREAD_NUM.load(Ordering::SeqCst))
50               .max_blocking_threads(MAX_BLOCK_THREAD_NUM.load(Ordering::SeqCst))
51               .enable_all()
52               .build()
53               .expect("Unable to create runtime");
54}
55#[inline(always)]
56pub(crate) fn spawn_runtime<F>(task: F) -> JoinHandle<F::Output>
57where
58    F: Future + Send + 'static,
59    F::Output: Send + 'static,
60{
61    // Check whether able to get the current runtime
62    match tokio::runtime::Handle::try_current() {
63        Ok(rt) => {
64            // Able to get the current runtime (standalone binary), spawn on the current runtime
65            rt.spawn(task)
66        }
67        Err(_) => {
68            // Unable to get the current runtime (dynamic plugins), spawn on the global runtime
69            TOKIO_RUNTIME.spawn(task)
70        }
71    }
72}
73
74const DEFAULT_DIRECTORY_INDEX: &str = "index.html";
75
76lazy_static::lazy_static! {
77    static ref DEFAULT_MIME: Mime = Mime::from_str(&Encoding::APPLICATION_OCTET_STREAM.to_string()).unwrap();
78}
79pub struct WebServerPlugin;
80impl PluginControl for WebServerPlugin {}
81impl ZenohPlugin for WebServerPlugin {}
82
83impl Plugin for WebServerPlugin {
84    type StartArgs = Runtime;
85    type Instance = RunningPlugin;
86
87    const DEFAULT_NAME: &'static str = "zenoh-plugin-webserver";
88    const PLUGIN_VERSION: &'static str = plugin_version!();
89    const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!();
90
91    fn start(name: &str, runtime: &Self::StartArgs) -> ZResult<RunningPlugin> {
92        zenoh::try_init_log_from_env();
93        let runtime_conf = runtime.config().lock();
94        let plugin_conf = runtime_conf
95            .plugin(name)
96            .ok_or_else(|| zerror!("Plugin `{}`: missing config", name))?;
97        let conf: Config = serde_json::from_value(plugin_conf.clone())
98            .map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
99        WORK_THREAD_NUM.store(conf.work_thread_num, Ordering::SeqCst);
100        MAX_BLOCK_THREAD_NUM.store(conf.max_block_thread_num, Ordering::SeqCst);
101
102        spawn_runtime(run(runtime.clone(), conf));
103
104        Ok(Box::new(WebServerPlugin))
105    }
106}
107
108impl RunningPluginTrait for WebServerPlugin {}
109
110#[cfg(feature = "dynamic_plugin")]
111zenoh_plugin_trait::declare_plugin!(WebServerPlugin);
112
113async fn run(runtime: Runtime, conf: Config) {
114    debug!("WebServer plugin {}", WebServerPlugin::PLUGIN_LONG_VERSION);
115
116    let zenoh = match zenoh::session::init(runtime).await {
117        Ok(session) => Arc::new(session),
118        Err(e) => {
119            tracing::error!("Unable to init zenoh session for WebServer plugin : {}", e);
120            return;
121        }
122    };
123
124    let mut app = Server::with_state(zenoh);
125
126    app.at("").get(handle_request);
127    app.at("*").get(handle_request);
128
129    if let Err(e) = app.listen(conf.http_port).await {
130        tracing::error!("Unable to start http server for WebServer plugin : {}", e);
131    }
132}
133
134async fn handle_request(req: Request<Arc<Session>>) -> tide::Result<Response> {
135    let session = req.state();
136
137    // Reconstruct Selector from req.url() (no easier way...)
138    let url = req.url();
139    tracing::debug!("GET on {}", url);
140
141    // Build corresponding Selector
142    let path = url.path();
143    let mut selector = String::with_capacity(url.as_str().len());
144    selector.push_str(path.strip_prefix('/').unwrap_or(path));
145
146    // if URL id a directory, append DirectoryIndex
147    if selector.ends_with('/') || selector.is_empty() {
148        selector.push_str(DEFAULT_DIRECTORY_INDEX);
149    }
150    if let Some(q) = url.query() {
151        selector.push('?');
152        selector.push_str(q);
153    }
154
155    // Check if selector's key expression is a single key (i.e. for a single resource)
156    if selector.contains('*') {
157        return Ok(bad_request(
158            "The URL must correspond to 1 resource only (i.e. zenoh key expressions not supported)",
159        ));
160    }
161    match Selector::try_from(selector) {
162        Ok(selector) => {
163            if selector.parameters().get("_method") == Some("SUB") {
164                tracing::debug!("Subscribe to {} for Multipart stream", selector.key_expr());
165                let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
166                let c_selector = selector.key_expr().clone().into_owned();
167                spawn_runtime(async move {
168                    tracing::debug!("Subscribe to {} for Multipart stream", c_selector);
169                    let sub = req.state().declare_subscriber(c_selector).await.unwrap();
170                    loop {
171                        let sample = sub.recv_async().await.unwrap();
172                        let mut buf = "--boundary\nContent-Type: ".as_bytes().to_vec();
173                        buf.extend_from_slice(sample.encoding().to_string().as_bytes());
174                        buf.extend_from_slice("\n\n".as_bytes());
175                        buf.extend_from_slice(sample.payload().to_bytes().as_ref());
176
177                        match tokio::time::timeout(
178                            std::time::Duration::new(10, 0),
179                            sender.send(Ok(buf)),
180                        )
181                        .await
182                        {
183                            Ok(Ok(_)) => {}
184                            Ok(Err(e)) => {
185                                tracing::debug!(
186                                    "Multipart error ({})! Unsubscribe and terminate",
187                                    e
188                                );
189                                if let Err(e) = sub.undeclare().await {
190                                    tracing::error!("Error undeclaring subscriber: {}", e);
191                                }
192                                break;
193                            }
194                            Err(_) => {
195                                tracing::debug!("Multipart timeout! Unsubscribe and terminate",);
196                                if let Err(e) = sub.undeclare().await {
197                                    tracing::error!("Error undeclaring subscriber: {}", e);
198                                }
199                                break;
200                            }
201                        }
202                    }
203                });
204
205                let receiver = async_stream::stream! {
206                      while let Some(item) = receiver.recv().await {
207                          yield item;
208                      }
209                };
210                let mut res = Response::new(StatusCode::Ok);
211                res.set_content_type("multipart/x-mixed-replace; boundary=\"boundary\"");
212                res.set_body(tide::Body::from_reader(
213                    Box::pin(receiver.into_async_read()),
214                    None,
215                ));
216
217                Ok(res)
218            } else {
219                match zenoh_get(session, &selector).await {
220                    Ok(Some(sample)) => Ok(response_with_value(sample)),
221                    Ok(None) => {
222                        // Check if considering the URL as a directory, there is an existing "URL/DirectoryIndex" resource
223                        let mut new_selector = selector.key_expr().as_str().to_string();
224                        new_selector.push('/');
225                        new_selector.push_str(DEFAULT_DIRECTORY_INDEX);
226                        if let Ok(new_selector) = Selector::try_from(new_selector) {
227                            if let Ok(Some(_)) = zenoh_get(session, &new_selector).await {
228                                // In this case, we must reply a redirection to the URL as a directory
229                                Ok(redirect(&format!("{}/", url.path())))
230                            } else {
231                                Ok(not_found())
232                            }
233                        } else {
234                            Ok(not_found())
235                        }
236                    }
237                    Err(e) => Ok(internal_error(&e.to_string())),
238                }
239            }
240        }
241        Err(e) => Err(tide::Error::new(
242            tide::StatusCode::BadRequest,
243            anyhow::anyhow!("{}", e),
244        )),
245    }
246}
247
248async fn zenoh_get(session: &Session, selector: &Selector<'_>) -> ZResult<Option<Sample>> {
249    let replies = session.get(selector).await?;
250    match replies.recv_async().await {
251        Ok(reply) => match reply.result() {
252            Ok(sample) => Ok(Some(sample.to_owned())),
253            Err(err) => bail!("Zenoh get on {} returned the error: {:?}", selector, err),
254        },
255        Err(_) => Ok(None),
256    }
257}
258
259fn response_with_value(sample: Sample) -> Response {
260    let mime =
261        Mime::from_str(&sample.encoding().to_string()).unwrap_or_else(|_| DEFAULT_MIME.clone());
262    response_ok(mime, sample.payload())
263}
264
265fn bad_request(body: &str) -> Response {
266    let mut res = Response::new(StatusCode::BadRequest);
267    res.set_content_type(Mime::from_str("text/plain").unwrap());
268    res.set_body(body);
269    res
270}
271
272fn not_found() -> Response {
273    Response::new(StatusCode::NotFound)
274}
275
276fn internal_error(body: &str) -> Response {
277    let mut res = Response::new(StatusCode::InternalServerError);
278    res.set_content_type(Mime::from_str("text/plain").unwrap());
279    res.set_body(body);
280    res
281}
282
283fn redirect(url: &str) -> Response {
284    let mut res = Response::new(StatusCode::MovedPermanently);
285    res.insert_header("Location", url);
286    res
287}
288
289fn response_ok(content_type: Mime, payload: &ZBytes) -> Response {
290    let mut res = Response::new(StatusCode::Ok);
291    res.set_content_type(content_type);
292    res.set_body(payload.to_bytes().as_ref());
293    res
294}