#![warn(missing_docs)]
#![deny(warnings)]
use ccutils::sync::ArcMutex;
use futures::{
future::{BoxFuture, FutureExt},
Stream,
};
use rune::{support::Result, Any, Mut};
use std::{
pin::Pin,
sync::OnceLock,
thread::{self, sleep},
time::Duration,
};
type ValueStream = Pin<Box<dyn Stream<Item = r2r::Result<serde_json::Value>> + Send>>;
static CONTEXT: OnceLock<r2r::Context> = OnceLock::new();
fn get_context() -> r2r::Context {
CONTEXT
.get_or_init(|| r2r::Context::create().unwrap())
.clone()
}
#[derive(Any, Clone, PartialEq, Eq)]
#[rune(item = ::ros)]
struct QosProfile {
qos_profile: r2r::QosProfile,
}
impl QosProfile {
#[rune::function(path = Self::create_default)]
pub fn create_default() -> Self {
let qos_profile = r2r::QosProfile::default();
Self { qos_profile }
}
}
struct NodeInner {
node: r2r::Node,
spinning: bool,
}
struct ThreadHandle {
inner: ArcMutex<NodeInner>,
handle: thread::JoinHandle<()>,
}
impl Drop for ThreadHandle {
fn drop(&mut self) {
self.inner.lock().unwrap().spinning = false;
while !self.handle.is_finished() {
sleep(Duration::from_millis(50));
}
}
}
#[derive(Any, Clone)]
#[rune(item = ::ros)]
pub struct Node {
inner: ArcMutex<NodeInner>,
#[allow(dead_code)]
handle: ArcMutex<ThreadHandle>,
}
pub fn create_node(name: &str, namespace: &str) -> Result<Node> {
let inner: ArcMutex<NodeInner> = NodeInner {
node: r2r::Node::create(get_context(), name, namespace)?,
spinning: true,
}
.into();
let handle = ThreadHandle {
inner: inner.clone(),
handle: {
let inner = inner.clone();
thread::spawn(move || {
let duration = Duration::from_millis(200);
while inner.lock().unwrap().spinning {
inner.lock().unwrap().node.spin_once(duration);
thread::sleep(duration);
}
})
},
}
.into();
Ok(Node { inner, handle })
}
impl Node {
#[rune::function(path = Self::create)]
pub fn create(name: &str, namespace: &str) -> Result<Self> {
create_node(name, namespace)
}
#[rune::function]
pub(crate) fn create_publisher(
&mut self,
topic: &str,
topic_type: &str,
qos_profile: QosProfile,
) -> Result<Publisher> {
let publisher = self.inner.lock().unwrap().node.create_publisher_untyped(
topic,
topic_type,
qos_profile.qos_profile,
)?;
Ok(Publisher { publisher })
}
#[rune::function]
pub(crate) fn subscribe(
&mut self,
topic: &str,
topic_type: &str,
qos_profile: QosProfile,
) -> Result<Subscriber> {
let subscriber = Box::pin(self.inner.lock().unwrap().node.subscribe_untyped(
topic,
topic_type,
qos_profile.qos_profile,
)?);
Ok(Subscriber {
subscriber,
topic: topic.to_string(),
})
}
#[rune::function]
pub fn spin_once(&mut self, _timeout: u64) {
println!("`spin_once` is deprecated. The node is now spinning in its own thread.")
}
#[rune::function]
pub(crate) fn create_client(
&mut self,
service_name: &str,
service_type: &str,
qos_profile: QosProfile,
) -> Result<Client> {
let client = self.inner.lock().unwrap().node.create_client_untyped(
service_name,
service_type,
qos_profile.qos_profile,
)?;
Ok(Client { client })
}
}
#[derive(Any)]
#[rune(item = ::ros)]
struct Publisher {
publisher: r2r::PublisherUntyped,
}
impl Publisher {
#[rune::function]
pub fn publish(&self, msg: rune::Value) -> Result<()> {
Ok(self.publisher.publish(serde_json::to_value(msg)?)?)
}
}
#[derive(Any)]
#[rune(item = ::ros)]
struct Subscriber {
subscriber: ValueStream,
topic: String,
}
impl Subscriber {
#[rune::function(keep, instance, path = Self::next)]
pub async fn next(mut this: Mut<Subscriber>) -> Option<Result<rune::Value>> {
use futures::stream::StreamExt;
this.subscriber
.next()
.await
.map(|x| Ok(serde_json::from_value(x?)?))
}
#[rune::function]
pub fn topic(&self) -> String {
self.topic.clone()
}
}
#[derive(Any)]
#[rune(item = ::ros)]
struct ClientRequest {
request: BoxFuture<'static, r2r::Result<r2r::Result<serde_json::Value>>>,
}
impl ClientRequest {
#[rune::function(instance, protocol = INTO_FUTURE)]
pub async fn into_future(self) -> Result<rune::Value> {
Ok(serde_json::from_value(self.request.await??)?)
}
}
#[derive(Any)]
#[rune(item = ::ros)]
struct Client {
client: r2r::ClientUntyped,
}
impl Client {
#[rune::function]
pub fn request(&self, request: rune::Value) -> Result<ClientRequest> {
let request = self.client.request(serde_json::to_value(request)?)?.boxed();
Ok(ClientRequest { request })
}
#[rune::function(keep, instance, path = Self::wait_for_service_availability)]
pub async fn wait_for_service_availability(this: Mut<Self>) -> Result<()> {
let fut = r2r::Node::is_available(&this.client)?;
Ok(fut.await?)
}
}
pub fn module() -> anyhow::Result<rune::Module> {
let mut m = rune::Module::with_crate("ros")?;
m.ty::<QosProfile>()?;
m.function_meta(QosProfile::create_default)?;
m.ty::<Node>()?;
m.function_meta(Node::create)?;
m.function_meta(Node::create_client)?;
m.function_meta(Node::create_publisher)?;
m.function_meta(Node::subscribe)?;
m.function_meta(Node::spin_once)?;
m.ty::<Publisher>()?;
m.function_meta(Publisher::publish)?;
m.ty::<Subscriber>()?;
m.function_meta(Subscriber::next__meta)?;
m.function_meta(Subscriber::topic)?;
m.ty::<Client>()?;
m.function_meta(Client::request)?;
m.function_meta(Client::wait_for_service_availability__meta)?;
m.ty::<ClientRequest>()?;
m.function_meta(ClientRequest::into_future)?;
Ok(m)
}
#[cfg(test)]
mod tests {
use rune::{
alloc::clone::TryClone,
runtime::Args,
support::Result,
termcolor::{ColorChoice, StandardStream},
Context, Diagnostics, FromValue, Hash, Options, Source, Sources, Vm,
};
use std::{process::Command, sync::Arc};
struct Tester {
rune_context: Context,
sources: Vec<Source>,
}
impl Default for Tester {
fn default() -> Tester {
let mut rune_context = Context::with_default_modules().unwrap();
rune_context.install(crate::module().unwrap()).unwrap();
let sources = Default::default();
Tester {
rune_context,
sources,
}
}
}
impl Tester {
fn build(
&self,
script: Option<Source>,
) -> Result<(Arc<rune::runtime::RuntimeContext>, Arc<rune::runtime::Unit>)> {
let mut options = Options::default();
let mut sources = Sources::default();
for source in self.sources.iter() {
sources.insert(source.try_clone()?)?;
}
if let Some(script) = script {
sources.insert(script)?;
options.script(true);
}
let mut diagnostics = Diagnostics::new();
let result = rune::prepare(&mut sources)
.with_context(&self.rune_context)
.with_diagnostics(&mut diagnostics)
.with_options(&options)
.build();
if !diagnostics.is_empty() {
let mut writer = StandardStream::stderr(ColorChoice::Always);
diagnostics.emit(&mut writer, &sources)?;
}
let unit = result?;
Ok((Arc::new(self.rune_context.runtime()?), Arc::new(unit)))
}
#[allow(dead_code)]
fn call<T>(&self, name: impl AsRef<str>, args: impl Args) -> Result<T>
where
T: FromValue,
{
let (runtime, unit) = self.build(None)?;
let mut vm = Vm::new(runtime, unit);
let r =
futures::executor::block_on(vm.execute([name.as_ref()], args)?.async_complete())
.into_result()?;
Result::<T>::from_value(r)?
}
fn eval<T>(&self, source: impl AsRef<str>) -> Result<T>
where
T: FromValue,
{
let (runtime, unit) = self.build(Some(Source::memory(source)?))?;
let mut vm = Vm::new(runtime, unit);
let r = futures::executor::block_on(vm.execute(Hash::EMPTY, ())?.async_complete())
.into_result()?;
Result::<T>::from_value(r)?
}
}
#[test]
fn subcriber_publisher() {
let tester = Tester::default();
assert_eq!(tester.eval::<String>(r#"
let node = ros::Node::create("rune_r2r_node", "rune_r2r_namespace")?;
let sub = node.subscribe("message", "std_msgs/msg/String", ros::QosProfile::create_default())?;
let publisher = node.create_publisher("message", "std_msgs/msg/String", ros::QosProfile::create_default())?;
publisher.publish(#{ data: "hello"})?;
let msg = sub.next().await??;
Ok(msg.data)
"#).unwrap(), "hello".to_string());
}
#[test]
fn client() {
let tester = Tester::default();
let mut service = Command::new("ros2")
.args(["run", "examples_rclcpp_minimal_service", "service_main"])
.spawn()
.unwrap();
assert_eq!(tester.eval::<i64>(r#"
let node = ros::Node::create("rune_r2r_node", "rune_r2r_namespace")?;
let client = node.create_client("/add_two_ints", "example_interfaces/srv/AddTwoInts", ros::QosProfile::create_default())?;
client.wait_for_service_availability().await?;
let req = client.request(#{ a: 3, b: 7})?;
let answer = req.await?;
Ok(answer.sum)
"#).unwrap(), 10);
service.kill().unwrap();
}
}