1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
use async_std::future::Future;
use async_std::io::{BufReader, Error};
use async_std::prelude::*;
use async_std::stream::Stream;
use async_std::task::{Context, Poll};
use futures::io::AsyncRead;
use futures::pin_mut;
use std::pin::Pin;
use crate::Message;
pub struct Reader<R> {
inner: BufReader<R>,
}
impl<R> Reader<R>
where
R: AsyncRead,
{
pub fn new(reader: R) -> Self {
Self {
inner: BufReader::new(reader),
}
}
}
impl<R> Stream for Reader<R>
where
R: AsyncRead + Send + Unpin,
{
type Item = Result<Message, Error>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Message, Error>>> {
let fut = decode(&mut self.inner);
pin_mut!(fut);
match fut.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(result) => Poll::Ready(Some(result)),
}
}
}
pub async fn decode<'a, R>(reader: &mut R) -> Result<Message, Error>
where
R: AsyncRead + Unpin + 'a,
{
let mut varint: u64 = 0;
let mut factor = 1;
let mut headerbuf = vec![0u8; 1];
loop {
reader.read_exact(&mut headerbuf).await?;
let byte = headerbuf[0];
varint = varint + (byte as u64 & 127) * factor;
if byte < 128 {
break;
}
factor = factor * 128;
}
let mut messagebuf = vec![0u8; varint as usize];
reader.read_exact(&mut messagebuf).await?;
let message = Message::from_buf(&messagebuf)?;
Ok(message)
}