1use log::*;
2use std::{
3 time::Duration,
4 sync::RwLock,
5 sync::atomic::{AtomicI32, AtomicU64, Ordering}
6};
7use async_std::{
8 sync::{Arc},
9 future
10, task};
11use async_trait::{async_trait};
12use cyfs_base::*;
13use crate::{
14 types::*,
15 protocol::{self, *, v0::*},
16 MTU,
17 interface::{*, udp::{PackageBoxEncodeContext, OnUdpPackageBox}},
18};
19use super::{
20 tunnel::{self, DynamicTunnel, TunnelOwner, ProxyType},
21 TunnelContainer
22};
23
24struct ConnectingState {
25 container: TunnelContainer,
26 owner: Box<dyn TunnelOwner>,
27 interface: udp::Interface,
28 waiter: StateWaiter
29}
30
31struct ActiveState {
32 key: MixAesKey,
33 remote_timestamp: Timestamp,
35 container: TunnelContainer,
36 owner: Box<dyn TunnelOwner>,
37 interface: udp::Interface,
38}
39
40enum TunnelState {
41 Connecting(ConnectingState),
42 Active(ActiveState),
43 Dead
44}
45
46impl std::fmt::Display for TunnelState {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 match self {
49 TunnelState::Connecting(_) => write!(f, "connecting"),
50 TunnelState::Active(active_state) =>
51 write!(f, "Active:{{key:{}}}",
52 active_state.key),
53 TunnelState::Dead => write!(f, "dead")
54 }
55 }
56}
57
58impl From<&TunnelState> for tunnel::TunnelState {
59 fn from(state: &TunnelState) -> Self {
60 match state {
61 TunnelState::Connecting(_) => tunnel::TunnelState::Connecting,
62 TunnelState::Active(active_state) => tunnel::TunnelState::Active(active_state.remote_timestamp),
63 TunnelState::Dead => tunnel::TunnelState::Dead
64 }
65 }
66}
67
68#[derive(Clone)]
69pub struct Config {
70 pub holepunch_interval: Duration,
71 pub connect_timeout: Duration,
72 pub ping_interval: Duration,
73 pub ping_timeout: Duration
74}
75
76struct TunnelImpl {
77 local: Endpoint,
78 remote: Endpoint,
79 proxy: ProxyType,
80 state: RwLock<TunnelState>,
81 keeper_count: AtomicI32,
82 last_active: AtomicU64,
83 mtu: usize,
84}
85
86#[derive(Clone)]
87pub struct Tunnel(Arc<TunnelImpl>);
88
89impl std::fmt::Display for Tunnel {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 write!(f, "UdpTunnel{{local:{},remote:{}}}", tunnel::Tunnel::local(self), tunnel::Tunnel::remote(self))
92 }
93}
94
95impl Tunnel {
96 pub fn new(
97 container: TunnelContainer,
98 owner: Box<dyn TunnelOwner>,
99 interface: udp::Interface,
100 remote: Endpoint,
101 proxy: ProxyType) -> Self {
102 let local = interface.local();
103 let state = TunnelState::Connecting(ConnectingState {
104 container: container.clone(),
105 owner: owner.clone_as_tunnel_owner(),
106 interface,
107 waiter: StateWaiter::new()
108 });
109 let tunnel = Self(Arc::new(TunnelImpl {
110 mtu: MTU,
111 local,
112 remote,
113 proxy,
114 state: RwLock::new(state),
115 keeper_count: AtomicI32::new(0),
116 last_active: AtomicU64::new(0)
117 }));
118
119 {
120 let tunnel = tunnel.clone();
121 let connect_timeout = container.config().udp.connect_timeout;
122 task::spawn(async move {
123 match future::timeout(connect_timeout, tunnel.wait_active()).await {
124 Ok(_state) => {
125 },
127 Err(_err) => {
128 let waiter = {
129 let state = &mut *tunnel.0.state.write().unwrap();
130 match state {
131 TunnelState::Connecting(connecting) => {
132 let mut waiter = StateWaiter::new();
133 connecting.waiter.transfer_into(&mut waiter);
134 *state = TunnelState::Dead;
135 Some(waiter)
136 },
137 TunnelState::Active(_) => {
138 None
140 },
141 TunnelState::Dead => {
142 None
144 }
145 }
146 };
147 if let Some(waiter) = waiter {
148 info!("{} dead for connecting timeout", tunnel);
149 waiter.wake();
150 owner.sync_tunnel_state(&DynamicTunnel::new(tunnel.clone()), tunnel::TunnelState::Connecting, tunnel::TunnelState::Dead);
151 }
152 }
153 }
154 });
155 }
156
157 tunnel
158 }
159
160 pub fn try_update_key(&self, by_box: &PackageBox) -> Result<(), BuckyError> {
161 let state = &mut *self.0.state.write().unwrap();
162 match state {
163 TunnelState::Active(active_state) => {
164 if active_state.key.mix_key != by_box.key().mix_key {
165 debug!("{} update active state key from {} to {}", self, active_state.key, by_box.key());
166 active_state.key = by_box.key().clone();
167 Ok(())
168 } else {
169 Err(BuckyError::new(BuckyErrorCode::ErrorState, "same key"))
170 }
171 },
172 _ => {
173 Err(BuckyError::new(BuckyErrorCode::ErrorState, "not active"))
174 }
175 }
176 }
177
178 async fn wait_active(&self) -> tunnel::TunnelState {
179 let (state, opt_waiter) = {
180 let state = &mut *self.0.state.write().unwrap();
181 match state {
182 TunnelState::Connecting(ref mut connecting_state) => {
183 let waiter = connecting_state.waiter.new_waiter();
184 (tunnel::TunnelState::Connecting, Some(waiter))
185 },
186 TunnelState::Active(active_state) => {
187 (tunnel::TunnelState::Active(active_state.remote_timestamp), None)
188 },
189 TunnelState::Dead => {
190 (tunnel::TunnelState::Dead, None)
191 }
192 }
193 };
194 if let Some(waiter) = opt_waiter {
195 StateWaiter::wait(waiter, | | tunnel::Tunnel::state(self)).await
196 } else {
197 state
198 }
199 }
200
201 fn active_by_package(&self, by_box: &PackageBox, remote_timestamp: Option<Timestamp>) -> BuckyResult<TunnelContainer> {
202 self.active(by_box.key(), by_box.has_exchange(), remote_timestamp)
203 }
204
205 pub fn active(&self, key: &MixAesKey, exchange: bool, remote_timestamp: Option<Timestamp>) -> BuckyResult<TunnelContainer> {
206 let (container, to_sync, waiter) = {
207 let state = &mut *self.0.state.write().unwrap();
208 match state {
209 TunnelState::Connecting(connecting_state) => {
210 if let Some(remote_timestamp) = remote_timestamp {
211 let mut waiter = StateWaiter::new();
212 connecting_state.waiter.transfer_into(&mut waiter);
213 info!("{} change state from Connecting to Active with mix_key:{}", self, key);
214 let owner = connecting_state.owner.clone_as_tunnel_owner();
215 let container = connecting_state.container.clone();
216 *state = TunnelState::Active(ActiveState {
217 container: container.clone(),
218 owner: owner.clone_as_tunnel_owner(),
219 remote_timestamp,
220 interface: connecting_state.interface.clone(),
221 key: key.clone(),
222 });
223 Ok((container,
224 Some((tunnel::TunnelState::Connecting,
225 tunnel::TunnelState::Active(remote_timestamp),
226 owner)),
227 Some(waiter)))
228 } else {
229 Ok((connecting_state.container.clone(), None, None))
230 }
231 },
232 TunnelState::Active(active_state) => {
233 let former_state = tunnel::TunnelState::Active(active_state.remote_timestamp);
234 if let Some(remote_timestamp) = remote_timestamp {
235 if active_state.remote_timestamp < remote_timestamp {
236 debug!("{} update active remote timestamp {}", self, remote_timestamp);
237 active_state.remote_timestamp = remote_timestamp;
238 }
239 }
240 if exchange && key.mix_key != active_state.key.mix_key {
241 debug!("{} update active state key from {} to {}", self, active_state.key, key);
242 active_state.key = key.clone();
243 }
244 Ok((active_state.container.clone(),
245 Some((former_state,
246 tunnel::TunnelState::Active(active_state.remote_timestamp),
247 active_state.owner.clone_as_tunnel_owner())),
248 None))
249 },
250 TunnelState::Dead => Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel's dead"))
251 }
252 }?;
253
254 if let Some(waiter) = waiter {
255 waiter.wake();
256 }
257
258 if let Some((former_state, new_state, owner)) = to_sync {
259 self.0.last_active.store(bucky_time_now(), Ordering::SeqCst);
260 if former_state != new_state {
261 owner.sync_tunnel_state(&tunnel::DynamicTunnel::new(self.clone()), former_state, new_state);
262 }
263 }
264
265 Ok(container)
266 }
267
268 pub fn send_box(&self, package_box: &PackageBox) -> Result<(), BuckyError> {
269 let interface = {
270 let state = &*self.0.state.read().unwrap();
271 match state {
272 TunnelState::Connecting(connecting) => Ok(connecting.interface.clone()),
273 TunnelState::Active(active) => Ok(active.interface.clone()),
274 TunnelState::Dead => Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel dead"))
275 }
276 }?;
277 let mut context = PackageBoxEncodeContext::default();
278 context.set_ignore_exchange(ProxyType::None != self.0.proxy);
279 interface.send_box_to(&mut context, package_box, tunnel::Tunnel::remote(self))?;
280 Ok(())
281 }
282
283 pub fn raw_data_max_len() -> usize {
284 udp::MTU
285 }
286
287 pub(super) fn raw_data_header_len_impl() -> usize {
288 KeyMixHash::raw_bytes().unwrap()
289 }
290
291 pub fn raw_data_max_payload_len() -> usize {
292 Self::raw_data_max_len() - Self::raw_data_header_len_impl()
293 }
294
295
296 pub fn owner(&self) -> Option<TunnelContainer> {
297 let state = &*self.0.state.read().unwrap();
298 match state {
299 TunnelState::Connecting(connecting) => Some(connecting.container.clone()),
300 TunnelState::Active(active) => Some(active.container.clone()),
301 TunnelState::Dead => None
302 }
303 }
304}
305
306#[async_trait]
307impl tunnel::Tunnel for Tunnel {
308 fn mtu(&self) -> usize {
309 self.0.mtu
310 }
311
312 fn ptr_eq(&self, other: &tunnel::DynamicTunnel) -> bool {
313 *self.local() == *other.as_ref().local()
314 && *self.remote() == *other.as_ref().remote()
315 && Arc::ptr_eq(&self.0, &other.clone_as_tunnel::<Tunnel>().0)
316 }
317
318 fn as_any(&self) -> &dyn std::any::Any {
319 self
320 }
321
322 fn local(&self) -> &Endpoint {
323 &self.0.local
324 }
325
326 fn remote(&self) -> &Endpoint {
327 &self.0.remote
328 }
329
330 fn proxy(&self) -> ProxyType {
331 self.0.proxy.clone()
332 }
333
334 fn state(&self) -> tunnel::TunnelState {
335 let state = &*self.0.state.read().unwrap();
336 tunnel::TunnelState::from(state)
337 }
338
339
340 fn raw_data_header_len(&self) -> usize {
341 Self::raw_data_header_len_impl()
342 }
343
344 fn send_raw_data(&self, data: &mut [u8]) -> Result<usize, BuckyError> {
345 let (key, interface) = {
346 let state = &*self.0.state.read().unwrap();
347 match state {
348 TunnelState::Connecting(_) => Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel not active")),
349 TunnelState::Active(active) => Ok((active.key.clone(), active.interface.clone())),
350 TunnelState::Dead => Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel dead"))
351 }
352 }?;
353
354 assert_eq!(data.len() > Self::raw_data_header_len_impl(), true);
355
356 interface.send_raw_data_to(&key, data, tunnel::Tunnel::remote(self))
357 }
358
359 fn send_package(&self, package: DynamicPackage) -> Result<usize, BuckyError> {
360 let (tunnel_container, interface, key) = {
361 if let TunnelState::Active(active_state) = &*self.0.state.read().unwrap() {
362 Ok((active_state.container.clone(), active_state.interface.clone(), active_state.key.clone()))
363 } else {
364 Err(BuckyError::new(BuckyErrorCode::ErrorState, "send packages on tunnel not active"))
365 }}?;
366 trace!("{} send packages with key {}", self, key);
367 let package_box = PackageBox::from_package(tunnel_container.remote().clone(), key, package);
368 let mut context = PackageBoxEncodeContext::default();
369 context.set_ignore_exchange(ProxyType::None != self.0.proxy);
370 let sent_len = interface.send_box_to(&mut context, &package_box, tunnel::Tunnel::remote(self))?;
371 Ok(sent_len)
372 }
373
374 fn retain_keeper(&self) {
375 info!("{} retain keeper", self);
376 if 0 == self.0.keeper_count.fetch_add(1, Ordering::SeqCst) {
377 if let Some((container, owner, cur_state)) = {
378 let state = &*self.0.state.write().unwrap();
379 if let TunnelState::Active(active_state) = state {
380 Some((active_state.container.clone(),
381 active_state.owner.clone_as_tunnel_owner(),
382 tunnel::TunnelState::Active(active_state.remote_timestamp)))
383 } else {
384 None
385 }
386 } {
387 let tunnel = self.clone();
388 let ping_interval = container.config().udp.ping_interval;
389 let ping_timeout = container.config().udp.ping_timeout;
390
391 task::spawn(async move {
392 loop {
393 if tunnel.0.keeper_count.load(Ordering::SeqCst) == 0 {
394 break;
395 }
396 let now = bucky_time_now();
397 let last_active = tunnel.0.last_active.load(Ordering::SeqCst);
398 if now > last_active {
399 let miss_active_time = Duration::from_micros(now - last_active);
400 if miss_active_time > ping_timeout {
401 let state = &mut *tunnel.0.state.write().unwrap();
402 if let TunnelState::Active(_) = state {
403 info!("{} dead for ping timeout", tunnel);
404 *state = TunnelState::Dead;
405 break;
406 } else {
407 break;
408 }
409 }
410 if miss_active_time > ping_interval {
411 if tunnel.0.keeper_count.load(Ordering::SeqCst) > 0 {
412 debug!("{} send ping", tunnel);
413 let ping = PingTunnel {
414 package_id: 0,
415 send_time: now,
416 recv_data: 0,
417 };
418 let _ = tunnel::Tunnel::send_package(&tunnel, DynamicPackage::from(ping));
419 }
420 }
421 }
422
423 let _ = future::timeout(ping_interval, future::pending::<()>()).await;
424 };
425 owner.sync_tunnel_state(&tunnel::DynamicTunnel::new(tunnel.clone()), cur_state, tunnel.state());
426 });
427 } else {
428 return;
429 }
430 }
431 }
432
433 fn release_keeper(&self) {
434 info!("{} release keeper", self);
435 self.0.keeper_count.fetch_add(-1, Ordering::SeqCst);
436 }
437
438 fn reset(&self) {
439 info!("{} reset to Dead", self);
440 let mut state = self.0.state.write().unwrap();
441 *state = TunnelState::Dead;
442 }
443
444 fn mark_dead(&self, former_state: tunnel::TunnelState) {
445 let notify = match &former_state {
446 tunnel::TunnelState::Connecting => {
447 let state = &mut *self.0.state.write().unwrap();
448 match state {
449 TunnelState::Connecting(connecting) => {
450 info!("{} Connecting=>Dead", self);
451 let owner = connecting.owner.clone_as_tunnel_owner();
452 *state = TunnelState::Dead;
453 Some((owner, tunnel::TunnelState::Dead))
454 },
455 _ => {
456 None
457 }
458 }
459 },
460 tunnel::TunnelState::Active(remote_timestamp) => {
461 let remote_timestamp = *remote_timestamp;
462 let state = &mut *self.0.state.write().unwrap();
463 match state {
464 TunnelState::Active(active) => {
465 let owner = active.owner.clone_as_tunnel_owner();
466 if active.remote_timestamp == remote_timestamp {
467 info!("{} Active({})=>Dead for active by {}", self, active.remote_timestamp, remote_timestamp);
468 *state = TunnelState::Dead;
469 Some((owner, tunnel::TunnelState::Dead))
470 } else {
471 None
472 }
473 },
474 _ => {
475 None
476 }
477 }
478 },
479 tunnel::TunnelState::Dead => None
480 };
481
482 if let Some((owner, new_state)) = notify {
483 owner.sync_tunnel_state(&DynamicTunnel::new(self.clone()), former_state, new_state);
484 }
485 }
486}
487
488impl OnUdpPackageBox for Tunnel {
489 fn on_udp_package_box(&self, udp_box: udp::UdpPackageBox) -> Result<(), BuckyError> {
490 for p in udp_box.as_ref().packages_no_exchange() {
491 match downcast_tunnel_handle!(p, |p| self.on_package(p, udp_box.as_ref()))? {
492 OnPackageResult::Break => break,
493 _ => continue,
494 };
495 };
496 Ok(())
497
498 }
499}
500
501impl OnPackage<SynTunnel, &PackageBox> for Tunnel {
502 fn on_package(&self, syn_tunnel: &SynTunnel, in_box: &PackageBox) -> Result<OnPackageResult, BuckyError> {
503 let container = self.active_by_package(in_box, Some(syn_tunnel.from_device_desc.body().as_ref().unwrap().update_time()))?;
504 let ack = AckTunnel {
507 protocol_version: container.protocol_version(),
508 stack_version: container.stack_version(),
509 sequence: syn_tunnel.sequence,
510 result: 0,
511 send_time: 0,
512 mtu: udp::MTU as u16,
513 to_device_desc: container.stack().sn_client().ping().default_local()
514 };
515
516 let mut package_box = PackageBox::encrypt_box(
517 container.remote().clone(),
518 in_box.key().clone());
519 package_box.append(vec![DynamicPackage::from(ack)]);
520 let _ = self.send_box(&package_box);
521 container.on_package(syn_tunnel, None)
523 }
524}
525
526impl OnPackage<AckTunnel, &PackageBox> for Tunnel {
527 fn on_package(&self, pkg: &AckTunnel, in_box: &PackageBox) -> Result<OnPackageResult, BuckyError> {
528 let container = self.active_by_package(in_box, Some(pkg.to_device_desc.body().as_ref().unwrap().update_time()))?;
529 container.on_package(pkg, None)
531 }
532}
533
534impl OnPackage<AckAckTunnel, &PackageBox> for Tunnel {
535 fn on_package(&self, _: &AckAckTunnel, in_box: &PackageBox) -> Result<OnPackageResult, BuckyError> {
536 let _ = self.active_by_package(in_box, None)?;
537 Ok(OnPackageResult::Handled)
539 }
540}
541
542impl OnPackage<PingTunnel, &PackageBox> for Tunnel {
543 fn on_package(&self, ping: &PingTunnel, in_box: &PackageBox) -> Result<OnPackageResult, BuckyError> {
544 let _ = self.active_by_package(in_box, None)?;
545 let ping_resp = PingTunnelResp {
546 ack_package_id: ping.package_id,
547 send_time: bucky_time_now(),
548 recv_data: 0,
549 };
550 let _ = tunnel::Tunnel::send_package(self, DynamicPackage::from(ping_resp));
551 Ok(OnPackageResult::Handled)
552 }
553}
554
555impl OnPackage<PingTunnelResp, &PackageBox> for Tunnel {
556 fn on_package(&self, _: &PingTunnelResp, in_box: &PackageBox) -> Result<OnPackageResult, BuckyError> {
557 let _ = self.active_by_package(in_box, None)?;
558 Ok(OnPackageResult::Handled)
560 }
561}
562
563impl OnPackage<Datagram, &PackageBox> for Tunnel {
564 fn on_package(&self, pkg: &Datagram, in_box: &PackageBox) -> Result<OnPackageResult, BuckyError> {
565 let container = self.active_by_package(in_box, None)?;
566 container.on_package(pkg, None)
568 }
569}
570
571impl OnPackage<SessionData, &PackageBox> for Tunnel {
572 fn on_package(&self, pkg: &SessionData, in_box: &PackageBox) -> Result<OnPackageResult, BuckyError> {
573 let container = self.active_by_package(in_box, None)?;
574 container.on_package(pkg, None)
576 }
577}
578
579impl OnPackage<TcpSynConnection, &PackageBox> for Tunnel {
580 fn on_package(&self, pkg: &TcpSynConnection, in_box: &PackageBox) -> Result<OnPackageResult, BuckyError> {
581 let container = self.active_by_package(in_box, None)?;
582 container.on_package(pkg, None)
584 }
585}
586
587
588
589
590
591
592
593
594
595