rust_raknet/
server.rs

1use std::collections::HashMap;
2use std::{net::SocketAddr, sync::Arc};
3use tokio::net::UdpSocket;
4use tokio::sync::mpsc::channel;
5use tokio::sync::mpsc::{Receiver, Sender};
6use tokio::sync::{Mutex, Notify};
7
8use crate::error::{RaknetError, Result};
9use crate::packet::*;
10use crate::utils::*;
11use crate::{raknet_log_debug, raknet_log_error, socket::*};
12
/// Server name placed in the MOTD that `listen()` generates when no MOTD has been set.
const SERVER_NAME: &str = "Rust Raknet Server";
/// Value passed as `max_connection` when `listen()` generates the default MOTD.
const MAX_CONNECTION: u32 = 99999;

/// Per-session bookkeeping stored in the session table: the timestamp (from
/// `cur_timestamp_millis()`) recorded when the session was created or last had a
/// datagram forwarded to it, and the channel used to forward raw datagrams to it.
type SessionSender = (i64, Sender<Vec<u8>>);
17
/// Implementation of Raknet Server.
///
/// A listener owns one UDP socket. After `listen()` has been called, background
/// tasks answer unconnected pings, perform the open-connection handshake and
/// forward datagrams of established sessions; accepted sessions are handed out
/// through `accept()`.
pub struct RaknetListener {
    /// MOTD string sent in unconnected pong replies.
    motd: String,
    /// The bound UDP socket; set to `None` by `close()`.
    socket: Option<Arc<UdpSocket>>,
    /// Random GUID generated at construction and included in server replies and the MOTD.
    guid: u64,
    /// `true` once `listen()` has started the workers; reset by `close()`.
    listened: bool,
    /// Receiving end of the accept queue, drained by `accept()`.
    connection_receiver: Receiver<RaknetSocket>,
    /// Sending end of the accept queue, cloned into the listen worker.
    connection_sender: Sender<RaknetSocket>,
    /// Established sessions keyed by peer address.
    sessions: Arc<Mutex<HashMap<SocketAddr, SessionSender>>>,
    /// Zero-permit semaphore used as a close signal: closing it makes every
    /// pending `acquire()` return.
    close_notifier: Arc<tokio::sync::Semaphore>,
    /// Signalled by the session collector once it has finished shutting down.
    all_session_closed_notifier: Arc<Notify>,
    /// Signalled from `Drop` to wake the drop watcher task.
    drop_notifier: Arc<Notify>,
    /// Protocol version announced by each peer in its OpenConnectionRequest1,
    /// keyed by the peer address rendered as a string.
    version_map: Arc<Mutex<HashMap<String, u8>>>,
}
32
33impl RaknetListener {
34    /// Creates a new RaknetListener which will be bound to the specified address.
35    ///
36    /// # Example
37    /// ```ignore
38    /// let mut listener = RaknetListener::bind("127.0.0.1:19132".parse().unwrap()).await.unwrap();
39    /// listener.listen().await;
40    /// let mut socket = socket = listener.accept().await.unwrap();
41    /// ```
42    pub async fn bind(sockaddr: &SocketAddr) -> Result<Self> {
43        let s = match UdpSocket::bind(sockaddr).await {
44            Ok(p) => p,
45            Err(_) => {
46                return Err(RaknetError::BindAdressError);
47            }
48        };
49
50        let (connection_sender, connection_receiver) = channel::<RaknetSocket>(10);
51
52        let ret = Self {
53            motd: String::new(),
54            socket: Some(Arc::new(s)),
55            guid: rand::random(),
56            listened: false,
57            connection_receiver,
58            connection_sender,
59            sessions: Arc::new(Mutex::new(HashMap::new())),
60            close_notifier: Arc::new(tokio::sync::Semaphore::new(0)),
61            all_session_closed_notifier: Arc::new(Notify::new()),
62            drop_notifier: Arc::new(Notify::new()),
63            version_map: Arc::new(Mutex::new(HashMap::new())),
64        };
65
66        ret.drop_watcher().await;
67        Ok(ret)
68    }
69
70    /// Creates a new RaknetListener from a UdpSocket.
71    ///
72    /// # Example
73    /// ```ignore
74    /// let raw_socket = std::net::UdpSocket::bind("127.0.0.1:19132").unwrap();
75    /// let listener = RaknetListener::from_std(raw_socket);
76    /// ```
77    pub async fn from_std(s: std::net::UdpSocket) -> Result<Self> {
78        s.set_nonblocking(true)
79            .expect("set udpsocket nonblocking error");
80
81        let s = match UdpSocket::from_std(s) {
82            Ok(p) => p,
83            Err(_) => {
84                return Err(RaknetError::SetRaknetRawSocketError);
85            }
86        };
87
88        let (connection_sender, connection_receiver) = channel::<RaknetSocket>(10);
89
90        let ret = Self {
91            motd: String::new(),
92            socket: Some(Arc::new(s)),
93            guid: rand::random(),
94            listened: false,
95            connection_receiver,
96            connection_sender,
97            sessions: Arc::new(Mutex::new(HashMap::new())),
98            close_notifier: Arc::new(tokio::sync::Semaphore::new(0)),
99            all_session_closed_notifier: Arc::new(Notify::new()),
100            drop_notifier: Arc::new(Notify::new()),
101            version_map: Arc::new(Mutex::new(HashMap::new())),
102        };
103
104        ret.drop_watcher().await;
105        Ok(ret)
106    }
107
108    async fn start_session_collect(
109        &self,
110        socket: &Arc<UdpSocket>,
111        sessions: &Arc<Mutex<HashMap<SocketAddr, SessionSender>>>,
112        mut collect_receiver: Receiver<SocketAddr>,
113    ) {
114        let sessions = sessions.clone();
115        let socket = socket.clone();
116        let close_notifier = self.close_notifier.clone();
117        let all_session_closed_notifier = self.all_session_closed_notifier.clone();
118        tokio::spawn(async move {
119            loop {
120                let addr: SocketAddr;
121
122                tokio::select! {
123                    a = collect_receiver.recv() => {
124                        match a {
125                            Some(p) => { addr = p },
126                            None => {
127                                raknet_log_debug!("session collecter closed");
128                                break;
129                            },
130                        };
131                    },
132                    _ = close_notifier.acquire() => {
133                        raknet_log_debug!("session collecter close notified");
134                        break;
135                    }
136                }
137
138                let mut sessions = sessions.lock().await;
139                if sessions.contains_key(&addr) {
140                    match socket.send_to(&[PacketID::Disconnect.to_u8()], addr).await {
141                        Ok(_) => {}
142                        Err(e) => {
143                            raknet_log_error!("udp socket send_to error : {}", e);
144                        }
145                    };
146                    sessions.remove(&addr);
147                    raknet_log_debug!("collect socket : {}", addr);
148                }
149            }
150
151            let mut sessions = sessions.lock().await;
152
153            for i in sessions.iter() {
154                if i.1
155                     .1
156                    .send(vec![PacketID::Disconnect.to_u8()])
157                    .await
158                    .is_ok()
159                {}
160
161                match socket.send_to(&[PacketID::Disconnect.to_u8()], i.0).await {
162                    Ok(_) => {}
163                    Err(e) => {
164                        raknet_log_error!("udp socket send_to error : {}", e);
165                    }
166                };
167            }
168
169            while !sessions.is_empty() {
170                let addr = match collect_receiver.recv().await {
171                    Some(p) => p,
172                    None => {
173                        raknet_log_error!("clean session faild , maybe has session not close");
174                        break;
175                    }
176                };
177
178                if sessions.contains_key(&addr) {
179                    match socket.send_to(&[PacketID::Disconnect.to_u8()], addr).await {
180                        Ok(_) => {}
181                        Err(e) => {
182                            raknet_log_error!("udp socket send_to error : {}", e);
183                        }
184                    };
185                    sessions.remove(&addr);
186                    raknet_log_debug!("collect socket : {}", addr);
187                }
188            }
189
190            sessions.clear();
191            all_session_closed_notifier.notify_one();
192
193            raknet_log_debug!("session collect closed");
194        });
195    }
196
197    /// Listen to a RaknetListener
198    ///
199    /// This method must be called before calling RaknetListener::accept()
200    ///
201    /// # Example
202    /// ```ignore
203    /// let mut listener = RaknetListener::bind("127.0.0.1:19132".parse().unwrap()).await.unwrap();
204    /// listener.listen().await;
205    /// ```
206    pub async fn listen(&mut self) {
207        if self.close_notifier.is_closed() || self.listened {
208            return;
209        }
210
211        if self.motd.is_empty() {
212            self.set_motd(
213                SERVER_NAME,
214                MAX_CONNECTION,
215                "486",
216                "1.18.11",
217                "Survival",
218                self.socket.as_ref().unwrap().local_addr().unwrap().port(),
219            )
220            .await;
221        }
222
223        let socket = self.socket.as_ref().unwrap().clone();
224        let guid = self.guid;
225        let sessions = self.sessions.clone();
226        let connection_sender = self.connection_sender.clone();
227        let motd = self.get_motd().await;
228
229        self.listened = true;
230
231        let (collect_sender, collect_receiver) = channel::<SocketAddr>(10);
232        let collect_sender = Arc::new(Mutex::new(collect_sender));
233        self.start_session_collect(&socket, &sessions, collect_receiver)
234            .await;
235
236        let local_addr = socket.local_addr().unwrap();
237        let close_notify = self.close_notifier.clone();
238        let version_map = self.version_map.clone();
239        tokio::spawn(async move {
240            let mut buf = [0u8; 2048];
241
242            raknet_log_debug!("start listen worker : {}", local_addr);
243
244            loop {
245                let motd = motd.clone();
246                let size: usize;
247                let addr: SocketAddr;
248
249                tokio::select! {
250                    a = socket.recv_from(&mut buf) => {
251                        match a {
252                            Ok(p) => {
253                                size = p.0;
254                                addr = p.1;
255                            },
256                            Err(e) => {
257                                raknet_log_debug!("server recv_from error {}" , e);
258                                break;
259                            },
260                        };
261                    },
262                    _ = close_notify.acquire() => {
263                        raknet_log_debug!("listen close notified");
264                        break;
265                    }
266                }
267
268                let cur_status = match PacketID::from(buf[0]) {
269                    Ok(p) => p,
270                    Err(e) => {
271                        raknet_log_debug!("parse packetid faild : {:?}", e);
272                        continue;
273                    }
274                };
275
276                match cur_status {
277                    PacketID::UnconnectedPing1 => {
278                        let _ping = match read_packet_ping(&buf[..size]) {
279                            Ok(p) => p,
280                            Err(_) => continue,
281                        };
282
283                        let packet = crate::packet::PacketUnconnectedPong {
284                            time: cur_timestamp_millis(),
285                            guid,
286                            magic: true,
287                            motd,
288                        };
289
290                        let pong = match write_packet_pong(&packet) {
291                            Ok(p) => p,
292                            Err(_) => continue,
293                        };
294
295                        match socket.send_to(&pong, addr).await {
296                            Ok(_) => {}
297                            Err(e) => {
298                                raknet_log_error!("udp socket send_to error : {}", e);
299                            }
300                        };
301                        continue;
302                    }
303                    PacketID::UnconnectedPing2 => {
304                        match read_packet_ping(&buf[..size]) {
305                            Ok(p) => p,
306                            Err(_) => continue,
307                        };
308
309                        let packet = crate::packet::PacketUnconnectedPong {
310                            time: cur_timestamp_millis(),
311                            guid,
312                            magic: true,
313                            motd,
314                        };
315
316                        let pong = match write_packet_pong(&packet) {
317                            Ok(p) => p,
318                            Err(_) => continue,
319                        };
320
321                        match socket.send_to(&pong, addr).await {
322                            Ok(_) => {}
323                            Err(e) => {
324                                raknet_log_error!("udp socket send_to error : {}", e);
325                            }
326                        };
327                        continue;
328                    }
329                    PacketID::OpenConnectionRequest1 => {
330                        let req = match read_packet_connection_open_request_1(&buf[..size]) {
331                            Ok(p) => p,
332                            Err(_) => continue,
333                        };
334
335                        if !RAKNET_PROTOCOL_VERSION_LIST
336                            .as_slice()
337                            .contains(&req.protocol_version)
338                        {
339                            let packet = crate::packet::IncompatibleProtocolVersion {
340                                server_protocol: RAKNET_PROTOCOL_VERSION,
341                                magic: true,
342                                server_guid: guid,
343                            };
344                            let buf = write_packet_incompatible_protocol_version(&packet).unwrap();
345
346                            match socket.send_to(&buf, addr).await {
347                                Ok(_) => {}
348                                Err(e) => {
349                                    raknet_log_error!("udp socket send_to error : {}", e);
350                                }
351                            };
352                            continue;
353                        }
354                        {
355                            let mut version_map = version_map.lock().await;
356                            version_map.insert(addr.to_string(), req.protocol_version);
357                        }
358
359                        let packet = crate::packet::OpenConnectionReply1 {
360                            magic: true,
361                            guid,
362                            // Make sure this is false, it is vital for the login sequence to continue
363                            use_encryption: 0x00,
364                            // see Open Connection Request 1
365                            mtu_size: RAKNET_CLIENT_MTU,
366                        };
367
368                        let reply = match write_packet_connection_open_reply_1(&packet) {
369                            Ok(p) => p,
370                            Err(_) => continue,
371                        };
372
373                        match socket.send_to(&reply, addr).await {
374                            Ok(_) => {}
375                            Err(e) => {
376                                raknet_log_error!("udp socket send_to error : {}", e);
377                            }
378                        };
379                        continue;
380                    }
381                    PacketID::OpenConnectionRequest2 => {
382                        let req = match read_packet_connection_open_request_2(&buf[..size]) {
383                            Ok(p) => p,
384                            Err(_) => continue,
385                        };
386
387                        let packet = crate::packet::OpenConnectionReply2 {
388                            magic: true,
389                            guid,
390                            address: addr,
391                            mtu: req.mtu,
392                            encryption_enabled: 0x00,
393                        };
394
395                        let reply = match write_packet_connection_open_reply_2(&packet) {
396                            Ok(p) => p,
397                            Err(_) => continue,
398                        };
399
400                        let mut sessions = sessions.lock().await;
401
402                        if sessions.contains_key(&addr) {
403                            let packet = write_packet_already_connected(&AlreadyConnected {
404                                magic: true,
405                                guid,
406                            })
407                            .unwrap();
408
409                            match socket.send_to(&packet, addr).await {
410                                Ok(_) => {}
411                                Err(e) => {
412                                    raknet_log_error!("udp socket send_to error : {}", e);
413                                }
414                            };
415
416                            continue;
417                        }
418
419                        match socket.send_to(&reply, addr).await {
420                            Ok(_) => {}
421                            Err(e) => {
422                                raknet_log_error!("udp socket send_to error : {}", e);
423                            }
424                        };
425
426                        let (sender, receiver) = channel::<Vec<u8>>(10);
427
428                        let raknet_version: u8;
429                        {
430                            let version_map = version_map.lock().await;
431                            raknet_version = *version_map
432                                .get(&addr.to_string())
433                                .unwrap_or(&RAKNET_PROTOCOL_VERSION);
434                        }
435
436                        let s = RaknetSocket::from(
437                            &addr,
438                            &socket,
439                            receiver,
440                            req.mtu,
441                            collect_sender.clone(),
442                            raknet_version,
443                        )
444                        .await;
445
446                        raknet_log_debug!("accept connection : {}", addr);
447                        sessions.insert(addr, (cur_timestamp_millis(), sender));
448                        let _ = connection_sender.send(s).await;
449                    }
450                    PacketID::Disconnect => {
451                        let mut sessions = sessions.lock().await;
452                        if sessions.contains_key(&addr) {
453                            sessions[&addr].1.send(buf[..size].to_vec()).await.unwrap();
454                            sessions.remove(&addr);
455                        }
456                    }
457                    _ => {
458                        let mut sessions = sessions.lock().await;
459                        if sessions.contains_key(&addr) {
460                            match sessions[&addr].1.send(buf[..size].to_vec()).await {
461                                Ok(_) => {}
462                                Err(_) => {
463                                    sessions.remove(&addr);
464                                    continue;
465                                }
466                            };
467                            sessions.get_mut(&addr).unwrap().0 = cur_timestamp_millis();
468                        }
469                    }
470                }
471            }
472            raknet_log_debug!("listen worker closed");
473        });
474    }
475
476    /// Waiting for and receiving new Raknet connections, returning a Raknet socket
477    ///
478    /// Call this method must be after calling RaknetListener::listen()
479    ///
480    /// # Example
481    /// ```ignore
482    /// let mut listener = RaknetListener::bind("127.0.0.1:19132".parse().unwrap()).await.unwrap();
483    /// listener.listen().await;
484    /// let mut socket = listener.accept().await.unwrap();
485    /// ```
486    pub async fn accept(&mut self) -> Result<RaknetSocket> {
487        if !self.listened {
488            Err(RaknetError::NotListen)
489        } else {
490            tokio::select! {
491                a = self.connection_receiver.recv() => {
492                    match a {
493                        Some(p) => Ok(p),
494                        None => {
495                            Err(RaknetError::NotListen)
496                        },
497                    }
498                },
499                _ = self.close_notifier.acquire() => {
500                    raknet_log_debug!("accept close notified");
501                    Err(RaknetError::NotListen)
502                }
503            }
504        }
505    }
506
507    /// Set the current motd, this motd will be provided to the client in the unconnected pong.
508    ///
509    /// Call this method must be after calling RaknetListener::listen()
510    ///
511    /// # Example
512    /// ```ignore
513    /// let mut listener = RaknetListener::bind("127.0.0.1:19132".parse().unwrap()).await.unwrap();
514    /// listener.set_motd("Another Minecraft Server" , 999999 , "486" , "1.18.11", "Survival" , 19132).await;
515    /// ```
516    pub async fn set_motd(
517        &mut self,
518        server_name: &str,
519        max_connection: u32,
520        mc_protocol_version: &str,
521        mc_version: &str,
522        game_type: &str,
523        port: u16,
524    ) {
525        self.motd = format!(
526            "MCPE;{};{};{};0;{};{};Bedrock level;{};1;{};",
527            server_name,
528            mc_protocol_version,
529            mc_version,
530            max_connection,
531            self.guid,
532            game_type,
533            port
534        );
535    }
536
537    /// Get the current motd, this motd will be provided to the client in the unconnected pong.
538    ///
539    /// # Example
540    /// ```ignore
541    /// let listener = RaknetListener::bind("127.0.0.1:19132".parse().unwrap()).await.unwrap();
542    /// let motd = listener.get_motd().await;
543    /// ```
544    pub async fn get_motd(&self) -> String {
545        self.motd.clone()
546    }
547
548    /// Returns the socket address of the local half of this Raknet connection.
549    ///
550    /// # Example
551    /// ```ignore
552    /// let mut socket = RaknetListener::bind("127.0.0.1:19132".parse().unwrap()).await.unwrap();
553    /// assert_eq!(socket.local_addr().unwrap().ip(), IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)));
554    /// ```
555    pub fn local_addr(&self) -> Result<SocketAddr> {
556        Ok(self.socket.as_ref().unwrap().local_addr().unwrap())
557    }
558
559    /// Close Raknet Server and all connections.
560    ///
561    /// # Example
562    /// ```ignore
563    /// let mut socket = RaknetListener::bind("127.0.0.1:19132".parse().unwrap()).await.unwrap();
564    /// socket.close().await;
565    /// ```
566    pub async fn close(&mut self) -> Result<()> {
567        if self.close_notifier.is_closed() {
568            return Ok(());
569        }
570        self.close_notifier.close();
571        self.all_session_closed_notifier.notified().await;
572
573        // wait all thread exit and drop socket pointer.
574        while Arc::strong_count(self.socket.as_ref().unwrap()) != 1 {
575            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
576        }
577
578        // drop socket and free bind port
579        self.socket = None;
580        self.listened = false;
581
582        Ok(())
583    }
584
585    /// Set full motd string.
586    ///
587    /// # Example
588    /// ```ignore
589    /// let mut socket = RaknetListener::bind("127.0.0.1:19132".parse().unwrap()).await.unwrap();
590    /// socket.set_full_motd("motd").await;
591    /// ```
592    pub fn set_full_motd(&mut self, motd: String) -> Result<()> {
593        self.motd = motd;
594        Ok(())
595    }
596
597    pub async fn get_peer_raknet_version(&self, peer: &SocketAddr) -> Result<u8> {
598        let version_map = self.version_map.lock().await;
599        let ver = version_map.get(&peer.to_string());
600        Ok(*ver.unwrap_or(&RAKNET_PROTOCOL_VERSION))
601    }
602
603    async fn drop_watcher(&self) {
604        let close_notifier = self.close_notifier.clone();
605        let drop_notifier = self.drop_notifier.clone();
606        tokio::spawn(async move {
607            raknet_log_debug!("listener drop watcher start");
608            drop_notifier.notify_one();
609
610            drop_notifier.notified().await;
611
612            if close_notifier.is_closed() {
613                raknet_log_debug!("close notifier closed");
614                return;
615            }
616
617            close_notifier.close();
618
619            raknet_log_debug!("listener drop watcher closed");
620        });
621
622        self.drop_notifier.notified().await;
623    }
624}
625
626impl Drop for RaknetListener {
627    fn drop(&mut self) {
628        self.drop_notifier.notify_one();
629    }
630}