use bytes::{BufMut, Bytes, BytesMut};
use crate::{ChainId, KeyMetadata, Result, StateKey, StateValue, StateVersion, XenithError};
/// Flat, owned record with one field per piece of information carried by a wire frame.
///
/// NOTE(review): nothing in the visible part of this file constructs or consumes this
/// type (`encode`/`decode` work on `StateKey`/`StateValue` directly), so the per-field
/// notes below are inferred from the field names and types — confirm against callers.
#[derive(Clone, Debug, PartialEq)]
pub struct WireMessage {
    /// Wire format version byte (presumably one of the `WIRE_VERSION_*` values — confirm).
    pub version: u8,
    /// Raw numeric identifier of the chain the value came from (assumed — confirm).
    pub source_chain: u64,
    /// Version stamp attached to the value.
    pub state_version: StateVersion,
    /// Last-update timestamp; the unit is not established by this file — confirm.
    pub updated_at: u64,
    /// Key the payload belongs to.
    pub key: StateKey,
    /// Opaque payload bytes.
    pub data: Bytes,
}
/// Four-byte frame prefix: the ASCII bytes `X`, `E`, `N`, `H` (0x58 0x45 0x4E 0x48).
const MAGIC: [u8; 4] = *b"XENH";

/// Wire version whose frames end right after the data payload (no metadata section).
const WIRE_VERSION_V2: u8 = 2;

