monoio_codec/
framed.rs

1// Part of the helper functions and tests are borrowed from tokio-util.
2
3use std::{
4    borrow::{Borrow, BorrowMut},
5    fmt,
6    future::Future,
7};
8
9use bytes::{Buf, BufMut, BytesMut};
10use monoio::{
11    buf::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut, IoVecWrapperMut, SliceMut},
12    io::{sink::Sink, stream::Stream, AsyncReadRent, AsyncWriteRent, AsyncWriteRentExt},
13    BufResult,
14};
15
16use crate::{Decoded, Decoder, Encoder};
17
/// Default capacity, in bytes, of the read and write buffers. It is also the
/// size threshold used by `read`/`write` to decide whether to bypass the
/// internal buffer and operate on the caller's buffer directly.
const INITIAL_CAPACITY: usize = 8 * 1024;
/// Once the write buffer holds at least this many bytes, it is flushed before
/// more data is encoded or copied into it.
const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY;
/// Minimum number of spare bytes reserved in the read buffer before the reads
/// issued by `peek_data` and `next_with`.
const RESERVE: usize = 4096;
21
/// Shared implementation behind `Framed`, `FramedRead` and `FramedWrite`.
///
/// It bundles an IO object, a codec and a buffer state `S`. The trait impls
/// in this file require `S` to be borrowable as `ReadState` and/or
/// `WriteState` depending on the direction they implement.
pub struct FramedInner<IO, Codec, S> {
    // The wrapped IO object that bytes are read from / written to.
    io: IO,
    // The codec used by the `Stream` / `Sink` implementations.
    codec: Codec,
    // Buffer state: `ReadState`, `WriteState` or `RWState` in this file.
    state: S,
}
27
/// State of the reading half: the decoding state machine plus the bytes that
/// were read from the IO object but not yet consumed.
#[derive(Debug)]
pub struct ReadState {
    // Current position in the read state machine (see `State`).
    state: State,
    // Bytes read from the IO object and not yet consumed by a decoder or a
    // raw `read` call.
    buffer: BytesMut,
}
33
34impl ReadState {
35    #[inline]
36    fn with_capacity(capacity: usize) -> Self {
37        Self {
38            state: State::Framing(None),
39            buffer: BytesMut::with_capacity(capacity),
40        }
41    }
42}
43
44impl Default for ReadState {
45    #[inline]
46    fn default() -> Self {
47        Self::with_capacity(INITIAL_CAPACITY)
48    }
49}
50
/// States of the read state machine driven by `FramedInner::next_with`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    // Normal operation: decode from the buffer and read more data on demand.
    // The payload is an optional size hint from the decoder; while the buffer
    // is shorter than the hint, decoding is not attempted.
    Framing(Option<usize>),
    // A read returned 0 bytes; `decode_eof` is used to drain what is left.
    Pausing,
    // Nothing more to yield for now; the next call reads from the IO directly.
    Paused,
    // An error was reported; the next call yields `None` and moves to `Paused`.
    Errored,
}
58
/// State of the writing half: bytes that were encoded or copied in but not
/// yet written to the IO object.
#[derive(Debug)]
pub struct WriteState {
    // Pending outgoing bytes; emptied by a successful flush.
    buffer: BytesMut,
}
63
64impl Default for WriteState {
65    #[inline]
66    fn default() -> Self {
67        Self {
68            buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
69        }
70    }
71}
72
/// Combined read and write state used by `Framed`.
///
/// It can be borrowed as either a `ReadState` or a `WriteState`, so the
/// generic `FramedInner` code works on it for both directions.
#[derive(Debug, Default)]
pub struct RWState {
    // State of the reading half.
    read: ReadState,
    // State of the writing half.
    write: WriteState,
}
78
79impl Borrow<ReadState> for RWState {
80    #[inline]
81    fn borrow(&self) -> &ReadState {
82        &self.read
83    }
84}
85impl BorrowMut<ReadState> for RWState {
86    #[inline]
87    fn borrow_mut(&mut self) -> &mut ReadState {
88        &mut self.read
89    }
90}
91impl Borrow<WriteState> for RWState {
92    #[inline]
93    fn borrow(&self) -> &WriteState {
94        &self.write
95    }
96}
97impl BorrowMut<WriteState> for RWState {
98    #[inline]
99    fn borrow_mut(&mut self) -> &mut WriteState {
100        &mut self.write
101    }
102}
103
104impl<IO, Codec, S> FramedInner<IO, Codec, S> {
105    #[inline]
106    const fn new(io: IO, codec: Codec, state: S) -> Self {
107        Self { io, codec, state }
108    }
109
110    async fn peek_data<'a, 'b>(io: &'b mut IO, state: &'a mut S) -> std::io::Result<&'a mut [u8]>
111    where
112        IO: AsyncReadRent,
113        S: BorrowMut<ReadState>,
114    {
115        let read_state: &mut ReadState = state.borrow_mut();
116        let state = &mut read_state.state;
117        let buffer = &mut read_state.buffer;
118
119        if !buffer.is_empty() {
120            return Ok(buffer.as_mut());
121        }
122        buffer.reserve(RESERVE);
123
124        macro_rules! ok {
125            ($result: expr, $state: expr) => {
126                match $result {
127                    Ok(x) => x,
128                    Err(e) => {
129                        *$state = State::Errored;
130                        return Err(e);
131                    }
132                }
133            };
134        }
135
136        // Read data
137        let end = buffer.capacity();
138        let owned_buf = std::mem::take(buffer);
139        let owned_slice = unsafe { SliceMut::new_unchecked(owned_buf, 0, end) };
140        let (result, owned_slice) = io.read(owned_slice).await;
141        *buffer = owned_slice.into_inner();
142        let n = ok!(result, state);
143        if n == 0 {
144            *state = State::Paused;
145        }
146        Ok(buffer.as_mut())
147    }
148
    // In tokio there are 5 states. But since we use pure async here,
    // we do not need to return Pending so we don't need to save the state
    // when Pending returned. We only need to save state when return
    // `Option<Item>`.
    // We have 4 states: Framing, Pausing, Paused and Errored.
    /// Drives the read state machine until it can yield the next decoded item.
    ///
    /// Returns `Some(Ok(item))` for a decoded frame, `Some(Err(_))` when the
    /// decoder or the IO fails (the state becomes `Errored`), and `None` when
    /// there is nothing more to yield for now (end of input, or the call that
    /// follows an error).
    async fn next_with(
        io: &mut IO,
        codec: &mut Codec,
        state: &mut S,
    ) -> Option<Result<Codec::Item, Codec::Error>>
    where
        IO: AsyncReadRent,
        Codec: Decoder,
        S: BorrowMut<ReadState>,
    {
        // Unwraps a `Result`; on error records `Errored` and returns the
        // converted error to the caller.
        macro_rules! ok {
            ($result: expr, $state: expr) => {
                match $result {
                    Ok(x) => x,
                    Err(e) => {
                        *$state = State::Errored;
                        return Some(Err(e.into()));
                    }
                }
            };
        }

        let read_state: &mut ReadState = state.borrow_mut();
        let state = &mut read_state.state;
        let buffer = &mut read_state.buffer;

        loop {
            match state {
                // On framing, we will decode first. If the decoder needs more data,
                // we will do read and await it.
                // If we get an error or eof, we will transfer state.
                State::Framing(hint) => loop {
                    if !matches!(hint, Some(size) if buffer.len() < *size) && !buffer.is_empty() {
                        // If we get a Some hint and the buffer length is less than it, we do not
                        // decode. If the buffer is empty, we do not decode either.
                        *hint = match ok!(codec.decode(buffer), state) {
                            Decoded::Some(item) => {
                                // When we decoded something, we should clear the hint.
                                *hint = None;
                                return Some(Ok(item));
                            }
                            Decoded::Insufficient => None,
                            Decoded::InsufficientAtLeast(size) => Some(size),
                        };
                    }

                    // Reserve at least RESERVE bytes, or enough to satisfy the hint.
                    let reserve = match *hint {
                        Some(size) if size > buffer.len() => RESERVE.max(size - buffer.len()),
                        _ => RESERVE,
                    };
                    buffer.reserve(reserve);
                    // Compute the range occupied by the spare capacity, as offsets
                    // from the pointer returned by `write_ptr`.
                    let (begin, end) = {
                        let buffer_ptr = buffer.write_ptr();
                        let slice_to_write = buffer.chunk_mut();
                        let begin =
                            unsafe { slice_to_write.as_mut_ptr().offset_from(buffer_ptr) } as usize;
                        let end = begin + slice_to_write.len();
                        (begin, end)
                    };
                    // Lend the buffer to the read and take it back afterwards,
                    // before the result is inspected.
                    let owned_buf = std::mem::take(buffer);
                    let owned_slice = unsafe { SliceMut::new_unchecked(owned_buf, begin, end) };
                    let (result, owned_slice) = io.read(owned_slice).await;
                    *buffer = owned_slice.into_inner();
                    let n = ok!(result, state);
                    if n == 0 {
                        // End of input: leave the inner loop and continue in Pausing.
                        *state = State::Pausing;
                        break;
                    }
                },
                // On Pausing, we will loop decode_eof until None or Error.
                State::Pausing => {
                    return match ok!(codec.decode_eof(buffer), state) {
                        Decoded::Some(item) => Some(Ok(item)),
                        _ => {
                            // Buffer has no data, we can transfer to Paused.
                            *state = State::Paused;
                            None
                        }
                    };
                }
                // On Paused, we need to read directly.
                State::Paused => {
                    buffer.reserve(RESERVE);
                    // Same spare-capacity range computation as in Framing.
                    let (begin, end) = {
                        let buffer_ptr = buffer.write_ptr();
                        let slice_to_write = buffer.chunk_mut();
                        let begin =
                            unsafe { slice_to_write.as_mut_ptr().offset_from(buffer_ptr) } as usize;
                        let end = begin + slice_to_write.len();
                        (begin, end)
                    };
                    let owned_buf = std::mem::take(buffer);
                    let owned_slice = unsafe { SliceMut::new_unchecked(owned_buf, begin, end) };
                    let (result, owned_slice) = io.read(owned_slice).await;
                    *buffer = owned_slice.into_inner();
                    let n = ok!(result, state);
                    if n == 0 {
                        // still paused
                        return None;
                    }
                    // read something, then we move to framing state
                    *state = State::Framing(None);
                }
                // On Errored, we need to return None and trans to Paused.
                State::Errored => {
                    *state = State::Paused;
                    return None;
                }
            }
        }
    }
265
266    async fn flush(io: &mut IO, state: &mut S) -> std::io::Result<()>
267    where
268        IO: AsyncWriteRent,
269        S: BorrowMut<WriteState>,
270    {
271        let WriteState { buffer } = state.borrow_mut();
272        if buffer.is_empty() {
273            return Ok(());
274        }
275        // This action does not allocate.
276        let buf = std::mem::take(buffer);
277        let (result, buf) = io.write_all(buf).await;
278        *buffer = buf;
279        result?;
280        buffer.clear();
281        io.flush().await?;
282        Ok(())
283    }
284
285    #[inline]
286    async fn send_with<Item>(
287        io: &mut IO,
288        codec: &mut Codec,
289        state: &mut S,
290        item: Item,
291    ) -> Result<(), Codec::Error>
292    where
293        IO: AsyncWriteRent,
294        Codec: Encoder<Item>,
295        S: BorrowMut<WriteState>,
296    {
297        if state.borrow_mut().buffer.len() >= BACKPRESSURE_BOUNDARY {
298            Self::flush(io, state).await?;
299        }
300        codec.encode(item, &mut state.borrow_mut().buffer)?;
301        Ok(())
302    }
303}
304
305impl<IO, Codec, S> AsyncReadRent for FramedInner<IO, Codec, S>
306where
307    IO: AsyncReadRent,
308    S: BorrowMut<ReadState>,
309{
310    async fn read<T: IoBufMut>(&mut self, mut buf: T) -> BufResult<usize, T> {
311        let read_state: &mut ReadState = self.state.borrow_mut();
312        let state = &mut read_state.state;
313        let buffer = &mut read_state.buffer;
314
315        if buf.bytes_total() == 0 {
316            return (Ok(0), buf);
317        }
318
319        // Copy existing data if there is some.
320        let to_copy = buf.bytes_total().min(buffer.len());
321        if to_copy != 0 {
322            unsafe {
323                buf.write_ptr()
324                    .copy_from_nonoverlapping(buffer.as_ptr(), to_copy);
325                buf.set_init(to_copy);
326            }
327            buffer.advance(to_copy);
328            return (Ok(to_copy), buf);
329        }
330
331        // Read to buf directly if buf size is bigger than some threshold.
332        if buf.bytes_total() > INITIAL_CAPACITY {
333            let (res, buf) = self.io.read(buf).await;
334            return match res {
335                Ok(0) => {
336                    *state = State::Pausing;
337                    (Ok(0), buf)
338                }
339                Ok(n) => (Ok(n), buf),
340                Err(e) => {
341                    *state = State::Errored;
342                    (Err(e), buf)
343                }
344            };
345        }
346        // Read to inner buffer and copy to buf.
347        buffer.reserve(INITIAL_CAPACITY);
348        let owned_buffer = std::mem::take(buffer);
349        let (res, owned_buffer) = self.io.read(owned_buffer).await;
350        *buffer = owned_buffer;
351        match res {
352            Ok(0) => {
353                *state = State::Pausing;
354                return (Ok(0), buf);
355            }
356            Err(e) => {
357                *state = State::Errored;
358                return (Err(e), buf);
359            }
360            _ => (),
361        }
362        let to_copy = buf.bytes_total().min(buffer.len());
363        unsafe {
364            buf.write_ptr()
365                .copy_from_nonoverlapping(buffer.as_ptr(), to_copy);
366            buf.set_init(to_copy);
367        }
368        buffer.advance(to_copy);
369        (Ok(to_copy), buf)
370    }
371
372    async fn readv<T: IoVecBufMut>(&mut self, mut buf: T) -> BufResult<usize, T> {
373        let slice = match IoVecWrapperMut::new(buf) {
374            Ok(slice) => slice,
375            Err(buf) => return (Ok(0), buf),
376        };
377
378        let (result, slice) = self.read(slice).await;
379        buf = slice.into_inner();
380        if let Ok(n) = result {
381            unsafe { buf.set_init(n) };
382        }
383        (result, buf)
384    }
385}
386
387impl<IO, Codec, S> Stream for FramedInner<IO, Codec, S>
388where
389    IO: AsyncReadRent,
390    Codec: Decoder,
391    S: BorrowMut<ReadState>,
392{
393    type Item = Result<Codec::Item, Codec::Error>;
394
395    #[inline]
396    async fn next(&mut self) -> Option<Self::Item> {
397        Self::next_with(&mut self.io, &mut self.codec, &mut self.state).await
398    }
399}
400
401impl<IO, Codec, S> AsyncWriteRent for FramedInner<IO, Codec, S>
402where
403    IO: AsyncWriteRent,
404    S: BorrowMut<WriteState>,
405{
406    async fn write<T: monoio::buf::IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
407        let WriteState { buffer } = self.state.borrow_mut();
408        if buffer.len() >= BACKPRESSURE_BOUNDARY || buf.bytes_init() >= INITIAL_CAPACITY {
409            // flush buffer
410            if let Err(e) = AsyncWriteRent::flush(self).await {
411                return (Err(e), buf);
412            }
413            // write directly
414            return self.io.write_all(buf).await;
415        }
416        // copy to buffer
417        let cap = buffer.capacity() - buffer.len();
418        let size = buf.bytes_init().min(cap);
419        let slice = unsafe { std::slice::from_raw_parts(buf.read_ptr(), size) };
420        buffer.extend_from_slice(slice);
421        (Ok(size), buf)
422    }
423
424    #[inline]
425    async fn writev<T: monoio::buf::IoVecBuf>(&mut self, buf: T) -> BufResult<usize, T> {
426        let slice = match monoio::buf::IoVecWrapper::new(buf) {
427            Ok(slice) => slice,
428            Err(buf) => return (Ok(0), buf),
429        };
430
431        let (result, slice) = self.write(slice).await;
432        (result, slice.into_inner())
433    }
434
435    #[inline]
436    async fn flush(&mut self) -> std::io::Result<()> {
437        FramedInner::<_, Codec, _>::flush(&mut self.io, &mut self.state).await
438    }
439
440    #[inline]
441    async fn shutdown(&mut self) -> std::io::Result<()> {
442        AsyncWriteRent::flush(self).await?;
443        self.io.shutdown().await?;
444        Ok(())
445    }
446}
447
448impl<IO, Codec, S, Item> Sink<Item> for FramedInner<IO, Codec, S>
449where
450    IO: AsyncWriteRent,
451    Codec: Encoder<Item>,
452    S: BorrowMut<WriteState>,
453{
454    type Error = Codec::Error;
455
456    #[inline]
457    async fn send(&mut self, item: Item) -> Result<(), Self::Error> {
458        if self.state.borrow_mut().buffer.len() >= BACKPRESSURE_BOUNDARY {
459            FramedInner::<_, Codec, _>::flush(&mut self.io, &mut self.state).await?;
460        }
461        self.codec
462            .encode(item, &mut self.state.borrow_mut().buffer)?;
463        Ok(())
464    }
465
466    #[inline]
467    async fn flush(&mut self) -> Result<(), Self::Error> {
468        AsyncWriteRent::flush(self).await?;
469        Ok(())
470    }
471
472    #[inline]
473    async fn close(&mut self) -> Result<(), Self::Error> {
474        AsyncWriteRent::shutdown(self).await?;
475        Ok(())
476    }
477}
478
/// Bidirectional framed transport: an IO object plus a codec, with both a
/// read buffer and a write buffer (`RWState`).
pub struct Framed<IO, Codec> {
    // Shared implementation carrying the IO, the codec and both buffers.
    inner: FramedInner<IO, Codec, RWState>,
}
482
/// Read-only framed transport: an IO object plus a decoder and a read buffer
/// (`ReadState`).
pub struct FramedRead<IO, Codec> {
    // Shared implementation carrying the IO, the decoder and the read buffer.
    inner: FramedInner<IO, Codec, ReadState>,
}
486
/// Write-only framed transport: an IO object plus an encoder and a write
/// buffer (`WriteState`).
pub struct FramedWrite<IO, Codec> {
    // Shared implementation carrying the IO, the encoder and the write buffer.
    inner: FramedInner<IO, Codec, WriteState>,
}
490
491impl<IO, Codec> Framed<IO, Codec> {
492    #[inline]
493    pub fn new(io: IO, codec: Codec) -> Self {
494        Self {
495            inner: FramedInner::new(io, codec, RWState::default()),
496        }
497    }
498
499    #[inline]
500    pub fn with_capacity(io: IO, codec: Codec, capacity: usize) -> Self {
501        Self {
502            inner: FramedInner::new(
503                io,
504                codec,
505                RWState {
506                    read: ReadState::with_capacity(capacity),
507                    write: Default::default(),
508                },
509            ),
510        }
511    }
512
513    /// Returns a reference to the underlying I/O stream wrapped by
514    /// `Framed`.
515    ///
516    /// Note that care should be taken to not tamper with the underlying stream
517    /// of data coming in as it may corrupt the stream of frames otherwise
518    /// being worked with.
519    #[inline]
520    pub fn get_ref(&self) -> &IO {
521        &self.inner.io
522    }
523
524    /// Returns a mutable reference to the underlying I/O stream wrapped by
525    /// `Framed`.
526    ///
527    /// Note that care should be taken to not tamper with the underlying stream
528    /// of data coming in as it may corrupt the stream of frames otherwise
529    /// being worked with.
530    #[inline]
531    pub fn get_mut(&mut self) -> &mut IO {
532        &mut self.inner.io
533    }
534
535    /// Returns a reference to the underlying codec wrapped by
536    /// `Framed`.
537    ///
538    /// Note that care should be taken to not tamper with the underlying codec
539    /// as it may corrupt the stream of frames otherwise being worked with.
540    #[inline]
541    pub fn codec(&self) -> &Codec {
542        &self.inner.codec
543    }
544
545    /// Returns a mutable reference to the underlying codec wrapped by
546    /// `Framed`.
547    ///
548    /// Note that care should be taken to not tamper with the underlying codec
549    /// as it may corrupt the stream of frames otherwise being worked with.
550    #[inline]
551    pub fn codec_mut(&mut self) -> &mut Codec {
552        &mut self.inner.codec
553    }
554
555    /// Maps the codec `U` to `C`, preserving the read and write buffers
556    /// wrapped by `Framed`.
557    ///
558    /// Note that care should be taken to not tamper with the underlying codec
559    /// as it may corrupt the stream of frames otherwise being worked with.
560    #[inline]
561    pub fn map_codec<CodecNew, F>(self, map: F) -> Framed<IO, CodecNew>
562    where
563        F: FnOnce(Codec) -> CodecNew,
564    {
565        let FramedInner { io, codec, state } = self.inner;
566        Framed {
567            inner: FramedInner {
568                io,
569                codec: map(codec),
570                state,
571            },
572        }
573    }
574
575    /// Returns a reference to the read buffer.
576    #[inline]
577    pub fn read_buffer(&self) -> &BytesMut {
578        &self.inner.state.read.buffer
579    }
580
581    /// Returns a mutable reference to the read buffer.
582    #[inline]
583    pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
584        &mut self.inner.state.read.buffer
585    }
586
587    /// Returns io and a mutable reference to the read buffer.
588    #[inline]
589    pub fn read_state_mut(&mut self) -> (&mut IO, &mut BytesMut) {
590        (&mut self.inner.io, &mut self.inner.state.read.buffer)
591    }
592
593    /// Returns a reference to the write buffer.
594    #[inline]
595    pub fn write_buffer(&self) -> &BytesMut {
596        &self.inner.state.write.buffer
597    }
598
599    /// Returns a mutable reference to the write buffer.
600    #[inline]
601    pub fn write_buffer_mut(&mut self) -> &mut BytesMut {
602        &mut self.inner.state.write.buffer
603    }
604
605    /// Consumes the `Framed`, returning its underlying I/O stream.
606    ///
607    /// Note that care should be taken to not tamper with the underlying stream
608    /// of data coming in as it may corrupt the stream of frames otherwise
609    /// being worked with.
610    #[inline]
611    pub fn into_inner(self) -> IO {
612        self.inner.io
613    }
614
615    /// Equivalent to Stream::next but with custom codec.
616    #[inline]
617    pub async fn next_with<C: Decoder>(
618        &mut self,
619        codec: &mut C,
620    ) -> Option<Result<C::Item, C::Error>>
621    where
622        IO: AsyncReadRent,
623    {
624        FramedInner::next_with(&mut self.inner.io, codec, &mut self.inner.state).await
625    }
626
627    /// Await some new data.
628    /// Useful to do read timeout.
629    pub fn peek_data(&mut self) -> impl Future<Output = std::io::Result<&mut [u8]>>
630    where
631        IO: AsyncReadRent,
632    {
633        FramedInner::<_, Codec, _>::peek_data(&mut self.inner.io, &mut self.inner.state)
634    }
635
636    /// Equivalent to Sink::send but with custom codec.
637    #[inline]
638    pub async fn send_with<C: Encoder<Item>, Item>(
639        &mut self,
640        codec: &mut C,
641        item: Item,
642    ) -> Result<(), C::Error>
643    where
644        IO: AsyncWriteRent,
645        C: Encoder<Item>,
646    {
647        FramedInner::send_with(&mut self.inner.io, codec, &mut self.inner.state, item).await
648    }
649}
650
651impl<T, U> fmt::Debug for Framed<T, U>
652where
653    T: fmt::Debug,
654    U: fmt::Debug,
655{
656    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
657        f.debug_struct("Framed")
658            .field("io", self.get_ref())
659            .field("codec", self.codec())
660            .finish()
661    }
662}
663
664impl<IO, Codec> FramedRead<IO, Codec> {
665    pub fn new(io: IO, decoder: Codec) -> Self {
666        Self {
667            inner: FramedInner::new(io, decoder, ReadState::default()),
668        }
669    }
670
671    pub fn with_capacity(io: IO, codec: Codec, capacity: usize) -> Self {
672        Self {
673            inner: FramedInner::new(io, codec, ReadState::with_capacity(capacity)),
674        }
675    }
676
677    /// Returns a reference to the underlying I/O stream wrapped by
678    /// `FramedRead`.
679    ///
680    /// Note that care should be taken to not tamper with the underlying stream
681    /// of data coming in as it may corrupt the stream of frames otherwise
682    /// being worked with.
683    pub fn get_ref(&self) -> &IO {
684        &self.inner.io
685    }
686
687    /// Returns a mutable reference to the underlying I/O stream wrapped by
688    /// `FramedRead`.
689    ///
690    /// Note that care should be taken to not tamper with the underlying stream
691    /// of data coming in as it may corrupt the stream of frames otherwise
692    /// being worked with.
693    pub fn get_mut(&mut self) -> &mut IO {
694        &mut self.inner.io
695    }
696
697    /// Consumes the `FramedRead`, returning its underlying I/O stream.
698    ///
699    /// Note that care should be taken to not tamper with the underlying stream
700    /// of data coming in as it may corrupt the stream of frames otherwise
701    /// being worked with.
702    pub fn into_inner(self) -> IO {
703        self.inner.io
704    }
705
706    /// Returns a reference to the underlying decoder.
707    pub fn decoder(&self) -> &Codec {
708        &self.inner.codec
709    }
710
711    /// Returns a mutable reference to the underlying decoder.
712    pub fn decoder_mut(&mut self) -> &mut Codec {
713        &mut self.inner.codec
714    }
715
716    /// Maps the decoder `D` to `C`, preserving the read buffer
717    /// wrapped by `Framed`.
718    pub fn map_decoder<CodecNew, F>(self, map: F) -> FramedRead<IO, CodecNew>
719    where
720        F: FnOnce(Codec) -> CodecNew,
721    {
722        let FramedInner { io, codec, state } = self.inner;
723        FramedRead {
724            inner: FramedInner {
725                io,
726                codec: map(codec),
727                state,
728            },
729        }
730    }
731
732    /// Returns a reference to the read buffer.
733    pub fn read_buffer(&self) -> &BytesMut {
734        &self.inner.state.buffer
735    }
736
737    /// Returns a mutable reference to the read buffer.
738    pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
739        &mut self.inner.state.buffer
740    }
741
742    /// Returns io and a mutable reference to the read buffer.
743    pub fn read_state_mut(&mut self) -> (&mut IO, &mut BytesMut) {
744        (&mut self.inner.io, &mut self.inner.state.buffer)
745    }
746
747    /// Equivalent to Stream::next but with custom codec.
748    pub async fn next_with<C: Decoder>(
749        &mut self,
750        codec: &mut C,
751    ) -> Option<Result<C::Item, C::Error>>
752    where
753        IO: AsyncReadRent,
754    {
755        FramedInner::next_with(&mut self.inner.io, codec, &mut self.inner.state).await
756    }
757
758    /// Await some new data.
759    /// Useful to do read timeout.
760    pub fn peek_data(&mut self) -> impl Future<Output = std::io::Result<&mut [u8]>>
761    where
762        IO: AsyncReadRent,
763    {
764        FramedInner::<_, Codec, _>::peek_data(&mut self.inner.io, &mut self.inner.state)
765    }
766}
767
768impl<T, D> fmt::Debug for FramedRead<T, D>
769where
770    T: fmt::Debug,
771    D: fmt::Debug,
772{
773    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
774        f.debug_struct("FramedRead")
775            .field("inner", &self.get_ref())
776            .field("decoder", &self.decoder())
777            .field("state", &self.inner.state.state)
778            .field("buffer", &self.read_buffer())
779            .finish()
780    }
781}
782
783impl<IO, Codec> FramedWrite<IO, Codec> {
784    pub fn new(io: IO, encoder: Codec) -> Self {
785        Self {
786            inner: FramedInner::new(io, encoder, WriteState::default()),
787        }
788    }
789
790    /// Returns a reference to the underlying I/O stream wrapped by
791    /// `FramedWrite`.
792    ///
793    /// Note that care should be taken to not tamper with the underlying stream
794    /// of data coming in as it may corrupt the stream of frames otherwise
795    /// being worked with.
796    pub fn get_ref(&self) -> &IO {
797        &self.inner.io
798    }
799
800    /// Returns a mutable reference to the underlying I/O stream wrapped by
801    /// `FramedWrite`.
802    ///
803    /// Note that care should be taken to not tamper with the underlying stream
804    /// of data coming in as it may corrupt the stream of frames otherwise
805    /// being worked with.
806    pub fn get_mut(&mut self) -> &mut IO {
807        &mut self.inner.io
808    }
809
810    /// Consumes the `FramedWrite`, returning its underlying I/O stream.
811    ///
812    /// Note that care should be taken to not tamper with the underlying stream
813    /// of data coming in as it may corrupt the stream of frames otherwise
814    /// being worked with.
815    pub fn into_inner(self) -> IO {
816        self.inner.io
817    }
818
819    /// Returns a reference to the underlying encoder.
820    pub fn encoder(&self) -> &Codec {
821        &self.inner.codec
822    }
823
824    /// Returns a mutable reference to the underlying encoder.
825    pub fn encoder_mut(&mut self) -> &mut Codec {
826        &mut self.inner.codec
827    }
828
829    /// Maps the encoder `E` to `C`, preserving the write buffer
830    /// wrapped by `Framed`.
831    pub fn map_encoder<CodecNew, F>(self, map: F) -> FramedWrite<IO, CodecNew>
832    where
833        F: FnOnce(Codec) -> CodecNew,
834    {
835        let FramedInner { io, codec, state } = self.inner;
836        FramedWrite {
837            inner: FramedInner {
838                io,
839                codec: map(codec),
840                state,
841            },
842        }
843    }
844
845    /// Returns a reference to the write buffer.
846    pub fn write_buffer(&self) -> &BytesMut {
847        &self.inner.state.buffer
848    }
849
850    /// Returns a mutable reference to the write buffer.
851    pub fn write_buffer_mut(&mut self) -> &mut BytesMut {
852        &mut self.inner.state.buffer
853    }
854}
855
856impl<T, U> fmt::Debug for FramedWrite<T, U>
857where
858    T: fmt::Debug,
859    U: fmt::Debug,
860{
861    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
862        f.debug_struct("FramedWrite")
863            .field("inner", &self.get_ref())
864            .field("encoder", &self.encoder())
865            .field("buffer", &self.inner.state.buffer)
866            .finish()
867    }
868}
869
870impl<IO, Codec> Stream for Framed<IO, Codec>
871where
872    IO: AsyncReadRent,
873    Codec: Decoder,
874{
875    type Item = <FramedInner<IO, Codec, RWState> as Stream>::Item;
876
877    #[inline]
878    async fn next(&mut self) -> Option<Self::Item> {
879        self.inner.next().await
880    }
881}
882
883impl<IO, Codec> Stream for FramedRead<IO, Codec>
884where
885    IO: AsyncReadRent,
886    Codec: Decoder,
887{
888    type Item = <FramedInner<IO, Codec, ReadState> as Stream>::Item;
889
890    #[inline]
891    async fn next(&mut self) -> Option<Self::Item> {
892        self.inner.next().await
893    }
894}
895
896impl<IO, Codec, Item> Sink<Item> for Framed<IO, Codec>
897where
898    IO: AsyncWriteRent,
899    Codec: Encoder<Item>,
900{
901    type Error = <FramedInner<IO, Codec, RWState> as Sink<Item>>::Error;
902
903    #[inline]
904    async fn send(&mut self, item: Item) -> Result<(), Self::Error> {
905        self.inner.send(item).await
906    }
907
908    #[inline]
909    async fn flush(&mut self) -> Result<(), Self::Error> {
910        Sink::flush(&mut self.inner).await
911    }
912
913    #[inline]
914    async fn close(&mut self) -> Result<(), Self::Error> {
915        self.inner.close().await
916    }
917}
918
919impl<IO, Codec, Item> Sink<Item> for FramedWrite<IO, Codec>
920where
921    IO: AsyncWriteRent,
922    Codec: Encoder<Item>,
923{
924    type Error = <FramedInner<IO, Codec, WriteState> as Sink<Item>>::Error;
925
926    #[inline]
927    async fn send(&mut self, item: Item) -> Result<(), Self::Error> {
928        self.inner.send(item).await
929    }
930
931    #[inline]
932    async fn flush(&mut self) -> Result<(), Self::Error> {
933        Sink::flush(&mut self.inner).await
934    }
935
936    #[inline]
937    async fn close(&mut self) -> Result<(), Self::Error> {
938        self.inner.close().await
939    }
940}
941
/// A stream of frames whose decoder is supplied by the caller on each call
/// rather than stored in the stream itself.
pub trait StreamWithCodec<T> {
    /// The value produced by each call to `next_with`.
    type Item;

