rosrust 0.9.10

Pure Rust implementation of a ROS client library
Documentation
use super::clock::Clock;
use super::error::Result;
use super::master::Master;
use super::slave::Slave;
use crate::rosxmlrpc::Response;
use crate::tcpros::{Message, PublisherStream, ServicePair, ServiceResult};
use crate::RawMessageDescription;
use log::error;
use std::collections::HashMap;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

#[derive(Clone)]
pub struct Publisher<T: Message> {
    clock: Arc<dyn Clock>,
    seq: Arc<AtomicUsize>,
    stream: PublisherStream<T>,
    _raii: Arc<InteractorRaii<PublisherInfo>>,
}

impl<T: Message> Publisher<T> {
    pub(crate) fn new(
        master: Arc<Master>,
        slave: Arc<Slave>,
        clock: Arc<dyn Clock>,
        hostname: &str,
        name: &str,
        queue_size: usize,
        message_description: Option<RawMessageDescription>,
    ) -> Result<Self> {
        let message_description =
            message_description.unwrap_or_else(RawMessageDescription::from_message::<T>);
        let stream =
            slave.add_publication::<T>(hostname, name, queue_size, message_description.clone())?;

        let raii = Arc::new(InteractorRaii::new(PublisherInfo {
            master,
            slave,
            name: name.into(),
        }));

        raii.interactor
            .master
            .register_publisher(name, &message_description.msg_type)
            .map_err(|err| {
                error!("Failed to register publisher for topic '{}': {}", name, err);
                err
            })?;

        Ok(Self {
            stream,
            clock,
            seq: Arc::new(AtomicUsize::new(0)),
            _raii: raii,
        })
    }

    #[inline]
    pub fn subscriber_count(&self) -> usize {
        self.stream.subscriber_count()
    }

    #[inline]
    pub fn subscriber_names(&self) -> Vec<String> {
        self.stream.subscriber_names()
    }

    #[inline]
    pub fn set_latching(&mut self, latching: bool) {
        self.stream.set_latching(latching);
    }

    #[inline]
    pub fn set_queue_size(&mut self, queue_size: usize) {
        self.stream.set_queue_size(queue_size);
    }

    #[inline]
    pub fn send(&self, mut message: T) -> Result<()> {
        message.set_header(&self.clock, &self.seq);
        self.stream.send(&message).map_err(Into::into)
    }
}

struct PublisherInfo {
    master: Arc<Master>,
    slave: Arc<Slave>,
    name: String,
}

impl Interactor for PublisherInfo {
    fn unregister(&mut self) -> Response<()> {
        self.slave.remove_publication(&self.name);
        self.master.unregister_publisher(&self.name).map(|_| ())
    }
}

#[derive(Clone)]
pub struct Subscriber {
    info: Arc<InteractorRaii<SubscriberInfo>>,
}

impl Subscriber {
    pub(crate) fn new<T, F, G>(
        master: Arc<Master>,
        slave: Arc<Slave>,
        name: &str,
        queue_size: usize,
        on_message: F,
        on_connect: G,
    ) -> Result<Self>
    where
        T: Message,
        F: Fn(T, &str) + Send + 'static,
        G: Fn(HashMap<String, String>) + Send + 'static,
    {
        let id = slave.add_subscription::<T, F, G>(name, queue_size, on_message, on_connect)?;

        let info = Arc::new(InteractorRaii::new(SubscriberInfo {
            master,
            slave,
            name: name.into(),
            id,
        }));

        let publishers = info
            .interactor
            .master
            .register_subscriber(name, &T::msg_type())?;

        if let Err(err) = info
            .interactor
            .slave
            .add_publishers_to_subscription(name, publishers.into_iter())
        {
            error!(
                "Failed to subscribe to all publishers of topic '{}': {}",
                name, err
            );
        }

        Ok(Self { info })
    }

    #[inline]
    pub fn publisher_count(&self) -> usize {
        self.info
            .interactor
            .slave
            .get_publisher_count_of_subscription(&self.info.interactor.name)
    }

    #[inline]
    pub fn publisher_uris(&self) -> Vec<String> {
        self.info
            .interactor
            .slave
            .get_publisher_uris_of_subscription(&self.info.interactor.name)
    }
}

struct SubscriberInfo {
    master: Arc<Master>,
    slave: Arc<Slave>,
    name: String,
    id: usize,
}

impl Interactor for SubscriberInfo {
    fn unregister(&mut self) -> Response<()> {
        self.slave.remove_subscription(&self.name, self.id);
        self.master.unregister_subscriber(&self.name).map(|_| ())
    }
}

#[derive(Clone)]
pub struct Service {
    _raii: Arc<InteractorRaii<ServiceInfo>>,
}

impl Service {
    pub(crate) fn new<T, F>(
        master: Arc<Master>,
        slave: Arc<Slave>,
        hostname: &str,
        bind_address: &str,
        name: &str,
        handler: F,
    ) -> Result<Self>
    where
        T: ServicePair,
        F: Fn(T::Request) -> ServiceResult<T::Response> + Send + Sync + 'static,
    {
        let api = slave.add_service::<T, F>(hostname, bind_address, name, handler)?;

        let raii = Arc::new(InteractorRaii::new(ServiceInfo {
            master,
            slave,
            api,
            name: name.into(),
        }));

        raii.interactor
            .master
            .register_service(name, &raii.interactor.api)?;
        Ok(Self { _raii: raii })
    }
}

struct ServiceInfo {
    master: Arc<Master>,
    slave: Arc<Slave>,
    name: String,
    api: String,
}

impl Interactor for ServiceInfo {
    fn unregister(&mut self) -> Response<()> {
        self.slave.remove_service(&self.name);
        self.master
            .unregister_service(&self.name, &self.api)
            .map(|_| ())
    }
}

trait Interactor {
    fn unregister(&mut self) -> Response<()>;
}

struct InteractorRaii<I: Interactor> {
    pub interactor: I,
}

impl<I: Interactor> InteractorRaii<I> {
    pub fn new(interactor: I) -> InteractorRaii<I> {
        Self { interactor }
    }
}

impl<I: Interactor> Drop for InteractorRaii<I> {
    fn drop(&mut self) {
        if let Err(e) = self.interactor.unregister() {
            error!("Error while unloading: {:?}", e);
        }
    }
}