1use std::{
22 collections::HashMap,
23 fs::File,
24 future::Future,
25 io::{self, BufReader, ErrorKind},
26 net::SocketAddr,
27 path::Path,
28 sync::{Arc, Mutex},
29};
30
31use futures::{future, pin_mut, StreamExt, TryStreamExt};
32use interface::{InRemoteMessage, OutRemoteMessage, SequenceId};
33use remote_state::RemoteState;
34use rustls_pemfile::{certs, private_key};
35use serde::Serialize;
36use tokio::{
37 net::{TcpListener, TcpStream},
38 select,
39 sync::RwLock,
40 task::JoinHandle,
41};
42use tokio_rustls::{
43 rustls::{
44 self,
45 pki_types::{CertificateDer, PrivateKeyDer},
46 },
47 server::TlsStream,
48 TlsAcceptor,
49};
50use tokio_tungstenite::tungstenite::protocol::Message;
51use uuid::Uuid;
52use zenoh::{
53 bytes::{Encoding, ZBytes},
54 internal::{
55 plugins::{RunningPluginTrait, ZenohPlugin},
56 runtime::DynamicRuntime,
57 },
58 key_expr::{
59 format::{kedefine, keformat},
60 keyexpr, OwnedKeyExpr,
61 },
62 query::Query,
63};
64use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};
65use zenoh_result::{bail, zerror, ZResult};
66use zenoh_util::ffi::JsonKeyValueMap;
67
68mod config;
69pub use config::Config;
70
71mod interface;
72
73mod remote_state;
74
75kedefine!(
76 pub ke_admin_version: "${plugin_status_key:**}/__version__",
78
79 pub ke_admin_prefix: "@/${zenoh_id:*}/remote-plugin/",
81);
82
83const WORKER_THREAD_NUM: usize = 2;
84const MAX_BLOCK_THREAD_NUM: usize = 50;
85const GIT_VERSION: &str = git_version::git_version!(prefix = "v", cargo_prefix = "v");
86
87lazy_static::lazy_static! {
88 static ref LONG_VERSION: String = format!("{} built with {}", GIT_VERSION, env!("RUSTC_VERSION"));
89 static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
91 .worker_threads(WORKER_THREAD_NUM)
92 .max_blocking_threads(MAX_BLOCK_THREAD_NUM)
93 .enable_all()
94 .build()
95 .expect("Unable to create runtime");
96 static ref KE_ANY_N_SEGMENT: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("**") };
97}
98
/// Selects which admin-space resource is returned by `send_admin_reply`.
#[derive(Debug)]
enum AdminRef {
    /// The plugin configuration, serialized as JSON.
    Config,
    /// The plugin's long version string, serialized as JSON.
    Version,
}
105
106#[inline(always)]
107pub(crate) fn spawn_runtime<F>(task: F) -> JoinHandle<F::Output>
108where
109 F: Future + Send + 'static,
110 F::Output: Send + 'static,
111{
112 match tokio::runtime::Handle::try_current() {
114 Ok(rt) => {
115 rt.spawn(task)
117 }
118 Err(_) => {
119 TOKIO_RUNTIME.spawn(task)
121 }
122 }
123}
124
125pub fn spawn_future(fut: impl Future<Output = ()> + 'static + std::marker::Send) -> JoinHandle<()> {
126 match tokio::runtime::Handle::try_current() {
127 Ok(rt) => rt.spawn(fut),
128 Err(_) => TOKIO_RUNTIME.spawn(fut),
129 }
130}
131
132fn load_certs(path: &Path) -> io::Result<Vec<CertificateDer<'static>>> {
133 certs(&mut BufReader::new(File::open(path)?)).collect()
134}
135
136fn load_key(path: &Path) -> io::Result<PrivateKeyDer<'static>> {
137 private_key(&mut BufReader::new(File::open(path)?))
138 .unwrap()
139 .ok_or(io::Error::new(
140 ErrorKind::Other,
141 "No private key found".to_string(),
142 ))
143}
144
145pub struct RemoteApiPlugin;
146
147#[cfg(feature = "dynamic_plugin")]
148zenoh_plugin_trait::declare_plugin!(RemoteApiPlugin);
149impl ZenohPlugin for RemoteApiPlugin {}
150impl Plugin for RemoteApiPlugin {
151 type StartArgs = DynamicRuntime;
152 type Instance = zenoh::internal::plugins::RunningPlugin;
153 const DEFAULT_NAME: &'static str = "remote_api";
154 const PLUGIN_VERSION: &'static str = plugin_version!();
155 const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!();
156
157 fn start(
158 name: &str,
159 runtime: &Self::StartArgs,
160 ) -> ZResult<zenoh::internal::plugins::RunningPlugin> {
161 zenoh_util::try_init_log_from_env();
165 tracing::info!("Starting {name}");
166
167 let plugin_conf = runtime
168 .get_config()
169 .get_plugin_config(name)
170 .map_err(|_| zerror!("Plugin `{}`: missing config", name))?;
171
172 let conf: Config = serde_json::from_value(plugin_conf)
173 .map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
174
175 let wss_config: Option<(Vec<CertificateDer<'_>>, PrivateKeyDer<'_>)> =
176 match conf.secure_websocket.clone() {
177 Some(wss_config) => {
178 tracing::info!("Loading certs from : {} ...", wss_config.certificate_path);
179 let certs = load_certs(Path::new(&wss_config.certificate_path))
180 .map_err(|err| zerror!("Could not Load WSS Cert `{}`", err))?;
181 tracing::info!(
182 "Loading Private Key from : {} ...",
183 wss_config.private_key_path
184 );
185 let key = load_key(Path::new(&wss_config.private_key_path))
186 .map_err(|err| zerror!("Could not Load WSS Private Key `{}`", err))?;
187 Some((certs, key))
188 }
189 None => None,
190 };
191
192 spawn_runtime(run(runtime.clone(), conf, wss_config));
193 Ok(Box::new(RunningPlugin(RemoteAPIPlugin)))
194 }
195}
196
197pub async fn run(
198 runtime: DynamicRuntime,
199 config: Config,
200 opt_certs: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
201) {
202 let state_map = Arc::new(RwLock::new(HashMap::new()));
203
204 let remote_api_runtime = RemoteAPIRuntime {
206 config: Arc::new(config),
207 wss_certs: opt_certs,
208 zenoh_runtime: runtime,
209 state_map,
210 };
211
212 remote_api_runtime.run().await;
213}
214
215struct RemoteAPIRuntime {
216 config: Arc<Config>,
217 wss_certs: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
218 zenoh_runtime: DynamicRuntime,
219 state_map: StateMap,
220}
221
222impl RemoteAPIRuntime {
223 async fn run(self) {
224 let run_websocket_server = run_websocket_server(
225 &self.config.websocket_port,
226 self.zenoh_runtime.clone(),
227 self.state_map.clone(),
228 self.wss_certs,
229 );
230
231 let config = (*self.config).clone();
232
233 let run_admin_space_queryable =
234 run_admin_space_queryable(self.zenoh_runtime.clone(), self.state_map.clone(), config);
235
236 select!(
237 _ = run_websocket_server => {},
238 _ = run_admin_space_queryable => {},
239 );
240 }
241}
242
243#[derive(Debug, Serialize, Clone)]
244pub(crate) struct AdminSpaceClient {
245 uuid: String,
246 remote_address: SocketAddr,
247 publishers: HashMap<u32, String>,
248 subscribers: HashMap<u32, String>,
249 queryables: HashMap<u32, String>,
250 queriers: HashMap<u32, String>,
251 liveliness_tokens: HashMap<u32, String>,
252}
253
254impl AdminSpaceClient {
255 pub(crate) fn new(uuid: String, remote_address: SocketAddr) -> Self {
256 AdminSpaceClient {
257 uuid,
258 remote_address,
259 publishers: HashMap::new(),
260 subscribers: HashMap::new(),
261 queryables: HashMap::new(),
262 queriers: HashMap::new(),
263 liveliness_tokens: HashMap::new(),
264 }
265 }
266
267 pub(crate) fn register_publisher(&mut self, id: u32, key_expr: &str) {
268 self.publishers.insert(id, key_expr.to_string());
269 }
270
271 pub(crate) fn register_subscriber(&mut self, id: u32, key_expr: &str) {
272 self.subscribers.insert(id, key_expr.to_string());
273 }
274
275 pub(crate) fn register_queryable(&mut self, id: u32, key_expr: &str) {
276 self.queryables.insert(id, key_expr.to_string());
277 }
278
279 pub(crate) fn register_querier(&mut self, id: u32, key_expr: &str) {
280 self.queriers.insert(id, key_expr.to_string());
281 }
282
283 pub(crate) fn unregister_publisher(&mut self, id: u32) {
284 self.publishers.remove(&id);
285 }
286
287 pub(crate) fn unregister_subscriber(&mut self, id: u32) {
288 self.subscribers.remove(&id);
289 }
290
291 pub(crate) fn unregister_queryable(&mut self, id: u32) {
292 self.queryables.remove(&id);
293 }
294
295 pub(crate) fn unregister_querier(&mut self, id: u32) {
296 self.queriers.remove(&id);
297 }
298
299 pub(crate) fn id(&self) -> &str {
300 &self.uuid
301 }
302}
303
304async fn run_admin_space_queryable(
305 zenoh_runtime: DynamicRuntime,
306 state_map: StateMap,
307 config: Config,
308) {
309 let session = match zenoh::session::init(zenoh_runtime).await {
310 Ok(session) => session,
311 Err(err) => {
312 tracing::error!("Unable to get Zenoh session from Runtime {err}");
313 return;
314 }
315 };
316
317 let admin_prefix = keformat!(
318 ke_admin_prefix::formatter(),
319 zenoh_id = session.zid().into_keyexpr()
320 )
321 .unwrap();
322
323 let mut admin_space: HashMap<OwnedKeyExpr, AdminRef> = HashMap::new();
324
325 admin_space.insert(
326 &admin_prefix / unsafe { keyexpr::from_str_unchecked("config") },
327 AdminRef::Config,
328 );
329 admin_space.insert(
330 &admin_prefix / unsafe { keyexpr::from_str_unchecked("version") },
331 AdminRef::Version,
332 );
333
334 let admin_keyexpr_expr = (&admin_prefix) / *KE_ANY_N_SEGMENT;
335
336 let admin_queryable = session
337 .declare_queryable(admin_keyexpr_expr)
338 .await
339 .expect("Failed fo create AdminSpace Queryable");
340
341 loop {
342 match admin_queryable.recv_async().await {
343 Ok(query) => {
344 let query_ke: OwnedKeyExpr = query.key_expr().to_owned().into();
345
346 if query_ke.is_wild() {
347 if query_ke.contains("clients") {
348 let read_guard = state_map.read().await;
349 let admin_space_clients = read_guard
350 .values()
351 .map(|v| v.lock().unwrap().clone())
352 .collect::<Vec<_>>();
353 drop(read_guard);
354 send_reply(admin_space_clients, query, query_ke).await;
355 } else {
356 for (ke, admin_ref) in admin_space.iter() {
357 if query_ke.intersects(ke) {
358 send_admin_reply(&query, ke, admin_ref, &config).await;
359 }
360 }
361 }
362 } else {
363 let own_ke: OwnedKeyExpr = query_ke.to_owned();
364 if own_ke.contains("config") {
365 send_admin_reply(&query, &own_ke, &AdminRef::Config, &config).await;
366 }
367 if own_ke.contains("client") {
368 let mut opt_id = None;
369 let split = own_ke.split('/');
370 let mut next_is_id = false;
371 for elem in split {
372 if next_is_id {
373 opt_id = Some(elem);
374 } else if elem.contains("client") {
375 next_is_id = true;
376 }
377 }
378 if let Some(id) = opt_id {
379 let read_guard = state_map.read().await;
380 if let Some(state) = read_guard.get(id) {
381 let state = state.lock().unwrap().clone();
382 drop(read_guard);
383 send_reply(state, query, own_ke).await;
384 }
385 }
386 }
387 }
388 }
389 Err(_) => {
390 tracing::warn!("Admin Space queryable was closed!");
391 }
392 }
393 }
394}
395
396async fn send_reply<T>(reply: T, query: Query, query_ke: OwnedKeyExpr)
397where
398 T: Sized + Serialize,
399{
400 match serde_json::to_string_pretty(&reply) {
401 Ok(json_string) => {
402 if let Err(err) = query
403 .reply(query_ke, json_string)
404 .encoding(Encoding::APPLICATION_JSON)
405 .await
406 {
407 tracing::error!("AdminSpace: Reply to Query failed, {}", err);
408 };
409 }
410 Err(_) => {
411 tracing::error!("AdminSpace: Could not seralize client data");
412 }
413 };
414}
415
416async fn send_admin_reply(
417 query: &Query,
418 key_expr: &keyexpr,
419 admin_ref: &AdminRef,
420 config: &Config,
421) {
422 let z_bytes: ZBytes = match admin_ref {
423 AdminRef::Version => match serde_json::to_value(RemoteApiPlugin::PLUGIN_LONG_VERSION) {
424 Ok(v) => match serde_json::to_vec(&v) {
425 Ok(value) => ZBytes::from(value),
426 Err(e) => {
427 tracing::warn!("Error transforming JSON to ZBytes: {}", e);
428 return;
429 }
430 },
431 Err(e) => {
432 tracing::error!("INTERNAL ERROR serializing config as JSON: {}", e);
433 return;
434 }
435 },
436 AdminRef::Config => match serde_json::to_value(config) {
437 Ok(v) => match serde_json::to_vec(&v) {
438 Ok(value) => ZBytes::from(value),
439 Err(e) => {
440 tracing::warn!("Error transforming JSON to ZBytes: {}", e);
441 return;
442 }
443 },
444 Err(e) => {
445 tracing::error!("INTERNAL ERROR serializing config as JSON: {}", e);
446 return;
447 }
448 },
449 };
450 if let Err(e) = query
451 .reply(key_expr.to_owned(), z_bytes)
452 .encoding(zenoh::bytes::Encoding::APPLICATION_JSON)
453 .await
454 {
455 tracing::warn!("Error replying to admin query {:?}: {}", query, e);
456 }
457}
458
459struct RemoteAPIPlugin;
460
461#[allow(dead_code)]
462struct RunningPlugin(RemoteAPIPlugin);
463
464impl PluginControl for RunningPlugin {}
465
466impl RunningPluginTrait for RunningPlugin {
467 fn config_checker(
468 &self,
469 _path: &str,
470 _current: &JsonKeyValueMap,
471 _new: &JsonKeyValueMap,
472 ) -> ZResult<Option<JsonKeyValueMap>> {
473 bail!("Runtime configuration change not supported");
474 }
475}
476
477type StateMap = Arc<RwLock<HashMap<String, Arc<Mutex<AdminSpaceClient>>>>>;
478
479pub trait Streamable:
480 tokio::io::AsyncRead + tokio::io::AsyncWrite + std::marker::Send + Unpin
481{
482}
483impl Streamable for TcpStream {}
484impl Streamable for TlsStream<TcpStream> {}
485
486async fn run_websocket_server(
488 ws_port: &String,
489 zenoh_runtime: DynamicRuntime,
490 state_map: StateMap,
491 opt_certs: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
492) {
493 let mut opt_tls_acceptor: Option<TlsAcceptor> = None;
494
495 if let Some((certs, key)) = opt_certs {
496 let config = rustls::ServerConfig::builder()
497 .with_no_client_auth()
498 .with_single_cert(certs, key)
499 .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))
500 .expect("Could not build TLS Configuration from Certficiate/Key Combo :");
501 opt_tls_acceptor = Some(TlsAcceptor::from(Arc::new(config)));
502 }
503
504 let server: TcpListener = match TcpListener::bind(ws_port).await {
505 Ok(server) => server,
506 Err(err) => {
507 tracing::error!("Unable to start TcpListener {err}");
508 return;
509 }
510 };
511
512 while let Ok((tcp_stream, sock_addr)) = server.accept().await {
513 let zenoh_runtime = zenoh_runtime.clone();
514 let opt_tls_acceptor = opt_tls_acceptor.clone();
515 let state_map2 = state_map.clone();
516 let new_websocket = async move {
517 let sock_adress = Arc::new(sock_addr);
518 let (ws_ch_tx, ws_ch_rx) = flume::unbounded::<(OutRemoteMessage, Option<SequenceId>)>();
519
520 let session = match zenoh::session::init(zenoh_runtime.clone()).await {
521 Ok(session) => session,
522 Err(err) => {
523 tracing::error!("Unable to get Zenoh session from Runtime {err}");
524 return;
525 }
526 };
527 let id = Uuid::new_v4();
528 tracing::debug!("Client {sock_addr:?} -> {id}");
529
530 let streamable: Box<dyn Streamable> = match &opt_tls_acceptor {
531 Some(acceptor) => match acceptor.accept(tcp_stream).await {
532 Ok(tls_stream) => Box::new(tls_stream),
533 Err(err) => {
534 tracing::error!("Could not secure TcpStream -> TlsStream {:?}", err);
535 return;
536 }
537 },
538 None => Box::new(tcp_stream),
539 };
540
541 let ws_stream = match tokio_tungstenite::accept_async(streamable).await {
542 Ok(ws_stream) => ws_stream,
543 Err(e) => {
544 tracing::error!("Error during the websocket handshake occurred: {}", e);
545 return;
546 }
547 };
548
549 let (ws_tx, ws_rx) = ws_stream.split();
550
551 let ch_rx_stream = ws_ch_rx
552 .into_stream()
553 .map(|(out_msg, sequence_id)| Ok(Message::Binary(out_msg.to_wire(sequence_id))))
554 .forward(ws_tx);
555
556 let admin_client =
558 Arc::new(Mutex::new(AdminSpaceClient::new(id.to_string(), sock_addr)));
559 state_map2
560 .write()
561 .await
562 .insert(id.to_string(), admin_client.clone());
563
564 let mut remote_state = RemoteState::new(ws_ch_tx.clone(), admin_client, session);
565
566 let incoming_ws = tokio::task::spawn(async move {
568 let mut non_close_messages = ws_rx.try_filter(|msg| future::ready(!msg.is_close()));
569
570 while let Ok(Some(msg)) = non_close_messages.try_next().await {
571 if let Some(response) = handle_message(msg, &mut remote_state).await {
572 if let Err(err) = ws_ch_tx.send(response) {
573 tracing::error!("WS Send Error: {err:?}");
574 };
575 };
576 }
577 remote_state.clear().await;
578 });
579
580 pin_mut!(ch_rx_stream, incoming_ws);
581 future::select(ch_rx_stream, incoming_ws).await;
582
583 state_map2.write().await.remove(&id.to_string());
585
586 tracing::info!("Client Disconnected {}", sock_adress.as_ref());
587 };
588
589 spawn_future(new_websocket);
590 }
591}
592
593async fn handle_message(
594 msg: Message,
595 state: &mut RemoteState,
596) -> Option<(OutRemoteMessage, Option<SequenceId>)> {
597 match msg {
598 Message::Binary(val) => match InRemoteMessage::from_wire(val) {
599 Ok((header, msg)) => {
600 match state.handle_message(msg).await {
601 Ok(Some(msg)) => Some((msg, header.sequence_id)),
602 Ok(None) => header.sequence_id.map(|_| {
603 (
604 OutRemoteMessage::Ok(interface::Ok {
605 content_id: header.content_id,
606 }),
607 header.sequence_id,
608 )
609 }),
610 Err(error) => {
611 tracing::error!(
612 "RemoteAPI: Failed to execute request {:?}: {}",
613 header.content_id,
614 error
615 );
616 header.sequence_id.map(|_| {
617 (
619 OutRemoteMessage::Error(interface::Error {
620 error: error.to_string(),
621 }),
622 header.sequence_id,
623 )
624 })
625 }
626 }
627 }
628 Err(err) => match err {
629 interface::FromWireError::HeaderError(error) => {
630 tracing::error!("RemoteAPI: Failed to parse message header: {}", error);
631 None
632 }
633 interface::FromWireError::BodyError((header, error)) => {
634 tracing::error!(
635 "RemoteAPI: Failed to parse message body for {:?}: {}",
636 header,
637 error
638 );
639 header.sequence_id.map(|_| {
640 (
642 OutRemoteMessage::Error(interface::Error {
643 error: error.to_string(),
644 }),
645 header.sequence_id,
646 )
647 })
648 }
649 },
650 },
651 _ => {
652 tracing::error!("RemoteAPI: message format is not `Binary`");
653 None
654 }
655 }
656}