rust_raknet/
socket.rs

1use rand::Rng;
2use std::{
3    net::SocketAddr,
4    sync::{
5        atomic::{AtomicI64, AtomicU8},
6        Arc,
7    },
8};
9use tokio::{
10    net::UdpSocket,
11    sync::{mpsc::channel, Mutex, Notify, RwLock},
12    time::{sleep, timeout},
13};
14
15use crate::{
16    error::{RaknetError, Result},
17    raknet_log_error, raknet_log_info,
18};
19use std::sync::atomic::{AtomicBool, Ordering};
20use tokio::sync::mpsc::{Receiver, Sender};
21
22use crate::{arq::*, packet::*, raknet_log_debug, utils::*};
23
/// Raknet socket wrapper with local and remote.
///
/// One value represents a single Raknet connection. It is created either by
/// `connect`/`connect_with_version` (client side) or by `from` (listener side), and it
/// shares its state with the background tasks spawned by `start_receiver`,
/// `start_tick` and `start_sender`.
pub struct RaknetSocket {
    /// Address reported by the underlying UDP socket's `local_addr()`.
    local_addr: SocketAddr,
    /// Address of the remote peer this connection talks to.
    peer_addr: SocketAddr,
    /// Receiving end of the channel that carries user payloads; read by `recv`.
    user_data_receiver: Arc<Mutex<Receiver<Vec<u8>>>>,
    /// Receive queue that incoming frames are inserted into and flushed from.
    recvq: Arc<Mutex<RecvQ>>,
    /// Send queue holding outgoing packets until they are flushed (and acked).
    sendq: Arc<RwLock<SendQ>>,
    /// Semaphore created with zero permits and used as a close flag: it is `close()`d
    /// when the connection ends and polled through `is_closed()` / `acquire()`.
    close_notifier: Arc<tokio::sync::Semaphore>,
    /// Timestamp (milliseconds) of the last datagram seen by the receiver task; the
    /// ticker closes the connection once it is older than `RECEIVE_TIMEOUT`.
    last_heartbeat_time: Arc<AtomicI64>,
    /// Whether simulated packet loss is applied when sending.
    enable_loss: Arc<AtomicBool>,
    /// Threshold used by the simulated packet loss check in `sendto`.
    loss_rate: Arc<AtomicU8>,
    /// Notified once a `ConnectionRequestAccepted` has been handled; awaited by
    /// `connect_with_version` before it returns.
    incomming_notifier: Arc<Notify>,
    /// Channel to the sender task: `(datagram, target, enable_loss, loss_rate)`.
    sender: Sender<(Vec<u8>, SocketAddr, bool, u8)>,
    /// NOTE(review): presumably signalled when the socket is dropped (see
    /// `drop_watcher`, not shown here) - confirm.
    drop_notifier: Arc<Notify>,
    /// Raknet protocol version this socket was created with.
    raknet_version: u8,
}
40
41impl RaknetSocket {
42    /// Create a Raknet Socket from a UDP socket with an established Raknet connection
43    ///
44    /// This method is used for RaknetListener, users of the library should not care about it.
45    pub async fn from(
46        addr: &SocketAddr,
47        s: &Arc<UdpSocket>,
48        receiver: Receiver<Vec<u8>>,
49        mtu: u16,
50        collecter: Arc<Mutex<Sender<SocketAddr>>>,
51        raknet_version: u8,
52    ) -> Self {
53        let (user_data_sender, user_data_receiver) = channel::<Vec<u8>>(100);
54        let (sender_sender, sender_receiver) = channel::<(Vec<u8>, SocketAddr, bool, u8)>(10);
55
56        let ret = RaknetSocket {
57            peer_addr: *addr,
58            local_addr: s.local_addr().unwrap(),
59            user_data_receiver: Arc::new(Mutex::new(user_data_receiver)),
60            recvq: Arc::new(Mutex::new(RecvQ::new())),
61            sendq: Arc::new(RwLock::new(SendQ::new(mtu))),
62            close_notifier: Arc::new(tokio::sync::Semaphore::new(0)),
63            last_heartbeat_time: Arc::new(AtomicI64::new(cur_timestamp_millis())),
64            enable_loss: Arc::new(AtomicBool::new(false)),
65            loss_rate: Arc::new(AtomicU8::new(0)),
66            incomming_notifier: Arc::new(Notify::new()),
67            sender: sender_sender,
68            drop_notifier: Arc::new(Notify::new()),
69            raknet_version,
70        };
71        ret.start_receiver(s, receiver, user_data_sender);
72        ret.start_tick(s, Some(collecter));
73        ret.start_sender(s, sender_receiver);
74        ret.drop_watcher().await;
75        ret
76    }
77
78    async fn handle(
79        frame: &FrameSetPacket,
80        peer_addr: &SocketAddr,
81        local_addr: &SocketAddr,
82        sendq: &RwLock<SendQ>,
83        user_data_sender: &Sender<Vec<u8>>,
84        incomming_notify: &Notify,
85    ) -> Result<bool> {
86        match PacketID::from(frame.data[0])? {
87            PacketID::ConnectionRequest => {
88                let packet = read_packet_connection_request(frame.data.as_slice())?;
89
90                let packet_reply = ConnectionRequestAccepted {
91                    client_address: *peer_addr,
92                    system_index: 0,
93                    request_timestamp: packet.time,
94                    accepted_timestamp: cur_timestamp_millis(),
95                };
96
97                let buf = write_packet_connection_request_accepted(&packet_reply)?;
98                sendq
99                    .write()
100                    .await
101                    .insert(Reliability::ReliableOrdered, &buf)?;
102            }
103            PacketID::ConnectionRequestAccepted => {
104                let packet = read_packet_connection_request_accepted(frame.data.as_slice())?;
105
106                let packet_reply = NewIncomingConnection {
107                    server_address: *local_addr,
108                    request_timestamp: packet.request_timestamp,
109                    accepted_timestamp: cur_timestamp_millis(),
110                };
111
112                let mut sendq = sendq.write().await;
113
114                let buf = write_packet_new_incomming_connection(&packet_reply)?;
115                sendq.insert(Reliability::ReliableOrdered, &buf)?;
116
117                let ping = ConnectedPing {
118                    client_timestamp: cur_timestamp_millis(),
119                };
120
121                //i dont know why incomming packet after always follow a connected ping packet in minecraft bedrock 1.18.12.
122                let buf = write_packet_connected_ping(&ping)?;
123                sendq.insert(Reliability::Unreliable, &buf)?;
124                raknet_log_debug!("incomming notified");
125                incomming_notify.notify_one();
126            }
127            PacketID::NewIncomingConnection => {
128                let _packet = read_packet_new_incomming_connection(frame.data.as_slice())?;
129            }
130            PacketID::ConnectedPing => {
131                let packet = read_packet_connected_ping(frame.data.as_slice())?;
132
133                let packet_reply = ConnectedPong {
134                    client_timestamp: packet.client_timestamp,
135                    server_timestamp: cur_timestamp_millis(),
136                };
137
138                let buf = write_packet_connected_pong(&packet_reply)?;
139                sendq.write().await.insert(Reliability::Unreliable, &buf)?;
140            }
141            PacketID::ConnectedPong => {}
142            PacketID::Disconnect => {
143                return Ok(false);
144            }
145            _ => {
146                match user_data_sender.send(frame.data.clone()).await {
147                    Ok(_) => {}
148                    Err(_) => {
149                        return Ok(false);
150                    }
151                };
152            }
153        }
154        Ok(true)
155    }
156
157    async fn sendto(
158        s: &UdpSocket,
159        buf: &[u8],
160        target: &SocketAddr,
161        enable_loss: bool,
162        loss_rate: u8,
163    ) -> tokio::io::Result<usize> {
164        if enable_loss {
165            let mut rng = rand::thread_rng();
166            let i: u8 = rng.gen_range(0..11);
167            if i > loss_rate {
168                raknet_log_debug!("loss packet");
169                return Ok(0);
170            }
171        }
172        match s.send_to(buf, target).await {
173            Ok(p) => Ok(p),
174            Err(e) => {
175                raknet_log_error!("udp socket send_to error : {}", e);
176                Ok(0)
177            }
178        }
179    }
180
181    /// Connect to a Raknet server and return a Raknet socket
182    ///
183    /// # Example
184    /// ```ignore
185    /// let socket = RaknetSocket::connect("127.0.0.1:19132".parse().unwrap()).await.unwrap();
186    /// socket.send(&[0xfe], Reliability::ReliableOrdered).await.unwrap();
187    /// let buf = socket.recv().await.unwrap();
188    /// if buf[0] == 0xfe{
189    ///    //do something
190    /// }
191    /// ```
192
193    pub async fn connect(addr: &SocketAddr) -> Result<Self> {
194        Self::connect_with_version(addr, RAKNET_PROTOCOL_VERSION).await
195    }
196
197    pub async fn connect_with_version(addr: &SocketAddr, raknet_version: u8) -> Result<Self> {
198        let guid: u64 = rand::random();
199
200        let s = match UdpSocket::bind("0.0.0.0:0").await {
201            Ok(p) => p,
202            Err(_) => return Err(RaknetError::BindAdressError),
203        };
204
205        let packet = OpenConnectionRequest1 {
206            magic: true,
207            protocol_version: raknet_version,
208            mtu_size: RAKNET_CLIENT_MTU,
209        };
210
211        let buf = write_packet_connection_open_request_1(&packet).unwrap();
212
213        let mut remote_addr: SocketAddr;
214        let mut reply1_size: usize;
215
216        let mut reply1_buf = [0u8; 2048];
217
218        loop {
219            match s.send_to(&buf, addr).await {
220                Ok(p) => p,
221                Err(e) => {
222                    raknet_log_error!("udp socket sendto error {}", e);
223                    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
224                    continue;
225                }
226            };
227            let (size, src) = match match timeout(
228                std::time::Duration::from_secs(2),
229                s.recv_from(&mut reply1_buf),
230            )
231            .await
232            {
233                Ok(p) => p,
234                Err(_) => {
235                    raknet_log_debug!("wait reply1 timeout");
236                    continue;
237                }
238            } {
239                Ok(p) => p,
240                Err(e) => {
241                    raknet_log_error!("recvfrom error : {}", e);
242                    continue;
243                }
244            };
245
246            remote_addr = src;
247            reply1_size = size;
248
249            if reply1_buf[0] != PacketID::OpenConnectionReply1.to_u8() {
250                if reply1_buf[0] == PacketID::IncompatibleProtocolVersion.to_u8() {
251                    let _packet = match read_packet_incompatible_protocol_version(&buf[..size]) {
252                        Ok(p) => p,
253                        Err(_) => return Err(RaknetError::NotSupportVersion),
254                    };
255
256                    return Err(RaknetError::NotSupportVersion);
257                } else {
258                    raknet_log_debug!("incorrect reply1");
259                    continue;
260                }
261            }
262
263            break;
264        }
265
266        let reply1 = match read_packet_connection_open_reply_1(&reply1_buf[..reply1_size]) {
267            Ok(p) => p,
268            Err(_) => return Err(RaknetError::PacketParseError),
269        };
270
271        let packet = OpenConnectionRequest2 {
272            magic: true,
273            address: remote_addr,
274            mtu: reply1.mtu_size,
275            guid,
276        };
277
278        let buf = write_packet_connection_open_request_2(&packet).unwrap();
279
280        loop {
281            match s.send_to(&buf, addr).await {
282                Ok(_) => {}
283                Err(e) => {
284                    raknet_log_error!("udp socket sendto error {}", e);
285                    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
286                    continue;
287                }
288            };
289
290            let mut buf = [0u8; 2048];
291            let (size, _) =
292                match match timeout(std::time::Duration::from_secs(2), s.recv_from(&mut buf)).await
293                {
294                    Ok(p) => p,
295                    Err(_) => {
296                        raknet_log_debug!("wait reply2 timeout");
297                        continue;
298                    }
299                } {
300                    Ok(p) => p,
301                    Err(e) => {
302                        raknet_log_error!("recvfrom error : {}", e);
303                        continue;
304                    }
305                };
306
307            if buf[0] == PacketID::OpenConnectionReply1.to_u8() {
308                raknet_log_debug!("repeat receive reply1");
309                continue;
310            }
311
312            if buf[0] != PacketID::OpenConnectionReply2.to_u8() {
313                raknet_log_debug!("incorrect reply2");
314                continue;
315            }
316
317            let _reply2 = match read_packet_connection_open_reply_2(&buf[..size]) {
318                Ok(p) => p,
319                Err(_) => return Err(RaknetError::PacketParseError),
320            };
321
322            break;
323        }
324
325        let sendq = Arc::new(RwLock::new(SendQ::new(reply1.mtu_size)));
326
327        let packet = ConnectionRequest {
328            guid,
329            time: cur_timestamp_millis(),
330            use_encryption: 0x00,
331        };
332
333        let buf = write_packet_connection_request(&packet).unwrap();
334
335        let mut sendq1 = sendq.write().await;
336        sendq1.insert(Reliability::ReliableOrdered, &buf)?;
337        std::mem::drop(sendq1);
338
339        let (user_data_sender, user_data_receiver) = channel::<Vec<u8>>(100);
340
341        let (sender, receiver) = channel::<Vec<u8>>(100);
342
343        let s = Arc::new(s);
344
345        let recv_s = s.clone();
346        let connected = Arc::new(tokio::sync::Semaphore::new(0));
347        let connected_s = connected.clone();
348        let peer_addr = *addr;
349        tokio::spawn(async move {
350            let mut buf = [0u8; 2048];
351            loop {
352                if connected_s.is_closed() {
353                    break;
354                }
355                let (size, _) = match match timeout(
356                    std::time::Duration::from_secs(10),
357                    recv_s.recv_from(&mut buf),
358                )
359                .await
360                {
361                    Ok(p) => p,
362                    Err(_) => continue,
363                } {
364                    Ok(p) => p,
365                    Err(e) => {
366                        #[cfg(target_family = "windows")]
367                        if e.raw_os_error().unwrap() == 10040 {
368                            // https://docs.microsoft.com/zh-CN/troubleshoot/windows-server/networking/wsaemsgsize-error-10040-in-winsock-2
369                            raknet_log_debug!("recv_from error : {}", e.raw_os_error().unwrap());
370                            continue;
371                        }
372                        raknet_log_debug!("recv_from error : {}", e);
373                        connected_s.close();
374                        break;
375                    }
376                };
377
378                match sender.send(buf[..size].to_vec()).await {
379                    Ok(_) => {}
380                    Err(e) => {
381                        raknet_log_debug!("channel send error : {}", e);
382                        connected_s.close();
383                        break;
384                    }
385                };
386            }
387            raknet_log_debug!("{} , recv_from finished", peer_addr);
388        });
389
390        let (sender_sender, sender_receiver) = channel::<(Vec<u8>, SocketAddr, bool, u8)>(10);
391
392        let ret = RaknetSocket {
393            peer_addr: *addr,
394            local_addr: s.local_addr().unwrap(),
395            user_data_receiver: Arc::new(Mutex::new(user_data_receiver)),
396            recvq: Arc::new(Mutex::new(RecvQ::new())),
397            sendq,
398            close_notifier: connected,
399            last_heartbeat_time: Arc::new(AtomicI64::new(cur_timestamp_millis())),
400            enable_loss: Arc::new(AtomicBool::new(false)),
401            loss_rate: Arc::new(AtomicU8::new(0)),
402            incomming_notifier: Arc::new(Notify::new()),
403            sender: sender_sender,
404            drop_notifier: Arc::new(Notify::new()),
405            raknet_version,
406        };
407
408        ret.start_receiver(&s, receiver, user_data_sender);
409        ret.start_tick(&s, None);
410        ret.start_sender(&s, sender_receiver);
411        ret.drop_watcher().await;
412
413        raknet_log_debug!("wait incomming notify");
414        ret.incomming_notifier.notified().await;
415
416        Ok(ret)
417    }
418
419    fn start_receiver(
420        &self,
421        s: &Arc<UdpSocket>,
422        mut receiver: Receiver<Vec<u8>>,
423        user_data_sender: Sender<Vec<u8>>,
424    ) {
425        let connected = self.close_notifier.clone();
426        let peer_addr = self.peer_addr;
427        let local_addr = self.local_addr;
428        let sendq = self.sendq.clone();
429        let recvq = self.recvq.clone();
430        let last_heartbeat_time = self.last_heartbeat_time.clone();
431        let incomming_notify = self.incomming_notifier.clone();
432        let s = s.clone();
433        let enable_loss = self.enable_loss.clone();
434        let loss_rate = self.loss_rate.clone();
435        tokio::spawn(async move {
436            loop {
437                if connected.is_closed() {
438                    let mut recvq = recvq.lock().await;
439                    for f in recvq.flush(&peer_addr) {
440                        RaknetSocket::handle(
441                            &f,
442                            &peer_addr,
443                            &local_addr,
444                            &sendq,
445                            &user_data_sender,
446                            &incomming_notify,
447                        )
448                        .await
449                        .unwrap();
450                    }
451                    break;
452                }
453
454                let buf = match receiver.recv().await {
455                    Some(buf) => buf,
456                    None => {
457                        raknet_log_debug!("channel receiver finished");
458                        connected.close();
459                        break;
460                    }
461                };
462
463                last_heartbeat_time.store(cur_timestamp_millis(), Ordering::Relaxed);
464
465                if PacketID::from(buf[0]).unwrap() == PacketID::Disconnect {
466                    connected.close();
467                    break;
468                }
469
470                if buf[0] == PacketID::Ack.to_u8() {
471                    //handle ack
472                    let mut sendq = sendq.write().await;
473                    let ack = read_packet_ack(&buf).unwrap();
474                    for i in 0..ack.record_count {
475                        if ack.sequences[i as usize].0 == ack.sequences[i as usize].1 {
476                            sendq.ack(ack.sequences[i as usize].0, cur_timestamp_millis());
477                        } else {
478                            for i in ack.sequences[i as usize].0..ack.sequences[i as usize].1 + 1 {
479                                sendq.ack(i, cur_timestamp_millis());
480                            }
481                        }
482                    }
483                    continue;
484                }
485
486                if buf[0] == PacketID::Nack.to_u8() {
487                    //handle nack
488                    let nack = read_packet_nack(&buf).unwrap();
489
490                    let mut sendq = sendq.write().await;
491
492                    for i in 0..nack.record_count {
493                        if nack.sequences[i as usize].0 == nack.sequences[i as usize].1 {
494                            sendq.nack(nack.sequences[i as usize].0, cur_timestamp_millis());
495                        } else {
496                            for i in nack.sequences[i as usize].0..nack.sequences[i as usize].1 + 1
497                            {
498                                sendq.nack(i, cur_timestamp_millis());
499                            }
500                        }
501                    }
502                    continue;
503                }
504
505                // handle packet in here
506                if buf[0] >= PacketID::FrameSetPacketBegin.to_u8()
507                    && buf[0] <= PacketID::FrameSetPacketEnd.to_u8()
508                {
509                    let frames = FrameVec::new(buf.clone()).unwrap();
510
511                    let mut recvq = recvq.lock().await;
512                    let mut is_break = false;
513                    for frame in frames.frames {
514                        recvq.insert(frame).unwrap();
515
516                        for f in recvq.flush(&peer_addr) {
517                            if !RaknetSocket::handle(
518                                &f,
519                                &peer_addr,
520                                &local_addr,
521                                &sendq,
522                                &user_data_sender,
523                                &incomming_notify,
524                            )
525                            .await
526                            .unwrap()
527                            {
528                                raknet_log_info!("handle over");
529                                connected.close();
530                                is_break = true;
531                            };
532                        }
533
534                        if is_break {
535                            break;
536                        }
537                    }
538
539                    //flush ack
540                    let acks = recvq.get_ack();
541
542                    if !acks.is_empty() {
543                        let packet = Ack {
544                            record_count: acks.len() as u16,
545                            sequences: acks,
546                        };
547
548                        let buf = write_packet_ack(&packet).unwrap();
549                        RaknetSocket::sendto(
550                            &s,
551                            &buf,
552                            &peer_addr,
553                            enable_loss.load(Ordering::Relaxed),
554                            loss_rate.load(Ordering::Relaxed),
555                        )
556                        .await
557                        .unwrap();
558                    }
559                } else {
560                    raknet_log_debug!("unknown packetid : {}", buf[0]);
561                }
562            }
563
564            raknet_log_debug!("{} , receiver finished", peer_addr);
565        });
566    }
567
568    fn start_sender(
569        &self,
570        s: &Arc<UdpSocket>,
571        mut receiver: Receiver<(Vec<u8>, SocketAddr, bool, u8)>,
572    ) {
573        let connected = self.close_notifier.clone();
574        let s = s.clone();
575        tokio::spawn(async move {
576            loop {
577                tokio::select! {
578                    a = receiver.recv() => {
579                        match a {
580                            Some(p) => {
581                                match RaknetSocket::sendto(&s, &p.0, &p.1, p.2, p.3).await{
582                                    Ok(_) => {},
583                                    Err(e) => {
584                                        raknet_log_debug!("sendto error : {}" , e);
585                                        break;
586                                    },
587                                }
588                            },
589                            None => {
590                                raknet_log_debug!("sender worker's receiver channel closed");
591                                break;
592                            },
593                        };
594                    },
595                    _ = connected.acquire() => {
596                        raknet_log_debug!("sender close notified");
597                        break;
598                    }
599                }
600            }
601
602            raknet_log_debug!("sender worker closed");
603        });
604    }
605
606    fn start_tick(&self, s: &Arc<UdpSocket>, collecter: Option<Arc<Mutex<Sender<SocketAddr>>>>) {
607        let connected = self.close_notifier.clone();
608        let s = s.clone();
609        let peer_addr = self.peer_addr;
610        let sendq = self.sendq.clone();
611        let recvq = self.recvq.clone();
612        let mut last_monitor_tick = cur_timestamp_millis();
613        let enable_loss = self.enable_loss.clone();
614        let loss_rate = self.loss_rate.clone();
615        let last_heartbeat_time = self.last_heartbeat_time.clone();
616        tokio::spawn(async move {
617            loop {
618                sleep(std::time::Duration::from_millis(
619                    SendQ::DEFAULT_TIMEOUT_MILLS as u64,
620                ))
621                .await;
622
623                // flush nack
624                let mut recvq = recvq.lock().await;
625                let nacks = recvq.get_nack();
626                if !nacks.is_empty() {
627                    let nack = Nack {
628                        record_count: nacks.len() as u16,
629                        sequences: nacks,
630                    };
631
632                    let buf = write_packet_nack(&nack).unwrap();
633                    RaknetSocket::sendto(
634                        &s,
635                        &buf,
636                        &peer_addr,
637                        enable_loss.load(Ordering::Relaxed),
638                        loss_rate.load(Ordering::Relaxed),
639                    )
640                    .await
641                    .unwrap();
642                }
643
644                //flush sendq
645                let mut sendq = sendq.write().await;
646                for f in sendq.flush(cur_timestamp_millis(), &peer_addr) {
647                    let data = f.serialize().unwrap();
648                    RaknetSocket::sendto(
649                        &s,
650                        &data,
651                        &peer_addr,
652                        enable_loss.load(Ordering::Relaxed),
653                        loss_rate.load(Ordering::Relaxed),
654                    )
655                    .await
656                    .unwrap();
657                }
658
659                //monitor log
660                if cur_timestamp_millis() - last_monitor_tick > 10000 {
661                    raknet_log_debug!("peer addr : {} , sendq size : {} , sentq size : {} , rto : {} , recvq size : {} ,  recvq fragment size : {} , ordered queue size : {} - {:?}" , 
662                        peer_addr,
663                        sendq.get_reliable_queue_size(),
664                        sendq.get_sent_queue_size(),
665                        sendq.get_rto(),
666                        recvq.get_size(),
667                        recvq.get_fragment_queue_size(),
668                        recvq.get_ordered_packet(),
669                        recvq.get_ordered_keys()
670                    );
671                    last_monitor_tick = cur_timestamp_millis();
672                }
673
674                // if exceed 60s not received any packet will close connection.
675                if cur_timestamp_millis() - last_heartbeat_time.load(Ordering::Relaxed)
676                    > RECEIVE_TIMEOUT
677                {
678                    raknet_log_debug!("recv timeout");
679                    connected.close();
680                    break;
681                }
682
683                if connected.is_closed() {
684                    for _ in 0..10 {
685                        RaknetSocket::sendto(
686                            &s,
687                            &[PacketID::Disconnect.to_u8()],
688                            &peer_addr,
689                            enable_loss.load(Ordering::Relaxed),
690                            loss_rate.load(Ordering::Relaxed),
691                        )
692                        .await
693                        .unwrap();
694                    }
695                    break;
696                }
697            }
698
699            match collecter {
700                Some(p) => {
701                    match p.lock().await.send(peer_addr).await {
702                        Ok(_) => {}
703                        Err(e) => {
704                            raknet_log_error!("channel send error : {}", e);
705                        }
706                    };
707                }
708                None => {}
709            }
710            raknet_log_debug!("{} , ticker finished", peer_addr);
711        });
712    }
713
714    /// Close Raknet Socket.
715    /// Normally you don't need to call this method, the RaknetSocket will be closed automatically when it is released.
716    /// This method can be called repeatedly.
717    ///
718    /// # Example
719    /// ```ignore
720    /// let (latency, motd) = socket::RaknetSocket::ping("127.0.0.1:19132".parse().unwrap()).await.unwrap();
721    /// assert!((0..10).contains(&latency));
722    /// ```
723    pub async fn close(&self) -> Result<()> {
724        if !self.close_notifier.is_closed() {
725            self.sendq
726                .write()
727                .await
728                .insert(Reliability::Reliable, &[PacketID::Disconnect.to_u8()])?;
729            self.close_notifier.close();
730        }
731        Ok(())
732    }
733
734    /// Unconnected ping a Raknet Server and return latency and motd.
735    ///
736    /// # Example
737    /// ```ignore
738    /// let (latency, motd) = socket::RaknetSocket::ping("127.0.0.1:19132".parse().unwrap()).await.unwrap();
739    /// assert!((0..10).contains(&latency));
740    /// ```
741    pub async fn ping(addr: &SocketAddr) -> Result<(i64, String)> {
742        let s = match UdpSocket::bind("0.0.0.0:0").await {
743            Ok(p) => p,
744            Err(_) => return Err(RaknetError::BindAdressError),
745        };
746
747        loop {
748            let packet = PacketUnconnectedPing {
749                time: cur_timestamp_millis(),
750                magic: true,
751                guid: rand::random(),
752            };
753
754            let buf = write_packet_ping(&packet)?;
755
756            match s.send_to(buf.as_slice(), addr).await {
757                Ok(_) => {}
758                Err(e) => {
759                    raknet_log_error!("udp socket sendto error {}", e);
760                    return Err(RaknetError::SocketError);
761                }
762            };
763
764            let mut buf = [0u8; 1024];
765
766            match match tokio::time::timeout(
767                std::time::Duration::from_secs(5),
768                s.recv_from(&mut buf),
769            )
770            .await
771            {
772                Ok(p) => p,
773                Err(_) => {
774                    continue;
775                }
776            } {
777                Ok(p) => p,
778                Err(_) => return Err(RaknetError::SocketError),
779            };
780
781            if let Ok(p) = read_packet_pong(&buf) {
782                return Ok((p.time - packet.time, p.motd));
783            };
784
785            tokio::time::sleep(std::time::Duration::from_secs(2)).await;
786        }
787    }
788
789    /// Send a packet
790    ///
791    /// packet must be `0xfe` as the first byte, using other values of bytes may cause unexpected errors.
792    ///
793    /// Except Reliability::ReliableOrdered, all other reliability packets must be less than MTU - 60 (default 1340 bytes), otherwise RaknetError::PacketSizeExceedMTU will be returned
794    ///
795    /// # Example
796    /// ```ignore
797    /// let socket = RaknetSocket::connect("127.0.0.1:19132".parse().unwrap()).await.unwrap();
798    /// socket.send(&[0xfe], Reliability::ReliableOrdered).await.unwrap();
799    /// ```
800    pub async fn send(&self, buf: &[u8], r: Reliability) -> Result<()> {
801        if buf.is_empty() {
802            return Err(RaknetError::PacketHeaderError);
803        }
804
805        if buf[0] != 0xfe {
806            return Err(RaknetError::PacketHeaderError);
807        }
808
809        if self.close_notifier.is_closed() {
810            return Err(RaknetError::ConnectionClosed);
811        }
812
813        //flush sendq
814        let mut sendq = self.sendq.write().await;
815        sendq.insert(r, buf)?;
816        let sender = self.sender.clone();
817        for f in sendq.flush(cur_timestamp_millis(), &self.peer_addr) {
818            let data = f.serialize().unwrap();
819            sender
820                .send((
821                    data,
822                    self.peer_addr,
823                    self.enable_loss.load(Ordering::Relaxed),
824                    self.loss_rate.load(Ordering::Relaxed),
825                ))
826                .await
827                .unwrap();
828        }
829        Ok(())
830    }
831
832    /// Wait all packet acked
833    ///
834    /// # Example
835    /// ```ignore
836    /// let socket = RaknetSocket::connect("127.0.0.1:19132".parse().unwrap()).await.unwrap();
837    /// socket.send(&[0xfe], Reliability::ReliableOrdered).await.unwrap();
838    /// socket.flush().await.unwrap();
839    /// ```
840    pub async fn flush(&self) -> Result<()> {
841        loop {
842            {
843                if self.close_notifier.is_closed() {
844                    return Err(RaknetError::ConnectionClosed);
845                }
846                let sendq = self.sendq.read().await;
847                if sendq.is_empty() {
848                    return Ok(());
849                }
850            }
851            tokio::time::sleep(std::time::Duration::from_millis(5)).await;
852        }
853    }
854
855    /// Recv a packet
856    ///
857    /// # Example
858    /// ```ignore
859    /// let socket = RaknetSocket::connect("127.0.0.1:19132".parse().unwrap()).await.unwrap();
860    /// let buf = socket.recv().await.unwrap();
861    /// if buf[0] == 0xfe{
862    ///    //do something
863    /// }
864    /// ```
865    pub async fn recv(&self) -> Result<Vec<u8>> {
866        match self.user_data_receiver.lock().await.recv().await {
867            Some(p) => Ok(p),
868            None => {
869                if self.close_notifier.is_closed() {
870                    return Err(RaknetError::ConnectionClosed);
871                }
872                Err(RaknetError::SocketError)
873            }
874        }
875    }
876
877    /// Returns the socket address of the remote peer of this Raknet connection.
878    ///
879    /// # Example
880    /// ```ignore
881    /// let socket = RaknetSocket::connect("127.0.0.1:19132".parse().unwrap()).await.unwrap();
882    /// assert_eq!(socket.peer_addr().unwrap(), SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 19132)));
883    /// ```
884    pub fn peer_addr(&self) -> Result<SocketAddr> {
885        Ok(self.peer_addr)
886    }
887
888    /// Returns the socket address of the local half of this Raknet connection.
889    ///
890    /// # Example
891    /// ```ignore
892    /// let mut socket = RaknetSocket::connect("127.0.0.1:19132".parse().unwrap()).await.unwrap();
893    /// assert_eq!(socket.local_addr().unwrap().ip(), IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)));
894    /// ```
895    pub fn local_addr(&self) -> Result<SocketAddr> {
896        Ok(self.local_addr)
897    }
898
899    /// return the raknet version used by this connection.
900    pub fn raknet_version(&self) -> Result<u8> {
901        Ok(self.raknet_version)
902    }
903
904    /// Set the packet loss rate and use it for testing
905    ///
906    /// The `stage` parameter ranges from 0 to 10, indicating a packet loss rate of 0% to 100%.
907    /// # Example
908    /// ```ignore
909    /// let mut socket = RaknetSocket::connect("127.0.0.1:19132".parse().unwrap()).await.unwrap();
910    /// // set 20% loss packet rate.
911    /// socket.set_loss_rate(8);
912    /// ```
913    pub fn set_loss_rate(&mut self, stage: u8) {
914        self.enable_loss.store(true, Ordering::Relaxed);
915        self.loss_rate.store(stage, Ordering::Relaxed);
916    }
917
918    async fn drop_watcher(&self) {
919        let close_notifier = self.close_notifier.clone();
920        let drop_notifier = self.drop_notifier.clone();
921        tokio::spawn(async move {
922            raknet_log_debug!("socket drop watcher start");
923            drop_notifier.notify_one();
924
925            drop_notifier.notified().await;
926
927            if close_notifier.is_closed() {
928                raknet_log_debug!("socket close notifier closed");
929                return;
930            }
931
932            close_notifier.close();
933
934            raknet_log_debug!("socket drop watcher closed");
935        });
936
937        self.drop_notifier.notified().await;
938    }
939}
940
941impl Drop for RaknetSocket {
942    fn drop(&mut self) {
943        self.drop_notifier.notify_one();
944    }
945}