cyfs_bdt/tunnel/
container.rs

1use log::*;
2use async_std::{
3    future, 
4    task, 
5};
6use std::{
7    time::Duration, 
8    fmt, 
9    sync::{RwLock, Arc, Weak}, 
10    collections::{BTreeMap, LinkedList}, 
11    ops::Deref,
12    convert::TryFrom
13};
14use cyfs_base::*;
15use crate::{
16    types::*,
17    protocol::{*, v0::*},
18    interface::{
19        self, 
20        udp::{
21            OnUdpPackageBox, 
22            OnUdpRawData
23        },
24        tcp::{
25            OnTcpInterface
26        }
27    },
28    sn::client::{SnCache, PingClientCalledEvent}, 
29    stream::{StreamContainer, RemoteSequence}, 
30    stack::{Stack, WeakStack}, 
31    MTU
32};
33use super::{
34    tunnel::*, 
35    builder::*, 
36    udp, 
37    tcp
38};
39use core::mem;
40
41#[derive(Clone)]
42pub struct BuildTunnelParams {
43    pub remote_const: DeviceDesc, 
44    pub remote_sn: Option<Vec<DeviceId>>, 
45    pub remote_desc: Option<Device>,
46}
47
48impl fmt::Display for BuildTunnelParams {
49    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50        write!(f, "BuildTunnelParams{{remote_sn: {:?}, remote_desc:{}}}", self.remote_sn, self.remote_desc.is_some())
51    }
52}
53
54impl BuildTunnelParams {
55    pub(crate) fn nearest_sn(&self, stack: &Stack) -> Option<DeviceId> {
56        let remote = self.remote_const.device_id();
57
58        let cached_remote = stack.device_cache().get_inner(&remote);
59        let known_remote = cached_remote.as_ref().or_else(|| self.remote_desc.as_ref());
60
61        known_remote.and_then(|device| SnCache::nearest_sn_of(&remote, device.connect_info().sn_list()))
62            .or_else(|| self.remote_sn.as_ref().and_then(|sn_list| SnCache::nearest_sn_of(&remote, sn_list)))
63            .or_else(|| SnCache::nearest_sn_of(&remote, stack.sn_client().cache().known_list().as_slice()))
64    }
65
66    pub(crate) fn retry_sn_list(&self, stack: &Stack, nearest: &DeviceId) -> Option<Vec<DeviceId>> {
67        self.remote_sn.clone().or_else(|| Some(stack.sn_client().cache().known_list()))
68            .map(|sn_list| sn_list.into_iter().filter(|sn| sn != nearest).collect())
69
70    }
71}
72
73#[derive(Clone)]
74pub struct Config {
75    pub retain_timeout: Duration,  
76    pub retry_sn_timeout: Duration, 
77    pub connect_timeout: Duration, 
78    pub tcp: tcp::Config, 
79    pub udp: udp::Config
80}
81
82enum TunnelBuildState {
83    Idle, 
84    ConnectStream(ConnectStreamBuilder), 
85    AcceptStream(AcceptStreamBuilder), 
86    ConnectTunnel(ConnectTunnelBuilder), 
87    AcceptTunnel(AcceptTunnelBuilder)
88}
89
90pub enum StreamConnectorSelector {
91    Package(Timestamp), 
92    Tcp(tcp::Tunnel, Timestamp),
93    Builder(ConnectStreamBuilder)
94}
95
96struct TunnelDeadState {
97    //标记从哪个状态进入的dead
98    former_state: TunnelState, 
99    //什么时候进入dead
100    when: Timestamp
101}
102
103struct TunnelConnectingState {
104    waiter: StateWaiter, 
105    build_state: TunnelBuildState, 
106    packages: LinkedList<(DynamicPackage, bool)>
107}
108
109struct TunnelActiveState {
110    remote_timestamp: Timestamp, 
111    default_tunnel: DynamicTunnel
112}
113
114// 这里为了逻辑简单,dead状态之后不能回退;
115enum TunnelStateImpl {
116    Connecting(TunnelConnectingState), 
117    Active(TunnelActiveState),
118    Dead(TunnelDeadState)
119}
120
121struct TunnelContainerState {
122    last_update: Timestamp, 
123    tunnel_state: TunnelStateImpl, 
124    tunnel_entries: BTreeMap<EndpointPair, DynamicTunnel>
125}
126
127struct TunnelContainerImpl {
128    stack: WeakStack,
129    config: Config, 
130    remote: DeviceId,  
131    remote_const: DeviceDesc, 
132    sequence_generator: TempSeqGenerator, 
133    state: RwLock<TunnelContainerState>,
134}
135
136#[derive(Clone)]
137pub struct TunnelContainer(Arc<TunnelContainerImpl>);
138
139impl TunnelContainer {
140    pub(super) fn new(stack: WeakStack, remote_const: DeviceDesc, config: Config) -> Self {
141        Self(Arc::new(TunnelContainerImpl {
142            stack, 
143            config, 
144            remote: remote_const.device_id(), 
145            remote_const, 
146            sequence_generator: TempSeqGenerator::new(), 
147            state: RwLock::new(TunnelContainerState {
148                tunnel_entries: BTreeMap::new(), 
149                last_update: bucky_time_now(), 
150                tunnel_state: TunnelStateImpl::Connecting(TunnelConnectingState {
151                    waiter: StateWaiter::new(), 
152                    build_state: TunnelBuildState::Idle, 
153                    packages: LinkedList::new()
154                })
155            }), 
156        }))
157    }
158
159    pub fn mtu(&self) -> usize {
160        if let Ok(tunnel) = self.default_tunnel() {
161            tunnel.mtu()
162        } else {
163            MTU-12
164        }
165    }
166
167    fn sync_connecting(&self) {
168        let connect_timeout = self.config().connect_timeout;
169        let tunnel = self.clone();
170        task::spawn(async move {
171            match future::timeout(connect_timeout, tunnel.wait_active()).await {
172                Ok(_state) => {
173                    // do thing
174                }, 
175                Err(_err) => {
176                    let waiter = {
177                        let state = &mut *tunnel.0.state.write().unwrap();
178                        match &mut state.tunnel_state {
179                            TunnelStateImpl::Connecting(connecting) => {
180                                let mut ret_waiter = StateWaiter::new();
181                                connecting.waiter.transfer_into(&mut ret_waiter);
182                                state.last_update = bucky_time_now();
183                                state.tunnel_state = TunnelStateImpl::Dead(TunnelDeadState {
184                                    former_state: TunnelState::Connecting, 
185                                    when: bucky_time_now()
186                                });
187                                state.tunnel_entries.clear();
188                                Some(ret_waiter)
189                            }, 
190                            _ => {
191                                None
192                            }
193                        }
194                    };
195                    if let Some(waiter) = waiter {
196                        info!("{} dead for connect timeout", tunnel);
197                        waiter.wake();
198                    }
199                }
200            }
201        });
202    }
203
204    pub fn config(&self) -> &Config {
205        &self.0.config
206    }
207
208    pub fn stack(&self) -> Stack {
209        Stack::from(&self.0.stack)
210    }
211
212    pub fn remote(&self) -> &DeviceId {
213        &self.0.remote
214    }
215
216    pub fn remote_const(&self) -> &DeviceDesc {
217        &self.0.remote_const
218    }
219
220    pub fn protocol_version(&self) -> u8 {
221        0
222    }
223
224    pub fn stack_version(&self) -> u32 {
225        0
226    }
227
228    pub fn default_tunnel(&self) -> BuckyResult<DynamicTunnel> {
229        let state = self.0.state.read().unwrap();
230        match &state.tunnel_state {
231            TunnelStateImpl::Active(active) => {
232                Ok(active.default_tunnel.clone())
233            }, 
234            TunnelStateImpl::Connecting(_) => {
235                let entries = &state.tunnel_entries;
236                let mut iter = entries.iter();
237                loop {
238                    match iter.next() {
239                        Some((ep_pair, tunnel)) => {
240                            if let TunnelState::Active(_) = tunnel.as_ref().state() {
241                                if ep_pair.protocol() == Protocol::Udp {
242                                    break Some(tunnel.clone());
243                                } 
244                            }
245                        },
246                        None => break None
247                    }
248                }.ok_or_else(| | BuckyError::new(BuckyErrorCode::NotFound, "no default tunnel"))
249            },
250            TunnelStateImpl::Dead(_) => {
251                Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel's dead"))
252            }
253        }
254    }
255
256    pub fn default_udp_tunnel(&self) -> BuckyResult<udp::Tunnel> {
257        let tunnel = self.default_tunnel()?;
258        if tunnel.as_ref().local().is_udp() {
259            Ok(tunnel.clone_as_tunnel())
260        } else {
261            Err(BuckyError::new(BuckyErrorCode::NotMatch, "default tunnel not udp"))
262        }
263    }
264
265    pub fn send_packages(&self, packages: Vec<DynamicPackage>) -> Result<(), BuckyError> {
266        let tunnel = self.default_tunnel()?;
267        for package in packages {
268            tunnel.as_ref().send_package(package)?;
269        }
270        Ok(())
271    }
272
273    pub fn send_package(&self, package: DynamicPackage, plaintext: bool) -> BuckyResult<()> {
274        if plaintext {
275            assert_eq!(package.cmd_code(), PackageCmdCode::Datagram);
276            let tunnel = self.default_tunnel()?;
277
278            let mut buf = vec![0u8; MTU];
279
280            let buf_len = buf.len();
281            let enc_from = tunnel.as_ref().raw_data_header_len();
282
283            let mut context = merge_context::FirstEncode::new();
284            let enc: &dyn RawEncodeWithContext<merge_context::FirstEncode> = package.as_ref();
285            let buf_ptr = enc.raw_encode_with_context(&mut buf[enc_from..], &mut context, &None)?;
286
287            let len = buf_len - buf_ptr.len();
288
289            let _ = tunnel.as_ref().send_raw_data(&mut buf[..len])?;
290            Ok(())
291        } else {
292            let tunnel = self.default_tunnel()?;
293            let _ = tunnel.as_ref().send_package(package)?;
294            Ok(())
295        }     
296    }
297
298    pub fn build_send(&self, package: DynamicPackage, build_params: BuildTunnelParams, plaintext: bool) -> BuckyResult<()> {
299        let (tunnel_and_package, builder) = {
300            let mut state = self.0.state.write().unwrap();
301            match &mut state.tunnel_state {
302                TunnelStateImpl::Active(active) => {
303                    (Some((active.default_tunnel.clone(), package)), None)
304                }, 
305                TunnelStateImpl::Connecting(connecting) => {
306                    connecting.packages.push_back((package, plaintext));
307                    (None, match connecting.build_state {
308                        TunnelBuildState::Idle => {
309                            // 创建新的 tunnel builder
310                            let builder = ConnectTunnelBuilder::new(self.0.stack.clone(), self.clone(), build_params);
311                            connecting.build_state = TunnelBuildState::ConnectTunnel(builder.clone());
312                            Some(builder)
313                        }, 
314                        _ => {
315                            // do nothing
316                            None
317                        }
318                    })
319                },
320                TunnelStateImpl::Dead(_) => {
321                    let builder = ConnectTunnelBuilder::new(self.0.stack.clone(), self.clone(), build_params);
322                    state.last_update = bucky_time_now();
323                    let mut packages = LinkedList::new();
324                    packages.push_back((package, plaintext));
325                    state.tunnel_state = TunnelStateImpl::Connecting(TunnelConnectingState {
326                        waiter: StateWaiter::new(), 
327                        build_state: TunnelBuildState::ConnectTunnel(builder.clone()), 
328                        packages
329                    });
330                    (None, Some(builder))
331                }
332            }
333        };
334
335        if let Some((tunnel, package)) = tunnel_and_package {
336            trace!("{} send packages from {}", self, tunnel.as_ref().as_ref());
337            self.send_package(package, plaintext)
338        } else if let Some(builder) = builder {
339            //FIXME: 加入到connecting的 send 缓存里面去  
340            self.stack().keystore().reset_peer(self.remote());
341            self.stack().device_cache().remove_inner(self.remote());
342            self.sync_connecting();          
343            task::spawn(async move {
344                builder.build().await;
345            });
346            Ok(())
347        } else {
348            Ok(())
349        }
350    }
351
352    pub fn state(&self) -> TunnelState {
353        match &self.0.state.read().unwrap().tunnel_state {
354            TunnelStateImpl::Connecting(_) => TunnelState::Connecting, 
355            TunnelStateImpl::Active(active) => TunnelState::Active(active.remote_timestamp), 
356            TunnelStateImpl::Dead(_) => TunnelState::Dead
357        }
358    }
359
360    pub async fn wait_active(&self) -> TunnelState {
361        let (state, waiter) = {
362            let mut state = self.0.state.write().unwrap();
363            match &mut state.tunnel_state {
364                TunnelStateImpl::Connecting(connecting) => {
365                    (TunnelState::Connecting, Some(connecting.waiter.new_waiter()))
366                },
367                TunnelStateImpl::Active(active) => {
368                    (TunnelState::Active(active.remote_timestamp), None)
369                },
370                TunnelStateImpl::Dead(_) => {
371                    (TunnelState::Dead, None)
372                }
373            }
374        };
375        if let Some(waiter) = waiter {
376            StateWaiter::wait(waiter, | | self.state()).await
377        } else {
378            state
379        }
380    }
381
382    pub fn tunnel_of<T: 'static + Tunnel + Clone>(&self, ep_pair: &EndpointPair) -> Option<T> {
383        let tunnel_impl = &self.0;
384        let entries = &tunnel_impl.state.read().unwrap().tunnel_entries;
385        entries.get(ep_pair).map(|tunnel| tunnel.clone_as_tunnel())
386    }
387
388    pub fn create_tunnel<T: 'static + Tunnel + Clone>(
389        &self, 
390        ep_pair: EndpointPair, 
391        proxy: ProxyType) -> BuckyResult<(T, bool)> {
392        trace!("{} try create tunnel on {}", self, ep_pair);
393        let stack = self.stack();
394        if stack.net_manager().listener().endpoints().get(ep_pair.remote()).is_some() {
395            trace!("{} ignore creat tunnel on {} for remote is self", self, ep_pair);
396            return Err(BuckyError::new(BuckyErrorCode::InvalidInput, "remote is self"));
397        }
398
399        let tunnel_impl = &self.0;
400        let (tunnel, newly_create) = {
401            let entries = &mut tunnel_impl.state.write().unwrap().tunnel_entries;
402            if let Some(tunnel) = entries.get(&ep_pair) {
403                //FIXME: 如果是NAT1的情况,存在在收到AckProxy之前,从ProxyEndpoint上收到通过代理转发过来的RN包,
404                //      此时udp_tunnel会已经存在,并且ProxyType为None;应当考虑在这这修改udp_tunnel的ProxyType,并且触发syn_tunnel_state
405                //      以正确的选择default tunnel
406                trace!("{} create tunnel return existing tunnel", self);
407                (tunnel.clone(), None)
408            } else {
409                let dynamic_tunnel = match ep_pair.protocol() {
410                    Protocol::Udp => {
411                        let stack = Stack::from(&tunnel_impl.stack);
412                        let interface = stack.net_manager().listener().udp_of(ep_pair.local()).ok_or_else(|| BuckyError::new(BuckyErrorCode::NotFound, "udp interface not found"))?.clone();
413                        DynamicTunnel::new(udp::Tunnel::new(
414                            self.clone(), 
415                            self.clone_as_tunnel_owner(),  
416                            interface, 
417                            *ep_pair.remote(), 
418                            proxy))
419                    },
420                    Protocol::Tcp => {
421                        DynamicTunnel::new(tcp::Tunnel::new(self.clone(), ep_pair.clone()))
422                    },
423                    _ => {
424                        unreachable!()
425                    }
426                };
427                let tunnel = dynamic_tunnel.clone();
428                info!("{} tunnel newly created on {} ", self, ep_pair);
429                entries.insert(ep_pair, dynamic_tunnel);
430                (tunnel.clone(), Some(tunnel))
431            }
432        };
433        Ok((tunnel.clone_as_tunnel(), newly_create.is_some()))
434        
435    }
436
437    pub(crate) fn generate_sequence(&self) -> TempSeq {
438        self.0.sequence_generator.generate()
439    }
440
441    fn select_stream_connector_by_exists(
442        remote_timestamp: Timestamp, 
443        tunnel_entries: &BTreeMap<EndpointPair, DynamicTunnel>) -> Option<StreamConnectorSelector> {
444        struct Priority {
445            tcp: Option<tcp::Tunnel>,
446            reverse_tcp: Option<tcp::Tunnel>, 
447            package: bool
448        }
449
450        let p = {
451            let mut priority = Priority {
452                tcp: None, 
453                reverse_tcp: None, 
454                package: false
455            };
456            for (_, tunnel) in tunnel_entries {
457                if let TunnelState::Active(_) = tunnel.as_ref().state() {
458                    if tunnel.as_ref().local().is_tcp() {
459                        let tunnel = tunnel.clone_as_tunnel::<tcp::Tunnel>();
460                        if tunnel.is_reverse() && priority.reverse_tcp.is_none() {
461                            priority.reverse_tcp = Some(tunnel);
462                        } else {
463                            priority.tcp = Some(tunnel);
464                            break;
465                        }
466                    } else {
467                        priority.package = true;
468                    }
469                }
470            }
471
472            priority
473        };
474        
475        if p.tcp.is_some() {
476            let tunnel = p.tcp.unwrap();
477            Some(StreamConnectorSelector::Tcp(tunnel, remote_timestamp))
478        } else if p.reverse_tcp.is_some() {
479            let tunnel = p.reverse_tcp.unwrap();
480            Some(StreamConnectorSelector::Tcp(tunnel, remote_timestamp))
481        } else if p.package {
482            Some(StreamConnectorSelector::Package(remote_timestamp))
483        } else {
484            None
485        }
486    }
487
488    pub(crate) async fn select_stream_connector(
489        &self,
490        build_params: BuildTunnelParams,  
491        stream: StreamContainer) -> BuckyResult<StreamConnectorSelector> {
492        let tunnel_impl = &self.0;
493        let (selector, new_builder, exists_builder, tunnels) = {
494            let mut state = self.0.state.write().unwrap();
495            match &mut state.tunnel_state {
496                TunnelStateImpl::Active(active) => {
497                    let cur_timestamp = active.remote_timestamp;
498                    if let Some(selector) = Self::select_stream_connector_by_exists(
499                            cur_timestamp, 
500                            &state.tunnel_entries) {
501                        (Some(selector), None, None, None)
502                    } else {
503                        error!("{} active but no exists connector", self);
504                        let mut tunnel_entries = BTreeMap::new();
505                        std::mem::swap(&mut tunnel_entries, &mut state.tunnel_entries);
506
507                        let builder = ConnectStreamBuilder::new(
508                            tunnel_impl.stack.clone(), 
509                            build_params, 
510                            stream,
511                            self.clone());
512                        state.last_update = bucky_time_now();
513                        state.tunnel_state = TunnelStateImpl::Connecting(TunnelConnectingState {
514                            waiter: StateWaiter::new(), 
515                            build_state: TunnelBuildState::ConnectStream(builder.clone()), 
516                            packages: LinkedList::new()
517                        });
518                        let tunnels: Vec<DynamicTunnel> = tunnel_entries.into_iter().map(|(_, tunnel)| tunnel).collect();
519                        (None, Some(builder), None, Some(tunnels))
520                    }
521                }, 
522                TunnelStateImpl::Connecting(connecting) => {
523                    match &mut connecting.build_state {
524                        TunnelBuildState::Idle => {
525                            let builder = ConnectStreamBuilder::new(
526                                tunnel_impl.stack.clone(), 
527                                build_params, 
528                                stream, 
529                                self.clone());
530                            connecting.build_state = TunnelBuildState::ConnectStream(builder.clone());
531                            (None, Some(builder), None, None)
532                        }, 
533                        TunnelBuildState::ConnectStream(builder) => (None, None, Some(Box::new(builder.clone()) as Box<dyn TunnelBuilder>), None), 
534                        TunnelBuildState::AcceptStream(builder) => (None, None, Some(Box::new(builder.clone()) as Box<dyn TunnelBuilder>), None),
535                        TunnelBuildState::ConnectTunnel(builder) => (None, None, Some(Box::new(builder.clone()) as Box<dyn TunnelBuilder>), None), 
536                        TunnelBuildState::AcceptTunnel(builder) => (None, None, Some(Box::new(builder.clone()) as Box<dyn TunnelBuilder>), None)
537                    }
538                },
539                TunnelStateImpl::Dead(_) => {
540                    let builder = ConnectStreamBuilder::new(
541                        tunnel_impl.stack.clone(), 
542                        build_params, 
543                        stream,
544                        self.clone());
545
546                    state.tunnel_state = TunnelStateImpl::Connecting(TunnelConnectingState {
547                        waiter: StateWaiter::new(), 
548                        build_state: TunnelBuildState::ConnectStream(builder.clone()), 
549                        packages: LinkedList::new()
550                    });
551
552                    (None, Some(builder), None, None)
553                }
554            }
555        };
556        if let Some(tunnels) = tunnels {
557            for tunnel in tunnels {
558                tunnel.as_ref().reset();
559            }
560        }
561        if let Some(selector) = selector {
562            Ok(selector)
563        } else if let Some(builder) = exists_builder {
564            // 如果buidler失败了,都返回错误
565            builder.wait_establish().await?;
566            let state = self.0.state.read().unwrap();
567            match &state.tunnel_state {
568                TunnelStateImpl::Active(active) => {
569                    Self::select_stream_connector_by_exists(active.remote_timestamp, &state.tunnel_entries)
570                        .ok_or_else(|| {
571                            error!("{} active but no exists connector", self);
572                            BuckyError::new(BuckyErrorCode::ErrorState, "tunnel's dead")
573                        })
574                }, 
575                _ => {
576                    Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel's dead"))
577                }
578            }
579        } else if let Some(builder) = new_builder {
580            self.stack().keystore().reset_peer(self.remote());
581            self.stack().device_cache().remove_inner(self.remote());
582            self.sync_connecting();
583            Ok(StreamConnectorSelector::Builder(builder))
584        } else {
585            unreachable!()
586        } 
587    } 
588
589    pub(crate) fn payload_size(&self) -> usize {
590        1024 // FIXME:先写固定值,一般没连通时需要先ExchangeKey/Syn...,负载会比较小,连通时负载会比较大
591    }
592
593    pub fn reset(&self) {
594        let (tunnels, waiter) = {
595            let mut state = self.0.state.write().unwrap();
596            let (waiter, updated) = match &mut state.tunnel_state {
597                TunnelStateImpl::Connecting(connecting) => {
598                    let mut waiter = StateWaiter::new();
599                    connecting.waiter.transfer_into(&mut waiter);
600                    state.tunnel_state = TunnelStateImpl::Dead(TunnelDeadState {
601                        former_state: TunnelState::Connecting, 
602                        when: bucky_time_now()
603                    });
604                    (Some(waiter), true)
605                }, 
606                TunnelStateImpl::Active(active) => {
607                    state.tunnel_state = TunnelStateImpl::Dead(TunnelDeadState {
608                        former_state: TunnelState::Active(active.remote_timestamp), 
609                        when: bucky_time_now()
610                    });
611                    (None, true)
612                }, 
613                TunnelStateImpl::Dead(_) => {
614                    (None, false)
615                }
616            };
617            if updated {
618                state.last_update = bucky_time_now();
619            }
620            let mut tunnel_entries = BTreeMap::new();
621            std::mem::swap(&mut tunnel_entries, &mut state.tunnel_entries);
622            let tunnels: Vec<DynamicTunnel> = tunnel_entries.into_iter().map(|(_, tunnel)| tunnel).collect();
623            (tunnels, waiter)
624        };
625        for tunnel in tunnels {
626            tunnel.as_ref().reset();
627        }
628        if let Some(waiter) = waiter {
629            waiter.wake();
630        }
631    }
632
633    pub(crate) fn mark_dead(&self, active_timestamp: Timestamp, last_update: Timestamp) -> BuckyResult<()> {
634        info!("{} mark dead with active timestamp {} last_update {}", self, active_timestamp, last_update);
635        let tunnels: Vec<DynamicTunnel> = {
636            let mut state = self.0.state.write().unwrap();
637            if state.last_update > last_update {
638                info!("{} ignore mark dead for updated", self);
639                Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel has updated"))
640            } else {
641                match &mut state.tunnel_state {
642                    TunnelStateImpl::Connecting(_) => {
643                        Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel's connecting"))
644                    }, 
645                    TunnelStateImpl::Active(active) => {
646                        if active.default_tunnel.as_ref().local().is_tcp() {
647                            info!("{} ignore mark dead for tcp default", self);
648                            Err(BuckyError::new(BuckyErrorCode::ErrorState, "default tcp tunnel"))
649                        } else {
650                            let cur_timestamp = active.remote_timestamp;
651                            if cur_timestamp == active_timestamp {
652                                info!("{} Active({})=>Dead", self, cur_timestamp);
653                                state.last_update = bucky_time_now();
654                                state.tunnel_state = TunnelStateImpl::Dead(TunnelDeadState {
655                                    former_state: TunnelState::Active(cur_timestamp), 
656                                    when: bucky_time_now()
657                                });
658                                let mut tunnel_entries = BTreeMap::new();
659                                std::mem::swap(&mut tunnel_entries, &mut state.tunnel_entries);
660                                Ok(tunnel_entries.into_iter().map(|(_, tunnel)| tunnel).collect())
661                            } else {
662                                Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel's active"))
663                            }
664                        }
665                    }, 
666                    TunnelStateImpl::Dead(_) => Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel's dead"))
667                }
668            }
669        }?;
670        for tunnel in tunnels {
671            tunnel.as_ref().reset();
672        }
673        Ok(())
674    }
675
676    pub(super) fn on_raw_data(&self, data: &[u8], tunnel: DynamicTunnel) -> BuckyResult<()> {
677        let tunnel_impl = &self.0;
678        let (cmd_code, buf) = u8::raw_decode(data)?;
679        let cmd_code = PackageCmdCode::try_from(cmd_code)?;
680        match cmd_code {
681            PackageCmdCode::Datagram => {
682                let (pkg, _) = Datagram::raw_decode_with_context(buf, &mut merge_context::OtherDecode::default())?;
683                let _ = Stack::from(&tunnel_impl.stack).datagram_manager().on_package(&pkg, (self, true));
684                Ok(())
685            },
686            PackageCmdCode::SessionData => unimplemented!(), 
687            _ => {
688                Stack::from(&tunnel_impl.stack).ndn().channel_manager().on_raw_data(data, (self, tunnel))
689            }, 
690        }
691    }
692
693
694    
695    // 注意R端的打洞包要用SynTunnel不能用AckTunnel
696    // 因为可能出现如下时序:L端收到R端的打洞包,停止继续发送打洞包;但是R端没有收到L端的打洞包,继续发送打洞包;
697    //   如果R端发的是AckTunnel,L端收到之后不会回复;如果L端改成对AckTunnel回复SynTunnel/AckTunnel都不合适,会导致循环回复
698    //   R端发SynTunnel的话,L端收到之后可以回复复AckTunnel 
699    // pub(super) fn syn_tunnel_package(&self, syn_tunnel: &SynTunnel, local: Device) -> SynTunnel {
700    //     SynTunnel {
701    //         protocol_version: self.protocol_version(), 
702    //         stack_version: self.stack_version(), 
703    //         from_device_id: local.desc().device_id(),
704    //         to_device_id: syn_tunnel.from_device_id.clone(),
705    //         sequence: syn_tunnel.sequence,
706    //         from_device_desc: local,
707    //         send_time: 0
708    //     }
709    // }
710
711    // pub(super) fn ack_tunnel_package(&self, syn_tunnel: &SynTunnel, local: Device) -> AckTunnel {
712    //     AckTunnel {
713    //         protocol_version: self.protocol_version(), 
714    //         stack_version: self.stack_version(), 
715    //         sequence: syn_tunnel.sequence,
716    //         result: 0,
717    //         send_time: 0,
718    //         mtu: c::MTU as u16,
719    //         to_device_desc: local       
720    //     }
721    // }
722}
723
724impl fmt::Display for TunnelContainer {
725    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
726        write!(f, "TunnelContainer{{local:{}, remote:{}}}",
727            Stack::from(&self.0.stack).local_device_id(), self.remote())
728    }
729}
730
731
732impl TunnelOwner for TunnelContainer {
733    fn sync_tunnel_state(&self, tunnel: &DynamicTunnel, former_state: TunnelState, new_state: TunnelState) {
734        //TODO: 这里的策略可以调整
735        struct NextStep {
736            old_default: Option<DynamicTunnel>, 
737            new_default: Option<DynamicTunnel>, 
738            reset_tunnels: LinkedList<DynamicTunnel>, 
739            waiters: StateWaiter, 
740            packages: LinkedList<(DynamicPackage, bool)>
741        }
742
743        let mut next_step = NextStep {
744            old_default: None, 
745            new_default: None, 
746            reset_tunnels: LinkedList::new(), 
747            waiters: StateWaiter::new(), 
748            packages: LinkedList::new()
749        };
750        match new_state {
751            TunnelState::Connecting => {
752                unreachable!()
753            }, 
754            TunnelState::Active(remote_timestamp) => {
755                let mut state = self.0.state.write().unwrap();
756                // 先从entries里面移除
757                let entries = &mut state.tunnel_entries;
758                let ep_pair = EndpointPair::from((*tunnel.as_ref().local(), *tunnel.as_ref().remote()));
759                let exists = {
760                    if let Some(stub) = entries.get(&ep_pair) {
761                        stub.as_ref().ptr_eq(tunnel) 
762                    } else {
763                        false
764                    }
765                };
766                let mut to_reset = vec![];
767
768                if exists {
769                    for (remote, tunnel) in &state.tunnel_entries {
770                        if let TunnelState::Active(active_timestamp) = tunnel.as_ref().state() {
771                            if active_timestamp < remote_timestamp {
772                                to_reset.push(remote.clone());
773                            } 
774                        }
775                    } 
776                    for remote in to_reset {
777                        next_step.reset_tunnels.push_back(state.tunnel_entries.remove(&remote).unwrap());
778                    }
779                    
780                    let updated = match &mut state.tunnel_state {
781                        TunnelStateImpl::Active(active) => {
782                            // 如果当前激活的tunnel 属于更新的对端Endpoints
783                            let remote_updated = active.remote_timestamp < remote_timestamp;
784                            let change_default = remote_updated || {
785                                if ProxyType::None != active.default_tunnel.as_ref().proxy() {
786                                    // 非代理优先
787                                    ProxyType::None == tunnel.as_ref().proxy()
788                                } else {
789                                    // 单纯的 udp 优先
790                                    tunnel.as_ref().local().is_udp() && active.default_tunnel.as_ref().local().is_tcp()
791                                    // 主动tcp 优先
792                                    // TODO: 简单的主动tcp 优先策略;因为存在如下时序, LN 只有tcp ep 没有udp ep;RN 有tcp ep 和 udp ep; LN 和 RN 在同内网,LN向RN发起stream connect,
793                                    //          在特定时序下, LN 和 RN 都会选择被动tcp 路径作为 default tunnel; RN向LN 发起stream connect,反连tcp 流程第一步向 SN call LN, 
794                                    //          但是LN没有udp ep,返回错误,进入错误的tunnel dead状态;
795                                    //       但是在其他情况, 这个策略可能导致同内网的peer对之间保持一条冗余的tcp 长连接;
796                                        || tunnel.as_ref().local().is_tcp() && tunnel.as_ref().remote().addr().port() != 0 && active.default_tunnel.as_ref().remote().addr().port() == 0
797                                }
798                            };
799                            if change_default {
800                                info!("{} change default from {} to {}", self, active.default_tunnel.as_ref().as_ref(), tunnel.as_ref().as_ref());
801                                next_step.old_default = Some(active.default_tunnel.clone());
802                                active.remote_timestamp = remote_timestamp;
803                                active.default_tunnel = tunnel.clone();
804                                next_step.new_default = Some(tunnel.clone());
805                            } 
806                            change_default
807                        }, 
808                        TunnelStateImpl::Connecting(connecting) => {
809                            info!("{} connecting=>active with default {}", self, tunnel.as_ref().as_ref());
810                            connecting.waiter.transfer_into(&mut next_step.waiters);
811
812                            mem::swap(&mut next_step.packages, &mut connecting.packages);
813
814                            state.tunnel_state = TunnelStateImpl::Active(TunnelActiveState {
815                                default_tunnel: tunnel.clone(), 
816                                remote_timestamp: remote_timestamp
817                            });
818
819                            next_step.new_default = Some(tunnel.clone());
820
821                            true
822                        },
823                        TunnelStateImpl::Dead(_) => {
824                            info!("{} dead=>active with default {}", self, tunnel.as_ref().as_ref());
825                            state.tunnel_state = TunnelStateImpl::Active(TunnelActiveState {
826                                default_tunnel: tunnel.clone(), 
827                                remote_timestamp: remote_timestamp
828                            });
829                            next_step.new_default = Some(tunnel.clone());
830                            true
831                        }
832                    };
833                    if updated {
834                        state.last_update = bucky_time_now();
835                    }
836                } else {
837                    warn!("{} reset tunnel {} for not in ep map", self, tunnel.as_ref().as_ref());
838                    next_step.reset_tunnels.push_back(tunnel.clone());
839                }
840            }, 
841            TunnelState::Dead => {
842                let state = &mut *self.0.state.write().unwrap();
843                // 先从entries里面移除
844                let entries = &mut state.tunnel_entries;
845                let ep_pair = EndpointPair::from((*tunnel.as_ref().local(), *tunnel.as_ref().remote()));
846                let exists = {
847                    if let Some(stub) = entries.get(&ep_pair) {
848                        if stub.as_ref().ptr_eq(tunnel) {
849                            info!("{} remove tunnel {}", self, tunnel.as_ref().as_ref());
850                            entries.remove(&ep_pair);
851                            true
852                        } else {
853                            false
854                        }
855                    } else {
856                        false
857                    }
858                };
859                
860                if exists {
861                    if let TunnelState::Active(remote_timestamp) = former_state {
862                        match &state.tunnel_state {
863                            TunnelStateImpl::Active(active) => {
864                                if active.remote_timestamp == remote_timestamp {
865                                    let default_tunnel = active.default_tunnel.clone();
866                                    info!("{} active=>dead for tunnel {} dead", self, tunnel.as_ref().as_ref());
867                                    for (_, tunnel) in &state.tunnel_entries {
868                                        next_step.reset_tunnels.push_back(tunnel.clone());
869                                    }
870                                    state.tunnel_entries.clear();
871                                    state.last_update = bucky_time_now();
872                                    state.tunnel_state = TunnelStateImpl::Dead(TunnelDeadState {
873                                        former_state: TunnelState::Active(active.remote_timestamp), 
874                                        when: bucky_time_now()
875                                    });
876                                    next_step.old_default = Some(default_tunnel);
877                                }
878                            }, 
879                            _ => {
880                                // do nothing
881                            }
882                        }
883                    } 
884                } 
885            }
886        };
887        next_step.waiters.wake();
888        if let Some(old) = next_step.old_default {
889            old.as_ref().release_keeper();
890        }
891        if let Some(new) = next_step.new_default {
892            new.as_ref().retain_keeper();
893        }
894
895        for tunnel in next_step.reset_tunnels {
896            tunnel.as_ref().reset();
897        }
898
899        for (package, plaintext) in next_step.packages {
900            let _ = self.send_package(package, plaintext);
901        }
902    }
903
904    fn clone_as_tunnel_owner(&self) -> Box<dyn TunnelOwner> {
905        Box::new(self.clone())
906    }
907}
908
909impl OnUdpPackageBox for TunnelContainer {
910    fn on_udp_package_box(&self, udp_box: interface::udp::UdpPackageBox) -> Result<(), BuckyError> {
911        // 先创建 udp tunnel
912        let ep_pair = EndpointPair::from((udp_box.local().local(), *udp_box.remote()));
913        let udp_tunnel = match self.tunnel_of::<udp::Tunnel>(&ep_pair) {
914            Some(tunnel) => {
915                Ok(tunnel)
916            }, 
917            None => self.create_tunnel::<udp::Tunnel>(ep_pair, ProxyType::None).map(|(t, _)| t)
918        }?;
919        // 为了udp 和 tcp tunnel的package 流向一致,直接把box转给udp tunnel,
920        // 需要一致处理的package从udp/tcp tunnel回调container的 OnPackage
921        udp_tunnel.on_udp_package_box(udp_box)
922    }
923}
924
925impl OnUdpRawData<(interface::udp::Interface, DeviceId, MixAesKey, Endpoint)> for TunnelContainer {
926    fn on_udp_raw_data(&self, data: &[u8], context: (interface::udp::Interface, DeviceId, MixAesKey, Endpoint)) -> Result<(), BuckyError> {
927        // // 先创建 udp tunnel
928        let (interface, _, key, remote) = context;
929        let ep_pair = EndpointPair::from((interface.local(), remote));
930        let udp_tunnel = match self.tunnel_of::<udp::Tunnel>(&ep_pair) {
931            Some(tunnel) => {
932                Ok(tunnel)
933            }, 
934            None => self.create_tunnel::<udp::Tunnel>(ep_pair, ProxyType::None).map(|(t, _)| t)
935        }?;
936        // 为了udp 和 tcp tunnel的package 流向一致,直接把box转给udp tunnel,
937        // 需要一致处理的package从udp/tcp tunnel回调container的 OnPackage
938        let _ = udp_tunnel.active(&key, false, None);
939        self.on_raw_data(data, DynamicTunnel::new(udp_tunnel))
940    }
941}
942
943impl OnTcpInterface for TunnelContainer {
944    fn on_tcp_interface(&self, interface: interface::tcp::AcceptInterface, first_box: PackageBox) -> Result<OnPackageResult, BuckyError> {
945        // 创建tcp tunnel
946        let ep_pair = EndpointPair::from((*interface.local(), Endpoint::default_tcp(interface.local())));
947        let tcp_tunnel = match self.tunnel_of::<tcp::Tunnel>(&ep_pair) {
948            Some(tunnel) => {
949                Ok(tunnel)
950            }, 
951            None => self.create_tunnel::<tcp::Tunnel>(ep_pair, ProxyType::None).map(|(t, _)| t)
952        }?;
953        // 为了udp 和 tcp tunnel的package 流向一致,直接把box转给tcp tunnel,
954        // 需要一致处理的package从udp/tcp tunnel回调container的 OnPackage
955        tcp_tunnel.on_tcp_interface(interface, first_box)
956    }
957}
958
959impl PingClientCalledEvent<PackageBox> for TunnelContainer {
960    fn on_called(&self, called: &SnCalled, caller_box: PackageBox) -> Result<(), BuckyError> {
961        let syn_tunnel: &SynTunnel = caller_box.packages_no_exchange()[0].as_ref();
962        let remote_timestamp = syn_tunnel.from_device_desc.body().as_ref().unwrap().update_time();
963        let _ = self.on_package(syn_tunnel, None)?;
964        if let Some(second_pkg) = caller_box.packages_no_exchange().get(1) {
965            match second_pkg.cmd_code() {
966                PackageCmdCode::SessionData => {
967                    let session_data: &SessionData = second_pkg.as_ref();
968                    if !session_data.is_syn() {
969                        debug!("{} ignore sn called for sesion data not syn", self);
970                        return Err(BuckyError::new(BuckyErrorCode::InvalidInput, "session data in sn called should has has syn flag"));
971                    }
972                    let _ = self.on_package(session_data, None)?;
973                    let remote_seq = RemoteSequence::from((self.remote().clone(), session_data.syn_info.as_ref().unwrap().sequence));
974                    let stream = Stack::from(&self.0.stack).stream_manager().stream_of_remote_sequence(&remote_seq);
975                    if stream.is_none() {
976                        debug!("{} ignore accept stream builder for stream of {} no more connecting", self, remote_seq);
977                        return Ok(());
978                    }
979                    let stream = stream.unwrap();
980                    let acceptor = stream.acceptor();
981                    if acceptor.is_none() {
982                        debug!("{} ignore accept stream builder for stream of {} no more connecting", self, remote_seq);
983                        return Ok(());
984                    }
985                    let acceptor = acceptor.unwrap();
986                    //把builder提升到tunnel container,并且开始build
987                    match {
988                        let mut state = self.0.state.write().unwrap();
989                        match &mut state.tunnel_state {
990                            TunnelStateImpl::Connecting(connecting) => {
991                                match &mut connecting.build_state {
992                                    TunnelBuildState::Idle => {
993                                        connecting.build_state = TunnelBuildState::AcceptStream(acceptor.clone()); 
994                                        Ok((vec![], None))
995                                    },
996                                    TunnelBuildState::ConnectStream(builder) => Ok((vec![], Some(Box::new(builder.clone()) as Box<dyn PingClientCalledEvent>))), 
997                                    TunnelBuildState::ConnectTunnel(builder) => Ok((vec![], Some(Box::new(builder.clone()) as Box<dyn PingClientCalledEvent>))), 
998                                    _ => Err(BuckyError::new(BuckyErrorCode::AlreadyExists, "another builder exists"))
999                                        // FIXME: how to ?
1000                                        // another builder exists
1001                                        // 1. if existing builder is passive, 
1002                                        //	means that 2 or more sn called package got from sn server before existing builder finish
1003                                        //		1.1	2nd sn called package has same sequence with 1st, usualy local peer send sn call package to more than one sn server, 
1004                                        //			or local peer retry building for same connection serveral times, 
1005                                        //			in this case, ignore this sn called package totally no problem
1006                                        //		1.2	2nd sn called package has different sequence with 1st, means 1st sn called package sent by connecting 1st connection, but failed,
1007                                        //			and then 2nd connection connect called caused another builder building for this connection; 
1008                                        //			or maybe local process exits before building finish and no history log written, 
1009                                        //			in this case, should cover builder with leatest sequence builder? 
1010                                        //			or wait existing builder finish? tcp stream can't got establish for local peer's connection instance doesn't exits, 
1011                                        //			that may cause a long lived mistake that no tcp tunnel can establish but acturely can; 
1012                                        //			we have to make sure reply all first ack tcp connection package even connecting connection doesn't exist to avoid this
1013                                        // 2. if existing builder is active,
1014                                        //		means that local and remote peer create active builder at just same time for connect connection or send package on tunnel, 
1015                                        //		in this case, we can ignore 2nd builder caused by getting sn called package, the reason is:
1016                                        //		2.1 if only udp tunnel exists between peers, both sides' builder will finish, 
1017                                        //			hole punched because both builders are sending syn tunnel package, and then session data reaches
1018                                        //		2.2 if tcp tunnel exits between peers, tcp interface can establish from at least one side, 
1019                                        //			connection can establish without builder, the other side's builder may fail, 
1020                                        //			but connection will use correct reverse stream connector in next retry
1021                                }
1022                            }, 
1023                            TunnelStateImpl::Active(active) => {
1024                                if active.remote_timestamp < remote_timestamp {
1025                                    state.last_update = bucky_time_now();
1026                                    state.tunnel_state = TunnelStateImpl::Connecting(TunnelConnectingState {
1027                                        waiter: StateWaiter::new(), 
1028                                        build_state: TunnelBuildState::AcceptStream(acceptor.clone()), 
1029                                        packages: LinkedList::new()
1030                                    });
1031                                    let mut tunnel_entries = BTreeMap::new();
1032                                    std::mem::swap(&mut tunnel_entries, &mut state.tunnel_entries);
1033                                    Ok((tunnel_entries.into_iter().map(|(_, tunnel)| tunnel).collect(), None))
1034                                } else {
1035                                    Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel's active"))
1036                                }
1037                            }, 
1038                            TunnelStateImpl::Dead(_) => {
1039                                state.last_update = bucky_time_now();
1040                                state.tunnel_state = TunnelStateImpl::Connecting(TunnelConnectingState {
1041                                    waiter: StateWaiter::new(), 
1042                                    build_state: TunnelBuildState::AcceptStream(acceptor.clone()), 
1043                                    packages: LinkedList::new()
1044                                });
1045                                Ok((vec![], None))
1046                            }   
1047                        }
1048                    } {
1049                        Err(err) => {
1050                            info!("{} ignore accept stream builder {} for {}", self, acceptor, err);
1051                        }, 
1052                        Ok((tunnels, builder)) => {
1053                            for tunnel in tunnels {
1054                                tunnel.as_ref().reset();
1055                            }
1056                            
1057                            if let Some(builder) = builder {
1058                                let _ = builder.on_called(called, ());
1059                            } else {
1060                                self.sync_connecting();
1061                                // 开始被动端的build
1062                                let _ = acceptor.on_called(called, caller_box);
1063                            }
1064                        }
1065                    }
1066                }, 
1067                _ => {
1068                    //TODO: 支持在sn call带第一个 tunnel 包
1069                    unreachable!()
1070                }
1071            }
1072        } else if called.reverse_endpoint_array.len() > 0 {
1073            info!("{} called for reverse connect tunnel", self);
1074            // let stack = self.stack();
1075            // let net_listener = stack.net_manager().listener();
1076            // for local in net_listener.ip_set() {
1077                for remote in &called.reverse_endpoint_array {
1078                    let ep_pair = EndpointPair::from((Endpoint::default_tcp(remote), *remote));
1079                    let tunnel = self.create_tunnel::<tcp::Tunnel>(ep_pair, ProxyType::None);
1080                    if let Ok((tunnel, _)) = tunnel {
1081                        let _ = tunnel.connect();
1082                    }
1083                }
1084            // }
1085        } else {
1086            let acceptor = AcceptTunnelBuilder::new(self.0.stack.clone(), self.clone(), syn_tunnel.sequence);
1087            match {
1088                let mut state = self.0.state.write().unwrap();
1089                match &mut state.tunnel_state {
1090                    TunnelStateImpl::Connecting(connecting) => {
1091                        match &mut connecting.build_state {
1092                            TunnelBuildState::Idle => {
1093                                connecting.build_state = TunnelBuildState::AcceptTunnel(acceptor.clone());
1094                                Ok((vec![], None))
1095                            },
1096                            TunnelBuildState::ConnectStream(builder) => Ok((vec![], Some(Box::new(builder.clone()) as Box<dyn PingClientCalledEvent>))), 
1097                            TunnelBuildState::ConnectTunnel(builder) => Ok((vec![], Some(Box::new(builder.clone()) as Box<dyn PingClientCalledEvent>))), 
1098                            _ => Err(BuckyError::new(BuckyErrorCode::AlreadyExists, "another builder exists"))
1099                        }
1100                    }, 
1101                    TunnelStateImpl::Active(active) => {
1102                        if active.remote_timestamp < remote_timestamp {
1103                            state.last_update = bucky_time_now();
1104                            state.tunnel_state = TunnelStateImpl::Connecting(TunnelConnectingState {
1105                                waiter: StateWaiter::new(), 
1106                                build_state: TunnelBuildState::AcceptTunnel(acceptor.clone()), 
1107                                packages: LinkedList::new()
1108                            });
1109                            let mut tunnel_entries = BTreeMap::new();
1110                            std::mem::swap(&mut tunnel_entries, &mut state.tunnel_entries);
1111                            Ok((tunnel_entries.into_iter().map(|(_, tunnel)| tunnel).collect(), None))
1112                        } else {
1113                            Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel's active"))
1114                        }
1115                    }, 
1116                    TunnelStateImpl::Dead(_) => {
1117                        state.last_update = bucky_time_now();
1118                        state.tunnel_state = TunnelStateImpl::Connecting(TunnelConnectingState {
1119                            waiter: StateWaiter::new(), 
1120                            build_state: TunnelBuildState::AcceptTunnel(acceptor.clone()), 
1121                            packages: LinkedList::new()
1122                        });
1123                        Ok((vec![], None))
1124                    }   
1125                }
1126            } {
1127                Ok((tunnels, builder)) => {
1128                    for tunnel in tunnels {
1129                        tunnel.as_ref().reset();
1130                    }
1131                    if let Some(builder) = builder {
1132                        let _ = builder.on_called(called, ());
1133                    } else {
1134                        let active_pn_list = called.active_pn_list.clone();
1135                        self.sync_connecting();
1136                        // 开始被动端的build
1137                        task::spawn(async move {
1138                            let _ = acceptor.build(caller_box, active_pn_list).await;
1139                        });
1140                    }
1141                }, 
1142                Err(err) => {
1143                    debug!("{} ignore accept tunnel builder {} for {}", self, acceptor, err);
1144                }
1145            }
1146        }
1147        Ok(())
1148    }
1149}
1150
1151impl OnPackage<SynTunnel> for TunnelContainer {
1152    fn on_package(&self, pkg: &SynTunnel, _: Option<()>) -> Result<OnPackageResult, BuckyError> {
1153        // 缓存syn tunnel里面的 desc
1154        Stack::from(&self.0.stack).device_cache().add(&pkg.from_device_desc.desc().device_id(), &pkg.from_device_desc);
1155        Ok(OnPackageResult::Handled)
1156    }
1157}
1158
1159impl OnPackage<AckTunnel> for TunnelContainer {
1160    fn on_package(&self, pkg: &AckTunnel, _: Option<()>) -> Result<OnPackageResult, BuckyError> {
1161        // 缓存ack tunnel里面的 desc
1162        Stack::from(&self.0.stack).device_cache().add(self.remote(), &pkg.to_device_desc);
1163        Ok(OnPackageResult::Handled)
1164    }
1165}
1166
1167impl OnPackage<TcpSynConnection, interface::tcp::AcceptInterface> for TunnelContainer {
1168    fn on_package(&self, pkg: &TcpSynConnection, interface: interface::tcp::AcceptInterface) -> Result<OnPackageResult, BuckyError> {
1169        // 缓存ack tunnel里面的 desc
1170        let tunnel_impl = &self.0;
1171        Stack::from(&tunnel_impl.stack).device_cache().add(&self.remote(), &pkg.from_device_desc);
1172        // 丢给 stream manager
1173        Stack::from(&tunnel_impl.stack).stream_manager().on_package(pkg, (self, interface))
1174            .map_err(|err| {
1175                debug!("{} handle package {} error {}", self, pkg, err);
1176                err
1177            })
1178    }
1179}
1180
1181impl OnPackage<TcpAckConnection, interface::tcp::AcceptInterface> for TunnelContainer {
1182    fn on_package(&self, pkg: &TcpAckConnection, interface: interface::tcp::AcceptInterface) -> Result<OnPackageResult, BuckyError> {
1183        // 缓存ack tunnel里面的 desc
1184        let tunnel_impl = &self.0;
1185        Stack::from(&tunnel_impl.stack).device_cache().add(&self.remote(), &pkg.to_device_desc);
1186        // 丢给 stream manager
1187        Stack::from(&tunnel_impl.stack).stream_manager().on_package(pkg, (self, interface))
1188            .map_err(|err| {
1189                debug!("{} handle package {} error {}", self, pkg, err);
1190                err
1191            })
1192    }
1193}
1194
1195impl OnPackage<Datagram> for TunnelContainer {
1196    fn on_package(&self, pkg: &Datagram, _: Option<()>) -> Result<OnPackageResult, BuckyError> {
1197        Stack::from(&self.0.stack).datagram_manager().on_package(pkg, (self, false))
1198            .map_err(|err| {
1199                debug!("{} handle package {} error {}", self, pkg, err);
1200                err
1201            })
1202    }
1203}
1204
1205impl OnPackage<SessionData> for TunnelContainer {
1206    fn on_package(&self, pkg: &SessionData, _: Option<()>) -> Result<OnPackageResult, BuckyError> {
1207        let tunnel_impl = &self.0;
1208        //丢给 stream manager
1209        Stack::from(&tunnel_impl.stack).stream_manager().on_package(pkg, self)
1210            .map_err(|err| {
1211                debug!("{} handle package {} error {}", self, pkg, err);
1212                err
1213            })
1214    }
1215}
1216
1217impl OnPackage<TcpSynConnection> for TunnelContainer {
1218    fn on_package(&self, pkg: &TcpSynConnection, _: Option<()>) -> Result<OnPackageResult, BuckyError> {
1219        let tunnel_impl = &self.0;
1220        //丢给 stream manager
1221        Stack::from(&tunnel_impl.stack).stream_manager().on_package(pkg, self)
1222            .map_err(|err| {
1223                debug!("{} handle package {} error {}", self, pkg, err);
1224                err
1225            })
1226    }
1227}
1228
1229impl OnPackage<AckProxy, &DeviceId> for TunnelContainer {
1230    fn on_package(&self, pkg: &AckProxy, proxy: &DeviceId) -> Result<OnPackageResult, BuckyError> {
1231        let tunnel_impl = &self.0;
1232        let builder = if let TunnelStateImpl::Connecting(connecting) = &tunnel_impl.state.read().unwrap().tunnel_state {
1233            match &connecting.build_state {
1234                TunnelBuildState::ConnectStream(builder) => Some(Box::new(builder.clone()) as Box<dyn OnPackage<AckProxy, &DeviceId>>), 
1235                TunnelBuildState::ConnectTunnel(builder) => Some(Box::new(builder.clone()) as Box<dyn OnPackage<AckProxy, &DeviceId>>), 
1236                TunnelBuildState::AcceptStream(builder) => Some(Box::new(builder.clone()) as Box<dyn OnPackage<AckProxy, &DeviceId>>), 
1237                TunnelBuildState::AcceptTunnel(builder) => Some(Box::new(builder.clone()) as Box<dyn OnPackage<AckProxy, &DeviceId>>), 
1238                TunnelBuildState::Idle => None
1239            }.ok_or_else(|| BuckyError::new(BuckyErrorCode::ErrorState, "no builder"))
1240        } else {
1241            Err(BuckyError::new(BuckyErrorCode::ErrorState, "not connecting"))
1242        }.map_err(|err| {
1243            debug!("{} ignore ack proxy from {} for {}", self, proxy, err);
1244            err
1245        })?;
1246        builder.on_package(pkg, proxy)
1247    }
1248}
1249
1250struct ContainerRef(TunnelContainer);
1251
1252#[derive(Clone)]
1253pub struct TunnelGuard(Arc<ContainerRef>);
1254
1255#[derive(Clone)]
1256pub struct WeakTunnelGuard(Weak<ContainerRef>);
1257
1258impl WeakTunnelGuard {
1259    pub fn to_strong(&self) -> Option<TunnelGuard> {
1260        self.0.upgrade().map(|s| TunnelGuard(s))
1261    }
1262}
1263
1264impl TunnelGuard {
1265    pub(super) fn new(tunnel: TunnelContainer) -> Self {
1266        Self(Arc::new(ContainerRef(tunnel)))
1267    }
1268
1269    pub fn to_weak(&self) -> WeakTunnelGuard {
1270        WeakTunnelGuard(Arc::downgrade(&self.0))
1271    }
1272
1273    pub fn ref_count(&self) -> usize {
1274        Arc::strong_count(&self.0)
1275    }
1276}
1277
1278impl Drop for ContainerRef {
1279    fn drop(&mut self) {
1280        self.0.reset();
1281    }
1282}
1283
1284impl Deref for TunnelGuard {
1285    type Target = TunnelContainer;
1286    fn deref(&self) -> &TunnelContainer {
1287        &self.0.0
1288    }
1289}
1290
1291impl AsRef<TunnelContainer> for TunnelGuard {
1292    fn as_ref(&self) -> &TunnelContainer {
1293        &self.0.0
1294    }
1295}