robotrt-action-core 0.1.0-beta.2

RobotRT modular robotics runtime and middleware components.
Documentation
use core_types::{ActionGoalId, ErrorCode, ErrorDomain, RtError, Timestamp};

use crate::message::{ActionFeedback, ActionResult, ActionSessionHealth, GoalAck, GoalStatus};

use super::super::ActionClient;
use super::BasicActionClient;

impl<G, F, R> ActionClient<G, F, R> for BasicActionClient<G, F, R>
where
    G: Send + 'static,
    F: Send + 'static,
    R: Send + 'static,
{
    fn action_name(&self) -> &str {
        &self.action_name
    }

    fn schema(&self) -> &crate::message::ActionSchema {
        &self.schema
    }

    fn send_goal(&mut self, goal: G) -> Result<(ActionGoalId, GoalAck), RtError> {
        if self.closed {
            return Err(RtError::new(
                ErrorCode::InvalidState,
                ErrorDomain::Action,
                false,
                "client is closed",
            ));
        }
        self.next_goal_id += 1;
        let id = ActionGoalId::new(self.next_goal_id);

        if let Some(ch) = &self.channel {
            ch.goals.lock().unwrap().push_back((id, goal));
            ch.statuses
                .lock()
                .unwrap()
                .insert(id.0, GoalStatus::Accepted);
            ch.heartbeats.lock().unwrap().insert(id.0, Timestamp::now());
            // In same-process mode the server overwrites this ack via accept_goal/reject_goal.
            // In an async runtime this would be a Future; here we return the default accepted=true
            // and let test code control the server call order to verify semantics.
            let ack = GoalAck {
                accepted: true,
                reason: None,
            };
            ch.acks.lock().unwrap().insert(id.0, ack.clone());
            return Ok((id, ack));
        }

        // Standalone mode (no channel): always accept.
        let ack = GoalAck {
            accepted: true,
            reason: None,
        };
        Ok((id, ack))
    }

    fn poll_feedback(&mut self, goal_id: ActionGoalId) -> Option<ActionFeedback<F>> {
        self.apply_heartbeat_timeouts_at(Timestamp::now());
        self.channel
            .as_ref()?
            .feedbacks
            .lock()
            .unwrap()
            .get_mut(&goal_id.0)
            .and_then(|q| q.pop_front())
    }

    fn poll_result(&mut self, goal_id: ActionGoalId) -> Option<ActionResult<R>> {
        self.apply_heartbeat_timeouts_at(Timestamp::now());
        let result = self
            .channel
            .as_ref()?
            .results
            .lock()
            .unwrap()
            .remove(&goal_id.0);
        if result.is_some() && let Some(ch) = &self.channel {
            ch.heartbeats.lock().unwrap().remove(&goal_id.0);
        }
        result
    }

    fn cancel(&mut self, goal_id: ActionGoalId) -> Result<(), RtError> {
        if self.closed {
            return Err(RtError::new(
                ErrorCode::InvalidState,
                ErrorDomain::Action,
                false,
                "client is closed",
            ));
        }
        if let Some(ch) = &self.channel {
            ch.cancels.lock().unwrap().push_back(goal_id);
            ch.statuses
                .lock()
                .unwrap()
                .insert(goal_id.0, GoalStatus::Canceling);
            ch.heartbeats
                .lock()
                .unwrap()
                .insert(goal_id.0, Timestamp::now());
        }
        Ok(())
    }

    fn goal_status(&self, goal_id: ActionGoalId) -> Option<GoalStatus> {
        self.apply_heartbeat_timeouts_at(Timestamp::now());
        self.channel
            .as_ref()?
            .statuses
            .lock()
            .unwrap()
            .get(&goal_id.0)
            .copied()
    }

    fn tick_timeouts(&mut self, now: Timestamp) -> usize {
        self.apply_heartbeat_timeouts_at(now)
    }

    fn goal_health(&self, goal_id: ActionGoalId) -> Option<ActionSessionHealth> {
        self.goal_health_at(goal_id, Timestamp::now())
    }

    fn active_goal_health(&self) -> Vec<ActionSessionHealth> {
        let Some(ch) = &self.channel else {
            return Vec::new();
        };

        let now = Timestamp::now();
        let mut goal_ids = ch
            .statuses
            .lock()
            .unwrap()
            .keys()
            .copied()
            .collect::<Vec<_>>();
        goal_ids.sort_unstable();

        goal_ids
            .into_iter()
            .filter_map(|id| self.goal_health_at(ActionGoalId(id), now))
            .collect()
    }

    fn close(&mut self) -> Result<(), RtError> {
        self.closed = true;
        Ok(())
    }
}