1#![doc(
59 html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
60 html_favicon_url = "https://commonware.xyz/favicon.ico"
61)]
62
63pub mod utils;
64
65use crate::utils::codec::{recv_frame, send_frame};
66use bytes::Bytes;
67use commonware_codec::{DecodeExt, Encode as _, Error as CodecError};
68use commonware_cryptography::{
69 handshake::{
70 self, dial_end, dial_start, listen_end, listen_start, Ack, Context,
71 Error as HandshakeError, RecvCipher, SendCipher, Syn, SynAck,
72 },
73 transcript::Transcript,
74 Signer,
75};
76use commonware_macros::select;
77use commonware_runtime::{Clock, Error as RuntimeError, Sink, Stream};
78use commonware_utils::{hex, SystemTimeExt};
79use rand_core::CryptoRngCore;
80use std::{future::Future, ops::Range, time::Duration};
81use thiserror::Error;
82
83const CIPHERTEXT_OVERHEAD: u32 = {
84 assert!(handshake::CIPHERTEXT_OVERHEAD <= u32::MAX as usize);
85 handshake::CIPHERTEXT_OVERHEAD as u32
86};
87
/// Errors returned by the handshake (`dial` / `listen`) and by encrypted messaging
/// (`Sender::send` / `Receiver::recv`).
#[derive(Error, Debug)]
pub enum Error {
    /// A handshake-layer operation failed; wraps the underlying handshake error.
    #[error("handshake error: {0}")]
    HandshakeError(HandshakeError),
    /// A received frame could not be decoded into the expected type.
    #[error("unable to decode: {0}")]
    UnableToDecode(CodecError),
    /// The listener's bouncer declined the peer. Carries the peer's encoded public key,
    /// which is rendered as hex in the error message.
    #[error("peer rejected: {}", hex(_0))]
    PeerRejected(Vec<u8>),
    /// Receiving from the underlying stream failed.
    // NOTE(review): not constructed in this file section; presumably produced by the
    // frame helpers in `utils::codec` — confirm.
    #[error("recv failed")]
    RecvFailed(RuntimeError),
    /// An incoming frame exceeded the allowed size; carries the offending size in bytes.
    // NOTE(review): presumably produced by `recv_frame` — confirm.
    #[error("recv too large: {0} bytes")]
    RecvTooLarge(usize),
    /// The length prefix of an incoming frame was not a valid varint.
    // NOTE(review): presumably produced by `recv_frame` — confirm.
    #[error("invalid varint length prefix")]
    InvalidVarint,
    /// Sending on the underlying sink failed.
    // NOTE(review): presumably produced by `send_frame` — confirm.
    #[error("send failed")]
    SendFailed(RuntimeError),
    /// An attempt was made to send an empty frame.
    // NOTE(review): presumably produced by `send_frame` — confirm.
    #[error("send zero size")]
    SendZeroSize,
    /// An outgoing frame exceeded the allowed size; carries the offending size in bytes.
    // NOTE(review): presumably produced by `send_frame` — confirm.
    #[error("send too large: {0} bytes")]
    SendTooLarge(usize),
    /// The connection was closed.
    // NOTE(review): not constructed in this file section — confirm where it originates.
    #[error("connection closed")]
    StreamClosed,
    /// The handshake did not complete before `Config::handshake_timeout` elapsed.
    #[error("handshake timed out")]
    HandshakeTimeout,
}
114
115impl From<CodecError> for Error {
116 fn from(value: CodecError) -> Self {
117 Self::UnableToDecode(value)
118 }
119}
120
121impl From<HandshakeError> for Error {
122 fn from(value: HandshakeError) -> Self {
123 Self::HandshakeError(value)
124 }
125}
126
/// Configuration consumed by [`dial`] and [`listen`] to establish a connection.
// NOTE(review): both sides presumably need matching `namespace` and compatible size/time
// settings for a handshake to succeed — confirm against the handshake implementation.
#[derive(Clone)]
pub struct Config<S> {
    /// Key that identifies this side: its public key is announced by the dialer and the
    /// key itself is handed to the handshake context.
    pub signing_key: S,

    /// Bytes used to seed the handshake transcript.
    pub namespace: Vec<u8>,

    /// Frame-size limit passed to the frame helpers during the handshake. After the
    /// handshake, encrypted frames are limited to this value plus the ciphertext overhead.
    pub max_message_size: u32,

    /// How far past the current time the accepted timestamp range extends
    /// (see [`Config::time_information`]).
    pub synchrony_bound: Duration,

