async_codec_lite/framed/
mod.rs

1mod inner;
2mod read;
3mod write;
4
5pub use self::{read::FramedRead, write::FramedWrite};
6
7use self::inner::{FramedInner, RWFrames, ReadFrame, WriteFrame};
8use crate::{
9    codec::{Decoder, Encoder},
10    error::Error,
11};
12pub use bytes::{Bytes, BytesMut};
13use futures_core::Stream;
14use futures_io::{AsyncRead, AsyncWrite};
15use futures_sink::Sink;
16use pin_project_lite::pin_project;
17use std::{
18    fmt,
19    pin::Pin,
20    task::{Context, Poll},
21};
22
23pin_project! {
24    pub struct Framed<T, U> {
25        #[pin]
26        inner: FramedInner<T, U, RWFrames>
27    }
28}
29
30impl<T, U> Framed<T, U> {
31    pub fn new(inner: T, codec: U) -> Framed<T, U> {
32        Framed {
33            inner: FramedInner {
34                inner,
35                codec,
36                state: Default::default(),
37            },
38        }
39    }
40
41    pub fn with_capacity(inner: T, codec: U, capacity: usize) -> Framed<T, U> {
42        Framed {
43            inner: FramedInner {
44                inner,
45                codec,
46                state: RWFrames {
47                    read: ReadFrame {
48                        buffer: BytesMut::with_capacity(capacity),
49                        eof: false,
50                        has_errored: false,
51                        is_readable: false,
52                    },
53                    write: WriteFrame::default(),
54                },
55            },
56        }
57    }
58
59    pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
60        Framed {
61            inner: FramedInner {
62                inner: parts.io,
63                codec: parts.codec,
64                state: RWFrames {
65                    read: parts.read_buf.into(),
66                    write: parts.write_buf.into(),
67                },
68            },
69        }
70    }
71
72    pub fn get_ref(&self) -> &T {
73        &self.inner.inner
74    }
75
76    pub fn get_mut(&mut self) -> &mut T {
77        &mut self.inner.inner
78    }
79
80    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
81        self.project().inner.project().inner
82    }
83
84    pub fn codec(&self) -> &U {
85        &self.inner.codec
86    }
87
88    pub fn codec_mut(&mut self) -> &mut U {
89        &mut self.inner.codec
90    }
91
92    pub fn read_buffer(&self) -> &BytesMut {
93        &self.inner.state.read.buffer
94    }
95
96    pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
97        &mut self.inner.state.read.buffer
98    }
99
100    pub fn into_inner(self) -> T {
101        self.inner.inner
102    }
103
104    pub fn into_parts(self) -> FramedParts<T, U> {
105        FramedParts {
106            io: self.inner.inner,
107            codec: self.inner.codec,
108            read_buf: self.inner.state.read.buffer,
109            write_buf: self.inner.state.write.buffer,
110            _priv: (),
111        }
112    }
113}
114
115impl<T, U> Stream for Framed<T, U>
116where
117    T: AsyncRead,
118    U: Decoder,
119{
120    type Item = Result<U::Item, Error<U::Error>>;
121
122    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
123        self.project().inner.poll_next(cx)
124    }
125}
126
127impl<T, U> Sink<U::Item> for Framed<T, U>
128where
129    T: AsyncWrite,
130    U: Encoder,
131{
132    type Error = Error<U::Error>;
133
134    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
135        self.project().inner.poll_ready(cx)
136    }
137
138    fn start_send(self: Pin<&mut Self>, item: U::Item) -> Result<(), Self::Error> {
139        self.project().inner.start_send(item)
140    }
141
142    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
143        self.project().inner.poll_flush(cx)
144    }
145
146    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
147        self.project().inner.poll_close(cx)
148    }
149}
150
151impl<T, U> fmt::Debug for Framed<T, U>
152where
153    T: fmt::Debug,
154    U: fmt::Debug,
155{
156    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
157        f.debug_struct("Framed")
158            .field("io", self.get_ref())
159            .field("codec", self.codec())
160            .finish()
161    }
162}
163
164#[derive(Debug)]
165#[allow(clippy::manual_non_exhaustive)]
166pub struct FramedParts<T, U> {
167    pub io: T,
168    pub codec: U,
169    pub read_buf: BytesMut,
170    pub write_buf: BytesMut,
171    _priv: (),
172}
173
174impl<T, U> FramedParts<T, U> {
175    pub fn new<I>(io: T, codec: U) -> FramedParts<T, U>
176    where
177        U: Encoder,
178    {
179        FramedParts {
180            io,
181            codec,
182            read_buf: BytesMut::new(),
183            write_buf: BytesMut::new(),
184            _priv: (),
185        }
186    }
187}