1use crate::state::integrity::DecisionRecord;
6use serde::{Deserialize, Serialize};
7
8pub const PROTOCOL_VERSION: u32 = 1;
10
11pub const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
16pub enum ReplicationMessage {
17 Hello(HelloMessage),
19
20 Heartbeat(Heartbeat),
22
23 Decision(DecisionRecord),
25
26 SyncRequest(SyncRequest),
28
29 SyncResponse(SyncResponse),
31
32 Ack(AckMessage),
34
35 Error(ErrorMessage),
37
38 FencingToken(FencingToken),
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct HelloMessage {
45 pub version: u32,
47
48 pub node_id: String,
50
51 pub role: String,
53
54 pub sequence: u64,
56
57 #[serde(with = "hex_bytes")]
59 pub state_hash: [u8; 32],
60
61 #[serde(with = "option_hex_bytes")]
63 pub genesis_root: Option<[u8; 32]>,
64}
65
66#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct Heartbeat {
69 pub sequence: u64,
71
72 #[serde(with = "hex_bytes")]
74 pub state_hash: [u8; 32],
75
76 pub timestamp_ms: u64,
78
79 pub fencing_token: u64,
81}
82
83impl Heartbeat {
84 pub fn new(sequence: u64, state_hash: [u8; 32], fencing_token: u64) -> Self {
86 let timestamp_ms = std::time::SystemTime::now()
87 .duration_since(std::time::UNIX_EPOCH)
88 .unwrap_or_default()
89 .as_millis() as u64;
90
91 Self {
92 sequence,
93 state_hash,
94 timestamp_ms,
95 fencing_token,
96 }
97 }
98}
99
100#[derive(Debug, Clone, Serialize, Deserialize)]
102pub struct SyncRequest {
103 pub from_sequence: u64,
105
106 pub max_records: u32,
108
109 #[serde(with = "hex_bytes")]
111 pub expected_hash: [u8; 32],
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct SyncResponse {
117 pub records: Vec<DecisionRecord>,
119
120 pub has_more: bool,
122
123 pub current_sequence: u64,
125}
126
127#[derive(Debug, Clone, Serialize, Deserialize)]
129pub struct AckMessage {
130 pub sequence: u64,
132
133 #[serde(with = "hex_bytes")]
135 pub state_hash: [u8; 32],
136}
137
138#[derive(Debug, Clone, Serialize, Deserialize)]
140pub struct ErrorMessage {
141 pub code: ErrorCode,
143
144 pub description: String,
146
147 pub sequence: Option<u64>,
149}
150
151#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
153pub enum ErrorCode {
154 VersionMismatch,
156
157 HashMismatch,
159
160 SequenceGap,
162
163 InvalidFencingToken,
165
166 GenesisRootMismatch,
168
169 NotAuthorized,
171
172 Internal,
174}
175
176#[derive(Debug, Clone, Serialize, Deserialize)]
178pub struct FencingToken {
179 pub token: u64,
181
182 pub issued_by: String,
184
185 pub issued_at: u64,
187
188 pub signature: Option<Vec<u8>>,
190}
191
192impl FencingToken {
193 pub fn new(token: u64, node_id: &str) -> Self {
195 let issued_at = std::time::SystemTime::now()
196 .duration_since(std::time::UNIX_EPOCH)
197 .unwrap_or_default()
198 .as_secs();
199
200 Self {
201 token,
202 issued_by: node_id.to_string(),
203 issued_at,
204 signature: None,
205 }
206 }
207
208 pub fn supersedes(&self, other: &FencingToken) -> bool {
210 self.token > other.token
211 }
212}
213
214mod hex_bytes {
216 use serde::{Deserialize, Deserializer, Serializer};
217
218 pub fn serialize<S>(bytes: &[u8; 32], serializer: S) -> Result<S::Ok, S::Error>
219 where
220 S: Serializer,
221 {
222 serializer.serialize_str(&hex::encode(bytes))
223 }
224
225 pub fn deserialize<'de, D>(deserializer: D) -> Result<[u8; 32], D::Error>
226 where
227 D: Deserializer<'de>,
228 {
229 let s: String = Deserialize::deserialize(deserializer)?;
230 let s = s.strip_prefix("0x").unwrap_or(&s);
231 let bytes = hex::decode(s).map_err(serde::de::Error::custom)?;
232 if bytes.len() != 32 {
233 return Err(serde::de::Error::custom(format!(
234 "expected 32 bytes, got {}",
235 bytes.len()
236 )));
237 }
238 let mut arr = [0u8; 32];
239 arr.copy_from_slice(&bytes);
240 Ok(arr)
241 }
242}
243
244mod option_hex_bytes {
246 use serde::{Deserialize, Deserializer, Serializer};
247
248 pub fn serialize<S>(bytes: &Option<[u8; 32]>, serializer: S) -> Result<S::Ok, S::Error>
249 where
250 S: Serializer,
251 {
252 match bytes {
253 Some(b) => serializer.serialize_some(&hex::encode(b)),
254 None => serializer.serialize_none(),
255 }
256 }
257
258 pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<[u8; 32]>, D::Error>
259 where
260 D: Deserializer<'de>,
261 {
262 let opt: Option<String> = Deserialize::deserialize(deserializer)?;
263 match opt {
264 Some(s) => {
265 let s = s.strip_prefix("0x").unwrap_or(&s);
266 let bytes = hex::decode(s).map_err(serde::de::Error::custom)?;
267 if bytes.len() != 32 {
268 return Err(serde::de::Error::custom(format!(
269 "expected 32 bytes, got {}",
270 bytes.len()
271 )));
272 }
273 let mut arr = [0u8; 32];
274 arr.copy_from_slice(&bytes);
275 Ok(Some(arr))
276 }
277 None => Ok(None),
278 }
279 }
280}
281
282#[cfg(test)]
283mod tests {
284 use super::*;
285
286 #[test]
287 fn test_heartbeat_creation() {
288 let hb = Heartbeat::new(100, [1u8; 32], 5);
289 assert_eq!(hb.sequence, 100);
290 assert_eq!(hb.fencing_token, 5);
291 assert!(hb.timestamp_ms > 0);
292 }
293
294 #[test]
295 fn test_fencing_token_supersedes() {
296 let token1 = FencingToken::new(1, "node-a");
297 let token2 = FencingToken::new(2, "node-b");
298
299 assert!(token2.supersedes(&token1));
300 assert!(!token1.supersedes(&token2));
301 }
302
303 #[test]
304 fn test_message_serialization() {
305 let msg = ReplicationMessage::Heartbeat(Heartbeat::new(100, [0u8; 32], 1));
306 let json = serde_json::to_string(&msg).unwrap();
307 let parsed: ReplicationMessage = serde_json::from_str(&json).unwrap();
308
309 if let ReplicationMessage::Heartbeat(hb) = parsed {
310 assert_eq!(hb.sequence, 100);
311 } else {
312 panic!("Expected Heartbeat message");
313 }
314 }
315}