robotrt-action-core 0.1.0-beta.2

RobotRT modular robotics runtime and middleware components.
Documentation
use std::marker::PhantomData;

use core_types::{ActionGoalId, RtDuration, Timestamp};

use crate::message::{ActionLiveness, ActionResult, ActionSchema, ActionSessionHealth, GoalStatus};

use super::ActionChannel;

mod trait_impl;

pub struct BasicActionClient<G, F, R> {
    action_name: String,
    schema: ActionSchema,
    closed: bool,
    next_goal_id: u64,
    heartbeat_timeout: Option<RtDuration>,
    heartbeat_stalled_after: Option<RtDuration>,
    channel: Option<std::sync::Arc<ActionChannel<G, F, R>>>,
    _marker: PhantomData<(G, F, R)>,
}

impl<G, F, R> BasicActionClient<G, F, R> {
    pub fn new(action_name: impl Into<String>, schema: ActionSchema) -> Self {
        Self {
            action_name: action_name.into(),
            schema,
            closed: false,
            next_goal_id: 0,
            heartbeat_timeout: Some(RtDuration::from_secs(5)),
            heartbeat_stalled_after: Some(RtDuration::from_secs(2)),
            channel: None,
            _marker: PhantomData,
        }
    }

    pub fn with_channel(
        action_name: impl Into<String>,
        schema: ActionSchema,
        channel: std::sync::Arc<ActionChannel<G, F, R>>,
    ) -> Self {
        Self {
            action_name: action_name.into(),
            schema,
            closed: false,
            next_goal_id: 0,
            heartbeat_timeout: Some(RtDuration::from_secs(5)),
            heartbeat_stalled_after: Some(RtDuration::from_secs(2)),
            channel: Some(channel),
            _marker: PhantomData,
        }
    }

    pub fn with_heartbeat_timeout(mut self, timeout: Option<RtDuration>) -> Self {
        self.heartbeat_timeout = timeout;
        self
    }

    pub fn with_stalled_threshold(mut self, stalled_after: Option<RtDuration>) -> Self {
        self.heartbeat_stalled_after = stalled_after;
        self
    }

    fn goal_health_at(&self, goal_id: ActionGoalId, now: Timestamp) -> Option<ActionSessionHealth> {
        let ch = self.channel.as_ref()?;

        let status = ch.statuses.lock().unwrap().get(&goal_id.0).copied()?;
        let last_heartbeat = ch.heartbeats.lock().unwrap().get(&goal_id.0).copied();
        let last_feedback = ch
            .feedback_timestamps
            .lock()
            .unwrap()
            .get(&goal_id.0)
            .copied();
        let last_result = ch
            .result_timestamps
            .lock()
            .unwrap()
            .get(&goal_id.0)
            .copied();

        let timeout_nanos = self.heartbeat_timeout.map(|value| value.as_nanos());
        let stalled_nanos = self.heartbeat_stalled_after.map(|value| value.as_nanos());

        let liveness = if status.is_terminal() {
            ActionLiveness::Completed
        } else if let (Some(last), Some(timeout)) = (last_heartbeat, timeout_nanos) {
            let elapsed = now.0.saturating_sub(last.0);
            if elapsed >= timeout {
                ActionLiveness::TimedOut
            } else if let Some(stalled_after) = stalled_nanos {
                if elapsed >= stalled_after {
                    ActionLiveness::Stalled
                } else {
                    ActionLiveness::Active
                }
            } else {
                ActionLiveness::Active
            }
        } else {
            ActionLiveness::Unknown
        };

        Some(ActionSessionHealth {
            goal_id,
            status,
            liveness,
            heartbeat_timeout_nanos: timeout_nanos,
            stalled_threshold_nanos: stalled_nanos,
            last_heartbeat_at_unix_nanos: last_heartbeat.map(|value| value.0),
            last_feedback_at_unix_nanos: last_feedback.map(|value| value.0),
            last_result_at_unix_nanos: last_result.map(|value| value.0),
        })
    }

    fn apply_heartbeat_timeouts_at(&self, now: Timestamp) -> usize {
        let Some(timeout) = self.heartbeat_timeout else {
            return 0;
        };
        let Some(ch) = &self.channel else {
            return 0;
        };

        let timeout_nanos = timeout.as_nanos();
        let expired_ids: Vec<u64> = {
            let statuses = ch.statuses.lock().unwrap();
            let heartbeats = ch.heartbeats.lock().unwrap();
            statuses
                .iter()
                .filter_map(|(goal_id, status)| {
                    if !matches!(status, GoalStatus::Executing | GoalStatus::Canceling) {
                        return None;
                    }
                    let last = heartbeats.get(goal_id).copied().unwrap_or(Timestamp(0));
                    let elapsed = now.0.saturating_sub(last.0);
                    if elapsed >= timeout_nanos {
                        Some(*goal_id)
                    } else {
                        None
                    }
                })
                .collect()
        };

        if expired_ids.is_empty() {
            return 0;
        }

        let mut statuses = ch.statuses.lock().unwrap();
        let mut results = ch.results.lock().unwrap();
        let mut heartbeats = ch.heartbeats.lock().unwrap();
        let mut transitioned = 0usize;

        for goal_id in expired_ids {
            let Some(status) = statuses.get_mut(&goal_id) else {
                continue;
            };
            if !matches!(*status, GoalStatus::Executing | GoalStatus::Canceling) {
                continue;
            }

            *status = GoalStatus::Failed;
            results.entry(goal_id).or_insert(ActionResult {
                status: GoalStatus::Failed,
                value: None,
                error: Some("action heartbeat timeout".to_string()),
            });
            heartbeats.remove(&goal_id);
            ch.result_timestamps.lock().unwrap().insert(goal_id, now);
            transitioned += 1;
        }

        transitioned
    }
}