// snarkos_node_bft/worker.rs

// Copyright (c) 2019-2025 Provable Inc.
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

16use crate::{
17    MAX_FETCH_TIMEOUT_IN_MS,
18    MAX_WORKERS,
19    ProposedBatch,
20    Transport,
21    events::{Event, TransmissionRequest, TransmissionResponse},
22    helpers::{Pending, Ready, Storage, WorkerReceiver, fmt_id, max_redundant_requests},
23    spawn_blocking,
24};
25use snarkos_node_bft_ledger_service::LedgerService;
26use snarkvm::{
27    console::prelude::*,
28    ledger::{
29        block::Transaction,
30        narwhal::{BatchHeader, Data, Transmission, TransmissionID},
31        puzzle::{Solution, SolutionID},
32    },
33};
34
35use colored::Colorize;
36use indexmap::{IndexMap, IndexSet};
37#[cfg(feature = "locktick")]
38use locktick::parking_lot::{Mutex, RwLock};
39#[cfg(not(feature = "locktick"))]
40use parking_lot::{Mutex, RwLock};
41use rand::seq::IteratorRandom;
42use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration};
43use tokio::{sync::oneshot, task::JoinHandle, time::timeout};
44
45#[derive(Clone)]
46pub struct Worker<N: Network> {
47    /// The worker ID.
48    id: u8,
49    /// The gateway.
50    gateway: Arc<dyn Transport<N>>,
51    /// The storage.
52    storage: Storage<N>,
53    /// The ledger service.
54    ledger: Arc<dyn LedgerService<N>>,
55    /// The proposed batch.
56    proposed_batch: Arc<ProposedBatch<N>>,
57    /// The ready queue.
58    ready: Arc<RwLock<Ready<N>>>,
59    /// The pending transmissions queue.
60    pending: Arc<Pending<TransmissionID<N>, Transmission<N>>>,
61    /// The spawned handles.
62    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
63}
64
65impl<N: Network> Worker<N> {
66    /// Initializes a new worker instance.
67    pub fn new(
68        id: u8,
69        gateway: Arc<dyn Transport<N>>,
70        storage: Storage<N>,
71        ledger: Arc<dyn LedgerService<N>>,
72        proposed_batch: Arc<ProposedBatch<N>>,
73    ) -> Result<Self> {
74        // Ensure the worker ID is valid.
75        ensure!(id < MAX_WORKERS, "Invalid worker ID '{id}'");
76        // Return the worker.
77        Ok(Self {
78            id,
79            gateway,
80            storage,
81            ledger,
82            proposed_batch,
83            ready: Default::default(),
84            pending: Default::default(),
85            handles: Default::default(),
86        })
87    }
88
89    /// Run the worker instance.
90    pub fn run(&self, receiver: WorkerReceiver<N>) {
91        info!("Starting worker instance {} of the memory pool...", self.id);
92        // Start the worker handlers.
93        self.start_handlers(receiver);
94    }
95
96    /// Returns the worker ID.
97    pub const fn id(&self) -> u8 {
98        self.id
99    }
100
101    /// Returns a reference to the pending transmissions queue.
102    pub fn pending(&self) -> &Arc<Pending<TransmissionID<N>, Transmission<N>>> {
103        &self.pending
104    }
105}
106
107impl<N: Network> Worker<N> {
108    /// The maximum number of transmissions allowed in a worker.
109    pub const MAX_TRANSMISSIONS_PER_WORKER: usize =
110        BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / MAX_WORKERS as usize;
111    /// The maximum number of transmissions allowed in a worker ping.
112    pub const MAX_TRANSMISSIONS_PER_WORKER_PING: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / 10;
113
114    // transmissions
115
116    /// Returns the number of transmissions in the ready queue.
117    pub fn num_transmissions(&self) -> usize {
118        self.ready.read().num_transmissions()
119    }
120
121    /// Returns the number of ratifications in the ready queue.
122    pub fn num_ratifications(&self) -> usize {
123        self.ready.read().num_ratifications()
124    }
125
126    /// Returns the number of solutions in the ready queue.
127    pub fn num_solutions(&self) -> usize {
128        self.ready.read().num_solutions()
129    }
130
131    /// Returns the number of transactions in the ready queue.
132    pub fn num_transactions(&self) -> usize {
133        self.ready.read().num_transactions()
134    }
135}
136
137impl<N: Network> Worker<N> {
138    /// Returns the transmission IDs in the ready queue.
139    pub fn transmission_ids(&self) -> IndexSet<TransmissionID<N>> {
140        self.ready.read().transmission_ids()
141    }
142
143    /// Returns the transmissions in the ready queue.
144    pub fn transmissions(&self) -> IndexMap<TransmissionID<N>, Transmission<N>> {
145        self.ready.read().transmissions()
146    }
147
148    /// Returns the solutions in the ready queue.
149    pub fn solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
150        self.ready.read().solutions().into_iter()
151    }
152
153    /// Returns the transactions in the ready queue.
154    pub fn transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
155        self.ready.read().transactions().into_iter()
156    }
157}
158
159impl<N: Network> Worker<N> {
160    /// Clears the solutions from the ready queue.
161    pub(super) fn clear_solutions(&self) {
162        self.ready.write().clear_solutions()
163    }
164}
165
166impl<N: Network> Worker<N> {
167    /// Returns `true` if the transmission ID exists in the ready queue, proposed batch, storage, or ledger.
168    pub fn contains_transmission(&self, transmission_id: impl Into<TransmissionID<N>>) -> bool {
169        let transmission_id = transmission_id.into();
170        // Check if the transmission ID exists in the ready queue, proposed batch, storage, or ledger.
171        self.ready.read().contains(transmission_id)
172            || self.proposed_batch.read().as_ref().map_or(false, |p| p.contains_transmission(transmission_id))
173            || self.storage.contains_transmission(transmission_id)
174            || self.ledger.contains_transmission(&transmission_id).unwrap_or(false)
175    }
176
177    /// Returns the transmission if it exists in the ready queue, proposed batch, storage.
178    ///
179    /// Note: We explicitly forbid retrieving a transmission from the ledger, as transmissions
180    /// in the ledger are not guaranteed to be invalid for the current batch.
181    pub fn get_transmission(&self, transmission_id: TransmissionID<N>) -> Option<Transmission<N>> {
182        // Check if the transmission ID exists in the ready queue.
183        if let Some(transmission) = self.ready.read().get(transmission_id) {
184            return Some(transmission);
185        }
186        // Check if the transmission ID exists in storage.
187        if let Some(transmission) = self.storage.get_transmission(transmission_id) {
188            return Some(transmission);
189        }
190        // Check if the transmission ID exists in the proposed batch.
191        if let Some(transmission) =
192            self.proposed_batch.read().as_ref().and_then(|p| p.get_transmission(transmission_id))
193        {
194            return Some(transmission.clone());
195        }
196        None
197    }
198
199    /// Returns the transmissions if it exists in the worker, or requests it from the specified peer.
200    pub async fn get_or_fetch_transmission(
201        &self,
202        peer_ip: SocketAddr,
203        transmission_id: TransmissionID<N>,
204    ) -> Result<(TransmissionID<N>, Transmission<N>)> {
205        // Attempt to get the transmission from the worker.
206        if let Some(transmission) = self.get_transmission(transmission_id) {
207            return Ok((transmission_id, transmission));
208        }
209        // Send a transmission request to the peer.
210        let (candidate_id, transmission) = self.send_transmission_request(peer_ip, transmission_id).await?;
211        // Ensure the transmission ID matches.
212        ensure!(candidate_id == transmission_id, "Invalid transmission ID");
213        // Return the transmission.
214        Ok((transmission_id, transmission))
215    }
216
217    /// Inserts the transmission at the front of the ready queue.
218    pub(crate) fn insert_front(&self, key: TransmissionID<N>, value: Transmission<N>) {
219        self.ready.write().insert_front(key, value);
220    }
221
222    /// Removes and returns the transmission at the front of the ready queue.
223    pub(crate) fn remove_front(&self) -> Option<(TransmissionID<N>, Transmission<N>)> {
224        self.ready.write().remove_front()
225    }
226
227    /// Reinserts the specified transmission into the ready queue.
228    pub(crate) fn reinsert(&self, transmission_id: TransmissionID<N>, transmission: Transmission<N>) -> bool {
229        // Check if the transmission ID exists.
230        if !self.contains_transmission(transmission_id) {
231            // Insert the transmission into the ready queue.
232            return self.ready.write().insert(transmission_id, transmission);
233        }
234        false
235    }
236
237    /// Broadcasts a worker ping event.
238    pub(crate) fn broadcast_ping(&self) {
239        // Retrieve the transmission IDs.
240        let transmission_ids = self
241            .ready
242            .read()
243            .transmission_ids()
244            .into_iter()
245            .choose_multiple(&mut rand::thread_rng(), Self::MAX_TRANSMISSIONS_PER_WORKER_PING)
246            .into_iter()
247            .collect::<IndexSet<_>>();
248
249        // Broadcast the ping event.
250        if !transmission_ids.is_empty() {
251            self.gateway.broadcast(Event::WorkerPing(transmission_ids.into()));
252        }
253    }
254}
255
256impl<N: Network> Worker<N> {
257    /// Handles the incoming transmission ID from a worker ping event.
258    fn process_transmission_id_from_ping(&self, peer_ip: SocketAddr, transmission_id: TransmissionID<N>) {
259        // Check if the transmission ID exists.
260        if self.contains_transmission(transmission_id) {
261            return;
262        }
263        // If the ready queue is full, then skip this transmission.
264        // Note: We must prioritize the unconfirmed solutions and unconfirmed transactions, not transmissions.
265        if self.ready.read().num_transmissions() > Self::MAX_TRANSMISSIONS_PER_WORKER {
266            return;
267        }
268        // Attempt to fetch the transmission from the peer.
269        let self_ = self.clone();
270        tokio::spawn(async move {
271            // Send a transmission request to the peer.
272            match self_.send_transmission_request(peer_ip, transmission_id).await {
273                // If the transmission was fetched, then process it.
274                Ok((candidate_id, transmission)) => {
275                    // Ensure the transmission ID matches.
276                    if candidate_id == transmission_id {
277                        // Insert the transmission into the ready queue.
278                        // Note: This method checks `contains_transmission` again, because by the time the transmission is fetched,
279                        // it could have already been inserted into the ready queue.
280                        self_.process_transmission_from_peer(peer_ip, transmission_id, transmission);
281                    }
282                }
283                // If the transmission was not fetched, then attempt to fetch it again.
284                Err(e) => {
285                    warn!(
286                        "Worker {} - Failed to fetch transmission '{}.{}' from '{peer_ip}' (ping) - {e}",
287                        self_.id,
288                        fmt_id(transmission_id),
289                        fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
290                    );
291                }
292            }
293        });
294    }
295
296    /// Handles the incoming transmission from a peer.
297    pub(crate) fn process_transmission_from_peer(
298        &self,
299        peer_ip: SocketAddr,
300        transmission_id: TransmissionID<N>,
301        transmission: Transmission<N>,
302    ) {
303        // If the transmission ID already exists, then do not store it.
304        if self.contains_transmission(transmission_id) {
305            return;
306        }
307        // Ensure the transmission ID and transmission type matches.
308        let is_well_formed = match (&transmission_id, &transmission) {
309            (TransmissionID::Solution(_, _), Transmission::Solution(_)) => true,
310            (TransmissionID::Transaction(_, _), Transmission::Transaction(_)) => true,
311            // Note: We explicitly forbid inserting ratifications into the ready queue,
312            // as the protocol currently does not support ratifications.
313            (TransmissionID::Ratification, Transmission::Ratification) => false,
314            // All other combinations are clearly invalid.
315            _ => false,
316        };
317        // If the transmission is a deserialized execution, verify it immediately.
318        // This takes heavy transaction verification out of the hot path during block generation.
319        if let (TransmissionID::Transaction(tx_id, _), Transmission::Transaction(Data::Object(tx))) =
320            (transmission_id, &transmission)
321        {
322            if tx.is_execute() {
323                let self_ = self.clone();
324                let tx_ = tx.clone();
325                tokio::spawn(async move {
326                    let _ = self_.ledger.check_transaction_basic(tx_id, tx_).await;
327                });
328            }
329        }
330        // If the transmission ID and transmission type matches, then insert the transmission into the ready queue.
331        if is_well_formed && self.ready.write().insert(transmission_id, transmission) {
332            trace!(
333                "Worker {} - Added transmission '{}.{}' from '{peer_ip}'",
334                self.id,
335                fmt_id(transmission_id),
336                fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
337            );
338        }
339    }
340
341    /// Handles the incoming unconfirmed solution.
342    /// Note: This method assumes the incoming solution is valid and does not exist in the ledger.
343    pub(crate) async fn process_unconfirmed_solution(
344        &self,
345        solution_id: SolutionID<N>,
346        solution: Data<Solution<N>>,
347    ) -> Result<()> {
348        // Construct the transmission.
349        let transmission = Transmission::Solution(solution.clone());
350        // Compute the checksum.
351        let checksum = solution.to_checksum::<N>()?;
352        // Construct the transmission ID.
353        let transmission_id = TransmissionID::Solution(solution_id, checksum);
354        // Remove the solution ID from the pending queue.
355        self.pending.remove(transmission_id, Some(transmission.clone()));
356        // Check if the solution exists.
357        if self.contains_transmission(transmission_id) {
358            bail!("Solution '{}.{}' already exists.", fmt_id(solution_id), fmt_id(checksum).dimmed());
359        }
360        // Check that the solution is well-formed and unique.
361        self.ledger.check_solution_basic(solution_id, solution).await?;
362        // Adds the solution to the ready queue.
363        if self.ready.write().insert(transmission_id, transmission) {
364            trace!(
365                "Worker {} - Added unconfirmed solution '{}.{}'",
366                self.id,
367                fmt_id(solution_id),
368                fmt_id(checksum).dimmed()
369            );
370        }
371        Ok(())
372    }
373
374    /// Handles the incoming unconfirmed transaction.
375    pub(crate) async fn process_unconfirmed_transaction(
376        &self,
377        transaction_id: N::TransactionID,
378        transaction: Data<Transaction<N>>,
379    ) -> Result<()> {
380        // Construct the transmission.
381        let transmission = Transmission::Transaction(transaction.clone());
382        // Compute the checksum.
383        let checksum = transaction.to_checksum::<N>()?;
384        // Construct the transmission ID.
385        let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
386        // Remove the transaction from the pending queue.
387        self.pending.remove(transmission_id, Some(transmission.clone()));
388        // Check if the transaction ID exists.
389        if self.contains_transmission(transmission_id) {
390            bail!("Transaction '{}.{}' already exists.", fmt_id(transaction_id), fmt_id(checksum).dimmed());
391        }
392        // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
393        let transaction = spawn_blocking!({
394            match transaction {
395                Data::Object(transaction) => Ok(transaction),
396                Data::Buffer(bytes) => Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?),
397            }
398        })?;
399
400        // Check that the transaction is well-formed and unique.
401        self.ledger.check_transaction_basic(transaction_id, transaction).await?;
402        // Adds the transaction to the ready queue.
403        if self.ready.write().insert(transmission_id, transmission) {
404            trace!(
405                "Worker {}.{} - Added unconfirmed transaction '{}'",
406                self.id,
407                fmt_id(transaction_id),
408                fmt_id(checksum).dimmed()
409            );
410        }
411        Ok(())
412    }
413}
414
415impl<N: Network> Worker<N> {
416    /// Starts the worker handlers.
417    fn start_handlers(&self, receiver: WorkerReceiver<N>) {
418        let WorkerReceiver { mut rx_worker_ping, mut rx_transmission_request, mut rx_transmission_response } = receiver;
419
420        // Start the pending queue expiration loop.
421        let self_ = self.clone();
422        self.spawn(async move {
423            loop {
424                // Sleep briefly.
425                tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;
426
427                // Remove the expired pending certificate requests.
428                let self__ = self_.clone();
429                let _ = spawn_blocking!({
430                    self__.pending.clear_expired_callbacks();
431                    Ok(())
432                });
433            }
434        });
435
436        // Process the ping events.
437        let self_ = self.clone();
438        self.spawn(async move {
439            while let Some((peer_ip, transmission_id)) = rx_worker_ping.recv().await {
440                self_.process_transmission_id_from_ping(peer_ip, transmission_id);
441            }
442        });
443
444        // Process the transmission requests.
445        let self_ = self.clone();
446        self.spawn(async move {
447            while let Some((peer_ip, transmission_request)) = rx_transmission_request.recv().await {
448                self_.send_transmission_response(peer_ip, transmission_request);
449            }
450        });
451
452        // Process the transmission responses.
453        let self_ = self.clone();
454        self.spawn(async move {
455            while let Some((peer_ip, transmission_response)) = rx_transmission_response.recv().await {
456                // Process the transmission response.
457                let self__ = self_.clone();
458                let _ = spawn_blocking!({
459                    self__.finish_transmission_request(peer_ip, transmission_response);
460                    Ok(())
461                });
462            }
463        });
464    }
465
466    /// Sends a transmission request to the specified peer.
467    async fn send_transmission_request(
468        &self,
469        peer_ip: SocketAddr,
470        transmission_id: TransmissionID<N>,
471    ) -> Result<(TransmissionID<N>, Transmission<N>)> {
472        // Initialize a oneshot channel.
473        let (callback_sender, callback_receiver) = oneshot::channel();
474        // Determine how many sent requests are pending.
475        let num_sent_requests = self.pending.num_sent_requests(transmission_id);
476        // Determine if we've already sent a request to the peer.
477        let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(transmission_id, peer_ip);
478        // Determine the maximum number of redundant requests.
479        let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
480        // Determine if we should send a transmission request to the peer.
481        // We send at most `num_redundant_requests` requests and each peer can only receive one request at a time.
482        let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;
483
484        // Insert the transmission ID into the pending queue.
485        self.pending.insert(transmission_id, peer_ip, Some((callback_sender, should_send_request)));
486
487        // If the number of requests is less than or equal to the the redundancy factor, send the transmission request to the peer.
488        if should_send_request {
489            // Send the transmission request to the peer.
490            if self.gateway.send(peer_ip, Event::TransmissionRequest(transmission_id.into())).await.is_none() {
491                bail!("Unable to fetch transmission - failed to send request")
492            }
493        } else {
494            debug!(
495                "Skipped sending request for transmission {}.{} to '{peer_ip}' ({num_sent_requests} redundant requests)",
496                fmt_id(transmission_id),
497                fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
498            );
499        }
500        // Wait for the transmission to be fetched.
501        match timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver).await {
502            // If the transmission was fetched, return it.
503            Ok(result) => Ok((transmission_id, result?)),
504            // If the transmission was not fetched, return an error.
505            Err(e) => bail!("Unable to fetch transmission - (timeout) {e}"),
506        }
507    }
508
509    /// Handles the incoming transmission response.
510    /// This method ensures the transmission response is well-formed and matches the transmission ID.
511    fn finish_transmission_request(&self, peer_ip: SocketAddr, response: TransmissionResponse<N>) {
512        let TransmissionResponse { transmission_id, mut transmission } = response;
513        // Check if the peer IP exists in the pending queue for the given transmission ID.
514        let exists = self.pending.get_peers(transmission_id).unwrap_or_default().contains(&peer_ip);
515        // If the peer IP exists, finish the pending request.
516        if exists {
517            // Ensure the transmission is not a fee and matches the transmission ID.
518            match self.ledger.ensure_transmission_is_well_formed(transmission_id, &mut transmission) {
519                Ok(()) => {
520                    // Remove the transmission ID from the pending queue.
521                    self.pending.remove(transmission_id, Some(transmission));
522                }
523                Err(err) => warn!("Failed to finish transmission response from peer '{peer_ip}': {err}"),
524            };
525        }
526    }
527
528    /// Sends the requested transmission to the specified peer.
529    fn send_transmission_response(&self, peer_ip: SocketAddr, request: TransmissionRequest<N>) {
530        let TransmissionRequest { transmission_id } = request;
531        // Attempt to retrieve the transmission.
532        if let Some(transmission) = self.get_transmission(transmission_id) {
533            // Send the transmission response to the peer.
534            let self_ = self.clone();
535            tokio::spawn(async move {
536                self_.gateway.send(peer_ip, Event::TransmissionResponse((transmission_id, transmission).into())).await;
537            });
538        }
539    }
540
541    /// Spawns a task with the given future; it should only be used for long-running tasks.
542    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
543        self.handles.lock().push(tokio::spawn(future));
544    }
545
546    /// Shuts down the worker.
547    pub(crate) fn shut_down(&self) {
548        trace!("Shutting down worker {}...", self.id);
549        // Abort the tasks.
550        self.handles.lock().iter().for_each(|handle| handle.abort());
551    }
552}
553
554#[cfg(test)]
555mod tests {
556    use super::*;
557    use crate::helpers::CALLBACK_EXPIRATION_IN_SECS;
558    use snarkos_node_bft_ledger_service::LedgerService;
559    use snarkos_node_bft_storage_service::BFTMemoryService;
560    use snarkvm::{
561        console::{network::Network, types::Field},
562        ledger::{
563            block::Block,
564            committee::Committee,
565            ledger_test_helpers::sample_execution_transaction_with_fee,
566            narwhal::{BatchCertificate, Subdag, Transmission, TransmissionID},
567        },
568        prelude::Address,
569    };
570
571    use bytes::Bytes;
572    use indexmap::IndexMap;
573    use mockall::mock;
574    use std::{io, ops::Range};
575
576    type CurrentNetwork = snarkvm::prelude::MainnetV0;
577
578    const ITERATIONS: usize = 100;
579
580    mock! {
581        Gateway<N: Network> {}
582        #[async_trait]
583        impl<N:Network> Transport<N> for Gateway<N> {
584            fn broadcast(&self, event: Event<N>);
585            async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>>;
586        }
587    }
588
589    mock! {
590        #[derive(Debug)]
591        Ledger<N: Network> {}
592        #[async_trait]
593        impl<N: Network> LedgerService<N> for Ledger<N> {
594            fn latest_round(&self) -> u64;
595            fn latest_block_height(&self) -> u32;
596            fn latest_block(&self) -> Block<N>;
597            fn latest_restrictions_id(&self) -> Field<N>;
598            fn latest_leader(&self) -> Option<(u64, Address<N>)>;
599            fn update_latest_leader(&self, round: u64, leader: Address<N>);
600            fn contains_block_height(&self, height: u32) -> bool;
601            fn get_block_height(&self, hash: &N::BlockHash) -> Result<u32>;
602            fn get_block_hash(&self, height: u32) -> Result<N::BlockHash>;
603            fn get_block_round(&self, height: u32) -> Result<u64>;
604            fn get_block(&self, height: u32) -> Result<Block<N>>;
605            fn get_blocks(&self, heights: Range<u32>) -> Result<Vec<Block<N>>>;
606            fn get_solution(&self, solution_id: &SolutionID<N>) -> Result<Solution<N>>;
607            fn get_unconfirmed_transaction(&self, transaction_id: N::TransactionID) -> Result<Transaction<N>>;
608            fn get_batch_certificate(&self, certificate_id: &Field<N>) -> Result<BatchCertificate<N>>;
609            fn current_committee(&self) -> Result<Committee<N>>;
610            fn get_committee_for_round(&self, round: u64) -> Result<Committee<N>>;
611            fn get_committee_lookback_for_round(&self, round: u64) -> Result<Committee<N>>;
612            fn contains_certificate(&self, certificate_id: &Field<N>) -> Result<bool>;
613            fn contains_transmission(&self, transmission_id: &TransmissionID<N>) -> Result<bool>;
614            fn ensure_transmission_is_well_formed(
615                &self,
616                transmission_id: TransmissionID<N>,
617                transmission: &mut Transmission<N>,
618            ) -> Result<()>;
619            async fn check_solution_basic(
620                &self,
621                solution_id: SolutionID<N>,
622                solution: Data<Solution<N>>,
623            ) -> Result<()>;
624            async fn check_transaction_basic(
625                &self,
626                transaction_id: N::TransactionID,
627                transaction: Transaction<N>,
628            ) -> Result<()>;
629            fn check_next_block(&self, block: &Block<N>) -> Result<()>;
630            fn prepare_advance_to_next_quorum_block(
631                &self,
632                subdag: Subdag<N>,
633                transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
634            ) -> Result<Block<N>>;
635            fn advance_to_next_block(&self, block: &Block<N>) -> Result<()>;
636            fn transaction_spent_cost_in_microcredits(&self, transaction_id: N::TransactionID, transaction: Transaction<N>) -> Result<u64>;
637        }
638    }
639
640    #[tokio::test]
641    async fn test_max_redundant_requests() {
642        let num_nodes: u16 = CurrentNetwork::MAX_CERTIFICATES.first().unwrap().1;
643
644        let rng = &mut TestRng::default();
645        // Sample a committee.
646        let committee =
647            snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_size(0, num_nodes, rng);
648        let committee_clone = committee.clone();
649        // Setup the mock ledger.
650        let mut mock_ledger = MockLedger::default();
651        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
652        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
653        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
654        mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
655        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
656
657        // Ensure the maximum number of redundant requests is correct and consistent across iterations.
658        assert_eq!(max_redundant_requests(ledger, 0).unwrap(), 6, "Update me if the formula changes");
659    }
660
661    #[tokio::test]
662    async fn test_process_transmission() {
663        let rng = &mut TestRng::default();
664        // Sample a committee.
665        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
666        let committee_clone = committee.clone();
667        // Setup the mock gateway and ledger.
668        let gateway = MockGateway::default();
669        let mut mock_ledger = MockLedger::default();
670        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
671        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
672        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
673        mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
674        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
675        // Initialize the storage.
676        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
677
678        // Create the Worker.
679        let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
680        let data = |rng: &mut TestRng| Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>()));
681        let transmission_id = TransmissionID::Solution(
682            rng.gen::<u64>().into(),
683            rng.gen::<<CurrentNetwork as Network>::TransmissionChecksum>(),
684        );
685        let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
686        let transmission = Transmission::Solution(data(rng));
687
688        // Process the transmission.
689        worker.process_transmission_from_peer(peer_ip, transmission_id, transmission.clone());
690        assert!(worker.contains_transmission(transmission_id));
691        assert!(worker.ready.read().contains(transmission_id));
692        assert_eq!(worker.get_transmission(transmission_id), Some(transmission));
693        // Take the transmission from the ready set.
694        assert!(worker.ready.write().remove_front().is_some());
695        assert!(!worker.ready.read().contains(transmission_id));
696    }
697
698    #[tokio::test]
699    async fn test_send_transmission() {
700        let rng = &mut TestRng::default();
701        // Sample a committee.
702        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
703        let committee_clone = committee.clone();
704        // Setup the mock gateway and ledger.
705        let mut gateway = MockGateway::default();
706        gateway.expect_send().returning(|_, _| {
707            let (_tx, rx) = oneshot::channel();
708            Some(rx)
709        });
710        let mut mock_ledger = MockLedger::default();
711        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
712        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
713        mock_ledger.expect_ensure_transmission_is_well_formed().returning(|_, _| Ok(()));
714        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
715        // Initialize the storage.
716        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
717
718        // Create the Worker.
719        let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
720        let transmission_id = TransmissionID::Solution(
721            rng.gen::<u64>().into(),
722            rng.gen::<<CurrentNetwork as Network>::TransmissionChecksum>(),
723        );
724        let worker_ = worker.clone();
725        let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
726        let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
727        assert!(worker.pending.contains(transmission_id));
728        let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
729        // Fake the transmission response.
730        worker.finish_transmission_request(peer_ip, TransmissionResponse {
731            transmission_id,
732            transmission: Transmission::Solution(Data::Buffer(Bytes::from(vec![0; 512]))),
733        });
734        // Check the transmission was removed from the pending set.
735        assert!(!worker.pending.contains(transmission_id));
736    }
737
738    #[tokio::test]
739    async fn test_process_solution_ok() {
740        let rng = &mut TestRng::default();
741        // Sample a committee.
742        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
743        let committee_clone = committee.clone();
744        // Setup the mock gateway and ledger.
745        let mut gateway = MockGateway::default();
746        gateway.expect_send().returning(|_, _| {
747            let (_tx, rx) = oneshot::channel();
748            Some(rx)
749        });
750        let mut mock_ledger = MockLedger::default();
751        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
752        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
753        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
754        mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
755        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
756        // Initialize the storage.
757        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
758
759        // Create the Worker.
760        let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
761        let solution = Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>()));
762        let solution_id = rng.gen::<u64>().into();
763        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
764        let transmission_id = TransmissionID::Solution(solution_id, solution_checksum);
765        let worker_ = worker.clone();
766        let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
767        let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
768        assert!(worker.pending.contains(transmission_id));
769        let result = worker.process_unconfirmed_solution(solution_id, solution).await;
770        assert!(result.is_ok());
771        assert!(!worker.pending.contains(transmission_id));
772        assert!(worker.ready.read().contains(transmission_id));
773    }
774
775    #[tokio::test]
776    async fn test_process_solution_nok() {
777        let rng = &mut TestRng::default();
778        // Sample a committee.
779        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
780        let committee_clone = committee.clone();
781        // Setup the mock gateway and ledger.
782        let mut gateway = MockGateway::default();
783        gateway.expect_send().returning(|_, _| {
784            let (_tx, rx) = oneshot::channel();
785            Some(rx)
786        });
787        let mut mock_ledger = MockLedger::default();
788        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
789        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
790        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
791        mock_ledger.expect_check_solution_basic().returning(|_, _| Err(anyhow!("")));
792        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
793        // Initialize the storage.
794        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
795
796        // Create the Worker.
797        let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
798        let solution_id = rng.gen::<u64>().into();
799        let solution = Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>()));
800        let checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
801        let transmission_id = TransmissionID::Solution(solution_id, checksum);
802        let worker_ = worker.clone();
803        let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
804        let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
805        assert!(worker.pending.contains(transmission_id));
806        let result = worker.process_unconfirmed_solution(solution_id, solution).await;
807        assert!(result.is_err());
808        assert!(!worker.pending.contains(transmission_id));
809        assert!(!worker.ready.read().contains(transmission_id));
810    }
811
812    #[tokio::test]
813    async fn test_process_transaction_ok() {
814        let rng = &mut TestRng::default();
815        // Sample a committee.
816        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
817        let committee_clone = committee.clone();
818        // Setup the mock gateway and ledger.
819        let mut gateway = MockGateway::default();
820        gateway.expect_send().returning(|_, _| {
821            let (_tx, rx) = oneshot::channel();
822            Some(rx)
823        });
824        let mut mock_ledger = MockLedger::default();
825        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
826        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
827        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
828        mock_ledger.expect_check_transaction_basic().returning(|_, _| Ok(()));
829        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
830        // Initialize the storage.
831        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
832
833        // Create the Worker.
834        let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
835        let transaction = sample_execution_transaction_with_fee(false, rng);
836        let transaction_id = transaction.id();
837        let transaction_data = Data::Object(transaction);
838        let checksum = transaction_data.to_checksum::<CurrentNetwork>().unwrap();
839        let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
840        let worker_ = worker.clone();
841        let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
842        let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
843        assert!(worker.pending.contains(transmission_id));
844        let result = worker.process_unconfirmed_transaction(transaction_id, transaction_data).await;
845        assert!(result.is_ok());
846        assert!(!worker.pending.contains(transmission_id));
847        assert!(worker.ready.read().contains(transmission_id));
848    }
849
850    #[tokio::test]
851    async fn test_process_transaction_nok() {
852        let mut rng = &mut TestRng::default();
853        // Sample a committee.
854        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
855        let committee_clone = committee.clone();
856        // Setup the mock gateway and ledger.
857        let mut gateway = MockGateway::default();
858        gateway.expect_send().returning(|_, _| {
859            let (_tx, rx) = oneshot::channel();
860            Some(rx)
861        });
862        let mut mock_ledger = MockLedger::default();
863        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
864        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
865        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
866        mock_ledger.expect_check_transaction_basic().returning(|_, _| Err(anyhow!("")));
867        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
868        // Initialize the storage.
869        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
870
871        // Create the Worker.
872        let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
873        let transaction_id: <CurrentNetwork as Network>::TransactionID = Field::<CurrentNetwork>::rand(&mut rng).into();
874        let transaction = Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>()));
875        let checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
876        let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
877        let worker_ = worker.clone();
878        let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
879        let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
880        assert!(worker.pending.contains(transmission_id));
881        let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
882        assert!(result.is_err());
883        assert!(!worker.pending.contains(transmission_id));
884        assert!(!worker.ready.read().contains(transmission_id));
885    }
886
    /// Floods the worker with requests for a single transmission and checks that the number of requests actually
    /// sent is capped by `max_redundant_requests`, that every call registers a callback, that pending entries are
    /// gone after `CALLBACK_EXPIRATION_IN_SECS + 1` seconds, and that fulfilling the transmission clears them.
    ///
    /// NOTE(review): this test is timing-based. It relies on 10 ms sleeps for each spawned request task to register,
    /// and it sleeps `CALLBACK_EXPIRATION_IN_SECS + 1` seconds for expiry. It may be slow, or flaky on a loaded machine.
    #[tokio::test]
    async fn test_flood_transmission_requests() {
        let rng = &mut TestRng::default();
        // Sample a committee.
        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
        let committee_clone = committee.clone();
        // Setup the mock gateway and ledger.
        let mut gateway = MockGateway::default();
        gateway.expect_send().returning(|_, _| {
            let (_tx, rx) = oneshot::channel();
            Some(rx)
        });
        let mut mock_ledger = MockLedger::default();
        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
        mock_ledger.expect_check_transaction_basic().returning(|_, _| Ok(()));
        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
        // Initialize the storage.
        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);

        // Create the Worker.
        let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
        let transaction = sample_execution_transaction_with_fee(false, rng);
        let transaction_id = transaction.id();
        let transaction_data = Data::Object(transaction);
        let checksum = transaction_data.to_checksum::<CurrentNetwork>().unwrap();
        let transmission_id = TransmissionID::Transaction(transaction_id, checksum);

        // Determine how many redundant requests may be sent for a single transmission.
        let num_redundant_requests =
            max_redundant_requests(worker.ledger.clone(), worker.storage.current_round()).unwrap();
        // Issue ten times as many requests as the cap, each (in the first flood) from a distinct peer port.
        let num_flood_requests = num_redundant_requests * 10;
        let mut peer_ips =
            (0..num_flood_requests).map(|i| SocketAddr::from(([127, 0, 0, 1], 1234 + i as u16))).collect_vec();
        // Copy out the first address before `peer_ips` is drained; it is reused for the single-peer flood below.
        let first_peer_ip = peer_ips[0];

        // Flood the pending queue with transmission requests.
        for i in 1..=num_flood_requests {
            let worker_ = worker.clone();
            let peer_ip = peer_ips.pop().unwrap();
            tokio::spawn(async move {
                let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
            });
            // Give the spawned task time to register its request before inspecting the pending state.
            tokio::time::sleep(Duration::from_millis(10)).await;
            // Check that the number of sent requests does not exceed the maximum number of redundant requests.
            assert!(worker.pending.num_sent_requests(transmission_id) <= num_redundant_requests);
            assert_eq!(worker.pending.num_callbacks(transmission_id), i);
        }
        // After the flood, exactly the maximum number of redundant requests were sent,
        // while every call registered a callback.
        assert_eq!(worker.pending.num_sent_requests(transmission_id), num_redundant_requests);
        assert_eq!(worker.pending.num_callbacks(transmission_id), num_flood_requests);

        // Let all the requests expire.
        tokio::time::sleep(Duration::from_secs(CALLBACK_EXPIRATION_IN_SECS as u64 + 1)).await;
        assert_eq!(worker.pending.num_sent_requests(transmission_id), 0);
        assert_eq!(worker.pending.num_callbacks(transmission_id), 0);

        // Flood the pending queue with transmission requests again, this time to a single peer.
        for i in 1..=num_flood_requests {
            let worker_ = worker.clone();
            tokio::spawn(async move {
                let _ = worker_.send_transmission_request(first_peer_ip, transmission_id).await;
            });
            tokio::time::sleep(Duration::from_millis(10)).await;
            assert!(worker.pending.num_sent_requests(transmission_id) <= num_redundant_requests);
            assert_eq!(worker.pending.num_callbacks(transmission_id), i);
        }
        // With every request targeting the same peer, only a single request was sent,
        // while every call still registered a callback.
        assert_eq!(worker.pending.num_sent_requests(transmission_id), 1);
        assert_eq!(worker.pending.num_callbacks(transmission_id), num_flood_requests);

        // Check that fulfilling a transmission request clears the pending queue.
        let result = worker.process_unconfirmed_transaction(transaction_id, transaction_data).await;
        assert!(result.is_ok());
        assert_eq!(worker.pending.num_sent_requests(transmission_id), 0);
        assert_eq!(worker.pending.num_callbacks(transmission_id), 0);
        assert!(!worker.pending.contains(transmission_id));
        assert!(worker.ready.read().contains(transmission_id));
    }
