1use log::*;
2use std::{
3 fmt,
4 time::Duration,
5 collections::{BTreeMap, LinkedList},
6 sync::{Arc, RwLock}
7};
8use async_std::{
9 future,
10 task
11};
12use cyfs_base::*;
13use crate::{
14 types::*,
15 protocol::{*, v0::*},
16 interface::{*, udp::{self, OnUdpPackageBox, OnUdpRawData}, tcp::{OnTcpInterface}},
17 sn::client::PingClientCalledEvent,
18 stack::{Stack, WeakStack}
19};
20use super::container::{TunnelGuard, TunnelContainer, Config};
21
22struct TunnelKeeper {
23 reserving: Option<Timestamp>,
24 tunnel: TunnelGuard
25}
26
27impl TunnelKeeper {
28 fn get(&self) -> TunnelGuard {
29 self.tunnel.clone()
30 }
31
32 fn check(&mut self, when: Timestamp) -> bool {
33 if self.tunnel.ref_count() > 1 {
34 self.reserving = None;
35 true
36 } else if let Some(expire_at) = self.reserving {
37 if when > expire_at {
38 info!("{} expired at {}", &*self.tunnel, expire_at);
39 false
40 } else {
41 true
42 }
43 } else {
44 self.reserving = Some(when + self.tunnel.config().retain_timeout.as_micros() as u64);
45 true
46 }
47 }
48}
49
50struct TunnelManagerImpl {
51 stack: WeakStack,
52 entries: RwLock<BTreeMap<DeviceId, TunnelKeeper>>
53}
54
55#[derive(Clone)]
56pub struct TunnelManager(Arc<TunnelManagerImpl>);
57
58impl TunnelManager {
59 pub fn new(stack: WeakStack) -> Self {
60 let manager = Self(Arc::new(TunnelManagerImpl {
61 stack,
62 entries: RwLock::new(BTreeMap::new())
63 }));
64
65 {
66 let manager = manager.clone();
67 task::spawn(async move {
68 loop {
69 manager.check_recycle(bucky_time_now());
70 let _ = future::timeout(Duration::from_secs(1), future::pending::<()>()).await;
71 }
72 });
73 }
74
75 manager
76 }
77
78 fn check_recycle(&self, when: Timestamp) {
79 let mut entries = self.0.entries.write().unwrap();
80 let mut remove = LinkedList::new();
81
82 for (remote, keeper) in entries.iter_mut() {
83 if !keeper.check(when) {
84 remove.push_back(remote.clone());
85 }
86 }
87
88 for remote in remove {
89 info!("{} will remove tunnel for not used, channel={}", self, remote);
90 entries.remove(&remote);
91 }
92 }
93
94 fn config_for(&self, _remote_const: &DeviceDesc) -> Config {
95 let stack = Stack::from(&self.0.stack);
97 stack.config().tunnel.clone()
98 }
99
100 pub(crate) fn create_container(&self, remote_const: &DeviceDesc) -> Result<TunnelGuard, BuckyError> {
101 let remote = remote_const.device_id();
102 debug!("{} create new tunnel container of remote {}", self, remote);
103 let mut entries = self.0.entries.write().unwrap();
104 if let Some(tunnel) = entries.get(&remote) {
105 Ok(tunnel.get())
106 } else {
107 let tunnel = TunnelGuard::new(TunnelContainer::new(self.0.stack.clone(), remote_const.clone(), self.config_for(remote_const)));
108 entries.insert(remote, TunnelKeeper { reserving: None, tunnel: tunnel.clone() });
109 Ok(tunnel)
110 }
111 }
112
113 pub(crate) fn container_of(&self, remote: &DeviceId) -> Option<TunnelGuard> {
114 let entries = self.0.entries.read().unwrap();
115 entries.get(&remote).map(|tunnel| {
116 tunnel.get()
117 })
118 }
119
120 pub fn reset(&self) {
121 let entries = self.0.entries.read().unwrap();
122 for (_, tunnel) in entries.iter() {
123 tunnel.get().reset();
124 }
125 }
126
127 pub(crate) fn on_statistic(&self) -> String {
128 let tunnel_count = self.0.entries.read().unwrap().len();
129 format!("TunnelCount: {}", tunnel_count)
130 }
131}
132
133impl fmt::Display for TunnelManager {
134 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
135 write!(f, "TunnelManager{{local:{}}}", Stack::from(&self.0.stack).local_device_id())
136 }
137}
138
139impl OnUdpPackageBox for TunnelManager {
140 fn on_udp_package_box(&self, package_box: udp::UdpPackageBox) -> Result<(), BuckyError> {
141 trace!("{} on_udp_package_box from remote {}", self, package_box.as_ref().remote());
142 if let Some(tunnel) = self.container_of(package_box.as_ref().remote()) {
143 tunnel.on_udp_package_box(package_box)
144 } else {
145 let first_package = &package_box.as_ref().packages_no_exchange()[0];
146 if first_package.cmd_code() == PackageCmdCode::SynTunnel {
147 let syn_tunnel: &SynTunnel = first_package.as_ref();
148 let tunnel = self.create_container(syn_tunnel.from_device_desc.desc())?;
150 tunnel.on_udp_package_box(package_box)
151 } else {
156 debug!("{} ignore udp package box from remote:{}, for first package is {:?}", self, package_box.as_ref().remote(), first_package.cmd_code());
157 Err(BuckyError::new(BuckyErrorCode::InvalidInput, "tunnel's first package shoud be SynTunnel"))
159 }
160 }
161 }
162}
163
164impl OnUdpRawData<(udp::Interface,DeviceId, MixAesKey, Endpoint)> for TunnelManager {
165 fn on_udp_raw_data(&self, data: &[u8], context: (udp::Interface, DeviceId, MixAesKey, Endpoint)) -> Result<(), BuckyError> {
166 trace!("{} on_udp_raw_data from remote {}", self, context.1);
167 if let Some(tunnel) = self.container_of(&context.1) {
168 tunnel.on_udp_raw_data(data, context)
169 } else {
170 Err(BuckyError::new(BuckyErrorCode::InvalidInput, "tunnel's first package shoud be SynTunnel"))
171 }
172 }
173}
174
175impl OnTcpInterface for TunnelManager {
176 fn on_tcp_interface(&self, interface: tcp::AcceptInterface, first_box: PackageBox) -> Result<OnPackageResult, BuckyError> {
177 if let Some(tunnel) = self.container_of(first_box.remote()) {
179 tunnel.on_tcp_interface(interface, first_box)
180 } else {
181 let first_package = &first_box.packages_no_exchange()[0];
182 if first_package.cmd_code() == PackageCmdCode::SynTunnel {
183 let syn_tunnel: &SynTunnel = first_package.as_ref();
184 let tunnel = self.create_container(syn_tunnel.from_device_desc.desc())?;
185 tunnel.on_tcp_interface(interface, first_box)
186 } else if first_package.cmd_code() == PackageCmdCode::TcpSynConnection {
187 let syn_tcp_stream: &TcpSynConnection = first_package.as_ref();
188 let tunnel = self.create_container(syn_tcp_stream.from_device_desc.desc())?;
189 tunnel.on_tcp_interface(interface, first_box)
190 } else {
191 Err(BuckyError::new(BuckyErrorCode::InvalidInput, "tunnel's tcp interface's first package shoud be SynTunnel or TcpSynConnection"))
192 }
193 }
194 }
195}
196
197impl PingClientCalledEvent<PackageBox> for TunnelManager {
198 fn on_called(&self, called: &SnCalled, caller_box: PackageBox) -> Result<(), BuckyError> {
199 debug!("{} on_called from remote {} sequence {:?}", self, called.peer_info.desc().device_id(), called.seq);
200 let first_package = &caller_box.packages_no_exchange()[0];
201 if first_package.cmd_code() != PackageCmdCode::SynTunnel {
202 debug!("{} ignore udp package box from remote:{}, for first package is {:?}", self, called.peer_info.desc().device_id(), first_package.cmd_code());
203 return Err(BuckyError::new(BuckyErrorCode::InvalidInput, "tunnel's first package shoud be SynTunnel"));
204 }
205 if let Some(tunnel) = self.container_of(caller_box.remote()) {
206 tunnel.on_called(called, caller_box)
207 } else {
208 let syn_tunnel: &SynTunnel = first_package.as_ref();
209 let tunnel = self.create_container(syn_tunnel.from_device_desc.desc())?;
210 tunnel.on_called(called, caller_box)
211 }
212 }
213}