snarkos_node_bft/
worker.rs

1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    MAX_FETCH_TIMEOUT_IN_MS,
18    MAX_WORKERS,
19    ProposedBatch,
20    Transport,
21    events::{Event, TransmissionRequest, TransmissionResponse},
22    helpers::{Pending, Ready, Storage, WorkerReceiver, fmt_id, max_redundant_requests},
23    spawn_blocking,
24};
25use snarkos_node_bft_ledger_service::LedgerService;
26use snarkvm::{
27    console::prelude::*,
28    ledger::{
29        block::Transaction,
30        narwhal::{BatchHeader, Data, Transmission, TransmissionID},
31        puzzle::{Solution, SolutionID},
32    },
33};
34
35use colored::Colorize;
36use indexmap::{IndexMap, IndexSet};
37use parking_lot::Mutex;
38use rand::seq::IteratorRandom;
39use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration};
40use tokio::{sync::oneshot, task::JoinHandle, time::timeout};
41
/// A worker instance of the memory pool.
///
/// A worker bundles the handles it needs to collect transmissions: a gateway for talking
/// to peers, the storage, the ledger service, the currently proposed batch, a ready queue,
/// and a pending queue of outstanding transmission requests.
/// The worker is `Clone`; the methods in this file clone it to move it into spawned tasks.
#[derive(Clone)]
pub struct Worker<N: Network> {
    /// The worker ID (checked against `MAX_WORKERS` in `Worker::new`).
    id: u8,
    /// The gateway, used to send and broadcast events to peers.
    gateway: Arc<dyn Transport<N>>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The proposed batch.
    proposed_batch: Arc<ProposedBatch<N>>,
    /// The ready queue.
    ready: Ready<N>,
    /// The pending transmissions queue, keyed by transmission ID.
    pending: Arc<Pending<TransmissionID<N>, Transmission<N>>>,
    /// The handles of the tasks spawned via `Worker::spawn`; aborted in `Worker::shut_down`.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
}
61
62impl<N: Network> Worker<N> {
63    /// Initializes a new worker instance.
64    pub fn new(
65        id: u8,
66        gateway: Arc<dyn Transport<N>>,
67        storage: Storage<N>,
68        ledger: Arc<dyn LedgerService<N>>,
69        proposed_batch: Arc<ProposedBatch<N>>,
70    ) -> Result<Self> {
71        // Ensure the worker ID is valid.
72        ensure!(id < MAX_WORKERS, "Invalid worker ID '{id}'");
73        // Return the worker.
74        Ok(Self {
75            id,
76            gateway,
77            storage,
78            ledger,
79            proposed_batch,
80            ready: Default::default(),
81            pending: Default::default(),
82            handles: Default::default(),
83        })
84    }
85
86    /// Run the worker instance.
87    pub fn run(&self, receiver: WorkerReceiver<N>) {
88        info!("Starting worker instance {} of the memory pool...", self.id);
89        // Start the worker handlers.
90        self.start_handlers(receiver);
91    }
92
93    /// Returns the worker ID.
94    pub const fn id(&self) -> u8 {
95        self.id
96    }
97
98    /// Returns a reference to the pending transmissions queue.
99    pub fn pending(&self) -> &Arc<Pending<TransmissionID<N>, Transmission<N>>> {
100        &self.pending
101    }
102}
103
impl<N: Network> Worker<N> {
    /// The maximum number of transmissions allowed in a worker.
    ///
    /// Computed as the per-batch transmission limit divided (integer division) by `MAX_WORKERS`.
    pub const MAX_TRANSMISSIONS_PER_WORKER: usize =
        BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / MAX_WORKERS as usize;
    /// The maximum number of transmissions allowed in a worker ping.
    ///
    /// Computed as the per-batch transmission limit divided (integer division) by 10.
    pub const MAX_TRANSMISSIONS_PER_WORKER_PING: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / 10;

    // The counters below all delegate to the ready queue.

    /// Returns the number of transmissions in the ready queue.
    pub fn num_transmissions(&self) -> usize {
        self.ready.num_transmissions()
    }

    /// Returns the number of ratifications in the ready queue.
    pub fn num_ratifications(&self) -> usize {
        self.ready.num_ratifications()
    }

    /// Returns the number of solutions in the ready queue.
    pub fn num_solutions(&self) -> usize {
        self.ready.num_solutions()
    }

    /// Returns the number of transactions in the ready queue.
    pub fn num_transactions(&self) -> usize {
        self.ready.num_transactions()
    }
}
133
impl<N: Network> Worker<N> {
    /// Returns the transmission IDs in the ready queue, as an owned set.
    pub fn transmission_ids(&self) -> IndexSet<TransmissionID<N>> {
        self.ready.transmission_ids()
    }

    /// Returns the transmissions in the ready queue, as an owned map from ID to transmission.
    pub fn transmissions(&self) -> IndexMap<TransmissionID<N>, Transmission<N>> {
        self.ready.transmissions()
    }

