use crate::*;
use futures::future::{BoxFuture, FutureExt};
use futures::stream::{BoxStream, Stream, StreamExt};
use one_err::*;
use std::future::Future;
use std::sync::{Arc, Mutex};
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
const MAX_FRAME: usize = 1024 * 8;
pub mod traits {
use super::*;
pub trait AsS3Sender<T>: 'static + Send + Sync
where
T: 'static + serde::Serialize + Send,
{
fn send(&self, t: T) -> BoxFuture<'static, LairResult<()>>;
fn get_enc_ctx_key(
&self,
) -> SharedSizedLockedArray<{ sodoken::secretstream::KEYBYTES }>;
fn get_dec_ctx_key(
&self,
) -> SharedSizedLockedArray<{ sodoken::secretstream::KEYBYTES }>;
fn shutdown(&self) -> BoxFuture<'static, LairResult<()>>;
}
pub trait AsS3Receiver<T>:
'static + Send + Stream<Item = LairResult<T>> + Unpin
where
T: for<'de> serde::Deserialize<'de>,
{
}
}
use traits::*;
pub struct S3Sender<T>(pub Arc<dyn AsS3Sender<T>>);
impl<T> Clone for S3Sender<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<T> S3Sender<T>
where
T: 'static + serde::Serialize + Send,
{
pub fn send(
&self,
t: T,
) -> impl Future<Output = LairResult<()>> + 'static + Send {
AsS3Sender::send(&*self.0, t)
}
pub fn get_enc_ctx_key(
&self,
) -> SharedSizedLockedArray<{ sodoken::secretstream::KEYBYTES }> {
AsS3Sender::get_enc_ctx_key(&*self.0)
}
pub fn get_dec_ctx_key(
&self,
) -> SharedSizedLockedArray<{ sodoken::secretstream::KEYBYTES }> {
AsS3Sender::get_dec_ctx_key(&*self.0)
}
pub fn shutdown(
&self,
) -> impl Future<Output = LairResult<()>> + 'static + Send {
AsS3Sender::shutdown(&*self.0)
}
}
pub struct S3Receiver<T>(pub Box<dyn AsS3Receiver<T>>);
impl<T> Stream for S3Receiver<T>
where
T: for<'de> serde::Deserialize<'de>,
{
type Item = LairResult<T>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
Stream::poll_next(std::pin::Pin::new(&mut self.0), cx)
}
}
pub fn new_s3_server<T, S, R>(
send: S,
recv: R,
srv_id_pub_key: Arc<[u8; 32]>,
srv_id_sec_key: SharedSizedLockedArray<32>,
) -> impl Future<Output = LairResult<(S3Sender<T>, S3Receiver<T>)>> + 'static + Send
where
T: 'static + serde::Serialize + for<'de> serde::Deserialize<'de> + Send,
S: 'static + tokio::io::AsyncWrite + Send + Unpin,
R: 'static + tokio::io::AsyncRead + Send + Unpin,
{
async move {
let mut send: PrivRawSend = Box::new(send);
let mut recv: PrivRawRecv = Box::new(recv);
let mut cipher = [0; 64 + sodoken::crypto_box::XSALSA_SEALBYTES];
recv.read_exact(&mut cipher).await?;
let mut msg = [0; 64];
sodoken::crypto_box::xsalsa_seal_open(
&mut msg,
&cipher,
&srv_id_pub_key,
&srv_id_sec_key.lock().unwrap().lock(),
)?;
let mut oth_cbox_pub: [u8; 32] = [0; 32];
oth_cbox_pub.copy_from_slice(&msg[..32]);
let mut oth_kx_pub: [u8; 32] = [0; 32];
oth_kx_pub.copy_from_slice(&msg[32..]);
let mut eph_kx_pub = [0; sodoken::crypto_box::XSALSA_PUBLICKEYBYTES];
let mut eph_kx_sec = sodoken::SizedLockedArray::<
{ sodoken::crypto_box::XSALSA_SECRETKEYBYTES },
>::new()?;
sodoken::crypto_box::xsalsa_keypair(
&mut eph_kx_pub,
&mut eph_kx_sec.lock(),
)?;
let mut cipher = [0; 32 + sodoken::crypto_box::XSALSA_SEALBYTES];
sodoken::crypto_box::xsalsa_seal(
&mut cipher,
&eph_kx_pub,
&oth_cbox_pub,
)?;
send.write_all(&cipher).await?;
let mut rx = sodoken::SizedLockedArray::<
{ sodoken::kx::SESSIONKEYBYTES },
>::new()?;
let mut tx = sodoken::SizedLockedArray::<
{ sodoken::kx::SESSIONKEYBYTES },
>::new()?;
sodoken::kx::client_session_keys(
&mut rx.lock(),
&mut tx.lock(),
&eph_kx_pub,
&eph_kx_sec.lock(),
&oth_kx_pub,
)?;
let rx = Arc::new(Mutex::new(rx));
let tx = Arc::new(Mutex::new(tx));
let (enc, dec) =
priv_init_ss(&mut send, tx.clone(), &mut recv, rx.clone()).await?;
let (send, recv) = priv_framed(send, recv);
let (send, recv) = priv_crypt(send, enc, recv, dec);
let send: PrivSend<T> = PrivSend::new(send, tx, rx);
let send: S3Sender<T> = S3Sender(Arc::new(send));
let recv: PrivRecv<T> = PrivRecv::new(recv);
let recv: S3Receiver<T> = S3Receiver(Box::new(recv));
Ok((send, recv))
}
}
pub fn new_s3_client<T, S, R>(
send: S,
recv: R,
srv_id_pub_key: BinDataSized<32>,
) -> impl Future<Output = LairResult<(S3Sender<T>, S3Receiver<T>)>> + 'static + Send
where
T: 'static + serde::Serialize + for<'de> serde::Deserialize<'de> + Send,
S: 'static + tokio::io::AsyncWrite + Send + Unpin,
R: 'static + tokio::io::AsyncRead + Send + Unpin,
{
async move {
let mut send: PrivRawSend = Box::new(send);
let mut recv: PrivRawRecv = Box::new(recv);
let mut eph_cbox_pub = [0; sodoken::crypto_box::XSALSA_PUBLICKEYBYTES];
let mut eph_cbox_sec = sodoken::SizedLockedArray::<
{ sodoken::crypto_box::XSALSA_SECRETKEYBYTES },
>::new()?;
sodoken::crypto_box::xsalsa_keypair(
&mut eph_cbox_pub,
&mut eph_cbox_sec.lock(),
)?;
let mut eph_kx_pub = [0; sodoken::crypto_box::XSALSA_PUBLICKEYBYTES];
let mut eph_kx_sec = sodoken::SizedLockedArray::<
{ sodoken::crypto_box::XSALSA_SECRETKEYBYTES },
>::new()?;
sodoken::crypto_box::xsalsa_keypair(
&mut eph_kx_pub,
&mut eph_kx_sec.lock(),
)?;
let mut message: [u8; 64] = [0; 64];
message[..32].copy_from_slice(&eph_cbox_pub);
message[32..].copy_from_slice(&eph_kx_pub);
let mut cipher = [0; 64 + sodoken::crypto_box::XSALSA_SEALBYTES];
sodoken::crypto_box::xsalsa_seal(
&mut cipher,
&message,
&srv_id_pub_key,
)?;
send.write_all(&cipher).await?;
let mut cipher = [0; 32 + sodoken::crypto_box::XSALSA_SEALBYTES];
recv.read_exact(&mut cipher).await?;
let mut oth_eph_kx_pub = [0; 32];
sodoken::crypto_box::xsalsa_seal_open(
&mut oth_eph_kx_pub,
&cipher,
&eph_cbox_pub,
&eph_cbox_sec.lock(),
)?;
let mut rx = sodoken::SizedLockedArray::<
{ sodoken::kx::SESSIONKEYBYTES },
>::new()?;
let mut tx = sodoken::SizedLockedArray::<
{ sodoken::kx::SESSIONKEYBYTES },
>::new()?;
sodoken::kx::server_session_keys(
&mut rx.lock(),
&mut tx.lock(),
&eph_kx_pub,
&eph_kx_sec.lock(),
&oth_eph_kx_pub,
)?;
let rx = Arc::new(Mutex::new(rx));
let tx = Arc::new(Mutex::new(tx));
let (enc, dec) =
priv_init_ss(&mut send, tx.clone(), &mut recv, rx.clone()).await?;
let (send, recv) = priv_framed(send, recv);
let (send, recv) = priv_crypt(send, enc, recv, dec);
let send: PrivSend<T> = PrivSend::new(send, tx, rx);
let send: S3Sender<T> = S3Sender(Arc::new(send));
let recv: PrivRecv<T> = PrivRecv::new(recv);
let recv: S3Receiver<T> = S3Receiver(Box::new(recv));
Ok((send, recv))
}
}
type PrivRawSend = Box<dyn tokio::io::AsyncWrite + 'static + Send + Unpin>;
type PrivRawRecv = Box<dyn tokio::io::AsyncRead + 'static + Send + Unpin>;
mod framed;
use framed::*;
mod crypt;
use crypt::*;
mod inner;
use crate::types::SharedSizedLockedArray;
use inner::*;
fn priv_init_ss<'a>(
send: &'a mut PrivRawSend,
tx: SharedSizedLockedArray<{ sodoken::secretstream::KEYBYTES }>,
recv: &'a mut PrivRawRecv,
rx: SharedSizedLockedArray<{ sodoken::secretstream::KEYBYTES }>,
) -> impl Future<
Output = LairResult<(
sodoken::secretstream::State,
sodoken::secretstream::State,
)>,
>
+ 'a
+ Send {
async move {
let mut header = [0; sodoken::secretstream::HEADERBYTES];
let mut enc = sodoken::secretstream::State::default();
sodoken::secretstream::init_push(
&mut enc,
&mut header,
&tx.lock().unwrap().lock(),
)?;
send.write_all(&header).await?;
recv.read_exact(&mut header).await?;
let mut dec = sodoken::secretstream::State::default();
sodoken::secretstream::init_pull(
&mut dec,
&header,
&rx.lock().unwrap().lock(),
)?;
Ok((enc, dec))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test(flavor = "multi_thread")]
async fn test_sodium_secretstream() {
let mut srv_id_pub = [0; sodoken::crypto_box::XSALSA_PUBLICKEYBYTES];
let mut srv_id_sec = sodoken::SizedLockedArray::<
{ sodoken::crypto_box::XSALSA_SECRETKEYBYTES },
>::new()
.unwrap();
sodoken::crypto_box::xsalsa_keypair(
&mut srv_id_pub,
&mut srv_id_sec.lock(),
)
.unwrap();
let srv_id_pub = Arc::new(srv_id_pub);
let srv_id_sec = Arc::new(Mutex::new(srv_id_sec));
let (alice, bob) = tokio::io::duplex(4096);
let (alice_recv, alice_send) = tokio::io::split(alice);
let alice_fut = new_s3_client::<usize, _, _>(
alice_send,
alice_recv,
srv_id_pub.clone().into(),
);
let (bob_recv, bob_send) = tokio::io::split(bob);
let bob_fut = new_s3_server::<usize, _, _>(
bob_send, bob_recv, srv_id_pub, srv_id_sec,
);
let ((alice_send, mut alice_recv), (bob_send, mut bob_recv)) =
futures::future::try_join(alice_fut, bob_fut).await.unwrap();
assert_eq!(
&*alice_send.get_enc_ctx_key().lock().unwrap().lock(),
&*bob_send.get_dec_ctx_key().lock().unwrap().lock(),
);
assert_eq!(
&*alice_send.get_dec_ctx_key().lock().unwrap().lock(),
&*bob_send.get_enc_ctx_key().lock().unwrap().lock(),
);
alice_send.send(42).await.unwrap();
bob_send.send(99).await.unwrap();
alice_send.shutdown().await.unwrap();
bob_send.shutdown().await.unwrap();
assert_eq!(42, bob_recv.next().await.unwrap().unwrap());
assert_eq!(99, alice_recv.next().await.unwrap().unwrap());
assert_eq!(
std::io::ErrorKind::UnexpectedEof,
bob_recv.next().await.unwrap().unwrap_err().io_kind(),
);
assert_eq!(
std::io::ErrorKind::UnexpectedEof,
alice_recv.next().await.unwrap().unwrap_err().io_kind(),
);
}
}