use super::{bit_set::BitSet, metrics::Metrics, record::Record, Metadata, Reservation};
use crate::{
authenticated::{
dialing::{earliest, DialStatus, Dialable, ReserveResult},
discovery::{
actors::tracker::ingress::Releaser,
metrics,
types::{self, Info},
},
},
utils::PeerSetsAtIndex as PeerSetsAtIndexBase,
Ingress, PeerSetUpdate, TrackedPeers,
};
use commonware_cryptography::PublicKey;
use commonware_runtime::{
telemetry::metrics::status::GaugeExt, Clock, Metrics as RuntimeMetrics, Spawner,
};
use commonware_utils::{ordered::Set as OrderedSet, PrioritySet, SystemTimeExt};
use rand::{seq::IteratorRandom, Rng};
use std::{
collections::{BTreeMap, HashMap},
num::NonZeroUsize,
ops::Deref,
time::{Duration, SystemTime},
};
use tracing::{debug, warn};
type PeerSetsAtIndex<C> = PeerSetsAtIndexBase<BitSet<C>, OrderedSet<C>>;
pub struct Config {
pub allow_private_ips: bool,
pub allow_dns: bool,
pub max_sets: NonZeroUsize,
pub dial_fail_limit: usize,
pub peer_connection_cooldown: Duration,
pub block_duration: Duration,
}
pub struct Directory<E: Rng + Clock + RuntimeMetrics, C: PublicKey> {
context: E,
allow_private_ips: bool,
allow_dns: bool,
max_sets: NonZeroUsize,
dial_fail_limit: usize,
block_duration: Duration,
peer_connection_cooldown: Duration,
peers: HashMap<C, Record<C>>,
peer_sets: BTreeMap<u64, PeerSetsAtIndex<C>>,
blocked: PrioritySet<C, SystemTime>,
releaser: Releaser<C>,
metrics: Metrics,
}
impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: PublicKey> Directory<E, C> {
pub fn init(
context: E,
bootstrappers: Vec<(C, Ingress)>,
myself: Info<C>,
cfg: Config,
releaser: Releaser<C>,
) -> Self {
let mut peers = HashMap::new();
for (peer, ingress) in bootstrappers {
peers.insert(peer, Record::bootstrapper(ingress));
}
peers.insert(myself.public_key.clone(), Record::myself(myself));
let metrics = Metrics::init(context.clone());
let _ = metrics.tracked.try_set(peers.len() - 1);
Self {
context,
allow_private_ips: cfg.allow_private_ips,
allow_dns: cfg.allow_dns,
max_sets: cfg.max_sets,
dial_fail_limit: cfg.dial_fail_limit,
block_duration: cfg.block_duration,
peer_connection_cooldown: cfg.peer_connection_cooldown,
peers,
peer_sets: BTreeMap::new(),
blocked: PrioritySet::new(),
releaser,
metrics,
}
}
pub fn release(&mut self, metadata: Metadata<C>) {
let peer = metadata.public_key();
let Some(record) = self.peers.get_mut(peer) else {
return;
};
record.release();
self.metrics.connected.remove(&metrics::Peer::new(peer));
self.metrics.reserved.dec();
if let Metadata::Dialer(_, ingress) = &metadata {
record.dial_failure(ingress);
}
let want = record.want(self.dial_fail_limit);
for entry in self.peer_sets.values_mut() {
entry.primary.update(peer, !want);
}
self.delete_if_needed(peer);
}
pub fn connect(&mut self, peer: &C, dialer: bool) {
let record = self.peers.get_mut(peer).unwrap();
if dialer {
record.dial_success();
}
record.connect();
let _ = self
.metrics
.connected
.get_or_create(&metrics::Peer::new(peer))
.try_set(self.context.current().epoch_millis());
let want = record.want(self.dial_fail_limit);
for entry in self.peer_sets.values_mut() {
entry.primary.update(peer, !want);
}
}
pub fn update_peers(&mut self, infos: Vec<types::Info<C>>) {
for info in infos {
let peer = info.public_key.clone();
let Some(record) = self.peers.get_mut(&peer) else {
continue;
};
if !record.update(info) {
continue;
}
self.metrics
.updates
.get_or_create(&metrics::Peer::new(&peer))
.inc();
let want = record.want(self.dial_fail_limit);
for entry in self.peer_sets.values_mut() {
entry.primary.update(&peer, !want);
}
debug!(?peer, "updated peer record");
}
}
pub fn track(&mut self, index: u64, peers: TrackedPeers<C>) -> bool {
if self.peer_sets.contains_key(&index) {
warn!(index, "peer set already exists");
return false;
}
if let Some((last, _)) = self.peer_sets.last_key_value() {
if index <= *last {
warn!(?index, ?last, "index must monotonically increase");
return false;
}
}
let secondary_deduped: OrderedSet<C> = OrderedSet::from_iter_dedup(
peers
.secondary
.iter()
.filter(|s| peers.primary.position(s).is_none())
.cloned(),
);
let mut primary_set = BitSet::new(peers.primary);
for i in 0..primary_set.len() {
let primary = primary_set[i].clone();
let record = self.peers.entry(primary).or_insert_with(|| {
self.metrics.tracked.inc();
Record::unknown()
});
record.increment_primary();
assert!(
primary_set.update_at(i, !record.want(self.dial_fail_limit)),
"index in 0..primary_set.len() must map to a knowledge bit"
);
}
for secondary in secondary_deduped.iter() {
let record = self.peers.entry(secondary.clone()).or_insert_with(|| {
self.metrics.tracked.inc();
Record::unknown()
});
record.increment_secondary();
}
self.peer_sets.insert(
index,
PeerSetsAtIndex {
primary: primary_set,
secondary: secondary_deduped,
},
);
while self.peer_sets.len() > self.max_sets.get() {
let (index, sets) = self.peer_sets.pop_first().unwrap();
debug!(index, "removed oldest tracked peer sets");
sets.primary.into_iter().for_each(|primary| {
self.peers.get_mut(primary).unwrap().decrement_primary();
self.delete_if_needed(primary);
});
sets.secondary.iter().for_each(|secondary| {
self.peers.get_mut(secondary).unwrap().decrement_secondary();
self.delete_if_needed(secondary);
});
}
true
}
pub fn get_peer_set(&self, index: &u64) -> Option<TrackedPeers<C>> {
let entry = self.peer_sets.get(index)?;
Some(TrackedPeers::new(
entry.primary.deref().clone(),
entry.secondary.clone(),
))
}
pub fn latest_set_index(&self) -> Option<u64> {
self.peer_sets.keys().last().copied()
}
pub fn latest_update(&self) -> Option<PeerSetUpdate<C>> {
let index = self.latest_set_index()?;
Some(PeerSetUpdate {
index,
latest: self.get_peer_set(&index).unwrap(),
all: self.all(),
})
}
pub fn dial(&mut self, peer: &C) -> Option<Reservation<C>> {
let ingress = self.peers.get(peer)?.ingress()?.clone();
self.reserve(Metadata::Dialer(peer.clone(), ingress))
}
pub fn listen(&mut self, peer: &C) -> Option<Reservation<C>> {
self.reserve(Metadata::Listener(peer.clone()))
}
pub fn get_random_bit_vec(&mut self) -> Option<types::BitVec> {
let (&index, entry) = self.peer_sets.iter().choose(&mut self.context)?;
Some(types::BitVec {
index,
bits: entry.primary.knowledge(),
})
}
fn is_blocked(&self, peer: &C) -> bool {
self.blocked
.get(peer)
.is_some_and(|t| t > self.context.current())
}
pub fn block(&mut self, peer: &C) {
if self.is_blocked(peer) {
return;
}
if let Some(record) = self.peers.get(peer) {
if !record.is_blockable() {
return;
}
}
let blocked_until = self.context.current() + self.block_duration;
self.blocked.put(peer.clone(), blocked_until);
let _ = self
.metrics
.blocked
.get_or_create(&metrics::Peer::new(peer))
.try_set(blocked_until.epoch_millis());
}
pub fn all(&self) -> TrackedPeers<C> {
let mut primary = Vec::new();
let mut secondary = Vec::new();
for (k, record) in &self.peers {
if record.primary_sets() > 0 {
primary.push(k.clone());
} else if record.secondary_sets() > 0 {
secondary.push(k.clone());
}
}
TrackedPeers::new(
OrderedSet::from_iter_dedup(primary),
OrderedSet::from_iter_dedup(secondary),
)
}
pub fn info(&self, peer: &C) -> Option<Info<C>> {
self.peers.get(peer).and_then(|r| r.sharable())
}
pub fn infos(&self, bit_vec: types::BitVec) -> Option<Vec<types::Info<C>>> {
let Some(entry) = self.peer_sets.get(&bit_vec.index) else {
debug!(index = bit_vec.index, "requested peer set not found");
return Some(vec![]);
};
if bit_vec.bits.len() != entry.primary.len() as u64 {
debug!(
index = bit_vec.index,
expected = entry.primary.len(),
actual = bit_vec.bits.len(),
"bit vector length mismatch"
);
return None;
}
let peers: Vec<_> = bit_vec
.bits
.iter()
.enumerate()
.filter_map(|(i, b)| {
let peer = (!b).then_some(&entry.primary[i])?; let info = self.peers.get(peer).and_then(|r| r.sharable());
info.filter(|i| i.timestamp <= self.context.current().epoch_millis())
})
.collect();
Some(peers)
}
pub fn eligible(&self, peer: &C) -> bool {
!self.is_blocked(peer) && self.peers.get(peer).is_some_and(|r| r.eligible())
}
pub fn dialable(&self) -> Dialable<C> {
let now = self.context.current();
let mut next_query_at: Option<SystemTime> = None;
let mut peers = Vec::new();
for (peer, record) in &self.peers {
if let Some(blocked_until) = self.blocked.get(peer).filter(|t| *t > now) {
next_query_at = earliest(next_query_at, blocked_until);
continue;
}
match record.dialable(now, self.allow_private_ips, self.allow_dns) {
DialStatus::Now => peers.push(peer.clone()),
DialStatus::After(t) => {
next_query_at = earliest(next_query_at, t);
}
DialStatus::Unavailable => {}
}
}
peers.sort();
Dialable {
peers,
next_query_at,
}
}
pub fn acceptable(&self, peer: &C) -> bool {
!self.is_blocked(peer) && self.peers.get(peer).is_some_and(|r| r.acceptable())
}
pub fn unblock_expired(&mut self) {
let now = self.context.current();
while let Some((_, &blocked_until)) = self.blocked.peek() {
if blocked_until > now {
break;
}
let (peer, _) = self.blocked.pop().unwrap();
debug!(?peer, "unblocked peer");
self.metrics.blocked.remove(&metrics::Peer::new(&peer));
if let Some(record) = self.peers.get(&peer) {
let want = record.want(self.dial_fail_limit);
for entry in self.peer_sets.values_mut() {
entry.primary.update(&peer, !want);
}
}
}
}
pub async fn wait_for_unblock(&self) {
match self.blocked.peek() {
Some((_, &time)) => self.context.sleep_until(time).await,
None => futures::future::pending().await,
}
}
#[cfg(test)]
pub fn blocked(&self) -> usize {
self.blocked.len()
}
fn reserve(&mut self, metadata: Metadata<C>) -> Option<Reservation<C>> {
let peer = metadata.public_key();
if !self.eligible(peer) {
return None;
}
let record = self.peers.get_mut(peer).unwrap();
match record.reserve(&mut self.context, self.peer_connection_cooldown) {
ReserveResult::Reserved => {
self.metrics.reserved.inc();
Some(Reservation::new(metadata, self.releaser.clone()))
}
ReserveResult::RateLimited => {
self.metrics
.limits
.get_or_create(&metrics::Peer::new(peer))
.inc();
None
}
ReserveResult::Unavailable => None,
}
}
fn delete_if_needed(&mut self, peer: &C) -> bool {
let Some(record) = self.peers.get(peer) else {
return false;
};
if !record.deletable() {
return false;
}
self.peers.remove(peer);
self.metrics.tracked.dec();
true
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::authenticated::{discovery::types, mailbox::UnboundedMailbox};
use commonware_cryptography::{secp256r1::standard::PrivateKey, Signer};
use commonware_runtime::{deterministic, Clock, Metrics, Runner};
use commonware_utils::{
bitmap::BitMap, ordered::Set as OrderedSet, NZUsize, SystemTimeExt, TryCollect,
};
use std::net::SocketAddr;
const NAMESPACE: &[u8] = b"test";
fn test_socket() -> SocketAddr {
SocketAddr::from(([8, 8, 8, 8], 8080))
}
fn create_myself_info<S>(
signer: &S,
socket: SocketAddr,
timestamp: u64,
) -> types::Info<S::PublicKey>
where
S: commonware_cryptography::Signer,
{
types::Info::sign(signer, NAMESPACE, socket, timestamp)
}
fn metric_value(metrics: &str, name: &str, peer: &str) -> Option<i64> {
metrics
.lines()
.find(|line| line.starts_with(&format!("{name}{{peer=\"{peer}\"}} ")))
.and_then(|line| line.split_whitespace().nth(1))
.and_then(|value| value.parse::<i64>().ok())
}
#[test]
fn test_block_myself_no_panic_on_expiry() {
let runtime = deterministic::Runner::default();
let signer = PrivateKey::from_seed(0);
let my_pk = signer.public_key();
let my_info = create_myself_info(&signer, test_socket(), 100);
let (tx, _rx) = UnboundedMailbox::new();
let releaser = Releaser::new(tx);
let block_duration = Duration::from_secs(100);
let config = Config {
allow_private_ips: false,
allow_dns: true,
max_sets: NZUsize!(3),
dial_fail_limit: 1,
peer_connection_cooldown: Duration::from_millis(100),
block_duration,
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser);
directory.block(&my_pk);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&my_pk))
.is_none(),
"Blocking myself should not create metric entry"
);
assert_eq!(directory.blocked(), 0, "No peers should be blocked");
context.sleep(block_duration + Duration::from_secs(1)).await;
directory.unblock_expired();
});
}
#[test]
fn test_secondary_sets_remain_until_eviction() {
let runtime = deterministic::Runner::default();
let signer = PrivateKey::from_seed(0);
let my_info = create_myself_info(&signer, test_socket(), 100);
let (tx, _rx) = UnboundedMailbox::new();
let releaser = Releaser::new(tx);
let config = Config {
allow_private_ips: false,
allow_dns: true,
max_sets: NZUsize!(2),
dial_fail_limit: 1,
peer_connection_cooldown: Duration::from_millis(100),
block_duration: Duration::from_secs(100),
};
let primary_0 = PrivateKey::from_seed(1).public_key();
let primary_1 = PrivateKey::from_seed(2).public_key();
let primary_2 = PrivateKey::from_seed(3).public_key();
let secondary_0 = PrivateKey::from_seed(4).public_key();
let secondary_1 = PrivateKey::from_seed(5).public_key();
runtime.start(|context| async move {
let mut directory = Directory::init(context, vec![], my_info, config, releaser);
assert!(directory.track(
0,
TrackedPeers::new(
[primary_0].try_into().unwrap(),
[secondary_0.clone()].try_into().unwrap(),
),
));
assert!(directory.eligible(&secondary_0));
assert!(directory.track(
1,
TrackedPeers::new(
[primary_1].try_into().unwrap(),
[secondary_1.clone()].try_into().unwrap(),
),
));
assert!(directory.eligible(&secondary_0));
assert!(directory.eligible(&secondary_1));
assert!(directory.track(
2,
TrackedPeers::from(OrderedSet::try_from([primary_2]).unwrap()),
));
assert!(!directory.peers.contains_key(&secondary_0));
assert!(directory.eligible(&secondary_1));
});
}
#[test]
fn test_track_primary_secondary_overlap_deduplicates() {
let runtime = deterministic::Runner::default();
let signer = PrivateKey::from_seed(0);
let my_info = create_myself_info(&signer, test_socket(), 100);
let (tx, _rx) = UnboundedMailbox::new();
let releaser = Releaser::new(tx);
let config = Config {
allow_private_ips: false,
allow_dns: true,
max_sets: NZUsize!(3),
dial_fail_limit: 1,
peer_connection_cooldown: Duration::from_millis(100),
block_duration: Duration::from_secs(100),
};
let pk_a = PrivateKey::from_seed(1).public_key();
let pk_b = PrivateKey::from_seed(2).public_key();
let pk_c = PrivateKey::from_seed(3).public_key();
runtime.start(|context| async move {
let mut directory = Directory::init(context, vec![], my_info, config, releaser);
assert!(directory.track(
0,
TrackedPeers::new(
[pk_a.clone(), pk_b.clone()].try_into().unwrap(),
[pk_b.clone(), pk_c.clone()].try_into().unwrap(),
),
));
let peer_set = directory.get_peer_set(&0).unwrap();
assert_eq!(peer_set.secondary.len(), 1);
assert!(peer_set.secondary.position(&pk_c).is_some());
assert!(peer_set.secondary.position(&pk_b).is_none());
assert_eq!(directory.peers.get(&pk_b).unwrap().primary_sets(), 1);
assert_eq!(directory.peers.get(&pk_b).unwrap().secondary_sets(), 0);
assert_eq!(directory.peers.get(&pk_c).unwrap().secondary_sets(), 1);
let latest = directory.latest_update().unwrap();
assert!(latest.latest.secondary.position(&pk_b).is_none());
assert!(latest.latest.primary.position(&pk_b).is_some());
let agg = directory.all();
assert!(agg.primary.position(&pk_b).is_some());
assert!(agg.secondary.position(&pk_b).is_none());
});
}
#[test]
fn test_demotion_from_primary_to_secondary() {
let runtime = deterministic::Runner::default();
let signer = PrivateKey::from_seed(0);
let my_info = create_myself_info(&signer, test_socket(), 100);
let (tx, _rx) = UnboundedMailbox::new();
let releaser = Releaser::new(tx);
let config = Config {
allow_private_ips: false,
allow_dns: true,
max_sets: NZUsize!(2),
dial_fail_limit: 1,
peer_connection_cooldown: Duration::from_millis(100),
block_duration: Duration::from_secs(100),
};
let pk_x = PrivateKey::from_seed(1).public_key();
let pk_y = PrivateKey::from_seed(2).public_key();
runtime.start(|context| async move {
let mut directory = Directory::init(context, vec![], my_info, config, releaser);
assert!(directory.track(
0,
TrackedPeers::new(
OrderedSet::try_from([pk_x.clone()]).unwrap(),
OrderedSet::try_from([pk_y.clone()]).unwrap(),
),
));
assert_eq!(directory.peers.get(&pk_x).unwrap().primary_sets(), 1);
assert_eq!(directory.peers.get(&pk_x).unwrap().secondary_sets(), 0);
assert_eq!(directory.peers.get(&pk_y).unwrap().primary_sets(), 0);
assert_eq!(directory.peers.get(&pk_y).unwrap().secondary_sets(), 1);
assert!(directory.track(
1,
TrackedPeers::new(
OrderedSet::try_from([pk_y.clone()]).unwrap(),
OrderedSet::try_from([pk_x.clone()]).unwrap(),
),
));
assert_eq!(directory.peers.get(&pk_x).unwrap().primary_sets(), 1);
assert_eq!(directory.peers.get(&pk_x).unwrap().secondary_sets(), 1);
assert_eq!(directory.peers.get(&pk_y).unwrap().primary_sets(), 1);
assert_eq!(directory.peers.get(&pk_y).unwrap().secondary_sets(), 1);
let agg = directory.all();
assert!(agg.primary.position(&pk_x).is_some());
assert!(agg.primary.position(&pk_y).is_some());
assert!(agg.secondary.is_empty());
assert!(directory.track(
2,
TrackedPeers::new(
OrderedSet::try_from([pk_y.clone()]).unwrap(),
OrderedSet::try_from([pk_x.clone()]).unwrap(),
),
));
assert_eq!(directory.peers.get(&pk_x).unwrap().primary_sets(), 0);
assert_eq!(directory.peers.get(&pk_x).unwrap().secondary_sets(), 2);
assert_eq!(directory.peers.get(&pk_y).unwrap().primary_sets(), 2);
assert_eq!(directory.peers.get(&pk_y).unwrap().secondary_sets(), 0);
let agg = directory.all();
assert!(agg.primary.position(&pk_y).is_some());
assert!(agg.secondary.position(&pk_x).is_some());
assert!(agg.primary.position(&pk_x).is_none());
assert!(agg.secondary.position(&pk_y).is_none());
});
}
#[test]
fn test_all_cross_index_primary_wins_for_overlap_peer() {
let runtime = deterministic::Runner::default();
let signer = PrivateKey::from_seed(0);
let my_info = create_myself_info(&signer, test_socket(), 100);
let (tx, _rx) = UnboundedMailbox::new();
let releaser = Releaser::new(tx);
let config = Config {
allow_private_ips: false,
allow_dns: true,
max_sets: NZUsize!(3),
dial_fail_limit: 1,
peer_connection_cooldown: Duration::from_millis(100),
block_duration: Duration::from_secs(100),
};
let pk_a = PrivateKey::from_seed(31).public_key();
let pk_b = PrivateKey::from_seed(32).public_key();
let pk_overlap = PrivateKey::from_seed(33).public_key();
let pk_sec = PrivateKey::from_seed(34).public_key();
runtime.start(|context| async move {
let mut directory = Directory::init(context, vec![], my_info, config, releaser);
assert!(directory.track(
0,
TrackedPeers::from(
OrderedSet::try_from([pk_a.clone(), pk_overlap.clone()]).unwrap(),
),
));
assert!(directory.track(
1,
TrackedPeers::new(
[pk_b.clone()].try_into().unwrap(),
[pk_overlap.clone(), pk_sec.clone()].try_into().unwrap(),
),
));
let agg = directory.all();
assert!(
agg.primary.position(&pk_overlap).is_some(),
"any primary membership across tracked sets -> aggregate primary only"
);
assert!(
agg.secondary.position(&pk_overlap).is_none(),
"aggregate secondary must not duplicate keys that have a primary role somewhere"
);
assert!(
agg.secondary.position(&pk_sec).is_some(),
"peers who are only secondary across sets stay under aggregate secondary"
);
});
}
#[test]
fn test_block_nonexistent_peer_then_add_to_set() {
let runtime = deterministic::Runner::default();
let signer = PrivateKey::from_seed(0);
let my_info = create_myself_info(&signer, test_socket(), 100);
let unknown_pk = PrivateKey::from_seed(99).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = Releaser::new(tx);
let block_duration = Duration::from_secs(100);
let config = Config {
allow_private_ips: false,
allow_dns: true,
max_sets: NZUsize!(3),
dial_fail_limit: 1,
peer_connection_cooldown: Duration::from_millis(100),
block_duration,
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser);
directory.block(&unknown_pk);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&unknown_pk))
.is_some(),
"Blocking nonexistent peer should create metric entry"
);
assert_eq!(directory.blocked(), 1, "One peer should be blocked");
assert!(
!directory.peers.contains_key(&unknown_pk),
"Peer should not be in peers yet"
);
let peer_set: OrderedSet<_> = [unknown_pk.clone()].into_iter().try_collect().unwrap();
directory.track(0, TrackedPeers::from(peer_set));
assert!(
directory.peers.contains_key(&unknown_pk),
"Peer should be in peers after tracking"
);
assert!(
directory.blocked.contains(&unknown_pk),
"Peer should be blocked after tracking"
);
assert!(
!directory.eligible(&unknown_pk),
"Blocked peer should not be eligible"
);
context.sleep(block_duration + Duration::from_secs(1)).await;
directory.unblock_expired();
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&unknown_pk))
.is_none(),
"Blocked metric should be removed after unblock"
);
assert!(
directory.eligible(&unknown_pk),
"Peer should be eligible after unblock"
);
});
}
#[test]
fn test_connected_metric_tracks_active_peers() {
let runtime = deterministic::Runner::default();
let signer = PrivateKey::from_seed(0);
let my_info = create_myself_info(&signer, test_socket(), 100);
let (tx, _rx) = UnboundedMailbox::new();
let releaser = Releaser::new(tx);
let config = Config {
allow_private_ips: false,
allow_dns: true,
max_sets: NZUsize!(3),
dial_fail_limit: 1,
peer_connection_cooldown: Duration::from_millis(100),
block_duration: Duration::from_secs(100),
};
let pk_1 = PrivateKey::from_seed(1).public_key();
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser);
let peer_set: OrderedSet<_> = [pk_1.clone()].into_iter().try_collect().unwrap();
directory.track(0, TrackedPeers::from(peer_set));
let _reservation = directory.listen(&pk_1).expect("peer should reserve");
let connected_at: i64 = context.current().epoch_millis().try_into().unwrap();
directory.connect(&pk_1, false);
context.sleep(Duration::from_secs(5)).await;
let metrics = context.encode();
assert_eq!(
metric_value(&metrics, "connected", &pk_1.to_string()),
Some(connected_at)
);
directory.release(Metadata::Listener(pk_1.clone()));
let metrics = context.encode();
assert_eq!(metric_value(&metrics, "connected", &pk_1.to_string()), None);
});
}
#[test]
fn test_block_peer_multiple_times() {
let runtime = deterministic::Runner::default();
let signer = PrivateKey::from_seed(0);
let my_info = create_myself_info(&signer, test_socket(), 100);
let unknown_pk = PrivateKey::from_seed(99).public_key();
let registered_pk = PrivateKey::from_seed(50).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = Releaser::new(tx);
let block_duration = Duration::from_secs(100);
let config = Config {
allow_private_ips: false,
allow_dns: true,
max_sets: NZUsize!(3),
dial_fail_limit: 1,
peer_connection_cooldown: Duration::from_millis(100),
block_duration,
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser);
let peer_set: OrderedSet<_> =
[registered_pk.clone()].into_iter().try_collect().unwrap();
directory.track(0, TrackedPeers::from(peer_set));
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(®istered_pk))
.is_none(),
"Peer should not be blocked initially"
);
directory.block(®istered_pk);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(®istered_pk))
.is_some(),
"Tracked peer should be marked blocked"
);
directory.block(®istered_pk);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(®istered_pk))
.is_some(),
"Blocking same tracked peer twice should not change metric"
);
directory.block(®istered_pk);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(®istered_pk))
.is_some(),
"Blocking same tracked peer thrice should not change metric"
);
directory.block(&unknown_pk);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&unknown_pk))
.is_some(),
"Unknown peer should be marked blocked"
);
directory.block(&unknown_pk);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&unknown_pk))
.is_some(),
"Blocking same nonexistent peer twice should not change metric"
);
directory.block(&unknown_pk);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&unknown_pk))
.is_some(),
"Blocking same nonexistent peer thrice should not change metric"
);
});
}
#[test]
fn test_blocked_peer_remains_blocked_on_update() {
let runtime = deterministic::Runner::default();
let signer = PrivateKey::from_seed(0);
let my_info = create_myself_info(&signer, test_socket(), 100);
let peer_signer = PrivateKey::from_seed(1);
let peer_pk = peer_signer.public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = Releaser::new(tx);
let block_duration = Duration::from_secs(100);
let config = Config {
allow_private_ips: true,
allow_dns: true,
max_sets: NZUsize!(3),
dial_fail_limit: 1,
peer_connection_cooldown: Duration::from_millis(100),
block_duration,
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser);
let peer_set: OrderedSet<_> = [peer_pk.clone()].into_iter().try_collect().unwrap();
directory.track(0, TrackedPeers::from(peer_set));
directory.block(&peer_pk);
assert!(
directory.blocked.contains(&peer_pk),
"Peer should be blocked after call to block"
);
let peer_info = types::Info::sign(&peer_signer, NAMESPACE, test_socket(), 200);
directory.update_peers(vec![peer_info.clone()]);
assert!(
directory.blocked.contains(&peer_pk),
"Peer should remain blocked after update"
);
let record = directory.peers.get(&peer_pk).unwrap();
assert!(
record.ingress().is_some(),
"Peer info should be updated while blocked"
);
context.sleep(block_duration + Duration::from_secs(1)).await;
directory.unblock_expired();
assert!(
!directory.blocked.contains(&peer_pk),
"Peer should be unblocked after expiry"
);
let record = directory.peers.get(&peer_pk).unwrap();
assert!(
record.ingress().is_some(),
"Unblocked peer should have the updated info"
);
});
}
#[test]
fn test_unblock_expired() {
let runtime = deterministic::Runner::default();
let signer = PrivateKey::from_seed(0);
let my_info = create_myself_info(&signer, test_socket(), 100);
let peer_pk = PrivateKey::from_seed(1).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = Releaser::new(tx);
let block_duration = Duration::from_secs(100);
let config = Config {
allow_private_ips: true,
allow_dns: true,
max_sets: NZUsize!(3),
dial_fail_limit: 1,
peer_connection_cooldown: Duration::from_millis(100),
block_duration,
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser);
let peer_set: OrderedSet<_> = [peer_pk.clone()].into_iter().try_collect().unwrap();
directory.track(0, TrackedPeers::from(peer_set));
directory.block(&peer_pk);
assert!(directory.blocked.contains(&peer_pk));
assert_eq!(directory.blocked(), 1, "Should have one blocked peer");
let first_expiry = directory
.blocked
.get(&peer_pk)
.expect("peer should be blocked");
directory.unblock_expired();
assert!(
directory.blocked.contains(&peer_pk),
"Peer should still be blocked before expiry"
);
context.sleep(block_duration + Duration::from_secs(1)).await;
directory.unblock_expired();
assert!(
!directory.blocked.contains(&peer_pk),
"Peer should be unblocked after expiry"
);
assert_eq!(directory.blocked(), 0, "No more blocked peers");
directory.block(&peer_pk);
assert_eq!(directory.blocked(), 1, "Should have one blocked peer again");
let second_expiry = directory
.blocked
.get(&peer_pk)
.expect("peer should be blocked again");
assert!(
second_expiry > first_expiry,
"Re-blocking should have a later expiry time"
);
});
}
#[test]
fn test_unblock_expired_peer_removed_and_readded() {
let runtime = deterministic::Runner::default();
let signer = PrivateKey::from_seed(0);
let my_info = create_myself_info(&signer, test_socket(), 100);
let pk_1 = PrivateKey::from_seed(1).public_key();
let pk_2 = PrivateKey::from_seed(2).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = Releaser::new(tx);
let block_duration = Duration::from_secs(100);
let config = Config {
allow_private_ips: true,
allow_dns: true,
max_sets: NZUsize!(1), dial_fail_limit: 1,
peer_connection_cooldown: Duration::from_millis(100),
block_duration,
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_1))
.is_none(),
"pk_1 should not be blocked initially"
);
let peer_set: OrderedSet<_> = [pk_1.clone()].into_iter().try_collect().unwrap();
directory.track(0, TrackedPeers::from(peer_set));
directory.block(&pk_1);
assert!(directory.blocked.contains(&pk_1));
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_1))
.is_some(),
"pk_1 should be marked blocked"
);
let peer_set_2: OrderedSet<_> = [pk_2.clone()].into_iter().try_collect().unwrap();
directory.track(1, TrackedPeers::from(peer_set_2));
assert!(
!directory.peers.contains_key(&pk_1),
"pk_1 should be removed"
);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_1))
.is_some(),
"blocked metric should persist after peer removal"
);
let peer_set_3: OrderedSet<_> = [pk_1.clone()].into_iter().try_collect().unwrap();
directory.track(2, TrackedPeers::from(peer_set_3));
assert!(
directory.blocked.contains(&pk_1),
"Re-added pk_1 should still be blocked"
);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_1))
.is_some(),
"blocked metric should persist after re-add"
);
context.sleep(block_duration + Duration::from_secs(1)).await;
directory.unblock_expired();
assert!(
!directory.blocked.contains(&pk_1),
"pk_1 should no longer be blocked"
);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_1))
.is_none(),
"blocked metric should be removed after unblock"
);
});
}
#[test]
fn test_blocked_metric_multiple_peers() {
let runtime = deterministic::Runner::default();
let signer = PrivateKey::from_seed(0);
let my_info = create_myself_info(&signer, test_socket(), 100);
let pk_1 = PrivateKey::from_seed(1).public_key();
let pk_2 = PrivateKey::from_seed(2).public_key();
let pk_3 = PrivateKey::from_seed(3).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = Releaser::new(tx);
let block_duration = Duration::from_secs(100);
let config = Config {
allow_private_ips: true,
allow_dns: true,
max_sets: NZUsize!(3),
dial_fail_limit: 1,
peer_connection_cooldown: Duration::from_millis(100),
block_duration,
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser);
let peer_set: OrderedSet<_> = [pk_1.clone(), pk_2.clone(), pk_3.clone()]
.into_iter()
.try_collect()
.unwrap();
directory.track(0, TrackedPeers::from(peer_set));
assert_eq!(directory.blocked(), 0);
directory.block(&pk_1);
assert!(directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_1))
.is_some());
directory.block(&pk_2);
assert!(directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_2))
.is_some());
directory.block(&pk_3);
assert!(directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_3))
.is_some());
assert_eq!(directory.blocked(), 3);
directory.block(&pk_1);
assert!(directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_1))
.is_some());
context.sleep(block_duration + Duration::from_secs(1)).await;
directory.unblock_expired();
assert!(directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_1))
.is_none());
assert!(directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_2))
.is_none());
assert!(directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_3))
.is_none());
assert_eq!(directory.blocked(), 0);
});
}
#[test]
fn test_blocked_peer_not_dialable() {
let runtime = deterministic::Runner::default();
let signer = PrivateKey::from_seed(0);
let my_info = create_myself_info(&signer, test_socket(), 100);
let peer_signer = PrivateKey::from_seed(1);
let peer_pk = peer_signer.public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = Releaser::new(tx);
let block_duration = Duration::from_secs(100);
let config = Config {
allow_private_ips: true,
allow_dns: true,
max_sets: NZUsize!(3),
dial_fail_limit: 1,
peer_connection_cooldown: Duration::from_millis(100),
block_duration,
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser);
let peer_set: OrderedSet<_> = [peer_pk.clone()].into_iter().try_collect().unwrap();
directory.track(0, TrackedPeers::from(peer_set));
let peer_info = types::Info::sign(&peer_signer, NAMESPACE, test_socket(), 200);
directory.update_peers(vec![peer_info]);
assert!(
directory.dialable().peers.contains(&peer_pk),
"Peer should be dialable before blocking"
);
directory.block(&peer_pk);
assert!(
!directory.dialable().peers.contains(&peer_pk),
"Blocked peer should not be dialable"
);
context.sleep(block_duration + Duration::from_secs(1)).await;
directory.unblock_expired();
assert!(
directory.dialable().peers.contains(&peer_pk),
"Peer should be dialable after unblock"
);
});
}
#[test]
fn test_blocked_peer_not_acceptable() {
let runtime = deterministic::Runner::default();
let signer = PrivateKey::from_seed(0);
let my_info = create_myself_info(&signer, test_socket(), 100);
let peer_signer = PrivateKey::from_seed(1);
let peer_pk = peer_signer.public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = Releaser::new(tx);
let block_duration = Duration::from_secs(100);
let config = Config {
allow_private_ips: true,
allow_dns: true,
max_sets: NZUsize!(3),
dial_fail_limit: 1,
peer_connection_cooldown: Duration::from_millis(100),
block_duration,
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser);
let peer_set: OrderedSet<_> = [peer_pk.clone()].into_iter().try_collect().unwrap();
directory.track(0, TrackedPeers::from(peer_set));
let peer_info = types::Info::sign(&peer_signer, NAMESPACE, test_socket(), 200);
directory.update_peers(vec![peer_info]);
assert!(
directory.acceptable(&peer_pk),
"Peer should be acceptable before blocking"
);
directory.block(&peer_pk);
assert!(
!directory.acceptable(&peer_pk),
"Blocked peer should not be acceptable"
);
context.sleep(block_duration + Duration::from_secs(1)).await;
directory.unblock_expired();
assert!(
directory.acceptable(&peer_pk),
"Peer should be acceptable after unblock"
);
});
}
#[test]
fn test_blocked_peer_not_eligible() {
let runtime = deterministic::Runner::default();
let signer = PrivateKey::from_seed(0);
let my_info = create_myself_info(&signer, test_socket(), 100);
let peer_pk = PrivateKey::from_seed(1).public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = Releaser::new(tx);
let block_duration = Duration::from_secs(100);
let config = Config {
allow_private_ips: true,
allow_dns: true,
max_sets: NZUsize!(3),
dial_fail_limit: 1,
peer_connection_cooldown: Duration::from_millis(100),
block_duration,
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser);
let peer_set: OrderedSet<_> = [peer_pk.clone()].into_iter().try_collect().unwrap();
directory.track(0, TrackedPeers::from(peer_set));
assert!(
directory.eligible(&peer_pk),
"Peer should be eligible before blocking"
);
directory.block(&peer_pk);
assert!(
!directory.eligible(&peer_pk),
"Blocked peer should not be eligible"
);
context.sleep(block_duration + Duration::from_secs(1)).await;
directory.unblock_expired();
assert!(
directory.eligible(&peer_pk),
"Peer should be eligible after unblock"
);
});
}
#[test]
fn test_blocked_peer_info_not_sharable() {
let runtime = deterministic::Runner::default();
let signer = PrivateKey::from_seed(0);
let my_info = create_myself_info(&signer, test_socket(), 100);
let peer_signer = PrivateKey::from_seed(1);
let peer_pk = peer_signer.public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = Releaser::new(tx);
let block_duration = Duration::from_secs(100);
let config = Config {
allow_private_ips: true,
allow_dns: true,
max_sets: NZUsize!(3),
dial_fail_limit: 1,
peer_connection_cooldown: Duration::from_millis(100),
block_duration,
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser);
let peer_set: OrderedSet<_> = [peer_pk.clone()].into_iter().try_collect().unwrap();
directory.track(0, TrackedPeers::from(peer_set));
let peer_info = types::Info::sign(&peer_signer, NAMESPACE, test_socket(), 200);
directory.update_peers(vec![peer_info]);
let reservation = directory.dial(&peer_pk);
assert!(reservation.is_some(), "Should be able to dial peer");
directory.connect(&peer_pk, true);
assert!(
directory.info(&peer_pk).is_some(),
"Connected peer's info should be sharable"
);
directory.block(&peer_pk);
directory.release(Metadata::Dialer(
peer_pk.clone(),
Ingress::Socket(test_socket()),
));
assert!(
directory.info(&peer_pk).is_none(),
"Blocked peer's info should not be sharable after disconnect"
);
context.sleep(block_duration + Duration::from_secs(1)).await;
directory.unblock_expired();
assert!(
directory.info(&peer_pk).is_none(),
"Unblocked but disconnected peer's info should not be sharable"
);
});
}
#[test]
fn test_bootstrapper_remains_persistent_after_blocking() {
let runtime = deterministic::Runner::default();
let signer = PrivateKey::from_seed(0);
let my_info = create_myself_info(&signer, test_socket(), 100);
let bootstrapper_pk = PrivateKey::from_seed(1).public_key();
let bootstrapper_ingress = Ingress::Socket(SocketAddr::from(([1, 2, 3, 4], 8080)));
let (tx, _rx) = UnboundedMailbox::new();
let releaser = Releaser::new(tx);
let block_duration = Duration::from_secs(100);
let config = Config {
allow_private_ips: true,
allow_dns: true,
max_sets: NZUsize!(3),
dial_fail_limit: 1,
peer_connection_cooldown: Duration::from_millis(100),
block_duration,
};
runtime.start(|context| async move {
let mut directory = Directory::init(
context.clone(),
vec![(bootstrapper_pk.clone(), bootstrapper_ingress)],
my_info,
config,
releaser,
);
let record = directory.peers.get(&bootstrapper_pk).unwrap();
assert!(
!record.deletable(),
"Bootstrapper should not be deletable (persistent)"
);
directory.block(&bootstrapper_pk);
assert!(
directory.blocked.contains(&bootstrapper_pk),
"Bootstrapper should be blocked"
);
let record = directory.peers.get(&bootstrapper_pk).unwrap();
assert!(
!record.deletable(),
"Bootstrapper should still not be deletable after blocking"
);
context.sleep(block_duration + Duration::from_secs(1)).await;
directory.unblock_expired();
let record = directory.peers.get(&bootstrapper_pk).unwrap();
assert!(
!record.deletable(),
"Bootstrapper should remain not deletable after unblock"
);
});
}
#[test]
fn test_infos_excludes_blocked_peers() {
let runtime = deterministic::Runner::default();
let signer = PrivateKey::from_seed(0);
let my_info = create_myself_info(&signer, test_socket(), 100);
let peer_signer_1 = PrivateKey::from_seed(1);
let peer_pk_1 = peer_signer_1.public_key();
let peer_signer_2 = PrivateKey::from_seed(2);
let peer_pk_2 = peer_signer_2.public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = Releaser::new(tx);
let block_duration = Duration::from_secs(100);
let config = Config {
allow_private_ips: true,
allow_dns: true,
max_sets: NZUsize!(3),
dial_fail_limit: 1,
peer_connection_cooldown: Duration::from_millis(100),
block_duration,
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser);
let peer_set: OrderedSet<_> = [peer_pk_1.clone(), peer_pk_2.clone()]
.into_iter()
.try_collect()
.unwrap();
directory.track(0, TrackedPeers::from(peer_set));
let peer_info_1 = types::Info::sign(&peer_signer_1, NAMESPACE, test_socket(), 0);
let peer_info_2 = types::Info::sign(
&peer_signer_2,
NAMESPACE,
SocketAddr::from(([9, 9, 9, 9], 9090)),
0,
);
directory.update_peers(vec![peer_info_1, peer_info_2]);
let reservation_1 = directory.dial(&peer_pk_1);
assert!(reservation_1.is_some());
directory.connect(&peer_pk_1, true);
let reservation_2 = directory.dial(&peer_pk_2);
assert!(reservation_2.is_some());
directory.connect(&peer_pk_2, true);
let bit_vec = types::BitVec {
index: 0,
bits: BitMap::zeroes(2),
};
let infos = directory.infos(bit_vec.clone()).unwrap();
assert_eq!(infos.len(), 2, "Should have info for both peers");
directory.block(&peer_pk_1);
directory.release(Metadata::Dialer(
peer_pk_1.clone(),
Ingress::Socket(test_socket()),
));
let infos = directory.infos(bit_vec).unwrap();
assert_eq!(
infos.len(),
1,
"Should only have info for unblocked connected peer"
);
assert_eq!(
infos[0].public_key, peer_pk_2,
"Returned info should be for peer 2"
);
});
}
#[test]
fn test_reservation_rate_limits_redial() {
let runtime = deterministic::Runner::default();
let signer = PrivateKey::from_seed(0);
let my_info = create_myself_info(&signer, test_socket(), 100);
let peer_signer = PrivateKey::from_seed(1);
let peer_pk = peer_signer.public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = Releaser::new(tx);
let cooldown = Duration::from_secs(1);
let config = Config {
allow_private_ips: true,
allow_dns: true,
max_sets: NZUsize!(3),
dial_fail_limit: 1,
peer_connection_cooldown: cooldown,
block_duration: Duration::from_secs(100),
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser);
let peer_set: OrderedSet<_> = [peer_pk.clone()].into_iter().try_collect().unwrap();
directory.track(0, TrackedPeers::from(peer_set));
let peer_info = types::Info::sign(&peer_signer, NAMESPACE, test_socket(), 200);
directory.update_peers(vec![peer_info]);
let reservation = directory.dial(&peer_pk).expect("first dial should succeed");
drop(reservation);
directory.release(Metadata::Dialer(
peer_pk.clone(),
Ingress::Socket(test_socket()),
));
assert!(
directory.dial(&peer_pk).is_none(),
"should be rate-limited immediately after release"
);
assert!(
!directory.dialable().peers.contains(&peer_pk),
"should not appear in dialable list during rate-limit window"
);
context.sleep(cooldown * 2).await;
assert!(directory.dialable().peers.contains(&peer_pk));
directory
.dial(&peer_pk)
.expect("should succeed after interval");
});
}
#[test]
fn test_dialable_next_query_at_reflects_rate_limit() {
let runtime = deterministic::Runner::default();
let signer = PrivateKey::from_seed(0);
let my_info = create_myself_info(&signer, test_socket(), 100);
let peer_signer = PrivateKey::from_seed(1);
let peer_pk = peer_signer.public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = Releaser::new(tx);
let cooldown = Duration::from_secs(1);
let config = Config {
allow_private_ips: true,
allow_dns: true,
max_sets: NZUsize!(3),
dial_fail_limit: 1,
peer_connection_cooldown: cooldown,
block_duration: Duration::from_secs(100),
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser);
let peer_set: OrderedSet<_> = [peer_pk.clone()].into_iter().try_collect().unwrap();
directory.track(0, TrackedPeers::from(peer_set));
let peer_info = types::Info::sign(&peer_signer, NAMESPACE, test_socket(), 200);
directory.update_peers(vec![peer_info]);
let reservation = directory.dial(&peer_pk).expect("first dial should succeed");
let reserved_at = context.current();
drop(reservation);
directory.release(Metadata::Dialer(
peer_pk.clone(),
Ingress::Socket(test_socket()),
));
let dialable = directory.dialable();
assert!(!dialable.peers.contains(&peer_pk));
let nqa = dialable.next_query_at.unwrap();
assert!(nqa >= reserved_at + cooldown);
assert!(nqa <= reserved_at + cooldown * 2);
});
}
#[test]
fn test_dialable_empty() {
let runtime = deterministic::Runner::default();
let signer = PrivateKey::from_seed(0);
let my_info = create_myself_info(&signer, test_socket(), 100);
let (tx, _rx) = UnboundedMailbox::new();
let releaser = Releaser::new(tx);
let config = Config {
allow_private_ips: true,
allow_dns: true,
max_sets: NZUsize!(3),
dial_fail_limit: 1,
peer_connection_cooldown: Duration::from_millis(200),
block_duration: Duration::from_secs(100),
};
runtime.start(|context| async move {
let directory = Directory::init(context.clone(), vec![], my_info, config, releaser);
let dialable = directory.dialable();
assert!(dialable.peers.is_empty());
assert_eq!(dialable.next_query_at, None);
});
}
#[test]
fn test_dialable_next_query_at_includes_blocked() {
let runtime = deterministic::Runner::default();
let signer = PrivateKey::from_seed(0);
let my_info = create_myself_info(&signer, test_socket(), 100);
let peer_signer = PrivateKey::from_seed(1);
let peer_pk = peer_signer.public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = Releaser::new(tx);
let block_duration = Duration::from_secs(3600);
let config = Config {
allow_private_ips: true,
allow_dns: true,
max_sets: NZUsize!(3),
dial_fail_limit: 1,
peer_connection_cooldown: Duration::from_millis(200),
block_duration,
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser);
let peer_set: OrderedSet<_> = [peer_pk.clone()].into_iter().try_collect().unwrap();
directory.track(0, TrackedPeers::from(peer_set));
let peer_info = types::Info::sign(&peer_signer, NAMESPACE, test_socket(), 200);
directory.update_peers(vec![peer_info]);
directory.block(&peer_pk);
let dialable = directory.dialable();
assert!(dialable.peers.is_empty());
assert_eq!(
dialable.next_query_at,
Some(context.current() + block_duration)
);
});
}
#[test]
fn test_dialable_expired_block_without_unblock() {
let runtime = deterministic::Runner::default();
let signer = PrivateKey::from_seed(0);
let my_info = create_myself_info(&signer, test_socket(), 100);
let peer_signer = PrivateKey::from_seed(1);
let peer_pk = peer_signer.public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = Releaser::new(tx);
let block_duration = Duration::from_secs(1);
let config = Config {
allow_private_ips: true,
allow_dns: true,
max_sets: NZUsize!(3),
dial_fail_limit: 1,
peer_connection_cooldown: Duration::from_millis(200),
block_duration,
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser);
let peer_set: OrderedSet<_> = [peer_pk.clone()].into_iter().try_collect().unwrap();
directory.track(0, TrackedPeers::from(peer_set));
let peer_info = types::Info::sign(&peer_signer, NAMESPACE, test_socket(), 200);
directory.update_peers(vec![peer_info]);
directory.block(&peer_pk);
assert!(directory.dialable().peers.is_empty());
context.sleep(block_duration + Duration::from_secs(1)).await;
let dialable = directory.dialable();
assert!(
dialable.peers.contains(&peer_pk),
"expired block should not prevent dialing"
);
assert_eq!(
dialable.next_query_at, None,
"expired block should not contribute a stale hint"
);
directory
.dial(&peer_pk)
.expect("expired block should not prevent reservation");
});
}
#[test]
fn test_reblock_after_expired_block_without_unblock() {
let runtime = deterministic::Runner::default();
let signer = PrivateKey::from_seed(0);
let my_info = create_myself_info(&signer, test_socket(), 100);
let peer_signer = PrivateKey::from_seed(1);
let peer_pk = peer_signer.public_key();
let (tx, _rx) = UnboundedMailbox::new();
let releaser = Releaser::new(tx);
let block_duration = Duration::from_secs(1);
let config = Config {
allow_private_ips: true,
allow_dns: true,
max_sets: NZUsize!(3),
dial_fail_limit: 1,
peer_connection_cooldown: Duration::from_millis(200),
block_duration,
};
runtime.start(|context| async move {
let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser);
let peer_set: OrderedSet<_> = [peer_pk.clone()].into_iter().try_collect().unwrap();
directory.track(0, TrackedPeers::from(peer_set));
let peer_info = types::Info::sign(&peer_signer, NAMESPACE, test_socket(), 200);
directory.update_peers(vec![peer_info]);
directory.block(&peer_pk);
assert!(directory.dialable().peers.is_empty());
context.sleep(block_duration + Duration::from_secs(1)).await;
directory.block(&peer_pk);
assert!(
directory.dialable().peers.is_empty(),
"re-blocked peer should not be dialable"
);
assert!(
directory.dial(&peer_pk).is_none(),
"re-blocked peer should not be reservable"
);
});
}
}