    /// Returns an iterator over the solutions in the ready queue.
    ///
    /// The iterator yields `(solution ID, solution data)` pairs and borrows from `self`.
    pub fn solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        self.ready.solutions()
    }

    /// Returns an iterator over the transactions in the ready queue.
    ///
    /// The iterator yields `(transaction ID, transaction data)` pairs and borrows from `self`.
    pub fn transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        self.ready.transactions()
    }
}
155
impl<N: Network> Worker<N> {
    /// Clears the solutions from the ready queue.
    ///
    /// This delegates to the ready queue; it is only visible to the parent module.
    pub(super) fn clear_solutions(&self) {
        self.ready.clear_solutions()
    }
}
162
163impl<N: Network> Worker<N> {
164    /// Returns `true` if the transmission ID exists in the ready queue, proposed batch, storage, or ledger.
165    pub fn contains_transmission(&self, transmission_id: impl Into<TransmissionID<N>>) -> bool {
166        let transmission_id = transmission_id.into();
167        // Check if the transmission ID exists in the ready queue, proposed batch, storage, or ledger.
168        self.ready.contains(transmission_id)
169            || self.proposed_batch.read().as_ref().map_or(false, |p| p.contains_transmission(transmission_id))
170            || self.storage.contains_transmission(transmission_id)
171            || self.ledger.contains_transmission(&transmission_id).unwrap_or(false)
172    }
173
174    /// Returns the transmission if it exists in the ready queue, proposed batch, storage.
175    ///
176    /// Note: We explicitly forbid retrieving a transmission from the ledger, as transmissions
177    /// in the ledger are not guaranteed to be invalid for the current batch.
178    pub fn get_transmission(&self, transmission_id: TransmissionID<N>) -> Option<Transmission<N>> {
179        // Check if the transmission ID exists in the ready queue.
180        if let Some(transmission) = self.ready.get(transmission_id) {
181            return Some(transmission);
182        }
183        // Check if the transmission ID exists in storage.
184        if let Some(transmission) = self.storage.get_transmission(transmission_id) {
185            return Some(transmission);
186        }
187        // Check if the transmission ID exists in the proposed batch.
188        if let Some(transmission) =
189            self.proposed_batch.read().as_ref().and_then(|p| p.get_transmission(transmission_id))
190        {
191            return Some(transmission.clone());
192        }
193        None
194    }
195
196    /// Returns the transmissions if it exists in the worker, or requests it from the specified peer.
197    pub async fn get_or_fetch_transmission(
198        &self,
199        peer_ip: SocketAddr,
200        transmission_id: TransmissionID<N>,
201    ) -> Result<(TransmissionID<N>, Transmission<N>)> {
202        // Attempt to get the transmission from the worker.
203        if let Some(transmission) = self.get_transmission(transmission_id) {
204            return Ok((transmission_id, transmission));
205        }
206        // Send a transmission request to the peer.
207        let (candidate_id, transmission) = self.send_transmission_request(peer_ip, transmission_id).await?;
208        // Ensure the transmission ID matches.
209        ensure!(candidate_id == transmission_id, "Invalid transmission ID");
210        // Return the transmission.
211        Ok((transmission_id, transmission))
212    }
213
    /// Removes up to the specified number of transmissions from the ready queue, and returns them.
    ///
    /// The drained `(transmission ID, transmission)` pairs are handed back as an owning iterator.
    pub(crate) fn drain(&self, num_transmissions: usize) -> impl Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        self.ready.drain(num_transmissions).into_iter()
    }
