// spectrusty_peripherals/network/zxnet_udp.rs

/*
    Copyright (C) 2020-2022  Rafal Michalski

    This file is part of SPECTRUSTY, a Rust library for building emulators.

    For the full copyright notice, see the lib.rs file.
*/
use core::slice;
use std::io::{self, Read};
use std::net::{UdpSocket, ToSocketAddrs, SocketAddr, IpAddr, Ipv4Addr};
use std::time::{Instant, Duration};

#[allow(unused_imports)]
use log::{error, warn, info, debug, trace};

use super::zxnet::{HEAD_SIZE, ZxNetSocket};
17
18/// Implements [ZxNetSocket] sending ZX-NET packets using UDP datagrams in real-time.
19///
20/// Each ZX-NET data packet is being sent as a single datagram. When a packet is being accepted
21/// a special datagram is being sent back that contains only the head part part of the ZX-NET
22/// packet and a flag indicating that this is the reply packet.
23///
24/// Duplicate spam messages are being removed before processing incoming data.
25/// Packets that were recently replied to are being auto-replied when incoming again.
26///
27/// Requires an UDP socket to be "connected" to the remote party in order to send and receive data.
28///
29/// The original ZX-NET data packet is being prepended by 6 bytes of which 5 are the tag: **"ZXNET"**
30/// and a flag byte if 0 indicating an incoming packet and 1 a reply packet.
31#[derive(Debug)]
32pub struct ZxNetUdpSyncSocket {
33    sock: UdpSocket,
34    packet: io::Cursor<Vec<u8>>,
35    last_accepted: [u8;ACCEPT_SIZE],
36    accepted_time: Instant,
37}
38
39impl ZxNetUdpSyncSocket {
40    /// Binds the UDP socket to the indicated local address.
41    pub fn bind<A: ToSocketAddrs>(&mut self, addr: A) -> io::Result<()> {
42        self.sock = UdpSocket::bind(addr)?;
43        self.setup_socket()?;
44        Ok(())
45    }
46    /// Connects the UDP socket to the indicated remote address.
47    pub fn connect<A: ToSocketAddrs>(&mut self, addr: A) -> io::Result<()> {
48        self.sock.connect(addr)
49    }
50
51    fn setup_socket(&mut self) -> io::Result<()> {
52        self.sock.set_read_timeout(Some(READ_ACCEPT_TIMEOUT))?;
53        self.sock.set_nonblocking(true)?;
54        self.sock.set_broadcast(true)
55    }
56}
57
58const PACKET_TAG: &[u8] = b"ZXNET";
59const KIND_NEW_DATA: u8 = 0;
60const KIND_ACCEPTED: u8 = 1;
61const KIND_INDEX: usize = 5;
62const DATA_INDEX: usize = 6;
63const ACCEPT_SIZE: usize = DATA_INDEX + HEAD_SIZE;
64const MIN_SIZE: usize = ACCEPT_SIZE + 1;
65const MAX_SIZE: usize = ACCEPT_SIZE + u8::max_value() as usize;
66// The emulation will be paused for this long to get the response packet.
67const READ_ACCEPT_TIMEOUT: Duration = Duration::from_millis(50);
68// After this time the last accepted header will be ignored.
69const LAST_ACCEPTED_TTL: Duration = Duration::from_secs(5);
70/*
71    packet > 
72    accept < 
73     TAG: "ZXNET"
74    KIND: 0 (packet), 1 (accept)
75    HEAD: 8 bytes
76    BODY: ..
77*/
78impl ZxNetSocket for ZxNetUdpSyncSocket {
79    fn packet_data(&self) -> &[u8] {
80        &self.packet.get_ref()[DATA_INDEX..]
81    }
82
83    fn begin_packet(&mut self) {
84        let vec = self.packet.get_mut();
85        vec.resize(DATA_INDEX, 0);
86        vec[0..5].copy_from_slice(PACKET_TAG);
87        vec[KIND_INDEX] = KIND_NEW_DATA; // packet
88        self.packet.set_position(DATA_INDEX as u64);
89    }
90
91    fn push_byte(&mut self, byte: u8) -> usize {
92        let vec = self.packet.get_mut();
93        vec.push(byte);
94        vec.len() - DATA_INDEX
95    }
96
97    fn send_packet(&mut self) {
98        match self.sock.send(self.packet.get_ref()) {
99            Ok(len) => {
100                trace!("sent: {} size: {}", len, len - DATA_INDEX);
101            },
102            Err(e) => {
103                debug!("err send: {}", e);
104            }
105        }
106    }
107
108    fn recv_accept(&mut self) -> bool {
109        let packet = self.packet.get_ref();
110        if packet.len() > ACCEPT_SIZE {
111            let mut buf = [0u8; ACCEPT_SIZE];
112            while let Some(ACCEPT_SIZE) = Self::try_recv(&self.sock, &mut buf, true) {
113                if buf[KIND_INDEX] == KIND_ACCEPTED && 
114                   buf[DATA_INDEX..] == self.packet.get_ref()[DATA_INDEX..ACCEPT_SIZE] {
115                    return true
116                }
117            }
118        }
119        false
120    }
121
122    fn outbound_index(&self) -> usize {
123        self.packet.get_ref().len() - DATA_INDEX
124    }
125
126    fn recv_packet(&mut self) -> bool {
127        self.packet.set_position(DATA_INDEX as u64);
128        self.packet.get_mut().resize(MAX_SIZE, 0);
129        while let Some(len @ MIN_SIZE..=MAX_SIZE) = Self::try_recv(&self.sock, self.packet.get_mut(), false) {
130            let packet = self.packet.get_mut();
131            packet.truncate(len);
132            let last_accepted = &self.last_accepted;
133            if last_accepted[KIND_INDEX] == KIND_ACCEPTED && 
134               last_accepted[DATA_INDEX..ACCEPT_SIZE] == packet[DATA_INDEX..ACCEPT_SIZE] &&
135               self.accepted_time.elapsed() < LAST_ACCEPTED_TTL {
136                // re-send acceptance of the last accepted packet
137                self.send_last_accepted();
138                continue
139            }
140            return true
141        }
142        false
143    }
144
145    fn pull_byte(&mut self) -> Option<u8> {
146        let mut byte = 0u8;
147        if let Ok(1) = self.packet.read(slice::from_mut(&mut byte)) {
148            return Some(byte)
149        }
150        None
151    }
152
153    fn inbound_index(&self) -> usize {
154        self.packet.position() as usize - DATA_INDEX
155    }
156
157    fn send_accept(&mut self) {
158        self.last_accepted.copy_from_slice(&self.packet.get_ref()[..ACCEPT_SIZE]);
159        self.last_accepted[KIND_INDEX] = KIND_ACCEPTED; // accept
160        self.send_last_accepted();
161        self.accepted_time = Instant::now()
162    }
163}
164
165impl ZxNetUdpSyncSocket {
166    fn send_last_accepted(&mut self) {
167        let res = self.sock.send(&self.last_accepted);
168        match res {
169            Ok(ACCEPT_SIZE) => {}
170            Ok(len) => {
171                debug!("wrong send bytes: {}", len);
172            }
173            Err(e) => {
174                debug!("err send last accepted: {:?} {:?}", e.kind(), e);
175            }
176        }
177    }
178
179    fn try_recv(socket: &UdpSocket, buf: &mut [u8], blocking: bool) -> Option<usize> {
180        if blocking {
181            socket.set_nonblocking(false).expect("recv blocking failed (1)");
182        }
183        let ok = match socket.recv(buf) {
184            Ok(len) if len >= ACCEPT_SIZE => Some(len),
185            Ok(len) => {
186                debug!("too short packet: {}", len);
187                None
188            }
189            Err(e) if e.kind() == io::ErrorKind::WouldBlock => None,
190            Err(e) if e.kind() == io::ErrorKind::TimedOut => None,
191            Err(e) => {
192                debug!("err recv: {:?} {:?}", e.kind(), e);
193                None
194            }
195        };
196        if blocking {
197            socket.set_nonblocking(true).expect("recv blocking failed (2)");
198        }
199        ok.and_then(|len| {
200            // println!("recv: {} {:?}", len, &buf[0..ACCEPT_SIZE]);
201            if PACKET_TAG == &buf[0..5] {
202                match buf[KIND_INDEX] {
203                    0 => {
204                        Some(len)
205                    },
206                    1 if len == ACCEPT_SIZE => Some(len),
207                    _ => None
208                }
209            }
210            else {
211                debug!("bad packet: {} {:?}", len, buf);
212                None
213            }
214        })
215        .map(|len| {
216            let mut bufpk = [0u8; MAX_SIZE];
217            loop { // remove spam duplicates
218                match socket.peek(&mut bufpk) {
219                    Ok(plen) if plen == len && bufpk[0..len] == buf[0..len] => {}
220                    _ => break len
221                }
222                socket.recv(&mut bufpk).unwrap();
223            }
224        })
225    }
226}
227
228impl Default for ZxNetUdpSyncSocket {
229    fn default() -> Self {
230        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
231        let packet = io::Cursor::new(Vec::with_capacity(MAX_SIZE));
232        let last_accepted = Default::default();
233        let accepted_time = Instant::now();
234        let mut sock = ZxNetUdpSyncSocket {
235            sock: UdpSocket::bind(addr).expect("can't create an UDP socket"),
236            packet,
237            last_accepted,
238            accepted_time
239        };
240        sock.setup_socket().expect("setup socket failed");
241        sock
242    }
243}