use std::sync::{Arc, RwLock};
use crate::rlpx::{
error::PeerConnectionError,
message::{self as rlpx, EthCapVersion},
utils::ecdh_xchng,
};
use super::handshake::{LocalState, RemoteState};
use aes::{
Aes256Enc,
cipher::{BlockEncrypt as _, KeyInit as _, KeyIvInit, StreamCipher as _},
};
use bytes::{Buf, BytesMut};
use ethrex_common::{
H128, H256,
utils::{keccak, truncate_array},
};
use ethrex_crypto::keccak::{Keccak256, keccak_hash};
use ethrex_rlp::{decode::RLPDecode, encode::RLPEncode as _};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::{Decoder, Encoder, Framed};
/// Largest padded frame body, in bytes, that `decode` will accept. Frames whose
/// padded size exceeds this are rejected with
/// `PeerConnectionError::InvalidMessageLength`.
const MAX_MESSAGE_SIZE: u32 = 0xFFFFFF;
/// AES-256 stream cipher (`ctr::Ctr64BE`) used for both the ingress and egress
/// keystreams of the codec.
type Aes256Ctr64BE = ctr::Ctr64BE<aes::Aes256>;
/// Framing codec that holds the per-connection MAC and cipher state needed to
/// encode and decode `rlpx::Message` frames.
pub struct RLPxCodec {
/// MAC secret. Used as the AES-256 key when deriving header and frame MAC seeds.
pub(crate) mac_key: H256,
/// Running Keccak256 state covering received frames; advanced by `decode`.
pub(crate) ingress_mac: Keccak256,
/// Running Keccak256 state covering sent frames; advanced by `encode`.
pub(crate) egress_mac: Keccak256,
/// Keystream used to decrypt received headers and frame bodies.
pub(crate) ingress_aes: Aes256Ctr64BE,
/// Keystream used to encrypt sent headers and frame bodies.
pub(crate) egress_aes: Aes256Ctr64BE,
/// Shared, lock-protected eth capability version; read on every encode and
/// decode and passed to the message (de)serializer.
pub(crate) eth_version: Arc<RwLock<EthCapVersion>>,
}
impl RLPxCodec {
    /// Builds a codec from the handshake state of both sides.
    ///
    /// Derives the shared, AES and MAC secrets from the ECDH exchange of the
    /// two ephemeral keys, seeds the ingress/egress MAC hashers with the
    /// nonces and handshake messages, and initializes both keystreams from
    /// the AES secret with an all-zero IV.
    ///
    /// `hashed_nonces` is mixed into the shared secret as-is (presumably the
    /// hash of both handshake nonces — confirm at the call site).
    ///
    /// # Errors
    ///
    /// Returns `PeerConnectionError::CryptographyError` when the ECDH
    /// exchange between the ephemeral keys fails.
    pub(crate) fn new(
        local_state: &LocalState,
        remote_state: &RemoteState,
        hashed_nonces: [u8; 32],
        eth_version: Arc<RwLock<EthCapVersion>>,
    ) -> Result<Self, PeerConnectionError> {
        let ecdh_secret = match ecdh_xchng(&local_state.ephemeral_key, &remote_state.ephemeral_key)
        {
            Ok(secret) => secret,
            Err(error) => {
                return Err(PeerConnectionError::CryptographyError(format!(
                    "Invalid generated ephemeral key secret: {error}"
                )));
            }
        };

        // shared-secret = keccak256(ephemeral-secret || hashed-nonces)
        let shared_secret = keccak_hash([ecdh_secret, hashed_nonces].concat());
        // aes-secret = keccak256(ephemeral-secret || shared-secret)
        let aes_secret = keccak([ecdh_secret, shared_secret].concat());
        // mac-secret = keccak256(ephemeral-secret || aes-secret)
        let mac_secret = keccak([ecdh_secret, aes_secret.0].concat());

        // ingress-mac is seeded with (mac-secret ^ local-nonce) || remote handshake message
        let ingress_mac = Keccak256::default()
            .update(mac_secret ^ local_state.nonce)
            .update(&remote_state.init_message);
        // egress-mac is seeded with (mac-secret ^ remote-nonce) || local handshake message
        let egress_mac = Keccak256::default()
            .update(mac_secret ^ remote_state.nonce)
            .update(&local_state.init_message);

        // Both directions start from the same key and a zero IV.
        let egress_aes = <Aes256Ctr64BE as KeyIvInit>::new(&aes_secret.0.into(), &[0; 16].into());
        let ingress_aes = egress_aes.clone();

        Ok(Self {
            mac_key: mac_secret,
            ingress_mac,
            egress_mac,
            ingress_aes,
            egress_aes,
            eth_version,
        })
    }
}
impl std::fmt::Debug for RLPxCodec {
    /// Formats the codec as a struct. The hasher and cipher fields are shown
    /// as fixed placeholder strings rather than their internal state.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Field name paired with the placeholder text printed in its place.
        const OPAQUE_FIELDS: [(&str, &str); 4] = [
            ("ingress_mac", "ingress_mac"),
            ("egress_mac", "egress_mac"),
            ("ingress_aes", "Aes256Ctr64BE"),
            ("egress_aes", "Aes256Ctr64BE"),
        ];

