1use log::*;
2use std::{
3 sync::atomic::{AtomicI32, AtomicU64, Ordering},
4 time::Duration
5};
6use cyfs_debug::Mutex;
7use async_std::{
8 sync::{Arc},
9 channel::{bounded, Sender, Receiver},
10 task,
11 future
12};
13use futures::future::{Abortable, AbortHandle, AbortRegistration};
14use async_trait::{async_trait};
15use ringbuf;
16use cyfs_base::*;
17use crate::{
18 types::*,
19 protocol::{self, *, v0::*},
20 history::keystore,
21 MTU,
22 interface::{self, *, tcp::{OnTcpInterface, RecvBox, PackageInterface}}
23};
24use super::{tunnel::{self, DynamicTunnel, TunnelOwner, ProxyType}, TunnelContainer};
25
/// Tunables for TCP tunnels.
///
/// In this file the values are read through `owner.config().tcp`
/// (presumably an instance of this struct — confirm against `TunnelContainer`).
#[derive(Clone)]
pub struct Config {
    /// Timeout handed to `tcp::Interface::connect` when dialing the remote.
    pub connect_timeout: Duration,
    /// Timeout handed to `confirm_connect` while waiting for the `AckTunnel` response.
    pub confirm_timeout: Duration,
    /// Not read in this part of the file; presumably bounds the passive (accept) side.
    pub accept_timeout: Duration,
    /// Delay between the first keeper being retained and the retain-connect attempt.
    pub retain_connect_delay: Duration,
    /// Idle period after which the keep-alive loop sends a `PingTunnel`;
    /// also the sleep between keep-alive iterations.
    pub ping_interval: Duration,
    /// Idle period after which the keep-alive loop declares the tunnel dead.
    pub ping_timeout: Duration,

