use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use crate::client::Client;
use crate::error::{Error, Result};
use crate::executor::CallbackExecutor;
use crate::message::Message;
use crate::parameter::Parameter;
use crate::publisher::Publisher;
use crate::qos::{QosPreset, QosProfile};
use crate::service::Service;
use crate::subscriber::Subscriber;
use crate::transport::ZenohTransport;
/// Runs a one-shot cleanup closure when the guard is dropped.
///
/// The closure is kept in an `Option` so that `Drop` can move it out and call
/// it by value; once taken, the slot is `None` and nothing further runs.
pub struct DropGuard {
    cleanup: Option<Box<dyn FnOnce() + Send + Sync>>,
}

impl DropGuard {
    /// Wraps `cleanup` so that it is invoked when the returned guard is dropped.
    pub fn new<F>(cleanup: F) -> Self
    where
        F: FnOnce() + Send + Sync + 'static,
    {
        let boxed: Box<dyn FnOnce() + Send + Sync> = Box::new(cleanup);
        Self {
            cleanup: Some(boxed),
        }
    }
}

impl Drop for DropGuard {
    /// Takes the stored closure (if any) out of the guard and calls it.
    fn drop(&mut self) {
        let Some(run) = self.cleanup.take() else {
            return;
        };
        run();
    }
}
/// Handle pairing a shared [`Publisher`] with a [`DropGuard`].
///
/// When the handle is dropped, the guard removes the publisher's topic from
/// the registry map supplied at construction and emits a debug log line.
pub struct PublisherHandle<M: Message> {
    publisher: Arc<Publisher<M>>,
    _cleanup: DropGuard,
}

impl<M: Message> PublisherHandle<M> {
    /// Builds a handle whose drop removes `topic` from `publishers_map`.
    fn new(
        publisher: Arc<Publisher<M>>,
        topic: String,
        publishers_map: Arc<Mutex<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
    ) -> Self {
        let unregister = move || {
            {
                // A poisoned lock is recovered rather than unwrapped so this
                // closure, which runs from `Drop`, does not panic on poison.
                let mut registry = publishers_map
                    .lock()
                    .unwrap_or_else(|poisoned| poisoned.into_inner());
                registry.remove(&topic);
            }
            // The lock guard is released before logging.
            tracing::debug!("Publisher dropped for topic: {}", topic);
        };
        Self {
            publisher,
            _cleanup: DropGuard::new(unregister),
        }
    }

    /// Returns the shared publisher held by this handle.
    pub fn publisher(&self) -> &Arc<Publisher<M>> {
        &self.publisher
    }

    /// Forwards `message` to the underlying publisher's `publish`.
    pub fn publish(&self, message: &M) -> Result<()> {
        self.publisher.publish(message)
    }

    /// Returns the topic reported by the underlying publisher.
    pub fn topic(&self) -> &str {
        self.publisher.topic()
    }
}
/// Handle pairing a shared [`Subscriber`] with a [`DropGuard`].
///
/// Dropping the handle removes the subscriber's topic from the registry map
/// supplied at construction and emits a debug log line.
pub struct SubscriberHandle {
    subscriber: Arc<Subscriber>,
    _cleanup: DropGuard,
}

impl SubscriberHandle {
    /// Builds a handle whose drop removes `topic` from `subscribers_map`.
    fn new(
        subscriber: Arc<Subscriber>,
        topic: String,
        subscribers_map: Arc<Mutex<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
    ) -> Self {
        let unregister = move || {
            {
                // Recover a poisoned lock instead of panicking inside `Drop`.
                let mut registry = subscribers_map
                    .lock()
                    .unwrap_or_else(|poisoned| poisoned.into_inner());
                registry.remove(&topic);
            }
            // The lock guard is released before logging.
            tracing::debug!("Subscriber dropped for topic: {}", topic);
        };
        Self {
            subscriber,
            _cleanup: DropGuard::new(unregister),
        }
    }

    /// Returns the shared subscriber held by this handle.
    pub fn subscriber(&self) -> &Arc<Subscriber> {
        &self.subscriber
    }
}
/// Handle pairing a shared [`Service`] with a [`DropGuard`].
///
/// Dropping the handle removes the service name from the registry map
/// supplied at construction and emits a debug log line.
pub struct ServiceHandle {
    service: Arc<Service>,
    _cleanup: DropGuard,
}

