Skip to main content

ssb_packetstream/
stream.rs

1use byteorder::{BigEndian, ByteOrder};
2use core::pin::Pin;
3use core::task::{Context, Poll, Poll::Pending, Poll::Ready};
4use futures::io::{AsyncRead, AsyncReadExt};
5use futures::stream::Stream;
6use std::mem::replace;
7
8use crate::packet::*;
9use crate::PinFut;
10use snafu::{ensure, ResultExt, Snafu};
11
12#[derive(Debug, Snafu)]
13pub enum Error {
14    #[snafu(display("Failed to receive packet: {}", source))]
15    Recv { source: std::io::Error },
16
17    #[snafu(display("IO error while reading packet header: {}", source))]
18    Header { source: std::io::Error },
19
20    #[snafu(display(
21        "IO error while reading packet body. Body size: {}. Error: {}",
22        size,
23        source
24    ))]
25    Body { size: usize, source: std::io::Error },
26
27    #[snafu(display("PacketStream underlying reader closed without goodbye"))]
28    NoGoodbye {},
29}
30
31async fn recv<R>(r: &mut R) -> Result<Option<Packet>, Error>
32where
33    R: AsyncRead + Unpin,
34{
35    let mut head = [0; 9];
36    let n = r.read(&mut head).await.context(Header)?;
37    ensure!(n != 0, NoGoodbye);
38    if n < head.len() {
39        r.read_exact(&mut head[n..]).await.context(Header)?;
40    }
41
42    if head == [0u8; 9] {
43        return Ok(None); // RPC goodbye
44    }
45
46    let body_len = BigEndian::read_u32(&head[1..5]) as usize;
47    let id = BigEndian::read_i32(&head[5..]);
48
49    let mut body = vec![0; body_len];
50    r.read_exact(&mut body)
51        .await
52        .context(Body { size: body_len })?;
53
54    Ok(Some(Packet::new(
55        head[0].into(),
56        head[0].into(),
57        head[0].into(),
58        id,
59        body,
60    )))
61}
62
63async fn recv_move<R>(mut r: R) -> (R, Result<Option<Packet>, Error>)
64where
65    R: AsyncRead + Unpin + 'static,
66{
67    let res = recv(&mut r).await;
68    (r, res)
69}
70
71/// # Examples
72/// ```rust
73/// use futures::executor::block_on;
74/// use futures::sink::SinkExt;
75/// use futures::stream::TryStreamExt;
76/// use ssb_packetstream::*;
77///
78/// let p = Packet::new(IsStream::Yes,
79///                     IsEnd::No,
80///                     BodyType::Binary,
81///                     12345,
82///                     vec![1,2,3,4,5]);
83///
84/// let (writer, reader) = async_ringbuffer::ring_buffer(64);
85///
86/// let mut sink = PacketSink::new(writer);
87/// let mut stream = PacketStream::new(reader);
88/// block_on(async {
89///     sink.send(p).await;
90///     let r = stream.try_next().await.unwrap().unwrap();
91///     assert_eq!(&r.body, &[1,2,3,4,5]);
92///     assert_eq!(r.id, 12345);
93/// });
94/// ```
95pub struct PacketStream<R> {
96    state: State<R>,
97}
98impl<R> PacketStream<R> {
99    pub fn new(r: R) -> PacketStream<R> {
100        PacketStream {
101            state: State::Ready(r),
102        }
103    }
104
105    pub fn is_closed(&self) -> bool {
106        match &self.state {
107            State::Closed(_) => true,
108            _ => false,
109        }
110    }
111
112    pub fn into_inner(mut self) -> R {
113        match self.state.take() {
114            State::Ready(r) | State::Closed(r) => r,
115            _ => panic!(),
116        }
117    }
118}
119
120enum State<R> {
121    Ready(R),
122    Waiting(PinFut<(R, Result<Option<Packet>, Error>)>),
123    Closed(R),
124    Invalid,
125}
126impl<R> State<R> {
127    fn take(&mut self) -> Self {
128        replace(self, State::Invalid)
129    }
130}
131
132fn next<R>(state: State<R>, cx: &mut Context) -> (State<R>, Poll<Option<Result<Packet, Error>>>)
133where
134    R: AsyncRead + Unpin + 'static,
135{
136    match state {
137        State::Ready(r) => next(State::Waiting(Box::pin(recv_move(r))), cx),
138        State::Waiting(mut f) => match f.as_mut().poll(cx) {
139            Pending => (State::Waiting(f), Pending),
140            Ready((r, Ok(None))) => (State::Closed(r), Ready(None)),
141            Ready((r, Err(e))) => (State::Closed(r), Ready(Some(Err(e)))),
142            Ready((r, res)) => (State::Ready(r), Ready(res.transpose())),
143        },
144        State::Closed(r) => (State::Closed(r), Ready(None)),
145        State::Invalid => panic!(),
146    }
147}
148
149impl<R: AsyncRead + Unpin + 'static> Stream for PacketStream<R> {
150    type Item = Result<Packet, Error>;
151
152    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
153        let (state, poll) = next(self.state.take(), cx);
154        self.state = state;
155        poll
156    }
157}