use super::wire_msg_header::WireMsgHeader;
use crate::messaging::{
data::{ClientMsg, DataResponse},
system::NodeMsg,
AntiEntropyMsg, AuthorityProof, Dst, Error, MsgId, MsgKind, NetworkMsg, Result,
};
use bytes::{BufMut, Bytes, BytesMut};
use custom_debug::Debug;
use serde::Serialize;
#[derive(Clone, Debug)]
pub struct WireMsg {
pub header: WireMsgHeader,
#[debug(skip)]
pub serialized_header: Option<Bytes>,
#[debug(skip)]
pub payload: Bytes,
pub dst: Dst,
#[debug(skip)]
pub serialized_dst: Option<Bytes>,
}
impl PartialEq for WireMsg {
fn eq(&self, other: &Self) -> bool {
self.header == other.header && self.payload == other.payload
}
}
impl WireMsg {
pub fn serialize_msg_payload<T: Serialize>(msg: &T) -> Result<Bytes> {
let mut bytes = BytesMut::new().writer();
rmp_serde::encode::write(&mut bytes, &msg).map_err(|err| {
Error::Serialisation(format!(
"could not serialize message payload with Msgpack: {err}",
))
})?;
Ok(bytes.into_inner().freeze())
}
fn serialize_dst_payload(dst: &Dst) -> Result<Bytes> {
let mut bytes = BytesMut::new().writer();
rmp_serde::encode::write(&mut bytes, dst).map_err(|err| {
Error::Serialisation(format!(
"could not serialize dst payload with Msgpack: {err}",
))
})?;
Ok(bytes.into_inner().freeze())
}
pub fn serialize_msg_dst(&self) -> Result<Bytes> {
Self::serialize_dst_payload(&self.dst)
}
pub fn new_msg(msg_id: MsgId, payload: Bytes, kind: MsgKind, dst: Dst) -> Self {
Self {
header: WireMsgHeader::new(msg_id, kind),
dst,
payload,
serialized_dst: None,
serialized_header: None,
}
}
pub fn from(bytes: (Bytes, Bytes, Bytes)) -> Result<Self> {
let (header_bytes, dst_bytes, payload) = bytes;
let header = WireMsgHeader::from(header_bytes.clone())?;
let dst: Dst = rmp_serde::from_slice(&dst_bytes).map_err(|err| {
Error::FailedToParse(format!(
"Message dst couldn't be deserialized from the dst bytes: {err}",
))
})?;
Ok(Self {
header,
dst,
payload,
serialized_dst: Some(dst_bytes),
serialized_header: Some(header_bytes),
})
}
pub fn serialize(&self) -> Result<(Bytes, Bytes, Bytes)> {
let header = if let Some(bytes) = &self.serialized_header {
bytes.clone()
} else {
self.header.serialize()?
};
let dst = if let Some(bytes) = &self.serialized_dst {
bytes.clone()
} else {
self.serialize_msg_dst()?
};
Ok((header, dst, self.payload.clone()))
}
pub fn serialize_and_cache_bytes(&mut self) -> Result<(Bytes, Bytes, Bytes)> {
let header = if let Some(hdr_bytes) = &self.serialized_header {
hdr_bytes.clone()
} else {
let hdr_bytes = self.header.serialize()?;
self.serialized_header = Some(hdr_bytes.clone());
hdr_bytes
};
let dst = if let Some(dst_bytes) = &self.serialized_dst {
dst_bytes.clone()
} else {
let dst_bytes = self.serialize_msg_dst()?;
self.serialized_dst = Some(dst_bytes.clone());
dst_bytes
};
Ok((header, dst, self.payload.clone()))
}
pub fn serialize_with_new_dst(&self, dst: &Dst) -> Result<(Bytes, Bytes, Bytes)> {
let header = if let Some(bytes) = &self.serialized_header {
bytes.clone()
} else {
self.header.serialize()?
};
let dst = Self::serialize_dst_payload(dst)?;
Ok((header, dst, self.payload.clone()))
}
pub fn into_msg(&self) -> Result<NetworkMsg> {
match self.header.msg_envelope.kind.clone() {
MsgKind::AntiEntropy(_) => {
let msg: AntiEntropyMsg = rmp_serde::from_slice(&self.payload).map_err(|err| {
Error::FailedToParse(format!("Ae message payload as Msgpack: {err}"))
})?;
Ok(NetworkMsg::AntiEntropy(msg))
}
MsgKind::Client { auth, .. } => {
let msg: ClientMsg = rmp_serde::from_slice(&self.payload).map_err(|err| {
Error::FailedToParse(format!("Data message payload as Msgpack: {err}"))
})?;
let auth = AuthorityProof::verify(auth, &self.payload)?;
Ok(NetworkMsg::Client { auth, msg })
}
MsgKind::DataResponse(_) => {
let msg: DataResponse = rmp_serde::from_slice(&self.payload).map_err(|err| {
Error::FailedToParse(format!("Data message payload as Msgpack: {err}"))
})?;
Ok(NetworkMsg::DataResponse(msg))
}
MsgKind::Node { .. } => {
let msg: NodeMsg = rmp_serde::from_slice(&self.payload).map_err(|err| {
Error::FailedToParse(format!("Node signed message payload as Msgpack: {err}"))
})?;
Ok(NetworkMsg::Node(msg))
}
}
}
pub fn msg_id(&self) -> MsgId {
self.header.msg_envelope.msg_id
}
pub fn kind(&self) -> &MsgKind {
&self.header.msg_envelope.kind
}
pub fn dst(&self) -> &Dst {
&self.dst
}
pub fn deserialize(bytes: (Bytes, Bytes, Bytes)) -> Result<(MsgId, NetworkMsg)> {
let msg = Self::from(bytes)?;
Ok((msg.msg_id(), msg.into_msg()?))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
messaging::{
data::{ClientMsg, DataQuery},
system::NodeMsg,
AuthorityProof, ClientAuth, MsgId,
},
types::{ChunkAddress, Keypair},
};
use bls::SecretKey;
use eyre::Result;
#[test]
fn serialisation_node_msg() -> Result<()> {
let dst = Dst {
name: xor_name::rand::random(),
section_key: SecretKey::random().public_key(),
};
let msg_id = MsgId::new();
let msg = NodeMsg::HandoverAE(100);
let payload = WireMsg::serialize_msg_payload(&msg)?;
let kind = MsgKind::Node {
name: Default::default(),
is_join: true,
};
let wire_msg = WireMsg::new_msg(msg_id, payload, kind, dst);
let serialized = wire_msg.serialize()?;
let deserialized = WireMsg::from(serialized)?;
assert_eq!(deserialized, wire_msg);
assert_eq!(deserialized.msg_id(), wire_msg.msg_id());
assert_eq!(deserialized.dst(), &dst);
assert_eq!(deserialized.dst().section_key, dst.section_key);
assert_eq!(deserialized.into_msg()?, NetworkMsg::Node(msg),);
Ok(())
}
#[test]
fn serialisation_client_msg() -> Result<()> {
let src_client_keypair = Keypair::new_ed25519();
let dst = Dst {
name: xor_name::rand::random(),
section_key: SecretKey::random().public_key(),
};
let msg_id = MsgId::new();
let client_msg =
ClientMsg::Query(DataQuery::GetChunk(ChunkAddress(xor_name::rand::random())));
let payload = WireMsg::serialize_msg_payload(&client_msg)?;
let auth = ClientAuth {
public_key: src_client_keypair.public_key(),
signature: src_client_keypair.sign(&payload),
};
let auth_proof = AuthorityProof::verify(auth.clone(), &payload)?;
let kind = MsgKind::Client {
auth,
is_spend: false,
query_index: None,
};
let wire_msg = WireMsg::new_msg(msg_id, payload, kind, dst);
let serialized = wire_msg.serialize()?;
let deserialized = WireMsg::from(serialized)?;
assert_eq!(deserialized, wire_msg);
assert_eq!(deserialized.msg_id(), wire_msg.msg_id());
assert_eq!(deserialized.dst(), &dst);
assert_eq!(deserialized.dst().section_key, dst.section_key);
assert_eq!(
deserialized.into_msg()?,
NetworkMsg::Client {
auth: auth_proof,
msg: client_msg,
}
);
Ok(())
}
}