cyfs_bdt/tunnel/builder/connect_stream/
package.rs1use std::{
2 sync::RwLock,
4};
5use async_std::{sync::{Arc}, future, task};
6use async_trait::{async_trait};
7use cyfs_base::*;
8use crate::{
9 types::*,
10 protocol::{*, v0::*},
11 stream::{StreamContainer, StreamProviderSelector}
12};
13use super::super::{action::*};
14use super::{action::*};
15use log::*;
16
17enum ConnectPackageStreamState {
18 Connecting1(StateWaiter),
19 PreEstablish(SessionData),
20 Connecting2,
21 Establish,
22 Closed
23}
24
25impl std::fmt::Display for ConnectPackageStreamState {
26 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
27 match self {
28 ConnectPackageStreamState::Connecting1(_) => write!(f, "Connecting1"),
29 ConnectPackageStreamState::PreEstablish(_) => write!(f, "PreEstablish"),
30 ConnectPackageStreamState::Connecting2 => write!(f, "Connecting2"),
31 ConnectPackageStreamState::Establish => write!(f, "Establish"),
32 ConnectPackageStreamState::Closed => write!(f, "Closed"),
33 }
34 }
35}
36
37struct ConnectPackageStreamImpl {
38 local: Endpoint,
39 remote: Endpoint,
40 stream: StreamContainer,
41 state: RwLock<ConnectPackageStreamState>,
42}
43
44#[derive(Clone)]
45pub struct ConnectPackageStream(Arc<ConnectPackageStreamImpl>);
46
47impl std::fmt::Display for ConnectPackageStream {
48 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
49 write!(f, "ConnectPackageStream {{stream:{}}}", self.0.stream)
50 }
51}
52
53impl ConnectPackageStream {
54 pub fn endpoint_pair() -> EndpointPair {
55 EndpointPair::from((Endpoint::default(), Endpoint::default()))
56 }
57
58 pub fn new(stream: StreamContainer) -> Self {
59 let a = Self(Arc::new(ConnectPackageStreamImpl {
60 local: Endpoint::default(),
61 remote: Endpoint::default(),
62 stream: stream.clone(),
63 state: RwLock::new(ConnectPackageStreamState::Connecting1(StateWaiter::new()))
64 }));
65
66 {
67 let ca = a.clone();
70 let stream = stream.clone();
71 task::spawn(async move {
72 let opt_waiter = match stream.wait_establish().await {
73 Ok(_) => {
74 let state = &mut *ca.0.state.write().unwrap();
75 match state {
76 ConnectPackageStreamState::Connecting1(ref mut waiter) => {
77 let waiter = Some(waiter.transfer());
78 *state = ConnectPackageStreamState::Closed;
79 waiter
80 },
81 ConnectPackageStreamState::Establish => {
82 None
84 },
85 _ => {
86 *state = ConnectPackageStreamState::Closed;
87 None
88 }
89 }
90 },
91 Err(_) => {
92 let state = &mut *ca.0.state.write().unwrap();
93 match state {
94 ConnectPackageStreamState::Connecting1(ref mut waiter) => {
95 let waiter = Some(waiter.transfer());
96 *state = ConnectPackageStreamState::Closed;
97 waiter
98 },
99 _ => {
100 *state = ConnectPackageStreamState::Closed;
101 None
102 }
103 }
104 }
105 };
106
107 if let Some(waiter) = opt_waiter {
108 waiter.wake()
109 }
110 });
111 }
112 a
113 }
114
115 pub fn begin(&self) {
116 let ca = self.clone();
118 let syn_session_data = ca.0.stream.syn_session_data();
119 if let Some(syn_session_data) = syn_session_data {
120 let resend_interval = ca.0.stream.stack().config().stream.stream.package.connect_resend_interval;
121 task::spawn(async move {
122 loop {
124 match ca.state() {
125 ConnectStreamState::Connecting1 => {
126 trace!("{} send sync session data", ca);
127 if let Some(tunnel) = ca.0.stream.tunnel() {
128 let _ = tunnel.send_packages(vec![DynamicPackage::from(syn_session_data.clone_with_data())]);
129 } else {
130 break;
131 }
132 },
133 _ => break
134 };
135 future::timeout(resend_interval, future::pending::<()>()).await.err();
136 }
137 });
138 } else {
139 debug!("{} ingore sync sync session data proc for stream not in connecting state", self);
140 }
141 }
142}
143
144#[async_trait]
145impl BuildTunnelAction for ConnectPackageStream {
146 fn local(&self) -> &Endpoint {
147 &self.0.local
148 }
149
150 fn remote(&self) -> &Endpoint {
151 &self.0.remote
152 }
153}
154
155#[async_trait]
156impl ConnectStreamAction for ConnectPackageStream {
157 fn clone_as_connect_stream_action(&self) -> DynConnectStreamAction {
158 Box::new(self.clone())
159 }
160
161 fn as_any(&self) -> &dyn std::any::Any {
162 self
163 }
164
165 fn state(&self) -> ConnectStreamState {
166 match &*self.0.state.read().unwrap() {
167 ConnectPackageStreamState::Connecting1(_) => ConnectStreamState::Connecting1,
168 ConnectPackageStreamState::PreEstablish(_) => ConnectStreamState::PreEstablish,
169 ConnectPackageStreamState::Connecting2 => ConnectStreamState::Connecting2,
170 ConnectPackageStreamState::Establish => ConnectStreamState::Establish,
171 ConnectPackageStreamState::Closed => ConnectStreamState::Closed,
172 }
173 }
174 async fn wait_pre_establish(&self) -> ConnectStreamState {
175 let (state, opt_waiter) = match &mut *self.0.state.write().unwrap() {
176 ConnectPackageStreamState::Connecting1(ref mut waiter) => {
177 (ConnectStreamState::Connecting1, Some(waiter.new_waiter()))
178 },
179 ConnectPackageStreamState::PreEstablish(_) => (ConnectStreamState::PreEstablish, None),
180 ConnectPackageStreamState::Connecting2 => (ConnectStreamState::Connecting2, None),
181 ConnectPackageStreamState::Establish => (ConnectStreamState::Establish, None),
182 ConnectPackageStreamState::Closed => (ConnectStreamState::Closed, None),
183 };
184 if let Some(waiter) = opt_waiter {
185 StateWaiter::wait(waiter, | | self.state()).await
186 } else {
187 state
188 }
189 }
190
191 async fn continue_connect(&self) -> BuckyResult<StreamProviderSelector> {
192 let sesstion_data = {
194 let state = &mut *self.0.state.write().unwrap();
195 match state {
196 ConnectPackageStreamState::PreEstablish(syn_ack) => {
197 info!("{} PreEstablish=>Establish", self);
198 let sesstion_data = syn_ack.clone_with_data();
199 *state = ConnectPackageStreamState::Establish;
200 Ok(sesstion_data)
201 },
202 _ => {
203 error!("{} continue connect failed for in state {}", self, state);
204 Err(BuckyError::new(BuckyErrorCode::ErrorState, "continue connect on package stream not pre establish"))
205 }
206 }
207 }?;
208
209 let remote_id = sesstion_data.syn_info.clone().unwrap().from_session_id;
210
211 Ok(StreamProviderSelector::Package(remote_id, Some(sesstion_data)))
212 }
213}
214
215impl OnPackage<SessionData> for ConnectPackageStream {
216 fn on_package(&self, pkg: &SessionData, _: Option<()>) -> Result<OnPackageResult, BuckyError> {
217 if pkg.is_syn() {
218 unreachable!()
219 } else if pkg.is_syn_ack() {
220 let opt_waiter = {
222 let state = &mut *self.0.state.write().unwrap();
223 match state {
224 ConnectPackageStreamState::Connecting1(ref mut waiter) => {
225 info!("{} Connecting1=>PreEstablish", self);
226 let waiter = Some(waiter.transfer());
227 *state = ConnectPackageStreamState::PreEstablish(pkg.clone_with_data());
228 waiter
229 },
230 _ => {None}
231 }
232 };
233 if let Some(waiter) = opt_waiter {
234 waiter.wake();
235 }
236 Ok(OnPackageResult::Handled)
237 } else {
238 unreachable!()
239 }
240 }
241}
242
243impl From<DynConnectStreamAction> for ConnectPackageStream {
244 fn from(action: DynConnectStreamAction) -> Self {
245 action.as_any().downcast_ref::<Self>().unwrap().clone()
246 }
247}