fractal_storage_client/stream/
ed25519.rs1use crate::keys::{Privkey, Pubkey, Secret};
2use bytes::{Buf, Bytes, BytesMut};
3use ed25519_dalek_fiat::{
4 Digest, ExpandedSecretKey, PublicKey, SecretKey, Sha512, Signature, SIGNATURE_LENGTH,
5};
6use futures::stream::Stream;
7use futures::task::Context;
8use futures::task::Poll;
9use std::error::Error as StdError;
10use std::fmt::{Display, Formatter, Result as FmtResult};
11use std::pin::Pin;
12
13pub trait ToChaCha20 {
14 fn to_chacha20_key(&self) -> chacha20::Key;
15}
16
17impl ToChaCha20 for Privkey {
18 fn to_chacha20_key(&self) -> chacha20::Key {
19 self.derive_secret().to_chacha20_key()
20 }
21}
22
23impl ToChaCha20 for Secret {
24 fn to_chacha20_key(&self) -> chacha20::Key {
25 chacha20::Key::clone_from_slice(self.as_slice())
26 }
27}
28
29pub struct SignStream<E: StdError> {
34 privkey: Privkey,
35 hasher: Sha512,
36 stream: Pin<Box<dyn Stream<Item = Result<Bytes, E>> + Send + Sync>>,
37 eof: bool,
38}
39
40impl<E: StdError> SignStream<E> {
41 pub fn new<S: Stream<Item = Result<Bytes, E>> + Send + Sync + 'static>(
44 stream: S,
45 privkey: &Privkey,
46 ) -> Self {
47 SignStream {
48 hasher: Sha512::new(),
49 eof: false,
50 privkey: privkey.clone(),
51 stream: Box::pin(stream),
52 }
53 }
54}
55
56impl<E: StdError> Stream for SignStream<E> {
57 type Item = Result<Bytes, E>;
58
59 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
60 if self.eof {
61 return Poll::Ready(None);
62 }
63
64 let result = Pin::new(&mut self.stream).poll_next(cx);
65 match &result {
66 Poll::Ready(Some(Ok(bytes))) => {
67 self.hasher.update(bytes);
68 }
69 Poll::Ready(Some(Err(_error))) => self.eof = true,
70 Poll::Ready(None) => {
71 self.eof = true;
72 let secret_key = SecretKey::from_bytes(self.privkey.as_slice()).unwrap();
73 let public_key: PublicKey = (&secret_key).into();
74 let secret_key: ExpandedSecretKey = (&secret_key).into();
75
76 let result = secret_key.sign_prehashed(self.hasher.clone(), &public_key, None);
77 match result {
78 Ok(signature) => {
79 let signature = signature.to_bytes().to_vec();
80 return Poll::Ready(Some(Ok(Bytes::from(signature))));
81 }
82 Err(_error) => unimplemented!(),
83 }
84 }
85 _ => {}
86 }
87
88 result
89 }
90}
91
92pub struct VerifyStream<E: StdError> {
95 pubkey: Pubkey,
96 hasher: Sha512,
97 stream: Pin<Box<dyn Stream<Item = Result<Bytes, E>> + Send + Sync>>,
98 verification: Option<bool>,
99 buffer: BytesMut,
100 queue: Option<Bytes>,
101}
102
103#[derive(Debug)]
104pub enum VerifyError<E: StdError> {
105 Stream(E),
106 Length,
107 Invalid(ed25519_dalek_fiat::ed25519::Error),
108 Incorrect,
109}
110
111impl<E: StdError> Display for VerifyError<E> {
112 fn fmt(&self, f: &mut Formatter) -> FmtResult {
113 use VerifyError::*;
114 match self {
115 Stream(error) => write!(f, "{}", error),
116 Incorrect => write!(f, "ed25519 signature validation incorrect"),
117 Invalid(error) => write!(f, "invalid ed25519 signature: {:?}", error),
118 Length => write!(f, "ed25519 signature missing"),
119 }
120 }
121}
122
123impl<E: StdError> StdError for VerifyError<E> {}
124
125impl<E: StdError> VerifyStream<E> {
126 pub fn new<S: Stream<Item = Result<Bytes, E>> + Send + Sync + 'static>(
128 pubkey: &Pubkey,
129 stream: S,
130 ) -> VerifyStream<E> {
131 VerifyStream {
132 pubkey: pubkey.clone(),
133 hasher: Sha512::new(),
134 stream: Box::pin(stream),
135 verification: None,
136 buffer: BytesMut::with_capacity(SIGNATURE_LENGTH),
137 queue: None,
138 }
139 }
140
141 pub fn verify(&self) -> Option<bool> {
143 self.verification
144 }
145}
146
147impl<E: StdError> Stream for VerifyStream<E> {
148 type Item = Result<Bytes, VerifyError<E>>;
149
150 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
151 if self.verification.is_some() {
153 return Poll::Ready(None);
154 }
155
156 if let Some(queue) = self.queue.clone() {
157 self.queue = None;
158 return Poll::Ready(Some(Ok(queue)));
159 }
160
161 let result = Pin::new(&mut self.stream).poll_next(cx);
162 match result {
163 Poll::Ready(Some(Ok(mut bytes))) => {
164 let total_length = self.buffer.len() + bytes.len();
166 if total_length <= SIGNATURE_LENGTH {
167 self.buffer.extend_from_slice(&bytes);
169 return Poll::Ready(Some(Ok(Bytes::new())));
170 }
171
172 let done_bytes = total_length - SIGNATURE_LENGTH;
174
175 if done_bytes >= self.buffer.len() {
177 let retval = self.buffer.clone().freeze();
178
179 let new_buffer = bytes.split_off(bytes.len() - SIGNATURE_LENGTH);
181 self.buffer.clear();
182 self.buffer.extend_from_slice(&new_buffer);
183
184 if bytes.len() > 0 {
186 self.queue = Some(bytes.clone());
187 }
188
189 self.hasher.update(&retval);
191 self.hasher.update(&bytes);
192
193 Poll::Ready(Some(Ok(retval)))
195 } else {
196 let mut retval = self.buffer.clone();
197 let buffer_fragment = retval.split_off(done_bytes);
198 self.buffer.clear();
199 self.buffer.extend_from_slice(&buffer_fragment);
200 self.buffer.extend_from_slice(&bytes);
201
202 self.hasher.update(&retval);
203 Poll::Ready(Some(Ok(retval.freeze())))
204 }
205 }
206 Poll::Ready(Some(Err(error))) => {
207 self.verification = Some(false);
208 Poll::Ready(Some(Err(VerifyError::Stream(error))))
209 }
210 Poll::Ready(None) => {
211 if self.buffer.len() < SIGNATURE_LENGTH {
212 self.verification = Some(false);
213 return Poll::Ready(Some(Err(VerifyError::Length)));
214 }
215
216 let mut signature = [0; SIGNATURE_LENGTH];
217 self.buffer.copy_to_slice(&mut signature);
218 let signature = match Signature::from_bytes(&signature) {
219 Ok(signature) => signature,
220 Err(e) => {
221 self.verification = Some(false);
222 return Poll::Ready(Some(Err(VerifyError::Invalid(e))));
223 }
224 };
225
226 let pubkey = PublicKey::from_bytes(self.pubkey.as_slice()).unwrap();
227 let result = pubkey
228 .verify_prehashed(self.hasher.clone(), None, &signature)
229 .is_ok();
230 self.verification = Some(result);
231 if !result {
232 Poll::Ready(Some(Err(VerifyError::Incorrect)))
233 } else {
234 Poll::Ready(None)
235 }
236 }
237 Poll::Pending => Poll::Pending,
238 }
239 }
240}
241
/// Signing an empty stream yields only the 64-byte signature.
#[cfg(test)]
#[tokio::test]
async fn sign_empty_stream() {
    use futures::StreamExt;

    let privkey = Privkey::generate();
    let source = futures::stream::iter(vec![]);
    let mut signed = SignStream::<std::io::Error>::new(source, &privkey);

    let signature = signed.next().await.unwrap().unwrap();
    assert_eq!(signature.len(), 64);

    // The stream stays finished when polled again.
    for _ in 0..2 {
        assert!(signed.next().await.is_none());
    }
}
256
/// A single chunk is passed through unchanged and followed by the signature.
#[cfg(test)]
#[tokio::test]
async fn sign_single_stream() {
    use futures::StreamExt;

    let privkey = Privkey::generate();
    let payload = Bytes::from("this is some test data");
    let source = futures::stream::iter(vec![Ok(payload.clone())]);
    let mut signed = SignStream::<std::io::Error>::new(source, &privkey);

    let forwarded = signed.next().await.unwrap().unwrap();
    assert_eq!(forwarded, payload);

    let signature = signed.next().await.unwrap().unwrap();
    assert_eq!(signature.len(), 64);

    // The stream stays finished when polled again.
    for _ in 0..2 {
        assert!(signed.next().await.is_none());
    }
}
275
/// Several chunks are passed through in order and followed by the signature.
#[cfg(test)]
#[tokio::test]
async fn sign_multi_stream() {
    use futures::StreamExt;

    let privkey = Privkey::generate();
    let first = Bytes::from("this is some test data");
    let second = Bytes::from("hello world");
    let third = Bytes::from("oj is guilty");
    let source = futures::stream::iter(vec![
        Ok(first.clone()),
        Ok(second.clone()),
        Ok(third.clone()),
    ]);
    let mut signed = SignStream::<std::io::Error>::new(source, &privkey);

    for expected in vec![first, second, third] {
        let forwarded = signed.next().await.unwrap().unwrap();
        assert_eq!(forwarded, expected);
    }

    let signature = signed.next().await.unwrap().unwrap();
    assert_eq!(signature.len(), 64);

    // The stream stays finished when polled again.
    for _ in 0..2 {
        assert!(signed.next().await.is_none());
    }
}
304
/// An error from the source is forwarded and ends the stream: neither the
/// following chunk nor a signature is produced.
#[cfg(test)]
#[tokio::test]
async fn sign_error_stream() {
    use futures::StreamExt;

    let privkey = Privkey::generate();
    let before = Bytes::from("this is some test data");
    let after = Bytes::from("the answer is 42");
    let failure = std::io::Error::new(std::io::ErrorKind::Other, "error");
    let source = futures::stream::iter(vec![Ok(before.clone()), Err(failure), Ok(after.clone())]);
    let mut signed = SignStream::<std::io::Error>::new(source, &privkey);

    let forwarded = signed.next().await.unwrap().unwrap();
    assert_eq!(forwarded, before);

    let failed = signed.next().await.unwrap();
    assert!(failed.is_err());

    // The stream stays finished when polled again.
    for _ in 0..2 {
        assert!(signed.next().await.is_none());
    }
}
328
/// Verifying an empty stream fails (there is no signature) and then ends.
#[cfg(test)]
#[tokio::test]
async fn verify_empty_stream() {
    use futures::StreamExt;

    let pubkey = Privkey::generate().pubkey();
    let source = Box::pin(futures::stream::iter(vec![]));
    let mut verified = VerifyStream::<std::io::Error>::new(&pubkey, source);

    let outcome = verified.next().await.unwrap();
    assert!(outcome.is_err());

    // The stream stays finished when polled again.
    for _ in 0..2 {
        assert!(verified.next().await.is_none());
    }
}
343
/// Unsigned data shorter than a signature is buffered entirely (one empty
/// chunk per input chunk) and then rejected at end of stream.
#[cfg(test)]
#[tokio::test]
async fn verify_missing_stream() {
    use futures::StreamExt;

    let pubkey = Privkey::generate().pubkey();
    let first = Bytes::from("this is some short test");
    let second = Bytes::from("data that is used to assess");
    let source = Box::pin(futures::stream::iter(vec![
        Ok(first.clone()),
        Ok(second.clone()),
    ]));
    let mut verified = VerifyStream::<std::io::Error>::new(&pubkey, source);

    for _ in 0..2 {
        let buffered = verified.next().await.unwrap().unwrap();
        assert_eq!(buffered.len(), 0);
    }

    let outcome = verified.next().await.unwrap();
    assert!(outcome.is_err());
    assert!(verified.next().await.is_none());
}
363
/// A stream signed with the matching key releases its payload once the
/// signature chunk arrives and then ends without an error.
#[cfg(test)]
#[tokio::test]
async fn verify_correct_stream() {
    use futures::StreamExt;

    let privkey = Privkey::generate();
    let pubkey = privkey.pubkey();

    let first = Bytes::from("this is some short test");
    let second = Bytes::from("data that is used to assess");
    let source = futures::stream::iter(vec![Ok(first.clone()), Ok(second.clone())]);
    let signed = SignStream::<std::io::Error>::new(source, &privkey);
    let mut verified = VerifyStream::<std::io::Error>::new(&pubkey, Box::pin(signed));

    // Both payload chunks are held back at first.
    for _ in 0..2 {
        let buffered = verified.next().await.unwrap().unwrap();
        assert_eq!(buffered.len(), 0);
    }

    // The signature chunk pushes the whole payload out.
    let payload = verified.next().await.unwrap().unwrap();
    assert_eq!(payload.len(), first.len() + second.len());

    assert!(verified.next().await.is_none());
}
387
/// Verifying against an unrelated public key still releases the payload but
/// then reports an error.
#[cfg(test)]
#[tokio::test]
async fn verify_incorrect_stream() {
    use futures::StreamExt;

    let privkey = Privkey::generate();
    // Deliberately taken from a different, freshly generated key pair.
    let pubkey = Privkey::generate().pubkey();

    let first = Bytes::from("this is some short test");
    let second = Bytes::from("data that is used to assess");
    let source = futures::stream::iter(vec![Ok(first.clone()), Ok(second.clone())]);
    let signed = SignStream::<std::io::Error>::new(source, &privkey);
    let mut verified = VerifyStream::<std::io::Error>::new(&pubkey, Box::pin(signed));

    // Both payload chunks are held back at first.
    for _ in 0..2 {
        let buffered = verified.next().await.unwrap().unwrap();
        assert_eq!(buffered.len(), 0);
    }

    let payload = verified.next().await.unwrap().unwrap();
    assert_eq!(payload.len(), first.len() + second.len());

    let outcome = verified.next().await.unwrap();
    assert!(outcome.is_err());

    assert!(verified.next().await.is_none());
}
415
/// Tampering with the first byte of a signed stream still releases the
/// payload but then reports an error.
#[cfg(test)]
#[tokio::test]
async fn verify_corrupt_stream() {
    use futures::StreamExt;

    let privkey = Privkey::generate();
    let pubkey = privkey.pubkey();

    let first = Bytes::from("this is some short test");
    let second = Bytes::from("data that is used to assess");
    let source = futures::stream::iter(vec![Ok(first.clone()), Ok(second.clone())]);
    let mut signed = SignStream::<std::io::Error>::new(source, &privkey);

    // Record the signed stream, overwriting the first byte of its first item.
    let mut recorded = vec![];
    while let Some(item) = signed.next().await {
        if recorded.is_empty() {
            let original = item.unwrap();
            let mut tampered = BytesMut::from(&original[..]);
            tampered[0] = 56;
            recorded.push(Ok(tampered.freeze()));
        } else {
            recorded.push(item);
        }
    }

    let replay = futures::stream::iter(recorded);
    let mut verified = VerifyStream::<std::io::Error>::new(&pubkey, Box::pin(replay));

    // Both payload chunks are held back at first.
    for _ in 0..2 {
        let buffered = verified.next().await.unwrap().unwrap();
        assert_eq!(buffered.len(), 0);
    }

    let payload = verified.next().await.unwrap().unwrap();
    assert_eq!(payload.len(), first.len() + second.len());

    let outcome = verified.next().await.unwrap();
    assert!(outcome.is_err());

    assert!(verified.next().await.is_none());
}