use crate::{
authenticated::{
dialing::{DialStatus, ReserveResult},
discovery::types::Info,
},
Ingress,
};
use commonware_cryptography::PublicKey;
use commonware_runtime::Clock;
use commonware_utils::SystemTimeExt;
use rand::Rng;
use std::time::{Duration, SystemTime};
use tracing::trace;
/// What is currently known about how to reach a peer.
#[derive(Clone, Debug)]
pub enum Address<C: PublicKey> {
/// No address information is held for the peer.
Unknown,
/// The record describes the local node and carries its own [`Info`].
///
/// Such a record is never updated, reserved, or dialed by [`Record`].
Myself(Info<C>),
/// A bootstrapper for which only an [`Ingress`] is held (no [`Info`] yet).
Bootstrapper(Ingress),
/// A peer whose [`Info`] was stored via `Record::update`, paired with the
/// number of dial failures recorded against that info's ingress since it was
/// stored or since the last recorded dial success.
Discovered(Info<C>, usize),
}
/// Connection status of a peer record.
///
/// The transitions enforced by `Record` are `Inert -> Reserved` (`reserve`),
/// `Reserved -> Active` (`connect`) and `Reserved | Active -> Inert` (`release`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Status {
    /// Neither reserved nor connected; this is the initial status of every record.
    Inert,
    /// Reserved for a connection attempt that has not completed yet.
    Reserved,
    /// A connection to the peer is established.
    Active,
}
/// Per-peer bookkeeping: what address is known, the connection status, how many
/// peer sets reference the peer, and when it may next be reserved or dialed.
#[derive(Clone, Debug)]
pub struct Record<C: PublicKey> {
/// What is known about how to reach the peer.
address: Address<C>,
/// Current connection status.
status: Status,
/// Number of primary sets referencing this peer; a non-zero value makes the
/// peer both eligible and an outbound dial target.
primary_sets: usize,
/// Number of secondary sets referencing this peer; a non-zero value makes the
/// peer eligible but does not by itself make it an outbound dial target.
secondary_sets: usize,
/// Set for the local node and for bootstrappers; a persistent record is never
/// deletable and always counts as an outbound target.
persistent: bool,
/// Earliest time at which `reserve` will succeed again (rate limit).
next_reservable_at: SystemTime,
/// Earliest time at which `dialable` reports the peer as dialable now.
next_dial_at: SystemTime,
}
impl<C: PublicKey> Record<C> {
    // ---------- Constructors ----------

    /// Creates a record for a peer about which no address information is held.
    ///
    /// The record starts [`Status::Inert`], is referenced by no sets, is not
    /// persistent, and has both rate-limit timestamps at the Unix epoch.
    pub const fn unknown() -> Self {
        Self {
            address: Address::Unknown,
            status: Status::Inert,
            primary_sets: 0,
            secondary_sets: 0,
            persistent: false,
            next_reservable_at: SystemTime::UNIX_EPOCH,
            next_dial_at: SystemTime::UNIX_EPOCH,
        }
    }

    /// Creates the record describing the local node.
    ///
    /// The record is persistent (never deletable) and holds `info` as
    /// [`Address::Myself`], which is never overwritten by [`Record::update`].
    pub const fn myself(info: Info<C>) -> Self {
        Self {
            address: Address::Myself(info),
            status: Status::Inert,
            primary_sets: 0,
            secondary_sets: 0,
            persistent: true,
            next_reservable_at: SystemTime::UNIX_EPOCH,
            next_dial_at: SystemTime::UNIX_EPOCH,
        }
    }

    /// Creates a persistent record for a bootstrapper reachable at `ingress`.
    pub fn bootstrapper(ingress: impl Into<Ingress>) -> Self {
        Self {
            address: Address::Bootstrapper(ingress.into()),
            status: Status::Inert,
            primary_sets: 0,
            secondary_sets: 0,
            persistent: true,
            next_reservable_at: SystemTime::UNIX_EPOCH,
            next_dial_at: SystemTime::UNIX_EPOCH,
        }
    }

    // ---------- Setters ----------

