1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
use std::net::SocketAddr;
use prost::Message;
use tokio::net::UdpSocket;
use crate::InvalidMessageError;
use crate::ReceiveError;
use crate::SendError;
use crate::msg::EgmRobot;
use crate::msg::EgmSensor;
/// UDP endpoint that receives `EgmRobot` messages and sends `EgmSensor` messages.
///
/// This is a thin wrapper around a tokio `UdpSocket`: every receive and send
/// operation is forwarded to that socket, with protobuf decoding/encoding
/// applied on top.
#[derive(Debug)]
pub struct EgmPeer {
/// The tokio UDP socket used for all traffic of this peer.
socket: UdpSocket,
}
impl EgmPeer {
pub fn new(socket: UdpSocket) -> Self {
Self { socket }
}
pub async fn bind(addrs: impl tokio::net::ToSocketAddrs) -> std::io::Result<Self> {
Ok(Self::new(UdpSocket::bind(addrs).await?))
}
pub fn bind_sync(addrs: impl std::net::ToSocketAddrs) -> std::io::Result<Self> {
let socket = std::net::UdpSocket::bind(addrs)?;
let socket = tokio::net::UdpSocket::from_std(socket)?;
Ok(Self::new(socket))
}
pub fn socket(&self) -> &UdpSocket {
&self.socket
}
pub fn socket_mut(&mut self) -> &mut UdpSocket {
&mut self.socket
}
pub fn into_socket(self) -> UdpSocket {
self.socket
}
pub async fn recv(&self) -> Result<EgmRobot, ReceiveError> {
let mut buffer = vec![0u8; 1024];
let bytes_received = self.socket.recv(&mut buffer).await?;
Ok(EgmRobot::decode(&buffer[..bytes_received])?)
}
pub async fn recv_from(&self) -> Result<(EgmRobot, SocketAddr), ReceiveError> {
let mut buffer = vec![0u8; 1024];
let (bytes_received, sender) = self.socket.recv_from(&mut buffer).await?;
Ok((EgmRobot::decode(&buffer[..bytes_received])?, sender))
}
pub async fn purge_read_queue(&self) -> std::io::Result<()> {
let mut buffer = vec![0; 1024];
loop {
match poll_once(self.socket.recv_from(&mut buffer)).await {
std::task::Poll::Ready(Ok(_)) => (),
std::task::Poll::Ready(Err(e)) => return Err(e),
std::task::Poll::Pending => return Ok(()),
}
}
}
pub async fn send(&mut self, msg: &EgmSensor) -> Result<(), SendError> {
InvalidMessageError::check_sensor_msg(msg)?;
let buffer = crate::encode_to_vec(msg)?;
let bytes_sent = self.socket.send(&buffer).await?;
crate::error::check_transfer(bytes_sent, buffer.len())?;
Ok(())
}
pub async fn send_to(&mut self, msg: &EgmSensor, target: &SocketAddr) -> Result<(), SendError> {
InvalidMessageError::check_sensor_msg(msg)?;
let buffer = crate::encode_to_vec(msg)?;
let bytes_sent = self.socket.send_to(&buffer, target).await?;
crate::error::check_transfer(bytes_sent, buffer.len())?;
Ok(())
}
}
/// Future adapter that polls the wrapped future a single time.
///
/// Awaiting a `PollOnce` always completes on its first poll and yields the
/// `Poll` value returned by the wrapped future for that one poll.
struct PollOnce<F> {
/// The wrapped future. It is polled in place through a pinned reference.
future: F,
}
impl<F: std::future::Future> std::future::Future for PollOnce<F> {
    /// The result of the single poll of the wrapped future.
    type Output = std::task::Poll<F::Output>;

    /// Polls the wrapped future once and completes immediately with its poll result.
    fn poll(self: std::pin::Pin<&mut Self>, context: &mut std::task::Context) -> std::task::Poll<Self::Output> {
        // SAFETY: `future` is only ever accessed through this pinned projection
        // and is never moved out of `self`, so it stays pinned for as long as
        // `self` is pinned.
        let inner = unsafe { self.map_unchecked_mut(|this| &mut this.future) };
        let outcome = F::poll(inner, context);
        std::task::Poll::Ready(outcome)
    }
}
/// Polls `future` exactly once and returns the result of that poll.
///
/// The returned value is `Poll::Ready(output)` if the future completed on its
/// first poll and `Poll::Pending` otherwise. The future is dropped afterwards
/// in either case.
async fn poll_once<F: std::future::Future>(future: F) -> std::task::Poll<F::Output> {
    let single_poll = PollOnce { future };
    single_poll.await
}