1use log::*;
2use std::{
3 sync::RwLock
4};
5use async_std::{sync::{Arc}, task};
6use async_trait::{async_trait};
7use cyfs_base::*;
8use crate::{
9 types::*,
10 protocol::{*, v0::*},
11 tunnel::{TunnelState, ProxyType, TunnelContainer},
12 stack::{Stack, WeakStack}
13};
14use super::{
15 action::*,
16 builder::*,
17 proxy::*
18};
19
20
21struct ConnectingState {
22 waiter: StateWaiter,
23 proxy: Option<ProxyBuilder>
24}
25
26enum AcceptTunnelBuilderState {
27 Connecting(ConnectingState),
28 Establish,
29 Closed
30}
31
32struct AcceptTunnelBuilderImpl {
33 stack: WeakStack,
34 tunnel: TunnelContainer,
35 sequence: TempSeq,
36 state: RwLock<AcceptTunnelBuilderState>
37}
38
39#[derive(Clone)]
40pub struct AcceptTunnelBuilder(Arc<AcceptTunnelBuilderImpl>);
41
42impl std::fmt::Display for AcceptTunnelBuilder {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 write!(f, "AcceptTunnelBuilder{{tunnel:{}}}", self.0.tunnel)
45 }
46}
47
48impl AcceptTunnelBuilder {
49 pub fn new(stack: WeakStack, tunnel: TunnelContainer, sequence: TempSeq) -> Self {
50 Self(Arc::new(AcceptTunnelBuilderImpl {
51 stack,
52 tunnel,
53 sequence,
54 state: RwLock::new(AcceptTunnelBuilderState::Connecting(ConnectingState {
55 waiter: StateWaiter::new(),
56 proxy: None
57 }))
58 }))
59 }
60
61 pub async fn build(&self, caller_box: PackageBox, active_pn_list: Vec<DeviceId>) -> Result<(), BuckyError> {
62 info!("{} build", self);
63 self.sync_tunnel_state();
64 {
65 let stack = Stack::from(&self.0.stack);
66 let local = stack.sn_client().ping().default_local();
67 let syn_tunnel: &SynTunnel = caller_box.packages_no_exchange()[0].as_ref();
68 let tunnel = &self.0.tunnel;
70 let ack_tunnel = SynTunnel {
71 protocol_version: tunnel.protocol_version(),
72 stack_version: tunnel.stack_version(),
73 to_device_id: syn_tunnel.from_device_desc.desc().device_id(),
74 sequence: syn_tunnel.sequence,
75 from_device_desc: local,
76 send_time: 0
77 };
78 let mut first_box = PackageBox::encrypt_box(caller_box.remote().clone(), caller_box.key().clone());
79 first_box.append(vec![DynamicPackage::from(ack_tunnel)]);
80 let first_box = Arc::new(first_box);
81
82 info!("{} build with key {}", self, first_box.key());
83 let _ = self.explore_endpoint_pair(&syn_tunnel.from_device_desc, first_box.clone(), |_| true);
84
85 if let Some(proxy_builder) = {
86 let state = &mut *self.0.state.write().unwrap();
87 match state {
88 AcceptTunnelBuilderState::Connecting(connecting) => {
89 if connecting.proxy.is_none() {
90 connecting.proxy = Some(ProxyBuilder::new(
91 self.0.tunnel.clone(),
92 syn_tunnel.from_device_desc.get_obj_update_time(),
93 first_box.clone()));
94 debug!("{} create proxy buidler", self);
95 }
96 connecting.proxy.clone()
97 },
98 _ => None
99 }
100 } {
101 for proxy in active_pn_list {
102 let _ = proxy_builder.syn_proxy(ProxyType::Active(proxy)).await;
103 }
104 for proxy in stack.proxy_manager().passive_proxies() {
105 let _ = proxy_builder.syn_proxy(ProxyType::Passive(proxy)).await;
106 }
107 } else {
108 debug!("{} ignore proxy build for not connecting", self);
109 }
110
111 Ok(())
112 }.map_err(|e| {info!("{} ingnore build for {}", self, e);e})
113 }
114
115 fn sync_tunnel_state(&self) {
116 let builder = self.clone();
117 task::spawn(async move {
118 let tunnel_state = builder.0.tunnel.wait_active().await;
119 let waiter = match tunnel_state {
120 TunnelState::Active(_) => {
121 let state = &mut *builder.0.state.write().unwrap();
122 match state {
123 AcceptTunnelBuilderState::Connecting(connecting) => {
124 info!("{} connecting=>establish", builder);
125 let mut ret_waiter = StateWaiter::new();
126 connecting.waiter.transfer_into(&mut ret_waiter);
127 *state = AcceptTunnelBuilderState::Establish;
128 Some(ret_waiter)
129 },
130 AcceptTunnelBuilderState::Closed => {
131 None
133 },
134 AcceptTunnelBuilderState::Establish => {
135 unreachable!()
136 }
137 }
138 },
139 TunnelState::Dead => {
140 let state = &mut *builder.0.state.write().unwrap();
141 match state {
142 AcceptTunnelBuilderState::Connecting(connecting) => {
143 info!("{} connecting=>dead", builder);
144 let mut ret_waiter = StateWaiter::new();
145 connecting.waiter.transfer_into(&mut ret_waiter);
146 *state = AcceptTunnelBuilderState::Closed;
147 Some(ret_waiter)
148 },
149 AcceptTunnelBuilderState::Closed => {
150 None
152 },
153 AcceptTunnelBuilderState::Establish => {
154 None
156 }
157 }
158 },
159 TunnelState::Connecting => {
160 unreachable!()
161 }
162 };
163 if let Some(waiter) = waiter {
164 waiter.wake();
165 }
166 });
167 }
168
169
170 fn explore_endpoint_pair<F: Fn(&Endpoint) -> bool>(&self, remote: &Device, first_box: Arc<PackageBox>, filter: F) -> Vec<DynBuildTunnelAction> {
171 let stack = Stack::from(&self.0.stack);
172 let tunnel = &self.0.tunnel;
173 let net_listener = stack.net_manager().listener();
174
175 let mut actions = vec![];
176
177 let connect_info = remote.connect_info();
178 for udp_interface in net_listener.udp() {
179 for remote_ep in connect_info.endpoints().iter().filter(|ep| ep.is_udp() && ep.is_same_ip_version(&udp_interface.local()) && (ep.addr().is_ipv6() || (ep.addr().is_ipv4() && filter(ep)))) {
180 if let Ok((udp_tunnel, newly_created)) = tunnel.create_tunnel(EndpointPair::from((udp_interface.local(), *remote_ep)), ProxyType::None) {
181 if newly_created {
182 let action = SynUdpTunnel::new(
183 udp_tunnel,
184 first_box.clone(),
185 tunnel.config().udp.holepunch_interval);
186 actions.push(Box::new(action) as DynBuildTunnelAction);
187 }
188 }
189 }
190 }
191
192 for remote_ep in connect_info.endpoints().iter().filter(|ep| ep.is_tcp() && (ep.addr().is_ipv6() || (ep.addr().is_ipv4() && filter(ep)))) {
194 if let Ok((tunnel, newly_created)) = tunnel.create_tunnel(EndpointPair::from((Endpoint::default_tcp(remote_ep), *remote_ep)), ProxyType::None) {
195 if newly_created {
196 let action = ConnectTcpTunnel::new(tunnel);
197 actions.push(Box::new(action) as DynBuildTunnelAction);
198 }
199 }
200 }
201 actions
204 }
205}
206
207#[async_trait]
208impl TunnelBuilder for AcceptTunnelBuilder {
209 fn sequence(&self) -> TempSeq {
210 self.0.sequence
211 }
212 fn state(&self) -> TunnelBuilderState {
213 match &*self.0.state.read().unwrap() {
214 AcceptTunnelBuilderState::Connecting(_) => TunnelBuilderState::Connecting,
215 AcceptTunnelBuilderState::Establish => TunnelBuilderState::Establish,
216 AcceptTunnelBuilderState::Closed => TunnelBuilderState::Closed
217 }
218 }
219 async fn wait_establish(&self) -> Result<(), BuckyError> {
220 let (state, waiter) = {
221 let state = &mut *self.0.state.write().unwrap();
222 match state {
223 AcceptTunnelBuilderState::Connecting(connecting) => {
224 (TunnelBuilderState::Connecting, Some(connecting.waiter.new_waiter()))
225 },
226 AcceptTunnelBuilderState::Establish => {
227 (TunnelBuilderState::Establish, None)
228 },
229 AcceptTunnelBuilderState::Closed => {
230 (TunnelBuilderState::Closed, None)
231 }
232 }
233 };
234 match if let Some(waiter) = waiter {
235 StateWaiter::wait(waiter, | | self.state()).await
236 } else {
237 state
238 } {
239 TunnelBuilderState::Establish => Ok(()),
240 TunnelBuilderState::Closed => Err(BuckyError::new(BuckyErrorCode::Failed, "builder failed")),
241 TunnelBuilderState::Connecting => unreachable!()
242 }
243 }
244}
245
246impl OnPackage<AckProxy, &DeviceId> for AcceptTunnelBuilder {
247 fn on_package(&self, pkg: &AckProxy, proxy: &DeviceId) -> Result<OnPackageResult, BuckyError> {
248 if let Some(proxy_builder) = match &*self.0.state.read().unwrap() {
249 AcceptTunnelBuilderState::Connecting(connecting) => connecting.proxy.clone(),
250 _ => None
251 } {
252 proxy_builder.on_package(pkg, proxy)
253 } else {
254 let err = BuckyError::new(BuckyErrorCode::ErrorState, "proxy builder not exists");
255 debug!("{} ignore ack proxy from {} for {}", self, proxy, err);
256 Err(err)
257 }
258 }
259}
260
261
262
263