async_codec_util/decoder/
and_then.rs1use async_codec::{AsyncDecode, PollDec};
2use async_codec::PollDec::{Done, Progress, Pending, Errored};
3use futures_core::task::Context;
4use futures_io::AsyncRead;
5
6enum State<S, T, F> {
7 First(S, F),
8 Second(T),
9}
10
11pub struct AndThen<S, T, F>(State<S, T, F>);
13
14impl<S, T, F> AndThen<S, T, F> {
15 pub fn new(first: S, f: F) -> AndThen<S, T, F> {
17 AndThen(State::First(first, f))
18 }
19}
20
21impl<S, T, F> AsyncDecode for AndThen<S, T, F>
22 where S: AsyncDecode,
23 T: AsyncDecode<Error = S::Error>,
24 F: FnOnce(S::Item) -> T
25{
26 type Item = T::Item;
27 type Error = T::Error;
28
29 fn poll_decode<R: AsyncRead>(mut self,
30 cx: &mut Context,
31 reader: &mut R)
32 -> PollDec<Self::Item, Self, Self::Error> {
33 match self.0 {
34 State::First(first, f) => {
35 match first.poll_decode(cx, reader) {
36 Done(item, read) => {
37 self.0 = State::Second(f(item));
38 Progress(self, read)
39 }
40 Progress(first, read) => {
41 self.0 = State::First(first, f);
42 Progress(self, read)
43 }
44 Pending(first) => {
45 self.0 = State::First(first, f);
46 Pending(self)
47 }
48 Errored(err) => Errored(err),
49 }
50 }
51 State::Second(second) => {
52 match second.poll_decode(cx, reader) {
53 Done(item, read) => Done(item, read),
54 Progress(second, read) => {
55 self.0 = State::Second(second);
56 Progress(self, read)
57 }
58 Pending(second) => {
59 self.0 = State::Second(second);
60 Pending(self)
61 }
62 Errored(err) => Errored(err),
63 }
64 }
65 }
66 }
67}