cometbft_p2p/
async_secret_connection.rs1#![cfg(feature = "async")]
4
5use crate::{
6 Error, MAX_MSG_LEN, PublicKey, Result, ed25519,
7 encryption::{Frame, RecvState, SendState},
8 handshake, proto,
9 traits::{AsyncReadMsg, AsyncWriteMsg},
10};
11use ed25519_dalek::Signer;
12use prost::Message;
13use tokio::io::{self, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf};
14
15#[cfg(doc)]
16use crate::IdentitySecret;
17
18pub struct AsyncSecretConnection<Io> {
21 reader: AsyncSecretReader<Io>,
24
25 writer: AsyncSecretWriter<Io>,
28
29 local_public_key: PublicKey,
31
32 peer_public_key: PublicKey,
34}
35
36impl<Io: AsyncReadExt + AsyncWriteExt + Send + Sync + Unpin> AsyncSecretConnection<Io> {
37 pub async fn new<Identity>(io: Io, identity_key: &Identity) -> Result<Self>
49 where
50 Identity: Signer<ed25519::Signature>,
51 ed25519::VerifyingKey: for<'a> From<&'a Identity>,
52 {
53 let local_public_key: PublicKey = ed25519::VerifyingKey::from(identity_key).into();
55 let (mut initial_state, initial_message) = handshake::InitialState::new();
56 let (mut io_read, mut io_write) = io::split(io);
57
58 let initial_message = initial_message.encode_length_delimited_to_vec();
61 let write_future = io_write.write_all(&initial_message);
62 let read_future = read_initial_msg(&mut io_read);
63 let (peer_initial_bytes, _) = tokio::try_join!(read_future, write_future)?;
64
65 let peer_initial_msg =
68 handshake::InitialMessage::decode_length_delimited(peer_initial_bytes.as_slice())?;
69 let (challenge, cipher_state) = initial_state.got_key(peer_initial_msg.pub_key)?;
70
71 let mut reader = AsyncSecretReader {
73 io: io_read,
74 recv_state: cipher_state.recv_state,
75 };
76 let mut writer = AsyncSecretWriter {
77 io: io_write,
78 send_state: cipher_state.send_state,
79 };
80
81 let write_future = writer.write_msg(proto::p2p::AuthSigMessage {
83 pub_key: Some(local_public_key.into()),
84 sig: challenge.sign_challenge(identity_key).to_vec(),
85 });
86
87 let read_future = reader.read_msg::<proto::p2p::AuthSigMessage>();
90 let (peer_auth_sig_msg, _) = tokio::try_join!(read_future, write_future)?;
91
92 let peer_public_key = challenge.got_signature(peer_auth_sig_msg)?;
94
95 Ok(Self {
97 reader,
98 writer,
99 local_public_key,
100 peer_public_key,
101 })
102 }
103}
104
105impl<Io> AsyncSecretConnection<Io> {
106 pub fn local_public_key(&self) -> &PublicKey {
108 &self.local_public_key
109 }
110
111 pub fn peer_public_key(&self) -> &PublicKey {
113 &self.peer_public_key
114 }
115
116 pub fn split(self) -> (AsyncSecretReader<Io>, AsyncSecretWriter<Io>) {
119 (self.reader, self.writer)
120 }
121}
122
123impl<Io: AsyncReadExt + Send + Sync + Unpin> AsyncReadMsg for AsyncSecretConnection<Io> {
124 #[inline]
125 fn read_msg<M: Message + Default>(&mut self) -> impl Future<Output = Result<M>> + Send + Sync {
126 self.reader.read_msg()
127 }
128}
129
130impl<Io: AsyncWriteExt + Send + Sync + Unpin> AsyncWriteMsg for AsyncSecretConnection<Io> {
131 #[inline]
132 fn write_msg<M: Message>(&mut self, msg: M) -> impl Future<Output = Result<()>> + Send + Sync {
133 self.writer.write_msg(msg)
134 }
135}
136
137pub struct AsyncSecretReader<Io> {
139 io: ReadHalf<Io>,
141
142 recv_state: RecvState,
144}
145
146impl<Io: AsyncReadExt + Send + Sync + Unpin> AsyncSecretReader<Io> {
147 #[inline]
149 async fn read_frame(&mut self) -> Result<Frame> {
150 let mut bytes = [0u8; Frame::ENCRYPTED_SIZE];
151 self.io.read_exact(&mut bytes).await?;
152
153 let mut frame = Frame::from_ciphertext(bytes);
154 self.recv_state.decrypt_frame(&mut frame)?;
155 Ok(frame)
156 }
157
158 async fn _read_msg<M: Message + Default>(&mut self) -> Result<M> {
162 let frame = self.read_frame().await?;
163 let frame_plaintext = frame.plaintext()?;
164
165 let msg_len = proto::decode_length_delimiter_inclusive(frame_plaintext)?;
167
168 if msg_len > MAX_MSG_LEN {
169 return Err(Error::MessageSize { size: msg_len });
170 }
171
172 if frame_plaintext.len() == msg_len {
174 return Ok(M::decode_length_delimited(frame_plaintext)?);
175 }
176
177 let mut msg = Vec::with_capacity(msg_len);
178 msg.extend_from_slice(frame_plaintext);
179
180 while msg.len() < msg_len {
181 msg.extend_from_slice(self.read_frame().await?.plaintext()?);
182 }
183
184 Ok(M::decode_length_delimited(msg.as_slice())?)
185 }
186}
187
188impl<Io: AsyncReadExt + Send + Sync + Unpin> AsyncReadMsg for AsyncSecretReader<Io> {
189 #[inline]
190 fn read_msg<M: Message + Default>(&mut self) -> impl Future<Output = Result<M>> + Send + Sync {
191 self._read_msg()
192 }
193}
194
195pub struct AsyncSecretWriter<Io> {
197 io: WriteHalf<Io>,
199
200 send_state: SendState,
202}
203
204impl<Io: AsyncWriteExt + Send + Sync + Unpin> AsyncSecretWriter<Io> {
205 #[inline]
207 async fn write_frame(&mut self, plaintext: &[u8]) -> Result<()> {
208 let mut frame = Frame::from_plaintext(plaintext)?;
209 self.send_state.encrypt_frame(&mut frame)?;
210 Ok(self.io.write_all(frame.ciphertext()?).await?)
211 }
212
213 async fn _write_msg<M: Message>(&mut self, msg: M) -> Result<()> {
217 let bytes = msg.encode_length_delimited_to_vec();
218
219 for chunk in bytes.chunks(Frame::MAX_PLAINTEXT_SIZE) {
220 self.write_frame(chunk).await?;
221 }
222
223 Ok(())
224 }
225}
226
227impl<Io: AsyncWriteExt + Send + Sync + Unpin> AsyncWriteMsg for AsyncSecretWriter<Io> {
228 #[inline]
229 fn write_msg<M: Message>(&mut self, msg: M) -> impl Future<Output = Result<()>> + Send + Sync {
230 self._write_msg(msg)
231 }
232}
233
234async fn read_initial_msg<Io: AsyncReadExt + Unpin>(
236 io: &mut Io,
237) -> io::Result<[u8; 1 + handshake::InitialMessage::LENGTH]> {
238 let mut buf = [0u8; 1 + handshake::InitialMessage::LENGTH]; io.read_exact(&mut buf).await?;
241 Ok(buf)
242}
243
244