pbcodec 0.0.8

An asynchronous encoder/decoder for Protocol Buffers
Documentation
use std::io::{self, Read, Take};
use futures::{Async, Future, Poll};

use {Decode, Error, ErrorKind};
use future::read::{ReadByte, ReadBytes};
use future::util::{Phase2, Phase4, UnwrapTake};
use wire::WireType;
use wire::types::{Bit32, Bit64, LengthDelimited, Varint};
use super::convert::DecodeInto;

impl<R: Read> Decode<R> for Bit32 {
    type Future = DecodeInto<R, [u8; 4], Self>;
    fn decode(reader: R) -> Self::Future {
        DecodeInto::new(reader)
    }
}

impl<R: Read> Decode<R> for Bit64 {
    type Future = DecodeInto<R, [u8; 8], Self>;
    fn decode(reader: R) -> Self::Future {
        DecodeInto::new(reader)
    }
}

#[derive(Debug)]
pub struct DecodeVarint<R> {
    value: u64,
    bits: usize,
    future: ReadByte<R>,
}
impl<R> DecodeVarint<R> {
    fn new(reader: R) -> Self {
        DecodeVarint {
            value: 0,
            bits: 0,
            future: ReadByte::new(reader),
        }
    }
}
impl<R: Read> Future for DecodeVarint<R> {
    type Item = (R, Varint);
    type Error = Error<R>;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        while let Async::Ready((r, b)) = track!(self.future.poll())? {
            self.value += u64::from(b & 0b0111_1111) << self.bits;
            self.bits += 7;
            let is_last = (b >> 7) == 0;
            if is_last {
                return Ok(Async::Ready((r, Varint(self.value))));
            } else if self.bits > 64 {
                failed!(r, ErrorKind::Invalid, "Too large Varint");
            } else {
                self.future = ReadByte::new(r)
            }
        }
        Ok(Async::NotReady)
    }
}
impl<R: Read> Decode<R> for Varint {
    type Future = DecodeVarint<R>;
    fn decode(reader: R) -> Self::Future {
        DecodeVarint::new(reader)
    }
}

#[derive(Debug)]
pub struct DecodeMaybeVarint<R>(DecodeVarint<R>);
impl<R> DecodeMaybeVarint<R> {
    pub fn new(reader: R) -> Self {
        DecodeMaybeVarint(DecodeVarint::new(reader))
    }
}
impl<R: Read> Future for DecodeMaybeVarint<R> {
    type Item = (R, Option<Varint>);
    type Error = Error<R>;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        match self.0.poll() {
            Err(e) => {
                if *e.error.kind() == ErrorKind::UnexpectedEos && self.0.bits == 0 {
                    Ok(Async::Ready((e.stream, None)))
                } else {
                    Err(e)
                }
            }
            Ok(v) => Ok(v.map(|(r, v)| (r, Some(v)))),
        }
    }
}

#[derive(Debug)]
pub struct DecodeLengthDelimited<R, T>
where
    R: Read,
    T: Decode<Take<R>>,
{
    phase: Phase2<DecodeVarint<R>, UnwrapTake<T::Future>>,
}
impl<R, T> Future for DecodeLengthDelimited<R, T>
where
    R: Read,
    T: Decode<Take<R>>,
{
    type Item = (R, LengthDelimited<T>);
    type Error = Error<R>;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        while let Async::Ready(phase) = track!(self.phase.poll())? {
            let next = match phase {
                Phase2::A((r, len)) => Phase2::B(UnwrapTake(T::decode(r.take(len.0)))),
                Phase2::B((r, value)) => {
                    return Ok(Async::Ready((r.into_inner(), LengthDelimited(value))))
                }
            };
            self.phase = next;
        }
        Ok(Async::NotReady)
    }
}
impl<R, T> Decode<R> for LengthDelimited<T>
where
    R: Read,
    T: Decode<Take<R>>,
{
    type Future = DecodeLengthDelimited<R, T>;
    fn decode(reader: R) -> Self::Future {
        let phase = Phase2::A(Varint::decode(reader));
        DecodeLengthDelimited { phase }
    }
}

#[derive(Debug)]
pub struct DiscardWireValue<R: Read> {
    phase: Phase4<
        DecodeVarint<R>,
        ReadBytes<R, [u8; 4]>,
        ReadBytes<R, [u8; 8]>,
        DecodeLengthDelimited<R, Null>,
    >,
}
impl<R: Read> DiscardWireValue<R> {
    pub fn new(reader: R, wire_type: WireType) -> Self {
        let phase = match wire_type {
            WireType::Varint => Phase4::A(DecodeVarint::new(reader)),
            WireType::Bit32 => Phase4::B(ReadBytes::new(reader, [0; 4])),
            WireType::Bit64 => Phase4::C(ReadBytes::new(reader, [0; 8])),
            WireType::LengthDelimited => Phase4::D(LengthDelimited::<Null>::decode(reader)),
        };
        DiscardWireValue { phase }
    }
}
impl<R: Read> Future for DiscardWireValue<R> {
    type Item = R;
    type Error = Error<R>;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        if let Async::Ready(phase) = track!(self.phase.poll())? {
            Ok(Async::Ready(match phase {
                Phase4::A((r, _)) | Phase4::B((r, _)) | Phase4::C((r, _)) | Phase4::D((r, _)) => r,
            }))
        } else {
            Ok(Async::NotReady)
        }
    }
}

#[derive(Debug)]
struct Null;
impl<R: Read> Decode<R> for Null {
    type Future = DiscardAllBytes<R>;
    fn decode(reader: R) -> Self::Future {
        DiscardAllBytes {
            reader: Some(reader),
        }
    }
}

#[derive(Debug)]
struct DiscardAllBytes<R> {
    reader: Option<R>,
}
impl<R: Read> Future for DiscardAllBytes<R> {
    type Item = (R, Null);
    type Error = Error<R>;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let mut reader = self.reader
            .take()
            .expect("Cannot poll DiscardAllBytes twice");
        let mut buf = [0; 1024];
        loop {
            match reader.read(&mut buf) {
                Err(e) => {
                    if e.kind() == io::ErrorKind::WouldBlock {
                        self.reader = Some(reader);
                        return Ok(Async::NotReady);
                    }
                    failed_by_error!(reader, ErrorKind::Other, e);
                }
                Ok(0) => break,
                Ok(_) => {}
            }
        }
        Ok(Async::Ready((reader, Null)))
    }
}