1use std::{
2 collections::{btree_map::Entry, BTreeMap},
3 marker::PhantomData,
4 sync::Mutex,
5};
6
7#[allow(unused_imports)]
8use log::{debug, error, info, warn};
9use futures::{pin_mut, stream::StreamExt};
10use rustdds::dds::{ReadError, ReadResult, WriteError, WriteResult};
11pub use action_msgs::{CancelGoalRequest, GoalId, GoalInfo, GoalStatusEnum};
12
13use crate::{
14 action_msgs, builtin_interfaces,
15 message::Message,
16 names::Name,
17 service::{request_id::RmwRequestId, AService, Server},
18 Publisher,
19};
20use super::{
21 ActionTypes, FeedbackMessage, GetResultRequest, GetResultResponse, SendGoalRequest,
22 SendGoalResponse,
23};
24
25pub struct ActionServer<A>
28where
29 A: ActionTypes,
30 A::GoalType: Message + Clone,
31 A::ResultType: Message + Clone,
32 A::FeedbackType: Message,
33{
34 pub(crate) my_goal_server: Server<AService<SendGoalRequest<A::GoalType>, SendGoalResponse>>,
35
36 pub(crate) my_cancel_server:
37 Server<AService<action_msgs::CancelGoalRequest, action_msgs::CancelGoalResponse>>,
38
39 pub(crate) my_result_server: Server<AService<GetResultRequest, GetResultResponse<A::ResultType>>>,
40
41 pub(crate) my_feedback_publisher: Publisher<FeedbackMessage<A::FeedbackType>>,
42
43 pub(crate) my_status_publisher: Publisher<action_msgs::GoalStatusArray>,
44
45 pub(crate) my_action_name: Name,
46}
47
48impl<A> ActionServer<A>
49where
50 A: ActionTypes,
51 A::GoalType: Message + Clone,
52 A::ResultType: Message + Clone,
53 A::FeedbackType: Message,
54{
55 pub fn name(&self) -> &Name {
56 &self.my_action_name
57 }
58
59 pub fn goal_server(
60 &mut self,
61 ) -> &mut Server<AService<SendGoalRequest<A::GoalType>, SendGoalResponse>> {
62 &mut self.my_goal_server
63 }
64 pub fn cancel_server(
65 &mut self,
66 ) -> &mut Server<AService<action_msgs::CancelGoalRequest, action_msgs::CancelGoalResponse>> {
67 &mut self.my_cancel_server
68 }
69 pub fn result_server(
70 &mut self,
71 ) -> &mut Server<AService<GetResultRequest, GetResultResponse<A::ResultType>>> {
72 &mut self.my_result_server
73 }
74 pub fn feedback_publisher(&mut self) -> &mut Publisher<FeedbackMessage<A::FeedbackType>> {
75 &mut self.my_feedback_publisher
76 }
77 pub fn my_status_publisher(&mut self) -> &mut Publisher<action_msgs::GoalStatusArray> {
78 &mut self.my_status_publisher
79 }
80
81 pub fn receive_goal(&self) -> ReadResult<Option<(RmwRequestId, SendGoalRequest<A::GoalType>)>>
83 where
84 <A as ActionTypes>::GoalType: 'static,
85 {
86 self.my_goal_server.receive_request()
87 }
88
89 pub fn send_goal_response(
91 &self,
92 req_id: RmwRequestId,
93 resp: SendGoalResponse,
94 ) -> WriteResult<(), ()>
95 where
96 <A as ActionTypes>::GoalType: 'static,
97 {
98 self.my_goal_server.send_response(req_id, resp)
99 }
100
101 pub fn receive_cancel_request(
103 &self,
104 ) -> ReadResult<Option<(RmwRequestId, action_msgs::CancelGoalRequest)>> {
105 self.my_cancel_server.receive_request()
106 }
107
108 pub fn send_cancel_response(
110 &self,
111 req_id: RmwRequestId,
112 resp: action_msgs::CancelGoalResponse,
113 ) -> WriteResult<(), ()> {
114 self.my_cancel_server.send_response(req_id, resp)
115 }
116
117 pub fn receive_result_request(&self) -> ReadResult<Option<(RmwRequestId, GetResultRequest)>>
118 where
119 <A as ActionTypes>::ResultType: 'static,
120 {
121 self.my_result_server.receive_request()
122 }
123
124 pub fn send_result(
125 &self,
126 result_request_id: RmwRequestId,
127 resp: GetResultResponse<A::ResultType>,
128 ) -> WriteResult<(), ()>
129 where
130 <A as ActionTypes>::ResultType: 'static,
131 {
132 self.my_result_server.send_response(result_request_id, resp)
133 }
134
135 pub fn send_feedback(
136 &self,
137 goal_id: GoalId,
138 feedback: A::FeedbackType,
139 ) -> WriteResult<(), FeedbackMessage<A::FeedbackType>> {
140 self
141 .my_feedback_publisher
142 .publish(FeedbackMessage { goal_id, feedback })
143 }
144
145 pub fn send_goal_statuses(
147 &self,
148 goal_statuses: action_msgs::GoalStatusArray,
149 ) -> WriteResult<(), action_msgs::GoalStatusArray> {
150 self.my_status_publisher.publish(goal_statuses)
151 }
152} #[derive(Debug, Clone)]
156struct AsyncGoal<A>
157where
158 A: ActionTypes,
159{
160 status: GoalStatusEnum,
161 accepted_time: Option<builtin_interfaces::Time>,
162 goal: A::GoalType,
163}
164
165#[derive(Clone, Copy)]
166struct InnerGoalHandle<G> {
167 goal_id: GoalId,
168 phantom: PhantomData<G>,
169}
170
171#[derive(Clone, Copy)]
188pub struct NewGoalHandle<G> {
189 inner: InnerGoalHandle<G>,
190 req_id: RmwRequestId,
191}
192
193impl<G> NewGoalHandle<G> {
194 pub fn goal_id(&self) -> GoalId {
195 self.inner.goal_id
196 }
197}
198
199#[derive(Clone, Copy)]
201pub struct AcceptedGoalHandle<G> {
202 inner: InnerGoalHandle<G>,
203}
204
205impl<G> AcceptedGoalHandle<G> {
206 pub fn goal_id(&self) -> GoalId {
207 self.inner.goal_id
208 }
209}
210
211#[derive(Clone, Copy)]
213pub struct ExecutingGoalHandle<G> {
214 inner: InnerGoalHandle<G>,
215}
216
217impl<G> ExecutingGoalHandle<G> {
218 pub fn goal_id(&self) -> GoalId {
219 self.inner.goal_id
220 }
221}
222
223pub struct CancelHandle {
226 req_id: RmwRequestId,
227 goals: Vec<GoalId>,
228}
229
230impl CancelHandle {
231 pub fn goals(&self) -> impl Iterator<Item = GoalId> + '_ {
232 self.goals.iter().cloned()
233 }
234 pub fn contains_goal(&self, goal_id: &GoalId) -> bool {
235 self.goals.contains(goal_id)
236 }
237}
238
239#[derive(Debug, Clone, Copy, PartialEq, Eq)]
241pub enum GoalEndStatus {
242 Succeeded,
243 Aborted,
244 Canceled,
245}
246
247#[derive(Debug)]
249pub enum GoalError<T> {
250 NoSuchGoal,
251 WrongGoalState,
252 DDSReadError(ReadError),
253 DDSWriteError(WriteError<T>),
254}
255
256impl<T> From<ReadError> for GoalError<T> {
257 fn from(e: ReadError) -> Self {
258 GoalError::DDSReadError(e)
259 }
260}
261impl<T> From<WriteError<T>> for GoalError<T> {
262 fn from(e: WriteError<T>) -> Self {
263 GoalError::DDSWriteError(e)
264 }
265}
266
267pub struct AsyncActionServer<A>
269where
270 A: ActionTypes,
271 A::GoalType: Message + Clone,
272 A::ResultType: Message + Clone,
273 A::FeedbackType: Message,
274{
275 actionserver: ActionServer<A>,
276 goals: Mutex<BTreeMap<GoalId, AsyncGoal<A>>>,
278 finished_goals: Mutex<Vec<GoalId>>,
279 result_requests: Mutex<BTreeMap<GoalId, RmwRequestId>>,
280}
281
282const FINISHED_GOAL_BUFFER_SIZE: usize = 2;
283
284impl<A> AsyncActionServer<A>
285where
286 A: ActionTypes,
287 A::GoalType: Message + Clone,
288 A::ResultType: Message + Clone,
289 A::FeedbackType: Message,
290{
291 pub fn new(actionserver: ActionServer<A>) -> Self {
292 AsyncActionServer::<A> {
293 actionserver,
294 goals: Mutex::new(BTreeMap::new()),
295 finished_goals: Mutex::new(Vec::with_capacity(FINISHED_GOAL_BUFFER_SIZE)),
296 result_requests: Mutex::new(BTreeMap::new()),
297 }
298 }
299
300 fn mark_goal_finished(&self, goal_id: GoalId) {
301 self.finished_goals.lock().unwrap().push(goal_id);
302 }
303
304 fn flush_finisehd_goals(&self) {
305 let mut goals_guard = self.goals.lock().unwrap();
306 for g in self.finished_goals.lock().unwrap().drain(0..) {
307 goals_guard
308 .remove(&g)
309 .inspect(|goal| {
310 let looks_done = goal.status == GoalStatusEnum::Succeeded
311 || goal.status == GoalStatusEnum::Aborted
312 || goal.status == GoalStatusEnum::Canceled;
313 debug_assert!(
314 looks_done,
315 "Flushing goal with wrong status: {:?}",
316 goal.status
317 )
318 })
319 .or_else(|| {
320 error!("We seem to have lost goal {g:?}");
321 None
322 });
323 }
324 }
325
326 pub fn get_new_goal(&self, handle: NewGoalHandle<A::GoalType>) -> Option<A::GoalType> {
327 self.flush_finisehd_goals();
328 self
329 .goals
330 .lock()
331 .unwrap()
332 .get(&handle.inner.goal_id)
333 .map(|ag| ag.goal.clone())
334 }
335
336 pub async fn receive_new_goal(&self) -> ReadResult<NewGoalHandle<A::GoalType>>
339 where
340 <A as ActionTypes>::GoalType: 'static,
341 {
342 let (req_id, goal_id) = loop {
343 let (req_id, goal_request) = self
344 .actionserver
345 .my_goal_server
346 .async_receive_request()
347 .await?;
348 match self.goals.lock().unwrap().entry(goal_request.goal_id) {
349 e @ Entry::Vacant(_) => {
350 e.or_insert(AsyncGoal {
351 status: GoalStatusEnum::Unknown,
352 goal: goal_request.goal,
353 accepted_time: None,
354 });
355 break (req_id, goal_request.goal_id);
356 }
357 Entry::Occupied(_) => {
358 error!(
359 "Received duplicate goal_id {:?} , req_id={:?}",
360 goal_request.goal_id, req_id
361 );
362 continue; }
364 }
365 };
366 let inner = InnerGoalHandle {
367 goal_id,
368 phantom: PhantomData,
369 };
370 Ok(NewGoalHandle { inner, req_id })
371 }
372
373 pub async fn accept_goal(
378 &self,
379 handle: NewGoalHandle<A::GoalType>,
380 ) -> Result<AcceptedGoalHandle<A::GoalType>, GoalError<()>>
381 where
382 A::GoalType: 'static,
383 {
384 let now = builtin_interfaces::Time::now();
385 let result = match self.goals.lock().unwrap().entry(handle.inner.goal_id) {
386 Entry::Vacant(_) => Err(GoalError::NoSuchGoal),
387 Entry::Occupied(o) => match o.get() {
388 AsyncGoal {
389 status: GoalStatusEnum::Unknown,
390 ..
391 } => {
392 let mut_o = o.into_mut();
393 mut_o.status = GoalStatusEnum::Accepted;
394 mut_o.accepted_time = Some(now);
395 Ok(AcceptedGoalHandle {
396 inner: handle.inner,
397 })
398 }
399 AsyncGoal {
400 status: wrong_status,
401 ..
402 } => {
403 error!(
404 "Tried to accept goal {:?} but status was {:?}, expected Unknown.",
405 handle.inner.goal_id, wrong_status
406 );
407 Err(GoalError::WrongGoalState)
408 }
409 },
410 };
411
412 if result.is_ok() {
415 self.publish_statuses().await;
416 self.actionserver.my_goal_server.send_response(
417 handle.req_id,
418 SendGoalResponse {
419 accepted: true,
420 stamp: now,
421 },
422 )?;
423 }
424 result
425 }
426
427 pub async fn reject_goal(&self, handle: NewGoalHandle<A::GoalType>) -> Result<(), GoalError<()>>
430 where
431 A::GoalType: 'static,
432 {
433 let result = match self.goals.lock().unwrap().entry(handle.inner.goal_id) {
434 Entry::Vacant(_) => Err(GoalError::NoSuchGoal),
435 Entry::Occupied(o) => {
436 match o.get() {
437 AsyncGoal {
438 status: GoalStatusEnum::Unknown,
439 ..
440 } => {
441 o.remove();
445 info!("Action server rejected goal {:?}", handle.goal_id());
446 Ok(())
447 }
448 AsyncGoal {
449 status: wrong_status,
450 ..
451 } => {
452 error!(
453 "Tried to reject goal {:?} but status was {:?}, expected Unknown.",
454 handle.inner.goal_id, wrong_status
455 );
456 Err(GoalError::WrongGoalState)
457 }
458 }
459 }
460 };
461
462 if result.is_ok() {
463 self.actionserver.my_goal_server.send_response(
464 handle.req_id,
465 SendGoalResponse {
466 accepted: false,
467 stamp: builtin_interfaces::Time::now(),
468 },
469 )?;
470 }
471
472 result
473 }
474
475 pub async fn start_executing_goal(
478 &self,
479 handle: AcceptedGoalHandle<A::GoalType>,
480 ) -> Result<ExecutingGoalHandle<A::GoalType>, GoalError<()>> {
481 let result = match self.goals.lock().unwrap().entry(handle.inner.goal_id) {
482 Entry::Vacant(_) => Err(GoalError::NoSuchGoal),
483 Entry::Occupied(o) => match o.get() {
484 AsyncGoal {
485 status: GoalStatusEnum::Accepted,
486 ..
487 } => {
488 o.into_mut().status = GoalStatusEnum::Executing;
489 Ok(ExecutingGoalHandle {
490 inner: handle.inner,
491 })
492 }
493 AsyncGoal {
494 status: wrong_status,
495 ..
496 } => {
497 error!(
498 "Tried to execute goal {:?} but status was {:?}, expected Accepted.",
499 handle.inner.goal_id, wrong_status
500 );
501 Err(GoalError::WrongGoalState)
502 }
503 },
504 };
505
506 if result.is_ok() {
507 self.publish_statuses().await;
508 }
509 result
510 }
511
512 pub async fn publish_feedback(
514 &self,
515 handle: ExecutingGoalHandle<A::GoalType>,
516 feedback: A::FeedbackType,
517 ) -> Result<(), GoalError<FeedbackMessage<A::FeedbackType>>> {
518 match self.goals.lock().unwrap().entry(handle.inner.goal_id) {
519 Entry::Vacant(_) => Err(GoalError::NoSuchGoal),
520 Entry::Occupied(o) => match o.get() {
521 AsyncGoal {
522 status: GoalStatusEnum::Executing,
523 ..
524 } => {
525 self
526 .actionserver
527 .send_feedback(handle.inner.goal_id, feedback)?;
528 Ok(())
529 }
530 AsyncGoal {
531 status: wrong_status,
532 ..
533 } => {
534 error!(
535 "Tried publish feedback on goal {:?} but status was {:?}, expected Executing.",
536 handle.inner.goal_id, wrong_status
537 );
538 Err(GoalError::WrongGoalState)
539 }
540 },
541 }
542 }
543
544 pub async fn send_result_response(
553 &self,
554 handle: ExecutingGoalHandle<A::GoalType>,
555 result_status: GoalEndStatus,
556 result: A::ResultType,
557 ) -> Result<(), GoalError<()>>
558 where
559 A::ResultType: 'static,
560 {
561 let result_status = match result_status {
564 GoalEndStatus::Succeeded => GoalStatusEnum::Succeeded,
565 GoalEndStatus::Aborted => GoalStatusEnum::Aborted,
566 GoalEndStatus::Canceled => GoalStatusEnum::Canceled,
567 };
568
569 let req_id_opt = self
574 .result_requests
575 .lock()
576 .unwrap()
577 .get(&handle.inner.goal_id)
578 .cloned();
579 let req_id = match req_id_opt {
581 Some(req_id) => req_id,
582 None => {
583 let res_reqs = self.actionserver.my_result_server.receive_request_stream();
584 pin_mut!(res_reqs);
585 loop {
586 let (req_id, GetResultRequest { goal_id }) = res_reqs.select_next_some().await?;
588 if goal_id == handle.inner.goal_id {
589 break req_id;
590 } else {
591 self.result_requests.lock().unwrap().insert(goal_id, req_id);
592 debug!("Got result request for goal_id={goal_id:?} req_id={req_id:?}");
593 }
595 }
596 }
597 };
598 let ret_value = match self.goals.lock().unwrap().entry(handle.inner.goal_id) {
599 Entry::Vacant(_) => Err(GoalError::NoSuchGoal),
600 Entry::Occupied(o) => {
601 match o.get() {
602 AsyncGoal {
605 status: GoalStatusEnum::Accepted,
606 ..
607 }
608 | AsyncGoal {
609 status: GoalStatusEnum::Executing,
610 ..
611 }
612 | AsyncGoal {
613 status: GoalStatusEnum::Canceling,
614 ..
615 } => {
616 o.into_mut().status = result_status;
617 self.actionserver.send_result(
618 req_id,
619 GetResultResponse {
620 status: result_status,
621 result,
622 },
623 )?;
624 debug!(
625 "Send result for goal_id={:?} req_id={:?}",
626 handle.inner.goal_id, req_id
627 );
628 Ok(())
629 }
630 AsyncGoal {
631 status: wrong_status,
632 ..
633 } => {
634 error!(
635 "Tried to finish goal {:?} but status was {:?}.",
636 handle.inner.goal_id, wrong_status
637 );
638 Err(GoalError::WrongGoalState)
639 }
640 }
641 }
642 };
643 if ret_value.is_ok() {
644 self.publish_statuses().await;
645 self.mark_goal_finished(handle.goal_id());
646 }
647 ret_value
648 }
649
650 pub async fn abort_executing_goal(
653 &self,
654 handle: ExecutingGoalHandle<A::GoalType>,
655 ) -> Result<(), GoalError<()>> {
656 self.abort_goal(handle.inner).await
657 }
658 pub async fn abort_accepted_goal(
659 &self,
660 handle: AcceptedGoalHandle<A::GoalType>,
661 ) -> Result<(), GoalError<()>> {
662 self.abort_goal(handle.inner).await
663 }
664
665 async fn abort_goal(&self, handle: InnerGoalHandle<A::GoalType>) -> Result<(), GoalError<()>> {
666 let abort_result = match self.goals.lock().unwrap().entry(handle.goal_id) {
667 Entry::Vacant(_) => Err(GoalError::NoSuchGoal),
668 Entry::Occupied(o) => match o.get() {
669 AsyncGoal {
670 status: GoalStatusEnum::Accepted,
671 ..
672 }
673 | AsyncGoal {
674 status: GoalStatusEnum::Executing,
675 ..
676 } => {
677 o.into_mut().status = GoalStatusEnum::Aborted;
678 Ok(())
679 }
680 AsyncGoal {
681 status: wrong_status,
682 ..
683 } => {
684 error!(
685 "Tried to abort goal {:?} but status was {:?}, expected Accepted or Executing. ",
686 handle.goal_id, wrong_status
687 );
688 Err(GoalError::WrongGoalState)
689 }
690 },
691 };
692
693 if abort_result.is_ok() {
694 self.publish_statuses().await;
695 self.mark_goal_finished(handle.goal_id);
696 }
697
698 abort_result
699 }
700
701 pub async fn receive_cancel_request(&self) -> ReadResult<CancelHandle> {
706 let (req_id, CancelGoalRequest { goal_info }) = self
707 .actionserver
708 .my_cancel_server
709 .async_receive_request()
710 .await?;
711
712 #[allow(clippy::type_complexity)] let goal_filter: Box<dyn FnMut(&(&GoalId, &AsyncGoal<A>)) -> bool> = match goal_info {
714 GoalInfo {
715 goal_id: GoalId::ZERO,
716 stamp: builtin_interfaces::Time::ZERO,
717 } => Box::new(|(_, _)| true), GoalInfo {
720 goal_id: GoalId::ZERO,
721 stamp,
722 } => Box::new(move |(_, ag)| ag.accepted_time.map(|at| at < stamp).unwrap_or(false)),
723
724 GoalInfo {
725 goal_id,
726 stamp: builtin_interfaces::Time::ZERO,
727 } => Box::new(move |(g_id, _)| goal_id == **g_id),
728
729 GoalInfo { goal_id, stamp } => Box::new(move |(g_id, ag)| {
730 goal_id == **g_id || ag.accepted_time.map(move |at| at < stamp).unwrap_or(false)
731 }),
732 };
733
734 let cancel_handle = CancelHandle {
740 req_id,
741 goals: self
742 .goals
743 .lock()
744 .unwrap()
745 .iter()
746 .filter(|(_, async_goal)| {
748 async_goal.status == GoalStatusEnum::Executing
749 || async_goal.status == GoalStatusEnum::Accepted
750 })
751 .filter(goal_filter)
753 .map(|p| *p.0)
754 .collect(),
755 };
756
757 Ok(cancel_handle)
758 }
759
760 pub async fn respond_to_cancel_requests(
765 &self,
766 cancel_handle: &CancelHandle,
767 goals_to_cancel: impl Iterator<Item = GoalId>,
768 ) -> WriteResult<(), ()> {
769 let canceling_goals: Vec<GoalInfo> =
770 goals_to_cancel
771 .filter_map(|goal_id| {
772 self.goals.lock().unwrap().get(&goal_id).and_then(
773 |AsyncGoal { accepted_time, .. }| {
774 accepted_time.map(|stamp| GoalInfo { goal_id, stamp })
775 },
776 )
777 })
778 .collect();
779
780 for goal_info in &canceling_goals {
781 self
782 .goals
783 .lock()
784 .unwrap()
785 .entry(goal_info.goal_id)
786 .and_modify(|gg| gg.status = GoalStatusEnum::Canceling);
787 }
788 self.publish_statuses().await;
789
790 let response = action_msgs::CancelGoalResponse {
791 return_code: if canceling_goals.is_empty() {
792 action_msgs::CancelGoalResponseEnum::Rejected
793 } else {
794 action_msgs::CancelGoalResponseEnum::None },
796 goals_canceling: canceling_goals,
797 };
798
799 self
800 .actionserver
801 .my_cancel_server
802 .async_send_response(cancel_handle.req_id, response)
803 .await
804 }
805
806 async fn publish_statuses(&self) {
809 let goal_status_array = action_msgs::GoalStatusArray {
810 status_list: self
811 .goals
812 .lock()
813 .unwrap()
814 .iter()
815 .map(
816 |(
817 goal_id,
818 AsyncGoal {
819 status,
820 accepted_time,
821 ..
822 },
823 )| action_msgs::GoalStatus {
824 status: *status,
825 goal_info: GoalInfo {
826 goal_id: *goal_id,
827 stamp: accepted_time.unwrap_or(builtin_interfaces::Time::ZERO),
828 },
829 },
830 )
831 .collect(),
832 };
833 debug!(
834 "Reporting statuses for {:?}",
835 goal_status_array
836 .status_list
837 .iter()
838 .map(|gs| gs.goal_info.goal_id)
839 );
840 self
841 .actionserver
842 .send_goal_statuses(goal_status_array)
843 .unwrap_or_else(|e| error!("AsyncActionServer::publish_statuses: {e:?}"));
844 }
845}