use super::ActionServerHandle;
use crate::{
log_error,
rcl_bindings::*,
vendor::{
action_msgs::{msg::GoalInfo, srv::CancelGoal_Response},
unique_identifier_msgs::msg::UUID,
},
CancelResponseCode, GoalUuid, Node, RclrsErrorFilter, ToResult,
};
use futures::{
future::{select, Either},
pin_mut,
};
use futures_lite::future::race;
use rosidl_runtime_rs::{Action, Message};
use std::{
borrow::Cow,
collections::HashSet,
future::Future,
sync::{Arc, Mutex},
};
use tokio::sync::watch::{channel as watch_channel, Receiver, Sender};
/// Cancellation bookkeeping: a watch channel carrying a boolean
/// "cancel requested" flag, plus the current [`CancellationMode`].
pub(super) struct CancellationState<A: Action> {
/// Receiving half of the flag channel. Cloned by `until_cancel_requested` and
/// `unless_cancel_requested`, and read directly by `cancel_requested`.
receiver: Receiver<bool>,
/// Sending half of the flag channel, written through
/// `change_cancel_requested_status`.
sender: Sender<bool>,
/// Current cancellation mode, including any requests still awaiting a decision.
mode: Mutex<CancellationMode<A>>,
}
impl<A: Action> CancellationState<A> {
    /// Drives `f` until it completes or the cancel-requested flag is observed
    /// as `true`, whichever happens first.
    ///
    /// Resolves to `Ok(output)` when `f` finishes first. Otherwise resolves to
    /// `Err(f)`, handing the unfinished future back to the caller. The outcome
    /// of waiting on the flag is ignored, so the `Err` path is taken however
    /// that wait ends.
    pub(super) fn until_cancel_requested<F: Future + Unpin>(
        &self,
        f: F,
    ) -> impl Future<Output = Result<F::Output, F>> {
        let mut cancel_rx = self.receiver.clone();
        async move {
            let cancel_signal = cancel_rx.wait_for(|requested| *requested);
            pin_mut!(cancel_signal);
            match select(f, cancel_signal).await {
                Either::Right((_, unfinished)) => Err(unfinished),
                Either::Left((output, _)) => Ok(output),
            }
        }
    }

    /// Races `f` against the cancel-requested flag.
    ///
    /// Resolves to `Ok(output)` when `f` finishes first and to `Err(())` once
    /// the wait on the flag ends. Unlike [`Self::until_cancel_requested`], the
    /// future is consumed and is not returned on the `Err` path.
    pub(super) fn unless_cancel_requested<F: Future>(
        &self,
        f: F,
    ) -> impl Future<Output = Result<F::Output, ()>> {
        let mut cancel_rx = self.receiver.clone();
        let finished = async move { Ok::<F::Output, ()>(f.await) };
        let cancelled = async move {
            // The result of the wait is deliberately discarded: any way the
            // wait ends is reported as `Err(())`.
            let _ = cancel_rx.wait_for(|requested| *requested).await;
            Err::<F::Output, ()>(())
        };
        race(finished, cancelled)
    }

    /// Returns the current value of the cancel-requested flag.
    pub(super) fn cancel_requested(&self) -> bool {
        *self.receiver.borrow()
    }

    /// Registers an incoming cancellation `request` for the goal `uuid`.
    ///
    /// If cancellation is already underway the request is accepted on the
    /// spot. Otherwise it is queued until [`Self::accept_cancellation`] or
    /// [`Self::reject_cancellation`] is called, and the flag is raised.
    pub(super) fn request_cancellation(&self, request: CancellationRequest<A>, uuid: &GoalUuid) {
        let mut mode = self.mode.lock().unwrap();
        match &mut *mode {
            CancellationMode::Cancelling => {
                request.accept(*uuid);
                return;
            }
            CancellationMode::CancelRequested(pending) => pending.push(request),
            CancellationMode::None => {
                *mode = CancellationMode::CancelRequested(vec![request]);
            }
        }
        // Both queueing paths end by raising the flag.
        self.change_cancel_requested_status(true);
    }

    /// Rejects every queued cancellation request for the goal `uuid`, returns
    /// to the `None` mode and lowers the flag.
    ///
    /// Does nothing when no request is queued or when cancellation has
    /// already been accepted.
    pub(super) fn reject_cancellation(&self, uuid: &GoalUuid) {
        let mut mode = self.mode.lock().unwrap();
        if let CancellationMode::CancelRequested(pending) = &mut *mode {
            for request in pending.drain(..) {
                request.reject(*uuid);
            }
            *mode = CancellationMode::None;
            self.change_cancel_requested_status(false);
        }
    }

    /// Accepts every queued cancellation request for the goal `uuid`, switches
    /// to the `Cancelling` mode and raises the flag.
    ///
    /// Also enters `Cancelling` when no request was queued. Does nothing when
    /// cancellation is already underway.
    pub(super) fn accept_cancellation(&self, uuid: &GoalUuid) {
        let mut mode = self.mode.lock().unwrap();
        match &mut *mode {
            CancellationMode::Cancelling => return,
            CancellationMode::CancelRequested(pending) => {
                for request in pending.drain(..) {
                    request.accept(*uuid);
                }
            }
            CancellationMode::None => {}
        }
        *mode = CancellationMode::Cancelling;
        self.change_cancel_requested_status(true);
    }