218
219    /// Reinserts the specified transmission into the ready queue.
220    pub(crate) fn reinsert(&self, transmission_id: TransmissionID<N>, transmission: Transmission<N>) -> bool {
221        // Check if the transmission ID exists.
222        if !self.contains_transmission(transmission_id) {
223            // Insert the transmission into the ready queue.
224            return self.ready.insert(transmission_id, transmission);
225        }
226        false
227    }
228
229    /// Broadcasts a worker ping event.
230    pub(crate) fn broadcast_ping(&self) {
231        // Retrieve the transmission IDs.
232        let transmission_ids = self
233            .ready
234            .transmission_ids()
235            .into_iter()
236            .choose_multiple(&mut rand::thread_rng(), Self::MAX_TRANSMISSIONS_PER_WORKER_PING)
237            .into_iter()
238            .collect::<IndexSet<_>>();
239
240        // Broadcast the ping event.
241        if !transmission_ids.is_empty() {
242            self.gateway.broadcast(Event::WorkerPing(transmission_ids.into()));
243        }
244    }
245}
246
247impl<N: Network> Worker<N> {
248    /// Handles the incoming transmission ID from a worker ping event.
249    fn process_transmission_id_from_ping(&self, peer_ip: SocketAddr, transmission_id: TransmissionID<N>) {
250        // Check if the transmission ID exists.
251        if self.contains_transmission(transmission_id) {
252            return;
253        }
254        // If the ready queue is full, then skip this transmission.
255        // Note: We must prioritize the unconfirmed solutions and unconfirmed transactions, not transmissions.
256        if self.ready.num_transmissions() > Self::MAX_TRANSMISSIONS_PER_WORKER {
257            return;
258        }
259        // Attempt to fetch the transmission from the peer.
260        let self_ = self.clone();
261        tokio::spawn(async move {
262            // Send a transmission request to the peer.
263            match self_.send_transmission_request(peer_ip, transmission_id).await {
264                // If the transmission was fetched, then process it.
265                Ok((candidate_id, transmission)) => {
266                    // Ensure the transmission ID matches.
267                    if candidate_id == transmission_id {
268                        // Insert the transmission into the ready queue.
269                        // Note: This method checks `contains_transmission` again, because by the time the transmission is fetched,
270                        // it could have already been inserted into the ready queue.
271                        self_.process_transmission_from_peer(peer_ip, transmission_id, transmission);
272                    }
273                }
274                // If the transmission was not fetched, then attempt to fetch it again.
275                Err(e) => {
276                    warn!(
277                        "Worker {} - Failed to fetch transmission '{}.{}' from '{peer_ip}' (ping) - {e}",
278                        self_.id,
279                        fmt_id(transmission_id),
280                        fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
281                    );
282                }
283            }
284        });
285    }
286
287    /// Handles the incoming transmission from a peer.
288    pub(crate) fn process_transmission_from_peer(
289        &self,
290        peer_ip: SocketAddr,
291        transmission_id: TransmissionID<N>,
292        transmission: Transmission<N>,
293    ) {
294        // If the transmission ID already exists, then do not store it.
295        if self.contains_transmission(transmission_id) {
296            return;
297        }
298        // Ensure the transmission ID and transmission type matches.
299        let is_well_formed = match (&transmission_id, &transmission) {
300            (TransmissionID::Solution(_, _), Transmission::Solution(_)) => true,
301            (TransmissionID::Transaction(_, _), Transmission::Transaction(_)) => true,
302            // Note: We explicitly forbid inserting ratifications into the ready queue,
303            // as the protocol currently does not support ratifications.
304            (TransmissionID::Ratification, Transmission::Ratification) => false,
305            // All other combinations are clearly invalid.
306            _ => false,
307        };
308        // If the transmission is a deserialized execution, verify it immediately.
309        // This takes heavy transaction verification out of the hot path during block generation.
310        if let (TransmissionID::Transaction(tx_id, _), Transmission::Transaction(tx)) = (transmission_id, &transmission)
311        {
312            if let Data::Object(Transaction::Execute(..)) = tx {
313                let self_ = self.clone();
314                let tx_ = tx.clone();
315                tokio::spawn(async move {
316                    let _ = self_.ledger.check_transaction_basic(tx_id, tx_).await;
317                });
318            }
319        }
320        // If the transmission ID and transmission type matches, then insert the transmission into the ready queue.
321        if is_well_formed && self.ready.insert(transmission_id, transmission) {
322            trace!(
323                "Worker {} - Added transmission '{}.{}' from '{peer_ip}'",
324                self.id,
325                fmt_id(transmission_id),
326                fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
327            );
328        }
329    }
330
331    /// Handles the incoming unconfirmed solution.
332    /// Note: This method assumes the incoming solution is valid and does not exist in the ledger.
333    pub(crate) async fn process_unconfirmed_solution(
334        &self,
335        solution_id: SolutionID<N>,
336        solution: Data<Solution<N>>,
337    ) -> Result<()> {
338        // Construct the transmission.
339        let transmission = Transmission::Solution(solution.clone());
340        // Compute the checksum.
341        let checksum = solution.to_checksum::<N>()?;
342        // Construct the transmission ID.
343        let transmission_id = TransmissionID::Solution(solution_id, checksum);
344        // Remove the solution ID from the pending queue.
345        self.pending.remove(transmission_id, Some(transmission.clone()));
346        // Check if the solution exists.
347        if self.contains_transmission(transmission_id) {
348            bail!("Solution '{}.{}' already exists.", fmt_id(solution_id), fmt_id(checksum).dimmed());
349        }
350        // Check that the solution is well-formed and unique.
351        self.ledger.check_solution_basic(solution_id, solution).await?;
352        // Adds the solution to the ready queue.
353        if self.ready.insert(transmission_id, transmission) {
354            trace!(
355                "Worker {} - Added unconfirmed solution '{}.{}'",
356                self.id,
357                fmt_id(solution_id),
358                fmt_id(checksum).dimmed()
359            );
360        }
361        Ok(())
362    }
363
364    /// Handles the incoming unconfirmed transaction.
365    pub(crate) async fn process_unconfirmed_transaction(
366        &self,
367        transaction_id: N::TransactionID,
368        transaction: Data<Transaction<N>>,
369    ) -> Result<()> {
370        // Construct the transmission.
371        let transmission = Transmission::Transaction(transaction.clone());
372        // Compute the checksum.
373        let checksum = transaction.to_checksum::<N>()?;
374        // Construct the transmission ID.
375        let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
376        // Remove the transaction from the pending queue.
377        self.pending.remove(transmission_id, Some(transmission.clone()));
378        // Check if the transaction ID exists.
379        if self.contains_transmission(transmission_id) {
380            bail!("Transaction '{}.{}' already exists.", fmt_id(transaction_id), fmt_id(checksum).dimmed());
381        }
382        // Check that the transaction is well-formed and unique.
383        self.ledger.check_transaction_basic(transaction_id, transaction).await?;
384        // Adds the transaction to the ready queue.
385        if self.ready.insert(transmission_id, transmission) {
386            trace!(
387                "Worker {}.{} - Added unconfirmed transaction '{}'",
388                self.id,
389                fmt_id(transaction_id),
390                fmt_id(checksum).dimmed()
391            );
392        }
393        Ok(())
394    }
395}
396
397impl<N: Network> Worker<N> {
    /// Starts the worker handlers.
    ///
    /// Four long-running tasks are spawned via `Self::spawn` (so that `shut_down` can abort them):
    /// 1. a loop that periodically clears the expired callbacks of the pending queue,
    /// 2. a consumer of the worker ping events,
    /// 3. a consumer of the transmission requests,
    /// 4. a consumer of the transmission responses.
    ///
    /// Each consumer runs until its channel yields `None`.
    fn start_handlers(&self, receiver: WorkerReceiver<N>) {
        let WorkerReceiver { mut rx_worker_ping, mut rx_transmission_request, mut rx_transmission_response } = receiver;

        // Start the pending queue expiration loop.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep for `MAX_FETCH_TIMEOUT_IN_MS` milliseconds between two sweeps.
                tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;

                // Remove the expired callbacks of the pending transmission requests.
                // Note: The outcome of `spawn_blocking!` is intentionally discarded.
                let self__ = self_.clone();
                let _ = spawn_blocking!({
                    self__.pending.clear_expired_callbacks();
                    Ok(())
                });
            }
        });

        // Process the ping events.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, transmission_id)) = rx_worker_ping.recv().await {
                self_.process_transmission_id_from_ping(peer_ip, transmission_id);
            }
        });

        // Process the transmission requests.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, transmission_request)) = rx_transmission_request.recv().await {
                self_.send_transmission_response(peer_ip, transmission_request);
            }
        });

        // Process the transmission responses.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, transmission_response)) = rx_transmission_response.recv().await {
                // Process the transmission response.
                // Note: The outcome of `spawn_blocking!` is intentionally discarded.
                let self__ = self_.clone();
                let _ = spawn_blocking!({
                    self__.finish_transmission_request(peer_ip, transmission_response);
                    Ok(())
                });
            }
        });
    }
