1use bytes::{BufMut, Bytes, BytesMut};
2
3use crate::{ChainId, KeyMetadata, Result, StateKey, StateValue, StateVersion, XenithError};
4
/// Decoded view of a single wire frame's header fields plus its payload.
///
/// NOTE(review): nothing in this part of the file constructs or reads this
/// struct; the field descriptions below are inferred from the matching fields
/// written by `encode` — confirm against the actual users.
#[derive(Debug, Clone, PartialEq)]
pub struct WireMessage {
    /// Wire format version byte (presumably one of the `WIRE_VERSION_*` values).
    pub version: u8,
    /// Raw numeric id of the chain the value originated from.
    pub source_chain: u64,
    /// Version stamp attached to the value.
    pub state_version: StateVersion,
    /// Last-update timestamp; unit is not established here — TODO confirm.
    pub updated_at: u64,
    /// Key the value is stored under.
    pub key: StateKey,
    /// Opaque value payload.
    pub data: Bytes,
}
58
/// Four-byte prefix that opens every frame; the bytes are ASCII `"XENH"`.
const MAGIC: [u8; 4] = [0x58, 0x45, 0x4E, 0x48];
/// Older wire format accepted by `decode`: the frame ends after the data
/// payload and carries no metadata trailer.
const WIRE_VERSION_V2: u8 = 2;
/// Wire format written by `encode`: same as v2 plus a trailing metadata flag
/// byte, optionally followed by a 20-byte address and a 32-byte slot.
const WIRE_VERSION_V3: u8 = 3;
62
63pub fn encode(key: &StateKey, value: &StateValue, metadata: Option<&KeyMetadata>) -> Bytes {
69 let key_bytes = key.as_ref().as_bytes();
70 let mut capacity = 4 + 1 + 8 + 8 + 8 + 8 + 8 + 2 + key_bytes.len() + 4 + value.data.len();
73 if metadata.is_some() {
74 capacity += 1 + 20 + 32; } else {
76 capacity += 1; }
78 let mut buf = BytesMut::with_capacity(capacity);
79
80 buf.put_slice(&MAGIC);
81 buf.put_u8(WIRE_VERSION_V3);
82 buf.put_u64(value.source_chain.0);
83 buf.put_u64(value.version.timestamp_ms);
84 buf.put_u64(value.version.sequence);
85 buf.put_u64(value.version.source_chain);
86 buf.put_u64(value.updated_at);
87 buf.put_u16(key_bytes.len() as u16);
88 buf.put_slice(key_bytes);
89 buf.put_u32(value.data.len() as u32);
90 buf.put_slice(&value.data);
91
92 match metadata {
93 Some(m) => {
94 buf.put_u8(1);
95 buf.put_slice(&m.address.unwrap_or([0u8; 20]));
96 buf.put_slice(&m.slot.unwrap_or([0u8; 32]));
97 }
98 None => buf.put_u8(0),
99 }
100
101 buf.freeze()
102}
103
104pub fn decode(raw: &[u8]) -> Result<(StateKey, StateValue, Option<KeyMetadata>)> {
112 const MIN_HEADER: usize = 51;
116 if raw.len() < MIN_HEADER {
117 return Err(XenithError::Serialization(format!(
118 "message too short: {} bytes (minimum {MIN_HEADER})",
119 raw.len()
120 )));
121 }
122
123 let mut pos = 0;
124
125 if raw[pos..pos + 4] != MAGIC {
126 return Err(XenithError::Serialization(format!(
127 "invalid magic bytes: {:?}",
128 &raw[pos..pos + 4]
129 )));
130 }
131 pos += 4;
132
133 let version = raw[pos];
134 pos += 1;
135 if version != WIRE_VERSION_V2 && version != WIRE_VERSION_V3 {
136 return Err(XenithError::Serialization(format!(
137 "unsupported wire format version: {version}"
138 )));
139 }
140
141 let source_chain = u64::from_be_bytes(raw[pos..pos + 8].try_into().unwrap());
142 pos += 8;
143
144 let ver_ts = u64::from_be_bytes(raw[pos..pos + 8].try_into().unwrap());
145 pos += 8;
146 let ver_seq = u64::from_be_bytes(raw[pos..pos + 8].try_into().unwrap());
147 pos += 8;
148 let ver_chain = u64::from_be_bytes(raw[pos..pos + 8].try_into().unwrap());
149 pos += 8;
150
151 let updated_at = u64::from_be_bytes(raw[pos..pos + 8].try_into().unwrap());
152 pos += 8;
153
154 let key_len = u16::from_be_bytes(raw[pos..pos + 2].try_into().unwrap()) as usize;
155 pos += 2;
156
157 if raw.len() < pos + key_len + 4 {
158 return Err(XenithError::Serialization(
159 "message truncated before key or data_len".into(),
160 ));
161 }
162
163 let key_str = std::str::from_utf8(&raw[pos..pos + key_len])
164 .map_err(|e| XenithError::Serialization(format!("invalid UTF-8 in key: {e}")))?;
165 let key = StateKey::from_raw(key_str.to_owned());
166 pos += key_len;
167
168 let data_len = u32::from_be_bytes(raw[pos..pos + 4].try_into().unwrap()) as usize;
169 pos += 4;
170
171 if raw.len() < pos + data_len {
172 return Err(XenithError::Serialization(
173 "message truncated at data payload".into(),
174 ));
175 }
176
177 let data = Bytes::copy_from_slice(&raw[pos..pos + data_len]);
178 pos += data_len;
179
180 let value = StateValue {
181 data,
182 version: StateVersion {
183 timestamp_ms: ver_ts,
184 sequence: ver_seq,
185 source_chain: ver_chain,
186 },
187 updated_at,
188 source_chain: ChainId(source_chain),
189 };
190
191 if version == WIRE_VERSION_V2 {
193 return Ok((key, value, None));
194 }
195
196 if raw.len() < pos + 1 {
198 return Err(XenithError::Serialization(
199 "v3 message truncated before has_metadata flag".into(),
200 ));
201 }
202 let has_metadata = raw[pos];
203 pos += 1;
204
205 let metadata = if has_metadata == 1 {
206 if raw.len() < pos + 20 + 32 {
207 return Err(XenithError::Serialization(
208 "v3 message truncated in metadata section".into(),
209 ));
210 }
211 let mut address = [0u8; 20];
212 address.copy_from_slice(&raw[pos..pos + 20]);
213 pos += 20;
214 let mut slot = [0u8; 32];
215 slot.copy_from_slice(&raw[pos..pos + 32]);
216 Some(KeyMetadata {
217 address: Some(address),
218 slot: Some(slot),
219 })
220 } else {
221 None
222 };
223
224 Ok((key, value, metadata))
225}
226
/// Round-trip and failure-path tests for `encode` / `decode`.
#[cfg(test)]
mod tests {
    use super::*;

