1use log::*;
2use std::{
3 sync::RwLock,
4 time::Duration
5};
6use async_std::{
7 sync::{Arc},
8 task,
9 future
10};
11use futures::future::{Abortable, AbortHandle};
12use async_trait::{async_trait};
13use cyfs_base::*;
14use crate::{
15 types::*,
16 protocol::{*, v0::*},
17 interface::{*, udp::MTU_LARGE},
18 history::keystore,
19 sn::client::PingClientCalledEvent,
20 tunnel::{TunnelState, TunnelContainer, ProxyType, BuildTunnelParams},
21 stack::{Stack, WeakStack}
22};
23use super::super::{
24 action::*,
25 builder::*,
26 proxy::*
27};
28
29struct ConnectingState {
30 proxy: Option<ProxyBuilder>,
31 waiter: StateWaiter
32}
33
34enum ConnectTunnelBuilderState {
35 Connecting(ConnectingState),
36 Establish,
37 Closed
38}
39
40struct ConnectTunnelBuilderImpl {
41 stack: WeakStack,
42 start_at: Timestamp,
43 tunnel: TunnelContainer,
44 params: BuildTunnelParams,
45 sequence: TempSeq,
46 state: RwLock<ConnectTunnelBuilderState>
47}
48
49#[derive(Clone)]
50pub struct ConnectTunnelBuilder(Arc<ConnectTunnelBuilderImpl>);
51
52impl std::fmt::Display for ConnectTunnelBuilder {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 write!(f, "ConnectTunnelBuilder{{tunnel:{}}}", self.0.tunnel)
55 }
56}
57
58impl ConnectTunnelBuilder {
59 pub fn new(stack: WeakStack, tunnel: TunnelContainer, params: BuildTunnelParams) -> Self {
60 let sequence = tunnel.generate_sequence();
61 Self(Arc::new(ConnectTunnelBuilderImpl {
62 stack,
63 start_at: bucky_time_now(),
64 tunnel,
65 params,
66 sequence,
67 state: RwLock::new(ConnectTunnelBuilderState::Connecting(ConnectingState {
68 proxy: None,
69 waiter:StateWaiter::new()
70 }))
71 }))
72 }
73
74 fn escaped(&self) -> Duration {
75 let now = bucky_time_now();
76 if now > self.0.start_at {
77 Duration::from_micros(now - self.0.start_at)
78 } else {
79 Duration::from_micros(0)
80 }
81 }
82
83 async fn build_inner(&self) -> BuckyResult<()> {
84 let stack = Stack::from(&self.0.stack);
85 let local = stack.sn_client().ping().default_local();
86 let build_params = &self.0.params;
87
88 let first_box = Arc::new(self.first_box(&local).await);
89
90 info!("{} build with key {}", self, first_box.key());
91 let remote_id = build_params.remote_const.device_id();
92 let cached_remote = stack.device_cache().get_inner(&remote_id);
93 let known_remote = cached_remote.as_ref().or_else(|| build_params.remote_desc.as_ref());
94
95 let actions = if let Some(remote) = known_remote {
96 info!("{} explore_endpoint_pair with known remote {:?}", self, remote.connect_info().endpoints());
97 self.explore_endpoint_pair(remote, first_box.clone(), |ep| ep.is_static_wan())
98 } else {
99 vec![]
100 };
101
102 if actions.len() == 0 {
103 let nearest_sn = build_params.nearest_sn(&stack);
104 if let Some(sn) = nearest_sn {
105 info!("{} call nearest sn, sn={}", self, sn);
106 let timeout_ret = future::timeout(stack.config().tunnel.retry_sn_timeout, self.call_sn(vec![sn.clone()], first_box.clone())).await;
107 let retry_sn_list = match timeout_ret {
108 Ok(finish_ret) => {
109 match finish_ret {
110 Ok(_) => {
111 info!("{} call nearest sn finished, sn={}", self, sn);
112 if TunnelBuilderState::Establish != self.state() {
113 let escaped = self.escaped();
114 if stack.config().tunnel.retry_sn_timeout < escaped {
115 Some(Duration::from_secs(0))
116 } else {
117 Some(stack.config().tunnel.retry_sn_timeout - escaped)
118 }
119 } else {
120 None
121 }
122 },
123 Err(err) => {
124 if err.code() == BuckyErrorCode::Interrupted {
125 info!("{} call nearest sn canceled, sn={}", self, sn);
126 None
127 } else {
128 error!("{} call nearest sn failed, sn={}, err={}", self, sn, err);
129 Some(Duration::from_secs(0))
130 }
131 }
132 }
133 },
134 Err(_) => {
135 warn!("{} call nearest sn timeout {}", self, sn);
136 Some(Duration::from_secs(0))
137 }
138 };
139 if let Some(delay) = retry_sn_list {
140 if future::timeout(delay, self.wait_establish()).await.is_err() {
141 if let Some(sn_list) = build_params.retry_sn_list(&stack, &sn) {
142 info!("{} retry sn list call, sn={:?}", self, sn_list);
143 let _ = self.call_sn(sn_list, first_box).await;
144 }
145 }
146 }
147 } else if let Some(remote) = known_remote {
148 info!("{} explore_endpoint_pair with known remote {:?} again", self, remote.connect_info().endpoints());
149 let _ = self.explore_endpoint_pair(remote, first_box.clone(), |_| true);
150 } else {
151 warn!("{} no sn and unkown remote", self);
152 }
153 }
154
155 Ok(())
156 }
157
158 pub async fn build(&self) {
159 self.sync_tunnel_state();
160 let _ = self.build_inner().await.
161 map_err(|err| {
162 error!("{} build failed for {}", self, err);
163 let waiter = {
164 let state = &mut *self.0.state.write().unwrap();
165 match state {
166 ConnectTunnelBuilderState::Connecting(connecting) => {
167 info!("{} connecting=>dead", self);
168 let mut ret_waiter = StateWaiter::new();
169 connecting.waiter.transfer_into(&mut ret_waiter);
170 *state = ConnectTunnelBuilderState::Closed;
171 Some(ret_waiter)
172 },
173 ConnectTunnelBuilderState::Closed => {
174 None
176 },
177 ConnectTunnelBuilderState::Establish => {
178 None
180 }
181 }
182 };
183 if let Some(waiter) = waiter {
184 waiter.wake();
185 }
186 });
187 }
188
189 fn sync_tunnel_state(&self) {
190 let builder = self.clone();
191 task::spawn(async move {
192 let tunnel_state = builder.0.tunnel.wait_active().await;
193 let waiter = match tunnel_state {
194 TunnelState::Active(_) => {
195 let state = &mut *builder.0.state.write().unwrap();
196 match state {
197 ConnectTunnelBuilderState::Connecting(connecting) => {
198 info!("{} connecting=>establish", builder);
199 let mut ret_waiter = StateWaiter::new();
200 connecting.waiter.transfer_into(&mut ret_waiter);
201 *state = ConnectTunnelBuilderState::Establish;
202 Some(ret_waiter)
203 },
204 ConnectTunnelBuilderState::Closed => {
205 None
207 },
208 ConnectTunnelBuilderState::Establish => {
209 unreachable!()
210 }
211 }
212 },
213 TunnelState::Dead | TunnelState::Connecting => {
214 let state = &mut *builder.0.state.write().unwrap();
215 match state {
216 ConnectTunnelBuilderState::Connecting(connecting) => {
217 info!("{} connecting=>dead", builder);
218 let mut ret_waiter = StateWaiter::new();
219 connecting.waiter.transfer_into(&mut ret_waiter);
220 *state = ConnectTunnelBuilderState::Closed;
221 Some(ret_waiter)
222 },
223 ConnectTunnelBuilderState::Closed => {
224 None
226 },
227 ConnectTunnelBuilderState::Establish => {
228 None
230 }
231 }
232 },
233 };
234 if let Some(waiter) = waiter {
235 waiter.wake();
236 }
237 });
238 }
239
240 async fn call_sn(&self, sn_list: Vec<DeviceId>, first_box: Arc<PackageBox>) -> BuckyResult<()> {
241 let (cancel, reg) = AbortHandle::new_pair();
242
243 let builder = self.clone();
244 task::spawn(async move {
245 let _ = builder.wait_establish().await;
246 cancel.abort();
247 });
248
249 let (sender, receiver) = async_std::channel::bounded::<BuckyResult<()>>(1);
250 let builder = self.clone();
251 task::spawn(async move {
252 let result = Abortable::new(builder.call_sn_inner(sn_list.clone(), first_box), reg).await;
253 let result = match result {
254 Ok(result) => result,
255 Err(_) => {
256 info!("{} call sn interrupted, sn={:?}", builder, sn_list);
257 Err(BuckyError::new(BuckyErrorCode::Interrupted, "canceled"))
258 }
259 };
260 let _ = sender.try_send(result);
261 });
262
263 receiver.recv().await.unwrap()
264 }
265
266 async fn call_sn_inner(&self, sn_list: Vec<DeviceId>, first_box: Arc<PackageBox>) -> BuckyResult<()> {
267 let stack = Stack::from(&self.0.stack);
268 let tunnel = &self.0.tunnel;
269 let call_session = stack.sn_client().call().call(
270 None,
271 tunnel.remote(),
272 &sn_list,
273 |sn_call| {
274 let mut context = udp::PackageBoxEncodeContext::from(sn_call);
275 let mut buf = vec![0u8; MTU_LARGE];
278 let b = first_box.raw_encode_with_context(&mut buf, &mut context, &None).unwrap();
279 let len = MTU_LARGE - b.len();
281 buf.truncate(len);
282 info!("{} encode first box to sn call, len: {}, package_box {:?}", self, len, first_box);
283 buf
284 }).await.map_err(|err| {
285 error!("{} call sn failed, sn={:?}, err={}", self, sn_list, err);
286 err
287 })?;
288
289 let mut success = false;
290 loop {
291 if let Some(session) = call_session.next().await
292 .map_err(|err| {error!("{} call sn failed, sn={:?}, err={}", self, sn_list, err); err})
293 .ok().and_then(|opt| opt) {
294 match session.result().unwrap() {
295 Ok(remote) => {
296 if let Some(proxy_buidler) = {
297 info!("{} call sn session responsed, sn={:?}, endpoints={:?}", self, session.sn(), remote.connect_info().endpoints());
298 let state = &mut *self.0.state.write().unwrap();
299 match state {
300 ConnectTunnelBuilderState::Connecting(connecting) => {
301 if connecting.proxy.is_none() {
302 let proxy = ProxyBuilder::new(
303 tunnel.clone(),
304 remote.get_obj_update_time(),
305 first_box.clone());
306 debug!("{} create proxy builder", self);
307 connecting.proxy = Some(proxy);
308 }
309 connecting.proxy.clone()
310 },
311 _ => {
312 debug!("{} ignore proxy builder for not in connecting state", self);
313 None
314 }
315 }
316 } {
317 for proxy in stack.proxy_manager().active_proxies() {
319 let _ = proxy_buidler.syn_proxy(ProxyType::Active(proxy)).await;
320 }
321 for proxy in remote.connect_info().passive_pn_list().iter().cloned() {
322 let _ = proxy_buidler.syn_proxy(ProxyType::Passive(proxy)).await;
323 }
324 }
325
326 success = true;
327 let _ = self.explore_endpoint_pair(&remote, first_box.clone(), |_| true);
328 },
329 Err(err) => {
330 error!("{} call sn session failed, sn={:?}, err={}", self, session.sn(), err);
331 }
332 }
333 } else {
334 break;
335 }
336 }
337
338 if success {
339 Ok(())
340 } else {
341 error!("{} call sn session failed, sn={:?}", self, sn_list);
342 Err(BuckyError::new(BuckyErrorCode::Failed, "all failed"))
343 }
344 }
345
346 fn explore_endpoint_pair<F: Fn(&Endpoint) -> bool>(&self, remote: &Device, first_box: Arc<PackageBox>, filter: F) -> Vec<DynBuildTunnelAction> {
347 let stack = Stack::from(&self.0.stack);
348 let tunnel = &self.0.tunnel;
349 let net_listener = stack.net_manager().listener();
350
351 let mut actions = vec![];
352
353 let connect_info = remote.connect_info();
354
355 for udp_interface in net_listener.udp().iter().filter(|ui| ui.local().addr().is_ipv4()) {
357 for remote_ep in connect_info.endpoints().iter().filter(|ep| ep.is_udp() && ep.is_same_ip_version(&udp_interface.local()) && filter(ep)) {
358 if let Ok((udp_tunnel, newly_created)) = tunnel.create_tunnel(EndpointPair::from((udp_interface.local(), *remote_ep)), ProxyType::None) {
359 if newly_created {
360 let action = SynUdpTunnel::new(
361 udp_tunnel,
362 first_box.clone(),
363 tunnel.config().udp.holepunch_interval);
364 actions.push(Box::new(action) as DynBuildTunnelAction);
365 }
366 }
367 }
368 }
369
370 for remote_ep in connect_info.endpoints().iter().filter(|ep| ep.is_tcp() && filter(ep)) {
372 if let Ok((tunnel, newly_created)) = tunnel.create_tunnel(EndpointPair::from((Endpoint::default_tcp(remote_ep), *remote_ep)), ProxyType::None) {
373 if newly_created {
374 let action = ConnectTcpTunnel::new(tunnel);
375 actions.push(Box::new(action) as DynBuildTunnelAction);
376 }
377 }
378 }
379 actions
382 }
383
384 async fn first_box(&self, local: &Device) -> PackageBox {
386 let stack = Stack::from(&self.0.stack);
387 let tunnel = &self.0.tunnel;
388
389 let key_stub = stack.keystore().create_key(tunnel.remote_const(), true);
390 let mut first_box = PackageBox::encrypt_box(tunnel.remote().clone(), key_stub.key.clone());
392
393 let syn_tunnel = SynTunnel {
394 protocol_version: self.0.tunnel.protocol_version(),
395 stack_version: self.0.tunnel.stack_version(),
396 to_device_id: tunnel.remote().clone(),
397 from_device_desc: local.clone(),
398 sequence: self.sequence(),
399 send_time: bucky_time_now()
400 };
401 if let keystore::EncryptedKey::Unconfirmed(key_encrypted) = key_stub.encrypted {
402 let mut exchange = Exchange::from((&syn_tunnel, key_encrypted, key_stub.key.mix_key));
403 let _ = exchange.sign(stack.keystore().signer()).await;
404 first_box.push(exchange);
405 }
406 first_box.push(syn_tunnel);
407 first_box
408 }
409}
410
411#[async_trait]
412impl TunnelBuilder for ConnectTunnelBuilder {
413 fn sequence(&self) -> TempSeq {
414 self.0.sequence
415 }
416 fn state(&self) -> TunnelBuilderState {
417 match &*self.0.state.read().unwrap() {
418 ConnectTunnelBuilderState::Connecting(_) => TunnelBuilderState::Connecting,
419 ConnectTunnelBuilderState::Establish => TunnelBuilderState::Establish,
420 ConnectTunnelBuilderState::Closed => TunnelBuilderState::Closed
421 }
422 }
423 async fn wait_establish(&self) -> Result<(), BuckyError> {
424 let (state, waiter) = {
425 let state = &mut *self.0.state.write().unwrap();
426 match state {
427 ConnectTunnelBuilderState::Connecting(connecting) => {
428 (TunnelBuilderState::Connecting, Some(connecting.waiter.new_waiter()))
429 },
430 ConnectTunnelBuilderState::Establish => {
431 (TunnelBuilderState::Establish, None)
432 },
433 ConnectTunnelBuilderState::Closed => {
434 (TunnelBuilderState::Closed, None)
435 }
436 }
437 };
438 match if let Some(waiter) = waiter {
439 StateWaiter::wait(waiter, | | self.state()).await
440 } else {
441 state
442 } {
443 TunnelBuilderState::Establish => Ok(()),
444 TunnelBuilderState::Closed => Err(BuckyError::new(BuckyErrorCode::Failed, "builder failed")),
445 TunnelBuilderState::Connecting => unreachable!()
446 }
447 }
448}
449
450impl PingClientCalledEvent for ConnectTunnelBuilder {
451 fn on_called(&self, called: &SnCalled, _context: ()) -> Result<(), BuckyError> {
452 let builder = self.clone();
453 let active_pn_list = called.active_pn_list.clone();
454 let remote_timestamp = called.peer_info.get_obj_update_time();
455 task::spawn(async move {
456 let stack = Stack::from(&builder.0.stack);
457 let first_box = builder.first_box(&stack.sn_client().ping().default_local()).await;
458 if let Some(proxy_builder) = {
459 let state = &mut *builder.0.state.write().unwrap();
460 match state {
461 ConnectTunnelBuilderState::Connecting(connecting) => {
462 if connecting.proxy.is_none() {
463 let proxy = ProxyBuilder::new(
464 builder.0.tunnel.clone(),
465 remote_timestamp,
466 Arc::new(first_box));
467 debug!("{} create proxy builder", builder);
468 connecting.proxy = Some(proxy);
469 }
470 connecting.proxy.clone()
471 },
472 _ => None
473 }
474 } {
475 for proxy in active_pn_list {
477 let _ = proxy_builder.syn_proxy(ProxyType::Active(proxy));
478 }
479 }
480 });
481
482 Ok(())
483 }
484}
485
486
487impl OnPackage<AckProxy, &DeviceId> for ConnectTunnelBuilder {
488 fn on_package(&self, pkg: &AckProxy, proxy: &DeviceId) -> Result<OnPackageResult, BuckyError> {
489 if let Some(proxy_builder) = match &*self.0.state.read().unwrap() {
490 ConnectTunnelBuilderState::Connecting(connecting) => {
491 connecting.proxy.clone()
492 },
493 _ => {
494 None
495 }
496 } {
497 proxy_builder.on_package(pkg, proxy)
498 } else {
499 let err = BuckyError::new(BuckyErrorCode::ErrorState, "proxy builder not exists");
500 debug!("{} ignore ack proxy from {} for {}", self, proxy, err);
501 Err(err)
502 }
503 }
504}
505
506
507
508