nomad_protocol/sync/
message.rs

//! Sync message types.
//!
//! Implements the sync message format defined by the 3-SYNC.md contract.
4
5use thiserror::Error;
6
/// A single sync message, as carried inside the encrypted payload.
///
/// Layout on the wire (byte offsets; every integer is little-endian):
/// ```text
/// +0   Sender State Num (8 bytes LE64)
/// +8   Acked State Num (8 bytes LE64)
/// +16  Base State Num (8 bytes LE64)
/// +24  Diff Length (4 bytes LE32)
/// +28  Diff Payload (variable)
/// ```
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SyncMessage {
    /// State version the sender is currently at.
    pub sender_state_num: u64,
    /// Highest state version received from the peer; serves as the acknowledgment.
    pub acked_state_num: u64,
    /// State version the diff was computed against.
    pub base_state_num: u64,
    /// Diff bytes in an application-specific encoding.
    pub diff: Vec<u8>,
}
28
/// Size in bytes of the fixed header that precedes the diff payload:
/// three 8-byte state numbers followed by a 4-byte diff length (28 in total).
pub const SYNC_MESSAGE_HEADER_SIZE: usize = 3 * 8 + 4;
31
32impl SyncMessage {
33    /// Create a new sync message
34    pub fn new(
35        sender_state_num: u64,
36        acked_state_num: u64,
37        base_state_num: u64,
38        diff: Vec<u8>,
39    ) -> Self {
40        Self {
41            sender_state_num,
42            acked_state_num,
43            base_state_num,
44            diff,
45        }
46    }
47
48    /// Create an ack-only message (empty diff)
49    pub fn ack_only(current_version: u64, acked_version: u64) -> Self {
50        Self {
51            sender_state_num: current_version,
52            acked_state_num: acked_version,
53            base_state_num: 0,
54            diff: Vec::new(),
55        }
56    }
57
58    /// Check if this is an ack-only message
59    pub fn is_ack_only(&self) -> bool {
60        self.diff.is_empty()
61    }
62
63    /// Total wire size
64    pub fn wire_size(&self) -> usize {
65        SYNC_MESSAGE_HEADER_SIZE + self.diff.len()
66    }
67
68    /// Encode to wire format (28-byte header + diff)
69    pub fn encode(&self) -> Vec<u8> {
70        let mut buf = Vec::with_capacity(self.wire_size());
71        buf.extend_from_slice(&self.sender_state_num.to_le_bytes());
72        buf.extend_from_slice(&self.acked_state_num.to_le_bytes());
73        buf.extend_from_slice(&self.base_state_num.to_le_bytes());
74        buf.extend_from_slice(&(self.diff.len() as u32).to_le_bytes());
75        buf.extend_from_slice(&self.diff);
76        buf
77    }
78
79    /// Encode into existing buffer, returns bytes written
80    pub fn encode_into(&self, buf: &mut [u8]) -> Result<usize, MessageError> {
81        let size = self.wire_size();
82        if buf.len() < size {
83            return Err(MessageError::BufferTooSmall {
84                required: size,
85                available: buf.len(),
86            });
87        }
88
89        buf[0..8].copy_from_slice(&self.sender_state_num.to_le_bytes());
90        buf[8..16].copy_from_slice(&self.acked_state_num.to_le_bytes());
91        buf[16..24].copy_from_slice(&self.base_state_num.to_le_bytes());
92        buf[24..28].copy_from_slice(&(self.diff.len() as u32).to_le_bytes());
93        buf[28..size].copy_from_slice(&self.diff);
94
95        Ok(size)
96    }
97
98    /// Decode from wire format
99    pub fn decode(data: &[u8]) -> Result<Self, MessageError> {
100        if data.len() < SYNC_MESSAGE_HEADER_SIZE {
101            return Err(MessageError::TooShort {
102                expected: SYNC_MESSAGE_HEADER_SIZE,
103                actual: data.len(),
104            });
105        }
106
107        let sender_state_num = u64::from_le_bytes(data[0..8].try_into().unwrap());
108        let acked_state_num = u64::from_le_bytes(data[8..16].try_into().unwrap());
109        let base_state_num = u64::from_le_bytes(data[16..24].try_into().unwrap());
110        let diff_len = u32::from_le_bytes(data[24..28].try_into().unwrap()) as usize;
111
112        if data.len() < SYNC_MESSAGE_HEADER_SIZE + diff_len {
113            return Err(MessageError::TooShort {
114                expected: SYNC_MESSAGE_HEADER_SIZE + diff_len,
115                actual: data.len(),
116            });
117        }
118
119        let diff = data[SYNC_MESSAGE_HEADER_SIZE..SYNC_MESSAGE_HEADER_SIZE + diff_len].to_vec();
120
121        Ok(Self {
122            sender_state_num,
123            acked_state_num,
124            base_state_num,
125            diff,
126        })
127    }
128
129    /// Decode from wire format, returning message and bytes consumed
130    pub fn decode_with_length(data: &[u8]) -> Result<(Self, usize), MessageError> {
131        let msg = Self::decode(data)?;
132        let consumed = SYNC_MESSAGE_HEADER_SIZE + msg.diff.len();
133        Ok((msg, consumed))
134    }
135}
136
137/// Sync message encoding/decoding errors.
138#[derive(Debug, Error, Clone, PartialEq, Eq)]
139pub enum MessageError {
140    /// Input data is shorter than required.
141    #[error("message too short: expected {expected} bytes, got {actual}")]
142    TooShort {
143        /// Minimum bytes required.
144        expected: usize,
145        /// Actual bytes received.
146        actual: usize,
147    },
148
149    /// Output buffer is too small to hold encoded data.
150    #[error("buffer too small: required {required} bytes, available {available}")]
151    BufferTooSmall {
152        /// Bytes needed for encoding.
153        required: usize,
154        /// Bytes available in buffer.
155        available: usize,
156    },
157
158    /// Message format is invalid or corrupted.
159    #[error("invalid format: {0}")]
160    InvalidFormat(String),
161}
162
#[cfg(test)]
mod tests {
    use super::*;

