1use crate::BUFFER_SIZE;
2use crate::protocol::{
3 Role, perform_client_handshake, read_encrypted_frame, write_encrypted_frame, write_join_frame,
4};
5use anyhow::Result;
6use bytes::BytesMut;
7use std::marker::PhantomData;
8use tokio::io::{AsyncWriteExt, BufReader, BufWriter};
9use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
10use tokio::net::{TcpStream, ToSocketAddrs};
11
/// Zero-sized marker type selecting the producer role of [`HydraClient`].
///
/// It is only ever used as the type parameter `R` of `HydraClient<R>`; the
/// `HydraClient<Producer>` impl is the one that exposes `broadcast`.
#[derive(Debug, Clone, Copy)]
pub struct Producer;
/// Zero-sized marker type selecting the consumer role of [`HydraClient`].
///
/// It is only ever used as the type parameter `R` of `HydraClient<R>`; the
/// `HydraClient<Consumer>` impl is the one that exposes `recv`.
#[derive(Debug, Clone, Copy)]
pub struct Consumer;
16
/// Client end of a Hydra TCP connection, specialised by role at compile time.
///
/// `R` is a marker type (`Producer` or `Consumer`) that decides which methods
/// are available: `HydraClient<Producer>` offers `broadcast`,
/// `HydraClient<Consumer>` offers `recv`, and both share `close`.
pub struct HydraClient<R> {
    /// 32-byte key supplied by the caller of `connect`; it is handed to
    /// `write_encrypted_frame` / `read_encrypted_frame` for payload frames.
    /// This is distinct from the transport key returned by the handshake,
    /// which `connect` only uses for the join frame and does not store.
    session_key: [u8; 32],
    /// Buffered read half of the split TCP stream.
    buf_reader: BufReader<OwnedReadHalf>,
    /// Buffered write half of the split TCP stream.
    buf_writer: BufWriter<OwnedWriteHalf>,
    /// Reusable byte buffer passed to the frame helpers on every call;
    /// `connect` allocates it once with an 18 MiB capacity.
    // NOTE(review): presumably the helpers use this as encrypt/decrypt
    // scratch space so no per-frame allocation is needed — confirm in
    // `crate::protocol`.
    mem_pool: BytesMut,
    /// Zero-sized marker that ties the role type `R` to the struct.
    _role: PhantomData<R>,
}
46
47impl HydraClient<Producer> {
48 pub async fn connect<A: ToSocketAddrs>(
50 addr: A,
51 session_id: &[u8; 64],
52 session_key: [u8; 32],
53 ) -> Result<Self> {
54 let stream = TcpStream::connect(addr).await?;
55 stream.set_nodelay(true)?;
56 let (reader, writer) = stream.into_split();
57 let mut writer = BufWriter::with_capacity(BUFFER_SIZE, writer);
58 let mut reader = BufReader::with_capacity(BUFFER_SIZE, reader);
59 let transport_key = perform_client_handshake(&mut reader, &mut writer).await?;
60 let mut mem_pool = BytesMut::with_capacity(1024 * 1024 * 18);
61 write_join_frame(
62 &mut writer,
63 Role::Producer,
64 session_id,
65 &transport_key,
66 &mut mem_pool,
67 )
68 .await?;
69
70 Ok(Self {
71 buf_reader: reader,
72 buf_writer: writer,
73 session_key,
74 mem_pool,
75 _role: PhantomData,
76 })
77 }
78
79 pub async fn broadcast(&mut self, data: &[u8]) -> Result<()> {
81 write_encrypted_frame(
82 &mut self.buf_writer,
83 data,
84 &self.session_key,
85 &mut self.mem_pool,
86 )
87 .await
88 }
89}
90
91impl HydraClient<Consumer> {
92 pub async fn connect<A: ToSocketAddrs>(
94 addr: A,
95 session_id: &[u8; 64],
96 session_key: [u8; 32],
97 ) -> Result<Self> {
98 let stream = TcpStream::connect(addr).await?;
99 stream.set_nodelay(true)?;
100 let (reader, writer) = stream.into_split();
101 let mut writer = BufWriter::with_capacity(BUFFER_SIZE, writer);
102 let mut reader = BufReader::with_capacity(BUFFER_SIZE, reader);
103
104 let transport_key = perform_client_handshake(&mut reader, &mut writer).await?;
105 let mut mem_pool = BytesMut::with_capacity(1024 * 1024 * 18);
106 write_join_frame(
107 &mut writer,
108 Role::Consumer,
109 session_id,
110 &transport_key,
111 &mut mem_pool,
112 )
113 .await?;
114
115 Ok(Self {
116 buf_reader: reader,
117 buf_writer: writer,
118 session_key,
119 mem_pool,
120 _role: PhantomData,
121 })
122 }
123
124 pub async fn recv(&mut self) -> Result<&[u8]> {
127 let decrypted =
128 read_encrypted_frame(&mut self.buf_reader, &self.session_key, &mut self.mem_pool)
129 .await?;
130 Ok(decrypted)
131 }
132}
133
134impl<R> HydraClient<R> {
135 pub async fn close(&mut self) -> Result<()> {
137 self.buf_writer.flush().await?;
138 self.buf_writer.shutdown().await?;
139 Ok(())
140 }
141}