1use std::{
22 collections::HashMap,
23 fs::File,
24 future::Future,
25 io::{self, BufReader, ErrorKind},
26 net::SocketAddr,
27 path::Path,
28 sync::{Arc, Mutex},
29};
30
31use futures::{future, pin_mut, StreamExt, TryStreamExt};
32use interface::{InRemoteMessage, OutRemoteMessage, SequenceId};
33use remote_state::RemoteState;
34use rustls_pemfile::{certs, private_key};
35use serde::Serialize;
36use tokio::{
37 net::{TcpListener, TcpStream},
38 select,
39 sync::RwLock,
40 task::JoinHandle,
41};
42use tokio_rustls::{
43 rustls::{
44 self,
45 pki_types::{CertificateDer, PrivateKeyDer},
46 },
47 server::TlsStream,
48 TlsAcceptor,
49};
50use tokio_tungstenite::tungstenite::protocol::Message;
51use uuid::Uuid;
52use zenoh::{
53 bytes::{Encoding, ZBytes},
54 internal::{
55 plugins::{RunningPluginTrait, ZenohPlugin},
56 runtime::DynamicRuntime,
57 },
58 key_expr::{
59 format::{kedefine, keformat},
60 keyexpr, OwnedKeyExpr,
61 },
62 query::Query,
63};
64use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};
65use zenoh_result::{bail, zerror, ZResult};
66use zenoh_util::ffi::JsonKeyValueMap;
67
68mod config;
69pub use config::Config;
70
71use crate::interface::{LivelinessTokenId, PublisherId, QuerierId, QueryableId, SubscriberId};
72
73mod interface;
74
75mod remote_state;
76
77kedefine!(
78 pub ke_admin_version: "${plugin_status_key:**}/__version__",
80
81 pub ke_admin_prefix: "@/${zenoh_id:*}/remote-plugin/",
83);
84
85const WORKER_THREAD_NUM: usize = 2;
86const MAX_BLOCK_THREAD_NUM: usize = 50;
87const GIT_VERSION: &str = git_version::git_version!(prefix = "v", cargo_prefix = "v");
88
89lazy_static::lazy_static! {
90 static ref LONG_VERSION: String = format!("{} built with {}", GIT_VERSION, env!("RUSTC_VERSION"));
91 static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
93 .worker_threads(WORKER_THREAD_NUM)
94 .max_blocking_threads(MAX_BLOCK_THREAD_NUM)
95 .enable_all()
96 .build()
97 .expect("Unable to create runtime");
98 static ref KE_ANY_N_SEGMENT: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("**") };
99}
100
101#[derive(Debug)]
103enum AdminRef {
104 Config,
105 Version,
106}
107
108#[inline(always)]
109pub(crate) fn spawn_runtime<F>(task: F) -> JoinHandle<F::Output>
110where
111 F: Future + Send + 'static,
112 F::Output: Send + 'static,
113{
114 match tokio::runtime::Handle::try_current() {
116 Ok(rt) => {
117 rt.spawn(task)
119 }
120 Err(_) => {
121 TOKIO_RUNTIME.spawn(task)
123 }
124 }
125}
126
127pub fn spawn_future(fut: impl Future<Output = ()> + 'static + std::marker::Send) -> JoinHandle<()> {
128 match tokio::runtime::Handle::try_current() {
129 Ok(rt) => rt.spawn(fut),
130 Err(_) => TOKIO_RUNTIME.spawn(fut),
131 }
132}
133
134fn load_certs(path: &Path) -> io::Result<Vec<CertificateDer<'static>>> {
135 certs(&mut BufReader::new(File::open(path)?)).collect()
136}
137
138fn load_key(path: &Path) -> io::Result<PrivateKeyDer<'static>> {
139 private_key(&mut BufReader::new(File::open(path)?))
140 .unwrap()
141 .ok_or(io::Error::new(
142 ErrorKind::Other,
143 "No private key found".to_string(),
144 ))
145}
146
147pub struct RemoteApiPlugin;
148
149#[cfg(feature = "dynamic_plugin")]
150zenoh_plugin_trait::declare_plugin!(RemoteApiPlugin);
151impl ZenohPlugin for RemoteApiPlugin {}
152impl Plugin for RemoteApiPlugin {
153 type StartArgs = DynamicRuntime;
154 type Instance = zenoh::internal::plugins::RunningPlugin;
155 const DEFAULT_NAME: &'static str = "remote_api";
156 const PLUGIN_VERSION: &'static str = plugin_version!();
157 const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!();
158
159 fn start(
160 name: &str,
161 runtime: &Self::StartArgs,
162 ) -> ZResult<zenoh::internal::plugins::RunningPlugin> {
163 zenoh_util::try_init_log_from_env();
167 tracing::info!("Starting {name}");
168
169 let plugin_conf = runtime
170 .get_config()
171 .get_plugin_config(name)
172 .map_err(|_| zerror!("Plugin `{}`: missing config", name))?;
173
174 let conf: Config = serde_json::from_value(plugin_conf)
175 .map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
176
177 let wss_config: Option<(Vec<CertificateDer<'_>>, PrivateKeyDer<'_>)> =
178 match conf.secure_websocket.clone() {
179 Some(wss_config) => {
180 tracing::info!("Loading certs from : {} ...", wss_config.certificate_path);
181 let certs = load_certs(Path::new(&wss_config.certificate_path))
182 .map_err(|err| zerror!("Could not Load WSS Cert `{}`", err))?;
183 tracing::info!(
184 "Loading Private Key from : {} ...",
185 wss_config.private_key_path
186 );
187 let key = load_key(Path::new(&wss_config.private_key_path))
188 .map_err(|err| zerror!("Could not Load WSS Private Key `{}`", err))?;
189 Some((certs, key))
190 }
191 None => None,
192 };
193
194 spawn_runtime(run(runtime.clone(), conf, wss_config));
195 Ok(Box::new(RunningPlugin(RemoteAPIPlugin)))
196 }
197}
198
199pub async fn run(
200 runtime: DynamicRuntime,
201 config: Config,
202 opt_certs: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
203) {
204 let state_map = Arc::new(RwLock::new(HashMap::new()));
205
206 let remote_api_runtime = RemoteAPIRuntime {
208 config: Arc::new(config),
209 wss_certs: opt_certs,
210 zenoh_runtime: runtime,
211 state_map,
212 };
213
214 remote_api_runtime.run().await;
215}
216
217struct RemoteAPIRuntime {
218 config: Arc<Config>,
219 wss_certs: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
220 zenoh_runtime: DynamicRuntime,
221 state_map: StateMap,
222}
223
224impl RemoteAPIRuntime {
225 async fn run(self) {
226 let run_websocket_server = run_websocket_server(
227 &self.config.websocket_port,
228 self.zenoh_runtime.clone(),
229 self.state_map.clone(),
230 self.wss_certs,
231 );
232
233 let config = (*self.config).clone();
234
235 let run_admin_space_queryable =
236 run_admin_space_queryable(self.zenoh_runtime.clone(), self.state_map.clone(), config);
237
238 select!(
239 _ = run_websocket_server => {},
240 _ = run_admin_space_queryable => {},
241 );
242 }
243}
244
245#[derive(Debug, Serialize, Clone)]
246pub(crate) struct AdminSpaceClient {
247 uuid: String,
248 remote_address: SocketAddr,
249 publishers: HashMap<PublisherId, String>,
250 subscribers: HashMap<SubscriberId, String>,
251 queryables: HashMap<QueryableId, String>,
252 queriers: HashMap<QuerierId, String>,
253 liveliness_tokens: HashMap<LivelinessTokenId, String>,
254}
255
256impl AdminSpaceClient {
257 pub(crate) fn new(uuid: String, remote_address: SocketAddr) -> Self {
258 AdminSpaceClient {
259 uuid,
260 remote_address,
261 publishers: HashMap::new(),
262 subscribers: HashMap::new(),
263 queryables: HashMap::new(),
264 queriers: HashMap::new(),
265 liveliness_tokens: HashMap::new(),
266 }
267 }
268
269 pub(crate) fn register_publisher(&mut self, id: PublisherId, key_expr: &str) {
270 self.publishers.insert(id, key_expr.to_string());
271 }
272
273 pub(crate) fn register_subscriber(&mut self, id: SubscriberId, key_expr: &str) {
274 self.subscribers.insert(id, key_expr.to_string());
275 }
276
277 pub(crate) fn register_queryable(&mut self, id: QueryableId, key_expr: &str) {
278 self.queryables.insert(id, key_expr.to_string());
279 }
280
281 pub(crate) fn register_querier(&mut self, id: QuerierId, key_expr: &str) {
282 self.queriers.insert(id, key_expr.to_string());
283 }
284
285 pub(crate) fn unregister_publisher(&mut self, id: PublisherId) {
286 self.publishers.remove(&id);
287 }
288
289 pub(crate) fn unregister_subscriber(&mut self, id: SubscriberId) {
290 self.subscribers.remove(&id);
291 }
292
293 pub(crate) fn unregister_queryable(&mut self, id: QueryableId) {
294 self.queryables.remove(&id);
295 }
296
297 pub(crate) fn unregister_querier(&mut self, id: QuerierId) {
298 self.queriers.remove(&id);
299 }
300
301 pub(crate) fn id(&self) -> &str {
302 &self.uuid
303 }
304}
305
306async fn run_admin_space_queryable(
307 zenoh_runtime: DynamicRuntime,
308 state_map: StateMap,
309 config: Config,
310) {
311 let session = match zenoh::session::init(zenoh_runtime).await {
312 Ok(session) => session,
313 Err(err) => {
314 tracing::error!("Unable to get Zenoh session from Runtime {err}");
315 return;
316 }
317 };
318
319 let admin_prefix = keformat!(
320 ke_admin_prefix::formatter(),
321 zenoh_id = session.zid().into_keyexpr()
322 )
323 .unwrap();
324
325 let mut admin_space: HashMap<OwnedKeyExpr, AdminRef> = HashMap::new();
326
327 admin_space.insert(
328 &admin_prefix / unsafe { keyexpr::from_str_unchecked("config") },
329 AdminRef::Config,
330 );
331 admin_space.insert(
332 &admin_prefix / unsafe { keyexpr::from_str_unchecked("version") },
333 AdminRef::Version,
334 );
335
336 let admin_keyexpr_expr = (&admin_prefix) / *KE_ANY_N_SEGMENT;
337
338 let admin_queryable = session
339 .declare_queryable(admin_keyexpr_expr)
340 .await
341 .expect("Failed fo create AdminSpace Queryable");
342
343 loop {
344 match admin_queryable.recv_async().await {
345 Ok(query) => {
346 let query_ke: OwnedKeyExpr = query.key_expr().to_owned().into();
347
348 if query_ke.is_wild() {
349 if query_ke.contains("clients") {
350 let read_guard = state_map.read().await;
351 let admin_space_clients = read_guard
352 .values()
353 .map(|v| v.lock().unwrap().clone())
354 .collect::<Vec<_>>();
355 drop(read_guard);
356 send_reply(admin_space_clients, query, query_ke).await;
357 } else {
358 for (ke, admin_ref) in admin_space.iter() {
359 if query_ke.intersects(ke) {
360 send_admin_reply(&query, ke, admin_ref, &config).await;
361 }
362 }
363 }
364 } else {
365 let own_ke: OwnedKeyExpr = query_ke.to_owned();
366 if own_ke.contains("config") {
367 send_admin_reply(&query, &own_ke, &AdminRef::Config, &config).await;
368 }
369 if own_ke.contains("client") {
370 let mut opt_id = None;
371 let split = own_ke.split('/');
372 let mut next_is_id = false;
373 for elem in split {
374 if next_is_id {
375 opt_id = Some(elem);
376 } else if elem.contains("client") {
377 next_is_id = true;
378 }
379 }
380 if let Some(id) = opt_id {
381 let read_guard = state_map.read().await;
382 if let Some(state) = read_guard.get(id) {
383 let state = state.lock().unwrap().clone();
384 drop(read_guard);
385 send_reply(state, query, own_ke).await;
386 }
387 }
388 }
389 }
390 }
391 Err(_) => {
392 tracing::warn!("Admin Space queryable was closed!");
393 }
394 }
395 }
396}
397
398async fn send_reply<T>(reply: T, query: Query, query_ke: OwnedKeyExpr)
399where
400 T: Sized + Serialize,
401{
402 match serde_json::to_string_pretty(&reply) {
403 Ok(json_string) => {
404 if let Err(err) = query
405 .reply(query_ke, json_string)
406 .encoding(Encoding::APPLICATION_JSON)
407 .await
408 {
409 tracing::error!("AdminSpace: Reply to Query failed, {}", err);
410 };
411 }
412 Err(_) => {
413 tracing::error!("AdminSpace: Could not seralize client data");
414 }
415 };
416}
417
418async fn send_admin_reply(
419 query: &Query,
420 key_expr: &keyexpr,
421 admin_ref: &AdminRef,
422 config: &Config,
423) {
424 let z_bytes: ZBytes = match admin_ref {
425 AdminRef::Version => match serde_json::to_value(RemoteApiPlugin::PLUGIN_LONG_VERSION) {
426 Ok(v) => match serde_json::to_vec(&v) {
427 Ok(value) => ZBytes::from(value),
428 Err(e) => {
429 tracing::warn!("Error transforming JSON to ZBytes: {}", e);
430 return;
431 }
432 },
433 Err(e) => {
434 tracing::error!("INTERNAL ERROR serializing config as JSON: {}", e);
435 return;
436 }
437 },
438 AdminRef::Config => match serde_json::to_value(config) {
439 Ok(v) => match serde_json::to_vec(&v) {
440 Ok(value) => ZBytes::from(value),
441 Err(e) => {
442 tracing::warn!("Error transforming JSON to ZBytes: {}", e);
443 return;
444 }
445 },
446 Err(e) => {
447 tracing::error!("INTERNAL ERROR serializing config as JSON: {}", e);
448 return;
449 }
450 },
451 };
452 if let Err(e) = query
453 .reply(key_expr.to_owned(), z_bytes)
454 .encoding(zenoh::bytes::Encoding::APPLICATION_JSON)
455 .await
456 {
457 tracing::warn!("Error replying to admin query {:?}: {}", query, e);
458 }
459}
460
461struct RemoteAPIPlugin;
462
463#[allow(dead_code)]
464struct RunningPlugin(RemoteAPIPlugin);
465
466impl PluginControl for RunningPlugin {}
467
468impl RunningPluginTrait for RunningPlugin {
469 fn config_checker(
470 &self,
471 _path: &str,
472 _current: &JsonKeyValueMap,
473 _new: &JsonKeyValueMap,
474 ) -> ZResult<Option<JsonKeyValueMap>> {
475 bail!("Runtime configuration change not supported");
476 }
477}
478
479type StateMap = Arc<RwLock<HashMap<String, Arc<Mutex<AdminSpaceClient>>>>>;
480
481pub trait Streamable:
482 tokio::io::AsyncRead + tokio::io::AsyncWrite + std::marker::Send + Unpin
483{
484}
485impl Streamable for TcpStream {}
486impl Streamable for TlsStream<TcpStream> {}
487
488async fn run_websocket_server(
490 ws_port: &String,
491 zenoh_runtime: DynamicRuntime,
492 state_map: StateMap,
493 opt_certs: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
494) {
495 let mut opt_tls_acceptor: Option<TlsAcceptor> = None;
496
497 if let Some((certs, key)) = opt_certs {
498 let config = rustls::ServerConfig::builder()
499 .with_no_client_auth()
500 .with_single_cert(certs, key)
501 .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))
502 .expect("Could not build TLS Configuration from Certficiate/Key Combo :");
503 opt_tls_acceptor = Some(TlsAcceptor::from(Arc::new(config)));
504 }
505
506 let server: TcpListener = match TcpListener::bind(ws_port).await {
507 Ok(server) => server,
508 Err(err) => {
509 tracing::error!("Unable to start TcpListener {err}");
510 return;
511 }
512 };
513
514 while let Ok((tcp_stream, sock_addr)) = server.accept().await {
515 let zenoh_runtime = zenoh_runtime.clone();
516 let opt_tls_acceptor = opt_tls_acceptor.clone();
517 let state_map2 = state_map.clone();
518 let new_websocket = async move {
519 let sock_adress = Arc::new(sock_addr);
520 let (ws_ch_tx, ws_ch_rx) = flume::unbounded::<(OutRemoteMessage, Option<SequenceId>)>();
521
522 let session = match zenoh::session::init(zenoh_runtime.clone()).await {
523 Ok(session) => session,
524 Err(err) => {
525 tracing::error!("Unable to get Zenoh session from Runtime {err}");
526 return;
527 }
528 };
529 let id = Uuid::new_v4();
530 tracing::debug!("Client {sock_addr:?} -> {id}");
531
532 let streamable: Box<dyn Streamable> = match &opt_tls_acceptor {
533 Some(acceptor) => match acceptor.accept(tcp_stream).await {
534 Ok(tls_stream) => Box::new(tls_stream),
535 Err(err) => {
536 tracing::error!("Could not secure TcpStream -> TlsStream {:?}", err);
537 return;
538 }
539 },
540 None => Box::new(tcp_stream),
541 };
542
543 let ws_stream = match tokio_tungstenite::accept_async(streamable).await {
544 Ok(ws_stream) => ws_stream,
545 Err(e) => {
546 tracing::error!("Error during the websocket handshake occurred: {}", e);
547 return;
548 }
549 };
550
551 let (ws_tx, ws_rx) = ws_stream.split();
552
553 let ch_rx_stream = ws_ch_rx
554 .into_stream()
555 .map(|(out_msg, sequence_id)| Ok(Message::Binary(out_msg.to_wire(sequence_id))))
556 .forward(ws_tx);
557
558 let admin_client =
560 Arc::new(Mutex::new(AdminSpaceClient::new(id.to_string(), sock_addr)));
561 state_map2
562 .write()
563 .await
564 .insert(id.to_string(), admin_client.clone());
565
566 let mut remote_state = RemoteState::new(ws_ch_tx.clone(), admin_client, session);
567
568 let incoming_ws = tokio::task::spawn(async move {
570 let mut non_close_messages = ws_rx.try_filter(|msg| future::ready(!msg.is_close()));
571
572 while let Ok(Some(msg)) = non_close_messages.try_next().await {
573 if let Some(response) = handle_message(msg, &mut remote_state).await {
574 if let Err(err) = ws_ch_tx.send(response) {
575 tracing::error!("WS Send Error: {err:?}");
576 };
577 };
578 }
579 remote_state.clear().await;
580 });
581
582 pin_mut!(ch_rx_stream, incoming_ws);
583 future::select(ch_rx_stream, incoming_ws).await;
584
585 state_map2.write().await.remove(&id.to_string());
587
588 tracing::info!("Client Disconnected {}", sock_adress.as_ref());
589 };
590
591 spawn_future(new_websocket);
592 }
593}
594
595async fn handle_message(
596 msg: Message,
597 state: &mut RemoteState,
598) -> Option<(OutRemoteMessage, Option<SequenceId>)> {
599 match msg {
600 Message::Binary(val) => match InRemoteMessage::from_wire(val) {
601 Ok((header, msg)) => {
602 match state.handle_message(msg).await {
603 Ok(Some(msg)) => Some((msg, header.sequence_id)),
604 Ok(None) => header.sequence_id.map(|_| {
605 (
606 OutRemoteMessage::Ok(interface::Ok {
607 content_id: header.content_id,
608 }),
609 header.sequence_id,
610 )
611 }),
612 Err(error) => {
613 tracing::error!(
614 "RemoteAPI: Failed to execute request {:?}: {}",
615 header.content_id,
616 error
617 );
618 header.sequence_id.map(|_| {
619 (
621 OutRemoteMessage::Error(interface::Error {
622 error: error.to_string(),
623 }),
624 header.sequence_id,
625 )
626 })
627 }
628 }
629 }
630 Err(err) => match err {
631 interface::FromWireError::HeaderError(error) => {
632 tracing::error!("RemoteAPI: Failed to parse message header: {}", error);
633 None
634 }
635 interface::FromWireError::BodyError((header, error)) => {
636 tracing::error!(
637 "RemoteAPI: Failed to parse message body for {:?}: {}",
638 header,
639 error
640 );
641 header.sequence_id.map(|_| {
642 (
644 OutRemoteMessage::Error(interface::Error {
645 error: error.to_string(),
646 }),
647 header.sequence_id,
648 )
649 })
650 }
651 },
652 },
653 _ => {
654 tracing::error!("RemoteAPI: message format is not `Binary`");
655 None
656 }
657 }
658}