1use anyhow::{bail, Result};
49use serde::{de::DeserializeOwned, Serialize};
50use tokio::{
51 io::{AsyncReadExt, AsyncWriteExt},
52 net::{TcpListener, TcpStream},
53 sync::watch::{self, Receiver, Sender},
54 task::JoinHandle,
55};
56
/// Marker byte that opens every package, both received and sent.
const REMOTEXY_PACKAGE_START_BYTE: u8 = 0x55;

/// Size of the package header in bytes: the start byte followed by the
/// 16-bit little-endian length of the whole package.
const HEADER_LEN: usize = 3;
59
60pub struct RemoteXY<I, O, const ISIZE: usize, const OSIZE: usize> {
62 input_rx: Receiver<[u8; ISIZE]>,
65 output_tx: Sender<[u8; OSIZE]>,
66 is_connected_rx: Receiver<bool>,
67 server_handle: JoinHandle<()>,
68 phantom: std::marker::PhantomData<(I, O)>,
69}
70
/// Creates a [`RemoteXY`] for the given input and output types.
///
/// Arguments, in order: the input type, the output type, the server address
/// and the configuration buffer. The const generic sizes are derived with
/// `size_of` of the two types, and the macro expands to a call of
/// `RemoteXY::new`, i.e. to a future that must be awaited and resolves to a
/// `Result`.
///
/// The type is referenced through `$crate`, so callers do not need to import
/// `RemoteXY` themselves. A trailing comma after the last argument is
/// accepted.
///
/// NOTE(review): the sizes come from `size_of`, while the data itself is
/// serialized with bincode; types containing padding will not have matching
/// sizes — confirm the intended constraints on `$i` / `$o`.
#[macro_export]
macro_rules! remote_xy {
    ( $i:ty , $o:ty , $addr:expr, $conf:expr $(,)? ) => {{
        const ISIZE: usize = ::std::mem::size_of::<$i>();
        const OSIZE: usize = ::std::mem::size_of::<$o>();
        $crate::RemoteXY::<$i, $o, ISIZE, OSIZE>::new($addr, $conf)
    }};
}
82
83impl<I, O, const ISIZE: usize, const OSIZE: usize> RemoteXY<I, O, ISIZE, OSIZE>
84where
85 I: Serialize + DeserializeOwned + Default,
86 O: Serialize + DeserializeOwned + Default,
87{
88 pub async fn new(server_addr: &str, conf_buffer: &[u8]) -> Result<Self> {
93 let input_len;
95 let output_len;
96 let conf_len;
97 let gui_config: Vec<u8>;
98 if conf_buffer[0] == 0xff {
99 input_len = conf_buffer[1] as usize | ((conf_buffer[2] as usize) << 8);
100 output_len = conf_buffer[3] as usize | ((conf_buffer[4] as usize) << 8);
101 conf_len = conf_buffer[5] as usize | ((conf_buffer[6] as usize) << 8);
102 gui_config = conf_buffer[7..].to_vec();
103 } else {
104 input_len = conf_buffer[0] as usize;
105 output_len = conf_buffer[1] as usize;
106 conf_len = conf_buffer[2] as usize | ((conf_buffer[3] as usize) << 8);
107 gui_config = conf_buffer[4..].to_vec();
108 };
109 if conf_len != gui_config.len() {
110 bail!("Config buffer length mismatch");
111 }
112 if input_len != ISIZE {
113 bail!("Input data length mismatch");
114 }
115 if output_len != OSIZE {
116 bail!("Output data length mismatch");
117 }
118
119 let mut input_default = [0_u8; ISIZE];
121 input_default.copy_from_slice(&bincode::serialize(&I::default())?);
122 let mut output_default = [0_u8; OSIZE];
123 output_default.copy_from_slice(&bincode::serialize(&O::default())?);
124 let (input_tx, input_rx) = watch::channel(input_default);
125 let (output_tx, output_rx) = watch::channel(output_default);
126 let (is_connected_tx, is_connected_rx) = watch::channel(false);
127
128 let mut server = RemoteXYServer {
130 server_addr: server_addr.into(),
131 gui_config,
132 input_tx,
133 output_rx,
134 is_connected_tx,
135 };
136 let server_handle =
137 tokio::spawn(async move { server.run().await.expect("Unexpected error in server") });
138
139 Ok(Self {
140 input_rx,
141 output_tx,
142 is_connected_rx,
143 server_handle,
144 phantom: std::marker::PhantomData,
145 })
146 }
147
148 pub fn get_input(&self) -> I {
150 bincode::deserialize(&*self.input_rx.borrow()).expect("Failed to deserialize input data")
151 }
152
153 pub fn set_output(&self, data: &O) {
155 let mut buffer = [0_u8; OSIZE];
156 buffer
157 .copy_from_slice(&bincode::serialize(&data).expect("Failed to serialize output data"));
158 self.output_tx
159 .send(buffer)
160 .expect("Failed to send output data");
161 }
162
163 pub fn is_connected(&self) -> bool {
165 *self.is_connected_rx.borrow()
166 }
167}
168
169impl<I, O, const ISIZE: usize, const OSIZE: usize> Drop for RemoteXY<I, O, ISIZE, OSIZE> {
170 fn drop(&mut self) {
171 log::info!("Closing server");
172 self.server_handle.abort();
173 }
174}
175
176struct RemoteXYServer<const ISIZE: usize, const OSIZE: usize> {
177 server_addr: String,
178 gui_config: Vec<u8>,
179 input_tx: Sender<[u8; ISIZE]>,
180 output_rx: Receiver<[u8; OSIZE]>,
181 is_connected_tx: Sender<bool>,
182}
183
184impl<const ISIZE: usize, const OSIZE: usize> RemoteXYServer<ISIZE, OSIZE> {
185 pub async fn run(&mut self) -> Result<()> {
186 log::info!("Starting server at {}", &self.server_addr);
187 let tcp_listener = TcpListener::bind(&self.server_addr).await?;
188
189 loop {
190 let (mut stream, _) = tcp_listener.accept().await?;
191 log::debug!("New connection with {}", stream.peer_addr()?);
192 self.is_connected_tx.send(true)?;
193
194 loop {
195 if self.handle_connection(&mut stream).await.is_err() {
196 self.is_connected_tx.send(false)?;
197 log::info!("Connection closed, listening for new connections...");
198 break;
199 }
200 }
201 }
202 }
203
204 async fn handle_connection(&mut self, stream: &mut TcpStream) -> Result<()> {
205 let mut header = [0; HEADER_LEN];
207 stream.read_exact(&mut header).await?;
208 if header[0] != REMOTEXY_PACKAGE_START_BYTE {
209 bail!("Invalid package start byte");
210 }
211 let package_length = header[1] as u16 | ((header[2] as u16) << 8);
212
213 let mut body = vec![0; package_length as usize - HEADER_LEN];
215 stream.read_exact(&mut body).await?;
216
217 if !check_crc(&header, &body) {
218 log::warn!("Invalid CRC, package is discarded");
219 return Ok(());
220 }
221 let payload = &body[0..body.len() - 2];
222 let response = self.parse_payload(payload).await;
223 stream.write_all(&response).await?;
224 stream.flush().await?;
225
226 Ok(())
227 }
228
229 async fn parse_payload(&mut self, payload: &[u8]) -> Vec<u8> {
230 let command = payload[0];
231 match command {
232 0x00 => self.gui_conf_response(),
233 0x40 => self.data_response(),
234 0x80 => {
235 let mut buffer = [0_u8; ISIZE];
236 buffer.copy_from_slice(&payload[1..]);
237 self.input_tx.send(buffer).unwrap();
238
239 self.ack_input()
240 }
241 0xC0 => self.output_data_response(),
242 _ => {
243 log::warn!("Received unknown command: 0x{command:X}");
244 vec![]
245 }
246 }
247 }
248
249 fn data_response(&self) -> Vec<u8> {
250 let mut input_and_output_buffer = Vec::from(*self.input_tx.borrow());
251 input_and_output_buffer.extend_from_slice(&*self.output_rx.borrow());
252 create_response(0x40, &input_and_output_buffer)
253 }
254
255 fn output_data_response(&self) -> Vec<u8> {
256 let output_buffer = *self.output_rx.borrow();
257 create_response(0xC0, &output_buffer)
258 }
259
260 fn ack_input(&self) -> Vec<u8> {
261 create_response(0x80, &[])
262 }
263
264 fn gui_conf_response(&self) -> Vec<u8> {
265 create_response(0x00, &self.gui_config)
266 }
267}
268
269fn create_response(cmd: u8, buf: &[u8]) -> Vec<u8> {
270 let package_length = buf.len() + 6;
271 let mut send_buffer = vec![0; package_length];
272
273 send_buffer[0] = REMOTEXY_PACKAGE_START_BYTE;
274 send_buffer[1] = package_length as u8;
275 send_buffer[2] = (package_length >> 8) as u8;
276 send_buffer[3] = cmd;
277 send_buffer[4..package_length - 2].copy_from_slice(buf);
278
279 let crc = calc_crc(&send_buffer[0..package_length - 2]);
280 send_buffer[package_length - 2] = crc as u8;
281 send_buffer[package_length - 1] = (crc >> 8) as u8;
282 send_buffer
283}
284
/// Checks the CRC stored in the last two bytes of `body`.
///
/// The checksum is computed over `header` followed by `body` without its last
/// two bytes, and compared with those two bytes read as a little-endian
/// `u16`. Returns `false` when `body` is too short to contain a CRC at all.
fn check_crc(header: &[u8], body: &[u8]) -> bool {
    let payload_len = match body.len().checked_sub(2) {
        Some(len) => len,
        None => return false,
    };
    let (payload, trailer) = body.split_at(payload_len);
    let received = u16::from_le_bytes([trailer[0], trailer[1]]);
    // Continue the header checksum over the payload instead of concatenating.
    let computed = calc_crc_with_start_value(payload, calc_crc(header));
    received == computed
}

