use std::marker::PhantomData;
use core_types::{ActionGoalId, RtDuration, Timestamp};
use crate::message::{ActionLiveness, ActionResult, ActionSchema, ActionSessionHealth, GoalStatus};
use super::ActionChannel;
mod trait_impl;
pub struct BasicActionClient<G, F, R> {
action_name: String,
schema: ActionSchema,
closed: bool,
next_goal_id: u64,
heartbeat_timeout: Option<RtDuration>,
heartbeat_stalled_after: Option<RtDuration>,
channel: Option<std::sync::Arc<ActionChannel<G, F, R>>>,
_marker: PhantomData<(G, F, R)>,
}
impl<G, F, R> BasicActionClient<G, F, R> {
pub fn new(action_name: impl Into<String>, schema: ActionSchema) -> Self {
Self {
action_name: action_name.into(),
schema,
closed: false,
next_goal_id: 0,
heartbeat_timeout: Some(RtDuration::from_secs(5)),
heartbeat_stalled_after: Some(RtDuration::from_secs(2)),
channel: None,
_marker: PhantomData,
}
}
pub fn with_channel(
action_name: impl Into<String>,
schema: ActionSchema,
channel: std::sync::Arc<ActionChannel<G, F, R>>,
) -> Self {
Self {
action_name: action_name.into(),
schema,
closed: false,
next_goal_id: 0,
heartbeat_timeout: Some(RtDuration::from_secs(5)),
heartbeat_stalled_after: Some(RtDuration::from_secs(2)),
channel: Some(channel),
_marker: PhantomData,
}
}
pub fn with_heartbeat_timeout(mut self, timeout: Option<RtDuration>) -> Self {
self.heartbeat_timeout = timeout;
self
}
pub fn with_stalled_threshold(mut self, stalled_after: Option<RtDuration>) -> Self {
self.heartbeat_stalled_after = stalled_after;
self
}
fn goal_health_at(&self, goal_id: ActionGoalId, now: Timestamp) -> Option<ActionSessionHealth> {
let ch = self.channel.as_ref()?;
let status = ch.statuses.lock().unwrap().get(&goal_id.0).copied()?;
let last_heartbeat = ch.heartbeats.lock().unwrap().get(&goal_id.0).copied();
let last_feedback = ch
.feedback_timestamps
.lock()
.unwrap()
.get(&goal_id.0)
.copied();
let last_result = ch
.result_timestamps
.lock()
.unwrap()
.get(&goal_id.0)
.copied();
let timeout_nanos = self.heartbeat_timeout.map(|value| value.as_nanos());
let stalled_nanos = self.heartbeat_stalled_after.map(|value| value.as_nanos());
let liveness = if status.is_terminal() {
ActionLiveness::Completed
} else if let (Some(last), Some(timeout)) = (last_heartbeat, timeout_nanos) {
let elapsed = now.0.saturating_sub(last.0);
if elapsed >= timeout {
ActionLiveness::TimedOut
} else if let Some(stalled_after) = stalled_nanos {
if elapsed >= stalled_after {
ActionLiveness::Stalled
} else {
ActionLiveness::Active
}
} else {
ActionLiveness::Active
}
} else {
ActionLiveness::Unknown
};
Some(ActionSessionHealth {
goal_id,
status,
liveness,
heartbeat_timeout_nanos: timeout_nanos,
stalled_threshold_nanos: stalled_nanos,
last_heartbeat_at_unix_nanos: last_heartbeat.map(|value| value.0),
last_feedback_at_unix_nanos: last_feedback.map(|value| value.0),
last_result_at_unix_nanos: last_result.map(|value| value.0),
})
}
fn apply_heartbeat_timeouts_at(&self, now: Timestamp) -> usize {
let Some(timeout) = self.heartbeat_timeout else {
return 0;
};
let Some(ch) = &self.channel else {
return 0;
};
let timeout_nanos = timeout.as_nanos();
let expired_ids: Vec<u64> = {
let statuses = ch.statuses.lock().unwrap();
let heartbeats = ch.heartbeats.lock().unwrap();
statuses
.iter()
.filter_map(|(goal_id, status)| {
if !matches!(status, GoalStatus::Executing | GoalStatus::Canceling) {
return None;
}
let last = heartbeats.get(goal_id).copied().unwrap_or(Timestamp(0));
let elapsed = now.0.saturating_sub(last.0);
if elapsed >= timeout_nanos {
Some(*goal_id)
} else {
None
}
})
.collect()
};
if expired_ids.is_empty() {
return 0;
}
let mut statuses = ch.statuses.lock().unwrap();
let mut results = ch.results.lock().unwrap();
let mut heartbeats = ch.heartbeats.lock().unwrap();
let mut transitioned = 0usize;
for goal_id in expired_ids {
let Some(status) = statuses.get_mut(&goal_id) else {
continue;
};
if !matches!(*status, GoalStatus::Executing | GoalStatus::Canceling) {
continue;
}
*status = GoalStatus::Failed;
results.entry(goal_id).or_insert(ActionResult {
status: GoalStatus::Failed,
value: None,
error: Some("action heartbeat timeout".to_string()),
});
heartbeats.remove(&goal_id);
ch.result_timestamps.lock().unwrap().insert(goal_id, now);
transitioned += 1;
}
transitioned
}
}