integer-encoding 4.1.0

varint+zigzag and fixedint integer encoding/decoding (https://developers.google.com/protocol-buffers/docs/encoding)
Documentation
use std::io;
use std::io::{Read, Result};

use crate::fixed::FixedInt;
use crate::varint::{VarInt, VarIntMaxSize, MSB};

#[cfg(feature = "tokio_async")]
use tokio::io::{AsyncRead, AsyncReadExt};

#[cfg(feature = "futures_async")]
use futures_util::{io::AsyncRead, io::AsyncReadExt};

/// A trait for reading [`VarInt`]s from any other `Reader`.
///
/// It's recommended to use a buffered reader, as many small reads will happen
/// (the blanket implementation for [`Read`] pulls one byte per `read` call).
pub trait VarIntReader {
    /// Returns either the decoded integer, or an error.
    ///
    /// In general, this always reads a whole varint. If the encoded varint's value is bigger
    /// than the valid value range of `VI`, then the value is truncated.
    ///
    /// # Errors
    ///
    /// On EOF, an [`io::Error`] with [`io::ErrorKind::UnexpectedEof`] is returned.
    /// The blanket implementation for [`Read`] additionally reports
    /// [`io::ErrorKind::InvalidData`] when more bytes arrive than `VI` permits without a
    /// terminating byte, and forwards errors from the underlying reader.
    fn read_varint<VI: VarInt>(&mut self) -> Result<VI>;
}

#[cfg(any(feature = "tokio_async", feature = "futures_async"))]
/// Like a [`VarIntReader`], but returns a future.
///
/// Only available when the `tokio_async` or `futures_async` feature is enabled.
#[async_trait::async_trait(?Send)]
pub trait VarIntAsyncReader {
    /// Asynchronously reads one varint and decodes it as `VI`.
    ///
    /// Resolves to the decoded integer, or to an [`io::Error`] if reading or decoding fails.
    async fn read_varint_async<VI: VarInt>(&mut self) -> Result<VI>;
}

/// `VarIntProcessor` encapsulates the logic for decoding a [`VarInt`] byte-by-byte.
///
/// Bytes are fed in one at a time; once the most recently stored byte has its `MSB` bit
/// clear, the collected bytes can be decoded.
#[derive(Default)]
pub struct VarIntProcessor {
    /// Bytes collected so far; only the first `i` entries are meaningful.
    // NOTE(review): 10 bytes is presumably the longest varint of any supported type
    // (a 64-bit value) — confirm against `VarIntMaxSize`.
    buf: [u8; 10],
    /// Maximum number of bytes accepted for the target type (`VI::varint_max_size()`).
    maxsize: usize,
    /// Number of bytes stored in `buf` so far.
    i: usize,
}

impl VarIntProcessor {
    /// Creates an empty processor that accepts at most `VI::varint_max_size()` bytes.
    fn new<VI: VarIntMaxSize>() -> VarIntProcessor {
        // Same state the derived `Default` produces, except for the byte limit.
        VarIntProcessor {
            buf: [0; 10],
            maxsize: VI::varint_max_size(),
            i: 0,
        }
    }

    /// Appends one byte to the collected bytes.
    ///
    /// Fails with [`io::ErrorKind::InvalidData`] once the byte limit for the target type
    /// has already been reached.
    fn push(&mut self, b: u8) -> Result<()> {
        if self.i < self.maxsize {
            self.buf[self.i] = b;
            self.i += 1;
            return Ok(());
        }

        Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "Unterminated varint",
        ))
    }

    /// Reports whether the most recently stored byte has its `MSB` bit clear.
    ///
    /// Always `false` while no byte has been stored.
    fn finished(&self) -> bool {
        match self.i.checked_sub(1) {
            Some(last) => self.buf[last] & MSB == 0,
            None => false,
        }
    }

    /// Decodes the bytes collected so far as `VI`.
    ///
    /// Returns `None` when `VI::decode_var` rejects the collected bytes.
    fn decode<VI: VarInt>(&self) -> Option<VI> {
        let collected = &self.buf[..self.i];
        // `decode_var` yields a tuple-like result; only its first field (the value) is kept.
        VI::decode_var(collected).map(|decoded| decoded.0)
    }
}

#[cfg(any(feature = "tokio_async", feature = "futures_async"))]
#[async_trait::async_trait(?Send)]
impl<AR: AsyncRead + Unpin> VarIntAsyncReader for AR {
    /// Reads one varint from `self`, a single byte per `read`, and decodes it as `VI`.
    ///
    /// Fails with [`io::ErrorKind::UnexpectedEof`] when the stream ends before any byte
    /// was read, or when the bytes gathered before the stream ended cannot be decoded.
    async fn read_varint_async<VI: VarInt>(&mut self) -> Result<VI> {
        let eof = || io::Error::new(io::ErrorKind::UnexpectedEof, "Reached EOF");

        let mut processor = VarIntProcessor::new::<VI>();
        let mut byte = [0_u8; 1];

        while !processor.finished() {
            match self.read(&mut byte).await? {
                // Stream ended before a single byte arrived.
                0 if processor.i == 0 => return Err(eof()),
                // Stream ended mid-varint: let `decode` judge what was collected.
                0 => break,
                _ => processor.push(byte[0])?,
            }
        }

        processor.decode().ok_or_else(eof)
    }
}

