1mod dep {
2 pub use super::super::{
3 package::PackageStream, stream_provider::StreamProvider, tcp::TcpStream,
4 };
5 pub use crate::{
6 types::*,
7 interface::*,
8 protocol::{*, v0::*},
9 stack::{Stack, WeakStack},
10 tunnel::{
11 self, AcceptReverseTcpStream, AcceptStreamBuilder, BuildTunnelAction,
12 BuildTunnelParams, ConnectPackageStream, ConnectStreamAction, ConnectStreamBuilder,
13 ConnectStreamState, ConnectTcpStream, ProxyType, StreamConnectorSelector, Tunnel,
14 TunnelBuilder, TunnelContainer, TunnelGuard, TunnelState,
15 },
16 types::*,
17 };
18 pub use async_std::{future, sync::Arc, task};
19 pub use cyfs_base::*;
20 pub use futures::future::{AbortHandle, Abortable, Aborted};
21 pub use std::{fmt, future::Future, net::Shutdown, ops::Deref, sync::RwLock, time::Duration};
22}
23
/// Largest answer payload, in bytes, that `StreamContainer::confirm` accepts (25 KiB).
const ANSWER_MAX_LEN: usize = 25 * 1024;
25
26mod connector {
27 use super::dep::*;
28 use super::StreamContainer;
29
30 pub struct Connector {
31 pub question: Vec<u8>,
32 pub waiter: StateWaiter,
33 pub state: State,
34 pub start_at: Timestamp,
35 }
36
37 pub enum State {
38 Unknown,
39 Tcp(TcpProvider),
40 Reverse(ReverseProvider),
41 Package(PackageProvider),
42 Builder(BuilderProvider),
43 }
44
45 impl State {
46 pub fn remote_timestamp(&self) -> Option<Timestamp> {
47 match self {
48 Self::Unknown => None,
49 Self::Tcp(tcp) => Some(tcp.1),
50 Self::Reverse(tcp) => Some(tcp.remote_timestamp),
51 Self::Package(package) => Some(package.1),
52 Self::Builder(_) => None,
53 }
54 }
55 pub fn from(
56 params: (&StreamContainer, StreamConnectorSelector),
57 ) -> (Self, Box<dyn Provider>) {
58 match params.1 {
59 StreamConnectorSelector::Package(remote_timestamp) => {
60 let provider = PackageProvider::new(params.0, remote_timestamp);
61 (State::Package(provider.clone()), Box::new(provider))
62 }
63 StreamConnectorSelector::Tcp(tunnel, remote_timestamp) => {
64 if !tunnel.is_reverse() {
65 let provider = TcpProvider(tunnel, remote_timestamp);
66 (State::Tcp(provider.clone()), Box::new(provider))
67 } else {
68 let provider = ReverseProvider::new(params.0, tunnel, remote_timestamp);
69 (State::Reverse(provider.clone()), Box::new(provider))
70 }
71 }
72 StreamConnectorSelector::Builder(builder) => {
73 let provider = BuilderProvider(builder);
74 (State::Builder(provider.clone()), Box::new(provider))
75 }
76 }
77 }
78 }
79
80 pub trait Provider {
81 fn begin_connect(&self, stream: &StreamContainer);
82 }
83
84 #[derive(Clone)]
85 pub struct TcpProvider(tunnel::tcp::Tunnel, Timestamp);
86
87 impl fmt::Display for TcpProvider {
88 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89 write!(
90 f,
91 "TcpProvider:{{tunnel:{}, remote_timestamp:{}}}",
92 self.0, self.1
93 )
94 }
95 }
96
97 impl Provider for TcpProvider {
98 fn begin_connect(&self, stream: &StreamContainer) {
99 debug!("{} connect with {}", stream, self);
100 let action = ConnectTcpStream::new(
101 stream.0.stack.clone(),
102 stream.clone(),
103 self.0.clone(),
104 );
105 let stream = stream.clone();
106 task::spawn(async move {
107 match action.wait_pre_establish().await {
108 ConnectStreamState::PreEstablish => match action.continue_connect().await {
109 Ok(selector) => {
110 let _ = stream.establish_with(selector).await;
111 }
112 Err(err) => {
113 let _ = stream.cancel_connecting_with(&err);
114 }
115 },
116 _ => {
117 let _ = stream.cancel_connecting_with(&BuckyError::new(
118 BuckyErrorCode::ErrorState,
119 "action not pre establish",
120 ));
121 }
122 }
123 });
124 }
125 }
126
127 #[derive(Clone)]
128 pub struct PackageProvider(ConnectPackageStream, Timestamp);
129
130 impl PackageProvider {
131 fn new(stream: &StreamContainer, remote_timestamp: Timestamp) -> Self {
132 Self(ConnectPackageStream::new(stream.clone()), remote_timestamp)
133 }
134 }
135
136 impl fmt::Display for PackageProvider {
137 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
138 write!(f, "PackageProvider:{{remote_timestamp:{}}}", self.1)
139 }
140 }
141
142 impl Provider for PackageProvider {
143 fn begin_connect(&self, stream: &StreamContainer) {
144 debug!("{} connect with {}", stream, self);
145 let action = self.0.clone();
146 let stream = stream.clone();
147 task::spawn(async move {
148 match action.wait_pre_establish().await {
149 ConnectStreamState::PreEstablish => match action.continue_connect().await {
150 Ok(selector) => {
151 let _ = stream.establish_with(selector).await;
152 }
153 Err(err) => {
154 let _ = stream.cancel_connecting_with(&err);
155 }
156 },
157 _ => {
158 let _ = stream.cancel_connecting_with(&BuckyError::new(
159 BuckyErrorCode::ErrorState,
160 "action not pre establish",
161 ));
162 }
163 }
164 });
165
166 self.0.begin();
167 }
168 }
169
170 impl AsRef<ConnectPackageStream> for PackageProvider {
171 fn as_ref(&self) -> &ConnectPackageStream {
172 &self.0
173 }
174 }
175
176 #[derive(Clone)]
177 pub struct ReverseProvider {
178 pub remote_timestamp: Timestamp,
179 pub local: Endpoint,
180 pub action: AcceptReverseTcpStream,
181 }
182
183 impl fmt::Display for ReverseProvider {
184 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
185 write!(
186 f,
187 "ReverseProvider:{{local:{}, remote_timestamp:{}}}",
188 self.local, self.remote_timestamp
189 )
190 }
191 }
192
193 impl ReverseProvider {
194 fn new(
195 stream: &StreamContainer,
196 tunnel: tunnel::tcp::Tunnel,
197 remote_timestamp: Timestamp,
198 ) -> Self {
199 Self {
200 remote_timestamp,
201 local: *tunnel.local(),
202 action: AcceptReverseTcpStream::new(
203 stream.clone(),
204 *tunnel.local(),
205 *tunnel.remote(),
206 ),
207 }
208 }
209 }
210
211 impl Provider for ReverseProvider {
212 fn begin_connect(&self, stream: &StreamContainer) {
213 debug!("{} connect with {}", stream, self);
214 let provider = self.clone();
215 let stream = stream.clone();
216 let mut syn_tcp = stream.syn_tcp_stream().unwrap();
217
218 let stack = stream.stack();
219 let listener = stack.net_manager().listener();
220 let mut endpoints = vec![];
221 for t in listener.tcp() {
222 let outer = t.outer();
223 if outer.is_some() {
224 let outer = outer.unwrap();
225 if outer == self.local || t.local() == self.local {
226 endpoints.push(outer);
227 }
228 } else {
229 endpoints.push(self.local);
230 }
231 }
232 syn_tcp.reverse_endpoint = Some(endpoints);
233
234 task::spawn(async move {
235 loop {
236 match future::timeout(
237 stream.stack().config().stream.stream.package.connect_resend_interval,
238 provider.action.wait_pre_establish(),
239 )
240 .await
241 {
242 Err(_) => {
243 if let Some(tunnel) = stream.tunnel() {
244 let _ = tunnel.send_packages(vec![DynamicPackage::from(syn_tcp.clone())]);
245 }
246 }
247 Ok(state) => {
248 match state {
249 ConnectStreamState::PreEstablish => {
250 match provider.action.continue_connect().await {
251 Ok(selector) => {
252 let _ = stream.establish_with(selector).await;
253 }
254 Err(err) => {
255 let _ = stream.cancel_connecting_with(&err);
256 }
257 }
258 }
259 _ => {
260 let _ =
261 stream.cancel_connecting_with(&BuckyError::new(
262 BuckyErrorCode::ErrorState,
263 "action not pre establish",
264 ));
265 }
266 };
267 break;
268 }
269 }
270 }
271 });
272 }
273 }
274
275 #[derive(Clone)]
276 pub struct BuilderProvider(ConnectStreamBuilder);
277
278 impl Provider for BuilderProvider {
279 fn begin_connect(&self, _stream: &StreamContainer) {
280 let builder = self.0.clone();
281 task::spawn(async move {
282 builder.build().await;
283 });
284 }
285 }
286
287 impl AsRef<ConnectStreamBuilder> for BuilderProvider {
288 fn as_ref(&self) -> &ConnectStreamBuilder {
289 &self.0
290 }
291 }
292}
293
294mod acceptor {
295 use super::dep::*;
296
297 pub struct Acceptor {
298 pub remote_id: IncreaseId,
299 pub waiter: StateWaiter,
300 pub builder: AcceptStreamBuilder,
301 }
302}
303
304use async_std::io::prelude::{Read, Write};
305pub use dep::Shutdown;
306use dep::*;
307use futures::io::ErrorKind;
308use log::*;
309use std::pin::Pin;
310use std::task::{Context, Poll, Waker};
311
312enum StreamConnectingState {
313 Connect(connector::Connector),
314 Accept(acceptor::Acceptor),
315}
316
317struct StreamEstablishState {
318 start_at: Timestamp,
319 remote_timestamp: Timestamp,
320 provider: Box<dyn StreamProvider>,
321}
322
323#[derive(Clone)]
324pub struct Config {
325 pub retry_sn_timeout: Duration,
326 pub connect_timeout: Duration,
327 pub nagle: Duration,
328 pub recv_timeout: Duration,
329 pub recv_buffer: usize,
330 pub send_buffer: usize,
331 pub drain: f32,
332 pub tcp: super::tcp::Config,
333 pub package: super::package::Config,
334}
335
336impl Config {
337 pub fn recv_drain(&self) -> usize {
338 (self.recv_buffer as f32 * self.drain) as usize
339 }
340
341 pub fn send_drain(&self) -> usize {
342 (self.send_buffer as f32 * self.drain) as usize
343 }
344}
345
346#[derive(Eq, PartialEq, Debug)]
347pub enum StreamState {
348 Connecting,
349 Establish(Timestamp),
350 Closing,
351 Closed,
352}
353impl fmt::Display for StreamState {
354 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
355 match self {
356 StreamState::Connecting => write!(f, "StreamState::Connecting"),
357 StreamState::Establish(remote_timestamp) => {
358 write!(f, "StreamState::Establish({})", remote_timestamp)
359 }
360 StreamState::Closing => write!(f, "StreamState::Closing"),
361 StreamState::Closed => write!(f, "StreamState::Closed"),
362 }
363 }
364}
365
366enum StreamStateImpl {
367 Initial(TunnelGuard),
368 Connecting(StreamConnectingState, TunnelGuard),
369 Establish(StreamEstablishState, TunnelGuard),
370 Closing(StreamEstablishState, TunnelGuard),
371 Closed,
372}
373
// NOTE(review): empty marker type, not referenced in the visible part of this file.
struct PackageStreamProviderSelector {}
375pub enum StreamProviderSelector {
376 Package(IncreaseId , Option<SessionData>),
377 Tcp(async_std::net::TcpStream, MixAesKey, Option<TcpAckConnection>),
378}
379
380struct StreamContainerImpl {
381 stack: WeakStack,
382 remote_device: DeviceId,
383 remote_port: u16,
384 local_id: IncreaseId,
385 sequence: TempSeq,
386 state: RwLock<StreamStateImpl>,
387 answer_data: RwLock<Option<Vec<u8>>>,
388}
389
390#[derive(Clone)]
391pub struct StreamContainer(Arc<StreamContainerImpl>);
392
393
394impl fmt::Display for StreamContainer {
395 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
396 write!(
397 f,
398 "StreamContainer {{sequence:{:?}, local:{}, remote:{}, port:{}, id:{} }}",
399 self.sequence(),
400 self.stack().local_device_id(),
401 self.remote().0,
402 self.remote().1,
403 self.local_id()
404 )
405 }
406}
407
408impl StreamContainer {
409 pub(super) fn new(
410 weak_stack: WeakStack,
411 tunnel: TunnelGuard,
412 remote_port: u16,
413 local_id: IncreaseId,
414 sequence: TempSeq,
415 ) -> StreamContainer {
416 let stream_impl = StreamContainerImpl {
417 stack: weak_stack,
418 remote_device: tunnel.remote().clone(),
419 remote_port,
420 local_id,
421 sequence: sequence,
422 state: RwLock::new(StreamStateImpl::Initial(tunnel)),
423 answer_data: RwLock::new(None)
424 };
425
426 StreamContainer(Arc::new(stream_impl))
427 }
428
429 pub(super) fn accept(&self, remote_id: IncreaseId) {
431 let state = &mut *self.0.state.write().unwrap();
432 if let StreamStateImpl::Initial(tunnel) = &*state {
433 info!("{} initial=>accepting remote_id {}", self, remote_id);
434 *state =
435 StreamStateImpl::Connecting(StreamConnectingState::Accept(acceptor::Acceptor {
436 remote_id,
437 waiter: StateWaiter::new(),
438 builder: AcceptStreamBuilder::new(self.0.stack.clone(), self.clone(), tunnel.as_ref().clone()),
439 }), tunnel.clone());
440 let stream = self.clone();
441 task::spawn(async move {
442 match future::timeout(
443 stream
444 .stack()
445 .config()
446 .stream
447 .stream
448 .connect_timeout,
449 stream.wait_establish(),
450 )
451 .await
452 {
453 Ok(r) => r,
454 Err(err) => {
455 let err = err.into();
456 match stream.cancel_connecting_with(&err) {
457 Ok(_) => Err(err),
458 Err(_) => Ok(()),
459 }
460 }
461 }
462 });
463 } else {
464 unreachable!("not initial")
465 }
466 }
467
468 pub(super) async fn connect(
470 &self,
471 question: Vec<u8>,
472 build_params: BuildTunnelParams,
473 ) -> BuckyResult<()> {
474 let connector = connector::Connector {
476 question: Vec::from(question),
477 waiter: StateWaiter::new(),
478 state: connector::State::Unknown,
479 start_at: bucky_time_now(),
480 };
481
482 let tunnel = {
484 let mut state = self.0.state.write().unwrap();
485 if let StreamStateImpl::Initial(tunnel) = &*state {
486 info!("{} initial=>connecting", self);
487 let tunnel = tunnel.clone();
488 *state = StreamStateImpl::Connecting(StreamConnectingState::Connect(connector), tunnel.clone());
489 tunnel
490 } else {
491 unreachable!("not initial")
492 }
493 };
494
495 let _ = match tunnel.select_stream_connector(build_params, self.clone()).await
497 {
498 Ok(selector) => {
499 let (connector_state, connector_provider) =
500 connector::State::from((self, selector));
501 {
503 let state = &mut *self.0.state.write().unwrap();
504 if let StreamStateImpl::Connecting(ref mut connecting, _) = state {
505 if let StreamConnectingState::Connect(ref mut connector) = connecting {
506 if let connector::State::Unknown = connector.state {
507 connector.state = connector_state;
508 connector.start_at = bucky_time_now();
509 } else {
510 unreachable!()
511 }
512 } else {
513 unreachable!()
514 }
515 } else {
516 unreachable!()
517 }
518 }
519 connector_provider.begin_connect(self);
520 Ok(())
521 }
522 Err(err) => Err(err),
523 }?;
524
525 match future::timeout(
526 self.stack().config().stream.stream.connect_timeout,
527 self.wait_establish(),
528 )
529 .await
530 {
531 Ok(r) => r,
532 Err(err) => {
533 let err = err.into();
534 match self.cancel_connecting_with(&err) {
535 Ok(_) => Err(err),
536 Err(_) => Ok(()),
537 }
538 }
539 }
540 }
541
542 pub(crate) async fn establish_with(
544 &self,
545 selector: StreamProviderSelector
546 ) -> BuckyResult<()> {
547 let tunnel = self.tunnel()
548 .ok_or_else(|| BuckyError::new(
549 BuckyErrorCode::ErrorState,
550 "tunnel not active",
551 )).map_err(|e| {
552 error!("{} try establish failed for {}", self, e);
553 e
554 })?;
555 let remote_timestamp = match tunnel.wait_active().await {
556 TunnelState::Active(remote_timetamp) => Ok(remote_timetamp),
557 _ => Err(BuckyError::new(
558 BuckyErrorCode::ErrorState,
559 "tunnel not active",
560 )),
561 }
562 .map_err(|e| {
563 error!("{} try establish failed for {}", self, e);
564 e
565 })?;
566
567 let (provider, provider_stub, answer_data) = match selector {
568 StreamProviderSelector::Package(remote_id, ack) => {
569 let answer_data = match ack {
570 Some(session_data) => {
571 if session_data.payload.as_ref().len() > 0 {
572 let mut answer = vec![0; session_data.payload.as_ref().len()];
573 answer.copy_from_slice(session_data.payload.as_ref());
574 answer
575 } else {
576 vec![]
577 }
578 },
579 _ => vec![],
580 };
581
582 let stream = PackageStream::new(self, tunnel.as_ref(), self.local_id().clone(), remote_id)?;
583 (
584 Box::new(stream.clone()) as Box<dyn StreamProvider>,
585 Box::new(stream) as Box<dyn StreamProvider>,
586 answer_data,
587 )
588 }
589 StreamProviderSelector::Tcp(socket, key, ack) => {
590 let answer_data = match ack {
591 Some(tcp_ack_connection) => {
592 if tcp_ack_connection.payload.as_ref().len() > 0 {
593 let mut answer = vec![0; tcp_ack_connection.payload.as_ref().len()];
594 answer.copy_from_slice(tcp_ack_connection.payload.as_ref());
595 answer
596 } else {
597 vec![]
598 }
599 },
600 _ => vec![],
601 };
602
603 let stream = TcpStream::new(self.clone(), socket, key.enc_key)?;
604 (
605 Box::new(stream.clone()) as Box<dyn StreamProvider>,
606 Box::new(stream) as Box<dyn StreamProvider>,
607 answer_data,
608 )
609 }
610 };
611
612 let state = &mut *self.0.state.write().unwrap();
613 let waiter = match state {
614 StreamStateImpl::Connecting(ref mut connecting_state, tunnel) => match connecting_state {
615 StreamConnectingState::Accept(ref mut acceptor) => {
616 let waiter = acceptor.waiter.transfer();
617 info!("{} accepting=>establish with provider {}", self, provider);
618 *state = StreamStateImpl::Establish(StreamEstablishState {
619 start_at: bucky_time_now(),
620 remote_timestamp,
621 provider: provider_stub,
622 }, tunnel.clone());
623 Ok(waiter)
624 }
625 StreamConnectingState::Connect(ref mut connector) => {
626 let waiter = connector.waiter.transfer();
627 info!("{} connecting=>establish with {}", self, provider);
628 *state = StreamStateImpl::Establish(StreamEstablishState {
629 start_at: bucky_time_now(),
630 remote_timestamp,
631 provider: provider_stub,
632 }, tunnel.clone());
633
634 if answer_data.len() > 0 {
635 let data = &mut *self.0.answer_data.write().unwrap();
636 *data = Some(answer_data);
637 }
638
639 Ok(waiter)
640 }
641 },
642 _ => Err(BuckyError::new(
643 BuckyErrorCode::ErrorState,
644 "stream not connecting",
645 )),
646 }
647 .map_err(|e| {
648 error!("{} try establish failed for {}", self, e);
649 e
650 })?;
651 provider.start(self);
653 waiter.wake();
655 Ok(())
656 }
657
658 pub(crate) fn cancel_connecting_with(&self, err: &BuckyError) -> BuckyResult<()> {
659 warn!("{} cancel connecting with error: {}", self, err);
660 let state = &mut *self.0.state.write().unwrap();
661 let (waiter, state_dump) = match state {
662 StreamStateImpl::Connecting(ref mut connecting_state, tunnel) => match connecting_state {
663 StreamConnectingState::Accept(ref mut acceptor) => {
664 let waiter = acceptor.waiter.transfer();
665 info!("{} accepting=>closed", self);
666 *state = StreamStateImpl::Closed;
667 Ok((waiter, None))
668 }
669 StreamConnectingState::Connect(ref mut connector) => {
670 let waiter = connector.waiter.transfer();
671 let tunnel = tunnel.clone();
672 info!("{} connecting=>closed", self);
673 let state_dump = connector
674 .state
675 .remote_timestamp()
676 .map(|r| (tunnel, r, connector.start_at));
677 *state = StreamStateImpl::Closed;
678 Ok((waiter, state_dump))
679 }
680 },
681 _ => Err(BuckyError::new(
682 BuckyErrorCode::ErrorState,
683 "stream not connecting",
684 )),
685 }?;
686 if let Some((tunnel, remote_timestamp, start_at)) = state_dump {
687 error!("{} mark tunnel dead", self);
688 let _ = tunnel.mark_dead(remote_timestamp, start_at);
689 }
690 waiter.wake();
692 Ok(())
693 }
694
695 pub(crate) async fn wait_establish(&self) -> BuckyResult<()> {
696 let waiter = {
697 let state = &mut *self.0.state.write().unwrap();
698 match state {
699 StreamStateImpl::Connecting(ref mut connecting, _) => {
700 let waiter = match connecting {
701 StreamConnectingState::Accept(ref mut acceptor) => {
702 acceptor.waiter.new_waiter()
703 }
704 StreamConnectingState::Connect(ref mut connector) => {
705 connector.waiter.new_waiter()
706 }
707 };
708 Ok(Some(waiter))
709 }
710 StreamStateImpl::Establish(..) => Ok(None),
711 _ => {
712 warn!("{} wait establish failed, neither StreamStateImpl::Connecting nor StreamStateImpl::Establish", self);
713 Err(BuckyError::new(
714 BuckyErrorCode::ErrorState,
715 "stream not established",
716 ))
717 }
718 }
719 }?;
720
721 if let Some(waiter) = waiter {
722 match StateWaiter::wait(waiter, || self.state()).await {
723 StreamState::Establish(_) => Ok(()),
724 _ => {
725 error!(
726 "{} wait establish failed, for stream state not establish,state={}",
727 self,
728 self.state()
729 );
730 Err(BuckyError::new(
731 BuckyErrorCode::ErrorState,
732 "stream not established",
733 ))
734 }
735 }
736 } else {
737 Ok(())
738 }
739 }
740
741 pub(crate) fn syn_session_data(&self) -> Option<SessionData> {
742 {
743 match &*self.0.state.read().unwrap() {
744 StreamStateImpl::Connecting(connecting, _) => match connecting {
745 StreamConnectingState::Connect(connector) => Some(connector.question.clone()),
746 _ => {
747 unreachable!()
748 }
749 },
750 _ => None,
751 }
752 }
753 .map(|question| {
754 let mut session = SessionData::new();
755 session.stream_pos = 0;
756 session.syn_info = Some(SessionSynInfo {
757 sequence: self.sequence(),
758 from_session_id: self.local_id().clone(),
759 to_vport: self.remote().1,
760 });
761 session.session_id = self.local_id().clone();
762 session.send_time = bucky_time_now();
763 session.flags_add(SESSIONDATA_FLAG_SYN);
764 session.payload = TailedOwnedData::from(question);
765 session
766 })
767 }
768
769 pub(crate) fn syn_ack_session_data(&self, answer: &[u8]) -> Option<SessionData> {
770 {
771 match &*self.0.state.read().unwrap() {
772 StreamStateImpl::Connecting(connecting, _) => match connecting {
773 StreamConnectingState::Accept(acceptor) => Some(acceptor.remote_id.clone()),
774 _ => {
775 unreachable!()
776 }
777 },
778 _ => None,
779 }
780 }
781 .map(|remote_id| {
782 let mut session = SessionData::new();
783 session.stream_pos = 0;
784 session.syn_info = Some(SessionSynInfo {
785 sequence: self.sequence(),
786 from_session_id: self.local_id().clone(),
787 to_vport: 0,
788 });
789 session.ack_stream_pos = 0;
790 session.send_time = bucky_time_now();
791 session.flags_add(SESSIONDATA_FLAG_SYN | SESSIONDATA_FLAG_ACK);
792 session.to_session_id = Some(remote_id.clone());
793 session.session_id = remote_id;
794 let mut payload = vec![0u8; answer.len()];
795 payload.copy_from_slice(answer);
796 session.payload = TailedOwnedData::from(payload);
797 session
798 })
799 }
800
801 pub(crate) fn syn_tcp_stream(&self) -> Option<TcpSynConnection> {
802 {
803 match &*self.0.state.read().unwrap() {
804 StreamStateImpl::Connecting(connecting, _) => match connecting {
805 StreamConnectingState::Connect(connector) => Some(connector.question.clone()),
806 _ => {
807 unreachable!()
808 }
809 },
810 _ => None,
811 }
812 }
813 .map(|question| {
814 let local_device = self.stack().sn_client().ping().default_local();
815 TcpSynConnection {
816 sequence: self.sequence(),
817 result: 0u8,
818 to_vport: self.remote().1,
819 from_session_id: self.local_id(),
820 from_device_desc: local_device,
821 to_device_id: self.remote().0.clone(),
822 reverse_endpoint: None,
823 payload: TailedOwnedData::from(question),
824 }
825 })
826 }
827
828 pub(crate) fn ack_tcp_stream(&self, answer: &[u8]) -> Option<TcpAckConnection> {
829 {
830 match &*self.0.state.read().unwrap() {
831 StreamStateImpl::Connecting(connecting, _) => match connecting {
832 StreamConnectingState::Accept(acceptor) => Some(acceptor.remote_id.clone()),
833 _ => {
834 unreachable!()
835 }
836 },
837 _ => None,
838 }
839 }
840 .map(|remote_id| {
841 let mut payload = vec![0u8; answer.len()];
842 payload.copy_from_slice(answer);
843
844 TcpAckConnection {
845 sequence: self.sequence(),
846 to_session_id: remote_id,
847 result: TCP_ACK_CONNECTION_RESULT_OK,
848 to_device_desc: self.stack().sn_client().ping().default_local(),
849 payload: TailedOwnedData::from(payload),
850 }
851 })
852 }
853
854 pub(crate) fn ack_ack_tcp_stream(&self, result: u8) -> TcpAckAckConnection {
855 TcpAckAckConnection {
856 sequence: self.sequence(),
857 result,
858 }
859 }
860
861 pub fn tunnel(&self) -> Option<TunnelGuard> {
862 match &*self.0.state.read().unwrap() {
863 StreamStateImpl::Initial(tunnel) => Some(tunnel.clone()),
864 StreamStateImpl::Connecting(_, tunnel) => Some(tunnel.clone()),
865 StreamStateImpl::Establish(_, tunnel) => Some(tunnel.clone()),
866 StreamStateImpl::Closing(_, tunnel) => Some(tunnel.clone()),
867 StreamStateImpl::Closed => None
868 }
869 }
870
871 pub fn state(&self) -> StreamState {
872 match &*self.0.state.read().unwrap() {
873 StreamStateImpl::Initial(_) => unreachable!(),
874 StreamStateImpl::Connecting(..) => StreamState::Connecting,
875 StreamStateImpl::Establish(establish, ..) => {
876 StreamState::Establish(establish.remote_timestamp)
877 }
878 StreamStateImpl::Closing(..) => StreamState::Closing,
879 StreamStateImpl::Closed => StreamState::Closed,
880 }
881 }
882
883 pub(crate) fn is_connecting(&self) -> bool {
884 let state = self.0.state.read().unwrap();
885 let s1 = state.deref();
886 match s1 {
887 StreamStateImpl::Connecting(..) => true,
888 _ => false,
889 }
890 }
891
892 pub(crate) fn acceptor(&self) -> Option<AcceptStreamBuilder> {
893 if let StreamStateImpl::Connecting(connecting, _) = &*self.0.state.read().unwrap() {
894 if let StreamConnectingState::Accept(acceptor) = connecting {
895 return Some(acceptor.builder.clone());
896 }
897 }
898 None
899 }
900
901 pub(crate) fn stack(&self) -> Stack {
902 Stack::from(&self.0.stack)
903 }
904
905 pub(crate) fn break_with_error(&self, err: BuckyError, reserving: bool, marking: bool) {
906 error!("{} break with err {}", self, err);
907 let state_dump = {
908 let state = &mut *self.0.state.write().unwrap();
909 match state {
910 StreamStateImpl::Establish(establish, tunnel) => {
911 let tunnel = tunnel.clone();
912 let state_dump = if marking {
913 Some((tunnel, establish.remote_timestamp, establish.start_at))
914 } else {
915 None
916 };
917 *state = StreamStateImpl::Closed;
918 state_dump
919 }
920 _ => None,
921 }
922 };
923 if let Some((tunnel, remote_timestamp, start_at)) = state_dump {
924 debug!("{} mark tunnel dead for break", self);
925 let _ = tunnel.mark_dead(remote_timestamp, start_at);
926 }
927 self.stack().stream_manager().remove_stream(self, reserving);
928 }
929
930 pub(super) fn on_shutdown(&self, reserving: bool) {
931 *self.0.state.write().unwrap() = StreamStateImpl::Closed;
932 self.stack().stream_manager().remove_stream(self, reserving);
933 }
934
935
936 pub async fn confirm(&self, answer: &[u8]) -> BuckyResult<()> {
937 if answer.len() > ANSWER_MAX_LEN {
938 return Err(BuckyError::new(
939 BuckyErrorCode::Failed,
940 format!("answer's length large than {}", ANSWER_MAX_LEN),
941 ));
942 }
943
944 let builder = {
945 let state = &*self.0.state.read().unwrap();
946 match state {
947 StreamStateImpl::Connecting(connecting, _) => match connecting {
948 StreamConnectingState::Accept(acceptor) => Ok(acceptor.builder.clone()),
949 _ => Err(BuckyError::new(
950 BuckyErrorCode::ErrorState,
951 "confirm on error state",
952 )),
953 },
954 _ => Err(BuckyError::new(
955 BuckyErrorCode::ErrorState,
956 "confirm on error state",
957 )),
958 }?
959 };
960 match builder.confirm(answer) {
961 Err(e) => {
962 error!("{} confirm failed for {}", self, e);
963 Err(e)
964 }
965 Ok(v) => {
966 info!("{} confirmed", self);
967 Ok(v)
968 }
969 }
970 }
971
972 pub fn shutdown(&self, which: Shutdown) -> std::io::Result<()> {
973 info!("{} shutdown", self);
974 let provider = {
975 let state = &*self.0.state.read().unwrap();
976 match state {
977 StreamStateImpl::Establish(s, _) => Ok(Some(s.provider.clone_as_provider())),
978 StreamStateImpl::Closed => Ok(None),
979 _ => {
980 Err(std::io::Error::new(
982 std::io::ErrorKind::Other,
983 "not support when state is not establish",
984 ))
985 }
986 }
987 }
988 .map_err(|e| {
989 error!("{} shutdown failed for {}", self, e);
990 e
991 })?;
992
993 if let Some(provider) = provider {
994 provider.shutdown(which, &self)
995 } else {
996 Ok(())
997 }
998 }
999
1000 pub fn readable(&self) -> StreamReadableFuture {
1001 StreamReadableFuture {
1002 stream: self.clone(),
1003 }
1004 }
1005
1006 fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<std::io::Result<usize>> {
1007 let provider = {
1008 let state = &*self.0.state.read().unwrap();
1009 match state {
1010 StreamStateImpl::Establish(s, _) => Some(s.provider.clone_as_provider()),
1011 _ => None,
1012 }
1013 };
1014 if let Some(provider) = provider {
1015 provider.poll_readable(cx)
1016 } else {
1017 Poll::Ready(Err(std::io::Error::new(
1018 ErrorKind::NotConnected,
1019 "not establish",
1020 )))
1021 }
1022 }
1023
1024 fn has_first_answer(&self) -> bool {
1025 self.0.answer_data.read().unwrap().is_some()
1026 }
1027
1028 fn read_first_answer(&self, buf: &mut [u8]) -> usize {
1029 if self.has_first_answer() {
1030 let answer_data = &mut *self.0.answer_data.write().unwrap();
1031 match answer_data {
1032 Some(answer_buf) => {
1033 let mut read_len = answer_buf.len();
1034 if read_len > buf.len() {
1035 read_len = buf.len();
1036 }
1037
1038 buf[..read_len].copy_from_slice(&answer_buf[..read_len]);
1039
1040 if read_len == answer_buf.len() {
1041 *answer_data = None;
1042 } else {
1043 let data = &answer_buf[read_len..];
1044
1045 *answer_data = Some(data.to_vec())
1046 }
1047
1048 read_len
1049 },
1050 None => 0
1051 }
1052 } else {
1053 0
1054 }
1055 }
1056
1057 fn poll_read(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
1058 debug!("{} poll read {} bytes", self, buf.len());
1059 let read_len = self.read_first_answer(buf);
1060 if read_len > 0 {
1061 return Poll::Ready(Ok(read_len));
1062 }
1063
1064 let provider = {
1065 let state = &*self.0.state.read().unwrap();
1066 match state {
1067 StreamStateImpl::Initial(_) | StreamStateImpl::Connecting(..) => {
1068 trace!(
1069 "{} poll-write in initial/connecting.",
1070 self,
1071 );
1072 None
1073 },
1074 StreamStateImpl::Establish(s, _) => Some(s.provider.clone_as_provider()),
1075 StreamStateImpl::Closing(s, _) => Some(s.provider.clone_as_provider()),
1076 _ => {
1077 return Poll::Ready(Ok(0));
1078 }
1079 }
1080 };
1081
1082 if let Some (provider) = provider {
1083 let read_len = self.read_first_answer(buf);
1084 if read_len > 0 {
1085 return Poll::Ready(Ok(read_len));
1086 }
1087 provider.poll_read(cx, buf)
1088 } else {
1089 let waker = cx.waker().clone();
1090 let stream = self.clone();
1091 task::spawn(async move {
1092 let _ = stream.wait_establish().await;
1093 waker.wake();
1094 });
1095 Poll::Pending
1096 }
1097 }
1098
1099 fn poll_write(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<std::io::Result<usize>> {
1100 debug!("{} poll write {} bytes", self, buf.len());
1101 self.poll_write_wait_establish(cx.waker().clone(), |provider| {
1102 provider.poll_write(cx, buf)
1103 })
1104 }
1105
1106 fn poll_flush(&self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1107 debug!("{} poll flush", self);
1108 self.poll_write_wait_establish(cx.waker().clone(), |provider| {
1109 provider.poll_flush(cx)
1110 })
1111 }
1112
1113 fn poll_close(&self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1114 debug!("{} poll close", self);
1115 let provider = {
1116 let state = &*self.0.state.read().unwrap();
1117 match state {
1118 StreamStateImpl::Establish(s, _) => Some(s.provider.clone_as_provider()),
1119 StreamStateImpl::Initial(_) => {
1120 debug!("poll-close, {} in initial.", self);
1121 None
1122 }
1123 StreamStateImpl::Connecting(..) => {
1124 debug!("poll-close, {} in connecting.", self);
1125 None
1126 }
1127 StreamStateImpl::Closing(s, _) => {
1128 debug!("poll-close, {} in closing.", self);
1129 Some(s.provider.clone_as_provider())
1130 }
1131 StreamStateImpl::Closed => {
1132 debug!("poll-close, {} in closed ready.", self);
1133 return Poll::Ready(Ok(()));
1134 }
1135 }
1136 };
1137
1138 match provider {
1139 Some(provider) => provider.poll_close(cx),
1140 None => {
1141 let _ = self.cancel_connecting_with(&BuckyError::new(
1142 BuckyErrorCode::ConnectionAborted,
1143 "user close",
1144 ));
1145 Poll::Ready(Err(std::io::Error::new(
1146 ErrorKind::ConnectionAborted,
1147 "close by user",
1148 )))
1149 }
1150 }
1151 }
1152
1153 pub fn remote(&self) -> (&DeviceId, u16) {
1154 (&self.0.remote_device, self.0.remote_port)
1155 }
1156
1157 pub fn sequence(&self) -> TempSeq {
1158 self.0.sequence
1159 }
1160
1161 pub fn local_id(&self) -> IncreaseId {
1162 self.0.local_id
1163 }
1164
1165 pub fn local_ep(&self) -> Option<Endpoint> {
1166 let state = &*self.0.state.read().unwrap();
1167 match state {
1168 StreamStateImpl::Establish(s, _) => Some(*s.provider.local_ep()),
1169 _ => None,
1170 }
1171 }
1172
1173 pub fn remote_ep(&self) -> Option<Endpoint> {
1174 let state = &*self.0.state.read().unwrap();
1175 match state {
1176 StreamStateImpl::Establish(s, _) => Some(*s.provider.remote_ep()),
1177 _ => None,
1178 }
1179 }
1180
1181 fn poll_write_wait_establish<R>(
1182 &self,
1183 waker: Waker,
1184 mut proc: impl FnMut(&dyn StreamProvider) -> Poll<std::io::Result<R>>,
1185 ) -> Poll<std::io::Result<R>> {
1186 let provider = {
1187 let state = &*self.0.state.read().unwrap();
1188 match state {
1189 StreamStateImpl::Establish(s, _) => Some(s.provider.clone_as_provider()),
1190 StreamStateImpl::Initial(..) | StreamStateImpl::Connecting(..) => {
1191 trace!(
1192 "{} poll-write in initial/connecting.",
1193 self,
1194 );
1195 None
1196 },
1197 _ => {
1198 let msg = format!("{} poll-write in close.", self);
1199 error!("{}", msg);
1200 return Poll::Ready(Err(std::io::Error::new(ErrorKind::NotConnected, msg)));
1201 }
1202 }
1203 };
1204
1205 match provider {
1206 Some(provider) => proc(&*provider),
1207 None => {
1208 let stream = self.clone();
1209 task::spawn(async move {
1210 let _ = stream.wait_establish().await;
1211 waker.wake();
1212 });
1213 Poll::Pending
1214 }
1215 }
1216 }
1217}
1218
1219pub struct StreamReadableFuture {
1220 stream: StreamContainer,
1221}
1222
1223impl Future for StreamReadableFuture {
1224 type Output = std::io::Result<usize>;
1225
1226 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1227 self.stream.poll_readable(cx)
1228 }
1229}
1230
1231
1232impl OnPackage<TcpSynConnection, tcp::AcceptInterface> for StreamContainer {
1233 fn on_package(
1234 &self,
1235 pkg: &TcpSynConnection,
1236 interface: tcp::AcceptInterface,
1237 ) -> BuckyResult<OnPackageResult> {
1238 debug!("{} on package {} from {}", self, pkg, interface);
1239 let state = &*self.0.state.read().unwrap();
1241 let builder = match state {
1242 StreamStateImpl::Connecting(connecting, _) => match connecting {
1243 StreamConnectingState::Accept(acceptor) => Ok(acceptor.builder.clone()),
1244 _ => Err(BuckyError::new(
1245 BuckyErrorCode::ErrorState,
1246 "syn session data on error state",
1247 )),
1248 },
1249 _ => Err(BuckyError::new(
1250 BuckyErrorCode::ErrorState,
1251 "syn session data on error state",
1252 )),
1253 }?;
1254 builder.on_package(pkg, interface)
1255 }
1256}
1257
1258impl OnPackage<SessionData> for StreamContainer {
1259 fn on_package(&self, pkg: &SessionData, _: Option<()>) -> BuckyResult<OnPackageResult> {
1260 if pkg.is_syn() {
1261 debug!("{} on package {}", self, pkg);
1262 let state = &*self.0.state.read().unwrap();
1264 let builder = match state {
1265 StreamStateImpl::Connecting(connecting, _) => match connecting {
1266 StreamConnectingState::Accept(acceptor) => Ok(acceptor.builder.clone()),
1267 _ => Err(BuckyError::new(
1268 BuckyErrorCode::ErrorState,
1269 "syn session data on error state",
1270 )),
1271 },
1272 _ => Err(BuckyError::new(
1273 BuckyErrorCode::ErrorState,
1274 "syn session data on error state",
1275 )),
1276 }?;
1277 builder.on_package(pkg, None)
1278 } else if pkg.is_syn_ack() {
1279 debug!("{} on package {}", self, pkg);
1280 let handler: Box<dyn OnPackage<SessionData>> = match &*self.0.state.read().unwrap() {
1282 StreamStateImpl::Connecting(connecting, _) => match connecting {
1283 StreamConnectingState::Connect(connector) => match &connector.state {
1284 connector::State::Package(package_provider) => {
1285 Ok(Box::new(package_provider.as_ref().clone())
1286 as Box<dyn OnPackage<SessionData>>)
1287 }
1288 connector::State::Builder(builder_provider) => {
1289 Ok(Box::new(builder_provider.as_ref().clone())
1290 as Box<dyn OnPackage<SessionData>>)
1291 }
1292 _ => unreachable!(),
1293 },
1294 _ => Err(BuckyError::new(
1295 BuckyErrorCode::ErrorState,
1296 "syn ack session data on error state",
1297 )),
1298 },
1299 StreamStateImpl::Establish(state, _) => {
1300 match state.provider.clone_as_package_handler() {
1301 Some(h) => Ok(h),
1302 None => Err(BuckyError::new(
1303 BuckyErrorCode::InternalError,
1304 "clone handler failed",
1305 )),
1306 }
1307 }
1308 _ => Err(BuckyError::new(
1309 BuckyErrorCode::ErrorState,
1310 "syn session data on error state",
1311 )),
1312 }?;
1313 handler.on_package(pkg, None)
1314 } else {
1315 trace!("{} on package {}", self, pkg);
1316 let opt_handler: Option<Box<dyn OnPackage<SessionData>>> =
1318 match &*self.0.state.read().unwrap() {
1319 StreamStateImpl::Establish(state, _) => state.provider.clone_as_package_handler(),
1320 StreamStateImpl::Connecting(connecting, _) => match connecting {
1321 StreamConnectingState::Connect(_) => None,
1322 StreamConnectingState::Accept(acceptor) => {
1323 Some(Box::new(acceptor.builder.clone()))
1324 }
1325 },
1326 StreamStateImpl::Closing(state, _) => {
1327 state.provider.clone_as_package_handler()
1328 }
1330 _ => None,
1331 };
1332 match opt_handler {
1333 Some(provider) => provider.on_package(pkg, None),
1334 None => Ok(OnPackageResult::Handled),
1335 }
1336 }
1337 }
1338}
1339
1340impl OnPackage<TcpSynConnection> for StreamContainer {
1341 fn on_package(
1342 &self,
1343 pkg: &TcpSynConnection,
1344 _: Option<()>,
1345 ) -> BuckyResult<OnPackageResult> {
1346 debug!("{} on package {}", self, pkg);
1347 assert_eq!(pkg.reverse_endpoint.is_some(), true);
1348 let state = &*self.0.state.read().unwrap();
1350 let builder = match state {
1351 StreamStateImpl::Connecting(connecting, _) => match connecting {
1352 StreamConnectingState::Accept(acceptor) => Ok(acceptor.builder.clone()),
1353 _ => Err(BuckyError::new(
1354 BuckyErrorCode::ErrorState,
1355 "tcp syn connection on error state",
1356 )),
1357 },
1358 _ => Err(BuckyError::new(
1359 BuckyErrorCode::ErrorState,
1360 "tcp syn connection on error state",
1361 )),
1362 }?;
1363 builder.on_package(pkg, None)
1364 }
1365}
1366
1367impl OnPackage<TcpAckConnection, tcp::AcceptInterface> for StreamContainer {
1368 fn on_package(
1369 &self,
1370 pkg: &TcpAckConnection,
1371 interface: tcp::AcceptInterface,
1372 ) -> BuckyResult<OnPackageResult> {
1373 debug!("{} on package {} from {}", self, pkg, interface);
1374 let opt_handler = {
1375 let state = &*self.0.state.read().unwrap();
1376 match state {
1377 StreamStateImpl::Connecting(connecting, _) => {
1378 match connecting {
1379 StreamConnectingState::Connect(connector) => {
1380 match &connector.state {
1381 connector::State::Builder(builder) => {
1382 Some(Box::new(builder.as_ref().clone())
1383 as Box<
1384 dyn OnPackage<TcpAckConnection, tcp::AcceptInterface>,
1385 >)
1386 }
1387 connector::State::Reverse(reverse) => {
1388 if reverse.action.local().eq(interface.local())
1389 {
1391 Some(Box::new(reverse.action.clone())
1392 as Box<
1393 dyn OnPackage<
1394 TcpAckConnection,
1395 tcp::AcceptInterface,
1396 >,
1397 >)
1398 } else {
1399 debug!(
1400 "{} ignore incoming stream {} for local is {}",
1401 self,
1402 interface,
1403 reverse.action.local()
1404 );
1405 None
1406 }
1407 }
1408 _ => None,
1409 }
1410 }
1411 _ => None,
1412 }
1413 }
1414 _ => None,
1415 }
1416 };
1417 opt_handler
1418 .ok_or_else(|| {
1419 BuckyError::new(
1420 BuckyErrorCode::ErrorState,
1421 "tcp ack connection on error state",
1422 )
1423 })
1424 .and_then(|handler| handler.on_package(pkg, interface.clone()))
1425 .map_err(|err| {
1426 let stream = self.clone();
1427 task::spawn(async move {
1428 let ack_ack_stream = stream.ack_ack_tcp_stream(TCP_ACK_CONNECTION_RESULT_REFUSED);
1429 let _ = match interface
1430 .confirm_accept(vec![DynamicPackage::from(ack_ack_stream)])
1431 .await
1432 {
1433 Ok(_) => {
1434 debug!(
1435 "{} confirm {} with refuse tcp connection ",
1436 stream,
1437 interface
1438 );
1439 }
1440 Err(e) => {
1441 warn!(
1442 "{} confirm {} with tcp ack ack connection failed for {}",
1443 stream,
1444 interface,
1445 e
1446 );
1447 let tunnel = stream.tunnel()
1448 .ok_or_else(|| BuckyError::new(BuckyErrorCode::ErrorState, "stream's closed"))
1449 .and_then(|tunnel| tunnel.create_tunnel::<tunnel::tcp::Tunnel>(
1450 EndpointPair::from((
1451 *interface.local(),
1452 Endpoint::default_tcp(interface.local()),
1453 )),
1454 ProxyType::None,
1455 ));
1456 if let Ok((tunnel, _)) = tunnel {
1457 tunnel.mark_dead(tunnel.state());
1458 }
1459 }
1460 };
1461 });
1462 err
1463 })
1464 }
1465}
1466
1467struct StreamGuardImpl(StreamContainer);
1468
1469impl Drop for StreamGuardImpl {
1470 fn drop(&mut self) {
1471 debug!("{} droped and will closed", self.0);
1472
1473 let _ = self.0.shutdown(Shutdown::Both);
1474 }
1475}
1476
1477#[derive(Clone)]
1478pub struct StreamGuard(Arc<StreamGuardImpl>);
1479
1480impl fmt::Display for StreamGuard {
1481 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1482 write!(f, "StreamGuard {{stream:{}}}", (*self.0).0)
1483 }
1484}
1485
1486impl Deref for StreamGuard {
1487 type Target = StreamContainer;
1488 fn deref(&self) -> &StreamContainer {
1489 &(*self.0).0
1490 }
1491}
1492
1493impl From<StreamContainer> for StreamGuard {
1494 fn from(stream: StreamContainer) -> Self {
1495 Self(Arc::new(StreamGuardImpl(stream)))
1496 }
1497}
1498
1499impl StreamGuard {
1500 pub fn display_ref_count(&self) {
1501 info!(
1502 "bdt stream ref counts: seq={:?}, impl=({}, {}), container=({},{})",
1503 self.sequence(),
1504 Arc::strong_count(&self.0),
1505 Arc::weak_count(&self.0),
1506 Arc::strong_count(&self.0 .0 .0),
1507 Arc::weak_count(&self.0 .0 .0)
1508 );
1509 }
1510}
1511
1512impl Read for StreamGuard {
1513 fn poll_read(
1514 self: Pin<&mut Self>,
1515 cx: &mut Context<'_>,
1516 buf: &mut [u8],
1517 ) -> Poll<std::io::Result<usize>> {
1518 Pin::new(&mut &*self).poll_read(cx, buf)
1519 }
1520
1521 fn poll_read_vectored(
1522 self: Pin<&mut Self>,
1523 cx: &mut Context<'_>,
1524 bufs: &mut [std::io::IoSliceMut<'_>],
1525 ) -> Poll<std::io::Result<usize>> {
1526 Pin::new(&mut &*self).poll_read_vectored(cx, bufs)
1527 }
1528}
1529
1530impl Read for &StreamGuard {
1531 fn poll_read(
1532 self: Pin<&mut Self>,
1533 cx: &mut Context<'_>,
1534 buf: &mut [u8],
1535 ) -> Poll<std::io::Result<usize>> {
1536 let guard_impl = self.0.clone();
1537 let container = &guard_impl.0;
1538 container.poll_read(cx, buf)
1539 }
1540}
1541
1542impl Write for StreamGuard {
1543 fn poll_write(
1544 self: Pin<&mut Self>,
1545 cx: &mut Context<'_>,
1546 buf: &[u8],
1547 ) -> Poll<std::io::Result<usize>> {
1548 Pin::new(&mut &*self).poll_write(cx, buf).map_err(|e| {
1549 error!("stream guard poll_write error: {}", e);
1550 e
1551 })
1552 }
1553
1554 fn poll_write_vectored(
1555 self: Pin<&mut Self>,
1556 cx: &mut Context<'_>,
1557 bufs: &[std::io::IoSlice<'_>],
1558 ) -> Poll<std::io::Result<usize>> {
1559 Pin::new(&mut &*self)
1560 .poll_write_vectored(cx, bufs)
1561 .map_err(|e| {
1562 error!("{} poll_write_vectored error: {}", (*self), e);
1563 e
1564 })
1565 }
1566
1567 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1568 Pin::new(&mut &*self).poll_flush(cx).map_err(|e| {
1569 error!("{} poll_flush error: {}", (*self), e);
1570 e
1571 })
1572 }
1573
1574 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1575 Pin::new(&mut &*self).poll_close(cx).map_err(|e| {
1576 error!("{} poll_close error: {}", (*self), e);
1577 e
1578 })
1579 }
1580}
1581
1582impl Write for &StreamGuard {
1583 fn poll_write(
1584 self: Pin<&mut Self>,
1585 cx: &mut Context<'_>,
1586 buf: &[u8],
1587 ) -> Poll<std::io::Result<usize>> {
1588 let guard_impl = self.0.clone();
1589 let container = &guard_impl.0;
1590 container.poll_write(cx, buf).map_err(|e| {
1591 error!("{} poll_write error: {}", (*self), e);
1592 e
1593 })
1594 }
1595
1596 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1597 let guard_impl = self.0.clone();
1598 let container = &guard_impl.0;
1599 container.poll_flush(cx).map_err(|e| {
1600 error!("{} poll_flush error: {}", (*self), e);
1601 e
1602 })
1603 }
1604
1605 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1606 let guard_impl = self.0.clone();
1607 let container = &guard_impl.0;
1608 container.poll_close(cx).map_err(|e| {
1609 error!("{} poll_close error: {}", (*self), e);
1610 e
1611 })
1612 }
1613}