use crate::{
events::{Event, TransmissionRequest, TransmissionResponse},
helpers::{fmt_id, Pending, Ready, Storage, WorkerReceiver},
ProposedBatch,
Transport,
MAX_BATCH_DELAY,
MAX_TRANSMISSIONS_PER_BATCH,
MAX_TRANSMISSIONS_PER_WORKER_PING,
MAX_WORKERS,
};
use snarkos_node_bft_ledger_service::LedgerService;
use snarkvm::{
console::prelude::*,
ledger::narwhal::{Data, Transmission, TransmissionID},
prelude::{
block::Transaction,
coinbase::{ProverSolution, PuzzleCommitment},
},
};
use indexmap::{IndexMap, IndexSet};
use parking_lot::Mutex;
use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration};
use tokio::{sync::oneshot, task::JoinHandle, time::timeout};
const MAX_TRANSMISSIONS_PER_WORKER: usize = MAX_TRANSMISSIONS_PER_BATCH / MAX_WORKERS as usize;
#[derive(Clone)]
pub struct Worker<N: Network> {
id: u8,
gateway: Arc<dyn Transport<N>>,
storage: Storage<N>,
ledger: Arc<dyn LedgerService<N>>,
proposed_batch: Arc<ProposedBatch<N>>,
ready: Ready<N>,
pending: Arc<Pending<TransmissionID<N>, Transmission<N>>>,
handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
}
impl<N: Network> Worker<N> {
pub fn new(
id: u8,
gateway: Arc<dyn Transport<N>>,
storage: Storage<N>,
ledger: Arc<dyn LedgerService<N>>,
proposed_batch: Arc<ProposedBatch<N>>,
) -> Result<Self> {
ensure!(id < MAX_WORKERS, "Invalid worker ID '{id}'");
Ok(Self {
id,
gateway,
storage,
ledger,
proposed_batch,
ready: Default::default(),
pending: Default::default(),
handles: Default::default(),
})
}
pub fn run(&self, receiver: WorkerReceiver<N>) {
info!("Starting worker instance {} of the memory pool...", self.id);
self.start_handlers(receiver);
}
pub const fn id(&self) -> u8 {
self.id
}
}
impl<N: Network> Worker<N> {
pub fn num_transmissions(&self) -> usize {
self.ready.num_transmissions()
}
pub fn num_ratifications(&self) -> usize {
self.ready.num_ratifications()
}
pub fn num_solutions(&self) -> usize {
self.ready.num_solutions()
}
pub fn num_transactions(&self) -> usize {
self.ready.num_transactions()
}
}
impl<N: Network> Worker<N> {
pub fn transmission_ids(&self) -> IndexSet<TransmissionID<N>> {
self.ready.transmission_ids()
}
pub fn transmissions(&self) -> IndexMap<TransmissionID<N>, Transmission<N>> {
self.ready.transmissions()
}
pub fn solutions(&self) -> impl '_ + Iterator<Item = (PuzzleCommitment<N>, Data<ProverSolution<N>>)> {
self.ready.solutions()
}
pub fn transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
self.ready.transactions()
}
}
impl<N: Network> Worker<N> {
pub fn contains_transmission(&self, transmission_id: impl Into<TransmissionID<N>>) -> bool {
let transmission_id = transmission_id.into();
self.ready.contains(transmission_id)
|| self.proposed_batch.read().as_ref().map_or(false, |p| p.contains_transmission(transmission_id))
|| self.storage.contains_transmission(transmission_id)
|| self.ledger.contains_transmission(&transmission_id).unwrap_or(false)
}
pub fn get_transmission(&self, transmission_id: TransmissionID<N>) -> Option<Transmission<N>> {
if let Some(transmission) = self.ready.get(transmission_id) {
return Some(transmission);
}
if let Some(transmission) = self.storage.get_transmission(transmission_id) {
return Some(transmission);
}
if let Some(transmission) =
self.proposed_batch.read().as_ref().and_then(|p| p.get_transmission(transmission_id))
{
return Some(transmission.clone());
}
None
}
pub async fn get_or_fetch_transmission(
&self,
peer_ip: SocketAddr,
transmission_id: TransmissionID<N>,
) -> Result<(TransmissionID<N>, Transmission<N>)> {
if let Some(transmission) = self.get_transmission(transmission_id) {
return Ok((transmission_id, transmission));
}
let (candidate_id, transmission) = self.send_transmission_request(peer_ip, transmission_id).await?;
ensure!(candidate_id == transmission_id, "Invalid transmission ID");
Ok((transmission_id, transmission))
}
pub(crate) fn drain(&self, num_transmissions: usize) -> impl Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
self.ready.drain(num_transmissions).into_iter()
}
pub(crate) fn reinsert(&self, transmission_id: TransmissionID<N>, transmission: Transmission<N>) -> bool {
if !self.contains_transmission(transmission_id) {
return self.ready.insert(transmission_id, transmission);
}
false
}
pub(crate) fn broadcast_ping(&self) {
let transmission_ids =
self.ready.transmission_ids().into_iter().take(MAX_TRANSMISSIONS_PER_WORKER_PING).collect::<IndexSet<_>>();
if !transmission_ids.is_empty() {
self.gateway.broadcast(Event::WorkerPing(transmission_ids.into()));
}
}
}
impl<N: Network> Worker<N> {
fn process_transmission_id_from_ping(&self, peer_ip: SocketAddr, transmission_id: TransmissionID<N>) {
if self.contains_transmission(transmission_id) {
return;
}
if self.ready.num_transmissions() > MAX_TRANSMISSIONS_PER_WORKER {
return;
}
let self_ = self.clone();
tokio::spawn(async move {
match self_.send_transmission_request(peer_ip, transmission_id).await {
Ok((candidate_id, transmission)) => {
if candidate_id == transmission_id {
self_.process_transmission_from_peer(peer_ip, transmission_id, transmission);
}
}
Err(e) => {
warn!(
"Worker {} - Failed to fetch transmission '{}' from '{peer_ip}' (ping) - {e}",
self_.id,
fmt_id(transmission_id)
);
}
}
});
}
pub(crate) fn process_transmission_from_peer(
&self,
peer_ip: SocketAddr,
transmission_id: TransmissionID<N>,
transmission: Transmission<N>,
) {
if self.contains_transmission(transmission_id) {
return;
}
let is_well_formed = match (&transmission_id, &transmission) {
(TransmissionID::Solution(_), Transmission::Solution(_)) => true,
(TransmissionID::Transaction(_), Transmission::Transaction(_)) => true,
(TransmissionID::Ratification, Transmission::Ratification) => false,
_ => false,
};
if is_well_formed && self.ready.insert(transmission_id, transmission) {
trace!("Worker {} - Added transmission '{}' from '{peer_ip}'", self.id, fmt_id(transmission_id));
}
}
pub(crate) async fn process_unconfirmed_solution(
&self,
puzzle_commitment: PuzzleCommitment<N>,
prover_solution: Data<ProverSolution<N>>,
) -> Result<()> {
let transmission = Transmission::Solution(prover_solution.clone());
self.pending.remove(puzzle_commitment, Some(transmission.clone()));
if self.contains_transmission(puzzle_commitment) {
bail!("Solution '{}' already exists.", fmt_id(puzzle_commitment));
}
if let Err(e) = self.ledger.check_solution_basic(puzzle_commitment, prover_solution).await {
bail!("Invalid unconfirmed solution '{}': {e}", fmt_id(puzzle_commitment));
}
if self.ready.insert(puzzle_commitment, transmission) {
trace!("Worker {} - Added unconfirmed solution '{}'", self.id, fmt_id(puzzle_commitment));
}
Ok(())
}
pub(crate) async fn process_unconfirmed_transaction(
&self,
transaction_id: N::TransactionID,
transaction: Data<Transaction<N>>,
) -> Result<()> {
let transmission = Transmission::Transaction(transaction.clone());
self.pending.remove(&transaction_id, Some(transmission.clone()));
if self.contains_transmission(&transaction_id) {
bail!("Transaction '{}' already exists.", fmt_id(transaction_id));
}
if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
bail!("Invalid unconfirmed transaction '{}': {e}", fmt_id(transaction_id));
}
if self.ready.insert(&transaction_id, transmission) {
trace!("Worker {} - Added unconfirmed transaction '{}'", self.id, fmt_id(transaction_id));
}
Ok(())
}
}
impl<N: Network> Worker<N> {
fn start_handlers(&self, receiver: WorkerReceiver<N>) {
let WorkerReceiver { mut rx_worker_ping, mut rx_transmission_request, mut rx_transmission_response } = receiver;
let self_ = self.clone();
self.spawn(async move {
while let Some((peer_ip, transmission_id)) = rx_worker_ping.recv().await {
self_.process_transmission_id_from_ping(peer_ip, transmission_id);
}
});
let self_ = self.clone();
self.spawn(async move {
while let Some((peer_ip, transmission_request)) = rx_transmission_request.recv().await {
self_.send_transmission_response(peer_ip, transmission_request);
}
});
let self_ = self.clone();
self.spawn(async move {
while let Some((peer_ip, transmission_response)) = rx_transmission_response.recv().await {
self_.finish_transmission_request(peer_ip, transmission_response);
}
});
}
async fn send_transmission_request(
&self,
peer_ip: SocketAddr,
transmission_id: TransmissionID<N>,
) -> Result<(TransmissionID<N>, Transmission<N>)> {
let (callback_sender, callback_receiver) = oneshot::channel();
self.pending.insert(transmission_id, peer_ip, Some(callback_sender));
if self.gateway.send(peer_ip, Event::TransmissionRequest(transmission_id.into())).await.is_none() {
bail!("Unable to fetch transmission - failed to send request")
}
match timeout(Duration::from_millis(MAX_BATCH_DELAY), callback_receiver).await {
Ok(result) => Ok((transmission_id, result?)),
Err(e) => bail!("Unable to fetch transmission - (timeout) {e}"),
}
}
fn finish_transmission_request(&self, peer_ip: SocketAddr, response: TransmissionResponse<N>) {
let TransmissionResponse { transmission_id, transmission } = response;
let exists = self.pending.get(transmission_id).unwrap_or_default().contains(&peer_ip);
if exists {
self.pending.remove(transmission_id, Some(transmission));
}
}
fn send_transmission_response(&self, peer_ip: SocketAddr, request: TransmissionRequest<N>) {
let TransmissionRequest { transmission_id } = request;
if let Some(transmission) = self.get_transmission(transmission_id) {
let self_ = self.clone();
tokio::spawn(async move {
self_.gateway.send(peer_ip, Event::TransmissionResponse((transmission_id, transmission).into())).await;
});
}
}
fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
self.handles.lock().push(tokio::spawn(future));
}
pub(crate) fn shut_down(&self) {
trace!("Shutting down worker {}...", self.id);
self.handles.lock().iter().for_each(|handle| handle.abort());
}
}
#[cfg(test)]
mod tests {
use super::*;
use snarkos_node_bft_ledger_service::LedgerService;
use snarkvm::{
console::{network::Network, types::Field},
ledger::{
block::Block,
committee::Committee,
narwhal::{BatchCertificate, Subdag, Transmission, TransmissionID},
},
};
use bytes::Bytes;
use indexmap::IndexMap;
use mockall::mock;
use std::{io, ops::Range};
type CurrentNetwork = snarkvm::prelude::Testnet3;
mock! {
Gateway<N: Network> {}
#[async_trait]
impl<N:Network> Transport<N> for Gateway<N> {
fn broadcast(&self, event: Event<N>);
async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>>;
}
}
mock! {
#[derive(Debug)]
Ledger<N: Network> {}
#[async_trait]
impl<N: Network> LedgerService<N> for Ledger<N> {
fn latest_round(&self) -> u64;
fn latest_block_height(&self) -> u32;
fn latest_block(&self) -> Block<N>;
fn contains_block_height(&self, height: u32) -> bool;
fn get_block_height(&self, hash: &N::BlockHash) -> Result<u32>;
fn get_block_hash(&self, height: u32) -> Result<N::BlockHash>;
fn get_block(&self, height: u32) -> Result<Block<N>>;
fn get_blocks(&self, heights: Range<u32>) -> Result<Vec<Block<N>>>;
fn get_solution(&self, solution_id: &PuzzleCommitment<N>) -> Result<ProverSolution<N>>;
fn get_transaction(&self, transaction_id: N::TransactionID) -> Result<Transaction<N>>;
fn get_batch_certificate(&self, certificate_id: &Field<N>) -> Result<BatchCertificate<N>>;
fn current_committee(&self) -> Result<Committee<N>>;
fn get_committee_for_round(&self, round: u64) -> Result<Committee<N>>;
fn get_previous_committee_for_round(&self, round: u64) -> Result<Committee<N>>;
fn contains_certificate(&self, certificate_id: &Field<N>) -> Result<bool>;
fn contains_transmission(&self, transmission_id: &TransmissionID<N>) -> Result<bool>;
async fn check_solution_basic(
&self,
puzzle_commitment: PuzzleCommitment<N>,
solution: Data<ProverSolution<N>>,
) -> Result<()>;
async fn check_transaction_basic(
&self,
transaction_id: N::TransactionID,
transaction: Data<Transaction<N>>,
) -> Result<()>;
fn check_next_block(&self, block: &Block<N>) -> Result<()>;
fn prepare_advance_to_next_quorum_block(
&self,
subdag: Subdag<N>,
transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
) -> Result<Block<N>>;
fn advance_to_next_block(&self, block: &Block<N>) -> Result<()>;
}
}
#[tokio::test]
async fn test_process_transmission() {
let rng = &mut TestRng::default();
let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
let gateway = MockGateway::default();
let mut mock_ledger = MockLedger::default();
mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
let storage = Storage::<CurrentNetwork>::new(ledger.clone(), 1);
let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
let data = |rng: &mut TestRng| Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>()));
let transmission_id = TransmissionID::Solution(PuzzleCommitment::from_g1_affine(rng.gen()));
let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
let transmission = Transmission::Solution(data(rng));
worker.process_transmission_from_peer(peer_ip, transmission_id, transmission.clone());
assert!(worker.contains_transmission(transmission_id));
assert!(worker.ready.contains(transmission_id));
assert_eq!(worker.get_transmission(transmission_id), Some(transmission));
let transmission: Vec<_> = worker.drain(1).collect();
assert_eq!(transmission.len(), 1);
assert!(!worker.ready.contains(transmission_id));
}
#[tokio::test]
async fn test_send_transmission() {
let rng = &mut TestRng::default();
let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
let mut gateway = MockGateway::default();
gateway.expect_send().returning(|_, _| {
let (_tx, rx) = oneshot::channel();
Some(rx)
});
let mut mock_ledger = MockLedger::default();
mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
let storage = Storage::<CurrentNetwork>::new(ledger.clone(), 1);
let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
let transmission_id = TransmissionID::Solution(PuzzleCommitment::from_g1_affine(rng.gen()));
let worker_ = worker.clone();
let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
assert!(worker.pending.contains(transmission_id));
let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
worker.finish_transmission_request(peer_ip, TransmissionResponse {
transmission_id,
transmission: Transmission::Solution(Data::Buffer(Bytes::from(vec![0; 512]))),
});
assert!(!worker.pending.contains(transmission_id));
}
#[tokio::test]
async fn test_process_solution_ok() {
let rng = &mut TestRng::default();
let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
let mut gateway = MockGateway::default();
gateway.expect_send().returning(|_, _| {
let (_tx, rx) = oneshot::channel();
Some(rx)
});
let mut mock_ledger = MockLedger::default();
mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
let storage = Storage::<CurrentNetwork>::new(ledger.clone(), 1);
let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
let puzzle = PuzzleCommitment::from_g1_affine(rng.gen());
let transmission_id = TransmissionID::Solution(puzzle);
let worker_ = worker.clone();
let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
assert!(worker.pending.contains(transmission_id));
let result = worker
.process_unconfirmed_solution(
puzzle,
Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>())),
)
.await;
assert!(result.is_ok());
assert!(!worker.pending.contains(transmission_id));
assert!(worker.ready.contains(puzzle));
}
#[tokio::test]
async fn test_process_solution_nok() {
let rng = &mut TestRng::default();
let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
let mut gateway = MockGateway::default();
gateway.expect_send().returning(|_, _| {
let (_tx, rx) = oneshot::channel();
Some(rx)
});
let mut mock_ledger = MockLedger::default();
mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
mock_ledger.expect_check_solution_basic().returning(|_, _| Err(anyhow!("")));
let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
let storage = Storage::<CurrentNetwork>::new(ledger.clone(), 1);
let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
let puzzle = PuzzleCommitment::from_g1_affine(rng.gen());
let transmission_id = TransmissionID::Solution(puzzle);
let worker_ = worker.clone();
let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
assert!(worker.pending.contains(transmission_id));
let result = worker
.process_unconfirmed_solution(
puzzle,
Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>())),
)
.await;
assert!(result.is_err());
assert!(!worker.pending.contains(puzzle));
assert!(!worker.ready.contains(puzzle));
}
#[tokio::test]
async fn test_process_transaction_ok() {
let mut rng = &mut TestRng::default();
let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
let mut gateway = MockGateway::default();
gateway.expect_send().returning(|_, _| {
let (_tx, rx) = oneshot::channel();
Some(rx)
});
let mut mock_ledger = MockLedger::default();
mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
mock_ledger.expect_check_transaction_basic().returning(|_, _| Ok(()));
let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
let storage = Storage::<CurrentNetwork>::new(ledger.clone(), 1);
let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
let transaction_id: <CurrentNetwork as Network>::TransactionID = Field::<CurrentNetwork>::rand(&mut rng).into();
let transmission_id = TransmissionID::Transaction(transaction_id);
let worker_ = worker.clone();
let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
assert!(worker.pending.contains(transmission_id));
let result = worker
.process_unconfirmed_transaction(
transaction_id,
Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>())),
)
.await;
assert!(result.is_ok());
assert!(!worker.pending.contains(transmission_id));
assert!(worker.ready.contains(transmission_id));
}
#[tokio::test]
async fn test_process_transaction_nok() {
let mut rng = &mut TestRng::default();
let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
let mut gateway = MockGateway::default();
gateway.expect_send().returning(|_, _| {
let (_tx, rx) = oneshot::channel();
Some(rx)
});
let mut mock_ledger = MockLedger::default();
mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
mock_ledger.expect_check_transaction_basic().returning(|_, _| Err(anyhow!("")));
let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
let storage = Storage::<CurrentNetwork>::new(ledger.clone(), 1);
let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
let transaction_id: <CurrentNetwork as Network>::TransactionID = Field::<CurrentNetwork>::rand(&mut rng).into();
let transmission_id = TransmissionID::Transaction(transaction_id);
let worker_ = worker.clone();
let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
assert!(worker.pending.contains(transmission_id));
let result = worker
.process_unconfirmed_transaction(
transaction_id,
Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>())),
)
.await;
assert!(result.is_err());
assert!(!worker.pending.contains(transmission_id));
assert!(!worker.ready.contains(transmission_id));
}
}
#[cfg(test)]
mod prop_tests {
use super::*;
use crate::Gateway;
use snarkos_node_bft_ledger_service::MockLedgerService;
use snarkvm::{
console::account::Address,
ledger::committee::{Committee, MIN_VALIDATOR_STAKE},
};
use test_strategy::proptest;
type CurrentNetwork = snarkvm::prelude::Testnet3;
fn new_test_committee(n: u16) -> Committee<CurrentNetwork> {
let mut members = IndexMap::with_capacity(n as usize);
for i in 0..n {
let rng = &mut TestRng::fixed(i as u64);
let address = Address::new(rng.gen());
info!("Validator {i}: {address}");
members.insert(address, (MIN_VALIDATOR_STAKE, false));
}
Committee::<CurrentNetwork>::new(1u64, members).unwrap()
}
#[proptest]
fn worker_initialization(
#[strategy(0..MAX_WORKERS)] id: u8,
gateway: Gateway<CurrentNetwork>,
storage: Storage<CurrentNetwork>,
) {
let committee = new_test_committee(4);
let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(MockLedgerService::new(committee));
let worker = Worker::new(id, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
assert_eq!(worker.id(), id);
}
#[proptest]
fn invalid_worker_id(
#[strategy(MAX_WORKERS..)] id: u8,
gateway: Gateway<CurrentNetwork>,
storage: Storage<CurrentNetwork>,
) {
let committee = new_test_committee(4);
let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(MockLedgerService::new(committee));
let worker = Worker::new(id, Arc::new(gateway), storage, ledger, Default::default());
if let Err(error) = worker {
assert_eq!(error.to_string(), format!("Invalid worker ID '{}'", id));
}
}
}