minetest_protocol/services/
socket.rs1use std::collections::HashMap;
2use std::collections::VecDeque;
3use std::io::Error;
4use std::net::SocketAddr;
5
6use tokio::io::Interest;
7use tokio::io::Ready;
8use tokio::net::UdpSocket;
9use tokio::sync::mpsc::unbounded_channel;
10use tokio::sync::mpsc::UnboundedReceiver;
11use tokio::sync::mpsc::UnboundedSender;
12
13use crate::peer::peer::PeerToSocket;
14
15use crate::peer::peer::new_peer;
16use crate::peer::peer::Peer;
17use crate::peer::peer::PeerIO;
18
/// Size in bytes (64 KiB) of the buffer used to receive a single datagram.
///
/// UDP's 16-bit length field caps a datagram at 65535 bytes, so a buffer of
/// this size never truncates an incoming packet.
const MAX_DATAGRAM_SIZE: usize = 64 * 1024;
20
21pub struct MinetestSocket {
31 accept_rx: UnboundedReceiver<Peer>,
32 knock_tx: UnboundedSender<SocketAddr>,
33 for_server: bool,
34}
35
36impl MinetestSocket {
37 pub async fn new(bind_addr: SocketAddr, for_server: bool) -> Result<Self, Error> {
41 let socket = UdpSocket::bind(bind_addr).await?;
42 let (peer_tx, peer_rx) = unbounded_channel();
43 let (accept_tx, accept_rx) = unbounded_channel();
44 let (knock_tx, knock_rx) = unbounded_channel();
45 let minetest_socket = Self {
46 accept_rx,
47 knock_tx,
48 for_server,
49 };
50 let minetest_socket_runner = MinetestSocketRunner {
51 socket,
52 peers: HashMap::new(),
53 peer_tx,
54 peer_rx,
55 outgoing: VecDeque::new(),
56 accept_tx,
57 knock_rx,
58 for_server,
59 };
60 tokio::spawn(async move { minetest_socket_runner.run().await });
61 Ok(minetest_socket)
62 }
63
64 pub async fn accept(&mut self) -> Option<Peer> {
66 self.accept_rx.recv().await
67 }
68
69 pub async fn add_peer(&mut self, remote: SocketAddr) -> Peer {
75 assert!(!self.for_server);
76 self.knock_tx.send(remote).unwrap();
77
78 loop {
80 let peer = self.accept().await.unwrap();
81 if peer.remote_addr() == remote {
82 return peer;
83 }
84 }
86 }
87}
88
89pub struct MinetestSocketRunner {
90 socket: UdpSocket,
91 peers: HashMap<SocketAddr, PeerIO>,
92 peer_tx: UnboundedSender<PeerToSocket>,
93 peer_rx: UnboundedReceiver<PeerToSocket>,
94 outgoing: VecDeque<(SocketAddr, Vec<u8>)>,
95 accept_tx: UnboundedSender<Peer>,
96 knock_rx: UnboundedReceiver<SocketAddr>,
97 for_server: bool,
98}
99
100impl MinetestSocketRunner {
101 pub async fn run(mut self) {
102 match self.run_inner().await {
104 Ok(_) => (),
105 Err(err) => {
106 println!("MinetestSocket abnormal exit: {:?}", err);
107 }
108 }
109 }
110
111 pub async fn run_inner(&mut self) -> anyhow::Result<()> {
112 let mut knock_closed = false;
113 let mut buf: Vec<u8> = vec![0u8; MAX_DATAGRAM_SIZE];
114
115 loop {
116 let mut r = Interest::READABLE;
117 if !self.outgoing.is_empty() {
118 r = r | Interest::WRITABLE;
119 }
120 tokio::select! {
122 t = self.socket.ready(r) => self.handle_socket_io(t, &mut buf).await?,
123 msg = self.peer_rx.recv() => self.handle_peer_message(msg),
124 t = self.knock_rx.recv(), if !knock_closed => {
125 match t {
126 Some(t) => {
127 self.get_peer(t, true);
128 },
129 None => {
130 knock_closed = true;
131 },
132 }
133 }
134 }
135 }
136 }
137
138 async fn handle_socket_io(
139 &mut self,
140 t: tokio::io::Result<Ready>,
141 buf: &mut [u8],
142 ) -> anyhow::Result<()> {
143 let t = t.expect("socket.ready should not error");
144 if t.is_readable() {
145 match self.socket.try_recv_from(buf) {
146 Ok((n, remote_addr)) => {
147 if let Some(peer) = self.get_peer(remote_addr, self.for_server) {
148 peer.send(&buf[..n]);
150 }
151 }
152 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => (),
153 Err(e) => panic!("Unexpected socket error: {:?}", e),
154 };
155 }
156 if t.is_writable() && !self.outgoing.is_empty() {
157 let (addr, data) = self.outgoing.pop_back().unwrap();
158 match self.socket.try_send_to(&data, addr) {
159 Ok(_) => (),
160 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
161 self.outgoing.push_back((addr, data));
162 }
163 Err(e) => panic!("Unexpected socket error: {:?}", e),
164 }
165 }
166 Ok(())
167 }
168
169 fn handle_peer_message(&mut self, msg: Option<PeerToSocket>) {
170 let msg = match msg {
171 Some(msg) => msg,
172 None => panic!("Unexpected Server shutdown?"),
173 };
174 match msg {
175 PeerToSocket::SendImmediate(addr, data) => self.outgoing.push_back((addr, data)),
176 PeerToSocket::Send(addr, data) => self.outgoing.push_front((addr, data)),
177 PeerToSocket::PeerIsDisconnected(addr) => self.remove_peer(addr),
178 }
179 }
180
181 fn get_peer(&mut self, remote_addr: SocketAddr, may_insert: bool) -> Option<&mut PeerIO> {
182 if may_insert && !self.peers.contains_key(&remote_addr) {
183 self.insert_peer(remote_addr);
184 }
185 self.peers.get_mut(&remote_addr)
186 }
187
188 fn insert_peer(&mut self, remote_addr: SocketAddr) {
189 let (peer, peerio) = new_peer(remote_addr, !self.for_server, self.peer_tx.clone());
190 self.peers.insert(remote_addr, peerio);
191 let ok = self.accept_tx.send(peer).is_ok();
192 assert!(ok);
193 }
194
195 fn remove_peer(&mut self, remote_addr: SocketAddr) {
196 self.peers.remove(&remote_addr);
197 }
198}