use serde::{Deserialize, Serialize};
use crate::ant_protocol::XorName;
pub use super::config::MAX_REPLICATION_MESSAGE_SIZE;
/// All-zero 32-byte digest value.
///
/// NOTE(review): judging by the name and by the tests in this file, this is the
/// placeholder digest reported for a key the responder does not hold — confirm
/// against the audit handler.
pub const ABSENT_KEY_DIGEST: [u8; 32] = [0u8; 32];
/// Top-level envelope for replication traffic: a request identifier plus a
/// typed body.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationMessage {
    /// Identifier carried with the message (presumably used to pair responses
    /// with their requests — confirm with the caller).
    pub request_id: u64,
    /// The concrete message being carried.
    pub body: ReplicationMessageBody,
}
impl ReplicationMessage {
    /// Serializes this message with `postcard::to_stdvec` and enforces the
    /// size limit on the resulting bytes.
    ///
    /// # Errors
    ///
    /// * [`ReplicationProtocolError::SerializationFailed`] if postcard reports
    ///   an error; the error text is carried in the variant.
    /// * [`ReplicationProtocolError::MessageTooLarge`] if the encoded length
    ///   is greater than [`MAX_REPLICATION_MESSAGE_SIZE`].
    pub fn encode(&self) -> Result<Vec<u8>, ReplicationProtocolError> {
        match postcard::to_stdvec(self) {
            Err(err) => Err(ReplicationProtocolError::SerializationFailed(
                err.to_string(),
            )),
            // The limit is checked on the encoded form, after serialization.
            Ok(encoded) if encoded.len() > MAX_REPLICATION_MESSAGE_SIZE => {
                Err(ReplicationProtocolError::MessageTooLarge {
                    size: encoded.len(),
                    max_size: MAX_REPLICATION_MESSAGE_SIZE,
                })
            }
            Ok(encoded) => Ok(encoded),
        }
    }

