cometbft_p2p/
async_secret_connection.rs

1//! Async Secret Connection type.
2
3#![cfg(feature = "async")]
4
5use crate::{
6    Error, MAX_MSG_LEN, PublicKey, Result, ed25519,
7    encryption::{Frame, RecvState, SendState},
8    handshake, proto,
9    traits::{AsyncReadMsg, AsyncWriteMsg},
10};
11use ed25519_dalek::Signer;
12use prost::Message;
13use tokio::io::{self, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf};
14
15#[cfg(doc)]
16use crate::IdentitySecret;
17
18/// Encrypted connection between peers in a CometBFT network, implemented using asynchronous I/O
19/// provided by the Tokio async runtime.
20pub struct AsyncSecretConnection<Io> {
21    /// Message reader which holds the read-half of the I/O object and the associated symmetric
22    /// cipher state.
23    reader: AsyncSecretReader<Io>,
24
25    /// Message writer which holds the write-half of the I/O object and the associated symmetric
26    /// cipher state.
27    writer: AsyncSecretWriter<Io>,
28
29    /// Our identity's Ed25519 public key.
30    local_public_key: PublicKey,
31
32    /// Remote peer's Ed25519 public key.
33    peer_public_key: PublicKey,
34}
35
36impl<Io: AsyncReadExt + AsyncWriteExt + Send + Sync + Unpin> AsyncSecretConnection<Io> {
37    /// Performs a handshake and returns a new `AsyncSecretConnection`, authenticating ourselves
38    /// with the provided `Identity` (Ed25519 signing key).
39    ///
40    /// The [`IdentitySecret`] type can be used as an `identity_key`.
41    ///
42    /// # Errors
43    ///
44    /// - if sharing of the pubkey fails
45    /// - if sharing of the signature fails
46    /// - if receiving the signature fails
47    /// - if verifying the signature fails
48    pub async fn new<Identity>(io: Io, identity_key: &Identity) -> Result<Self>
49    where
50        Identity: Signer<ed25519::Signature>,
51        ed25519::VerifyingKey: for<'a> From<&'a Identity>,
52    {
53        // Start a handshake process, generating a local ephemeral X25519 public key.
54        let local_public_key: PublicKey = ed25519::VerifyingKey::from(identity_key).into();
55        let (mut initial_state, initial_message) = handshake::InitialState::new();
56        let (mut io_read, mut io_write) = io::split(io);
57
58        // Send our ephemeral X25519 public key to the remote peer (unencrypted) and simultaneously
59        // read theirs.
60        let initial_message = initial_message.encode_length_delimited_to_vec();
61        let write_future = io_write.write_all(&initial_message);
62        let read_future = read_initial_msg(&mut io_read);
63        let (peer_initial_bytes, _) = tokio::try_join!(read_future, write_future)?;
64
65        // Compute signature over the handshake transcript and initialize symmetric cipher state
66        // using shared secret computed using X25519.
67        let peer_initial_msg =
68            handshake::InitialMessage::decode_length_delimited(peer_initial_bytes.as_slice())?;
69        let (challenge, cipher_state) = initial_state.got_key(peer_initial_msg.pub_key)?;
70
71        // Create the async message reader and writer objects.
72        let mut reader = AsyncSecretReader {
73            io: io_read,
74            recv_state: cipher_state.recv_state,
75        };
76        let mut writer = AsyncSecretWriter {
77            io: io_write,
78            send_state: cipher_state.send_state,
79        };
80
81        // Send our identity's Ed25519 public key and signature over the transcript to the peer.
82        let write_future = writer.write_msg(proto::p2p::AuthSigMessage {
83            pub_key: Some(local_public_key.into()),
84            sig: challenge.sign_challenge(identity_key).to_vec(),
85        });
86
87        // Read the peer's Ed25519 public key and use it to verify their signature over the
88        // handshake transcript.
89        let read_future = reader.read_msg::<proto::p2p::AuthSigMessage>();
90        let (peer_auth_sig_msg, _) = tokio::try_join!(read_future, write_future)?;
91
92        // Verify the key and signature validate for our computed Merlin transcript hash
93        let peer_public_key = challenge.got_signature(peer_auth_sig_msg)?;
94
95        // All good!
96        Ok(Self {
97            reader,
98            writer,
99            local_public_key,
100            peer_public_key,
101        })
102    }
103}
104
105impl<Io> AsyncSecretConnection<Io> {
106    /// Get the local (i.e. our) [`PublicKey`].
107    pub fn local_public_key(&self) -> &PublicKey {
108        &self.local_public_key
109    }
110
111    /// Returns the remote peer's [`PublicKey`].
112    pub fn peer_public_key(&self) -> &PublicKey {
113        &self.peer_public_key
114    }
115
116    /// Split this [`AsyncSecretConnection`] into an [`AsyncSecretReader`] and [`AsyncSecretWriter`]
117    /// which can be used independently of each other.
118    pub fn split(self) -> (AsyncSecretReader<Io>, AsyncSecretWriter<Io>) {
119        (self.reader, self.writer)
120    }
121}
122
123impl<Io: AsyncReadExt + Send + Sync + Unpin> AsyncReadMsg for AsyncSecretConnection<Io> {
124    #[inline]
125    fn read_msg<M: Message + Default>(&mut self) -> impl Future<Output = Result<M>> + Send + Sync {
126        self.reader.read_msg()
127    }
128}
129
130impl<Io: AsyncWriteExt + Send + Sync + Unpin> AsyncWriteMsg for AsyncSecretConnection<Io> {
131    #[inline]
132    fn write_msg<M: Message>(&mut self, msg: M) -> impl Future<Output = Result<()>> + Send + Sync {
133        self.writer.write_msg(msg)
134    }
135}
136
137/// Async encrypted message reader type which wraps the read-half of an underlying I/O object.
138pub struct AsyncSecretReader<Io> {
139    /// Inner async I/O reader object this connection type wraps.
140    io: ReadHalf<Io>,
141
142    /// Symmetric cipher state including the current nonce.
143    recv_state: RecvState,
144}
145
146impl<Io: AsyncReadExt + Send + Sync + Unpin> AsyncSecretReader<Io> {
147    /// Read and decrypt a frame from the network.
148    #[inline]
149    async fn read_frame(&mut self) -> Result<Frame> {
150        let mut bytes = [0u8; Frame::ENCRYPTED_SIZE];
151        self.io.read_exact(&mut bytes).await?;
152
153        let mut frame = Frame::from_ciphertext(bytes);
154        self.recv_state.decrypt_frame(&mut frame)?;
155        Ok(frame)
156    }
157
158    /// Read and decrypt a message `M` from the underlying I/O object.
159    ///
160    /// Core implementation of the `AsyncReadMsg` trait, written as an `async fn` for simplicity.
161    async fn _read_msg<M: Message + Default>(&mut self) -> Result<M> {
162        let frame = self.read_frame().await?;
163        let frame_plaintext = frame.plaintext()?;
164
165        // Decode the length prefix on the proto
166        let msg_len = proto::decode_length_delimiter_inclusive(frame_plaintext)?;
167
168        if msg_len > MAX_MSG_LEN {
169            return Err(Error::MessageSize { size: msg_len });
170        }
171
172        // Skip the heap if the proto fits in a single message frame
173        if frame_plaintext.len() == msg_len {
174            return Ok(M::decode_length_delimited(frame_plaintext)?);
175        }
176
177        let mut msg = Vec::with_capacity(msg_len);
178        msg.extend_from_slice(frame_plaintext);
179
180        while msg.len() < msg_len {
181            msg.extend_from_slice(self.read_frame().await?.plaintext()?);
182        }
183
184        Ok(M::decode_length_delimited(msg.as_slice())?)
185    }
186}
187
188impl<Io: AsyncReadExt + Send + Sync + Unpin> AsyncReadMsg for AsyncSecretReader<Io> {
189    #[inline]
190    fn read_msg<M: Message + Default>(&mut self) -> impl Future<Output = Result<M>> + Send + Sync {
191        self._read_msg()
192    }
193}
194
195/// Async encrypted message writer type which wraps the write-half of an underlying I/O object.
196pub struct AsyncSecretWriter<Io> {
197    /// Inner async I/O writer object this connection type wraps.
198    io: WriteHalf<Io>,
199
200    /// Symmetric cipher state including the current nonce.
201    send_state: SendState,
202}
203
204impl<Io: AsyncWriteExt + Send + Sync + Unpin> AsyncSecretWriter<Io> {
205    /// Encrypt and write a frame to the network.
206    #[inline]
207    async fn write_frame(&mut self, plaintext: &[u8]) -> Result<()> {
208        let mut frame = Frame::from_plaintext(plaintext)?;
209        self.send_state.encrypt_frame(&mut frame)?;
210        Ok(self.io.write_all(frame.ciphertext()?).await?)
211    }
212
213    /// Encrypt and write a message `M` to the underlying I/O object.
214    ///
215    /// Core implementation of the `AsyncWriteMsg` trait, written as an `async fn` for simplicity.
216    async fn _write_msg<M: Message>(&mut self, msg: M) -> Result<()> {
217        let bytes = msg.encode_length_delimited_to_vec();
218
219        for chunk in bytes.chunks(Frame::MAX_PLAINTEXT_SIZE) {
220            self.write_frame(chunk).await?;
221        }
222
223        Ok(())
224    }
225}
226
227impl<Io: AsyncWriteExt + Send + Sync + Unpin> AsyncWriteMsg for AsyncSecretWriter<Io> {
228    #[inline]
229    fn write_msg<M: Message>(&mut self, msg: M) -> impl Future<Output = Result<()>> + Send + Sync {
230        self._write_msg(msg)
231    }
232}
233
234/// Read the `handshake::InitialMessage` from the underlying `Io` object.
235async fn read_initial_msg<Io: AsyncReadExt + Unpin>(
236    io: &mut Io,
237) -> io::Result<[u8; 1 + handshake::InitialMessage::LENGTH]> {
238    // Read the remote side's initial message containing their X25519 public key
239    let mut buf = [0u8; 1 + handshake::InitialMessage::LENGTH]; // extra byte for length prefix
240    io.read_exact(&mut buf).await?;
241    Ok(buf)
242}
243
244// NOTE: tests are in `tests/async_secret_connection.rs`