1use std::{
22 collections::HashMap,
23 fs::File,
24 future::Future,
25 io::{self, BufReader},
26 net::SocketAddr,
27 path::Path,
28 sync::{Arc, Mutex},
29};
30
31use futures::{future, pin_mut, StreamExt, TryStreamExt};
32use interface::{InRemoteMessage, OutRemoteMessage, SequenceId};
33use remote_state::RemoteState;
34use rustls_pemfile::{certs, private_key};
35use serde::Serialize;
36use tokio::{
37 net::{TcpListener, TcpStream},
38 select,
39 sync::RwLock,
40 task::JoinHandle,
41};
42use tokio_rustls::{
43 rustls::{
44 self,
45 pki_types::{CertificateDer, PrivateKeyDer},
46 },
47 server::TlsStream,
48 TlsAcceptor,
49};
50use tokio_tungstenite::tungstenite::protocol::Message;
51use uuid::Uuid;
52use zenoh::{
53 bytes::{Encoding, ZBytes},
54 internal::{
55 plugins::{RunningPluginTrait, ZenohPlugin},
56 runtime::DynamicRuntime,
57 },
58 key_expr::{
59 format::{kedefine, keformat},
60 keyexpr, OwnedKeyExpr,
61 },
62 query::Query,
63};
64use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};
65use zenoh_result::{bail, zerror, ZResult};
66use zenoh_util::ffi::JsonKeyValueMap;
67
68mod config;
69pub use config::Config;
70
71use crate::interface::{LivelinessTokenId, PublisherId, QuerierId, QueryableId, SubscriberId};
72
73mod interface;
74
75mod remote_state;
76
// Key-expression formats used by the plugin's admin space.
kedefine!(
    // Format of a key expression ending in `__version__` below an arbitrary
    // `plugin_status_key` prefix.
    // NOTE(review): not referenced in the visible part of this file — confirm usage.
    pub ke_admin_version: "${plugin_status_key:**}/__version__",

    // Format of this plugin's admin-space prefix; `zenoh_id` is filled with the
    // session's Zenoh id when the admin queryable is declared.
    pub ke_admin_prefix: "@/${zenoh_id:*}/remote-plugin/",
);
84
/// Number of worker threads of the plugin's own fallback Tokio runtime.
const WORKER_THREAD_NUM: usize = 2;
/// Maximum number of blocking threads of the plugin's own fallback Tokio runtime.
const MAX_BLOCK_THREAD_NUM: usize = 50;
87
lazy_static::lazy_static! {
    // Multi-threaded Tokio runtime owned by the plugin. `spawn_runtime` and
    // `spawn_future` fall back to it when the caller is not inside a Tokio runtime.
    static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(WORKER_THREAD_NUM)
        .max_blocking_threads(MAX_BLOCK_THREAD_NUM)
        .enable_all()
        .build()
        .expect("Unable to create runtime");
    // The `**` key expression; appended to the admin prefix when declaring the
    // admin-space queryable.
    // SAFETY: "**" is a string literal that is itself a valid key expression, so
    // skipping validation is sound.
    static ref KE_ANY_N_SEGMENT: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("**") };
}
98
/// Kinds of static admin-space entries that can be served in reply to an admin query.
#[derive(Debug)]
enum AdminRef {
    /// The plugin configuration, replied as JSON.
    Config,
    /// The plugin's long version string, replied as JSON.
    Version,
}
105
106#[inline(always)]
107pub(crate) fn spawn_runtime<F>(task: F) -> JoinHandle<F::Output>
108where
109 F: Future + Send + 'static,
110 F::Output: Send + 'static,
111{
112 match tokio::runtime::Handle::try_current() {
114 Ok(rt) => {
115 rt.spawn(task)
117 }
118 Err(_) => {
119 TOKIO_RUNTIME.spawn(task)
121 }
122 }
123}
124
125pub fn spawn_future(fut: impl Future<Output = ()> + 'static + std::marker::Send) -> JoinHandle<()> {
126 match tokio::runtime::Handle::try_current() {
127 Ok(rt) => rt.spawn(fut),
128 Err(_) => TOKIO_RUNTIME.spawn(fut),
129 }
130}
131
132fn load_certs(path: &Path) -> io::Result<Vec<CertificateDer<'static>>> {
133 certs(&mut BufReader::new(File::open(path)?)).collect()
134}
135
136fn load_key(path: &Path) -> io::Result<PrivateKeyDer<'static>> {
137 private_key(&mut BufReader::new(File::open(path)?))
138 .unwrap()
139 .ok_or_else(|| io::Error::other("No private key found"))
140}
141
/// The Remote API plugin type; it implements the Zenoh `Plugin` trait below.
pub struct RemoteApiPlugin;

