use crate::BUFFER_SIZE;
use crate::protocol::{
Role, perform_client_handshake, read_encrypted_frame, write_encrypted_frame, write_join_frame,
};
use anyhow::{Result, bail};
use bytes::BytesMut;
use std::net::SocketAddr;
use tokio::io::{AsyncWriteExt, BufReader, BufWriter};
use tokio::net::TcpStream;
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
pub struct HydraClient {
role: Role,
session_key: [u8; 32],
buf_reader: BufReader<OwnedReadHalf>,
buf_writer: BufWriter<OwnedWriteHalf>,
mem_pool: BytesMut,
}
impl HydraClient {
pub async fn connect_producer(
addr: SocketAddr,
session_id: &[u8; 64],
session_key: [u8; 32],
) -> Result<Self> {
let stream = TcpStream::connect(addr).await?;
stream.set_nodelay(true)?;
let (reader, writer) = stream.into_split();
let mut writer = BufWriter::with_capacity(BUFFER_SIZE, writer);
let mut reader = BufReader::with_capacity(BUFFER_SIZE, reader);
let transport_key = perform_client_handshake(&mut reader, &mut writer).await?;
let mut mem_pool = BytesMut::with_capacity(1024 * 1024 * 18);
write_join_frame(
&mut writer,
Role::Producer,
session_id,
&transport_key,
&mut mem_pool,
)
.await?;
Ok(Self {
role: Role::Producer,
buf_reader: reader,
buf_writer: writer,
session_key,
mem_pool,
})
}
pub async fn broadcast(&mut self, data: &[u8]) -> Result<()> {
if self.role != Role::Producer {
bail!("broadcast is only available for producers");
}
write_encrypted_frame(
&mut self.buf_writer,
data,
&self.session_key,
&mut self.mem_pool,
)
.await
}
pub async fn connect_consumer(
addr: SocketAddr,
session_id: &[u8; 64],
session_key: [u8; 32],
) -> Result<Self> {
let stream = TcpStream::connect(addr).await?;
stream.set_nodelay(true)?;
let (reader, writer) = stream.into_split();
let mut writer = BufWriter::with_capacity(BUFFER_SIZE, writer);
let mut reader = BufReader::with_capacity(BUFFER_SIZE, reader);
let transport_key = perform_client_handshake(&mut reader, &mut writer).await?;
let mut mem_pool = BytesMut::with_capacity(1024 * 1024 * 18);
write_join_frame(
&mut writer,
Role::Consumer,
session_id,
&transport_key,
&mut mem_pool,
)
.await?;
Ok(Self {
role: Role::Consumer,
buf_reader: reader,
buf_writer: writer,
session_key,
mem_pool,
})
}
pub async fn recv(&mut self) -> Result<&[u8]> {
if self.role != Role::Consumer {
bail!("recv is only available for consumers");
}
let decrypted =
read_encrypted_frame(&mut self.buf_reader, &self.session_key, &mut self.mem_pool)
.await?;
Ok(decrypted)
}
pub async fn server_status(&mut self) -> Result<()> {
todo!(
"Implement server status query, returns: uptime total_client_connected, total_sessions"
);
}
pub async fn close(&mut self) -> Result<()> {
self.buf_writer.flush().await?;
self.buf_writer.shutdown().await?;
Ok(())
}
}