use crate::dataflow::{
operator::{OperatorConfig, Source},
operators::ros::*,
stream::{WriteStream, WriteStreamT},
Data, Message,
};
use serde::Deserialize;
use std::sync::{Arc, Mutex};
/// Source operator that bridges a ROS topic into an ERDOS stream.
///
/// `T` is the ROS message type received from the topic and `U` is the payload
/// type of the ERDOS messages produced by the conversion function.
#[derive(Clone)]
pub struct FromRosOperator<T, U>
where
    T: rosrust::Message,
    U: Data + for<'a> Deserialize<'a>,
{
    /// Name of the ROS topic handed to `rosrust::subscribe` when the operator runs.
    topic: String,
    /// Conversion applied to every received ROS message; each returned ERDOS
    /// message is sent on the operator's write stream. Stored behind an `Arc`
    /// so it can be shared with the subscriber callback.
    from_ros_msg: Arc<dyn Fn(&T) -> Vec<Message<U>> + Send + Sync>,
}
impl<T, U> FromRosOperator<T, U>
where
    T: rosrust::Message,
    U: Data + for<'a> Deserialize<'a>,
{
    /// Creates an operator that subscribes to `topic` and converts each
    /// incoming ROS message with `from_ros_msg`.
    ///
    /// * `topic` - name of the ROS topic; copied into an owned `String`.
    /// * `from_ros_msg` - maps a borrowed ROS message to zero or more ERDOS
    ///   messages. It must be `Send + Sync + 'static` because it is stored in
    ///   an `Arc` and later moved into the subscriber callback.
    pub fn new<F>(topic: &str, from_ros_msg: F) -> Self
    where
        F: Fn(&T) -> Vec<Message<U>> + Send + Sync + 'static,
    {
        let topic = String::from(topic);
        // Erase the concrete closure type so the struct does not need an
        // extra generic parameter.
        let from_ros_msg: Arc<dyn Fn(&T) -> Vec<Message<U>> + Send + Sync> =
            Arc::new(from_ros_msg);
        Self {
            topic,
            from_ros_msg,
        }
    }
}
impl<T: rosrust::Message, U> Source<U> for FromRosOperator<T, U>
where
U: Data + for<'a> Deserialize<'a>,
{
fn run(&mut self, config: &OperatorConfig, write_stream: &mut WriteStream<U>) {
let from_ros_msg = self.from_ros_msg.clone();
let config_clone = config.clone();
let write_stream_clone = Arc::new(Mutex::new(write_stream.clone()));
let _subscriber_raii =
rosrust::subscribe(self.topic.as_str(), ROS_QUEUE_SIZE, move |ros_msg: T| {
let erdos_msg_vec = (from_ros_msg)(&ros_msg);
for erdos_msg in erdos_msg_vec.into_iter() {
tracing::trace!(
"{}: Received and Converted {:?}",
config_clone.get_name(),
erdos_msg,
);
write_stream_clone.lock().unwrap().send(erdos_msg).unwrap();
}
})
.unwrap();
rosrust::spin();
}
}