Skip to main content

snarkos_node_bft/
worker.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#[cfg(not(test))]
17use crate::Gateway;
18use crate::{
19    MAX_FETCH_TIMEOUT,
20    MAX_WORKERS,
21    ProposedBatch,
22    ProposedBatchState,
23    Transport,
24    events::{Event, TransmissionRequest, TransmissionResponse},
25    helpers::{Pending, Ready, Storage, WorkerReceiver, fmt_id, max_redundant_requests},
26    spawn_blocking,
27};
28use snarkos_node_bft_ledger_service::LedgerService;
29use snarkvm::{
30    console::prelude::*,
31    ledger::{
32        Transaction,
33        narwhal::{BatchHeader, Data, Transmission, TransmissionID},
34        puzzle::{Solution, SolutionID},
35    },
36};
37
38use anyhow::Context;
39use colored::{ColoredString, Colorize};
40use indexmap::{IndexMap, IndexSet};
41#[cfg(feature = "locktick")]
42use locktick::parking_lot::{Mutex, RwLock};
43#[cfg(not(feature = "locktick"))]
44use parking_lot::{Mutex, RwLock};
45use rand::seq::IteratorRandom;
46
47use std::{future::Future, net::SocketAddr, sync::Arc};
48use tokio::{sync::oneshot, task::JoinHandle, time::timeout};
49
50/// A worker's main role is maintaining a queue of verified ("ready") transmissions,
51/// which will eventually be fetched by the primary when the primary generates a new batch.
52#[derive(Clone)]
53pub struct Worker<N: Network> {
54    /// The worker ID.
55    id: u8,
56    /// The gateway.
57    #[cfg(not(test))]
58    gateway: Arc<Gateway<N>>,
59    #[cfg(test)]
60    gateway: Arc<dyn Transport<N>>,
61    /// The storage.
62    storage: Storage<N>,
63    /// The ledger service.
64    ledger: Arc<dyn LedgerService<N>>,
65    /// The proposed batch.
66    proposed_batch: Arc<ProposedBatch<N>>,
67    /// The ready queue.
68    ready: Arc<RwLock<Ready<N>>>,
69    /// The pending transmissions queue.
70    pending: Arc<Pending<TransmissionID<N>, Transmission<N>>>,
71    /// The spawned handles.
72    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
73}
74
75impl<N: Network> Worker<N> {
76    /// Initializes a new worker instance.
77    pub fn new(
78        id: u8,
79        #[cfg(not(test))] gateway: Arc<Gateway<N>>,
80        #[cfg(test)] gateway: Arc<dyn Transport<N>>,
81        storage: Storage<N>,
82        ledger: Arc<dyn LedgerService<N>>,
83        proposed_batch: Arc<ProposedBatch<N>>,
84    ) -> Result<Self> {
85        // Ensure the worker ID is valid.
86        ensure!(id < MAX_WORKERS, "Invalid worker ID '{id}'");
87        // Return the worker.
88        Ok(Self {
89            id,
90            gateway,
91            storage,
92            ledger,
93            proposed_batch,
94            ready: Default::default(),
95            pending: Default::default(),
96            handles: Default::default(),
97        })
98    }
99
100    /// Run the worker instance.
101    pub fn run(&self, receiver: WorkerReceiver<N>) {
102        info!("Starting worker instance {} of the memory pool...", self.id);
103        // Start the worker handlers.
104        self.start_handlers(receiver);
105    }
106
107    /// Returns the worker ID.
108    pub const fn id(&self) -> u8 {
109        self.id
110    }
111
112    /// Returns a reference to the pending transmissions queue.
113    pub fn pending(&self) -> &Arc<Pending<TransmissionID<N>, Transmission<N>>> {
114        &self.pending
115    }
116}
117
118impl<N: Network> Worker<N> {
119    /// The maximum number of transmissions allowed in a worker.
120    pub const MAX_TRANSMISSIONS_PER_WORKER: usize =
121        BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / MAX_WORKERS as usize;
122    /// The maximum number of transmissions allowed in a worker ping.
123    pub const MAX_TRANSMISSIONS_PER_WORKER_PING: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / 10;
124
125    /// Returns the number of transmissions in the ready queue.
126    pub fn num_transmissions(&self) -> usize {
127        self.ready.read().num_transmissions()
128    }
129
130    /// Returns the number of ratifications in the ready queue.
131    pub fn num_ratifications(&self) -> usize {
132        self.ready.read().num_ratifications()
133    }
134
135    /// Returns the number of solutions in the ready queue.
136    pub fn num_solutions(&self) -> usize {
137        self.ready.read().num_solutions()
138    }
139
140    /// Returns the number of transactions in the ready queue.
141    pub fn num_transactions(&self) -> usize {
142        self.ready.read().num_transactions()
143    }
144}
145
146impl<N: Network> Worker<N> {
147    /// Returns the transmission IDs in the ready queue.
148    pub fn transmission_ids(&self) -> IndexSet<TransmissionID<N>> {
149        self.ready.read().transmission_ids()
150    }
151
152    /// Returns the transmissions in the ready queue.
153    pub fn transmissions(&self) -> IndexMap<TransmissionID<N>, Transmission<N>> {
154        self.ready.read().transmissions()
155    }
156
157    /// Returns the solutions in the ready queue.
158    pub fn solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
159        self.ready.read().solutions().into_iter()
160    }
161
162    /// Returns the transactions in the ready queue.
163    pub fn transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
164        self.ready.read().transactions().into_iter()
165    }
166}
167
168impl<N: Network> Worker<N> {
169    /// Clears the solutions from the ready queue.
170    pub(super) fn clear_solutions(&self) {
171        self.ready.write().clear_solutions()
172    }
173}
174
175impl<N: Network> Worker<N> {
176    // Helper to print the transmission ID and checksum (if any).
177    fn format_transmission_id(&self, transmission_id: TransmissionID<N>) -> ColoredString {
178        if let Some(checksum) = transmission_id.checksum() {
179            // fmt_id will suffix with `..`, so we should not use `.`  as a separator.
180            format!("{}:{}", fmt_id(transmission_id), fmt_id(checksum))
181        } else {
182            fmt_id(transmission_id)
183        }
184        .dimmed()
185    }
186
187    /// Returns `true` if the transmission ID exists in the ready queue, proposed batch, storage, or ledger.
188    pub fn contains_transmission(&self, transmission_id: impl Into<TransmissionID<N>>) -> bool {
189        let transmission_id = transmission_id.into();
190        // Check if the transmission ID exists in the ready queue, proposed batch, storage, or ledger.
191        self.ready.read().contains(transmission_id)
192            || matches!(&*self.proposed_batch.read(), ProposedBatchState::Certifying(p) if p.contains_transmission(transmission_id))
193            || self.storage.contains_transmission(transmission_id)
194            || self.ledger.contains_transmission(&transmission_id).unwrap_or(false)
195    }
196
197    /// Returns the transmission if it exists in the ready queue, proposed batch, storage.
198    ///
199    /// Note: We explicitly forbid retrieving a transmission from the ledger, as transmissions
200    /// in the ledger are not guaranteed to be invalid for the current batch.
201    pub fn get_transmission(&self, transmission_id: TransmissionID<N>) -> Option<Transmission<N>> {
202        // Check if the transmission ID exists in the ready queue.
203        if let Some(transmission) = self.ready.read().get(transmission_id) {
204            return Some(transmission);
205        }
206        // Check if the transmission ID exists in storage.
207        if let Some(transmission) = self.storage.get_transmission(transmission_id) {
208            return Some(transmission);
209        }
210        // Check if the transmission ID exists in the proposed batch.
211        if let Some(transmission) = match &*self.proposed_batch.read() {
212            ProposedBatchState::Certifying(p) => p.get_transmission(transmission_id),
213            _ => None,
214        } {
215            return Some(transmission.clone());
216        }
217        None
218    }
219
220    /// Returns the transmissions if it exists in the worker, or requests it from the specified peer.
221    pub async fn get_or_fetch_transmission(
222        &self,
223        peer_ip: SocketAddr,
224        transmission_id: TransmissionID<N>,
225    ) -> Result<(TransmissionID<N>, Transmission<N>)> {
226        // Attempt to get the transmission from the worker.
227        if let Some(transmission) = self.get_transmission(transmission_id) {
228            return Ok((transmission_id, transmission));
229        }
230        // Send a transmission request to the peer.
231        let (candidate_id, transmission) = self.send_transmission_request(peer_ip, transmission_id).await?;
232        // Ensure the transmission ID matches.
233        ensure!(candidate_id == transmission_id, "Invalid transmission ID");
234        // Return the transmission.
235        Ok((transmission_id, transmission))
236    }
237
238    /// Inserts the transmission at the front of the ready queue.
239    pub(crate) fn insert_front(&self, key: TransmissionID<N>, value: Transmission<N>) {
240        self.ready.write().insert_front(key, value);
241    }
242
243    /// Removes and returns the transmission at the front of the ready queue.
244    pub(crate) fn remove_front(&self) -> Option<(TransmissionID<N>, Transmission<N>)> {
245        self.ready.write().remove_front()
246    }
247
248    /// Reinserts the specified transmission into the ready queue.
249    pub(crate) fn reinsert(&self, transmission_id: TransmissionID<N>, transmission: Transmission<N>) -> bool {
250        // Check if the transmission ID exists.
251        if !self.contains_transmission(transmission_id) {
252            // Insert the transmission into the ready queue.
253            return self.ready.write().insert(transmission_id, transmission);
254        }
255        false
256    }
257
258    /// Broadcasts a worker ping event.
259    pub(crate) fn broadcast_ping(&self) {
260        // Retrieve the transmission IDs.
261        let transmission_ids = self
262            .ready
263            .read()
264            .transmission_ids()
265            .into_iter()
266            .sample(&mut rand::rng(), Self::MAX_TRANSMISSIONS_PER_WORKER_PING)
267            .into_iter()
268            .collect::<IndexSet<_>>();
269
270        // Broadcast the ping event.
271        if !transmission_ids.is_empty() {
272            self.gateway.broadcast(Event::WorkerPing(transmission_ids.into()));
273        }
274    }
275}
276
277impl<N: Network> Worker<N> {
278    /// Handles the incoming transmission ID from a worker ping event.
279    fn process_transmission_id_from_ping(&self, peer_ip: SocketAddr, transmission_id: TransmissionID<N>) {
280        // Check if the transmission ID exists.
281        if self.contains_transmission(transmission_id) {
282            return;
283        }
284        // If the ready queue is full, then skip this transmission.
285        // Note: We must prioritize the unconfirmed solutions and unconfirmed transactions, not transmissions.
286        if self.ready.read().num_transmissions() > Self::MAX_TRANSMISSIONS_PER_WORKER {
287            return;
288        }
289        // Attempt to fetch the transmission from the peer.
290        let self_ = self.clone();
291        tokio::spawn(async move {
292            // Send a transmission request to the peer.
293            match self_.send_transmission_request(peer_ip, transmission_id).await {
294                // If the transmission was fetched, then process it.
295                Ok((candidate_id, transmission)) => {
296                    // Ensure the transmission ID matches.
297                    if candidate_id == transmission_id {
298                        // Insert the transmission into the ready queue.
299                        // Note: This method checks `contains_transmission` again, because by the time the transmission is fetched,
300                        // it could have already been inserted into the ready queue.
301                        self_.process_transmission_from_peer(peer_ip, transmission_id, transmission);
302                    }
303                }
304                // If the transmission was not fetched, then attempt to fetch it again.
305                Err(e) => {
306                    warn!(
307                        "Worker {} - Failed to fetch transmission '{}' from '{peer_ip}' (ping) - {e}",
308                        self_.id,
309                        self_.format_transmission_id(transmission_id),
310                    );
311                }
312            }
313        });
314    }
315
316    /// Handles the incoming transmission from a peer.
317    pub(crate) fn process_transmission_from_peer(
318        &self,
319        peer_ip: SocketAddr,
320        transmission_id: TransmissionID<N>,
321        transmission: Transmission<N>,
322    ) {
323        // If the transmission ID already exists, then do not store it.
324        if self.contains_transmission(transmission_id) {
325            return;
326        }
327        // Ensure the transmission ID and transmission type matches.
328        let is_well_formed = match (&transmission_id, &transmission) {
329            (TransmissionID::Solution(_, _), Transmission::Solution(_)) => true,
330            (TransmissionID::Transaction(_, _), Transmission::Transaction(_)) => true,
331            // Note: We explicitly forbid inserting ratifications into the ready queue,
332            // as the protocol currently does not support ratifications.
333            (TransmissionID::Ratification, Transmission::Ratification) => false,
334            // All other combinations are clearly invalid.
335            _ => false,
336        };
337        // If the transmission is a deserialized execution, verify it immediately.
338        // This takes heavy transaction verification out of the hot path during block generation.
339        if let (TransmissionID::Transaction(tx_id, _), Transmission::Transaction(Data::Object(tx))) =
340            (transmission_id, &transmission)
341            && tx.is_execute()
342        {
343            let self_ = self.clone();
344            let tx_ = tx.clone();
345            tokio::spawn(async move {
346                let _ = self_.ledger.check_transaction_basic(tx_id, tx_).await;
347            });
348        }
349        // If the transmission ID and transmission type matches, then insert the transmission into the ready queue.
350        if is_well_formed && self.ready.write().insert(transmission_id, transmission) {
351            trace!(
352                "Worker {} - Added transmission '{}' from '{peer_ip}'",
353                self.id,
354                self.format_transmission_id(transmission_id),
355            );
356        }
357    }
358
359    /// Handles the incoming unconfirmed solution.
360    /// Note: This method assumes the incoming solution is valid and does not exist in the ledger.
361    ///
362    /// # Returns
363    /// - `Ok(true)` if the solution was added to the ready queue.
364    /// - `Ok(false)` if the solution was valid but already exists in the ready queue.
365    /// - `Err(anyhow::Error)` if the solution is invalid.
366    pub(crate) async fn process_unconfirmed_solution(
367        &self,
368        solution_id: SolutionID<N>,
369        solution: Data<Solution<N>>,
370    ) -> Result<bool> {
371        // Construct the transmission.
372        let transmission = Transmission::Solution(solution.clone());
373        // Compute the checksum.
374        let checksum = solution.to_checksum::<N>()?;
375        // Construct the transmission ID.
376        let transmission_id = TransmissionID::Solution(solution_id, checksum);
377        // Remove the solution ID from the pending queue.
378        self.pending.remove(transmission_id, Some(transmission.clone()));
379        // Check if the solution exists.
380        if self.contains_transmission(transmission_id) {
381            return Ok(false);
382        }
383        // Check that the solution is well-formed and unique.
384        self.ledger.check_solution_basic(solution_id, solution).await?;
385        // Adds the solution to the ready queue.
386        if self.ready.write().insert(transmission_id, transmission) {
387            trace!(
388                "Worker {} - Added unconfirmed solution '{}'",
389                self.id,
390                self.format_transmission_id(transmission_id),
391            );
392        }
393        Ok(true)
394    }
395
396    /// Handles the incoming unconfirmed transaction.
397    ///
398    /// # Returns
399    /// - `Ok(true)` if the transaction was added to the ready queue.
400    /// - `Ok(false)` if the transaction was valid but already exists in the ready queue.
401    /// - `Err(anyhow::Error)` if the transaction was invalid.
402    pub(crate) async fn process_unconfirmed_transaction(
403        &self,
404        transaction_id: N::TransactionID,
405        transaction: Data<Transaction<N>>,
406    ) -> Result<bool> {
407        // Construct the transmission.
408        let transmission = Transmission::Transaction(transaction.clone());
409        // Compute the checksum.
410        let checksum = transaction.to_checksum::<N>()?;
411        // Construct the transmission ID.
412        let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
413        // Remove the transaction from the pending queue.
414        self.pending.remove(transmission_id, Some(transmission.clone()));
415        // Check if the transaction ID exists.
416        if self.contains_transmission(transmission_id) {
417            return Ok(false);
418        }
419        // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
420        let transaction = spawn_blocking!({
421            match transaction {
422                Data::Object(transaction) => Ok(transaction),
423                Data::Buffer(bytes) => {
424                    Ok(Transaction::<N>::read_le(&mut bytes.take(N::LATEST_MAX_TRANSACTION_SIZE() as u64))?)
425                }
426            }
427        })?;
428
429        // Check that the transaction is well-formed and unique.
430        self.ledger.check_transaction_basic(transaction_id, transaction).await?;
431        // Adds the transaction to the ready queue.
432        if self.ready.write().insert(transmission_id, transmission) {
433            trace!(
434                "Worker {} - Added unconfirmed transaction '{}'",
435                self.id,
436                self.format_transmission_id(transmission_id),
437            );
438        }
439        Ok(true)
440    }
441}
442
443impl<N: Network> Worker<N> {
444    /// Starts the worker handlers.
445    fn start_handlers(&self, receiver: WorkerReceiver<N>) {
446        let WorkerReceiver { mut rx_worker_ping, mut rx_transmission_request, mut rx_transmission_response } = receiver;
447
448        // Start the pending queue expiration loop.
449        let self_ = self.clone();
450        self.spawn(async move {
451            loop {
452                // Sleep briefly.
453                tokio::time::sleep(MAX_FETCH_TIMEOUT).await;
454
455                // Remove the expired pending certificate requests.
456                let self__ = self_.clone();
457                let _ = spawn_blocking!({
458                    self__.pending.clear_expired_callbacks();
459                    Ok(())
460                });
461            }
462        });
463
464        // Process the ping events.
465        let self_ = self.clone();
466        self.spawn(async move {
467            while let Some((peer_ip, transmission_id)) = rx_worker_ping.recv().await {
468                self_.process_transmission_id_from_ping(peer_ip, transmission_id);
469            }
470        });
471
472        // Process the transmission requests.
473        let self_ = self.clone();
474        self.spawn(async move {
475            while let Some((peer_ip, transmission_request)) = rx_transmission_request.recv().await {
476                self_.send_transmission_response(peer_ip, transmission_request);
477            }
478        });
479
480        // Process the transmission responses.
481        let self_ = self.clone();
482        self.spawn(async move {
483            while let Some((peer_ip, transmission_response)) = rx_transmission_response.recv().await {
484                // Process the transmission response.
485                let self__ = self_.clone();
486                let _ = spawn_blocking!({
487                    self__.finish_transmission_request(peer_ip, transmission_response);
488                    Ok(())
489                });
490            }
491        });
492    }
493
494    /// Sends a transmission request to the specified peer.
495    async fn send_transmission_request(
496        &self,
497        peer_ip: SocketAddr,
498        transmission_id: TransmissionID<N>,
499    ) -> Result<(TransmissionID<N>, Transmission<N>)> {
500        // Initialize a oneshot channel.
501        let (callback_sender, callback_receiver) = oneshot::channel();
502        // Determine how many sent requests are pending.
503        let num_sent_requests = self.pending.num_sent_requests(transmission_id);
504        // Determine if we've already sent a request to the peer.
505        let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(transmission_id, peer_ip);
506        // Determine the maximum number of redundant requests.
507        let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
508        // Establish whether the peers who already got the request collectively hold sufficient stake.
509        #[cfg(test)]
510        let stake_redundancy_reached = || Ok::<_, anyhow::Error>(true);
511        #[cfg(not(test))]
512        let stake_redundancy_reached = || self.pending.request_stake_redundancy_reached(&self.gateway, transmission_id);
513        // Determine if we should send a transmission request to the peer.
514        // Each peer can only receive one request at a time.
515        // We send at most `num_redundant_requests` requests, unless the stake redundancy factor hasn't been reached.
516        let should_send_request = !contains_peer_with_sent_request
517            && (num_sent_requests < num_redundant_requests || !stake_redundancy_reached()?);
518
519        // Insert the transmission ID into the pending queue.
520        self.pending.insert(transmission_id, peer_ip, Some((callback_sender, should_send_request)));
521
522        // If the number of requests is less than or equal to the the redundancy factor, send the transmission request to the peer.
523        if should_send_request {
524            trace!("Requesting transmission {} from peer '{peer_ip}'", self.format_transmission_id(transmission_id));
525            // Send the transmission request to the peer.
526            if self.gateway.send(peer_ip, Event::TransmissionRequest(transmission_id.into())).await.is_none() {
527                bail!(
528                    "Unable to fetch transmission {} - failed to send request",
529                    self.format_transmission_id(transmission_id)
530                )
531            }
532        } else {
533            debug!(
534                "Skipped sending request for transmission {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
535                self.format_transmission_id(transmission_id)
536            );
537        }
538
539        // Wait for the transmission to be fetched.
540        let transmission = timeout(MAX_FETCH_TIMEOUT, callback_receiver)
541            .await
542            .with_context(|| {
543                format!("Unable to fetch transmission {} (timeout)", self.format_transmission_id(transmission_id))
544            })?
545            .with_context(|| {
546                format!("Unable to fetch transmission {}", self.format_transmission_id(transmission_id))
547            })?;
548
549        Ok((transmission_id, transmission))
550    }
551
552    /// Handles the incoming transmission response.
553    /// This method ensures the transmission response is well-formed and matches the transmission ID.
554    fn finish_transmission_request(&self, peer_ip: SocketAddr, response: TransmissionResponse<N>) {
555        let TransmissionResponse { transmission_id, mut transmission } = response;
556        // Check if the peer IP exists in the pending queue for the given transmission ID.
557        let exists = self.pending.get_peers(transmission_id).unwrap_or_default().contains(&peer_ip);
558        // If the peer IP exists, finish the pending request.
559        if exists {
560            // Ensure the transmission is not a fee and matches the transmission ID.
561            match self.ledger.ensure_transmission_is_well_formed(transmission_id, &mut transmission) {
562                Ok(()) => {
563                    trace!(
564                        "Received valid transmission response from peer '{peer_ip}' for transmission '{}'",
565                        self.format_transmission_id(transmission_id)
566                    );
567                    // Remove the transmission ID from the pending queue.
568                    self.pending.remove(transmission_id, Some(transmission));
569                }
570                Err(err) => warn!("Failed to finish transmission response from peer '{peer_ip}': {err}"),
571            };
572        }
573    }
574
575    /// Sends the requested transmission to the specified peer.
576    fn send_transmission_response(&self, peer_ip: SocketAddr, request: TransmissionRequest<N>) {
577        let TransmissionRequest { transmission_id } = request;
578        // Attempt to retrieve the transmission.
579        if let Some(transmission) = self.get_transmission(transmission_id) {
580            // Send the transmission response to the peer.
581            let self_ = self.clone();
582            tokio::spawn(async move {
583                self_.gateway.send(peer_ip, Event::TransmissionResponse((transmission_id, transmission).into())).await;
584            });
585        }
586    }
587
588    /// Spawns a task with the given future; it should only be used for long-running tasks.
589    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
590        self.handles.lock().push(tokio::spawn(future));
591    }
592
593    /// Shuts down the worker.
594    pub(crate) fn shut_down(&self) {
595        trace!("Shutting down worker {}...", self.id);
596        // Abort the tasks.
597        self.handles.lock().iter().for_each(|handle| handle.abort());
598    }
599}
600
601#[cfg(test)]
602mod tests {
603    use super::*;
604    use crate::helpers::CALLBACK_EXPIRATION_IN_SECS;
605    use snarkos_node_bft_ledger_service::{BeginLedgerUpdateError, LedgerService, LedgerUpdateService};
606    use snarkos_node_bft_storage_service::BFTMemoryService;
607    use snarkvm::{
608        console::{network::Network, types::Field},
609        ledger::{
610            Block,
611            CheckBlockError,
612            PendingBlock,
613            committee::Committee,
614            narwhal::{BatchCertificate, Transmission, TransmissionID},
615            test_helpers::sample_execution_transaction_with_fee,
616        },
617        prelude::Address,
618    };
619
620    use bytes::Bytes;
621    use mockall::mock;
622    use rand::RngExt;
623    use std::{io, ops::Range, time::Duration};
624
625    type CurrentNetwork = snarkvm::prelude::MainnetV0;
626
627    const ITERATIONS: usize = 100;
628
629    mock! {
630        Gateway<N: Network> {}
631        #[async_trait]
632        impl<N:Network> Transport<N> for Gateway<N> {
633            fn broadcast(&self, event: Event<N>);
634            async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>>;
635        }
636    }
637
638    mock! {
639        #[derive(Debug)]
640        Ledger<N: Network> {}
641        #[async_trait]
642        impl<N: Network> LedgerService<N> for Ledger<N> {
643            fn latest_round(&self) -> u64;
644            fn latest_block_height(&self) -> u32;
645            fn latest_block(&self) -> Block<N>;
646            fn latest_restrictions_id(&self) -> Field<N>;
647            fn latest_leader(&self) -> Option<(u64, Address<N>)>;
648            fn update_latest_leader(&self, round: u64, leader: Address<N>);
649            fn contains_block_height(&self, height: u32) -> bool;
650            fn get_block_height(&self, hash: &N::BlockHash) -> Result<u32>;
651            fn get_block_hash(&self, height: u32) -> Result<N::BlockHash>;
652            fn get_block_round(&self, height: u32) -> Result<u64>;
653            fn get_block(&self, height: u32) -> Result<Block<N>>;
654            fn get_blocks(&self, heights: Range<u32>) -> Result<Vec<Block<N>>>;
655            fn get_solution(&self, solution_id: &SolutionID<N>) -> Result<Option<Solution<N>>>;
656            fn get_unconfirmed_transaction(&self, transaction_id: N::TransactionID) -> Result<Option<Transaction<N>>>;
657            fn get_batch_certificate(&self, certificate_id: &Field<N>) -> Result<BatchCertificate<N>>;
658            fn current_committee(&self) -> Result<Committee<N>>;
659            fn get_committee_for_round(&self, round: u64) -> Result<Committee<N>>;
660            fn get_committee_lookback_for_round(&self, round: u64) -> Result<Committee<N>>;
661            fn contains_certificate(&self, certificate_id: &Field<N>) -> Result<bool>;
662            fn contains_transmission(&self, transmission_id: &TransmissionID<N>) -> Result<bool>;
663            fn ensure_transmission_is_well_formed(
664                &self,
665                transmission_id: TransmissionID<N>,
666                transmission: &mut Transmission<N>,
667            ) -> Result<()>;
668            async fn check_solution_basic(
669                &self,
670                solution_id: SolutionID<N>,
671                solution: Data<Solution<N>>,
672            ) -> Result<()>;
673            async fn check_transaction_basic(
674                &self,
675                transaction_id: N::TransactionID,
676                transaction: Transaction<N>,
677            ) -> Result<()>;
678            fn check_block_subdag(&self, _block: Block<N>, _prefix: &[PendingBlock<N>]) -> Result<PendingBlock<N>, CheckBlockError<N>>;
679            fn begin_ledger_update<'a>(&'a self) -> Result<Box<dyn LedgerUpdateService<N> + 'a>, BeginLedgerUpdateError>;
680            fn transaction_spend_in_microcredits(&self, transaction: &Transaction<N>, consensus_version: ConsensusVersion) -> Result<u64>;
681            fn is_stopped(&self) -> bool;
682        }
683    }
684
685    #[tokio::test]
686    async fn test_max_redundant_requests() {
687        let num_nodes: u16 = CurrentNetwork::MAX_CERTIFICATES.first().unwrap().1;
688
689        let rng = &mut TestRng::default();
690        // Sample a committee.
691        let committee =
692            snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_size(0, num_nodes, rng);
693        let committee_clone = committee.clone();
694        // Setup the mock ledger.
695        let mut mock_ledger = MockLedger::default();
696        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
697        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
698        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
699        mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
700        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
701
702        // Ensure the maximum number of redundant requests is correct and consistent across iterations.
703        assert_eq!(max_redundant_requests(ledger, 0).unwrap(), 6, "Update me if the formula changes");
704    }
705
706    #[tokio::test]
707    async fn test_process_transmission() {
708        let rng = &mut TestRng::default();
709        // Sample a committee.
710        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
711        let committee_clone = committee.clone();
712        // Setup the mock gateway and ledger.
713        let gateway = MockGateway::default();
714        let mut mock_ledger = MockLedger::default();
715        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
716        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
717        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
718        mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
719        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
720        // Initialize the storage.
721        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1).unwrap();
722
723        // Create the Worker.
724        let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
725        let data =
726            |rng: &mut TestRng| Data::Buffer(Bytes::from((0..512).map(|_| rng.random::<u8>()).collect::<Vec<_>>()));
727        let transmission_id = TransmissionID::Solution(
728            rng.random::<u64>().into(),
729            rng.random::<<CurrentNetwork as Network>::TransmissionChecksum>(),
730        );
731        let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
732        let transmission = Transmission::Solution(data(rng));
733
734        // Process the transmission.
735        worker.process_transmission_from_peer(peer_ip, transmission_id, transmission.clone());
736        assert!(worker.contains_transmission(transmission_id));
737        assert!(worker.ready.read().contains(transmission_id));
738        assert_eq!(worker.get_transmission(transmission_id), Some(transmission));
739        // Take the transmission from the ready set.
740        assert!(worker.ready.write().remove_front().is_some());
741        assert!(!worker.ready.read().contains(transmission_id));
742    }
743
744    #[tokio::test]
745    async fn test_send_transmission() {
746        let rng = &mut TestRng::default();
747        // Sample a committee.
748        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
749        let committee_clone = committee.clone();
750        // Setup the mock gateway and ledger.
751        let mut gateway = MockGateway::default();
752        gateway.expect_send().returning(|_, _| {
753            let (_tx, rx) = oneshot::channel();
754            Some(rx)
755        });
756        let mut mock_ledger = MockLedger::default();
757        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
758        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
759        mock_ledger.expect_ensure_transmission_is_well_formed().returning(|_, _| Ok(()));
760        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
761        // Initialize the storage.
762        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1).unwrap();
763
764        // Create the Worker.
765        let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
766        let transmission_id = TransmissionID::Solution(
767            rng.random::<u64>().into(),
768            rng.random::<<CurrentNetwork as Network>::TransmissionChecksum>(),
769        );
770        let worker_ = worker.clone();
771        let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
772        let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
773        assert!(worker.pending.contains(transmission_id));
774        let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
775        // Fake the transmission response.
776        worker.finish_transmission_request(peer_ip, TransmissionResponse {
777            transmission_id,
778            transmission: Transmission::Solution(Data::Buffer(Bytes::from(vec![0; 512]))),
779        });
780        // Check the transmission was removed from the pending set.
781        assert!(!worker.pending.contains(transmission_id));
782    }
783
784    #[tokio::test]
785    async fn test_process_solution_ok() {
786        let rng = &mut TestRng::default();
787        // Sample a committee.
788        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
789        let committee_clone = committee.clone();
790        // Setup the mock gateway and ledger.
791        let mut gateway = MockGateway::default();
792        gateway.expect_send().returning(|_, _| {
793            let (_tx, rx) = oneshot::channel();
794            Some(rx)
795        });
796        let mut mock_ledger = MockLedger::default();
797        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
798        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
799        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
800        mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
801        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
802        // Initialize the storage.
803        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1).unwrap();
804
805        // Create the Worker.
806        let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
807        let solution = Data::Buffer(Bytes::from((0..512).map(|_| rng.random::<u8>()).collect::<Vec<_>>()));
808        let solution_id = rng.random::<u64>().into();
809        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
810        let transmission_id = TransmissionID::Solution(solution_id, solution_checksum);
811        let worker_ = worker.clone();
812        let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
813        let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
814        assert!(worker.pending.contains(transmission_id));
815        let result = worker.process_unconfirmed_solution(solution_id, solution).await;
816        assert!(result.is_ok());
817        assert!(!worker.pending.contains(transmission_id));
818        assert!(worker.ready.read().contains(transmission_id));
819    }
820
821    #[tokio::test]
822    async fn test_process_solution_nok() {
823        let rng = &mut TestRng::default();
824        // Sample a committee.
825        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
826        let committee_clone = committee.clone();
827        // Setup the mock gateway and ledger.
828        let mut gateway = MockGateway::default();
829        gateway.expect_send().returning(|_, _| {
830            let (_tx, rx) = oneshot::channel();
831            Some(rx)
832        });
833        let mut mock_ledger = MockLedger::default();
834        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
835        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
836        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
837        mock_ledger.expect_check_solution_basic().returning(|_, _| Err(anyhow!("")));
838        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
839        // Initialize the storage.
840        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1).unwrap();
841
842        // Create the Worker.
843        let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
844        let solution_id = rng.random::<u64>().into();
845        let solution = Data::Buffer(Bytes::from((0..512).map(|_| rng.random::<u8>()).collect::<Vec<_>>()));
846        let checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
847        let transmission_id = TransmissionID::Solution(solution_id, checksum);
848        let worker_ = worker.clone();
849        let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
850        let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
851        assert!(worker.pending.contains(transmission_id));
852        let result = worker.process_unconfirmed_solution(solution_id, solution).await;
853        assert!(result.is_err());
854        assert!(!worker.pending.contains(transmission_id));
855        assert!(!worker.ready.read().contains(transmission_id));
856    }
857
858    #[tokio::test]
859    async fn test_process_transaction_ok() {
860        let rng = &mut TestRng::default();
861        // Sample a committee.
862        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
863        let committee_clone = committee.clone();
864        // Setup the mock gateway and ledger.
865        let mut gateway = MockGateway::default();
866        gateway.expect_send().returning(|_, _| {
867            let (_tx, rx) = oneshot::channel();
868            Some(rx)
869        });
870        let mut mock_ledger = MockLedger::default();
871        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
872        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
873        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
874        mock_ledger.expect_check_transaction_basic().returning(|_, _| Ok(()));
875        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
876        // Initialize the storage.
877        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1).unwrap();
878
879        // Create the Worker.
880        let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
881        let transaction = sample_execution_transaction_with_fee(false, rng, 0);
882        let transaction_id = transaction.id();
883        let transaction_data = Data::Object(transaction);
884        let checksum = transaction_data.to_checksum::<CurrentNetwork>().unwrap();
885        let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
886        let worker_ = worker.clone();
887        let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
888        let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
889        assert!(worker.pending.contains(transmission_id));
890        let result = worker.process_unconfirmed_transaction(transaction_id, transaction_data).await;
891        assert!(result.is_ok());
892        assert!(!worker.pending.contains(transmission_id));
893        assert!(worker.ready.read().contains(transmission_id));
894    }
895
896    #[tokio::test]
897    async fn test_process_transaction_nok() {
898        let mut rng = &mut TestRng::default();
899        // Sample a committee.
900        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
901        let committee_clone = committee.clone();
902        // Setup the mock gateway and ledger.
903        let mut gateway = MockGateway::default();
904        gateway.expect_send().returning(|_, _| {
905            let (_tx, rx) = oneshot::channel();
906            Some(rx)
907        });
908        let mut mock_ledger = MockLedger::default();
909        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
910        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
911        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
912        mock_ledger.expect_check_transaction_basic().returning(|_, _| Err(anyhow!("")));
913        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
914        // Initialize the storage.
915        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1).unwrap();
916
917        // Create the Worker.
918        let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
919        let transaction_id: <CurrentNetwork as Network>::TransactionID = Field::<CurrentNetwork>::rand(&mut rng).into();
920        let transaction = Data::Buffer(Bytes::from((0..512).map(|_| rng.random::<u8>()).collect::<Vec<_>>()));
921        let checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
922        let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
923        let worker_ = worker.clone();
924        let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
925        let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
926        assert!(worker.pending.contains(transmission_id));
927        let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
928        assert!(result.is_err());
929        assert!(!worker.pending.contains(transmission_id));
930        assert!(!worker.ready.read().contains(transmission_id));
931    }
932
933    #[tokio::test]
934    async fn test_flood_transmission_requests() {
935        let rng = &mut TestRng::default();
936        // Sample a committee.
937        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
938        let committee_clone = committee.clone();
939        // Setup the mock gateway and ledger.
940        let mut gateway = MockGateway::default();
941        gateway.expect_send().returning(|_, _| {
942            let (_tx, rx) = oneshot::channel();
943            Some(rx)
944        });
945        let mut mock_ledger = MockLedger::default();
946        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
947        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
948        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
949        mock_ledger.expect_check_transaction_basic().returning(|_, _| Ok(()));
950        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
951        // Initialize the storage.
952        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1).unwrap();
953
954        // Create the Worker.
955        let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
956        let transaction = sample_execution_transaction_with_fee(false, rng, 0);
957        let transaction_id = transaction.id();
958        let transaction_data = Data::Object(transaction);
959        let checksum = transaction_data.to_checksum::<CurrentNetwork>().unwrap();
960        let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
961
962        // Determine the number of redundant requests are sent.
963        let num_redundant_requests =
964            max_redundant_requests(worker.ledger.clone(), worker.storage.current_round()).unwrap();
965        let num_flood_requests = num_redundant_requests * 10;
966        let mut peer_ips =
967            (0..num_flood_requests).map(|i| SocketAddr::from(([127, 0, 0, 1], 1234 + i as u16))).collect_vec();
968        let first_peer_ip = peer_ips[0];
969
970        // Flood the pending queue with transmission requests.
971        for i in 1..=num_flood_requests {
972            let worker_ = worker.clone();
973            let peer_ip = peer_ips.pop().unwrap();
974            tokio::spawn(async move {
975                let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
976            });
977            tokio::time::sleep(Duration::from_millis(10)).await;
978            // Check that the number of sent requests does not exceed the maximum number of redundant requests.
979            assert!(worker.pending.num_sent_requests(transmission_id) <= num_redundant_requests);
980            assert_eq!(worker.pending.num_callbacks(transmission_id), i);
981        }
982        // Check that the number of sent requests does not exceed the maximum number of redundant requests.
983        assert_eq!(worker.pending.num_sent_requests(transmission_id), num_redundant_requests);
984        assert_eq!(worker.pending.num_callbacks(transmission_id), num_flood_requests);
985
986        // Let all the requests expire.
987        tokio::time::sleep(Duration::from_secs(CALLBACK_EXPIRATION_IN_SECS as u64 + 1)).await;
988        assert_eq!(worker.pending.num_sent_requests(transmission_id), 0);
989        assert_eq!(worker.pending.num_callbacks(transmission_id), 0);
990
991        // Flood the pending queue with transmission requests again, this time to a single peer
992        for i in 1..=num_flood_requests {
993            let worker_ = worker.clone();
994            tokio::spawn(async move {
995                let _ = worker_.send_transmission_request(first_peer_ip, transmission_id).await;
996            });
997            tokio::time::sleep(Duration::from_millis(10)).await;
998            assert!(worker.pending.num_sent_requests(transmission_id) <= num_redundant_requests);
999            assert_eq!(worker.pending.num_callbacks(transmission_id), i);
1000        }
1001        // Check that the number of sent requests does not exceed the maximum number of redundant requests.
1002        assert_eq!(worker.pending.num_sent_requests(transmission_id), 1);
1003        assert_eq!(worker.pending.num_callbacks(transmission_id), num_flood_requests);
1004
1005        // Check that fulfilling a transmission request clears the pending queue.
1006        let result = worker.process_unconfirmed_transaction(transaction_id, transaction_data).await;
1007        assert!(result.is_ok());
1008        assert_eq!(worker.pending.num_sent_requests(transmission_id), 0);
1009        assert_eq!(worker.pending.num_callbacks(transmission_id), 0);
1010        assert!(!worker.pending.contains(transmission_id));
1011        assert!(worker.ready.read().contains(transmission_id));
1012    }
1013
1014    #[tokio::test]
1015    async fn test_storage_gc_on_initialization() {
1016        let rng = &mut TestRng::default();
1017
1018        for _ in 0..ITERATIONS {
1019            // Mock the ledger round.
1020            let max_gc_rounds = rng.random_range(50..=100);
1021            let latest_ledger_round = rng.random_range((max_gc_rounds + 1)..1000);
1022            let expected_gc_round = latest_ledger_round - max_gc_rounds;
1023
1024            // Sample a committee.
1025            let committee =
1026                snarkvm::ledger::committee::test_helpers::sample_committee_for_round(latest_ledger_round, rng);
1027
1028            // Setup the mock gateway and ledger.
1029            let mut mock_ledger = MockLedger::default();
1030            mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
1031
1032            let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
1033            // Initialize the storage.
1034            let storage =
1035                Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds)
1036                    .unwrap();
1037
1038            // Ensure that the storage GC round is correct.
1039            assert_eq!(storage.gc_round(), expected_gc_round);
1040        }
1041    }
1042}
1043
1044#[cfg(test)]
1045mod prop_tests {
1046    use super::*;
1047    use crate::Gateway;
1048    use snarkos_node_bft_ledger_service::MockLedgerService;
1049    use snarkvm::{
1050        console::account::Address,
1051        ledger::committee::{Committee, MIN_VALIDATOR_STAKE},
1052    };
1053
1054    use rand::RngExt;
1055    use test_strategy::proptest;
1056
1057    type CurrentNetwork = snarkvm::prelude::MainnetV0;
1058
1059    // Initializes a new test committee.
1060    fn new_test_committee(n: u16) -> Committee<CurrentNetwork> {
1061        let mut members = IndexMap::with_capacity(n as usize);
1062        for i in 0..n {
1063            // Sample the address.
1064            let rng = &mut TestRng::fixed(i as u64);
1065            let address = Address::new(rng.random());
1066            info!("Validator {i}: {address}");
1067            members.insert(address, (MIN_VALIDATOR_STAKE, false, rng.random_range(0..100)));
1068        }
1069        // Initialize the committee.
1070        Committee::<CurrentNetwork>::new(1u64, members).unwrap()
1071    }
1072
1073    #[proptest]
1074    fn worker_initialization(
1075        #[strategy(0..MAX_WORKERS)] id: u8,
1076        gateway: Gateway<CurrentNetwork>,
1077        storage: Storage<CurrentNetwork>,
1078    ) {
1079        let committee = new_test_committee(4);
1080        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(MockLedgerService::new(committee));
1081        let worker = Worker::new(id, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
1082        assert_eq!(worker.id(), id);
1083    }
1084
1085    #[proptest]
1086    fn invalid_worker_id(
1087        #[strategy(MAX_WORKERS..)] id: u8,
1088        gateway: Gateway<CurrentNetwork>,
1089        storage: Storage<CurrentNetwork>,
1090    ) {
1091        let committee = new_test_committee(4);
1092        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(MockLedgerService::new(committee));
1093        let worker = Worker::new(id, Arc::new(gateway), storage, ledger, Default::default());
1094        // TODO once Worker implements Debug, simplify this with `unwrap_err`
1095        if let Err(error) = worker {
1096            assert_eq!(error.to_string(), format!("Invalid worker ID '{id}'"));
1097        }
1098    }
1099}