Skip to main content

hydra_sync/
client.rs

1use crate::BUFFER_SIZE;
2use crate::protocol::{
3    Role, perform_client_handshake, read_encrypted_frame, write_encrypted_frame, write_join_frame,
4};
5use anyhow::Result;
6use bytes::BytesMut;
7use std::marker::PhantomData;
8use tokio::io::{AsyncWriteExt, BufReader, BufWriter};
9use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
10use tokio::net::{TcpStream, ToSocketAddrs};
11
12/// Unit structs to represent the `Role::Producer` at the type level for better safety and clarity.
13pub struct Producer;
14/// Unit structs to represent the `Role::Consumer` at the type level for better safety and clarity.
15pub struct Consumer;
16
17/// `HydraClient` connects to the relay server as a producer or consumer, performs handshake, and sends/receives encrypted frames.
18/// It maintains an internal memory pool (18 mb) for zero-copy crypto and buffering.
19/// The `broadcast` method allows producers to send encrypted frames to all connected consumers in the same session,
20/// while the `recv` method allows consumers to receive and decrypt frames from the producer.
21///
22/// ```no_run
23/// use hydra_sync::client::{HydraClient, Producer, Consumer};
24///
25/// #[tokio::main]
26/// async fn main() {
27///     let addr = "127.0.0.1:8000";
28///     let session_id = [0xFFu8; 64];
29///     let session_key = [0xAAu8; 32];
30///
31///     let mut producer = HydraClient::<Producer>::connect(addr, &session_id, session_key).await.unwrap();
32///     producer.broadcast(b"I luv you >.<").await.unwrap(); // sends to all consumer
33///
34///     let mut consumer = HydraClient::<Consumer>::connect(addr, &session_id, session_key).await.unwrap();
35///     consumer.recv().await.unwrap(); // recv whatever next frame on ring buf
36/// }
37/// ```
38///
39pub struct HydraClient<R> {
40    session_key: [u8; 32],
41    buf_reader: BufReader<OwnedReadHalf>,
42    buf_writer: BufWriter<OwnedWriteHalf>,
43    mem_pool: BytesMut,
44    _role: PhantomData<R>,
45}
46
47impl HydraClient<Producer> {
48    /// Connects to the server, performs handshake, and sends a join frame with `Role::Producer` and session_id.
49    pub async fn connect<A: ToSocketAddrs>(
50        addr: A,
51        session_id: &[u8; 64],
52        session_key: [u8; 32],
53    ) -> Result<Self> {
54        let stream = TcpStream::connect(addr).await?;
55        stream.set_nodelay(true)?;
56        let (reader, writer) = stream.into_split();
57        let mut writer = BufWriter::with_capacity(BUFFER_SIZE, writer);
58        let mut reader = BufReader::with_capacity(BUFFER_SIZE, reader);
59        let transport_key = perform_client_handshake(&mut reader, &mut writer).await?;
60        let mut mem_pool = BytesMut::with_capacity(1024 * 1024 * 18);
61        write_join_frame(
62            &mut writer,
63            Role::Producer,
64            session_id,
65            &transport_key,
66            &mut mem_pool,
67        )
68        .await?;
69
70        Ok(Self {
71            buf_reader: reader,
72            buf_writer: writer,
73            session_key,
74            mem_pool,
75            _role: PhantomData,
76        })
77    }
78
79    /// Broadcasts the given data as an encrypted frame to all connected consumers (zero-copy) in the same session.
80    pub async fn broadcast(&mut self, data: &[u8]) -> Result<()> {
81        write_encrypted_frame(
82            &mut self.buf_writer,
83            data,
84            &self.session_key,
85            &mut self.mem_pool,
86        )
87        .await
88    }
89}
90
91impl HydraClient<Consumer> {
92    /// Connects to the server, performs handshake, and sends a join frame with the `Role::Consumer` and session_id.
93    pub async fn connect<A: ToSocketAddrs>(
94        addr: A,
95        session_id: &[u8; 64],
96        session_key: [u8; 32],
97    ) -> Result<Self> {
98        let stream = TcpStream::connect(addr).await?;
99        stream.set_nodelay(true)?;
100        let (reader, writer) = stream.into_split();
101        let mut writer = BufWriter::with_capacity(BUFFER_SIZE, writer);
102        let mut reader = BufReader::with_capacity(BUFFER_SIZE, reader);
103
104        let transport_key = perform_client_handshake(&mut reader, &mut writer).await?;
105        let mut mem_pool = BytesMut::with_capacity(1024 * 1024 * 18);
106        write_join_frame(
107            &mut writer,
108            Role::Consumer,
109            session_id,
110            &transport_key,
111            &mut mem_pool,
112        )
113        .await?;
114
115        Ok(Self {
116            buf_reader: reader,
117            buf_writer: writer,
118            session_key,
119            mem_pool,
120            _role: PhantomData,
121        })
122    }
123
124    /// Receives the next encrypted frame from the producer, decrypts it, and returns the plaintext data as a byte slice.
125    /// The returned slice is valid until the next call to `recv` or `broadcast`, which may reuse the internal memory pool buffer.
126    pub async fn recv(&mut self) -> Result<&[u8]> {
127        let decrypted =
128            read_encrypted_frame(&mut self.buf_reader, &self.session_key, &mut self.mem_pool)
129                .await?;
130        Ok(decrypted)
131    }
132}
133
134impl<R> HydraClient<R> {
135    /// Closes the client connection gracefully by flushing and shutting down the writer (proper FIN).
136    pub async fn close(&mut self) -> Result<()> {
137        self.buf_writer.flush().await?;
138        self.buf_writer.shutdown().await?;
139        Ok(())
140    }
141}