rak_rs/connection/
mod.rs

1//! This module contains the logic to handle a connection or "peer" to the server.
2//! This is used internally by the server, and is not intended to be used by the end user outside
3//! of the server.
4//!
5//! This module contains the following:
6//! - [`Connection`]: The connection struct, which is used to hold the connection state.
7//! - [`ConnectionState`]: The connection state enum, which is used to hold the state of the connection.
//! - [`ConnMeta`]: The connection meta struct, which is used to hold the meta information of the connection.
9//!
10//! This module also contains the following submodules:
//! - [`controller`]: The controller submodule, which is used to handle reliability of the connection.
12//! - [`queue`]: The queue submodule, which is used to handle the connection queues.
13//! - [`state`]: The state submodule, which is used to handle the connection state.
14//!
15//! # Example
16//! This is a snippet of code you would use after you've accepted a connection from the server with
17//! [`Listener::accept()`].
18//!
19//! ```ignore
20//! use rakrs::connection::Connection;
21//!
22//! async fn handle(mut conn: Connection) {
23//!     loop {
24//!         // get a packet sent from the client
25//!         if let Ok(pk) = conn.recv().await {
26//!             println!("Got a connection packet {:?} ", pk);
27//!         }
28//!
//!         if conn.is_closed().await {
//!             conn.close().await;
31//!             println!("Client disconnected!");
32//!             break;
33//!         }
34//!     }
35//! }
36//! ```
37//!
38//! [`Listener::accept()`]: crate::server::Listener::accept
39//! [`Connection`]: crate::connection::Connection
40//! [`ConnectionState`]: crate::connection::state::ConnectionState
//! [`ConnMeta`]: crate::connection::ConnMeta
42//! [`controller`]: crate::connection::controller
43//! [`queue`]: crate::connection::queue
44//! [`state`]: crate::connection::state
45pub mod controller;
46/// Necessary queues for the connection.
47pub mod queue;
48pub mod state;
49
50use std::{
51    net::{IpAddr, Ipv4Addr, SocketAddr},
52    sync::{atomic::AtomicU64, Arc},
53    time::Duration,
54};
55
56use binary_util::interfaces::{Reader, Writer};
57
58#[cfg(feature = "async_std")]
59use async_std::{
60    channel::{bounded, Receiver, RecvError, Sender},
61    net::UdpSocket,
62    sync::{Mutex, RwLock},
63    task::{self, sleep, JoinHandle},
64};
65#[cfg(feature = "async_std")]
66use futures::{select, FutureExt};
67#[cfg(feature = "async_tokio")]
68use tokio::{
69    net::UdpSocket,
70    select,
71    sync::{
72        mpsc::{channel as bounded, Receiver, Sender},
73        Mutex, RwLock,
74    },
75    task::{self, JoinHandle},
76    time::sleep,
77};
78#[cfg(feature = "async_tokio")]
/// Error returned by [`Connection::recv()`] when the crate is built for tokio.
///
/// async-std builds use `async_std::channel::RecvError` instead; this enum exists so
/// that `recv()` keeps the same signature on both runtimes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecvError {
    /// The channel feeding the connection was closed.
    Closed,
    /// The receive operation timed out. Not constructed anywhere in this module.
    Timeout,
}
84#[cfg(feature = "async_tokio")]
85impl std::fmt::Display for RecvError {
86    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87        match self {
88            RecvError::Closed => write!(f, "RecvError: Channel is closed!"),
89            RecvError::Timeout => write!(f, "RecvError: Timeout!"),
90        }
91    }
92}
93
94#[cfg(feature = "async_tokio")]
95impl std::error::Error for RecvError {}
96
97use crate::{
98    notify::Notify,
99    protocol::{
100        ack::{Ack, Ackable, ACK, NACK},
101        frame::FramePacket,
102        packet::{
103            offline::OfflinePacket,
104            online::{ConnectedPing, ConnectedPong, ConnectionAccept, Disconnect, OnlinePacket},
105        },
106        reliability::Reliability,
107    },
108    rakrs_debug,
109    server::current_epoch,
110    util::to_address_token,
111};
112
113use self::{
114    queue::{RecvQueue, SendQueue, SendQueueError},
115    state::ConnectionState,
116};
117pub(crate) type ConnNetChan = Arc<Mutex<Receiver<Vec<u8>>>>;
118
/// Small, copyable bundle of per-connection metadata.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ConnMeta {
    /// This is important, and is stored within the server itself.
    /// This value is 0 until the connection state is `Connecting`.
    pub mtu_size: u16,
    /// The time this connection last sent any data. This will be used during server tick.
    pub recv_time: u64,
}
127
128impl ConnMeta {
129    pub fn new(mtu_size: u16) -> Self {
130        Self {
131            mtu_size,
132            recv_time: current_epoch(),
133        }
134    }
135}
136
137/// The connection struct contains the logic for a connection to the server.
138/// The following methods are the most important:
139/// - [`Connection::recv()`]: This is used to recieve packets from the client.
140/// - [`Connection::send()`]: This is used to send packets to the client.
141/// - [`Connection::close()`]: This is used to disconnect the client.
142///
143/// <style>
144/// .warning-2 {
145///     background: rgba(255,240,76,0.34) !important;
146///     padding: 0.75em;
147///     border-left: 2px solid #fce811;
148///     font-family: "Source Serif 4", NanumBarunGothic, serif;
149///  }
150///
151/// .warning-2 code {
152///     background: rgba(211,201,88,0.64) !important;
153/// }
154///
155/// .notice-2 {
156///     background: rgba(88, 211, 255, 0.34) !important;
157///     padding: 0.75em;
158///     border-left: 2px solid #4c96ff;
159///     font-family: "Source Serif 4", NanumBarunGothic, serif;
160/// }
161///
162/// .notice-2 code {
163///     background: rgba(88, 211, 255, 0.64) !important;
164/// }
165/// </style>
166///
167/// <div class="warning-2">
168///     <strong>Warning:</strong>
169///     <p>
170///         This struct does not provide an API for connecting to other peers, for that
171///         you should use the
172///         <a href="/rak-rs/latest/client/struct.Client.html">
173///          Client
174///         </a>
175///         struct.
176///     </p>
177/// </div>
178pub struct Connection {
179    /// The address of the connection
180    /// This is internally tokenized by rak-rs
181    pub address: SocketAddr,
182    pub state: Arc<Mutex<ConnectionState>>,
183    /// The queue used to send packets back to the connection.
184    send_queue: Arc<RwLock<SendQueue>>,
185    /// The queue used to recieve packets, this is read from by the server.
186    /// This is only used internally.
187    recv_queue: Arc<Mutex<RecvQueue>>,
188    /// The network channel, this is where the connection will be recieving it's packets.
189    /// This is interfaced to provide the api for `Connection::recv()`
190    internal_net_recv: ConnNetChan,
191    /// A notifier for when the connection should close.
192    /// This is used for absolute cleanup withtin the connection
193    disconnect: Arc<Notify>,
194    /// The event dispatcher for the connection.
195    // evt_sender: Sender<(ServerEvent, oneshot::Sender<ServerEventResponse>)>,
196    /// The event receiver for the connection.
197    // evt_receiver: mpsc::Receiver<(ServerEvent, oneshot::Sender<ServerEventResponse>)>,
198    /// The last time a packet was recieved. This is used to keep the connection from
199    /// being in memory longer than it should be.
200    recv_time: Arc<AtomicU64>,
201    tasks: Arc<Mutex<Vec<JoinHandle<()>>>>,
202}
203
204impl Connection {
205    /// Initializes a new Connection instance.
206    pub async fn new(
207        address: SocketAddr,
208        socket: &Arc<UdpSocket>,
209        net: Receiver<Vec<u8>>,
210        notifier: Arc<Sender<SocketAddr>>,
211        mtu: u16,
212    ) -> Self {
213        let (net_sender, net_receiver) = bounded::<Vec<u8>>(100);
214        // let (evt_sender, evt_receiver) = mpsc::channel::<(ServerEvent, oneshot::Sender<ServerEventResponse>)>(10);
215        let c = Self {
216            address,
217            send_queue: Arc::new(RwLock::new(SendQueue::new(
218                mtu,
219                12000,
220                5,
221                socket.clone(),
222                address,
223            ))),
224            recv_queue: Arc::new(Mutex::new(RecvQueue::new())),
225            internal_net_recv: Arc::new(Mutex::new(net_receiver)),
226            // evt_sender,
227            // evt_receiver,
228            state: Arc::new(Mutex::new(ConnectionState::Unidentified)),
229            // disconnect: Arc::new(Condvar::new()),
230            disconnect: Arc::new(Notify::new()),
231            recv_time: Arc::new(AtomicU64::new(current_epoch())),
232            tasks: Arc::new(Mutex::new(Vec::new())),
233        };
234
235        let tk = c.tasks.clone();
236        let mut tasks = tk.lock().await;
237        tasks.push(c.init_tick(notifier));
238        tasks.push(c.init_net_recv(net, net_sender));
239
240        return c;
241    }
242
243    /// Initializes the client ticking process!
244    pub(crate) fn init_tick(&self, notifier: Arc<Sender<SocketAddr>>) -> task::JoinHandle<()> {
245        let address = self.address;
246        let closer = self.disconnect.clone();
247        let last_recv = self.recv_time.clone();
248        let send_queue = self.send_queue.clone();
249        let recv_queue = self.recv_queue.clone();
250        let state = self.state.clone();
251        let mut last_ping: u16 = 0;
252
253        // initialize the event io
254        // we initialize the ticking function here, it's purpose is to update the state of the current connection
255        // while handling throttle
256        return task::spawn(async move {
257            loop {
258                macro_rules! tick_body {
259                    () => {
260                        let recv = last_recv.load(std::sync::atomic::Ordering::Relaxed);
261                        let mut cstate = state.lock().await;
262
263                        if *cstate == ConnectionState::Disconnected {
264                            rakrs_debug!(
265                                true,
266                                "[{}] Connection has been closed due to state!",
267                                to_address_token(address)
268                            );
269                            // closer.notify_all();
270                            closer.notify().await;
271                            break;
272                        }
273
274                        if recv + 15 <= current_epoch() {
275                            *cstate = ConnectionState::Disconnected;
276                            rakrs_debug!(
277                                true,
278                                "[{}] Connection has been closed due to inactivity!",
279                                to_address_token(address)
280                            );
281                            // closer.notify_all();
282                            closer.notify().await;
283                            break;
284                        }
285
286                        if recv + 10 <= current_epoch() && cstate.is_reliable() {
287                            *cstate = ConnectionState::TimingOut;
288                            rakrs_debug!(
289                                true,
290                                "[{}] Connection is timing out, sending a ping!",
291                                to_address_token(address)
292                            );
293                        }
294
295                        let mut sendq = send_queue.write().await;
296                        let mut recv_q = recv_queue.lock().await;
297
298                        if last_ping >= 3000 {
299                            let ping = ConnectedPing {
300                                time: current_epoch() as i64,
301                            };
302                            if let Ok(_) = sendq
303                                .send_packet(ping.into(), Reliability::Reliable, true)
304                                .await
305                            {};
306                            last_ping = 0;
307                        } else {
308                            last_ping += 50;
309                        }
310
311                        sendq.update().await;
312
313                        // Flush the queue of acks and nacks, and respond to them
314                        let ack = Ack::from_records(recv_q.ack_flush(), false);
315                        if ack.records.len() > 0 {
316                            if let Ok(p) = ack.write_to_bytes() {
317                                sendq.send_stream(p.as_slice()).await;
318                            }
319                        }
320
321                        // flush nacks from recv queue
322                        let nack = Ack::from_records(recv_q.nack_queue(), true);
323                        if nack.records.len() > 0 {
324                            if let Ok(p) = nack.write_to_bytes() {
325                                sendq.send_stream(p.as_slice()).await;
326                            }
327                        }
328                    };
329                }
330
331                #[cfg(feature = "async_std")]
332                select! {
333                    _ = closer.wait().fuse() => {
334                        rakrs_debug!(true, "[{}] [task: tick] Connection has been closed due to closer!", to_address_token(address));
335                        break;
336                    }
337                    _ = sleep(Duration::from_millis(50)).fuse() => {
338                       tick_body!();
339                    }
340                }
341
342                #[cfg(feature = "async_tokio")]
343                select! {
344                    _ = closer.wait() => {
345                        rakrs_debug!(true, "[{}] [task: tick] Connection has been closed due to closer!", to_address_token(address));
346                        break;
347                    }
348                    _ = sleep(Duration::from_millis(50)) => {
349                       tick_body!();
350                    }
351                }
352            }
353
354            #[cfg(feature = "async_std")]
355            if let Ok(_) = notifier.send(address).await {
356                rakrs_debug!(
357                    true,
358                    "[{}] [task: tick] Connection has been closed due to closer!",
359                    to_address_token(address)
360                );
361            } else {
362                rakrs_debug!(
363                    true,
364                    "[{}] [task: tick] Connection has been closed due to closer!",
365                    to_address_token(address)
366                );
367            }
368
369            #[cfg(feature = "async_tokio")]
370            if let Ok(_) = notifier.send(address).await {
371                rakrs_debug!(
372                    true,
373                    "[{}] [task: tick] Connection has been closed due to closer!",
374                    to_address_token(address)
375                );
376            } else {
377                rakrs_debug!(
378                    true,
379                    "[{}] [task: tick] Connection has been closed due to closer!",
380                    to_address_token(address)
381                );
382            }
383            rakrs_debug!(
384                true,
385                "[{}] Connection has been cleaned up!",
386                to_address_token(address)
387            );
388        });
389    }
390
391    /// This function initializes the raw internal packet handling task!
392    ///
393    pub(crate) fn init_net_recv(
394        &self,
395        // THIS IS ONLY ACTIVATED ON STD
396        #[cfg(feature = "async_std")] net: Receiver<Vec<u8>>,
397        // ONLY ACTIVATED ON TOKIO
398        #[cfg(feature = "async_tokio")] mut net: Receiver<Vec<u8>>,
399        sender: Sender<Vec<u8>>,
400    ) -> task::JoinHandle<()> {
401        let recv_time = self.recv_time.clone();
402        let recv_q = self.recv_queue.clone();
403        let send_q = self.send_queue.clone();
404        let disconnect = self.disconnect.clone();
405        let state = self.state.clone();
406        let address = self.address;
407
408        return task::spawn(async move {
409            loop {
410                macro_rules! handle_payload {
411                    ($payload: ident) => {
412                        // We've recieved a payload!
413                        recv_time.store(current_epoch(), std::sync::atomic::Ordering::Relaxed);
414                        let mut cstate = state.lock().await;
415
416                        if *cstate == ConnectionState::TimingOut {
417                            rakrs_debug!(
418                                "[{}] Connection is no longer timing out!",
419                                to_address_token(address)
420                            );
421                            *cstate = ConnectionState::Connected;
422                        }
423
424                        drop(cstate);
425
426                        let id = $payload[0];
427                        match id {
428                            // This is a frame packet.
429                            // This packet will be handled by the recv_queue
430                            0x80..=0x8d => {
431                                if let Ok(pk) = FramePacket::read_from_slice(&$payload[..]) {
432                                    let mut rq = recv_q.lock().await;
433
434                                    if let Err(e) = rq.insert(pk) {
435                                        rakrs_debug!(
436                                            true,
437                                            "[{}] Failed to insert frame packet! {:?}",
438                                            to_address_token(address),
439                                            e
440                                        );
441                                    };
442
443                                    let buffers = rq.flush();
444
445                                    for buffer in buffers {
446                                        let res = Connection::process_packet(
447                                            &buffer, &address, &sender, &send_q, &state,
448                                        )
449                                        .await;
450                                        if let Ok(v) = res {
451                                            if v == true {
452                                                // DISCONNECT
453                                                // disconnect.close();
454                                                rakrs_debug!(true, "[{}] Connection::process_packet returned true!", to_address_token(address));
455                                                disconnect.notify().await;
456                                                break;
457                                            }
458                                        }
459                                        if let Err(e) = res {
460                                            rakrs_debug!(
461                                                "[{}] Failed to process packet: {:?}!",
462                                                to_address_token(address),
463                                                e
464                                            );
465                                        };
466                                    }
467
468                                    drop(rq);
469                                } else {
470                                    rakrs_debug!(
471                                        true,
472                                        "[{}] Failed to parse frame packet!",
473                                        to_address_token(address)
474                                    );
475                                }
476                            }
477                            NACK => {
478                                // Validate this is a nack packet
479
480                                if let Ok(nack) = Ack::read_from_slice(&$payload[..]) {
481                                    // The client acknowledges it did not recieve these packets
482                                    // We should resend them.
483                                    let mut sq = send_q.write().await;
484                                    let resend = sq.nack(nack);
485
486                                    if resend.len() > 0 {
487                                        for packet in resend {
488                                            if let Ok(buffer) = packet.write_to_bytes() {
489                                                if let Err(_) = sq.insert(buffer.as_slice(), Reliability::Unreliable, true, Some(0)).await {
490                                                    rakrs_debug!(
491                                                        true,
492                                                        "[{}] Failed to insert packet into send queue!",
493                                                        to_address_token(address)
494                                                    );
495                                                }
496                                            } else {
497                                                rakrs_debug!(
498                                                    true,
499                                                    "[{}] Failed to send packet to client (parsing failed)!",
500                                                    to_address_token(address)
501                                                );
502                                            }
503                                        }
504                                    }
505                                }
506                            }
507                            ACK => {
508                                // first lets validate this is an ack packet
509                                if let Ok(ack) = Ack::read_from_slice(&$payload[..]) {
510                                    // The client acknowledges it recieved these packets
511                                    // We should remove them from the queue.
512                                    let mut sq = send_q.write().await;
513                                    sq.ack(ack.clone());
514                                    drop(sq);
515                                    recv_q.lock().await.ack(ack);
516                                }
517                            }
518                            _ => {
519                                rakrs_debug!(
520                                    "[{}] Unknown RakNet packet recieved (Or packet is sent out of scope).",
521                                    to_address_token(address)
522                                );
523                            }
524                        };
525                    };
526                }
527
528                #[cfg(feature = "async_std")]
529                select! {
530                    _ = disconnect.wait().fuse() => {
531                        rakrs_debug!(true, "[{}] [task: net_recv] Connection has been closed due to closer!", to_address_token(address));
532                        break;
533                    }
534                    res = net.recv().fuse() => {
535                        match res {
536                            Ok(payload) => {
537                                handle_payload!(payload);
538                            }
539                            _ => continue,
540                        }
541                    }
542                };
543
544                #[cfg(feature = "async_tokio")]
545                select! {
546                    _ = disconnect.wait() => {
547                        rakrs_debug!(true, "[{}] [task: net_recv] Connection has been closed due to closer!", to_address_token(address));
548                        break;
549                    }
550                    res = net.recv() => {
551                        match res {
552                            Some(payload) => {
553                                handle_payload!(payload);
554                            }
555                            _ => continue,
556                        }
557                    }
558                };
559            }
560        });
561    }
562
563    pub(crate) async fn process_packet(
564        buffer: &[u8],
565        address: &SocketAddr,
566        sender: &Sender<Vec<u8>>,
567        send_q: &Arc<RwLock<SendQueue>>,
568        state: &Arc<Mutex<ConnectionState>>,
569    ) -> Result<bool, ()> {
570        if buffer.len() < 1 {
571            rakrs_debug!("[{}] Got packet: {}", to_address_token(*address), buffer[0]);
572        }
573        if let Ok(online_packet) = OnlinePacket::read_from_slice(&buffer) {
574            match online_packet {
575                OnlinePacket::ConnectedPing(pk) => {
576                    let response = ConnectedPong {
577                        ping_time: pk.time,
578                        pong_time: current_epoch() as i64,
579                    };
580                    let mut q = send_q.write().await;
581                    if let Ok(_) = q
582                        .send_packet(response.into(), Reliability::Reliable, true)
583                        .await
584                    {
585                        return Ok(false);
586                    } else {
587                        rakrs_debug!(
588                            true,
589                            "[{}] Failed to send ConnectedPong packet!",
590                            to_address_token(*address)
591                        );
592                        return Err(());
593                    }
594                }
595                OnlinePacket::ConnectedPong(_pk) => {
596                    // do nothing rn
597                    // TODO: add ping calculation
598                    return Ok(false);
599                }
600                OnlinePacket::ConnectionRequest(pk) => {
601                    let internal_ids = vec![
602                        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), 19132),
603                        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), 19133),
604                        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), 19134),
605                        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), 19135),
606                        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), 19136),
607                    ];
608                    let response = ConnectionAccept {
609                        system_index: 0,
610                        client_address: *address,
611                        internal_ids,
612                        request_time: pk.time,
613                        timestamp: current_epoch() as i64,
614                    };
615                    let mut q = send_q.write().await;
616                    *state.lock().await = ConnectionState::Connecting;
617                    if let Ok(_) = q
618                        .send_packet(response.clone().into(), Reliability::Reliable, true)
619                        .await
620                    {
621                        return Ok(false);
622                    } else {
623                        rakrs_debug!(
624                            true,
625                            "[{}] Failed to send ConnectionAccept packet!",
626                            to_address_token(*address)
627                        );
628                        return Err(());
629                    }
630                }
631                OnlinePacket::Disconnect(_) => {
632                    // Disconnect the client immediately.
633                    // connection.disconnect("Client disconnected.", false);
634                    return Ok(true);
635                }
636                OnlinePacket::LostConnection(_) => {
637                    // Disconnect the client immediately.
638                    // connection.disconnect("Client disconnected.", false);
639                    rakrs_debug!(
640                        "[{}] Client has lost connection, disconnecting client!",
641                        to_address_token(*address)
642                    );
643                    return Ok(true);
644                }
645                OnlinePacket::NewConnection(_) => {
646                    // if we are already connected, disconnect the client.
647                    if *state.lock().await == ConnectionState::Connected {
648                        rakrs_debug!(
649                            true,
650                            "[{}] Client is already connected, disconnecting client!",
651                            to_address_token(*address)
652                        );
653                        return Ok(true);
654                    }
655
656                    *state.lock().await = ConnectionState::Connected;
657                    return Ok(false);
658                }
659                _ => {
660                    rakrs_debug!(
661                        true,
662                        "[{}] Forwarding packet to socket!\n{:?}",
663                        to_address_token(*address),
664                        buffer
665                    );
666                    if let Err(_) = sender.send(buffer.to_vec()).await {
667                        rakrs_debug!(
668                            "[{}] Failed to to forward packet to recv channel...",
669                            to_address_token(*address)
670                        );
671                        return Err(());
672                    }
673                    return Ok(false);
674                }
675            }
676        } else if let Ok(_) = OfflinePacket::read_from_slice(&buffer) {
677            *state.lock().await = ConnectionState::Disconnecting;
678            rakrs_debug!(
679                true,
680                "[{}] Invalid protocol! Disconnecting client!",
681                to_address_token(*address)
682            );
683            return Err(());
684        }
685
686        rakrs_debug!(
687            true,
688            "[{}] Either Game-packet or unknown packet, sending buffer to client...",
689            to_address_token(*address)
690        );
691        if let Err(_) = sender.send(buffer.to_vec()).await {
692            rakrs_debug!(
693                "[{}] Failed to to forward packet to recv channel...",
694                to_address_token(*address)
695            );
696            return Err(());
697        }
698        Ok(false)
699    }
700
701    /// This method is used to recieve packets from the client connection.
702    /// Packets that are recieved here are packets sent by the peer expected
703    /// to be handled by the server.
704    ///
705    /// This is where you would handle your packets from the client.
706    ///
707    /// # Example
708    /// This is a snippet of code you would use after you've accepted a connection from the server with
709    /// [`Listener::accept()`].
710    ///
711    /// ```ignore
712    /// use rakrs::connection::Connection;
713    ///
714    /// async fn handle(mut conn: Connection) {
715    ///     loop {
716    ///         // get a packet sent from the client
717    ///         if let Ok(pk) = conn.recv().await {
718    ///             println!("Got a connection packet {:?} ", pk);
719    ///         }
720    ///     }
721    /// }
722    /// ```
723    pub async fn recv(&mut self) -> Result<Vec<u8>, RecvError> {
724        #[allow(unused_mut)]
725        let mut q = self.internal_net_recv.as_ref().lock().await;
726        match q.recv().await {
727            #[cfg(feature = "async_std")]
728            Ok(packet) => Ok(packet),
729            #[cfg(feature = "async_std")]
730            Err(e) => Err(e),
731            #[cfg(feature = "async_tokio")]
732            Some(packet) => Ok(packet),
733            #[cfg(feature = "async_tokio")]
734            None => Err(RecvError::Closed),
735        }
736    }
737
738    // /// Handle a RakNet Event. These are sent as they happen.
739    // ///
740    // /// EG:
741    // /// ```ignore
742    // /// let conn: Connection = Connection::new();
743    // ///
744    // /// while let Some((event, responder)) = conn.recv_ev {
745    // ///     match event {
746    // ///         ServerEvent::SetMtuSize(mtu) => {
747    // ///             println!("client updated mtu!");
748    // ///             responder.send(ServerEventResponse::Acknowledge);
749    // ///         }
750    // ///     }
751    // /// }
752    // /// ```
753    // pub async fn recv_ev(
754    //     &mut self,
755    // ) -> Result<(ServerEvent, oneshot::Sender<ServerEventResponse>), ConnectionError> {
756    //     match self.evt_receiver.recv().await {
757    //         Some((server_event, event_responder)) => {
758    //             return Ok((server_event, event_responder));
759    //         }
760    //         None => {
761    //             if self.disconnect.is_closed() {
762    //                 return Err(ConnectionError::Closed);
763    //             }
764    //             return Err(ConnectionError::EventDispatchError);
765    //         }
766    //     }
767    // }
768
769    pub async fn is_closed(&self) -> bool {
770        !self.state.lock().await.is_available()
771    }
772
773    /// This method is used to send payloads to the connection. This method internally
774    /// will encode your payload into a RakNet packet, and send it to the client.
775    ///
776    /// # Example
777    /// This is a snippet of code you would use when you need to send a payload to the client.
778    /// ```ignore
779    /// use rakrs::connection::Connection;
780    ///
781    /// async fn send_payload(mut conn: Connection) {
782    ///     conn.send(&[0x01, 0x02, 0x03], true).await.unwrap();
783    /// }
784    /// ```
785    pub async fn send(&self, buffer: &[u8], immediate: bool) -> Result<(), SendQueueError> {
786        let mut q = self.send_queue.write().await;
787        rakrs_debug!("Send call, sending to write");
788        if let Err(e) = q
789            .insert(buffer, Reliability::ReliableOrd, immediate, Some(0))
790            .await
791        {
792            return Err(e);
793        }
794        Ok(())
795    }
796
797    /// This method should be used when you are ready to disconnect the client.
798    /// this method will attempt to send a disconnect packet to the client, and
799    /// then close the connection.
800    pub async fn close(&self) {
801        rakrs_debug!(
802            true,
803            "[{}] Dropping connection!",
804            to_address_token(self.address)
805        );
806        if let Err(_) = self
807            .send(
808                &OnlinePacket::Disconnect(Disconnect {})
809                    .write_to_bytes()
810                    .unwrap()
811                    .as_slice(),
812                true,
813            )
814            .await
815        {
816            rakrs_debug!(
817                true,
818                "[{}] Failed to send disconnect packet when closing!",
819                to_address_token(self.address)
820            );
821        }
822        let tasks = self.tasks.clone();
823
824        for task in tasks.lock().await.drain(..) {
825            #[cfg(feature = "async_std")]
826            task.cancel().await;
827            #[cfg(feature = "async_tokio")]
828            task.abort();
829        }
830    }
831}
832
833impl Drop for Connection {
834    fn drop(&mut self) {
835        futures_executor::block_on(async {
836            if self.is_closed().await {
837                return;
838            }
839            self.close().await;
840        });
841    }
842}