ssb_boxstream/
read.rs

1use crate::bytes::cast_mut;
2use crate::msg::*;
3
4use crate::NonceGen;
5use core::cmp::min;
6use core::pin::Pin;
7use core::task::{Context, Poll};
8use futures_core::ready;
9use futures_io::{self as io, AsyncRead};
10use ssb_crypto::secretbox::{Key, Nonce};
11use thiserror::Error;
12
13#[derive(Debug, Error)]
14enum BoxStreamError {
15    #[error("IO error: {source}")]
16    Io {
17        #[from]
18        source: io::Error,
19    },
20    #[error("Failed to decrypt header")]
21    HeaderOpenFailed,
22    #[error("Failed to decrypt body")]
23    BodyOpenFailed,
24}
25
26impl From<BoxStreamError> for io::Error {
27    fn from(err: BoxStreamError) -> io::Error {
28        match err {
29            BoxStreamError::Io { source } => source,
30            err => io::Error::new(io::ErrorKind::InvalidData, err),
31        }
32    }
33}
34
35pub struct BoxReader<R, B> {
36    inner: R,
37    buffer: B,
38    state: State,
39    key: Key,
40    nonces: NonceGen,
41}
42
43impl<R, B> BoxReader<R, B> {
44    pub fn with_buffer(inner: R, key: Key, nonce: Nonce, buffer: B) -> BoxReader<R, B> {
45        BoxReader {
46            inner,
47            buffer,
48            state: State::ReadingHead {
49                head: [0; Head::SIZE],
50                pos: 0,
51            },
52            key,
53            nonces: NonceGen::with_starting_nonce(nonce),
54        }
55    }
56
57    pub fn is_closed(&self) -> bool {
58        match self.state {
59            State::Done => true,
60            _ => false,
61        }
62    }
63
64    pub fn into_inner(self) -> R {
65        self.inner
66    }
67}
68
69impl<R> BoxReader<R, Vec<u8>> {
70    pub fn new(inner: R, key: Key, nonce: Nonce) -> BoxReader<R, Vec<u8>> {
71        BoxReader::with_buffer(inner, key, nonce, std::vec![0; 4096])
72    }
73}
74
75enum State {
76    Ready { body_size: usize, pos: usize },
77    ReadingHead { head: [u8; Head::SIZE], pos: usize },
78    ReadingBody { head: HeadPayload, pos: usize },
79    Done,
80}
81
82impl<R: AsyncRead, B> AsyncRead for BoxReader<R, B>
83where
84    R: Unpin + AsyncRead + 'static,
85    B: AsMut<[u8]> + Unpin,
86{
87    fn poll_read(
88        self: Pin<&mut Self>,
89        cx: &mut Context,
90        out: &mut [u8],
91    ) -> Poll<Result<usize, io::Error>> {
92        let mut this = self.get_mut();
93
94        match this.state {
95            State::Ready { body_size, pos } => {
96                let n = min(out.len(), body_size - pos);
97                out[..n].copy_from_slice(&this.buffer.as_mut()[pos..pos + n]);
98                if pos + n == body_size {
99                    // need to read a new box
100                    this.state = State::ReadingHead {
101                        head: [0; Head::SIZE],
102                        pos: 0,
103                    };
104                } else {
105                    this.state = State::Ready {
106                        body_size,
107                        pos: pos + n,
108                    }
109                }
110                Poll::Ready(Ok(n))
111            }
112
113            State::ReadingHead { mut head, pos } => {
114                let n = ready!(Pin::new(&mut this.inner).poll_read(cx, &mut head[pos..]))?;
115                if n == head.len() - pos {
116                    // done reading head
117                    let hd = cast_mut::<Head>(&mut head[..])
118                        .open(&this.key, this.nonces.next())
119                        .ok_or(io::Error::from(BoxStreamError::HeaderOpenFailed))?;
120
121                    if hd.is_goodbye() {
122                        this.state = State::Done;
123                        Poll::Ready(Ok(0))
124                    } else {
125                        this.state = State::ReadingBody { head: *hd, pos: 0 };
126                        Pin::new(&mut this).poll_read(cx, out)
127                    }
128                } else {
129                    this.state = State::ReadingHead { head, pos: pos + n };
130                    Poll::Pending
131                }
132            }
133
134            State::ReadingBody { head, pos } => {
135                let body_size = head.body_size.get() as usize;
136                let n = ready!(Pin::new(&mut this.inner)
137                    .poll_read(cx, &mut this.buffer.as_mut()[pos..body_size]))?;
138
139                if n == body_size - pos {
140                    // Done reading body, open it.
141                    if this.key.open(
142                        &mut this.buffer.as_mut()[..body_size],
143                        &head.body_hmac,
144                        &this.nonces.next(),
145                    ) {
146                        this.state = State::Ready { body_size, pos: 0 };
147                        Pin::new(&mut this).poll_read(cx, out)
148                    } else {
149                        Poll::Ready(Err(BoxStreamError::BodyOpenFailed.into()))
150                    }
151                } else {
152                    this.state = State::ReadingBody { head, pos: pos + n };
153                    Poll::Pending
154                }
155            }
156
157            State::Done => Poll::Ready(Ok(0)),
158        }
159    }
160}