1use std::{
2 pin::Pin,
3 task::{Context, Poll, ready},
4};
5
6use compio_buf::BufResult;
7use futures_util::Stream;
8
9use super::*;
10use crate::{AsyncReadExt, PinBoxFuture, buffer::Buffer, framed::frame::Framer};
11
12type ReadResult = BufResult<usize, Buffer>;
13
14pub struct State<Io> {
15 inner: StateInner<Io>,
16 eof: bool,
17}
18
19impl<Io> State<Io> {
20 pub fn new(io: Io, buf: Buffer) -> Self {
21 State {
22 inner: StateInner::Idle(Some((io, buf))),
23 eof: false,
24 }
25 }
26
27 pub fn empty() -> Self {
28 State {
29 inner: StateInner::Idle(None),
30 eof: false,
31 }
32 }
33}
34
35enum StateInner<Io> {
36 Idle(Option<(Io, Buffer)>),
37 Reading(PinBoxFuture<(Io, ReadResult)>),
38}
39
40impl<R, W, C, F, In, Out> Stream for Framed<R, W, C, F, In, Out>
41where
42 R: AsyncRead + 'static,
43 C: Decoder<Out>,
44 F: Framer,
45 Self: Unpin,
46{
47 type Item = Result<Out, C::Error>;
48
49 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
50 let this = self.get_mut();
51
52 loop {
53 match &mut this.read_state.inner {
54 StateInner::Idle(idle) => {
55 let (mut io, mut buf) = idle.take().expect("Inconsistent state");
56 let slice = buf.slice();
57
58 if let Some(frame) = this.framer.extract(slice) {
60 let decoded = this.codec.decode(frame.payload(slice))?;
61 buf.advance(frame.len());
62
63 if buf.all_done() {
64 buf.reset();
65 }
66
67 this.read_state.inner = StateInner::Idle(Some((io, buf)));
68
69 return Poll::Ready(Some(Ok(decoded)));
70 }
71
72 buf.reserve(16);
73
74 let fut = Box::pin(async move {
75 let res = buf.with(|buf| io.append(buf)).await;
76 (io, BufResult(res, buf))
77 });
78
79 this.read_state.inner = StateInner::Reading(fut)
80 }
81 StateInner::Reading(fut) => {
82 let (io, BufResult(res, buf)) = ready!(fut.poll_unpin(cx));
83 this.read_state.inner = StateInner::Idle(Some((io, buf)));
84 if res? == 0 {
85 if this.read_state.eof {
87 return Poll::Ready(None);
88 }
89
90 this.read_state.eof = true;
91 }
92 }
93 };
94 }
95 }
96}