xenith-core 0.1.0

Transport-agnostic traits, types, and errors for xenith cross-chain state sync
Documentation
use bytes::{BufMut, Bytes, BytesMut};

use crate::{ChainId, KeyMetadata, Result, StateKey, StateValue, StateVersion, XenithError};

/// Binary wire format for cross-chain state messages.
///
/// Layout (all multi-byte integers big-endian):
/// ```text
/// [0x58 0x45 0x4E 0x48]  magic ("XENH", 4 bytes)
/// u8                     wire format version (2 or 3)
/// u64                    source_chain
/// u64                    version.timestamp_ms
/// u64                    version.sequence
/// u64                    version.source_chain
/// u64                    updated_at
/// u16                    key_len
/// [u8; key_len]          key bytes (UTF-8)
/// u32                    data_len
/// [u8; data_len]         payload bytes
/// --- version 3 only ---
/// u8                     has_metadata (0 or 1)
/// [u8; 20]               address  (only when has_metadata == 1)
/// [u8; 32]               slot     (only when has_metadata == 1)
/// ```
///
/// Version 2 messages decode with `metadata = None`. Version 3 messages carry
/// optional on-chain metadata (contract address + storage slot) that enables
/// [`crate::ReadStrategy::Quorum`] reads without a separate `set_metadata` call.
///
/// # Example
///
/// ```
/// use bytes::Bytes;
/// use xenith_core::{ChainId, KeyMetadata, StateKey, StateValue, StateVersion, wire};
///
/// let key = StateKey::new("uniswap", "pool", "0xabc");
/// let value = StateValue {
///     data: Bytes::from_static(b"price=3400"),
///     version: StateVersion { timestamp_ms: 1_700_000_000_000, sequence: 0, source_chain: 1 },
///     updated_at: 1_700_000_000,
///     source_chain: ChainId::from(1),
/// };
/// let encoded = wire::encode(&key, &value, None);
/// let (decoded_key, decoded_value, meta) = wire::decode(&encoded).unwrap();
/// assert_eq!(decoded_key, key);
/// assert_eq!(decoded_value.version.timestamp_ms, 1_700_000_000_000);
/// assert!(meta.is_none());
/// ```
#[derive(Debug, Clone, PartialEq)]
pub struct WireMessage {
    pub version: u8,
    pub source_chain: u64,
    pub state_version: StateVersion,
    pub updated_at: u64,
    pub key: StateKey,
    pub data: Bytes,
}

const MAGIC: [u8; 4] = [0x58, 0x45, 0x4E, 0x48];
const WIRE_VERSION_V2: u8 = 2;
const WIRE_VERSION_V3: u8 = 3;

/// Encode a `(key, value, metadata)` triple into the xenith binary wire format.
///
/// Pass `metadata = Some(...)` if you intend to use [`crate::ReadStrategy::Quorum`]
/// on the receiving side — the metadata travels with the message so the receiver
/// can register it without a separate call.
pub fn encode(key: &StateKey, value: &StateValue, metadata: Option<&KeyMetadata>) -> Bytes {
    let key_bytes = key.as_ref().as_bytes();
    // Base: 4 (magic) + 1 (version) + 8×5 (chain/version/updated_at)
    //     + 2 (key_len) + key + 4 (data_len) + data
    let mut capacity = 4 + 1 + 8 + 8 + 8 + 8 + 8 + 2 + key_bytes.len() + 4 + value.data.len();
    if metadata.is_some() {
        capacity += 1 + 20 + 32; // has_metadata(1) + address(20) + slot(32)
    } else {
        capacity += 1; // has_metadata flag (0)
    }
    let mut buf = BytesMut::with_capacity(capacity);

    buf.put_slice(&MAGIC);
    buf.put_u8(WIRE_VERSION_V3);
    buf.put_u64(value.source_chain.0);
    buf.put_u64(value.version.timestamp_ms);
    buf.put_u64(value.version.sequence);
    buf.put_u64(value.version.source_chain);
    buf.put_u64(value.updated_at);
    buf.put_u16(key_bytes.len() as u16);
    buf.put_slice(key_bytes);
    buf.put_u32(value.data.len() as u32);
    buf.put_slice(&value.data);

    match metadata {
        Some(m) => {
            buf.put_u8(1);
            buf.put_slice(&m.address.unwrap_or([0u8; 20]));
            buf.put_slice(&m.slot.unwrap_or([0u8; 32]));
        }
        None => buf.put_u8(0),
    }

    buf.freeze()
}

