abbegm/
sync_peer.rs

1use std::net::SocketAddr;
2
3use prost::Message;
4use std::net::UdpSocket;
5
6use crate::InvalidMessageError;
7use crate::ReceiveError;
8use crate::SendError;
9use crate::msg::EgmRobot;
10use crate::msg::EgmSensor;
11
/// EGM peer that sends and receives messages over UDP using blocking I/O.
///
/// The peer is a thin wrapper around a [`UdpSocket`].
#[derive(Debug)]
pub struct EgmPeer {
	/// The UDP socket used for all traffic of this peer.
	socket: UdpSocket,
}
17
18impl EgmPeer {
19	/// Wrap an existing UDP socket in a peer.
20	///
21	/// If you want to use the [`EgmPeer::recv`] and [`EgmPeer::send`] functions,
22	/// you should use an already connected socket.
23	/// Otherwise, you can only use [`EgmPeer::recv_from`] and [`EgmPeer::send_to`].
24	pub fn new(socket: UdpSocket) -> Self {
25		Self { socket }
26	}
27
28	/// Create an EGM peer on a newly bound UDP socket.
29	///
30	/// The socket will not be connected to a remote peer,
31	/// so you can only use [`EgmPeer::recv_from`] and [`EgmPeer::send_to`].
32	pub fn bind(addrs: impl std::net::ToSocketAddrs) -> std::io::Result<Self> {
33		Ok(Self::new(UdpSocket::bind(addrs)?))
34	}
35
36	/// Get a shared reference to the inner socket.
37	pub fn socket(&self) -> &UdpSocket {
38		&self.socket
39	}
40
41	/// Get an exclusive reference to the inner socket.
42	pub fn socket_mut(&mut self) -> &mut UdpSocket {
43		&mut self.socket
44	}
45
46	/// Consume self and get the inner socket.
47	pub fn into_socket(self) -> UdpSocket {
48		self.socket
49	}
50
51	/// Receive a message from the remote address to which the inner socket is connected.
52	///
53	/// To use this function, you must pass an already connected socket to [`EgmPeer::new`].
54	/// If the peer was created with an unconnected socket, this function will panic.
55	pub fn recv(&mut self) -> Result<EgmRobot, ReceiveError> {
56		let mut buffer = vec![0u8; 1024];
57		let bytes_received = self.socket.recv(&mut buffer)?;
58		Ok(EgmRobot::decode(&buffer[..bytes_received])?)
59	}
60
61	/// Receive a message from any remote address.
62	pub fn recv_from(&mut self) -> Result<(EgmRobot, SocketAddr), ReceiveError> {
63		let mut buffer = vec![0u8; 1024];
64		let (bytes_received, sender) = self.socket.recv_from(&mut buffer)?;
65		Ok((EgmRobot::decode(&buffer[..bytes_received])?, sender))
66	}
67
68	/// Purge all messages from the socket read queue.
69	///
70	/// Useful to ignore old messages when the socket has been left unpolled for a while.
71	///
72	/// This will leave the socket in blocking mode when the purging is done.
73	pub fn purge_recv_queue(&mut self) -> std::io::Result<()> {
74		self.socket.set_nonblocking(true)?;
75
76		let mut buffer = vec![0u8; 1024];
77		let read_loop_result = loop {
78			match self.socket.recv_from(&mut buffer) {
79				Err(e) => {
80					if e.kind() == std::io::ErrorKind::WouldBlock {
81						break Ok(());
82					} else {
83						break Err(e);
84					}
85				}
86				Ok(_transferred) => (),
87			}
88		};
89
90		// Restore blocking mode, but make sure we return potential errors from the read loop
91		// before errors in restoring blocking mode.
92		let restore_blocking_result = self.socket.set_nonblocking(false);
93		read_loop_result?;
94		restore_blocking_result
95	}
96
97	/// Send a message to the remote address to which the inner socket is connected.
98	///
99	/// To use this function, you must pass an already connected socket to [`EgmPeer::new`].
100	/// If the peer was created with an unconnected socket, this function will panic.
101	pub fn send(&mut self, msg: &EgmSensor) -> Result<(), SendError> {
102		InvalidMessageError::check_sensor_msg(msg)?;
103		let buffer = crate::encode_to_vec(msg)?;
104		let bytes_sent = self.socket.send(&buffer)?;
105		crate::error::check_transfer(bytes_sent, buffer.len())?;
106		Ok(())
107	}
108
109	/// Send a message to the specified address.
110	pub fn send_to(&mut self, msg: &EgmSensor, target: &SocketAddr) -> Result<(), SendError> {
111		InvalidMessageError::check_sensor_msg(msg)?;
112		let buffer = crate::encode_to_vec(msg)?;
113		let bytes_sent = self.socket.send_to(&buffer, target)?;
114		crate::error::check_transfer(bytes_sent, buffer.len())?;
115		Ok(())
116	}
117}