zenoh_plugin_remote_api/
lib.rs

1//
2// Copyright (c) 2024 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14
15//! ⚠️ WARNING ⚠️
16//!
17//! This crate is intended for Zenoh's internal use.
18//!
19//! [Click here for Zenoh's documentation](../zenoh/index.html)
20
21use std::{
22    collections::HashMap,
23    fs::File,
24    future::Future,
25    io::{self, BufReader, ErrorKind},
26    net::SocketAddr,
27    path::Path,
28    sync::{Arc, Mutex},
29};
30
31use futures::{future, pin_mut, StreamExt, TryStreamExt};
32use interface::{InRemoteMessage, OutRemoteMessage, SequenceId};
33use remote_state::RemoteState;
34use rustls_pemfile::{certs, private_key};
35use serde::Serialize;
36use tokio::{
37    net::{TcpListener, TcpStream},
38    select,
39    sync::RwLock,
40    task::JoinHandle,
41};
42use tokio_rustls::{
43    rustls::{
44        self,
45        pki_types::{CertificateDer, PrivateKeyDer},
46    },
47    server::TlsStream,
48    TlsAcceptor,
49};
50use tokio_tungstenite::tungstenite::protocol::Message;
51use uuid::Uuid;
52use zenoh::{
53    bytes::{Encoding, ZBytes},
54    internal::{
55        plugins::{RunningPluginTrait, ZenohPlugin},
56        runtime::DynamicRuntime,
57    },
58    key_expr::{
59        format::{kedefine, keformat},
60        keyexpr, OwnedKeyExpr,
61    },
62    query::Query,
63};
64use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};
65use zenoh_result::{bail, zerror, ZResult};
66use zenoh_util::ffi::JsonKeyValueMap;
67
68mod config;
69pub use config::Config;
70
71mod interface;
72
73mod remote_state;
74
kedefine!(
    // Key expression format matching a plugin's version entry in the admin space:
    // any plugin status key followed by `__version__`.
    pub ke_admin_version: "${plugin_status_key:**}/__version__",

    // Admin-space prefix of this plugin, parameterized by the Zenoh id of the session
    // that serves it (`@/<zenoh_id>/remote-plugin/`).
    pub ke_admin_prefix: "@/${zenoh_id:*}/remote-plugin/",
);
82
/// Number of worker threads of the fallback Tokio runtime (`TOKIO_RUNTIME`).
const WORKER_THREAD_NUM: usize = 2;
/// Upper bound on blocking threads of the fallback Tokio runtime (`TOKIO_RUNTIME`).
const MAX_BLOCK_THREAD_NUM: usize = 50;
/// Version string produced by `git_version!` at build time, prefixed with `v`.
const GIT_VERSION: &str = git_version::git_version!(prefix = "v", cargo_prefix = "v");

