1#![deny(missing_docs)]
4
5extern crate async_codec;
6extern crate atm_io_utils;
7extern crate futures_core;
8extern crate futures_io;
9extern crate futures_executor;
10extern crate futures_util;
11
12#[cfg(test)]
13extern crate async_byteorder;
14#[cfg(test)]
15extern crate async_ringbuffer;
16#[cfg(test)]
17#[macro_use(quickcheck)]
18extern crate quickcheck;
19
20pub mod encoder;
21pub mod decoder;
22pub mod testing;
23
24use async_codec::{AsyncEncode, AsyncEncodeLen, AsyncDecode, DecodeError, PollEnc, PollDec};
25use futures_core::{Future, Poll};
26use futures_core::Async::{Ready, Pending};
27use futures_core::task::Context;
28use futures_io::{AsyncRead, AsyncWrite, Error as FutIoErr};
29
30pub fn encode<W, C>(writer: W, co: C) -> Encoder<W, C> {
32 Encoder::new(writer, co)
33}
34
/// Future that drives an encoder of type `C` to completion against a writer
/// of type `W`.
///
/// On success the future yields the writer together with the accumulated
/// count the encoder reported as written; on failure it yields the writer
/// together with the io error.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct Encoder<W, C> {
    // Both options are `Some` between polls and are emptied once the future
    // has resolved, so polling after completion is detected and panics.
    writer: Option<W>,
    enc: Option<C>,
    // Running total of the counts reported by the encoder so far.
    written: usize,
}
41
42impl<W, C> Encoder<W, C> {
43 pub fn new(writer: W, enc: C) -> Encoder<W, C> {
45 Encoder {
46 writer: Some(writer),
47 enc: Some(enc),
48 written: 0,
49 }
50 }
51}
52
53impl<W, C> Encoder<W, C>
54 where W: AsyncWrite,
55 C: AsyncEncodeLen
56{
57 pub fn remaining_bytes(&mut self) -> usize {
61 let enc = self.enc
62 .take()
63 .expect("Used encoder future after completion");
64 let remaining = enc.remaining_bytes();
65 self.enc = Some(enc);
66 remaining
67 }
68}
69
70impl<W, C> Future for Encoder<W, C>
71 where W: AsyncWrite,
72 C: AsyncEncode
73{
74 type Item = (W, usize);
75 type Error = (W, FutIoErr);
76
77 fn poll(&mut self, cx: &mut Context) -> Poll<Self::Item, Self::Error> {
78 let mut writer = self.writer
79 .take()
80 .expect("Polled encoder future after completion");
81 let enc = self.enc
82 .take()
83 .expect("Polled encoder future after completion");
84
85 match enc.poll_encode(cx, &mut writer) {
86 PollEnc::Done(written) => Ok(Ready((writer, self.written + written))),
87 PollEnc::Progress(enc, written) => {
88 self.written += written;
89 self.writer = Some(writer);
90 self.enc = Some(enc);
91 self.poll(cx)
92 }
93 PollEnc::Pending(enc) => {
94 self.writer = Some(writer);
95 self.enc = Some(enc);
96 Ok(Pending)
97 }
98 PollEnc::Errored(err) => Err((writer, err)),
99 }
100 }
101}
102
103pub fn decode<R, D>(reader: R, dec: D) -> Decoder<R, D> {
105 Decoder::new(reader, dec)
106}
107
/// Future that drives a decoder of type `D` to completion against a reader
/// of type `R`.
///
/// On success the future yields the reader, the decoded item and the
/// accumulated count the decoder reported as read; on failure it yields the
/// reader together with the decode error.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct Decoder<R, D> {
    // Both options are `Some` between polls and are emptied once the future
    // has resolved, so polling after completion is detected and panics.
    reader: Option<R>,
    dec: Option<D>,
    // Running total of the counts reported by the decoder so far.
    read: usize,
}
114
115impl<R, D> Decoder<R, D> {
116 pub fn new(reader: R, dec: D) -> Decoder<R, D> {
118 Decoder {
119 reader: Some(reader),
120 dec: Some(dec),
121 read: 0,
122 }
123 }
124}
125
126impl<R, D> Future for Decoder<R, D>
127 where R: AsyncRead,
128 D: AsyncDecode
129{
130 type Item = (R, D::Item, usize);
131 type Error = (R, DecodeError<D::Error>);
132
133 fn poll(&mut self, cx: &mut Context) -> Poll<Self::Item, Self::Error> {
134 let mut reader = self.reader
135 .take()
136 .expect("Polled decoder future after completion");
137 let dec = self.dec
138 .take()
139 .expect("Polled decoder future after completion");
140
141 match dec.poll_decode(cx, &mut reader) {
142 PollDec::Done(item, read) => Ok(Ready((reader, item, self.read + read))),
143 PollDec::Progress(dec, read) => {
144 self.read += read;
145 self.reader = Some(reader);
146 self.dec = Some(dec);
147 self.poll(cx)
148 }
149 PollDec::Pending(dec) => {
150 self.reader = Some(reader);
151 self.dec = Some(dec);
152 Ok(Pending)
153 }
154 PollDec::Errored(err) => Err((reader, err)),
155 }
156 }
157}