/// Decode a xenith wire message into `(StateKey, StateValue, Option<KeyMetadata>)`.
///
/// Accepts both version 2 (no metadata) and version 3 (with optional metadata).
/// Version 2 messages always decode with `metadata = None`.
///
/// Returns [`XenithError::Serialization`] if the buffer is truncated, carries an
/// unrecognised magic prefix, or uses an unsupported wire format version.
pub fn decode(raw: &[u8]) -> Result<(StateKey, StateValue, Option<KeyMetadata>)> {
    // Fixed header: magic(4) + version(1) + source_chain(8)
    //   + ver.ts(8) + ver.seq(8) + ver.chain(8) + updated_at(8)
    //   + key_len(2) + data_len(4) = 51 bytes minimum.
    const MIN_HEADER: usize = 51;
    if raw.len() < MIN_HEADER {
        return Err(XenithError::Serialization(format!(
            "message too short: {} bytes (minimum {MIN_HEADER})",
            raw.len()
        )));
    }

    let mut pos = 0;

    if raw[pos..pos + 4] != MAGIC {
        return Err(XenithError::Serialization(format!(
            "invalid magic bytes: {:?}",
            &raw[pos..pos + 4]
        )));
    }
    pos += 4;

    let version = raw[pos];
    pos += 1;
    if version != WIRE_VERSION_V2 && version != WIRE_VERSION_V3 {
        return Err(XenithError::Serialization(format!(
            "unsupported wire format version: {version}"
        )));
    }

    let source_chain = u64::from_be_bytes(raw[pos..pos + 8].try_into().unwrap());
    pos += 8;

    let ver_ts = u64::from_be_bytes(raw[pos..pos + 8].try_into().unwrap());
    pos += 8;
    let ver_seq = u64::from_be_bytes(raw[pos..pos + 8].try_into().unwrap());
    pos += 8;
    let ver_chain = u64::from_be_bytes(raw[pos..pos + 8].try_into().unwrap());
    pos += 8;

    let updated_at = u64::from_be_bytes(raw[pos..pos + 8].try_into().unwrap());
    pos += 8;

    let key_len = u16::from_be_bytes(raw[pos..pos + 2].try_into().unwrap()) as usize;
    pos += 2;

    if raw.len() < pos + key_len + 4 {
        return Err(XenithError::Serialization(
            "message truncated before key or data_len".into(),
        ));
    }

    let key_str = std::str::from_utf8(&raw[pos..pos + key_len])
        .map_err(|e| XenithError::Serialization(format!("invalid UTF-8 in key: {e}")))?;
    let key = StateKey::from_raw(key_str.to_owned());
    pos += key_len;

    let data_len = u32::from_be_bytes(raw[pos..pos + 4].try_into().unwrap()) as usize;
    pos += 4;

    if raw.len() < pos + data_len {
        return Err(XenithError::Serialization(
            "message truncated at data payload".into(),
        ));
    }

    let data = Bytes::copy_from_slice(&raw[pos..pos + data_len]);
    pos += data_len;

    let value = StateValue {
        data,
        version: StateVersion {
            timestamp_ms: ver_ts,
            sequence: ver_seq,
            source_chain: ver_chain,
        },
        updated_at,
        source_chain: ChainId(source_chain),
    };

    // Version 2 has no metadata section.
    if version == WIRE_VERSION_V2 {
        return Ok((key, value, None));
    }

    // Version 3: read has_metadata flag.
    if raw.len() < pos + 1 {
        return Err(XenithError::Serialization(
            "v3 message truncated before has_metadata flag".into(),
        ));
    }
    let has_metadata = raw[pos];
    pos += 1;

    let metadata = if has_metadata == 1 {
        if raw.len() < pos + 20 + 32 {
            return Err(XenithError::Serialization(
                "v3 message truncated in metadata section".into(),
            ));
        }
        let mut address = [0u8; 20];
        address.copy_from_slice(&raw[pos..pos + 20]);
        pos += 20;
        let mut slot = [0u8; 32];
        slot.copy_from_slice(&raw[pos..pos + 32]);
        Some(KeyMetadata {
            address: Some(address),
            slot: Some(slot),
        })
    } else {
        None
    };

    Ok((key, value, metadata))
}