967
968    #[tokio::test]
969    async fn test_storage_gc_on_initialization() {
970        let rng = &mut TestRng::default();
971
972        for _ in 0..ITERATIONS {
973            // Mock the ledger round.
974            let max_gc_rounds = rng.gen_range(50..=100);
975            let latest_ledger_round = rng.gen_range((max_gc_rounds + 1)..1000);
976            let expected_gc_round = latest_ledger_round - max_gc_rounds;
977
978            // Sample a committee.
979            let committee =
980                snarkvm::ledger::committee::test_helpers::sample_committee_for_round(latest_ledger_round, rng);
981
982            // Setup the mock gateway and ledger.
983            let mut mock_ledger = MockLedger::default();
984            mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
985
986            let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
987            // Initialize the storage.
988            let storage =
989                Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
990
991            // Ensure that the storage GC round is correct.
992            assert_eq!(storage.gc_round(), expected_gc_round);
993        }
994    }
995}
996
#[cfg(test)]
mod prop_tests {
    use super::*;
    use crate::Gateway;
    use snarkos_node_bft_ledger_service::MockLedgerService;
    use snarkvm::{
        console::account::Address,
        ledger::committee::{Committee, MIN_VALIDATOR_STAKE},
    };

    use test_strategy::proptest;

    type CurrentNetwork = snarkvm::prelude::MainnetV0;

