asynchronous_codec/
framed.rs1use super::framed_read::{framed_read_2, FramedRead2};
2use super::framed_write::{framed_write_2, FramedWrite2};
3use super::fuse::Fuse;
4use super::{Decoder, Encoder};
5use bytes::BytesMut;
6use futures_sink::Sink;
7use futures_util::io::{AsyncRead, AsyncWrite};
8use futures_util::stream::{Stream, TryStreamExt};
9use pin_project_lite::pin_project;
10use std::marker::Unpin;
11use std::ops::{Deref, DerefMut};
12use std::pin::Pin;
13use std::task::{Context, Poll};
14
15pin_project! {
16 #[derive(Debug)]
41 pub struct Framed<T, U> {
42 #[pin]
43 inner: FramedRead2<FramedWrite2<Fuse<T, U>>>,
44 }
45}
46
47impl<T, U> Deref for Framed<T, U> {
48 type Target = T;
49
50 fn deref(&self) -> &T {
51 &self.inner
52 }
53}
54
55impl<T, U> DerefMut for Framed<T, U> {
56 fn deref_mut(&mut self) -> &mut T {
57 &mut self.inner
58 }
59}
60
61impl<T, U> Framed<T, U>
62where
63 T: AsyncRead + AsyncWrite,
64 U: Decoder + Encoder,
65{
66 pub fn new(inner: T, codec: U) -> Self {
69 Self {
70 inner: framed_read_2(framed_write_2(Fuse::new(inner, codec), None), None),
71 }
72 }
73
74 pub fn from_parts(
78 FramedParts {
79 io,
80 codec,
81 write_buffer,
82 read_buffer,
83 ..
84 }: FramedParts<T, U>,
85 ) -> Self {
86 let framed_write = framed_write_2(Fuse::new(io, codec), Some(write_buffer));
87 let framed_read = framed_read_2(framed_write, Some(read_buffer));
88 Self { inner: framed_read }
89 }
90
91 pub fn into_parts(self) -> FramedParts<T, U> {
96 let (framed_write, read_buffer) = self.inner.into_parts();
97 let (fuse, write_buffer) = framed_write.into_parts();
98 FramedParts {
99 io: fuse.t,
100 codec: fuse.u,
101 read_buffer,
102 write_buffer,
103 _priv: (),
104 }
105 }
106
107 pub fn into_inner(self) -> T {
113 self.into_parts().io
114 }
115
116 pub fn codec(&self) -> &U {
122 &self.inner.u
123 }
124
125 pub fn codec_mut(&mut self) -> &mut U {
131 &mut self.inner.u
132 }
133
134 pub fn read_buffer(&self) -> &BytesMut {
136 self.inner.buffer()
137 }
138
139 pub fn send_high_water_mark(&self) -> usize {
143 self.inner.high_water_mark
144 }
145
146 pub fn set_send_high_water_mark(&mut self, hwm: usize) {
150 self.inner.high_water_mark = hwm;
151 }
152}
153
154impl<T, U> Stream for Framed<T, U>
155where
156 T: AsyncRead + Unpin,
157 U: Decoder,
158{
159 type Item = Result<U::Item, U::Error>;
160
161 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
162 self.inner.try_poll_next_unpin(cx)
163 }
164}
165
166impl<T, U> Sink<U::Item<'_>> for Framed<T, U>
167where
168 T: AsyncWrite + Unpin,
169 U: Encoder,
170{
171 type Error = U::Error;
172
173 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
174 self.project().inner.poll_ready(cx)
175 }
176 fn start_send(self: Pin<&mut Self>, item: U::Item<'_>) -> Result<(), Self::Error> {
177 self.project().inner.start_send(item)
178 }
179 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
180 self.project().inner.poll_flush(cx)
181 }
182 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
183 self.project().inner.poll_close(cx)
184 }
185}
186
187pub struct FramedParts<T, U> {
189 pub io: T,
191 pub codec: U,
193 pub read_buffer: BytesMut,
196 pub write_buffer: BytesMut,
199 _priv: (),
201}
202
203impl<T, U> FramedParts<T, U> {
204 pub fn map_codec<V, F>(self, f: F) -> FramedParts<T, V>
206 where
207 F: FnOnce(U) -> V,
208 {
209 FramedParts {
210 io: self.io,
211 codec: f(self.codec),
212 read_buffer: self.read_buffer,
213 write_buffer: self.write_buffer,
214 _priv: (),
215 }
216 }
217}