use std::{
collections::{btree_map::Entry, BTreeMap},
marker::PhantomData,
};
use rustdds::{
dds::{ReadError, ReadResult, WriteError, WriteResult},
*,
};
use serde::{Deserialize, Serialize};
pub use action_msgs::{CancelGoalRequest, CancelGoalResponse, GoalId, GoalInfo, GoalStatusEnum};
use builtin_interfaces::Time;
#[allow(unused_imports)]
use log::{debug, error, info, warn};
use futures::{
pin_mut,
stream::{FusedStream, Stream, StreamExt},
Future,
};
use crate::{
action_msgs, builtin_interfaces,
message::Message,
names::Name,
service::{request_id::RmwRequestId, AService, CallServiceError, Client, Server},
unique_identifier_msgs, Publisher, Subscription,
};
pub trait ActionTypes {
type GoalType: Message + Clone; type ResultType: Message + Clone; type FeedbackType: Message;
fn goal_type_name(&self) -> &str;
fn result_type_name(&self) -> &str;
fn feedback_type_name(&self) -> &str;
}
pub struct Action<G, R, F> {
g: PhantomData<G>,
r: PhantomData<R>,
f: PhantomData<F>,
goal_typename: String,
result_typename: String,
feedback_typename: String,
}
impl<G, R, F> Action<G, R, F>
where
G: Message + Clone,
R: Message + Clone,
F: Message,
{
pub fn new(goal_typename: String, result_typename: String, feedback_typename: String) -> Self {
Self {
goal_typename,
result_typename,
feedback_typename,
g: PhantomData,
r: PhantomData,
f: PhantomData,
}
}
}
impl<G, R, F> ActionTypes for Action<G, R, F>
where
G: Message + Clone,
R: Message + Clone,
F: Message,
{
type GoalType = G;
type ResultType = R;
type FeedbackType = F;
fn goal_type_name(&self) -> &str {
&self.goal_typename
}
fn result_type_name(&self) -> &str {
&self.result_typename
}
fn feedback_type_name(&self) -> &str {
&self.feedback_typename
}
}
pub struct ActionClientQosPolicies {
pub goal_service: QosPolicies,
pub result_service: QosPolicies,
pub cancel_service: QosPolicies,
pub feedback_subscription: QosPolicies,
pub status_subscription: QosPolicies,
}
pub struct ActionServerQosPolicies {
pub goal_service: QosPolicies,
pub result_service: QosPolicies,
pub cancel_service: QosPolicies,
pub feedback_publisher: QosPolicies,
pub status_publisher: QosPolicies,
}
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct SendGoalRequest<G> {
pub goal_id: GoalId,
pub goal: G,
}
impl<G: Message> Message for SendGoalRequest<G> {}
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct SendGoalResponse {
pub accepted: bool,
pub stamp: builtin_interfaces::Time,
}
impl Message for SendGoalResponse {}
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct GetResultRequest {
pub goal_id: GoalId,
}
impl Message for GetResultRequest {}
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct GetResultResponse<R> {
pub status: GoalStatusEnum, pub result: R,
}
impl<R: Message> Message for GetResultResponse<R> {}
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct FeedbackMessage<F> {
pub goal_id: GoalId,
pub feedback: F,
}
impl<F: Message> Message for FeedbackMessage<F> {}
pub struct ActionClient<A>
where
A: ActionTypes,
A::GoalType: Message + Clone,
A::ResultType: Message + Clone,
A::FeedbackType: Message,
{
pub(crate) my_goal_client: Client<AService<SendGoalRequest<A::GoalType>, SendGoalResponse>>,
pub(crate) my_cancel_client:
Client<AService<action_msgs::CancelGoalRequest, action_msgs::CancelGoalResponse>>,
pub(crate) my_result_client: Client<AService<GetResultRequest, GetResultResponse<A::ResultType>>>,
pub(crate) my_feedback_subscription: Subscription<FeedbackMessage<A::FeedbackType>>,
pub(crate) my_status_subscription: Subscription<action_msgs::GoalStatusArray>,
pub(crate) my_action_name: Name,
}
impl<A> ActionClient<A>
where
A: ActionTypes,
A::GoalType: Message + Clone,
A::ResultType: Message + Clone,
A::FeedbackType: Message,
{
pub fn name(&self) -> &Name {
&self.my_action_name
}
pub fn goal_client(
&mut self,
) -> &mut Client<AService<SendGoalRequest<A::GoalType>, SendGoalResponse>> {
&mut self.my_goal_client
}
pub fn cancel_client(
&mut self,
) -> &mut Client<AService<action_msgs::CancelGoalRequest, action_msgs::CancelGoalResponse>> {
&mut self.my_cancel_client
}
pub fn result_client(
&mut self,
) -> &mut Client<AService<GetResultRequest, GetResultResponse<A::ResultType>>> {
&mut self.my_result_client
}
pub fn feedback_subscription(&mut self) -> &mut Subscription<FeedbackMessage<A::FeedbackType>> {
&mut self.my_feedback_subscription
}
pub fn status_subscription(&mut self) -> &mut Subscription<action_msgs::GoalStatusArray> {
&mut self.my_status_subscription
}
pub fn send_goal(&self, goal: A::GoalType) -> WriteResult<(RmwRequestId, GoalId), ()>
where
<A as ActionTypes>::GoalType: 'static,
{
let goal_id = unique_identifier_msgs::UUID::new_random();
self
.my_goal_client
.send_request(SendGoalRequest { goal_id, goal })
.map(|req_id| (req_id, goal_id))
}
pub fn receive_goal_response(&self, req_id: RmwRequestId) -> ReadResult<Option<SendGoalResponse>>
where
<A as ActionTypes>::GoalType: 'static,
{
loop {
match self.my_goal_client.receive_response() {
Err(e) => break Err(e),
Ok(None) => break Ok(None), Ok(Some((incoming_req_id, resp))) if incoming_req_id == req_id =>
{
break Ok(Some(resp))
}
Ok(Some((incoming_req_id, _resp))) => {
info!(
"Goal Response not for us: {:?} != {:?}",
incoming_req_id, req_id
);
continue;
}
}
}
}
pub async fn async_send_goal(
&self,
goal: A::GoalType,
) -> Result<(GoalId, SendGoalResponse), CallServiceError<()>>
where
<A as ActionTypes>::GoalType: 'static,
{
let goal_id = unique_identifier_msgs::UUID::new_random();
let send_goal_response = self
.my_goal_client
.async_call_service(SendGoalRequest { goal_id, goal })
.await?;
Ok((goal_id, send_goal_response))
}
fn cancel_goal_raw(&self, goal_id: GoalId, timestamp: Time) -> WriteResult<RmwRequestId, ()> {
let goal_info = GoalInfo {
goal_id,
stamp: timestamp,
};
self
.my_cancel_client
.send_request(CancelGoalRequest { goal_info })
}
pub fn cancel_goal(&self, goal_id: GoalId) -> WriteResult<RmwRequestId, ()> {
self.cancel_goal_raw(goal_id, Time::ZERO)
}
pub fn cancel_all_goals_before(&self, timestamp: Time) -> WriteResult<RmwRequestId, ()> {
self.cancel_goal_raw(GoalId::ZERO, timestamp)
}
pub fn cancel_all_goals(&self) -> WriteResult<RmwRequestId, ()> {
self.cancel_goal_raw(GoalId::ZERO, Time::ZERO)
}
pub fn receive_cancel_response(
&self,
cancel_request_id: RmwRequestId,
) -> ReadResult<Option<CancelGoalResponse>> {
loop {
match self.my_cancel_client.receive_response() {
Err(e) => break Err(e),
Ok(None) => break Ok(None), Ok(Some((incoming_req_id, resp))) if incoming_req_id == cancel_request_id => {
break Ok(Some(resp))
} Ok(Some(_)) => continue, }
}
}
pub fn async_cancel_goal(
&self,
goal_id: GoalId,
timestamp: Time,
) -> impl Future<Output = Result<CancelGoalResponse, CallServiceError<()>>> + '_ {
let goal_info = GoalInfo {
goal_id,
stamp: timestamp,
};
self
.my_cancel_client
.async_call_service(CancelGoalRequest { goal_info })
}
pub fn request_result(&self, goal_id: GoalId) -> WriteResult<RmwRequestId, ()>
where
<A as ActionTypes>::ResultType: 'static,
{
self
.my_result_client
.send_request(GetResultRequest { goal_id })
}
pub fn receive_result(
&self,
result_request_id: RmwRequestId,
) -> ReadResult<Option<(GoalStatusEnum, A::ResultType)>>
where
<A as ActionTypes>::ResultType: 'static,
{
loop {
match self.my_result_client.receive_response() {
Err(e) => break Err(e),
Ok(None) => break Ok(None), Ok(Some((incoming_req_id, GetResultResponse { status, result })))
if incoming_req_id == result_request_id =>
{
break Ok(Some((status, result)))
} Ok(Some(_)) => continue, }
}
}
pub async fn async_request_result(
&self,
goal_id: GoalId,
) -> Result<(GoalStatusEnum, A::ResultType), CallServiceError<()>>
where
<A as ActionTypes>::ResultType: 'static,
{
let GetResultResponse { status, result } = self
.my_result_client
.async_call_service(GetResultRequest { goal_id })
.await?;
Ok((status, result))
}
pub fn receive_feedback(&self, goal_id: GoalId) -> ReadResult<Option<A::FeedbackType>>
where
<A as ActionTypes>::FeedbackType: 'static,
{
loop {
match self.my_feedback_subscription.take() {
Err(e) => break Err(e),
Ok(None) => break Ok(None),
Ok(Some((fb_msg, _msg_info))) if fb_msg.goal_id == goal_id => {
break Ok(Some(fb_msg.feedback))
}
Ok(Some((fb_msg, _msg_info))) => {
debug!(
"Feedback on another goal {:?} != {:?}",
fb_msg.goal_id, goal_id
)
}
}
}
}
pub fn feedback_stream(
&self,
goal_id: GoalId,
) -> impl Stream<Item = ReadResult<A::FeedbackType>> + FusedStream + '_
where
<A as ActionTypes>::FeedbackType: 'static,
{
let expected_goal_id = goal_id; self
.my_feedback_subscription
.async_stream()
.filter_map(move |result| async move {
match result {
Err(e) => Some(Err(e)),
Ok((FeedbackMessage { goal_id, feedback }, _msg_info)) => {
if goal_id == expected_goal_id {
Some(Ok(feedback))
} else {
debug!("Feedback for some other {:?}.", goal_id);
None
}
}
}
})
}
pub fn receive_status(&self) -> ReadResult<Option<action_msgs::GoalStatusArray>> {
self
.my_status_subscription
.take()
.map(|r| r.map(|(gsa, _msg_info)| gsa))
}
pub async fn async_receive_status(&self) -> ReadResult<action_msgs::GoalStatusArray> {
let (m, _msg_info) = self.my_status_subscription.async_take().await?;
Ok(m)
}
pub fn all_statuses_stream(
&self,
) -> impl Stream<Item = ReadResult<action_msgs::GoalStatusArray>> + FusedStream + '_ {
self
.my_status_subscription
.async_stream()
.map(|result| result.map(|(gsa, _mi)| gsa))
}
pub fn status_stream(
&self,
goal_id: GoalId,
) -> impl Stream<Item = ReadResult<action_msgs::GoalStatus>> + FusedStream + '_ {
self
.all_statuses_stream()
.filter_map(move |result| async move {
match result {
Err(e) => Some(Err(e)),
Ok(gsa) => gsa
.status_list
.into_iter()
.find(|gs| gs.goal_info.goal_id == goal_id)
.map(Ok),
}
})
}
}
pub struct ActionServer<A>
where
A: ActionTypes,
A::GoalType: Message + Clone,
A::ResultType: Message + Clone,
A::FeedbackType: Message,
{
pub(crate) my_goal_server: Server<AService<SendGoalRequest<A::GoalType>, SendGoalResponse>>,
pub(crate) my_cancel_server:
Server<AService<action_msgs::CancelGoalRequest, action_msgs::CancelGoalResponse>>,
pub(crate) my_result_server: Server<AService<GetResultRequest, GetResultResponse<A::ResultType>>>,
pub(crate) my_feedback_publisher: Publisher<FeedbackMessage<A::FeedbackType>>,
pub(crate) my_status_publisher: Publisher<action_msgs::GoalStatusArray>,
pub(crate) my_action_name: Name,
}
impl<A> ActionServer<A>
where
A: ActionTypes,
A::GoalType: Message + Clone,
A::ResultType: Message + Clone,
A::FeedbackType: Message,
{
pub fn name(&self) -> &Name {
&self.my_action_name
}
pub fn goal_server(
&mut self,
) -> &mut Server<AService<SendGoalRequest<A::GoalType>, SendGoalResponse>> {
&mut self.my_goal_server
}
pub fn cancel_server(
&mut self,
) -> &mut Server<AService<action_msgs::CancelGoalRequest, action_msgs::CancelGoalResponse>> {
&mut self.my_cancel_server
}
pub fn result_server(
&mut self,
) -> &mut Server<AService<GetResultRequest, GetResultResponse<A::ResultType>>> {
&mut self.my_result_server
}
pub fn feedback_publisher(&mut self) -> &mut Publisher<FeedbackMessage<A::FeedbackType>> {
&mut self.my_feedback_publisher
}
pub fn my_status_publisher(&mut self) -> &mut Publisher<action_msgs::GoalStatusArray> {
&mut self.my_status_publisher
}
pub fn receive_goal(&self) -> ReadResult<Option<(RmwRequestId, SendGoalRequest<A::GoalType>)>>
where
<A as ActionTypes>::GoalType: 'static,
{
self.my_goal_server.receive_request()
}
pub fn send_goal_response(
&self,
req_id: RmwRequestId,
resp: SendGoalResponse,
) -> WriteResult<(), ()>
where
<A as ActionTypes>::GoalType: 'static,
{
self.my_goal_server.send_response(req_id, resp)
}
pub fn receive_cancel_request(
&self,
) -> ReadResult<Option<(RmwRequestId, action_msgs::CancelGoalRequest)>> {
self.my_cancel_server.receive_request()
}
pub fn send_cancel_response(
&self,
req_id: RmwRequestId,
resp: action_msgs::CancelGoalResponse,
) -> WriteResult<(), ()> {
self.my_cancel_server.send_response(req_id, resp)
}
pub fn receive_result_request(&self) -> ReadResult<Option<(RmwRequestId, GetResultRequest)>>
where
<A as ActionTypes>::ResultType: 'static,
{
self.my_result_server.receive_request()
}
pub fn send_result(
&self,
result_request_id: RmwRequestId,
resp: GetResultResponse<A::ResultType>,
) -> WriteResult<(), ()>
where
<A as ActionTypes>::ResultType: 'static,
{
self.my_result_server.send_response(result_request_id, resp)
}
pub fn send_feedback(
&self,
goal_id: GoalId,
feedback: A::FeedbackType,
) -> WriteResult<(), FeedbackMessage<A::FeedbackType>> {
self
.my_feedback_publisher
.publish(FeedbackMessage { goal_id, feedback })
}
pub fn send_goal_statuses(
&self,
goal_statuses: action_msgs::GoalStatusArray,
) -> WriteResult<(), action_msgs::GoalStatusArray> {
self.my_status_publisher.publish(goal_statuses)
}
}
#[derive(Clone, Copy)]
pub struct NewGoalHandle<G> {
inner: InnerGoalHandle<G>,
req_id: RmwRequestId,
}
impl<G> NewGoalHandle<G> {
pub fn goal_id(&self) -> GoalId {
self.inner.goal_id
}
}
#[derive(Clone, Copy)]
pub struct AcceptedGoalHandle<G> {
inner: InnerGoalHandle<G>,
}
impl<G> AcceptedGoalHandle<G> {
pub fn goal_id(&self) -> GoalId {
self.inner.goal_id
}
}
#[derive(Clone, Copy)]
pub struct ExecutingGoalHandle<G> {
inner: InnerGoalHandle<G>,
}
impl<G> ExecutingGoalHandle<G> {
pub fn goal_id(&self) -> GoalId {
self.inner.goal_id
}
}
#[derive(Clone, Copy)]
struct InnerGoalHandle<G> {
goal_id: GoalId,
phantom: PhantomData<G>,
}
pub struct CancelHandle {
req_id: RmwRequestId,
goals: Vec<GoalId>,
}
impl CancelHandle {
pub fn goals(&self) -> impl Iterator<Item = GoalId> + '_ {
self.goals.iter().cloned()
}
pub fn contains_goal(&self, goal_id: &GoalId) -> bool {
self.goals.contains(goal_id)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GoalEndStatus {
Succeeded,
Aborted,
Canceled,
}
#[derive(Debug)]
pub enum GoalError<T> {
NoSuchGoal,
WrongGoalState,
DDSReadError(ReadError),
DDSWriteError(WriteError<T>),
}
impl<T> From<ReadError> for GoalError<T> {
fn from(e: ReadError) -> Self {
GoalError::DDSReadError(e)
}
}
impl<T> From<WriteError<T>> for GoalError<T> {
fn from(e: WriteError<T>) -> Self {
GoalError::DDSWriteError(e)
}
}
#[derive(Debug, Clone)]
struct AsyncGoal<A>
where
A: ActionTypes,
{
status: GoalStatusEnum,
accepted_time: Option<builtin_interfaces::Time>,
goal: A::GoalType,
}
pub struct AsyncActionServer<A>
where
A: ActionTypes,
A::GoalType: Message + Clone,
A::ResultType: Message + Clone,
A::FeedbackType: Message,
{
actionserver: ActionServer<A>,
goals: BTreeMap<GoalId, AsyncGoal<A>>,
result_requests: BTreeMap<GoalId, RmwRequestId>,
}
impl<A> AsyncActionServer<A>
where
A: ActionTypes,
A::GoalType: Message + Clone,
A::ResultType: Message + Clone,
A::FeedbackType: Message,
{
pub fn new(actionserver: ActionServer<A>) -> Self {
AsyncActionServer::<A> {
actionserver,
goals: BTreeMap::new(),
result_requests: BTreeMap::new(),
}
}
pub fn get_new_goal(&self, handle: NewGoalHandle<A::GoalType>) -> Option<&A::GoalType> {
self.goals.get(&handle.inner.goal_id).map(|ag| &ag.goal)
}
pub async fn receive_new_goal(&mut self) -> ReadResult<NewGoalHandle<A::GoalType>>
where
<A as ActionTypes>::GoalType: 'static,
{
let (req_id, goal_id) = loop {
let (req_id, goal_request) = self
.actionserver
.my_goal_server
.async_receive_request()
.await?;
match self.goals.entry(goal_request.goal_id) {
e @ Entry::Vacant(_) => {
e.or_insert(AsyncGoal {
status: GoalStatusEnum::Unknown,
goal: goal_request.goal,
accepted_time: None,
});
break (req_id, goal_request.goal_id);
}
Entry::Occupied(_) => {
error!(
"Received duplicate goal_id {:?} , req_id={:?}",
goal_request.goal_id, req_id
);
continue; }
}
};
let inner = InnerGoalHandle {
goal_id,
phantom: PhantomData,
};
Ok(NewGoalHandle { inner, req_id })
}
pub async fn accept_goal(
&mut self,
handle: NewGoalHandle<A::GoalType>,
) -> Result<AcceptedGoalHandle<A::GoalType>, GoalError<()>>
where
A::GoalType: 'static,
{
match self.goals.entry(handle.inner.goal_id) {
Entry::Vacant(_) => Err(GoalError::NoSuchGoal),
Entry::Occupied(o) => match o.get() {
AsyncGoal {
status: GoalStatusEnum::Unknown,
..
} => {
let now = builtin_interfaces::Time::now();
let mut_o = o.into_mut();
mut_o.status = GoalStatusEnum::Accepted;
mut_o.accepted_time = Some(now);
self.publish_statuses().await;
self.actionserver.my_goal_server.send_response(
handle.req_id,
SendGoalResponse {
accepted: true,
stamp: now,
},
)?;
Ok(AcceptedGoalHandle {
inner: handle.inner,
})
}
AsyncGoal {
status: wrong_status,
..
} => {
error!(
"Tried to accept goal {:?} but status was {:?}, expected Unknown.",
handle.inner.goal_id, wrong_status
);
Err(GoalError::WrongGoalState)
}
},
}
}
pub async fn reject_goal(
&mut self,
handle: NewGoalHandle<A::GoalType>,
) -> Result<(), GoalError<()>>
where
A::GoalType: 'static,
{
match self.goals.entry(handle.inner.goal_id) {
Entry::Vacant(_) => Err(GoalError::NoSuchGoal),
Entry::Occupied(o) => {
match o.get() {
AsyncGoal {
status: GoalStatusEnum::Unknown,
..
} => {
self.actionserver.my_goal_server.send_response(
handle.req_id,
SendGoalResponse {
accepted: false,
stamp: builtin_interfaces::Time::now(),
},
)?;
Ok(())
}
AsyncGoal {
status: wrong_status,
..
} => {
error!(
"Tried to reject goal {:?} but status was {:?}, expected Unknown.",
handle.inner.goal_id, wrong_status
);
Err(GoalError::WrongGoalState)
}
}
}
}
}
pub async fn start_executing_goal(
&mut self,
handle: AcceptedGoalHandle<A::GoalType>,
) -> Result<ExecutingGoalHandle<A::GoalType>, GoalError<()>> {
match self.goals.entry(handle.inner.goal_id) {
Entry::Vacant(_) => Err(GoalError::NoSuchGoal),
Entry::Occupied(o) => match o.get() {
AsyncGoal {
status: GoalStatusEnum::Accepted,
..
} => {
o.into_mut().status = GoalStatusEnum::Executing;
self.publish_statuses().await;
Ok(ExecutingGoalHandle {
inner: handle.inner,
})
}
AsyncGoal {
status: wrong_status,
..
} => {
error!(
"Tried to execute goal {:?} but status was {:?}, expected Accepted.",
handle.inner.goal_id, wrong_status
);
Err(GoalError::WrongGoalState)
}
},
}
}
pub async fn publish_feedback(
&mut self,
handle: ExecutingGoalHandle<A::GoalType>,
feedback: A::FeedbackType,
) -> Result<(), GoalError<FeedbackMessage<A::FeedbackType>>> {
match self.goals.entry(handle.inner.goal_id) {
Entry::Vacant(_) => Err(GoalError::NoSuchGoal),
Entry::Occupied(o) => match o.get() {
AsyncGoal {
status: GoalStatusEnum::Executing,
..
} => {
self
.actionserver
.send_feedback(handle.inner.goal_id, feedback)?;
Ok(())
}
AsyncGoal {
status: wrong_status,
..
} => {
error!(
"Tried publish feedback on goal {:?} but status was {:?}, expected Executing.",
handle.inner.goal_id, wrong_status
);
Err(GoalError::WrongGoalState)
}
},
}
}
pub async fn send_result_response(
&mut self,
handle: ExecutingGoalHandle<A::GoalType>,
result_status: GoalEndStatus,
result: A::ResultType,
) -> Result<(), GoalError<()>>
where
A::ResultType: 'static,
{
let result_status = match result_status {
GoalEndStatus::Succeeded => GoalStatusEnum::Succeeded,
GoalEndStatus::Aborted => GoalStatusEnum::Aborted,
GoalEndStatus::Canceled => GoalStatusEnum::Canceled,
};
let req_id = match self.result_requests.get(&handle.inner.goal_id) {
Some(req_id) => *req_id,
None => {
let res_reqs = self.actionserver.my_result_server.receive_request_stream();
pin_mut!(res_reqs);
loop {
let (req_id, GetResultRequest { goal_id }) = res_reqs.select_next_some().await?;
if goal_id == handle.inner.goal_id {
break req_id;
} else {
self.result_requests.insert(goal_id, req_id);
debug!(
"Got result request for goal_id={:?} req_id={:?}",
goal_id, req_id
);
}
}
}
};
match self.goals.entry(handle.inner.goal_id) {
Entry::Vacant(_) => Err(GoalError::NoSuchGoal),
Entry::Occupied(o) => {
match o.get() {
AsyncGoal {
status: GoalStatusEnum::Accepted,
..
}
| AsyncGoal {
status: GoalStatusEnum::Executing,
..
}
| AsyncGoal {
status: GoalStatusEnum::Canceling,
..
} => {
o.into_mut().status = result_status;
self.publish_statuses().await;
self.actionserver.send_result(
req_id,
GetResultResponse {
status: result_status,
result,
},
)?;
debug!(
"Send result for goal_id={:?} req_id={:?}",
handle.inner.goal_id, req_id
);
Ok(())
}
AsyncGoal {
status: wrong_status,
..
} => {
error!(
"Tried to finish goal {:?} but status was {:?}.",
handle.inner.goal_id, wrong_status
);
Err(GoalError::WrongGoalState)
}
}
}
}
}
pub async fn abort_executing_goal(
&mut self,
handle: ExecutingGoalHandle<A::GoalType>,
) -> Result<(), GoalError<()>> {
self.abort_goal(handle.inner).await
}
pub async fn abort_accepted_goal(
&mut self,
handle: AcceptedGoalHandle<A::GoalType>,
) -> Result<(), GoalError<()>> {
self.abort_goal(handle.inner).await
}
async fn abort_goal(
&mut self,
handle: InnerGoalHandle<A::GoalType>,
) -> Result<(), GoalError<()>> {
match self.goals.entry(handle.goal_id) {
Entry::Vacant(_) => Err(GoalError::NoSuchGoal),
Entry::Occupied(o) => match o.get() {
AsyncGoal {
status: GoalStatusEnum::Accepted,
..
}
| AsyncGoal {
status: GoalStatusEnum::Executing,
..
} => {
o.into_mut().status = GoalStatusEnum::Aborted;
self.publish_statuses().await;
Ok(())
}
AsyncGoal {
status: wrong_status,
..
} => {
error!(
"Tried to abort goal {:?} but status was {:?}, expected Accepted or Executing. ",
handle.goal_id, wrong_status
);
Err(GoalError::WrongGoalState)
}
},
}
}
pub async fn receive_cancel_request(&self) -> ReadResult<CancelHandle> {
let (req_id, CancelGoalRequest { goal_info }) = self
.actionserver
.my_cancel_server
.async_receive_request()
.await?;
#[allow(clippy::type_complexity)] let goal_filter: Box<dyn FnMut(&(&GoalId, &AsyncGoal<A>)) -> bool> = match goal_info {
GoalInfo {
goal_id: GoalId::ZERO,
stamp: builtin_interfaces::Time::ZERO,
} => Box::new(|(_, _)| true),
GoalInfo {
goal_id: GoalId::ZERO,
stamp,
} => Box::new(move |(_, ag)| ag.accepted_time.map(|at| at < stamp).unwrap_or(false)),
GoalInfo {
goal_id,
stamp: builtin_interfaces::Time::ZERO,
} => Box::new(move |(g_id, _)| goal_id == **g_id),
GoalInfo { goal_id, stamp } => Box::new(move |(g_id, ag)| {
goal_id == **g_id || ag.accepted_time.map(move |at| at < stamp).unwrap_or(false)
}),
};
let cancel_handle = CancelHandle {
req_id,
goals: self
.goals
.iter()
.filter(|(_, async_goal)| {
async_goal.status == GoalStatusEnum::Executing
|| async_goal.status == GoalStatusEnum::Accepted
})
.filter(goal_filter)
.map(|p| *p.0)
.collect(),
};
Ok(cancel_handle)
}
pub async fn respond_to_cancel_requests(
&mut self,
cancel_handle: &CancelHandle,
goals_to_cancel: impl Iterator<Item = GoalId>,
) -> WriteResult<(), ()> {
let canceling_goals: Vec<GoalInfo> = goals_to_cancel
.filter_map(|goal_id| {
self
.goals
.get(&goal_id)
.and_then(|AsyncGoal { accepted_time, .. }| {
accepted_time.map(|stamp| GoalInfo { goal_id, stamp })
})
})
.collect();
for goal_info in &canceling_goals {
self
.goals
.entry(goal_info.goal_id)
.and_modify(|gg| gg.status = GoalStatusEnum::Canceling);
}
self.publish_statuses().await;
let response = action_msgs::CancelGoalResponse {
return_code: if canceling_goals.is_empty() {
action_msgs::CancelGoalResponseEnum::Rejected
} else {
action_msgs::CancelGoalResponseEnum::None },
goals_canceling: canceling_goals,
};
self
.actionserver
.my_cancel_server
.async_send_response(cancel_handle.req_id, response)
.await
}
async fn publish_statuses(&self) {
let goal_status_array = action_msgs::GoalStatusArray {
status_list: self
.goals
.iter()
.map(
|(
goal_id,
AsyncGoal {
status,
accepted_time,
..
},
)| action_msgs::GoalStatus {
status: *status,
goal_info: GoalInfo {
goal_id: *goal_id,
stamp: accepted_time.unwrap_or(builtin_interfaces::Time::ZERO),
},
},
)
.collect(),
};
debug!(
"Reporting statuses for {:?}",
goal_status_array
.status_list
.iter()
.map(|gs| gs.goal_info.goal_id)
);
self
.actionserver
.send_goal_statuses(goal_status_array)
.unwrap_or_else(|e| error!("AsyncActionServer::publish_statuses: {:?}", e));
}
}