1use cyfs_base::*;
2use crate::{
3 stack::{
4 WeakStack,
5 Stack
6 },
7 datagram::{
8 self,
9 DatagramOptions,
10 DatagramTunnelGuard
11 },
12 types::*,
13};
14use async_std::{
15 sync::Arc,
16 task,
17 future,
18};
19use std::time::Duration;
20
/// Shared state behind a [`PingStub`] handle.
struct PingStubImpl {
    /// Weak handle to the owning stack; converted to a `Stack` only when
    /// `PingStub::listen` needs the datagram manager.
    stack: WeakStack,
}
24
/// Responder side of the debug ping: echoes datagrams received on the
/// reserved `Debug` vport back to their sender.
///
/// Cloning only copies the inner `Arc`, so all clones share the same state.
#[derive(Clone)]
pub struct PingStub(Arc<PingStubImpl>);
27
28impl PingStub {
29 pub fn new(weak_stack: WeakStack) -> Self {
30 Self(Arc::new(PingStubImpl {
31 stack: weak_stack,
32 }))
33 }
34
35 pub fn listen(&self) {
36 let stack = Stack::from(&self.0.stack);
37 let tunnel = stack.datagram_manager().bind_reserved(datagram::ReservedVPort::Debug).unwrap();
38 task::spawn(async move {
39 loop {
40 match tunnel.recv_v().await {
41 Ok(datagrams) => {
42 for datagram in datagrams {
43 let mut options = datagram.options.clone();
44 let len = datagram.data.len();
45 if let Err(err) = tunnel.send_to(
46 datagram.data.as_ref(),
47 &mut options,
48 &datagram.source.remote,
49 datagram.source.vport) {
50 error!("ping from remote={:?} vport={:?} len={} resp err={:?}",
51 datagram.source.remote, datagram.source.vport, len, err);
52 } else {
53 debug!("ping from remote={:?} vport={:?} len={}",
54 datagram.source.remote, datagram.source.vport, len);
55 }
56 }
57 },
58 Err(err) => {
59 error!("ping recv err={:?}", err);
60 }
61 }
62 }
63 });
64 }
65
66 pub fn ping(&self) -> BuckyResult<u64> {
67 let t = bucky_time_now();
68
69 Ok(bucky_time_now() - t)
70 }
71}
72
/// Shared state behind a [`Pinger`] handle.
struct PingerImpl {
    /// Datagram tunnel used both to send ping requests and to receive the
    /// echoed replies.
    datagram_tunnel: DatagramTunnelGuard,
}
76
/// Client side of the debug ping: sends a datagram to a remote device's
/// reserved `Debug` vport and waits for the reply carrying the same sequence.
///
/// Cloning only copies the inner `Arc`, so all clones share one tunnel.
#[derive(Clone)]
pub struct Pinger(Arc<PingerImpl>);
79
80impl Pinger {
81 pub fn open(weak_stack: WeakStack) -> BuckyResult<Self> {
82 let stack = Stack::from(&weak_stack);
83 let datagram_tunnel = stack.datagram_manager().bind(0)
84 .map_err(|err| format!("bind datagram tunnel failed for {}", err))?;
85
86 Ok(Self(Arc::new(PingerImpl {
87 datagram_tunnel,
88 })))
89 }
90
91 pub async fn ping(&self, remote: Device, timeout: Duration, buf: &[u8]) -> BuckyResult<Option<u64>> { let mut options = DatagramOptions::default();
93
94 let ts = cyfs_base::bucky_time_now();
95 options.sequence = Some(TempSeq::from(ts as u32));
96
97 if let Err(err) = self.0.datagram_tunnel.send_to(
98 buf,
99 &mut options,
100 &remote.desc().device_id(),
101 datagram::ReservedVPort::Debug.into()) {
102 return Err(BuckyError::new(BuckyErrorCode::CodeError, format!("ping remote={:?} send err={:?}", remote, err)))
103 }
104
105 match future::timeout(timeout, self.0.datagram_tunnel.recv_v()).await {
106 Err(err) => {
107 return Err(BuckyError::new(BuckyErrorCode::CodeError, format!("ping remote={:?} wait err={:?}", remote, err)))
108 },
109 Ok(res) => {
110 let cost = cyfs_base::bucky_time_now() - ts;
111 match res {
112 Err(err) => {
113 return Err(BuckyError::new(BuckyErrorCode::CodeError, format!("ping remote={:?} err={:?}", remote, err)))
114 },
115 Ok(datagrams) => {
116 for datagram in datagrams {
117 if let Some(opt) = datagram.options.sequence {
118 if opt == options.sequence.unwrap() {
119
120 return Ok(Some(cost))
121 }
122 }
123 }
124
125 return Ok(None)
126 }
127 }
128 }
129 }
130 }
131}