commonware_stream/
lib.rs

1//! Exchange messages over arbitrary transport.
2//!
3//! # Design
4//!
5//! ## Handshake
6//!
7//! c.f. [commonware_cryptography::handshake]. One difference here is that the listener does not
8//! know the dialer's public key in advance. Instead, the dialer tells the listener its public key
9//! in the first message. The listener has an opportunity to reject the connection if it does not
10//! wish to connect ([listen] takes in an arbitrary function to implement this).
11//!
12//! ## Encryption
13//!
14//! All traffic is encrypted using ChaCha20-Poly1305. A shared secret is established using an
15//! ephemeral X25519 Diffie-Hellman key exchange. This secret, combined with the handshake
16//! transcript, is used to derive keys for both the handshake's key confirmation messages and
17//! the post-handshake data traffic. Binding the derived keys to the handshake transcript prevents
18//! man-in-the-middle and transcript substitution attacks.
19//!
20//! Each directional cipher uses a 12-byte nonce derived from a counter that is incremented for each
21//! message sent. This counter has sufficient cardinality for over 2.5 trillion years of continuous
22//! communication at a rate of 1 billion messages per second—sufficient for all practical use cases.
23//! This ensures that well-behaving peers can remain connected indefinitely as long as they both
24//! remain online (maximizing p2p network stability). In the unlikely case of counter overflow, the
25//! connection will be terminated and a new connection should be established. This method prevents
26//! nonce reuse (which would compromise message confidentiality) while saving bandwidth (as there is
27//! no need to transmit nonces explicitly).
28//!
29//! # Security
30//!
31//! ## Requirements
32//!
33//! - **Pre-Shared Namespace**: Peers must agree on a unique, application-specific namespace
34//!   out-of-band to prevent cross-application replay attacks.
35//! - **Time Synchronization**: Peer clocks must be synchronized to within the `synchrony_bound`
36//!   to correctly validate timestamps.
37//!
38//! ## Provided
39//!
40//! - **Mutual Authentication**: Both parties prove ownership of their static private keys through
41//!   signatures.
42//! - **Forward Secrecy**: Ephemeral encryption keys ensure that any compromise of long-term static keys
43//!   doesn't expose the contents of previous sessions.
44//! - **Session Uniqueness**: A listener's [commonware_cryptography::handshake::SynAck] is bound to the dialer's [commonware_cryptography::handshake::Syn] message and
45//!   [commonware_cryptography::handshake::Ack]s are bound to the complete handshake transcript, preventing replay attacks and ensuring
46//!   message integrity.
47//! - **Handshake Timeout**: A configurable deadline is enforced for handshake completion to protect
48//!   against malicious peers that create connections but abandon handshakes.
49//!
50//! ## Not Provided
51//!
52//! - **Anonymity**: Peer identities are not hidden during handshakes from network observers (both active
53//!   and passive).
54//! - **Padding**: Messages are encrypted as-is, allowing an attacker to perform traffic analysis.
55//! - **Future Secrecy**: If a peer's static private key is compromised, future sessions will be exposed.
56//! - **0-RTT**: The protocol does not support 0-RTT handshakes (resumed sessions).
57
58#![doc(
59    html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
60    html_favicon_url = "https://commonware.xyz/favicon.ico"
61)]
62
63pub mod utils;
64
65use crate::utils::codec::{recv_frame, send_frame};
66use bytes::{Buf, Bytes};
67use commonware_codec::{DecodeExt, Encode as _, Error as CodecError};
68use commonware_cryptography::{
69    handshake::{
70        self, dial_end, dial_start, listen_end, listen_start, Ack, Context,
71        Error as HandshakeError, RecvCipher, SendCipher, Syn, SynAck,
72    },
73    transcript::Transcript,
74    Signer,
75};
76use commonware_macros::select;
77use commonware_runtime::{Clock, Error as RuntimeError, Sink, Stream};
78use commonware_utils::{hex, SystemTimeExt};
79use rand_core::CryptoRngCore;
80use std::{future::Future, ops::Range, time::Duration};
81use thiserror::Error;
82
83const CIPHERTEXT_OVERHEAD: u32 = {
84    assert!(handshake::CIPHERTEXT_OVERHEAD <= u32::MAX as usize);
85    handshake::CIPHERTEXT_OVERHEAD as u32
86};
87
88/// Errors that can occur when interacting with a stream.
89#[derive(Error, Debug)]
90pub enum Error {
91    #[error("handshake error: {0}")]
92    HandshakeError(HandshakeError),
93    #[error("unable to decode: {0}")]
94    UnableToDecode(CodecError),
95    #[error("peer rejected: {}", hex(_0))]
96    PeerRejected(Vec<u8>),
97    #[error("recv failed")]
98    RecvFailed(RuntimeError),
99    #[error("recv too large: {0} bytes")]
100    RecvTooLarge(usize),
101    #[error("invalid varint length prefix")]
102    InvalidVarint,
103    #[error("send failed")]
104    SendFailed(RuntimeError),
105    #[error("send zero size")]
106    SendZeroSize,
107    #[error("send too large: {0} bytes")]
108    SendTooLarge(usize),
109    #[error("connection closed")]
110    StreamClosed,
111    #[error("handshake timed out")]
112    HandshakeTimeout,
113}
114
115impl From<CodecError> for Error {
116    fn from(value: CodecError) -> Self {
117        Self::UnableToDecode(value)
118    }
119}
120
121impl From<HandshakeError> for Error {
122    fn from(value: HandshakeError) -> Self {
123        Self::HandshakeError(value)
124    }
125}
126
127/// Configuration for a connection.
128///
129/// # Warning
130///
131/// Synchronize this configuration across all peers.
132/// Mismatched configurations may cause dropped connections or parsing errors.
133#[derive(Clone)]
134pub struct Config<S> {
135    /// The private key used for signing messages.
136    ///
137    /// This proves our own identity to other peers.
138    pub signing_key: S,
139
140    /// Unique prefix for all signed messages. Should be application-specific.
141    /// Prevents replay attacks across different applications using the same keys.
142    pub namespace: Vec<u8>,
143
144    /// Maximum message size (in bytes). Prevents memory exhaustion DoS attacks.
145    pub max_message_size: u32,
146
147    /// Maximum time drift allowed for future timestamps. Handles clock skew.
148    pub synchrony_bound: Duration,
149
150    /// Maximum age of handshake messages before rejection.
151    pub max_handshake_age: Duration,
152
153    /// The allotted time for the handshake to complete.
154    pub handshake_timeout: Duration,
155}
156
157impl<S> Config<S> {
158    /// Computes current time and acceptable timestamp range.
159    pub fn time_information(&self, ctx: &impl Clock) -> (u64, Range<u64>) {
160        fn duration_to_u64(d: Duration) -> u64 {
161            u64::try_from(d.as_millis()).expect("duration ms should fit in an u64")
162        }
163        let current_time_ms = duration_to_u64(ctx.current().epoch());
164        let ok_timestamps = (current_time_ms
165            .saturating_sub(duration_to_u64(self.max_handshake_age)))
166            ..(current_time_ms.saturating_add(duration_to_u64(self.synchrony_bound)));
167        (current_time_ms, ok_timestamps)
168    }
169}
170
171/// Establishes an authenticated connection to a peer as the dialer.
172/// Returns sender and receiver for encrypted communication.
173pub async fn dial<R: CryptoRngCore + Clock, S: Signer, I: Stream, O: Sink>(
174    mut ctx: R,
175    config: Config<S>,
176    peer: S::PublicKey,
177    mut stream: I,
178    mut sink: O,
179) -> Result<(Sender<O>, Receiver<I>), Error> {
180    let timeout = ctx.sleep(config.handshake_timeout);
181    let inner_routine = async move {
182        send_frame(
183            &mut sink,
184            config.signing_key.public_key().encode().as_ref(),
185            config.max_message_size,
186        )
187        .await?;
188
189        let (current_time, ok_timestamps) = config.time_information(&ctx);
190        let (state, syn) = dial_start(
191            &mut ctx,
192            Context::new(
193                &Transcript::new(&config.namespace),
194                current_time,
195                ok_timestamps,
196                config.signing_key,
197                peer,
198            ),
199        );
200        send_frame(&mut sink, syn.encode(), config.max_message_size).await?;
201
202        let syn_ack_bytes = recv_frame(&mut stream, config.max_message_size).await?;
203        let syn_ack = SynAck::<S::Signature>::decode(syn_ack_bytes)?;
204
205        let (ack, send, recv) = dial_end(state, syn_ack)?;
206        send_frame(&mut sink, ack.encode(), config.max_message_size).await?;
207
208        Ok((
209            Sender {
210                cipher: send,
211                sink,
212                max_message_size: config.max_message_size,
213            },
214            Receiver {
215                cipher: recv,
216                stream,
217                max_message_size: config.max_message_size,
218            },
219        ))
220    };
221
222    select! {
223        x = inner_routine => { x } ,
224        _ = timeout => { Err(Error::HandshakeTimeout) }
225    }
226}
227
228/// Accepts an authenticated connection from a peer as the listener.
229/// Returns the peer's identity, sender, and receiver for encrypted communication.
230pub async fn listen<
231    R: CryptoRngCore + Clock,
232    S: Signer,
233    I: Stream,
234    O: Sink,
235    Fut: Future<Output = bool>,
236    F: FnOnce(S::PublicKey) -> Fut,
237>(
238    mut ctx: R,
239    bouncer: F,
240    config: Config<S>,
241    mut stream: I,
242    mut sink: O,
243) -> Result<(S::PublicKey, Sender<O>, Receiver<I>), Error> {
244    let timeout = ctx.sleep(config.handshake_timeout);
245    let inner_routine = async move {
246        let peer_bytes = recv_frame(&mut stream, config.max_message_size).await?;
247        let peer = S::PublicKey::decode(peer_bytes)?;
248        if !bouncer(peer.clone()).await {
249            return Err(Error::PeerRejected(peer.encode().to_vec()));
250        }
251
252        let msg1_bytes = recv_frame(&mut stream, config.max_message_size).await?;
253        let msg1 = Syn::<S::Signature>::decode(msg1_bytes)?;
254
255        let (current_time, ok_timestamps) = config.time_information(&ctx);
256        let (state, syn_ack) = listen_start(
257            &mut ctx,
258            Context::new(
259                &Transcript::new(&config.namespace),
260                current_time,
261                ok_timestamps,
262                config.signing_key,
263                peer.clone(),
264            ),
265            msg1,
266        )?;
267        send_frame(&mut sink, syn_ack.encode(), config.max_message_size).await?;
268
269        let ack_bytes = recv_frame(&mut stream, config.max_message_size).await?;
270        let ack = Ack::decode(ack_bytes)?;
271
272        let (send, recv) = listen_end(state, ack)?;
273
274        Ok((
275            peer,
276            Sender {
277                cipher: send,
278                sink,
279                max_message_size: config.max_message_size,
280            },
281            Receiver {
282                cipher: recv,
283                stream,
284                max_message_size: config.max_message_size,
285            },
286        ))
287    };
288
289    select! {
290        x = inner_routine => { x } ,
291        _ = timeout => { Err(Error::HandshakeTimeout) }
292    }
293}
294
295/// Sends encrypted messages to a peer.
296pub struct Sender<O> {
297    cipher: SendCipher,
298    sink: O,
299    max_message_size: u32,
300}
301
302impl<O: Sink> Sender<O> {
303    /// Encrypts and sends a message to the peer.
304    pub async fn send(&mut self, mut buf: impl Buf) -> Result<(), Error> {
305        // Copy the buffer to ensure contiguous memory for encryption.
306        let msg = buf.copy_to_bytes(buf.remaining());
307        let c = self.cipher.send(msg.as_ref())?;
308
309        send_frame(
310            &mut self.sink,
311            Bytes::from(c),
312            self.max_message_size.saturating_add(CIPHERTEXT_OVERHEAD),
313        )
314        .await?;
315        Ok(())
316    }
317}
318
319/// Receives encrypted messages from a peer.
320pub struct Receiver<I> {
321    cipher: RecvCipher,
322    stream: I,
323    max_message_size: u32,
324}
325
326impl<I: Stream> Receiver<I> {
327    /// Receives and decrypts a message from the peer.
328    pub async fn recv(&mut self) -> Result<Bytes, Error> {
329        let c = recv_frame(
330            &mut self.stream,
331            self.max_message_size.saturating_add(CIPHERTEXT_OVERHEAD),
332        )
333        .await?;
334        Ok(self.cipher.recv(&c)?.into())
335    }
336}
337
338#[cfg(test)]
339mod test {
340    use super::*;
341    use commonware_cryptography::{ed25519::PrivateKey, Signer};
342    use commonware_runtime::{deterministic, mocks, Runner as _, Spawner as _};
343
344    const NAMESPACE: &[u8] = b"fuzz_transport";
345    const MAX_MESSAGE_SIZE: u32 = 64 * 1024; // 64KB buffer
346
347    #[test]
348    fn test_can_setup_and_send_messages() -> Result<(), Error> {
349        let executor = deterministic::Runner::default();
350        executor.start(|context| async move {
351            let dialer_crypto = PrivateKey::from_seed(42);
352            let listener_crypto = PrivateKey::from_seed(24);
353
354            let (dialer_sink, listener_stream) = mocks::Channel::init();
355            let (listener_sink, dialer_stream) = mocks::Channel::init();
356
357            let dialer_config = Config {
358                signing_key: dialer_crypto.clone(),
359                namespace: NAMESPACE.to_vec(),
360                max_message_size: MAX_MESSAGE_SIZE,
361                synchrony_bound: Duration::from_secs(1),
362                max_handshake_age: Duration::from_secs(1),
363                handshake_timeout: Duration::from_secs(1),
364            };
365
366            let listener_config = Config {
367                signing_key: listener_crypto.clone(),
368                namespace: NAMESPACE.to_vec(),
369                max_message_size: MAX_MESSAGE_SIZE,
370                synchrony_bound: Duration::from_secs(1),
371                max_handshake_age: Duration::from_secs(1),
372                handshake_timeout: Duration::from_secs(1),
373            };
374
375            let listener_handle = context.clone().spawn(move |context| async move {
376                listen(
377                    context,
378                    |_| async { true },
379                    listener_config,
380                    listener_stream,
381                    listener_sink,
382                )
383                .await
384            });
385
386            let (mut dialer_sender, mut dialer_receiver) = dial(
387                context,
388                dialer_config,
389                listener_crypto.public_key(),
390                dialer_stream,
391                dialer_sink,
392            )
393            .await?;
394
395            let (listener_peer, mut listener_sender, mut listener_receiver) =
396                listener_handle.await.unwrap()?;
397            assert_eq!(listener_peer, dialer_crypto.public_key());
398            let messages: Vec<&'static [u8]> = vec![b"A", b"B", b"C"];
399            for msg in &messages {
400                dialer_sender.send(&msg[..]).await?;
401                let syn_ack = listener_receiver.recv().await?;
402                assert_eq!(msg, &syn_ack);
403                listener_sender.send(&msg[..]).await?;
404                let ack = dialer_receiver.recv().await?;
405                assert_eq!(msg, &ack);
406            }
407            Ok(())
408        })
409    }
410}