async_codec_util/decoder/
chain.rs

1use async_codec::{AsyncDecode, PollDec};
2use async_codec::PollDec::{Done, Progress, Pending, Errored};
3use futures_core::task::Context;
4use futures_io::AsyncRead;
5
6enum State<S, T>
7    where S: AsyncDecode
8{
9    First(S, T),
10    Second(T, S::Item),
11}
12
13/// Chain two decoders, running them in sequence.
14pub struct Chain<S, T>(State<S, T>) where S: AsyncDecode;
15
16impl<S, T> Chain<S, T>
17    where S: AsyncDecode
18{
19    /// Create new `Chain` which first decodes via the given `S` and then decodes via the given `T`.
20    pub fn new(first: S, second: T) -> Chain<S, T> {
21        Chain(State::First(first, second))
22    }
23}
24
25impl<S, T> AsyncDecode for Chain<S, T>
26    where S: AsyncDecode,
27          T: AsyncDecode<Error = S::Error>
28{
29    type Item = (S::Item, T::Item);
30    type Error = S::Error;
31
32    fn poll_decode<R: AsyncRead>(mut self,
33                                 cx: &mut Context,
34                                 reader: &mut R)
35                                 -> PollDec<Self::Item, Self, Self::Error> {
36        match self.0 {
37            State::First(first, second) => {
38                match first.poll_decode(cx, reader) {
39                    Done(item, read) => {
40                        self.0 = State::Second(second, item);
41                        Progress(self, read)
42                    }
43                    Progress(first, read) => {
44                        self.0 = State::First(first, second);
45                        Progress(self, read)
46                    }
47                    Pending(first) => {
48                        self.0 = State::First(first, second);
49                        Pending(self)
50                    }
51                    Errored(err) => Errored(err),
52                }
53            }
54
55            State::Second(second, first_item) => {
56                match second.poll_decode(cx, reader) {
57                    Done(item, read) => Done((first_item, item), read),
58                    Progress(second, read) => {
59                        self.0 = State::Second(second, first_item);
60                        Progress(self, read)
61                    }
62                    Pending(second) => {
63                        self.0 = State::Second(second, first_item);
64                        Pending(self)
65                    }
66                    Errored(err) => Errored(err),
67                }
68            }
69        }
70    }
71}