bore_cli/
shared.rs

1//! Shared data structures, utilities, and protocol definitions.
2
3use std::time::Duration;
4
5use anyhow::{Context, Result};
6use futures_util::{SinkExt, StreamExt};
7use serde::{de::DeserializeOwned, Deserialize, Serialize};
8use tokio::io::{self, AsyncRead, AsyncWrite};
9use tokio::time::timeout;
10use tokio_util::codec::{AnyDelimiterCodec, Framed, FramedParts};
11use tracing::trace;
12use uuid::Uuid;
13
/// TCP port used by control connections to the server.
pub const CONTROL_PORT: u16 = 7835;

/// Upper bound, in bytes, on a single JSON frame in the stream.
pub const MAX_FRAME_LENGTH: usize = 256;

/// How long to wait for network connections and initial protocol messages.
pub const NETWORK_TIMEOUT: Duration = Duration::from_secs(3);
22
23/// A message from the client on the control connection.
24#[derive(Debug, Serialize, Deserialize)]
25pub enum ClientMessage {
26    /// Response to an authentication challenge from the server.
27    Authenticate(String),
28
29    /// Initial client message specifying a port to forward.
30    Hello(u16),
31
32    /// Accepts an incoming TCP connection, using this stream as a proxy.
33    Accept(Uuid),
34}
35
36/// A message from the server on the control connection.
37#[derive(Debug, Serialize, Deserialize)]
38pub enum ServerMessage {
39    /// Authentication challenge, sent as the first message, if enabled.
40    Challenge(Uuid),
41
42    /// Response to a client's initial message, with actual public port.
43    Hello(u16),
44
45    /// No-op used to test if the client is still reachable.
46    Heartbeat,
47
48    /// Asks the client to accept a forwarded TCP connection.
49    Connection(Uuid),
50
51    /// Indicates a server error that terminates the connection.
52    Error(String),
53}
54
55/// Transport stream with JSON frames delimited by null characters.
56pub struct Delimited<U>(Framed<U, AnyDelimiterCodec>);
57
58impl<U: AsyncRead + AsyncWrite + Unpin> Delimited<U> {
59    /// Construct a new delimited stream.
60    pub fn new(stream: U) -> Self {
61        let codec = AnyDelimiterCodec::new_with_max_length(vec![0], vec![0], MAX_FRAME_LENGTH);
62        Self(Framed::new(stream, codec))
63    }
64
65    /// Read the next null-delimited JSON instruction from a stream.
66    pub async fn recv<T: DeserializeOwned>(&mut self) -> Result<Option<T>> {
67        trace!("waiting to receive json message");
68        if let Some(next_message) = self.0.next().await {
69            let byte_message = next_message.context("frame error, invalid byte length")?;
70            let serialized_obj =
71                serde_json::from_slice(&byte_message).context("unable to parse message")?;
72            Ok(serialized_obj)
73        } else {
74            Ok(None)
75        }
76    }
77
78    /// Read the next null-delimited JSON instruction, with a default timeout.
79    ///
80    /// This is useful for parsing the initial message of a stream for handshake or
81    /// other protocol purposes, where we do not want to wait indefinitely.
82    pub async fn recv_timeout<T: DeserializeOwned>(&mut self) -> Result<Option<T>> {
83        timeout(NETWORK_TIMEOUT, self.recv())
84            .await
85            .context("timed out waiting for initial message")?
86    }
87
88    /// Send a null-terminated JSON instruction on a stream.
89    pub async fn send<T: Serialize>(&mut self, msg: T) -> Result<()> {
90        trace!("sending json message");
91        self.0.send(serde_json::to_string(&msg)?).await?;
92        Ok(())
93    }
94
95    /// Consume this object, returning current buffers and the inner transport.
96    pub fn into_parts(self) -> FramedParts<U, AnyDelimiterCodec> {
97        self.0.into_parts()
98    }
99}
100
101/// Copy data mutually between two read/write streams.
102pub async fn proxy<S1, S2>(stream1: S1, stream2: S2) -> io::Result<()>
103where
104    S1: AsyncRead + AsyncWrite + Unpin,
105    S2: AsyncRead + AsyncWrite + Unpin,
106{
107    let (mut s1_read, mut s1_write) = io::split(stream1);
108    let (mut s2_read, mut s2_write) = io::split(stream2);
109    tokio::select! {
110        res = io::copy(&mut s1_read, &mut s2_write) => res,
111        res = io::copy(&mut s2_read, &mut s1_write) => res,
112    }?;
113    Ok(())
114}