lazy_static::lazy_static! {
    // Human-readable version: the git version plus the value of the `RUSTC_VERSION`
    // environment variable captured at compile time.
    static ref LONG_VERSION: String = format!("{} built with {}", GIT_VERSION, env!("RUSTC_VERSION"));
    // The global runtime is used in the dynamic plugins, which we can't get the current runtime
    static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(WORKER_THREAD_NUM)
        .max_blocking_threads(MAX_BLOCK_THREAD_NUM)
        .enable_all()
        .build()
        .expect("Unable to create runtime");
    // Key expression `**`, appended to the admin prefix to declare the admin queryable.
    // SAFETY: the literal `**` is assumed to be a valid, canonical key expression, which is
    // what `from_str_unchecked` requires — NOTE(review): confirm against zenoh's keyexpr rules.
    static ref KE_ANY_N_SEGMENT: &'static keyexpr =  unsafe { keyexpr::from_str_unchecked("**") };
}
98
/// Identifies which static admin-space entry a key expression refers to.
///
/// The admin-space queryable maps registered key expressions to one of these variants and
/// uses the variant to decide what to serialize into the reply.
#[derive(Debug)]
enum AdminRef {
    /// The plugin configuration, replied as JSON.
    Config,
    /// The plugin's long version string, replied as JSON.
    Version,
}
105
106#[inline(always)]
107pub(crate) fn spawn_runtime<F>(task: F) -> JoinHandle<F::Output>
108where
109    F: Future + Send + 'static,
110    F::Output: Send + 'static,
111{
112    // Check whether able to get the current runtime
113    match tokio::runtime::Handle::try_current() {
114        Ok(rt) => {
115            // Able to get the current runtime (standalone binary), use the current runtime
116            rt.spawn(task)
117        }
118        Err(_) => {
119            // Unable to get the current runtime (dynamic plugins), reuse the global runtime
120            TOKIO_RUNTIME.spawn(task)
121        }
122    }
123}
124
125pub fn spawn_future(fut: impl Future<Output = ()> + 'static + std::marker::Send) -> JoinHandle<()> {
126    match tokio::runtime::Handle::try_current() {
127        Ok(rt) => rt.spawn(fut),
128        Err(_) => TOKIO_RUNTIME.spawn(fut),
129    }
130}
131
132fn load_certs(path: &Path) -> io::Result<Vec<CertificateDer<'static>>> {
133    certs(&mut BufReader::new(File::open(path)?)).collect()
134}
135
136fn load_key(path: &Path) -> io::Result<PrivateKeyDer<'static>> {
137    private_key(&mut BufReader::new(File::open(path)?))
138        .unwrap()
139        .ok_or(io::Error::new(
140            ErrorKind::Other,
141            "No private key found".to_string(),
142        ))
143}
144
145pub struct RemoteApiPlugin;
146
147#[cfg(feature = "dynamic_plugin")]
148zenoh_plugin_trait::declare_plugin!(RemoteApiPlugin);
149impl ZenohPlugin for RemoteApiPlugin {}
150impl Plugin for RemoteApiPlugin {
151    type StartArgs = DynamicRuntime;
152    type Instance = zenoh::internal::plugins::RunningPlugin;
153    const DEFAULT_NAME: &'static str = "remote_api";
154    const PLUGIN_VERSION: &'static str = plugin_version!();
155    const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!();
156
157    fn start(
158        name: &str,
159        runtime: &Self::StartArgs,
160    ) -> ZResult<zenoh::internal::plugins::RunningPlugin> {
161        // Try to initiate login.
162        // Required in case of dynamic lib, otherwise no logs.
163        // But cannot be done twice in case of static link.
164        zenoh_util::try_init_log_from_env();
165        tracing::info!("Starting {name}");
166
167        let plugin_conf = runtime
168            .get_config()
169            .get_plugin_config(name)
170            .map_err(|_| zerror!("Plugin `{}`: missing config", name))?;
171
172        let conf: Config = serde_json::from_value(plugin_conf)
173            .map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
174
175        let wss_config: Option<(Vec<CertificateDer<'_>>, PrivateKeyDer<'_>)> =
176            match conf.secure_websocket.clone() {
177                Some(wss_config) => {
178                    tracing::info!("Loading certs from : {} ...", wss_config.certificate_path);
179                    let certs = load_certs(Path::new(&wss_config.certificate_path))
180                        .map_err(|err| zerror!("Could not Load WSS Cert `{}`", err))?;
181                    tracing::info!(
182                        "Loading Private Key from : {} ...",
183                        wss_config.private_key_path
184                    );
185                    let key = load_key(Path::new(&wss_config.private_key_path))
186                        .map_err(|err| zerror!("Could not Load WSS Private Key `{}`", err))?;
187                    Some((certs, key))
188                }
189                None => None,
190            };
191
192        spawn_runtime(run(runtime.clone(), conf, wss_config));
193        Ok(Box::new(RunningPlugin(RemoteAPIPlugin)))
194    }
195}
196
197pub async fn run(
198    runtime: DynamicRuntime,
199    config: Config,
200    opt_certs: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
201) {
202    let state_map = Arc::new(RwLock::new(HashMap::new()));
203
204    // Return WebServer And State
205    let remote_api_runtime = RemoteAPIRuntime {
206        config: Arc::new(config),
207        wss_certs: opt_certs,
208        zenoh_runtime: runtime,
209        state_map,
210    };
211
212    remote_api_runtime.run().await;
213}
214
215struct RemoteAPIRuntime {
216    config: Arc<Config>,
217    wss_certs: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
218    zenoh_runtime: DynamicRuntime,
219    state_map: StateMap,
220}
221
222impl RemoteAPIRuntime {
223    async fn run(self) {
224        let run_websocket_server = run_websocket_server(
225            &self.config.websocket_port,
226            self.zenoh_runtime.clone(),
227            self.state_map.clone(),
228            self.wss_certs,
229        );
230
231        let config = (*self.config).clone();
232
233        let run_admin_space_queryable =
234            run_admin_space_queryable(self.zenoh_runtime.clone(), self.state_map.clone(), config);
235
236        select!(
237            _ = run_websocket_server => {},
238            _ = run_admin_space_queryable => {},
239        );
240    }
241}
242
243#[derive(Debug, Serialize, Clone)]
244pub(crate) struct AdminSpaceClient {
245    uuid: String,
246    remote_address: SocketAddr,
247    publishers: HashMap<u32, String>,
248    subscribers: HashMap<u32, String>,
249    queryables: HashMap<u32, String>,
250    queriers: HashMap<u32, String>,
251    liveliness_tokens: HashMap<u32, String>,
252}
253
254impl AdminSpaceClient {
255    pub(crate) fn new(uuid: String, remote_address: SocketAddr) -> Self {
256        AdminSpaceClient {
257            uuid,
258            remote_address,
259            publishers: HashMap::new(),
260            subscribers: HashMap::new(),
261            queryables: HashMap::new(),
262            queriers: HashMap::new(),
263            liveliness_tokens: HashMap::new(),
264        }
265    }
266
267    pub(crate) fn register_publisher(&mut self, id: u32, key_expr: &str) {
268        self.publishers.insert(id, key_expr.to_string());
269    }
270
271    pub(crate) fn register_subscriber(&mut self, id: u32, key_expr: &str) {
272        self.subscribers.insert(id, key_expr.to_string());
273    }
274
275    pub(crate) fn register_queryable(&mut self, id: u32, key_expr: &str) {
276        self.queryables.insert(id, key_expr.to_string());
277    }
278
279    pub(crate) fn register_querier(&mut self, id: u32, key_expr: &str) {
280        self.queriers.insert(id, key_expr.to_string());
281    }
282
283    pub(crate) fn unregister_publisher(&mut self, id: u32) {
284        self.publishers.remove(&id);
285    }
286
287    pub(crate) fn unregister_subscriber(&mut self, id: u32) {
288        self.subscribers.remove(&id);
289    }
290
291    pub(crate) fn unregister_queryable(&mut self, id: u32) {
292        self.queryables.remove(&id);
293    }
294
295    pub(crate) fn unregister_querier(&mut self, id: u32) {
296        self.queriers.remove(&id);
297    }
298
299    pub(crate) fn id(&self) -> &str {
300        &self.uuid
301    }
302}
303
304async fn run_admin_space_queryable(
305    zenoh_runtime: DynamicRuntime,
306    state_map: StateMap,
307    config: Config,
308) {
309    let session = match zenoh::session::init(zenoh_runtime).await {
310        Ok(session) => session,
311        Err(err) => {
312            tracing::error!("Unable to get Zenoh session from Runtime {err}");
313            return;
314        }
315    };
316
317    let admin_prefix = keformat!(
318        ke_admin_prefix::formatter(),
319        zenoh_id = session.zid().into_keyexpr()
320    )
321    .unwrap();
322
323    let mut admin_space: HashMap<OwnedKeyExpr, AdminRef> = HashMap::new();
324
325    admin_space.insert(
326        &admin_prefix / unsafe { keyexpr::from_str_unchecked("config") },
327        AdminRef::Config,
328    );
329    admin_space.insert(
330        &admin_prefix / unsafe { keyexpr::from_str_unchecked("version") },
331        AdminRef::Version,
332    );
333
334    let admin_keyexpr_expr = (&admin_prefix) / *KE_ANY_N_SEGMENT;
335
336    let admin_queryable = session
337        .declare_queryable(admin_keyexpr_expr)
338        .await
339        .expect("Failed fo create AdminSpace Queryable");
340
341    loop {
342        match admin_queryable.recv_async().await {
343            Ok(query) => {
344                let query_ke: OwnedKeyExpr = query.key_expr().to_owned().into();
345
346                if query_ke.is_wild() {
347                    if query_ke.contains("clients") {
348                        let read_guard = state_map.read().await;
349                        let admin_space_clients = read_guard
350                            .values()
351                            .map(|v| v.lock().unwrap().clone())
352                            .collect::<Vec<_>>();
353                        drop(read_guard);
354                        send_reply(admin_space_clients, query, query_ke).await;
355                    } else {
356                        for (ke, admin_ref) in admin_space.iter() {
357                            if query_ke.intersects(ke) {
358                                send_admin_reply(&query, ke, admin_ref, &config).await;
359                            }
360                        }
361                    }
362                } else {
363                    let own_ke: OwnedKeyExpr = query_ke.to_owned();
364                    if own_ke.contains("config") {
365                        send_admin_reply(&query, &own_ke, &AdminRef::Config, &config).await;
366                    }
367                    if own_ke.contains("client") {
368                        let mut opt_id = None;
369                        let split = own_ke.split('/');
370                        let mut next_is_id = false;
371                        for elem in split {
372                            if next_is_id {
373                                opt_id = Some(elem);
374                            } else if elem.contains("client") {
375                                next_is_id = true;
376                            }
377                        }
378                        if let Some(id) = opt_id {
379                            let read_guard = state_map.read().await;
380                            if let Some(state) = read_guard.get(id) {
381                                let state = state.lock().unwrap().clone();
382                                drop(read_guard);
383                                send_reply(state, query, own_ke).await;
384                            }
385                        }
386                    }
387                }
388            }
389            Err(_) => {
390                tracing::warn!("Admin Space queryable was closed!");
391            }
392        }
393    }
394}
395
396async fn send_reply<T>(reply: T, query: Query, query_ke: OwnedKeyExpr)
397where
398    T: Sized + Serialize,
399{
400    match serde_json::to_string_pretty(&reply) {
401        Ok(json_string) => {
402            if let Err(err) = query
403                .reply(query_ke, json_string)
404                .encoding(Encoding::APPLICATION_JSON)
405                .await
406            {
407                tracing::error!("AdminSpace: Reply to Query failed, {}", err);
408            };
409        }
410        Err(_) => {
411            tracing::error!("AdminSpace: Could not seralize client data");
412        }
413    };
414}
415
416async fn send_admin_reply(
417    query: &Query,
418    key_expr: &keyexpr,
419    admin_ref: &AdminRef,
420    config: &Config,
421) {
422    let z_bytes: ZBytes = match admin_ref {
423        AdminRef::Version => match serde_json::to_value(RemoteApiPlugin::PLUGIN_LONG_VERSION) {
424            Ok(v) => match serde_json::to_vec(&v) {
425                Ok(value) => ZBytes::from(value),
426                Err(e) => {
427                    tracing::warn!("Error transforming JSON to ZBytes: {}", e);
428                    return;
429                }
430            },
431            Err(e) => {
432                tracing::error!("INTERNAL ERROR serializing config as JSON: {}", e);
433                return;
434            }
435        },
436        AdminRef::Config => match serde_json::to_value(config) {
437            Ok(v) => match serde_json::to_vec(&v) {
438                Ok(value) => ZBytes::from(value),
439                Err(e) => {
440                    tracing::warn!("Error transforming JSON to ZBytes: {}", e);
441                    return;
442                }
443            },
444            Err(e) => {
445                tracing::error!("INTERNAL ERROR serializing config as JSON: {}", e);
446                return;
447            }
448        },
449    };
450    if let Err(e) = query
451        .reply(key_expr.to_owned(), z_bytes)
452        .encoding(zenoh::bytes::Encoding::APPLICATION_JSON)
453        .await
454    {
455        tracing::warn!("Error replying to admin query {:?}: {}", query, e);
456    }
457}
458
/// Stateless marker wrapped by [`RunningPlugin`].
struct RemoteAPIPlugin;