447
    /// Sends a transmission request to the specified peer, and waits for the transmission.
    ///
    /// A callback is always registered in the pending queue, but the request itself is only sent
    /// if fewer than the maximum number of redundant requests are in flight for this transmission
    /// and no request for it has been sent to this peer yet. In both cases, the method then waits
    /// up to `MAX_FETCH_TIMEOUT_IN_MS` milliseconds for the callback to deliver the transmission.
    ///
    /// # Errors
    /// Fails if the maximum number of redundant requests cannot be determined, if the gateway
    /// fails to send the request, if the callback resolves to an error, or if the wait times out.
    async fn send_transmission_request(
        &self,
        peer_ip: SocketAddr,
        transmission_id: TransmissionID<N>,
    ) -> Result<(TransmissionID<N>, Transmission<N>)> {
        // Initialize a oneshot channel.
        let (callback_sender, callback_receiver) = oneshot::channel();
        // Determine how many sent requests are pending.
        // Note: This is read before the insertion below, so it does not count the current request.
        let num_sent_requests = self.pending.num_sent_requests(transmission_id);
        // Determine if we've already sent a request to the peer.
        let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(transmission_id, peer_ip);
        // Determine the maximum number of redundant requests.
        let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
        // Determine if we should send a transmission request to the peer.
        // We send at most `num_redundant_requests` requests and each peer can only receive one request at a time.
        let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;

        // Insert the transmission ID into the pending queue, along with the callback
        // and whether a request is actually sent for it.
        self.pending.insert(transmission_id, peer_ip, Some((callback_sender, should_send_request)));

        // If the redundancy limit allows it and this peer has not been asked yet, send the transmission request.
        if should_send_request {
            // Send the transmission request to the peer.
            if self.gateway.send(peer_ip, Event::TransmissionRequest(transmission_id.into())).await.is_none() {
                bail!("Unable to fetch transmission - failed to send request")
            }
        } else {
            debug!(
                "Skipped sending request for transmission {}.{} to '{peer_ip}' ({num_sent_requests} redundant requests)",
                fmt_id(transmission_id),
                fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
            );
        }
        // Wait for the transmission to be fetched.
        match timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver).await {
            // If the callback resolved in time, return the transmission (or propagate the callback's error).
            Ok(result) => Ok((transmission_id, result?)),
            // If the wait timed out, return an error.
            Err(e) => bail!("Unable to fetch transmission - (timeout) {e}"),
        }
    }
