rak_rs/server/mod.rs
1//! This is the server implementation of RakNet, allowing you to create a RakNet server.
2//!
3//! This module provides a [`Listener`] struct, which is responsible for listening to connections,
4//! and dispatching them to a handler, as well as some other utilities.
5//!
6//! [`Listener`]: struct.Listener.html
7#[allow(unused)]
8/// Server events module. Handles things like updating the MOTD
9/// for certain connections. This is a notifier channel.
10pub mod event;
11
12use std::collections::HashMap;
13use std::net::ToSocketAddrs;
14use std::{net::SocketAddr, sync::Arc};
15
16#[cfg(feature = "async_std")]
17use async_std::{
18 channel::{bounded, Receiver, Sender},
19 net::UdpSocket,
20 sync::Mutex,
21 task::{self},
22};
23#[cfg(feature = "async_std")]
24use futures::{select, FutureExt};
25
26use binary_util::interfaces::{Reader, Writer};
27use binary_util::ByteReader;
28
29#[cfg(feature = "async_tokio")]
30use tokio::{
31 net::UdpSocket,
32 select,
33 sync::mpsc::channel as bounded,
34 sync::mpsc::{Receiver, Sender},
35 sync::Mutex,
36 task::{self},
37};
38
39use crate::connection::{ConnMeta, Connection};
40use crate::error::server::ServerError;
41use crate::notify::Notify;
42use crate::protocol::mcpe::motd::Motd;
43use crate::protocol::packet::offline::{
44 IncompatibleProtocolVersion, OfflinePacket, OpenConnectReply, SessionInfoReply, UnconnectedPong,
45};
46use crate::protocol::packet::RakPacket;
47use crate::protocol::Magic;
48use crate::rakrs_debug;
49use crate::util::to_address_token;
50
51pub(crate) type Session = (ConnMeta, Sender<Vec<u8>>);
52
/// A value that may or may not be convertible into a [`SocketAddr`].
///
/// This is a helper enum that lets you hand either a `SocketAddr` or a string to
/// `Listener::bind` without having to parse the address yourself.
///
/// Through the `From` implementations below, all of the following are accepted:
/// - `"127.0.0.1:19132".parse::<SocketAddr>().unwrap()`
/// - `"127.0.0.1:19132"`
/// - `String::from("127.0.0.1:19132")`
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PossiblySocketAddr<'a> {
    /// An address that is already parsed.
    SocketAddr(SocketAddr),
    /// A borrowed textual address (or `host:port` pair).
    Str(&'a str),
    /// An owned textual address (or `host:port` pair).
    String(String),
    /// A value that is known not to be an address.
    ActuallyNot,
}

impl PossiblySocketAddr<'_> {
    /// Converts this value into a [`SocketAddr`], returning `None` when that is not possible.
    ///
    /// Textual variants are first parsed as a literal socket address; if that fails they are
    /// resolved through [`ToSocketAddrs`] and the first resolved address is used. Resolving a
    /// `host:port` pair may perform a blocking name lookup.
    ///
    /// This never panics: an unparsable `Str` yields `None`, exactly like an unparsable
    /// `String` does.
    pub fn to_socket_addr(self) -> Option<SocketAddr> {
        match self {
            PossiblySocketAddr::SocketAddr(addr) => Some(addr),
            PossiblySocketAddr::Str(addr) => Self::resolve(addr),
            PossiblySocketAddr::String(addr) => Self::resolve(&addr),
            PossiblySocketAddr::ActuallyNot => None,
        }
    }

    /// Parses `addr` as a socket address literal, falling back to address resolution.
    fn resolve(addr: &str) -> Option<SocketAddr> {
        addr.parse::<SocketAddr>().ok().or_else(|| {
            addr.to_socket_addrs()
                .ok()
                .and_then(|mut resolved| resolved.next())
        })
    }
}