impl ServiceHandle {
    /// Builds a handle whose drop removes `service_name` from `services_map`.
    fn new(
        service: Arc<Service>,
        service_name: String,
        services_map: Arc<Mutex<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
    ) -> Self {
        let unregister = move || {
            {
                // Recover a poisoned lock instead of panicking inside `Drop`.
                let mut registry = services_map
                    .lock()
                    .unwrap_or_else(|poisoned| poisoned.into_inner());
                registry.remove(&service_name);
            }
            // The lock guard is released before logging.
            tracing::debug!("Service dropped: {}", service_name);
        };
        Self {
            service,
            _cleanup: DropGuard::new(unregister),
        }
    }

    /// Returns the shared service held by this handle.
    pub fn service(&self) -> &Arc<Service> {
        &self.service
    }
}
/// Handle pairing a shared [`Client`] with a [`DropGuard`].
///
/// Dropping the handle removes the service name from the client registry map
/// supplied at construction and emits a debug log line.
pub struct ClientHandle<Req: Message, Res: Message> {
    client: Arc<Client<Req, Res>>,
    _cleanup: DropGuard,
}

impl<Req: Message, Res: Message> ClientHandle<Req, Res> {
    /// Builds a handle whose drop removes `service_name` from `clients_map`.
    fn new(
        client: Arc<Client<Req, Res>>,
        service_name: String,
        clients_map: Arc<Mutex<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
    ) -> Self {
        let unregister = move || {
            {
                // Recover a poisoned lock instead of panicking inside `Drop`.
                let mut registry = clients_map
                    .lock()
                    .unwrap_or_else(|poisoned| poisoned.into_inner());
                registry.remove(&service_name);
            }
            // The lock guard is released before logging.
            tracing::debug!("Client dropped for service: {}", service_name);
        };
        Self {
            client,
            _cleanup: DropGuard::new(unregister),
        }
    }

    /// Returns the shared client held by this handle.
    pub fn client(&self) -> &Arc<Client<Req, Res>> {
        &self.client
    }

    /// Forwards `request` to the underlying client's `call`.
    pub fn call(&self, request: &Req) -> Result<Res> {
        self.client.call(request)
    }