490
491    /// Handles the incoming transmission response.
492    /// This method ensures the transmission response is well-formed and matches the transmission ID.
493    fn finish_transmission_request(&self, peer_ip: SocketAddr, response: TransmissionResponse<N>) {
494        let TransmissionResponse { transmission_id, mut transmission } = response;
495        // Check if the peer IP exists in the pending queue for the given transmission ID.
496        let exists = self.pending.get_peers(transmission_id).unwrap_or_default().contains(&peer_ip);
497        // If the peer IP exists, finish the pending request.
498        if exists {
499            // Ensure the transmission is not a fee and matches the transmission ID.
500            match self.ledger.ensure_transmission_is_well_formed(transmission_id, &mut transmission) {
501                Ok(()) => {
502                    // Remove the transmission ID from the pending queue.
503                    self.pending.remove(transmission_id, Some(transmission));
504                }
505                Err(err) => warn!("Failed to finish transmission response from peer '{peer_ip}': {err}"),
506            };
507        }
508    }
509
510    /// Sends the requested transmission to the specified peer.
511    fn send_transmission_response(&self, peer_ip: SocketAddr, request: TransmissionRequest<N>) {
512        let TransmissionRequest { transmission_id } = request;
513        // Attempt to retrieve the transmission.
514        if let Some(transmission) = self.get_transmission(transmission_id) {
515            // Send the transmission response to the peer.
516            let self_ = self.clone();
517            tokio::spawn(async move {
518                self_.gateway.send(peer_ip, Event::TransmissionResponse((transmission_id, transmission).into())).await;
519            });
520        }
521    }
522
    /// Spawns a task with the given future; it should only be used for long-running tasks.
    ///
    /// The task's handle is recorded in `self.handles`, which is what `shut_down` aborts.
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }
527
528    /// Shuts down the worker.
529    pub(crate) fn shut_down(&self) {
530        trace!("Shutting down worker {}...", self.id);
531        // Abort the tasks.
532        self.handles.lock().iter().for_each(|handle| handle.abort());
533    }
534}
535
536#[cfg(test)]
537mod tests {
538    use super::*;
539    use crate::helpers::CALLBACK_EXPIRATION_IN_SECS;
540    use snarkos_node_bft_ledger_service::LedgerService;
541    use snarkos_node_bft_storage_service::BFTMemoryService;
542    use snarkvm::{
543        console::{network::Network, types::Field},
544        ledger::{
545            block::Block,
546            committee::Committee,
547            narwhal::{BatchCertificate, Subdag, Transmission, TransmissionID},
548        },
549        prelude::Address,
550    };
551
552    use bytes::Bytes;
553    use indexmap::IndexMap;
554    use mockall::mock;
555    use std::{io, ops::Range};
556
    // The network that the tests in this module are instantiated with.
    type CurrentNetwork = snarkvm::prelude::MainnetV0;

    // NOTE(review): presumably the number of rounds for iteration-based tests; its use is not
    // visible in the tests reviewed here - confirm against the remainder of the module.
    const ITERATIONS: usize = 100;
560
    // Generates `MockGateway`, a mock implementation of the `Transport` trait,
    // so that the tests can stub out `broadcast` and `send`.
    mock! {
        Gateway<N: Network> {}
        #[async_trait]
        impl<N:Network> Transport<N> for Gateway<N> {
            fn broadcast(&self, event: Event<N>);
            async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>>;
        }
    }
569
    // Generates `MockLedger`, a mock implementation of the `LedgerService` trait.
    // The tests set expectations only on the methods that the code under test calls.
    mock! {
        #[derive(Debug)]
        Ledger<N: Network> {}
        #[async_trait]
        impl<N: Network> LedgerService<N> for Ledger<N> {
            fn latest_round(&self) -> u64;
            fn latest_block_height(&self) -> u32;
            fn latest_block(&self) -> Block<N>;
            fn latest_restrictions_id(&self) -> Field<N>;
            fn latest_leader(&self) -> Option<(u64, Address<N>)>;
            fn update_latest_leader(&self, round: u64, leader: Address<N>);
            fn contains_block_height(&self, height: u32) -> bool;
            fn get_block_height(&self, hash: &N::BlockHash) -> Result<u32>;
            fn get_block_hash(&self, height: u32) -> Result<N::BlockHash>;
            fn get_block_round(&self, height: u32) -> Result<u64>;
            fn get_block(&self, height: u32) -> Result<Block<N>>;
            fn get_blocks(&self, heights: Range<u32>) -> Result<Vec<Block<N>>>;
            fn get_solution(&self, solution_id: &SolutionID<N>) -> Result<Solution<N>>;
            fn get_unconfirmed_transaction(&self, transaction_id: N::TransactionID) -> Result<Transaction<N>>;
            fn get_batch_certificate(&self, certificate_id: &Field<N>) -> Result<BatchCertificate<N>>;
            fn current_committee(&self) -> Result<Committee<N>>;
            fn get_committee_for_round(&self, round: u64) -> Result<Committee<N>>;
            fn get_committee_lookback_for_round(&self, round: u64) -> Result<Committee<N>>;
            fn contains_certificate(&self, certificate_id: &Field<N>) -> Result<bool>;
            fn contains_transmission(&self, transmission_id: &TransmissionID<N>) -> Result<bool>;
            fn ensure_transmission_is_well_formed(
                &self,
                transmission_id: TransmissionID<N>,
                transmission: &mut Transmission<N>,
            ) -> Result<()>;
            async fn check_solution_basic(
                &self,
                solution_id: SolutionID<N>,
                solution: Data<Solution<N>>,
            ) -> Result<()>;
            async fn check_transaction_basic(
                &self,
                transaction_id: N::TransactionID,
                transaction: Data<Transaction<N>>,
            ) -> Result<()>;
            fn check_next_block(&self, block: &Block<N>) -> Result<()>;
            fn prepare_advance_to_next_quorum_block(
                &self,
                subdag: Subdag<N>,
                transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
            ) -> Result<Block<N>>;
            fn advance_to_next_block(&self, block: &Block<N>) -> Result<()>;
        }
    }
619
    // Checks `max_redundant_requests` against a hard-coded expectation, for a committee whose
    // size is taken from the first entry of `CurrentNetwork::MAX_CERTIFICATES`.
    #[tokio::test]
    async fn test_max_redundant_requests() {
        // Use the committee size from the first `MAX_CERTIFICATES` entry.
        let num_nodes: u16 = CurrentNetwork::MAX_CERTIFICATES.first().unwrap().1;

        let rng = &mut TestRng::default();
        // Sample a committee.
        let committee =
            snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_size(0, num_nodes, rng);
        let committee_clone = committee.clone();
        // Setup the mock ledger.
        let mut mock_ledger = MockLedger::default();
        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
        mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);

        // Ensure the maximum number of redundant requests matches the expected value for this committee size.
        assert_eq!(max_redundant_requests(ledger, 0).unwrap(), 6, "Update me if the formula changes");
    }
