rune-r2r 0.1.2

Rune bindings for ROS2.
//! Rune module for ROS2.
#![warn(missing_docs)]
#![deny(warnings)]

use ccutils::sync::ArcMutex;
use futures::{
    future::{BoxFuture, FutureExt},
    Stream,
};
use rune::{support::Result, Any, Mut};
use std::{
    pin::Pin,
    sync::OnceLock,
    thread::{self, sleep},
    time::Duration,
};

type ValueStream = Pin<Box<dyn Stream<Item = r2r::Result<serde_json::Value>> + Send>>;

static CONTEXT: OnceLock<r2r::Context> = OnceLock::new();

/// Get the ROS context
fn get_context() -> r2r::Context {
    CONTEXT
        .get_or_init(|| r2r::Context::create().unwrap())
        .clone()
}

/// Wrap ros QosProfile
#[derive(Any, Clone, PartialEq, Eq)]
#[rune(item = ::ros)]
struct QosProfile {
    qos_profile: r2r::QosProfile,
}

impl QosProfile {
    #[rune::function(path = Self::create_default)]
    pub fn create_default() -> Self {
        let qos_profile = r2r::QosProfile::default();
        Self { qos_profile }
    }
}

struct NodeInner {
    node: r2r::Node,
    spinning: bool,
}

struct ThreadHandle {
    inner: ArcMutex<NodeInner>,
    handle: thread::JoinHandle<()>,
}

impl Drop for ThreadHandle {
    fn drop(&mut self) {
        self.inner.lock().unwrap().spinning = false;
        while !self.handle.is_finished() {
            sleep(Duration::from_millis(50));
        }
    }
}

/// Interface to the rosnode
#[derive(Any, Clone)]
#[rune(item = ::ros)]
pub struct Node {
    inner: ArcMutex<NodeInner>,
    #[allow(dead_code)]
    handle: ArcMutex<ThreadHandle>,
}

/// Create a ROS node, with the given name in the given namespace.
pub fn create_node(name: &str, namespace: &str) -> Result<Node> {
    let inner: ArcMutex<NodeInner> = NodeInner {
        node: r2r::Node::create(get_context(), name, namespace)?,
        spinning: true,
    }
    .into();
    let handle = ThreadHandle {
        inner: inner.clone(),
        handle: {
            let inner = inner.clone();
            thread::spawn(move || {
                let duration = Duration::from_millis(200);
                while inner.lock().unwrap().spinning {
                    inner.lock().unwrap().node.spin_once(duration);
                    thread::sleep(duration);
                }
            })
        },
    }
    .into();
    Ok(Node { inner, handle })
}

impl Node {
    #[rune::function(path = Self::create)]
    /// Create a ROS Node with the given name and namespace
    pub fn create(name: &str, namespace: &str) -> Result<Self> {
        create_node(name, namespace)
    }
    #[rune::function]
    /// Create a publisher for the given topic and topic type
    pub(crate) fn create_publisher(
        &mut self,
        topic: &str,
        topic_type: &str,
        qos_profile: QosProfile,
    ) -> Result<Publisher> {
        let publisher = self.inner.lock().unwrap().node.create_publisher_untyped(
            topic,
            topic_type,
            qos_profile.qos_profile,
        )?;
        Ok(Publisher { publisher })
    }
    #[rune::function]
    /// Create a subscriber for the given topic and topic type
    pub(crate) fn subscribe(
        &mut self,
        topic: &str,
        topic_type: &str,
        qos_profile: QosProfile,
    ) -> Result<Subscriber> {
        let subscriber = Box::pin(self.inner.lock().unwrap().node.subscribe_untyped(
            topic,
            topic_type,
            qos_profile.qos_profile,
        )?);
        Ok(Subscriber {
            subscriber,
            topic: topic.to_string(),
        })
    }
    #[rune::function]
    /// Start the spinning thread
    pub fn spin_once(&mut self, _timeout: u64) {
        println!("`spin_once` is deprecated. The node is now spinning in its own thread.")
    }
    /// Create a ROS service client.
    #[rune::function]
    pub(crate) fn create_client(
        &mut self,
        service_name: &str,
        service_type: &str,
        qos_profile: QosProfile,
    ) -> Result<Client> {
        let client = self.inner.lock().unwrap().node.create_client_untyped(
            service_name,
            service_type,
            qos_profile.qos_profile,
        )?;
        Ok(Client { client })
    }
}

