use bytes::Bytes;
use commonware_utils::{channel::oneshot, Span};
mod config;
pub use config::Config;
mod engine;
pub use engine::Engine;
mod fetcher;
mod inflight;
mod ingress;
pub use ingress::Mailbox;
mod metrics;
mod wire;
#[cfg(feature = "mocks")]
pub mod mocks;
pub trait Producer: Clone + Send + 'static {
type Key: Span;
fn produce(&mut self, key: Self::Key) -> oneshot::Receiver<Bytes>;
}
#[cfg(test)]
mod tests {
use super::{
mocks::{Consumer, Key, Producer},
Config, Engine, Mailbox,
};
use crate::{Delivery, Fetch, Resolver, TargetedResolver};
use bytes::Bytes;
use commonware_cryptography::{
ed25519::{PrivateKey, PublicKey},
Signer,
};
use commonware_macros::{select, test_traced};
use commonware_p2p::{
simulated::{Link, Network, Oracle, Receiver, Sender},
Blocker, Manager as _, Provider, TrackedPeers,
};
use commonware_runtime::{
deterministic, telemetry::metrics::count_running_tasks, Clock, Metrics as _, Quota, Runner,
Spawner as _, Supervisor as _,
};
use commonware_utils::{
channel::{fallible::FallibleExt, mpsc, oneshot},
non_empty_vec,
ordered::Set,
sync::Mutex,
NZUsize, NZU32,
};
use std::{
collections::{HashMap, VecDeque},
num::{NonZeroU32, NonZeroUsize},
sync::Arc,
time::Duration,
};
const MAILBOX_SIZE: NonZeroUsize = NZUsize!(1024);
const RATE_LIMIT: NonZeroU32 = NZU32!(10);
const INITIAL_DURATION: Duration = Duration::from_millis(100);
const TIMEOUT: Duration = Duration::from_millis(400);
const FETCH_RETRY_TIMEOUT: Duration = Duration::from_millis(100);
const LINK: Link = Link {
latency: Duration::from_millis(10),
jitter: Duration::from_millis(1),
success_rate: 1.0,
};
const LINK_UNRELIABLE: Link = Link {
latency: Duration::from_millis(10),
jitter: Duration::from_millis(1),
success_rate: 0.5,
};
fn status_metric_total(metrics: &str, name: &str, status: &str) -> u64 {
let prefix = format!("{name}{{");
let status_label = format!("status=\"{status}\"");
metrics
.lines()
.filter(|line| line.starts_with(&prefix) && line.contains(&status_label))
.map(|line| {
line.split_whitespace()
.next_back()
.expect("metric line must have a value")
.parse::<u64>()
.expect("status metric value must be an integer")
})
.sum()
}
async fn setup_network_and_peers(
context: &deterministic::Context,
peer_seeds: &[u64],
) -> (
Oracle<PublicKey, deterministic::Context>,
Vec<PrivateKey>,
Vec<PublicKey>,
Vec<(
Sender<PublicKey, deterministic::Context>,
Receiver<PublicKey>,
)>,
) {
setup_network_and_peers_with_rate_limit(context, peer_seeds, Quota::per_second(RATE_LIMIT))
.await
}
async fn setup_network_and_peers_with_rate_limit(
context: &deterministic::Context,
peer_seeds: &[u64],
rate_limit: Quota,
) -> (
Oracle<PublicKey, deterministic::Context>,
Vec<PrivateKey>,
Vec<PublicKey>,
Vec<(
Sender<PublicKey, deterministic::Context>,
Receiver<PublicKey>,
)>,
) {
let (network, oracle) = Network::new(
context.child("network"),
commonware_p2p::simulated::Config {
max_size: 1024 * 1024,
disconnect_on_block: true,
tracked_peer_sets: NZUsize!(3),
},
);
network.start();
let schemes: Vec<PrivateKey> = peer_seeds
.iter()
.map(|seed| PrivateKey::from_seed(*seed))
.collect();
let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
let mut manager = oracle.manager();
manager.track(0, Set::try_from(peers.clone()).unwrap());
let mut connections = Vec::new();
for peer in &peers {
let (sender, receiver) = oracle
.control(peer.clone())
.register(0, rate_limit)
.await
.unwrap();
connections.push((sender, receiver));
}
(oracle, schemes, peers, connections)
}
async fn add_link(
oracle: &mut Oracle<PublicKey, deterministic::Context>,
link: Link,
peers: &[PublicKey],
from: usize,
to: usize,
) {
oracle
.add_link(peers[from].clone(), peers[to].clone(), link.clone())
.await
.unwrap();
oracle
.add_link(peers[to].clone(), peers[from].clone(), link)
.await
.unwrap();
}
#[derive(Clone, Default)]
struct SequencedProducer {
data: Arc<Mutex<HashMap<Key, VecDeque<Bytes>>>>,
}
impl SequencedProducer {
fn insert(&mut self, key: Key, values: impl IntoIterator<Item = Bytes>) {
self.data.lock().insert(key, values.into_iter().collect());
}
fn remaining(&self, key: &Key) -> Vec<Bytes> {
self.data
.lock()
.get(key)
.map(|values| values.iter().cloned().collect())
.unwrap_or_default()
}
}
impl crate::p2p::Producer for SequencedProducer {
type Key = Key;
fn produce(&mut self, key: Self::Key) -> oneshot::Receiver<Bytes> {
let (sender, receiver) = oneshot::channel();
if let Some(value) = self.data.lock().get_mut(&key).and_then(VecDeque::pop_front) {
let _ = sender.send(value);
}
receiver
}
}
fn setup_and_spawn_actor<C, R>(
context: &deterministic::Context,
provider: impl Provider<PublicKey = PublicKey>,
blocker: impl Blocker<PublicKey = PublicKey>,
signer: impl Signer<PublicKey = PublicKey>,
connection: (
Sender<PublicKey, deterministic::Context>,
Receiver<PublicKey>,
),
consumer: C,
producer: Producer<Key, Bytes>,
) -> Mailbox<Key, PublicKey, R>
where
C: crate::Consumer<Key = Key, Subscriber = R, Value = Bytes>,
R: Clone + Ord + Send + 'static,
{
setup_and_spawn_actor_with_producer(
context, provider, blocker, signer, connection, consumer, producer,
)
}
fn setup_and_spawn_actor_with_producer<C, R, Pro>(
context: &deterministic::Context,
provider: impl Provider<PublicKey = PublicKey>,
blocker: impl Blocker<PublicKey = PublicKey>,
signer: impl Signer<PublicKey = PublicKey>,
connection: (
Sender<PublicKey, deterministic::Context>,
Receiver<PublicKey>,
),
consumer: C,
producer: Pro,
) -> Mailbox<Key, PublicKey, R>
where
C: crate::Consumer<Key = Key, Subscriber = R, Value = Bytes>,
Pro: crate::p2p::Producer<Key = Key>,
R: Clone + Ord + Send + 'static,
{
let public_key = signer.public_key();
let (engine, mailbox) = Engine::new(
context.child("actor").with_attribute("peer", &public_key),
Config {
peer_provider: provider,
blocker,
consumer,
producer,
mailbox_size: MAILBOX_SIZE,
me: Some(public_key),
initial: INITIAL_DURATION,
timeout: TIMEOUT,
fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
priority_requests: false,
priority_responses: false,
},
);
engine.start(connection);
mailbox
}
type DeliveryGate = (oneshot::Receiver<()>, bool);
type DeliveryGates = Arc<Mutex<VecDeque<DeliveryGate>>>;
#[derive(Clone)]
struct BlockingConsumer {
context: Arc<deterministic::Context>,
sender: mpsc::UnboundedSender<(Key, Bytes)>,
started: mpsc::UnboundedSender<Key>,
gates: DeliveryGates,
}
impl BlockingConsumer {
fn new(
context: deterministic::Context,
gates: Vec<DeliveryGate>,
) -> (
Self,
mpsc::UnboundedReceiver<(Key, Bytes)>,
mpsc::UnboundedReceiver<Key>,
) {
let (sender, receiver) = mpsc::unbounded_channel();
let (started, started_receiver) = mpsc::unbounded_channel();
(
Self {
context: Arc::new(context),
sender,
started,
gates: Arc::new(Mutex::new(gates.into())),
},
receiver,
started_receiver,
)
}
}
impl crate::Consumer for BlockingConsumer {
type Key = Key;
type Value = Bytes;
type Subscriber = ();
fn deliver(
&mut self,
delivery: Delivery<Self::Key, Self::Subscriber>,
value: Self::Value,
) -> oneshot::Receiver<bool> {
let key = delivery.key;
self.started.send_lossy(key.clone());
let (gate, valid) = self
.gates
.lock()
.pop_front()
.map_or((None, true), |(gate, valid)| (Some(gate), valid));
let (mut response, receiver) = oneshot::channel();
let sender = self.sender.clone();
self.context.child("delivery").spawn(move |_| async move {
if let Some(gate) = gate {
select! {
_ = response.closed() => return,
result = gate => {
if result.is_err() {
let _ = response.send(false);
return;
}
},
}
}
if valid {
sender.send_lossy((key, value));
}
let _ = response.send(valid);
});
receiver
}
}
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
struct SubscriberTag(u16);
type RecordedDelivery = (Delivery<Key, SubscriberTag>, Bytes);
#[derive(Clone)]
struct BlockingSubscriberRecordingConsumer {
context: Arc<deterministic::Context>,
sender: mpsc::UnboundedSender<RecordedDelivery>,
started: mpsc::UnboundedSender<Delivery<Key, SubscriberTag>>,
gates: DeliveryGates,
}
impl BlockingSubscriberRecordingConsumer {
fn new(
context: deterministic::Context,
gates: Vec<DeliveryGate>,
) -> (
Self,
mpsc::UnboundedReceiver<RecordedDelivery>,
mpsc::UnboundedReceiver<Delivery<Key, SubscriberTag>>,
) {
let (sender, receiver) = mpsc::unbounded_channel();
let (started, started_receiver) = mpsc::unbounded_channel();
(
Self {
context: Arc::new(context),
sender,
started,
gates: Arc::new(Mutex::new(gates.into())),
},
receiver,
started_receiver,
)
}
}
impl crate::Consumer for BlockingSubscriberRecordingConsumer {
type Key = Key;
type Value = Bytes;
type Subscriber = SubscriberTag;
fn deliver(
&mut self,
delivery: Delivery<Self::Key, Self::Subscriber>,
value: Self::Value,
) -> oneshot::Receiver<bool> {
self.started.send_lossy(delivery.clone());
let (gate, valid) = self
.gates
.lock()
.pop_front()
.map_or((None, true), |(gate, valid)| (Some(gate), valid));
let (mut response, receiver) = oneshot::channel();
let sender = self.sender.clone();
self.context.child("delivery").spawn(move |_| async move {
if let Some(gate) = gate {
select! {
_ = response.closed() => return,
result = gate => {
if result.is_err() {
let _ = response.send(false);
return;
}
},
}
}
if valid {
sender.send_lossy((delivery, value));
}
let _ = response.send(valid);
});
receiver
}
}
#[derive(Clone)]
struct SubscriberRecordingConsumer {
sender: mpsc::UnboundedSender<RecordedDelivery>,
}
impl SubscriberRecordingConsumer {
fn new() -> (Self, mpsc::UnboundedReceiver<RecordedDelivery>) {
let (sender, receiver) = mpsc::unbounded_channel();
(Self { sender }, receiver)
}
}
impl crate::Consumer for SubscriberRecordingConsumer {
type Key = Key;
type Value = Bytes;
type Subscriber = SubscriberTag;
fn deliver(
&mut self,
delivery: Delivery<Self::Key, Self::Subscriber>,
value: Self::Value,
) -> oneshot::Receiver<bool> {
let (sender, receiver) = oneshot::channel();
self.sender.send_lossy((delivery, value));
let _ = sender.send(true);
receiver
}
}
fn dummy_consumer() -> Consumer<Key, Bytes> {
Consumer::dummy()
}
fn consumer() -> (Consumer<Key, Bytes>, mpsc::UnboundedReceiver<(Key, Bytes)>) {
Consumer::new()
}
async fn wait_for_blocked(
context: &deterministic::Context,
oracle: &Oracle<PublicKey, deterministic::Context>,
blocker: &PublicKey,
blocked: &PublicKey,
) {
loop {
let blocked_peers = oracle.blocked().await.unwrap();
if blocked_peers
.iter()
.any(|(a, b)| a == blocker && b == blocked)
{
return;
}
context.sleep(Duration::from_millis(10)).await;
}
}
#[test_traced]
fn test_fetch_success() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers(&context, &[1, 2]).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
let key = Key(2);
let mut prod2 = Producer::default();
prod2.insert(key.clone(), Bytes::from("data for key 2"));
let (cons1, mut cons_out1) = consumer();
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod2,
);
mailbox1.fetch(key.clone());
let (key_actual, value) = cons_out1.recv().await.unwrap();
assert_eq!(key_actual, key);
assert_eq!(value, Bytes::from("data for key 2"));
});
}
#[test_traced]
fn test_pending_delivery_does_not_block_engine() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers(&context, &[1, 2, 3]).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
let key1 = Key(1);
let key2 = Key(2);
let data1 = Bytes::from("data for key 1");
let data2 = Bytes::from("data for key 2");
let mut prod2 = Producer::default();
prod2.insert(key1.clone(), data1.clone());
let mut prod3 = Producer::default();
prod3.insert(key2.clone(), data2.clone());
let (gate_sender1, gate_receiver1) = oneshot::channel();
let (gate_sender2, gate_receiver2) = oneshot::channel();
let (cons1, mut cons_out1, mut started) = BlockingConsumer::new(
context.child("consumer"),
vec![(gate_receiver1, true), (gate_receiver2, true)],
);
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod2,
);
let scheme = schemes.remove(0);
let _mailbox3 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod3,
);
mailbox1.fetch(key1.clone());
let started_key = started.recv().await.expect("delivery did not start");
assert_eq!(started_key, key1);
mailbox1.fetch(key2.clone());
select! {
started_key = started.recv() => {
assert_eq!(started_key.expect("delivery did not start"), key2);
},
_ = context.sleep(Duration::from_secs(2)) => {
panic!("resolver engine blocked on pending delivery");
},
};
gate_sender2.send(()).unwrap();
let (key_actual, value) = cons_out1.recv().await.expect("consumer channel closed");
assert_eq!(key_actual, key2);
assert_eq!(value, data2);
gate_sender1.send(()).unwrap();
let (key_actual, value) = cons_out1.recv().await.expect("consumer channel closed");
assert_eq!(key_actual, key1);
assert_eq!(value, data1);
});
}
#[test_traced]
fn test_retain_drops_pending_delivery() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers(&context, &[1, 2]).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
let key = Key(1);
let data = Bytes::from("data for key 1");
let mut prod2 = Producer::default();
prod2.insert(key.clone(), data.clone());
let (mut first_gate_sender, first_gate_receiver) = oneshot::channel();
let (second_gate_sender, second_gate_receiver) = oneshot::channel();
let (cons1, mut cons_out1, mut started) = BlockingConsumer::new(
context.child("consumer"),
vec![(first_gate_receiver, true), (second_gate_receiver, true)],
);
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod2,
);
mailbox1.fetch(key.clone());
let started_key = started.recv().await.expect("delivery did not start");
assert_eq!(started_key, key);
let canceled = key.clone();
mailbox1.retain(move |key, _| key != &canceled);
mailbox1.fetch(key.clone());
first_gate_sender.closed().await;
let started_key = started.recv().await.expect("second delivery did not start");
assert_eq!(started_key, key);
second_gate_sender.send(()).unwrap();
let (key_actual, value) = cons_out1.recv().await.expect("consumer channel closed");
assert_eq!(key_actual, key);
assert_eq!(value, data);
select! {
_ = cons_out1.recv() => panic!("unexpected extra event"),
_ = context.sleep(Duration::from_millis(100)) => {},
};
});
}
#[test_traced]
fn test_invalid_delivery_retries_and_rearms_slot() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers(&context, &[1, 2, 3]).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
let key = Key(1);
let data = Bytes::from("data for key 1");
let mut prod2 = Producer::default();
prod2.insert(key.clone(), data.clone());
let mut prod3 = Producer::default();
prod3.insert(key.clone(), data.clone());
let (first_gate_sender, first_gate_receiver) = oneshot::channel();
let (second_gate_sender, second_gate_receiver) = oneshot::channel();
let (cons1, mut cons_out1, mut started) = BlockingConsumer::new(
context.child("consumer"),
vec![(first_gate_receiver, false), (second_gate_receiver, true)],
);
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod2,
);
let scheme = schemes.remove(0);
let _mailbox3 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod3,
);
mailbox1.fetch_targeted(
key.clone(),
non_empty_vec![peers[1].clone(), peers[2].clone()],
);
let started_key = started.recv().await.expect("delivery did not start");
assert_eq!(started_key, key);
first_gate_sender.send(()).unwrap();
wait_for_blocked(&context, &oracle, &peers[0], &peers[1]).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
oracle.manager().track(
1,
Set::try_from([peers[0].clone(), peers[2].clone()]).unwrap(),
);
let started_key = started.recv().await.expect("retry delivery did not start");
assert_eq!(started_key, key);
second_gate_sender.send(()).unwrap();
let (key_actual, value) = cons_out1.recv().await.expect("consumer channel closed");
assert_eq!(key_actual, key);
assert_eq!(value, data);
});
}
async fn run_pending_invalid_delivery_race(
context: &deterministic::Context,
validation_first: bool,
) {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers(context, &[1, 2]).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
let key = Key(1);
let mut prod2 = Producer::default();
prod2.insert(key.clone(), Bytes::from("data for key 1"));
let (mut gate_sender, gate_receiver) = oneshot::channel();
let (cons1, mut cons_out1, mut started) =
BlockingConsumer::new(context.child("consumer"), vec![(gate_receiver, false)]);
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod2,
);
mailbox1.fetch(key.clone());
let started_key = started.recv().await.expect("delivery did not start");
assert_eq!(started_key, key);
if validation_first {
gate_sender.send(()).unwrap();
wait_for_blocked(context, &oracle, &peers[0], &peers[1]).await;
mailbox1.retain(|_, _| false);
let blocked = oracle.blocked().await.unwrap();
assert_eq!(blocked.len(), 1);
assert_eq!(blocked[0].0, peers[0]);
assert_eq!(blocked[0].1, peers[1]);
} else {
mailbox1.retain(|_, _| false);
gate_sender.closed().await;
assert!(oracle.blocked().await.unwrap().is_empty());
}
select! {
_ = cons_out1.recv() => panic!("unexpected event"),
_ = context.sleep(Duration::from_millis(100)) => {},
};
}
#[test_traced]
fn test_retain_pending_invalid_delivery_race() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
run_pending_invalid_delivery_race(&context, false).await;
run_pending_invalid_delivery_race(&context, true).await;
});
}
#[test_traced]
fn test_retain_drops_fetch() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (oracle, mut schemes, _peers, mut connections) =
setup_network_and_peers(&context, &[1]).await;
let (cons1, mut cons_out1) = consumer();
let prod1 = Producer::default();
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
prod1,
);
let key = Key(3);
mailbox1.fetch(key.clone());
let canceled = key.clone();
mailbox1.retain(move |key, _| key != &canceled);
select! {
_ = cons_out1.recv() => panic!("unexpected event"),
_ = context.sleep(Duration::from_millis(100)) => {},
};
});
}
#[test_traced]
fn test_peer_no_data() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers(&context, &[1, 2, 3]).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
let prod1 = Producer::default();
let prod2 = Producer::default();
let mut prod3 = Producer::default();
let key = Key(3);
prod3.insert(key.clone(), Bytes::from("data for key 3"));
let (cons1, mut cons_out1) = consumer();
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
prod1,
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod2,
);
let scheme = schemes.remove(0);
let _mailbox3 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod3,
);
mailbox1.fetch(key.clone());
let (key_actual, value) = cons_out1.recv().await.unwrap();
assert_eq!(key_actual, key);
assert_eq!(value, Bytes::from("data for key 3"));
});
}
#[test_traced]
fn test_no_peers_available() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (oracle, mut schemes, _peers, mut connections) =
setup_network_and_peers(&context, &[1]).await;
let (cons1, mut cons_out1) = consumer();
let prod1 = Producer::default();
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
prod1,
);
mailbox1.fetch(Key(4));
context.sleep(Duration::from_secs(5)).await;
select! {
_ = cons_out1.recv() => panic!("Fetch should have failed due to no peers"),
_ = context.sleep(Duration::from_millis(100)) => {},
};
});
}
#[test_traced]
fn test_fetch_before_initial_peer_set_waits_for_update() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (network, mut oracle) = Network::new(
context.child("network"),
commonware_p2p::simulated::Config {
max_size: 1024 * 1024,
disconnect_on_block: true,
tracked_peer_sets: NZUsize!(1),
},
);
network.start();
let mut schemes = [1_u64, 2]
.into_iter()
.map(PrivateKey::from_seed)
.collect::<Vec<_>>();
schemes.sort_by_key(|s| s.public_key());
let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
let mut connections = Vec::new();
for peer in &peers {
let (sender, receiver) = oracle
.control(peer.clone())
.register(0, Quota::per_second(RATE_LIMIT))
.await
.unwrap();
connections.push((sender, receiver));
}
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
let key = Key(2);
let mut prod2 = Producer::default();
prod2.insert(key.clone(), Bytes::from("data for key 2"));
let (cons1, mut cons_out1) = consumer();
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod2,
);
mailbox1.fetch(key.clone());
select! {
event = cons_out1.recv() => {
panic!("fetch should wait for the initial peer set, got {event:?}");
},
_ = context.sleep(Duration::from_millis(200)) => {},
};
oracle
.manager()
.track(0, Set::try_from(peers.clone()).unwrap());
let (key_actual, value) = cons_out1.recv().await.unwrap();
assert_eq!(key_actual, key);
assert_eq!(value, Bytes::from("data for key 2"));
});
}
#[test_traced]
fn test_concurrent_fetch_requests() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers(&context, &[1, 2, 3]).await;
let key2 = Key(2);
let key3 = Key(3);
let mut prod2 = Producer::default();
prod2.insert(key2.clone(), Bytes::from("data for key 2"));
let mut prod3 = Producer::default();
prod3.insert(key3.clone(), Bytes::from("data for key 3"));
let (cons1, mut cons_out1) = consumer();
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod2,
);
let scheme = schemes.remove(0);
let _mailbox3 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod3,
);
add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 1).await;
add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 2).await;
for _ in 0..10 {
mailbox1.fetch(key2.clone());
mailbox1.fetch(key3.clone());
let mut events = Vec::new();
events.push(cons_out1.recv().await.expect("Consumer channel closed"));
events.push(cons_out1.recv().await.expect("Consumer channel closed"));
let mut found_key2 = false;
let mut found_key3 = false;
for (key_actual, value) in events {
if key_actual == key2 {
assert_eq!(value, Bytes::from("data for key 2"));
found_key2 = true;
} else if key_actual == key3 {
assert_eq!(value, Bytes::from("data for key 3"));
found_key3 = true;
} else {
panic!("Unexpected key received");
}
}
assert!(found_key2 && found_key3,);
}
});
}
#[test_traced]
fn test_retain_drops_key() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers(&context, &[1, 2]).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
let key = Key(6);
let mut prod2 = Producer::default();
prod2.insert(key.clone(), Bytes::from("data for key 6"));
let (cons1, mut cons_out1) = consumer();
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod2,
);
let canceled = key.clone();
mailbox1.retain(move |key, _| key != &canceled);
select! {
_ = cons_out1.recv() => {
panic!("unexpected event");
},
_ = context.sleep(Duration::from_millis(100)) => {},
};
mailbox1.fetch(key.clone());
let (key_actual, value) = cons_out1.recv().await.unwrap();
assert_eq!(key_actual, key);
assert_eq!(value, Bytes::from("data for key 6"));
let canceled = key.clone();
mailbox1.retain(move |key, _| key != &canceled);
select! {
_ = cons_out1.recv() => {
panic!("unexpected event");
},
_ = context.sleep(Duration::from_millis(100)) => {},
};
let key = Key(7);
mailbox1.fetch(key.clone());
let canceled = key.clone();
mailbox1.retain(move |key, _| key != &canceled);
select! {
_ = cons_out1.recv() => panic!("unexpected event"),
_ = context.sleep(Duration::from_millis(100)) => {},
};
});
}
#[test_traced]
fn test_blocking_peer() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers(&context, &[1, 2, 3]).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
add_link(&mut oracle, LINK.clone(), &peers, 1, 2).await;
let key_a = Key(1);
let key_b = Key(2);
let invalid_data_a = Bytes::from("invalid for A");
let valid_data_a = Bytes::from("valid for A");
let valid_data_b = Bytes::from("valid for B");
let mut prod2 = Producer::default();
prod2.insert(key_a.clone(), invalid_data_a.clone());
prod2.insert(key_b.clone(), valid_data_b.clone());
let mut prod3 = Producer::default();
prod3.insert(key_a.clone(), valid_data_a.clone());
let (mut cons1, mut cons_out1) = consumer();
cons1.add_expected(key_a.clone(), valid_data_a.clone());
cons1.add_expected(key_b.clone(), valid_data_b.clone());
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod2,
);
let scheme = schemes.remove(0);
let _mailbox3 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod3,
);
for _ in 0..20 {
mailbox1.fetch(key_a.clone());
let (key_actual, value) = cons_out1.recv().await.unwrap();
assert_eq!(key_actual, key_a);
assert_eq!(value, valid_data_a);
}
mailbox1.fetch(key_b.clone());
context.sleep(Duration::from_secs(5)).await;
select! {
_ = cons_out1.recv() => panic!("unexpected event"),
_ = context.sleep(Duration::from_millis(100)) => {},
};
let canceled = key_b.clone();
mailbox1.retain(move |key, _| key != &canceled);
let blocked = oracle.blocked().await.unwrap();
assert_eq!(blocked.len(), 1);
assert_eq!(blocked[0].0, peers[0]);
assert_eq!(blocked[0].1, peers[1]);
});
}
#[test_traced]
fn test_duplicate_fetch_key() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers(&context, &[1, 2]).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
let key = Key(5);
let mut prod2 = Producer::default();
prod2.insert(key.clone(), Bytes::from("data for key 5"));
let (cons1, mut cons_out1) = consumer();
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod2,
);
mailbox1.fetch(key.clone());
mailbox1.fetch(key.clone());
let (key_actual, value) = cons_out1.recv().await.unwrap();
assert_eq!(key_actual, key);
assert_eq!(value, Bytes::from("data for key 5"));
select! {
_ = cons_out1.recv() => {
panic!("Unexpected second event received for duplicate fetch");
},
_ = context.sleep(Duration::from_millis(500)) => {
},
};
});
}
#[test_traced]
fn test_changing_peer_sets() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers(&context, &[1, 2, 3]).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
let key1 = Key(1);
let key2 = Key(2);
let mut prod2 = Producer::default();
prod2.insert(key1.clone(), Bytes::from("data from peer 2"));
let mut prod3 = Producer::default();
prod3.insert(key2.clone(), Bytes::from("data from peer 3"));
let (cons1, mut cons_out1) = consumer();
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod2,
);
mailbox1.fetch(key1.clone());
let (key_actual, value) = cons_out1.recv().await.unwrap();
assert_eq!(key_actual, key1);
assert_eq!(value, Bytes::from("data from peer 2"));
let scheme = schemes.remove(0);
let _mailbox3 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod3,
);
context.sleep(Duration::from_millis(200)).await;
mailbox1.fetch(key2.clone());
let (key_actual, value) = cons_out1.recv().await.unwrap();
assert_eq!(key_actual, key2);
assert_eq!(value, Bytes::from("data from peer 3"));
});
}
#[test_traced]
fn test_fetch_targeted() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers(&context, &[1, 2, 3]).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
let key = Key(1);
let invalid_data = Bytes::from("invalid data");
let valid_data = Bytes::from("valid data");
let mut prod2 = Producer::default();
prod2.insert(key.clone(), invalid_data.clone());
let mut prod3 = Producer::default();
prod3.insert(key.clone(), valid_data.clone());
let (mut cons1, mut cons_out1) = consumer();
cons1.add_expected(key.clone(), valid_data.clone());
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod2,
);
let scheme = schemes.remove(0);
let _mailbox3 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod3,
);
context.sleep(Duration::from_millis(100)).await;
mailbox1.fetch_targeted(
key.clone(),
non_empty_vec![peers[1].clone(), peers[2].clone()],
);
let (key_actual, value) = cons_out1.recv().await.unwrap();
assert_eq!(key_actual, key);
assert_eq!(value, valid_data);
let blocked = oracle.blocked().await.unwrap();
assert_eq!(blocked.len(), 1);
assert_eq!(blocked[0].0, peers[0]);
assert_eq!(blocked[0].1, peers[1]);
let metrics = context.encode();
assert_eq!(
status_metric_total(&metrics, "actor_fetch_total", "Success"),
1
);
});
}
#[test_traced]
fn test_fetch_targeted_no_fallback() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers(&context, &[1, 2, 3, 4]).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 3).await;
let key = Key(1);
let mut prod4 = Producer::default();
prod4.insert(key.clone(), Bytes::from("data from peer 4"));
let (cons1, mut cons_out1) = consumer();
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
Producer::default(), );
let scheme = schemes.remove(0);
let _mailbox3 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
Producer::default(), );
let scheme = schemes.remove(0);
let _mailbox4 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod4,
);
context.sleep(Duration::from_millis(100)).await;
mailbox1.fetch_targeted(
key.clone(),
non_empty_vec![peers[1].clone(), peers[2].clone()],
);
select! {
event = cons_out1.recv() => {
panic!("Fetch should not succeed, but got: {event:?}");
},
_ = context.sleep(Duration::from_secs(3)) => {
},
};
});
}
#[test_traced]
fn test_fetch_all_targeted() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers(&context, &[1, 2, 3, 4]).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 3).await;
let key1 = Key(1);
let key2 = Key(2);
let key3 = Key(3);
let mut prod2 = Producer::default();
prod2.insert(key1.clone(), Bytes::from("data for key 1"));
let mut prod3 = Producer::default();
prod3.insert(key3.clone(), Bytes::from("data for key 3"));
let mut prod4 = Producer::default();
prod4.insert(key2.clone(), Bytes::from("data for key 2"));
let (mut cons1, mut cons_out1) = consumer();
cons1.add_expected(key1.clone(), Bytes::from("data for key 1"));
cons1.add_expected(key2.clone(), Bytes::from("data for key 2"));
cons1.add_expected(key3.clone(), Bytes::from("data for key 3"));
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod2,
);
let scheme = schemes.remove(0);
let _mailbox3 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod3,
);
let scheme = schemes.remove(0);
let _mailbox4 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod4,
);
context.sleep(Duration::from_millis(100)).await;
mailbox1.fetch_all_targeted(vec![
(key1.clone(), non_empty_vec![peers[1].clone()]), (key2.clone(), non_empty_vec![peers[3].clone()]), ]);
mailbox1.fetch(key3.clone());
let mut results = HashMap::new();
for _ in 0..3 {
let (key, value) = cons_out1.recv().await.unwrap();
results.insert(key, value);
}
assert_eq!(results.len(), 3);
assert_eq!(results.get(&key1).unwrap(), &Bytes::from("data for key 1"));
assert_eq!(results.get(&key2).unwrap(), &Bytes::from("data for key 2"));
assert_eq!(results.get(&key3).unwrap(), &Bytes::from("data for key 3"));
let metrics = context.encode();
assert_eq!(
status_metric_total(&metrics, "actor_fetch_total", "Success"),
3
);
});
}
#[test_traced]
fn test_fetch_clears_targets() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers(&context, &[1, 2, 3]).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
let key = Key(1);
let valid_data = Bytes::from("valid data");
let mut prod3 = Producer::default();
prod3.insert(key.clone(), valid_data.clone());
let (cons1, mut cons_out1) = consumer();
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
Producer::default(), );
let scheme = schemes.remove(0);
let _mailbox3 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod3,
);
context.sleep(Duration::from_millis(100)).await;
mailbox1.fetch_targeted(key.clone(), non_empty_vec![peers[1].clone()]);
context.sleep(Duration::from_millis(500)).await;
mailbox1.fetch(key.clone());
let (key_actual, value) = cons_out1.recv().await.unwrap();
assert_eq!(key_actual, key);
assert_eq!(value, valid_data);
});
}
#[test_traced]
fn test_fetch_targeted_does_not_restrict_all() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers(&context, &[1, 2, 3]).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
let key = Key(1);
let valid_data = Bytes::from("valid data");
let mut prod3 = Producer::default();
prod3.insert(key.clone(), valid_data.clone());
let (cons1, mut cons_out1) = consumer();
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
Producer::default(), );
let scheme = schemes.remove(0);
let _mailbox3 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod3,
);
context.sleep(Duration::from_millis(100)).await;
mailbox1.fetch(key.clone());
context.sleep(Duration::from_millis(50)).await;
mailbox1.fetch_targeted(key.clone(), non_empty_vec![peers[1].clone()]);
let (key_actual, value) = cons_out1.recv().await.unwrap();
assert_eq!(key_actual, key);
assert_eq!(value, valid_data);
});
}
#[test_traced]
fn test_retain() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers(&context, &[1, 2]).await;
let key = Key(5);
let mut prod2 = Producer::default();
prod2.insert(key.clone(), Bytes::from("data for key 5"));
let (cons1, mut cons_out1) = consumer();
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod2,
);
mailbox1.retain(|_, _| true);
select! {
_ = cons_out1.recv() => {
panic!("unexpected event");
},
_ = context.sleep(Duration::from_millis(100)) => {},
};
mailbox1.fetch(key.clone());
let key_clone = key.clone();
mailbox1.retain(move |key, _| key != &key_clone);
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
mailbox1.fetch(key.clone());
let (key_actual, value) = cons_out1.recv().await.unwrap();
assert_eq!(key_actual, key);
assert_eq!(value, Bytes::from("data for key 5"));
});
}
#[test_traced]
fn test_retain_uses_subscribers() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers(&context, &[1, 2]).await;
let key = Key(5);
let mut prod2 = Producer::default();
prod2.insert(key.clone(), Bytes::from("data for key 5"));
let (cons1, mut cons_out1): (Consumer<Key, Bytes, SubscriberTag>, _) = Consumer::new();
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod2,
);
let dropped_subscriber = SubscriberTag(50);
let kept_subscriber = SubscriberTag(51);
mailbox1.fetch(Fetch {
key: key.clone(),
subscriber: dropped_subscriber,
});
mailbox1.fetch(Fetch {
key: key.clone(),
subscriber: kept_subscriber.clone(),
});
context.sleep(Duration::from_millis(100)).await;
mailbox1.retain(move |_, subscriber| subscriber == &kept_subscriber);
context.sleep(Duration::from_millis(100)).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
let (key_actual, value) = cons_out1.recv().await.unwrap();
assert_eq!(key_actual, key);
assert_eq!(value, Bytes::from("data for key 5"));
});
}
#[test_traced]
fn test_deliver_receives_subscribers() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers(&context, &[1, 2]).await;
let key = Key(5);
let mut prod2 = Producer::default();
prod2.insert(key.clone(), Bytes::from("data for key 5"));
let (cons1, mut cons_out1) = SubscriberRecordingConsumer::new();
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod2,
);
let first_subscriber = SubscriberTag(50);
let second_subscriber = SubscriberTag(51);
mailbox1.fetch(Fetch {
key: key.clone(),
subscriber: second_subscriber.clone(),
});
mailbox1.fetch(Fetch {
key: key.clone(),
subscriber: first_subscriber.clone(),
});
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
let (delivery, value) = cons_out1.recv().await.unwrap();
assert_eq!(
delivery,
Delivery {
key,
subscribers: non_empty_vec![first_subscriber, second_subscriber],
}
);
assert_eq!(value, Bytes::from("data for key 5"));
});
}
#[test_traced]
fn test_deliver_receives_multiple_subscribers() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers(&context, &[1, 2]).await;
let key = Key(5);
let mut prod2 = Producer::default();
prod2.insert(key.clone(), Bytes::from("data for key 5"));
let (cons1, mut cons_out1) = SubscriberRecordingConsumer::new();
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod2,
);
let first_subscriber = SubscriberTag(49);
let second_subscriber = SubscriberTag(50);
mailbox1.fetch(Fetch {
key: key.clone(),
subscriber: first_subscriber.clone(),
});
mailbox1.fetch(Fetch {
key: key.clone(),
subscriber: second_subscriber.clone(),
});
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
let (delivery, value) = cons_out1.recv().await.unwrap();
assert_eq!(
delivery,
Delivery {
key: key.clone(),
subscribers: non_empty_vec![first_subscriber, second_subscriber],
}
);
assert_eq!(value, Bytes::from("data for key 5"));
});
}
#[test_traced]
fn test_fetch_during_validation_reuses_response_after_success() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers(&context, &[1, 2]).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
let key = Key(5);
let first_response = Bytes::from("data for key 5");
let second_response = Bytes::from("refetched data for key 5");
let mut prod2 = SequencedProducer::default();
prod2.insert(
key.clone(),
[first_response.clone(), second_response.clone()],
);
let prod2_observer = prod2.clone();
let (first_gate_sender, first_gate_receiver) = oneshot::channel();
let (second_gate_sender, second_gate_receiver) = oneshot::channel();
let (cons1, mut deliveries, mut started) = BlockingSubscriberRecordingConsumer::new(
context.child("consumer"),
vec![(first_gate_receiver, true), (second_gate_receiver, true)],
);
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor_with_producer(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod2,
);
let first_subscriber = SubscriberTag(49);
let second_subscriber = SubscriberTag(50);
mailbox1.fetch(Fetch {
key: key.clone(),
subscriber: first_subscriber.clone(),
});
let delivery = started.recv().await.expect("delivery did not start");
assert_eq!(
delivery,
Delivery {
key: key.clone(),
subscribers: non_empty_vec![first_subscriber.clone()],
}
);
mailbox1.fetch(Fetch {
key: key.clone(),
subscriber: second_subscriber.clone(),
});
context.sleep(Duration::from_millis(100)).await;
assert_eq!(
prod2_observer.remaining(&key),
vec![second_response.clone()]
);
first_gate_sender.send(()).unwrap();
let (delivery, value) = deliveries.recv().await.expect("consumer channel closed");
assert_eq!(
delivery,
Delivery {
key: key.clone(),
subscribers: non_empty_vec![first_subscriber],
}
);
assert_eq!(value, first_response);
let delivery = select! {
delivery = started.recv() => delivery.expect("second delivery did not start"),
_ = context.sleep(Duration::from_secs(2)) => {
panic!("late subscriber was not delivered");
},
};
assert_eq!(
delivery,
Delivery {
key: key.clone(),
subscribers: non_empty_vec![second_subscriber.clone()],
}
);
second_gate_sender.send(()).unwrap();
let (delivery, value) = deliveries.recv().await.expect("consumer channel closed");
assert_eq!(
delivery,
Delivery {
key: key.clone(),
subscribers: non_empty_vec![second_subscriber],
}
);
assert_eq!(value, first_response);
assert_eq!(prod2_observer.remaining(&key), vec![second_response]);
});
}
#[test_traced]
fn test_late_subscriber_delivery_ignores_unrelated_waiter() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers(&context, &[1, 2, 3]).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
let blocked_key = Key(4);
let waiting_key = Key(5);
let main_key = Key(6);
let data = Bytes::from("data for key 6");
let mut prod2 = Producer::default();
prod2.insert(blocked_key.clone(), Bytes::from("bad data"));
let mut prod3 = Producer::default();
prod3.insert(main_key.clone(), data.clone());
let (first_gate_sender, first_gate_receiver) = oneshot::channel();
let (second_gate_sender, second_gate_receiver) = oneshot::channel();
let (cons1, mut deliveries, mut started) = BlockingSubscriberRecordingConsumer::new(
context.child("consumer"),
vec![(first_gate_receiver, false), (second_gate_receiver, true)],
);
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod2,
);
let scheme = schemes.remove(0);
let _mailbox3 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod3,
);
mailbox1.fetch(Fetch {
key: blocked_key.clone(),
subscriber: SubscriberTag(1),
});
started
.recv()
.await
.expect("blocking delivery did not start");
first_gate_sender.send(()).unwrap();
wait_for_blocked(&context, &oracle, &peers[0], &peers[1]).await;
mailbox1.fetch_targeted(
Fetch {
key: waiting_key,
subscriber: SubscriberTag(2),
},
non_empty_vec![peers[1].clone()],
);
context.sleep(Duration::from_millis(100)).await;
let first_subscriber = SubscriberTag(3);
let second_subscriber = SubscriberTag(4);
mailbox1.fetch(Fetch {
key: main_key.clone(),
subscriber: first_subscriber.clone(),
});
let delivery = started.recv().await.expect("delivery did not start");
assert_eq!(
delivery,
Delivery {
key: main_key.clone(),
subscribers: non_empty_vec![first_subscriber.clone()],
}
);
mailbox1.fetch(Fetch {
key: main_key.clone(),
subscriber: second_subscriber.clone(),
});
context.sleep(Duration::from_millis(100)).await;
second_gate_sender.send(()).unwrap();
let (delivery, value) = deliveries.recv().await.expect("consumer channel closed");
assert_eq!(
delivery,
Delivery {
key: main_key.clone(),
subscribers: non_empty_vec![first_subscriber],
}
);
assert_eq!(value, data);
let delivery = select! {
delivery = started.recv() => delivery.expect("second delivery did not start"),
_ = context.sleep(Duration::from_secs(2)) => {
panic!("late subscriber was not delivered while an unrelated waiter was armed");
},
};
assert_eq!(
delivery,
Delivery {
key: main_key,
subscribers: non_empty_vec![second_subscriber],
}
);
});
}
#[test_traced]
fn test_deliver_receives_distinct_subscriber_type() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers(&context, &[1, 2]).await;
let key = Key(5);
let mut prod2 = Producer::default();
prod2.insert(key.clone(), Bytes::from("data for key 5"));
let (cons1, mut cons_out1) = SubscriberRecordingConsumer::new();
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod2,
);
let subscriber = SubscriberTag(50);
let retained = subscriber.clone();
mailbox1.fetch(Fetch {
key: key.clone(),
subscriber: subscriber.clone(),
});
context.sleep(Duration::from_millis(100)).await;
mailbox1.retain(move |_, subscriber| subscriber == &retained);
context.sleep(Duration::from_millis(100)).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
let (delivery, value) = cons_out1.recv().await.unwrap();
assert_eq!(
delivery,
Delivery {
key,
subscribers: non_empty_vec![subscriber],
}
);
assert_eq!(value, Bytes::from("data for key 5"));
});
}
#[test_traced]
fn test_deliver_receives_single_subscriber() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers(&context, &[1, 2]).await;
let key = Key(5);
let mut prod2 = Producer::default();
prod2.insert(key.clone(), Bytes::from("data for key 5"));
let (cons1, mut cons_out1) = SubscriberRecordingConsumer::new();
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod2,
);
let subscriber = SubscriberTag(50);
mailbox1.fetch(Fetch {
key: key.clone(),
subscriber: subscriber.clone(),
});
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
let (delivery, value) = cons_out1.recv().await.unwrap();
assert_eq!(
delivery,
Delivery {
key,
subscribers: non_empty_vec![subscriber],
}
);
assert_eq!(value, Bytes::from("data for key 5"));
});
}
#[test_traced]
fn test_retain_drops_all() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers(&context, &[1, 2]).await;
let key = Key(6);
let mut prod2 = Producer::default();
prod2.insert(key.clone(), Bytes::from("data for key 6"));
let (cons1, mut cons_out1) = consumer();
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod2,
);
mailbox1.retain(|_, _| false);
select! {
_ = cons_out1.recv() => {
panic!("unexpected event");
},
_ = context.sleep(Duration::from_millis(100)) => {},
};
mailbox1.fetch(key.clone());
mailbox1.retain(|_, _| false);
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
mailbox1.fetch(key.clone());
let (key_actual, value) = cons_out1.recv().await.unwrap();
assert_eq!(key_actual, key);
assert_eq!(value, Bytes::from("data for key 6"));
});
}
#[test_traced]
fn test_rate_limit_spillover() {
let executor = deterministic::Runner::timed(Duration::from_secs(30));
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers_with_rate_limit(
&context,
&[1, 2, 3],
Quota::per_second(NZU32!(1)),
)
.await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
let mut prod2 = Producer::default();
let mut prod3 = Producer::default();
prod2.insert(Key(0), Bytes::from("data for key 0"));
prod2.insert(Key(1), Bytes::from("data for key 1"));
prod3.insert(Key(0), Bytes::from("data for key 0"));
prod3.insert(Key(1), Bytes::from("data for key 1"));
let (cons1, mut cons_out1) = consumer();
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod2,
);
let scheme = schemes.remove(0);
let _mailbox3 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod3,
);
context.sleep(Duration::from_millis(100)).await;
let start = context.current();
mailbox1.fetch(Key(0));
mailbox1.fetch(Key(1));
let mut results = HashMap::new();
for _ in 0..2 {
let (key, value) = cons_out1.recv().await.unwrap();
results.insert(key.clone(), value);
}
assert_eq!(results.len(), 2);
assert_eq!(
results.get(&Key(0)).unwrap(),
&Bytes::from("data for key 0")
);
assert_eq!(
results.get(&Key(1)).unwrap(),
&Bytes::from("data for key 1")
);
let elapsed = context.current().duration_since(start).unwrap();
assert!(
elapsed < Duration::from_millis(500),
"Expected quick completion via spill-over, but took {elapsed:?}"
);
});
}
#[test_traced]
fn test_rate_limit_retry_after_reset() {
let executor = deterministic::Runner::timed(Duration::from_secs(30));
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers_with_rate_limit(
&context,
&[1, 2],
Quota::per_second(NZU32!(1)),
)
.await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
let mut prod2 = Producer::default();
prod2.insert(Key(1), Bytes::from("data for key 1"));
prod2.insert(Key(2), Bytes::from("data for key 2"));
prod2.insert(Key(3), Bytes::from("data for key 3"));
let (cons1, mut cons_out1) = consumer();
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod2,
);
context.sleep(Duration::from_millis(100)).await;
let start = context.current();
mailbox1.fetch(Key(1));
mailbox1.fetch(Key(2));
mailbox1.fetch(Key(3));
let mut results = HashMap::new();
for _ in 0..3 {
let (key, value) = cons_out1.recv().await.unwrap();
results.insert(key.clone(), value);
}
assert_eq!(results.len(), 3);
for i in 1..=3 {
assert_eq!(
results.get(&Key(i)).unwrap(),
&Bytes::from(format!("data for key {}", i))
);
}
let elapsed = context.current().duration_since(start).unwrap();
assert!(
elapsed > Duration::from_secs(2),
"Expected rate limiting to cause delay > 2s, but took {elapsed:?}"
);
});
}
#[test_traced]
fn test_self_exclusion() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers(&context, &[1, 2]).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
let key = Key(1);
let data = Bytes::from("shared data");
let mut prod1 = Producer::default();
prod1.insert(key.clone(), data.clone());
let mut prod2 = Producer::default();
prod2.insert(key.clone(), data.clone());
let (cons1, mut cons_out1) = consumer();
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
prod1, );
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod2,
);
context.sleep(Duration::from_millis(100)).await;
mailbox1.fetch(key.clone());
let (key_actual, value) = cons_out1.recv().await.unwrap();
assert_eq!(key_actual, key);
assert_eq!(value, data);
});
}
#[test_traced]
fn test_fetch_uses_primary_peers_only() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (network, oracle) = Network::new(
context.child("network"),
commonware_p2p::simulated::Config {
max_size: 1024 * 1024,
disconnect_on_block: true,
tracked_peer_sets: NZUsize!(1),
},
);
network.start();
let schemes: Vec<PrivateKey> = [1u64, 2, 3]
.into_iter()
.map(PrivateKey::from_seed)
.collect();
let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
let mut schemes = schemes;
let mut connections = Vec::new();
for peer in &peers {
let (sender, receiver) = oracle
.control(peer.clone())
.register(0, Quota::per_second(RATE_LIMIT))
.await
.unwrap();
connections.push((sender, receiver));
}
let mut oracle = oracle;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
oracle.manager().track(
1,
TrackedPeers::new(
Set::try_from([peers[1].clone()]).unwrap(),
Set::try_from([peers[2].clone()]).unwrap(),
),
);
context.sleep(Duration::from_millis(100)).await;
let key = Key(1);
let data = Bytes::from("secondary only data");
let (cons1, mut cons_out1) = consumer();
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
Producer::default(),
);
let mut prod3 = Producer::default();
prod3.insert(key.clone(), data);
let scheme = schemes.remove(0);
let _mailbox3 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod3,
);
mailbox1.fetch(key.clone());
select! {
event = cons_out1.recv() => {
panic!("fetch should not succeed from a secondary peer, got: {event:?}");
},
_ = context.sleep(Duration::from_secs(2)) => {},
}
});
}
#[test_traced]
fn test_fetch_uses_latest_primary_set_only() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (network, oracle) = Network::new(
context.child("network"),
commonware_p2p::simulated::Config {
max_size: 1024 * 1024,
disconnect_on_block: true,
tracked_peer_sets: NZUsize!(2),
},
);
network.start();
let schemes: Vec<PrivateKey> = [1u64, 2, 3]
.into_iter()
.map(PrivateKey::from_seed)
.collect();
let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
let mut schemes = schemes;
let mut connections = Vec::new();
for peer in &peers {
let (sender, receiver) = oracle
.control(peer.clone())
.register(0, Quota::per_second(RATE_LIMIT))
.await
.unwrap();
connections.push((sender, receiver));
}
let mut oracle = oracle;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
oracle
.manager()
.track(
0,
Set::try_from([peers[0].clone(), peers[1].clone()]).unwrap(),
);
context.sleep(Duration::from_millis(100)).await;
let key = Key(7);
let targeted_key = Key(8);
let data = Bytes::from("old primary data");
let (cons1, mut cons_out1) = consumer();
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let mut prod2 = Producer::default();
prod2.insert(key.clone(), data.clone());
prod2.insert(targeted_key.clone(), data);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod2,
);
let scheme = schemes.remove(0);
let _mailbox3 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
Producer::default(),
);
context.sleep(Duration::from_millis(100)).await;
oracle
.manager()
.track(
1,
Set::try_from([peers[0].clone(), peers[2].clone()]).unwrap(),
);
context.sleep(Duration::from_millis(100)).await;
mailbox1.fetch(key);
select! {
event = cons_out1.recv() => {
panic!(
"fetch should not succeed from an old primary retained only in the overlap window, got: {event:?}"
);
},
_ = context.sleep(Duration::from_secs(1)) => {},
}
mailbox1
.fetch_targeted(targeted_key, non_empty_vec![peers[1].clone()]);
select! {
event = cons_out1.recv() => {
panic!(
"targeted fetch should not bypass the latest-primary filter, got: {event:?}"
);
},
_ = context.sleep(Duration::from_secs(1)) => {},
}
});
}
#[test_traced]
fn test_fetch_after_cutover_relies_on_latest_primary_history() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (network, oracle) = Network::new(
context.child("network"),
commonware_p2p::simulated::Config {
max_size: 1024 * 1024,
disconnect_on_block: true,
tracked_peer_sets: NZUsize!(2),
},
);
network.start();
let schemes: Vec<PrivateKey> = [1u64, 2, 3]
.into_iter()
.map(PrivateKey::from_seed)
.collect();
let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
let mut schemes = schemes;
let mut connections = Vec::new();
for peer in &peers {
let (sender, receiver) = oracle
.control(peer.clone())
.register(0, Quota::per_second(RATE_LIMIT))
.await
.unwrap();
connections.push((sender, receiver));
}
let mut oracle = oracle;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
oracle.manager().track(
0,
Set::try_from([peers[0].clone(), peers[1].clone()]).unwrap(),
);
context.sleep(Duration::from_millis(100)).await;
let key = Key(9);
let invalid_history = Bytes::from("stale overlap history");
let valid_history = Bytes::from("latest primary history");
let (mut cons1, mut cons_out1) = consumer();
cons1.add_expected(key.clone(), valid_history.clone());
let scheme = schemes.remove(0);
let mut mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons1,
Producer::default(),
);
let mut prod2 = Producer::default();
prod2.insert(key.clone(), invalid_history);
let scheme = schemes.remove(0);
let _mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod2,
);
let mut prod3 = Producer::default();
prod3.insert(key.clone(), valid_history.clone());
let scheme = schemes.remove(0);
let _mailbox3 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod3,
);
context.sleep(Duration::from_millis(100)).await;
oracle.manager().track(
1,
Set::try_from([peers[0].clone(), peers[2].clone()]).unwrap(),
);
context.sleep(Duration::from_millis(100)).await;
mailbox1.fetch(key.clone());
let (key_actual, value) = cons_out1.recv().await.unwrap();
assert_eq!(key_actual, key);
assert_eq!(value, valid_history);
assert!(
oracle.blocked().await.unwrap().is_empty(),
"overlap-only peers should not be queried for post-cutover history"
);
});
}
#[test_traced]
fn test_secondary_peer_requests_are_served() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers(&context, &[1, 2]).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
oracle.manager().track(
1,
TrackedPeers::new(
Set::try_from([peers[0].clone()]).unwrap(),
Set::try_from([peers[1].clone()]).unwrap(),
),
);
context.sleep(Duration::from_millis(100)).await;
let key = Key(9);
let data = Bytes::from("served to secondary");
let mut prod1 = Producer::default();
prod1.insert(key.clone(), data.clone());
let scheme = schemes.remove(0);
let _mailbox1 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
dummy_consumer(),
prod1,
);
let (mut cons2, mut cons_out2) = consumer();
cons2.add_expected(key.clone(), data.clone());
let scheme = schemes.remove(0);
let mut mailbox2 = setup_and_spawn_actor(
&context,
oracle.manager(),
oracle.control(scheme.public_key()),
scheme,
connections.remove(0),
cons2,
Producer::default(),
);
mailbox2.fetch_targeted(key.clone(), non_empty_vec![peers[0].clone()]);
let (key_actual, value) = cons_out2.recv().await.unwrap();
assert_eq!(key_actual, key);
assert_eq!(value, data);
});
}
#[test_traced]
fn test_shutdown_aborts_pending_delivery_without_leaked_tasks() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (mut oracle, mut schemes, peers, mut connections) =
setup_network_and_peers(&context, &[1, 2]).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
let key = Key(1);
let data = Bytes::from("data for key 1");
let mut prod2 = Producer::default();
prod2.insert(key.clone(), data);
let (mut gate_sender, gate_receiver) = oneshot::channel();
let (cons1, mut cons_out1, mut started) =
BlockingConsumer::new(context.child("consumer"), vec![(gate_receiver, true)]);
let actor_context = context.child("actor");
let scheme = schemes.remove(0);
let public_key = scheme.public_key();
let (engine, mut mailbox1): (_, Mailbox<Key, PublicKey>) = Engine::new(
actor_context.child("peer").with_attribute("index", 0),
Config {
peer_provider: oracle.manager(),
blocker: oracle.control(public_key.clone()),
consumer: cons1,
producer: Producer::<Key, Bytes>::default(),
mailbox_size: MAILBOX_SIZE,
me: Some(public_key),
initial: INITIAL_DURATION,
timeout: TIMEOUT,
fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
priority_requests: false,
priority_responses: false,
},
);
let handle1 = engine.start(connections.remove(0));
let scheme = schemes.remove(0);
let public_key = scheme.public_key();
let (engine, _mailbox2): (_, Mailbox<Key, PublicKey>) = Engine::new(
actor_context.child("peer").with_attribute("index", 1),
Config {
peer_provider: oracle.manager(),
blocker: oracle.control(public_key.clone()),
consumer: dummy_consumer(),
producer: prod2,
mailbox_size: MAILBOX_SIZE,
me: Some(public_key),
initial: INITIAL_DURATION,
timeout: TIMEOUT,
fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
priority_requests: false,
priority_responses: false,
},
);
let handle2 = engine.start(connections.remove(0));
mailbox1.fetch(key.clone());
let started_key = started.recv().await.expect("delivery did not start");
assert_eq!(started_key, key);
assert!(count_running_tasks(&context, "actor") > 0);
handle1.abort();
handle2.abort();
context.sleep(Duration::from_millis(100)).await;
select! {
_ = gate_sender.closed() => {},
_ = context.sleep(Duration::from_secs(2)) => {
panic!("pending delivery was not aborted");
},
};
select! {
event = cons_out1.recv() => assert!(event.is_none(), "unexpected event"),
_ = context.sleep(Duration::from_millis(100)) => {},
};
let running_after = count_running_tasks(&context, "actor");
assert_eq!(
running_after, 0,
"all actor tasks should be stopped, but {running_after} still running"
);
});
}
#[allow(clippy::type_complexity)]
fn spawn_actors_with_handles(
context: &deterministic::Context,
oracle: &Oracle<PublicKey, deterministic::Context>,
schemes: Vec<PrivateKey>,
connections: Vec<(
Sender<PublicKey, deterministic::Context>,
Receiver<PublicKey>,
)>,
consumers: Vec<Consumer<Key, Bytes>>,
producers: Vec<Producer<Key, Bytes>>,
) -> (
Vec<Mailbox<Key, PublicKey>>,
Vec<commonware_runtime::Handle<()>>,
) {
let actor_context = context.child("actor");
let mut mailboxes = Vec::new();
let mut handles = Vec::new();
for (idx, ((scheme, conn), (consumer, producer))) in schemes
.into_iter()
.zip(connections)
.zip(consumers.into_iter().zip(producers))
.enumerate()
{
let ctx = actor_context.child("peer").with_attribute("index", idx);
let public_key = scheme.public_key();
let (engine, mailbox) = Engine::new(
ctx,
Config {
peer_provider: oracle.manager(),
blocker: oracle.control(public_key.clone()),
consumer,
producer,
mailbox_size: MAILBOX_SIZE,
me: Some(public_key),
initial: INITIAL_DURATION,
timeout: TIMEOUT,
fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
priority_requests: false,
priority_responses: false,
},
);
handles.push(engine.start(conn));
mailboxes.push(mailbox);
}
(mailboxes, handles)
}
#[test_traced]
fn test_operations_after_shutdown_do_not_panic() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let (mut oracle, schemes, peers, connections) =
setup_network_and_peers(&context, &[1, 2]).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
let key = Key(1);
let mut prod2 = Producer::default();
prod2.insert(key.clone(), Bytes::from("data for key 1"));
let (cons1, mut cons_out1) = consumer();
let (mut mailboxes, handles) = spawn_actors_with_handles(
&context,
&oracle,
schemes,
connections,
vec![cons1, dummy_consumer()],
vec![Producer::default(), prod2],
);
mailboxes[0].fetch(key.clone());
let (_, value) = cons_out1.recv().await.unwrap();
assert_eq!(value, Bytes::from("data for key 1"));
for handle in handles {
handle.abort();
}
context.sleep(Duration::from_millis(100)).await;
let key2 = Key(2);
mailboxes[0].fetch(key2.clone());
let canceled = key2;
mailboxes[0].retain(move |key, _| key != &canceled);
mailboxes[0].retain(|_, _| true);
mailboxes[0].fetch_targeted(Key(3), non_empty_vec![peers[1].clone()]);
});
}
fn clean_shutdown(seed: u64) {
let cfg = deterministic::Config::default()
.with_seed(seed)
.with_timeout(Some(Duration::from_secs(30)));
let executor = deterministic::Runner::new(cfg);
executor.start(|context| async move {
let (mut oracle, schemes, peers, connections) =
setup_network_and_peers(&context, &[1, 2]).await;
add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
let key = Key(1);
let mut prod2 = Producer::default();
prod2.insert(key.clone(), Bytes::from("data for key 1"));
let (cons1, mut cons_out1) = consumer();
let (mut mailboxes, handles) = spawn_actors_with_handles(
&context,
&oracle,
schemes,
connections,
vec![cons1, dummy_consumer()],
vec![Producer::default(), prod2],
);
context.sleep(Duration::from_millis(100)).await;
let running_before = count_running_tasks(&context, "actor");
assert!(
running_before > 0,
"at least one actor task should be running"
);
mailboxes[0].fetch(key.clone());
let (_, value) = cons_out1.recv().await.unwrap();
assert_eq!(value, Bytes::from("data for key 1"));
for handle in handles {
handle.abort();
}
context.sleep(Duration::from_millis(100)).await;
let running_after = count_running_tasks(&context, "actor");
assert_eq!(
running_after, 0,
"all actor tasks should be stopped, but {running_after} still running"
);
});
}
#[test]
fn test_clean_shutdown() {
for seed in 0..25 {
clean_shutdown(seed);
}
}
}