1use std::collections::HashMap;
2use std::{net::SocketAddr, sync::Arc};
3use tokio::net::UdpSocket;
4use tokio::sync::mpsc::channel;
5use tokio::sync::mpsc::{Receiver, Sender};
6use tokio::sync::{Mutex, Notify};
7
8use crate::error::{RaknetError, Result};
9use crate::packet::*;
10use crate::utils::*;
11use crate::{raknet_log_debug, raknet_log_error, socket::*};
12
13const SERVER_NAME: &str = "Rust Raknet Server";
14const MAX_CONNECTION: u32 = 99999;
15
16type SessionSender = (i64, Sender<Vec<u8>>);
17
18pub struct RaknetListener {
20 motd: String,
21 socket: Option<Arc<UdpSocket>>,
22 guid: u64,
23 listened: bool,
24 connection_receiver: Receiver<RaknetSocket>,
25 connection_sender: Sender<RaknetSocket>,
26 sessions: Arc<Mutex<HashMap<SocketAddr, SessionSender>>>,
27 close_notifier: Arc<tokio::sync::Semaphore>,
28 all_session_closed_notifier: Arc<Notify>,
29 drop_notifier: Arc<Notify>,
30 version_map: Arc<Mutex<HashMap<String, u8>>>,
31}
32
33impl RaknetListener {
34 pub async fn bind(sockaddr: &SocketAddr) -> Result<Self> {
43 let s = match UdpSocket::bind(sockaddr).await {
44 Ok(p) => p,
45 Err(_) => {
46 return Err(RaknetError::BindAdressError);
47 }
48 };
49
50 let (connection_sender, connection_receiver) = channel::<RaknetSocket>(10);
51
52 let ret = Self {
53 motd: String::new(),
54 socket: Some(Arc::new(s)),
55 guid: rand::random(),
56 listened: false,
57 connection_receiver,
58 connection_sender,
59 sessions: Arc::new(Mutex::new(HashMap::new())),
60 close_notifier: Arc::new(tokio::sync::Semaphore::new(0)),
61 all_session_closed_notifier: Arc::new(Notify::new()),
62 drop_notifier: Arc::new(Notify::new()),
63 version_map: Arc::new(Mutex::new(HashMap::new())),
64 };
65
66 ret.drop_watcher().await;
67 Ok(ret)
68 }
69
70 pub async fn from_std(s: std::net::UdpSocket) -> Result<Self> {
78 s.set_nonblocking(true)
79 .expect("set udpsocket nonblocking error");
80
81 let s = match UdpSocket::from_std(s) {
82 Ok(p) => p,
83 Err(_) => {
84 return Err(RaknetError::SetRaknetRawSocketError);
85 }
86 };
87
88 let (connection_sender, connection_receiver) = channel::<RaknetSocket>(10);
89
90 let ret = Self {
91 motd: String::new(),
92 socket: Some(Arc::new(s)),
93 guid: rand::random(),
94 listened: false,
95 connection_receiver,
96 connection_sender,
97 sessions: Arc::new(Mutex::new(HashMap::new())),
98 close_notifier: Arc::new(tokio::sync::Semaphore::new(0)),
99 all_session_closed_notifier: Arc::new(Notify::new()),
100 drop_notifier: Arc::new(Notify::new()),
101 version_map: Arc::new(Mutex::new(HashMap::new())),
102 };
103
104 ret.drop_watcher().await;
105 Ok(ret)
106 }
107
108 async fn start_session_collect(
109 &self,
110 socket: &Arc<UdpSocket>,
111 sessions: &Arc<Mutex<HashMap<SocketAddr, SessionSender>>>,
112 mut collect_receiver: Receiver<SocketAddr>,
113 ) {
114 let sessions = sessions.clone();
115 let socket = socket.clone();
116 let close_notifier = self.close_notifier.clone();
117 let all_session_closed_notifier = self.all_session_closed_notifier.clone();
118 tokio::spawn(async move {
119 loop {
120 let addr: SocketAddr;
121
122 tokio::select! {
123 a = collect_receiver.recv() => {
124 match a {
125 Some(p) => { addr = p },
126 None => {
127 raknet_log_debug!("session collecter closed");
128 break;
129 },
130 };
131 },
132 _ = close_notifier.acquire() => {
133 raknet_log_debug!("session collecter close notified");
134 break;
135 }
136 }
137
138 let mut sessions = sessions.lock().await;
139 if sessions.contains_key(&addr) {
140 match socket.send_to(&[PacketID::Disconnect.to_u8()], addr).await {
141 Ok(_) => {}
142 Err(e) => {
143 raknet_log_error!("udp socket send_to error : {}", e);
144 }
145 };
146 sessions.remove(&addr);
147 raknet_log_debug!("collect socket : {}", addr);
148 }
149 }
150
151 let mut sessions = sessions.lock().await;
152
153 for i in sessions.iter() {
154 if i.1
155 .1
156 .send(vec![PacketID::Disconnect.to_u8()])
157 .await
158 .is_ok()
159 {}
160
161 match socket.send_to(&[PacketID::Disconnect.to_u8()], i.0).await {
162 Ok(_) => {}
163 Err(e) => {
164 raknet_log_error!("udp socket send_to error : {}", e);
165 }
166 };
167 }
168
169 while !sessions.is_empty() {
170 let addr = match collect_receiver.recv().await {
171 Some(p) => p,
172 None => {
173 raknet_log_error!("clean session faild , maybe has session not close");
174 break;
175 }
176 };
177
178 if sessions.contains_key(&addr) {
179 match socket.send_to(&[PacketID::Disconnect.to_u8()], addr).await {
180 Ok(_) => {}
181 Err(e) => {
182 raknet_log_error!("udp socket send_to error : {}", e);
183 }
184 };
185 sessions.remove(&addr);
186 raknet_log_debug!("collect socket : {}", addr);
187 }
188 }
189
190 sessions.clear();
191 all_session_closed_notifier.notify_one();
192
193 raknet_log_debug!("session collect closed");
194 });
195 }
196
197 pub async fn listen(&mut self) {
207 if self.close_notifier.is_closed() || self.listened {
208 return;
209 }
210
211 if self.motd.is_empty() {
212 self.set_motd(
213 SERVER_NAME,
214 MAX_CONNECTION,
215 "486",
216 "1.18.11",
217 "Survival",
218 self.socket.as_ref().unwrap().local_addr().unwrap().port(),
219 )
220 .await;
221 }
222
223 let socket = self.socket.as_ref().unwrap().clone();
224 let guid = self.guid;
225 let sessions = self.sessions.clone();
226 let connection_sender = self.connection_sender.clone();
227 let motd = self.get_motd().await;
228
229 self.listened = true;
230
231 let (collect_sender, collect_receiver) = channel::<SocketAddr>(10);
232 let collect_sender = Arc::new(Mutex::new(collect_sender));
233 self.start_session_collect(&socket, &sessions, collect_receiver)
234 .await;
235
236 let local_addr = socket.local_addr().unwrap();
237 let close_notify = self.close_notifier.clone();
238 let version_map = self.version_map.clone();
239 tokio::spawn(async move {
240 let mut buf = [0u8; 2048];
241
242 raknet_log_debug!("start listen worker : {}", local_addr);
243
244 loop {
245 let motd = motd.clone();
246 let size: usize;
247 let addr: SocketAddr;
248
249 tokio::select! {
250 a = socket.recv_from(&mut buf) => {
251 match a {
252 Ok(p) => {
253 size = p.0;
254 addr = p.1;
255 },
256 Err(e) => {
257 raknet_log_debug!("server recv_from error {}" , e);
258 break;
259 },
260 };
261 },
262 _ = close_notify.acquire() => {
263 raknet_log_debug!("listen close notified");
264 break;
265 }
266 }
267
268 let cur_status = match PacketID::from(buf[0]) {
269 Ok(p) => p,
270 Err(e) => {
271 raknet_log_debug!("parse packetid faild : {:?}", e);
272 continue;
273 }
274 };
275
276 match cur_status {
277 PacketID::UnconnectedPing1 => {
278 let _ping = match read_packet_ping(&buf[..size]) {
279 Ok(p) => p,
280 Err(_) => continue,
281 };
282
283 let packet = crate::packet::PacketUnconnectedPong {
284 time: cur_timestamp_millis(),
285 guid,
286 magic: true,
287 motd,
288 };
289
290 let pong = match write_packet_pong(&packet) {
291 Ok(p) => p,
292 Err(_) => continue,
293 };
294
295 match socket.send_to(&pong, addr).await {
296 Ok(_) => {}
297 Err(e) => {
298 raknet_log_error!("udp socket send_to error : {}", e);
299 }
300 };
301 continue;
302 }
303 PacketID::UnconnectedPing2 => {
304 match read_packet_ping(&buf[..size]) {
305 Ok(p) => p,
306 Err(_) => continue,
307 };
308
309 let packet = crate::packet::PacketUnconnectedPong {
310 time: cur_timestamp_millis(),
311 guid,
312 magic: true,
313 motd,
314 };
315
316 let pong = match write_packet_pong(&packet) {
317 Ok(p) => p,
318 Err(_) => continue,
319 };
320
321 match socket.send_to(&pong, addr).await {
322 Ok(_) => {}
323 Err(e) => {
324 raknet_log_error!("udp socket send_to error : {}", e);
325 }
326 };
327 continue;
328 }
329 PacketID::OpenConnectionRequest1 => {
330 let req = match read_packet_connection_open_request_1(&buf[..size]) {
331 Ok(p) => p,
332 Err(_) => continue,
333 };
334
335 if !RAKNET_PROTOCOL_VERSION_LIST
336 .as_slice()
337 .contains(&req.protocol_version)
338 {
339 let packet = crate::packet::IncompatibleProtocolVersion {
340 server_protocol: RAKNET_PROTOCOL_VERSION,
341 magic: true,
342 server_guid: guid,
343 };
344 let buf = write_packet_incompatible_protocol_version(&packet).unwrap();
345
346 match socket.send_to(&buf, addr).await {
347 Ok(_) => {}
348 Err(e) => {
349 raknet_log_error!("udp socket send_to error : {}", e);
350 }
351 };
352 continue;
353 }
354 {
355 let mut version_map = version_map.lock().await;
356 version_map.insert(addr.to_string(), req.protocol_version);
357 }
358
359 let packet = crate::packet::OpenConnectionReply1 {
360 magic: true,
361 guid,
362 use_encryption: 0x00,
364 mtu_size: RAKNET_CLIENT_MTU,
366 };
367
368 let reply = match write_packet_connection_open_reply_1(&packet) {
369 Ok(p) => p,
370 Err(_) => continue,
371 };
372
373 match socket.send_to(&reply, addr).await {
374 Ok(_) => {}
375 Err(e) => {
376 raknet_log_error!("udp socket send_to error : {}", e);
377 }
378 };
379 continue;
380 }
381 PacketID::OpenConnectionRequest2 => {
382 let req = match read_packet_connection_open_request_2(&buf[..size]) {
383 Ok(p) => p,
384 Err(_) => continue,
385 };
386
387 let packet = crate::packet::OpenConnectionReply2 {
388 magic: true,
389 guid,
390 address: addr,
391 mtu: req.mtu,
392 encryption_enabled: 0x00,
393 };
394
395 let reply = match write_packet_connection_open_reply_2(&packet) {
396 Ok(p) => p,
397 Err(_) => continue,
398 };
399
400 let mut sessions = sessions.lock().await;
401
402 if sessions.contains_key(&addr) {
403 let packet = write_packet_already_connected(&AlreadyConnected {
404 magic: true,
405 guid,
406 })
407 .unwrap();
408
409 match socket.send_to(&packet, addr).await {
410 Ok(_) => {}
411 Err(e) => {
412 raknet_log_error!("udp socket send_to error : {}", e);
413 }
414 };
415
416 continue;
417 }
418
419 match socket.send_to(&reply, addr).await {
420 Ok(_) => {}
421 Err(e) => {
422 raknet_log_error!("udp socket send_to error : {}", e);
423 }
424 };
425
426 let (sender, receiver) = channel::<Vec<u8>>(10);
427
428 let raknet_version: u8;
429 {
430 let version_map = version_map.lock().await;
431 raknet_version = *version_map
432 .get(&addr.to_string())
433 .unwrap_or(&RAKNET_PROTOCOL_VERSION);
434 }
435
436 let s = RaknetSocket::from(
437 &addr,
438 &socket,
439 receiver,
440 req.mtu,
441 collect_sender.clone(),
442 raknet_version,
443 )
444 .await;
445
446 raknet_log_debug!("accept connection : {}", addr);
447 sessions.insert(addr, (cur_timestamp_millis(), sender));
448 let _ = connection_sender.send(s).await;
449 }
450 PacketID::Disconnect => {
451 let mut sessions = sessions.lock().await;
452 if sessions.contains_key(&addr) {
453 sessions[&addr].1.send(buf[..size].to_vec()).await.unwrap();
454 sessions.remove(&addr);
455 }
456 }
457 _ => {
458 let mut sessions = sessions.lock().await;
459 if sessions.contains_key(&addr) {
460 match sessions[&addr].1.send(buf[..size].to_vec()).await {
461 Ok(_) => {}
462 Err(_) => {
463 sessions.remove(&addr);
464 continue;
465 }
466 };
467 sessions.get_mut(&addr).unwrap().0 = cur_timestamp_millis();
468 }
469 }
470 }
471 }
472 raknet_log_debug!("listen worker closed");
473 });
474 }
475
476 pub async fn accept(&mut self) -> Result<RaknetSocket> {
487 if !self.listened {
488 Err(RaknetError::NotListen)
489 } else {
490 tokio::select! {
491 a = self.connection_receiver.recv() => {
492 match a {
493 Some(p) => Ok(p),
494 None => {
495 Err(RaknetError::NotListen)
496 },
497 }
498 },
499 _ = self.close_notifier.acquire() => {
500 raknet_log_debug!("accept close notified");
501 Err(RaknetError::NotListen)
502 }
503 }
504 }
505 }
506
507 pub async fn set_motd(
517 &mut self,
518 server_name: &str,
519 max_connection: u32,
520 mc_protocol_version: &str,
521 mc_version: &str,
522 game_type: &str,
523 port: u16,
524 ) {
525 self.motd = format!(
526 "MCPE;{};{};{};0;{};{};Bedrock level;{};1;{};",
527 server_name,
528 mc_protocol_version,
529 mc_version,
530 max_connection,
531 self.guid,
532 game_type,
533 port
534 );
535 }
536
537 pub async fn get_motd(&self) -> String {
545 self.motd.clone()
546 }
547
548 pub fn local_addr(&self) -> Result<SocketAddr> {
556 Ok(self.socket.as_ref().unwrap().local_addr().unwrap())
557 }
558
559 pub async fn close(&mut self) -> Result<()> {
567 if self.close_notifier.is_closed() {
568 return Ok(());
569 }
570 self.close_notifier.close();
571 self.all_session_closed_notifier.notified().await;
572
573 while Arc::strong_count(self.socket.as_ref().unwrap()) != 1 {
575 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
576 }
577
578 self.socket = None;
580 self.listened = false;
581
582 Ok(())
583 }
584
585 pub fn set_full_motd(&mut self, motd: String) -> Result<()> {
593 self.motd = motd;
594 Ok(())
595 }
596
597 pub async fn get_peer_raknet_version(&self, peer: &SocketAddr) -> Result<u8> {
598 let version_map = self.version_map.lock().await;
599 let ver = version_map.get(&peer.to_string());
600 Ok(*ver.unwrap_or(&RAKNET_PROTOCOL_VERSION))
601 }
602
603 async fn drop_watcher(&self) {
604 let close_notifier = self.close_notifier.clone();
605 let drop_notifier = self.drop_notifier.clone();
606 tokio::spawn(async move {
607 raknet_log_debug!("listener drop watcher start");
608 drop_notifier.notify_one();
609
610 drop_notifier.notified().await;
611
612 if close_notifier.is_closed() {
613 raknet_log_debug!("close notifier closed");
614 return;
615 }
616
617 close_notifier.close();
618
619 raknet_log_debug!("listener drop watcher closed");
620 });
621
622 self.drop_notifier.notified().await;
623 }
624}
625
626impl Drop for RaknetListener {
627 fn drop(&mut self) {
628 self.drop_notifier.notify_one();
629 }
630}