minetest_protocol/services/
socket.rs

1use std::collections::HashMap;
2use std::collections::VecDeque;
3use std::io::Error;
4use std::net::SocketAddr;
5
6use tokio::io::Interest;
7use tokio::io::Ready;
8use tokio::net::UdpSocket;
9use tokio::sync::mpsc::unbounded_channel;
10use tokio::sync::mpsc::UnboundedReceiver;
11use tokio::sync::mpsc::UnboundedSender;
12
13use crate::peer::peer::PeerToSocket;
14
15use crate::peer::peer::new_peer;
16use crate::peer::peer::Peer;
17use crate::peer::peer::PeerIO;
18
19const MAX_DATAGRAM_SIZE: usize = 65536;
20
21///
22/// MinetestSocket
23///
24/// Handles the raw UDP socket, protocol validation, separating packets by peer,
25/// reliable packet send, and split packets.
26///
27/// The actual contents of the communication, including authentication/handshaking,
28/// are not handled at this layer.
29///
30pub struct MinetestSocket {
31    accept_rx: UnboundedReceiver<Peer>,
32    knock_tx: UnboundedSender<SocketAddr>,
33    for_server: bool,
34}
35
36impl MinetestSocket {
37    /// Create a new MinetestSocket and bind to address.
38    /// The address may be V4 or V6.
39    /// To select a random bind port, use 0.0.0.0:0 or [::]:0
40    pub async fn new(bind_addr: SocketAddr, for_server: bool) -> Result<Self, Error> {
41        let socket = UdpSocket::bind(bind_addr).await?;
42        let (peer_tx, peer_rx) = unbounded_channel();
43        let (accept_tx, accept_rx) = unbounded_channel();
44        let (knock_tx, knock_rx) = unbounded_channel();
45        let minetest_socket = Self {
46            accept_rx,
47            knock_tx,
48            for_server,
49        };
50        let minetest_socket_runner = MinetestSocketRunner {
51            socket,
52            peers: HashMap::new(),
53            peer_tx,
54            peer_rx,
55            outgoing: VecDeque::new(),
56            accept_tx,
57            knock_rx,
58            for_server,
59        };
60        tokio::spawn(async move { minetest_socket_runner.run().await });
61        Ok(minetest_socket)
62    }
63
64    /// Returns None when the server has shutdown.
65    pub async fn accept(&mut self) -> Option<Peer> {
66        self.accept_rx.recv().await
67    }
68
69    // Add a peer (server) manually. There is no network I/O.
70    //
71    // NOTE: This is not cancel safe, and it should not
72    // be used if incoming connections are expected, or else
73    // they will be discarded.
74    pub async fn add_peer(&mut self, remote: SocketAddr) -> Peer {
75        assert!(!self.for_server);
76        self.knock_tx.send(remote).unwrap();
77
78        // Wait for the peer
79        loop {
80            let peer = self.accept().await.unwrap();
81            if peer.remote_addr() == remote {
82                return peer;
83            }
84            // Random connect from another address? Ignore it.
85        }
86    }
87}
88
89pub struct MinetestSocketRunner {
90    socket: UdpSocket,
91    peers: HashMap<SocketAddr, PeerIO>,
92    peer_tx: UnboundedSender<PeerToSocket>,
93    peer_rx: UnboundedReceiver<PeerToSocket>,
94    outgoing: VecDeque<(SocketAddr, Vec<u8>)>,
95    accept_tx: UnboundedSender<Peer>,
96    knock_rx: UnboundedReceiver<SocketAddr>,
97    for_server: bool,
98}
99
100impl MinetestSocketRunner {
101    pub async fn run(mut self) {
102        // Top-level error handler
103        match self.run_inner().await {
104            Ok(_) => (),
105            Err(err) => {
106                println!("MinetestSocket abnormal exit: {:?}", err);
107            }
108        }
109    }
110
111    pub async fn run_inner(&mut self) -> anyhow::Result<()> {
112        let mut knock_closed = false;
113        let mut buf: Vec<u8> = vec![0u8; MAX_DATAGRAM_SIZE];
114
115        loop {
116            let mut r = Interest::READABLE;
117            if !self.outgoing.is_empty() {
118                r = r | Interest::WRITABLE;
119            }
120            // rust-analyzer chokes on code inside select!, so keep it to a minimum.
121            tokio::select! {
122                t = self.socket.ready(r) => self.handle_socket_io(t, &mut buf).await?,
123                msg = self.peer_rx.recv() => self.handle_peer_message(msg),
124                t = self.knock_rx.recv(), if !knock_closed => {
125                    match t {
126                        Some(t) => {
127                            self.get_peer(t, true);
128                        },
129                        None => {
130                            knock_closed = true;
131                        },
132                    }
133                }
134            }
135        }
136    }
137
138    async fn handle_socket_io(
139        &mut self,
140        t: tokio::io::Result<Ready>,
141        buf: &mut [u8],
142    ) -> anyhow::Result<()> {
143        let t = t.expect("socket.ready should not error");
144        if t.is_readable() {
145            match self.socket.try_recv_from(buf) {
146                Ok((n, remote_addr)) => {
147                    if let Some(peer) = self.get_peer(remote_addr, self.for_server) {
148                        // TODO: If the peer receive channel is full, generate a disconnect message.
149                        peer.send(&buf[..n]);
150                    }
151                }
152                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => (),
153                Err(e) => panic!("Unexpected socket error: {:?}", e),
154            };
155        }
156        if t.is_writable() && !self.outgoing.is_empty() {
157            let (addr, data) = self.outgoing.pop_back().unwrap();
158            match self.socket.try_send_to(&data, addr) {
159                Ok(_) => (),
160                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
161                    self.outgoing.push_back((addr, data));
162                }
163                Err(e) => panic!("Unexpected socket error: {:?}", e),
164            }
165        }
166        Ok(())
167    }
168
169    fn handle_peer_message(&mut self, msg: Option<PeerToSocket>) {
170        let msg = match msg {
171            Some(msg) => msg,
172            None => panic!("Unexpected Server shutdown?"),
173        };
174        match msg {
175            PeerToSocket::SendImmediate(addr, data) => self.outgoing.push_back((addr, data)),
176            PeerToSocket::Send(addr, data) => self.outgoing.push_front((addr, data)),
177            PeerToSocket::PeerIsDisconnected(addr) => self.remove_peer(addr),
178        }
179    }
180
181    fn get_peer(&mut self, remote_addr: SocketAddr, may_insert: bool) -> Option<&mut PeerIO> {
182        if may_insert && !self.peers.contains_key(&remote_addr) {
183            self.insert_peer(remote_addr);
184        }
185        self.peers.get_mut(&remote_addr)
186    }
187
188    fn insert_peer(&mut self, remote_addr: SocketAddr) {
189        let (peer, peerio) = new_peer(remote_addr, !self.for_server, self.peer_tx.clone());
190        self.peers.insert(remote_addr, peerio);
191        let ok = self.accept_tx.send(peer).is_ok();
192        assert!(ok);
193    }
194
195    fn remove_peer(&mut self, remote_addr: SocketAddr) {
196        self.peers.remove(&remote_addr);
197    }
198}