use std::collections::HashMap;
use tokio::sync::mpsc::channel;
use tokio::sync::{Mutex, Notify};
use tokio::sync::mpsc::{Receiver, Sender};
use std::{net::SocketAddr, sync::Arc};
use tokio::net::UdpSocket;
use crate::{socket::*, raknet_log_debug, raknet_log_error};
use crate::packet::*;
use crate::utils::*;
use crate::error::{Result, RaknetError};
const SERVER_NAME : &str = "Rust Raknet Server";
const MAX_CONNECTION : u32 = 99999;
type SessionSender = (i64 ,Sender<Vec<u8>>);
pub struct RaknetListener {
motd : String,
socket : Option<Arc<UdpSocket>>,
guid : u64,
listened : bool,
connection_receiver : Receiver<RaknetSocket>,
connection_sender : Sender<RaknetSocket>,
sessions : Arc<Mutex<HashMap<SocketAddr , SessionSender>>>,
close_notifier : Arc<tokio::sync::Semaphore>,
all_session_closed_notifier : Arc<Notify>,
drop_notifier : Arc<Notify>
}
impl RaknetListener {
pub async fn bind(sockaddr : &SocketAddr) -> Result<Self> {
let s = match UdpSocket::bind(sockaddr).await{
Ok(p) => p,
Err(_) => {
return Err(RaknetError::BindAdreesError);
},
};
let (connection_sender ,connection_receiver) = channel::<RaknetSocket>(10);
let ret = Self {
motd : String::new(),
socket : Some(Arc::new(s)),
guid : rand::random(),
listened : false,
connection_receiver,
connection_sender,
sessions : Arc::new(Mutex::new(HashMap::new())),
close_notifier : Arc::new(tokio::sync::Semaphore::new(0)),
all_session_closed_notifier: Arc::new(Notify::new()),
drop_notifier : Arc::new(Notify::new())
};
ret.drop_watcher().await;
Ok(ret)
}
pub async fn from_std(s : std::net::UdpSocket) -> Result<Self>{
s.set_nonblocking(true).expect("set udpsocket nonblocking error");
let s = match UdpSocket::from_std(s){
Ok(p) => p,
Err(_) => {
return Err(RaknetError::SetRaknetRawSocketError);
},
};
let (connection_sender ,connection_receiver) = channel::<RaknetSocket>(10);
let ret = Self {
motd : String::new(),
socket : Some(Arc::new(s)),
guid : rand::random(),
listened : false,
connection_receiver,
connection_sender,
sessions : Arc::new(Mutex::new(HashMap::new())),
close_notifier : Arc::new(tokio::sync::Semaphore::new(0)),
all_session_closed_notifier : Arc::new(Notify::new()),
drop_notifier : Arc::new(Notify::new())
};
ret.drop_watcher().await;
Ok(ret)
}
async fn start_session_collect(&self ,socket : &Arc<UdpSocket> , sessions : &Arc<Mutex<HashMap<SocketAddr , SessionSender>>> ,mut collect_receiver : Receiver<SocketAddr>) {
let sessions = sessions.clone();
let socket = socket.clone();
let close_notifier = self.close_notifier.clone();
let all_session_closed_notifier = self.all_session_closed_notifier.clone();
tokio::spawn(async move{
loop{
let addr : SocketAddr;
tokio::select! {
a = collect_receiver.recv() => {
match a {
Some(p) => { addr = p },
None => {
raknet_log_debug!("session collecter closed");
break;
},
};
},
_ = close_notifier.acquire() => {
raknet_log_debug!("session collecter close notified");
break;
}
}
let mut sessions = sessions.lock().await;
if sessions.contains_key(&addr){
match socket.send_to(&[PacketID::Disconnect.to_u8()], addr).await{
Ok(_) => {},
Err(e) => {
raknet_log_error!("udp socket send_to error : {}" ,e);
},
};
sessions.remove(&addr);
raknet_log_debug!("collect socket : {}" , addr);
}
}
let mut sessions = sessions.lock().await;
for i in sessions.iter(){
if i.1.1.send(vec![PacketID::Disconnect.to_u8()]).await.is_ok(){
}
match socket.send_to(&[PacketID::Disconnect.to_u8()], i.0).await{
Ok(_) => {},
Err(e) => {
raknet_log_error!("udp socket send_to error : {}" ,e);
},
};
}
while !sessions.is_empty(){
let addr = match collect_receiver.recv().await{
Some(p) => p,
None => {
raknet_log_error!("clean session faild , maybe has session not close");
break;
},
};
if sessions.contains_key(&addr){
match socket.send_to(&[PacketID::Disconnect.to_u8()], addr).await{
Ok(_) => {},
Err(e) => {
raknet_log_error!("udp socket send_to error : {}" ,e);
},
};
sessions.remove(&addr);
raknet_log_debug!("collect socket : {}" , addr);
}
}
sessions.clear();
all_session_closed_notifier.notify_one();
raknet_log_debug!("session collect closed");
});
}
pub async fn listen(&mut self) {
if self.close_notifier.is_closed(){
return;
}
if self.motd.is_empty(){
self.set_motd(SERVER_NAME , MAX_CONNECTION, "486" , "1.18.11", "Survival" , self.socket.as_ref().unwrap().local_addr().unwrap().port()).await;
}
let socket = self.socket.as_ref().unwrap().clone();
let guid = self.guid;
let sessions = self.sessions.clone();
let connection_sender = self.connection_sender.clone();
let motd = self.get_motd().await;
self.listened = true;
let (collect_sender ,collect_receiver) = channel::<SocketAddr>(10);
let collect_sender = Arc::new(Mutex::new(collect_sender));
self.start_session_collect(&socket ,&sessions , collect_receiver).await;
let local_addr = socket.local_addr().unwrap();
let close_notify = self.close_notifier.clone();
tokio::spawn(async move {
let mut buf= [0u8;2048];
raknet_log_debug!("start listen worker : {}" , local_addr );
loop{
let motd = motd.clone();
let size : usize ;
let addr : SocketAddr;
tokio::select!{
a = socket.recv_from(&mut buf) => {
match a {
Ok(p) => {
size = p.0;
addr = p.1;
},
Err(e) => {
raknet_log_debug!("server recv_from error {}" , e);
break;
},
};
},
_ = close_notify.acquire() => {
raknet_log_debug!("listen close notified");
break;
}
}
let cur_status = match PacketID::from(buf[0]){
Ok(p) => p,
Err(e) => {
raknet_log_debug!("parse packetid faild : {:?}" ,e );
continue;
},
};
match cur_status{
PacketID::UnconnectedPing1 => {
let _ping = match read_packet_ping(&buf[..size]).await{
Ok(p) => p,
Err(_) => continue,
};
let packet = crate::packet::PacketUnconnectedPong {
time: cur_timestamp_millis(),
guid,
magic: true,
motd
};
let pong = match write_packet_pong(&packet).await{
Ok(p) => p,
Err(_) => continue,
};
match socket.send_to(&pong, addr).await{
Ok(_) => {},
Err(e) => {
raknet_log_error!("udp socket send_to error : {}" ,e);
},
};
continue;
},
PacketID::UnconnectedPing2 => {
match read_packet_ping(&buf[..size]).await{
Ok(p) => p,
Err(_) => continue,
};
let packet = crate::packet::PacketUnconnectedPong {
time: cur_timestamp_millis(),
guid,
magic: true,
motd
};
let pong = match write_packet_pong(&packet).await{
Ok(p) => p,
Err(_) => continue,
};
match socket.send_to(&pong, addr).await{
Ok(_) => {},
Err(e) => {
raknet_log_error!("udp socket send_to error : {}" ,e);
},
};
continue;
},
PacketID::OpenConnectionRequest1 => {
let req = match read_packet_connection_open_request_1(&buf[..size]).await{
Ok(p) => p,
Err(_) => continue,
};
if req.protocol_version != RAKNET_PROTOCOL_VERSION{
let packet = crate::packet::IncompatibleProtocolVersion{
server_protocol: RAKNET_PROTOCOL_VERSION,
magic: true,
server_guid: guid,
};
let buf = write_packet_incompatible_protocol_version(&packet).await.unwrap();
match socket.send_to(&buf, addr).await{
Ok(_) => {},
Err(e) => {
raknet_log_error!("udp socket send_to error : {}" ,e);
},
};
continue;
}
let packet = crate::packet::OpenConnectionReply1 {
magic: true,
guid,
use_encryption: 0x00,
mtu_size: RAKNET_CLIENT_MTU,
};
let reply = match write_packet_connection_open_reply_1(&packet).await{
Ok(p) => p,
Err(_) => continue,
};
match socket.send_to(&reply, addr).await{
Ok(_) => {},
Err(e) => {
raknet_log_error!("udp socket send_to error : {}" ,e);
},
};
continue;
},
PacketID::OpenConnectionRequest2 => {
let req = match read_packet_connection_open_request_2(&buf[..size]).await{
Ok(p) => p,
Err(_) => continue,
};
let packet = crate::packet::OpenConnectionReply2 {
magic: true,
guid,
address: addr,
mtu: req.mtu,
encryption_enabled: 0x00,
};
let reply = match write_packet_connection_open_reply_2(&packet).await{
Ok(p) => p,
Err(_) => continue,
};
let mut sessions = sessions.lock().await;
if sessions.contains_key(&addr) {
let packet = write_packet_already_connected(&AlreadyConnected{
magic: true,
guid,
}).await.unwrap();
match socket.send_to(&packet, addr).await{
Ok(_) => {},
Err(e) => {
raknet_log_error!("udp socket send_to error : {}" ,e);
},
};
continue;
}
match socket.send_to(&reply, addr).await{
Ok(_) => {},
Err(e) => {
raknet_log_error!("udp socket send_to error : {}" ,e);
},
};
let (sender , receiver) = channel::<Vec<u8>>(10);
let s = RaknetSocket::from(&addr, &socket, receiver , req.mtu , collect_sender.clone()).await;
raknet_log_debug!("accept connection : {}", addr);
sessions.insert(addr, (cur_timestamp_millis() ,sender));
let _ = connection_sender.send(s).await;
},
PacketID::Disconnect => {
let mut sessions = sessions.lock().await;
if sessions.contains_key(&addr){
sessions[&addr].1.send(buf[..size].to_vec()).await.unwrap();
sessions.remove(&addr);
}
}
_ => {
let mut sessions = sessions.lock().await;
if sessions.contains_key(&addr){
match sessions[&addr].1.send(buf[..size].to_vec()).await{
Ok(_) => {},
Err(_) => {
sessions.remove(&addr);
continue;
},
};
sessions.get_mut(&addr).unwrap().0 = cur_timestamp_millis();
}
},
}
}
raknet_log_debug!("listen worker closed");
});
}
pub async fn accept(&mut self) -> Result<RaknetSocket> {
if !self.listened{
Err(RaknetError::NotListen)
}else {
tokio::select!{
a = self.connection_receiver.recv() => {
match a {
Some(p) => Ok(p),
None => {
Err(RaknetError::NotListen)
},
}
},
_ = self.close_notifier.acquire() => {
raknet_log_debug!("accept close notified");
Err(RaknetError::NotListen)
}
}
}
}
pub async fn set_motd(&mut self ,server_name : &str, max_connection : u32, mc_protocol_version : &str , mc_version : &str , game_type : &str ,port : u16 ) {
self.motd = format!("MCPE;{};{};{};0;{};{};Bedrock level;{};1;{};",server_name, mc_protocol_version , mc_version , max_connection , self.guid , game_type, port);
}
pub async fn get_motd(&self) -> String{
self.motd.clone()
}
pub fn local_addr(&self) -> Result<SocketAddr> {
Ok(self.socket.as_ref().unwrap().local_addr().unwrap())
}
pub async fn close(&mut self) -> Result<()>{
if self.close_notifier.is_closed(){
return Ok(());
}
self.close_notifier.close();
self.all_session_closed_notifier.notified().await;
while Arc::strong_count(self.socket.as_ref().unwrap()) != 1{
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
self.socket = None;
self.listened = false;
Ok(())
}
pub fn set_full_motd(&mut self, motd : String ) -> Result<()>{
self.motd = motd;
Ok(())
}
async fn drop_watcher(&self){
let close_notifier = self.close_notifier.clone();
let drop_notifier = self.drop_notifier.clone();
tokio::spawn(async move {
raknet_log_debug!("listener drop watcher start");
drop_notifier.notify_one();
drop_notifier.notified().await;
if close_notifier.is_closed(){
raknet_log_debug!("close notifier closed");
return;
}
close_notifier.close();
raknet_log_debug!("listener drop watcher closed");
});
self.drop_notifier.notified().await;
}
}
impl Drop for RaknetListener{
fn drop(&mut self) {
self.drop_notifier.notify_one();
}
}