1use std::{
22 collections::HashMap,
23 fs::File,
24 future::Future,
25 io::{self, BufReader, ErrorKind},
26 net::SocketAddr,
27 path::Path,
28 sync::{Arc, Mutex},
29};
30
31use futures::{future, pin_mut, StreamExt, TryStreamExt};
32use interface::{InRemoteMessage, OutRemoteMessage, SequenceId};
33use remote_state::RemoteState;
34use rustls_pemfile::{certs, private_key};
35use serde::Serialize;
36use tokio::{
37 net::{TcpListener, TcpStream},
38 select,
39 sync::RwLock,
40 task::JoinHandle,
41};
42use tokio_rustls::{
43 rustls::{
44 self,
45 pki_types::{CertificateDer, PrivateKeyDer},
46 },
47 server::TlsStream,
48 TlsAcceptor,
49};
50use tokio_tungstenite::tungstenite::protocol::Message;
51use uuid::Uuid;
52use zenoh::{
53 bytes::{Encoding, ZBytes},
54 internal::{
55 plugins::{RunningPluginTrait, ZenohPlugin},
56 runtime::Runtime,
57 },
58 key_expr::{
59 format::{kedefine, keformat},
60 keyexpr, OwnedKeyExpr,
61 },
62 query::Query,
63};
64use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};
65use zenoh_result::{bail, zerror, ZResult};
66
67mod config;
68pub use config::Config;
69
70mod interface;
71
72mod remote_state;
73
// Key-expression formats used by this plugin.
kedefine!(
    // Format ending in the literal `__version__` chunk; everything before it
    // is captured as `plugin_status_key`.
    pub ke_admin_version: "${plugin_status_key:**}/__version__",

    // Prefix of this plugin's admin space, parameterised by a single
    // `zenoh_id` chunk.
    pub ke_admin_prefix: "@/${zenoh_id:*}/remote-plugin/",
);
81
/// Number of worker threads given to the fallback Tokio runtime.
const WORKER_THREAD_NUM: usize = 2;
/// Upper bound on blocking threads for the fallback Tokio runtime.
const MAX_BLOCK_THREAD_NUM: usize = 50;
/// Version string produced at build time by `git_version!`, using the `v` prefix.
const GIT_VERSION: &str = git_version::git_version!(prefix = "v", cargo_prefix = "v");
85
lazy_static::lazy_static! {
    // Human-readable version: the git version plus the `RUSTC_VERSION` build-time env var.
    static ref LONG_VERSION: String = format!("{} built with {}", GIT_VERSION, env!("RUSTC_VERSION"));
    // Fallback multi-threaded Tokio runtime, used by `spawn_runtime`/`spawn_future`
    // when the caller is not already running inside a Tokio runtime.
    static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(WORKER_THREAD_NUM)
        .max_blocking_threads(MAX_BLOCK_THREAD_NUM)
        .enable_all()
        .build()
        .expect("Unable to create runtime");
    // The `**` key expression, appended to the admin prefix when declaring the admin queryable.
    // SAFETY: NOTE(review): relies on the literal "**" being a valid canonical key expression —
    // confirm against the zenoh key-expression rules.
    static ref KE_ANY_N_SEGMENT: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("**") };
}
97
/// Selects which admin-space resource `send_admin_reply` serialises into a reply.
#[derive(Debug)]
enum AdminRef {
    /// The plugin configuration.
    Config,
    /// The plugin's long version string.
    Version,
}
104
105#[inline(always)]
106pub(crate) fn spawn_runtime<F>(task: F) -> JoinHandle<F::Output>
107where
108 F: Future + Send + 'static,
109 F::Output: Send + 'static,
110{
111 match tokio::runtime::Handle::try_current() {
113 Ok(rt) => {
114 rt.spawn(task)
116 }
117 Err(_) => {
118 TOKIO_RUNTIME.spawn(task)
120 }
121 }
122}
123
124pub fn spawn_future(fut: impl Future<Output = ()> + 'static + std::marker::Send) -> JoinHandle<()> {
125 match tokio::runtime::Handle::try_current() {
126 Ok(rt) => rt.spawn(fut),
127 Err(_) => TOKIO_RUNTIME.spawn(fut),
128 }
129}
130
131fn load_certs(path: &Path) -> io::Result<Vec<CertificateDer<'static>>> {
132 certs(&mut BufReader::new(File::open(path)?)).collect()
133}
134
135fn load_key(path: &Path) -> io::Result<PrivateKeyDer<'static>> {
136 private_key(&mut BufReader::new(File::open(path)?))
137 .unwrap()
138 .ok_or(io::Error::new(
139 ErrorKind::Other,
140 "No private key found".to_string(),
141 ))
142}
143
/// Marker type that implements the Zenoh plugin traits for this plugin.
pub struct RemoteApiPlugin;

