ros2_client/action/
server.rs

1use std::{
2  collections::{btree_map::Entry, BTreeMap},
3  marker::PhantomData,
4  sync::Mutex,
5};
6
7#[allow(unused_imports)]
8use log::{debug, error, info, warn};
9use futures::{pin_mut, stream::StreamExt};
10use rustdds::dds::{ReadError, ReadResult, WriteError, WriteResult};
11pub use action_msgs::{CancelGoalRequest, GoalId, GoalInfo, GoalStatusEnum};
12
13use crate::{
14  action_msgs, builtin_interfaces,
15  message::Message,
16  names::Name,
17  service::{request_id::RmwRequestId, AService, Server},
18  Publisher,
19};
20use super::{
21  ActionTypes, FeedbackMessage, GetResultRequest, GetResultResponse, SendGoalRequest,
22  SendGoalResponse,
23};
24
/// ROS 2 Action Server - Synchronous version. Please consider using
/// `AsyncActionServer` instead.
///
/// Groups the three service servers (goal, cancel, result) and the two
/// publishers (feedback, status) used to serve one action, together with
/// the name of that action.
pub struct ActionServer<A>
where
  A: ActionTypes,
  A::GoalType: Message + Clone,
  A::ResultType: Message + Clone,
  A::FeedbackType: Message,
{
  /// Service server for `SendGoalRequest` / `SendGoalResponse`.
  pub(crate) my_goal_server: Server<AService<SendGoalRequest<A::GoalType>, SendGoalResponse>>,

  /// Service server for `CancelGoalRequest` / `CancelGoalResponse`.
  pub(crate) my_cancel_server:
    Server<AService<action_msgs::CancelGoalRequest, action_msgs::CancelGoalResponse>>,

  /// Service server for `GetResultRequest` / `GetResultResponse`.
  pub(crate) my_result_server: Server<AService<GetResultRequest, GetResultResponse<A::ResultType>>>,

  /// Publisher for per-goal `FeedbackMessage`s.
  pub(crate) my_feedback_publisher: Publisher<FeedbackMessage<A::FeedbackType>>,

  /// Publisher for the `GoalStatusArray` describing the known goals.
  pub(crate) my_status_publisher: Publisher<action_msgs::GoalStatusArray>,

  /// Name of the action served.
  pub(crate) my_action_name: Name,
}
47
/// Synchronous API: accessors for the underlying servers/publishers and thin
/// wrappers that forward to them. No goal bookkeeping is done at this level.
impl<A> ActionServer<A>
where
  A: ActionTypes,
  A::GoalType: Message + Clone,
  A::ResultType: Message + Clone,
  A::FeedbackType: Message,
{
  /// Returns the stored action name.
  pub fn name(&self) -> &Name {
    &self.my_action_name
  }

  /// Mutable access to the goal service server.
  pub fn goal_server(
    &mut self,
  ) -> &mut Server<AService<SendGoalRequest<A::GoalType>, SendGoalResponse>> {
    &mut self.my_goal_server
  }
  /// Mutable access to the cancel service server.
  pub fn cancel_server(
    &mut self,
  ) -> &mut Server<AService<action_msgs::CancelGoalRequest, action_msgs::CancelGoalResponse>> {
    &mut self.my_cancel_server
  }
  /// Mutable access to the result service server.
  pub fn result_server(
    &mut self,
  ) -> &mut Server<AService<GetResultRequest, GetResultResponse<A::ResultType>>> {
    &mut self.my_result_server
  }
  /// Mutable access to the feedback publisher.
  pub fn feedback_publisher(&mut self) -> &mut Publisher<FeedbackMessage<A::FeedbackType>> {
    &mut self.my_feedback_publisher
  }
  /// Mutable access to the goal status publisher.
  // NOTE(review): the `my_` prefix is inconsistent with the sibling accessors;
  // kept because the name is part of the public API.
  pub fn my_status_publisher(&mut self) -> &mut Publisher<action_msgs::GoalStatusArray> {
    &mut self.my_status_publisher
  }

  /// Receive a new goal, if available.
  ///
  /// Forwards to `receive_request()` of the goal server and returns its
  /// result unchanged: the request id paired with the goal request.
  pub fn receive_goal(&self) -> ReadResult<Option<(RmwRequestId, SendGoalRequest<A::GoalType>)>>
  where
    <A as ActionTypes>::GoalType: 'static,
  {
    self.my_goal_server.receive_request()
  }

  /// Send a response for the specified goal request
  ///
  /// `req_id` should be the id that was returned together with the goal
  /// request by `receive_goal`.
  pub fn send_goal_response(
    &self,
    req_id: RmwRequestId,
    resp: SendGoalResponse,
  ) -> WriteResult<(), ()>
  where
    <A as ActionTypes>::GoalType: 'static,
  {
    self.my_goal_server.send_response(req_id, resp)
  }

  /// Receive a cancel request, if available.
  pub fn receive_cancel_request(
    &self,
  ) -> ReadResult<Option<(RmwRequestId, action_msgs::CancelGoalRequest)>> {
    self.my_cancel_server.receive_request()
  }

  /// Respond to a received cancel request
  pub fn send_cancel_response(
    &self,
    req_id: RmwRequestId,
    resp: action_msgs::CancelGoalResponse,
  ) -> WriteResult<(), ()> {
    self.my_cancel_server.send_response(req_id, resp)
  }

  /// Receive a request for a goal's result, if available.
  pub fn receive_result_request(&self) -> ReadResult<Option<(RmwRequestId, GetResultRequest)>>
  where
    <A as ActionTypes>::ResultType: 'static,
  {
    self.my_result_server.receive_request()
  }

  /// Send the result response for a previously received result request.
  pub fn send_result(
    &self,
    result_request_id: RmwRequestId,
    resp: GetResultResponse<A::ResultType>,
  ) -> WriteResult<(), ()>
  where
    <A as ActionTypes>::ResultType: 'static,
  {
    self.my_result_server.send_response(result_request_id, resp)
  }

  /// Publish `feedback` for the goal identified by `goal_id`.
  ///
  /// The two arguments are wrapped into a `FeedbackMessage` and handed to
  /// the feedback publisher.
  pub fn send_feedback(
    &self,
    goal_id: GoalId,
    feedback: A::FeedbackType,
  ) -> WriteResult<(), FeedbackMessage<A::FeedbackType>> {
    self
      .my_feedback_publisher
      .publish(FeedbackMessage { goal_id, feedback })
  }

  /// Send the status of all known goals.
  ///
  /// The caller supplies the complete status array; it is published as-is.
  pub fn send_goal_statuses(
    &self,
    goal_statuses: action_msgs::GoalStatusArray,
  ) -> WriteResult<(), action_msgs::GoalStatusArray> {
    self.my_status_publisher.publish(goal_statuses)
  }
} // impl ActionServer
153
// Internal type to keep track of goals known to `AsyncActionServer`:
// one record per goal id.
#[derive(Debug, Clone)]
struct AsyncGoal<A>
where
  A: ActionTypes,
{
  // Current state of the goal. A freshly received goal starts as `Unknown`.
  status: GoalStatusEnum,
  // `None` until the goal is accepted; then the acceptance timestamp.
  accepted_time: Option<builtin_interfaces::Time>,
  // Goal payload as received in the goal request.
  goal: A::GoalType,
}
164
// Common core of the public goal handles: the goal id, tagged with the goal
// type `G` so handles of different actions are distinct types.
#[derive(Clone, Copy)]
struct InnerGoalHandle<G> {
  goal_id: GoalId,
  // Zero-sized marker; no `G` value is stored.
  phantom: PhantomData<G>,
}
170
/// `NewGoalHandle` is received from an action client. It can be either accepted
/// or rejected.
///
/// `NewGoalHandle`, `AcceptedGoalHandle`, and `ExecutingGoalHandle` are
/// different types because they support different operations.
///
/// `AcceptedGoalHandle` is the result of action server accepting a goal. The
/// action server can either start executing, or abort the goal, if it is no
/// longer possible.
///
/// `ExecutingGoalHandle` is the result of starting execution on a goal. It
/// supports:
/// * `publish_feedback` - Notify action client that goal makes progress
/// * `send_result_response` - Send the final result of the goal to the client
///   (even if not successful)
/// * `abort_executing_goal` - Abort the goal, if it is no longer executable.
#[derive(Clone, Copy)]
pub struct NewGoalHandle<G> {
  inner: InnerGoalHandle<G>,
  // Id of the goal request; needed to address the accept/reject response.
  req_id: RmwRequestId,
}