    /// Forwards `request` to the underlying client's `call_async` and awaits it.
    pub async fn call_async(&self, request: &Req) -> Result<Res> {
        self.client.call_async(request).await
    }
}
/// A named node that owns a transport, a callback executor, per-kind
/// registries of the entities it has created, and a parameter table.
pub struct Node {
/// Node name; returned by `name()`, used in error values and in the discovery key.
name: String,
/// Transport used to create publishers, subscribers, services and clients.
transport: ZenohTransport,
/// Executor handed to subscribers at creation and drained by `spin_once`.
executor: Arc<CallbackExecutor>,
/// Publishers registered by topic name; values are type-erased `Arc<Publisher<M>>`.
publishers: Arc<Mutex<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
/// Subscribers registered by topic name; values are type-erased `Arc<Subscriber>`.
subscribers: Arc<Mutex<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
/// Services registered by service name; values are type-erased `Arc<Service>`.
services: Arc<Mutex<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
/// Clients registered by service name; values are type-erased `Arc<Client<Req, Res>>`.
clients: Arc<Mutex<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
/// Parameters keyed by parameter name.
parameters: Mutex<HashMap<String, Parameter>>,
/// Queryable declared at `NODE_PREFIX + name`; stored so the node owns it.
// NOTE(review): presumably dropping it undeclares the queryable — confirm against zenoh.
_discovery_queryable:
Option<zenoh::query::Queryable<zenoh::handlers::FifoChannelHandler<zenoh::query::Query>>>,
/// Join handle of the spawned task that replies to discovery queries.
_discovery_task: Option<tokio::task::JoinHandle<()>>,
}
impl Node {
/// Key prefix for node discovery; a node's discovery key is this prefix
/// followed directly by the node name.
pub const NODE_PREFIX: &str = "zenobuf/node/";
/// Creates a node called `name` on a freshly constructed [`ZenohTransport`].
///
/// # Errors
/// Propagates any error from `ZenohTransport::new` or from
/// [`Node::with_transport`].
pub async fn new(name: &str) -> Result<Self> {
    Self::with_transport(name, ZenohTransport::new().await?).await
}
/// Creates a node called `name` on top of an existing `transport`.
///
/// The discovery queryable and its reply task are set up first; the node
/// then starts with a new executor, empty registries and no parameters.
///
/// # Errors
/// Propagates any error from setting up the discovery queryable.
pub async fn with_transport(name: &str, transport: ZenohTransport) -> Result<Self> {
    let (discovery_queryable, discovery_task) =
        Self::create_discovery_queryable(&transport, name).await?;

    // All four entity registries share one type, so a single constructor serves them.
    let empty_registry = || Arc::new(Mutex::new(HashMap::new()));

    let node = Self {
        name: name.to_string(),
        transport,
        executor: Arc::new(CallbackExecutor::new()),
        publishers: empty_registry(),
        subscribers: empty_registry(),
        services: empty_registry(),
        clients: empty_registry(),
        parameters: Mutex::new(HashMap::new()),
        _discovery_queryable: Some(discovery_queryable),
        _discovery_task: Some(discovery_task),
    };
    Ok(node)
}
/// Declares the discovery queryable for `name` and spawns the task that
/// answers queries on it.
///
/// The key is `NODE_PREFIX` followed by `name`. Every received query is
/// answered with a JSON string containing the node name, the literal status
/// `"active"` and the current process id. Reply errors are ignored.
///
/// # Errors
/// Returns `Error::node` when the key cannot be converted into a key
/// expression, or the converted transport error when declaring the
/// queryable fails.
async fn create_discovery_queryable(
    transport: &ZenohTransport,
    name: &str,
) -> Result<(
    zenoh::query::Queryable<zenoh::handlers::FifoChannelHandler<zenoh::query::Query>>,
    tokio::task::JoinHandle<()>,
)> {
    let key = format!("{}{}", Self::NODE_PREFIX, name);
    let key_expr = zenoh::key_expr::KeyExpr::try_from(key.clone())
        .map_err(|e| Error::node(name, format!("Failed to create discovery key: {}", e)))?;

    // The reply payload is fixed for the lifetime of the task, so build it once.
    let info_str = serde_json::json!({
        "name": name,
        "status": "active",
        "pid": std::process::id(),
    })
    .to_string();

    let queryable = transport
        .session()
        .declare_queryable(key_expr)
        .await
        .map_err(Error::from)?;

    let receiver = queryable.clone();
    let reply_key = key.clone();
    let task = tokio::spawn(async move {
        loop {
            match receiver.recv_async().await {
                Ok(query) => {
                    // Best effort: a failed reply is deliberately ignored.
                    let _ = query.reply(&reply_key, info_str.clone()).await;
                }
                // The loop ends as soon as receiving fails.
                Err(_) => break,
            }
        }
    });

    tracing::debug!("Node '{}' registered for discovery at {}", name, key);
    Ok((queryable, task))
}
/// Returns the shared callback executor owned by this node.
// NOTE(review): `dead_code` is presumably allowed because no crate-internal
// caller exists in every build configuration — confirm and document the reason.
#[allow(dead_code)]
pub(crate) fn executor(&self) -> &Arc<CallbackExecutor> {
&self.executor
}
/// Returns the name this node was created with.
pub fn name(&self) -> &str {
    self.name.as_str()
}
/// Creates a publisher for `topic` with the given `qos` and records it in
/// this node's publisher registry.
///
/// The registry is checked twice: once before the transport publisher is
/// created, and again under the lock just before insertion, because another
/// caller may register the same topic while the creation is awaited.
///
/// A poisoned registry lock is recovered rather than unwrapped, matching the
/// handle drop guards that operate on the same map, so this method returns a
/// `Result` instead of panicking.
///
/// In the code shown here, entries are only removed by [`PublisherHandle`]'s
/// drop guard; callers of this method get no automatic de-registration.
///
/// # Errors
/// Returns `Error::topic_already_exists` when `topic` is already registered
/// on this node, or the transport error when creating the publisher fails.
pub async fn create_publisher<M: Message>(
    &self,
    topic: &str,
    qos: QosProfile,
) -> Result<Arc<Publisher<M>>> {
    let topic_name = topic.to_string();

    // The guard is a temporary of this statement, so it is released before
    // the `.await` below.
    let already_registered = self
        .publishers
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .contains_key(&topic_name);
    if already_registered {
        return Err(Error::topic_already_exists(&topic_name, &self.name));
    }

    let inner_publisher = self
        .transport
        .create_publisher::<M>(&topic_name, &qos)
        .await?;
    let publisher = Arc::new(Publisher::new(
        topic_name.clone(),
        Box::new(inner_publisher),
    ));

    let mut publishers = self
        .publishers
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    // Re-check: the lock was not held across the await above.
    if publishers.contains_key(&topic_name) {
        return Err(Error::topic_already_exists(&topic_name, &self.name));
    }
    publishers.insert(topic_name, Box::new(publisher.clone()));
    Ok(publisher)
}
/// Creates a subscriber for `topic` that invokes `callback` for received
/// messages, and records it in this node's subscriber registry.
///
/// The node's executor is passed to the transport together with the
/// callback. The `_qos` argument is accepted for signature compatibility but
/// is never read by this method.
///
/// The registry is checked before the transport subscriber is created and
/// again under the lock before insertion, because the lock is not held
/// across the await. A poisoned registry lock is recovered rather than
/// unwrapped, matching the handle drop guards that operate on the same map.
///
/// # Errors
/// Returns `Error::topic_already_exists` when `topic` is already registered
/// on this node, or the transport error when creating the subscriber fails.
pub async fn create_subscriber<M: Message, F>(
    &self,
    topic: &str,
    _qos: QosProfile,
    callback: F,
) -> Result<Arc<Subscriber>>
where
    F: Fn(M) + Send + Sync + 'static,
{
    let topic_name = topic.to_string();

    // The guard is a temporary of this statement, so it is released before
    // the `.await` below.
    let already_registered = self
        .subscribers
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .contains_key(&topic_name);
    if already_registered {
        return Err(Error::topic_already_exists(&topic_name, &self.name));
    }

    let inner_subscriber = self
        .transport
        .create_subscriber::<M, F>(&topic_name, callback, Some(self.executor.clone()))
        .await?;
    let subscriber = Arc::new(Subscriber::new(
        topic_name.clone(),
        Box::new(inner_subscriber),
    ));

    let mut subscribers = self
        .subscribers
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    // Re-check: the lock was not held across the await above.
    if subscribers.contains_key(&topic_name) {
        return Err(Error::topic_already_exists(&topic_name, &self.name));
    }
    subscribers.insert(topic_name, Box::new(subscriber.clone()));
    Ok(subscriber)
}
/// Creates a service called `service_name` backed by `handler` and records
/// it in this node's service registry.
///
/// The registry is checked before the transport service is created and again
/// under the lock before insertion, because the lock is not held across the
/// await. A poisoned registry lock is recovered rather than unwrapped,
/// matching the handle drop guards that operate on the same map.
///
/// # Errors
/// Returns `Error::service_already_exists` when `service_name` is already
/// registered on this node, or the transport error when creating the service
/// fails.
pub async fn create_service<Req: Message, Res: Message, F>(
    &self,
    service_name: &str,
    handler: F,
) -> Result<Arc<Service>>
where
    F: Fn(Req) -> Result<Res> + Send + Sync + 'static,
{
    let full_service_name = service_name.to_string();

    // The guard is a temporary of this statement, so it is released before
    // the `.await` below.
    let already_registered = self
        .services
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .contains_key(&full_service_name);
    if already_registered {
        return Err(Error::service_already_exists(
            &full_service_name,
            &self.name,
        ));
    }

    let inner_service = self
        .transport
        .create_service::<Req, Res, F>(&full_service_name, handler)
        .await?;
    let service = Arc::new(Service::new(
        full_service_name.clone(),
        Box::new(inner_service),
    ));

    let mut services = self
        .services
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    // Re-check: the lock was not held across the await above.
    if services.contains_key(&full_service_name) {
        return Err(Error::service_already_exists(
            &full_service_name,
            &self.name,
        ));
    }
    services.insert(full_service_name, Box::new(service.clone()));
    Ok(service)
}
/// Creates a client for `service_name` and records it in this node's client
/// registry.
///
/// The registry lock is held for the whole call, so the duplicate check and
/// the insertion happen under one lock acquisition. A poisoned lock is
/// recovered rather than unwrapped, matching the handle drop guards that
/// operate on the same map; this matters here because a panic inside the
/// transport call would otherwise make every later call panic too.
///
/// # Errors
/// Returns `Error::service_already_exists` when a client for `service_name`
/// is already registered on this node, or the transport error when creating
/// the client fails.
pub fn create_client<Req: Message, Res: Message>(
    &self,
    service_name: &str,
) -> Result<Arc<Client<Req, Res>>> {
    let full_service_name = service_name.to_string();
    let mut clients = self
        .clients
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    if clients.contains_key(&full_service_name) {
        return Err(Error::service_already_exists(
            &full_service_name,
            &self.name,
        ));
    }

    let inner_client = self
        .transport
        .create_client::<Req, Res>(&full_service_name)?;
    let client = Arc::new(Client::new(
        full_service_name.clone(),
        Box::new(inner_client),
    ));
    clients.insert(full_service_name, Box::new(client.clone()));
    Ok(client)
}
/// Stores `value` under `name`, replacing any existing parameter of that name.
///
/// The [`Parameter`] is built before the lock is taken, so a failure or
/// panic while building it cannot poison the parameter mutex. A lock that is
/// already poisoned is recovered rather than unwrapped, so this method
/// returns a `Result` instead of panicking.
///
/// # Errors
/// Propagates the error returned by `Parameter::new`.
pub fn set_parameter<
    T: serde::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync + 'static,
>(
    &self,
    name: &str,
    value: T,
) -> Result<()> {
    let parameter = Parameter::new(name, value)?;
    self.parameters
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .insert(name.to_string(), parameter);
    Ok(())
}
/// Looks up the parameter stored under `name` and returns its value as `T`.
///
/// A poisoned parameter lock is recovered rather than unwrapped, so this
/// method returns a `Result` instead of panicking.
///
/// # Errors
/// Returns `Error::parameter` with the message `"Parameter not found"` when
/// no parameter is stored under `name`; otherwise propagates the result of
/// `Parameter::get_value`.
pub fn get_parameter<T: serde::de::DeserializeOwned + Clone + Send + Sync + 'static>(
    &self,
    name: &str,
) -> Result<T> {
    let parameters = self
        .parameters
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    parameters
        .get(name)
        .ok_or_else(|| Error::parameter(name, "Parameter not found"))?
        .get_value()
}
/// Runs the executor's pending work once and returns the count reported by
/// `process_pending`.
///
/// As written, this always returns `Ok`.
pub fn spin_once(&self) -> Result<usize> {
    let processed = self.executor.process_pending();
    Ok(processed)
}
/// Processes callbacks until the executor reports that it is shut down.
///
/// Each iteration calls [`Node::spin_once`]. When nothing was processed, the
/// loop waits for the executor's notification for at most one second before
/// checking again; whether that wait completed or timed out is ignored.
///
/// # Errors
/// Propagates any error from [`Node::spin_once`].
pub async fn spin(&self) -> Result<()> {
    loop {
        if self.executor.is_shutdown() {
            return Ok(());
        }

        // Obtained before processing, as in the original ordering.
        // NOTE(review): presumably so a wake-up arriving during processing
        // is not missed — confirm against `CallbackExecutor::notified`.
        let notified = self.executor.notified();

        if self.spin_once()? > 0 {
            continue;
        }

        let _ = tokio::time::timeout(Duration::from_secs(1), notified).await;
    }
}
/// Calls `shutdown` on the node's executor.
///
/// `spin` checks the executor's shutdown state at the top of every
/// iteration, so it returns once it observes the shutdown.
pub fn shutdown(&self) {
self.executor.shutdown();
}
/// Returns the executor's shutdown state.
pub fn is_shutdown(&self) -> bool {
self.executor.is_shutdown()
}
/// Starts a [`PublisherBuilder`] for `topic` bound to this node.
///
/// Nothing is created until the builder's `build` is called.
pub fn publisher<M: Message>(&self, topic: &str) -> PublisherBuilder<'_, M> {
PublisherBuilder::new(self, topic)
}
/// Starts a [`SubscriberBuilder`] for `topic` bound to this node.
///
/// Nothing is created until the builder's `build` is called.
pub fn subscriber<M: Message>(&self, topic: &str) -> SubscriberBuilder<'_, M> {
SubscriberBuilder::new(self, topic)
}
/// Starts a [`ServiceBuilder`] for the service `name` bound to this node.
///
/// Nothing is created until the builder's `build` is called.
pub fn service<Req: Message, Res: Message>(&self, name: &str) -> ServiceBuilder<'_, Req, Res> {
ServiceBuilder::new(self, name)
}
/// Starts a [`ClientBuilder`] for the service `name` bound to this node.
///
/// Nothing is created until the builder's `build` is called.
pub fn client<Req: Message, Res: Message>(&self, name: &str) -> ClientBuilder<'_, Req, Res> {
ClientBuilder::new(self, name)
}
/// Creates a publisher for `topic` using `QosProfile::default()`.
///
/// Shorthand for [`Node::create_publisher`]; errors are propagated unchanged.
pub async fn publish<M: Message>(&self, topic: &str) -> Result<Arc<Publisher<M>>> {
    let qos = QosProfile::default();
    self.create_publisher(topic, qos).await
}
/// Creates a subscriber for `topic` with `callback`, passing
/// `QosProfile::default()` as the QoS argument.
///
/// Shorthand for [`Node::create_subscriber`]; errors are propagated unchanged.
pub async fn subscribe<M: Message, F>(
    &self,
    topic: &str,
    callback: F,
) -> Result<Arc<Subscriber>>
where
    F: Fn(M) + Send + Sync + 'static,
{
    let qos = QosProfile::default();
    self.create_subscriber(topic, qos, callback).await
}
}
pub struct PublisherBuilder<'a, M: Message> {
node: &'a Node,
topic: String,
qos: QosProfile,
_phantom: PhantomData<M>,
}
impl<'a, M: Message> PublisherBuilder<'a, M> {
fn new(node: &'a Node, topic: &str) -> Self {
Self {
node,
topic: topic.to_string(),
qos: QosProfile::default(),
_phantom: PhantomData,
}
}
pub fn with_qos(mut self, qos: QosProfile) -> Self {
self.qos = qos;
self
}
pub fn with_qos_preset(mut self, preset: QosPreset) -> Self {
self.qos = preset.into();
self
}
pub fn reliable(mut self) -> Self {
self.qos.reliability = crate::qos::Reliability::Reliable;
self
}
pub fn best_effort(mut self) -> Self {
self.qos.reliability = crate::qos::Reliability::BestEffort;
self
}
pub fn with_depth(mut self, depth: usize) -> Self {
self.qos.depth = depth;
self
}
pub async fn build(self) -> Result<PublisherHandle<M>> {
let topic = self.topic.clone();
let publisher = self.node.create_publisher(&self.topic, self.qos).await?;
Ok(PublisherHandle::new(
publisher,
topic,
self.node.publishers.clone(),
))
}
}
pub struct SubscriberBuilder<'a, M: Message> {
node: &'a Node,
topic: String,
qos: QosProfile,
_phantom: PhantomData<M>,
}
impl<'a, M: Message> SubscriberBuilder<'a, M> {
fn new(node: &'a Node, topic: &str) -> Self {
Self {
node,
topic: topic.to_string(),
qos: QosProfile::default(),
_phantom: PhantomData,
}
}
pub fn with_qos(mut self, qos: QosProfile) -> Self {
self.qos = qos;
self
}
pub fn with_qos_preset(mut self, preset: QosPreset) -> Self {
self.qos = preset.into();
self
}
pub fn reliable(mut self) -> Self {
self.qos.reliability = crate::qos::Reliability::Reliable;
self
}
pub fn best_effort(mut self) -> Self {
self.qos.reliability = crate::qos::Reliability::BestEffort;
self
}
pub fn with_depth(mut self, depth: usize) -> Self {
self.qos.depth = depth;
self
}
pub async fn build<F>(self, callback: F) -> Result<SubscriberHandle>
where
F: Fn(M) + Send + Sync + 'static,
{
let topic = self.topic.clone();
let subscriber = self
.node
.create_subscriber(&self.topic, self.qos, callback)
.await?;
Ok(SubscriberHandle::new(
subscriber,
topic,
self.node.subscribers.clone(),
))
}
}
pub struct ServiceBuilder<'a, Req: Message, Res: Message> {
node: &'a Node,
name: String,
_phantom: PhantomData<(Req, Res)>,
}
impl<'a, Req: Message, Res: Message> ServiceBuilder<'a, Req, Res> {
fn new(node: &'a Node, name: &str) -> Self {
Self {
node,
name: name.to_string(),
_phantom: PhantomData,
}
}
pub async fn build<F>(self, handler: F) -> Result<ServiceHandle>
where
F: Fn(Req) -> Result<Res> + Send + Sync + 'static,
{
let name = self.name.clone();
let service = self.node.create_service(&self.name, handler).await?;
Ok(ServiceHandle::new(
service,
name,
self.node.services.clone(),
))
}
}
pub struct ClientBuilder<'a, Req: Message, Res: Message> {
node: &'a Node,
name: String,
_phantom: PhantomData<(Req, Res)>,
}
impl<'a, Req: Message, Res: Message> ClientBuilder<'a, Req, Res> {
fn new(node: &'a Node, name: &str) -> Self {
Self {
node,
name: name.to_string(),
_phantom: PhantomData,
}
}
pub fn build(self) -> Result<ClientHandle<Req, Res>> {
let name = self.name.clone();
let client = self.node.create_client(&self.name)?;
Ok(ClientHandle::new(client, name, self.node.clients.clone()))
}
}