// Only emitted when the crate is built with the `dynamic_plugin` feature.
#[cfg(feature = "dynamic_plugin")]
zenoh_plugin_trait::declare_plugin!(RemoteApiPlugin);
impl ZenohPlugin for RemoteApiPlugin {}
149impl Plugin for RemoteApiPlugin {
150 type StartArgs = Runtime;
151 type Instance = zenoh::internal::plugins::RunningPlugin;
152 const DEFAULT_NAME: &'static str = "remote_api";
153 const PLUGIN_VERSION: &'static str = plugin_version!();
154 const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!();
155
156 fn start(
157 name: &str,
158 runtime: &Self::StartArgs,
159 ) -> ZResult<zenoh::internal::plugins::RunningPlugin> {
160 zenoh_util::try_init_log_from_env();
164 tracing::info!("Starting {name}");
165
166 let runtime_conf = runtime.config().lock();
167
168 let plugin_conf = runtime_conf
169 .plugin(name)
170 .ok_or_else(|| zerror!("Plugin `{}`: missing config", name))?;
171
172 let conf: Config = serde_json::from_value(plugin_conf.clone())
173 .map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
174
175 let wss_config: Option<(Vec<CertificateDer<'_>>, PrivateKeyDer<'_>)> =
176 match conf.secure_websocket.clone() {
177 Some(wss_config) => {
178 tracing::info!("Loading certs from : {} ...", wss_config.certificate_path);
179 let certs = load_certs(Path::new(&wss_config.certificate_path))
180 .map_err(|err| zerror!("Could not Load WSS Cert `{}`", err))?;
181 tracing::info!(
182 "Loading Private Key from : {} ...",
183 wss_config.private_key_path
184 );
185 let key = load_key(Path::new(&wss_config.private_key_path))
186 .map_err(|err| zerror!("Could not Load WSS Private Key `{}`", err))?;
187 Some((certs, key))
188 }
189 None => None,
190 };
191
192 let weak_runtime = Runtime::downgrade(runtime);
193 if let Some(runtime) = weak_runtime.upgrade() {
194 spawn_runtime(run(runtime, conf, wss_config));
195
196 Ok(Box::new(RunningPlugin(RemoteAPIPlugin)))
197 } else {
198 bail!("Cannot Get Zenoh Instance of Runtime !")
199 }
200 }
201}
202
203pub async fn run(
204 runtime: Runtime,
205 config: Config,
206 opt_certs: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
207) {
208 let state_map = Arc::new(RwLock::new(HashMap::new()));
209
210 let remote_api_runtime = RemoteAPIRuntime {
212 config: Arc::new(config),
213 wss_certs: opt_certs,
214 zenoh_runtime: runtime,
215 state_map,
216 };
217
218 remote_api_runtime.run().await;
219}
220
/// Everything the plugin's main task needs to run.
struct RemoteAPIRuntime {
    /// Plugin configuration.
    config: Arc<Config>,
    /// Certificate chain and private key for TLS, when secure websockets are configured.
    wss_certs: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
    /// Zenoh runtime from which sessions are initialised.
    zenoh_runtime: Runtime,
    /// Shared map of connected clients, keyed by client id.
    state_map: StateMap,
}
227
228impl RemoteAPIRuntime {
229 async fn run(self) {
230 let run_websocket_server = run_websocket_server(
231 &self.config.websocket_port,
232 self.zenoh_runtime.clone(),
233 self.state_map.clone(),
234 self.wss_certs,
235 );
236
237 let config = (*self.config).clone();
238
239 let run_admin_space_queryable =
240 run_admin_space_queryable(self.zenoh_runtime.clone(), self.state_map.clone(), config);
241
242 select!(
243 _ = run_websocket_server => {},
244 _ = run_admin_space_queryable => {},
245 );
246 }
247}
248
/// Serialisable snapshot of one connected client, as exposed through the admin space.
#[derive(Debug, Serialize, Clone)]
pub(crate) struct AdminSpaceClient {
    /// Client identifier.
    uuid: String,
    /// Socket address of the remote peer.
    remote_address: SocketAddr,
    /// Registered publishers: id -> key expression.
    publishers: HashMap<u32, String>,
    /// Registered subscribers: id -> key expression.
    subscribers: HashMap<u32, String>,
    /// Registered queryables: id -> key expression.
    queryables: HashMap<u32, String>,
    /// Registered queriers: id -> key expression.
    queriers: HashMap<u32, String>,
    /// Liveliness tokens; only initialised empty in the code visible here.
    liveliness_tokens: HashMap<u32, String>,
}
259
260impl AdminSpaceClient {
261 pub(crate) fn new(uuid: String, remote_address: SocketAddr) -> Self {
262 AdminSpaceClient {
263 uuid,
264 remote_address,
265 publishers: HashMap::new(),
266 subscribers: HashMap::new(),
267 queryables: HashMap::new(),
268 queriers: HashMap::new(),
269 liveliness_tokens: HashMap::new(),
270 }
271 }
272
273 pub(crate) fn register_publisher(&mut self, id: u32, key_expr: &str) {
274 self.publishers.insert(id, key_expr.to_string());
275 }
276
277 pub(crate) fn register_subscriber(&mut self, id: u32, key_expr: &str) {
278 self.subscribers.insert(id, key_expr.to_string());
279 }
280
281 pub(crate) fn register_queryable(&mut self, id: u32, key_expr: &str) {
282 self.queryables.insert(id, key_expr.to_string());
283 }
284
285 pub(crate) fn register_querier(&mut self, id: u32, key_expr: &str) {
286 self.queriers.insert(id, key_expr.to_string());
287 }
288
289 pub(crate) fn unregister_publisher(&mut self, id: u32) {
290 self.publishers.remove(&id);
291 }
292
293 pub(crate) fn unregister_subscriber(&mut self, id: u32) {
294 self.subscribers.remove(&id);
295 }
296
297 pub(crate) fn unregister_queryable(&mut self, id: u32) {
298 self.queryables.remove(&id);
299 }
300
301 pub(crate) fn unregister_querier(&mut self, id: u32) {
302 self.queriers.remove(&id);
303 }
304
305 pub(crate) fn id(&self) -> &str {
306 &self.uuid
307 }
308}
309
310async fn run_admin_space_queryable(zenoh_runtime: Runtime, state_map: StateMap, config: Config) {
311 let session = match zenoh::session::init(zenoh_runtime.clone()).await {
312 Ok(session) => session,
313 Err(err) => {
314 tracing::error!("Unable to get Zenoh session from Runtime {err}");
315 return;
316 }
317 };
318
319 let admin_prefix = keformat!(
320 ke_admin_prefix::formatter(),
321 zenoh_id = session.zid().into_keyexpr()
322 )
323 .unwrap();
324
325 let mut admin_space: HashMap<OwnedKeyExpr, AdminRef> = HashMap::new();
326
327 admin_space.insert(
328 &admin_prefix / unsafe { keyexpr::from_str_unchecked("config") },
329 AdminRef::Config,
330 );
331 admin_space.insert(
332 &admin_prefix / unsafe { keyexpr::from_str_unchecked("version") },
333 AdminRef::Version,
334 );
335
336 let admin_keyexpr_expr = (&admin_prefix) / *KE_ANY_N_SEGMENT;
337
338 let admin_queryable = session
339 .declare_queryable(admin_keyexpr_expr)
340 .await
341 .expect("Failed fo create AdminSpace Queryable");
342
343 loop {
344 match admin_queryable.recv_async().await {
345 Ok(query) => {
346 let query_ke: OwnedKeyExpr = query.key_expr().to_owned().into();
347
348 if query_ke.is_wild() {
349 if query_ke.contains("clients") {
350 let read_guard = state_map.read().await;
351 let admin_space_clients = read_guard
352 .values()
353 .map(|v| v.lock().unwrap().clone())
354 .collect::<Vec<_>>();
355 drop(read_guard);
356 send_reply(admin_space_clients, query, query_ke).await;
357 } else {
358 for (ke, admin_ref) in admin_space.iter() {
359 if query_ke.intersects(ke) {
360 send_admin_reply(&query, ke, admin_ref, &config).await;
361 }
362 }
363 }
364 } else {
365 let own_ke: OwnedKeyExpr = query_ke.to_owned();
366 if own_ke.contains("config") {
367 send_admin_reply(&query, &own_ke, &AdminRef::Config, &config).await;
368 }
369 if own_ke.contains("client") {
370 let mut opt_id = None;
371 let split = own_ke.split('/');
372 let mut next_is_id = false;
373 for elem in split {
374 if next_is_id {
375 opt_id = Some(elem);
376 } else if elem.contains("client") {
377 next_is_id = true;
378 }
379 }
380 if let Some(id) = opt_id {
381 let read_guard = state_map.read().await;
382 if let Some(state) = read_guard.get(id) {
383 let state = state.lock().unwrap().clone();
384 drop(read_guard);
385 send_reply(state, query, own_ke).await;
386 }
387 }
388 }
389 }
390 }
391 Err(_) => {
392 tracing::warn!("Admin Space queryable was closed!");
393 }
394 }
395 }
396}
397
398async fn send_reply<T>(reply: T, query: Query, query_ke: OwnedKeyExpr)
399where
400 T: Sized + Serialize,
401{
402 match serde_json::to_string_pretty(&reply) {
403 Ok(json_string) => {
404 if let Err(err) = query
405 .reply(query_ke, json_string)
406 .encoding(Encoding::APPLICATION_JSON)
407 .await
408 {
409 tracing::error!("AdminSpace: Reply to Query failed, {}", err);
410 };
411 }
412 Err(_) => {
413 tracing::error!("AdminSpace: Could not seralize client data");
414 }
415 };
416}
417
418async fn send_admin_reply(
419 query: &Query,
420 key_expr: &keyexpr,
421 admin_ref: &AdminRef,
422 config: &Config,
423) {
424 let z_bytes: ZBytes = match admin_ref {
425 AdminRef::Version => match serde_json::to_value(RemoteApiPlugin::PLUGIN_LONG_VERSION) {
426 Ok(v) => match serde_json::to_vec(&v) {
427 Ok(value) => ZBytes::from(value),
428 Err(e) => {
429 tracing::warn!("Error transforming JSON to ZBytes: {}", e);
430 return;
431 }
432 },
433 Err(e) => {
434 tracing::error!("INTERNAL ERROR serializing config as JSON: {}", e);
435 return;
436 }
437 },
438 AdminRef::Config => match serde_json::to_value(config) {
439 Ok(v) => match serde_json::to_vec(&v) {
440 Ok(value) => ZBytes::from(value),
441 Err(e) => {
442 tracing::warn!("Error transforming JSON to ZBytes: {}", e);
443 return;
444 }
445 },
446 Err(e) => {
447 tracing::error!("INTERNAL ERROR serializing config as JSON: {}", e);
448 return;
449 }
450 },
451 };
452 if let Err(e) = query
453 .reply(key_expr.to_owned(), z_bytes)
454 .encoding(zenoh::bytes::Encoding::APPLICATION_JSON)
455 .await
456 {
457 tracing::warn!("Error replying to admin query {:?}: {}", query, e);
458 }
459}
460
/// Private marker type wrapped by `RunningPlugin`; carries no data.
struct RemoteAPIPlugin;

