Skip to main content

action_core/client_server/server/
trait_impl.rs

1use core_types::{ActionGoalId, ErrorCode, ErrorDomain, RtError, Timestamp};
2
3use crate::message::{ActionFeedback, ActionResult, ActionSchema, GoalAck, GoalStatus};
4
5use super::BasicActionServer;
6use crate::client_server::ActionServer;
7
8impl<G, F, R> ActionServer<G, F, R> for BasicActionServer<G, F, R>
9where
10    G: Send + 'static,
11    F: Send + 'static,
12    R: Send + 'static,
13{
14    fn action_name(&self) -> &str {
15        &self.action_name
16    }
17
18    fn schema(&self) -> &ActionSchema {
19        &self.schema
20    }
21
22    fn recv_goal(&mut self) -> Result<Option<(ActionGoalId, G)>, RtError> {
23        if self.closed {
24            return Err(RtError::new(
25                ErrorCode::InvalidState,
26                ErrorDomain::Action,
27                false,
28                "server is closed",
29            ));
30        }
31        if let Some(ch) = &self.channel {
32            return Ok(ch.goals.lock().unwrap().pop_front());
33        }
34        Ok(self.injected.pop_front())
35    }
36
37    fn accept_goal(&mut self, goal_id: ActionGoalId) -> Result<(), RtError> {
38        let ack = GoalAck {
39            accepted: true,
40            reason: None,
41        };
42        if let Some(ch) = &self.channel {
43            ch.acks.lock().unwrap().insert(goal_id.0, ack);
44            ch.statuses
45                .lock()
46                .unwrap()
47                .insert(goal_id.0, GoalStatus::Executing);
48            ch.heartbeats
49                .lock()
50                .unwrap()
51                .insert(goal_id.0, Timestamp::now());
52        }
53        Ok(())
54    }
55
56    fn reject_goal(
57        &mut self,
58        goal_id: ActionGoalId,
59        reason: impl Into<String>,
60    ) -> Result<(), RtError> {
61        let ack = GoalAck {
62            accepted: false,
63            reason: Some(reason.into()),
64        };
65        if let Some(ch) = &self.channel {
66            ch.acks.lock().unwrap().insert(goal_id.0, ack);
67            ch.statuses
68                .lock()
69                .unwrap()
70                .insert(goal_id.0, GoalStatus::Rejected);
71            ch.heartbeats.lock().unwrap().remove(&goal_id.0);
72            ch.results.lock().unwrap().insert(
73                goal_id.0,
74                ActionResult {
75                    status: GoalStatus::Rejected,
76                    value: None,
77                    error: None,
78                },
79            );
80            ch.result_timestamps
81                .lock()
82                .unwrap()
83                .insert(goal_id.0, Timestamp::now());
84        }
85        Ok(())
86    }
87
88    fn publish_feedback(&mut self, goal_id: ActionGoalId, feedback: F) -> Result<(), RtError> {
89        if self.closed {
90            return Err(RtError::new(
91                ErrorCode::InvalidState,
92                ErrorDomain::Action,
93                false,
94                "server is closed",
95            ));
96        }
97        let fb = ActionFeedback {
98            timestamp: Timestamp::now(),
99            value: feedback,
100        };
101        if let Some(ch) = &self.channel {
102            ch.feedbacks
103                .lock()
104                .unwrap()
105                .entry(goal_id.0)
106                .or_default()
107                .push_back(fb);
108            ch.heartbeats
109                .lock()
110                .unwrap()
111                .insert(goal_id.0, Timestamp::now());
112            ch.feedback_timestamps
113                .lock()
114                .unwrap()
115                .insert(goal_id.0, Timestamp::now());
116        }
117        Ok(())
118    }
119
120    fn heartbeat(&mut self, goal_id: ActionGoalId) -> Result<(), RtError> {
121        if self.closed {
122            return Err(RtError::new(
123                ErrorCode::InvalidState,
124                ErrorDomain::Action,
125                false,
126                "server is closed",
127            ));
128        }
129        if let Some(ch) = &self.channel {
130            ch.heartbeats
131                .lock()
132                .unwrap()
133                .insert(goal_id.0, Timestamp::now());
134        }
135        Ok(())
136    }
137
138    fn succeed(&mut self, goal_id: ActionGoalId, result: R) -> Result<(), RtError> {
139        if let Some(ch) = &self.channel {
140            ch.statuses
141                .lock()
142                .unwrap()
143                .insert(goal_id.0, GoalStatus::Succeeded);
144            ch.heartbeats.lock().unwrap().remove(&goal_id.0);
145            ch.results.lock().unwrap().insert(
146                goal_id.0,
147                ActionResult {
148                    status: GoalStatus::Succeeded,
149                    value: Some(result),
150                    error: None,
151                },
152            );
153            ch.result_timestamps
154                .lock()
155                .unwrap()
156                .insert(goal_id.0, Timestamp::now());
157        }
158        Ok(())
159    }
160
161    fn fail(&mut self, goal_id: ActionGoalId, reason: impl Into<String>) -> Result<(), RtError> {
162        if let Some(ch) = &self.channel {
163            ch.statuses
164                .lock()
165                .unwrap()
166                .insert(goal_id.0, GoalStatus::Failed);
167            ch.heartbeats.lock().unwrap().remove(&goal_id.0);
168            ch.results.lock().unwrap().insert(
169                goal_id.0,
170                ActionResult {
171                    status: GoalStatus::Failed,
172                    value: None,
173                    error: Some(reason.into()),
174                },
175            );
176            ch.result_timestamps
177                .lock()
178                .unwrap()
179                .insert(goal_id.0, Timestamp::now());
180        }
181        Ok(())
182    }
183
184    fn poll_cancel_request(&mut self) -> Option<ActionGoalId> {
185        let goal_id = self.channel.as_ref()?.cancels.lock().unwrap().pop_front()?;
186        if let Some(ch) = &self.channel {
187            ch.statuses
188                .lock()
189                .unwrap()
190                .insert(goal_id.0, GoalStatus::Canceling);
191            ch.heartbeats
192                .lock()
193                .unwrap()
194                .insert(goal_id.0, Timestamp::now());
195        }
196        Some(goal_id)
197    }
198
199    fn confirm_cancel(&mut self, goal_id: ActionGoalId) -> Result<(), RtError> {
200        if let Some(ch) = &self.channel {
201            ch.statuses
202                .lock()
203                .unwrap()
204                .insert(goal_id.0, GoalStatus::Canceled);
205            ch.heartbeats.lock().unwrap().remove(&goal_id.0);
206            ch.results.lock().unwrap().insert(
207                goal_id.0,
208                ActionResult {
209                    status: GoalStatus::Canceled,
210                    value: None,
211                    error: None,
212                },
213            );
214            ch.result_timestamps
215                .lock()
216                .unwrap()
217                .insert(goal_id.0, Timestamp::now());
218        }
219        Ok(())
220    }
221
222    fn close(&mut self) -> Result<(), RtError> {
223        self.closed = true;
224        Ok(())
225    }
226}