1use crate::BUFFER_SIZE;
2use crate::protocol::{
3 Role, perform_client_handshake, read_encrypted_frame, write_encrypted_frame, write_join_frame,
4};
5use anyhow::{Result, bail};
6use bytes::BytesMut;
7use std::net::SocketAddr;
8use tokio::io::{AsyncWriteExt, BufReader, BufWriter};
9use tokio::net::TcpStream;
10use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
11
12pub struct HydraClient {
17 role: Role,
18 session_key: [u8; 32],
19 buf_reader: BufReader<OwnedReadHalf>,
20 buf_writer: BufWriter<OwnedWriteHalf>,
21 mem_pool: BytesMut,
22}
23
24impl HydraClient {
25 pub async fn connect_producer(
27 addr: SocketAddr,
28 session_id: &[u8; 64],
29 session_key: [u8; 32],
30 ) -> Result<Self> {
31 let stream = TcpStream::connect(addr).await?;
32 stream.set_nodelay(true)?;
33
34 let (reader, writer) = stream.into_split();
35 let mut writer = BufWriter::with_capacity(BUFFER_SIZE, writer);
36 let mut reader = BufReader::with_capacity(BUFFER_SIZE, reader);
37 let transport_key = perform_client_handshake(&mut reader, &mut writer).await?;
38 let mut mem_pool = BytesMut::with_capacity(1024 * 1024 * 18);
39 write_join_frame(
40 &mut writer,
41 Role::Producer,
42 session_id,
43 &transport_key,
44 &mut mem_pool,
45 )
46 .await?;
47
48 Ok(Self {
49 role: Role::Producer,
50 buf_reader: reader,
51 buf_writer: writer,
52 session_key,
53 mem_pool,
54 })
55 }
56
57 pub async fn broadcast(&mut self, data: &[u8]) -> Result<()> {
60 if self.role != Role::Producer {
61 bail!("broadcast is only available for producers");
62 }
63 write_encrypted_frame(
64 &mut self.buf_writer,
65 data,
66 &self.session_key,
67 &mut self.mem_pool,
68 )
69 .await
70 }
71
72 pub async fn connect_consumer(
74 addr: SocketAddr,
75 session_id: &[u8; 64],
76 session_key: [u8; 32],
77 ) -> Result<Self> {
78 let stream = TcpStream::connect(addr).await?;
79 stream.set_nodelay(true)?;
80 let (reader, writer) = stream.into_split();
81 let mut writer = BufWriter::with_capacity(BUFFER_SIZE, writer);
82 let mut reader = BufReader::with_capacity(BUFFER_SIZE, reader);
83
84 let transport_key = perform_client_handshake(&mut reader, &mut writer).await?;
85 let mut mem_pool = BytesMut::with_capacity(1024 * 1024 * 18);
86 write_join_frame(
87 &mut writer,
88 Role::Consumer,
89 session_id,
90 &transport_key,
91 &mut mem_pool,
92 )
93 .await?;
94
95 Ok(Self {
96 role: Role::Consumer,
97 buf_reader: reader,
98 buf_writer: writer,
99 session_key,
100 mem_pool,
101 })
102 }
103
104 pub async fn recv(&mut self) -> Result<&[u8]> {
108 if self.role != Role::Consumer {
109 bail!("recv is only available for consumers");
110 }
111 let decrypted =
112 read_encrypted_frame(&mut self.buf_reader, &self.session_key, &mut self.mem_pool)
113 .await?;
114 Ok(decrypted)
115 }
116
117 pub async fn server_status(&mut self) -> Result<()> {
119 todo!(
120 "Implement server status query, returns: uptime total_client_connected, total_sessions"
121 );
122 }
123
124 pub async fn close(&mut self) -> Result<()> {
126 self.buf_writer.flush().await?;
127 self.buf_writer.shutdown().await?;
128 Ok(())
129 }
130}