/// Handle returned from `Plugin::start`; its wrapped field is never read,
/// hence the `dead_code` allowance.
#[allow(dead_code)]
struct RunningPlugin(RemoteAPIPlugin);

// No method is overridden here; `PluginControl` is implemented with an empty body.
impl PluginControl for RunningPlugin {}
467
impl RunningPluginTrait for RunningPlugin {
    /// Rejects every configuration change request: this always returns an
    /// error, whatever `_path`, `_current` and `_new` contain.
    fn config_checker(
        &self,
        _path: &str,
        _current: &serde_json::Map<String, serde_json::Value>,
        _new: &serde_json::Map<String, serde_json::Value>,
    ) -> ZResult<Option<serde_json::Map<String, serde_json::Value>>> {
        bail!("Runtime configuration change not supported");
    }
}
478
/// Shared registry of connected clients, keyed by client id (a UUID string).
/// The map sits behind an async `RwLock`; each entry sits behind a std `Mutex`.
type StateMap = Arc<RwLock<HashMap<String, Arc<Mutex<AdminSpaceClient>>>>>;
480
/// Marker trait for byte streams usable as a websocket transport, so that
/// plain TCP and TLS streams can be handled through one boxed trait object.
pub trait Streamable:
    tokio::io::AsyncRead + tokio::io::AsyncWrite + std::marker::Send + Unpin
{
}
impl Streamable for TcpStream {}
impl Streamable for TlsStream<TcpStream> {}
487
488async fn run_websocket_server(
490 ws_port: &String,
491 zenoh_runtime: Runtime,
492 state_map: StateMap,
493 opt_certs: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
494) {
495 let mut opt_tls_acceptor: Option<TlsAcceptor> = None;
496
497 if let Some((certs, key)) = opt_certs {
498 let config = rustls::ServerConfig::builder()
499 .with_no_client_auth()
500 .with_single_cert(certs, key)
501 .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))
502 .expect("Could not build TLS Configuration from Certficiate/Key Combo :");
503 opt_tls_acceptor = Some(TlsAcceptor::from(Arc::new(config)));
504 }
505
506 let server: TcpListener = match TcpListener::bind(ws_port).await {
507 Ok(server) => server,
508 Err(err) => {
509 tracing::error!("Unable to start TcpListener {err}");
510 return;
511 }
512 };
513
514 while let Ok((tcp_stream, sock_addr)) = server.accept().await {
515 let zenoh_runtime = zenoh_runtime.clone();
516 let opt_tls_acceptor = opt_tls_acceptor.clone();
517 let state_map2 = state_map.clone();
518 let new_websocket = async move {
519 let sock_adress = Arc::new(sock_addr);
520 let (ws_ch_tx, ws_ch_rx) = flume::unbounded::<(OutRemoteMessage, Option<SequenceId>)>();
521
522 let session = match zenoh::session::init(zenoh_runtime.clone()).await {
523 Ok(session) => session,
524 Err(err) => {
525 tracing::error!("Unable to get Zenoh session from Runtime {err}");
526 return;
527 }
528 };
529 let id = Uuid::new_v4();
530 tracing::debug!("Client {sock_addr:?} -> {id}");
531
532 let streamable: Box<dyn Streamable> = match &opt_tls_acceptor {
533 Some(acceptor) => match acceptor.accept(tcp_stream).await {
534 Ok(tls_stream) => Box::new(tls_stream),
535 Err(err) => {
536 tracing::error!("Could not secure TcpStream -> TlsStream {:?}", err);
537 return;
538 }
539 },
540 None => Box::new(tcp_stream),
541 };
542
543 let ws_stream = match tokio_tungstenite::accept_async(streamable).await {
544 Ok(ws_stream) => ws_stream,
545 Err(e) => {
546 tracing::error!("Error during the websocket handshake occurred: {}", e);
547 return;
548 }
549 };
550
551 let (ws_tx, ws_rx) = ws_stream.split();
552
553 let ch_rx_stream = ws_ch_rx
554 .into_stream()
555 .map(|(out_msg, sequence_id)| Ok(Message::Binary(out_msg.to_wire(sequence_id))))
556 .forward(ws_tx);
557
558 let admin_client =
560 Arc::new(Mutex::new(AdminSpaceClient::new(id.to_string(), sock_addr)));
561 state_map2
562 .write()
563 .await
564 .insert(id.to_string(), admin_client.clone());
565
566 let mut remote_state = RemoteState::new(ws_ch_tx.clone(), admin_client, session);
567
568 let incoming_ws = tokio::task::spawn(async move {
570 let mut non_close_messages = ws_rx.try_filter(|msg| future::ready(!msg.is_close()));
571
572 while let Ok(Some(msg)) = non_close_messages.try_next().await {
573 if let Some(response) = handle_message(msg, &mut remote_state).await {
574 if let Err(err) = ws_ch_tx.send(response) {
575 tracing::error!("WS Send Error: {err:?}");
576 };
577 };
578 }
579 remote_state.clear().await;
580 });
581
582 pin_mut!(ch_rx_stream, incoming_ws);
583 future::select(ch_rx_stream, incoming_ws).await;
584
585 state_map2.write().await.remove(&id.to_string());
587
588 tracing::info!("Client Disconnected {}", sock_adress.as_ref());
589 };
590
591 spawn_future(new_websocket);
592 }
593}
594
595async fn handle_message(
596 msg: Message,
597 state: &mut RemoteState,
598) -> Option<(OutRemoteMessage, Option<SequenceId>)> {
599 match msg {
600 Message::Binary(val) => match InRemoteMessage::from_wire(val) {
601 Ok((header, msg)) => {
602 match state.handle_message(msg).await {
603 Ok(Some(msg)) => Some((msg, header.sequence_id)),
604 Ok(None) => header.sequence_id.map(|_| {
605 (
606 OutRemoteMessage::Ok(interface::Ok {
607 content_id: header.content_id,
608 }),
609 header.sequence_id,
610 )
611 }),
612 Err(error) => {
613 tracing::error!(
614 "RemoteAPI: Failed to execute request {:?}: {}",
615 header.content_id,
616 error
617 );
618 header.sequence_id.map(|_| {
619 (
621 OutRemoteMessage::Error(interface::Error {
622 error: error.to_string(),
623 }),
624 header.sequence_id,
625 )
626 })
627 }
628 }
629 }
630 Err(err) => match err {
631 interface::FromWireError::HeaderError(error) => {
632 tracing::error!("RemoteAPI: Failed to parse message header: {}", error);
633 None
634 }
635 interface::FromWireError::BodyError((header, error)) => {
636 tracing::error!(
637 "RemoteAPI: Failed to parse message body for {:?}: {}",
638 header,
639 error
640 );
641 header.sequence_id.map(|_| {
642 (
644 OutRemoteMessage::Error(interface::Error {
645 error: error.to_string(),
646 }),
647 header.sequence_id,
648 )
649 })
650 }
651 },
652 },
653 _ => {
654 tracing::error!("RemoteAPI: message format is not `Binary`");
655 None
656 }
657 }
658}