640
    // Checks that a solution transmission received from a peer lands in the ready queue,
    // can be retrieved from the worker, and leaves the ready queue again once it is drained.
    #[tokio::test]
    async fn test_process_transmission() {
        let rng = &mut TestRng::default();
        // Sample a committee.
        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
        let committee_clone = committee.clone();
        // Setup the mock gateway and ledger.
        let gateway = MockGateway::default();
        let mut mock_ledger = MockLedger::default();
        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
        // The ledger never reports the transmission as known.
        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
        mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
        // Initialize the storage.
        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);

        // Create the Worker.
        let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
        // Produces a buffer of 512 random bytes.
        let data = |rng: &mut TestRng| Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>()));
        // Sample a random solution transmission ID.
        let transmission_id = TransmissionID::Solution(
            rng.gen::<u64>().into(),
            rng.gen::<<CurrentNetwork as Network>::TransmissionChecksum>(),
        );
        let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
        let transmission = Transmission::Solution(data(rng));

        // Process the transmission.
        worker.process_transmission_from_peer(peer_ip, transmission_id, transmission.clone());
        assert!(worker.contains_transmission(transmission_id));
        assert!(worker.ready.contains(transmission_id));
        assert_eq!(worker.get_transmission(transmission_id), Some(transmission));
        // Take the transmission from the ready set.
        let transmission: Vec<_> = worker.drain(1).collect();
        assert_eq!(transmission.len(), 1);
        assert!(!worker.ready.contains(transmission_id));
    }