    /// A message with a non-empty diff survives an encode/decode cycle unchanged.
    #[test]
    fn test_encode_decode_roundtrip() {
        let original = SyncMessage::new(100, 50, 45, vec![1, 2, 3, 4, 5]);

        let wire = original.encode();
        assert_eq!(wire.len(), SYNC_MESSAGE_HEADER_SIZE + 5);

        let parsed = SyncMessage::decode(&wire).unwrap();
        assert_eq!(parsed, original);
    }

    /// `ack_only` fills in the two versions, zeroes the base and leaves the diff empty.
    #[test]
    fn test_ack_only_message() {
        let ack = SyncMessage::ack_only(100, 50);

        assert!(ack.is_ack_only());
        assert_eq!(ack.sender_state_num, 100);
        assert_eq!(ack.acked_state_num, 50);
        assert_eq!(ack.base_state_num, 0);
        assert!(ack.diff.is_empty());

        // With no diff, the encoding is exactly one header.
        assert_eq!(ack.encode().len(), SYNC_MESSAGE_HEADER_SIZE);
    }

    /// Input shorter than the fixed header is rejected.
    #[test]
    fn test_decode_too_short() {
        let short_input = [0u8; 20];
        match SyncMessage::decode(&short_input) {
            Err(MessageError::TooShort { .. }) => {}
            other => panic!("expected TooShort, got {:?}", other),
        }
    }

    /// A header that promises more diff bytes than are present is rejected.
    #[test]
    fn test_decode_diff_truncated() {
        let mut wire = SyncMessage::new(1, 2, 3, vec![1, 2, 3, 4, 5]).encode();
        wire.truncate(30); // header (28) plus only 2 of the 5 diff bytes

        match SyncMessage::decode(&wire) {
            Err(MessageError::TooShort { .. }) => {}
            other => panic!("expected TooShort, got {:?}", other),
        }
    }

    /// `encode_into` writes a decodable message into a caller-supplied buffer.
    #[test]
    fn test_encode_into_buffer() {
        let original = SyncMessage::new(100, 50, 45, vec![1, 2, 3]);
        let mut scratch = [0u8; 100];

        let len = original.encode_into(&mut scratch).unwrap();
        assert_eq!(len, SYNC_MESSAGE_HEADER_SIZE + 3);

        let parsed = SyncMessage::decode(&scratch[..len]).unwrap();
        assert_eq!(parsed, original);
    }

    /// `encode_into` refuses a buffer that cannot hold the whole message.
    #[test]
    fn test_encode_into_small_buffer() {
        let original = SyncMessage::new(100, 50, 45, vec![1, 2, 3, 4, 5]);
        let mut tiny = [0u8; 10];

        match original.encode_into(&mut tiny) {
            Err(MessageError::BufferTooSmall { .. }) => {}
            other => panic!("expected BufferTooSmall, got {:?}", other),
        }
    }

    /// `wire_size` is the header size plus the diff length.
    #[test]
    fn test_wire_size() {
        let hundred_byte_diff = SyncMessage::new(1, 2, 3, vec![0; 100]);
        assert_eq!(
            hundred_byte_diff.wire_size(),
            SYNC_MESSAGE_HEADER_SIZE + 100
        );
    }

    /// `decode_with_length` reports only the message's own bytes as consumed.
    #[test]
    fn test_decode_with_length() {
        let original = SyncMessage::new(10, 20, 30, vec![1, 2, 3]);
        let mut stream = original.encode();
        stream.extend_from_slice(&[0xFF; 50]); // unrelated trailing bytes

        let (parsed, used) = SyncMessage::decode_with_length(&stream).unwrap();
        assert_eq!(parsed, original);
        assert_eq!(used, SYNC_MESSAGE_HEADER_SIZE + 3);
    }
}