// Only when built with the `dynamic_plugin` feature: declare the plugin through
// `zenoh_plugin_trait::declare_plugin!`.
#[cfg(feature = "dynamic_plugin")]
zenoh_plugin_trait::declare_plugin!(RemoteApiPlugin);
impl ZenohPlugin for RemoteApiPlugin {}
147impl Plugin for RemoteApiPlugin {
148 type StartArgs = DynamicRuntime;
149 type Instance = zenoh::internal::plugins::RunningPlugin;
150 const DEFAULT_NAME: &'static str = "remote_api";
151 const PLUGIN_VERSION: &'static str = plugin_version!();
152 const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!();
153
154 fn start(
155 name: &str,
156 runtime: &Self::StartArgs,
157 ) -> ZResult<zenoh::internal::plugins::RunningPlugin> {
158 zenoh_util::try_init_log_from_env();
162 tracing::info!("Starting {name}");
163
164 let plugin_conf = runtime
165 .get_config()
166 .get_plugin_config(name)
167 .map_err(|_| zerror!("Plugin `{}`: missing config", name))?;
168
169 let conf: Config = serde_json::from_value(plugin_conf)
170 .map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
171
172 let wss_config: Option<(Vec<CertificateDer<'_>>, PrivateKeyDer<'_>)> =
173 match conf.secure_websocket.clone() {
174 Some(wss_config) => {
175 tracing::info!("Loading certs from : {} ...", wss_config.certificate_path);
176 let certs = load_certs(Path::new(&wss_config.certificate_path))
177 .map_err(|err| zerror!("Could not Load WSS Cert `{}`", err))?;
178 tracing::info!(
179 "Loading Private Key from : {} ...",
180 wss_config.private_key_path
181 );
182 let key = load_key(Path::new(&wss_config.private_key_path))
183 .map_err(|err| zerror!("Could not Load WSS Private Key `{}`", err))?;
184 Some((certs, key))
185 }
186 None => None,
187 };
188
189 spawn_runtime(run(runtime.clone(), conf, wss_config));
190 Ok(Box::new(RunningPlugin(RemoteAPIPlugin)))
191 }
192}
193
194pub async fn run(
195 runtime: DynamicRuntime,
196 config: Config,
197 opt_certs: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
198) {
199 let state_map = Arc::new(RwLock::new(HashMap::new()));
200
201 let remote_api_runtime = RemoteAPIRuntime {
203 config: Arc::new(config),
204 wss_certs: opt_certs,
205 zenoh_runtime: runtime,
206 state_map,
207 };
208
209 remote_api_runtime.run().await;
210}
211
/// Everything needed to run the plugin: configuration, optional TLS material,
/// the Zenoh runtime and the shared per-client state.
struct RemoteAPIRuntime {
    /// Plugin configuration.
    config: Arc<Config>,
    /// Certificate chain and private key for the secure WebSocket server, if configured.
    wss_certs: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
    /// Zenoh runtime from which sessions are initialised.
    zenoh_runtime: DynamicRuntime,
    /// Connected clients, shared between the WebSocket server and the admin space.
    state_map: StateMap,
}
218
219impl RemoteAPIRuntime {
220 async fn run(self) {
221 let run_websocket_server = run_websocket_server(
222 &self.config.websocket_port,
223 self.zenoh_runtime.clone(),
224 self.state_map.clone(),
225 self.wss_certs,
226 );
227
228 let config = (*self.config).clone();
229
230 let run_admin_space_queryable =
231 run_admin_space_queryable(self.zenoh_runtime.clone(), self.state_map.clone(), config);
232
233 select!(
234 _ = run_websocket_server => {},
235 _ = run_admin_space_queryable => {},
236 );
237 }
238}
239
/// Admin-space record of one connected client; it is serialized to JSON when
/// replying to admin queries about clients.
#[derive(Debug, Serialize, Clone)]
pub(crate) struct AdminSpaceClient {
    /// Identifier of the client (a UUID rendered as a string by the WebSocket server).
    uuid: String,
    /// Socket address the client connected from.
    remote_address: SocketAddr,
    /// Key expression of each registered publisher, by publisher id.
    publishers: HashMap<PublisherId, String>,
    /// Key expression of each registered subscriber, by subscriber id.
    subscribers: HashMap<SubscriberId, String>,
    /// Key expression of each registered queryable, by queryable id.
    queryables: HashMap<QueryableId, String>,
    /// Key expression of each registered querier, by querier id.
    queriers: HashMap<QuerierId, String>,
    /// Liveliness tokens by id.
    /// NOTE(review): no register/unregister method for this map is visible in this
    /// file — presumably the values are key expressions too; confirm.
    liveliness_tokens: HashMap<LivelinessTokenId, String>,
}
250
251impl AdminSpaceClient {
252 pub(crate) fn new(uuid: String, remote_address: SocketAddr) -> Self {
253 AdminSpaceClient {
254 uuid,
255 remote_address,
256 publishers: HashMap::new(),
257 subscribers: HashMap::new(),
258 queryables: HashMap::new(),
259 queriers: HashMap::new(),
260 liveliness_tokens: HashMap::new(),
261 }
262 }
263
264 pub(crate) fn register_publisher(&mut self, id: PublisherId, key_expr: &str) {
265 self.publishers.insert(id, key_expr.to_string());
266 }
267
268 pub(crate) fn register_subscriber(&mut self, id: SubscriberId, key_expr: &str) {
269 self.subscribers.insert(id, key_expr.to_string());
270 }
271
272 pub(crate) fn register_queryable(&mut self, id: QueryableId, key_expr: &str) {
273 self.queryables.insert(id, key_expr.to_string());
274 }
275
276 pub(crate) fn register_querier(&mut self, id: QuerierId, key_expr: &str) {
277 self.queriers.insert(id, key_expr.to_string());
278 }
279
280 pub(crate) fn unregister_publisher(&mut self, id: PublisherId) {
281 self.publishers.remove(&id);
282 }
283
284 pub(crate) fn unregister_subscriber(&mut self, id: SubscriberId) {
285 self.subscribers.remove(&id);
286 }
287
288 pub(crate) fn unregister_queryable(&mut self, id: QueryableId) {
289 self.queryables.remove(&id);
290 }
291
292 pub(crate) fn unregister_querier(&mut self, id: QuerierId) {
293 self.queriers.remove(&id);
294 }
295
296 pub(crate) fn id(&self) -> &str {
297 &self.uuid
298 }
299}
300
301async fn run_admin_space_queryable(
302 zenoh_runtime: DynamicRuntime,
303 state_map: StateMap,
304 config: Config,
305) {
306 let session = match zenoh::session::init(zenoh_runtime).await {
307 Ok(session) => session,
308 Err(err) => {
309 tracing::error!("Unable to get Zenoh session from Runtime {err}");
310 return;
311 }
312 };
313
314 let admin_prefix = keformat!(
315 ke_admin_prefix::formatter(),
316 zenoh_id = session.zid().into_keyexpr()
317 )
318 .unwrap();
319
320 let mut admin_space: HashMap<OwnedKeyExpr, AdminRef> = HashMap::new();
321
322 admin_space.insert(
323 &admin_prefix / unsafe { keyexpr::from_str_unchecked("config") },
324 AdminRef::Config,
325 );
326 admin_space.insert(
327 &admin_prefix / unsafe { keyexpr::from_str_unchecked("version") },
328 AdminRef::Version,
329 );
330
331 let admin_keyexpr_expr = (&admin_prefix) / *KE_ANY_N_SEGMENT;
332
333 let admin_queryable = session
334 .declare_queryable(admin_keyexpr_expr)
335 .await
336 .expect("Failed fo create AdminSpace Queryable");
337
338 loop {
339 match admin_queryable.recv_async().await {
340 Ok(query) => {
341 let query_ke: OwnedKeyExpr = query.key_expr().to_owned().into();
342
343 if query_ke.is_wild() {
344 if query_ke.contains("clients") {
345 let read_guard = state_map.read().await;
346 let admin_space_clients = read_guard
347 .values()
348 .map(|v| v.lock().unwrap().clone())
349 .collect::<Vec<_>>();
350 drop(read_guard);
351 send_reply(admin_space_clients, query, query_ke).await;
352 } else {
353 for (ke, admin_ref) in admin_space.iter() {
354 if query_ke.intersects(ke) {
355 send_admin_reply(&query, ke, admin_ref, &config).await;
356 }
357 }
358 }
359 } else {
360 let own_ke: OwnedKeyExpr = query_ke.to_owned();
361 if own_ke.contains("config") {
362 send_admin_reply(&query, &own_ke, &AdminRef::Config, &config).await;
363 }
364 if own_ke.contains("client") {
365 let mut opt_id = None;
366 let split = own_ke.split('/');
367 let mut next_is_id = false;
368 for elem in split {
369 if next_is_id {
370 opt_id = Some(elem);
371 } else if elem.contains("client") {
372 next_is_id = true;
373 }
374 }
375 if let Some(id) = opt_id {
376 let read_guard = state_map.read().await;
377 if let Some(state) = read_guard.get(id) {
378 let state = state.lock().unwrap().clone();
379 drop(read_guard);
380 send_reply(state, query, own_ke).await;
381 }
382 }
383 }
384 }
385 }
386 Err(_) => {
387 tracing::warn!("Admin Space queryable was closed!");
388 }
389 }
390 }
391}
392
393async fn send_reply<T>(reply: T, query: Query, query_ke: OwnedKeyExpr)
394where
395 T: Sized + Serialize,
396{
397 match serde_json::to_string_pretty(&reply) {
398 Ok(json_string) => {
399 if let Err(err) = query
400 .reply(query_ke, json_string)
401 .encoding(Encoding::APPLICATION_JSON)
402 .await
403 {
404 tracing::error!("AdminSpace: Reply to Query failed, {}", err);
405 };
406 }
407 Err(_) => {
408 tracing::error!("AdminSpace: Could not seralize client data");
409 }
410 };
411}
412
413async fn send_admin_reply(
414 query: &Query,
415 key_expr: &keyexpr,
416 admin_ref: &AdminRef,
417 config: &Config,
418) {
419 let z_bytes: ZBytes = match admin_ref {
420 AdminRef::Version => match serde_json::to_value(RemoteApiPlugin::PLUGIN_LONG_VERSION) {
421 Ok(v) => match serde_json::to_vec(&v) {
422 Ok(value) => ZBytes::from(value),
423 Err(e) => {
424 tracing::warn!("Error transforming JSON to ZBytes: {}", e);
425 return;
426 }
427 },
428 Err(e) => {
429 tracing::error!("INTERNAL ERROR serializing config as JSON: {}", e);
430 return;
431 }
432 },
433 AdminRef::Config => match serde_json::to_value(config) {
434 Ok(v) => match serde_json::to_vec(&v) {
435 Ok(value) => ZBytes::from(value),
436 Err(e) => {
437 tracing::warn!("Error transforming JSON to ZBytes: {}", e);
438 return;
439 }
440 },
441 Err(e) => {
442 tracing::error!("INTERNAL ERROR serializing config as JSON: {}", e);
443 return;
444 }
445 },
446 };
447 if let Err(e) = query
448 .reply(key_expr.to_owned(), z_bytes)
449 .encoding(zenoh::bytes::Encoding::APPLICATION_JSON)
450 .await
451 {
452 tracing::warn!("Error replying to admin query {:?}: {}", query, e);
453 }
454}
455
/// Stateless marker wrapped by `RunningPlugin`.
struct RemoteAPIPlugin;