678
679    #[tokio::test]
680    async fn test_send_transmission() {
681        let rng = &mut TestRng::default();
682        // Sample a committee.
683        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
684        let committee_clone = committee.clone();
685        // Setup the mock gateway and ledger.
686        let mut gateway = MockGateway::default();
687        gateway.expect_send().returning(|_, _| {
688            let (_tx, rx) = oneshot::channel();
689            Some(rx)
690        });
691        let mut mock_ledger = MockLedger::default();
692        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
693        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
694        mock_ledger.expect_ensure_transmission_is_well_formed().returning(|_, _| Ok(()));
695        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
696        // Initialize the storage.
697        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
698
699        // Create the Worker.
700        let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
701        let transmission_id = TransmissionID::Solution(
702            rng.gen::<u64>().into(),
703            rng.gen::<<CurrentNetwork as Network>::TransmissionChecksum>(),
704        );
705        let worker_ = worker.clone();
706        let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
707        let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
708        assert!(worker.pending.contains(transmission_id));
709        let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
710        // Fake the transmission response.
711        worker.finish_transmission_request(peer_ip, TransmissionResponse {
712            transmission_id,
713            transmission: Transmission::Solution(Data::Buffer(Bytes::from(vec![0; 512]))),
714        });
715        // Check the transmission was removed from the pending set.
716        assert!(!worker.pending.contains(transmission_id));
717    }
718
719    #[tokio::test]
720    async fn test_process_solution_ok() {
721        let rng = &mut TestRng::default();
722        // Sample a committee.
723        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
724        let committee_clone = committee.clone();
725        // Setup the mock gateway and ledger.
726        let mut gateway = MockGateway::default();
727        gateway.expect_send().returning(|_, _| {
728            let (_tx, rx) = oneshot::channel();
729            Some(rx)
730        });
731        let mut mock_ledger = MockLedger::default();
732        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
733        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
734        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
735        mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
736        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
737        // Initialize the storage.
738        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
739
740        // Create the Worker.
741        let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
742        let solution = Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>()));
743        let solution_id = rng.gen::<u64>().into();
744        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
745        let transmission_id = TransmissionID::Solution(solution_id, solution_checksum);
746        let worker_ = worker.clone();
747        let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
748        let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
749        assert!(worker.pending.contains(transmission_id));
750        let result = worker.process_unconfirmed_solution(solution_id, solution).await;
751        assert!(result.is_ok());
752        assert!(!worker.pending.contains(transmission_id));
753        assert!(worker.ready.contains(transmission_id));
754    }
755
756    #[tokio::test]
757    async fn test_process_solution_nok() {
758        let rng = &mut TestRng::default();
759        // Sample a committee.
760        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
761        let committee_clone = committee.clone();
762        // Setup the mock gateway and ledger.
763        let mut gateway = MockGateway::default();
764        gateway.expect_send().returning(|_, _| {
765            let (_tx, rx) = oneshot::channel();
766            Some(rx)
767        });
768        let mut mock_ledger = MockLedger::default();
769        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
770        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
771        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
772        mock_ledger.expect_check_solution_basic().returning(|_, _| Err(anyhow!("")));
773        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
774        // Initialize the storage.
775        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
776
777        // Create the Worker.
778        let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
779        let solution_id = rng.gen::<u64>().into();
780        let solution = Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>()));
781        let checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
782        let transmission_id = TransmissionID::Solution(solution_id, checksum);
783        let worker_ = worker.clone();
784        let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
785        let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
786        assert!(worker.pending.contains(transmission_id));
787        let result = worker.process_unconfirmed_solution(solution_id, solution).await;
788        assert!(result.is_err());
789        assert!(!worker.pending.contains(transmission_id));
790        assert!(!worker.ready.contains(transmission_id));
791    }
792
793    #[tokio::test]
794    async fn test_process_transaction_ok() {
795        let mut rng = &mut TestRng::default();
796        // Sample a committee.
797        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
798        let committee_clone = committee.clone();
799        // Setup the mock gateway and ledger.
800        let mut gateway = MockGateway::default();
801        gateway.expect_send().returning(|_, _| {
802            let (_tx, rx) = oneshot::channel();
803            Some(rx)
804        });
805        let mut mock_ledger = MockLedger::default();
806        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
807        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
808        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
809        mock_ledger.expect_check_transaction_basic().returning(|_, _| Ok(()));
810        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
811        // Initialize the storage.
812        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
813
814        // Create the Worker.
815        let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
816        let transaction_id: <CurrentNetwork as Network>::TransactionID = Field::<CurrentNetwork>::rand(&mut rng).into();
817        let transaction = Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>()));
818        let checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
819        let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
820        let worker_ = worker.clone();
821        let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
822        let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
823        assert!(worker.pending.contains(transmission_id));
824        let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
825        assert!(result.is_ok());
826        assert!(!worker.pending.contains(transmission_id));
827        assert!(worker.ready.contains(transmission_id));
828    }
829
830    #[tokio::test]
831    async fn test_process_transaction_nok() {
832        let mut rng = &mut TestRng::default();
833        // Sample a committee.
834        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
835        let committee_clone = committee.clone();
836        // Setup the mock gateway and ledger.
837        let mut gateway = MockGateway::default();
838        gateway.expect_send().returning(|_, _| {
839            let (_tx, rx) = oneshot::channel();
840            Some(rx)
841        });
842        let mut mock_ledger = MockLedger::default();
843        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
844        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
845        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
846        mock_ledger.expect_check_transaction_basic().returning(|_, _| Err(anyhow!("")));
847        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
848        // Initialize the storage.
849        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
850
851        // Create the Worker.
852        let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
853        let transaction_id: <CurrentNetwork as Network>::TransactionID = Field::<CurrentNetwork>::rand(&mut rng).into();
854        let transaction = Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>()));
855        let checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
856        let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
857        let worker_ = worker.clone();
858        let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
859        let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
860        assert!(worker.pending.contains(transmission_id));
861        let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
862        assert!(result.is_err());
863        assert!(!worker.pending.contains(transmission_id));
864        assert!(!worker.ready.contains(transmission_id));
865    }
866
867    #[tokio::test]
868    async fn test_flood_transmission_requests() {
869        let mut rng = &mut TestRng::default();
870        // Sample a committee.
871        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
872        let committee_clone = committee.clone();
873        // Setup the mock gateway and ledger.
874        let mut gateway = MockGateway::default();
875        gateway.expect_send().returning(|_, _| {
876            let (_tx, rx) = oneshot::channel();
877            Some(rx)
878        });
879        let mut mock_ledger = MockLedger::default();
880        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
881        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
882        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
883        mock_ledger.expect_check_transaction_basic().returning(|_, _| Ok(()));
884        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
885        // Initialize the storage.
886        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
887
888        // Create the Worker.
889        let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
890        let transaction_id: <CurrentNetwork as Network>::TransactionID = Field::<CurrentNetwork>::rand(&mut rng).into();
891        let transaction = Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>()));
892        let checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
893        let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
894
895        // Determine the number of redundant requests are sent.
896        let num_redundant_requests =
897            max_redundant_requests(worker.ledger.clone(), worker.storage.current_round()).unwrap();
898        let num_flood_requests = num_redundant_requests * 10;
899        let mut peer_ips =
900            (0..num_flood_requests).map(|i| SocketAddr::from(([127, 0, 0, 1], 1234 + i as u16))).collect_vec();
901        let first_peer_ip = peer_ips[0];
902
903        // Flood the pending queue with transmission requests.
904        for i in 1..=num_flood_requests {
905            let worker_ = worker.clone();
906            let peer_ip = peer_ips.pop().unwrap();
907            tokio::spawn(async move {
908                let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
909            });
910            tokio::time::sleep(Duration::from_millis(10)).await;
911            // Check that the number of sent requests does not exceed the maximum number of redundant requests.
912            assert!(worker.pending.num_sent_requests(transmission_id) <= num_redundant_requests);
913            assert_eq!(worker.pending.num_callbacks(transmission_id), i);
914        }
915        // Check that the number of sent requests does not exceed the maximum number of redundant requests.
916        assert_eq!(worker.pending.num_sent_requests(transmission_id), num_redundant_requests);
917        assert_eq!(worker.pending.num_callbacks(transmission_id), num_flood_requests);
918
919        // Let all the requests expire.
920        tokio::time::sleep(Duration::from_secs(CALLBACK_EXPIRATION_IN_SECS as u64 + 1)).await;
921        assert_eq!(worker.pending.num_sent_requests(transmission_id), 0);
922        assert_eq!(worker.pending.num_callbacks(transmission_id), 0);
923
924        // Flood the pending queue with transmission requests again, this time to a single peer
925        for i in 1..=num_flood_requests {
926            let worker_ = worker.clone();
927            tokio::spawn(async move {
928                let _ = worker_.send_transmission_request(first_peer_ip, transmission_id).await;
929            });
930            tokio::time::sleep(Duration::from_millis(10)).await;
931            assert!(worker.pending.num_sent_requests(transmission_id) <= num_redundant_requests);
932            assert_eq!(worker.pending.num_callbacks(transmission_id), i);
933        }
934        // Check that the number of sent requests does not exceed the maximum number of redundant requests.
935        assert_eq!(worker.pending.num_sent_requests(transmission_id), 1);
936        assert_eq!(worker.pending.num_callbacks(transmission_id), num_flood_requests);
937
938        // Check that fulfilling a transmission request clears the pending queue.
939        let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
940        assert!(result.is_ok());
941        assert_eq!(worker.pending.num_sent_requests(transmission_id), 0);
942        assert_eq!(worker.pending.num_callbacks(transmission_id), 0);
943        assert!(!worker.pending.contains(transmission_id));
944        assert!(worker.ready.contains(transmission_id));
945    }
946
947    #[tokio::test]
948    async fn test_storage_gc_on_initialization() {
949        let rng = &mut TestRng::default();
950
951        for _ in 0..ITERATIONS {
952            // Mock the ledger round.
953            let max_gc_rounds = rng.gen_range(50..=100);
954            let latest_ledger_round = rng.gen_range((max_gc_rounds + 1)..1000);
955            let expected_gc_round = latest_ledger_round - max_gc_rounds;
956
957            // Sample a committee.
958            let committee =
959                snarkvm::ledger::committee::test_helpers::sample_committee_for_round(latest_ledger_round, rng);
960
961            // Setup the mock gateway and ledger.
962            let mut mock_ledger = MockLedger::default();
963            mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
964
965            let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
966            // Initialize the storage.
967            let storage =
968                Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
969
970            // Ensure that the storage GC round is correct.
971            assert_eq!(storage.gc_round(), expected_gc_round);
972        }
973    }
974}
975
#[cfg(test)]
mod prop_tests {
    use super::*;
    use crate::Gateway;
    use snarkos_node_bft_ledger_service::MockLedgerService;
    use snarkvm::{
        console::account::Address,
        ledger::committee::{Committee, MIN_VALIDATOR_STAKE},
    };

