zenoh_plugin_remote_api/
lib.rs

1//
2// Copyright (c) 2024 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14
15//! ⚠️ WARNING ⚠️
16//!
17//! This crate is intended for Zenoh's internal use.
18//!
19//! [Click here for Zenoh's documentation](../zenoh/index.html)
20
21use std::{
22    collections::HashMap,
23    fs::File,
24    future::Future,
25    io::{self, BufReader, ErrorKind},
26    net::SocketAddr,
27    path::Path,
28    sync::{Arc, Mutex},
29};
30
31use futures::{future, pin_mut, StreamExt, TryStreamExt};
32use interface::{InRemoteMessage, OutRemoteMessage, SequenceId};
33use remote_state::RemoteState;
34use rustls_pemfile::{certs, private_key};
35use serde::Serialize;
36use tokio::{
37    net::{TcpListener, TcpStream},
38    select,
39    sync::RwLock,
40    task::JoinHandle,
41};
42use tokio_rustls::{
43    rustls::{
44        self,
45        pki_types::{CertificateDer, PrivateKeyDer},
46    },
47    server::TlsStream,
48    TlsAcceptor,
49};
50use tokio_tungstenite::tungstenite::protocol::Message;
51use uuid::Uuid;
52use zenoh::{
53    bytes::{Encoding, ZBytes},
54    internal::{
55        plugins::{RunningPluginTrait, ZenohPlugin},
56        runtime::DynamicRuntime,
57    },
58    key_expr::{
59        format::{kedefine, keformat},
60        keyexpr, OwnedKeyExpr,
61    },
62    query::Query,
63};
64use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};
65use zenoh_result::{bail, zerror, ZResult};
66use zenoh_util::ffi::JsonKeyValueMap;
67
68mod config;
69pub use config::Config;
70
71use crate::interface::{LivelinessTokenId, PublisherId, QuerierId, QueryableId, SubscriberId};
72
73mod interface;
74
75mod remote_state;
76
77kedefine!(
78    // Admin space key expressions of plugin's version
79    pub ke_admin_version: "${plugin_status_key:**}/__version__",
80
81    // Admin prefix of this bridge
82    pub ke_admin_prefix: "@/${zenoh_id:*}/remote-plugin/",
83);
84
85const WORKER_THREAD_NUM: usize = 2;
86const MAX_BLOCK_THREAD_NUM: usize = 50;
87const GIT_VERSION: &str = git_version::git_version!(prefix = "v", cargo_prefix = "v");
88
89lazy_static::lazy_static! {
90    static ref LONG_VERSION: String = format!("{} built with {}", GIT_VERSION, env!("RUSTC_VERSION"));
91    // The global runtime is used in the dynamic plugins, which we can't get the current runtime
92    static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
93        .worker_threads(WORKER_THREAD_NUM)
94        .max_blocking_threads(MAX_BLOCK_THREAD_NUM)
95        .enable_all()
96        .build()
97        .expect("Unable to create runtime");
98    static ref KE_ANY_N_SEGMENT: &'static keyexpr =  unsafe { keyexpr::from_str_unchecked("**") };
99}
100
101// An reference used in admin space to point to a struct (DdsEntity or Route) stored in another map
102#[derive(Debug)]
103enum AdminRef {
104    Config,
105    Version,
106}
107
108#[inline(always)]
109pub(crate) fn spawn_runtime<F>(task: F) -> JoinHandle<F::Output>
110where
111    F: Future + Send + 'static,
112    F::Output: Send + 'static,
113{
114    // Check whether able to get the current runtime
115    match tokio::runtime::Handle::try_current() {
116        Ok(rt) => {
117            // Able to get the current runtime (standalone binary), use the current runtime
118            rt.spawn(task)
119        }
120        Err(_) => {
121            // Unable to get the current runtime (dynamic plugins), reuse the global runtime
122            TOKIO_RUNTIME.spawn(task)
123        }
124    }
125}
126
127pub fn spawn_future(fut: impl Future<Output = ()> + 'static + std::marker::Send) -> JoinHandle<()> {
128    match tokio::runtime::Handle::try_current() {
129        Ok(rt) => rt.spawn(fut),
130        Err(_) => TOKIO_RUNTIME.spawn(fut),
131    }
132}
133
134fn load_certs(path: &Path) -> io::Result<Vec<CertificateDer<'static>>> {
135    certs(&mut BufReader::new(File::open(path)?)).collect()
136}
137
138fn load_key(path: &Path) -> io::Result<PrivateKeyDer<'static>> {
139    private_key(&mut BufReader::new(File::open(path)?))
140        .unwrap()
141        .ok_or(io::Error::new(
142            ErrorKind::Other,
143            "No private key found".to_string(),
144        ))
145}
146
147pub struct RemoteApiPlugin;
148
149#[cfg(feature = "dynamic_plugin")]
150zenoh_plugin_trait::declare_plugin!(RemoteApiPlugin);
151impl ZenohPlugin for RemoteApiPlugin {}
152impl Plugin for RemoteApiPlugin {
153    type StartArgs = DynamicRuntime;
154    type Instance = zenoh::internal::plugins::RunningPlugin;
155    const DEFAULT_NAME: &'static str = "remote_api";
156    const PLUGIN_VERSION: &'static str = plugin_version!();
157    const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!();
158
159    fn start(
160        name: &str,
161        runtime: &Self::StartArgs,
162    ) -> ZResult<zenoh::internal::plugins::RunningPlugin> {
163        // Try to initiate login.
164        // Required in case of dynamic lib, otherwise no logs.
165        // But cannot be done twice in case of static link.
166        zenoh_util::try_init_log_from_env();
167        tracing::info!("Starting {name}");
168
169        let plugin_conf = runtime
170            .get_config()
171            .get_plugin_config(name)
172            .map_err(|_| zerror!("Plugin `{}`: missing config", name))?;
173
174        let conf: Config = serde_json::from_value(plugin_conf)
175            .map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
176
177        let wss_config: Option<(Vec<CertificateDer<'_>>, PrivateKeyDer<'_>)> =
178            match conf.secure_websocket.clone() {
179                Some(wss_config) => {
180                    tracing::info!("Loading certs from : {} ...", wss_config.certificate_path);
181                    let certs = load_certs(Path::new(&wss_config.certificate_path))
182                        .map_err(|err| zerror!("Could not Load WSS Cert `{}`", err))?;
183                    tracing::info!(
184                        "Loading Private Key from : {} ...",
185                        wss_config.private_key_path
186                    );
187                    let key = load_key(Path::new(&wss_config.private_key_path))
188                        .map_err(|err| zerror!("Could not Load WSS Private Key `{}`", err))?;
189                    Some((certs, key))
190                }
191                None => None,
192            };
193
194        spawn_runtime(run(runtime.clone(), conf, wss_config));
195        Ok(Box::new(RunningPlugin(RemoteAPIPlugin)))
196    }
197}
198
199pub async fn run(
200    runtime: DynamicRuntime,
201    config: Config,
202    opt_certs: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
203) {
204    let state_map = Arc::new(RwLock::new(HashMap::new()));
205
206    // Return WebServer And State
207    let remote_api_runtime = RemoteAPIRuntime {
208        config: Arc::new(config),
209        wss_certs: opt_certs,
210        zenoh_runtime: runtime,
211        state_map,
212    };
213
214    remote_api_runtime.run().await;
215}
216
217struct RemoteAPIRuntime {
218    config: Arc<Config>,
219    wss_certs: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
220    zenoh_runtime: DynamicRuntime,
221    state_map: StateMap,
222}
223
224impl RemoteAPIRuntime {
225    async fn run(self) {
226        let run_websocket_server = run_websocket_server(
227            &self.config.websocket_port,
228            self.zenoh_runtime.clone(),
229            self.state_map.clone(),
230            self.wss_certs,
231        );
232
233        let config = (*self.config).clone();
234
235        let run_admin_space_queryable =
236            run_admin_space_queryable(self.zenoh_runtime.clone(), self.state_map.clone(), config);
237
238        select!(
239            _ = run_websocket_server => {},
240            _ = run_admin_space_queryable => {},
241        );
242    }
243}
244
245#[derive(Debug, Serialize, Clone)]
246pub(crate) struct AdminSpaceClient {
247    uuid: String,
248    remote_address: SocketAddr,
249    publishers: HashMap<PublisherId, String>,
250    subscribers: HashMap<SubscriberId, String>,
251    queryables: HashMap<QueryableId, String>,
252    queriers: HashMap<QuerierId, String>,
253    liveliness_tokens: HashMap<LivelinessTokenId, String>,
254}
255
256impl AdminSpaceClient {
257    pub(crate) fn new(uuid: String, remote_address: SocketAddr) -> Self {
258        AdminSpaceClient {
259            uuid,
260            remote_address,
261            publishers: HashMap::new(),
262            subscribers: HashMap::new(),
263            queryables: HashMap::new(),
264            queriers: HashMap::new(),
265            liveliness_tokens: HashMap::new(),
266        }
267    }
268
269    pub(crate) fn register_publisher(&mut self, id: PublisherId, key_expr: &str) {
270        self.publishers.insert(id, key_expr.to_string());
271    }
272
273    pub(crate) fn register_subscriber(&mut self, id: SubscriberId, key_expr: &str) {
274        self.subscribers.insert(id, key_expr.to_string());
275    }
276
277    pub(crate) fn register_queryable(&mut self, id: QueryableId, key_expr: &str) {
278        self.queryables.insert(id, key_expr.to_string());
279    }
280
281    pub(crate) fn register_querier(&mut self, id: QuerierId, key_expr: &str) {
282        self.queriers.insert(id, key_expr.to_string());
283    }
284
285    pub(crate) fn unregister_publisher(&mut self, id: PublisherId) {
286        self.publishers.remove(&id);
287    }
288
289    pub(crate) fn unregister_subscriber(&mut self, id: SubscriberId) {
290        self.subscribers.remove(&id);
291    }
292
293    pub(crate) fn unregister_queryable(&mut self, id: QueryableId) {
294        self.queryables.remove(&id);
295    }
296
297    pub(crate) fn unregister_querier(&mut self, id: QuerierId) {
298        self.queriers.remove(&id);
299    }
300
301    pub(crate) fn id(&self) -> &str {
302        &self.uuid
303    }
304}
305
306async fn run_admin_space_queryable(
307    zenoh_runtime: DynamicRuntime,
308    state_map: StateMap,
309    config: Config,
310) {
311    let session = match zenoh::session::init(zenoh_runtime).await {
312        Ok(session) => session,
313        Err(err) => {
314            tracing::error!("Unable to get Zenoh session from Runtime {err}");
315            return;
316        }
317    };
318
319    let admin_prefix = keformat!(
320        ke_admin_prefix::formatter(),
321        zenoh_id = session.zid().into_keyexpr()
322    )
323    .unwrap();
324
325    let mut admin_space: HashMap<OwnedKeyExpr, AdminRef> = HashMap::new();
326
327    admin_space.insert(
328        &admin_prefix / unsafe { keyexpr::from_str_unchecked("config") },
329        AdminRef::Config,
330    );
331    admin_space.insert(
332        &admin_prefix / unsafe { keyexpr::from_str_unchecked("version") },
333        AdminRef::Version,
334    );
335
336    let admin_keyexpr_expr = (&admin_prefix) / *KE_ANY_N_SEGMENT;
337
338    let admin_queryable = session
339        .declare_queryable(admin_keyexpr_expr)
340        .await
341        .expect("Failed fo create AdminSpace Queryable");
342
343    loop {
344        match admin_queryable.recv_async().await {
345            Ok(query) => {
346                let query_ke: OwnedKeyExpr = query.key_expr().to_owned().into();
347
348                if query_ke.is_wild() {
349                    if query_ke.contains("clients") {
350                        let read_guard = state_map.read().await;
351                        let admin_space_clients = read_guard
352                            .values()
353                            .map(|v| v.lock().unwrap().clone())
354                            .collect::<Vec<_>>();
355                        drop(read_guard);
356                        send_reply(admin_space_clients, query, query_ke).await;
357                    } else {
358                        for (ke, admin_ref) in admin_space.iter() {
359                            if query_ke.intersects(ke) {
360                                send_admin_reply(&query, ke, admin_ref, &config).await;
361                            }
362                        }
363                    }
364                } else {
365                    let own_ke: OwnedKeyExpr = query_ke.to_owned();
366                    if own_ke.contains("config") {
367                        send_admin_reply(&query, &own_ke, &AdminRef::Config, &config).await;
368                    }
369                    if own_ke.contains("client") {
370                        let mut opt_id = None;
371                        let split = own_ke.split('/');
372                        let mut next_is_id = false;
373                        for elem in split {
374                            if next_is_id {
375                                opt_id = Some(elem);
376                            } else if elem.contains("client") {
377                                next_is_id = true;
378                            }
379                        }
380                        if let Some(id) = opt_id {
381                            let read_guard = state_map.read().await;
382                            if let Some(state) = read_guard.get(id) {
383                                let state = state.lock().unwrap().clone();
384                                drop(read_guard);
385                                send_reply(state, query, own_ke).await;
386                            }
387                        }
388                    }
389                }
390            }
391            Err(_) => {
392                tracing::warn!("Admin Space queryable was closed!");
393            }
394        }
395    }
396}
397
398async fn send_reply<T>(reply: T, query: Query, query_ke: OwnedKeyExpr)
399where
400    T: Sized + Serialize,
401{
402    match serde_json::to_string_pretty(&reply) {
403        Ok(json_string) => {
404            if let Err(err) = query
405                .reply(query_ke, json_string)
406                .encoding(Encoding::APPLICATION_JSON)
407                .await
408            {
409                tracing::error!("AdminSpace: Reply to Query failed, {}", err);
410            };
411        }
412        Err(_) => {
413            tracing::error!("AdminSpace: Could not seralize client data");
414        }
415    };
416}
417
418async fn send_admin_reply(
419    query: &Query,
420    key_expr: &keyexpr,
421    admin_ref: &AdminRef,
422    config: &Config,
423) {
424    let z_bytes: ZBytes = match admin_ref {
425        AdminRef::Version => match serde_json::to_value(RemoteApiPlugin::PLUGIN_LONG_VERSION) {
426            Ok(v) => match serde_json::to_vec(&v) {
427                Ok(value) => ZBytes::from(value),
428                Err(e) => {
429                    tracing::warn!("Error transforming JSON to ZBytes: {}", e);
430                    return;
431                }
432            },
433            Err(e) => {
434                tracing::error!("INTERNAL ERROR serializing config as JSON: {}", e);
435                return;
436            }
437        },
438        AdminRef::Config => match serde_json::to_value(config) {
439            Ok(v) => match serde_json::to_vec(&v) {
440                Ok(value) => ZBytes::from(value),
441                Err(e) => {
442                    tracing::warn!("Error transforming JSON to ZBytes: {}", e);
443                    return;
444                }
445            },
446            Err(e) => {
447                tracing::error!("INTERNAL ERROR serializing config as JSON: {}", e);
448                return;
449            }
450        },
451    };
452    if let Err(e) = query
453        .reply(key_expr.to_owned(), z_bytes)
454        .encoding(zenoh::bytes::Encoding::APPLICATION_JSON)
455        .await
456    {
457        tracing::warn!("Error replying to admin query {:?}: {}", query, e);
458    }
459}
460
461struct RemoteAPIPlugin;
462
463#[allow(dead_code)]
464struct RunningPlugin(RemoteAPIPlugin);
465
466impl PluginControl for RunningPlugin {}
467
468impl RunningPluginTrait for RunningPlugin {
469    fn config_checker(
470        &self,
471        _path: &str,
472        _current: &JsonKeyValueMap,
473        _new: &JsonKeyValueMap,
474    ) -> ZResult<Option<JsonKeyValueMap>> {
475        bail!("Runtime configuration change not supported");
476    }
477}
478
479type StateMap = Arc<RwLock<HashMap<String, Arc<Mutex<AdminSpaceClient>>>>>;
480
481pub trait Streamable:
482    tokio::io::AsyncRead + tokio::io::AsyncWrite + std::marker::Send + Unpin
483{
484}
485impl Streamable for TcpStream {}
486impl Streamable for TlsStream<TcpStream> {}
487
488// Listen on the Zenoh Session
489async fn run_websocket_server(
490    ws_port: &String,
491    zenoh_runtime: DynamicRuntime,
492    state_map: StateMap,
493    opt_certs: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
494) {
495    let mut opt_tls_acceptor: Option<TlsAcceptor> = None;
496
497    if let Some((certs, key)) = opt_certs {
498        let config = rustls::ServerConfig::builder()
499            .with_no_client_auth()
500            .with_single_cert(certs, key)
501            .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))
502            .expect("Could not build TLS Configuration from Certficiate/Key Combo :");
503        opt_tls_acceptor = Some(TlsAcceptor::from(Arc::new(config)));
504    }
505
506    let server: TcpListener = match TcpListener::bind(ws_port).await {
507        Ok(server) => server,
508        Err(err) => {
509            tracing::error!("Unable to start TcpListener {err}");
510            return;
511        }
512    };
513
514    while let Ok((tcp_stream, sock_addr)) = server.accept().await {
515        let zenoh_runtime = zenoh_runtime.clone();
516        let opt_tls_acceptor = opt_tls_acceptor.clone();
517        let state_map2 = state_map.clone();
518        let new_websocket = async move {
519            let sock_adress = Arc::new(sock_addr);
520            let (ws_ch_tx, ws_ch_rx) = flume::unbounded::<(OutRemoteMessage, Option<SequenceId>)>();
521
522            let session = match zenoh::session::init(zenoh_runtime.clone()).await {
523                Ok(session) => session,
524                Err(err) => {
525                    tracing::error!("Unable to get Zenoh session from Runtime {err}");
526                    return;
527                }
528            };
529            let id = Uuid::new_v4();
530            tracing::debug!("Client {sock_addr:?} -> {id}");
531
532            let streamable: Box<dyn Streamable> = match &opt_tls_acceptor {
533                Some(acceptor) => match acceptor.accept(tcp_stream).await {
534                    Ok(tls_stream) => Box::new(tls_stream),
535                    Err(err) => {
536                        tracing::error!("Could not secure TcpStream -> TlsStream {:?}", err);
537                        return;
538                    }
539                },
540                None => Box::new(tcp_stream),
541            };
542
543            let ws_stream = match tokio_tungstenite::accept_async(streamable).await {
544                Ok(ws_stream) => ws_stream,
545                Err(e) => {
546                    tracing::error!("Error during the websocket handshake occurred: {}", e);
547                    return;
548                }
549            };
550
551            let (ws_tx, ws_rx) = ws_stream.split();
552
553            let ch_rx_stream = ws_ch_rx
554                .into_stream()
555                .map(|(out_msg, sequence_id)| Ok(Message::Binary(out_msg.to_wire(sequence_id))))
556                .forward(ws_tx);
557
558            // send confirmation that session was successfully opened
559            let admin_client =
560                Arc::new(Mutex::new(AdminSpaceClient::new(id.to_string(), sock_addr)));
561            state_map2
562                .write()
563                .await
564                .insert(id.to_string(), admin_client.clone());
565
566            let mut remote_state = RemoteState::new(ws_ch_tx.clone(), admin_client, session);
567
568            //  Incoming message from Websocket
569            let incoming_ws = tokio::task::spawn(async move {
570                let mut non_close_messages = ws_rx.try_filter(|msg| future::ready(!msg.is_close()));
571
572                while let Ok(Some(msg)) = non_close_messages.try_next().await {
573                    if let Some(response) = handle_message(msg, &mut remote_state).await {
574                        if let Err(err) = ws_ch_tx.send(response) {
575                            tracing::error!("WS Send Error: {err:?}");
576                        };
577                    };
578                }
579                remote_state.clear().await;
580            });
581
582            pin_mut!(ch_rx_stream, incoming_ws);
583            future::select(ch_rx_stream, incoming_ws).await;
584
585            // cleanup state
586            state_map2.write().await.remove(&id.to_string());
587
588            tracing::info!("Client Disconnected {}", sock_adress.as_ref());
589        };
590
591        spawn_future(new_websocket);
592    }
593}
594
595async fn handle_message(
596    msg: Message,
597    state: &mut RemoteState,
598) -> Option<(OutRemoteMessage, Option<SequenceId>)> {
599    match msg {
600        Message::Binary(val) => match InRemoteMessage::from_wire(val) {
601            Ok((header, msg)) => {
602                match state.handle_message(msg).await {
603                    Ok(Some(msg)) => Some((msg, header.sequence_id)),
604                    Ok(None) => header.sequence_id.map(|_| {
605                        (
606                            OutRemoteMessage::Ok(interface::Ok {
607                                content_id: header.content_id,
608                            }),
609                            header.sequence_id,
610                        )
611                    }),
612                    Err(error) => {
613                        tracing::error!(
614                            "RemoteAPI: Failed to execute request {:?}: {}",
615                            header.content_id,
616                            error
617                        );
618                        header.sequence_id.map(|_| {
619                            // send error response if ack was requested
620                            (
621                                OutRemoteMessage::Error(interface::Error {
622                                    error: error.to_string(),
623                                }),
624                                header.sequence_id,
625                            )
626                        })
627                    }
628                }
629            }
630            Err(err) => match err {
631                interface::FromWireError::HeaderError(error) => {
632                    tracing::error!("RemoteAPI: Failed to parse message header: {}", error);
633                    None
634                }
635                interface::FromWireError::BodyError((header, error)) => {
636                    tracing::error!(
637                        "RemoteAPI: Failed to parse message body for {:?}: {}",
638                        header,
639                        error
640                    );
641                    header.sequence_id.map(|_| {
642                        // send error response if ack was requested
643                        (
644                            OutRemoteMessage::Error(interface::Error {
645                                error: error.to_string(),
646                            }),
647                            header.sequence_id,
648                        )
649                    })
650                }
651            },
652        },
653        _ => {
654            tracing::error!("RemoteAPI: message format is not `Binary`");
655            None
656        }
657    }
658}