Skip to main content

action_core/client_server/client/
trait_impl.rs

1use core_types::{ActionGoalId, ErrorCode, ErrorDomain, RtError, Timestamp};
2
3use crate::message::{ActionFeedback, ActionResult, ActionSessionHealth, GoalAck, GoalStatus};
4
5use super::super::ActionClient;
6use super::BasicActionClient;
7
8impl<G, F, R> ActionClient<G, F, R> for BasicActionClient<G, F, R>
9where
10    G: Send + 'static,
11    F: Send + 'static,
12    R: Send + 'static,
13{
14    fn action_name(&self) -> &str {
15        &self.action_name
16    }
17
18    fn schema(&self) -> &crate::message::ActionSchema {
19        &self.schema
20    }
21
22    fn send_goal(&mut self, goal: G) -> Result<(ActionGoalId, GoalAck), RtError> {
23        if self.closed {
24            return Err(RtError::new(
25                ErrorCode::InvalidState,
26                ErrorDomain::Action,
27                false,
28                "client is closed",
29            ));
30        }
31        self.next_goal_id += 1;
32        let id = ActionGoalId::new(self.next_goal_id);
33
34        if let Some(ch) = &self.channel {
35            ch.goals.lock().unwrap().push_back((id, goal));
36            ch.statuses
37                .lock()
38                .unwrap()
39                .insert(id.0, GoalStatus::Accepted);
40            ch.heartbeats.lock().unwrap().insert(id.0, Timestamp::now());
41            // In same-process mode the server overwrites this ack via accept_goal/reject_goal.
42            // In an async runtime this would be a Future; here we return the default accepted=true
43            // and let test code control the server call order to verify semantics.
44            let ack = GoalAck {
45                accepted: true,
46                reason: None,
47            };
48            ch.acks.lock().unwrap().insert(id.0, ack.clone());
49            return Ok((id, ack));
50        }
51
52        // Standalone mode (no channel): always accept.
53        let ack = GoalAck {
54            accepted: true,
55            reason: None,
56        };
57        Ok((id, ack))
58    }
59
60    fn poll_feedback(&mut self, goal_id: ActionGoalId) -> Option<ActionFeedback<F>> {
61        self.apply_heartbeat_timeouts_at(Timestamp::now());
62        self.channel
63            .as_ref()?
64            .feedbacks
65            .lock()
66            .unwrap()
67            .get_mut(&goal_id.0)
68            .and_then(|q| q.pop_front())
69    }
70
71    fn poll_result(&mut self, goal_id: ActionGoalId) -> Option<ActionResult<R>> {
72        self.apply_heartbeat_timeouts_at(Timestamp::now());
73        let result = self
74            .channel
75            .as_ref()?
76            .results
77            .lock()
78            .unwrap()
79            .remove(&goal_id.0);
80        if result.is_some() && let Some(ch) = &self.channel {
81            ch.heartbeats.lock().unwrap().remove(&goal_id.0);
82        }
83        result
84    }
85
86    fn cancel(&mut self, goal_id: ActionGoalId) -> Result<(), RtError> {
87        if self.closed {
88            return Err(RtError::new(
89                ErrorCode::InvalidState,
90                ErrorDomain::Action,
91                false,
92                "client is closed",
93            ));
94        }
95        if let Some(ch) = &self.channel {
96            ch.cancels.lock().unwrap().push_back(goal_id);
97            ch.statuses
98                .lock()
99                .unwrap()
100                .insert(goal_id.0, GoalStatus::Canceling);
101            ch.heartbeats
102                .lock()
103                .unwrap()
104                .insert(goal_id.0, Timestamp::now());
105        }
106        Ok(())
107    }
108
109    fn goal_status(&self, goal_id: ActionGoalId) -> Option<GoalStatus> {
110        self.apply_heartbeat_timeouts_at(Timestamp::now());
111        self.channel
112            .as_ref()?
113            .statuses
114            .lock()
115            .unwrap()
116            .get(&goal_id.0)
117            .copied()
118    }
119
120    fn tick_timeouts(&mut self, now: Timestamp) -> usize {
121        self.apply_heartbeat_timeouts_at(now)
122    }
123
124    fn goal_health(&self, goal_id: ActionGoalId) -> Option<ActionSessionHealth> {
125        self.goal_health_at(goal_id, Timestamp::now())
126    }
127
128    fn active_goal_health(&self) -> Vec<ActionSessionHealth> {
129        let Some(ch) = &self.channel else {
130            return Vec::new();
131        };
132
133        let now = Timestamp::now();
134        let mut goal_ids = ch
135            .statuses
136            .lock()
137            .unwrap()
138            .keys()
139            .copied()
140            .collect::<Vec<_>>();
141        goal_ids.sort_unstable();
142
143        goal_ids
144            .into_iter()
145            .filter_map(|id| self.goal_health_at(ActionGoalId(id), now))
146            .collect()
147    }
148
149    fn close(&mut self) -> Result<(), RtError> {
150        self.closed = true;
151        Ok(())
152    }
153}