1#![doc(
59 html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
60 html_favicon_url = "https://commonware.xyz/favicon.ico"
61)]
62
63pub mod utils;
64
65use crate::utils::codec::{recv_frame, send_frame};
66use bytes::{Buf, Bytes};
67use commonware_codec::{DecodeExt, Encode as _, Error as CodecError};
68use commonware_cryptography::{
69 handshake::{
70 self, dial_end, dial_start, listen_end, listen_start, Ack, Context,
71 Error as HandshakeError, RecvCipher, SendCipher, Syn, SynAck,
72 },
73 transcript::Transcript,
74 Signer,
75};
76use commonware_macros::select;
77use commonware_runtime::{Clock, Error as RuntimeError, Sink, Stream};
78use commonware_utils::{hex, SystemTimeExt};
79use rand_core::CryptoRngCore;
80use std::{future::Future, ops::Range, time::Duration};
81use thiserror::Error;
82
83const CIPHERTEXT_OVERHEAD: u32 = {
84 assert!(handshake::CIPHERTEXT_OVERHEAD <= u32::MAX as usize);
85 handshake::CIPHERTEXT_OVERHEAD as u32
86};
87
88#[derive(Error, Debug)]
90pub enum Error {
91 #[error("handshake error: {0}")]
92 HandshakeError(HandshakeError),
93 #[error("unable to decode: {0}")]
94 UnableToDecode(CodecError),
95 #[error("peer rejected: {}", hex(_0))]
96 PeerRejected(Vec<u8>),
97 #[error("recv failed")]
98 RecvFailed(RuntimeError),
99 #[error("recv too large: {0} bytes")]
100 RecvTooLarge(usize),
101 #[error("invalid varint length prefix")]
102 InvalidVarint,
103 #[error("send failed")]
104 SendFailed(RuntimeError),
105 #[error("send zero size")]
106 SendZeroSize,
107 #[error("send too large: {0} bytes")]
108 SendTooLarge(usize),
109 #[error("connection closed")]
110 StreamClosed,
111 #[error("handshake timed out")]
112 HandshakeTimeout,
113}
114
115impl From<CodecError> for Error {
116 fn from(value: CodecError) -> Self {
117 Self::UnableToDecode(value)
118 }
119}
120
121impl From<HandshakeError> for Error {
122 fn from(value: HandshakeError) -> Self {
123 Self::HandshakeError(value)
124 }
125}
126
127#[derive(Clone)]
134pub struct Config<S> {
135 pub signing_key: S,
139
140 pub namespace: Vec<u8>,
143
144 pub max_message_size: u32,
146
147 pub synchrony_bound: Duration,
149
150 pub max_handshake_age: Duration,
152
153 pub handshake_timeout: Duration,
155}
156
157impl<S> Config<S> {
158 pub fn time_information(&self, ctx: &impl Clock) -> (u64, Range<u64>) {
160 fn duration_to_u64(d: Duration) -> u64 {
161 u64::try_from(d.as_millis()).expect("duration ms should fit in an u64")
162 }
163 let current_time_ms = duration_to_u64(ctx.current().epoch());
164 let ok_timestamps = (current_time_ms
165 .saturating_sub(duration_to_u64(self.max_handshake_age)))
166 ..(current_time_ms.saturating_add(duration_to_u64(self.synchrony_bound)));
167 (current_time_ms, ok_timestamps)
168 }
169}
170
171pub async fn dial<R: CryptoRngCore + Clock, S: Signer, I: Stream, O: Sink>(
174 mut ctx: R,
175 config: Config<S>,
176 peer: S::PublicKey,
177 mut stream: I,
178 mut sink: O,
179) -> Result<(Sender<O>, Receiver<I>), Error> {
180 let timeout = ctx.sleep(config.handshake_timeout);
181 let inner_routine = async move {
182 send_frame(
183 &mut sink,
184 config.signing_key.public_key().encode().as_ref(),
185 config.max_message_size,
186 )
187 .await?;
188
189 let (current_time, ok_timestamps) = config.time_information(&ctx);
190 let (state, syn) = dial_start(
191 &mut ctx,
192 Context::new(
193 &Transcript::new(&config.namespace),
194 current_time,
195 ok_timestamps,
196 config.signing_key,
197 peer,
198 ),
199 );
200 send_frame(&mut sink, syn.encode(), config.max_message_size).await?;
201
202 let syn_ack_bytes = recv_frame(&mut stream, config.max_message_size).await?;
203 let syn_ack = SynAck::<S::Signature>::decode(syn_ack_bytes)?;
204
205 let (ack, send, recv) = dial_end(state, syn_ack)?;
206 send_frame(&mut sink, ack.encode(), config.max_message_size).await?;
207
208 Ok((
209 Sender {
210 cipher: send,
211 sink,
212 max_message_size: config.max_message_size,
213 },
214 Receiver {
215 cipher: recv,
216 stream,
217 max_message_size: config.max_message_size,
218 },
219 ))
220 };
221
222 select! {
223 x = inner_routine => { x } ,
224 _ = timeout => { Err(Error::HandshakeTimeout) }
225 }
226}
227
228pub async fn listen<
231 R: CryptoRngCore + Clock,
232 S: Signer,
233 I: Stream,
234 O: Sink,
235 Fut: Future<Output = bool>,
236 F: FnOnce(S::PublicKey) -> Fut,
237>(
238 mut ctx: R,
239 bouncer: F,
240 config: Config<S>,
241 mut stream: I,
242 mut sink: O,
243) -> Result<(S::PublicKey, Sender<O>, Receiver<I>), Error> {
244 let timeout = ctx.sleep(config.handshake_timeout);
245 let inner_routine = async move {
246 let peer_bytes = recv_frame(&mut stream, config.max_message_size).await?;
247 let peer = S::PublicKey::decode(peer_bytes)?;
248 if !bouncer(peer.clone()).await {
249 return Err(Error::PeerRejected(peer.encode().to_vec()));
250 }
251
252 let msg1_bytes = recv_frame(&mut stream, config.max_message_size).await?;
253 let msg1 = Syn::<S::Signature>::decode(msg1_bytes)?;
254
255 let (current_time, ok_timestamps) = config.time_information(&ctx);
256 let (state, syn_ack) = listen_start(
257 &mut ctx,
258 Context::new(
259 &Transcript::new(&config.namespace),
260 current_time,
261 ok_timestamps,
262 config.signing_key,
263 peer.clone(),
264 ),
265 msg1,
266 )?;
267 send_frame(&mut sink, syn_ack.encode(), config.max_message_size).await?;
268
269 let ack_bytes = recv_frame(&mut stream, config.max_message_size).await?;
270 let ack = Ack::decode(ack_bytes)?;
271
272 let (send, recv) = listen_end(state, ack)?;
273
274 Ok((
275 peer,
276 Sender {
277 cipher: send,
278 sink,
279 max_message_size: config.max_message_size,
280 },
281 Receiver {
282 cipher: recv,
283 stream,
284 max_message_size: config.max_message_size,
285 },
286 ))
287 };
288
289 select! {
290 x = inner_routine => { x } ,
291 _ = timeout => { Err(Error::HandshakeTimeout) }
292 }
293}
294
295pub struct Sender<O> {
297 cipher: SendCipher,
298 sink: O,
299 max_message_size: u32,
300}
301
302impl<O: Sink> Sender<O> {
303 pub async fn send(&mut self, mut buf: impl Buf) -> Result<(), Error> {
305 let msg = buf.copy_to_bytes(buf.remaining());
307 let c = self.cipher.send(msg.as_ref())?;
308
309 send_frame(
310 &mut self.sink,
311 Bytes::from(c),
312 self.max_message_size.saturating_add(CIPHERTEXT_OVERHEAD),
313 )
314 .await?;
315 Ok(())
316 }
317}
318
319pub struct Receiver<I> {
321 cipher: RecvCipher,
322 stream: I,
323 max_message_size: u32,
324}
325
326impl<I: Stream> Receiver<I> {
327 pub async fn recv(&mut self) -> Result<Bytes, Error> {
329 let c = recv_frame(
330 &mut self.stream,
331 self.max_message_size.saturating_add(CIPHERTEXT_OVERHEAD),
332 )
333 .await?;
334 Ok(self.cipher.recv(&c)?.into())
335 }
336}
337
338#[cfg(test)]
339mod test {
340 use super::*;
341 use commonware_cryptography::{ed25519::PrivateKey, Signer};
342 use commonware_runtime::{deterministic, mocks, Runner as _, Spawner as _};
343
344 const NAMESPACE: &[u8] = b"fuzz_transport";
345 const MAX_MESSAGE_SIZE: u32 = 64 * 1024; #[test]
348 fn test_can_setup_and_send_messages() -> Result<(), Error> {
349 let executor = deterministic::Runner::default();
350 executor.start(|context| async move {
351 let dialer_crypto = PrivateKey::from_seed(42);
352 let listener_crypto = PrivateKey::from_seed(24);
353
354 let (dialer_sink, listener_stream) = mocks::Channel::init();
355 let (listener_sink, dialer_stream) = mocks::Channel::init();
356
357 let dialer_config = Config {
358 signing_key: dialer_crypto.clone(),
359 namespace: NAMESPACE.to_vec(),
360 max_message_size: MAX_MESSAGE_SIZE,
361 synchrony_bound: Duration::from_secs(1),
362 max_handshake_age: Duration::from_secs(1),
363 handshake_timeout: Duration::from_secs(1),
364 };
365
366 let listener_config = Config {
367 signing_key: listener_crypto.clone(),
368 namespace: NAMESPACE.to_vec(),
369 max_message_size: MAX_MESSAGE_SIZE,
370 synchrony_bound: Duration::from_secs(1),
371 max_handshake_age: Duration::from_secs(1),
372 handshake_timeout: Duration::from_secs(1),
373 };
374
375 let listener_handle = context.clone().spawn(move |context| async move {
376 listen(
377 context,
378 |_| async { true },
379 listener_config,
380 listener_stream,
381 listener_sink,
382 )
383 .await
384 });
385
386 let (mut dialer_sender, mut dialer_receiver) = dial(
387 context,
388 dialer_config,
389 listener_crypto.public_key(),
390 dialer_stream,
391 dialer_sink,
392 )
393 .await?;
394
395 let (listener_peer, mut listener_sender, mut listener_receiver) =
396 listener_handle.await.unwrap()?;
397 assert_eq!(listener_peer, dialer_crypto.public_key());
398 let messages: Vec<&'static [u8]> = vec![b"A", b"B", b"C"];
399 for msg in &messages {
400 dialer_sender.send(&msg[..]).await?;
401 let syn_ack = listener_receiver.recv().await?;
402 assert_eq!(msg, &syn_ack);
403 listener_sender.send(&msg[..]).await?;
404 let ack = dialer_receiver.recv().await?;
405 assert_eq!(msg, &ack);
406 }
407 Ok(())
408 })
409 }
410}