fractal_storage_client/stream/
ed25519.rs

1use crate::keys::{Privkey, Pubkey, Secret};
2use bytes::{Buf, Bytes, BytesMut};
3use ed25519_dalek_fiat::{
4    Digest, ExpandedSecretKey, PublicKey, SecretKey, Sha512, Signature, SIGNATURE_LENGTH,
5};
6use futures::stream::Stream;
7use futures::task::Context;
8use futures::task::Poll;
9use std::error::Error as StdError;
10use std::fmt::{Display, Formatter, Result as FmtResult};
11use std::pin::Pin;
12
13pub trait ToChaCha20 {
14    fn to_chacha20_key(&self) -> chacha20::Key;
15}
16
17impl ToChaCha20 for Privkey {
18    fn to_chacha20_key(&self) -> chacha20::Key {
19        self.derive_secret().to_chacha20_key()
20    }
21}
22
23impl ToChaCha20 for Secret {
24    fn to_chacha20_key(&self) -> chacha20::Key {
25        chacha20::Key::clone_from_slice(self.as_slice())
26    }
27}
28
29/// This SignStream wraps around an existing Stream of Bytes, passing through
30/// all of the data, but with the twist that if no error has occured while
31/// streaming the data, it will append a valid Ed25519 Signature of the entire
32/// data stream generated with the private key that it posesses.
33pub struct SignStream<E: StdError> {
34    privkey: Privkey,
35    hasher: Sha512,
36    stream: Pin<Box<dyn Stream<Item = Result<Bytes, E>> + Send + Sync>>,
37    eof: bool,
38}
39
40impl<E: StdError> SignStream<E> {
41    /// Create a new SignStream instance, giving it a private key (this will
42    /// be copied and stored) and a pinned, boxed Stream instance.
43    pub fn new<S: Stream<Item = Result<Bytes, E>> + Send + Sync + 'static>(
44        stream: S,
45        privkey: &Privkey,
46    ) -> Self {
47        SignStream {
48            hasher: Sha512::new(),
49            eof: false,
50            privkey: privkey.clone(),
51            stream: Box::pin(stream),
52        }
53    }
54}
55
56impl<E: StdError> Stream for SignStream<E> {
57    type Item = Result<Bytes, E>;
58
59    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
60        if self.eof {
61            return Poll::Ready(None);
62        }
63
64        let result = Pin::new(&mut self.stream).poll_next(cx);
65        match &result {
66            Poll::Ready(Some(Ok(bytes))) => {
67                self.hasher.update(bytes);
68            }
69            Poll::Ready(Some(Err(_error))) => self.eof = true,
70            Poll::Ready(None) => {
71                self.eof = true;
72                let secret_key = SecretKey::from_bytes(self.privkey.as_slice()).unwrap();
73                let public_key: PublicKey = (&secret_key).into();
74                let secret_key: ExpandedSecretKey = (&secret_key).into();
75
76                let result = secret_key.sign_prehashed(self.hasher.clone(), &public_key, None);
77                match result {
78                    Ok(signature) => {
79                        let signature = signature.to_bytes().to_vec();
80                        return Poll::Ready(Some(Ok(Bytes::from(signature))));
81                    }
82                    Err(_error) => unimplemented!(),
83                }
84            }
85            _ => {}
86        }
87
88        result
89    }
90}
91
92/// Given a public key and a signed Ed25519 stream, this stream adaptor will
93/// verify the stream on-the-fly.
94pub struct VerifyStream<E: StdError> {
95    pubkey: Pubkey,
96    hasher: Sha512,
97    stream: Pin<Box<dyn Stream<Item = Result<Bytes, E>> + Send + Sync>>,
98    verification: Option<bool>,
99    buffer: BytesMut,
100    queue: Option<Bytes>,
101}
102
103#[derive(Debug)]
104pub enum VerifyError<E: StdError> {
105    Stream(E),
106    Length,
107    Invalid(ed25519_dalek_fiat::ed25519::Error),
108    Incorrect,
109}
110
111impl<E: StdError> Display for VerifyError<E> {
112    fn fmt(&self, f: &mut Formatter) -> FmtResult {
113        use VerifyError::*;
114        match self {
115            Stream(error) => write!(f, "{}", error),
116            Incorrect => write!(f, "ed25519 signature validation incorrect"),
117            Invalid(error) => write!(f, "invalid ed25519 signature: {:?}", error),
118            Length => write!(f, "ed25519 signature missing"),
119        }
120    }
121}
122
123impl<E: StdError> StdError for VerifyError<E> {}
124
125impl<E: StdError> VerifyStream<E> {
126    /// Create a new VerifyStream instance from an existing public key and stream.
127    pub fn new<S: Stream<Item = Result<Bytes, E>> + Send + Sync + 'static>(
128        pubkey: &Pubkey,
129        stream: S,
130    ) -> VerifyStream<E> {
131        VerifyStream {
132            pubkey: pubkey.clone(),
133            hasher: Sha512::new(),
134            stream: Box::pin(stream),
135            verification: None,
136            buffer: BytesMut::with_capacity(SIGNATURE_LENGTH),
137            queue: None,
138        }
139    }
140
141    /// Check to see if the stream is verified yet.
142    pub fn verify(&self) -> Option<bool> {
143        self.verification
144    }
145}
146
147impl<E: StdError> Stream for VerifyStream<E> {
148    type Item = Result<Bytes, VerifyError<E>>;
149
150    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
151        // if the verification is done, stop passing through data
152        if self.verification.is_some() {
153            return Poll::Ready(None);
154        }
155
156        if let Some(queue) = self.queue.clone() {
157            self.queue = None;
158            return Poll::Ready(Some(Ok(queue)));
159        }
160
161        let result = Pin::new(&mut self.stream).poll_next(cx);
162        match result {
163            Poll::Ready(Some(Ok(mut bytes))) => {
164                // if we haven't gotten a full signature yet, just read and keep pending.
165                let total_length = self.buffer.len() + bytes.len();
166                if total_length <= SIGNATURE_LENGTH {
167                    // we have nothing to return yet.
168                    self.buffer.extend_from_slice(&bytes);
169                    return Poll::Ready(Some(Ok(Bytes::new())));
170                }
171
172                // how many bytes are ready to return?
173                let done_bytes = total_length - SIGNATURE_LENGTH;
174
175                // do we return the entire buffer?
176                if done_bytes >= self.buffer.len() {
177                    let retval = self.buffer.clone().freeze();
178
179                    // split off new buffer
180                    let new_buffer = bytes.split_off(bytes.len() - SIGNATURE_LENGTH);
181                    self.buffer.clear();
182                    self.buffer.extend_from_slice(&new_buffer);
183
184                    // update queue
185                    if bytes.len() > 0 {
186                        self.queue = Some(bytes.clone());
187                    }
188
189                    // hash new data
190                    self.hasher.update(&retval);
191                    self.hasher.update(&bytes);
192
193                    // return previous buffer
194                    Poll::Ready(Some(Ok(retval)))
195                } else {
196                    let mut retval = self.buffer.clone();
197                    let buffer_fragment = retval.split_off(done_bytes);
198                    self.buffer.clear();
199                    self.buffer.extend_from_slice(&buffer_fragment);
200                    self.buffer.extend_from_slice(&bytes);
201
202                    self.hasher.update(&retval);
203                    Poll::Ready(Some(Ok(retval.freeze())))
204                }
205            }
206            Poll::Ready(Some(Err(error))) => {
207                self.verification = Some(false);
208                Poll::Ready(Some(Err(VerifyError::Stream(error))))
209            }
210            Poll::Ready(None) => {
211                if self.buffer.len() < SIGNATURE_LENGTH {
212                    self.verification = Some(false);
213                    return Poll::Ready(Some(Err(VerifyError::Length)));
214                }
215
216                let mut signature = [0; SIGNATURE_LENGTH];
217                self.buffer.copy_to_slice(&mut signature);
218                let signature = match Signature::from_bytes(&signature) {
219                    Ok(signature) => signature,
220                    Err(e) => {
221                        self.verification = Some(false);
222                        return Poll::Ready(Some(Err(VerifyError::Invalid(e))));
223                    }
224                };
225
226                let pubkey = PublicKey::from_bytes(self.pubkey.as_slice()).unwrap();
227                let result = pubkey
228                    .verify_prehashed(self.hasher.clone(), None, &signature)
229                    .is_ok();
230                self.verification = Some(result);
231                if !result {
232                    Poll::Ready(Some(Err(VerifyError::Incorrect)))
233                } else {
234                    Poll::Ready(None)
235                }
236            }
237            Poll::Pending => Poll::Pending,
238        }
239    }
240}
241
#[cfg(test)]
#[tokio::test]
async fn sign_empty_stream() {
    use futures::StreamExt;

    // An empty source still produces exactly one chunk: the signature.
    let privkey = Privkey::generate();
    let source = futures::stream::iter(Vec::new());
    let mut signed = SignStream::<std::io::Error>::new(source, &privkey);

    let signature = signed.next().await.unwrap().unwrap();
    assert_eq!(signature.len(), 64);

    // The stream stays finished on repeated polls.
    for _ in 0..2 {
        assert!(signed.next().await.is_none());
    }
}
256
#[cfg(test)]
#[tokio::test]
async fn sign_single_stream() {
    use futures::StreamExt;

    let privkey = Privkey::generate();
    let payload = Bytes::from("this is some test data");
    let source = futures::stream::iter(vec![Ok(payload.clone())]);
    let mut signed = SignStream::<std::io::Error>::new(source, &privkey);

    // The payload is passed through untouched ...
    let passed = signed.next().await.unwrap().unwrap();
    assert_eq!(passed, payload);

    // ... followed by the signature chunk.
    let signature = signed.next().await.unwrap().unwrap();
    assert_eq!(signature.len(), 64);

    // The stream stays finished on repeated polls.
    for _ in 0..2 {
        assert!(signed.next().await.is_none());
    }
}
275
#[cfg(test)]
#[tokio::test]
async fn sign_multi_stream() {
    use futures::StreamExt;

    let privkey = Privkey::generate();
    let chunks: Vec<Bytes> = vec![
        "this is some test data".into(),
        "hello world".into(),
        "oj is guilty".into(),
    ];
    let items: Vec<Result<Bytes, std::io::Error>> = chunks.iter().cloned().map(Ok).collect();
    let mut signed = SignStream::<std::io::Error>::new(futures::stream::iter(items), &privkey);

    // Every chunk is passed through unchanged and in order.
    for expected in &chunks {
        let passed = signed.next().await.unwrap().unwrap();
        assert_eq!(&passed, expected);
    }

    // The signature chunk follows the data.
    let signature = signed.next().await.unwrap().unwrap();
    assert_eq!(signature.len(), 64);

    // The stream stays finished on repeated polls.
    for _ in 0..2 {
        assert!(signed.next().await.is_none());
    }
}
304
#[cfg(test)]
#[tokio::test]
async fn sign_error_stream() {
    use futures::StreamExt;

    let privkey = Privkey::generate();
    let before_error = Bytes::from("this is some test data");
    let after_error = Bytes::from("the answer is 42");
    let failure = std::io::Error::new(std::io::ErrorKind::Other, "error");
    let source = futures::stream::iter(vec![
        Ok(before_error.clone()),
        Err(failure),
        Ok(after_error),
    ]);
    let mut signed = SignStream::<std::io::Error>::new(source, &privkey);

    // Data ahead of the error is passed through, then the error itself.
    let passed = signed.next().await.unwrap().unwrap();
    assert_eq!(passed, before_error);
    let failed = signed.next().await.unwrap();
    assert!(failed.is_err());

    // do not produce signature after error
    for _ in 0..2 {
        assert!(signed.next().await.is_none());
    }
}
328
#[cfg(test)]
#[tokio::test]
async fn verify_empty_stream() {
    use futures::StreamExt;
    let key = Privkey::generate().pubkey();
    let stream = futures::stream::iter(vec![]);
    let mut stream = VerifyStream::<std::io::Error>::new(&key, Box::pin(stream));

    // nothing has been decided before the stream is polled
    assert_eq!(stream.verify(), None);

    // an empty stream carries no signature at all
    let result = stream.next().await.unwrap();
    assert!(matches!(result, Err(VerifyError::Length)));
    assert_eq!(stream.verify(), Some(false));

    assert!(stream.next().await.is_none());
    assert!(stream.next().await.is_none());
}
343
#[cfg(test)]
#[tokio::test]
async fn verify_missing_stream() {
    use futures::StreamExt;
    let key = Privkey::generate().pubkey();
    let data1: Bytes = "this is some short test".into();
    let data2: Bytes = "data that is used to assess".into();
    let stream = futures::stream::iter(vec![Ok(data1.clone()), Ok(data2.clone())]);
    let mut stream = VerifyStream::<std::io::Error>::new(&key, Box::pin(stream));

    // both chunks together are shorter than a signature, so they are held
    // back and only empty chunks are yielded
    let result = stream.next().await.unwrap();
    assert_eq!(result.unwrap().len(), 0);

    let result = stream.next().await.unwrap();
    assert_eq!(result.unwrap().len(), 0);
    assert_eq!(stream.verify(), None);

    // the stream ends without a complete signature
    let result = stream.next().await.unwrap();
    assert!(matches!(result, Err(VerifyError::Length)));
    assert_eq!(stream.verify(), Some(false));

    assert!(stream.next().await.is_none());
}
363
#[cfg(test)]
#[tokio::test]
async fn verify_correct_stream() {
    use futures::StreamExt;
    let privkey = Privkey::generate();
    let pubkey = privkey.pubkey();

    let data1: Bytes = "this is some short test".into();
    let data2: Bytes = "data that is used to assess".into();
    let stream = futures::stream::iter(vec![Ok(data1.clone()), Ok(data2.clone())]);
    let stream = SignStream::<std::io::Error>::new(stream, &privkey);
    let mut stream = VerifyStream::<std::io::Error>::new(&pubkey, Box::pin(stream));

    // the data chunks are shorter than a signature and are held back
    let result = stream.next().await.unwrap();
    assert_eq!(result.unwrap().len(), 0);
    let result = stream.next().await.unwrap();
    assert_eq!(result.unwrap().len(), 0);

    // once the signature arrives, the payload is released unmodified
    let result = stream.next().await.unwrap().unwrap();
    let expected = [&data1[..], &data2[..]].concat();
    assert_eq!(&result[..], &expected[..]);
    assert_eq!(stream.verify(), None);

    // end of stream: signature checked successfully
    assert!(stream.next().await.is_none());
    assert_eq!(stream.verify(), Some(true));
}
387
#[cfg(test)]
#[tokio::test]
async fn verify_incorrect_stream() {
    use futures::StreamExt;
    // sign with one key, verify with an unrelated one
    let privkey = Privkey::generate();
    let pubkey = Privkey::generate().pubkey();

    let data1: Bytes = "this is some short test".into();
    let data2: Bytes = "data that is used to assess".into();
    let stream = futures::stream::iter(vec![Ok(data1.clone()), Ok(data2.clone())]);
    let stream = SignStream::<std::io::Error>::new(stream, &privkey);
    let mut stream = VerifyStream::<std::io::Error>::new(&pubkey, Box::pin(stream));

    let result = stream.next().await.unwrap();
    assert_eq!(result.unwrap().len(), 0);
    let result = stream.next().await.unwrap();
    assert_eq!(result.unwrap().len(), 0);

    let result = stream.next().await.unwrap();
    assert_eq!(result.unwrap().len(), data1.len() + data2.len());
    assert_eq!(stream.verify(), None);

    // error because the signature was made with a different key
    let result = stream.next().await.unwrap();
    assert!(matches!(result, Err(VerifyError::Incorrect)));
    assert_eq!(stream.verify(), Some(false));

    assert!(stream.next().await.is_none());
}
415
#[cfg(test)]
#[tokio::test]
async fn verify_corrupt_stream() {
    use futures::StreamExt;
    let privkey = Privkey::generate();
    let pubkey = privkey.pubkey();

    let data1: Bytes = "this is some short test".into();
    let data2: Bytes = "data that is used to assess".into();
    let stream = futures::stream::iter(vec![Ok(data1.clone()), Ok(data2.clone())]);
    let mut stream = SignStream::<std::io::Error>::new(stream, &privkey);
    let mut data = vec![];
    while let Some(item) = stream.next().await {
        if !data.is_empty() {
            data.push(item);
        } else {
            // corrupt the first byte of the first chunk
            let mut item: BytesMut = item.unwrap().chunk().into();
            item[0] = 56;
            data.push(Ok(item.freeze()));
        }
    }
    let stream = futures::stream::iter(data);
    let mut stream = VerifyStream::<std::io::Error>::new(&pubkey, Box::pin(stream));

    let result = stream.next().await.unwrap();
    assert_eq!(result.unwrap().len(), 0);
    let result = stream.next().await.unwrap();
    assert_eq!(result.unwrap().len(), 0);

    let result = stream.next().await.unwrap();
    assert_eq!(result.unwrap().len(), data1.len() + data2.len());
    assert_eq!(stream.verify(), None);

    // error because the signature does not match the altered data
    let result = stream.next().await.unwrap();
    assert!(matches!(result, Err(VerifyError::Incorrect)));
    assert_eq!(stream.verify(), Some(false));

    assert!(stream.next().await.is_none());
}