    /// How far before the current time the accepted timestamp range extends
    /// (see [`Config::time_information`]).
    pub max_handshake_age: Duration,

    /// Maximum time a whole handshake may take before `dial` / `listen` give up with
    /// [`Error::HandshakeTimeout`].
    pub handshake_timeout: Duration,
}
156
157impl<S> Config<S> {
158 pub fn time_information(&self, ctx: &impl Clock) -> (u64, Range<u64>) {
160 fn duration_to_u64(d: Duration) -> u64 {
161 u64::try_from(d.as_millis()).expect("duration ms should fit in an u64")
162 }
163 let current_time_ms = duration_to_u64(ctx.current().epoch());
164 let ok_timestamps = (current_time_ms
165 .saturating_sub(duration_to_u64(self.max_handshake_age)))
166 ..(current_time_ms.saturating_add(duration_to_u64(self.synchrony_bound)));
167 (current_time_ms, ok_timestamps)
168 }
169}
170
171pub async fn dial<R: CryptoRngCore + Clock, S: Signer, I: Stream, O: Sink>(
174 mut ctx: R,
175 config: Config<S>,
176 peer: S::PublicKey,
177 mut stream: I,
178 mut sink: O,
179) -> Result<(Sender<O>, Receiver<I>), Error> {
180 let timeout = ctx.sleep(config.handshake_timeout);
181 let inner_routine = async move {
182 send_frame(
183 &mut sink,
184 config.signing_key.public_key().encode().as_ref(),
185 config.max_message_size,
186 )
187 .await?;
188
189 let (current_time, ok_timestamps) = config.time_information(&ctx);
190 let (state, syn) = dial_start(
191 &mut ctx,
192 Context::new(
193 &Transcript::new(&config.namespace),
194 current_time,
195 ok_timestamps,
196 config.signing_key,
197 peer,
198 ),
199 );
200 send_frame(&mut sink, &syn.encode(), config.max_message_size).await?;
201
202 let syn_ack_bytes = recv_frame(&mut stream, config.max_message_size).await?;
203 let syn_ack = SynAck::<S::Signature>::decode(syn_ack_bytes)?;
204
205 let (ack, send, recv) = dial_end(state, syn_ack)?;
206 send_frame(&mut sink, &ack.encode(), config.max_message_size).await?;
207
208 Ok((
209 Sender {
210 cipher: send,
211 sink,
212 max_message_size: config.max_message_size,
213 },
214 Receiver {
215 cipher: recv,
216 stream,
217 max_message_size: config.max_message_size,
218 },
219 ))
220 };
221
222 select! {
223 x = inner_routine => { x } ,
224 _ = timeout => { Err(Error::HandshakeTimeout) }
225 }
226}
227
228pub async fn listen<
231 R: CryptoRngCore + Clock,
232 S: Signer,
233 I: Stream,
234 O: Sink,
235 Fut: Future<Output = bool>,
236 F: FnOnce(S::PublicKey) -> Fut,
237>(
238 mut ctx: R,
239 bouncer: F,
240 config: Config<S>,
241 mut stream: I,
242 mut sink: O,
243) -> Result<(S::PublicKey, Sender<O>, Receiver<I>), Error> {
244 let timeout = ctx.sleep(config.handshake_timeout);
245 let inner_routine = async move {
246 let peer_bytes = recv_frame(&mut stream, config.max_message_size).await?;
247 let peer = S::PublicKey::decode(peer_bytes)?;
248 if !bouncer(peer.clone()).await {
249 return Err(Error::PeerRejected(peer.encode().to_vec()));
250 }
251
252 let msg1_bytes = recv_frame(&mut stream, config.max_message_size).await?;
253 let msg1 = Syn::<S::Signature>::decode(msg1_bytes)?;
254
255 let (current_time, ok_timestamps) = config.time_information(&ctx);
256 let (state, syn_ack) = listen_start(
257 &mut ctx,
258 Context::new(
259 &Transcript::new(&config.namespace),
260 current_time,
261 ok_timestamps,
262 config.signing_key,
263 peer.clone(),
264 ),
265 msg1,
266 )?;
267 send_frame(&mut sink, &syn_ack.encode(), config.max_message_size).await?;
268
269 let ack_bytes = recv_frame(&mut stream, config.max_message_size).await?;
270 let ack = Ack::decode(ack_bytes)?;
271
272 let (send, recv) = listen_end(state, ack)?;
273
274 Ok((
275 peer,
276 Sender {
277 cipher: send,
278 sink,
279 max_message_size: config.max_message_size,
280 },
281 Receiver {
282 cipher: recv,
283 stream,
284 max_message_size: config.max_message_size,
285 },
286 ))
287 };
288
289 select! {
290 x = inner_routine => { x } ,
291 _ = timeout => { Err(Error::HandshakeTimeout) }
292 }
293}
294
/// Sending half of an established connection; encrypts each message before writing it
/// to the sink. Produced by a successful `dial` or `listen`.
pub struct Sender<O> {
    /// Cipher state used to encrypt outgoing messages.
    cipher: SendCipher,
    /// Sink the encrypted frames are written to.
    sink: O,
    /// Size limit taken from the connection's `Config`; the frame limit used when
    /// sending is this value plus the ciphertext overhead.
    max_message_size: u32,
}
301
302impl<O: Sink> Sender<O> {
303 pub async fn send(&mut self, msg: &[u8]) -> Result<(), Error> {
305 let c = self.cipher.send(msg)?;
306 send_frame(
307 &mut self.sink,
308 &c,
309 self.max_message_size.saturating_add(CIPHERTEXT_OVERHEAD),
310 )
311 .await?;
312 Ok(())
313 }
314}
315
/// Receiving half of an established connection; decrypts each frame read from the
/// stream. Produced by a successful `dial` or `listen`.
pub struct Receiver<I> {
    /// Cipher state used to decrypt incoming messages.
    cipher: RecvCipher,
    /// Stream the encrypted frames are read from.
    stream: I,
    /// Size limit taken from the connection's `Config`; the frame limit used when
    /// receiving is this value plus the ciphertext overhead.
    max_message_size: u32,
}
322
323impl<I: Stream> Receiver<I> {
324 pub async fn recv(&mut self) -> Result<Bytes, Error> {
326 let c = recv_frame(
327 &mut self.stream,
328 self.max_message_size.saturating_add(CIPHERTEXT_OVERHEAD),
329 )
330 .await?;
331 Ok(self.cipher.recv(&c)?.into())
332 }
333}
334
#[cfg(test)]
mod test {
    use super::*;
    use commonware_cryptography::{ed25519::PrivateKey, Signer};
    use commonware_runtime::{deterministic, mocks, Runner as _, Spawner as _};

