1use log::*;
2use async_std::{
3 future,
4 task,
5};
6use std::{
7 time::Duration,
8 fmt,
9 sync::{RwLock, Arc, Weak},
10 collections::{BTreeMap, LinkedList},
11 ops::Deref,
12 convert::TryFrom
13};
14use cyfs_base::*;
15use crate::{
16 types::*,
17 protocol::{*, v0::*},
18 interface::{
19 self,
20 udp::{
21 OnUdpPackageBox,
22 OnUdpRawData
23 },
24 tcp::{
25 OnTcpInterface
26 }
27 },
28 sn::client::{SnCache, PingClientCalledEvent},
29 stream::{StreamContainer, RemoteSequence},
30 stack::{Stack, WeakStack},
31 MTU
32};
33use super::{
34 tunnel::*,
35 builder::*,
36 udp,
37 tcp
38};
39use core::mem;
40
41#[derive(Clone)]
42pub struct BuildTunnelParams {
43 pub remote_const: DeviceDesc,
44 pub remote_sn: Option<Vec<DeviceId>>,
45 pub remote_desc: Option<Device>,
46}
47
48impl fmt::Display for BuildTunnelParams {
49 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50 write!(f, "BuildTunnelParams{{remote_sn: {:?}, remote_desc:{}}}", self.remote_sn, self.remote_desc.is_some())
51 }
52}
53
54impl BuildTunnelParams {
55 pub(crate) fn nearest_sn(&self, stack: &Stack) -> Option<DeviceId> {
56 let remote = self.remote_const.device_id();
57
58 let cached_remote = stack.device_cache().get_inner(&remote);
59 let known_remote = cached_remote.as_ref().or_else(|| self.remote_desc.as_ref());
60
61 known_remote.and_then(|device| SnCache::nearest_sn_of(&remote, device.connect_info().sn_list()))
62 .or_else(|| self.remote_sn.as_ref().and_then(|sn_list| SnCache::nearest_sn_of(&remote, sn_list)))
63 .or_else(|| SnCache::nearest_sn_of(&remote, stack.sn_client().cache().known_list().as_slice()))
64 }
65
66 pub(crate) fn retry_sn_list(&self, stack: &Stack, nearest: &DeviceId) -> Option<Vec<DeviceId>> {
67 self.remote_sn.clone().or_else(|| Some(stack.sn_client().cache().known_list()))
68 .map(|sn_list| sn_list.into_iter().filter(|sn| sn != nearest).collect())
69
70 }
71}
72
73#[derive(Clone)]
74pub struct Config {
75 pub retain_timeout: Duration,
76 pub retry_sn_timeout: Duration,
77 pub connect_timeout: Duration,
78 pub tcp: tcp::Config,
79 pub udp: udp::Config
80}
81
82enum TunnelBuildState {
83 Idle,
84 ConnectStream(ConnectStreamBuilder),
85 AcceptStream(AcceptStreamBuilder),
86 ConnectTunnel(ConnectTunnelBuilder),
87 AcceptTunnel(AcceptTunnelBuilder)
88}
89
90pub enum StreamConnectorSelector {
91 Package(Timestamp),
92 Tcp(tcp::Tunnel, Timestamp),
93 Builder(ConnectStreamBuilder)
94}
95
96struct TunnelDeadState {
97 former_state: TunnelState,
99 when: Timestamp
101}
102
103struct TunnelConnectingState {
104 waiter: StateWaiter,
105 build_state: TunnelBuildState,
106 packages: LinkedList<(DynamicPackage, bool)>
107}
108
109struct TunnelActiveState {
110 remote_timestamp: Timestamp,
111 default_tunnel: DynamicTunnel
112}
113
114enum TunnelStateImpl {
116 Connecting(TunnelConnectingState),
117 Active(TunnelActiveState),
118 Dead(TunnelDeadState)
119}
120
121struct TunnelContainerState {
122 last_update: Timestamp,
123 tunnel_state: TunnelStateImpl,
124 tunnel_entries: BTreeMap<EndpointPair, DynamicTunnel>
125}
126
127struct TunnelContainerImpl {
128 stack: WeakStack,
129 config: Config,
130 remote: DeviceId,
131 remote_const: DeviceDesc,
132 sequence_generator: TempSeqGenerator,
133 state: RwLock<TunnelContainerState>,
134}
135
136#[derive(Clone)]
137pub struct TunnelContainer(Arc<TunnelContainerImpl>);
138
139impl TunnelContainer {
140 pub(super) fn new(stack: WeakStack, remote_const: DeviceDesc, config: Config) -> Self {
141 Self(Arc::new(TunnelContainerImpl {
142 stack,
143 config,
144 remote: remote_const.device_id(),
145 remote_const,
146 sequence_generator: TempSeqGenerator::new(),
147 state: RwLock::new(TunnelContainerState {
148 tunnel_entries: BTreeMap::new(),
149 last_update: bucky_time_now(),
150 tunnel_state: TunnelStateImpl::Connecting(TunnelConnectingState {
151 waiter: StateWaiter::new(),
152 build_state: TunnelBuildState::Idle,
153 packages: LinkedList::new()
154 })
155 }),
156 }))
157 }
158
159 pub fn mtu(&self) -> usize {
160 if let Ok(tunnel) = self.default_tunnel() {
161 tunnel.mtu()
162 } else {
163 MTU-12
164 }
165 }
166
167 fn sync_connecting(&self) {
168 let connect_timeout = self.config().connect_timeout;
169 let tunnel = self.clone();
170 task::spawn(async move {
171 match future::timeout(connect_timeout, tunnel.wait_active()).await {
172 Ok(_state) => {
173 },
175 Err(_err) => {
176 let waiter = {
177 let state = &mut *tunnel.0.state.write().unwrap();
178 match &mut state.tunnel_state {
179 TunnelStateImpl::Connecting(connecting) => {
180 let mut ret_waiter = StateWaiter::new();
181 connecting.waiter.transfer_into(&mut ret_waiter);
182 state.last_update = bucky_time_now();
183 state.tunnel_state = TunnelStateImpl::Dead(TunnelDeadState {
184 former_state: TunnelState::Connecting,
185 when: bucky_time_now()
186 });
187 state.tunnel_entries.clear();
188 Some(ret_waiter)
189 },
190 _ => {
191 None
192 }
193 }
194 };
195 if let Some(waiter) = waiter {
196 info!("{} dead for connect timeout", tunnel);
197 waiter.wake();
198 }
199 }
200 }
201 });
202 }
203
204 pub fn config(&self) -> &Config {
205 &self.0.config
206 }
207
208 pub fn stack(&self) -> Stack {
209 Stack::from(&self.0.stack)
210 }
211
212 pub fn remote(&self) -> &DeviceId {
213 &self.0.remote
214 }
215
216 pub fn remote_const(&self) -> &DeviceDesc {
217 &self.0.remote_const
218 }
219
220 pub fn protocol_version(&self) -> u8 {
221 0
222 }
223
224 pub fn stack_version(&self) -> u32 {
225 0
226 }
227
228 pub fn default_tunnel(&self) -> BuckyResult<DynamicTunnel> {
229 let state = self.0.state.read().unwrap();
230 match &state.tunnel_state {
231 TunnelStateImpl::Active(active) => {
232 Ok(active.default_tunnel.clone())
233 },
234 TunnelStateImpl::Connecting(_) => {
235 let entries = &state.tunnel_entries;
236 let mut iter = entries.iter();
237 loop {
238 match iter.next() {
239 Some((ep_pair, tunnel)) => {
240 if let TunnelState::Active(_) = tunnel.as_ref().state() {
241 if ep_pair.protocol() == Protocol::Udp {
242 break Some(tunnel.clone());
243 }
244 }
245 },
246 None => break None
247 }
248 }.ok_or_else(| | BuckyError::new(BuckyErrorCode::NotFound, "no default tunnel"))
249 },
250 TunnelStateImpl::Dead(_) => {
251 Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel's dead"))
252 }
253 }
254 }
255
256 pub fn default_udp_tunnel(&self) -> BuckyResult<udp::Tunnel> {
257 let tunnel = self.default_tunnel()?;
258 if tunnel.as_ref().local().is_udp() {
259 Ok(tunnel.clone_as_tunnel())
260 } else {
261 Err(BuckyError::new(BuckyErrorCode::NotMatch, "default tunnel not udp"))
262 }
263 }
264
265 pub fn send_packages(&self, packages: Vec<DynamicPackage>) -> Result<(), BuckyError> {
266 let tunnel = self.default_tunnel()?;
267 for package in packages {
268 tunnel.as_ref().send_package(package)?;
269 }
270 Ok(())
271 }
272
273 pub fn send_package(&self, package: DynamicPackage, plaintext: bool) -> BuckyResult<()> {
274 if plaintext {
275 assert_eq!(package.cmd_code(), PackageCmdCode::Datagram);
276 let tunnel = self.default_tunnel()?;
277
278 let mut buf = vec![0u8; MTU];
279
280 let buf_len = buf.len();
281 let enc_from = tunnel.as_ref().raw_data_header_len();
282
283 let mut context = merge_context::FirstEncode::new();
284 let enc: &dyn RawEncodeWithContext<merge_context::FirstEncode> = package.as_ref();
285 let buf_ptr = enc.raw_encode_with_context(&mut buf[enc_from..], &mut context, &None)?;
286
287 let len = buf_len - buf_ptr.len();
288
289 let _ = tunnel.as_ref().send_raw_data(&mut buf[..len])?;
290 Ok(())
291 } else {
292 let tunnel = self.default_tunnel()?;
293 let _ = tunnel.as_ref().send_package(package)?;
294 Ok(())
295 }
296 }
297
298 pub fn build_send(&self, package: DynamicPackage, build_params: BuildTunnelParams, plaintext: bool) -> BuckyResult<()> {
299 let (tunnel_and_package, builder) = {
300 let mut state = self.0.state.write().unwrap();
301 match &mut state.tunnel_state {
302 TunnelStateImpl::Active(active) => {
303 (Some((active.default_tunnel.clone(), package)), None)
304 },
305 TunnelStateImpl::Connecting(connecting) => {
306 connecting.packages.push_back((package, plaintext));
307 (None, match connecting.build_state {
308 TunnelBuildState::Idle => {
309 let builder = ConnectTunnelBuilder::new(self.0.stack.clone(), self.clone(), build_params);
311 connecting.build_state = TunnelBuildState::ConnectTunnel(builder.clone());
312 Some(builder)
313 },
314 _ => {
315 None
317 }
318 })
319 },
320 TunnelStateImpl::Dead(_) => {
321 let builder = ConnectTunnelBuilder::new(self.0.stack.clone(), self.clone(), build_params);
322 state.last_update = bucky_time_now();
323 let mut packages = LinkedList::new();
324 packages.push_back((package, plaintext));
325 state.tunnel_state = TunnelStateImpl::Connecting(TunnelConnectingState {
326 waiter: StateWaiter::new(),
327 build_state: TunnelBuildState::ConnectTunnel(builder.clone()),
328 packages
329 });
330 (None, Some(builder))
331 }
332 }
333 };
334
335 if let Some((tunnel, package)) = tunnel_and_package {
336 trace!("{} send packages from {}", self, tunnel.as_ref().as_ref());
337 self.send_package(package, plaintext)
338 } else if let Some(builder) = builder {
339 self.stack().keystore().reset_peer(self.remote());
341 self.stack().device_cache().remove_inner(self.remote());
342 self.sync_connecting();
343 task::spawn(async move {
344 builder.build().await;
345 });
346 Ok(())
347 } else {
348 Ok(())
349 }
350 }
351
352 pub fn state(&self) -> TunnelState {
353 match &self.0.state.read().unwrap().tunnel_state {
354 TunnelStateImpl::Connecting(_) => TunnelState::Connecting,
355 TunnelStateImpl::Active(active) => TunnelState::Active(active.remote_timestamp),
356 TunnelStateImpl::Dead(_) => TunnelState::Dead
357 }
358 }
359
360 pub async fn wait_active(&self) -> TunnelState {
361 let (state, waiter) = {
362 let mut state = self.0.state.write().unwrap();
363 match &mut state.tunnel_state {
364 TunnelStateImpl::Connecting(connecting) => {
365 (TunnelState::Connecting, Some(connecting.waiter.new_waiter()))
366 },
367 TunnelStateImpl::Active(active) => {
368 (TunnelState::Active(active.remote_timestamp), None)
369 },
370 TunnelStateImpl::Dead(_) => {
371 (TunnelState::Dead, None)
372 }
373 }
374 };
375 if let Some(waiter) = waiter {
376 StateWaiter::wait(waiter, | | self.state()).await
377 } else {
378 state
379 }
380 }
381
382 pub fn tunnel_of<T: 'static + Tunnel + Clone>(&self, ep_pair: &EndpointPair) -> Option<T> {
383 let tunnel_impl = &self.0;
384 let entries = &tunnel_impl.state.read().unwrap().tunnel_entries;
385 entries.get(ep_pair).map(|tunnel| tunnel.clone_as_tunnel())
386 }
387
388 pub fn create_tunnel<T: 'static + Tunnel + Clone>(
389 &self,
390 ep_pair: EndpointPair,
391 proxy: ProxyType) -> BuckyResult<(T, bool)> {
392 trace!("{} try create tunnel on {}", self, ep_pair);
393 let stack = self.stack();
394 if stack.net_manager().listener().endpoints().get(ep_pair.remote()).is_some() {
395 trace!("{} ignore creat tunnel on {} for remote is self", self, ep_pair);
396 return Err(BuckyError::new(BuckyErrorCode::InvalidInput, "remote is self"));
397 }
398
399 let tunnel_impl = &self.0;
400 let (tunnel, newly_create) = {
401 let entries = &mut tunnel_impl.state.write().unwrap().tunnel_entries;
402 if let Some(tunnel) = entries.get(&ep_pair) {
403 trace!("{} create tunnel return existing tunnel", self);
407 (tunnel.clone(), None)
408 } else {
409 let dynamic_tunnel = match ep_pair.protocol() {
410 Protocol::Udp => {
411 let stack = Stack::from(&tunnel_impl.stack);
412 let interface = stack.net_manager().listener().udp_of(ep_pair.local()).ok_or_else(|| BuckyError::new(BuckyErrorCode::NotFound, "udp interface not found"))?.clone();
413 DynamicTunnel::new(udp::Tunnel::new(
414 self.clone(),
415 self.clone_as_tunnel_owner(),
416 interface,
417 *ep_pair.remote(),
418 proxy))
419 },
420 Protocol::Tcp => {
421 DynamicTunnel::new(tcp::Tunnel::new(self.clone(), ep_pair.clone()))
422 },
423 _ => {
424 unreachable!()
425 }
426 };
427 let tunnel = dynamic_tunnel.clone();
428 info!("{} tunnel newly created on {} ", self, ep_pair);
429 entries.insert(ep_pair, dynamic_tunnel);
430 (tunnel.clone(), Some(tunnel))
431 }
432 };
433 Ok((tunnel.clone_as_tunnel(), newly_create.is_some()))
434
435 }
436
437 pub(crate) fn generate_sequence(&self) -> TempSeq {
438 self.0.sequence_generator.generate()
439 }
440
441 fn select_stream_connector_by_exists(
442 remote_timestamp: Timestamp,
443 tunnel_entries: &BTreeMap<EndpointPair, DynamicTunnel>) -> Option<StreamConnectorSelector> {
444 struct Priority {
445 tcp: Option<tcp::Tunnel>,
446 reverse_tcp: Option<tcp::Tunnel>,
447 package: bool
448 }
449
450 let p = {
451 let mut priority = Priority {
452 tcp: None,
453 reverse_tcp: None,
454 package: false
455 };
456 for (_, tunnel) in tunnel_entries {
457 if let TunnelState::Active(_) = tunnel.as_ref().state() {
458 if tunnel.as_ref().local().is_tcp() {
459 let tunnel = tunnel.clone_as_tunnel::<tcp::Tunnel>();
460 if tunnel.is_reverse() && priority.reverse_tcp.is_none() {
461 priority.reverse_tcp = Some(tunnel);
462 } else {
463 priority.tcp = Some(tunnel);
464 break;
465 }
466 } else {
467 priority.package = true;
468 }
469 }
470 }
471
472 priority
473 };
474
475 if p.tcp.is_some() {
476 let tunnel = p.tcp.unwrap();
477 Some(StreamConnectorSelector::Tcp(tunnel, remote_timestamp))
478 } else if p.reverse_tcp.is_some() {
479 let tunnel = p.reverse_tcp.unwrap();
480 Some(StreamConnectorSelector::Tcp(tunnel, remote_timestamp))
481 } else if p.package {
482 Some(StreamConnectorSelector::Package(remote_timestamp))
483 } else {
484 None
485 }
486 }
487
488 pub(crate) async fn select_stream_connector(
489 &self,
490 build_params: BuildTunnelParams,
491 stream: StreamContainer) -> BuckyResult<StreamConnectorSelector> {
492 let tunnel_impl = &self.0;
493 let (selector, new_builder, exists_builder, tunnels) = {
494 let mut state = self.0.state.write().unwrap();
495 match &mut state.tunnel_state {
496 TunnelStateImpl::Active(active) => {
497 let cur_timestamp = active.remote_timestamp;
498 if let Some(selector) = Self::select_stream_connector_by_exists(
499 cur_timestamp,
500 &state.tunnel_entries) {
501 (Some(selector), None, None, None)
502 } else {
503 error!("{} active but no exists connector", self);
504 let mut tunnel_entries = BTreeMap::new();
505 std::mem::swap(&mut tunnel_entries, &mut state.tunnel_entries);
506
507 let builder = ConnectStreamBuilder::new(
508 tunnel_impl.stack.clone(),
509 build_params,
510 stream,
511 self.clone());
512 state.last_update = bucky_time_now();
513 state.tunnel_state = TunnelStateImpl::Connecting(TunnelConnectingState {
514 waiter: StateWaiter::new(),
515 build_state: TunnelBuildState::ConnectStream(builder.clone()),
516 packages: LinkedList::new()
517 });
518 let tunnels: Vec<DynamicTunnel> = tunnel_entries.into_iter().map(|(_, tunnel)| tunnel).collect();
519 (None, Some(builder), None, Some(tunnels))
520 }
521 },
522 TunnelStateImpl::Connecting(connecting) => {
523 match &mut connecting.build_state {
524 TunnelBuildState::Idle => {
525 let builder = ConnectStreamBuilder::new(
526 tunnel_impl.stack.clone(),
527 build_params,
528 stream,
529 self.clone());
530 connecting.build_state = TunnelBuildState::ConnectStream(builder.clone());
531 (None, Some(builder), None, None)
532 },
533 TunnelBuildState::ConnectStream(builder) => (None, None, Some(Box::new(builder.clone()) as Box<dyn TunnelBuilder>), None),
534 TunnelBuildState::AcceptStream(builder) => (None, None, Some(Box::new(builder.clone()) as Box<dyn TunnelBuilder>), None),
535 TunnelBuildState::ConnectTunnel(builder) => (None, None, Some(Box::new(builder.clone()) as Box<dyn TunnelBuilder>), None),
536 TunnelBuildState::AcceptTunnel(builder) => (None, None, Some(Box::new(builder.clone()) as Box<dyn TunnelBuilder>), None)
537 }
538 },
539 TunnelStateImpl::Dead(_) => {
540 let builder = ConnectStreamBuilder::new(
541 tunnel_impl.stack.clone(),
542 build_params,
543 stream,
544 self.clone());
545
546 state.tunnel_state = TunnelStateImpl::Connecting(TunnelConnectingState {
547 waiter: StateWaiter::new(),
548 build_state: TunnelBuildState::ConnectStream(builder.clone()),
549 packages: LinkedList::new()
550 });
551
552 (None, Some(builder), None, None)
553 }
554 }
555 };
556 if let Some(tunnels) = tunnels {
557 for tunnel in tunnels {
558 tunnel.as_ref().reset();
559 }
560 }
561 if let Some(selector) = selector {
562 Ok(selector)
563 } else if let Some(builder) = exists_builder {
564 builder.wait_establish().await?;
566 let state = self.0.state.read().unwrap();
567 match &state.tunnel_state {
568 TunnelStateImpl::Active(active) => {
569 Self::select_stream_connector_by_exists(active.remote_timestamp, &state.tunnel_entries)
570 .ok_or_else(|| {
571 error!("{} active but no exists connector", self);
572 BuckyError::new(BuckyErrorCode::ErrorState, "tunnel's dead")
573 })
574 },
575 _ => {
576 Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel's dead"))
577 }
578 }
579 } else if let Some(builder) = new_builder {
580 self.stack().keystore().reset_peer(self.remote());
581 self.stack().device_cache().remove_inner(self.remote());
582 self.sync_connecting();
583 Ok(StreamConnectorSelector::Builder(builder))
584 } else {
585 unreachable!()
586 }
587 }
588
589 pub(crate) fn payload_size(&self) -> usize {
590 1024 }
592
593 pub fn reset(&self) {
594 let (tunnels, waiter) = {
595 let mut state = self.0.state.write().unwrap();
596 let (waiter, updated) = match &mut state.tunnel_state {
597 TunnelStateImpl::Connecting(connecting) => {
598 let mut waiter = StateWaiter::new();
599 connecting.waiter.transfer_into(&mut waiter);
600 state.tunnel_state = TunnelStateImpl::Dead(TunnelDeadState {
601 former_state: TunnelState::Connecting,
602 when: bucky_time_now()
603 });
604 (Some(waiter), true)
605 },
606 TunnelStateImpl::Active(active) => {
607 state.tunnel_state = TunnelStateImpl::Dead(TunnelDeadState {
608 former_state: TunnelState::Active(active.remote_timestamp),
609 when: bucky_time_now()
610 });
611 (None, true)
612 },
613 TunnelStateImpl::Dead(_) => {
614 (None, false)
615 }
616 };
617 if updated {
618 state.last_update = bucky_time_now();
619 }
620 let mut tunnel_entries = BTreeMap::new();
621 std::mem::swap(&mut tunnel_entries, &mut state.tunnel_entries);
622 let tunnels: Vec<DynamicTunnel> = tunnel_entries.into_iter().map(|(_, tunnel)| tunnel).collect();
623 (tunnels, waiter)
624 };
625 for tunnel in tunnels {
626 tunnel.as_ref().reset();
627 }
628 if let Some(waiter) = waiter {
629 waiter.wake();
630 }
631 }
632
633 pub(crate) fn mark_dead(&self, active_timestamp: Timestamp, last_update: Timestamp) -> BuckyResult<()> {
634 info!("{} mark dead with active timestamp {} last_update {}", self, active_timestamp, last_update);
635 let tunnels: Vec<DynamicTunnel> = {
636 let mut state = self.0.state.write().unwrap();
637 if state.last_update > last_update {
638 info!("{} ignore mark dead for updated", self);
639 Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel has updated"))
640 } else {
641 match &mut state.tunnel_state {
642 TunnelStateImpl::Connecting(_) => {
643 Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel's connecting"))
644 },
645 TunnelStateImpl::Active(active) => {
646 if active.default_tunnel.as_ref().local().is_tcp() {
647 info!("{} ignore mark dead for tcp default", self);
648 Err(BuckyError::new(BuckyErrorCode::ErrorState, "default tcp tunnel"))
649 } else {
650 let cur_timestamp = active.remote_timestamp;
651 if cur_timestamp == active_timestamp {
652 info!("{} Active({})=>Dead", self, cur_timestamp);
653 state.last_update = bucky_time_now();
654 state.tunnel_state = TunnelStateImpl::Dead(TunnelDeadState {
655 former_state: TunnelState::Active(cur_timestamp),
656 when: bucky_time_now()
657 });
658 let mut tunnel_entries = BTreeMap::new();
659 std::mem::swap(&mut tunnel_entries, &mut state.tunnel_entries);
660 Ok(tunnel_entries.into_iter().map(|(_, tunnel)| tunnel).collect())
661 } else {
662 Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel's active"))
663 }
664 }
665 },
666 TunnelStateImpl::Dead(_) => Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel's dead"))
667 }
668 }
669 }?;
670 for tunnel in tunnels {
671 tunnel.as_ref().reset();
672 }
673 Ok(())
674 }
675
676 pub(super) fn on_raw_data(&self, data: &[u8], tunnel: DynamicTunnel) -> BuckyResult<()> {
677 let tunnel_impl = &self.0;
678 let (cmd_code, buf) = u8::raw_decode(data)?;
679 let cmd_code = PackageCmdCode::try_from(cmd_code)?;
680 match cmd_code {
681 PackageCmdCode::Datagram => {
682 let (pkg, _) = Datagram::raw_decode_with_context(buf, &mut merge_context::OtherDecode::default())?;
683 let _ = Stack::from(&tunnel_impl.stack).datagram_manager().on_package(&pkg, (self, true));
684 Ok(())
685 },
686 PackageCmdCode::SessionData => unimplemented!(),
687 _ => {
688 Stack::from(&tunnel_impl.stack).ndn().channel_manager().on_raw_data(data, (self, tunnel))
689 },
690 }
691 }
692
693
694
695 }
723
724impl fmt::Display for TunnelContainer {
725 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
726 write!(f, "TunnelContainer{{local:{}, remote:{}}}",
727 Stack::from(&self.0.stack).local_device_id(), self.remote())
728 }
729}
730
731
732impl TunnelOwner for TunnelContainer {
    /// Called when one of the container's tunnels changed its state; updates
    /// the container state accordingly.
    ///
    /// * `tunnel` - the tunnel whose state changed.
    /// * `former_state` - the tunnel's state before the change.
    /// * `new_state` - the tunnel's state after the change; must not be
    ///   `Connecting` (that case panics).
    ///
    /// All state changes are made under the write lock and collected into a
    /// `NextStep`; waking waiters, keeper retain/release, tunnel resets and
    /// sending queued packages are performed after the match that holds the lock.
    fn sync_tunnel_state(&self, tunnel: &DynamicTunnel, former_state: TunnelState, new_state: TunnelState) {
        // Work deferred until the state lock has been released.
        struct NextStep {
            // Previous default tunnel whose keeper has to be released.
            old_default: Option<DynamicTunnel>,
            // New default tunnel whose keeper has to be retained.
            new_default: Option<DynamicTunnel>,
            // Tunnels removed from (or not found in) the entry map; they get reset.
            reset_tunnels: LinkedList<DynamicTunnel>,
            // Waiters taken over from the connecting state.
            waiters: StateWaiter,
            // Packages queued while connecting, with their plaintext flag.
            packages: LinkedList<(DynamicPackage, bool)>
        }

        let mut next_step = NextStep {
            old_default: None,
            new_default: None,
            reset_tunnels: LinkedList::new(),
            waiters: StateWaiter::new(),
            packages: LinkedList::new()
        };
        match new_state {
            TunnelState::Connecting => {
                unreachable!()
            },
            TunnelState::Active(remote_timestamp) => {
                let mut state = self.0.state.write().unwrap();
                let entries = &mut state.tunnel_entries;
                let ep_pair = EndpointPair::from((*tunnel.as_ref().local(), *tunnel.as_ref().remote()));
                // Only a tunnel that is the registered entry for its endpoint pair is accepted.
                let exists = {
                    if let Some(stub) = entries.get(&ep_pair) {
                        stub.as_ref().ptr_eq(tunnel)
                    } else {
                        false
                    }
                };
                let mut to_reset = vec![];

                if exists {
                    // Active entries with an older remote timestamp are removed and reset later.
                    for (remote, tunnel) in &state.tunnel_entries {
                        if let TunnelState::Active(active_timestamp) = tunnel.as_ref().state() {
                            if active_timestamp < remote_timestamp {
                                to_reset.push(remote.clone());
                            }
                        }
                    }
                    for remote in to_reset {
                        next_step.reset_tunnels.push_back(state.tunnel_entries.remove(&remote).unwrap());
                    }

                    let updated = match &mut state.tunnel_state {
                        TunnelStateImpl::Active(active) => {
                            let remote_updated = active.remote_timestamp < remote_timestamp;
                            // The default tunnel changes when the remote timestamp is newer, or:
                            // - the current default uses a proxy and the new tunnel does not;
                            // - otherwise, the new tunnel is udp while the default is tcp, or the
                            //   new tunnel is tcp with a non-zero remote port while the default's
                            //   remote port is zero.
                            let change_default = remote_updated || {
                                if ProxyType::None != active.default_tunnel.as_ref().proxy() {
                                    ProxyType::None == tunnel.as_ref().proxy()
                                } else {
                                    tunnel.as_ref().local().is_udp() && active.default_tunnel.as_ref().local().is_tcp()
                                    || tunnel.as_ref().local().is_tcp() && tunnel.as_ref().remote().addr().port() != 0 && active.default_tunnel.as_ref().remote().addr().port() == 0
                                }
                            };
                            if change_default {
                                info!("{} change default from {} to {}", self, active.default_tunnel.as_ref().as_ref(), tunnel.as_ref().as_ref());
                                next_step.old_default = Some(active.default_tunnel.clone());
                                active.remote_timestamp = remote_timestamp;
                                active.default_tunnel = tunnel.clone();
                                next_step.new_default = Some(tunnel.clone());
                            }
                            change_default
                        },
                        TunnelStateImpl::Connecting(connecting) => {
                            info!("{} connecting=>active with default {}", self, tunnel.as_ref().as_ref());
                            // Take over waiters and queued packages; both are handled after the lock.
                            connecting.waiter.transfer_into(&mut next_step.waiters);

                            mem::swap(&mut next_step.packages, &mut connecting.packages);

                            state.tunnel_state = TunnelStateImpl::Active(TunnelActiveState {
                                default_tunnel: tunnel.clone(),
                                remote_timestamp: remote_timestamp
                            });

                            next_step.new_default = Some(tunnel.clone());

                            true
                        },
                        TunnelStateImpl::Dead(_) => {
                            info!("{} dead=>active with default {}", self, tunnel.as_ref().as_ref());
                            state.tunnel_state = TunnelStateImpl::Active(TunnelActiveState {
                                default_tunnel: tunnel.clone(),
                                remote_timestamp: remote_timestamp
                            });
                            next_step.new_default = Some(tunnel.clone());
                            true
                        }
                    };
                    if updated {
                        state.last_update = bucky_time_now();
                    }
                } else {
                    // The tunnel is not the registered entry for its endpoint pair: reset it.
                    warn!("{} reset tunnel {} for not in ep map", self, tunnel.as_ref().as_ref());
                    next_step.reset_tunnels.push_back(tunnel.clone());
                }
            },
            TunnelState::Dead => {
                let state = &mut *self.0.state.write().unwrap();
                let entries = &mut state.tunnel_entries;
                let ep_pair = EndpointPair::from((*tunnel.as_ref().local(), *tunnel.as_ref().remote()));
                // Remove the entry only when it is this very tunnel.
                let exists = {
                    if let Some(stub) = entries.get(&ep_pair) {
                        if stub.as_ref().ptr_eq(tunnel) {
                            info!("{} remove tunnel {}", self, tunnel.as_ref().as_ref());
                            entries.remove(&ep_pair);
                            true
                        } else {
                            false
                        }
                    } else {
                        false
                    }
                };

                if exists {
                    if let TunnelState::Active(remote_timestamp) = former_state {
                        match &state.tunnel_state {
                            TunnelStateImpl::Active(active) => {
                                // The dead tunnel was active with the container's current remote
                                // timestamp: the container goes dead and the remaining entries
                                // are removed and reset.
                                if active.remote_timestamp == remote_timestamp {
                                    let default_tunnel = active.default_tunnel.clone();
                                    info!("{} active=>dead for tunnel {} dead", self, tunnel.as_ref().as_ref());
                                    for (_, tunnel) in &state.tunnel_entries {
                                        next_step.reset_tunnels.push_back(tunnel.clone());
                                    }
                                    state.tunnel_entries.clear();
                                    state.last_update = bucky_time_now();
                                    state.tunnel_state = TunnelStateImpl::Dead(TunnelDeadState {
                                        former_state: TunnelState::Active(active.remote_timestamp),
                                        when: bucky_time_now()
                                    });
                                    next_step.old_default = Some(default_tunnel);
                                }
                            },
                            _ => {
                            }
                        }
                    }
                }
            }
        };
        // Deferred work, in this order: wake waiters, swap keepers, reset tunnels, flush queue.
        next_step.waiters.wake();
        if let Some(old) = next_step.old_default {
            old.as_ref().release_keeper();
        }
        if let Some(new) = next_step.new_default {
            new.as_ref().retain_keeper();
        }

        for tunnel in next_step.reset_tunnels {
            tunnel.as_ref().reset();
        }

        // Send errors for queued packages are ignored.
        for (package, plaintext) in next_step.packages {
            let _ = self.send_package(package, plaintext);
        }
    }
903
904 fn clone_as_tunnel_owner(&self) -> Box<dyn TunnelOwner> {
905 Box::new(self.clone())
906 }
907}
908
909impl OnUdpPackageBox for TunnelContainer {
910 fn on_udp_package_box(&self, udp_box: interface::udp::UdpPackageBox) -> Result<(), BuckyError> {
911 let ep_pair = EndpointPair::from((udp_box.local().local(), *udp_box.remote()));
913 let udp_tunnel = match self.tunnel_of::<udp::Tunnel>(&ep_pair) {
914 Some(tunnel) => {
915 Ok(tunnel)
916 },
917 None => self.create_tunnel::<udp::Tunnel>(ep_pair, ProxyType::None).map(|(t, _)| t)
918 }?;
919 udp_tunnel.on_udp_package_box(udp_box)
922 }
923}
924
925impl OnUdpRawData<(interface::udp::Interface, DeviceId, MixAesKey, Endpoint)> for TunnelContainer {
926 fn on_udp_raw_data(&self, data: &[u8], context: (interface::udp::Interface, DeviceId, MixAesKey, Endpoint)) -> Result<(), BuckyError> {
927 let (interface, _, key, remote) = context;
929 let ep_pair = EndpointPair::from((interface.local(), remote));
930 let udp_tunnel = match self.tunnel_of::<udp::Tunnel>(&ep_pair) {
931 Some(tunnel) => {
932 Ok(tunnel)
933 },
934 None => self.create_tunnel::<udp::Tunnel>(ep_pair, ProxyType::None).map(|(t, _)| t)
935 }?;
936 let _ = udp_tunnel.active(&key, false, None);
939 self.on_raw_data(data, DynamicTunnel::new(udp_tunnel))
940 }
941}
942
943impl OnTcpInterface for TunnelContainer {
944 fn on_tcp_interface(&self, interface: interface::tcp::AcceptInterface, first_box: PackageBox) -> Result<OnPackageResult, BuckyError> {
945 let ep_pair = EndpointPair::from((*interface.local(), Endpoint::default_tcp(interface.local())));
947 let tcp_tunnel = match self.tunnel_of::<tcp::Tunnel>(&ep_pair) {
948 Some(tunnel) => {
949 Ok(tunnel)
950 },
951 None => self.create_tunnel::<tcp::Tunnel>(ep_pair, ProxyType::None).map(|(t, _)| t)
952 }?;
953 tcp_tunnel.on_tcp_interface(interface, first_box)
956 }
957}
958
959impl PingClientCalledEvent<PackageBox> for TunnelContainer {
960 fn on_called(&self, called: &SnCalled, caller_box: PackageBox) -> Result<(), BuckyError> {
961 let syn_tunnel: &SynTunnel = caller_box.packages_no_exchange()[0].as_ref();
962 let remote_timestamp = syn_tunnel.from_device_desc.body().as_ref().unwrap().update_time();
963 let _ = self.on_package(syn_tunnel, None)?;
964 if let Some(second_pkg) = caller_box.packages_no_exchange().get(1) {
965 match second_pkg.cmd_code() {
966 PackageCmdCode::SessionData => {
967 let session_data: &SessionData = second_pkg.as_ref();
968 if !session_data.is_syn() {
969 debug!("{} ignore sn called for sesion data not syn", self);
970 return Err(BuckyError::new(BuckyErrorCode::InvalidInput, "session data in sn called should has has syn flag"));
971 }
972 let _ = self.on_package(session_data, None)?;
973 let remote_seq = RemoteSequence::from((self.remote().clone(), session_data.syn_info.as_ref().unwrap().sequence));
974 let stream = Stack::from(&self.0.stack).stream_manager().stream_of_remote_sequence(&remote_seq);
975 if stream.is_none() {
976 debug!("{} ignore accept stream builder for stream of {} no more connecting", self, remote_seq);
977 return Ok(());
978 }
979 let stream = stream.unwrap();
980 let acceptor = stream.acceptor();
981 if acceptor.is_none() {
982 debug!("{} ignore accept stream builder for stream of {} no more connecting", self, remote_seq);
983 return Ok(());
984 }
985 let acceptor = acceptor.unwrap();
986 match {
988 let mut state = self.0.state.write().unwrap();
989 match &mut state.tunnel_state {
990 TunnelStateImpl::Connecting(connecting) => {
991 match &mut connecting.build_state {
992 TunnelBuildState::Idle => {
993 connecting.build_state = TunnelBuildState::AcceptStream(acceptor.clone());
994 Ok((vec![], None))
995 },
996 TunnelBuildState::ConnectStream(builder) => Ok((vec![], Some(Box::new(builder.clone()) as Box<dyn PingClientCalledEvent>))),
997 TunnelBuildState::ConnectTunnel(builder) => Ok((vec![], Some(Box::new(builder.clone()) as Box<dyn PingClientCalledEvent>))),
998 _ => Err(BuckyError::new(BuckyErrorCode::AlreadyExists, "another builder exists"))
999 }
1022 },
1023 TunnelStateImpl::Active(active) => {
1024 if active.remote_timestamp < remote_timestamp {
1025 state.last_update = bucky_time_now();
1026 state.tunnel_state = TunnelStateImpl::Connecting(TunnelConnectingState {
1027 waiter: StateWaiter::new(),
1028 build_state: TunnelBuildState::AcceptStream(acceptor.clone()),
1029 packages: LinkedList::new()
1030 });
1031 let mut tunnel_entries = BTreeMap::new();
1032 std::mem::swap(&mut tunnel_entries, &mut state.tunnel_entries);
1033 Ok((tunnel_entries.into_iter().map(|(_, tunnel)| tunnel).collect(), None))
1034 } else {
1035 Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel's active"))
1036 }
1037 },
1038 TunnelStateImpl::Dead(_) => {
1039 state.last_update = bucky_time_now();
1040 state.tunnel_state = TunnelStateImpl::Connecting(TunnelConnectingState {
1041 waiter: StateWaiter::new(),
1042 build_state: TunnelBuildState::AcceptStream(acceptor.clone()),
1043 packages: LinkedList::new()
1044 });
1045 Ok((vec![], None))
1046 }
1047 }
1048 } {
1049 Err(err) => {
1050 info!("{} ignore accept stream builder {} for {}", self, acceptor, err);
1051 },
1052 Ok((tunnels, builder)) => {
1053 for tunnel in tunnels {
1054 tunnel.as_ref().reset();
1055 }
1056
1057 if let Some(builder) = builder {
1058 let _ = builder.on_called(called, ());
1059 } else {
1060 self.sync_connecting();
1061 let _ = acceptor.on_called(called, caller_box);
1063 }
1064 }
1065 }
1066 },
1067 _ => {
1068 unreachable!()
1070 }
1071 }
1072 } else if called.reverse_endpoint_array.len() > 0 {
1073 info!("{} called for reverse connect tunnel", self);
1074 for remote in &called.reverse_endpoint_array {
1078 let ep_pair = EndpointPair::from((Endpoint::default_tcp(remote), *remote));
1079 let tunnel = self.create_tunnel::<tcp::Tunnel>(ep_pair, ProxyType::None);
1080 if let Ok((tunnel, _)) = tunnel {
1081 let _ = tunnel.connect();
1082 }
1083 }
1084 } else {
1086 let acceptor = AcceptTunnelBuilder::new(self.0.stack.clone(), self.clone(), syn_tunnel.sequence);
1087 match {
1088 let mut state = self.0.state.write().unwrap();
1089 match &mut state.tunnel_state {
1090 TunnelStateImpl::Connecting(connecting) => {
1091 match &mut connecting.build_state {
1092 TunnelBuildState::Idle => {
1093 connecting.build_state = TunnelBuildState::AcceptTunnel(acceptor.clone());
1094 Ok((vec![], None))
1095 },
1096 TunnelBuildState::ConnectStream(builder) => Ok((vec![], Some(Box::new(builder.clone()) as Box<dyn PingClientCalledEvent>))),
1097 TunnelBuildState::ConnectTunnel(builder) => Ok((vec![], Some(Box::new(builder.clone()) as Box<dyn PingClientCalledEvent>))),
1098 _ => Err(BuckyError::new(BuckyErrorCode::AlreadyExists, "another builder exists"))
1099 }
1100 },
1101 TunnelStateImpl::Active(active) => {
1102 if active.remote_timestamp < remote_timestamp {
1103 state.last_update = bucky_time_now();
1104 state.tunnel_state = TunnelStateImpl::Connecting(TunnelConnectingState {
1105 waiter: StateWaiter::new(),
1106 build_state: TunnelBuildState::AcceptTunnel(acceptor.clone()),
1107 packages: LinkedList::new()
1108 });
1109 let mut tunnel_entries = BTreeMap::new();
1110 std::mem::swap(&mut tunnel_entries, &mut state.tunnel_entries);
1111 Ok((tunnel_entries.into_iter().map(|(_, tunnel)| tunnel).collect(), None))
1112 } else {
1113 Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel's active"))
1114 }
1115 },
1116 TunnelStateImpl::Dead(_) => {
1117 state.last_update = bucky_time_now();
1118 state.tunnel_state = TunnelStateImpl::Connecting(TunnelConnectingState {
1119 waiter: StateWaiter::new(),
1120 build_state: TunnelBuildState::AcceptTunnel(acceptor.clone()),
1121 packages: LinkedList::new()
1122 });
1123 Ok((vec![], None))
1124 }
1125 }
1126 } {
1127 Ok((tunnels, builder)) => {
1128 for tunnel in tunnels {
1129 tunnel.as_ref().reset();
1130 }
1131 if let Some(builder) = builder {
1132 let _ = builder.on_called(called, ());
1133 } else {
1134 let active_pn_list = called.active_pn_list.clone();
1135 self.sync_connecting();
1136 task::spawn(async move {
1138 let _ = acceptor.build(caller_box, active_pn_list).await;
1139 });
1140 }
1141 },
1142 Err(err) => {
1143 debug!("{} ignore accept tunnel builder {} for {}", self, acceptor, err);
1144 }
1145 }
1146 }
1147 Ok(())
1148 }
1149}
1150
1151impl OnPackage<SynTunnel> for TunnelContainer {
1152 fn on_package(&self, pkg: &SynTunnel, _: Option<()>) -> Result<OnPackageResult, BuckyError> {
1153 Stack::from(&self.0.stack).device_cache().add(&pkg.from_device_desc.desc().device_id(), &pkg.from_device_desc);
1155 Ok(OnPackageResult::Handled)
1156 }
1157}
1158
1159impl OnPackage<AckTunnel> for TunnelContainer {
1160 fn on_package(&self, pkg: &AckTunnel, _: Option<()>) -> Result<OnPackageResult, BuckyError> {
1161 Stack::from(&self.0.stack).device_cache().add(self.remote(), &pkg.to_device_desc);
1163 Ok(OnPackageResult::Handled)
1164 }
1165}
1166
1167impl OnPackage<TcpSynConnection, interface::tcp::AcceptInterface> for TunnelContainer {
1168 fn on_package(&self, pkg: &TcpSynConnection, interface: interface::tcp::AcceptInterface) -> Result<OnPackageResult, BuckyError> {
1169 let tunnel_impl = &self.0;
1171 Stack::from(&tunnel_impl.stack).device_cache().add(&self.remote(), &pkg.from_device_desc);
1172 Stack::from(&tunnel_impl.stack).stream_manager().on_package(pkg, (self, interface))
1174 .map_err(|err| {
1175 debug!("{} handle package {} error {}", self, pkg, err);
1176 err
1177 })
1178 }
1179}
1180
1181impl OnPackage<TcpAckConnection, interface::tcp::AcceptInterface> for TunnelContainer {
1182 fn on_package(&self, pkg: &TcpAckConnection, interface: interface::tcp::AcceptInterface) -> Result<OnPackageResult, BuckyError> {
1183 let tunnel_impl = &self.0;
1185 Stack::from(&tunnel_impl.stack).device_cache().add(&self.remote(), &pkg.to_device_desc);
1186 Stack::from(&tunnel_impl.stack).stream_manager().on_package(pkg, (self, interface))
1188 .map_err(|err| {
1189 debug!("{} handle package {} error {}", self, pkg, err);
1190 err
1191 })
1192 }
1193}
1194
1195impl OnPackage<Datagram> for TunnelContainer {
1196 fn on_package(&self, pkg: &Datagram, _: Option<()>) -> Result<OnPackageResult, BuckyError> {
1197 Stack::from(&self.0.stack).datagram_manager().on_package(pkg, (self, false))
1198 .map_err(|err| {
1199 debug!("{} handle package {} error {}", self, pkg, err);
1200 err
1201 })
1202 }
1203}
1204
1205impl OnPackage<SessionData> for TunnelContainer {
1206 fn on_package(&self, pkg: &SessionData, _: Option<()>) -> Result<OnPackageResult, BuckyError> {
1207 let tunnel_impl = &self.0;
1208 Stack::from(&tunnel_impl.stack).stream_manager().on_package(pkg, self)
1210 .map_err(|err| {
1211 debug!("{} handle package {} error {}", self, pkg, err);
1212 err
1213 })
1214 }
1215}
1216
1217impl OnPackage<TcpSynConnection> for TunnelContainer {
1218 fn on_package(&self, pkg: &TcpSynConnection, _: Option<()>) -> Result<OnPackageResult, BuckyError> {
1219 let tunnel_impl = &self.0;
1220 Stack::from(&tunnel_impl.stack).stream_manager().on_package(pkg, self)
1222 .map_err(|err| {
1223 debug!("{} handle package {} error {}", self, pkg, err);
1224 err
1225 })
1226 }
1227}
1228
1229impl OnPackage<AckProxy, &DeviceId> for TunnelContainer {
1230 fn on_package(&self, pkg: &AckProxy, proxy: &DeviceId) -> Result<OnPackageResult, BuckyError> {
1231 let tunnel_impl = &self.0;
1232 let builder = if let TunnelStateImpl::Connecting(connecting) = &tunnel_impl.state.read().unwrap().tunnel_state {
1233 match &connecting.build_state {
1234 TunnelBuildState::ConnectStream(builder) => Some(Box::new(builder.clone()) as Box<dyn OnPackage<AckProxy, &DeviceId>>),
1235 TunnelBuildState::ConnectTunnel(builder) => Some(Box::new(builder.clone()) as Box<dyn OnPackage<AckProxy, &DeviceId>>),
1236 TunnelBuildState::AcceptStream(builder) => Some(Box::new(builder.clone()) as Box<dyn OnPackage<AckProxy, &DeviceId>>),
1237 TunnelBuildState::AcceptTunnel(builder) => Some(Box::new(builder.clone()) as Box<dyn OnPackage<AckProxy, &DeviceId>>),
1238 TunnelBuildState::Idle => None
1239 }.ok_or_else(|| BuckyError::new(BuckyErrorCode::ErrorState, "no builder"))
1240 } else {
1241 Err(BuckyError::new(BuckyErrorCode::ErrorState, "not connecting"))
1242 }.map_err(|err| {
1243 debug!("{} ignore ack proxy from {} for {}", self, proxy, err);
1244 err
1245 })?;
1246 builder.on_package(pkg, proxy)
1247 }
1248}
1249
1250struct ContainerRef(TunnelContainer);
1251
1252#[derive(Clone)]
1253pub struct TunnelGuard(Arc<ContainerRef>);
1254
1255#[derive(Clone)]
1256pub struct WeakTunnelGuard(Weak<ContainerRef>);
1257
1258impl WeakTunnelGuard {
1259 pub fn to_strong(&self) -> Option<TunnelGuard> {
1260 self.0.upgrade().map(|s| TunnelGuard(s))
1261 }
1262}
1263
1264impl TunnelGuard {
1265 pub(super) fn new(tunnel: TunnelContainer) -> Self {
1266 Self(Arc::new(ContainerRef(tunnel)))
1267 }
1268
1269 pub fn to_weak(&self) -> WeakTunnelGuard {
1270 WeakTunnelGuard(Arc::downgrade(&self.0))
1271 }
1272
1273 pub fn ref_count(&self) -> usize {
1274 Arc::strong_count(&self.0)
1275 }
1276}
1277
1278impl Drop for ContainerRef {
1279 fn drop(&mut self) {
1280 self.0.reset();
1281 }
1282}
1283
1284impl Deref for TunnelGuard {
1285 type Target = TunnelContainer;
1286 fn deref(&self) -> &TunnelContainer {
1287 &self.0.0
1288 }
1289}
1290
1291impl AsRef<TunnelContainer> for TunnelGuard {
1292 fn as_ref(&self) -> &TunnelContainer {
1293 &self.0.0
1294 }
1295}