// action_core/client_server/client/trait_impl.rs

use core_types::{ActionGoalId, ErrorCode, ErrorDomain, RtError, Timestamp};

use crate::message::{ActionFeedback, ActionResult, ActionSessionHealth, GoalAck, GoalStatus};

use super::super::ActionClient;
use super::BasicActionClient;

8impl<G, F, R> ActionClient<G, F, R> for BasicActionClient<G, F, R>
9where
10 G: Send + 'static,
11 F: Send + 'static,
12 R: Send + 'static,
13{
14 fn action_name(&self) -> &str {
15 &self.action_name
16 }
17
18 fn schema(&self) -> &crate::message::ActionSchema {
19 &self.schema
20 }
21
22 fn send_goal(&mut self, goal: G) -> Result<(ActionGoalId, GoalAck), RtError> {
23 if self.closed {
24 return Err(RtError::new(
25 ErrorCode::InvalidState,
26 ErrorDomain::Action,
27 false,
28 "client is closed",
29 ));
30 }
31 self.next_goal_id += 1;
32 let id = ActionGoalId::new(self.next_goal_id);
33
34 if let Some(ch) = &self.channel {
35 ch.goals.lock().unwrap().push_back((id, goal));
36 ch.statuses
37 .lock()
38 .unwrap()
39 .insert(id.0, GoalStatus::Accepted);
40 ch.heartbeats.lock().unwrap().insert(id.0, Timestamp::now());
41 let ack = GoalAck {
45 accepted: true,
46 reason: None,
47 };
48 ch.acks.lock().unwrap().insert(id.0, ack.clone());
49 return Ok((id, ack));
50 }
51
52 let ack = GoalAck {
54 accepted: true,
55 reason: None,
56 };
57 Ok((id, ack))
58 }
59
60 fn poll_feedback(&mut self, goal_id: ActionGoalId) -> Option<ActionFeedback<F>> {
61 self.apply_heartbeat_timeouts_at(Timestamp::now());
62 self.channel
63 .as_ref()?
64 .feedbacks
65 .lock()
66 .unwrap()
67 .get_mut(&goal_id.0)
68 .and_then(|q| q.pop_front())
69 }
70
71 fn poll_result(&mut self, goal_id: ActionGoalId) -> Option<ActionResult<R>> {
72 self.apply_heartbeat_timeouts_at(Timestamp::now());
73 let result = self
74 .channel
75 .as_ref()?
76 .results
77 .lock()
78 .unwrap()
79 .remove(&goal_id.0);
80 if result.is_some() && let Some(ch) = &self.channel {
81 ch.heartbeats.lock().unwrap().remove(&goal_id.0);
82 }
83 result
84 }
85
86 fn cancel(&mut self, goal_id: ActionGoalId) -> Result<(), RtError> {
87 if self.closed {
88 return Err(RtError::new(
89 ErrorCode::InvalidState,
90 ErrorDomain::Action,
91 false,
92 "client is closed",
93 ));
94 }
95 if let Some(ch) = &self.channel {
96 ch.cancels.lock().unwrap().push_back(goal_id);
97 ch.statuses
98 .lock()
99 .unwrap()
100 .insert(goal_id.0, GoalStatus::Canceling);
101 ch.heartbeats
102 .lock()
103 .unwrap()
104 .insert(goal_id.0, Timestamp::now());
105 }
106 Ok(())
107 }
108
109 fn goal_status(&self, goal_id: ActionGoalId) -> Option<GoalStatus> {
110 self.apply_heartbeat_timeouts_at(Timestamp::now());
111 self.channel
112 .as_ref()?
113 .statuses
114 .lock()
115 .unwrap()
116 .get(&goal_id.0)
117 .copied()
118 }
119
120 fn tick_timeouts(&mut self, now: Timestamp) -> usize {
121 self.apply_heartbeat_timeouts_at(now)
122 }
123
124 fn goal_health(&self, goal_id: ActionGoalId) -> Option<ActionSessionHealth> {
125 self.goal_health_at(goal_id, Timestamp::now())
126 }
127
128 fn active_goal_health(&self) -> Vec<ActionSessionHealth> {
129 let Some(ch) = &self.channel else {
130 return Vec::new();
131 };
132
133 let now = Timestamp::now();
134 let mut goal_ids = ch
135 .statuses
136 .lock()
137 .unwrap()
138 .keys()
139 .copied()
140 .collect::<Vec<_>>();
141 goal_ids.sort_unstable();
142
143 goal_ids
144 .into_iter()
145 .filter_map(|id| self.goal_health_at(ActionGoalId(id), now))
146 .collect()
147 }
148
149 fn close(&mut self) -> Result<(), RtError> {
150 self.closed = true;
151 Ok(())
152 }
153}