impl<G> NewGoalHandle<G> {
  /// Id of the goal this handle refers to.
  pub fn goal_id(&self) -> GoalId {
    self.inner.goal_id
  }
}
198
/// Handle to a goal that has been accepted but is not executing yet.
///
/// See [`NewGoalHandle`]
#[derive(Clone, Copy)]
pub struct AcceptedGoalHandle<G> {
  inner: InnerGoalHandle<G>,
}

impl<G> AcceptedGoalHandle<G> {
  /// Id of the goal this handle refers to.
  pub fn goal_id(&self) -> GoalId {
    self.inner.goal_id
  }
}
210
/// Handle to a goal whose execution has been started.
///
/// See [`NewGoalHandle`]
#[derive(Clone, Copy)]
pub struct ExecutingGoalHandle<G> {
  inner: InnerGoalHandle<G>,
}

impl<G> ExecutingGoalHandle<G> {
  /// Id of the goal this handle refers to.
  pub fn goal_id(&self) -> GoalId {
    self.inner.goal_id
  }
}
222
223/// Handle used by action server to receive a goal cancel request from client
224/// and respond to it.
225pub struct CancelHandle {
226  req_id: RmwRequestId,
227  goals: Vec<GoalId>,
228}
229
230impl CancelHandle {
231  pub fn goals(&self) -> impl Iterator<Item = GoalId> + '_ {
232    self.goals.iter().cloned()
233  }
234  pub fn contains_goal(&self, goal_id: &GoalId) -> bool {
235    self.goals.contains(goal_id)
236  }
237}
238
/// What was the cause of action ending
///
/// Passed to `send_result_response`, which maps each variant to the
/// `GoalStatusEnum` variant of the same name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GoalEndStatus {
  Succeeded,
  Aborted,
  Canceled,
}
246
/// Errors that may occur in goal operations
///
/// `T` is the payload type of the wrapped `WriteError`.
#[derive(Debug)]
pub enum GoalError<T> {
  /// The goal id of the handle is not (or no longer) tracked by the server.
  NoSuchGoal,
  /// The goal exists, but its current status does not allow the operation.
  WrongGoalState,
  /// Receiving from the underlying service server failed.
  DDSReadError(ReadError),
  /// Sending or publishing on the underlying server/publisher failed.
  DDSWriteError(WriteError<T>),
}

