cyfs_bdt/sn/client/ping/
client.rs

1
2// use log::*;
3use std::{
4    sync::{Arc, RwLock,}, 
5    time::Duration, 
6};
7use async_std::{
8    task
9};
10use futures::future::AbortRegistration;
11use cyfs_base::*;
12use crate::{
13    types::*, 
14    protocol::{v0::*}, 
15    interface::{*, udp::{Interface}}, 
16    stack::{WeakStack, Stack},
17};
18use super::{
19    udp::{self, *}
20};
21
22#[derive(Clone)]
23pub struct PingConfig {
24    pub interval: Duration, 
25    pub udp: udp::Config
26}
27
/// Result reported by `PingClient::wait_online` for an SN server.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum SnStatus {
    /// The IPv4 ping state is active.
    Online,
    /// The IPv4 ping state has timed out.
    Offline,
}
33
34
35impl std::fmt::Display for SnStatus {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        let v = match self {
38            Self::Online => "online",
39            Self::Offline => "offline",
40        };
41
42        write!(f, "{}", v)
43    }
44}
45
46
47impl std::str::FromStr for SnStatus {
48    type Err = BuckyError;
49
50    fn from_str(s: &str) -> BuckyResult<Self> {
51        match s {
52            "online" => Ok(Self::Online),
53            "offline" => Ok(Self::Offline),
54            _ => {
55                let msg = format!("unknown SnStatus value: {}", s);
56                log::error!("{}", msg);
57
58                Err(BuckyError::new(BuckyErrorCode::InvalidData, msg))
59            }
60        }
61    }
62}
63
64#[derive(Clone, Debug)]
65pub struct PingSessionResp {
66    pub from: Endpoint, 
67    pub err: BuckyErrorCode, 
68    pub endpoints: Vec<Endpoint>
69}
70
71
72#[async_trait::async_trait]
73pub trait PingSession: Send + Sync + std::fmt::Display {
74    fn sn(&self) -> &DeviceId;
75    fn local(&self) -> Endpoint;
76    fn reset(&self,  local_device: Option<Device>, sn_endpoint: Option<Endpoint>) -> Box<dyn PingSession>;
77    fn clone_as_ping_session(&self) -> Box<dyn PingSession>;
78    async fn wait(&self) -> BuckyResult<PingSessionResp>;
79    fn stop(&self);
80    fn on_time_escape(&self, _now: Timestamp) {
81
82    }
83    fn on_udp_ping_resp(&self, _resp: &SnPingResp, _from: &Endpoint) -> BuckyResult<()> {
84        Ok(())
85    }
86}
87
88
89enum ActiveState {
90    FirstTry(Box<dyn PingSession>), 
91    SecondTry(Box<dyn PingSession>), 
92    Wait(Timestamp, Box<dyn PingSession>)
93}
94
95impl ActiveState {
96    fn cur_session(&self) -> Box<dyn PingSession> {
97        match self {
98            Self::FirstTry(session) => session.clone_as_ping_session(), 
99            Self::SecondTry(session) => session.clone_as_ping_session(),
100            Self::Wait(_, session) => session.clone_as_ping_session()
101        } 
102    }
103    fn trying_session(&self) -> Option<Box<dyn PingSession>> {
104        match self {
105            Self::FirstTry(session) => Some(session.clone_as_ping_session()), 
106            Self::SecondTry(session) => Some(session.clone_as_ping_session()),
107            _ => None 
108        } 
109    }
110}
111
112struct ClientState {
113    ipv4: Ipv4ClientState, 
114    ipv6: Ipv6ClientState
115}
116
117enum Ipv4ClientState {
118    Init(StateWaiter), 
119    Connecting {
120        waiter: StateWaiter, 
121        sessions: Vec<Box<dyn PingSession>>, 
122    }, 
123    Active {
124        waiter: StateWaiter, 
125        state: ActiveState
126    }, 
127    Timeout, 
128    Stopped
129}
130
131enum Ipv6ClientState {
132    None, 
133    Try(Box<dyn PingSession>),  
134    Wait(Timestamp, Box<dyn PingSession>)
135}
136
137struct ClientImpl {
138    stack: WeakStack, 
139    config: PingConfig, 
140    sn_index: usize,  
141    sn_id: DeviceId, 
142    sn: Device, 
143    gen_seq: Arc<TempSeqGenerator>, 
144    net_listener: NetListener, 
145    local_device: RwLock<Device>,  
146    state: RwLock<ClientState>
147}
148
149#[derive(Clone)]
150pub struct PingClient(Arc<ClientImpl>);
151
152impl std::fmt::Display for PingClient {
153    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154        let stack = Stack::from(&self.0.stack);
155        write!(f, "PingClients{{local:{}, sn:{}}}", stack.local_device_id(), self.sn())
156    }
157}
158
159impl PingClient {
160    pub(crate) fn new(
161        stack: WeakStack, 
162        config: PingConfig, 
163        gen_seq: Arc<TempSeqGenerator>, 
164        net_listener: NetListener, 
165        sn_index: usize, 
166        sn: Device, 
167        local_device: Device, 
168    ) -> Self {
169        let strong_stack = Stack::from(&stack);
170        let sn_id = sn.desc().device_id();
171        strong_stack.keystore().reset_peer(&sn_id);
172    
173        Self(Arc::new(ClientImpl {
174            stack, 
175            config, 
176            gen_seq, 
177            net_listener, 
178            sn, 
179            sn_id, 
180            sn_index, 
181            local_device: RwLock::new(local_device), 
182            state: RwLock::new(ClientState {
183                ipv4: Ipv4ClientState::Init(StateWaiter::new()), 
184                ipv6: Ipv6ClientState::None
185            })
186        }))
187    }
188
189    pub(crate) fn reset(
190        &self, 
191        net_listener: NetListener, 
192        local_device: Device, 
193    ) -> Self {
194        Self(Arc::new(ClientImpl {
195            stack: self.0.stack.clone(), 
196            config: self.0.config.clone(), 
197            sn_id: self.0.sn_id.clone(),
198            sn_index: self.0.sn_index, 
199            sn: self.0.sn.clone(), 
200            gen_seq: self.0.gen_seq.clone(), 
201            net_listener, 
202            local_device: RwLock::new(local_device), 
203            state: RwLock::new(ClientState {
204                ipv4: Ipv4ClientState::Init(StateWaiter::new()), 
205                ipv6: Ipv6ClientState::None
206            })
207        }))
208    }
209
210    pub fn ptr_eq(&self, other: &Self) -> bool {
211        Arc::ptr_eq(&self.0, &other.0)
212    }
213
214    pub fn local_device(&self) -> Device {
215        self.0.local_device.read().unwrap().clone()
216    }
217
218    fn net_listener(&self) -> &NetListener {
219        &self.0.net_listener
220    }
221
222    pub fn stop(&self) {
223        let (waiter, sessions) = {
224            let mut state = self.0.state.write().unwrap();
225            let (waiter, mut sessions) = match &mut state.ipv4 {
226                Ipv4ClientState::Init(waiter) => {
227                    let waiter = waiter.transfer();
228                    state.ipv4 = Ipv4ClientState::Stopped;
229                    (Some(waiter), vec![])
230                }, 
231                Ipv4ClientState::Connecting {
232                    waiter, 
233                    sessions
234                } => {
235                    let waiter = waiter.transfer();
236                    let sessions = sessions.iter().map(|s| s.clone_as_ping_session()).collect();
237                    state.ipv4 = Ipv4ClientState::Stopped;
238                    (Some(waiter), sessions)
239                },
240                Ipv4ClientState::Active {
241                    waiter, 
242                    state: active
243                } => {
244                    let waiter = waiter.transfer();
245                    let sessions = if let Some(session) = active.trying_session() {
246                        vec![session]
247                    } else {
248                        vec![]
249                    };
250                    state.ipv4 = Ipv4ClientState::Stopped;
251                    (Some(waiter), sessions)
252                },
253                _ => (None, vec![])
254            };
255
256            match &mut state.ipv6 {
257                Ipv6ClientState::Try(session) => {
258                    sessions.push(session.clone_as_ping_session());
259                    state.ipv6 = Ipv6ClientState::None
260                },
261                _ => {}
262            }
263
264            (waiter, sessions)
265        };
266
267        if let Some(waiter) = waiter {
268            waiter.wake()
269        };
270
271        for session in sessions {
272            session.stop();
273        }
274        
275    }
276
277
278    pub fn sn(&self) -> &DeviceId {
279        &self.0.sn_id
280    }
281
282    pub fn index(&self) -> usize {
283        self.0.sn_index
284    }
285
286
287    async fn update_local(&self, local: Endpoint, outer: Endpoint) {
288        let update = self.net_listener().update_outer(&local, &outer);
289        if update > UpdateOuterResult::None {
290            info!("{} update local {} => {}", self, local, outer);
291            let mut local_dev = self.local_device();
292            let device_sn_list = local_dev.mut_connect_info().mut_sn_list();
293            device_sn_list.clear();
294            device_sn_list.push(self.sn().clone());
295
296            let device_endpoints = local_dev.mut_connect_info().mut_endpoints();
297            device_endpoints.clear();
298            let bound_endpoints = self.net_listener().endpoints();
299            for ep in bound_endpoints {
300                device_endpoints.push(ep);
301            }
302
303            local_dev.body_mut().as_mut().unwrap().increase_update_time(bucky_time_now());
304
305            let stack = Stack::from(&self.0.stack);
306            let _ = sign_and_set_named_object_body(
307                stack.keystore().signer(),
308                &mut local_dev,
309                &SignatureSource::RefIndex(0),
310            ).await;
311
312           
313
314            let updated = {
315                let mut store = self.0.local_device.write().unwrap();
316                if store.body().as_ref().unwrap().update_time() < local_dev.body().as_ref().unwrap().update_time() {
317                    *store = local_dev;
318                    true
319                } else {
320                    false
321                }
322            };
323
324            if updated {
325                if local.addr().is_ipv6() {
326                    if let Ok(status) = self.wait_online().await {
327                        if SnStatus::Online == status {
328                            self.ping_ipv4_once();
329                        }
330                    }
331                } else {
332                    self.ping_ipv4_once();
333                }
334            }
335        }
336    }
337
338    fn ping_ipv4_once(&self) {
339        info!("{} ping once", self);
340        let mut state = self.0.state.write().unwrap();
341        match &mut state.ipv4 {
342            Ipv4ClientState::Active { 
343                state: active, 
344                .. 
345            } => {
346                match active {
347                    ActiveState::Wait(_, session) => {
348                        let session = session.reset(Some(self.local_device()), None);
349                        *active = ActiveState::FirstTry(session.clone_as_ping_session());
350                        {
351                        
352                            let client = self.clone();
353                            let session = session.clone_as_ping_session();
354                            task::spawn(async move {
355                                client.sync_session_resp(session.as_ref(), session.wait().await);
356                            });
357                        }
358                    }, 
359                    _ => {}
360                }
361            },
362            _ => {}
363        }
364    }
365
366    fn sync_session_resp(&self, session: &dyn PingSession, result: BuckyResult<PingSessionResp>) {
367        if session.local().addr().is_ipv4() {
368            self.sync_ipv4_session_resp(session, result);
369        } else if session.local().addr().is_ipv6() {
370            self.sync_ipv6_session_resp(session, result);
371        } else {
372            unreachable!()
373        }
374    }
375
376
377    fn sync_ipv6_session_resp(&self, session: &dyn PingSession, result: BuckyResult<PingSessionResp>) {
378        info!("{} wait session {} finished {:?}", self, session, result);
379
380        enum NextStep {
381            None, 
382            Update(Endpoint, Endpoint), 
383        }
384
385        let next = {
386            let mut state = self.0.state.write().unwrap();
387            match &state.ipv6 {
388                Ipv6ClientState::Try(session) => {
389                    let session = session.clone_as_ping_session();
390                    state.ipv6 = Ipv6ClientState::Wait(bucky_time_now() + self.0.config.interval.as_micros() as u64, session.reset(None, None));
391                    match result {
392                        Ok(resp) => if resp.endpoints.len() > 0 {
393                            NextStep::Update(session.local().clone(), resp.endpoints[0])
394                        } else {
395                            NextStep::None
396                        },
397                        Err(_) => NextStep::None
398                    }
399                },
400                _ => NextStep::None,
401            }
402        };
403
404        if let NextStep::Update(local, outer) = next {
405            let client = self.clone();
406            task::spawn(async move {
407                client.update_local(local, outer).await;
408            });
409        }
410    }
411
412
413    fn sync_ipv4_session_resp(&self, session: &dyn PingSession, result: BuckyResult<PingSessionResp>) {
414        info!("{} wait session {} finished {:?}", self, session, result);
415        struct NextStep {
416            waiter: Option<StateWaiter>, 
417            update: Option<(Endpoint, Endpoint)>, 
418            to_start: Option<Box<dyn PingSession>>, 
419            ping_once: bool, 
420            update_cache: Option<Option<Endpoint>>
421        }
422
423        impl NextStep {
424            fn none() -> Self {
425                Self {
426                    waiter: None, 
427                    update: None, 
428                    to_start: None, 
429                    ping_once: false, 
430                    update_cache: None
431                }
432            }
433        }
434
435        let next = {
436            let mut state = self.0.state.write().unwrap();
437            match &mut state.ipv4 {
438                Ipv4ClientState::Connecting {
439                    waiter, 
440                    sessions 
441                } => {
442                    if let Some(index) = sessions.iter().enumerate().find_map(|(index, exists)| if exists.local() == session.local() { Some(index) } else { None }) {
443                        match result {
444                            Ok(resp) => {
445                                let mut next = NextStep::none();
446                                next.waiter = Some(waiter.transfer());
447
448                                if resp.endpoints.len() > 0 {
449                                    next.update = Some((session.local(), resp.endpoints[0]));
450                                }
451
452                                info!("{} online", self);
453
454                                next.update_cache = Some(Some(resp.from));
455                                state.ipv4 = Ipv4ClientState::Active {
456                                    waiter: StateWaiter::new(), 
457                                    state: ActiveState::Wait(bucky_time_now() + self.0.config.interval.as_micros() as u64, session.reset(None, Some(resp.from)))
458                                };
459                                
460                                next
461                            }, 
462                            Err(_err) => {
463                                sessions.remove(index);
464                                let mut next = NextStep::none();
465                                if sessions.len() == 0 {
466                                    error!("{} timeout", self);
467                                    next.waiter = Some(waiter.transfer());
468                                    state.ipv4 = Ipv4ClientState::Timeout;
469                                }
470
471                                next
472                            }
473                        }
474                    } else {
475                        NextStep::none()
476                    }
477                }, 
478                Ipv4ClientState::Active { 
479                    waiter, 
480                    state: active 
481                } => {
482                    let mut next = NextStep::none();
483                    if !active.cur_session().local().is_same_ip_addr(&session.local()) {
484                        if let Ok(resp) = result {
485                            if resp.endpoints.len() > 0 {
486                                next.update = Some((session.local(), resp.endpoints[0]));
487                            }
488                        }
489                    } else if active.trying_session().and_then(|exists| if exists.local() == session.local() { Some(()) } else { None }).is_some() {
490                        match result {
491                            Ok(resp) => {
492                                *active = ActiveState::Wait(bucky_time_now() + self.0.config.interval.as_micros() as u64, session.reset(None, None));
493                                
494                                if resp.endpoints.len() > 0 {
495                                    next.update = Some((session.local(), resp.endpoints[0]));
496                                } else if resp.err == BuckyErrorCode::NotFound {
497                                    next.ping_once = true;
498                                }
499                            },
500                            Err(_err) => {
501                                match active {
502                                    ActiveState::FirstTry(session) => {
503                                        let stack = Stack::from(&self.0.stack);
504                                        stack.keystore().reset_peer(&self.sn());
505                                        let session = session.reset(None, None);
506                                        info!("{} start second try", self);
507                                        *active = ActiveState::SecondTry(session.clone_as_ping_session());
508                                        next.to_start = Some(session);
509                                    }, 
510                                    ActiveState::SecondTry(_) => {
511                                        next.waiter = Some(waiter.transfer());
512                                        error!("{} timeout", self);
513                                        state.ipv4 = Ipv4ClientState::Timeout;
514                                        next.update_cache = Some(None);
515                                    },
516                                    _ => {}
517                                }
518                            }
519                        }
520                    }
521                    next
522                }, 
523                _ => NextStep::none()
524            }
525        };
526
527        if let Some(update) = next.update_cache {
528            let stack = Stack::from(&self.0.stack);
529            if let Some(remote) = update {
530                stack.sn_client().cache().add_active(session.sn(), EndpointPair::from((session.local().clone(), remote)));
531            } else {
532                stack.sn_client().cache().remove_active(session.sn());
533            }
534        }
535
536        if let Some(session) = next.to_start {
537            let client = self.clone();
538            task::spawn(async move {
539                client.sync_session_resp(session.as_ref(), session.wait().await);
540            });
541        }
542
543        if let Some(waiter) = next.waiter {
544            waiter.wake();
545        }
546
547        if let Some((local, outer)) = next.update {
548            let client = self.clone();
549            task::spawn(async move {
550                client.update_local(local, outer).await;
551            });
552        } else if next.ping_once {
553            self.ping_ipv4_once();
554        }
555
556    }
557
558    pub async fn wait_offline(&self) -> BuckyResult<()> {
559        enum NextStep {
560            Wait(AbortRegistration),
561            Return(BuckyResult<()>)
562        }
563
564        let next = {
565            let mut state = self.0.state.write().unwrap();
566            match &mut state.ipv4 {
567                Ipv4ClientState::Stopped => NextStep::Return(Err(BuckyError::new(BuckyErrorCode::Interrupted, "user canceled"))), 
568                Ipv4ClientState::Active {
569                    waiter, 
570                    ..
571                } => NextStep::Wait(waiter.new_waiter()), 
572                Ipv4ClientState::Timeout =>  NextStep::Return(Ok(())), 
573                _ => NextStep::Return(Err(BuckyError::new(BuckyErrorCode::ErrorState, "not online"))), 
574            }
575        };
576       
577        match next {
578            NextStep::Return(result) => result, 
579            NextStep::Wait(waiter) => {
580                StateWaiter::wait(waiter, || {
581                    let state = self.0.state.read().unwrap();
582                    match &state.ipv4 {
583                        Ipv4ClientState::Stopped => Err(BuckyError::new(BuckyErrorCode::Interrupted, "user canceled")), 
584                        Ipv4ClientState::Timeout =>  Ok(()), 
585                        _ => unreachable!()
586                    }
587                }).await
588            }
589        }
590    }
591
592    pub async fn wait_online(&self) -> BuckyResult<SnStatus> {
593        info!("{} waiting online", self);
594        enum NextStep {
595            Wait(AbortRegistration),
596            Start(AbortRegistration), 
597            Return(BuckyResult<SnStatus>)
598        }
599        let next = {
600            let mut state = self.0.state.write().unwrap();
601            match &mut state.ipv4 {
602                Ipv4ClientState::Init(waiter) => {
603                    let waiter = waiter.new_waiter();
604                    NextStep::Start(waiter)
605                }, 
606                Ipv4ClientState::Connecting{ waiter, ..} => NextStep::Wait(waiter.new_waiter()), 
607                Ipv4ClientState::Active {..} => NextStep::Return(Ok(SnStatus::Online)), 
608                Ipv4ClientState::Timeout =>  NextStep::Return(Ok(SnStatus::Offline)), 
609                Ipv4ClientState::Stopped => NextStep::Return(Err(BuckyError::new(BuckyErrorCode::Interrupted, "user canceled"))), 
610            }
611        };
612       
613        let state = || {
614            let state = self.0.state.read().unwrap();
615            match &state.ipv4 {
616                Ipv4ClientState::Active {..} => Ok(SnStatus::Online), 
617                Ipv4ClientState::Timeout =>  Ok(SnStatus::Offline), 
618                Ipv4ClientState::Stopped => Err(BuckyError::new(BuckyErrorCode::Interrupted, "user canceled")), 
619                _ => unreachable!()
620            }
621        };
622
623        match next {
624            NextStep::Return(result) => result, 
625            NextStep::Wait(waiter) => StateWaiter::wait(waiter, state).await, 
626            NextStep::Start(waiter) => {
627                info!("{} started", self); 
628                let mut ipv6_session = None;
629                let mut ipv4_sessions = vec![];
630                for local in self.0.net_listener.udp().iter().filter(|interface| interface.local().addr().is_ipv4()) {
631                    let sn_endpoints: Vec<Endpoint> = self.0.sn.connect_info().endpoints().iter().filter(|endpoint| endpoint.is_udp() && endpoint.is_same_ip_version(&local.local())).cloned().collect();
632                    if sn_endpoints.len() > 0 {
633                        let params = UdpSesssionParams {
634                            config: self.0.config.udp.clone(), 
635                            local: local.clone(),
636                            local_device: self.local_device(), 
637                            with_device: true, 
638                            sn_desc: self.0.sn.desc().clone(),
639                            sn_endpoints,  
640                        };
641                        let session = UdpPingSession::new(self.0.stack.clone(), self.0.gen_seq.clone(), params).clone_as_ping_session();
642                      
643                        info!("{} add session {}", self, session);
644                        ipv4_sessions.push(session);
645                    }
646                };
647
648
649                for local in self.0.net_listener.udp().iter().filter(|interface| interface.local().addr().is_ipv6()) {
650                    let sn_endpoints: Vec<Endpoint> = self.0.sn.connect_info().endpoints().iter().filter(|endpoint| endpoint.is_udp() && endpoint.is_same_ip_version(&local.local())).cloned().collect();
651                    if sn_endpoints.len() > 0 {
652                        let params = UdpSesssionParams {
653                            config: self.0.config.udp.clone(), 
654                            local: local.clone(),
655                            local_device: self.local_device(), 
656                            with_device: false, 
657                            sn_desc: self.0.sn.desc().clone(),
658                            sn_endpoints,  
659                        };
660                        let session = UdpPingSession::new(self.0.stack.clone(), self.0.gen_seq.clone(), params).clone_as_ping_session();
661                        
662                        info!("{} add session {}", self, session);
663                        ipv6_session = Some(session);
664                        break; 
665                    }
666                };
667
668                let next = {
669                    let mut state = self.0.state.write().unwrap();
670                    match &mut state.ipv4 {
671                        Ipv4ClientState::Init(waiter) => {
672                            let waiter = waiter.transfer();
673                            if ipv4_sessions.len() > 0 {
674                                state.ipv4 = Ipv4ClientState::Connecting {
675                                    waiter, 
676                                    sessions: ipv4_sessions.iter().map(|s| s.clone_as_ping_session()).collect(), 
677                                };
678                                if let Some(session) = ipv6_session.as_ref() {
679                                    state.ipv6 = Ipv6ClientState::Try(session.clone_as_ping_session());
680                                }
681                                Ok(true)
682                            } else {
683                                state.ipv4 = Ipv4ClientState::Stopped;
684                                Err((BuckyError::new(BuckyErrorCode::Interrupted, "no bound endpoint"), waiter))
685                            }
686                        },
687                        _ => Ok(false)
688                    }
689                };
690
691                match next {
692                    Ok(start) => {
693                        if start {
694                            for session in ipv4_sessions.into_iter() {
695                                let client = self.clone();
696                                task::spawn(async move {
697                                    client.sync_session_resp(session.as_ref(), session.wait().await);
698                                });
699                            }
700                            if let Some(session) = ipv6_session {
701                                let client = self.clone();
702                                task::spawn(async move {
703                                    client.sync_session_resp(session.as_ref(), session.wait().await);
704                                });
705                            }
706                        }
707                        StateWaiter::wait(waiter, state).await
708                    },  
709                    Err((err, waiter)) => {
710                        waiter.wake();
711                        Err(err)
712                    }
713                }
714            }
715        }
716       
717    }
718
719    pub fn on_time_escape(&self, now: Timestamp) {
720        let sessions = {
721            let mut state = self.0.state.write().unwrap();
722            let mut sessions = match &mut state.ipv4 {
723                Ipv4ClientState::Connecting {
724                    sessions, 
725                    ..
726                } => sessions.iter().map(|session| session.clone_as_ping_session()).collect(), 
727                Ipv4ClientState::Active { 
728                    state: active, 
729                    .. 
730                } => {
731                    match active {
732                        ActiveState::Wait(next_time, session) => {
733                            if now > *next_time {
734                                let session = session.clone_as_ping_session();
735                                *active = ActiveState::FirstTry(session.clone_as_ping_session());
736                                {
737                                
738                                    let client = self.clone();
739                                    let session = session.clone_as_ping_session();
740                                    task::spawn(async move {
741                                        client.sync_session_resp(session.as_ref(), session.wait().await);
742                                    });
743                                }
744                                vec![session]
745                            } else {
746                                vec![]
747                            }
748                        }, 
749                        ActiveState::FirstTry(session) => vec![session.clone_as_ping_session()], 
750                        ActiveState::SecondTry(session) => vec![session.clone_as_ping_session()], 
751                    }
752                }, 
753                _ => vec![]
754            };
755
756            match &mut state.ipv6 {
757                Ipv6ClientState::Try(session) => {
758                    sessions.push(session.clone_as_ping_session());
759                }, 
760                Ipv6ClientState::Wait(next_time, session) => {
761                    if now > *next_time {
762                        let session = session.clone_as_ping_session();
763                        state.ipv6 = Ipv6ClientState::Try(session.clone_as_ping_session());
764                        sessions.push(session.clone_as_ping_session());
765                        {
766                            let client = self.clone();
767                            let session = session.clone_as_ping_session();
768                            task::spawn(async move {
769                                client.sync_session_resp(session.as_ref(), session.wait().await);
770                            });
771                        }
772                    }
773                },
774                _ => {}
775            }
776
777            sessions
778        };
779        
780        for session in sessions {
781            session.on_time_escape(now);
782        }
783    }
784
785    pub fn on_udp_ping_resp(&self, resp: &SnPingResp, from: &Endpoint, interface: Interface) {
786        let sessions = {
787            let state = self.0.state.read().unwrap();
788            
789            if from.addr().is_ipv4() {
790                match &state.ipv4 {
791                    Ipv4ClientState::Connecting {
792                        sessions, 
793                        ..
794                    } => sessions.iter().filter_map(|session| {
795                        if session.local() == interface.local() {
796                            Some(session.clone_as_ping_session())
797                        } else {
798                            None
799                        }
800                    }).collect(), 
801                    Ipv4ClientState::Active { 
802                        state: active, 
803                        .. 
804                    } => {
805                        if let Some(session) = active.trying_session().and_then(|session| if session.local() == interface.local() { Some(session) } else { None }) {
806                            vec![session]
807                        } else {
808                            vec![]
809                        }
810                    }, 
811                    _ => vec![]
812                }
813            } else {
814                match &state.ipv6 {
815                    Ipv6ClientState::Try(session) => if session.local() == interface.local() {
816                        vec![session.clone_as_ping_session()]
817                    } else {
818                        vec![]
819                    },  
820                    _ => vec![]
821                }
822            }
823        };
824
825        for session in sessions {
826            let _ = session.on_udp_ping_resp(resp, from);
827        }
828    }
829
830
831}
832
833
834
835