cyfs_bdt/tunnel/builder/
accept_tunnel.rs

1use log::*;
2use std::{
3    sync::RwLock
4};
5use async_std::{sync::{Arc}, task};
6use async_trait::{async_trait};
7use cyfs_base::*;
8use crate::{
9    types::*, 
10    protocol::{*, v0::*},
11    tunnel::{TunnelState, ProxyType, TunnelContainer}, 
12    stack::{Stack, WeakStack}
13};
14use super::{
15    action::*, 
16    builder::*, 
17    proxy::*
18};
19
20
21struct ConnectingState {
22    waiter: StateWaiter, 
23    proxy: Option<ProxyBuilder>
24}
25
26enum AcceptTunnelBuilderState {
27    Connecting(ConnectingState), 
28    Establish, 
29    Closed
30}
31
32struct AcceptTunnelBuilderImpl {
33    stack: WeakStack, 
34    tunnel: TunnelContainer, 
35    sequence: TempSeq, 
36    state: RwLock<AcceptTunnelBuilderState>
37}
38
39#[derive(Clone)]
40pub struct AcceptTunnelBuilder(Arc<AcceptTunnelBuilderImpl>);
41
42impl std::fmt::Display for AcceptTunnelBuilder {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        write!(f, "AcceptTunnelBuilder{{tunnel:{}}}", self.0.tunnel)
45    }
46}
47
48impl AcceptTunnelBuilder {
49    pub fn new(stack: WeakStack, tunnel: TunnelContainer, sequence: TempSeq) -> Self {
50        Self(Arc::new(AcceptTunnelBuilderImpl {
51            stack, 
52            tunnel, 
53            sequence, 
54            state: RwLock::new(AcceptTunnelBuilderState::Connecting(ConnectingState {
55                waiter: StateWaiter::new(), 
56                proxy: None
57            }))
58        }))
59    }
60
61    pub async fn build(&self, caller_box: PackageBox, active_pn_list: Vec<DeviceId>) -> Result<(), BuckyError> {
62        info!("{} build", self);
63        self.sync_tunnel_state();
64        {
65            let stack = Stack::from(&self.0.stack);
66            let local = stack.sn_client().ping().default_local();
67            let syn_tunnel: &SynTunnel = caller_box.packages_no_exchange()[0].as_ref();           
68            // first box 包含 ack tunnel 和 session data
69            let tunnel = &self.0.tunnel;
70            let ack_tunnel = SynTunnel {
71                protocol_version: tunnel.protocol_version(), 
72                stack_version: tunnel.stack_version(), 
73                to_device_id: syn_tunnel.from_device_desc.desc().device_id(),
74                sequence: syn_tunnel.sequence,
75                from_device_desc: local,
76                send_time: 0
77            };
78            let mut first_box = PackageBox::encrypt_box(caller_box.remote().clone(), caller_box.key().clone());
79            first_box.append(vec![DynamicPackage::from(ack_tunnel)]);
80            let first_box = Arc::new(first_box);
81
82            info!("{} build with key {}", self, first_box.key());
83            let _ = self.explore_endpoint_pair(&syn_tunnel.from_device_desc, first_box.clone(), |_| true);
84
85            if let Some(proxy_builder) = {
86                let state = &mut *self.0.state.write().unwrap();
87                match state {
88                    AcceptTunnelBuilderState::Connecting(connecting) => {
89                        if connecting.proxy.is_none() {
90                            connecting.proxy = Some(ProxyBuilder::new(
91                                self.0.tunnel.clone(), 
92                                syn_tunnel.from_device_desc.get_obj_update_time(),  
93                                first_box.clone()));
94                            debug!("{} create proxy buidler", self);
95                        }
96                        connecting.proxy.clone()
97                    },
98                    _ => None
99                }
100            } {
101                for proxy in active_pn_list {
102                    let _ = proxy_builder.syn_proxy(ProxyType::Active(proxy)).await;
103                }
104                for proxy in stack.proxy_manager().passive_proxies() {
105                    let _ = proxy_builder.syn_proxy(ProxyType::Passive(proxy)).await;
106                }
107            } else {
108                debug!("{} ignore proxy build for not connecting", self);
109            }
110
111            Ok(())
112        }.map_err(|e| {info!("{} ingnore build for {}", self, e);e})
113    }
114
115    fn sync_tunnel_state(&self) {
116        let builder = self.clone();
117        task::spawn(async move {
118            let tunnel_state = builder.0.tunnel.wait_active().await;
119            let waiter = match tunnel_state {
120                TunnelState::Active(_) => {
121                    let state = &mut *builder.0.state.write().unwrap();
122                    match state {
123                        AcceptTunnelBuilderState::Connecting(connecting) => {
124                            info!("{} connecting=>establish", builder);
125                            let mut ret_waiter = StateWaiter::new();
126                            connecting.waiter.transfer_into(&mut ret_waiter);
127                            *state = AcceptTunnelBuilderState::Establish;
128                            Some(ret_waiter)
129                        }, 
130                        AcceptTunnelBuilderState::Closed => {
131                            //存在closed之后tunnel联通的情况,忽略
132                            None
133                        }, 
134                        AcceptTunnelBuilderState::Establish => {
135                            unreachable!()
136                        }
137                    }
138                }, 
139                TunnelState::Dead => {
140                    let state = &mut *builder.0.state.write().unwrap();
141                    match state {
142                        AcceptTunnelBuilderState::Connecting(connecting) => {
143                            info!("{} connecting=>dead", builder);
144                            let mut ret_waiter = StateWaiter::new();
145                            connecting.waiter.transfer_into(&mut ret_waiter);
146                            *state = AcceptTunnelBuilderState::Closed;
147                            Some(ret_waiter)
148                        }, 
149                        AcceptTunnelBuilderState::Closed => {
150                            //存在closed之后tunnel dead的情况,忽略
151                            None
152                        }, 
153                        AcceptTunnelBuilderState::Establish => {
154                            //存在establish之后tunnel dead的情况,忽略
155                            None
156                        }
157                    }
158                }, 
159                TunnelState::Connecting => {
160                    unreachable!()
161                }
162            };
163            if let Some(waiter) = waiter {
164                waiter.wake();
165            }
166        });
167    }
168
169
170    fn explore_endpoint_pair<F: Fn(&Endpoint) -> bool>(&self, remote: &Device, first_box: Arc<PackageBox>, filter: F) -> Vec<DynBuildTunnelAction> {
171        let stack = Stack::from(&self.0.stack);
172        let tunnel = &self.0.tunnel;
173        let net_listener = stack.net_manager().listener();
174
175        let mut actions = vec![];
176
177        let connect_info = remote.connect_info();
178        for udp_interface in net_listener.udp() {
179            for remote_ep in connect_info.endpoints().iter().filter(|ep| ep.is_udp() && ep.is_same_ip_version(&udp_interface.local()) && (ep.addr().is_ipv6() || (ep.addr().is_ipv4() && filter(ep)))) {
180                if let Ok((udp_tunnel, newly_created)) = tunnel.create_tunnel(EndpointPair::from((udp_interface.local(), *remote_ep)), ProxyType::None) {
181                    if newly_created {
182                        let action = SynUdpTunnel::new(
183                            udp_tunnel, 
184                            first_box.clone(), 
185                            tunnel.config().udp.holepunch_interval); 
186                        actions.push(Box::new(action) as DynBuildTunnelAction);
187                    }
188                }      
189            }
190        }
191
192        // for local_ip in net_listener.ip_set() {
193            for remote_ep in connect_info.endpoints().iter().filter(|ep| ep.is_tcp() && (ep.addr().is_ipv6() || (ep.addr().is_ipv4() && filter(ep)))) {
194                if let Ok((tunnel, newly_created)) = tunnel.create_tunnel(EndpointPair::from((Endpoint::default_tcp(remote_ep), *remote_ep)), ProxyType::None) {
195                    if newly_created {
196                        let action = ConnectTcpTunnel::new(tunnel);
197                        actions.push(Box::new(action) as DynBuildTunnelAction);
198                    }
199                }    
200            }  
201        // }
202
203        actions
204    }  
205}
206
207#[async_trait]
208impl TunnelBuilder for AcceptTunnelBuilder {
209    fn sequence(&self) -> TempSeq {
210        self.0.sequence
211    }
212    fn state(&self) -> TunnelBuilderState {
213        match &*self.0.state.read().unwrap() {
214            AcceptTunnelBuilderState::Connecting(_) => TunnelBuilderState::Connecting, 
215            AcceptTunnelBuilderState::Establish => TunnelBuilderState::Establish,
216            AcceptTunnelBuilderState::Closed => TunnelBuilderState::Closed
217        }
218    }
219    async fn wait_establish(&self) -> Result<(), BuckyError> {
220        let (state, waiter) = {
221            let state = &mut *self.0.state.write().unwrap();
222            match state {
223                AcceptTunnelBuilderState::Connecting(connecting) => {
224                    (TunnelBuilderState::Connecting, Some(connecting.waiter.new_waiter()))
225                },
226                AcceptTunnelBuilderState::Establish => {
227                    (TunnelBuilderState::Establish, None)
228                },
229                AcceptTunnelBuilderState::Closed => {
230                    (TunnelBuilderState::Closed, None)
231                }
232            }
233        };
234        match if let Some(waiter) = waiter {
235            StateWaiter::wait(waiter, | | self.state()).await
236        } else {
237            state
238        } {
239            TunnelBuilderState::Establish => Ok(()), 
240            TunnelBuilderState::Closed => Err(BuckyError::new(BuckyErrorCode::Failed, "builder failed")),
241            TunnelBuilderState::Connecting => unreachable!()
242        }
243    }
244}
245
246impl OnPackage<AckProxy, &DeviceId> for AcceptTunnelBuilder {
247    fn on_package(&self, pkg: &AckProxy, proxy: &DeviceId) -> Result<OnPackageResult, BuckyError> {
248        if let Some(proxy_builder) = match &*self.0.state.read().unwrap() {
249            AcceptTunnelBuilderState::Connecting(connecting) => connecting.proxy.clone(),
250            _ => None
251        } {
252            proxy_builder.on_package(pkg, proxy)
253        } else {
254            let err = BuckyError::new(BuckyErrorCode::ErrorState, "proxy builder not exists");
255            debug!("{} ignore ack proxy from {} for {}", self, proxy, err);
256            Err(err)
257        }
258    }
259}
260
261
262
263