tmkms_p2p/
async_secret_connection.rs

1//! Async Secret Connection type.
2
3#![cfg(feature = "async")]
4
5use crate::{
6    Error, MAX_MSG_LEN, PublicKey, Result, ed25519,
7    encryption::{Frame, RecvState, SendState},
8    handshake, proto,
9    traits::{AsyncReadMsg, AsyncWriteMsg},
10};
11use ed25519_dalek::Signer;
12use prost::Message;
13use tokio::io::{self, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf};
14
15#[cfg(doc)]
16use crate::IdentitySecret;
17
18/// Encrypted connection between peers in a CometBFT network, implemented using asynchronous I/O
19/// provided by the Tokio async runtime.
20pub struct AsyncSecretConnection<Io> {
21    /// Message reader which holds the read-half of the I/O object and the associated symmetric
22    /// cipher state.
23    reader: AsyncSecretReader<Io>,
24
25    /// Message writer which holds the write-half of the I/O object and the associated symmetric
26    /// cipher state.
27    writer: AsyncSecretWriter<Io>,
28
29    /// Our identity's Ed25519 public key.
30    local_public_key: PublicKey,
31
32    /// Remote peer's Ed25519 public key.
33    peer_public_key: PublicKey,
34}
35
36impl<Io: AsyncReadExt + AsyncWriteExt + Send + Sync + Unpin> AsyncSecretConnection<Io> {
37    /// Performs a handshake and returns a new `AsyncSecretConnection`, authenticating ourselves
38    /// with the provided `Identity` (Ed25519 signing key).
39    ///
40    /// The [`IdentitySecret`] type can be used as an `identity_key`.
41    ///
42    /// # Errors
43    ///
44    /// - if sharing of the pubkey fails
45    /// - if sharing of the signature fails
46    /// - if receiving the signature fails
47    /// - if verifying the signature fails
48    pub async fn new<Identity>(io: Io, identity_key: &Identity) -> Result<Self>
49    where
50        Identity: Signer<ed25519::Signature>,
51        ed25519::VerifyingKey: for<'a> From<&'a Identity>,
52    {
53        // Start a handshake process, generating a local ephemeral X25519 public key.
54        let local_public_key: PublicKey = ed25519::VerifyingKey::from(identity_key).into();
55        let (mut initial_state, initial_message) = handshake::InitialState::new();
56        let (mut io_read, mut io_write) = io::split(io);
57
58        // Send our ephemeral X25519 public key to the remote peer (unencrypted) and simultaneously
59        // read theirs.
60        let initial_message = initial_message.encode_length_delimited_to_vec();
61        let write_future = io_write.write_all(&initial_message);
62        let read_future = read_initial_msg(&mut io_read);
63        let (peer_initial_bytes, _) = tokio::try_join!(read_future, write_future)?;
64
65        // Compute signature over the handshake transcript and initialize symmetric cipher state
66        // using shared secret computed using X25519.
67        let peer_initial_msg =
68            handshake::InitialMessage::decode_length_delimited(peer_initial_bytes.as_slice())?;
69        let (challenge, cipher_state) = initial_state.got_key(peer_initial_msg.pub_key)?;
70
71        // Create the async message reader and writer objects.
72        let mut reader = AsyncSecretReader {
73            io: io_read,
74            recv_state: cipher_state.recv_state,
75        };
76        let mut writer = AsyncSecretWriter {
77            io: io_write,
78            send_state: cipher_state.send_state,
79        };
80
81        // Send our identity's Ed25519 public key and signature over the transcript to the peer.
82        let write_future = writer.write_msg(proto::p2p::AuthSigMessage {
83            pub_key: Some(local_public_key.into()),
84            sig: challenge.sign_challenge(identity_key).to_vec(),
85        });
86
87        // Read the peer's Ed25519 public key and use it to verify their signature over the
88        // handshake transcript.
89        let read_future = reader.read_msg::<proto::p2p::AuthSigMessage>();
90        let (peer_auth_sig_msg, _) = tokio::try_join!(read_future, write_future)?;
91
92        // Verify the key and signature validate for our computed Merlin transcript hash
93        let peer_public_key = challenge.got_signature(peer_auth_sig_msg)?;
94
95        // All good!
96        Ok(Self {
97            reader,
98            writer,
99            local_public_key,
100            peer_public_key,
101        })
102    }
103}
104
105impl<Io> AsyncSecretConnection<Io> {
106    /// Get the local (i.e. our) [`PublicKey`].
107    pub fn local_public_key(&self) -> &PublicKey {
108        &self.local_public_key
109    }
110
111    /// Returns the remote peer's [`PublicKey`].
112    ///
113    /// # Panics
114    /// - if the peer's public key is not initialized (library-internal bug)
115    pub fn peer_public_key(&self) -> PublicKey {
116        self.peer_public_key
117    }
118
119    /// Split this [`AsyncSecretConnection`] into an [`AsyncSecretReader`] and [`AsyncSecretWriter`]
120    /// which can be used independently of each other.
121    pub fn split(self) -> (AsyncSecretReader<Io>, AsyncSecretWriter<Io>) {
122        (self.reader, self.writer)
123    }
124}
125
126impl<Io: AsyncReadExt + Send + Sync + Unpin> AsyncReadMsg for AsyncSecretConnection<Io> {
127    #[inline]
128    fn read_msg<M: Message + Default>(&mut self) -> impl Future<Output = Result<M>> + Send + Sync {
129        self.reader.read_msg()
130    }
131}
132
133impl<Io: AsyncWriteExt + Send + Sync + Unpin> AsyncWriteMsg for AsyncSecretConnection<Io> {
134    #[inline]
135    fn write_msg<M: Message>(&mut self, msg: M) -> impl Future<Output = Result<()>> + Send + Sync {
136        self.writer.write_msg(msg)
137    }
138}
139
140/// Async encrypted message reader type which wraps the read-half of an underlying I/O object.
141pub struct AsyncSecretReader<Io> {
142    /// Inner async I/O reader object this connection type wraps.
143    io: ReadHalf<Io>,
144
145    /// Symmetric cipher state including the current nonce.
146    recv_state: RecvState,
147}
148
149impl<Io: AsyncReadExt + Send + Sync + Unpin> AsyncSecretReader<Io> {
150    /// Read and decrypt a frame from the network.
151    #[inline]
152    async fn read_frame(&mut self) -> Result<Frame> {
153        let mut bytes = [0u8; Frame::ENCRYPTED_SIZE];
154        self.io.read_exact(&mut bytes).await?;
155
156        let mut frame = Frame::from_ciphertext(bytes);
157        self.recv_state.decrypt_frame(&mut frame)?;
158        Ok(frame)
159    }
160
161    /// Read and decrypt a message `M` from the underlying I/O object.
162    ///
163    /// Core implementation of the `AsyncReadMsg` trait, written as an `async fn` for simplicity.
164    async fn _read_msg<M: Message + Default>(&mut self) -> Result<M> {
165        let frame = self.read_frame().await?;
166        let frame_plaintext = frame.plaintext()?;
167
168        // Decode the length prefix on the proto
169        let msg_len = proto::decode_length_delimiter_inclusive(frame_plaintext)?;
170
171        if msg_len > MAX_MSG_LEN {
172            return Err(Error::MessageSize { size: msg_len });
173        }
174
175        // Skip the heap if the proto fits in a single message frame
176        if frame_plaintext.len() == msg_len {
177            return Ok(M::decode_length_delimited(frame_plaintext)?);
178        }
179
180        let mut msg = Vec::with_capacity(msg_len);
181        msg.extend_from_slice(frame_plaintext);
182
183        while msg.len() < msg_len {
184            msg.extend_from_slice(self.read_frame().await?.plaintext()?);
185        }
186
187        Ok(M::decode_length_delimited(msg.as_slice())?)
188    }
189}
190
191impl<Io: AsyncReadExt + Send + Sync + Unpin> AsyncReadMsg for AsyncSecretReader<Io> {
192    #[inline]
193    fn read_msg<M: Message + Default>(&mut self) -> impl Future<Output = Result<M>> + Send + Sync {
194        self._read_msg()
195    }
196}
197
198/// Async encrypted message writer type which wraps the write-half of an underlying I/O object.
199pub struct AsyncSecretWriter<Io> {
200    /// Inner async I/O writer object this connection type wraps.
201    io: WriteHalf<Io>,
202
203    /// Symmetric cipher state including the current nonce.
204    send_state: SendState,
205}
206
207impl<Io: AsyncWriteExt + Send + Sync + Unpin> AsyncSecretWriter<Io> {
208    /// Encrypt and write a frame to the network.
209    #[inline]
210    async fn write_frame(&mut self, plaintext: &[u8]) -> Result<()> {
211        let mut frame = Frame::from_plaintext(plaintext)?;
212        self.send_state.encrypt_frame(&mut frame)?;
213        Ok(self.io.write_all(frame.ciphertext()?).await?)
214    }
215
216    /// Encrypt and write a message `M` to the underlying I/O object.
217    ///
218    /// Core implementation of the `AsyncWriteMsg` trait, written as an `async fn` for simplicity.
219    async fn _write_msg<M: Message>(&mut self, msg: M) -> Result<()> {
220        let bytes = msg.encode_length_delimited_to_vec();
221
222        for chunk in bytes.chunks(Frame::MAX_PLAINTEXT_SIZE) {
223            self.write_frame(chunk).await?;
224        }
225
226        Ok(())
227    }
228}
229
230impl<Io: AsyncWriteExt + Send + Sync + Unpin> AsyncWriteMsg for AsyncSecretWriter<Io> {
231    #[inline]
232    fn write_msg<M: Message>(&mut self, msg: M) -> impl Future<Output = Result<()>> + Send + Sync {
233        self._write_msg(msg)
234    }
235}
236
237/// Read the `handshake::InitialMessage` from the underlying `Io` object.
238async fn read_initial_msg<Io: AsyncReadExt + Unpin>(
239    io: &mut Io,
240) -> io::Result<[u8; 1 + handshake::InitialMessage::LENGTH]> {
241    // Read the remote side's initial message containing their X25519 public key
242    let mut buf = [0u8; 1 + handshake::InitialMessage::LENGTH]; // extra byte for length prefix
243    io.read_exact(&mut buf).await?;
244    Ok(buf)
245}
246
247// NOTE: tests are in `tests/async_secret_connection.rs`