    /// Capacity of the bounded signal channel feeding the send loop.
    pub package_buffer: usize,
    /// Size of the data-piece ring buffer, counted in units of `udp::MTU` bytes.
    pub piece_buffer: usize,
    /// How long the send loop waits for a signal before flushing queued pieces.
    pub piece_interval: Duration,
}
41
42enum TunnelState {
43 Connecting(ConnectingState),
44 PreActive(PreActiveState),
47 Active(ActiveState),
48 Dead,
49}
50
51
52enum ConnectorState {
54 None,
55 Connecting,
56 ReverseConnecting(AbortHandle)
57}
58
59struct ConnectingState {
60 owner: TunnelContainer,
61 connector: ConnectorState
62}
63
64
65
66enum PackageElem {
67 Package(DynamicPackage),
68 RawData(Vec<u8>),
69}
70
/// Control command processed by the send loop.
enum CommandElem {
    /// Drop the given number of bytes from the data-piece ring buffer.
    Discard(usize),
}
74
75enum SignalElem {
76 Package(PackageElem),
77 Command(CommandElem)
78}
79
80
81struct PreActiveState {
82 owner: TunnelContainer,
83 connector: ConnectorState,
84 remote_timestamp: Timestamp,
85 signal_writer: Sender<SignalElem>,
86 signal_reader: Receiver<SignalElem>,
87}
88
89struct ActiveState {
90 owner: TunnelContainer,
91 interface: tcp::PackageInterface,
92 remote_timestamp: Timestamp,
93 syn_seq: TempSeq,
94 signal_writer: Sender<SignalElem>,
95 piece_writer: ringbuf::Producer<u8>,
96 dead_waiters: StateWaiter
97}
98
99struct TunnelImpl {
100 remote_device_id: DeviceId,
101 local_remote: EndpointPair,
102 keeper_count: AtomicI32,
103 last_active: AtomicU64,
104 retain_connect_timestamp: AtomicU64,
105 state: Mutex<TunnelState>,
106 mtu: usize,
107}
108
109#[derive(Clone)]
110pub struct Tunnel(Arc<TunnelImpl>);
111
112impl std::fmt::Display for Tunnel {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 write!(f, "TcpTunnel{{remote_device:{}, local:{}, remote:{}}}", self.0.remote_device_id, tunnel::Tunnel::local(self), tunnel::Tunnel::remote(self))
115 }
116}
117
118impl Tunnel {
119 pub fn new(
120 owner: TunnelContainer,
121 ep_pair: EndpointPair) -> Self {
122 let remote_device_id = owner.remote().clone();
123 let tunnel = Self(Arc::new(TunnelImpl {
124 mtu: MTU-12,
125 remote_device_id,
126 local_remote: ep_pair,
127 keeper_count: AtomicI32::new(0),
128 last_active: AtomicU64::new(0),
129 retain_connect_timestamp: AtomicU64::new(0),
130 state: Mutex::new(TunnelState::Connecting(
131 ConnectingState {
132 owner,
133 connector: ConnectorState::None
134 }))
135 }));
136 info!("{} created with state: {:?}", tunnel, tunnel::Tunnel::state(&tunnel));
137 tunnel
138 }
139
140 pub fn pre_active(&self, remote_timestamp: Timestamp) -> BuckyResult<TunnelContainer> {
141 self.0.last_active.store(bucky_time_now(), Ordering::SeqCst);
142 struct NextStep {
143 owner: TunnelContainer,
144 former_state: tunnel::TunnelState,
145 cur_state: tunnel::TunnelState
146 }
147 let next_step = {
148 let state = &mut *self.0.state.lock().unwrap();
149 match state {
150 TunnelState::Connecting(connecting) => {
151 info!("{} Connecting=>PreActive", self);
152 let owner = connecting.owner.clone();
153 let (signal_writer, signal_reader) = bounded(owner.config().tcp.package_buffer);
154 *state = TunnelState::PreActive(PreActiveState {
155 owner: owner.clone(),
156 remote_timestamp,
157 connector: match connecting.connector {
158 ConnectorState::None => ConnectorState::None,
159 ConnectorState::Connecting => ConnectorState::Connecting,
160 ConnectorState::ReverseConnecting(_) => unreachable!()
161 },
162 signal_writer,
163 signal_reader
164 });
165 Ok(NextStep {
166 owner,
167 former_state: tunnel::TunnelState::Connecting,
168 cur_state: tunnel::TunnelState::Active(remote_timestamp)})
169 },
170 TunnelState::PreActive(pre_active) => {
171 if pre_active.remote_timestamp > remote_timestamp {
172 Ok((tunnel::TunnelState::Active(remote_timestamp), tunnel::TunnelState::Active(remote_timestamp)))
173 } else if pre_active.remote_timestamp == remote_timestamp {
174 Ok((tunnel::TunnelState::Active(remote_timestamp), tunnel::TunnelState::Active(remote_timestamp)))
175 } else {
176 let former_state = tunnel::TunnelState::Active(pre_active.remote_timestamp);
177 pre_active.remote_timestamp = remote_timestamp;
178 Ok((former_state, tunnel::TunnelState::Active(remote_timestamp)))
179 }.map(|(former_state, cur_state)| NextStep {
180 owner: pre_active.owner.clone(),
181 former_state,
182 cur_state
183 })
184 },
185 TunnelState::Active(active) => {
186 if active.remote_timestamp < remote_timestamp {
187 info!("{} Active=>PreActive", self);
188 let owner = active.owner.clone();
189 let (signal_writer, signal_reader) = bounded(owner.config().tcp.package_buffer);
190 let former_state = tunnel::TunnelState::Active(active.remote_timestamp);
191 *state = TunnelState::PreActive(PreActiveState {
192 owner: owner.clone(),
193 remote_timestamp,
194 connector: ConnectorState::None,
195 signal_writer,
196 signal_reader
197 });
198 Ok(NextStep {
199 owner,
200 former_state,
201 cur_state: tunnel::TunnelState::Active(remote_timestamp)
202 })
203 } else {
204 Ok(NextStep {
205 owner: active.owner.clone(),
206 former_state: tunnel::TunnelState::Active(active.remote_timestamp),
207 cur_state: tunnel::TunnelState::Active(active.remote_timestamp)
208 })
209 }
210 },
211 TunnelState::Dead => Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel dead"))
212 }
213 }?;
214 if next_step.former_state != next_step.cur_state {
215 next_step.owner.sync_tunnel_state(
216 &DynamicTunnel::new(self.clone()),
217 next_step.former_state,
218 next_step.cur_state);
219 }
220 Ok(next_step.owner)
221 }
222
223 pub fn is_reverse(&self) -> bool {
224 tunnel::Tunnel::remote(self).addr().port() == 0
225 }
226
227 pub fn is_data_piece_full(&self) -> BuckyResult<bool> {
228 let state = &mut *self.0.state.lock().unwrap();
229 match state {
230 TunnelState::Active(active) => {
231 Ok(active.piece_writer.capacity() == active.piece_writer.len())
232 },
233 _ => Err(BuckyError::new(BuckyErrorCode::ErrorState, "not active"))
234 }
235 }
236
237 pub fn discard_data_piece(&self) -> BuckyResult<()> {
238 let (signal_writer, len) = {
239 let state = &mut *self.0.state.lock().unwrap();
240 match state {
241 TunnelState::Active(active) => Ok((active.signal_writer.clone(), active.piece_writer.len())),
242 _ => Err(BuckyError::new(BuckyErrorCode::ErrorState, "not active"))
243 }
244 }?;
245 if len > 0 {
246 info!("{} send discard command: {}", self, len);
247 let _ = signal_writer.try_send(SignalElem::Command(CommandElem::Discard(len)));
248 }
249 Ok(())
250 }
251
252 pub fn send_data_piece(&self, buf: &[u8]) -> BuckyResult<()> {
253 let state = &mut *self.0.state.lock().unwrap();
254 match state {
255 TunnelState::Active(active) => {
256 if active.piece_writer.remaining() >= buf.len() {
257 let len = active.piece_writer.push_slice(buf);
258 assert_eq!(len, buf.len());
259 Ok(())
260 } else {
261 Err(BuckyError::new(BuckyErrorCode::Pending, "full"))
262 }
263 },
264 _ => Err(BuckyError::new(BuckyErrorCode::ErrorState, "not active"))
265 }
266 }
267
268 fn active_with_interface(&self, interface: Result<(tcp::PackageInterface, Timestamp, TempSeq), BuckyError>) {
269 match interface {
270 Ok((interface, remote_timestamp, syn_seq)) => {
271 struct NextStep {
272 owner: TunnelContainer,
273 former_state: tunnel::TunnelState,
274 cur_state: tunnel::TunnelState,
275 signal_reader: Receiver<SignalElem>,
276 piece_reader: ringbuf::Consumer<u8>,
277 reverse_waiter: Option<AbortHandle>,
278 to_close: Option<tcp::PackageInterface>,
279 dead_waiter: AbortRegistration
280 }
281 self.0.last_active.store(bucky_time_now(), Ordering::SeqCst);
282 if let Some(next_step) = {
283 let state = &mut *self.0.state.lock().unwrap();
284 match state {
285 TunnelState::Connecting(connecting) => {
286 info!("{} connecting => active(remote:{}, seq:{:?})", self, remote_timestamp, syn_seq);
287 let owner = connecting.owner.clone();
288 let (signal_writer, signal_reader) = bounded(owner.config().tcp.package_buffer);
289 let ring_buf = ringbuf::RingBuffer::<u8>::new(owner.config().tcp.piece_buffer * udp::MTU);
290 let (piece_writer, piece_reader) = ring_buf.split();
291 let mut dead_waiters = StateWaiter::new();
292 let dead_waiter = dead_waiters.new_waiter();
293 *state = TunnelState::Active(ActiveState {
294 owner: owner.clone(),
295 interface: interface.clone(),
296 remote_timestamp,
297 syn_seq,
298 signal_writer,
299 piece_writer,
300 dead_waiters
301 });
302
303 Some(NextStep {
304 owner,
305 former_state: tunnel::TunnelState::Connecting,
306 cur_state: tunnel::TunnelState::Active(remote_timestamp),
307 signal_reader,
308 piece_reader,
309 reverse_waiter: None,
310 to_close: None,
311 dead_waiter
312 })
313 },
314 TunnelState::PreActive(pre_active) => {
315 info!("{} PreActive => Active(remote:{}, seq:{:?})", self, remote_timestamp, syn_seq);
317 let former_state = tunnel::TunnelState::Active(pre_active.remote_timestamp);
318 let owner = pre_active.owner.clone();
319
320 let ring_buf = ringbuf::RingBuffer::<u8>::new(owner.config().tcp.piece_buffer * udp::MTU);
321 let (piece_writer, piece_reader) = ring_buf.split();
322
323 let signal_reader = pre_active.signal_reader.clone();
324 let signal_writer = pre_active.signal_writer.clone();
325
326 let mut dead_waiters = StateWaiter::new();
327 let dead_waiter = dead_waiters.new_waiter();
328
329 let reverse_waiter = match &pre_active.connector {
330 ConnectorState::ReverseConnecting(waiter) => {
331 Some(waiter.clone())
332 },
333 _ => None
334 };
335 *state = TunnelState::Active(ActiveState {
336 owner: owner.clone(),
337 interface: interface.clone(),
338 remote_timestamp,
339 syn_seq,
340 signal_writer,
341 piece_writer,
342 dead_waiters
343 });
344 Some(NextStep {
345 owner,
346 former_state,
347 cur_state: tunnel::TunnelState::Active(remote_timestamp),
348 signal_reader,
349 piece_reader,
350 reverse_waiter,
351 to_close: None,
352 dead_waiter
353 })
354 },
355 TunnelState::Active(active) => {
356 if active.remote_timestamp < remote_timestamp
357 || active.syn_seq < syn_seq {
358 info!("{} Active(remote:{}, seq:{:?}) => Active(remote:{}, seq:{:?})", self, active.remote_timestamp, active.syn_seq, remote_timestamp, syn_seq);
359 let former_state = tunnel::TunnelState::Active(active.remote_timestamp);
360 let owner = active.owner.clone();
361
362 let ring_buf = ringbuf::RingBuffer::<u8>::new(owner.config().tcp.piece_buffer * udp::MTU);
363 let (piece_writer, piece_reader) = ring_buf.split();
364 let (signal_writer, signal_reader) = bounded(owner.config().tcp.package_buffer);
365 let to_close = Some(active.interface.clone());
366
367 let mut dead_waiters = StateWaiter::new();
368 let dead_waiter = dead_waiters.new_waiter();
369
370 *state = TunnelState::Active(ActiveState {
371 owner: owner.clone(),
372 interface: interface.clone(),
373 remote_timestamp,
374 syn_seq,
375 signal_writer,
376 piece_writer,
377 dead_waiters
378 });
379 Some(NextStep {
380 owner,
381 former_state,
382 cur_state: tunnel::TunnelState::Active(remote_timestamp),
383 signal_reader,
384 piece_reader,
385 reverse_waiter: None,
386 to_close,
387 dead_waiter
388 })
389 } else {
390 None
391 }
392 },
393 _ => None
394 }
395 } {
396 if let Some(reverse_waiter) = next_step.reverse_waiter {
397 reverse_waiter.abort();
398 }
399 self.start_recv(next_step.owner.clone(), interface, next_step.dead_waiter);
400 self.start_send(next_step.owner.clone(), next_step.signal_reader, next_step.piece_reader);
401
402 if next_step.former_state != next_step.cur_state {
403 next_step.owner.sync_tunnel_state(&DynamicTunnel::new(self.clone()), next_step.former_state, next_step.cur_state);
404 }
405
406 if let Some(to_close) = next_step.to_close {
407 info!("{} will close older {}", self, to_close);
408 to_close.close();
409 }
410 }
411 },
412 Err(err) => {
413 info!("{} dead for {}", self, err);
414 if let Some((owner, former_state)) = {
415 let state = &mut *self.0.state.lock().unwrap();
416 match state {
417 TunnelState::Connecting(connecting) => {
418 info!("{} connecting => dead", self);
419 let owner = connecting.owner.clone();
420 *state = TunnelState::Dead;
421 Some((owner, tunnel::TunnelState::Connecting))
422 },
423 TunnelState::PreActive(pre_active) => {
424 info!("{} PreActive => dead", self);
425 let former_state = tunnel::TunnelState::Active(pre_active.remote_timestamp);
426 let owner = pre_active.owner.clone();
427 *state = TunnelState::Dead;
428 Some((owner, former_state))
429 },
430 TunnelState::Active(_) => {
431 None
432 },
433 _ => {
434 None
436 }
437 }
438 } {
439 owner.sync_tunnel_state(&DynamicTunnel::new(self.clone()), former_state, tunnel::TunnelState::Dead);
440 }
441 }
442 }
443 }
444
445 pub(super) fn connect(&self) -> Result<(), BuckyError> {
446 if !self.is_reverse() {
447 info!("{} connect", self);
448 let owner = {
449 let state = &mut *self.0.state.lock().unwrap();
450 match state {
451 TunnelState::Connecting(connecting) => {
453 match connecting.connector {
454 ConnectorState::None => {
455 connecting.connector = ConnectorState::Connecting;
456 Ok(connecting.owner.clone())
457 },
458 _ => {
459 Err(BuckyError::new(BuckyErrorCode::AlreadyExists, "connector exists"))
460 }
461 }
462 },
463 TunnelState::PreActive(pre_active) => {
464 match pre_active.connector {
465 ConnectorState::None => {
466 pre_active.connector = ConnectorState::Connecting;
467 Ok(pre_active.owner.clone())
468 },
469 _ => {
470 Err(BuckyError::new(BuckyErrorCode::AlreadyExists, "connector exists"))
471 }
472 }
473 },
474 TunnelState::Active(_) => {
475 Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel already active"))
476 },
477 TunnelState::Dead => {
478 Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel already dead"))
479 }
480 }
481 }.map_err(|err| {debug!("{} connect failed for {}", self, err); err})?;
482
483 let tunnel = self.clone();
484 task::spawn(async move {
485 tunnel.active_with_interface(tunnel.connect_inner(owner.clone(), None).await);
486 });
487
488 } else {
489 info!("{} reverse connect", self);
490 let (owner, reg) = {
491 let state = &mut *self.0.state.lock().unwrap();
492 match state {
493 TunnelState::Connecting(_) => {
494 unreachable!()
495 },
496 TunnelState::PreActive(pre_active) => {
497 match pre_active.connector {
498 ConnectorState::None => {
499 let (waiter, reg) = AbortHandle::new_pair();
500 pre_active.connector = ConnectorState::ReverseConnecting(waiter);
501 Ok((pre_active.owner.clone(), reg))
502 },
503 _ => {
504 debug!("{} ignore reverse connect for reverse connecting", self);
505 Err(BuckyError::new(BuckyErrorCode::AlreadyExists, "connector exists"))
506 }
507 }
508 },
509 TunnelState::Active(_) => {
510 Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel already active"))
511 },
512 TunnelState::Dead => {
513 Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel already dead"))
514 }
515 }
516 }.map_err(|err| {debug!("{} connect failed for {}", self, err); err})?;
517
518 let tunnel = self.clone();
519 task::spawn(async move {
520 match tunnel.reverse_connect_inner(owner, reg).await {
521 Ok(_) => {
522 },
524 Err(err) => {
525 info!("{} reverse connect failed for {}", tunnel, err);
526 tunnel.active_with_interface(Err(err));
527 }
528 };
529 });
530 }
531 Ok(())
532 }
533
534 pub(crate) fn connect_with_interface(&self, interface: tcp::Interface) -> Result<(), BuckyError> {
535 info!("{} connect_with_interface", self);
536 let owner = {
537 let state = &mut *self.0.state.lock().unwrap();
538 match state {
539 TunnelState::Connecting(connecting) => {
540 match connecting.connector {
541 ConnectorState::None => {
542 connecting.connector = ConnectorState::Connecting;
543 Ok(connecting.owner.clone())
544 },
545 _ => {
546 Err(BuckyError::new(BuckyErrorCode::AlreadyExists, "connector exists"))
547 }
548 }
549 },
550 TunnelState::PreActive(_) => {
551 Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel already active"))
552 },
553 TunnelState::Active(_) => {
554 Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel already active"))
555 },
556 TunnelState::Dead => {
557 Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel already dead"))
558 }
559 }
560 }.map_err(|err| {debug!("{} connect failed for {}", self, err); err})?;
561
562 let tunnel = self.clone();
563 task::spawn(async move {
564
565 tunnel.active_with_interface(tunnel.connect_inner(owner.clone(), Some(interface)).await);
566 });
567 Ok(())
568 }
569
570 async fn connect_inner(&self, owner: TunnelContainer, interface: Option<tcp::Interface>) -> Result<(tcp::PackageInterface, Timestamp, TempSeq), BuckyError> {
571 info!("{} connect interface", self);
572 let stack = owner.stack();
573 let key_stub = stack.keystore().create_key(owner.remote_const(), true);
574 let interface = if let Some(interface) = interface {
575 Ok(interface)
576 } else {
577 tcp::Interface::connect(
578 *tunnel::Tunnel::remote(self),
580 owner.remote().clone(),
581 owner.remote_const().clone(),
582 key_stub.key,
583 owner.config().tcp.connect_timeout).await
584 }?;
585 let syn_seq = owner.generate_sequence();
586 let syn_tunnel = SynTunnel {
587 protocol_version: owner.protocol_version(),
588 stack_version: owner.stack_version(),
589 to_device_id: owner.remote().clone(),
590 sequence: syn_seq.clone(),
591 from_device_desc: stack.sn_client().ping().default_local(),
592 send_time: bucky_time_now()
593 };
594 let resp_box = interface.confirm_connect(&stack, vec![DynamicPackage::from(syn_tunnel)], owner.config().tcp.confirm_timeout).await?;
595
596 if resp_box.packages().len() != 1 {
597 Err(BuckyError::new(BuckyErrorCode::InvalidData, "should response AckTunnel"))
598 } else if resp_box.packages()[0].cmd_code() != PackageCmdCode::AckTunnel {
599 Err(BuckyError::new(BuckyErrorCode::InvalidData, "should response AckTunnel"))
600 } else {
601 let ack_tunnel: &AckTunnel = resp_box.packages()[0].as_ref();
602 let _ = owner.on_package(ack_tunnel, None);
603 if ack_tunnel.result == ACK_TUNNEL_RESULT_OK {
604 let remote_timestamp = ack_tunnel.to_device_desc.body().as_ref().unwrap().update_time();
605 Ok((interface.into(), remote_timestamp, syn_seq))
606 } else if ack_tunnel.result == ACK_TUNNEL_RESULT_REFUSED {
607 Err(BuckyError::new(BuckyErrorCode::InvalidData, "refused"))
608 } else {
609 Err(BuckyError::new(BuckyErrorCode::InvalidData, "should response AckTunnel"))
610 }
611 }
612 }
613
614 async fn reverse_connect_inner(&self, owner: TunnelContainer, reg: AbortRegistration) -> Result<(), BuckyError> {
615 let stack = owner.stack();
616 let remote = stack.device_cache().get_inner(owner.remote()).ok_or_else(| | BuckyError::new(BuckyErrorCode::NotFound, "device not cached"))?;
617 let sn_id = remote.connect_info().sn_list().get(0).ok_or_else(| | BuckyError::new(BuckyErrorCode::NotFound, "device no sn"))?;
618
619 let key_stub = stack.keystore().create_key(owner.remote_const(), true);
620 let mut syn_box = PackageBox::encrypt_box(owner.remote().clone(), key_stub.key.clone());
621 let syn_tunnel = SynTunnel {
622 protocol_version: owner.protocol_version(),
623 stack_version: owner.stack_version(),
624 to_device_id: owner.remote().clone(),
625 sequence: owner.generate_sequence(),
626 from_device_desc: stack.sn_client().ping().default_local(),
627 send_time: bucky_time_now()
628 };
629 if let keystore::EncryptedKey::Unconfirmed(encrypted) = key_stub.encrypted {
630 let mut exchg = Exchange::from((&syn_tunnel, encrypted, key_stub.key.mix_key));
631 exchg.sign(stack.keystore().signer()).await?;
632 syn_box.push(exchg);
633 }
634 syn_box.push(syn_tunnel);
635
636 let listener = stack.net_manager().listener();
637 let mut endpoints = vec![];
638 for t in listener.tcp() {
639 let outer = t.outer();
640 if outer.is_some() {
641 let outer = outer.unwrap();
642 if outer.eq(tunnel::Tunnel::local(self))
643 || t.local().eq(tunnel::Tunnel::local(self)) {
644 endpoints.push(outer);
645 }
646 } else {
647 endpoints.push(*tunnel::Tunnel::local(self));
648 }
649 }
650
651 let call_session = stack.sn_client().call().call(
652 Some(&endpoints),
653 owner.remote(),
654 &vec![sn_id.clone()],
655 |sn_call| {
656 let mut context = udp::PackageBoxEncodeContext::from(sn_call);
657 let mut buf = vec![0u8; interface::udp::MTU_LARGE];
658 let enc_len = syn_box.raw_tail_encode_with_context(&mut buf, &mut context, &None).unwrap().len();
659 buf.truncate(enc_len);
660 buf
661 }).await?;
662
663 let waiter = Abortable::new(call_session.next(), reg);
664 let _ = future::timeout(owner.config().connect_timeout, waiter).await?;
665 Ok(())
666 }
667
668 fn on_interface_error(&self, from: &PackageInterface, err: &BuckyError) {
669 error!("{} interface error {} from {}", self, err, from);
670
671 let notify = {
672 let state = &mut *self.0.state.lock().unwrap();
673 match state {
674 TunnelState::Active(active) => {
675 let owner = active.owner.clone();
676 if active.interface.ptr_eq(from) {
677 info!("{} Active({})=>Dead for interface error", self, active.remote_timestamp);
678 let former_state = tunnel::TunnelState::Active(active.remote_timestamp);
679 let mut dead_waiters = StateWaiter::new();
680 std::mem::swap(&mut dead_waiters, &mut active.dead_waiters);
681 *state = TunnelState::Dead;
682 Some((owner, former_state, Some(dead_waiters)))
683 } else {
684 None
685 }
686 },
687 _ => None
688 }
689 };
690
691 if let Some((owner, former_state, dead_waiters)) = notify {
692 if let Some(dead_waiters) = dead_waiters {
693 dead_waiters.wake();
694 }
695 owner.sync_tunnel_state(&DynamicTunnel::new(self.clone()), former_state, tunnel::TunnelState::Dead);
696 }
697
698 }
699
700 fn start_send(
701 &self,
702 owner: TunnelContainer,
703 signal_reader: Receiver<SignalElem>,
704 piece_reader: ringbuf::Consumer<u8>,
705 ) {
706 let tunnel = self.clone();
707 task::spawn(async move {
708 let stub = {
709 match &*tunnel.0.state.lock().unwrap() {
710 TunnelState::Active(active) => {
711 Ok(active.interface.clone())
712 },
713 _ => {
714 Err(BuckyError::new(BuckyErrorCode::ErrorState, "break send loop for invalid state"))
715 }
716 }
717 };
718
719 if stub.is_err() {
720 return ;
721 }
722 let interface = stub.unwrap();
723 let mut piece_reader = piece_reader;
724
725 info!("{} send loop start, {}", tunnel, owner.config().tcp.piece_interval.as_millis());
726 loop {
727 let mut send_buf = [0u8; udp::MTU_LARGE];
728 let mut piece_buf = [0u8; udp::MTU];
729
730 fn handle_command(
731 piece_reader: &mut ringbuf::Consumer<u8>,
732 command: CommandElem
733 ) -> BuckyResult<()> {
734 match command {
735 CommandElem::Discard(len) => piece_reader.discard(len),
736 };
737 Ok(())
738 }
739
740 async fn handle_package(
741 interface: &PackageInterface,
742 send_buf: &mut [u8],
743 pkg: PackageElem) -> BuckyResult<()> {
744 match pkg {
745 PackageElem::Package(package) => interface.send_package(send_buf, package, false).await,
746 PackageElem::RawData(data) => interface.send_raw_data(data).await
747 }
748 }
749
750 async fn handle_signal(
751 interface: &PackageInterface,
752 piece_reader: &mut ringbuf::Consumer<u8>,
753 send_buf: &mut [u8],
754 signal: SignalElem
755 ) -> BuckyResult<()> {
756 match signal {
757 SignalElem::Package(package) => handle_package(interface, send_buf, package).await,
758 SignalElem::Command(command) => handle_command(piece_reader, command)
759 }
760 }
761
762
763 async fn handle_piece(
764 interface: &PackageInterface,
765 send_buf: &mut [u8],
766 piece_reader: &mut ringbuf::Consumer<u8>,
767 signal_reader: &Receiver<SignalElem>) -> BuckyResult<()> {
768 loop {
769 let len = piece_reader.pop_slice(send_buf);
770 if len > 0 {
771 assert_eq!(len, send_buf.len());
772 let (box_len, _) = u16::raw_decode(send_buf).unwrap();
773 match interface.send_raw_buffer(&mut send_buf[..u16::raw_bytes().unwrap() + box_len as usize]).await {
774 Ok(_) => {
775 },
777 Err(err) => {
778 break Err(err);
779 }
780 }
781 } else {
782 break Ok(());
783 }
784 if signal_reader.len() > 0 {
785 break Ok(());
787 }
788 }
789 }
790
791 match future::timeout(owner.config().tcp.piece_interval, signal_reader.recv()).await {
792 Ok(recv) => {
793 match recv {
794 Ok(signal) => {
795 match handle_signal(&interface, &mut piece_reader, &mut send_buf, signal).await {
796 Ok(_) => {
797 if signal_reader.len() == 0 {
798 match handle_piece(&interface, &mut piece_buf, &mut piece_reader, &signal_reader).await {
799 Ok(_) => {
800 },
802 Err(err) => {
803 tunnel.on_interface_error(&interface, &err);
804 info!("{} send loop break for err {}", tunnel, err);
805 break;
806 }
807 }
808 }
809 },
810 Err(err) => {
811 tunnel.on_interface_error(&interface, &err);
812 info!("{} send loop break for err {}", tunnel, err);
813 break;
814 }
815 }
816 },
817 Err(err) => {
818 info!("{} send loop break for err {}", tunnel, err);
819 break;
820 }
821 }
822 }
823 Err(_err) => {
824 match handle_piece(&interface, &mut piece_buf, &mut piece_reader, &signal_reader).await {
825 Ok(_) => {
826 },
828 Err(err) => {
829 tunnel.on_interface_error(&interface, &err);
830 info!("{} send loop break for err {}", tunnel, err);
831 break;
832 }
833 }
834 }
835 }
836 }
837 });
838 }
839
840 async fn recv_inner(
841 tunnel: Self,
842 owner: TunnelContainer,
843 interface: tcp::PackageInterface) {
844 let mut recv_buf = [0u8; udp::MTU_LARGE];
846 loop {
847 match interface.receive_package(&mut recv_buf).await {
849 Ok(recv_box) => {
850 tunnel.0.last_active.store(bucky_time_now(), Ordering::SeqCst);
851 match recv_box {
852 RecvBox::Package(package_box) => {
853 let stack = owner.stack();
854 if package_box.has_exchange() {
855 stack.keystore().add_key(package_box.key(), package_box.remote());
857 }
858 if let Err(err) = package_box.packages().iter().try_for_each(|pkg| {
859 if pkg.cmd_code() == PackageCmdCode::PingTunnel {
860 tunnel.on_package(AsRef::<PingTunnel>::as_ref(pkg), None).map(|_| ())
861 } else if pkg.cmd_code() == PackageCmdCode::PingTunnelResp {
862 tunnel.on_package(AsRef::<PingTunnelResp>::as_ref(pkg), None).map(|_| ())
863 } else {
864 downcast_session_handle!(pkg, |pkg| owner.on_package(pkg, None)).map(|_| ())
865 }
866 }) {
867 warn!("{} package error {}", tunnel, err);
868 }
869 },
870 RecvBox::RawData(raw_data) => {
871 let _ = owner.on_raw_data(raw_data, DynamicTunnel::new(tunnel.clone()));
872 }
873 }
874 },
875 Err(err) => {
876 tunnel.on_interface_error(&interface, &err);
877 break;
878 }
879 }
880 }
881 }
882
883 fn start_recv(
884 &self,
885 owner: TunnelContainer,
886 interface: tcp::PackageInterface,
887 dead_waiter: AbortRegistration) {
888 let (cancel, reg) = AbortHandle::new_pair();
889 task::spawn(Abortable::new(Self::recv_inner(self.clone(), owner, interface), reg));
890 let tunnel = self.clone();
891 task::spawn(async move {
892 let _ = StateWaiter::wait(dead_waiter, || ()).await;
893 error!("{} break recv loop for tunnel dead", tunnel);
894 cancel.abort();
895 });
896 }
897
898 async fn retain_connect(
899 &self,
900 retain_connect_timestamp: Timestamp,
901 ping_interval: Duration,
902 ping_timeout: Duration) {
903 if self.0.retain_connect_timestamp.load(Ordering::SeqCst) != retain_connect_timestamp {
904 debug!("ignore retain connect for timestamp missmatch, tunnel:{}", self);
905 return ;
906 }
907 if self.0.keeper_count.load(Ordering::SeqCst) == 0 {
908 debug!("ignore retain connect for zero retain count, tunnel:{}", self);
909 return ;
910 }
911
912 if !self.is_reverse() {
913 info!("begin retain connect, tunnel:{}", self);
914 let _ = self.connect();
915 }
916
917 let tunnel = self.clone();
918 task::spawn(async move {
919 loop {
920 if tunnel.0.keeper_count.load(Ordering::SeqCst) == 0 {
921 info!("break ping loop for release keeper, tunnel:{}", tunnel);
922 break;
923 }
924 match {
925 let state = &*tunnel.0.state.lock().unwrap();
926 match state {
927 TunnelState::Active(active) => {
928 Ok(Some(active.owner.clone()))
929 },
930 TunnelState::Dead => {
931 Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel's dead"))
932 },
933 _ => {
934 Ok(None)
935 }
936 }
937 } {
938 Ok(owner) => {
939 if owner.is_some() {
940 let now = bucky_time_now();
941 let miss_active_time = Duration::from_micros(now - tunnel.0.last_active.load(Ordering::SeqCst));
942 if miss_active_time > ping_timeout {
943 if let Some((owner, cur_state, dead_waiters)) = {
944 let state = &mut *tunnel.0.state.lock().unwrap();
945 if let TunnelState::Active(active) = state {
946 error!("dead for ping timeout, tunnel:{}", tunnel);
947 let cur_state = tunnel::TunnelState::Active(active.remote_timestamp);
948 let owner = active.owner.clone();
949 let mut dead_waiters = StateWaiter::new();
950 std::mem::swap(&mut dead_waiters, &mut active.dead_waiters);
951 *state = TunnelState::Dead;
952 Some((owner, cur_state, dead_waiters))
953 } else {
954 None
955 }
956 } {
957 dead_waiters.wake();
958 owner.sync_tunnel_state(&tunnel::DynamicTunnel::new(tunnel.clone()), cur_state, tunnel::Tunnel::state(&tunnel));
959 }
960 break;
961 }
962 if miss_active_time > ping_interval {
963 if tunnel.0.keeper_count.load(Ordering::SeqCst) > 0 {
964 info!("send ping, tunnel:{}", tunnel);
965 let ping = PingTunnel {
966 package_id: 0,
967 send_time: now,
968 recv_data: 0,
969 };
970 let _ = tunnel::Tunnel::send_package(&tunnel, DynamicPackage::from(ping));
971 }
972 }
973 }
974 let _ = future::timeout(ping_interval, future::pending::<()>()).await;
975 },
976 Err(e) => {
977 error!("break ping loop, tunnel:{}, err:{}", tunnel, e);
978 break;
979 }
980 }
981 };
982 });
983 }
984}
985
986#[async_trait]
987impl tunnel::Tunnel for Tunnel {
988 fn mtu(&self) -> usize {
989 self.0.mtu
990 }
991
992 fn as_any(&self) -> &dyn std::any::Any {
993 self
994 }
995
996 fn local(&self) -> &Endpoint {
997 self.0.local_remote.local()
998 }
999
1000 fn remote(&self) -> &Endpoint {
1001 self.0.local_remote.remote()
1002 }
1003
1004 fn proxy(&self) -> ProxyType {
1005 ProxyType::None
1006 }
1007
1008 fn state(&self) -> tunnel::TunnelState {
1009 match &*self.0.state.lock().unwrap() {
1010 TunnelState::Connecting(_) => {
1011 tunnel::TunnelState::Connecting
1012 },
1013 TunnelState::PreActive(pre_active) => {
1014 tunnel::TunnelState::Active(pre_active.remote_timestamp)
1015 },
1016 TunnelState::Active(active) => {
1017 tunnel::TunnelState::Active(active.remote_timestamp)
1018 },
1019 TunnelState::Dead => {
1020 tunnel::TunnelState::Dead
1021 }
1022 }
1023 }
1024
1025 fn send_package(&self, package: DynamicPackage) -> Result<usize, BuckyError> {
1026 if package.cmd_code() == PackageCmdCode::SessionData {
1027 return Err(BuckyError::new(BuckyErrorCode::UnSupport, "session data should not send from tcp tunnel"));
1028 }
1029 let (signal_writer, to_connect) = {
1030 match &*self.0.state.lock().unwrap() {
1031 TunnelState::PreActive(pre_active) => {
1032 Ok((pre_active.signal_writer.clone(), true))
1033 },
1034 TunnelState::Active(active) => {
1035 Ok((active.signal_writer.clone(), false))
1036 },
1037 _ => {
1038 Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel not active"))
1039 }
1040 }
1041 }?;
1042 let _ = signal_writer.try_send(SignalElem::Package(PackageElem::Package(package)));
1043 if to_connect {
1044 let _ = self.connect();
1045 }
1046
1047 Ok(0)
1048 }
1049
1050 fn raw_data_header_len(&self) -> usize {
1051 tcp::PackageInterface::raw_header_data_len()
1052 }
1053
1054 fn send_raw_data(&self, data: &mut [u8]) -> Result<usize, BuckyError> {
1055 let (signal_writer, to_connect) = {
1056 match &*self.0.state.lock().unwrap() {
1057 TunnelState::PreActive(pre_active) => {
1058 Ok((pre_active.signal_writer.clone(), true))
1059 },
1060 TunnelState::Active(active) => {
1061 Ok((active.signal_writer.clone(), false))
1062 },
1063 _ => {
1064 Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel not active"))
1065 }
1066 }
1067 }?;
1068 let len = data.len();
1069 let _ = signal_writer.try_send(SignalElem::Package(PackageElem::RawData(Vec::from(data))));
1070 if to_connect {
1071 let _ = self.connect();
1072 }
1073 Ok(len)
1074 }
1075
1076 fn ptr_eq(&self, other: &tunnel::DynamicTunnel) -> bool {
1077 *self.local() == *other.as_ref().local()
1078 && *self.remote() == *other.as_ref().remote()
1079 && Arc::ptr_eq(&self.0, &other.clone_as_tunnel::<Tunnel>().0)
1080 }
1081
1082 fn retain_keeper(&self) {
1083 info!("{} retain keeper", self);
1084 if 0 != self.0.keeper_count.fetch_add(1, Ordering::SeqCst) {
1085 return ;
1086 }
1087 let owner = {
1088 let state = &mut *self.0.state.lock().unwrap();
1089 match state {
1090 TunnelState::Connecting(_) => None,
1091 TunnelState::PreActive(pre_active) => Some(pre_active.owner.clone()),
1092 TunnelState::Active(_) => None,
1093 TunnelState::Dead => None
1094 }
1095 };
1096 if owner.is_none() {
1097 return ;
1098 }
1099
1100 let owner = owner.unwrap();
1101 let retain_connect_timestamp = bucky_time_now();
1102 self.0.retain_connect_timestamp.store(retain_connect_timestamp, Ordering::SeqCst);
1103 let tunnel = self.clone();
1104 task::spawn(async move {
1105 let _ = future::timeout(owner.config().tcp.retain_connect_delay, future::pending::<()>()).await;
1106 tunnel.retain_connect(retain_connect_timestamp, owner.config().tcp.ping_interval, owner.config().tcp.ping_timeout).await;
1107 });
1108 }
1109
1110 fn release_keeper(&self) {
1111 info!("{} release keeper", self);
1112 self.0.keeper_count.fetch_add(-1, Ordering::SeqCst);
1113 }
1114
1115 fn reset(&self) {
1116 info!("{} reset to Dead", self);
1117 if let Some(dead_waiters) = {
1118 let state = &mut *self.0.state.lock().unwrap();
1119 match state {
1120 TunnelState::Active(active) => {
1121 let mut dead_waiters = StateWaiter::new();
1122 std::mem::swap(&mut dead_waiters, &mut active.dead_waiters);
1123 *state = TunnelState::Dead;
1124 Some(dead_waiters)
1125 },
1126 _ => None
1127 }
1128 } {
1129 dead_waiters.wake();
1130 }
1131 }
1132
1133 fn mark_dead(&self, former_state: tunnel::TunnelState) {
1134 let notify = match &former_state {
1135 tunnel::TunnelState::Connecting => {
1136 let state = &mut *self.0.state.lock().unwrap();
1137 match state {
1138 TunnelState::Connecting(connecting) => {
1139 info!("{} Connecting=>Dead", self);
1140 let owner = connecting.owner.clone();
1141 *state = TunnelState::Dead;
1142 Some((owner, tunnel::TunnelState::Dead, None))
1143 },
1144 _ => {
1145 None
1146 }
1147 }
1148 },
1149 tunnel::TunnelState::Active(remote_timestamp) => {
1150 let remote_timestamp = *remote_timestamp;
1151 let state = &mut *self.0.state.lock().unwrap();
1152 match state {
1153 TunnelState::Active(active) => {
1154 let owner = active.owner.clone();
1155 if active.remote_timestamp == remote_timestamp {
1156 info!("{} Active({})=>Dead for active by {}", self, active.remote_timestamp, remote_timestamp);
1157 let mut dead_waiters = StateWaiter::new();
1158 std::mem::swap(&mut dead_waiters, &mut active.dead_waiters);
1159 *state = TunnelState::Dead;
1160 Some((owner, tunnel::TunnelState::Dead, Some(dead_waiters)))
1161 } else {
1162 None
1163 }
1164 },
1165 _ => {
1166 None
1167 }
1168 }
1169 },
1170 tunnel::TunnelState::Dead => None
1171 };
1172
1173 if let Some((owner, new_state, dead_waiters)) = notify {
1174 if let Some(dead_waiters) = dead_waiters {
1175 dead_waiters.wake();
1176 }
1177 owner.sync_tunnel_state(&DynamicTunnel::new(self.clone()), former_state, new_state);
1178 }
1179 }
1180}
1181
1182impl OnPackage<PingTunnel> for Tunnel {
1183 fn on_package(&self, ping: &PingTunnel, _context: Option<()>) -> Result<OnPackageResult, BuckyError> {
1184 let ping_resp = PingTunnelResp {
1185 ack_package_id: ping.package_id,
1186 send_time: bucky_time_now(),
1187 recv_data: 0,
1188 };
1189 let _ = tunnel::Tunnel::send_package(self, DynamicPackage::from(ping_resp));
1190 Ok(OnPackageResult::Handled)
1191 }
1192}
1193
1194impl OnPackage<PingTunnelResp> for Tunnel {
1195 fn on_package(&self, _pkg: &PingTunnelResp, _context: Option<()>) -> Result<OnPackageResult, BuckyError> {
1196 Ok(OnPackageResult::Handled)
1198 }
1199}
1200
1201impl OnTcpInterface for Tunnel {
1202 fn on_tcp_interface(&self, interface: tcp::AcceptInterface, first_box: PackageBox) -> Result<OnPackageResult, BuckyError> {
1203 assert_eq!(self.is_reverse(), true);
1204 assert_eq!(first_box.packages_no_exchange().len(), 1);
1205 let first_package = &first_box.packages_no_exchange()[0];
1206 if first_package.cmd_code() == PackageCmdCode::SynTunnel {
1207 let syn_tunnel: &SynTunnel = first_package.as_ref();
1208 let remote_timestamp = syn_tunnel.from_device_desc.body().as_ref().unwrap().update_time();
1209 let (owner, ret) = {
1210 let state = &mut *self.0.state.lock().unwrap();
1211 match state {
1212 TunnelState::Connecting(connecting) => {
1213 info!("{} accept interface {} in connecting", self, interface);
1214 (Some(connecting.owner.clone()), ACK_TUNNEL_RESULT_OK)
1215 },
1216 TunnelState::PreActive(pre_active) => {
1217 info!("{} accept interface {} in PreActive", self, interface);
1218 (Some(pre_active.owner.clone()), ACK_TUNNEL_RESULT_OK)
1219 },
1220 TunnelState::Active(active) => {
1221 if active.remote_timestamp < remote_timestamp {
1222 info!("{} accept interface {} for active remote timestamp update from {} to {}", self, interface, active.remote_timestamp, remote_timestamp);
1223 (Some(active.owner.clone()), ACK_TUNNEL_RESULT_OK)
1224 } else if active.syn_seq < syn_tunnel.sequence {
1225 info!("{} accept interface {} for active sequence update from {:?} to {:?}", self, interface, active.syn_seq, syn_tunnel.sequence);
1226 (Some(active.owner.clone()), ACK_TUNNEL_RESULT_OK)
1227 } else {
1228 info!("{} refuse interface {} for already active", self, interface);
1229 (Some(active.owner.clone()), ACK_TUNNEL_RESULT_REFUSED)
1230 }
1231 },
1232 TunnelState::Dead => {
1233 info!("{} refuse interface {} for dead", self, interface);
1234 (None, ACK_TUNNEL_RESULT_REFUSED)
1235 }
1236 }
1237 };
1238 if let Some(owner) = owner {
1239 owner.on_package(syn_tunnel, None)?;
1240 let ack_tunnel = AckTunnel {
1241 protocol_version: owner.protocol_version(),
1242 stack_version: owner.stack_version(),
1243 sequence: syn_tunnel.sequence,
1244 result: ret,
1245 send_time: bucky_time_now(),
1246 mtu: udp::MTU as u16,
1247 to_device_desc: owner.stack().sn_client().ping().default_local()
1248 };
1249 let tunnel = self.clone();
1250 task::spawn(async move {
1251 let syn_seq = ack_tunnel.sequence;
1252 let confirm_ret = interface.confirm_accept(vec![DynamicPackage::from(ack_tunnel)]).await;
1253 if ret == ACK_TUNNEL_RESULT_OK {
1254 tunnel.active_with_interface(confirm_ret.map(|_| (interface.into(), remote_timestamp, syn_seq)));
1255 } else {
1256 }
1258 });
1259 }
1260 Ok(OnPackageResult::Handled)
1261 } else if first_package.cmd_code() == PackageCmdCode::TcpSynConnection {
1262 let syn_stream: &TcpSynConnection = first_package.as_ref();
1263 let owner = self.pre_active(syn_stream.from_device_desc.body().as_ref().unwrap().update_time())?;
1264 owner.on_package(syn_stream, interface)
1265 } else if first_package.cmd_code() == PackageCmdCode::TcpAckConnection {
1266 let ack_stream: &TcpAckConnection = first_package.as_ref();
1267 let owner = self.pre_active(ack_stream.to_device_desc.body().as_ref().unwrap().update_time())?;
1268 owner.on_package(ack_stream, interface)
1269 } else {
1270 unreachable!()
1271 }
1272 }
1273}
1274
1275