Skip to main content

zenobuf_core/
node.rs

1//! Node abstraction for Zenobuf
2
3use std::collections::HashMap;
4use std::marker::PhantomData;
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7
8use crate::client::Client;
9use crate::error::{Error, Result};
10use crate::executor::CallbackExecutor;
11use crate::message::Message;
12use crate::parameter::Parameter;
13use crate::publisher::Publisher;
14use crate::qos::{QosPreset, QosProfile};
15use crate::service::Service;
16use crate::subscriber::Subscriber;
17use crate::transport::ZenohTransport;
18
/// Runs a one-shot cleanup closure when the guard goes out of scope.
///
/// The closure is kept in an `Option` so that `drop` can move it out and
/// invoke it exactly once.
pub struct DropGuard {
    cleanup: Option<Box<dyn FnOnce() + Send + Sync>>,
}

impl DropGuard {
    /// Wraps `cleanup` so that it is called when the returned guard is dropped.
    pub fn new<F>(cleanup: F) -> Self
    where
        F: FnOnce() + Send + Sync + 'static,
    {
        let boxed: Box<dyn FnOnce() + Send + Sync> = Box::new(cleanup);
        Self {
            cleanup: Some(boxed),
        }
    }
}

impl Drop for DropGuard {
    fn drop(&mut self) {
        // `take` leaves `None` behind, so the closure can never run twice.
        let Some(run_cleanup) = self.cleanup.take() else {
            return;
        };
        run_cleanup();
    }
}
42
43/// A handle to a publisher with automatic cleanup
44pub struct PublisherHandle<M: Message> {
45    publisher: Arc<Publisher<M>>,
46    _cleanup: DropGuard,
47}
48
49impl<M: Message> PublisherHandle<M> {
50    fn new(
51        publisher: Arc<Publisher<M>>,
52        topic: String,
53        publishers_map: Arc<Mutex<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
54    ) -> Self {
55        let cleanup = DropGuard::new(move || {
56            publishers_map
57                .lock()
58                .unwrap_or_else(|e| e.into_inner())
59                .remove(&topic);
60            tracing::debug!("Publisher dropped for topic: {}", topic);
61        });
62
63        Self {
64            publisher,
65            _cleanup: cleanup,
66        }
67    }
68
69    /// Get the underlying publisher
70    pub fn publisher(&self) -> &Arc<Publisher<M>> {
71        &self.publisher
72    }
73
74    /// Publish a message
75    pub fn publish(&self, message: &M) -> Result<()> {
76        self.publisher.publish(message)
77    }
78
79    /// Get the topic name
80    pub fn topic(&self) -> &str {
81        self.publisher.topic()
82    }
83}
84
85/// A handle to a subscriber with automatic cleanup
86pub struct SubscriberHandle {
87    subscriber: Arc<Subscriber>,
88    _cleanup: DropGuard,
89}
90
91impl SubscriberHandle {
92    fn new(
93        subscriber: Arc<Subscriber>,
94        topic: String,
95        subscribers_map: Arc<Mutex<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
96    ) -> Self {
97        let cleanup = DropGuard::new(move || {
98            subscribers_map
99                .lock()
100                .unwrap_or_else(|e| e.into_inner())
101                .remove(&topic);
102            tracing::debug!("Subscriber dropped for topic: {}", topic);
103        });
104
105        Self {
106            subscriber,
107            _cleanup: cleanup,
108        }
109    }
110
111    /// Get the underlying subscriber
112    pub fn subscriber(&self) -> &Arc<Subscriber> {
113        &self.subscriber
114    }
115}
116
117/// A handle to a service with automatic cleanup
118pub struct ServiceHandle {
119    service: Arc<Service>,
120    _cleanup: DropGuard,
121}
122
123impl ServiceHandle {
124    fn new(
125        service: Arc<Service>,
126        service_name: String,
127        services_map: Arc<Mutex<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
128    ) -> Self {
129        let cleanup = DropGuard::new(move || {
130            services_map
131                .lock()
132                .unwrap_or_else(|e| e.into_inner())
133                .remove(&service_name);
134            tracing::debug!("Service dropped: {}", service_name);
135        });
136
137        Self {
138            service,
139            _cleanup: cleanup,
140        }
141    }
142
143    /// Get the underlying service
144    pub fn service(&self) -> &Arc<Service> {
145        &self.service
146    }
147}
148
149/// A handle to a client with automatic cleanup
150pub struct ClientHandle<Req: Message, Res: Message> {
151    client: Arc<Client<Req, Res>>,
152    _cleanup: DropGuard,
153}
154
155impl<Req: Message, Res: Message> ClientHandle<Req, Res> {
156    fn new(
157        client: Arc<Client<Req, Res>>,
158        service_name: String,
159        clients_map: Arc<Mutex<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
160    ) -> Self {
161        let cleanup = DropGuard::new(move || {
162            clients_map
163                .lock()
164                .unwrap_or_else(|e| e.into_inner())
165                .remove(&service_name);
166            tracing::debug!("Client dropped for service: {}", service_name);
167        });
168
169        Self {
170            client,
171            _cleanup: cleanup,
172        }
173    }
174
175    /// Get the underlying client
176    pub fn client(&self) -> &Arc<Client<Req, Res>> {
177        &self.client
178    }
179
180    /// Call the service
181    pub fn call(&self, request: &Req) -> Result<Res> {
182        self.client.call(request)
183    }
184
185    /// Call the service asynchronously
186    pub async fn call_async(&self, request: &Req) -> Result<Res> {
187        self.client.call_async(request).await
188    }
189}
190
191/// Node abstraction for Zenobuf
192///
193/// A Node is the main entry point for using Zenobuf. It provides methods for
194/// creating publishers, subscribers, services, and clients.
195pub struct Node {
196    /// Name of the node
197    name: String,
198    /// Transport layer
199    transport: ZenohTransport,
200    /// Callback executor for processing subscriber callbacks
201    executor: Arc<CallbackExecutor>,
202    /// Publishers
203    publishers: Arc<Mutex<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
204    /// Subscribers
205    subscribers: Arc<Mutex<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
206    /// Services
207    services: Arc<Mutex<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
208    /// Clients
209    clients: Arc<Mutex<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
210    /// Parameters
211    parameters: Mutex<HashMap<String, Parameter>>,
212    /// Discovery queryable (keeps node discoverable while alive)
213    _discovery_queryable:
214        Option<zenoh::query::Queryable<zenoh::handlers::FifoChannelHandler<zenoh::query::Query>>>,
215    /// Discovery task handle (detached on drop; terminates when queryable is dropped)
216    _discovery_task: Option<tokio::task::JoinHandle<()>>,
217}
218
219impl Node {
220    /// Prefix for node discovery key expressions
221    pub const NODE_PREFIX: &str = "zenobuf/node/";
222
223    /// Creates a new Node with the given name
224    pub async fn new(name: &str) -> Result<Self> {
225        let transport = ZenohTransport::new().await?;
226        Self::with_transport(name, transport).await
227    }
228
229    /// Creates a new Node with the given name and transport
230    pub async fn with_transport(name: &str, transport: ZenohTransport) -> Result<Self> {
231        let (discovery_queryable, discovery_task) =
232            Self::create_discovery_queryable(&transport, name).await?;
233
234        Ok(Self {
235            name: name.to_string(),
236            transport,
237            executor: Arc::new(CallbackExecutor::new()),
238            publishers: Arc::new(Mutex::new(HashMap::new())),
239            subscribers: Arc::new(Mutex::new(HashMap::new())),
240            services: Arc::new(Mutex::new(HashMap::new())),
241            clients: Arc::new(Mutex::new(HashMap::new())),
242            parameters: Mutex::new(HashMap::new()),
243            _discovery_queryable: Some(discovery_queryable),
244            _discovery_task: Some(discovery_task),
245        })
246    }
247
248    /// Creates a discovery queryable that responds to node discovery queries
249    async fn create_discovery_queryable(
250        transport: &ZenohTransport,
251        name: &str,
252    ) -> Result<(
253        zenoh::query::Queryable<zenoh::handlers::FifoChannelHandler<zenoh::query::Query>>,
254        tokio::task::JoinHandle<()>,
255    )> {
256        let key = format!("{}{}", Self::NODE_PREFIX, name);
257        let key_expr = zenoh::key_expr::KeyExpr::try_from(key.clone())
258            .map_err(|e| Error::node(name, format!("Failed to create discovery key: {}", e)))?;
259
260        let node_info = serde_json::json!({
261            "name": name,
262            "status": "active",
263            "pid": std::process::id(),
264        });
265
266        let queryable = transport
267            .session()
268            .declare_queryable(key_expr)
269            .await
270            .map_err(Error::from)?;
271
272        // Clone for the spawned task
273        let queryable_clone = queryable.clone();
274        let info_str = node_info.to_string();
275        let key_clone = key.clone();
276
277        let task = tokio::spawn(async move {
278            while let Ok(query) = queryable_clone.recv_async().await {
279                let _ = query.reply(&key_clone, info_str.clone()).await;
280            }
281        });
282
283        tracing::debug!("Node '{}' registered for discovery at {}", name, key);
284
285        Ok((queryable, task))
286    }
287
288    /// Returns a reference to the callback executor
289    ///
290    /// This is used internally by the transport layer to queue callbacks.
291    #[allow(dead_code)]
292    pub(crate) fn executor(&self) -> &Arc<CallbackExecutor> {
293        &self.executor
294    }
295
296    /// Returns the name of the node
297    pub fn name(&self) -> &str {
298        &self.name
299    }
300
301    /// Creates a publisher for the given topic
302    pub async fn create_publisher<M: Message>(
303        &self,
304        topic: &str,
305        qos: QosProfile,
306    ) -> Result<Arc<Publisher<M>>> {
307        let topic_name = topic.to_string();
308
309        // Fast-path rejection before expensive transport call
310        if self.publishers.lock().unwrap().contains_key(&topic_name) {
311            return Err(Error::topic_already_exists(&topic_name, &self.name));
312        }
313
314        let inner_publisher = self
315            .transport
316            .create_publisher::<M>(&topic_name, &qos)
317            .await?;
318        let publisher = Arc::new(Publisher::new(
319            topic_name.clone(),
320            Box::new(inner_publisher),
321        ));
322
323        // Re-check under lock to handle concurrent creation
324        let mut publishers = self.publishers.lock().unwrap();
325        if publishers.contains_key(&topic_name) {
326            return Err(Error::topic_already_exists(&topic_name, &self.name));
327        }
328        publishers.insert(topic_name, Box::new(publisher.clone()));
329
330        Ok(publisher)
331    }
332
333    /// Creates a subscriber for the given topic with a callback
334    pub async fn create_subscriber<M: Message, F>(
335        &self,
336        topic: &str,
337        _qos: QosProfile,
338        callback: F,
339    ) -> Result<Arc<Subscriber>>
340    where
341        F: Fn(M) + Send + Sync + 'static,
342    {
343        let topic_name = topic.to_string();
344
345        if self.subscribers.lock().unwrap().contains_key(&topic_name) {
346            return Err(Error::topic_already_exists(&topic_name, &self.name));
347        }
348
349        let inner_subscriber = self
350            .transport
351            .create_subscriber::<M, F>(&topic_name, callback, Some(self.executor.clone()))
352            .await?;
353        let subscriber = Arc::new(Subscriber::new(
354            topic_name.clone(),
355            Box::new(inner_subscriber),
356        ));
357
358        let mut subscribers = self.subscribers.lock().unwrap();
359        if subscribers.contains_key(&topic_name) {
360            return Err(Error::topic_already_exists(&topic_name, &self.name));
361        }
362        subscribers.insert(topic_name, Box::new(subscriber.clone()));
363
364        Ok(subscriber)
365    }
366
367    /// Creates a service for the given name with a handler
368    pub async fn create_service<Req: Message, Res: Message, F>(
369        &self,
370        service_name: &str,
371        handler: F,
372    ) -> Result<Arc<Service>>
373    where
374        F: Fn(Req) -> Result<Res> + Send + Sync + 'static,
375    {
376        let full_service_name = service_name.to_string();
377
378        if self
379            .services
380            .lock()
381            .unwrap()
382            .contains_key(&full_service_name)
383        {
384            return Err(Error::service_already_exists(
385                &full_service_name,
386                &self.name,
387            ));
388        }
389
390        let inner_service = self
391            .transport
392            .create_service::<Req, Res, F>(&full_service_name, handler)
393            .await?;
394        let service = Arc::new(Service::new(
395            full_service_name.clone(),
396            Box::new(inner_service),
397        ));
398
399        let mut services = self.services.lock().unwrap();
400        if services.contains_key(&full_service_name) {
401            return Err(Error::service_already_exists(
402                &full_service_name,
403                &self.name,
404            ));
405        }
406        services.insert(full_service_name, Box::new(service.clone()));
407
408        Ok(service)
409    }
410
411    /// Creates a client for the given service name
412    pub fn create_client<Req: Message, Res: Message>(
413        &self,
414        service_name: &str,
415    ) -> Result<Arc<Client<Req, Res>>> {
416        // Use the service name as provided by the user (global services by default)
417        let full_service_name = service_name.to_string();
418
419        // Check if the client already exists
420        let mut clients = self.clients.lock().unwrap();
421        if clients.contains_key(&full_service_name) {
422            return Err(Error::service_already_exists(
423                &full_service_name,
424                &self.name,
425            ));
426        }
427
428        // Create the client
429        let inner_client = self
430            .transport
431            .create_client::<Req, Res>(&full_service_name)?;
432        let client = Arc::new(Client::new(
433            full_service_name.clone(),
434            Box::new(inner_client),
435        ));
436
437        // Store the client
438        clients.insert(full_service_name, Box::new(client.clone()));
439
440        Ok(client)
441    }
442
443    /// Sets a parameter
444    pub fn set_parameter<
445        T: serde::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync + 'static,
446    >(
447        &self,
448        name: &str,
449        value: T,
450    ) -> Result<()> {
451        let mut parameters = self.parameters.lock().unwrap();
452        parameters.insert(name.to_string(), Parameter::new(name, value)?);
453        Ok(())
454    }
455
456    /// Gets a parameter
457    pub fn get_parameter<T: serde::de::DeserializeOwned + Clone + Send + Sync + 'static>(
458        &self,
459        name: &str,
460    ) -> Result<T> {
461        let parameters = self.parameters.lock().unwrap();
462        parameters
463            .get(name)
464            .ok_or_else(|| Error::parameter(name, "Parameter not found"))?
465            .get_value()
466    }
467
468    /// Spins the node once, processing all pending callbacks
469    ///
470    /// Returns the number of callbacks that were processed.
471    pub fn spin_once(&self) -> Result<usize> {
472        Ok(self.executor.process_pending())
473    }
474
475    /// Spins the node, processing callbacks until the node is shutdown
476    ///
477    /// This method will block until `shutdown()` is called on the node.
478    pub async fn spin(&self) -> Result<()> {
479        while !self.executor.is_shutdown() {
480            // Register interest in notifications BEFORE draining the queue
481            // to avoid a race where enqueue's notify_one fires between
482            // process_pending and the await.
483            let notified = self.executor.notified();
484            let processed = self.spin_once()?;
485            if processed == 0 {
486                let _ = tokio::time::timeout(Duration::from_secs(1), notified).await;
487            }
488        }
489        Ok(())
490    }
491
492    /// Shuts down the node
493    ///
494    /// This will cause `spin()` to return and prevent new callbacks from being queued.
495    pub fn shutdown(&self) {
496        self.executor.shutdown();
497    }
498
499    /// Returns true if the node has been shutdown
500    pub fn is_shutdown(&self) -> bool {
501        self.executor.is_shutdown()
502    }
503
504    // Builder pattern methods for simplified API
505
506    /// Creates a publisher builder for the given topic
507    pub fn publisher<M: Message>(&self, topic: &str) -> PublisherBuilder<'_, M> {
508        PublisherBuilder::new(self, topic)
509    }
510
511    /// Creates a subscriber builder for the given topic
512    pub fn subscriber<M: Message>(&self, topic: &str) -> SubscriberBuilder<'_, M> {
513        SubscriberBuilder::new(self, topic)
514    }
515
516    /// Creates a service builder for the given service name
517    pub fn service<Req: Message, Res: Message>(&self, name: &str) -> ServiceBuilder<'_, Req, Res> {
518        ServiceBuilder::new(self, name)
519    }
520
521    /// Creates a client builder for the given service name
522    pub fn client<Req: Message, Res: Message>(&self, name: &str) -> ClientBuilder<'_, Req, Res> {
523        ClientBuilder::new(self, name)
524    }
525
526    // Simplified convenience methods
527
528    /// Creates a publisher with default QoS
529    pub async fn publish<M: Message>(&self, topic: &str) -> Result<Arc<Publisher<M>>> {
530        self.create_publisher(topic, QosProfile::default()).await
531    }
532
533    /// Creates a subscriber with default QoS and a callback
534    pub async fn subscribe<M: Message, F>(
535        &self,
536        topic: &str,
537        callback: F,
538    ) -> Result<Arc<Subscriber>>
539    where
540        F: Fn(M) + Send + Sync + 'static,
541    {
542        self.create_subscriber(topic, QosProfile::default(), callback)
543            .await
544    }
545}
546
547/// Builder for creating publishers with fluent API
548pub struct PublisherBuilder<'a, M: Message> {
549    node: &'a Node,
550    topic: String,
551    qos: QosProfile,
552    _phantom: PhantomData<M>,
553}
554
555impl<'a, M: Message> PublisherBuilder<'a, M> {
556    fn new(node: &'a Node, topic: &str) -> Self {
557        Self {
558            node,
559            topic: topic.to_string(),
560            qos: QosProfile::default(),
561            _phantom: PhantomData,
562        }
563    }
564
565    /// Sets the QoS profile
566    pub fn with_qos(mut self, qos: QosProfile) -> Self {
567        self.qos = qos;
568        self
569    }
570
571    /// Sets the QoS preset
572    pub fn with_qos_preset(mut self, preset: QosPreset) -> Self {
573        self.qos = preset.into();
574        self
575    }
576
577    /// Sets reliability
578    pub fn reliable(mut self) -> Self {
579        self.qos.reliability = crate::qos::Reliability::Reliable;
580        self
581    }
582
583    /// Sets best effort reliability
584    pub fn best_effort(mut self) -> Self {
585        self.qos.reliability = crate::qos::Reliability::BestEffort;
586        self
587    }
588
589    /// Sets the history depth
590    pub fn with_depth(mut self, depth: usize) -> Self {
591        self.qos.depth = depth;
592        self
593    }
594
595    /// Builds the publisher
596    pub async fn build(self) -> Result<PublisherHandle<M>> {
597        let topic = self.topic.clone();
598        let publisher = self.node.create_publisher(&self.topic, self.qos).await?;
599        Ok(PublisherHandle::new(
600            publisher,
601            topic,
602            self.node.publishers.clone(),
603        ))
604    }
605}
606
607/// Builder for creating subscribers with fluent API
608pub struct SubscriberBuilder<'a, M: Message> {
609    node: &'a Node,
610    topic: String,
611    qos: QosProfile,
612    _phantom: PhantomData<M>,
613}
614
615impl<'a, M: Message> SubscriberBuilder<'a, M> {
616    fn new(node: &'a Node, topic: &str) -> Self {
617        Self {
618            node,
619            topic: topic.to_string(),
620            qos: QosProfile::default(),
621            _phantom: PhantomData,
622        }
623    }
624
625    /// Sets the QoS profile
626    pub fn with_qos(mut self, qos: QosProfile) -> Self {
627        self.qos = qos;
628        self
629    }
630
631    /// Sets the QoS preset
632    pub fn with_qos_preset(mut self, preset: QosPreset) -> Self {
633        self.qos = preset.into();
634        self
635    }
636
637    /// Sets reliability
638    pub fn reliable(mut self) -> Self {
639        self.qos.reliability = crate::qos::Reliability::Reliable;
640        self
641    }
642
643    /// Sets best effort reliability
644    pub fn best_effort(mut self) -> Self {
645        self.qos.reliability = crate::qos::Reliability::BestEffort;
646        self
647    }
648
649    /// Sets the history depth
650    pub fn with_depth(mut self, depth: usize) -> Self {
651        self.qos.depth = depth;
652        self
653    }
654
655    /// Builds the subscriber with a callback
656    pub async fn build<F>(self, callback: F) -> Result<SubscriberHandle>
657    where
658        F: Fn(M) + Send + Sync + 'static,
659    {
660        let topic = self.topic.clone();
661        let subscriber = self
662            .node
663            .create_subscriber(&self.topic, self.qos, callback)
664            .await?;
665        Ok(SubscriberHandle::new(
666            subscriber,
667            topic,
668            self.node.subscribers.clone(),
669        ))
670    }
671}
672
673/// Builder for creating services with fluent API
674pub struct ServiceBuilder<'a, Req: Message, Res: Message> {
675    node: &'a Node,
676    name: String,
677    _phantom: PhantomData<(Req, Res)>,
678}
679
680impl<'a, Req: Message, Res: Message> ServiceBuilder<'a, Req, Res> {
681    fn new(node: &'a Node, name: &str) -> Self {
682        Self {
683            node,
684            name: name.to_string(),
685            _phantom: PhantomData,
686        }
687    }
688
689    /// Builds the service with a handler
690    pub async fn build<F>(self, handler: F) -> Result<ServiceHandle>
691    where
692        F: Fn(Req) -> Result<Res> + Send + Sync + 'static,
693    {
694        let name = self.name.clone();
695        let service = self.node.create_service(&self.name, handler).await?;
696        Ok(ServiceHandle::new(
697            service,
698            name,
699            self.node.services.clone(),
700        ))
701    }
702}
703
704/// Builder for creating clients with fluent API
705pub struct ClientBuilder<'a, Req: Message, Res: Message> {
706    node: &'a Node,
707    name: String,
708    _phantom: PhantomData<(Req, Res)>,
709}
710
711impl<'a, Req: Message, Res: Message> ClientBuilder<'a, Req, Res> {
712    fn new(node: &'a Node, name: &str) -> Self {
713        Self {
714            node,
715            name: name.to_string(),
716            _phantom: PhantomData,
717        }
718    }
719
720    /// Builds the client
721    pub fn build(self) -> Result<ClientHandle<Req, Res>> {
722        let name = self.name.clone();
723        let client = self.node.create_client(&self.name)?;
724        Ok(ClientHandle::new(client, name, self.node.clients.clone()))
725    }
726}