rak_rs/server/
mod.rs

1//! This is the server implementation of RakNet, allowing you to create a RakNet server.
2//!
3//! This module provides a [`Listener`] struct, which is responsible for listening to connections,
4//! and dispatching them to a handler, as well as some other utilities.
5//!
6//! [`Listener`]: struct.Listener.html
7#[allow(unused)]
8/// Server events module. Handles things like updating the MOTD
9/// for certain connections. This is a notifier channel.
10pub mod event;
11
12use std::collections::HashMap;
13use std::net::ToSocketAddrs;
14use std::{net::SocketAddr, sync::Arc};
15
16#[cfg(feature = "async_std")]
17use async_std::{
18    channel::{bounded, Receiver, Sender},
19    net::UdpSocket,
20    sync::Mutex,
21    task::{self},
22};
23#[cfg(feature = "async_std")]
24use futures::{select, FutureExt};
25
26use binary_util::interfaces::{Reader, Writer};
27use binary_util::ByteReader;
28
29#[cfg(feature = "async_tokio")]
30use tokio::{
31    net::UdpSocket,
32    select,
33    sync::mpsc::channel as bounded,
34    sync::mpsc::{Receiver, Sender},
35    sync::Mutex,
36    task::{self},
37};
38
39use crate::connection::{ConnMeta, Connection};
40use crate::error::server::ServerError;
41use crate::notify::Notify;
42use crate::protocol::mcpe::motd::Motd;
43use crate::protocol::packet::offline::{
44    IncompatibleProtocolVersion, OfflinePacket, OpenConnectReply, SessionInfoReply, UnconnectedPong,
45};
46use crate::protocol::packet::RakPacket;
47use crate::protocol::Magic;
48use crate::rakrs_debug;
49use crate::util::to_address_token;
50
51pub(crate) type Session = (ConnMeta, Sender<Vec<u8>>);
52
53/// This is a helper enum that allows you to pass in a `SocketAddr` or a `&str` to the `Listener::bind` function.
54/// This is useful for when you want to bind to a specific address, but you don't want to parse it yourself.
55///
56/// This Trait will successfully parse the following:
57/// - `SocketAddr::new("127.0.0.1:19132")`
58/// - `"127.0.0.1:19132"`
59/// - `String::from("127.0.0.1:19132")`
60pub enum PossiblySocketAddr<'a> {
61    SocketAddr(SocketAddr),
62    Str(&'a str),
63    String(String),
64    ActuallyNot,
65}
66
67impl PossiblySocketAddr<'_> {
68    pub fn to_socket_addr(self) -> Option<SocketAddr> {
69        match self {
70            PossiblySocketAddr::SocketAddr(addr) => Some(addr),
71            PossiblySocketAddr::Str(addr) => {
72                // we need to parse it
73                Some(addr.parse::<SocketAddr>().unwrap())
74            }
75            PossiblySocketAddr::String(addr) => {
76                if let Ok(addr) = addr.parse::<SocketAddr>() {
77                    Some(addr.clone())
78                } else {
79                    // try to parse it as a socket addr then a string
80                    if let Ok(mut addr) = addr.to_socket_addrs() {
81                        if let Some(v) = addr.next() {
82                            Some(v)
83                        } else {
84                            None
85                        }
86                    } else {
87                        None
88                    }
89                }
90            }
91            _ => None,
92        }
93    }
94}
95
96impl From<&str> for PossiblySocketAddr<'_> {
97    fn from(s: &str) -> Self {
98        PossiblySocketAddr::String(s.to_string())
99    }
100}
101
102impl From<String> for PossiblySocketAddr<'_> {
103    fn from(s: String) -> Self {
104        PossiblySocketAddr::String(s)
105    }
106}
107
108impl From<SocketAddr> for PossiblySocketAddr<'_> {
109    fn from(s: SocketAddr) -> Self {
110        PossiblySocketAddr::SocketAddr(s)
111    }
112}
113
114impl std::fmt::Display for PossiblySocketAddr<'_> {
115    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116        match self {
117            PossiblySocketAddr::SocketAddr(addr) => write!(f, "{}", addr),
118            PossiblySocketAddr::Str(addr) => write!(f, "{}", addr),
119            PossiblySocketAddr::String(addr) => write!(f, "{}", addr),
120            PossiblySocketAddr::ActuallyNot => write!(f, "Not a valid address!"),
121        }
122    }
123}
124
125/// The main server struct, this is responsible for listening to connections, and dispatching them to a handler.
126/// > If you are having problems with debugging, you can use the rak-rs debug feature, which will print out
127/// > all packets that are sent and recieved.
128///
129/// <style>
130/// .warning-2 {
131///     background: rgba(255,240,76,0.34) !important;
132///     padding: 0.75em;
133///     border-left: 2px solid #fce811;
134///     font-family: "Source Serif 4", NanumBarunGothic, serif;
135///  }
136///
137/// .warning-2 code {
138///     background: rgba(211,201,88,0.64) !important;
139/// }
140///
141/// .notice-2 {
142///     background: rgba(88, 211, 255, 0.34) !important;
143///     padding: 0.75em;
144///     border-left: 2px solid #4c96ff;
145///     font-family: "Source Serif 4", NanumBarunGothic, serif;
146/// }
147///
148/// .notice-2 code {
149///     background: rgba(88, 211, 255, 0.64) !important;
150/// }
151/// </style>
152///
153/// <div class="notice-2">
154///     <strong>Notice:</strong>
155///     <p>
156///         Currently, the <code>Listener</code> does not support encryption, plugins, or any feature to allow you to
157///         hijack the RakNet connection sequence. Currently rak_rs is a pure, bare-bones RakNet implementation. <br /><br />
158///         There is currently an <a href="https://github.com/NetrexMC/RakNet/issues/48">open issue</a>
159///         to add support for plugins but this is not a priority, instead you should use the <code>Connection</code> struct
160///         to handle your own packets with the <code>recv</code> method.
161///     </p>
162/// </div>
163///
164/// ## A generic example
165/// ```rust ignore
166/// use rak_rs::server::Listener;
167///
168/// #[async_std::main]
169/// async fn main() {
170///     // Bind the server to the specified address, but do not start it.
171///     let mut server = Listener::bind("0.0.0.0:19132").await.unwrap();
172///
173///     // Begins listening to connections
174///     server.start().await.unwrap();
175///
176///     // Start recieving connections
177///     loop {
178///         let conn = server.accept().await;
179///         async_std::task::spawn(handle(conn.unwrap()));
180///     }
181/// }
182///
183/// // This is a function that handles the connection, this is where you would handle the connection.
184/// async fn handle(mut conn: Connection) {
185///     loop {
186///         // this is used to cleanup the connection
187///         if conn.get_state().await.is_closed() {
188///             println!("Connection closed!");
189///             break;
190///         }
191///
192///         if let Ok(pk) = conn.recv().await {
193///             println!("Got a connection packet {:?} ", pk);
194///         }
195///     }
196/// }
197/// ```
198///
199/// ## Accepting other protocols
200/// This struct allows support for other raknet protocols, however this is not recommended, because occasionally
201/// the protocol may change and the Listener may not be updated to support it. This was mainly added for MCPE.
202///
203/// ```rust ignore
204/// use rak_rs::server::Listener;
205///
206/// #[async_std::main]
207/// async fn main() {
208///     let mut server = Listener::bind("0.0.0.0:19132").await.unwrap();
209///     server.versions = &[10, 11];
210///     server.start().await.unwrap();
211///
212///     loop {
213///         // .. same loop as above
214///     }
215/// }
216/// ```
217pub struct Listener {
218    /// If mcpe is true, this is the default MOTD, this is
219    /// the default MOTD to send to the client. You can change this later by setting
220    /// a motd in the `Conn` struct.
221    pub motd: Motd,
222    /// A server Id, passed in unconnected pong.
223    pub id: u64,
224    /// Supported versions
225    pub versions: &'static [u8],
226    /// Whether or not the server is being served.
227    serving: bool,
228    /// The current socket.
229    sock: Option<Arc<UdpSocket>>,
230    /// A Hashmap off all current connections along with a sending channel
231    /// and some meta data like the time of connection, and the requested MTU_Size
232    connections: Arc<Mutex<HashMap<SocketAddr, Session>>>,
233    /// The recieve communication channel, This is used to dispatch connections between a handle
234    /// It allows you to use the syntax sugar for `Listener::accept()`.
235    recv_comm: Receiver<Connection>,
236    send_comm: Sender<Connection>,
237    // TODO, fix this!
238    // send_evnt: Sender<(ServerEvent, oneshot::Sender<ServerEventResponse>)>,
239    // pub recv_evnt: Arc<Mutex<mpsc::Receiver<(ServerEvent, oneshot::Sender<ServerEventResponse>)>>>,
240    // TODO
241    /// A Notifier (sephamore) that will wait until all notified listeners
242    /// are completed, and finish closing.
243    closed: Arc<Notify>,
244    // This is a notifier that acknowledges all connections have been removed from the server successfully.
245    // This is important to prevent memory leaks if the process is continously running.
246    // cleanup: Arc<Condvar>,
247}
248
249impl Listener {
250    /// Binds a new listener to the specified address provided, this will error if the address is invalid or already in use.
251    /// This will not start the listener, you must call [`Listener::start`] to start listening to connections.
252    ///
253    /// ## Example
254    /// ```ignore
255    /// use rak_rs::server::Listener;
256    ///
257    /// async fn start() {
258    ///     let mut server = Listener::bind("").await.unwrap();
259    /// }
260    /// ```
261    ///
262    /// [`PossiblySocketAddr`]: enum.PossiblySocketAddr.html
263    /// [`Listener::start`]: struct.Listener.html#method.start
264    pub async fn bind<I: for<'a> Into<PossiblySocketAddr<'a>>>(
265        address: I,
266    ) -> Result<Self, ServerError> {
267        let a: PossiblySocketAddr = address.into();
268        let address_r: Option<SocketAddr> = a.to_socket_addr();
269        if address_r.is_none() {
270            rakrs_debug!("Invalid binding value");
271            return Err(ServerError::AddrBindErr);
272        }
273
274        let address = address_r.unwrap();
275
276        let sock = match UdpSocket::bind(address).await {
277            Ok(s) => s,
278            Err(_) => return Err(ServerError::AddrBindErr),
279        };
280
281        rakrs_debug!(true, "listener: Bound to {}", address);
282
283        let server_id: u64 = rand::random();
284        let motd = Motd::new(server_id, format!("{}", address.port()));
285
286        // This channel is a Communication channel for when `Connection` structs are initialized.
287        let (send_comm, recv_comm) = bounded::<Connection>(10);
288        // This channel is responsible for handling and dispatching events between clients.
289        // Oneshot will garauntee this event is intended for the client whom requested the event.
290        // TODO: Fix with new event system
291        // let (send_evnt, recv_evnt) =
292        //     mpsc::channel::<(ServerEvent, oneshot::Sender<ServerEventResponse>)>(10);
293
294        let listener = Self {
295            sock: Some(Arc::new(sock)),
296            id: server_id,
297            versions: &[10, 11],
298            motd,
299            send_comm,
300            recv_comm,
301            // send_evnt,
302            // recv_evnt: Arc::new(Mutex::new(recv_evnt)),
303            serving: false,
304            connections: Arc::new(Mutex::new(HashMap::new())),
305            // closer: Arc::new(Semaphore::new(0)),
306            closed: Arc::new(Notify::new()),
307            // cleanup: Arc::new(Notify::new()),
308            // cleanup: Arc::new(Condvar::new()),
309        };
310
311        return Ok(listener);
312    }
313
314    /// This method is required to be called before the server can begin listening to connections.
315    /// However, you must call [`Listener::bind`] before you can call this method, as that method
316    /// is responsible for creating the socket and initializing the server.
317    ///
318    /// ## Example
319    /// ```ignore
320    /// use rak_rs::server::Listener;
321    /// async fn start() {
322    ///     let mut server = Listener::bind("0.0.0.0:19132").await.unwrap();
323    ///
324    ///     // let's begin to listen to connections
325    ///     server.start().await;
326    /// }
327    /// ```
328    ///
329    /// [`Listener::bind`]: struct.Listener.html#method.bind
330    pub async fn start(&mut self) -> Result<(), ServerError> {
331        if self.serving {
332            return Err(ServerError::AlreadyOnline);
333        }
334
335        let socket = self.sock.as_ref().unwrap().clone();
336        let send_comm = self.send_comm.clone();
337        // let send_evt = self.send_evnt.clone();
338        let server_id = self.id.clone();
339        #[cfg(feature = "mcpe")]
340        let default_motd = self.motd.clone();
341        let connections = self.connections.clone();
342        let closer = self.closed.clone();
343        let connections2 = self.connections.clone();
344        let closer2 = self.closed.clone();
345        let versions = self.versions;
346
347        self.serving = true;
348
349        #[cfg(feature = "async_std")]
350        let (cs, client_close_recv) = bounded::<SocketAddr>(10);
351        #[cfg(feature = "async_tokio")]
352        let (cs, mut client_close_recv) = bounded::<SocketAddr>(10);
353        let client_close_send = Arc::new(cs);
354
355        task::spawn(async move {
356            // We allocate here to prevent constant allocation of this array
357            let mut buf: [u8; 2048] = [0; 2048];
358            #[cfg(feature = "mcpe")]
359            let motd_default = default_motd.clone();
360            loop {
361                let length: usize;
362                let origin: SocketAddr;
363
364                macro_rules! recv_body {
365                    ($recv: ident) => {
366                        match $recv {
367                            Ok((l, o)) => {
368                                length = l;
369                                origin = o;
370                            }
371                            Err(e) => {
372                                match e.kind() {
373                                    std::io::ErrorKind::ConnectionReset => {
374                                        continue;
375                                    },
376                                    _ => {
377                                        rakrs_debug!(true, "[SERVER-SOCKET] Failed to recieve packet! {}", e);
378                                        continue;
379                                    }
380                                }
381                            }
382                        }
383
384                        // Do a quick check to see if this a valid raknet packet, otherwise we're going to handle it normally
385                        if let Ok(pk) = OfflinePacket::read(&mut ByteReader::from(&buf[..length])) {
386                            // Offline packets are not buffered to the user.
387                            // The reason for this is because we don't wish for the user to be able to disrupt
388                            // raknet protocol, and handshaking.
389                            match pk {
390                                OfflinePacket::UnconnectedPing(_) => {
391                                    // let (resp_tx, resp_rx) =
392                                    //     oneshot::channel::<ServerEventResponse>();
393                                    #[cfg(feature = "mcpe")]
394                                    let motd: Motd = motd_default.clone();
395
396                                    // if let Err(e) = send_evt.try_send((
397                                    //         ServerEvent::RefreshMotdRequest(origin, motd.clone()),
398                                    //         // resp_tx,
399                                    //     ))
400                                    // {
401                                    //     match e {
402                                    //         TrySendError::Full(_) => {
403                                    //             rakrs_debug!(true, "[{}] Event dispatcher is full! Dropping request.", to_address_token(origin));
404                                    //         }
405                                    //         TrySendError::Closed(_) => {
406                                    //             rakrs_debug!(true, "[{}] Event dispatcher is closed! Dropping request.", to_address_token(origin));
407                                    //         }
408                                    //     }
409                                    // }
410
411                                    // if let Ok(res) = resp_rx.await {
412                                    //     // get the motd from the server event otherwise use defaults.
413                                    //     // if let Ok(res) = res {
414                                    //         match res {
415                                    //             ServerEventResponse::RefreshMotd(m) => {
416                                    //                 motd = m;
417                                    //             }
418                                    //             _ => {
419                                    //                 rakrs_debug!(true, "[{}] Response to ServerEvent::RefreshMotdRequest is invalid!", to_address_token(origin));
420                                    //             }
421                                    //         }
422                                    //     // };
423                                    // }
424
425                                    // unconnected pong signature is different if MCPE is specified.
426                                    let resp = UnconnectedPong {
427                                        timestamp: current_epoch(),
428                                        server_id,
429                                        magic: Magic::new(),
430                                        #[cfg(feature = "mcpe")]
431                                        motd,
432                                    };
433
434                                    send_packet_to_socket(&socket, resp.into(), origin).await;
435                                    continue;
436                                }
437                                OfflinePacket::OpenConnectRequest(mut pk) => {
438                                    // TODO make a constant for this
439                                    if !versions.contains(&pk.protocol) {
440                                        let resp = IncompatibleProtocolVersion {
441                                            protocol: pk.protocol,
442                                            magic: Magic::new(),
443                                            server_id,
444                                        };
445
446                                        rakrs_debug!("[{}] Sent ({}) which is invalid RakNet protocol. Version is incompatible with server.", pk.protocol, to_address_token(*&origin));
447
448                                        send_packet_to_socket(&socket, resp.into(), origin).await;
449                                        continue;
450                                    }
451
452                                    rakrs_debug!(
453                                        true,
454                                        "[{}] Client requested Mtu Size: {}",
455                                        to_address_token(*&origin),
456                                        pk.mtu_size
457                                    );
458
459                                    if pk.mtu_size > 2048 {
460                                        rakrs_debug!(
461                                            true,
462                                            "[{}] Client requested Mtu Size: {} which is larger than the maximum allowed size of 2048",
463                                            to_address_token(*&origin),
464                                            pk.mtu_size
465                                        );
466                                        pk.mtu_size = 2048;
467                                    }
468
469                                    let resp = OpenConnectReply {
470                                        server_id,
471                                        // TODO allow encryption
472                                        security: false,
473                                        magic: Magic::new(),
474                                        // TODO make this configurable, this is sent to the client to change
475                                        // it's mtu size, right now we're using what the client prefers.
476                                        // however in some cases this may not be the preferred use case, for instance
477                                        // on servers with larger worlds, you may want a larger mtu size, or if
478                                        // your limited on network bandwith
479                                        mtu_size: pk.mtu_size,
480                                    };
481                                    send_packet_to_socket(&socket, resp.into(), origin).await;
482                                    continue;
483                                }
484                                OfflinePacket::SessionInfoRequest(pk) => {
485                                    let resp = SessionInfoReply {
486                                        server_id,
487                                        client_address: origin,
488                                        magic: Magic::new(),
489                                        mtu_size: pk.mtu_size,
490                                        security: false,
491                                    };
492
493                                    // This is a valid packet, let's check if a session exists, if not, we should create it.
494                                    // Event if the connection is only in offline mode.
495                                    let mut sessions = connections.lock().await;
496
497                                    if !sessions.contains_key(&origin) {
498                                        rakrs_debug!(true, "Creating new session for {}", origin);
499                                        let meta = ConnMeta::new(0);
500                                        let (net_send, net_recv) = bounded::<Vec<u8>>(10);
501                                        let connection =
502                                            Connection::new(origin, &socket, net_recv, client_close_send.clone(), pk.mtu_size).await;
503                                        rakrs_debug!(true, "Created Session for {}", origin);
504
505                                        // Add the connection to the available connections list.
506                                        // we're using the name "sessions" here to differeniate
507                                        // for some reason the reciever likes to be dropped, so we're saving it here.
508                                        sessions.insert(origin, (meta, net_send));
509
510                                        // notify the connection communicator
511                                        if let Err(err) = send_comm.send(connection).await {
512                                            let connection = err.0;
513                                            // there was an error, and we should terminate this connection immediately.
514                                            rakrs_debug!("[{}] Error while communicating with internal connection channel! Connection withdrawn.", to_address_token(connection.address));
515                                            sessions.remove(&origin);
516                                            continue;
517                                        }
518                                    }
519
520                                    // update the sessions mtuSize, this is referred to internally, we also will send this event to the client
521                                    // event channel. However we are not expecting a response.
522
523                                    sessions.get_mut(&origin).unwrap().0.mtu_size = pk.mtu_size;
524                                    rakrs_debug!(
525                                        true,
526                                        "[{}] Updated mtu size to {}",
527                                        to_address_token(origin),
528                                        pk.mtu_size
529                                    );
530
531                                    // let (resp_tx, resp_rx) = oneshot::channel::<ServerEventResponse>();
532
533                                    // if let Err(_) = timeout(Duration::from_millis(5), resp_rx).await {
534                                    //     rakrs_debug!(
535                                    //         "[{}] Failed to update mtu size with the client!",
536                                    //         to_address_token(origin)
537                                    //     );
538                                    // }
539
540                                    // if let Err(_) = send_evt.send((ServerEvent::SetMtuSize(pk.mtu_size), resp_tx))
541                                    //     .await
542                                    // {
543                                    //     rakrs_debug!(
544                                    //         "[{}] Failed to update mtu size with the client!",
545                                    //         to_address_token(origin)
546                                    //     );
547                                    // }
548
549                                    send_packet_to_socket(&socket, resp.into(), origin).await;
550                                    continue;
551                                }
552                                _ => {
553                                    rakrs_debug!(
554                                        "[{}] Received invalid packet!",
555                                        to_address_token(*&origin)
556                                    );
557                                }
558                            }
559                        }
560
561                        // Packet may be valid, but we'll let the connection decide this
562                        let mut sessions = connections.lock().await;
563                        if sessions.contains_key(&origin) {
564                            if let Err(_) = sessions[&origin].1.send(buf[..length].to_vec()).await {
565                                rakrs_debug!(true, "[{}] Failed when handling recieved packet! Could not pass over to internal connection, the channel might be closed! (Removed the connection)", to_address_token(*&origin));
566                                sessions.remove(&origin);
567                            }
568                        }
569                        drop(sessions);
570                    };
571                }
572
573                #[cfg(feature = "async_std")]
574                select! {
575                    _ = closer.wait().fuse() => {
576                        rakrs_debug!(true, "[SERVER] [NETWORK] Server has recieved the shutdown notification!");
577                        break;
578                    }
579                    recv = socket.recv_from(&mut buf).fuse() => {
580                       recv_body!(recv);
581                    }
582                }
583
584                #[cfg(feature = "async_tokio")]
585                select! {
586                    _ = closer.wait() => {
587                        rakrs_debug!(true, "[SERVER] [NETWORK] Server has recieved the shutdown notification!");
588                        break;
589                    }
590                    recv = socket.recv_from(&mut buf) => {
591                        recv_body!(recv);
592                    }
593                }
594            }
595        });
596
597        task::spawn(async move {
598            // here we loop and recv from the client_close_recv channel
599            // and remove the connection from the hashmap
600            loop {
601                #[cfg(feature = "async_std")]
602                select! {
603                    _ = closer2.wait().fuse() => {
604                        rakrs_debug!(true, "[SERVER] [Cleanup] Server has recieved the shutdown notification!");
605                        break;
606                    }
607                    addr = client_close_recv.recv().fuse() => {
608                        if let Ok(addr) = addr {
609                            rakrs_debug!(true, "[SERVER] [Cleanup] Removing connection for {}", to_address_token(addr));
610                            let mut c = connections2.lock().await;
611                            c.remove(&addr);
612                            drop(c);
613                        }
614                    }
615                }
616
617                #[cfg(feature = "async_tokio")]
618                select! {
619                    _ = closer2.wait() => {
620                        rakrs_debug!(true, "[SERVER] [Cleanup] Server has recieved the shutdown notification!");
621                        break;
622                    }
623                    addr = client_close_recv.recv() => {
624                        if let Some(addr) = addr {
625                            rakrs_debug!(true, "[SERVER] [Cleanup] Removing connection for {}", to_address_token(addr));
626                            let mut c = connections2.lock().await;
627                            c.remove(&addr);
628                            drop(c);
629                        }
630                    }
631                }
632            }
633        });
634
635        return Ok(());
636    }
637
638    // pub async fn recv_event(&self) -> Result<(ServerEvent, oneshot::Sender<ServerEventResponse>), ServerError> {
639    //     if !self.serving {
640    //         Err(ServerError::NotListening)
641    //     } else {
642    //         let mut recvr = self.recv_evnt.lock().await;
643    //         tokio::select! {
644    //             receiver = recvr.recv() => {
645    //                 match receiver {
646    //                     Some(c) => Ok(c),
647    //                     None => Err(ServerError::Killed)
648    //                 }
649    //             },
650    //             _ = self.closer.acquire() => {
651    //                 Err(ServerError::Killed)
652    //             }
653    //         }
654    //     }
655    // }
656
657    /// This method is used to accept a connection, this will block until a connection is available.
658    /// You can only call this method once both [`Listener::bind`] AND [`Listener::start`] have. This function
659    /// is used to recieve and accept connections. Alternatively, you can refuse a connection
660    /// by dropping it when you accept it.
661    ///
662    /// [`Listener::bind`]: struct.Listener.html#method.bind
663    /// [`Listener::start`]: struct.Listener.html#method.start
664    ///
665    /// <div class="warning-2">
666    ///     <strong>Warning:</strong>
667    ///     <p>
668    ///         This method will block until a connection is available, this is not recommended to be used
669    ///         in the main thread, instead you should use a task or future to handle connections.
670    ///     </p>
671    /// </div>
672    ///
673    /// ## Example
674    /// ```rust ignore
675    /// use rak_rs::server::Listener;
676    /// use rak_rs::Connection;
677    ///
678    /// #[async_std::main]
679    /// async fn main() {
680    ///     let mut server = Listener::bind("0.0.0.0:19132").await.unwrap();
681    ///     server.start().await.unwrap();
682    ///
683    ///     loop {
684    ///         let conn = server.accept().await;
685    ///         async_std::task::spawn(handle(conn.unwrap()));
686    ///     }
687    /// }
688    ///
689    /// async fn handle(mut conn: Connection) {
690    ///    loop {
691    ///         let packet = conn.recv().await;
692    ///         println!("Received a packet! {:?}", packet);
693    ///    }
694    /// }
695    /// ```
696    pub async fn accept(&mut self) -> Result<Connection, ServerError> {
697        if !self.serving {
698            Err(ServerError::NotListening)
699        } else {
700            let receiver = self.recv_comm.recv().await;
701            return match receiver {
702                #[cfg(feature = "async_std")]
703                Ok(c) => Ok(c),
704                #[cfg(feature = "async_std")]
705                Err(_) => Err(ServerError::Killed),
706                #[cfg(feature = "async_tokio")]
707                Some(c) => Ok(c),
708                #[cfg(feature = "async_tokio")]
709                None => Err(ServerError::Killed),
710            };
711        }
712    }
713
714    /// Stops the Listener, effectively closing the socket and stopping the server.
715    /// This will also close all connections, and prevent any new connections from being accepted,
716    /// until [`Listener::start`] is called again.
717    ///
718    /// [`Listener::start`]: struct.Listener.html#method.start
719    pub async fn stop(&mut self) -> Result<(), ServerError> {
720        self.closed.notify().await;
721        // self.cleanup.notified().await;
722
723        self.sock = None;
724        self.serving = false;
725
726        Ok(())
727    }
728}
729
730impl Drop for Listener {
731    fn drop(&mut self) {
732        if self.serving {
733            futures_executor::block_on(self.stop()).unwrap();
734        }
735    }
736}
737
738async fn send_packet_to_socket(socket: &Arc<UdpSocket>, packet: RakPacket, origin: SocketAddr) {
739    if let Err(e) = socket
740        .send_to(&mut packet.write_to_bytes().unwrap().as_slice(), origin)
741        .await
742    {
743        rakrs_debug!(
744            "[{}] Failed sending payload to socket! {}",
745            to_address_token(origin),
746            e
747        );
748    }
749}
750
751pub(crate) fn current_epoch() -> u64 {
752    std::time::SystemTime::now()
753        .duration_since(std::time::UNIX_EPOCH)
754        .unwrap()
755        .as_secs() as u64
756}