actori_ioframe/
connect.rs

1use std::marker::PhantomData;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use actori_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
6use actori_utils::mpsc::Receiver;
7use futures::Stream;
8
9pub struct Connect<Io, Codec>
10where
11    Codec: Encoder + Decoder,
12{
13    io: Io,
14    _t: PhantomData<Codec>,
15}
16
17impl<Io, Codec> Connect<Io, Codec>
18where
19    Io: AsyncRead + AsyncWrite,
20    Codec: Encoder + Decoder,
21{
22    pub(crate) fn new(io: Io) -> Self {
23        Self {
24            io,
25            _t: PhantomData,
26        }
27    }
28
29    pub fn codec(
30        self,
31        codec: Codec,
32    ) -> ConnectResult<Io, (), Codec, Receiver<<Codec as Encoder>::Item>> {
33        ConnectResult {
34            state: (),
35            out: None,
36            framed: Framed::new(self.io, codec),
37        }
38    }
39}
40
41#[pin_project::pin_project]
42pub struct ConnectResult<Io, St, Codec: Encoder + Decoder, Out> {
43    pub(crate) state: St,
44    pub(crate) out: Option<Out>,
45    pub(crate) framed: Framed<Io, Codec>,
46}
47
48impl<Io, St, Codec: Encoder + Decoder, Out: Unpin> ConnectResult<Io, St, Codec, Out> {
49    #[inline]
50    pub fn get_ref(&self) -> &Io {
51        self.framed.get_ref()
52    }
53
54    #[inline]
55    pub fn get_mut(&mut self) -> &mut Io {
56        self.framed.get_mut()
57    }
58
59    pub fn out<U>(self, out: U) -> ConnectResult<Io, St, Codec, U>
60    where
61        U: Stream<Item = <Codec as Encoder>::Item> + Unpin,
62    {
63        ConnectResult {
64            state: self.state,
65            framed: self.framed,
66            out: Some(out),
67        }
68    }
69
70    #[inline]
71    pub fn state<S>(self, state: S) -> ConnectResult<Io, S, Codec, Out> {
72        ConnectResult {
73            state,
74            framed: self.framed,
75            out: self.out,
76        }
77    }
78}
79
80impl<Io, St, Codec, Out> Stream for ConnectResult<Io, St, Codec, Out>
81where
82    Io: AsyncRead + AsyncWrite,
83    Codec: Encoder + Decoder,
84{
85    type Item = Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>;
86
87    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
88        self.project().framed.next_item(cx)
89    }
90}
91
92impl<Io, St, Codec, Out> futures::Sink<<Codec as Encoder>::Item>
93    for ConnectResult<Io, St, Codec, Out>
94where
95    Io: AsyncRead + AsyncWrite,
96    Codec: Encoder + Decoder,
97{
98    type Error = <Codec as Encoder>::Error;
99
100    fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
101        if self.framed.is_write_ready() {
102            Poll::Ready(Ok(()))
103        } else {
104            Poll::Pending
105        }
106    }
107
108    fn start_send(
109        self: Pin<&mut Self>,
110        item: <Codec as Encoder>::Item,
111    ) -> Result<(), Self::Error> {
112        self.project().framed.write(item)
113    }
114
115    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
116        self.get_mut().framed.flush(cx)
117    }
118
119    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
120        self.get_mut().framed.close(cx)
121    }
122}