1use std::marker::PhantomData;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
6use futures::Stream;
7
8use crate::sink::Sink;
9
10pub struct Connect<Io, Codec, St = ()>
11where
12 Codec: Encoder + Decoder,
13{
14 io: Io,
15 sink: Sink<<Codec as Encoder>::Item>,
16 _t: PhantomData<(St, Codec)>,
17}
18
19impl<Io, Codec> Connect<Io, Codec>
20where
21 Io: AsyncRead + AsyncWrite,
22 Codec: Encoder + Decoder,
23{
24 pub(crate) fn new(io: Io, sink: Sink<<Codec as Encoder>::Item>) -> Self {
25 Self {
26 io,
27 sink,
28 _t: PhantomData,
29 }
30 }
31
32 pub fn codec(self, codec: Codec) -> ConnectResult<Io, (), Codec> {
33 ConnectResult {
34 state: (),
35 sink: self.sink,
36 framed: Framed::new(self.io, codec),
37 }
38 }
39}
40
41#[pin_project::pin_project]
42pub struct ConnectResult<Io, St, Codec: Encoder + Decoder> {
43 pub(crate) state: St,
44 pub(crate) framed: Framed<Io, Codec>,
45 pub(crate) sink: Sink<<Codec as Encoder>::Item>,
46}
47
48impl<Io, St, Codec: Encoder + Decoder> ConnectResult<Io, St, Codec> {
49 #[inline]
50 pub fn sink(&self) -> &Sink<<Codec as Encoder>::Item> {
51 &self.sink
52 }
53
54 #[inline]
55 pub fn get_ref(&self) -> &Io {
56 self.framed.get_ref()
57 }
58
59 #[inline]
60 pub fn get_mut(&mut self) -> &mut Io {
61 self.framed.get_mut()
62 }
63
64 #[inline]
65 pub fn state<S>(self, state: S) -> ConnectResult<Io, S, Codec> {
66 ConnectResult {
67 state,
68 framed: self.framed,
69 sink: self.sink,
70 }
71 }
72}
73
74impl<Io, St, Codec> Stream for ConnectResult<Io, St, Codec>
75where
76 Io: AsyncRead + AsyncWrite,
77 Codec: Encoder + Decoder,
78{
79 type Item = Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>;
80
81 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
82 self.project().framed.next_item(cx)
83 }
84}
85
86impl<Io, St, Codec> futures::Sink<<Codec as Encoder>::Item> for ConnectResult<Io, St, Codec>
87where
88 Io: AsyncRead + AsyncWrite,
89 Codec: Encoder + Decoder,
90{
91 type Error = <Codec as Encoder>::Error;
92
93 fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
94 if self.framed.is_write_ready() {
95 Poll::Ready(Ok(()))
96 } else {
97 Poll::Pending
98 }
99 }
100
101 fn start_send(
102 self: Pin<&mut Self>,
103 item: <Codec as Encoder>::Item,
104 ) -> Result<(), Self::Error> {
105 self.project().framed.write(item)
106 }
107
108 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
109 self.get_mut().framed.flush(cx)
110 }
111
112 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
113 self.get_mut().framed.close(cx)
114 }
115}