/// Value boxed and returned from `Plugin::start` as the running plugin instance.
#[allow(dead_code)]
struct RunningPlugin(RemoteAPIPlugin);

// No methods are overridden here.
impl PluginControl for RunningPlugin {}
462
impl RunningPluginTrait for RunningPlugin {
    /// Rejects every configuration change: this implementation always returns an
    /// error, whatever the path and values are.
    fn config_checker(
        &self,
        _path: &str,
        _current: &JsonKeyValueMap,
        _new: &JsonKeyValueMap,
    ) -> ZResult<Option<JsonKeyValueMap>> {
        bail!("Runtime configuration change not supported");
    }
}
473
/// Shared map from a client's id (UUID string) to that client's admin-space record.
/// The WebSocket server inserts/removes entries; the admin space reads them.
type StateMap = Arc<RwLock<HashMap<String, Arc<Mutex<AdminSpaceClient>>>>>;
475
/// Asynchronous byte stream the WebSocket handshake can run on. It lets the server
/// box plain TCP and TLS-over-TCP connections behind a single type.
pub trait Streamable:
    tokio::io::AsyncRead + tokio::io::AsyncWrite + std::marker::Send + Unpin
{
}
// Plain (non-TLS) connections.
impl Streamable for TcpStream {}
// TLS-secured connections.
impl Streamable for TlsStream<TcpStream> {}
482
483async fn run_websocket_server(
485 ws_port: &String,
486 zenoh_runtime: DynamicRuntime,
487 state_map: StateMap,
488 opt_certs: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
489) {
490 let mut opt_tls_acceptor: Option<TlsAcceptor> = None;
491
492 if let Some((certs, key)) = opt_certs {
493 let config = rustls::ServerConfig::builder()
494 .with_no_client_auth()
495 .with_single_cert(certs, key)
496 .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))
497 .expect("Could not build TLS Configuration from Certficiate/Key Combo :");
498 opt_tls_acceptor = Some(TlsAcceptor::from(Arc::new(config)));
499 }
500
501 let server: TcpListener = match TcpListener::bind(ws_port).await {
502 Ok(server) => server,
503 Err(err) => {
504 tracing::error!("Unable to start TcpListener {err}");
505 return;
506 }
507 };
508
509 while let Ok((tcp_stream, sock_addr)) = server.accept().await {
510 let zenoh_runtime = zenoh_runtime.clone();
511 let opt_tls_acceptor = opt_tls_acceptor.clone();
512 let state_map2 = state_map.clone();
513 let new_websocket = async move {
514 let sock_adress = Arc::new(sock_addr);
515 let (ws_ch_tx, ws_ch_rx) = flume::unbounded::<(OutRemoteMessage, Option<SequenceId>)>();
516
517 let session = match zenoh::session::init(zenoh_runtime.clone()).await {
518 Ok(session) => session,
519 Err(err) => {
520 tracing::error!("Unable to get Zenoh session from Runtime {err}");
521 return;
522 }
523 };
524 let id = Uuid::new_v4();
525 tracing::debug!("Client {sock_addr:?} -> {id}");
526
527 let streamable: Box<dyn Streamable> = match &opt_tls_acceptor {
528 Some(acceptor) => match acceptor.accept(tcp_stream).await {
529 Ok(tls_stream) => Box::new(tls_stream),
530 Err(err) => {
531 tracing::error!("Could not secure TcpStream -> TlsStream {:?}", err);
532 return;
533 }
534 },
535 None => Box::new(tcp_stream),
536 };
537
538 let ws_stream = match tokio_tungstenite::accept_async(streamable).await {
539 Ok(ws_stream) => ws_stream,
540 Err(e) => {
541 tracing::error!("Error during the websocket handshake occurred: {}", e);
542 return;
543 }
544 };
545
546 let (ws_tx, ws_rx) = ws_stream.split();
547
548 let ch_rx_stream = ws_ch_rx
549 .into_stream()
550 .map(|(out_msg, sequence_id)| {
551 tracing::trace!("<< Send: {:?} (seq={:?})", out_msg.id(), sequence_id);
552 Ok(Message::Binary(out_msg.to_wire(sequence_id)))
553 })
554 .forward(ws_tx);
555
556 let admin_client =
558 Arc::new(Mutex::new(AdminSpaceClient::new(id.to_string(), sock_addr)));
559 state_map2
560 .write()
561 .await
562 .insert(id.to_string(), admin_client.clone());
563
564 let mut remote_state = RemoteState::new(ws_ch_tx.clone(), admin_client, session);
565
566 let incoming_ws = tokio::task::spawn(async move {
568 let mut non_close_messages = ws_rx.try_filter(|msg| future::ready(!msg.is_close()));
569
570 while let Ok(Some(msg)) = non_close_messages.try_next().await {
571 if let Some(response) = handle_message(msg, &mut remote_state).await {
572 if let Err(err) = ws_ch_tx.send(response) {
573 tracing::error!("WS Send Error: {err:?}");
574 };
575 };
576 }
577 remote_state.clear().await;
578 });
579
580 pin_mut!(ch_rx_stream, incoming_ws);
581 future::select(ch_rx_stream, incoming_ws).await;
582
583 state_map2.write().await.remove(&id.to_string());
585
586 tracing::info!("Client Disconnected {}", sock_adress.as_ref());
587 };
588
589 spawn_future(new_websocket);
590 }
591}
592
593async fn handle_message(
594 msg: Message,
595 state: &mut RemoteState,
596) -> Option<(OutRemoteMessage, Option<SequenceId>)> {
597 match msg {
598 Message::Binary(val) => match InRemoteMessage::from_wire(val) {
599 Ok((header, msg)) => {
600 tracing::trace!(
601 ">> Recv: {:?} (seq={:?})",
602 header.content_id,
603 header.sequence_id
604 );
605 match state.handle_message(msg).await {
606 Ok(Some(msg)) => Some((msg, header.sequence_id)),
607 Ok(None) => header.sequence_id.map(|_| {
608 (
609 OutRemoteMessage::Ok(interface::Ok {
610 content_id: header.content_id,
611 }),
612 header.sequence_id,
613 )
614 }),
615 Err(error) => {
616 tracing::error!(
617 "RemoteAPI: Failed to execute request {:?}: {}",
618 header.content_id,
619 error
620 );
621 header.sequence_id.map(|_| {
622 (
624 OutRemoteMessage::Error(interface::Error {
625 error: error.to_string(),
626 }),
627 header.sequence_id,
628 )
629 })
630 }
631 }
632 }
633 Err(err) => match err {
634 interface::FromWireError::HeaderError(error) => {
635 tracing::error!("RemoteAPI: Failed to parse message header: {}", error);
636 None
637 }
638 interface::FromWireError::BodyError((header, error)) => {
639 tracing::error!(
640 "RemoteAPI: Failed to parse message body for {:?}: {}",
641 header,
642 error
643 );
644 header.sequence_id.map(|_| {
645 (
647 OutRemoteMessage::Error(interface::Error {
648 error: error.to_string(),
649 }),
650 header.sequence_id,
651 )
652 })
653 }
654 },
655 },
656 _ => {
657 tracing::error!("RemoteAPI: message format is not `Binary`");
658 None
659 }
660 }
661}