enet_client/
cmd.rs

1use crate::{
2  conn::{Connection, RecvError, SendError},
3  ConnectError,
4};
5use enet_proto::{
6  GetChannelInfoAllReq, GetChannelInfoAllRes, ItemSetValue, ItemValueRes, ItemValueSetReq,
7  ProjectListReq, ProjectListRes, RequestEnvelope, RequestType, Response, VersionReq, VersionRes,
8};
9use paste::paste;
10use std::{
11  convert::{TryFrom, TryInto},
12  fmt,
13  time::Duration,
14};
15use thiserror::Error;
16use tokio::{
17  net::ToSocketAddrs,
18  sync::{mpsc, oneshot},
19};
20use tracing::{event, Level};
21
22struct CommandActor<A>
23where
24  A: ToSocketAddrs + Clone + Send + Sync,
25{
26  conn: Option<Connection>,
27  addr: A,
28  recv: mpsc::Receiver<ActorMessage>,
29  response_listener: Option<ResponseListener>,
30}
31
32enum ActorMessage {
33  Send(RequestEnvelope, ResponseListener),
34}
35
36macro_rules! define_response_listener {
37  ($($res:ident($ty:ty)),*$(,)?) => {
38    enum ResponseListener {
39      $(
40        $res(oneshot::Sender<Result<$ty, CommandError>>),
41      )*
42    }
43
44    impl ResponseListener {
45      fn accept(self, res: Response) -> Result<(), (Option<Self>, Response)> {
46        match self {
47          $(
48            Self::$res(sender) => {
49              match res.try_into() {
50                Ok(msg) => {
51                  match sender.send(Ok(msg)) {
52                    Ok(()) => Ok(()),
53                    Err(msg) => Err((None, msg.unwrap().into())),
54                  }
55                }
56
57                Err(res) => Err((Some(Self::$res(sender)), res))
58              }
59            }
60          )*
61        }
62      }
63
64      fn error(self, error: CommandError) -> Result<(), CommandError> {
65        match self {
66          $(
67            Self::$res(sender) => sender.send(Err(error)).map_err(Result::unwrap_err),
68          )*
69        }
70      }
71    }
72
73    $(
74      impl From<oneshot::Sender<Result<$ty, CommandError>>> for ResponseListener {
75        #[inline]
76        fn from(sender: oneshot::Sender<Result<$ty, CommandError>>) -> Self {
77          Self::$res(sender)
78        }
79      }
80    )*
81  };
82}
83
84define_response_listener! {
85  Version(VersionRes),
86  GetChannelInfoAll(GetChannelInfoAllRes),
87  GetProject(ProjectListRes),
88  ItemValue(ItemValueRes),
89}
90
91impl<A> CommandActor<A>
92where
93  A: ToSocketAddrs + Clone + Send + Sync,
94{
95  fn new(conn: Connection, addr: A, recv: mpsc::Receiver<ActorMessage>) -> Self {
96    Self {
97      conn: Some(conn),
98      addr,
99      recv,
100      response_listener: None,
101    }
102  }
103
104  async fn run(mut self) {
105    loop {
106      let result = if let Some(conn) = self.conn.as_mut() {
107        let sleep = tokio::time::sleep(Duration::from_secs(15));
108
109        tokio::select! {
110          enet = conn.recv() => self.handle_enet(enet).await,
111          cmd = self.recv.recv() => self.handle_cmd(cmd).await,
112          _ = sleep => self.sleep().await,
113        }
114      } else {
115        let cmd = self.recv.recv().await;
116        self.handle_cmd(cmd).await
117      };
118
119      match result {
120        Ok(()) => (),
121        Err(()) => return,
122      }
123    }
124  }
125
126  async fn handle_enet(&mut self, msg: Result<Response, RecvError>) -> Result<(), ()> {
127    let msg = match msg {
128      Ok(v) => v,
129      Err(e) => {
130        event!(target: "enet-client::cmd", Level::ERROR, error = ?e, "connection closed");
131        if let Some(listener) = self.response_listener.take() {
132          let _ = listener.error(ConnectionClosed.into());
133        }
134        return Err(());
135      }
136    };
137
138    event!(target: "enet-client::cmd", Level::INFO, message.kind = ?msg.kind(), "received message");
139    match self.response_listener.take() {
140      None => {
141        event!(target: "enet-client::cmd", Level::WARN, message.kind = ?msg.kind(), "no listener available");
142        Ok(())
143      }
144
145      Some(listener) => match listener.accept(msg) {
146        Ok(()) => Ok(()),
147        Err((None, msg)) => {
148          event!(target: "enet-client::cmd", Level::INFO, message.kind = ?msg.kind(), "listener closed");
149          Ok(())
150        }
151
152        Err((Some(listener), msg)) => {
153          event!(target: "enet-client::cmd", Level::WARN, message.kind = ?msg.kind(), "wrong listener available");
154          let _ = listener.error(msg.into());
155          Ok(())
156        }
157      },
158    }
159  }
160
161  async fn handle_cmd(&mut self, msg: Option<ActorMessage>) -> Result<(), ()> {
162    let msg = if let Some(msg) = msg {
163      msg
164    } else {
165      return Err(());
166    };
167
168    match msg {
169      ActorMessage::Send(req, res) => {
170        self.response_listener = Some(res);
171        let conn = match self.conn.as_mut() {
172          Some(conn) => conn,
173          None => {
174            event!(target: "enet-client::cmd", Level::INFO, "Establishing new connection to eNet gateway.");
175            let conn = match Connection::new(self.addr.clone()).await {
176              Ok(conn) => conn,
177              Err(e) => {
178                event!(target: "enet-client::cmd", Level::ERROR, "Failed to establish connection to eNet gateway: {:?}", e);
179                return Err(());
180              }
181            };
182
183            self.conn = Some(conn);
184            self.conn.as_mut().unwrap()
185          }
186        };
187
188        let kind = req.body.kind();
189        event!(target: "enet-client::cmd", Level::INFO, message.kind = ?kind, "Sending message");
190        match conn.send(&req).await {
191          Ok(()) => (),
192          Err(e) => {
193            event!(target: "enet-client::cmd", Level::WARN, message.kind = ?kind, "Message failed to send");
194            if let Some(listener) = self.response_listener.take() {
195              let _ = listener.error(e.into());
196            }
197          }
198        }
199      }
200    }
201
202    Ok(())
203  }
204
205  async fn sleep(&mut self) -> Result<(), ()> {
206    event!(target: "enet-client::cmd", Level::INFO, "Closing command connection after 15 seconds of innactivity.");
207    self.conn.take(); // drop connection
208
209    Ok(())
210  }
211}
212
213trait Command: RequestType {
214  type Response: TryFrom<Response>;
215}
216
217pub(crate) struct CommandHandler {
218  sender: mpsc::Sender<ActorMessage>,
219}
220
221impl CommandHandler {
222  pub(crate) async fn new(
223    addr: impl ToSocketAddrs + Clone + Send + Sync + 'static,
224  ) -> Result<Self, ConnectError> {
225    let conn = Connection::new(addr.clone()).await?;
226    let (sender, recv) = mpsc::channel(10);
227    tokio::spawn(CommandActor::new(conn, addr, recv).run());
228
229    Ok(Self { sender })
230  }
231
232  async fn send<C>(&mut self, command: C) -> Result<C::Response, CommandError>
233  where
234    C: Command,
235    oneshot::Sender<Result<C::Response, CommandError>>: Into<ResponseListener>,
236  {
237    let envelope = RequestEnvelope::new(command);
238    let (sender, receiver) = oneshot::channel::<Result<C::Response, CommandError>>();
239    let msg = ActorMessage::Send(envelope, sender.into());
240    self.sender.send(msg).await?;
241
242    Ok(receiver.await??)
243  }
244}
245
246macro_rules! define_command {
247  ($name:ident$((
248    $($arg_i:ident : $arg_t:ty),*$(,)?
249  ))? => $req:ty => $res:ty) => {
250    impl Command for $req {
251      type Response = $res;
252    }
253
254    paste! {
255      #[non_exhaustive]
256      #[derive(Debug, Error)]
257      pub enum [<$name:camel CommandError>] {
258        Command(#[from] CommandError),
259      }
260
261      impl fmt::Display for [<$name:camel CommandError>] {
262        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
263          f.write_str("Failed to send '")?;
264          f.write_str(stringify!($name))?;
265          f.write_str("' command.")
266        }
267      }
268
269      impl CommandHandler {
270        #[allow(dead_code)]
271        pub(crate) async fn $name(
272          &mut self,
273          $($($arg_i: $arg_t,)*)?
274        ) -> Result<$res, [<$name:camel CommandError>]> {
275          let req = $req::new ($(
276            $($arg_i,)*
277          )?);
278
279          Ok(self.send(req).await?)
280        }
281      }
282    }
283  };
284}
285
286define_command!(get_version => VersionReq => VersionRes);
287define_command!(get_channel_info => GetChannelInfoAllReq => GetChannelInfoAllRes);
288define_command!(get_project => ProjectListReq => ProjectListRes);
289define_command!(set_values(values: Vec<ItemSetValue>) => ItemValueSetReq => ItemValueRes);
290
291#[non_exhaustive]
292#[derive(Debug, Error)]
293#[error("Failed to send command.")]
294pub enum CommandError {
295  SendError(#[from] SendError),
296
297  RecvError(#[from] RecvError),
298
299  ConnectionClosed(#[from] ConnectionClosed),
300
301  NoResponse(#[from] NoResponse),
302
303  WrongResponse(Response),
304}
305
306impl From<Response> for CommandError {
307  fn from(r: Response) -> CommandError {
308    CommandError::WrongResponse(r)
309  }
310}
311
312impl From<mpsc::error::SendError<ActorMessage>> for CommandError {
313  #[inline]
314  fn from(_: mpsc::error::SendError<ActorMessage>) -> Self {
315    CommandError::ConnectionClosed(ConnectionClosed)
316  }
317}
318
319impl From<oneshot::error::RecvError> for CommandError {
320  #[inline]
321  fn from(_: oneshot::error::RecvError) -> Self {
322    CommandError::NoResponse(NoResponse)
323  }
324}
325
326#[derive(Debug, Error)]
327#[error("Connection closed.")]
328pub struct ConnectionClosed;
329
330#[derive(Debug, Error)]
331#[error("Actor did not respond closed.")]
332pub struct NoResponse;