Skip to main content

hydra_sync/
client.rs

1use crate::BUFFER_SIZE;
2use crate::protocol::{
3    Role, perform_client_handshake, read_encrypted_frame, write_encrypted_frame, write_join_frame,
4};
5use anyhow::{Result, bail};
6use bytes::BytesMut;
7use std::net::SocketAddr;
8use tokio::io::{AsyncWriteExt, BufReader, BufWriter};
9use tokio::net::TcpStream;
10use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
11
12/// `HydraClient` connects to the relay server as a producer or consumer, performs handshake, and sends/receives encrypted frames.
13/// It maintains an internal memory pool (18 mb) for zero-copy encryption/decryption and buffering.
14/// The `broadcast` method allows producers to send encrypted frames to all connected consumers in the same session,
15/// while the `recv` method allows consumers to receive and decrypt frames from the producer.
16pub struct HydraClient {
17    role: Role,
18    session_key: [u8; 32],
19    buf_reader: BufReader<OwnedReadHalf>,
20    buf_writer: BufWriter<OwnedWriteHalf>,
21    mem_pool: BytesMut,
22}
23
24impl HydraClient {
25    /// Connects to the relay server, performs handshake, and sends a join frame with the producer role and session_id.
26    pub async fn connect_producer(
27        addr: SocketAddr,
28        session_id: &[u8; 64],
29        session_key: [u8; 32],
30    ) -> Result<Self> {
31        let stream = TcpStream::connect(addr).await?;
32        stream.set_nodelay(true)?;
33
34        let (reader, writer) = stream.into_split();
35        let mut writer = BufWriter::with_capacity(BUFFER_SIZE, writer);
36        let mut reader = BufReader::with_capacity(BUFFER_SIZE, reader);
37        let transport_key = perform_client_handshake(&mut reader, &mut writer).await?;
38        let mut mem_pool = BytesMut::with_capacity(1024 * 1024 * 18);
39        write_join_frame(
40            &mut writer,
41            Role::Producer,
42            session_id,
43            &transport_key,
44            &mut mem_pool,
45        )
46        .await?;
47
48        Ok(Self {
49            role: Role::Producer,
50            buf_reader: reader,
51            buf_writer: writer,
52            session_key,
53            mem_pool,
54        })
55    }
56
57    /// Broadcasts the given data as an encrypted frame to all connected consumers (zero-copy) in the same session.
58    /// `broadcast` is only available for producers and will return an error if called on a consumer client.
59    pub async fn broadcast(&mut self, data: &[u8]) -> Result<()> {
60        if self.role != Role::Producer {
61            bail!("broadcast is only available for producers");
62        }
63        write_encrypted_frame(
64            &mut self.buf_writer,
65            data,
66            &self.session_key,
67            &mut self.mem_pool,
68        )
69        .await
70    }
71
72    /// Connects to the relay server, performs handshake, and sends a join frame with the consumer role and session_id.
73    pub async fn connect_consumer(
74        addr: SocketAddr,
75        session_id: &[u8; 64],
76        session_key: [u8; 32],
77    ) -> Result<Self> {
78        let stream = TcpStream::connect(addr).await?;
79        stream.set_nodelay(true)?;
80        let (reader, writer) = stream.into_split();
81        let mut writer = BufWriter::with_capacity(BUFFER_SIZE, writer);
82        let mut reader = BufReader::with_capacity(BUFFER_SIZE, reader);
83
84        let transport_key = perform_client_handshake(&mut reader, &mut writer).await?;
85        let mut mem_pool = BytesMut::with_capacity(1024 * 1024 * 18);
86        write_join_frame(
87            &mut writer,
88            Role::Consumer,
89            session_id,
90            &transport_key,
91            &mut mem_pool,
92        )
93        .await?;
94
95        Ok(Self {
96            role: Role::Consumer,
97            buf_reader: reader,
98            buf_writer: writer,
99            session_key,
100            mem_pool,
101        })
102    }
103
104    /// Receives the next encrypted frame from the producer, decrypts it, and returns the plaintext data as a byte slice.
105    /// The returned slice is valid until the next call to `recv` or `broadcast`, which may reuse the internal memory pool buffer.
106    /// `recv` is only available for consumers and will return an error if called on a producer client.
107    pub async fn recv(&mut self) -> Result<&[u8]> {
108        if self.role != Role::Consumer {
109            bail!("recv is only available for consumers");
110        }
111        let decrypted =
112            read_encrypted_frame(&mut self.buf_reader, &self.session_key, &mut self.mem_pool)
113                .await?;
114        Ok(decrypted)
115    }
116
117    /// Queries the relay server for current status, returns total connected clients and active sessions.
118    pub async fn server_status(&mut self) -> Result<()> {
119        todo!(
120            "Implement server status query, returns: uptime total_client_connected, total_sessions"
121        );
122    }
123
124    /// Closes the client connection gracefully by flushing and shutting down the writer (proper FIN).
125    pub async fn close(&mut self) -> Result<()> {
126        self.buf_writer.flush().await?;
127        self.buf_writer.shutdown().await?;
128        Ok(())
129    }
130}