1
2use std::{
4 sync::{Arc, RwLock,},
5 time::Duration,
6};
7use async_std::{
8 task
9};
10use futures::future::AbortRegistration;
11use cyfs_base::*;
12use crate::{
13 types::*,
14 protocol::{v0::*},
15 interface::{*, udp::{Interface}},
16 stack::{WeakStack, Stack},
17};
18use super::{
19 udp::{self, *}
20};
21
22#[derive(Clone)]
23pub struct PingConfig {
24 pub interval: Duration,
25 pub udp: udp::Config
26}
27
28#[derive(Debug, Clone, Copy, Eq, PartialEq)]
29pub enum SnStatus {
30 Online,
31 Offline
32}
33
34
35impl std::fmt::Display for SnStatus {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 let v = match self {
38 Self::Online => "online",
39 Self::Offline => "offline",
40 };
41
42 write!(f, "{}", v)
43 }
44}
45
46
47impl std::str::FromStr for SnStatus {
48 type Err = BuckyError;
49
50 fn from_str(s: &str) -> BuckyResult<Self> {
51 match s {
52 "online" => Ok(Self::Online),
53 "offline" => Ok(Self::Offline),
54 _ => {
55 let msg = format!("unknown SnStatus value: {}", s);
56 log::error!("{}", msg);
57
58 Err(BuckyError::new(BuckyErrorCode::InvalidData, msg))
59 }
60 }
61 }
62}
63
64#[derive(Clone, Debug)]
65pub struct PingSessionResp {
66 pub from: Endpoint,
67 pub err: BuckyErrorCode,
68 pub endpoints: Vec<Endpoint>
69}
70
71
72#[async_trait::async_trait]
73pub trait PingSession: Send + Sync + std::fmt::Display {
74 fn sn(&self) -> &DeviceId;
75 fn local(&self) -> Endpoint;
76 fn reset(&self, local_device: Option<Device>, sn_endpoint: Option<Endpoint>) -> Box<dyn PingSession>;
77 fn clone_as_ping_session(&self) -> Box<dyn PingSession>;
78 async fn wait(&self) -> BuckyResult<PingSessionResp>;
79 fn stop(&self);
80 fn on_time_escape(&self, _now: Timestamp) {
81
82 }
83 fn on_udp_ping_resp(&self, _resp: &SnPingResp, _from: &Endpoint) -> BuckyResult<()> {
84 Ok(())
85 }
86}
87
88
89enum ActiveState {
90 FirstTry(Box<dyn PingSession>),
91 SecondTry(Box<dyn PingSession>),
92 Wait(Timestamp, Box<dyn PingSession>)
93}
94
95impl ActiveState {
96 fn cur_session(&self) -> Box<dyn PingSession> {
97 match self {
98 Self::FirstTry(session) => session.clone_as_ping_session(),
99 Self::SecondTry(session) => session.clone_as_ping_session(),
100 Self::Wait(_, session) => session.clone_as_ping_session()
101 }
102 }
103 fn trying_session(&self) -> Option<Box<dyn PingSession>> {
104 match self {
105 Self::FirstTry(session) => Some(session.clone_as_ping_session()),
106 Self::SecondTry(session) => Some(session.clone_as_ping_session()),
107 _ => None
108 }
109 }
110}
111
112struct ClientState {
113 ipv4: Ipv4ClientState,
114 ipv6: Ipv6ClientState
115}
116
117enum Ipv4ClientState {
118 Init(StateWaiter),
119 Connecting {
120 waiter: StateWaiter,
121 sessions: Vec<Box<dyn PingSession>>,
122 },
123 Active {
124 waiter: StateWaiter,
125 state: ActiveState
126 },
127 Timeout,
128 Stopped
129}
130
131enum Ipv6ClientState {
132 None,
133 Try(Box<dyn PingSession>),
134 Wait(Timestamp, Box<dyn PingSession>)
135}
136
137struct ClientImpl {
138 stack: WeakStack,
139 config: PingConfig,
140 sn_index: usize,
141 sn_id: DeviceId,
142 sn: Device,
143 gen_seq: Arc<TempSeqGenerator>,
144 net_listener: NetListener,
145 local_device: RwLock<Device>,
146 state: RwLock<ClientState>
147}
148
149#[derive(Clone)]
150pub struct PingClient(Arc<ClientImpl>);
151
152impl std::fmt::Display for PingClient {
153 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154 let stack = Stack::from(&self.0.stack);
155 write!(f, "PingClients{{local:{}, sn:{}}}", stack.local_device_id(), self.sn())
156 }
157}
158
159impl PingClient {
160 pub(crate) fn new(
161 stack: WeakStack,
162 config: PingConfig,
163 gen_seq: Arc<TempSeqGenerator>,
164 net_listener: NetListener,
165 sn_index: usize,
166 sn: Device,
167 local_device: Device,
168 ) -> Self {
169 let strong_stack = Stack::from(&stack);
170 let sn_id = sn.desc().device_id();
171 strong_stack.keystore().reset_peer(&sn_id);
172
173 Self(Arc::new(ClientImpl {
174 stack,
175 config,
176 gen_seq,
177 net_listener,
178 sn,
179 sn_id,
180 sn_index,
181 local_device: RwLock::new(local_device),
182 state: RwLock::new(ClientState {
183 ipv4: Ipv4ClientState::Init(StateWaiter::new()),
184 ipv6: Ipv6ClientState::None
185 })
186 }))
187 }
188
189 pub(crate) fn reset(
190 &self,
191 net_listener: NetListener,
192 local_device: Device,
193 ) -> Self {
194 Self(Arc::new(ClientImpl {
195 stack: self.0.stack.clone(),
196 config: self.0.config.clone(),
197 sn_id: self.0.sn_id.clone(),
198 sn_index: self.0.sn_index,
199 sn: self.0.sn.clone(),
200 gen_seq: self.0.gen_seq.clone(),
201 net_listener,
202 local_device: RwLock::new(local_device),
203 state: RwLock::new(ClientState {
204 ipv4: Ipv4ClientState::Init(StateWaiter::new()),
205 ipv6: Ipv6ClientState::None
206 })
207 }))
208 }
209
210 pub fn ptr_eq(&self, other: &Self) -> bool {
211 Arc::ptr_eq(&self.0, &other.0)
212 }
213
214 pub fn local_device(&self) -> Device {
215 self.0.local_device.read().unwrap().clone()
216 }
217
218 fn net_listener(&self) -> &NetListener {
219 &self.0.net_listener
220 }
221
222 pub fn stop(&self) {
223 let (waiter, sessions) = {
224 let mut state = self.0.state.write().unwrap();
225 let (waiter, mut sessions) = match &mut state.ipv4 {
226 Ipv4ClientState::Init(waiter) => {
227 let waiter = waiter.transfer();
228 state.ipv4 = Ipv4ClientState::Stopped;
229 (Some(waiter), vec![])
230 },
231 Ipv4ClientState::Connecting {
232 waiter,
233 sessions
234 } => {
235 let waiter = waiter.transfer();
236 let sessions = sessions.iter().map(|s| s.clone_as_ping_session()).collect();
237 state.ipv4 = Ipv4ClientState::Stopped;
238 (Some(waiter), sessions)
239 },
240 Ipv4ClientState::Active {
241 waiter,
242 state: active
243 } => {
244 let waiter = waiter.transfer();
245 let sessions = if let Some(session) = active.trying_session() {
246 vec![session]
247 } else {
248 vec![]
249 };
250 state.ipv4 = Ipv4ClientState::Stopped;
251 (Some(waiter), sessions)
252 },
253 _ => (None, vec![])
254 };
255
256 match &mut state.ipv6 {
257 Ipv6ClientState::Try(session) => {
258 sessions.push(session.clone_as_ping_session());
259 state.ipv6 = Ipv6ClientState::None
260 },
261 _ => {}
262 }
263
264 (waiter, sessions)
265 };
266
267 if let Some(waiter) = waiter {
268 waiter.wake()
269 };
270
271 for session in sessions {
272 session.stop();
273 }
274
275 }
276
277
278 pub fn sn(&self) -> &DeviceId {
279 &self.0.sn_id
280 }
281
282 pub fn index(&self) -> usize {
283 self.0.sn_index
284 }
285
286
287 async fn update_local(&self, local: Endpoint, outer: Endpoint) {
288 let update = self.net_listener().update_outer(&local, &outer);
289 if update > UpdateOuterResult::None {
290 info!("{} update local {} => {}", self, local, outer);
291 let mut local_dev = self.local_device();
292 let device_sn_list = local_dev.mut_connect_info().mut_sn_list();
293 device_sn_list.clear();
294 device_sn_list.push(self.sn().clone());
295
296 let device_endpoints = local_dev.mut_connect_info().mut_endpoints();
297 device_endpoints.clear();
298 let bound_endpoints = self.net_listener().endpoints();
299 for ep in bound_endpoints {
300 device_endpoints.push(ep);
301 }
302
303 local_dev.body_mut().as_mut().unwrap().increase_update_time(bucky_time_now());
304
305 let stack = Stack::from(&self.0.stack);
306 let _ = sign_and_set_named_object_body(
307 stack.keystore().signer(),
308 &mut local_dev,
309 &SignatureSource::RefIndex(0),
310 ).await;
311
312
313
314 let updated = {
315 let mut store = self.0.local_device.write().unwrap();
316 if store.body().as_ref().unwrap().update_time() < local_dev.body().as_ref().unwrap().update_time() {
317 *store = local_dev;
318 true
319 } else {
320 false
321 }
322 };
323
324 if updated {
325 if local.addr().is_ipv6() {
326 if let Ok(status) = self.wait_online().await {
327 if SnStatus::Online == status {
328 self.ping_ipv4_once();
329 }
330 }
331 } else {
332 self.ping_ipv4_once();
333 }
334 }
335 }
336 }
337
338 fn ping_ipv4_once(&self) {
339 info!("{} ping once", self);
340 let mut state = self.0.state.write().unwrap();
341 match &mut state.ipv4 {
342 Ipv4ClientState::Active {
343 state: active,
344 ..
345 } => {
346 match active {
347 ActiveState::Wait(_, session) => {
348 let session = session.reset(Some(self.local_device()), None);
349 *active = ActiveState::FirstTry(session.clone_as_ping_session());
350 {
351
352 let client = self.clone();
353 let session = session.clone_as_ping_session();
354 task::spawn(async move {
355 client.sync_session_resp(session.as_ref(), session.wait().await);
356 });
357 }
358 },
359 _ => {}
360 }
361 },
362 _ => {}
363 }
364 }
365
366 fn sync_session_resp(&self, session: &dyn PingSession, result: BuckyResult<PingSessionResp>) {
367 if session.local().addr().is_ipv4() {
368 self.sync_ipv4_session_resp(session, result);
369 } else if session.local().addr().is_ipv6() {
370 self.sync_ipv6_session_resp(session, result);
371 } else {
372 unreachable!()
373 }
374 }
375
376
377 fn sync_ipv6_session_resp(&self, session: &dyn PingSession, result: BuckyResult<PingSessionResp>) {
378 info!("{} wait session {} finished {:?}", self, session, result);
379
380 enum NextStep {
381 None,
382 Update(Endpoint, Endpoint),
383 }
384
385 let next = {
386 let mut state = self.0.state.write().unwrap();
387 match &state.ipv6 {
388 Ipv6ClientState::Try(session) => {
389 let session = session.clone_as_ping_session();
390 state.ipv6 = Ipv6ClientState::Wait(bucky_time_now() + self.0.config.interval.as_micros() as u64, session.reset(None, None));
391 match result {
392 Ok(resp) => if resp.endpoints.len() > 0 {
393 NextStep::Update(session.local().clone(), resp.endpoints[0])
394 } else {
395 NextStep::None
396 },
397 Err(_) => NextStep::None
398 }
399 },
400 _ => NextStep::None,
401 }
402 };
403
404 if let NextStep::Update(local, outer) = next {
405 let client = self.clone();
406 task::spawn(async move {
407 client.update_local(local, outer).await;
408 });
409 }
410 }
411
412
413 fn sync_ipv4_session_resp(&self, session: &dyn PingSession, result: BuckyResult<PingSessionResp>) {
414 info!("{} wait session {} finished {:?}", self, session, result);
415 struct NextStep {
416 waiter: Option<StateWaiter>,
417 update: Option<(Endpoint, Endpoint)>,
418 to_start: Option<Box<dyn PingSession>>,
419 ping_once: bool,
420 update_cache: Option<Option<Endpoint>>
421 }
422
423 impl NextStep {
424 fn none() -> Self {
425 Self {
426 waiter: None,
427 update: None,
428 to_start: None,
429 ping_once: false,
430 update_cache: None
431 }
432 }
433 }
434
435 let next = {
436 let mut state = self.0.state.write().unwrap();
437 match &mut state.ipv4 {
438 Ipv4ClientState::Connecting {
439 waiter,
440 sessions
441 } => {
442 if let Some(index) = sessions.iter().enumerate().find_map(|(index, exists)| if exists.local() == session.local() { Some(index) } else { None }) {
443 match result {
444 Ok(resp) => {
445 let mut next = NextStep::none();
446 next.waiter = Some(waiter.transfer());
447
448 if resp.endpoints.len() > 0 {
449 next.update = Some((session.local(), resp.endpoints[0]));
450 }
451
452 info!("{} online", self);
453
454 next.update_cache = Some(Some(resp.from));
455 state.ipv4 = Ipv4ClientState::Active {
456 waiter: StateWaiter::new(),
457 state: ActiveState::Wait(bucky_time_now() + self.0.config.interval.as_micros() as u64, session.reset(None, Some(resp.from)))
458 };
459
460 next
461 },
462 Err(_err) => {
463 sessions.remove(index);
464 let mut next = NextStep::none();
465 if sessions.len() == 0 {
466 error!("{} timeout", self);
467 next.waiter = Some(waiter.transfer());
468 state.ipv4 = Ipv4ClientState::Timeout;
469 }
470
471 next
472 }
473 }
474 } else {
475 NextStep::none()
476 }
477 },
478 Ipv4ClientState::Active {
479 waiter,
480 state: active
481 } => {
482 let mut next = NextStep::none();
483 if !active.cur_session().local().is_same_ip_addr(&session.local()) {
484 if let Ok(resp) = result {
485 if resp.endpoints.len() > 0 {
486 next.update = Some((session.local(), resp.endpoints[0]));
487 }
488 }
489 } else if active.trying_session().and_then(|exists| if exists.local() == session.local() { Some(()) } else { None }).is_some() {
490 match result {
491 Ok(resp) => {
492 *active = ActiveState::Wait(bucky_time_now() + self.0.config.interval.as_micros() as u64, session.reset(None, None));
493
494 if resp.endpoints.len() > 0 {
495 next.update = Some((session.local(), resp.endpoints[0]));
496 } else if resp.err == BuckyErrorCode::NotFound {
497 next.ping_once = true;
498 }
499 },
500 Err(_err) => {
501 match active {
502 ActiveState::FirstTry(session) => {
503 let stack = Stack::from(&self.0.stack);
504 stack.keystore().reset_peer(&self.sn());
505 let session = session.reset(None, None);
506 info!("{} start second try", self);
507 *active = ActiveState::SecondTry(session.clone_as_ping_session());
508 next.to_start = Some(session);
509 },
510 ActiveState::SecondTry(_) => {
511 next.waiter = Some(waiter.transfer());
512 error!("{} timeout", self);
513 state.ipv4 = Ipv4ClientState::Timeout;
514 next.update_cache = Some(None);
515 },
516 _ => {}
517 }
518 }
519 }
520 }
521 next
522 },
523 _ => NextStep::none()
524 }
525 };
526
527 if let Some(update) = next.update_cache {
528 let stack = Stack::from(&self.0.stack);
529 if let Some(remote) = update {
530 stack.sn_client().cache().add_active(session.sn(), EndpointPair::from((session.local().clone(), remote)));
531 } else {
532 stack.sn_client().cache().remove_active(session.sn());
533 }
534 }
535
536 if let Some(session) = next.to_start {
537 let client = self.clone();
538 task::spawn(async move {
539 client.sync_session_resp(session.as_ref(), session.wait().await);
540 });
541 }
542
543 if let Some(waiter) = next.waiter {
544 waiter.wake();
545 }
546
547 if let Some((local, outer)) = next.update {
548 let client = self.clone();
549 task::spawn(async move {
550 client.update_local(local, outer).await;
551 });
552 } else if next.ping_once {
553 self.ping_ipv4_once();
554 }
555
556 }
557
558 pub async fn wait_offline(&self) -> BuckyResult<()> {
559 enum NextStep {
560 Wait(AbortRegistration),
561 Return(BuckyResult<()>)
562 }
563
564 let next = {
565 let mut state = self.0.state.write().unwrap();
566 match &mut state.ipv4 {
567 Ipv4ClientState::Stopped => NextStep::Return(Err(BuckyError::new(BuckyErrorCode::Interrupted, "user canceled"))),
568 Ipv4ClientState::Active {
569 waiter,
570 ..
571 } => NextStep::Wait(waiter.new_waiter()),
572 Ipv4ClientState::Timeout => NextStep::Return(Ok(())),
573 _ => NextStep::Return(Err(BuckyError::new(BuckyErrorCode::ErrorState, "not online"))),
574 }
575 };
576
577 match next {
578 NextStep::Return(result) => result,
579 NextStep::Wait(waiter) => {
580 StateWaiter::wait(waiter, || {
581 let state = self.0.state.read().unwrap();
582 match &state.ipv4 {
583 Ipv4ClientState::Stopped => Err(BuckyError::new(BuckyErrorCode::Interrupted, "user canceled")),
584 Ipv4ClientState::Timeout => Ok(()),
585 _ => unreachable!()
586 }
587 }).await
588 }
589 }
590 }
591
592 pub async fn wait_online(&self) -> BuckyResult<SnStatus> {
593 info!("{} waiting online", self);
594 enum NextStep {
595 Wait(AbortRegistration),
596 Start(AbortRegistration),
597 Return(BuckyResult<SnStatus>)
598 }
599 let next = {
600 let mut state = self.0.state.write().unwrap();
601 match &mut state.ipv4 {
602 Ipv4ClientState::Init(waiter) => {
603 let waiter = waiter.new_waiter();
604 NextStep::Start(waiter)
605 },
606 Ipv4ClientState::Connecting{ waiter, ..} => NextStep::Wait(waiter.new_waiter()),
607 Ipv4ClientState::Active {..} => NextStep::Return(Ok(SnStatus::Online)),
608 Ipv4ClientState::Timeout => NextStep::Return(Ok(SnStatus::Offline)),
609 Ipv4ClientState::Stopped => NextStep::Return(Err(BuckyError::new(BuckyErrorCode::Interrupted, "user canceled"))),
610 }
611 };
612
613 let state = || {
614 let state = self.0.state.read().unwrap();
615 match &state.ipv4 {
616 Ipv4ClientState::Active {..} => Ok(SnStatus::Online),
617 Ipv4ClientState::Timeout => Ok(SnStatus::Offline),
618 Ipv4ClientState::Stopped => Err(BuckyError::new(BuckyErrorCode::Interrupted, "user canceled")),
619 _ => unreachable!()
620 }
621 };
622
623 match next {
624 NextStep::Return(result) => result,
625 NextStep::Wait(waiter) => StateWaiter::wait(waiter, state).await,
626 NextStep::Start(waiter) => {
627 info!("{} started", self);
628 let mut ipv6_session = None;
629 let mut ipv4_sessions = vec![];
630 for local in self.0.net_listener.udp().iter().filter(|interface| interface.local().addr().is_ipv4()) {
631 let sn_endpoints: Vec<Endpoint> = self.0.sn.connect_info().endpoints().iter().filter(|endpoint| endpoint.is_udp() && endpoint.is_same_ip_version(&local.local())).cloned().collect();
632 if sn_endpoints.len() > 0 {
633 let params = UdpSesssionParams {
634 config: self.0.config.udp.clone(),
635 local: local.clone(),
636 local_device: self.local_device(),
637 with_device: true,
638 sn_desc: self.0.sn.desc().clone(),
639 sn_endpoints,
640 };
641 let session = UdpPingSession::new(self.0.stack.clone(), self.0.gen_seq.clone(), params).clone_as_ping_session();
642
643 info!("{} add session {}", self, session);
644 ipv4_sessions.push(session);
645 }
646 };
647
648
649 for local in self.0.net_listener.udp().iter().filter(|interface| interface.local().addr().is_ipv6()) {
650 let sn_endpoints: Vec<Endpoint> = self.0.sn.connect_info().endpoints().iter().filter(|endpoint| endpoint.is_udp() && endpoint.is_same_ip_version(&local.local())).cloned().collect();
651 if sn_endpoints.len() > 0 {
652 let params = UdpSesssionParams {
653 config: self.0.config.udp.clone(),
654 local: local.clone(),
655 local_device: self.local_device(),
656 with_device: false,
657 sn_desc: self.0.sn.desc().clone(),
658 sn_endpoints,
659 };
660 let session = UdpPingSession::new(self.0.stack.clone(), self.0.gen_seq.clone(), params).clone_as_ping_session();
661
662 info!("{} add session {}", self, session);
663 ipv6_session = Some(session);
664 break;
665 }
666 };
667
668 let next = {
669 let mut state = self.0.state.write().unwrap();
670 match &mut state.ipv4 {
671 Ipv4ClientState::Init(waiter) => {
672 let waiter = waiter.transfer();
673 if ipv4_sessions.len() > 0 {
674 state.ipv4 = Ipv4ClientState::Connecting {
675 waiter,
676 sessions: ipv4_sessions.iter().map(|s| s.clone_as_ping_session()).collect(),
677 };
678 if let Some(session) = ipv6_session.as_ref() {
679 state.ipv6 = Ipv6ClientState::Try(session.clone_as_ping_session());
680 }
681 Ok(true)
682 } else {
683 state.ipv4 = Ipv4ClientState::Stopped;
684 Err((BuckyError::new(BuckyErrorCode::Interrupted, "no bound endpoint"), waiter))
685 }
686 },
687 _ => Ok(false)
688 }
689 };
690
691 match next {
692 Ok(start) => {
693 if start {
694 for session in ipv4_sessions.into_iter() {
695 let client = self.clone();
696 task::spawn(async move {
697 client.sync_session_resp(session.as_ref(), session.wait().await);
698 });
699 }
700 if let Some(session) = ipv6_session {
701 let client = self.clone();
702 task::spawn(async move {
703 client.sync_session_resp(session.as_ref(), session.wait().await);
704 });
705 }
706 }
707 StateWaiter::wait(waiter, state).await
708 },
709 Err((err, waiter)) => {
710 waiter.wake();
711 Err(err)
712 }
713 }
714 }
715 }
716
717 }
718
719 pub fn on_time_escape(&self, now: Timestamp) {
720 let sessions = {
721 let mut state = self.0.state.write().unwrap();
722 let mut sessions = match &mut state.ipv4 {
723 Ipv4ClientState::Connecting {
724 sessions,
725 ..
726 } => sessions.iter().map(|session| session.clone_as_ping_session()).collect(),
727 Ipv4ClientState::Active {
728 state: active,
729 ..
730 } => {
731 match active {
732 ActiveState::Wait(next_time, session) => {
733 if now > *next_time {
734 let session = session.clone_as_ping_session();
735 *active = ActiveState::FirstTry(session.clone_as_ping_session());
736 {
737
738 let client = self.clone();
739 let session = session.clone_as_ping_session();
740 task::spawn(async move {
741 client.sync_session_resp(session.as_ref(), session.wait().await);
742 });
743 }
744 vec![session]
745 } else {
746 vec![]
747 }
748 },
749 ActiveState::FirstTry(session) => vec![session.clone_as_ping_session()],
750 ActiveState::SecondTry(session) => vec![session.clone_as_ping_session()],
751 }
752 },
753 _ => vec![]
754 };
755
756 match &mut state.ipv6 {
757 Ipv6ClientState::Try(session) => {
758 sessions.push(session.clone_as_ping_session());
759 },
760 Ipv6ClientState::Wait(next_time, session) => {
761 if now > *next_time {
762 let session = session.clone_as_ping_session();
763 state.ipv6 = Ipv6ClientState::Try(session.clone_as_ping_session());
764 sessions.push(session.clone_as_ping_session());
765 {
766 let client = self.clone();
767 let session = session.clone_as_ping_session();
768 task::spawn(async move {
769 client.sync_session_resp(session.as_ref(), session.wait().await);
770 });
771 }
772 }
773 },
774 _ => {}
775 }
776
777 sessions
778 };
779
780 for session in sessions {
781 session.on_time_escape(now);
782 }
783 }
784
785 pub fn on_udp_ping_resp(&self, resp: &SnPingResp, from: &Endpoint, interface: Interface) {
786 let sessions = {
787 let state = self.0.state.read().unwrap();
788
789 if from.addr().is_ipv4() {
790 match &state.ipv4 {
791 Ipv4ClientState::Connecting {
792 sessions,
793 ..
794 } => sessions.iter().filter_map(|session| {
795 if session.local() == interface.local() {
796 Some(session.clone_as_ping_session())
797 } else {
798 None
799 }
800 }).collect(),
801 Ipv4ClientState::Active {
802 state: active,
803 ..
804 } => {
805 if let Some(session) = active.trying_session().and_then(|session| if session.local() == interface.local() { Some(session) } else { None }) {
806 vec![session]
807 } else {
808 vec![]
809 }
810 },
811 _ => vec![]
812 }
813 } else {
814 match &state.ipv6 {
815 Ipv6ClientState::Try(session) => if session.local() == interface.local() {
816 vec![session.clone_as_ping_session()]
817 } else {
818 vec![]
819 },
820 _ => vec![]
821 }
822 }
823 };
824
825 for session in sessions {
826 let _ = session.on_udp_ping_resp(resp, from);
827 }
828 }
829
830
831}
832
833
834
835