tmkms_p2p/
async_secret_connection.rs1#![cfg(feature = "async")]
4
5use crate::{
6 Error, MAX_MSG_LEN, PublicKey, Result, ed25519,
7 encryption::{Frame, RecvState, SendState},
8 handshake, proto,
9 traits::{AsyncReadMsg, AsyncWriteMsg},
10};
11use ed25519_dalek::Signer;
12use prost::Message;
13use tokio::io::{self, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf};
14
15#[cfg(doc)]
16use crate::IdentitySecret;
17
18pub struct AsyncSecretConnection<Io> {
21 reader: AsyncSecretReader<Io>,
24
25 writer: AsyncSecretWriter<Io>,
28
29 local_public_key: PublicKey,
31
32 peer_public_key: PublicKey,
34}
35
36impl<Io: AsyncReadExt + AsyncWriteExt + Send + Sync + Unpin> AsyncSecretConnection<Io> {
37 pub async fn new<Identity>(io: Io, identity_key: &Identity) -> Result<Self>
49 where
50 Identity: Signer<ed25519::Signature>,
51 ed25519::VerifyingKey: for<'a> From<&'a Identity>,
52 {
53 let local_public_key: PublicKey = ed25519::VerifyingKey::from(identity_key).into();
55 let (mut initial_state, initial_message) = handshake::InitialState::new();
56 let (mut io_read, mut io_write) = io::split(io);
57
58 let initial_message = initial_message.encode_length_delimited_to_vec();
61 let write_future = io_write.write_all(&initial_message);
62 let read_future = read_initial_msg(&mut io_read);
63 let (peer_initial_bytes, _) = tokio::try_join!(read_future, write_future)?;
64
65 let peer_initial_msg =
68 handshake::InitialMessage::decode_length_delimited(peer_initial_bytes.as_slice())?;
69 let (challenge, cipher_state) = initial_state.got_key(peer_initial_msg.pub_key)?;
70
71 let mut reader = AsyncSecretReader {
73 io: io_read,
74 recv_state: cipher_state.recv_state,
75 };
76 let mut writer = AsyncSecretWriter {
77 io: io_write,
78 send_state: cipher_state.send_state,
79 };
80
81 let write_future = writer.write_msg(proto::p2p::AuthSigMessage {
83 pub_key: Some(local_public_key.into()),
84 sig: challenge.sign_challenge(identity_key).to_vec(),
85 });
86
87 let read_future = reader.read_msg::<proto::p2p::AuthSigMessage>();
90 let (peer_auth_sig_msg, _) = tokio::try_join!(read_future, write_future)?;
91
92 let peer_public_key = challenge.got_signature(peer_auth_sig_msg)?;
94
95 Ok(Self {
97 reader,
98 writer,
99 local_public_key,
100 peer_public_key,
101 })
102 }
103}
104
105impl<Io> AsyncSecretConnection<Io> {
106 pub fn local_public_key(&self) -> &PublicKey {
108 &self.local_public_key
109 }
110
111 pub fn peer_public_key(&self) -> PublicKey {
116 self.peer_public_key
117 }
118
119 pub fn split(self) -> (AsyncSecretReader<Io>, AsyncSecretWriter<Io>) {
122 (self.reader, self.writer)
123 }
124}
125
126impl<Io: AsyncReadExt + Send + Sync + Unpin> AsyncReadMsg for AsyncSecretConnection<Io> {
127 #[inline]
128 fn read_msg<M: Message + Default>(&mut self) -> impl Future<Output = Result<M>> + Send + Sync {
129 self.reader.read_msg()
130 }
131}
132
133impl<Io: AsyncWriteExt + Send + Sync + Unpin> AsyncWriteMsg for AsyncSecretConnection<Io> {
134 #[inline]
135 fn write_msg<M: Message>(&mut self, msg: M) -> impl Future<Output = Result<()>> + Send + Sync {
136 self.writer.write_msg(msg)
137 }
138}
139
140pub struct AsyncSecretReader<Io> {
142 io: ReadHalf<Io>,
144
145 recv_state: RecvState,
147}
148
149impl<Io: AsyncReadExt + Send + Sync + Unpin> AsyncSecretReader<Io> {
150 #[inline]
152 async fn read_frame(&mut self) -> Result<Frame> {
153 let mut bytes = [0u8; Frame::ENCRYPTED_SIZE];
154 self.io.read_exact(&mut bytes).await?;
155
156 let mut frame = Frame::from_ciphertext(bytes);
157 self.recv_state.decrypt_frame(&mut frame)?;
158 Ok(frame)
159 }
160
161 async fn _read_msg<M: Message + Default>(&mut self) -> Result<M> {
165 let frame = self.read_frame().await?;
166 let frame_plaintext = frame.plaintext()?;
167
168 let msg_len = proto::decode_length_delimiter_inclusive(frame_plaintext)?;
170
171 if msg_len > MAX_MSG_LEN {
172 return Err(Error::MessageSize { size: msg_len });
173 }
174
175 if frame_plaintext.len() == msg_len {
177 return Ok(M::decode_length_delimited(frame_plaintext)?);
178 }
179
180 let mut msg = Vec::with_capacity(msg_len);
181 msg.extend_from_slice(frame_plaintext);
182
183 while msg.len() < msg_len {
184 msg.extend_from_slice(self.read_frame().await?.plaintext()?);
185 }
186
187 Ok(M::decode_length_delimited(msg.as_slice())?)
188 }
189}
190
191impl<Io: AsyncReadExt + Send + Sync + Unpin> AsyncReadMsg for AsyncSecretReader<Io> {
192 #[inline]
193 fn read_msg<M: Message + Default>(&mut self) -> impl Future<Output = Result<M>> + Send + Sync {
194 self._read_msg()
195 }
196}
197
198pub struct AsyncSecretWriter<Io> {
200 io: WriteHalf<Io>,
202
203 send_state: SendState,
205}
206
207impl<Io: AsyncWriteExt + Send + Sync + Unpin> AsyncSecretWriter<Io> {
208 #[inline]
210 async fn write_frame(&mut self, plaintext: &[u8]) -> Result<()> {
211 let mut frame = Frame::from_plaintext(plaintext)?;
212 self.send_state.encrypt_frame(&mut frame)?;
213 Ok(self.io.write_all(frame.ciphertext()?).await?)
214 }
215
216 async fn _write_msg<M: Message>(&mut self, msg: M) -> Result<()> {
220 let bytes = msg.encode_length_delimited_to_vec();
221
222 for chunk in bytes.chunks(Frame::MAX_PLAINTEXT_SIZE) {
223 self.write_frame(chunk).await?;
224 }
225
226 Ok(())
227 }
228}
229
230impl<Io: AsyncWriteExt + Send + Sync + Unpin> AsyncWriteMsg for AsyncSecretWriter<Io> {
231 #[inline]
232 fn write_msg<M: Message>(&mut self, msg: M) -> impl Future<Output = Result<()>> + Send + Sync {
233 self._write_msg(msg)
234 }
235}
236
237async fn read_initial_msg<Io: AsyncReadExt + Unpin>(
239 io: &mut Io,
240) -> io::Result<[u8; 1 + handshake::InitialMessage::LENGTH]> {
241 let mut buf = [0u8; 1 + handshake::InitialMessage::LENGTH]; io.read_exact(&mut buf).await?;
244 Ok(buf)
245}
246
247