cyfs_bdt/tunnel/
tcp.rs

1use log::*;
2use std::{
3    sync::atomic::{AtomicI32, AtomicU64, Ordering},  
4    time::Duration
5};
6use cyfs_debug::Mutex;
7use async_std::{
8    sync::{Arc}, 
9    channel::{bounded, Sender, Receiver}, 
10    task, 
11    future
12};
13use futures::future::{Abortable, AbortHandle, AbortRegistration};
14use async_trait::{async_trait};
15use ringbuf;
16use cyfs_base::*;
17use crate::{
18    types::*,
19    protocol::{self, *, v0::*},
20    history::keystore, 
21    MTU,
22    interface::{self, *, tcp::{OnTcpInterface, RecvBox, PackageInterface}}
23};
24use super::{tunnel::{self, DynamicTunnel, TunnelOwner, ProxyType}, TunnelContainer};
25
/// Tunable parameters for TCP tunnels.
#[derive(Clone)]
pub struct Config {
    pub connect_timeout: Duration, 
    pub confirm_timeout: Duration, 
    pub accept_timeout: Duration,
    // How long after retain_keeper is called before trying to move
    // from PreActive into Active by actually connecting.
    pub retain_connect_delay: Duration, 
    pub ping_interval: Duration, 
    pub ping_timeout: Duration,