    /// Stores `cancel_requested` in the watch channel.
    ///
    /// The stored value is always overwritten, but the closure reports a
    /// modification only for a `false` -> `true` transition. Lowering the flag
    /// or re-raising it is therefore written without being reported as a
    /// change to the channel.
    fn change_cancel_requested_status(&self, cancel_requested: bool) {
        self.sender.send_if_modified(|status| {
            let newly_requested = cancel_requested && !*status;
            *status = cancel_requested;
            newly_requested
        });
    }
}
impl<A: Action> Default for CancellationState<A> {
    /// Creates a state with the flag lowered and no cancellation in progress.
    fn default() -> Self {
        // The channel starts at `false`: nothing has asked for cancellation yet.
        let (sender, receiver) = watch_channel(false);
        let mode = Mutex::new(CancellationMode::None);
        Self {
            receiver,
            sender,
            mode,
        }
    }
}
/// The cancellation modes tracked by `CancellationState`.
pub(super) enum CancellationMode<A: Action> {
/// No cancellation request is pending.
None,
/// Requests that have been received and are waiting to be accepted or rejected.
CancelRequested(Vec<CancellationRequest<A>>),
/// Cancellation has been accepted; `request_cancellation` accepts any
/// further request immediately while in this mode.
Cancelling,
}
/// Handle to a single cancellation request. The state lives behind an
/// `Arc<Mutex<..>>`, so clones of the handle refer to the same request.
pub(super) struct CancellationRequest<A: Action> {
/// Shared request state; a response is sent from it once it is ready, or
/// when it is dropped without having responded.
inner: Arc<Mutex<CancellationRequestInner<A>>>,
}
impl<A: Action> Clone for CancellationRequest<A> {
    /// Returns another handle to the same underlying request; only the
    /// reference count changes.
    fn clone(&self) -> Self {
        let inner = Arc::clone(&self.inner);
        Self { inner }
    }
}
impl<A: Action> CancellationRequest<A> {
pub(super) fn new(
id: rmw_request_id_t,
waiting_for: Vec<GoalUuid>,
server: Arc<ActionServerHandle<A>>,
node: Node,
) -> Self {
Self {
inner: Arc::new(Mutex::new(CancellationRequestInner {
id,
waiting_for,
server,
node,
received: Default::default(),
accepted: Vec::new(),
response_sent: false,
})),
}
}
pub(super) fn accept(&self, uuid: GoalUuid) {
let mut inner = self.inner.lock().unwrap();
if !inner.received.insert(uuid) {
return;
}
let stamp = inner
.node
.get_clock()
.now()
.to_ros_msg()
.unwrap_or_default();
let info = GoalInfo {
goal_id: UUID { uuid: *uuid },
stamp,
};
inner.accepted.push(info);
inner.respond_if_ready();
}
fn reject(&self, uuid: GoalUuid) {
let mut inner = self.inner.lock().unwrap();
if !inner.received.insert(uuid) {
return;
}
inner.respond_if_ready();
}
}
/// State behind a `CancellationRequest` handle.
struct CancellationRequestInner<A: Action> {
/// Request id passed to `rcl_action_send_cancel_response` when responding.
id: rmw_request_id_t,
/// Goals that must all appear in `received` before the response is sent.
waiting_for: Vec<GoalUuid>,
/// Goals that have reported a decision (accepted or rejected) so far.
received: HashSet<GoalUuid>,
/// Info for goals whose cancellation was accepted; drained into the response.
accepted: Vec<GoalInfo>,
/// Set once `respond` has run so the response is only sent a single time.
response_sent: bool,
/// Action server handle that is locked to send the response.
server: Arc<ActionServerHandle<A>>,
/// Node whose clock provides the timestamp for accepted goals.
node: Node,
}
impl<A: Action> CancellationRequestInner<A> {
fn respond_if_ready(&mut self) {
for expected in &self.waiting_for {
if !self.received.contains(expected) {
return;
}
}
self.respond();
}
fn respond(&mut self) {
if self.response_sent {
return;
}
self.response_sent = true;
let mut response = CancelGoal_Response::default();
response.goals_canceling = self.accepted.drain(..).collect();
if response.goals_canceling.is_empty() {
response.return_code = CancelResponseCode::Reject as i8;
} else {
response.return_code = CancelResponseCode::Accept as i8;
}
let mut response_rmw =
CancelGoal_Response::into_rmw_message(Cow::Owned(response)).into_owned();
let r = unsafe {
rcl_action_send_cancel_response(
&*self.server.lock(),
&mut self.id,
&mut response_rmw as *mut _ as *mut _,
)
}
.ok()
.timeout_ok();
if let Err(err) = r {
log_error!(
"CancellationRequest.respond",
"Error occurred while responding to a cancellation request: {err}"
)
}
}
}
impl<A: Action> Drop for CancellationRequestInner<A> {
    /// Sends the response on drop if none has been sent yet, using whatever
    /// decisions were recorded up to that point.
    fn drop(&mut self) {
        if self.response_sent {
            return;
        }
        self.respond();
    }
}