1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
use futures::future::{Future, FutureExt};
use futures::io::{AsyncRead, AsyncReadExt};
use futures::stream::Stream;
use futures::task::{Context, Poll};
use std::io::{Error, ErrorKind};
use std::pin::Pin;
use crate::{Message, MAX_MESSAGE_SIZE};
/// Adapter that exposes an `AsyncRead` of length-prefixed frames as a
/// `Stream` of decoded `Message`s.
///
/// The underlying reader is not stored directly; it is owned by the in-flight
/// decode future and handed back together with each decoded message.
pub struct Reader<R> {
/// In-flight decode of the next message. Resolves to the message plus the
/// reader it was read from, or to the error that interrupted decoding.
future: Pin<Box<dyn Future<Output = Result<(Message, R), Error>> + Send>>,
/// Set once a decode has failed; from then on the stream only yields `None`.
finished: bool,
}
impl<R> Reader<R>
where
R: AsyncRead + Send + Unpin + 'static,
{
pub fn new(reader: R) -> Self {
Self {
future: decoder(reader).boxed(),
finished: false,
}
}
}
/// Streams decoded messages until the first error.
///
/// Each successful decode immediately arms a new decode future with the
/// returned reader. The first error is yielded once as `Some(Err(..))`, after
/// which every poll returns `None`.
impl<R> Stream for Reader<R>
where
    R: AsyncRead + Send + Unpin + 'static,
{
    type Item = Result<Message, Error>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Message, Error>>> {
        // A previous decode failed: the stream is exhausted and the stored
        // future must not be polled again.
        if self.finished {
            return Poll::Ready(None);
        }

        let outcome = match self.future.poll_unpin(cx) {
            Poll::Ready(outcome) => outcome,
            Poll::Pending => return Poll::Pending,
        };

        let item = match outcome {
            Ok((message, reader)) => {
                // Re-arm with the reader we just got back so the next poll
                // starts decoding the following frame.
                self.future = decoder(reader).boxed();
                Ok(message)
            }
            Err(error) => {
                self.finished = true;
                Err(error)
            }
        };
        Poll::Ready(Some(item))
    }
}
/// Reads a single length-prefixed message from `reader`.
///
/// A frame is a base-128 varint length (low seven bits of each byte are
/// payload, least-significant group first, high bit set on every byte except
/// the last) followed by that many body bytes, which are parsed with
/// `Message::from_buf`.
///
/// On success the reader is returned together with the message so the caller
/// can decode the next frame from the same stream.
///
/// # Errors
///
/// * Any I/O error reported while reading the header or body is propagated.
/// * `ErrorKind::InvalidInput` ("Message too long") if the declared length
///   exceeds `MAX_MESSAGE_SIZE`, overflows while being decoded, or does not
///   fit in `usize` on the current target.
/// * Any error produced by `Message::from_buf`.
pub async fn decoder<'a, R>(mut reader: R) -> Result<(Message, R), Error>
where
    R: AsyncRead + Send + Unpin + 'static,
{
    let too_long = || Error::new(ErrorKind::InvalidInput, "Message too long");

    let mut length: u64 = 0;
    let mut factor: u64 = 1;
    let mut header = [0u8; 1];
    loop {
        reader.read_exact(&mut header).await?;
        let byte = header[0];

        // Zero bytes are skipped without contributing to the length
        // (presumably keep-alive padding between frames).
        // NOTE(review): this also skips a zero byte in the middle of a
        // varint, where it would normally terminate it; kept as-is to
        // preserve existing wire behaviour - confirm against the protocol.
        if byte == 0 {
            continue;
        }

        // Accumulate with checked arithmetic and enforce the size limit on
        // every byte, including the final one, so a hostile peer can neither
        // overflow the counters nor trigger an oversized allocation.
        length = u64::from(byte & 0x7f)
            .checked_mul(factor)
            .and_then(|group| length.checked_add(group))
            .filter(|&total| total <= MAX_MESSAGE_SIZE)
            .ok_or_else(too_long)?;

        // A clear high bit marks the last byte of the varint.
        if byte < 0x80 {
            break;
        }

        // Guards against an endless run of continuation bytes with empty
        // payload (0x80 0x80 ...), which would otherwise overflow `factor`.
        factor = factor.checked_mul(128).ok_or_else(too_long)?;
    }

    // `as usize` would silently truncate on targets where usize is narrower
    // than u64; reject such lengths instead.
    let body_len =
        <usize as std::convert::TryFrom<u64>>::try_from(length).map_err(|_| too_long())?;

    let mut body = vec![0u8; body_len];
    reader.read_exact(&mut body).await?;
    let message = Message::from_buf(&body)?;
    Ok((message, reader))
}