    // Capacity of the bounded signal (package/command) channel.
    pub package_buffer: usize, 
    // Piece ring-buffer size, in units of udp::MTU.
    pub piece_buffer: usize,
    // Interval at which the outgoing piece buffer is checked by the send loop.
    pub piece_interval: Duration,
}
41
/// Internal state machine of a TCP tunnel.
enum TunnelState {
    Connecting(ConnectingState),
    // History says the peer is reachable, but no Interface has been
    // created yet. Sending a package through the tunnel in this state
    // triggers a transition into Connecting to establish one.
    PreActive(PreActiveState), 
    Active(ActiveState), 
    Dead, 
} 
50
51
/// Progress of the connection attempt owned by this tunnel.
enum ConnectorState {
    None, 
    Connecting, 
    // Waiting for the remote to dial back; the AbortHandle cancels the
    // reverse-connect waiter once an interface becomes active.
    ReverseConnecting(AbortHandle)
}
58
// State payload while the tunnel is establishing its first interface.
struct ConnectingState {
    owner: TunnelContainer, 
    connector: ConnectorState 
}
63
64
65
// One outgoing unit on the signal channel: either a protocol package
// or an already-encoded raw buffer.
enum PackageElem {
    Package(DynamicPackage), 
    RawData(Vec<u8>),  
}
70
// Control commands for the send loop; Discard(n) asks it to drop n
// bytes of queued piece data.
enum CommandElem {
    Discard(usize)
}
74
// What flows over the bounded signal channel to the send loop.
enum SignalElem {
    Package(PackageElem), 
    Command(CommandElem)
}
79
80
// Reachability is known from history but no interface exists yet; the
// signal channel is created early so packages queued here are drained
// once the tunnel goes Active.
struct PreActiveState {
    owner: TunnelContainer, 
    connector: ConnectorState, 
    // Update time of the remote device body this knowledge came from.
    remote_timestamp: Timestamp, 
    signal_writer: Sender<SignalElem>,
    signal_reader: Receiver<SignalElem>, 
}
88
// A live package interface plus the outgoing queues that feed it.
struct ActiveState {
    owner: TunnelContainer, 
    interface: tcp::PackageInterface, 
    // Update time of the remote device body behind this interface.
    remote_timestamp: Timestamp, 
    // Sequence of the syn that established this interface; a newer
    // seq/timestamp can replace the interface (see active_with_interface).
    syn_seq: TempSeq, 
    signal_writer: Sender<SignalElem>,
    // Producer half of the raw-piece ring buffer read by the send loop.
    piece_writer: ringbuf::Producer<u8>, 
    // Woken when the tunnel dies so blocked waiters can unblock.
    dead_waiters: StateWaiter
}
98
// Shared innards of a TCP tunnel, held behind the Arc in `Tunnel`.
struct TunnelImpl {
    remote_device_id: DeviceId, 
    local_remote: EndpointPair, 
    // presumably the number of outstanding keepers holding this tunnel
    // open (see retain_connect_delay) — confirm against keeper callers
    keeper_count: AtomicI32, 
    // bucky_time of the most recent activity on this tunnel.
    last_active: AtomicU64, 
    retain_connect_timestamp: AtomicU64, 
    state: Mutex<TunnelState>,
    mtu: usize,
}
108
/// Cheaply cloneable handle to the shared TCP tunnel state.
#[derive(Clone)]
pub struct Tunnel(Arc<TunnelImpl>);
111
impl std::fmt::Display for Tunnel {
    // Identify the tunnel by remote device id and its endpoint pair.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "TcpTunnel{{remote_device:{}, local:{}, remote:{}}}", self.0.remote_device_id, tunnel::Tunnel::local(self), tunnel::Tunnel::remote(self))
    }
}
117
118impl Tunnel {
119    pub fn new(
120        owner: TunnelContainer, 
121        ep_pair: EndpointPair) -> Self {
122        let remote_device_id = owner.remote().clone();
123        let tunnel = Self(Arc::new(TunnelImpl {
124            mtu: MTU-12, 
125            remote_device_id, 
126            local_remote: ep_pair, 
127            keeper_count: AtomicI32::new(0), 
128            last_active: AtomicU64::new(0), 
129            retain_connect_timestamp: AtomicU64::new(0), 
130            state: Mutex::new(TunnelState::Connecting(
131                ConnectingState {
132                    owner, 
133                    connector: ConnectorState::None
134                }))
135        }));
136        info!("{} created with state: {:?}", tunnel, tunnel::Tunnel::state(&tunnel));
137        tunnel
138    }
139
    /// Move the tunnel towards Active based on remote knowledge alone
    /// (`remote_timestamp` is the remote device body's update time),
    /// without creating an interface yet.
    ///
    /// Returns the owning `TunnelContainer`, or `ErrorState` if the
    /// tunnel is already Dead. When the externally visible state
    /// changes, the owner is notified via `sync_tunnel_state` after the
    /// state lock is released.
    pub fn pre_active(&self, remote_timestamp: Timestamp) -> BuckyResult<TunnelContainer> {
        self.0.last_active.store(bucky_time_now(), Ordering::SeqCst);
        // Collected under the lock; the owner callback fires afterwards.
        struct NextStep {
            owner: TunnelContainer, 
            former_state: tunnel::TunnelState, 
            cur_state: tunnel::TunnelState
        }
        let next_step = {
            let state = &mut *self.0.state.lock().unwrap();
            match state {
                TunnelState::Connecting(connecting) => {
                    info!("{} Connecting=>PreActive", self);
                    let owner = connecting.owner.clone();
                    let (signal_writer, signal_reader) = bounded(owner.config().tcp.package_buffer);
                    *state = TunnelState::PreActive(PreActiveState {
                        owner: owner.clone(), 
                        remote_timestamp, 
                        // Carry over an in-flight forward connector; a
                        // reverse connector cannot exist while Connecting.
                        connector: match connecting.connector {
                            ConnectorState::None => ConnectorState::None, 
                            ConnectorState::Connecting => ConnectorState::Connecting, 
                            ConnectorState::ReverseConnecting(_) => unreachable!()
                        },
                        signal_writer, 
                        signal_reader
                    });
                    Ok(NextStep {
                        owner, 
                        former_state: tunnel::TunnelState::Connecting, 
                        cur_state: tunnel::TunnelState::Active(remote_timestamp)})
                }, 
                TunnelState::PreActive(pre_active) => {
                    // NOTE(review): the first two branches are identical and
                    // report former == cur (so no notification is sent);
                    // only a strictly newer remote_timestamp is recorded.
                    // Confirm the `>` branch is intentional.
                    if pre_active.remote_timestamp > remote_timestamp {
                        Ok((tunnel::TunnelState::Active(remote_timestamp), tunnel::TunnelState::Active(remote_timestamp)))
                    } else if pre_active.remote_timestamp == remote_timestamp {
                        Ok((tunnel::TunnelState::Active(remote_timestamp), tunnel::TunnelState::Active(remote_timestamp)))
                    } else {
                        let former_state = tunnel::TunnelState::Active(pre_active.remote_timestamp);
                        pre_active.remote_timestamp = remote_timestamp;
                        Ok((former_state, tunnel::TunnelState::Active(remote_timestamp)))
                    }.map(|(former_state, cur_state)| NextStep {
                        owner: pre_active.owner.clone(), 
                        former_state, 
                        cur_state 
                    })
                }, 
                TunnelState::Active(active) => {
                    // A newer remote body downgrades an existing Active
                    // interface back to PreActive with fresh queues.
                    if active.remote_timestamp < remote_timestamp {
                        info!("{} Active=>PreActive", self);
                        let owner = active.owner.clone();
                        let (signal_writer, signal_reader) = bounded(owner.config().tcp.package_buffer);
                        let former_state = tunnel::TunnelState::Active(active.remote_timestamp);
                        *state = TunnelState::PreActive(PreActiveState {
                            owner: owner.clone(), 
                            remote_timestamp, 
                            connector: ConnectorState::None, 
                            signal_writer, 
                            signal_reader
                        });
                        Ok(NextStep {
                            owner, 
                            former_state, 
                            cur_state: tunnel::TunnelState::Active(remote_timestamp)
                        })
                    } else {
                        // Stale or equal knowledge: keep the current state.
                        Ok(NextStep {
                            owner: active.owner.clone(), 
                            former_state: tunnel::TunnelState::Active(active.remote_timestamp), 
                            cur_state: tunnel::TunnelState::Active(active.remote_timestamp)
                        })
                    }
                },
                TunnelState::Dead => Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel dead"))
            }
        }?;
        if next_step.former_state != next_step.cur_state {
            next_step.owner.sync_tunnel_state(
                &DynamicTunnel::new(self.clone()), 
                next_step.former_state, 
                next_step.cur_state);
        }
        Ok(next_step.owner)
    }
222
223    pub fn is_reverse(&self) -> bool {
224        tunnel::Tunnel::remote(self).addr().port() == 0
225    }
226
227    pub fn is_data_piece_full(&self) -> BuckyResult<bool> {
228        let state = &mut *self.0.state.lock().unwrap();
229        match state {
230            TunnelState::Active(active) => {
231                Ok(active.piece_writer.capacity() == active.piece_writer.len()) 
232            }, 
233            _ => Err(BuckyError::new(BuckyErrorCode::ErrorState, "not active"))
234        }
235    }
236
237    pub fn discard_data_piece(&self) -> BuckyResult<()> {
238        let (signal_writer, len) = {
239            let state = &mut *self.0.state.lock().unwrap();
240            match state {
241                TunnelState::Active(active) => Ok((active.signal_writer.clone(), active.piece_writer.len())), 
242                _ => Err(BuckyError::new(BuckyErrorCode::ErrorState, "not active"))
243            }
244        }?;
245        if len > 0 {
246            info!("{} send discard command: {}", self, len);
247            let _ = signal_writer.try_send(SignalElem::Command(CommandElem::Discard(len)));
248        }
249        Ok(())
250    }
251
252    pub fn send_data_piece(&self, buf: &[u8]) -> BuckyResult<()> {
253        let state = &mut *self.0.state.lock().unwrap();
254        match state {
255            TunnelState::Active(active) => {
256                if active.piece_writer.remaining() >= buf.len() {
257                    let len = active.piece_writer.push_slice(buf);
258                    assert_eq!(len, buf.len());
259                    Ok(())
260                } else {
261                    Err(BuckyError::new(BuckyErrorCode::Pending, "full"))
262                }
263            }, 
264            _ => Err(BuckyError::new(BuckyErrorCode::ErrorState, "not active"))
265        }
266    }
267
    /// Drive the state machine with the outcome of a connect attempt.
    ///
    /// `Ok` carries the established package interface together with the
    /// remote device-body timestamp and the syn sequence that won the
    /// handshake; `Err` marks the attempt failed and may move the
    /// tunnel to Dead. All owner notifications and interface teardown
    /// happen after the state lock is released.
    fn active_with_interface(&self, interface: Result<(tcp::PackageInterface, Timestamp, TempSeq), BuckyError>) {
        match interface {
            Ok((interface, remote_timestamp, syn_seq)) => {
                // Everything that must happen outside the lock is
                // collected here first.
                struct NextStep {
                    owner: TunnelContainer, 
                    former_state: tunnel::TunnelState, 
                    cur_state: tunnel::TunnelState, 
                    signal_reader: Receiver<SignalElem>, 
                    piece_reader: ringbuf::Consumer<u8>, 
                    // Pending reverse-connect waiter to cancel, if any.
                    reverse_waiter: Option<AbortHandle>, 
                    // Older interface superseded by this one; closed
                    // only after the new one is wired up.
                    to_close: Option<tcp::PackageInterface>, 
                    dead_waiter: AbortRegistration
                }
                self.0.last_active.store(bucky_time_now(), Ordering::SeqCst);
                if let Some(next_step) = {
                    let state = &mut *self.0.state.lock().unwrap();
                    match state {
                        TunnelState::Connecting(connecting) => {
                            info!("{} connecting => active(remote:{}, seq:{:?})", self, remote_timestamp, syn_seq);
                            let owner = connecting.owner.clone();
                            // Fresh queues: signal channel plus the piece
                            // ring buffer sized in units of udp::MTU.
                            let (signal_writer, signal_reader) = bounded(owner.config().tcp.package_buffer);
                            let ring_buf = ringbuf::RingBuffer::<u8>::new(owner.config().tcp.piece_buffer * udp::MTU);
                            let (piece_writer, piece_reader) = ring_buf.split();
                            let mut dead_waiters = StateWaiter::new();
                            let dead_waiter = dead_waiters.new_waiter();
                            *state = TunnelState::Active(ActiveState {
                                owner: owner.clone(), 
                                interface: interface.clone(), 
                                remote_timestamp, 
                                syn_seq, 
                                signal_writer, 
                                piece_writer, 
                                dead_waiters
                            });
                            
                            Some(NextStep {
                                owner, 
                                former_state: tunnel::TunnelState::Connecting, 
                                cur_state: tunnel::TunnelState::Active(remote_timestamp), 
                                signal_reader, 
                                piece_reader,   
                                reverse_waiter: None, 
                                to_close: None, 
                                dead_waiter
                            })
                        }, 
                        TunnelState::PreActive(pre_active) => {
                            // FIXME: compare the PreActive remote timestamp
                            // against the Active remote timestamp
                            info!("{} PreActive => Active(remote:{}, seq:{:?})", self, remote_timestamp, syn_seq);
                            let former_state = tunnel::TunnelState::Active(pre_active.remote_timestamp);
                            let owner = pre_active.owner.clone();

                            let ring_buf = ringbuf::RingBuffer::<u8>::new(owner.config().tcp.piece_buffer * udp::MTU);
                            let (piece_writer, piece_reader) = ring_buf.split();

                            // Keep the PreActive signal channel so packages
                            // queued before activation are not lost.
                            let signal_reader = pre_active.signal_reader.clone();
                            let signal_writer = pre_active.signal_writer.clone();

                            let mut dead_waiters = StateWaiter::new();
                            let dead_waiter = dead_waiters.new_waiter();

                            // If a reverse connect is pending, remember its
                            // handle so it can be aborted outside the lock.
                            let reverse_waiter = match &pre_active.connector {
                                ConnectorState::ReverseConnecting(waiter) => {
                                    Some(waiter.clone())
                                }, 
                                _ => None
                            };
                            *state = TunnelState::Active(ActiveState {
                                owner: owner.clone(), 
                                interface: interface.clone(),  
                                remote_timestamp, 
                                syn_seq, 
                                signal_writer, 
                                piece_writer, 
                                dead_waiters
                            });
                            Some(NextStep {
                                owner, 
                                former_state, 
                                cur_state: tunnel::TunnelState::Active(remote_timestamp), 
                                signal_reader, 
                                piece_reader, 
                                reverse_waiter, 
                                to_close: None, 
                                dead_waiter
                            })
                        },
                        TunnelState::Active(active) => {
                            // Replace the current interface only when the
                            // newcomer is strictly newer by body timestamp
                            // or syn sequence; otherwise ignore it.
                            if active.remote_timestamp < remote_timestamp 
                                || active.syn_seq < syn_seq {
                                info!("{} Active(remote:{}, seq:{:?}) => Active(remote:{}, seq:{:?})", self, active.remote_timestamp, active.syn_seq, remote_timestamp, syn_seq);
                                let former_state = tunnel::TunnelState::Active(active.remote_timestamp);
                                let owner = active.owner.clone();
    
                                let ring_buf = ringbuf::RingBuffer::<u8>::new(owner.config().tcp.piece_buffer * udp::MTU);
                                let (piece_writer, piece_reader) = ring_buf.split();
                                let (signal_writer, signal_reader) = bounded(owner.config().tcp.package_buffer);
                                let to_close = Some(active.interface.clone());
                                
                                let mut dead_waiters = StateWaiter::new();
                                let dead_waiter = dead_waiters.new_waiter();

                                *state = TunnelState::Active(ActiveState {
                                    owner: owner.clone(), 
                                    interface: interface.clone(),  
                                    remote_timestamp, 
                                    syn_seq, 
                                    signal_writer, 
                                    piece_writer, 
                                    dead_waiters
                                });
                                Some(NextStep {
                                    owner, 
                                    former_state, 
                                    cur_state: tunnel::TunnelState::Active(remote_timestamp), 
                                    signal_reader, 
                                    piece_reader, 
                                    reverse_waiter: None, 
                                    to_close, 
                                    dead_waiter
                                })
                            } else {
                                None
                            }
                        },
                        _ => None
                    }
                } {
                    // Lock released: cancel any reverse waiter, start the
                    // recv/send loops, notify the owner, then close the
                    // superseded interface.
                    if let Some(reverse_waiter) = next_step.reverse_waiter {
                        reverse_waiter.abort();
                    }
                    self.start_recv(next_step.owner.clone(), interface, next_step.dead_waiter);
                    self.start_send(next_step.owner.clone(), next_step.signal_reader, next_step.piece_reader);

                    if next_step.former_state != next_step.cur_state {
                        next_step.owner.sync_tunnel_state(&DynamicTunnel::new(self.clone()), next_step.former_state, next_step.cur_state);
                    }

                    if let Some(to_close) = next_step.to_close {
                        info!("{} will close older {}", self, to_close);
                        to_close.close();
                    }
                }
            }, 
            Err(err) => {
                info!("{} dead for {}", self, err);
                // A failed attempt kills Connecting/PreActive tunnels;
                // an already-Active tunnel is left untouched.
                if let Some((owner, former_state)) = {
                    let state = &mut *self.0.state.lock().unwrap();
                    match state {
                        TunnelState::Connecting(connecting) => {
                            info!("{} connecting => dead", self);
                            let owner = connecting.owner.clone();
                            *state = TunnelState::Dead;
                            Some((owner, tunnel::TunnelState::Connecting))
                        }, 
                        TunnelState::PreActive(pre_active) => {
                            info!("{} PreActive => dead", self);
                            let former_state = tunnel::TunnelState::Active(pre_active.remote_timestamp);
                            let owner = pre_active.owner.clone();
                            *state = TunnelState::Dead;
                            Some((owner, former_state))
                        },
                        TunnelState::Active(_) => {
                            None
                        },
                        _ => {
                            // do nothing
                            None
                        }
                    }
                } {
                    owner.sync_tunnel_state(&DynamicTunnel::new(self.clone()), former_state, tunnel::TunnelState::Dead);
                }
            }
        }
    }
444
    /// Start connecting this tunnel.
    ///
    /// Forward tunnels dial the remote endpoint directly; reverse
    /// tunnels (remote port 0, see `is_reverse`) ask an SN to have the
    /// remote call back. Errors with `AlreadyExists` when a connector is
    /// already in flight and `ErrorState` when the tunnel is
    /// Active/Dead. The actual connect work runs on a spawned task;
    /// its outcome is fed back through `active_with_interface`.
    pub(super) fn connect(&self) -> Result<(), BuckyError> {
        if !self.is_reverse() {
            info!("{} connect", self);
            let owner = {
                let state = &mut *self.0.state.lock().unwrap();
                match state {
                    // build-tunnel calls connect() from the Connecting state
                    TunnelState::Connecting(connecting) => {
                        match connecting.connector {
                            ConnectorState::None => {
                                connecting.connector = ConnectorState::Connecting;
                                Ok(connecting.owner.clone())
                            }, 
                            _ => {
                                Err(BuckyError::new(BuckyErrorCode::AlreadyExists, "connector exists"))
                            }
                        } 
                    }, 
                    TunnelState::PreActive(pre_active) => {
                        match pre_active.connector {
                            ConnectorState::None => {
                                pre_active.connector = ConnectorState::Connecting;
                                Ok(pre_active.owner.clone())
                            }, 
                            _ => {
                                Err(BuckyError::new(BuckyErrorCode::AlreadyExists, "connector exists"))
                            }
                        } 
                    },
                    TunnelState::Active(_) => {
                        Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel already active"))
                    }, 
                    TunnelState::Dead => {
                        Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel already dead"))
                    }
                }
            }.map_err(|err| {debug!("{} connect failed for {}", self, err); err})?;
            
            let tunnel = self.clone();
            task::spawn(async move {
                tunnel.active_with_interface(tunnel.connect_inner(owner.clone(), None).await);
            });
                
        } else {
            info!("{} reverse connect", self);
            let (owner, reg) = {
                let state = &mut *self.0.state.lock().unwrap();
                match state {
                    // Reverse connect is only attempted once history says
                    // the peer is reachable, i.e. from PreActive.
                    TunnelState::Connecting(_) => {
                        unreachable!()
                    }, 
                    TunnelState::PreActive(pre_active) => {
                        match pre_active.connector {
                            ConnectorState::None => {
                                // The abort handle lets active_with_interface
                                // cancel the waiter once a connection lands.
                                let (waiter, reg) = AbortHandle::new_pair();
                                pre_active.connector = ConnectorState::ReverseConnecting(waiter);
                                Ok((pre_active.owner.clone(), reg))
                            }, 
                            _ => {
                                debug!("{} ignore reverse connect for reverse connecting", self);
                                Err(BuckyError::new(BuckyErrorCode::AlreadyExists, "connector exists"))
                            }
                        } 
                    },
                    TunnelState::Active(_) => {
                        Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel already active"))
                    }, 
                    TunnelState::Dead => {
                        Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel already dead"))
                    }
                }
            }.map_err(|err| {debug!("{} connect failed for {}", self, err); err})?;

            let tunnel = self.clone();
            task::spawn(async move {
                match tunnel.reverse_connect_inner(owner, reg).await {
                    Ok(_) => {
                        // do nothing
                    }, 
                    Err(err) => {
                        info!("{} reverse connect failed for {}", tunnel, err);
                        tunnel.active_with_interface(Err(err));
                    }
                };
            });
        }
        Ok(())
    }
533
    /// Connect using an already-established TCP interface (e.g. one just
    /// accepted) instead of dialing. Only legal from the Connecting
    /// state with no connector in flight; the handshake runs on a
    /// spawned task and completes via `active_with_interface`.
    pub(crate) fn connect_with_interface(&self, interface: tcp::Interface) -> Result<(), BuckyError> {
        info!("{} connect_with_interface", self);
        let owner = {
            let state = &mut *self.0.state.lock().unwrap();
            match state {
                TunnelState::Connecting(connecting) => {
                    match connecting.connector {
                        ConnectorState::None => {
                            connecting.connector = ConnectorState::Connecting;
                            Ok(connecting.owner.clone())
                        }, 
                        _ => {
                            Err(BuckyError::new(BuckyErrorCode::AlreadyExists, "connector exists"))
                        }
                    } 
                }, 
                TunnelState::PreActive(_) => {
                    Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel already active"))
                },
                TunnelState::Active(_) => {
                    Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel already active"))
                }, 
                TunnelState::Dead => {
                    Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel already dead"))
                }
            }
        }.map_err(|err| {debug!("{} connect failed for {}", self, err); err})?;
        
        let tunnel = self.clone();
        task::spawn(async move {
            
            tunnel.active_with_interface(tunnel.connect_inner(owner.clone(), Some(interface)).await);
        });
        Ok(())
    }
569
    /// Dial (or reuse) a TCP interface to the remote and perform the
    /// SynTunnel/AckTunnel handshake.
    ///
    /// On success returns the package interface together with the
    /// remote device body's update time and the syn sequence used.
    /// Fails with `InvalidData` when the response is not exactly one
    /// AckTunnel or when the remote refuses.
    async fn connect_inner(&self, owner: TunnelContainer, interface: Option<tcp::Interface>) -> Result<(tcp::PackageInterface, Timestamp, TempSeq), BuckyError> {
        info!("{} connect interface", self);
        let stack = owner.stack();
        let key_stub = stack.keystore().create_key(owner.remote_const(), true);
        // Reuse the accepted interface when given, otherwise dial out.
        let interface = if let Some(interface) = interface {
            Ok(interface)
        } else {
            tcp::Interface::connect(
            *tunnel::Tunnel::remote(self), 
            owner.remote().clone(), 
            owner.remote_const().clone(), 
            key_stub.key, 
            owner.config().tcp.connect_timeout).await
        }?;
        let syn_seq = owner.generate_sequence();
        let syn_tunnel = SynTunnel {
            protocol_version: owner.protocol_version(), 
            stack_version: owner.stack_version(),  
            to_device_id: owner.remote().clone(),
            sequence: syn_seq.clone(),
            from_device_desc: stack.sn_client().ping().default_local(), 
            send_time: bucky_time_now()
        };
        // Send the syn and wait (bounded by confirm_timeout) for the reply box.
        let resp_box = interface.confirm_connect(&stack, vec![DynamicPackage::from(syn_tunnel)], owner.config().tcp.confirm_timeout).await?;
        
        if resp_box.packages().len() != 1 {
            Err(BuckyError::new(BuckyErrorCode::InvalidData, "should response AckTunnel"))
        } else if resp_box.packages()[0].cmd_code() != PackageCmdCode::AckTunnel {
            Err(BuckyError::new(BuckyErrorCode::InvalidData, "should response AckTunnel"))
        } else {
            let ack_tunnel: &AckTunnel = resp_box.packages()[0].as_ref();
            // Let the container see the ack before we interpret it.
            let _ = owner.on_package(ack_tunnel, None);
            if ack_tunnel.result == ACK_TUNNEL_RESULT_OK {
                let remote_timestamp = ack_tunnel.to_device_desc.body().as_ref().unwrap().update_time();
                Ok((interface.into(), remote_timestamp, syn_seq))
            } else if ack_tunnel.result == ACK_TUNNEL_RESULT_REFUSED {
                Err(BuckyError::new(BuckyErrorCode::InvalidData, "refused"))
            } else {
                Err(BuckyError::new(BuckyErrorCode::InvalidData, "should response AckTunnel"))
            }            
        }
    }
613
    /// Ask the remote's SN to have the remote dial back to us, then wait
    /// (abortable via `reg`, bounded by connect_timeout) for the call
    /// session to finish. The actual inbound connection is surfaced
    /// elsewhere; success here only means the SN call round-trip completed.
    async fn reverse_connect_inner(&self, owner: TunnelContainer, reg: AbortRegistration) -> Result<(), BuckyError> {
        let stack = owner.stack();
        // We need the cached remote device to learn which SN serves it.
        let remote = stack.device_cache().get_inner(owner.remote()).ok_or_else(| | BuckyError::new(BuckyErrorCode::NotFound, "device not cached"))?;
        let sn_id = remote.connect_info().sn_list().get(0).ok_or_else(| | BuckyError::new(BuckyErrorCode::NotFound, "device no sn"))?;

        let key_stub = stack.keystore().create_key(owner.remote_const(), true);
        let mut syn_box = PackageBox::encrypt_box(owner.remote().clone(), key_stub.key.clone());
        let syn_tunnel = SynTunnel {
            protocol_version: owner.protocol_version(), 
            stack_version: owner.stack_version(),  
            to_device_id: owner.remote().clone(),
            sequence: owner.generate_sequence(),
            from_device_desc: stack.sn_client().ping().default_local(), 
            send_time: bucky_time_now()
        };
        // A key the remote has not confirmed yet must be accompanied by
        // a signed Exchange so the remote can adopt it.
        if let keystore::EncryptedKey::Unconfirmed(encrypted) = key_stub.encrypted {
            let mut exchg = Exchange::from((&syn_tunnel, encrypted, key_stub.key.mix_key));
            exchg.sign(stack.keystore().signer()).await?;
            syn_box.push(exchg);
        }
        syn_box.push(syn_tunnel);

        // Advertise the local endpoints the remote should dial back to,
        // preferring mapped outer addresses when the listener knows them.
        let listener = stack.net_manager().listener();
        let mut endpoints = vec![];
        for t in listener.tcp() {
            let outer = t.outer();
            if outer.is_some() {
                let outer = outer.unwrap();
                if outer.eq(tunnel::Tunnel::local(self)) 
                    || t.local().eq(tunnel::Tunnel::local(self)) {
                    endpoints.push(outer);
                } 
            } else {
                endpoints.push(*tunnel::Tunnel::local(self));
            }
        }

        let call_session = stack.sn_client().call().call(
            Some(&endpoints), 
            owner.remote(),
            &vec![sn_id.clone()],
            |sn_call| {
                // Encode the syn box into the SN call payload.
                let mut context = udp::PackageBoxEncodeContext::from(sn_call);
                let mut buf = vec![0u8; interface::udp::MTU_LARGE];
                let enc_len = syn_box.raw_tail_encode_with_context(&mut buf, &mut context, &None).unwrap().len();
                buf.truncate(enc_len);
                buf
            }).await?;
                
        // Wait for the call result; aborted early when an interface
        // activates (see active_with_interface's reverse_waiter).
        let waiter = Abortable::new(call_session.next(), reg);
        let _ = future::timeout(owner.config().connect_timeout, waiter).await?;
        Ok(())
    }
667
    /// Handle an I/O error reported by a package interface.
    ///
    /// Only kills the tunnel when the failing interface is the one
    /// currently held by the Active state (pointer equality); errors
    /// from a superseded interface are ignored. Dead waiters are woken
    /// and the owner notified after the lock is released.
    fn on_interface_error(&self, from: &PackageInterface, err: &BuckyError) {
        error!("{} interface error {} from {}", self, err, from);

        let notify = {
            let state = &mut *self.0.state.lock().unwrap();
            match state { 
                TunnelState::Active(active) => {
                    let owner = active.owner.clone();
                    if active.interface.ptr_eq(from) {
                        info!("{} Active({})=>Dead for interface error", self, active.remote_timestamp);
                        let former_state = tunnel::TunnelState::Active(active.remote_timestamp);
                        // Take the waiters out before overwriting the state
                        // so they can be woken outside the lock.
                        let mut dead_waiters = StateWaiter::new();
                        std::mem::swap(&mut dead_waiters, &mut active.dead_waiters);
                        *state = TunnelState::Dead;
                        Some((owner, former_state, Some(dead_waiters)))
                    } else {
                        None
                    }
                }, 
                _ => None
            }
        };

        if let Some((owner, former_state, dead_waiters)) = notify {
            if let Some(dead_waiters) = dead_waiters {
                dead_waiters.wake();
            }
            owner.sync_tunnel_state(&DynamicTunnel::new(self.clone()), former_state, tunnel::TunnelState::Dead);
        }
        
    }
699
    /// Spawns the tunnel's send loop.
    ///
    /// The loop multiplexes two sources onto the tcp interface:
    /// - `signal_reader`: control packages / raw data / commands, served first;
    /// - `piece_reader`: a ring buffer of pre-encoded piece boxes, drained
    ///   whenever the signal queue is empty or `piece_interval` elapses.
    ///
    /// The loop ends when the tunnel is no longer `Active` at spawn time, when
    /// the signal channel closes, or when the interface reports a send error
    /// (which also marks the tunnel dead via `on_interface_error`).
    fn start_send(
        &self, 
        owner: TunnelContainer, 
        signal_reader: Receiver<SignalElem>, 
        piece_reader: ringbuf::Consumer<u8>,  
        ) {
        let tunnel = self.clone();
        task::spawn(async move {
            // snapshot the interface; if the tunnel is not Active there is
            // nothing to send on, so the task exits immediately
            let stub = {
                match &*tunnel.0.state.lock().unwrap() {
                    TunnelState::Active(active) => {
                        Ok(active.interface.clone())
                    }, 
                    _ => {
                        Err(BuckyError::new(BuckyErrorCode::ErrorState, "break send loop for invalid state"))
                    }
                }
            };
            
            if stub.is_err() {
                return ;
            }
            let interface = stub.unwrap();
            let mut piece_reader = piece_reader;

            info!("{} send loop start, {}", tunnel, owner.config().tcp.piece_interval.as_millis());
            loop {
                let mut send_buf = [0u8; udp::MTU_LARGE];
                let mut piece_buf = [0u8; udp::MTU];
              
                // apply a queued command to the piece ring buffer
                fn handle_command(
                    piece_reader: &mut ringbuf::Consumer<u8>, 
                    command: CommandElem 
                ) -> BuckyResult<()> {
                    match command {
                        // drop `len` already-consumed bytes from the ring
                        CommandElem::Discard(len) => piece_reader.discard(len),
                    };
                    Ok(())
                }

                // send one package or raw-data blob over the interface
                async fn handle_package(
                    interface: &PackageInterface, 
                    send_buf: &mut [u8], 
                    pkg: PackageElem) -> BuckyResult<()> {
                    match pkg {
                        PackageElem::Package(package) => interface.send_package(send_buf, package, false).await, 
                        PackageElem::RawData(data) => interface.send_raw_data(data).await
                    }
                }

                // dispatch one signal element: either a package to send or a
                // command to apply to the piece buffer
                async fn handle_signal(
                    interface: &PackageInterface, 
                    piece_reader: &mut ringbuf::Consumer<u8>, 
                    send_buf: &mut [u8], 
                    signal: SignalElem
                ) -> BuckyResult<()> {
                    match signal {
                        SignalElem::Package(package) => handle_package(interface, send_buf, package).await, 
                        SignalElem::Command(command) => handle_command(piece_reader, command)
                    }
                }


                // drain piece boxes from the ring buffer until it is empty, a
                // send fails, or a signal arrives (signals take priority)
                async fn handle_piece(
                    interface: &PackageInterface, 
                    send_buf: &mut [u8], 
                    piece_reader: &mut ringbuf::Consumer<u8>, 
                    signal_reader: &Receiver<SignalElem>) -> BuckyResult<()> {
                    loop {
                        let len = piece_reader.pop_slice(send_buf);
                        if len > 0 {
                            // NOTE(review): asserts the ring delivered a full
                            // buffer-sized chunk; producers presumably write
                            // fixed-size records — confirm against writer side
                            assert_eq!(len, send_buf.len());
                            // leading u16 of the record is the encoded box length
                            let (box_len, _) = u16::raw_decode(send_buf).unwrap();
                            match interface.send_raw_buffer(&mut send_buf[..u16::raw_bytes().unwrap() + box_len as usize]).await {
                                Ok(_) => {
                                    //continue
                                }, 
                                Err(err) => {
                                    break Err(err);
                                }
                            }
                        } else {
                            break Ok(());
                        }
                        if signal_reader.len() > 0 {
                            // packages have priority; return to the signal queue first
                            break Ok(());
                        }
                    }
                }

                // wait for a signal, but at most piece_interval, so queued
                // pieces still get flushed while the signal channel is idle
                match future::timeout(owner.config().tcp.piece_interval, signal_reader.recv()).await {
                    Ok(recv) => {
                        match recv {
                            Ok(signal) => {
                                match handle_signal(&interface, &mut piece_reader, &mut send_buf, signal).await {
                                    Ok(_) => {
                                        // signal queue drained: flush pending pieces
                                        if signal_reader.len() == 0 {
                                            match handle_piece(&interface, &mut piece_buf, &mut piece_reader, &signal_reader).await {
                                                Ok(_) => {
                                                    // continue
                                                }, 
                                                Err(err) => {
                                                    tunnel.on_interface_error(&interface, &err);
                                                    info!("{} send loop break for err {}", tunnel, err);
                                                    break;
                                                }
                                            }
                                        }
                                    }, 
                                    Err(err) => {
                                        tunnel.on_interface_error(&interface, &err);
                                        info!("{} send loop break for err {}", tunnel, err);
                                        break;
                                    }
                                }
                            }, 
                            Err(err) => {
                                // signal channel closed: tunnel is being torn down
                                info!("{} send loop break for err {}", tunnel, err);
                                break;
                            }
                        }
                    }
                    Err(_err) => {
                        // timed out with no signal: flush pending pieces
                        match handle_piece(&interface, &mut piece_buf, &mut piece_reader, &signal_reader).await {
                            Ok(_) => {
                                // continue
                            }, 
                            Err(err) => {
                                tunnel.on_interface_error(&interface, &err);
                                info!("{} send loop break for err {}", tunnel, err);
                                break;
                            }
                        }
                    }
                }
            }
        });
    }
839
    /// Receive loop body: reads boxes off `interface` until an error occurs,
    /// refreshing `last_active` and dispatching each received package.
    ///
    /// Ping / ping-response packages are handled by the tunnel itself;
    /// everything else is forwarded to the owning `TunnelContainer`.
    /// Raw-data boxes are handed to the owner unmodified.
    async fn recv_inner(
        tunnel: Self, 
        owner: TunnelContainer, 
        interface: tcp::PackageInterface) {
        // recv loop
        let mut recv_buf = [0u8; udp::MTU_LARGE];
        loop {
            // when the tunnel is explicitly destroyed the tcp stream gets shut
            // down; receive_package then fails here and breaks the loop
            match interface.receive_package(&mut recv_buf).await {
                Ok(recv_box) => {
                    // any successfully received box counts as tunnel activity
                    tunnel.0.last_active.store(bucky_time_now(), Ordering::SeqCst);
                    match recv_box {
                        RecvBox::Package(package_box) => {
                            let stack = owner.stack();
                            if package_box.has_exchange() {
                                // let exchange: &Exchange = package_box.packages()[0].as_ref();
                                // remember the key carried by the exchange for later boxes
                                stack.keystore().add_key(package_box.key(), package_box.remote());
                            }
                            if let Err(err) = package_box.packages().iter().try_for_each(|pkg| {
                                if pkg.cmd_code() == PackageCmdCode::PingTunnel {
                                    tunnel.on_package(AsRef::<PingTunnel>::as_ref(pkg), None).map(|_| ())
                                } else if pkg.cmd_code() == PackageCmdCode::PingTunnelResp {
                                    tunnel.on_package(AsRef::<PingTunnelResp>::as_ref(pkg), None).map(|_| ())
                                } else {
                                    downcast_session_handle!(pkg, |pkg| owner.on_package(pkg, None)).map(|_| ())
                                }
                            }) {
                                warn!("{} package error {}", tunnel, err);
                            }
                        }, 
                        RecvBox::RawData(raw_data) => {
                            let _ = owner.on_raw_data(raw_data, DynamicTunnel::new(tunnel.clone()));
                        }
                    }
                }, 
                Err(err) => {
                    // mark the tunnel dead (if this interface still owns it) and stop
                    tunnel.on_interface_error(&interface, &err);
                    break;
                }
            }
        }
    }
882
883    fn start_recv(
884        &self, 
885        owner: TunnelContainer, 
886        interface: tcp::PackageInterface, 
887        dead_waiter: AbortRegistration) {
888        let (cancel, reg) = AbortHandle::new_pair();
889        task::spawn(Abortable::new(Self::recv_inner(self.clone(), owner, interface), reg)); 
890        let tunnel = self.clone();
891        task::spawn(async move {
892            let _ = StateWaiter::wait(dead_waiter, || ()).await;
893            error!("{} break recv loop for tunnel dead", tunnel);
894            cancel.abort();
895        });
896    }
897
    /// Delayed body of `retain_keeper`: (re)establishes the tunnel and keeps
    /// it alive with a ping loop.
    ///
    /// The call is skipped when a newer `retain_keeper` call superseded this
    /// one (timestamp mismatch), or when all keepers were already released.
    /// The ping loop sends a `PingTunnel` after `ping_interval` of inactivity
    /// and marks the tunnel dead after `ping_timeout` of inactivity.
    async fn retain_connect(
        &self, 
        retain_connect_timestamp: Timestamp, 
        ping_interval: Duration, 
        ping_timeout: Duration) {
        if self.0.retain_connect_timestamp.load(Ordering::SeqCst) != retain_connect_timestamp {
            debug!("ignore retain connect for timestamp missmatch, tunnel:{}", self);
            return ;
        }
        if self.0.keeper_count.load(Ordering::SeqCst) == 0 {
            debug!("ignore retain connect for zero retain count, tunnel:{}", self);
            return ;
        }

        // only the active (dialing) side connects; a reverse tunnel waits for
        // the remote to connect in
        if !self.is_reverse() {
            info!("begin retain connect, tunnel:{}", self);
            let _ = self.connect();
        } 

        let tunnel = self.clone();
        task::spawn(async move {
            loop {
                // stop pinging as soon as the last keeper is released
                if tunnel.0.keeper_count.load(Ordering::SeqCst) == 0 {
                    info!("break ping loop for release keeper, tunnel:{}", tunnel);
                    break;
                }
                // sample the current state under the lock, then act on it
                match {
                    let state = &*tunnel.0.state.lock().unwrap();
                    match state {
                        TunnelState::Active(active) => {
                            Ok(Some(active.owner.clone()))
                        }, 
                        TunnelState::Dead => {
                            Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel's dead"))
                        }, 
                        _ => {
                            // Connecting/PreActive: nothing to ping yet, just wait
                            Ok(None)
                        }
                    }
                } {
                    Ok(owner) => {
                        if owner.is_some() {
                            let now = bucky_time_now();
                            let miss_active_time = Duration::from_micros(now - tunnel.0.last_active.load(Ordering::SeqCst));
                            if miss_active_time > ping_timeout {
                                // silent for too long: transition Active=>Dead
                                if let Some((owner, cur_state, dead_waiters)) = {
                                    let state = &mut *tunnel.0.state.lock().unwrap();
                                    if let TunnelState::Active(active) = state {
                                        error!("dead for ping timeout, tunnel:{}", tunnel);
                                        let cur_state = tunnel::TunnelState::Active(active.remote_timestamp);
                                        let owner = active.owner.clone();
                                        let mut dead_waiters = StateWaiter::new();
                                        std::mem::swap(&mut dead_waiters, &mut active.dead_waiters);
                                        *state = TunnelState::Dead;
                                        Some((owner, cur_state, dead_waiters))
                                    } else {
                                        None
                                    }
                                } {
                                    // wake death waiters and notify the owner outside the lock
                                    dead_waiters.wake();
                                    owner.sync_tunnel_state(&tunnel::DynamicTunnel::new(tunnel.clone()), cur_state, tunnel::Tunnel::state(&tunnel));
                                }
                                break;
                            }
                            if miss_active_time > ping_interval {
                                if tunnel.0.keeper_count.load(Ordering::SeqCst) > 0 {
                                    info!("send ping, tunnel:{}", tunnel);
                                    let ping = PingTunnel {
                                        package_id: 0,
                                        send_time: now,
                                        recv_data: 0,
                                    };
                                    let _ = tunnel::Tunnel::send_package(&tunnel, DynamicPackage::from(ping));
                                }
                            }
                        }
                        // timing out a pending future doubles as an async sleep
                        let _ = future::timeout(ping_interval, future::pending::<()>()).await;
                    }, 
                    Err(e) => {
                        error!("break ping loop, tunnel:{}, err:{}", tunnel, e);
                        break;
                    }
                }
            };
        });
    }
984}
985
986#[async_trait]
987impl tunnel::Tunnel for Tunnel {
988    fn mtu(&self) -> usize {
989        self.0.mtu
990    }
991
992    fn as_any(&self) -> &dyn std::any::Any {
993        self
994    }
995
996    fn local(&self) -> &Endpoint {
997        self.0.local_remote.local()
998    }
999
1000    fn remote(&self) -> &Endpoint {
1001        self.0.local_remote.remote()
1002    }
1003
1004    fn proxy(&self) -> ProxyType {
1005        ProxyType::None
1006    }
1007
1008    fn state(&self) -> tunnel::TunnelState {
1009        match &*self.0.state.lock().unwrap() {
1010            TunnelState::Connecting(_) => {
1011                tunnel::TunnelState::Connecting
1012            }, 
1013            TunnelState::PreActive(pre_active) => {
1014                tunnel::TunnelState::Active(pre_active.remote_timestamp)
1015            },
1016            TunnelState::Active(active) => {
1017                tunnel::TunnelState::Active(active.remote_timestamp)
1018            }, 
1019            TunnelState::Dead => {
1020                tunnel::TunnelState::Dead
1021            }
1022        }
1023    } 
1024
1025    fn send_package(&self, package: DynamicPackage) -> Result<usize, BuckyError> {
1026        if package.cmd_code() == PackageCmdCode::SessionData {
1027            return Err(BuckyError::new(BuckyErrorCode::UnSupport, "session data should not send from tcp tunnel"));
1028        }
1029        let (signal_writer, to_connect) = {
1030            match &*self.0.state.lock().unwrap() {
1031                TunnelState::PreActive(pre_active) => {
1032                    Ok((pre_active.signal_writer.clone(), true))
1033                }, 
1034                TunnelState::Active(active) => {
1035                    Ok((active.signal_writer.clone(), false))
1036                },
1037                _ => {
1038                    Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel not active"))
1039                }
1040            }
1041        }?;
1042        let _ = signal_writer.try_send(SignalElem::Package(PackageElem::Package(package)));
1043        if to_connect {
1044            let _ = self.connect();
1045        }
1046        
1047        Ok(0)
1048    }
1049
1050    fn raw_data_header_len(&self) -> usize {
1051        tcp::PackageInterface::raw_header_data_len()
1052    }
1053
1054    fn send_raw_data(&self, data: &mut [u8]) -> Result<usize, BuckyError> {
1055        let (signal_writer, to_connect) = {
1056            match &*self.0.state.lock().unwrap() {
1057                TunnelState::PreActive(pre_active) => {
1058                    Ok((pre_active.signal_writer.clone(), true))
1059                }, 
1060                TunnelState::Active(active) => {
1061                    Ok((active.signal_writer.clone(), false))
1062                },
1063                _ => {
1064                    Err(BuckyError::new(BuckyErrorCode::ErrorState, "tunnel not active"))
1065                }
1066            }
1067        }?;
1068        let len = data.len();
1069        let _ = signal_writer.try_send(SignalElem::Package(PackageElem::RawData(Vec::from(data))));
1070        if to_connect {
1071            let _ = self.connect();
1072        }
1073        Ok(len)
1074    }
1075
1076    fn ptr_eq(&self, other: &tunnel::DynamicTunnel) -> bool {
1077        *self.local() == *other.as_ref().local() 
1078        && *self.remote() == *other.as_ref().remote()
1079        && Arc::ptr_eq(&self.0, &other.clone_as_tunnel::<Tunnel>().0)
1080    }
1081
1082    fn retain_keeper(&self) {
1083        info!("{} retain keeper", self);
1084        if 0 != self.0.keeper_count.fetch_add(1, Ordering::SeqCst) {
1085            return ;
1086        }
1087        let owner = {
1088            let state = &mut *self.0.state.lock().unwrap();
1089            match state {
1090                TunnelState::Connecting(_) => None, 
1091                TunnelState::PreActive(pre_active) => Some(pre_active.owner.clone()), 
1092                TunnelState::Active(_) => None, 
1093                TunnelState::Dead => None
1094            }
1095        };
1096        if owner.is_none() {
1097            return ;
1098        }
1099
1100        let owner = owner.unwrap();
1101        let retain_connect_timestamp = bucky_time_now();
1102        self.0.retain_connect_timestamp.store(retain_connect_timestamp, Ordering::SeqCst);
1103        let tunnel = self.clone();
1104        task::spawn(async move {
1105            let _ = future::timeout(owner.config().tcp.retain_connect_delay, future::pending::<()>()).await;
1106            tunnel.retain_connect(retain_connect_timestamp, owner.config().tcp.ping_interval, owner.config().tcp.ping_timeout).await;
1107        });
1108    }
1109
1110    fn release_keeper(&self) {
1111        info!("{} release keeper", self);
1112        self.0.keeper_count.fetch_add(-1, Ordering::SeqCst);
1113    }
1114
1115    fn reset(&self) {
1116        info!("{} reset to Dead", self);
1117        if let Some(dead_waiters) = {
1118            let state = &mut *self.0.state.lock().unwrap();
1119            match state {
1120                TunnelState::Active(active) => {
1121                    let mut dead_waiters = StateWaiter::new();
1122                    std::mem::swap(&mut dead_waiters, &mut active.dead_waiters);
1123                    *state = TunnelState::Dead;
1124                    Some(dead_waiters)
1125                }, 
1126                _ => None
1127            }
1128        } {
1129            dead_waiters.wake();
1130        }
1131    }
1132
1133    fn mark_dead(&self, former_state: tunnel::TunnelState) {
1134        let notify = match &former_state {
1135            tunnel::TunnelState::Connecting => {
1136                let state = &mut *self.0.state.lock().unwrap();
1137                match state {
1138                    TunnelState::Connecting(connecting) => {
1139                        info!("{} Connecting=>Dead", self);
1140                        let owner = connecting.owner.clone();
1141                        *state = TunnelState::Dead;
1142                        Some((owner, tunnel::TunnelState::Dead, None))
1143                    }, 
1144                    _ => {
1145                        None
1146                    }
1147                }
1148            }, 
1149            tunnel::TunnelState::Active(remote_timestamp) => {
1150                let remote_timestamp = *remote_timestamp;
1151                let state = &mut *self.0.state.lock().unwrap();
1152                match state {
1153                    TunnelState::Active(active) => {
1154                        let owner = active.owner.clone();
1155                        if active.remote_timestamp == remote_timestamp {
1156                            info!("{} Active({})=>Dead for active by {}", self, active.remote_timestamp, remote_timestamp);
1157                            let mut dead_waiters = StateWaiter::new();
1158                            std::mem::swap(&mut dead_waiters, &mut active.dead_waiters);
1159                            *state = TunnelState::Dead;
1160                            Some((owner, tunnel::TunnelState::Dead, Some(dead_waiters)))
1161                        } else {
1162                            None
1163                        }
1164                    }, 
1165                    _ => {
1166                        None
1167                    }
1168                }
1169            }, 
1170            tunnel::TunnelState::Dead => None
1171        };
1172
1173        if let Some((owner, new_state, dead_waiters)) = notify {
1174            if let Some(dead_waiters) = dead_waiters {
1175                dead_waiters.wake();
1176            }
1177            owner.sync_tunnel_state(&DynamicTunnel::new(self.clone()), former_state, new_state);
1178        }
1179    }
1180}
1181
1182impl OnPackage<PingTunnel> for Tunnel {
1183    fn on_package(&self, ping: &PingTunnel, _context: Option<()>) -> Result<OnPackageResult, BuckyError> {
1184        let ping_resp = PingTunnelResp {
1185            ack_package_id: ping.package_id,
1186            send_time: bucky_time_now(),
1187            recv_data: 0,
1188        };
1189        let _ = tunnel::Tunnel::send_package(self, DynamicPackage::from(ping_resp));
1190        Ok(OnPackageResult::Handled)
1191    }
1192}
1193
1194impl OnPackage<PingTunnelResp> for Tunnel {
1195    fn on_package(&self, _pkg: &PingTunnelResp, _context: Option<()>) -> Result<OnPackageResult, BuckyError> {
1196        // do nothing
1197        Ok(OnPackageResult::Handled)
1198    }
1199}
1200
1201impl OnTcpInterface for Tunnel {
1202    fn on_tcp_interface(&self, interface: tcp::AcceptInterface, first_box: PackageBox) -> Result<OnPackageResult, BuckyError> {
1203        assert_eq!(self.is_reverse(), true);
1204        assert_eq!(first_box.packages_no_exchange().len(), 1);
1205        let first_package = &first_box.packages_no_exchange()[0];
1206        if first_package.cmd_code() == PackageCmdCode::SynTunnel {
1207            let syn_tunnel: &SynTunnel = first_package.as_ref();
1208            let remote_timestamp = syn_tunnel.from_device_desc.body().as_ref().unwrap().update_time();
1209            let (owner, ret) = {
1210                let state = &mut *self.0.state.lock().unwrap();
1211                match state {
1212                    TunnelState::Connecting(connecting) => {
1213                        info!("{} accept interface {} in connecting", self, interface);
1214                        (Some(connecting.owner.clone()), ACK_TUNNEL_RESULT_OK)
1215                    }, 
1216                    TunnelState::PreActive(pre_active) => {
1217                        info!("{} accept interface {} in PreActive", self, interface);
1218                        (Some(pre_active.owner.clone()), ACK_TUNNEL_RESULT_OK)
1219                    }, 
1220                    TunnelState::Active(active) => {
1221                        if active.remote_timestamp < remote_timestamp {
1222                            info!("{} accept interface {} for active remote timestamp update from {} to {}", self, interface, active.remote_timestamp, remote_timestamp);
1223                            (Some(active.owner.clone()), ACK_TUNNEL_RESULT_OK)    
1224                        } else if active.syn_seq < syn_tunnel.sequence {
1225                            info!("{} accept interface {} for active sequence update from {:?} to {:?}", self, interface, active.syn_seq, syn_tunnel.sequence);
1226                            (Some(active.owner.clone()), ACK_TUNNEL_RESULT_OK)
1227                        } else {
1228                            info!("{} refuse interface {} for already active", self, interface);
1229                            (Some(active.owner.clone()), ACK_TUNNEL_RESULT_REFUSED)
1230                        }
1231                    }, 
1232                    TunnelState::Dead => {
1233                        info!("{} refuse interface {} for dead", self, interface);
1234                        (None, ACK_TUNNEL_RESULT_REFUSED)
1235                    }
1236                }
1237            };
1238            if let Some(owner) = owner {
1239                owner.on_package(syn_tunnel, None)?;
1240                let ack_tunnel = AckTunnel {
1241                    protocol_version: owner.protocol_version(), 
1242                    stack_version: owner.stack_version(),  
1243                    sequence: syn_tunnel.sequence,
1244                    result: ret,
1245                    send_time: bucky_time_now(),
1246                    mtu: udp::MTU as u16,
1247                    to_device_desc: owner.stack().sn_client().ping().default_local()
1248                };
1249                let tunnel = self.clone();
1250                task::spawn(async move {
1251                    let syn_seq = ack_tunnel.sequence;
1252                    let confirm_ret = interface.confirm_accept(vec![DynamicPackage::from(ack_tunnel)]).await;
1253                    if ret == ACK_TUNNEL_RESULT_OK {
1254                        tunnel.active_with_interface(confirm_ret.map(|_| (interface.into(), remote_timestamp, syn_seq)));
1255                    } else {
1256                        // do nothing
1257                    }
1258                });
1259            }
1260            Ok(OnPackageResult::Handled)
1261        } else if first_package.cmd_code() == PackageCmdCode::TcpSynConnection {
1262            let syn_stream: &TcpSynConnection = first_package.as_ref();
1263            let owner = self.pre_active(syn_stream.from_device_desc.body().as_ref().unwrap().update_time())?;
1264            owner.on_package(syn_stream, interface)
1265        } else if first_package.cmd_code() == PackageCmdCode::TcpAckConnection {
1266            let ack_stream: &TcpAckConnection = first_package.as_ref();
1267            let owner = self.pre_active(ack_stream.to_device_desc.body().as_ref().unwrap().update_time())?;
1268            owner.on_package(ack_stream, interface)
1269        } else {
1270            unreachable!()
1271        }
1272    }
1273}
1274
1275