use super::{metrics::Metrics, record::Record, Metadata, Reservation};
use crate::{
authenticated::{
dialing::{earliest, DialStatus, Dialable, ReserveResult},
lookup::{actors::tracker::ingress::Releaser, metrics},
},
types::Address,
utils::PeerSetsAtIndex as PeerSetsAtIndexBase,
AddressableTrackedPeers, Ingress, PeerSetUpdate, TrackedPeers,
};
use commonware_cryptography::PublicKey;
use commonware_runtime::{
telemetry::metrics::status::GaugeExt, Clock, Metrics as RuntimeMetrics, Spawner,
};
use commonware_utils::{ordered::Set, IpAddrExt, PrioritySet, SystemTimeExt};
use rand::Rng;
use std::{
collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
net::IpAddr,
num::NonZeroUsize,
time::{Duration, SystemTime},
};
use tracing::{debug, warn};
type PeerSetsAtIndex<C> = PeerSetsAtIndexBase<Set<C>, Set<C>>;
pub struct Config {
pub allow_private_ips: bool,
pub allow_dns: bool,
pub bypass_ip_check: bool,
pub max_sets: NonZeroUsize,
pub peer_connection_cooldown: Duration,
pub block_duration: Duration,
}
pub struct Directory<E: Rng + Clock + RuntimeMetrics, C: PublicKey> {
context: E,
max_sets: NonZeroUsize,
pub allow_private_ips: bool,
allow_dns: bool,
bypass_ip_check: bool,
block_duration: Duration,
peer_connection_cooldown: Duration,
peers: HashMap<C, Record>,
peer_sets: BTreeMap<u64, PeerSetsAtIndex<C>>,
blocked: PrioritySet<C, SystemTime>,
releaser: Releaser<C>,
metrics: Metrics,
}
impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: PublicKey> Directory<E, C> {
pub fn init(context: E, myself: C, cfg: Config, releaser: Releaser<C>) -> Self {
let mut peers = HashMap::new();
peers.insert(myself, Record::myself());
let metrics = Metrics::init(context.clone());
let _ = metrics.tracked.try_set(peers.len() - 1);
Self {
context,
max_sets: cfg.max_sets,
allow_private_ips: cfg.allow_private_ips,
allow_dns: cfg.allow_dns,
bypass_ip_check: cfg.bypass_ip_check,
block_duration: cfg.block_duration,
peer_connection_cooldown: cfg.peer_connection_cooldown,
peers,
peer_sets: BTreeMap::new(),
blocked: PrioritySet::new(),
releaser,
metrics,
}
}
pub fn release(&mut self, metadata: Metadata<C>) {
let peer = metadata.public_key();
let Some(record) = self.peers.get_mut(peer) else {
return;
};
record.release();
self.metrics.connected.remove(&metrics::Peer::new(peer));
self.metrics.reserved.dec();
self.delete_if_needed(peer);
}
pub fn connect(&mut self, peer: &C) {
let record = self.peers.get_mut(peer).unwrap();
record.connect();
let _ = self
.metrics
.connected
.get_or_create(&metrics::Peer::new(peer))
.try_set(self.context.current().epoch_millis());
}
pub fn track(&mut self, index: u64, peers: AddressableTrackedPeers<C>) -> Option<Set<C>> {
if self.peer_sets.contains_key(&index) {
warn!(index, "peer set already exists");
return None;
}
if let Some((last, _)) = self.peer_sets.last_key_value() {
if index <= *last {
warn!(?index, ?last, "index must monotonically increase");
return None;
}
}
let mut reset_peers = Vec::new();
for (primary, addr) in &peers.primary {
let record = match self.peers.entry(primary.clone()) {
Entry::Occupied(entry) => {
let entry = entry.into_mut();
if entry.update(addr.clone()) {
reset_peers.push(primary.clone());
}
entry
}
Entry::Vacant(entry) => {
self.metrics.tracked.inc();
entry.insert(Record::known(addr.clone()))
}
};
record.increment_primary();
}
for (secondary, addr) in &peers.secondary {
if peers.primary.position(secondary).is_some() {
continue;
}
let record = match self.peers.entry(secondary.clone()) {
Entry::Occupied(entry) => {
let entry = entry.into_mut();
if entry.update(addr.clone()) {
reset_peers.push(secondary.clone());
}
entry
}
Entry::Vacant(entry) => {
self.metrics.tracked.inc();
entry.insert(Record::known(addr.clone()))
}
};
record.increment_secondary();
}
let secondary_set = Set::from_iter_dedup(
peers
.secondary
.keys()
.iter()
.filter(|k| peers.primary.position(k).is_none())
.cloned(),
);
let primary_keys_set = peers.primary.into_keys();
self.peer_sets.insert(
index,
PeerSetsAtIndex {
primary: primary_keys_set,
secondary: secondary_set,
},
);
while self.peer_sets.len() > self.max_sets.get() {
let (removed_index, sets) = self.peer_sets.pop_first().unwrap();
debug!(index = removed_index, "removed oldest tracked peer sets");
sets.primary.into_iter().for_each(|primary| {
self.peers.get_mut(&primary).unwrap().decrement_primary();
let deleted = self.delete_if_needed(&primary);
if deleted {
reset_peers.push(primary);
}
});
sets.secondary.into_iter().for_each(|secondary| {
self.peers
.get_mut(&secondary)
.unwrap()
.decrement_secondary();
let deleted = self.delete_if_needed(&secondary);
if deleted {
reset_peers.push(secondary);
}
});
}
Some(Set::from_iter_dedup(reset_peers))
}
pub fn overwrite(&mut self, peer: &C, address: Address) -> bool {
let Some(record) = self.peers.get_mut(peer) else {
return false;
};
record.update(address)
}
pub fn get_peer_set(&self, index: &u64) -> Option<TrackedPeers<C>> {
let entry = self.peer_sets.get(index)?;
Some(TrackedPeers::new(
entry.primary.clone(),
entry.secondary.clone(),
))
}
pub fn latest_set_index(&self) -> Option<u64> {
self.peer_sets.keys().last().copied()
}
pub fn latest_update(&self) -> Option<PeerSetUpdate<C>> {
let index = self.latest_set_index()?;
Some(PeerSetUpdate {
index,
latest: self.get_peer_set(&index).unwrap(),
all: self.all(),
})
}
pub fn dial(&mut self, peer: &C) -> Option<(Reservation<C>, Ingress)> {
let record = self.peers.get(peer)?;
if !record.is_outbound_target() {
return None;
}
let ingress = record.ingress()?;
let reservation = self.reserve(Metadata::Dialer(peer.clone()))?;
Some((reservation, ingress))
}
pub fn listen(&mut self, peer: &C) -> Option<Reservation<C>> {
self.reserve(Metadata::Listener(peer.clone()))
}
fn is_blocked(&self, peer: &C) -> bool {
self.blocked
.get(peer)
.is_some_and(|t| t > self.context.current())
}
pub fn block(&mut self, peer: &C) {
if self.is_blocked(peer) {
return;
}
if let Some(record) = self.peers.get(peer) {
if !record.is_blockable() {
return;
}
}
let blocked_until = self.context.current() + self.block_duration;
self.blocked.put(peer.clone(), blocked_until);
let _ = self
.metrics
.blocked
.get_or_create(&metrics::Peer::new(peer))
.try_set(blocked_until.epoch_millis());
}
pub fn all(&self) -> TrackedPeers<C> {
let mut primary = Vec::new();
let mut secondary = Vec::new();
for (k, record) in &self.peers {
if record.primary_sets() > 0 {
primary.push(k.clone());
} else if record.secondary_sets() > 0 {
secondary.push(k.clone());
}
}
TrackedPeers::new(
Set::from_iter_dedup(primary),
Set::from_iter_dedup(secondary),
)
}
pub fn eligible(&self, peer: &C) -> bool {
!self.is_blocked(peer) && self.peers.get(peer).is_some_and(|r| r.eligible())
}
pub fn dialable(&self) -> Dialable<C> {
let now = self.context.current();
let mut next_query_at: Option<SystemTime> = None;
let mut peers = Vec::new();
for (peer, record) in &self.peers {
if let Some(blocked_until) = self.blocked.get(peer).filter(|t| *t > now) {
next_query_at = earliest(next_query_at, blocked_until);
continue;
}
match record.dialable(now, self.allow_private_ips, self.allow_dns) {
DialStatus::Now => peers.push(peer.clone()),
DialStatus::After(t) => {
next_query_at = earliest(next_query_at, t);
}
DialStatus::Unavailable => {}
}
}
peers.sort();
Dialable {
peers,
next_query_at,
}
}
pub fn acceptable(&self, peer: &C, source_ip: IpAddr) -> bool {
!self.is_blocked(peer)
&& self
.peers
.get(peer)
.is_some_and(|record| record.acceptable(source_ip, self.bypass_ip_check))
}
pub fn listenable(&self) -> HashSet<IpAddr> {
self.peers
.iter()
.filter(|(peer, r)| !self.is_blocked(peer) && r.eligible())
.filter_map(|(_, r)| r.egress_ip())
.filter(|ip| self.allow_private_ips || IpAddrExt::is_global(ip))
.collect()
}
pub fn unblock_expired(&mut self) -> bool {
let now = self.context.current();
let mut any_unblocked = false;
while let Some((_, &blocked_until)) = self.blocked.peek() {
if blocked_until > now {
break;
}
let (peer, _) = self.blocked.pop().unwrap();
debug!(?peer, "unblocked peer");
self.metrics.blocked.remove(&metrics::Peer::new(&peer));
any_unblocked = true;
}
any_unblocked
}
pub async fn wait_for_unblock(&self) {
match self.blocked.peek() {
Some((_, &time)) => self.context.sleep_until(time).await,
None => futures::future::pending().await,
}
}
#[cfg(test)]
pub fn blocked(&self) -> usize {
self.blocked.len()
}
fn reserve(&mut self, metadata: Metadata<C>) -> Option<Reservation<C>> {
let peer = metadata.public_key();
if !self.eligible(peer) {
return None;
}
let record = self.peers.get_mut(peer).unwrap();
match record.reserve(&mut self.context, self.peer_connection_cooldown) {
ReserveResult::Reserved => {
self.metrics.reserved.inc();
Some(Reservation::new(metadata, self.releaser.clone()))
}
ReserveResult::RateLimited => {
self.metrics
.limits
.get_or_create(&metrics::Peer::new(peer))
.inc();
None
}
ReserveResult::Unavailable => None,
}
}
fn delete_if_needed(&mut self, peer: &C) -> bool {
let Some(record) = self.peers.get(peer) else {
return false;
};
if !record.deletable() {
return false;
}
self.peers.remove(peer);
self.metrics.tracked.dec();
true
}
}
#[cfg(test)]
mod tests {
use crate::{
authenticated::{
lookup::{actors::tracker::directory::Directory, metrics},
mailbox::UnboundedMailbox,
},
types::Address,
AddressableTrackedPeers, Ingress,
};
use commonware_cryptography::{ed25519, Signer};
use commonware_runtime::{deterministic, Clock, Metrics, Runner};
use commonware_utils::{
hostname,
ordered::{Map, Set},
NZUsize, SystemTimeExt,
};
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
time::Duration,
};
fn addr(socket: SocketAddr) -> Address {
Address::Symmetric(socket)
}
fn primary(
map: Map<ed25519::PublicKey, Address>,
) -> AddressableTrackedPeers<ed25519::PublicKey> {
AddressableTrackedPeers::from(map)
}
fn metric_value(metrics: &str, name: &str, peer: &str) -> Option<i64> {
metrics
.lines()
.find(|line| line.starts_with(&format!("{name}{{peer=\"{peer}\"}} ")))
.and_then(|line| line.split_whitespace().nth(1))
.and_then(|value| value.parse::<i64>().ok())
}
#[test]
fn test_track_return_value() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(1),
peer_connection_cooldown: Duration::from_millis(100),
block_duration: Duration::from_secs(100),
};
let pk_1 = ed25519::PrivateKey::from_seed(1).public_key();
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1235);
let pk_2 = ed25519::PrivateKey::from_seed(2).public_key();
let addr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1236);
let pk_3 = ed25519::PrivateKey::from_seed(3).public_key();
let addr_3 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1237);
runtime.start(|context| async move {
let mut directory = Directory::init(context, my_pk, config, releaser);
let reset_peers = directory
.track(
0,
primary(
[(pk_1.clone(), addr(addr_1)), (pk_2.clone(), addr(addr_2))]
.try_into()
.unwrap(),
),
)
.unwrap();
assert!(
reset_peers.is_empty(),
"No peers should be deleted on first set"
);
let reset_peers = directory
.track(
1,
primary(
[(pk_2.clone(), addr(addr_2)), (pk_3.clone(), addr(addr_3))]
.try_into()
.unwrap(),
),
)
.unwrap();
assert_eq!(reset_peers.len(), 1, "One peer should be reset");
assert!(
reset_peers.position(&pk_1).is_some(),
"Reset peer should be pk_1"
);
let reset_peers = directory
.track(
2,
primary([(pk_3.clone(), addr(addr_3))].try_into().unwrap()),
)
.unwrap();
assert_eq!(reset_peers.len(), 1, "One peer should be reset");
assert!(
reset_peers.position(&pk_2).is_some(),
"Reset peer should be pk_2"
);
let reset_peers = directory
.track(
3,
primary([(pk_3.clone(), addr(addr_3))].try_into().unwrap()),
)
.unwrap();
assert!(reset_peers.is_empty(), "No peers should be reset");
});
}
#[test]
fn test_secondary_sets_remain_until_eviction() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(2),
peer_connection_cooldown: Duration::from_millis(100),
block_duration: Duration::from_secs(100),
};
let primary_0 = ed25519::PrivateKey::from_seed(1).public_key();
let primary_0_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1235);
let primary_1 = ed25519::PrivateKey::from_seed(2).public_key();
let primary_1_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1236);
let primary_2 = ed25519::PrivateKey::from_seed(3).public_key();
let primary_2_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1237);
let secondary_0 = ed25519::PrivateKey::from_seed(4).public_key();
let secondary_0_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1238);
let secondary_1 = ed25519::PrivateKey::from_seed(5).public_key();
let secondary_1_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1239);
runtime.start(|context| async move {
let mut directory = Directory::init(context, my_pk, config, releaser);
assert!(directory
.track(
0,
AddressableTrackedPeers::new(
[(primary_0, addr(primary_0_addr))].try_into().unwrap(),
[(secondary_0.clone(), addr(secondary_0_addr))]
.try_into()
.unwrap(),
),
)
.is_some());
assert!(directory.eligible(&secondary_0));
assert!(directory
.track(
1,
AddressableTrackedPeers::new(
[(primary_1, addr(primary_1_addr))].try_into().unwrap(),
[(secondary_1.clone(), addr(secondary_1_addr))]
.try_into()
.unwrap(),
),
)
.is_some());
assert!(directory.eligible(&secondary_0));
assert!(directory.eligible(&secondary_1));
assert!(directory
.track(
2,
primary([(primary_2, addr(primary_2_addr))].try_into().unwrap(),),
)
.is_some());
assert!(!directory.peers.contains_key(&secondary_0));
assert!(directory.eligible(&secondary_1));
});
}
#[test]
fn test_track_overwrite() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let my_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1234);
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(3),
peer_connection_cooldown: Duration::from_millis(100),
block_duration: Duration::from_secs(100),
};
let pk_1 = ed25519::PrivateKey::from_seed(1).public_key();
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1235);
let addr_4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1238);
let pk_2 = ed25519::PrivateKey::from_seed(2).public_key();
let addr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1236);
let pk_3 = ed25519::PrivateKey::from_seed(3).public_key();
let addr_3 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1237);
runtime.start(|context| async move {
let mut directory = Directory::init(context, my_pk.clone(), config, releaser);
directory.track(
0,
primary(
[(pk_1.clone(), addr(addr_1)), (pk_2.clone(), addr(addr_2))]
.try_into()
.unwrap(),
),
);
assert!(directory.peers.get(&my_pk).unwrap().ingress().is_none());
assert_eq!(
directory.peers.get(&pk_1).unwrap().ingress(),
Some(Ingress::Socket(addr_1))
);
assert_eq!(
directory.peers.get(&pk_2).unwrap().ingress(),
Some(Ingress::Socket(addr_2))
);
assert!(!directory.peers.contains_key(&pk_3));
directory.track(
1,
primary([(pk_1.clone(), addr(addr_4))].try_into().unwrap()),
);
assert!(directory.peers.get(&my_pk).unwrap().ingress().is_none());
assert_eq!(
directory.peers.get(&pk_1).unwrap().ingress(),
Some(Ingress::Socket(addr_4))
);
assert_eq!(
directory.peers.get(&pk_2).unwrap().ingress(),
Some(Ingress::Socket(addr_2))
);
assert!(!directory.peers.contains_key(&pk_3));
directory.track(
2,
primary([(my_pk.clone(), addr(addr_3))].try_into().unwrap()),
);
assert!(directory.peers.get(&my_pk).unwrap().ingress().is_none());
assert_eq!(
directory.peers.get(&pk_1).unwrap().ingress(),
Some(Ingress::Socket(addr_4))
);
assert_eq!(
directory.peers.get(&pk_2).unwrap().ingress(),
Some(Ingress::Socket(addr_2))
);
assert!(!directory.peers.contains_key(&pk_3));
let reset_peers = directory
.track(
3,
primary([(my_pk.clone(), addr(my_addr))].try_into().unwrap()),
)
.unwrap();
assert_eq!(reset_peers.len(), 1);
assert!(reset_peers.position(&pk_2).is_some());
let reset_peers = directory
.track(
4,
primary([(my_pk.clone(), addr(addr_3))].try_into().unwrap()),
)
.unwrap();
assert_eq!(reset_peers.len(), 1);
assert!(reset_peers.position(&pk_1).is_some());
let result = directory.track(
0,
primary(
[(pk_1.clone(), addr(addr_1)), (pk_2.clone(), addr(addr_2))]
.try_into()
.unwrap(),
),
);
assert!(result.is_none());
});
}
#[test]
fn test_track_primary_secondary_overlap_deduplicates() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(3),
peer_connection_cooldown: Duration::from_millis(100),
block_duration: Duration::from_secs(100),
};
let pk_1 = ed25519::PrivateKey::from_seed(1).public_key();
let primary_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1235);
let secondary_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 2235);
runtime.start(|context| async move {
let mut directory = Directory::init(context, my_pk, config, releaser);
let reset_peers = directory
.track(
0,
AddressableTrackedPeers::new(
[(pk_1.clone(), addr(primary_addr))].try_into().unwrap(),
[(pk_1.clone(), addr(secondary_addr))].try_into().unwrap(),
),
)
.unwrap();
assert!(reset_peers.is_empty());
assert_eq!(directory.latest_set_index(), Some(0));
let peer_set = directory.get_peer_set(&0).unwrap();
assert_eq!(peer_set.primary, [pk_1.clone()].try_into().unwrap());
assert!(
peer_set.secondary.is_empty(),
"overlap peer is deduplicated as primary only"
);
assert!(directory.eligible(&pk_1));
assert_eq!(
directory.peers.get(&pk_1).unwrap().ingress(),
Some(Ingress::Socket(primary_addr))
);
assert_eq!(directory.all().primary, [pk_1.clone()].try_into().unwrap());
assert!(directory.all().secondary.is_empty());
assert_eq!(directory.dialable().peers, vec![pk_1.clone()]);
let rec = directory.peers.get(&pk_1).unwrap();
assert_eq!(rec.primary_sets(), 1);
assert_eq!(rec.secondary_sets(), 0);
});
}
#[test]
fn test_demotion_from_primary_to_secondary() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(2),
peer_connection_cooldown: Duration::from_millis(100),
block_duration: Duration::from_secs(100),
};
let pk_x = ed25519::PrivateKey::from_seed(1).public_key();
let pk_y = ed25519::PrivateKey::from_seed(2).public_key();
let addr_x = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1000);
let addr_y = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 2000);
runtime.start(|context| async move {
let mut directory = Directory::init(context, my_pk, config, releaser);
directory
.track(
0,
AddressableTrackedPeers::new(
[(pk_x.clone(), addr(addr_x))].try_into().unwrap(),
[(pk_y.clone(), addr(addr_y))].try_into().unwrap(),
),
)
.unwrap();
assert_eq!(directory.peers.get(&pk_x).unwrap().primary_sets(), 1);
assert_eq!(directory.peers.get(&pk_x).unwrap().secondary_sets(), 0);
assert_eq!(directory.peers.get(&pk_y).unwrap().primary_sets(), 0);
assert_eq!(directory.peers.get(&pk_y).unwrap().secondary_sets(), 1);
directory
.track(
1,
AddressableTrackedPeers::new(
[(pk_y.clone(), addr(addr_y))].try_into().unwrap(),
[(pk_x.clone(), addr(addr_x))].try_into().unwrap(),
),
)
.unwrap();
assert_eq!(directory.peers.get(&pk_x).unwrap().primary_sets(), 1);
assert_eq!(directory.peers.get(&pk_x).unwrap().secondary_sets(), 1);
assert_eq!(directory.peers.get(&pk_y).unwrap().primary_sets(), 1);
assert_eq!(directory.peers.get(&pk_y).unwrap().secondary_sets(), 1);
let agg = directory.all();
assert!(agg.primary.position(&pk_x).is_some());
assert!(agg.primary.position(&pk_y).is_some());
assert!(agg.secondary.is_empty());
directory
.track(
2,
AddressableTrackedPeers::new(
[(pk_y.clone(), addr(addr_y))].try_into().unwrap(),
[(pk_x.clone(), addr(addr_x))].try_into().unwrap(),
),
)
.unwrap();
assert_eq!(directory.peers.get(&pk_x).unwrap().primary_sets(), 0);
assert_eq!(directory.peers.get(&pk_x).unwrap().secondary_sets(), 2);
assert_eq!(directory.peers.get(&pk_y).unwrap().primary_sets(), 2);
assert_eq!(directory.peers.get(&pk_y).unwrap().secondary_sets(), 0);
let agg = directory.all();
assert!(agg.primary.position(&pk_y).is_some());
assert!(agg.secondary.position(&pk_x).is_some());
assert!(agg.primary.position(&pk_x).is_none());
assert!(agg.secondary.position(&pk_y).is_none());
});
}
#[test]
fn test_track_primary_wins_conflicting_overlap_when_updating_existing_address() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(3),
peer_connection_cooldown: Duration::from_millis(100),
block_duration: Duration::from_secs(100),
};
let pk_1 = ed25519::PrivateKey::from_seed(1).public_key();
let old_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1235);
let new_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 2235);
runtime.start(|context| async move {
let mut directory = Directory::init(context, my_pk, config, releaser);
let initial_reset = directory
.track(
0,
primary([(pk_1.clone(), addr(old_addr))].try_into().unwrap()),
)
.unwrap();
assert!(initial_reset.is_empty());
let reset_peers = directory
.track(
1,
AddressableTrackedPeers::new(
[(pk_1.clone(), addr(new_addr))].try_into().unwrap(),
[(pk_1.clone(), addr(old_addr))].try_into().unwrap(),
),
)
.unwrap();
assert_eq!(reset_peers, Set::try_from([pk_1.clone()]).unwrap());
assert_eq!(directory.latest_set_index(), Some(1));
assert_eq!(
directory.get_peer_set(&1).unwrap().primary,
[pk_1.clone()].try_into().unwrap()
);
assert_eq!(
directory.peers.get(&pk_1).unwrap().ingress(),
Some(Ingress::Socket(new_addr))
);
assert_eq!(directory.all().primary, [pk_1.clone()].try_into().unwrap());
assert_eq!(directory.dialable().peers, vec![pk_1.clone()]);
assert_eq!(directory.dial(&pk_1).unwrap().1, Ingress::Socket(new_addr));
assert!(directory.listenable().contains(&new_addr.ip()));
assert!(!directory.listenable().contains(&old_addr.ip()));
});
}
#[test]
fn test_all_cross_index_primary_wins_for_overlap_peer() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(3),
peer_connection_cooldown: Duration::from_millis(100),
block_duration: Duration::from_secs(100),
};
let pk_a = ed25519::PrivateKey::from_seed(31).public_key();
let pk_b = ed25519::PrivateKey::from_seed(32).public_key();
let pk_overlap = ed25519::PrivateKey::from_seed(33).public_key();
let pk_sec = ed25519::PrivateKey::from_seed(34).public_key();
let addr_a = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 4001);
let addr_b = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)), 4002);
let addr_overlap_p = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 3)), 4003);
let addr_overlap_s = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 3)), 5003);
let addr_sec = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 4)), 4004);
runtime.start(|context| async move {
let mut directory = Directory::init(context, my_pk, config, releaser);
assert!(directory
.track(
10,
primary(
[
(pk_a.clone(), addr(addr_a)),
(pk_overlap.clone(), addr(addr_overlap_p)),
]
.try_into()
.unwrap(),
),
)
.is_some());
assert!(directory
.track(
11,
AddressableTrackedPeers::new(
[(pk_b.clone(), addr(addr_b))].try_into().unwrap(),
[
(pk_overlap.clone(), addr(addr_overlap_s)),
(pk_sec.clone(), addr(addr_sec)),
]
.try_into()
.unwrap(),
),
)
.is_some());
let agg = directory.all();
assert!(
agg.primary.position(&pk_overlap).is_some(),
"any primary membership across tracked sets -> aggregate primary only"
);
assert!(
agg.secondary.position(&pk_overlap).is_none(),
"aggregate secondary must not duplicate keys that have a primary role somewhere"
);
assert!(
agg.secondary.position(&pk_sec).is_some(),
"peers who are only secondary across sets stay under aggregate secondary"
);
});
}
#[test]
fn test_connected_metric_tracks_active_peers() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(1),
peer_connection_cooldown: Duration::from_millis(100),
block_duration: Duration::from_secs(100),
};
let pk_1 = ed25519::PrivateKey::from_seed(1).public_key();
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1235);
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), my_pk, config, releaser);
directory
.track(
0,
primary([(pk_1.clone(), addr(addr_1))].try_into().unwrap()),
)
.unwrap();
let _reservation = directory.listen(&pk_1).expect("peer should reserve");
let connected_at: i64 = context.current().epoch_millis().try_into().unwrap();
directory.connect(&pk_1);
context.sleep(Duration::from_secs(5)).await;
let metrics = context.encode();
assert_eq!(
metric_value(&metrics, "connected", &pk_1.to_string()),
Some(connected_at)
);
directory.release(super::Metadata::Listener(pk_1.clone()));
let metrics = context.encode();
assert_eq!(metric_value(&metrics, "connected", &pk_1.to_string()), None);
});
}
#[test]
fn test_blocked_peer_remains_blocked_on_update() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let block_duration = Duration::from_secs(100);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(3),
peer_connection_cooldown: Duration::from_millis(100),
block_duration,
};
let pk_1 = ed25519::PrivateKey::from_seed(1).public_key();
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1235);
let addr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 2235);
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), my_pk.clone(), config, releaser);
directory.track(
0,
primary([(pk_1.clone(), addr(addr_1))].try_into().unwrap()),
);
directory.block(&pk_1);
assert!(
directory.blocked.contains(&pk_1),
"Peer should be blocked after call to block"
);
let record = directory.peers.get(&pk_1).unwrap();
assert_eq!(
record.ingress(),
Some(Ingress::Socket(addr_1)),
"Record still has address (blocking is at Directory level)"
);
directory.track(
1,
primary([(pk_1.clone(), addr(addr_2))].try_into().unwrap()),
);
assert!(
directory.blocked.contains(&pk_1),
"Blocked peer should remain blocked after update"
);
let record = directory.peers.get(&pk_1).unwrap();
assert_eq!(
record.ingress(),
Some(Ingress::Socket(addr_2)),
"Record has updated address"
);
context.sleep(block_duration + Duration::from_secs(1)).await;
directory.unblock_expired();
assert!(
!directory.blocked.contains(&pk_1),
"Peer should be unblocked after expiry"
);
let record = directory.peers.get(&pk_1).unwrap();
assert_eq!(
record.ingress(),
Some(Ingress::Socket(addr_2)),
"Unblocked peer should have the updated address"
);
});
}
#[test]
fn test_asymmetric_addresses() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(3),
peer_connection_cooldown: Duration::from_millis(100),
block_duration: Duration::from_secs(100),
};
let pk_1 = ed25519::PrivateKey::from_seed(1).public_key();
let ingress_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 8080);
let egress_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 9090);
let asymmetric_addr = Address::Asymmetric {
ingress: Ingress::Socket(ingress_socket),
egress: egress_socket,
};
let pk_2 = ed25519::PrivateKey::from_seed(2).public_key();
let egress_socket_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 9090);
let dns_addr = Address::Asymmetric {
ingress: Ingress::Dns {
host: hostname!("node.example.com"),
port: 8080,
},
egress: egress_socket_2,
};
runtime.start(|context| async move {
let mut directory = Directory::init(context, my_pk.clone(), config, releaser);
let reset_peers = directory
.track(
0,
primary(
[
(pk_1.clone(), asymmetric_addr.clone()),
(pk_2.clone(), dns_addr.clone()),
]
.try_into()
.unwrap(),
),
)
.unwrap();
assert!(reset_peers.is_empty());
let record_1 = directory.peers.get(&pk_1).unwrap();
assert_eq!(
record_1.ingress(),
Some(Ingress::Socket(ingress_socket)),
"Ingress should match the asymmetric address's ingress"
);
assert_eq!(
record_1.egress_ip(),
Some(egress_socket.ip()),
"Egress IP should be from the egress socket"
);
let record_2 = directory.peers.get(&pk_2).unwrap();
assert_eq!(
record_2.ingress(),
Some(Ingress::Dns {
host: hostname!("node.example.com"),
port: 8080
}),
"Ingress should be DNS address"
);
assert_eq!(
record_2.egress_ip(),
Some(egress_socket_2.ip()),
"Egress IP should be from the egress socket"
);
let listenable = directory.listenable();
assert!(
listenable.contains(&egress_socket.ip()),
"Listenable should contain peer 1's egress IP"
);
assert!(
listenable.contains(&egress_socket_2.ip()),
"Listenable should contain peer 2's egress IP"
);
assert!(
!listenable.contains(&ingress_socket.ip()),
"Listenable should NOT contain peer 1's ingress IP"
);
});
}
#[test]
fn test_dns_addresses_registered_but_not_dialable_when_disabled() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let config = super::Config {
allow_private_ips: true,
allow_dns: false,
bypass_ip_check: false,
max_sets: NZUsize!(3),
peer_connection_cooldown: Duration::from_millis(100),
block_duration: Duration::from_secs(100),
};
let pk_socket = ed25519::PrivateKey::from_seed(1).public_key();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 8080);
let socket_peer_addr = Address::Symmetric(socket_addr);
let pk_dns = ed25519::PrivateKey::from_seed(2).public_key();
let egress_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 9090);
let dns_peer_addr = Address::Asymmetric {
ingress: Ingress::Dns {
host: hostname!("node.example.com"),
port: 8080,
},
egress: egress_socket,
};
runtime.start(|context| async move {
let mut directory = Directory::init(context, my_pk, config, releaser);
let reset_peers = directory
.track(
0,
primary(
[
(pk_socket.clone(), socket_peer_addr.clone()),
(pk_dns.clone(), dns_peer_addr.clone()),
]
.try_into()
.unwrap(),
),
)
.unwrap();
assert!(reset_peers.is_empty());
assert!(
directory.peers.contains_key(&pk_socket),
"Socket peer should be in the peer set"
);
assert!(
directory.peers.contains_key(&pk_dns),
"DNS peer should be in the peer set for consistency"
);
let dialable = directory.dialable();
assert_eq!(dialable.peers.len(), 1);
assert_eq!(dialable.peers[0], pk_socket);
});
}
#[test]
fn test_private_egress_ip_in_peer_set_but_not_dialable_or_tracked() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let config = super::Config {
allow_private_ips: false,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(3),
peer_connection_cooldown: Duration::from_millis(100),
block_duration: Duration::from_secs(100),
};
let pk_public = ed25519::PrivateKey::from_seed(1).public_key();
let public_addr =
Address::Symmetric(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 8080));
let pk_private = ed25519::PrivateKey::from_seed(2).public_key();
let private_addr = Address::Symmetric(SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
8080,
));
runtime.start(|context| async move {
let mut directory = Directory::init(context, my_pk, config, releaser);
let reset_peers = directory
.track(
0,
primary(
[
(pk_public.clone(), public_addr.clone()),
(pk_private.clone(), private_addr.clone()),
]
.try_into()
.unwrap(),
),
)
.unwrap();
assert!(reset_peers.is_empty());
assert!(
directory.peers.contains_key(&pk_public),
"Public peer should be in the peer set"
);
assert!(
directory.peers.contains_key(&pk_private),
"Private peer should be in the peer set for consistency"
);
let dialable = directory.dialable();
assert_eq!(dialable.peers.len(), 1);
assert_eq!(dialable.peers[0], pk_public);
let listenable = directory.listenable();
assert!(listenable.contains(&Ipv4Addr::new(8, 8, 8, 8).into()));
assert!(!listenable.contains(&Ipv4Addr::new(10, 0, 0, 1).into()));
});
}
#[test]
fn test_listenable_ip_collision_eligible_wins() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(3),
peer_connection_cooldown: Duration::from_millis(100),
block_duration: Duration::from_secs(100),
};
let pk_1 = ed25519::PrivateKey::from_seed(1).public_key();
let pk_2 = ed25519::PrivateKey::from_seed(2).public_key();
let shared_ip = IpAddr::V4(Ipv4Addr::new(203, 0, 113, 1));
let addr_1 = Address::Symmetric(SocketAddr::new(shared_ip, 8080));
let addr_2 = Address::Symmetric(SocketAddr::new(shared_ip, 8081));
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), my_pk, config, releaser);
directory.track(
0,
primary(
[(pk_1.clone(), addr_1), (pk_2.clone(), addr_2)]
.try_into()
.unwrap(),
),
);
let listenable = directory.listenable();
assert!(
listenable.contains(&shared_ip),
"IP should be listenable when both peers are eligible"
);
directory.block(&pk_1);
let listenable = directory.listenable();
assert!(
listenable.contains(&shared_ip),
"IP should be listenable when at least one peer is eligible"
);
directory.block(&pk_2);
let listenable = directory.listenable();
assert!(
!listenable.contains(&shared_ip),
"IP should not be listenable when all peers are blocked"
);
});
}
#[test]
fn test_unblock_expired() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let block_duration = Duration::from_secs(100);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(3),
peer_connection_cooldown: Duration::from_millis(100),
block_duration,
};
let pk_1 = ed25519::PrivateKey::from_seed(1).public_key();
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1235);
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), my_pk, config, releaser);
directory.track(
0,
primary([(pk_1.clone(), addr(addr_1))].try_into().unwrap()),
);
directory.block(&pk_1);
assert!(
!directory.listenable().contains(&addr_1.ip()),
"Blocked peer should not be listenable"
);
assert_eq!(directory.blocked(), 1, "Should have one blocked peer");
let first_expiry = directory
.blocked
.get(&pk_1)
.expect("peer should be blocked");
assert!(
!directory.unblock_expired(),
"No peers should be unblocked before expiry"
);
context.sleep(block_duration + Duration::from_secs(1)).await;
assert!(directory.unblock_expired(), "Should have unblocked a peer");
assert!(
directory.listenable().contains(&addr_1.ip()),
"Unblocked peer should be listenable"
);
assert_eq!(directory.blocked(), 0, "No more blocked peers");
directory.block(&pk_1);
assert_eq!(directory.blocked(), 1, "Should have one blocked peer again");
let second_expiry = directory
.blocked
.get(&pk_1)
.expect("peer should be blocked again");
assert!(
second_expiry > first_expiry,
"Re-blocking should have a later expiry time"
);
});
}
#[test]
fn test_unblock_expired_peer_removed_and_readded() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let block_duration = Duration::from_secs(100);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(1), peer_connection_cooldown: Duration::from_millis(100),
block_duration,
};
let pk_1 = ed25519::PrivateKey::from_seed(1).public_key();
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1235);
let pk_2 = ed25519::PrivateKey::from_seed(2).public_key();
let addr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1236);
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), my_pk, config, releaser);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_1))
.is_none(),
"pk_1 should not be blocked initially"
);
directory.track(
0,
primary([(pk_1.clone(), addr(addr_1))].try_into().unwrap()),
);
directory.block(&pk_1);
assert!(directory.blocked.contains(&pk_1));
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_1))
.is_some(),
"pk_1 should be marked blocked"
);
directory.track(
1,
primary([(pk_2.clone(), addr(addr_2))].try_into().unwrap()),
);
assert!(
!directory.peers.contains_key(&pk_1),
"pk_1 should be removed"
);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_1))
.is_some(),
"blocked metric should persist after peer removal"
);
directory.track(
2,
primary([(pk_1.clone(), addr(addr_1))].try_into().unwrap()),
);
assert!(
directory.blocked.contains(&pk_1),
"Re-added pk_1 should still be blocked"
);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_1))
.is_some(),
"blocked metric should persist after re-add"
);
context.sleep(block_duration + Duration::from_secs(1)).await;
assert!(directory.unblock_expired());
assert!(
!directory.blocked.contains(&pk_1),
"pk_1 should no longer be blocked"
);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_1))
.is_none(),
"blocked metric should be removed after unblock"
);
});
}
#[test]
fn test_blocked_metric_multiple_peers() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let block_duration = Duration::from_secs(100);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(3),
peer_connection_cooldown: Duration::from_millis(100),
block_duration,
};
let pk_1 = ed25519::PrivateKey::from_seed(1).public_key();
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1235);
let pk_2 = ed25519::PrivateKey::from_seed(2).public_key();
let addr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1236);
let pk_3 = ed25519::PrivateKey::from_seed(3).public_key();
let addr_3 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1237);
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), my_pk, config, releaser);
directory.track(
0,
primary(
[
(pk_1.clone(), addr(addr_1)),
(pk_2.clone(), addr(addr_2)),
(pk_3.clone(), addr(addr_3)),
]
.try_into()
.unwrap(),
),
);
assert_eq!(directory.blocked(), 0);
directory.block(&pk_1);
assert!(directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_1))
.is_some());
directory.block(&pk_2);
assert!(directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_2))
.is_some());
directory.block(&pk_3);
assert!(directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_3))
.is_some());
assert_eq!(directory.blocked(), 3);
directory.block(&pk_1);
assert!(directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_1))
.is_some());
context.sleep(block_duration + Duration::from_secs(1)).await;
assert!(directory.unblock_expired());
assert!(directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_1))
.is_none());
assert!(directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_2))
.is_none());
assert!(directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_3))
.is_none());
assert_eq!(directory.blocked(), 0);
});
}
#[test]
fn test_block_myself_no_panic_on_expiry() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let block_duration = Duration::from_secs(100);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(3),
peer_connection_cooldown: Duration::from_millis(100),
block_duration,
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), my_pk.clone(), config, releaser);
directory.block(&my_pk);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&my_pk))
.is_none(),
"Blocking myself should not create metric entry"
);
assert_eq!(directory.blocked(), 0, "No peers should be blocked");
context.sleep(block_duration + Duration::from_secs(1)).await;
assert!(!directory.unblock_expired(), "No peers should be unblocked");
});
}
#[test]
fn test_block_nonexistent_peer_then_add_to_set() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let unknown_pk = ed25519::PrivateKey::from_seed(99).public_key();
let unknown_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9999);
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let block_duration = Duration::from_secs(100);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(3),
peer_connection_cooldown: Duration::from_millis(100),
block_duration,
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), my_pk, config, releaser);
directory.block(&unknown_pk);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&unknown_pk))
.is_some(),
"Blocking nonexistent peer should create metric entry"
);
assert_eq!(directory.blocked(), 1, "One peer should be blocked");
assert!(
!directory.peers.contains_key(&unknown_pk),
"Peer should not be in peers yet"
);
directory.track(
0,
primary(
[(unknown_pk.clone(), addr(unknown_addr))]
.try_into()
.unwrap(),
),
);
assert!(
directory.peers.contains_key(&unknown_pk),
"Peer should be in peers after tracking"
);
assert!(
directory.blocked.contains(&unknown_pk),
"Peer should be blocked after tracking"
);
assert!(
!directory.eligible(&unknown_pk),
"Blocked peer should not be eligible"
);
context.sleep(block_duration + Duration::from_secs(1)).await;
directory.unblock_expired();
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&unknown_pk))
.is_none(),
"Blocked metric should be removed after unblock"
);
assert!(
directory.eligible(&unknown_pk),
"Peer should be eligible after unblock"
);
});
}
#[test]
fn test_block_peer_multiple_times() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let unknown_pk = ed25519::PrivateKey::from_seed(99).public_key();
let registered_pk = ed25519::PrivateKey::from_seed(50).public_key();
let registered_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5050);
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let block_duration = Duration::from_secs(100);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(3),
peer_connection_cooldown: Duration::from_millis(100),
block_duration,
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), my_pk, config, releaser);
directory.track(
0,
primary(
[(registered_pk.clone(), addr(registered_addr))]
.try_into()
.unwrap(),
),
);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(®istered_pk))
.is_none(),
"Peer should not be blocked initially"
);
directory.block(®istered_pk);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(®istered_pk))
.is_some(),
"Tracked peer should be marked blocked"
);
directory.block(®istered_pk);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(®istered_pk))
.is_some(),
"Blocking same tracked peer twice should not change metric"
);
directory.block(®istered_pk);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(®istered_pk))
.is_some(),
"Blocking same tracked peer thrice should not change metric"
);
directory.block(&unknown_pk);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&unknown_pk))
.is_some(),
"Unknown peer should be marked blocked"
);
directory.block(&unknown_pk);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&unknown_pk))
.is_some(),
"Blocking same nonexistent peer twice should not change metric"
);
directory.block(&unknown_pk);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&unknown_pk))
.is_some(),
"Blocking same nonexistent peer thrice should not change metric"
);
});
}
#[test]
fn test_blocked_peer_not_dialable() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let pk_1 = ed25519::PrivateKey::from_seed(1).public_key();
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1235);
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let block_duration = Duration::from_secs(100);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(3),
peer_connection_cooldown: Duration::from_millis(100),
block_duration,
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), my_pk, config, releaser);
directory.track(
0,
primary([(pk_1.clone(), addr(addr_1))].try_into().unwrap()),
);
assert!(
directory.dialable().peers.contains(&pk_1),
"Peer should be dialable before blocking"
);
directory.block(&pk_1);
assert!(
!directory.dialable().peers.contains(&pk_1),
"Blocked peer should not be dialable"
);
context.sleep(block_duration + Duration::from_secs(1)).await;
directory.unblock_expired();
assert!(
directory.dialable().peers.contains(&pk_1),
"Peer should be dialable after unblock"
);
});
}
#[test]
fn test_reservation_rate_limits_redial() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let pk_1 = ed25519::PrivateKey::from_seed(1).public_key();
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1235);
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let cooldown = Duration::from_secs(1);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(3),
peer_connection_cooldown: cooldown,
block_duration: Duration::from_secs(100),
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), my_pk, config, releaser);
directory.track(
0,
primary([(pk_1.clone(), addr(addr_1))].try_into().unwrap()),
);
let reservation = directory.dial(&pk_1).expect("first dial should succeed");
drop(reservation);
directory.release(super::Metadata::Dialer(pk_1.clone()));
assert!(
directory.dial(&pk_1).is_none(),
"should be rate-limited immediately after release"
);
assert!(
!directory.dialable().peers.contains(&pk_1),
"should not appear in dialable list during rate-limit window"
);
context.sleep(cooldown * 2).await;
assert!(directory.dialable().peers.contains(&pk_1));
let (_reservation, ingress) = directory
.dial(&pk_1)
.expect("should succeed after interval");
assert_eq!(ingress, Ingress::Socket(addr_1));
});
}
#[test]
fn test_dialable_next_query_at_reflects_rate_limit() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let pk_1 = ed25519::PrivateKey::from_seed(1).public_key();
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1235);
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let cooldown = Duration::from_secs(1);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(3),
peer_connection_cooldown: cooldown,
block_duration: Duration::from_secs(100),
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), my_pk, config, releaser);
directory.track(
0,
primary([(pk_1.clone(), addr(addr_1))].try_into().unwrap()),
);
let reservation = directory.dial(&pk_1).expect("first dial should succeed");
let reserved_at = context.current();
drop(reservation);
directory.release(super::Metadata::Dialer(pk_1.clone()));
let interval = cooldown;
let dialable = directory.dialable();
assert!(!dialable.peers.contains(&pk_1));
let nqa = dialable.next_query_at.unwrap();
assert!(nqa >= reserved_at + interval);
assert!(nqa <= reserved_at + interval * 2);
});
}
#[test]
fn test_dialable_empty() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let cooldown = Duration::from_millis(200);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(3),
peer_connection_cooldown: cooldown,
block_duration: Duration::from_secs(100),
};
runtime.start(|context| async move {
let directory = Directory::init(context.clone(), my_pk, config, releaser);
let dialable = directory.dialable();
assert!(dialable.peers.is_empty());
assert_eq!(dialable.next_query_at, None);
});
}
#[test]
fn test_dialable_next_query_at_includes_blocked() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let pk_1 = ed25519::PrivateKey::from_seed(1).public_key();
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1234);
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let cooldown = Duration::from_millis(200);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(3),
peer_connection_cooldown: cooldown,
block_duration: Duration::from_secs(3600),
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), my_pk, config, releaser);
directory.track(
0,
primary([(pk_1.clone(), addr(addr_1))].try_into().unwrap()),
);
directory.block(&pk_1);
let dialable = directory.dialable();
assert!(dialable.peers.is_empty());
assert_eq!(
dialable.next_query_at,
Some(context.current() + Duration::from_secs(3600))
);
});
}
#[test]
fn test_dialable_expired_block_without_unblock() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let pk_1 = ed25519::PrivateKey::from_seed(1).public_key();
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1234);
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let block_duration = Duration::from_secs(1);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(3),
peer_connection_cooldown: Duration::from_millis(200),
block_duration,
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), my_pk, config, releaser);
directory.track(
0,
primary([(pk_1.clone(), addr(addr_1))].try_into().unwrap()),
);
directory.block(&pk_1);
assert!(directory.dialable().peers.is_empty());
context.sleep(block_duration + Duration::from_secs(1)).await;
let dialable = directory.dialable();
assert!(
dialable.peers.contains(&pk_1),
"expired block should not prevent dialing"
);
assert_eq!(
dialable.next_query_at, None,
"expired block should not contribute a stale hint"
);
directory
.dial(&pk_1)
.expect("expired block should not prevent reservation");
});
}
#[test]
fn test_reblock_after_expired_block_without_unblock() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let pk_1 = ed25519::PrivateKey::from_seed(1).public_key();
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1234);
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let block_duration = Duration::from_secs(1);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(3),
peer_connection_cooldown: Duration::from_millis(200),
block_duration,
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), my_pk, config, releaser);
directory.track(
0,
primary([(pk_1.clone(), addr(addr_1))].try_into().unwrap()),
);
directory.block(&pk_1);
assert!(directory.dialable().peers.is_empty());
context.sleep(block_duration + Duration::from_secs(1)).await;
directory.block(&pk_1);
assert!(
directory.dialable().peers.is_empty(),
"re-blocked peer should not be dialable"
);
assert!(
directory.dial(&pk_1).is_none(),
"re-blocked peer should not be reservable"
);
});
}
#[test]
fn test_blocked_peer_not_acceptable() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let pk_1 = ed25519::PrivateKey::from_seed(1).public_key();
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1235);
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let block_duration = Duration::from_secs(100);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: true, max_sets: NZUsize!(3),
peer_connection_cooldown: Duration::from_millis(100),
block_duration,
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), my_pk, config, releaser);
directory.track(
0,
primary([(pk_1.clone(), addr(addr_1))].try_into().unwrap()),
);
assert!(
directory.acceptable(&pk_1, addr_1.ip()),
"Peer should be acceptable before blocking"
);
directory.block(&pk_1);
assert!(
!directory.acceptable(&pk_1, addr_1.ip()),
"Blocked peer should not be acceptable"
);
context.sleep(block_duration + Duration::from_secs(1)).await;
directory.unblock_expired();
assert!(
directory.acceptable(&pk_1, addr_1.ip()),
"Peer should be acceptable after unblock"
);
});
}
#[test]
fn test_blocked_peer_not_listenable() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let pk_1 = ed25519::PrivateKey::from_seed(1).public_key();
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1235);
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let block_duration = Duration::from_secs(100);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(3),
peer_connection_cooldown: Duration::from_millis(100),
block_duration,
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), my_pk, config, releaser);
directory.track(
0,
primary([(pk_1.clone(), addr(addr_1))].try_into().unwrap()),
);
assert!(
directory.listenable().contains(&addr_1.ip()),
"Peer's IP should be listenable before blocking"
);
directory.block(&pk_1);
assert!(
!directory.listenable().contains(&addr_1.ip()),
"Blocked peer's IP should not be listenable"
);
context.sleep(block_duration + Duration::from_secs(1)).await;
directory.unblock_expired();
assert!(
directory.listenable().contains(&addr_1.ip()),
"Peer's IP should be listenable after unblock"
);
});
}
#[test]
fn test_blocked_peer_not_eligible() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let pk_1 = ed25519::PrivateKey::from_seed(1).public_key();
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1235);
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let block_duration = Duration::from_secs(100);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(3),
peer_connection_cooldown: Duration::from_millis(100),
block_duration,
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), my_pk, config, releaser);
directory.track(
0,
primary([(pk_1.clone(), addr(addr_1))].try_into().unwrap()),
);
assert!(
directory.eligible(&pk_1),
"Peer should be eligible before blocking"
);
directory.block(&pk_1);
assert!(
!directory.eligible(&pk_1),
"Blocked peer should not be eligible"
);
context.sleep(block_duration + Duration::from_secs(1)).await;
directory.unblock_expired();
assert!(
directory.eligible(&pk_1),
"Peer should be eligible after unblock"
);
});
}
#[test]
fn test_overwrite_basic() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(3),
peer_connection_cooldown: Duration::from_millis(100),
block_duration: Duration::from_secs(100),
};
let pk_1 = ed25519::PrivateKey::from_seed(1).public_key();
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1235);
let addr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 1236);
runtime.start(|context| async move {
let mut directory = Directory::init(context, my_pk, config, releaser);
directory.track(
0,
primary([(pk_1.clone(), addr(addr_1))].try_into().unwrap()),
);
assert_eq!(
directory.peers.get(&pk_1).unwrap().ingress(),
Some(Ingress::Socket(addr_1))
);
let success = directory.overwrite(&pk_1, addr(addr_2));
assert!(success);
assert_eq!(
directory.peers.get(&pk_1).unwrap().ingress(),
Some(Ingress::Socket(addr_2))
);
});
}
#[test]
fn test_overwrite_untracked_peer() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(3),
peer_connection_cooldown: Duration::from_millis(100),
block_duration: Duration::from_secs(100),
};
let pk_1 = ed25519::PrivateKey::from_seed(1).public_key();
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1235);
runtime.start(|context| async move {
let mut directory = Directory::init(context, my_pk, config, releaser);
let success = directory.overwrite(&pk_1, addr(addr_1));
assert!(!success);
});
}
#[test]
fn test_overwrite_peer_not_in_set() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(1),
peer_connection_cooldown: Duration::from_millis(100),
block_duration: Duration::from_secs(100),
};
let pk_1 = ed25519::PrivateKey::from_seed(1).public_key();
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1235);
let pk_2 = ed25519::PrivateKey::from_seed(2).public_key();
let addr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 1236);
let addr_3 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 10, 10, 10)), 1237);
runtime.start(|context| async move {
let mut directory = Directory::init(context, my_pk, config, releaser);
directory.track(
0,
primary([(pk_1.clone(), addr(addr_1))].try_into().unwrap()),
);
directory.track(
1,
primary([(pk_2.clone(), addr(addr_2))].try_into().unwrap()),
);
let success = directory.overwrite(&pk_1, addr(addr_3));
assert!(!success);
});
}
#[test]
fn test_overwrite_blocked_peer() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let block_duration = Duration::from_secs(100);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(3),
peer_connection_cooldown: Duration::from_millis(100),
block_duration,
};
let pk_1 = ed25519::PrivateKey::from_seed(1).public_key();
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1235);
let addr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 1236);
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), my_pk, config, releaser);
directory.track(
0,
primary([(pk_1.clone(), addr(addr_1))].try_into().unwrap()),
);
directory.block(&pk_1);
let success = directory.overwrite(&pk_1, addr(addr_2));
assert!(success);
assert_eq!(
directory.peers.get(&pk_1).unwrap().ingress(),
Some(Ingress::Socket(addr_2))
);
context.sleep(block_duration + Duration::from_secs(1)).await;
directory.unblock_expired();
assert_eq!(
directory.peers.get(&pk_1).unwrap().ingress(),
Some(Ingress::Socket(addr_2))
);
assert!(directory.dialable().peers.contains(&pk_1));
});
}
#[test]
fn test_overwrite_myself() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(3),
peer_connection_cooldown: Duration::from_millis(100),
block_duration: Duration::from_secs(100),
};
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1235);
runtime.start(|context| async move {
let mut directory = Directory::init(context, my_pk.clone(), config, releaser);
let success = directory.overwrite(&my_pk, addr(addr_1));
assert!(!success);
});
}
#[test]
fn test_overwrite_same_address() {
let runtime = deterministic::Runner::default();
let my_pk = ed25519::PrivateKey::from_seed(0).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = super::Releaser::new(tx);
let config = super::Config {
allow_private_ips: true,
allow_dns: true,
bypass_ip_check: false,
max_sets: NZUsize!(3),
peer_connection_cooldown: Duration::from_millis(100),
block_duration: Duration::from_secs(100),
};
let pk_1 = ed25519::PrivateKey::from_seed(1).public_key();
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1235);
runtime.start(|context| async move {
let mut directory = Directory::init(context, my_pk, config, releaser);
directory.track(
0,
primary([(pk_1.clone(), addr(addr_1))].try_into().unwrap()),
);
let addr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 1236);
assert!(directory.overwrite(&pk_1, addr(addr_2)));
assert!(!directory.overwrite(&pk_1, addr(addr_2)));
let addr_3 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 10, 10, 10)), 1237);
assert!(directory.overwrite(&pk_1, addr(addr_3)));
});
}
}