simple_message_channels/
reader.rs

1use futures::future::{Future, FutureExt};
2use futures::io::{AsyncRead, AsyncReadExt};
3use futures::stream::Stream;
4use futures::task::{Context, Poll};
5use std::io::{Error, ErrorKind};
6use std::pin::Pin;
7
8use crate::{Message, MAX_MESSAGE_SIZE};
9
10/// A reader for SMC messages.
11///
12/// Takes any [`futures::io::AsyncRead`] and is a
13/// [`async_std::stream::Stream`] of [`Message`]s.
14///
15/// # Example
16///
17/// ```rust
18/// use simple_message_channels::Reader;
19/// let stdin = io::stdin().lock().await;
20/// let mut reader = Reader::new(stdin);
21/// while let Some(msg) = reader.next().await {
22///     let msg = msg?;
23///     println!("Received: ch {} typ {} msg {:?}", msg.channel, msg.typ, text);
24/// }
25/// ```
26pub struct Reader<R> {
27    future: Pin<Box<dyn Future<Output = Result<(Message, R), Error>> + Send>>,
28    finished: bool,
29}
30
31impl<R> Reader<R>
32where
33    R: AsyncRead + Send + Unpin + 'static,
34{
35    /// Create a new message reader from any [`futures::io::AsyncRead`].
36    pub fn new(reader: R) -> Self {
37        Self {
38            future: decoder(reader).boxed(),
39            finished: false,
40        }
41    }
42}
43
44// Proxy to the internal Reader instance and decode messages.
45impl<R> Stream for Reader<R>
46where
47    R: AsyncRead + Send + Unpin + 'static,
48{
49    type Item = Result<Message, Error>;
50    fn poll_next(
51        mut self: Pin<&mut Self>,
52        cx: &mut Context<'_>,
53    ) -> Poll<Option<Result<Message, Error>>> {
54        if self.finished {
55            return Poll::Ready(None);
56        }
57        match self.future.poll_unpin(cx) {
58            Poll::Pending => Poll::Pending,
59            Poll::Ready(result) => {
60                match result {
61                    Ok((message, reader)) => {
62                        // Re-init the future.
63                        self.future = decoder(reader).boxed();
64                        Poll::Ready(Some(Ok(message)))
65                    }
66                    Err(error) => {
67                        self.finished = true;
68                        Poll::Ready(Some(Err(error)))
69                    }
70                }
71            }
72        }
73    }
74}
75
76/// Decode a single message from a Reader instance.
77///
78/// Returns either an error or both the message and the Reader instance.
79pub async fn decoder<'a, R>(mut reader: R) -> Result<(Message, R), Error>
80where
81    R: AsyncRead + Send + Unpin + 'static,
82{
83    let mut varint: u64 = 0;
84    let mut factor = 1;
85    let mut headerbuf = vec![0u8; 1];
86    // Read initial varint (message length).
87    loop {
88        reader.read_exact(&mut headerbuf).await?;
89        let byte = headerbuf[0];
90
91        // Skip empty bytes (may be keepalive pings).
92        if byte == 0 {
93            continue;
94        }
95
96        varint = varint + (byte as u64 & 127) * factor;
97        if byte < 128 {
98            break;
99        }
100        if varint > MAX_MESSAGE_SIZE {
101            return Err(Error::new(ErrorKind::InvalidInput, "Message too long"));
102        }
103        factor = factor * 128;
104    }
105
106    // Read main message.
107    let mut messagebuf = vec![0u8; varint as usize];
108    reader.read_exact(&mut messagebuf).await?;
109    let message = Message::from_buf(&messagebuf)?;
110    Ok((message, reader))
111}