cyfs_bdt/tunnel/
manager.rs

1use log::*;
2use std::{
3    fmt, 
4    time::Duration, 
5    collections::{BTreeMap, LinkedList}, 
6    sync::{Arc, RwLock}
7};
8use async_std::{
9    future, 
10    task
11};
12use cyfs_base::*;
13use crate::{
14    types::*, 
15    protocol::{*, v0::*}, 
16    interface::{*, udp::{self, OnUdpPackageBox, OnUdpRawData}, tcp::{OnTcpInterface}}, 
17    sn::client::PingClientCalledEvent, 
18    stack::{Stack, WeakStack}
19};
20use super::container::{TunnelGuard, TunnelContainer, Config};
21
22struct TunnelKeeper {
23    reserving: Option<Timestamp>, 
24    tunnel: TunnelGuard
25}
26
27impl TunnelKeeper {
28    fn get(&self) -> TunnelGuard {
29        self.tunnel.clone()
30    }
31
32    fn check(&mut self, when: Timestamp) -> bool {
33        if self.tunnel.ref_count() > 1 {
34            self.reserving = None;
35            true
36        } else if let Some(expire_at) = self.reserving {
37            if when > expire_at {
38                info!("{} expired at {}", &*self.tunnel, expire_at);
39                false
40            } else {
41                true
42            }
43        } else {
44            self.reserving = Some(when + self.tunnel.config().retain_timeout.as_micros() as u64);
45            true
46        }
47    }
48}
49
50struct TunnelManagerImpl {
51    stack: WeakStack, 
52    entries: RwLock<BTreeMap<DeviceId, TunnelKeeper>>
53}
54
55#[derive(Clone)]
56pub struct TunnelManager(Arc<TunnelManagerImpl>);
57
58impl TunnelManager {
59    pub fn new(stack: WeakStack) -> Self {
60        let manager = Self(Arc::new(TunnelManagerImpl {
61            stack, 
62            entries: RwLock::new(BTreeMap::new())
63        }));
64
65        {
66            let manager = manager.clone();
67            task::spawn(async move {
68                loop {
69                    manager.check_recycle(bucky_time_now());
70                    let _ = future::timeout(Duration::from_secs(1), future::pending::<()>()).await;           
71                }
72            });
73        }
74
75        manager
76    }
77
78    fn check_recycle(&self, when: Timestamp) {
79        let mut entries = self.0.entries.write().unwrap(); 
80        let mut remove = LinkedList::new();
81
82        for (remote, keeper) in entries.iter_mut() {
83            if !keeper.check(when) {
84                remove.push_back(remote.clone());
85            }
86        }
87
88        for remote in remove {
89            info!("{} will remove tunnel for not used, channel={}", self, remote);
90            entries.remove(&remote);
91        }
92    }
93
94    fn config_for(&self, _remote_const: &DeviceDesc) -> Config {
95        // FIXME: 特化对不同remote的 tunnel config
96        let stack = Stack::from(&self.0.stack);
97        stack.config().tunnel.clone()
98    }
99
100    pub(crate) fn create_container(&self, remote_const: &DeviceDesc) -> Result<TunnelGuard, BuckyError> {
101        let remote = remote_const.device_id();
102        debug!("{} create new tunnel container of remote {}", self, remote);
103        let mut entries = self.0.entries.write().unwrap();
104        if let Some(tunnel) = entries.get(&remote) {
105            Ok(tunnel.get())
106        } else {
107            let tunnel = TunnelGuard::new(TunnelContainer::new(self.0.stack.clone(), remote_const.clone(), self.config_for(remote_const)));
108            entries.insert(remote, TunnelKeeper { reserving: None, tunnel: tunnel.clone() });
109            Ok(tunnel)
110        } 
111    }
112
113    pub(crate) fn container_of(&self, remote: &DeviceId) -> Option<TunnelGuard> {
114        let entries = self.0.entries.read().unwrap();
115        entries.get(&remote).map(|tunnel| {
116            tunnel.get()
117        })
118    }
119
120    pub fn reset(&self) {
121        let entries = self.0.entries.read().unwrap();
122        for (_, tunnel) in entries.iter() {
123            tunnel.get().reset();
124        }
125    }
126
127    pub(crate) fn on_statistic(&self) -> String {
128        let tunnel_count = self.0.entries.read().unwrap().len();
129        format!("TunnelCount: {}", tunnel_count)
130    }
131}
132
133impl fmt::Display for TunnelManager {
134    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
135        write!(f, "TunnelManager{{local:{}}}", Stack::from(&self.0.stack).local_device_id())
136    }
137}
138
139impl OnUdpPackageBox for TunnelManager {
140    fn on_udp_package_box(&self, package_box: udp::UdpPackageBox) -> Result<(), BuckyError> {
141        trace!("{} on_udp_package_box from remote {}", self, package_box.as_ref().remote());
142        if let Some(tunnel) = self.container_of(package_box.as_ref().remote()) {
143            tunnel.on_udp_package_box(package_box)
144        } else {
145            let first_package = &package_box.as_ref().packages_no_exchange()[0];
146            if first_package.cmd_code() == PackageCmdCode::SynTunnel {
147                let syn_tunnel: &SynTunnel = first_package.as_ref();
148                // if syn_tunnel.sequence.is_valid(bucky_time_now()) {
149                    let tunnel = self.create_container(syn_tunnel.from_device_desc.desc())?;
150                    tunnel.on_udp_package_box(package_box)
151                // } else {
152                    // debug!("{} ignore udp package box from remote:{}, for syn tunnel seq timeout", self, package_box.as_ref().remote());
153                    // Err(BuckyError::new(BuckyErrorCode::Timeout, "syn tunnel timeout"))
154                // }
155            } else {
156                debug!("{} ignore udp package box from remote:{}, for first package is {:?}", self, package_box.as_ref().remote(), first_package.cmd_code());
157                //FIXME: 支持从非syn tunnel包创建
158                Err(BuckyError::new(BuckyErrorCode::InvalidInput, "tunnel's first package shoud be SynTunnel"))
159            }
160        }
161    }
162}
163
164impl OnUdpRawData<(udp::Interface,DeviceId, MixAesKey, Endpoint)> for TunnelManager {
165    fn on_udp_raw_data(&self, data: &[u8], context: (udp::Interface, DeviceId, MixAesKey, Endpoint)) -> Result<(), BuckyError> {
166        trace!("{} on_udp_raw_data from remote {}", self, context.1);
167        if let Some(tunnel) = self.container_of(&context.1) {
168            tunnel.on_udp_raw_data(data, context)
169        } else {
170            Err(BuckyError::new(BuckyErrorCode::InvalidInput, "tunnel's first package shoud be SynTunnel"))
171        }
172    }
173}
174
175impl OnTcpInterface for TunnelManager {
176    fn on_tcp_interface(&self, interface: tcp::AcceptInterface, first_box: PackageBox) -> Result<OnPackageResult, BuckyError> {
177        //全部转给tunnel container
178        if let Some(tunnel) = self.container_of(first_box.remote()) {
179            tunnel.on_tcp_interface(interface, first_box)
180        } else {
181            let first_package = &first_box.packages_no_exchange()[0];
182            if first_package.cmd_code() == PackageCmdCode::SynTunnel {
183                let syn_tunnel: &SynTunnel = first_package.as_ref();
184                let tunnel = self.create_container(syn_tunnel.from_device_desc.desc())?;
185                tunnel.on_tcp_interface(interface, first_box)
186            } else if first_package.cmd_code() == PackageCmdCode::TcpSynConnection {
187                let syn_tcp_stream: &TcpSynConnection = first_package.as_ref();
188                let tunnel = self.create_container(syn_tcp_stream.from_device_desc.desc())?;
189                tunnel.on_tcp_interface(interface, first_box)
190            } else {
191                Err(BuckyError::new(BuckyErrorCode::InvalidInput, "tunnel's tcp interface's first package shoud be SynTunnel or TcpSynConnection"))
192            }
193        }
194    }
195}
196
197impl PingClientCalledEvent<PackageBox> for TunnelManager {
198    fn on_called(&self, called: &SnCalled, caller_box: PackageBox) -> Result<(), BuckyError> {
199        debug!("{} on_called from remote {} sequence {:?}", self, called.peer_info.desc().device_id(), called.seq);
200        let first_package = &caller_box.packages_no_exchange()[0];
201        if first_package.cmd_code() != PackageCmdCode::SynTunnel {
202            debug!("{} ignore udp package box from remote:{}, for first package is {:?}", self, called.peer_info.desc().device_id(), first_package.cmd_code());
203            return Err(BuckyError::new(BuckyErrorCode::InvalidInput, "tunnel's first package shoud be SynTunnel"));
204        }
205        if let Some(tunnel) = self.container_of(caller_box.remote()) {
206            tunnel.on_called(called, caller_box)
207        } else {
208            let syn_tunnel: &SynTunnel = first_package.as_ref();
209            let tunnel = self.create_container(syn_tunnel.from_device_desc.desc())?;
210            tunnel.on_called(called, caller_box)
211        }
212    }
213}