use super::{
directory::{self, Directory},
ingress::{Mailbox, Message, Oracle},
Config,
};
use crate::{
authenticated::lookup::actors::{listener, peer, tracker::ingress::Releaser},
PeerSetUpdate,
};
use commonware_actor::mailbox;
use commonware_cryptography::Signer;
use commonware_macros::select_loop;
use commonware_runtime::{
spawn_cell, Clock, ContextCell, Handle, Metrics as RuntimeMetrics, Spawner,
};
use commonware_utils::channel::{fallible::FallibleExt, mpsc};
use rand::Rng;
use std::collections::HashMap;
use tracing::debug;
/// Tracker actor: owns the peer [`Directory`] and serves the requests that arrive on its
/// mailbox (see `handle_msg`).
pub struct Actor<E: Spawner + Rng + Clock + RuntimeMetrics, C: Signer> {
// Runtime context the actor is spawned on and whose shutdown ends the event loop.
context: ContextCell<E>,
// Receiving half of the mailbox; every `Message` handled by the actor arrives here.
receiver: mailbox::Receiver<Message<C::PublicKey>>,
// Handle that is given `directory.listenable()` whenever that value may have changed.
listener: listener::Mailbox,
// Peer bookkeeping (tracked sets, reservations, blocks); all policy decisions are delegated to it.
directory: Directory<E, C::PublicKey>,
// Mailboxes of peers admitted via `Message::Connect`, kept so they can be killed later.
mailboxes: HashMap<C::PublicKey, peer::Mailbox>,
// Senders registered via `Message::Subscribe`; each receives every new peer set update.
subscribers: Vec<mpsc::UnboundedSender<PeerSetUpdate<C::PublicKey>>>,
}
impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: Signer> Actor<E, C> {
    /// Builds the tracker actor together with the handles used to reach it.
    ///
    /// Returns `(actor, mailbox, oracle)`. The [`Mailbox`], the [`Oracle`] and the
    /// [`Releaser`] handed to the [`Directory`] all wrap clones of the same sender, so
    /// everything they emit arrives on the actor's single receiver.
    #[allow(clippy::type_complexity)]
    pub fn new(context: E, cfg: Config<C>) -> (Self, Mailbox<C::PublicKey>, Oracle<C::PublicKey>) {
        // One channel feeds the actor; every handle below is a clone of its sender.
        let (tx, rx) = mailbox::new(context.child("mailbox"), cfg.mailbox_size);
        let oracle = Oracle::new(tx.clone());
        let releaser = Releaser::new(tx.clone());

        // The directory is configured from the matching fields of the tracker config.
        let directory = Directory::init(
            context.child("directory"),
            cfg.crypto.public_key(),
            directory::Config {
                max_sets: cfg.tracked_peer_sets,
                peer_connection_cooldown: cfg.peer_connection_cooldown,
                allow_private_ips: cfg.allow_private_ips,
                allow_dns: cfg.allow_dns,
                bypass_ip_check: cfg.bypass_ip_check,
                block_duration: cfg.block_duration,
            },
            releaser,
        );

        let actor = Self {
            context: ContextCell::new(context),
            receiver: rx,
            listener: cfg.listener,
            directory,
            mailboxes: HashMap::new(),
            subscribers: Vec::new(),
        };
        (actor, Mailbox::new(tx), oracle)
    }

    /// Spawns the actor's event loop and returns the handle of the spawned task.
    pub fn start(mut self) -> Handle<()> {
        spawn_cell!(self.context, self.run())
    }

    /// Event loop of the actor.
    ///
    /// Each iteration reacts to one of three events: the context stopping, the directory
    /// signalling that a block may have expired, or a message arriving on the mailbox.
    /// The loop is left explicitly once the mailbox is closed.
    async fn run(mut self) {
        select_loop! {
            self.context,
            on_stopped => {
                debug!("context shutdown, stopping tracker");
            },
            _ = self.directory.wait_for_unblock() => {
                // Only refresh the listener when the directory reports that something was unblocked.
                if self.directory.unblock_expired() {
                    let _ = self.listener.set(self.directory.listenable());
                }
            },
            Some(msg) = self.receiver.recv() else {
                debug!("mailbox closed, stopping tracker");
                break;
            } => {
                self.handle_msg(msg);
            },
        }
    }

    /// Applies a single mailbox message to the directory and to the actor's own state.
    ///
    /// Replies are sent on a best-effort basis: a failed send on a responder or on the
    /// listener handle is ignored.
    fn handle_msg(&mut self, msg: Message<C::PublicKey>) {
        match msg {
            Message::Register { index, peers } => {
                // If the directory returns `None` there is nothing further to do.
                let to_kill = match self.directory.track(index, peers) {
                    Some(to_kill) => to_kill,
                    None => return,
                };
                for stale in to_kill {
                    self.kill_peer(&stale);
                }

                // The set of listenable addresses may have changed.
                let _ = self.listener.set(self.directory.listenable());

                // Fan the update out, dropping subscribers whose send fails.
                let update = self
                    .directory
                    .latest_update()
                    .expect("latest update missing after successful track");
                self.subscribers
                    .retain(|subscriber| subscriber.send_lossy(update.clone()));
            }
            Message::Overwrite { peers } => {
                let mut any_changed = false;
                for (public_key, address) in peers {
                    // Entries the directory reports as unchanged are skipped.
                    if self.directory.overwrite(&public_key, address) {
                        any_changed = true;
                        self.kill_peer(&public_key);
                    }
                }
                if any_changed {
                    let _ = self.listener.set(self.directory.listenable());
                }
            }
            Message::PeerSet { index, responder } => {
                let _ = responder.send(self.directory.get_peer_set(&index));
            }
            Message::Subscribe { responder } => {
                let (sender, receiver) = mpsc::unbounded_channel();
                // A new subscriber immediately gets the latest update, if one exists.
                if let Some(update) = self.directory.latest_update() {
                    sender.send_lossy(update);
                }
                self.subscribers.push(sender);
                let _ = responder.send(receiver);
            }
            Message::Connect { public_key, peer } => {
                // `connect` is only attempted for eligible peers (short-circuit), and the
                // peer is killed when either check fails.
                let admitted =
                    self.directory.eligible(&public_key) && self.directory.connect(&public_key);
                if admitted {
                    self.mailboxes.insert(public_key, peer);
                } else {
                    peer.kill();
                }
            }
            Message::Dialable { responder } => {
                let _ = responder.send(self.directory.dialable());
            }
            Message::Dial {
                public_key,
                reservation,
            } => {
                let _ = reservation.send(self.directory.dial(&public_key));
            }
            Message::Acceptable {
                public_key,
                source_ip,
                responder,
            } => {
                let _ = responder.send(self.directory.acceptable(&public_key, source_ip));
            }
            Message::Listen {
                public_key,
                source_ip,
                reservation,
            } => {
                let _ = reservation.send(self.directory.listen(&public_key, source_ip));
            }
            Message::Block { public_key } => {
                self.directory.block(&public_key);
                self.kill_peer(&public_key);
                let _ = self.listener.set(self.directory.listenable());
            }
            Message::Release { metadata } => {
                // Forget the peer's mailbox before handing the metadata back to the directory.
                self.mailboxes.remove(metadata.public_key());
                self.directory.release(metadata);
            }
        }
    }

    /// Removes the stored mailbox for `public_key`, if any, and sends it a kill.
    ///
    /// Because the entry is removed first, a later call for the same key does nothing
    /// unless the peer has been admitted again in between.
    fn kill_peer(&mut self, public_key: &C::PublicKey) {
        let Some(handle) = self.mailboxes.remove(public_key) else {
            return;
        };
        handle.kill();
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
authenticated::lookup::actors::peer, AddressableManager, AddressableTrackedPeers, Ingress,
Provider,
};
use commonware_cryptography::{
ed25519::{PrivateKey, PublicKey},
Signer,
};
use commonware_runtime::{
deterministic::{self},
Clock, Runner, Supervisor as _,
};
use commonware_utils::{
ordered::{Map, Set},
NZUsize,
};
use futures::{FutureExt, StreamExt};
use std::{
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
time::Duration,
};
/// Builds a tracker [`Config`] for tests.
///
/// Returns the config together with the receiving side of the listener mailbox, so a
/// test can observe what the actor pushes to the listener.
fn test_config<C: Signer>(crypto: C, bypass_ip_check: bool) -> (Config<C>, listener::Updates) {
    let (listener_mailbox, listener_updates) = listener::Mailbox::new();
    let cfg = Config {
        crypto,
        mailbox_size: NZUsize!(1024),
        tracked_peer_sets: NZUsize!(2),
        peer_connection_cooldown: Duration::from_millis(200),
        allow_private_ips: true,
        allow_dns: true,
        bypass_ip_check,
        listener: listener_mailbox,
        block_duration: Duration::from_secs(100),
    };
    (cfg, listener_updates)
}
/// Derives an ed25519 key pair from `seed` and returns `(private key, public key)`.
fn new_signer_and_pk(seed: u64) -> (PrivateKey, PublicKey) {
    let private_key = PrivateKey::from_seed(seed);
    let public_key = private_key.public_key();
    (private_key, public_key)
}
/// Handles to a running tracker actor, as returned by `setup_actor`.
struct TestHarness {
// Mailbox handle returned by `Actor::new`.
mailbox: Mailbox<PublicKey>,
// Oracle handle returned by `Actor::new`.
oracle: Oracle<PublicKey>,
}
/// Creates a tracker actor from the given config, starts it on `runner_context`, and
/// returns the handles needed to drive it from a test.
///
/// The handle returned by `start` is discarded.
fn setup_actor(
    runner_context: deterministic::Context,
    cfg_to_clone: Config<PrivateKey>,
) -> TestHarness {
    let (tracker, mailbox, oracle) = Actor::new(runner_context, cfg_to_clone);
    tracker.start();
    TestHarness { mailbox, oracle }
}
/// A `Connect` for a peer that was never tracked must result in the peer being killed.
#[test]
fn test_connect_unauthorized_peer_is_killed() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (cfg, _) = test_config(PrivateKey::from_seed(0), false);
let TestHarness { mailbox, .. } = setup_actor(context.child("actor"), cfg);
// No peer set is registered, so this key is unknown to the tracker.
let (_unauth_signer, unauth_pk) = new_signer_and_pk(1);
let (peer_mailbox, mut peer_receiver) = peer::Mailbox::new(NZUsize!(1));
let _ = mailbox.connect(unauth_pk.clone(), peer_mailbox);
// The rejection is observed as a `Kill` on the peer's own mailbox.
assert!(
matches!(peer_receiver.next().await, Some(peer::Message::Kill)),
"Unauthorized peer should be killed on Connect"
);
});
}
/// A tracked peer is dialable until it is blocked, after which it no longer is.
#[test]
fn test_block_peer_standard_behavior() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (cfg_initial, _) = test_config(PrivateKey::from_seed(0), false);
let TestHarness {
mailbox,
mut oracle,
..
} = setup_actor(context.child("actor"), cfg_initial);
let (_, pk) = new_signer_and_pk(1);
let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1001);
// Register peer set 0 containing only `pk`.
oracle.track(
0,
Map::<_, crate::Address>::try_from([(pk.clone(), addr.into())]).unwrap(),
);
// `track` is not awaited; sleep so the actor can process it.
context.sleep(Duration::from_millis(10)).await;
let dialable = mailbox.dialable().await;
assert!(dialable.peers.iter().any(|peer| peer == &pk));
// Block the peer and verify it disappears from the dialable list.
crate::block_peer(&mut oracle, pk.clone());
context.sleep(Duration::from_millis(10)).await;
let dialable = mailbox.dialable().await;
assert!(!dialable.peers.iter().any(|peer| peer == &pk));
});
}
/// Blocking an already-blocked peer a second time leaves it non-dialable.
#[test]
fn test_block_peer_already_blocked_is_noop() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (cfg_initial, _) = test_config(PrivateKey::from_seed(0), false);
let TestHarness {
mailbox,
mut oracle,
..
} = setup_actor(context.child("actor"), cfg_initial);
let (_, pk1) = new_signer_and_pk(1);
let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1001);
// Register peer set 0 containing only `pk1`.
oracle.track(
0,
Map::<_, crate::Address>::try_from([(pk1.clone(), addr.into())]).unwrap(),
);
context.sleep(Duration::from_millis(10)).await;
// First block: the peer is no longer dialable.
crate::block_peer(&mut oracle, pk1.clone());
context.sleep(Duration::from_millis(10)).await;
let dialable = mailbox.dialable().await;
assert!(!dialable.peers.iter().any(|peer| peer == &pk1));
// Second block: same observable state.
crate::block_peer(&mut oracle, pk1.clone());
context.sleep(Duration::from_millis(10)).await;
let dialable = mailbox.dialable().await;
assert!(!dialable.peers.iter().any(|peer| peer == &pk1));
});
}
/// Blocking a peer the tracker has never seen must not disturb the actor.
///
/// The test makes no assertions; it only sends the block and sleeps.
#[test]
fn test_block_peer_non_existent_is_noop() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (cfg_initial, _) = test_config(PrivateKey::from_seed(0), false);
let TestHarness { mut oracle, .. } = setup_actor(context.child("actor"), cfg_initial);
let (_s1_signer, pk_non_existent) = new_signer_and_pk(100);
crate::block_peer(&mut oracle, pk_non_existent);
// The block is not awaited; sleep so the actor can process it.
context.sleep(Duration::from_millis(10)).await;
});
}
/// Exercises `acceptable` before and after a peer set is registered.
///
/// The actor is configured with peer 1's key as its own identity, so peer 1 is "self".
#[test]
fn test_listenable() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (peer_signer, peer_pk) = new_signer_and_pk(1);
let peer_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 1001);
let (_peer_signer2, peer_pk2) = new_signer_and_pk(2);
let peer_addr2 = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 2).into(), 1002);
let (_peer_signer3, peer_pk3) = new_signer_and_pk(3);
let peer_addr3 = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 3).into(), 1003);
let (cfg_initial, _) = test_config(peer_signer, false);
let TestHarness {
mailbox,
mut oracle,
..
} = setup_actor(context.child("actor"), cfg_initial);
// Nothing is tracked yet: nobody is acceptable.
assert!(!mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await);
assert!(!mailbox.acceptable(peer_pk2.clone(), peer_addr2.ip()).await);
assert!(!mailbox.acceptable(peer_pk3.clone(), peer_addr3.ip()).await);
// Register peers 1 (self) and 2; peer 3 stays untracked.
oracle.track(
0,
Map::<_, crate::Address>::try_from([
(peer_pk.clone(), peer_addr.into()),
(peer_pk2.clone(), peer_addr2.into()),
])
.unwrap(),
);
context.sleep(Duration::from_millis(10)).await;
// Self is rejected, peer 2 is accepted only from its registered IP, and peer 3 is still unknown.
assert!(!mailbox.acceptable(peer_pk, peer_addr.ip()).await);
assert!(mailbox.acceptable(peer_pk2.clone(), peer_addr2.ip()).await);
assert!(!mailbox.acceptable(peer_pk2, peer_addr.ip()).await);
assert!(!mailbox.acceptable(peer_pk3, peer_addr3.ip()).await);
});
}
/// With `bypass_ip_check = true`, a registered peer is acceptable from a non-matching IP,
/// while unknown peers, self and blocked peers are still rejected.
#[test]
fn test_acceptable_bypass_ip_check() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (peer_signer, peer_pk) = new_signer_and_pk(1);
let peer_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 1001);
let (_peer_signer2, peer_pk2) = new_signer_and_pk(2);
let peer_addr2 = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 2).into(), 1002);
let (_peer_signer3, peer_pk3) = new_signer_and_pk(3);
let peer_addr3 = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 3).into(), 1003);
// The actor runs with peer 1's key as its own identity and the IP check bypassed.
let (cfg, _) = test_config(peer_signer, true);
let TestHarness {
mailbox,
mut oracle,
..
} = setup_actor(context.child("actor"), cfg);
assert!(
!mailbox.acceptable(peer_pk3.clone(), peer_addr3.ip()).await,
"Unknown peer should not be acceptable"
);
oracle.track(
0,
Map::<_, crate::Address>::try_from([
(peer_pk.clone(), peer_addr.into()),
(peer_pk2.clone(), peer_addr2.into()),
])
.unwrap(),
);
context.sleep(Duration::from_millis(10)).await;
// Peer 2 presents peer 1's IP rather than its own registered one.
assert!(
mailbox.acceptable(peer_pk2.clone(), peer_addr.ip()).await,
"Registered peer with wrong IP should be acceptable with bypass_ip_check=true"
);
assert!(
!mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await,
"Self should not be acceptable"
);
crate::block_peer(&mut oracle, peer_pk2.clone());
context.sleep(Duration::from_millis(10)).await;
assert!(
!mailbox.acceptable(peer_pk2.clone(), peer_addr2.ip()).await,
"Blocked peer should not be acceptable"
);
});
}
/// Exercises the `listen` reservation lifecycle: refused for an unknown peer, granted once
/// tracked, refused while a reservation is held, and granted again after release.
#[test]
fn test_listen() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (cfg_initial, _) = test_config(PrivateKey::from_seed(0), false);
let TestHarness {
mailbox,
mut oracle,
..
} = setup_actor(context.child("actor"), cfg_initial);
let (_peer_signer, peer_pk) = new_signer_and_pk(1);
let peer_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 8080);
// Untracked peer: no reservation.
let reservation = mailbox.listen(peer_pk.clone(), peer_addr.ip()).await;
assert!(reservation.is_none());
oracle.track(
0,
Map::<_, crate::Address>::try_from([(peer_pk.clone(), peer_addr.into())]).unwrap(),
);
context.sleep(Duration::from_millis(10)).await;
// Tracked peer: acceptable, and the first reservation succeeds.
assert!(mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await);
let reservation = mailbox.listen(peer_pk.clone(), peer_addr.ip()).await;
assert!(reservation.is_some());
// While the reservation is held the peer is neither acceptable nor reservable.
assert!(!mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await);
let failed_reservation = mailbox.listen(peer_pk.clone(), peer_addr.ip()).await;
assert!(failed_reservation.is_none());
// Release the reservation, then wait longer than the 200ms cooldown set by `test_config`.
drop(reservation.unwrap());
context.sleep(Duration::from_millis(1_010)).await;
let reservation_after_release = mailbox.listen(peer_pk.clone(), peer_addr.ip()).await;
assert!(reservation_after_release.is_some());
});
}
/// After registering a single peer, `dialable` reports exactly that peer.
#[test]
fn test_dialable_message() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (_boot_signer, boot_pk) = new_signer_and_pk(99);
let boot_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9000);
let (cfg_initial, _) = test_config(PrivateKey::from_seed(0), false);
let TestHarness {
mailbox,
mut oracle,
..
} = setup_actor(context.child("actor"), cfg_initial);
oracle.track(
0,
Map::<_, crate::Address>::try_from([(boot_pk.clone(), boot_addr.into())]).unwrap(),
);
// No sleep here: the query is awaited right after the registration is sent.
let dialable = mailbox.dialable().await;
assert_eq!(dialable.peers.len(), 1);
assert_eq!(dialable.peers[0], boot_pk);
});
}
/// `dial` yields a dialer reservation and the registered ingress for a tracked peer, and
/// nothing for an unknown peer.
#[test]
fn test_dial_message() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (_boot_signer, boot_pk) = new_signer_and_pk(99);
let boot_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9000);
let (cfg_initial, _) = test_config(PrivateKey::from_seed(0), false);
let TestHarness {
mailbox,
mut oracle,
..
} = setup_actor(context.child("actor"), cfg_initial);
oracle.track(
0,
Map::<_, crate::Address>::try_from([(boot_pk.clone(), boot_addr.into())]).unwrap(),
);
let result = mailbox.dial(boot_pk.clone()).await;
assert!(result.is_some());
// The reservation must carry `Dialer` metadata for the dialed key.
if let Some((res, ingress)) = result {
match res.metadata() {
crate::authenticated::lookup::actors::tracker::Metadata::Dialer(pk) => {
assert_eq!(pk, &boot_pk);
}
_ => panic!("Expected Dialer metadata"),
}
assert_eq!(ingress, Ingress::Socket(boot_addr));
}
// A key that was never registered cannot be dialed.
let (_unknown_signer, unknown_pk) = new_signer_and_pk(100);
let no_reservation = mailbox.dial(unknown_pk).await;
assert!(no_reservation.is_none());
});
}
/// A peer registered only as secondary shows up in the secondary sets of the update, is
/// acceptable for inbound connections, but is neither listed as dialable nor dialable directly.
#[test]
fn test_secondary_peers_are_acceptable_but_not_primary_or_dialable() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (cfg, _) = test_config(PrivateKey::from_seed(0), false);
let TestHarness {
mailbox,
mut oracle,
..
} = setup_actor(context.child("actor"), cfg);
// Subscribe before registering so the first update received is for set 0.
let mut subscription = oracle.subscribe().await;
let (_primary_signer, primary_pk) = new_signer_and_pk(1);
let primary_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9001);
let (_secondary_signer, secondary_pk) = new_signer_and_pk(2);
let secondary_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9002);
// First map is the primary set, second map is the secondary set.
oracle.track(
0,
AddressableTrackedPeers::new(
Map::<_, crate::Address>::try_from([(primary_pk.clone(), primary_addr.into())])
.unwrap(),
Map::<_, crate::Address>::try_from([(
secondary_pk.clone(),
secondary_addr.into(),
)])
.unwrap(),
),
);
let update = subscription.recv().await.unwrap();
assert_eq!(update.index, 0);
assert_eq!(update.latest.primary.len(), 1);
assert!(update.latest.primary.position(&primary_pk).is_some());
assert!(update.latest.primary.position(&secondary_pk).is_none());
assert_eq!(
update.latest.secondary,
Set::try_from([secondary_pk.clone()]).unwrap()
);
// With a single tracked set, the aggregate view equals the latest set.
assert_eq!(update.all.primary, update.latest.primary);
assert_eq!(
update.all.secondary,
Set::try_from([secondary_pk.clone()]).unwrap()
);
let dialable = mailbox.dialable().await;
assert!(dialable.peers.iter().any(|peer| peer == &primary_pk));
assert!(!dialable.peers.iter().any(|peer| peer == &secondary_pk));
assert!(mailbox.dial(secondary_pk.clone()).await.is_none());
assert!(mailbox.acceptable(secondary_pk, secondary_addr.ip()).await);
});
}
/// A peer listed as both primary and secondary is reported as primary only in the
/// subscription update, and remains dialable and acceptable.
#[test]
fn test_overlapping_primary_secondary_no_duplicate_in_subscription() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (cfg, _) = test_config(PrivateKey::from_seed(0), false);
let TestHarness {
mailbox,
mut oracle,
..
} = setup_actor(context.child("actor"), cfg);
let mut subscription = oracle.subscribe().await;
let (_signer, pk) = new_signer_and_pk(1);
let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9001);
// The same key/address pair is supplied in both the primary and the secondary map.
oracle.track(
0,
AddressableTrackedPeers::new(
Map::<_, crate::Address>::try_from([(pk.clone(), addr.into())]).unwrap(),
Map::<_, crate::Address>::try_from([(pk.clone(), addr.into())]).unwrap(),
),
);
let update = subscription.recv().await.unwrap();
assert_eq!(update.index, 0);
assert_eq!(update.latest.primary.len(), 1);
assert!(update.latest.primary.position(&pk).is_some());
assert!(
update.latest.secondary.is_empty(),
"overlap peer is deduplicated as primary only"
);
assert_eq!(update.all.primary, update.latest.primary);
assert!(
update.all.secondary.is_empty(),
"aggregate secondary excludes keys that are primary"
);
let dialable = mailbox.dialable().await;
assert!(dialable.peers.iter().any(|peer| peer == &pk));
assert!(mailbox.acceptable(pk, addr.ip()).await);
});
}
/// Blocking a connected peer kills it once; a second block sends no further kill.
#[test]
fn test_block_clears_peer_mailbox_and_only_kills_once() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (cfg, _) = test_config(PrivateKey::from_seed(0), false);
let TestHarness {
mailbox,
mut oracle,
..
} = setup_actor(context.child("actor"), cfg);
let (_peer_signer, peer_pk) = new_signer_and_pk(1);
let peer_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 12345);
oracle.track(
0,
Map::<_, crate::Address>::try_from([(peer_pk.clone(), peer_addr.into())]).unwrap(),
);
context.sleep(Duration::from_millis(10)).await;
// Reserve and connect so the tracker holds the peer's mailbox.
let reservation = mailbox.listen(peer_pk.clone(), peer_addr.ip()).await;
assert!(reservation.is_some());
let (peer_mailbox, mut peer_rx) = peer::Mailbox::new(NZUsize!(1));
let _ = mailbox.connect(peer_pk.clone(), peer_mailbox);
crate::block_peer(&mut oracle, peer_pk.clone());
context.sleep(Duration::from_millis(10)).await;
assert!(
matches!(peer_rx.next().await, Some(peer::Message::Kill)),
"connected peer must be killed on first Block"
);
// Second block: poll without waiting and require that no `Kill` is queued.
crate::block_peer(&mut oracle, peer_pk.clone());
context.sleep(Duration::from_millis(10)).await;
assert!(
!matches!(
peer_rx.next().now_or_never(),
Some(Some(peer::Message::Kill))
),
"no kill after handle has been cleared"
);
});
}
/// With only one tracked peer set, registering a new set that omits a connected peer
/// removes its IP from the listener and kills the connection.
#[test]
fn test_register_disconnects_removed_peers() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (my_sk, my_pk) = new_signer_and_pk(0);
let my_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9000);
let pk_1 = new_signer_and_pk(1).1;
let addr_1 = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 2).into(), 9001);
let pk_2 = new_signer_and_pk(2).1;
let addr_2 = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), 9002);
let (mut cfg, mut listener_receiver) = test_config(my_sk, false);
// Keep a single peer set so the next registration displaces the previous one.
cfg.tracked_peer_sets = NZUsize!(1);
let TestHarness {
mailbox,
mut oracle,
..
} = setup_actor(context.child("actor"), cfg);
// Set 0: self and peer 1.
oracle.track(
0,
Map::<_, crate::Address>::try_from([
(my_pk.clone(), my_addr.into()),
(pk_1.clone(), addr_1.into()),
])
.unwrap(),
);
context.sleep(Duration::from_millis(10)).await;
// The listener receives peer 1's IP but not our own.
let registered_ips = listener_receiver.next().await.unwrap();
assert!(!registered_ips.contains(&my_addr.ip()));
assert!(registered_ips.contains(&addr_1.ip()));
assert!(!registered_ips.contains(&addr_2.ip()));
// Connect peer 1 so the tracker holds its mailbox.
let reservation = mailbox.listen(pk_1.clone(), addr_1.ip()).await;
assert!(reservation.is_some());
let (peer_mailbox, mut peer_rx) = peer::Mailbox::new(NZUsize!(1));
let _ = mailbox.connect(pk_1.clone(), peer_mailbox);
// Set 1: only peer 2.
oracle.track(
1,
Map::<_, crate::Address>::try_from([(pk_2.clone(), addr_2.into())]).unwrap(),
);
let registered_ips = listener_receiver.next().await.unwrap();
assert!(!registered_ips.contains(&my_addr.ip()));
assert!(!registered_ips.contains(&addr_1.ip()));
assert!(registered_ips.contains(&addr_2.ip()));
// Peer 1 is no longer tracked and must have been killed.
assert!(matches!(peer_rx.next().await, Some(peer::Message::Kill)),)
});
}
/// A connected peer that is also present in the next peer set is not killed when the
/// old set is replaced.
#[test]
fn test_register_keeps_connected_peer_present_across_rollover() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (my_sk, _) = new_signer_and_pk(0);
let pk_1 = new_signer_and_pk(1).1;
let addr_1 = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 2).into(), 9001);
let pk_2 = new_signer_and_pk(2).1;
let addr_2 = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), 9002);
let (mut cfg, mut listener_receiver) = test_config(my_sk, false);
// Keep a single peer set so set 1 displaces set 0.
cfg.tracked_peer_sets = NZUsize!(1);
let TestHarness {
mailbox,
mut oracle,
..
} = setup_actor(context.child("actor"), cfg);
// Set 0: only peer 1.
oracle.track(
0,
Map::<_, crate::Address>::try_from([(pk_1.clone(), addr_1.into())]).unwrap(),
);
// Waiting for the listener update also confirms the registration was processed.
let registered_ips = listener_receiver.next().await.unwrap();
assert!(registered_ips.contains(&addr_1.ip()));
assert!(!registered_ips.contains(&addr_2.ip()));
let reservation = mailbox
.listen(pk_1.clone(), addr_1.ip())
.await
.expect("peer should reserve");
let (peer_mailbox, mut peer_rx) = peer::Mailbox::new(NZUsize!(1));
let _ = mailbox.connect(pk_1.clone(), peer_mailbox);
// Set 1: peer 1 (same address) plus peer 2.
oracle.track(
1,
Map::<_, crate::Address>::try_from([
(pk_1.clone(), addr_1.into()),
(pk_2.clone(), addr_2.into()),
])
.unwrap(),
);
let registered_ips = listener_receiver.next().await.unwrap();
assert!(registered_ips.contains(&addr_1.ip()));
assert!(registered_ips.contains(&addr_2.ip()));
// Poll without waiting: no `Kill` may be queued for peer 1.
assert!(
!matches!(peer_rx.next().now_or_never(), Some(Some(peer::Message::Kill))),
"connected peer present in the new set should not be killed when the old set rolls off"
);
// Using the reservation here keeps it alive for the whole test.
assert_eq!(reservation.metadata().public_key(), &pk_1);
});
}
/// A peer that held a reservation but was dropped from the tracked sets before connecting
/// is killed when it finally sends `Connect`.
#[test]
fn test_reserved_removed_peer_rejected_on_connect() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (my_sk, _) = new_signer_and_pk(0);
let pk_1 = new_signer_and_pk(1).1;
let addr_1 = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 2).into(), 9001);
let pk_2 = new_signer_and_pk(2).1;
let addr_2 = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), 9002);
let (mut cfg, mut listener_receiver) = test_config(my_sk, false);
// Keep a single peer set so set 1 displaces set 0.
cfg.tracked_peer_sets = NZUsize!(1);
let TestHarness {
mailbox,
mut oracle,
..
} = setup_actor(context.child("actor"), cfg);
oracle.track(
0,
Map::<_, crate::Address>::try_from([(pk_1.clone(), addr_1.into())]).unwrap(),
);
let registered_ips = listener_receiver.next().await.unwrap();
assert!(registered_ips.contains(&addr_1.ip()));
assert!(!registered_ips.contains(&addr_2.ip()));
// Peer 1 reserves while it is still tracked, but does not connect yet.
let reservation = mailbox
.listen(pk_1.clone(), addr_1.ip())
.await
.expect("peer should reserve");
assert_eq!(reservation.metadata().public_key(), &pk_1);
// Set 1 contains only peer 2.
oracle.track(
1,
Map::<_, crate::Address>::try_from([(pk_2.clone(), addr_2.into())]).unwrap(),
);
let registered_ips = listener_receiver.next().await.unwrap();
assert!(!registered_ips.contains(&addr_1.ip()));
assert!(registered_ips.contains(&addr_2.ip()));
// The late `Connect` from peer 1 must be refused.
let (peer_mailbox, mut peer_rx) = peer::Mailbox::new(NZUsize!(1));
let _ = mailbox.connect(pk_1.clone(), peer_mailbox);
assert!(
matches!(peer_rx.next().await, Some(peer::Message::Kill)),
"connect rejection is signaled by killing the peer"
);
});
}
/// A dial reservation taken against an old address is rejected at `Connect` time once a
/// newer peer set registers the same peer under a different address.
#[test]
fn test_reserved_peer_killed_on_connect_after_tracked_address_change() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (my_sk, _) = new_signer_and_pk(0);
let pk = new_signer_and_pk(1).1;
let addr_a = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1001);
let addr_b = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 1002);
let (mut cfg, mut listener_receiver) = test_config(my_sk, false);
// Same value `test_config` already sets; restated explicitly here.
cfg.tracked_peer_sets = NZUsize!(2);
let TestHarness {
mailbox,
mut oracle,
..
} = setup_actor(context.child("actor"), cfg);
// Set 0: peer at address A.
oracle.track(
0,
Map::<_, crate::Address>::try_from([(pk.clone(), addr_a.into())]).unwrap(),
);
let registered_ips = listener_receiver.next().await.unwrap();
assert!(registered_ips.contains(&addr_a.ip()));
// Dial reservation is handed out for address A.
let (reservation, ingress) =
mailbox.dial(pk.clone()).await.expect("peer should reserve");
assert_eq!(reservation.metadata().public_key(), &pk);
assert_eq!(ingress, Ingress::Socket(addr_a));
// Set 1: same peer, now at address B.
oracle.track(
1,
Map::<_, crate::Address>::try_from([(pk.clone(), addr_b.into())]).unwrap(),
);
let registered_ips = listener_receiver.next().await.unwrap();
assert!(!registered_ips.contains(&addr_a.ip()));
assert!(registered_ips.contains(&addr_b.ip()));
// Connecting with the stale reservation must be refused.
let (peer_mailbox, mut peer_rx) = peer::Mailbox::new(NZUsize!(1));
let _ = mailbox.connect(pk.clone(), peer_mailbox);
assert!(
matches!(peer_rx.next().await, Some(peer::Message::Kill)),
"connect rejection is signaled by killing the peer"
);
});
}
/// A dial reservation taken against an old address is rejected at `Connect` time once the
/// peer's address has been overwritten.
#[test]
fn test_reserved_peer_killed_on_connect_after_overwrite() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (my_sk, _) = new_signer_and_pk(0);
let pk = new_signer_and_pk(1).1;
let addr_a = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1001);
let addr_b = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 1002);
let (cfg, mut listener_receiver) = test_config(my_sk, false);
let TestHarness {
mailbox,
mut oracle,
..
} = setup_actor(context.child("actor"), cfg);
oracle.track(
0,
Map::<_, crate::Address>::try_from([(pk.clone(), addr_a.into())]).unwrap(),
);
let registered_ips = listener_receiver.next().await.unwrap();
assert!(registered_ips.contains(&addr_a.ip()));
// Dial reservation is handed out for address A.
let (reservation, ingress) =
mailbox.dial(pk.clone()).await.expect("peer should reserve");
assert_eq!(reservation.metadata().public_key(), &pk);
assert_eq!(ingress, Ingress::Socket(addr_a));
// Replace the peer's address with B while the reservation is outstanding.
oracle.overwrite([(pk.clone(), addr_b.into())].try_into().unwrap());
let registered_ips = listener_receiver.next().await.unwrap();
assert!(!registered_ips.contains(&addr_a.ip()));
assert!(registered_ips.contains(&addr_b.ip()));
// Connecting with the stale reservation must be refused.
let (peer_mailbox, mut peer_rx) = peer::Mailbox::new(NZUsize!(1));
let _ = mailbox.connect(pk.clone(), peer_mailbox);
assert!(
matches!(peer_rx.next().await, Some(peer::Message::Kill)),
"connect rejection is signaled by killing the peer"
);
});
}
/// After an address overwrite, inbound reservations from the old source IP are refused
/// while reservations from the new IP succeed.
#[test]
fn test_stale_inbound_source_rejected_after_overwrite() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (my_sk, _) = new_signer_and_pk(0);
let pk = new_signer_and_pk(1).1;
let addr_a = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1001);
let addr_b = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 1002);
let (cfg, mut listener_receiver) = test_config(my_sk, false);
let TestHarness {
mailbox,
mut oracle,
..
} = setup_actor(context.child("actor"), cfg);
oracle.track(
0,
Map::<_, crate::Address>::try_from([(pk.clone(), addr_a.into())]).unwrap(),
);
let registered_ips = listener_receiver.next().await.unwrap();
assert!(registered_ips.contains(&addr_a.ip()));
// Before the overwrite the peer is acceptable from address A.
assert!(mailbox.acceptable(pk.clone(), addr_a.ip()).await);
oracle.overwrite([(pk.clone(), addr_b.into())].try_into().unwrap());
let registered_ips = listener_receiver.next().await.unwrap();
assert!(!registered_ips.contains(&addr_a.ip()));
assert!(registered_ips.contains(&addr_b.ip()));
let reservation = mailbox.listen(pk.clone(), addr_a.ip()).await;
assert!(
reservation.is_none(),
"inbound handshake from the old source IP must be rejected"
);
let reservation = mailbox.listen(pk.clone(), addr_b.ip()).await;
assert!(reservation.is_some());
});
}
/// Overwriting a tracked peer's address pushes a fresh IP set to the listener that
/// contains the new IP and no longer contains the old one.
#[test]
fn test_overwrite_triggers_listener() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (my_sk, my_pk) = new_signer_and_pk(0);
let my_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9000);
let pk_1 = new_signer_and_pk(1).1;
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 9001);
let addr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 9002);
let (cfg, mut listener_receiver) = test_config(my_sk, false);
let TestHarness { mut oracle, .. } = setup_actor(context.child("actor"), cfg);
// Set 0: self and peer 1 at its first address.
oracle.track(
0,
Map::<_, crate::Address>::try_from([
(my_pk.clone(), my_addr.into()),
(pk_1.clone(), addr_1.into()),
])
.unwrap(),
);
let registered_ips = listener_receiver.next().await.unwrap();
assert!(registered_ips.contains(&addr_1.ip()));
assert!(!registered_ips.contains(&addr_2.ip()));
// Move peer 1 to its second address.
oracle.overwrite([(pk_1.clone(), addr_2.into())].try_into().unwrap());
let registered_ips = listener_receiver.next().await.unwrap();
assert!(!registered_ips.contains(&addr_1.ip()));
assert!(registered_ips.contains(&addr_2.ip()));
});
}
/// After an overwrite issued through the oracle, `dial` returns the new address.
#[test]
fn test_overwrite_via_oracle() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (cfg, _) = test_config(PrivateKey::from_seed(0), false);
let TestHarness {
mailbox,
mut oracle,
..
} = setup_actor(context.child("actor"), cfg);
let (_, pk) = new_signer_and_pk(1);
let addr_1 = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1001);
let addr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1002);
oracle.track(
0,
Map::<_, crate::Address>::try_from([(pk.clone(), addr_1.into())]).unwrap(),
);
context.sleep(Duration::from_millis(10)).await;
// First dial resolves to the original address; its reservation is dropped right away.
let result = mailbox.dial(pk.clone()).await;
assert!(result.is_some());
let (_, ingress) = result.unwrap();
assert_eq!(ingress, Ingress::Socket(addr_1));
oracle.overwrite([(pk.clone(), addr_2.into())].try_into().unwrap());
// Wait longer than the 200ms cooldown set by `test_config` before dialing again.
context.sleep(Duration::from_millis(1010)).await;
let result = mailbox.dial(pk.clone()).await;
assert!(result.is_some());
let (_, ingress) = result.unwrap();
assert_eq!(ingress, Ingress::Socket(addr_2));
});
}
/// Overwriting the address of a blocked peer still notifies the listener, but neither the
/// old nor the new IP is included in the update.
#[test]
fn test_overwrite_blocked_peer_not_in_listenable() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (my_sk, my_pk) = new_signer_and_pk(0);
let my_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9000);
let pk_1 = new_signer_and_pk(1).1;
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 9001);
let addr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 9002);
let (cfg, mut listener_receiver) = test_config(my_sk, false);
let TestHarness { mut oracle, .. } = setup_actor(context.child("actor"), cfg);
oracle.track(
0,
Map::<_, crate::Address>::try_from([
(my_pk.clone(), my_addr.into()),
(pk_1.clone(), addr_1.into()),
])
.unwrap(),
);
let registered_ips = listener_receiver.next().await.unwrap();
assert!(registered_ips.contains(&addr_1.ip()));
// Blocking removes the peer's IP from the listener set.
crate::block_peer(&mut oracle, pk_1.clone());
let registered_ips = listener_receiver.next().await.unwrap();
assert!(!registered_ips.contains(&addr_1.ip()));
// The overwrite must not re-introduce the peer under its new address.
oracle.overwrite([(pk_1.clone(), addr_2.into())].try_into().unwrap());
let registered_ips = listener_receiver.next().await.unwrap();
assert!(!registered_ips.contains(&addr_1.ip()));
assert!(!registered_ips.contains(&addr_2.ip()));
});
}
/// Overwriting the address of a peer that was never tracked must be ignored without
/// disturbing the actor.
///
/// The test makes no assertions; it only requires that handling the message does not panic.
#[test]
fn test_overwrite_untracked_peer_silently_ignored() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let (cfg, _) = test_config(PrivateKey::from_seed(0), false);
        let TestHarness { mut oracle, .. } = setup_actor(context.child("actor"), cfg);
        let (_, pk) = new_signer_and_pk(1);
        let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1001);
        oracle.overwrite([(pk, addr.into())].try_into().unwrap());
        // `overwrite` is not awaited. Sleep (as `test_block_peer_non_existent_is_noop` does)
        // so the actor actually handles the message before the test returns.
        context.sleep(Duration::from_millis(10)).await;
    });
}
/// An overwrite switches which source IP is acceptable for a tracked peer.
#[test]
fn test_overwrite_changes_acceptable_ip() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let pk_1 = new_signer_and_pk(1).1;
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 9001);
let addr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 9002);
let (cfg, _) = test_config(PrivateKey::from_seed(0), false);
let TestHarness {
mailbox,
mut oracle,
..
} = setup_actor(context.child("actor"), cfg);
oracle.track(
0,
Map::<_, crate::Address>::try_from([(pk_1.clone(), addr_1.into())]).unwrap(),
);
context.sleep(Duration::from_millis(10)).await;
// Before the overwrite only the first address is acceptable.
assert!(mailbox.acceptable(pk_1.clone(), addr_1.ip()).await);
assert!(!mailbox.acceptable(pk_1.clone(), addr_2.ip()).await);
oracle.overwrite([(pk_1.clone(), addr_2.into())].try_into().unwrap());
// After the overwrite the situation is reversed.
assert!(!mailbox.acceptable(pk_1.clone(), addr_1.ip()).await);
assert!(mailbox.acceptable(pk_1.clone(), addr_2.ip()).await);
});
}
/// Overwriting the address of a connected peer kills the existing connection.
#[test]
fn test_overwrite_severs_existing_connection() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (cfg, _) = test_config(PrivateKey::from_seed(0), false);
let TestHarness {
mailbox,
mut oracle,
..
} = setup_actor(context.child("actor"), cfg);
let (_, pk) = new_signer_and_pk(1);
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1001);
let addr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 1002);
oracle.track(
0,
Map::<_, crate::Address>::try_from([(pk.clone(), addr_1.into())]).unwrap(),
);
context.sleep(Duration::from_millis(10)).await;
// Reserve and connect from the original address.
let reservation = mailbox.listen(pk.clone(), addr_1.ip()).await;
assert!(reservation.is_some());
let (peer_mailbox, mut peer_rx) = peer::Mailbox::new(NZUsize!(1));
let _ = mailbox.connect(pk.clone(), peer_mailbox);
// Changing the address must kill the connected peer.
oracle.overwrite([(pk.clone(), addr_2.into())].try_into().unwrap());
assert!(matches!(peer_rx.next().await, Some(peer::Message::Kill)));
});
}
/// Checks that tracking a second peer set which lists a connected peer under a
/// different address delivers `peer::Message::Kill` to that peer and that the next
/// listener update contains the new IP but not the old one.
#[test]
fn test_add_set_severs_connection_on_address_change() {
    deterministic::Runner::default().start(|context| async move {
        let (cfg, mut listener_rx) = test_config(PrivateKey::from_seed(0), false);
        let TestHarness {
            mailbox: tracker,
            oracle: mut control,
            ..
        } = setup_actor(context.child("actor"), cfg);

        let (_, peer_key) = new_signer_and_pk(1);
        let first_ip = IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8));
        let first_addr = SocketAddr::new(first_ip, 1001);
        let second_ip = IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9));
        let second_addr = SocketAddr::new(second_ip, 1002);

        // Set 0: the peer is reachable at the first address.
        let set_zero =
            Map::<_, crate::Address>::try_from([(peer_key.clone(), first_addr.into())]).unwrap();
        control.track(0, set_zero);
        let listed = listener_rx.next().await.unwrap();
        assert!(listed.contains(&first_addr.ip()));

        // Obtain a reservation from the first IP, then register the peer's mailbox.
        let reservation = tracker.listen(peer_key.clone(), first_addr.ip()).await;
        assert!(reservation.is_some());
        let (peer_mailbox, mut peer_rx) = peer::Mailbox::new(NZUsize!(1));
        let _ = tracker.connect(peer_key.clone(), peer_mailbox);

        // Set 1: the same peer now appears under the second address.
        let set_one =
            Map::<_, crate::Address>::try_from([(peer_key.clone(), second_addr.into())]).unwrap();
        control.track(1, set_one);

        // The connected peer is told to shut down...
        let received = peer_rx.next().await;
        assert!(matches!(received, Some(peer::Message::Kill)));

        // ...and the listener's IP list reflects only the second address.
        let listed = listener_rx.next().await.unwrap();
        assert!(!listed.contains(&first_addr.ip()));
        assert!(listed.contains(&second_addr.ip()));
    });
}
/// Overwrites a batch holding three kinds of entries at once: a tracked peer with a
/// new address, a tracked peer with its existing address, and a peer that was never
/// tracked.
///
/// Asserts that the connected peer whose address changed receives
/// `peer::Message::Kill`, and that the connected peer whose address stayed the same
/// has no `Kill` immediately available on its mailbox.
#[test]
fn test_overwrite_batch_mixed_peers() {
    deterministic::Runner::default().start(|context| async move {
        let (cfg, mut listener_rx) = test_config(PrivateKey::from_seed(0), false);
        let TestHarness {
            mailbox: tracker,
            oracle: mut control,
            ..
        } = setup_actor(context.child("actor"), cfg);

        let (_, moving_key) = new_signer_and_pk(1);
        let (_, steady_key) = new_signer_and_pk(2);
        let (_, stranger_key) = new_signer_and_pk(3);

        let moving_old_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1001);
        let moving_new_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 1002);
        let steady_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 10, 10, 10)), 1003);

        // Only the first two keys are tracked; the third never is.
        let initial_set = Map::<_, crate::Address>::try_from([
            (moving_key.clone(), moving_old_addr.into()),
            (steady_key.clone(), steady_addr.into()),
        ])
        .unwrap();
        control.track(0, initial_set);
        let _ = listener_rx.next().await.unwrap();

        // Connect the peer whose address is about to change.
        let moving_reservation = tracker
            .listen(moving_key.clone(), moving_old_addr.ip())
            .await;
        assert!(moving_reservation.is_some());
        let (moving_mailbox, mut moving_rx) = peer::Mailbox::new(NZUsize!(1));
        let _ = tracker.connect(moving_key.clone(), moving_mailbox);

        // Connect the peer whose address will be re-submitted unchanged.
        let steady_reservation = tracker.listen(steady_key.clone(), steady_addr.ip()).await;
        assert!(steady_reservation.is_some());
        let (steady_mailbox, mut steady_rx) = peer::Mailbox::new(NZUsize!(1));
        let _ = tracker.connect(steady_key.clone(), steady_mailbox);

        control.overwrite(
            [
                (moving_key.clone(), moving_new_addr.into()),
                (steady_key.clone(), steady_addr.into()),
                (stranger_key.clone(), moving_old_addr.into()),
            ]
            .try_into()
            .unwrap(),
        );

        // The peer with the changed address is killed.
        let moving_msg = moving_rx.next().await;
        assert!(matches!(moving_msg, Some(peer::Message::Kill)));

        // Poll once without waiting: a `Kill` must not already be queued.
        let steady_msg = steady_rx.next().now_or_never();
        assert!(
            !matches!(steady_msg, Some(Some(peer::Message::Kill))),
            "Unchanged peer should not receive kill"
        );
    });
}
}