1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
use std::net::SocketAddr;

use prost::Message;
use std::net::UdpSocket;

use crate::InvalidMessageError;
use crate::ReceiveError;
use crate::SendError;
use crate::msg::EgmRobot;
use crate::msg::EgmSensor;

/// Blocking EGM peer for sending and receiving messages over UDP.
///
/// The peer is a thin wrapper around a standard library [`UdpSocket`]:
/// every operation is forwarded to that socket,
/// which can be accessed directly or taken back out of the peer at any time.
#[derive(Debug)]
pub struct EgmPeer {
	/// The UDP socket over which all messages are exchanged.
	socket: UdpSocket,
}

impl EgmPeer {
	/// Size in bytes of the buffer used to receive a single datagram.
	///
	/// A datagram that does not fit in this buffer can not be received intact:
	/// the standard library documents that excess bytes may be discarded,
	/// so an oversized message will most likely fail to decode.
	const RECV_BUFFER_SIZE: usize = 1024;

	/// Wrap an existing UDP socket in a peer.
	///
	/// If you want to use the [`EgmPeer::recv`] and [`EgmPeer::send`] functions,
	/// you should pass a socket that is already connected to a remote address.
	/// With an unconnected socket, use [`EgmPeer::recv_from`] and [`EgmPeer::send_to`] instead.
	pub fn new(socket: UdpSocket) -> Self {
		Self { socket }
	}

	/// Create an EGM peer on a newly bound UDP socket.
	///
	/// The socket will not be connected to a remote peer,
	/// so you should use [`EgmPeer::recv_from`] and [`EgmPeer::send_to`] with it.
	///
	/// # Errors
	/// Returns the I/O error reported by [`UdpSocket::bind`] if the socket can not be bound.
	pub fn bind(addrs: impl std::net::ToSocketAddrs) -> std::io::Result<Self> {
		Ok(Self::new(UdpSocket::bind(addrs)?))
	}

	/// Get a shared reference to the inner socket.
	pub fn socket(&self) -> &UdpSocket {
		&self.socket
	}

	/// Get an exclusive reference to the inner socket.
	pub fn socket_mut(&mut self) -> &mut UdpSocket {
		&mut self.socket
	}

	/// Consume self and get the inner socket.
	pub fn into_socket(self) -> UdpSocket {
		self.socket
	}

	/// Receive a message from the remote address to which the inner socket is connected.
	///
	/// This forwards to [`UdpSocket::recv`], which is intended for connected sockets.
	/// To use this function, you should pass an already connected socket to [`EgmPeer::new`].
	///
	/// # Errors
	/// This function does not panic on failure.
	/// An I/O error reported by the socket (for example because the socket is not connected)
	/// or a failure to decode the received bytes is returned as a [`ReceiveError`].
	pub fn recv(&mut self) -> Result<EgmRobot, ReceiveError> {
		// A fixed-size stack buffer avoids a heap allocation for every received message.
		let mut buffer = [0u8; Self::RECV_BUFFER_SIZE];
		let bytes_received = self.socket.recv(&mut buffer)?;
		// Only decode the part of the buffer that was actually filled by the datagram.
		Ok(EgmRobot::decode(&buffer[..bytes_received])?)
	}

	/// Receive a message from any remote address.
	///
	/// On success, the decoded message is returned together with the address of the sender.
	///
	/// # Errors
	/// An I/O error reported by the socket or a failure to decode the received bytes
	/// is returned as a [`ReceiveError`].
	pub fn recv_from(&mut self) -> Result<(EgmRobot, SocketAddr), ReceiveError> {
		// A fixed-size stack buffer avoids a heap allocation for every received message.
		let mut buffer = [0u8; Self::RECV_BUFFER_SIZE];
		let (bytes_received, sender) = self.socket.recv_from(&mut buffer)?;
		// Only decode the part of the buffer that was actually filled by the datagram.
		Ok((EgmRobot::decode(&buffer[..bytes_received])?, sender))
	}

	/// Purge all messages from the socket read queue.
	///
	/// Useful to ignore old messages when the socket has been left unpolled for a while.
	///
	/// The socket is switched to non-blocking mode and read until it reports
	/// [`std::io::ErrorKind::WouldBlock`]. All received data is discarded without being decoded.
	///
	/// This will leave the socket in blocking mode when the purging is done,
	/// regardless of the mode it was in before the call.
	///
	/// # Errors
	/// If the socket can not be switched to non-blocking mode,
	/// that error is returned immediately and nothing is read.
	/// Otherwise, an error from reading takes precedence over an error from restoring blocking mode.
	pub fn purge_recv_queue(&mut self) -> std::io::Result<()> {
		self.socket.set_nonblocking(true)?;

		let mut buffer = [0u8; Self::RECV_BUFFER_SIZE];
		let read_loop_result = loop {
			match self.socket.recv_from(&mut buffer) {
				// The datagram is dropped on purpose, keep draining.
				Ok(_transferred) => continue,
				// Nothing left to read without blocking: the queue is empty.
				Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break Ok(()),
				Err(e) => break Err(e),
			}
		};

		// Restore blocking mode, but make sure we return potential errors from the read loop
		// before errors in restoring blocking mode.
		let restore_blocking_result = self.socket.set_nonblocking(false);
		read_loop_result?;
		restore_blocking_result
	}

	/// Send a message to the remote address to which the inner socket is connected.
	///
	/// This forwards to [`UdpSocket::send`], which is intended for connected sockets.
	/// To use this function, you should pass an already connected socket to [`EgmPeer::new`].
	///
	/// The message is checked with `InvalidMessageError::check_sensor_msg` and encoded before it is sent.
	/// After sending, the number of bytes reported as sent is checked against the encoded size.
	///
	/// # Errors
	/// This function does not panic on failure.
	/// A rejected message, an encoding failure, an I/O error reported by the socket
	/// (for example because the socket is not connected) or a failed transfer check
	/// is returned as a [`SendError`].
	pub fn send(&mut self, msg: &EgmSensor) -> Result<(), SendError> {
		InvalidMessageError::check_sensor_msg(msg)?;
		let buffer = crate::encode_to_vec(msg)?;
		let bytes_sent = self.socket.send(&buffer)?;
		crate::error::check_transfer(bytes_sent, buffer.len())?;
		Ok(())
	}

	/// Send a message to the specified address.
	///
	/// The message is checked with `InvalidMessageError::check_sensor_msg` and encoded before it is sent.
	/// After sending, the number of bytes reported as sent is checked against the encoded size.
	///
	/// # Errors
	/// A rejected message, an encoding failure, an I/O error reported by the socket
	/// or a failed transfer check is returned as a [`SendError`].
	pub fn send_to(&mut self, msg: &EgmSensor, target: &SocketAddr) -> Result<(), SendError> {
		InvalidMessageError::check_sensor_msg(msg)?;
		let buffer = crate::encode_to_vec(msg)?;
		let bytes_sent = self.socket.send_to(&buffer, target)?;
		crate::error::check_transfer(bytes_sent, buffer.len())?;
		Ok(())
	}
}