1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use super::framed::Fuse;
use super::Decoder;
use bytes::BytesMut;
use futures::io::AsyncRead;
use futures::{ready, Sink, Stream, TryStreamExt};
use std::io;
use std::marker::Unpin;
use std::pin::Pin;
use std::task::{Context, Poll};
pub struct FramedRead<T, D> {
inner: FramedRead2<Fuse<T, D>>,
}
impl<T, D> FramedRead<T, D>
where
T: AsyncRead,
D: Decoder,
{
pub fn new(inner: T, decoder: D) -> Self {
Self {
inner: framed_read_2(Fuse(inner, decoder)),
}
}
}
impl<T, D> Stream for FramedRead<T, D>
where
T: AsyncRead + Unpin,
D: Decoder,
{
type Item = Result<D::Item, D::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.inner.try_poll_next_unpin(cx)
}
}
pub struct FramedRead2<T> {
inner: T,
buffer: BytesMut,
}
const INITIAL_CAPACITY: usize = 8 * 1024;
pub fn framed_read_2<T>(inner: T) -> FramedRead2<T> {
FramedRead2 {
inner,
buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
}
}
impl<T> Stream for FramedRead2<T>
where
T: AsyncRead + Decoder + Unpin,
{
type Item = Result<T::Item, T::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = &mut *self;
let mut buf = [0u8; INITIAL_CAPACITY];
loop {
let n = ready!(Pin::new(&mut this.inner).poll_read(cx, &mut buf))?;
this.buffer.extend_from_slice(&buf[..n]);
match this.inner.decode(&mut this.buffer)? {
Some(item) => return Poll::Ready(Some(Ok(item))),
None => {
if this.buffer.is_empty() {
return Poll::Ready(None);
} else if n == 0 {
return Poll::Ready(Some(Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"bytes remaining in stream",
)
.into())));
}
}
}
}
}
}
impl<T, I> Sink<I> for FramedRead2<T>
where
T: Sink<I> + Unpin,
{
type SinkError = T::SinkError;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::SinkError>> {
Pin::new(&mut self.inner).poll_ready(cx)
}
fn start_send(mut self: Pin<&mut Self>, item: I) -> Result<(), Self::SinkError> {
Pin::new(&mut self.inner).start_send(item)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::SinkError>> {
Pin::new(&mut self.inner).poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::SinkError>> {
Pin::new(&mut self.inner).poll_close(cx)
}
}