Skip to main content

xenith_core/
wire.rs

1use bytes::{BufMut, Bytes, BytesMut};
2
3use crate::{ChainId, KeyMetadata, Result, StateKey, StateValue, StateVersion, XenithError};
4
5/// Binary wire format for cross-chain state messages.
6///
7/// Layout (all multi-byte integers big-endian):
8/// ```text
9/// [0x58 0x45 0x4E 0x48]  magic ("XENH", 4 bytes)
10/// u8                     wire format version (2 or 3)
11/// u64                    source_chain
12/// u64                    version.timestamp_ms
13/// u64                    version.sequence
14/// u64                    version.source_chain
15/// u64                    updated_at
16/// u16                    key_len
17/// [u8; key_len]          key bytes (UTF-8)
18/// u32                    data_len
19/// [u8; data_len]         payload bytes
20/// --- version 3 only ---
21/// u8                     has_metadata (0 or 1)
22/// [u8; 20]               address  (only when has_metadata == 1)
23/// [u8; 32]               slot     (only when has_metadata == 1)
24/// ```
25///
26/// Version 2 messages decode with `metadata = None`. Version 3 messages carry
27/// optional on-chain metadata (contract address + storage slot) that enables
28/// [`crate::ReadStrategy::Quorum`] reads without a separate `set_metadata` call.
29///
30/// # Example
31///
32/// ```
33/// use bytes::Bytes;
34/// use xenith_core::{ChainId, KeyMetadata, StateKey, StateValue, StateVersion, wire};
35///
36/// let key = StateKey::new("uniswap", "pool", "0xabc");
37/// let value = StateValue {
38///     data: Bytes::from_static(b"price=3400"),
39///     version: StateVersion { timestamp_ms: 1_700_000_000_000, sequence: 0, source_chain: 1 },
40///     updated_at: 1_700_000_000,
41///     source_chain: ChainId::from(1),
42/// };
43/// let encoded = wire::encode(&key, &value, None);
44/// let (decoded_key, decoded_value, meta) = wire::decode(&encoded).unwrap();
45/// assert_eq!(decoded_key, key);
46/// assert_eq!(decoded_value.version.timestamp_ms, 1_700_000_000_000);
47/// assert!(meta.is_none());
48/// ```
49#[derive(Debug, Clone, PartialEq)]
50pub struct WireMessage {
51    pub version: u8,
52    pub source_chain: u64,
53    pub state_version: StateVersion,
54    pub updated_at: u64,
55    pub key: StateKey,
56    pub data: Bytes,
57}
58
59const MAGIC: [u8; 4] = [0x58, 0x45, 0x4E, 0x48];
60const WIRE_VERSION_V2: u8 = 2;
61const WIRE_VERSION_V3: u8 = 3;
62
63/// Encode a `(key, value, metadata)` triple into the xenith binary wire format.
64///
65/// Pass `metadata = Some(...)` if you intend to use [`crate::ReadStrategy::Quorum`]
66/// on the receiving side — the metadata travels with the message so the receiver
67/// can register it without a separate call.
68pub fn encode(key: &StateKey, value: &StateValue, metadata: Option<&KeyMetadata>) -> Bytes {
69    let key_bytes = key.as_ref().as_bytes();
70    // Base: 4 (magic) + 1 (version) + 8×5 (chain/version/updated_at)
71    //     + 2 (key_len) + key + 4 (data_len) + data
72    let mut capacity = 4 + 1 + 8 + 8 + 8 + 8 + 8 + 2 + key_bytes.len() + 4 + value.data.len();
73    if metadata.is_some() {
74        capacity += 1 + 20 + 32; // has_metadata(1) + address(20) + slot(32)
75    } else {
76        capacity += 1; // has_metadata flag (0)
77    }
78    let mut buf = BytesMut::with_capacity(capacity);
79
80    buf.put_slice(&MAGIC);
81    buf.put_u8(WIRE_VERSION_V3);
82    buf.put_u64(value.source_chain.0);
83    buf.put_u64(value.version.timestamp_ms);
84    buf.put_u64(value.version.sequence);
85    buf.put_u64(value.version.source_chain);
86    buf.put_u64(value.updated_at);
87    buf.put_u16(key_bytes.len() as u16);
88    buf.put_slice(key_bytes);
89    buf.put_u32(value.data.len() as u32);
90    buf.put_slice(&value.data);
91
92    match metadata {
93        Some(m) => {
94            buf.put_u8(1);
95            buf.put_slice(&m.address.unwrap_or([0u8; 20]));
96            buf.put_slice(&m.slot.unwrap_or([0u8; 32]));
97        }
98        None => buf.put_u8(0),
99    }
100
101    buf.freeze()
102}
103
104/// Decode a xenith wire message into `(StateKey, StateValue, Option<KeyMetadata>)`.
105///
106/// Accepts both version 2 (no metadata) and version 3 (with optional metadata).
107/// Version 2 messages always decode with `metadata = None`.
108///
109/// Returns [`XenithError::Serialization`] if the buffer is truncated, carries an
110/// unrecognised magic prefix, or uses an unsupported wire format version.
111pub fn decode(raw: &[u8]) -> Result<(StateKey, StateValue, Option<KeyMetadata>)> {
112    // Fixed header: magic(4) + version(1) + source_chain(8)
113    //   + ver.ts(8) + ver.seq(8) + ver.chain(8) + updated_at(8)
114    //   + key_len(2) + data_len(4) = 51 bytes minimum.
115    const MIN_HEADER: usize = 51;
116    if raw.len() < MIN_HEADER {
117        return Err(XenithError::Serialization(format!(
118            "message too short: {} bytes (minimum {MIN_HEADER})",
119            raw.len()
120        )));
121    }
122
123    let mut pos = 0;
124
125    if raw[pos..pos + 4] != MAGIC {
126        return Err(XenithError::Serialization(format!(
127            "invalid magic bytes: {:?}",
128            &raw[pos..pos + 4]
129        )));
130    }
131    pos += 4;
132
133    let version = raw[pos];
134    pos += 1;
135    if version != WIRE_VERSION_V2 && version != WIRE_VERSION_V3 {
136        return Err(XenithError::Serialization(format!(
137            "unsupported wire format version: {version}"
138        )));
139    }
140
141    let source_chain = u64::from_be_bytes(raw[pos..pos + 8].try_into().unwrap());
142    pos += 8;
143
144    let ver_ts = u64::from_be_bytes(raw[pos..pos + 8].try_into().unwrap());
145    pos += 8;
146    let ver_seq = u64::from_be_bytes(raw[pos..pos + 8].try_into().unwrap());
147    pos += 8;
148    let ver_chain = u64::from_be_bytes(raw[pos..pos + 8].try_into().unwrap());
149    pos += 8;
150
151    let updated_at = u64::from_be_bytes(raw[pos..pos + 8].try_into().unwrap());
152    pos += 8;
153
154    let key_len = u16::from_be_bytes(raw[pos..pos + 2].try_into().unwrap()) as usize;
155    pos += 2;
156
157    if raw.len() < pos + key_len + 4 {
158        return Err(XenithError::Serialization(
159            "message truncated before key or data_len".into(),
160        ));
161    }
162
163    let key_str = std::str::from_utf8(&raw[pos..pos + key_len])
164        .map_err(|e| XenithError::Serialization(format!("invalid UTF-8 in key: {e}")))?;
165    let key = StateKey::from_raw(key_str.to_owned());
166    pos += key_len;
167
168    let data_len = u32::from_be_bytes(raw[pos..pos + 4].try_into().unwrap()) as usize;
169    pos += 4;
170
171    if raw.len() < pos + data_len {
172        return Err(XenithError::Serialization(
173            "message truncated at data payload".into(),
174        ));
175    }
176
177    let data = Bytes::copy_from_slice(&raw[pos..pos + data_len]);
178    pos += data_len;
179
180    let value = StateValue {
181        data,
182        version: StateVersion {
183            timestamp_ms: ver_ts,
184            sequence: ver_seq,
185            source_chain: ver_chain,
186        },
187        updated_at,
188        source_chain: ChainId(source_chain),
189    };
190
191    // Version 2 has no metadata section.
192    if version == WIRE_VERSION_V2 {
193        return Ok((key, value, None));
194    }
195
196    // Version 3: read has_metadata flag.
197    if raw.len() < pos + 1 {
198        return Err(XenithError::Serialization(
199            "v3 message truncated before has_metadata flag".into(),
200        ));
201    }
202    let has_metadata = raw[pos];
203    pos += 1;
204
205    let metadata = if has_metadata == 1 {
206        if raw.len() < pos + 20 + 32 {
207            return Err(XenithError::Serialization(
208                "v3 message truncated in metadata section".into(),
209            ));
210        }
211        let mut address = [0u8; 20];
212        address.copy_from_slice(&raw[pos..pos + 20]);
213        pos += 20;
214        let mut slot = [0u8; 32];
215        slot.copy_from_slice(&raw[pos..pos + 32]);
216        Some(KeyMetadata {
217            address: Some(address),
218            slot: Some(slot),
219        })
220    } else {
221        None
222    };
223
224    Ok((key, value, metadata))
225}
226
227#[cfg(test)]
228mod tests {
229    use super::*;
230
231    fn sample_key() -> StateKey {
232        StateKey::new("uniswap", "pool", "0xabc")
233    }
234
235    fn sample_value() -> StateValue {
236        StateValue {
237            data: Bytes::from_static(b"price=3400"),
238            version: StateVersion {
239                timestamp_ms: 1_700_000_000_000,
240                sequence: 0,
241                source_chain: 1,
242            },
243            updated_at: 1_700_000_000,
244            source_chain: ChainId(1),
245        }
246    }
247
248    #[test]
249    fn encode_decode_roundtrip() {
250        let key = sample_key();
251        let value = sample_value();
252        let encoded = encode(&key, &value, None);
253        let (decoded_key, decoded_value, meta) = decode(&encoded).unwrap();
254        assert_eq!(decoded_key, key);
255        assert_eq!(decoded_value, value);
256        assert!(meta.is_none());
257    }
258
259    #[test]
260    fn encode_with_metadata_roundtrips() {
261        let key = sample_key();
262        let value = sample_value();
263        let metadata = KeyMetadata {
264            address: Some([0xABu8; 20]),
265            slot: Some([0xCDu8; 32]),
266        };
267        let encoded = encode(&key, &value, Some(&metadata));
268        let (decoded_key, decoded_value, decoded_meta) = decode(&encoded).unwrap();
269        assert_eq!(decoded_key, key);
270        assert_eq!(decoded_value, value);
271        let m = decoded_meta.expect("metadata must be present");
272        assert_eq!(m.address, Some([0xABu8; 20]));
273        assert_eq!(m.slot, Some([0xCDu8; 32]));
274    }
275
276    #[test]
277    fn encode_without_metadata_decodes_to_none() {
278        let encoded = encode(&sample_key(), &sample_value(), None);
279        let (_, _, meta) = decode(&encoded).unwrap();
280        assert!(meta.is_none());
281    }
282
283    #[test]
284    fn version2_message_decodes_without_metadata() {
285        // Build a v2 message by hand (no metadata section).
286        let key = sample_key();
287        let value = sample_value();
288        let key_bytes = key.as_ref().as_bytes();
289        let mut buf = BytesMut::new();
290        buf.put_slice(&MAGIC);
291        buf.put_u8(WIRE_VERSION_V2);
292        buf.put_u64(value.source_chain.0);
293        buf.put_u64(value.version.timestamp_ms);
294        buf.put_u64(value.version.sequence);
295        buf.put_u64(value.version.source_chain);
296        buf.put_u64(value.updated_at);
297        buf.put_u16(key_bytes.len() as u16);
298        buf.put_slice(key_bytes);
299        buf.put_u32(value.data.len() as u32);
300        buf.put_slice(&value.data);
301        let raw = buf.freeze();
302
303        let (decoded_key, decoded_value, meta) = decode(&raw).unwrap();
304        assert_eq!(decoded_key, key);
305        assert_eq!(decoded_value, value);
306        assert!(meta.is_none(), "v2 messages decode with metadata = None");
307    }
308
309    #[test]
310    fn bad_magic_returns_error() {
311        let mut raw = encode(&sample_key(), &sample_value(), None).to_vec();
312        raw[0] = 0xFF;
313        assert!(matches!(decode(&raw), Err(XenithError::Serialization(_))));
314    }
315
316    #[test]
317    fn truncated_message_returns_error() {
318        let raw = encode(&sample_key(), &sample_value(), None);
319        assert!(matches!(
320            decode(&raw[..10]),
321            Err(XenithError::Serialization(_))
322        ));
323    }
324}