1use rand::Rng;
2use std::{
3 net::SocketAddr,
4 sync::{
5 atomic::{AtomicI64, AtomicU8},
6 Arc,
7 },
8};
9use tokio::{
10 net::UdpSocket,
11 sync::{mpsc::channel, Mutex, Notify, RwLock},
12 time::{sleep, timeout},
13};
14
15use crate::{
16 error::{RaknetError, Result},
17 raknet_log_error, raknet_log_info,
18};
19use std::sync::atomic::{AtomicBool, Ordering};
20use tokio::sync::mpsc::{Receiver, Sender};
21
22use crate::{arq::*, packet::*, raknet_log_debug, utils::*};
23
24pub struct RaknetSocket {
26 local_addr: SocketAddr,
27 peer_addr: SocketAddr,
28 user_data_receiver: Arc<Mutex<Receiver<Vec<u8>>>>,
29 recvq: Arc<Mutex<RecvQ>>,
30 sendq: Arc<RwLock<SendQ>>,
31 close_notifier: Arc<tokio::sync::Semaphore>,
32 last_heartbeat_time: Arc<AtomicI64>,
33 enable_loss: Arc<AtomicBool>,
34 loss_rate: Arc<AtomicU8>,
35 incomming_notifier: Arc<Notify>,
36 sender: Sender<(Vec<u8>, SocketAddr, bool, u8)>,
37 drop_notifier: Arc<Notify>,
38 raknet_version: u8,
39}
40
41impl RaknetSocket {
42 pub async fn from(
46 addr: &SocketAddr,
47 s: &Arc<UdpSocket>,
48 receiver: Receiver<Vec<u8>>,
49 mtu: u16,
50 collecter: Arc<Mutex<Sender<SocketAddr>>>,
51 raknet_version: u8,
52 ) -> Self {
53 let (user_data_sender, user_data_receiver) = channel::<Vec<u8>>(100);
54 let (sender_sender, sender_receiver) = channel::<(Vec<u8>, SocketAddr, bool, u8)>(10);
55
56 let ret = RaknetSocket {
57 peer_addr: *addr,
58 local_addr: s.local_addr().unwrap(),
59 user_data_receiver: Arc::new(Mutex::new(user_data_receiver)),
60 recvq: Arc::new(Mutex::new(RecvQ::new())),
61 sendq: Arc::new(RwLock::new(SendQ::new(mtu))),
62 close_notifier: Arc::new(tokio::sync::Semaphore::new(0)),
63 last_heartbeat_time: Arc::new(AtomicI64::new(cur_timestamp_millis())),
64 enable_loss: Arc::new(AtomicBool::new(false)),
65 loss_rate: Arc::new(AtomicU8::new(0)),
66 incomming_notifier: Arc::new(Notify::new()),
67 sender: sender_sender,
68 drop_notifier: Arc::new(Notify::new()),
69 raknet_version,
70 };
71 ret.start_receiver(s, receiver, user_data_sender);
72 ret.start_tick(s, Some(collecter));
73 ret.start_sender(s, sender_receiver);
74 ret.drop_watcher().await;
75 ret
76 }
77
78 async fn handle(
79 frame: &FrameSetPacket,
80 peer_addr: &SocketAddr,
81 local_addr: &SocketAddr,
82 sendq: &RwLock<SendQ>,
83 user_data_sender: &Sender<Vec<u8>>,
84 incomming_notify: &Notify,
85 ) -> Result<bool> {
86 match PacketID::from(frame.data[0])? {
87 PacketID::ConnectionRequest => {
88 let packet = read_packet_connection_request(frame.data.as_slice())?;
89
90 let packet_reply = ConnectionRequestAccepted {
91 client_address: *peer_addr,
92 system_index: 0,
93 request_timestamp: packet.time,
94 accepted_timestamp: cur_timestamp_millis(),
95 };
96
97 let buf = write_packet_connection_request_accepted(&packet_reply)?;
98 sendq
99 .write()
100 .await
101 .insert(Reliability::ReliableOrdered, &buf)?;
102 }
103 PacketID::ConnectionRequestAccepted => {
104 let packet = read_packet_connection_request_accepted(frame.data.as_slice())?;
105
106 let packet_reply = NewIncomingConnection {
107 server_address: *local_addr,
108 request_timestamp: packet.request_timestamp,
109 accepted_timestamp: cur_timestamp_millis(),
110 };
111
112 let mut sendq = sendq.write().await;
113
114 let buf = write_packet_new_incomming_connection(&packet_reply)?;
115 sendq.insert(Reliability::ReliableOrdered, &buf)?;
116
117 let ping = ConnectedPing {
118 client_timestamp: cur_timestamp_millis(),
119 };
120
121 let buf = write_packet_connected_ping(&ping)?;
123 sendq.insert(Reliability::Unreliable, &buf)?;
124 raknet_log_debug!("incomming notified");
125 incomming_notify.notify_one();
126 }
127 PacketID::NewIncomingConnection => {
128 let _packet = read_packet_new_incomming_connection(frame.data.as_slice())?;
129 }
130 PacketID::ConnectedPing => {
131 let packet = read_packet_connected_ping(frame.data.as_slice())?;
132
133 let packet_reply = ConnectedPong {
134 client_timestamp: packet.client_timestamp,
135 server_timestamp: cur_timestamp_millis(),
136 };
137
138 let buf = write_packet_connected_pong(&packet_reply)?;
139 sendq.write().await.insert(Reliability::Unreliable, &buf)?;
140 }
141 PacketID::ConnectedPong => {}
142 PacketID::Disconnect => {
143 return Ok(false);
144 }
145 _ => {
146 match user_data_sender.send(frame.data.clone()).await {
147 Ok(_) => {}
148 Err(_) => {
149 return Ok(false);
150 }
151 };
152 }
153 }
154 Ok(true)
155 }
156
157 async fn sendto(
158 s: &UdpSocket,
159 buf: &[u8],
160 target: &SocketAddr,
161 enable_loss: bool,
162 loss_rate: u8,
163 ) -> tokio::io::Result<usize> {
164 if enable_loss {
165 let mut rng = rand::thread_rng();
166 let i: u8 = rng.gen_range(0..11);
167 if i > loss_rate {
168 raknet_log_debug!("loss packet");
169 return Ok(0);
170 }
171 }
172 match s.send_to(buf, target).await {
173 Ok(p) => Ok(p),
174 Err(e) => {
175 raknet_log_error!("udp socket send_to error : {}", e);
176 Ok(0)
177 }
178 }
179 }
180
181 pub async fn connect(addr: &SocketAddr) -> Result<Self> {
194 Self::connect_with_version(addr, RAKNET_PROTOCOL_VERSION).await
195 }
196
197 pub async fn connect_with_version(addr: &SocketAddr, raknet_version: u8) -> Result<Self> {
198 let guid: u64 = rand::random();
199
200 let s = match UdpSocket::bind("0.0.0.0:0").await {
201 Ok(p) => p,
202 Err(_) => return Err(RaknetError::BindAdressError),
203 };
204
205 let packet = OpenConnectionRequest1 {
206 magic: true,
207 protocol_version: raknet_version,
208 mtu_size: RAKNET_CLIENT_MTU,
209 };
210
211 let buf = write_packet_connection_open_request_1(&packet).unwrap();
212
213 let mut remote_addr: SocketAddr;
214 let mut reply1_size: usize;
215
216 let mut reply1_buf = [0u8; 2048];
217
218 loop {
219 match s.send_to(&buf, addr).await {
220 Ok(p) => p,
221 Err(e) => {
222 raknet_log_error!("udp socket sendto error {}", e);
223 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
224 continue;
225 }
226 };
227 let (size, src) = match match timeout(
228 std::time::Duration::from_secs(2),
229 s.recv_from(&mut reply1_buf),
230 )
231 .await
232 {
233 Ok(p) => p,
234 Err(_) => {
235 raknet_log_debug!("wait reply1 timeout");
236 continue;
237 }
238 } {
239 Ok(p) => p,
240 Err(e) => {
241 raknet_log_error!("recvfrom error : {}", e);
242 continue;
243 }
244 };
245
246 remote_addr = src;
247 reply1_size = size;
248
249 if reply1_buf[0] != PacketID::OpenConnectionReply1.to_u8() {
250 if reply1_buf[0] == PacketID::IncompatibleProtocolVersion.to_u8() {
251 let _packet = match read_packet_incompatible_protocol_version(&buf[..size]) {
252 Ok(p) => p,
253 Err(_) => return Err(RaknetError::NotSupportVersion),
254 };
255
256 return Err(RaknetError::NotSupportVersion);
257 } else {
258 raknet_log_debug!("incorrect reply1");
259 continue;
260 }
261 }
262
263 break;
264 }
265
266 let reply1 = match read_packet_connection_open_reply_1(&reply1_buf[..reply1_size]) {
267 Ok(p) => p,
268 Err(_) => return Err(RaknetError::PacketParseError),
269 };
270
271 let packet = OpenConnectionRequest2 {
272 magic: true,
273 address: remote_addr,
274 mtu: reply1.mtu_size,
275 guid,
276 };
277
278 let buf = write_packet_connection_open_request_2(&packet).unwrap();
279
280 loop {
281 match s.send_to(&buf, addr).await {
282 Ok(_) => {}
283 Err(e) => {
284 raknet_log_error!("udp socket sendto error {}", e);
285 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
286 continue;
287 }
288 };
289
290 let mut buf = [0u8; 2048];
291 let (size, _) =
292 match match timeout(std::time::Duration::from_secs(2), s.recv_from(&mut buf)).await
293 {
294 Ok(p) => p,
295 Err(_) => {
296 raknet_log_debug!("wait reply2 timeout");
297 continue;
298 }
299 } {
300 Ok(p) => p,
301 Err(e) => {
302 raknet_log_error!("recvfrom error : {}", e);
303 continue;
304 }
305 };
306
307 if buf[0] == PacketID::OpenConnectionReply1.to_u8() {
308 raknet_log_debug!("repeat receive reply1");
309 continue;
310 }
311
312 if buf[0] != PacketID::OpenConnectionReply2.to_u8() {
313 raknet_log_debug!("incorrect reply2");
314 continue;
315 }
316
317 let _reply2 = match read_packet_connection_open_reply_2(&buf[..size]) {
318 Ok(p) => p,
319 Err(_) => return Err(RaknetError::PacketParseError),
320 };
321
322 break;
323 }
324
325 let sendq = Arc::new(RwLock::new(SendQ::new(reply1.mtu_size)));
326
327 let packet = ConnectionRequest {
328 guid,
329 time: cur_timestamp_millis(),
330 use_encryption: 0x00,
331 };
332
333 let buf = write_packet_connection_request(&packet).unwrap();
334
335 let mut sendq1 = sendq.write().await;
336 sendq1.insert(Reliability::ReliableOrdered, &buf)?;
337 std::mem::drop(sendq1);
338
339 let (user_data_sender, user_data_receiver) = channel::<Vec<u8>>(100);
340
341 let (sender, receiver) = channel::<Vec<u8>>(100);
342
343 let s = Arc::new(s);
344
345 let recv_s = s.clone();
346 let connected = Arc::new(tokio::sync::Semaphore::new(0));
347 let connected_s = connected.clone();
348 let peer_addr = *addr;
349 tokio::spawn(async move {
350 let mut buf = [0u8; 2048];
351 loop {
352 if connected_s.is_closed() {
353 break;
354 }
355 let (size, _) = match match timeout(
356 std::time::Duration::from_secs(10),
357 recv_s.recv_from(&mut buf),
358 )
359 .await
360 {
361 Ok(p) => p,
362 Err(_) => continue,
363 } {
364 Ok(p) => p,
365 Err(e) => {
366 #[cfg(target_family = "windows")]
367 if e.raw_os_error().unwrap() == 10040 {
368 raknet_log_debug!("recv_from error : {}", e.raw_os_error().unwrap());
370 continue;
371 }
372 raknet_log_debug!("recv_from error : {}", e);
373 connected_s.close();
374 break;
375 }
376 };
377
378 match sender.send(buf[..size].to_vec()).await {
379 Ok(_) => {}
380 Err(e) => {
381 raknet_log_debug!("channel send error : {}", e);
382 connected_s.close();
383 break;
384 }
385 };
386 }
387 raknet_log_debug!("{} , recv_from finished", peer_addr);
388 });
389
390 let (sender_sender, sender_receiver) = channel::<(Vec<u8>, SocketAddr, bool, u8)>(10);
391
392 let ret = RaknetSocket {
393 peer_addr: *addr,
394 local_addr: s.local_addr().unwrap(),
395 user_data_receiver: Arc::new(Mutex::new(user_data_receiver)),
396 recvq: Arc::new(Mutex::new(RecvQ::new())),
397 sendq,
398 close_notifier: connected,
399 last_heartbeat_time: Arc::new(AtomicI64::new(cur_timestamp_millis())),
400 enable_loss: Arc::new(AtomicBool::new(false)),
401 loss_rate: Arc::new(AtomicU8::new(0)),
402 incomming_notifier: Arc::new(Notify::new()),
403 sender: sender_sender,
404 drop_notifier: Arc::new(Notify::new()),
405 raknet_version,
406 };
407
408 ret.start_receiver(&s, receiver, user_data_sender);
409 ret.start_tick(&s, None);
410 ret.start_sender(&s, sender_receiver);
411 ret.drop_watcher().await;
412
413 raknet_log_debug!("wait incomming notify");
414 ret.incomming_notifier.notified().await;
415
416 Ok(ret)
417 }
418
419 fn start_receiver(
420 &self,
421 s: &Arc<UdpSocket>,
422 mut receiver: Receiver<Vec<u8>>,
423 user_data_sender: Sender<Vec<u8>>,
424 ) {
425 let connected = self.close_notifier.clone();
426 let peer_addr = self.peer_addr;
427 let local_addr = self.local_addr;
428 let sendq = self.sendq.clone();
429 let recvq = self.recvq.clone();
430 let last_heartbeat_time = self.last_heartbeat_time.clone();
431 let incomming_notify = self.incomming_notifier.clone();
432 let s = s.clone();
433 let enable_loss = self.enable_loss.clone();
434 let loss_rate = self.loss_rate.clone();
435 tokio::spawn(async move {
436 loop {
437 if connected.is_closed() {
438 let mut recvq = recvq.lock().await;
439 for f in recvq.flush(&peer_addr) {
440 RaknetSocket::handle(
441 &f,
442 &peer_addr,
443 &local_addr,
444 &sendq,
445 &user_data_sender,
446 &incomming_notify,
447 )
448 .await
449 .unwrap();
450 }
451 break;
452 }
453
454 let buf = match receiver.recv().await {
455 Some(buf) => buf,
456 None => {
457 raknet_log_debug!("channel receiver finished");
458 connected.close();
459 break;
460 }
461 };
462
463 last_heartbeat_time.store(cur_timestamp_millis(), Ordering::Relaxed);
464
465 if PacketID::from(buf[0]).unwrap() == PacketID::Disconnect {
466 connected.close();
467 break;
468 }
469
470 if buf[0] == PacketID::Ack.to_u8() {
471 let mut sendq = sendq.write().await;
473 let ack = read_packet_ack(&buf).unwrap();
474 for i in 0..ack.record_count {
475 if ack.sequences[i as usize].0 == ack.sequences[i as usize].1 {
476 sendq.ack(ack.sequences[i as usize].0, cur_timestamp_millis());
477 } else {
478 for i in ack.sequences[i as usize].0..ack.sequences[i as usize].1 + 1 {
479 sendq.ack(i, cur_timestamp_millis());
480 }
481 }
482 }
483 continue;
484 }
485
486 if buf[0] == PacketID::Nack.to_u8() {
487 let nack = read_packet_nack(&buf).unwrap();
489
490 let mut sendq = sendq.write().await;
491
492 for i in 0..nack.record_count {
493 if nack.sequences[i as usize].0 == nack.sequences[i as usize].1 {
494 sendq.nack(nack.sequences[i as usize].0, cur_timestamp_millis());
495 } else {
496 for i in nack.sequences[i as usize].0..nack.sequences[i as usize].1 + 1
497 {
498 sendq.nack(i, cur_timestamp_millis());
499 }
500 }
501 }
502 continue;
503 }
504
505 if buf[0] >= PacketID::FrameSetPacketBegin.to_u8()
507 && buf[0] <= PacketID::FrameSetPacketEnd.to_u8()
508 {
509 let frames = FrameVec::new(buf.clone()).unwrap();
510
511 let mut recvq = recvq.lock().await;
512 let mut is_break = false;
513 for frame in frames.frames {
514 recvq.insert(frame).unwrap();
515
516 for f in recvq.flush(&peer_addr) {
517 if !RaknetSocket::handle(
518 &f,
519 &peer_addr,
520 &local_addr,
521 &sendq,
522 &user_data_sender,
523 &incomming_notify,
524 )
525 .await
526 .unwrap()
527 {
528 raknet_log_info!("handle over");
529 connected.close();
530 is_break = true;
531 };
532 }
533
534 if is_break {
535 break;
536 }
537 }
538
539 let acks = recvq.get_ack();
541
542 if !acks.is_empty() {
543 let packet = Ack {
544 record_count: acks.len() as u16,
545 sequences: acks,
546 };
547
548 let buf = write_packet_ack(&packet).unwrap();
549 RaknetSocket::sendto(
550 &s,
551 &buf,
552 &peer_addr,
553 enable_loss.load(Ordering::Relaxed),
554 loss_rate.load(Ordering::Relaxed),
555 )
556 .await
557 .unwrap();
558 }
559 } else {
560 raknet_log_debug!("unknown packetid : {}", buf[0]);
561 }
562 }
563
564 raknet_log_debug!("{} , receiver finished", peer_addr);
565 });
566 }
567
568 fn start_sender(
569 &self,
570 s: &Arc<UdpSocket>,
571 mut receiver: Receiver<(Vec<u8>, SocketAddr, bool, u8)>,
572 ) {
573 let connected = self.close_notifier.clone();
574 let s = s.clone();
575 tokio::spawn(async move {
576 loop {
577 tokio::select! {
578 a = receiver.recv() => {
579 match a {
580 Some(p) => {
581 match RaknetSocket::sendto(&s, &p.0, &p.1, p.2, p.3).await{
582 Ok(_) => {},
583 Err(e) => {
584 raknet_log_debug!("sendto error : {}" , e);
585 break;
586 },
587 }
588 },
589 None => {
590 raknet_log_debug!("sender worker's receiver channel closed");
591 break;
592 },
593 };
594 },
595 _ = connected.acquire() => {
596 raknet_log_debug!("sender close notified");
597 break;
598 }
599 }
600 }
601
602 raknet_log_debug!("sender worker closed");
603 });
604 }
605
606 fn start_tick(&self, s: &Arc<UdpSocket>, collecter: Option<Arc<Mutex<Sender<SocketAddr>>>>) {
607 let connected = self.close_notifier.clone();
608 let s = s.clone();
609 let peer_addr = self.peer_addr;
610 let sendq = self.sendq.clone();
611 let recvq = self.recvq.clone();
612 let mut last_monitor_tick = cur_timestamp_millis();
613 let enable_loss = self.enable_loss.clone();
614 let loss_rate = self.loss_rate.clone();
615 let last_heartbeat_time = self.last_heartbeat_time.clone();
616 tokio::spawn(async move {
617 loop {
618 sleep(std::time::Duration::from_millis(
619 SendQ::DEFAULT_TIMEOUT_MILLS as u64,
620 ))
621 .await;
622
623 let mut recvq = recvq.lock().await;
625 let nacks = recvq.get_nack();
626 if !nacks.is_empty() {
627 let nack = Nack {
628 record_count: nacks.len() as u16,
629 sequences: nacks,
630 };
631
632 let buf = write_packet_nack(&nack).unwrap();
633 RaknetSocket::sendto(
634 &s,
635 &buf,
636 &peer_addr,
637 enable_loss.load(Ordering::Relaxed),
638 loss_rate.load(Ordering::Relaxed),
639 )
640 .await
641 .unwrap();
642 }
643
644 let mut sendq = sendq.write().await;
646 for f in sendq.flush(cur_timestamp_millis(), &peer_addr) {
647 let data = f.serialize().unwrap();
648 RaknetSocket::sendto(
649 &s,
650 &data,
651 &peer_addr,
652 enable_loss.load(Ordering::Relaxed),
653 loss_rate.load(Ordering::Relaxed),
654 )
655 .await
656 .unwrap();
657 }
658
659 if cur_timestamp_millis() - last_monitor_tick > 10000 {
661 raknet_log_debug!("peer addr : {} , sendq size : {} , sentq size : {} , rto : {} , recvq size : {} , recvq fragment size : {} , ordered queue size : {} - {:?}" ,
662 peer_addr,
663 sendq.get_reliable_queue_size(),
664 sendq.get_sent_queue_size(),
665 sendq.get_rto(),
666 recvq.get_size(),
667 recvq.get_fragment_queue_size(),
668 recvq.get_ordered_packet(),
669 recvq.get_ordered_keys()
670 );
671 last_monitor_tick = cur_timestamp_millis();
672 }
673
674 if cur_timestamp_millis() - last_heartbeat_time.load(Ordering::Relaxed)
676 > RECEIVE_TIMEOUT
677 {
678 raknet_log_debug!("recv timeout");
679 connected.close();
680 break;
681 }
682
683 if connected.is_closed() {
684 for _ in 0..10 {
685 RaknetSocket::sendto(
686 &s,
687 &[PacketID::Disconnect.to_u8()],
688 &peer_addr,
689 enable_loss.load(Ordering::Relaxed),
690 loss_rate.load(Ordering::Relaxed),
691 )
692 .await
693 .unwrap();
694 }
695 break;
696 }
697 }
698
699 match collecter {
700 Some(p) => {
701 match p.lock().await.send(peer_addr).await {
702 Ok(_) => {}
703 Err(e) => {
704 raknet_log_error!("channel send error : {}", e);
705 }
706 };
707 }
708 None => {}
709 }
710 raknet_log_debug!("{} , ticker finished", peer_addr);
711 });
712 }
713
714 pub async fn close(&self) -> Result<()> {
724 if !self.close_notifier.is_closed() {
725 self.sendq
726 .write()
727 .await
728 .insert(Reliability::Reliable, &[PacketID::Disconnect.to_u8()])?;
729 self.close_notifier.close();
730 }
731 Ok(())
732 }
733
734 pub async fn ping(addr: &SocketAddr) -> Result<(i64, String)> {
742 let s = match UdpSocket::bind("0.0.0.0:0").await {
743 Ok(p) => p,
744 Err(_) => return Err(RaknetError::BindAdressError),
745 };
746
747 loop {
748 let packet = PacketUnconnectedPing {
749 time: cur_timestamp_millis(),
750 magic: true,
751 guid: rand::random(),
752 };
753
754 let buf = write_packet_ping(&packet)?;
755
756 match s.send_to(buf.as_slice(), addr).await {
757 Ok(_) => {}
758 Err(e) => {
759 raknet_log_error!("udp socket sendto error {}", e);
760 return Err(RaknetError::SocketError);
761 }
762 };
763
764 let mut buf = [0u8; 1024];
765
766 match match tokio::time::timeout(
767 std::time::Duration::from_secs(5),
768 s.recv_from(&mut buf),
769 )
770 .await
771 {
772 Ok(p) => p,
773 Err(_) => {
774 continue;
775 }
776 } {
777 Ok(p) => p,
778 Err(_) => return Err(RaknetError::SocketError),
779 };
780
781 if let Ok(p) = read_packet_pong(&buf) {
782 return Ok((p.time - packet.time, p.motd));
783 };
784
785 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
786 }
787 }
788
789 pub async fn send(&self, buf: &[u8], r: Reliability) -> Result<()> {
801 if buf.is_empty() {
802 return Err(RaknetError::PacketHeaderError);
803 }
804
805 if buf[0] != 0xfe {
806 return Err(RaknetError::PacketHeaderError);
807 }
808
809 if self.close_notifier.is_closed() {
810 return Err(RaknetError::ConnectionClosed);
811 }
812
813 let mut sendq = self.sendq.write().await;
815 sendq.insert(r, buf)?;
816 let sender = self.sender.clone();
817 for f in sendq.flush(cur_timestamp_millis(), &self.peer_addr) {
818 let data = f.serialize().unwrap();
819 sender
820 .send((
821 data,
822 self.peer_addr,
823 self.enable_loss.load(Ordering::Relaxed),
824 self.loss_rate.load(Ordering::Relaxed),
825 ))
826 .await
827 .unwrap();
828 }
829 Ok(())
830 }
831
832 pub async fn flush(&self) -> Result<()> {
841 loop {
842 {
843 if self.close_notifier.is_closed() {
844 return Err(RaknetError::ConnectionClosed);
845 }
846 let sendq = self.sendq.read().await;
847 if sendq.is_empty() {
848 return Ok(());
849 }
850 }
851 tokio::time::sleep(std::time::Duration::from_millis(5)).await;
852 }
853 }
854
855 pub async fn recv(&self) -> Result<Vec<u8>> {
866 match self.user_data_receiver.lock().await.recv().await {
867 Some(p) => Ok(p),
868 None => {
869 if self.close_notifier.is_closed() {
870 return Err(RaknetError::ConnectionClosed);
871 }
872 Err(RaknetError::SocketError)
873 }
874 }
875 }
876
877 pub fn peer_addr(&self) -> Result<SocketAddr> {
885 Ok(self.peer_addr)
886 }
887
888 pub fn local_addr(&self) -> Result<SocketAddr> {
896 Ok(self.local_addr)
897 }
898
899 pub fn raknet_version(&self) -> Result<u8> {
901 Ok(self.raknet_version)
902 }
903
904 pub fn set_loss_rate(&mut self, stage: u8) {
914 self.enable_loss.store(true, Ordering::Relaxed);
915 self.loss_rate.store(stage, Ordering::Relaxed);
916 }
917
918 async fn drop_watcher(&self) {
919 let close_notifier = self.close_notifier.clone();
920 let drop_notifier = self.drop_notifier.clone();
921 tokio::spawn(async move {
922 raknet_log_debug!("socket drop watcher start");
923 drop_notifier.notify_one();
924
925 drop_notifier.notified().await;
926
927 if close_notifier.is_closed() {
928 raknet_log_debug!("socket close notifier closed");
929 return;
930 }
931
932 close_notifier.close();
933
934 raknet_log_debug!("socket drop watcher closed");
935 });
936
937 self.drop_notifier.notified().await;
938 }
939}
940
941impl Drop for RaknetSocket {
942 fn drop(&mut self) {
943 self.drop_notifier.notify_one();
944 }
945}