1use crate::{
2 conn::{Connection, RecvError, SendError},
3 ConnectError,
4};
5use enet_proto::{
6 GetChannelInfoAllReq, GetChannelInfoAllRes, ItemSetValue, ItemValueRes, ItemValueSetReq,
7 ProjectListReq, ProjectListRes, RequestEnvelope, RequestType, Response, VersionReq, VersionRes,
8};
9use paste::paste;
10use std::{
11 convert::{TryFrom, TryInto},
12 fmt,
13 time::Duration,
14};
15use thiserror::Error;
16use tokio::{
17 net::ToSocketAddrs,
18 sync::{mpsc, oneshot},
19};
20use tracing::{event, Level};
21
22struct CommandActor<A>
23where
24 A: ToSocketAddrs + Clone + Send + Sync,
25{
26 conn: Option<Connection>,
27 addr: A,
28 recv: mpsc::Receiver<ActorMessage>,
29 response_listener: Option<ResponseListener>,
30}
31
/// Message accepted by the `CommandActor` over its mpsc channel.
enum ActorMessage {
    /// Send the request to the gateway and report the outcome through the listener.
    Send(RequestEnvelope, ResponseListener),
}
35
/// Generates the `ResponseListener` enum from a list of `Variant(ResponseType)` pairs.
///
/// Besides the enum itself this emits its `accept` / `error` methods and, for every
/// pair, a `From<oneshot::Sender<Result<ResponseType, CommandError>>>` conversion.
macro_rules! define_response_listener {
    ($($res:ident($ty:ty)),*$(,)?) => {
        // Each variant wraps the reply channel for one concrete response type.
        enum ResponseListener {
            $(
                $res(oneshot::Sender<Result<$ty, CommandError>>),
            )*
        }

        impl ResponseListener {
            // Converts `res` into the type this listener expects and delivers it.
            //
            // On failure the raw response is handed back, together with
            //  * `Some(listener)` when the response could not be converted (the listener
            //    is still usable), or
            //  * `None` when the receiving side of the reply channel is already gone.
            fn accept(self, res: Response) -> Result<(), (Option<Self>, Response)> {
                match self {
                    $(
                        Self::$res(sender) => {
                            let payload: $ty = match res.try_into() {
                                Ok(payload) => payload,
                                Err(unconverted) => {
                                    return Err((Some(Self::$res(sender)), unconverted));
                                }
                            };

                            match sender.send(Ok(payload)) {
                                Ok(()) => Ok(()),
                                // `send` returns the rejected value; turn it back into a
                                // raw response for the caller.
                                Err(undelivered) => Err((None, undelivered.unwrap().into())),
                            }
                        }
                    )*
                }
            }

            // Delivers `error` to the waiting side; returns the error again if nobody
            // is listening any more.
            fn error(self, error: CommandError) -> Result<(), CommandError> {
                match self {
                    $(
                        Self::$res(sender) => match sender.send(Err(error)) {
                            Ok(()) => Ok(()),
                            Err(rejected) => Err(rejected.unwrap_err()),
                        },
                    )*
                }
            }
        }

        $(
            impl From<oneshot::Sender<Result<$ty, CommandError>>> for ResponseListener {
                #[inline]
                fn from(sender: oneshot::Sender<Result<$ty, CommandError>>) -> Self {
                    ResponseListener::$res(sender)
                }
            }
        )*
    };
}
83
// One listener variant per response type that a command can resolve to.
define_response_listener! {
    Version(VersionRes),
    GetChannelInfoAll(GetChannelInfoAllRes),
    GetProject(ProjectListRes),
    ItemValue(ItemValueRes),
}
90
91impl<A> CommandActor<A>
92where
93 A: ToSocketAddrs + Clone + Send + Sync,
94{
95 fn new(conn: Connection, addr: A, recv: mpsc::Receiver<ActorMessage>) -> Self {
96 Self {
97 conn: Some(conn),
98 addr,
99 recv,
100 response_listener: None,
101 }
102 }
103
104 async fn run(mut self) {
105 loop {
106 let result = if let Some(conn) = self.conn.as_mut() {
107 let sleep = tokio::time::sleep(Duration::from_secs(15));
108
109 tokio::select! {
110 enet = conn.recv() => self.handle_enet(enet).await,
111 cmd = self.recv.recv() => self.handle_cmd(cmd).await,
112 _ = sleep => self.sleep().await,
113 }
114 } else {
115 let cmd = self.recv.recv().await;
116 self.handle_cmd(cmd).await
117 };
118
119 match result {
120 Ok(()) => (),
121 Err(()) => return,
122 }
123 }
124 }
125
126 async fn handle_enet(&mut self, msg: Result<Response, RecvError>) -> Result<(), ()> {
127 let msg = match msg {
128 Ok(v) => v,
129 Err(e) => {
130 event!(target: "enet-client::cmd", Level::ERROR, error = ?e, "connection closed");
131 if let Some(listener) = self.response_listener.take() {
132 let _ = listener.error(ConnectionClosed.into());
133 }
134 return Err(());
135 }
136 };
137
138 event!(target: "enet-client::cmd", Level::INFO, message.kind = ?msg.kind(), "received message");
139 match self.response_listener.take() {
140 None => {
141 event!(target: "enet-client::cmd", Level::WARN, message.kind = ?msg.kind(), "no listener available");
142 Ok(())
143 }
144
145 Some(listener) => match listener.accept(msg) {
146 Ok(()) => Ok(()),
147 Err((None, msg)) => {
148 event!(target: "enet-client::cmd", Level::INFO, message.kind = ?msg.kind(), "listener closed");
149 Ok(())
150 }
151
152 Err((Some(listener), msg)) => {
153 event!(target: "enet-client::cmd", Level::WARN, message.kind = ?msg.kind(), "wrong listener available");
154 let _ = listener.error(msg.into());
155 Ok(())
156 }
157 },
158 }
159 }
160
161 async fn handle_cmd(&mut self, msg: Option<ActorMessage>) -> Result<(), ()> {
162 let msg = if let Some(msg) = msg {
163 msg
164 } else {
165 return Err(());
166 };
167
168 match msg {
169 ActorMessage::Send(req, res) => {
170 self.response_listener = Some(res);
171 let conn = match self.conn.as_mut() {
172 Some(conn) => conn,
173 None => {
174 event!(target: "enet-client::cmd", Level::INFO, "Establishing new connection to eNet gateway.");
175 let conn = match Connection::new(self.addr.clone()).await {
176 Ok(conn) => conn,
177 Err(e) => {
178 event!(target: "enet-client::cmd", Level::ERROR, "Failed to establish connection to eNet gateway: {:?}", e);
179 return Err(());
180 }
181 };
182
183 self.conn = Some(conn);
184 self.conn.as_mut().unwrap()
185 }
186 };
187
188 let kind = req.body.kind();
189 event!(target: "enet-client::cmd", Level::INFO, message.kind = ?kind, "Sending message");
190 match conn.send(&req).await {
191 Ok(()) => (),
192 Err(e) => {
193 event!(target: "enet-client::cmd", Level::WARN, message.kind = ?kind, "Message failed to send");
194 if let Some(listener) = self.response_listener.take() {
195 let _ = listener.error(e.into());
196 }
197 }
198 }
199 }
200 }
201
202 Ok(())
203 }
204
205 async fn sleep(&mut self) -> Result<(), ()> {
206 event!(target: "enet-client::cmd", Level::INFO, "Closing command connection after 15 seconds of innactivity.");
207 self.conn.take(); Ok(())
210 }
211}
212
/// A request type paired with the response type expected in reply to it.
trait Command: RequestType {
    /// Response type this command resolves to; it must be convertible from a raw `Response`.
    type Response: TryFrom<Response>;
}
216
/// Handle used to submit commands to the `CommandActor` task spawned by `new`.
pub(crate) struct CommandHandler {
    /// Sending end of the queue read by the actor.
    sender: mpsc::Sender<ActorMessage>,
}
220
221impl CommandHandler {
222 pub(crate) async fn new(
223 addr: impl ToSocketAddrs + Clone + Send + Sync + 'static,
224 ) -> Result<Self, ConnectError> {
225 let conn = Connection::new(addr.clone()).await?;
226 let (sender, recv) = mpsc::channel(10);
227 tokio::spawn(CommandActor::new(conn, addr, recv).run());
228
229 Ok(Self { sender })
230 }
231
232 async fn send<C>(&mut self, command: C) -> Result<C::Response, CommandError>
233 where
234 C: Command,
235 oneshot::Sender<Result<C::Response, CommandError>>: Into<ResponseListener>,
236 {
237 let envelope = RequestEnvelope::new(command);
238 let (sender, receiver) = oneshot::channel::<Result<C::Response, CommandError>>();
239 let msg = ActorMessage::Send(envelope, sender.into());
240 self.sender.send(msg).await?;
241
242 Ok(receiver.await??)
243 }
244}
245
/// Wires up one command: `name(args) => RequestType => ResponseType`.
///
/// Expands to the `Command` impl for the request type, a dedicated
/// `<Name>CommandError` enum with its `Display` impl, and an async method `name` on
/// `CommandHandler` that builds the request via `RequestType::new(args)` and sends it.
macro_rules! define_command {
    ($name:ident$((
        $($arg_i:ident : $arg_t:ty),*$(,)?
    ))? => $req:ty => $res:ty) => {
        impl Command for $req {
            type Response = $res;
        }

        paste! {
            #[non_exhaustive]
            #[derive(Debug, Error)]
            pub enum [<$name:camel CommandError>] {
                Command(#[from] CommandError),
            }

            impl fmt::Display for [<$name:camel CommandError>] {
                fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                    // The message is assembled at compile time from the command name.
                    f.write_str(concat!("Failed to send '", stringify!($name), "' command."))
                }
            }

            impl CommandHandler {
                #[allow(dead_code)]
                pub(crate) async fn $name(
                    &mut self,
                    $($($arg_i: $arg_t,)*)?
                ) -> Result<$res, [<$name:camel CommandError>]> {
                    let request = <$req>::new($($($arg_i,)*)?);

                    self.send(request).await.map_err(Into::into)
                }
            }
        }
    };
}
285
// Commands exposed on `CommandHandler`: method name => request type => response type.
define_command!(get_version => VersionReq => VersionRes);
define_command!(get_channel_info => GetChannelInfoAllReq => GetChannelInfoAllRes);
define_command!(get_project => ProjectListReq => ProjectListRes);
define_command!(set_values(values: Vec<ItemSetValue>) => ItemValueSetReq => ItemValueRes);
290
/// Error returned when a command could not be completed.
///
/// Every variant displays the same message; the wrapped value carries the detail.
#[non_exhaustive]
#[derive(Debug, Error)]
#[error("Failed to send command.")]
pub enum CommandError {
    /// Writing the request to the connection failed.
    SendError(#[from] SendError),

    /// Wraps a receive error from the connection layer.
    RecvError(#[from] RecvError),

    /// The gateway connection or the channel to the command actor was closed.
    ConnectionClosed(#[from] ConnectionClosed),

    /// The actor dropped the reply channel without sending a result.
    NoResponse(#[from] NoResponse),

    /// A response arrived whose type does not match what the pending command expects.
    WrongResponse(Response),
}
305
306impl From<Response> for CommandError {
307 fn from(r: Response) -> CommandError {
308 CommandError::WrongResponse(r)
309 }
310}
311
312impl From<mpsc::error::SendError<ActorMessage>> for CommandError {
313 #[inline]
314 fn from(_: mpsc::error::SendError<ActorMessage>) -> Self {
315 CommandError::ConnectionClosed(ConnectionClosed)
316 }
317}
318
319impl From<oneshot::error::RecvError> for CommandError {
320 #[inline]
321 fn from(_: oneshot::error::RecvError) -> Self {
322 CommandError::NoResponse(NoResponse)
323 }
324}
325
/// Marker error: the gateway connection, or the channel to the command actor, is closed.
#[derive(Debug, Error)]
#[error("Connection closed.")]
pub struct ConnectionClosed;
329
330#[derive(Debug, Error)]
331#[error("Actor did not respond closed.")]
332pub struct NoResponse;