impl<R: Read> VarIntReader for R {
    /// Reads one varint from `self`, a single byte per `read` call, and decodes it as `VI`.
    ///
    /// Reads that fail with [`io::ErrorKind::Interrupted`] are retried: that error kind is
    /// non-fatal, and giving up on it would discard the bytes of a partially read varint.
    ///
    /// # Errors
    ///
    /// - [`io::ErrorKind::UnexpectedEof`] if the reader is exhausted before any byte was
    ///   read, or if the bytes gathered before EOF cannot be decoded.
    /// - [`io::ErrorKind::InvalidData`] if more bytes arrive than `VI` permits without a
    ///   terminating byte.
    /// - Any other error reported by the underlying reader.
    fn read_varint<VI: VarInt>(&mut self) -> Result<VI> {
        let mut buf = [0_u8; 1];
        let mut p = VarIntProcessor::new::<VI>();

        while !p.finished() {
            let read = match self.read(&mut buf) {
                Ok(n) => n,
                // Non-fatal: nothing was consumed, so simply try the read again.
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            };

            // EOF
            if read == 0 && p.i == 0 {
                return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Reached EOF"));
            }
            if read == 0 {
                // EOF in the middle of a varint: let `decode` judge what was collected.
                break;
            }

            p.push(buf[0])?;
        }

        p.decode()
            .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "Reached EOF"))
    }
}

/// A trait for reading [`FixedInt`]s from any other `Reader`.
pub trait FixedIntReader {
    /// Read a fixed integer from a reader. How many bytes are read depends on `FI`
    /// (the blanket implementation for [`Read`] reads `size_of::<FI>()` bytes).
    ///
    /// # Errors
    ///
    /// On EOF, an [`io::Error`] with [`io::ErrorKind::UnexpectedEof`] is returned.
    fn read_fixedint<FI: FixedInt>(&mut self) -> Result<FI>;
}

/// Like [`FixedIntReader`], but returns a future.
///
/// Only available when the `tokio_async` or `futures_async` feature is enabled.
#[cfg(any(feature = "tokio_async", feature = "futures_async"))]
#[async_trait::async_trait(?Send)]
pub trait FixedIntAsyncReader {
    /// Asynchronously reads a fixed-size integer of type `FI`.
    ///
    /// Resolves to the decoded integer, or to an [`io::Error`] if reading fails.
    async fn read_fixedint_async<FI: FixedInt>(&mut self) -> Result<FI>;
}

#[cfg(any(feature = "tokio_async", feature = "futures_async"))]
#[async_trait::async_trait(?Send)]
impl<AR: AsyncRead + Unpin> FixedIntAsyncReader for AR {
    /// Reads exactly `size_of::<FI>()` bytes from `self` and decodes them as `FI`.
    ///
    /// Errors from `read_exact` are returned unchanged.
    ///
    /// # Panics
    ///
    /// Panics if `FI::decode_fixed` returns `None` for the bytes read, or if
    /// `size_of::<FI>()` exceeds the 8-byte scratch buffer.
    async fn read_fixedint_async<FI: FixedInt>(&mut self) -> Result<FI> {
        let width = std::mem::size_of::<FI>();
        let mut scratch = [0_u8; 8];

        self.read_exact(&mut scratch[..width]).await?;

        // The slice handed over is exactly `size_of::<FI>()` bytes long.
        let value = FI::decode_fixed(&scratch[..width]).unwrap();
        Ok(value)
    }
}

impl<R: Read> FixedIntReader for R {
    /// Reads exactly `size_of::<FI>()` bytes from `self` and decodes them as `FI`.
    ///
    /// Errors from `read_exact` (including [`std::io::ErrorKind::UnexpectedEof`] when the
    /// reader runs dry) are returned unchanged.
    ///
    /// # Panics
    ///
    /// Panics if `FI::decode_fixed` returns `None` for the bytes read, or if
    /// `size_of::<FI>()` exceeds the 8-byte scratch buffer.
    fn read_fixedint<FI: FixedInt>(&mut self) -> Result<FI> {
        let width = std::mem::size_of::<FI>();
        let mut scratch = [0_u8; 8];

        self.read_exact(&mut scratch[..width])?;

        // The slice handed over is exactly `size_of::<FI>()` bytes long.
        let value = FI::decode_fixed(&scratch[..width]).unwrap();
        Ok(value)
    }
}