cyfs_bdt/tunnel/
udp.rs

1use log::*;
2use std::{
3    time::Duration, 
4    sync::RwLock, 
5    sync::atomic::{AtomicI32, AtomicU64, Ordering}
6};
7use async_std::{
8    sync::{Arc}, 
9    future
10, task};
11use async_trait::{async_trait};
12use cyfs_base::*;
13use crate::{
14    types::*,
15    protocol::{self, *, v0::*},
16    MTU,
17    interface::{*, udp::{PackageBoxEncodeContext, OnUdpPackageBox}}, 
18};
19use super::{
20    tunnel::{self, DynamicTunnel, TunnelOwner, ProxyType}, 
21    TunnelContainer
22};
23
24struct ConnectingState {
25    container: TunnelContainer, 
26    owner: Box<dyn TunnelOwner>, 
27    interface: udp::Interface, 
28    waiter: StateWaiter
29}
30
31struct ActiveState {
32    key: MixAesKey, 
33    // 记录active 这个tunnel时的,远端的 device body 的update time
34    remote_timestamp: Timestamp, 
35    container: TunnelContainer, 
36    owner: Box<dyn TunnelOwner>, 
37    interface: udp::Interface, 
38}
39
40enum TunnelState {
41    Connecting(ConnectingState), 
42    Active(ActiveState), 
43    Dead
44}
45
46impl std::fmt::Display for TunnelState {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        match self {
49            TunnelState::Connecting(_) => write!(f, "connecting"), 
50            TunnelState::Active(active_state) => 
51                write!(f, "Active:{{key:{}}}", 
52                    active_state.key), 
53            TunnelState::Dead => write!(f, "dead")
54        }
55    }
56}
57
58impl From<&TunnelState> for tunnel::TunnelState {
59    fn from(state: &TunnelState) -> Self {
60        match state {
61            TunnelState::Connecting(_) => tunnel::TunnelState::Connecting, 
62            TunnelState::Active(active_state) => tunnel::TunnelState::Active(active_state.remote_timestamp), 
63            TunnelState::Dead => tunnel::TunnelState::Dead
64        }
65    }
66}
67
/// Timing configuration for UDP tunnels.
#[derive(Clone)]
pub struct Config {
    /// Not read in this file; presumably the interval between hole-punch
    /// attempts — confirm against the tunnel builder.
    pub holepunch_interval: Duration,
    /// How long a new tunnel may stay in the connecting state before it is
    /// marked dead.
    pub connect_timeout: Duration,
    /// Idle time after which the keeper task sends a ping; also the keeper
    /// task's polling period.
    pub ping_interval: Duration,
    /// Idle time after which the keeper task marks an active tunnel dead.
    pub ping_timeout: Duration,
}
75
76struct TunnelImpl {
77    local: Endpoint, 
78    remote: Endpoint,  
79    proxy: ProxyType, 
80    state: RwLock<TunnelState>, 
81    keeper_count: AtomicI32, 
82    last_active: AtomicU64,
83    mtu: usize,
84}
85
86#[derive(Clone)]
87pub struct Tunnel(Arc<TunnelImpl>);
88
89impl std::fmt::Display for Tunnel {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        write!(f, "UdpTunnel{{local:{},remote:{}}}", tunnel::Tunnel::local(self), tunnel::Tunnel::remote(self))
92    }
93}
94
95impl Tunnel {
96    pub fn new(
97        container: TunnelContainer, 
98        owner: Box<dyn TunnelOwner>, 
99        interface: udp::Interface, 
100        remote: Endpoint, 
101        proxy: ProxyType) -> Self {
102        let local = interface.local();
103        let state = TunnelState::Connecting(ConnectingState {
104            container: container.clone(), 
105            owner: owner.clone_as_tunnel_owner(), 
106            interface, 
107            waiter: StateWaiter::new()
108        });
109        let tunnel = Self(Arc::new(TunnelImpl {
110            mtu: MTU,
111            local, 
112            remote, 
113            proxy, 
114            state: RwLock::new(state), 
115            keeper_count: AtomicI32::new(0), 
116            last_active: AtomicU64::new(0)
117        }));
118        
119        {
120            let tunnel = tunnel.clone();
121            let connect_timeout = container.config().udp.connect_timeout;
122            task::spawn(async move {
123                match future::timeout(connect_timeout, tunnel.wait_active()).await {
124                    Ok(_state) => {
125                        // assert_eq!(state, tunnel::TunnelState::Active, "state should be active");
126                    }, 
127                    Err(_err) => {
128                        let waiter = {
129                            let state = &mut *tunnel.0.state.write().unwrap();
130                            match state {
131                                TunnelState::Connecting(connecting) => {
132                                    let mut waiter = StateWaiter::new();
133                                    connecting.waiter.transfer_into(&mut waiter);
134                                    *state = TunnelState::Dead;
135                                    Some(waiter)
136                                }, 
137                                TunnelState::Active(_) => {
138                                    // do nothing
139                                    None
140                                },
141                                TunnelState::Dead => {
142                                    // do nothing
143                                    None
144                                }
145                            }
146                        };
147                        if let Some(waiter) = waiter  {
148                            info!("{} dead for connecting timeout", tunnel);
149                            waiter.wake();
150                            owner.sync_tunnel_state(&DynamicTunnel::new(tunnel.clone()), tunnel::TunnelState::Connecting, tunnel::TunnelState::Dead);
151                        }
152                    }
153                }
154            });
155        }
156
157        tunnel
158    }
159
160    pub fn try_update_key(&self, by_box: &PackageBox) -> Result<(), BuckyError> {
161        let state = &mut *self.0.state.write().unwrap();
162        match state {
163            TunnelState::Active(active_state) => {
164                if active_state.key.mix_key != by_box.key().mix_key {
165                    debug!("{} update active state key from {} to {}", self, active_state.key, by_box.key());
166                    active_state.key = by_box.key().clone();
167                    Ok(())
168                } else {
169                    Err(BuckyError::new(BuckyErrorCode::ErrorState, "same key"))
170                }
171            },
172            _ => {
173                Err(BuckyError::new(BuckyErrorCode::ErrorState, "not active"))
174            }
175        }
176    }
177
178    async fn wait_active(&self) -> tunnel::TunnelState {
179        let (state, opt_waiter) = {
180            let state = &mut *self.0.state.write().unwrap();
181            match state {
182                TunnelState::Connecting(ref mut connecting_state) => {
183                    let waiter = connecting_state.waiter.new_waiter();
184                    (tunnel::TunnelState::Connecting, Some(waiter))
185                },
186                TunnelState::Active(active_state) => {
187                    (tunnel::TunnelState::Active(active_state.remote_timestamp), None)
188                },
189                TunnelState::Dead => {
190                    (tunnel::TunnelState::Dead, None)
191                }
192            }
193        };
194        if let Some(waiter) = opt_waiter {
195            StateWaiter::wait(waiter, | | tunnel::Tunnel::state(self)).await
196        } else {
197            state
198        }
199    }
200
201    fn active_by_package(&self, by_box: &PackageBox, remote_timestamp: Option<Timestamp>) -> BuckyResult<TunnelContainer> {
202        self.active(by_box.key(), by_box.has_exchange(), remote_timestamp)
203    }
204
205    pub fn active(&self, key: &MixAesKey, exchange: bool, remote_timestamp: Option<Timestamp>) -> BuckyResult<TunnelContainer> { 
206        let (container, to_sync, waiter) = {
207            let state = &mut *self.0.state.write().unwrap(); 
208            match state {
209                TunnelState::Connecting(connecting_state) => {
210                    if let Some(remote_timestamp) = remote_timestamp {
211                        let mut waiter = StateWaiter::new();
212                        connecting_state.waiter.transfer_into(&mut waiter);
213                        info!("{} change state from Connecting to Active with mix_key:{}", self, key);
214                        let owner = connecting_state.owner.clone_as_tunnel_owner();
215                        let container = connecting_state.container.clone();
216                        *state = TunnelState::Active(ActiveState {
217                            container: container.clone(), 
218                            owner: owner.clone_as_tunnel_owner(), 
219                            remote_timestamp, 
220                            interface: connecting_state.interface.clone(),
221                            key: key.clone(),
222                        });
223                        Ok((container, 
224                            Some((tunnel::TunnelState::Connecting, 
225                                tunnel::TunnelState::Active(remote_timestamp), 
226                                owner)),  
227                            Some(waiter)))
228                    } else {
229                        Ok((connecting_state.container.clone(), None, None))
230                    }
231                }, 
232                TunnelState::Active(active_state) => {
233                    let former_state = tunnel::TunnelState::Active(active_state.remote_timestamp);
234                    if let Some(remote_timestamp) = remote_timestamp {
235                        if active_state.remote_timestamp < remote_timestamp {
236                            debug!("{} update active remote timestamp {}", self, remote_timestamp);
237                            active_state.remote_timestamp = remote_timestamp;
238                        } 
239                    }
240                    if exchange && key.mix_key != active_state.key.mix_key {
241                        debug!("{} update active state key from {} to {}", self, active_state.key, key);
242                        active_state.key = key.clone();
243                    }
244                    Ok((active_state.container.clone(), 
245                        Some((former_state, 
246                            tunnel::TunnelState::Active(active_state.remote_timestamp), 
247                            active_state.owner.clone_as_tunnel_owner())), 
248                        None))
249                },
250                TunnelState::Dead => Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel's dead"))
251            }
252        }?;
253        
254        if let Some(waiter) = waiter {
255            waiter.wake();
256        }
257
258        if let Some((former_state, new_state, owner)) = to_sync {
259            self.0.last_active.store(bucky_time_now(), Ordering::SeqCst);
260            if former_state != new_state {
261                owner.sync_tunnel_state(&tunnel::DynamicTunnel::new(self.clone()), former_state, new_state);
262            }
263        }
264
265        Ok(container)
266    }
267
268    pub fn send_box(&self, package_box: &PackageBox) -> Result<(), BuckyError> {
269        let interface = {
270            let state = &*self.0.state.read().unwrap();
271            match state {
272                TunnelState::Connecting(connecting) => Ok(connecting.interface.clone()), 
273                TunnelState::Active(active) => Ok(active.interface.clone()), 
274                TunnelState::Dead => Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel dead"))
275            }
276        }?;
277        let mut context = PackageBoxEncodeContext::default();
278        context.set_ignore_exchange(ProxyType::None != self.0.proxy);
279        interface.send_box_to(&mut context, package_box, tunnel::Tunnel::remote(self))?;
280        Ok(())
281    }
282
283    pub fn raw_data_max_len() -> usize {
284        udp::MTU
285    }
286
287    pub(super) fn raw_data_header_len_impl() -> usize {
288        KeyMixHash::raw_bytes().unwrap()
289    }
290
291    pub fn raw_data_max_payload_len() -> usize {
292        Self::raw_data_max_len() - Self::raw_data_header_len_impl()
293    }
294
295
296    pub fn owner(&self) -> Option<TunnelContainer> {
297        let state = &*self.0.state.read().unwrap();
298        match state {
299            TunnelState::Connecting(connecting) => Some(connecting.container.clone()),  
300            TunnelState::Active(active) => Some(active.container.clone()), 
301            TunnelState::Dead => None
302        }
303    }
304}
305
306#[async_trait]
307impl tunnel::Tunnel for Tunnel {
308    fn mtu(&self) -> usize {
309        self.0.mtu
310    }
311
312    fn ptr_eq(&self, other: &tunnel::DynamicTunnel) -> bool {
313        *self.local() == *other.as_ref().local() 
314        && *self.remote() == *other.as_ref().remote()
315        && Arc::ptr_eq(&self.0, &other.clone_as_tunnel::<Tunnel>().0)
316    }
317
318    fn as_any(&self) -> &dyn std::any::Any {
319        self
320    }
321
322    fn local(&self) -> &Endpoint {
323        &self.0.local
324    }
325
326    fn remote(&self) -> &Endpoint {
327        &self.0.remote
328    }
329
330    fn proxy(&self) -> ProxyType {
331        self.0.proxy.clone()
332    }
333
334    fn state(&self) -> tunnel::TunnelState {
335        let state = &*self.0.state.read().unwrap();
336        tunnel::TunnelState::from(state)
337    }
338
339    
340    fn raw_data_header_len(&self) -> usize {
341        Self::raw_data_header_len_impl()
342    }
343
344    fn send_raw_data(&self, data: &mut [u8]) -> Result<usize, BuckyError> {
345        let (key, interface) = {
346            let state = &*self.0.state.read().unwrap();
347            match state {
348                TunnelState::Connecting(_) => Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel not active")), 
349                TunnelState::Active(active) => Ok((active.key.clone(), active.interface.clone())), 
350                TunnelState::Dead => Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel dead"))
351            }
352        }?;
353
354        assert_eq!(data.len() > Self::raw_data_header_len_impl(), true);
355        
356        interface.send_raw_data_to(&key, data, tunnel::Tunnel::remote(self))
357    }
358
359    fn send_package(&self, package: DynamicPackage) -> Result<usize, BuckyError> {
360        let (tunnel_container, interface, key) = {
361            if let TunnelState::Active(active_state) =  &*self.0.state.read().unwrap() {
362            Ok((active_state.container.clone(), active_state.interface.clone(), active_state.key.clone()))
363        } else {
364            Err(BuckyError::new(BuckyErrorCode::ErrorState, "send packages on tunnel not active"))
365        }}?;
366        trace!("{} send packages with key {}", self, key);
367        let package_box = PackageBox::from_package(tunnel_container.remote().clone(), key, package);
368        let mut context = PackageBoxEncodeContext::default();
369        context.set_ignore_exchange(ProxyType::None != self.0.proxy);
370        let sent_len = interface.send_box_to(&mut context, &package_box, tunnel::Tunnel::remote(self))?;
371        Ok(sent_len)
372    }
373
374    fn retain_keeper(&self) {
375        info!("{} retain keeper", self);
376        if 0 == self.0.keeper_count.fetch_add(1, Ordering::SeqCst) {
377            if let Some((container, owner, cur_state)) = {
378                let state = &*self.0.state.write().unwrap();
379                if let TunnelState::Active(active_state) = state {
380                    Some((active_state.container.clone(),
381                    active_state.owner.clone_as_tunnel_owner(),  
382                    tunnel::TunnelState::Active(active_state.remote_timestamp)))
383                } else {
384                    None
385                }
386            } {
387                let tunnel = self.clone();
388                let ping_interval = container.config().udp.ping_interval;
389                let ping_timeout = container.config().udp.ping_timeout;
390
391                task::spawn(async move {
392                    loop {
393                        if tunnel.0.keeper_count.load(Ordering::SeqCst) == 0 {
394                            break;
395                        }
396                        let now = bucky_time_now();
397                        let last_active = tunnel.0.last_active.load(Ordering::SeqCst);
398                        if now > last_active {
399                            let miss_active_time = Duration::from_micros(now - last_active);
400                            if miss_active_time > ping_timeout {
401                                let state = &mut *tunnel.0.state.write().unwrap();
402                                if let TunnelState::Active(_) = state {
403                                    info!("{} dead for ping timeout", tunnel);
404                                    *state = TunnelState::Dead;
405                                    break;
406                                } else {
407                                    break;
408                                }
409                            }
410                            if miss_active_time > ping_interval {
411                                if tunnel.0.keeper_count.load(Ordering::SeqCst) > 0 {
412                                    debug!("{} send ping", tunnel);
413                                    let ping = PingTunnel {
414                                        package_id: 0,
415                                        send_time: now,
416                                        recv_data: 0,
417                                    };
418                                    let _ = tunnel::Tunnel::send_package(&tunnel, DynamicPackage::from(ping));
419                                }
420                            }
421                        }
422                        
423                        let _ = future::timeout(ping_interval, future::pending::<()>()).await;
424                    };
425                    owner.sync_tunnel_state(&tunnel::DynamicTunnel::new(tunnel.clone()), cur_state, tunnel.state());
426                });
427            } else {
428                return;
429            }
430        }
431    }
432
433    fn release_keeper(&self) {
434        info!("{} release keeper", self);
435        self.0.keeper_count.fetch_add(-1, Ordering::SeqCst);
436    }
437
438    fn reset(&self) {
439        info!("{} reset to Dead", self);
440        let mut state = self.0.state.write().unwrap();
441        *state = TunnelState::Dead;
442    }
443
444    fn mark_dead(&self, former_state: tunnel::TunnelState) {
445        let notify = match &former_state {
446            tunnel::TunnelState::Connecting => {
447                let state = &mut *self.0.state.write().unwrap();
448                match state {
449                    TunnelState::Connecting(connecting) => {
450                        info!("{} Connecting=>Dead", self);
451                        let owner = connecting.owner.clone_as_tunnel_owner();
452                        *state = TunnelState::Dead;
453                        Some((owner, tunnel::TunnelState::Dead))
454                    }, 
455                    _ => {
456                        None
457                    }
458                }
459            }, 
460            tunnel::TunnelState::Active(remote_timestamp) => {
461                let remote_timestamp = *remote_timestamp;
462                let state = &mut *self.0.state.write().unwrap();
463                match state {
464                    TunnelState::Active(active) => {
465                        let owner = active.owner.clone_as_tunnel_owner();
466                        if active.remote_timestamp == remote_timestamp {
467                            info!("{} Active({})=>Dead for active by {}", self, active.remote_timestamp, remote_timestamp);
468                            *state = TunnelState::Dead;
469                            Some((owner, tunnel::TunnelState::Dead))
470                        } else {
471                            None
472                        }
473                    }, 
474                    _ => {
475                        None
476                    }
477                }
478            }, 
479            tunnel::TunnelState::Dead => None
480        };
481
482        if let Some((owner, new_state)) = notify {
483            owner.sync_tunnel_state(&DynamicTunnel::new(self.clone()), former_state, new_state);
484        }
485    }
486}
487
488impl OnUdpPackageBox for Tunnel {
489    fn on_udp_package_box(&self, udp_box: udp::UdpPackageBox) -> Result<(), BuckyError> {
490        for p in udp_box.as_ref().packages_no_exchange() {
491            match downcast_tunnel_handle!(p, |p| self.on_package(p, udp_box.as_ref()))? {
492                OnPackageResult::Break => break, 
493                _ => continue,
494            };
495        };
496        Ok(())
497        
498    }
499}
500
501impl OnPackage<SynTunnel, &PackageBox> for Tunnel {
502    fn on_package(&self, syn_tunnel: &SynTunnel, in_box: &PackageBox) -> Result<OnPackageResult, BuckyError> {
503        let container = self.active_by_package(in_box, Some(syn_tunnel.from_device_desc.body().as_ref().unwrap().update_time()))?;
504        // TODO: 考虑合并ack 和 session data
505        // 回复ack tunnel
506        let ack = AckTunnel {
507            protocol_version: container.protocol_version(), 
508            stack_version: container.stack_version(), 
509            sequence: syn_tunnel.sequence,
510            result: 0,
511            send_time: 0,
512            mtu: udp::MTU as u16,
513            to_device_desc: container.stack().sn_client().ping().default_local()
514        };
515
516        let mut package_box = PackageBox::encrypt_box(
517            container.remote().clone(), 
518            in_box.key().clone());
519        package_box.append(vec![DynamicPackage::from(ack)]);
520        let _ = self.send_box(&package_box);
521         // 传回给 container 处理
522         container.on_package(syn_tunnel, None)
523    }
524}
525
526impl OnPackage<AckTunnel, &PackageBox> for Tunnel {
527    fn on_package(&self, pkg: &AckTunnel, in_box: &PackageBox) -> Result<OnPackageResult, BuckyError> {
528        let container = self.active_by_package(in_box, Some(pkg.to_device_desc.body().as_ref().unwrap().update_time()))?;
529        // 传回给 container 处理
530        container.on_package(pkg, None)
531    }
532}
533
534impl OnPackage<AckAckTunnel, &PackageBox> for Tunnel {
535    fn on_package(&self, _: &AckAckTunnel, in_box: &PackageBox) -> Result<OnPackageResult, BuckyError> {
536        let _ = self.active_by_package(in_box, None)?;
537        // do nothing
538        Ok(OnPackageResult::Handled)
539    }
540}
541
542impl OnPackage<PingTunnel, &PackageBox> for Tunnel {
543    fn on_package(&self, ping: &PingTunnel, in_box: &PackageBox) -> Result<OnPackageResult, BuckyError> {
544        let _ = self.active_by_package(in_box, None)?;
545        let ping_resp = PingTunnelResp {
546            ack_package_id: ping.package_id,
547            send_time: bucky_time_now(),
548            recv_data: 0,
549        };
550        let _ = tunnel::Tunnel::send_package(self, DynamicPackage::from(ping_resp));
551        Ok(OnPackageResult::Handled)
552    }
553}
554
555impl OnPackage<PingTunnelResp, &PackageBox> for Tunnel {
556    fn on_package(&self, _: &PingTunnelResp, in_box: &PackageBox) -> Result<OnPackageResult, BuckyError> {
557        let _ = self.active_by_package(in_box, None)?;
558        // do nothing
559        Ok(OnPackageResult::Handled)
560    }
561}
562
563impl OnPackage<Datagram, &PackageBox> for Tunnel {
564    fn on_package(&self, pkg: &Datagram, in_box: &PackageBox) -> Result<OnPackageResult, BuckyError> {
565        let container = self.active_by_package(in_box, None)?;
566        // 传回给 container 处理
567        container.on_package(pkg, None)
568    }
569}
570
571impl OnPackage<SessionData, &PackageBox> for Tunnel {
572    fn on_package(&self, pkg: &SessionData, in_box: &PackageBox) -> Result<OnPackageResult, BuckyError> {
573        let container = self.active_by_package(in_box, None)?;
574        // 传回给 container 处理
575        container.on_package(pkg, None)
576    }
577}
578
579impl OnPackage<TcpSynConnection, &PackageBox> for Tunnel {
580    fn on_package(&self, pkg: &TcpSynConnection, in_box: &PackageBox) -> Result<OnPackageResult, BuckyError> {
581        let container = self.active_by_package(in_box, None)?;
582        // 传回给 container 处理
583        container.on_package(pkg, None)
584    }
585}
586
587
588
589
590
591
592
593
594
595