    /// Key shared by all tests.
    fn sample_key() -> StateKey {
        StateKey::new("uniswap", "pool", "0xabc")
    }

    /// Value with a small non-empty payload shared by all tests.
    fn sample_value() -> StateValue {
        StateValue {
            data: Bytes::from_static(b"price=3400"),
            version: StateVersion {
                timestamp_ms: 1_700_000_000_000,
                sequence: 0,
                source_chain: 1,
            },
            updated_at: 1_700_000_000,
            source_chain: ChainId(1),
        }
    }

    /// Metadata with both fields populated.
    fn sample_metadata() -> KeyMetadata {
        KeyMetadata {
            address: Some([0xABu8; 20]),
            slot: Some([0xCDu8; 32]),
        }
    }

    #[test]
    fn encode_decode_roundtrip() {
        let key = sample_key();
        let value = sample_value();
        let encoded = encode(&key, &value, None);
        let (decoded_key, decoded_value, meta) = decode(&encoded).unwrap();
        assert_eq!(decoded_key, key);
        assert_eq!(decoded_value, value);
        assert!(meta.is_none());
    }

    #[test]
    fn encode_with_metadata_roundtrips() {
        let key = sample_key();
        let value = sample_value();
        let metadata = sample_metadata();
        let encoded = encode(&key, &value, Some(&metadata));
        let (decoded_key, decoded_value, decoded_meta) = decode(&encoded).unwrap();
        assert_eq!(decoded_key, key);
        assert_eq!(decoded_value, value);
        let m = decoded_meta.expect("metadata must be present");
        assert_eq!(m.address, Some([0xABu8; 20]));
        assert_eq!(m.slot, Some([0xCDu8; 32]));
    }

