cyfs_bdt/tunnel/builder/connect_stream/
tcp.rs

1use log::*;
2use std::{
3    sync::RwLock,
4};
5use async_std::{sync::Arc, task};
6use async_trait::{async_trait};
7use cyfs_base::*;
8use crate::{
9    types::*, 
10    protocol::{*, v0::*}, 
11    interface::*, 
12    stream::{StreamContainer, StreamProviderSelector}, 
13    tunnel::{self, Tunnel, ProxyType}, 
14    stack::{Stack, WeakStack}
15};
16use super::super::{action::*};
17use super::{action::*};
18
19enum ConnectTcpStreamState {
20    Connecting1(StateWaiter), 
21    PreEstablish(tcp::Interface), 
22    Connecting2, 
23    Establish, 
24    Closed
25}
26
27struct ConnectTcpStreamImpl {
28    stack: WeakStack, 
29    tunnel: tunnel::tcp::Tunnel, 
30    stream: StreamContainer, 
31    state: RwLock<ConnectTcpStreamState>,
32}
33
34impl std::fmt::Display for ConnectTcpStream {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        write!(f, "ConnectTcpStream{{stream:{},local:{},remote:{}}}", self.0.stream, self.local(), self.remote())
37    }
38}
39
/// Clonable handle to an outgoing TCP stream connect action; every clone
/// shares the same `ConnectTcpStreamImpl` through an `Arc`.
#[derive(Clone)]
pub struct ConnectTcpStream(Arc<ConnectTcpStreamImpl>);
42
43impl ConnectTcpStream {
44    pub fn new(
45        stack: WeakStack, 
46        stream: StreamContainer, 
47        tunnel: tunnel::tcp::Tunnel) -> Self {
48        let a = Self(Arc::new(ConnectTcpStreamImpl {
49            stack, 
50            stream: stream.clone(),
51            tunnel,   
52            state: RwLock::new(ConnectTcpStreamState::Connecting1(StateWaiter::new()))
53        }));
54
55        {
56            // 同步 stream 的establish状态
57            // 当stream 的wait establish 返回时,action要么已经进入establish状态了,要么中止所有动作进入closed状态
58            // 如果已经进入PreEstablish 状态了, 但是选择了其他的action 进入continue connect;已经联通的tcp interface交给对应的TcpTunnel来 active
59            let ca = a.clone();
60            let stream = stream.clone();
61            task::spawn(async move {
62                let (opt_waiter, tunnel_interface) = match stream.wait_establish().await {
63                    Ok(_) => {
64                        let state = &mut *ca.0.state.write().unwrap();
65                        match state {
66                            ConnectTcpStreamState::Connecting1(ref mut waiter) => {
67                                let waiter = Some(waiter.transfer());
68                                *state = ConnectTcpStreamState::Closed;
69                                (waiter, None)
70                            }, 
71                            ConnectTcpStreamState::PreEstablish(interface) => {
72                                let interface = interface.clone();
73                                *state = ConnectTcpStreamState::Closed;
74                                (None, Some(interface))
75                            }, 
76                            ConnectTcpStreamState::Establish => {
77                                // do nothing
78                                (None, None)
79                            }, 
80                            _ => {
81                                *state = ConnectTcpStreamState::Closed;
82                                (None, None)
83                            }
84                        }
85                    },
86                    Err(_) => {
87                        let state = &mut *ca.0.state.write().unwrap();
88                        let ret = match state {
89                            ConnectTcpStreamState::Connecting1(ref mut waiter) => {
90                                let waiter = Some(waiter.transfer());
91                                (waiter, None)
92                            }, 
93                            ConnectTcpStreamState::PreEstablish(interface) => {
94                                (None, Some(interface.clone()))
95                            }, 
96                            _ => {
97                                (None, None)
98                            }
99                        };
100                        *state = ConnectTcpStreamState::Closed;
101                        ret
102                    }
103                };
104                
105                if let Some(waiter) = opt_waiter {
106                    waiter.wake()
107                }
108
109                if let Some(interface) = tunnel_interface {
110                    let _ = ca.0.tunnel.connect_with_interface(interface);
111                }
112            });
113        }
114
115        {
116            // 发起tcp 连接,tcp 连上时,进入pre establish
117            // tcp连不上, 直接进入 closed 状态
118            let ca = a.clone();
119            task::spawn(async move {
120                if let Some(tunnel) = ca.0.stream.tunnel() {
121                    let keystore = tunnel.stack().keystore().clone();
122                    let key = keystore.create_key(tunnel.remote_const(), false);
123                    let connect_result = tcp::Interface::connect(/*ca.local().addr().ip(),*/
124                                                                        *ca.remote(),
125                                                                        tunnel.remote().clone(),
126                                                                        tunnel.remote_const().clone(),
127                                                                        key.key, 
128                                                                        Stack::from(&ca.0.stack).config().tunnel.tcp.connect_timeout
129                    ).await;
130
131                    
132                    let opt_waiter = match connect_result {
133                        Ok(interface) => {
134                            let state = &mut *ca.0.state.write().unwrap();
135                            match state {
136                                ConnectTcpStreamState::Connecting1(waiter) => {
137                                    debug!("{} Connecting1=>PreEstablish", ca);
138                                    let waiter = Some(waiter.transfer());
139                                    *state = ConnectTcpStreamState::PreEstablish(interface);
140                                    waiter
141                                }, 
142                                _ => {
143                                    None
144                                }
145                            }
146                        }, 
147                        Err(_) => {
148                            ca.0.tunnel.mark_dead(ca.0.tunnel.state());
149                            let state = &mut *ca.0.state.write().unwrap();
150                            match state {
151                                ConnectTcpStreamState::Connecting1(waiter) => {
152                                    debug!("{} Connecting1=>Closed", ca);
153                                    let waiter = Some(waiter.transfer());
154                                    *state = ConnectTcpStreamState::Closed;
155                                    waiter
156                                }, 
157                                _ => {
158                                    *state = ConnectTcpStreamState::Closed;
159                                    None
160                                }
161                            }
162                        }
163                    };
164
165                    if let Some(waiter) = opt_waiter {
166                        waiter.wake()
167                    }
168                }
169            });
170        }
171        
172        a
173    }
174}
175
176impl BuildTunnelAction for ConnectTcpStream {
177    fn local(&self) -> &Endpoint {
178        &self.0.tunnel.local()
179    }
180
181    fn remote(&self) -> &Endpoint {
182        &self.0.tunnel.remote()
183    }
184}
185
186#[async_trait]
187impl ConnectStreamAction for ConnectTcpStream {
188    fn clone_as_connect_stream_action(&self) -> DynConnectStreamAction {
189        Box::new(self.clone())
190    }
191
192    fn as_any(&self) -> &dyn std::any::Any {
193        self
194    }
195
196    fn state(&self) -> ConnectStreamState {
197        match &*self.0.state.read().unwrap() {
198            ConnectTcpStreamState::Connecting1(_) => ConnectStreamState::Connecting1, 
199            ConnectTcpStreamState::PreEstablish(_) => ConnectStreamState::PreEstablish,
200            ConnectTcpStreamState::Connecting2 => ConnectStreamState::Connecting2,
201            ConnectTcpStreamState::Establish => ConnectStreamState::Establish,
202            ConnectTcpStreamState::Closed => ConnectStreamState::Closed,
203        }
204    }
205    async fn wait_pre_establish(&self) -> ConnectStreamState {
206        let (state, opt_waiter) = match &mut *self.0.state.write().unwrap() {
207            ConnectTcpStreamState::Connecting1(ref mut waiter) => {
208                (ConnectStreamState::Connecting1, Some(waiter.new_waiter()))
209            }, 
210            ConnectTcpStreamState::PreEstablish(_) => (ConnectStreamState::PreEstablish, None),
211            ConnectTcpStreamState::Connecting2 => (ConnectStreamState::Connecting2, None),
212            ConnectTcpStreamState::Establish => (ConnectStreamState::Establish, None),
213            ConnectTcpStreamState::Closed => (ConnectStreamState::Closed, None),
214        };
215        if let Some(waiter) = opt_waiter {
216            StateWaiter::wait(waiter, | | self.state()).await
217        } else {
218            state
219        }
220    }
221
222    async fn continue_connect(&self) -> BuckyResult<StreamProviderSelector> {
223        // 向已经联通的tcp interface发 tcp syn connection,收到对端返回的tcp ack connection时establish
224        let interface = {
225            let state = &mut *self.0.state.write().unwrap();
226            match state {
227                ConnectTcpStreamState::PreEstablish(interface) => {
228                    debug!("{} PreEstablish=>Connecting2", self);
229                    let interface = interface.clone();
230                    *state = ConnectTcpStreamState::Connecting2;
231                    Ok(interface)
232                },
233                _ => {
234                    let err = BuckyError::new(BuckyErrorCode::ErrorState, "continue connect on tcp stream not pre establish");
235                    debug!("{} continue_connect failed for err", self);
236                    Err(err)
237                }
238            }
239        }?;
240        let syn_stream = self.0.stream.syn_tcp_stream().ok_or_else(|| BuckyError::new(BuckyErrorCode::ErrorState, "continue connect on stream not connecting"))?;
241        let ack = match interface.confirm_connect(&Stack::from(&self.0.stack), vec![DynamicPackage::from(syn_stream)], Stack::from(&self.0.stack).config().tunnel.tcp.confirm_timeout).await {
242            Ok(resp_box) => {
243                let packages = resp_box.packages_no_exchange();
244                if packages.len() == 1 && packages[0].cmd_code() == PackageCmdCode::TcpAckConnection {
245                    let ack: &TcpAckConnection = packages[0].as_ref();
246                    //FIXME: 处理TcpAckConnection中的字段
247                    // TcpStream 可以联通的时候,让对应的TcpTunnel进入pre active 状态
248                    let _ = self.0.tunnel.pre_active(ack.to_device_desc.body().as_ref().unwrap().update_time());
249                    let state = &mut *self.0.state.write().unwrap();
250                    match state {
251                        ConnectTcpStreamState::Connecting2 => {
252                            *state = ConnectTcpStreamState::Establish;
253                            Ok(ack.clone())
254                        }, 
255                        _ => Err(BuckyError::new(BuckyErrorCode::ErrorState, "tcp stram got ack but action not in connecting2 state"))
256                    }
257                } else {
258                    let state = &mut *self.0.state.write().unwrap();
259                    *state = ConnectTcpStreamState::Closed;
260                    Err(BuckyError::new(BuckyErrorCode::InvalidInput, "tcp stream got error confirm when expecting TcpAckConnection"))
261                }
262            }, 
263            Err(e) => {
264                let state = &mut *self.0.state.write().unwrap();
265                *state = ConnectTcpStreamState::Closed;
266                Err(e)
267            }
268        }.map_err(|e| {
269            // self.0.tunnel.mark_dead(self.0.tunnel.state());
270            e
271        })?;
272
273        Ok(StreamProviderSelector::Tcp(
274                interface.socket().clone(), 
275                interface.key().clone(), 
276                Some(ack.clone())))
277    }
278}
279
280
281enum AcceptReverseTcpStreamState {
282    Connecting1(StateWaiter), 
283    PreEstablish(tcp::AcceptInterface), 
284    Connecting2, 
285    Establish, 
286    Closed
287}
288
289struct AcceptReverseTcpStreamImpl {
290    local: Endpoint, 
291    remote: Endpoint,
292    stream: StreamContainer, 
293    state: RwLock<AcceptReverseTcpStreamState>
294}
295
296impl std::fmt::Display for AcceptReverseTcpStream {
297    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
298        write!(f, "AcceptReverseTcpStream{{stream:{},local:{},remote:{}}}", self.0.stream, self.0.local, self.0.remote)
299    }
300}
301
/// Clonable handle to a reverse TCP stream action; every clone shares the same
/// `AcceptReverseTcpStreamImpl` through an `Arc`.
#[derive(Clone)]
pub struct AcceptReverseTcpStream(Arc<AcceptReverseTcpStreamImpl>);
304
305impl AcceptReverseTcpStream {
306    pub fn new(stream: StreamContainer, local: Endpoint, remote: Endpoint) -> Self {
307        let a = Self(Arc::new(AcceptReverseTcpStreamImpl {
308            local, 
309            remote, 
310            stream: stream.clone(),  
311            state: RwLock::new(AcceptReverseTcpStreamState::Connecting1(StateWaiter::new()))
312        }));
313
314        {
315            // 同步 stream 的establish状态
316            // 当stream 的wait establish 返回时,action要么已经进入establish状态了,要么中止所有动作进入closed状态
317            let ca = a.clone();
318            let stream = stream.clone();
319            task::spawn(async move {
320                let (waiter, interface) = match stream.wait_establish().await {
321                    Ok(_) => {
322                        let state = &mut *ca.0.state.write().unwrap();
323                        match state {
324                            AcceptReverseTcpStreamState::Connecting1(ref mut waiter) => {
325                                let waiter = Some(waiter.transfer());
326                                *state = AcceptReverseTcpStreamState::Closed;
327                                (waiter, None)
328                            }, 
329                            AcceptReverseTcpStreamState::PreEstablish(interface) => {
330                                (None, Some(interface.clone()))
331                            }, 
332                            AcceptReverseTcpStreamState::Establish => {
333                                // do nothing
334                                (None, None)
335                            }, 
336                            _ => {
337                                *state = AcceptReverseTcpStreamState::Closed;
338                                (None, None)
339                            }
340                        }
341                    },
342                    Err(_) => {
343                        let state = &mut *ca.0.state.write().unwrap();
344                        match state {
345                            AcceptReverseTcpStreamState::Connecting1(ref mut waiter) => {
346                                let waiter = Some(waiter.transfer());
347                                *state = AcceptReverseTcpStreamState::Closed;
348                                (waiter, None)
349                            }, 
350                            AcceptReverseTcpStreamState::PreEstablish(interface) => {
351                                (None, Some(interface.clone()))
352                            }, 
353                            _ => {
354                                *state = AcceptReverseTcpStreamState::Closed;
355                                (None, None)
356                            }
357                        }
358                    }
359                };
360                
361                if let Some(waiter) = waiter {
362                    waiter.wake()
363                }
364
365                if let Some(interface) = interface {
366                    let ack_ack_stream = stream.ack_ack_tcp_stream(TCP_ACK_CONNECTION_RESULT_REFUSED);
367                    let _ = match interface.confirm_accept(vec![DynamicPackage::from(ack_ack_stream)]).await {
368                        Ok(_) => {
369                            debug!("{} confirm {} with refuse tcp connection ", stream, interface);
370                        }, 
371                        Err(e) => {
372                            warn!("{} confirm {} with tcp ack ack connection failed for {}", stream, interface, e);
373                            if let Some(tunnel) = stream.tunnel() {
374                                let tunnel = tunnel.create_tunnel::<tunnel::tcp::Tunnel>(EndpointPair::from((*interface.local(), Endpoint::default_tcp(interface.local()))), ProxyType::None);
375                                if let Ok((tunnel, _)) = tunnel {
376                                    tunnel.mark_dead(tunnel.state());
377                                }
378                            }
379                        }
380                    };
381                }
382            });
383        }
384
385
386        a
387    }
388}
389
390
391impl BuildTunnelAction for AcceptReverseTcpStream {
392    fn local(&self) -> &Endpoint {
393        &self.0.local
394    }
395
396    fn remote(&self) -> &Endpoint {
397        &self.0.remote
398    }
399}
400
401#[async_trait]
402impl ConnectStreamAction for AcceptReverseTcpStream {
403    fn clone_as_connect_stream_action(&self) -> DynConnectStreamAction {
404        Box::new(self.clone())
405    }
406
407    fn as_any(&self) -> &dyn std::any::Any {
408        self
409    }
410
411    fn state(&self) -> ConnectStreamState {
412        match &*self.0.state.read().unwrap() {
413            AcceptReverseTcpStreamState::Connecting1(_) => ConnectStreamState::Connecting1, 
414            AcceptReverseTcpStreamState::PreEstablish(_) => ConnectStreamState::PreEstablish,
415            AcceptReverseTcpStreamState::Connecting2 => ConnectStreamState::Connecting2,
416            AcceptReverseTcpStreamState::Establish => ConnectStreamState::Establish,
417            AcceptReverseTcpStreamState::Closed => ConnectStreamState::Closed,
418        }
419    }
420
421    async fn wait_pre_establish(&self) -> ConnectStreamState {
422        let (state, opt_waiter) = match &mut *self.0.state.write().unwrap() {
423            AcceptReverseTcpStreamState::Connecting1(ref mut waiter) => {
424                (ConnectStreamState::Connecting1, Some(waiter.new_waiter()))
425            }, 
426            AcceptReverseTcpStreamState::PreEstablish(_) => (ConnectStreamState::PreEstablish, None),
427            AcceptReverseTcpStreamState::Connecting2 => (ConnectStreamState::Connecting2, None),
428            AcceptReverseTcpStreamState::Establish => (ConnectStreamState::Establish, None),
429            AcceptReverseTcpStreamState::Closed => (ConnectStreamState::Closed, None),
430        };
431        if let Some(waiter) = opt_waiter {
432            StateWaiter::wait(waiter, | | self.state()).await
433        } else {
434            state
435        }
436    }
437
438    async fn continue_connect(&self) -> BuckyResult<StreamProviderSelector> {
439        // 向已经联通的tcp interface发 tcp syn connection,收到对端返回的tcp ack connection时establish
440        let interface = {
441            let state = &mut *self.0.state.write().unwrap();
442            match state {
443                AcceptReverseTcpStreamState::PreEstablish(interface) => {
444                    let interface = interface.clone();
445                    *state = AcceptReverseTcpStreamState::Connecting2;
446                    Ok(interface)
447                },
448                _ => {
449                    Err(BuckyError::new(BuckyErrorCode::ErrorState, "continue connect on tcp stream not pre establish"))
450                }
451            }
452        }?;
453        let ack_ack_stream = self.0.stream.ack_ack_tcp_stream(TCP_ACK_CONNECTION_RESULT_OK);
454        let _ = match interface.confirm_accept(vec![DynamicPackage::from(ack_ack_stream)]).await {
455            Ok(_) => {
456                let state = &mut *self.0.state.write().unwrap();
457                match state {
458                    AcceptReverseTcpStreamState::Connecting2 => {
459                        *state = AcceptReverseTcpStreamState::Establish;
460                        Ok(())
461                    }, 
462                    _ => Err(BuckyError::new(BuckyErrorCode::ErrorState, "tcp stram got ack but action not in connecting2 state"))
463                }
464            }, 
465            Err(e) => {
466                let state = &mut *self.0.state.write().unwrap();
467                *state = AcceptReverseTcpStreamState::Closed;
468                Err(e)
469            }
470        }?;
471
472        Ok(StreamProviderSelector::Tcp(
473                interface.socket().clone(), 
474                interface.key().clone(), 
475                None))
476    }
477}
478
479impl OnPackage<TcpAckConnection, tcp::AcceptInterface> for AcceptReverseTcpStream {
480    fn on_package(&self, _pkg: &TcpAckConnection, interface: tcp::AcceptInterface) -> Result<OnPackageResult, BuckyError> {
481        // 在 connecting1 状态下accept 到 带着 TcpAckConnection 的 tcp stream, 进入pre establish
482        let waiter = {
483            let state = &mut *self.0.state.write().unwrap();
484            match state {
485                AcceptReverseTcpStreamState::Connecting1(ref mut waiter) => {
486                    debug!("{} Connecting1=>PreEstablish", self);
487                    let waiter = waiter.transfer();
488                    *state = AcceptReverseTcpStreamState::PreEstablish(interface);
489                    Ok(waiter)
490                }, 
491                _ => {
492                    debug!("{} ingnore tcp ack connection for not in connecting1", self);
493                    Err(BuckyError::new(BuckyErrorCode::ErrorState, "not in connecting1"))
494                }
495            }
496        }?;
497        waiter.wake();
498        Ok(OnPackageResult::Handled)
499    }
500}
501
502impl From<DynConnectStreamAction> for AcceptReverseTcpStream {
503    fn from(action: DynConnectStreamAction) -> Self {
504        action.as_any().downcast_ref::<Self>().unwrap().clone()
505    }
506}