use crate::{
authenticated::data::{Data, EncodedData},
Channel, Ingress,
};
use commonware_codec::{
config::RangeCfg, varint::UInt, Encode, EncodeSize, Error as CodecError, Read, ReadExt, Write,
};
use commonware_cryptography::{PublicKey, Signer};
use commonware_runtime::{Buf, BufMut, BufferPool, Clock, IoBufs};
use commonware_utils::SystemTimeExt;
use std::time::Duration;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum Error {
#[error("too many peers: {0}")]
TooManyPeers(usize),
#[error("received self")]
ReceivedSelf,
#[error("invalid signature")]
InvalidSignature,
#[error("synchrony bound violated")]
SynchronyBound,
}
pub const MAX_PAYLOAD_DATA_OVERHEAD: u32 = 1 + 10 + 5;
const DATA_PREFIX: u8 = 0;
const GREETING_PREFIX: u8 = 1;
const BIT_VEC_PREFIX: u8 = 2;
const PEERS_PREFIX: u8 = 3;
type BitMap = commonware_utils::bitmap::BitMap<1>;
#[derive(Clone)]
pub struct PayloadConfig {
pub max_bit_vec: u64,
pub max_peers: usize,
pub max_data_length: usize,
}
#[derive(Clone, Debug)]
pub enum Payload<C: PublicKey> {
Data(Data),
Greeting(Info<C>),
BitVec(BitVec),
Peers(Vec<Info<C>>),
}
impl<C: PublicKey> Payload<C> {
pub(crate) fn encode_data(pool: &BufferPool, channel: Channel, message: IoBufs) -> EncodedData {
EncodedData::new(pool, DATA_PREFIX, channel, message)
}
}
impl<C: PublicKey> EncodeSize for Payload<C> {
fn encode_size(&self) -> usize {
(match self {
Self::Data(data) => data.encode_size(),
Self::Greeting(info) => info.encode_size(),
Self::BitVec(bit_vec) => bit_vec.encode_size(),
Self::Peers(peers) => peers.encode_size(),
}) + 1
}
}
impl<C: PublicKey> Write for Payload<C> {
fn write(&self, buf: &mut impl BufMut) {
match self {
Self::Data(data) => {
DATA_PREFIX.write(buf);
data.write(buf);
}
Self::Greeting(info) => {
GREETING_PREFIX.write(buf);
info.write(buf);
}
Self::BitVec(bit_vec) => {
BIT_VEC_PREFIX.write(buf);
bit_vec.write(buf);
}
Self::Peers(peers) => {
PEERS_PREFIX.write(buf);
peers.write(buf);
}
}
}
}
impl<C: PublicKey> Read for Payload<C> {
type Cfg = PayloadConfig;
fn read_cfg(buf: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, CodecError> {
let PayloadConfig {
max_bit_vec,
max_peers,
max_data_length,
} = cfg;
let payload_type = <u8>::read(buf)?;
match payload_type {
DATA_PREFIX => {
let data = Data::read_cfg(buf, &(..=*max_data_length).into())?;
Ok(Self::Data(data))
}
GREETING_PREFIX => {
let info = Info::<C>::read(buf)?;
Ok(Self::Greeting(info))
}
BIT_VEC_PREFIX => {
let bit_vec = BitVec::read_cfg(buf, max_bit_vec)?;
Ok(Self::BitVec(bit_vec))
}
PEERS_PREFIX => {
let peers = Vec::<Info<C>>::read_cfg(buf, &(RangeCfg::new(..=*max_peers), ()))?;
Ok(Self::Peers(peers))
}
other => Err(CodecError::InvalidEnum(other)),
}
}
}
#[cfg(feature = "arbitrary")]
impl<C: PublicKey> arbitrary::Arbitrary<'_> for Payload<C>
where
C: for<'a> arbitrary::Arbitrary<'a>,
C::Signature: for<'a> arbitrary::Arbitrary<'a>,
{
fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
let choice = u.int_in_range(0..=3)?;
match choice {
0 => Ok(Self::Data(u.arbitrary()?)),
1 => Ok(Self::Greeting(u.arbitrary()?)),
2 => Ok(Self::BitVec(u.arbitrary()?)),
3 => Ok(Self::Peers(u.arbitrary()?)),
_ => unreachable!(),
}
}
}
#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub struct BitVec {
pub index: u64,
pub bits: BitMap,
}
impl EncodeSize for BitVec {
fn encode_size(&self) -> usize {
UInt(self.index).encode_size() + self.bits.encode_size()
}
}
impl Write for BitVec {
fn write(&self, buf: &mut impl BufMut) {
UInt(self.index).write(buf);
self.bits.write(buf);
}
}
impl Read for BitVec {
type Cfg = u64;
fn read_cfg(buf: &mut impl Buf, max_bits: &u64) -> Result<Self, CodecError> {
let index = UInt::read(buf)?.into();
let bits = BitMap::read_cfg(buf, max_bits)?;
Ok(Self { index, bits })
}
}
#[derive(Clone, Debug)]
pub struct Info<C: PublicKey> {
pub ingress: Ingress,
pub timestamp: u64,
pub public_key: C,
pub signature: C::Signature,
}
impl<C: PublicKey> Info<C> {
pub fn verify(&self, namespace: &[u8]) -> bool {
self.public_key.verify(
namespace,
&(self.ingress.clone(), self.timestamp).encode(),
&self.signature,
)
}
pub const fn verifier(
me: C,
peer_gossip_max_count: usize,
synchrony_bound: Duration,
ip_namespace: Vec<u8>,
) -> InfoVerifier<C> {
InfoVerifier::new(me, peer_gossip_max_count, synchrony_bound, ip_namespace)
}
pub fn sign<Sk: Signer<PublicKey = C, Signature = C::Signature>>(
signer: &Sk,
namespace: &[u8],
ingress: impl Into<Ingress>,
timestamp: u64,
) -> Self {
let ingress = ingress.into();
let signature = signer.sign(namespace, &(ingress.clone(), timestamp).encode());
Self {
ingress,
timestamp,
public_key: signer.public_key(),
signature,
}
}
}
impl<C: PublicKey> EncodeSize for Info<C> {
fn encode_size(&self) -> usize {
self.ingress.encode_size()
+ UInt(self.timestamp).encode_size()
+ self.public_key.encode_size()
+ self.signature.encode_size()
}
}
impl<C: PublicKey> Write for Info<C> {
fn write(&self, buf: &mut impl BufMut) {
self.ingress.write(buf);
UInt(self.timestamp).write(buf);
self.public_key.write(buf);
self.signature.write(buf);
}
}
impl<C: PublicKey> Read for Info<C> {
type Cfg = ();
fn read_cfg(buf: &mut impl Buf, _cfg: &Self::Cfg) -> Result<Self, CodecError> {
let ingress = Ingress::read(buf)?;
let timestamp = UInt::read(buf)?.into();
let public_key = C::read(buf)?;
let signature = C::Signature::read(buf)?;
Ok(Self {
ingress,
timestamp,
public_key,
signature,
})
}
}
#[cfg(feature = "arbitrary")]
impl<C: PublicKey> arbitrary::Arbitrary<'_> for Info<C>
where
C: for<'a> arbitrary::Arbitrary<'a>,
C::Signature: for<'a> arbitrary::Arbitrary<'a>,
{
fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
let ingress = u.arbitrary()?;
let timestamp = u.arbitrary()?;
let public_key = u.arbitrary()?;
let signature = u.arbitrary()?;
Ok(Self {
ingress,
timestamp,
public_key,
signature,
})
}
}
#[derive(Clone)]
pub struct InfoVerifier<C: PublicKey> {
me: C,
peer_gossip_max_count: usize,
synchrony_bound: Duration,
ip_namespace: Vec<u8>,
}
impl<C: PublicKey> InfoVerifier<C> {
const fn new(
me: C,
peer_gossip_max_count: usize,
synchrony_bound: Duration,
ip_namespace: Vec<u8>,
) -> Self {
Self {
me,
peer_gossip_max_count,
synchrony_bound,
ip_namespace,
}
}
pub fn validate(&self, clock: &impl Clock, infos: &[Info<C>]) -> Result<(), Error> {
if infos.len() > self.peer_gossip_max_count {
return Err(Error::TooManyPeers(infos.len()));
}
for info in infos {
if info.public_key == self.me {
return Err(Error::ReceivedSelf);
}
if Duration::from_millis(info.timestamp)
> clock.current().epoch().saturating_add(self.synchrony_bound)
{
return Err(Error::SynchronyBound);
}
if !info.verify(self.ip_namespace.as_ref()) {
return Err(Error::InvalidSignature);
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use commonware_codec::{Decode, DecodeExt};
use commonware_cryptography::secp256r1::standard::{PrivateKey, PublicKey};
use commonware_math::algebra::Random;
use commonware_runtime::{deterministic, Clock, IoBuf, Runner};
use commonware_utils::{hostname, test_rng};
use std::{net::SocketAddr, time::Duration};
const NAMESPACE: &[u8] = b"test";
fn signed_peer_info(rng: &mut impl rand_core::CryptoRngCore) -> Info<PublicKey> {
let c = PrivateKey::random(rng);
Info {
ingress: Ingress::Socket(SocketAddr::from(([127, 0, 0, 1], 8080))),
timestamp: 1234567890,
public_key: c.public_key(),
signature: c.sign(NAMESPACE, &[1, 2, 3, 4, 5]),
}
}
#[test]
fn test_bit_vec_codec() {
let original = BitVec {
index: 1234,
bits: BitMap::ones(71),
};
let decoded = BitVec::decode_cfg(original.encode(), &71).unwrap();
assert_eq!(original, decoded);
}
#[test]
fn test_signed_peer_info_codec() {
let mut rng = test_rng();
let original = vec![
signed_peer_info(&mut rng),
signed_peer_info(&mut rng),
signed_peer_info(&mut rng),
];
let encoded = original.encode();
let decoded =
Vec::<Info<PublicKey>>::decode_cfg(encoded, &(RangeCfg::new(3..=3), ())).unwrap();
for (original, decoded) in original.iter().zip(decoded.iter()) {
assert_eq!(original.ingress, decoded.ingress);
assert_eq!(original.timestamp, decoded.timestamp);
assert_eq!(original.public_key, decoded.public_key);
assert_eq!(original.signature, decoded.signature);
}
let too_short =
Vec::<Info<PublicKey>>::decode_cfg(original.encode(), &(RangeCfg::new(..3), ()));
assert!(matches!(too_short, Err(CodecError::InvalidLength(3))));
let too_long =
Vec::<Info<PublicKey>>::decode_cfg(original.encode(), &(RangeCfg::new(4..), ()));
assert!(matches!(too_long, Err(CodecError::InvalidLength(3))));
}
#[test]
fn test_payload_codec() {
let mut rng = test_rng();
let cfg = PayloadConfig {
max_bit_vec: 1024,
max_peers: 10,
max_data_length: 100,
};
let original = signed_peer_info(&mut rng);
let encoded = Payload::Greeting(original.clone()).encode();
let decoded = match Payload::<PublicKey>::decode_cfg(encoded, &cfg) {
Ok(Payload::<PublicKey>::Greeting(info)) => info,
_ => panic!("Expected Greeting payload"),
};
assert_eq!(original.ingress, decoded.ingress);
assert_eq!(original.timestamp, decoded.timestamp);
assert_eq!(original.public_key, decoded.public_key);
assert_eq!(original.signature, decoded.signature);
let original = BitVec {
index: 1234,
bits: BitMap::ones(100),
};
let encoded = Payload::<PublicKey>::BitVec(original.clone()).encode();
let decoded = match Payload::<PublicKey>::decode_cfg(encoded, &cfg) {
Ok(Payload::<PublicKey>::BitVec(b)) => b,
_ => panic!(),
};
assert_eq!(original, decoded);
let original = vec![signed_peer_info(&mut rng), signed_peer_info(&mut rng)];
let encoded = Payload::Peers(original.clone()).encode();
let decoded = match Payload::<PublicKey>::decode_cfg(encoded, &cfg) {
Ok(Payload::<PublicKey>::Peers(p)) => p,
_ => panic!(),
};
for (a, b) in original.iter().zip(decoded.iter()) {
assert_eq!(a.ingress, b.ingress);
assert_eq!(a.timestamp, b.timestamp);
assert_eq!(a.public_key, b.public_key);
assert_eq!(a.signature, b.signature);
}
let original = Data {
channel: 12345,
message: IoBuf::from(b"Hello, world!"),
};
let encoded = Payload::<PublicKey>::Data(original.clone()).encode();
let decoded = match Payload::<PublicKey>::decode_cfg(encoded, &cfg) {
Ok(Payload::<PublicKey>::Data(d)) => d,
_ => panic!(),
};
assert_eq!(original, decoded);
}
#[test]
fn test_payload_decode_invalid_type() {
let cfg = PayloadConfig {
max_bit_vec: 1024,
max_peers: 10,
max_data_length: 100,
};
let invalid_payload = [4, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let result = Payload::<PublicKey>::decode_cfg(&invalid_payload[..], &cfg);
assert!(result.is_err());
}
#[test]
fn test_payload_bitvec_respects_limit() {
let cfg = PayloadConfig {
max_bit_vec: 8,
max_peers: 10,
max_data_length: 32,
};
let encoded = Payload::<PublicKey>::BitVec(BitVec {
index: 5,
bits: BitMap::ones(9),
})
.encode();
let err = Payload::<PublicKey>::decode_cfg(encoded, &cfg).unwrap_err();
assert!(matches!(err, CodecError::InvalidLength(9)));
}
#[test]
fn test_payload_peers_respects_limit() {
let mut rng = test_rng();
let cfg = PayloadConfig {
max_bit_vec: 1024,
max_peers: 1,
max_data_length: 32,
};
let peers = vec![signed_peer_info(&mut rng), signed_peer_info(&mut rng)];
let encoded = Payload::Peers(peers).encode();
let err = Payload::<PublicKey>::decode_cfg(encoded, &cfg).unwrap_err();
assert!(matches!(err, CodecError::InvalidLength(2)));
}
#[test]
fn test_payload_data_respects_limit() {
let cfg = PayloadConfig {
max_bit_vec: 1024,
max_peers: 10,
max_data_length: 4,
};
let encoded = Payload::<PublicKey>::Data(Data {
channel: 1,
message: IoBuf::from(b"hello"),
})
.encode();
let err = Payload::<PublicKey>::decode_cfg(encoded, &cfg).unwrap_err();
assert!(matches!(err, CodecError::InvalidLength(5)));
}
#[test]
fn test_max_payload_data_overhead() {
let message = IoBuf::from(vec![0; 1 << 29]);
let message_len = message.len();
let payload = Payload::<PublicKey>::Data(Data {
channel: u64::MAX,
message,
});
assert_eq!(
payload.encode_size(),
message_len + MAX_PAYLOAD_DATA_OVERHEAD as usize
);
}
#[test]
fn info_verifier_accepts_valid_peer() {
let executor = deterministic::Runner::default();
executor.start(|mut context| async move {
let validator_key = PrivateKey::random(&mut context);
let peer_key = PrivateKey::random(&mut context);
let validator = Info::verifier(
validator_key.public_key(),
4,
Duration::from_secs(30),
NAMESPACE.to_vec(),
);
let timestamp = context.current().epoch().as_millis() as u64;
let peer = Info::sign(
&peer_key,
NAMESPACE,
SocketAddr::from(([8, 8, 8, 8], 8080)),
timestamp,
);
assert!(validator.validate(&context, &[peer]).is_ok());
});
}
#[test]
fn info_verifier_rejects_too_many_peers() {
let executor = deterministic::Runner::default();
executor.start(|mut context| async move {
let validator_key = PrivateKey::random(&mut context);
let synchrony_bound = Duration::from_secs(30);
let timestamp = context.current().epoch().as_millis() as u64;
let peers = {
let addr_a = SocketAddr::from(([8, 8, 8, 8], 9000));
let addr_b = SocketAddr::from(([8, 8, 4, 4], 9001));
let peer_a = Info::sign(
&PrivateKey::random(&mut context),
NAMESPACE,
addr_a,
timestamp,
);
let peer_b = Info::sign(
&PrivateKey::random(&mut context),
NAMESPACE,
addr_b,
timestamp,
);
vec![peer_a, peer_b]
};
let validator = Info::verifier(
validator_key.public_key(),
1,
synchrony_bound,
NAMESPACE.to_vec(),
);
let err = validator.validate(&context, &peers).unwrap_err();
assert!(matches!(err, Error::TooManyPeers(count) if count == 2));
});
}
#[test]
fn info_verifier_rejects_self() {
let executor = deterministic::Runner::default();
executor.start(|mut context| async move {
let validator_key = PrivateKey::random(&mut context);
let validator = Info::verifier(
validator_key.public_key(),
4,
Duration::from_secs(30),
NAMESPACE.to_vec(),
);
let timestamp = context.current().epoch().as_millis() as u64;
let peer = Info::sign(
&validator_key,
NAMESPACE,
SocketAddr::from(([203, 0, 113, 1], 8080)),
timestamp,
);
let err = validator.validate(&context, &[peer]).unwrap_err();
assert!(matches!(err, Error::ReceivedSelf));
});
}
#[test]
fn info_verifier_rejects_future_timestamp() {
let executor = deterministic::Runner::default();
executor.start(|mut context| async move {
let validator_key = PrivateKey::random(&mut context);
let peer_key = PrivateKey::random(&mut context);
let synchrony_bound = Duration::from_secs(30);
let validator = Info::verifier(
validator_key.public_key(),
4,
synchrony_bound,
NAMESPACE.to_vec(),
);
let future_timestamp =
(context.current().epoch() + synchrony_bound + Duration::from_secs(1)).as_millis()
as u64;
let peer = Info::sign(
&peer_key,
NAMESPACE,
SocketAddr::from(([198, 51, 100, 1], 8080)),
future_timestamp,
);
let err = validator.validate(&context, &[peer]).unwrap_err();
assert!(matches!(err, Error::SynchronyBound));
});
}
#[test]
fn info_verifier_allows_past_timestamp() {
let executor = deterministic::Runner::default();
executor.start(|mut context| async move {
let validator_key = PrivateKey::random(&mut context);
let peer_key = PrivateKey::random(&mut context);
let synchrony_bound = Duration::from_secs(30);
let validator = Info::verifier(
validator_key.public_key(),
4,
synchrony_bound,
NAMESPACE.to_vec(),
);
context.sleep(synchrony_bound * 2).await;
let past_timestamp =
(context.current().epoch() - synchrony_bound - Duration::from_secs(1)).as_millis()
as u64;
let peer = Info::sign(
&peer_key,
NAMESPACE,
SocketAddr::from(([198, 51, 100, 1], 8080)),
past_timestamp,
);
assert!(validator.validate(&context, &[peer]).is_ok());
});
}
#[test]
fn info_verifier_rejects_invalid_signature() {
let executor = deterministic::Runner::default();
executor.start(|mut context| async move {
let validator_key = PrivateKey::random(&mut context);
let peer_key = PrivateKey::random(&mut context);
let validator = Info::verifier(
validator_key.public_key(),
4,
Duration::from_secs(30),
NAMESPACE.to_vec(),
);
let timestamp = context.current().epoch().as_millis() as u64;
let peer = Info::sign(
&peer_key,
b"wrong-namespace",
SocketAddr::from(([8, 8, 4, 4], 8080)),
timestamp,
);
let err = validator.validate(&context, &[peer]).unwrap_err();
assert!(matches!(err, Error::InvalidSignature));
});
}
#[test]
fn info_with_dns_ingress_sign_and_verify() {
let executor = deterministic::Runner::default();
executor.start(|mut context| async move {
let peer_key = PrivateKey::random(&mut context);
let timestamp = context.current().epoch().as_millis() as u64;
let dns_ingress = Ingress::Dns {
host: hostname!("node.example.com"),
port: 8080,
};
let peer = Info::sign(&peer_key, NAMESPACE, dns_ingress.clone(), timestamp);
assert_eq!(peer.ingress, dns_ingress);
assert_eq!(peer.timestamp, timestamp);
assert_eq!(peer.public_key, peer_key.public_key());
assert!(peer.verify(NAMESPACE));
});
}
#[test]
fn info_with_dns_ingress_codec() {
let executor = deterministic::Runner::default();
executor.start(|mut context| async move {
let peer_key = PrivateKey::random(&mut context);
let timestamp = context.current().epoch().as_millis() as u64;
let dns_ingress = Ingress::Dns {
host: hostname!("validator-1.network.io"),
port: 9090,
};
let original = Info::sign(&peer_key, NAMESPACE, dns_ingress.clone(), timestamp);
let encoded = original.encode();
let decoded = Info::<PublicKey>::decode(encoded).unwrap();
assert_eq!(decoded.ingress, dns_ingress);
assert_eq!(decoded.timestamp, timestamp);
assert_eq!(decoded.public_key, original.public_key);
assert_eq!(decoded.signature, original.signature);
assert!(decoded.verify(NAMESPACE));
});
}
#[test]
fn info_verifier_accepts_dns_ingress() {
let executor = deterministic::Runner::default();
executor.start(|mut context| async move {
let validator_key = PrivateKey::random(&mut context);
let peer_key = PrivateKey::random(&mut context);
let validator = Info::verifier(
validator_key.public_key(),
4,
Duration::from_secs(30),
NAMESPACE.to_vec(),
);
let timestamp = context.current().epoch().as_millis() as u64;
let dns_ingress = Ingress::Dns {
host: hostname!("peer.network.com"),
port: 8080,
};
let peer = Info::sign(&peer_key, NAMESPACE, dns_ingress, timestamp);
assert!(validator.validate(&context, &[peer]).is_ok());
});
}
#[test]
fn info_verifier_accepts_dns_ingress_with_internal_hostname() {
let executor = deterministic::Runner::default();
executor.start(|mut context| async move {
let validator_key = PrivateKey::random(&mut context);
let peer_key = PrivateKey::random(&mut context);
let validator = Info::verifier(
validator_key.public_key(),
4,
Duration::from_secs(30),
NAMESPACE.to_vec(),
);
let timestamp = context.current().epoch().as_millis() as u64;
let dns_ingress = Ingress::Dns {
host: hostname!("internal.local"),
port: 8080,
};
let peer = Info::sign(&peer_key, NAMESPACE, dns_ingress, timestamp);
assert!(
validator.validate(&context, &[peer]).is_ok(),
"DNS ingress should be accepted"
);
});
}
#[cfg(feature = "arbitrary")]
mod conformance {
use super::*;
use commonware_codec::conformance::CodecConformance;
commonware_conformance::conformance_tests! {
CodecConformance<Payload<PublicKey>> => 1024,
CodecConformance<BitVec>,
CodecConformance<Info<PublicKey>> => 1024,
}
}
}