Skip to main content

zenoh_plugin_remote_api/
lib.rs

1//
2// Copyright (c) 2024 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14
15//! ⚠️ WARNING ⚠️
16//!
17//! This crate is intended for Zenoh's internal use.
18//!
19//! [Click here for Zenoh's documentation](../zenoh/index.html)
20
21use std::{
22    collections::HashMap,
23    fs::File,
24    future::Future,
25    io::{self, BufReader},
26    net::SocketAddr,
27    path::Path,
28    sync::{Arc, Mutex},
29};
30
31use futures::{future, pin_mut, StreamExt, TryStreamExt};
32use interface::{InRemoteMessage, OutRemoteMessage, SequenceId};
33use remote_state::RemoteState;
34use rustls_pemfile::{certs, private_key};
35use serde::Serialize;
36use tokio::{
37    net::{TcpListener, TcpStream},
38    select,
39    sync::RwLock,
40    task::JoinHandle,
41};
42use tokio_rustls::{
43    rustls::{
44        self,
45        pki_types::{CertificateDer, PrivateKeyDer},
46    },
47    server::TlsStream,
48    TlsAcceptor,
49};
50use tokio_tungstenite::tungstenite::protocol::Message;
51use uuid::Uuid;
52use zenoh::{
53    bytes::{Encoding, ZBytes},
54    internal::{
55        plugins::{RunningPluginTrait, ZenohPlugin},
56        runtime::DynamicRuntime,
57    },
58    key_expr::{
59        format::{kedefine, keformat},
60        keyexpr, OwnedKeyExpr,
61    },
62    query::Query,
63};
64use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};
65use zenoh_result::{bail, zerror, ZResult};
66use zenoh_util::ffi::JsonKeyValueMap;
67
68mod config;
69pub use config::Config;
70
71use crate::interface::{LivelinessTokenId, PublisherId, QuerierId, QueryableId, SubscriberId};
72
73mod interface;
74
75mod remote_state;
76
77kedefine!(
78    // Admin space key expressions of plugin's version
79    pub ke_admin_version: "${plugin_status_key:**}/__version__",
80
81    // Admin prefix of this bridge
82    pub ke_admin_prefix: "@/${zenoh_id:*}/remote-plugin/",
83);
84
85const WORKER_THREAD_NUM: usize = 2;
86const MAX_BLOCK_THREAD_NUM: usize = 50;
87
88lazy_static::lazy_static! {
89    // The global runtime is used in the dynamic plugins, which we can't get the current runtime
90    static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
91        .worker_threads(WORKER_THREAD_NUM)
92        .max_blocking_threads(MAX_BLOCK_THREAD_NUM)
93        .enable_all()
94        .build()
95        .expect("Unable to create runtime");
96    static ref KE_ANY_N_SEGMENT: &'static keyexpr =  unsafe { keyexpr::from_str_unchecked("**") };
97}
98
99// An reference used in admin space to point to a struct (DdsEntity or Route) stored in another map
100#[derive(Debug)]
101enum AdminRef {
102    Config,
103    Version,
104}
105
106#[inline(always)]
107pub(crate) fn spawn_runtime<F>(task: F) -> JoinHandle<F::Output>
108where
109    F: Future + Send + 'static,
110    F::Output: Send + 'static,
111{
112    // Check whether able to get the current runtime
113    match tokio::runtime::Handle::try_current() {
114        Ok(rt) => {
115            // Able to get the current runtime (standalone binary), use the current runtime
116            rt.spawn(task)
117        }
118        Err(_) => {
119            // Unable to get the current runtime (dynamic plugins), reuse the global runtime
120            TOKIO_RUNTIME.spawn(task)
121        }
122    }
123}
124
125pub fn spawn_future(fut: impl Future<Output = ()> + 'static + std::marker::Send) -> JoinHandle<()> {
126    match tokio::runtime::Handle::try_current() {
127        Ok(rt) => rt.spawn(fut),
128        Err(_) => TOKIO_RUNTIME.spawn(fut),
129    }
130}
131
132fn load_certs(path: &Path) -> io::Result<Vec<CertificateDer<'static>>> {
133    certs(&mut BufReader::new(File::open(path)?)).collect()
134}
135
136fn load_key(path: &Path) -> io::Result<PrivateKeyDer<'static>> {
137    private_key(&mut BufReader::new(File::open(path)?))
138        .unwrap()
139        .ok_or_else(|| io::Error::other("No private key found"))
140}
141
142pub struct RemoteApiPlugin;
143
144#[cfg(feature = "dynamic_plugin")]
145zenoh_plugin_trait::declare_plugin!(RemoteApiPlugin);
146impl ZenohPlugin for RemoteApiPlugin {}
147impl Plugin for RemoteApiPlugin {
148    type StartArgs = DynamicRuntime;
149    type Instance = zenoh::internal::plugins::RunningPlugin;
150    const DEFAULT_NAME: &'static str = "remote_api";
151    const PLUGIN_VERSION: &'static str = plugin_version!();
152    const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!();
153
154    fn start(
155        name: &str,
156        runtime: &Self::StartArgs,
157    ) -> ZResult<zenoh::internal::plugins::RunningPlugin> {
158        // Try to initiate login.
159        // Required in case of dynamic lib, otherwise no logs.
160        // But cannot be done twice in case of static link.
161        zenoh_util::try_init_log_from_env();
162        tracing::info!("Starting {name}");
163
164        let plugin_conf = runtime
165            .get_config()
166            .get_plugin_config(name)
167            .map_err(|_| zerror!("Plugin `{}`: missing config", name))?;
168
169        let conf: Config = serde_json::from_value(plugin_conf)
170            .map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
171
172        let wss_config: Option<(Vec<CertificateDer<'_>>, PrivateKeyDer<'_>)> =
173            match conf.secure_websocket.clone() {
174                Some(wss_config) => {
175                    tracing::info!("Loading certs from : {} ...", wss_config.certificate_path);
176                    let certs = load_certs(Path::new(&wss_config.certificate_path))
177                        .map_err(|err| zerror!("Could not Load WSS Cert `{}`", err))?;
178                    tracing::info!(
179                        "Loading Private Key from : {} ...",
180                        wss_config.private_key_path
181                    );
182                    let key = load_key(Path::new(&wss_config.private_key_path))
183                        .map_err(|err| zerror!("Could not Load WSS Private Key `{}`", err))?;
184                    Some((certs, key))
185                }
186                None => None,
187            };
188
189        spawn_runtime(run(runtime.clone(), conf, wss_config));
190        Ok(Box::new(RunningPlugin(RemoteAPIPlugin)))
191    }
192}
193
194pub async fn run(
195    runtime: DynamicRuntime,
196    config: Config,
197    opt_certs: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
198) {
199    let state_map = Arc::new(RwLock::new(HashMap::new()));
200
201    // Return WebServer And State
202    let remote_api_runtime = RemoteAPIRuntime {
203        config: Arc::new(config),
204        wss_certs: opt_certs,
205        zenoh_runtime: runtime,
206        state_map,
207    };
208
209    remote_api_runtime.run().await;
210}
211
212struct RemoteAPIRuntime {
213    config: Arc<Config>,
214    wss_certs: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
215    zenoh_runtime: DynamicRuntime,
216    state_map: StateMap,
217}
218
219impl RemoteAPIRuntime {
220    async fn run(self) {
221        let run_websocket_server = run_websocket_server(
222            &self.config.websocket_port,
223            self.zenoh_runtime.clone(),
224            self.state_map.clone(),
225            self.wss_certs,
226        );
227
228        let config = (*self.config).clone();
229
230        let run_admin_space_queryable =
231            run_admin_space_queryable(self.zenoh_runtime.clone(), self.state_map.clone(), config);
232
233        select!(
234            _ = run_websocket_server => {},
235            _ = run_admin_space_queryable => {},
236        );
237    }
238}
239
240#[derive(Debug, Serialize, Clone)]
241pub(crate) struct AdminSpaceClient {
242    uuid: String,
243    remote_address: SocketAddr,
244    publishers: HashMap<PublisherId, String>,
245    subscribers: HashMap<SubscriberId, String>,
246    queryables: HashMap<QueryableId, String>,
247    queriers: HashMap<QuerierId, String>,
248    liveliness_tokens: HashMap<LivelinessTokenId, String>,
249}
250
251impl AdminSpaceClient {
252    pub(crate) fn new(uuid: String, remote_address: SocketAddr) -> Self {
253        AdminSpaceClient {
254            uuid,
255            remote_address,
256            publishers: HashMap::new(),
257            subscribers: HashMap::new(),
258            queryables: HashMap::new(),
259            queriers: HashMap::new(),
260            liveliness_tokens: HashMap::new(),
261        }
262    }
263
264    pub(crate) fn register_publisher(&mut self, id: PublisherId, key_expr: &str) {
265        self.publishers.insert(id, key_expr.to_string());
266    }
267
268    pub(crate) fn register_subscriber(&mut self, id: SubscriberId, key_expr: &str) {
269        self.subscribers.insert(id, key_expr.to_string());
270    }
271
272    pub(crate) fn register_queryable(&mut self, id: QueryableId, key_expr: &str) {
273        self.queryables.insert(id, key_expr.to_string());
274    }
275
276    pub(crate) fn register_querier(&mut self, id: QuerierId, key_expr: &str) {
277        self.queriers.insert(id, key_expr.to_string());
278    }
279
280    pub(crate) fn unregister_publisher(&mut self, id: PublisherId) {
281        self.publishers.remove(&id);
282    }
283
284    pub(crate) fn unregister_subscriber(&mut self, id: SubscriberId) {
285        self.subscribers.remove(&id);
286    }
287
288    pub(crate) fn unregister_queryable(&mut self, id: QueryableId) {
289        self.queryables.remove(&id);
290    }
291
292    pub(crate) fn unregister_querier(&mut self, id: QuerierId) {
293        self.queriers.remove(&id);
294    }
295
296    pub(crate) fn id(&self) -> &str {
297        &self.uuid
298    }
299}
300
301async fn run_admin_space_queryable(
302    zenoh_runtime: DynamicRuntime,
303    state_map: StateMap,
304    config: Config,
305) {
306    let session = match zenoh::session::init(zenoh_runtime).await {
307        Ok(session) => session,
308        Err(err) => {
309            tracing::error!("Unable to get Zenoh session from Runtime {err}");
310            return;
311        }
312    };
313
314    let admin_prefix = keformat!(
315        ke_admin_prefix::formatter(),
316        zenoh_id = session.zid().into_keyexpr()
317    )
318    .unwrap();
319
320    let mut admin_space: HashMap<OwnedKeyExpr, AdminRef> = HashMap::new();
321
322    admin_space.insert(
323        &admin_prefix / unsafe { keyexpr::from_str_unchecked("config") },
324        AdminRef::Config,
325    );
326    admin_space.insert(
327        &admin_prefix / unsafe { keyexpr::from_str_unchecked("version") },
328        AdminRef::Version,
329    );
330
331    let admin_keyexpr_expr = (&admin_prefix) / *KE_ANY_N_SEGMENT;
332
333    let admin_queryable = session
334        .declare_queryable(admin_keyexpr_expr)
335        .await
336        .expect("Failed fo create AdminSpace Queryable");
337
338    loop {
339        match admin_queryable.recv_async().await {
340            Ok(query) => {
341                let query_ke: OwnedKeyExpr = query.key_expr().to_owned().into();
342
343                if query_ke.is_wild() {
344                    if query_ke.contains("clients") {
345                        let read_guard = state_map.read().await;
346                        let admin_space_clients = read_guard
347                            .values()
348                            .map(|v| v.lock().unwrap().clone())
349                            .collect::<Vec<_>>();
350                        drop(read_guard);
351                        send_reply(admin_space_clients, query, query_ke).await;
352                    } else {
353                        for (ke, admin_ref) in admin_space.iter() {
354                            if query_ke.intersects(ke) {
355                                send_admin_reply(&query, ke, admin_ref, &config).await;
356                            }
357                        }
358                    }
359                } else {
360                    let own_ke: OwnedKeyExpr = query_ke.to_owned();
361                    if own_ke.contains("config") {
362                        send_admin_reply(&query, &own_ke, &AdminRef::Config, &config).await;
363                    }
364                    if own_ke.contains("client") {
365                        let mut opt_id = None;
366                        let split = own_ke.split('/');
367                        let mut next_is_id = false;
368                        for elem in split {
369                            if next_is_id {
370                                opt_id = Some(elem);
371                            } else if elem.contains("client") {
372                                next_is_id = true;
373                            }
374                        }
375                        if let Some(id) = opt_id {
376                            let read_guard = state_map.read().await;
377                            if let Some(state) = read_guard.get(id) {
378                                let state = state.lock().unwrap().clone();
379                                drop(read_guard);
380                                send_reply(state, query, own_ke).await;
381                            }
382                        }
383                    }
384                }
385            }
386            Err(_) => {
387                tracing::warn!("Admin Space queryable was closed!");
388            }
389        }
390    }
391}
392
393async fn send_reply<T>(reply: T, query: Query, query_ke: OwnedKeyExpr)
394where
395    T: Sized + Serialize,
396{
397    match serde_json::to_string_pretty(&reply) {
398        Ok(json_string) => {
399            if let Err(err) = query
400                .reply(query_ke, json_string)
401                .encoding(Encoding::APPLICATION_JSON)
402                .await
403            {
404                tracing::error!("AdminSpace: Reply to Query failed, {}", err);
405            };
406        }
407        Err(_) => {
408            tracing::error!("AdminSpace: Could not seralize client data");
409        }
410    };
411}
412
413async fn send_admin_reply(
414    query: &Query,
415    key_expr: &keyexpr,
416    admin_ref: &AdminRef,
417    config: &Config,
418) {
419    let z_bytes: ZBytes = match admin_ref {
420        AdminRef::Version => match serde_json::to_value(RemoteApiPlugin::PLUGIN_LONG_VERSION) {
421            Ok(v) => match serde_json::to_vec(&v) {
422                Ok(value) => ZBytes::from(value),
423                Err(e) => {
424                    tracing::warn!("Error transforming JSON to ZBytes: {}", e);
425                    return;
426                }
427            },
428            Err(e) => {
429                tracing::error!("INTERNAL ERROR serializing config as JSON: {}", e);
430                return;
431            }
432        },
433        AdminRef::Config => match serde_json::to_value(config) {
434            Ok(v) => match serde_json::to_vec(&v) {
435                Ok(value) => ZBytes::from(value),
436                Err(e) => {
437                    tracing::warn!("Error transforming JSON to ZBytes: {}", e);
438                    return;
439                }
440            },
441            Err(e) => {
442                tracing::error!("INTERNAL ERROR serializing config as JSON: {}", e);
443                return;
444            }
445        },
446    };
447    if let Err(e) = query
448        .reply(key_expr.to_owned(), z_bytes)
449        .encoding(zenoh::bytes::Encoding::APPLICATION_JSON)
450        .await
451    {
452        tracing::warn!("Error replying to admin query {:?}: {}", query, e);
453    }
454}
455
456struct RemoteAPIPlugin;
457
458#[allow(dead_code)]
459struct RunningPlugin(RemoteAPIPlugin);
460
461impl PluginControl for RunningPlugin {}
462
463impl RunningPluginTrait for RunningPlugin {
464    fn config_checker(
465        &self,
466        _path: &str,
467        _current: &JsonKeyValueMap,
468        _new: &JsonKeyValueMap,
469    ) -> ZResult<Option<JsonKeyValueMap>> {
470        bail!("Runtime configuration change not supported");
471    }
472}
473
474type StateMap = Arc<RwLock<HashMap<String, Arc<Mutex<AdminSpaceClient>>>>>;
475
476pub trait Streamable:
477    tokio::io::AsyncRead + tokio::io::AsyncWrite + std::marker::Send + Unpin
478{
479}
480impl Streamable for TcpStream {}
481impl Streamable for TlsStream<TcpStream> {}
482
483// Listen on the Zenoh Session
484async fn run_websocket_server(
485    ws_port: &String,
486    zenoh_runtime: DynamicRuntime,
487    state_map: StateMap,
488    opt_certs: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
489) {
490    let mut opt_tls_acceptor: Option<TlsAcceptor> = None;
491
492    if let Some((certs, key)) = opt_certs {
493        let config = rustls::ServerConfig::builder()
494            .with_no_client_auth()
495            .with_single_cert(certs, key)
496            .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))
497            .expect("Could not build TLS Configuration from Certficiate/Key Combo :");
498        opt_tls_acceptor = Some(TlsAcceptor::from(Arc::new(config)));
499    }
500
501    let server: TcpListener = match TcpListener::bind(ws_port).await {
502        Ok(server) => server,
503        Err(err) => {
504            tracing::error!("Unable to start TcpListener {err}");
505            return;
506        }
507    };
508
509    while let Ok((tcp_stream, sock_addr)) = server.accept().await {
510        let zenoh_runtime = zenoh_runtime.clone();
511        let opt_tls_acceptor = opt_tls_acceptor.clone();
512        let state_map2 = state_map.clone();
513        let new_websocket = async move {
514            let sock_adress = Arc::new(sock_addr);
515            let (ws_ch_tx, ws_ch_rx) = flume::unbounded::<(OutRemoteMessage, Option<SequenceId>)>();
516
517            let session = match zenoh::session::init(zenoh_runtime.clone()).await {
518                Ok(session) => session,
519                Err(err) => {
520                    tracing::error!("Unable to get Zenoh session from Runtime {err}");
521                    return;
522                }
523            };
524            let id = Uuid::new_v4();
525            tracing::debug!("Client {sock_addr:?} -> {id}");
526
527            let streamable: Box<dyn Streamable> = match &opt_tls_acceptor {
528                Some(acceptor) => match acceptor.accept(tcp_stream).await {
529                    Ok(tls_stream) => Box::new(tls_stream),
530                    Err(err) => {
531                        tracing::error!("Could not secure TcpStream -> TlsStream {:?}", err);
532                        return;
533                    }
534                },
535                None => Box::new(tcp_stream),
536            };
537
538            let ws_stream = match tokio_tungstenite::accept_async(streamable).await {
539                Ok(ws_stream) => ws_stream,
540                Err(e) => {
541                    tracing::error!("Error during the websocket handshake occurred: {}", e);
542                    return;
543                }
544            };
545
546            let (ws_tx, ws_rx) = ws_stream.split();
547
548            let ch_rx_stream = ws_ch_rx
549                .into_stream()
550                .map(|(out_msg, sequence_id)| {
551                    tracing::trace!("<< Send: {:?} (seq={:?})", out_msg.id(), sequence_id);
552                    Ok(Message::Binary(out_msg.to_wire(sequence_id)))
553                })
554                .forward(ws_tx);
555
556            // send confirmation that session was successfully opened
557            let admin_client =
558                Arc::new(Mutex::new(AdminSpaceClient::new(id.to_string(), sock_addr)));
559            state_map2
560                .write()
561                .await
562                .insert(id.to_string(), admin_client.clone());
563
564            let mut remote_state = RemoteState::new(ws_ch_tx.clone(), admin_client, session);
565
566            //  Incoming message from Websocket
567            let incoming_ws = tokio::task::spawn(async move {
568                let mut non_close_messages = ws_rx.try_filter(|msg| future::ready(!msg.is_close()));
569
570                while let Ok(Some(msg)) = non_close_messages.try_next().await {
571                    if let Some(response) = handle_message(msg, &mut remote_state).await {
572                        if let Err(err) = ws_ch_tx.send(response) {
573                            tracing::error!("WS Send Error: {err:?}");
574                        };
575                    };
576                }
577                remote_state.clear().await;
578            });
579
580            pin_mut!(ch_rx_stream, incoming_ws);
581            future::select(ch_rx_stream, incoming_ws).await;
582
583            // cleanup state
584            state_map2.write().await.remove(&id.to_string());
585
586            tracing::info!("Client Disconnected {}", sock_adress.as_ref());
587        };
588
589        spawn_future(new_websocket);
590    }
591}
592
593async fn handle_message(
594    msg: Message,
595    state: &mut RemoteState,
596) -> Option<(OutRemoteMessage, Option<SequenceId>)> {
597    match msg {
598        Message::Binary(val) => match InRemoteMessage::from_wire(val) {
599            Ok((header, msg)) => {
600                tracing::trace!(
601                    ">> Recv: {:?} (seq={:?})",
602                    header.content_id,
603                    header.sequence_id
604                );
605                match state.handle_message(msg).await {
606                    Ok(Some(msg)) => Some((msg, header.sequence_id)),
607                    Ok(None) => header.sequence_id.map(|_| {
608                        (
609                            OutRemoteMessage::Ok(interface::Ok {
610                                content_id: header.content_id,
611                            }),
612                            header.sequence_id,
613                        )
614                    }),
615                    Err(error) => {
616                        tracing::error!(
617                            "RemoteAPI: Failed to execute request {:?}: {}",
618                            header.content_id,
619                            error
620                        );
621                        header.sequence_id.map(|_| {
622                            // send error response if ack was requested
623                            (
624                                OutRemoteMessage::Error(interface::Error {
625                                    error: error.to_string(),
626                                }),
627                                header.sequence_id,
628                            )
629                        })
630                    }
631                }
632            }
633            Err(err) => match err {
634                interface::FromWireError::HeaderError(error) => {
635                    tracing::error!("RemoteAPI: Failed to parse message header: {}", error);
636                    None
637                }
638                interface::FromWireError::BodyError((header, error)) => {
639                    tracing::error!(
640                        "RemoteAPI: Failed to parse message body for {:?}: {}",
641                        header,
642                        error
643                    );
644                    header.sequence_id.map(|_| {
645                        // send error response if ack was requested
646                        (
647                            OutRemoteMessage::Error(interface::Error {
648                                error: error.to_string(),
649                            }),
650                            header.sequence_id,
651                        )
652                    })
653                }
654            },
655        },
656        _ => {
657            tracing::error!("RemoteAPI: message format is not `Binary`");
658            None
659        }
660    }
661}