risq 0.4.1

Re-implementation of Bisq (https://github.com/bisq-network/bisq) in rust
use crate::{
    bisq::payload::{network_envelope, NetworkEnvelope},
    error,
    prelude::{
        io::{AsyncRead, ReadHalf},
        net::TcpStream,
        Async, Stream,
    },
};
use futures::try_ready;
use prost::{encoding::decode_varint, Message};
use std::{collections::VecDeque, io};

enum MessageStreamState {
    MessageInProgress {
        size: usize,
        pos: usize,
        buf: Vec<u8>,
    },
    BetweenMessages {
        buf: [u8; 10],
        pos: usize,
    },
    Empty,
}
pub struct MessageStream {
    reader: ReadHalf<TcpStream>,
    state: MessageStreamState,
    buffer: VecDeque<NetworkEnvelope>,
}
impl MessageStream {
    pub fn new(reader: ReadHalf<TcpStream>) -> MessageStream {
        MessageStream {
            reader,
            state: MessageStreamState::BetweenMessages {
                buf: [0; 10],
                pos: 0,
            },
            buffer: VecDeque::new(),
        }
    }
    fn next_from_buffer(&mut self) -> Option<network_envelope::Message> {
        let msg = self.buffer.pop_front()?.message;
        match msg {
            Some(network_envelope::Message::BundleOfEnvelopes(msg)) => {
                msg.envelopes
                    .into_iter()
                    .rev()
                    .for_each(|envelope| self.buffer.push_front(envelope));
                self.next_from_buffer()
            }
            None => self.next_from_buffer(),
            _ => msg,
        }
    }
}
impl Stream for MessageStream {
    type Item = network_envelope::Message;
    type Error = error::Error;

    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
        if let Some(msg) = self.next_from_buffer() {
            debug!("Receiving msg: {:?}", msg);
            return Ok(Async::Ready(Some(msg)));
        }
        let next_read = match self.state {
            MessageStreamState::Empty => panic!("Stream is already finished"),
            MessageStreamState::BetweenMessages {
                ref mut buf,
                ref mut pos,
            } => {
                while *pos < buf.len() {
                    let n = try_ready!(self.reader.poll_read(&mut buf[*pos..=*pos]));
                    if n == 0 {
                        self.state = MessageStreamState::Empty;
                        return Err(
                            io::Error::new(io::ErrorKind::UnexpectedEof, "early eof").into()
                        );
                    }
                    let old_pos = *pos;
                    *pos += n;
                    if buf[old_pos] & 0b1000_0000 == 0 {
                        break;
                    }
                }
                let mut size_reader: VecDeque<u8> = buf.iter().take(*pos).cloned().collect();
                let size = decode_varint(&mut size_reader)? as usize;
                let buf = vec![0; size];
                self.state = MessageStreamState::MessageInProgress { size, pos: 0, buf };
                return self.poll();
            }
            MessageStreamState::MessageInProgress {
                ref mut size,
                ref mut pos,
                ref mut buf,
            } => {
                while *pos < *size {
                    let n = try_ready!(self.reader.poll_read(&mut buf[*pos..]));
                    *pos += n;
                    if n == 0 {
                        return Err(
                            io::Error::new(io::ErrorKind::UnexpectedEof, "early eof").into()
                        );
                    }
                }
                match NetworkEnvelope::decode(&*buf) {
                    Ok(res) => res,
                    Err(e) => {
                        self.state = MessageStreamState::Empty;
                        debug!("Decode error {:?}", e);
                        return Err(e.into());
                    }
                }
            }
        };
        self.buffer.push_back(next_read);
        self.state = MessageStreamState::BetweenMessages {
            buf: [0; 10],
            pos: 0,
        };
        self.poll()
    }
}