1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
use futures::stream::TryStream;
use futures::Stream;
use tokio::io::{AsyncRead, AsyncWrite, ReadHalf, WriteHalf};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use std::pin::Pin;
use std::task::Poll;
#[derive(Debug)]
pub struct HalvesStream<St> {
inner: St,
length_field_length: usize,
}
impl<St> HalvesStream<St> {
pub fn new(inner: St, length_field_length: usize) -> Self {
Self {
inner,
length_field_length,
}
}
}
impl<St> Stream for HalvesStream<St>
where
St: TryStream<Error = std::io::Error> + Unpin,
St::Ok: AsyncRead + AsyncWrite,
{
type Item = Result<
(
FramedWrite<WriteHalf<St::Ok>, LengthDelimitedCodec>,
FramedRead<ReadHalf<St::Ok>, LengthDelimitedCodec>,
),
St::Error,
>;
fn poll_next(
mut self: Pin<&mut Self>,
ctx: &mut std::task::Context,
) -> Poll<Option<Self::Item>> {
match futures::ready!(Pin::new(&mut self.inner).try_poll_next(ctx)) {
None => None.into(),
Some(Err(err)) => Poll::Ready(Some(Err(err))),
Some(Ok(stream)) => {
let (reader, writer) = tokio::io::split(stream);
let framed_write = LengthDelimitedCodec::builder()
.length_field_length(self.length_field_length)
.new_write(writer);
let framed_read = LengthDelimitedCodec::builder()
.length_field_length(self.length_field_length)
.new_read(reader);
Poll::Ready(Some(Ok((framed_write, framed_read))))
}
}
}
}