/// Wire version whose frames carry a `has_metadata` flag byte after the data payload,
/// followed by a 20-byte address and a 32-byte slot when that flag is 1.
const WIRE_VERSION_V3: u8 = 3;
pub fn encode(key: &StateKey, value: &StateValue, metadata: Option<&KeyMetadata>) -> Bytes {
let key_bytes = key.as_ref().as_bytes();
let mut capacity = 4 + 1 + 8 + 8 + 8 + 8 + 8 + 2 + key_bytes.len() + 4 + value.data.len();
if metadata.is_some() {
capacity += 1 + 20 + 32; } else {
capacity += 1; }
let mut buf = BytesMut::with_capacity(capacity);
buf.put_slice(&MAGIC);
buf.put_u8(WIRE_VERSION_V3);
buf.put_u64(value.source_chain.0);
buf.put_u64(value.version.timestamp_ms);
buf.put_u64(value.version.sequence);
buf.put_u64(value.version.source_chain);
buf.put_u64(value.updated_at);
buf.put_u16(key_bytes.len() as u16);
buf.put_slice(key_bytes);
buf.put_u32(value.data.len() as u32);
buf.put_slice(&value.data);
match metadata {
Some(m) => {
buf.put_u8(1);
buf.put_slice(&m.address.unwrap_or([0u8; 20]));
buf.put_slice(&m.slot.unwrap_or([0u8; 32]));
}
None => buf.put_u8(0),
}
buf.freeze()
}
/// Parses a version-2 or version-3 wire frame (see [`encode`] for the v3 layout).
///
/// Returns the decoded key, value and, for v3 frames whose `has_metadata` flag is 1,
/// the key metadata. Notes on accepted input:
///
/// * v2 frames end after the data payload and always yield `None` metadata.
/// * In v3 frames any flag value other than 1 is treated as "no metadata".
/// * When metadata is present both `address` and `slot` are returned as `Some`,
///   including when their bytes are all zero.
/// * Bytes following the last parsed field are ignored.
///
/// # Errors
///
/// Returns `XenithError::Serialization` when the input is shorter than the fixed
/// 51-byte prefix, the magic bytes or version are wrong, the key is not valid UTF-8,
/// or the frame ends before a field its length prefixes promise.
pub fn decode(raw: &[u8]) -> Result<(StateKey, StateValue, Option<KeyMetadata>)> {
    // magic (4) + version (1) + five u64 fields (40) + key_len (2) + data_len (4).
    const MIN_HEADER: usize = 51;
    const ADDRESS_LEN: usize = 20;
    const SLOT_LEN: usize = 32;

    /// Reads the big-endian `u64` at `*pos` and advances `*pos` past it.
    /// The caller must already have verified that 8 bytes are available.
    fn take_u64(raw: &[u8], pos: &mut usize) -> u64 {
        let mut word = [0u8; 8];
        word.copy_from_slice(&raw[*pos..*pos + 8]);
        *pos += 8;
        u64::from_be_bytes(word)
    }

    if raw.len() < MIN_HEADER {
        return Err(XenithError::Serialization(format!(
            "message too short: {} bytes (minimum {MIN_HEADER})",
            raw.len()
        )));
    }

    let mut pos = 0;
    if raw[pos..pos + 4] != MAGIC {
        return Err(XenithError::Serialization(format!(
            "invalid magic bytes: {:?}",
            &raw[pos..pos + 4]
        )));
    }
    pos += 4;

    let version = raw[pos];
    pos += 1;
    if version != WIRE_VERSION_V2 && version != WIRE_VERSION_V3 {
        return Err(XenithError::Serialization(format!(
            "unsupported wire format version: {version}"
        )));
    }

    // Fixed-width fields; their presence is guaranteed by the MIN_HEADER check.
    let source_chain = take_u64(raw, &mut pos);
    let ver_ts = take_u64(raw, &mut pos);
    let ver_seq = take_u64(raw, &mut pos);
    let ver_chain = take_u64(raw, &mut pos);
    let updated_at = take_u64(raw, &mut pos);

    let key_len = usize::from(u16::from_be_bytes([raw[pos], raw[pos + 1]]));
    pos += 2;

    // All bounds checks below compare against the *remaining* byte count instead of
    // computing `pos + len`: the lengths are attacker-controlled and `pos + data_len`
    // can overflow `usize` on 32-bit targets. `pos <= raw.len()` holds at every check,
    // so the subtraction cannot underflow.
    if raw.len() - pos < key_len + 4 {
        return Err(XenithError::Serialization(
            "message truncated before key or data_len".into(),
        ));
    }
    let key_str = std::str::from_utf8(&raw[pos..pos + key_len])
        .map_err(|e| XenithError::Serialization(format!("invalid UTF-8 in key: {e}")))?;
    let key = StateKey::from_raw(key_str.to_owned());
    pos += key_len;

    // u32 -> usize is lossless on 32- and 64-bit targets.
    let data_len =
        u32::from_be_bytes([raw[pos], raw[pos + 1], raw[pos + 2], raw[pos + 3]]) as usize;
    pos += 4;
    if raw.len() - pos < data_len {
        return Err(XenithError::Serialization(
            "message truncated at data payload".into(),
        ));
    }
    let data = Bytes::copy_from_slice(&raw[pos..pos + data_len]);
    pos += data_len;

    let value = StateValue {
        data,
        version: StateVersion {
            timestamp_ms: ver_ts,
            sequence: ver_seq,
            source_chain: ver_chain,
        },
        updated_at,
        source_chain: ChainId(source_chain),
    };

    // v2 frames stop here; the metadata section only exists from v3 on.
    if version == WIRE_VERSION_V2 {
        return Ok((key, value, None));
    }

    if raw.len() - pos < 1 {
        return Err(XenithError::Serialization(
            "v3 message truncated before has_metadata flag".into(),
        ));
    }
    let has_metadata = raw[pos];
    pos += 1;

    // Only the exact flag value 1 announces a metadata section.
    if has_metadata != 1 {
        return Ok((key, value, None));
    }

    if raw.len() - pos < ADDRESS_LEN + SLOT_LEN {
        return Err(XenithError::Serialization(
            "v3 message truncated in metadata section".into(),
        ));
    }
    let mut address = [0u8; ADDRESS_LEN];
    address.copy_from_slice(&raw[pos..pos + ADDRESS_LEN]);
    pos += ADDRESS_LEN;
    let mut slot = [0u8; SLOT_LEN];
    slot.copy_from_slice(&raw[pos..pos + SLOT_LEN]);

    Ok((
        key,
        value,
        Some(KeyMetadata {
            address: Some(address),
            slot: Some(slot),
        }),
    ))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Key fixture shared by all tests.
    fn sample_key() -> StateKey {
        StateKey::new("uniswap", "pool", "0xabc")
    }

    /// Value fixture shared by all tests.
    fn sample_value() -> StateValue {
        let version = StateVersion {
            timestamp_ms: 1_700_000_000_000,
            sequence: 0,
            source_chain: 1,
        };
        StateValue {
            data: Bytes::from_static(b"price=3400"),
            version,
            updated_at: 1_700_000_000,
            source_chain: ChainId(1),
        }
    }

    /// Hand-assembles a version-2 frame: the v3 layout without the trailing
    /// metadata section.
    fn v2_frame(key: &StateKey, value: &StateValue) -> Bytes {
        let key_bytes = key.as_ref().as_bytes();
        let mut frame = BytesMut::new();
        frame.put_slice(&MAGIC);
        frame.put_u8(WIRE_VERSION_V2);
        frame.put_u64(value.source_chain.0);
        frame.put_u64(value.version.timestamp_ms);
        frame.put_u64(value.version.sequence);
        frame.put_u64(value.version.source_chain);
        frame.put_u64(value.updated_at);
        frame.put_u16(key_bytes.len() as u16);
        frame.put_slice(key_bytes);
        frame.put_u32(value.data.len() as u32);
        frame.put_slice(&value.data);
        frame.freeze()
    }

    #[test]
    fn encode_decode_roundtrip() {
        let (key, value) = (sample_key(), sample_value());

        let frame = encode(&key, &value, None);
        let (got_key, got_value, got_meta) = decode(&frame).unwrap();

        assert_eq!(got_key, key);
        assert_eq!(got_value, value);
        assert!(got_meta.is_none());
    }

    #[test]
    fn encode_with_metadata_roundtrips() {
        let (key, value) = (sample_key(), sample_value());
        let metadata = KeyMetadata {
            address: Some([0xABu8; 20]),
            slot: Some([0xCDu8; 32]),
        };

        let frame = encode(&key, &value, Some(&metadata));
        let (got_key, got_value, got_meta) = decode(&frame).unwrap();

        assert_eq!(got_key, key);
        assert_eq!(got_value, value);
        let got_meta = got_meta.expect("metadata must be present");
        assert_eq!(got_meta.address, Some([0xABu8; 20]));
        assert_eq!(got_meta.slot, Some([0xCDu8; 32]));
    }

    #[test]
    fn encode_without_metadata_decodes_to_none() {
        let frame = encode(&sample_key(), &sample_value(), None);
        let (_, _, got_meta) = decode(&frame).unwrap();
        assert!(got_meta.is_none());
    }

    #[test]
    fn version2_message_decodes_without_metadata() {
        let (key, value) = (sample_key(), sample_value());

        let frame = v2_frame(&key, &value);
        let (got_key, got_value, got_meta) = decode(&frame).unwrap();

        assert_eq!(got_key, key);
        assert_eq!(got_value, value);
        assert!(got_meta.is_none(), "v2 messages decode with metadata = None");
    }

    #[test]
    fn bad_magic_returns_error() {
        let mut frame = encode(&sample_key(), &sample_value(), None).to_vec();
        // Corrupt the first magic byte.
        frame[0] = 0xFF;
        let outcome = decode(&frame);
        assert!(matches!(outcome, Err(XenithError::Serialization(_))));
    }

    #[test]
    fn truncated_message_returns_error() {
        let frame = encode(&sample_key(), &sample_value(), None);
        // Ten bytes is well short of the fixed-size prefix.
        let head = &frame[..10];
        assert!(matches!(decode(head), Err(XenithError::Serialization(_))));
    }
}