1use std::collections::HashMap;
4use std::marker::PhantomData;
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7
8use crate::client::Client;
9use crate::error::{Error, Result};
10use crate::executor::CallbackExecutor;
11use crate::message::Message;
12use crate::parameter::Parameter;
13use crate::publisher::Publisher;
14use crate::qos::{QosPreset, QosProfile};
15use crate::service::Service;
16use crate::subscriber::Subscriber;
17use crate::transport::ZenohTransport;
18
/// Runs a one-shot cleanup closure when the guard is dropped.
///
/// The closure is stored in an `Option` so that `Drop` can move it out and
/// call it by value (`FnOnce`); it therefore runs at most once.
#[must_use = "the cleanup closure runs as soon as the guard is dropped; keep the guard alive"]
pub struct DropGuard {
    /// Pending cleanup action; `None` once it has been taken by `drop`.
    cleanup: Option<Box<dyn FnOnce() + Send + Sync>>,
}

impl DropGuard {
    /// Creates a guard that invokes `cleanup` exactly once when it is dropped.
    pub fn new<F>(cleanup: F) -> Self
    where
        F: FnOnce() + Send + Sync + 'static,
    {
        Self {
            cleanup: Some(Box::new(cleanup)),
        }
    }
}

impl std::fmt::Debug for DropGuard {
    /// Closures are opaque, so only report whether the cleanup is still pending.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DropGuard")
            .field("armed", &self.cleanup.is_some())
            .finish()
    }
}