    /// Attempts to replace the stored address with `info`.
    ///
    /// Returns `true` if the record now holds `info`:
    /// - [`Address::Myself`] is never replaced.
    /// - [`Address::Unknown`] and [`Address::Bootstrapper`] are always replaced.
    /// - [`Address::Discovered`] is replaced only if `info.timestamp` is strictly
    ///   greater than the stored timestamp.
    ///
    /// A successful update resets the dial-failure count to zero. This method
    /// neither compares public keys nor checks signatures; only timestamps are
    /// inspected.
    pub fn update(&mut self, info: Info<C>) -> bool {
        match &self.address {
            Address::Myself(_) => false,
            Address::Unknown | Address::Bootstrapper(_) => {
                self.address = Address::Discovered(info, 0);
                true
            }
            Address::Discovered(prev, _) => {
                let existing_ts = prev.timestamp;
                let incoming_ts = info.timestamp;
                // Equal timestamps are rejected too, so stale or replayed info
                // can never displace what is already stored.
                if existing_ts >= incoming_ts {
                    let peer = info.public_key;
                    trace!(
                        ?peer,
                        ?existing_ts,
                        ?incoming_ts,
                        "peer discovery not updated"
                    );
                    return false;
                }
                self.address = Address::Discovered(info, 0);
                true
            }
        }
    }

    /// Records that one more primary set references this peer.
    ///
    /// # Panics
    ///
    /// Panics if the counter would overflow.
    pub const fn increment_primary(&mut self) {
        self.primary_sets = self
            .primary_sets
            .checked_add(1)
            .expect("primary set count overflowed");
    }

    /// Records that one fewer primary set references this peer.
    ///
    /// # Panics
    ///
    /// Panics if the counter is already zero.
    pub const fn decrement_primary(&mut self) {
        self.primary_sets = self
            .primary_sets
            .checked_sub(1)
            .expect("primary set count underflowed");
    }

    /// Records that one more secondary set references this peer.
    ///
    /// # Panics
    ///
    /// Panics if the counter would overflow.
    pub const fn increment_secondary(&mut self) {
        self.secondary_sets = self
            .secondary_sets
            .checked_add(1)
            .expect("secondary set count overflowed");
    }

    /// Records that one fewer secondary set references this peer.
    ///
    /// # Panics
    ///
    /// Panics if the counter is already zero.
    pub const fn decrement_secondary(&mut self) {
        self.secondary_sets = self
            .secondary_sets
            .checked_sub(1)
            .expect("secondary set count underflowed");
    }

    /// Returns `true` if the peer is in at least one primary set or is persistent.
    ///
    /// Membership in secondary sets alone does not make a peer an outbound target.
    pub const fn is_outbound_target(&self) -> bool {
        self.primary_sets > 0 || self.persistent
    }

    /// Attempts to reserve the peer for a connection.
    ///
    /// Returns:
    /// - [`ReserveResult::Unavailable`] if the record is [`Address::Myself`] or
    ///   its status is not [`Status::Inert`].
    /// - [`ReserveResult::RateLimited`] if the current time is before the next
    ///   reservable time.
    /// - [`ReserveResult::Reserved`] otherwise; the status becomes
    ///   [`Status::Reserved`] and both rate-limit timestamps are advanced.
    ///
    /// `interval` is the minimum spacing between successful reservations.
    pub fn reserve(
        &mut self,
        context: &mut (impl Rng + Clock),
        interval: Duration,
    ) -> ReserveResult {
        if matches!(self.address, Address::Myself(_)) || self.status != Status::Inert {
            return ReserveResult::Unavailable;
        }

        let now = context.current();
        if now < self.next_reservable_at {
            return ReserveResult::RateLimited;
        }

        self.status = Status::Reserved;
        self.next_reservable_at = now.saturating_add_ext(interval);
        // NOTE(review): `add_jittered` presumably adds a random offset of up to
        // `interval / 2` so that dials are spread out — confirm against
        // `SystemTimeExt`.
        self.next_dial_at = self.next_reservable_at.add_jittered(context, interval / 2);
        ReserveResult::Reserved
    }

    /// Marks a reserved peer as connected.
    ///
    /// # Panics
    ///
    /// Panics if the status is not [`Status::Reserved`].
    pub fn connect(&mut self) {
        assert!(
            self.status == Status::Reserved,
            "Cannot connect a peer that is not Reserved"
        );
        self.status = Status::Active;
    }

    /// Returns a reserved or connected peer to [`Status::Inert`].
    ///
    /// # Panics
    ///
    /// Panics if the status is already [`Status::Inert`].
    pub fn release(&mut self) {
        assert!(self.status != Status::Inert, "Cannot release an Inert peer");
        self.status = Status::Inert;
    }