/// Computes the CRC of `buf`, starting from the initial value `0xFFFF`.
fn calc_crc(buf: &[u8]) -> u16 {
    const REMOTEXY_INIT_CRC: u16 = 0xFFFF;
    calc_crc_with_start_value(buf, REMOTEXY_INIT_CRC)
}

/// Feeds `buf` into a running CRC-16 whose current value is `start_val`.
///
/// Bitwise, least-significant-bit-first implementation with the polynomial
/// `0xA001` and no final XOR (the same parameters as CRC-16/MODBUS when
/// started from `0xFFFF`). An empty `buf` returns `start_val` unchanged.
fn calc_crc_with_start_value(buf: &[u8], start_val: u16) -> u16 {
    const POLYNOMIAL: u16 = 0xA001;
    buf.iter().fold(start_val, |crc, &byte| {
        (0..8).fold(crc ^ u16::from(byte), |crc, _| {
            if crc & 1 == 0 {
                crc >> 1
            } else {
                (crc >> 1) ^ POLYNOMIAL
            }
        })
    })
}
309
#[cfg(test)]
mod tests {
    use serde::{Deserialize, Serialize};
    use tokio::{
        io::{AsyncReadExt, AsyncWriteExt},
        net::TcpStream,
        time::timeout,
    };

    use crate::{check_crc, RemoteXY};
    use std::time::Duration;

