1use crate::bytes::cast_mut;
2use crate::msg::*;
3
4use crate::NonceGen;
5use core::cmp::min;
6use core::pin::Pin;
7use core::task::{Context, Poll};
8use futures_core::ready;
9use futures_io::{self as io, AsyncRead};
10use ssb_crypto::secretbox::{Key, Nonce};
11use thiserror::Error;
12
13#[derive(Debug, Error)]
14enum BoxStreamError {
15 #[error("IO error: {source}")]
16 Io {
17 #[from]
18 source: io::Error,
19 },
20 #[error("Failed to decrypt header")]
21 HeaderOpenFailed,
22 #[error("Failed to decrypt body")]
23 BodyOpenFailed,
24}
25
26impl From<BoxStreamError> for io::Error {
27 fn from(err: BoxStreamError) -> io::Error {
28 match err {
29 BoxStreamError::Io { source } => source,
30 err => io::Error::new(io::ErrorKind::InvalidData, err),
31 }
32 }
33}
34
35pub struct BoxReader<R, B> {
36 inner: R,
37 buffer: B,
38 state: State,
39 key: Key,
40 nonces: NonceGen,
41}
42
43impl<R, B> BoxReader<R, B> {
44 pub fn with_buffer(inner: R, key: Key, nonce: Nonce, buffer: B) -> BoxReader<R, B> {
45 BoxReader {
46 inner,
47 buffer,
48 state: State::ReadingHead {
49 head: [0; Head::SIZE],
50 pos: 0,
51 },
52 key,
53 nonces: NonceGen::with_starting_nonce(nonce),
54 }
55 }
56
57 pub fn is_closed(&self) -> bool {
58 match self.state {
59 State::Done => true,
60 _ => false,
61 }
62 }
63
64 pub fn into_inner(self) -> R {
65 self.inner
66 }
67}
68
69impl<R> BoxReader<R, Vec<u8>> {
70 pub fn new(inner: R, key: Key, nonce: Nonce) -> BoxReader<R, Vec<u8>> {
71 BoxReader::with_buffer(inner, key, nonce, std::vec![0; 4096])
72 }
73}
74
75enum State {
76 Ready { body_size: usize, pos: usize },
77 ReadingHead { head: [u8; Head::SIZE], pos: usize },
78 ReadingBody { head: HeadPayload, pos: usize },
79 Done,
80}
81
82impl<R: AsyncRead, B> AsyncRead for BoxReader<R, B>
83where
84 R: Unpin + AsyncRead + 'static,
85 B: AsMut<[u8]> + Unpin,
86{
87 fn poll_read(
88 self: Pin<&mut Self>,
89 cx: &mut Context,
90 out: &mut [u8],
91 ) -> Poll<Result<usize, io::Error>> {
92 let mut this = self.get_mut();
93
94 match this.state {
95 State::Ready { body_size, pos } => {
96 let n = min(out.len(), body_size - pos);
97 out[..n].copy_from_slice(&this.buffer.as_mut()[pos..pos + n]);
98 if pos + n == body_size {
99 this.state = State::ReadingHead {
101 head: [0; Head::SIZE],
102 pos: 0,
103 };
104 } else {
105 this.state = State::Ready {
106 body_size,
107 pos: pos + n,
108 }
109 }
110 Poll::Ready(Ok(n))
111 }
112
113 State::ReadingHead { mut head, pos } => {
114 let n = ready!(Pin::new(&mut this.inner).poll_read(cx, &mut head[pos..]))?;
115 if n == head.len() - pos {
116 let hd = cast_mut::<Head>(&mut head[..])
118 .open(&this.key, this.nonces.next())
119 .ok_or(io::Error::from(BoxStreamError::HeaderOpenFailed))?;
120
121 if hd.is_goodbye() {
122 this.state = State::Done;
123 Poll::Ready(Ok(0))
124 } else {
125 this.state = State::ReadingBody { head: *hd, pos: 0 };
126 Pin::new(&mut this).poll_read(cx, out)
127 }
128 } else {
129 this.state = State::ReadingHead { head, pos: pos + n };
130 Poll::Pending
131 }
132 }
133
134 State::ReadingBody { head, pos } => {
135 let body_size = head.body_size.get() as usize;
136 let n = ready!(Pin::new(&mut this.inner)
137 .poll_read(cx, &mut this.buffer.as_mut()[pos..body_size]))?;
138
139 if n == body_size - pos {
140 if this.key.open(
142 &mut this.buffer.as_mut()[..body_size],
143 &head.body_hmac,
144 &this.nonces.next(),
145 ) {
146 this.state = State::Ready { body_size, pos: 0 };
147 Pin::new(&mut this).poll_read(cx, out)
148 } else {
149 Poll::Ready(Err(BoxStreamError::BodyOpenFailed.into()))
150 }
151 } else {
152 this.state = State::ReadingBody { head, pos: pos + n };
153 Poll::Pending
154 }
155 }
156
157 State::Done => Poll::Ready(Ok(0)),
158 }
159 }
160}