#[allow(unused)]
pub mod event;
use std::collections::HashMap;
use std::{net::SocketAddr, sync::Arc};
#[cfg(feature = "async_std")]
use async_std::{
channel::{bounded, Receiver, Sender},
net::UdpSocket,
sync::Mutex,
task::{self},
};
#[cfg(feature = "async_std")]
use futures::{select, FutureExt};
use binary_util::interfaces::{Reader, Writer};
use binary_util::ByteReader;
#[cfg(feature = "async_tokio")]
use tokio::{
net::UdpSocket,
select,
sync::mpsc::channel as bounded,
sync::mpsc::{Receiver, Sender},
sync::Mutex,
task::{self},
};
use crate::connection::{ConnMeta, Connection};
use crate::error::server::ServerError;
use crate::notify::Notify;
use crate::protocol::mcpe::motd::Motd;
use crate::protocol::packet::offline::{
IncompatibleProtocolVersion, OfflinePacket, OpenConnectReply, SessionInfoReply, UnconnectedPong,
};
use crate::protocol::packet::RakPacket;
use crate::protocol::Magic;
use crate::rakrs_debug;
use crate::util::to_address_token;
/// Per-client entry stored in the listener's connection map: the connection
/// metadata paired with the sender used to hand raw datagrams to that client's
/// connection.
pub(crate) type Session = (ConnMeta, Sender<Vec<u8>>);
/// An address-like value that may or may not resolve to a [`SocketAddr`].
///
/// Values are normally produced through the `From` conversions and then
/// resolved with `to_socket_addr`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PossiblySocketAddr<'a> {
    /// An already-parsed socket address.
    SocketAddr(SocketAddr),
    /// A borrowed textual address that still has to be parsed.
    Str(&'a str),
    /// An owned textual address that still has to be parsed.
    String(String),
    /// A value that does not represent an address at all.
    ActuallyNot,
}
impl PossiblySocketAddr<'_> {
    /// Resolves this value into a concrete [`SocketAddr`].
    ///
    /// Returns `Some` for the `SocketAddr` variant and for textual variants
    /// that parse as a socket address. Returns `None` for `ActuallyNot` and
    /// for text that is not a valid socket address; malformed input never
    /// panics.
    pub fn to_socket_addr(self) -> Option<SocketAddr> {
        match self {
            PossiblySocketAddr::SocketAddr(addr) => Some(addr),
            PossiblySocketAddr::Str(addr) => addr.parse::<SocketAddr>().ok(),
            PossiblySocketAddr::String(addr) => addr.parse::<SocketAddr>().ok(),
            PossiblySocketAddr::ActuallyNot => None,
        }
    }
}
impl From<&str> for PossiblySocketAddr<'_> {
fn from(s: &str) -> Self {
PossiblySocketAddr::String(s.to_string())
}
}
impl From<String> for PossiblySocketAddr<'_> {
fn from(s: String) -> Self {
PossiblySocketAddr::String(s)
}
}
impl From<SocketAddr> for PossiblySocketAddr<'_> {
fn from(s: SocketAddr) -> Self {
PossiblySocketAddr::SocketAddr(s)
}
}
impl std::fmt::Display for PossiblySocketAddr<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PossiblySocketAddr::SocketAddr(addr) => write!(f, "{}", addr),
PossiblySocketAddr::Str(addr) => write!(f, "{}", addr),
PossiblySocketAddr::String(addr) => write!(f, "{}", addr),
PossiblySocketAddr::ActuallyNot => write!(f, "Not a valid address!"),
}
}
}
/// A RakNet server listener bound to a UDP socket.
///
/// Created with `Listener::bind`, started with `start`, after which incoming
/// connections are retrieved with `accept` until `stop` is called.
pub struct Listener {
/// Message-of-the-day data; `start` clones it into the network task when the
/// `mcpe` feature is enabled.
pub motd: Motd,
/// Server identifier, generated randomly in `bind` and sent in offline replies.
pub id: u64,
/// Protocol versions accepted from `OpenConnectRequest` packets.
pub versions: &'static [u8],
/// Whether `start` has been called and `stop` has not been called since.
serving: bool,
/// The bound socket; set to `None` by `stop`.
sock: Option<Arc<UdpSocket>>,
/// Known sessions keyed by the remote address, shared with the background tasks.
connections: Arc<Mutex<HashMap<SocketAddr, Session>>>,
/// Receiving half of the channel on which new connections arrive for `accept`.
recv_comm: Receiver<Connection>,
/// Sending half of that channel; cloned into the network task by `start`.
send_comm: Sender<Connection>,
/// Shutdown notifier awaited by the background tasks and triggered by `stop`.
closed: Arc<Notify>,
}
impl Listener {
pub async fn bind<I: for<'a> Into<PossiblySocketAddr<'a>>>(
address: I,
) -> Result<Self, ServerError> {
let a: PossiblySocketAddr = address.into();
let address_r: Option<SocketAddr> = a.to_socket_addr();
if address_r.is_none() {
rakrs_debug!("Invalid binding value");
return Err(ServerError::AddrBindErr);
}
let address = address_r.unwrap();
let sock = match UdpSocket::bind(address).await {
Ok(s) => s,
Err(_) => return Err(ServerError::AddrBindErr),
};
rakrs_debug!(true, "listener: Bound to {}", address);
let server_id: u64 = rand::random();
let motd = Motd::new(server_id, format!("{}", address.port()));
let (send_comm, recv_comm) = bounded::<Connection>(10);
let listener = Self {
sock: Some(Arc::new(sock)),
id: server_id,
versions: &[10, 11],
motd,
send_comm,
recv_comm,
serving: false,
connections: Arc::new(Mutex::new(HashMap::new())),
closed: Arc::new(Notify::new()),
};
return Ok(listener);
}
/// Starts serving by spawning two background tasks.
///
/// * The **network task** reads datagrams from the socket into a 2048-byte
///   buffer. Offline packets (`UnconnectedPing`, `OpenConnectRequest`,
///   `SessionInfoRequest`) are answered directly; a `SessionInfoRequest` from
///   an unknown address also creates a session and hands the new
///   [`Connection`] to the channel read by `accept`. Any other datagram is
///   forwarded to the session registered for its origin, if there is one.
/// * The **cleanup task** removes a session whenever a connection reports
///   its address on the close channel.
///
/// Both tasks exit when the shutdown notifier is triggered.
///
/// # Errors
/// * `ServerError::AlreadyOnline` if the listener is already serving.
/// * `ServerError::Killed` if the socket has been released (for example by a
///   previous call to `stop`), instead of panicking.
pub async fn start(&mut self) -> Result<(), ServerError> {
    if self.serving {
        return Err(ServerError::AlreadyOnline);
    }

    // `stop` drops the socket, so a stopped listener cannot be restarted.
    // Report that to the caller rather than unwrapping a missing socket.
    let socket = match self.sock.as_ref() {
        Some(sock) => sock.clone(),
        None => return Err(ServerError::Killed),
    };
    let send_comm = self.send_comm.clone();
    let server_id = self.id;
    #[cfg(feature = "mcpe")]
    let default_motd = self.motd.clone();
    let connections = self.connections.clone();
    let closer = self.closed.clone();
    let connections2 = self.connections.clone();
    let closer2 = self.closed.clone();
    let versions = self.versions;

    self.serving = true;

    // Channel on which connections report their own address when they close.
    #[cfg(feature = "async_std")]
    let (cs, client_close_recv) = bounded::<SocketAddr>(10);
    #[cfg(feature = "async_tokio")]
    let (cs, mut client_close_recv) = bounded::<SocketAddr>(10);
    let client_close_send = Arc::new(cs);

    // Network task: receive loop and offline-packet handling.
    task::spawn(async move {
        let mut buf: [u8; 2048] = [0; 2048];
        #[cfg(feature = "mcpe")]
        let motd_default = default_motd.clone();

        loop {
            let length: usize;
            let origin: SocketAddr;

            // Shared between the async_std and tokio `select!` blocks below;
            // a macro (not a function) so `continue` targets this loop and
            // `length` / `origin` can be assigned in place.
            macro_rules! recv_body {
                ($recv: ident) => {
                    match $recv {
                        Ok((l, o)) => {
                            length = l;
                            origin = o;
                        }
                        Err(e) => {
                            match e.kind() {
                                // Connection resets are skipped without logging.
                                std::io::ErrorKind::ConnectionReset => {
                                    continue;
                                },
                                _ => {
                                    rakrs_debug!(true, "[SERVER-SOCKET] Failed to recieve packet! {}", e);
                                    continue;
                                }
                            }
                        }
                    }

                    if let Ok(pk) = OfflinePacket::read(&mut ByteReader::from(&buf[..length])) {
                        match pk {
                            OfflinePacket::UnconnectedPing(_) => {
                                #[cfg(feature = "mcpe")]
                                let motd: Motd = motd_default.clone();
                                let resp = UnconnectedPong {
                                    timestamp: current_epoch(),
                                    server_id,
                                    magic: Magic::new(),
                                    #[cfg(feature = "mcpe")]
                                    motd,
                                };
                                send_packet_to_socket(&socket, resp.into(), origin).await;
                                continue;
                            }
                            OfflinePacket::OpenConnectRequest(mut pk) => {
                                if !versions.contains(&pk.protocol) {
                                    let resp = IncompatibleProtocolVersion {
                                        protocol: pk.protocol,
                                        magic: Magic::new(),
                                        server_id,
                                    };
                                    // Address token goes in the bracket, protocol in the
                                    // parentheses, matching the other log lines.
                                    rakrs_debug!("[{}] Sent ({}) which is invalid RakNet protocol. Version is incompatible with server.", to_address_token(origin), pk.protocol);
                                    send_packet_to_socket(&socket, resp.into(), origin).await;
                                    continue;
                                }
                                rakrs_debug!(
                                    true,
                                    "[{}] Client requested Mtu Size: {}",
                                    to_address_token(origin),
                                    pk.mtu_size
                                );
                                // Clamp the requested MTU to the receive buffer size.
                                if pk.mtu_size > 2048 {
                                    rakrs_debug!(
                                        true,
                                        "[{}] Client requested Mtu Size: {} which is larger than the maximum allowed size of 2048",
                                        to_address_token(origin),
                                        pk.mtu_size
                                    );
                                    pk.mtu_size = 2048;
                                }
                                let resp = OpenConnectReply {
                                    server_id,
                                    security: false,
                                    magic: Magic::new(),
                                    mtu_size: pk.mtu_size,
                                };
                                send_packet_to_socket(&socket, resp.into(), origin).await;
                                continue;
                            }
                            OfflinePacket::SessionInfoRequest(pk) => {
                                let resp = SessionInfoReply {
                                    server_id,
                                    client_address: origin,
                                    magic: Magic::new(),
                                    mtu_size: pk.mtu_size,
                                    security: false,
                                };
                                let mut sessions = connections.lock().await;

                                if !sessions.contains_key(&origin) {
                                    rakrs_debug!(true, "Creating new session for {}", origin);
                                    let meta = ConnMeta::new(0);
                                    let (net_send, net_recv) = bounded::<Vec<u8>>(10);
                                    let connection =
                                        Connection::new(origin, &socket, net_recv, client_close_send.clone(), pk.mtu_size).await;
                                    rakrs_debug!(true, "Created Session for {}", origin);
                                    sessions.insert(origin, (meta, net_send));
                                    // Hand the connection to `accept`; undo the insert if
                                    // the channel is no longer usable.
                                    if let Err(err) = send_comm.send(connection).await {
                                        let connection = err.0;
                                        rakrs_debug!("[{}] Error while communicating with internal connection channel! Connection withdrawn.", to_address_token(connection.address));
                                        sessions.remove(&origin);
                                        continue;
                                    }
                                }

                                // The entry exists here: it was either already present
                                // or inserted (and kept) just above.
                                sessions.get_mut(&origin).unwrap().0.mtu_size = pk.mtu_size;
                                rakrs_debug!(
                                    true,
                                    "[{}] Updated mtu size to {}",
                                    to_address_token(origin),
                                    pk.mtu_size
                                );
                                send_packet_to_socket(&socket, resp.into(), origin).await;
                                continue;
                            }
                            _ => {
                                // Falls through to session forwarding below.
                                rakrs_debug!(
                                    "[{}] Received invalid packet!",
                                    to_address_token(origin)
                                );
                            }
                        }
                    }

                    // Not handled as an offline packet: pass the raw datagram to
                    // the session registered for this origin, if any.
                    let mut sessions = connections.lock().await;
                    if sessions.contains_key(&origin) {
                        if sessions[&origin].1.send(buf[..length].to_vec()).await.is_err() {
                            rakrs_debug!(true, "[{}] Failed when handling recieved packet! Could not pass over to internal connection, the channel might be closed! (Removed the connection)", to_address_token(origin));
                            sessions.remove(&origin);
                        }
                    }
                    drop(sessions);
                };
            }

            #[cfg(feature = "async_std")]
            select! {
                _ = closer.wait().fuse() => {
                    rakrs_debug!(true, "[SERVER] [NETWORK] Server has recieved the shutdown notification!");
                    break;
                }
                recv = socket.recv_from(&mut buf).fuse() => {
                    recv_body!(recv);
                }
            }

            #[cfg(feature = "async_tokio")]
            select! {
                _ = closer.wait() => {
                    rakrs_debug!(true, "[SERVER] [NETWORK] Server has recieved the shutdown notification!");
                    break;
                }
                recv = socket.recv_from(&mut buf) => {
                    recv_body!(recv);
                }
            }
        }
    });

    // Cleanup task: drop sessions whose connection reported that it closed.
    task::spawn(async move {
        loop {
            #[cfg(feature = "async_std")]
            select! {
                _ = closer2.wait().fuse() => {
                    rakrs_debug!(true, "[SERVER] [Cleanup] Server has recieved the shutdown notification!");
                    break;
                }
                addr = client_close_recv.recv().fuse() => {
                    if let Ok(addr) = addr {
                        rakrs_debug!(true, "[SERVER] [Cleanup] Removing connection for {}", to_address_token(addr));
                        let mut c = connections2.lock().await;
                        c.remove(&addr);
                        drop(c);
                    }
                }
            }

            #[cfg(feature = "async_tokio")]
            select! {
                _ = closer2.wait() => {
                    rakrs_debug!(true, "[SERVER] [Cleanup] Server has recieved the shutdown notification!");
                    break;
                }
                addr = client_close_recv.recv() => {
                    if let Some(addr) = addr {
                        rakrs_debug!(true, "[SERVER] [Cleanup] Removing connection for {}", to_address_token(addr));
                        let mut c = connections2.lock().await;
                        c.remove(&addr);
                        drop(c);
                    }
                }
            }
        }
    });

    Ok(())
}
pub async fn accept(&mut self) -> Result<Connection, ServerError> {
if !self.serving {
Err(ServerError::NotListening)
} else {
let receiver = self.recv_comm.recv().await;
return match receiver {
#[cfg(feature = "async_std")]
Ok(c) => Ok(c),
#[cfg(feature = "async_std")]
Err(_) => Err(ServerError::Killed),
#[cfg(feature = "async_tokio")]
Some(c) => Ok(c),
#[cfg(feature = "async_tokio")]
None => Err(ServerError::Killed),
};
}
}
/// Signals the background tasks to shut down and releases the socket.
///
/// Triggers the shutdown notifier first, then clears the serving flag and
/// drops this listener's handle to the socket. Always returns `Ok(())`.
pub async fn stop(&mut self) -> Result<(), ServerError> {
    self.closed.notify().await;

    self.serving = false;
    drop(self.sock.take());

    Ok(())
}
}
/// Serializes `packet` and sends it to `origin` over `socket`.
///
/// This is best-effort: a serialization failure or a send failure is logged
/// through `rakrs_debug!` and otherwise ignored, so a bad packet cannot
/// bring down the task that calls this function.
async fn send_packet_to_socket(socket: &Arc<UdpSocket>, packet: RakPacket, origin: SocketAddr) {
    let payload = match packet.write_to_bytes() {
        Ok(bytes) => bytes,
        Err(e) => {
            rakrs_debug!(
                "[{}] Failed serializing payload for socket! {:?}",
                to_address_token(origin),
                e
            );
            return;
        }
    };

    if let Err(e) = socket.send_to(payload.as_slice(), origin).await {
        rakrs_debug!(
            "[{}] Failed sending payload to socket! {}",
            to_address_token(origin),
            e
        );
    }
}
/// Returns the current system time as whole seconds since the Unix epoch.
///
/// Yields `0` instead of panicking when the system clock reports a time
/// earlier than the epoch.
pub(crate) fn current_epoch() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0)
}