iroh_relay/client/
conn.rs1use std::{
6 pin::Pin,
7 task::{Context, Poll, ready},
8};
9
10use iroh_base::SecretKey;
11use n0_error::{ensure, stack_error};
12use n0_future::{Sink, Stream};
13use tracing::debug;
14
15use super::KeyCache;
16#[cfg(not(wasm_browser))]
17use crate::client::streams::{MaybeTlsStream, ProxyStream};
18use crate::{
19 MAX_PACKET_SIZE,
20 protos::{
21 handshake,
22 relay::{ClientToRelayMsg, Error as ProtoError, RelayToClientMsg},
23 streams::WsBytesFramed,
24 },
25};
26
27#[stack_error(derive, add_meta, from_sources, std_sources)]
29#[allow(missing_docs)]
30#[non_exhaustive]
31pub enum SendError {
32 #[error(transparent)]
33 StreamError {
34 #[cfg(not(wasm_browser))]
35 source: tokio_websockets::Error,
36 #[cfg(wasm_browser)]
37 source: ws_stream_wasm::WsErr,
38 },
39 #[error("Exceeds max packet size ({MAX_PACKET_SIZE}): {size}")]
40 ExceedsMaxPacketSize { size: usize },
41 #[error("Attempted to send empty packet")]
42 EmptyPacket {},
43}
44
45#[stack_error(derive, add_meta, from_sources, std_sources)]
47#[allow(missing_docs)]
48#[non_exhaustive]
49pub enum RecvError {
50 #[error(transparent)]
51 Protocol { source: ProtoError },
52 #[error(transparent)]
53 StreamError {
54 #[cfg(not(wasm_browser))]
55 source: tokio_websockets::Error,
56 #[cfg(wasm_browser)]
57 source: ws_stream_wasm::WsErr,
58 },
59}
60
61#[derive(derive_more::Debug)]
68pub(crate) struct Conn {
69 #[cfg(not(wasm_browser))]
70 #[debug("tokio_websockets::WebSocketStream")]
71 pub(crate) conn: WsBytesFramed<MaybeTlsStream<ProxyStream>>,
72 #[cfg(wasm_browser)]
73 #[debug("ws_stream_wasm::WsStream")]
74 pub(crate) conn: WsBytesFramed,
75 pub(crate) key_cache: KeyCache,
76}
77
78impl Conn {
79 pub(crate) async fn new(
81 #[cfg(not(wasm_browser))] io: tokio_websockets::WebSocketStream<
82 MaybeTlsStream<ProxyStream>,
83 >,
84 #[cfg(wasm_browser)] io: ws_stream_wasm::WsStream,
85 key_cache: KeyCache,
86 secret_key: &SecretKey,
87 ) -> Result<Self, handshake::Error> {
88 let mut conn = WsBytesFramed { io };
89
90 debug!("server_handshake: started");
92 handshake::clientside(&mut conn, secret_key).await?;
93 debug!("server_handshake: done");
94
95 Ok(Self { conn, key_cache })
96 }
97
98 #[cfg(all(test, feature = "server"))]
99 pub(crate) fn test(io: tokio::io::DuplexStream) -> Self {
100 use crate::protos::relay::MAX_FRAME_SIZE;
101 Self {
102 conn: WsBytesFramed {
103 io: tokio_websockets::ClientBuilder::new()
104 .limits(
105 tokio_websockets::Limits::default().max_payload_len(Some(MAX_FRAME_SIZE)),
106 )
107 .take_over(MaybeTlsStream::Test(io)),
108 },
109 key_cache: KeyCache::test(),
110 }
111 }
112}
113
114impl Stream for Conn {
115 type Item = Result<RelayToClientMsg, RecvError>;
116
117 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
118 match ready!(Pin::new(&mut self.conn).poll_next(cx)) {
119 Some(Ok(msg)) => {
120 let message = RelayToClientMsg::from_bytes(msg, &self.key_cache);
121 Poll::Ready(Some(message.map_err(Into::into)))
122 }
123 Some(Err(e)) => Poll::Ready(Some(Err(e.into()))),
124 None => Poll::Ready(None),
125 }
126 }
127}
128
129impl Sink<ClientToRelayMsg> for Conn {
130 type Error = SendError;
131
132 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
133 Pin::new(&mut self.conn).poll_ready(cx).map_err(Into::into)
134 }
135
136 fn start_send(mut self: Pin<&mut Self>, frame: ClientToRelayMsg) -> Result<(), Self::Error> {
137 let size = frame.encoded_len();
138 ensure!(
139 size <= MAX_PACKET_SIZE,
140 SendError::ExceedsMaxPacketSize { size }
141 );
142 if let ClientToRelayMsg::Datagrams { datagrams, .. } = &frame {
143 ensure!(!datagrams.contents.is_empty(), SendError::EmptyPacket);
144 }
145
146 Pin::new(&mut self.conn)
147 .start_send(frame.to_bytes().freeze())
148 .map_err(Into::into)
149 }
150
151 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
152 Pin::new(&mut self.conn).poll_flush(cx).map_err(Into::into)
153 }
154
155 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
156 Pin::new(&mut self.conn).poll_close(cx).map_err(Into::into)
157 }
158}