impl Drop for DropGuard {
    /// Takes the stored closure (leaving `None`) and runs it.
    fn drop(&mut self) {
        if let Some(cleanup) = self.cleanup.take() {
            cleanup();
        }
    }
}
42
43pub struct PublisherHandle<M: Message> {
45 publisher: Arc<Publisher<M>>,
46 _cleanup: DropGuard,
47}
48
49impl<M: Message> PublisherHandle<M> {
50 fn new(
51 publisher: Arc<Publisher<M>>,
52 topic: String,
53 publishers_map: Arc<Mutex<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
54 ) -> Self {
55 let cleanup = DropGuard::new(move || {
56 publishers_map
57 .lock()
58 .unwrap_or_else(|e| e.into_inner())
59 .remove(&topic);
60 tracing::debug!("Publisher dropped for topic: {}", topic);
61 });
62
63 Self {
64 publisher,
65 _cleanup: cleanup,
66 }
67 }
68
69 pub fn publisher(&self) -> &Arc<Publisher<M>> {
71 &self.publisher
72 }
73
74 pub fn publish(&self, message: &M) -> Result<()> {
76 self.publisher.publish(message)
77 }
78
79 pub fn topic(&self) -> &str {
81 self.publisher.topic()
82 }
83}
84
85pub struct SubscriberHandle {
87 subscriber: Arc<Subscriber>,
88 _cleanup: DropGuard,
89}
90
91impl SubscriberHandle {
92 fn new(
93 subscriber: Arc<Subscriber>,
94 topic: String,
95 subscribers_map: Arc<Mutex<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
96 ) -> Self {
97 let cleanup = DropGuard::new(move || {
98 subscribers_map
99 .lock()
100 .unwrap_or_else(|e| e.into_inner())
101 .remove(&topic);
102 tracing::debug!("Subscriber dropped for topic: {}", topic);
103 });
104
105 Self {
106 subscriber,
107 _cleanup: cleanup,
108 }
109 }
110
111 pub fn subscriber(&self) -> &Arc<Subscriber> {
113 &self.subscriber
114 }
115}
116
117pub struct ServiceHandle {
119 service: Arc<Service>,
120 _cleanup: DropGuard,
121}
122
123impl ServiceHandle {
124 fn new(
125 service: Arc<Service>,
126 service_name: String,
127 services_map: Arc<Mutex<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
128 ) -> Self {
129 let cleanup = DropGuard::new(move || {
130 services_map
131 .lock()
132 .unwrap_or_else(|e| e.into_inner())
133 .remove(&service_name);
134 tracing::debug!("Service dropped: {}", service_name);
135 });
136
137 Self {
138 service,
139 _cleanup: cleanup,
140 }
141 }
142
143 pub fn service(&self) -> &Arc<Service> {
145 &self.service
146 }
147}
148
149pub struct ClientHandle<Req: Message, Res: Message> {
151 client: Arc<Client<Req, Res>>,
152 _cleanup: DropGuard,
153}
154
155impl<Req: Message, Res: Message> ClientHandle<Req, Res> {
156 fn new(
157 client: Arc<Client<Req, Res>>,
158 service_name: String,
159 clients_map: Arc<Mutex<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
160 ) -> Self {
161 let cleanup = DropGuard::new(move || {
162 clients_map
163 .lock()
164 .unwrap_or_else(|e| e.into_inner())
165 .remove(&service_name);
166 tracing::debug!("Client dropped for service: {}", service_name);
167 });
168
169 Self {
170 client,
171 _cleanup: cleanup,
172 }
173 }
174
175 pub fn client(&self) -> &Arc<Client<Req, Res>> {
177 &self.client
178 }
179
180 pub fn call(&self, request: &Req) -> Result<Res> {
182 self.client.call(request)
183 }
184
185 pub async fn call_async(&self, request: &Req) -> Result<Res> {
187 self.client.call_async(request).await
188 }
189}
190
191pub struct Node {
196 name: String,
198 transport: ZenohTransport,
200 executor: Arc<CallbackExecutor>,
202 publishers: Arc<Mutex<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
204 subscribers: Arc<Mutex<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
206 services: Arc<Mutex<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
208 clients: Arc<Mutex<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
210 parameters: Mutex<HashMap<String, Parameter>>,
212 _discovery_queryable:
214 Option<zenoh::query::Queryable<zenoh::handlers::FifoChannelHandler<zenoh::query::Query>>>,
215 _discovery_task: Option<tokio::task::JoinHandle<()>>,
217}
218
219impl Node {
220 pub const NODE_PREFIX: &str = "zenobuf/node/";
222
223 pub async fn new(name: &str) -> Result<Self> {
225 let transport = ZenohTransport::new().await?;
226 Self::with_transport(name, transport).await
227 }
228
229 pub async fn with_transport(name: &str, transport: ZenohTransport) -> Result<Self> {
231 let (discovery_queryable, discovery_task) =
232 Self::create_discovery_queryable(&transport, name).await?;
233
234 Ok(Self {
235 name: name.to_string(),
236 transport,
237 executor: Arc::new(CallbackExecutor::new()),
238 publishers: Arc::new(Mutex::new(HashMap::new())),
239 subscribers: Arc::new(Mutex::new(HashMap::new())),
240 services: Arc::new(Mutex::new(HashMap::new())),
241 clients: Arc::new(Mutex::new(HashMap::new())),
242 parameters: Mutex::new(HashMap::new()),
243 _discovery_queryable: Some(discovery_queryable),
244 _discovery_task: Some(discovery_task),
245 })
246 }
247
248 async fn create_discovery_queryable(
250 transport: &ZenohTransport,
251 name: &str,
252 ) -> Result<(
253 zenoh::query::Queryable<zenoh::handlers::FifoChannelHandler<zenoh::query::Query>>,
254 tokio::task::JoinHandle<()>,
255 )> {
256 let key = format!("{}{}", Self::NODE_PREFIX, name);
257 let key_expr = zenoh::key_expr::KeyExpr::try_from(key.clone())
258 .map_err(|e| Error::node(name, format!("Failed to create discovery key: {}", e)))?;
259
260 let node_info = serde_json::json!({
261 "name": name,
262 "status": "active",
263 "pid": std::process::id(),
264 });
265
266 let queryable = transport
267 .session()
268 .declare_queryable(key_expr)
269 .await
270 .map_err(Error::from)?;
271
272 let queryable_clone = queryable.clone();
274 let info_str = node_info.to_string();
275 let key_clone = key.clone();
276
277 let task = tokio::spawn(async move {
278 while let Ok(query) = queryable_clone.recv_async().await {
279 let _ = query.reply(&key_clone, info_str.clone()).await;
280 }
281 });
282
283 tracing::debug!("Node '{}' registered for discovery at {}", name, key);
284
285 Ok((queryable, task))
286 }
287
288 #[allow(dead_code)]
292 pub(crate) fn executor(&self) -> &Arc<CallbackExecutor> {
293 &self.executor
294 }
295
296 pub fn name(&self) -> &str {
298 &self.name
299 }
300
301 pub async fn create_publisher<M: Message>(
303 &self,
304 topic: &str,
305 qos: QosProfile,
306 ) -> Result<Arc<Publisher<M>>> {
307 let topic_name = topic.to_string();
308
309 if self.publishers.lock().unwrap().contains_key(&topic_name) {
311 return Err(Error::topic_already_exists(&topic_name, &self.name));
312 }
313
314 let inner_publisher = self
315 .transport
316 .create_publisher::<M>(&topic_name, &qos)
317 .await?;
318 let publisher = Arc::new(Publisher::new(
319 topic_name.clone(),
320 Box::new(inner_publisher),
321 ));
322
323 let mut publishers = self.publishers.lock().unwrap();
325 if publishers.contains_key(&topic_name) {
326 return Err(Error::topic_already_exists(&topic_name, &self.name));
327 }
328 publishers.insert(topic_name, Box::new(publisher.clone()));
329
330 Ok(publisher)
331 }
332
333 pub async fn create_subscriber<M: Message, F>(
335 &self,
336 topic: &str,
337 _qos: QosProfile,
338 callback: F,
339 ) -> Result<Arc<Subscriber>>
340 where
341 F: Fn(M) + Send + Sync + 'static,
342 {
343 let topic_name = topic.to_string();
344
345 if self.subscribers.lock().unwrap().contains_key(&topic_name) {
346 return Err(Error::topic_already_exists(&topic_name, &self.name));
347 }
348
349 let inner_subscriber = self
350 .transport
351 .create_subscriber::<M, F>(&topic_name, callback, Some(self.executor.clone()))
352 .await?;
353 let subscriber = Arc::new(Subscriber::new(
354 topic_name.clone(),
355 Box::new(inner_subscriber),
356 ));
357
358 let mut subscribers = self.subscribers.lock().unwrap();
359 if subscribers.contains_key(&topic_name) {
360 return Err(Error::topic_already_exists(&topic_name, &self.name));
361 }
362 subscribers.insert(topic_name, Box::new(subscriber.clone()));
363
364 Ok(subscriber)
365 }
366
367 pub async fn create_service<Req: Message, Res: Message, F>(
369 &self,
370 service_name: &str,
371 handler: F,
372 ) -> Result<Arc<Service>>
373 where
374 F: Fn(Req) -> Result<Res> + Send + Sync + 'static,
375 {
376 let full_service_name = service_name.to_string();
377
378 if self
379 .services
380 .lock()
381 .unwrap()
382 .contains_key(&full_service_name)
383 {
384 return Err(Error::service_already_exists(
385 &full_service_name,
386 &self.name,
387 ));
388 }
389
390 let inner_service = self
391 .transport
392 .create_service::<Req, Res, F>(&full_service_name, handler)
393 .await?;
394 let service = Arc::new(Service::new(
395 full_service_name.clone(),
396 Box::new(inner_service),
397 ));
398
399 let mut services = self.services.lock().unwrap();
400 if services.contains_key(&full_service_name) {
401 return Err(Error::service_already_exists(
402 &full_service_name,
403 &self.name,
404 ));
405 }
406 services.insert(full_service_name, Box::new(service.clone()));
407
408 Ok(service)
409 }
410
411 pub fn create_client<Req: Message, Res: Message>(
413 &self,
414 service_name: &str,
415 ) -> Result<Arc<Client<Req, Res>>> {
416 let full_service_name = service_name.to_string();
418
419 let mut clients = self.clients.lock().unwrap();
421 if clients.contains_key(&full_service_name) {
422 return Err(Error::service_already_exists(
423 &full_service_name,
424 &self.name,
425 ));
426 }
427
428 let inner_client = self
430 .transport
431 .create_client::<Req, Res>(&full_service_name)?;
432 let client = Arc::new(Client::new(
433 full_service_name.clone(),
434 Box::new(inner_client),
435 ));
436
437 clients.insert(full_service_name, Box::new(client.clone()));
439
440 Ok(client)
441 }
442
443 pub fn set_parameter<
445 T: serde::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync + 'static,
446 >(
447 &self,
448 name: &str,
449 value: T,
450 ) -> Result<()> {
451 let mut parameters = self.parameters.lock().unwrap();
452 parameters.insert(name.to_string(), Parameter::new(name, value)?);
453 Ok(())
454 }
455
456 pub fn get_parameter<T: serde::de::DeserializeOwned + Clone + Send + Sync + 'static>(
458 &self,
459 name: &str,
460 ) -> Result<T> {
461 let parameters = self.parameters.lock().unwrap();
462 parameters
463 .get(name)
464 .ok_or_else(|| Error::parameter(name, "Parameter not found"))?
465 .get_value()
466 }
467
468 pub fn spin_once(&self) -> Result<usize> {
472 Ok(self.executor.process_pending())
473 }
474
475 pub async fn spin(&self) -> Result<()> {
479 while !self.executor.is_shutdown() {
480 let notified = self.executor.notified();
484 let processed = self.spin_once()?;
485 if processed == 0 {
486 let _ = tokio::time::timeout(Duration::from_secs(1), notified).await;
487 }
488 }
489 Ok(())
490 }
491
492 pub fn shutdown(&self) {
496 self.executor.shutdown();
497 }
498
499 pub fn is_shutdown(&self) -> bool {
501 self.executor.is_shutdown()
502 }
503
504 pub fn publisher<M: Message>(&self, topic: &str) -> PublisherBuilder<'_, M> {
508 PublisherBuilder::new(self, topic)
509 }
510
511 pub fn subscriber<M: Message>(&self, topic: &str) -> SubscriberBuilder<'_, M> {
513 SubscriberBuilder::new(self, topic)
514 }
515
516 pub fn service<Req: Message, Res: Message>(&self, name: &str) -> ServiceBuilder<'_, Req, Res> {
518 ServiceBuilder::new(self, name)
519 }
520
521 pub fn client<Req: Message, Res: Message>(&self, name: &str) -> ClientBuilder<'_, Req, Res> {
523 ClientBuilder::new(self, name)
524 }
525
526 pub async fn publish<M: Message>(&self, topic: &str) -> Result<Arc<Publisher<M>>> {
530 self.create_publisher(topic, QosProfile::default()).await
531 }
532
533 pub async fn subscribe<M: Message, F>(
535 &self,
536 topic: &str,
537 callback: F,
538 ) -> Result<Arc<Subscriber>>
539 where
540 F: Fn(M) + Send + Sync + 'static,
541 {
542 self.create_subscriber(topic, QosProfile::default(), callback)
543 .await
544 }
545}
546
547pub struct PublisherBuilder<'a, M: Message> {
549 node: &'a Node,
550 topic: String,
551 qos: QosProfile,
552 _phantom: PhantomData<M>,
553}
554
555impl<'a, M: Message> PublisherBuilder<'a, M> {
556 fn new(node: &'a Node, topic: &str) -> Self {
557 Self {
558 node,
559 topic: topic.to_string(),
560 qos: QosProfile::default(),
561 _phantom: PhantomData,
562 }
563 }
564
565 pub fn with_qos(mut self, qos: QosProfile) -> Self {
567 self.qos = qos;
568 self
569 }
570
571 pub fn with_qos_preset(mut self, preset: QosPreset) -> Self {
573 self.qos = preset.into();
574 self
575 }
576
577 pub fn reliable(mut self) -> Self {
579 self.qos.reliability = crate::qos::Reliability::Reliable;
580 self
581 }
582
583 pub fn best_effort(mut self) -> Self {
585 self.qos.reliability = crate::qos::Reliability::BestEffort;
586 self
587 }
588
589 pub fn with_depth(mut self, depth: usize) -> Self {
591 self.qos.depth = depth;
592 self
593 }
594
595 pub async fn build(self) -> Result<PublisherHandle<M>> {
597 let topic = self.topic.clone();
598 let publisher = self.node.create_publisher(&self.topic, self.qos).await?;
599 Ok(PublisherHandle::new(
600 publisher,
601 topic,
602 self.node.publishers.clone(),
603 ))
604 }
605}
606
607pub struct SubscriberBuilder<'a, M: Message> {
609 node: &'a Node,
610 topic: String,
611 qos: QosProfile,
612 _phantom: PhantomData<M>,
613}
614
615impl<'a, M: Message> SubscriberBuilder<'a, M> {
616 fn new(node: &'a Node, topic: &str) -> Self {
617 Self {
618 node,
619 topic: topic.to_string(),
620 qos: QosProfile::default(),
621 _phantom: PhantomData,
622 }
623 }
624
625 pub fn with_qos(mut self, qos: QosProfile) -> Self {
627 self.qos = qos;
628 self
629 }
630
631 pub fn with_qos_preset(mut self, preset: QosPreset) -> Self {
633 self.qos = preset.into();
634 self
635 }
636
637 pub fn reliable(mut self) -> Self {
639 self.qos.reliability = crate::qos::Reliability::Reliable;
640 self
641 }
642
643 pub fn best_effort(mut self) -> Self {
645 self.qos.reliability = crate::qos::Reliability::BestEffort;
646 self
647 }
648
649 pub fn with_depth(mut self, depth: usize) -> Self {
651 self.qos.depth = depth;
652 self
653 }
654
655 pub async fn build<F>(self, callback: F) -> Result<SubscriberHandle>
657 where
658 F: Fn(M) + Send + Sync + 'static,
659 {
660 let topic = self.topic.clone();
661 let subscriber = self
662 .node
663 .create_subscriber(&self.topic, self.qos, callback)
664 .await?;
665 Ok(SubscriberHandle::new(
666 subscriber,
667 topic,
668 self.node.subscribers.clone(),
669 ))
670 }
671}
672
673pub struct ServiceBuilder<'a, Req: Message, Res: Message> {
675 node: &'a Node,
676 name: String,
677 _phantom: PhantomData<(Req, Res)>,
678}
679
680impl<'a, Req: Message, Res: Message> ServiceBuilder<'a, Req, Res> {
681 fn new(node: &'a Node, name: &str) -> Self {
682 Self {
683 node,
684 name: name.to_string(),
685 _phantom: PhantomData,
686 }
687 }
688
689 pub async fn build<F>(self, handler: F) -> Result<ServiceHandle>
691 where
692 F: Fn(Req) -> Result<Res> + Send + Sync + 'static,
693 {
694 let name = self.name.clone();
695 let service = self.node.create_service(&self.name, handler).await?;
696 Ok(ServiceHandle::new(
697 service,
698 name,
699 self.node.services.clone(),
700 ))
701 }
702}
703
704pub struct ClientBuilder<'a, Req: Message, Res: Message> {
706 node: &'a Node,
707 name: String,
708 _phantom: PhantomData<(Req, Res)>,
709}
710
711impl<'a, Req: Message, Res: Message> ClientBuilder<'a, Req, Res> {
712 fn new(node: &'a Node, name: &str) -> Self {
713 Self {
714 node,
715 name: name.to_string(),
716 _phantom: PhantomData,
717 }
718 }
719
720 pub fn build(self) -> Result<ClientHandle<Req, Res>> {
722 let name = self.name.clone();
723 let client = self.node.create_client(&self.name)?;
724 Ok(ClientHandle::new(client, name, self.node.clients.clone()))
725 }
726}