use crate::state::integrity::DecisionRecord;
use serde::{Deserialize, Serialize};
/// Version number of the replication protocol described by this module (currently 1).
/// NOTE(review): presumably the value peers exchange in `HelloMessage::version`;
/// the comparison is not part of the code shown here — confirm at the handshake site.
pub const PROTOCOL_VERSION: u32 = 1;
/// Message size limit: 10 * 1024 * 1024 = 10_485_760 (10 MiB if the unit is bytes).
/// NOTE(review): nothing in the code shown here enforces this limit; presumably the
/// transport/framing code does — confirm where it is applied.
pub const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;
/// Top-level envelope: one variant per message payload type defined in this module.
///
/// No `#[serde(tag = ...)]` attribute is present, so serde's default enum
/// representation applies. Variant order is left untouched on purpose: formats
/// that identify variants by index would encode differently if it changed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationMessage {
/// Wraps a [`HelloMessage`] (version, node identity, sequence and hashes).
Hello(HelloMessage),
/// Wraps a [`Heartbeat`].
Heartbeat(Heartbeat),
/// Wraps a `DecisionRecord` imported from `crate::state::integrity`.
Decision(DecisionRecord),
/// Wraps a [`SyncRequest`].
SyncRequest(SyncRequest),
/// Wraps a [`SyncResponse`].
SyncResponse(SyncResponse),
/// Wraps an [`AckMessage`].
Ack(AckMessage),
/// Wraps an [`ErrorMessage`].
Error(ErrorMessage),
/// Wraps a [`FencingToken`].
FencingToken(FencingToken),
}
/// Payload of `ReplicationMessage::Hello`.
///
/// Both hash fields travel as hex strings: `state_hash` through the
/// `hex_bytes` adapter and `genesis_root` through `option_hex_bytes`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HelloMessage {
    /// Protocol version announced by the sender.
    /// NOTE(review): presumably checked against `PROTOCOL_VERSION` — confirm.
    pub version: u32,
    /// Identifier of the sending node.
    pub node_id: String,
    /// Role of the sending node as free-form text.
    /// NOTE(review): the set of valid role strings is not defined in this file.
    pub role: String,
    /// Sequence number reported by the sender.
    pub sequence: u64,
    /// 32-byte state hash, serialized as a hex string.
    #[serde(with = "hex_bytes")]
    pub state_hash: [u8; 32],
    /// Optional 32-byte genesis root, serialized as a hex string when present.
    ///
    /// A custom `with` module switches off serde's implicit "missing key means
    /// `None`" handling for `Option` fields, so `default` is spelled out to keep
    /// a message that omits this key deserializable (it becomes `None`), in
    /// line with the other `Option` fields in this module.
    #[serde(default, with = "option_hex_bytes")]
    pub genesis_root: Option<[u8; 32]>,
}
/// Payload of `ReplicationMessage::Heartbeat`.
///
/// Usually built with `Heartbeat::new`, which fills in `timestamp_ms` from the
/// system clock.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Heartbeat {
/// Sequence number reported by the sender.
pub sequence: u64,
/// 32-byte state hash; serialized as a hex string via the `hex_bytes` adapter.
#[serde(with = "hex_bytes")]
pub state_hash: [u8; 32],
/// Milliseconds since the Unix epoch when set by `Heartbeat::new`.
pub timestamp_ms: u64,
/// Numeric fencing token carried with the heartbeat.
/// NOTE(review): presumably corresponds to `FencingToken::token` — confirm.
pub fencing_token: u64,
}
impl Heartbeat {
pub fn new(sequence: u64, state_hash: [u8; 32], fencing_token: u64) -> Self {
let timestamp_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
Self {
sequence,
state_hash,
timestamp_ms,
fencing_token,
}
}
}
/// Payload of `ReplicationMessage::SyncRequest`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncRequest {
/// Sequence number the request starts from.
/// NOTE(review): whether this bound is inclusive is not visible here — confirm in the handler.
pub from_sequence: u64,
/// Upper bound on the number of records requested.
/// NOTE(review): presumably caps `SyncResponse::records` — confirm.
pub max_records: u32,
/// 32-byte hash; serialized as a hex string via the `hex_bytes` adapter.
/// NOTE(review): which state this hash is expected to match is not shown in this file.
#[serde(with = "hex_bytes")]
pub expected_hash: [u8; 32],
}
/// Payload of `ReplicationMessage::SyncResponse`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncResponse {
/// Decision records returned in this response.
pub records: Vec<DecisionRecord>,
/// Flag indicating further records remain.
/// NOTE(review): presumably the requester should issue another `SyncRequest` when set — confirm.
pub has_more: bool,
/// Sequence number reported by the responder.
pub current_sequence: u64,
}
/// Payload of `ReplicationMessage::Ack`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AckMessage {
/// Sequence number being acknowledged.
pub sequence: u64,
/// 32-byte state hash; serialized as a hex string via the `hex_bytes` adapter.
/// NOTE(review): presumably the acknowledger's state hash at `sequence` — confirm.
#[serde(with = "hex_bytes")]
pub state_hash: [u8; 32],
}
/// Payload of `ReplicationMessage::Error`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorMessage {
/// Machine-readable category of the error.
pub code: ErrorCode,
/// Human-readable description of the error.
pub description: String,
/// Optional sequence number attached to the error.
/// NOTE(review): presumably the sequence the error relates to — confirm.
pub sequence: Option<u64>,
}
/// Error categories carried in `ErrorMessage::code`.
///
/// NOTE(review): the per-variant meanings are not defined in this file; they
/// can only be inferred from the variant names. Confirm against the code that
/// produces each one before relying on them.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ErrorCode {
VersionMismatch,
HashMismatch,
SequenceGap,
InvalidFencingToken,
GenesisRootMismatch,
NotAuthorized,
Internal,
}
/// Payload of `ReplicationMessage::FencingToken`.
///
/// Tokens are ordered by `token` alone (see `FencingToken::supersedes`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FencingToken {
/// Numeric token value; a strictly larger value supersedes a smaller one.
pub token: u64,
/// Identifier of the issuing node (`FencingToken::new` stores its `node_id` argument here).
pub issued_by: String,
/// Issue time; `FencingToken::new` sets it to whole seconds since the Unix epoch.
pub issued_at: u64,
/// Optional signature bytes; `FencingToken::new` leaves this as `None`.
/// NOTE(review): signing scheme and what is signed are not defined in this file.
pub signature: Option<Vec<u8>>,
}
impl FencingToken {
pub fn new(token: u64, node_id: &str) -> Self {
let issued_at = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
Self {
token,
issued_by: node_id.to_string(),
issued_at,
signature: None,
}
}
pub fn supersedes(&self, other: &FencingToken) -> bool {
self.token > other.token
}
}
mod hex_bytes {
use serde::{Deserialize, Deserializer, Serializer};
pub fn serialize<S>(bytes: &[u8; 32], serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&hex::encode(bytes))
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<[u8; 32], D::Error>
where
D: Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer)?;
let s = s.strip_prefix("0x").unwrap_or(&s);
let bytes = hex::decode(s).map_err(serde::de::Error::custom)?;
if bytes.len() != 32 {
return Err(serde::de::Error::custom(format!(
"expected 32 bytes, got {}",
bytes.len()
)));
}
let mut arr = [0u8; 32];
arr.copy_from_slice(&bytes);
Ok(arr)
}
}
mod option_hex_bytes {
use serde::{Deserialize, Deserializer, Serializer};
pub fn serialize<S>(bytes: &Option<[u8; 32]>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match bytes {
Some(b) => serializer.serialize_some(&hex::encode(b)),
None => serializer.serialize_none(),
}
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<[u8; 32]>, D::Error>
where
D: Deserializer<'de>,
{
let opt: Option<String> = Deserialize::deserialize(deserializer)?;
match opt {
Some(s) => {
let s = s.strip_prefix("0x").unwrap_or(&s);
let bytes = hex::decode(s).map_err(serde::de::Error::custom)?;
if bytes.len() != 32 {
return Err(serde::de::Error::custom(format!(
"expected 32 bytes, got {}",
bytes.len()
)));
}
let mut arr = [0u8; 32];
arr.copy_from_slice(&bytes);
Ok(Some(arr))
}
None => Ok(None),
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `Heartbeat::new` keeps its arguments and records a non-zero timestamp.
    #[test]
    fn test_heartbeat_creation() {
        let heartbeat = Heartbeat::new(100, [1u8; 32], 5);

        assert_eq!(heartbeat.sequence, 100);
        assert_eq!(heartbeat.fencing_token, 5);
        assert!(heartbeat.timestamp_ms > 0);
    }

    /// A larger token value supersedes a smaller one, and not the reverse.
    #[test]
    fn test_fencing_token_supersedes() {
        let older = FencingToken::new(1, "node-a");
        let newer = FencingToken::new(2, "node-b");

        assert!(newer.supersedes(&older));
        assert!(!older.supersedes(&newer));
    }

    /// A heartbeat envelope survives a JSON round trip as the same variant.
    #[test]
    fn test_message_serialization() {
        let original = ReplicationMessage::Heartbeat(Heartbeat::new(100, [0u8; 32], 1));

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ReplicationMessage = serde_json::from_str(&encoded).unwrap();

        match decoded {
            ReplicationMessage::Heartbeat(heartbeat) => assert_eq!(heartbeat.sequence, 100),
            _ => panic!("Expected Heartbeat message"),
        }
    }
}