ssb_packetstream/
stream.rs1use byteorder::{BigEndian, ByteOrder};
2use core::pin::Pin;
3use core::task::{Context, Poll, Poll::Pending, Poll::Ready};
4use futures::io::{AsyncRead, AsyncReadExt};
5use futures::stream::Stream;
6use std::mem::replace;
7
8use crate::packet::*;
9use crate::PinFut;
10use snafu::{ensure, ResultExt, Snafu};
11
12#[derive(Debug, Snafu)]
13pub enum Error {
14 #[snafu(display("Failed to receive packet: {}", source))]
15 Recv { source: std::io::Error },
16
17 #[snafu(display("IO error while reading packet header: {}", source))]
18 Header { source: std::io::Error },
19
20 #[snafu(display(
21 "IO error while reading packet body. Body size: {}. Error: {}",
22 size,
23 source
24 ))]
25 Body { size: usize, source: std::io::Error },
26
27 #[snafu(display("PacketStream underlying reader closed without goodbye"))]
28 NoGoodbye {},
29}
30
31async fn recv<R>(r: &mut R) -> Result<Option<Packet>, Error>
32where
33 R: AsyncRead + Unpin,
34{
35 let mut head = [0; 9];
36 let n = r.read(&mut head).await.context(Header)?;
37 ensure!(n != 0, NoGoodbye);
38 if n < head.len() {
39 r.read_exact(&mut head[n..]).await.context(Header)?;
40 }
41
42 if head == [0u8; 9] {
43 return Ok(None); }
45
46 let body_len = BigEndian::read_u32(&head[1..5]) as usize;
47 let id = BigEndian::read_i32(&head[5..]);
48
49 let mut body = vec![0; body_len];
50 r.read_exact(&mut body)
51 .await
52 .context(Body { size: body_len })?;
53
54 Ok(Some(Packet::new(
55 head[0].into(),
56 head[0].into(),
57 head[0].into(),
58 id,
59 body,
60 )))
61}
62
63async fn recv_move<R>(mut r: R) -> (R, Result<Option<Packet>, Error>)
64where
65 R: AsyncRead + Unpin + 'static,
66{
67 let res = recv(&mut r).await;
68 (r, res)
69}
70
71pub struct PacketStream<R> {
96 state: State<R>,
97}
98impl<R> PacketStream<R> {
99 pub fn new(r: R) -> PacketStream<R> {
100 PacketStream {
101 state: State::Ready(r),
102 }
103 }
104
105 pub fn is_closed(&self) -> bool {
106 match &self.state {
107 State::Closed(_) => true,
108 _ => false,
109 }
110 }
111
112 pub fn into_inner(mut self) -> R {
113 match self.state.take() {
114 State::Ready(r) | State::Closed(r) => r,
115 _ => panic!(),
116 }
117 }
118}
119
120enum State<R> {
121 Ready(R),
122 Waiting(PinFut<(R, Result<Option<Packet>, Error>)>),
123 Closed(R),
124 Invalid,
125}
126impl<R> State<R> {
127 fn take(&mut self) -> Self {
128 replace(self, State::Invalid)
129 }
130}
131
132fn next<R>(state: State<R>, cx: &mut Context) -> (State<R>, Poll<Option<Result<Packet, Error>>>)
133where
134 R: AsyncRead + Unpin + 'static,
135{
136 match state {
137 State::Ready(r) => next(State::Waiting(Box::pin(recv_move(r))), cx),
138 State::Waiting(mut f) => match f.as_mut().poll(cx) {
139 Pending => (State::Waiting(f), Pending),
140 Ready((r, Ok(None))) => (State::Closed(r), Ready(None)),
141 Ready((r, Err(e))) => (State::Closed(r), Ready(Some(Err(e)))),
142 Ready((r, res)) => (State::Ready(r), Ready(res.transpose())),
143 },
144 State::Closed(r) => (State::Closed(r), Ready(None)),
145 State::Invalid => panic!(),
146 }
147}
148
149impl<R: AsyncRead + Unpin + 'static> Stream for PacketStream<R> {
150 type Item = Result<Packet, Error>;
151
152 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
153 let (state, poll) = next(self.state.take(), cx);
154 self.state = state;
155 poll
156 }
157}