    /// Deserializes a message from `data` with `postcard::from_bytes`.
    ///
    /// The length of `data` is checked before any parsing is attempted.
    ///
    /// # Errors
    ///
    /// * [`ReplicationProtocolError::MessageTooLarge`] if `data` is longer
    ///   than [`MAX_REPLICATION_MESSAGE_SIZE`].
    /// * [`ReplicationProtocolError::DeserializationFailed`] if postcard
    ///   cannot decode the bytes; the error text is carried in the variant.
    pub fn decode(data: &[u8]) -> Result<Self, ReplicationProtocolError> {
        let size = data.len();
        if size <= MAX_REPLICATION_MESSAGE_SIZE {
            postcard::from_bytes(data)
                .map_err(|err| ReplicationProtocolError::DeserializationFailed(err.to_string()))
        } else {
            Err(ReplicationProtocolError::MessageTooLarge {
                size,
                max_size: MAX_REPLICATION_MESSAGE_SIZE,
            })
        }
    }
}
/// Every kind of message that can be carried in a [`ReplicationMessage`].
///
/// NOTE: postcard encodes enum variants by their index, so the order of the
/// variants below is part of the wire format. Append new variants at the end
/// rather than reordering.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationMessageBody {
    /// Carries a [`FreshReplicationOffer`].
    FreshReplicationOffer(FreshReplicationOffer),
    /// Carries a [`FreshReplicationResponse`].
    FreshReplicationResponse(FreshReplicationResponse),
    /// Carries a [`PaidNotify`].
    PaidNotify(PaidNotify),
    /// Carries a [`NeighborSyncRequest`].
    NeighborSyncRequest(NeighborSyncRequest),
    /// Carries a [`NeighborSyncResponse`].
    NeighborSyncResponse(NeighborSyncResponse),
    /// Carries a [`VerificationRequest`].
    VerificationRequest(VerificationRequest),
    /// Carries a [`VerificationResponse`].
    VerificationResponse(VerificationResponse),
    /// Carries a [`FetchRequest`].
    FetchRequest(FetchRequest),
    /// Carries a [`FetchResponse`].
    FetchResponse(FetchResponse),
    /// Carries an [`AuditChallenge`].
    AuditChallenge(AuditChallenge),
    /// Carries an [`AuditResponse`].
    AuditResponse(AuditResponse),
}
/// Payload of a fresh-replication offer: a key, a byte payload, and
/// proof-of-payment bytes.
///
/// NOTE(review): the field semantics below are inferred from the names —
/// confirm against the sender and handler.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FreshReplicationOffer {
    /// Key the offer refers to.
    pub key: XorName,
    /// Bytes offered for `key` (presumably the record content).
    pub data: Vec<u8>,
    /// Opaque proof-of-payment bytes; their encoding is not defined here.
    pub proof_of_payment: Vec<u8>,
}
/// Reply to a fresh-replication offer: either accepted or rejected, always
/// naming the key concerned.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FreshReplicationResponse {
    /// The offer for `key` was accepted.
    Accepted {
        /// Key the response refers to.
        key: XorName,
    },
    /// The offer for `key` was rejected.
    Rejected {
        /// Key the response refers to.
        key: XorName,
        /// Free-form explanation of the rejection.
        reason: String,
    },
}
/// Notification carrying a key together with proof-of-payment bytes.
///
/// NOTE(review): presumably tells the receiver that `key` has been paid for
/// without shipping the record itself — confirm against the handler.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PaidNotify {
    /// Key the notification refers to.
    pub key: XorName,
    /// Opaque proof-of-payment bytes; their encoding is not defined here.
    pub proof_of_payment: Vec<u8>,
}
/// Request half of a neighbor-sync exchange.
///
/// NOTE(review): the hint semantics are inferred from the field names —
/// confirm against the sync implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NeighborSyncRequest {
    /// Keys hinted for replication (presumably records the sender believes
    /// the receiver should hold).
    pub replica_hints: Vec<XorName>,
    /// Keys hinted for the paid list.
    pub paid_hints: Vec<XorName>,
    /// Bootstrapping flag (presumably the sender's own state — TODO confirm).
    pub bootstrapping: bool,
}
/// Response half of a neighbor-sync exchange; has the same fields as
/// [`NeighborSyncRequest`] plus a list of rejected keys.
///
/// NOTE(review): the hint semantics are inferred from the field names —
/// confirm against the sync implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NeighborSyncResponse {
    /// Keys hinted for replication.
    pub replica_hints: Vec<XorName>,
    /// Keys hinted for the paid list.
    pub paid_hints: Vec<XorName>,
    /// Bootstrapping flag (presumably the responder's own state — TODO confirm).
    pub bootstrapping: bool,
    /// Keys that were rejected (presumably hints from the request that the
    /// responder declined — TODO confirm).
    pub rejected_keys: Vec<XorName>,
}
/// Request asking a peer to verify a batch of keys.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VerificationRequest {
    /// Keys to verify.
    pub keys: Vec<XorName>,
    /// Presumably indices into `keys` for which the paid-list status should
    /// also be checked — TODO confirm against the handler.
    pub paid_list_check_indices: Vec<u32>,
}
/// Verification outcome for a single key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KeyVerificationResult {
    /// Key this result refers to.
    pub key: XorName,
    /// Presence flag for `key` (presumably whether the responder holds it).
    pub present: bool,
    /// Optional paid flag; presumably `None` when the paid-list check was not
    /// requested for this key — TODO confirm.
    pub paid: Option<bool>,
}
/// Response to a [`VerificationRequest`]: a list of per-key results.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VerificationResponse {
    /// One [`KeyVerificationResult`] per reported key.
    pub results: Vec<KeyVerificationResult>,
}
/// Request for the data stored under a single key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FetchRequest {
    /// Key being requested.
    pub key: XorName,
}
/// Reply to a [`FetchRequest`]; every variant names the key concerned.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FetchResponse {
    /// The data for `key` is returned.
    Success {
        /// Key the response refers to.
        key: XorName,
        /// Bytes returned for `key`.
        data: Vec<u8>,
    },
    /// No data was found for `key`.
    NotFound {
        /// Key the response refers to.
        key: XorName,
    },
    /// The fetch failed for the stated reason.
    Error {
        /// Key the response refers to.
        key: XorName,
        /// Free-form description of the failure.
        reason: String,
    },
}
/// Audit challenge sent to a peer.
///
/// NOTE(review): `nonce` and `challenged_peer_id` match the first two inputs
/// of `compute_audit_digest`, so the responder presumably feeds them into
/// that function for each key — confirm against the audit handler.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuditChallenge {
    /// Identifier of this challenge; the [`AuditResponse`] variants carry a
    /// field of the same name.
    pub challenge_id: u64,
    /// 32-byte nonce for this challenge.
    pub nonce: [u8; 32],
    /// 32-byte identifier of the peer being challenged.
    pub challenged_peer_id: [u8; 32],
    /// Keys included in the challenge.
    pub keys: Vec<XorName>,
}
/// Reply to an [`AuditChallenge`]; every variant carries the `challenge_id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AuditResponse {
    /// The responder returns a list of 32-byte digests.
    Digests {
        /// Identifier of the challenge being answered.
        challenge_id: u64,
        /// Digests in response to the challenge (presumably one per
        /// challenged key, in the same order — TODO confirm).
        digests: Vec<[u8; 32]>,
    },
    /// The responder reports that it is bootstrapping instead of answering.
    Bootstrapping {
        /// Identifier of the challenge being answered.
        challenge_id: u64,
    },
    /// The responder refuses the challenge for the stated reason.
    Rejected {
        /// Identifier of the challenge being answered.
        challenge_id: u64,
        /// Free-form explanation of the rejection.
        reason: String,
    },
}
/// Computes the BLAKE3 hash over the concatenation
/// `nonce || challenged_peer_id || key || record_bytes`, in that order.
///
/// Returns the 32-byte hash output.
#[must_use]
pub fn compute_audit_digest(
    nonce: &[u8; 32],
    challenged_peer_id: &[u8; 32],
    key: &XorName,
    record_bytes: &[u8],
) -> [u8; 32] {
    let mut state = blake3::Hasher::new();
    // Feed the inputs in a fixed order; `update` hands back the hasher so the
    // calls can be chained.
    state
        .update(nonce)
        .update(challenged_peer_id)
        .update(key)
        .update(record_bytes);
    let digest = state.finalize();
    *digest.as_bytes()
}
/// Errors produced while encoding or decoding replication messages.
#[derive(Debug, Clone)]
pub enum ReplicationProtocolError {
    /// Serialization failed; carries the underlying error text.
    SerializationFailed(String),
    /// Deserialization failed; carries the underlying error text.
    DeserializationFailed(String),
    /// A message of `size` bytes exceeded the `max_size` limit.
    MessageTooLarge {
        /// Observed size in bytes.
        size: usize,
        /// Limit the size was compared against.
        max_size: usize,
    },
}