/// Interface to publisher
#[derive(Any)]
#[rune(item = ::ros)]
struct Publisher {
    publisher: r2r::PublisherUntyped,
}

impl Publisher {
    #[rune::function]
    /// Publish a message
    pub fn publish(&self, msg: rune::Value) -> Result<()> {
        Ok(self.publisher.publish(serde_json::to_value(msg)?)?)
    }
}

/// Interface to subscriber
#[derive(Any)]
#[rune(item = ::ros)]
struct Subscriber {
    subscriber: ValueStream,
    topic: String,
}

impl Subscriber {
    #[rune::function(keep, instance, path = Self::next)]
    /// Wait for the next message
    pub async fn next(mut this: Mut<Subscriber>) -> Option<Result<rune::Value>> {
        use futures::stream::StreamExt;

        this.subscriber
            .next()
            .await
            .map(|x| Ok(serde_json::from_value(x?)?))
    }
    #[rune::function]
    /// Return the name of the topic for this subscriber
    pub fn topic(&self) -> String {
        self.topic.clone()
    }
}

#[derive(Any)]
#[rune(item = ::ros)]
struct ClientRequest {
    request: BoxFuture<'static, r2r::Result<r2r::Result<serde_json::Value>>>,
}

impl ClientRequest {
    /// Wait for the next message
    #[rune::function(instance, protocol = INTO_FUTURE)]
    pub async fn into_future(self) -> Result<rune::Value> {
        Ok(serde_json::from_value(self.request.await??)?)
    }
}

/// Interface to client
#[derive(Any)]
#[rune(item = ::ros)]
struct Client {
    client: r2r::ClientUntyped,
}
impl Client {
    #[rune::function]
    /// Send a request to a service
    pub fn request(&self, request: rune::Value) -> Result<ClientRequest> {
        let request = self.client.request(serde_json::to_value(request)?)?.boxed();
        Ok(ClientRequest { request })
    }
    #[rune::function(keep, instance, path = Self::wait_for_service_availability)]
    /// Wait for service availability
    pub async fn wait_for_service_availability(this: Mut<Self>) -> Result<()> {
        let fut = r2r::Node::is_available(&this.client)?;
        Ok(fut.await?)
    }
}

/// Create the `ros` module
pub fn module() -> anyhow::Result<rune::Module> {
    let mut m = rune::Module::with_crate("ros")?;

    // QosProfile
    m.ty::<QosProfile>()?;
    m.function_meta(QosProfile::create_default)?;

    // Node
    m.ty::<Node>()?;
    m.function_meta(Node::create)?;
    m.function_meta(Node::create_client)?;
    m.function_meta(Node::create_publisher)?;
    m.function_meta(Node::subscribe)?;
    m.function_meta(Node::spin_once)?;

    // Publisher
    m.ty::<Publisher>()?;
    m.function_meta(Publisher::publish)?;

    // Subscriber
    m.ty::<Subscriber>()?;
    m.function_meta(Subscriber::next__meta)?;
    m.function_meta(Subscriber::topic)?;

    // Client
    m.ty::<Client>()?;
    m.function_meta(Client::request)?;
    m.function_meta(Client::wait_for_service_availability__meta)?;

    // ClientRequest
    m.ty::<ClientRequest>()?;
    m.function_meta(ClientRequest::into_future)?;

    Ok(m)
}

