use core_types::{ActionGoalId, ErrorCode, ErrorDomain, RtError, Timestamp};
use crate::message::{ActionFeedback, ActionResult, ActionSessionHealth, GoalAck, GoalStatus};
use super::super::ActionClient;
use super::BasicActionClient;
impl<G, F, R> ActionClient<G, F, R> for BasicActionClient<G, F, R>
where
G: Send + 'static,
F: Send + 'static,
R: Send + 'static,
{
fn action_name(&self) -> &str {
&self.action_name
}
fn schema(&self) -> &crate::message::ActionSchema {
&self.schema
}
fn send_goal(&mut self, goal: G) -> Result<(ActionGoalId, GoalAck), RtError> {
if self.closed {
return Err(RtError::new(
ErrorCode::InvalidState,
ErrorDomain::Action,
false,
"client is closed",
));
}
self.next_goal_id += 1;
let id = ActionGoalId::new(self.next_goal_id);
if let Some(ch) = &self.channel {
ch.goals.lock().unwrap().push_back((id, goal));
ch.statuses
.lock()
.unwrap()
.insert(id.0, GoalStatus::Accepted);
ch.heartbeats.lock().unwrap().insert(id.0, Timestamp::now());
let ack = GoalAck {
accepted: true,
reason: None,
};
ch.acks.lock().unwrap().insert(id.0, ack.clone());
return Ok((id, ack));
}
let ack = GoalAck {
accepted: true,
reason: None,
};
Ok((id, ack))
}
fn poll_feedback(&mut self, goal_id: ActionGoalId) -> Option<ActionFeedback<F>> {
self.apply_heartbeat_timeouts_at(Timestamp::now());
self.channel
.as_ref()?
.feedbacks
.lock()
.unwrap()
.get_mut(&goal_id.0)
.and_then(|q| q.pop_front())
}
fn poll_result(&mut self, goal_id: ActionGoalId) -> Option<ActionResult<R>> {
self.apply_heartbeat_timeouts_at(Timestamp::now());
let result = self
.channel
.as_ref()?
.results
.lock()
.unwrap()
.remove(&goal_id.0);
if result.is_some() && let Some(ch) = &self.channel {
ch.heartbeats.lock().unwrap().remove(&goal_id.0);
}
result
}
fn cancel(&mut self, goal_id: ActionGoalId) -> Result<(), RtError> {
if self.closed {
return Err(RtError::new(
ErrorCode::InvalidState,
ErrorDomain::Action,
false,
"client is closed",
));
}
if let Some(ch) = &self.channel {
ch.cancels.lock().unwrap().push_back(goal_id);
ch.statuses
.lock()
.unwrap()
.insert(goal_id.0, GoalStatus::Canceling);
ch.heartbeats
.lock()
.unwrap()
.insert(goal_id.0, Timestamp::now());
}
Ok(())
}
fn goal_status(&self, goal_id: ActionGoalId) -> Option<GoalStatus> {
self.apply_heartbeat_timeouts_at(Timestamp::now());
self.channel
.as_ref()?
.statuses
.lock()
.unwrap()
.get(&goal_id.0)
.copied()
}
fn tick_timeouts(&mut self, now: Timestamp) -> usize {
self.apply_heartbeat_timeouts_at(now)
}
fn goal_health(&self, goal_id: ActionGoalId) -> Option<ActionSessionHealth> {
self.goal_health_at(goal_id, Timestamp::now())
}
fn active_goal_health(&self) -> Vec<ActionSessionHealth> {
let Some(ch) = &self.channel else {
return Vec::new();
};
let now = Timestamp::now();
let mut goal_ids = ch
.statuses
.lock()
.unwrap()
.keys()
.copied()
.collect::<Vec<_>>();
goal_ids.sort_unstable();
goal_ids
.into_iter()
.filter_map(|id| self.goal_health_at(ActionGoalId(id), now))
.collect()
}
fn close(&mut self) -> Result<(), RtError> {
self.closed = true;
Ok(())
}
}