// Conversions that let `?` turn read/write errors into `GoalError`.
impl<T> From<ReadError> for GoalError<T> {
  fn from(e: ReadError) -> Self {
    GoalError::DDSReadError(e)
  }
}
impl<T> From<WriteError<T>> for GoalError<T> {
  fn from(e: WriteError<T>) -> Self {
    GoalError::DDSWriteError(e)
  }
}
266
/// ROS 2 Action Server - Asynchronous version.
///
/// Wraps an [`ActionServer`] and adds bookkeeping of goal states, so that
/// goal status changes are published automatically.
pub struct AsyncActionServer<A>
where
  A: ActionTypes,
  A::GoalType: Message + Clone,
  A::ResultType: Message + Clone,
  A::FeedbackType: Message,
{
  actionserver: ActionServer<A>,
  // goals and result_requests are protected by _synchronous_ Mutex (not async)
  // All goals currently tracked, keyed by goal id.
  goals: Mutex<BTreeMap<GoalId, AsyncGoal<A>>>,
  // Goals that reached an end state and are waiting to be removed from `goals`.
  finished_goals: Mutex<Vec<GoalId>>,
  // Result requests that arrived before the corresponding result was ready.
  result_requests: Mutex<BTreeMap<GoalId, RmwRequestId>>,
}
281
// Initial capacity reserved for the list of finished goals.
const FINISHED_GOAL_BUFFER_SIZE: usize = 2;
283
284impl<A> AsyncActionServer<A>
285where
286  A: ActionTypes,
287  A::GoalType: Message + Clone,
288  A::ResultType: Message + Clone,
289  A::FeedbackType: Message,
290{
291  pub fn new(actionserver: ActionServer<A>) -> Self {
292    AsyncActionServer::<A> {
293      actionserver,
294      goals: Mutex::new(BTreeMap::new()),
295      finished_goals: Mutex::new(Vec::with_capacity(FINISHED_GOAL_BUFFER_SIZE)),
296      result_requests: Mutex::new(BTreeMap::new()),
297    }
298  }
299
  // Queue `goal_id` for removal from `goals`. The actual removal is done
  // later by `flush_finisehd_goals`.
  fn mark_goal_finished(&self, goal_id: GoalId) {
    self.finished_goals.lock().unwrap().push(goal_id);
  }
303
304  fn flush_finisehd_goals(&self) {
305    let mut goals_guard = self.goals.lock().unwrap();
306    for g in self.finished_goals.lock().unwrap().drain(0..) {
307      goals_guard
308        .remove(&g)
309        .inspect(|goal| {
310          let looks_done = goal.status == GoalStatusEnum::Succeeded
311            || goal.status == GoalStatusEnum::Aborted
312            || goal.status == GoalStatusEnum::Canceled;
313          debug_assert!(
314            looks_done,
315            "Flushing goal with wrong status: {:?}",
316            goal.status
317          )
318        })
319        .or_else(|| {
320          error!("We seem to have lost goal {g:?}");
321          None
322        });
323    }
324  }
325
326  pub fn get_new_goal(&self, handle: NewGoalHandle<A::GoalType>) -> Option<A::GoalType> {
327    self.flush_finisehd_goals();
328    self
329      .goals
330      .lock()
331      .unwrap()
332      .get(&handle.inner.goal_id)
333      .map(|ag| ag.goal.clone())
334  }
335
336  /// Receive a new goal from an action client.
337  /// Server should immediately either accept or reject the goal.
338  pub async fn receive_new_goal(&self) -> ReadResult<NewGoalHandle<A::GoalType>>
339  where
340    <A as ActionTypes>::GoalType: 'static,
341  {
342    let (req_id, goal_id) = loop {
343      let (req_id, goal_request) = self
344        .actionserver
345        .my_goal_server
346        .async_receive_request()
347        .await?;
348      match self.goals.lock().unwrap().entry(goal_request.goal_id) {
349        e @ Entry::Vacant(_) => {
350          e.or_insert(AsyncGoal {
351            status: GoalStatusEnum::Unknown,
352            goal: goal_request.goal,
353            accepted_time: None,
354          });
355          break (req_id, goal_request.goal_id);
356        }
357        Entry::Occupied(_) => {
358          error!(
359            "Received duplicate goal_id {:?} , req_id={:?}",
360            goal_request.goal_id, req_id
361          );
362          continue; // just discard this request
363        }
364      }
365    };
366    let inner = InnerGoalHandle {
367      goal_id,
368      phantom: PhantomData,
369    };
370    Ok(NewGoalHandle { inner, req_id })
371  }
372
373  /// Convert a newly received goal into a accepted goal, i.e. accept it
374  /// for execution later. Client will be notified of acceptance.
375  /// Note: Once the goal is accepted, the server must eventually call
376  /// `.send_result_response()` even if the goal is canceled or aborted.
377  pub async fn accept_goal(
378    &self,
379    handle: NewGoalHandle<A::GoalType>,
380  ) -> Result<AcceptedGoalHandle<A::GoalType>, GoalError<()>>
381  where
382    A::GoalType: 'static,
383  {
384    let now = builtin_interfaces::Time::now();
385    let result = match self.goals.lock().unwrap().entry(handle.inner.goal_id) {
386      Entry::Vacant(_) => Err(GoalError::NoSuchGoal),
387      Entry::Occupied(o) => match o.get() {
388        AsyncGoal {
389          status: GoalStatusEnum::Unknown,
390          ..
391        } => {
392          let mut_o = o.into_mut();
393          mut_o.status = GoalStatusEnum::Accepted;
394          mut_o.accepted_time = Some(now);
395          Ok(AcceptedGoalHandle {
396            inner: handle.inner,
397          })
398        }
399        AsyncGoal {
400          status: wrong_status,
401          ..
402        } => {
403          error!(
404            "Tried to accept goal {:?} but status was {:?}, expected Unknown.",
405            handle.inner.goal_id, wrong_status
406          );
407          Err(GoalError::WrongGoalState)
408        }
409      },
410    };
411
412    // We do not do any async operations inside the match above,
413    // because we are holding lock to self there.
414    if result.is_ok() {
415      self.publish_statuses().await;
416      self.actionserver.my_goal_server.send_response(
417        handle.req_id,
418        SendGoalResponse {
419          accepted: true,
420          stamp: now,
421        },
422      )?;
423    }
424    result
425  }
426
427  /// Reject a received goal. Client will be notified of rejection.
428  /// Server must not process the goal further.
429  pub async fn reject_goal(&self, handle: NewGoalHandle<A::GoalType>) -> Result<(), GoalError<()>>
430  where
431    A::GoalType: 'static,
432  {
433    let result = match self.goals.lock().unwrap().entry(handle.inner.goal_id) {
434      Entry::Vacant(_) => Err(GoalError::NoSuchGoal),
435      Entry::Occupied(o) => {
436        match o.get() {
437          AsyncGoal {
438            status: GoalStatusEnum::Unknown,
439            ..
440          } => {
441            //o.into_mut().0 = GoalStatusEnum::Rejected; -- there is no such state
442            //self.publish_statuses().await; -- this is not reported as status
443            // Instead, we just forget the goal.
444            o.remove();
445            info!("Action server rejected goal {:?}", handle.goal_id());
446            Ok(())
447          }
448          AsyncGoal {
449            status: wrong_status,
450            ..
451          } => {
452            error!(
453              "Tried to reject goal {:?} but status was {:?}, expected Unknown.",
454              handle.inner.goal_id, wrong_status
455            );
456            Err(GoalError::WrongGoalState)
457          }
458        }
459      }
460    };
461
462    if result.is_ok() {
463      self.actionserver.my_goal_server.send_response(
464        handle.req_id,
465        SendGoalResponse {
466          accepted: false,
467          stamp: builtin_interfaces::Time::now(),
468        },
469      )?;
470    }
471
472    result
473  }
474
475  /// Convert an accepted goal into a expecting goal, i.e. start the execution.
476  /// Executing goal can publish feedback.
477  pub async fn start_executing_goal(
478    &self,
479    handle: AcceptedGoalHandle<A::GoalType>,
480  ) -> Result<ExecutingGoalHandle<A::GoalType>, GoalError<()>> {
481    let result = match self.goals.lock().unwrap().entry(handle.inner.goal_id) {
482      Entry::Vacant(_) => Err(GoalError::NoSuchGoal),
483      Entry::Occupied(o) => match o.get() {
484        AsyncGoal {
485          status: GoalStatusEnum::Accepted,
486          ..
487        } => {
488          o.into_mut().status = GoalStatusEnum::Executing;
489          Ok(ExecutingGoalHandle {
490            inner: handle.inner,
491          })
492        }
493        AsyncGoal {
494          status: wrong_status,
495          ..
496        } => {
497          error!(
498            "Tried to execute goal {:?} but status was {:?}, expected Accepted.",
499            handle.inner.goal_id, wrong_status
500          );
501          Err(GoalError::WrongGoalState)
502        }
503      },
504    };
505
506    if result.is_ok() {
507      self.publish_statuses().await;
508    }
509    result
510  }
511
512  /// Publish feedback on how the execution is proceeding.
513  pub async fn publish_feedback(
514    &self,
515    handle: ExecutingGoalHandle<A::GoalType>,
516    feedback: A::FeedbackType,
517  ) -> Result<(), GoalError<FeedbackMessage<A::FeedbackType>>> {
518    match self.goals.lock().unwrap().entry(handle.inner.goal_id) {
519      Entry::Vacant(_) => Err(GoalError::NoSuchGoal),
520      Entry::Occupied(o) => match o.get() {
521        AsyncGoal {
522          status: GoalStatusEnum::Executing,
523          ..
524        } => {
525          self
526            .actionserver
527            .send_feedback(handle.inner.goal_id, feedback)?;
528          Ok(())
529        }
530        AsyncGoal {
531          status: wrong_status,
532          ..
533        } => {
534          error!(
535            "Tried publish feedback on goal {:?} but status was {:?}, expected Executing.",
536            handle.inner.goal_id, wrong_status
537          );
538          Err(GoalError::WrongGoalState)
539        }
540      },
541    }
542  }
543
544  /// Notify Client that a goal end state was reached and
545  /// what was the result of the action.
546  /// This async will not resolve until the action client has requested for the
547  /// result, but the client should request the result as soon as server
548  /// accepts the goal.
549  // TODO: It is a bit silly that we have to supply a "result" even though
550  // goal got canceled. But we have to send something in the ResultResponse.
551  // And where does it say that result is not significant if cancelled or aborted?
552  pub async fn send_result_response(
553    &self,
554    handle: ExecutingGoalHandle<A::GoalType>,
555    result_status: GoalEndStatus,
556    result: A::ResultType,
557  ) -> Result<(), GoalError<()>>
558  where
559    A::ResultType: 'static,
560  {
561    // We translate from interface type to internal type to ensure that
562    // the end status is an end status and not e.g. "Accepted".
563    let result_status = match result_status {
564      GoalEndStatus::Succeeded => GoalStatusEnum::Succeeded,
565      GoalEndStatus::Aborted => GoalStatusEnum::Aborted,
566      GoalEndStatus::Canceled => GoalStatusEnum::Canceled,
567    };
568
569    // First, we must get a result request.
570    // It may already have been read or not.
571    // We will read these into a buffer, because there may be requests for
572    // other goals' results also.
573    let req_id_opt = self
574      .result_requests
575      .lock()
576      .unwrap()
577      .get(&handle.inner.goal_id)
578      .cloned();
579    // Binding `req_id_opt` above is used to drop the lock immediately.
580    let req_id = match req_id_opt {
581      Some(req_id) => req_id,
582      None => {
583        let res_reqs = self.actionserver.my_result_server.receive_request_stream();
584        pin_mut!(res_reqs);
585        loop {
586          // result request was not yet here. Keep receiving until we get it.
587          let (req_id, GetResultRequest { goal_id }) = res_reqs.select_next_some().await?;
588          if goal_id == handle.inner.goal_id {
589            break req_id;
590          } else {
591            self.result_requests.lock().unwrap().insert(goal_id, req_id);
592            debug!("Got result request for goal_id={goal_id:?} req_id={req_id:?}");
593            // and loop to wait for the next
594          }
595        }
596      }
597    };
598    let ret_value = match self.goals.lock().unwrap().entry(handle.inner.goal_id) {
599      Entry::Vacant(_) => Err(GoalError::NoSuchGoal),
600      Entry::Occupied(o) => {
601        match o.get() {
602          // Accepted, executing, or canceling goal can be canceled or aborted
603          // TODO: Accepted goal cannot succeed, it must be executing before success.
604          AsyncGoal {
605            status: GoalStatusEnum::Accepted,
606            ..
607          }
608          | AsyncGoal {
609            status: GoalStatusEnum::Executing,
610            ..
611          }
612          | AsyncGoal {
613            status: GoalStatusEnum::Canceling,
614            ..
615          } => {
616            o.into_mut().status = result_status;
617            self.actionserver.send_result(
618              req_id,
619              GetResultResponse {
620                status: result_status,
621                result,
622              },
623            )?;
624            debug!(
625              "Send result for goal_id={:?}  req_id={:?}",
626              handle.inner.goal_id, req_id
627            );
628            Ok(())
629          }
630          AsyncGoal {
631            status: wrong_status,
632            ..
633          } => {
634            error!(
635              "Tried to finish goal {:?} but status was {:?}.",
636              handle.inner.goal_id, wrong_status
637            );
638            Err(GoalError::WrongGoalState)
639          }
640        }
641      }
642    };
643    if ret_value.is_ok() {
644      self.publish_statuses().await;
645      self.mark_goal_finished(handle.goal_id());
646    }
647    ret_value
648  }
649
  /// Abort goal execution, because action server has determined it
  /// cannot continue execution.
  ///
  /// Consumes the handle and delegates to the private `abort_goal`, which
  /// marks the goal `Aborted` and publishes the new status.
  pub async fn abort_executing_goal(
    &self,
    handle: ExecutingGoalHandle<A::GoalType>,
  ) -> Result<(), GoalError<()>> {
    self.abort_goal(handle.inner).await
  }
  /// Abort a goal that has been accepted, but whose execution has not been
  /// started.
  ///
  /// Consumes the handle and delegates to the private `abort_goal`, which
  /// marks the goal `Aborted` and publishes the new status.
  pub async fn abort_accepted_goal(
    &self,
    handle: AcceptedGoalHandle<A::GoalType>,
  ) -> Result<(), GoalError<()>> {
    self.abort_goal(handle.inner).await
  }
664
665  async fn abort_goal(&self, handle: InnerGoalHandle<A::GoalType>) -> Result<(), GoalError<()>> {
666    let abort_result = match self.goals.lock().unwrap().entry(handle.goal_id) {
667      Entry::Vacant(_) => Err(GoalError::NoSuchGoal),
668      Entry::Occupied(o) => match o.get() {
669        AsyncGoal {
670          status: GoalStatusEnum::Accepted,
671          ..
672        }
673        | AsyncGoal {
674          status: GoalStatusEnum::Executing,
675          ..
676        } => {
677          o.into_mut().status = GoalStatusEnum::Aborted;
678          Ok(())
679        }
680        AsyncGoal {
681          status: wrong_status,
682          ..
683        } => {
684          error!(
685            "Tried to abort goal {:?} but status was {:?}, expected Accepted or Executing. ",
686            handle.goal_id, wrong_status
687          );
688          Err(GoalError::WrongGoalState)
689        }
690      },
691    };
692
693    if abort_result.is_ok() {
694      self.publish_statuses().await;
695      self.mark_goal_finished(handle.goal_id);
696    }
697
698    abort_result
699  }
700
701  /// Receive a set of cancel requests from the action client.
702  /// The server should now respond either by accepting (some of) the
703  /// cancel requests or rejecting all of them. The GoalIds that are requested
704  /// to be cancelled can be currently at either accepted or executing state.
705  pub async fn receive_cancel_request(&self) -> ReadResult<CancelHandle> {
706    let (req_id, CancelGoalRequest { goal_info }) = self
707      .actionserver
708      .my_cancel_server
709      .async_receive_request()
710      .await?;
711
712    #[allow(clippy::type_complexity)] // How would you refactor this type?
713    let goal_filter: Box<dyn FnMut(&(&GoalId, &AsyncGoal<A>)) -> bool> = match goal_info {
714      GoalInfo {
715        goal_id: GoalId::ZERO,
716        stamp: builtin_interfaces::Time::ZERO,
717      } => Box::new(|(_, _)| true), // cancel all goals
718
719      GoalInfo {
720        goal_id: GoalId::ZERO,
721        stamp,
722      } => Box::new(move |(_, ag)| ag.accepted_time.map(|at| at < stamp).unwrap_or(false)),
723
724      GoalInfo {
725        goal_id,
726        stamp: builtin_interfaces::Time::ZERO,
727      } => Box::new(move |(g_id, _)| goal_id == **g_id),
728
729      GoalInfo { goal_id, stamp } => Box::new(move |(g_id, ag)| {
730        goal_id == **g_id || ag.accepted_time.map(move |at| at < stamp).unwrap_or(false)
731      }),
732    };
733
734    // TODO:
735    // Should check if the specified GoalId was unknown to us
736    // or already terminated.
737    // In those case outright send a negative response and not return to the
738    // application.
739    let cancel_handle = CancelHandle {
740      req_id,
741      goals: self
742        .goals
743        .lock()
744        .unwrap()
745        .iter()
746        // only consider goals with status Executing or Accepted for Cancel
747        .filter(|(_, async_goal)| {
748          async_goal.status == GoalStatusEnum::Executing
749            || async_goal.status == GoalStatusEnum::Accepted
750        })
751        // and then filter those that were specified by the cancel request
752        .filter(goal_filter)
753        .map(|p| *p.0)
754        .collect(),
755    };
756
757    Ok(cancel_handle)
758  }
759
760  /// Respond to action client's cancel requests.
761  /// The iterator of goals should list those GoalIds that will start canceling.
762  /// For the other GoalIds, the cancel is not accepted and they do not change
763  /// their state.
764  pub async fn respond_to_cancel_requests(
765    &self,
766    cancel_handle: &CancelHandle,
767    goals_to_cancel: impl Iterator<Item = GoalId>,
768  ) -> WriteResult<(), ()> {
769    let canceling_goals: Vec<GoalInfo> =
770      goals_to_cancel
771        .filter_map(|goal_id| {
772          self.goals.lock().unwrap().get(&goal_id).and_then(
773            |AsyncGoal { accepted_time, .. }| {
774              accepted_time.map(|stamp| GoalInfo { goal_id, stamp })
775            },
776          )
777        })
778        .collect();
779
780    for goal_info in &canceling_goals {
781      self
782        .goals
783        .lock()
784        .unwrap()
785        .entry(goal_info.goal_id)
786        .and_modify(|gg| gg.status = GoalStatusEnum::Canceling);
787    }
788    self.publish_statuses().await;
789
790    let response = action_msgs::CancelGoalResponse {
791      return_code: if canceling_goals.is_empty() {
792        action_msgs::CancelGoalResponseEnum::Rejected
793      } else {
794        action_msgs::CancelGoalResponseEnum::None // i.e. no error
795      },
796      goals_canceling: canceling_goals,
797    };
798
799    self
800      .actionserver
801      .my_cancel_server
802      .async_send_response(cancel_handle.req_id, response)
803      .await
804  }
805
806  // This function is private, because all status publishing happens automatically
807  // via goal status changes.
808  async fn publish_statuses(&self) {
809    let goal_status_array = action_msgs::GoalStatusArray {
810      status_list: self
811        .goals
812        .lock()
813        .unwrap()
814        .iter()
815        .map(
816          |(
817            goal_id,
818            AsyncGoal {
819              status,
820              accepted_time,
821              ..
822            },
823          )| action_msgs::GoalStatus {
824            status: *status,
825            goal_info: GoalInfo {
826              goal_id: *goal_id,
827              stamp: accepted_time.unwrap_or(builtin_interfaces::Time::ZERO),
828            },
829          },
830        )
831        .collect(),
832    };
833    debug!(
834      "Reporting statuses for {:?}",
835      goal_status_array
836        .status_list
837        .iter()
838        .map(|gs| gs.goal_info.goal_id)
839    );
840    self
841      .actionserver
842      .send_goal_statuses(goal_status_array)
843      .unwrap_or_else(|e| error!("AsyncActionServer::publish_statuses: {e:?}"));
844  }
845}