Skip to main content

nklave_core/replication/
protocol.rs

1//! Replication protocol messages
2//!
3//! Defines the wire format for primary-passive communication.
4
5use crate::state::integrity::DecisionRecord;
6use serde::{Deserialize, Serialize};
7
/// Protocol version for compatibility checking.
///
/// NOTE(review): presumably this is the value placed in
/// `HelloMessage::version` and compared by the peer (a mismatch can be
/// reported with `ErrorCode::VersionMismatch`); the comparison itself is not
/// in this file — confirm against the handshake code.
pub const PROTOCOL_VERSION: u32 = 1;

/// Maximum message size in bytes: 10 MiB (10 * 1024 * 1024 = 10_485_760).
///
/// NOTE(review): nothing in this file enforces the limit; presumably the
/// framing/transport layer checks it — confirm against the caller.
pub const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;
13
/// Replication protocol messages.
///
/// Top-level envelope for the primary-passive wire format: each variant wraps
/// exactly one payload type defined in this module (or `DecisionRecord` from
/// `crate::state::integrity`).
///
/// NOTE(review): the serialized form is whatever serde derives for this enum;
/// with index-based formats, reordering or inserting variants would change
/// the encoding — confirm which format is used before touching the order.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationMessage {
    /// Handshake to establish connection (see `HelloMessage`).
    Hello(HelloMessage),

    /// Periodic heartbeat from primary (see `Heartbeat`).
    Heartbeat(Heartbeat),

    /// Decision record streamed after signing.
    Decision(DecisionRecord),

    /// Request to sync from a sequence number (see `SyncRequest`).
    SyncRequest(SyncRequest),

    /// Response with batch of decision records (see `SyncResponse`).
    SyncResponse(SyncResponse),

    /// Acknowledgment of received messages (see `AckMessage`).
    Ack(AckMessage),

    /// Error message (see `ErrorMessage` / `ErrorCode`).
    Error(ErrorMessage),

    /// Fencing token for split-brain prevention (see `FencingToken`).
    FencingToken(FencingToken),
}
41
/// Initial handshake message.
///
/// Carried by `ReplicationMessage::Hello`. Describes the sending node: the
/// protocol version it speaks, its identity and role, and its current
/// position (sequence number plus state hash).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HelloMessage {
    /// Protocol version spoken by the sender.
    ///
    /// NOTE(review): presumably set from `PROTOCOL_VERSION` — confirm at the
    /// construction site.
    pub version: u32,

    /// Node identifier.
    pub node_id: String,

    /// Current role (Primary or Passive), carried as free-form text.
    ///
    /// NOTE(review): the exact accepted spellings are not defined in this
    /// file — verify against the code that produces and parses this field.
    pub role: String,

    /// Current sequence number.
    pub sequence: u64,

    /// Current state hash (32 bytes).
    ///
    /// (De)serialized as a hex string by the `hex_bytes` helper module.
    #[serde(with = "hex_bytes")]
    pub state_hash: [u8; 32],

    /// Genesis validators root (if set); `None` when it has not been set.
    ///
    /// (De)serialized as an optional hex string by the `option_hex_bytes`
    /// helper module.
    #[serde(with = "option_hex_bytes")]
    pub genesis_root: Option<[u8; 32]>,
}
65
/// Heartbeat message sent periodically by primary.
///
/// Carried by `ReplicationMessage::Heartbeat`. Use `Heartbeat::new` to build
/// one stamped with the current wall-clock time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Heartbeat {
    /// Current sequence number on primary.
    pub sequence: u64,

    /// Current state hash on primary (32 bytes).
    ///
    /// (De)serialized as a hex string by the `hex_bytes` helper module.
    #[serde(with = "hex_bytes")]
    pub state_hash: [u8; 32],

    /// Unix timestamp (milliseconds since `UNIX_EPOCH`).
    pub timestamp_ms: u64,

    /// Current fencing token value.
    pub fencing_token: u64,
}
82
83impl Heartbeat {
84    /// Create a new heartbeat
85    pub fn new(sequence: u64, state_hash: [u8; 32], fencing_token: u64) -> Self {
86        let timestamp_ms = std::time::SystemTime::now()
87            .duration_since(std::time::UNIX_EPOCH)
88            .unwrap_or_default()
89            .as_millis() as u64;
90
91        Self {
92            sequence,
93            state_hash,
94            timestamp_ms,
95            fencing_token,
96        }
97    }
98}
99
/// Request to sync decision records from a specific sequence.
///
/// Carried by `ReplicationMessage::SyncRequest`; the matching reply type is
/// `SyncResponse`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncRequest {
    /// Start sequence number (exclusive): records after this sequence are
    /// being requested.
    pub from_sequence: u64,

    /// Maximum number of records to return.
    pub max_records: u32,

    /// Expected state hash at `from_sequence` (32 bytes).
    ///
    /// (De)serialized as a hex string by the `hex_bytes` helper module.
    /// NOTE(review): presumably the responder compares this with its own
    /// hash to detect divergence (cf. `ErrorCode::HashMismatch`) — confirm.
    #[serde(with = "hex_bytes")]
    pub expected_hash: [u8; 32],
}
113
/// Response containing batched decision records.
///
/// Carried by `ReplicationMessage::SyncResponse`, in reply to a
/// `SyncRequest`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncResponse {
    /// Records in sequence order.
    pub records: Vec<DecisionRecord>,

    /// Whether there are more records available beyond this batch.
    pub has_more: bool,

    /// Current sequence on primary.
    pub current_sequence: u64,
}
126
/// Acknowledgment message.
///
/// Carried by `ReplicationMessage::Ack`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AckMessage {
    /// Sequence number being acknowledged.
    pub sequence: u64,

    /// State hash after applying up to this sequence (32 bytes).
    ///
    /// (De)serialized as a hex string by the `hex_bytes` helper module.
    #[serde(with = "hex_bytes")]
    pub state_hash: [u8; 32],
}
137
/// Error message.
///
/// Carried by `ReplicationMessage::Error`. Pairs a machine-readable
/// `ErrorCode` with a free-form description.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorMessage {
    /// Error code classifying the failure.
    pub code: ErrorCode,

    /// Human-readable description.
    pub description: String,

    /// Sequence number where error occurred (if applicable); `None` when the
    /// error is not tied to a particular sequence.
    pub sequence: Option<u64>,
}
150
/// Error codes for replication protocol.
///
/// Used as `ErrorMessage::code`.
///
/// NOTE(review): with index-based serde formats the variant order is part of
/// the wire encoding — confirm the format in use before reordering or
/// inserting variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ErrorCode {
    /// Protocol version mismatch.
    VersionMismatch,

    /// State hash mismatch (divergence detected).
    HashMismatch,

    /// Sequence gap detected.
    SequenceGap,

    /// Invalid fencing token.
    InvalidFencingToken,

    /// Genesis root mismatch.
    GenesisRootMismatch,

    /// Not authorized (wrong role).
    NotAuthorized,

    /// Internal error.
    Internal,
}
175
/// Fencing token for split-brain prevention.
///
/// Carried by `ReplicationMessage::FencingToken`. Tokens are ordered by their
/// `token` value alone (see `FencingToken::supersedes`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FencingToken {
    /// Token value (monotonically increasing).
    pub token: u64,

    /// Node ID that generated this token.
    pub issued_by: String,

    /// Unix timestamp when issued.
    ///
    /// `FencingToken::new` fills this with whole *seconds* since
    /// `UNIX_EPOCH`, unlike `Heartbeat::timestamp_ms`, which is in
    /// milliseconds.
    pub issued_at: u64,

    /// Signature over (token, issued_by, issued_at) - optional for now.
    ///
    /// `FencingToken::new` always leaves this as `None`; nothing in this file
    /// produces or verifies a signature.
    pub signature: Option<Vec<u8>>,
}
191
192impl FencingToken {
193    /// Create a new fencing token
194    pub fn new(token: u64, node_id: &str) -> Self {
195        let issued_at = std::time::SystemTime::now()
196            .duration_since(std::time::UNIX_EPOCH)
197            .unwrap_or_default()
198            .as_secs();
199
200        Self {
201            token,
202            issued_by: node_id.to_string(),
203            issued_at,
204            signature: None,
205        }
206    }
207
208    /// Check if this token supersedes another
209    pub fn supersedes(&self, other: &FencingToken) -> bool {
210        self.token > other.token
211    }
212}
213
214/// Serialize/deserialize helper for [u8; 32]
215mod hex_bytes {
216    use serde::{Deserialize, Deserializer, Serializer};
217
218    pub fn serialize<S>(bytes: &[u8; 32], serializer: S) -> Result<S::Ok, S::Error>
219    where
220        S: Serializer,
221    {
222        serializer.serialize_str(&hex::encode(bytes))
223    }
224
225    pub fn deserialize<'de, D>(deserializer: D) -> Result<[u8; 32], D::Error>
226    where
227        D: Deserializer<'de>,
228    {
229        let s: String = Deserialize::deserialize(deserializer)?;
230        let s = s.strip_prefix("0x").unwrap_or(&s);
231        let bytes = hex::decode(s).map_err(serde::de::Error::custom)?;
232        if bytes.len() != 32 {
233            return Err(serde::de::Error::custom(format!(
234                "expected 32 bytes, got {}",
235                bytes.len()
236            )));
237        }
238        let mut arr = [0u8; 32];
239        arr.copy_from_slice(&bytes);
240        Ok(arr)
241    }
242}
243
244/// Serialize/deserialize helper for Option<[u8; 32]>
245mod option_hex_bytes {
246    use serde::{Deserialize, Deserializer, Serializer};
247
248    pub fn serialize<S>(bytes: &Option<[u8; 32]>, serializer: S) -> Result<S::Ok, S::Error>
249    where
250        S: Serializer,
251    {
252        match bytes {
253            Some(b) => serializer.serialize_some(&hex::encode(b)),
254            None => serializer.serialize_none(),
255        }
256    }
257
258    pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<[u8; 32]>, D::Error>
259    where
260        D: Deserializer<'de>,
261    {
262        let opt: Option<String> = Deserialize::deserialize(deserializer)?;
263        match opt {
264            Some(s) => {
265                let s = s.strip_prefix("0x").unwrap_or(&s);
266                let bytes = hex::decode(s).map_err(serde::de::Error::custom)?;
267                if bytes.len() != 32 {
268                    return Err(serde::de::Error::custom(format!(
269                        "expected 32 bytes, got {}",
270                        bytes.len()
271                    )));
272                }
273                let mut arr = [0u8; 32];
274                arr.copy_from_slice(&bytes);
275                Ok(Some(arr))
276            }
277            None => Ok(None),
278        }
279    }
280}
281
#[cfg(test)]
mod tests {
    use super::*;

    /// A new heartbeat keeps the supplied sequence and fencing token and
    /// carries a non-zero timestamp.
    #[test]
    fn test_heartbeat_creation() {
        let heartbeat = Heartbeat::new(100, [1u8; 32], 5);

        assert_eq!(heartbeat.sequence, 100);
        assert_eq!(heartbeat.fencing_token, 5);
        assert!(heartbeat.timestamp_ms > 0);
    }

    /// The token with the larger value supersedes the smaller one, and not
    /// the other way round.
    #[test]
    fn test_fencing_token_supersedes() {
        let older = FencingToken::new(1, "node-a");
        let newer = FencingToken::new(2, "node-b");

        assert!(newer.supersedes(&older));
        assert!(!older.supersedes(&newer));
    }

    /// A heartbeat message survives a JSON round trip with its variant and
    /// sequence intact.
    #[test]
    fn test_message_serialization() {
        let original = ReplicationMessage::Heartbeat(Heartbeat::new(100, [0u8; 32], 1));
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ReplicationMessage = serde_json::from_str(&encoded).unwrap();

        match decoded {
            ReplicationMessage::Heartbeat(heartbeat) => assert_eq!(heartbeat.sequence, 100),
            _ => panic!("Expected Heartbeat message"),
        }
    }
}