actori_ioframe/
connect.rs1use std::marker::PhantomData;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use actori_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
6use actori_utils::mpsc::Receiver;
7use futures::Stream;
8
9pub struct Connect<Io, Codec>
10where
11 Codec: Encoder + Decoder,
12{
13 io: Io,
14 _t: PhantomData<Codec>,
15}
16
17impl<Io, Codec> Connect<Io, Codec>
18where
19 Io: AsyncRead + AsyncWrite,
20 Codec: Encoder + Decoder,
21{
22 pub(crate) fn new(io: Io) -> Self {
23 Self {
24 io,
25 _t: PhantomData,
26 }
27 }
28
29 pub fn codec(
30 self,
31 codec: Codec,
32 ) -> ConnectResult<Io, (), Codec, Receiver<<Codec as Encoder>::Item>> {
33 ConnectResult {
34 state: (),
35 out: None,
36 framed: Framed::new(self.io, codec),
37 }
38 }
39}
40
41#[pin_project::pin_project]
42pub struct ConnectResult<Io, St, Codec: Encoder + Decoder, Out> {
43 pub(crate) state: St,
44 pub(crate) out: Option<Out>,
45 pub(crate) framed: Framed<Io, Codec>,
46}
47
48impl<Io, St, Codec: Encoder + Decoder, Out: Unpin> ConnectResult<Io, St, Codec, Out> {
49 #[inline]
50 pub fn get_ref(&self) -> &Io {
51 self.framed.get_ref()
52 }
53
54 #[inline]
55 pub fn get_mut(&mut self) -> &mut Io {
56 self.framed.get_mut()
57 }
58
59 pub fn out<U>(self, out: U) -> ConnectResult<Io, St, Codec, U>
60 where
61 U: Stream<Item = <Codec as Encoder>::Item> + Unpin,
62 {
63 ConnectResult {
64 state: self.state,
65 framed: self.framed,
66 out: Some(out),
67 }
68 }
69
70 #[inline]
71 pub fn state<S>(self, state: S) -> ConnectResult<Io, S, Codec, Out> {
72 ConnectResult {
73 state,
74 framed: self.framed,
75 out: self.out,
76 }
77 }
78}
79
80impl<Io, St, Codec, Out> Stream for ConnectResult<Io, St, Codec, Out>
81where
82 Io: AsyncRead + AsyncWrite,
83 Codec: Encoder + Decoder,
84{
85 type Item = Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>;
86
87 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
88 self.project().framed.next_item(cx)
89 }
90}
91
92impl<Io, St, Codec, Out> futures::Sink<<Codec as Encoder>::Item>
93 for ConnectResult<Io, St, Codec, Out>
94where
95 Io: AsyncRead + AsyncWrite,
96 Codec: Encoder + Decoder,
97{
98 type Error = <Codec as Encoder>::Error;
99
100 fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
101 if self.framed.is_write_ready() {
102 Poll::Ready(Ok(()))
103 } else {
104 Poll::Pending
105 }
106 }
107
108 fn start_send(
109 self: Pin<&mut Self>,
110 item: <Codec as Encoder>::Item,
111 ) -> Result<(), Self::Error> {
112 self.project().framed.write(item)
113 }
114
115 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
116 self.get_mut().framed.flush(cx)
117 }
118
119 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
120 self.get_mut().framed.close(cx)
121 }
122}