impl std::error::Error for ReplicationProtocolError {}

impl std::fmt::Display for ReplicationProtocolError {
    /// Writes a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::MessageTooLarge { size, max_size } => f.write_fmt(format_args!(
                "replication message size {size} exceeds maximum {max_size}"
            )),
            Self::DeserializationFailed(msg) => {
                f.write_fmt(format_args!("replication deserialization failed: {msg}"))
            }
            Self::SerializationFailed(msg) => {
                f.write_fmt(format_args!("replication serialization failed: {msg}"))
            }
        }
    }
}
/// Unit tests: encode/decode roundtrips for every message variant, size-limit
/// and malformed-input rejection, audit digest properties, and error display.
#[cfg(test)]
// Tests fail loudly on purpose, so `unwrap`/`expect`/`panic!` are allowed here.
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    // --- Roundtrip tests: one per wire variant -----------------------------

    #[test]
    fn fresh_replication_offer_roundtrip() {
        let msg = ReplicationMessage {
            request_id: 1,
            body: ReplicationMessageBody::FreshReplicationOffer(FreshReplicationOffer {
                key: [0xAA; 32],
                data: vec![1, 2, 3, 4, 5],
                proof_of_payment: vec![10, 20, 30],
            }),
        };
        let encoded = msg.encode().expect("encode should succeed");
        let decoded = ReplicationMessage::decode(&encoded).expect("decode should succeed");
        assert_eq!(decoded.request_id, 1);
        if let ReplicationMessageBody::FreshReplicationOffer(offer) = decoded.body {
            assert_eq!(offer.key, [0xAA; 32]);
            assert_eq!(offer.data, vec![1, 2, 3, 4, 5]);
            assert_eq!(offer.proof_of_payment, vec![10, 20, 30]);
        } else {
            panic!("expected FreshReplicationOffer");
        }
    }

    #[test]
    fn fresh_replication_response_accepted_roundtrip() {
        let msg = ReplicationMessage {
            request_id: 2,
            body: ReplicationMessageBody::FreshReplicationResponse(
                FreshReplicationResponse::Accepted { key: [0xBB; 32] },
            ),
        };
        let encoded = msg.encode().expect("encode should succeed");
        let decoded = ReplicationMessage::decode(&encoded).expect("decode should succeed");
        assert_eq!(decoded.request_id, 2);
        if let ReplicationMessageBody::FreshReplicationResponse(
            FreshReplicationResponse::Accepted { key },
        ) = decoded.body
        {
            assert_eq!(key, [0xBB; 32]);
        } else {
            panic!("expected FreshReplicationResponse::Accepted");
        }
    }

    #[test]
    fn fresh_replication_response_rejected_roundtrip() {
        let msg = ReplicationMessage {
            request_id: 3,
            body: ReplicationMessageBody::FreshReplicationResponse(
                FreshReplicationResponse::Rejected {
                    key: [0xCC; 32],
                    reason: "out of range".to_string(),
                },
            ),
        };
        let encoded = msg.encode().expect("encode should succeed");
        let decoded = ReplicationMessage::decode(&encoded).expect("decode should succeed");
        assert_eq!(decoded.request_id, 3);
        if let ReplicationMessageBody::FreshReplicationResponse(
            FreshReplicationResponse::Rejected { key, reason },
        ) = decoded.body
        {
            assert_eq!(key, [0xCC; 32]);
            assert_eq!(reason, "out of range");
        } else {
            panic!("expected FreshReplicationResponse::Rejected");
        }
    }

    #[test]
    fn paid_notify_roundtrip() {
        let msg = ReplicationMessage {
            request_id: 4,
            body: ReplicationMessageBody::PaidNotify(PaidNotify {
                key: [0xDD; 32],
                proof_of_payment: vec![99, 100],
            }),
        };
        let encoded = msg.encode().expect("encode should succeed");
        let decoded = ReplicationMessage::decode(&encoded).expect("decode should succeed");
        assert_eq!(decoded.request_id, 4);
        if let ReplicationMessageBody::PaidNotify(notify) = decoded.body {
            assert_eq!(notify.key, [0xDD; 32]);
            assert_eq!(notify.proof_of_payment, vec![99, 100]);
        } else {
            panic!("expected PaidNotify");
        }
    }

    #[test]
    fn neighbor_sync_request_roundtrip() {
        let msg = ReplicationMessage {
            request_id: 5,
            body: ReplicationMessageBody::NeighborSyncRequest(NeighborSyncRequest {
                replica_hints: vec![[0x01; 32], [0x02; 32]],
                paid_hints: vec![[0x03; 32]],
                bootstrapping: true,
            }),
        };
        let encoded = msg.encode().expect("encode should succeed");
        let decoded = ReplicationMessage::decode(&encoded).expect("decode should succeed");
        assert_eq!(decoded.request_id, 5);
        if let ReplicationMessageBody::NeighborSyncRequest(req) = decoded.body {
            assert_eq!(req.replica_hints.len(), 2);
            assert_eq!(req.paid_hints.len(), 1);
            assert!(req.bootstrapping);
        } else {
            panic!("expected NeighborSyncRequest");
        }
    }

    #[test]
    fn neighbor_sync_response_roundtrip() {
        let msg = ReplicationMessage {
            request_id: 6,
            body: ReplicationMessageBody::NeighborSyncResponse(NeighborSyncResponse {
                replica_hints: vec![[0x04; 32]],
                paid_hints: vec![],
                bootstrapping: false,
                rejected_keys: vec![[0x05; 32], [0x06; 32]],
            }),
        };
        let encoded = msg.encode().expect("encode should succeed");
        let decoded = ReplicationMessage::decode(&encoded).expect("decode should succeed");
        assert_eq!(decoded.request_id, 6);
        if let ReplicationMessageBody::NeighborSyncResponse(resp) = decoded.body {
            assert_eq!(resp.replica_hints.len(), 1);
            assert!(resp.paid_hints.is_empty());
            assert!(!resp.bootstrapping);
            assert_eq!(resp.rejected_keys.len(), 2);
        } else {
            panic!("expected NeighborSyncResponse");
        }
    }

    #[test]
    fn verification_request_roundtrip() {
        let msg = ReplicationMessage {
            request_id: 7,
            body: ReplicationMessageBody::VerificationRequest(VerificationRequest {
                keys: vec![[0x10; 32], [0x20; 32], [0x30; 32]],
                paid_list_check_indices: vec![0, 2],
            }),
        };
        let encoded = msg.encode().expect("encode should succeed");
        let decoded = ReplicationMessage::decode(&encoded).expect("decode should succeed");
        assert_eq!(decoded.request_id, 7);
        if let ReplicationMessageBody::VerificationRequest(req) = decoded.body {
            assert_eq!(req.keys.len(), 3);
            assert_eq!(req.paid_list_check_indices, vec![0, 2]);
        } else {
            panic!("expected VerificationRequest");
        }
    }

    #[test]
    fn verification_response_roundtrip() {
        let results = vec![
            KeyVerificationResult {
                key: [0x10; 32],
                present: true,
                paid: Some(true),
            },
            KeyVerificationResult {
                key: [0x20; 32],
                present: false,
                paid: None,
            },
            KeyVerificationResult {
                key: [0x30; 32],
                present: true,
                paid: Some(false),
            },
        ];
        let msg = ReplicationMessage {
            request_id: 8,
            body: ReplicationMessageBody::VerificationResponse(VerificationResponse { results }),
        };
        let encoded = msg.encode().expect("encode should succeed");
        let decoded = ReplicationMessage::decode(&encoded).expect("decode should succeed");
        assert_eq!(decoded.request_id, 8);
        if let ReplicationMessageBody::VerificationResponse(resp) = decoded.body {
            assert_eq!(resp.results.len(), 3);
            assert!(resp.results[0].present);
            assert_eq!(resp.results[0].paid, Some(true));
            assert!(!resp.results[1].present);
            assert_eq!(resp.results[1].paid, None);
            assert!(resp.results[2].present);
            assert_eq!(resp.results[2].paid, Some(false));
        } else {
            panic!("expected VerificationResponse");
        }
    }

    #[test]
    fn fetch_request_roundtrip() {
        let msg = ReplicationMessage {
            request_id: 9,
            body: ReplicationMessageBody::FetchRequest(FetchRequest { key: [0x40; 32] }),
        };
        let encoded = msg.encode().expect("encode should succeed");
        let decoded = ReplicationMessage::decode(&encoded).expect("decode should succeed");
        assert_eq!(decoded.request_id, 9);
        if let ReplicationMessageBody::FetchRequest(req) = decoded.body {
            assert_eq!(req.key, [0x40; 32]);
        } else {
            panic!("expected FetchRequest");
        }
    }

    #[test]
    fn fetch_response_success_roundtrip() {
        let msg = ReplicationMessage {
            request_id: 10,
            body: ReplicationMessageBody::FetchResponse(FetchResponse::Success {
                key: [0x50; 32],
                data: vec![7, 8, 9],
            }),
        };
        let encoded = msg.encode().expect("encode should succeed");
        let decoded = ReplicationMessage::decode(&encoded).expect("decode should succeed");
        assert_eq!(decoded.request_id, 10);
        if let ReplicationMessageBody::FetchResponse(FetchResponse::Success { key, data }) =
            decoded.body
        {
            assert_eq!(key, [0x50; 32]);
            assert_eq!(data, vec![7, 8, 9]);
        } else {
            panic!("expected FetchResponse::Success");
        }
    }

    #[test]
    fn fetch_response_not_found_roundtrip() {
        let msg = ReplicationMessage {
            request_id: 11,
            body: ReplicationMessageBody::FetchResponse(FetchResponse::NotFound {
                key: [0x60; 32],
            }),
        };
        let encoded = msg.encode().expect("encode should succeed");
        let decoded = ReplicationMessage::decode(&encoded).expect("decode should succeed");
        assert_eq!(decoded.request_id, 11);
        if let ReplicationMessageBody::FetchResponse(FetchResponse::NotFound { key }) = decoded.body
        {
            assert_eq!(key, [0x60; 32]);
        } else {
            panic!("expected FetchResponse::NotFound");
        }
    }

    #[test]
    fn fetch_response_error_roundtrip() {
        let msg = ReplicationMessage {
            request_id: 12,
            body: ReplicationMessageBody::FetchResponse(FetchResponse::Error {
                key: [0x70; 32],
                reason: "disk full".to_string(),
            }),
        };
        let encoded = msg.encode().expect("encode should succeed");
        let decoded = ReplicationMessage::decode(&encoded).expect("decode should succeed");
        assert_eq!(decoded.request_id, 12);
        if let ReplicationMessageBody::FetchResponse(FetchResponse::Error { key, reason }) =
            decoded.body
        {
            assert_eq!(key, [0x70; 32]);
            assert_eq!(reason, "disk full");
        } else {
            panic!("expected FetchResponse::Error");
        }
    }

    #[test]
    fn audit_challenge_roundtrip() {
        let msg = ReplicationMessage {
            request_id: 13,
            body: ReplicationMessageBody::AuditChallenge(AuditChallenge {
                challenge_id: 999,
                nonce: [0xAB; 32],
                challenged_peer_id: [0xCD; 32],
                keys: vec![[0x01; 32], [0x02; 32]],
            }),
        };
        let encoded = msg.encode().expect("encode should succeed");
        let decoded = ReplicationMessage::decode(&encoded).expect("decode should succeed");
        assert_eq!(decoded.request_id, 13);
        if let ReplicationMessageBody::AuditChallenge(challenge) = decoded.body {
            assert_eq!(challenge.challenge_id, 999);
            assert_eq!(challenge.nonce, [0xAB; 32]);
            assert_eq!(challenge.challenged_peer_id, [0xCD; 32]);
            assert_eq!(challenge.keys.len(), 2);
        } else {
            panic!("expected AuditChallenge");
        }
    }

    #[test]
    fn audit_response_digests_roundtrip() {
        let digests = vec![[0x11; 32], ABSENT_KEY_DIGEST];
        let msg = ReplicationMessage {
            request_id: 14,
            body: ReplicationMessageBody::AuditResponse(AuditResponse::Digests {
                challenge_id: 999,
                digests: digests.clone(),
            }),
        };
        let encoded = msg.encode().expect("encode should succeed");
        let decoded = ReplicationMessage::decode(&encoded).expect("decode should succeed");
        assert_eq!(decoded.request_id, 14);
        if let ReplicationMessageBody::AuditResponse(AuditResponse::Digests {
            challenge_id,
            digests: decoded_digests,
        }) = decoded.body
        {
            assert_eq!(challenge_id, 999);
            assert_eq!(decoded_digests, digests);
        } else {
            panic!("expected AuditResponse::Digests");
        }
    }

    #[test]
    fn audit_response_bootstrapping_roundtrip() {
        let msg = ReplicationMessage {
            request_id: 15,
            body: ReplicationMessageBody::AuditResponse(AuditResponse::Bootstrapping {
                challenge_id: 42,
            }),
        };
        let encoded = msg.encode().expect("encode should succeed");
        let decoded = ReplicationMessage::decode(&encoded).expect("decode should succeed");
        assert_eq!(decoded.request_id, 15);
        if let ReplicationMessageBody::AuditResponse(AuditResponse::Bootstrapping {
            challenge_id,
        }) = decoded.body
        {
            assert_eq!(challenge_id, 42);
        } else {
            panic!("expected AuditResponse::Bootstrapping");
        }
    }

    // Previously the only wire variant without a roundtrip test.
    #[test]
    fn audit_response_rejected_roundtrip() {
        let msg = ReplicationMessage {
            request_id: 16,
            body: ReplicationMessageBody::AuditResponse(AuditResponse::Rejected {
                challenge_id: 7,
                reason: "unknown challenger".to_string(),
            }),
        };
        let encoded = msg.encode().expect("encode should succeed");
        let decoded = ReplicationMessage::decode(&encoded).expect("decode should succeed");
        assert_eq!(decoded.request_id, 16);
        if let ReplicationMessageBody::AuditResponse(AuditResponse::Rejected {
            challenge_id,
            reason,
        }) = decoded.body
        {
            assert_eq!(challenge_id, 7);
            assert_eq!(reason, "unknown challenger");
        } else {
            panic!("expected AuditResponse::Rejected");
        }
    }

    // --- Size limit and malformed input ------------------------------------

    #[test]
    fn decode_rejects_oversized_payload() {
        let oversized = vec![0u8; MAX_REPLICATION_MESSAGE_SIZE + 1];
        let result = ReplicationMessage::decode(&oversized);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            matches!(err, ReplicationProtocolError::MessageTooLarge { .. }),
            "expected MessageTooLarge, got {err:?}"
        );
    }

    #[test]
    fn encode_rejects_oversized_message() {
        // The data alone fills the limit, so the encoded message must exceed it.
        let msg = ReplicationMessage {
            request_id: 0,
            body: ReplicationMessageBody::FreshReplicationOffer(FreshReplicationOffer {
                key: [0; 32],
                data: vec![0xFF; MAX_REPLICATION_MESSAGE_SIZE],
                proof_of_payment: vec![],
            }),
        };
        let result = msg.encode();
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            matches!(err, ReplicationProtocolError::MessageTooLarge { .. }),
            "expected MessageTooLarge, got {err:?}"
        );
    }

    #[test]
    fn decode_rejects_invalid_data() {
        let invalid = vec![0xFF, 0xFF, 0xFF];
        let result = ReplicationMessage::decode(&invalid);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            matches!(err, ReplicationProtocolError::DeserializationFailed(_)),
            "expected DeserializationFailed, got {err:?}"
        );
    }

    // --- Audit digest properties -------------------------------------------

    #[test]
    fn audit_digest_is_deterministic() {
        let nonce = [0x01; 32];
        let peer_id = [0x02; 32];
        let key: XorName = [0x03; 32];
        let record_bytes = b"hello world";
        let digest_a = compute_audit_digest(&nonce, &peer_id, &key, record_bytes);
        let digest_b = compute_audit_digest(&nonce, &peer_id, &key, record_bytes);
        assert_eq!(digest_a, digest_b, "same inputs must produce same digest");
    }

    #[test]
    fn audit_digest_differs_with_different_nonce() {
        let peer_id = [0x02; 32];
        let key: XorName = [0x03; 32];
        let record_bytes = b"hello world";
        let digest_a = compute_audit_digest(&[0x01; 32], &peer_id, &key, record_bytes);
        let digest_b = compute_audit_digest(&[0xFF; 32], &peer_id, &key, record_bytes);
        assert_ne!(
            digest_a, digest_b,
            "different nonces must produce different digests"
        );
    }

    #[test]
    fn audit_digest_differs_with_different_data() {
        let nonce = [0x01; 32];
        let peer_id = [0x02; 32];
        let key: XorName = [0x03; 32];
        let digest_a = compute_audit_digest(&nonce, &peer_id, &key, b"data-A");
        let digest_b = compute_audit_digest(&nonce, &peer_id, &key, b"data-B");
        assert_ne!(
            digest_a, digest_b,
            "different data must produce different digests"
        );
    }

    #[test]
    fn audit_digest_differs_with_different_peer() {
        let nonce = [0x01; 32];
        let key: XorName = [0x03; 32];
        let record_bytes = b"hello";
        let digest_a = compute_audit_digest(&nonce, &[0x02; 32], &key, record_bytes);
        let digest_b = compute_audit_digest(&nonce, &[0xFF; 32], &key, record_bytes);
        assert_ne!(
            digest_a, digest_b,
            "different peer IDs must produce different digests"
        );
    }

    #[test]
    fn audit_digest_differs_with_different_key() {
        let nonce = [0x01; 32];
        let peer_id = [0x02; 32];
        let record_bytes = b"hello";
        let digest_a = compute_audit_digest(&nonce, &peer_id, &[0x03; 32], record_bytes);
        let digest_b = compute_audit_digest(&nonce, &peer_id, &[0xFF; 32], record_bytes);
        assert_ne!(
            digest_a, digest_b,
            "different keys must produce different digests"
        );
    }

    #[test]
    fn absent_key_digest_is_all_zeros() {
        assert_eq!(ABSENT_KEY_DIGEST, [0u8; 32]);
    }

    #[test]
    fn real_digest_differs_from_absent_sentinel() {
        let nonce = [0x01; 32];
        let peer_id = [0x02; 32];
        let key: XorName = [0x03; 32];
        let record_bytes = b"non-empty data";
        let digest = compute_audit_digest(&nonce, &peer_id, &key, record_bytes);
        assert_ne!(
            digest, ABSENT_KEY_DIGEST,
            "a real digest should not collide with the all-zeros sentinel"
        );
    }

    // --- Error display -----------------------------------------------------

    #[test]
    fn error_display_serialization_failed() {
        let err = ReplicationProtocolError::SerializationFailed("boom".to_string());
        assert_eq!(err.to_string(), "replication serialization failed: boom");
    }

    #[test]
    fn error_display_deserialization_failed() {
        let err = ReplicationProtocolError::DeserializationFailed("bad data".to_string());
        assert_eq!(
            err.to_string(),
            "replication deserialization failed: bad data"
        );
    }

    #[test]
    fn error_display_message_too_large() {
        let err = ReplicationProtocolError::MessageTooLarge {
            size: 20_000_000,
            max_size: MAX_REPLICATION_MESSAGE_SIZE,
        };
        let display = err.to_string();
        assert!(display.contains("20000000"));
        assert!(display.contains(&MAX_REPLICATION_MESSAGE_SIZE.to_string()));
    }
}