    /// Configuration in the extended form: marker `255`, input length 2,
    /// output length 2, configuration length 59, then the GUI configuration.
    const CONF_BUF: &[u8] = &[
        255, 2, 0, 2, 0, 59, 0, 16, 31, 1, 4, 0, 44, 10, 10, 78, 2, 26, 2, 0, 9, 77, 22, 11, 2, 26,
        31, 31, 79, 78, 0, 79, 70, 70, 0, 72, 12, 9, 16, 23, 23, 2, 26, 140, 38, 0, 0, 0, 0, 0, 0,
        200, 66, 0, 0, 0, 0, 70, 16, 16, 63, 9, 9, 26, 37, 0,
    ];

    /// Input data sent by the client (two bytes).
    #[derive(Serialize, Deserialize, Default)]
    #[repr(C)]
    struct InputData {
        slider_1: i8,
        switch_1: u8,
    }

    /// Output data sent to the client (two bytes).
    #[derive(Serialize, Deserialize, Default)]
    #[repr(C)]
    struct OutputData {
        circularbar_1: i8,
        led_1: u8,
    }

    /// Both test structs are two bytes in size.
    type TestRemoteXY = RemoteXY<InputData, OutputData, 2, 2>;

    /// Starts a server for the test data types on `addr`.
    async fn start_server(addr: &str) -> TestRemoteXY {
        remote_xy!(InputData, OutputData, addr, CONF_BUF)
            .await
            .unwrap()
    }

