cyfs_bdt/stream/
container.rs

1mod dep {
2    pub use super::super::{
3        package::PackageStream, stream_provider::StreamProvider, tcp::TcpStream,
4    };
5    pub use crate::{
6        types::*, 
7        interface::*,
8        protocol::{*, v0::*},
9        stack::{Stack, WeakStack},
10        tunnel::{
11            self, AcceptReverseTcpStream, AcceptStreamBuilder, BuildTunnelAction,
12            BuildTunnelParams, ConnectPackageStream, ConnectStreamAction, ConnectStreamBuilder,
13            ConnectStreamState, ConnectTcpStream, ProxyType, StreamConnectorSelector, Tunnel,
14            TunnelBuilder, TunnelContainer, TunnelGuard, TunnelState,
15        },
16        types::*,
17    };
18    pub use async_std::{future, sync::Arc, task};
19    pub use cyfs_base::*;
20    pub use futures::future::{AbortHandle, Abortable, Aborted};
21    pub use std::{fmt, future::Future, net::Shutdown, ops::Deref, sync::RwLock, time::Duration};
22}
23
/// 25 KiB (25_600 bytes). Judging by the name this caps the size of an answer payload;
/// the code that enforces it is not in this part of the file.
const ANSWER_MAX_LEN: usize = 25 * 1024;
25
26mod connector {
27    use super::dep::*;
28    use super::StreamContainer;
29
30    pub struct Connector {
31        pub question: Vec<u8>,
32        pub waiter: StateWaiter,
33        pub state: State,
34        pub start_at: Timestamp,
35    }
36
37    pub enum State {
38        Unknown,
39        Tcp(TcpProvider),
40        Reverse(ReverseProvider),
41        Package(PackageProvider),
42        Builder(BuilderProvider),
43    }
44
45    impl State {
46        pub fn remote_timestamp(&self) -> Option<Timestamp> {
47            match self {
48                Self::Unknown => None,
49                Self::Tcp(tcp) => Some(tcp.1),
50                Self::Reverse(tcp) => Some(tcp.remote_timestamp),
51                Self::Package(package) => Some(package.1),
52                Self::Builder(_) => None,
53            }
54        }
55        pub fn from(
56            params: (&StreamContainer, StreamConnectorSelector),
57        ) -> (Self, Box<dyn Provider>) {
58            match params.1 {
59                StreamConnectorSelector::Package(remote_timestamp) => {
60                    let provider = PackageProvider::new(params.0, remote_timestamp);
61                    (State::Package(provider.clone()), Box::new(provider))
62                }
63                StreamConnectorSelector::Tcp(tunnel, remote_timestamp) => {
64                    if !tunnel.is_reverse() {
65                        let provider = TcpProvider(tunnel, remote_timestamp);
66                        (State::Tcp(provider.clone()), Box::new(provider))
67                    } else {
68                        let provider = ReverseProvider::new(params.0, tunnel, remote_timestamp);
69                        (State::Reverse(provider.clone()), Box::new(provider))
70                    }
71                }
72                StreamConnectorSelector::Builder(builder) => {
73                    let provider = BuilderProvider(builder);
74                    (State::Builder(provider.clone()), Box::new(provider))
75                }
76            }
77        }
78    }
79
80    pub trait Provider {
81        fn begin_connect(&self, stream: &StreamContainer);
82    }
83
84    #[derive(Clone)]
85    pub struct TcpProvider(tunnel::tcp::Tunnel, Timestamp);
86
87    impl fmt::Display for TcpProvider {
88        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89            write!(
90                f,
91                "TcpProvider:{{tunnel:{}, remote_timestamp:{}}}",
92                self.0, self.1
93            )
94        }
95    }
96
97    impl Provider for TcpProvider {
98        fn begin_connect(&self, stream: &StreamContainer) {
99            debug!("{} connect with {}", stream, self);
100            let action = ConnectTcpStream::new(
101                stream.0.stack.clone(),
102                stream.clone(),
103                self.0.clone(),
104            );
105            let stream = stream.clone();
106            task::spawn(async move {
107                match action.wait_pre_establish().await {
108                    ConnectStreamState::PreEstablish => match action.continue_connect().await {
109                        Ok(selector) => {
110                            let _ = stream.establish_with(selector).await;
111                        }
112                        Err(err) => {
113                            let _ = stream.cancel_connecting_with(&err);
114                        }
115                    },
116                    _ => {
117                        let _ = stream.cancel_connecting_with(&BuckyError::new(
118                            BuckyErrorCode::ErrorState,
119                            "action not pre establish",
120                        ));
121                    }
122                }
123            });
124        }
125    }
126
127    #[derive(Clone)]
128    pub struct PackageProvider(ConnectPackageStream, Timestamp);
129
130    impl PackageProvider {
131        fn new(stream: &StreamContainer, remote_timestamp: Timestamp) -> Self {
132            Self(ConnectPackageStream::new(stream.clone()), remote_timestamp)
133        }
134    }
135
136    impl fmt::Display for PackageProvider {
137        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
138            write!(f, "PackageProvider:{{remote_timestamp:{}}}", self.1)
139        }
140    }
141
142    impl Provider for PackageProvider {
143        fn begin_connect(&self, stream: &StreamContainer) {
144            debug!("{} connect with {}", stream, self);
145            let action = self.0.clone();
146            let stream = stream.clone();
147            task::spawn(async move {
148                match action.wait_pre_establish().await {
149                    ConnectStreamState::PreEstablish => match action.continue_connect().await {
150                        Ok(selector) => {
151                            let _ = stream.establish_with(selector).await;
152                        }
153                        Err(err) => {
154                            let _ = stream.cancel_connecting_with(&err);
155                        }
156                    },
157                    _ => {
158                        let _ = stream.cancel_connecting_with(&BuckyError::new(
159                            BuckyErrorCode::ErrorState,
160                            "action not pre establish",
161                        ));
162                    }
163                }
164            });
165
166            self.0.begin();
167        }
168    }
169
170    impl AsRef<ConnectPackageStream> for PackageProvider {
171        fn as_ref(&self) -> &ConnectPackageStream {
172            &self.0
173        }
174    }
175
176    #[derive(Clone)]
177    pub struct ReverseProvider {
178        pub remote_timestamp: Timestamp,
179        pub local: Endpoint,
180        pub action: AcceptReverseTcpStream,
181    }
182
183    impl fmt::Display for ReverseProvider {
184        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
185            write!(
186                f,
187                "ReverseProvider:{{local:{}, remote_timestamp:{}}}",
188                self.local, self.remote_timestamp
189            )
190        }
191    }
192
193    impl ReverseProvider {
194        fn new(
195            stream: &StreamContainer,
196            tunnel: tunnel::tcp::Tunnel,
197            remote_timestamp: Timestamp,
198        ) -> Self {
199            Self {
200                remote_timestamp,
201                local: *tunnel.local(),
202                action: AcceptReverseTcpStream::new(
203                    stream.clone(),
204                    *tunnel.local(),
205                    *tunnel.remote(),
206                ),
207            }
208        }
209    }
210
211    impl Provider for ReverseProvider {
212        fn begin_connect(&self, stream: &StreamContainer) {
213            debug!("{} connect with {}", stream, self);
214            let provider = self.clone();
215            let stream = stream.clone();
216            let mut syn_tcp = stream.syn_tcp_stream().unwrap();
217
218            let stack = stream.stack();
219            let listener = stack.net_manager().listener();
220            let mut endpoints = vec![];
221            for t in listener.tcp() {
222                let outer = t.outer();
223                if outer.is_some() {
224                    let outer = outer.unwrap();
225                    if outer == self.local || t.local() == self.local {
226                        endpoints.push(outer);
227                    }
228                } else {
229                    endpoints.push(self.local);
230                }
231            }
232            syn_tcp.reverse_endpoint = Some(endpoints);
233
234            task::spawn(async move {
235                loop {
236                    match future::timeout(
237                        stream.stack().config().stream.stream.package.connect_resend_interval,
238                        provider.action.wait_pre_establish(),
239                    )
240                    .await
241                    {
242                        Err(_) => {
243                            if let Some(tunnel) = stream.tunnel() {
244                                let _ = tunnel.send_packages(vec![DynamicPackage::from(syn_tcp.clone())]);
245                            }
246                        }
247                        Ok(state) => {
248                            match state {
249                                ConnectStreamState::PreEstablish => {
250                                    match provider.action.continue_connect().await {
251                                        Ok(selector) => {
252                                            let _ = stream.establish_with(selector).await;
253                                        }
254                                        Err(err) => {
255                                            let _ = stream.cancel_connecting_with(&err);
256                                        }
257                                    }
258                                }
259                                _ => {
260                                    let _ =
261                                        stream.cancel_connecting_with(&BuckyError::new(
262                                            BuckyErrorCode::ErrorState,
263                                            "action not pre establish",
264                                        ));
265                                }
266                            };
267                            break;
268                        }
269                    }
270                }
271            });
272        }
273    }
274
275    #[derive(Clone)]
276    pub struct BuilderProvider(ConnectStreamBuilder);
277
278    impl Provider for BuilderProvider {
279        fn begin_connect(&self, _stream: &StreamContainer) {
280            let builder = self.0.clone();
281            task::spawn(async move {
282                builder.build().await;
283            });
284        }
285    }
286
287    impl AsRef<ConnectStreamBuilder> for BuilderProvider {
288        fn as_ref(&self) -> &ConnectStreamBuilder {
289            &self.0
290        }
291    }
292}
293
294mod acceptor {
295    use super::dep::*;
296
297    pub struct Acceptor {
298        pub remote_id: IncreaseId,
299        pub waiter: StateWaiter,
300        pub builder: AcceptStreamBuilder,
301    }
302}
303
304use async_std::io::prelude::{Read, Write};
305pub use dep::Shutdown;
306use dep::*;
307use futures::io::ErrorKind;
308use log::*;
309use std::pin::Pin;
310use std::task::{Context, Poll, Waker};
311
312enum StreamConnectingState {
313    Connect(connector::Connector),
314    Accept(acceptor::Acceptor),
315}
316
317struct StreamEstablishState {
318    start_at: Timestamp,
319    remote_timestamp: Timestamp,
320    provider: Box<dyn StreamProvider>,
321}
322
323#[derive(Clone)]
324pub struct Config {
325    pub retry_sn_timeout: Duration, 
326    pub connect_timeout: Duration,
327    pub nagle: Duration,
328    pub recv_timeout: Duration,
329    pub recv_buffer: usize,
330    pub send_buffer: usize,
331    pub drain: f32,
332    pub tcp: super::tcp::Config,
333    pub package: super::package::Config,
334}
335
336impl Config {
337    pub fn recv_drain(&self) -> usize {
338        (self.recv_buffer as f32 * self.drain) as usize
339    }
340
341    pub fn send_drain(&self) -> usize {
342        (self.send_buffer as f32 * self.drain) as usize
343    }
344}
345
346#[derive(Eq, PartialEq, Debug)]
347pub enum StreamState {
348    Connecting,
349    Establish(Timestamp),
350    Closing,
351    Closed,
352}
353impl fmt::Display for StreamState {
354    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
355        match self {
356            StreamState::Connecting => write!(f, "StreamState::Connecting"),
357            StreamState::Establish(remote_timestamp) => {
358                write!(f, "StreamState::Establish({})", remote_timestamp)
359            }
360            StreamState::Closing => write!(f, "StreamState::Closing"),
361            StreamState::Closed => write!(f, "StreamState::Closed"),
362        }
363    }
364}
365
366enum StreamStateImpl {
367    Initial(TunnelGuard),
368    Connecting(StreamConnectingState, TunnelGuard),
369    Establish(StreamEstablishState, TunnelGuard),
370    Closing(StreamEstablishState, TunnelGuard),
371    Closed,
372}
373
374struct PackageStreamProviderSelector {}
375pub enum StreamProviderSelector {
376    Package(IncreaseId /*remote id*/, Option<SessionData>),
377    Tcp(async_std::net::TcpStream, MixAesKey, Option<TcpAckConnection>),
378}
379
380struct StreamContainerImpl {
381    stack: WeakStack, 
382    remote_device: DeviceId,
383    remote_port: u16,
384    local_id: IncreaseId,
385    sequence: TempSeq,
386    state: RwLock<StreamStateImpl>,
387    answer_data: RwLock<Option<Vec<u8>>>,
388}
389
390#[derive(Clone)]
391pub struct StreamContainer(Arc<StreamContainerImpl>);
392
393
394impl fmt::Display for StreamContainer {
395    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
396        write!(
397            f,
398            "StreamContainer {{sequence:{:?}, local:{}, remote:{}, port:{}, id:{} }}",
399            self.sequence(),
400            self.stack().local_device_id(),
401            self.remote().0,
402            self.remote().1,
403            self.local_id()
404        )
405    }
406}
407
408impl StreamContainer {
409    pub(super) fn new(
410        weak_stack: WeakStack,
411        tunnel: TunnelGuard,
412        remote_port: u16,
413        local_id: IncreaseId,
414        sequence: TempSeq,
415    ) -> StreamContainer {
416        let stream_impl = StreamContainerImpl {
417            stack: weak_stack, 
418            remote_device: tunnel.remote().clone(), 
419            remote_port,
420            local_id,
421            sequence: sequence,
422            state: RwLock::new(StreamStateImpl::Initial(tunnel)),
423            answer_data: RwLock::new(None)
424        };
425
426        StreamContainer(Arc::new(stream_impl))
427    }
428
429    //收到ack以后继续连接, 可以完成时在builder里面调用 establish_with
430    pub(super) fn accept(&self, remote_id: IncreaseId) {
431        let state = &mut *self.0.state.write().unwrap();
432        if let StreamStateImpl::Initial(tunnel) = &*state {
433            info!("{} initial=>accepting remote_id {}", self, remote_id);
434            *state =
435                StreamStateImpl::Connecting(StreamConnectingState::Accept(acceptor::Acceptor {
436                    remote_id,
437                    waiter: StateWaiter::new(),
438                    builder: AcceptStreamBuilder::new(self.0.stack.clone(), self.clone(), tunnel.as_ref().clone()),
439                }), tunnel.clone());
440            let stream = self.clone();
441            task::spawn(async move {
442                match future::timeout(
443                    stream
444                        .stack()
445                        .config()
446                        .stream
447                        .stream
448                        .connect_timeout,
449                    stream.wait_establish(),
450                )
451                .await
452                {
453                    Ok(r) => r,
454                    Err(err) => {
455                        let err = err.into();
456                        match stream.cancel_connecting_with(&err) {
457                            Ok(_) => Err(err),
458                            Err(_) => Ok(()),
459                        }
460                    }
461                }
462            });
463        } else {
464            unreachable!("not initial")
465        }
466    }
467
468    // 开始发起连接,连接完成或者失败时返回
469    pub(super) async fn connect(
470        &self,
471        question: Vec<u8>,
472        build_params: BuildTunnelParams,
473    ) -> BuckyResult<()> {
474        // initial connector
475        let connector = connector::Connector {
476            question: Vec::from(question),
477            waiter: StateWaiter::new(),
478            state: connector::State::Unknown,
479            start_at: bucky_time_now(),
480        };
481
482        // enter connecting connector unknown
483        let tunnel = {
484            let mut state = self.0.state.write().unwrap();
485            if let StreamStateImpl::Initial(tunnel) = &*state {
486                info!("{} initial=>connecting", self);
487                let tunnel = tunnel.clone();
488                *state = StreamStateImpl::Connecting(StreamConnectingState::Connect(connector), tunnel.clone());
489                tunnel
490            } else {
491                unreachable!("not initial")
492            }
493        };
494
495        // 从tunnel container返回要用的connector provider
496        let _ = match tunnel.select_stream_connector(build_params, self.clone()).await
497        {
498            Ok(selector) => {
499                let (connector_state, connector_provider) =
500                    connector::State::from((self, selector));
501                // enter connecting connector provider
502                {
503                    let state = &mut *self.0.state.write().unwrap();
504                    if let StreamStateImpl::Connecting(ref mut connecting, _) = state {
505                        if let StreamConnectingState::Connect(ref mut connector) = connecting {
506                            if let connector::State::Unknown = connector.state {
507                                connector.state = connector_state;
508                                connector.start_at = bucky_time_now();
509                            } else {
510                                unreachable!()
511                            }
512                        } else {
513                            unreachable!()
514                        }
515                    } else {
516                        unreachable!()
517                    }
518                }
519                connector_provider.begin_connect(self);
520                Ok(())
521            }
522            Err(err) => Err(err),
523        }?;
524
525        match future::timeout(
526            self.stack().config().stream.stream.connect_timeout,
527            self.wait_establish(),
528        )
529        .await
530        {
531            Ok(r) => r,
532            Err(err) => {
533                let err = err.into();
534                match self.cancel_connecting_with(&err) {
535                    Ok(_) => Err(err),
536                    Err(_) => Ok(()),
537                }
538            }
539        }
540    }
541
542    //以selector指定的方式联通
543    pub(crate) async fn establish_with(
544        &self,
545        selector: StreamProviderSelector
546    ) -> BuckyResult<()> {
547        let tunnel = self.tunnel()
548            .ok_or_else(|| BuckyError::new(
549                BuckyErrorCode::ErrorState,
550                "tunnel not active",
551            )).map_err(|e| {
552                error!("{} try establish failed for {}", self, e);
553                e
554            })?;
555        let remote_timestamp = match tunnel.wait_active().await {
556            TunnelState::Active(remote_timetamp) => Ok(remote_timetamp),
557            _ => Err(BuckyError::new(
558                BuckyErrorCode::ErrorState,
559                "tunnel not active",
560            )),
561        }
562        .map_err(|e| {
563            error!("{} try establish failed for {}", self, e);
564            e
565        })?;
566
567        let (provider, provider_stub, answer_data) = match selector {
568            StreamProviderSelector::Package(remote_id, ack) => {
569                let answer_data = match ack {
570                    Some(session_data) => {
571                        if session_data.payload.as_ref().len() > 0 {
572                            let mut answer = vec![0; session_data.payload.as_ref().len()];
573                            answer.copy_from_slice(session_data.payload.as_ref());
574                            answer
575                        } else {
576                            vec![]
577                        }
578                    },
579                    _ => vec![],
580                };
581
582                let stream = PackageStream::new(self, tunnel.as_ref(), self.local_id().clone(), remote_id)?;
583                (
584                    Box::new(stream.clone()) as Box<dyn StreamProvider>,
585                    Box::new(stream) as Box<dyn StreamProvider>,
586                    answer_data,
587                )
588            }
589            StreamProviderSelector::Tcp(socket, key, ack) => {
590                let answer_data = match ack {
591                    Some(tcp_ack_connection) => {
592                        if tcp_ack_connection.payload.as_ref().len() > 0 {
593                            let mut answer = vec![0; tcp_ack_connection.payload.as_ref().len()];
594                            answer.copy_from_slice(tcp_ack_connection.payload.as_ref());
595                            answer
596                        } else {
597                            vec![]
598                        }
599                    },
600                    _ => vec![],
601                };
602
603                let stream = TcpStream::new(self.clone(), socket, key.enc_key)?;
604                (
605                    Box::new(stream.clone()) as Box<dyn StreamProvider>,
606                    Box::new(stream) as Box<dyn StreamProvider>,
607                    answer_data,
608                )
609            }
610        };
611
612        let state = &mut *self.0.state.write().unwrap();
613        let waiter = match state {
614            StreamStateImpl::Connecting(ref mut connecting_state, tunnel) => match connecting_state {
615                StreamConnectingState::Accept(ref mut acceptor) => {
616                    let waiter = acceptor.waiter.transfer();
617                    info!("{} accepting=>establish with provider {}", self, provider);
618                    *state = StreamStateImpl::Establish(StreamEstablishState {
619                        start_at: bucky_time_now(),
620                        remote_timestamp,
621                        provider: provider_stub,
622                    }, tunnel.clone());
623                    Ok(waiter)
624                }
625                StreamConnectingState::Connect(ref mut connector) => {
626                    let waiter = connector.waiter.transfer();
627                    info!("{} connecting=>establish with {}", self, provider);
628                    *state = StreamStateImpl::Establish(StreamEstablishState {
629                        start_at: bucky_time_now(),
630                        remote_timestamp,
631                        provider: provider_stub,
632                    }, tunnel.clone());
633
634                    if answer_data.len() > 0 {
635                        let data = &mut *self.0.answer_data.write().unwrap();
636                        *data = Some(answer_data);
637                    }
638
639                    Ok(waiter)
640                }
641            },
642            _ => Err(BuckyError::new(
643                BuckyErrorCode::ErrorState,
644                "stream not connecting",
645            )),
646        }
647        .map_err(|e| {
648            error!("{} try establish failed for {}", self, e);
649            e
650        })?;
651        // 开始stream provider的收发
652        provider.start(self);
653        // 唤醒等待 establish 的waiter
654        waiter.wake();
655        Ok(())
656    }
657
658    pub(crate) fn cancel_connecting_with(&self, err: &BuckyError) -> BuckyResult<()> {
659        warn!("{} cancel connecting with error: {}", self, err);
660        let state = &mut *self.0.state.write().unwrap();
661        let (waiter, state_dump) = match state {
662            StreamStateImpl::Connecting(ref mut connecting_state, tunnel) => match connecting_state {
663                StreamConnectingState::Accept(ref mut acceptor) => {
664                    let waiter = acceptor.waiter.transfer();
665                    info!("{} accepting=>closed", self);
666                    *state = StreamStateImpl::Closed;
667                    Ok((waiter, None))
668                }
669                StreamConnectingState::Connect(ref mut connector) => {
670                    let waiter = connector.waiter.transfer();
671                    let tunnel = tunnel.clone();
672                    info!("{} connecting=>closed", self);
673                    let state_dump = connector
674                        .state
675                        .remote_timestamp()
676                        .map(|r| (tunnel, r, connector.start_at));
677                    *state = StreamStateImpl::Closed;
678                    Ok((waiter, state_dump))
679                }
680            },
681            _ => Err(BuckyError::new(
682                BuckyErrorCode::ErrorState,
683                "stream not connecting",
684            )),
685        }?;
686        if let Some((tunnel, remote_timestamp, start_at)) = state_dump {
687            error!("{} mark tunnel dead", self);
688            let _ = tunnel.mark_dead(remote_timestamp, start_at);
689        }
690        // 唤醒等待 establish 的waiter
691        waiter.wake();
692        Ok(())
693    }
694
695    pub(crate) async fn wait_establish(&self) -> BuckyResult<()> {
696        let waiter = {
697            let state = &mut *self.0.state.write().unwrap();
698            match state {
699                StreamStateImpl::Connecting(ref mut connecting, _) => {
700                    let waiter = match connecting {
701                        StreamConnectingState::Accept(ref mut acceptor) => {
702                            acceptor.waiter.new_waiter()
703                        }
704                        StreamConnectingState::Connect(ref mut connector) => {
705                            connector.waiter.new_waiter()
706                        }
707                    };
708                    Ok(Some(waiter))
709                }
710                StreamStateImpl::Establish(..) => Ok(None),
711                _ => {
712                    warn!("{} wait establish failed, neither StreamStateImpl::Connecting nor StreamStateImpl::Establish", self);
713                    Err(BuckyError::new(
714                        BuckyErrorCode::ErrorState,
715                        "stream not established",
716                    ))
717                }
718            }
719        }?;
720
721        if let Some(waiter) = waiter {
722            match StateWaiter::wait(waiter, || self.state()).await {
723                StreamState::Establish(_) => Ok(()),
724                _ => {
725                    error!(
726                        "{} wait establish failed, for stream state not establish,state={}",
727                        self,
728                        self.state()
729                    );
730                    Err(BuckyError::new(
731                        BuckyErrorCode::ErrorState,
732                        "stream not established",
733                    ))
734                }
735            }
736        } else {
737            Ok(())
738        }
739    }
740
741    pub(crate) fn syn_session_data(&self) -> Option<SessionData> {
742        {
743            match &*self.0.state.read().unwrap() {
744                StreamStateImpl::Connecting(connecting, _) => match connecting {
745                    StreamConnectingState::Connect(connector) => Some(connector.question.clone()),
746                    _ => {
747                        unreachable!()
748                    }
749                },
750                _ => None,
751            }
752        }
753        .map(|question| {
754            let mut session = SessionData::new();
755            session.stream_pos = 0;
756            session.syn_info = Some(SessionSynInfo {
757                sequence: self.sequence(),
758                from_session_id: self.local_id().clone(),
759                to_vport: self.remote().1,
760            });
761            session.session_id = self.local_id().clone();
762            session.send_time = bucky_time_now();
763            session.flags_add(SESSIONDATA_FLAG_SYN);
764            session.payload = TailedOwnedData::from(question);
765            session
766        })
767    }
768
769    pub(crate) fn syn_ack_session_data(&self, answer: &[u8]) -> Option<SessionData> {
770        {
771            match &*self.0.state.read().unwrap() {
772                StreamStateImpl::Connecting(connecting, _) => match connecting {
773                    StreamConnectingState::Accept(acceptor) => Some(acceptor.remote_id.clone()),
774                    _ => {
775                        unreachable!()
776                    }
777                },
778                _ => None,
779            }
780        }
781        .map(|remote_id| {
782            let mut session = SessionData::new();
783            session.stream_pos = 0;
784            session.syn_info = Some(SessionSynInfo {
785                sequence: self.sequence(),
786                from_session_id: self.local_id().clone(),
787                to_vport: 0,
788            });
789            session.ack_stream_pos = 0;
790            session.send_time = bucky_time_now();
791            session.flags_add(SESSIONDATA_FLAG_SYN | SESSIONDATA_FLAG_ACK);
792            session.to_session_id = Some(remote_id.clone());
793            session.session_id = remote_id;
794            let mut payload = vec![0u8; answer.len()];
795            payload.copy_from_slice(answer);
796            session.payload = TailedOwnedData::from(payload);
797            session
798        })
799    }
800
801    pub(crate) fn syn_tcp_stream(&self) -> Option<TcpSynConnection> {
802        {
803            match &*self.0.state.read().unwrap() {
804                StreamStateImpl::Connecting(connecting, _) => match connecting {
805                    StreamConnectingState::Connect(connector) => Some(connector.question.clone()),
806                    _ => {
807                        unreachable!()
808                    }
809                },
810                _ => None,
811            }
812        }
813        .map(|question| {
814            let local_device = self.stack().sn_client().ping().default_local();
815            TcpSynConnection {
816                sequence: self.sequence(),
817                result: 0u8,
818                to_vport: self.remote().1,
819                from_session_id: self.local_id(),
820                from_device_desc: local_device,
821                to_device_id: self.remote().0.clone(),
822                reverse_endpoint: None,
823                payload: TailedOwnedData::from(question),
824            }
825        })
826    }
827
828    pub(crate) fn ack_tcp_stream(&self, answer: &[u8]) -> Option<TcpAckConnection> {
829        {
830            match &*self.0.state.read().unwrap() {
831                StreamStateImpl::Connecting(connecting, _) => match connecting {
832                    StreamConnectingState::Accept(acceptor) => Some(acceptor.remote_id.clone()),
833                    _ => {
834                        unreachable!()
835                    }
836                },
837                _ => None,
838            }
839        }
840        .map(|remote_id| {
841            let mut payload = vec![0u8; answer.len()];
842            payload.copy_from_slice(answer);
843
844            TcpAckConnection {
845                sequence: self.sequence(),
846                to_session_id: remote_id,
847                result: TCP_ACK_CONNECTION_RESULT_OK,
848                to_device_desc: self.stack().sn_client().ping().default_local(),
849                payload: TailedOwnedData::from(payload),
850            }
851        })
852    }
853
854    pub(crate) fn ack_ack_tcp_stream(&self, result: u8) -> TcpAckAckConnection {
855        TcpAckAckConnection {
856            sequence: self.sequence(),
857            result,
858        }
859    }
860
861    pub fn tunnel(&self) -> Option<TunnelGuard> {
862        match &*self.0.state.read().unwrap() {
863            StreamStateImpl::Initial(tunnel) => Some(tunnel.clone()),
864            StreamStateImpl::Connecting(_, tunnel) => Some(tunnel.clone()),
865            StreamStateImpl::Establish(_, tunnel) => Some(tunnel.clone()),
866            StreamStateImpl::Closing(_, tunnel) => Some(tunnel.clone()),
867            StreamStateImpl::Closed => None
868        }
869    }
870
871    pub fn state(&self) -> StreamState {
872        match &*self.0.state.read().unwrap() {
873            StreamStateImpl::Initial(_) => unreachable!(),
874            StreamStateImpl::Connecting(..) => StreamState::Connecting,
875            StreamStateImpl::Establish(establish, ..) => {
876                StreamState::Establish(establish.remote_timestamp)
877            }
878            StreamStateImpl::Closing(..) => StreamState::Closing,
879            StreamStateImpl::Closed => StreamState::Closed,
880        }
881    }
882
883    pub(crate) fn is_connecting(&self) -> bool {
884        let state = self.0.state.read().unwrap();
885        let s1 = state.deref();
886        match s1 {
887            StreamStateImpl::Connecting(..) => true,
888            _ => false,
889        }
890    }
891
892    pub(crate) fn acceptor(&self) -> Option<AcceptStreamBuilder> {
893        if let StreamStateImpl::Connecting(connecting, _) = &*self.0.state.read().unwrap() {
894            if let StreamConnectingState::Accept(acceptor) = connecting {
895                return Some(acceptor.builder.clone());
896            }
897        }
898        None
899    }
900
901    pub(crate) fn stack(&self) -> Stack {
902        Stack::from(&self.0.stack)
903    }
904
905    pub(crate) fn break_with_error(&self, err: BuckyError, reserving: bool, marking: bool) {
906        error!("{} break with err {}", self, err);
907        let state_dump = {
908            let state = &mut *self.0.state.write().unwrap();
909            match state {
910                StreamStateImpl::Establish(establish, tunnel) => {
911                    let tunnel = tunnel.clone();
912                    let state_dump = if marking {
913                        Some((tunnel, establish.remote_timestamp, establish.start_at))
914                    } else {
915                        None
916                    };
917                    *state = StreamStateImpl::Closed;
918                    state_dump
919                }
920                _ => None,
921            }
922        };
923        if let Some((tunnel, remote_timestamp, start_at)) = state_dump {
924            debug!("{} mark tunnel dead for break", self);
925            let _ = tunnel.mark_dead(remote_timestamp, start_at);
926        }
927        self.stack().stream_manager().remove_stream(self, reserving);
928    }
929
930    pub(super) fn on_shutdown(&self, reserving: bool) {
931        *self.0.state.write().unwrap() = StreamStateImpl::Closed;
932        self.stack().stream_manager().remove_stream(self, reserving);
933    }
934
935
936    pub async fn confirm(&self, answer: &[u8]) -> BuckyResult<()> {
937        if answer.len() > ANSWER_MAX_LEN {
938            return Err(BuckyError::new(
939                BuckyErrorCode::Failed,
940                format!("answer's length large than {}", ANSWER_MAX_LEN),
941            ));
942        }
943
944        let builder = {
945            let state = &*self.0.state.read().unwrap();
946            match state {
947                StreamStateImpl::Connecting(connecting, _) => match connecting {
948                    StreamConnectingState::Accept(acceptor) => Ok(acceptor.builder.clone()),
949                    _ => Err(BuckyError::new(
950                        BuckyErrorCode::ErrorState,
951                        "confirm on error state",
952                    )),
953                },
954                _ => Err(BuckyError::new(
955                    BuckyErrorCode::ErrorState,
956                    "confirm on error state",
957                )),
958            }?
959        };
960        match builder.confirm(answer) {
961            Err(e) => {
962                error!("{} confirm failed for {}", self, e);
963                Err(e)
964            }
965            Ok(v) => {
966                info!("{} confirmed", self);
967                Ok(v)
968            }
969        }
970    }
971
972    pub fn shutdown(&self, which: Shutdown) -> std::io::Result<()> {
973        info!("{} shutdown", self);
974        let provider = {
975            let state = &*self.0.state.read().unwrap();
976            match state {
977                StreamStateImpl::Establish(s, _) => Ok(Some(s.provider.clone_as_provider())),
978				StreamStateImpl::Closed => Ok(None),
979                _ => {
980                    //TODO 其他状态暂时不支持shutdown
981                    Err(std::io::Error::new(
982                        std::io::ErrorKind::Other,
983                        "not support when state is not establish",
984                    ))
985                }
986            }
987        }
988        .map_err(|e| {
989            error!("{} shutdown failed for {}", self, e);
990            e
991        })?;
992
993        if let Some(provider) = provider {
994            provider.shutdown(which, &self)
995        } else {
996            Ok(())
997        }
998    }
999
1000    pub fn readable(&self) -> StreamReadableFuture {
1001        StreamReadableFuture {
1002            stream: self.clone(),
1003        }
1004    }
1005
1006    fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<std::io::Result<usize>> {
1007        let provider = {
1008            let state = &*self.0.state.read().unwrap();
1009            match state {
1010                StreamStateImpl::Establish(s, _) => Some(s.provider.clone_as_provider()),
1011                _ => None,
1012            }
1013        };
1014        if let Some(provider) = provider {
1015            provider.poll_readable(cx)
1016        } else {
1017            Poll::Ready(Err(std::io::Error::new(
1018                ErrorKind::NotConnected,
1019                "not establish",
1020            )))
1021        }
1022    }
1023
1024    fn has_first_answer(&self) -> bool {
1025        self.0.answer_data.read().unwrap().is_some()
1026    }
1027
1028    fn read_first_answer(&self, buf: &mut [u8]) -> usize {
1029        if self.has_first_answer() {
1030            let answer_data = &mut *self.0.answer_data.write().unwrap();
1031            match answer_data {
1032                Some(answer_buf) => {
1033                    let mut read_len = answer_buf.len();
1034                    if read_len > buf.len() {
1035                        read_len = buf.len();
1036                    }
1037        
1038                    buf[..read_len].copy_from_slice(&answer_buf[..read_len]);
1039        
1040                    if read_len == answer_buf.len() {
1041                        *answer_data = None;
1042                    } else {
1043                        let data = &answer_buf[read_len..];
1044
1045                        *answer_data = Some(data.to_vec())
1046                    }
1047        
1048                    read_len
1049                },
1050                None => 0
1051            }
1052        } else {
1053            0
1054        }
1055    }
1056
1057    fn poll_read(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
1058        debug!("{} poll read {} bytes", self, buf.len());
1059        let read_len = self.read_first_answer(buf);
1060        if read_len > 0 {
1061            return Poll::Ready(Ok(read_len));
1062        }
1063
1064        let provider = {
1065            let state = &*self.0.state.read().unwrap();
1066            match state {
1067                StreamStateImpl::Initial(_) | StreamStateImpl::Connecting(..) => {
1068                    trace!(
1069                        "{} poll-write in initial/connecting.",
1070                        self,
1071                    );
1072                    None
1073                }, 
1074                StreamStateImpl::Establish(s, _) => Some(s.provider.clone_as_provider()),
1075                StreamStateImpl::Closing(s, _) => Some(s.provider.clone_as_provider()), 
1076                _ => {
1077                    return Poll::Ready(Ok(0));
1078                }
1079            }
1080        };
1081        
1082        if let Some (provider) = provider {
1083            let read_len = self.read_first_answer(buf);
1084            if read_len > 0 {
1085                return Poll::Ready(Ok(read_len));
1086            }
1087            provider.poll_read(cx, buf)
1088        } else {
1089            let waker = cx.waker().clone();
1090            let stream = self.clone();
1091            task::spawn(async move {
1092                let _ = stream.wait_establish().await;
1093                waker.wake();
1094            });
1095            Poll::Pending
1096        }
1097    }
1098
1099    fn poll_write(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<std::io::Result<usize>> {
1100        debug!("{} poll write {} bytes", self, buf.len());
1101        self.poll_write_wait_establish(cx.waker().clone(), |provider| {
1102            provider.poll_write(cx, buf)
1103        })
1104    }
1105
1106    fn poll_flush(&self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1107        debug!("{} poll flush", self);
1108        self.poll_write_wait_establish(cx.waker().clone(), |provider| {
1109            provider.poll_flush(cx)
1110        })
1111    }
1112
1113    fn poll_close(&self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1114        debug!("{} poll close", self);
1115        let provider = {
1116            let state = &*self.0.state.read().unwrap();
1117            match state {
1118                StreamStateImpl::Establish(s, _) => Some(s.provider.clone_as_provider()),
1119                StreamStateImpl::Initial(_) => {
1120                    debug!("poll-close, {} in initial.", self);
1121                    None
1122                }
1123                StreamStateImpl::Connecting(..) => {
1124                    debug!("poll-close, {} in connecting.", self);
1125                    None
1126                }
1127                StreamStateImpl::Closing(s, _) => {
1128                    debug!("poll-close, {} in closing.", self);
1129                    Some(s.provider.clone_as_provider())
1130                }
1131                StreamStateImpl::Closed => {
1132                    debug!("poll-close, {} in closed ready.", self);
1133                    return Poll::Ready(Ok(()));
1134                }
1135            }
1136        };
1137
1138        match provider {
1139            Some(provider) => provider.poll_close(cx),
1140            None => {
1141                let _ = self.cancel_connecting_with(&BuckyError::new(
1142                    BuckyErrorCode::ConnectionAborted,
1143                    "user close",
1144                ));
1145                Poll::Ready(Err(std::io::Error::new(
1146                    ErrorKind::ConnectionAborted,
1147                    "close by user",
1148                )))
1149            }
1150        }
1151    }
1152
1153    pub fn remote(&self) -> (&DeviceId, u16) {
1154        (&self.0.remote_device, self.0.remote_port)
1155    }
1156
1157    pub fn sequence(&self) -> TempSeq {
1158        self.0.sequence
1159    }
1160
1161    pub fn local_id(&self) -> IncreaseId {
1162        self.0.local_id
1163    }
1164
1165    pub fn local_ep(&self) -> Option<Endpoint> {
1166        let state = &*self.0.state.read().unwrap();
1167        match state {
1168            StreamStateImpl::Establish(s, _) => Some(*s.provider.local_ep()),
1169            _ => None,
1170        }
1171    }
1172
1173    pub fn remote_ep(&self) -> Option<Endpoint> {
1174        let state = &*self.0.state.read().unwrap();
1175        match state {
1176            StreamStateImpl::Establish(s, _) => Some(*s.provider.remote_ep()),
1177            _ => None,
1178        }
1179    }
1180
1181    fn poll_write_wait_establish<R>(
1182        &self,
1183        waker: Waker,
1184        mut proc: impl FnMut(&dyn StreamProvider) -> Poll<std::io::Result<R>>,
1185    ) -> Poll<std::io::Result<R>> {
1186        let provider = {
1187            let state = &*self.0.state.read().unwrap();
1188            match state {
1189                StreamStateImpl::Establish(s, _) => Some(s.provider.clone_as_provider()),
1190                StreamStateImpl::Initial(..) | StreamStateImpl::Connecting(..) => {
1191                    trace!(
1192                        "{} poll-write in initial/connecting.",
1193                        self,
1194                    );
1195                    None
1196                }, 
1197                _ => {
1198                    let msg = format!("{} poll-write in close.", self);
1199                    error!("{}", msg);
1200                    return Poll::Ready(Err(std::io::Error::new(ErrorKind::NotConnected, msg)));
1201                }
1202            }
1203        };
1204
1205        match provider {
1206            Some(provider) => proc(&*provider),
1207            None => {
1208                let stream = self.clone();
1209                task::spawn(async move {
1210                    let _ = stream.wait_establish().await;
1211                    waker.wake();
1212                });
1213                Poll::Pending
1214            }
1215        }
1216    }
1217}
1218
1219pub struct StreamReadableFuture {
1220    stream: StreamContainer,
1221}
1222
1223impl Future for StreamReadableFuture {
1224    type Output = std::io::Result<usize>;
1225
1226    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1227        self.stream.poll_readable(cx)
1228    }
1229}
1230
1231
1232impl OnPackage<TcpSynConnection, tcp::AcceptInterface> for StreamContainer {
1233    fn on_package(
1234        &self,
1235        pkg: &TcpSynConnection,
1236        interface: tcp::AcceptInterface,
1237    ) -> BuckyResult<OnPackageResult> {
1238        debug!("{} on package {} from {}", self, pkg, interface);
1239        // syn tcp 直接转给builder
1240        let state = &*self.0.state.read().unwrap();
1241        let builder = match state {
1242            StreamStateImpl::Connecting(connecting, _) => match connecting {
1243                StreamConnectingState::Accept(acceptor) => Ok(acceptor.builder.clone()),
1244                _ => Err(BuckyError::new(
1245                    BuckyErrorCode::ErrorState,
1246                    "syn session data on error state",
1247                )),
1248            },
1249            _ => Err(BuckyError::new(
1250                BuckyErrorCode::ErrorState,
1251                "syn session data on error state",
1252            )),
1253        }?;
1254        builder.on_package(pkg, interface)
1255    }
1256}
1257
1258impl OnPackage<SessionData> for StreamContainer {
1259    fn on_package(&self, pkg: &SessionData, _: Option<()>) -> BuckyResult<OnPackageResult> {
1260        if pkg.is_syn() {
1261            debug!("{} on package {}", self, pkg);
1262            // syn session data直接转给builder
1263            let state = &*self.0.state.read().unwrap();
1264            let builder = match state {
1265                StreamStateImpl::Connecting(connecting, _) => match connecting {
1266                    StreamConnectingState::Accept(acceptor) => Ok(acceptor.builder.clone()),
1267                    _ => Err(BuckyError::new(
1268                        BuckyErrorCode::ErrorState,
1269                        "syn session data on error state",
1270                    )),
1271                },
1272                _ => Err(BuckyError::new(
1273                    BuckyErrorCode::ErrorState,
1274                    "syn session data on error state",
1275                )),
1276            }?;
1277            builder.on_package(pkg, None)
1278        } else if pkg.is_syn_ack() {
1279            debug!("{} on package {}", self, pkg);
1280            // 传给 connector provider
1281            let handler: Box<dyn OnPackage<SessionData>> = match &*self.0.state.read().unwrap() {
1282                StreamStateImpl::Connecting(connecting, _) => match connecting {
1283                    StreamConnectingState::Connect(connector) => match &connector.state {
1284                        connector::State::Package(package_provider) => {
1285                            Ok(Box::new(package_provider.as_ref().clone())
1286                                as Box<dyn OnPackage<SessionData>>)
1287                        }
1288                        connector::State::Builder(builder_provider) => {
1289                            Ok(Box::new(builder_provider.as_ref().clone())
1290                                as Box<dyn OnPackage<SessionData>>)
1291                        }
1292                        _ => unreachable!(),
1293                    },
1294                    _ => Err(BuckyError::new(
1295                        BuckyErrorCode::ErrorState,
1296                        "syn ack session data on error state",
1297                    )),
1298                },
1299                StreamStateImpl::Establish(state, _) => {
1300                    match state.provider.clone_as_package_handler() {
1301                        Some(h) => Ok(h),
1302                        None => Err(BuckyError::new(
1303                            BuckyErrorCode::InternalError,
1304                            "clone handler failed",
1305                        )),
1306                    }
1307                }
1308                _ => Err(BuckyError::new(
1309                    BuckyErrorCode::ErrorState,
1310                    "syn session data on error state",
1311                )),
1312            }?;
1313            handler.on_package(pkg, None)
1314        } else {
1315            trace!("{} on package {}", self, pkg);
1316            //进读锁转给provider
1317            let opt_handler: Option<Box<dyn OnPackage<SessionData>>> =
1318                match &*self.0.state.read().unwrap() {
1319                    StreamStateImpl::Establish(state, _) => state.provider.clone_as_package_handler(),
1320                    StreamStateImpl::Connecting(connecting, _) => match connecting {
1321                        StreamConnectingState::Connect(_) => None,
1322                        StreamConnectingState::Accept(acceptor) => {
1323                            Some(Box::new(acceptor.builder.clone()))
1324                        }
1325                    },
1326                    StreamStateImpl::Closing(state, _) => {
1327                        state.provider.clone_as_package_handler()
1328                        // unimplemented!()
1329                    }
1330                    _ => None,
1331                };
1332            match opt_handler {
1333                Some(provider) => provider.on_package(pkg, None),
1334                None => Ok(OnPackageResult::Handled),
1335            }
1336        }
1337    }
1338}
1339
1340impl OnPackage<TcpSynConnection> for StreamContainer {
1341    fn on_package(
1342        &self,
1343        pkg: &TcpSynConnection,
1344        _: Option<()>,
1345    ) -> BuckyResult<OnPackageResult> {
1346        debug!("{} on package {}", self, pkg);
1347        assert_eq!(pkg.reverse_endpoint.is_some(), true);
1348        // syn tcp 直接转给builder
1349        let state = &*self.0.state.read().unwrap();
1350        let builder = match state {
1351            StreamStateImpl::Connecting(connecting, _) => match connecting {
1352                StreamConnectingState::Accept(acceptor) => Ok(acceptor.builder.clone()),
1353                _ => Err(BuckyError::new(
1354                    BuckyErrorCode::ErrorState,
1355                    "tcp syn connection on error state",
1356                )),
1357            },
1358            _ => Err(BuckyError::new(
1359                BuckyErrorCode::ErrorState,
1360                "tcp syn connection on error state",
1361            )),
1362        }?;
1363        builder.on_package(pkg, None)
1364    }
1365}
1366
1367impl OnPackage<TcpAckConnection, tcp::AcceptInterface> for StreamContainer {
1368    fn on_package(
1369        &self,
1370        pkg: &TcpAckConnection,
1371        interface: tcp::AcceptInterface,
1372    ) -> BuckyResult<OnPackageResult> {
1373        debug!("{} on package {} from {}", self, pkg, interface);
1374        let opt_handler = {
1375            let state = &*self.0.state.read().unwrap();
1376            match state {
1377                StreamStateImpl::Connecting(connecting, _) => {
1378                    match connecting {
1379                        StreamConnectingState::Connect(connector) => {
1380                            match &connector.state {
1381                                connector::State::Builder(builder) => {
1382                                    Some(Box::new(builder.as_ref().clone())
1383                                        as Box<
1384                                            dyn OnPackage<TcpAckConnection, tcp::AcceptInterface>,
1385                                        >)
1386                                }
1387                                connector::State::Reverse(reverse) => {
1388                                    if reverse.action.local().eq(interface.local())
1389                                    /*&& reverse.action.remote().eq(interface.remote())*/
1390                                    {
1391                                        Some(Box::new(reverse.action.clone())
1392                                            as Box<
1393                                                dyn OnPackage<
1394                                                    TcpAckConnection,
1395                                                    tcp::AcceptInterface,
1396                                                >,
1397                                            >)
1398                                    } else {
1399                                        debug!(
1400                                            "{} ignore incoming stream {} for local is {}",
1401                                            self,
1402                                            interface,
1403                                            reverse.action.local()
1404                                        );
1405                                        None
1406                                    }
1407                                }
1408                                _ => None,
1409                            }
1410                        }
1411                        _ => None,
1412                    }
1413                }
1414                _ => None,
1415            }
1416        };
1417        opt_handler
1418            .ok_or_else(|| {
1419                BuckyError::new(
1420                    BuckyErrorCode::ErrorState,
1421                    "tcp ack connection on error state",
1422                )
1423            })
1424            .and_then(|handler| handler.on_package(pkg, interface.clone()))
1425            .map_err(|err| {
1426                let stream = self.clone();
1427                task::spawn(async move {
1428                    let ack_ack_stream = stream.ack_ack_tcp_stream(TCP_ACK_CONNECTION_RESULT_REFUSED);
1429                    let _ = match interface
1430                        .confirm_accept(vec![DynamicPackage::from(ack_ack_stream)])
1431                        .await
1432                    {
1433                        Ok(_) => {
1434                            debug!(
1435                                "{} confirm {} with refuse tcp connection ",
1436                                stream,
1437                                interface
1438                            );
1439                        }
1440                        Err(e) => {
1441                            warn!(
1442                                "{} confirm {} with tcp ack ack connection failed for {}",
1443                                stream,
1444                                interface,
1445                                e
1446                            );
1447                            let tunnel = stream.tunnel()
1448                                .ok_or_else(|| BuckyError::new(BuckyErrorCode::ErrorState, "stream's closed"))
1449                                .and_then(|tunnel| tunnel.create_tunnel::<tunnel::tcp::Tunnel>(
1450                                    EndpointPair::from((
1451                                        *interface.local(),
1452                                        Endpoint::default_tcp(interface.local()),
1453                                    )),
1454                                    ProxyType::None,
1455                                ));
1456                            if let Ok((tunnel, _)) = tunnel {
1457                                tunnel.mark_dead(tunnel.state());
1458                            }
1459                        }
1460                    };
1461                });
1462                err
1463            })
1464    }
1465}
1466
1467struct StreamGuardImpl(StreamContainer);
1468
1469impl Drop for StreamGuardImpl {
1470    fn drop(&mut self) {
1471        debug!("{} droped and will closed", self.0);
1472
1473        let _ = self.0.shutdown(Shutdown::Both);
1474    }
1475}
1476
1477#[derive(Clone)]
1478pub struct StreamGuard(Arc<StreamGuardImpl>);
1479
1480impl fmt::Display for StreamGuard {
1481    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1482        write!(f, "StreamGuard {{stream:{}}}", (*self.0).0)
1483    }
1484}
1485
1486impl Deref for StreamGuard {
1487    type Target = StreamContainer;
1488    fn deref(&self) -> &StreamContainer {
1489        &(*self.0).0
1490    }
1491}
1492
1493impl From<StreamContainer> for StreamGuard {
1494    fn from(stream: StreamContainer) -> Self {
1495        Self(Arc::new(StreamGuardImpl(stream)))
1496    }
1497}
1498
1499impl StreamGuard {
1500    pub fn display_ref_count(&self) {
1501        info!(
1502            "bdt stream ref counts: seq={:?}, impl=({}, {}), container=({},{})",
1503            self.sequence(),
1504            Arc::strong_count(&self.0),
1505            Arc::weak_count(&self.0),
1506            Arc::strong_count(&self.0 .0 .0),
1507            Arc::weak_count(&self.0 .0 .0)
1508        );
1509    }
1510}
1511
1512impl Read for StreamGuard {
1513    fn poll_read(
1514        self: Pin<&mut Self>,
1515        cx: &mut Context<'_>,
1516        buf: &mut [u8],
1517    ) -> Poll<std::io::Result<usize>> {
1518        Pin::new(&mut &*self).poll_read(cx, buf)
1519    }
1520
1521    fn poll_read_vectored(
1522        self: Pin<&mut Self>,
1523        cx: &mut Context<'_>,
1524        bufs: &mut [std::io::IoSliceMut<'_>],
1525    ) -> Poll<std::io::Result<usize>> {
1526        Pin::new(&mut &*self).poll_read_vectored(cx, bufs)
1527    }
1528}
1529
1530impl Read for &StreamGuard {
1531    fn poll_read(
1532        self: Pin<&mut Self>,
1533        cx: &mut Context<'_>,
1534        buf: &mut [u8],
1535    ) -> Poll<std::io::Result<usize>> {
1536        let guard_impl = self.0.clone();
1537        let container = &guard_impl.0;
1538        container.poll_read(cx, buf)
1539    }
1540}
1541
1542impl Write for StreamGuard {
1543    fn poll_write(
1544        self: Pin<&mut Self>,
1545        cx: &mut Context<'_>,
1546        buf: &[u8],
1547    ) -> Poll<std::io::Result<usize>> {
1548        Pin::new(&mut &*self).poll_write(cx, buf).map_err(|e| {
1549            error!("stream guard poll_write error: {}", e);
1550            e
1551        })
1552    }
1553
1554    fn poll_write_vectored(
1555        self: Pin<&mut Self>,
1556        cx: &mut Context<'_>,
1557        bufs: &[std::io::IoSlice<'_>],
1558    ) -> Poll<std::io::Result<usize>> {
1559        Pin::new(&mut &*self)
1560            .poll_write_vectored(cx, bufs)
1561            .map_err(|e| {
1562                error!("{} poll_write_vectored error: {}", (*self), e);
1563                e
1564            })
1565    }
1566
1567    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1568        Pin::new(&mut &*self).poll_flush(cx).map_err(|e| {
1569            error!("{} poll_flush error: {}", (*self), e);
1570            e
1571        })
1572    }
1573
1574    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1575        Pin::new(&mut &*self).poll_close(cx).map_err(|e| {
1576            error!("{} poll_close error: {}", (*self), e);
1577            e
1578        })
1579    }
1580}
1581
1582impl Write for &StreamGuard {
1583    fn poll_write(
1584        self: Pin<&mut Self>,
1585        cx: &mut Context<'_>,
1586        buf: &[u8],
1587    ) -> Poll<std::io::Result<usize>> {
1588        let guard_impl = self.0.clone();
1589        let container = &guard_impl.0;
1590        container.poll_write(cx, buf).map_err(|e| {
1591            error!("{} poll_write error: {}", (*self), e);
1592            e
1593        })
1594    }
1595
1596    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1597        let guard_impl = self.0.clone();
1598        let container = &guard_impl.0;
1599        container.poll_flush(cx).map_err(|e| {
1600            error!("{} poll_flush error: {}", (*self), e);
1601            e
1602        })
1603    }
1604
1605    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1606        let guard_impl = self.0.clone();
1607        let container = &guard_impl.0;
1608        container.poll_close(cx).map_err(|e| {
1609            error!("{} poll_close error: {}", (*self), e);
1610            e
1611        })
1612    }
1613}