arci_ros/
rosrust_utils.rs

1use std::{
2    sync::Arc,
3    time::{Duration, SystemTime},
4};
5
6use flume::{Receiver, Sender};
7use parking_lot::Mutex;
8use rosrust::Time;
9type MessageBuffer<T> = Arc<Mutex<Option<T>>>;
10
11fn set_message_buffer<T>(buffer: &MessageBuffer<T>, message: T) {
12    buffer.lock().replace(message);
13}
14
15fn subscribe_with_message_buffer<T: rosrust::Message>(
16    topic: &str,
17    queue_size: usize,
18) -> (MessageBuffer<T>, rosrust::Subscriber) {
19    let buffer: MessageBuffer<T> = Arc::new(Mutex::new(None));
20    let buffer_for_callback = buffer.clone();
21    let subscriber = rosrust::subscribe(topic, queue_size, move |message: T| {
22        set_message_buffer(&buffer_for_callback, message);
23    })
24    .unwrap();
25    (buffer, subscriber)
26}
27
28pub struct SubscriberHandler<T> {
29    topic: String,
30    buffer: MessageBuffer<T>,
31    _subscriber: rosrust::Subscriber,
32}
33
34impl<T> SubscriberHandler<T>
35where
36    T: rosrust::Message,
37{
38    pub fn new(topic: &str, queue_size: usize) -> Self {
39        let (buffer, _subscriber) = subscribe_with_message_buffer::<T>(topic, queue_size);
40        Self {
41            topic: topic.to_string(),
42            buffer,
43            _subscriber,
44        }
45    }
46
47    pub fn take(&self) -> Result<Option<T>, arci::Error> {
48        Ok(self.buffer.lock().take())
49    }
50
51    pub fn get(&self) -> Result<Option<T>, arci::Error> {
52        Ok(self.buffer.lock().clone())
53    }
54
55    pub fn wait_message(&self, loop_millis: u64) {
56        while rosrust::is_ok() && self.get().unwrap().is_none() {
57            rosrust::ros_info!("Waiting {}", self.topic);
58            std::thread::sleep(Duration::from_millis(loop_millis));
59        }
60    }
61}
62
63pub fn wait_subscriber<T>(publisher: &rosrust::Publisher<T>)
64where
65    T: rosrust::Message,
66{
67    let rate = rosrust::rate(10.0);
68    while rosrust::is_ok() && publisher.subscriber_count() == 0 {
69        rate.sleep();
70    }
71    // one more to avoid `rostopic echo`
72    rate.sleep();
73}
74
75type ReceiverStampedBuffer<T> = Arc<Mutex<Receiver<(rosrust::Time, T)>>>;
76type SenderStampedBuffer<T> = Arc<Mutex<Sender<(rosrust::Time, T)>>>;
77
78pub struct ServiceHandler<T>
79where
80    T: rosrust::ServicePair,
81{
82    request_receiver: ReceiverStampedBuffer<T::Request>,
83    response_sender: SenderStampedBuffer<T::Response>,
84    _service_name: Arc<Mutex<String>>,
85    _server: rosrust::Service,
86}
87
88impl<T> ServiceHandler<T>
89where
90    T: rosrust::ServicePair,
91{
92    pub fn new(service_name: &str) -> Self {
93        let (request_sender, request_receiver) = flume::unbounded::<(rosrust::Time, T::Request)>();
94        let (response_sender, response_receiver) =
95            flume::unbounded::<(rosrust::Time, T::Response)>();
96
97        let request_sender = Arc::new(Mutex::new(request_sender));
98        let request_receiver = Arc::new(Mutex::new(request_receiver));
99        let response_sender = Arc::new(Mutex::new(response_sender));
100        let response_receiver = Arc::new(Mutex::new(response_receiver));
101
102        let _service_name = Arc::new(Mutex::new(service_name.to_string()));
103        let service_name_for_callback = _service_name.clone();
104
105        let _server = rosrust::service::<T, _>(&_service_name.lock(), move |req| {
106            let req_time = rosrust::now();
107            request_sender.lock().send((req_time, req)).unwrap();
108            let (res_time, response) = response_receiver.lock().recv().unwrap();
109            if res_time == req_time {
110                Ok(response)
111            } else {
112                Err(format!(
113                    "Mismatch time stamp in ServiceHandler for {}",
114                    service_name_for_callback.lock()
115                ))
116            }
117        })
118        .unwrap();
119        Self {
120            request_receiver,
121            response_sender,
122            _service_name,
123            _server,
124        }
125    }
126
127    pub fn get_request(&self, timeout_millis: u32) -> Option<(rosrust::Time, T::Request)> {
128        self.request_receiver
129            .lock()
130            .recv_timeout(Duration::from_millis(timeout_millis as u64))
131            .ok()
132    }
133
134    pub fn set_response(&self, time: rosrust::Time, res: T::Response) {
135        self.response_sender.lock().send((time, res)).unwrap();
136    }
137}
138
139pub struct ActionResultWait {
140    goal_id: String,
141    action_result_receiver: Receiver<Result<(), crate::Error>>,
142}
143
144impl ActionResultWait {
145    pub fn new(
146        goal_id: String,
147        action_result_receiver: Receiver<Result<(), crate::Error>>,
148    ) -> Self {
149        Self {
150            goal_id,
151            action_result_receiver,
152        }
153    }
154
155    pub fn wait(&mut self, timeout: Duration) -> Result<(), crate::Error> {
156        self.action_result_receiver
157            .recv_timeout(timeout)
158            .map_err(|_| crate::Error::ActionResultTimeout)?
159    }
160
161    pub fn goal_id(&self) -> &str {
162        &self.goal_id
163    }
164}
165
166#[macro_export]
167macro_rules! define_action_client {
168    ($name:ident, $namespace:path, $action_base:expr) => {
169        $crate::paste::item! {
170            pub struct $name {
171                goal_publisher: $crate::rosrust::Publisher<$namespace::[<$action_base ActionGoal>]>,
172                _result_subscriber: $crate::rosrust::Subscriber,
173                cancel_publisher: $crate::rosrust::Publisher<msg::actionlib_msgs::GoalID>,
174                goal_id_to_sender: std::sync::Arc<$crate::parking_lot::Mutex<std::collections::HashMap<String, $crate::flume::Sender<std::result::Result<(), $crate::Error>>>>>
175            }
176            impl $name {
177                pub fn new(base_topic: &str, queue_size: usize) -> Self {
178                    use std::sync::Arc;
179                    use std::collections::HashMap;
180                    use $crate::parking_lot::Mutex;
181                    use $crate::Error;
182                    use $crate::{flume, rosrust};
183                    let goal_topic = format!("{}/goal", base_topic);
184                    let cancel_topic = format!("{}/cancel", base_topic);
185                    let goal_publisher = rosrust::publish(&goal_topic, queue_size).unwrap();
186                    let goal_id_to_sender: Arc<Mutex<HashMap<String, flume::Sender<std::result::Result<(), Error>> >>> = Arc::new(Mutex::new(HashMap::new()));
187                    let goal_id_to_sender_cloned = goal_id_to_sender.clone();
188                    let _result_subscriber = rosrust::subscribe(
189                        &format!("{}/result", base_topic),
190                        queue_size, move |result: $namespace::[<$action_base ActionResult>]| {
191                            if let Some(sender) = goal_id_to_sender_cloned.lock().remove(&result.status.goal_id.id) {
192                                let _ = sender.send(
193                                // TODO more detailed error / status handling
194                                match result.status.status {
195                                    msg::actionlib_msgs::GoalStatus::SUCCEEDED => {
196                                        Ok(())
197                                    },
198                                    msg::actionlib_msgs::GoalStatus::PREEMPTED => {
199                                        Err(Error::ActionResultPreempted(format!("{:?}", result)))
200                                    },
201                                    _ => {
202                                        Err(Error::ActionResultNotSuccess(format!("{:?}", result)))
203                                    }
204                                });
205                            }
206                        }
207                    ).unwrap();
208                    let cancel_publisher =
209                        rosrust::publish(&cancel_topic, queue_size).unwrap();
210                    rosrust::ros_info!("Waiting {} ....", goal_topic);
211                    $crate::wait_subscriber(&goal_publisher);
212                    rosrust::ros_info!("Waiting {} Done", goal_topic);
213                    rosrust::ros_info!("Waiting {} ....", cancel_topic);
214                    $crate::wait_subscriber(&cancel_publisher);
215                    rosrust::ros_info!("Waiting {} Done", cancel_topic);
216                    Self {
217                        goal_publisher,
218                        _result_subscriber,
219                        cancel_publisher,
220                        goal_id_to_sender
221                     }
222                }
223                #[allow(dead_code)]
224                pub fn send_goal_and_wait(&self, goal: $namespace::[<$action_base Goal>], timeout: std::time::Duration) -> Result<(), $crate::Error> {
225                    self.send_goal(goal)?.wait(timeout)
226                }
227                #[allow(clippy::field_reassign_with_default)]
228                pub fn send_goal(&self, goal: $namespace::[<$action_base Goal>]) -> Result<$crate::ActionResultWait, $crate::Error> {
229                    use $crate::{flume, rosrust};
230                    let mut action_goal = <$namespace::[<$action_base ActionGoal>]>::default();
231                    action_goal.goal = goal;
232                    let goal_id = format!("{}-{}", rosrust::name(), rosrust::now().seconds());
233                    action_goal.goal_id.id = goal_id.clone();
234                    let (sender, receiver) = flume::unbounded();
235                    self.goal_id_to_sender.lock().insert(goal_id.clone(), sender);
236                    if self.goal_publisher.send(action_goal).is_err() {
237                        let _ = self.goal_id_to_sender.lock().remove(&goal_id);
238                        return Err($crate::Error::ActionGoalSendingFailure);
239                    }
240                    Ok($crate::ActionResultWait::new(goal_id, receiver))
241                }
242                #[allow(dead_code)]
243                pub fn cancel_goal(&self, goal_id: &str) -> Result<(), $crate::Error> {
244                    let mut goal_id_to_sender = self.goal_id_to_sender.lock();
245                    if goal_id.is_empty() {
246                        goal_id_to_sender.clear();
247                    } else {
248                        let _ = goal_id_to_sender.remove(goal_id);
249                    }
250                    if self.cancel_publisher.send(
251                    msg::actionlib_msgs::GoalID { id: goal_id.to_owned(), ..Default::default()}).is_err() {
252                        return Err($crate::Error::ActionCancelSendingFailure);
253                    }
254                    Ok(())
255                }
256                #[allow(dead_code)]
257                pub fn cancel_all_goal(&self) -> Result<(), $crate::Error> {
258                    self.cancel_goal("")
259                }
260            }
261        }
262    };
263}
264
265pub fn convert_system_time_to_ros_time(time: &SystemTime) -> Time {
266    let ros_now = rosrust::now();
267    let system_now = SystemTime::now();
268
269    // compare time to avoid SystemTimeError
270    // https://doc.rust-lang.org/std/time/struct.SystemTime.html#method.duration_since
271    if system_now < *time {
272        Time::from_nanos(
273            time.duration_since(system_now).unwrap().as_nanos() as i64 + ros_now.nanos(),
274        )
275    } else {
276        Time::from_nanos(
277            ros_now.nanos() - system_now.duration_since(*time).unwrap().as_nanos() as i64,
278        )
279    }
280}
281
282pub fn convert_ros_time_to_system_time(time: &Time) -> SystemTime {
283    let ros_now = rosrust::now();
284    let system_now = SystemTime::now();
285    let ros_time_nanos = time.nanos() as u64;
286    let ros_now_nanos = ros_now.nanos() as u64;
287    // from_nanos needs u64 as input
288    // https://doc.rust-lang.org/stable/std/time/struct.Duration.html#method.from_nanos
289    if ros_now_nanos < ros_time_nanos {
290        system_now
291            .checked_add(std::time::Duration::from_nanos(
292                ros_time_nanos - ros_now_nanos,
293            ))
294            .unwrap()
295    } else {
296        system_now
297            .checked_sub(std::time::Duration::from_nanos(
298                ros_now_nanos - ros_time_nanos,
299            ))
300            .unwrap()
301    }
302}
303
304/// # subscribe ROS message helper
305///
306/// using for inspect specific massage type.
307/// Message is displayed on screen and sent to `mpsc receiver`
308///
309/// # Panic!
310///
311/// If subscriber can't be construct, this function is panic.
312/// Or if ``Roscore`` is not up, could be panic.
313///
314pub fn subscribe_with_channel<T: rosrust::Message>(
315    topic_name: &str,
316    queue_size: usize,
317) -> (flume::Receiver<T>, rosrust::Subscriber) {
318    let (tx, rx) = flume::unbounded::<T>();
319
320    let sub = rosrust::subscribe(topic_name, queue_size, move |v: T| {
321        tx.send(v).unwrap();
322    })
323    .unwrap();
324
325    (rx, sub)
326}