1use std::{
2 sync::Arc,
3 time::{Duration, SystemTime},
4};
5
6use flume::{Receiver, Sender};
7use parking_lot::Mutex;
8use rosrust::Time;
9type MessageBuffer<T> = Arc<Mutex<Option<T>>>;
10
11fn set_message_buffer<T>(buffer: &MessageBuffer<T>, message: T) {
12 buffer.lock().replace(message);
13}
14
15fn subscribe_with_message_buffer<T: rosrust::Message>(
16 topic: &str,
17 queue_size: usize,
18) -> (MessageBuffer<T>, rosrust::Subscriber) {
19 let buffer: MessageBuffer<T> = Arc::new(Mutex::new(None));
20 let buffer_for_callback = buffer.clone();
21 let subscriber = rosrust::subscribe(topic, queue_size, move |message: T| {
22 set_message_buffer(&buffer_for_callback, message);
23 })
24 .unwrap();
25 (buffer, subscriber)
26}
27
28pub struct SubscriberHandler<T> {
29 topic: String,
30 buffer: MessageBuffer<T>,
31 _subscriber: rosrust::Subscriber,
32}
33
34impl<T> SubscriberHandler<T>
35where
36 T: rosrust::Message,
37{
38 pub fn new(topic: &str, queue_size: usize) -> Self {
39 let (buffer, _subscriber) = subscribe_with_message_buffer::<T>(topic, queue_size);
40 Self {
41 topic: topic.to_string(),
42 buffer,
43 _subscriber,
44 }
45 }
46
47 pub fn take(&self) -> Result<Option<T>, arci::Error> {
48 Ok(self.buffer.lock().take())
49 }
50
51 pub fn get(&self) -> Result<Option<T>, arci::Error> {
52 Ok(self.buffer.lock().clone())
53 }
54
55 pub fn wait_message(&self, loop_millis: u64) {
56 while rosrust::is_ok() && self.get().unwrap().is_none() {
57 rosrust::ros_info!("Waiting {}", self.topic);
58 std::thread::sleep(Duration::from_millis(loop_millis));
59 }
60 }
61}
62
63pub fn wait_subscriber<T>(publisher: &rosrust::Publisher<T>)
64where
65 T: rosrust::Message,
66{
67 let rate = rosrust::rate(10.0);
68 while rosrust::is_ok() && publisher.subscriber_count() == 0 {
69 rate.sleep();
70 }
71 rate.sleep();
73}
74
75type ReceiverStampedBuffer<T> = Arc<Mutex<Receiver<(rosrust::Time, T)>>>;
76type SenderStampedBuffer<T> = Arc<Mutex<Sender<(rosrust::Time, T)>>>;
77
78pub struct ServiceHandler<T>
79where
80 T: rosrust::ServicePair,
81{
82 request_receiver: ReceiverStampedBuffer<T::Request>,
83 response_sender: SenderStampedBuffer<T::Response>,
84 _service_name: Arc<Mutex<String>>,
85 _server: rosrust::Service,
86}
87
88impl<T> ServiceHandler<T>
89where
90 T: rosrust::ServicePair,
91{
92 pub fn new(service_name: &str) -> Self {
93 let (request_sender, request_receiver) = flume::unbounded::<(rosrust::Time, T::Request)>();
94 let (response_sender, response_receiver) =
95 flume::unbounded::<(rosrust::Time, T::Response)>();
96
97 let request_sender = Arc::new(Mutex::new(request_sender));
98 let request_receiver = Arc::new(Mutex::new(request_receiver));
99 let response_sender = Arc::new(Mutex::new(response_sender));
100 let response_receiver = Arc::new(Mutex::new(response_receiver));
101
102 let _service_name = Arc::new(Mutex::new(service_name.to_string()));
103 let service_name_for_callback = _service_name.clone();
104
105 let _server = rosrust::service::<T, _>(&_service_name.lock(), move |req| {
106 let req_time = rosrust::now();
107 request_sender.lock().send((req_time, req)).unwrap();
108 let (res_time, response) = response_receiver.lock().recv().unwrap();
109 if res_time == req_time {
110 Ok(response)
111 } else {
112 Err(format!(
113 "Mismatch time stamp in ServiceHandler for {}",
114 service_name_for_callback.lock()
115 ))
116 }
117 })
118 .unwrap();
119 Self {
120 request_receiver,
121 response_sender,
122 _service_name,
123 _server,
124 }
125 }
126
127 pub fn get_request(&self, timeout_millis: u32) -> Option<(rosrust::Time, T::Request)> {
128 self.request_receiver
129 .lock()
130 .recv_timeout(Duration::from_millis(timeout_millis as u64))
131 .ok()
132 }
133
134 pub fn set_response(&self, time: rosrust::Time, res: T::Response) {
135 self.response_sender.lock().send((time, res)).unwrap();
136 }
137}
138
139pub struct ActionResultWait {
140 goal_id: String,
141 action_result_receiver: Receiver<Result<(), crate::Error>>,
142}
143
144impl ActionResultWait {
145 pub fn new(
146 goal_id: String,
147 action_result_receiver: Receiver<Result<(), crate::Error>>,
148 ) -> Self {
149 Self {
150 goal_id,
151 action_result_receiver,
152 }
153 }
154
155 pub fn wait(&mut self, timeout: Duration) -> Result<(), crate::Error> {
156 self.action_result_receiver
157 .recv_timeout(timeout)
158 .map_err(|_| crate::Error::ActionResultTimeout)?
159 }
160
161 pub fn goal_id(&self) -> &str {
162 &self.goal_id
163 }
164}
165
166#[macro_export]
167macro_rules! define_action_client {
168 ($name:ident, $namespace:path, $action_base:expr) => {
169 $crate::paste::item! {
170 pub struct $name {
171 goal_publisher: $crate::rosrust::Publisher<$namespace::[<$action_base ActionGoal>]>,
172 _result_subscriber: $crate::rosrust::Subscriber,
173 cancel_publisher: $crate::rosrust::Publisher<msg::actionlib_msgs::GoalID>,
174 goal_id_to_sender: std::sync::Arc<$crate::parking_lot::Mutex<std::collections::HashMap<String, $crate::flume::Sender<std::result::Result<(), $crate::Error>>>>>
175 }
176 impl $name {
177 pub fn new(base_topic: &str, queue_size: usize) -> Self {
178 use std::sync::Arc;
179 use std::collections::HashMap;
180 use $crate::parking_lot::Mutex;
181 use $crate::Error;
182 use $crate::{flume, rosrust};
183 let goal_topic = format!("{}/goal", base_topic);
184 let cancel_topic = format!("{}/cancel", base_topic);
185 let goal_publisher = rosrust::publish(&goal_topic, queue_size).unwrap();
186 let goal_id_to_sender: Arc<Mutex<HashMap<String, flume::Sender<std::result::Result<(), Error>> >>> = Arc::new(Mutex::new(HashMap::new()));
187 let goal_id_to_sender_cloned = goal_id_to_sender.clone();
188 let _result_subscriber = rosrust::subscribe(
189 &format!("{}/result", base_topic),
190 queue_size, move |result: $namespace::[<$action_base ActionResult>]| {
191 if let Some(sender) = goal_id_to_sender_cloned.lock().remove(&result.status.goal_id.id) {
192 let _ = sender.send(
193 match result.status.status {
195 msg::actionlib_msgs::GoalStatus::SUCCEEDED => {
196 Ok(())
197 },
198 msg::actionlib_msgs::GoalStatus::PREEMPTED => {
199 Err(Error::ActionResultPreempted(format!("{:?}", result)))
200 },
201 _ => {
202 Err(Error::ActionResultNotSuccess(format!("{:?}", result)))
203 }
204 });
205 }
206 }
207 ).unwrap();
208 let cancel_publisher =
209 rosrust::publish(&cancel_topic, queue_size).unwrap();
210 rosrust::ros_info!("Waiting {} ....", goal_topic);
211 $crate::wait_subscriber(&goal_publisher);
212 rosrust::ros_info!("Waiting {} Done", goal_topic);
213 rosrust::ros_info!("Waiting {} ....", cancel_topic);
214 $crate::wait_subscriber(&cancel_publisher);
215 rosrust::ros_info!("Waiting {} Done", cancel_topic);
216 Self {
217 goal_publisher,
218 _result_subscriber,
219 cancel_publisher,
220 goal_id_to_sender
221 }
222 }
223 #[allow(dead_code)]
224 pub fn send_goal_and_wait(&self, goal: $namespace::[<$action_base Goal>], timeout: std::time::Duration) -> Result<(), $crate::Error> {
225 self.send_goal(goal)?.wait(timeout)
226 }
227 #[allow(clippy::field_reassign_with_default)]
228 pub fn send_goal(&self, goal: $namespace::[<$action_base Goal>]) -> Result<$crate::ActionResultWait, $crate::Error> {
229 use $crate::{flume, rosrust};
230 let mut action_goal = <$namespace::[<$action_base ActionGoal>]>::default();
231 action_goal.goal = goal;
232 let goal_id = format!("{}-{}", rosrust::name(), rosrust::now().seconds());
233 action_goal.goal_id.id = goal_id.clone();
234 let (sender, receiver) = flume::unbounded();
235 self.goal_id_to_sender.lock().insert(goal_id.clone(), sender);
236 if self.goal_publisher.send(action_goal).is_err() {
237 let _ = self.goal_id_to_sender.lock().remove(&goal_id);
238 return Err($crate::Error::ActionGoalSendingFailure);
239 }
240 Ok($crate::ActionResultWait::new(goal_id, receiver))
241 }
242 #[allow(dead_code)]
243 pub fn cancel_goal(&self, goal_id: &str) -> Result<(), $crate::Error> {
244 let mut goal_id_to_sender = self.goal_id_to_sender.lock();
245 if goal_id.is_empty() {
246 goal_id_to_sender.clear();
247 } else {
248 let _ = goal_id_to_sender.remove(goal_id);
249 }
250 if self.cancel_publisher.send(
251 msg::actionlib_msgs::GoalID { id: goal_id.to_owned(), ..Default::default()}).is_err() {
252 return Err($crate::Error::ActionCancelSendingFailure);
253 }
254 Ok(())
255 }
256 #[allow(dead_code)]
257 pub fn cancel_all_goal(&self) -> Result<(), $crate::Error> {
258 self.cancel_goal("")
259 }
260 }
261 }
262 };
263}
264
265pub fn convert_system_time_to_ros_time(time: &SystemTime) -> Time {
266 let ros_now = rosrust::now();
267 let system_now = SystemTime::now();
268
269 if system_now < *time {
272 Time::from_nanos(
273 time.duration_since(system_now).unwrap().as_nanos() as i64 + ros_now.nanos(),
274 )
275 } else {
276 Time::from_nanos(
277 ros_now.nanos() - system_now.duration_since(*time).unwrap().as_nanos() as i64,
278 )
279 }
280}
281
282pub fn convert_ros_time_to_system_time(time: &Time) -> SystemTime {
283 let ros_now = rosrust::now();
284 let system_now = SystemTime::now();
285 let ros_time_nanos = time.nanos() as u64;
286 let ros_now_nanos = ros_now.nanos() as u64;
287 if ros_now_nanos < ros_time_nanos {
290 system_now
291 .checked_add(std::time::Duration::from_nanos(
292 ros_time_nanos - ros_now_nanos,
293 ))
294 .unwrap()
295 } else {
296 system_now
297 .checked_sub(std::time::Duration::from_nanos(
298 ros_now_nanos - ros_time_nanos,
299 ))
300 .unwrap()
301 }
302}
303
304pub fn subscribe_with_channel<T: rosrust::Message>(
315 topic_name: &str,
316 queue_size: usize,
317) -> (flume::Receiver<T>, rosrust::Subscriber) {
318 let (tx, rx) = flume::unbounded::<T>();
319
320 let sub = rosrust::subscribe(topic_name, queue_size, move |v: T| {
321 tx.send(v).unwrap();
322 })
323 .unwrap();
324
325 (rx, sub)
326}