        let mut builder = f.debug_struct("RLPxCodec");
        // NOTE(review): `mac_key` is derived from the ECDH secret and is printed verbatim here.
        builder.field("mac_key", &self.mac_key);
        for (name, placeholder) in OPAQUE_FIELDS {
            builder.field(name, &placeholder);
        }
        builder.field("eth_version", &self.eth_version);
        builder.finish()
    }
}
/// Decodes encrypted, MAC-authenticated frames from the inbound byte stream
/// into `rlpx::Message` values.
impl Decoder for RLPxCodec {
type Item = rlpx::Message;
type Error = PeerConnectionError;
/// Attempts to decode one complete frame from the front of `src`.
///
/// Wire layout consumed per frame:
/// header-ciphertext (16) || header-mac (16) || frame-ciphertext (padded to a
/// multiple of 16) || frame-mac (16).
///
/// Returns `Ok(None)` when `src` does not yet contain a full frame. In that
/// case no bytes are consumed and the ingress MAC/cipher state is left
/// untouched (only buffer capacity may be reserved), so the call can be
/// repeated once more data has arrived.
///
/// # Errors
///
/// * `InvalidMessageFrame` when the header MAC or the frame MAC does not match.
/// * `InvalidMessageLength` when the padded frame size exceeds `MAX_MESSAGE_SIZE`.
/// * `CryptographyError` for the (defensive) slice-length checks.
/// * `InternalError` when the `eth_version` lock cannot be read.
/// * Whatever the cipher, RLP and message decoders convert into
///   `PeerConnectionError` via `?`.
///
/// Once the frame bytes have been consumed from `src`, the ingress state has
/// already been advanced, so errors after that point do not roll it back.
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
// AES-256 block cipher keyed with the MAC secret; used only to derive MAC seeds.
let mac_aes_cipher = Aes256Enc::new_from_slice(&self.mac_key.0)?;
// The header (ciphertext + MAC) is 32 bytes; wait until all of it has arrived.
if src.len() < 32 {
return Ok(None);
}
// Work on a copy so `src` stays intact if the frame turns out to be incomplete.
let mut frame_header = [0; 32];
frame_header.copy_from_slice(&src[..32]);
// First 16 bytes: encrypted header. Last 16 bytes: header MAC.
let (header_ciphertext, header_mac) =
frame_header.split_at_mut_checked(16).ok_or_else(|| {
PeerConnectionError::CryptographyError("Invalid frame header length".to_owned())
})?;
// header-mac-seed = aes(mac-secret, 16-byte truncated digest(ingress-mac)) ^ header-ciphertext
let header_mac_seed = {
let mac_digest: [u8; 16] = truncate_array(self.ingress_mac.clone().finalize());
let mut seed = mac_digest.into();
mac_aes_cipher.encrypt_block(&mut seed);
(H128(seed.into())
^ H128(header_ciphertext.try_into().map_err(|_| {
PeerConnectionError::CryptographyError(
"Invalid header ciphertext length".to_owned(),
)
})?))
.0
};
// Update a clone of the ingress MAC: the real state must not change unless
// the whole frame is available.
let mut temp_ingress_mac = self.ingress_mac.clone();
temp_ingress_mac.update(header_mac_seed);
// The expected header MAC is the 16-byte truncated digest of the updated state.
let expected_header_mac = H128(truncate_array(temp_ingress_mac.clone().finalize()));
if header_mac != expected_header_mac.0 {
return Err(PeerConnectionError::InvalidMessageFrame(
"Mismatched header mac".to_string(),
));
}
// The header is decrypted in place, again on a cloned keystream for the same reason.
let header_text = header_ciphertext;
let mut temp_ingress_aes = self.ingress_aes.clone();
temp_ingress_aes.try_apply_keystream(header_text)?;
// NOTE(review): `header_text` is always 16 bytes here (split at 16 of a
// 32-byte array), so this check looks unreachable.
if header_text.len() < 3 {
return Err(PeerConnectionError::CryptographyError(
"Invalid header text length".to_owned(),
));
}
// The frame size is the 3-byte big-endian integer at the start of the header.
let frame_size = u32::from_be_bytes([0, header_text[0], header_text[1], header_text[2]]);
// The frame body is transmitted padded up to a multiple of 16 bytes.
let padded_size = frame_size.next_multiple_of(16);
// Reject oversized frames before buffering them.
if padded_size > MAX_MESSAGE_SIZE {
return Err(PeerConnectionError::InvalidMessageLength);
}
// header (32) + padded frame body + frame MAC (16)
let total_message_size = (32 + padded_size + 16) as usize;
if src.len() < total_message_size {
// Frame not complete yet: reserve room for the rest and ask for more bytes.
src.reserve(total_message_size - src.len());
return Ok(None);
}
// Copy out the frame body and its MAC, then consume the whole frame from `src`.
let mut frame_data = src
.get(32..total_message_size)
.ok_or_else(|| {
PeerConnectionError::CryptographyError("Invalid frame data length".to_owned())
})?
.to_vec();
src.advance(total_message_size);
// The frame has been consumed, so the header-phase state can now be committed.
// NOTE(review): `temp_ingress_mac` is not used after this line, so the
// `.clone()` looks redundant.
self.ingress_mac = temp_ingress_mac.clone();
self.ingress_aes = temp_ingress_aes;
let (frame_ciphertext, frame_mac) = frame_data
.split_at_mut_checked(padded_size as usize)
.ok_or_else(|| {
PeerConnectionError::CryptographyError("Invalid frame data length".to_owned())
})?;
// The frame MAC is computed over the ciphertext, before decryption.
self.ingress_mac.update(&frame_ciphertext);
// frame-mac-seed = aes(mac-secret, truncated digest(ingress-mac)) ^ truncated digest(ingress-mac)
let frame_mac_seed = {
let mac_digest: [u8; 16] = truncate_array(self.ingress_mac.clone().finalize());
let mut seed = mac_digest.into();
mac_aes_cipher.encrypt_block(&mut seed);
(H128(seed.into()) ^ H128(mac_digest)).0
};
self.ingress_mac.update(frame_mac_seed);
let expected_frame_mac: [u8; 16] = truncate_array(self.ingress_mac.clone().finalize());
if frame_mac != expected_frame_mac {
return Err(PeerConnectionError::InvalidMessageFrame(
"Mismatched frame mac".to_string(),
));
}
// Decrypt the frame body in place and strip the padding.
self.ingress_aes.try_apply_keystream(frame_ciphertext)?;
let (frame_data, _padding) = frame_ciphertext
.split_at_checked(frame_size as usize)
.ok_or_else(|| {
PeerConnectionError::CryptographyError("Invalid frame size".to_owned())
})?;
// The plaintext is the RLP-encoded message id followed by the message payload.
let (msg_id, msg_data): (u8, _) = RLPDecode::decode_unfinished(frame_data)?;
Ok(Some(rlpx::Message::decode(
msg_id,
msg_data,
*self
.eth_version
.read()
.map_err(|err| PeerConnectionError::InternalError(err.to_string()))?,
)?))
}
/// Wraps `io` in a `Framed` transport driven by this codec.
fn framed<S: AsyncRead + AsyncWrite + Sized>(self, io: S) -> Framed<S, Self>
where
Self: Sized,
{
Framed::new(io, self)
}
}
impl Encoder<rlpx::Message> for RLPxCodec {
type Error = PeerConnectionError;
fn encode(&mut self, message: rlpx::Message, buffer: &mut BytesMut) -> Result<(), Self::Error> {
let mut frame_data = vec![];
message.encode(
&mut frame_data,
*self
.eth_version
.read()
.map_err(|err| PeerConnectionError::InternalError(err.to_string()))?,
)?;
let mac_aes_cipher = Aes256Enc::new_from_slice(&self.mac_key.0)?;
let mut header = Vec::with_capacity(32);
let frame_size = frame_data.len().to_be_bytes();
header.extend_from_slice(frame_size.get(5..8).ok_or_else(|| {
PeerConnectionError::CryptographyError("Invalid frame size".to_owned())
})?);
let header_data = (0_u8, 0_u8);
header_data.encode(&mut header);
header.resize(16, 0);
self.egress_aes
.try_apply_keystream(header.get_mut(..16).ok_or_else(|| {
PeerConnectionError::CryptographyError("Invalid header length".to_owned())
})?)?;
let header_mac_seed = {
let mac_digest: [u8; 16] = truncate_array(self.egress_mac.clone().finalize());
let mut seed = mac_digest.into();
mac_aes_cipher.encrypt_block(&mut seed);
let header_data = header
.get(..16)
.ok_or_else(|| {
PeerConnectionError::CryptographyError("Invalid header length".to_owned())
})?
.try_into()
.map_err(|_| {
PeerConnectionError::CryptographyError("Invalid header length".to_owned())
})?;
H128(seed.into()) ^ H128(header_data)
};
self.egress_mac.update(header_mac_seed);
let header_mac = self.egress_mac.clone().finalize();
let header_mac_data: [u8; 16] = truncate_array(header_mac);
header.extend_from_slice(&header_mac_data);
buffer.extend_from_slice(&header);
frame_data.resize(frame_data.len().next_multiple_of(16), 0);
self.egress_aes.try_apply_keystream(&mut frame_data)?;
let frame_ciphertext = frame_data;
buffer.extend_from_slice(&frame_ciphertext);
self.egress_mac.update(&frame_ciphertext);
let frame_mac_seed = {
let mac_digest: [u8; 16] = truncate_array(self.egress_mac.clone().finalize());
let mut seed = mac_digest.into();
mac_aes_cipher.encrypt_block(&mut seed);
(H128(seed.into()) ^ H128(mac_digest)).0
};
self.egress_mac.update(frame_mac_seed);
let frame_mac = self.egress_mac.clone().finalize();
buffer.extend_from_slice(&frame_mac[..16]);
Ok(())
}
}