    use test_strategy::proptest;

    type CurrentNetwork = snarkvm::prelude::MainnetV0;

    /// Initializes a new test committee with `n` members.
    ///
    /// Each member's address is drawn from a `TestRng::fixed` generator seeded with the
    /// member's index, and each member is inserted with `MIN_VALIDATOR_STAKE`.
    ///
    /// Panics if `Committee::new` rejects the generated member set.
    fn new_test_committee(n: u16) -> Committee<CurrentNetwork> {
        let mut members = IndexMap::with_capacity(n as usize);
        for i in 0..n {
            // Sample the address.
            let rng = &mut TestRng::fixed(i as u64);
            let address = Address::new(rng.gen());
            info!("Validator {i}: {address}");
            members.insert(address, (MIN_VALIDATOR_STAKE, false, rng.gen_range(0..100)));
        }
        // Initialize the committee.
        Committee::<CurrentNetwork>::new(1u64, members).unwrap()
    }

    /// Any worker ID below `MAX_WORKERS` must be accepted and reported back by `Worker::id`.
    #[proptest]
    fn worker_initialization(
        #[strategy(0..MAX_WORKERS)] id: u8,
        gateway: Gateway<CurrentNetwork>,
        storage: Storage<CurrentNetwork>,
    ) {
        let committee = new_test_committee(4);
        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(MockLedgerService::new(committee));
        let worker = Worker::new(id, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
        assert_eq!(worker.id(), id);
    }

    /// Any worker ID at or above `MAX_WORKERS` must be rejected with the "Invalid worker ID" error.
    #[proptest]
    fn invalid_worker_id(
        #[strategy(MAX_WORKERS..)] id: u8,
        gateway: Gateway<CurrentNetwork>,
        storage: Storage<CurrentNetwork>,
    ) {
        let committee = new_test_committee(4);
        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(MockLedgerService::new(committee));
        let worker = Worker::new(id, Arc::new(gateway), storage, ledger, Default::default());
        // Match exhaustively so that an unexpected `Ok` fails the property instead of passing silently.
        // TODO once Worker implements Debug, simplify this with `unwrap_err`
        match worker {
            Ok(_) => panic!("Worker ID '{id}' is out of range and must be rejected"),
            Err(error) => assert_eq!(error.to_string(), format!("Invalid worker ID '{}'", id)),
        }
    }
}