actix_ioframe/
connect.rs

1use std::marker::PhantomData;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
6use futures::Stream;
7
8use crate::sink::Sink;
9
10pub struct Connect<Io, Codec, St = ()>
11where
12    Codec: Encoder + Decoder,
13{
14    io: Io,
15    sink: Sink<<Codec as Encoder>::Item>,
16    _t: PhantomData<(St, Codec)>,
17}
18
19impl<Io, Codec> Connect<Io, Codec>
20where
21    Io: AsyncRead + AsyncWrite,
22    Codec: Encoder + Decoder,
23{
24    pub(crate) fn new(io: Io, sink: Sink<<Codec as Encoder>::Item>) -> Self {
25        Self {
26            io,
27            sink,
28            _t: PhantomData,
29        }
30    }
31
32    pub fn codec(self, codec: Codec) -> ConnectResult<Io, (), Codec> {
33        ConnectResult {
34            state: (),
35            sink: self.sink,
36            framed: Framed::new(self.io, codec),
37        }
38    }
39}
40
41#[pin_project::pin_project]
42pub struct ConnectResult<Io, St, Codec: Encoder + Decoder> {
43    pub(crate) state: St,
44    pub(crate) framed: Framed<Io, Codec>,
45    pub(crate) sink: Sink<<Codec as Encoder>::Item>,
46}
47
48impl<Io, St, Codec: Encoder + Decoder> ConnectResult<Io, St, Codec> {
49    #[inline]
50    pub fn sink(&self) -> &Sink<<Codec as Encoder>::Item> {
51        &self.sink
52    }
53
54    #[inline]
55    pub fn get_ref(&self) -> &Io {
56        self.framed.get_ref()
57    }
58
59    #[inline]
60    pub fn get_mut(&mut self) -> &mut Io {
61        self.framed.get_mut()
62    }
63
64    #[inline]
65    pub fn state<S>(self, state: S) -> ConnectResult<Io, S, Codec> {
66        ConnectResult {
67            state,
68            framed: self.framed,
69            sink: self.sink,
70        }
71    }
72}
73
74impl<Io, St, Codec> Stream for ConnectResult<Io, St, Codec>
75where
76    Io: AsyncRead + AsyncWrite,
77    Codec: Encoder + Decoder,
78{
79    type Item = Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>;
80
81    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
82        self.project().framed.next_item(cx)
83    }
84}
85
86impl<Io, St, Codec> futures::Sink<<Codec as Encoder>::Item> for ConnectResult<Io, St, Codec>
87where
88    Io: AsyncRead + AsyncWrite,
89    Codec: Encoder + Decoder,
90{
91    type Error = <Codec as Encoder>::Error;
92
93    fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
94        if self.framed.is_write_ready() {
95            Poll::Ready(Ok(()))
96        } else {
97            Poll::Pending
98        }
99    }
100
101    fn start_send(
102        self: Pin<&mut Self>,
103        item: <Codec as Encoder>::Item,
104    ) -> Result<(), Self::Error> {
105        self.project().framed.write(item)
106    }
107
108    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
109        self.get_mut().framed.flush(cx)
110    }
111
112    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
113        self.get_mut().framed.close(cx)
114    }
115}