use super::{
directory::{self, Directory},
ingress::{Message, Oracle},
Config,
};
use crate::{
authenticated::{
discovery::{
actors::tracker::ingress::Releaser,
types::{self, Info, InfoVerifier},
},
mailbox::UnboundedMailbox,
},
PeerSetUpdate,
};
use commonware_cryptography::Signer;
use commonware_macros::select_loop;
use commonware_runtime::{
spawn_cell, Clock, ContextCell, Handle, Metrics as RuntimeMetrics, Spawner,
};
use commonware_utils::{
channel::{fallible::FallibleExt, mpsc},
union, SystemTimeExt,
};
use rand::{seq::SliceRandom, Rng};
use tracing::debug;
/// Suffix joined onto the configured namespace (with `union`) to form the
/// namespace under which this node's address `Info` is signed and which is
/// handed to the `InfoVerifier`.
const NAMESPACE_SUFFIX_IP: &[u8] = b"_IP";
/// Tracker actor: owns the peer [`Directory`] and services the [`Message`]s
/// that arrive on its mailbox.
pub struct Actor<E: Spawner + Rng + Clock + RuntimeMetrics, C: Signer> {
/// Runtime context; also passed as the randomness source when sampling the
/// peer infos to gossip.
context: ContextCell<E>,
/// This node's signer; its public key is used to look up our own `Info`.
crypto: C,
/// Largest primary peer set accepted by `Message::Register`.
max_peer_set_size: u64,
/// Upper bound on the number of peer infos sent in reply to one
/// `Message::BitVec`.
peer_gossip_max_count: usize,
/// Receiving half of the actor's mailbox.
receiver: mpsc::UnboundedReceiver<Message<C::PublicKey>>,
/// Peer bookkeeping that almost every message is delegated to.
directory: Directory<E, C::PublicKey>,
/// Senders created by `Message::Subscribe`; each is sent the latest peer set
/// update after a successful `Message::Register`.
subscribers: Vec<mpsc::UnboundedSender<PeerSetUpdate<C::PublicKey>>>,
}
impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: Signer> Actor<E, C> {
/// Build a tracker [`Actor`] from `cfg`.
///
/// Signs this node's own address `Info` (timestamped with the current time of
/// `context`) under the IP namespace and initializes the [`Directory`] with
/// it and the configured bootstrappers.
///
/// Returns, in order: the actor, the mailbox used to send it messages, an
/// [`Oracle`] built on a clone of that mailbox, and an [`InfoVerifier`]
/// configured from `cfg`.
#[allow(clippy::type_complexity)]
pub fn new(
    context: E,
    cfg: Config<C>,
) -> (
    Self,
    UnboundedMailbox<Message<C::PublicKey>>,
    Oracle<C::PublicKey>,
    InfoVerifier<C::PublicKey>,
) {
    // Sign our own address so it can be handed to peers.
    let now = context.current().epoch_millis();
    let namespace = union(&cfg.namespace, NAMESPACE_SUFFIX_IP);
    let own_info = Info::sign(&cfg.crypto, &namespace, cfg.address, now);

    // The oracle and the releaser both feed the actor's single mailbox.
    let (mailbox, receiver) = UnboundedMailbox::new();
    let oracle = Oracle::new(mailbox.clone());
    let releaser = Releaser::new(mailbox.clone());

    let directory = Directory::init(
        context.with_label("directory"),
        cfg.bootstrappers,
        own_info,
        directory::Config {
            allow_private_ips: cfg.allow_private_ips,
            allow_dns: cfg.allow_dns,
            max_sets: cfg.tracked_peer_sets,
            dial_fail_limit: cfg.dial_fail_limit,
            peer_connection_cooldown: cfg.peer_connection_cooldown,
            block_duration: cfg.block_duration,
        },
        releaser,
    );

    // The verifier is built with the same namespace our own info was signed under.
    let info_verifier = Info::verifier(
        cfg.crypto.public_key(),
        cfg.peer_gossip_max_count,
        cfg.synchrony_bound,
        namespace,
    );

    let actor = Self {
        context: ContextCell::new(context),
        crypto: cfg.crypto,
        max_peer_set_size: cfg.max_peer_set_size,
        peer_gossip_max_count: cfg.peer_gossip_max_count,
        receiver,
        directory,
        subscribers: Vec::new(),
    };
    (actor, mailbox, oracle, info_verifier)
}
/// Consume the actor, hand its `run` future to `spawn_cell!` together with
/// the actor's context, and return the resulting [`Handle`].
pub fn start(mut self) -> Handle<()> {
spawn_cell!(self.context, self.run().await)
}
/// Main loop of the actor, driven by `select_loop!` over three event sources:
/// context shutdown, the directory's unblock timer, and the mailbox.
async fn run(mut self) {
select_loop! {
self.context,
// Runs when the context is stopped (presumably ending the loop — see
// `select_loop!` for the exact semantics).
on_stopped => {
debug!("context shutdown, stopping tracker");
},
// Once the directory signals that a block may have expired, let it
// unblock the expired entries.
_ = self.directory.wait_for_unblock() => {
self.directory.unblock_expired();
},
// Handle the next mailbox message; if `recv` yields nothing the mailbox
// is closed and the loop is left.
Some(msg) = self.receiver.recv() else {
debug!("mailbox closed, stopping tracker");
break;
} => {
self.handle_msg(msg).await;
},
}
}
/// Handle one [`Message`] taken from the mailbox.
///
/// Most variants are forwarded to the [`Directory`]; where the message carries
/// a responder, the directory's answer is sent back and a failed send (the
/// requester dropped its end) is ignored.
///
/// # Panics
///
/// Panics if `Message::Register` carries a primary set larger than
/// `max_peer_set_size`, if the directory has no latest update right after a
/// successful `track`, or if the directory has no `Info` for this node's own
/// public key when answering `Message::Connect`.
async fn handle_msg(&mut self, msg: Message<C::PublicKey>) {
    match msg {
        Message::Register { index, peers } => {
            // Only the primary set is size-limited.
            let max = self.max_peer_set_size;
            assert!(
                peers.primary.len() as u64 <= max,
                "primary peer set too large: {} > {max}",
                peers.primary.len()
            );

            // Nothing to announce if the directory did not accept the set.
            if !self.directory.track(index, peers) {
                return;
            }

            // Notify subscribers, dropping those for which `send_lossy`
            // reports failure.
            let update = self
                .directory
                .latest_update()
                .expect("latest update missing after successful track");
            self.subscribers
                .retain(|subscriber| subscriber.send_lossy(update.clone()));
        }
        Message::PeerSet { index, responder } => {
            let _ = responder.send(self.directory.get_peer_set(&index));
        }
        Message::Subscribe { responder } => {
            // A new subscriber is first sent the latest update, if there is one.
            let (sender, receiver) = mpsc::unbounded_channel();
            if let Some(update) = self.directory.latest_update() {
                sender.send_lossy(update);
            }
            self.subscribers.push(sender);
            let _ = responder.send(receiver);
        }
        Message::Connect {
            public_key,
            dialer,
            responder,
        } => {
            // Ineligible peers get no greeting: the responder is dropped unanswered.
            if !self.directory.eligible(&public_key) {
                return;
            }
            self.directory.connect(&public_key, dialer);

            // Reply with our own signed info as the greeting.
            let info = self
                .directory
                .info(&self.crypto.public_key())
                .expect("directory must hold this node's own info");
            let _ = responder.send(info);
        }
        Message::Construct {
            public_key,
            mut peer,
        } => {
            if !self.directory.eligible(&public_key) {
                peer.kill().await;
                return;
            }
            if let Some(bit_vec) = self.directory.get_random_bit_vec() {
                let _ = peer.bit_vec(bit_vec).await;
            } else {
                debug!("no peer sets available");
            }
        }
        Message::BitVec { bit_vec, mut peer } => {
            // `None` means the directory rejected the bit vector.
            let Some(mut infos) = self.directory.infos(bit_vec) else {
                peer.kill().await;
                return;
            };

            // Cap the reply at a random selection of `max` infos.
            //
            // `partial_shuffle` only guarantees that the slice it *returns*
            // holds the random selection; it does not place that selection at
            // the front of the vector, so truncating afterwards would keep a
            // mostly unshuffled prefix. Shuffling the whole vector makes any
            // prefix a uniform sample.
            let max = self.peer_gossip_max_count;
            if infos.len() > max {
                infos.shuffle(&mut self.context);
                infos.truncate(max);
            }

            if !infos.is_empty() {
                peer.peers(infos).await;
            }
        }
        Message::Peers { peers } => {
            self.directory.update_peers(peers);
        }
        Message::Dialable { responder } => {
            let _ = responder.send(self.directory.dialable());
        }
        Message::Dial {
            public_key,
            reservation,
        } => {
            let _ = reservation.send(self.directory.dial(&public_key));
        }
        Message::Acceptable {
            public_key,
            responder,
        } => {
            let _ = responder.send(self.directory.acceptable(&public_key));
        }
        Message::Listen {
            public_key,
            reservation,
        } => {
            let _ = reservation.send(self.directory.listen(&public_key));
        }
        Message::Block { public_key } => {
            self.directory.block(&public_key);
        }
        Message::Release { metadata } => {
            self.directory.release(metadata);
        }
    }
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
authenticated::{
discovery::{
actors::{peer, tracker},
config::Bootstrapper,
types,
},
Mailbox,
},
Ingress, Manager, Provider, TrackedPeers,
};
use commonware_codec::{DecodeExt, Encode};
use commonware_cryptography::{
ed25519::{PrivateKey, PublicKey, Signature},
Signer,
};
use commonware_runtime::{deterministic, Clock, Runner};
use commonware_utils::{bitmap::BitMap, ordered::Set, NZUsize, TryCollect};
use futures::future::Either;
use std::{
collections::HashSet,
net::{IpAddr, Ipv4Addr, SocketAddr},
time::Duration,
};
use types::Info;
/// Baseline tracker [`Config`] shared by the tests: listens on
/// `127.0.0.1:0`, allows private IPs and DNS, tracks two peer sets, gossips at
/// most five infos and accepts primary sets of up to 128 peers.
fn default_test_config<C: Signer>(
    crypto: C,
    bootstrappers: Vec<Bootstrapper<C::PublicKey>>,
) -> Config<C> {
    let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
    Config {
        // Identity and addressing
        crypto,
        namespace: b"test_tracker_actor_namespace".to_vec(),
        address: listen_addr.into(),
        bootstrappers,
        // Address policy
        allow_private_ips: true,
        allow_dns: true,
        // Limits
        tracked_peer_sets: NZUsize!(2),
        max_peer_set_size: 128,
        peer_gossip_max_count: 5,
        dial_fail_limit: 1,
        // Timing
        synchrony_bound: Duration::from_secs(10),
        peer_connection_cooldown: Duration::from_millis(200),
        block_duration: Duration::from_secs(100),
    }
}
/// Derive an ed25519 key pair from `seed`, returned as
/// `(private key, public key)`.
fn new_signer_and_pk(seed: u64) -> (PrivateKey, PublicKey) {
    let private = PrivateKey::from_seed(seed);
    let public = private.public_key();
    (private, public)
}
/// Build an [`Info`] for `socket` at `timestamp`, signed by `signer` under
/// `ip_namespace`.
///
/// `target_pk_override` replaces the public key stored in the info (the
/// signature is still produced by `signer`). When `make_sig_invalid` is set,
/// the first byte of the encoded signature is incremented before it is decoded
/// again.
fn new_peer_info(
    signer: &mut PrivateKey,
    ip_namespace: &[u8],
    socket: SocketAddr,
    timestamp: u64,
    target_pk_override: Option<PublicKey>,
    make_sig_invalid: bool,
) -> Info<PublicKey> {
    let public_key = match target_pk_override {
        Some(pk) => pk,
        None => signer.public_key(),
    };
    let ingress: Ingress = socket.into();

    // The signed payload is the encoded (ingress, timestamp) pair.
    let payload = (ingress.clone(), timestamp).encode();
    let honest = signer.sign(ip_namespace, &payload);

    let signature = if make_sig_invalid && !honest.as_ref().is_empty() {
        // Corrupt the signature by bumping its first byte.
        let mut raw = honest.encode_mut();
        raw[0] = raw[0].wrapping_add(1);
        Signature::decode(raw).unwrap()
    } else {
        honest
    };

    Info {
        ingress,
        timestamp,
        public_key,
        signature,
    }
}
/// Reserve a listener slot for `peer` and mark it connected (as the
/// non-dialing side), asserting that the tracker answers with a greeting.
///
/// Returns the reservation so the caller decides how long it is held.
async fn connect_to_peer(
    mailbox: &mut UnboundedMailbox<Message<PublicKey>>,
    peer: &PublicKey,
) -> tracker::Reservation<PublicKey> {
    let reservation = match mailbox.listen(peer.clone()).await {
        Some(reservation) => reservation,
        None => panic!("reservation failed"),
    };

    // `false`: we were dialed, we did not dial.
    let greeting = mailbox.connect(peer.clone(), false).await;
    assert!(
        greeting.is_some(),
        "expected greeting info for authorized peer"
    );

    reservation
}
/// Everything a test needs to drive a started tracker actor; produced by
/// `setup_actor`.
struct TestHarness {
// Mailbox for sending messages to the actor.
mailbox: UnboundedMailbox<Message<PublicKey>>,
// Oracle returned by `Actor::new`.
oracle: Oracle<PublicKey>,
// Config namespace joined with `NAMESPACE_SUFFIX_IP`; used to sign and verify `Info`s in tests.
ip_namespace: Vec<u8>,
// Public key of the signer the actor was configured with.
tracker_pk: PublicKey,
// Copy of the config the actor was built from.
cfg: Config<PrivateKey>, }
/// Build a tracker actor from `cfg_to_clone`, start it on `runner_context`,
/// and return the handles tests need to talk to it.
///
/// The `InfoVerifier` returned by `Actor::new` is discarded.
fn setup_actor(
    runner_context: deterministic::Context,
    cfg_to_clone: Config<PrivateKey>,
) -> TestHarness {
    // Keep a copy of the config and derive what the tests need from it
    // before the original is consumed by the actor.
    let cfg = cfg_to_clone.clone();
    let tracker_pk = cfg.crypto.public_key();
    let ip_namespace = union(&cfg.namespace, super::NAMESPACE_SUFFIX_IP);

    let (actor, mailbox, oracle, _) = Actor::new(runner_context, cfg_to_clone);
    actor.start();

    TestHarness {
        mailbox,
        oracle,
        ip_namespace,
        tracker_pk,
        cfg,
    }
}
/// Registering a primary set with one peer more than `max_peer_set_size`
/// must trip the actor's size assertion.
#[test]
#[should_panic(expected = "primary peer set too large")]
fn test_register_primary_peer_set_too_large() {
    deterministic::Runner::default().start(|context| async move {
        let TestHarness {
            mut oracle,
            mut mailbox,
            cfg,
            ..
        } = setup_actor(
            context.clone(),
            default_test_config(PrivateKey::from_seed(0), Vec::new()),
        );

        // One more key than the configured limit.
        let limit = cfg.max_peer_set_size;
        let oversized: Set<PublicKey> = (1..=limit + 1)
            .map(|seed| new_signer_and_pk(seed).1)
            .try_collect()
            .unwrap();
        oracle.track(0, oversized).await;

        // Round-trip through the mailbox (presumably so the actor has handled
        // the registration before the test body ends).
        let _ = mailbox.dialable().await;
    });
}
/// The size limit applies to the primary set only: a secondary set larger
/// than `max_peer_set_size` (with an empty primary set) is registered without
/// a panic.
#[test]
fn test_register_large_secondary_peer_set_accepted() {
    deterministic::Runner::default().start(|context| async move {
        let TestHarness {
            mut oracle,
            mut mailbox,
            cfg,
            ..
        } = setup_actor(
            context.clone(),
            default_test_config(PrivateKey::from_seed(0), Vec::new()),
        );

        // Secondary set with one more key than the primary limit.
        let limit = cfg.max_peer_set_size;
        let secondary: Set<PublicKey> = (1..=limit + 1)
            .map(|seed| new_signer_and_pk(seed).1)
            .try_collect()
            .unwrap();
        let primary: Set<PublicKey> = Set::default();
        let tracked = TrackedPeers::new(primary, secondary);
        oracle.track(0, tracked).await;

        // Round-trip through the mailbox (presumably so the actor has handled
        // the registration before the test body ends).
        let _ = mailbox.dialable().await;
    });
}
/// A peer that is in no tracked set gets no greeting from `connect`,
/// regardless of which side dialed.
#[test]
fn test_connect_unauthorized_peer_returns_none() {
    deterministic::Runner::default().start(|context| async move {
        let TestHarness { mut mailbox, .. } = setup_actor(
            context.clone(),
            default_test_config(PrivateKey::from_seed(0), Vec::new()),
        );
        let (_stranger_signer, stranger) = new_signer_and_pk(1);

        // First as the listening side, then as the dialing side.
        for dialer in [false, true] {
            let greeting = mailbox.connect(stranger.clone(), dialer).await;
            assert!(
                greeting.is_none(),
                "Unauthorized peer should get None on Connect"
            );
        }
    });
}
/// A peer that is part of a tracked set and holds a listen reservation is
/// greeted on `connect` with the tracker's own signed `Info`.
#[test]
fn test_connect_authorized_peer_receives_tracker_info() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg_initial = default_test_config(PrivateKey::from_seed(0), Vec::new());
let TestHarness {
mut mailbox,
mut oracle,
tracker_pk,
cfg,
ip_namespace,
..
} = setup_actor(context.clone(), cfg_initial);
let (_auth_signer, auth_pk) = new_signer_and_pk(1);
// Track a set containing both the tracker and the peer.
oracle
.track(
0,
Set::try_from([tracker_pk.clone(), auth_pk.clone()]).unwrap(),
)
.await;
// Give the actor time to process the registration.
context.sleep(Duration::from_millis(10)).await;
// Hold the reservation for the rest of the test.
let _res = mailbox.listen(auth_pk.clone()).await.unwrap();
let tracker_info = mailbox
.connect(auth_pk.clone(), false)
.await
.expect("Expected greeting info for authorized peer");
// The greeting carries the tracker's key and configured address, and its
// signature verifies under the IP namespace.
assert_eq!(tracker_info.public_key, tracker_pk);
assert_eq!(tracker_info.ingress, cfg.address);
assert!(tracker_info.verify(&ip_namespace));
});
}
/// With a bootstrapper configured but no peer set tracked, `construct` for
/// the bootstrapper sends nothing to the peer mailbox within 50ms.
#[test]
fn test_construct_no_sets_tracked() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (_boot_signer, boot_pk) = new_signer_and_pk(99);
let boot_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9999);
let cfg_with_boot = default_test_config(
PrivateKey::from_seed(0),
vec![(boot_pk.clone(), boot_addr.into())],
);
let TestHarness {
mailbox: mut new_mailbox,
..
} = setup_actor(context.clone(), cfg_with_boot);
let (peer_mailbox, mut peer_receiver) = Mailbox::new(1);
new_mailbox.construct(boot_pk.clone(), peer_mailbox.clone());
// Race the peer mailbox against a timeout: only the timeout winning is a pass.
match futures::future::select(
Box::pin(peer_receiver.recv()),
Box::pin(context.sleep(Duration::from_millis(50))),
)
.await
{
Either::Left((Some(_), _)) => {
panic!("Expected no message on Construct with no sets",)
}
Either::Left((None, _)) => panic!("Peer mailbox closed unexpectedly"),
Either::Right(_) => { }
};
});
}
/// A bit vector that refers to a peer set index the tracker does not know
/// must not cause any message to be sent to the peer.
#[test]
fn test_handle_bit_vec_for_unknown_index_sends_no_peers() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let cfg_initial = default_test_config(PrivateKey::from_seed(0), Vec::new());
        let TestHarness {
            mut mailbox,
            mut oracle,
            tracker_pk,
            ..
        } = setup_actor(context.clone(), cfg_initial);

        // Only index 0 is tracked.
        let (_, pk1) = new_signer_and_pk(1);
        oracle
            .track(0, Set::try_from([tracker_pk, pk1.clone()]).unwrap())
            .await;
        context.sleep(Duration::from_millis(10)).await;

        let (peer_mailbox_pk1, mut peer_receiver_pk1) = Mailbox::new(1);
        let bit_vec_unknown_idx = types::BitVec {
            index: 99,
            bits: BitMap::ones(1),
        };
        let _r1 = connect_to_peer(&mut mailbox, &pk1).await;
        mailbox.bit_vec(bit_vec_unknown_idx, peer_mailbox_pk1.clone());

        // `bit_vec` only enqueues the message. Yield to the runtime so the
        // actor actually processes it; otherwise the check below would pass
        // simply because nothing has run yet.
        context.sleep(Duration::from_millis(10)).await;

        assert!(peer_receiver_pk1.try_recv().is_err());
    });
}
/// After a tracked peer is blocked, `construct` for it results in a `Kill`
/// and it no longer appears among the dialable peers.
#[test]
fn test_block_peer_standard_behavior() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg_initial = default_test_config(PrivateKey::from_seed(0), Vec::new());
let TestHarness {
mut mailbox,
mut oracle,
tracker_pk,
..
} = setup_actor(context.clone(), cfg_initial);
let (_s1_signer, pk1) = new_signer_and_pk(1);
oracle
.track(0, Set::try_from([tracker_pk.clone(), pk1.clone()]).unwrap())
.await;
context.sleep(Duration::from_millis(10)).await;
// Block the peer and let the actor process the request.
crate::block_peer(&mut oracle, pk1.clone()).await;
context.sleep(Duration::from_millis(10)).await;
let (peer_mailbox_pk1, mut peer_receiver_pk1) = Mailbox::new(1);
mailbox.construct(pk1.clone(), peer_mailbox_pk1.clone());
assert!(matches!(
peer_receiver_pk1.recv().await,
Some(peer::Message::Kill)
));
let dialable_peers = mailbox.dialable().await;
assert!(!dialable_peers.peers.iter().any(|peer| peer == &pk1));
});
}
/// Blocking a peer a second time does not panic, and `construct` for it
/// still results in a `Kill` afterwards.
#[test]
fn test_block_peer_already_blocked_is_noop() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg_initial = default_test_config(PrivateKey::from_seed(0), Vec::new());
let TestHarness {
mut mailbox,
mut oracle,
tracker_pk,
..
} = setup_actor(context.clone(), cfg_initial);
let (_s1_signer, pk1) = new_signer_and_pk(1);
oracle
.track(0, Set::try_from([tracker_pk.clone(), pk1.clone()]).unwrap())
.await;
context.sleep(Duration::from_millis(10)).await;
// First block.
crate::block_peer(&mut oracle, pk1.clone()).await;
context.sleep(Duration::from_millis(10)).await;
// Second block of the same peer.
crate::block_peer(&mut oracle, pk1.clone()).await;
context.sleep(Duration::from_millis(10)).await;
let (peer_mailbox_pk1, mut peer_receiver_pk1) = Mailbox::new(1);
mailbox.construct(pk1.clone(), peer_mailbox_pk1.clone());
assert!(matches!(
peer_receiver_pk1.recv().await,
Some(peer::Message::Kill)
));
});
}
/// Blocking a key the tracker has never heard of must not panic.
#[test]
fn test_block_peer_non_existent_is_noop() {
    deterministic::Runner::default().start(|context| async move {
        let TestHarness { mut oracle, .. } = setup_actor(
            context.clone(),
            default_test_config(PrivateKey::from_seed(0), Vec::new()),
        );

        let (_unknown_signer, unknown_pk) = new_signer_and_pk(100);
        crate::block_peer(&mut oracle, unknown_pk).await;

        // Let the actor process the block request before the test ends.
        context.sleep(Duration::from_millis(10)).await;
    });
}
/// An `Info` delivered via `peers` for a tracked peer is stored and later
/// gossiped to a peer whose bit vector says it does not know that peer.
#[test]
fn test_handle_peers_learns_unknown_peer() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg_initial = default_test_config(PrivateKey::from_seed(0), Vec::new());
let TestHarness {
mut mailbox,
mut oracle,
ip_namespace,
tracker_pk,
..
} = setup_actor(context.clone(), cfg_initial);
let (_, pk1) = new_signer_and_pk(1);
let (mut s2_signer, pk2) = new_signer_and_pk(2);
// Set 0: tracker and pk1 only.
oracle
.track(0, Set::try_from([tracker_pk.clone(), pk1.clone()]).unwrap())
.await;
context.sleep(Duration::from_millis(10)).await;
// Validly signed info for pk2.
let pk2_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 2002);
let pk2_timestamp = context.current().epoch_millis();
let pk2_info = new_peer_info(
&mut s2_signer,
&ip_namespace,
pk2_addr,
pk2_timestamp,
Some(pk2.clone()),
false,
);
// Set 1 additionally contains pk2.
let set1: Set<_> = [tracker_pk.clone(), pk1.clone(), pk2.clone()]
.try_into()
.unwrap();
oracle.track(1, set1.clone()).await;
context.sleep(Duration::from_millis(10)).await;
let (peer_mailbox_s1, mut peer_receiver_s1) = Mailbox::new(1);
// Teach the tracker about pk2.
mailbox.peers(vec![pk2_info.clone()]);
context.sleep(Duration::from_millis(10)).await;
let _r1 = connect_to_peer(&mut mailbox, &pk1).await;
let _r2 = connect_to_peer(&mut mailbox, &pk2).await;
// Bit vector for set 1 claiming knowledge of the tracker and pk1, but not pk2.
let mut bv = BitMap::zeroes(set1.len() as u64);
let idx_tracker_in_set1 = set1.iter().position(|p| p == &tracker_pk).unwrap();
let idx_pk1_in_set1 = set1.iter().position(|p| p == &pk1).unwrap();
bv.set(idx_tracker_in_set1 as u64, true);
bv.set(idx_pk1_in_set1 as u64, true);
mailbox.bit_vec(
types::BitVec { index: 1, bits: bv },
peer_mailbox_s1.clone(),
);
// The reply must contain exactly the info for pk2.
match peer_receiver_s1.recv().await {
Some(peer::Message::Peers(received_peers_info)) => {
assert_eq!(received_peers_info.len(), 1);
let received_pk2_info = &received_peers_info[0];
assert_eq!(received_pk2_info.public_key, pk2);
assert_eq!(received_pk2_info.ingress, Ingress::Socket(pk2_addr));
assert_eq!(received_pk2_info.timestamp, pk2_timestamp);
}
_ => panic!("pk1 did not receive expected Info for pk2",),
}
});
}
/// Once the tracker holds an `Info` for a peer, a later `peers` message
/// carrying an older timestamp for the same peer does not replace it.
#[test]
fn test_handle_peers_rejects_older_info_for_known_peer() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg_initial = default_test_config(PrivateKey::from_seed(0), Vec::new());
let TestHarness {
mut mailbox,
mut oracle,
ip_namespace,
tracker_pk,
..
} = setup_actor(context.clone(), cfg_initial);
// `ts_old` is 100ms before `ts_new` (saturating at zero).
let ts_new = context.current().epoch_millis();
let ts_old = ts_new.saturating_sub(100);
let (_, pk1) = new_signer_and_pk(1);
let (mut s2_signer, pk2) = new_signer_and_pk(2);
let peer_set_0_peers: Set<_> = [tracker_pk.clone(), pk1.clone(), pk2.clone()]
.try_into()
.unwrap();
oracle.track(0, peer_set_0_peers.clone()).await;
context.sleep(Duration::from_millis(10)).await;
let pk2_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 2002);
let pk2_info_initial = new_peer_info(
&mut s2_signer,
&ip_namespace,
pk2_addr,
ts_new,
Some(pk2.clone()),
false,
);
let (peer_mailbox_s1, mut peer_receiver_s1) = Mailbox::new(1);
let _r1 = connect_to_peer(&mut mailbox, &pk1).await;
let _r2 = connect_to_peer(&mut mailbox, &pk2).await;
// Deliver the newer info first ...
mailbox.peers(vec![pk2_info_initial.clone()]);
context.sleep(Duration::from_millis(10)).await;
// ... then an older one for the same peer and address.
let pk2_info_older = new_peer_info(
&mut s2_signer,
&ip_namespace,
pk2_addr,
ts_old,
Some(pk2.clone()),
false,
);
mailbox.peers(vec![pk2_info_older]);
context.sleep(Duration::from_millis(10)).await;
// Bit vector claiming knowledge of the tracker and pk1, but not pk2.
let mut knowledge_for_set0 = BitMap::zeroes(peer_set_0_peers.len() as u64);
let idx_tracker_in_set0 = peer_set_0_peers.position(&tracker_pk).unwrap();
let idx_pk1_in_set0 = peer_set_0_peers.position(&pk1).unwrap();
knowledge_for_set0.set(idx_tracker_in_set0 as u64, true);
knowledge_for_set0.set(idx_pk1_in_set0 as u64, true);
let bit_vec_from_pk1 = types::BitVec {
index: 0,
bits: knowledge_for_set0,
};
mailbox.bit_vec(bit_vec_from_pk1, peer_mailbox_s1.clone());
// The gossiped info for pk2 must still carry the newer timestamp.
match peer_receiver_s1.recv().await {
Some(peer::Message::Peers(received_peers_info)) => {
assert_eq!(received_peers_info.len(), 1);
let received_pk2_info = &received_peers_info[0];
assert_eq!(received_pk2_info.public_key, pk2);
assert_eq!(received_pk2_info.timestamp, ts_new);
}
_ => panic!("pk1 did not receive Info as expected"),
}
});
}
/// `acceptable` is false for every key before any set is tracked; afterwards
/// it is true only for a tracked peer that is not the tracker itself.
#[test]
fn test_listenable() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
// `peer_pk` is the tracker's own key: its signer is used for the config.
let (peer_signer, peer_pk) = new_signer_and_pk(0);
let (_peer_signer2, peer_pk2) = new_signer_and_pk(1);
let (_peer_signer3, peer_pk3) = new_signer_and_pk(2);
let cfg_initial = default_test_config(peer_signer, Vec::new());
let TestHarness {
mut mailbox,
mut oracle,
..
} = setup_actor(context.clone(), cfg_initial);
// Nothing tracked yet: nobody is acceptable.
assert!(!mailbox.acceptable(peer_pk.clone()).await);
assert!(!mailbox.acceptable(peer_pk2.clone()).await);
assert!(!mailbox.acceptable(peer_pk3.clone()).await);
// Track the tracker itself and peer 2; peer 3 stays untracked.
oracle
.track(
0,
Set::try_from([peer_pk.clone(), peer_pk2.clone()]).unwrap(),
)
.await;
context.sleep(Duration::from_millis(10)).await;
assert!(!mailbox.acceptable(peer_pk).await);
assert!(mailbox.acceptable(peer_pk2).await);
assert!(!mailbox.acceptable(peer_pk3).await);
});
}
/// `listen` hands out at most one reservation per tracked peer, and a new
/// one becomes available after the first is dropped and some time has passed.
#[test]
fn test_listen() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg_initial = default_test_config(PrivateKey::from_seed(0), Vec::new());
let TestHarness {
mut mailbox,
mut oracle,
..
} = setup_actor(context.clone(), cfg_initial);
let (_peer_signer, peer_pk) = new_signer_and_pk(1);
// Untracked peer: no reservation.
let reservation = mailbox.listen(peer_pk.clone()).await;
assert!(reservation.is_none());
oracle
.track(0, Set::try_from([peer_pk.clone()]).unwrap())
.await;
context.sleep(Duration::from_millis(10)).await;
// Tracked peer: acceptable, and the first reservation succeeds.
assert!(mailbox.acceptable(peer_pk.clone()).await);
let reservation = mailbox.listen(peer_pk.clone()).await;
assert!(reservation.is_some());
// While the reservation is held the peer is neither acceptable nor reservable.
assert!(!mailbox.acceptable(peer_pk.clone()).await);
let failed_reservation = mailbox.listen(peer_pk.clone()).await;
assert!(failed_reservation.is_none());
// Release the reservation and wait (presumably long enough to outlast the
// connection cooldown — TODO confirm against `Directory`).
drop(reservation.unwrap());
context.sleep(Duration::from_millis(1_010)).await;
let reservation_after_release = mailbox.listen(peer_pk.clone()).await;
assert!(reservation_after_release.is_some());
});
}
/// With a single bootstrapper configured and nothing else known, the
/// bootstrapper is the one and only dialable peer.
#[test]
fn test_dialable_message() {
    deterministic::Runner::default().start(|context| async move {
        let (_boot_signer, boot_pk) = new_signer_and_pk(99);
        let boot_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9000);
        let bootstrappers = vec![(boot_pk.clone(), boot_addr.into())];

        let TestHarness { mut mailbox, .. } = setup_actor(
            context.clone(),
            default_test_config(PrivateKey::from_seed(0), bootstrappers),
        );

        let dialable = mailbox.dialable().await;
        assert_eq!(dialable.peers.len(), 1);
        assert_eq!(dialable.peers[0], boot_pk);
    });
}
/// A secondary peer is reported separately from the primary set in
/// subscription updates, is acceptable for inbound connections, but is not
/// offered as dialable even after its `Info` is known.
#[test]
fn test_secondary_peers_are_connectable_but_not_primary() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = default_test_config(PrivateKey::from_seed(0), Vec::new());
let TestHarness {
mut mailbox,
mut oracle,
ip_namespace,
..
} = setup_actor(context.clone(), cfg);
// Subscribe before tracking so the update for index 0 is the first one received.
let mut subscription = oracle.subscribe().await;
let (_primary_signer, primary_pk) = new_signer_and_pk(1);
let (mut secondary_signer, secondary_pk) = new_signer_and_pk(2);
oracle
.track(
0,
TrackedPeers::new(
Set::try_from([primary_pk.clone()]).unwrap(),
Set::try_from([secondary_pk.clone()]).unwrap(),
),
)
.await;
// The update keeps primary and secondary peers apart, both in `latest` and in `all`.
let update = subscription.recv().await.unwrap();
assert_eq!(update.index, 0);
assert_eq!(update.latest.primary.len(), 1);
assert!(update.latest.primary.position(&primary_pk).is_some());
assert!(update.latest.primary.position(&secondary_pk).is_none());
assert_eq!(
update.latest.secondary,
Set::try_from([secondary_pk.clone()]).unwrap()
);
assert_eq!(update.all.primary, update.latest.primary);
assert_eq!(
update.all.secondary,
Set::try_from([secondary_pk.clone()]).unwrap()
);
assert!(mailbox.acceptable(secondary_pk.clone()).await);
// Even with a known address the secondary peer must not become dialable.
let secondary_info = new_peer_info(
&mut secondary_signer,
&ip_namespace,
SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9002),
context.current().epoch_millis(),
Some(secondary_pk.clone()),
false,
);
mailbox.peers(vec![secondary_info]);
context.sleep(Duration::from_millis(10)).await;
let dialable = mailbox.dialable().await;
assert!(!dialable.peers.iter().any(|peer| peer == &secondary_pk));
});
}
/// A key registered as both primary and secondary is reported only as
/// primary in subscription updates and remains acceptable.
#[test]
fn test_overlapping_primary_secondary_no_duplicate_in_subscription() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = default_test_config(PrivateKey::from_seed(0), Vec::new());
let TestHarness {
mut mailbox,
mut oracle,
..
} = setup_actor(context.clone(), cfg);
let mut subscription = oracle.subscribe().await;
let (_signer, pk) = new_signer_and_pk(1);
// The same key appears in both the primary and the secondary set.
oracle
.track(
0,
TrackedPeers::new(
Set::try_from([pk.clone()]).unwrap(),
Set::try_from([pk.clone()]).unwrap(),
),
)
.await;
let update = subscription.recv().await.unwrap();
assert_eq!(update.index, 0);
assert_eq!(update.latest.primary.len(), 1);
assert!(update.latest.primary.position(&pk).is_some());
assert!(
update.latest.secondary.is_empty(),
"overlap peer is deduplicated as primary only"
);
assert_eq!(update.all.primary, update.latest.primary);
assert!(
update.all.secondary.is_empty(),
"aggregate secondary excludes keys that are primary"
);
assert!(mailbox.acceptable(pk).await);
});
}
/// `dial` yields a reservation with `Dialer` metadata for a configured
/// bootstrapper and no reservation for an unknown key.
#[test]
fn test_dial_message() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (_boot_signer, boot_pk) = new_signer_and_pk(99);
let boot_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9000);
let cfg_initial = default_test_config(
PrivateKey::from_seed(0),
vec![(boot_pk.clone(), boot_addr.into())],
);
let TestHarness { mut mailbox, .. } = setup_actor(context.clone(), cfg_initial);
let reservation = mailbox.dial(boot_pk.clone()).await;
assert!(reservation.is_some());
// The reservation is moved into `res` and dropped at the end of this block.
if let Some(res) = reservation {
match res.metadata() {
crate::authenticated::discovery::actors::tracker::Metadata::Dialer(
pk,
addr,
) => {
assert_eq!(pk, &boot_pk);
assert_eq!(*addr, Ingress::Socket(boot_addr));
}
_ => panic!("Expected Dialer metadata"),
}
}
// A key that is neither a bootstrapper nor tracked cannot be dialed.
let (_unknown_signer, unknown_pk) = new_signer_and_pk(100);
let no_reservation = mailbox.dial(unknown_pk).await;
assert!(no_reservation.is_none());
});
}
/// A bit vector for a known set whose length differs from the set's size
/// gets the sending peer killed.
#[test]
fn test_bitvec_kill_on_length_mismatch() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg_initial = default_test_config(PrivateKey::from_seed(0), Vec::new());
let TestHarness {
mut mailbox,
mut oracle,
tracker_pk,
..
} = setup_actor(context.clone(), cfg_initial);
let (_s1, pk1) = new_signer_and_pk(1);
let (_s2, pk2) = new_signer_and_pk(2);
// Set 0 has three members.
oracle
.track(
0,
Set::try_from([tracker_pk, pk1.clone(), pk2.clone()]).unwrap(),
)
.await;
context.sleep(Duration::from_millis(10)).await;
let (peer_mailbox, mut peer_receiver) = Mailbox::new(1);
// Only two bits for a three-member set.
let invalid_bit_vec = types::BitVec {
index: 0,
bits: BitMap::ones(2),
};
mailbox.bit_vec(invalid_bit_vec, peer_mailbox.clone());
assert!(matches!(
peer_receiver.recv().await,
Some(peer::Message::Kill)
));
});
}
/// End-to-end scenario for `construct` / `bit_vec` handling: unauthorized
/// peers are killed, bit vectors reflect what the tracker knows, missing
/// infos are gossiped, peers whose only set was evicted are killed, and
/// `construct` picks among all tracked sets.
#[test]
fn test_bit_vec_comprehensive() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg_initial = default_test_config(PrivateKey::from_seed(0), Vec::new());
let TestHarness {
mut mailbox,
mut oracle,
ip_namespace,
tracker_pk,
..
} = setup_actor(context.clone(), cfg_initial);
let (mut peer1_s, peer1_pk) = new_signer_and_pk(1);
let (_peer2_s, peer2_pk) = new_signer_and_pk(2);
// 1) Before any set is tracked, peer1 is not eligible and is killed.
let (peer_mailbox1, mut peer_receiver1) = Mailbox::new(1);
mailbox.construct(peer1_pk.clone(), peer_mailbox1.clone());
assert!(
matches!(peer_receiver1.recv().await, Some(peer::Message::Kill)),
"Unauthorized peer killed on Construct"
);
// 2) Track set 0 and connect peer1; `construct` now yields a bit vector for
// set 0 in which the tracker's own bit is set.
let set0_peers: Set<_> = [tracker_pk.clone(), peer1_pk.clone(), peer2_pk.clone()]
.try_into()
.unwrap();
oracle.track(0, set0_peers.clone()).await;
context.sleep(Duration::from_millis(10)).await;
let _r1 = connect_to_peer(&mut mailbox, &peer1_pk).await;
mailbox.construct(peer1_pk.clone(), peer_mailbox1.clone());
let bit_vec0 = match peer_receiver1.recv().await {
Some(peer::Message::BitVec(bv)) => bv,
_ => panic!("Expected BitVec for set 0"),
};
assert_eq!(bit_vec0.index, 0);
assert_eq!(bit_vec0.bits.len(), set0_peers.len() as u64);
let tracker_idx_s0 = set0_peers.iter().position(|p| p == &tracker_pk).unwrap();
assert!(
bit_vec0.bits.get(tracker_idx_s0 as u64),
"Tracker should know itself in set 0"
);
// 3) After learning peer1's info, the bit vector also has peer1's bit set.
let peer1_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1001);
let peer1_ts = context.current().epoch_millis();
let peer1_info = new_peer_info(
&mut peer1_s,
&ip_namespace,
peer1_addr,
peer1_ts,
Some(peer1_pk.clone()),
false,
);
mailbox.peers(vec![peer1_info]);
context.sleep(Duration::from_millis(10)).await;
mailbox.construct(peer1_pk.clone(), peer_mailbox1.clone());
let bit_vec0_updated = match peer_receiver1.recv().await {
Some(peer::Message::BitVec(bv)) => bv,
_ => panic!("Expected updated BitVec for set 0"),
};
let peer1_idx_s0 = set0_peers.iter().position(|p| p == &peer1_pk).unwrap();
assert!(bit_vec0_updated.bits.get(tracker_idx_s0 as u64));
assert!(
bit_vec0_updated.bits.get(peer1_idx_s0 as u64),
"Tracker should know peer1 in set 0 after Peers msg"
);
// 4) A bit vector that only knows the tracker is answered with the one
// other info the tracker holds for set 0: peer1's.
let mut peer1_knowledge_s0 = BitMap::zeroes(set0_peers.len() as u64);
peer1_knowledge_s0.set(tracker_idx_s0 as u64, true); mailbox.bit_vec(
types::BitVec {
index: 0,
bits: peer1_knowledge_s0,
},
peer_mailbox1.clone(),
);
match peer_receiver1.recv().await {
Some(peer::Message::Peers(infos)) => {
assert_eq!(infos.len(), 1, "Expected 1 Info (for peer1)");
assert_eq!(infos[0].public_key, peer1_pk);
assert_eq!(infos[0].ingress, peer1_addr.into());
}
_ => panic!("Expected Peers message from tracker"),
}
// 5) `default_test_config` tracks at most two sets, so registering sets 1
// and 2 is expected to evict set 0 — the only set containing peer1.
let (_peer3_s, peer3_pk) = new_signer_and_pk(3);
let set1_peers: Set<_> = [tracker_pk.clone(), peer2_pk.clone()].try_into().unwrap(); oracle.track(1, set1_peers.clone()).await;
context.sleep(Duration::from_millis(10)).await;
let set2_peers: Set<_> = [tracker_pk.clone(), peer3_pk.clone()].try_into().unwrap(); oracle.track(2, set2_peers.clone()).await; context.sleep(Duration::from_millis(10)).await;
mailbox.construct(peer1_pk.clone(), peer_mailbox1.clone());
assert!(
matches!(peer_receiver1.recv().await, Some(peer::Message::Kill)),
"Peer1 should be killed after its only set was evicted"
);
// 6) peer2 is still tracked; over 100 `construct` calls the returned bit
// vectors must cover exactly the two remaining set indices, 1 and 2.
let (peer_mailbox2, mut peer_receiver2) = Mailbox::new(1);
let _r2 = connect_to_peer(&mut mailbox, &peer2_pk).await;
let mut indices = HashSet::new();
for _ in 0..100 {
mailbox.construct(peer2_pk.clone(), peer_mailbox2.clone());
let Some(peer::Message::BitVec(bv)) = peer_receiver2.recv().await else {
panic!("Unexpected message type");
};
indices.insert(bv.index);
}
assert!(indices.contains(&1));
assert!(indices.contains(&2));
assert_eq!(indices.len(), 2);
});
}
}