algae_cli/
streams.rs

1use std::iter;
2
3use age::{Decryptor, Encryptor, Identity, Recipient};
4use miette::{Context as _, IntoDiagnostic as _, Result};
5use tokio::io::AsyncWriteExt as _;
6use tokio_util::compat::{FuturesAsyncReadCompatExt as _, FuturesAsyncWriteCompatExt as _};
7use tracing::trace;
8
9/// Encrypt a bytestream given a [`Recipient`].
10pub async fn encrypt_stream<R: tokio::io::AsyncRead + Unpin, W: futures::AsyncWrite + Unpin>(
11	mut reader: R,
12	writer: W,
13	key: Box<dyn Recipient + Send>,
14) -> Result<u64> {
15	let mut encrypting_writer = Encryptor::with_recipients(iter::once(&*key as _))
16		.expect("BUG: a single recipient is always given")
17		.wrap_async_output(writer)
18		.await
19		.into_diagnostic()?
20		.compat_write();
21
22	let bytes = tokio::io::copy(&mut reader, &mut encrypting_writer)
23		.await
24		.into_diagnostic()
25		.wrap_err("encrypting data in stream")?;
26
27	encrypting_writer
28		.shutdown()
29		.await
30		.into_diagnostic()
31		.wrap_err("closing the encrypted output")?;
32
33	trace!(?bytes, "bytestream encrypted");
34
35	Ok(bytes)
36}
37
38/// Decrypt a bytestream given an [`Identity`].
39pub async fn decrypt_stream<R: futures::AsyncRead + Unpin, W: tokio::io::AsyncWrite + Unpin>(
40	reader: R,
41	mut writer: W,
42	key: Box<dyn Identity>,
43) -> Result<u64> {
44	let mut decrypting_reader = Decryptor::new_async(reader)
45		.await
46		.into_diagnostic()?
47		.decrypt_async(iter::once(&*key))
48		.into_diagnostic()?
49		.compat();
50
51	let bytes = tokio::io::copy(&mut decrypting_reader, &mut writer)
52		.await
53		.into_diagnostic()
54		.wrap_err("decrypting data")?;
55
56	writer
57		.shutdown()
58		.await
59		.into_diagnostic()
60		.wrap_err("closing the output stream")?;
61
62	trace!(?bytes, "bytestream decrypted");
63
64	Ok(bytes)
65}