#[cfg(test)]
mod tests {
    use super::*;

    fn sample_key() -> StateKey {
        StateKey::new("uniswap", "pool", "0xabc")
    }

    fn sample_value() -> StateValue {
        StateValue {
            data: Bytes::from_static(b"price=3400"),
            version: StateVersion {
                timestamp_ms: 1_700_000_000_000,
                sequence: 0,
                source_chain: 1,
            },
            updated_at: 1_700_000_000,
            source_chain: ChainId(1),
        }
    }

    #[test]
    fn encode_decode_roundtrip() {
        let key = sample_key();
        let value = sample_value();
        let encoded = encode(&key, &value, None);
        let (decoded_key, decoded_value, meta) = decode(&encoded).unwrap();
        assert_eq!(decoded_key, key);
        assert_eq!(decoded_value, value);
        assert!(meta.is_none());
    }

    #[test]
    fn encode_with_metadata_roundtrips() {
        let key = sample_key();
        let value = sample_value();
        let metadata = KeyMetadata {
            address: Some([0xABu8; 20]),
            slot: Some([0xCDu8; 32]),
        };
        let encoded = encode(&key, &value, Some(&metadata));
        let (decoded_key, decoded_value, decoded_meta) = decode(&encoded).unwrap();
        assert_eq!(decoded_key, key);
        assert_eq!(decoded_value, value);
        let m = decoded_meta.expect("metadata must be present");
        assert_eq!(m.address, Some([0xABu8; 20]));
        assert_eq!(m.slot, Some([0xCDu8; 32]));
    }

    #[test]
    fn encode_without_metadata_decodes_to_none() {
        let encoded = encode(&sample_key(), &sample_value(), None);
        let (_, _, meta) = decode(&encoded).unwrap();
        assert!(meta.is_none());
    }

    #[test]
    fn version2_message_decodes_without_metadata() {
        // Build a v2 message by hand (no metadata section).
        let key = sample_key();
        let value = sample_value();
        let key_bytes = key.as_ref().as_bytes();
        let mut buf = BytesMut::new();
        buf.put_slice(&MAGIC);
        buf.put_u8(WIRE_VERSION_V2);
        buf.put_u64(value.source_chain.0);
        buf.put_u64(value.version.timestamp_ms);
        buf.put_u64(value.version.sequence);
        buf.put_u64(value.version.source_chain);
        buf.put_u64(value.updated_at);
        buf.put_u16(key_bytes.len() as u16);
        buf.put_slice(key_bytes);
        buf.put_u32(value.data.len() as u32);
        buf.put_slice(&value.data);
        let raw = buf.freeze();

        let (decoded_key, decoded_value, meta) = decode(&raw).unwrap();
        assert_eq!(decoded_key, key);
        assert_eq!(decoded_value, value);
        assert!(meta.is_none(), "v2 messages decode with metadata = None");
    }

    #[test]
    fn bad_magic_returns_error() {
        let mut raw = encode(&sample_key(), &sample_value(), None).to_vec();
        raw[0] = 0xFF;
        assert!(matches!(decode(&raw), Err(XenithError::Serialization(_))));
    }

    #[test]
    fn truncated_message_returns_error() {
        let raw = encode(&sample_key(), &sample_value(), None);
        assert!(matches!(
            decode(&raw[..10]),
            Err(XenithError::Serialization(_))
        ));
    }
}