async_codec_util/decoder/
chain.rs1use async_codec::{AsyncDecode, PollDec};
2use async_codec::PollDec::{Done, Progress, Pending, Errored};
3use futures_core::task::Context;
4use futures_io::AsyncRead;
5
6enum State<S, T>
7 where S: AsyncDecode
8{
9 First(S, T),
10 Second(T, S::Item),
11}
12
13pub struct Chain<S, T>(State<S, T>) where S: AsyncDecode;
15
16impl<S, T> Chain<S, T>
17 where S: AsyncDecode
18{
19 pub fn new(first: S, second: T) -> Chain<S, T> {
21 Chain(State::First(first, second))
22 }
23}
24
25impl<S, T> AsyncDecode for Chain<S, T>
26 where S: AsyncDecode,
27 T: AsyncDecode<Error = S::Error>
28{
29 type Item = (S::Item, T::Item);
30 type Error = S::Error;
31
32 fn poll_decode<R: AsyncRead>(mut self,
33 cx: &mut Context,
34 reader: &mut R)
35 -> PollDec<Self::Item, Self, Self::Error> {
36 match self.0 {
37 State::First(first, second) => {
38 match first.poll_decode(cx, reader) {
39 Done(item, read) => {
40 self.0 = State::Second(second, item);
41 Progress(self, read)
42 }
43 Progress(first, read) => {
44 self.0 = State::First(first, second);
45 Progress(self, read)
46 }
47 Pending(first) => {
48 self.0 = State::First(first, second);
49 Pending(self)
50 }
51 Errored(err) => Errored(err),
52 }
53 }
54
55 State::Second(second, first_item) => {
56 match second.poll_decode(cx, reader) {
57 Done(item, read) => Done((first_item, item), read),
58 Progress(second, read) => {
59 self.0 = State::Second(second, first_item);
60 Progress(self, read)
61 }
62 Pending(second) => {
63 self.0 = State::Second(second, first_item);
64 Pending(self)
65 }
66 Errored(err) => Errored(err),
67 }
68 }
69 }
70 }
71}