cyfs_bdt/tunnel/builder/connect_stream/
package.rs

1use std::{
2    //time::Duration,
3    sync::RwLock, 
4};
5use async_std::{sync::{Arc}, future, task};
6use async_trait::{async_trait};
7use cyfs_base::*;
8use crate::{
9    types::*, 
10    protocol::{*, v0::*},  
11    stream::{StreamContainer, StreamProviderSelector}
12};
13use super::super::{action::*};
14use super::{action::*};
15use log::*;
16
17enum ConnectPackageStreamState {
18    Connecting1(StateWaiter), 
19    PreEstablish(SessionData), 
20    Connecting2, 
21    Establish, 
22    Closed
23}
24
25impl std::fmt::Display for ConnectPackageStreamState {
26    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
27        match self {
28            ConnectPackageStreamState::Connecting1(_) => write!(f, "Connecting1"),
29            ConnectPackageStreamState::PreEstablish(_) => write!(f, "PreEstablish"),
30            ConnectPackageStreamState::Connecting2 => write!(f, "Connecting2"),
31            ConnectPackageStreamState::Establish => write!(f, "Establish"),
32            ConnectPackageStreamState::Closed => write!(f, "Closed"),
33        }
34    }
35}
36
37struct ConnectPackageStreamImpl {
38    local: Endpoint, 
39    remote: Endpoint,
40    stream: StreamContainer, 
41    state: RwLock<ConnectPackageStreamState>,
42}   
43
44#[derive(Clone)]
45pub struct ConnectPackageStream(Arc<ConnectPackageStreamImpl>);
46
47impl std::fmt::Display for ConnectPackageStream {
48    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
49        write!(f, "ConnectPackageStream {{stream:{}}}", self.0.stream)
50    }
51}
52
53impl ConnectPackageStream {
54    pub fn endpoint_pair() -> EndpointPair {
55        EndpointPair::from((Endpoint::default(), Endpoint::default()))
56    }
57
58    pub fn new(stream: StreamContainer) -> Self {
59        let a = Self(Arc::new(ConnectPackageStreamImpl {
60            local: Endpoint::default(), 
61            remote: Endpoint::default(),
62            stream: stream.clone(),  
63            state: RwLock::new(ConnectPackageStreamState::Connecting1(StateWaiter::new()))
64        }));
65
66        {
67            // 同步 stream 的establish状态
68            // 当stream 的wait establish 返回时,action要么已经进入establish状态了,要么中止所有动作进入closed状态
69            let ca = a.clone();
70            let stream = stream.clone();
71            task::spawn(async move {
72                let opt_waiter = match stream.wait_establish().await {
73                    Ok(_) => {
74                        let state = &mut *ca.0.state.write().unwrap();
75                        match state {
76                            ConnectPackageStreamState::Connecting1(ref mut waiter) => {
77                                let waiter = Some(waiter.transfer());
78                                *state = ConnectPackageStreamState::Closed;
79                                waiter
80                            }, 
81                            ConnectPackageStreamState::Establish => {
82                                // do nothing
83                                None
84                            }, 
85                            _ => {
86                                *state = ConnectPackageStreamState::Closed;
87                                None
88                            }
89                        }
90                    },
91                    Err(_) => {
92                        let state = &mut *ca.0.state.write().unwrap();
93                        match state {
94                            ConnectPackageStreamState::Connecting1(ref mut waiter) => {
95                                let waiter = Some(waiter.transfer());
96                                *state = ConnectPackageStreamState::Closed;
97                                waiter
98                            }, 
99                            _ => {
100                                *state = ConnectPackageStreamState::Closed;
101                                None
102                            }
103                        }
104                    }
105                };
106                
107                if let Some(waiter) = opt_waiter {
108                    waiter.wake()
109                }
110            });
111        }
112        a
113    }
114
115    pub fn begin(&self) {
116        // 只要还处于 connecting1 状态, 不断重发syn session data
117        let ca = self.clone();
118        let syn_session_data = ca.0.stream.syn_session_data();
119        if let Some(syn_session_data) = syn_session_data {
120            let resend_interval = ca.0.stream.stack().config().stream.stream.package.connect_resend_interval;
121            task::spawn(async move {
122                //在进入pre establish之前,重发 syn session data
123                loop {
124                    match ca.state() {
125                        ConnectStreamState::Connecting1 => {
126                            trace!("{} send sync session data", ca);
127                            if let Some(tunnel) = ca.0.stream.tunnel() {
128                                let _ = tunnel.send_packages(vec![DynamicPackage::from(syn_session_data.clone_with_data())]);
129                            } else {
130                                break;
131                            }
132                        }, 
133                        _ => break
134                    };
135                    future::timeout(resend_interval, future::pending::<()>()).await.err();
136                }
137            });
138        } else {
139            debug!("{} ingore sync sync session data proc for stream not in connecting state", self);
140        }
141    }
142}
143
144#[async_trait]
145impl BuildTunnelAction for ConnectPackageStream {
146    fn local(&self) -> &Endpoint {
147        &self.0.local
148    }
149
150    fn remote(&self) -> &Endpoint {
151        &self.0.remote
152    }
153}
154
155#[async_trait]
156impl ConnectStreamAction for ConnectPackageStream {
157    fn clone_as_connect_stream_action(&self) -> DynConnectStreamAction {
158        Box::new(self.clone())
159    }
160
161    fn as_any(&self) -> &dyn std::any::Any {
162        self
163    }
164
165    fn state(&self) -> ConnectStreamState {
166        match &*self.0.state.read().unwrap() {
167            ConnectPackageStreamState::Connecting1(_) => ConnectStreamState::Connecting1, 
168            ConnectPackageStreamState::PreEstablish(_) => ConnectStreamState::PreEstablish,
169            ConnectPackageStreamState::Connecting2 => ConnectStreamState::Connecting2,
170            ConnectPackageStreamState::Establish => ConnectStreamState::Establish,
171            ConnectPackageStreamState::Closed => ConnectStreamState::Closed,
172        }
173    }
174    async fn wait_pre_establish(&self) -> ConnectStreamState {
175        let (state, opt_waiter) = match &mut *self.0.state.write().unwrap() {
176            ConnectPackageStreamState::Connecting1(ref mut waiter) => {
177                (ConnectStreamState::Connecting1, Some(waiter.new_waiter()))
178            }, 
179            ConnectPackageStreamState::PreEstablish(_) => (ConnectStreamState::PreEstablish, None),
180            ConnectPackageStreamState::Connecting2 => (ConnectStreamState::Connecting2, None),
181            ConnectPackageStreamState::Establish => (ConnectStreamState::Establish, None),
182            ConnectPackageStreamState::Closed => (ConnectStreamState::Closed, None),
183        };
184        if let Some(waiter) = opt_waiter {
185            StateWaiter::wait(waiter, | | self.state()).await
186        } else {
187            state
188        }
189    }
190
191    async fn continue_connect(&self) -> BuckyResult<StreamProviderSelector> {
192        // 让 package stream 联通, 发送不带 syn flag的session data包的逻辑应该在 package stream provider中完成 
193        let sesstion_data = {
194            let state = &mut *self.0.state.write().unwrap();
195            match state {
196                ConnectPackageStreamState::PreEstablish(syn_ack) => {
197                    info!("{} PreEstablish=>Establish", self);
198                    let sesstion_data = syn_ack.clone_with_data();
199                    *state = ConnectPackageStreamState::Establish;
200                    Ok(sesstion_data)
201                }, 
202                _ => {
203                    error!("{} continue connect failed for in state {}", self, state);
204                    Err(BuckyError::new(BuckyErrorCode::ErrorState, "continue connect on package stream not pre establish"))
205                }
206            }
207        }?;
208
209        let remote_id = sesstion_data.syn_info.clone().unwrap().from_session_id;
210
211        Ok(StreamProviderSelector::Package(remote_id, Some(sesstion_data)))
212    }
213}
214
215impl OnPackage<SessionData> for ConnectPackageStream {
216    fn on_package(&self, pkg: &SessionData, _: Option<()>) -> Result<OnPackageResult, BuckyError> {
217        if pkg.is_syn() {
218            unreachable!()
219        } else if pkg.is_syn_ack() {
220            // 在 connecting1 状态下收到 syn ack session data, 进入pre establish
221            let opt_waiter = {
222                let state = &mut *self.0.state.write().unwrap();
223                match state {
224                    ConnectPackageStreamState::Connecting1(ref mut waiter) => {
225                        info!("{} Connecting1=>PreEstablish", self);
226                        let waiter = Some(waiter.transfer());
227                        *state = ConnectPackageStreamState::PreEstablish(pkg.clone_with_data());
228                        waiter
229                    }, 
230                    _ => {None}
231                }
232            };
233            if let Some(waiter) = opt_waiter {
234                waiter.wake();
235            }
236            Ok(OnPackageResult::Handled)
237        } else {
238            unreachable!()
239        }
240    }
241}
242
243impl From<DynConnectStreamAction> for ConnectPackageStream {
244    fn from(action: DynConnectStreamAction) -> Self {
245        action.as_any().downcast_ref::<Self>().unwrap().clone()
246    }
247}