ros2_client/action/
client.rs

1#[allow(unused_imports)]
2use log::{debug, error, info, warn};
3use rustdds::dds::{ReadResult, WriteResult};
4pub use action_msgs::{CancelGoalRequest, CancelGoalResponse, GoalId, GoalInfo, GoalStatusEnum};
5use builtin_interfaces::Time;
6use futures::{
7  //pin_mut,
8  stream::{FusedStream, StreamExt},
9  Future,
10};
11
12use crate::{
13  action_msgs, builtin_interfaces,
14  message::Message,
15  names::Name,
16  service::{request_id::RmwRequestId, AService, CallServiceError, Client},
17  unique_identifier_msgs, Subscription,
18};
19use super::{
20  ActionTypes, FeedbackMessage, GetResultRequest, GetResultResponse, SendGoalRequest,
21  SendGoalResponse,
22};
23
24/// A client for ROS 2 Actions. Supports both sync and async operation.
25pub struct ActionClient<A>
26where
27  A: ActionTypes,
28  A::GoalType: Message + Clone,
29  A::ResultType: Message + Clone,
30  A::FeedbackType: Message,
31{
32  pub(crate) my_goal_client: Client<AService<SendGoalRequest<A::GoalType>, SendGoalResponse>>,
33
34  pub(crate) my_cancel_client:
35    Client<AService<action_msgs::CancelGoalRequest, action_msgs::CancelGoalResponse>>,
36
37  pub(crate) my_result_client: Client<AService<GetResultRequest, GetResultResponse<A::ResultType>>>,
38
39  pub(crate) my_feedback_subscription: Subscription<FeedbackMessage<A::FeedbackType>>,
40
41  pub(crate) my_status_subscription: Subscription<action_msgs::GoalStatusArray>,
42
43  pub(crate) my_action_name: Name,
44}
45
46impl<A> ActionClient<A>
47where
48  A: ActionTypes,
49  A::GoalType: Message + Clone,
50  A::ResultType: Message + Clone,
51  A::FeedbackType: Message,
52{
53  pub fn name(&self) -> &Name {
54    &self.my_action_name
55  }
56
57  pub fn goal_client(
58    &mut self,
59  ) -> &mut Client<AService<SendGoalRequest<A::GoalType>, SendGoalResponse>> {
60    &mut self.my_goal_client
61  }
62  pub fn cancel_client(
63    &mut self,
64  ) -> &mut Client<AService<action_msgs::CancelGoalRequest, action_msgs::CancelGoalResponse>> {
65    &mut self.my_cancel_client
66  }
67  pub fn result_client(
68    &mut self,
69  ) -> &mut Client<AService<GetResultRequest, GetResultResponse<A::ResultType>>> {
70    &mut self.my_result_client
71  }
72  pub fn feedback_subscription(&mut self) -> &mut Subscription<FeedbackMessage<A::FeedbackType>> {
73    &mut self.my_feedback_subscription
74  }
75  pub fn status_subscription(&mut self) -> &mut Subscription<action_msgs::GoalStatusArray> {
76    &mut self.my_status_subscription
77  }
78
79  /// Returns and id of the Request and id for the Goal.
80  /// Request id can be used to recognize correct response from Action Server.
81  /// Goal id is later used to communicate Goal status and result.
82  pub fn send_goal(&self, goal: A::GoalType) -> WriteResult<(RmwRequestId, GoalId), ()>
83  where
84    <A as ActionTypes>::GoalType: 'static,
85  {
86    let goal_id = unique_identifier_msgs::UUID::new_random();
87    self
88      .my_goal_client
89      .send_request(SendGoalRequest { goal_id, goal })
90      .map(|req_id| (req_id, goal_id))
91  }
92
93  /// Receive a response for the specified goal request, or None if response is
94  /// not yet available
95  pub fn receive_goal_response(&self, req_id: RmwRequestId) -> ReadResult<Option<SendGoalResponse>>
96  where
97    <A as ActionTypes>::GoalType: 'static,
98  {
99    loop {
100      match self.my_goal_client.receive_response() {
101        Err(e) => break Err(e),
102        Ok(None) => break Ok(None), // not yet
103        Ok(Some((incoming_req_id, resp))) if incoming_req_id == req_id =>
104        // received the expected answer
105        {
106          break Ok(Some(resp))
107        }
108        Ok(Some((incoming_req_id, _resp))) => {
109          // got someone else's answer. Try again.
110          info!("Goal Response not for us: {incoming_req_id:?} != {req_id:?}");
111          continue;
112        }
113      }
114    }
115    // We loop here to drain all the answers received so far.
116    // The mio .poll() only does not trigger again for the next item, if it has
117    // been received already.
118  }
119
120  pub async fn async_send_goal(
121    &self,
122    goal: A::GoalType,
123  ) -> Result<(GoalId, SendGoalResponse), CallServiceError<()>>
124  where
125    <A as ActionTypes>::GoalType: 'static,
126  {
127    let goal_id = unique_identifier_msgs::UUID::new_random();
128    let send_goal_response = self
129      .my_goal_client
130      .async_call_service(SendGoalRequest { goal_id, goal })
131      .await?;
132    Ok((goal_id, send_goal_response))
133  }
134
135  // From ROS2 docs:
136  // https://docs.ros2.org/foxy/api/action_msgs/srv/CancelGoal.html
137  //
138  // Cancel one or more goals with the following policy:
139  // - If the goal ID is zero and timestamp is zero, cancel all goals.
140  // - If the goal ID is zero and timestamp is not zero, cancel all goals accepted
141  //   at or before the timestamp.
142  // - If the goal ID is not zero and timestamp is zero, cancel the goal with the
143  //   given ID regardless of the time it was accepted.
144  // - If the goal ID is not zero and timestamp is not zero, cancel the goal with
145  //   the given ID and all goals accepted at or before the timestamp.
146
147  fn cancel_goal_raw(&self, goal_id: GoalId, timestamp: Time) -> WriteResult<RmwRequestId, ()> {
148    let goal_info = GoalInfo {
149      goal_id,
150      stamp: timestamp,
151    };
152    self
153      .my_cancel_client
154      .send_request(CancelGoalRequest { goal_info })
155  }
156
157  pub fn cancel_goal(&self, goal_id: GoalId) -> WriteResult<RmwRequestId, ()> {
158    self.cancel_goal_raw(goal_id, Time::ZERO)
159  }
160
161  pub fn cancel_all_goals_before(&self, timestamp: Time) -> WriteResult<RmwRequestId, ()> {
162    self.cancel_goal_raw(GoalId::ZERO, timestamp)
163  }
164
165  pub fn cancel_all_goals(&self) -> WriteResult<RmwRequestId, ()> {
166    self.cancel_goal_raw(GoalId::ZERO, Time::ZERO)
167  }
168
169  pub fn receive_cancel_response(
170    &self,
171    cancel_request_id: RmwRequestId,
172  ) -> ReadResult<Option<CancelGoalResponse>> {
173    loop {
174      match self.my_cancel_client.receive_response() {
175        Err(e) => break Err(e),
176        Ok(None) => break Ok(None), // not yet
177        Ok(Some((incoming_req_id, resp))) if incoming_req_id == cancel_request_id => {
178          break Ok(Some(resp))
179        } // received expected answer
180        Ok(Some(_)) => continue,    // got someone else's answer. Try again.
181      }
182    }
183  }
184
185  pub fn async_cancel_goal(
186    &self,
187    goal_id: GoalId,
188    timestamp: Time,
189  ) -> impl Future<Output = Result<CancelGoalResponse, CallServiceError<()>>> + '_ {
190    let goal_info = GoalInfo {
191      goal_id,
192      stamp: timestamp,
193    };
194    self
195      .my_cancel_client
196      .async_call_service(CancelGoalRequest { goal_info })
197  }
198
199  pub fn request_result(&self, goal_id: GoalId) -> WriteResult<RmwRequestId, ()>
200  where
201    <A as ActionTypes>::ResultType: 'static,
202  {
203    self
204      .my_result_client
205      .send_request(GetResultRequest { goal_id })
206  }
207
208  pub fn receive_result(
209    &self,
210    result_request_id: RmwRequestId,
211  ) -> ReadResult<Option<(GoalStatusEnum, A::ResultType)>>
212  where
213    <A as ActionTypes>::ResultType: 'static,
214  {
215    loop {
216      match self.my_result_client.receive_response() {
217        Err(e) => break Err(e),
218        Ok(None) => break Ok(None), // not yet
219        Ok(Some((incoming_req_id, GetResultResponse { status, result })))
220          if incoming_req_id == result_request_id =>
221        {
222          break Ok(Some((status, result)))
223        } // received expected answer
224        Ok(Some(_)) => continue,    // got someone else's answer. Try again.
225      }
226    }
227  }
228
229  /// Asynchronously request goal result.
230  /// Result should be requested as soon as a goal is accepted.
231  /// Result ia actually received only when Server informs that the goal has
232  /// either Succeeded, or has been Canceled or Aborted.
233  pub async fn async_request_result(
234    &self,
235    goal_id: GoalId,
236  ) -> Result<(GoalStatusEnum, A::ResultType), CallServiceError<()>>
237  where
238    <A as ActionTypes>::ResultType: 'static,
239  {
240    let GetResultResponse { status, result } = self
241      .my_result_client
242      .async_call_service(GetResultRequest { goal_id })
243      .await?;
244    Ok((status, result))
245  }
246
247  pub fn receive_feedback(&self, goal_id: GoalId) -> ReadResult<Option<A::FeedbackType>>
248  where
249    <A as ActionTypes>::FeedbackType: 'static,
250  {
251    loop {
252      match self.my_feedback_subscription.take() {
253        Err(e) => break Err(e),
254        Ok(None) => break Ok(None),
255        Ok(Some((fb_msg, _msg_info))) if fb_msg.goal_id == goal_id => {
256          break Ok(Some(fb_msg.feedback))
257        }
258        Ok(Some((fb_msg, _msg_info))) => {
259          // feedback on some other goal
260          debug!(
261            "Feedback on another goal {:?} != {:?}",
262            fb_msg.goal_id, goal_id
263          )
264        }
265      }
266    }
267  }
268
269  /// Receive asynchronous feedback stream of goal progress.
270  pub fn feedback_stream(
271    &self,
272    goal_id: GoalId,
273  ) -> impl FusedStream<Item = ReadResult<A::FeedbackType>> + '_
274  where
275    <A as ActionTypes>::FeedbackType: 'static,
276  {
277    let expected_goal_id = goal_id; // rename
278    self
279      .my_feedback_subscription
280      .async_stream()
281      .filter_map(move |result| async move {
282        match result {
283          Err(e) => Some(Err(e)),
284          Ok((FeedbackMessage { goal_id, feedback }, _msg_info)) => {
285            if goal_id == expected_goal_id {
286              Some(Ok(feedback))
287            } else {
288              debug!("Feedback for some other {goal_id:?}.");
289              None
290            }
291          }
292        }
293      })
294  }
295
296  /// Note: This does not take GoalId and will therefore report status of all
297  /// Goals.
298  pub fn receive_status(&self) -> ReadResult<Option<action_msgs::GoalStatusArray>> {
299    self
300      .my_status_subscription
301      .take()
302      .map(|r| r.map(|(gsa, _msg_info)| gsa))
303  }
304
305  pub async fn async_receive_status(&self) -> ReadResult<action_msgs::GoalStatusArray> {
306    let (m, _msg_info) = self.my_status_subscription.async_take().await?;
307    Ok(m)
308  }
309
310  /// Async Stream of status updates
311  /// Action server send updates containing status of all goals, hence an array.
312  pub fn all_statuses_stream(
313    &self,
314  ) -> impl FusedStream<Item = ReadResult<action_msgs::GoalStatusArray>> + '_ {
315    self
316      .my_status_subscription
317      .async_stream()
318      .map(|result| result.map(|(gsa, _mi)| gsa))
319  }
320
321  pub fn status_stream(
322    &self,
323    goal_id: GoalId,
324  ) -> impl FusedStream<Item = ReadResult<action_msgs::GoalStatus>> + '_ {
325    self
326      .all_statuses_stream()
327      .filter_map(move |result| async move {
328        match result {
329          Err(e) => Some(Err(e)),
330          Ok(gsa) => gsa
331            .status_list
332            .into_iter()
333            .find(|gs| gs.goal_info.goal_id == goal_id)
334            .map(Ok),
335        }
336      })
337  }
338} // impl ActionClient