use std::collections::HashMap;
use std::collections::VecDeque;
use std::io::Error;
use std::net::SocketAddr;
use tokio::io::Interest;
use tokio::io::Ready;
use tokio::net::UdpSocket;
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;
use crate::peer::peer::PeerToSocket;
use crate::peer::peer::new_peer;
use crate::peer::peer::Peer;
use crate::peer::peer::PeerIO;
/// Size in bytes of the buffer into which each incoming UDP datagram is received.
const MAX_DATAGRAM_SIZE: usize = 65536;
/// Handle to a UDP socket that is serviced by a background `MinetestSocketRunner` task.
///
/// The handle itself never touches the socket; it only talks to the runner
/// through the two channels below.
pub struct MinetestSocket {
/// Receives every `Peer` that the runner creates.
accept_rx: UnboundedReceiver<Peer>,
/// Sends remote addresses to the runner, asking it to create a peer for them.
knock_tx: UnboundedSender<SocketAddr>,
/// Mode flag chosen at construction; `add_peer` asserts that it is `false`.
for_server: bool,
}
impl MinetestSocket {
    /// Binds a UDP socket to `bind_addr` and spawns the background runner task
    /// that services it.
    ///
    /// `for_server` is stored on the handle and forwarded to the runner.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by `UdpSocket::bind` if binding fails.
    pub async fn new(bind_addr: SocketAddr, for_server: bool) -> Result<Self, Error> {
        let socket = UdpSocket::bind(bind_addr).await?;

        // accept: runner -> handle (new peers)
        // knock:  handle -> runner (addresses to create peers for)
        // peer:   peers  -> runner (outgoing datagrams, disconnect notices)
        let (accept_tx, accept_rx) = unbounded_channel();
        let (knock_tx, knock_rx) = unbounded_channel();
        let (peer_tx, peer_rx) = unbounded_channel();

        let runner = MinetestSocketRunner {
            socket,
            peers: HashMap::new(),
            peer_tx,
            peer_rx,
            outgoing: VecDeque::new(),
            accept_tx,
            knock_rx,
            for_server,
        };
        // The join handle is intentionally dropped: the runner is detached.
        tokio::spawn(runner.run());

        Ok(Self {
            accept_rx,
            knock_tx,
            for_server,
        })
    }

    /// Waits for the next peer created by the runner.
    ///
    /// Returns `None` when the accept channel yields nothing more, i.e. the
    /// runner side of the channel has been dropped.
    pub async fn accept(&mut self) -> Option<Peer> {
        self.accept_rx.recv().await
    }

    /// Asks the runner to create a peer for `remote` and waits until that peer
    /// arrives on the accept channel.
    ///
    /// Peers for any other address that arrive in the meantime are dropped.
    ///
    /// # Panics
    ///
    /// Panics if this socket was created with `for_server == true`, if the
    /// request cannot be sent to the runner, or if the accept channel closes
    /// before the requested peer shows up.
    pub async fn add_peer(&mut self, remote: SocketAddr) -> Peer {
        assert!(!self.for_server);
        self.knock_tx.send(remote).unwrap();

        let wanted = loop {
            let candidate = self.accept_rx.recv().await.unwrap();
            if candidate.remote_addr() == remote {
                break candidate;
            }
        };
        wanted
    }
}
/// State of the background task that owns the UDP socket and routes datagrams
/// between it and the per-remote peers.
pub struct MinetestSocketRunner {
/// The UDP socket all datagrams are received from and sent on.
socket: UdpSocket,
/// Socket-side endpoint of every known peer, keyed by remote address.
peers: HashMap<SocketAddr, PeerIO>,
/// Sender cloned into each newly created peer so it can message the runner.
peer_tx: UnboundedSender<PeerToSocket>,
/// Receives the messages peers send through clones of `peer_tx`.
peer_rx: UnboundedReceiver<PeerToSocket>,
/// Datagrams waiting to be sent; the element at the back is sent next.
outgoing: VecDeque<(SocketAddr, Vec<u8>)>,
/// Newly created `Peer` handles are delivered through this sender.
accept_tx: UnboundedSender<Peer>,
/// Receives remote addresses for which a peer should be created.
knock_rx: UnboundedReceiver<SocketAddr>,
/// When `true`, a datagram from an unknown address creates a new peer;
/// when `false`, such datagrams are dropped. The negated value is also
/// passed to `new_peer` (its meaning there is not visible in this file).
for_server: bool,
}
impl MinetestSocketRunner {
pub async fn run(mut self) {
match self.run_inner().await {
Ok(_) => (),
Err(err) => {
println!("MinetestSocket abnormal exit: {:?}", err);
}
}
}
pub async fn run_inner(&mut self) -> anyhow::Result<()> {
let mut knock_closed = false;
let mut buf: Vec<u8> = vec![0u8; MAX_DATAGRAM_SIZE];
loop {
let mut r = Interest::READABLE;
if !self.outgoing.is_empty() {
r = r | Interest::WRITABLE;
}
tokio::select! {
t = self.socket.ready(r) => self.handle_socket_io(t, &mut buf).await?,
msg = self.peer_rx.recv() => self.handle_peer_message(msg),
t = self.knock_rx.recv(), if !knock_closed => {
match t {
Some(t) => {
self.get_peer(t, true);
},
None => {
knock_closed = true;
},
}
}
}
}
}
async fn handle_socket_io(
&mut self,
t: tokio::io::Result<Ready>,
buf: &mut [u8],
) -> anyhow::Result<()> {
let t = t.expect("socket.ready should not error");
if t.is_readable() {
match self.socket.try_recv_from(buf) {
Ok((n, remote_addr)) => {
if let Some(peer) = self.get_peer(remote_addr, self.for_server) {
peer.send(&buf[..n]);
}
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => (),
Err(e) => panic!("Unexpected socket error: {:?}", e),
};
}
if t.is_writable() && !self.outgoing.is_empty() {
let (addr, data) = self.outgoing.pop_back().unwrap();
match self.socket.try_send_to(&data, addr) {
Ok(_) => (),
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
self.outgoing.push_back((addr, data));
}
Err(e) => panic!("Unexpected socket error: {:?}", e),
}
}
Ok(())
}
fn handle_peer_message(&mut self, msg: Option<PeerToSocket>) {
let msg = match msg {
Some(msg) => msg,
None => panic!("Unexpected Server shutdown?"),
};
match msg {
PeerToSocket::SendImmediate(addr, data) => self.outgoing.push_back((addr, data)),
PeerToSocket::Send(addr, data) => self.outgoing.push_front((addr, data)),
PeerToSocket::PeerIsDisconnected(addr) => self.remove_peer(addr),
}
}
fn get_peer(&mut self, remote_addr: SocketAddr, may_insert: bool) -> Option<&mut PeerIO> {
if may_insert && !self.peers.contains_key(&remote_addr) {
self.insert_peer(remote_addr);
}
self.peers.get_mut(&remote_addr)
}
fn insert_peer(&mut self, remote_addr: SocketAddr) {
let (peer, peerio) = new_peer(remote_addr, !self.for_server, self.peer_tx.clone());
self.peers.insert(remote_addr, peerio);
let ok = self.accept_tx.send(peer).is_ok();
assert!(ok);
}
fn remove_peer(&mut self, remote_addr: SocketAddr) {
self.peers.remove(&remote_addr);
}
}