iroh_relay/client/
conn.rs

1//! Manages client-side connections to the relay server.
2//!
3//! based on tailscale/derp/derp_client.go
4
5use std::{
6    pin::Pin,
7    task::{Context, Poll, ready},
8};
9
10use iroh_base::SecretKey;
11use n0_error::{ensure, stack_error};
12use n0_future::{Sink, Stream};
13use tracing::debug;
14
15use super::KeyCache;
16#[cfg(not(wasm_browser))]
17use crate::client::streams::{MaybeTlsStream, ProxyStream};
18use crate::{
19    MAX_PACKET_SIZE,
20    protos::{
21        handshake,
22        relay::{ClientToRelayMsg, Error as ProtoError, RelayToClientMsg},
23        streams::WsBytesFramed,
24    },
25};
26
/// Error for sending messages to the relay server.
///
/// This is the error type of the `Sink<ClientToRelayMsg>` implementation on the
/// relay connection in this module.
#[stack_error(derive, add_meta, from_sources, std_sources)]
#[allow(missing_docs)]
#[non_exhaustive]
pub enum SendError {
    /// The underlying websocket stream reported an error.
    ///
    /// The concrete source type is selected at compile time: `tokio_websockets::Error`
    /// when `wasm_browser` is not set, `ws_stream_wasm::WsErr` when it is.
    #[error(transparent)]
    StreamError {
        #[cfg(not(wasm_browser))]
        source: tokio_websockets::Error,
        #[cfg(wasm_browser)]
        source: ws_stream_wasm::WsErr,
    },
    /// The encoded length of the message is larger than `MAX_PACKET_SIZE`.
    ///
    /// `size` is the encoded length of the rejected message.
    #[error("Exceeds max packet size ({MAX_PACKET_SIZE}): {size}")]
    ExceedsMaxPacketSize { size: usize },
    /// A datagrams message whose contents are empty was submitted for sending.
    #[error("Attempted to send empty packet")]
    EmptyPacket {},
}
44
/// Errors when receiving messages from the relay server.
///
/// This is the error half of the items yielded by the `Stream` implementation on
/// the relay connection in this module.
#[stack_error(derive, add_meta, from_sources, std_sources)]
#[allow(missing_docs)]
#[non_exhaustive]
pub enum RecvError {
    /// A relay protocol error, e.g. a received frame that could not be turned into
    /// a `RelayToClientMsg`.
    #[error(transparent)]
    Protocol { source: ProtoError },
    /// The underlying websocket stream reported an error.
    ///
    /// The concrete source type is selected at compile time: `tokio_websockets::Error`
    /// when `wasm_browser` is not set, `ws_stream_wasm::WsErr` when it is.
    #[error(transparent)]
    StreamError {
        #[cfg(not(wasm_browser))]
        source: tokio_websockets::Error,
        #[cfg(wasm_browser)]
        source: ws_stream_wasm::WsErr,
    },
}
60
/// A connection to a relay server.
///
/// This holds a connection to a relay server.  It is:
///
/// - A [`Stream`] for [`RelayToClientMsg`] to receive from the server.
/// - A [`Sink`] for [`ClientToRelayMsg`] to send to the server.
///
/// The `Debug` output prints a fixed placeholder string for `conn` instead of
/// the stream's own debug representation.
#[derive(derive_more::Debug)]
pub(crate) struct Conn {
    /// Byte-framed websocket used for all traffic with the relay server
    /// (variant for builds without `wasm_browser`).
    #[cfg(not(wasm_browser))]
    #[debug("tokio_websockets::WebSocketStream")]
    pub(crate) conn: WsBytesFramed<MaybeTlsStream<ProxyStream>>,
    /// Byte-framed websocket used for all traffic with the relay server
    /// (variant for `wasm_browser` builds).
    #[cfg(wasm_browser)]
    #[debug("ws_stream_wasm::WsStream")]
    pub(crate) conn: WsBytesFramed,
    /// Key cache handed to `RelayToClientMsg::from_bytes` when decoding received frames.
    pub(crate) key_cache: KeyCache,
}
77
78impl Conn {
79    /// Constructs a new websocket connection, including the initial server handshake.
80    pub(crate) async fn new(
81        #[cfg(not(wasm_browser))] io: tokio_websockets::WebSocketStream<
82            MaybeTlsStream<ProxyStream>,
83        >,
84        #[cfg(wasm_browser)] io: ws_stream_wasm::WsStream,
85        key_cache: KeyCache,
86        secret_key: &SecretKey,
87    ) -> Result<Self, handshake::Error> {
88        let mut conn = WsBytesFramed { io };
89
90        // exchange information with the server
91        debug!("server_handshake: started");
92        handshake::clientside(&mut conn, secret_key).await?;
93        debug!("server_handshake: done");
94
95        Ok(Self { conn, key_cache })
96    }
97
98    #[cfg(all(test, feature = "server"))]
99    pub(crate) fn test(io: tokio::io::DuplexStream) -> Self {
100        use crate::protos::relay::MAX_FRAME_SIZE;
101        Self {
102            conn: WsBytesFramed {
103                io: tokio_websockets::ClientBuilder::new()
104                    .limits(
105                        tokio_websockets::Limits::default().max_payload_len(Some(MAX_FRAME_SIZE)),
106                    )
107                    .take_over(MaybeTlsStream::Test(io)),
108            },
109            key_cache: KeyCache::test(),
110        }
111    }
112}
113
114impl Stream for Conn {
115    type Item = Result<RelayToClientMsg, RecvError>;
116
117    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
118        match ready!(Pin::new(&mut self.conn).poll_next(cx)) {
119            Some(Ok(msg)) => {
120                let message = RelayToClientMsg::from_bytes(msg, &self.key_cache);
121                Poll::Ready(Some(message.map_err(Into::into)))
122            }
123            Some(Err(e)) => Poll::Ready(Some(Err(e.into()))),
124            None => Poll::Ready(None),
125        }
126    }
127}
128
129impl Sink<ClientToRelayMsg> for Conn {
130    type Error = SendError;
131
132    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
133        Pin::new(&mut self.conn).poll_ready(cx).map_err(Into::into)
134    }
135
136    fn start_send(mut self: Pin<&mut Self>, frame: ClientToRelayMsg) -> Result<(), Self::Error> {
137        let size = frame.encoded_len();
138        ensure!(
139            size <= MAX_PACKET_SIZE,
140            SendError::ExceedsMaxPacketSize { size }
141        );
142        if let ClientToRelayMsg::Datagrams { datagrams, .. } = &frame {
143            ensure!(!datagrams.contents.is_empty(), SendError::EmptyPacket);
144        }
145
146        Pin::new(&mut self.conn)
147            .start_send(frame.to_bytes().freeze())
148            .map_err(Into::into)
149    }
150
151    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
152        Pin::new(&mut self.conn).poll_flush(cx).map_err(Into::into)
153    }
154
155    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
156        Pin::new(&mut self.conn).poll_close(cx).map_err(Into::into)
157    }
158}