1#[allow(unused_imports)]
2use log::{debug, error, info, warn};
3use rustdds::dds::{ReadResult, WriteResult};
4pub use action_msgs::{CancelGoalRequest, CancelGoalResponse, GoalId, GoalInfo, GoalStatusEnum};
5use builtin_interfaces::Time;
6use futures::{
7 stream::{FusedStream, StreamExt},
9 Future,
10};
11
12use crate::{
13 action_msgs, builtin_interfaces,
14 message::Message,
15 names::Name,
16 service::{request_id::RmwRequestId, AService, CallServiceError, Client},
17 unique_identifier_msgs, Subscription,
18};
19use super::{
20 ActionTypes, FeedbackMessage, GetResultRequest, GetResultResponse, SendGoalRequest,
21 SendGoalResponse,
22};
23
24pub struct ActionClient<A>
26where
27 A: ActionTypes,
28 A::GoalType: Message + Clone,
29 A::ResultType: Message + Clone,
30 A::FeedbackType: Message,
31{
32 pub(crate) my_goal_client: Client<AService<SendGoalRequest<A::GoalType>, SendGoalResponse>>,
33
34 pub(crate) my_cancel_client:
35 Client<AService<action_msgs::CancelGoalRequest, action_msgs::CancelGoalResponse>>,
36
37 pub(crate) my_result_client: Client<AService<GetResultRequest, GetResultResponse<A::ResultType>>>,
38
39 pub(crate) my_feedback_subscription: Subscription<FeedbackMessage<A::FeedbackType>>,
40
41 pub(crate) my_status_subscription: Subscription<action_msgs::GoalStatusArray>,
42
43 pub(crate) my_action_name: Name,
44}
45
46impl<A> ActionClient<A>
47where
48 A: ActionTypes,
49 A::GoalType: Message + Clone,
50 A::ResultType: Message + Clone,
51 A::FeedbackType: Message,
52{
53 pub fn name(&self) -> &Name {
54 &self.my_action_name
55 }
56
57 pub fn goal_client(
58 &mut self,
59 ) -> &mut Client<AService<SendGoalRequest<A::GoalType>, SendGoalResponse>> {
60 &mut self.my_goal_client
61 }
62 pub fn cancel_client(
63 &mut self,
64 ) -> &mut Client<AService<action_msgs::CancelGoalRequest, action_msgs::CancelGoalResponse>> {
65 &mut self.my_cancel_client
66 }
67 pub fn result_client(
68 &mut self,
69 ) -> &mut Client<AService<GetResultRequest, GetResultResponse<A::ResultType>>> {
70 &mut self.my_result_client
71 }
72 pub fn feedback_subscription(&mut self) -> &mut Subscription<FeedbackMessage<A::FeedbackType>> {
73 &mut self.my_feedback_subscription
74 }
75 pub fn status_subscription(&mut self) -> &mut Subscription<action_msgs::GoalStatusArray> {
76 &mut self.my_status_subscription
77 }
78
79 pub fn send_goal(&self, goal: A::GoalType) -> WriteResult<(RmwRequestId, GoalId), ()>
83 where
84 <A as ActionTypes>::GoalType: 'static,
85 {
86 let goal_id = unique_identifier_msgs::UUID::new_random();
87 self
88 .my_goal_client
89 .send_request(SendGoalRequest { goal_id, goal })
90 .map(|req_id| (req_id, goal_id))
91 }
92
93 pub fn receive_goal_response(&self, req_id: RmwRequestId) -> ReadResult<Option<SendGoalResponse>>
96 where
97 <A as ActionTypes>::GoalType: 'static,
98 {
99 loop {
100 match self.my_goal_client.receive_response() {
101 Err(e) => break Err(e),
102 Ok(None) => break Ok(None), Ok(Some((incoming_req_id, resp))) if incoming_req_id == req_id =>
104 {
106 break Ok(Some(resp))
107 }
108 Ok(Some((incoming_req_id, _resp))) => {
109 info!("Goal Response not for us: {incoming_req_id:?} != {req_id:?}");
111 continue;
112 }
113 }
114 }
115 }
119
120 pub async fn async_send_goal(
121 &self,
122 goal: A::GoalType,
123 ) -> Result<(GoalId, SendGoalResponse), CallServiceError<()>>
124 where
125 <A as ActionTypes>::GoalType: 'static,
126 {
127 let goal_id = unique_identifier_msgs::UUID::new_random();
128 let send_goal_response = self
129 .my_goal_client
130 .async_call_service(SendGoalRequest { goal_id, goal })
131 .await?;
132 Ok((goal_id, send_goal_response))
133 }
134
135 fn cancel_goal_raw(&self, goal_id: GoalId, timestamp: Time) -> WriteResult<RmwRequestId, ()> {
148 let goal_info = GoalInfo {
149 goal_id,
150 stamp: timestamp,
151 };
152 self
153 .my_cancel_client
154 .send_request(CancelGoalRequest { goal_info })
155 }
156
157 pub fn cancel_goal(&self, goal_id: GoalId) -> WriteResult<RmwRequestId, ()> {
158 self.cancel_goal_raw(goal_id, Time::ZERO)
159 }
160
161 pub fn cancel_all_goals_before(&self, timestamp: Time) -> WriteResult<RmwRequestId, ()> {
162 self.cancel_goal_raw(GoalId::ZERO, timestamp)
163 }
164
165 pub fn cancel_all_goals(&self) -> WriteResult<RmwRequestId, ()> {
166 self.cancel_goal_raw(GoalId::ZERO, Time::ZERO)
167 }
168
169 pub fn receive_cancel_response(
170 &self,
171 cancel_request_id: RmwRequestId,
172 ) -> ReadResult<Option<CancelGoalResponse>> {
173 loop {
174 match self.my_cancel_client.receive_response() {
175 Err(e) => break Err(e),
176 Ok(None) => break Ok(None), Ok(Some((incoming_req_id, resp))) if incoming_req_id == cancel_request_id => {
178 break Ok(Some(resp))
179 } Ok(Some(_)) => continue, }
182 }
183 }
184
185 pub fn async_cancel_goal(
186 &self,
187 goal_id: GoalId,
188 timestamp: Time,
189 ) -> impl Future<Output = Result<CancelGoalResponse, CallServiceError<()>>> + '_ {
190 let goal_info = GoalInfo {
191 goal_id,
192 stamp: timestamp,
193 };
194 self
195 .my_cancel_client
196 .async_call_service(CancelGoalRequest { goal_info })
197 }
198
199 pub fn request_result(&self, goal_id: GoalId) -> WriteResult<RmwRequestId, ()>
200 where
201 <A as ActionTypes>::ResultType: 'static,
202 {
203 self
204 .my_result_client
205 .send_request(GetResultRequest { goal_id })
206 }
207
208 pub fn receive_result(
209 &self,
210 result_request_id: RmwRequestId,
211 ) -> ReadResult<Option<(GoalStatusEnum, A::ResultType)>>
212 where
213 <A as ActionTypes>::ResultType: 'static,
214 {
215 loop {
216 match self.my_result_client.receive_response() {
217 Err(e) => break Err(e),
218 Ok(None) => break Ok(None), Ok(Some((incoming_req_id, GetResultResponse { status, result })))
220 if incoming_req_id == result_request_id =>
221 {
222 break Ok(Some((status, result)))
223 } Ok(Some(_)) => continue, }
226 }
227 }
228
229 pub async fn async_request_result(
234 &self,
235 goal_id: GoalId,
236 ) -> Result<(GoalStatusEnum, A::ResultType), CallServiceError<()>>
237 where
238 <A as ActionTypes>::ResultType: 'static,
239 {
240 let GetResultResponse { status, result } = self
241 .my_result_client
242 .async_call_service(GetResultRequest { goal_id })
243 .await?;
244 Ok((status, result))
245 }
246
247 pub fn receive_feedback(&self, goal_id: GoalId) -> ReadResult<Option<A::FeedbackType>>
248 where
249 <A as ActionTypes>::FeedbackType: 'static,
250 {
251 loop {
252 match self.my_feedback_subscription.take() {
253 Err(e) => break Err(e),
254 Ok(None) => break Ok(None),
255 Ok(Some((fb_msg, _msg_info))) if fb_msg.goal_id == goal_id => {
256 break Ok(Some(fb_msg.feedback))
257 }
258 Ok(Some((fb_msg, _msg_info))) => {
259 debug!(
261 "Feedback on another goal {:?} != {:?}",
262 fb_msg.goal_id, goal_id
263 )
264 }
265 }
266 }
267 }
268
269 pub fn feedback_stream(
271 &self,
272 goal_id: GoalId,
273 ) -> impl FusedStream<Item = ReadResult<A::FeedbackType>> + '_
274 where
275 <A as ActionTypes>::FeedbackType: 'static,
276 {
277 let expected_goal_id = goal_id; self
279 .my_feedback_subscription
280 .async_stream()
281 .filter_map(move |result| async move {
282 match result {
283 Err(e) => Some(Err(e)),
284 Ok((FeedbackMessage { goal_id, feedback }, _msg_info)) => {
285 if goal_id == expected_goal_id {
286 Some(Ok(feedback))
287 } else {
288 debug!("Feedback for some other {goal_id:?}.");
289 None
290 }
291 }
292 }
293 })
294 }
295
296 pub fn receive_status(&self) -> ReadResult<Option<action_msgs::GoalStatusArray>> {
299 self
300 .my_status_subscription
301 .take()
302 .map(|r| r.map(|(gsa, _msg_info)| gsa))
303 }
304
305 pub async fn async_receive_status(&self) -> ReadResult<action_msgs::GoalStatusArray> {
306 let (m, _msg_info) = self.my_status_subscription.async_take().await?;
307 Ok(m)
308 }
309
310 pub fn all_statuses_stream(
313 &self,
314 ) -> impl FusedStream<Item = ReadResult<action_msgs::GoalStatusArray>> + '_ {
315 self
316 .my_status_subscription
317 .async_stream()
318 .map(|result| result.map(|(gsa, _mi)| gsa))
319 }
320
321 pub fn status_stream(
322 &self,
323 goal_id: GoalId,
324 ) -> impl FusedStream<Item = ReadResult<action_msgs::GoalStatus>> + '_ {
325 self
326 .all_statuses_stream()
327 .filter_map(move |result| async move {
328 match result {
329 Err(e) => Some(Err(e)),
330 Ok(gsa) => gsa
331 .status_list
332 .into_iter()
333 .find(|gs| gs.goal_info.goal_id == goal_id)
334 .map(Ok),
335 }
336 })
337 }
338}