use crate::BUFFER_SIZE;
use crate::protocol::{
Role, perform_client_handshake, read_encrypted_frame, write_encrypted_frame, write_join_frame,
};
use anyhow::Result;
use bytes::BytesMut;
use std::marker::PhantomData;
use tokio::io::{AsyncWriteExt, BufReader, BufWriter};
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::{TcpStream, ToSocketAddrs};
pub struct Producer;
pub struct Consumer;
pub struct HydraClient<R> {
session_key: [u8; 32],
buf_reader: BufReader<OwnedReadHalf>,
buf_writer: BufWriter<OwnedWriteHalf>,
mem_pool: BytesMut,
_role: PhantomData<R>,
}
impl HydraClient<Producer> {
pub async fn connect<A: ToSocketAddrs>(
addr: A,
session_id: &[u8; 64],
session_key: [u8; 32],
) -> Result<Self> {
let stream = TcpStream::connect(addr).await?;
stream.set_nodelay(true)?;
let (reader, writer) = stream.into_split();
let mut writer = BufWriter::with_capacity(BUFFER_SIZE, writer);
let mut reader = BufReader::with_capacity(BUFFER_SIZE, reader);
let transport_key = perform_client_handshake(&mut reader, &mut writer).await?;
let mut mem_pool = BytesMut::with_capacity(1024 * 1024 * 18);
write_join_frame(
&mut writer,
Role::Producer,
session_id,
&transport_key,
&mut mem_pool,
)
.await?;
Ok(Self {
buf_reader: reader,
buf_writer: writer,
session_key,
mem_pool,
_role: PhantomData,
})
}
pub async fn broadcast(&mut self, data: &[u8]) -> Result<()> {
write_encrypted_frame(
&mut self.buf_writer,
data,
&self.session_key,
&mut self.mem_pool,
)
.await
}
}
impl HydraClient<Consumer> {
pub async fn connect<A: ToSocketAddrs>(
addr: A,
session_id: &[u8; 64],
session_key: [u8; 32],
) -> Result<Self> {
let stream = TcpStream::connect(addr).await?;
stream.set_nodelay(true)?;
let (reader, writer) = stream.into_split();
let mut writer = BufWriter::with_capacity(BUFFER_SIZE, writer);
let mut reader = BufReader::with_capacity(BUFFER_SIZE, reader);
let transport_key = perform_client_handshake(&mut reader, &mut writer).await?;
let mut mem_pool = BytesMut::with_capacity(1024 * 1024 * 18);
write_join_frame(
&mut writer,
Role::Consumer,
session_id,
&transport_key,
&mut mem_pool,
)
.await?;
Ok(Self {
buf_reader: reader,
buf_writer: writer,
session_key,
mem_pool,
_role: PhantomData,
})
}
pub async fn recv(&mut self) -> Result<&[u8]> {
let decrypted =
read_encrypted_frame(&mut self.buf_reader, &self.session_key, &mut self.mem_pool)
.await?;
Ok(decrypted)
}
}
impl<R> HydraClient<R> {
pub async fn close(&mut self) -> Result<()> {
self.buf_writer.flush().await?;
self.buf_writer.shutdown().await?;
Ok(())
}
}