impl From<&str> for PossiblySocketAddr<'_> {
    /// Stores an owned copy of `s`.
    ///
    /// The copy keeps the result independent of the lifetime of `s`, which is what allows
    /// `&str` to satisfy bounds of the form `for<'a> Into<PossiblySocketAddr<'a>>`.
    fn from(s: &str) -> Self {
        PossiblySocketAddr::String(s.to_string())
    }
}

impl From<String> for PossiblySocketAddr<'_> {
    fn from(s: String) -> Self {
        PossiblySocketAddr::String(s)
    }
}

impl From<SocketAddr> for PossiblySocketAddr<'_> {
    fn from(s: SocketAddr) -> Self {
        PossiblySocketAddr::SocketAddr(s)
    }
}

impl std::fmt::Display for PossiblySocketAddr<'_> {
    /// Writes the address as it was supplied, or a fixed notice for [`Self::ActuallyNot`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            PossiblySocketAddr::SocketAddr(addr) => write!(f, "{}", addr),
            PossiblySocketAddr::Str(addr) => write!(f, "{}", addr),
            PossiblySocketAddr::String(addr) => write!(f, "{}", addr),
            PossiblySocketAddr::ActuallyNot => write!(f, "Not a valid address!"),
        }
    }
}
124
125/// The main server struct, this is responsible for listening to connections, and dispatching them to a handler.
126/// > If you are having problems with debugging, you can use the rak-rs debug feature, which will print out
127/// > all packets that are sent and recieved.
128///
129/// <style>
130/// .warning-2 {
131/// background: rgba(255,240,76,0.34) !important;
132/// padding: 0.75em;
133/// border-left: 2px solid #fce811;
134/// font-family: "Source Serif 4", NanumBarunGothic, serif;
135/// }
136///
137/// .warning-2 code {
138/// background: rgba(211,201,88,0.64) !important;
139/// }
140///
141/// .notice-2 {
142/// background: rgba(88, 211, 255, 0.34) !important;
143/// padding: 0.75em;
144/// border-left: 2px solid #4c96ff;
145/// font-family: "Source Serif 4", NanumBarunGothic, serif;
146/// }
147///
148/// .notice-2 code {
149/// background: rgba(88, 211, 255, 0.64) !important;
150/// }
151/// </style>
152///
153/// <div class="notice-2">
154/// <strong>Notice:</strong>
155/// <p>
156/// Currently, the <code>Listener</code> does not support encryption, plugins, or any feature to allow you to
157/// hijack the RakNet connection sequence. Currently rak_rs is a pure, bare-bones RakNet implementation. <br /><br />
158/// There is currently an <a href="https://github.com/NetrexMC/RakNet/issues/48">open issue</a>
159/// to add support for plugins but this is not a priority, instead you should use the <code>Connection</code> struct
160/// to handle your own packets with the <code>recv</code> method.
161/// </p>
162/// </div>
163///
164/// ## A generic example
165/// ```rust ignore
166/// use rak_rs::server::Listener;
167///
168/// #[async_std::main]
169/// async fn main() {
170/// // Bind the server to the specified address, but do not start it.
171/// let mut server = Listener::bind("0.0.0.0:19132").await.unwrap();
172///
173/// // Begins listening to connections
174/// server.start().await.unwrap();
175///
176/// // Start recieving connections
177/// loop {
178/// let conn = server.accept().await;
179/// async_std::task::spawn(handle(conn.unwrap()));
180/// }
181/// }
182///
183/// // This is a function that handles the connection, this is where you would handle the connection.
184/// async fn handle(mut conn: Connection) {
185/// loop {
186/// // this is used to cleanup the connection
187/// if conn.get_state().await.is_closed() {
188/// println!("Connection closed!");
189/// break;
190/// }
191///
192/// if let Ok(pk) = conn.recv().await {
193/// println!("Got a connection packet {:?} ", pk);
194/// }
195/// }
196/// }
197/// ```
198///
199/// ## Accepting other protocols
200/// This struct allows support for other raknet protocols, however this is not recommended, because occasionally
201/// the protocol may change and the Listener may not be updated to support it. This was mainly added for MCPE.
202///
203/// ```rust ignore
204/// use rak_rs::server::Listener;
205///
206/// #[async_std::main]
207/// async fn main() {
208/// let mut server = Listener::bind("0.0.0.0:19132").await.unwrap();
209/// server.versions = &[10, 11];
210/// server.start().await.unwrap();
211///
212/// loop {
213/// // .. same loop as above
214/// }
215/// }
216/// ```
217pub struct Listener {
218 /// If mcpe is true, this is the default MOTD, this is
219 /// the default MOTD to send to the client. You can change this later by setting
220 /// a motd in the `Conn` struct.
221 pub motd: Motd,
222 /// A server Id, passed in unconnected pong.
223 pub id: u64,
224 /// Supported versions
225 pub versions: &'static [u8],
226 /// Whether or not the server is being served.
227 serving: bool,
228 /// The current socket.
229 sock: Option<Arc<UdpSocket>>,
230 /// A Hashmap off all current connections along with a sending channel
231 /// and some meta data like the time of connection, and the requested MTU_Size
232 connections: Arc<Mutex<HashMap<SocketAddr, Session>>>,
233 /// The recieve communication channel, This is used to dispatch connections between a handle
234 /// It allows you to use the syntax sugar for `Listener::accept()`.
235 recv_comm: Receiver<Connection>,
236 send_comm: Sender<Connection>,
237 // TODO, fix this!
238 // send_evnt: Sender<(ServerEvent, oneshot::Sender<ServerEventResponse>)>,
239 // pub recv_evnt: Arc<Mutex<mpsc::Receiver<(ServerEvent, oneshot::Sender<ServerEventResponse>)>>>,
240 // TODO
241 /// A Notifier (sephamore) that will wait until all notified listeners
242 /// are completed, and finish closing.
243 closed: Arc<Notify>,
244 // This is a notifier that acknowledges all connections have been removed from the server successfully.
245 // This is important to prevent memory leaks if the process is continously running.
246 // cleanup: Arc<Condvar>,
247}
248
249impl Listener {
250 /// Binds a new listener to the specified address provided, this will error if the address is invalid or already in use.
251 /// This will not start the listener, you must call [`Listener::start`] to start listening to connections.
252 ///
253 /// ## Example
254 /// ```ignore
255 /// use rak_rs::server::Listener;
256 ///
257 /// async fn start() {
258 /// let mut server = Listener::bind("").await.unwrap();
259 /// }
260 /// ```
261 ///
262 /// [`PossiblySocketAddr`]: enum.PossiblySocketAddr.html
263 /// [`Listener::start`]: struct.Listener.html#method.start
264 pub async fn bind<I: for<'a> Into<PossiblySocketAddr<'a>>>(
265 address: I,
266 ) -> Result<Self, ServerError> {
267 let a: PossiblySocketAddr = address.into();
268 let address_r: Option<SocketAddr> = a.to_socket_addr();
269 if address_r.is_none() {
270 rakrs_debug!("Invalid binding value");
271 return Err(ServerError::AddrBindErr);
272 }
273
274 let address = address_r.unwrap();
275
276 let sock = match UdpSocket::bind(address).await {
277 Ok(s) => s,
278 Err(_) => return Err(ServerError::AddrBindErr),
279 };
280
281 rakrs_debug!(true, "listener: Bound to {}", address);
282
283 let server_id: u64 = rand::random();
284 let motd = Motd::new(server_id, format!("{}", address.port()));
285
286 // This channel is a Communication channel for when `Connection` structs are initialized.
287 let (send_comm, recv_comm) = bounded::<Connection>(10);
288 // This channel is responsible for handling and dispatching events between clients.
289 // Oneshot will garauntee this event is intended for the client whom requested the event.
290 // TODO: Fix with new event system
291 // let (send_evnt, recv_evnt) =
292 // mpsc::channel::<(ServerEvent, oneshot::Sender<ServerEventResponse>)>(10);
293
294 let listener = Self {
295 sock: Some(Arc::new(sock)),
296 id: server_id,
297 versions: &[10, 11],
298 motd,
299 send_comm,
300 recv_comm,
301 // send_evnt,
302 // recv_evnt: Arc::new(Mutex::new(recv_evnt)),
303 serving: false,
304 connections: Arc::new(Mutex::new(HashMap::new())),
305 // closer: Arc::new(Semaphore::new(0)),
306 closed: Arc::new(Notify::new()),
307 // cleanup: Arc::new(Notify::new()),
308 // cleanup: Arc::new(Condvar::new()),
309 };
310
311 return Ok(listener);
312 }
313
314 /// This method is required to be called before the server can begin listening to connections.
315 /// However, you must call [`Listener::bind`] before you can call this method, as that method
316 /// is responsible for creating the socket and initializing the server.
317 ///
318 /// ## Example
319 /// ```ignore
320 /// use rak_rs::server::Listener;
321 /// async fn start() {
322 /// let mut server = Listener::bind("0.0.0.0:19132").await.unwrap();
323 ///
324 /// // let's begin to listen to connections
325 /// server.start().await;
326 /// }
327 /// ```
328 ///
329 /// [`Listener::bind`]: struct.Listener.html#method.bind
330 pub async fn start(&mut self) -> Result<(), ServerError> {
331 if self.serving {
332 return Err(ServerError::AlreadyOnline);
333 }
334
335 let socket = self.sock.as_ref().unwrap().clone();
336 let send_comm = self.send_comm.clone();
337 // let send_evt = self.send_evnt.clone();
338 let server_id = self.id.clone();
339 #[cfg(feature = "mcpe")]
340 let default_motd = self.motd.clone();
341 let connections = self.connections.clone();
342 let closer = self.closed.clone();
343 let connections2 = self.connections.clone();
344 let closer2 = self.closed.clone();
345 let versions = self.versions;
346
347 self.serving = true;
348
349 #[cfg(feature = "async_std")]
350 let (cs, client_close_recv) = bounded::<SocketAddr>(10);
351 #[cfg(feature = "async_tokio")]
352 let (cs, mut client_close_recv) = bounded::<SocketAddr>(10);
353 let client_close_send = Arc::new(cs);
354
355 task::spawn(async move {
356 // We allocate here to prevent constant allocation of this array
357 let mut buf: [u8; 2048] = [0; 2048];
358 #[cfg(feature = "mcpe")]
359 let motd_default = default_motd.clone();
360 loop {
361 let length: usize;
362 let origin: SocketAddr;
363
364 macro_rules! recv_body {
365 ($recv: ident) => {
366 match $recv {
367 Ok((l, o)) => {
368 length = l;
369 origin = o;
370 }
371 Err(e) => {
372 match e.kind() {
373 std::io::ErrorKind::ConnectionReset => {
374 continue;
375 },
376 _ => {
377 rakrs_debug!(true, "[SERVER-SOCKET] Failed to recieve packet! {}", e);
378 continue;
379 }
380 }
381 }
382 }
383
384 // Do a quick check to see if this a valid raknet packet, otherwise we're going to handle it normally
385 if let Ok(pk) = OfflinePacket::read(&mut ByteReader::from(&buf[..length])) {
386 // Offline packets are not buffered to the user.
387 // The reason for this is because we don't wish for the user to be able to disrupt
388 // raknet protocol, and handshaking.
389 match pk {
390 OfflinePacket::UnconnectedPing(_) => {
391 // let (resp_tx, resp_rx) =
392 // oneshot::channel::<ServerEventResponse>();
393 #[cfg(feature = "mcpe")]
394 let motd: Motd = motd_default.clone();
395
396 // if let Err(e) = send_evt.try_send((
397 // ServerEvent::RefreshMotdRequest(origin, motd.clone()),
398 // // resp_tx,
399 // ))
400 // {
401 // match e {
402 // TrySendError::Full(_) => {
403 // rakrs_debug!(true, "[{}] Event dispatcher is full! Dropping request.", to_address_token(origin));
404 // }
405 // TrySendError::Closed(_) => {
406 // rakrs_debug!(true, "[{}] Event dispatcher is closed! Dropping request.", to_address_token(origin));
407 // }
408 // }
409 // }
410
411 // if let Ok(res) = resp_rx.await {
412 // // get the motd from the server event otherwise use defaults.
413 // // if let Ok(res) = res {
414 // match res {
415 // ServerEventResponse::RefreshMotd(m) => {
416 // motd = m;
417 // }
418 // _ => {
419 // rakrs_debug!(true, "[{}] Response to ServerEvent::RefreshMotdRequest is invalid!", to_address_token(origin));
420 // }
421 // }
422 // // };
423 // }
424
425 // unconnected pong signature is different if MCPE is specified.
426 let resp = UnconnectedPong {
427 timestamp: current_epoch(),
428 server_id,
429 magic: Magic::new(),
430 #[cfg(feature = "mcpe")]
431 motd,
432 };
433
434 send_packet_to_socket(&socket, resp.into(), origin).await;
435 continue;
436 }
437 OfflinePacket::OpenConnectRequest(mut pk) => {
438 // TODO make a constant for this
439 if !versions.contains(&pk.protocol) {
440 let resp = IncompatibleProtocolVersion {
441 protocol: pk.protocol,
442 magic: Magic::new(),
443 server_id,
444 };
445
446 rakrs_debug!("[{}] Sent ({}) which is invalid RakNet protocol. Version is incompatible with server.", pk.protocol, to_address_token(*&origin));
447
448 send_packet_to_socket(&socket, resp.into(), origin).await;
449 continue;
450 }
451
452 rakrs_debug!(
453 true,
454 "[{}] Client requested Mtu Size: {}",
455 to_address_token(*&origin),
456 pk.mtu_size
457 );
458
459 if pk.mtu_size > 2048 {
460 rakrs_debug!(
461 true,
462 "[{}] Client requested Mtu Size: {} which is larger than the maximum allowed size of 2048",
463 to_address_token(*&origin),
464 pk.mtu_size
465 );
466 pk.mtu_size = 2048;
467 }
468
469 let resp = OpenConnectReply {
470 server_id,
471 // TODO allow encryption
472 security: false,
473 magic: Magic::new(),
474 // TODO make this configurable, this is sent to the client to change
475 // it's mtu size, right now we're using what the client prefers.
476 // however in some cases this may not be the preferred use case, for instance
477 // on servers with larger worlds, you may want a larger mtu size, or if
478 // your limited on network bandwith
479 mtu_size: pk.mtu_size,
480 };
481 send_packet_to_socket(&socket, resp.into(), origin).await;
482 continue;
483 }
484 OfflinePacket::SessionInfoRequest(pk) => {
485 let resp = SessionInfoReply {
486 server_id,
487 client_address: origin,
488 magic: Magic::new(),
489 mtu_size: pk.mtu_size,
490 security: false,
491 };
492
493 // This is a valid packet, let's check if a session exists, if not, we should create it.
494 // Event if the connection is only in offline mode.
495 let mut sessions = connections.lock().await;
496
497 if !sessions.contains_key(&origin) {
498 rakrs_debug!(true, "Creating new session for {}", origin);
499 let meta = ConnMeta::new(0);
500 let (net_send, net_recv) = bounded::<Vec<u8>>(10);
501 let connection =
502 Connection::new(origin, &socket, net_recv, client_close_send.clone(), pk.mtu_size).await;
503 rakrs_debug!(true, "Created Session for {}", origin);
504
505 // Add the connection to the available connections list.
506 // we're using the name "sessions" here to differeniate
507 // for some reason the reciever likes to be dropped, so we're saving it here.
508 sessions.insert(origin, (meta, net_send));
509
510 // notify the connection communicator
511 if let Err(err) = send_comm.send(connection).await {
512 let connection = err.0;
513 // there was an error, and we should terminate this connection immediately.
514 rakrs_debug!("[{}] Error while communicating with internal connection channel! Connection withdrawn.", to_address_token(connection.address));
515 sessions.remove(&origin);
516 continue;
517 }
518 }
519
520 // update the sessions mtuSize, this is referred to internally, we also will send this event to the client
521 // event channel. However we are not expecting a response.
522
523 sessions.get_mut(&origin).unwrap().0.mtu_size = pk.mtu_size;
524 rakrs_debug!(
525 true,
526 "[{}] Updated mtu size to {}",
527 to_address_token(origin),
528 pk.mtu_size
529 );
530
531 // let (resp_tx, resp_rx) = oneshot::channel::<ServerEventResponse>();
532
533 // if let Err(_) = timeout(Duration::from_millis(5), resp_rx).await {
534 // rakrs_debug!(
535 // "[{}] Failed to update mtu size with the client!",
536 // to_address_token(origin)
537 // );
538 // }
539
540 // if let Err(_) = send_evt.send((ServerEvent::SetMtuSize(pk.mtu_size), resp_tx))
541 // .await
542 // {
543 // rakrs_debug!(
544 // "[{}] Failed to update mtu size with the client!",
545 // to_address_token(origin)
546 // );
547 // }
548
549 send_packet_to_socket(&socket, resp.into(), origin).await;
550 continue;
551 }
552 _ => {
553 rakrs_debug!(
554 "[{}] Received invalid packet!",
555 to_address_token(*&origin)
556 );
557 }
558 }
559 }
560
561 // Packet may be valid, but we'll let the connection decide this
562 let mut sessions = connections.lock().await;
563 if sessions.contains_key(&origin) {
564 if let Err(_) = sessions[&origin].1.send(buf[..length].to_vec()).await {
565 rakrs_debug!(true, "[{}] Failed when handling recieved packet! Could not pass over to internal connection, the channel might be closed! (Removed the connection)", to_address_token(*&origin));
566 sessions.remove(&origin);
567 }
568 }
569 drop(sessions);
570 };
571 }
572
573 #[cfg(feature = "async_std")]
574 select! {
575 _ = closer.wait().fuse() => {
576 rakrs_debug!(true, "[SERVER] [NETWORK] Server has recieved the shutdown notification!");
577 break;
578 }
579 recv = socket.recv_from(&mut buf).fuse() => {
580 recv_body!(recv);
581 }
582 }
583
584 #[cfg(feature = "async_tokio")]
585 select! {
586 _ = closer.wait() => {
587 rakrs_debug!(true, "[SERVER] [NETWORK] Server has recieved the shutdown notification!");
588 break;
589 }
590 recv = socket.recv_from(&mut buf) => {
591 recv_body!(recv);
592 }
593 }
594 }
595 });
596
597 task::spawn(async move {
598 // here we loop and recv from the client_close_recv channel
599 // and remove the connection from the hashmap
600 loop {
601 #[cfg(feature = "async_std")]
602 select! {
603 _ = closer2.wait().fuse() => {
604 rakrs_debug!(true, "[SERVER] [Cleanup] Server has recieved the shutdown notification!");
605 break;
606 }
607 addr = client_close_recv.recv().fuse() => {
608 if let Ok(addr) = addr {
609 rakrs_debug!(true, "[SERVER] [Cleanup] Removing connection for {}", to_address_token(addr));
610 let mut c = connections2.lock().await;
611 c.remove(&addr);
612 drop(c);
613 }
614 }
615 }
616
617 #[cfg(feature = "async_tokio")]
618 select! {
619 _ = closer2.wait() => {
620 rakrs_debug!(true, "[SERVER] [Cleanup] Server has recieved the shutdown notification!");
621 break;
622 }
623 addr = client_close_recv.recv() => {
624 if let Some(addr) = addr {
625 rakrs_debug!(true, "[SERVER] [Cleanup] Removing connection for {}", to_address_token(addr));
626 let mut c = connections2.lock().await;
627 c.remove(&addr);
628 drop(c);
629 }
630 }
631 }
632 }
633 });
634
635 return Ok(());
636 }
637
638 // pub async fn recv_event(&self) -> Result<(ServerEvent, oneshot::Sender<ServerEventResponse>), ServerError> {
639 // if !self.serving {
640 // Err(ServerError::NotListening)
641 // } else {
642 // let mut recvr = self.recv_evnt.lock().await;
643 // tokio::select! {
644 // receiver = recvr.recv() => {
645 // match receiver {
646 // Some(c) => Ok(c),
647 // None => Err(ServerError::Killed)
648 // }
649 // },
650 // _ = self.closer.acquire() => {
651 // Err(ServerError::Killed)
652 // }
653 // }
654 // }
655 // }
656
657 /// This method is used to accept a connection, this will block until a connection is available.
658 /// You can only call this method once both [`Listener::bind`] AND [`Listener::start`] have. This function
659 /// is used to recieve and accept connections. Alternatively, you can refuse a connection
660 /// by dropping it when you accept it.
661 ///
662 /// [`Listener::bind`]: struct.Listener.html#method.bind
663 /// [`Listener::start`]: struct.Listener.html#method.start
664 ///
665 /// <div class="warning-2">
666 /// <strong>Warning:</strong>
667 /// <p>
668 /// This method will block until a connection is available, this is not recommended to be used
669 /// in the main thread, instead you should use a task or future to handle connections.
670 /// </p>
671 /// </div>
672 ///
673 /// ## Example
674 /// ```rust ignore
675 /// use rak_rs::server::Listener;
676 /// use rak_rs::Connection;
677 ///
678 /// #[async_std::main]
679 /// async fn main() {
680 /// let mut server = Listener::bind("0.0.0.0:19132").await.unwrap();
681 /// server.start().await.unwrap();
682 ///
683 /// loop {
684 /// let conn = server.accept().await;
685 /// async_std::task::spawn(handle(conn.unwrap()));
686 /// }
687 /// }
688 ///
689 /// async fn handle(mut conn: Connection) {
690 /// loop {
691 /// let packet = conn.recv().await;
692 /// println!("Received a packet! {:?}", packet);
693 /// }
694 /// }
695 /// ```
696 pub async fn accept(&mut self) -> Result<Connection, ServerError> {
697 if !self.serving {
698 Err(ServerError::NotListening)
699 } else {
700 let receiver = self.recv_comm.recv().await;
701 return match receiver {
702 #[cfg(feature = "async_std")]
703 Ok(c) => Ok(c),
704 #[cfg(feature = "async_std")]
705 Err(_) => Err(ServerError::Killed),
706 #[cfg(feature = "async_tokio")]
707 Some(c) => Ok(c),
708 #[cfg(feature = "async_tokio")]
709 None => Err(ServerError::Killed),
710 };
711 }
712 }
713
714 /// Stops the Listener, effectively closing the socket and stopping the server.
715 /// This will also close all connections, and prevent any new connections from being accepted,
716 /// until [`Listener::start`] is called again.
717 ///
718 /// [`Listener::start`]: struct.Listener.html#method.start
719 pub async fn stop(&mut self) -> Result<(), ServerError> {
720 self.closed.notify().await;
721 // self.cleanup.notified().await;
722
723 self.sock = None;
724 self.serving = false;
725
726 Ok(())
727 }
728}
729
730impl Drop for Listener {
731 fn drop(&mut self) {
732 if self.serving {
733 futures_executor::block_on(self.stop()).unwrap();
734 }
735 }
736}
737
738async fn send_packet_to_socket(socket: &Arc<UdpSocket>, packet: RakPacket, origin: SocketAddr) {
739 if let Err(e) = socket
740 .send_to(&mut packet.write_to_bytes().unwrap().as_slice(), origin)
741 .await
742 {
743 rakrs_debug!(
744 "[{}] Failed sending payload to socket! {}",
745 to_address_token(origin),
746 e
747 );
748 }
749}
750
/// Returns the number of whole seconds elapsed since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned instead of panicking.
pub(crate) fn current_epoch() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0)
}