cyfs_bdt/tunnel/builder/connect_stream/
tcp.rs1use log::*;
2use std::{
3 sync::RwLock,
4};
5use async_std::{sync::Arc, task};
6use async_trait::{async_trait};
7use cyfs_base::*;
8use crate::{
9 types::*,
10 protocol::{*, v0::*},
11 interface::*,
12 stream::{StreamContainer, StreamProviderSelector},
13 tunnel::{self, Tunnel, ProxyType},
14 stack::{Stack, WeakStack}
15};
16use super::super::{action::*};
17use super::{action::*};
18
19enum ConnectTcpStreamState {
20 Connecting1(StateWaiter),
21 PreEstablish(tcp::Interface),
22 Connecting2,
23 Establish,
24 Closed
25}
26
27struct ConnectTcpStreamImpl {
28 stack: WeakStack,
29 tunnel: tunnel::tcp::Tunnel,
30 stream: StreamContainer,
31 state: RwLock<ConnectTcpStreamState>,
32}
33
34impl std::fmt::Display for ConnectTcpStream {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 write!(f, "ConnectTcpStream{{stream:{},local:{},remote:{}}}", self.0.stream, self.local(), self.remote())
37 }
38}
39
40#[derive(Clone)]
41pub struct ConnectTcpStream(Arc<ConnectTcpStreamImpl>);
42
43impl ConnectTcpStream {
44 pub fn new(
45 stack: WeakStack,
46 stream: StreamContainer,
47 tunnel: tunnel::tcp::Tunnel) -> Self {
48 let a = Self(Arc::new(ConnectTcpStreamImpl {
49 stack,
50 stream: stream.clone(),
51 tunnel,
52 state: RwLock::new(ConnectTcpStreamState::Connecting1(StateWaiter::new()))
53 }));
54
55 {
56 let ca = a.clone();
60 let stream = stream.clone();
61 task::spawn(async move {
62 let (opt_waiter, tunnel_interface) = match stream.wait_establish().await {
63 Ok(_) => {
64 let state = &mut *ca.0.state.write().unwrap();
65 match state {
66 ConnectTcpStreamState::Connecting1(ref mut waiter) => {
67 let waiter = Some(waiter.transfer());
68 *state = ConnectTcpStreamState::Closed;
69 (waiter, None)
70 },
71 ConnectTcpStreamState::PreEstablish(interface) => {
72 let interface = interface.clone();
73 *state = ConnectTcpStreamState::Closed;
74 (None, Some(interface))
75 },
76 ConnectTcpStreamState::Establish => {
77 (None, None)
79 },
80 _ => {
81 *state = ConnectTcpStreamState::Closed;
82 (None, None)
83 }
84 }
85 },
86 Err(_) => {
87 let state = &mut *ca.0.state.write().unwrap();
88 let ret = match state {
89 ConnectTcpStreamState::Connecting1(ref mut waiter) => {
90 let waiter = Some(waiter.transfer());
91 (waiter, None)
92 },
93 ConnectTcpStreamState::PreEstablish(interface) => {
94 (None, Some(interface.clone()))
95 },
96 _ => {
97 (None, None)
98 }
99 };
100 *state = ConnectTcpStreamState::Closed;
101 ret
102 }
103 };
104
105 if let Some(waiter) = opt_waiter {
106 waiter.wake()
107 }
108
109 if let Some(interface) = tunnel_interface {
110 let _ = ca.0.tunnel.connect_with_interface(interface);
111 }
112 });
113 }
114
115 {
116 let ca = a.clone();
119 task::spawn(async move {
120 if let Some(tunnel) = ca.0.stream.tunnel() {
121 let keystore = tunnel.stack().keystore().clone();
122 let key = keystore.create_key(tunnel.remote_const(), false);
123 let connect_result = tcp::Interface::connect(*ca.remote(),
125 tunnel.remote().clone(),
126 tunnel.remote_const().clone(),
127 key.key,
128 Stack::from(&ca.0.stack).config().tunnel.tcp.connect_timeout
129 ).await;
130
131
132 let opt_waiter = match connect_result {
133 Ok(interface) => {
134 let state = &mut *ca.0.state.write().unwrap();
135 match state {
136 ConnectTcpStreamState::Connecting1(waiter) => {
137 debug!("{} Connecting1=>PreEstablish", ca);
138 let waiter = Some(waiter.transfer());
139 *state = ConnectTcpStreamState::PreEstablish(interface);
140 waiter
141 },
142 _ => {
143 None
144 }
145 }
146 },
147 Err(_) => {
148 ca.0.tunnel.mark_dead(ca.0.tunnel.state());
149 let state = &mut *ca.0.state.write().unwrap();
150 match state {
151 ConnectTcpStreamState::Connecting1(waiter) => {
152 debug!("{} Connecting1=>Closed", ca);
153 let waiter = Some(waiter.transfer());
154 *state = ConnectTcpStreamState::Closed;
155 waiter
156 },
157 _ => {
158 *state = ConnectTcpStreamState::Closed;
159 None
160 }
161 }
162 }
163 };
164
165 if let Some(waiter) = opt_waiter {
166 waiter.wake()
167 }
168 }
169 });
170 }
171
172 a
173 }
174}
175
176impl BuildTunnelAction for ConnectTcpStream {
177 fn local(&self) -> &Endpoint {
178 &self.0.tunnel.local()
179 }
180
181 fn remote(&self) -> &Endpoint {
182 &self.0.tunnel.remote()
183 }
184}
185
186#[async_trait]
187impl ConnectStreamAction for ConnectTcpStream {
188 fn clone_as_connect_stream_action(&self) -> DynConnectStreamAction {
189 Box::new(self.clone())
190 }
191
192 fn as_any(&self) -> &dyn std::any::Any {
193 self
194 }
195
196 fn state(&self) -> ConnectStreamState {
197 match &*self.0.state.read().unwrap() {
198 ConnectTcpStreamState::Connecting1(_) => ConnectStreamState::Connecting1,
199 ConnectTcpStreamState::PreEstablish(_) => ConnectStreamState::PreEstablish,
200 ConnectTcpStreamState::Connecting2 => ConnectStreamState::Connecting2,
201 ConnectTcpStreamState::Establish => ConnectStreamState::Establish,
202 ConnectTcpStreamState::Closed => ConnectStreamState::Closed,
203 }
204 }
205 async fn wait_pre_establish(&self) -> ConnectStreamState {
206 let (state, opt_waiter) = match &mut *self.0.state.write().unwrap() {
207 ConnectTcpStreamState::Connecting1(ref mut waiter) => {
208 (ConnectStreamState::Connecting1, Some(waiter.new_waiter()))
209 },
210 ConnectTcpStreamState::PreEstablish(_) => (ConnectStreamState::PreEstablish, None),
211 ConnectTcpStreamState::Connecting2 => (ConnectStreamState::Connecting2, None),
212 ConnectTcpStreamState::Establish => (ConnectStreamState::Establish, None),
213 ConnectTcpStreamState::Closed => (ConnectStreamState::Closed, None),
214 };
215 if let Some(waiter) = opt_waiter {
216 StateWaiter::wait(waiter, | | self.state()).await
217 } else {
218 state
219 }
220 }
221
222 async fn continue_connect(&self) -> BuckyResult<StreamProviderSelector> {
223 let interface = {
225 let state = &mut *self.0.state.write().unwrap();
226 match state {
227 ConnectTcpStreamState::PreEstablish(interface) => {
228 debug!("{} PreEstablish=>Connecting2", self);
229 let interface = interface.clone();
230 *state = ConnectTcpStreamState::Connecting2;
231 Ok(interface)
232 },
233 _ => {
234 let err = BuckyError::new(BuckyErrorCode::ErrorState, "continue connect on tcp stream not pre establish");
235 debug!("{} continue_connect failed for err", self);
236 Err(err)
237 }
238 }
239 }?;
240 let syn_stream = self.0.stream.syn_tcp_stream().ok_or_else(|| BuckyError::new(BuckyErrorCode::ErrorState, "continue connect on stream not connecting"))?;
241 let ack = match interface.confirm_connect(&Stack::from(&self.0.stack), vec![DynamicPackage::from(syn_stream)], Stack::from(&self.0.stack).config().tunnel.tcp.confirm_timeout).await {
242 Ok(resp_box) => {
243 let packages = resp_box.packages_no_exchange();
244 if packages.len() == 1 && packages[0].cmd_code() == PackageCmdCode::TcpAckConnection {
245 let ack: &TcpAckConnection = packages[0].as_ref();
246 let _ = self.0.tunnel.pre_active(ack.to_device_desc.body().as_ref().unwrap().update_time());
249 let state = &mut *self.0.state.write().unwrap();
250 match state {
251 ConnectTcpStreamState::Connecting2 => {
252 *state = ConnectTcpStreamState::Establish;
253 Ok(ack.clone())
254 },
255 _ => Err(BuckyError::new(BuckyErrorCode::ErrorState, "tcp stram got ack but action not in connecting2 state"))
256 }
257 } else {
258 let state = &mut *self.0.state.write().unwrap();
259 *state = ConnectTcpStreamState::Closed;
260 Err(BuckyError::new(BuckyErrorCode::InvalidInput, "tcp stream got error confirm when expecting TcpAckConnection"))
261 }
262 },
263 Err(e) => {
264 let state = &mut *self.0.state.write().unwrap();
265 *state = ConnectTcpStreamState::Closed;
266 Err(e)
267 }
268 }.map_err(|e| {
269 e
271 })?;
272
273 Ok(StreamProviderSelector::Tcp(
274 interface.socket().clone(),
275 interface.key().clone(),
276 Some(ack.clone())))
277 }
278}
279
280
281enum AcceptReverseTcpStreamState {
282 Connecting1(StateWaiter),
283 PreEstablish(tcp::AcceptInterface),
284 Connecting2,
285 Establish,
286 Closed
287}
288
289struct AcceptReverseTcpStreamImpl {
290 local: Endpoint,
291 remote: Endpoint,
292 stream: StreamContainer,
293 state: RwLock<AcceptReverseTcpStreamState>
294}
295
296impl std::fmt::Display for AcceptReverseTcpStream {
297 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
298 write!(f, "AcceptReverseTcpStream{{stream:{},local:{},remote:{}}}", self.0.stream, self.0.local, self.0.remote)
299 }
300}
301
302#[derive(Clone)]
303pub struct AcceptReverseTcpStream(Arc<AcceptReverseTcpStreamImpl>);
304
305impl AcceptReverseTcpStream {
306 pub fn new(stream: StreamContainer, local: Endpoint, remote: Endpoint) -> Self {
307 let a = Self(Arc::new(AcceptReverseTcpStreamImpl {
308 local,
309 remote,
310 stream: stream.clone(),
311 state: RwLock::new(AcceptReverseTcpStreamState::Connecting1(StateWaiter::new()))
312 }));
313
314 {
315 let ca = a.clone();
318 let stream = stream.clone();
319 task::spawn(async move {
320 let (waiter, interface) = match stream.wait_establish().await {
321 Ok(_) => {
322 let state = &mut *ca.0.state.write().unwrap();
323 match state {
324 AcceptReverseTcpStreamState::Connecting1(ref mut waiter) => {
325 let waiter = Some(waiter.transfer());
326 *state = AcceptReverseTcpStreamState::Closed;
327 (waiter, None)
328 },
329 AcceptReverseTcpStreamState::PreEstablish(interface) => {
330 (None, Some(interface.clone()))
331 },
332 AcceptReverseTcpStreamState::Establish => {
333 (None, None)
335 },
336 _ => {
337 *state = AcceptReverseTcpStreamState::Closed;
338 (None, None)
339 }
340 }
341 },
342 Err(_) => {
343 let state = &mut *ca.0.state.write().unwrap();
344 match state {
345 AcceptReverseTcpStreamState::Connecting1(ref mut waiter) => {
346 let waiter = Some(waiter.transfer());
347 *state = AcceptReverseTcpStreamState::Closed;
348 (waiter, None)
349 },
350 AcceptReverseTcpStreamState::PreEstablish(interface) => {
351 (None, Some(interface.clone()))
352 },
353 _ => {
354 *state = AcceptReverseTcpStreamState::Closed;
355 (None, None)
356 }
357 }
358 }
359 };
360
361 if let Some(waiter) = waiter {
362 waiter.wake()
363 }
364
365 if let Some(interface) = interface {
366 let ack_ack_stream = stream.ack_ack_tcp_stream(TCP_ACK_CONNECTION_RESULT_REFUSED);
367 let _ = match interface.confirm_accept(vec![DynamicPackage::from(ack_ack_stream)]).await {
368 Ok(_) => {
369 debug!("{} confirm {} with refuse tcp connection ", stream, interface);
370 },
371 Err(e) => {
372 warn!("{} confirm {} with tcp ack ack connection failed for {}", stream, interface, e);
373 if let Some(tunnel) = stream.tunnel() {
374 let tunnel = tunnel.create_tunnel::<tunnel::tcp::Tunnel>(EndpointPair::from((*interface.local(), Endpoint::default_tcp(interface.local()))), ProxyType::None);
375 if let Ok((tunnel, _)) = tunnel {
376 tunnel.mark_dead(tunnel.state());
377 }
378 }
379 }
380 };
381 }
382 });
383 }
384
385
386 a
387 }
388}
389
390
391impl BuildTunnelAction for AcceptReverseTcpStream {
392 fn local(&self) -> &Endpoint {
393 &self.0.local
394 }
395
396 fn remote(&self) -> &Endpoint {
397 &self.0.remote
398 }
399}
400
401#[async_trait]
402impl ConnectStreamAction for AcceptReverseTcpStream {
403 fn clone_as_connect_stream_action(&self) -> DynConnectStreamAction {
404 Box::new(self.clone())
405 }
406
407 fn as_any(&self) -> &dyn std::any::Any {
408 self
409 }
410
411 fn state(&self) -> ConnectStreamState {
412 match &*self.0.state.read().unwrap() {
413 AcceptReverseTcpStreamState::Connecting1(_) => ConnectStreamState::Connecting1,
414 AcceptReverseTcpStreamState::PreEstablish(_) => ConnectStreamState::PreEstablish,
415 AcceptReverseTcpStreamState::Connecting2 => ConnectStreamState::Connecting2,
416 AcceptReverseTcpStreamState::Establish => ConnectStreamState::Establish,
417 AcceptReverseTcpStreamState::Closed => ConnectStreamState::Closed,
418 }
419 }
420
421 async fn wait_pre_establish(&self) -> ConnectStreamState {
422 let (state, opt_waiter) = match &mut *self.0.state.write().unwrap() {
423 AcceptReverseTcpStreamState::Connecting1(ref mut waiter) => {
424 (ConnectStreamState::Connecting1, Some(waiter.new_waiter()))
425 },
426 AcceptReverseTcpStreamState::PreEstablish(_) => (ConnectStreamState::PreEstablish, None),
427 AcceptReverseTcpStreamState::Connecting2 => (ConnectStreamState::Connecting2, None),
428 AcceptReverseTcpStreamState::Establish => (ConnectStreamState::Establish, None),
429 AcceptReverseTcpStreamState::Closed => (ConnectStreamState::Closed, None),
430 };
431 if let Some(waiter) = opt_waiter {
432 StateWaiter::wait(waiter, | | self.state()).await
433 } else {
434 state
435 }
436 }
437
438 async fn continue_connect(&self) -> BuckyResult<StreamProviderSelector> {
439 let interface = {
441 let state = &mut *self.0.state.write().unwrap();
442 match state {
443 AcceptReverseTcpStreamState::PreEstablish(interface) => {
444 let interface = interface.clone();
445 *state = AcceptReverseTcpStreamState::Connecting2;
446 Ok(interface)
447 },
448 _ => {
449 Err(BuckyError::new(BuckyErrorCode::ErrorState, "continue connect on tcp stream not pre establish"))
450 }
451 }
452 }?;
453 let ack_ack_stream = self.0.stream.ack_ack_tcp_stream(TCP_ACK_CONNECTION_RESULT_OK);
454 let _ = match interface.confirm_accept(vec![DynamicPackage::from(ack_ack_stream)]).await {
455 Ok(_) => {
456 let state = &mut *self.0.state.write().unwrap();
457 match state {
458 AcceptReverseTcpStreamState::Connecting2 => {
459 *state = AcceptReverseTcpStreamState::Establish;
460 Ok(())
461 },
462 _ => Err(BuckyError::new(BuckyErrorCode::ErrorState, "tcp stram got ack but action not in connecting2 state"))
463 }
464 },
465 Err(e) => {
466 let state = &mut *self.0.state.write().unwrap();
467 *state = AcceptReverseTcpStreamState::Closed;
468 Err(e)
469 }
470 }?;
471
472 Ok(StreamProviderSelector::Tcp(
473 interface.socket().clone(),
474 interface.key().clone(),
475 None))
476 }
477}
478
479impl OnPackage<TcpAckConnection, tcp::AcceptInterface> for AcceptReverseTcpStream {
480 fn on_package(&self, _pkg: &TcpAckConnection, interface: tcp::AcceptInterface) -> Result<OnPackageResult, BuckyError> {
481 let waiter = {
483 let state = &mut *self.0.state.write().unwrap();
484 match state {
485 AcceptReverseTcpStreamState::Connecting1(ref mut waiter) => {
486 debug!("{} Connecting1=>PreEstablish", self);
487 let waiter = waiter.transfer();
488 *state = AcceptReverseTcpStreamState::PreEstablish(interface);
489 Ok(waiter)
490 },
491 _ => {
492 debug!("{} ingnore tcp ack connection for not in connecting1", self);
493 Err(BuckyError::new(BuckyErrorCode::ErrorState, "not in connecting1"))
494 }
495 }
496 }?;
497 waiter.wake();
498 Ok(OnPackageResult::Handled)
499 }
500}
501
502impl From<DynConnectStreamAction> for AcceptReverseTcpStream {
503 fn from(action: DynConnectStreamAction) -> Self {
504 action.as_any().downcast_ref::<Self>().unwrap().clone()
505 }
506}