Skip to main content

compio_io/framed/
read.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll, ready},
4};
5
6use compio_buf::{BufResult, IntoInner, IoBufMut};
7use futures_util::Stream;
8
9use super::*;
10use crate::{AsyncReadExt, PinBoxFuture, buffer::Buffer, framed::frame::Framer};
11
/// Outcome of one buffered read: the `BufResult` pairs the read result
/// (compared against `0` in `poll_next`, i.e. used as a byte count) with the
/// [`Buffer`] that was lent to the read.
type ReadResult<B> = BufResult<usize, Buffer<B>>;
13
/// Read-side state of a framed stream.
///
/// Combines the [`StateInner`] state machine with a flag that remembers
/// whether a zero-length read has already been seen.
pub struct State<Io, B> {
    /// Current phase of the read state machine.
    inner: StateInner<Io, B>,
    /// Set after the first read that returns zero bytes; a second such read
    /// makes the stream yield `None`.
    eof: bool,
}
18
19impl<Io> State<Io, Vec<u8>> {
20    pub fn empty() -> Self {
21        State {
22            inner: StateInner::Configuring(None, Some(Buffer::new())),
23            eof: false,
24        }
25    }
26}
27
28impl<Io, B> State<Io, B> {
29    pub fn with_io<I>(self, io: I) -> State<I, B> {
30        let StateInner::Configuring(_, b) = self.inner else {
31            panic_config_polled()
32        };
33        State {
34            inner: StateInner::Configuring(Some(io), b),
35            eof: false,
36        }
37    }
38
39    pub fn with_buf<Buf: IoBufMut>(self, buf: Buf) -> State<Io, Buf> {
40        let StateInner::Configuring(io, _) = self.inner else {
41            panic_config_polled()
42        };
43        State {
44            inner: StateInner::Configuring(io, Some(Buffer::new_with(buf))),
45            eof: false,
46        }
47    }
48}
49
/// Phases of the read state machine driven by `poll_next`.
enum StateInner<Io, B> {
    /// Still being set up; the I/O object and the buffer are each optional
    /// until the first poll, which expects both to be present.
    Configuring(Option<Io>, Option<Buffer<B>>),
    /// No read in flight. The I/O object and buffer are parked here; the
    /// `Option` is taken while `poll_next` works with them.
    Idle(Option<(Io, Buffer<B>)>),
    /// A read is in flight. The boxed future owns the I/O object and the
    /// buffer and hands both back together with the read result.
    Reading(PinBoxFuture<(Io, ReadResult<B>)>),
}
55
56impl<R, W, C, F, In, Out, B> Stream for Framed<R, W, C, F, In, Out, B>
57where
58    R: AsyncRead + 'static,
59    C: Decoder<Out, B>,
60    F: Framer<B>,
61    B: IoBufMut,
62    Self: Unpin,
63{
64    type Item = Result<Out, C::Error>;
65
66    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
67        let this = self.get_mut();
68
69        loop {
70            match &mut this.read_state.inner {
71                StateInner::Configuring(io, buf) => {
72                    let io = io.take().expect("Inconsistent state");
73                    let buf = buf.take().expect("Inconsistent state");
74                    this.read_state.inner = StateInner::Idle(Some((io, buf)));
75                }
76                StateInner::Idle(idle) => {
77                    let (mut io, mut buf) = idle.take().expect("Inconsistent state");
78
79                    // First try decode from the buffer
80                    let inner = buf.inner();
81                    if let Some(frame) = this.framer.extract(inner)? {
82                        let (begin, end) = (inner.begin(), inner.end());
83                        let slice = frame.slice(buf.take_inner()).flatten(); // focus on only the payload
84                        let decoded = this.codec.decode(&slice);
85                        let inner = slice.into_inner();
86                        if let Some(end) = end {
87                            buf.restore_inner(inner.slice(begin..end));
88                        } else {
89                            buf.restore_inner(inner.slice(begin..));
90                        }
91
92                        if buf.advance(frame.len()) {
93                            buf.reset();
94                        }
95
96                        this.read_state.inner = StateInner::Idle(Some((io, buf)));
97
98                        return Poll::Ready(Some(decoded));
99                    }
100
101                    buf.reserve(16);
102
103                    let fut = Box::pin(async move {
104                        let res = buf.with(|buf| io.append(buf)).await;
105                        (io, BufResult(res, buf))
106                    });
107
108                    this.read_state.inner = StateInner::Reading(fut)
109                }
110                StateInner::Reading(fut) => {
111                    let (io, BufResult(res, buf)) = ready!(fut.poll_unpin(cx));
112                    this.read_state.inner = StateInner::Idle(Some((io, buf)));
113                    if res? == 0 {
114                        // It's the second time EOF is reached, return None
115                        if this.read_state.eof {
116                            return Poll::Ready(None);
117                        }
118
119                        this.read_state.eof = true;
120                    }
121                }
122            };
123        }
124    }
125}