#[cfg(test)]
mod tests {
    use rune::{
        alloc::clone::TryClone,
        runtime::Args,
        support::Result,
        termcolor::{ColorChoice, StandardStream},
        Context, Diagnostics, FromValue, Hash, Options, Source, Sources, Vm,
    };
    use std::{process::Command, sync::Arc};

    struct Tester {
        rune_context: Context,
        sources: Vec<Source>,
    }
    impl Default for Tester {
        fn default() -> Tester {
            let mut rune_context = Context::with_default_modules().unwrap();
            rune_context.install(crate::module().unwrap()).unwrap();
            let sources = Default::default();
            Tester {
                rune_context,
                sources,
            }
        }
    }
    impl Tester {
        fn build(
            &self,
            script: Option<Source>,
        ) -> Result<(Arc<rune::runtime::RuntimeContext>, Arc<rune::runtime::Unit>)> {
            let mut options = Options::default();
            let mut sources = Sources::default();

            for source in self.sources.iter() {
                sources.insert(source.try_clone()?)?;
            }

            if let Some(script) = script {
                sources.insert(script)?;
                options.script(true);
            }

            let mut diagnostics = Diagnostics::new();

            let result = rune::prepare(&mut sources)
                .with_context(&self.rune_context)
                .with_diagnostics(&mut diagnostics)
                .with_options(&options)
                .build();

            if !diagnostics.is_empty() {
                let mut writer = StandardStream::stderr(ColorChoice::Always);
                diagnostics.emit(&mut writer, &sources)?;
            }

            let unit = result?;
            Ok((Arc::new(self.rune_context.runtime()?), Arc::new(unit)))
        }
        #[allow(dead_code)]
        fn call<T>(&self, name: impl AsRef<str>, args: impl Args) -> Result<T>
        where
            T: FromValue,
        {
            let (runtime, unit) = self.build(None)?;
            let mut vm = Vm::new(runtime, unit);
            let r =
                futures::executor::block_on(vm.execute([name.as_ref()], args)?.async_complete())
                    .into_result()?;
            Result::<T>::from_value(r)?
        }
        fn eval<T>(&self, source: impl AsRef<str>) -> Result<T>
        where
            T: FromValue,
        {
            let (runtime, unit) = self.build(Some(Source::memory(source)?))?;
            let mut vm = Vm::new(runtime, unit);
            let r = futures::executor::block_on(vm.execute(Hash::EMPTY, ())?.async_complete())
                .into_result()?;
            Result::<T>::from_value(r)?
        }
    }
    #[test]
    fn subcriber_publisher() {
        let tester = Tester::default();
        assert_eq!(tester.eval::<String>(r#"
            let node = ros::Node::create("rune_r2r_node", "rune_r2r_namespace")?;
            let sub = node.subscribe("message", "std_msgs/msg/String", ros::QosProfile::create_default())?;
            let publisher = node.create_publisher("message", "std_msgs/msg/String", ros::QosProfile::create_default())?;
            publisher.publish(#{ data: "hello"})?;
            let msg = sub.next().await??;
            Ok(msg.data)
        "#).unwrap(), "hello".to_string());
    }
    #[test]
    fn client() {
        let tester = Tester::default();
        let mut service = Command::new("ros2")
            .args(["run", "examples_rclcpp_minimal_service", "service_main"])
            .spawn()
            .unwrap();
        assert_eq!(tester.eval::<i64>(r#"
            let node = ros::Node::create("rune_r2r_node", "rune_r2r_namespace")?;
            let client = node.create_client("/add_two_ints", "example_interfaces/srv/AddTwoInts", ros::QosProfile::create_default())?;
            client.wait_for_service_availability().await?;
            let req = client.request(#{ a: 3, b: 7})?;
            let answer = req.await?;
            Ok(answer.sum)
        "#).unwrap(), 10);
        service.kill().unwrap();
    }
}