    /// Sends `request` and reads exactly `N` bytes of response.
    async fn exchange<const N: usize>(stream: &mut TcpStream, request: &[u8]) -> [u8; N] {
        stream.write_all(request).await.unwrap();
        stream.flush().await.unwrap();

        let mut response = [0_u8; N];
        stream.read_exact(&mut response).await.unwrap();
        response
    }

    #[tokio::test]
    async fn basic() {
        let addr = "127.0.0.1:6377";
        let remotexy = start_server(addr).await;

        // Give the background task time to bind the listener.
        tokio::time::sleep(Duration::from_millis(50)).await;
        let mut stream = TcpStream::connect(addr).await.unwrap();

        // Command 0x00: the response carries the GUI configuration between
        // the four leading framing bytes and the two CRC bytes.
        let gui_response: [u8; CONF_BUF.len() - 7 + 6] =
            exchange(&mut stream, &[85, 6, 0, 0, 241, 233]).await;
        assert_eq!(gui_response[4..gui_response.len() - 2], CONF_BUF[7..]);

        // Command 0x80: send input data and expect the acknowledgement.
        let ack: [u8; 6] = exchange(&mut stream, &[85, 8, 0, 128, 57, 1, 63, 167]).await;
        assert_eq!(ack, [85, 6, 0, 128, 240, 73]);

        let received = remotexy.get_input();
        assert_eq!(received.slider_1, 57);
        assert_eq!(received.switch_1, 1);

        // Command 0xC0: the response carries the output data set just before.
        remotexy.set_output(&OutputData {
            circularbar_1: 57,
            led_1: 1,
        });
        let output_response: [u8; 8] = exchange(&mut stream, &[85, 6, 0, 192, 241, 185]).await;
        assert_eq!(output_response, [85, 8, 0, 192, 57, 1, 62, 115]);
    }

    #[tokio::test]
    async fn server_disconnect() {
        let addr = "127.0.0.1:6378";
        let remotexy = start_server(addr).await;

        tokio::time::sleep(Duration::from_millis(10)).await;
        let mut stream = TcpStream::connect(addr).await.unwrap();

        tokio::time::sleep(Duration::from_millis(10)).await;
        assert!(remotexy.is_connected());

        // Dropping the handle aborts the server task.
        drop(remotexy);

        // A read of zero bytes means the peer closed the connection.
        let mut buffer = [0_u8; 256];
        let read_result = timeout(Duration::from_secs(1), stream.read(&mut buffer)).await;
        match read_result {
            Ok(Ok(size)) => assert_eq!(size, 0),
            Ok(Err(err)) => panic!("Server should have disconnected, error: {err}"),
            Err(err) => panic!("Server should have disconnected, error: {err}"),
        };
    }

    #[tokio::test]
    async fn client_disconnect() {
        let addr = "127.0.0.1:6379";
        let remotexy = start_server(addr).await;

        tokio::time::sleep(Duration::from_millis(10)).await;
        let stream = TcpStream::connect(addr).await.unwrap();

        tokio::time::sleep(Duration::from_millis(10)).await;
        assert!(remotexy.is_connected());

        drop(stream);
        tokio::time::sleep(Duration::from_millis(10)).await;
        assert!(!remotexy.is_connected());
    }

    #[test]
    fn crc() {
        let header = [85, 6, 0];

        // Two valid packages.
        assert!(check_crc(&header, &[0, 241, 233]));
        assert!(check_crc(&header, &[192, 241, 185]));

        // Same as the second package with one CRC byte changed.
        assert!(!check_crc(&header, &[192, 242, 185]));
    }
}