async_codec_util/
lib.rs

1//! Utilities for working with the traits from the
2//! [async-codec](https://crates.io/crates/async-codec) crate.
3#![deny(missing_docs)]
4
5extern crate async_codec;
6extern crate atm_io_utils;
7extern crate futures_core;
8extern crate futures_io;
9extern crate futures_executor;
10extern crate futures_util;
11
12#[cfg(test)]
13extern crate async_byteorder;
14#[cfg(test)]
15extern crate async_ringbuffer;
16#[cfg(test)]
17#[macro_use(quickcheck)]
18extern crate quickcheck;
19
20pub mod encoder;
21pub mod decoder;
22pub mod testing;
23
24use async_codec::{AsyncEncode, AsyncEncodeLen, AsyncDecode, DecodeError, PollEnc, PollDec};
25use futures_core::{Future, Poll};
26use futures_core::Async::{Ready, Pending};
27use futures_core::task::Context;
28use futures_io::{AsyncRead, AsyncWrite, Error as FutIoErr};
29
30/// Encode a value into an `AsyncWrite`, using an `AsyncEncode`.
31pub fn encode<W, C>(writer: W, co: C) -> Encoder<W, C> {
32    Encoder::new(writer, co)
33}
34
35/// Future for fully encoding an `AsyncEncode` into an `AsyncWrite`.
36pub struct Encoder<W, C> {
37    writer: Option<W>,
38    enc: Option<C>,
39    written: usize,
40}
41
42impl<W, C> Encoder<W, C> {
43    /// Create a new `Encoder` wrapping an `AsyncWrite` and consuming an `AsyncEncode`.
44    pub fn new(writer: W, enc: C) -> Encoder<W, C> {
45        Encoder {
46            writer: Some(writer),
47            enc: Some(enc),
48            written: 0,
49        }
50    }
51}
52
53impl<W, C> Encoder<W, C>
54    where W: AsyncWrite,
55          C: AsyncEncodeLen
56{
57    /// Return the exact number of bytes this will still write.
58    ///
59    /// Panics if called after the future completed.
60    pub fn remaining_bytes(&mut self) -> usize {
61        let enc = self.enc
62            .take()
63            .expect("Used encoder future after completion");
64        let remaining = enc.remaining_bytes();
65        self.enc = Some(enc);
66        remaining
67    }
68}
69
70impl<W, C> Future for Encoder<W, C>
71    where W: AsyncWrite,
72          C: AsyncEncode
73{
74    type Item = (W, usize);
75    type Error = (W, FutIoErr);
76
77    fn poll(&mut self, cx: &mut Context) -> Poll<Self::Item, Self::Error> {
78        let mut writer = self.writer
79            .take()
80            .expect("Polled encoder future after completion");
81        let enc = self.enc
82            .take()
83            .expect("Polled encoder future after completion");
84
85        match enc.poll_encode(cx, &mut writer) {
86            PollEnc::Done(written) => Ok(Ready((writer, self.written + written))),
87            PollEnc::Progress(enc, written) => {
88                self.written += written;
89                self.writer = Some(writer);
90                self.enc = Some(enc);
91                self.poll(cx)
92            }
93            PollEnc::Pending(enc) => {
94                self.writer = Some(writer);
95                self.enc = Some(enc);
96                Ok(Pending)
97            }
98            PollEnc::Errored(err) => Err((writer, err)),
99        }
100    }
101}
102
103/// Decode a value from an `AsyncRead`, using an `AsyncDecode`.
104pub fn decode<R, D>(reader: R, dec: D) -> Decoder<R, D> {
105    Decoder::new(reader, dec)
106}
107
108/// Future for fully decoding an `AsyncDecode` from an `AsyncRead`.
109pub struct Decoder<R, D> {
110    reader: Option<R>,
111    dec: Option<D>,
112    read: usize,
113}
114
115impl<R, D> Decoder<R, D> {
116    /// Create a new `Decoder` wrapping an `AsyncRead` and consuming an `AsyncDecode`.
117    pub fn new(reader: R, dec: D) -> Decoder<R, D> {
118        Decoder {
119            reader: Some(reader),
120            dec: Some(dec),
121            read: 0,
122        }
123    }
124}
125
126impl<R, D> Future for Decoder<R, D>
127    where R: AsyncRead,
128          D: AsyncDecode
129{
130    type Item = (R, D::Item, usize);
131    type Error = (R, DecodeError<D::Error>);
132
133    fn poll(&mut self, cx: &mut Context) -> Poll<Self::Item, Self::Error> {
134        let mut reader = self.reader
135            .take()
136            .expect("Polled decoder future after completion");
137        let dec = self.dec
138            .take()
139            .expect("Polled decoder future after completion");
140
141        match dec.poll_decode(cx, &mut reader) {
142            PollDec::Done(item, read) => Ok(Ready((reader, item, self.read + read))),
143            PollDec::Progress(dec, read) => {
144                self.read += read;
145                self.reader = Some(reader);
146                self.dec = Some(dec);
147                self.poll(cx)
148            }
149            PollDec::Pending(dec) => {
150                self.reader = Some(reader);
151                self.dec = Some(dec);
152                Ok(Pending)
153            }
154            PollDec::Errored(err) => Err((reader, err)),
155        }
156    }
157}