zenoh_plugin_remote_api/
lib.rs

1//
2// Copyright (c) 2024 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14
15//! ⚠️ WARNING ⚠️
16//!
17//! This crate is intended for Zenoh's internal use.
18//!
19//! [Click here for Zenoh's documentation](../zenoh/index.html)
20
21use std::{
22    collections::HashMap,
23    fs::File,
24    future::Future,
25    io::{self, BufReader, ErrorKind},
26    net::SocketAddr,
27    path::Path,
28    sync::Arc,
29};
30
31use flume::Sender;
32use futures::{future, pin_mut, StreamExt, TryStreamExt};
33use interface::RemoteAPIMsg;
34use rustls_pemfile::{certs, private_key};
35use serde::Serialize;
36use tokio::{
37    net::{TcpListener, TcpStream},
38    select,
39    sync::RwLock,
40    task::JoinHandle,
41};
42use tokio_rustls::{
43    rustls::{
44        self,
45        pki_types::{CertificateDer, PrivateKeyDer},
46    },
47    server::TlsStream,
48    TlsAcceptor,
49};
50use tokio_tungstenite::tungstenite::protocol::Message;
51use tracing::{debug, error};
52use uhlc::Timestamp;
53use uuid::Uuid;
54use zenoh::{
55    bytes::{Encoding, ZBytes},
56    internal::{
57        plugins::{RunningPluginTrait, ZenohPlugin},
58        runtime::Runtime,
59    },
60    key_expr::{
61        format::{kedefine, keformat},
62        keyexpr, OwnedKeyExpr,
63    },
64    liveliness::LivelinessToken,
65    pubsub::Publisher,
66    query::{Querier, Query},
67    Session,
68};
69use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};
70use zenoh_result::{bail, zerror, ZResult};
71
72mod config;
73pub use config::Config;
74
75mod handle_control_message;
76mod handle_data_message;
77mod interface;
78use crate::{
79    handle_control_message::handle_control_message, handle_data_message::handle_data_message,
80};
81
82kedefine!(
83    // Admin space key expressions of plugin's version
84    pub ke_admin_version: "${plugin_status_key:**}/__version__",
85
86    // Admin prefix of this bridge
87    pub ke_admin_prefix: "@/${zenoh_id:*}/remote-plugin/",
88);
89
90const WORKER_THREAD_NUM: usize = 2;
91const MAX_BLOCK_THREAD_NUM: usize = 50;
92const GIT_VERSION: &str = git_version::git_version!(prefix = "v", cargo_prefix = "v");
93
94lazy_static::lazy_static! {
95    static ref LONG_VERSION: String = format!("{} built with {}", GIT_VERSION, env!("RUSTC_VERSION"));
96    // The global runtime is used in the dynamic plugins, which we can't get the current runtime
97    static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
98        .worker_threads(WORKER_THREAD_NUM)
99        .max_blocking_threads(MAX_BLOCK_THREAD_NUM)
100        .enable_all()
101        .build()
102        .expect("Unable to create runtime");
103    static ref KE_ANY_N_SEGMENT: &'static keyexpr =  unsafe { keyexpr::from_str_unchecked("**") };
104}
105
106// An reference used in admin space to point to a struct (DdsEntity or Route) stored in another map
107#[derive(Debug)]
108enum AdminRef {
109    Config,
110    Version,
111}
112
113#[inline(always)]
114pub(crate) fn spawn_runtime<F>(task: F) -> JoinHandle<F::Output>
115where
116    F: Future + Send + 'static,
117    F::Output: Send + 'static,
118{
119    // Check whether able to get the current runtime
120    match tokio::runtime::Handle::try_current() {
121        Ok(rt) => {
122            // Able to get the current runtime (standalone binary), use the current runtime
123            rt.spawn(task)
124        }
125        Err(_) => {
126            // Unable to get the current runtime (dynamic plugins), reuse the global runtime
127            TOKIO_RUNTIME.spawn(task)
128        }
129    }
130}
131
132pub fn spawn_future(fut: impl Future<Output = ()> + 'static + std::marker::Send) -> JoinHandle<()> {
133    match tokio::runtime::Handle::try_current() {
134        Ok(rt) => rt.spawn(fut),
135        Err(_) => TOKIO_RUNTIME.spawn(fut),
136    }
137}
138
139fn load_certs(path: &Path) -> io::Result<Vec<CertificateDer<'static>>> {
140    certs(&mut BufReader::new(File::open(path)?)).collect()
141}
142
143fn load_key(path: &Path) -> io::Result<PrivateKeyDer<'static>> {
144    private_key(&mut BufReader::new(File::open(path)?))
145        .unwrap()
146        .ok_or(io::Error::new(
147            ErrorKind::Other,
148            "No private key found".to_string(),
149        ))
150}
151
152pub struct RemoteApiPlugin;
153
154#[cfg(feature = "dynamic_plugin")]
155zenoh_plugin_trait::declare_plugin!(RemoteApiPlugin);
156impl ZenohPlugin for RemoteApiPlugin {}
157impl Plugin for RemoteApiPlugin {
158    type StartArgs = Runtime;
159    type Instance = zenoh::internal::plugins::RunningPlugin;
160    const DEFAULT_NAME: &'static str = "remote_api";
161    const PLUGIN_VERSION: &'static str = plugin_version!();
162    const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!();
163
164    fn start(
165        name: &str,
166        runtime: &Self::StartArgs,
167    ) -> ZResult<zenoh::internal::plugins::RunningPlugin> {
168        // Try to initiate login.
169        // Required in case of dynamic lib, otherwise no logs.
170        // But cannot be done twice in case of static link.
171        zenoh_util::try_init_log_from_env();
172        tracing::info!("Starting {name}");
173
174        let runtime_conf = runtime.config().lock();
175
176        let plugin_conf = runtime_conf
177            .plugin(name)
178            .ok_or_else(|| zerror!("Plugin `{}`: missing config", name))?;
179
180        let conf: Config = serde_json::from_value(plugin_conf.clone())
181            .map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
182
183        let wss_config: Option<(Vec<CertificateDer<'_>>, PrivateKeyDer<'_>)> =
184            match conf.secure_websocket.clone() {
185                Some(wss_config) => {
186                    tracing::info!("Loading certs from : {} ...", wss_config.certificate_path);
187                    let certs = load_certs(Path::new(&wss_config.certificate_path))
188                        .map_err(|err| zerror!("Could not Load WSS Cert `{}`", err))?;
189                    tracing::info!(
190                        "Loading Private Key from : {} ...",
191                        wss_config.private_key_path
192                    );
193                    let key = load_key(Path::new(&wss_config.private_key_path))
194                        .map_err(|err| zerror!("Could not Load WSS Private Key `{}`", err))?;
195                    Some((certs, key))
196                }
197                None => None,
198            };
199
200        let weak_runtime = Runtime::downgrade(runtime);
201        if let Some(runtime) = weak_runtime.upgrade() {
202            spawn_runtime(run(runtime, conf, wss_config));
203
204            Ok(Box::new(RunningPlugin(RemoteAPIPlugin)))
205        } else {
206            bail!("Cannot Get Zenoh Instance of Runtime !")
207        }
208    }
209}
210
211pub async fn run(
212    runtime: Runtime,
213    config: Config,
214    opt_certs: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
215) {
216    let hm: HashMap<SocketAddr, RemoteState> = HashMap::new();
217    let state_map = Arc::new(RwLock::new(hm));
218
219    // Return WebServer And State
220    let remote_api_runtime = RemoteAPIRuntime {
221        config: Arc::new(config),
222        wss_certs: opt_certs,
223        zenoh_runtime: runtime,
224        state_map,
225    };
226
227    remote_api_runtime.run().await;
228}
229
230struct RemoteAPIRuntime {
231    config: Arc<Config>,
232    wss_certs: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
233    zenoh_runtime: Runtime,
234    state_map: StateMap,
235}
236
237impl RemoteAPIRuntime {
238    async fn run(self) {
239        let run_websocket_server = run_websocket_server(
240            &self.config.websocket_port,
241            self.zenoh_runtime.clone(),
242            self.state_map.clone(),
243            self.wss_certs,
244        );
245
246        let config = (*self.config).clone();
247
248        let run_admin_space_queryable =
249            run_admin_space_queryable(self.zenoh_runtime.clone(), self.state_map.clone(), config);
250
251        select!(
252            _ = run_websocket_server => {},
253            _ = run_admin_space_queryable => {},
254        );
255    }
256}
257
258#[derive(Debug, Serialize)]
259struct AdminSpaceClient {
260    uuid: String,
261    remote_address: SocketAddr,
262    publishers: Vec<String>,
263    subscribers: Vec<String>,
264    queryables: Vec<String>,
265}
266
267impl From<(&SocketAddr, &RemoteState)> for AdminSpaceClient {
268    fn from(value: (&SocketAddr, &RemoteState)) -> Self {
269        let remote_state = value.1;
270        let sock_addr = value.0;
271
272        let pub_keyexprs = remote_state
273            .publishers
274            .values()
275            .map(|x| x.key_expr().to_string())
276            .collect::<Vec<String>>();
277
278        let query_keyexprs = remote_state
279            .queryables
280            .values()
281            .map(|(_, key_expr)| key_expr.to_string())
282            .collect::<Vec<String>>();
283
284        let sub_keyexprs = remote_state
285            .subscribers
286            .values()
287            .map(|(_, key_expr)| key_expr.to_string())
288            .collect::<Vec<String>>();
289
290        AdminSpaceClient {
291            uuid: remote_state.session_id.to_string(),
292            remote_address: *sock_addr,
293            publishers: pub_keyexprs,
294            subscribers: sub_keyexprs,
295            queryables: query_keyexprs,
296        }
297    }
298}
299
300async fn run_admin_space_queryable(zenoh_runtime: Runtime, state_map: StateMap, config: Config) {
301    let session = match zenoh::session::init(zenoh_runtime.clone()).await {
302        Ok(session) => session,
303        Err(err) => {
304            tracing::error!("Unable to get Zenoh session from Runtime {err}");
305            return;
306        }
307    };
308
309    let admin_prefix = keformat!(
310        ke_admin_prefix::formatter(),
311        zenoh_id = session.zid().into_keyexpr()
312    )
313    .unwrap();
314
315    let mut admin_space: HashMap<OwnedKeyExpr, AdminRef> = HashMap::new();
316
317    admin_space.insert(
318        &admin_prefix / unsafe { keyexpr::from_str_unchecked("config") },
319        AdminRef::Config,
320    );
321    admin_space.insert(
322        &admin_prefix / unsafe { keyexpr::from_str_unchecked("version") },
323        AdminRef::Version,
324    );
325
326    let admin_keyexpr_expr = (&admin_prefix) / *KE_ANY_N_SEGMENT;
327
328    let admin_queryable = session
329        .declare_queryable(admin_keyexpr_expr)
330        .await
331        .expect("Failed fo create AdminSpace Queryable");
332
333    loop {
334        match admin_queryable.recv_async().await {
335            Ok(query) => {
336                let query_ke: OwnedKeyExpr = query.key_expr().to_owned().into();
337
338                if query_ke.is_wild() {
339                    if query_ke.contains("clients") {
340                        let read_guard = state_map.read().await;
341                        let mut admin_space_clients = Vec::new();
342                        for (sock, remote_state) in read_guard.iter() {
343                            admin_space_clients.push(AdminSpaceClient::from((sock, remote_state)));
344                        }
345                        send_reply(admin_space_clients, query, query_ke).await;
346                    } else {
347                        for (ke, admin_ref) in admin_space.iter() {
348                            if query_ke.intersects(ke) {
349                                send_admin_reply(&query, ke, admin_ref, &config).await;
350                            }
351                        }
352                    }
353                } else {
354                    let own_ke: OwnedKeyExpr = query_ke.to_owned();
355                    if own_ke.contains("config") {
356                        send_admin_reply(&query, &own_ke, &AdminRef::Config, &config).await;
357                    }
358                    if own_ke.contains("client") {
359                        let mut opt_id = None;
360                        let split = own_ke.split('/');
361                        let mut next_is_id = false;
362                        for elem in split {
363                            if next_is_id {
364                                opt_id = Some(elem);
365                            } else if elem.contains("client") {
366                                next_is_id = true;
367                            }
368                        }
369                        if let Some(id) = opt_id {
370                            let read_guard = state_map.read().await;
371                            for (sock, remote_state) in read_guard.iter() {
372                                if remote_state.session_id.to_string() == id {
373                                    send_reply(
374                                        AdminSpaceClient::from((sock, remote_state)),
375                                        query,
376                                        own_ke,
377                                    )
378                                    .await;
379
380                                    break;
381                                }
382                            }
383                        }
384                    }
385                }
386            }
387            Err(_) => {
388                tracing::warn!("Admin Space queryable was closed!");
389            }
390        }
391    }
392}
393
394async fn send_reply<T>(reply: T, query: Query, query_ke: OwnedKeyExpr)
395where
396    T: Sized + Serialize,
397{
398    match serde_json::to_string_pretty(&reply) {
399        Ok(json_string) => {
400            if let Err(err) = query
401                .reply(query_ke, json_string)
402                .encoding(Encoding::APPLICATION_JSON)
403                .await
404            {
405                error!("AdminSpace: Reply to Query failed, {}", err);
406            };
407        }
408        Err(_) => {
409            error!("AdminSpace: Could not seralize client data");
410        }
411    };
412}
413
414async fn send_admin_reply(
415    query: &Query,
416    key_expr: &keyexpr,
417    admin_ref: &AdminRef,
418    config: &Config,
419) {
420    let z_bytes: ZBytes = match admin_ref {
421        AdminRef::Version => match serde_json::to_value(RemoteApiPlugin::PLUGIN_LONG_VERSION) {
422            Ok(v) => match serde_json::to_vec(&v) {
423                Ok(value) => ZBytes::from(value),
424                Err(e) => {
425                    tracing::warn!("Error transforming JSON to ZBytes: {}", e);
426                    return;
427                }
428            },
429            Err(e) => {
430                tracing::error!("INTERNAL ERROR serializing config as JSON: {}", e);
431                return;
432            }
433        },
434        AdminRef::Config => match serde_json::to_value(config) {
435            Ok(v) => match serde_json::to_vec(&v) {
436                Ok(value) => ZBytes::from(value),
437                Err(e) => {
438                    tracing::warn!("Error transforming JSON to ZBytes: {}", e);
439                    return;
440                }
441            },
442            Err(e) => {
443                tracing::error!("INTERNAL ERROR serializing config as JSON: {}", e);
444                return;
445            }
446        },
447    };
448    if let Err(e) = query
449        .reply(key_expr.to_owned(), z_bytes)
450        .encoding(zenoh::bytes::Encoding::APPLICATION_JSON)
451        .await
452    {
453        tracing::warn!("Error replying to admin query {:?}: {}", query, e);
454    }
455}
456
457struct RemoteAPIPlugin;
458
459#[allow(dead_code)]
460struct RunningPlugin(RemoteAPIPlugin);
461
462impl PluginControl for RunningPlugin {}
463
464impl RunningPluginTrait for RunningPlugin {
465    fn config_checker(
466        &self,
467        _path: &str,
468        _current: &serde_json::Map<String, serde_json::Value>,
469        _new: &serde_json::Map<String, serde_json::Value>,
470    ) -> ZResult<Option<serde_json::Map<String, serde_json::Value>>> {
471        bail!("Runtime configuration change not supported");
472    }
473}
474
475type StateMap = Arc<RwLock<HashMap<SocketAddr, RemoteState>>>;
476
477struct RemoteState {
478    websocket_tx: Sender<RemoteAPIMsg>,
479    session_id: Uuid,
480    session: Session,
481    // Timestamp
482    timestamps: HashMap<Uuid, Timestamp>,
483    // PubSub
484    subscribers: HashMap<Uuid, (JoinHandle<()>, OwnedKeyExpr)>,
485    publishers: HashMap<Uuid, Publisher<'static>>,
486    // Queryable
487    queryables: HashMap<Uuid, (JoinHandle<()>, OwnedKeyExpr)>,
488    unanswered_queries: Arc<std::sync::RwLock<HashMap<Uuid, Query>>>,
489    // Liveliness
490    liveliness_tokens: HashMap<Uuid, LivelinessToken>,
491    liveliness_subscribers: HashMap<Uuid, (JoinHandle<()>, OwnedKeyExpr)>,
492    // Querier
493    queriers: HashMap<Uuid, Querier<'static>>,
494}
495
496impl RemoteState {
497    fn new(websocket_tx: Sender<RemoteAPIMsg>, session_id: Uuid, session: Session) -> Self {
498        Self {
499            websocket_tx,
500            session_id,
501            session,
502            timestamps: HashMap::new(),
503            subscribers: HashMap::new(),
504            publishers: HashMap::new(),
505            queryables: HashMap::new(),
506            unanswered_queries: Arc::new(std::sync::RwLock::new(HashMap::new())),
507            liveliness_tokens: HashMap::new(),
508            liveliness_subscribers: HashMap::new(),
509            queriers: HashMap::new(),
510        }
511    }
512
513    async fn cleanup(self) {
514        for (_, publisher) in self.publishers {
515            if let Err(e) = publisher.undeclare().await {
516                error!("{e}")
517            }
518        }
519        for (_, (subscriber, _)) in self.subscribers {
520            subscriber.abort();
521        }
522
523        for (_, (queryable, _)) in self.queryables {
524            queryable.abort();
525        }
526
527        drop(self.unanswered_queries);
528
529        for (_, queryable) in self.liveliness_tokens {
530            if let Err(e) = queryable.undeclare().await {
531                error!("{e}")
532            }
533        }
534
535        for (_, (liveliness_subscriber, _)) in self.liveliness_subscribers {
536            liveliness_subscriber.abort();
537        }
538
539        if let Err(err) = self.session.close().await {
540            error!("{err}")
541        };
542    }
543}
544
545pub trait Streamable:
546    tokio::io::AsyncRead + tokio::io::AsyncWrite + std::marker::Send + Unpin
547{
548}
549impl Streamable for TcpStream {}
550impl Streamable for TlsStream<TcpStream> {}
551
552// Listen on the Zenoh Session
553async fn run_websocket_server(
554    ws_port: &String,
555    zenoh_runtime: Runtime,
556    state_map: StateMap,
557    opt_certs: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
558) {
559    let mut opt_tls_acceptor: Option<TlsAcceptor> = None;
560
561    if let Some((certs, key)) = opt_certs {
562        let config = rustls::ServerConfig::builder()
563            .with_no_client_auth()
564            .with_single_cert(certs, key)
565            .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))
566            .expect("Could not build TLS Configuration from Certficiate/Key Combo :");
567        opt_tls_acceptor = Some(TlsAcceptor::from(Arc::new(config)));
568    }
569
570    let server: TcpListener = match TcpListener::bind(ws_port).await {
571        Ok(server) => server,
572        Err(err) => {
573            tracing::error!("Unable to start TcpListener {err}");
574            return;
575        }
576    };
577
578    while let Ok((tcp_stream, sock_addr)) = server.accept().await {
579        let state_map = state_map.clone();
580        let zenoh_runtime = zenoh_runtime.clone();
581        let opt_tls_acceptor = opt_tls_acceptor.clone();
582
583        let new_websocket = async move {
584            let sock_adress = Arc::new(sock_addr);
585            let (ws_ch_tx, ws_ch_rx) = flume::unbounded::<RemoteAPIMsg>();
586
587            let mut write_guard = state_map.write().await;
588
589            let session = match zenoh::session::init(zenoh_runtime.clone()).await {
590                Ok(session) => session,
591                Err(err) => {
592                    tracing::error!("Unable to get Zenoh session from Runtime {err}");
593                    return;
594                }
595            };
596            let id = Uuid::new_v4();
597            tracing::debug!("Client {sock_addr:?} -> {id}");
598
599            let state: RemoteState = RemoteState::new(ws_ch_tx.clone(), id, session);
600
601            // if remote state exists in map already. Ignore it and reinitialize
602            let _ = write_guard.insert(sock_addr, state);
603            drop(write_guard);
604
605            let streamable: Box<dyn Streamable> = match &opt_tls_acceptor {
606                Some(acceptor) => match acceptor.accept(tcp_stream).await {
607                    Ok(tls_stream) => Box::new(tls_stream),
608                    Err(err) => {
609                        error!("Could not secure TcpStream -> TlsStream {:?}", err);
610                        return;
611                    }
612                },
613                None => Box::new(tcp_stream),
614            };
615
616            let ws_stream = match tokio_tungstenite::accept_async(streamable).await {
617                Ok(ws_stream) => ws_stream,
618                Err(e) => {
619                    error!("Error during the websocket handshake occurred: {}", e);
620                    return;
621                }
622            };
623
624            let (ws_tx, ws_rx) = ws_stream.split();
625
626            let ch_rx_stream = ws_ch_rx
627                .into_stream()
628                .map(|remote_api_msg| {
629                    let val = serde_json::to_string(&remote_api_msg).unwrap(); // This unwrap should be alright
630                    Ok(Message::Text(val))
631                })
632                .forward(ws_tx);
633
634            let sock_adress_cl = sock_adress.clone();
635
636            let state_map_cl_outer = state_map.clone();
637
638            //  Incomming message from Websocket
639            let incoming_ws = tokio::task::spawn(async move {
640                let mut non_close_messages = ws_rx.try_filter(|msg| future::ready(!msg.is_close()));
641                let state_map_cl = state_map_cl_outer.clone();
642                let sock_adress_ref = sock_adress_cl.clone();
643                while let Ok(Some(msg)) = non_close_messages.try_next().await {
644                    if let Some(response) =
645                        handle_message(msg, *sock_adress_ref, state_map_cl.clone()).await
646                    {
647                        if let Err(err) = ws_ch_tx.send(response) {
648                            error!("WS Send Error: {err:?}");
649                        };
650                    };
651                }
652            });
653
654            pin_mut!(ch_rx_stream, incoming_ws);
655            future::select(ch_rx_stream, incoming_ws).await;
656
657            // cleanup state
658            if let Some(state) = state_map.write().await.remove(sock_adress.as_ref()) {
659                state.cleanup().await;
660            };
661
662            tracing::info!("Client Disconnected {}", sock_adress.as_ref());
663        };
664
665        spawn_future(new_websocket);
666    }
667}
668
669async fn handle_message(
670    msg: Message,
671    sock_addr: SocketAddr,
672    state_map: StateMap,
673) -> Option<RemoteAPIMsg> {
674    match msg {
675        Message::Text(text) => match serde_json::from_str::<RemoteAPIMsg>(&text) {
676            Ok(msg) => match msg {
677                RemoteAPIMsg::Control(ctrl_msg) => {
678                    match handle_control_message(ctrl_msg, sock_addr, state_map).await {
679                        Ok(_) => return None,
680                        Err(err) => {
681                            tracing::error!(err);
682                        }
683                    }
684                }
685                RemoteAPIMsg::Data(data_msg) => {
686                    if let Err(err) = handle_data_message(data_msg, sock_addr, state_map).await {
687                        tracing::error!(err);
688                    }
689                }
690            },
691            Err(err) => {
692                tracing::error!(
693                    "RemoteAPI: WS Message Cannot be Deserialized to RemoteAPIMsg {}, message: {}",
694                    err,
695                    text
696                );
697            }
698        },
699        _ => {
700            debug!("RemoteAPI: WS Message Not Text");
701        }
702    };
703    None
704}