cyfs_bdt/tunnel/builder/connect_tunnel/
builder.rs

1use log::*;
2use std::{
3    sync::RwLock, 
4    time::Duration
5};
6use async_std::{
7    sync::{Arc}, 
8    task,
9    future
10};
11use futures::future::{Abortable, AbortHandle};
12use async_trait::{async_trait};
13use cyfs_base::*;
14use crate::{
15    types::*, 
16    protocol::{*, v0::*},
17    interface::{*, udp::MTU_LARGE}, 
18    history::keystore, 
19    sn::client::PingClientCalledEvent, 
20    tunnel::{TunnelState, TunnelContainer, ProxyType, BuildTunnelParams}, 
21    stack::{Stack, WeakStack}
22};
23use super::super::{
24    action::*, 
25    builder::*, 
26    proxy::*
27};
28
29struct ConnectingState {
30    proxy: Option<ProxyBuilder>, 
31    waiter: StateWaiter
32}
33
34enum ConnectTunnelBuilderState {
35    Connecting(ConnectingState), 
36    Establish, 
37    Closed
38}
39
40struct ConnectTunnelBuilderImpl {
41    stack: WeakStack, 
42    start_at: Timestamp, 
43    tunnel: TunnelContainer,
44    params: BuildTunnelParams, 
45    sequence: TempSeq,
46    state: RwLock<ConnectTunnelBuilderState>
47}
48
49#[derive(Clone)]
50pub struct ConnectTunnelBuilder(Arc<ConnectTunnelBuilderImpl>);
51
52impl std::fmt::Display for ConnectTunnelBuilder {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        write!(f, "ConnectTunnelBuilder{{tunnel:{}}}", self.0.tunnel)
55    }
56}
57
58impl ConnectTunnelBuilder {
59    pub fn new(stack: WeakStack, tunnel: TunnelContainer, params: BuildTunnelParams) -> Self {
60        let sequence = tunnel.generate_sequence();
61        Self(Arc::new(ConnectTunnelBuilderImpl {
62            stack, 
63            start_at: bucky_time_now(), 
64            tunnel,
65            params, 
66            sequence, 
67            state: RwLock::new(ConnectTunnelBuilderState::Connecting(ConnectingState {
68                proxy: None, 
69                waiter:StateWaiter::new()
70            }))
71        }))
72    }
73
74    fn escaped(&self) -> Duration {
75        let now = bucky_time_now();
76        if now > self.0.start_at {
77            Duration::from_micros(now - self.0.start_at)
78        } else {
79            Duration::from_micros(0)
80        }
81    }
82
83    async fn build_inner(&self) -> BuckyResult<()> {
84        let stack = Stack::from(&self.0.stack);
85        let local = stack.sn_client().ping().default_local();
86        let build_params = &self.0.params;
87
88        let first_box = Arc::new(self.first_box(&local).await);
89
90        info!("{} build with key {}", self, first_box.key());
91        let remote_id = build_params.remote_const.device_id();
92        let cached_remote = stack.device_cache().get_inner(&remote_id);
93        let known_remote = cached_remote.as_ref().or_else(|| build_params.remote_desc.as_ref());
94
95        let actions = if let Some(remote) = known_remote {
96            info!("{} explore_endpoint_pair with known remote {:?}", self, remote.connect_info().endpoints());
97            self.explore_endpoint_pair(remote, first_box.clone(), |ep| ep.is_static_wan())
98        } else {
99            vec![]
100        };
101   
102        if actions.len() == 0 {
103            let nearest_sn = build_params.nearest_sn(&stack);
104            if let Some(sn) = nearest_sn {
105                info!("{} call nearest sn, sn={}", self, sn);
106                let timeout_ret = future::timeout(stack.config().tunnel.retry_sn_timeout, self.call_sn(vec![sn.clone()], first_box.clone())).await;
107                let retry_sn_list = match timeout_ret {
108                    Ok(finish_ret) => {
109                        match finish_ret {
110                            Ok(_) => {
111                                info!("{} call nearest sn finished, sn={}", self, sn);
112                                if TunnelBuilderState::Establish != self.state() {
113                                    let escaped = self.escaped();
114                                    if stack.config().tunnel.retry_sn_timeout < escaped {
115                                        Some(Duration::from_secs(0))
116                                    } else {
117                                        Some(stack.config().tunnel.retry_sn_timeout - escaped)
118                                    }
119                                } else {
120                                    None
121                                }
122                            }, 
123                            Err(err) => {
124                                if err.code() == BuckyErrorCode::Interrupted {
125                                    info!("{} call nearest sn canceled, sn={}", self, sn);
126                                    None
127                                } else {
128                                    error!("{} call nearest sn failed, sn={}, err={}", self, sn, err);
129                                    Some(Duration::from_secs(0))
130                                }
131                            }
132                        }
133                    },
134                    Err(_) => {
135                        warn!("{} call nearest sn timeout {}", self, sn);
136                        Some(Duration::from_secs(0))
137                    }
138                };
139                if let Some(delay) = retry_sn_list {
140                    if future::timeout(delay, self.wait_establish()).await.is_err() {
141                        if let Some(sn_list) = build_params.retry_sn_list(&stack, &sn) {
142                            info!("{} retry sn list call, sn={:?}", self, sn_list);
143                            let _ = self.call_sn(sn_list, first_box).await;
144                        }
145                    }
146                }
147            } else if let Some(remote) = known_remote {
148                info!("{} explore_endpoint_pair with known remote {:?} again", self, remote.connect_info().endpoints());
149                let _ = self.explore_endpoint_pair(remote, first_box.clone(), |_| true);
150            } else {
151                warn!("{} no sn and unkown remote", self);
152            }
153        } 
154
155        Ok(())
156    }
157
158    pub async fn build(&self) {
159        self.sync_tunnel_state();
160        let _ = self.build_inner().await.
161            map_err(|err| {
162                error!("{} build failed for {}", self, err);
163                let waiter = {
164                    let state = &mut *self.0.state.write().unwrap();
165                    match state {
166                        ConnectTunnelBuilderState::Connecting(connecting) => {
167                            info!("{} connecting=>dead", self);
168                            let mut ret_waiter = StateWaiter::new();
169                            connecting.waiter.transfer_into(&mut ret_waiter);
170                            *state = ConnectTunnelBuilderState::Closed;
171                            Some(ret_waiter)
172                        }, 
173                        ConnectTunnelBuilderState::Closed => {
174                            //存在closed之后tunnel dead的情况,忽略
175                            None
176                        }, 
177                        ConnectTunnelBuilderState::Establish => {
178                            //存在establish之后tunnel dead的情况,忽略
179                            None
180                        }
181                    }
182                };
183                if let Some(waiter) = waiter {
184                    waiter.wake();
185                }
186            });
187    }
188
189    fn sync_tunnel_state(&self) {
190        let builder = self.clone();
191        task::spawn(async move {
192            let tunnel_state = builder.0.tunnel.wait_active().await;
193            let waiter = match tunnel_state {
194                TunnelState::Active(_) => {
195                    let state = &mut *builder.0.state.write().unwrap();
196                    match state {
197                        ConnectTunnelBuilderState::Connecting(connecting) => {
198                            info!("{} connecting=>establish", builder);
199                            let mut ret_waiter = StateWaiter::new();
200                            connecting.waiter.transfer_into(&mut ret_waiter);
201                            *state = ConnectTunnelBuilderState::Establish;
202                            Some(ret_waiter)
203                        }, 
204                        ConnectTunnelBuilderState::Closed => {
205                            //存在closed之后tunnel联通的情况,忽略
206                            None
207                        }, 
208                        ConnectTunnelBuilderState::Establish => {
209                            unreachable!()
210                        }
211                    }
212                }, 
213                TunnelState::Dead | TunnelState::Connecting => {
214                    let state = &mut *builder.0.state.write().unwrap();
215                    match state {
216                        ConnectTunnelBuilderState::Connecting(connecting) => {
217                            info!("{} connecting=>dead", builder);
218                            let mut ret_waiter = StateWaiter::new();
219                            connecting.waiter.transfer_into(&mut ret_waiter);
220                            *state = ConnectTunnelBuilderState::Closed;
221                            Some(ret_waiter)
222                        }, 
223                        ConnectTunnelBuilderState::Closed => {
224                            //存在closed之后tunnel dead的情况,忽略
225                            None
226                        }, 
227                        ConnectTunnelBuilderState::Establish => {
228                            //存在establish之后tunnel dead的情况,忽略
229                            None
230                        }
231                    }
232                },
233            };
234            if let Some(waiter) = waiter {
235                waiter.wake();
236            }
237        });
238    }
239
240    async fn call_sn(&self, sn_list: Vec<DeviceId>, first_box: Arc<PackageBox>) -> BuckyResult<()> {
241        let (cancel, reg) = AbortHandle::new_pair();
242
243        let builder = self.clone();
244        task::spawn(async move {
245            let _ = builder.wait_establish().await;
246            cancel.abort();
247        });
248
249        let (sender, receiver) = async_std::channel::bounded::<BuckyResult<()>>(1);
250        let builder = self.clone();
251        task::spawn(async move {
252            let result = Abortable::new(builder.call_sn_inner(sn_list.clone(), first_box), reg).await;
253            let result = match result {
254                Ok(result) => result, 
255                Err(_) => {
256                    info!("{} call sn interrupted, sn={:?}", builder, sn_list);
257                    Err(BuckyError::new(BuckyErrorCode::Interrupted, "canceled"))
258                }
259            };
260            let _ = sender.try_send(result);
261        });
262       
263        receiver.recv().await.unwrap()
264    }
265
266    async fn call_sn_inner(&self, sn_list: Vec<DeviceId>, first_box: Arc<PackageBox>) -> BuckyResult<()> {
267        let stack = Stack::from(&self.0.stack);
268        let tunnel = &self.0.tunnel;
269        let call_session = stack.sn_client().call().call(
270            None,
271            tunnel.remote(),
272            &sn_list, 
273            |sn_call| {
274                let mut context = udp::PackageBoxEncodeContext::from(sn_call);
275                //FIXME 先不调用raw_measure_with_context
276                //let len = first_box.raw_measure_with_context(&mut context).unwrap();
277                let mut buf = vec![0u8; MTU_LARGE];
278                let b = first_box.raw_encode_with_context(&mut buf, &mut context, &None).unwrap();
279                //buf[0..b.len()].as_ref()
280                let len = MTU_LARGE - b.len();
281                buf.truncate(len);
282                info!("{} encode first box to sn call, len: {}, package_box {:?}", self, len, first_box);
283                buf
284            }).await.map_err(|err| {
285                error!("{} call sn failed, sn={:?}, err={}", self, sn_list, err);
286                err
287            })?; 
288        
289        let mut success = false;
290        loop {
291            if let Some(session) = call_session.next().await
292                .map_err(|err| {error!("{} call sn failed, sn={:?}, err={}", self, sn_list, err); err})
293                .ok().and_then(|opt| opt) {
294                match session.result().unwrap() {
295                    Ok(remote) => {
296                        if let Some(proxy_buidler) = {
297                            info!("{} call sn session responsed, sn={:?}, endpoints={:?}", self, session.sn(), remote.connect_info().endpoints());
298                            let state = &mut *self.0.state.write().unwrap();
299                            match state {
300                                ConnectTunnelBuilderState::Connecting(connecting) => {
301                                    if connecting.proxy.is_none() {
302                                        let proxy = ProxyBuilder::new(
303                                            tunnel.clone(), 
304                                            remote.get_obj_update_time(),  
305                                            first_box.clone());
306                                        debug!("{} create proxy builder", self);
307                                        connecting.proxy = Some(proxy);
308                                    }
309                                    connecting.proxy.clone()
310                                }, 
311                                _ => {
312                                    debug!("{} ignore proxy builder for not in connecting state", self);
313                                    None
314                                }
315                            }
316                        } {
317                            //FIXME: 使用正确的proxy策略
318                            for proxy in stack.proxy_manager().active_proxies() {
319                                let _ = proxy_buidler.syn_proxy(ProxyType::Active(proxy)).await;
320                            }
321                            for proxy in remote.connect_info().passive_pn_list().iter().cloned() {
322                                let _ = proxy_buidler.syn_proxy(ProxyType::Passive(proxy)).await;
323                            }
324                        }
325
326                        success = true;
327                        let _ = self.explore_endpoint_pair(&remote, first_box.clone(), |_| true);
328                    },
329                    Err(err) => {
330                        error!("{} call sn session failed, sn={:?}, err={}", self, session.sn(), err);
331                    }
332                }
333            } else {
334                break;
335            }
336        }
337        
338        if success {
339            Ok(())
340        } else {
341            error!("{} call sn session failed, sn={:?}", self, sn_list);
342            Err(BuckyError::new(BuckyErrorCode::Failed, "all failed"))
343        }
344    }
345
346    fn explore_endpoint_pair<F: Fn(&Endpoint) -> bool>(&self, remote: &Device, first_box: Arc<PackageBox>, filter: F) -> Vec<DynBuildTunnelAction> {
347        let stack = Stack::from(&self.0.stack);
348        let tunnel = &self.0.tunnel;
349        let net_listener = stack.net_manager().listener();
350
351        let mut actions = vec![];
352
353        let connect_info = remote.connect_info();
354        
355        // FIXME: ipv6 udp frame may not support supper frame, simply ignore it now
356        for udp_interface in net_listener.udp().iter().filter(|ui| ui.local().addr().is_ipv4()) {
357            for remote_ep in connect_info.endpoints().iter().filter(|ep| ep.is_udp() && ep.is_same_ip_version(&udp_interface.local()) && filter(ep)) {
358                if let Ok((udp_tunnel, newly_created)) = tunnel.create_tunnel(EndpointPair::from((udp_interface.local(), *remote_ep)), ProxyType::None) {
359                    if newly_created {
360                        let action = SynUdpTunnel::new(
361                            udp_tunnel, 
362                            first_box.clone(), 
363                            tunnel.config().udp.holepunch_interval); 
364                        actions.push(Box::new(action) as DynBuildTunnelAction);
365                    }
366                }  
367            }    
368        }
369
370        // for local_ip in net_listener.ip_set() {
371            for remote_ep in connect_info.endpoints().iter().filter(|ep| ep.is_tcp() && filter(ep)) {
372                if let Ok((tunnel, newly_created)) = tunnel.create_tunnel(EndpointPair::from((Endpoint::default_tcp(remote_ep), *remote_ep)), ProxyType::None) {
373                    if newly_created {
374                        let action = ConnectTcpTunnel::new(tunnel);
375                        actions.push(Box::new(action) as DynBuildTunnelAction);
376                    }
377                }   
378            }   
379        // }
380
381        actions
382    }  
383
384    //FXIME: 这里有机会把要发的一个session包放进来
385    async fn first_box(&self, local: &Device) -> PackageBox {
386        let stack = Stack::from(&self.0.stack);
387        let tunnel = &self.0.tunnel;
388
389        let key_stub = stack.keystore().create_key(tunnel.remote_const(), true);
390        // 生成第一个package box
391        let mut first_box = PackageBox::encrypt_box(tunnel.remote().clone(), key_stub.key.clone());
392            
393        let syn_tunnel = SynTunnel {
394            protocol_version: self.0.tunnel.protocol_version(), 
395            stack_version: self.0.tunnel.stack_version(), 
396            to_device_id: tunnel.remote().clone(), 
397            from_device_desc: local.clone(),
398            sequence: self.sequence(), 
399            send_time: bucky_time_now()
400        };
401        if let keystore::EncryptedKey::Unconfirmed(key_encrypted) = key_stub.encrypted {
402            let mut exchange = Exchange::from((&syn_tunnel, key_encrypted, key_stub.key.mix_key));
403            let _ = exchange.sign(stack.keystore().signer()).await;
404            first_box.push(exchange);
405        }
406        first_box.push(syn_tunnel);
407        first_box
408    }
409}
410
411#[async_trait]
412impl TunnelBuilder for ConnectTunnelBuilder {
413    fn sequence(&self) -> TempSeq {
414        self.0.sequence
415    }
416    fn state(&self) -> TunnelBuilderState {
417        match &*self.0.state.read().unwrap() {
418            ConnectTunnelBuilderState::Connecting(_) => TunnelBuilderState::Connecting, 
419            ConnectTunnelBuilderState::Establish => TunnelBuilderState::Establish,
420            ConnectTunnelBuilderState::Closed => TunnelBuilderState::Closed
421        }
422    }
423    async fn wait_establish(&self) -> Result<(), BuckyError> {
424        let (state, waiter) = {
425            let state = &mut *self.0.state.write().unwrap();
426            match state {
427                ConnectTunnelBuilderState::Connecting(connecting) => {
428                    (TunnelBuilderState::Connecting, Some(connecting.waiter.new_waiter()))
429                },
430                ConnectTunnelBuilderState::Establish => {
431                    (TunnelBuilderState::Establish, None)
432                },
433                ConnectTunnelBuilderState::Closed => {
434                    (TunnelBuilderState::Closed, None)
435                }
436            }
437        };
438        match if let Some(waiter) = waiter {
439            StateWaiter::wait(waiter, | | self.state()).await
440        } else {
441            state
442        } {
443            TunnelBuilderState::Establish => Ok(()), 
444            TunnelBuilderState::Closed => Err(BuckyError::new(BuckyErrorCode::Failed, "builder failed")),
445            TunnelBuilderState::Connecting => unreachable!()
446        }
447    }
448}
449
450impl PingClientCalledEvent for ConnectTunnelBuilder {
451    fn on_called(&self, called: &SnCalled, _context: ()) -> Result<(), BuckyError> {
452        let builder = self.clone();
453        let active_pn_list = called.active_pn_list.clone();
454        let remote_timestamp = called.peer_info.get_obj_update_time();
455        task::spawn(async move {
456            let stack = Stack::from(&builder.0.stack);
457            let first_box = builder.first_box(&stack.sn_client().ping().default_local()).await;
458            if let Some(proxy_builder) = {
459                let state = &mut *builder.0.state.write().unwrap();
460                match state {
461                    ConnectTunnelBuilderState::Connecting(connecting) => {
462                        if connecting.proxy.is_none() {
463                            let proxy = ProxyBuilder::new(
464                                builder.0.tunnel.clone(), 
465                                remote_timestamp,  
466                                Arc::new(first_box));
467                            debug!("{} create proxy builder", builder);
468                            connecting.proxy = Some(proxy);
469                        }
470                        connecting.proxy.clone()
471                    }, 
472                    _ => None
473                }
474            } {
475                //FIXME: 使用正确的proxy策略
476                for proxy in active_pn_list {
477                    let _ = proxy_builder.syn_proxy(ProxyType::Active(proxy));
478                }
479            }
480        });
481        
482        Ok(())
483    }
484}
485
486
487impl OnPackage<AckProxy, &DeviceId> for ConnectTunnelBuilder {
488    fn on_package(&self, pkg: &AckProxy, proxy: &DeviceId) -> Result<OnPackageResult, BuckyError> {
489        if let Some(proxy_builder) = match &*self.0.state.read().unwrap() {
490            ConnectTunnelBuilderState::Connecting(connecting) => {
491                connecting.proxy.clone()
492            }, 
493            _ => {
494                None
495            }
496        } {
497            proxy_builder.on_package(pkg, proxy)
498        } else {
499            let err = BuckyError::new(BuckyErrorCode::ErrorState, "proxy builder not exists");
500            debug!("{} ignore ack proxy from {} for {}", self, proxy, err);
501            Err(err)
502        }
503    }
504}
505
506
507
508