compio_io/framed/
read.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll, ready},
4};
5
6use compio_buf::BufResult;
7use futures_util::Stream;
8
9use super::*;
10use crate::{AsyncReadExt, PinBoxFuture, buffer::Buffer, framed::frame::Framer};
11
12type ReadResult = BufResult<usize, Buffer>;
13
14pub struct State<Io> {
15    inner: StateInner<Io>,
16    eof: bool,
17}
18
19impl<Io> State<Io> {
20    pub fn new(io: Io, buf: Buffer) -> Self {
21        State {
22            inner: StateInner::Idle(Some((io, buf))),
23            eof: false,
24        }
25    }
26
27    pub fn empty() -> Self {
28        State {
29            inner: StateInner::Idle(None),
30            eof: false,
31        }
32    }
33}
34
35enum StateInner<Io> {
36    Idle(Option<(Io, Buffer)>),
37    Reading(PinBoxFuture<(Io, ReadResult)>),
38}
39
40impl<R, W, C, F, In, Out> Stream for Framed<R, W, C, F, In, Out>
41where
42    R: AsyncRead + 'static,
43    C: Decoder<Out>,
44    F: Framer,
45    Self: Unpin,
46{
47    type Item = Result<Out, C::Error>;
48
49    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
50        let this = self.get_mut();
51
52        loop {
53            match &mut this.read_state.inner {
54                StateInner::Idle(idle) => {
55                    let (mut io, mut buf) = idle.take().expect("Inconsistent state");
56                    let slice = buf.slice();
57
58                    // First try decode from the buffer
59                    if let Some(frame) = this.framer.extract(slice) {
60                        let decoded = this.codec.decode(frame.payload(slice))?;
61                        buf.advance(frame.len());
62
63                        if buf.all_done() {
64                            buf.reset();
65                        }
66
67                        this.read_state.inner = StateInner::Idle(Some((io, buf)));
68
69                        return Poll::Ready(Some(Ok(decoded)));
70                    }
71
72                    buf.reserve(16);
73
74                    let fut = Box::pin(async move {
75                        let res = buf.with(|buf| io.append(buf)).await;
76                        (io, BufResult(res, buf))
77                    });
78
79                    this.read_state.inner = StateInner::Reading(fut)
80                }
81                StateInner::Reading(fut) => {
82                    let (io, BufResult(res, buf)) = ready!(fut.poll_unpin(cx));
83                    this.read_state.inner = StateInner::Idle(Some((io, buf)));
84                    if res? == 0 {
85                        // It's the second time EOF is reached, return None
86                        if this.read_state.eof {
87                            return Poll::Ready(None);
88                        }
89
90                        this.read_state.eof = true;
91                    }
92                }
93            };
94        }
95    }
96}