    /// Produces the next item using `codec`, or `None` when no item is
    /// available.
    fn next_with<'a>(&'a mut self, codec: &'a mut T) -> impl Future<Output = Option<Self::Item>>;
}
947
/// A sink of items whose encoder is supplied by the caller on each call
/// rather than stored in the sink itself.
pub trait SinkWithCodec<T, Item>
where
    T: Encoder<Item>,
{
    /// Sends `item` after encoding it with `codec`.
    fn send_with<'a>(
        &'a mut self,
        codec: &'a mut T,
        item: Item,
    ) -> impl Future<Output = Result<(), T::Error>>;

    /// Flushes the sink, reporting failures as the encoder's error type.
    fn flush(&mut self) -> impl Future<Output = Result<(), T::Error>>;
}
960
961impl<Codec: Decoder, IO: AsyncReadRent, AnyCodec> StreamWithCodec<Codec>
962    for FramedRead<IO, AnyCodec>
963{
964    type Item = Result<Codec::Item, Codec::Error>;
965
966    #[inline]
967    async fn next_with<'a>(&'a mut self, codec: &'a mut Codec) -> Option<Self::Item> {
968        FramedInner::next_with(&mut self.inner.io, codec, &mut self.inner.state).await
969    }
970}
971
972impl<Codec: Decoder, IO: AsyncReadRent, AnyCodec> StreamWithCodec<Codec> for Framed<IO, AnyCodec> {
973    type Item = Result<Codec::Item, Codec::Error>;
974
975    #[inline]
976    async fn next_with<'a>(&'a mut self, codec: &'a mut Codec) -> Option<Self::Item> {
977        FramedInner::next_with(&mut self.inner.io, codec, &mut self.inner.state).await
978    }
979}
980
981impl<Codec: Encoder<Item>, IO: AsyncWriteRent, AnyCodec, Item> SinkWithCodec<Codec, Item>
982    for FramedWrite<IO, AnyCodec>
983{
984    #[inline]
985    async fn send_with<'a>(
986        &'a mut self,
987        codec: &'a mut Codec,
988        item: Item,
989    ) -> Result<(), Codec::Error> {
990        FramedInner::send_with(&mut self.inner.io, codec, &mut self.inner.state, item).await
991    }
992
993    #[inline]
994    async fn flush(&mut self) -> Result<(), Codec::Error> {
995        FramedInner::<_, (), _>::flush(&mut self.inner.io, &mut self.inner.state)
996            .await
997            .map_err(|e| e.into())
998    }
999}
1000
1001impl<Codec: Encoder<Item>, IO: AsyncWriteRent, AnyCodec, Item> SinkWithCodec<Codec, Item>
1002    for Framed<IO, AnyCodec>
1003{
1004    #[inline]
1005    async fn send_with<'a>(
1006        &'a mut self,
1007        codec: &'a mut Codec,
1008        item: Item,
1009    ) -> Result<(), Codec::Error> {
1010        FramedInner::send_with(&mut self.inner.io, codec, &mut self.inner.state, item).await
1011    }
1012
1013    #[inline]
1014    async fn flush(&mut self) -> Result<(), Codec::Error> {
1015        FramedInner::<_, (), _>::flush(&mut self.inner.io, &mut self.inner.state)
1016            .await
1017            .map_err(|e| e.into())
1018    }
1019}
1020
1021impl<IO: AsyncReadRent, Codec> AsyncReadRent for Framed<IO, Codec> {
1022    #[inline]
1023    async fn read<T: IoBufMut>(&mut self, buf: T) -> BufResult<usize, T> {
1024        self.inner.read(buf).await
1025    }
1026
1027    #[inline]
1028    async fn readv<T: IoVecBufMut>(&mut self, buf: T) -> BufResult<usize, T> {
1029        self.inner.readv(buf).await
1030    }
1031}
1032
1033impl<IO: AsyncReadRent, Codec> AsyncReadRent for FramedRead<IO, Codec> {
1034    #[inline]
1035    async fn read<T: IoBufMut>(&mut self, buf: T) -> BufResult<usize, T> {
1036        self.inner.read(buf).await
1037    }
1038
1039    #[inline]
1040    async fn readv<T: IoVecBufMut>(&mut self, buf: T) -> BufResult<usize, T> {
1041        self.inner.readv(buf).await
1042    }
1043}
1044
1045impl<IO: AsyncWriteRent, Codec> AsyncWriteRent for Framed<IO, Codec> {
1046    #[inline]
1047    async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
1048        self.inner.write(buf).await
1049    }
1050
1051    #[inline]
1052    async fn writev<T: IoVecBuf>(&mut self, buf_vec: T) -> BufResult<usize, T> {
1053        self.inner.writev(buf_vec).await
1054    }
1055
1056    #[inline]
1057    async fn flush(&mut self) -> std::io::Result<()> {
1058        self.inner.flush().await
1059    }
1060
1061    #[inline]
1062    async fn shutdown(&mut self) -> std::io::Result<()> {
1063        self.inner.shutdown().await
1064    }
1065}
1066
1067impl<IO: AsyncWriteRent, Codec> AsyncWriteRent for FramedWrite<IO, Codec> {
1068    #[inline]
1069    async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
1070        self.inner.write(buf).await
1071    }
1072
1073    #[inline]
1074    async fn writev<T: IoVecBuf>(&mut self, buf_vec: T) -> BufResult<usize, T> {
1075        self.inner.writev(buf_vec).await
1076    }
1077
1078    #[inline]
1079    async fn flush(&mut self) -> std::io::Result<()> {
1080        self.inner.flush().await
1081    }
1082
1083    #[inline]
1084    async fn shutdown(&mut self) -> std::io::Result<()> {
1085        self.inner.shutdown().await
1086    }
1087}