    #[test]
    fn encode_without_metadata_decodes_to_none() {
        let encoded = encode(&sample_key(), &sample_value(), None);
        let (_, _, meta) = decode(&encoded).unwrap();
        assert!(meta.is_none());
    }

    #[test]
    fn version2_message_decodes_without_metadata() {
        // Hand-build a v2 frame: same layout as v3 but with no trailer.
        let key = sample_key();
        let value = sample_value();
        let key_bytes = key.as_ref().as_bytes();
        let mut buf = BytesMut::new();
        buf.put_slice(&MAGIC);
        buf.put_u8(WIRE_VERSION_V2);
        buf.put_u64(value.source_chain.0);
        buf.put_u64(value.version.timestamp_ms);
        buf.put_u64(value.version.sequence);
        buf.put_u64(value.version.source_chain);
        buf.put_u64(value.updated_at);
        buf.put_u16(key_bytes.len() as u16);
        buf.put_slice(key_bytes);
        buf.put_u32(value.data.len() as u32);
        buf.put_slice(&value.data);
        let raw = buf.freeze();

        let (decoded_key, decoded_value, meta) = decode(&raw).unwrap();
        assert_eq!(decoded_key, key);
        assert_eq!(decoded_value, value);
        assert!(meta.is_none(), "v2 messages decode with metadata = None");
    }

    #[test]
    fn bad_magic_returns_error() {
        let mut raw = encode(&sample_key(), &sample_value(), None).to_vec();
        raw[0] = 0xFF;
        assert!(matches!(decode(&raw), Err(XenithError::Serialization(_))));
    }

    #[test]
    fn unsupported_version_returns_error() {
        // The version byte sits directly after the 4-byte magic.
        let mut raw = encode(&sample_key(), &sample_value(), None).to_vec();
        raw[4] = 0xFF;
        assert!(matches!(decode(&raw), Err(XenithError::Serialization(_))));
    }

    #[test]
    fn truncated_message_returns_error() {
        let raw = encode(&sample_key(), &sample_value(), None);
        assert!(matches!(
            decode(&raw[..10]),
            Err(XenithError::Serialization(_))
        ));
    }

    #[test]
    fn truncated_payload_returns_error() {
        // Dropping two bytes removes the flag byte and the last payload byte,
        // so the declared data_len no longer fits.
        let raw = encode(&sample_key(), &sample_value(), None);
        assert!(matches!(
            decode(&raw[..raw.len() - 2]),
            Err(XenithError::Serialization(_))
        ));
    }

    #[test]
    fn v3_missing_metadata_flag_returns_error() {
        // Without metadata the flag is the final byte of the frame.
        let raw = encode(&sample_key(), &sample_value(), None);
        assert!(matches!(
            decode(&raw[..raw.len() - 1]),
            Err(XenithError::Serialization(_))
        ));
    }

    #[test]
    fn truncated_metadata_returns_error() {
        // Flag says metadata follows, but the slot is one byte short.
        let metadata = sample_metadata();
        let raw = encode(&sample_key(), &sample_value(), Some(&metadata));
        assert!(matches!(
            decode(&raw[..raw.len() - 1]),
            Err(XenithError::Serialization(_))
        ));
    }
}