1use std::{
22 collections::HashMap,
23 fs::File,
24 future::Future,
25 io::{self, BufReader, ErrorKind},
26 net::SocketAddr,
27 path::Path,
28 sync::Arc,
29};
30
31use flume::Sender;
32use futures::{future, pin_mut, StreamExt, TryStreamExt};
33use interface::RemoteAPIMsg;
34use rustls_pemfile::{certs, private_key};
35use serde::Serialize;
36use tokio::{
37 net::{TcpListener, TcpStream},
38 select,
39 sync::RwLock,
40 task::JoinHandle,
41};
42use tokio_rustls::{
43 rustls::{
44 self,
45 pki_types::{CertificateDer, PrivateKeyDer},
46 },
47 server::TlsStream,
48 TlsAcceptor,
49};
50use tokio_tungstenite::tungstenite::protocol::Message;
51use tracing::{debug, error};
52use uhlc::Timestamp;
53use uuid::Uuid;
54use zenoh::{
55 bytes::{Encoding, ZBytes},
56 internal::{
57 plugins::{RunningPluginTrait, ZenohPlugin},
58 runtime::Runtime,
59 },
60 key_expr::{
61 format::{kedefine, keformat},
62 keyexpr, OwnedKeyExpr,
63 },
64 liveliness::LivelinessToken,
65 pubsub::Publisher,
66 query::{Querier, Query},
67 Session,
68};
69use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};
70use zenoh_result::{bail, zerror, ZResult};
71
72mod config;
73pub use config::Config;
74
75mod handle_control_message;
76mod handle_data_message;
77mod interface;
78use crate::{
79 handle_control_message::handle_control_message, handle_data_message::handle_data_message,
80};
81
82kedefine!(
83 pub ke_admin_version: "${plugin_status_key:**}/__version__",
85
86 pub ke_admin_prefix: "@/${zenoh_id:*}/remote-plugin/",
88);
89
90const WORKER_THREAD_NUM: usize = 2;
91const MAX_BLOCK_THREAD_NUM: usize = 50;
92const GIT_VERSION: &str = git_version::git_version!(prefix = "v", cargo_prefix = "v");
93
94lazy_static::lazy_static! {
95 static ref LONG_VERSION: String = format!("{} built with {}", GIT_VERSION, env!("RUSTC_VERSION"));
96 static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
98 .worker_threads(WORKER_THREAD_NUM)
99 .max_blocking_threads(MAX_BLOCK_THREAD_NUM)
100 .enable_all()
101 .build()
102 .expect("Unable to create runtime");
103 static ref KE_ANY_N_SEGMENT: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("**") };
104}
105
106#[derive(Debug)]
108enum AdminRef {
109 Config,
110 Version,
111}
112
113#[inline(always)]
114pub(crate) fn spawn_runtime<F>(task: F) -> JoinHandle<F::Output>
115where
116 F: Future + Send + 'static,
117 F::Output: Send + 'static,
118{
119 match tokio::runtime::Handle::try_current() {
121 Ok(rt) => {
122 rt.spawn(task)
124 }
125 Err(_) => {
126 TOKIO_RUNTIME.spawn(task)
128 }
129 }
130}
131
132pub fn spawn_future(fut: impl Future<Output = ()> + 'static + std::marker::Send) -> JoinHandle<()> {
133 match tokio::runtime::Handle::try_current() {
134 Ok(rt) => rt.spawn(fut),
135 Err(_) => TOKIO_RUNTIME.spawn(fut),
136 }
137}
138
139fn load_certs(path: &Path) -> io::Result<Vec<CertificateDer<'static>>> {
140 certs(&mut BufReader::new(File::open(path)?)).collect()
141}
142
143fn load_key(path: &Path) -> io::Result<PrivateKeyDer<'static>> {
144 private_key(&mut BufReader::new(File::open(path)?))
145 .unwrap()
146 .ok_or(io::Error::new(
147 ErrorKind::Other,
148 "No private key found".to_string(),
149 ))
150}
151
152pub struct RemoteApiPlugin;
153
154#[cfg(feature = "dynamic_plugin")]
155zenoh_plugin_trait::declare_plugin!(RemoteApiPlugin);
156impl ZenohPlugin for RemoteApiPlugin {}
157impl Plugin for RemoteApiPlugin {
158 type StartArgs = Runtime;
159 type Instance = zenoh::internal::plugins::RunningPlugin;
160 const DEFAULT_NAME: &'static str = "remote_api";
161 const PLUGIN_VERSION: &'static str = plugin_version!();
162 const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!();
163
164 fn start(
165 name: &str,
166 runtime: &Self::StartArgs,
167 ) -> ZResult<zenoh::internal::plugins::RunningPlugin> {
168 zenoh_util::try_init_log_from_env();
172 tracing::info!("Starting {name}");
173
174 let runtime_conf = runtime.config().lock();
175
176 let plugin_conf = runtime_conf
177 .plugin(name)
178 .ok_or_else(|| zerror!("Plugin `{}`: missing config", name))?;
179
180 let conf: Config = serde_json::from_value(plugin_conf.clone())
181 .map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
182
183 let wss_config: Option<(Vec<CertificateDer<'_>>, PrivateKeyDer<'_>)> =
184 match conf.secure_websocket.clone() {
185 Some(wss_config) => {
186 tracing::info!("Loading certs from : {} ...", wss_config.certificate_path);
187 let certs = load_certs(Path::new(&wss_config.certificate_path))
188 .map_err(|err| zerror!("Could not Load WSS Cert `{}`", err))?;
189 tracing::info!(
190 "Loading Private Key from : {} ...",
191 wss_config.private_key_path
192 );
193 let key = load_key(Path::new(&wss_config.private_key_path))
194 .map_err(|err| zerror!("Could not Load WSS Private Key `{}`", err))?;
195 Some((certs, key))
196 }
197 None => None,
198 };
199
200 let weak_runtime = Runtime::downgrade(runtime);
201 if let Some(runtime) = weak_runtime.upgrade() {
202 spawn_runtime(run(runtime, conf, wss_config));
203
204 Ok(Box::new(RunningPlugin(RemoteAPIPlugin)))
205 } else {
206 bail!("Cannot Get Zenoh Instance of Runtime !")
207 }
208 }
209}
210
211pub async fn run(
212 runtime: Runtime,
213 config: Config,
214 opt_certs: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
215) {
216 let hm: HashMap<SocketAddr, RemoteState> = HashMap::new();
217 let state_map = Arc::new(RwLock::new(hm));
218
219 let remote_api_runtime = RemoteAPIRuntime {
221 config: Arc::new(config),
222 wss_certs: opt_certs,
223 zenoh_runtime: runtime,
224 state_map,
225 };
226
227 remote_api_runtime.run().await;
228}
229
230struct RemoteAPIRuntime {
231 config: Arc<Config>,
232 wss_certs: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
233 zenoh_runtime: Runtime,
234 state_map: StateMap,
235}
236
237impl RemoteAPIRuntime {
238 async fn run(self) {
239 let run_websocket_server = run_websocket_server(
240 &self.config.websocket_port,
241 self.zenoh_runtime.clone(),
242 self.state_map.clone(),
243 self.wss_certs,
244 );
245
246 let config = (*self.config).clone();
247
248 let run_admin_space_queryable =
249 run_admin_space_queryable(self.zenoh_runtime.clone(), self.state_map.clone(), config);
250
251 select!(
252 _ = run_websocket_server => {},
253 _ = run_admin_space_queryable => {},
254 );
255 }
256}
257
258#[derive(Debug, Serialize)]
259struct AdminSpaceClient {
260 uuid: String,
261 remote_address: SocketAddr,
262 publishers: Vec<String>,
263 subscribers: Vec<String>,
264 queryables: Vec<String>,
265}
266
267impl From<(&SocketAddr, &RemoteState)> for AdminSpaceClient {
268 fn from(value: (&SocketAddr, &RemoteState)) -> Self {
269 let remote_state = value.1;
270 let sock_addr = value.0;
271
272 let pub_keyexprs = remote_state
273 .publishers
274 .values()
275 .map(|x| x.key_expr().to_string())
276 .collect::<Vec<String>>();
277
278 let query_keyexprs = remote_state
279 .queryables
280 .values()
281 .map(|(_, key_expr)| key_expr.to_string())
282 .collect::<Vec<String>>();
283
284 let sub_keyexprs = remote_state
285 .subscribers
286 .values()
287 .map(|(_, key_expr)| key_expr.to_string())
288 .collect::<Vec<String>>();
289
290 AdminSpaceClient {
291 uuid: remote_state.session_id.to_string(),
292 remote_address: *sock_addr,
293 publishers: pub_keyexprs,
294 subscribers: sub_keyexprs,
295 queryables: query_keyexprs,
296 }
297 }
298}
299
300async fn run_admin_space_queryable(zenoh_runtime: Runtime, state_map: StateMap, config: Config) {
301 let session = match zenoh::session::init(zenoh_runtime.clone()).await {
302 Ok(session) => session,
303 Err(err) => {
304 tracing::error!("Unable to get Zenoh session from Runtime {err}");
305 return;
306 }
307 };
308
309 let admin_prefix = keformat!(
310 ke_admin_prefix::formatter(),
311 zenoh_id = session.zid().into_keyexpr()
312 )
313 .unwrap();
314
315 let mut admin_space: HashMap<OwnedKeyExpr, AdminRef> = HashMap::new();
316
317 admin_space.insert(
318 &admin_prefix / unsafe { keyexpr::from_str_unchecked("config") },
319 AdminRef::Config,
320 );
321 admin_space.insert(
322 &admin_prefix / unsafe { keyexpr::from_str_unchecked("version") },
323 AdminRef::Version,
324 );
325
326 let admin_keyexpr_expr = (&admin_prefix) / *KE_ANY_N_SEGMENT;
327
328 let admin_queryable = session
329 .declare_queryable(admin_keyexpr_expr)
330 .await
331 .expect("Failed fo create AdminSpace Queryable");
332
333 loop {
334 match admin_queryable.recv_async().await {
335 Ok(query) => {
336 let query_ke: OwnedKeyExpr = query.key_expr().to_owned().into();
337
338 if query_ke.is_wild() {
339 if query_ke.contains("clients") {
340 let read_guard = state_map.read().await;
341 let mut admin_space_clients = Vec::new();
342 for (sock, remote_state) in read_guard.iter() {
343 admin_space_clients.push(AdminSpaceClient::from((sock, remote_state)));
344 }
345 send_reply(admin_space_clients, query, query_ke).await;
346 } else {
347 for (ke, admin_ref) in admin_space.iter() {
348 if query_ke.intersects(ke) {
349 send_admin_reply(&query, ke, admin_ref, &config).await;
350 }
351 }
352 }
353 } else {
354 let own_ke: OwnedKeyExpr = query_ke.to_owned();
355 if own_ke.contains("config") {
356 send_admin_reply(&query, &own_ke, &AdminRef::Config, &config).await;
357 }
358 if own_ke.contains("client") {
359 let mut opt_id = None;
360 let split = own_ke.split('/');
361 let mut next_is_id = false;
362 for elem in split {
363 if next_is_id {
364 opt_id = Some(elem);
365 } else if elem.contains("client") {
366 next_is_id = true;
367 }
368 }
369 if let Some(id) = opt_id {
370 let read_guard = state_map.read().await;
371 for (sock, remote_state) in read_guard.iter() {
372 if remote_state.session_id.to_string() == id {
373 send_reply(
374 AdminSpaceClient::from((sock, remote_state)),
375 query,
376 own_ke,
377 )
378 .await;
379
380 break;
381 }
382 }
383 }
384 }
385 }
386 }
387 Err(_) => {
388 tracing::warn!("Admin Space queryable was closed!");
389 }
390 }
391 }
392}
393
394async fn send_reply<T>(reply: T, query: Query, query_ke: OwnedKeyExpr)
395where
396 T: Sized + Serialize,
397{
398 match serde_json::to_string_pretty(&reply) {
399 Ok(json_string) => {
400 if let Err(err) = query
401 .reply(query_ke, json_string)
402 .encoding(Encoding::APPLICATION_JSON)
403 .await
404 {
405 error!("AdminSpace: Reply to Query failed, {}", err);
406 };
407 }
408 Err(_) => {
409 error!("AdminSpace: Could not seralize client data");
410 }
411 };
412}
413
414async fn send_admin_reply(
415 query: &Query,
416 key_expr: &keyexpr,
417 admin_ref: &AdminRef,
418 config: &Config,
419) {
420 let z_bytes: ZBytes = match admin_ref {
421 AdminRef::Version => match serde_json::to_value(RemoteApiPlugin::PLUGIN_LONG_VERSION) {
422 Ok(v) => match serde_json::to_vec(&v) {
423 Ok(value) => ZBytes::from(value),
424 Err(e) => {
425 tracing::warn!("Error transforming JSON to ZBytes: {}", e);
426 return;
427 }
428 },
429 Err(e) => {
430 tracing::error!("INTERNAL ERROR serializing config as JSON: {}", e);
431 return;
432 }
433 },
434 AdminRef::Config => match serde_json::to_value(config) {
435 Ok(v) => match serde_json::to_vec(&v) {
436 Ok(value) => ZBytes::from(value),
437 Err(e) => {
438 tracing::warn!("Error transforming JSON to ZBytes: {}", e);
439 return;
440 }
441 },
442 Err(e) => {
443 tracing::error!("INTERNAL ERROR serializing config as JSON: {}", e);
444 return;
445 }
446 },
447 };
448 if let Err(e) = query
449 .reply(key_expr.to_owned(), z_bytes)
450 .encoding(zenoh::bytes::Encoding::APPLICATION_JSON)
451 .await
452 {
453 tracing::warn!("Error replying to admin query {:?}: {}", query, e);
454 }
455}
456
457struct RemoteAPIPlugin;
458
459#[allow(dead_code)]
460struct RunningPlugin(RemoteAPIPlugin);
461
462impl PluginControl for RunningPlugin {}
463
464impl RunningPluginTrait for RunningPlugin {
465 fn config_checker(
466 &self,
467 _path: &str,
468 _current: &serde_json::Map<String, serde_json::Value>,
469 _new: &serde_json::Map<String, serde_json::Value>,
470 ) -> ZResult<Option<serde_json::Map<String, serde_json::Value>>> {
471 bail!("Runtime configuration change not supported");
472 }
473}
474
475type StateMap = Arc<RwLock<HashMap<SocketAddr, RemoteState>>>;
476
477struct RemoteState {
478 websocket_tx: Sender<RemoteAPIMsg>,
479 session_id: Uuid,
480 session: Session,
481 timestamps: HashMap<Uuid, Timestamp>,
483 subscribers: HashMap<Uuid, (JoinHandle<()>, OwnedKeyExpr)>,
485 publishers: HashMap<Uuid, Publisher<'static>>,
486 queryables: HashMap<Uuid, (JoinHandle<()>, OwnedKeyExpr)>,
488 unanswered_queries: Arc<std::sync::RwLock<HashMap<Uuid, Query>>>,
489 liveliness_tokens: HashMap<Uuid, LivelinessToken>,
491 liveliness_subscribers: HashMap<Uuid, (JoinHandle<()>, OwnedKeyExpr)>,
492 queriers: HashMap<Uuid, Querier<'static>>,
494}
495
496impl RemoteState {
497 fn new(websocket_tx: Sender<RemoteAPIMsg>, session_id: Uuid, session: Session) -> Self {
498 Self {
499 websocket_tx,
500 session_id,
501 session,
502 timestamps: HashMap::new(),
503 subscribers: HashMap::new(),
504 publishers: HashMap::new(),
505 queryables: HashMap::new(),
506 unanswered_queries: Arc::new(std::sync::RwLock::new(HashMap::new())),
507 liveliness_tokens: HashMap::new(),
508 liveliness_subscribers: HashMap::new(),
509 queriers: HashMap::new(),
510 }
511 }
512
513 async fn cleanup(self) {
514 for (_, publisher) in self.publishers {
515 if let Err(e) = publisher.undeclare().await {
516 error!("{e}")
517 }
518 }
519 for (_, (subscriber, _)) in self.subscribers {
520 subscriber.abort();
521 }
522
523 for (_, (queryable, _)) in self.queryables {
524 queryable.abort();
525 }
526
527 drop(self.unanswered_queries);
528
529 for (_, queryable) in self.liveliness_tokens {
530 if let Err(e) = queryable.undeclare().await {
531 error!("{e}")
532 }
533 }
534
535 for (_, (liveliness_subscriber, _)) in self.liveliness_subscribers {
536 liveliness_subscriber.abort();
537 }
538
539 if let Err(err) = self.session.close().await {
540 error!("{err}")
541 };
542 }
543}
544
545pub trait Streamable:
546 tokio::io::AsyncRead + tokio::io::AsyncWrite + std::marker::Send + Unpin
547{
548}
549impl Streamable for TcpStream {}
550impl Streamable for TlsStream<TcpStream> {}
551
552async fn run_websocket_server(
554 ws_port: &String,
555 zenoh_runtime: Runtime,
556 state_map: StateMap,
557 opt_certs: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
558) {
559 let mut opt_tls_acceptor: Option<TlsAcceptor> = None;
560
561 if let Some((certs, key)) = opt_certs {
562 let config = rustls::ServerConfig::builder()
563 .with_no_client_auth()
564 .with_single_cert(certs, key)
565 .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))
566 .expect("Could not build TLS Configuration from Certficiate/Key Combo :");
567 opt_tls_acceptor = Some(TlsAcceptor::from(Arc::new(config)));
568 }
569
570 let server: TcpListener = match TcpListener::bind(ws_port).await {
571 Ok(server) => server,
572 Err(err) => {
573 tracing::error!("Unable to start TcpListener {err}");
574 return;
575 }
576 };
577
578 while let Ok((tcp_stream, sock_addr)) = server.accept().await {
579 let state_map = state_map.clone();
580 let zenoh_runtime = zenoh_runtime.clone();
581 let opt_tls_acceptor = opt_tls_acceptor.clone();
582
583 let new_websocket = async move {
584 let sock_adress = Arc::new(sock_addr);
585 let (ws_ch_tx, ws_ch_rx) = flume::unbounded::<RemoteAPIMsg>();
586
587 let mut write_guard = state_map.write().await;
588
589 let session = match zenoh::session::init(zenoh_runtime.clone()).await {
590 Ok(session) => session,
591 Err(err) => {
592 tracing::error!("Unable to get Zenoh session from Runtime {err}");
593 return;
594 }
595 };
596 let id = Uuid::new_v4();
597 tracing::debug!("Client {sock_addr:?} -> {id}");
598
599 let state: RemoteState = RemoteState::new(ws_ch_tx.clone(), id, session);
600
601 let _ = write_guard.insert(sock_addr, state);
603 drop(write_guard);
604
605 let streamable: Box<dyn Streamable> = match &opt_tls_acceptor {
606 Some(acceptor) => match acceptor.accept(tcp_stream).await {
607 Ok(tls_stream) => Box::new(tls_stream),
608 Err(err) => {
609 error!("Could not secure TcpStream -> TlsStream {:?}", err);
610 return;
611 }
612 },
613 None => Box::new(tcp_stream),
614 };
615
616 let ws_stream = match tokio_tungstenite::accept_async(streamable).await {
617 Ok(ws_stream) => ws_stream,
618 Err(e) => {
619 error!("Error during the websocket handshake occurred: {}", e);
620 return;
621 }
622 };
623
624 let (ws_tx, ws_rx) = ws_stream.split();
625
626 let ch_rx_stream = ws_ch_rx
627 .into_stream()
628 .map(|remote_api_msg| {
629 let val = serde_json::to_string(&remote_api_msg).unwrap(); Ok(Message::Text(val))
631 })
632 .forward(ws_tx);
633
634 let sock_adress_cl = sock_adress.clone();
635
636 let state_map_cl_outer = state_map.clone();
637
638 let incoming_ws = tokio::task::spawn(async move {
640 let mut non_close_messages = ws_rx.try_filter(|msg| future::ready(!msg.is_close()));
641 let state_map_cl = state_map_cl_outer.clone();
642 let sock_adress_ref = sock_adress_cl.clone();
643 while let Ok(Some(msg)) = non_close_messages.try_next().await {
644 if let Some(response) =
645 handle_message(msg, *sock_adress_ref, state_map_cl.clone()).await
646 {
647 if let Err(err) = ws_ch_tx.send(response) {
648 error!("WS Send Error: {err:?}");
649 };
650 };
651 }
652 });
653
654 pin_mut!(ch_rx_stream, incoming_ws);
655 future::select(ch_rx_stream, incoming_ws).await;
656
657 if let Some(state) = state_map.write().await.remove(sock_adress.as_ref()) {
659 state.cleanup().await;
660 };
661
662 tracing::info!("Client Disconnected {}", sock_adress.as_ref());
663 };
664
665 spawn_future(new_websocket);
666 }
667}
668
669async fn handle_message(
670 msg: Message,
671 sock_addr: SocketAddr,
672 state_map: StateMap,
673) -> Option<RemoteAPIMsg> {
674 match msg {
675 Message::Text(text) => match serde_json::from_str::<RemoteAPIMsg>(&text) {
676 Ok(msg) => match msg {
677 RemoteAPIMsg::Control(ctrl_msg) => {
678 match handle_control_message(ctrl_msg, sock_addr, state_map).await {
679 Ok(_) => return None,
680 Err(err) => {
681 tracing::error!(err);
682 }
683 }
684 }
685 RemoteAPIMsg::Data(data_msg) => {
686 if let Err(err) = handle_data_message(data_msg, sock_addr, state_map).await {
687 tracing::error!(err);
688 }
689 }
690 },
691 Err(err) => {
692 tracing::error!(
693 "RemoteAPI: WS Message Cannot be Deserialized to RemoteAPIMsg {}, message: {}",
694 err,
695 text
696 );
697 }
698 },
699 _ => {
700 debug!("RemoteAPI: WS Message Not Text");
701 }
702 };
703 None
704}