1mod inner;
2mod read;
3mod write;
4
5pub use self::{read::FramedRead, write::FramedWrite};
6
7use self::inner::{FramedInner, RWFrames, ReadFrame, WriteFrame};
8use crate::{
9 codec::{Decoder, Encoder},
10 error::Error,
11};
12pub use bytes::{Bytes, BytesMut};
13use futures_core::Stream;
14use futures_io::{AsyncRead, AsyncWrite};
15use futures_sink::Sink;
16use pin_project_lite::pin_project;
17use std::{
18 fmt,
19 pin::Pin,
20 task::{Context, Poll},
21};
22
23pin_project! {
24 pub struct Framed<T, U> {
25 #[pin]
26 inner: FramedInner<T, U, RWFrames>
27 }
28}
29
30impl<T, U> Framed<T, U> {
31 pub fn new(inner: T, codec: U) -> Framed<T, U> {
32 Framed {
33 inner: FramedInner {
34 inner,
35 codec,
36 state: Default::default(),
37 },
38 }
39 }
40
41 pub fn with_capacity(inner: T, codec: U, capacity: usize) -> Framed<T, U> {
42 Framed {
43 inner: FramedInner {
44 inner,
45 codec,
46 state: RWFrames {
47 read: ReadFrame {
48 buffer: BytesMut::with_capacity(capacity),
49 eof: false,
50 has_errored: false,
51 is_readable: false,
52 },
53 write: WriteFrame::default(),
54 },
55 },
56 }
57 }
58
59 pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
60 Framed {
61 inner: FramedInner {
62 inner: parts.io,
63 codec: parts.codec,
64 state: RWFrames {
65 read: parts.read_buf.into(),
66 write: parts.write_buf.into(),
67 },
68 },
69 }
70 }
71
72 pub fn get_ref(&self) -> &T {
73 &self.inner.inner
74 }
75
76 pub fn get_mut(&mut self) -> &mut T {
77 &mut self.inner.inner
78 }
79
80 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
81 self.project().inner.project().inner
82 }
83
84 pub fn codec(&self) -> &U {
85 &self.inner.codec
86 }
87
88 pub fn codec_mut(&mut self) -> &mut U {
89 &mut self.inner.codec
90 }
91
92 pub fn read_buffer(&self) -> &BytesMut {
93 &self.inner.state.read.buffer
94 }
95
96 pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
97 &mut self.inner.state.read.buffer
98 }
99
100 pub fn into_inner(self) -> T {
101 self.inner.inner
102 }
103
104 pub fn into_parts(self) -> FramedParts<T, U> {
105 FramedParts {
106 io: self.inner.inner,
107 codec: self.inner.codec,
108 read_buf: self.inner.state.read.buffer,
109 write_buf: self.inner.state.write.buffer,
110 _priv: (),
111 }
112 }
113}
114
115impl<T, U> Stream for Framed<T, U>
116where
117 T: AsyncRead,
118 U: Decoder,
119{
120 type Item = Result<U::Item, Error<U::Error>>;
121
122 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
123 self.project().inner.poll_next(cx)
124 }
125}
126
127impl<T, U> Sink<U::Item> for Framed<T, U>
128where
129 T: AsyncWrite,
130 U: Encoder,
131{
132 type Error = Error<U::Error>;
133
134 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
135 self.project().inner.poll_ready(cx)
136 }
137
138 fn start_send(self: Pin<&mut Self>, item: U::Item) -> Result<(), Self::Error> {
139 self.project().inner.start_send(item)
140 }
141
142 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
143 self.project().inner.poll_flush(cx)
144 }
145
146 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
147 self.project().inner.poll_close(cx)
148 }
149}
150
151impl<T, U> fmt::Debug for Framed<T, U>
152where
153 T: fmt::Debug,
154 U: fmt::Debug,
155{
156 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
157 f.debug_struct("Framed")
158 .field("io", self.get_ref())
159 .field("codec", self.codec())
160 .finish()
161 }
162}
163
164#[derive(Debug)]
165#[allow(clippy::manual_non_exhaustive)]
166pub struct FramedParts<T, U> {
167 pub io: T,
168 pub codec: U,
169 pub read_buf: BytesMut,
170 pub write_buf: BytesMut,
171 _priv: (),
172}
173
174impl<T, U> FramedParts<T, U> {
175 pub fn new<I>(io: T, codec: U) -> FramedParts<T, U>
176 where
177 U: Encoder,
178 {
179 FramedParts {
180 io,
181 codec,
182 read_buf: BytesMut::new(),
183 write_buf: BytesMut::new(),
184 _priv: (),
185 }
186 }
187}