rak_rs/connection/mod.rs
1//! This module contains the logic to handle a connection or "peer" to the server.
2//! This is used internally by the server, and is not intended to be used by the end user outside
3//! of the server.
4//!
5//! This module contains the following:
6//! - [`Connection`]: The connection struct, which is used to hold the connection state.
7//! - [`ConnectionState`]: The connection state enum, which is used to hold the state of the connection.
8//! - [`ConnectionMeta`]: The connection meta struct, which is used to hold the meta information of the connection.
9//!
10//! This module also contains the following submodules:
11//! - [`controller`]: The controller submodule, which is used to handle relability of the connection.
12//! - [`queue`]: The queue submodule, which is used to handle the connection queues.
13//! - [`state`]: The state submodule, which is used to handle the connection state.
14//!
15//! # Example
16//! This is a snippet of code you would use after you've accepted a connection from the server with
17//! [`Listener::accept()`].
18//!
19//! ```ignore
20//! use rakrs::connection::Connection;
21//!
22//! async fn handle(mut conn: Connection) {
23//! loop {
24//! // get a packet sent from the client
25//! if let Ok(pk) = conn.recv().await {
26//! println!("Got a connection packet {:?} ", pk);
27//! }
28//!
29//! if !conn.state.lock().await.unwrap().is_available() {
30//! conn.disconnect("Client disconnected.", false);
31//! println!("Client disconnected!");
32//! break;
33//! }
34//! }
35//! }
36//! ```
37//!
38//! [`Listener::accept()`]: crate::server::Listener::accept
39//! [`Connection`]: crate::connection::Connection
40//! [`ConnectionState`]: crate::connection::state::ConnectionState
41//! [`ConnectionMeta`]: crate::connection::ConnectionMeta
42//! [`controller`]: crate::connection::controller
43//! [`queue`]: crate::connection::queue
44//! [`state`]: crate::connection::state
45pub mod controller;
46/// Necessary queues for the connection.
47pub mod queue;
48pub mod state;
49
50use std::{
51 net::{IpAddr, Ipv4Addr, SocketAddr},
52 sync::{atomic::AtomicU64, Arc},
53 time::Duration,
54};
55
56use binary_util::interfaces::{Reader, Writer};
57
58#[cfg(feature = "async_std")]
59use async_std::{
60 channel::{bounded, Receiver, RecvError, Sender},
61 net::UdpSocket,
62 sync::{Mutex, RwLock},
63 task::{self, sleep, JoinHandle},
64};
65#[cfg(feature = "async_std")]
66use futures::{select, FutureExt};
67#[cfg(feature = "async_tokio")]
68use tokio::{
69 net::UdpSocket,
70 select,
71 sync::{
72 mpsc::{channel as bounded, Receiver, Sender},
73 Mutex, RwLock,
74 },
75 task::{self, JoinHandle},
76 time::sleep,
77};
78#[cfg(feature = "async_tokio")]
79#[derive(Debug, Clone, Copy, PartialEq)]
80pub enum RecvError {
81 Closed,
82 Timeout,
83}
84#[cfg(feature = "async_tokio")]
85impl std::fmt::Display for RecvError {
86 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87 match self {
88 RecvError::Closed => write!(f, "RecvError: Channel is closed!"),
89 RecvError::Timeout => write!(f, "RecvError: Timeout!"),
90 }
91 }
92}
93
94#[cfg(feature = "async_tokio")]
95impl std::error::Error for RecvError {}
96
97use crate::{
98 notify::Notify,
99 protocol::{
100 ack::{Ack, Ackable, ACK, NACK},
101 frame::FramePacket,
102 packet::{
103 offline::OfflinePacket,
104 online::{ConnectedPing, ConnectedPong, ConnectionAccept, Disconnect, OnlinePacket},
105 },
106 reliability::Reliability,
107 },
108 rakrs_debug,
109 server::current_epoch,
110 util::to_address_token,
111};
112
113use self::{
114 queue::{RecvQueue, SendQueue, SendQueueError},
115 state::ConnectionState,
116};
117pub(crate) type ConnNetChan = Arc<Mutex<Receiver<Vec<u8>>>>;
118
119#[derive(Debug, Clone, Copy)]
120pub struct ConnMeta {
121 /// This is important, and is stored within the server itself
122 /// This value is 0 until the connection state is `Connecting`
123 pub mtu_size: u16,
124 /// The time this connection last sent any data. This will be used during server tick.
125 pub recv_time: u64,
126}
127
128impl ConnMeta {
129 pub fn new(mtu_size: u16) -> Self {
130 Self {
131 mtu_size,
132 recv_time: current_epoch(),
133 }
134 }
135}
136
137/// The connection struct contains the logic for a connection to the server.
138/// The following methods are the most important:
139/// - [`Connection::recv()`]: This is used to recieve packets from the client.
140/// - [`Connection::send()`]: This is used to send packets to the client.
141/// - [`Connection::close()`]: This is used to disconnect the client.
142///
143/// <style>
144/// .warning-2 {
145/// background: rgba(255,240,76,0.34) !important;
146/// padding: 0.75em;
147/// border-left: 2px solid #fce811;
148/// font-family: "Source Serif 4", NanumBarunGothic, serif;
149/// }
150///
151/// .warning-2 code {
152/// background: rgba(211,201,88,0.64) !important;
153/// }
154///
155/// .notice-2 {
156/// background: rgba(88, 211, 255, 0.34) !important;
157/// padding: 0.75em;
158/// border-left: 2px solid #4c96ff;
159/// font-family: "Source Serif 4", NanumBarunGothic, serif;
160/// }
161///
162/// .notice-2 code {
163/// background: rgba(88, 211, 255, 0.64) !important;
164/// }
165/// </style>
166///
167/// <div class="warning-2">
168/// <strong>Warning:</strong>
169/// <p>
170/// This struct does not provide an API for connecting to other peers, for that
171/// you should use the
172/// <a href="/rak-rs/latest/client/struct.Client.html">
173/// Client
174/// </a>
175/// struct.
176/// </p>
177/// </div>
178pub struct Connection {
179 /// The address of the connection
180 /// This is internally tokenized by rak-rs
181 pub address: SocketAddr,
182 pub state: Arc<Mutex<ConnectionState>>,
183 /// The queue used to send packets back to the connection.
184 send_queue: Arc<RwLock<SendQueue>>,
185 /// The queue used to recieve packets, this is read from by the server.
186 /// This is only used internally.
187 recv_queue: Arc<Mutex<RecvQueue>>,
188 /// The network channel, this is where the connection will be recieving it's packets.
189 /// This is interfaced to provide the api for `Connection::recv()`
190 internal_net_recv: ConnNetChan,
191 /// A notifier for when the connection should close.
192 /// This is used for absolute cleanup withtin the connection
193 disconnect: Arc<Notify>,
194 /// The event dispatcher for the connection.
195 // evt_sender: Sender<(ServerEvent, oneshot::Sender<ServerEventResponse>)>,
196 /// The event receiver for the connection.
197 // evt_receiver: mpsc::Receiver<(ServerEvent, oneshot::Sender<ServerEventResponse>)>,
198 /// The last time a packet was recieved. This is used to keep the connection from
199 /// being in memory longer than it should be.
200 recv_time: Arc<AtomicU64>,
201 tasks: Arc<Mutex<Vec<JoinHandle<()>>>>,
202}
203
204impl Connection {
205 /// Initializes a new Connection instance.
206 pub async fn new(
207 address: SocketAddr,
208 socket: &Arc<UdpSocket>,
209 net: Receiver<Vec<u8>>,
210 notifier: Arc<Sender<SocketAddr>>,
211 mtu: u16,
212 ) -> Self {
213 let (net_sender, net_receiver) = bounded::<Vec<u8>>(100);
214 // let (evt_sender, evt_receiver) = mpsc::channel::<(ServerEvent, oneshot::Sender<ServerEventResponse>)>(10);
215 let c = Self {
216 address,
217 send_queue: Arc::new(RwLock::new(SendQueue::new(
218 mtu,
219 12000,
220 5,
221 socket.clone(),
222 address,
223 ))),
224 recv_queue: Arc::new(Mutex::new(RecvQueue::new())),
225 internal_net_recv: Arc::new(Mutex::new(net_receiver)),
226 // evt_sender,
227 // evt_receiver,
228 state: Arc::new(Mutex::new(ConnectionState::Unidentified)),
229 // disconnect: Arc::new(Condvar::new()),
230 disconnect: Arc::new(Notify::new()),
231 recv_time: Arc::new(AtomicU64::new(current_epoch())),
232 tasks: Arc::new(Mutex::new(Vec::new())),
233 };
234
235 let tk = c.tasks.clone();
236 let mut tasks = tk.lock().await;
237 tasks.push(c.init_tick(notifier));
238 tasks.push(c.init_net_recv(net, net_sender));
239
240 return c;
241 }
242
243 /// Initializes the client ticking process!
244 pub(crate) fn init_tick(&self, notifier: Arc<Sender<SocketAddr>>) -> task::JoinHandle<()> {
245 let address = self.address;
246 let closer = self.disconnect.clone();
247 let last_recv = self.recv_time.clone();
248 let send_queue = self.send_queue.clone();
249 let recv_queue = self.recv_queue.clone();
250 let state = self.state.clone();
251 let mut last_ping: u16 = 0;
252
253 // initialize the event io
254 // we initialize the ticking function here, it's purpose is to update the state of the current connection
255 // while handling throttle
256 return task::spawn(async move {
257 loop {
258 macro_rules! tick_body {
259 () => {
260 let recv = last_recv.load(std::sync::atomic::Ordering::Relaxed);
261 let mut cstate = state.lock().await;
262
263 if *cstate == ConnectionState::Disconnected {
264 rakrs_debug!(
265 true,
266 "[{}] Connection has been closed due to state!",
267 to_address_token(address)
268 );
269 // closer.notify_all();
270 closer.notify().await;
271 break;
272 }
273
274 if recv + 15 <= current_epoch() {
275 *cstate = ConnectionState::Disconnected;
276 rakrs_debug!(
277 true,
278 "[{}] Connection has been closed due to inactivity!",
279 to_address_token(address)
280 );
281 // closer.notify_all();
282 closer.notify().await;
283 break;
284 }
285
286 if recv + 10 <= current_epoch() && cstate.is_reliable() {
287 *cstate = ConnectionState::TimingOut;
288 rakrs_debug!(
289 true,
290 "[{}] Connection is timing out, sending a ping!",
291 to_address_token(address)
292 );
293 }
294
295 let mut sendq = send_queue.write().await;
296 let mut recv_q = recv_queue.lock().await;
297
298 if last_ping >= 3000 {
299 let ping = ConnectedPing {
300 time: current_epoch() as i64,
301 };
302 if let Ok(_) = sendq
303 .send_packet(ping.into(), Reliability::Reliable, true)
304 .await
305 {};
306 last_ping = 0;
307 } else {
308 last_ping += 50;
309 }
310
311 sendq.update().await;
312
313 // Flush the queue of acks and nacks, and respond to them
314 let ack = Ack::from_records(recv_q.ack_flush(), false);
315 if ack.records.len() > 0 {
316 if let Ok(p) = ack.write_to_bytes() {
317 sendq.send_stream(p.as_slice()).await;
318 }
319 }
320
321 // flush nacks from recv queue
322 let nack = Ack::from_records(recv_q.nack_queue(), true);
323 if nack.records.len() > 0 {
324 if let Ok(p) = nack.write_to_bytes() {
325 sendq.send_stream(p.as_slice()).await;
326 }
327 }
328 };
329 }
330
331 #[cfg(feature = "async_std")]
332 select! {
333 _ = closer.wait().fuse() => {
334 rakrs_debug!(true, "[{}] [task: tick] Connection has been closed due to closer!", to_address_token(address));
335 break;
336 }
337 _ = sleep(Duration::from_millis(50)).fuse() => {
338 tick_body!();
339 }
340 }
341
342 #[cfg(feature = "async_tokio")]
343 select! {
344 _ = closer.wait() => {
345 rakrs_debug!(true, "[{}] [task: tick] Connection has been closed due to closer!", to_address_token(address));
346 break;
347 }
348 _ = sleep(Duration::from_millis(50)) => {
349 tick_body!();
350 }
351 }
352 }
353
354 #[cfg(feature = "async_std")]
355 if let Ok(_) = notifier.send(address).await {
356 rakrs_debug!(
357 true,
358 "[{}] [task: tick] Connection has been closed due to closer!",
359 to_address_token(address)
360 );
361 } else {
362 rakrs_debug!(
363 true,
364 "[{}] [task: tick] Connection has been closed due to closer!",
365 to_address_token(address)
366 );
367 }
368
369 #[cfg(feature = "async_tokio")]
370 if let Ok(_) = notifier.send(address).await {
371 rakrs_debug!(
372 true,
373 "[{}] [task: tick] Connection has been closed due to closer!",
374 to_address_token(address)
375 );
376 } else {
377 rakrs_debug!(
378 true,
379 "[{}] [task: tick] Connection has been closed due to closer!",
380 to_address_token(address)
381 );
382 }
383 rakrs_debug!(
384 true,
385 "[{}] Connection has been cleaned up!",
386 to_address_token(address)
387 );
388 });
389 }
390
391 /// This function initializes the raw internal packet handling task!
392 ///
393 pub(crate) fn init_net_recv(
394 &self,
395 // THIS IS ONLY ACTIVATED ON STD
396 #[cfg(feature = "async_std")] net: Receiver<Vec<u8>>,
397 // ONLY ACTIVATED ON TOKIO
398 #[cfg(feature = "async_tokio")] mut net: Receiver<Vec<u8>>,
399 sender: Sender<Vec<u8>>,
400 ) -> task::JoinHandle<()> {
401 let recv_time = self.recv_time.clone();
402 let recv_q = self.recv_queue.clone();
403 let send_q = self.send_queue.clone();
404 let disconnect = self.disconnect.clone();
405 let state = self.state.clone();
406 let address = self.address;
407
408 return task::spawn(async move {
409 loop {
410 macro_rules! handle_payload {
411 ($payload: ident) => {
412 // We've recieved a payload!
413 recv_time.store(current_epoch(), std::sync::atomic::Ordering::Relaxed);
414 let mut cstate = state.lock().await;
415
416 if *cstate == ConnectionState::TimingOut {
417 rakrs_debug!(
418 "[{}] Connection is no longer timing out!",
419 to_address_token(address)
420 );
421 *cstate = ConnectionState::Connected;
422 }
423
424 drop(cstate);
425
426 let id = $payload[0];
427 match id {
428 // This is a frame packet.
429 // This packet will be handled by the recv_queue
430 0x80..=0x8d => {
431 if let Ok(pk) = FramePacket::read_from_slice(&$payload[..]) {
432 let mut rq = recv_q.lock().await;
433
434 if let Err(e) = rq.insert(pk) {
435 rakrs_debug!(
436 true,
437 "[{}] Failed to insert frame packet! {:?}",
438 to_address_token(address),
439 e
440 );
441 };
442
443 let buffers = rq.flush();
444
445 for buffer in buffers {
446 let res = Connection::process_packet(
447 &buffer, &address, &sender, &send_q, &state,
448 )
449 .await;
450 if let Ok(v) = res {
451 if v == true {
452 // DISCONNECT
453 // disconnect.close();
454 rakrs_debug!(true, "[{}] Connection::process_packet returned true!", to_address_token(address));
455 disconnect.notify().await;
456 break;
457 }
458 }
459 if let Err(e) = res {
460 rakrs_debug!(
461 "[{}] Failed to process packet: {:?}!",
462 to_address_token(address),
463 e
464 );
465 };
466 }
467
468 drop(rq);
469 } else {
470 rakrs_debug!(
471 true,
472 "[{}] Failed to parse frame packet!",
473 to_address_token(address)
474 );
475 }
476 }
477 NACK => {
478 // Validate this is a nack packet
479
480 if let Ok(nack) = Ack::read_from_slice(&$payload[..]) {
481 // The client acknowledges it did not recieve these packets
482 // We should resend them.
483 let mut sq = send_q.write().await;
484 let resend = sq.nack(nack);
485
486 if resend.len() > 0 {
487 for packet in resend {
488 if let Ok(buffer) = packet.write_to_bytes() {
489 if let Err(_) = sq.insert(buffer.as_slice(), Reliability::Unreliable, true, Some(0)).await {
490 rakrs_debug!(
491 true,
492 "[{}] Failed to insert packet into send queue!",
493 to_address_token(address)
494 );
495 }
496 } else {
497 rakrs_debug!(
498 true,
499 "[{}] Failed to send packet to client (parsing failed)!",
500 to_address_token(address)
501 );
502 }
503 }
504 }
505 }
506 }
507 ACK => {
508 // first lets validate this is an ack packet
509 if let Ok(ack) = Ack::read_from_slice(&$payload[..]) {
510 // The client acknowledges it recieved these packets
511 // We should remove them from the queue.
512 let mut sq = send_q.write().await;
513 sq.ack(ack.clone());
514 drop(sq);
515 recv_q.lock().await.ack(ack);
516 }
517 }
518 _ => {
519 rakrs_debug!(
520 "[{}] Unknown RakNet packet recieved (Or packet is sent out of scope).",
521 to_address_token(address)
522 );
523 }
524 };
525 };
526 }
527
528 #[cfg(feature = "async_std")]
529 select! {
530 _ = disconnect.wait().fuse() => {
531 rakrs_debug!(true, "[{}] [task: net_recv] Connection has been closed due to closer!", to_address_token(address));
532 break;
533 }
534 res = net.recv().fuse() => {
535 match res {
536 Ok(payload) => {
537 handle_payload!(payload);
538 }
539 _ => continue,
540 }
541 }
542 };
543
544 #[cfg(feature = "async_tokio")]
545 select! {
546 _ = disconnect.wait() => {
547 rakrs_debug!(true, "[{}] [task: net_recv] Connection has been closed due to closer!", to_address_token(address));
548 break;
549 }
550 res = net.recv() => {
551 match res {
552 Some(payload) => {
553 handle_payload!(payload);
554 }
555 _ => continue,
556 }
557 }
558 };
559 }
560 });
561 }
562
563 pub(crate) async fn process_packet(
564 buffer: &[u8],
565 address: &SocketAddr,
566 sender: &Sender<Vec<u8>>,
567 send_q: &Arc<RwLock<SendQueue>>,
568 state: &Arc<Mutex<ConnectionState>>,
569 ) -> Result<bool, ()> {
570 if buffer.len() < 1 {
571 rakrs_debug!("[{}] Got packet: {}", to_address_token(*address), buffer[0]);
572 }
573 if let Ok(online_packet) = OnlinePacket::read_from_slice(&buffer) {
574 match online_packet {
575 OnlinePacket::ConnectedPing(pk) => {
576 let response = ConnectedPong {
577 ping_time: pk.time,
578 pong_time: current_epoch() as i64,
579 };
580 let mut q = send_q.write().await;
581 if let Ok(_) = q
582 .send_packet(response.into(), Reliability::Reliable, true)
583 .await
584 {
585 return Ok(false);
586 } else {
587 rakrs_debug!(
588 true,
589 "[{}] Failed to send ConnectedPong packet!",
590 to_address_token(*address)
591 );
592 return Err(());
593 }
594 }
595 OnlinePacket::ConnectedPong(_pk) => {
596 // do nothing rn
597 // TODO: add ping calculation
598 return Ok(false);
599 }
600 OnlinePacket::ConnectionRequest(pk) => {
601 let internal_ids = vec![
602 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), 19132),
603 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), 19133),
604 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), 19134),
605 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), 19135),
606 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), 19136),
607 ];
608 let response = ConnectionAccept {
609 system_index: 0,
610 client_address: *address,
611 internal_ids,
612 request_time: pk.time,
613 timestamp: current_epoch() as i64,
614 };
615 let mut q = send_q.write().await;
616 *state.lock().await = ConnectionState::Connecting;
617 if let Ok(_) = q
618 .send_packet(response.clone().into(), Reliability::Reliable, true)
619 .await
620 {
621 return Ok(false);
622 } else {
623 rakrs_debug!(
624 true,
625 "[{}] Failed to send ConnectionAccept packet!",
626 to_address_token(*address)
627 );
628 return Err(());
629 }
630 }
631 OnlinePacket::Disconnect(_) => {
632 // Disconnect the client immediately.
633 // connection.disconnect("Client disconnected.", false);
634 return Ok(true);
635 }
636 OnlinePacket::LostConnection(_) => {
637 // Disconnect the client immediately.
638 // connection.disconnect("Client disconnected.", false);
639 rakrs_debug!(
640 "[{}] Client has lost connection, disconnecting client!",
641 to_address_token(*address)
642 );
643 return Ok(true);
644 }
645 OnlinePacket::NewConnection(_) => {
646 // if we are already connected, disconnect the client.
647 if *state.lock().await == ConnectionState::Connected {
648 rakrs_debug!(
649 true,
650 "[{}] Client is already connected, disconnecting client!",
651 to_address_token(*address)
652 );
653 return Ok(true);
654 }
655
656 *state.lock().await = ConnectionState::Connected;
657 return Ok(false);
658 }
659 _ => {
660 rakrs_debug!(
661 true,
662 "[{}] Forwarding packet to socket!\n{:?}",
663 to_address_token(*address),
664 buffer
665 );
666 if let Err(_) = sender.send(buffer.to_vec()).await {
667 rakrs_debug!(
668 "[{}] Failed to to forward packet to recv channel...",
669 to_address_token(*address)
670 );
671 return Err(());
672 }
673 return Ok(false);
674 }
675 }
676 } else if let Ok(_) = OfflinePacket::read_from_slice(&buffer) {
677 *state.lock().await = ConnectionState::Disconnecting;
678 rakrs_debug!(
679 true,
680 "[{}] Invalid protocol! Disconnecting client!",
681 to_address_token(*address)
682 );
683 return Err(());
684 }
685
686 rakrs_debug!(
687 true,
688 "[{}] Either Game-packet or unknown packet, sending buffer to client...",
689 to_address_token(*address)
690 );
691 if let Err(_) = sender.send(buffer.to_vec()).await {
692 rakrs_debug!(
693 "[{}] Failed to to forward packet to recv channel...",
694 to_address_token(*address)
695 );
696 return Err(());
697 }
698 Ok(false)
699 }
700
701 /// This method is used to recieve packets from the client connection.
702 /// Packets that are recieved here are packets sent by the peer expected
703 /// to be handled by the server.
704 ///
705 /// This is where you would handle your packets from the client.
706 ///
707 /// # Example
708 /// This is a snippet of code you would use after you've accepted a connection from the server with
709 /// [`Listener::accept()`].
710 ///
711 /// ```ignore
712 /// use rakrs::connection::Connection;
713 ///
714 /// async fn handle(mut conn: Connection) {
715 /// loop {
716 /// // get a packet sent from the client
717 /// if let Ok(pk) = conn.recv().await {
718 /// println!("Got a connection packet {:?} ", pk);
719 /// }
720 /// }
721 /// }
722 /// ```
723 pub async fn recv(&mut self) -> Result<Vec<u8>, RecvError> {
724 #[allow(unused_mut)]
725 let mut q = self.internal_net_recv.as_ref().lock().await;
726 match q.recv().await {
727 #[cfg(feature = "async_std")]
728 Ok(packet) => Ok(packet),
729 #[cfg(feature = "async_std")]
730 Err(e) => Err(e),
731 #[cfg(feature = "async_tokio")]
732 Some(packet) => Ok(packet),
733 #[cfg(feature = "async_tokio")]
734 None => Err(RecvError::Closed),
735 }
736 }
737
738 // /// Handle a RakNet Event. These are sent as they happen.
739 // ///
740 // /// EG:
741 // /// ```ignore
742 // /// let conn: Connection = Connection::new();
743 // ///
744 // /// while let Some((event, responder)) = conn.recv_ev {
745 // /// match event {
746 // /// ServerEvent::SetMtuSize(mtu) => {
747 // /// println!("client updated mtu!");
748 // /// responder.send(ServerEventResponse::Acknowledge);
749 // /// }
750 // /// }
751 // /// }
752 // /// ```
753 // pub async fn recv_ev(
754 // &mut self,
755 // ) -> Result<(ServerEvent, oneshot::Sender<ServerEventResponse>), ConnectionError> {
756 // match self.evt_receiver.recv().await {
757 // Some((server_event, event_responder)) => {
758 // return Ok((server_event, event_responder));
759 // }
760 // None => {
761 // if self.disconnect.is_closed() {
762 // return Err(ConnectionError::Closed);
763 // }
764 // return Err(ConnectionError::EventDispatchError);
765 // }
766 // }
767 // }
768
769 pub async fn is_closed(&self) -> bool {
770 !self.state.lock().await.is_available()
771 }
772
773 /// This method is used to send payloads to the connection. This method internally
774 /// will encode your payload into a RakNet packet, and send it to the client.
775 ///
776 /// # Example
777 /// This is a snippet of code you would use when you need to send a payload to the client.
778 /// ```ignore
779 /// use rakrs::connection::Connection;
780 ///
781 /// async fn send_payload(mut conn: Connection) {
782 /// conn.send(&[0x01, 0x02, 0x03], true).await.unwrap();
783 /// }
784 /// ```
785 pub async fn send(&self, buffer: &[u8], immediate: bool) -> Result<(), SendQueueError> {
786 let mut q = self.send_queue.write().await;
787 rakrs_debug!("Send call, sending to write");
788 if let Err(e) = q
789 .insert(buffer, Reliability::ReliableOrd, immediate, Some(0))
790 .await
791 {
792 return Err(e);
793 }
794 Ok(())
795 }
796
797 /// This method should be used when you are ready to disconnect the client.
798 /// this method will attempt to send a disconnect packet to the client, and
799 /// then close the connection.
800 pub async fn close(&self) {
801 rakrs_debug!(
802 true,
803 "[{}] Dropping connection!",
804 to_address_token(self.address)
805 );
806 if let Err(_) = self
807 .send(
808 &OnlinePacket::Disconnect(Disconnect {})
809 .write_to_bytes()
810 .unwrap()
811 .as_slice(),
812 true,
813 )
814 .await
815 {
816 rakrs_debug!(
817 true,
818 "[{}] Failed to send disconnect packet when closing!",
819 to_address_token(self.address)
820 );
821 }
822 let tasks = self.tasks.clone();
823
824 for task in tasks.lock().await.drain(..) {
825 #[cfg(feature = "async_std")]
826 task.cancel().await;
827 #[cfg(feature = "async_tokio")]
828 task.abort();
829 }
830 }
831}
832
833impl Drop for Connection {
834 fn drop(&mut self) {
835 futures_executor::block_on(async {
836 if self.is_closed().await {
837 return;
838 }
839 self.close().await;
840 });
841 }
842}