    /// Records a failed dial to `ingress`.
    ///
    /// The failure is counted only if the record is [`Address::Discovered`] and
    /// `ingress` equals the stored ingress; otherwise this is a no-op.
    pub fn dial_failure(&mut self, ingress: &Ingress) {
        if let Address::Discovered(info, fails) = &mut self.address {
            if &info.ingress == ingress {
                // Saturate rather than wrap: a wrap to zero would make `want`
                // treat a long-failing peer as healthy again.
                *fails = fails.saturating_add(1);
            }
        }
    }

    /// Records a successful dial, resetting the failure count of a
    /// [`Address::Discovered`] record to zero. Other records are unaffected.
    pub const fn dial_success(&mut self) {
        if let Address::Discovered(_, fails) = &mut self.address {
            *fails = 0;
        }
    }

    // ---------- Getters ----------

    /// Returns `true` unless the record describes the local node.
    pub const fn is_blockable(&self) -> bool {
        !matches!(self.address, Address::Myself(_))
    }

    /// Returns the number of secondary sets referencing this peer.
    pub const fn secondary_sets(&self) -> usize {
        self.secondary_sets
    }

    /// Returns the number of primary sets referencing this peer.
    pub const fn primary_sets(&self) -> usize {
        self.primary_sets
    }

    /// Reports whether, and when, this peer may be dialed.
    ///
    /// Returns [`DialStatus::Unavailable`] if any of the following holds:
    /// - the status is not [`Status::Inert`],
    /// - the peer is not an outbound target (see [`Record::is_outbound_target`]),
    /// - the address is [`Address::Unknown`] or [`Address::Myself`],
    /// - the ingress is rejected by `Ingress::is_valid` for the given
    ///   `allow_private_ips` / `allow_dns` settings.
    ///
    /// Otherwise returns [`DialStatus::After`] carrying the next dial time if
    /// that time is still after `now`, and [`DialStatus::Now`] if not.
    pub fn dialable(
        &self,
        now: SystemTime,
        allow_private_ips: bool,
        allow_dns: bool,
    ) -> DialStatus {
        if self.status != Status::Inert || !self.is_outbound_target() {
            return DialStatus::Unavailable;
        }

        // Listed exhaustively so that a new `Address` variant forces an explicit
        // decision here instead of silently becoming undialable.
        let ingress = match &self.address {
            Address::Bootstrapper(ingress) => ingress,
            Address::Discovered(info, _) => &info.ingress,
            Address::Unknown | Address::Myself(_) => return DialStatus::Unavailable,
        };
        if !ingress.is_valid(allow_private_ips, allow_dns) {
            return DialStatus::Unavailable;
        }

        if self.next_dial_at > now {
            DialStatus::After(self.next_dial_at)
        } else {
            DialStatus::Now
        }
    }

    /// Returns `true` if the peer is eligible (see [`Record::eligible`]) and its
    /// status is [`Status::Inert`].
    pub fn acceptable(&self) -> bool {
        self.eligible() && self.status == Status::Inert
    }

    /// Returns the ingress held for this peer, or `None` for [`Address::Unknown`].
    pub const fn ingress(&self) -> Option<&Ingress> {
        match &self.address {
            Address::Unknown => None,
            Address::Myself(info) => Some(&info.ingress),
            Address::Bootstrapper(ingress) => Some(ingress),
            Address::Discovered(info, _) => Some(&info.ingress),
        }
    }

    /// Returns a clone of the peer's [`Info`] if it may be shared with others.
    ///
    /// Info is sharable for the local node, and for a discovered peer only while
    /// the status is [`Status::Active`]. Unknown peers and bootstrappers hold no
    /// [`Info`] and yield `None`.
    pub fn sharable(&self) -> Option<Info<C>> {
        match &self.address {
            Address::Unknown => None,
            Address::Myself(info) => Some(info),
            Address::Bootstrapper(_) => None,
            Address::Discovered(info, _) => (self.status == Status::Active).then_some(info),
        }
        .cloned()
    }

