spectrusty_peripherals/network/
zxnet_udp.rs1use core::slice;
9use std::io::{self, Read};
10use std::net::{UdpSocket, ToSocketAddrs, SocketAddr, IpAddr, Ipv4Addr};
11use std::time::{Instant, Duration};
12
13#[allow(unused_imports)]
14use log::{error, warn, info, debug, trace};
15
16use super::zxnet::{HEAD_SIZE, ZxNetSocket};
17
18#[derive(Debug)]
32pub struct ZxNetUdpSyncSocket {
33 sock: UdpSocket,
34 packet: io::Cursor<Vec<u8>>,
35 last_accepted: [u8;ACCEPT_SIZE],
36 accepted_time: Instant,
37}
38
39impl ZxNetUdpSyncSocket {
40 pub fn bind<A: ToSocketAddrs>(&mut self, addr: A) -> io::Result<()> {
42 self.sock = UdpSocket::bind(addr)?;
43 self.setup_socket()?;
44 Ok(())
45 }
46 pub fn connect<A: ToSocketAddrs>(&mut self, addr: A) -> io::Result<()> {
48 self.sock.connect(addr)
49 }
50
51 fn setup_socket(&mut self) -> io::Result<()> {
52 self.sock.set_read_timeout(Some(READ_ACCEPT_TIMEOUT))?;
53 self.sock.set_nonblocking(true)?;
54 self.sock.set_broadcast(true)
55 }
56}
57
58const PACKET_TAG: &[u8] = b"ZXNET";
59const KIND_NEW_DATA: u8 = 0;
60const KIND_ACCEPTED: u8 = 1;
61const KIND_INDEX: usize = 5;
62const DATA_INDEX: usize = 6;
63const ACCEPT_SIZE: usize = DATA_INDEX + HEAD_SIZE;
64const MIN_SIZE: usize = ACCEPT_SIZE + 1;
65const MAX_SIZE: usize = ACCEPT_SIZE + u8::max_value() as usize;
66const READ_ACCEPT_TIMEOUT: Duration = Duration::from_millis(50);
68const LAST_ACCEPTED_TTL: Duration = Duration::from_secs(5);
70impl ZxNetSocket for ZxNetUdpSyncSocket {
79 fn packet_data(&self) -> &[u8] {
80 &self.packet.get_ref()[DATA_INDEX..]
81 }
82
83 fn begin_packet(&mut self) {
84 let vec = self.packet.get_mut();
85 vec.resize(DATA_INDEX, 0);
86 vec[0..5].copy_from_slice(PACKET_TAG);
87 vec[KIND_INDEX] = KIND_NEW_DATA; self.packet.set_position(DATA_INDEX as u64);
89 }
90
91 fn push_byte(&mut self, byte: u8) -> usize {
92 let vec = self.packet.get_mut();
93 vec.push(byte);
94 vec.len() - DATA_INDEX
95 }
96
97 fn send_packet(&mut self) {
98 match self.sock.send(self.packet.get_ref()) {
99 Ok(len) => {
100 trace!("sent: {} size: {}", len, len - DATA_INDEX);
101 },
102 Err(e) => {
103 debug!("err send: {}", e);
104 }
105 }
106 }
107
108 fn recv_accept(&mut self) -> bool {
109 let packet = self.packet.get_ref();
110 if packet.len() > ACCEPT_SIZE {
111 let mut buf = [0u8; ACCEPT_SIZE];
112 while let Some(ACCEPT_SIZE) = Self::try_recv(&self.sock, &mut buf, true) {
113 if buf[KIND_INDEX] == KIND_ACCEPTED &&
114 buf[DATA_INDEX..] == self.packet.get_ref()[DATA_INDEX..ACCEPT_SIZE] {
115 return true
116 }
117 }
118 }
119 false
120 }
121
122 fn outbound_index(&self) -> usize {
123 self.packet.get_ref().len() - DATA_INDEX
124 }
125
126 fn recv_packet(&mut self) -> bool {
127 self.packet.set_position(DATA_INDEX as u64);
128 self.packet.get_mut().resize(MAX_SIZE, 0);
129 while let Some(len @ MIN_SIZE..=MAX_SIZE) = Self::try_recv(&self.sock, self.packet.get_mut(), false) {
130 let packet = self.packet.get_mut();
131 packet.truncate(len);
132 let last_accepted = &self.last_accepted;
133 if last_accepted[KIND_INDEX] == KIND_ACCEPTED &&
134 last_accepted[DATA_INDEX..ACCEPT_SIZE] == packet[DATA_INDEX..ACCEPT_SIZE] &&
135 self.accepted_time.elapsed() < LAST_ACCEPTED_TTL {
136 self.send_last_accepted();
138 continue
139 }
140 return true
141 }
142 false
143 }
144
145 fn pull_byte(&mut self) -> Option<u8> {
146 let mut byte = 0u8;
147 if let Ok(1) = self.packet.read(slice::from_mut(&mut byte)) {
148 return Some(byte)
149 }
150 None
151 }
152
153 fn inbound_index(&self) -> usize {
154 self.packet.position() as usize - DATA_INDEX
155 }
156
157 fn send_accept(&mut self) {
158 self.last_accepted.copy_from_slice(&self.packet.get_ref()[..ACCEPT_SIZE]);
159 self.last_accepted[KIND_INDEX] = KIND_ACCEPTED; self.send_last_accepted();
161 self.accepted_time = Instant::now()
162 }
163}
164
165impl ZxNetUdpSyncSocket {
166 fn send_last_accepted(&mut self) {
167 let res = self.sock.send(&self.last_accepted);
168 match res {
169 Ok(ACCEPT_SIZE) => {}
170 Ok(len) => {
171 debug!("wrong send bytes: {}", len);
172 }
173 Err(e) => {
174 debug!("err send last accepted: {:?} {:?}", e.kind(), e);
175 }
176 }
177 }
178
179 fn try_recv(socket: &UdpSocket, buf: &mut [u8], blocking: bool) -> Option<usize> {
180 if blocking {
181 socket.set_nonblocking(false).expect("recv blocking failed (1)");
182 }
183 let ok = match socket.recv(buf) {
184 Ok(len) if len >= ACCEPT_SIZE => Some(len),
185 Ok(len) => {
186 debug!("too short packet: {}", len);
187 None
188 }
189 Err(e) if e.kind() == io::ErrorKind::WouldBlock => None,
190 Err(e) if e.kind() == io::ErrorKind::TimedOut => None,
191 Err(e) => {
192 debug!("err recv: {:?} {:?}", e.kind(), e);
193 None
194 }
195 };
196 if blocking {
197 socket.set_nonblocking(true).expect("recv blocking failed (2)");
198 }
199 ok.and_then(|len| {
200 if PACKET_TAG == &buf[0..5] {
202 match buf[KIND_INDEX] {
203 0 => {
204 Some(len)
205 },
206 1 if len == ACCEPT_SIZE => Some(len),
207 _ => None
208 }
209 }
210 else {
211 debug!("bad packet: {} {:?}", len, buf);
212 None
213 }
214 })
215 .map(|len| {
216 let mut bufpk = [0u8; MAX_SIZE];
217 loop { match socket.peek(&mut bufpk) {
219 Ok(plen) if plen == len && bufpk[0..len] == buf[0..len] => {}
220 _ => break len
221 }
222 socket.recv(&mut bufpk).unwrap();
223 }
224 })
225 }
226}
227
228impl Default for ZxNetUdpSyncSocket {
229 fn default() -> Self {
230 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
231 let packet = io::Cursor::new(Vec::with_capacity(MAX_SIZE));
232 let last_accepted = Default::default();
233 let accepted_time = Instant::now();
234 let mut sock = ZxNetUdpSyncSocket {
235 sock: UdpSocket::bind(addr).expect("can't create an UDP socket"),
236 packet,
237 last_accepted,
238 accepted_time
239 };
240 sock.setup_socket().expect("setup socket failed");
241 sock
242 }
243}