use super::{
directory::{self, Directory},
ingress::{Message, Oracle},
Config,
};
use crate::{
authenticated::{
lookup::actors::{peer, tracker::ingress::Releaser},
mailbox::UnboundedMailbox,
Mailbox,
},
PeerSetUpdate,
};
use commonware_cryptography::Signer;
use commonware_macros::select_loop;
use commonware_runtime::{
spawn_cell, Clock, ContextCell, Handle, Metrics as RuntimeMetrics, Spawner,
};
use commonware_utils::channel::{
fallible::{AsyncFallibleExt, FallibleExt},
mpsc,
};
use rand::Rng;
use std::{
collections::{HashMap, HashSet},
net::IpAddr,
};
use tracing::debug;
pub struct Actor<E: Spawner + Rng + Clock + RuntimeMetrics, C: Signer> {
context: ContextCell<E>,
receiver: mpsc::UnboundedReceiver<Message<C::PublicKey>>,
listener: Mailbox<HashSet<IpAddr>>,
directory: Directory<E, C::PublicKey>,
mailboxes: HashMap<C::PublicKey, Mailbox<peer::Message>>,
subscribers: Vec<mpsc::UnboundedSender<PeerSetUpdate<C::PublicKey>>>,
}
impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: Signer> Actor<E, C> {
#[allow(clippy::type_complexity)]
pub fn new(
context: E,
cfg: Config<C>,
) -> (
Self,
UnboundedMailbox<Message<C::PublicKey>>,
Oracle<C::PublicKey>,
) {
let directory_cfg = directory::Config {
max_sets: cfg.tracked_peer_sets,
peer_connection_cooldown: cfg.peer_connection_cooldown,
allow_private_ips: cfg.allow_private_ips,
allow_dns: cfg.allow_dns,
bypass_ip_check: cfg.bypass_ip_check,
block_duration: cfg.block_duration,
};
let (mailbox, receiver) = UnboundedMailbox::new();
let oracle = Oracle::new(mailbox.clone());
let releaser = Releaser::new(mailbox.clone());
let directory = Directory::init(
context.with_label("directory"),
cfg.crypto.public_key(),
directory_cfg,
releaser,
);
(
Self {
context: ContextCell::new(context),
receiver,
directory,
listener: cfg.listener,
mailboxes: HashMap::new(),
subscribers: Vec::new(),
},
mailbox,
oracle,
)
}
pub fn start(mut self) -> Handle<()> {
spawn_cell!(self.context, self.run().await)
}
async fn run(mut self) {
select_loop! {
self.context,
on_stopped => {
debug!("context shutdown, stopping tracker");
},
_ = self.directory.wait_for_unblock() => {
if self.directory.unblock_expired() {
self.listener
.0
.send_lossy(self.directory.listenable())
.await;
}
},
Some(msg) = self.receiver.recv() else {
debug!("mailbox closed, stopping tracker");
break;
} => {
self.handle_msg(msg).await;
},
}
}
async fn handle_msg(&mut self, msg: Message<C::PublicKey>) {
match msg {
Message::Register { index, peers } => {
let Some(reset_peers) = self.directory.track(index, peers) else {
return;
};
for peer in reset_peers {
if let Some(mut mailbox) = self.mailboxes.remove(&peer) {
mailbox.kill().await;
}
}
self.listener
.0
.send_lossy(self.directory.listenable())
.await;
let update = self
.directory
.latest_update()
.expect("latest update missing after successful track");
self.subscribers
.retain(|subscriber| subscriber.send_lossy(update.clone()));
}
Message::Overwrite { peers } => {
let mut any_changed = false;
for (public_key, address) in peers {
if !self.directory.overwrite(&public_key, address) {
continue;
}
any_changed = true;
if let Some(mut peer) = self.mailboxes.remove(&public_key) {
peer.kill().await;
}
}
if any_changed {
self.listener
.0
.send_lossy(self.directory.listenable())
.await;
}
}
Message::PeerSet { index, responder } => {
let _ = responder.send(self.directory.get_peer_set(&index));
}
Message::Subscribe { responder } => {
let (sender, receiver) = mpsc::unbounded_channel();
if let Some(update) = self.directory.latest_update() {
sender.send_lossy(update);
}
self.subscribers.push(sender);
let _ = responder.send(receiver);
}
Message::Connect {
public_key,
mut peer,
} => {
if !self.directory.eligible(&public_key) {
peer.kill().await;
return;
}
self.directory.connect(&public_key);
self.mailboxes.insert(public_key, peer);
}
Message::Dialable { responder } => {
let _ = responder.send(self.directory.dialable());
}
Message::Dial {
public_key,
reservation,
} => {
let _ = reservation.send(self.directory.dial(&public_key));
}
Message::Acceptable {
public_key,
source_ip,
responder,
} => {
let _ = responder.send(self.directory.acceptable(&public_key, source_ip));
}
Message::Listen {
public_key,
reservation,
} => {
let _ = reservation.send(self.directory.listen(&public_key));
}
Message::Block { public_key } => {
self.directory.block(&public_key);
if let Some(mut peer) = self.mailboxes.remove(&public_key) {
peer.kill().await;
}
self.listener
.0
.send_lossy(self.directory.listenable())
.await;
}
Message::Release { metadata } => {
self.mailboxes.remove(metadata.public_key());
self.directory.release(metadata);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
authenticated::lookup::actors::peer, AddressableManager, AddressableTrackedPeers, Ingress,
Provider,
};
use commonware_cryptography::{
ed25519::{PrivateKey, PublicKey},
Signer,
};
use commonware_runtime::{
deterministic::{self},
Clock, Runner,
};
use commonware_utils::{
ordered::{Map, Set},
NZUsize,
};
use std::{
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
time::Duration,
};
fn test_config<C: Signer>(
crypto: C,
bypass_ip_check: bool,
) -> (Config<C>, mpsc::Receiver<HashSet<IpAddr>>) {
let (registered_ips_sender, registered_ips_receiver) = Mailbox::new(1);
(
Config {
crypto,
tracked_peer_sets: NZUsize!(2),
peer_connection_cooldown: Duration::from_millis(200),
allow_private_ips: true,
allow_dns: true,
bypass_ip_check,
listener: registered_ips_sender,
block_duration: Duration::from_secs(100),
},
registered_ips_receiver,
)
}
fn new_signer_and_pk(seed: u64) -> (PrivateKey, PublicKey) {
let signer = PrivateKey::from_seed(seed);
let pk = signer.public_key();
(signer, pk)
}
struct TestHarness {
mailbox: UnboundedMailbox<Message<PublicKey>>,
oracle: Oracle<PublicKey>,
}
fn setup_actor(
runner_context: deterministic::Context,
cfg_to_clone: Config<PrivateKey>, ) -> TestHarness {
let (actor, mailbox, oracle) = Actor::new(runner_context, cfg_to_clone);
actor.start();
TestHarness { mailbox, oracle }
}
#[test]
fn test_connect_unauthorized_peer_is_killed() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (cfg, _) = test_config(PrivateKey::from_seed(0), false);
let TestHarness { mut mailbox, .. } = setup_actor(context.clone(), cfg);
let (_unauth_signer, unauth_pk) = new_signer_and_pk(1);
let (peer_mailbox, mut peer_receiver) = Mailbox::new(1);
mailbox.connect(unauth_pk.clone(), peer_mailbox);
assert!(
matches!(peer_receiver.recv().await, Some(peer::Message::Kill)),
"Unauthorized peer should be killed on Connect"
);
});
}
#[test]
fn test_block_peer_standard_behavior() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (cfg_initial, _) = test_config(PrivateKey::from_seed(0), false);
let TestHarness {
mut mailbox,
mut oracle,
..
} = setup_actor(context.clone(), cfg_initial);
let (_, pk) = new_signer_and_pk(1);
let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1001);
oracle
.track(
0,
Map::<_, crate::Address>::try_from([(pk.clone(), addr.into())]).unwrap(),
)
.await;
context.sleep(Duration::from_millis(10)).await;
let dialable = mailbox.dialable().await;
assert!(dialable.peers.iter().any(|peer| peer == &pk));
crate::block_peer(&mut oracle, pk.clone()).await;
context.sleep(Duration::from_millis(10)).await;
let dialable = mailbox.dialable().await;
assert!(!dialable.peers.iter().any(|peer| peer == &pk));
});
}
#[test]
fn test_block_peer_already_blocked_is_noop() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (cfg_initial, _) = test_config(PrivateKey::from_seed(0), false);
let TestHarness {
mut mailbox,
mut oracle,
..
} = setup_actor(context.clone(), cfg_initial);
let (_, pk1) = new_signer_and_pk(1);
let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1001);
oracle
.track(
0,
Map::<_, crate::Address>::try_from([(pk1.clone(), addr.into())]).unwrap(),
)
.await;
context.sleep(Duration::from_millis(10)).await;
crate::block_peer(&mut oracle, pk1.clone()).await;
context.sleep(Duration::from_millis(10)).await;
let dialable = mailbox.dialable().await;
assert!(!dialable.peers.iter().any(|peer| peer == &pk1));
crate::block_peer(&mut oracle, pk1.clone()).await;
context.sleep(Duration::from_millis(10)).await;
let dialable = mailbox.dialable().await;
assert!(!dialable.peers.iter().any(|peer| peer == &pk1));
});
}
#[test]
fn test_block_peer_non_existent_is_noop() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (cfg_initial, _) = test_config(PrivateKey::from_seed(0), false);
let TestHarness { mut oracle, .. } = setup_actor(context.clone(), cfg_initial);
let (_s1_signer, pk_non_existent) = new_signer_and_pk(100);
crate::block_peer(&mut oracle, pk_non_existent).await;
context.sleep(Duration::from_millis(10)).await;
});
}
#[test]
fn test_listenable() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (peer_signer, peer_pk) = new_signer_and_pk(1);
let peer_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 1001);
let (_peer_signer2, peer_pk2) = new_signer_and_pk(2);
let peer_addr2 = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 2).into(), 1002);
let (_peer_signer3, peer_pk3) = new_signer_and_pk(3);
let peer_addr3 = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 3).into(), 1003);
let (cfg_initial, _) = test_config(peer_signer, false);
let TestHarness {
mut mailbox,
mut oracle,
..
} = setup_actor(context.clone(), cfg_initial);
assert!(!mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await);
assert!(!mailbox.acceptable(peer_pk2.clone(), peer_addr2.ip()).await);
assert!(!mailbox.acceptable(peer_pk3.clone(), peer_addr3.ip()).await);
oracle
.track(
0,
Map::<_, crate::Address>::try_from([
(peer_pk.clone(), peer_addr.into()),
(peer_pk2.clone(), peer_addr2.into()),
])
.unwrap(),
)
.await;
context.sleep(Duration::from_millis(10)).await;
assert!(!mailbox.acceptable(peer_pk, peer_addr.ip()).await);
assert!(mailbox.acceptable(peer_pk2.clone(), peer_addr2.ip()).await);
assert!(!mailbox.acceptable(peer_pk2, peer_addr.ip()).await);
assert!(!mailbox.acceptable(peer_pk3, peer_addr3.ip()).await);
});
}
#[test]
fn test_acceptable_bypass_ip_check() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (peer_signer, peer_pk) = new_signer_and_pk(1);
let peer_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 1001);
let (_peer_signer2, peer_pk2) = new_signer_and_pk(2);
let peer_addr2 = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 2).into(), 1002);
let (_peer_signer3, peer_pk3) = new_signer_and_pk(3);
let peer_addr3 = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 3).into(), 1003);
let (cfg, _) = test_config(peer_signer, true);
let TestHarness {
mut mailbox,
mut oracle,
..
} = setup_actor(context.clone(), cfg);
assert!(
!mailbox.acceptable(peer_pk3.clone(), peer_addr3.ip()).await,
"Unknown peer should not be acceptable"
);
oracle
.track(
0,
Map::<_, crate::Address>::try_from([
(peer_pk.clone(), peer_addr.into()),
(peer_pk2.clone(), peer_addr2.into()),
])
.unwrap(),
)
.await;
context.sleep(Duration::from_millis(10)).await;
assert!(
mailbox.acceptable(peer_pk2.clone(), peer_addr.ip()).await,
"Registered peer with wrong IP should be acceptable with bypass_ip_check=true"
);
assert!(
!mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await,
"Self should not be acceptable"
);
crate::block_peer(&mut oracle, peer_pk2.clone()).await;
context.sleep(Duration::from_millis(10)).await;
assert!(
!mailbox.acceptable(peer_pk2.clone(), peer_addr2.ip()).await,
"Blocked peer should not be acceptable"
);
});
}
#[test]
fn test_listen() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (cfg_initial, _) = test_config(PrivateKey::from_seed(0), false);
let TestHarness {
mut mailbox,
mut oracle,
..
} = setup_actor(context.clone(), cfg_initial);
let (_peer_signer, peer_pk) = new_signer_and_pk(1);
let peer_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 8080);
let reservation = mailbox.listen(peer_pk.clone()).await;
assert!(reservation.is_none());
oracle
.track(
0,
Map::<_, crate::Address>::try_from([(peer_pk.clone(), peer_addr.into())])
.unwrap(),
)
.await;
context.sleep(Duration::from_millis(10)).await;
assert!(mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await);
let reservation = mailbox.listen(peer_pk.clone()).await;
assert!(reservation.is_some());
assert!(!mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await);
let failed_reservation = mailbox.listen(peer_pk.clone()).await;
assert!(failed_reservation.is_none());
drop(reservation.unwrap());
context.sleep(Duration::from_millis(1_010)).await;
let reservation_after_release = mailbox.listen(peer_pk.clone()).await;
assert!(reservation_after_release.is_some());
});
}
#[test]
fn test_dialable_message() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (_boot_signer, boot_pk) = new_signer_and_pk(99);
let boot_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9000);
let (cfg_initial, _) = test_config(PrivateKey::from_seed(0), false);
let TestHarness {
mut mailbox,
mut oracle,
..
} = setup_actor(context.clone(), cfg_initial);
oracle
.track(
0,
Map::<_, crate::Address>::try_from([(boot_pk.clone(), boot_addr.into())])
.unwrap(),
)
.await;
let dialable = mailbox.dialable().await;
assert_eq!(dialable.peers.len(), 1);
assert_eq!(dialable.peers[0], boot_pk);
});
}
#[test]
fn test_dial_message() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (_boot_signer, boot_pk) = new_signer_and_pk(99);
let boot_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9000);
let (cfg_initial, _) = test_config(PrivateKey::from_seed(0), false);
let TestHarness {
mut mailbox,
mut oracle,
..
} = setup_actor(context.clone(), cfg_initial);
oracle
.track(
0,
Map::<_, crate::Address>::try_from([(boot_pk.clone(), boot_addr.into())])
.unwrap(),
)
.await;
let result = mailbox.dial(boot_pk.clone()).await;
assert!(result.is_some());
if let Some((res, ingress)) = result {
match res.metadata() {
crate::authenticated::lookup::actors::tracker::Metadata::Dialer(pk) => {
assert_eq!(pk, &boot_pk);
}
_ => panic!("Expected Dialer metadata"),
}
assert_eq!(ingress, Ingress::Socket(boot_addr));
}
let (_unknown_signer, unknown_pk) = new_signer_and_pk(100);
let no_reservation = mailbox.dial(unknown_pk).await;
assert!(no_reservation.is_none());
});
}
#[test]
fn test_secondary_peers_are_acceptable_but_not_primary_or_dialable() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (cfg, _) = test_config(PrivateKey::from_seed(0), false);
let TestHarness {
mut mailbox,
mut oracle,
..
} = setup_actor(context.clone(), cfg);
let mut subscription = oracle.subscribe().await;
let (_primary_signer, primary_pk) = new_signer_and_pk(1);
let primary_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9001);
let (_secondary_signer, secondary_pk) = new_signer_and_pk(2);
let secondary_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9002);
oracle
.track(
0,
AddressableTrackedPeers::new(
Map::<_, crate::Address>::try_from([(
primary_pk.clone(),
primary_addr.into(),
)])
.unwrap(),
Map::<_, crate::Address>::try_from([(
secondary_pk.clone(),
secondary_addr.into(),
)])
.unwrap(),
),
)
.await;
let update = subscription.recv().await.unwrap();
assert_eq!(update.index, 0);
assert_eq!(update.latest.primary.len(), 1);
assert!(update.latest.primary.position(&primary_pk).is_some());
assert!(update.latest.primary.position(&secondary_pk).is_none());
assert_eq!(
update.latest.secondary,
Set::try_from([secondary_pk.clone()]).unwrap()
);
assert_eq!(update.all.primary, update.latest.primary);
assert_eq!(
update.all.secondary,
Set::try_from([secondary_pk.clone()]).unwrap()
);
let dialable = mailbox.dialable().await;
assert!(dialable.peers.iter().any(|peer| peer == &primary_pk));
assert!(!dialable.peers.iter().any(|peer| peer == &secondary_pk));
assert!(mailbox.dial(secondary_pk.clone()).await.is_none());
assert!(mailbox.acceptable(secondary_pk, secondary_addr.ip()).await);
});
}
#[test]
fn test_overlapping_primary_secondary_no_duplicate_in_subscription() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (cfg, _) = test_config(PrivateKey::from_seed(0), false);
let TestHarness {
mut mailbox,
mut oracle,
..
} = setup_actor(context.clone(), cfg);
let mut subscription = oracle.subscribe().await;
let (_signer, pk) = new_signer_and_pk(1);
let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9001);
oracle
.track(
0,
AddressableTrackedPeers::new(
Map::<_, crate::Address>::try_from([(pk.clone(), addr.into())]).unwrap(),
Map::<_, crate::Address>::try_from([(pk.clone(), addr.into())]).unwrap(),
),
)
.await;
let update = subscription.recv().await.unwrap();
assert_eq!(update.index, 0);
assert_eq!(update.latest.primary.len(), 1);
assert!(update.latest.primary.position(&pk).is_some());
assert!(
update.latest.secondary.is_empty(),
"overlap peer is deduplicated as primary only"
);
assert_eq!(update.all.primary, update.latest.primary);
assert!(
update.all.secondary.is_empty(),
"aggregate secondary excludes keys that are primary"
);
let dialable = mailbox.dialable().await;
assert!(dialable.peers.iter().any(|peer| peer == &pk));
assert!(mailbox.acceptable(pk, addr.ip()).await);
});
}
#[test]
fn test_block_clears_peer_mailbox_and_only_kills_once() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (cfg, _) = test_config(PrivateKey::from_seed(0), false);
let TestHarness {
mut mailbox,
mut oracle,
..
} = setup_actor(context.clone(), cfg);
let (_peer_signer, peer_pk) = new_signer_and_pk(1);
let peer_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 12345);
oracle
.track(
0,
Map::<_, crate::Address>::try_from([(peer_pk.clone(), peer_addr.into())])
.unwrap(),
)
.await;
context.sleep(Duration::from_millis(10)).await;
let reservation = mailbox.listen(peer_pk.clone()).await;
assert!(reservation.is_some());
let (peer_mailbox, mut peer_rx) = Mailbox::new(1);
mailbox.connect(peer_pk.clone(), peer_mailbox);
crate::block_peer(&mut oracle, peer_pk.clone()).await;
context.sleep(Duration::from_millis(10)).await;
assert!(
matches!(peer_rx.recv().await, Some(peer::Message::Kill)),
"connected peer must be killed on first Block"
);
crate::block_peer(&mut oracle, peer_pk.clone()).await;
context.sleep(Duration::from_millis(10)).await;
assert!(
peer_rx.recv().await.is_none(),
"no kill after handle has been cleared"
);
});
}
#[test]
fn test_register_disconnects_removed_peers() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (my_sk, my_pk) = new_signer_and_pk(0);
let my_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9000);
let pk_1 = new_signer_and_pk(1).1;
let addr_1 = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9001);
let pk_2 = new_signer_and_pk(2).1;
let addr_2 = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), 9002);
let (mut cfg, mut listener_receiver) = test_config(my_sk, false);
cfg.tracked_peer_sets = NZUsize!(1);
let TestHarness {
mut mailbox,
mut oracle,
..
} = setup_actor(context.clone(), cfg);
oracle
.track(
0,
Map::<_, crate::Address>::try_from([
(my_pk.clone(), my_addr.into()),
(pk_1.clone(), addr_1.into()),
])
.unwrap(),
)
.await;
context.sleep(Duration::from_millis(10)).await;
let registered_ips = listener_receiver.recv().await.unwrap();
assert!(registered_ips.contains(&my_addr.ip()));
assert!(registered_ips.contains(&addr_1.ip()));
assert!(!registered_ips.contains(&addr_2.ip()));
let reservation = mailbox.listen(pk_1.clone()).await;
assert!(reservation.is_some());
let (peer_mailbox, mut peer_rx) = Mailbox::new(1);
mailbox.connect(my_pk.clone(), peer_mailbox);
oracle
.track(
1,
Map::<_, crate::Address>::try_from([(pk_2.clone(), addr_2.into())]).unwrap(),
)
.await;
let registered_ips = listener_receiver.recv().await.unwrap();
assert!(!registered_ips.contains(&my_addr.ip()));
assert!(!registered_ips.contains(&addr_1.ip()));
assert!(registered_ips.contains(&addr_2.ip()));
assert!(matches!(peer_rx.recv().await, Some(peer::Message::Kill)),)
});
}
#[test]
fn test_overwrite_triggers_listener() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (my_sk, my_pk) = new_signer_and_pk(0);
let my_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9000);
let pk_1 = new_signer_and_pk(1).1;
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 9001);
let addr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 9002);
let (cfg, mut listener_receiver) = test_config(my_sk, false);
let TestHarness { mut oracle, .. } = setup_actor(context.clone(), cfg);
oracle
.track(
0,
Map::<_, crate::Address>::try_from([
(my_pk.clone(), my_addr.into()),
(pk_1.clone(), addr_1.into()),
])
.unwrap(),
)
.await;
let registered_ips = listener_receiver.recv().await.unwrap();
assert!(registered_ips.contains(&addr_1.ip()));
assert!(!registered_ips.contains(&addr_2.ip()));
oracle
.overwrite([(pk_1.clone(), addr_2.into())].try_into().unwrap())
.await;
let registered_ips = listener_receiver.recv().await.unwrap();
assert!(!registered_ips.contains(&addr_1.ip()));
assert!(registered_ips.contains(&addr_2.ip()));
});
}
#[test]
fn test_overwrite_via_oracle() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (cfg, _) = test_config(PrivateKey::from_seed(0), false);
let TestHarness {
mut mailbox,
mut oracle,
..
} = setup_actor(context.clone(), cfg);
let (_, pk) = new_signer_and_pk(1);
let addr_1 = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1001);
let addr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1002);
oracle
.track(
0,
Map::<_, crate::Address>::try_from([(pk.clone(), addr_1.into())]).unwrap(),
)
.await;
context.sleep(Duration::from_millis(10)).await;
let result = mailbox.dial(pk.clone()).await;
assert!(result.is_some());
let (_, ingress) = result.unwrap();
assert_eq!(ingress, Ingress::Socket(addr_1));
oracle
.overwrite([(pk.clone(), addr_2.into())].try_into().unwrap())
.await;
context.sleep(Duration::from_millis(1010)).await;
let result = mailbox.dial(pk.clone()).await;
assert!(result.is_some());
let (_, ingress) = result.unwrap();
assert_eq!(ingress, Ingress::Socket(addr_2));
});
}
#[test]
fn test_overwrite_blocked_peer_not_in_listenable() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (my_sk, my_pk) = new_signer_and_pk(0);
let my_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9000);
let pk_1 = new_signer_and_pk(1).1;
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 9001);
let addr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 9002);
let (cfg, mut listener_receiver) = test_config(my_sk, false);
let TestHarness { mut oracle, .. } = setup_actor(context.clone(), cfg);
oracle
.track(
0,
Map::<_, crate::Address>::try_from([
(my_pk.clone(), my_addr.into()),
(pk_1.clone(), addr_1.into()),
])
.unwrap(),
)
.await;
let registered_ips = listener_receiver.recv().await.unwrap();
assert!(registered_ips.contains(&addr_1.ip()));
crate::block_peer(&mut oracle, pk_1.clone()).await;
let registered_ips = listener_receiver.recv().await.unwrap();
assert!(!registered_ips.contains(&addr_1.ip()));
oracle
.overwrite([(pk_1.clone(), addr_2.into())].try_into().unwrap())
.await;
let registered_ips = listener_receiver.recv().await.unwrap();
assert!(!registered_ips.contains(&addr_1.ip()));
assert!(!registered_ips.contains(&addr_2.ip()));
});
}
#[test]
fn test_overwrite_untracked_peer_silently_ignored() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (cfg, _) = test_config(PrivateKey::from_seed(0), false);
let TestHarness { mut oracle, .. } = setup_actor(context.clone(), cfg);
let (_, pk) = new_signer_and_pk(1);
let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1001);
oracle
.overwrite([(pk, addr.into())].try_into().unwrap())
.await;
});
}
#[test]
fn test_overwrite_changes_acceptable_ip() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let pk_1 = new_signer_and_pk(1).1;
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 9001);
let addr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 9002);
let (cfg, _) = test_config(PrivateKey::from_seed(0), false);
let TestHarness {
mut mailbox,
mut oracle,
..
} = setup_actor(context.clone(), cfg);
oracle
.track(
0,
Map::<_, crate::Address>::try_from([(pk_1.clone(), addr_1.into())]).unwrap(),
)
.await;
context.sleep(Duration::from_millis(10)).await;
assert!(mailbox.acceptable(pk_1.clone(), addr_1.ip()).await);
assert!(!mailbox.acceptable(pk_1.clone(), addr_2.ip()).await);
oracle
.overwrite([(pk_1.clone(), addr_2.into())].try_into().unwrap())
.await;
assert!(!mailbox.acceptable(pk_1.clone(), addr_1.ip()).await);
assert!(mailbox.acceptable(pk_1.clone(), addr_2.ip()).await);
});
}
#[test]
fn test_overwrite_severs_existing_connection() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (cfg, _) = test_config(PrivateKey::from_seed(0), false);
let TestHarness {
mut mailbox,
mut oracle,
..
} = setup_actor(context.clone(), cfg);
let (_, pk) = new_signer_and_pk(1);
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1001);
let addr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 1002);
oracle
.track(
0,
Map::<_, crate::Address>::try_from([(pk.clone(), addr_1.into())]).unwrap(),
)
.await;
context.sleep(Duration::from_millis(10)).await;
let reservation = mailbox.listen(pk.clone()).await;
assert!(reservation.is_some());
let (peer_mailbox, mut peer_rx) = Mailbox::new(1);
mailbox.connect(pk.clone(), peer_mailbox);
oracle
.overwrite([(pk.clone(), addr_2.into())].try_into().unwrap())
.await;
assert!(matches!(peer_rx.recv().await, Some(peer::Message::Kill)));
});
}
#[test]
fn test_add_set_severs_connection_on_address_change() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (cfg, mut listener_receiver) = test_config(PrivateKey::from_seed(0), false);
let TestHarness {
mut mailbox,
mut oracle,
..
} = setup_actor(context.clone(), cfg);
let (_, pk) = new_signer_and_pk(1);
let addr_a = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1001);
let addr_b = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 1002);
oracle
.track(
0,
Map::<_, crate::Address>::try_from([(pk.clone(), addr_a.into())]).unwrap(),
)
.await;
let registered_ips = listener_receiver.recv().await.unwrap();
assert!(registered_ips.contains(&addr_a.ip()));
let reservation = mailbox.listen(pk.clone()).await;
assert!(reservation.is_some());
let (peer_mailbox, mut peer_rx) = Mailbox::new(1);
mailbox.connect(pk.clone(), peer_mailbox);
oracle
.track(
1,
Map::<_, crate::Address>::try_from([(pk.clone(), addr_b.into())]).unwrap(),
)
.await;
assert!(matches!(peer_rx.recv().await, Some(peer::Message::Kill)));
let registered_ips = listener_receiver.recv().await.unwrap();
assert!(!registered_ips.contains(&addr_a.ip()));
assert!(registered_ips.contains(&addr_b.ip()));
});
}
#[test]
fn test_overwrite_batch_mixed_peers() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (cfg, mut listener_receiver) = test_config(PrivateKey::from_seed(0), false);
let TestHarness {
mut mailbox,
mut oracle,
..
} = setup_actor(context.clone(), cfg);
let (_, pk_tracked) = new_signer_and_pk(1);
let (_, pk_unchanged) = new_signer_and_pk(2);
let (_, pk_untracked) = new_signer_and_pk(3);
let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1001);
let addr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 1002);
let addr_unchanged = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 10, 10, 10)), 1003);
oracle
.track(
0,
Map::<_, crate::Address>::try_from([
(pk_tracked.clone(), addr_1.into()),
(pk_unchanged.clone(), addr_unchanged.into()),
])
.unwrap(),
)
.await;
let _ = listener_receiver.recv().await.unwrap();
let reservation = mailbox.listen(pk_tracked.clone()).await;
assert!(reservation.is_some());
let (tracked_mailbox, mut tracked_rx) = Mailbox::new(1);
mailbox.connect(pk_tracked.clone(), tracked_mailbox);
let reservation = mailbox.listen(pk_unchanged.clone()).await;
assert!(reservation.is_some());
let (unchanged_mailbox, mut unchanged_rx) = Mailbox::new(1);
mailbox.connect(pk_unchanged.clone(), unchanged_mailbox);
oracle
.overwrite(
[
(pk_tracked.clone(), addr_2.into()),
(pk_unchanged.clone(), addr_unchanged.into()),
(pk_untracked.clone(), addr_1.into()),
]
.try_into()
.unwrap(),
)
.await;
assert!(matches!(tracked_rx.recv().await, Some(peer::Message::Kill)));
assert!(
unchanged_rx.try_recv().is_err(),
"Unchanged peer should not receive kill"
);
});
}
}