    const NAMESPACE: &[u8] = b"fuzz_transport";
    const MAX_MESSAGE_SIZE: u32 = 64 * 1024;

    /// Builds the configuration shared by both sides of the test connection; only the
    /// signing key differs between dialer and listener.
    fn config_for(signing_key: PrivateKey) -> Config<PrivateKey> {
        Config {
            signing_key,
            namespace: NAMESPACE.to_vec(),
            max_message_size: MAX_MESSAGE_SIZE,
            synchrony_bound: Duration::from_secs(1),
            max_handshake_age: Duration::from_secs(1),
            handshake_timeout: Duration::from_secs(1),
        }
    }

    /// Completes a handshake over in-memory channels and checks that messages round-trip
    /// in both directions.
    #[test]
    fn test_can_setup_and_send_messages() -> Result<(), Error> {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let dialer_key = PrivateKey::from_seed(42);
            let listener_key = PrivateKey::from_seed(24);

            // Two one-way channels, cross-wired so each side's sink feeds the other's stream.
            let (dialer_sink, listener_stream) = mocks::Channel::init();
            let (listener_sink, dialer_stream) = mocks::Channel::init();

            let dialer_config = config_for(dialer_key.clone());
            let listener_config = config_for(listener_key.clone());

            // The listener runs concurrently and admits every peer.
            let listener_task = context.clone().spawn(move |context| async move {
                listen(
                    context,
                    |_| async { true },
                    listener_config,
                    listener_stream,
                    listener_sink,
                )
                .await
            });

            let (mut dialer_sender, mut dialer_receiver) = dial(
                context,
                dialer_config,
                listener_key.public_key(),
                dialer_stream,
                dialer_sink,
            )
            .await?;

            let (seen_peer, mut listener_sender, mut listener_receiver) =
                listener_task.await.unwrap()?;
            assert_eq!(seen_peer, dialer_key.public_key());

            // Each payload goes dialer -> listener, then is echoed listener -> dialer.
            let payloads: Vec<&'static [u8]> = vec![b"A", b"B", b"C"];
            for payload in &payloads {
                dialer_sender.send(payload).await?;
                let at_listener = listener_receiver.recv().await?;
                assert_eq!(payload, &at_listener);

                listener_sender.send(payload).await?;
                let at_dialer = dialer_receiver.recv().await?;
                assert_eq!(payload, &at_dialer);
            }
            Ok(())
        })
    }
}