use crate::{
MAX_FETCH_TIMEOUT_IN_MS,
MAX_WORKERS,
ProposedBatch,
Transport,
events::{Event, TransmissionRequest, TransmissionResponse},
helpers::{Pending, Ready, Storage, WorkerReceiver, fmt_id, max_redundant_requests},
spawn_blocking,
};
use snarkos_node_bft_ledger_service::LedgerService;
use snarkvm::{
console::prelude::*,
ledger::{
block::Transaction,
narwhal::{BatchHeader, Data, Transmission, TransmissionID},
puzzle::{Solution, SolutionID},
},
};
use colored::Colorize;
use indexmap::{IndexMap, IndexSet};
use parking_lot::Mutex;
use rand::seq::IteratorRandom;
use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration};
use tokio::{sync::oneshot, task::JoinHandle, time::timeout};
#[derive(Clone)]
pub struct Worker<N: Network> {
id: u8,
gateway: Arc<dyn Transport<N>>,
storage: Storage<N>,
ledger: Arc<dyn LedgerService<N>>,
proposed_batch: Arc<ProposedBatch<N>>,
ready: Ready<N>,
pending: Arc<Pending<TransmissionID<N>, Transmission<N>>>,
handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
}
impl<N: Network> Worker<N> {
pub fn new(
id: u8,
gateway: Arc<dyn Transport<N>>,
storage: Storage<N>,
ledger: Arc<dyn LedgerService<N>>,
proposed_batch: Arc<ProposedBatch<N>>,
) -> Result<Self> {
ensure!(id < MAX_WORKERS, "Invalid worker ID '{id}'");
Ok(Self {
id,
gateway,
storage,
ledger,
proposed_batch,
ready: Default::default(),
pending: Default::default(),
handles: Default::default(),
})
}
pub fn run(&self, receiver: WorkerReceiver<N>) {
info!("Starting worker instance {} of the memory pool...", self.id);
self.start_handlers(receiver);
}
pub const fn id(&self) -> u8 {
self.id
}
pub fn pending(&self) -> &Arc<Pending<TransmissionID<N>, Transmission<N>>> {
&self.pending
}
}
impl<N: Network> Worker<N> {
pub const MAX_TRANSMISSIONS_PER_WORKER: usize =
BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / MAX_WORKERS as usize;
pub const MAX_TRANSMISSIONS_PER_WORKER_PING: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / 10;
pub fn num_transmissions(&self) -> usize {
self.ready.num_transmissions()
}
pub fn num_ratifications(&self) -> usize {
self.ready.num_ratifications()
}
pub fn num_solutions(&self) -> usize {
self.ready.num_solutions()
}
pub fn num_transactions(&self) -> usize {
self.ready.num_transactions()
}
}
impl<N: Network> Worker<N> {
pub fn transmission_ids(&self) -> IndexSet<TransmissionID<N>> {
self.ready.transmission_ids()
}
pub fn transmissions(&self) -> IndexMap<TransmissionID<N>, Transmission<N>> {
self.ready.transmissions()
}
pub fn solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
self.ready.solutions()
}
pub fn transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
self.ready.transactions()
}
}
impl<N: Network> Worker<N> {
pub(super) fn clear_solutions(&self) {
self.ready.clear_solutions()
}
}
impl<N: Network> Worker<N> {
pub fn contains_transmission(&self, transmission_id: impl Into<TransmissionID<N>>) -> bool {
let transmission_id = transmission_id.into();
self.ready.contains(transmission_id)
|| self.proposed_batch.read().as_ref().map_or(false, |p| p.contains_transmission(transmission_id))
|| self.storage.contains_transmission(transmission_id)
|| self.ledger.contains_transmission(&transmission_id).unwrap_or(false)
}
pub fn get_transmission(&self, transmission_id: TransmissionID<N>) -> Option<Transmission<N>> {
if let Some(transmission) = self.ready.get(transmission_id) {
return Some(transmission);
}
if let Some(transmission) = self.storage.get_transmission(transmission_id) {
return Some(transmission);
}
if let Some(transmission) =
self.proposed_batch.read().as_ref().and_then(|p| p.get_transmission(transmission_id))
{
return Some(transmission.clone());
}
None
}
pub async fn get_or_fetch_transmission(
&self,
peer_ip: SocketAddr,
transmission_id: TransmissionID<N>,
) -> Result<(TransmissionID<N>, Transmission<N>)> {
if let Some(transmission) = self.get_transmission(transmission_id) {
return Ok((transmission_id, transmission));
}
let (candidate_id, transmission) = self.send_transmission_request(peer_ip, transmission_id).await?;
ensure!(candidate_id == transmission_id, "Invalid transmission ID");
Ok((transmission_id, transmission))
}
pub(crate) fn drain(&self, num_transmissions: usize) -> impl Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
self.ready.drain(num_transmissions).into_iter()
}
pub(crate) fn reinsert(&self, transmission_id: TransmissionID<N>, transmission: Transmission<N>) -> bool {
if !self.contains_transmission(transmission_id) {
return self.ready.insert(transmission_id, transmission);
}
false
}
pub(crate) fn broadcast_ping(&self) {
let transmission_ids = self
.ready
.transmission_ids()
.into_iter()
.choose_multiple(&mut rand::thread_rng(), Self::MAX_TRANSMISSIONS_PER_WORKER_PING)
.into_iter()
.collect::<IndexSet<_>>();
if !transmission_ids.is_empty() {
self.gateway.broadcast(Event::WorkerPing(transmission_ids.into()));
}
}
}
impl<N: Network> Worker<N> {
fn process_transmission_id_from_ping(&self, peer_ip: SocketAddr, transmission_id: TransmissionID<N>) {
if self.contains_transmission(transmission_id) {
return;
}
if self.ready.num_transmissions() > Self::MAX_TRANSMISSIONS_PER_WORKER {
return;
}
let self_ = self.clone();
tokio::spawn(async move {
match self_.send_transmission_request(peer_ip, transmission_id).await {
Ok((candidate_id, transmission)) => {
if candidate_id == transmission_id {
self_.process_transmission_from_peer(peer_ip, transmission_id, transmission);
}
}
Err(e) => {
warn!(
"Worker {} - Failed to fetch transmission '{}.{}' from '{peer_ip}' (ping) - {e}",
self_.id,
fmt_id(transmission_id),
fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
);
}
}
});
}
pub(crate) fn process_transmission_from_peer(
&self,
peer_ip: SocketAddr,
transmission_id: TransmissionID<N>,
transmission: Transmission<N>,
) {
if self.contains_transmission(transmission_id) {
return;
}
let is_well_formed = match (&transmission_id, &transmission) {
(TransmissionID::Solution(_, _), Transmission::Solution(_)) => true,
(TransmissionID::Transaction(_, _), Transmission::Transaction(_)) => true,
(TransmissionID::Ratification, Transmission::Ratification) => false,
_ => false,
};
if is_well_formed && self.ready.insert(transmission_id, transmission) {
trace!(
"Worker {} - Added transmission '{}.{}' from '{peer_ip}'",
self.id,
fmt_id(transmission_id),
fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
);
}
}
pub(crate) async fn process_unconfirmed_solution(
&self,
solution_id: SolutionID<N>,
solution: Data<Solution<N>>,
) -> Result<()> {
let transmission = Transmission::Solution(solution.clone());
let checksum = solution.to_checksum::<N>()?;
let transmission_id = TransmissionID::Solution(solution_id, checksum);
self.pending.remove(transmission_id, Some(transmission.clone()));
if self.contains_transmission(transmission_id) {
bail!("Solution '{}.{}' already exists.", fmt_id(solution_id), fmt_id(checksum).dimmed());
}
self.ledger.check_solution_basic(solution_id, solution).await?;
if self.ready.insert(transmission_id, transmission) {
trace!(
"Worker {} - Added unconfirmed solution '{}.{}'",
self.id,
fmt_id(solution_id),
fmt_id(checksum).dimmed()
);
}
Ok(())
}
pub(crate) async fn process_unconfirmed_transaction(
&self,
transaction_id: N::TransactionID,
transaction: Data<Transaction<N>>,
) -> Result<()> {
let transmission = Transmission::Transaction(transaction.clone());
let checksum = transaction.to_checksum::<N>()?;
let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
self.pending.remove(transmission_id, Some(transmission.clone()));
if self.contains_transmission(transmission_id) {
bail!("Transaction '{}.{}' already exists.", fmt_id(transaction_id), fmt_id(checksum).dimmed());
}
self.ledger.check_transaction_basic(transaction_id, transaction).await?;
if self.ready.insert(transmission_id, transmission) {
trace!(
"Worker {}.{} - Added unconfirmed transaction '{}'",
self.id,
fmt_id(transaction_id),
fmt_id(checksum).dimmed()
);
}
Ok(())
}
}
impl<N: Network> Worker<N> {
fn start_handlers(&self, receiver: WorkerReceiver<N>) {
let WorkerReceiver { mut rx_worker_ping, mut rx_transmission_request, mut rx_transmission_response } = receiver;
let self_ = self.clone();
self.spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;
let self__ = self_.clone();
let _ = spawn_blocking!({
self__.pending.clear_expired_callbacks();
Ok(())
});
}
});
let self_ = self.clone();
self.spawn(async move {
while let Some((peer_ip, transmission_id)) = rx_worker_ping.recv().await {
self_.process_transmission_id_from_ping(peer_ip, transmission_id);
}
});
let self_ = self.clone();
self.spawn(async move {
while let Some((peer_ip, transmission_request)) = rx_transmission_request.recv().await {
self_.send_transmission_response(peer_ip, transmission_request);
}
});
let self_ = self.clone();
self.spawn(async move {
while let Some((peer_ip, transmission_response)) = rx_transmission_response.recv().await {
let self__ = self_.clone();
let _ = spawn_blocking!({
self__.finish_transmission_request(peer_ip, transmission_response);
Ok(())
});
}
});
}
async fn send_transmission_request(
&self,
peer_ip: SocketAddr,
transmission_id: TransmissionID<N>,
) -> Result<(TransmissionID<N>, Transmission<N>)> {
let (callback_sender, callback_receiver) = oneshot::channel();
let num_sent_requests = self.pending.num_sent_requests(transmission_id);
let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(transmission_id, peer_ip);
let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round());
let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;
self.pending.insert(transmission_id, peer_ip, Some((callback_sender, should_send_request)));
if should_send_request {
if self.gateway.send(peer_ip, Event::TransmissionRequest(transmission_id.into())).await.is_none() {
bail!("Unable to fetch transmission - failed to send request")
}
} else {
debug!(
"Skipped sending request for transmission {}.{} to '{peer_ip}' ({num_sent_requests} redundant requests)",
fmt_id(transmission_id),
fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
);
}
match timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver).await {
Ok(result) => Ok((transmission_id, result?)),
Err(e) => bail!("Unable to fetch transmission - (timeout) {e}"),
}
}
fn finish_transmission_request(&self, peer_ip: SocketAddr, response: TransmissionResponse<N>) {
let TransmissionResponse { transmission_id, mut transmission } = response;
let exists = self.pending.get_peers(transmission_id).unwrap_or_default().contains(&peer_ip);
if exists {
match self.ledger.ensure_transmission_is_well_formed(transmission_id, &mut transmission) {
Ok(()) => {
self.pending.remove(transmission_id, Some(transmission));
}
Err(err) => warn!("Failed to finish transmission response from peer '{peer_ip}': {err}"),
};
}
}
fn send_transmission_response(&self, peer_ip: SocketAddr, request: TransmissionRequest<N>) {
let TransmissionRequest { transmission_id } = request;
if let Some(transmission) = self.get_transmission(transmission_id) {
let self_ = self.clone();
tokio::spawn(async move {
self_.gateway.send(peer_ip, Event::TransmissionResponse((transmission_id, transmission).into())).await;
});
}
}
fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
self.handles.lock().push(tokio::spawn(future));
}
pub(crate) fn shut_down(&self) {
trace!("Shutting down worker {}...", self.id);
self.handles.lock().iter().for_each(|handle| handle.abort());
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::helpers::CALLBACK_EXPIRATION_IN_SECS;
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_bft_storage_service::BFTMemoryService;
use snarkvm::{
console::{network::Network, types::Field},
ledger::{
block::Block,
committee::Committee,
narwhal::{BatchCertificate, Subdag, Transmission, TransmissionID},
},
prelude::Address,
};
use bytes::Bytes;
use indexmap::IndexMap;
use mockall::mock;
use std::{io, ops::Range};
type CurrentNetwork = snarkvm::prelude::MainnetV0;
const ITERATIONS: usize = 100;
mock! {
Gateway<N: Network> {}
#[async_trait]
impl<N:Network> Transport<N> for Gateway<N> {
fn broadcast(&self, event: Event<N>);
async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>>;
}
}
mock! {
#[derive(Debug)]
Ledger<N: Network> {}
#[async_trait]
impl<N: Network> LedgerService<N> for Ledger<N> {
fn latest_round(&self) -> u64;
fn latest_block_height(&self) -> u32;
fn latest_block(&self) -> Block<N>;
fn latest_restrictions_id(&self) -> Field<N>;
fn latest_leader(&self) -> Option<(u64, Address<N>)>;
fn update_latest_leader(&self, round: u64, leader: Address<N>);
fn contains_block_height(&self, height: u32) -> bool;
fn get_block_height(&self, hash: &N::BlockHash) -> Result<u32>;
fn get_block_hash(&self, height: u32) -> Result<N::BlockHash>;
fn get_block_round(&self, height: u32) -> Result<u64>;
fn get_block(&self, height: u32) -> Result<Block<N>>;
fn get_blocks(&self, heights: Range<u32>) -> Result<Vec<Block<N>>>;
fn get_solution(&self, solution_id: &SolutionID<N>) -> Result<Solution<N>>;
fn get_unconfirmed_transaction(&self, transaction_id: N::TransactionID) -> Result<Transaction<N>>;
fn get_batch_certificate(&self, certificate_id: &Field<N>) -> Result<BatchCertificate<N>>;
fn current_committee(&self) -> Result<Committee<N>>;
fn get_committee_for_round(&self, round: u64) -> Result<Committee<N>>;
fn get_committee_lookback_for_round(&self, round: u64) -> Result<Committee<N>>;
fn contains_certificate(&self, certificate_id: &Field<N>) -> Result<bool>;
fn contains_transmission(&self, transmission_id: &TransmissionID<N>) -> Result<bool>;
fn ensure_transmission_is_well_formed(
&self,
transmission_id: TransmissionID<N>,
transmission: &mut Transmission<N>,
) -> Result<()>;
async fn check_solution_basic(
&self,
solution_id: SolutionID<N>,
solution: Data<Solution<N>>,
) -> Result<()>;
async fn check_transaction_basic(
&self,
transaction_id: N::TransactionID,
transaction: Data<Transaction<N>>,
) -> Result<()>;
fn check_next_block(&self, block: &Block<N>) -> Result<()>;
fn prepare_advance_to_next_quorum_block(
&self,
subdag: Subdag<N>,
transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
) -> Result<Block<N>>;
fn advance_to_next_block(&self, block: &Block<N>) -> Result<()>;
}
}
#[tokio::test]
async fn test_max_redundant_requests() {
const NUM_NODES: u16 = Committee::<CurrentNetwork>::MAX_COMMITTEE_SIZE;
let rng = &mut TestRng::default();
let committee =
snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_size(0, NUM_NODES, rng);
let committee_clone = committee.clone();
let mut mock_ledger = MockLedger::default();
mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
assert_eq!(max_redundant_requests(ledger, 0), 34, "Update me if the formula changes");
}
#[tokio::test]
async fn test_process_transmission() {
let rng = &mut TestRng::default();
let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
let committee_clone = committee.clone();
let gateway = MockGateway::default();
let mut mock_ledger = MockLedger::default();
mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
let data = |rng: &mut TestRng| Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>()));
let transmission_id = TransmissionID::Solution(
rng.gen::<u64>().into(),
rng.gen::<<CurrentNetwork as Network>::TransmissionChecksum>(),
);
let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
let transmission = Transmission::Solution(data(rng));
worker.process_transmission_from_peer(peer_ip, transmission_id, transmission.clone());
assert!(worker.contains_transmission(transmission_id));
assert!(worker.ready.contains(transmission_id));
assert_eq!(worker.get_transmission(transmission_id), Some(transmission));
let transmission: Vec<_> = worker.drain(1).collect();
assert_eq!(transmission.len(), 1);
assert!(!worker.ready.contains(transmission_id));
}
#[tokio::test]
async fn test_send_transmission() {
let rng = &mut TestRng::default();
let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
let committee_clone = committee.clone();
let mut gateway = MockGateway::default();
gateway.expect_send().returning(|_, _| {
let (_tx, rx) = oneshot::channel();
Some(rx)
});
let mut mock_ledger = MockLedger::default();
mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
mock_ledger.expect_ensure_transmission_is_well_formed().returning(|_, _| Ok(()));
let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
let transmission_id = TransmissionID::Solution(
rng.gen::<u64>().into(),
rng.gen::<<CurrentNetwork as Network>::TransmissionChecksum>(),
);
let worker_ = worker.clone();
let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
assert!(worker.pending.contains(transmission_id));
let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
worker.finish_transmission_request(peer_ip, TransmissionResponse {
transmission_id,
transmission: Transmission::Solution(Data::Buffer(Bytes::from(vec![0; 512]))),
});
assert!(!worker.pending.contains(transmission_id));
}
#[tokio::test]
async fn test_process_solution_ok() {
let rng = &mut TestRng::default();
let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
let committee_clone = committee.clone();
let mut gateway = MockGateway::default();
gateway.expect_send().returning(|_, _| {
let (_tx, rx) = oneshot::channel();
Some(rx)
});
let mut mock_ledger = MockLedger::default();
mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
let solution = Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>()));
let solution_id = rng.gen::<u64>().into();
let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
let transmission_id = TransmissionID::Solution(solution_id, solution_checksum);
let worker_ = worker.clone();
let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
assert!(worker.pending.contains(transmission_id));
let result = worker.process_unconfirmed_solution(solution_id, solution).await;
assert!(result.is_ok());
assert!(!worker.pending.contains(transmission_id));
assert!(worker.ready.contains(transmission_id));
}
#[tokio::test]
async fn test_process_solution_nok() {
let rng = &mut TestRng::default();
let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
let committee_clone = committee.clone();
let mut gateway = MockGateway::default();
gateway.expect_send().returning(|_, _| {
let (_tx, rx) = oneshot::channel();
Some(rx)
});
let mut mock_ledger = MockLedger::default();
mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
mock_ledger.expect_check_solution_basic().returning(|_, _| Err(anyhow!("")));
let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
let solution_id = rng.gen::<u64>().into();
let solution = Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>()));
let checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
let transmission_id = TransmissionID::Solution(solution_id, checksum);
let worker_ = worker.clone();
let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
assert!(worker.pending.contains(transmission_id));
let result = worker.process_unconfirmed_solution(solution_id, solution).await;
assert!(result.is_err());
assert!(!worker.pending.contains(transmission_id));
assert!(!worker.ready.contains(transmission_id));
}
#[tokio::test]
async fn test_process_transaction_ok() {
let mut rng = &mut TestRng::default();
let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
let committee_clone = committee.clone();
let mut gateway = MockGateway::default();
gateway.expect_send().returning(|_, _| {
let (_tx, rx) = oneshot::channel();
Some(rx)
});
let mut mock_ledger = MockLedger::default();
mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
mock_ledger.expect_check_transaction_basic().returning(|_, _| Ok(()));
let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
let transaction_id: <CurrentNetwork as Network>::TransactionID = Field::<CurrentNetwork>::rand(&mut rng).into();
let transaction = Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>()));
let checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
let worker_ = worker.clone();
let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
assert!(worker.pending.contains(transmission_id));
let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
assert!(result.is_ok());
assert!(!worker.pending.contains(transmission_id));
assert!(worker.ready.contains(transmission_id));
}
#[tokio::test]
async fn test_process_transaction_nok() {
let mut rng = &mut TestRng::default();
let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
let committee_clone = committee.clone();
let mut gateway = MockGateway::default();
gateway.expect_send().returning(|_, _| {
let (_tx, rx) = oneshot::channel();
Some(rx)
});
let mut mock_ledger = MockLedger::default();
mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
mock_ledger.expect_check_transaction_basic().returning(|_, _| Err(anyhow!("")));
let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
let transaction_id: <CurrentNetwork as Network>::TransactionID = Field::<CurrentNetwork>::rand(&mut rng).into();
let transaction = Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>()));
let checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
let worker_ = worker.clone();
let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
assert!(worker.pending.contains(transmission_id));
let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
assert!(result.is_err());
assert!(!worker.pending.contains(transmission_id));
assert!(!worker.ready.contains(transmission_id));
}
#[tokio::test]
async fn test_flood_transmission_requests() {
let mut rng = &mut TestRng::default();
let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
let committee_clone = committee.clone();
let mut gateway = MockGateway::default();
gateway.expect_send().returning(|_, _| {
let (_tx, rx) = oneshot::channel();
Some(rx)
});
let mut mock_ledger = MockLedger::default();
mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
mock_ledger.expect_check_transaction_basic().returning(|_, _| Ok(()));
let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
let transaction_id: <CurrentNetwork as Network>::TransactionID = Field::<CurrentNetwork>::rand(&mut rng).into();
let transaction = Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>()));
let checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
let num_redundant_requests = max_redundant_requests(worker.ledger.clone(), worker.storage.current_round());
let num_flood_requests = num_redundant_requests * 10;
let mut peer_ips =
(0..num_flood_requests).map(|i| SocketAddr::from(([127, 0, 0, 1], 1234 + i as u16))).collect_vec();
let first_peer_ip = peer_ips[0];
for i in 1..=num_flood_requests {
let worker_ = worker.clone();
let peer_ip = peer_ips.pop().unwrap();
tokio::spawn(async move {
let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
});
tokio::time::sleep(Duration::from_millis(10)).await;
assert!(worker.pending.num_sent_requests(transmission_id) <= num_redundant_requests);
assert_eq!(worker.pending.num_callbacks(transmission_id), i);
}
assert_eq!(worker.pending.num_sent_requests(transmission_id), num_redundant_requests);
assert_eq!(worker.pending.num_callbacks(transmission_id), num_flood_requests);
tokio::time::sleep(Duration::from_secs(CALLBACK_EXPIRATION_IN_SECS as u64 + 1)).await;
assert_eq!(worker.pending.num_sent_requests(transmission_id), 0);
assert_eq!(worker.pending.num_callbacks(transmission_id), 0);
for i in 1..=num_flood_requests {
let worker_ = worker.clone();
tokio::spawn(async move {
let _ = worker_.send_transmission_request(first_peer_ip, transmission_id).await;
});
tokio::time::sleep(Duration::from_millis(10)).await;
assert!(worker.pending.num_sent_requests(transmission_id) <= num_redundant_requests);
assert_eq!(worker.pending.num_callbacks(transmission_id), i);
}
assert_eq!(worker.pending.num_sent_requests(transmission_id), 1);
assert_eq!(worker.pending.num_callbacks(transmission_id), num_flood_requests);
let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
assert!(result.is_ok());
assert_eq!(worker.pending.num_sent_requests(transmission_id), 0);
assert_eq!(worker.pending.num_callbacks(transmission_id), 0);
assert!(!worker.pending.contains(transmission_id));
assert!(worker.ready.contains(transmission_id));
}
#[tokio::test]
async fn test_storage_gc_on_initialization() {
let rng = &mut TestRng::default();
for _ in 0..ITERATIONS {
let max_gc_rounds = rng.gen_range(50..=100);
let latest_ledger_round = rng.gen_range((max_gc_rounds + 1)..1000);
let expected_gc_round = latest_ledger_round - max_gc_rounds;
let committee =
snarkvm::ledger::committee::test_helpers::sample_committee_for_round(latest_ledger_round, rng);
let mut mock_ledger = MockLedger::default();
mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
let storage =
Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
assert_eq!(storage.gc_round(), expected_gc_round);
}
}
}
#[cfg(test)]
mod prop_tests {
use super::*;
use crate::Gateway;
use snarkos_node_bft_ledger_service::MockLedgerService;
use snarkvm::{
console::account::Address,
ledger::committee::{Committee, MIN_VALIDATOR_STAKE},
};
use test_strategy::proptest;
type CurrentNetwork = snarkvm::prelude::MainnetV0;
fn new_test_committee(n: u16) -> Committee<CurrentNetwork> {
let mut members = IndexMap::with_capacity(n as usize);
for i in 0..n {
let rng = &mut TestRng::fixed(i as u64);
let address = Address::new(rng.gen());
info!("Validator {i}: {address}");
members.insert(address, (MIN_VALIDATOR_STAKE, false, rng.gen_range(0..100)));
}
Committee::<CurrentNetwork>::new(1u64, members).unwrap()
}
#[proptest]
fn worker_initialization(
#[strategy(0..MAX_WORKERS)] id: u8,
gateway: Gateway<CurrentNetwork>,
storage: Storage<CurrentNetwork>,
) {
let committee = new_test_committee(4);
let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(MockLedgerService::new(committee));
let worker = Worker::new(id, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
assert_eq!(worker.id(), id);
}
#[proptest]
fn invalid_worker_id(
#[strategy(MAX_WORKERS..)] id: u8,
gateway: Gateway<CurrentNetwork>,
storage: Storage<CurrentNetwork>,
) {
let committee = new_test_committee(4);
let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(MockLedgerService::new(committee));
let worker = Worker::new(id, Arc::new(gateway), storage, ledger, Default::default());
if let Err(error) = worker {
assert_eq!(error.to_string(), format!("Invalid worker ID '{}'", id));
}
}
}