simple_message_channels/
reader.rs1use futures::future::{Future, FutureExt};
2use futures::io::{AsyncRead, AsyncReadExt};
3use futures::stream::Stream;
4use futures::task::{Context, Poll};
5use std::io::{Error, ErrorKind};
6use std::pin::Pin;
7
8use crate::{Message, MAX_MESSAGE_SIZE};
9
10pub struct Reader<R> {
27 future: Pin<Box<dyn Future<Output = Result<(Message, R), Error>> + Send>>,
28 finished: bool,
29}
30
31impl<R> Reader<R>
32where
33 R: AsyncRead + Send + Unpin + 'static,
34{
35 pub fn new(reader: R) -> Self {
37 Self {
38 future: decoder(reader).boxed(),
39 finished: false,
40 }
41 }
42}
43
44impl<R> Stream for Reader<R>
46where
47 R: AsyncRead + Send + Unpin + 'static,
48{
49 type Item = Result<Message, Error>;
50 fn poll_next(
51 mut self: Pin<&mut Self>,
52 cx: &mut Context<'_>,
53 ) -> Poll<Option<Result<Message, Error>>> {
54 if self.finished {
55 return Poll::Ready(None);
56 }
57 match self.future.poll_unpin(cx) {
58 Poll::Pending => Poll::Pending,
59 Poll::Ready(result) => {
60 match result {
61 Ok((message, reader)) => {
62 self.future = decoder(reader).boxed();
64 Poll::Ready(Some(Ok(message)))
65 }
66 Err(error) => {
67 self.finished = true;
68 Poll::Ready(Some(Err(error)))
69 }
70 }
71 }
72 }
73 }
74}
75
76pub async fn decoder<'a, R>(mut reader: R) -> Result<(Message, R), Error>
80where
81 R: AsyncRead + Send + Unpin + 'static,
82{
83 let mut varint: u64 = 0;
84 let mut factor = 1;
85 let mut headerbuf = vec![0u8; 1];
86 loop {
88 reader.read_exact(&mut headerbuf).await?;
89 let byte = headerbuf[0];
90
91 if byte == 0 {
93 continue;
94 }
95
96 varint = varint + (byte as u64 & 127) * factor;
97 if byte < 128 {
98 break;
99 }
100 if varint > MAX_MESSAGE_SIZE {
101 return Err(Error::new(ErrorKind::InvalidInput, "Message too long"));
102 }
103 factor = factor * 128;
104 }
105
106 let mut messagebuf = vec![0u8; varint as usize];
108 reader.read_exact(&mut messagebuf).await?;
109 let message = Message::from_buf(&messagebuf)?;
110 Ok((message, reader))
111}