    /// Initializes a test committee of `n` members, each staking `MIN_VALIDATOR_STAKE`.
    ///
    /// Member `i` is derived from an RNG seeded with `i`. This makes the address and the third tuple entry
    /// (drawn from `0..100`) reproducible across runs. The committee is created for round 1.
    fn new_test_committee(n: u16) -> Committee<CurrentNetwork> {
        let mut members = IndexMap::with_capacity(n as usize);
        for i in 0..n {
            // Sample the address.
            let rng = &mut TestRng::fixed(i as u64);
            let address = Address::new(rng.gen());
            info!("Validator {i}: {address}");
            members.insert(address, (MIN_VALIDATOR_STAKE, false, rng.gen_range(0..100)));
        }
        // Initialize the committee.
        Committee::<CurrentNetwork>::new(1u64, members).unwrap()
    }

    /// Any worker ID below `MAX_WORKERS` is accepted and reported back by `Worker::id`.
    #[proptest]
    fn worker_initialization(
        #[strategy(0..MAX_WORKERS)] id: u8,
        gateway: Gateway<CurrentNetwork>,
        storage: Storage<CurrentNetwork>,
    ) {
        let committee = new_test_committee(4);
        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(MockLedgerService::new(committee));
        let worker = Worker::new(id, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
        assert_eq!(worker.id(), id);
    }

    /// Any worker ID at or above `MAX_WORKERS` must be rejected with the expected error message.
    #[proptest]
    fn invalid_worker_id(
        #[strategy(MAX_WORKERS..)] id: u8,
        gateway: Gateway<CurrentNetwork>,
        storage: Storage<CurrentNetwork>,
    ) {
        let committee = new_test_committee(4);
        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(MockLedgerService::new(committee));
        // `Worker` does not implement `Debug`, so `unwrap_err` is unavailable; match explicitly instead.
        // An `Ok` here must fail the test rather than be silently ignored.
        match Worker::new(id, Arc::new(gateway), storage, ledger, Default::default()) {
            Ok(_) => panic!("expected worker ID '{id}' to be rejected"),
            Err(error) => assert_eq!(error.to_string(), format!("Invalid worker ID '{}'", id)),
        }
    }
}