Skip to main content

snarkos_node_bft/
worker.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    MAX_FETCH_TIMEOUT_IN_MS,
18    MAX_WORKERS,
19    ProposedBatch,
20    Transport,
21    events::{Event, TransmissionRequest, TransmissionResponse},
22    helpers::{Pending, Ready, Storage, WorkerReceiver, fmt_id, max_redundant_requests},
23    spawn_blocking,
24};
25use snarkos_node_bft_ledger_service::LedgerService;
26use snarkvm::{
27    console::prelude::*,
28    ledger::{
29        Transaction,
30        narwhal::{BatchHeader, Data, Transmission, TransmissionID},
31        puzzle::{Solution, SolutionID},
32    },
33};
34
35use anyhow::Context;
36use colored::{ColoredString, Colorize};
37use indexmap::{IndexMap, IndexSet};
38#[cfg(feature = "locktick")]
39use locktick::parking_lot::{Mutex, RwLock};
40#[cfg(not(feature = "locktick"))]
41use parking_lot::{Mutex, RwLock};
42use rand::seq::IteratorRandom;
43use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration};
44use tokio::{sync::oneshot, task::JoinHandle, time::timeout};
45
46/// A worker's main role is maintaining a queue of verified ("ready") transmissions,
47/// which will eventually be fetched by the primary when the primary generates a new batch.
48#[derive(Clone)]
49pub struct Worker<N: Network> {
50    /// The worker ID.
51    id: u8,
52    /// The gateway.
53    gateway: Arc<dyn Transport<N>>,
54    /// The storage.
55    storage: Storage<N>,
56    /// The ledger service.
57    ledger: Arc<dyn LedgerService<N>>,
58    /// The proposed batch.
59    proposed_batch: Arc<ProposedBatch<N>>,
60    /// The ready queue.
61    ready: Arc<RwLock<Ready<N>>>,
62    /// The pending transmissions queue.
63    pending: Arc<Pending<TransmissionID<N>, Transmission<N>>>,
64    /// The spawned handles.
65    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
66}
67
68impl<N: Network> Worker<N> {
69    /// Initializes a new worker instance.
70    pub fn new(
71        id: u8,
72        gateway: Arc<dyn Transport<N>>,
73        storage: Storage<N>,
74        ledger: Arc<dyn LedgerService<N>>,
75        proposed_batch: Arc<ProposedBatch<N>>,
76    ) -> Result<Self> {
77        // Ensure the worker ID is valid.
78        ensure!(id < MAX_WORKERS, "Invalid worker ID '{id}'");
79        // Return the worker.
80        Ok(Self {
81            id,
82            gateway,
83            storage,
84            ledger,
85            proposed_batch,
86            ready: Default::default(),
87            pending: Default::default(),
88            handles: Default::default(),
89        })
90    }
91
92    /// Run the worker instance.
93    pub fn run(&self, receiver: WorkerReceiver<N>) {
94        info!("Starting worker instance {} of the memory pool...", self.id);
95        // Start the worker handlers.
96        self.start_handlers(receiver);
97    }
98
99    /// Returns the worker ID.
100    pub const fn id(&self) -> u8 {
101        self.id
102    }
103
104    /// Returns a reference to the pending transmissions queue.
105    pub fn pending(&self) -> &Arc<Pending<TransmissionID<N>, Transmission<N>>> {
106        &self.pending
107    }
108}
109
110impl<N: Network> Worker<N> {
111    /// The maximum number of transmissions allowed in a worker.
112    pub const MAX_TRANSMISSIONS_PER_WORKER: usize =
113        BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / MAX_WORKERS as usize;
114    /// The maximum number of transmissions allowed in a worker ping.
115    pub const MAX_TRANSMISSIONS_PER_WORKER_PING: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / 10;
116
117    /// Returns the number of transmissions in the ready queue.
118    pub fn num_transmissions(&self) -> usize {
119        self.ready.read().num_transmissions()
120    }
121
122    /// Returns the number of ratifications in the ready queue.
123    pub fn num_ratifications(&self) -> usize {
124        self.ready.read().num_ratifications()
125    }
126
127    /// Returns the number of solutions in the ready queue.
128    pub fn num_solutions(&self) -> usize {
129        self.ready.read().num_solutions()
130    }
131
132    /// Returns the number of transactions in the ready queue.
133    pub fn num_transactions(&self) -> usize {
134        self.ready.read().num_transactions()
135    }
136}
137
138impl<N: Network> Worker<N> {
139    /// Returns the transmission IDs in the ready queue.
140    pub fn transmission_ids(&self) -> IndexSet<TransmissionID<N>> {
141        self.ready.read().transmission_ids()
142    }
143
144    /// Returns the transmissions in the ready queue.
145    pub fn transmissions(&self) -> IndexMap<TransmissionID<N>, Transmission<N>> {
146        self.ready.read().transmissions()
147    }
148
149    /// Returns the solutions in the ready queue.
150    pub fn solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
151        self.ready.read().solutions().into_iter()
152    }
153
154    /// Returns the transactions in the ready queue.
155    pub fn transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
156        self.ready.read().transactions().into_iter()
157    }
158}
159
160impl<N: Network> Worker<N> {
161    /// Clears the solutions from the ready queue.
162    pub(super) fn clear_solutions(&self) {
163        self.ready.write().clear_solutions()
164    }
165}
166
167impl<N: Network> Worker<N> {
168    // Helper to print the transmission ID and checksum (if any).
169    fn format_transmission_id(&self, transmission_id: TransmissionID<N>) -> ColoredString {
170        if let Some(checksum) = transmission_id.checksum() {
171            // fmt_id will suffix with `..`, so we should not use `.`  as a separator.
172            format!("{}:{}", fmt_id(transmission_id), fmt_id(checksum))
173        } else {
174            fmt_id(transmission_id)
175        }
176        .dimmed()
177    }
178
179    /// Returns `true` if the transmission ID exists in the ready queue, proposed batch, storage, or ledger.
180    pub fn contains_transmission(&self, transmission_id: impl Into<TransmissionID<N>>) -> bool {
181        let transmission_id = transmission_id.into();
182        // Check if the transmission ID exists in the ready queue, proposed batch, storage, or ledger.
183        self.ready.read().contains(transmission_id)
184            || self.proposed_batch.read().as_ref().is_some_and(|p| p.contains_transmission(transmission_id))
185            || self.storage.contains_transmission(transmission_id)
186            || self.ledger.contains_transmission(&transmission_id).unwrap_or(false)
187    }
188
189    /// Returns the transmission if it exists in the ready queue, proposed batch, storage.
190    ///
191    /// Note: We explicitly forbid retrieving a transmission from the ledger, as transmissions
192    /// in the ledger are not guaranteed to be invalid for the current batch.
193    pub fn get_transmission(&self, transmission_id: TransmissionID<N>) -> Option<Transmission<N>> {
194        // Check if the transmission ID exists in the ready queue.
195        if let Some(transmission) = self.ready.read().get(transmission_id) {
196            return Some(transmission);
197        }
198        // Check if the transmission ID exists in storage.
199        if let Some(transmission) = self.storage.get_transmission(transmission_id) {
200            return Some(transmission);
201        }
202        // Check if the transmission ID exists in the proposed batch.
203        if let Some(transmission) =
204            self.proposed_batch.read().as_ref().and_then(|p| p.get_transmission(transmission_id))
205        {
206            return Some(transmission.clone());
207        }
208        None
209    }
210
211    /// Returns the transmissions if it exists in the worker, or requests it from the specified peer.
212    pub async fn get_or_fetch_transmission(
213        &self,
214        peer_ip: SocketAddr,
215        transmission_id: TransmissionID<N>,
216    ) -> Result<(TransmissionID<N>, Transmission<N>)> {
217        // Attempt to get the transmission from the worker.
218        if let Some(transmission) = self.get_transmission(transmission_id) {
219            return Ok((transmission_id, transmission));
220        }
221        // Send a transmission request to the peer.
222        let (candidate_id, transmission) = self.send_transmission_request(peer_ip, transmission_id).await?;
223        // Ensure the transmission ID matches.
224        ensure!(candidate_id == transmission_id, "Invalid transmission ID");
225        // Return the transmission.
226        Ok((transmission_id, transmission))
227    }
228
229    /// Inserts the transmission at the front of the ready queue.
230    pub(crate) fn insert_front(&self, key: TransmissionID<N>, value: Transmission<N>) {
231        self.ready.write().insert_front(key, value);
232    }
233
234    /// Removes and returns the transmission at the front of the ready queue.
235    pub(crate) fn remove_front(&self) -> Option<(TransmissionID<N>, Transmission<N>)> {
236        self.ready.write().remove_front()
237    }
238
239    /// Reinserts the specified transmission into the ready queue.
240    pub(crate) fn reinsert(&self, transmission_id: TransmissionID<N>, transmission: Transmission<N>) -> bool {
241        // Check if the transmission ID exists.
242        if !self.contains_transmission(transmission_id) {
243            // Insert the transmission into the ready queue.
244            return self.ready.write().insert(transmission_id, transmission);
245        }
246        false
247    }
248
249    /// Broadcasts a worker ping event.
250    pub(crate) fn broadcast_ping(&self) {
251        // Retrieve the transmission IDs.
252        let transmission_ids = self
253            .ready
254            .read()
255            .transmission_ids()
256            .into_iter()
257            .choose_multiple(&mut rand::thread_rng(), Self::MAX_TRANSMISSIONS_PER_WORKER_PING)
258            .into_iter()
259            .collect::<IndexSet<_>>();
260
261        // Broadcast the ping event.
262        if !transmission_ids.is_empty() {
263            self.gateway.broadcast(Event::WorkerPing(transmission_ids.into()));
264        }
265    }
266}
267
268impl<N: Network> Worker<N> {
269    /// Handles the incoming transmission ID from a worker ping event.
270    fn process_transmission_id_from_ping(&self, peer_ip: SocketAddr, transmission_id: TransmissionID<N>) {
271        // Check if the transmission ID exists.
272        if self.contains_transmission(transmission_id) {
273            return;
274        }
275        // If the ready queue is full, then skip this transmission.
276        // Note: We must prioritize the unconfirmed solutions and unconfirmed transactions, not transmissions.
277        if self.ready.read().num_transmissions() > Self::MAX_TRANSMISSIONS_PER_WORKER {
278            return;
279        }
280        // Attempt to fetch the transmission from the peer.
281        let self_ = self.clone();
282        tokio::spawn(async move {
283            // Send a transmission request to the peer.
284            match self_.send_transmission_request(peer_ip, transmission_id).await {
285                // If the transmission was fetched, then process it.
286                Ok((candidate_id, transmission)) => {
287                    // Ensure the transmission ID matches.
288                    if candidate_id == transmission_id {
289                        // Insert the transmission into the ready queue.
290                        // Note: This method checks `contains_transmission` again, because by the time the transmission is fetched,
291                        // it could have already been inserted into the ready queue.
292                        self_.process_transmission_from_peer(peer_ip, transmission_id, transmission);
293                    }
294                }
295                // If the transmission was not fetched, then attempt to fetch it again.
296                Err(e) => {
297                    warn!(
298                        "Worker {} - Failed to fetch transmission '{}' from '{peer_ip}' (ping) - {e}",
299                        self_.id,
300                        self_.format_transmission_id(transmission_id),
301                    );
302                }
303            }
304        });
305    }
306
307    /// Handles the incoming transmission from a peer.
308    pub(crate) fn process_transmission_from_peer(
309        &self,
310        peer_ip: SocketAddr,
311        transmission_id: TransmissionID<N>,
312        transmission: Transmission<N>,
313    ) {
314        // If the transmission ID already exists, then do not store it.
315        if self.contains_transmission(transmission_id) {
316            return;
317        }
318        // Ensure the transmission ID and transmission type matches.
319        let is_well_formed = match (&transmission_id, &transmission) {
320            (TransmissionID::Solution(_, _), Transmission::Solution(_)) => true,
321            (TransmissionID::Transaction(_, _), Transmission::Transaction(_)) => true,
322            // Note: We explicitly forbid inserting ratifications into the ready queue,
323            // as the protocol currently does not support ratifications.
324            (TransmissionID::Ratification, Transmission::Ratification) => false,
325            // All other combinations are clearly invalid.
326            _ => false,
327        };
328        // If the transmission is a deserialized execution, verify it immediately.
329        // This takes heavy transaction verification out of the hot path during block generation.
330        if let (TransmissionID::Transaction(tx_id, _), Transmission::Transaction(Data::Object(tx))) =
331            (transmission_id, &transmission)
332            && tx.is_execute()
333        {
334            let self_ = self.clone();
335            let tx_ = tx.clone();
336            tokio::spawn(async move {
337                let _ = self_.ledger.check_transaction_basic(tx_id, tx_).await;
338            });
339        }
340        // If the transmission ID and transmission type matches, then insert the transmission into the ready queue.
341        if is_well_formed && self.ready.write().insert(transmission_id, transmission) {
342            trace!(
343                "Worker {} - Added transmission '{}' from '{peer_ip}'",
344                self.id,
345                self.format_transmission_id(transmission_id),
346            );
347        }
348    }
349
350    /// Handles the incoming unconfirmed solution.
351    /// Note: This method assumes the incoming solution is valid and does not exist in the ledger.
352    ///
353    /// # Returns
354    /// - `Ok(true)` if the solution was added to the ready queue.
355    /// - `Ok(false)` if the solution was valid but already exists in the ready queue.
356    /// - `Err(anyhow::Error)` if the solution is invalid.
357    pub(crate) async fn process_unconfirmed_solution(
358        &self,
359        solution_id: SolutionID<N>,
360        solution: Data<Solution<N>>,
361    ) -> Result<bool> {
362        // Construct the transmission.
363        let transmission = Transmission::Solution(solution.clone());
364        // Compute the checksum.
365        let checksum = solution.to_checksum::<N>()?;
366        // Construct the transmission ID.
367        let transmission_id = TransmissionID::Solution(solution_id, checksum);
368        // Remove the solution ID from the pending queue.
369        self.pending.remove(transmission_id, Some(transmission.clone()));
370        // Check if the solution exists.
371        if self.contains_transmission(transmission_id) {
372            return Ok(false);
373        }
374        // Check that the solution is well-formed and unique.
375        self.ledger.check_solution_basic(solution_id, solution).await?;
376        // Adds the solution to the ready queue.
377        if self.ready.write().insert(transmission_id, transmission) {
378            trace!(
379                "Worker {} - Added unconfirmed solution '{}'",
380                self.id,
381                self.format_transmission_id(transmission_id),
382            );
383        }
384        Ok(true)
385    }
386
387    /// Handles the incoming unconfirmed transaction.
388    ///
389    /// # Returns
390    /// - `Ok(true)` if the transaction was added to the ready queue.
391    /// - `Ok(false)` if the transaction was valid but already exists in the ready queue.
392    /// - `Err(anyhow::Error)` if the transaction was invalid.
393    pub(crate) async fn process_unconfirmed_transaction(
394        &self,
395        transaction_id: N::TransactionID,
396        transaction: Data<Transaction<N>>,
397    ) -> Result<bool> {
398        // Construct the transmission.
399        let transmission = Transmission::Transaction(transaction.clone());
400        // Compute the checksum.
401        let checksum = transaction.to_checksum::<N>()?;
402        // Construct the transmission ID.
403        let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
404        // Remove the transaction from the pending queue.
405        self.pending.remove(transmission_id, Some(transmission.clone()));
406        // Check if the transaction ID exists.
407        if self.contains_transmission(transmission_id) {
408            return Ok(false);
409        }
410        // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
411        let transaction = spawn_blocking!({
412            match transaction {
413                Data::Object(transaction) => Ok(transaction),
414                Data::Buffer(bytes) => {
415                    Ok(Transaction::<N>::read_le(&mut bytes.take(N::LATEST_MAX_TRANSACTION_SIZE() as u64))?)
416                }
417            }
418        })?;
419
420        // Check that the transaction is well-formed and unique.
421        self.ledger.check_transaction_basic(transaction_id, transaction).await?;
422        // Adds the transaction to the ready queue.
423        if self.ready.write().insert(transmission_id, transmission) {
424            trace!(
425                "Worker {} - Added unconfirmed transaction '{}'",
426                self.id,
427                self.format_transmission_id(transmission_id),
428            );
429        }
430        Ok(true)
431    }
432}
433
434impl<N: Network> Worker<N> {
435    /// Starts the worker handlers.
436    fn start_handlers(&self, receiver: WorkerReceiver<N>) {
437        let WorkerReceiver { mut rx_worker_ping, mut rx_transmission_request, mut rx_transmission_response } = receiver;
438
439        // Start the pending queue expiration loop.
440        let self_ = self.clone();
441        self.spawn(async move {
442            loop {
443                // Sleep briefly.
444                tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;
445
446                // Remove the expired pending certificate requests.
447                let self__ = self_.clone();
448                let _ = spawn_blocking!({
449                    self__.pending.clear_expired_callbacks();
450                    Ok(())
451                });
452            }
453        });
454
455        // Process the ping events.
456        let self_ = self.clone();
457        self.spawn(async move {
458            while let Some((peer_ip, transmission_id)) = rx_worker_ping.recv().await {
459                self_.process_transmission_id_from_ping(peer_ip, transmission_id);
460            }
461        });
462
463        // Process the transmission requests.
464        let self_ = self.clone();
465        self.spawn(async move {
466            while let Some((peer_ip, transmission_request)) = rx_transmission_request.recv().await {
467                self_.send_transmission_response(peer_ip, transmission_request);
468            }
469        });
470
471        // Process the transmission responses.
472        let self_ = self.clone();
473        self.spawn(async move {
474            while let Some((peer_ip, transmission_response)) = rx_transmission_response.recv().await {
475                // Process the transmission response.
476                let self__ = self_.clone();
477                let _ = spawn_blocking!({
478                    self__.finish_transmission_request(peer_ip, transmission_response);
479                    Ok(())
480                });
481            }
482        });
483    }
484
485    /// Sends a transmission request to the specified peer.
486    async fn send_transmission_request(
487        &self,
488        peer_ip: SocketAddr,
489        transmission_id: TransmissionID<N>,
490    ) -> Result<(TransmissionID<N>, Transmission<N>)> {
491        // Initialize a oneshot channel.
492        let (callback_sender, callback_receiver) = oneshot::channel();
493        // Determine how many sent requests are pending.
494        let num_sent_requests = self.pending.num_sent_requests(transmission_id);
495        // Determine if we've already sent a request to the peer.
496        let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(transmission_id, peer_ip);
497        // Determine the maximum number of redundant requests.
498        let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
499        // Determine if we should send a transmission request to the peer.
500        // We send at most `num_redundant_requests` requests and each peer can only receive one request at a time.
501        let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;
502
503        // Insert the transmission ID into the pending queue.
504        self.pending.insert(transmission_id, peer_ip, Some((callback_sender, should_send_request)));
505
506        // If the number of requests is less than or equal to the the redundancy factor, send the transmission request to the peer.
507        if should_send_request {
508            trace!("Requesting transmission {} from peer '{peer_ip}'", self.format_transmission_id(transmission_id));
509            // Send the transmission request to the peer.
510            if self.gateway.send(peer_ip, Event::TransmissionRequest(transmission_id.into())).await.is_none() {
511                bail!(
512                    "Unable to fetch transmission {} - failed to send request",
513                    self.format_transmission_id(transmission_id)
514                )
515            }
516        } else {
517            debug!(
518                "Skipped sending request for transmission {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
519                self.format_transmission_id(transmission_id)
520            );
521        }
522        // Wait for the transmission to be fetched.
523
524        let transmission = timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver)
525            .await
526            .with_context(|| {
527                format!("Unable to fetch transmission {} (timeout)", self.format_transmission_id(transmission_id))
528            })?
529            .with_context(|| {
530                format!("Unable to fetch transmission {}", self.format_transmission_id(transmission_id))
531            })?;
532
533        Ok((transmission_id, transmission))
534    }
535
536    /// Handles the incoming transmission response.
537    /// This method ensures the transmission response is well-formed and matches the transmission ID.
538    fn finish_transmission_request(&self, peer_ip: SocketAddr, response: TransmissionResponse<N>) {
539        let TransmissionResponse { transmission_id, mut transmission } = response;
540        // Check if the peer IP exists in the pending queue for the given transmission ID.
541        let exists = self.pending.get_peers(transmission_id).unwrap_or_default().contains(&peer_ip);
542        // If the peer IP exists, finish the pending request.
543        if exists {
544            // Ensure the transmission is not a fee and matches the transmission ID.
545            match self.ledger.ensure_transmission_is_well_formed(transmission_id, &mut transmission) {
546                Ok(()) => {
547                    trace!(
548                        "Received valid transmission response from peer '{peer_ip}' for transmission '{}'",
549                        self.format_transmission_id(transmission_id)
550                    );
551                    // Remove the transmission ID from the pending queue.
552                    self.pending.remove(transmission_id, Some(transmission));
553                }
554                Err(err) => warn!("Failed to finish transmission response from peer '{peer_ip}': {err}"),
555            };
556        }
557    }
558
559    /// Sends the requested transmission to the specified peer.
560    fn send_transmission_response(&self, peer_ip: SocketAddr, request: TransmissionRequest<N>) {
561        let TransmissionRequest { transmission_id } = request;
562        // Attempt to retrieve the transmission.
563        if let Some(transmission) = self.get_transmission(transmission_id) {
564            // Send the transmission response to the peer.
565            let self_ = self.clone();
566            tokio::spawn(async move {
567                self_.gateway.send(peer_ip, Event::TransmissionResponse((transmission_id, transmission).into())).await;
568            });
569        }
570    }
571
572    /// Spawns a task with the given future; it should only be used for long-running tasks.
573    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
574        self.handles.lock().push(tokio::spawn(future));
575    }
576
577    /// Shuts down the worker.
578    pub(crate) fn shut_down(&self) {
579        trace!("Shutting down worker {}...", self.id);
580        // Abort the tasks.
581        self.handles.lock().iter().for_each(|handle| handle.abort());
582    }
583}
584
585#[cfg(test)]
586mod tests {
587    use super::*;
588    use crate::helpers::CALLBACK_EXPIRATION_IN_SECS;
589    use snarkos_node_bft_ledger_service::LedgerService;
590    use snarkos_node_bft_storage_service::BFTMemoryService;
591    use snarkvm::{
592        console::{network::Network, types::Field},
593        ledger::{
594            Block,
595            CheckBlockError,
596            PendingBlock,
597            committee::Committee,
598            narwhal::{BatchCertificate, Subdag, Transmission, TransmissionID},
599            test_helpers::sample_execution_transaction_with_fee,
600        },
601        prelude::Address,
602    };
603
604    use bytes::Bytes;
605    use indexmap::IndexMap;
606    use mockall::mock;
607    use std::{io, ops::Range};
608
609    type CurrentNetwork = snarkvm::prelude::MainnetV0;
610
611    const ITERATIONS: usize = 100;
612
613    mock! {
614        Gateway<N: Network> {}
615        #[async_trait]
616        impl<N:Network> Transport<N> for Gateway<N> {
617            fn broadcast(&self, event: Event<N>);
618            async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>>;
619        }
620    }
621
622    mock! {
623        #[derive(Debug)]
624        Ledger<N: Network> {}
625        #[async_trait]
626        impl<N: Network> LedgerService<N> for Ledger<N> {
627            fn latest_round(&self) -> u64;
628            fn latest_block_height(&self) -> u32;
629            fn latest_block(&self) -> Block<N>;
630            fn latest_restrictions_id(&self) -> Field<N>;
631            fn latest_leader(&self) -> Option<(u64, Address<N>)>;
632            fn update_latest_leader(&self, round: u64, leader: Address<N>);
633            fn contains_block_height(&self, height: u32) -> bool;
634            fn get_block_height(&self, hash: &N::BlockHash) -> Result<u32>;
635            fn get_block_hash(&self, height: u32) -> Result<N::BlockHash>;
636            fn get_block_round(&self, height: u32) -> Result<u64>;
637            fn get_block(&self, height: u32) -> Result<Block<N>>;
638            fn get_blocks(&self, heights: Range<u32>) -> Result<Vec<Block<N>>>;
639            fn get_solution(&self, solution_id: &SolutionID<N>) -> Result<Solution<N>>;
640            fn get_unconfirmed_transaction(&self, transaction_id: N::TransactionID) -> Result<Transaction<N>>;
641            fn get_batch_certificate(&self, certificate_id: &Field<N>) -> Result<BatchCertificate<N>>;
642            fn current_committee(&self) -> Result<Committee<N>>;
643            fn get_committee_for_round(&self, round: u64) -> Result<Committee<N>>;
644            fn get_committee_lookback_for_round(&self, round: u64) -> Result<Committee<N>>;
645            fn contains_certificate(&self, certificate_id: &Field<N>) -> Result<bool>;
646            fn contains_transmission(&self, transmission_id: &TransmissionID<N>) -> Result<bool>;
647            fn ensure_transmission_is_well_formed(
648                &self,
649                transmission_id: TransmissionID<N>,
650                transmission: &mut Transmission<N>,
651            ) -> Result<()>;
652            async fn check_solution_basic(
653                &self,
654                solution_id: SolutionID<N>,
655                solution: Data<Solution<N>>,
656            ) -> Result<()>;
657            async fn check_transaction_basic(
658                &self,
659                transaction_id: N::TransactionID,
660                transaction: Transaction<N>,
661            ) -> Result<()>;
662            fn check_block_subdag(&self, _block: Block<N>, _prefix: &[PendingBlock<N>]) -> std::result::Result<PendingBlock<N>, CheckBlockError<N>>;
663            fn check_block_content(&self, _block: PendingBlock<N>) -> std::result::Result<Block<N>, CheckBlockError<N>>;
664            fn check_next_block(&self, block: &Block<N>) -> Result<()>;
665            fn prepare_advance_to_next_quorum_block(
666                &self,
667                subdag: Subdag<N>,
668                transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
669            ) -> Result<Block<N>>;
670            fn advance_to_next_block(&self, block: &Block<N>) -> Result<()>;
671            fn transaction_spend_in_microcredits(&self, transaction: &Transaction<N>, consensus_version: ConsensusVersion) -> Result<u64>;
672        }
673    }
674
675    #[tokio::test]
676    async fn test_max_redundant_requests() {
677        let num_nodes: u16 = CurrentNetwork::MAX_CERTIFICATES.first().unwrap().1;
678
679        let rng = &mut TestRng::default();
680        // Sample a committee.
681        let committee =
682            snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_size(0, num_nodes, rng);
683        let committee_clone = committee.clone();
684        // Setup the mock ledger.
685        let mut mock_ledger = MockLedger::default();
686        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
687        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
688        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
689        mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
690        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
691
692        // Ensure the maximum number of redundant requests is correct and consistent across iterations.
693        assert_eq!(max_redundant_requests(ledger, 0).unwrap(), 6, "Update me if the formula changes");
694    }
695
696    #[tokio::test]
697    async fn test_process_transmission() {
698        let rng = &mut TestRng::default();
699        // Sample a committee.
700        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
701        let committee_clone = committee.clone();
702        // Setup the mock gateway and ledger.
703        let gateway = MockGateway::default();
704        let mut mock_ledger = MockLedger::default();
705        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
706        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
707        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
708        mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
709        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
710        // Initialize the storage.
711        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
712
713        // Create the Worker.
714        let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
715        let data =
716            |rng: &mut TestRng| Data::Buffer(Bytes::from((0..512).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>()));
717        let transmission_id = TransmissionID::Solution(
718            rng.r#gen::<u64>().into(),
719            rng.r#gen::<<CurrentNetwork as Network>::TransmissionChecksum>(),
720        );
721        let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
722        let transmission = Transmission::Solution(data(rng));
723
724        // Process the transmission.
725        worker.process_transmission_from_peer(peer_ip, transmission_id, transmission.clone());
726        assert!(worker.contains_transmission(transmission_id));
727        assert!(worker.ready.read().contains(transmission_id));
728        assert_eq!(worker.get_transmission(transmission_id), Some(transmission));
729        // Take the transmission from the ready set.
730        assert!(worker.ready.write().remove_front().is_some());
731        assert!(!worker.ready.read().contains(transmission_id));
732    }
733
734    #[tokio::test]
735    async fn test_send_transmission() {
736        let rng = &mut TestRng::default();
737        // Sample a committee.
738        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
739        let committee_clone = committee.clone();
740        // Setup the mock gateway and ledger.
741        let mut gateway = MockGateway::default();
742        gateway.expect_send().returning(|_, _| {
743            let (_tx, rx) = oneshot::channel();
744            Some(rx)
745        });
746        let mut mock_ledger = MockLedger::default();
747        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
748        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
749        mock_ledger.expect_ensure_transmission_is_well_formed().returning(|_, _| Ok(()));
750        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
751        // Initialize the storage.
752        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
753
754        // Create the Worker.
755        let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
756        let transmission_id = TransmissionID::Solution(
757            rng.r#gen::<u64>().into(),
758            rng.r#gen::<<CurrentNetwork as Network>::TransmissionChecksum>(),
759        );
760        let worker_ = worker.clone();
761        let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
762        let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
763        assert!(worker.pending.contains(transmission_id));
764        let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
765        // Fake the transmission response.
766        worker.finish_transmission_request(peer_ip, TransmissionResponse {
767            transmission_id,
768            transmission: Transmission::Solution(Data::Buffer(Bytes::from(vec![0; 512]))),
769        });
770        // Check the transmission was removed from the pending set.
771        assert!(!worker.pending.contains(transmission_id));
772    }
773
774    #[tokio::test]
775    async fn test_process_solution_ok() {
776        let rng = &mut TestRng::default();
777        // Sample a committee.
778        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
779        let committee_clone = committee.clone();
780        // Setup the mock gateway and ledger.
781        let mut gateway = MockGateway::default();
782        gateway.expect_send().returning(|_, _| {
783            let (_tx, rx) = oneshot::channel();
784            Some(rx)
785        });
786        let mut mock_ledger = MockLedger::default();
787        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
788        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
789        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
790        mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
791        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
792        // Initialize the storage.
793        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
794
795        // Create the Worker.
796        let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
797        let solution = Data::Buffer(Bytes::from((0..512).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>()));
798        let solution_id = rng.r#gen::<u64>().into();
799        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
800        let transmission_id = TransmissionID::Solution(solution_id, solution_checksum);
801        let worker_ = worker.clone();
802        let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
803        let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
804        assert!(worker.pending.contains(transmission_id));
805        let result = worker.process_unconfirmed_solution(solution_id, solution).await;
806        assert!(result.is_ok());
807        assert!(!worker.pending.contains(transmission_id));
808        assert!(worker.ready.read().contains(transmission_id));
809    }
810
811    #[tokio::test]
812    async fn test_process_solution_nok() {
813        let rng = &mut TestRng::default();
814        // Sample a committee.
815        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
816        let committee_clone = committee.clone();
817        // Setup the mock gateway and ledger.
818        let mut gateway = MockGateway::default();
819        gateway.expect_send().returning(|_, _| {
820            let (_tx, rx) = oneshot::channel();
821            Some(rx)
822        });
823        let mut mock_ledger = MockLedger::default();
824        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
825        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
826        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
827        mock_ledger.expect_check_solution_basic().returning(|_, _| Err(anyhow!("")));
828        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
829        // Initialize the storage.
830        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
831
832        // Create the Worker.
833        let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
834        let solution_id = rng.r#gen::<u64>().into();
835        let solution = Data::Buffer(Bytes::from((0..512).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>()));
836        let checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
837        let transmission_id = TransmissionID::Solution(solution_id, checksum);
838        let worker_ = worker.clone();
839        let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
840        let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
841        assert!(worker.pending.contains(transmission_id));
842        let result = worker.process_unconfirmed_solution(solution_id, solution).await;
843        assert!(result.is_err());
844        assert!(!worker.pending.contains(transmission_id));
845        assert!(!worker.ready.read().contains(transmission_id));
846    }
847
848    #[tokio::test]
849    async fn test_process_transaction_ok() {
850        let rng = &mut TestRng::default();
851        // Sample a committee.
852        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
853        let committee_clone = committee.clone();
854        // Setup the mock gateway and ledger.
855        let mut gateway = MockGateway::default();
856        gateway.expect_send().returning(|_, _| {
857            let (_tx, rx) = oneshot::channel();
858            Some(rx)
859        });
860        let mut mock_ledger = MockLedger::default();
861        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
862        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
863        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
864        mock_ledger.expect_check_transaction_basic().returning(|_, _| Ok(()));
865        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
866        // Initialize the storage.
867        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
868
869        // Create the Worker.
870        let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
871        let transaction = sample_execution_transaction_with_fee(false, rng, 0);
872        let transaction_id = transaction.id();
873        let transaction_data = Data::Object(transaction);
874        let checksum = transaction_data.to_checksum::<CurrentNetwork>().unwrap();
875        let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
876        let worker_ = worker.clone();
877        let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
878        let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
879        assert!(worker.pending.contains(transmission_id));
880        let result = worker.process_unconfirmed_transaction(transaction_id, transaction_data).await;
881        assert!(result.is_ok());
882        assert!(!worker.pending.contains(transmission_id));
883        assert!(worker.ready.read().contains(transmission_id));
884    }
885
886    #[tokio::test]
887    async fn test_process_transaction_nok() {
888        let mut rng = &mut TestRng::default();
889        // Sample a committee.
890        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
891        let committee_clone = committee.clone();
892        // Setup the mock gateway and ledger.
893        let mut gateway = MockGateway::default();
894        gateway.expect_send().returning(|_, _| {
895            let (_tx, rx) = oneshot::channel();
896            Some(rx)
897        });
898        let mut mock_ledger = MockLedger::default();
899        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
900        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
901        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
902        mock_ledger.expect_check_transaction_basic().returning(|_, _| Err(anyhow!("")));
903        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
904        // Initialize the storage.
905        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
906
907        // Create the Worker.
908        let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
909        let transaction_id: <CurrentNetwork as Network>::TransactionID = Field::<CurrentNetwork>::rand(&mut rng).into();
910        let transaction = Data::Buffer(Bytes::from((0..512).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>()));
911        let checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
912        let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
913        let worker_ = worker.clone();
914        let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
915        let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
916        assert!(worker.pending.contains(transmission_id));
917        let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
918        assert!(result.is_err());
919        assert!(!worker.pending.contains(transmission_id));
920        assert!(!worker.ready.read().contains(transmission_id));
921    }
922
923    #[tokio::test]
924    async fn test_flood_transmission_requests() {
925        let rng = &mut TestRng::default();
926        // Sample a committee.
927        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
928        let committee_clone = committee.clone();
929        // Setup the mock gateway and ledger.
930        let mut gateway = MockGateway::default();
931        gateway.expect_send().returning(|_, _| {
932            let (_tx, rx) = oneshot::channel();
933            Some(rx)
934        });
935        let mut mock_ledger = MockLedger::default();
936        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
937        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
938        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
939        mock_ledger.expect_check_transaction_basic().returning(|_, _| Ok(()));
940        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
941        // Initialize the storage.
942        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
943
944        // Create the Worker.
945        let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
946        let transaction = sample_execution_transaction_with_fee(false, rng, 0);
947        let transaction_id = transaction.id();
948        let transaction_data = Data::Object(transaction);
949        let checksum = transaction_data.to_checksum::<CurrentNetwork>().unwrap();
950        let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
951
952        // Determine the number of redundant requests are sent.
953        let num_redundant_requests =
954            max_redundant_requests(worker.ledger.clone(), worker.storage.current_round()).unwrap();
955        let num_flood_requests = num_redundant_requests * 10;
956        let mut peer_ips =
957            (0..num_flood_requests).map(|i| SocketAddr::from(([127, 0, 0, 1], 1234 + i as u16))).collect_vec();
958        let first_peer_ip = peer_ips[0];
959
960        // Flood the pending queue with transmission requests.
961        for i in 1..=num_flood_requests {
962            let worker_ = worker.clone();
963            let peer_ip = peer_ips.pop().unwrap();
964            tokio::spawn(async move {
965                let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
966            });
967            tokio::time::sleep(Duration::from_millis(10)).await;
968            // Check that the number of sent requests does not exceed the maximum number of redundant requests.
969            assert!(worker.pending.num_sent_requests(transmission_id) <= num_redundant_requests);
970            assert_eq!(worker.pending.num_callbacks(transmission_id), i);
971        }
972        // Check that the number of sent requests does not exceed the maximum number of redundant requests.
973        assert_eq!(worker.pending.num_sent_requests(transmission_id), num_redundant_requests);
974        assert_eq!(worker.pending.num_callbacks(transmission_id), num_flood_requests);
975
976        // Let all the requests expire.
977        tokio::time::sleep(Duration::from_secs(CALLBACK_EXPIRATION_IN_SECS as u64 + 1)).await;
978        assert_eq!(worker.pending.num_sent_requests(transmission_id), 0);
979        assert_eq!(worker.pending.num_callbacks(transmission_id), 0);
980
981        // Flood the pending queue with transmission requests again, this time to a single peer
982        for i in 1..=num_flood_requests {
983            let worker_ = worker.clone();
984            tokio::spawn(async move {
985                let _ = worker_.send_transmission_request(first_peer_ip, transmission_id).await;
986            });
987            tokio::time::sleep(Duration::from_millis(10)).await;
988            assert!(worker.pending.num_sent_requests(transmission_id) <= num_redundant_requests);
989            assert_eq!(worker.pending.num_callbacks(transmission_id), i);
990        }
991        // Check that the number of sent requests does not exceed the maximum number of redundant requests.
992        assert_eq!(worker.pending.num_sent_requests(transmission_id), 1);
993        assert_eq!(worker.pending.num_callbacks(transmission_id), num_flood_requests);
994
995        // Check that fulfilling a transmission request clears the pending queue.
996        let result = worker.process_unconfirmed_transaction(transaction_id, transaction_data).await;
997        assert!(result.is_ok());
998        assert_eq!(worker.pending.num_sent_requests(transmission_id), 0);
999        assert_eq!(worker.pending.num_callbacks(transmission_id), 0);
1000        assert!(!worker.pending.contains(transmission_id));
1001        assert!(worker.ready.read().contains(transmission_id));
1002    }
1003
1004    #[tokio::test]
1005    async fn test_storage_gc_on_initialization() {
1006        let rng = &mut TestRng::default();
1007
1008        for _ in 0..ITERATIONS {
1009            // Mock the ledger round.
1010            let max_gc_rounds = rng.gen_range(50..=100);
1011            let latest_ledger_round = rng.gen_range((max_gc_rounds + 1)..1000);
1012            let expected_gc_round = latest_ledger_round - max_gc_rounds;
1013
1014            // Sample a committee.
1015            let committee =
1016                snarkvm::ledger::committee::test_helpers::sample_committee_for_round(latest_ledger_round, rng);
1017
1018            // Setup the mock gateway and ledger.
1019            let mut mock_ledger = MockLedger::default();
1020            mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
1021
1022            let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
1023            // Initialize the storage.
1024            let storage =
1025                Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1026
1027            // Ensure that the storage GC round is correct.
1028            assert_eq!(storage.gc_round(), expected_gc_round);
1029        }
1030    }
1031}
1032
1033#[cfg(test)]
1034mod prop_tests {
1035    use super::*;
1036    use crate::Gateway;
1037    use snarkos_node_bft_ledger_service::MockLedgerService;
1038    use snarkvm::{
1039        console::account::Address,
1040        ledger::committee::{Committee, MIN_VALIDATOR_STAKE},
1041    };
1042
1043    use test_strategy::proptest;
1044
1045    type CurrentNetwork = snarkvm::prelude::MainnetV0;
1046
1047    // Initializes a new test committee.
1048    fn new_test_committee(n: u16) -> Committee<CurrentNetwork> {
1049        let mut members = IndexMap::with_capacity(n as usize);
1050        for i in 0..n {
1051            // Sample the address.
1052            let rng = &mut TestRng::fixed(i as u64);
1053            let address = Address::new(rng.r#gen());
1054            info!("Validator {i}: {address}");
1055            members.insert(address, (MIN_VALIDATOR_STAKE, false, rng.gen_range(0..100)));
1056        }
1057        // Initialize the committee.
1058        Committee::<CurrentNetwork>::new(1u64, members).unwrap()
1059    }
1060
1061    #[proptest]
1062    fn worker_initialization(
1063        #[strategy(0..MAX_WORKERS)] id: u8,
1064        gateway: Gateway<CurrentNetwork>,
1065        storage: Storage<CurrentNetwork>,
1066    ) {
1067        let committee = new_test_committee(4);
1068        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(MockLedgerService::new(committee));
1069        let worker = Worker::new(id, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
1070        assert_eq!(worker.id(), id);
1071    }
1072
1073    #[proptest]
1074    fn invalid_worker_id(
1075        #[strategy(MAX_WORKERS..)] id: u8,
1076        gateway: Gateway<CurrentNetwork>,
1077        storage: Storage<CurrentNetwork>,
1078    ) {
1079        let committee = new_test_committee(4);
1080        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(MockLedgerService::new(committee));
1081        let worker = Worker::new(id, Arc::new(gateway), storage, ledger, Default::default());
1082        // TODO once Worker implements Debug, simplify this with `unwrap_err`
1083        if let Err(error) = worker {
1084            assert_eq!(error.to_string(), format!("Invalid worker ID '{id}'"));
1085        }
1086    }
1087}