/// Handle given back to Zenoh once the plugin has been started.
#[allow(dead_code)]
struct RunningPlugin(RemoteAPIPlugin);

// No custom plugin-control behavior: the trait's default methods are used as-is.
impl PluginControl for RunningPlugin {}

impl RunningPluginTrait for RunningPlugin {
    /// Rejects every runtime configuration change: this always returns an error,
    /// whatever `_path`, `_current` and `_new` contain.
    fn config_checker(
        &self,
        _path: &str,
        _current: &JsonKeyValueMap,
        _new: &JsonKeyValueMap,
    ) -> ZResult<Option<JsonKeyValueMap>> {
        bail!("Runtime configuration change not supported");
    }
}
476
/// Shared registry of connected clients, keyed by the client's UUID string.
///
/// The websocket server inserts an entry when a client connects and removes it when the
/// client disconnects; the admin-space queryable reads it to answer client queries.
type StateMap = Arc<RwLock<HashMap<String, Arc<Mutex<AdminSpaceClient>>>>>;
478
/// Marker trait for the byte streams a websocket can be served over.
///
/// It lets the websocket server box a plain TCP stream and a TLS stream behind the same
/// `Box<dyn Streamable>` type before performing the websocket handshake.
pub trait Streamable:
    tokio::io::AsyncRead + tokio::io::AsyncWrite + std::marker::Send + Unpin
{
}
// Plain (unencrypted) websocket connections.
impl Streamable for TcpStream {}
// Secure websocket connections (TLS over TCP).
impl Streamable for TlsStream<TcpStream> {}
485
486// Listen on the Zenoh Session
487async fn run_websocket_server(
488    ws_port: &String,
489    zenoh_runtime: DynamicRuntime,
490    state_map: StateMap,
491    opt_certs: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
492) {
493    let mut opt_tls_acceptor: Option<TlsAcceptor> = None;
494
495    if let Some((certs, key)) = opt_certs {
496        let config = rustls::ServerConfig::builder()
497            .with_no_client_auth()
498            .with_single_cert(certs, key)
499            .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))
500            .expect("Could not build TLS Configuration from Certficiate/Key Combo :");
501        opt_tls_acceptor = Some(TlsAcceptor::from(Arc::new(config)));
502    }
503
504    let server: TcpListener = match TcpListener::bind(ws_port).await {
505        Ok(server) => server,
506        Err(err) => {
507            tracing::error!("Unable to start TcpListener {err}");
508            return;
509        }
510    };
511
512    while let Ok((tcp_stream, sock_addr)) = server.accept().await {
513        let zenoh_runtime = zenoh_runtime.clone();
514        let opt_tls_acceptor = opt_tls_acceptor.clone();
515        let state_map2 = state_map.clone();
516        let new_websocket = async move {
517            let sock_adress = Arc::new(sock_addr);
518            let (ws_ch_tx, ws_ch_rx) = flume::unbounded::<(OutRemoteMessage, Option<SequenceId>)>();
519
520            let session = match zenoh::session::init(zenoh_runtime.clone()).await {
521                Ok(session) => session,
522                Err(err) => {
523                    tracing::error!("Unable to get Zenoh session from Runtime {err}");
524                    return;
525                }
526            };
527            let id = Uuid::new_v4();
528            tracing::debug!("Client {sock_addr:?} -> {id}");
529
530            let streamable: Box<dyn Streamable> = match &opt_tls_acceptor {
531                Some(acceptor) => match acceptor.accept(tcp_stream).await {
532                    Ok(tls_stream) => Box::new(tls_stream),
533                    Err(err) => {
534                        tracing::error!("Could not secure TcpStream -> TlsStream {:?}", err);
535                        return;
536                    }
537                },
538                None => Box::new(tcp_stream),
539            };
540
541            let ws_stream = match tokio_tungstenite::accept_async(streamable).await {
542                Ok(ws_stream) => ws_stream,
543                Err(e) => {
544                    tracing::error!("Error during the websocket handshake occurred: {}", e);
545                    return;
546                }
547            };
548
549            let (ws_tx, ws_rx) = ws_stream.split();
550
551            let ch_rx_stream = ws_ch_rx
552                .into_stream()
553                .map(|(out_msg, sequence_id)| Ok(Message::Binary(out_msg.to_wire(sequence_id))))
554                .forward(ws_tx);
555
556            // send confirmation that session was successfully opened
557            let admin_client =
558                Arc::new(Mutex::new(AdminSpaceClient::new(id.to_string(), sock_addr)));
559            state_map2
560                .write()
561                .await
562                .insert(id.to_string(), admin_client.clone());
563
564            let mut remote_state = RemoteState::new(ws_ch_tx.clone(), admin_client, session);
565
566            //  Incoming message from Websocket
567            let incoming_ws = tokio::task::spawn(async move {
568                let mut non_close_messages = ws_rx.try_filter(|msg| future::ready(!msg.is_close()));
569
570                while let Ok(Some(msg)) = non_close_messages.try_next().await {
571                    if let Some(response) = handle_message(msg, &mut remote_state).await {
572                        if let Err(err) = ws_ch_tx.send(response) {
573                            tracing::error!("WS Send Error: {err:?}");
574                        };
575                    };
576                }
577                remote_state.clear().await;
578            });
579
580            pin_mut!(ch_rx_stream, incoming_ws);
581            future::select(ch_rx_stream, incoming_ws).await;
582
583            // cleanup state
584            state_map2.write().await.remove(&id.to_string());
585
586            tracing::info!("Client Disconnected {}", sock_adress.as_ref());
587        };
588
589        spawn_future(new_websocket);
590    }
591}
592
593async fn handle_message(
594    msg: Message,
595    state: &mut RemoteState,
596) -> Option<(OutRemoteMessage, Option<SequenceId>)> {
597    match msg {
598        Message::Binary(val) => match InRemoteMessage::from_wire(val) {
599            Ok((header, msg)) => {
600                match state.handle_message(msg).await {
601                    Ok(Some(msg)) => Some((msg, header.sequence_id)),
602                    Ok(None) => header.sequence_id.map(|_| {
603                        (
604                            OutRemoteMessage::Ok(interface::Ok {
605                                content_id: header.content_id,
606                            }),
607                            header.sequence_id,
608                        )
609                    }),
610                    Err(error) => {
611                        tracing::error!(
612                            "RemoteAPI: Failed to execute request {:?}: {}",
613                            header.content_id,
614                            error
615                        );
616                        header.sequence_id.map(|_| {
617                            // send error response if ack was requested
618                            (
619                                OutRemoteMessage::Error(interface::Error {
620                                    error: error.to_string(),
621                                }),
622                                header.sequence_id,
623                            )
624                        })
625                    }
626                }
627            }
628            Err(err) => match err {
629                interface::FromWireError::HeaderError(error) => {
630                    tracing::error!("RemoteAPI: Failed to parse message header: {}", error);
631                    None
632                }
633                interface::FromWireError::BodyError((header, error)) => {
634                    tracing::error!(
635                        "RemoteAPI: Failed to parse message body for {:?}: {}",
636                        header,
637                        error
638                    );
639                    header.sequence_id.map(|_| {
640                        // send error response if ack was requested
641                        (
642                            OutRemoteMessage::Error(interface::Error {
643                                error: error.to_string(),
644                            }),
645                            header.sequence_id,
646                        )
647                    })
648                }
649            },
650        },
651        _ => {
652            tracing::error!("RemoteAPI: message format is not `Binary`");
653            None
654        }
655    }
656}