    /// Returns `true` if fresh address information for this peer is wanted.
    ///
    /// - Never for the local node.
    /// - Always for unknown peers and bootstrappers (no [`Info`] is held).
    /// - For a discovered peer, only if it is not [`Status::Active`] and at least
    ///   `min_fails` dial failures have been recorded.
    ///
    /// Set membership is not considered.
    pub fn want(&self, min_fails: usize) -> bool {
        match self.address {
            Address::Myself(_) => false,
            Address::Unknown | Address::Bootstrapper(_) => true,
            Address::Discovered(_, fails) => self.status != Status::Active && fails >= min_fails,
        }
    }

    /// Returns `true` if nothing keeps this record alive: no primary or secondary
    /// set references it, it is not persistent, and it is [`Status::Inert`].
    pub const fn deletable(&self) -> bool {
        self.primary_sets == 0
            && self.secondary_sets == 0
            && !self.persistent
            && matches!(self.status, Status::Inert)
    }

    /// Returns `true` if the peer is referenced by any primary or secondary set
    /// or is persistent. The local node is never eligible.
    pub const fn eligible(&self) -> bool {
        match self.address {
            Address::Myself(_) => false,
            Address::Bootstrapper(_) | Address::Unknown | Address::Discovered(_, _) => {
                self.primary_sets > 0 || self.secondary_sets > 0 || self.persistent
            }
        }
    }
}
// Unit tests for `Record`: constructors, address updates, set counters, status
// transitions, dial-failure tracking and the derived predicates.
#[cfg(test)]
mod tests {
use super::*;
use crate::authenticated::discovery::types;
use commonware_cryptography::secp256r1::standard::{PrivateKey, PublicKey};
use commonware_runtime::{deterministic, Runner};
use std::net::SocketAddr;
/// Namespace passed to `Info::sign` for every test peer.
const NAMESPACE: &[u8] = b"test";
/// Builds an `Info` for `socket` and `timestamp`, signed by a key derived from
/// `signer_seed`.
fn create_peer_info<S>(
signer_seed: u64,
socket: SocketAddr,
timestamp: u64,
) -> Info<S::PublicKey>
where
S: commonware_cryptography::PrivateKey,
{
let signer = S::from_seed(signer_seed);
types::Info::sign(&signer, NAMESPACE, socket, timestamp)
}
/// Loopback address on port 8080 used by most tests.
fn test_socket() -> SocketAddr {
SocketAddr::from(([127, 0, 0, 1], 8080))
}
/// A second loopback address (port 8081), used where a different ingress is needed.
fn test_socket2() -> SocketAddr {
SocketAddr::from(([127, 0, 0, 1], 8081))
}
/// Field-by-field comparison of two `Info` values (ingress, timestamp, public
/// key and signature).
fn peer_info_contents_are_equal<S: commonware_cryptography::PublicKey>(
actual: &Info<S>,
expected: &Info<S>,
) -> bool {
actual.ingress == expected.ingress
&& actual.timestamp == expected.timestamp
&& actual.public_key == expected.public_key
&& actual.signature == expected.signature
}
/// Returns `true` only if `actual_opt` is `Some` and its contents equal `expected`.
fn compare_optional_peer_info<S: commonware_cryptography::PublicKey>(
actual_opt: Option<&Info<S>>,
expected: &Info<S>,
) -> bool {
actual_opt.is_some_and(|actual| peer_info_contents_are_equal(actual, expected))
}
// A fresh unknown record holds no address, is wanted, deletable and not eligible.
#[test]
fn test_unknown_initial_state() {
let record = Record::<PublicKey>::unknown();
assert!(matches!(record.address, Address::Unknown));
assert_eq!(record.status, Status::Inert);
assert_eq!(record.primary_sets, 0);
assert!(!record.persistent);
assert!(record.ingress().is_none());
assert!(record.sharable().is_none());
assert_eq!(record.status, Status::Inert);
assert!(record.want(0), "Should want info for unknown peer");
assert!(record.deletable());
assert!(!record.eligible());
}
// The local node's record is persistent, sharable, never wanted and never eligible.
#[test]
fn test_myself_initial_state() {
let my_info = create_peer_info::<PrivateKey>(0, test_socket(), 100);
let record = Record::<PublicKey>::myself(my_info.clone());
assert!(
matches!(&record.address, Address::Myself(info) if peer_info_contents_are_equal(info, &my_info))
);
assert_eq!(record.status, Status::Inert);
assert_eq!(record.primary_sets, 0);
assert!(record.persistent);
assert_eq!(record.ingress(), Some(&my_info.ingress));
assert!(compare_optional_peer_info(
record.sharable().as_ref(),
&my_info
));
assert_eq!(record.status, Status::Inert);
assert!(!record.want(0), "Should not want info for myself");
assert!(!record.deletable());
assert!(!record.eligible());
}
// A bootstrapper is persistent and eligible, exposes its ingress, but has no
// sharable info and is still wanted.
#[test]
fn test_bootstrapper_initial_state() {
let socket = test_socket();
let ingress = Ingress::Socket(socket);
let record = Record::<PublicKey>::bootstrapper(socket);
assert!(matches!(&record.address, Address::Bootstrapper(i) if *i == ingress));
assert_eq!(record.status, Status::Inert);
assert_eq!(record.primary_sets, 0);
assert!(record.persistent);
assert_eq!(record.ingress(), Some(&ingress));
assert!(record.sharable().is_none());
assert_eq!(record.status, Status::Inert);
assert!(record.want(0), "Should want info for bootstrapper");
assert!(!record.deletable());
assert!(record.eligible());
}
// Updating an unknown record stores the info as Discovered with zero failures.
#[test]
fn test_unknown_to_discovered() {
let socket = test_socket();
let mut record = Record::<PublicKey>::unknown();
let peer_info = create_peer_info::<PrivateKey>(1, socket, 1000);
assert!(record.update(peer_info.clone()));
assert_eq!(record.ingress(), Some(&peer_info.ingress));
assert!(
matches!(&record.address, Address::Discovered(info, 0) if peer_info_contents_are_equal(info, &peer_info)),
"Address should be Discovered with 0 failures"
);
assert!(record.sharable().is_none(), "Info not sharable yet");
assert!(!record.persistent);
}
// Updating a bootstrapper stores the info but keeps the record persistent.
#[test]
fn test_bootstrapper_to_discovered() {
let socket = test_socket();
let mut record = Record::<PublicKey>::bootstrapper(socket);
let peer_info = create_peer_info::<PrivateKey>(2, socket, 1000);
assert!(record.persistent, "Should start as persistent");
assert!(record.update(peer_info.clone()));
assert_eq!(record.ingress(), Some(&peer_info.ingress));
assert!(
matches!(&record.address, Address::Discovered(info, 0) if peer_info_contents_are_equal(info, &peer_info)),
"Address should be Discovered with 0 failures"
);
assert!(record.sharable().is_none());
assert!(record.persistent, "Should remain persistent after update");
}
// Info with a strictly newer timestamp replaces the stored info.
#[test]
fn test_discovered_update_newer_timestamp() {
let socket = test_socket();
let mut record = Record::<PublicKey>::unknown();
let peer_info_old = create_peer_info::<PrivateKey>(3, socket, 1000);
let peer_info_new = create_peer_info::<PrivateKey>(3, socket, 2000);
assert!(record.update(peer_info_old));
assert!(record.update(peer_info_new.clone()));
assert_eq!(record.ingress(), Some(&peer_info_new.ingress));
assert!(
matches!(&record.address, Address::Discovered(info, 0) if peer_info_contents_are_equal(info, &peer_info_new)),
"Address should contain newer info"
);
}
// Info with an older or equal timestamp is rejected and leaves the record unchanged.
#[test]
fn test_discovered_no_update_older_or_equal_timestamp() {
let socket = test_socket();
let mut record = Record::<PublicKey>::unknown();
let peer_info_current = create_peer_info::<PrivateKey>(5, socket, 1000);
let peer_info_older = create_peer_info::<PrivateKey>(5, socket, 500);
let peer_info_equal = create_peer_info::<PrivateKey>(5, socket, 1000);
assert!(record.update(peer_info_current.clone()));
assert!(!record.update(peer_info_older));
assert!(
matches!(&record.address, Address::Discovered(info, 0) if peer_info_contents_are_equal(info, &peer_info_current)),
"Address should not update with older info"
);
assert!(!record.update(peer_info_equal));
assert!(
matches!(&record.address, Address::Discovered(info, 0) if peer_info_contents_are_equal(info, &peer_info_current)),
"Address should not update with equal timestamp info"
);
}
// The local node's record rejects every update, even a newer one signed with
// the same seed.
#[test]
fn test_update_myself() {
let my_info = create_peer_info::<PrivateKey>(0, test_socket(), 100);
let mut record_myself = Record::myself(my_info.clone());
let other_info = create_peer_info::<PrivateKey>(1, test_socket2(), 200);
let newer_my_info = create_peer_info::<PrivateKey>(0, test_socket(), 300);
assert!(!record_myself.update(other_info));
assert!(!record_myself.update(newer_my_info));
assert!(
matches!(&record_myself.address, Address::Myself(info) if peer_info_contents_are_equal(info, &my_info)),
"Myself record should remain unchanged"
);
}
// `update` only compares timestamps: newer info signed with a different seed
// (presumably a different public key) still replaces the stored info.
#[test]
fn test_update_with_different_public_key() {
let socket = test_socket();
let mut record = Record::<PublicKey>::unknown();
let peer_info_pk1_ts1000 = create_peer_info::<PrivateKey>(10, socket, 1000);
let peer_info_pk2_ts2000 = create_peer_info::<PrivateKey>(11, socket, 2000);
assert!(record.update(peer_info_pk1_ts1000.clone()));
assert!(
matches!(&record.address, Address::Discovered(info, 0) if peer_info_contents_are_equal(info, &peer_info_pk1_ts1000))
);
assert!(
record.update(peer_info_pk2_ts2000.clone()),
"Update should succeed based on newer timestamp"
);
assert!(
matches!(&record.address, Address::Discovered(info, 0) if peer_info_contents_are_equal(info, &peer_info_pk2_ts2000))
);
}
// Primary-set membership blocks deletion for non-persistent records; persistent
// records (bootstrapper, myself) are never deletable regardless of the counter.
#[test]
fn test_increment_decrement_and_deletable() {
// Unknown record: deletable only while in no set.
let mut record_unknown = Record::<PublicKey>::unknown();
assert!(record_unknown.deletable());
record_unknown.increment_primary(); assert!(!record_unknown.deletable());
record_unknown.decrement_primary(); assert!(record_unknown.deletable());
// Discovered record (not persistent): same behavior as unknown.
let peer_info = create_peer_info::<PrivateKey>(7, test_socket(), 1000);
let mut record_disc = Record::<PublicKey>::unknown();
assert!(record_disc.update(peer_info));
assert!(record_disc.deletable());
record_disc.increment_primary(); assert!(!record_disc.deletable());
record_disc.decrement_primary(); assert!(record_disc.deletable());
// Bootstrapper: persistent, so never deletable.
let mut record_boot = Record::<PublicKey>::bootstrapper(test_socket());
assert!(!record_boot.deletable()); record_boot.increment_primary(); assert!(!record_boot.deletable());
record_boot.decrement_primary(); assert!(!record_boot.deletable());
// Myself: persistent, so never deletable.
let my_info = create_peer_info::<PrivateKey>(0, test_socket(), 100);
let mut record_myself = Record::myself(my_info);
assert!(!record_myself.deletable()); record_myself.increment_primary(); assert!(!record_myself.deletable());
record_myself.decrement_primary(); assert!(!record_myself.deletable()); }
// Decrementing a zero primary-set counter panics.
#[test]
#[should_panic]
fn test_decrement_panics_at_zero() {
let mut record = Record::<PublicKey>::unknown();
assert_eq!(record.primary_sets, 0);
record.decrement_primary(); }
// Every kind of record except the local node's is blockable.
#[test]
fn test_is_blockable() {
let my_info = create_peer_info::<PrivateKey>(0, test_socket(), 100);
let record_myself = Record::myself(my_info);
assert!(!record_myself.is_blockable());
let record_boot = Record::<PublicKey>::bootstrapper(test_socket());
assert!(record_boot.is_blockable());
let record_unknown = Record::<PublicKey>::unknown();
assert!(record_unknown.is_blockable());
let peer_info = create_peer_info::<PrivateKey>(1, test_socket(), 1000);
let mut record_disc = Record::<PublicKey>::unknown();
assert!(record_disc.update(peer_info));
assert!(record_disc.is_blockable());
}
// Walks Inert -> Reserved -> Active -> Inert -> Reserved -> Inert and checks that
// `reserve` is refused while Reserved or Active. A zero interval is used so the
// rate limit does not interfere with the second reservation.
#[test]
fn test_status_transitions_reserve_connect_release() {
deterministic::Runner::default().start(|mut context| async move {
let mut record = Record::<PublicKey>::unknown();
assert_eq!(record.status, Status::Inert);
assert_eq!(
record.reserve(&mut context, Duration::ZERO),
ReserveResult::Reserved
);
assert_eq!(record.status, Status::Reserved);
assert_eq!(
record.reserve(&mut context, Duration::ZERO),
ReserveResult::Unavailable,
"Cannot re-reserve when Reserved"
);
assert_eq!(record.status, Status::Reserved);
record.connect();
assert_eq!(record.status, Status::Active);
assert_eq!(
record.reserve(&mut context, Duration::ZERO),
ReserveResult::Unavailable,
"Cannot reserve when Active"
);
assert_eq!(record.status, Status::Active);
record.release();
assert_eq!(record.status, Status::Inert);
assert_eq!(
record.reserve(&mut context, Duration::ZERO),
ReserveResult::Reserved
);
assert_eq!(record.status, Status::Reserved);
record.release();
assert_eq!(record.status, Status::Inert);
});
}
// `connect` panics on an Inert record.
#[test]
#[should_panic]
fn test_connect_when_not_reserved_panics_from_inert() {
let mut record = Record::<PublicKey>::unknown();
record.connect(); }
// `connect` panics when called a second time (record already Active).
#[test]
#[should_panic]
fn test_connect_when_active_panics() {
deterministic::Runner::default().start(|mut context| async move {
let mut record = Record::<PublicKey>::unknown();
record.reserve(&mut context, Duration::ZERO);
record.connect();
record.connect(); });
}
// `release` panics on an Inert record.
#[test]
#[should_panic]
fn test_release_when_inert_panics() {
let mut record = Record::<PublicKey>::unknown();
record.release(); }
// Info is sharable for the local node always, and for a discovered peer only
// while Active; unknown peers and bootstrappers never share info.
#[test]
fn test_sharable_logic() {
deterministic::Runner::default().start(|mut context| async move {
let socket = test_socket();
let peer_info_data = create_peer_info::<PrivateKey>(12, socket, 100);
let record_unknown = Record::<PublicKey>::unknown();
assert!(record_unknown.sharable().is_none());
let record_myself = Record::myself(peer_info_data.clone());
assert!(compare_optional_peer_info(
record_myself.sharable().as_ref(),
&peer_info_data
));
let record_boot = Record::<PublicKey>::bootstrapper(socket);
assert!(record_boot.sharable().is_none());
// Discovered peer: not sharable while Inert or Reserved, sharable once Active,
// and not sharable again after release.
let mut record_disc = Record::<PublicKey>::unknown();
assert!(record_disc.update(peer_info_data.clone()));
assert!(record_disc.sharable().is_none()); record_disc.reserve(&mut context, Duration::ZERO);
assert!(record_disc.sharable().is_none());
record_disc.connect();
assert!(compare_optional_peer_info(
record_disc.sharable().as_ref(),
&peer_info_data
));
record_disc.release();
assert!(record_disc.sharable().is_none());
});
}
// Checks the status field after each of reserve, connect and release.
#[test]
fn test_reserved_status_check() {
deterministic::Runner::default().start(|mut context| async move {
let mut record = Record::<PublicKey>::unknown();
assert_eq!(record.status, Status::Inert);
assert_eq!(
record.reserve(&mut context, Duration::ZERO),
ReserveResult::Reserved
);
assert_eq!(record.status, Status::Reserved);
record.connect();
assert_eq!(record.status, Status::Active);
record.release();
assert_eq!(record.status, Status::Inert);
});
}
// Dial failures are counted only for Discovered records and only for the stored
// ingress; a dial success resets the count.
#[test]
fn test_dial_failure_and_dial_success() {
let socket = test_socket();
let ingress = Ingress::Socket(socket);
let peer_info = create_peer_info::<PrivateKey>(18, socket, 1000);
let mut record = Record::<PublicKey>::unknown();
// A failure on an unknown record is a no-op.
record.dial_failure(&ingress);
assert!(matches!(record.address, Address::Unknown));
assert!(record.update(peer_info));
assert!(matches!(&record.address, Address::Discovered(_, 0)));
record.dial_failure(&ingress);
assert!(matches!(&record.address, Address::Discovered(_, 1)));
record.dial_failure(&ingress);
assert!(matches!(&record.address, Address::Discovered(_, 2)));
// A failure reported for a different ingress is ignored.
let wrong_ingress = Ingress::Socket(test_socket2());
record.dial_failure(&wrong_ingress);
assert!(
matches!(&record.address, Address::Discovered(_, 2)),
"Failure count should not change for wrong ingress"
);
record.dial_success();
assert!(
matches!(&record.address, Address::Discovered(_, 0)),
"Failures should reset"
);
record.dial_failure(&ingress);
assert!(matches!(&record.address, Address::Discovered(_, 1)));
}
// `want(min_fails)`: always true for unknown/bootstrapper, never for myself, and
// for a discovered peer only when not Active and failures >= min_fails.
#[test]
fn test_want_logic_with_min_fails() {
deterministic::Runner::default().start(|mut context| async move {
let socket = test_socket();
let ingress = Ingress::Socket(socket);
let peer_info = create_peer_info::<PrivateKey>(13, socket, 100);
let min_fails = 2;
assert!(Record::<PublicKey>::unknown().want(min_fails));
assert!(Record::<PublicKey>::bootstrapper(socket).want(min_fails));
assert!(!Record::myself(peer_info.clone()).want(min_fails));
let mut record_disc = Record::<PublicKey>::unknown();
assert!(record_disc.update(peer_info));
assert!(
!record_disc.want(min_fails),
"Should not want when fails=0 < min_fails"
);
record_disc.dial_failure(&ingress); assert!(
!record_disc.want(min_fails),
"Should not want when fails=1 < min_fails"
);
record_disc.dial_failure(&ingress); assert!(
record_disc.want(min_fails),
"Should want when fails=2 >= min_fails"
);
record_disc.reserve(&mut context, Duration::ZERO);
assert!(
record_disc.want(min_fails),
"Should still want when Reserved and fails >= min_fails"
);
record_disc.connect();
assert!(!record_disc.want(min_fails), "Should not want when Active");
record_disc.release();
assert!(record_disc.want(min_fails));
record_disc.dial_success(); assert!(
!record_disc.want(min_fails),
"Should not want when Inert and fails=0"
);
record_disc.dial_failure(&ingress); assert!(!record_disc.want(min_fails));
record_disc.dial_failure(&ingress); assert!(record_disc.want(min_fails));
});
}
// `deletable` requires: no set membership, not persistent, and Inert status.
#[test]
fn test_deletable_logic_detailed() {
deterministic::Runner::default().start(|mut context| async move {
let peer_info = create_peer_info::<PrivateKey>(14, test_socket(), 100);
assert!(!Record::myself(peer_info.clone()).deletable());
assert!(!Record::<PublicKey>::bootstrapper(test_socket()).deletable());
// A bootstrapper stays persistent (hence not deletable) after being updated.
let mut record_pers = Record::<PublicKey>::bootstrapper(test_socket());
assert!(record_pers.update(peer_info));
assert!(!record_pers.deletable());
// While in a primary set the record is not deletable in any status; it becomes
// deletable again once it is Inert and the set count returns to zero.
let mut record = Record::<PublicKey>::unknown(); assert_eq!(record.primary_sets, 0);
assert_eq!(record.status, Status::Inert);
assert!(record.deletable());
record.increment_primary(); assert!(!record.deletable());
record.reserve(&mut context, Duration::ZERO); assert!(!record.deletable());
record.connect(); assert!(!record.deletable());
record.release(); assert!(!record.deletable());
record.decrement_primary(); assert!(record.deletable()); });
}
// `eligible`: never for myself; for other records only when persistent or in a set.
#[test]
fn test_eligible_logic() {
let peer_info = create_peer_info::<PrivateKey>(16, test_socket(), 100);
assert!(!Record::myself(peer_info.clone()).eligible());
assert!(Record::<PublicKey>::bootstrapper(test_socket()).eligible());
let mut record_pers = Record::<PublicKey>::bootstrapper(test_socket());
assert!(record_pers.update(peer_info.clone()));
assert!(record_pers.eligible());
let mut record_unknown = Record::<PublicKey>::unknown();
assert!(!record_unknown.eligible()); record_unknown.increment_primary(); assert!(record_unknown.eligible()); record_unknown.decrement_primary(); assert!(!record_unknown.eligible());
let mut record_disc = Record::<PublicKey>::unknown();
assert!(record_disc.update(peer_info));
assert!(!record_disc.eligible()); record_disc.increment_primary(); assert!(record_disc.eligible()); }
}