1use std::{
2 pin::Pin,
3 task::{Context, Poll, ready},
4};
5
6use compio_buf::{BufResult, IntoInner, IoBufMut};
7use futures_util::Stream;
8
9use super::*;
10use crate::{AsyncReadExt, PinBoxFuture, buffer::Buffer, framed::frame::Framer};
11
12type ReadResult<B> = BufResult<usize, Buffer<B>>;
13
14pub struct State<Io, B> {
15 inner: StateInner<Io, B>,
16 eof: bool,
17}
18
19impl<Io> State<Io, Vec<u8>> {
20 pub fn empty() -> Self {
21 State {
22 inner: StateInner::Configuring(None, Some(Buffer::new())),
23 eof: false,
24 }
25 }
26}
27
28impl<Io, B> State<Io, B> {
29 pub fn with_io<I>(self, io: I) -> State<I, B> {
30 let StateInner::Configuring(_, b) = self.inner else {
31 panic_config_polled()
32 };
33 State {
34 inner: StateInner::Configuring(Some(io), b),
35 eof: false,
36 }
37 }
38
39 pub fn with_buf<Buf: IoBufMut>(self, buf: Buf) -> State<Io, Buf> {
40 let StateInner::Configuring(io, _) = self.inner else {
41 panic_config_polled()
42 };
43 State {
44 inner: StateInner::Configuring(io, Some(Buffer::new_with(buf))),
45 eof: false,
46 }
47 }
48}
49
50enum StateInner<Io, B> {
51 Configuring(Option<Io>, Option<Buffer<B>>),
52 Idle(Option<(Io, Buffer<B>)>),
53 Reading(PinBoxFuture<(Io, ReadResult<B>)>),
54}
55
56impl<R, W, C, F, In, Out, B> Stream for Framed<R, W, C, F, In, Out, B>
57where
58 R: AsyncRead + 'static,
59 C: Decoder<Out, B>,
60 F: Framer<B>,
61 B: IoBufMut,
62 Self: Unpin,
63{
64 type Item = Result<Out, C::Error>;
65
66 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
67 let this = self.get_mut();
68
69 loop {
70 match &mut this.read_state.inner {
71 StateInner::Configuring(io, buf) => {
72 let io = io.take().expect("Inconsistent state");
73 let buf = buf.take().expect("Inconsistent state");
74 this.read_state.inner = StateInner::Idle(Some((io, buf)));
75 }
76 StateInner::Idle(idle) => {
77 let (mut io, mut buf) = idle.take().expect("Inconsistent state");
78
79 let inner = buf.inner();
81 if let Some(frame) = this.framer.extract(inner)? {
82 let (begin, end) = (inner.begin(), inner.end());
83 let slice = frame.slice(buf.take_inner()).flatten(); let decoded = this.codec.decode(&slice);
85 let inner = slice.into_inner();
86 if let Some(end) = end {
87 buf.restore_inner(inner.slice(begin..end));
88 } else {
89 buf.restore_inner(inner.slice(begin..));
90 }
91
92 if buf.advance(frame.len()) {
93 buf.reset();
94 }
95
96 this.read_state.inner = StateInner::Idle(Some((io, buf)));
97
98 return Poll::Ready(Some(decoded));
99 }
100
101 buf.reserve(16);
102
103 let fut = Box::pin(async move {
104 let res = buf.with(|buf| io.append(buf)).await;
105 (io, BufResult(res, buf))
106 });
107
108 this.read_state.inner = StateInner::Reading(fut)
109 }
110 StateInner::Reading(fut) => {
111 let (io, BufResult(res, buf)) = ready!(fut.poll_unpin(cx));
112 this.read_state.inner = StateInner::Idle(Some((io, buf)));
113 if res? == 0 {
114 if this.read_state.eof {
116 return Poll::Ready(None);
117 }
118
119 this.read_state.eof = true;
120 }
121 }
122 };
123 }
124 }
125}